I am using Spark 1.6.1, the latest version, released on March 9, 2016.
I am starting a fresh cluster of 10 nodes, each with 256 GB of RAM and 32 cores, and I have made sure that SPARK_WORKER_DIR is empty on every node. I read two 200 GB .csv tables from HDFS and join them on a single column. Even though I call .cache, I find that about 29 GB of data is written to the block manager directory on each node (roughly 290 GB in total). Is there a way to disable the block manager? I just want to join the two tables in memory.
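To make the intent concrete, here is roughly what I am trying to express, as a minimal sketch against the X and Y tables loaded below (persist with an explicit StorageLevel is standard Spark API; as I understand it, DataFrame.cache() defaults to MEMORY_AND_DISK in Spark SQL):

import org.apache.spark.storage.StorageLevel

// What I want, expressed directly: the joined, sorted result pinned
// strictly in memory. Whether an explicit MEMORY_ONLY persist actually
// prevents the blockmgr-* writes shown below is what I am unsure about.
X.join(Y, "KEY").orderBy("KEY").persist(StorageLevel.MEMORY_ONLY)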
If you notice anything else I am doing wrong here, it would be great to know. Thanks.
These parameters are set on each of the 10 nodes:
export SPARK_PRINT_LAUNCH_COMMAND=1
export SPARK_MASTER_IP="NODE1"
export SPARK_MASTER_PORT="17077"
export SPARK_WORKER_PORT="17087"
export SPARK_WORKER_INSTANCES=1
export MASTER="spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT"
export SPARK_HOME="/home/mattd/spark-1.6.1-bin-hadoop2.6"
tmpdir=~/tmp
export SPARK_LOG_DIR=$tmpdir/spark/logs
export SPARK_WORKER_DIR=$tmpdir/spark/work
export SPARK_LOCAL_DIRS=$tmpdir/spark/work
$SPARK_HOME/sbin/start-master.sh
$SPARK_HOME/sbin/start-slave.sh $MASTER
On NODE1 (also the HDFS node):
$SPARK_HOME/bin/spark-shell --master spark://NODE1:17077 \
    --packages com.databricks:spark-csv_2.10:1.3.0 \
    --executor-memory 220G --driver-memory 20G
import org.apache.spark.sql.SQLContext
import org.joda.time._
val sqlContext = SQLContext.getOrCreate(sc)
val X = sqlContext.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.load("hdfs://NODE1/datasets/X.csv")
val Y = sqlContext.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.load("hdfs://NODE1/datasets/Y.csv")
X.show
+----------+-----------+
|       KEY|         X2|
+----------+-----------+
|7632858426| 1476754316|
|8176486913|-7805583191|
|3235589527|-9261053875|
|6017229071| -756595861|
|1039519144| 8161598094|
... snip ...

Y.show
+----------+-----------+
|       KEY|         Y2|
+----------+-----------+
|2055977176|-5965358667|
| 358321276| 8309936938|
|4311730481|-1352785819|
|2084690536|-1218203804|
|3183436600| 6996336786|
... snip ...
X.count
10000000000
Y.count
10000000000
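A back-of-the-envelope estimate of why I expect the joined result to fit in memory, assuming the join output has a row count on the order of the inputs:

// Rough size estimate for the join result: three long columns (KEY, X2, Y2).
val rows  = 10000000000L          // ~1e10 rows, per the counts above
val bytes = rows * 3 * 8L         // 8 bytes per long column
println(bytes / (1L << 30))       // ~223 GiB raw, vs. 10 x 220 GB of executor memory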
val ans = X.join(Y, "KEY").orderBy("KEY")
ans.cache()
ans.show
ans.count
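A small sketch of how the block manager's own accounting could be inspected from the shell, using sc.getExecutorStorageStatus (a DeveloperApi in Spark 1.x); it reports the block manager's view per executor, which may not line up with what du shows below:

// Per-executor view of block manager memory vs. disk usage.
sc.getExecutorStorageStatus.foreach { status =>
  val mem  = status.memUsed  / (1024 * 1024)
  val disk = status.diskUsed / (1024 * 1024)
  println(s"${status.blockManagerId.host}: ${mem} MB in memory, ${disk} MB on disk")
}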
After the ans.cache() and ans.count above, this is what I see on each node:
$ pwd
/home/mattd/tmp/spark/work/spark-27273513-649b-4232-83eb-112eec922158/
executor-2154b748-3550-488a-b76c-6935d7c41699/
$ du -sh blockmgr-13655d08-b6b1-4b85-b6b4-92ab29d939a0/
29G blockmgr-13655d08-b6b1-4b85-b6b4-92ab29d939a0/
$ ls blockmgr-13655d08-b6b1-4b85-b6b4-92ab29d939a0/
00 04 08 0c 10 14 18 1c 20 24 28 2c 30 34 38 3c
01 05 09 0d 11 15 19 1d 21 25 29 2d 31 35 39 3d
02 06 0a 0e 12 16 1a 1e 22 26 2a 2e 32 36 3a 3e
03 07 0b 0f 13 17 1b 1f 23 27 2b 2f 33 37 3b 3f