TPL Dataflow, confused by the basic design

I use TPL Dataflow quite a bit, but stumble on a problem that I cannot solve:

I have the following architecture:

BroadCastBlock<List<object1>> β†’ 2 different TransformBlock<List<Object1>, Tuple<int, List<Object1>>> β†’ both links to TransformManyBlock<Tuple<int, List<Object1>>, Object2>

I change the lambda expression in the TransformManyBlock at the end of the chain: (a) code that performs operations with the stream tuple, (b) no code at all.

In TransformBlocks, I measure the time starting from the arrival of the first element and stopping when TransformBlock.Completion indicates that the block is complete (broadband links for throwing blocks with propagation of Kompletion set to true).

What I can’t put up with is why transformBlocks in case (b) completes about 5-6 times faster than with (a). This is completely contrary to TDF's design goals. Elements from transformation blocks were passed to transfromManyBlock, so it does not matter at all what transformManyBlock does for elements that affect the completion of transformation blocks. I see no reason why everything that happens in transfromManyBlock can be related to previous TransformBlocks.

Who can reconcile this strange observation?

Here is some code to show the difference. When you run the code, be sure to change the following two lines:

  tfb1.transformBlock.LinkTo(transformManyBlock); tfb2.transformBlock.LinkTo(transformManyBlock); 

in

  tfb1.transformBlock.LinkTo(transformManyBlockEmpty); tfb2.transformBlock.LinkTo(transformManyBlockEmpty); 

