Failed to deserialize ActorRef when sending the result to another actor

I am starting to use Spark Streaming to process a real-time data feed that I receive. My scenario is that I have an Akka actor receiver using "with ActorHelper", then a Spark job doing some transformations, and then I want to send the result to another actor.

My problem is the last part. When trying to send to another actor, Spark throws an exception:

02/15/20 16:43:16 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.IllegalStateException: Trying to deserialize a serialized ActorRef without an ActorSystem in scope. Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'

I create this last actor as follows:

 val actorSystem = SparkEnv.get.actorSystem
 val lastActor = actorSystem.actorOf(MyLastActor.props(someParam), "MyLastActor")

And then I use it like this:

 result.foreachRDD(rdd => rdd.foreachPartition(lastActor ! _)) 

I'm not sure where or how to apply the advice "Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'". Do I need to set any special configuration? Or create my actor differently?

2 answers

I found that if I collect the RDD before sending it to the actor, it works like a charm:

 result.foreachRDD(rdd => rdd.collect().foreach(producer ! _)) 
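
For context on why this helps: with collect() the foreach runs on the driver, so the closure holding the ActorRef is never serialized and shipped to a task. The `withValue` in the error message is the method of `scala.util.DynamicVariable`, and as far as I know `Serialization.currentSystem` is such a variable in the Akka 2.3 line that Spark 1.x uses. The sketch below uses only the Scala standard library to show that scoping behaviour. `FakeSystem`, `ScopeDemo` and `resolveRef` are hypothetical names made up for illustration, not Akka or Spark APIs.

```scala
import scala.util.{DynamicVariable, Try}

// Hypothetical stand-in for an actor system; this is NOT an Akka type.
final case class FakeSystem(name: String)

object ScopeDemo {
  // Plays the role that Serialization.currentSystem is assumed to play:
  // it holds null unless a caller has bound a value for the current scope.
  val currentSystem = new DynamicVariable[FakeSystem](null)

  // Mimics a deserialization step that needs a system in scope,
  // and fails in the same way as the error in the question if there is none.
  def resolveRef(path: String): String = {
    val system = currentSystem.value
    if (system == null)
      throw new IllegalStateException(
        "Trying to deserialize a reference without a system in scope")
    s"${system.name}:$path"
  }

  def main(args: Array[String]): Unit = {
    // No system bound: the lookup throws, so the Try is a Failure.
    val outside = Try(resolveRef("/user/MyLastActor"))
    println(s"outside scope failed: ${outside.isFailure}")

    // System bound only for the duration of the block passed to withValue.
    val inside = currentSystem.withValue(FakeSystem("driver")) {
      resolveRef("/user/MyLastActor")
    }
    println(s"inside scope resolved: $inside")
  }
}
```

In the question's code, the deserialization happens inside Spark's task machinery rather than in user code, so there is no obvious place to wrap it in `withValue`. That is presumably why keeping the send on the driver sidesteps the problem.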

See the following example to access an actor outside the Spark domain.

 /*
  * The following is the use of actorStream to connect a user actor as a receiver.
  *
  * It is important to note:
  * Since the actor can exist outside the scope of Spark, it is the user's responsibility
  * to ensure type safety, that is, the type of the received data and the InputDStream
  * must be the same.
  *
  * For example: both actorStream and SampleActorReceiver are parameterized
  * to the same type to ensure type safety.
  */

 val lines = ssc.actorStream[String](
   Props(new SampleActorReceiver[String](
     "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt))),
   "SampleReceiver")