Akka model for computing tasks

I have the following requirements:

  • Connect to the web server with a username and password and get an authentication token.
  • Read a file to get different parameters.
  • Use the authentication token from step 1 and the parameters from step 2 to send an HTTP request to the web server.

Currently I have a single actor that performs all of the above tasks, as follows:

    package akka.first.java;

    import akka.actor.UntypedActor;

    public class MySingleActor extends UntypedActor {

        public void onReceive(Object msg) {
            if (msg instanceof sendRequest) {
                // Connect to the web server with a username and password and get an authentication token
                String token = getToken();
                // Read the file to get the different parameters
                Param param = readFile();
                // Use the auth token from step 1 and the parameters from step 2 to send an HTTP request to the web server
                Response response = sendRequest(server, token, param);
            }
        }

        private Param readFile() {
            // reads file
        }

        private String getToken() {
            // gets token
        }
    }

The readFile operation consists of several subtasks, which I think should be handled by a separate actor. But because the actor needs the result of readFile() to complete its main task of sending the request, the call may block, which the documentation advises against. What is the best way to do this? Futures?

2 answers

The official documentation offers the following solutions:

  • Do the blocking call within an actor (or a set of actors managed by a router [Java, Scala]), making sure to configure a thread pool that is either dedicated to this purpose or sufficiently sized.
  • Do the blocking call within a Future, ensuring an upper bound on the number of such calls at any point in time (submitting an unbounded number of tasks of this kind will exhaust your memory or thread limits).
  • Do the blocking call within a Future, providing a thread pool with an upper limit on the number of threads that is appropriate for the hardware on which the application runs.
  • Dedicate a single thread to manage a set of blocking resources (for example, an NIO selector driving multiple channels) and dispatch events as they occur as actor messages.

Using futures is among the officially suggested approaches, but it has to be done with extreme care.
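
To illustrate the "thread pool with an upper limit" idea, here is a minimal sketch that uses only the JDK (CompletableFuture and a fixed thread pool) instead of Akka's own futures. The class name BoundedBlockingCalls and the blockingRead method are made up for illustration; blockingRead just sleeps to imitate blocking I/O.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BoundedBlockingCalls {

    // Hypothetical stand-in for a blocking I/O call (e.g. reading a file).
    static String blockingRead(int id) {
        try {
            Thread.sleep(20); // simulate time spent blocked on I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "result-" + id;
    }

    // Runs `count` blocking calls on a dedicated pool with at most `poolSize`
    // threads and returns the results in submission order.
    static List<String> runAll(int count, int poolSize) {
        ExecutorService blockingPool = Executors.newFixedThreadPool(poolSize);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                final int id = i;
                // The blocking work runs on blockingPool, not on the caller's thread.
                futures.add(CompletableFuture.supplyAsync(() -> blockingRead(id), blockingPool));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> f : futures) {
                results.add(f.join()); // waiting here is for the demo only
            }
            return results;
        } finally {
            blockingPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(5, 2));
    }
}
```

Note that a fixed pool bounds the number of threads but not the number of queued tasks, so the caller still has to limit how many calls it submits. Inside an actor you would not call join(); you would send the completed result back as a message instead.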

Let's consider the first approach, since IMO it is the more consistent one.

First of all, extract all blocking I/O operations into new actors, each of which performs only one blocking I/O operation. For brevity, assume there is only one such operation:

    public class MyBlockingIOActor extends UntypedActor {
        public void onReceive(Object msg) {
            // do blocking IO call here and send the result back to sender
        }
    }

Add the configuration for a dispatcher that takes care of the blocking actors to the actor system configuration file (usually application.conf):

    # Configuring a dispatcher with a fixed thread pool size, e.g. for actors that perform blocking IO
    blocking-io-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor {
        fixed-pool-size = 32
      }
      throughput = 1
    }

Make sure that this configuration file is actually used when creating the actor system (especially if you decide to use a non-standard file name for the configuration):

 ActorSystem actorSystem = ActorSystem.create("my-actor-system", ConfigFactory.load("application.conf")); 

After that, assign the actor that performs the blocking I/O to the dedicated dispatcher. You can do this in the configuration, as described here, or when creating the actor:

 ActorRef blockingActor = context().actorOf(Props.create(MyBlockingIOActor.class).withDispatcher("blocking-io-dispatcher")); 
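
For the configuration route, a deployment section along these lines should work. This is only a sketch: the path /blocking-actor is hypothetical and assumes a top-level actor created with that name, so an actor created as a child via context().actorOf would need its full path (parent name included) here instead.

```
akka.actor.deployment {
  # Hypothetical actor path, relative to /user
  /blocking-actor {
    dispatcher = blocking-io-dispatcher
  }
}
```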

To increase throughput, consider wrapping the blocking actor in a pool:

    SupervisorStrategy strategy = new OneForOneStrategy(
        5,
        Duration.create(1, TimeUnit.MINUTES),
        Collections.singletonList(Exception.class)
    );
    ActorRef blockingActor = context().actorOf(
        new SmallestMailboxPool(5)
            .withSupervisorStrategy(strategy)
            .props(Props.create(MyBlockingIOActor.class)
                .withDispatcher("blocking-io-dispatcher")));

You can verify that the actor is using the correct dispatcher as follows:

    public class MyBlockingIOActor extends UntypedActor {
        public void preStart() {
            LOGGER.debug("using dispatcher: {}", ((Dispatcher) context().dispatcher()).id());
        }
    }

You can use Futures, or perhaps RxJava with Observables and Observers. Or use separate actors and send the final response back to the original sender.

    public class MySingleActor extends UntypedActor {
        private ActorRef tokenActor;
        private ActorRef readFileActor;

        public MySingleActor() {
            tokenActor = context().actorOf(Props.create(TokenActor.class), "tokenActor");
            readFileActor = context().actorOf(Props.create(ReadFileActor.class), "readFileActor");
        }

        public void onReceive(Object msg) {
            if (msg instanceof sendRequest) {
                Future<String> f = Futures.future(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        return getToken();
                    }
                }, context().dispatcher());
                Patterns.pipe(f, context().dispatcher()).to(tokenActor).pipeTo(readFileActor, self());
            }
        }
    }

Or, instead of pipe:

    f.onComplete(new OnComplete<String>() {
        public void onComplete(Throwable t, String result) {
            readFileActor.tell(result, self());
        }
    }, context().system().dispatcher());
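
The same idea, combining the token and the file parameters once both are available, can be sketched with the JDK's CompletableFuture. This is not Akka code, just an illustration of the composition. The class name RequestPipeline is made up, the three methods are stand-ins for the steps in the question, and they return fixed strings instead of doing real I/O.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RequestPipeline {

    // Hypothetical stand-ins for the three steps from the question.
    static String getToken() {
        return "token-123";
    }

    static String readFile() {
        return "param=42";
    }

    static String sendRequest(String token, String param) {
        return "sent[" + token + "," + param + "]";
    }

    // Starts steps 1 and 2 in parallel on the given pool and runs step 3
    // once both results are available, without blocking the caller.
    static CompletableFuture<String> run(ExecutorService blockingPool) {
        CompletableFuture<String> token =
                CompletableFuture.supplyAsync(RequestPipeline::getToken, blockingPool);
        CompletableFuture<String> param =
                CompletableFuture.supplyAsync(RequestPipeline::readFile, blockingPool);
        return token.thenCombineAsync(param, RequestPipeline::sendRequest, blockingPool);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // join() is used only so the demo can print the result.
            System.out.println(run(pool).join());
        } finally {
            pool.shutdown();
        }
    }
}
```

In an actor you would hand the resulting future to pipe (or complete it in a callback as above) rather than waiting on it.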

