Instead of rewriting Merge to complete when either stream ends, I would suggest converting the onCompleted events into onNext events and using var ss = s1.Merge(s2).TakeUntil(s1ors2complete), where s1ors2complete emits a value when either s1 or s2 ends. You can also just chain .TakeUntil(s1completes).TakeUntil(s2completes) instead of creating s1ors2complete. This approach composes better than the MergeWithCompleteOnEither extension, because it can turn any "complete when both complete" operator into a "complete when either completes" operator.
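As a rough illustration of the chained form, here is a minimal sketch that assumes the System.Reactive package. The Completes helper is hypothetical (it is not part of Rx), and Materialize is only one possible way to turn a completion into a value. Subjects are used because they are hot, so subscribing to s1 and s2 more than once does not replay side effects.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ChainedTakeUntilSketch
{
    // Hypothetical helper: emits a single notification value when xs completes.
    static IObservable<Notification<T>> Completes<T>(IObservable<T> xs) =>
        xs.Materialize().Where(n => n.Kind == NotificationKind.OnCompleted);

    public static void Main()
    {
        var s1 = new Subject<int>();
        var s2 = new Subject<int>();

        // Merge normally waits for both sources; each TakeUntil cuts it short.
        var ss = s1.Merge(s2)
                   .TakeUntil(Completes(s1))
                   .TakeUntil(Completes(s2));

        ss.Subscribe(
            x => Console.WriteLine(x),
            () => Console.WriteLine("ss completed"));

        s1.OnNext(1);
        s2.OnCompleted();   // completes ss although s1 is still open
        s1.OnNext(2);       // arrives after completion, so it is dropped
    }
}
```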
As for how to convert onCompleted events into onNext events, there are several ways to do this. The CompositeDisposable method looks like a good approach, and a bit of searching finds an interesting thread about converting between onNext, onError and onCompleted notifications. I would probably create an extension method called ReturnTrueOnCompleted using xs.SkipWhile(_ => true).Select(_ => true).Concat(Observable.Return(true)) (the Select is there so that the element type is bool before the Concat), and your merge then becomes:
var s1ors2complete = s1.ReturnTrueOnCompleted().Amb(s2.ReturnTrueOnCompleted());
var ss = s1.Merge(s2).TakeUntil(s1ors2complete).Finally(() => Console.WriteLine("Finished!"));
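For completeness, here is one way the extension method and the merge could fit together. This is a sketch that assumes the System.Reactive package. The class name CompletionExtensions is made up for the example, and IgnoreElements is used in place of SkipWhile(_ => true), which should have the same effect of dropping every element.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CompletionExtensions
{
    // Drops every element of xs and emits a single true when xs completes.
    public static IObservable<bool> ReturnTrueOnCompleted<T>(this IObservable<T> xs)
    {
        return xs.IgnoreElements()                  // keep only the termination
                 .Select(_ => true)                 // retype the empty stream as bool
                 .Concat(Observable.Return(true));  // runs only after xs completes
    }
}

public static class Program
{
    public static void Main()
    {
        // Hot sources, so the extra subscriptions made below are harmless.
        var s1 = new Subject<int>();
        var s2 = new Subject<int>();

        // Amb follows whichever completion signal fires first.
        var s1ors2complete = s1.ReturnTrueOnCompleted().Amb(s2.ReturnTrueOnCompleted());
        var ss = s1.Merge(s2)
                   .TakeUntil(s1ors2complete)
                   .Finally(() => Console.WriteLine("Finished!"));

        using (ss.Subscribe(x => Console.WriteLine(x)))
        {
            s1.OnNext(1);
            s2.OnNext(2);
            s1.OnCompleted();   // ends ss even though s2 is still open
            s2.OnNext(3);       // not delivered, ss has already completed
        }
    }
}
```

Note that s1 and s2 are each subscribed to twice here (once by Merge, once by the completion signal). With cold observables that would run the source twice, so something like Publish().RefCount() may be needed in that case.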
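You can also look at using the Zip operator, which automatically terminates when one of the input streams terminates. Keep in mind that Zip pairs up elements from the two streams rather than interleaving them the way Merge does, so it only fits if pairing is what you want.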
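A small sketch of the Zip behaviour, again assuming System.Reactive. The subject names are invented for the example, and the comments describe what I would expect rather than verified output.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ZipSketch
{
    public static void Main()
    {
        var letters = new Subject<string>();
        var numbers = new Subject<int>();

        // Zip emits one combined value per pair of inputs.
        var zipped = letters.Zip(numbers, (l, n) => l + n);

        zipped.Subscribe(
            x => Console.WriteLine(x),
            () => Console.WriteLine("zipped completed"));

        letters.OnNext("a");
        numbers.OnNext(1);       // forms the pair "a1"
        numbers.OnCompleted();   // no further pairs are possible, so zipped should complete
        letters.OnNext("b");     // never paired
    }
}
```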
Greg bray