Why do reactive extensions stop ringing after an exception is thrown when using Where or OfType statements?

Test1 passes. Why do Test2 and Test3 fail? I am using .NET 4.0 and Rx 2.0.

[TestClass] public class RxQuestion { private Subject<string> sequence; [TestInitialize] public void Intialize() { sequence = new Subject<string>(); } [TestMethod] public void Test1() { Test(sequence); } [TestMethod] public void Test2() { Test(sequence.Where(s => true)); } [TestMethod] public void Test3() { Test(sequence.OfType<string>()); } private void Test(IObservable<string> observable) { var observed = string.Empty; observable.Subscribe(s => { observed = s; if (s == "a") throw new Exception(); }); try { sequence.OnNext("a"); } catch { } sequence.OnNext("b"); Assert.AreEqual("b", observed); } } 
+4
source share
2 answers

The real question for me is why does test1 pass? For me, it looks like the Subject<T> does not play according to the same rules as all other IObservable<T> implementations.

With more control (really, reflection), you can split the Subject<T> into a DotPeek / Reflector and see that when OnNext(T) called, it is delegated directly to its _observer instance. Before any subscription, this is just NullObject / NopObserver. After the subscription is completed (in general), the observer is an implementation of Observer<T> . This implementation is actually an implementation of the IObserver<T> composite interface template, which simply calls OnNext(T) for each of its instances.

Also, given that we are using the Subscribe extension method, which simply takes the OnNext handler, we now know that our real implication, IObserver<T> is AnonymousObserver<T> . Opening this, we see that any call to OnNext(T) is largely insecure.

Now compare this with the IObservable<T> implementations of the Where or Cast statements. Both of these extension methods will return you an IObservable<T> implementation that extends the Producer<T> class. When they subscribe to one of these observable sequences, the observer subscriber wraps with the SafeObserver<T> implementation. This is a key difference .

If we look at this implementation, we will see that for our code path from an anonymous observer we will get the MakeSafe method called. This now completes any OnNext call with try / finally.

 public void OnNext(T value) { if(this.isStopped!=0) return; bool flag = true; try { this._onNext(value); flag=true; //Flag only set if OnNext doesn't throw!! } finally { if(!flag) this._disposable.Dispose(); } } 

Please note that if we have a safe observer, as indicated above, if any OnNext handler OnNext , then the flag will not be set to true, and the _disposable instance will be deleted. In this case, the _disposable instance represents the subscription.

So, there is your explanation of why the original Subject passes the test and where the seemingly harmless operators cause a change in behavior.

Regarding why Subject<T> does not work by default, I believe this is due to the performance improvement that was made in version 2.0. I feel that the subjects are tuned for raw performance and believe that if you are brave enough to use them, then you know what you are doing (i.e. Do not throw OnNext into your handlers!). I base this assumption on the fact that they also removed concurrency security by default themes, and you should enable it using the Synchronize() extension method, maybe they also thought that all these additional try / finally calls should be paid only if you choose. One way to select these security features is to do what you did above Where(_=>true) or more often AsObservable() .

+12
source

By definition, IObservable<T> should not OnError more notifications after either OnError or OnCompleted . Therefore, when you call sequence.OnNext("b"); , you are actually breaking the implicit contract that IObservable<T> must adhere to.

The reason Where and OfType behaves this way is because they (respectively) ignore any notifications after OnError that you generate.

+2
source

All Articles