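You need to obtain the connection within `foreachRDD`, but inside `rdd.foreachPartition` rather than directly in the `foreachRDD` body. The `foreachRDD` body runs on the driver, while the function passed to `foreachPartition` runs on the executors that hold the records. To do this efficiently, keep a pool of connections, take a connection from the pool inside `foreachPartition`, and call `foreach` on the partition's records to send each element through that connection. This is the pattern recommended in the Spark Streaming programming guide: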
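```scala
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    try {
      partitionOfRecords.foreach(record => connection.send(record))
    } finally {
      // Return the connection to the pool for reuse, even if sending failed
      ConnectionPool.returnConnection(connection)
    }
  }
}
```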
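`ConnectionPool` is not a Spark class; you have to supply it yourself. As a minimal sketch, assuming a hypothetical `Connection` type with a `send` method (both names are illustrative, not a real library API), a lazily initialized, thread-safe pool could look like this. It runs without Spark by feeding plain iterators to the same per-partition logic:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a real client connection (socket, DB, HTTP, ...).
class Connection(val id: Int) {
  val sent: ArrayBuffer[String] = ArrayBuffer.empty[String]
  def send(record: String): Unit = sent += record
}

// Hypothetical pool: a Scala object is initialized lazily, once per JVM,
// so all tasks running in the same executor JVM would share it.
object ConnectionPool {
  private val idle = new ConcurrentLinkedQueue[Connection]()
  private val created = new AtomicInteger(0)

  // Reuse an idle connection if one exists; otherwise open a new one.
  def getConnection(): Connection = {
    val pooled = idle.poll()
    if (pooled != null) pooled else new Connection(created.incrementAndGet())
  }

  // Hand the connection back so the next partition can reuse it.
  def returnConnection(connection: Connection): Unit = idle.offer(connection)

  def createdCount: Int = created.get()
}

object Demo {
  // Same logic as the foreachPartition closure, without Spark.
  def sendPartition(partitionOfRecords: Iterator[String]): Unit = {
    val connection = ConnectionPool.getConnection()
    try partitionOfRecords.foreach(record => connection.send(record))
    finally ConnectionPool.returnConnection(connection)
  }

  def main(args: Array[String]): Unit = {
    // Three "partitions" processed one after another on a single thread.
    val partitions = Seq(Seq("a", "b"), Seq("c"), Seq("d", "e", "f"))
    partitions.foreach(p => sendPartition(p.iterator))
    println(s"connections opened: ${ConnectionPool.createdCount}") // prints "connections opened: 1"
  }
}
```

Because the pool lives in a Scala `object`, each executor JVM builds its own pool the first time a task touches it, so no connection ever has to be serialized from the driver. A production pool would also cap its size and close connections that stay idle too long.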
In any case, check out the other comments, as they give useful context for the problem.