Rx: pairing the window duration with the number of events raised inside the window

I would like to use Rx to compute statistics on 2 event streams.

Input streams

//    stream1    --A---B----A-B-----A-----B----A--B|
//    stream2    ----X---X-----------X--X---XX---X--X|

Intermediate result

Windows open on A and close on B. For each window, I want its duration paired with the number of stream2 events raised inside it.

//    result     ------1------0-----------2-------1|    <-- count of stream2 events in [A-B] window
//                     4      2           6       3     <-- paired with window [A-B] window duration
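
To make the expected numbers concrete, here is a minimal non-Rx sketch of the pairing. The tick values are an assumption, read off the marble diagrams above, and the variable names are purely illustrative. If several A events arrive in a row, this sketch keeps the last one, which mirrors the Buffer(2,1) approach used further down.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Assumed tick positions, read off the marble diagrams (illustrative only).
var stream1 = new (int Time, char Kind)[]
{
    (2, 'A'), (6, 'B'), (11, 'A'), (13, 'B'),
    (19, 'A'), (25, 'B'), (30, 'A'), (33, 'B')
};
var stream2 = new[] { 4, 8, 20, 23, 27, 28, 32, 35 };

// Pair each A with the next B, then count the stream2 events in between.
var pairs = new List<(int Count, int Duration)>();
int? openedAt = null;
foreach (var e in stream1)
{
    if (e.Kind == 'A')
    {
        openedAt = e.Time;                 // window opens (a later A replaces an earlier one)
    }
    else if (e.Kind == 'B' && openedAt is int start)
    {
        int count = stream2.Count(t => t > start && t < e.Time);
        pairs.Add((count, e.Time - start));
        openedAt = null;                   // window closes
    }
}

foreach (var p in pairs)
    Console.WriteLine($"count={p.Count} duration={p.Duration}");
```

With these assumed ticks the pairs come out as (1, 4), (0, 2), (2, 6), (1, 3), matching the diagram.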

Final result

Group the intermediate result by the number of stream2 events and compute statistics on the window duration for each group, such as the average, minimum, and maximum window duration.

//    output     -----------------------------------0    1     2|    <-- count of stream2 events in [A-B] window
//                                                  2   3.5    6     <-- average [A-B] window duration for that count of stream2 events.
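
As a sanity check of the grouping step, the same aggregation can be sketched with plain LINQ to Objects over the four (count, duration) pairs from the diagram. This is only an offline illustration of the expected output, not the Rx query I am after; the tuple field names are made up for the example.

```csharp
using System;
using System.Linq;

// The (stream2 event count, window duration) pairs from the diagram.
var samplePairs = new (int Count, double Duration)[]
{
    (1, 4.0), (0, 2.0), (2, 6.0), (1, 3.0)
};

// Group by event count and compute min / average / max duration per group.
var stats = samplePairs
    .GroupBy(p => p.Count)
    .OrderBy(g => g.Key)
    .Select(g => (
        EventCount: g.Key,
        Min: g.Min(p => p.Duration),
        Avg: g.Average(p => p.Duration),
        Max: g.Max(p => p.Duration)))
    .ToList();

foreach (var s in stats)
    Console.WriteLine($"count={s.EventCount} min={s.Min} avg={s.Avg} max={s.Max}");
```

The group with one stream2 event contains the durations 4 and 3, which gives the 3.5 average shown in the output diagram.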

Rx queries

public enum EventKind
{
    START,
    STOP,
    OTHER
};

public struct Event1
{
    public EventKind  Kind;
    public DateTime   OccurenceTime;
};

var merge = stream1.Merge(stream2.Select(x => new Event1
                                        {
                                            Kind = EventKind.OTHER,
                                            OccurenceTime = x
                                        }))
           .RemoveDisorder(x => x.OccurenceTime, new TimeSpan(0,0,10));

var shared = merge.Publish().RefCount();

// Windows open on START and close on STOP
var windows = shared.Window(
            shared.Where(x => x.Kind == EventKind.START),
            opening => shared.Where(x => x.Kind == EventKind.STOP));

// For each window we're interested in the duration of the window along with
// the count of OTHER events that were raised inside the window
//
var pairs = windows.Select(window => new 
        {
            Duration = window
                .Where(x=>x.Kind != EventKind.OTHER) // we only want START & STOP events, not OTHER events
                .Buffer(2,1)                         // could use buffer(2) but this is more reliable if stream1 sometimes has multiple consecutive START events.
                .Where(x => x.Count == 2 && x[1].Kind == EventKind.STOP && x[0].Kind == EventKind.START)
                .Select(x => x[1].OccurenceTime - x[0].OccurenceTime), // compute the latency

            EventCount = window.Where(x=>x.Kind == EventKind.OTHER).Count() // count the number of OTHER events in the window
        }
    );

I would like to simplify the observed type

  • from IObservable<{IObservable<int>, IObservable<TimeSpan>}>
  • to IObservable<{int, TimeSpan}>

This should be possible, since each window has exactly one duration and one count of OTHER events.

Finally, group by EventCount and compute the Min, Max, and Avg of the duration for each group.

var result = pairs
        .GroupBy(pair => pair.EventCount)
        .Select(g => new 
            {
                EventCount = g.Key,
                Min = g.Min(x => x.Duration),
                Avg = g.Average(x => x.Duration),
                Max = g.Max(x => x.Duration)
            });

RemoveDisorder is an extension method that sorts the events of the underlying observable by OccurenceTime. The events are read with Tx.


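As @Brandon points out, use SelectMany rather than Select, because Select leaves each field as an IObservable<T>.

Be careful with Window: every START event opens a new window, so consecutive START events produce overlapping windows, and each of them receives the same events.

For example: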

var subject = new Subject<Event1>();
var shared = subject.Publish().RefCount();
var start = shared.Where(a => a.Kind == EventKind.START);
var stop = shared.Where(a => a.Kind == EventKind.STOP);
var values = shared.Where(a => a.Kind == EventKind.OTHER);

values.Window(start, a => stop).Subscribe(inner => 
 { 
    Console.WriteLine("New Group Started");
    inner.Subscribe(next => 
                    { 
                        Console.WriteLine("Next = "+ next.Kind + " | " + next.OccurenceTime.ToLongTimeString());
                    }, () => Console.WriteLine("Group Completed"));
 });

subject.OnNext(new Event1 { Kind = EventKind.START, OccurenceTime = DateTime.Now });
subject.OnNext(new Event1 { Kind = EventKind.START, OccurenceTime = DateTime.Now.AddSeconds(1) });
subject.OnNext(new Event1 { Kind = EventKind.OTHER, OccurenceTime = DateTime.Now.AddSeconds(2) });
subject.OnNext(new Event1 { Kind = EventKind.STOP, OccurenceTime = DateTime.Now.AddSeconds(3) });

Output:

New Group Started
New Group Started
Next = OTHER | 4:55:46 PM
Next = OTHER | 4:55:46 PM
Group Completed
Group Completed

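So you first have to decide how consecutive START events, and the overlapping windows they create, should be handled. Depending on that decision, there are a few ways to build the pairs: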

  • Using Window:
IObservable<Event1> sx = GetEventStream();
var shared = sx.Publish().RefCount();
var start = shared.Where(a => a.Kind == EventKind.START);
var stop = shared.Where(a => a.Kind == EventKind.STOP);
shared.Window(start, a => stop)
    .Select(window =>                   // not named sx: that would clash with the local sx above
            window.Publish(b =>
                        b.Take(1)       // first event of the window
                        .Select(c =>
                        {
                            var final = b.LastOrDefaultAsync().Select(a => a.OccurenceTime);
                            var comp = b.Where(d => d.Kind == EventKind.OTHER).Count();
                            return final.Zip(comp, (d, e) => new { Count = e, Time = d - c.OccurenceTime });
                        })
                        .Switch()   // any flattening operator works here,
                    )               // since there is only one inner sequence
            )
    .Concat()
    .Subscribe(next =>
    {
        Console.WriteLine("Count = " + next.Count + " | " + next.Time);
    });
  1. Using GroupByUntil:

    IObservable<Event1> sx = GetEventStream();
    var shared = sx.Publish().RefCount();       
    var stop = shared.Where(a => a.Kind == EventKind.STOP).Publish().RefCount();
    var start = shared.Where(a => a.Kind == EventKind.START);       
    start.GroupByUntil(a => Unit.Default, a => stop)
            .Select(newGroup => 
            { 
                var creation = newGroup.Take(1);
                var rightStream = shared.Where(a => a.Kind == EventKind.OTHER)
                                        .TakeUntil(newGroup.LastOrDefaultAsync())
                                        .Count();
                var finalStream = stop.Take(1);
    
                return creation.Zip(rightStream, finalStream, (a,b,c) => new { Count = b, Time = c.OccurenceTime - a.OccurenceTime });
            })
            .Concat()
            .Subscribe(next => 
            { 
                Console.WriteLine("Count = "+ next.Count + " | " + next.Time);
            });
    
  2. Using Group/Window combined with Take(1) and Repeat.

After that, you can compute the statistics with GroupBy.

For more on real-time statistics with Rx, see: http://www.codeproject.com/Tips/853256/Real-time-statistics-with-Rx-Statistical-Demo-App


Something like this should work (note the SelectMany):

var pairs = windows.SelectMany(window =>
    {
        var duration = window
            .Where(x=>x.Kind != EventKind.OTHER) // we only want START & STOP events, not OTHER events
            .Buffer(2,1)                         // could use buffer(2) but this is more reliable if stream1 sometimes has multiple consecutive START events.
            .Where(x => x.Count == 2 && x[1].Kind == EventKind.STOP && x[0].Kind == EventKind.START)
            .Select(x => x[1].OccurenceTime - x[0].OccurenceTime); // compute the latency

        var eventCount = window.Where(x=>x.Kind == EventKind.OTHER).Count(); // count the number of OTHER events in the window

        return duration.Zip(eventCount, (d, e) => new { EventCount = e, Duration = d });
    }
);