Spark joinWithCassandraTable() on map with multiple partition keys ERROR

I am trying to filter out a small part of a huge Cassandra table using:

val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b") 

I want to match the rows in the Cassandra table on the "created" column, which is part of the partition key.

My table key (the partition key of the Cassandra table) is defined as:

 case class TableKey(imei: String, created: Long, when: Long) 

The result is an error:

 [error] /home/ubuntu/scala/test/test.scala:61: not enough arguments for method apply: (imei: String, created: Long)test.TableKey in object TableKey.
 [error] Unspecified value parameter created.
 [error] val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b")
 [error]                                                                  ^
 [error] one error found
 [error] (compile:compile) Compilation failed

It worked with only one object in the partition key, as in the documentation.

Why is there a problem with multiple partition keys? (Answered below.)

EDIT: I tried using joinWithCassandraTable in the correct form:

 val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey("*",_,startDate)).joinWithCassandraTable("listener","snapshots_test_c") 

When I try to run it on Spark, there are no errors, but it is stuck on "[Stage 0:> (0 + 2) / 2]" forever...

What's wrong?

1 answer

The error tells you that the TableKey class requires 3 components to be initialized, but only one argument was passed. This is a Scala compile error and is not related to C* or Spark.

 val snapshotsFiltered = sc.parallelize(startDate to endDate)
   .map(TableKey(_2)) // TableKey does not have a single-element constructor, so this fails
   .joinWithCassandraTable("listener", "snapshots_test_b")
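
The fix is to supply every component of the key when building the RDD to join. A minimal sketch, assuming (imei, created) is the partition key and that the set of imei values is known in advance; the PartitionKey class and the knownImeis collection below are hypothetical stand-ins:

 import com.datastax.spark.connector._

 // Hypothetical: a key class covering only the partition columns, plus a
 // known set of imei values. C* cannot look up rows from a partial
 // partition key, so both components must be supplied.
 case class PartitionKey(imei: String, created: Long)
 val knownImeis = Seq("imei-1", "imei-2") // placeholder values

 val snapshotsFiltered = sc
   .parallelize(for {
     imei    <- knownImeis
     created <- startDate to endDate
   } yield PartitionKey(imei, created))
   .joinWithCassandraTable("listener", "snapshots_test_b")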

In general, C* uses the entire partition key to determine where a particular row lives. Because of this, you can only efficiently retrieve data if you know the entire partition key, so passing only part of it has no meaning.

joinWithCassandraTable requires full partition key values in order to work efficiently. If you have only part of the partition key, you will need to perform a full table scan and use Spark to filter.
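
If the imei values are not known up front, that scan-and-filter fallback is the remaining option. A sketch, assuming the "created" column is stored as a bigint:

 import com.datastax.spark.connector._

 // Reads every partition of the table and filters on the Spark side;
 // viable only when a full scan is acceptable.
 val snapshotsFiltered = sc
   .cassandraTable("listener", "snapshots_test_b")
   .filter { row =>
     val created = row.getLong("created")
     created >= startDate && created <= endDate
   }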

If you want to filter based only on a clustering column, you can do so by pushing down a where clause to C*, for example:

 sc.cassandraTable("ks","test").where("clustering_key > someValue") 
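
The predicate travels to Cassandra with the request, so filtering happens server-side and only matching rows cross the network. Applied to the question's table, and assuming "when" is a clustering column, the call might look like this (the connector's where supports bind values):

 import com.datastax.spark.connector._

 // The where clause is executed by C*, not Spark;
 // `when` is assumed here to be a clustering column.
 val recentSnapshots = sc
   .cassandraTable("listener", "snapshots_test_b")
   .where("when >= ?", startDate)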
