How to turn a Sink into a Conduit?

I am trying to write a Conduit using an attoparsec parser. Specifically, given parseOne :: Parser T, I would like to build a Conduit ByteString m T that repeatedly applies the parser to the input and streams the results.

attoparsec-conduit offers sinkParser to turn a Parser into a Sink, but how can I turn this Sink into a Conduit? I am looking for a function like:

 conduitSink :: (Resource m) => Sink a m b -> Conduit a m b

which repeatedly feeds data into the Sink, producing each result as it goes. It seems this could be written fairly easily as a manual loop, but I'm wondering if there is a better way.

The absence of this seemingly obvious function from the conduit library makes me think I might be doing something wrong; is there a better way to accomplish this? The use case is to turn raw bytes into the parsed form of a message-based network protocol, to be processed by later stages of the pipeline. I already have the opposite direction (i.e. Conduit T m ByteString) thanks to blaze-builder-conduit, so this seemed like the most natural way to structure things.

1 answer

For this you need to use SequencedSink; it takes a sink and a tracked state and produces a conduit from a repeatedly applied sink producer.

The sink you have created is optimized for incrementally parsing a single value, which will be the result at the end of a conduit sequence.

However, since you want this to be part of a conduit pipeline, and each chunk of incoming ByteString may or may not match your parser one or more times, you need finer-grained control over the parsing process, passing the state of an incomplete parse between successive applications of the sink.

Assuming, for example, that your parser parses [--] or [----] etc., and T is Int, denoting the number of dashes parsed, you need to track the parser state as shown in the table below.

 Input chunk    Sink result (Data.Conduit.SequencedSinkResponse)
 [--][---]      Emit Nothing [2, 3]
 [---][---      Emit (Just #func) [3]
 ---------      Emit (Just #func) []
 ]              Emit Nothing [12]
                Stop

In this case, I use Maybe (ByteString -> Data.Attoparsec.ByteString.Result T) as the passed state (Nothing when no parse is in progress, Just the continuation of an incomplete parse otherwise); another data type may be more appropriate depending on the situation.
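The state tracking in the table can be sketched without any conduit machinery, using only attoparsec's incremental interface. This is a minimal illustration, not the SequencedSink implementation itself: parseOne is a hypothetical parser for the bracketed-dash format, and Pending, step and runChunks are names made up for this example. The step function plays the role of one sink application: it takes the carried state and a chunk, and returns the new state together with the values completed inside that chunk.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import           Data.Attoparsec.ByteString.Char8 (IResult (..), Parser, Result)
import qualified Data.Attoparsec.ByteString.Char8 as A
import           Data.ByteString (ByteString)
import qualified Data.ByteString as B
import           Data.List (mapAccumL)

-- Hypothetical parser: '[', one or more dashes, ']'; yields the dash count.
parseOne :: Parser Int
parseOne = A.char '[' *> (length <$> A.many1 (A.char '-')) <* A.char ']'

-- State passed between chunks: the continuation of an incomplete parse,
-- or Nothing when the previous chunk ended exactly on a message boundary.
type Pending = Maybe (ByteString -> Result Int)

-- Feed one chunk; return the new state and the values completed in it.
step :: Pending -> ByteString -> (Pending, [Int])
step pending chunk
  | B.null chunk = (pending, [])  -- attoparsec treats empty input as end-of-input
  | otherwise    = go (maybe (A.parse parseOne) id pending chunk)
  where
    go (Done rest x)
      | B.null rest = (Nothing, [x])
      | otherwise   = let (p, xs) = go (A.parse parseOne rest) in (p, x : xs)
    go (Partial k)  = (Just k, [])            -- need more input: keep continuation
    go (Fail _ _ e) = error ("parse error: " ++ e)

-- Thread the state through a list of chunks, collecting per-chunk results.
runChunks :: [ByteString] -> [[Int]]
runChunks = snd . mapAccumL step Nothing

main :: IO ()
main = print (runChunks ["[--][---]", "[---][---", "---------", "]"])
-- prints [[2,3],[3],[],[12]]
```

Each inner list corresponds to one row of the table: the emitted values for that chunk, with the Partial continuation carried forward whenever a message straddles a chunk boundary.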

This explicit stream handling is needed to preserve the pipeline nature of the conduit; a parser "bottleneck" that always waits for enough data to satisfy the parser would be a major performance sink.

The implementation of the required sink should be fairly trivial with the available ResourceT monad interface.

EDIT: Simply applying your sink in a loop would indeed be the simplest solution, but it will have slightly different performance characteristics if your parser parses short fragments that often end on the boundaries of byte chunks.
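For comparison, here is a sketch of the loop approach. It assumes a recent conduit and conduit-extra, whose API differs from the Resource/SequencedSink-era API discussed above: there, Data.Conduit.Attoparsec provides conduitParser, which reapplies a parser to the stream and yields each result paired with its position. parseOne and parseAll are hypothetical names for this example.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import           Conduit (mapC, runConduit, sinkList, yieldMany, (.|))
import qualified Data.Attoparsec.ByteString.Char8 as A
import           Data.ByteString (ByteString)
import           Data.Conduit.Attoparsec (conduitParser)

-- Hypothetical parser: '[', one or more dashes, ']'; yields the dash count.
parseOne :: A.Parser Int
parseOne = A.char '[' *> (length <$> A.many1 (A.char '-')) <* A.char ']'

-- Stream the chunks through the parser and collect every parsed value.
-- conduitParser yields (PositionRange, value) pairs; mapC snd drops the
-- position information. Malformed input raises a ParseError exception.
parseAll :: [ByteString] -> IO [Int]
parseAll chunks = runConduit $
  yieldMany chunks .| conduitParser parseOne .| mapC snd .| sinkList

main :: IO ()
main = parseAll ["[--][---]", "[---][---", "---------", "]"] >>= print
```

In a real pipeline the yieldMany source would be replaced by a socket or file source, and sinkList by the later processing stages.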
