Usually this is implemented by publishing the message through a sink and then awaiting some kind of reply on some source, for example your topic.
We actually use this idiom a lot in our code:
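import scalaz.concurrent.Task
import scalaz.stream.{Process, Sink, merge}

def reqRply[I, O, O2](src: Process[Task, I], sink: Sink[Task, I], reply: Process[Task, O])(
    pf: PartialFunction[O, O2]): Process[Task, O2] =
  // Run the reply stream alongside the publishing stream; stop at the first reply pf accepts.
  merge.mergeN(Process(reply, (src to sink).drain)).collectFirst(pf)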
Essentially, this first hooks into the reply stream, waiting for any O that confirms our request was sent. Then it publishes I and consults pf for each incoming O, so that the first matching O is translated to O2, after which the process terminates.
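As a rough illustration of how this might be wired to a topic, here is a minimal sketch. It assumes scalaz-stream 0.7/0.8, where `async.topic` accepts an empty argument list; the `Msg`, `Request` and `Ack` types and the `ReqReplyExample` object are hypothetical names invented for the example and are not part of the library.

```scala
import scalaz.concurrent.Task
import scalaz.stream.{Process, Sink, async, merge}

object ReqReplyExample {
  // Hypothetical message protocol, for illustration only.
  sealed trait Msg
  final case class Request(id: Int) extends Msg
  final case class Ack(id: Int) extends Msg

  // Same helper as in the answer, repeated so the sketch is self-contained.
  def reqRply[I, O, O2](src: Process[Task, I], sink: Sink[Task, I], reply: Process[Task, O])(
      pf: PartialFunction[O, O2]): Process[Task, O2] =
    merge.mergeN(Process(reply, (src to sink).drain)).collectFirst(pf)

  // Assumption: one topic carries both requests and acknowledgements.
  val topic = async.topic[Msg]()

  // Describes: publish Request(1) to the topic, then yield the id of the first Ack seen.
  // Nothing happens until this process is actually run.
  val confirmed: Process[Task, Int] =
    reqRply[Msg, Msg, Int](Process.emit(Request(1)), topic.publish, topic.subscribe) {
      case Ack(id) => id
    }
}
```

Running `confirmed` only completes once some other party publishes an `Ack` to the same topic; without a responder it waits indefinitely, so in practice you would probably combine it with a timeout.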