How to properly invoke a single server from multiple actors / web handlers using Akka HTTP?

I have a service (call it Service A) that uses an Akka HTTP server to process incoming requests. I also have a third-party application (Service B) that exposes several web services. The purpose of Service A is to transform client requests, call one or more of Service B's web services, combine/transform the results, and return them to the client.

I use actors for some parts and plain Futures for others. To call Service B, I use the Akka HTTP client:

Http.get(actorSystem)
    .singleRequest(HttpRequest.create().withUri("http://127.0.0.1:8082/test"), materializer)
    .onComplete(...)

The problem is that a new stream is materialized for each request, and when there are several simultaneous connections I get this error: akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32]

I already asked about this and was advised to use a single stream: How to call Akka HTTP client for several requests (10k - 100k)?

While that works for a batch of requests coming from one place, I don't know how to share one stream across all my parallel request handlers.

What is the correct "Akka-way" for this?

+7
java akka
3 answers

I think you could use Source.queue to buffer your requests. The code below assumes that you need the response from the third-party service, hence the returned Future[HttpResponse]. This way you can also provide an overflow strategy to prevent resource starvation.

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
    import akka.stream.scaladsl.{Keep, Sink, Source}
    import akka.stream.{ActorMaterializer, OverflowStrategy}

    import scala.concurrent.duration._
    import scala.concurrent.{Await, Future, Promise}
    import scala.util.{Failure, Success}
    import scala.concurrent.ExecutionContext.Implicits.global

    implicit val system = ActorSystem("main")
    implicit val materializer = ActorMaterializer()

    val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = "google.com", port = 80)

    val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](10, OverflowStrategy.dropNew)
      .via(pool)
      .toMat(Sink.foreach {
        case (Success(resp), p) => p.success(resp)
        case (Failure(e), p)    => p.failure(e)
      })(Keep.left)
      .run

    val promise = Promise[HttpResponse]
    val request = HttpRequest(uri = "/") -> promise

    val response = queue.offer(request).flatMap(buffered => {
      if (buffered) promise.future
      else Future.failed(new RuntimeException())
    })

    Await.ready(response, 3 seconds)

(code copied from my post)

+12

Here is the Java version of the accepted answer

    final Flow<
        Pair<HttpRequest, Promise<HttpResponse>>,
        Pair<Try<HttpResponse>, Promise<HttpResponse>>,
        NotUsed> flow = Http.get(actorSystem).superPool(materializer);

    final SourceQueue<Pair<HttpRequest, Promise<HttpResponse>>> queue =
        Source.<Pair<HttpRequest, Promise<HttpResponse>>>queue(BUFFER_SIZE, OverflowStrategy.dropNew())
            .via(flow)
            .toMat(Sink.foreach(p -> p.second().complete(p.first())), Keep.left())
            .run(materializer);

    ...

    public CompletionStage<HttpResponse> request(HttpRequest request) {
        log.debug("Making request {}", request);
        Promise<HttpResponse> promise = Futures.promise();
        return queue.offer(Pair.create(request, promise))
            .thenCompose(buffered -> {
                if (buffered instanceof QueueOfferResult.Enqueued$) {
                    return FutureConverters.toJava(promise.future())
                        .thenApply(resp -> {
                            if (log.isDebugEnabled()) {
                                log.debug("Got response {} {}", resp.status(), resp.getHeaders());
                            }
                            return resp;
                        });
                } else {
                    log.error("Could not buffer request {}", request);
                    return CompletableFuture.completedFuture(
                        HttpResponse.create().withStatus(StatusCodes.SERVICE_UNAVAILABLE));
                }
            });
    }
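The essence of both answers is one pattern: many producers, one bounded queue, one consumer that completes a promise per request. To see that the pattern works independently of Akka, here is a minimal plain-JDK sketch of the same idea; the class and method names are hypothetical, not part of any Akka API, and the "call" to Service B is simulated by an echo.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

class QueuedClient {
    // Bounded queue: the analogue of Source.queue(BUFFER_SIZE, dropNew)
    private final BlockingQueue<SimpleEntry<String, CompletableFuture<String>>> queue =
        new ArrayBlockingQueue<>(10);

    QueuedClient() {
        // Single consumer: the analogue of the one materialized pool flow
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    SimpleEntry<String, CompletableFuture<String>> item = queue.take();
                    // Here a real client would call Service B; we just echo the request
                    item.getValue().complete("response for " + item.getKey());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Called concurrently from many handlers; work is funneled through the one queue
    CompletableFuture<String> request(String req) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (!queue.offer(new SimpleEntry<>(req, promise))) {
            // Analogue of OverflowStrategy.dropNew: fail fast instead of piling up
            promise.completeExceptionally(new RuntimeException("queue full"));
        }
        return promise;
    }
}
```

Any number of request handlers can hold a reference to the single QueuedClient and call request(...) concurrently, which is exactly how the Akka SourceQueue above is meant to be shared.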
+3

All you have to do is set up a HostConnectionPool to Service B inside Service A. This gives you a Flow that can be embedded in Service A's request-handling streams to dispatch requests from A to B using the connection pool instead of opening a new connection per request. From the documentation:

As opposed to the connection-level client-side API, the host-level API relieves you from manually managing individual HTTP connections. It autonomously manages a configurable pool of connections to one particular target endpoint (i.e. host/port combination).

Each materialization of this Flow, in different streams across your application, will draw from the same underlying pool of connections:

The best way to get a hold of a connection pool to a given target endpoint is the Http.get(system).cachedHostConnectionPool(...) method, which returns a Flow that can be "baked" into an application-level stream setup. This flow is also named a "pool client flow".
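The "cached" part is what makes this answer work for the question asked: pools are looked up per endpoint, so every materialization targeting the same host/port shares one pool. As a stdlib-only analogy of that caching behavior (class and method names here are hypothetical, not the Akka API), repeated lookups return the same pool object:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PoolCache {
    // One pool object per "host:port" endpoint, like Akka's cached host connection pools
    static class Pool {
        final String endpoint;
        Pool(String endpoint) { this.endpoint = endpoint; }
    }

    private final Map<String, Pool> pools = new ConcurrentHashMap<>();

    // Repeated lookups for the same endpoint return the same instance, so every
    // "materialization" of a client flow shares the underlying connections
    Pool cachedHostPool(String host, int port) {
        return pools.computeIfAbsent(host + ":" + port, Pool::new);
    }
}
```

This is why many parallel handlers in Service A can each build their own stream to Service B without exhausting connections: they all end up on the one cached pool.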

0
