Haskell: Splitting pipes (broadcast) without using spawn

This question is part code golf and mostly a beginner's question.

I am using the great `pipes` library in Haskell, and I would like to split a pipe so that the same data is sent across multiple channels (i.e. do a broadcast). The `Pipes.Concurrent` tutorial suggests using `spawn` to create mailboxes, taking advantage of `Output`'s `Monoid` instance. For example, we could do something like this:

-- Broadcast stdin lines to two pipelines by way of two mailboxes.
main = do
 -- One unbounded mailbox per downstream pipeline.
 (output1, input1) <- spawn Unbounded
 (output2, input2) <- spawn Unbounded
 -- Each pipeline reads from its own mailbox.
 let effect1 = fromInput input1 >-> pipe1
 let effect2 = fromInput input2 >-> pipe2
 -- `output1 <> output2` combines the two Outputs so stdin feeds both mailboxes.
 let effect3 = P.stdinLn >-> toOutput (output1 <> output2)
 ...

Is this indirection through mailboxes really necessary? Could we instead write something like this?

-- Hoped-for direct version: combine the two pipes with `<>` instead of going
-- through mailboxes. This does not compile as written.
main = do
 let effect3 = P.stdinLn >-> (pipe1 <> pipe2)
 ...

The above does not compile because `Pipe` does not have a `Monoid` instance. Is there a good reason for this? Is the first method really the cleanest way to split a pipe?

+4
1

There are two ways to do this without using concurrency, each with caveats.

The first way applies if `pipe1` and `pipe2` are just simple `Consumer`s that loop forever, like:

-- Two consumers that loop forever, handing every received value to a handler
-- (`f` and `g` stand for arbitrary handlers).
p1 = for cat f  -- i.e. p1 = forever $ await >>= f
p2 = for cat g  -- i.e. p2 = forever $ await >>= g

... then the easy way to solve this is to just write:

-- Hand every line read from stdin to `f` first and then to `g`.
for P.stdinLn (\line -> f line >> g line)

For example, if `p1` just `print`s every value:

-- Consumer loop that prints every value it receives.
p1 = for cat (\x -> lift (print x))

... and `p2` just writes that value to a handle:

-- Consumer loop that writes every received string to the handle `h`.
p2 = for cat (\x -> lift (hPutStrLn h x))

... then you would combine them like so:

-- For each stdin line: print it, then write it to the handle `h`.
for P.stdinLn $ \line ->
    lift (print line) >> lift (hPutStrLn h line)

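However, this simplification only works for `Consumer`s that loop trivially. There is another, more general solution, which is to define an `ArrowChoice` instance for pipes. I believe that pull-based `Pipe`s do not permit a correct, law-abiding instance, but push-based `Pipe`s do: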

-- | A push-based pipe (a function from its first input to a 'Pipe'), wrapped
-- so that the input type @a@ and output type @b@ are the last two type
-- parameters.
newtype Edge m r a b = Edge
    { unEdge :: a -> Pipe a b m r
    }

-- | Identity is 'push'; composition is '>~>' with its operands flipped, since
-- '.' composes right-to-left while '>~>' composes left-to-right.
instance (Monad m) => Category (Edge m r) where
    id = Edge push
    Edge g . Edge f = Edge (f >~> g)

-- | 'Arrow' instance for push-based pipes.
instance (Monad m) => Arrow (Edge m r) where
    -- Apply @f@ to every value passing through: each value that 'push' would
    -- forward is instead sent on as @f@ applied to it.
    arr f = Edge (push />/ respond . f)
    -- Run @p@ on the first component of each pair. The second component is
    -- parked in a state layer ('evalStateP', seeded with the initial @d@) and
    -- re-attached to whatever @p@ outputs.
    first (Edge p) = Edge $ \(b, d) ->
        evalStateP d $ (up \>\ unsafeHoist lift . p />/ dn) b
      where
        -- Stands in for @p@'s upstream requests: take the next pair, remember
        -- its second component, and hand @p@ only the first.
        up () = do
            (b, d) <- request ()
            lift $ put d
            return b
        -- Stands in for @p@'s downstream responses: pair the output with the
        -- most recently stored second component.
        dn c = do
            d <- lift get
            respond (c, d)

-- | 'ArrowChoice' instance: 'left' runs the wrapped pipe on 'Left' payloads
-- and re-emits 'Right' values untouched.
instance (Monad m) => ArrowChoice (Edge m r) where
    left (Edge k) = Edge (bef >=> (up \>\ (k />/ dn)))
      where
          -- Skip past 'Right' values, re-emitting each one downstream as-is,
          -- until a 'Left' arrives; return that 'Left''s payload.
          bef x = case x of
              Left b -> return b
              Right d -> do
                  _ <- respond (Right d)
                  x2 <- request ()
                  bef x2
          -- Stands in for @k@'s upstream requests: fetch the next value and
          -- skip forward to the next 'Left' payload.
          up () = do
              x <- request ()
              bef x
          -- Tag every output of @k@ with 'Left'.
          dn c = respond (Left c)

This requires a newtype so that the type parameters are in the order that `ArrowChoice` expects.

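If you are unfamiliar with the term push-based `Pipe`, it is basically a `Pipe` that starts from the most upstream pipe instead of the most downstream one, and they all have the following shape: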

a -> Pipe a b m r

Think of it as a `Pipe` that cannot "go" until it receives at least one value from upstream.

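These push-based `Pipe`s are the "dual" of conventional pull-based `Pipe`s, complete with their own composition operator and identity: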

-- | Composition of push-based pipes: the first pipe's output type is the
-- second pipe's input type.
(>~>) :: (Monad m)
      => (a -> Pipe a b m r)
      -> (b -> Pipe b c m r)
      -> (a -> Pipe a c m r)

-- | Identity for '>~>'.
push  :: (Monad m)
      =>  a -> Pipe a a m r

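... but the unidirectional `Pipes` API does not export these by default. You can only get these operators from `Pipes.Core` (and you may wish to study that module more closely to build an intuition for how they work). That module shows that push-based `Pipe`s and pull-based `Pipe`s are both special cases of more general bidirectional versions, and understanding the bidirectional case is how you learn why they are duals of each other.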

Once you have an `Arrow` instance for push-based pipes, you can write something like:

-- Broadcast: every value coming out of `p` is offered to both `p1` and `p2`.
p >>> bifurcate >>> (p1 +++ p2)
  where
    -- Built on `push` so the wrapped function has the `a -> Pipe a b m r`
    -- shape that `Edge` requires; `pull ~> ...` would take `()` as its
    -- argument instead of the first input value.
    bifurcate = Edge $ push ~> \a -> do
        yield (Left  a)  -- First give `p1` the value
        yield (Right a)  -- Then give `p2` the value

Then you would use `runEdge` to convert that back to a pull-based pipe when you are done.

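This approach has one major drawback, which is that you cannot automatically upgrade a pull-based pipe to a push-based pipe (but usually it is not hard to work out how to do it manually). For example, to upgrade `Pipes.Prelude.map` to a push-based `Pipe`, you would write: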

-- | Push-based counterpart of 'Pipes.Prelude.map': emit @f@ applied to the
-- initial value, then continue as the ordinary map pipe.
mapPush :: (Monad m) => (a -> b) -> (a -> Pipe a b m r)
mapPush f a = yield (f a) >> Pipes.Prelude.map f

Then that has the right type to be wrapped up in the `Arrow`:

-- | 'mapPush' wrapped in 'Edge' so it can be used with the arrow combinators.
mapEdge :: (Monad m) => (a -> b) -> Edge m r a b
mapEdge = Edge . mapPush

Of course, an even simpler way would be to write it from scratch:

-- Direct definition: forward each pushed value with @f@ applied to it.
mapEdge f = Edge (push ~> \a -> yield (f a))

Use whichever approach suits you best.

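In fact, I came up with the `Arrow` and `ArrowChoice` instances precisely because I was trying to answer the same question as you: how do you solve these kinds of problems without using concurrency? I wrote a longer answer on this more general subject in another Stack Overflow answer, where I describe how you can use these `Arrow` and `ArrowChoice` instances to distill concurrent systems into equivalent pure ones.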

+8

All Articles