First we introduce a small extension method for IObserver<T> .
public static IObserver<T> Safe<T>(this IObserver<T> observer) { var done = false; return Observer.Create<TResult>( value => { if (!done) { observer.OnNext(value); } }, error => { if (!done) { done = true; observer.OnError(error); } }, () => { if (!done) { done = true; observer.OnCompleted(); } }); }
This simply ensures that the observer is called in the OnNext*(OnError|OnCompleted) and that its violations are simply ignored.
Now we can implement the operator you described by buffering the values ββfrom both sequences by key and only emit them when we have a match between the two sequences.
public static IObservable<TResult> Join<T1, T2, TKey, TResult>( IObservable<T1> source1, IObservable<T2> source2, Func<T1, TKey> key1, Func<T2, TKey> key2, Func<T1, T2, TResult> selector) { return Observable.Create<TResult>(observer => { var dict1 = new Dictionary<TKey, T1>(); var dict2 = new Dictionary<TKey, T2>(); var gate = new object(); var safeObserver = observer.Safe(); Action<TKey> emit = k => { T1 value1; T2 value2; if (dict1.TryGetValue(k, out value1) && dict2.TryGetValue(k, out value2)) { var result = selector(value1, value2); safeObserver.OnValue(result); dict1.Remove(k); dict2.Remove(k); } }; return new CompositeDisposable( source1.Synchronize(gate).Subscribe( value1 => { var k = key1(value1); dict1[k] = value1; emit(k); }, safeObserver.OnError, safeObserver.OnCompleted), source2.Synchronize(gate).Subscribe( value2 => { var k = key2(value2); dict2[k] = value2; emit(k); }, safeObserver.OnError, safeObserver.OnCompleted)); }); }
Example:
IObservable<AlertData> alertDatas = ...; IObservable<SoundRequestData> = soundRequestDatas = ...; IObservable<AlertDataViewModel> alertDataViewModels = Join( alertDatas, soundRequestDatas, alertData => alertData.Id, soundRequestData => soundRequestData.Id, (alertData, soundRequestData) => new AlertDataViewModel { AlertData = alertData, SoundRequestData = soundRequestData });