How to combine two observables so that the result completes when either of them completes?

I have this code:

    var s1 = new Subject<Unit>();
    var s2 = new Subject<Unit>();
    var ss = s1.Merge(s2).Finally(() => Console.WriteLine("Finished!"));
    ss.Subscribe(_ => Console.WriteLine("Next"));
    s1.OnNext(new Unit());
    s2.OnNext(new Unit());
    s1.OnCompleted(); // I wish ss finished here.
    s2.OnCompleted(); // Yet it does so here. =(

I worked around the problem by calling OnError(new OperationCanceledException()), but I would like a cleaner solution (surely there is a combinator for this?).

c# combinators system.reactive
4 answers

Or this, which is also pretty neat:

    public static class Ext
    {
        public static IObservable<T> MergeWithCompleteOnEither<T>(
            this IObservable<T> source, IObservable<T> right)
        {
            // CreateWithDisposable comes from early Rx builds; in later releases
            // the same overload is available as Observable.Create<T>.
            return Observable.CreateWithDisposable<T>(obs =>
            {
                var compositeDisposable = new CompositeDisposable();
                var subject = new Subject<T>();
                // The subject relays both sources to the observer.
                compositeDisposable.Add(subject.Subscribe(obs));
                compositeDisposable.Add(source.Subscribe(subject));
                compositeDisposable.Add(right.Subscribe(subject));
                return compositeDisposable;
            });
        }
    }

Here a Subject is used, which guarantees that only one OnCompleted is pushed to the observer passed to CreateWithDisposable(): once the first source completes the subject, it ignores everything that arrives afterwards.


Instead of rewriting Merge to complete when either stream completes, I would suggest converting the onCompleted events into onNext events and using var ss = s1.Merge(s2).TakeUntil(s1ors2complete), where s1ors2complete produces a value when either s1 or s2 completes. You could also simply chain .TakeUntil(s1completes).TakeUntil(s2completes) instead of creating s1ors2complete. This approach composes better than the MergeWithCompleteOnEither extension, because it can turn any "complete when both complete" operator into a "complete when either completes" operator.

As for how to convert onCompleted events into onNext events, there are several ways to do it. The CompositeDisposable method seems like a good approach, and a bit of searching turns up an interesting thread about converting between onNext, onError and onCompleted notifications. I would probably create an extension method called ReturnTrueOnCompleted using xs.Select(_ => true).SkipWhile(_ => true).Concat(Observable.Return(true)) (the Select is there so that the types line up when xs is not already an IObservable<bool>), and your merge would then become:

    var s1ors2complete = s1.ReturnTrueOnCompleted().Amb(s2.ReturnTrueOnCompleted());
    var ss = s1.Merge(s2)
               .TakeUntil(s1ors2complete)
               .Finally(() => Console.WriteLine("Finished!"));
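For reference, here is a minimal sketch of what the ReturnTrueOnCompleted helper could look like, assuming the System.Reactive package is referenced. It follows the SkipWhile idea above, with a Select added so that the result is an IObservable<bool> for any element type. The class names CompletionExt and TakeUntilDemo are made up for illustration and are not part of Rx.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CompletionExt
{
    // Hypothetical helper (not part of Rx): drops every value of xs and
    // emits a single `true` once xs has completed.
    public static IObservable<bool> ReturnTrueOnCompleted<T>(this IObservable<T> xs)
    {
        return xs.Select(_ => true)        // map to bool so Concat type-checks
                 .SkipWhile(_ => true)     // discard all values
                 .Concat(Observable.Return(true));
    }
}

public static class TakeUntilDemo
{
    public static void Main()
    {
        var s1 = new Subject<Unit>();
        var s2 = new Subject<Unit>();

        // Fires once, when whichever subject completes first.
        var s1ors2complete = s1.ReturnTrueOnCompleted()
                               .Amb(s2.ReturnTrueOnCompleted());

        var ss = s1.Merge(s2)
                   .TakeUntil(s1ors2complete)
                   .Finally(() => Console.WriteLine("Finished!"));

        ss.Subscribe(_ => Console.WriteLine("Next"));

        s1.OnNext(Unit.Default);
        s2.OnNext(Unit.Default);
        s1.OnCompleted(); // ss is expected to finish here
        s2.OnCompleted(); // nothing left to do by now
    }
}
```

Note that an error in either source still propagates as an error; only completion is converted into a value.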

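You could also look at the Zip operator, which completes automatically when one of the input streams completes. Keep in mind that Zip pairs up values from the two streams rather than interleaving them, so it is not a drop-in replacement for Merge.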


Assuming you don't need the output of either stream, you can use Amb in combination with some magic from Materialize:

    var s1 = new Subject<Unit>();
    var s2 = new Subject<Unit>();
    var ss = Observable.Amb(
            s1.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted),
            s2.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted)
        )
        .Finally(() => Console.WriteLine("Finished!"));
    ss.Subscribe(_ => Console.WriteLine("Next"));
    s1.OnNext(new Unit());
    s2.OnNext(new Unit());
    s1.OnCompleted(); // ss will finish here and s2 will be unsubscribed from

If you need the values, you can attach a Do to each of the two subjects before Materialize.
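A rough sketch of that Do variant, assuming the System.Reactive package is referenced. The helper name FirstCompletion and the class AmbWithDoSketch are invented for illustration; the values are handed to a callback as a side effect, while the resulting sequence only signals the first completion.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class AmbWithDoSketch
{
    // Hypothetical helper (not part of Rx): passes every value from both
    // sources to onValue, and emits a single OnCompleted notification as
    // soon as either source completes.
    public static IObservable<Notification<T>> FirstCompletion<T>(
        IObservable<T> left, IObservable<T> right, Action<T> onValue)
    {
        return Observable.Amb(
            left.Do(onValue)
                .Materialize()
                .Where(n => n.Kind == NotificationKind.OnCompleted),
            right.Do(onValue)
                 .Materialize()
                 .Where(n => n.Kind == NotificationKind.OnCompleted));
    }

    public static void Main()
    {
        var s1 = new Subject<int>();
        var s2 = new Subject<int>();

        var ss = FirstCompletion(s1, s2, v => Console.WriteLine("Value: " + v))
            .Finally(() => Console.WriteLine("Finished!"));

        ss.Subscribe(_ => Console.WriteLine("One of the sources completed"));

        s1.OnNext(1);
        s2.OnNext(2);
        s1.OnCompleted(); // ss is expected to finish here
        s2.OnNext(3);     // s2 has been unsubscribed from, so this is not observed
    }
}
```

The trade-off is that the values leave the pipeline through a side effect, so they cannot be composed further downstream the way they can with the TakeUntil approach.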


Try the following:

    public static class Ext
    {
        public static IObservable<T> MergeWithCompleteOnEither<T>(
            this IObservable<T> source, IObservable<T> right)
        {
            var completed = Observable.Throw<T>(new StreamCompletedException());
            return source.Concat(completed)
                .Merge(right.Concat(completed))
                .Catch((StreamCompletedException ex) => Observable.Empty<T>());
        }

        private sealed class StreamCompletedException : Exception { }
    }

This concatenates onto each input an IObservable that throws, so an exception is raised as soon as either source or right completes. Because Merge stops on the first error, the Catch extension method can then swap that exception for an empty observable, which completes the merged stream.
