I have a little problem with two Datasets in Spark. I have this:

    SparkConf conf = new SparkConf()
        .setAppName("MyFunnyApp")
        .setMaster("local[*]");

    SparkSession spark = SparkSession
        .builder()
        .config(conf)
        .config("spark.debug.maxToStringFields", 150)
        .getOrCreate();

    //...
    //Do stuff
    //...

    Encoder<MyOwnObject1> encoderObject1 = Encoders.bean(MyOwnObject1.class);
    Encoder<MyOwnObject2> encoderObject2 = Encoders.bean(MyOwnObject2.class);

    Dataset<MyOwnObject1> object1DS = spark.read()
        .option("header", "true")
        .option("delimiter", ";")
        .option("inferSchema", "true")
        .csv(pathToFile1)
        .as(encoderObject1);

    Dataset<MyOwnObject2> object2DS = spark.read()
        .option("header", "true")
        .option("delimiter", ";")
        .option("inferSchema", "true")
        .csv(pathToFile2)
        .as(encoderObject2);

I can print the schema of both Datasets and show their contents correctly.

    //Here the problem starts
    Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS = object1DS
        .join(object2DS, object1DS.col("column01")
            .equalTo(object2DS.col("column01")))
        .as(Encoders.tuple(encoderObject1, encoderObject2));

The last line cannot perform the join, and I get this error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct<"LIST WITH ALL VARS FROM TWO OBJECT"> to Tuple2, but failed as the number of fields does not line up.;
This is true, because Tuple2 (object2) does not have all the vars ...
Then I tried this:

    Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS = object1DS
        .joinWith(object2DS, object1DS
            .col("column01")
            .equalTo(object2DS.col("column01")));

And it works great! But I need a new Dataset without a tuple: I have an object3 that holds some vars from object1 and object2. Then I run into this problem:

    Encoder<MyOwnObject3> encoderObject3 = Encoders.bean(MyOwnObject3.class);

    Dataset<MyOwnObject3> object3DS = joinObjectDS.map(tupleObject1Object2 -> {
        MyOwnObject1 myOwnObject1 = tupleObject1Object2._1();
        MyOwnObject2 myOwnObject2 = tupleObject1Object2._2();
        MyOwnObject3 myOwnObject3 = new MyOwnObject3();
        //Sets all vars with start values
        //...
        //Sets data from object 1 and 2 to 3.
        //...
        return myOwnObject3;
    }, encoderObject3);

Error! ... here is the error:
17/05/10 12:17:43 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 593, Column 72: A method named "toString" is not declared in any enclosing class nor any supertype, nor through a static import
and more than a thousand lines of errors ...
What can I do? I tried:
- Creating my objects with only String, int (or Integer) and double (or Double) fields (nothing more)
- Using different encoders, like kryo or javaSerialization
- Using JavaRDD (works, but very slowly) and using DataFrames with Rows (works, but I would need to change many objects)
- Making all my Java objects serializable
- Using Spark 2.1.0 and 2.1.1; I now have 2.1.1 in my pom.xml
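
To make the setup concrete, here is a minimal sketch of beans of the kind described in this list (Serializable, JavaBean style, only String and double fields), with the "copy from object 1 and 2 into 3" step pulled out into a plain static method so it can be checked without Spark. The fields name and amount and the helper Object3Mapper are hypothetical and only for illustration; only column01 comes from the code above, and the real classes have more fields.

```java
import java.io.Serializable;

// Hypothetical stand-ins for the real classes. Only column01 appears in the
// code above; name and amount are invented for illustration. In a real
// project each bean would be a public class in its own file.
class MyOwnObject1 implements Serializable {
    private String column01;
    private String name;

    public MyOwnObject1() { }  // no-arg constructor, JavaBean style

    public String getColumn01() { return column01; }
    public void setColumn01(String column01) { this.column01 = column01; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

class MyOwnObject2 implements Serializable {
    private String column01;
    private double amount;

    public MyOwnObject2() { }

    public String getColumn01() { return column01; }
    public void setColumn01(String column01) { this.column01 = column01; }
    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }
}

// Target bean holding a subset of the fields of the two source beans.
class MyOwnObject3 implements Serializable {
    private String column01;
    private String name;
    private double amount;

    public MyOwnObject3() { }

    public String getColumn01() { return column01; }
    public void setColumn01(String column01) { this.column01 = column01; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }
}

// Hypothetical helper: the mapping step as plain Java, with no Spark types.
final class Object3Mapper {
    private Object3Mapper() { }

    // Copies the join key and name from the first object and the amount
    // from the second into a new MyOwnObject3.
    static MyOwnObject3 merge(MyOwnObject1 o1, MyOwnObject2 o2) {
        MyOwnObject3 o3 = new MyOwnObject3();
        o3.setColumn01(o1.getColumn01());
        o3.setName(o1.getName());
        o3.setAmount(o2.getAmount());
        return o3;
    }
}
```

With a helper like this, the body of the lambda passed to map would reduce to a single call such as Object3Mapper.merge(tupleObject1Object2._1(), tupleObject1Object2._2()), so the copy logic can be ruled out separately from the encoder.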
I want to use Datasets, to get the speed of DataFrames and the object syntax of JavaRDD ...
Help?
thanks