For this, you want to use SequencedSink; it takes a sink and some tracked state and builds a conduit out of repeated applications of a sink constructor. The created sink is optimized for incrementally parsing a single value, which will be the result at the end of the conduit pipeline. Since you want this to be part of the pipeline, however, and each chunk of the incoming ByteString may match your parser zero, one, or several times, you need to take care to get finer-grained control over the parsing process, passing the incomplete parse state between successive sink applications.
Assuming, for example, that your parser parses inputs like [--] or [----] and so on, and that its result type T is Int, giving the number of dashes parsed, you need to track the parser's state as in the following table:
Input chunk     | Sink result (Data.Conduit.SequencedSinkResponse)
[--][---]       | Emit Nothing [2, 3]
[---][---       | Emit (Just #func) [3]
---------       | Emit (Just #func) []
]               | Emit Nothing [12]
(end of input)  | Stop
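To make the example concrete, here is a minimal sketch (not from the original answer) of such a parser together with attoparsec's incremental interface; the Partial continuation it returns is what the #func placeholder in the table stands for. The name dashGroup is invented for illustration.

```haskell
-- Sketch: a parser for the bracketed-dash format above, plus a small demo of
-- attoparsec's incremental parsing interface. `dashGroup` is a made-up name.
import           Data.Attoparsec.ByteString.Char8 (Parser, char, many1)
import qualified Data.Attoparsec.ByteString       as A
import qualified Data.ByteString.Char8            as BC

-- "[---]" parses to 3 (the number of dashes between the brackets).
dashGroup :: Parser Int
dashGroup = do
  _  <- char '['
  ds <- many1 (char '-')
  _  <- char ']'
  pure (length ds)

main :: IO ()
main = do
  -- The first chunk ends in the middle of a value, so parse returns Partial,
  -- i.e. a continuation of type ByteString -> Result Int (the "#func" above).
  let r1 = A.parse dashGroup (BC.pack "[--")
  -- Feeding the next chunk resumes the parse and completes it.
      r2 = A.feed r1 (BC.pack "-]")
  print r2  -- Done "" 3
```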
In this case, I use Maybe (ByteString -> Data.Attoparsec.ByteString.Result Int) as the state that gets passed along (Just holds the continuation of a partial parse, Nothing means no parse is in flight); another data type may be more appropriate depending on the situation.
This explicit state handling is necessary to preserve the streaming nature of the pipeline; a sink that simply blocks until it has accumulated enough data to satisfy the parser would be a bottleneck and the main drain on performance. Implementing the required wrapper should be fairly trivial given the accessible ResourceT monad interface.
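Below is a rough sketch of such a wrapper. It is written against the modern conduit API (ConduitT with await and yield) rather than the SequencedSink interface discussed above, so treat it as an adaptation of the idea, not the original code; parseMany is an invented name, and error handling is elided.

```haskell
import           Data.Attoparsec.ByteString (IResult (..), Parser, parse)
import           Data.ByteString            (ByteString)
import qualified Data.ByteString            as BS
import           Data.Conduit               (ConduitT, await, yield)

-- Run the parser repeatedly over the incoming chunks, emitting every completed
-- value downstream and carrying a partial-parse continuation across chunk
-- boundaries as Maybe (ByteString -> Result a) state.
parseMany :: Monad m => Parser a -> ConduitT ByteString a m ()
parseMany p = go Nothing
  where
    go mk = do
      mchunk <- await
      case (mchunk, mk) of
        (Nothing, Nothing) -> pure ()               -- clean end of input: Stop
        (Nothing, Just k)  -> finish (k BS.empty)   -- empty input tells attoparsec the stream ended
        (Just chunk, _)    -> drain (run mk chunk) >>= go

    -- Apply either the stored continuation or a fresh parse to the new chunk.
    run Nothing  chunk = parse p chunk
    run (Just k) chunk = k chunk

    -- Emit as many complete values as this chunk yields; a Partial continuation
    -- (if any) becomes the state handed to the next chunk.
    drain (Done rest v)
      | BS.null rest = yield v >> pure Nothing
      | otherwise    = yield v >> drain (parse p rest)
    drain (Partial k) = pure (Just k)
    drain (Fail {})   = pure Nothing                -- error handling elided in this sketch

    -- At end of input, emit the final value if the pending parse can complete.
    finish (Done _ v) = yield v
    finish _          = pure ()
```

Feeding it the four chunks from the table with the dashGroup parser from the earlier sketch should yield [2, 3, 3, 12], matching the Emit rows.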
EDIT: Simply using your sink in a loop would indeed be the easiest solution, but it will have slightly different performance characteristics if your parser matches short values that often straddle the boundaries between ByteString chunks.
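For reference, here is what that loop might look like, assuming the sink in question is (or wraps) sinkParser from Data.Conduit.Attoparsec (nowadays in conduit-extra); sinkInALoop is an invented name, and the sketch uses the modern conduit API.

```haskell
import           Control.Monad.Catch        (MonadThrow)
import           Data.Attoparsec.ByteString (Parser)
import           Data.ByteString            (ByteString)
import           Data.Conduit               (ConduitT, await, leftover, yield)
import           Data.Conduit.Attoparsec    (sinkParser)

-- Run the whole-value parser sink once per value; each pass blocks until the
-- sink has seen enough bytes, which is where the chunk-boundary caveat above
-- comes from. Throws a ParseError on malformed or truncated input.
sinkInALoop :: MonadThrow m => Parser b -> ConduitT ByteString b m ()
sinkInALoop p = loop
  where
    loop = do
      mchunk <- await          -- peek: is there any input left at all?
      case mchunk of
        Nothing    -> pure ()
        Just chunk -> do
          leftover chunk       -- push the chunk back for the parser sink
          v <- sinkParser p    -- parse exactly one value from the stream
          yield v
          loop
```

conduit-extra also ships Data.Conduit.Attoparsec.conduitParser, which implements essentially this loop and yields each parsed value together with its position in the input.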
dflemstr