State transition processing for multiple events

I have a MassTransitStateMachine that orchestrates a process which involves creating multiple events.

Once all of the events have completed, I want the state machine to transition to the "clean up" phase.

Here is the relevant state declaration and filter function:

        During(ImportingData,
            When(DataImported)
                // When we get a data imported event, mark this source as done. 
                .Then(MarkImportCompletedForLocation),

            When(DataImported, IsAllDataImported)
                // Once all are done, we can transition to cleaning up...
                .Then(CleanUpSources)
                .TransitionTo(CleaningUp)
        );


    ...snip...


    private static bool IsAllDataImported(EventContext<DataImportSagaState, DataImportMappingCompletedEvent> ctx)
    {
        return ctx.Instance.Locations.Values.All(x => x);
    }

So, while the state is ImportingData, I expect to receive several DataImported events. Each event marks its location as done, so that the IsAllDataImported method can determine whether we should transition to the next state.

However, if the last two DataImported events arrive at the same time, the handler for transitioning to the CleaningUp phase fires twice, and I end up trying to clean up twice.

Is there a better way to handle this, so that the transition fires only once?

+4
2 answers

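In my case the completions all arrive as the same DataImported event, one per location, so CompositeEvent was not a fit.

Instead, I raise a new AllDataImported event from MarkImportCompletedForLocation. That method now decides, in a thread-safe way, whether every location has finished.

The state declaration now looks like this: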

        During(ImportingData,
            When(DataImported)
                // When we get a data imported event, mark the URI in the locations list as done. 
                .Then(MarkImportCompletedForLocation),

            When(AllDataImported)
                // Once all are done, we can transition to cleaning up...
                .TransitionTo(CleaningUp)
                .Then(CleanUpSources)
        );

The IsAllDataImported filter is no longer needed.

Locations is a property on the saga state:

    public Dictionary<Uri, bool> Locations { get; set; }

MarkImportCompletedForLocation now looks like this:

    private void MarkImportCompletedForLocation(BehaviorContext<DataImportSagaState, DataImportedEvent> ctx)
    {
        lock (ctx.Instance.Locations)
        {
            ctx.Instance.Locations[ctx.Data.ImportSource] = true;
            if (ctx.Instance.Locations.Values.All(x => x))
            {
                var allDataImported = new AllDataImportedEvent {CorrelationId = ctx.Instance.CorrelationId};
                this.CreateEventLift(AllDataImported).Raise(ctx.Instance, allDataImported);
            }
        }
    }
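The "mark one location, then raise the completion event only once" pattern above can also be sketched without any MassTransit types. This is a minimal illustration only: ImportTracker, MarkCompleted and the _completionRaised flag are hypothetical names that do not appear in the code above. The extra flag is an assumption added here so that a redelivered DataImported message cannot signal completion a second time.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the saga's Locations bookkeeping; not a MassTransit type.
public sealed class ImportTracker
{
    private readonly object _gate = new object();
    private readonly Dictionary<Uri, bool> _locations;
    private bool _completionRaised;

    public ImportTracker(IEnumerable<Uri> sources)
    {
        // Every known source starts out as "not yet imported".
        _locations = sources.ToDictionary(uri => uri, _ => false);
    }

    // Marks one source as done. Returns true only for the single call that
    // completes the set, so the caller raises "all data imported" at most once.
    public bool MarkCompleted(Uri source)
    {
        lock (_gate)
        {
            _locations[source] = true;

            if (_completionRaised || !_locations.Values.All(done => done))
                return false;

            _completionRaised = true;
            return true;
        }
    }
}
```

A caller would raise its completion event only when MarkCompleted returns true. Note that the lock only serializes callers that share the same in-memory object.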


+4

You can use a composite event for this. For example:

    CompositeEvent(() => AllDataImported, x => x.ImportStatus, DataImported, MoreDataImported);

    During(ImportingData,
        When(DataImported)
            .Then(context => { /* do something with the data */ }),
        When(MoreDataImported)
            .Then(context => { /* do something with more data */ }),
        When(AllDataImported)
            .Then(context => { /* okay, we have all the data now */ }));

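And on the saga state instance:

    class DataImportSagaState :
        SagaStateMachineInstance
    {
        // Tracks which of the composite event's source events have been received.
        public int ImportStatus { get; set; }
    }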

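The events can arrive in any order, because which of them have been received is tracked in the ImportStatus property.

The data carried by the individual events is not saved, so capture whatever you need on the state instance yourself in the .Then() handlers.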
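To see why a single integer such as ImportStatus is enough to make the arrival order irrelevant, here is a minimal sketch of the bit-flag idea. It is an illustration under assumptions, not MassTransit's actual implementation: CompositeFlags and Record are hypothetical names, and each source event is simply assigned a 0-based index.

```csharp
using System;

// Hypothetical illustration of the idea; this is not MassTransit's implementation.
public sealed class CompositeFlags
{
    private readonly int _eventCount;
    private readonly int _allBits;

    public CompositeFlags(int eventCount)
    {
        if (eventCount < 1 || eventCount > 30)
            throw new ArgumentOutOfRangeException(nameof(eventCount));

        _eventCount = eventCount;
        _allBits = (1 << eventCount) - 1; // one bit per source event
    }

    // Records source event number `index` (0-based) in `status` and reports
    // whether this call is the one that completed the set.
    public bool Record(ref int status, int index)
    {
        if (index < 0 || index >= _eventCount)
            throw new ArgumentOutOfRangeException(nameof(index));

        int before = status;
        status |= 1 << index;

        return before != _allBits && status == _allBits;
    }
}
```

Because setting a bit twice changes nothing, a repeated event leaves the status as it was, and the "completed" result is reported only by the call that sets the last missing bit.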

+1
