How to gracefully implement a pipeline pattern using Scala

I want to build a pipeline template in Scala. Once I have written the pipeline objects, I want to be able to connect them like this:

Pipeline1 :: Pipeline2 :: Pipeline3 ... 

I have experimented with several ideas. Some work and some don't, but none of them completely gets rid of the boilerplate. The following is the closest I have come.

First, I define the abstract classes Pipeline and Source:

```scala
// I is the input type and O is the output type of the pipeline
abstract class Pipeline[I, +O](p: Pipeline[_, _ <: I]) {
  val source = p
  val name: String
  def produce(): O
  def stats(): String
}

abstract class Source[+T] extends Pipeline[AnyRef, T](null)
```

Then I created two pipelines and tried to link them together:

```scala
// this creates a random integer
class RandomInteger extends Source[Int] {
  override val name = "randInt"
  def produce() = {
    math.round(math.random().toFloat * 10)
  }
  def stats() = "this pipeline is stateless"
}

// multiply it by ten
class TimesTen(p: Pipeline[_, Int]) extends Pipeline[Int, Int](p) {
  private var count = 0 // this is a simple state of the pipeline
  override val name = "Times"
  def produce() = {
    val i = source.produce()
    count += 1 // updating the state
    i * 10
  }
  def stats() = "this pipeline has been called for " + count + " times"
}

object TimesTen {
  // this code achieves the desired connection using ::
  // but it has to be repeated in each pipeline subclass.
  // How can this boilerplate be removed or abstracted away?
  def ::(that: Pipeline[_, Int]) = new TimesTen(that)
}
```

This is the main object, in which the two pipelines are connected:

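```scala
object Pipeline {
  def main(args: Array[String]): Unit = {
    // new RandomInteger() :: TimesTen is rewritten to TimesTen.::(new RandomInteger())
    val p = new RandomInteger() :: TimesTen
    println(p.source)
    for (i <- 0 to 10)
      println(p.produce())
    println(p.stats())
  }
}
```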

This code works, but I would have to repeat the code in the TimesTen companion object for every pipeline class I write, which is clearly undesirable. Is there a better way to do this? Reflection might work, but I have heard that relying on reflection is a sign of bad design, and I'm also not sure how well Scala supports reflection.

Thank you for your time.

Update. I designed this toy example to make it easy to understand. In the general case, and as my application requires, each pipeline object has a state that is ideally encapsulated within the object itself and not exposed to any other pipeline. I have modified the code above to reflect this. I would like an object-oriented solution. I'm still experimenting and will let you know if I find one.

Update 2. After some thought, I think a pipeline is just a generalized function that contains some internal state, plus the ability to chain a Function0 into a Function1. In Scala, however, the Function0 class has no compose() or andThen() method.
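For illustration, here is a minimal sketch of how the missing andThen could be bolted onto Function0 with an implicit class. `Function0Syntax`, `Function0Ops` and `Function0Demo` are hypothetical names, not part of the standard library; the sketch assumes Scala 2.13 (or a compatible version).

```scala
object Function0Syntax {
  // Hypothetical extension: Function0 has no andThen, so this wrapper adds one
  // that feeds the thunk's result into an ordinary Function1.
  implicit class Function0Ops[A](private val thunk: () => A) extends AnyVal {
    def andThen[B](g: A => B): () => B = () => g(thunk())
  }
}

object Function0Demo {
  import Function0Syntax._

  // A source with no input, and a stage that transforms its output
  val randomInteger: () => Int = () => math.round(math.random().toFloat * 10)
  val timesTen: Int => Int = _ * 10

  // Function0 andThen Function1 yields a new Function0
  val pipeline: () => Int = randomInteger andThen timesTen

  def main(args: Array[String]): Unit =
    for (_ <- 0 to 10) println(pipeline())
}
```

The result of chaining is again a Function0, so further stages can be appended with the same andThen.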

4 answers

Here is an object-oriented solution using andThen. The idea is to make every stage a Function1, including sources, which take Unit as their input. Connecting two pipelines creates a new pipeline that composes their two functions. This solution allows each Pipeline to keep its own internal state.

A further simplification would be to use apply() instead of produce(). This is left as an exercise for the reader.

```scala
abstract class Pipeline[-I, +O] {
  val name: String
  def produce: I => O
  def stats(): String

  // Connecting two pipelines yields a new pipeline that composes both functions.
  // Pipeline is contravariant in its input, so Pipeline[O, X] also accepts any
  // stage whose input type is a supertype of O.
  def ->[X](seg: Pipeline[O, X]): Pipeline[I, X] = {
    val func = this.produce
    val outerName = this.name
    new Pipeline[I, X] {
      val name = outerName + "." + seg.name
      def produce = func andThen seg.produce
      def stats() = seg.stats()
    }
  }
}

// A source takes no real input, so its input type is Unit
abstract class Source[+T] extends Pipeline[Unit, T]

class RandomInteger extends Source[Int] {
  override val name = "randInt"
  def produce: Unit => Int = (x: Unit) => math.round(math.random().toFloat * 10)
  def stats() = "stateless"
}

class TimesTen() extends Pipeline[Int, Int] {
  private var count = 0 // internal state, not visible to other stages
  override val name = "times"
  def produce: Int => Int = (x: Int) => {
    count += 1
    x * 10
  }
  def stats() = "called for " + count + " times"
}

object Main {
  def main(args: Array[String]): Unit = {
    val p = new RandomInteger() -> new TimesTen()
    for (i <- 0 to 10)
      println(p.produce(()))
    println(p.name)    // prints "randInt.times"
    println(p.stats()) // prints "called for 11 times"
  }
}
```
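One possible take on the apply() exercise, sketched under the assumption that each stage simply extends Function1, so a pipeline is invoked as `p(())` instead of `p.produce()`. The enclosing object `ApplyPipeline` is a hypothetical name used only to keep the sketch self-contained.

```scala
object ApplyPipeline {
  // Each stage *is* a function from I to O, so calling it uses apply
  abstract class Pipeline[-I, +O] extends (I => O) {
    def name: String
    def stats(): String

    // Chain this stage into `next`; the result is again a Pipeline
    def ->[X](next: Pipeline[O, X]): Pipeline[I, X] = {
      val first = this
      new Pipeline[I, X] {
        val name: String = first.name + "." + next.name
        def apply(in: I): X = next(first(in))
        def stats(): String = next.stats()
      }
    }
  }

  // A source takes no real input, so it is modeled as Unit => T
  abstract class Source[+T] extends Pipeline[Unit, T]

  class RandomInteger extends Source[Int] {
    val name = "randInt"
    def apply(u: Unit): Int = math.round(math.random().toFloat * 10)
    def stats(): String = "stateless"
  }

  class TimesTen extends Pipeline[Int, Int] {
    private var count = 0 // state stays private to this stage
    val name = "times"
    def apply(x: Int): Int = { count += 1; x * 10 }
    def stats(): String = "called for " + count + " times"
  }

  def main(args: Array[String]): Unit = {
    val p = new RandomInteger() -> new TimesTen()
    for (_ <- 0 to 10) println(p(()))
    println(p.name)
    println(p.stats())
  }
}
```

A side effect of extending Function1 is that a stage can also be passed to ordinary higher-order functions, e.g. `List(1, 2, 3).map(new ApplyPipeline.TimesTen())`.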

Unless I'm missing something, your pipeline objects are just functions, and your :: operator just "composes" them:

```scala
// Function0 has no compose, so the source is written as Unit => Int
val randomInteger: Unit => Int = _ => math.round(math.random().toFloat * 10)
val timesTen: Int => Int = x => x * 10
val pipeline: Unit => Int = timesTen compose randomInteger // call as pipeline(())
```

Your produce() method is just apply(), which is usually invoked with the shorthand (). A small amount of library code would let you use an operator for composition. This is one of those cases where object-oriented boilerplate really gets in the way of simple functional concepts. Fortunately, Scala lets you avoid the boilerplate in many use cases like this.
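A minimal sketch of that "small amount of library code", assuming an implicit class is acceptable: it adds a `::` method to every Function1. Because Scala operators ending in a colon are right-associative, `a :: b` is rewritten to `b.::(a)`, so the method runs its argument before the receiver and the chain reads left to right. `ComposeOps`, `PipeSyntax`, `ComposeDemo` and `plusOne` are hypothetical names.

```scala
object ComposeOps {
  implicit class PipeSyntax[B, C](private val next: B => C) extends AnyVal {
    // prev :: next  ==  next.::(prev)  ==  next compose prev
    def ::[A](prev: A => B): A => C = next compose prev
  }
}

object ComposeDemo {
  import ComposeOps._

  val randomInteger: Unit => Int = _ => math.round(math.random().toFloat * 10)
  val timesTen: Int => Int = _ * 10
  val plusOne: Int => Int = _ + 1

  // Multiply by ten first, then add one
  val tenThenOne: Int => Int = timesTen :: plusOne

  // Same shape as the question's Pipeline1 :: Pipeline2 :: Pipeline3
  val pipeline: Unit => Int = randomInteger :: timesTen :: plusOne

  def main(args: Array[String]): Unit =
    for (_ <- 0 to 10) println(pipeline(()))
}
```

Since the stages are plain functions, no per-stage companion object is needed; stateful stages could still be written as classes that extend `Int => Int`.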

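```scala
object Pipelining {
  // Wraps any value so that it can be piped into a function with |>
  implicit class Pipe[T](private val x: T) extends AnyVal {
    def |>[U](f: T => U): U = f(x)
  }
}
import Pipelining._

List(2, 3, 4) |> (_.map(_ * 3)) |> (_.map(_.toString)) |> println // prints List(6, 9, 12)
```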

All credit for the operator goes to StephaneLD's post "|> as in F#":

http://www.scala-lang.org/node/8747
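Scala 2.13 added a comparable operation to the standard library as a named method, `pipe`, in `scala.util.chaining`. A sketch of the same chain using it, assuming Scala 2.13 or later (`ChainingDemo` is a hypothetical name):

```scala
import scala.util.chaining._

object ChainingDemo {
  // x.pipe(f) simply evaluates f(x), so each step feeds the next one
  val result: List[String] =
    List(2, 3, 4).pipe(_.map(_ * 3)).pipe(_.map(_.toString))

  def main(args: Array[String]): Unit =
    result.pipe(println)
}
```

Using a named method avoids defining a custom implicit class, at the cost of a slightly noisier chain than `|>`.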


Do you mean dataflow or functional reactive programming? Try this question. A reactive library is under active development; I don't know about the others.
