Akka Flow hangs while executing HTTP requests through connection pool

I am using Akka 2.4.4 and trying to migrate with Apache HttpAsyncClient (unsuccessfully).

The following is a simplified version of the code that I use in my project.

The problem is that it freezes if I send more than 1-3 requests per stream. So far, after 6 hours of debugging, I could not even find the problem. I do not see exceptions, error logs, events in Decider . NOTHING:)

I tried to reduce the connection-timeout parameter to 1s, thinking that maybe it is waiting for a response from the server, but this did not help.

What am I doing wrong?

 import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.model.headers.Referer import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.http.scaladsl.settings.ConnectionPoolSettings import akka.stream.Supervision.Decider import akka.stream.scaladsl.{Sink, Source} import akka.stream.{ActorAttributes, Supervision} import com.typesafe.config.ConfigFactory import scala.collection.immutable.{Seq => imSeq} import scala.concurrent.{Await, Future} import scala.concurrent.duration.Duration import scala.util.Try object Main { implicit val system = ActorSystem("root") implicit val executor = system.dispatcher val config = ConfigFactory.load() private val baseDomain = "www.google.com" private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config)) private val decider: Decider = { case ex => ex.printStackTrace() Supervision.Stop } private def sendMultipleRequests[T](items: Seq[(HttpRequest, T)]): Future[Seq[(Try[HttpResponse], T)]] = Source.fromIterator(() => items.toIterator) .via(poolClientFlow) .log("Logger")(log = myAdapter) .recoverWith { case ex => println(ex) null } .withAttributes(ActorAttributes.supervisionStrategy(decider)) .runWith(Sink.seq) .map { v => println(s"Got ${v.length} responses in Flow") v.asInstanceOf[Seq[(Try[HttpResponse], T)]] } def main(args: Array[String]) { val headers = imSeq(Referer("https://www.google.com/")) val reqPair = HttpRequest(uri = "/intl/en/policies/privacy").withHeaders(headers) -> "some req ID" val requests = List.fill(10)(reqPair) val qwe = sendMultipleRequests(requests).map { case responses => println(s"Got ${responses.length} responses") system.terminate() } Await.ready(system.whenTerminated, Duration.Inf) } } 

And what about proxy support? It doesn't seem to work for me either.

+7
scala akka akka-stream
source share
1 answer

You need to fully use the response body so that the connection is available for subsequent requests. If you don't care about the response object at all, you can simply merge it into Sink.ignore , something like this:

 resp.entity.dataBytes.runWith(Sink.ignore) 

In the default configuration, when using the host connection pool, the maximum connections are set to 4. Each pool has its own queue, where requests wait until one of the open connections becomes available. If this queue ever exceeds 32 (the default configuration can be changed, there must be a capacity of 2), then yo will start to see failures. In your case, you only make 10 requests, so you do not fall into this limit. But, without consuming the response object, you do not release the connection, and everything else is just a queue in the queue, waiting for the connection to be released.

+7
source

All Articles