How to deal with changing parquet layout in Apache Spark

I have Parquet data stored as daily chunks in S3 (in the form s3://bucketName/prefix/YYYY/MM/DD/ ), but I cannot read data from different dates in AWS EMR Spark, because some column types do not match and I get one of many exceptions, for example:

 java.lang.ClassCastException: optional binary element (UTF8) is not a group 

which appears when a column contains an array value in some files but is null in the same column in other files, where it is then inferred as a string type,

or

 org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 42.0 failed 4 times, most recent failure: Lost task 23.3 in stage 42.0 (TID 2189, ip-172-31-9-27.eu-west-1.compute.internal): org.apache.spark.SparkException: Failed to merge incompatible data types ArrayType(StructType(StructField(Id,LongType,true), StructField(Name,StringType,true), StructField(Type,StringType,true)),true) 

I have the source data in S3 in JSON format, and my initial plan was to create an automated task that starts an EMR cluster, reads the JSON data for the previous date, and simply writes it back to S3 as Parquet.

The JSON data is also partitioned by date, i.e. the keys have a date prefix. Reading the JSON works fine: the schema is inferred from the data no matter how much of it is being read.
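Roughly, the job I had in mind looks like this (a minimal sketch; the JSON input prefix and the hard-coded date are placeholders, only the Parquet output layout matches the paths above):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("daily-json-to-parquet").getOrCreate()

    // in practice the scheduler injects the previous day's date
    val day = "2016/03/02"

    // the schema is inferred from whatever JSON exists under that day's prefix
    val daily = spark.read.json(s"s3://bucketName/json/$day/")

    daily.write.mode("overwrite").parquet(s"s3://bucketName/prefix/$day/")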

But the problem arises when writing the Parquet files. As far as I understand, when I write Parquet together with summary metadata files, those metadata files contain the schema for all the parts/partitions of the Parquet data, which, as far as I can tell, can have differing schemas. When I turned off writing the metadata files, Spark was said to infer the whole schema from the first file under the given Parquet path and assume it stays the same across the other files.

When columns that should be of type double happen to contain only integer values for a given day, reading them from JSON (where these numbers have no decimal point) makes Spark infer them as columns of type long. Even if I cast these columns to double before writing the Parquet files, this still is not good enough, since the schema can change, new columns can be added, and tracking all of them is impossible.
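To illustrate the cast workaround and why it falls short (a sketch; the column name price and the exact paths are made up, and spark is an existing SparkSession):

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.DoubleType

    // on a day where "price" happens to contain only whole numbers,
    // JSON schema inference yields long instead of double
    val df = spark.read.json("s3://bucketName/json/2016/03/02/")
    df.printSchema()   // price: long (nullable = true)

    // casting the known error-prone columns keeps the Parquet type stable,
    // but every such column has to be listed by hand and new ones slip through
    val fixed = df.withColumn("price", col("price").cast(DoubleType))
    fixed.write.mode("overwrite").parquet("s3://bucketName/prefix/2016/03/02/")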

I have seen that other people have the same problem, but I have yet to find a good enough solution.

What are the best practices or solutions for this?

emr apache-spark apache-spark-sql spark-dataframe parquet
3 answers

As long as I read the data as daily JSON chunks and write Parquet into daily S3 folders, without specifying my own schema when reading the JSON and without casting the error-prone columns to the correct type before writing to Parquet, Spark may infer different schemas for different days, depending on the values in the data instances, and write Parquet files with conflicting schemas.

This may not be an ideal solution, but the only way I found to deal with my evolving schema is the following:

Before my daily (more specifically, nightly) cron job that batch-processes the previous day's data, I create a dummy object with mostly empty values.

I make sure the dummy is recognizable; for example, since the real data has unique ID values, I add the string "dummy" as the ID of the dummy data object.

Then I give representative values to the properties with error-prone types; for example, I give floats/doubles non-zero values, so that when serialized to JSON they definitely have a decimal separator, e.g. "0.2" instead of "0" (when serialized to JSON, doubles/floats with a value of 0 are rendered as "0", not "0.0").

Strings, booleans, and integers work fine, but in addition to doubles/floats, array properties need to be created as empty arrays, and properties of other classes/structs as corresponding empty objects, so that they are not nulls, since Spark reads nulls as strings.


Then, once all the required fields are filled in, I marshal the object to JSON and write the files to S3.

I then use these files in my Scala batch-processing script: I read them, store the schema in a variable, and pass that schema as a parameter when reading the real JSON data, to prevent Spark from doing its own schema inference. A rough sketch of the pattern is shown below.
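In code, the pattern looks roughly like this (a sketch; the dummy field names, the dummy/ prefix and the paths are invented for illustration, and spark is an existing SparkSession):

    // the nightly job writes dummy records such as
    //   {"id":"dummy","price":0.2,"tags":[],"details":{"name":""}}
    // next to the real data, e.g. under a separate dummy/ prefix

    // read only the dummy file(s) and keep the inferred schema
    val dummySchema = spark.read.json("s3://bucketName/dummy/2016/03/02/").schema

    // read the real JSON with that schema so Spark skips its own inference
    val daily = spark.read.schema(dummySchema).json("s3://bucketName/json/2016/03/02/")

    daily.write.mode("overwrite").parquet("s3://bucketName/prefix/2016/03/02/")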

This way I know that all fields always have the same types, and schema merging is only needed to join schemas when new fields are added.
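When new fields do appear, merging can be enabled at read time (a sketch; the path glob is a placeholder):

    // existing fields keep stable types thanks to the dummy schema,
    // so mergeSchema only has to reconcile newly added columns
    val merged = spark.read
      .option("mergeSchema", "true")
      .parquet("s3://bucketName/prefix/2016/03/*/")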

Of course, this has the drawback that the dummy-object creation has to be updated manually whenever new fields with error-prone types are added, but that is a small price to pay for now, as it is the only working solution I have found.


These are the options I use to write Parquet to S3; turning off schema merging boosts write-back performance, and it may also address your problem:

    val PARQUET_OPTIONS = Map(
      "spark.sql.parquet.mergeSchema" -> "false",
      "spark.sql.parquet.filterPushdown" -> "true")
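These are Spark SQL configuration keys, so one way to apply them is on the session configuration before writing (a sketch; spark is an existing SparkSession, and df and the output path are placeholders):

    // apply each key to the Spark SQL configuration, then write as usual
    PARQUET_OPTIONS.foreach { case (k, v) => spark.conf.set(k, v) }

    df.write.mode("append").parquet("s3://bucketName/prefix/2016/03/02/")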

Just create an RDD[String] in which each line is a JSON document, then read that RDD as a DataFrame with the primitivesAsString option so that all primitive data types are read as String.

    // rdd of (path, bytes) pairs from the zipped input files
    val binary_zip_RDD = sc.binaryFiles(batchHolder.get(i), minPartitions = 50000)

    // rdd[String], each string is a lowercased JSON document
    val TransformedRDD = binary_zip_RDD.flatMap(kv => ZipDecompressor.Zip_open_hybrid(kv._1, kv._2, proccessingtimestamp))

    // the schema of the dataframe will be the consolidated schema of all JSON strings,
    // with every primitive read as a String
    val jsonDataframe_stream = sparkSession.read.option("primitivesAsString", true).json(TransformedRDD)
    println(jsonDataframe_stream.printSchema())

    jsonDataframe_stream.write.mode(SaveMode.Append).partitionBy(GetConstantValue.DEVICEDATE).parquet(ApplicationProperties.OUTPUT_DIRECTORY)
