Conduit - multiple output files in the pipeline

I am writing a program that splits an input file into several files (Shamir's Secret Sharing scheme).

Here is the pipeline I have in mind:

  • source: use Conduit.Binary.sourceFile to read the input file
  • conduit: takes a ByteString and produces a [ByteString]
  • sink: receives [ByteString] from the conduit and writes each ByteString in it to the corresponding file (say, if the incoming list is called bsl, then bsl !! 0 is written to file 0, bsl !! 1 to file 1, etc.)
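To pin down what the sink in the last bullet should do, here is a pure model of its intended effect, with no conduit involved: output file i ends up with the concatenation of element i of every list the conduit emits. `perFileContents` is a hypothetical name, the sample strings are placeholders, and the sketch assumes every list has exactly one entry per output file.

```haskell
{-# LANGUAGE OverloadedStrings #-}
-- Pure model (no conduit) of the intended sink semantics:
-- element i of every [ByteString] chunk is appended to output file i.
import qualified Data.ByteString as BS
import Data.ByteString (ByteString)
import Data.List (transpose)

-- | Hypothetical helper: given the stream of [ByteString] chunks produced
-- by the conduit, return what each output file should contain, in index
-- order. Assumes every chunk has the same length (one entry per file).
perFileContents :: [[ByteString]] -> [ByteString]
perFileContents = map BS.concat . transpose

main :: IO ()
main = print (perFileContents [["133", "426", "765"], ["a", "b", "c"]])
```

Running it prints `["133a","426b","765c"]`.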

I found a question about multiple input files here, but in that case the whole pipeline is run once per input file, whereas my program needs to write to several output files from within a single pipeline.

I also looked at the Conduit source code here to see whether I could implement multiSinkFile myself, but I'm a little confused by how sinkFile works as a Consumer, and even more so when I try to dig deeper... (I'm still a beginner.)

So my question is: how should I implement a function like multiSinkFile, which writes to multiple files as part of the pipeline?

Any advice is welcome!
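Regarding the confusion about sinkFile being a Consumer: a sink is simply a conduit that awaits input and never yields output, so one direct (if less composable) way to get a multi-file sink is to write it by hand over a list of already-open handles. This is a minimal sketch assuming conduit >= 1.3 (`ConduitT`, `.|`, `runConduitRes`); `sinkHandles`, `writeShares` and the output file names are hypothetical.

```haskell
{-# LANGUAGE OverloadedStrings #-}
-- Sketch, assuming conduit >= 1.3: a hand-written multi-file sink.
-- A sink is just a ConduitT that awaits input and never yields.
import Conduit (ConduitT, awaitForever, runConduitRes, (.|))
import Control.Exception (bracket)
import Control.Monad (zipWithM_)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Resource (ResourceT)
import qualified Data.ByteString as BS
import Data.ByteString (ByteString)
import qualified Data.Conduit.List as CL
import System.IO (Handle, IOMode (WriteMode), hClose, openBinaryFile)

-- | Hypothetical sink: write element i of every incoming list to handle i.
-- zipWithM_ silently ignores extra elements (or extra handles).
sinkHandles :: MonadIO m => [Handle] -> ConduitT [ByteString] o m ()
sinkHandles hs = awaitForever $ \bss -> liftIO (zipWithM_ BS.hPut hs bss)

-- | Hypothetical helper: open one file per path, run the source into
-- 'sinkHandles', and close the handles afterwards, even on exceptions.
-- (If opening a later file fails, earlier handles are not closed here;
-- resourcet's 'allocate' would cover that case.)
writeShares :: [FilePath] -> ConduitT () [ByteString] (ResourceT IO) () -> IO ()
writeShares paths src =
  bracket (mapM (`openBinaryFile` WriteMode) paths) (mapM_ hClose) $ \hs ->
    runConduitRes (src .| sinkHandles hs)

main :: IO ()
main = writeShares ["out0.bin", "out1.bin"] (CL.sourceList [["A", "B"], ["C", "D"]])
```

After running it, `out0.bin` contains `AC` and `out1.bin` contains `BD`. The trade-off is that the files are opened outside the pipeline; the ZipSink-based answers keep each file inside its own `sinkFile`, so conduit manages the handles.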

Explanation

Suppose we want to apply Shamir's Secret Sharing to a file containing the binary value "ABCDEF", splitting it into 3 shares.

(So we have the input file srcFile and the output files outFile0, outFile1 and outFile2.)

First we read "ABC" from the file and process it, which gives us a list such as ["133", "426", "765"]. So "133" is written to outFile0, "426" to outFile1, and "765" to outFile2. Then we read "DEF" from srcFile, process it, and append the corresponding outputs to each output file.
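One detail the example glosses over: `sourceFile` yields chunks of whatever size it happens to read (typically tens of kilobytes), not 3-byte blocks, so getting "ABC" and then "DEF" requires regrouping the stream. A minimal sketch, assuming conduit >= 1.3; `rechunk` is a hypothetical helper, and whether a short final block should be padded depends on the secret-sharing implementation.

```haskell
{-# LANGUAGE OverloadedStrings #-}
-- Sketch, assuming conduit >= 1.3: regroup an arbitrary ByteString stream
-- into fixed-size blocks, as in the "ABC" / "DEF" example.
import Conduit (ConduitT, await, runConduitPure, yield, (.|))
import Control.Monad (unless)
import qualified Data.ByteString as BS
import Data.ByteString (ByteString)
import qualified Data.Conduit.List as CL

-- | Hypothetical helper: emit blocks of exactly n bytes (n must be positive).
-- A shorter final block is emitted as-is; a real scheme might pad it instead.
rechunk :: Monad m => Int -> ConduitT ByteString ByteString m ()
rechunk n = go BS.empty
  where
    go buf
      | BS.length buf >= n = do
          -- Enough buffered: emit one block and keep the remainder.
          let (block, rest) = BS.splitAt n buf
          yield block
          go rest
      | otherwise = do
          -- Not enough yet: pull more input, or flush what is left at the end.
          next <- await
          case next of
            Nothing -> unless (BS.null buf) (yield buf)
            Just bs -> go (buf <> bs)

main :: IO ()
main = print (runConduitPure (CL.sourceList ["AB", "CDE", "F"] .| rechunk 3 .| CL.consume))
```

On this sample input it prints `["ABC","DEF"]`; placing `rechunk 3` between `sourceFile` and the splitting conduit would give the block-by-block behavior described above.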

EDIT:

Thank you for your answers. I think I now understand what is going on with ZipSink and friends, and I wrote a simple test program that reads the source file and writes it unchanged to 3 output files. I hope this helps others in the future.

```haskell
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE OverloadedStrings #-}

import ClassyPrelude.Conduit
import Safe (atMay)
import Text.Printf
import Filesystem.Path.CurrentOS (decodeString, encodeString)
import Control.Monad.Trans.Resource (runResourceT, ResourceT(..))

-- Get the output file name given the base (file) path and the split number
getFileName :: FilePath -> Int -> FilePath
getFileName basePath splitNumber =
    decodeString $ encodeString basePath ++ "." ++ printf "%03d" splitNumber

-- Get the sink file, given a filepath generator (that takes an Int) and the split number
idxSinkFile :: MonadResource m => (Int -> FilePath) -> Int -> Consumer [ByteString] m ()
idxSinkFile mkFP splitNumber =
    concatMapC (flip atMay splitNumber) =$= sinkFile (mkFP splitNumber)

sinkMultiFiles :: MonadResource m => (Int -> FilePath) -> [Int] -> Sink [ByteString] m ()
sinkMultiFiles mkFP splitNumbers =
    getZipSink $ otraverse_ (ZipSink . idxSinkFile mkFP) splitNumbers

simpleConduit :: Int -> Conduit ByteString (ResourceT IO) [ByteString]
simpleConduit num = mapC (replicate num)

main :: IO ()
main = do
    let mkFP = getFileName "test.txt"
        splitNumbers = [0..2]
    runResourceT $ sourceFile "test.txt"
        $$ simpleConduit (length splitNumbers)
        =$ sinkMultiFiles mkFP splitNumbers
```
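The program above targets the older conduit / classy-prelude API (`$$`, `=$`, `Sink`, system-filepath paths). A rough port of the same idea to the conduit >= 1.3 API, with plain `String` file paths and without classy-prelude, might look like this. It is a sketch, not a drop-in replacement: `splitToFiles` is a hypothetical wrapper, and `take 1 . drop i` stands in for `Safe.atMay` to avoid the extra dependency.

```haskell
-- Sketch, assuming conduit >= 1.3: the test program above without
-- classy-prelude, using ConduitT, .| and runConduitRes.
import Conduit (ConduitT, ZipSink (..), concatMapC, mapC, runConduitRes, sinkFile, sourceFile, (.|))
import Control.Monad.Trans.Resource (MonadResource)
import Data.ByteString (ByteString)
import Data.Foldable (traverse_)
import Data.Void (Void)
import Text.Printf (printf)

-- | Output file name: base path plus a zero-padded split number.
getFileName :: FilePath -> Int -> FilePath
getFileName basePath splitNumber = basePath ++ "." ++ printf "%03d" splitNumber

-- | Sink for split i: keep only element i of each list, write it to its file.
idxSinkFile :: MonadResource m => (Int -> FilePath) -> Int -> ConduitT [ByteString] Void m ()
idxSinkFile mkFP i = concatMapC (take 1 . drop i) .| sinkFile (mkFP i)

-- | ZipSink feeds every incoming list to all the per-index sinks.
sinkMultiFiles :: MonadResource m => (Int -> FilePath) -> [Int] -> ConduitT [ByteString] Void m ()
sinkMultiFiles mkFP = getZipSink . traverse_ (ZipSink . idxSinkFile mkFP)

-- | Hypothetical wrapper: copy a file into n numbered output files.
splitToFiles :: FilePath -> Int -> IO ()
splitToFiles path n =
  runConduitRes $
    sourceFile path
      .| mapC (replicate n)
      .| sinkMultiFiles (getFileName path) [0 .. n - 1]

main :: IO ()
main = splitToFiles "test.txt" 3
```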
2 answers

There are several ways to do this, depending on whether you want to dynamically increase the number of files that you write to, or just keep a fixed number. Here is one example with a fixed list of files:

```haskell
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ViewPatterns #-}

import ClassyPrelude.Conduit
import Safe (atMay)

idxSinkFile :: MonadResource m => (Int -> FilePath) -> Int -> Consumer [ByteString] m ()
idxSinkFile mkFP idx =
    concatMapC (flip atMay idx) =$= sinkFile fp
  where
    fp = mkFP idx

sinkMultiFiles :: MonadResource m => (Int -> FilePath) -> [Int] -> Sink [ByteString] m ()
sinkMultiFiles mkFP indices =
    getZipSink $ otraverse_ (ZipSink . idxSinkFile mkFP) indices

someFunc :: ByteString -> [ByteString]
someFunc (decodeUtf8 -> x) = map encodeUtf8 [x, toUpper x, toLower x]

mkFP :: Int -> FilePath
mkFP 0 = "file0.txt"
mkFP 1 = "file1.txt"
mkFP 2 = "file2.txt"

src :: Monad m => Producer m ByteString
src = yieldMany $ map encodeUtf8 $ words "Hello There World!"

main :: IO ()
main = do
    let indices = [0..2]
    runResourceT $ src $$ mapC someFunc =$ sinkMultiFiles mkFP indices
    forM_ indices $ \idx -> do
        let fp = mkFP idx
        bs <- readFile fp
        print (fp, bs :: ByteString)
```

You can try it online on FP Complete's School of Haskell.
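To see what the ZipSink combination in this answer actually does without touching the filesystem, the same structure can be run with each `sinkFile` replaced by an in-memory `CL.consume`. This is a sketch assuming conduit >= 1.3; `atIndex`, `idxSink` and `multiSink` are hypothetical names, and using `traverse` instead of `otraverse_` lets each per-index sink return the list it collected.

```haskell
{-# LANGUAGE OverloadedStrings #-}
-- Sketch, assuming conduit >= 1.3: the answer's ZipSink technique with each
-- file sink replaced by an in-memory list so the result can be inspected.
import Conduit (ConduitT, ZipSink (..), runConduitPure, (.|))
import Data.ByteString (ByteString)
import qualified Data.Conduit.List as CL
import Data.Void (Void)

-- | Safe indexing, standing in for Safe.atMay.
atIndex :: Int -> [a] -> Maybe a
atIndex i xs = case drop i xs of
  (x : _) | i >= 0 -> Just x
  _ -> Nothing

-- | Collect element i of every incoming list (missing elements are skipped).
idxSink :: Monad m => Int -> ConduitT [a] Void m [a]
idxSink i = CL.mapMaybe (atIndex i) .| CL.consume

-- | Every ZipSink sees every input; 'traverse' gathers one result per index.
multiSink :: Monad m => [Int] -> ConduitT [a] Void m [[a]]
multiSink = getZipSink . traverse (ZipSink . idxSink)

main :: IO ()
main = print (runConduitPure (CL.sourceList chunks .| multiSink [0 .. 2]))
  where
    chunks = [["Hello", "HELLO", "hello"], ["World!", "WORLD!", "world!"]] :: [[ByteString]]
```

It prints `[["Hello","World!"],["HELLO","WORLD!"],["hello","world!"]]`: every sink sees every incoming list and keeps only its own index.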


One possibility is for your algorithm to output something like (Int, ByteString), where the Int is the index of the target output file (of course, you could use any other type as the key). This way, the conduit itself decides which file each piece of output goes to.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as C
import qualified Data.Foldable as F

-- | Filter only pairs tagged with the appropriate key.
filterInputC :: (Monad m, Eq k) => k -> Conduit (k, a) m a
filterInputC idx = C.filter ((idx ==) . fst) =$= C.map snd

-- | Prepend a given sink with a filter.
filterInput :: (Monad m, Eq k) => k -> Sink a m r -> Sink (k, a) m r
filterInput idx = (filterInputC idx =$)

-- | Given a list of sinks, create a single sink that directs received values
-- depending on the index.
multiSink_ :: (Monad m) => [Sink a m ()] -> Sink (Int, a) m ()
multiSink_ = getZipSink . F.sequenceA_ . fmap ZipSink . zipWith filterInput [0..]
```

Update: The following example shows how multiSink_ can be used (for testing, it simply prints everything to stdout with the appropriate prefix instead of writing to files).

```haskell
-- | A testing sink that just prints its input, marking it with
-- a given prefix.
testSink :: String -> Sink String IO ()
testSink prefix = C.mapM_ (putStrLn . (prefix ++))

-- | An example that produces indexed output.
testSource :: (Monad m) => Source m (Int, String)
testSource = do
    yield (0, "abc")
    yield (0, "def")
    yield (1, "opq")
    yield (0, "0")
    yield (1, "1")
    yield (2, "rest")

main :: IO ()
main = testSource $$ multiSink_ (map testSink ["1: ", "2: ", "3: "])
```
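The tagged approach plugs into the question's pipeline by flattening each `[ByteString]` into `(index, share)` pairs. A sketch assuming conduit >= 1.3 (where `ConduitT` replaces `Sink`/`Conduit`); `tagShares` and the result-returning `multiSink` are hypothetical variants of the answer's code, and the in-memory `CL.consume` sinks stand in for `sinkFile`.

```haskell
{-# LANGUAGE OverloadedStrings #-}
-- Sketch, assuming conduit >= 1.3: the tagged-pair approach ported to
-- ConduitT and fed from the question's [ByteString] chunks.
import Conduit (ConduitT, ZipSink (..), runConduitPure, (.|))
import Data.ByteString (ByteString)
import qualified Data.Conduit.List as CL
import Data.Void (Void)

-- | Keep only pairs tagged with the given key and pass the payload on.
filterInput :: (Monad m, Eq k) => k -> ConduitT a Void m r -> ConduitT (k, a) Void m r
filterInput k sink = CL.filter ((== k) . fst) .| CL.map snd .| sink

-- | Like multiSink_, but 'traverse' keeps each sink's result.
multiSink :: Monad m => [ConduitT a Void m r] -> ConduitT (Int, a) Void m [r]
multiSink = getZipSink . traverse ZipSink . zipWith filterInput [0 ..]

-- | Hypothetical adapter: turn each [ByteString] into (index, share) pairs.
tagShares :: Monad m => ConduitT [ByteString] (Int, ByteString) m ()
tagShares = CL.concatMap (zip [0 ..])

main :: IO ()
main =
  print $ runConduitPure $
    CL.sourceList [["133", "426", "765"], ["a", "b", "c"]]
      .| tagShares
      .| multiSink (replicate 3 CL.consume)
```

It prints `[["133","a"],["426","b"],["765","c"]]`. With real files, `multiSink (map sinkFile paths)` would be the corresponding sink, run under `runConduitRes`.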