I am very new to Apache Spark. I would like to focus on the basics of the Spark API and learn to understand and write some programs with it. I wrote a Java program using Apache Spark to implement joins.
When I use a left outer join (`leftOuterJoin()`) or a right outer join (`rightOuterJoin()`), both methods return a JavaPairRDD whose values contain Google's (Guava) `Optional` type. But I do not know how to extract the original values from the `Optional`.
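To make the question concrete, here is a minimal sketch of the kind of unwrapping I am after. It is not Spark code: it uses `java.util.Optional` as a stand-in so that it runs without any dependencies, and the `Joined` class, the `unwrap` and `flatten` helpers, and the `NO_TRANSACTION` fallback are all hypothetical names. As far as I can tell, Guava's `Optional` offers the same `isPresent()` and `get()` methods, so the same check should carry over to the join output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class OptionalUnwrapSketch {

    // Hypothetical stand-in for one left-outer-join value: the left side is
    // always present, the right side may be missing.
    public static final class Joined {
        public final String customerName;
        public final Optional<String> transaction;

        public Joined(String customerName, Optional<String> transaction) {
            this.customerName = customerName;
            this.transaction = transaction;
        }
    }

    // Returns the wrapped value, or the given fallback when the Optional is empty.
    public static String unwrap(Optional<String> value, String fallback) {
        return value.isPresent() ? value.get() : fallback;
    }

    // Flattens each joined record into "name,transaction" with no Optional wrapper.
    public static List<String> flatten(List<Joined> rows) {
        List<String> out = new ArrayList<>();
        for (Joined row : rows) {
            out.add(row.customerName + "," + unwrap(row.transaction, "NO_TRANSACTION"));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Joined> rows = new ArrayList<>();
        rows.add(new Joined("Kristina", Optional.of("092.88,12-30-2011")));
        // Assumption for illustration only: a customer without any transaction.
        rows.add(new Joined("Paige", Optional.<String>empty()));
        for (String line : flatten(rows)) {
            System.out.println(line);
        }
    }
}
```

In the Spark program, I assume the equivalent of `unwrap` would be applied to each joined value, for example inside a function passed to `mapValues` on the JavaPairRDD.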
In any case, I would also like to know whether the same join methods can return data in my own format. I have not found a way to do this. My concern is that with Apache Spark I can't shape the code in my own style, because everything seems to be predefined.
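By "my own format" I mean something like the following sketch, which converts one joined entry into a plain result object. Everything here is an assumption for illustration: the `CustomerTransaction` class, its fields, and the `toRecord` helper are hypothetical, and `java.util.Optional` again stands in for the `Optional` type that the joins return, so the snippet runs on its own.

```java
import java.util.Optional;

public class CustomJoinFormatSketch {

    // Hypothetical result type; the field names are illustrative only.
    public static final class CustomerTransaction {
        public final String customerId;
        public final String name;
        public final String amount; // null when the customer has no transaction
        public final String date;   // null when the customer has no transaction

        public CustomerTransaction(String customerId, String name, String amount, String date) {
            this.customerId = customerId;
            this.name = name;
            this.amount = amount;
            this.date = date;
        }

        @Override
        public String toString() {
            return customerId + "|" + name + "|" + amount + "|" + date;
        }
    }

    // Converts one joined entry (key, left value, optional "amount,date" right value)
    // into the custom type, leaving amount and date null when the right side is missing.
    public static CustomerTransaction toRecord(String customerId, String name,
                                               Optional<String> amountAndDate) {
        if (!amountAndDate.isPresent()) {
            return new CustomerTransaction(customerId, name, null, null);
        }
        String[] parts = amountAndDate.get().split(",");
        return new CustomerTransaction(customerId, name, parts[0], parts[1]);
    }

    public static void main(String[] args) {
        // prints 4000001|Kristina|092.88|12-30-2011
        System.out.println(toRecord("4000001", "Kristina", Optional.of("092.88,12-30-2011")));
    }
}
```

My guess is that a conversion like `toRecord` could be applied to the join result with `map` or `mapValues`, but I am not sure whether that is the intended way.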
Please find the code below.
My two sample input datasets:

customers_data.txt:

    4000001,Kristina,Chung,55,Pilot
    4000002,Paige,Chen,74,Teacher
    4000003,Sherri,Melton,34,Firefighter

transactions_data.txt:

    00000551,12-30-2011,4000001,092.88,Games,Dice & Dice Sets,Buffalo,New York,credit
    00004811,11-10-2011,4000001,180.35,Outdoor Play Equipment,Water Tables,Brownsville,Texas,credit
    00034388,09-11-2011,4000002,020.55,Team Sports,Beach Volleyball,Orange,California,cash
    00008996,11-21-2011,4000003,121.04,Outdoor Recreation,Fishing,Colorado Springs,Colorado,credit
    00009167,05-24-2011,4000003,194.94,Exercise & Fitness,Foam Rollers,El Paso,Texas,credit
Here is my Java code:
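**SparkJoins.java:**

    import java.io.FileNotFoundException;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;

    import scala.Tuple2;

    public class SparkJoins {

        @SuppressWarnings("serial")
        public static void main(String[] args) throws FileNotFoundException {
            JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setAppName("Spark Count").setMaster("local"));

            // Customers: key = customer id, value = first name
            JavaRDD<String> customerInputFile = sc.textFile("C:/path/customers_data.txt");
            JavaPairRDD<String, String> customerPairs = customerInputFile.mapToPair(
                    new PairFunction<String, String, String>() {
                        public Tuple2<String, String> call(String s) {
                            String[] customerSplit = s.split(",");
                            return new Tuple2<String, String>(customerSplit[0], customerSplit[1]);
                        }
                    }).distinct();

            // Transactions: key = customer id, value = "amount,date"
            JavaRDD<String> transactionInputFile = sc.textFile("C:/path/transactions_data.txt");
            JavaPairRDD<String, String> transactionPairs = transactionInputFile.mapToPair(
                    new PairFunction<String, String, String>() {
                        public Tuple2<String, String> call(String s) {
                            String[] transactionSplit = s.split(",");
                            return new Tuple2<String, String>(transactionSplit[2],
                                    transactionSplit[3] + "," + transactionSplit[1]);
                        }
                    });

            // The join(), leftOuterJoin() and rightOuterJoin() calls on
            // customerPairs and transactionPairs follow here (not shown).
        }
    }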
And here is the result that I get:

Joins function Output:

    [(4000001,(Kristina,092.88,12-30-2011)), (4000001,(Kristina,180.35,11-10-2011)), (4000003,(Sherri,121.04,11-21-2011)), (4000003,(Sherri,194.94,05-24-2011)), (4000002,(Paige,020.55,09-11-2011))]

LeftOuterJoins function Output:

    [(4000001,[(Kristina,Optional.of(092.88,12-30-2011)), (Kristina,Optional.of(180.35,11-10-2011))]), (4000002,[(Paige,Optional.of(020.55,09-11-2011))]), (4000003,[(Sherri,Optional.of(121.04,11-21-2011)), (Sherri,Optional.of(194.94,05-24-2011))])]

RightOuterJoins function Output:

    [(4000001,[(Optional.of(Kristina),092.88,12-30-2011), (Optional.of(Kristina),180.35,11-10-2011)]), (4000002,[(Optional.of(Paige),020.55,09-11-2011)]), (4000003,[(Optional.of(Sherri),121.04,11-21-2011), (Optional.of(Sherri),194.94,05-24-2011)])]
I am running this program on Windows.
Please look at the output above and help me extract the values from the `Optional` type.
Thanks in advance