Insert analytics data from Spark to Postgres

I have a Cassandra database from which I parsed data using SparkSQL through Apache Spark, and I now want to insert this parsed data into PostgreSQL. Is there a way to do this directly, other than using the PostgreSQL driver? (I already achieved it with postREST and the driver; I want to know whether there are methods like saveToCassandra().)

+8
java postgresql cassandra apache-spark apache-spark-sql
4 answers

There is currently no built-in implementation for writing an RDD to any RDBMS. Here are links to related discussions on the Spark users list: one, two

In general, the most effective approach would be the following:

  • Check the number of partitions in the RDD: it should be neither too low nor too high. Around 20-50 partitions should be fine; if you have fewer, call repartition with 20 partitions, and if you have more, call coalesce down to 50 partitions.
  • Call the mapPartitions transformation and, inside it, call a function that inserts the records into your DBMS over JDBC. In that function, open a connection to your database and use the COPY command with this API; this removes the need to issue a separate command for each record, so the insert is processed much faster.

This way you would insert data into Postgres in parallel, using up to 50 parallel connections (depending on the size of your Spark cluster and its configuration). The whole approach can be implemented as a Java/Scala function that accepts an RDD and a connection string; a sketch follows below.
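A minimal sketch of that approach, under a few assumptions that are not part of the answer: the target table is called my_table, the RDD already holds rows formatted as CSV strings, and foreachPartition is used instead of mapPartitions so that no extra action is needed to trigger the write.

 import java.io.StringReader
 import java.sql.DriverManager

 import org.apache.spark.rdd.RDD
 import org.postgresql.PGConnection

 // Sketch only: "my_table", the JDBC URL and the CSV formatting are assumptions.
 def saveRddToPostgres(rdd: RDD[String], jdbcUrl: String): Unit = {
   // keep the partition count in the 20-50 range suggested above
   val sized =
     if (rdd.getNumPartitions < 20) rdd.repartition(20)
     else if (rdd.getNumPartitions > 50) rdd.coalesce(50)
     else rdd

   sized.foreachPartition { rows =>
     // one connection and one COPY per partition instead of one INSERT per record
     val conn = DriverManager.getConnection(jdbcUrl)
     try {
       val copyApi = conn.unwrap(classOf[PGConnection]).getCopyAPI
       copyApi.copyIn("COPY my_table FROM STDIN WITH CSV",
         new StringReader(rows.mkString("\n")))
     } finally {
       conn.close()
     }
   }
 }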

+13

The answer above by 0x0FFF is good. Here is one more point that will be useful.

I use foreachPartition to save to external storage. This also follows the "Design Patterns for using foreachRDD" section of the Spark documentation: https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#output-operations-on-dstreams

Example:

 dstream.foreachRDD { rdd =>
   rdd.foreachPartition { partitionOfRecords =>
     // ConnectionPool is a static, lazily initialized pool of connections
     val connection = ConnectionPool.getConnection()
     partitionOfRecords.foreach(record => connection.send(record))
     ConnectionPool.returnConnection(connection) // return to the pool for future reuse
   }
 }
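The generic connection.send(record) above is not a JDBC call; for Postgres the same pattern could be adapted with a batched PreparedStatement. In the sketch below, the table my_table(id, value), the (Int, String) record type and the trivial ConnectionPool stand-in are all assumptions made for illustration.

 import java.sql.{Connection, DriverManager}

 import org.apache.spark.streaming.dstream.DStream

 // Stand-in for a real pool (e.g. a lazily initialized singleton); here it
 // simply opens and closes a connection so the example is self-contained.
 object ConnectionPool {
   def getConnection(url: String): Connection = DriverManager.getConnection(url)
   def returnConnection(conn: Connection): Unit = conn.close()
 }

 def writeStream(dstream: DStream[(Int, String)], jdbcUrl: String): Unit = {
   dstream.foreachRDD { rdd =>
     rdd.foreachPartition { partitionOfRecords =>
       val connection = ConnectionPool.getConnection(jdbcUrl)
       val stmt = connection.prepareStatement(
         "INSERT INTO my_table (id, value) VALUES (?, ?)")
       partitionOfRecords.foreach { case (id, value) =>
         stmt.setInt(1, id)
         stmt.setString(2, value)
         stmt.addBatch() // batch instead of one round trip per record
       }
       stmt.executeBatch()
       stmt.close()
       ConnectionPool.returnConnection(connection)
     }
   }
 }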
+1

You can use the Postgres COPY API to write the data; it is much faster. See the two methods below: the first iterates over the RDD to fill a string buffer, which the second then saves through the COPY API. The only thing you need to take care of is producing the rows in the correct CSV format that the COPY statement expects.

 import java.io.StringReader
 import java.sql.SQLException

 import scala.collection.mutable

 import org.apache.spark.rdd.RDD
 import org.postgresql.PGConnection

 def saveToDB(rdd: RDD[Iterable[EventModel]]): Unit = {
   val sb = mutable.StringBuilder.newBuilder
   val now = System.currentTimeMillis()
   // note: collect() brings all records to the driver before the CSV buffer is built
   rdd.collect().foreach(itr => {
     itr.foreach(_.createCSV(sb, now).append("\n"))
   })
   copyIn("myTable", new StringReader(sb.toString), "statement")
   sb.clear
 }

 def copyIn(tableName: String, reader: java.io.Reader, columnStmt: String = "") = {
   val conn = connectionPool.getConnection()
   try {
     // stream the CSV buffer into Postgres with a single COPY command
     conn.unwrap(classOf[PGConnection]).getCopyAPI
       .copyIn(s"COPY $tableName $columnStmt FROM STDIN WITH CSV", reader)
   } catch {
     case se: SQLException => logWarning(se.getMessage)
     case t: Throwable => logWarning(t.getMessage)
   } finally {
     conn.close()
   }
 }
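EventModel and its createCSV method are not shown in the snippet; a hypothetical version that would fit the calls above might look like this (the fields and their CSV order are invented for illustration).

 // Hypothetical model matching the snippet above; the fields and their CSV
 // order are assumptions, not something defined in the original answer.
 case class EventModel(id: Long, name: String, value: Double) {
   // appends this event as one CSV row (the caller adds the newline) and
   // returns the builder so the call can be chained with append("\n")
   def createCSV(sb: StringBuilder, timestamp: Long): StringBuilder =
     sb.append(id).append(',')
       .append(name).append(',')
       .append(value).append(',')
       .append(timestamp)
 }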
+1

The answers above refer to older versions of Spark. In Spark 2.x there is a JDBC connector that lets you write directly to an RDBMS from a DataFrame.

Example:

 jdbcDF2.write.jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) 

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
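The call above is written in the PySpark form shown on the linked page; since the rest of this thread uses Scala, a roughly equivalent Scala version (with placeholder URL, table name and credentials) might be:

 import java.util.Properties

 import org.apache.spark.sql.{DataFrame, SaveMode}

 // Placeholder URL, table and credentials; jdbcDF2 is any DataFrame to be written.
 def writeToPostgres(jdbcDF2: DataFrame): Unit = {
   val connectionProperties = new Properties()
   connectionProperties.put("user", "username")
   connectionProperties.put("password", "password")
   connectionProperties.put("driver", "org.postgresql.Driver")

   jdbcDF2.write
     .mode(SaveMode.Append) // or Overwrite, depending on the use case
     .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
 }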

0
