Delete rows from a Cassandra table in Spark

I use Spark with Cassandra. I read a few rows from a table and want to delete them by primary key. This is my code:

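import com.datastax.driver.core.Session
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

val connector = CassandraConnector(sc.getConf)

val lines = sc.cassandraTable[(String, String, String, String)](CASSANDRA_SCHEMA, table)
  .select("a", "b", "c", "d")
  .where("d = ?", d)
  .cache()

lines.foreach { r =>
  // opens a new session for every single row
  val session: Session = connector.openSession()
  val delete = s"DELETE FROM $CASSANDRA_SCHEMA.$table WHERE channel='${r._1}' AND ctid='${r._2}' AND cvid='${r._3}';"
  session.execute(delete)
  session.close()
}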

But this approach opens a session for every row, which takes a lot of time. Is there a way to delete these rows using sc.cassandraTable, or another solution that is better than mine?

Thanks


I don't think the Spark Cassandra Connector supports delete at the moment. To amortize the cost of establishing a connection, the recommended approach is to perform the operation once per partition.

So your code will look like this:

lines.foreachPartition { partition =>
  val session: Session = connector.openSession() // once per partition
  try {
    partition.foreach { elem =>
      val delete = s"DELETE FROM $CASSANDRA_SCHEMA.$table WHERE channel='${elem._1}' AND ctid='${elem._2}' AND cvid='${elem._3}';"
      session.execute(delete)
    }
  } finally {
    session.close()
  }
}
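To see why the per-partition version is cheaper, here is a self-contained Java sketch (no Spark or Cassandra needed) that simulates partitions as nested lists. `CountingSession` is a hypothetical stand-in for a real session that only counts how often a session is opened and how many statements run; `ks.tbl` and `pk` are placeholders, not the asker's schema.

```java
import java.util.Arrays;
import java.util.List;

public class SessionAmortization {

    // Hypothetical stand-in for a Cassandra session: it only counts how many
    // sessions are opened and how many statements are executed.
    static final class CountingSession implements AutoCloseable {
        static int opened = 0;
        static int executed = 0;

        CountingSession() { opened++; }

        void execute(String cql) { executed++; }

        @Override
        public void close() { }
    }

    // Per-row pattern (the question's code): one session per element.
    static void deletePerRow(List<List<String>> partitions) {
        for (List<String> partition : partitions) {
            for (String key : partition) {
                try (CountingSession session = new CountingSession()) {
                    session.execute("DELETE FROM ks.tbl WHERE pk = '" + key + "'");
                }
            }
        }
    }

    // Per-partition pattern (foreachPartition): one session per partition,
    // reused for every element of that partition.
    static void deletePerPartition(List<List<String>> partitions) {
        for (List<String> partition : partitions) {
            try (CountingSession session = new CountingSession()) {
                for (String key : partition) {
                    session.execute("DELETE FROM ks.tbl WHERE pk = '" + key + "'");
                }
            }
        }
    }

    static void reset() {
        CountingSession.opened = 0;
        CountingSession.executed = 0;
    }

    public static void main(String[] args) {
        // 3 partitions holding 6 keys in total
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c", "d", "e"),
                Arrays.asList("f"));

        reset();
        deletePerRow(partitions);
        System.out.println("per row:       opened=" + CountingSession.opened
                + " executed=" + CountingSession.executed);

        reset();
        deletePerPartition(partitions);
        System.out.println("per partition: opened=" + CountingSession.opened
                + " executed=" + CountingSession.executed);
    }
}
```

Running `main` prints `opened=6 executed=6` for the per-row pattern and `opened=3 executed=6` for the per-partition pattern: the same number of DELETEs, but the session cost is paid once per partition instead of once per row.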

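You could also try DELETE FROM ... WHERE pk IN (list), building the list for each partition in the same way. This should be faster still, but it may break for very large partitions, because the list grows with the partition. Repartitioning the RDD before applying this function should help.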
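Here is a self-contained Java sketch of the IN-list variant, with chunking as one way to keep a very large partition from producing one huge list. It only builds the statements: `ks.tbl`, `pk` and the chunk size are hypothetical placeholders, and the values of each chunk would be bound to the `?` markers (for example through a prepared statement) inside the per-partition loop. CQL limits which primary-key columns may be restricted with IN, so check this against your table's key and your Cassandra version before relying on it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class InListDelete {

    // A CQL statement plus the key values to bind to its "?" markers, in order.
    static final class BoundDelete {
        final String cql;
        final List<String> values;

        BoundDelete(String cql, List<String> values) {
            this.cql = cql;
            this.values = values;
        }
    }

    // Splits one partition's keys into chunks of at most maxPerStatement and
    // builds one "DELETE ... WHERE <keyColumn> IN (?, ?, ...)" per chunk.
    static List<BoundDelete> buildDeletes(String table, String keyColumn,
                                          List<String> keys, int maxPerStatement) {
        if (maxPerStatement < 1) {
            throw new IllegalArgumentException("maxPerStatement must be >= 1");
        }
        List<BoundDelete> result = new ArrayList<>();
        for (int start = 0; start < keys.size(); start += maxPerStatement) {
            int end = Math.min(start + maxPerStatement, keys.size());
            List<String> chunk = new ArrayList<>(keys.subList(start, end));
            // one "?" bind marker per key in this chunk
            String markers = String.join(", ", Collections.nCopies(chunk.size(), "?"));
            String cql = "DELETE FROM " + table + " WHERE " + keyColumn + " IN (" + markers + ")";
            result.add(new BoundDelete(cql, chunk));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("k1", "k2", "k3", "k4", "k5");
        for (BoundDelete d : buildDeletes("ks.tbl", "pk", keys, 2)) {
            System.out.println(d.cql + "  " + d.values);
        }
    }
}
```

With five keys and a chunk size of 2, this yields two statements with `IN (?, ?)` and a final one with `IN (?)`, so the longest IN list is bounded no matter how large the partition is.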


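Here is my solution for deleting rows from Cassandra, written in Java. I use it in BETA and PRODUCTION; with Cassandra … :(

I hope this helps anyone who has to delete rows from Cassandra!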

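import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;

public class CassandraDeletes {

    public static void deleteFromCassandraTable(Dataset<Row> myData, SparkConf sparkConf) {
        // CassandraConnector is serializable, so the lambda can capture it
        CassandraConnector connector = CassandraConnector.apply(sparkConf);
        // the cast selects the Java-friendly foreachPartition overload
        myData.foreachPartition((ForeachPartitionFunction<Row>) partition -> {
            Session session = connector.openSession(); // once per partition
            try {
                while (partition.hasNext()) {
                    Row row = partition.next();
                    boolean isTested = row.getBoolean(0);
                    String product = row.getString(1);
                    long reportDateInMillSeconds = row.getTimestamp(2).getTime();
                    String id = row.getString(3);

                    // note: id is not quoted, so this only works if id is a
                    // non-text column (e.g. uuid or int)
                    String deleteMyData = "DELETE FROM test.my_table"
                            + " WHERE is_tested=" + isTested
                            + " AND product='" + product + "'"
                            + " AND report_date=" + reportDateInMillSeconds
                            + " AND id=" + id + ";";

                    System.out.println("%%% " + deleteMyData);
                    ResultSet deleteResult = session.execute(deleteMyData);
                    System.out.println("%%% deleteResult =" + deleteResult.wasApplied());
                }
            } finally {
                session.close();
            }
        });
    }
}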
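The code above builds the CQL by string concatenation, so a text value containing a single quote (a product name such as O'Brien's, say) produces an invalid statement. A minimal sketch of one fix: render text values as CQL string literals, doubling any embedded single quote, which is how CQL escapes it. `CqlLiterals`, `cqlText` and `deleteStatement` are hypothetical helper names, and the column types in the comments are assumptions; in particular this sketch treats id as text, so if it is a uuid or int (as the unquoted original suggests), leave it unquoted. With the DataStax Java driver, a prepared statement with `?` bind markers avoids manual quoting altogether.

```java
public class CqlLiterals {

    // Renders a Java string as a CQL string literal: wrap it in single quotes
    // and double any embedded single quote ('' is CQL's escape for ').
    static String cqlText(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // Same DELETE as the answer, with text values quoted and escaped.
    // Assumed column types: is_tested boolean, product text,
    // report_date timestamp (given as epoch millis), id text.
    static String deleteStatement(boolean isTested, String product,
                                  long reportDateMillis, String id) {
        return "DELETE FROM test.my_table"
                + " WHERE is_tested = " + isTested
                + " AND product = " + cqlText(product)
                + " AND report_date = " + reportDateMillis
                + " AND id = " + cqlText(id) + ";";
    }

    public static void main(String[] args) {
        System.out.println(deleteStatement(true, "O'Brien's", 1500000000000L, "42"));
    }
}
```

For the sample call, the product value is emitted as `'O''Brien''s'`, which Cassandra parses back to the original string.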