How can I make an IObservable from a queue so that the sequence does not end when the queue is empty?

I am working on what uses Reactive Extensions for .NET (Rx), and I want to have a sequence that takes its input from a queue (or similar).

I tried to do this:

    static readonly Queue<DeviceTransaction> TransactionQueue = new Queue<DeviceTransaction>();
    //...
    var observableTransactionSource = TransactionQueue.ToObservable();
    //...
    observableTransactionSource.Subscribe(transactionObserver);

It runs to the point, but the sequence ends when the queue is empty. I do not want an empty queue to end the sequence. Empty does not mean that it ends, it simply means "no longer at the moment."

Is there a way to stop the execution of the sequence when the queue is empty, or should I think about the whole problem differently?

+4
source share
1 answer

ToObservable() , , IEnumerable<T> .

, , Subject<T> . Rx , . OnNext<T> .

, , , , ReplaySubject<T>.

, . , , , , , Observable.FromEvent.

+3

All Articles