How do you do blocking I/O in an Apache Spark job?

What if, while traversing an RDD, I need to compute the values in the data set by calling an external (blocking) service? How do you think this could be achieved?

    val values: Future[RDD[Double]] = Future sequence tasks

I tried to create a list of futures, but since RDD is not Traversable, Future.sequence is not suitable.

I'm just wondering if anyone has had such a problem, and how did you solve it? What I am trying to achieve is parallelism on a single worker node, so that I can call this external service 3000 times per second.

Perhaps there is another solution that is a better fit for Spark, for example having several worker nodes on the same host.

I'd be interested to know how you deal with this challenge. Thanks.

scala parallel-processing apache-spark
2 answers

Here is the answer to my question:

    import scala.concurrent._
    import scala.concurrent.duration._
    import ExecutionContext.Implicits.global

    val buckets = sc.textFile(logFile, 100)
    // Wrap each element in a Future so the blocking native call runs asynchronously.
    val tasks: RDD[Future[Object]] = buckets map { item =>
      Future {
        // call native code
      }
    }
    // Resolve the futures one partition at a time on each worker.
    val values = tasks.mapPartitions[Object] { f: Iterator[Future[Object]] =>
      val searchFuture: Future[Iterator[Object]] = Future sequence f
      Await.result(searchFuture, JOB_TIMEOUT)
    }

The idea here is that we get a collection of partitions, where each partition is sent to a specific worker and is the smallest unit of work. Each unit of work contains data that can be processed by calling the native code and sending the resulting data back.

'values' contains the data that is returned from the native code, and that work is done across the cluster.


Based on your answer that the blocking call is comparing the provided input with each individual element of the RDD, I would strongly recommend rewriting the comparison in Java/Scala so that it can be run as part of your Spark process. If the comparison is a "pure" function (no side effects, depending only on its inputs), it should be straightforward to re-implement, and the reduction in complexity and increase in stability of your Spark process, due to not having to make remote calls, will probably be worth it.
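For illustration, a minimal sketch of what the in-process version might look like, assuming a hypothetical Data type, query value and pure compare function (these names are placeholders, not from the question):

    import org.apache.spark.rdd.RDD

    // Hypothetical input type and pure comparison function; stand-ins for
    // whatever the external service actually computes.
    case class Data(value: String)
    def compare(query: Data, element: Data): Double = ???

    val query: Data = Data("example")
    val inputData: RDD[Data] = ???

    // The comparison runs entirely inside the Spark executors; no remote calls needed.
    val scores: RDD[Double] = inputData.map(element => compare(query, element))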

It seems unlikely that your remote service will be able to handle 3000 calls per second, so a local, in-process version would be preferable.

If this is completely impossible for some reason, then you may be able to create an RDD transformation which turns your data into an RDD of futures, in pseudo-code:

    def callRemote(data: Data): Future[Double] = ...
    val inputData: RDD[Data] = ...
    val transformed: RDD[Future[Double]] = inputData.map(callRemote)

And then continue from there, computing on your Future[Double] objects.
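One way to continue from there, as a sketch only, is to reuse the mapPartitions/Future.sequence pattern from the accepted answer; the timeout value and execution context here are assumptions:

    import org.apache.spark.rdd.RDD
    import scala.concurrent._
    import scala.concurrent.duration._
    import ExecutionContext.Implicits.global

    // Resolve the futures one partition at a time, so each worker blocks
    // only on its own batch of remote calls.
    val results: RDD[Double] = transformed.mapPartitions { futures =>
      Await.result(Future.sequence(futures), 10.minutes) // illustrative timeout
    }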

If you know how much parallelism your remote process can handle, it might be best to abandon the Future approach and accept that it is a bottleneck.

    val remoteParallelism: Int = 100 // some constant
    def callRemoteBlocking(data: Data): Double = ...
    val inputData: RDD[Data] = ...
    val transformed: RDD[Double] = inputData.
      coalesce(remoteParallelism).
      map(callRemoteBlocking)

Your job will probably take quite a long time, but it should not overwhelm your remote service and die horribly.

A final option: if the inputs are reasonably predictable and the range of outcomes is consistent and limited to some reasonable number of outputs (millions or so), you could precompute them all as a data set using the remote service and look them up at Spark job time using a join.
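Roughly, that last idea might look like this (the lookup-table RDD and the types are illustrative; how you precompute and store the results is up to you):

    import org.apache.spark.SparkContext._ // pair-RDD operations on older Spark versions

    // Hypothetical lookup table built once via the remote service:
    // one (input, result) pair per possible input.
    val precomputed: RDD[(Data, Double)] = ???

    // Each input picks up its result through a join instead of a remote call.
    val results: RDD[(Data, Double)] =
      inputData.map(d => (d, ()))
        .join(precomputed)
        .mapValues { case (_, result) => result }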

