Based on your updated requirements (you want to retry the observables that error, not just ignore them), we can come up with a solution that works.
First, it is important to understand the difference between a cold observable (recreated on every subscription) and a hot observable (exists independently of any subscription). You cannot Retry() a hot observable, because it does not know how to recreate the underlying events. That is, if a hot observable errors, it is gone forever.
Subject creates a hot observable, in the sense that you can call OnNext without any subscribers and it will behave as expected. To convert a hot observable into a cold one, you can use Observable.Defer, which holds the "create on subscription" logic for that observable.
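To make the hot/cold distinction concrete, here is a minimal sketch, assuming the System.Reactive package is referenced. The class name `HotVsColdSketch` and the variable `current` are made up for illustration and are not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class HotVsColdSketch
{
    public static List<int> Run()
    {
        var seen = new List<int>();

        // Hot: the Subject accepts OnNext even though nobody is listening,
        // and that value is simply lost.
        var hot = new Subject<int>();
        hot.OnNext(1);
        using (hot.Subscribe(x => seen.Add(x)))
        {
            hot.OnNext(2); // delivered to the subscriber
        }

        // Cold: Defer runs the factory on every Subscribe call, so each
        // subscription gets its own fresh Subject.
        Subject<int> current = null;
        var cold = Observable.Defer(() =>
        {
            current = new Subject<int>();
            return current.AsObservable();
        });
        using (cold.Subscribe(x => seen.Add(x)))
        {
            current.OnNext(3); // pushed into the Subject created for this subscription
        }

        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "2, 3"
    }
}
```

The value 1 never shows up because it was pushed before anyone subscribed. The deferred version only creates its Subject at subscription time, which is what lets Retry() start over with a fresh source.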
With all that said, here is the original code modified to do this:
    var success = new Subject<int>();
    var error = new Subject<int>();

    var observables = new List<IObservable<int>>
    {
        // Defer recreates the Subject on every (re)subscription.
        Observable.Defer(() => { success = new Subject<int>(); return success.AsObservable(); }),
        Observable.Defer(() => { error = new Subject<int>(); return error.AsObservable(); })
    };

    observables
        .Select(o => o.Retry())
        .Merge()
        .Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done"));
And the test (similar to the previous one):
    success.OnNext(1);
    error.OnError(new Exception("test"));
    success.OnNext(2);
    error.OnNext(-1);
    success.OnCompleted();
    error.OnCompleted();
And the expected result:
    1
    2
    -1
    done
Of course, you will need to adapt this concept considerably depending on what your real observables are. Using Subjects for testing is not the same as using them for real.
I also want to note that this comment:
However, it seems Subject will replay all previous values for the new subscription (which is what Retry() effectively is), turning the test into an endless loop.
That is not true: Subject does not behave like this. Some other aspect of your code is causing the endless loop, stemming from the fact that Retry recreates the subscription and the subscription produces an error at some point.
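One way such a loop can arise, sketched under the assumption that Retry is applied directly to a Subject that has already received OnError: a faulted Subject hands its error to every new subscriber immediately, so each resubscription fails straight away. The sketch uses Retry(3) rather than Retry() so that it terminates. The class name `RetryOnFaultedSubjectSketch` and the `attempts` counter are illustrative only, and the System.Reactive package is assumed.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class RetryOnFaultedSubjectSketch
{
    public static (int Attempts, string Outcome) Run()
    {
        var error = new Subject<int>();
        error.OnError(new Exception("test")); // the Subject is now permanently faulted

        var attempts = 0;
        var outcome = "none";

        // Defer is used here only to count subscriptions; it hands back the
        // SAME faulted Subject each time, so every attempt fails immediately.
        Observable.Defer(() => { attempts++; return error.AsObservable(); })
            .Retry(3)
            .Subscribe(
                _ => { },
                ex => outcome = "error: " + ex.Message,
                () => outcome = "done");

        return (attempts, outcome);
    }

    public static void Main()
    {
        var result = Run();
        // Expected: "3 attempts, error: test"
        Console.WriteLine(result.Attempts + " attempts, " + result.Outcome);
    }
}
```

No earlier values are replayed here; only the terminal error is re-delivered. With the parameterless Retry() there is no attempt limit, so the same sequence of failures would repeat indefinitely.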
Original answer (for completeness)
The problem is that Retry() does not do what you want. From here:
http://msdn.microsoft.com/en-us/library/ff708141(v=vs.92).aspx
Repeats the source observable sequence for retryCount times or until it successfully terminates.
This means that Retry will keep trying to reconnect to the underlying observable until it succeeds without raising an error.
My understanding is that you actually want exceptions in the observable to be ignored, not retried. This will do what you want instead:
    observables
        .Select(o => o.Catch((Func<Exception, IObservable<int>>)(e => Observable.Empty<int>())))
        .Merge()
        .Subscribe();
This uses Catch to trap the observable that raised the exception and replace it with an empty observable at that point.
Here is the complete test using Subjects:
    var success = new Subject<int>();
    var error = new Subject<int>();

    var observables = new List<IObservable<int>> { success.AsObservable(), error.AsObservable() };

    observables
        .Select(o => o.Catch((Func<Exception, IObservable<int>>)(e => Observable.Empty<int>())))
        .Merge()
        .Subscribe(Observer.Create<int>(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done")));

    success.OnNext(1);
    error.OnError(new Exception("test"));
    success.OnNext(2);
    success.OnCompleted();
And this produces, as expected:
    1
    2
    done