Akka lock options if an HTTP response is required

I understand how to make an application based on a non-blocking message in akka, and can easily mock up examples that perform simultaneous operations and pass aggregated results in the message. Where it is difficult for me to understand that non-blocking parameters are when my application should respond to an HTTP request. The goal is to receive the request and immediately forward it to the local or remote actor to do the work, which in turn will pass it on to get the result, which may take some time. Unfortunately, according to this model, I donโ€™t understand how I could express this with the help of a non-blocking series of โ€œspeaksโ€ rather than blocking โ€œasksโ€. If at any time in the chain I use a hint, I no longer have the future to use the response as possible content (required by the http infrastructure interface, which in this case is the final), but this is not important). I understand that the request is in its own stream, and my example is pretty far-fetched, but just trying to understand my design options.

In conclusion, if my contrived example below can be redesigned to block less, I really like to understand how to do it. This is my first use of akka, since some light studies a year ago +, and in every article, document and conversations that I examined, do not block services.

Conceptual answers may be helpful, but may also coincide with what I have already read. Working / Editing my example is likely to be the key to my understanding of the exact problem I'm trying to solve. If the current example generally needs to be done to make it useful, so I'm not looking for magic that doesn't exist.

Note. The following aliases: import com.twitter.util. {Future => TwitterFuture, Await => TwitterAwait}

object Server { val system = ActorSystem("Example-System") implicit val timeout = Timeout(1 seconds) implicit def scalaFuture2twitterFuture[T](scFuture: Future[T]): TwitterFuture[T] = { val promise = TwitterPromise[T] scFuture onComplete { case Success(result) โ‡’ promise.setValue(result) case Failure(failure) โ‡’ promise.setException(failure) } promise } val service = new Service[HttpRequest, HttpResponse] { def apply(req: HttpRequest): TwitterFuture[HttpResponse] = req.getUri match { case "/a/b/c" => val w1 = system.actorOf(Props(new Worker1)) val r = w1 ? "take work" val response: Future[HttpResponse] = r.mapTo[String].map { c => val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK) resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8)) resp } response } } //val server = Http.serve(":8080", service); TwitterAwait.ready(server) class Worker1 extends Actor with ActorLogging { def receive = { case "take work" => val w2 = context.actorOf(Props(new Worker2)) pipe (w2 ? "do work") to sender } } class Worker2 extends Actor with ActorLogging { def receive = { case "do work" => //Long operation... sender ! "The Work" } } def main(args: Array[String]) { val r = service.apply( com.twitter.finagle.http.Request("/a/b/c") ) println(TwitterAwait.result(r).getContent.toString(CharsetUtil.UTF_8)) // prints The Work } } 

Thanks in advance for any recommendations!

+8
scala akka nonblocking
source share
2 answers

You can avoid sending the future as a message using a template, in Worker1 you should write:

 pipe(w2 ? "do work") to sender 

Instead:

 sender ! (w2 ? "do work") 

Now r will be Future[String] instead of Future[Future[String]] .


Update: The above pipe solution is a general way to avoid your actorโ€™s response to the future. As Victor points out in the comment below, in this case, you can completely exit the Worker1 cycle by telling Worker2 to respond directly to the actor so that he ( Worker1 ) receives a message from:

 w2.tell("do work", sender) 

It will not be an option if Worker1 responds in some way from a response from Worker2 (using map on w2 ? "do work" , combining several futures with flatMap or for understanding, etc.), but if this is not necessary, this The version is cleaner and more efficient.


This kills one Await.result . You can get rid of another by writing something like the following:

 val response: Future[HttpResponse] = r.mapTo[String].map { c => val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK) resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8)) resp } 

Now you just need to enable this Future on TwitterFuture . I cannot tell you how to do this, but it should be pretty trivial , and definitely does not require blocking.

+5
source

You definitely don't need to block here. First, update your import for Twitter stuff:

  import com.twitter.util.{Future => TwitterFuture, Await => TwitterAwait, Promise => TwitterPromise} 

You will need a Promise twitter, since imp << 22> you will return from the apply method. Then follow what Travis Brown said in his answer so that your actor responds in such a way that you do not have any embedded futures. Once you do this, you can change your apply method to something like this:

 def apply(req: HttpRequest): TwitterFuture[HttpResponse] = req.getUri match { case "/a/b/c" => val w1 = system.actorOf(Props(new Worker1)) val r = (w1 ? "take work").mapTo[String] val prom = new TwitterPromise[HttpResponse] r.map(toResponse) onComplete{ case Success(resp) => prom.setValue(resp) case Failure(ex) => prom.setException(ex) } prom } def toResponse(c:String):HttpResponse = { val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK) resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8)) resp } 

This probably requires a bit more work. I did not install it in my development environment, so I cannot guarantee that you will compile it, but I believe that the idea sounds. What you return from the apply method is TwitterFuture , which is not complete yet. It will be completed when the future from the actor asks (?) And happens with the non-blocking callback onComplete .

0
source

All Articles