Configuring EventStore with NServiceBus

How do you integrate NServiceBus with EventStore?

I am new to NSB and ES, and I am trying to find the best setup for NSB when using ES and CQRS.

I hook into NSB in DispatchCommit, just as in the example: https://github.com/joliver/EventStore/blob/master/doc/EventStore.Example/MainProgram.cs

  • Do you publish the whole Commit, or each of Commit.Events?
  • Do you wrap your messages, since NSB requires published messages to implement IMessage? If so, how do you route them to the right queue, given that the wrapper is generic, unlike a specific type such as OrderSubmittedEvent? If possible, I do not want my events to depend on NSB, because that would pull the dependency into my domain as well.

Some code or a guide would be much appreciated.

2 answers

Here is what I use in production:

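using System;                     // GC
using System.Collections.Generic; // IEnumerable, KeyValuePair
using System.Linq;                // Where
using NServiceBus;                // IBus, IMessage
// Also import the EventStore namespaces that define Commit and IPublishMessages.

public sealed class NServiceBusPublisher : IPublishMessages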
{
    private const string AggregateIdKey = "AggregateId";
    private const string CommitVersionKey = "CommitVersion";
    private const string EventVersionKey = "EventVersion";
    private const string BusPrefixKey = "Bus.";
    private readonly IBus bus;

    public NServiceBusPublisher(IBus bus)
    {
        this.bus = bus;
    }

    public void Dispose()
    {
        GC.SuppressFinalize(this);
    }

    public void Publish(Commit commit)
    {
        for (var i = 0; i < commit.Events.Count; i++)
        {
            var eventMessage = commit.Events[i];

            // The cast yields null when the event body does not implement IMessage;
            // skip such events rather than fail on a null reference below.
            var busMessage = eventMessage.Body as IMessage;
            if (busMessage == null)
                continue;

            AppendHeaders(busMessage, commit.Headers); // optional
            AppendHeaders(busMessage, eventMessage.Headers); // optional
            AppendVersion(commit, i); // optional
            this.bus.Publish(busMessage);
        }
    }

    private static void AppendHeaders(IMessage message, IEnumerable<KeyValuePair<string, object>> headers)
    {
        headers = headers.Where(x => x.Key.StartsWith(BusPrefixKey));
        foreach (var header in headers)
        {
            var key = header.Key.Substring(BusPrefixKey.Length);
            var value = (header.Value ?? string.Empty).ToString();
            message.SetHeader(key, value);
        }
    }

    private static void AppendVersion(Commit commit, int index)
    {
        var busMessage = commit.Events[index].Body as IMessage;
        busMessage.SetHeader(AggregateIdKey, commit.StreamId.ToString());
        busMessage.SetHeader(CommitVersionKey, commit.StreamRevision.ToString());
        busMessage.SetHeader(EventVersionKey, GetSpecificEventVersion(commit, index).ToString());
    }

    private static int GetSpecificEventVersion(Commit commit, int index)
    {
        // e.g. (StreamRevision: 120) - (5 events) + 1 + (index @ 4: the last index) = event version: 120
        return commit.StreamRevision - commit.Events.Count + 1 + index;
    }
}

NServiceBus V3 introduced a feature called Unobtrusive mode. With it, you can remove the NSB dependency from your domain.

This feature lets you supply your own conventions for determining which types are message definitions, instead of marking them with the IMessage, ICommand, or IEvent interfaces.
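
As a rough illustration of such a convention, here is a minimal sketch. The namespaces, the DomainMessageConventions class, and the "namespace ends with .Events" rule are all hypothetical choices made for this example; the commented-out registration uses the DefiningEventsAs configuration call as I recall it from NServiceBus V3, so check it against the version you run.

```csharp
using System;

namespace MyCompany.Domain.Events // hypothetical namespace for domain events
{
    // A plain event class: no NServiceBus reference and no marker interface.
    public sealed class OrderSubmittedEvent
    {
        public Guid OrderId { get; set; }
    }
}

namespace MyCompany.Hosting // hypothetical namespace for the endpoint host
{
    // Hypothetical helper that holds the convention as an ordinary predicate.
    public static class DomainMessageConventions
    {
        // Assumed rule: any type whose namespace ends with ".Events" is an event.
        public static bool IsEvent(Type type)
        {
            return type.Namespace != null && type.Namespace.EndsWith(".Events");
        }
    }

    public static class Program
    {
        public static void Main()
        {
            // In the endpoint project, which is the only one referencing NServiceBus,
            // the predicate would be registered roughly like this (V3 API, from memory):
            //
            //   Configure.With()
            //       .DefaultBuilder()
            //       .DefiningEventsAs(DomainMessageConventions.IsEvent);

            // Evaluate the convention against an event type and an unrelated type.
            Console.WriteLine(DomainMessageConventions.IsEvent(
                typeof(MyCompany.Domain.Events.OrderSubmittedEvent)));
            Console.WriteLine(DomainMessageConventions.IsEvent(typeof(string)));
        }
    }
}
```

Keeping the rule in a plain predicate means the domain assembly never references NSB; only the hosting endpoint does.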
