I tried to develop a method pipeline using an asynchronous method call. The logic for the pipeline is as follows
- The collection has n data that must be submitted to the number m methods in the pipeline
- List collection T
- Pass the first element to the first method
- Get the output, feed it to the second method asynchronously
- At the same time, pass the second element of the collection to the first method
- After completing the first method, pass the result to the second method (if the second method is still running, put the result in turn and start the third element in the first method).
- When the second method finishes executing, take the first element from the queue and execute and so on (each method should be executed asynchronously, no one should wait for the next to complete).
- In the m-th method, after executing the data, save the result in the list
- After completing the nth element in the mth method, return the list of results (n number of results) to the very first level.
I came up with the code as follows, but it didnβt work as intended, the result never returns and, moreover, it does not execute as it should.
static class Program { static void Main(string[] args) { var list = new List<int> { 1, 2, 3, 4 }; var result = list.ForEachPipeline(Add, Square, Add, Square); foreach (var element in result) { Console.WriteLine(element); Console.WriteLine("---------------------"); } Console.ReadLine(); } private static int Add(int j) { return j + 1; } private static int Square(int j) { return j * j; } internal static void AddNotify<T>(this List<T> list, T item) { Console.WriteLine("Adding {0} to the list", item); list.Add(item); } } internal class Function<T> { private readonly Func<T, T> _func; private readonly List<T> _result = new List<T>(); private readonly Queue<T> DataQueue = new Queue<T>(); private bool _isBusy; static readonly object Sync = new object(); readonly ManualResetEvent _waitHandle = new ManualResetEvent(false); internal Function(Func<T, T> func) { _func = func; } internal Function<T> Next { get; set; } internal Function<T> Start { get; set; } internal int Count; internal IEnumerable<T> Execute(IEnumerable<T> source) { var isSingle = true; foreach (var element in source) { var result = _func(element); if (Next != null) { Next.ExecuteAsync(result, _waitHandle); isSingle = false; } else _result.AddNotify(result); } if (!isSingle) _waitHandle.WaitOne(); return _result; } internal void ExecuteAsync(T element, ManualResetEvent resetEvent) { lock(Sync) { if(_isBusy) { DataQueue.Enqueue(element); return; } _isBusy = true; _func.BeginInvoke(element, CallBack, resetEvent); } } internal void CallBack(IAsyncResult result) { bool set = false; var worker = (Func<T, T>) ((AsyncResult) result).AsyncDelegate; var resultElement = worker.EndInvoke(result); var resetEvent = result.AsyncState as ManualResetEvent; lock(Sync) { _isBusy = false; if(Next != null) Next.ExecuteAsync(resultElement, resetEvent); else Start._result.AddNotify(resultElement); if(DataQueue.Count > 1) { var element = DataQueue.Dequeue(); ExecuteAsync(element, resetEvent); } if(Start._result.Count == Count) set = true; } if(set) resetEvent.Set(); } } public static class Pipe { public static IEnumerable<T> ForEachPipeline<T>(this IEnumerable<T> source, params Func<T, T>[] pipes) { Function<T> start = null, previous = null; foreach (var function in pipes.Select(pipe => new Function<T>(pipe){ Count = source.Count()})) { if (start == null) { start = previous = function; start.Start = function; continue; } function.Start = start; previous.Next = function; previous = function; } return start != null ? start.Execute(source) : null; } }
Can you guys help me do this? If this project is not suitable for the actual method pipeline, feel free to suggest another.
Change I must strictly adhere to .Net 3.5.
source share