Multiple Inlet Plasma

I am trying to create a channel that can consume multiple input streams. I need to be able to wait on one or the other input stream in a specific order (for example, without alternating), making zip useless. There is nothing parallel or non-deterministic here: I am waiting for one thread or another. I want to be able to write code similar to the following (where awaitA and awaitB are waiting for the first or second input stream, respectively):

 do _ <- awaitA x <- awaitA y <- awaitB yield (x,y) _ <- awaitB _ <- awaitB y' <- awaitB yield (x,y') 

The best solution I have is to make the inner monad another channel, for example.

 foo :: Sink i1 (ConduitM i2 om) () 

What allow

 awaitA = await awaitB = lift await 

And that basically works. Unfortunately, this seems to make it very difficult to connect to the internal conduit before the external channel is fully connected. The first thing I tried was:

 fuseInner :: Monad m => Conduit i2' m i2 -> Sink i1 (ConduitM i2 om) () -> Sink i1 (ConduitM i2' om) () fuseInner x = transPipe (x =$=) 

But this does not work, at least when x has a state, since (x =$=) is executed several times, effectively restarting x every time.

Is there a way to write fuseInner without going into the inside of the conduit (it would seem pretty dirty)? Is there a better way to handle multiple input streams? I just got far from what the channel was designed for?

Thank!

+8
haskell conduit
Mar 24 '13 at 2:37
source share
2 answers

If you want to combine two IO generated streams, then Gabriel's comment is the solution.

Otherwise, you cannot wait for both threads that first generate the value. Pipelines are single-threaded and deterministic - it processes only one pipe at a time. But you can create a function that interleaves two threads, allowing them to decide when to switch:

 {-# OPTIONS_GHC -fwarn-incomplete-patterns #-} import Control.Monad (liftM) import Data.Conduit.Internal ( Pipe (..), Source, Sink, injectLeftovers, ConduitM (..), mapOutput, mapOutputMaybe ) -- | Alternate two given sources, running one until it yields `Nothing`, -- then switching to the other one. merge :: Monad m => Source m (Maybe a) -> Source m (Maybe b) -> Source m (Either ab) merge (ConduitM l) (ConduitM r) = ConduitM $ goL lr where goL :: Monad m => Pipe () () (Maybe a) () m () -> Pipe () () (Maybe b) () m () -> Pipe () () (Either ab) () m () goL (Leftover l ()) r = goL lr goL (NeedInput _ c) r = goL (c ()) r goL (PipeM mx) r = PipeM $ liftM (`goL` r) mx goL (Done _) r = mapOutputMaybe (liftM Right) r goL (HaveOutput cf (Just o)) r = HaveOutput (goL cr) f (Left o) goL (HaveOutput cf Nothing) r = goR cr -- This is just a mirror copy of goL. We should combine them together to -- avoid code repetition. goR :: Monad m => Pipe () () (Maybe a) () m () -> Pipe () () (Maybe b) () m () -> Pipe () () (Either ab) () m () goR l (Leftover r ()) = goR lr goR l (NeedInput _ c) = goR l (c ()) goR l (PipeM mx) = PipeM $ liftM (goR l) mx goR l (Done _) = mapOutputMaybe (liftM Left) l goR l (HaveOutput cf (Just o)) = HaveOutput (goR lc) f (Right o) goR l (HaveOutput cf Nothing) = goL lc 

It processes one source until it returns Nothing , and then switches to another, etc. If one of the sources is completed, the other will be processed to the end.

As an example, we can combine and alternate two lists:

 import Control.Monad.Trans import Data.Conduit (($$), awaitForever) import Data.Conduit.List (sourceList) main = (merge (sourceList $ concatMap (\x -> [Just x, Just x, Nothing]) [ 1..10]) (sourceList $ concatMap (\x -> [Just x, Nothing]) [101..110]) ) $$ awaitForever (\x -> lift $ print x) 



If you need multiple sources, merge can be adapted to something like

 mergeList :: Monad m => [Source m (Maybe a)] -> Source ma 

which will cycle through this list of sources until they are all completed.

+3
Mar 24 '13 at 9:45
source share

This can be done by plunging into the interior of the pipeline. I wanted to avoid this because it looked very dirty. Based on the answers here, there seems to be no way around this (but I really appreciate a cleaner solution).

The key difficulty is that (x =$=) is a pure function, but in order for transPipe to give the correct answer, it needs some kind of function with stateful, function-like function:

 data StatefulMorph mn = StatefulMorph { stepStatefulMorph :: forall a. ma -> n (StatefulMorph mn, a) , finalizeStatefulMorph :: n () } 

The StatefulMorph mn step takes a value in m and returns in n both this value and the next StatefulMorph , which should be used to convert the next value of m . The last StatefulMorph must be finalized (which, in the case of "stateful (x =$=) " terminates channel x .

Formation merging can be implemented as StatefulMorph , using the code for pipeL with minor changes. Signature:

 fuseStateful :: Monad m => Conduit amb -> StatefulMorph (ConduitM bcm) (ConduitM acm) 

I also need a replacement for transPipe (a special case of hoist ), which uses StatefulMorph values โ€‹โ€‹instead of functions.

 class StatefulHoist t where statefulHoist :: (Monad m, Monad n) => StatefulMorph mn -> tmr -> tnr 

A StatefulHoist instance for ConduitM io can be written using transPipe code with some minor changes.

fuseInner then easy to implement.

 fuseInner :: Monad m => Conduit amb -> ConduitM io (ConduitM bcm) r -> ConduitM io (ConduitM acm) r fuseInner left = statefulHoist (fuseStateful left) 

I wrote a more detailed explanation here and posted the full code. If someone can come up with a cleaner solution, or one that uses the public conduit API, submit it.

Thanks for all the suggestions and input!

+3
Mar 25 '13 at 1:33
source share



All Articles