Scala: read some data from an Enumerator[T] and return the remaining Enumerator[T]

I use the Play framework's asynchronous I/O library, which is built on Iteratees and Enumerators. I have an Iteratee[T] as a data sink (for simplicity, say it is an Iteratee[Byte] that saves its contents to a file). This Iteratee[Byte] is passed to the function that handles writing.

But before writing, I want to add some statistics to the beginning of the file (for simplicity, say it is a single byte), so I advance the Iteratee as follows before passing it to the write function:

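    import play.api.libs.iteratee._

    // Feed the header byte to the Iteratee and return the resulting Iteratee.
    def write[A](value: Byte, output: Iteratee[Byte, A]): Iteratee[Byte, A] =
      Iteratee.flatten(output.feed(Input.El(value)))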

When I later read the saved file from disk, I get an Enumerator[Byte] for it. I want to first read and strip off the extra data, and then pass the rest of the Enumerator[Byte] to the function that handles reading. So I also need to transform the Enumerator:

    def read(input: Enumerator[Byte]): (Byte, Enumerator[Byte]) = {
      val firstEnumeratorEntry = ...
      val remainingEnumerator = ...
      (firstEnumeratorEntry, remainingEnumerator)
    }

But I have no idea how to do this. How can I read a few bytes from an Enumerator and get back the remaining Enumerator?

If I replaced Iteratee[Byte] with an OutputStream and Enumerator[Byte] with an InputStream, this would be very simple:

    import java.io.{InputStream, OutputStream}

    def write(value: Byte, output: OutputStream) = {
      output.write(value)
      output
    }

    def read(input: InputStream) = (input.read, input)

But I need the asynchronous I/O of the Play framework.
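For comparison, the blocking round trip can be exercised end to end with in-memory streams. This is only a sketch of the behavior the Iteratee/Enumerator version should reproduce; `BlockingHeader` and `roundTrip` are hypothetical names, and `ByteArrayOutputStream`/`ByteArrayInputStream` stand in for the file on disk.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}

// Hypothetical helper: the blocking version of "prepend one header byte,
// then read it back and hand over the rest of the stream".
object BlockingHeader {
  // Writes the header byte; the caller keeps writing to the returned stream.
  def write(value: Byte, output: OutputStream): OutputStream = {
    output.write(value) // write(int) stores the low 8 bits
    output
  }

  // Reads the header byte; the returned stream is positioned right after it.
  def read(input: InputStream): (Byte, InputStream) = {
    val first = input.read() // -1 means end of stream
    require(first != -1, "stream is empty")
    (first.toByte, input)
  }

  // In-memory round trip standing in for the file on disk.
  def roundTrip(header: Byte, payload: Array[Byte]): (Byte, Array[Byte]) = {
    val out = new ByteArrayOutputStream()
    write(header, out).write(payload)
    val (h, rest) = read(new ByteArrayInputStream(out.toByteArray))
    val remaining = Iterator.continually(rest.read()).takeWhile(_ != -1).map(_.toByte).toArray
    (h, remaining)
  }
}
```

For example, `BlockingHeader.roundTrip(42.toByte, "hello".getBytes("UTF-8"))` gives back the header byte 42 together with the bytes of `"hello"`.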

3 answers

Here is one way to achieve this: apply the Enumerator to an Iteratee whose state is an accumulator (a tuple here) that keeps the first byte apart from the rest.

In this example I read the routes file: the first byte is read as a Char, and the remaining bytes are appended to a String, decoded as UTF-8.

    import java.nio.charset.Charset
    import play.api.Play
    import play.api.Play.current
    import play.api.libs.iteratee._
    import play.api.mvc._

    object Application extends Controller {
      def index = Action {
        // let's do everything asynchronously
        Async {
          // for-comprehension for readability
          for {
            i <- read  // read the file
            r <- i.run // "create" the related Promise and run it
          } yield Ok("first : " + r._1.get + "\n" + "rest : " + r._2) // map the promised result to a proper HTTP result
        }
      }

      def read = {
        // get the routes file as an Enumerator
        val file: Enumerator[Array[Byte]] = Enumerator.fromFile(Play.getFile("/conf/routes"))
        // apply the Enumerator to an Iteratee that folds the data as wished
        file(Iteratee.fold((None, ""): (Option[Char], String)) { (acc, b) =>
          acc._1 match {
            // on the first chunk
            case None => (Some(b(0).toChar), acc._2 + new String(b.tail, Charset.forName("utf-8")))
            // on the other chunks
            case x => (x, acc._2 + new String(b, Charset.forName("utf-8")))
          }
        })
      }
    }
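The fold above can be tried without Play by modelling the Enumerator as an in-memory sequence of chunks. This is a sketch under that assumption: `FirstAndRestFold` is a hypothetical name, and `foldLeft` plays the role of `Iteratee.fold`. Unlike the Play version, it also guards against an empty first chunk, on which `b(0)` would throw.

```scala
import java.nio.charset.StandardCharsets

// Sketch: the Enumerator[Array[Byte]] is modelled as an in-memory Seq of chunks
// (an assumption for illustration) and folded with the same accumulator shape.
object FirstAndRestFold {
  type Acc = (Option[Char], String)

  // Same step as the Iteratee.fold above, plus a guard for empty chunks.
  def step(acc: Acc, chunk: Array[Byte]): Acc = acc match {
    case (None, rest) if chunk.isEmpty =>
      (None, rest) // nothing to take the first byte from yet
    case (None, rest) =>
      (Some(chunk(0).toChar), rest + new String(chunk.tail, StandardCharsets.UTF_8))
    case (first, rest) =>
      (first, rest + new String(chunk, StandardCharsets.UTF_8))
  }

  def run(chunks: Seq[Array[Byte]]): Acc =
    chunks.foldLeft[Acc]((None, ""))(step)
}
```

Because each chunk is decoded separately, a multi-byte UTF-8 character that straddles a chunk boundary would be decoded incorrectly; the same caveat applies to the Play version.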

EDIT

I found another way using Enumeratees, but it requires creating two Enumerators (one of them short). However, it is a little more elegant. We use a kind of Enumeratee, but the ones from Traversable, which work at a finer level than Enumeratee (inside chunks rather than on whole chunks). We use take(1), which takes only one byte and then closes the stream. On the other side, we use drop(1), which simply discards the first byte (counting bytes rather than chunks matters because we use an Enumerator[Array[Byte]]).

In addition, read2 now has a signature much closer to the one you wanted, since it returns two Enumerators (not so far from (Promise, Enumerator)).

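    import java.nio.charset.Charset
    import play.api.Play
    import play.api.Play.current
    import play.api.libs.concurrent.Promise
    import play.api.libs.iteratee._
    import play.api.mvc._

    object Application extends Controller {
      def index = Action {
        Async {
          val (first, rest) = read2
          // decode each chunk as UTF-8
          val enee = Enumeratee.map[Array[Byte]] { bs => new String(bs, Charset.forName("utf-8")) }
          // run an Enumerator through enee and concatenate the resulting Strings
          def useEnee(enumor: Enumerator[Array[Byte]]) =
            Iteratee.flatten(enumor &> enee |>> Iteratee.consume[String]()).run.asInstanceOf[Promise[String]]
          for {
            f <- useEnee(first)
            r <- useEnee(rest)
          } yield Ok("first : " + f + "\n" + "rest : " + r)
        }
      }

      def read2 = {
        // each call yields a fresh Enumerator over the same file
        def create = Enumerator.fromFile(Play.getFile("/conf/routes"))
        val file: Enumerator[Array[Byte]] = create
        val file2: Enumerator[Array[Byte]] = create
        // take(1)/drop(1) count bytes inside the chunks, not whole chunks
        (file &> Traversable.take[Array[Byte]](1), file2 &> Traversable.drop[Array[Byte]](1))
      }
    }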
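To see why the Traversable variants matter, here is a stdlib-only sketch of byte-level take/drop over a chunked stream, assuming the stream is modelled as a `List[Array[Byte]]`. `ChunkedTakeDrop`, `takeBytes` and `dropBytes` are hypothetical names; they mimic what the answer describes for `Traversable.take`/`Traversable.drop` (counting elements inside chunks), whereas a plain `take(1)` on the list of chunks keeps the whole first chunk.

```scala
// Sketch: a chunked byte stream modelled as a List of chunks (an assumption
// for illustration). take/drop count bytes across chunk boundaries, the way
// the answer describes Traversable.take/drop, rather than whole chunks.
object ChunkedTakeDrop {
  // Keeps the first n bytes, splitting a chunk if needed.
  def takeBytes(chunks: List[Array[Byte]], n: Int): List[Array[Byte]] = chunks match {
    case _ if n <= 0 => Nil
    case Nil => Nil
    case chunk :: tail if chunk.length <= n => chunk :: takeBytes(tail, n - chunk.length)
    case chunk :: _ => List(chunk.take(n))
  }

  // Skips the first n bytes, splitting a chunk if needed.
  def dropBytes(chunks: List[Array[Byte]], n: Int): List[Array[Byte]] = chunks match {
    case _ if n <= 0 => chunks
    case Nil => Nil
    case chunk :: tail if chunk.length <= n => dropBytes(tail, n - chunk.length)
    case chunk :: tail => chunk.drop(n) :: tail
  }
}
```

For example, with the chunks `"ab"` and `"cd"`, `takeBytes(chunks, 1)` yields the bytes of `"a"` and `dropBytes(chunks, 1)` the bytes of `"bcd"`, while `chunks.take(1)` yields all of `"ab"`.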

I wonder whether you can approach your goal from a different angle.

The function that will use the remaining Enumerator (call that Enumerator remaining) presumably applies it to an Iteratee that processes the rest, as in remaining |>> iteratee2. Can you check whether you can get a reference to iteratee2? If so, you can read and process the first byte with a first Iteratee, head, and then combine head and iteratee2 via flatMap, feeding the combined Iteratee from your single original Enumerator:

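    // iteratee2: the Iteratee[Byte, B] that would otherwise consume `remaining`
    // input: the full Enumerator[Byte] read from disk, as in the question's read
    val head = Enumeratee.take[Byte](1) &>> Iteratee.foreach[Byte](println)
    val processing = for { h <- head; i <- iteratee2 } yield (h, i)
    Iteratee.flatten(input |>> processing).run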

If you cannot get hold of iteratee2 (which will be the case if your Enumerator is applied to an Iteratee inside code you did not write), then this approach will not work.


Actually, we like Iteratees because they compose. So instead of creating multiple Enumerators from your original one, you would rather compose two Iteratees sequentially (read-first and read-rest) and feed them from your single Enumerator.

For this you need a sequential composition method; I'll call it andThen. Here is a rough implementation. Note that returning the unused input is a little tricky; maybe the behavior could be tweaked with a typeclass based on the input type. Also, it does not handle passing the leftover input from the first Iteratee on to the second (exercise for the reader :).

    import play.api.libs.concurrent.Promise
    import play.api.libs.iteratee._

    object Iteratees {
      def andThen[E, A, B](a: Iteratee[E, A], b: Iteratee[E, B]): Iteratee[E, (A, B)] = new Iteratee[E, (A, B)] {
        def fold[C](
            done: ((A, B), Input[E]) => Promise[C],
            cont: (Input[E] => Iteratee[E, (A, B)]) => Promise[C],
            error: (String, Input[E]) => Promise[C]): Promise[C] = {
          a.fold(
            // a is done: run b and pair the two results
            (ra, aleft) => b.fold(
              (rb, bleft) => done((ra, rb), aleft /* could be magicop(aleft, bleft) */),
              (bcont) => cont(e => bcont(e) map (rb => (ra, rb))),
              (s, err) => error(s, err)
            ),
            // a needs more input: keep composing its continuation with b
            (acont) => cont(e => andThen[E, A, B](acont(e), b)),
            (s, err) => error(s, err)
          )
        }
      }
    }

Now you can simply use the following:

    import play.api.libs.iteratee._
    import play.api.mvc._

    object Application extends Controller {
      def index = Action {
        Async {
          val strings: Enumerator[String] = Enumerator("1", "2", "3", "4")
          // takes exactly one element
          val takeOne = Cont[String, String](e => e match {
            case Input.El(e) => Done(e, Input.Empty)
            case x => Error("not enough", x)
          })
          // concatenates all remaining elements
          val takeRest = Iteratee.consume[String]()
          val firstAndRest = Iteratees.andThen(takeOne, takeRest)
          val futureRes = strings(firstAndRest) flatMap (_.run)
          futureRes.map(x => Ok(x.toString)) // responds with (1,234)
        }
      }
    }
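The andThen above leaves forwarding leftover input to the second Iteratee as an exercise. The sketch below shows what that forwarding means, using a deliberately tiny Iteratee model written from scratch rather than Play's API: `MiniIteratee`, `It`, `feed`, `run`, `takeOneChar` and `consume` are all hypothetical. Here sequential composition is expressed as `flatMap` followed by `map`, and `flatMap` feeds a finished Iteratee's leftover input to the next one.

```scala
// Hypothetical, simplified Iteratee model (NOT Play's API) used only to show
// sequential composition with leftover input forwarded to the second iteratee.
object MiniIteratee {
  sealed trait Input[+E]
  final case class El[E](e: E) extends Input[E]
  case object Empty extends Input[Nothing]
  case object EOF extends Input[Nothing]

  sealed trait It[E, A] {
    // In this model, feeding a finished iteratee leaves it unchanged.
    def feed(in: Input[E]): It[E, A] = this match {
      case Cont(k) => k(in)
      case done    => done
    }

    // Runs f on the result; any leftover input is fed to the next iteratee.
    def flatMap[B](f: A => It[E, B]): It[E, B] = this match {
      case Done(a, Empty) => f(a)
      case Done(a, left)  => f(a).feed(left)
      case Cont(k)        => Cont[E, B](in => k(in).flatMap(f))
    }

    def map[B](f: A => B): It[E, B] = flatMap(a => Done[E, B](f(a), Empty))
  }
  final case class Done[E, A](a: A, left: Input[E]) extends It[E, A]
  final case class Cont[E, A](k: Input[E] => It[E, A]) extends It[E, A]

  // Sequential composition, expressed with flatMap/map.
  def andThen[E, A, B](a: It[E, A], b: It[E, B]): It[E, (A, B)] =
    a.flatMap(ra => b.map(rb => (ra, rb)))

  // Feeds every element, then EOF, and returns the result if there is one.
  def run[E, A](it: It[E, A], xs: List[E]): Option[A] =
    xs.foldLeft(it)((acc, x) => acc.feed(El(x))).feed(EOF) match {
      case Done(a, _) => Some(a)
      case Cont(_)    => None // still waiting for input after EOF
    }

  // Takes the first character of the first chunk and keeps the rest of that
  // chunk as leftover input instead of dropping it.
  val takeOneChar: It[String, String] = Cont[String, String] {
    case El(s) if s.length > 1 => Done[String, String](s.take(1), El(s.drop(1)))
    case El(s)                 => Done[String, String](s, Empty)
    case other                 => Done[String, String]("", other)
  }

  // Concatenates every remaining chunk until EOF.
  def consume(acc: String): It[String, String] = Cont[String, String] {
    case El(s) => consume(acc + s)
    case Empty => consume(acc)
    case EOF   => Done[String, String](acc, EOF)
  }
}
```

With the single chunk `"1234"`, `takeOneChar` consumes only `"1"` and leaves `El("234")`; because `flatMap` forwards it, `run(andThen(takeOneChar, consume("")), List("1234"))` still yields `Some(("1", "234"))`. Without that forwarding, the second Iteratee would only see EOF and the result would be `("1", "")`.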