I am looking for the right messaging pattern for the following problem:
I have a single queue that holds messages for every user in the domain; each message is a userChanged event. The business requirement is that all messages for a given user must be processed in FIFO order, and if an error occurs while processing one of a user's messages, no further messages for that user should be processed until the failed message is marked as successfully processed. Every message must be processed, and the solution must be deployable in a clustered ESB environment.
I would like to demux these events into one FIFO queue per user ID, so that messages for different users are processed in parallel while the messages for any single user are processed sequentially.
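To make the intended behaviour concrete, here is a minimal in-process sketch of that demultiplexing. It is only an illustration under assumptions: the class and method names (PerUserDemux, dispatch, shutdownAndWait) are made up, it uses plain JDK executors rather than Mule or RabbitMQ, and it is neither durable nor cluster-aware. Per-user ordering also relies on a single thread calling dispatch, as would be the case with one reader on the source queue.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-process demultiplexer: one FIFO lane per user ID. */
class PerUserDemux {
    // One single-threaded executor per user; each executor runs its tasks in submission order.
    private final Map<String, ExecutorService> lanes = new ConcurrentHashMap<>();

    /** Routes a task to the lane for userId, creating the lane on first use. */
    void dispatch(String userId, Runnable task) {
        lanes.computeIfAbsent(userId, id -> Executors.newSingleThreadExecutor())
             .execute(task);
    }

    /** Stops accepting work and waits for queued tasks; returns false on timeout or interrupt. */
    boolean shutdownAndWait(long timeoutMillis) {
        for (ExecutorService lane : lanes.values()) {
            lane.shutdown();
        }
        boolean allDone = true;
        try {
            for (ExecutorService lane : lanes.values()) {
                allDone &= lane.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        return allDone;
    }
}
```

Lanes for different users run on different threads, so they proceed in parallel, while each lane handles its own tasks one at a time in the order they were dispatched. One thread per user does not scale to a large user base, which is part of why a broker-backed design is attractive.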
So far I have come up with two possible solutions using Mule and RabbitMQ, but both would require either custom components or existing components that I am not aware of.
Have all the messages in the first RabbitMQ queue read by a Mule flow with an inbound AMQP endpoint that grabs the user ID from the header. The flow then uses an AMQP outbound endpoint that dynamically creates a durable queue, if it does not already exist, named something like userChangedEvent-#[flowVars.userid], and posts the message to that queue. A custom component would then need to do the following:
a. Create a shared object map that tracks all dynamic AMQP listener flows, if the map does not already exist.
b. Check the map for a listener flow instance for the user. If there is none, create a dynamic AMQP listener flow that listens to the queue userChangedEvent-#[flowVars.userid].
c. Add this flow to the map, using the user ID as the key.
d. Start the flow.
e. The dynamic flow must be configured to process single-threaded, manually acknowledge messages after the business logic completes successfully, and stop in the event of an error.
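The bookkeeping in steps a to e can be sketched roughly as follows. This is an in-memory stand-in, not Mule or RabbitMQ code: UserListener and ListenerRegistry are hypothetical names, an ArrayDeque plays the role of the per-user queue, and removing the head of the deque stands in for a manual AMQP acknowledgement. In a cluster, the map would have to live in some shared store, which this sketch does not attempt.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical per-user listener: handles one message at a time and halts on failure. */
class UserListener {
    private final Queue<String> pending = new ArrayDeque<>(); // stands in for the per-user queue
    private final Consumer<String> handler;                   // the business logic
    private boolean stopped = false;

    UserListener(Consumer<String> handler) { this.handler = handler; }

    synchronized void enqueue(String message) { pending.add(message); }

    /** Drains in FIFO order; a message is removed ("acked") only after the handler succeeds. */
    synchronized void drain() {
        while (!stopped && !pending.isEmpty()) {
            try {
                handler.accept(pending.peek());
                pending.remove();   // ack: discard only after successful processing
            } catch (RuntimeException e) {
                stopped = true;     // step e: stop, leaving the failed message at the head
            }
        }
    }

    /** Called once the failure has been dealt with; the next drain() retries the head message. */
    synchronized void resume() { stopped = false; }
    synchronized boolean isStopped() { return stopped; }
    synchronized int pendingCount() { return pending.size(); }
}

/** Hypothetical registry: the shared map of listeners keyed by user ID (steps a to c). */
class ListenerRegistry {
    private final Map<String, UserListener> listeners = new ConcurrentHashMap<>();
    private final Consumer<String> handler;

    ListenerRegistry(Consumer<String> handler) { this.handler = handler; }

    /** Returns the listener for userId, creating and registering it atomically if absent. */
    UserListener listenerFor(String userId) {
        return listeners.computeIfAbsent(userId, id -> new UserListener(handler));
    }
}
```

Because the failed message stays at the head of the queue, later messages for that user cannot overtake it, while listeners for other users are unaffected.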
amqp, , "" - , if . :
. ?
. - , .
. , .
, "bucketing" userid (userChangedEvent-0to1000, userChangedEvent-1000to2000, etc), amqp, , , .
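The bucket names mentioned above (userChangedEvent-0to1000, userChangedEvent-1000to2000) suggest a fixed-size range mapping from user ID to queue name. A minimal sketch of such a mapping follows, with several assumptions: user IDs are non-negative integers, the bucket size is 1000, and the upper bound in each name is exclusive (so ID 1000 goes to the second bucket). UserBuckets and queueFor are hypothetical names.

```java
/** Hypothetical helper mapping a numeric user ID to a fixed-size bucket queue name. */
class UserBuckets {
    static final long BUCKET_SIZE = 1000; // assumed from the example queue names

    /** Returns e.g. "userChangedEvent-1000to2000" for user IDs 1000 through 1999. */
    static String queueFor(long userId) {
        if (userId < 0) {
            throw new IllegalArgumentException("userId must be non-negative");
        }
        long lower = (userId / BUCKET_SIZE) * BUCKET_SIZE; // round down to the bucket start
        return "userChangedEvent-" + lower + "to" + (lower + BUCKET_SIZE);
    }
}
```

With bucketing, a failure for one user blocks every other user in the same bucket if each bucket is consumed strictly in order, so the bucket size trades the number of queues against how far a single error spreads.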
, , EIP ! .
: , , (, ), , (1 ) : http://www.coralblocks.com/index.php/2014/06/demultiplexing-with-coralqueue-for-parallel-processing/