Cassandra insertion efficiency using spark-cassandra connector

I am new to Spark and Cassandra. I am trying to insert data into a Cassandra table using the spark-cassandra connector, as shown below:

    import java.util.UUID

    import org.apache.spark.{SparkContext, SparkConf}
    import org.joda.time.DateTime
    import com.datastax.spark.connector._

    case class TestEntity(id: UUID, category: String, name: String, value: Double,
                          createDate: DateTime, tag: Long)

    object SparkConnectorContext {
      val conf = new SparkConf(true).setMaster("local")
        .set("spark.cassandra.connection.host", "192.168.xxx.xxx")
      val sc = new SparkContext(conf)
    }

    object TestRepo {
      def insertList(list: List[TestEntity]) =
        SparkConnectorContext.sc.parallelize(list).saveToCassandra("testKeySpace", "testColumnFamily")
    }

    object TestApp extends App {
      val start = System.currentTimeMillis()
      TestRepo.insertList(Utility.generateRandomData())
      val end = System.currentTimeMillis()
      val timeDiff = end - start
      println("Difference (in millis) = " + timeDiff)
    }

When I insert a list of 100 objects using this method, it takes 300-1100 milliseconds. I tried inserting the same data using phantom; that takes only 20-40 milliseconds.

Can someone tell me why the Spark connector takes so long to insert? Am I doing something wrong in my code, or is it not recommended to use the spark-cassandra connector for insert operations?

2 answers

It looks like you are including the parallelize operation in your timing. Also, since you have your Spark worker running on a different machine than Cassandra, the saveToCassandra operation will write over the network.

Try configuring your system to run the Spark workers on your Cassandra nodes. Then create the RDD in a separate step and call an action such as count() on it to load the data into memory. You may also want to persist() or cache() the RDD to make sure it stays in memory for the test.

Then time just the saveToCassandra call on that cached RDD.
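
For example, a rough sketch of that measurement, reusing SparkConnectorContext, Utility.generateRandomData(), and the imports from the question:

    // Build and materialize the RDD before starting the clock.
    val rdd = SparkConnectorContext.sc.parallelize(Utility.generateRandomData())
    rdd.cache()
    rdd.count()  // action forces evaluation, so the data is in memory before timing

    // Now only the Cassandra write is inside the measured window.
    val start = System.currentTimeMillis()
    rdd.saveToCassandra("testKeySpace", "testColumnFamily")
    println("saveToCassandra (in millis) = " + (System.currentTimeMillis() - start))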

You can also look at the repartitionByCassandraReplica method offered by the Cassandra connector. That would partition the data in the RDD according to which Cassandra node the rows need to be written to. That way you exploit data locality and often avoid writing and shuffling over the network.
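
A hedged sketch, assuming the Spark workers are co-located with the Cassandra nodes and the connector version in use provides this method:

    // Group rows onto Spark partitions matching their Cassandra replicas,
    // so each executor writes mostly to its local node, then save.
    val local = rdd.repartitionByCassandraReplica("testKeySpace", "testColumnFamily")
    local.saveToCassandra("testKeySpace", "testColumnFamily")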


There are several serious issues with your "benchmark":

  • Your data set is so small that you are mostly measuring the job setup time. Saving 100 objects should take on the order of a single millisecond on one node, not seconds. Moreover, saving 100 objects gives the JVM no chance to compile the code you run into optimized machine code.
  • Your measurement includes the Spark context initialization. The JVM loads classes lazily, so the Spark initialization code is actually invoked after your measurement starts. This is an extremely expensive step, typically performed only once per Spark application, not per job.
  • You take the measurement only once per run. This means you are even measuring the Spark context setup and job setup time incorrectly, because the JVM has to load all the classes first and HotSpot probably has no chance to kick in.

To summarize, you are most likely measuring class loading time, which depends on the size and number of classes loaded. Spark is quite a lot to load, and a few hundred milliseconds is not surprising at all.

To correctly measure insert performance:

  • use a larger dataset
  • exclude one-time setup from measurement
  • perform multiple runs sharing the same Spark context, and discard the initial ones until you reach a steady state (see the sketch after this list).
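
A minimal sketch of such a measurement loop, reusing the question's objects (the warm-up and run counts are arbitrary):

    val data = Utility.generateRandomData()

    // Warm-up runs: let class loading and JIT compilation settle.
    for (_ <- 1 to 5) {
      SparkConnectorContext.sc.parallelize(data).saveToCassandra("testKeySpace", "testColumnFamily")
    }

    // Measured runs, all sharing the same SparkContext.
    val timings = for (_ <- 1 to 10) yield {
      val start = System.currentTimeMillis()
      SparkConnectorContext.sc.parallelize(data).saveToCassandra("testKeySpace", "testColumnFamily")
      System.currentTimeMillis() - start
    }
    println("Average (in millis) = " + timings.sum / timings.size)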

BTW, if you enable the debug logging level, the connector logs the insertion time for each partition in the executor logs.

