Why doesn't RefCount work after disconnecting all the original subscribers?

Consider the following:

    [Fact]
    public void foo()
    {
        var result = new Subject<bool>();
        var startCount = 0;
        var completionCount = 0;
        var obs = Observable
            .Defer(() =>
            {
                ++startCount;
                return result.FirstAsync();
            })
            .Do(_ => ++completionCount)
            .Publish()
            .RefCount();

        // pretend there are lots of subscribers at once
        var s1 = obs.Subscribe();
        var s2 = obs.Subscribe();
        var s3 = obs.Subscribe();

        // even so, we only expect to be started once
        Assert.Equal(1, startCount);
        Assert.Equal(0, completionCount);

        // and we won't complete until the result ticks through
        result.OnNext(true);
        Assert.Equal(1, startCount);
        Assert.Equal(1, completionCount);

        s1.Dispose();
        s2.Dispose();
        s3.Dispose();

        // now try exactly the same thing again
        s1 = obs.Subscribe();
        s2 = obs.Subscribe();
        s3 = obs.Subscribe();

        // startCount is 4 here instead of the expected 2!
        Assert.Equal(2, startCount);
        Assert.Equal(1, completionCount);

        result.OnNext(true);
        Assert.Equal(2, startCount);
        Assert.Equal(2, completionCount);

        s1.Dispose();
        s2.Dispose();
        s3.Dispose();
    }

My understanding of Publish + RefCount is that the connection to the source is maintained as long as at least one subscriber exists. As soon as the last subscriber disconnects, any future subscriber re-initiates a connection to the source.

As you can see in my test, everything works fine the first time around. The second time, however, the deferred observable inside the pipeline is executed once for each new subscriber.

I see through the debugger that for the first group of subscribers, obs._count (which counts subscribers) increases for each Subscribe call. But for the second group of subscribers, it remains zero.

Why is this happening and what can I do to fix my pipeline?

2 answers

The answer from @user631090 is close but not quite correct, so I thought I would answer this myself.

This is because Publish immediately completes any new subscriber if the stream it publishes has already completed. You can see that in the diagram here:

[Image: Publish diagram]

It would be nice, though, if the diagram also showed a subscriber arriving after the underlying stream has completed.

To add to the confusion, the Defer delegate is still invoked for each new subscriber. But the sequence it returns is simply ignored by Publish, because the initial stream has already completed.
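A minimal sketch of that behavior in isolation, assuming the System.Reactive package. The names `LateSubscriberDemo`, `source` and `published` are made up for illustration, and `Take(1)` stands in for `FirstAsync()` from the question:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class LateSubscriberDemo
{
    // Records what an early and a late subscriber observe on a published stream.
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();

        // Take(1) completes after the first value, so the published stream
        // terminates even though `source` itself stays open.
        var published = source.Take(1).Publish();

        published.Subscribe(
            x => log.Add($"early: {x}"),
            () => log.Add("early: completed"));

        using (published.Connect())
        {
            source.OnNext(42);

            // This subscriber arrives after the published stream has completed.
            published.Subscribe(
                x => log.Add($"late: {x}"),
                () => log.Add("late: completed"));
        }

        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(Environment.NewLine, Run()));
    }
}
```

The late subscriber should see only a completion notification and no value, which is the same thing the second group of subscribers runs into in the test.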

I have not yet come up with a way to implement my intended use case. I thought perhaps of using Multicast rather than Publish, creating a new subject as needed, but I have not managed to get that working either. And it seems rather painful for what, in my opinion, is a common use case.
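For what it is worth, the "new subject as needed" idea could be sketched roughly as follows. This is an assumption-laden illustration, not an Rx.NET API: `ShareWithFreshSubject`, `Connection<T>` and `FreshSubjectDemo` are hypothetical names, the operator replaces `.Publish().RefCount()` in the pipeline, and it has not been hardened beyond the scenario in the question.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class FreshSubjectExtensions
{
    // Hypothetical per-connection state: one Subject and one upstream subscription.
    private sealed class Connection<T>
    {
        public readonly Subject<T> Subject = new Subject<T>();
        public readonly SingleAssignmentDisposable Upstream = new SingleAssignmentDisposable();
        public int Count;
    }

    // Hypothetical helper (not part of Rx.NET): concurrent subscribers share one
    // subscription to `source`; once that run terminates or everyone leaves,
    // the next subscriber gets a brand-new Subject and a new upstream subscription.
    public static IObservable<T> ShareWithFreshSubject<T>(this IObservable<T> source)
    {
        var gate = new object();
        Connection<T> current = null;

        return Observable.Create<T>(observer =>
        {
            Connection<T> conn;
            bool connect;
            lock (gate)
            {
                if (current == null) current = new Connection<T>();
                conn = current;
                connect = conn.Count++ == 0;
            }

            var subscription = conn.Subject.Subscribe(observer);

            if (connect)
            {
                // Forget this connection before its Subject sees the terminal
                // notification, so later subscribers start from scratch.
                Action reset = () => { lock (gate) { if (current == conn) current = null; } };
                conn.Upstream.Disposable = source
                    .Do(_ => { }, _ => reset(), reset)
                    .Subscribe(conn.Subject);
            }

            return Disposable.Create(() =>
            {
                subscription.Dispose();
                lock (gate)
                {
                    if (--conn.Count == 0)
                    {
                        if (current == conn) current = null;
                        conn.Upstream.Dispose();
                    }
                }
            });
        });
    }
}

public static class FreshSubjectDemo
{
    // Replays the scenario from the question with the hypothetical operator.
    public static (int Starts, int Completions) Run()
    {
        var result = new Subject<bool>();
        var startCount = 0;
        var completionCount = 0;
        var obs = Observable
            .Defer(() => { ++startCount; return result.FirstAsync(); })
            .Do(_ => ++completionCount)
            .ShareWithFreshSubject();

        for (var round = 0; round < 2; round++)
        {
            var s1 = obs.Subscribe();
            var s2 = obs.Subscribe();
            var s3 = obs.Subscribe();
            result.OnNext(true);
            s1.Dispose(); s2.Dispose(); s3.Dispose();
        }

        return (startCount, completionCount);
    }

    public static void Main()
    {
        var (starts, completions) = Run();
        Console.WriteLine($"starts={starts}, completions={completions}");
    }
}
```

The design choice here is to tie the subscriber count to a connection object rather than to the operator as a whole, so that disposals belonging to a finished run cannot disturb the count of the next one.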


This is because the underlying observable, result, has already completed. Therefore, each new subscriber simply receives an OnCompleted callback.

If Observable.Defer created a new sequence every time, or one that did not complete, you would see the desired behavior.

e.g.

    return result.FirstAsync().Concat(Observable.Never<bool>());

You will need to remove Assert.Equal(1, completionCount);
