I would like to use Rx to compute statistics on 2 event streams.
Input streams
Intermediate result
For each window that opens on A and closes on B: the duration of the window, along with the number of stream2 events raised inside it.
// result   ------1------0-----------2-------1|  <-- count of stream2 events in [A-B] window
//                4      2           6       3   <-- paired with [A-B] window duration
Final result
Group the intermediate results by the number of stream2 events and compute statistics on the window duration for each group, such as the average, minimum, and maximum window duration.
// output   -----------------------------------0   1   2|  <-- count of stream2 events in [A-B] window
//                                             2   3.5 6   <-- average [A-B] window duration for that count of stream2 events
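To make the expected numbers concrete, here is a small batch sketch in plain LINQ (not Rx) that reproduces the final result from the four intermediate pairs in the diagram. The names `WindowStats` and `Summarize` are hypothetical and exist only for this illustration, and durations are plain numbers rather than TimeSpan values.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, for illustration only: a batch (non-Rx) version of the final grouping.
public static class WindowStats
{
    public static List<(int EventCount, double Min, double Avg, double Max)> Summarize(
        IEnumerable<(int EventCount, double Duration)> pairs)
    {
        return pairs
            .GroupBy(p => p.EventCount)   // one group per count of stream2 events
            .OrderBy(g => g.Key)
            .Select(g => (
                EventCount: g.Key,
                Min: g.Min(p => p.Duration),
                Avg: g.Average(p => p.Duration),
                Max: g.Max(p => p.Duration)))
            .ToList();
    }

    public static void Main()
    {
        // Intermediate result from the diagram: (count of stream2 events, window duration).
        var pairs = new[] { (1, 4.0), (0, 2.0), (2, 6.0), (1, 3.0) };
        foreach (var s in Summarize(pairs))
            Console.WriteLine($"{s.EventCount}: min={s.Min} avg={s.Avg} max={s.Max}");
    }
}
```

For the four pairs in the diagram this gives averages of 2, 3.5, and 6 for counts 0, 1, and 2, which matches the final result above.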
Rx queries
public enum EventKind
{
    START,
    STOP,
    OTHER
};
public struct Event1
{
    public EventKind Kind;
    public DateTime OccurenceTime;
};
// Tag stream2 timestamps as OTHER events and merge them with stream1.
var merge = stream1.Merge(stream2.Select(x => new Event1
    {
        Kind = EventKind.OTHER,
        OccurenceTime = x
    }))
    .RemoveDisorder(x => x.OccurenceTime, new TimeSpan(0,0,10));
// Share a single subscription to the merged stream.
var shared = merge.Publish().RefCount();
// Open a window on each START and close it on the next STOP.
var windows = shared.Window(
    shared.Where(x => x.Kind == EventKind.START),
    opening => shared.Where(x => x.Kind == EventKind.STOP));
// For each window: its duration (STOP time minus START time) and its count of OTHER events.
var pairs = windows.Select(window => new
    {
        Duration = window
            .Where(x => x.Kind != EventKind.OTHER)
            .Buffer(2,1)
            .Where(x => x.Count == 2 && x[1].Kind == EventKind.STOP && x[0].Kind == EventKind.START)
            .Select(x => x[1].OccurenceTime - x[0].OccurenceTime),
        EventCount = window.Where(x => x.Kind == EventKind.OTHER).Count()
    }
);
I would like to simplify the observed type:
- from IObservable<{IObservable<int>, IObservable<TimeSpan>}>
- to IObservable<{int, TimeSpan}>
This should be possible, since each window has exactly 1 duration and 1 count of OTHER events.
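One possible way to get the flatter shape, assuming each window really does produce exactly one duration and one count, is to Zip the two inner observables and flatten the result with SelectMany. The sketch below uses value tuples in place of the anonymous type so that it can be written as a standalone method. `PairFlattening` and `Flatten` are hypothetical names, and the System.Reactive package is assumed to be referenced.

```csharp
using System;
using System.Reactive.Linq; // assumption: the System.Reactive NuGet package is referenced

// Hypothetical helper, for illustration only.
public static class PairFlattening
{
    public static IObservable<(int EventCount, TimeSpan Duration)> Flatten(
        IObservable<(IObservable<int> EventCount, IObservable<TimeSpan> Duration)> pairs)
    {
        // Zip pairs the single count with the single duration of one window;
        // SelectMany merges the per-window results into one flat stream.
        return pairs.SelectMany(p =>
            p.EventCount.Zip(
                p.Duration,
                (count, duration) => (EventCount: count, Duration: duration)));
    }
}
```

Because Count() only emits when its window completes, each flattened pair would appear when the window closes. If a window ever produced no duration (for example, a START with no matching STOP), Zip would emit nothing for that window, so this relies on the one-duration, one-count assumption stated above.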
Then I would like to group by EventCount and compute the Min, Max, and Avg duration for each group.
var result = pairs
    .GroupBy(pair => pair.EventCount)
    .Select(g => new
    {
        EventCount = g.Key,
        Min = g.Min(x => x.Duration),
        Avg = g.Average(x => x.Duration),
        Max = g.Max(x => x.Duration)
    });
RemoveDisorder reorders the observable by OccurenceTime (Tx).