We need to combine two datasets that have different column names; there are no common columns between them. We tried two approaches and both fail. How can we combine the two datasets using Apache Spark in Java?
Input Dataset 1
"405-048011-62815", "CRC Industries", "630-0746","Dixon value", "4444-444","3M INdustries", "555-55","Dixon coupling valve"
Input Dataset 2
"222-2222-5555", "Tata", "7777-88886","WestSide", "22222-22224","Reliance", "33333-3333","V industries"
Expected output

| label1           | sentence1      | label2        | sentence2 |
|------------------|----------------|---------------|-----------|
| 405-048011-62815 | CRC Industries | 222-2222-5555 | Tata      |
| 630-0746         | Dixon value    | 7777-88886    | WestSide  |
```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.concat;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
        RowFactory.create("405-048011-62815", "CRC Industries"),
        RowFactory.create("630-0746", "Dixon value"),
        RowFactory.create("4444-444", "3M INdustries"),
        RowFactory.create("555-55", "Dixon coupling valve"));
StructType schema = new StructType(new StructField[] {
        new StructField("label1", DataTypes.StringType, false, Metadata.empty()),
        new StructField("sentence1", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

List<String> listStrings = new ArrayList<>();
listStrings.add("405-048011-62815");
listStrings.add("630-0746");
Dataset<Row> matchFound1 = sentenceDataFrame
        .filter(col("label1").isin(listStrings.stream().toArray(String[]::new)));
matchFound1.show();

listStrings.clear();
listStrings.add("222-2222-5555");
listStrings.add("7777-88886");

List<Row> data2 = Arrays.asList(
        RowFactory.create("222-2222-5555", "Tata"),
        RowFactory.create("7777-88886", "WestSide"),
        RowFactory.create("22222-22224", "Reliance"),
        RowFactory.create("33333-3333", "V industries"));
StructType schema2 = new StructType(new StructField[] {
        new StructField("label2", DataTypes.StringType, false, Metadata.empty()),
        new StructField("sentence2", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);
Dataset<Row> matchFound2 = sentenceDataFrame2
        .filter(col("label2").isin(listStrings.stream().toArray(String[]::new)));
matchFound2.show();

// Approach 1: select columns from both DataFrames in one select
Dataset<Row> matchFound3 = matchFound1.select(
        matchFound1.col("label1"), matchFound1.col("sentence1"),
        matchFound2.col("label2"), matchFound2.col("sentence2"));
System.out.println("After concat");
matchFound3.show();

// Approach 2: concat columns from both DataFrames inside a filter
Dataset<Row> matchFound4 = matchFound1.filter(concat(
        col("label1"), matchFound1.col("sentence1"),
        matchFound2.col("label2"), matchFound2.col("sentence2")));
System.out.println("After concat 2");
matchFound4.show();
```
The error for each approach is as follows
Error for Approach 1

```
org.apache.spark.sql.AnalysisException: resolved attribute(s) label2#10,sentence2#11 missing from label1#0,sentence1#1 in operator !Project [label1#0, sentence1#1, label2#10, sentence2#11];;
!Project [label1#0, sentence1#1, label2#10, sentence2#11]
+- Filter label1#0 IN (405-048011-62815,630-0746)
   +- LocalRelation [label1#0, sentence1#1]
```

Error for Approach 2

```
org.apache.spark.sql.AnalysisException: filter expression 'concat(`label1`, `sentence1`, `label2`, `sentence2`)' of type string is not a boolean.;;
!Filter concat(label1#0, sentence1#1, label2#10, sentence2#11)
+- Filter label1#0 IN (405-048011-62815,630-0746)
   +- LocalRelation [label1#0, sentence1#1]
```
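For reference, one technique we are considering but have not yet verified: since the two DataFrames share no columns, pair rows purely by position by indexing both sides with `JavaRDD.zipWithIndex()` and joining on that index. This is only a sketch; the class and method names (`ZipByPosition`, `zipByPosition`) are ours, and it assumes a Spark 2.x Java API with a `SparkSession` available.

```java
import java.util.Arrays;
import java.util.stream.Stream;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public class ZipByPosition {

    // Hypothetical helper: pairs the i-th row of `left` with the i-th row of
    // `right`. Rows beyond the length of the shorter side are dropped,
    // because the join on the index is an inner join.
    public static Dataset<Row> zipByPosition(SparkSession spark,
                                             Dataset<Row> left,
                                             Dataset<Row> right) {
        // zipWithIndex assigns each row its 0-based position; swap puts the
        // index in the key slot so the two RDDs can be joined on it.
        JavaRDD<Row> zipped = left.javaRDD().zipWithIndex().mapToPair(Tuple2::swap)
                .join(right.javaRDD().zipWithIndex().mapToPair(Tuple2::swap))
                .sortByKey() // restore the original row order after the shuffle
                .values()
                .map(pair -> {
                    // Concatenate the cells of the left and right rows.
                    Object[] cells = new Object[pair._1.size() + pair._2.size()];
                    for (int i = 0; i < pair._1.size(); i++) {
                        cells[i] = pair._1.get(i);
                    }
                    for (int i = 0; i < pair._2.size(); i++) {
                        cells[pair._1.size() + i] = pair._2.get(i);
                    }
                    return RowFactory.create(cells);
                });

        // The combined schema is simply the left fields followed by the right
        // fields (column names must not collide, which holds here).
        StructField[] fields = Stream.concat(
                        Arrays.stream(left.schema().fields()),
                        Arrays.stream(right.schema().fields()))
                .toArray(StructField[]::new);
        return spark.createDataFrame(zipped, new StructType(fields));
    }
}
```

With `matchFound1` and `matchFound2` from above, `ZipByPosition.zipByPosition(spark, matchFound1, matchFound2)` should yield the four-column `label1 | sentence1 | label2 | sentence2` layout we expect.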