Signalling downstream that upstream is exhausted

Question

Using the Haskell pipes library, I am trying to define a Pipe with the following type:

 signalExhausted :: Monad m => Pipe a (Value a) m r

where the Value data type is defined as follows:

 data Value a = Value a | Exhausted 

The pipe must obey the following laws:

 toList (each [] >-> signalExhausted) == [Exhausted]
 toList (each xs >-> signalExhausted) == map Value xs ++ [Exhausted]

In other words, the pipe should be equivalent to Pipes.Prelude.map Value, except that it should yield an additional Exhausted after all the upstream values have been processed, giving downstream a chance to perform some final action.

Is it possible to define such a Pipe?

Example

 > let xs = words "hubble bubble toil and trouble"
 > toList $ each xs >-> signalExhausted
 [Value "hubble", Value "bubble", Value "toil", Value "and", Value "trouble", Exhausted]

Notes

I know that the pipes-parse library provides the functions draw and parseForever. They look useful, but I can't see how to combine them into a Pipe that meets the above specification.

Answer

A pipe like signalExhausted cannot be defined, but a function equivalent to (>-> signalExhausted) can.

>-> is a specialized version of the pull category. Execution is driven by the downstream proxies pulling data from the upstream proxies. The downstream proxy sends an empty request () upstream and blocks until a response holding a value comes back from the upstream proxy. When the upstream proxy is exhausted and has no more values to send back, it returns. You can see the return that matters for these examples in the definition of each.

 -- The final (return ()) is what runs when the data is exhausted.
 each = F.foldr (\a p -> yield a >> p) (return ())

The downstream proxy needs a value to continue running, but there is no value the pipes library could provide, so the downstream proxy never runs again. Since it never runs again, it has no way to modify or react to the data.
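To make this concrete, here is a minimal sketch of a naive attempt. The name naiveSignal and the Eq instance on Value are assumptions added for illustration; they are not part of the original post. Because P.map Value never returns on its own, the yield Exhausted that follows it is never reached, and the composed pipeline simply ends when the upstream producer returns.

```haskell
import Pipes
import qualified Pipes.Prelude as P

data Value a = Value a | Exhausted
  deriving (Eq, Show)

-- Hypothetical naive attempt: P.map Value loops forever awaiting input,
-- so the trailing `yield Exhausted` is unreachable.
naiveSignal :: Monad m => Pipe a (Value a) m ()
naiveSignal = P.map Value >> yield Exhausted

-- When `each` returns, the whole composition returns with it;
-- naiveSignal is never resumed, so no Exhausted is emitted.
observed :: [Value Int]
observed = P.toList (each [1, 2, 3] >-> naiveSignal)
```

Evaluating observed gives the three wrapped values with no trailing Exhausted, which is the behaviour the paragraph above describes.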

There are two solutions to this problem. The simplest is to map Value over the upstream producer and add a yield Exhausted after it is done.

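 import Pipes
 import qualified Pipes.Prelude as P

 data Value a = Value a | Exhausted
   deriving (Show)

 -- Map Value over the producer, then emit Exhausted once the producer returns.
 signalExhausted :: Monad m => Producer a m r -> Producer (Value a) m ()
 signalExhausted p = (p >-> P.map Value) >> yield Exhausted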

This does exactly what you are looking for, except that signalExhausted is a function applied to the producer, taking the place of (>-> signalExhausted).

 let xs = words "hubble bubble toil and trouble"
 print . P.toList . signalExhausted $ each xs

 [Value "hubble",Value "bubble",Value "toil",Value "and",Value "trouble",Exhausted]
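The two laws from the question can be restated against this function form. The following is only a sketch: lawHolds is a hypothetical helper, and the Eq instance on Value is an assumption added so the lists can be compared.

```haskell
import Pipes
import qualified Pipes.Prelude as P

data Value a = Value a | Exhausted
  deriving (Eq, Show)

-- The function form from the answer: wrap every element, then signal the end.
signalExhausted :: Monad m => Producer a m r -> Producer (Value a) m ()
signalExhausted p = (p >-> P.map Value) >> yield Exhausted

-- Hypothetical helper: the question's law, with `signalExhausted (each xs)`
-- standing in for `each xs >-> signalExhausted`.
lawHolds :: Eq a => [a] -> Bool
lawHolds xs =
  P.toList (signalExhausted (each xs)) == map Value xs ++ [Exhausted]
```

In GHCi, lawHolds ([] :: [Int]) covers the empty case and lawHolds (words "hubble bubble toil and trouble") covers the example from the question.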

A more general solution to this problem is to stop the upstream proxy from returning and instead have it signal downstream when it is exhausted. I demonstrated how to do this in an answer to a related question.

 import Control.Monad
 import Pipes.Core

 returnDownstream :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' (Either r b) m r'
 returnDownstream = (forever . respond . Left =<<) . (respond . Right <\\)

This replaces each respond with respond . Right and replaces return with forever . respond . Left, sending the return value downstream along with the responses.

returnDownstream is more general than what you are looking for. We can demonstrate how to use it to recreate signalExhausted. returnDownstream transforms a proxy that returns into one that never returns, and instead forwards its return value downstream as the Left value of an Either.
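As a rough illustration of that behaviour, the sketch below inspects the first few values that a transformed producer emits. The name firstFour and the use of P.take to cut off the otherwise endless stream are assumptions made for this example only.

```haskell
import Control.Monad (forever)
import Pipes
import Pipes.Core (respond, (<\\))
import qualified Pipes.Prelude as P

-- Same definition as in the answer, repeated so the block stands alone.
returnDownstream :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' (Either r b) m r'
returnDownstream = (forever . respond . Left =<<) . (respond . Right <\\)

-- Hypothetical demo: the two elements arrive as Right values; after that the
-- producer's return value () is repeated as Left for as long as downstream asks.
firstFour :: [Either () Char]
firstFour = P.toList (returnDownstream (each "ab") >-> P.take 4)
```

Here firstFour evaluates to [Right 'a',Right 'b',Left (),Left ()]. Without P.take the stream would never end, which is why a downstream pipe has to decide when to stop.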

 signalExhausted p = returnDownstream p >-> respondLeftOnce 

respondLeftOnce is an example downstream proxy. The downstream proxy can tell apart the regular values held in Right and the return value held in Left.

 respondLeftOnce :: Monad m => Pipe (Either e a) (Value a) m ()
 respondLeftOnce = go
   where
     go = do
       ea <- await
       case ea of
         Right a -> yield (Value a) >> go
         Left _  -> yield Exhausted  -- The upstream proxy is exhausted; do something else
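The same pattern lets a downstream pipe perform a final action other than yielding Exhausted. The sketch below is one possible variation, not part of the original answer: sumOnExhaustion and total are hypothetical names, and the pipe accumulates a running sum that it emits only once it sees Left.

```haskell
import Control.Monad (forever)
import Pipes
import Pipes.Core (respond, (<\\))
import qualified Pipes.Prelude as P

-- Same definition as in the answer, repeated so the block stands alone.
returnDownstream :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' (Either r b) m r'
returnDownstream = (forever . respond . Left =<<) . (respond . Right <\\)

-- Hypothetical downstream pipe: add up the Right values, then emit the
-- total once and stop when upstream signals exhaustion with Left.
sumOnExhaustion :: Monad m => Pipe (Either r Int) Int m ()
sumOnExhaustion = go 0
  where
    go acc = do
      e <- await
      case e of
        Right n -> go (acc + n)
        Left _  -> yield acc

total :: [Int]
total = P.toList (returnDownstream (each [1, 2, 3]) >-> sumOnExhaustion)
```

Here total evaluates to [6]. Because sumOnExhaustion returns after its single yield, the composed pipeline terminates even though the transformed upstream would keep responding with Left indefinitely.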