I am writing a program that counts the frequencies of n-grams in a corpus. I already have a conduit that consumes a stream of tokens and produces n-grams of a given order, and a consumer that counts how often each n-gram occurs:

    ngram :: Monad m => Int -> Conduit t m [t]
    trigrams = ngram 3
    countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int)

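The question only gives the signatures, so here is a minimal sketch of how `ngram` and `countFreq` might be implemented. It assumes conduit 1.3 (`ConduitT`, `.|`, `runConduitPure`) rather than the older `Conduit`/`Consumer` synonyms used above, and `countNgrams` is a hypothetical helper added only to run the pipeline over an in-memory list.

```haskell
import Data.Conduit (ConduitT, await, yield, (.|), runConduitPure)
import qualified Data.Conduit.List as CL
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map

-- Emit every window of n consecutive tokens (assumes n >= 1).
ngram :: Monad m => Int -> ConduitT t [t] m ()
ngram n = go []
  where
    -- 'window' holds the most recent tokens, at most n - 1 of them
    go window = do
      mt <- await
      case mt of
        Nothing -> return ()
        Just t -> do
          let window' = window ++ [t]
          if length window' == n
            then yield window' >> go (drop 1 window')
            else go window'

-- Count how often each n-gram occurs, using a strict left fold.
countFreq :: (Ord t, Monad m) => ConduitT [t] o m (Map [t] Int)
countFreq = CL.fold (\acc g -> Map.insertWith (+) g 1 acc) Map.empty

-- Hypothetical helper: run the pipeline over a list of tokens.
countNgrams :: Ord t => Int -> [t] -> Map [t] Int
countNgrams n toks = runConduitPure (CL.sourceList toks .| ngram n .| countFreq)
```

For example, `countNgrams 2 (words "a b a b")` yields a map with `["a","b"]` counted twice and `["b","a"]` once. Appending to a list window costs O(n) per token, which is negligible for small orders.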
Currently, I can connect a single consumer to the token source:

    tokens --- trigrams --- countFreq

How can I connect multiple consumers to the same source? I would like something like this:

                 .--- unigrams --- countFreq
                 |--- bigrams  --- countFreq
    tokens ------|--- trigrams --- countFreq
                 '--- ...      --- countFreq

Ideally, the consumers would also run in parallel.
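If the consumers only need to see the same stream, and do not have to run on separate threads, conduit's `ZipSink` applicative can do the fan-out in a single pass over the input. A minimal sketch, assuming conduit 1.3; `fanOut` and `summarise` are hypothetical names, and the demo uses plain folds so the block stands alone:

```haskell
import Data.Conduit (ConduitT, ZipSink (..), (.|), runConduitPure)
import qualified Data.Conduit.List as CL
import Data.Void (Void)

-- Feed one input stream to every sink in the list, in a single pass.
-- ZipSink's Applicative instance hands each incoming element to all sinks.
fanOut :: Monad m => [ConduitT i Void m r] -> ConduitT i Void m [r]
fanOut sinks = getZipSink (traverse ZipSink sinks)

-- Demo: sum, count and maximum of the same stream, computed together.
summarise :: [Int] -> [Int]
summarise xs = runConduitPure $
  CL.sourceList xs .| fanOut
    [ CL.fold (+) 0
    , CL.fold (\n _ -> n + 1) 0
    , CL.fold max minBound
    ]
```

Assuming `ngram` and `countFreq` are written against the same conduit version, the diagram above would become something like `fanOut [ngram n .| countFreq | n <- [1, 2, 3]]`. The sinks advance in lockstep on one thread, so this reads the input once but gives no parallelism.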
EDIT: Thanks to Peter, I came up with this solution.
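
    spawnMultiple orders = do
        chan    <- atomically newBroadcastTMChan
        results <- forM orders $ \_ -> newEmptyMVar
        -- Duplicate the broadcast channel once per consumer *before* the
        -- producer starts; items written before a dup are never seen by it.
        chans   <- forM orders $ \_ -> atomically (dupTMChan chan)
        forM_ (zip3 chans results orders) $ \(c, r, n) -> forkIO (sink c r n)
        -- Producer: tokenize the file and broadcast the tokens.
        _ <- forkIO . runResourceT $
            sourceFile "test.txt" $$ javascriptTokenizer =$ sinkTMChan chan
        -- Block until every consumer has stored its frequency map.
        forM results readMVar
      where
        -- Consumer: build order-n n-grams from its own copy of the stream.
        sink chan' result n = do
            freqs <- runResourceT $ sourceTMChan chan' $$ ngram n =$ frequencies
            putMVar result freqs
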
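The same broadcast pattern can be sketched with only `base` and `containers`, using `Control.Concurrent.Chan` in place of stm-conduit's `TMChan`. This is an illustrative substitute, not the code above: `countFrom` and `countAllConcurrently` are hypothetical, tokens come from a list instead of a file, and `Nothing` marks the end of the stream because a plain `Chan` cannot be closed. Each consumer's `dupChan` is created before the producer writes anything, since a duplicate only sees items written after it was made.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, dupChan, writeChan, readChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM, forM_)
import qualified Data.Map.Strict as Map

-- Read tokens from a channel until the end-of-stream marker (Nothing),
-- counting n-grams of order n incrementally (assumes n >= 1).
countFrom :: Ord t => Int -> Chan (Maybe t) -> IO (Map.Map [t] Int)
countFrom n ch = go [] Map.empty
  where
    go window acc = do
      mt <- readChan ch
      case mt of
        Nothing -> return acc
        Just t -> do
          let window' = window ++ [t]
          if length window' == n
            then go (drop 1 window') $! Map.insertWith (+) window' 1 acc
            else go window' acc

-- Broadcast one token stream to one worker thread per n-gram order.
countAllConcurrently :: Ord t => [Int] -> [t] -> IO [Map.Map [t] Int]
countAllConcurrently orders tokens = do
  source <- newChan
  -- One duplicate per consumer, created BEFORE any token is written,
  -- so every consumer sees the whole stream.
  workers <- forM orders $ \n -> do
    ch  <- dupChan source
    out <- newEmptyMVar
    _   <- forkIO (countFrom n ch >>= putMVar out)
    return out
  -- Producer: write every token, then the end-of-stream marker.
  forM_ tokens (writeChan source . Just)
  writeChan source Nothing
  -- Wait for each worker's result, in the order of 'orders'.
  mapM takeMVar workers
```

The workers run in parallel only when the program is built with `-threaded` and run with `+RTS -N`. Because nothing ever reads `source` itself, it keeps every written token alive until it becomes garbage; the write-only `newBroadcastTChan` in `stm` exists to avoid exactly that.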
haskell nlp conduit
Svenk