I am working on something that uses Reactive Extensions for .NET (Rx), and I want a sequence that takes its input from a queue (or something similar).
I tried this:
    static readonly Queue<DeviceTransaction> TransactionQueue = new Queue<DeviceTransaction>();
    //...
    var observableTransactionSource = TransactionQueue.ToObservable();
    //...
    observableTransactionSource.Subscribe(transactionObserver);
It works up to a point, but the sequence ends when the queue is empty. I do not want an empty queue to end the sequence. Empty does not mean the sequence is over; it simply means "nothing more at the moment."
Is there a way to keep the sequence from completing when the queue is empty, or should I be thinking about the whole problem differently?
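ToObservable() is meant for converting an existing IEnumerable<T>: it enumerates whatever the collection contains at that moment and then completes, which is why your sequence ends when the queue runs dry.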
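To make the completion behaviour concrete, here is a minimal sketch, assuming the System.Reactive NuGet package and using plain ints as a stand-in for DeviceTransaction (the class name ToObservableDemo is just for illustration):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ToObservableDemo
{
    public static void Main()
    {
        // Stand-in for the question's Queue<DeviceTransaction>; ints keep the sketch short.
        var queue = new Queue<int>(new[] { 1, 2, 3 });

        // ToObservable() walks the enumerable once and signals completion
        // as soon as the enumerator reports that no items are left.
        queue.ToObservable().Subscribe(
            item => Console.WriteLine($"OnNext: {item}"),
            () => Console.WriteLine("OnCompleted"));

        // This item is enqueued after the enumeration has finished,
        // so the subscriber above never sees it.
        queue.Enqueue(4);
    }
}
```

The subscriber is handed the items that were in the queue at subscription time, followed by a completion signal; anything enqueued later is simply not part of that sequence.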
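For what you describe, I would use a Subject<T> instead. In Rx, a subject is both an observer and an observable, so you push each item into it by calling OnNext, and the sequence stays open until you explicitly complete it.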
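A minimal sketch of the Subject<T> approach, assuming the System.Reactive NuGet package. The DeviceTransaction class here is a hypothetical stand-in with a single Id property, since the question does not show the real type:

```csharp
using System;
using System.Reactive.Subjects;

// Hypothetical stand-in for the question's DeviceTransaction type.
public sealed class DeviceTransaction
{
    public int Id { get; set; }
}

public static class SubjectDemo
{
    // Takes the place of the static Queue<DeviceTransaction> from the question.
    private static readonly Subject<DeviceTransaction> TransactionSource =
        new Subject<DeviceTransaction>();

    public static void Main()
    {
        using (TransactionSource.Subscribe(
            t => Console.WriteLine($"Processing transaction {t.Id}"),
            () => Console.WriteLine("Completed")))
        {
            // Producers call OnNext instead of Enqueue.
            TransactionSource.OnNext(new DeviceTransaction { Id = 1 });

            // Nothing to send for a while? The sequence just stays open.
            TransactionSource.OnNext(new DeviceTransaction { Id = 2 });

            // The sequence ends only when the producer says so.
            TransactionSource.OnCompleted();
        }
    }
}
```

One thing to keep in mind with this design: a plain Subject<T> does not buffer, so items pushed while nobody is subscribed are dropped.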
Depending on your needs, ReplaySubject<T> may also be useful, since it replays earlier items to subscribers that arrive late.
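A short sketch of how ReplaySubject<T> differs, again assuming the System.Reactive NuGet package and using ints in place of DeviceTransaction:

```csharp
using System;
using System.Reactive.Subjects;

public static class ReplaySubjectDemo
{
    public static void Main()
    {
        // The parameterless constructor keeps every item for replay.
        var source = new ReplaySubject<int>();

        // Pushed before anyone has subscribed.
        source.OnNext(1);
        source.OnNext(2);

        // A late subscriber still receives 1 and 2, then live items.
        source.Subscribe(id => Console.WriteLine($"Received {id}"));

        source.OnNext(3);
    }
}
```

Because the default buffer is unbounded, a long-running source may be better served by the constructor overload that takes a buffer size, so memory use stays capped.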
Depending on where the transactions come from, Observable.FromEvent may also be worth a look.
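If the transactions happen to be raised as an event, something like the following might work. This is only a sketch under several assumptions: a current System.Reactive package (in older Rx releases the FromEvent overloads looked different), and a hypothetical Device class with a TransactionReceived event, neither of which appears in the question:

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical stand-in for the question's DeviceTransaction type.
public sealed class DeviceTransaction
{
    public int Id { get; set; }
}

// Hypothetical event source; the real device API is not shown in the question.
public sealed class Device
{
    public event Action<DeviceTransaction> TransactionReceived;

    public void Receive(DeviceTransaction transaction) =>
        TransactionReceived?.Invoke(transaction);
}

public static class FromEventDemo
{
    public static void Main()
    {
        var device = new Device();

        // Wraps the event as an observable that never completes on its own.
        IObservable<DeviceTransaction> transactions =
            Observable.FromEvent<DeviceTransaction>(
                handler => device.TransactionReceived += handler,
                handler => device.TransactionReceived -= handler);

        using (transactions.Subscribe(t => Console.WriteLine($"Received {t.Id}")))
        {
            device.Receive(new DeviceTransaction { Id = 1 });
        }

        // The subscription has been disposed, so the handler is detached
        // and this transaction is not observed.
        device.Receive(new DeviceTransaction { Id = 2 });
    }
}
```

With this approach there is no queue or subject to manage; the lifetime of the event handler follows the lifetime of the subscription.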