Spark 2.0: Moving from RDDs to Datasets

I want to adapt a Java Spark application, which currently uses RDDs for some calculations, to use Datasets instead. I am new to Datasets and don't know how to map each RDD transformation to the corresponding Dataset operation.

So far I have worked out the following correspondences:

    JavaSparkContext.textFile(...)                       -> SQLContext.read().textFile(...)
    JavaRDD.filter(Function)                             -> Dataset.filter(FilterFunction)
    JavaRDD.map(Function)                                -> Dataset.map(MapFunction)
    JavaRDD.mapToPair(PairFunction)                      -> Dataset.groupByKey(MapFunction) ???
    JavaPairRDD.aggregateByKey(U, Function2, Function2)  -> KeyValueGroupedDataset.???

My related questions are:

  • Which Dataset method is equivalent to JavaRDD.mapToPair?
  • Does JavaPairRDD correspond to KeyValueGroupedDataset?
  • Which method is equivalent to JavaPairRDD.aggregateByKey?

Specifically, I want to port the following RDD code to Datasets:

    JavaRDD<Article> goodRdd = ...

    // Build PairRDD<<Date|Store|Transaction><Article>>
    JavaPairRDD<String, Article> ArticlePairRdd = goodRdd.mapToPair(new PairFunction<Article, String, Article>() {
        public Tuple2<String, Article> call(Article article) throws Exception {
            String key = article.getKeyDate() + "|" + article.getKeyStore() + "|"
                    + article.getKeyTransaction() + "|" + article.getCounter();
            return new Tuple2<String, Article>(key, article);
        }
    });

    // Aggregate distributed data -> PairRDD<String, String>
    JavaPairRDD<String, String> transactionRdd = ArticlePairRdd.aggregateByKey("",
        new Function2<String, Article, String>() {
            public String call(String oldString, Article newArticle) throws Exception {
                String articleString = newArticle.getOwg() + "_" + newArticle.getTextOwg();
                // <<Date|Store|Transaction><owg_textOwg###owg_textOwg>>
                return oldString + "###" + articleString;
            }
        },
        new Function2<String, String, String>() {
            public String call(String a, String b) throws Exception {
                String c = a.concat(b);
                ...
                return c;
            }
        }
    );

My Dataset code so far looks like this:

    Dataset<Article> goodDS = ...

    KeyValueGroupedDataset<String, Article> ArticlePairDS = goodDS.groupByKey(new MapFunction<Article, String>() {
        public String call(Article article) throws Exception {
            String key = article.getKeyDate() + "|" + article.getKeyStore() + "|"
                    + article.getKeyTransaction() + "|" + article.getCounter();
            return key;
        }
    }, Encoders.STRING());

    // here I need something similar to aggregateByKey! Not reduceByKey as I need
    // to return another data type (String) than I have before (Article)
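To make the requirement concrete, the contract that any replacement for aggregateByKey has to satisfy can be sketched in plain Java, without any Spark classes: a zero value, a function that folds one element into the running result within a partition, and a function that merges partial results across partitions. The class `AggregateByKeySketch`, the simplified `Article` stand-in with its three fields, and the sample data are all hypothetical and exist only for illustration. Spark's `org.apache.spark.sql.expressions.Aggregator` exposes the same pieces as `zero()`, `reduce(...)` and `merge(...)` (plus `finish(...)` and two encoders), and its `toColumn()` result can be passed to `KeyValueGroupedDataset.agg(...)`, so that may be one candidate for the missing step. The sketch only models the semantics and is not the Spark code itself.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java model of the aggregateByKey contract; no Spark classes are used.
final class AggregateByKeySketch {

    // Hypothetical, simplified stand-in for the Article bean in the question.
    static final class Article {
        final String key;      // stands in for date|store|transaction|counter
        final String owg;
        final String textOwg;

        Article(String key, String owg, String textOwg) {
            this.key = key;
            this.owg = owg;
            this.textOwg = textOwg;
        }
    }

    // Folds each partition on its own with seqOp, starting from zero for every
    // key, then merges the per-partition results of equal keys with combOp.
    static <T, K, U> Map<K, U> aggregateByKey(
            List<List<T>> partitions,
            Function<T, K> keyOf,
            U zero,
            BiFunction<U, T, U> seqOp,
            BinaryOperator<U> combOp) {
        Map<K, U> merged = new LinkedHashMap<>();
        for (List<T> partition : partitions) {
            Map<K, U> partial = new LinkedHashMap<>();
            for (T item : partition) {
                K key = keyOf.apply(item);
                U soFar = partial.containsKey(key) ? partial.get(key) : zero;
                partial.put(key, seqOp.apply(soFar, item));
            }
            for (Map.Entry<K, U> entry : partial.entrySet()) {
                merged.merge(entry.getKey(), entry.getValue(), combOp);
            }
        }
        return merged;
    }

    // Runs the same zero/seqOp/combOp as the RDD code on two made-up partitions.
    static Map<String, String> demo() {
        List<List<Article>> partitions = Arrays.asList(
                Arrays.asList(new Article("t1", "1", "milk"),
                              new Article("t2", "2", "bread")),
                Arrays.asList(new Article("t1", "3", "eggs")));
        return aggregateByKey(
                partitions,
                article -> article.key,
                "",
                (oldString, article) ->
                        oldString + "###" + article.owg + "_" + article.textOwg,
                String::concat);
    }

    public static void main(String[] args) {
        // prints {t1=###1_milk###3_eggs, t2=###2_bread}
        System.out.println(demo());
    }
}
```

The point of the sketch is that the result type (String) is independent of the element type (Article), which is exactly what reduceByKey-style operations cannot express and what a typed aggregator with a separate buffer type can.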