All aggregation operations are performed only after the whole data set has been pulled into memory as a DataFrame. So a count in Spark will never be as efficient as running it directly in TeraData. It is sometimes worth pushing some of the computation down to the database by creating views and then reading those views through the JDBC API.
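For instance, rather than pulling a full table into Spark just to aggregate it, you can read a pre-aggregated view through the same reader. A minimal sketch, assuming a hypothetical view named DAILY_COUNTS defined on the TeraData side:

    // Assume a view created in the database, e.g.:
    //   CREATE VIEW DAILY_COUNTS AS
    //   SELECT DAT_TME, COUNT(*) AS CNT FROM <TABLE> GROUP BY DAT_TME;
    // The aggregation runs in TeraData; Spark only fetches the small result.
    val dailyCounts = sqlctx.read.jdbc(
      url = "<URL>",
      table = "DAILY_COUNTS",
      properties = new java.util.Properties()
    )
    dailyCounts.show()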
Each time you use the JDBC driver to read a large table, you should specify a partitioning strategy; otherwise you will create a DataFrame/RDD with a single partition and overload a single JDBC connection.
Instead, try the following API (available since Spark 1.4.0):
    sqlctx.read.jdbc(
      url = "<URL>",
      table = "<TABLE>",
      columnName = "<INTEGRAL_COLUMN_TO_PARTITION>",
      lowerBound = minValue,
      upperBound = maxValue,
      numPartitions = 20,
      connectionProperties = new java.util.Properties()
    )
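Note that lowerBound and upperBound do not filter rows; they only determine how the column's range is split into strides, so rows outside the bounds still land in the first or last partition. A common pattern, sketched here with the placeholder names from above, is to fetch the actual min/max first:

    // Sketch: compute the real bounds of the partition column on the database side.
    // The derived-table alias ("AS b") is required when passing a subquery as the table.
    val bounds = sqlctx.read.jdbc(
      url = "<URL>",
      table = "(SELECT MIN(<INTEGRAL_COLUMN_TO_PARTITION>) AS lo, " +
              "MAX(<INTEGRAL_COLUMN_TO_PARTITION>) AS hi FROM <TABLE>) AS b",
      properties = new java.util.Properties()
    ).first()
    val minValue = bounds.getAs[Number](0).longValue // adjust the getters to the column's type
    val maxValue = bounds.getAs[Number](1).longValue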
It is also possible to push down some filtering.
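One way to do that, sketched below with a made-up STATUS column, is to pass a parenthesized subquery with an alias as the table argument, so the WHERE clause is executed by the database rather than in Spark:

    // Sketch: only rows matching the subquery's WHERE clause cross the wire.
    // STATUS is a hypothetical column; <URL> and <TABLE> are placeholders as above.
    val active = sqlctx.read.jdbc(
      url = "<URL>",
      table = "(SELECT * FROM <TABLE> WHERE STATUS = 'ACTIVE') AS t",
      properties = new java.util.Properties()
    )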
If you do not have a uniformly distributed integral column, you can instead create custom partitions by specifying custom predicates (where clauses). For example, suppose you have a timestamp column and want to partition by date ranges:
    val predicates = Array(
      "2015-06-20" -> "2015-06-30",
      "2015-07-01" -> "2015-07-10",
      "2015-07-11" -> "2015-07-20",
      "2015-07-21" -> "2015-07-31"
    ).map { case (start, end) =>
      s"cast(DAT_TME as date) >= date '$start' AND cast(DAT_TME as date) <= date '$end'"
    }

    predicates.foreach(println)
    // Below is the result of how the predicates were formed:
    // cast(DAT_TME as date) >= date '2015-06-20' AND cast(DAT_TME as date) <= date '2015-06-30'
    // cast(DAT_TME as date) >= date '2015-07-01' AND cast(DAT_TME as date) <= date '2015-07-10'
    // cast(DAT_TME as date) >= date '2015-07-11' AND cast(DAT_TME as date) <= date '2015-07-20'
    // cast(DAT_TME as date) >= date '2015-07-21' AND cast(DAT_TME as date) <= date '2015-07-31'

    val df = sqlctx.read.jdbc(
      url = "<URL>",
      table = "<TABLE>",
      predicates = predicates,
      connectionProperties = new java.util.Properties()
    )
This creates a DataFrame in which each partition contains the records of the subquery associated with its predicate.
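A quick sanity check, assuming the df bound in the snippet above: the DataFrame should have exactly one partition per predicate.

    // Four predicates above, so four partitions, each reading its own date range.
    println(df.rdd.partitions.size) // 4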
For the details, check the source code in DataFrameReader.scala.