Non-reproduction of hot observable

Original question

I have a scenario where I have several IObservable sequences that I want to combine with Merge and then listen. However, if one of them causes an error, I do not want it to break everything into other threads, as well as re-subscribe to a sequence (this is a constant sequence).

I do this by adding Retry() to streams before merging, i.e.:

 IEnumerable<IObservable<int>> observables = GetObservables(); observables .Select(o => o.Retry()) .Merge() .Subscribe(/* Do subscription stuff */); 

However, the problem arises when I want to verify this. What I would like to check out is that if one of the IObservable in observables raises an OnError , the others should still send their values โ€‹โ€‹and they should be processed

I thought I was just using two Subject<int> representing two IObservable in observables ; one sends OnError(new Exception()) , and the other, after that, sends OnNext(1) . However, it seems that Subject<int> will play all previous values โ€‹โ€‹for the new subscription (which is effective Retry() ), turning the test into an endless loop.

I tried to solve this problem by creating an IObservable guide that causes an error on the first subscription, and then an empty sequence, but it looks like a hacker:

 var i = 0; var nErrors = 2; var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create<int>(o => { i++; if (i < nErrors) { return Observable.Throw<int>(new Exception()).Subscribe(o); } else { return Observable.Empty<int>().Subscribe(o); } }); 

Am I using Subject or thinking of Retry() incorrectly? Any other thoughts on this? How would you solve this situation?

Update

Ok, here is a marble diagram of what I want, and I think that Retry() does.

 o = message, X = error. ------o---o---X \ Retry() -> \---o---o---X \ Retry() -> \... 

My problem, perhaps, is that I do not have a good stock class for using front-end testing, since Subject wants to reproduce all my previous errors.

Update 2

Here's a test case that shows what I mean by Subject , reproducing its values. Am I using the term correctly if I say it is done in a cold way? I know that Subject is a way to create a hot observable, but still this behavior seems to me to be "cold."

 var onNext = false; var subject = new Subject<int>(); subject.Retry().Subscribe(x => onNext = true); subject.OnError(new Exception()); subject.OnNext(1); Assert.That(onNext, Is.True); 
+5
source share
1 answer

According to your updated requirements (you want to repeat the observed errors, and not just ignore them), we can find a solution that works.

Firstly, it is important to understand the difference between the observed cold (recreated with each subscription) and the hot observed (exists independently of the subscription). You cannot Retry() see hot visibility, because it does not know how to recreate the main events. That is, if hot observed errors, they are gone forever.

Subject creates a hot observable, in the sense that you can call OnNext without subscribers, and it will act as expected. To convert a hot observable to a cold observable, you can use Observable.Defer , which will contain the โ€œcreate on subscriptionโ€ logic for the observable.

All that is said, here the source code is changed for this:

 var success = new Subject<int>(); var error = new Subject<int>(); var observables = new List<IObservable<int>> { Observable.Defer(() => {success = new Subject<int>(); return success.AsObservable();}), Observable.Defer(() => {error = new Subject<int>(); return error.AsObservable();}) }; observables .Select(o => o.Retry()) .Merge() .Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done")); 

And the test (similar to the previous one):

 success.OnNext(1); error.OnError(new Exception("test")); success.OnNext(2); error.OnNext(-1); success.OnCompleted(); error.OnCompleted(); 

And the expected result:

 1 2 -1 done 

Of course, you will need to significantly change this concept, depending on what you focus on. Using objects for testing is not the same as using them for real.

I also want to note that this comment:

However, it seems Subject will replay all previous values โ€‹โ€‹for the new subscription (which is Retry () effectively), turning the test into an endless loop.

Not true - Subject does not behave like this. There is another aspect of your code that causes an endless loop based on the fact that Retry recreates the subscription, and the subscription creates an error at some point.


Original answer (to complete)

The problem is that Retry() does not do what you want. From here:

http://msdn.microsoft.com/en-us/library/ff708141(v=vs.92).aspx

Repeats the observed source sequence for retryCount once or until it completes successfully.

This means that Retry will constantly try to connect to the underlying observable until it succeeds and issues an error.

My understanding is that you really want the exceptions in the observable to be ignored , not repeated. This will do what you want, instead:

 observables .Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>()))) .Merge() .Subscribe(/* subscription code */); 

This uses Catch to capture the observable with an exception and replaces it with the empty observable at that point.

Here is the complete test using items:

 var success = new Subject<int>(); var error = new Subject<int>(); var observables = new List<IObservable<int>> { success.AsObservable(), error.AsObservable() }; observables .Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>()))) .Merge() .Subscribe(Observer.Create<int>(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done"))); success.OnNext(1); error.OnError(new Exception("test")); success.OnNext(2); success.OnCompleted(); 

And this produces, as expected:

 1 2 done 
+4
source

All Articles