Writing Spark Streaming output to a socket

I have a DStream "crowd" and I want to write each element of it to a socket. When I read from that socket, nothing is printed. I use the following code:

import java.io.PrintStream
import java.net.ServerSocket

val server = new ServerSocket(4000, 200)
val conn = server.accept()
val out = new PrintStream(conn.getOutputStream())
crowd.foreachRDD(rdd => rdd.foreach(record => out.println(record)))

But if I use the following instead (which is not what I want):

crowd.foreachRDD(rdd => out.println(rdd)) 

then something does get written to the socket.

I suspect the problem is in how I am using rdd.foreach(), although it looks like it should work. I'm not sure what I am missing.

+4
3 answers

The function you pass to rdd.foreach(...) is executed on the workers, once for each element of the RDD. So although the code compiles and runs, the println goes to a PrintStream that was created on the driver and is not usable from the worker JVMs, which is why nothing shows up on your socket.

The function passed to DStream.foreachRDD itself, on the other hand, runs on the driver. That is why your second snippet does write something, but it only prints the RDD's string representation, not its contents.

Whenever you work with RDDs, the question to ask is "where does this code run?", on the driver or on the workers. If you need to push streaming results to an external consumer, a message broker such as Kafka is usually a better fit than a raw socket.
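For illustration, here is a minimal sketch of the Kafka route. The broker address, the topic name "crowd", and the use of the plain Kafka producer API are assumptions made for the example, not part of the original answer:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

crowd.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // The producer is created on the worker that processes this partition,
    // so no connection object has to be serialized from the driver.
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")  // assumed broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    records.foreach(r => producer.send(new ProducerRecord[String, String]("crowd", r.toString)))
    producer.close()
  }
}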

+4
crowd.foreachRDD(rdd => {rdd.collect.foreach(record=>{out.println(record)})})

Using collect brings all the elements of the RDD back to the driver, where the socket and the PrintStream were created, so the records can actually be written. Keep in mind, though, that collecting an RDD materializes all of its data in driver memory: if the RDD does not fit there, the driver will run out of memory.

So this works as long as each batch is small. If it is not, you should open the connection on the worker side instead, as described in the other answers.
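If the batches may be too large for a full collect, RDD.toLocalIterator is a middle ground: the loop still runs on the driver (so out can be used), but only one partition is fetched at a time. A sketch, reusing the out from the question:

crowd.foreachRDD { rdd =>
  // toLocalIterator pulls one partition at a time to the driver,
  // so at most one partition has to fit in driver memory.
  rdd.toLocalIterator.foreach(record => out.println(record))
}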

0


You need to create the connection inside the foreachRDD function, and to do this efficiently you should keep a pool of connections: obtain a connection inside foreachPartition, then call foreach to send the elements of the partition through that connection. Here is sample code showing this pattern:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}
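
Note that ConnectionPool is not a Spark class; it is a placeholder from the Spark Streaming programming guide. A minimal sketch of such an object, backed by plain TCP sockets, might look like this (the host, port, and send helper are assumptions for illustration):

import java.io.PrintStream
import java.net.Socket
import java.util.concurrent.ConcurrentLinkedQueue

object ConnectionPool {
  // Hypothetical target host/port; point these at the machine that reads the data.
  private val host = "localhost"
  private val port = 4000

  class Connection(socket: Socket) {
    private val out = new PrintStream(socket.getOutputStream)
    def send(record: Any): Unit = { out.println(record); out.flush() }
  }

  private val pool = new ConcurrentLinkedQueue[Connection]()

  // Reuse an idle connection if one is available, otherwise open a new socket.
  def getConnection(): Connection =
    Option(pool.poll()).getOrElse(new Connection(new Socket(host, port)))

  def returnConnection(c: Connection): Unit = pool.offer(c)
}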

In any case, read the other answers as well, since they give good background on what is actually going wrong here.

0