I am new to Akka Streams and streams in general, so I may have misunderstood something at a conceptual level, but is there a way to apply backpressure until a Future is completed? Essentially, I want to do the following:
```scala
object Parser {
  def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ???
}

def doAsyncOp(values: Seq[ExampleObject]): Future[Any] = ???

val futures = FileIO.fromPath(path)
  .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
  .batch(1000, x => x)(_ ++ _)
  .map(values => doAsyncOp(values))
  .runWith(Sink.seq)
```
Bytes are read from the file and passed to the parser, which emits `Seq[ExampleObject]`s; these are passed to the async operation, which returns a `Future`. I want to make sure that until the `Future` completes, the rest of the stream is backpressured, then resumes once the `Future` resolves, passing another `Seq[ExampleObject]` to `doAsyncOp`, which applies backpressure again, and so on.
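From what I've gathered, `mapAsync(1)` might be exactly this behavior: it keeps at most one `Future` in flight and backpressures upstream until it completes. A minimal runnable sketch, with toy integer batches standing in for the parsed `Seq[ExampleObject]`s and a placeholder `doAsyncOp` (the assertion checks that two operations never overlap):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("sketch")
import system.dispatcher

// Stand-in for the real doAsyncOp; the assertion verifies that
// mapAsync(1) never lets two operations run concurrently.
@volatile var inFlight = 0
def doAsyncOp(values: Seq[Int]): Future[Int] = Future {
  inFlight += 1
  assert(inFlight == 1) // only one op at a time
  Thread.sleep(10)
  inFlight -= 1
  values.sum
}

val result: Future[Seq[Int]] =
  Source(1 to 20)
    .grouped(5)              // toy stand-in for the parsed batches
    .mapAsync(1)(doAsyncOp)  // one Future in flight; upstream is backpressured
    .runWith(Sink.seq)

val sums = Await.result(result, 10.seconds)
println(sums) // Vector(15, 40, 65, 90)
Await.ready(system.terminate(), 10.seconds)
```

The `Await` here is only at the very end of the program to keep the sketch synchronous; inside the stream itself nothing blocks.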
Right now I have it working with:

```scala
Await.result(doAsyncOp(values), 10.seconds)
```

But I understand this blocks the whole thread, which is bad. Is there a better way around this?
If it helps, the big picture is that I'm trying to parse an extremely large JSON file (too large to fit in memory) block by block with Jawn, and then pass the parsed objects to ElasticSearch for indexing as they are parsed. ElasticSearch has a queue of 50 pending operations; if that queue overflows, it starts rejecting new objects.
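Since ElasticSearch tolerates up to 50 pending operations, I assume a bounded parallelism above 1 would also be safe: `mapAsync(n)` keeps at most `n` Futures in flight and backpressures beyond that. A toy sketch, where the hypothetical `indexBatch` stands in for the real ElasticSearch call and the counters record how many operations actually ran concurrently:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("bounded")
import system.dispatcher

val inFlight = new AtomicInteger(0)
val maxSeen  = new AtomicInteger(0)

// Stand-in for the ES indexing call; tracks peak concurrency.
def indexBatch(values: Seq[Int]): Future[Int] = Future {
  val n = inFlight.incrementAndGet()
  maxSeen.getAndUpdate(m => math.max(m, n))
  Thread.sleep(10)
  inFlight.decrementAndGet()
  values.size
}

val counts = Await.result(
  Source(1 to 100)
    .grouped(5)
    .mapAsync(4)(indexBatch) // at most 4 Futures in flight, well under 50
    .runWith(Sink.seq),
  10.seconds)

println(counts.sum)         // 100: every element was indexed
println(maxSeen.get() <= 4) // true: parallelism never exceeded 4
Await.ready(system.terminate(), 10.seconds)
```

With `mapAsync` the outputs stay in upstream order; if ordering doesn't matter for indexing, `mapAsyncUnordered` behaves the same with respect to backpressure.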