How to improve performance for slow Spark jobs using a DataFrame and JDBC connection?

I am trying to access a medium-sized Teradata table (~100 million rows) via JDBC, running Spark locally on a single node (local[*]).

I am using Spark 1.4.1, configured on a very powerful machine (2 processors, 24 cores, 126 GB of RAM).

I tried several memory settings and settings to make it work faster, but none of them had a big impact.

I am sure there is something I am missing. Below is my last attempt, which took about 11 minutes just to get this simple count, whereas getting the count through a JDBC connection from R took 40 seconds.

 bin/pyspark --driver-memory 40g --executor-memory 40g

 df = sqlContext.read.jdbc("jdbc:teradata://......")
 df.count()

When I tried it with the BIG table (5 billion records), no results came back at all once the query completed.

+8
3 answers

All aggregation operations are performed after the entire dataset has been retrieved into memory in a DataFrame. So the count in Spark will never be as efficient as it is directly in Teradata. Sometimes it is worth pushing some of the computation into the database by creating views and then mapping those views through the JDBC API.
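
Here is a minimal sketch of that idea (the view name, URL and column names are hypothetical): the aggregation runs inside Teradata, and Spark only reads the already-reduced result. Many JDBC sources also accept a parenthesised subquery in place of a table name.

    // Sketch only: MY_SUMMARY_VIEW is assumed to have been created in the database beforehand,
    // e.g. CREATE VIEW MY_SUMMARY_VIEW AS SELECT some_key, COUNT(*) AS cnt FROM MY_TABLE GROUP BY some_key
    val props = new java.util.Properties()
    val summary = sqlctx.read.jdbc("jdbc:teradata://<HOST>/...", "MY_SUMMARY_VIEW", props)

    // Alternatively, push the whole aggregation down as an inline subquery:
    val counted = sqlctx.read.jdbc("jdbc:teradata://<HOST>/...", "(SELECT COUNT(*) AS cnt FROM MY_TABLE) t", props)
    counted.show()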

Whenever you use the JDBC driver to access a large table, you should specify a partitioning strategy; otherwise you will create a DataFrame/RDD with a single partition and you will overload the single JDBC connection.

Instead, you want to try something like the following (Spark 1.4.0+):

 sqlctx.read.jdbc(
   url = "<URL>",
   table = "<TABLE>",
   columnName = "<INTEGRAL_COLUMN_TO_PARTITION>",
   lowerBound = minValue,
   upperBound = maxValue,
   numPartitions = 20,
   connectionProperties = new java.util.Properties()
 )

It is also possible to push down some filtering.
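
For instance, a simple comparison filter on a JDBC-backed DataFrame is translated into a WHERE clause and executed by the database, so only matching rows travel over the connection. A minimal sketch (ID_COL and the URL are hypothetical):

    val props = new java.util.Properties()
    val df = sqlctx.read.jdbc("<URL>", "<TABLE>", props)

    // The >= comparison below is pushed down to the database by the JDBC data source.
    df.filter(df("ID_COL") >= 1000000).count()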

If you do not have a uniformly distributed integral column, you can create custom partitions by specifying custom predicates (where clauses). For example, suppose you have a timestamp column and want to partition by date ranges:

  val predicates = Array(
    "2015-06-20" -> "2015-06-30",
    "2015-07-01" -> "2015-07-10",
    "2015-07-11" -> "2015-07-20",
    "2015-07-21" -> "2015-07-31"
  ).map { case (start, end) =>
    s"cast(DAT_TME as date) >= date '$start' AND cast(DAT_TME as date) <= date '$end'"
  }

  predicates.foreach(println)
  // Below is how the predicates come out:
  // cast(DAT_TME as date) >= date '2015-06-20' AND cast(DAT_TME as date) <= date '2015-06-30'
  // cast(DAT_TME as date) >= date '2015-07-01' AND cast(DAT_TME as date) <= date '2015-07-10'
  // cast(DAT_TME as date) >= date '2015-07-11' AND cast(DAT_TME as date) <= date '2015-07-20'
  // cast(DAT_TME as date) >= date '2015-07-21' AND cast(DAT_TME as date) <= date '2015-07-31'

  sqlctx.read.jdbc(
    url = "<URL>",
    table = "<TABLE>",
    predicates = predicates,
    connectionProperties = new java.util.Properties()
  )

This will create a DataFrame in which each partition contains the records of the subquery associated with its predicate.
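
As a quick sanity check (a sketch reusing the predicates defined above), the partition count of the resulting DataFrame should equal the number of predicates:

    val partitioned = sqlctx.read.jdbc(
      url = "<URL>",
      table = "<TABLE>",
      predicates = predicates,
      connectionProperties = new java.util.Properties()
    )
    println(partitioned.rdd.partitions.length) // 4 here, one partition per predicate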

Check the source code in DataFrameReader.scala.

+15

Does the unserialized table even fit into 40 GB? If it starts swapping to disk, performance will drop dramatically.

In any case, when you use standard JDBC with ANSI SQL syntax, you are leveraging the database engine, so if Teradata (I don't know Teradata) keeps statistics about your table, a classic "select count(*) from table" will be very fast. Spark, instead, loads the 100 million rows into memory with something like "select * from table" and then counts the rows of the RDD. That is a rather different workload.
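
If all you need is the count, a plain JDBC statement from the driver lets the database engine do the work. A minimal sketch (host, credentials and table name are placeholders, and the Teradata driver is assumed to be on the classpath):

    import java.sql.DriverManager

    val conn = DriverManager.getConnection("jdbc:teradata://<HOST>/<DB>", "<USER>", "<PASSWORD>")
    try {
      val rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM <TABLE>")
      rs.next()
      println(rs.getLong(1))
    } finally {
      conn.close()
    }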

+5

A solution that differs from the others is to save the data from the (Oracle, in my case) table into Avro files (split across many files) stored in Hadoop. Reading those Avro files with Spark is then painless, since you no longer hit the database at all.
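
A minimal sketch of the read side, assuming the export to Avro has already been done and the spark-avro package (com.databricks:spark-avro) is on the classpath; the HDFS path is a placeholder:

    val avroDf = sqlContext.read
      .format("com.databricks.spark.avro")
      .load("hdfs:///path/to/exported/table")

    avroDf.count() // no database round trip involved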

0
