Creating back pressure from the Future inside the Akka stream

I am new to Akka streams and streams in general, so I could have completely misunderstood something at a conceptual level, but is there a way to create back pressure until the future is decided? Essentially, I want to do the following:

 object Parser {
   def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ???
 }

 def doAsyncOp(values: Seq[ExampleObject]): Future[Any] = ???

 val futures = FileIO.fromPath(path)
   .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
   .batch(1000, x => x)(_ ++ _)
   .map(values => doAsyncOp(values))
   .runWith(Sink.seq)

Bytes are read from the file and passed to the parser, which emits Seq s of ExampleObject s; those are passed to the async operation, which returns a Future . I want to make sure that until the Future resolves, the rest of the stream is backpressured, then resumes once the future completes, passing the next Seq[ExampleObject] to doAsyncOp , which applies backpressure again, and so on.

Right now I am working around it with:

 Await.result(doAsyncOp(values), 10 seconds) 

But I understand that this blocks the whole thread, which is bad. Is there a better way to do this?

If it helps, the big picture is that I'm trying to parse an extremely large JSON file (too large to fit in memory) block by block with Jawn, then pass the parsed objects to ElasticSearch for indexing. ElasticSearch has a queue of 50 pending operations; if it overflows, it starts rejecting new objects.

1 answer

This is pretty easy. You just need to use mapAsync :)

 val futures = FileIO.fromPath(path)
   .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
   .batch(1000, x => x)(_ ++ _)
   .mapAsync(4)(values => doAsyncOp(values))
   .runWith(Sink.seq)

where 4 is the level of parallelism: up to 4 Futures may be in flight at once, and the stage backpressures upstream as soon as all of them are pending. mapAsync also preserves element order; use mapAsyncUnordered if order doesn't matter.
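Here is a minimal, self-contained sketch of the same pattern (assuming Akka 2.6+; the Int element type and the fake doAsyncOp below are placeholders for illustration, not the poster's real parser or indexing call):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapAsyncDemo extends App {
  implicit val system: ActorSystem = ActorSystem("demo")
  import system.dispatcher

  // Stand-in for the real async indexing call: completes after a short delay.
  def doAsyncOp(batch: Seq[Int]): Future[Int] =
    Future { Thread.sleep(100); batch.sum }

  val done = Source(1 to 10)
    .grouped(2)             // batch elements, like .batch(1000, ...) above
    .mapAsync(1)(doAsyncOp) // at most 1 Future in flight: upstream is
                            // backpressured until the current one completes
    .runWith(Sink.seq)

  println(Await.result(done, 10.seconds)) // blocking only to keep the demo simple
  system.terminate()
}
```

With parallelism 1, mapAsync processes one batch at a time, in order, which is exactly the "pause until the Future resolves, then resume" behavior the question asks for; raising the number allows that many concurrent Futures before backpressure kicks in.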

