How to get good performance when reading cassandra partitions in spark mode?

I am reading data from the cassandra section into a spark using cassandra-connector. I tried to find solutions for reading sections. I tried to parallelize the task by creating rdds as much as possible, but both the ONE solutions and the TWO solution had the same performance.

In the ONE solution, I could immediately see the steps in the spark interface. I tried to avoid one loop in the TWO solution.

In a TWO solution, steps appear after a considerable amount of time. Also, as the number of user elements increases, there is a significant increase in the time before the steps appear in the spark UI for the TWO solution.

Version
 spark - 1.1
 Dse - 4.6
 cassandra-connector -1.1

Setup
 3 - Nodes with spark cassandra
 Each node has 1 core dedicated to this task.
 512MB ram for the executor memory.

My cassandra table schema,

 CREATE TABLE   test (
   user text,
   userid bigint,
   period timestamp,
   ip text,
   data blob,
   PRIMARY KEY((user,userid,period),ip)
   );

First decision:

 val users = List("u1","u2","u3")
 val period = List("2000-05-01","2000-05-01")
 val partitions = users.flatMap(x => period.map(y => (x,y))))
 val userids = 1 to 10
 for (userid <- userids){
 val rdds = partitions.map(x => sc.cassandraTable("test_keyspace","table1")
                                .select("data")
                                .where("user=?", x._1)
                                .where("period=?",x._2)
                                .where("userid=?,userid)
                          )
 val combinedRdd = sc.union(rdds)
 val result = combinedRdd.map(getDataFromColumns)
                    .coalesce(4)
                    .reduceByKey((x,y) => x+y)
                    .collect()
 result.foreach(prinltn)
 }

The second solution:

 val users = List("u1","u2","u3")
 val period = List("2000-05-01","2000-05-01")
 val userids = 1 to 10
 val partitions = users.flatMap(x => period.flatMap(
                  y => userids.map(z => (x,y,z))))

 val rdds = partitions.map(x => sc.cassandraTable("test_keyspace","table1")
                                .select("data")
                                .where("user=?", x._1)
                                .where("period=?",x._2)
                                .where("userid=?,x._3)
                     )
 val combinedRdd = sc.union(rdds)
 val result = combinedRdd.map(getDataFromColumns)
                    .coalesce(4)
                    .reduceByKey((x,y) => x+y)
                    .collect()
 result.foreach(prinltn)

Why is TWO not faster than ONE?

, , , , . , , .

+4
1

, joinWithCassandraTable, api , ( , ). api RDD palatalizes C *.

-

sc.parallelize(partitions).joinWithCassandraTable("keyspace","table")

, repartitionByCassandraReplica, , , . .

raw driver, -

val cc = CassandraConnector(sc.getConf)
partitions.mapPartitions{ it => 
  cc.withSessionDo{ session =>
    session.execute( Some query )
  }
}

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#performing-efficient-joins-with-cassandra-tables-since-12

:

- , 60 C *. , , , , C *.

- , Spark . , 60 RDD, C *. (1 RDD , RDD , ). , 60 RDD , , , . RDD .

, , C *, , , ( ) . 60 RDD , . , .

, RDD, .

+4

All Articles