Scalaz-stream how to implement the request-then-wait-reply-tcp client request

I want to implement a client application that first sends a request to the server and then waits for a response (similar to http)

My client process may be

val topic = async.topic[ByteVector] val client = topic.subscribe 

Here is the api

 trait Client { val incoming = tcp.connect(...)(client) val reqBus = topic.pubsh() def ask(req: ByteVector): Task[Throwable \/ ByteVector] = { (tcp.writes(req).flatMap(_ => tcp.reads(1024))).to(reqBus) ??? } } 

Then, how to implement the remainder of ask ?

+5
source share
1 answer

Usually, the implementation is performed with the publication of the message through the receiver, and then expects some kind of response to some source, for example, your topic.

There are actually a lot of idioms in our code:

 def reqRply[I,O,O2](src:Process[Task,I],sink:Sink[Task,I],reply:Process[Task,O])(pf: PartialFunction[O,O2]):Process[Task,O2] = { merge.mergeN(Process(reply, (src to sink).drain)).collectFirst(pf) } 

In fact, this is the first hook to respond to a thread, waiting for any O received to confirm our request. Then we post I and advise pf so that any incoming O will eventually be transferred to O2 and then completed.

+6
source

Source: https://habr.com/ru/post/1211172/


All Articles