Retry policy within ITargetBlock<TInput>

I need to introduce a retry policy in a workflow. Let's say there are 3 blocks that are connected in this way:

    var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 };

    var buffer = new BufferBlock<int>();
    var processing = new TransformBlock<int, int>(..., executionOptions);
    var send = new ActionBlock<int>(...);

    buffer.LinkTo(processing);
    processing.LinkTo(send);

So, there is a buffer that accumulates data and sends it to the transform block, which processes no more than three items at a time, and then the result is sent to the action block.

While the transform block is processing, transient errors are possible, and I want to retry the operation if the error is transient.

I know that blocks in general are not retryable (although the delegates passed into blocks can be made retryable). So one option is to wrap the delegate passed to the block so that it supports retrying.

I also know that there is a very good library, TransientFaultHandling.Core, that provides retry mechanisms for transient faults. It is an excellent library, but it does not fit my case. If I wrap the delegate that is passed to the transform block in the RetryPolicy.ExecuteAsync method, the message stays locked inside the transform block, and until the retry either succeeds or fails, the transform block cannot accept a new message in its place. Imagine that all three messages have entered retrying (say, the next retry attempt will be in 2 minutes) and keep failing: the transform block will be stuck until at least one message leaves it.
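To make the problem concrete, here is a minimal sketch of the "wrap the delegate" option written by hand rather than with TransientFaultHandling.Core. The helper name `WithRetry` is hypothetical, and treating every exception as transient is an assumption made only for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class RetryHelpers
{
    // Hypothetical helper (not from any library): wraps an async delegate so
    // that it is invoked again, up to maxRetries extra times, after a failure.
    public static Func<TInput, Task<TOutput>> WithRetry<TInput, TOutput>(
        Func<TInput, Task<TOutput>> transform, int maxRetries, TimeSpan delay)
    {
        return async input =>
        {
            for (int attempt = 0; ; attempt++)
            {
                try
                {
                    return await transform(input);
                }
                catch (Exception) when (attempt < maxRetries)
                {
                    // Assumption: every exception counts as transient.
                    // While this delay is pending, the message still occupies
                    // one of the block's MaxDegreeOfParallelism slots.
                    await Task.Delay(delay);
                }
            }
        };
    }
}
```

It could be passed to the block as `new TransformBlock<int, int>(RetryHelpers.WithRetry<int, int>(ProcessAsync, 3, TimeSpan.FromMinutes(2)), executionOptions)`, where `ProcessAsync` stands in for the real delegate. With MaxDegreeOfParallelism = 3, three messages waiting in `Task.Delay` are enough to stall the block, which is exactly the situation described above.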

The only solution I see is to extend TransformBlock (in fact, ITargetBlock would be enough) and do the retry manually (for example, like here):

    do
    {
        try { return await transform(input); }
        catch
        {
            if( numRetries <= 0 ) throw;
            else Task.Delay(timeout).ContinueWith(t => processing.Post(message));
        }
    } while( numRetries-- > 0 );

That is, put the message back into the transform block after a delay. But in this case the retry context (the number of retries left, etc.) also has to be passed into the block. That sounds too complicated...

Does anyone see a simpler approach to implementing a retry policy for a dataflow block?

+7
c# task-parallel-library tpl-dataflow
2 answers

I think you pretty much have to do that: you have to track the remaining number of retries for each message, and you have to schedule the retry somehow.

But you could make it cleaner by encapsulating it in a separate method. Something like:

    // it's a private class, so public fields are okay
    private class RetryingMessage<T>
    {
        public T Data;
        public int RetriesRemaining;
        public readonly List<Exception> Exceptions = new List<Exception>();
    }

    public static IPropagatorBlock<TInput, TOutput>
        CreateRetryingBlock<TInput, TOutput>(
        Func<TInput, Task<TOutput>> transform, int numberOfRetries,
        TimeSpan retryDelay, Action<IEnumerable<Exception>> failureHandler)
    {
        var source = new TransformBlock<TInput, RetryingMessage<TInput>>(
            input => new RetryingMessage<TInput>
            { Data = input, RetriesRemaining = numberOfRetries });

        // TransformManyBlock, so that we can propagate zero results on failure
        TransformManyBlock<RetryingMessage<TInput>, TOutput> target = null;
        target = new TransformManyBlock<RetryingMessage<TInput>, TOutput>(
            async message =>
            {
                try
                {
                    return new[] { await transform(message.Data) };
                }
                catch (Exception ex)
                {
                    message.Exceptions.Add(ex);
                    if (message.RetriesRemaining == 0)
                    {
                        failureHandler(message.Exceptions);
                    }
                    else
                    {
                        message.RetriesRemaining--;
                        Task.Delay(retryDelay)
                            .ContinueWith(_ => target.Post(message));
                    }
                    return null;
                }
            });

        source.LinkTo(
            target, new DataflowLinkOptions { PropagateCompletion = true });

        return DataflowBlock.Encapsulate(source, target);
    }

I added code to track the exceptions because I think failures should not be ignored; they should, at the very least, be logged.

Also, this code does not work very well with completion: if there are retries waiting for their delay and you call Complete() on the block, it will complete immediately and those retries will be lost. If that is a problem for you, you will have to track the outstanding retries and complete target only when source has completed and no retries are waiting.
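One possible way to do that tracking is sketched below. It is only an illustration of the idea, not a tested production implementation: the class name `CompletionAwareRetry`, the `pending` counter, and the `finishOne` helper are all hypothetical additions. Instead of propagating completion over the link, the target is completed by hand once the source has finished and no message is still in flight or waiting for a retry.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionAwareRetry
{
    private sealed class RetryingMessage<T>
    {
        public T Data;
        public int RetriesRemaining;
        public readonly List<Exception> Exceptions = new List<Exception>();
    }

    public static IPropagatorBlock<TInput, TOutput> CreateRetryingBlock<TInput, TOutput>(
        Func<TInput, Task<TOutput>> transform, int numberOfRetries,
        TimeSpan retryDelay, Action<IEnumerable<Exception>> failureHandler)
    {
        var gate = new object();
        int pending = 0;          // messages accepted but not yet finished
        bool sourceDone = false;  // true once the source has handed everything over
        TransformManyBlock<RetryingMessage<TInput>, TOutput> target = null;

        // Called when a message succeeds or runs out of retries.
        Action finishOne = () =>
        {
            lock (gate)
            {
                pending--;
                if (sourceDone && pending == 0) target.Complete();
            }
        };

        var source = new TransformBlock<TInput, RetryingMessage<TInput>>(input =>
        {
            lock (gate) { pending++; }
            return new RetryingMessage<TInput>
            { Data = input, RetriesRemaining = numberOfRetries };
        });

        Func<RetryingMessage<TInput>, Task<IEnumerable<TOutput>>> body = async message =>
        {
            try
            {
                TOutput result = await transform(message.Data);
                finishOne();
                return new[] { result };
            }
            catch (Exception ex)
            {
                message.Exceptions.Add(ex);
                if (message.RetriesRemaining == 0)
                {
                    failureHandler(message.Exceptions);
                    finishOne();
                }
                else
                {
                    message.RetriesRemaining--;
                    // Fire-and-forget re-post; the message stays counted in pending.
                    _ = Task.Delay(retryDelay).ContinueWith(t => target.Post(message));
                }
                return Enumerable.Empty<TOutput>();
            }
        };
        target = new TransformManyBlock<RetryingMessage<TInput>, TOutput>(body);

        // No PropagateCompletion here: the target is completed manually.
        source.LinkTo(target);
        source.Completion.ContinueWith(t =>
        {
            lock (gate)
            {
                sourceDone = true;
                if (pending == 0) target.Complete();
            }
        });

        return DataflowBlock.Encapsulate(source, target);
    }
}
```

The sketch assumes the source never faults and that failureHandler does not throw; handling those cases would need extra code.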

+10

In addition to svick's excellent answer, there are a few more options:

  • You can use TransientFaultHandling.Core - just set MaxDegreeOfParallelism to Unbounded so that other messages can go through.
  • You can change the output type of the block to include a failure indication and a retry count, and create a dataflow loop by passing a filter to LinkTo that checks whether another retry is necessary. This approach is more complex: you will have to add a delay to your block when it is doing a retry, and add a TransformBlock to strip the failure/retry information for the rest of the mesh.
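The second option could look roughly like the sketch below. The `RetryLoop` class, the `Attempt` envelope type, and the `maxDegreeOfParallelism` parameter are hypothetical names chosen for illustration. Failed attempts are routed back into the same block by a filtered link, successful ones go on to a block that strips the retry information, and attempts that have run out of retries are discarded.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class RetryLoop
{
    // Hypothetical envelope carrying the retry context along with the payload.
    public sealed class Attempt<TInput, TOutput>
    {
        public TInput Input;
        public TOutput Output;
        public bool Failed;
        public int RetriesRemaining;
    }

    public static IPropagatorBlock<TInput, TOutput> Create<TInput, TOutput>(
        Func<TInput, Task<TOutput>> transform, int numberOfRetries,
        TimeSpan retryDelay, int maxDegreeOfParallelism)
    {
        var wrap = new TransformBlock<TInput, Attempt<TInput, TOutput>>(input =>
            new Attempt<TInput, TOutput> { Input = input, RetriesRemaining = numberOfRetries });

        Func<Attempt<TInput, TOutput>, Task<Attempt<TInput, TOutput>>> body = async attempt =>
        {
            if (attempt.Failed)
            {
                // This message came back through the loop: spend one retry and wait.
                attempt.RetriesRemaining--;
                await Task.Delay(retryDelay);
            }
            try
            {
                attempt.Output = await transform(attempt.Input);
                attempt.Failed = false;
            }
            catch (Exception)
            {
                // Assumption: every exception counts as transient.
                attempt.Failed = true;
            }
            return attempt;
        };

        var processing = new TransformBlock<Attempt<TInput, TOutput>, Attempt<TInput, TOutput>>(
            body,
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = maxDegreeOfParallelism,
                // Let finished messages leave without waiting for delayed ones.
                EnsureOrdered = false
            });

        // Strips the failure/retry information for the rest of the mesh.
        var unwrap = new TransformBlock<Attempt<TInput, TOutput>, TOutput>(a => a.Output);

        wrap.LinkTo(processing);
        // Links are tried in order: retry loop first, then success, then discard.
        processing.LinkTo(processing, a => a.Failed && a.RetriesRemaining > 0);
        processing.LinkTo(unwrap, a => !a.Failed);
        processing.LinkTo(DataflowBlock.NullTarget<Attempt<TInput, TOutput>>());

        return DataflowBlock.Encapsulate(wrap, unwrap);
    }
}
```

Two caveats apply to this sketch. The delay runs inside the block, so a waiting message still occupies a parallelism slot, which is why a generous or unbounded MaxDegreeOfParallelism helps here as well. Completion is not propagated at all, because completing a mesh that contains a loop needs the same kind of bookkeeping as tracking outstanding retries.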
+3
