Solution implementation
My approach to solving the problem is to create an extension method that accepts a stream of IObservable<MyInfo> inputs (which may be a Subject ) and IScheduler , and which returns a stream of elements that are stuck. It looks like this:
public static class ObservableExtensions { public static IObservable<MyInfo> StuckInfos(this IObservable<MyInfo> source, IScheduler scheduler = null) { scheduler = scheduler ?? Scheduler.Default; return source.Publish(pub => pub.Where(x => x.Status == "STARTED") .SelectMany( x => Observable.Return(x) .Delay(TimeSpan.FromMinutes(30), scheduler) .TakeUntil(pub.Where(y => y.Id == x.Id && y.Status != "STARTED")))); } }
There are a lot of Rx! Let him little by little ...
The general idea is that we want to search for instances of MyInfo (hereinafter MyInfo to as "elements") in the "START" state, which remain "unanswered" by an unoccupied element with the corresponding identifier for 30 minutes.
Ignore the Publish bit, I will return to this. Imagine the pub variable is source .
Step 1 - Filter for "START" items
pub.Where(x => x.Status == "STARTED")
This bit is simple, we just filtered the source to get only “START” items.
Step 2 - Turn each item into a stream with a delay of its own
This is a little trickier. Since the appearance of the element, we know that after 30 minutes we want to answer the question: "Is there another item that appeared to unleash it?" To help us do this, we will create a new thread that only provides the most information in 30 minutes. Our plan will be to shorten this flow if a qualifying item appears to unlock it. Assuming x is information, we do the following:
Observable.Return(x) .Delay(TimeSpan.FromMinutes(30), scheduler)
Observable.Return converts the element to an observable stream, which immediately OnNext for the element, and then OnComplete s. Despite seeming a little useless, this is actually a wonderfully useful building block (and for some advanced reading it is sometimes called a functional unit, a key part of the IObservable monad ), as it allows us to create a new Observable stream from any element. As soon as we receive this stream, we Delay so that the item appears after 30 minutes.
Pay attention to how we specify our scheduler when calling Delay - this will facilitate testing with simulated time.
Step 3 - Block the delayed thread if the item becomes "peeled off"
Now, if the qualification element, which “peels off” to us, reaches 30 minutes, we will no longer be interested in this item.
The qualification criteria for an “detachment” element is one that matches Id and does not have a “BEGINNING” status - I suggested that the other “BEGINNING” copy of the element is not good enough to peel off the stuck element! (If this is wrong, then any status will be in effect to qualify for denial). If an element without binding ( pub ) enters the original unopened stream, we use TakeUntil to terminate the deferred stream before the delayed element has the opportunity:
.TakeUntil(pub.Where(y => y.Id == x.Id && y.Status != "STARTED"))));
Step 4 - Sort All These Item Streams
Now we have a bit of a mess, because we projected each element into its own stream - we have a stream of flows, and somehow we need to return to one stream. To do this, we use SelectMany (advanced reading: equivalent to monad bind). SelectMany performs two tasks here - this allows us to map an element to a stream and smooth the resulting stream of streams back into one stream at a time. The matching function that we will use is the one we just created - therefore, putting all this together, we have:
pub.Where(x => x.Status == "STARTED") .SelectMany( x => Observable.Return(x) .Delay(TimeSpan.FromMinutes(30), scheduler) .TakeUntil(pub.Where(y => y.Id == x.Id && y.Status != "STARTED"))));
Step 5 - Ensuring We Are Not Confused with the Source Stream Using Publish
We look good, but in this situation there is one subtle problem. You will notice that we subscribe to the source stream ( pub ) for more than one - in the original Where filter in TakeUntil .
The problem is that subscribing to the same stream several times can have unexpected consequences. Some streams are “cold” - each subscriber starts his own chain of events. This can be especially difficult in queries where time is a critical factor. There may be other problems, but I don’t want to go too far here. Basically, we need to be very careful that we only subscribe once to the source stream. The Publish() method can do this for us - it will subscribe to the source once, and then the multicast source for many subscribers.
Thus, the pub that appears in lambda is a "safe" copy of the source, we can safely subscribe several times.
How to turn to testing
You will need to control the time for this - it is best to use specially designed Rx testing tools in the nuget rx-testing package. In doing so, you can use TestScheduler to manage time and schedule events.
Here's a trivial test that detects a simple stuck item.
public class StuckDetectorTests : ReactiveTest { [Test] public void FindSingleStuckItem() { var testScheduler = new TestScheduler(); var xs = testScheduler.CreateColdObservable( OnNext(TimeSpan.FromMinutes(5).Ticks, MyInfo.Started("1"))); var results = testScheduler.CreateObserver<MyInfo>(); xs.StuckInfos(testScheduler).Subscribe(results); testScheduler.Start(); results.Messages.AssertEqual( OnNext(TimeSpan.FromMinutes(35).Ticks, MyInfo.Started("1"))); } }
Be sure to derive your test class from ReactiveTest in order to use the OnXXX helper methods.
I also created some useful factory methods on MyInfo and implemented stateful overloads to make testing easier.
The full code is quite long - I posted a list that contains more tests here: https://gist.github.com/james-world/62dca2fe2f91531a0401
There's also a good Rx testing blog post here .