I build an uber jar Spark application and deploy it to an EMR 4.3 cluster. I provision 4 r3.xlarge instances, one of which is the master node and the other three are core nodes.
I have Hadoop 2.7.1, Ganglia 3.7.2, Spark 1.6, and Hive 1.0.0 installed via the console.
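(For context, the cluster is roughly what this AWS CLI call would create; I actually set it up through the console, and the cluster name here is just a placeholder:)
aws emr create-cluster \
  --name "spark-app-cluster" \
  --release-label emr-4.3.0 \
  --applications Name=Hadoop Name=Ganglia Name=Spark Name=Hive \
  --instance-type r3.xlarge \
  --instance-count 4 \
  --use-default-roles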
I run the following command:
spark-submit \
  --deploy-mode cluster \
  --executor-memory 4g \
  --executor-cores 2 \
  --num-executors 4 \
  --driver-memory 4g \
  --driver-cores 2 \
  --conf "spark.driver.maxResultSize=2g" \
  --conf "spark.hadoop.spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.DirectParquetOutputCommitter" \
  --conf "spark.shuffle.memoryFraction=0.2" \
  --class com.jackar.spark.Main spark-app.jar args
I realize I'm not fully utilizing the cluster, but at this point I'm not really trying to tune it (or should I be?). My main job does something like the following (a short code sketch follows the list):
1) Read two datasets of parquet files from S3, call registerTempTable on the resulting DataFrames, then call cacheTable on each. Each is about 300 MB in memory. (Note: I've tried this with both EMR's s3:// protocol and s3a://.)
2) Use Spark SQL to run the aggregations (i.e. sums and group bys).
3) Write the results to S3 as parquet files.
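For reference, here is a stripped-down sketch of what the main class does (the table names, columns, and bucket paths below are placeholders, not the real ones):
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object Main {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("spark-app"))
    val sqlContext = new SQLContext(sc)

    // 1) Read the two parquet datasets from S3 (s3:// via EMRFS, or s3a://),
    //    register them as temp tables, and cache them (~300 MB each).
    sqlContext.read.parquet("s3://my-bucket/dataset_a/").registerTempTable("a")
    sqlContext.read.parquet("s3://my-bucket/dataset_b/").registerTempTable("b")
    sqlContext.cacheTable("a")
    sqlContext.cacheTable("b")

    // 2) Run the aggregations (sums and group bys) with Spark SQL.
    val aggA = sqlContext.sql("SELECT key, SUM(value) AS total FROM a GROUP BY key")
    val aggB = sqlContext.sql("SELECT key, SUM(value) AS total FROM b GROUP BY key")

    // 3) Write each result back to S3 as parquet.
    aggA.write.parquet("s3://my-bucket/output/agg_a/")
    aggB.write.parquet("s3://my-bucket/output/agg_b/")
  }
}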
The job runs fine as far as the Spark UI shows, and it takes about as long as I expect. The problem is that after the write-aggregates-to-parquet-in-S3 job (Jobs tab) completes, there is a stretch of time where no other jobs are queued.
If I go to the SQL tab in the Spark UI, I see a "running query" for the same job the Jobs tab just reported as complete. When I click through to the DAG for that query, I notice the DAG has already been evaluated.

However, this query hangs around for several minutes, and sometimes leads to the entire Spark application restarting and, ultimately, failing...

I started digging to see if I could understand the problem, because in my Databricks trial this same job is incredibly fast and the DAG is identical to the EMR one (as expected). But I can't bring myself to justify using Databricks when I have no idea why I don't see similar performance on EMR.
Maybe it's my JVM options? Garbage collection, for example? Time to check the executor logs.
2016-02-23T18:25:48.598+0000: [GC2016-02-23T18:25:48.598+0000: [ParNew: 299156K->30449K(306688K), 0.0255600 secs] 1586767K->1329022K(4160256K), 0.0256610 secs] [Times: user=0.05 sys=0.00, real=0.03 secs]
2016-02-23T18:25:50.422+0000: [GC2016-02-23T18:25:50.422+0000: [ParNew: 303089K->32739K(306688K), 0.0263780 secs] 1601662K->1342494K(4160256K), 0.0264830 secs] [Times: user=0.07 sys=0.01, real=0.02 secs]
2016-02-23T18:25:52.223+0000: [GC2016-02-23T18:25:52.223+0000: [ParNew: 305379K->29373K(306688K), 0.0297360 secs] 1615134K->1348874K(4160256K), 0.0298410 secs] [Times: user=0.08 sys=0.00, real=0.03 secs]
2016-02-23T18:25:54.247+0000: [GC2016-02-23T18:25:54.247+0000: [ParNew: 302013K->28521K(306688K), 0.0220650 secs] 1621514K->1358123K(4160256K), 0.0221690 secs] [Times: user=0.06 sys=0.01, real=0.02 secs]
2016-02-23T18:25:57.994+0000: [GC2016-02-23T18:25:57.994+0000: [ParNew: 301161K->23609K(306688K), 0.0278800 secs] 1630763K->1364319K(4160256K), 0.0279460 secs] [Times: user=0.07 sys=0.01, real=0.03 secs]
Okay. That does not look good. ParNew is a stop-the-world collection, and it's happening every couple of seconds.
The next step was to look at the Spark UI on Databricks to see whether the GC configuration differs from EMR's. I found something interesting: Databricks sets spark.executor.extraJavaOptions to:
-XX:ReservedCodeCacheSize=256m -XX:+UseCodeCacheFlushing -javaagent:/databricks/DatabricksAgent.jar -XX:+PrintFlagsFinal -XX:+PrintGCDateStamps -verbose:gc -XX:+PrintGCDetails -XX:+HeapDumpOnOutOfMemoryError -Ddatabricks.serviceName=spark-executor-1
Now, I'm not a GC expert, and I've added "learn how to tune GC" to the todo list, but what I see here is more than just GC params for the executors. What does DatabricksAgent.jar do - does it help? I'm not sure, so I set my Spark job's executor Java options to the same values minus the Databricks-specific bits:
--conf spark.executor.extraJavaOptions="-XX:ReservedCodeCacheSize=256m -XX:+UseCodeCacheFlushing -XX:+PrintFlagsFinal -XX:+PrintGCDateStamps -verbose:gc -XX:+PrintGCDetails -XX:+HeapDumpOnOutOfMemoryError"
This doesn't change the running-query behavior - it still takes forever - but now I see PSYoungGen collections instead of ParNew (still about every two seconds):
2016-02-23T19:40:58.645+0000: [GC [PSYoungGen: 515040K->12789K(996352K)] 1695803K->1193777K(3792896K), 0.0203380 secs] [Times: user=0.03 sys=0.01, real=0.02 secs]
2016-02-23T19:57:50.463+0000: [GC [PSYoungGen: 588789K->13391K(977920K)] 1769777K->1196033K(3774464K), 0.0237240 secs] [Times: user=0.04 sys=0.00, real=0.02 secs]
If you've read this far, I commend you. I know how long this post is getting.
Another symptom I noticed is that stderr and stdout go quiet while the query is "running" - no new log lines show up on any executor (or on the driver).
i.e.
16/02/23 19:41:23 INFO ContextCleaner: Cleaned shuffle 5
16/02/23 19:57:32 INFO DynamicPartitionWriterContainer: Job job_201602231940_0000 committed.
That same ~17-minute gap is what shows up in the Spark UI as the running query... Any idea what's going on?
Ultimately, the job tends to restart after some of the aggregates have been written to S3 (say 10% of them), and then the Spark application eventually fails.
I'm not sure whether the issue is that EMR runs on YARN while Databricks runs a standalone cluster, or whether that's completely unrelated.
The failure I found after digging into the YARN logs is:
java.io.FileNotFoundException: No such file or directory: s3a://bucket_file_stuff/_temporary/0/task_201602232109_0020_m_000000/
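(For what it's worth, I pulled these logs from the master node with the standard YARN CLI; the application id below is a placeholder:)
yarn logs -applicationId application_1456259000000_0001 > application.log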
Any advice is greatly appreciated. I'll add notes as I go. Thanks!