Can RxJava reduce() be unsafe when parallelized?

I want to use the reduce() operator on an Observable to collect its items into a Guava ImmutableList, since I much prefer it to the standard ArrayList.

Observable<String> strings = ...

Observable<ImmutableList<String>> captured = strings.reduce(ImmutableList.<String>builder(), (b,s) -> b.add(s))
                .map(ImmutableList.Builder::build);

captured.forEach(i -> System.out.println(i));

Simple enough. But suppose that somewhere I scheduled the strings Observable to run in parallel on multiple threads, or something like that. Would this disrupt the reduce() operation and possibly lead to a race condition? Especially since ImmutableList.Builder would be vulnerable to this?

+4
2 answers

The problem is the shared state between realizations of the chain. This is pitfall #8 on my blog:

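Suppose you are dissatisfied with the performance or the type of the List that toList() returns, and you want to roll your own aggregator instead. For a change, you want to do this with existing operators, and you find reduce():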

Observable<Vector<Integer>> list = Observable
    .range(1, 3)
    .reduce(new Vector<Integer>(), (vector, value) -> {
        vector.add(value);
        return vector;
    });

list.subscribe(System.out::println);
list.subscribe(System.out::println);
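list.subscribe(System.out::println);

When you run the three subscribe calls, the first prints the expected values, but the second prints a vector in which the range 1-3 appears twice, and the third prints 9 elements!

The problem is not with reduce() itself but with the expectations around it. When the chain is assembled, the Vector passed in is a "global" instance that is shared between all evaluations of the chain.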

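Naturally, there is a way to fix this without implementing a whole operator for the purpose (which should be quite simple if you see the potential in the earlier CounterOp):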

Observable<Vector<Integer>> list2 = Observable
    .range(1, 3)
    .reduce((Vector<Integer>)null, (vector, value) -> {
        if (vector == null) {
            vector = new Vector<>();
        }
        vector.add(value);
        return vector;
    });

list2.subscribe(System.out::println);
list2.subscribe(System.out::println);
list2.subscribe(System.out::println);

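You need to start with null and create the vector inside the accumulator function, so that it is no longer shared between subscribers.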

Alternatively, you can look at the collect() operator, which takes a factory callback for the initial value.

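The rule of thumb here is that whenever you see an aggregator-like operator taking a plain value, be careful: this "initial value" will most likely be shared across all subscribers, and if you plan to consume the resulting stream with multiple subscribers, they will clash and may give you unexpected results or even crash.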
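The difference between handing an operator a seed instance and handing it a factory can be modeled without RxJava at all. The sketch below is plain Java and only an illustration: reduceWithSeed, collectWithFactory and sizesOfThreeRuns are hypothetical helpers, not RxJava methods, and a Supplier stands in for a chain that is re-evaluated on every subscription.

import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Plain-Java model of a re-evaluated chain; NOT RxJava's implementation.
class SharedSeedSketch {

    // Models reduce(seed, accumulator): the same seed object is reused on every run.
    static <R> Supplier<R> reduceWithSeed(R seed, BiFunction<R, Integer, R> accumulator) {
        return () -> {
            R state = seed;
            for (int i = 1; i <= 3; i++) {          // stands in for range(1, 3)
                state = accumulator.apply(state, i);
            }
            return state;
        };
    }

    // Models collect(factory, collector): the factory is called once per run.
    static <R> Supplier<R> collectWithFactory(Supplier<R> factory, BiConsumer<R, Integer> collector) {
        return () -> {
            R state = factory.get();
            for (int i = 1; i <= 3; i++) {
                collector.accept(state, i);
            }
            return state;
        };
    }

    // Evaluates the chain three times (three "subscribers") and records each result's size.
    static List<Integer> sizesOfThreeRuns(Supplier<Vector<Integer>> chain) {
        List<Integer> sizes = new ArrayList<>();
        for (int run = 0; run < 3; run++) {
            sizes.add(chain.get().size());
        }
        return sizes;
    }

    public static void main(String[] args) {
        Supplier<Vector<Integer>> shared =
                reduceWithSeed(new Vector<Integer>(), (v, x) -> { v.add(x); return v; });
        Supplier<Vector<Integer>> fresh =
                collectWithFactory(() -> new Vector<Integer>(), (v, x) -> v.add(x));
        System.out.println(sizesOfThreeRuns(shared)); // [3, 6, 9]
        System.out.println(sizesOfThreeRuns(fresh));  // [3, 3, 3]
    }
}

With the shared seed, every run appends to the same Vector, so the sizes grow the same way as in the three subscribe calls; with the factory, each run starts from its own empty Vector.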

+5

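According to the Observable contract, an Observable must not make onNext calls in parallel, so you need to modify your strings Observable to respect this. You can use the serialize operator to achieve that.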
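What "no overlapping onNext calls" buys the accumulator can be sketched in plain Java. This is only a model under simplifying assumptions: serialized and produceConcurrently are hypothetical helpers, a Consumer stands in for onNext, and a plain lock is used here, which is not necessarily how RxJava implements serialize.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of serialized delivery; NOT RxJava's serialize() implementation.
class SerializedSinkSketch {

    // Hypothetical wrapper: lets only one thread at a time into the downstream consumer.
    static <T> Consumer<T> serialized(Consumer<T> downstream) {
        Object lock = new Object();
        return item -> {
            synchronized (lock) {
                downstream.accept(item);
            }
        };
    }

    // Starts several producer threads that all push into one non-thread-safe list
    // through the serialized wrapper, then returns how many items arrived.
    static int produceConcurrently(int threads, int perThread) {
        List<Integer> collected = new ArrayList<>();   // not thread-safe on its own
        Consumer<Integer> sink = serialized(collected::add);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    sink.accept(i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();   // join also makes the workers' writes visible here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for producers", e);
        }
        return collected.size();
    }

    public static void main(String[] args) {
        System.out.println(produceConcurrently(4, 1000)); // 4000
    }
}

Without the wrapper, concurrent add calls on the ArrayList could lose items or throw; a builder such as ImmutableList.Builder is exposed to the same kind of problem when its accumulator is called from several threads at once.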

+1
