Why does Apache Spark perform filters on the client?

I am new to Apache Spark and am having a problem fetching Cassandra data from Spark.

    List<String> dates = Arrays.asList("2015-01-21", "2015-01-22");
    CassandraJavaRDD<A> aRDD = CassandraJavaUtil.javaFunctions(sc)
        .cassandraTable("testing", "cf_text", CassandraJavaUtil.mapRowTo(A.class, colMap))
        .where("Id=? and date IN ?", "Open", dates);

This query does not filter the data on the Cassandra server. Instead, while this Java statement runs, memory usage keeps climbing until Spark finally throws a java.lang.OutOfMemoryError. The query should be filtered on the Cassandra server rather than on the client side, as described at https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md .

When I run the query with the filters in Cassandra's cqlsh, it executes fine, whereas running it without the filter (no where clause) times out, as expected. So it appears that Spark is applying the filters on the client side rather than on the server.

    SparkConf conf = new SparkConf();
    conf.setAppName("Test");
    conf.setMaster("local[8]");
    conf.set("spark.cassandra.connection.host", "192.168.1.15");

Why are the filters applied on the client side, and how can this be changed so that they are applied on the server side?

Also, how can we configure a Spark cluster on top of a Cassandra cluster on a Windows platform?

+1
3 answers

I have not used Cassandra with Spark myself, but after reading the section you linked (thanks for that), I see that:

Note: Although the ALLOW FILTERING clause is implicitly added to the generated CQL query, not all predicates are currently allowed by the Cassandra engine. This limitation will be addressed in future releases of Cassandra. Currently, ALLOW FILTERING works well with columns indexed by secondary indexes or clustering columns.

I am fairly sure (though I have not tested it) that the "IN" predicate is not supported: see https://github.com/datastax/spark-cassandra-connector/blob/24fbe6a10e083ddc3f770d1f52c07dfefeb7f59a/spark-cassandra-connector-java/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaRDD.java#L80

So you could restrict your where clause to Id only (provided there is a secondary index on it) and use Spark filtering for the date range.
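The Spark-side half of that suggestion could look roughly like the sketch below. It uses only the Java standard library so it runs on its own: `Row` and `filterByDates` are hypothetical stand-ins (for the mapped class `A` and for the filtering step), not connector API. With Spark, the same predicate body would be passed to `JavaRDD.filter` on the RDD returned after the `where("Id=?", "Open")` call.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class DateFilterSketch {
    // Hypothetical stand-in for the mapped row class A from the question.
    static final class Row {
        final String id;
        final String date;
        Row(String id, String date) { this.id = id; this.date = date; }
    }

    // Keeps only the rows whose date is in the wanted set. This is the part
    // that would run on the Spark side once Cassandra has filtered by Id.
    static List<Row> filterByDates(List<Row> rows, Set<String> wanted) {
        return rows.stream()
                   .filter(r -> wanted.contains(r.date))
                   .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Set<String> dates = new HashSet<>(Arrays.asList("2015-01-21", "2015-01-22"));
        // Assumed sample data: rows already restricted to Id = "Open".
        List<Row> rows = Arrays.asList(
            new Row("Open", "2015-01-20"),
            new Row("Open", "2015-01-21"),
            new Row("Open", "2015-01-22"));
        System.out.println(filterByDates(rows, dates).size()); // prints 2
    }
}
```

Note that this only helps if the rows matching Id alone fit comfortably in memory across the partitions, since the date filter no longer reduces what Cassandra sends back.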

+2

I would suggest reading the table as a DataFrame instead of an RDD. DataFrames are available in Spark 1.3 and higher. You can then specify the CQL query as a string, as follows:

    CassandraSQLContext sqlContext = new CassandraSQLContext(sc);
    String query = "SELECT * FROM testing.cf_text where id='Open' and date IN ('2015-01-21','2015-01-22')";
    DataFrame resultsFrame = sqlContext.sql(query);
    System.out.println(resultsFrame.count());

Try this and see whether it works better for you.

Once you have the data in a DataFrame, you can run Spark SQL operations on it. And if you need the data as an RDD, you can convert the DataFrame to an RDD.

+1

Setting spark.cassandra.input.split.size_in_mb in SparkConf solved the problem.

    SparkConf conf = new SparkConf();
    conf.setAppName("Test");
    conf.setMaster("local[4]");
    conf.set("spark.cassandra.connection.host", "192.168.1.15")
        .set("spark.executor.memory", "2g")
        .set("spark.cassandra.input.split.size_in_mb", "67108864");

The spark-cassandra-connector reads an incorrect value for spark.cassandra.input.split.size_in_mb, so overriding this value in SparkConf does the job. Now the IN clause also works well.
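As a quick sanity check on the number used in that configuration: 67108864 is exactly 64 × 1024 × 1024, so the value reads like 64 MB written out in bytes, even though the property name suggests megabytes. The sketch below only verifies that arithmetic; how a given connector version actually interprets the value is not something it can show, and the class and method names are made up for illustration.

```java
public class SplitSizeCheck {
    // Number of bytes in the given number of mebibytes.
    static long mebibytesToBytes(long mib) {
        return mib * 1024L * 1024L;
    }

    public static void main(String[] args) {
        // The value passed for spark.cassandra.input.split.size_in_mb in the answer.
        long configured = Long.parseLong("67108864");
        System.out.println(configured == mebibytesToBytes(64)); // prints true
    }
}
```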

+1