Reactive Extensions catch exceptions from OnNext () caused by thread pool thread?

I am using Rx 2 in .Net 4.5. When the following code is executed, it simply shuts down without executing the OnCompleted delegate or shows any errors. If I use Scheduler.CurrentThreadin ToObservable, it will at least throw an error and exit the program, and the OnCompleted function will not execute at that moment. But when it is executed in a thread other than the main one, this behavior seems unreasonable and unacceptable. Am I missing something?

static void Main()
{
     Enumerable.Range(0, 1)
                           .ToObservable(Scheduler.Default)
                           .Subscribe(o => { throw new Exception("blah"); }, () => Console.WriteLine("completed"));

     Thread.Sleep(2000);
 }

Edited: Yes, when launched as a console application, it will always cause an error, regardless of which stream is being monitored.

However, when I run this code as a test in NUnit as follows, it automatically exits after 2 seconds (thread timeout) without any error or message (waiting for "completed"). So is this really NUnit causing the problem?

[TestFixture]
class Program
{
    [Test]
    public void Test()
    {
        Enumerable.Range(0, 1)
                .ToObservable(Scheduler.Default)
                .Subscribe(
                    o => { throw new Exception("blah"); }, 
                    () => Console.WriteLine("completed"));
        Thread.Sleep(2000);
    }
}
+4
source share
2 answers

Rx does not catch exceptions thrown by observers. This is a very important design principle that has been fully discussed, although for some reason it was only included in the §6.4 footnote in the Rx Design Guidelines .

. , , OnNext, OnError OnCompleted . . OnError .

, , OnError , , , ( ). , , , , OnError ​​ OnNext , , .

, , , OnNext, . .

, , OnNext , . .

, , Thread.Sleep Console.ReadKey().

+7

, Subscribe, Undefined . -, , Select SelectMany ( try-catch).

+3

All Articles