Combining one observable with the last of another observable

I am trying to combine two observables whose values ​​share some key.

I want to create a new value whenever the first observable creates a new value in combination with the last value of the second observable, which choice depends on the last value on the first observable.

Pseudo-code example:

var obs1 = Observable.Interval(TimeSpan.FromSeconds(1)).Select(x => Tuple.create(SomeKeyThatVaries, x) var obs2 = Observable.Interval(TimeSpan.FromMilliSeconds(1)).Select(x => Tuple.create(SomeKeyThatVaries, x) from x in obs1 let latestFromObs2WhereKeyMatches = … select Tuple.create(x, latestFromObs2WhereKeyMatches) 

Any suggestions?

Obviously, this can be realized by subordinating to the second observable and creating a dictionary with the last values ​​indexed by the key. But I am looking for a different approach.

Usage scenario: 1 minute of price bars calculated from the stock quotes stream. In this case, the key is a ticker, and the dictionary contains the latest supply and demand prices for specific tickers, which are then used in the calculation.

(By the way, thanks to Dave and James, it was a very fruitful discussion)

(Sorry for the formatting, it's hard to get straight to the iPad.)

+2
c # system.reactive
source share
2 answers

... why are you looking for a different approach? You seem to be on the right track. This is short, simple code ... roughly speaking, it will be something like:

 var cache = new ConcurrentDictionary<long, long>(); obs2.Subscribe(x => cache[x.Item1] = x.Item2); var results = obs1.Select(x => new { obs1 = x.Item2, cache.ContainsKey(x.Item1) ? cache[x.Item1] : 0 }); 

At the end of the day, C # is an OO language, and the heavy lifting of thread-safe mutable collections has already been done for you.

A bizarre Rx approach may come up (it looks like associations might be involved) ... but how much is this possible? And how will this be done?

$ 0.02

+2
source share

I would like to know the purpose of such a request. Could you describe a use case a bit?

However, it looks like the following query might solve your problem. Initial forecasts are not needed if you already have some way of determining the origin of each value, but I have included them for generalization to fit your extremely abstract way of polling .; -)

Note. I assume someKeyThatVaries not shared data, as you showed it, so I also included the term anotherKeyThatVaries ; otherwise the whole request really doesn't make any sense to me.

 var obs1 = Observable.Interval(TimeSpan.FromSeconds(1)) .Select(x => Tuple.Create(someKeyThatVaries, x)); var obs2 = Observable.Interval(TimeSpan.FromSeconds(.25)) .Select(x => Tuple.Create(anotherKeyThatVaries, x)); var results = obs1.Select(t => new { Key = t.Item1, Value = t.Item2, Kind = 1 }) .Merge( obs2.Select(t => new { Key = t.Item1, Value = t.Item2, Kind = 2 })) .GroupBy(t => t.Key, t => new { t.Value, t.Kind }) .SelectMany(g => g.Scan( new { X = -1L, Y = -1L, Yield = false }, (acc, cur) => cur.Kind == 1 ? new { X = cur.Value, Y = acc.Y, Yield = true } : new { X = acc.X, Y = cur.Value, Yield = false }) .Where(s => s.Yield) .Select(s => Tuple.Create(sX, sY))); 
+2
source share

All Articles