We have an application with a materialized array of items that we process through a Reactive pipeline. It looks a bit like this:
    EventLoopScheduler eventLoop = new EventLoopScheduler();
    IScheduler concurrency = new TaskPoolScheduler(
        new TaskFactory(
            new LimitedConcurrencyLevelTaskScheduler(threadCount)));
    IEnumerable<int> numbers = Enumerable.Range(1, itemCount);
    IConnectableObservable<byte[]> source =
        numbers.Select(Transform).ToObservable(eventLoop).Publish();
    IObservable<int> final = source
        .Buffer(10)
        .Select(batch =>
            batch
                .ToObservable(concurrency)
                .Buffer(10)
                .Select(concurrentBatch =>
                    concurrentBatch
                        .Select(Work)
                        .ToArray()
                        .ToObservable(eventLoop))
                .Merge())
        .Merge();
    final.Subscribe();
    source.Connect();
    Await(final).Wait();
If you really want to play with this, the stand-in methods look like this:

    // itemCount, threadCount and bloat are static members declared elsewhere (not shown).
    private static async Task Await(IObservable<int> final)
    {
        await final.LastOrDefaultAsync();
    }

    private static byte[] Transform(int number)
    {
        if (number == itemCount)
        {
            Console.WriteLine("numbers exhausted.");
        }
        // Each transformed item is a 1 MB buffer.
        byte[] buffer = new byte[1000000];
        Buffer.BlockCopy(bloat, 0, buffer, 0, bloat.Length);
        return buffer;
    }

    private static int Work(byte[] buffer)
    {
        Console.WriteLine("t {0}.", Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(50);
        return 1;
    }
A little explanation. Range(1, itemCount) simulates the raw inputs materialized from the data source. Transform simulates an enrichment step that every input must go through, and which results in a larger memory footprint. Work is a "lengthy" process that operates on the transformed input.
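Ideally, we want to minimize the number of transformed inputs the system holds at any one time, while maximizing throughput by parallelizing Work. The number of transformed inputs in memory should be the batch size (10 above) times the number of concurrent worker threads (threadCount).
So for 5 threads we should hold about 50 Transform results at any given time; and if, as here, each transformed item is a 1 MB byte buffer, we would expect memory consumption to stay at about 50 MB throughout the run.
What I find is quite different: Reactive eagerly consumes all of numbers and runs Transform on them up front (as evidenced by the numbers exhausted. message), which produces a massive memory spike at the start (@1GB for an itemCount of 1000).
My main question is: is there a way to achieve what I need (i.e. minimized memory consumption, throttled by multi-threaded batching)?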
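Update: sorry for the reversal; at first, I did not think the Work(Transform) composition suggested by paulpdaniels and Enigmativity applied to us (this has to do with our real implementation, which is more complex than the simple scenario above), but after some further experimentation I may be able to apply the same principle: i.e. defer Transform until the batch executes.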
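
For anyone who wants to experiment, here is a minimal sketch of what composing Work(Transform) could look like. It is an illustration under stated assumptions, not the pipeline from our real implementation: it assumes the System.Reactive NuGet package, the constants ItemCount, BatchSize and ThreadCount are hypothetical values, ProcessBatch is a hypothetical helper, and the live/peak counters exist only to observe how many transformed buffers are alive at once. The idea is to buffer the cheap int inputs, wrap each batch in Observable.Defer so that Transform does not run until the batch is subscribed, and let Merge(maxConcurrent) cap the number of batches in flight.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

internal static class DeferredTransformSketch
{
    // Hypothetical values, chosen for illustration only.
    private const int ItemCount = 200;
    private const int BatchSize = 10;
    private const int ThreadCount = 5;

    // Instrumentation only: counts buffers created by Transform and not yet finished by Work.
    private static readonly object Gate = new object();
    private static int live;
    private static int peak;

    private static byte[] Transform(int number)
    {
        lock (Gate)
        {
            live++;
            if (live > peak) peak = live;
        }
        return new byte[1000000]; // 1 MB per transformed item, as in the question
    }

    private static int Work(byte[] buffer)
    {
        Thread.Sleep(50); // stand-in for the lengthy processing
        lock (Gate) { live--; }
        return 1;
    }

    // Transforms and processes one item at a time, so a running batch
    // holds at most one transformed buffer.
    private static int ProcessBatch(IList<int> batch)
    {
        return batch.Sum(n => Work(Transform(n)));
    }

    private static void Main()
    {
        int processed = Observable.Range(1, ItemCount)
            .Buffer(BatchSize) // batches of raw ints are cheap to queue
            .Select(batch => Observable.Defer(
                // Nothing runs until Merge subscribes to this batch.
                () => Observable.Start(() => ProcessBatch(batch), TaskPoolScheduler.Default)))
            .Merge(ThreadCount) // at most ThreadCount batches subscribed at once
            .Sum()
            .Wait();

        Console.WriteLine("processed {0} items, peak live buffers {1}", processed, peak);
    }
}
```

In this sketch Merge(ThreadCount) still pulls all the raw numbers eagerly, but only the small deferred batches are queued; the 1 MB buffers are created inside the batches that are currently running, so the peak count should stay at or below ThreadCount. Note that Observable.Start without the Defer wrapper would begin executing as soon as the Select lambda runs, which would defeat the limit.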