Combination of observables conditionally

I have two observables, one IObservable<AlertData> and the other IObservable<SoundRequestData> . AlertData contains an Id property that knows which SoundRequestData belongs to. SoundRequestData knows only about itself and has the Id property, which can be compared with what is in AlertData .

I want to combine these two data types into a new type AlertDataViewModel . However, I cannot be sure that the order of the data coming into both observables is the same. I don’t care about the order in the exit right now.

I want to combine AlertData with SoundRequestData .

The way I am doing it now, which works but works slowly, is to wait until one of the observables has done all the data in the ObservableCollection . Subsequently, I run another observable and map the Id.

Is there a better way to do this? I suppose this can be expressed as the following marble diagram:

Imgur

So a.id=1 corresponds to 3.id=1 , b.id=2 corresponds to 4.id=2 , etc.

+5
source share
2 answers

First we introduce a small extension method for IObserver<T> .

 public static IObserver<T> Safe<T>(this IObserver<T> observer) { var done = false; return Observer.Create<TResult>( value => { if (!done) { observer.OnNext(value); } }, error => { if (!done) { done = true; observer.OnError(error); } }, () => { if (!done) { done = true; observer.OnCompleted(); } }); } 

This simply ensures that the observer is called in the OnNext*(OnError|OnCompleted) and that its violations are simply ignored.

Now we can implement the operator you described by buffering the values ​​from both sequences by key and only emit them when we have a match between the two sequences.

 public static IObservable<TResult> Join<T1, T2, TKey, TResult>( IObservable<T1> source1, IObservable<T2> source2, Func<T1, TKey> key1, Func<T2, TKey> key2, Func<T1, T2, TResult> selector) { return Observable.Create<TResult>(observer => { var dict1 = new Dictionary<TKey, T1>(); var dict2 = new Dictionary<TKey, T2>(); var gate = new object(); var safeObserver = observer.Safe(); Action<TKey> emit = k => { T1 value1; T2 value2; if (dict1.TryGetValue(k, out value1) && dict2.TryGetValue(k, out value2)) { var result = selector(value1, value2); safeObserver.OnValue(result); dict1.Remove(k); dict2.Remove(k); } }; return new CompositeDisposable( source1.Synchronize(gate).Subscribe( value1 => { var k = key1(value1); dict1[k] = value1; emit(k); }, safeObserver.OnError, safeObserver.OnCompleted), source2.Synchronize(gate).Subscribe( value2 => { var k = key2(value2); dict2[k] = value2; emit(k); }, safeObserver.OnError, safeObserver.OnCompleted)); }); } 

Example:

 IObservable<AlertData> alertDatas = ...; IObservable<SoundRequestData> = soundRequestDatas = ...; IObservable<AlertDataViewModel> alertDataViewModels = Join( alertDatas, soundRequestDatas, alertData => alertData.Id, soundRequestData => soundRequestData.Id, (alertData, soundRequestData) => new AlertDataViewModel { AlertData = alertData, SoundRequestData = soundRequestData }); 
+2
source

It is not the most beautiful, but it will work.

It will return this class, which is just a collection of the original two:

 class Aggregate { public AlertData AlertData {get;set;} public SoundRequestData SoundRequestData { get; set; } public int Id { get { return AlertData == null ? SoundRequestData.Id : AlertData.Id; } } } 

This is the connection logic:

 var joined = Observable.Merge( // Convert the two sources into half-filled aggregates and merge them source1.Select(a => new Aggregate() { AlertData = a }), source2.Select(s => new Aggregate() { SoundRequestData = s })) .GroupBy(a => a.Id) // We only need two for each Id .Select(group => group.Take(2)) // This looks ugly, but is just joining the two messages into one .Select(group => group.Aggregate(new Aggregate(), (agg, newData) => new Aggregate() { AlertData = agg.AlertData ?? newData.AlertData, SoundRequestData = agg.SoundRequestData ?? newData.SoundRequestData })) // Back to one stream .Merge(); 
+1
source

All Articles