I will give some tips and an implementation below, so you may want to close the screen if you want to solve the problem yourself.
Disclaimer: this is only the first approach that came to mind, and my acquaintance with the scalaz-stream API is a bit rusty, so there may be better ways to implement this operation, it may be completely wrong in some terrible way, etc.
Tip 1
Instead of trying to βbuy outβ the losers, you can pass them in the next recursive call.
Hint 2
You can avoid accumulating more than one lost item by indicating which side lost the last.
Tip 3
It is often easier for me to sketch out an implementation using regular collections when I work with Scalaz streams. Here is a helper method that we will need for lists:
/** * @param p if true, the first of the pair wins */ def mergeListsWithHeld[A](p: (A, A) => Boolean)(held: Either[A, A])( ls: List[A], rs: List[A] ): List[A] = held match { // Right is the current winner. case Left(l) => rs match { // ...but it empty. case Nil => l :: ls // ...and it still winning. case r :: rt if p(r, l) => r :: mergeListsWithHeld(p)(held)(ls, rt) // ...upset! case r :: rt => l :: mergeListsWithHeld(p)(Right(r))(ls, rt) } // Left is the current winner. case Right(r) => ls match { case Nil => r :: rs case l :: lt if p(l, r) => l :: mergeListsWithHeld(p)(held)(lt, rs) case l :: lt => r :: mergeListsWithHeld(p)(Left(l))(lt, rs) } }
It is assumed that we already have a lost element, but now we can write a method that we really want to use:
def mergeListsWith[A](p: (A, A) => Boolean)(ls: List[A], rs: List[A]): List[A] = ls match { case Nil => rs case l :: lt => rs match { case Nil => ls case r :: rt if p(l, r) => l :: mergeListsWithHeld(p)(Right(r))(lt, rt) case r :: rt => r :: mergeListsWithHeld(p)(Left(l))(lt, rt) } }
And then:
scala> org.scalacheck.Prop.forAll { (ls: List[Int], rs: List[Int]) => | mergeListsWith[Int](_ < _)(ls.sorted, rs.sorted) == (ls ++ rs).sorted | }.check + OK, passed 100 tests.
Good Excellent. There are nicer ways to write this for lists, but this implementation matches the form of what we need to do for Process .
Implementation
And here is more or less equivalent to stream-stream:
import scalaz.{ -\/, \/, \/- } import scalaz.stream.Process.{ awaitL, awaitR, emit } import scalaz.stream.{ Process, Tee, tee } def mergeWithHeld[A](p: (A, A) => Boolean)(held: A \/ A): Tee[A, A, A] = held.fold(_ => awaitR[A], _ => awaitL[A]).awaitOption.flatMap { case None => emit(held.merge) ++ held.fold(_ => tee.passL, _ => tee.passR) case Some(next) if p(next, held.merge) => emit(next) ++ mergeWithHeld(p)(held) case Some(next) => emit(held.merge) ++ mergeWithHeld(p)( held.fold(_ => \/-(next), _ => -\/(next)) ) } def mergeWith[A](p: (A, A) => Boolean): Tee[A, A, A] = awaitL[A].awaitOption.flatMap { case None => tee.passR case Some(l) => awaitR[A].awaitOption.flatMap { case None => emit(l) ++ tee.passL case Some(r) if p(l, r) => emit(l) ++ mergeWithHeld(p)(\/-(r)) case Some(r) => emit(r) ++ mergeWithHeld(p)(-\/(l)) } }
And check it again:
scala> org.scalacheck.Prop.forAll { (ls: List[Int], rs: List[Int]) => | Process.emitAll(ls.sorted).tee(Process.emitAll(rs.sorted))( | mergeWith(_ < _) | ).toList == (ls ++ rs).sorted | }.check + OK, passed 100 tests.
I would not do this without additional tests, but it seems to work.