I use a simple Subject<object> to implement the event aggregator pattern in a web application, as follows:
    public class EventAggregator
    {
        private readonly ISubject<object, object> _subject = Subject.Synchronize(new Subject<object>());

        public IObservable<T> GetEvent<T>()
        {
            return _subject.AsObservable().OfType<T>();
        }

        public void Publish<TEvent>(TEvent sampleEvent)
        {
            _subject.OnNext(sampleEvent);
        }
    }
When an operator or a subscriber throws an exception, I want to log it, then ignore it and continue publishing events. Essentially, I want the stream to "keep going" in the face of unexpected behavior, because the event aggregator has a singleton lifetime in the application context.
In this question, the answer is to create a deferred observable and use Retry(), but I do not want a cold observable here.
The solution I've come up with is to use Catch together with a try-catch around the subscriber method, which I've wrapped in an extension method:
    public static IDisposable SubscribeSwallowExceptions<T>(this IObservable<T> source, Action<T> subscription, Action<Exception> log)
    {
        source = source.Catch((Exception e) => { log(e); return source; });

        return source.Subscribe(x =>
        {
            try { subscription(x); }
            catch (Exception e) { log(e); }
        });
    }
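To make the intended behavior concrete, here is a minimal sketch of how the extension might be called. `OrderPlaced`, `Demo`, and the class name `SwallowExceptionsExtensions` are hypothetical names introduced only for this example. It assumes the System.Reactive package is referenced, and it repeats the two definitions above so that it compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical event type, used only for this illustration.
public sealed class OrderPlaced
{
    public int OrderId { get; set; }
}

// Repeated from above so the sketch compiles on its own.
public class EventAggregator
{
    private readonly ISubject<object, object> _subject = Subject.Synchronize(new Subject<object>());

    public IObservable<T> GetEvent<T>() { return _subject.AsObservable().OfType<T>(); }

    public void Publish<TEvent>(TEvent sampleEvent) { _subject.OnNext(sampleEvent); }
}

// Hypothetical container class for the extension method from the question.
public static class SwallowExceptionsExtensions
{
    public static IDisposable SubscribeSwallowExceptions<T>(
        this IObservable<T> source, Action<T> subscription, Action<Exception> log)
    {
        // Errors signalled by upstream operators are logged, then the stream is resubscribed.
        source = source.Catch((Exception e) => { log(e); return source; });

        // Errors thrown by the subscriber itself are logged and swallowed.
        return source.Subscribe(x =>
        {
            try { subscription(x); }
            catch (Exception e) { log(e); }
        });
    }
}

public static class Demo
{
    // Publishes three events; the handler deliberately throws on the second one.
    public static List<string> Run()
    {
        var output = new List<string>();
        var aggregator = new EventAggregator();

        using (aggregator.GetEvent<OrderPlaced>().SubscribeSwallowExceptions(
            order =>
            {
                if (order.OrderId == 2)
                    throw new InvalidOperationException("handler failed for order 2");
                output.Add("handled " + order.OrderId);
            },
            ex => output.Add("logged: " + ex.Message)))
        {
            aggregator.Publish(new OrderPlaced { OrderId = 1 });
            aggregator.Publish(new OrderPlaced { OrderId = 2 });
            aggregator.Publish(new OrderPlaced { OrderId = 3 });
        }

        return output;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

In this sketch the failure on the second event goes to the log callback instead of tearing down the subscription, so the third event still reaches the handler.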
I know that "catch-all" exception handling is generally frowned upon, but in this case I'm not sure what other options I have, given that I want the subscription to remain active even when an exception is thrown. I don't know what types of exceptions may occur, because I don't yet know what work will be done when processing the stream.
Is this an acceptable way to deal with potential exceptions, and can you foresee any problems I might run into with this approach?