Conduits and sockets: allow multiple connections

Here is the code that implements a small receiving server using conduit , network-conduit and stm-conduit . It receives data on the socket and then passes it through the STM channel to the main stream.

 import Control.Concurrent (forkIO) import Control.Concurrent.STM (atomically) import Control.Concurrent.STM.TBMChan (newTBMChan, TBMChan()) import Control.Monad (void) import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.Trans.Class import Data.ByteString (ByteString) import qualified Data.ByteString as B import Data.Conduit import qualified Data.Conduit.Binary as DCB import Data.Conduit.Extra.Resumable import Data.Conduit.Network (sourceSocket) import Data.Conduit.TMChan (sinkTBMChan, sourceTBMChan, mergeSources) import System.Directory (removeFile) import System.IO type BSChan = TBMChan ByteString listenSocket :: Socket -> Int -> IO BSChan listenSocket soc bufSize = do chan <- atomically $ newTBMChan bufSize forkListener chan return chan where forkListener chan = void . forkIO $ listen soc 2 >> loop where loop = do (conn, _) <- accept soc sourceSocket conn $$ sinkTBMChan chan close conn loop main :: IO () main = do soc <- socket AF_UNIX Stream 0 bind soc (SockAddrUnix "mysock") socChan <- listenSocket soc 8 sourceTBMChan socChan $$ DCB.sinkHandle stdout removeFile "mysock" 

(In a real application, the data stream from the socket is combined with some others, so I do not process it directly in the listener).

The problem is that when I expected that it would remain open until the main thread was killed, instead it would exit after the first message was received on the socket. I can’t understand why he does this, unless he drops the receiver (from the second to the last line), as soon as he sees the end of the first data stream. Can I convince him not to do this? There are some things in the conduit about making the source renewable, but not the sink.

+7
haskell sockets conduit
source share
4 answers

From sinkTBMChan document:

When the sink is closed, the channel closes.

So, when the first socket descriptor closes, it causes Source to sourceSocket , closing the connected receiver, which in turn closes TBMChan , which extends to the sinkHandle , stopping the receiver.

The easiest way to solve this is probably to change your loop to a custom source that does not close between connections and connects this source to TBMChan .

 listenSocket :: Socket -> Int -> IO BSChan listenSocket soc bufSize = do chan <- atomically $ newTBMChan bufSize forkListener chan return chan where forkListener chan = void . forkIO $ do listen soc 2 loop $$ sinkTBMChan chan loop = do (conn, _) <- liftIO $ accept soc sourceSocket conn liftIO $ close conn loop 
+6
source share

Coordinating the closing of writers and readers from the channel is a non-trivial problem, but you can reuse the solution from the pipes ecosystem to solve this problem, which is to use the pipes-concurrency library. This library provides several pipe-independent utilities that you can reuse with conduit libraries to exchange information between readers and writers, so that each side automatically correctly knows when to clean, and you can also manually clean both sides.

The key function used in the pipes-concurrency library is spawn . His type:

 spawn :: Buffer a -> IO (Output a, Input a) 

Buffer indicates which basic STM channel abstraction to use. Judging by your example, it seems like you want a Bounded buffer:

 spawn (Bounded 8) :: IO (Output a, Input a) 

a can be anything in this case, so it can be ByteString , for example:

 spawn (Bounded 8) :: IO (Output ByteString, Input ByteString) 

Input and Output behave like a mailbox. You add messages to the mailbox using send data in Output and you send messages from the mailbox (in FIFO order) using recv data from Input s:

 -- Returns `False` if the mailbox is sealed send :: Output a -> a -> STM Bool -- Returns `Nothing` if the mailbox is sealed recv :: Input a -> STM (Maybe a) 

A distinctive feature of pipes-concurrency is that it allows the garbage collector to automatically seal the mailbox if there are no readers or writers in the mailbox. This avoids a common source of deadlocks.

If you used the pipes ecosystem, you usually used the following two higher-level utilities to read and write to the mailbox.

 -- Stream values into the mailbox until it is sealed toOutput :: Output a -> Consumer a IO () -- Stream values from the mailbox until it is sealed fromInput :: Input a -> Producer a IO () 

However, since the main core of pipes is independent, you can rewrite the equivalent versions of these conduit functions:

 import Control.Monad.Trans.Class (lift) import Data.Conduit import Pipes.Concurrent toOutput' :: Output a -> Sink a IO () toOutput' o = awaitForever (\a -> lift $ atomically $ send oa) fromInput' :: Input a -> Source IO a fromInput' i = do ma <- lift $ atomically $ recv i case ma of Nothing -> return () Just a -> do yield a fromInput' i 

Then your main function will look something like this:

 main :: IO () main = do soc <- socket AF_UNIX Stream 0 bind soc (SockAddrUnix "mysock") (output, input) <- spawn (Bounded 8) forkIO $ readFromSocket soc $$ toOutput output fromInput input $$ DCB.sinkHandle stdout removeFile "mysock" 

... where readFromSocket will be some Source that is read from your Socket .

You can then freely write to Output using other data sources and not worry about having to coordinate them or properly dispose of Input or Output when you are done.

To learn more about pipes-concurrency , I recommend reading the official official tutorial .

+4
source share

So, here is one answer that does not imply the creation of a renewable sink. sourceSocket in network-conduit allows one connection, but we can implement repeated behavior inside sourceSocket (sorry for the code, I think it needs to be cleaned up, but at least it works!):

 sourceSocket :: (MonadIO m) => Socket -> Producer m ByteString sourceSocket sock = loop where loop = do (conn, _) <- lift . liftIO $ accept sock loop' conn lift . liftIO $ close conn loop loop' conn = do bs <- lift . liftIO $ recv conn 4096 if B.null bs then return () else yield bs >> loop' conn 

One problem is that it never quits (until the program dies). This is not a problem in my use case, as the socket should continue to listen to the life of the program.

+1
source share

I think @shang's answer is correct, I would go a little further and say that writeTBMChan behavior looks like the best culprit here. I would recommend changing it so that TBMChan not automatically closed. Simple implementation of this idea:

 sinkTBMChan chan = awaitForever $ liftIO . atomically . writeTBMChan chan 

If you use this in your program, it will work as expected.

+1
source share

All Articles