I have an interleaved stream that I split into separate sequential streams.
Producer
    int streamCount = 3;

    new MyIEnumerable<ElementType>()
        .ToObservable(Scheduler.ThreadPool)
        .Select((x, i) => new { Key = (i % streamCount), Value = x })
        .Subscribe(x => outputs[x.Key].OnNext(x.Value));
Here outputs[] are the subjects that process the streams, as defined below. .ObserveOn() is used so that the streams are processed concurrently (multi-threaded).
Consumers
    var outputs = Enumerable.Repeat(0, streamCount).Select(_ => new Subject<char>()).ToArray();

    outputs[0].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 0: {0}", x));
    outputs[1].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 1: {0}", x));
    outputs[2].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 2: {0}", x));
The problem with this code is that it reads the whole enumerable as fast as possible, even if the output streams cannot keep up. In my case the enumerable is a file stream, so this can use a lot of memory. Therefore, I would like the reading to block once the buffer reaches a threshold.
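To make the behaviour I am after concrete, here is a minimal sketch of it without Rx. It swaps the Subject<char> outputs for bounded BlockingCollection<char> queues, so Add() blocks the reader whenever a queue is full. The names ReadSource and capacity, and the threshold of 2, are assumptions made up for this illustration; ReadSource stands in for the file-backed enumerable.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

class BoundedSplitSketch
{
    // Hypothetical stand-in for the file-backed enumerable in the question.
    static IEnumerable<char> ReadSource()
    {
        foreach (var c in "abcdefghijkl") yield return c;
    }

    static void Main()
    {
        const int streamCount = 3;
        const int capacity = 2; // assumed per-stream buffer threshold

        // One bounded queue per output stream.
        var queues = Enumerable.Range(0, streamCount)
            .Select(_ => new BlockingCollection<char>(capacity))
            .ToArray();

        // One consumer task per queue; each drains its queue until
        // CompleteAdding() has been called and the queue is empty.
        var consumers = queues
            .Select((q, k) => Task.Factory.StartNew(() =>
            {
                foreach (var x in q.GetConsumingEnumerable())
                    Console.WriteLine("stream {0}: {1}", k, x);
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        // Producer: round-robin split. Add() blocks while the target
        // queue already holds `capacity` items, which throttles reading.
        int i = 0;
        foreach (var x in ReadSource())
        {
            queues[i % streamCount].Add(x);
            i++;
        }

        // Signal end of input, then wait for the consumers to finish.
        foreach (var q in queues) q.CompleteAdding();
        Task.WaitAll(consumers);
    }
}
```

Each stream prints its own items in order, while the interleaving between streams varies from run to run. Note that one slow consumer stalls the reader for every stream here, because the producer is a single loop. What I am looking for is the equivalent of this blocking behaviour inside the Rx pipeline above.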