Channel broadcast

A few days ago I asked a question about this. Now I need a clean single-threaded version of this function:

I repeat, I need a function that sends each received value to each receiver and collects their results. A typical function signature should look something like this:

broadcast :: [Sink amb] -> Sink am [b] 

Best Sven


PS This is not a sequence , I tried this:

 > C.sourceList [1..100] $$ sequence [C.fold (+) 0, C.fold (+) 0] [5050, 0] 

Expected Result:

 [5050, 5050] 

PPS zipSinks gives the desired result, but it only works with tuples:

 > C.sourceList [1..100] $$ C.zipSinks (C.fold (+) 0) (C.fold (+) 0) (5050, 5050) 
+8
haskell conduit
source share
1 answer

Basically all we need to do is redefine the sequence , but instead of zipSinks instead of the initial sequencing operation:

 import Data.Conduit as C import Data.Conduit.List as C import Data.Conduit.Util as C fromPairs :: (Functor f) => f [a] -- ^ an empty list to start with -> (fa -> f [a] -> f (a, [a])) -- ^ a combining function -> [fa] -- ^ input list -> f [a] -- ^ combined list fromPairs empty comb = g where g [] = empty g (x:xs) = uncurry (:) `fmap` (x `comb` g xs) 

Now creating broadcast just applies fromPairs to zipSinks :

 broadcast :: (Monad m) => [Sink amb] -> Sink am [b] broadcast = fromPairs (return []) zipSinks 

And we can do something like

 main = C.sourceList [1..100] $$ broadcast [C.fold (+) 0, C.fold (*) 1] 

Update: We can see that fromPairs just looks like sequenceA , and so we can push the idea even further. Let us define an approximating functional zip on channels similar to ZipList :

 import Control.Applicative import Control.Monad import Data.Conduit import Data.Conduit.Util import Data.Traversable (Traversable(..), sequenceA) newtype ZipSink imr = ZipSink { getZipSink :: Sink imr } instance Monad m => Functor (ZipSink im) where fmap f (ZipSink x) = ZipSink (liftM fx) instance Monad m => Applicative (ZipSink im) where pure = ZipSink . return (ZipSink f) <*> (ZipSink x) = ZipSink $ liftM (uncurry ($)) $ zipSinks fx 

Then broadcast becomes as simple as

 broadcast :: (Traversable f, Monad m) => f (Sink imr) -> Sink im (fr) broadcast = getZipSink . sequenceA . fmap ZipSink 
+9
source share

All Articles