exception of long-term swallowing aggregator events observed

I use a simple Subject<object> to implement an aggregator template events in a Web application as follows:

 public class EventAggregator { private readonly ISubject<object, object> _subject = Subject.Synchronize(new Subject<object>()); public IObservable<T> GetEvent<T>() { return _subject.AsObservable().OfType<T>(); } public void Publish<TEvent>(TEvent sampleEvent) { _subject.OnNext(sampleEvent); } } _subject = Subject.Synchronize (new Subject <object> ()); public class EventAggregator { private readonly ISubject<object, object> _subject = Subject.Synchronize(new Subject<object>()); public IObservable<T> GetEvent<T>() { return _subject.AsObservable().OfType<T>(); } public void Publish<TEvent>(TEvent sampleEvent) { _subject.OnNext(sampleEvent); } } > (); public class EventAggregator { private readonly ISubject<object, object> _subject = Subject.Synchronize(new Subject<object>()); public IObservable<T> GetEvent<T>() { return _subject.AsObservable().OfType<T>(); } public void Publish<TEvent>(TEvent sampleEvent) { _subject.OnNext(sampleEvent); } } sampleEvent) public class EventAggregator { private readonly ISubject<object, object> _subject = Subject.Synchronize(new Subject<object>()); public IObservable<T> GetEvent<T>() { return _subject.AsObservable().OfType<T>(); } public void Publish<TEvent>(TEvent sampleEvent) { _subject.OnNext(sampleEvent); } } 

When the operator or the subscriber throws an exception, I want to register it, and then ignore it and continue to publish events - essentially, I want to stream "kept going" in the case of some unexpected behavior, because the event aggregation aggregator is a singleton lifetime in application context.

In this issue we answer to create a delayed observation and use Retry() , but I do not want to be seen here cold.

The solution I've come across is to use Catch and try-catch in the method of the subscriber, which I wrapped in extension:

 public static IDisposable SubscribeSwallowExceptions<T>(this IObservable<T> source, Action<T> subscription, Action<Exception> log) { source = source.Catch((Exception e) => { log(e); return source; }); return source.Subscribe(x => { try { subscription(x); } catch (Exception e) { log(e); } }); } => {log (e); return source;}); public static IDisposable SubscribeSwallowExceptions<T>(this IObservable<T> source, Action<T> subscription, Action<Exception> log) { source = source.Catch((Exception e) => { log(e); return source; }); return source.Subscribe(x => { try { subscription(x); } catch (Exception e) { log(e); } }); } e); public static IDisposable SubscribeSwallowExceptions<T>(this IObservable<T> source, Action<T> subscription, Action<Exception> log) { source = source.Catch((Exception e) => { log(e); return source; }); return source.Subscribe(x => { try { subscription(x); } catch (Exception e) { log(e); } }); } 

I know that exception handling "catch-all", as a rule, neodobrilas, but in this case I'm not sure what other options I have, taking into account that I want to subscribe remained even when generating an exception. I do not know what types of exceptions can occur, because I still do not know what work will be performed at a flow processing.

Is this an acceptable way to address the possible exceptions and you can anticipate any problems that may affect me using this approach?

+6
source share
1 answer

This is a problematic solution. Once a subscriber throws an exception, you should really assume that they're dead in the water at the time (there is a little more than what you can reasonably do) and do not call them on. With your approach subscriber "spoiled", will continue to send events and potentially generate an exception indefinitely.

In addition, you cannot rely on subscribers using your extension method. I'd rewrite your method GetEvent<T> as follows (obviously replacing my Console.WriteLine on some magazine). This approach will manage the subscription due to a poor subscriber and keep everyone else in the works.

 public IObservable<T> GetEvent<T>() { return Observable.Create<T>(o => { var source = _subject.OfType<T>(); var m = new SingleAssignmentDisposable(); m.Disposable = source.Subscribe( x => { try { o.OnNext(x); } catch(Exception e) { Console.WriteLine(e); m.Dispose(); } }, e => { try { o.OnError(e); } catch(Exception ex) { Console.WriteLine(ex); } finally { m.Dispose(); } }, () => { try { o.OnCompleted(); } catch(Exception e) { Console.WriteLine(e); } finally { m.Dispose(); } } ); return m; }); } > () public IObservable<T> GetEvent<T>() { return Observable.Create<T>(o => { var source = _subject.OfType<T>(); var m = new SingleAssignmentDisposable(); m.Disposable = source.Subscribe( x => { try { o.OnNext(x); } catch(Exception e) { Console.WriteLine(e); m.Dispose(); } }, e => { try { o.OnError(e); } catch(Exception ex) { Console.WriteLine(ex); } finally { m.Dispose(); } }, () => { try { o.OnCompleted(); } catch(Exception e) { Console.WriteLine(e); } finally { m.Dispose(); } } ); return m; }); } > public IObservable<T> GetEvent<T>() { return Observable.Create<T>(o => { var source = _subject.OfType<T>(); var m = new SingleAssignmentDisposable(); m.Disposable = source.Subscribe( x => { try { o.OnNext(x); } catch(Exception e) { Console.WriteLine(e); m.Dispose(); } }, e => { try { o.OnError(e); } catch(Exception ex) { Console.WriteLine(ex); } finally { m.Dispose(); } }, () => { try { o.OnCompleted(); } catch(Exception e) { Console.WriteLine(e); } finally { m.Dispose(); } } ); return m; }); } ); public IObservable<T> GetEvent<T>() { return Observable.Create<T>(o => { var source = _subject.OfType<T>(); var m = new SingleAssignmentDisposable(); m.Disposable = source.Subscribe( x => { try { o.OnNext(x); } catch(Exception e) { Console.WriteLine(e); m.Dispose(); } }, e => { try { o.OnError(e); } catch(Exception ex) { Console.WriteLine(ex); } finally { m.Dispose(); } }, () => { try { o.OnCompleted(); } catch(Exception e) { Console.WriteLine(e); } finally { m.Dispose(); } } ); return m; }); } 
+1
source

All Articles