to observe the difference in runtime of previous transformBlocks.

 class Program { static void Main(string[] args) { Test test = new Test(); test.Start(); } } class Test { private const int numberTransformBlocks = 2; private int currentGridPointer; private Dictionary<int, List<Tuple<int, List<Object1>>>> grid; private BroadcastBlock<List<Object1>> broadCastBlock; private TransformBlockClass tfb1; private TransformBlockClass tfb2; private TransformManyBlock<Tuple<int, List<Object1>>, Object2> transformManyBlock; private TransformManyBlock<Tuple<int, List<Object1>>, Object2> transformManyBlockEmpty; private ActionBlock<Object2> actionBlock; public Test() { grid = new Dictionary<int, List<Tuple<int, List<Object1>>>>(); broadCastBlock = new BroadcastBlock<List<Object1>>(list => list); tfb1 = new TransformBlockClass(); tfb2 = new TransformBlockClass(); transformManyBlock = new TransformManyBlock<Tuple<int, List<Object1>>, Object2> (newTuple => { for (int counter = 1; counter <= 10000000; counter++) { double result = Math.Sqrt(counter + 1.0); } return new Object2[0]; }); transformManyBlockEmpty = new TransformManyBlock<Tuple<int, List<Object1>>, Object2>( tuple => { return new Object2[0]; }); actionBlock = new ActionBlock<Object2>(list => { int tester = 1; //flush transformManyBlock }); //linking broadCastBlock.LinkTo(tfb1.transformBlock , new DataflowLinkOptions { PropagateCompletion = true } ); broadCastBlock.LinkTo(tfb2.transformBlock , new DataflowLinkOptions { PropagateCompletion = true } ); //link either to ->transformManyBlock or -> transformManyBlockEmpty tfb1.transformBlock.LinkTo(transformManyBlock); tfb2.transformBlock.LinkTo(transformManyBlock); transformManyBlock.LinkTo(actionBlock , new DataflowLinkOptions { PropagateCompletion = true } ); transformManyBlockEmpty.LinkTo(actionBlock , new DataflowLinkOptions { PropagateCompletion = true } ); //completion Task.WhenAll(tfb1.transformBlock.Completion , tfb2.transformBlock.Completion) .ContinueWith(_ => { transformManyBlockEmpty.Complete(); transformManyBlock.Complete(); }); transformManyBlock.Completion.ContinueWith(_ => { Console.WriteLine("TransformManyBlock (with code) completed"); }); transformManyBlockEmpty.Completion.ContinueWith(_ => { Console.WriteLine("TransformManyBlock (empty) completed"); }); } public void Start() { const int numberBlocks = 100; const int collectionSize = 300000; //send collection numberBlock-times for (int i = 0; i < numberBlocks; i++) { List<Object1> list = new List<Object1>(); for (int j = 0; j < collectionSize; j++) { list.Add(new Object1(j)); } broadCastBlock.Post(list); } //mark broadCastBlock complete broadCastBlock.Complete(); Console.WriteLine("Core routine finished"); Console.ReadLine(); } } class TransformBlockClass { private Stopwatch watch; private bool isStarted; private int currentIndex; public TransformBlock<List<Object1>, Tuple<int, List<Object1>>> transformBlock; public TransformBlockClass() { isStarted = false; watch = new Stopwatch(); transformBlock = new TransformBlock<List<Object1>, Tuple<int, List<Object1>>> (list => { if (!isStarted) { StartUp(); isStarted = true; } return new Tuple<int, List<Object1>>(currentIndex++, list); }); transformBlock.Completion.ContinueWith(_ => { ShutDown(); }); } private void StartUp() { watch.Start(); } private void ShutDown() { watch.Stop(); Console.WriteLine("TransformBlock : Time elapsed in ms: " + watch.ElapsedMilliseconds); } } class Object1 { public int val { get; private set; } public Object1(int val) { this.val = val; } } class Object2 { public int value { get; private set; } public List<Object1> collection { get; private set; } public Object2(int value, List<Object1> collection) { this.value = value; this.collection = collection; } } 

* EDIT: I sent another piece of code, this time using collections of value types, and I cannot reproduce the problem that I am observing in the above code. Could it be that walking around reference types and working on them at the same time (even in different blocks of the data stream) can block and cause a conflict? *

 class Program { static void Main(string[] args) { Test test = new Test(); test.Start(); } } class Test { private BroadcastBlock<List<int>> broadCastBlock; private TransformBlock<List<int>, List<int>> tfb11; private TransformBlock<List<int>, List<int>> tfb12; private TransformBlock<List<int>, List<int>> tfb21; private TransformBlock<List<int>, List<int>> tfb22; private TransformManyBlock<List<int>, List<int>> transformManyBlock1; private TransformManyBlock<List<int>, List<int>> transformManyBlock2; private ActionBlock<List<int>> actionBlock1; private ActionBlock<List<int>> actionBlock2; public Test() { broadCastBlock = new BroadcastBlock<List<int>>(item => item); tfb11 = new TransformBlock<List<int>, List<int>>(item => { return item; }); tfb12 = new TransformBlock<List<int>, List<int>>(item => { return item; }); tfb21 = new TransformBlock<List<int>, List<int>>(item => { return item; }); tfb22 = new TransformBlock<List<int>, List<int>>(item => { return item; }); transformManyBlock1 = new TransformManyBlock<List<int>, List<int>>(item => { Thread.Sleep(100); //or you can replace the Thread.Sleep(100) with actual work, //no difference in results. This shows that the issue at hand is //unrelated to starvation of threads. return new List<int>[1] { item }; }); transformManyBlock2 = new TransformManyBlock<List<int>, List<int>>(item => { return new List<int>[1] { item }; }); actionBlock1 = new ActionBlock<List<int>>(item => { //flush transformManyBlock }); actionBlock2 = new ActionBlock<List<int>>(item => { //flush transformManyBlock }); //linking broadCastBlock.LinkTo(tfb11, new DataflowLinkOptions { PropagateCompletion = true }); broadCastBlock.LinkTo(tfb12, new DataflowLinkOptions { PropagateCompletion = true }); broadCastBlock.LinkTo(tfb21, new DataflowLinkOptions { PropagateCompletion = true }); broadCastBlock.LinkTo(tfb22, new DataflowLinkOptions { PropagateCompletion = true }); tfb11.LinkTo(transformManyBlock1); tfb12.LinkTo(transformManyBlock1); tfb21.LinkTo(transformManyBlock2); tfb22.LinkTo(transformManyBlock2); transformManyBlock1.LinkTo(actionBlock1 , new DataflowLinkOptions { PropagateCompletion = true } ); transformManyBlock2.LinkTo(actionBlock2 , new DataflowLinkOptions { PropagateCompletion = true } ); //completion Task.WhenAll(tfb11.Completion, tfb12.Completion).ContinueWith(_ => { Console.WriteLine("TransformBlocks 11 and 12 completed"); transformManyBlock1.Complete(); }); Task.WhenAll(tfb21.Completion, tfb22.Completion).ContinueWith(_ => { Console.WriteLine("TransformBlocks 21 and 22 completed"); transformManyBlock2.Complete(); }); transformManyBlock1.Completion.ContinueWith(_ => { Console.WriteLine ("TransformManyBlock (from tfb11 and tfb12) finished"); }); transformManyBlock2.Completion.ContinueWith(_ => { Console.WriteLine ("TransformManyBlock (from tfb21 and tfb22) finished"); }); } public void Start() { const int numberBlocks = 100; const int collectionSize = 300000; //send collection numberBlock-times for (int i = 0; i < numberBlocks; i++) { List<int> list = new List<int>(); for (int j = 0; j < collectionSize; j++) { list.Add(j); } broadCastBlock.Post(list); } //mark broadCastBlock complete broadCastBlock.Complete(); Console.WriteLine("Core routine finished"); Console.ReadLine(); } } 
+4
source share
1 answer

OK, last try; -)

Description:

The observed time delta in scenario 1 can be fully explained by the "strong" difference in the behavior of the garbage collector.

When you run script 1 linking transformManyBlocks, the runtime behavior is such that garbage collection starts when new elements (lists) are created in the main thread, which does not correspond to scenario 1 with the transformManyBlockEmptys binding.

Note that instantiating a new reference type (Object1) causes a memory allocation on the GC heap, which in turn can trigger the start of the GC collection. Since multiple instances of Object1 (and lists) are created, the garbage collector has a bit more work to scan the heap for (potentially) unreachable objects.

Therefore, the observed difference can be minimized by any of the following:

  • Turning Object1 from class to structure (thereby ensuring that memory for instances is not allocated on the heap).
  • Saving links to generated lists (thereby reducing the collection time of the garbage collector to identify unreachable objects).
  • Creation of all elements before sending to the network.

(Note: I cannot explain why the garbage collector behaves differently in scenario 1 of "transformManyBlock" compared to scenario 1 of "transformManyBlockEmpty", but the data collected using ConcurrencyVisualizer clearly shows the difference.)

Results:

(Tests were performed on Core i7 980X, 6 cores, with HT support):

I modified script 2 as follows:

 // Start a stopwatch per tfb int tfb11Cnt = 0; Stopwatch sw11 = new Stopwatch(); tfb11 = new TransformBlock<List<int>, List<int>>(item => { if (Interlocked.CompareExchange(ref tfb11Cnt, 1, 0) == 0) sw11.Start(); return item; }); // [...] // completion Task.WhenAll(tfb11.Completion, tfb12.Completion).ContinueWith(_ => { Console.WriteLine("TransformBlocks 11 and 12 completed. SW11: {0}, SW12: {1}", sw11.ElapsedMilliseconds, sw12.ElapsedMilliseconds); transformManyBlock1.Complete(); }); 

Results:

  • Scenario 1 (as published, i.e. related to transformManyBlock) :
    TransformBlock: Elapsed time in ms: 6826
    TransformBlock: time elapsed in ms: 6826
  • Scenario 1 (associated with transformManyBlockEmpty) :
    TransformBlock: time elapsed through ms: 3140
    TransformBlock: Elapsed time in ms: 3140
  • Scenario 1 (transformManyBlock, Thread.Sleep (200) in the body of the loop) :
    TransformBlock: Elapsed time in ms: 4949
    TransformBlock: elapsed time in ms: 4950
  • Scenario 2 (as published but modified for the report) :
    TransformBlocks 21 and 22 are complete. SW21: 619 ms, SW22: 669 ms
    TransformBlocks 11 and 12 are complete. SW11: 669 ms, SW12: 667 ms

Then I changed scripts 1 and 2 to prepare the input before sending it to the network:

 // Scenario 1 //send collection numberBlock-times var input = new List<List<Object1>>(numberBlocks); for (int i = 0; i < numberBlocks; i++) { var list = new List<Object1>(collectionSize); for (int j = 0; j < collectionSize; j++) { list.Add(new Object1(j)); } input.Add(list); } foreach (var inp in input) { broadCastBlock.Post(inp); Thread.Sleep(10); } // Scenario 2 //send collection numberBlock-times var input = new List<List<int>>(numberBlocks); for (int i = 0; i < numberBlocks; i++) { List<int> list = new List<int>(collectionSize); for (int j = 0; j < collectionSize; j++) { list.Add(j); } //broadCastBlock.Post(list); input.Add(list); } foreach (var inp in input) { broadCastBlock.Post(inp); Thread.Sleep(10); } 

Results:

  • Scenario 1 (transformManyBlock) :
    TransformBlock: time elapsed through ms: 1029
    TransformBlock: Time elapsed through ms: 1029
  • Scenario 1 (transformManyBlockEmpty) :
    TransformBlock: elapsed time in ms: 975
    TransformBlock: elapsed time in ms: 975
  • Scenario 1 (transformManyBlock, Thread.Sleep (200) in the body of the loop) :
    TransformBlock: Elapsed time in ms: 972
    TransformBlock: elapsed time in ms: 972

Finally, I changed the code to the original version, but keeping the link to the created list:

 var lists = new List<List<Object1>>(); for (int i = 0; i < numberBlocks; i++) { List<Object1> list = new List<Object1>(); for (int j = 0; j < collectionSize; j++) { list.Add(new Object1(j)); } lists.Add(list); broadCastBlock.Post(list); } 

Results:

  • Scenario 1 (transformManyBlock) :
    TransformBlock: Time elapsed through ms: 6052
    TransformBlock: time elapsed through ms: 6052
  • Scenario 1 (transformManyBlockEmpty) :
    TransformBlock: time elapsed in ms: 5524
    TransformBlock: time elapsed in ms: 5524
  • Scenario 1 (transformManyBlock, Thread.Sleep (200) in the body of the loop) :
    TransformBlock: Elapsed time in ms: 5098
    TransformBlock: Elapsed time in ms: 5098

Similarly, changing Object1 from class to structure will result in both blocks being executed at about the same time (and about 10 times faster).


Update: The answer below is not enough to explain the observed behavior.

In the scenario, one of the hard loops is executed inside the TransformMany lambda, which will be processor dependent and will starve other threads for processor resources. This is the reason why you can observe the delay in completing the task of continuing completion. In two scenarios, the Thread.Sleep script runs inside the TransformMany lambda, giving other threads the ability to complete the continuation task. The observed run-time difference is not related to the TPL data stream. To improve the observed deltas, it is enough to enter Thread.Sleep inside the loop body in scenario 1:

 for (int counter = 1; counter <= 10000000; counter++) { double result = Math.Sqrt(counter + 1.0); // Back off for a little while Thread.Sleep(200); } 

(Below is my initial answer. I did not read the OP question carefully enough and only understood what he was asking about by reading his comments. I still leave it here as a link.)

Are you sure you are measuring correctly? Note that when you do something like this: transformBlock.Completion.ContinueWith(_ => ShutDown()); , your time measurement will depend on the behavior of the TaskScheduler (for example, how long it will take until the execution of the continue task begins). Although I could not observe the difference that you saw on my machine, I got predictor results (in terms of delta between tfb1 and tfb2 times) when using dedicated threads to measure time:

  // Within your Test.Start() method... Thread timewatch = new Thread(() => { var sw = Stopwatch.StartNew(); tfb1.transformBlock.Completion.Wait(); Console.WriteLine("tfb1.transformBlock completed within {0} ms", sw.ElapsedMilliseconds); }); Thread timewatchempty = new Thread(() => { var sw = Stopwatch.StartNew(); tfb2.transformBlock.Completion.Wait(); Console.WriteLine("tfb2.transformBlock completed within {0} ms", sw.ElapsedMilliseconds); }); timewatch.Start(); timewatchempty.Start(); //send collection numberBlock-times for (int i = 0; i < numberBlocks; i++) { // ... rest of the code 
+3
source

All Articles