Combining multiple event sources into one IObservable with Rx

This is a question about how to use Reactive Extensions (Rx) in a specific event-related scenario.

  • The goal is to take a few classes that trigger an event
  • And collect them into one IObservable , which any clients (not knowing event classes) can subscribe to.
  • Note that interest events use EventArgs subclasses

Some custom EventArgs

 public class HappenedEventArgs : EventArgs { internal bool IsBadNotGood; } 

Many separate classes where events occurred

 public class EventSourceA : IEventSource { public event HappenedEventHandler Happened; private void OnHappened(HappenedEventArgs e) { if (Happened != null) Happened(this, e); } // And then this class calls OnHappened(e) whenever it decides to ... } public class EventSourceB : IEventSource { public event HappenedEventHandler Happened; private void OnHappened(HappenedEventArgs e) { if (Happened != null) Happened(this, e); } // And then this class also calls OnHappened(e) at times ... } public interface IEventSource { event HappenedEventHandler Happened; } public delegate void HappenedEventHandler(object sender, HappenedEventArgs e); 

How to collect all those events and set up a united front of events

 public class Pooler{ private IObservable<X> _pool; public IObservable<X> Subscribe(){ return _pool; } public void Register(IEventSource item) { // How to take item.Happened and inject/bind it into _pool here? } internal void Unregister(IEventSource item) { // Disconnect item.Happened from _pool } public Pooler(){ // Instantiate _pool to whatever is best? // _pool = ... } } 

A subscriber who does not know anything about event sources directly

  static void Try() { var pooler = new Pooler(); pooler.Subscribe().Subscribe(e => { // Do something with events here, as they arrive } ); // .... // Wherever whenever: AddEventSources(pooler); } static void AddEventSources(Pooler pooler){ var eventSourceA = new EventSourceA(); pooler.Register(eventSourceA); var eventSourceB = new EventSourceB(); pooler.Register(eventSourceB); } 
+8
c # event-handling system.reactive
source share
2 answers

What the Rx library is trying to provide are ways to handle such situations without creating a bunch of classes / methods that manually distribute observables.

Say you had a class with an event:

 public class EventedClass { public event Action<EventArgs> Event; } 

And enumerated of these IEnumerable<EventedClass> objects instances, you can use LINQ to project observables from these classes, combine them with Observable.Merge , which will give you a combined sequential output of these events.

 Observable.Merge( objects.Select( o => Observable.FromEvent<EventArgs>( handler => o.Event += handler, handler => o.Event -= handler ) )).Subscribe(args => { //do stuff }); 
+6
source share

It looks like you are doing something similar to this question . Basically, you want to use the topic as your _pool variable and subscribe and unsubscribe to different sources of events in the register and unregister. To unregister the source, you will need to save the one-time items that you received when you called Register. In addition, I would like to make Pooler implement IObservable directly and just forward Subscribe to the _pool variable.

 using System.Reactive.Subjects; using System.Reactive.Linq; public class Pooler : IObservable<HappenedEventArgs>, IDisposable { void Dispose() { if (_pool != null) _pool.Dispose(); if (_sourceSubs != null) { foreach (var d in _sourceSubs.Values) { d.Dispose(); } _sourceSubs.Clear(); } } private Subject<HappenedEventArgs> _pool = new Subject<HappenedEventArgs>(); private Dictionary<IEventSource, IDisposable> _sourceSubs = new Dictionary<IEventSource, IDisposable>(); public IDisposable Subscribe(IObserver<HappenedEventArgs> observer) { return _pool.Subscribe(observer); } public void Register(IEventSource item) { if (_sourceSubs.ContainsKey(item)) { return; //already registered } else { _sourceSubs.Add(item, Observable.FromEventPattern((EventHandler<HappenedEventArgs> h) => item.Happened += h, h => item.Happened -= h) .Select(ep => ep.EventArgs) .Subscribe(_pool)); } } internal void Unregister(IEventSource item) { IDisposable disp = null; if (_sourceSubs.TryGetValue(item, out disp)) { _sourceSubs.Remove(item); disp.Dispose(); } } } 

Please note that you will need to implement IDisposable so that you can clear all event subscriptions when you are done using Pooler .

+3
source share

All Articles