How to limit buffering while processing streams with Reactive Extensions

I have an interleaved stream that I split into separate sequential streams.

Producer

    int streamCount = 3;
    new MyIEnumerable<ElementType>()
        .ToObservable(Scheduler.ThreadPool)
        .Select((x, i) => new { Key = (i % streamCount), Value = x })
        .Subscribe(x => outputs[x.Key].OnNext(x.Value));

Here outputs[] are the subjects that process the streams, as defined below. .ObserveOn() is used so that the streams are processed concurrently (on multiple threads).

Consumers

    var outputs = Enumerable.Repeat(0, streamCount)
        .Select(_ => new Subject<char>())
        .ToArray();

    outputs[0].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 0: {0}", x));
    outputs[1].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 1: {0}", x));
    outputs[2].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 2: {0}", x));

The problem with this code is that it reads the whole enumerable as fast as possible, even if the output streams cannot keep up. In my case the enumerable is a file stream, so this can use a lot of memory. Therefore, I would like reading to block once the buffer reaches a threshold.

2 answers

I solved this using a semaphore for producer and consumers, as shown below. However, I'm not sure if this is considered a good solution (in terms of Rx contracts, programming style, etc.).

    var semaphore = new SemaphoreSlim(MAX_BUFFERED_ELEMENTS);

    // add to producer (before subscribe)
    .Do(_ => semaphore.Wait())

    // add to consumer (before subscribe)
    .Do(_ => semaphore.Release())

It might be a good idea to pass a CancellationToken to the Wait() call and make sure it is canceled when the stream terminates?
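Putting the fragments above together, a minimal sketch of the whole pipeline could look like the following. Several things here are assumptions rather than part of the original code: a string stands in for MyIEnumerable<ElementType>, TaskPoolScheduler.Default replaces Scheduler.ThreadPool (which newer System.Reactive versions no longer recommend), the threshold value is arbitrary, and the CountdownEvent exists only to keep the console program alive until all consumers have finished.

```csharp
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

static class Program
{
    // Assumed threshold; tune it to the memory you can afford.
    const int MaxBufferedElements = 4;

    static void Main()
    {
        const int streamCount = 3;
        var semaphore = new SemaphoreSlim(MaxBufferedElements);
        var done = new CountdownEvent(streamCount);

        var outputs = Enumerable.Range(0, streamCount)
            .Select(_ => new Subject<char>())
            .ToArray();

        // Consumers: each Release() frees one slot once an element
        // has left the ObserveOn queue and reaches the consumer.
        for (int k = 0; k < streamCount; k++)
        {
            int index = k; // capture a copy for the lambdas
            outputs[index]
                .ObserveOn(TaskPoolScheduler.Default)
                .Do(_ => semaphore.Release())
                .Subscribe(
                    x => Console.WriteLine("stream {0}: {1}", index, x),
                    () => done.Signal());
        }

        // Producer: Wait() blocks reading while all slots are in use.
        "hello reactive world".ToObservable(TaskPoolScheduler.Default)
            .Do(_ => semaphore.Wait())
            .Select((x, i) => new { Key = i % streamCount, Value = x })
            .Subscribe(
                x => outputs[x.Key].OnNext(x.Value),
                () => { foreach (var o in outputs) o.OnCompleted(); });

        done.Wait(); // block until every consumer has seen OnCompleted
    }
}
```

Each stream keeps its own element order, but the interleaving between streams is not deterministic. The sketch also assumes exactly one subscriber per subject; a second subscriber would call Release() twice per element and defeat the limit.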


I think your solution is very reasonable. The biggest problem (given some background from the previous question) is that the "internals" of your solution are currently exposed everywhere. Just make sure that, when you code it up properly, you clean up the following:

  • Wrap everything in a class that exposes a single method: IDisposable Subscribe(<index>, Action) or, alternatively, IObservable<element> ToObservable(<index>). Either the returned subscription or the returned observable will already have the "work" done for it, namely the added Do actions, etc. The fact that there is a dictionary or list underneath all of this should be completely irrelevant to the user; otherwise any change to your code here will require changes everywhere.

  • A CancellationToken is a great idea. Be sure to cancel it on OnCompleted or OnError, which you can do using the overloads of Do.
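The two points above could be sketched roughly as follows. This is only one possible shape, not a reference implementation: the class name BoundedSplitter, its Start method, and the constructor parameters are hypothetical, and TaskPoolScheduler.Default is an assumed stand-in for Scheduler.ThreadPool. It uses the three-argument overload of Do to cancel the token on OnError and OnCompleted.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical wrapper: hides the subjects, the semaphore and the token.
sealed class BoundedSplitter<T> : IDisposable
{
    private readonly IEnumerable<T> _source;
    private readonly Subject<T>[] _outputs;
    private readonly SemaphoreSlim _semaphore;
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    public BoundedSplitter(IEnumerable<T> source, int streamCount, int maxBuffered)
    {
        _source = source;
        _semaphore = new SemaphoreSlim(maxBuffered);
        _outputs = Enumerable.Range(0, streamCount)
            .Select(_ => new Subject<T>())
            .ToArray();
    }

    // Assumes one subscriber per index; each extra subscriber would
    // release the semaphore once more per element.
    public IObservable<T> ToObservable(int index)
    {
        return _outputs[index]
            .ObserveOn(TaskPoolScheduler.Default)
            .Do(_ => _semaphore.Release());
    }

    // Starts reading the source; dispose the result to stop early.
    public IDisposable Start()
    {
        int n = _outputs.Length;
        return _source.ToObservable(TaskPoolScheduler.Default)
            .Do(_ => _semaphore.Wait(_cts.Token), // blocks when the buffer is full
                _ => _cts.Cancel(),               // source failed
                () => _cts.Cancel())              // source completed
            .Select((x, i) => new { Key = i % n, Value = x })
            .Subscribe(
                x => _outputs[x.Key].OnNext(x.Value),
                ex => { foreach (var o in _outputs) o.OnError(ex); },
                () => { foreach (var o in _outputs) o.OnCompleted(); });
    }

    // Unblocks a producer that is stuck in Wait().
    public void Dispose() => _cts.Cancel();
}

static class Program
{
    static void Main()
    {
        using (var splitter = new BoundedSplitter<char>("hello reactive world", 3, 4))
        {
            var done = new CountdownEvent(3);
            for (int k = 0; k < 3; k++)
            {
                int index = k;
                splitter.ToObservable(index).Subscribe(
                    x => Console.WriteLine("stream {0}: {1}", index, x),
                    () => done.Signal());
            }
            using (splitter.Start())
                done.Wait();
        }
    }
}
```

One consequence of this design is that canceling while the producer is blocked makes Wait() throw an OperationCanceledException, which Do forwards downstream as OnError, so consumers that may be stopped early should supply an error handler.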

