How can I make this asynchronous invocation work?

I am trying to build a method pipeline using asynchronous method calls. The intended logic of the pipeline is as follows (a synchronous sketch of the expected result follows the list):

  • A collection of n elements must be fed through m methods in the pipeline.
  • The input collection is a List<T>.
  • Pass the first element to the first method.
  • Take its output and feed it to the second method asynchronously.
  • At the same time, pass the second element of the collection to the first method.
  • When the first method finishes, pass its result to the second method (if the second method is still busy, queue the result and start the third element in the first method).
  • When the second method finishes, it takes the next element from its queue and executes it, and so on (each method runs asynchronously; none of them should wait for another to complete).
  • The m-th method saves each result it produces into a result list.
  • Once the m-th method has processed the n-th element, the list of results (n results in total) is returned to the caller.
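For clarity, here is a minimal synchronous sketch of the result I am after, ignoring the asynchronous requirement entirely (RunPipelineSequentially is only a made-up name to pin down the expected output; the real goal is the same data flow with the stages overlapping in time):

    // Synchronous reference: each element flows through every method in order,
    // and the final results are collected in a list.
    static List<T> RunPipelineSequentially<T>(IEnumerable<T> source, params Func<T, T>[] stages)
    {
        var results = new List<T>();
        foreach (var element in source)
        {
            var value = element;
            foreach (var stage in stages)
                value = stage(value); // the output of one method feeds the next
            results.Add(value);
        }
        return results;
    }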

I came up with the code below, but it doesn't work as intended: the result is never returned and, moreover, the methods don't execute in the order they should.

    static class Program
    {
        static void Main(string[] args)
        {
            var list = new List<int> { 1, 2, 3, 4 };
            var result = list.ForEachPipeline(Add, Square, Add, Square);
            foreach (var element in result)
            {
                Console.WriteLine(element);
                Console.WriteLine("---------------------");
            }
            Console.ReadLine();
        }

        private static int Add(int j)
        {
            return j + 1;
        }

        private static int Square(int j)
        {
            return j * j;
        }

        internal static void AddNotify<T>(this List<T> list, T item)
        {
            Console.WriteLine("Adding {0} to the list", item);
            list.Add(item);
        }
    }

    internal class Function<T>
    {
        private readonly Func<T, T> _func;
        private readonly List<T> _result = new List<T>();
        private readonly Queue<T> DataQueue = new Queue<T>();
        private bool _isBusy;
        static readonly object Sync = new object();
        readonly ManualResetEvent _waitHandle = new ManualResetEvent(false);

        internal Function(Func<T, T> func)
        {
            _func = func;
        }

        internal Function<T> Next { get; set; }
        internal Function<T> Start { get; set; }
        internal int Count;

        internal IEnumerable<T> Execute(IEnumerable<T> source)
        {
            var isSingle = true;
            foreach (var element in source)
            {
                var result = _func(element);
                if (Next != null)
                {
                    Next.ExecuteAsync(result, _waitHandle);
                    isSingle = false;
                }
                else
                    _result.AddNotify(result);
            }
            if (!isSingle)
                _waitHandle.WaitOne();
            return _result;
        }

        internal void ExecuteAsync(T element, ManualResetEvent resetEvent)
        {
            lock (Sync)
            {
                if (_isBusy)
                {
                    DataQueue.Enqueue(element);
                    return;
                }
                _isBusy = true;
                _func.BeginInvoke(element, CallBack, resetEvent);
            }
        }

        internal void CallBack(IAsyncResult result)
        {
            bool set = false;
            var worker = (Func<T, T>)((AsyncResult)result).AsyncDelegate;
            var resultElement = worker.EndInvoke(result);
            var resetEvent = result.AsyncState as ManualResetEvent;
            lock (Sync)
            {
                _isBusy = false;
                if (Next != null)
                    Next.ExecuteAsync(resultElement, resetEvent);
                else
                    Start._result.AddNotify(resultElement);
                if (DataQueue.Count > 1)
                {
                    var element = DataQueue.Dequeue();
                    ExecuteAsync(element, resetEvent);
                }
                if (Start._result.Count == Count)
                    set = true;
            }
            if (set)
                resetEvent.Set();
        }
    }

    public static class Pipe
    {
        public static IEnumerable<T> ForEachPipeline<T>(this IEnumerable<T> source, params Func<T, T>[] pipes)
        {
            Function<T> start = null, previous = null;
            foreach (var function in pipes.Select(pipe => new Function<T>(pipe) { Count = source.Count() }))
            {
                if (start == null)
                {
                    start = previous = function;
                    start.Start = function;
                    continue;
                }
                function.Start = start;
                previous.Next = function;
                previous = function;
            }
            return start != null ? start.Execute(source) : null;
        }
    }

Can you help me get this working? If this design isn't a good fit for an actual method pipeline, feel free to suggest a different approach.

Edit: I must strictly stick to .NET 3.5.

3 answers

I couldn't immediately spot the problem in your code, but you may have over-complicated things a bit. Here is what may be an easier way to do what you want.

    public static class Pipe
    {
        public static IEnumerable<T> Execute<T>(
            this IEnumerable<T> input, params Func<T, T>[] functions)
        {
            // each worker will put its result in this array
            var results = new T[input.Count()];

            // launch workers and return a WaitHandle for each one
            // (ToList forces the Select to run now, so all workers are started
            // before we begin waiting on any of them)
            var waitHandles = input.Select(
                (element, index) =>
                {
                    var waitHandle = new ManualResetEvent(false);
                    ThreadPool.QueueUserWorkItem(
                        delegate
                        {
                            T result = element;
                            foreach (var function in functions)
                            {
                                result = function(result);
                            }
                            results[index] = result;
                            waitHandle.Set();
                        });
                    return waitHandle;
                }).ToList();

            // wait for each worker to finish
            foreach (var waitHandle in waitHandles)
            {
                waitHandle.WaitOne();
            }
            return results;
        }
    }
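Assuming the same Add and Square methods from the question, usage would look something like this:

    var list = new List<int> { 1, 2, 3, 4 };
    // Each element runs through the whole chain on a thread-pool thread;
    // results come back in the original input order.
    var result = list.Execute(Add, Square, Add, Square);
    foreach (var element in result)
        Console.WriteLine(element);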

This Execute does not take a lock for each stage of the pipeline the way your own attempt does; I left that out because it didn't seem useful. However, you can easily add it by wrapping the functions:

    // materialize once (e.g. with ToArray) so every element shares the same
    // lock object per stage, instead of re-wrapping on each enumeration
    var wrappedFunctions = functions.Select(x => AddStageLock(x)).ToArray();

where AddStageLock is:

    private static Func<T, T> AddStageLock<T>(Func<T, T> function)
    {
        object stageLock = new object();
        Func<T, T> wrappedFunction = x =>
        {
            lock (stageLock)
            {
                return function(x);
            }
        };
        return wrappedFunction;
    }

Edit: this Execute implementation is likely to be slower than single-threaded execution unless the work done for each individual element outweighs the overhead of creating a wait handle and scheduling a work item on the thread pool. To benefit from multithreading you need to limit that overhead; PLINQ in .NET 4 does this by partitioning the data.
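On .NET 3.5 you can approximate that kind of partitioning by hand: queue one work item per chunk of the input instead of one per element. A rough sketch under that assumption (ChunkedExecute is my own name, not a framework method):

    public static class ChunkedPipe
    {
        public static IEnumerable<T> ChunkedExecute<T>(
            this IList<T> input, int chunkCount, params Func<T, T>[] functions)
        {
            var results = new T[input.Count];
            var waitHandles = new List<ManualResetEvent>();

            // one work item (and one wait handle) per chunk instead of per element
            for (int c = 0; c < chunkCount; c++)
            {
                int chunk = c; // copy for the closure
                var waitHandle = new ManualResetEvent(false);
                waitHandles.Add(waitHandle);
                ThreadPool.QueueUserWorkItem(delegate
                {
                    // strided partitioning: chunk 0 handles indexes 0, chunkCount, 2*chunkCount, ...
                    for (int i = chunk; i < input.Count; i += chunkCount)
                    {
                        T result = input[i];
                        foreach (var function in functions)
                        {
                            result = function(result);
                        }
                        results[i] = result;
                    }
                    waitHandle.Set();
                });
            }

            foreach (var waitHandle in waitHandles)
            {
                waitHandle.WaitOne();
            }
            return results;
        }
    }

Something like Environment.ProcessorCount is a reasonable first guess for chunkCount.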


Any specific reason for the pipeline approach? IMO, launching a separate thread for each input and running all the functions on it one after another will be easier to write and faster to execute. For instance,

    static T ExecPipe<T>(IEnumerable<Func<T, T>> pipe, T input)
    {
        T value = input;
        foreach (var f in pipe)
        {
            value = f(value);
        }
        return value;
    }

    var pipe = new List<Func<int, int>>() { Add, Square, Add, Square };
    var list = new List<int> { 1, 2, 3, 4 };
    foreach (var value in list)
    {
        ThreadPool.QueueUserWorkItem(o => ExecPipe(pipe, (int)o), value);
    }
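If you also need the results back (and a way to know when everything has finished), you can collect them into a pre-sized array and signal completion with a counter. A sketch along those lines, still .NET 3.5 only (RunAll is my own name, and it reuses ExecPipe from above):

    static T[] RunAll<T>(IList<T> input, IEnumerable<Func<T, T>> pipe)
    {
        var results = new T[input.Count];
        if (input.Count == 0)
            return results;

        int pending = input.Count;
        var done = new ManualResetEvent(false);

        for (int i = 0; i < input.Count; i++)
        {
            int index = i; // copy for the closure
            ThreadPool.QueueUserWorkItem(delegate
            {
                results[index] = ExecPipe(pipe, input[index]);
                // the last work item to finish signals the waiting caller
                if (Interlocked.Decrement(ref pending) == 0)
                    done.Set();
            });
        }

        done.WaitOne();
        return results;
    }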

Now, coming back to your code, I believe an exact implementation of a pipeline with M stages should have exactly M threads, since each stage can run in parallel, although some of those threads may sit idle because input hasn't reached them yet. I'm not sure how many threads your code actually starts, or what the thread count will be at any given moment.
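For reference, an exact M-thread pipeline on .NET 3.5 could look roughly like this: one dedicated thread per stage, connected by a simple Monitor-based blocking queue (BlockingQueue and RunPipeline below are my own helpers, not framework types), with each stage thread handling one item at a time:

    // minimal blocking queue, since .NET 3.5 has no BlockingCollection
    class BlockingQueue<T>
    {
        private readonly Queue<T> _items = new Queue<T>();

        public void Enqueue(T item)
        {
            lock (_items)
            {
                _items.Enqueue(item);
                Monitor.Pulse(_items); // wake up a waiting consumer
            }
        }

        public T Dequeue()
        {
            lock (_items)
            {
                while (_items.Count == 0)
                    Monitor.Wait(_items);
                return _items.Dequeue();
            }
        }
    }

    static class MStagePipeline
    {
        // one thread per stage: stage s reads from queues[s] and writes to queues[s + 1]
        public static List<T> RunPipeline<T>(IList<T> input, params Func<T, T>[] stages)
        {
            var queues = new BlockingQueue<T>[stages.Length + 1];
            for (int i = 0; i < queues.Length; i++)
                queues[i] = new BlockingQueue<T>();

            var threads = new List<Thread>();
            for (int s = 0; s < stages.Length; s++)
            {
                int stage = s; // copy for the closure
                var thread = new Thread(delegate()
                {
                    // each stage handles exactly input.Count items, then exits
                    for (int n = 0; n < input.Count; n++)
                        queues[stage + 1].Enqueue(stages[stage](queues[stage].Dequeue()));
                });
                thread.Start();
                threads.Add(thread);
            }

            // feed the first stage, then collect everything from the last queue
            foreach (var item in input)
                queues[0].Enqueue(item);

            var results = new List<T>();
            for (int n = 0; n < input.Count; n++)
                results.Add(queues[stages.Length].Dequeue());

            foreach (var thread in threads)
                thread.Join();
            return results;
        }
    }

With the question's example this would be invoked as MStagePipeline.RunPipeline(list, Add, Square, Add, Square), and the results come back in input order because every queue is FIFO.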


Why don't you spin off a thread for each iteration and aggregate your results into a locked shared resource? That's all you need to do; you could also use PLinq for this. I think you may be locking in the wrong places: you only need to lock a method if it touches a critical section with a shared resource in it. By pulling the shared resource out and moving the work onto a new thread, you get rid of the need to coordinate the second method.

i.e.: MethodX calls Method1 and then passes the value to Method2; then: foreach element in arr: Async(MethodX(item));
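A minimal sketch of that idea, assuming result order doesn't matter and that methodX stands for the composed Method1-then-Method2 call (only access to the shared list is locked; RunAllWithLock is my own name):

    static List<int> RunAllWithLock(IList<int> items, Func<int, int> methodX)
    {
        var results = new List<int>();
        int pending = items.Count;
        // already signalled if there is nothing to do
        var done = new ManualResetEvent(items.Count == 0);

        foreach (var item in items)
        {
            var current = item; // copy for the closure
            ThreadPool.QueueUserWorkItem(delegate
            {
                int value = methodX(current); // the pure computation needs no lock
                lock (results)                // lock only around the shared resource
                {
                    results.Add(value);
                }
                if (Interlocked.Decrement(ref pending) == 0)
                    done.Set();
            });
        }

        done.WaitOne();
        return results;
    }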

