A pipe like signalExhausted cannot be defined, but a function equivalent to (>-> signalExhausted) can.
>-> is a specialized version of the pull category. Execution is driven by the downstream proxies pulling data from the upstream proxies. The downstream proxy sends an empty request () upstream and blocks until a response holding a value comes back from the upstream proxy. When the upstream proxy is exhausted and has no more values to send back, it returns. You can see the return that matters for these examples in the definition of each.
each = F.foldr (\a p -> yield a >> p) (return ())
-- what to do when the data is exhausted ^
The downstream proxy needs a value to continue running, but there is no value the pipes library can possibly provide, so the downstream proxy never runs again. Since it never runs again, it has no way to modify or react to the data.
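To make this concrete, here is a minimal sketch of a downstream pipe that tries to emit a marker once its input ends. The pipe naive and the value result are hypothetical names introduced only for this illustration; they are not part of pipes.

```haskell
import Control.Monad (replicateM_)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical pipe: forwards up to five values, then tries to emit a marker.
naive :: Monad m => Pipe Int String m ()
naive = do
  replicateM_ 5 (await >>= yield . show)
  yield "done"  -- only reached if upstream supplies all five values

-- Upstream has only three values, so the fourth await is never answered
-- and the whole composition returns as soon as upstream returns.
result :: [String]
result = P.toList (each [1, 2, 3] >-> naive)

main :: IO ()
main = print result  -- prints ["1","2","3"]; the "done" marker never appears
```

The composed pipeline ends when each returns, so the code after the fourth await in naive is simply dropped.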
There are two solutions to this problem. The simplest is to map Value over the upstream pipe and add a yield Exhausted after it is done.
import Pipes
import qualified Pipes.Prelude as P

data Value a = Value a | Exhausted
    deriving (Show)

signalExhausted p = p >-> P.map Value >> yield Exhausted
This does exactly what you are looking for, except that the function signalExhausted takes the place of (>-> signalExhausted).
let xs = words "hubble bubble toil and trouble"
print . P.toList . signalExhausted $ each xs

[Value "hubble",Value "bubble",Value "toil",Value "and",Value "trouble",Exhausted]
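To show what a downstream pipe can do with the Exhausted marker, here is a minimal sketch that counts the incoming values and emits the count once the marker arrives. The pipe countUntilExhausted and the explicit type signature on signalExhausted are assumptions added for this illustration.

```haskell
import Pipes
import qualified Pipes.Prelude as P

data Value a = Value a | Exhausted
    deriving (Show)

-- Same definition as above, with an assumed explicit signature.
signalExhausted :: Monad m => Producer a m r -> Producer (Value a) m ()
signalExhausted p = p >-> P.map Value >> yield Exhausted

-- Hypothetical consumer-side pipe: counts values, reports at exhaustion.
countUntilExhausted :: Monad m => Pipe (Value a) Int m ()
countUntilExhausted = go 0
  where
    go n = do
      v <- await
      case v of
        Value _   -> go (n + 1)  -- a regular value: keep counting
        Exhausted -> yield n     -- upstream is done: emit the total

total :: [Int]
total = P.toList (signalExhausted (each xs) >-> countUntilExhausted)
  where
    xs = words "hubble bubble toil and trouble"

main :: IO ()
main = print total  -- prints [5]
```

Because the marker is an ordinary value, the downstream pipe receives it through await and can run whatever cleanup or summary logic it needs.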
A more general solution to this problem is to stop the upstream proxy from returning and instead signal downstream when it is exhausted. I demonstrated how to do this in an answer to a related question.
import Control.Monad
import Pipes.Core

returnDownstream :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' (Either r b) m r'
returnDownstream = (forever . respond . Left =<<) . (respond . Right <\\)
This replaces each respond with respond . Right and replaces return with forever . respond . Left, sending returns downstream along with responses.
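A small sketch can make the effect visible by collecting what actually flows downstream. The name observed is hypothetical, and P.take 5 is used here only to cut off the otherwise endless stream of Left values.

```haskell
import Control.Monad (forever)
import Pipes
import Pipes.Core
import qualified Pipes.Prelude as P

returnDownstream :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' (Either r b) m r'
returnDownstream = (forever . respond . Left =<<) . (respond . Right <\\)

-- Three regular values arrive as Right; after that the return value ()
-- of each is repeated forever as Left, so we only take five items.
observed :: [Either () Int]
observed = P.toList (returnDownstream (each [1, 2, 3]) >-> P.take 5)

main :: IO ()
main = print observed  -- prints [Right 1,Right 2,Right 3,Left (),Left ()]
```

The repeated Left values are what allow a downstream proxy to keep awaiting without the pipeline terminating underneath it.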
returnDownstream is more general than what you are looking for. We can demonstrate how to use it to recreate signalExhausted. returnDownstream transforms a pipe that returns into one that never returns and instead forwards its return value downstream as the Left value of an Either.
signalExhausted p = returnDownstream p >-> respondLeftOnce
respondLeftOnce is an example downstream proxy. The downstream proxy can distinguish between regular values held in Right and the return value held in Left.
respondLeftOnce :: Monad m => Pipe (Either e a) (Value a) m ()
respondLeftOnce = go
    where
        go = do
            ea <- await
            case ea of
                Right a -> yield (Value a) >> go
                Left  _ -> yield Exhausted       -- The upstream proxy is exhausted; do something else
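For reference, the pieces of the general solution can be assembled into one module, sketched below. The explicit type signature on signalExhausted and the main function are assumptions added so the sketch compiles on its own; the definitions themselves are the ones given above.

```haskell
import Control.Monad (forever)
import Pipes
import Pipes.Core
import qualified Pipes.Prelude as P

data Value a = Value a | Exhausted
    deriving (Show)

-- Forward the upstream return value downstream as Left, forever.
returnDownstream :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' (Either r b) m r'
returnDownstream = (forever . respond . Left =<<) . (respond . Right <\\)

-- Translate Right into Value and the first Left into Exhausted, then stop.
respondLeftOnce :: Monad m => Pipe (Either e a) (Value a) m ()
respondLeftOnce = go
    where
        go = do
            ea <- await
            case ea of
                Right a -> yield (Value a) >> go
                Left  _ -> yield Exhausted

-- Assumed signature: any producer becomes a producer of tagged values.
signalExhausted :: Monad m => Producer a m r -> Producer (Value a) m ()
signalExhausted p = returnDownstream p >-> respondLeftOnce

main :: IO ()
main = print . P.toList . signalExhausted $ each xs
    where
        xs = words "hubble bubble toil and trouble"
```

Running main should print the same list as the simpler version: the five words wrapped in Value, followed by Exhausted.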