Spark: create a DB connection per RDD partition and use mapPartitions

I want to do mapPartitions on my Spark RDD:

val newRd = myRdd.mapPartitions( partition => {
  val connection = new DbConnection /* creates a db connection per partition */
  val newPartition = partition.map( record => {
    readMatchingFromDB(record, connection)
  })
  connection.close()
  newPartition
})

But this gives me a "connection already closed" exception, as expected, because my connection is closed before the mapping function ever runs. I want to create one connection per RDD partition and close it correctly. How can I achieve this?

Thanks!

+2
scala apache-spark rdd
Jun 17 '16 at 11:59
2 answers

As mentioned in the discussion here, the problem stems from the laziness of map on the partition iterator. Because of this laziness, for each partition the connection is created and then closed, and only later (when the RDD is actually evaluated) is readMatchingFromDB called.
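
To see this laziness in isolation, here is a minimal, self-contained sketch in plain Scala (no Spark needed); the read helper and the connectionOpen flag are invented stand-ins for readMatchingFromDB and the DB connection. Nothing passed to Iterator.map runs until the iterator is consumed, which here happens only after the "connection" has been closed:

object LazyIteratorDemo extends App {
  var connectionOpen = true

  // Stand-in for readMatchingFromDB: fails if the connection is already closed.
  def read(record: Int): Int = {
    require(connectionOpen, "connection already closed")
    record * 10
  }

  val partition = Iterator(1, 2, 3)
  val mapped = partition.map(read) // nothing is read yet: Iterator.map is lazy
  connectionOpen = false           // plays the role of connection.close()
  mapped.foreach(println)          // fails here: read only runs now, after the "close"
}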

To fix this, you should force the iterator to be traversed before closing the connection, for example by converting it to a list (and then back to an iterator):

val newRd = myRdd.mapPartitions( partition => {
  val connection = new DbConnection /* creates a db connection per partition */
  val newPartition = partition.map( record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB
  connection.close()
  newPartition.iterator // create a new iterator
})
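
Note that toList materializes the whole partition in memory, which can be undesirable for large partitions. As an alternative sketch (not part of the original answer, and assuming a Spark version whose TaskContext.get().addTaskCompletionListener accepts a plain function), you can keep the iterator lazy and let Spark close the connection when the task finishes, i.e. after the iterator has been fully consumed:

import org.apache.spark.TaskContext

val newRd = myRdd.mapPartitions( partition => {
  val connection = new DbConnection // one connection per partition, as before
  // Close the connection only when the task completes, i.e. after the
  // lazy iterator has been fully consumed by Spark.
  TaskContext.get().addTaskCompletionListener { _: TaskContext =>
    connection.close()
  }
  partition.map( record => readMatchingFromDB(record, connection) )
})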
+5
Jun 20 '16 at 5:29
rdd.foreachPartitionAsync(iterator -> {
    // This object is cached inside each executor JVM: the connection is
    // created the first time and reused from then on.
    // Very useful for streaming apps.
    DBConn conn = DBConn.getConnection();
    while (iterator.hasNext()) {
        conn.read(iterator.next()); // read the matching row for each record
    }
});

public class DBConn {
    private static DBConn dbObj = null;
    // Create a singleton getConnection() method that returns only one instance of this object
}
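
The same per-executor caching idea reads naturally in Scala (the language of the question). Below is only a sketch: DbConnection and readMatchingFromDB come from the question, while the ConnectionHolder object is invented here. A top-level Scala object with a lazy val is initialized at most once per executor JVM, so every task running on that executor reuses the same connection:

// Hypothetical per-executor holder: initialized at most once per JVM.
object ConnectionHolder {
  lazy val connection: DbConnection = new DbConnection
}

val newRd = myRdd.mapPartitions( partition => {
  // Reuse the executor-wide connection instead of opening one per partition.
  partition.map( record => readMatchingFromDB(record, ConnectionHolder.connection) )
})

A connection held this way is never explicitly closed; it lives as long as the executor JVM, which is usually acceptable for long-running streaming jobs, as this answer notes.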
0
Dec 06 '18 at 20:17
