Using Rx queries, how do I find which records have kept the same status over a 3-second window, evaluated every second?

I have spent a few days looking at Rx and have read a lot: IntroToRx, the 101 Rx Samples, and many other places, but I can't figure it out. It sounds so simple, but I can't get what I need: I need to know which ID has been "stuck" in the "STARTED" status for at least 30 minutes.

I have a MyInfo class that looks like this:

    public class MyInfo
    {
        public string ID { get; set; }
        public string Status { get; set; }
    }

And I set up a Subject to help me test, as follows:

    var subject = new Subject<MyInfo>();

    subject.OnNext(new MyInfo() { ID = "1", Status = "STARTED" });
    subject.OnNext(new MyInfo() { ID = "2", Status = "PHASE1" });
    subject.OnNext(new MyInfo() { ID = "3", Status = "STOPPED" });
    subject.OnNext(new MyInfo() { ID = "4", Status = "STARTED" });

    subject.OnNext(new MyInfo() { ID = "1", Status = "STARTED" });
    subject.OnNext(new MyInfo() { ID = "2", Status = "PHASE1" });
    subject.OnNext(new MyInfo() { ID = "3", Status = "STOPPED" });
    subject.OnNext(new MyInfo() { ID = "4", Status = "PHASE2" });

    subject.OnNext(new MyInfo() { ID = "1", Status = "STARTED" });
    subject.OnNext(new MyInfo() { ID = "2", Status = "STOPPED" });
    subject.OnNext(new MyInfo() { ID = "3", Status = "STOPPED" });
    subject.OnNext(new MyInfo() { ID = "4", Status = "STARTED" });

    subject.OnCompleted();

My query and subscription look like this (I use seconds in the example):

    var q8 = from e in subject
             group e by new { ID = e.ID, Status = e.Status } into g
             from w in g.Buffer(timeSpan: TimeSpan.FromSeconds(3),
                                timeShift: TimeSpan.FromSeconds(1))
             select new { ID = g.Key.ID, Status = g.Key.Status, count = w.Count };

    var subsc = q8.Subscribe(a => Console.WriteLine("{0} {1} {2}", a.ID, a.Status, a.count));

Now I get output that tells me, for each ID, which statuses it reported during the window and how many times:

    ID  Status   Count
    1   STARTED  3
    2   PHASE1   2
    3   STOPPED  3
    4   STARTED  2
    4   PHASE2   1
    2   STOPPED  1

My first idea was the following: discard the IDs that had more than one status in the window (so IDs 2 and 4 would be eliminated), and from the remaining ones discard those whose status is not "STARTED" (this eliminates ID 3). That leaves ID 1, which is the record I'm looking for.

Is this the best approach to the problem? And how do I write this query?

Also, how can I push messages into my subject at different intervals, so that I can test the window?

Thanks!

1 answer

Solution implementation

My approach to solving the problem is to create an extension method that takes an IObservable<MyInfo> source stream (which may be a Subject) and an optional IScheduler, and returns a stream of the items that are stuck. It looks like this:

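    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;

    public static class ObservableExtensions
    {
        // Note: MyInfo here exposes the identifier as Id (the question's class names it ID).
        public static IObservable<MyInfo> StuckInfos(
            this IObservable<MyInfo> source,
            IScheduler scheduler = null)
        {
            // Fall back to the default scheduler; tests can pass a TestScheduler instead.
            scheduler = scheduler ?? Scheduler.Default;

            return source.Publish(pub =>
                pub.Where(x => x.Status == "STARTED")
                   .SelectMany(
                       x => Observable.Return(x)
                                      .Delay(TimeSpan.FromMinutes(30), scheduler)
                                      .TakeUntil(pub.Where(y => y.Id == x.Id && y.Status != "STARTED"))));
        }
    }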

There's a lot of Rx in there! Let's take it bit by bit...

The general idea is that we want to find instances of MyInfo (hereafter referred to as "items") in the "STARTED" state that remain "unanswered" for 30 minutes, meaning that no unsticking item with a matching Id arrives in that time.

Ignore the Publish bit for now; I will come back to it. Imagine that the pub variable is simply source.

Step 1 - Filter for "STARTED" items

 pub.Where(x => x.Status == "STARTED") 

This bit is simple: we just filter the source to get only "STARTED" items.

Step 2 - Turn each item into its own delayed stream

This is a little trickier. From the moment an item appears, we know that 30 minutes later we want to answer the question: "Has another item appeared to unstick it?" To help us do this, we create a new stream that emits just that item 30 minutes later. Our plan is to cut this stream short if a qualifying item turns up to unstick it. Assuming x is the item, we do the following:

 Observable.Return(x) .Delay(TimeSpan.FromMinutes(30), scheduler) 

Observable.Return converts the item into an observable stream that immediately calls OnNext with the item and then OnCompleted. Despite seeming a little useless, this is actually a wonderfully useful building block (for some advanced reading: it is sometimes called the unit function, a key part of the IObservable monad), as it lets us create a new observable stream from any item. Once we have this stream, we apply Delay so that the item emerges 30 minutes later.

Note how we pass our scheduler to Delay; this makes testing with simulated time much easier.
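To see the delayed single-item stream in isolation, here is a minimal sketch under virtual time. It assumes the Rx testing package (Microsoft.Reactive.Testing) is referenced; the string "item 1" is only a placeholder standing in for a MyInfo instance, and the class name DelaySketch is made up for illustration.

```csharp
using System;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class DelaySketch
{
    public static void Main()
    {
        // Virtual-time scheduler: time only moves when we tell it to.
        var scheduler = new TestScheduler();
        var observer = scheduler.CreateObserver<string>();

        // "item 1" is a placeholder standing in for a MyInfo instance.
        Observable.Return("item 1")
            .Delay(TimeSpan.FromMinutes(30), scheduler)
            .Subscribe(observer);

        // Advance virtual time to just short of the delay.
        scheduler.AdvanceBy(TimeSpan.FromMinutes(29).Ticks);
        Console.WriteLine("Messages after 29 minutes: " + observer.Messages.Count);

        // Run the remaining scheduled work, then dump what was recorded.
        scheduler.Start();
        foreach (var message in observer.Messages)
            Console.WriteLine(message);
    }
}
```

Nothing should have been recorded at the 29-minute mark; the item and the completion only show up once the scheduler has run past the 30-minute delay.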

Step 3 - Cut the delayed stream short if the item becomes "unstuck"

Now, if a qualifying item that "unsticks" our item arrives before the 30 minutes are up, we are no longer interested in that item.

The qualifying criterion for an "unsticking" item is that it matches on Id and does not have a "STARTED" status. I have assumed that another "STARTED" instance of the item is not good enough to unstick the stuck item! (If this assumption is wrong, then an item with any status and a matching Id would qualify as unsticking.) If an unsticking item arrives on the original, undelayed stream (pub), we use TakeUntil to terminate the delayed stream before the delayed item has a chance to emerge:

    .TakeUntil(pub.Where(y => y.Id == x.Id && y.Status != "STARTED"))
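As a small illustration of how TakeUntil suppresses the delayed item, the sketch below replaces the "unsticking" stream with a hypothetical Observable.Timer that fires after 10 virtual minutes. It again assumes the Rx testing package is referenced, and the names TakeUntilSketch and unstick are invented for this example.

```csharp
using System;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class TakeUntilSketch
{
    public static void Main()
    {
        var scheduler = new TestScheduler();

        // Hypothetical stand-in for the "unsticking" signal: fires after 10 minutes.
        var unstick = Observable.Timer(TimeSpan.FromMinutes(10), scheduler);

        // The item would emerge after 30 minutes, unless unstick fires first.
        var delayed = Observable.Return("item 1")
            .Delay(TimeSpan.FromMinutes(30), scheduler)
            .TakeUntil(unstick);

        var observer = scheduler.CreateObserver<string>();
        delayed.Subscribe(observer);

        // Run all scheduled work in virtual time.
        scheduler.Start();

        foreach (var message in observer.Messages)
            Console.WriteLine(message);
    }
}
```

Because the signal arrives before the delay elapses, the recorded messages should contain only a completion and no OnNext for "item 1". Moving the timer past 30 minutes lets the item through instead.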

Step 4 - Flatten all these item streams

Now we have a bit of a mess, because we have projected each item into its own stream: we have a stream of streams, and somehow we need to get back to a single stream. To do this, we use SelectMany (advanced reading: the equivalent of monadic bind). SelectMany does two jobs here: it lets us map each item to a stream, and it flattens the resulting stream of streams back into one stream, all in one go. The mapping function we use is the one we have just built, so putting it all together we have:

    pub.Where(x => x.Status == "STARTED")
       .SelectMany(
           x => Observable.Return(x)
                          .Delay(TimeSpan.FromMinutes(30), scheduler)
                          .TakeUntil(pub.Where(y => y.Id == x.Id && y.Status != "STARTED")));

Step 5 - Making sure we don't trip over the source stream, using Publish

This is looking good, but there is one subtle problem. You will notice that we subscribe to the source stream (pub) more than once: in the initial Where filter and in the TakeUntil.

The problem is that subscribing to the same stream several times can have unexpected consequences. Some streams are "cold": each subscriber sets off its own chain of events. This can be especially tricky in queries where timing is critical. There can be other problems too, but I don't want to go too far into that here. Basically, we need to be very careful that we subscribe to the source stream only once. The Publish() method can do this for us: it subscribes to the source once and then multicasts it to many subscribers.

Thus, the pub that appears in the lambda is a "safe" copy of the source that we can subscribe to as many times as we like.
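The difference is easy to observe with a cold source that counts its own subscriptions. The following is a minimal sketch, not part of the original solution: the counter, the Zip query, and the class name PublishSketch are all made up purely to demonstrate the effect.

```csharp
using System;
using System.Reactive.Linq;

public static class PublishSketch
{
    public static void Main()
    {
        var subscriptions = 0;

        // A cold source: every subscription re-runs this factory.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Range(1, 3);
        });

        // Referencing the source twice in a query subscribes to it twice.
        source.Zip(source, (a, b) => a + b).Subscribe(_ => { });
        Console.WriteLine("Subscriptions without Publish: " + subscriptions);

        subscriptions = 0;

        // Inside the Publish lambda, pub can be used many times;
        // the underlying source is subscribed to only once.
        source.Publish(pub => pub.Zip(pub, (a, b) => a + b)).Subscribe(_ => { });
        Console.WriteLine("Subscriptions with Publish: " + subscriptions);
    }
}
```

The first count should come out as 2 and the second as 1, which is exactly the guarantee StuckInfos relies on when it uses pub in both the Where filter and the TakeUntil.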

How to go about testing

You will need to control time for this. The best way is to use the purpose-built Rx testing tools in the Rx-Testing NuGet package. With these, you can use a TestScheduler to control time and to schedule events.

Here's a trivial test that detects a simple stuck item.

    using System;
    using Microsoft.Reactive.Testing;
    using NUnit.Framework;

    public class StuckDetectorTests : ReactiveTest
    {
        [Test]
        public void FindSingleStuckItem()
        {
            var testScheduler = new TestScheduler();

            // A single STARTED item arrives 5 minutes in and is never unstuck.
            var xs = testScheduler.CreateColdObservable(
                OnNext(TimeSpan.FromMinutes(5).Ticks, MyInfo.Started("1")));

            var results = testScheduler.CreateObserver<MyInfo>();

            xs.StuckInfos(testScheduler).Subscribe(results);

            testScheduler.Start();

            // The item is reported as stuck 30 minutes after it appeared.
            results.Messages.AssertEqual(
                OnNext(TimeSpan.FromMinutes(35).Ticks, MyInfo.Started("1")));
        }
    }

Be sure to derive your test class from ReactiveTest in order to use the OnXXX helper methods.

I also created some helpful factory methods on MyInfo and implemented equality overrides to make testing easier.
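For orientation, a MyInfo with such helpers might look roughly like the sketch below. This is an assumption about the shape, not the code from the gist: Started is the only factory the test above actually calls, and Stopped plus the exact equality members are hypothetical.

```csharp
using System;

public class MyInfo : IEquatable<MyInfo>
{
    public string Id { get; set; }
    public string Status { get; set; }

    // Factory helpers (Started is used by the test; Stopped is hypothetical).
    public static MyInfo Started(string id) => new MyInfo { Id = id, Status = "STARTED" };
    public static MyInfo Stopped(string id) => new MyInfo { Id = id, Status = "STOPPED" };

    // Value equality, so that two instances with the same Id and Status
    // compare as equal when AssertEqual checks recorded messages.
    public bool Equals(MyInfo other) =>
        other != null && Id == other.Id && Status == other.Status;

    public override bool Equals(object obj) => Equals(obj as MyInfo);

    public override int GetHashCode() => (Id, Status).GetHashCode();

    public override string ToString() => Id + ":" + Status;
}
```

Without value equality, the expected and actual MyInfo instances in the test would be different objects and the assertion would fail even when the query behaves correctly.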

The full code is quite long, so I have posted a gist that contains more tests here: https://gist.github.com/james-world/62dca2fe2f91531a0401

There's also a good Rx testing blog post here.
