Using Rx to synchronize asynchronous events

I want to put Reactive Extensions for .NET (Rx) to good use and would like some input on a few basic tasks. To illustrate what I'm trying to do, here is a contrived example: an external component with asynchronous events:

    class Component
    {
        public void BeginStart() { ... }
        public event EventHandler Started;
    }

The component starts when BeginStart() is called. This method returns immediately, and later, when the component has finished starting, the Started event is raised.

I want to create a synchronous start method by wrapping the component and waiting for the Started event to occur. This is what I came up with:

    class ComponentWrapper
    {
        readonly Component component = new Component();

        void StartComponent()
        {
            var componentStarted = Observable.FromEvent<EventArgs>(this.component, "Started");
            using (var startedEvent = new ManualResetEvent(false))
            using (componentStarted.Take(1).Subscribe(e => { startedEvent.Set(); }))
            {
                this.component.BeginStart();
                startedEvent.WaitOne();
            }
        }
    }

I would like to get rid of the ManualResetEvent, and I expect Rx to have a solution. But how?

Tags: c#, system.reactive
2 answers

PL's answer suits your specification perfectly well, but I think you could get better results by not fighting Rx with .First() and instead embracing it by creating an observable for your component:

    public static IObservable<Unit> AsObservable(this Component component)
    {
        return Observable.Defer(() =>
        {
            component.BeginStart();
            return Observable
                .FromEvent<EventArgs>(component, "Started")
                .Select(_ => new Unit());
        });
    }

Then you can use it in a blocking way:

    new Component().AsObservable().First();

Non-blocking:

    new Component().AsObservable().Subscribe(_ => Console.WriteLine("Done"));

Hot:

    var pub = new Component().AsObservable().Publish();
    pub.Subscribe(_ => Console.WriteLine("Sub1"));
    pub.Subscribe(_ => Console.WriteLine("Sub2"));
    pub.Connect(); // the component is started just once for both subscriptions

Composed with other operators:

    new Component().AsObservable().Delay(TimeSpan.FromSeconds(1));

etc...
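One thing to watch with the Defer version: it calls BeginStart() before anything is attached to Started, so a component that raises the event very quickly could in principle be missed. Below is a minimal sketch of a variant that attaches first and starts afterwards. It assumes a current System.Reactive package, where (as far as I know) the reflection-based FromEvent overload is named FromEventPattern and Wait() is the blocking operator. The Component body and the name StartAsObservable are hypothetical and only there to make the example runnable.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Stand-in for the external component from the question (hypothetical body).
class Component
{
    public event EventHandler Started;

    public void BeginStart()
    {
        // Assumption: the real component raises Started later, on another thread.
        Task.Run(() => Started?.Invoke(this, EventArgs.Empty));
    }
}

static class ComponentExtensions
{
    // Hypothetical name; starts the component once per subscription.
    public static IObservable<Unit> StartAsObservable(this Component component)
    {
        return Observable.Create<Unit>(observer =>
        {
            // Attach to Started first, so an event raised during
            // BeginStart() cannot slip past the subscription.
            IDisposable subscription = Observable
                .FromEventPattern<EventArgs>(component, "Started")
                .Take(1)
                .Select(_ => Unit.Default)
                .Subscribe(observer);

            component.BeginStart();
            return subscription;
        });
    }
}

static class Program
{
    static void Main()
    {
        // Blocks until Started has been raised once.
        new Component().StartAsObservable().Wait();
        Console.WriteLine("Started");
    }
}
```

The non-blocking, hot and composed usages shown above should work the same way with this variant, since it is still a cold observable that starts the component on subscription.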

EDIT: If you need to wait for several events and collect the information they carry, you can use the following variant:

    public static IObservable<EventArgs> AsObservable(this Component component)
    {
        return Observable.Defer(() =>
        {
            component.BeginStart();
            return Observable.FromEvent<EventArgs>(component, "Started1").Take(1)
                .Merge(Observable.FromEvent<EventArgs>(component, "Started2").Take(1))
                .Select(evt => evt.EventArgs);
        });
    }

Now, if you want to block until completion, you can use .AsObservable().Last().
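If you want the arguments of both events rather than just the last one, a rough sketch along the same lines is to merge the two one-shot streams, gather them with ToList() and block on the result. This assumes a current System.Reactive package (FromEventPattern, Wait). TwoStageComponent, its Started1/Started2 events and StartAndCollect are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical component that raises two events while starting.
class TwoStageComponent
{
    public event EventHandler Started1;
    public event EventHandler Started2;

    public void BeginStart()
    {
        // Assumption: both events are raised later, on another thread.
        Task.Run(() =>
        {
            Started1?.Invoke(this, EventArgs.Empty);
            Started2?.Invoke(this, EventArgs.Empty);
        });
    }
}

static class Program
{
    // Starts the component and blocks until both events have fired once.
    static IList<EventArgs> StartAndCollect(TwoStageComponent component)
    {
        var first = Observable.FromEventPattern<EventArgs>(component, "Started1").Take(1);
        var second = Observable.FromEventPattern<EventArgs>(component, "Started2").Take(1);

        // Replay + Connect attaches the handlers before BeginStart() runs,
        // and keeps the collected list available for the later Wait().
        var collected = first.Merge(second)
            .Select(evt => evt.EventArgs)
            .ToList()
            .Replay();

        using (collected.Connect())
        {
            component.BeginStart();
            return collected.Wait();
        }
    }

    static void Main()
    {
        IList<EventArgs> args = StartAndCollect(new TwoStageComponent());
        Console.WriteLine(args.Count);
    }
}
```

Take(1) on each stream is what lets the merged sequence complete, so ToList() only emits once both events have been seen.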


Something like this should do it:

    var replay = Observable
        .FromEvent<EventArgs>(this.component, "Started")
        .Replay();
    replay.Connect();
    component.BeginStart();
    replay.First();