How to join two Spark Datasets into one with Java objects?

I have a small problem with two Datasets in Spark. I have this:

    SparkConf conf = new SparkConf()
        .setAppName("MyFunnyApp")
        .setMaster("local[*]");
    SparkSession spark = SparkSession
        .builder()
        .config(conf)
        .config("spark.debug.maxToStringFields", 150)
        .getOrCreate();
    //...
    //Do stuff
    //...
    Encoder<MyOwnObject1> encoderObject1 = Encoders.bean(MyOwnObject1.class);
    Encoder<MyOwnObject2> encoderObject2 = Encoders.bean(MyOwnObject2.class);

    Dataset<MyOwnObject1> object1DS = spark.read()
        .option("header", "true")
        .option("delimiter", ";")
        .option("inferSchema", "true")
        .csv(pathToFile1)
        .as(encoderObject1);

    Dataset<MyOwnObject2> object2DS = spark.read()
        .option("header", "true")
        .option("delimiter", ";")
        .option("inferSchema", "true")
        .csv(pathToFile2)
        .as(encoderObject2);
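
For context, Encoders.bean only works with JavaBean-style classes. A minimal sketch of what MyOwnObject1 could look like (only column01 appears in the question; someValue is a made-up placeholder field):

    import java.io.Serializable;

    // Hypothetical bean: Encoders.bean needs a public zero-arg
    // constructor and a getter/setter pair for every field
    public class MyOwnObject1 implements Serializable {
        private String column01; // the join key used below
        private int someValue;   // placeholder for the remaining vars

        public MyOwnObject1() {}

        public String getColumn01() { return column01; }
        public void setColumn01(String column01) { this.column01 = column01; }
        public int getSomeValue() { return someValue; }
        public void setSomeValue(int someValue) { this.someValue = someValue; }
    }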

I can print the schema and show the data correctly.

    //Here the problem starts
    Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS =
        object1DS.join(object2DS,
            object1DS.col("column01").equalTo(object2DS.col("column01")))
        // Encoders.tuple takes encoder instances, not classes
        .as(Encoders.tuple(encoderObject1, encoderObject2));

The last line can't make the join and I get this error:

 Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct<"LIST WITH ALL VARS FROM TWO OBJECT"> to Tuple2, but failed as the number of fields does not line up.; 

This is true, because the join flattens all the vars from both objects into one struct, which does not line up with a Tuple2 of two beans ...

Then I tried this:

    Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS = object1DS
        .joinWith(object2DS,
            object1DS.col("column01").equalTo(object2DS.col("column01")));
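
For what it's worth, printing the schema of the joinWith result shows why it lines up with a Tuple2: joinWith keeps each side as its own struct column instead of flattening them (output sketched from memory):

    joinObjectDS.printSchema();
    // Roughly:
    // root
    //  |-- _1: struct with all the MyOwnObject1 vars
    //  |-- _2: struct with all the MyOwnObject2 vars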

And it works great! But I need a new Dataset without a tuple. I have an object3 that has some vars from object1 and object2, and then I have this problem:

    Encoder<MyOwnObject3> encoderObject3 = Encoders.bean(MyOwnObject3.class);
    // The cast selects the MapFunction overload of Dataset.map
    // (import org.apache.spark.api.java.function.MapFunction),
    // which is otherwise ambiguous for Java lambdas
    Dataset<MyOwnObject3> object3DS = joinObjectDS.map(
        (MapFunction<Tuple2<MyOwnObject1, MyOwnObject2>, MyOwnObject3>) tupleObject1Object2 -> {
            MyOwnObject1 myOwnObject1 = tupleObject1Object2._1();
            MyOwnObject2 myOwnObject2 = tupleObject1Object2._2();
            MyOwnObject3 myOwnObject3 = new MyOwnObject3();
            //Sets all vars with start values
            //...
            //Sets data from object 1 and 2 to 3
            //...
            return myOwnObject3;
        }, encoderObject3);

Error! Here is the error:

 17/05/10 12:17:43 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 593, Column 72: A method named "toString" is not declared in any enclosing class nor any supertype, nor through a static import 

and more than a thousand lines of errors ...

What can I do? I tried:

  • Create my objects with only String, int (or Integer) and double (or Double) fields (no more)
  • Use different encoders like Kryo or Java serialization
  • Use JavaRDD (works, but very slowly; see the sketch after this list) and use DataFrames with Rows (works, but I would need to change many objects)
  • All my Java objects are serializable
  • Use Spark 2.1.0 and 2.1.1; now I have 2.1.1 in my pom.xml
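
For context, the JavaRDD workaround mentioned above looked roughly like this (a sketch assuming the beans and encoders from before are in scope; the mapping runs as plain Java code instead of encoder codegen, which is also why it is slower):

    // Hypothetical sketch of the JavaRDD fallback: map on a plain RDD,
    // then rebuild a typed Dataset at the end if needed
    JavaRDD<MyOwnObject3> object3RDD = joinObjectDS
        .toJavaRDD()
        .map(tuple -> {
            MyOwnObject3 myOwnObject3 = new MyOwnObject3();
            // copy vars from tuple._1() and tuple._2() ...
            return myOwnObject3;
        });
    Dataset<MyOwnObject3> object3DS =
        spark.createDataset(object3RDD.rdd(), encoderObject3);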

I want to use Datasets, to get the speed of DataFrames and the object syntax of JavaRDD ...

Help?

thanks

1 answer

Finally, I found a solution.

I had a problem with the inferSchema option when my code was creating the Dataset. I have a String column that the inferSchema option turns into an integer column, because all its values are "numeric", but I need to use them as Strings (for example, "0001", "0002" ...). I need to define a schema, but I have many vars, so I wrote this to build one from my classes:

    import java.lang.reflect.Field;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    // Build a schema from the declared fields of the bean class
    List<StructField> fieldsObject1 = new ArrayList<>();
    for (Field field : MyOwnObject1.class.getDeclaredFields()) {
        fieldsObject1.add(DataTypes.createStructField(
            field.getName(),
            CatalystSqlParser.parseDataType(field.getType().getSimpleName()),
            true));
    }
    StructType schemaObject1 = DataTypes.createStructType(fieldsObject1);

    Dataset<MyOwnObject1> object1DS = spark.read()
        .option("header", "true")
        .option("delimiter", ";")
        .schema(schemaObject1)
        .csv(pathToFile1)
        .as(encoderObject1);
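
As far as I can tell, this only works because CatalystSqlParser.parseDataType treats type names case-insensitively, so Java simple names like String, int, Integer and double all parse to the matching Catalyst types; a field with a custom class type would make parseDataType throw.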

It works great.

The "best" solution would be:

    Dataset<MyOwnObject1> object1DS = spark.read()
        .option("header", "true")
        .option("delimiter", ";")
        .schema(encoderObject1.schema())
        .csv(pathToFile1)
        .as(encoderObject1);

but encoderObject1.schema() returns a schema with the vars in alphabetical order, not in the original order, so this option doesn't work when I read the csv. Perhaps encoders should return the schema with the vars in the original order, not in alphabetical order.
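
One possible workaround (just a sketch; the names column02 and column03 are made up, only the order of the CSV header matters) is to keep the types from encoderObject1.schema() but rearrange its fields to the file's column order:

    // Hypothetical sketch: reuse the bean encoder's field types,
    // but arrange them in the CSV's column order
    String[] csvOrder = {"column01", "column02", "column03"}; // assumed header order
    StructType beanSchema = encoderObject1.schema();
    List<StructField> ordered = new ArrayList<>();
    for (String name : csvOrder) {
        ordered.add(beanSchema.apply(name)); // keeps the type from the bean
    }
    StructType csvSchema = DataTypes.createStructType(ordered);

    Dataset<MyOwnObject1> object1DS = spark.read()
        .option("header", "true")
        .option("delimiter", ";")
        .schema(csvSchema)
        .csv(pathToFile1)
        .as(encoderObject1);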

