How to create continuous flow processing using TPL in C# 4

I'm not sure if the following is possible, but I would like to invoke several actions in parallel, in a throttled manner, while keeping the processing flow continuous, without falling back on timers or sleep/poll cycles.

So far, the best I have come up with is to load a large batch of inputs from some source, process them in parallel in a throttled manner, and then loop, as shown below.

static void Main(string[] args)
{
    while(true) //Simulate a Timer Elapsing...
    {
        IEnumerable<int> inputs = new List<int>() {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};  
        //Simulate querying database queue tables for next batch of entries

        RunAllActions(inputs, 3); //Max 3 at a time.
    }
}

static void RunAllActions(IEnumerable<int> inputs, int maxConcurrency)
{
    var options = new ParallelOptions() {MaxDegreeOfParallelism = maxConcurrency};

    Parallel.ForEach<int>(inputs, options, DoWork);
    //Blocks here until all inputs are processed.
    Console.WriteLine("Batch of Work Done!!!");
}

static void DoWork(int input)
{
    Console.WriteLine("Starting Task {0}", input);
    System.Threading.Thread.Sleep(3000);
    Console.WriteLine("Finishing Task {0}", input);
}

What I would like to know is: is there a construct in the TPL that keeps working continuously, so that I could replace the "timer elapsing" and "database polling" with a MessageQueue Received event?

In other words, something like the following, where a (fictional) TPL method continuously watches and processes a queue:

internal class Engine
{
    private MessageQueue mq;
    private Queue<int> myInternalApplicationQueue;

    public Engine()
    {
        //Message Queue to get new task inputs from
        mq = new MessageQueue();
        mq.ReceiveCompleted += new ReceiveCompletedEventHandler(mq_ReceiveCompleted);

        // internal Queue to put them in.
        myInternalApplicationQueue = new Queue<int>();
    }

    void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
    {
        //On MQ Receive, pop the input in a queue in my app
        int input = (int) e.Message.Body;

        myInternalApplicationQueue.Enqueue(input);
    }

    public void StartWorking()
    {
        //Once this gets called, it doesn't stop... it just keeps watching that queue,
        //processing the tasks as fast as is allowed while the app is running.
        var options = new ParallelOptions() { MaxDegreeOfParallelism = 3 };
        Parallel.KeepWorkingOnQueue<int>(myInternalApplicationQueue, options, DoWork);
        //       ^^^^^^^^^^^^^^^^^^ <----- THIS GUY
    }

}

This is exactly the kind of job that BlockingCollection<T> was designed for.

Essentially, a BlockingCollection<T> acts as a "pipe". One or more producer threads add data to the collection, and one or more consumer threads take data out, typically by iterating blockingCollection.GetConsumingEnumerable() in a foreach loop. The foreach blocks while the collection is empty and resumes as soon as new items arrive, so there is no polling.

When there is no more work, you call BlockingCollection<T>.CompleteAdding; the consuming foreach loops then finish once the remaining items have been drained.

One word of warning: feeding GetConsumingEnumerable() of a BlockingCollection<T> straight into Parallel.ForEach is problematic. The default partitioner buffers elements, so items can sit "stuck" in a partition's buffer instead of being processed, and the loop will not behave well over a never-ending queue. It is usually better to start a fixed number of dedicated consumer tasks instead.
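On .NET 4, where TPL Dataflow is not available out of the box, that usually means a fixed set of long-running consumer tasks, each draining GetConsumingEnumerable(). A minimal self-contained sketch of that pattern (the Consume method and processedCount counter are illustrative names, not from the original post):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class BlockingCollectionEngine
{
    static readonly BlockingCollection<int> queue = new BlockingCollection<int>();
    static int processedCount;

    static void Main()
    {
        // Three dedicated consumers = max 3 items processed at a time.
        var consumers = Enumerable.Range(0, 3)
            .Select(_ => Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning))
            .ToArray();

        // Producer side: this is where mq_ReceiveCompleted would call queue.Add.
        for (int i = 1; i <= 10; i++)
            queue.Add(i);

        queue.CompleteAdding();   // signal "no more work": ends each consumer's foreach
        Task.WaitAll(consumers);
        Console.WriteLine("Processed {0} items", processedCount);
    }

    static void Consume()
    {
        // Blocks while the queue is empty; completes only after CompleteAdding
        // has been called and the remaining items have been drained.
        foreach (int input in queue.GetConsumingEnumerable())
        {
            Thread.Sleep(100);    // simulate work
            Interlocked.Increment(ref processedCount);
        }
    }
}
```

In the question's Engine, mq_ReceiveCompleted would call queue.Add(input) instead of Enqueue, and StartWorking would start the consumer tasks; as a bonus, BlockingCollection<int> also fixes the thread-safety problem of sharing a plain Queue<int> between threads.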


While the BlockingCollection approach certainly works, it is a fairly "manual" solution: you have to manage the consumer threads and the shutdown yourself.

If you can take a dependency on the TPL Dataflow library (shipped separately from .NET 4), this is exactly the scenario it was built for. You declare an ActionBlock<T> and, instead of maintaining your own internal queue, you simply Post each incoming item to the ActionBlock<T>; the TPL takes care of queuing and scheduling the work. Your Engine then boils down to:

private readonly ActionBlock<int> myActionBlock;

public Engine()
{
    // The ActionBlock replaces the internal queue: it buffers posted items
    // and invokes ProcessWorkItem on them for you.
    myActionBlock = new ActionBlock<int>(this.ProcessWorkItem);
}

void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)      
{      
    int input = (int)e.Message.Body;

    // Post the data to the action block
    this.myActionBlock.Post(input);
}

private void ProcessWorkItem(int workItemData)
{
    // ActionBlock will hand each work item to this method for processing
}

If you want to control the parallelism of the ActionBlock<T>, you pass an ExecutionDataflowBlockOptions instance to the ActionBlock<T> constructor. Among other things, this lets you cap the degree of parallelism and bound the capacity of the block's input queue. For example:

ActionBlock<int> myActionBlock = new ActionBlock<int>(
                                     this.ProcessWorkItem, 
                                     new ExecutionDataflowBlockOptions
                                     {
                                         MaxDegreeOfParallelism = 4,
                                         BoundedCapacity = 100
                                     });
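One thing the options snippet leaves open is shutdown: an ActionBlock<T> exposes Complete() to signal that nothing more will be posted, and a Completion task to wait on until everything already posted has finished. A runnable sketch, assuming the System.Threading.Tasks.Dataflow package is referenced (the DataflowDemo class name is just for illustration):

```csharp
using System;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

class DataflowDemo
{
    static void Main()
    {
        var block = new ActionBlock<int>(
            n => Console.WriteLine("Processing {0}", n),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4, // at most 4 items in flight at once
                BoundedCapacity = 100       // input queue holds at most 100 pending items
            });

        for (int i = 1; i <= 10; i++)
            block.Post(i);

        block.Complete();        // no further input will arrive
        block.Completion.Wait(); // blocks until every posted item has been processed
        Console.WriteLine("All work processed");
    }
}
```

Note that once BoundedCapacity is set, Post returns false when the input queue is full; if you would rather wait for space than drop the item, use block.SendAsync(item) and await it instead.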
