You can use the Postgres COPY API to write the data, which is much faster. See the following two methods: the first iterates over the RDD to fill a buffer, and the second saves that buffer using the COPY API. The only thing you need to take care of is building the correct statement in CSV format, which the COPY API will then use.
/**
 * Serialises every event in `rdd` to CSV and bulk-loads the result through [[copyIn]].
 *
 * Each event appends its own CSV row to a shared buffer via `createCSV`, and a newline
 * is appended after every row. All rows of one call are stamped with the same
 * `System.currentTimeMillis()` value.
 *
 * @param rdd the events to persist; the whole RDD is collected before writing
 */
def saveToDB(rdd: RDD[Iterable[EventModel]]): Unit = {
  val csv = new mutable.StringBuilder
  val now = System.currentTimeMillis()

  // collect() materialises the entire RDD in this process, so the data set
  // (and its CSV rendering) has to fit in memory here.
  rdd.collect().foreach { events =>
    events.foreach(_.createCSV(csv, now).append("\n"))
  }

  // Nothing to load: avoid borrowing a connection just to issue an empty COPY.
  if (csv.nonEmpty) {
    copyIn("myTable", new StringReader(csv.toString), "statement")
  }
}

/**
 * Streams CSV data from `reader` into `tableName` using the Postgres COPY API.
 *
 * A connection is borrowed from `connectionPool`, unwrapped to `PGConnection`, and always
 * closed again in `finally`. Non-fatal failures (including `SQLException`) are logged as
 * warnings and not rethrown, so the load is best-effort from the caller's point of view.
 *
 * The return type is deliberately left inferred so the method's interface stays as it was:
 * the value of the driver's `copyIn` call on success, `Unit` when a failure was logged.
 *
 * @param tableName  target table; interpolated verbatim into the COPY statement, so it must
 *                   come from trusted code, never from user input
 * @param reader     source of the CSV payload
 * @param columnStmt optional text placed between the table name and `FROM STDIN`
 *                   (presumably a parenthesised column list -- confirm against callers);
 *                   also interpolated verbatim
 */
def copyIn(tableName: String, reader: java.io.Reader, columnStmt: String = "") = {
  import scala.util.control.NonFatal

  val conn = connectionPool.getConnection()
  try {
    conn
      .unwrap(classOf[PGConnection])
      .getCopyAPI
      .copyIn(s"COPY $tableName $columnStmt FROM STDIN WITH CSV", reader)
  } catch {
    // NonFatal covers SQLException while letting OutOfMemoryError, InterruptedException
    // and similar fatal conditions propagate instead of being silently logged away.
    case NonFatal(e) => logWarning(e.getMessage)
  } finally {
    conn.close()
  }
}
smishra
source share