Coordinating when the writers and readers of a channel shut down is a non-trivial problem, but you can reuse the solution from the pipes ecosystem: the pipes-concurrency library. It provides several pipes-independent utilities that you can combine with conduit to pass data between readers and writers, so that each side automatically learns when to clean up, and you can also shut down either side manually.
The key function in the pipes-concurrency library is spawn. Its type:
spawn :: Buffer a -> IO (Output a, Input a)
Buffer indicates which basic STM channel abstraction to use. Judging by your example, it seems like you want a Bounded buffer:
spawn (Bounded 8) :: IO (Output a, Input a)
The type variable a can be anything here, for example ByteString:
spawn (Bounded 8) :: IO (Output ByteString, Input ByteString)
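Input and Output behave like a mailbox. You add messages to the mailbox by sending data to an Output with send, and you take messages out of the mailbox (in FIFO order) by receiving data from an Input with recv. Once the mailbox is sealed, send returns False and recv returns Nothing.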
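As a minimal sketch of the mailbox behaviour described above (the name fifoDemo is made up for illustration, and atomically is imported from the stm package directly), two values are sent to the Output and then read back from the Input in the same order:

```haskell
import Control.Concurrent.STM (atomically)
import Pipes.Concurrent (Buffer (Bounded), recv, send, spawn)

-- Hypothetical helper: push two values into the mailbox, then read them back.
fifoDemo :: IO (Bool, Maybe Int, Maybe Int)
fifoDemo = do
    (output, input) <- spawn (Bounded 8)
    accepted <- atomically (send output 1)  -- True: the mailbox is still open
    _        <- atomically (send output 2)
    first    <- atomically (recv input)     -- oldest message comes out first
    second   <- atomically (recv input)
    return (accepted, first, second)

main :: IO ()
main = fifoDemo >>= print  -- prints (True,Just 1,Just 2)
```

Both send and recv are STM actions, so they can be combined with other STM code inside a single atomically block if needed.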
A distinctive feature of pipes-concurrency is that it lets the garbage collector automatically seal the mailbox once the mailbox has no readers or no writers left. This avoids a common source of deadlocks.
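Automatic sealing depends on when the garbage collector runs, so it is hard to show in a deterministic example. The sketch below instead uses spawn', the variant of spawn that also returns an STM action for sealing the mailbox by hand, to show what a sealed mailbox looks like from both sides. The name sealDemo is made up for illustration.

```haskell
import Control.Concurrent.STM (atomically)
import Pipes.Concurrent (Buffer (Bounded), recv, send, spawn')

-- Hypothetical helper: seal a mailbox manually and observe both ends.
sealDemo :: IO (Bool, Maybe Int, Maybe Int)
sealDemo = do
    (output, input, seal) <- spawn' (Bounded 8)
    _        <- atomically (send output 1)
    atomically seal                          -- no further writes are accepted
    rejected <- atomically (send output 2)   -- False: the mailbox is sealed
    buffered <- atomically (recv input)      -- Just 1: buffered data is still delivered
    end      <- atomically (recv input)      -- Nothing: sealed and empty
    return (rejected, buffered, end)

main :: IO ()
main = sealDemo >>= print  -- prints (False,Just 1,Nothing)
```

Readers therefore see every message that was accepted before the seal, and only then the end-of-stream signal.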
If you were using the pipes ecosystem, you would normally use two higher-level utilities, toOutput and fromInput, to write to and read from the mailbox.
However, since the core machinery is pipes-independent, you can write equivalent conduit versions of these functions:
    import Control.Monad.Trans.Class (lift)
    import Data.Conduit
    import Pipes.Concurrent

    -- Forward every incoming value to the mailbox
    toOutput' :: Output a -> Sink a IO ()
    toOutput' o = awaitForever (\a -> lift $ atomically $ send o a)

    -- Yield values from the mailbox until it is sealed and empty
    fromInput' :: Input a -> Source IO a
    fromInput' i = do
        ma <- lift $ atomically $ recv i
        case ma of
            Nothing -> return ()
            Just a  -> do
                yield a
                fromInput' i
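On conduit 1.3 and later, Sink, Source and $$ are deprecated in favour of ConduitT, runConduit and .|. The following is a sketch of the same two adapters in that style, not a drop-in from either library: the names toOutputC, fromInputC and roundTrip are made up, the writer stops as soon as send reports a sealed mailbox, and the mailbox is sealed explicitly with spawn' so that the example terminates without relying on garbage collection timing.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Monad (when)
import Control.Monad.IO.Class (liftIO)
import Data.Conduit (ConduitT, await, runConduit, yield, (.|))
import qualified Data.Conduit.List as CL
import Pipes.Concurrent (Buffer (Bounded), Input, Output, recv, send, spawn')

-- Forward upstream values to the mailbox; stop once it is sealed.
toOutputC :: Output a -> ConduitT a o IO ()
toOutputC o = loop
  where
    loop = do
        ma <- await
        case ma of
            Nothing -> return ()
            Just a  -> do
                alive <- liftIO (atomically (send o a))
                when alive loop

-- Yield values from the mailbox until it is sealed and empty.
fromInputC :: Input a -> ConduitT i a IO ()
fromInputC i = loop
  where
    loop = do
        ma <- liftIO (atomically (recv i))
        case ma of
            Nothing -> return ()
            Just a  -> yield a >> loop

-- Hypothetical round trip: one writer thread, the calling thread reads.
roundTrip :: IO [Int]
roundTrip = do
    (output, input, seal) <- spawn' (Bounded 8)
    _ <- forkIO $ do
        runConduit (CL.sourceList [1 .. 5] .| toOutputC output)
        atomically seal  -- tell the reader that no more data will arrive
    runConduit (fromInputC input .| CL.consume)

main :: IO ()
main = roundTrip >>= print  -- prints [1,2,3,4,5]
```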
Then your main function will look something like this:
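    -- Additional imports, alongside the ones above
    import qualified Data.Conduit.Binary as DCB
    import Network.Socket (Family (AF_UNIX), SockAddr (SockAddrUnix), SocketType (Stream), bind, socket)
    import System.Directory (removeFile)
    import System.IO (stdout)

    main :: IO ()
    main = do
        soc <- socket AF_UNIX Stream 0
        bind soc (SockAddrUnix "mysock")
        (output, input) <- spawn (Bounded 8)
        -- Writer thread: socket -> mailbox
        forkIO $ readFromSocket soc $$ toOutput' output
        -- Reader (main thread): mailbox -> stdout
        fromInput' input $$ DCB.sinkHandle stdout
        removeFile "mysock"
... where readFromSocket is some Source that reads from your Socket.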
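One possible shape for such a source, as a sketch only: it assumes a connected stream socket, an arbitrary chunk size of 4096 bytes, and the conduit 1.3 ConduitT type. The demo helper is made up and uses socketPair so that the example runs on its own without the listen/accept setup a real server would need.

```haskell
import Control.Monad (unless)
import Control.Monad.IO.Class (liftIO)
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC
import Data.Conduit (ConduitT, runConduit, yield, (.|))
import qualified Data.Conduit.List as CL
import Network.Socket (Family (AF_UNIX), Socket, SocketType (Stream), close, socketPair)
import Network.Socket.ByteString (recv, sendAll)

-- Yield chunks read from the socket until the peer closes the connection
-- (recv returns an empty ByteString at end of stream).
readFromSocket :: Socket -> ConduitT i ByteString IO ()
readFromSocket sock = loop
  where
    loop = do
        chunk <- liftIO (recv sock 4096)  -- 4096 is an arbitrary chunk size
        unless (B.null chunk) $ do
            yield chunk
            loop

-- Hypothetical demo: write to one end of a socket pair, read from the other.
demo :: IO ByteString
demo = do
    (a, b) <- socketPair AF_UNIX Stream 0
    sendAll a (BC.pack "hello ")
    sendAll a (BC.pack "world")
    close a  -- signals end of stream to the reading side
    chunks <- runConduit (readFromSocket b .| CL.consume)
    close b
    return (B.concat chunks)

main :: IO ()
main = demo >>= BC.putStrLn  -- prints hello world
```

The conduit-extra package also ships a ready-made sourceSocket in Data.Conduit.Network that could take the place of this hand-written loop.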
You can then freely write to the Output from other data sources as well, without having to coordinate them or to dispose of the Input or Output properly when you are done.
To learn more about pipes-concurrency, I recommend reading the official tutorial.
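To illustrate several data sources feeding one Output, here is a sketch with two writer threads sharing a mailbox. The names multiWriter and drain are made up, and the mailbox is sealed explicitly through spawn' once both writers have finished, purely so that the example terminates deterministically instead of waiting for the garbage collector.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (atomically)
import Data.List (sort)
import Pipes.Concurrent (Buffer (Bounded), Input, recv, send, spawn')

-- Read from the mailbox until it is sealed and empty.
drain :: Input a -> IO [a]
drain input = do
    next <- atomically (recv input)
    case next of
        Nothing -> return []
        Just a  -> (a :) <$> drain input

-- Hypothetical demo: two writers share one Output, one reader drains the Input.
multiWriter :: IO [Int]
multiWriter = do
    (output, input, seal) <- spawn' (Bounded 8)
    done <- newEmptyMVar
    let writer xs = forkIO $ do
            mapM_ (atomically . send output) xs  -- blocks while the buffer is full
            putMVar done ()
    _ <- writer [1 .. 10]
    _ <- writer [101 .. 110]
    -- Seal the mailbox only after both writers have reported completion.
    _ <- forkIO $ do
        takeMVar done
        takeMVar done
        atomically seal
    drain input

main :: IO ()
main = do
    received <- multiWriter
    -- The interleaving of the two writers is not deterministic, so sort first.
    print (sort received == [1 .. 10] ++ [101 .. 110])  -- prints True
```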