I am using Akka 2.4.4 and trying to migrate with Apache HttpAsyncClient (unsuccessfully).
The following is a simplified version of the code that I use in my project.
The problem is that it freezes if I send more than 1-3 requests per stream. So far, after 6 hours of debugging, I could not even find the problem. I do not see exceptions, error logs, events in Decider . NOTHING:)
I tried to reduce the connection-timeout parameter to 1s, thinking that maybe it is waiting for a response from the server, but this did not help.
What am I doing wrong?
import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.model.headers.Referer import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.http.scaladsl.settings.ConnectionPoolSettings import akka.stream.Supervision.Decider import akka.stream.scaladsl.{Sink, Source} import akka.stream.{ActorAttributes, Supervision} import com.typesafe.config.ConfigFactory import scala.collection.immutable.{Seq => imSeq} import scala.concurrent.{Await, Future} import scala.concurrent.duration.Duration import scala.util.Try object Main { implicit val system = ActorSystem("root") implicit val executor = system.dispatcher val config = ConfigFactory.load() private val baseDomain = "www.google.com" private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config)) private val decider: Decider = { case ex => ex.printStackTrace() Supervision.Stop } private def sendMultipleRequests[T](items: Seq[(HttpRequest, T)]): Future[Seq[(Try[HttpResponse], T)]] = Source.fromIterator(() => items.toIterator) .via(poolClientFlow) .log("Logger")(log = myAdapter) .recoverWith { case ex => println(ex) null } .withAttributes(ActorAttributes.supervisionStrategy(decider)) .runWith(Sink.seq) .map { v => println(s"Got ${v.length} responses in Flow") v.asInstanceOf[Seq[(Try[HttpResponse], T)]] } def main(args: Array[String]) { val headers = imSeq(Referer("https://www.google.com/")) val reqPair = HttpRequest(uri = "/intl/en/policies/privacy").withHeaders(headers) -> "some req ID" val requests = List.fill(10)(reqPair) val qwe = sendMultipleRequests(requests).map { case responses => println(s"Got ${responses.length} responses") system.terminate() } Await.ready(system.whenTerminated, Duration.Inf) } }
And what about proxy support? It doesn't seem to work for me either.
expert
source share