Creating a message bus with TPL Dataflow

I was looking for a lightweight, asynchronous message bus and came across TPL Dataflow.

My current implementation is below (full example at https://gist.github.com/4416655).

    public class Bus
    {
        private readonly BroadcastBlock<object> broadcast =
            new BroadcastBlock<object>(message => message);

        private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions =
            new ConcurrentDictionary<Guid, IDisposable>();

        public Task SendAsync<TMessage>(TMessage message)
        {
            return SendAsync<TMessage>(message, CancellationToken.None);
        }

        public Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
        {
            return broadcast.SendAsync(message, cancellationToken);
        }

        public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
        {
            var handler = new ActionBlock<object>(message => handlerAction((TMessage)message));

            var subscription = broadcast.LinkTo(
                handler,
                new DataflowLinkOptions { PropagateCompletion = true },
                message => message is TMessage);

            return AddSubscription(subscription);
        }

        public void Unsubscribe(Guid subscriptionId)
        {
            IDisposable subscription;

            if (subscriptions.TryRemove(subscriptionId, out subscription))
            {
                subscription.Dispose();
            }
        }

        private Guid AddSubscription(IDisposable subscription)
        {
            var subscriptionId = Guid.NewGuid();
            subscriptions.TryAdd(subscriptionId, subscription);
            return subscriptionId;
        }
    }

I have some general questions about using TPL Dataflow in a messaging scenario.

  • Is BroadcastBlock<T> the recommended source for sending messages to multiple handlers simultaneously? This was the conclusion I came to based on this post.
  • In my implementation, I use a single BroadcastBlock<T> instance for all message types. Can this cause problems when processing a large number of messages? Should I create a separate instance per message type?
  • BroadcastBlock<T> always keeps the last item that was sent. This means that any new subscription (link) will automatically be sent this message. Is it possible to change this behavior (new subscriptions should only receive new messages)?
  • In my test application, I introduced a delay in the first handler:

      // Subscribe to Message type
      var subscription1 = bus.Subscribe<Message>(async m =>
      {
          await Task.Delay(2000);
          Console.WriteLine("{0} Handler 1: {1}.", m.TimeStamp, m.Content);
      });

    When sending messages, I expected each message to be printed to the console one by one, 2 seconds apart. Instead, after 2 seconds, all messages were printed at once. I assume this is due to parallelism performed by the underlying scheduler, but I'm curious how I can change these settings (setting MaxDegreeOfParallelism = 1 makes no difference).

  • Finally, while SendAsync allows me to wait for a message to be sent, it does not allow me to wait for the target (ActionBlock<T>) to finish. I thought this is what PropagateCompletion would do, but that doesn't seem to be the case. Ideally, I would like to know when all the message handlers have executed.

Update

The reason I did not get the behavior I expected with Task.Delay is that it delayed the execution of each handler rather than the processing of all handlers. Thread.Sleep was what I needed.

1 answer

While answering your questions (see below), I realized that modeling your design with TPL Dataflow blocks is probably not a good idea. TDF is good at processing messages through largely independent blocks, with no built-in way to track a single message. But that is what you seem to want: to process each message by the handlers in sequence, with tracking of when each message completes.

Because of this, I think you should not build a whole dataflow network, but instead use a single ActionBlock as an asynchronous message processor:

    public class Bus
    {
        class Subscription
        {
            public Guid Id { get; private set; }
            public Func<object, Task> HandlerAction { get; private set; }

            public Subscription(Guid id, Func<object, Task> handlerAction)
            {
                Id = id;
                HandlerAction = handlerAction;
            }
        }

        private readonly ConcurrentQueue<Subscription> m_handlersToSubscribe =
            new ConcurrentQueue<Subscription>();
        private readonly ConcurrentQueue<Guid> m_idsToUnsubscribe =
            new ConcurrentQueue<Guid>();
        private readonly ActionBlock<Tuple<object, Action>> m_messageProcessor;

        public Bus()
        {
            // subscriptions is accessed only from the (single-threaded) ActionBlock, so it is thread-safe
            var subscriptions = new List<Subscription>();

            m_messageProcessor = new ActionBlock<Tuple<object, Action>>(
                async tuple =>
                {
                    var message = tuple.Item1;
                    var completedAction = tuple.Item2;

                    // could be made more efficient, probably doesn't matter
                    Guid idToUnsubscribe;
                    while (m_idsToUnsubscribe.TryDequeue(out idToUnsubscribe))
                    {
                        subscriptions.RemoveAll(s => s.Id == idToUnsubscribe);
                    }

                    Subscription handlerToSubscribe;
                    while (m_handlersToSubscribe.TryDequeue(out handlerToSubscribe))
                    {
                        subscriptions.Add(handlerToSubscribe);
                    }

                    foreach (var subscription in subscriptions)
                    {
                        await subscription.HandlerAction(message);
                    }

                    completedAction();
                });
        }

        public Task SendAsync<TMessage>(TMessage message)
        {
            var tcs = new TaskCompletionSource<bool>();
            Action completedAction = () => tcs.SetResult(true);

            m_messageProcessor.Post(new Tuple<object, Action>(message, completedAction));

            return tcs.Task;
        }

        public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
        {
            return Subscribe<TMessage>(
                message =>
                {
                    handlerAction(message);
                    // we need a completed non-generic Task; this is a simple, efficient way to get it
                    // another option would be to use an async lambda with no await,
                    // but that is less efficient and produces a warning
                    return Task.FromResult(false);
                });
        }

        public Guid Subscribe<TMessage>(Func<TMessage, Task> handlerAction)
        {
            Func<object, Task> actionWithCheck = async message =>
            {
                if (message is TMessage)
                    await handlerAction((TMessage)message);
            };

            var id = Guid.NewGuid();
            m_handlersToSubscribe.Enqueue(new Subscription(id, actionWithCheck));
            return id;
        }

        public void Unsubscribe(Guid subscriptionId)
        {
            m_idsToUnsubscribe.Enqueue(subscriptionId);
        }
    }

(I decided to use queues for subscribing and unsubscribing, so that the list of handlers does not change while a message is being processed.)

Answers to your questions

Is BroadcastBlock<T> the recommended source for sending messages to multiple handlers simultaneously?

Yes, at first glance it looks like BroadcastBlock<T> is what you want. There is certainly no other similar block in TPL Dataflow.

In my implementation, I use a single BroadcastBlock<T> instance for all message types. Can this cause problems when processing a large number of messages? Should I create a separate instance per message type?

With one block for all message types, you do more work (sending to all handlers) on a single thread. With one block per message type, you do less work (sending only to the relevant handlers), and that work can run on multiple threads. Because of this, I think it is reasonable to assume that the latter will be faster.
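To illustrate the "one block per message type" variant, here is a minimal sketch. The TypedBus class and its BlockFor helper are hypothetical names made up for this example, and it assumes messages are routed by their static type only, so a message sent as a base type would not reach subscribers of a derived type. Each BroadcastBlock<T> still keeps its last message.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch: one BroadcastBlock<T> per message type, created on demand.
public sealed class TypedBus
{
    // Values are BroadcastBlock<T> instances, stored as object because T differs per key.
    private readonly ConcurrentDictionary<Type, object> blocks =
        new ConcurrentDictionary<Type, object>();

    private BroadcastBlock<T> BlockFor<T>()
    {
        return (BroadcastBlock<T>)blocks.GetOrAdd(
            typeof(T), _ => new BroadcastBlock<T>(message => message));
    }

    public Task<bool> SendAsync<T>(T message)
    {
        return BlockFor<T>().SendAsync(message);
    }

    // Disposing the returned link unsubscribes the handler.
    public IDisposable Subscribe<T>(Func<T, Task> handler)
    {
        var target = new ActionBlock<T>(handler);
        return BlockFor<T>().LinkTo(target);
    }
}
```

No type filter is needed on the link here, because each block only ever sees messages of its own type. Whether this is actually faster than a single shared block is something to measure rather than assume.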

But don't forget the rules for optimizing application performance: first, write code that is simple and readable. Only if it turns out that it is actually slow, try optimizing it. And when comparing two alternatives, always use profiling to figure out which one is really faster, not just guess which one should be faster.

BroadcastBlock<T> always keeps the last item that was sent. This means that any new subscription (link) will automatically be sent this message. Is it possible to change this behavior (new subscriptions should only receive new messages)?

No, there is no way to configure BroadcastBlock<T> to do this. If you don't need all the features of BroadcastBlock<T> (sending to blocks with bounded capacity, which may be temporarily full; support for non-greedy blocks as targets), you can write a custom version of BroadcastBlock<T> that does this.
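A minimal sketch of such a custom version might look like the following. SimpleBroadcaster<T> is a hypothetical type, not part of TPL Dataflow, and it assumes all targets are greedy and unbounded, so a Post to a target is never declined and never needs to be retried. It forwards each message only to the targets linked at that moment and keeps nothing afterwards.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch: a broadcaster that does not remember the last message.
public sealed class SimpleBroadcaster<T>
{
    private readonly object gate = new object();

    // Copy-on-write array, so Post can iterate a stable snapshot.
    private ITargetBlock<T>[] targets = Array.Empty<ITargetBlock<T>>();

    public IDisposable LinkTo(ITargetBlock<T> target)
    {
        lock (gate)
        {
            var copy = new ITargetBlock<T>[targets.Length + 1];
            targets.CopyTo(copy, 0);
            copy[targets.Length] = target;
            targets = copy;
        }
        return new Unlinker(this, target);
    }

    public void Post(T message)
    {
        ITargetBlock<T>[] snapshot;
        lock (gate) { snapshot = targets; }

        foreach (var target in snapshot)
        {
            // Assumption: targets are unbounded, so the result of Post is ignored.
            target.Post(message);
        }
    }

    private void Unlink(ITargetBlock<T> target)
    {
        lock (gate)
        {
            targets = Array.FindAll(targets, t => !ReferenceEquals(t, target));
        }
    }

    private sealed class Unlinker : IDisposable
    {
        private readonly SimpleBroadcaster<T> owner;
        private readonly ITargetBlock<T> target;

        public Unlinker(SimpleBroadcaster<T> owner, ITargetBlock<T> target)
        {
            this.owner = owner;
            this.target = target;
        }

        public void Dispose() { owner.Unlink(target); }
    }
}
```

A target linked after a message was posted simply never sees that message, which is the behavior asked for. The price is that bounded or non-greedy targets are not supported.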

When sending messages, I expected each message to be printed to the console one by one, 2 seconds apart. Instead, after 2 seconds, all messages were printed at once. I assume this is due to parallelism performed by the underlying scheduler, but I'm curious how I can change these settings (setting MaxDegreeOfParallelism = 1 makes no difference).

One of the points of TDF is that each block is independent, so multiple blocks can run on multiple threads. If this is not what you want, then perhaps using a separate ActionBlock<T> for each handler may not be the best solution. In fact, TDF may not be the best solution.

In addition, Subscribe() accepts Action<TMessage>, which means your async lambda is compiled as an async void method. With async void, the ActionBlock considers the handler finished as soon as it reaches its first await, which is why all messages appeared at once. Async void methods should only be used in specific (and relatively rare) cases where you have no other option. If you want to support async handlers, you should accept async Task methods, i.e. Func<TMessage, Task>.
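As a small sketch of the difference: when an ActionBlock<T> is given a Func<T, Task>, it waits for the returned task before taking the next message, so the delays add up one after another. AsyncHandlerDemo and RunAsync are illustrative names, and the 50 ms delay is an arbitrary stand-in for the 2 seconds from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class AsyncHandlerDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        // The async lambda binds to the Func<int, Task> constructor overload,
        // so the block awaits each handler before starting the next one.
        var block = new ActionBlock<int>(async n =>
        {
            log.Add($"start {n}");
            await Task.Delay(50);
            log.Add($"end {n}");
        });

        for (var i = 1; i <= 3; i++)
        {
            block.Post(i);
        }

        block.Complete();
        await block.Completion;
        return log;
    }
}
```

With the default MaxDegreeOfParallelism of 1, the log contains each "start" immediately followed by its matching "end". If the same lambda were first converted to Action<int>, it would become async void and the entries would interleave.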

The reason I did not get the behavior I expected with Task.Delay is that it delayed the execution of each handler rather than the processing of all handlers. Thread.Sleep was what I needed.

Using Thread.Sleep() goes against the whole idea of async; you should avoid it if possible. Also, I don't think it actually worked the way you wanted: it introduced a delay on each thread, but TPL Dataflow will use more than one thread, so it will not behave the way you intended.

Finally, while SendAsync allows me to wait for a message to be sent, it does not allow me to wait for the target (ActionBlock<T>) to finish. I thought this is what PropagateCompletion would do, but that doesn't seem to be the case. Ideally, I would like to know when all the message handlers have executed.

PropagateCompletion, together with Complete() and Completion, is meant for handling the completion of whole blocks, not the processing of a single message. One reason for this is that with more complex dataflow networks, it may not be clear when exactly a message has been processed. For example, if a message has already been sent to all current targets of a BroadcastBlock<T>, but will also be sent to all newly added targets, should it be considered complete?
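To make the block-level meaning concrete, here is a short sketch with a BufferBlock<int> linked to an ActionBlock<int>. CompletionDemo and RunAsync are illustrative names. Completion is signalled once for the whole block, after everything it accepted has been processed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var processed = new List<int>();

        var buffer = new BufferBlock<int>();
        var worker = new ActionBlock<int>(n => processed.Add(n));

        // With PropagateCompletion, completing the buffer also completes the worker
        // once the buffer has handed over all of its items.
        buffer.LinkTo(worker, new DataflowLinkOptions { PropagateCompletion = true });

        buffer.Post(1);
        buffer.Post(2);
        buffer.Post(3);

        buffer.Complete();

        // Finishes only after the worker has processed every item, not after each one.
        await worker.Completion;
        return processed;
    }
}
```

After Complete() the blocks accept no further messages, which is why this mechanism fits shutting a pipeline down rather than waiting for one particular message.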

If you want to track individual messages, you will have to do it manually somehow, possibly using TaskCompletionSource.
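One possible way to do that manually is sketched below: each message travels together with its own TaskCompletionSource, which is completed when the handler finishes. TrackedProcessor<T> and ProcessAsync are hypothetical names. The sketch also assumes that a handler failure should be reported to the sender instead of faulting the block.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, not part of TPL Dataflow.
public sealed class TrackedProcessor<T>
{
    private readonly ActionBlock<(T Message, TaskCompletionSource<bool> Done)> block;

    public TrackedProcessor(Func<T, Task> handler)
    {
        block = new ActionBlock<(T Message, TaskCompletionSource<bool> Done)>(async item =>
        {
            try
            {
                await handler(item.Message);
                item.Done.TrySetResult(true);
            }
            catch (Exception ex)
            {
                // Assumption: report the failure to the sender and keep the block alive.
                item.Done.TrySetException(ex);
            }
        });
    }

    // The returned task completes when the handler has finished this message.
    public Task ProcessAsync(T message)
    {
        var done = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        block.Post((message, done));
        return done.Task;
    }
}
```

RunContinuationsAsynchronously keeps the sender's continuation from running inside the block's processing loop, so a slow continuation cannot hold up the next message.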
