Combine two datasets that have different column names in Apache Spark

We need to combine two datasets that have different column names; there are no common columns between the datasets.

We tried a couple of approaches, but both fail. Please let us know how to combine two datasets using Apache Spark in Java.

Input Dataset 1

"405-048011-62815", "CRC Industries", "630-0746","Dixon value", "4444-444","3M INdustries", "555-55","Dixon coupling valve" 

Input Dataset 2

 "222-2222-5555", "Tata", "7777-88886","WestSide", "22222-22224","Reliance", "33333-3333","V industries" 

Expected

    +----------------+--------------+-------------+---------+
    |          label1|     sentence1|       label2|sentence2|
    +----------------+--------------+-------------+---------+
    |405-048011-62815|CRC Industries|222-2222-5555|     Tata|
    |        630-0746|   Dixon value|   7777-88886| WestSide|
    +----------------+--------------+-------------+---------+

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.concat;

    List<Row> data = Arrays.asList(
        RowFactory.create("405-048011-62815", "CRC Industries"),
        RowFactory.create("630-0746", "Dixon value"),
        RowFactory.create("4444-444", "3M INdustries"),
        RowFactory.create("555-55", "Dixon coupling valve"));
    StructType schema = new StructType(new StructField[] {
        new StructField("label1", DataTypes.StringType, false, Metadata.empty()),
        new StructField("sentence1", DataTypes.StringType, false, Metadata.empty())
    });
    Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

    List<String> listStrings = new ArrayList<String>();
    listStrings.add("405-048011-62815");
    listStrings.add("630-0746");
    Dataset<Row> matchFound1 = sentenceDataFrame.filter(col("label1").isin(listStrings.stream().toArray(String[]::new)));
    matchFound1.show();

    listStrings.clear();
    listStrings.add("222-2222-5555");
    listStrings.add("7777-88886");
    List<Row> data2 = Arrays.asList(
        RowFactory.create("222-2222-5555", "Tata"),
        RowFactory.create("7777-88886", "WestSide"),
        RowFactory.create("22222-22224", "Reliance"),
        RowFactory.create("33333-3333", "V industries"));
    StructType schema2 = new StructType(new StructField[] {
        new StructField("label2", DataTypes.StringType, false, Metadata.empty()),
        new StructField("sentence2", DataTypes.StringType, false, Metadata.empty())
    });
    Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);
    Dataset<Row> matchFound2 = sentenceDataFrame2.filter(col("label2").isin(listStrings.stream().toArray(String[]::new)));
    matchFound2.show();

    // Approach 1: select columns from both datasets
    Dataset<Row> matchFound3 = matchFound1.select(matchFound1.col("label1"), matchFound1.col("sentence1"),
        matchFound2.col("label2"), matchFound2.col("sentence2"));
    System.out.println("After concat");
    matchFound3.show();

    // Approach 2: concat the columns inside a filter
    Dataset<Row> matchFound4 = matchFound1.filter(concat(col("label1"), matchFound1.col("sentence1"),
        matchFound2.col("label2"), matchFound2.col("sentence2")));
    System.out.println("After concat 2");
    matchFound4.show();

The error for each approach is as follows

Error Approach 1

    org.apache.spark.sql.AnalysisException: resolved attribute(s) label2#10,sentence2#11 missing from label1#0,sentence1#1 in operator !Project [label1#0, sentence1#1, label2#10, sentence2#11];;
    !Project [label1#0, sentence1#1, label2#10, sentence2#11]
    +- Filter label1#0 IN (405-048011-62815,630-0746)
       +- LocalRelation [label1#0, sentence1#1]

Error Approach 2

    org.apache.spark.sql.AnalysisException: filter expression 'concat(`label1`, `sentence1`, `label2`, `sentence2`)' of type string is not a boolean.;;
    !Filter concat(label1#0, sentence1#1, label2#10, sentence2#11)
    +- Filter label1#0 IN (405-048011-62815,630-0746)
       +- LocalRelation [label1#0, sentence1#1]
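Both errors come from the same constraint: select and filter can only resolve columns that belong to the Dataset they are called on, so matchFound2's columns are "missing" from matchFound1's plan, and filter additionally needs a boolean expression, which concat (a string column) is not. A minimal sketch of one common workaround, assuming the matchFound1 and matchFound2 datasets above: add a generated row id to each side and join on it. Caveat: monotonically_increasing_id() guarantees increasing but not consecutive ids, so this positional pairing is only dependable when both sides have identical partitioning, as with this small local data.

    import static org.apache.spark.sql.functions.monotonically_increasing_id;

    // Give each dataset a synthetic row id to act as a join key.
    Dataset<Row> left = matchFound1.withColumn("rowId", monotonically_increasing_id());
    Dataset<Row> right = matchFound2.withColumn("rowId", monotonically_increasing_id());

    // Join the two sides on the generated id, then drop it from the result.
    Dataset<Row> combined = left.join(right, "rowId").drop("rowId");
    combined.show();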
2 answers

With the row index I created in Java, this works:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.ml.feature.StringIndexer;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    import static org.apache.spark.sql.functions.col;

    public class StringIndexer11 {
        public static void main(String[] args) {
            try {
                System.setProperty("hadoop.home.dir", "D:\\AI matching\\winutil");
                JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
                SQLContext sqlContext = new SQLContext(sc);
                SparkSession spark = SparkSession.builder().appName("JavaTokenizerExample").getOrCreate();

                List<Row> data = Arrays.asList(
                    RowFactory.create("405-048011-62815", "CRC Industries"),
                    RowFactory.create("630-0746", "Dixon value"),
                    RowFactory.create("4444-444", "3M INdustries"),
                    RowFactory.create("555-55", "Dixon coupling valve"));
                StructType schema = new StructType(new StructField[] {
                    new StructField("label1", DataTypes.StringType, false, Metadata.empty()),
                    new StructField("sentence1", DataTypes.StringType, false, Metadata.empty())
                });
                Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

                List<String> listStrings = new ArrayList<String>();
                listStrings.add("405-048011-62815");
                listStrings.add("630-0746");
                Dataset<Row> matchFound1 = sentenceDataFrame.filter(
                    col("label1").isin(listStrings.stream().toArray(String[]::new)));
                matchFound1.show();

                listStrings.clear();
                listStrings.add("222-2222-5555");
                listStrings.add("7777-88886");

                // Index the rows of the first filtered dataset
                StringIndexer indexer = new StringIndexer()
                    .setInputCol("label1")
                    .setOutputCol("label1Index");
                Dataset<Row> Dataset1 = indexer.fit(matchFound1).transform(matchFound1);
                //Dataset1.show();

                List<Row> data2 = Arrays.asList(
                    RowFactory.create("222-2222-5555", "Tata"),
                    RowFactory.create("7777-88886", "WestSide"),
                    RowFactory.create("22222-22224", "Reliance"),
                    RowFactory.create("33333-3333", "V industries"));
                StructType schema2 = new StructType(new StructField[] {
                    new StructField("label2", DataTypes.StringType, false, Metadata.empty()),
                    new StructField("sentence2", DataTypes.StringType, false, Metadata.empty())
                });
                Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);
                Dataset<Row> matchFound2 = sentenceDataFrame2.filter(
                    col("label2").isin(listStrings.stream().toArray(String[]::new)));
                matchFound2.show();

                // Index the rows of the second filtered dataset
                StringIndexer indexer1 = new StringIndexer()
                    .setInputCol("label2")
                    .setOutputCol("label2Index");
                Dataset<Row> Dataset2 = indexer1.fit(matchFound2).transform(matchFound2);
                //Dataset2.show();

                // Join on the generated indexes, then drop them from the result
                Dataset<Row> Finalresult = Dataset1
                    .join(Dataset2, Dataset1.col("label1Index").equalTo(Dataset2.col("label2Index")))
                    .drop(Dataset1.col("label1Index"))
                    .drop(Dataset2.col("label2Index"));
                Finalresult.show();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
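A caveat on the StringIndexer approach: the indices it assigns are ordered by label frequency, and with every label occurring exactly once there is no guarantee the indices follow the original row order. If a strict positional pairing is needed, a sketch along the following lines, zipping each dataset with its row position via zipWithIndex, is more predictable; the combinedSchema name and the four-column layout are assumptions matching the question's expected output.

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import scala.Tuple2;

    // Key each dataset by its original row position.
    JavaPairRDD<Long, Row> leftByPos = matchFound1.toJavaRDD()
        .zipWithIndex()
        .mapToPair(t -> new Tuple2<>(t._2, t._1));
    JavaPairRDD<Long, Row> rightByPos = matchFound2.toJavaRDD()
        .zipWithIndex()
        .mapToPair(t -> new Tuple2<>(t._2, t._1));

    // Join on position and flatten each pair of rows into one four-column row.
    JavaRDD<Row> zipped = leftByPos.join(rightByPos)
        .values()
        .map(p -> RowFactory.create(
            p._1.getString(0), p._1.getString(1),
            p._2.getString(0), p._2.getString(1)));

    // combinedSchema (assumed): label1, sentence1, label2, sentence2, all strings.
    StructType combinedSchema = new StructType(new StructField[] {
        new StructField("label1", DataTypes.StringType, false, Metadata.empty()),
        new StructField("sentence1", DataTypes.StringType, false, Metadata.empty()),
        new StructField("label2", DataTypes.StringType, false, Metadata.empty()),
        new StructField("sentence2", DataTypes.StringType, false, Metadata.empty())
    });
    spark.createDataFrame(zipped, combinedSchema).show();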

Hope this works for you.

DataFrames

    val pre: Array[String] = Array("CRC Industries", "Dixon value", "3M INdustries", "Dixon coupling valve")
    val rea: Array[String] = Array("405048011-62815", "630-0746", "4444-444", "555-55")
    val df1 = sc.parallelize(rea zip pre).toDF("label1", "sentence1")

    val preasons2: Array[String] = Array("Tata", "WestSide", "Reliance", "V industries")
    val reasonsI2: Array[String] = Array("222-2222-5555", "7777-88886", "22222-22224", "33333-3333")
    val df2 = sc.parallelize(reasonsI2 zip preasons2).toDF("label2", "sentence2")

Row Index

    import org.apache.spark.ml.feature.StringIndexer

    val indexer = new StringIndexer()
      .setInputCol("label1")
      .setOutputCol("label1Index")
    val indexed = indexer.fit(df1).transform(df1)
    indexed.show()

    val indexer1 = new StringIndexer()
      .setInputCol("label2")
      .setOutputCol("label2Index")
    val indexed1 = indexer1.fit(df2).transform(df2)
    indexed1.show()

Join

    val rnd_reslt12 = indexed.join(indexed1, indexed.col("label1Index") === indexed1.col("label2Index"))
      .drop(indexed.col("label1Index"))
      .drop(indexed1.col("label2Index"))
    rnd_reslt12.show()

    +---------------+--------------------+-------------+------------+
    |         label1|           sentence1|       label2|   sentence2|
    +---------------+--------------------+-------------+------------+
    |       630-0746|         Dixon value|222-2222-5555|        Tata|
    |       4444-444|       3M INdustries|  22222-22224|    Reliance|
    |         555-55|Dixon coupling valve|   33333-3333|V industries|
    |405048011-62815|      CRC Industries|   7777-88886|    WestSide|
    +---------------+--------------------+-------------+------------+
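Note that since every label here occurs exactly once, StringIndexer's frequency-based ordering makes no guarantee about which index each label receives, which is why the joined rows above do not follow the original insertion order (405048011-62815 is paired with 7777-88886 rather than 222-2222-5555). If the pairing must be positional, the zipWithIndex sketch after the first answer is the safer route.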