How to calculate a moving average in RxJava

In the financial domain, we often need to compute an aggregate over a sliding window of a time-series data stream. Take a moving average as an example. Say we have the following data stream (T is the timestamp and V is the actual value):

[T0,V0],[T1,V1],[T2,V2],[T3,V3],[T4,V4],[T5,V5],[T6,V6],[T7,V7],[T8,V8],[T9,V9],[T10,V10],...

Calculating a moving average over a window of 3 from this stream gives:

 avg([T0,V0],[T1,V1],[T2,V2]), avg([T1,V1],[T2,V2],[T3,V3]), avg([T2,V2],[T3,V3],[T4,V4]), avg([T3,V3],[T4,V4],[T5,V5]), avg([T4,V4],[T5,V5],[T6,V6]),... 
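To make the target output concrete, here is a minimal plain-Java sketch of the same computation over an in-memory list. It does not use RxJava, it ignores the timestamps, and the class and method names (`MovingAverageSketch`, `movingAverage`) are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, plain Java only (no RxJava): spells out which
// values each moving-average result is computed from.
final class MovingAverageSketch {

    // Returns avg(v[i], ..., v[i + window - 1]) for every full window.
    static List<Double> movingAverage(List<Double> values, int window) {
        if (window <= 0) {
            throw new IllegalArgumentException("window must be positive");
        }
        List<Double> result = new ArrayList<>();
        for (int start = 0; start + window <= values.size(); start++) {
            double sum = 0.0;
            for (int i = start; i < start + window; i++) {
                sum += values.get(i);
            }
            result.add(sum / window);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Double> values = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0);
        System.out.println(movingAverage(values, 3)); // [2.0, 3.0, 4.0, 5.0]
    }
}
```

Each result covers one window that slides forward by one element, which is the grouping the question asks RxJava to produce.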

To calculate the moving average, it seems that we could do this:

  • build an Observable from the source stream
  • build a second Observable from it by combining consecutive values into sliding groups
  • use an aggregate operator to calculate the final results from the Observable in step 2

Steps 1 and 3 are trivial to implement. For step 2, however, the current RxJava does not seem to have a built-in operator for creating sliding groups of values. The window and groupBy operators do not look appropriate for this case, and I did not find an easy way to compose a solution from the existing operators. Can anyone suggest how to do this in RxJava "elegantly"?

+7
reactive-programming rx-java
2 answers

RxJava Version: 0.15.1

    import java.util.List;

    import rx.Observable;
    import rx.util.functions.Action1;

    class Bar {

        public static void main(String args[]) {

            Integer arr[] = {1, 2, 3, 4, 5, 6}; // N = 6
            Observable<Integer> oi = Observable.from(arr);

            // 1.- bundle 3, skip 1
            oi.buffer(3, 1)
              /**
               * 2.- take only the first X bundles
               * When bundle 3, X = N - 2 => 4
               * When bundle 4, X = N - 3 => 3
               * When bundle a, X = N - (a-1)
               */
              .take(4)
              // 3.- calculate average
              .subscribe(new Action1<List<Integer>>() {
                  @Override
                  public void call(List<Integer> lst) {
                      int sum = 0;
                      for (int i = 0; i < lst.size(); i++) {
                          sum += lst.get(i);
                      }
                      System.out.println("MA(3) " + lst + " => " + sum / lst.size());
                  }
              });
        }
    }

Output Example:

    MA(3) [1, 2, 3] => 2
    MA(3) [2, 3, 4] => 3
    MA(3) [3, 4, 5] => 4
    MA(3) [4, 5, 6] => 5
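The take(4) step is there because, as the comment in the code suggests, buffer(3, 1) keeps emitting shorter lists once the source runs out. The plain-Java sketch below only imitates that behaviour: it does not call RxJava, `slidingBuffers` is a hypothetical stand-in, and the assumption that partial lists are emitted at the end is inferred from the answer rather than checked against this RxJava version. It shows that keeping only full-size lists yields the same four windows without knowing N in advance. It also divides as double, because `sum / lst.size()` above is integer division and would truncate for other inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java imitation (assumption, not RxJava itself) of overlapping buffers.
final class BufferSketch {

    // Imitates buffer(count, skip): a new list starts every `skip` items,
    // and lists still open at the end of the input are emitted short.
    static <T> List<List<T>> slidingBuffers(List<T> source, int count, int skip) {
        if (count <= 0 || skip <= 0) {
            throw new IllegalArgumentException("count and skip must be positive");
        }
        List<List<T>> buffers = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + count, source.size());
            buffers.add(new ArrayList<>(source.subList(start, end)));
        }
        return buffers;
    }

    // Keeps only full windows and averages them as doubles.
    static List<Double> fullWindowAverages(List<Integer> source, int count) {
        List<Double> averages = new ArrayList<>();
        for (List<Integer> buffer : slidingBuffers(source, count, 1)) {
            if (buffer.size() != count) {
                continue; // drop the trailing partial buffers
            }
            int sum = 0;
            for (int v : buffer) {
                sum += v;
            }
            averages.add((double) sum / count);
        }
        return averages;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(slidingBuffers(data, 3, 1));
        System.out.println(fullWindowAverages(data, 3));
    }
}
```

In an Observable chain the same idea would be a size check on each emitted list in place of take(4), which also works when the length of the stream is not known up front.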

+6

I would do it like this:

    public static Observable<Double> movingAverage(Observable<Double> o, int N) {
        return o.window(N, 1).flatMap(
            new Func1<Observable<Double>, Observable<Double>>() {
                public Observable<Double> call(Observable<Double> window) {
                    return Observable.averageDoubles(window);
                }
            }
        );
    }

  • I use window (which emits Observables, and so consumes only a constant amount of memory) rather than buffer (which emits lists, and so consumes memory for each element they hold).
  • This is an example of how you can use combinators instead of writing your own loops, something you should always consider when working with Observables.
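For comparison, the state a window-of-N average needs can be sketched in plain Java: the last N values plus their running sum. This is not part of the answer and uses no RxJava; the class `RunningAverage` and its `next` method are hypothetical names for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.OptionalDouble;

// Hypothetical incremental moving average: keeps at most `window` values.
final class RunningAverage {
    private final int window;
    private final Deque<Double> recent = new ArrayDeque<>();
    private double sum = 0.0;

    RunningAverage(int window) {
        if (window <= 0) {
            throw new IllegalArgumentException("window must be positive");
        }
        this.window = window;
    }

    // Feeds one value; returns the average of the last `window` values,
    // or an empty result until that many values have been seen.
    OptionalDouble next(double value) {
        recent.addLast(value);
        sum += value;
        if (recent.size() > window) {
            sum -= recent.removeFirst(); // slide the window forward by one
        }
        return recent.size() == window
                ? OptionalDouble.of(sum / window)
                : OptionalDouble.empty();
    }

    public static void main(String[] args) {
        RunningAverage avg = new RunningAverage(3);
        for (double v : new double[] {1, 2, 3, 4, 5, 6}) {
            OptionalDouble result = avg.next(v);
            if (result.isPresent()) {
                System.out.println(result.getAsDouble());
            }
        }
    }
}
```

A running sum like this can accumulate floating-point rounding error on long streams, so recomputing each window from scratch, as the combinator version does, is the simpler choice when that matters.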

Update: If you want to filter out the windows at the end of the stream that have fewer than n elements, you can do it as follows:

    def movingAverage(o: Observable[Double], n: Int): Observable[Double] = {
      class State(val sum: Double, val n: Int)
      o.window(n, 1).flatMap(win =>
        win.foldLeft(new State(0.0, 0))((s, e) => new State(s.sum + e, s.n + 1))
           .filter(s => s.n == n)
           .map(s => s.sum / s.n))
    }

(I chose Scala because it is shorter to write, but you can do the same in Java. Just note that Scala's foldLeft is called reduce in Java.)
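As a rough illustration of the same fold, filter, map logic in Java, the sketch below applies it to a single window held in a list. It uses `java.util.stream`'s reduce rather than RxJava's, so it is an analogue of the Scala code and not a translation of it; `FoldSketch`, `State`, and `averageIfFull` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Plain-Java analogue (assumption: one window is already a List).
final class FoldSketch {

    // Immutable accumulator, mirroring the Scala State(sum, n).
    static final class State {
        final double sum;
        final int n;

        State(double sum, int n) {
            this.sum = sum;
            this.n = n;
        }

        State add(double e) {
            return new State(sum + e, n + 1);
        }

        State merge(State other) {
            return new State(sum + other.sum, n + other.n);
        }
    }

    // Folds one window into a State, keeps it only if the window is full,
    // then maps it to its average.
    static Optional<Double> averageIfFull(List<Double> window, int n) {
        State folded = window.stream()
                .reduce(new State(0.0, 0), State::add, State::merge);
        return Optional.of(folded)
                .filter(s -> s.n == n)
                .map(s -> s.sum / s.n);
    }

    public static void main(String[] args) {
        System.out.println(averageIfFull(Arrays.asList(4.0, 5.0, 6.0), 3)); // Optional[5.0]
        System.out.println(averageIfFull(Arrays.asList(5.0, 6.0), 3));      // Optional.empty
    }
}
```

Carrying the element count in the accumulator is what lets the filter reject the short windows at the end of the stream without a second pass over the data.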

+2