Running Parallel Work in Spark

I used the input in the following format:

0 1 2 3 4 5 … 14
Input Location: hdfs://localhost:9000/Input/datasource

I used the following code snippet to save the RDD as text files using multiple threads:

    package org.apache.spark.examples;

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.avro.ipc.specific.Person;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    import scala.Tuple2;

    class RunnableDemo implements Runnable {
        private Thread t;
        private String threadName;
        private String path;
        private JavaRDD<String> javaRDD;
        // private JavaSparkContext javaSparkContext;

        RunnableDemo(String threadName, JavaRDD<String> javaRDD, String path) {
            this.threadName = threadName;
            this.javaRDD = javaRDD;
            this.path = path;
            // this.javaSparkContext = javaSparkContext;
        }

        @Override
        public void run() {
            System.out.println("Running " + threadName);
            try {
                this.javaRDD.saveAsTextFile(path);
                // System.out.println(this.javaRDD.count());
                Thread.sleep(50);
            } catch (InterruptedException e) {
                System.out.println("Thread " + threadName + " interrupted.");
            }
            System.out.println("Thread " + threadName + " exiting.");
            // this.javaSparkContext.stop();
        }

        public void start() {
            System.out.println("Starting " + threadName);
            if (t == null) {
                t = new Thread(this, threadName);
                t.start();
            }
        }
    }

    public class SparkJavaTest {

        public static void main(String[] args) {
            // Spark Configurations:
            SparkConf sparkConf = new SparkConf().setAppName("SparkJavaTest");
            JavaSparkContext ctx = new JavaSparkContext(sparkConf);
            SQLContext sqlContext = new SQLContext(ctx);

            JavaRDD<String> dataCollection = ctx.textFile("hdfs://yarncluster/Input/datasource");

            List<StructField> fields = new ArrayList<StructField>();
            fields.add(DataTypes.createStructField("Id", DataTypes.IntegerType, true));

            JavaRDD<Row> rowRDD = dataCollection.map(
                new Function<String, Row>() {
                    @Override
                    public Row call(String record) throws Exception {
                        String[] fields = record.split("\u0001");
                        return RowFactory.create(Integer.parseInt(fields[0].trim()));
                    }
                });

            StructType schema = DataTypes.createStructType(fields);
            DataFrame dataFrame = sqlContext.createDataFrame(rowRDD, schema);
            dataFrame.registerTempTable("data");

            long recordsCount = dataFrame.count();
            long splitRecordsCount = 5;
            long splitCount = recordsCount / splitRecordsCount;

            List<JavaRDD<Row>> list1 = new ArrayList<JavaRDD<Row>>();
            for (int i = 0; i < splitCount; i++) {
                long start = i * splitRecordsCount;
                long end = (i + 1) * splitRecordsCount;
                DataFrame temp = sqlContext.sql(
                    "SELECT * FROM data WHERE Id >=" + start + " AND Id < " + end);
                list1.add(temp.toJavaRDD());
            }

            long length = list1.size();
            int split = 0;
            for (int i = 0; i < length; i++) {
                JavaRDD rdd1 = list1.get(i);
                JavaPairRDD rdd3 = rdd1.cartesian(rdd1);
                JavaPairRDD<Row, Row> rdd4 = rdd3.filter(
                    new Function<Tuple2<Row, Row>, Boolean>() {
                        public Boolean call(Tuple2<Row, Row> s) {
                            Row line1 = s._1;
                            Row line2 = s._2;
                            long app1 = Integer.parseInt(line1.get(0).toString());
                            long app2 = Integer.parseInt(line2.get(0).toString());
                            if (app1 < app2) {
                                return true;
                            }
                            return false;
                        }
                    });

                JavaRDD<String> test = rdd4.map(new Function<Tuple2<Row, Row>, String>() {
                    @Override
                    public String call(Tuple2<Row, Row> s) throws Exception {
                        Row data1 = s._1;
                        Row data2 = s._2;
                        int x = Integer.parseInt(data1.get(0).toString());
                        int y = Integer.parseInt(data2.get(0).toString());
                        String result = x + "," + y + "," + (x + y);
                        return result;
                    }
                });

                RunnableDemo R = new RunnableDemo(
                    "Thread-" + split, test, "hdfs://yarncluster/GettingStarted/Output/" + split);
                R.start();
                split++;
                R.start();

                int index = i;
                while (index < length) {
                    JavaRDD rdd2 = list1.get(index);
                    rdd3 = rdd1.cartesian(rdd2);
                    rdd4 = rdd3.filter(
                        new Function<Tuple2<Row, Row>, Boolean>() {
                            public Boolean call(Tuple2<Row, Row> s) {
                                Row line1 = s._1;
                                Row line2 = s._2;
                                long app1 = Integer.parseInt(line1.get(0).toString());
                                long app2 = Integer.parseInt(line2.get(0).toString());
                                if (app1 < app2) {
                                    return true;
                                }
                                return false;
                            }
                        });

                    test = rdd4.map(new Function<Tuple2<Row, Row>, String>() {
                        @Override
                        public String call(Tuple2<Row, Row> s) throws Exception {
                            Row data1 = s._1;
                            Row data2 = s._2;
                            int x = Integer.parseInt(data1.get(0).toString());
                            int y = Integer.parseInt(data2.get(0).toString());
                            String result = x + "," + y + "," + (x + y);
                            return result;
                        }
                    });

                    R = new RunnableDemo(
                        "Thread-" + split, test, "hdfs://yarncluster/GettingStarted/Output/" + split);
                    R.start();
                    split++;
                    index++;
                }
            }
        }
    }

In this case, I encountered the following exception:

[Image: screenshot of the exception (not preserved)]

I tried the solution given in the following linked question:

How to run concurrent jobs (actions) in Apache Spark using a single Spark context

But I still could not solve the problem.

Could you advise me on how to resolve this?

+5
2 answers

First of all, you are trying to do all the work on the driver node using multiple threads. This is not in the spirit of Spark, since each unit of work in your case is independent of the others and can be executed on a different machine. Yours is a toy example, but this will matter a great deal with large amounts of data.

A better approach would be to use something like mapPartitions to send a range of keys to each worker, let the workers run the corresponding SQL, and then save the results with one thread per worker. This will make the code simpler and easier to reason about (once you get used to working with RDDs). You will obviously need to set the level of parallelism and the number of partitions for your input accordingly.
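The notion of independent, range-keyed units of work can be sketched in plain Java without any Spark dependency. This is only an illustration under stated assumptions: the class and method names (RangeWorkUnits, workUnits, pairsForRanges) are hypothetical, and the record count of 15 and block size of 5 are taken from the question's input. In Spark, each range descriptor would be handed to a worker instead of being looped over on the driver; that wiring is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java illustration; not Spark code.
class RangeWorkUnits {

    // One independent unit of work: every pair (x, y) with x < y,
    // where x is in [startA, endA) and y is in [startB, endB).
    // Output lines use the same "x,y,x+y" format as the question.
    static List<String> pairsForRanges(long startA, long endA, long startB, long endB) {
        List<String> out = new ArrayList<>();
        for (long x = startA; x < endA; x++) {
            for (long y = startB; y < endB; y++) {
                if (x < y) {
                    out.add(x + "," + y + "," + (x + y));
                }
            }
        }
        return out;
    }

    // Describe each unit as {startA, endA, startB, endB}:
    // block i is paired with every block j >= i.
    static List<long[]> workUnits(long recordsCount, long splitRecordsCount) {
        long splitCount = recordsCount / splitRecordsCount;
        List<long[]> units = new ArrayList<>();
        for (long i = 0; i < splitCount; i++) {
            for (long j = i; j < splitCount; j++) {
                units.add(new long[] {
                    i * splitRecordsCount, (i + 1) * splitRecordsCount,
                    j * splitRecordsCount, (j + 1) * splitRecordsCount
                });
            }
        }
        return units;
    }

    public static void main(String[] args) {
        List<long[]> units = workUnits(15, 5);
        int total = 0;
        for (long[] u : units) {
            // Each call depends only on its own range descriptor,
            // so the units could run on different machines.
            total += pairsForRanges(u[0], u[1], u[2], u[3]).size();
        }
        System.out.println(units.size() + " units, " + total + " pairs");
    }
}
```

Because each unit needs nothing but its four range bounds, the list of descriptors is the only thing that has to be distributed; no thread management on the driver is involved.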

The immediate problem with your code is that the main thread starts other threads but does not wait for them to finish. Usually this causes the spawned threads to terminate along with the parent (see the javadoc). Notice how, in the answer to the linked question, the main function calls get() on the returned futures before returning.
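The wait-before-returning pattern can be sketched with a standard ExecutorService. This is a minimal sketch, not the code from the linked answer: the class name WaitForJobs and the method runJob are hypothetical, and runJob is only a stand-in for an action such as javaRDD.saveAsTextFile(path), so that the example runs without Spark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch; runJob stands in for a Spark action.
class WaitForJobs {

    // Stand-in for something like rdd.saveAsTextFile("…/Output/" + split).
    static String runJob(int split) throws InterruptedException {
        Thread.sleep(50);
        return "Output/" + split;
    }

    // Submit all jobs, then block until every one of them has finished.
    static List<String> runAll(int jobCount, int poolSize)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < jobCount; i++) {
                final int split = i;
                Callable<String> task = () -> runJob(split);
                futures.add(pool.submit(task));
            }
            List<String> finished = new ArrayList<>();
            for (Future<String> f : futures) {
                // get() blocks until the task completes and rethrows
                // any failure wrapped in an ExecutionException.
                finished.add(f.get());
            }
            return finished;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> finished = runAll(3, 2);
        // Only at this point have all jobs completed, so this is where
        // main could safely return or stop a shared context.
        System.out.println(finished);
    }
}
```

Calling get() also surfaces exceptions thrown inside a job, which the RunnableDemo class in the question would otherwise only see on the worker thread.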

+4

Could you try setting spark.driver.allowMultipleContexts?

    SparkConf sparkConf = new SparkConf()
        .setAppName("SparkJavaTest")
        .set("spark.driver.allowMultipleContexts", "true");

I am also new to Spark programming, but this helped me when running several Spark jobs in the scheduler. In my opinion, you do not need to think about threads, MapReduce logic, or the execution plan, since Spark handles that for you.

0
