Is there a way to add additional metadata to Spark DataFrames?

Can I add additional metadata to DataFrames?

Background

I have a Spark DataFrame for which I need to save additional information. Example: a DataFrame for which I want to "remember" the highest index used in an Integer id column.

Current solution

I use a separate DataFrame to store this information. Of course, keeping this information separately is tedious and error prone.
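
A minimal sketch of that workaround, assuming the Scala API and an illustrative bookkeeping DataFrame named dfMeta (the names are not from the question):

 // Illustrative sketch of the "separate DataFrame" workaround.
 // Assumes the usual implicits are in scope, e.g. import sqlContext.implicits._
 import org.apache.spark.sql.functions.max

 val maxId = df.agg(max("id")).head.getInt(0)             // highest id seen so far
 val dfMeta = Seq(("maxId", maxId)).toDF("key", "value")  // must be kept in sync by hand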

Is there a better way to store such extra information on DataFrames?

+10
scala apache-spark apache-spark-sql
5 answers

To expand on and Scala-fy nealmcb's answer (the question was tagged scala, not python, so I don't think this answer is irrelevant or redundant), suppose you have a DataFrame:

 import org.apache.spark.sql

 val df = sc.parallelize(Seq.fill(100) { scala.util.Random.nextInt() }).toDF("randInt")

And somehow compute the maximum, or whatever it is you want to memoize about the DataFrame:

 val randIntMax = df.rdd.map { case sql.Row(randInt: Int) => randInt }.reduce(math.max) 

sql.types.Metadata can only hold strings, booleans, certain numeric types (Long and Double), and other Metadata structures, so we have to use a Long:

 val metadata = new sql.types.MetadataBuilder().putLong("columnMax", randIntMax).build() 

DataFrame.withColumn() actually has an overload that lets you supply a metadata argument at the end, but it is inexplicably marked [private], so we just do what it does ourselves and use Column.as(alias, metadata):

 val newColumn = df.col("randInt").as("randInt_withMax", metadata)
 val dfWithMax = df.withColumn("randInt_withMax", newColumn)

dfWithMax now has (a column with) the metadata you want!

 dfWithMax.schema.foreach(field => println(s"${field.name}: metadata=${field.metadata}"))
 > randInt: metadata={}
 > randInt_withMax: metadata={"columnMax":2094414111}

Or programmatically and (somewhat) type-safely (note that Metadata.getLong() and the other getters do not return Option and may throw a "key not found" exception):

 dfWithMax.schema("randInt_withMax").metadata.getLong("columnMax")
 > res29: Long = 209341992
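
If you would rather avoid the exception, a small helper (hypothetical name columnMaxOpt, not part of Spark's API) can guard the lookup with Metadata.contains:

 import org.apache.spark.sql.DataFrame

 // Hypothetical helper: returns None when the key is absent instead of throwing.
 def columnMaxOpt(df: DataFrame, column: String, key: String = "columnMax"): Option[Long] = {
   val meta = df.schema(column).metadata
   if (meta.contains(key)) Some(meta.getLong(key)) else None
 }

 columnMaxOpt(dfWithMax, "randInt_withMax")   // => Some(209341992)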

Attaching the max to a column makes sense in your case, but in the general case of attaching metadata to a DataFrame rather than to one column in particular, it appears you would have to take the wrapper route described by the other answers.

+12

As of Spark 1.2, StructType schemas have a metadata attribute which can hold an arbitrary mapping / dictionary of information for each column in a DataFrame. E.g. (when used with the separate spark-csv library):

 customSchema = StructType([
     StructField("cat_id", IntegerType(), True, {'description': "Unique id, primary key"}),
     StructField("cat_title", StringType(), True, {'description': "Name of the category, with underscores"})
 ])

 categoryDumpDF = (sqlContext.read.format('com.databricks.spark.csv')
     .options(header='false')
     .load(csvFilename, schema = customSchema))

 f = categoryDumpDF.schema.fields
 ["%s (%s): %s" % (t.name, t.dataType, t.metadata) for t in f]

 ["cat_id (IntegerType): {u'description': u'Unique id, primary key'}",
  "cat_title (StringType): {u'description': u'Name of the category, with underscores.'}"]
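
For the Scala API, a rough equivalent is sketched below (same illustrative column names; in Scala, StructField takes a Metadata object built with MetadataBuilder rather than a plain dict):

 import org.apache.spark.sql.types._

 // Sketch: attaching a per-column description in Scala.
 val customSchema = StructType(Seq(
   StructField("cat_id", IntegerType, nullable = true,
     metadata = new MetadataBuilder().putString("description", "Unique id, primary key").build()),
   StructField("cat_title", StringType, nullable = true,
     metadata = new MetadataBuilder().putString("description", "Name of the category, with underscores").build())
 ))

 customSchema.fields.foreach(f => println(s"${f.name} (${f.dataType}): ${f.metadata}"))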

This was added in [SPARK-3569] Add metadata field to StructField - ASF JIRA, and is designed for use in Machine Learning pipelines to track information about the features stored in columns, such as categorical/continuous, number of categories, and category-to-index map. See the SPARK-3569: Add metadata field to StructField design document.

I would like to see this used more widely, e.g. for descriptions and documentation of columns, the unit of measurement used in a column, coordinate axis information, etc.

Issues include how to appropriately preserve or manipulate the metadata information when a column is transformed, how to handle multiple sorts of metadata, how to make it all extensible, etc.

For the benefit of those thinking of expanding this functionality in Spark DataFrames, I reference some analogous discussions around Pandas.

For example, see xray - bring the labeled data power of pandas to the physical sciences, which supports metadata for labeled arrays.

And see the discussion of metadata for pandas at Allow custom metadata to be attached to panel/df/series? · Issue #2485 · pydata/pandas.

See also the discussion relating to units: ENH: unit of measurement / physical quantities · Issue #10349 · pydata/pandas.

+7

If you want less tedious work, I think you can add an implicit conversion between DataFrame and your custom wrapper (though I have not tested it yet):

 implicit class WrappedDataFrame(val df: DataFrame) {
   var metadata = scala.collection.mutable.Map[String, Long]()

   def addToMetaData(key: String, value: Long) {
     metadata += key -> value
   }

   // ...[other methods you consider useful, getters, setters, whatever]...
 }

If the implicit wrapper is in the DataFrame's scope, you can just use a normal DataFrame as if it were your wrapper, i.e.:

 df.addToMetaData("size", 100)

Note that this approach also mutates your metadata, so you are not forced to compute it only once and carry it around.

+2

I would keep a wrapper around your DataFrame. For example:

 case class MyDFWrapper(dataFrame: DataFrame, metadata: Map[String, Long])

 val maxIndex = df1.agg("index" -> "MAX").head.getLong(0)
 MyDFWrapper(df1, Map("maxIndex" -> maxIndex))
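
For illustration, reading the stored value back might look like this (a sketch; wrapper is just a name for the value constructed above):

 val wrapper = MyDFWrapper(df1, Map("maxIndex" -> maxIndex))
 wrapper.metadata("maxIndex")   // => the remembered max
 wrapper.dataFrame.count()      // the wrapped DataFrame is still an ordinary DataFrame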
0

A lot of people see the word "metadata" and go straight to "column metadata". That does not seem to be what you wanted, and it was not what I wanted when I had a similar problem. Ultimately, the problem here is that a DataFrame is an immutable data structure: whenever an operation is performed on it, the data passes on, but the rest of the DataFrame does not. This means that you cannot just put a wrapper on it, because as soon as you perform an operation you get a completely new DataFrame (potentially of a completely new type, especially given Scala/Spark's tendency toward implicit conversions). Finally, if the DataFrame ever escapes its wrapper, there is no way to reconstruct the metadata from the DataFrame.

I had this problem in Spark Streaming, which focuses on RDDs (the underlying data structure of the DataFrame as well), and I came to one simple conclusion: the only place to store the metadata is in the name of the RDD. The RDD name is never used by the core Spark system except for reporting, so it is safe to repurpose it. Then you can create your wrapper based on the RDD name, with an explicit conversion between any DataFrame and your metadata wrapper.
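
A minimal sketch of the mechanism (assuming a Spark version where DataFrame.rdd is a stable lazy val, and using JSON purely as one possible encoding):

 // Stash metadata in the RDD name and read it back later.
 df.rdd.setName("""{"maxIndex":2094414111}""")
 val storedMeta = df.rdd.name   // => {"maxIndex":2094414111}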

Unfortunately, this still leaves you with the problem of immutability and the new RDD created by each operation. The RDD name (our metadata field) is lost with each new RDD, which means you need a way to re-add the name to the new RDD. This can be solved by providing a method that takes a function as an argument: it extracts the metadata before the call, calls the function to get the new RDD/DataFrame, and then re-attaches the metadata afterwards:

 def withMetadata(fn: DataFrame => DataFrame): MetaDataFrame = {
   val meta = df.rdd.name       // extract the metadata (stored in the RDD name)
   val result = fn(df)          // run the transformation; this yields a brand-new DataFrame
   result.rdd.setName(meta)     // re-attach the metadata to the new RDD
   MetaDataFrame(result)
 }

Your wrapper class (MetaDataFrame) can provide convenience methods for parsing and setting metadata values, as well as implicit conversions back and forth between Spark DataFrame and MetaDataFrame. As long as you run all your mutations through the withMetadata method, your metadata will carry along through the entire transformation pipeline. Yes, using this method for every call is a bit of a hassle, but the simple reality is that Spark has no concept of first-class metadata.
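
A minimal sketch of such a wrapper, under the assumption that the metadata is serialized as a simple "k1=v1;k2=v2" string in the RDD name (the answer does not specify an encoding, so this part is illustrative):

 import org.apache.spark.sql.DataFrame
 import scala.language.implicitConversions

 // Illustrative wrapper; the "k1=v1;k2=v2" encoding in the RDD name is an assumption.
 case class MetaDataFrame(df: DataFrame) {

   // Parse the metadata stored in the RDD name into a Map.
   def metadata: Map[String, String] =
     Option(df.rdd.name).filter(_.nonEmpty)
       .map(_.split(";").map(_.split("=", 2)).collect { case Array(k, v) => k -> v }.toMap)
       .getOrElse(Map.empty)

   // Store a metadata map back into the RDD name.
   def setMetadata(meta: Map[String, String]): MetaDataFrame = {
     df.rdd.setName(meta.map { case (k, v) => s"$k=$v" }.mkString(";"))
     this
   }

   // The withMetadata method from the answer above lives here too.
   def withMetadata(fn: DataFrame => DataFrame): MetaDataFrame = {
     val name = df.rdd.name
     val result = fn(df)
     result.rdd.setName(name)
     MetaDataFrame(result)
   }
 }

 // Implicit conversions between DataFrame and MetaDataFrame.
 object MetaDataFrame {
   implicit def toDataFrame(mdf: MetaDataFrame): DataFrame = mdf.df
   implicit def fromDataFrame(df: DataFrame): MetaDataFrame = MetaDataFrame(df)
 }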

0
