Pipeline: multi-threaded consumers

I am writing a program that counts the frequencies of n-grams in a corpus. I already have a function that consumes a stream of tokens and produces n-grams of a given order:

 ngram :: Monad m => Int -> Conduit t m [t]
 trigrams = ngram 3
 countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int)
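For intuition, the sliding-window behaviour of `ngram` and the counting done by `countFreq` can be sketched with pure list functions. This is only an illustrative analogue, not the conduit code from the question; the names `ngrams` and `countFreq'` are made up here:

```haskell
import Data.List (tails)
import qualified Data.Map as Map

-- Pure analogue of the streaming `ngram`: every window of length n.
ngrams :: Int -> [t] -> [[t]]
ngrams n = filter ((== n) . length) . map (take n) . tails

-- Pure analogue of `countFreq`: frequency map over those windows.
countFreq' :: Ord t => [[t]] -> Map.Map [t] Int
countFreq' = Map.fromListWith (+) . map (\g -> (g, 1))
```

For example, `countFreq' (ngrams 2 "abab")` maps `"ab"` to 2 and `"ba"` to 1.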

At the moment, I can connect one stream consumer to the stream source:

 tokens --- trigrams --- countFreq 

How can I connect multiple stream consumers to the same stream source? I would like to have something like this:

             .--- unigrams --- countFreq
             |--- bigrams  --- countFreq
  tokens ----|--- trigrams --- countFreq
             '--- ...      --- countFreq

As a bonus, each consumer would run in parallel.

EDIT: Thanks to Peter, I came up with this solution:

 spawnMultiple orders = do
     chan <- atomically newBroadcastTMChan
     results <- forM orders $ \_ -> newEmptyMVar
     threads <- forM (zip results orders) $ forkIO . uncurry (sink chan)
     forkIO . runResourceT $
         sourceFile "test.txt" $$ javascriptTokenizer =$ sinkTMChan chan
     forM results readMVar
   where
     sink chan result n = do
         chan' <- atomically $ dupTMChan chan
         freqs <- runResourceT $ sourceTMChan chan' $$ ngram n =$ frequencies
         putMVar result freqs
haskell nlp conduit

1 answer

I assume that you want all your consumers to receive all the values.

I would suggest:

  • Use newBroadcastTMChan to create a new Control.Concurrent.STM.TMChan (stm-chans) channel.
  • Use this channel to create a sink with sinkTMChan from Data.Conduit.TMChan (stm-conduit) for your main producer.
  • For each consumer, use dupTMChan to create its own read copy of the channel. Start a new thread that reads this copy using sourceTMChan .
  • Collect the results from your threads.
  • Make sure your consumers can read data as fast as it is produced, otherwise the channel can grow without bound and you may run out of memory.
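The dup-before-produce pattern behind this recipe can be sketched with plain `stm` primitives alone, with no conduit involved. The sketch below is a hypothetical helper, using `newBroadcastTChan`/`dupTChan` from the `stm` package in place of the TMChan variants the answer names; `broadcastDemo` is a made-up name for illustration:

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Monad (forM, forM_, replicateM)

-- Broadcast every element of xs to n consumer threads; each consumer
-- returns the full list it saw.  Each dupTChan happens in the main
-- thread *before* the producer writes, so no consumer misses items
-- (a broadcast copy only sees writes made after it was duplicated).
broadcastDemo :: [a] -> Int -> IO [[a]]
broadcastDemo xs n = do
    chan    <- atomically newBroadcastTChan
    results <- replicateM n newEmptyMVar
    forM_ results $ \mv -> do
        chan' <- atomically (dupTChan chan)      -- private read copy
        forkIO $ do
            ys <- forM xs (\_ -> atomically (readTChan chan'))
            putMVar mv ys
    mapM_ (atomically . writeTChan chan) xs      -- the single producer
    mapM takeMVar results                        -- wait for all consumers
```

Each consumer here reads exactly `length xs` items because a plain `TChan` cannot be closed; with `TMChan` from stm-chans you would instead close the channel and read until `Nothing`.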

(I have not tried it, so let us know how it works.)


Update: One way to get the results back is to create an MVar for each consumer thread. Each thread would put its result into its MVar when it finishes, and your main thread would takeMVar on all of these MVar s, thus waiting for every thread to complete. For example, if vars is the list of your MVar s, the main thread would run mapM takeMVar vars to collect all the results.
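That fork/putMVar/takeMVar pattern can be packaged as a small helper using only base; `collectResults` is a hypothetical name for illustration:

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Monad (forM)

-- Run each IO action in its own thread; block until all results arrive.
-- Results come back in the same order as the input actions.
collectResults :: [IO a] -> IO [a]
collectResults jobs = do
    vars <- forM jobs $ \job -> do
        mv <- newEmptyMVar
        _  <- forkIO (job >>= putMVar mv)
        return mv
    mapM takeMVar vars
```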
