With RabbitMQ Direct Exchange and Spring AMQP, you can implement a queue topology that contains a queue for each account connected to a single exchange. Sending messages in exchange with the account name as a routing key and with a single user tied to several queues , the consumer will receive round robin messages (see "Direct exchange and load balancing") .
The problem with this setting is that you can have quite a few queues (one for each account) and, at least in my implementation (attached as a simple Spring Boot application below), you will have to restart the "consumer every time when a new account arrives, as this means that you have a new queue for connecting a consumer. I donโt know if this scales / works well. Check this post for the maximum number of queues in RabbitMQ , and if this can affect you.
@RunWith(SpringRunner.class) @SpringBootTest(classes = RoundRobin.RoundRobinQueueConfiguration.class) public class RoundRobin { private static final String EXCHANGE = "round-robin-exchange"; private final List<String> tasks = Arrays.asList( // account(a):task(t) where t holds the expected order of consumption "a1:t1", "a2:t2", "a3:t3", // make sure, a queue for every account (a) exists "a1:t4", "a1:t7", "a1:t9", "a1:t10", // add "many" tasks (t) for account 1 "a2:t5", "a2:t8", "a3:t6"); // add further tasks for other accounts, such that a1 has to "wait" private final List<String> declaredQueues = new ArrayList<>(); @Autowired private RabbitTemplate rabbitTemplate; @Autowired private RabbitAdmin rabbitAdmin; @Autowired private DirectExchange directExchange; @Autowired private SimpleMessageListenerContainer listenerContainer; @Test public void enqueuedTasksAreProcessedRoundRobin() { tasks.forEach(task -> { String[] accountAndTask = task.split(":"); declareQueue(accountAndTask[0]); rabbitTemplate.convertAndSend(accountAndTask[0], accountAndTask[1] + " from account " + accountAndTask[0]); }); } private void declareQueue(String routingKey) { if (!declaredQueues.contains(routingKey)) { Queue queue = new Queue(routingKey); rabbitAdmin.declareQueue(queue); rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(directExchange).with(routingKey)); listenerContainer.stop(); listenerContainer.addQueues(queue); listenerContainer.start(); declaredQueues.add(routingKey); } } @Configuration public static class RoundRobinQueueConfiguration { @Bean public ConnectionFactory connectionFactory() { return new CachingConnectionFactory("localhost"); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setExchange(EXCHANGE); return template; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } @Bean public DirectExchange directExchange(RabbitAdmin rabbitAdmin) { DirectExchange directExchange = new DirectExchange(EXCHANGE); rabbitAdmin.declareExchange(directExchange); return directExchange; } @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory, RabbitAdmin rabbitAdmin) { Queue queue = new Queue("dummy-queue"); // we need a queue to get the container started... rabbitAdmin.declareQueue(queue); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setMessageListener(new RoundRobinMessageListener()); container.setQueues(new Queue("dummy-queue")); container.start(); return container; } } public static class RoundRobinMessageListener implements MessageListener { @Override public void onMessage(Message message) { System.out.println("Consumed message " + (new String(message.getBody()))); } } }
In this example, the number of tasks is arbitrary - but I wanted to โadd the expectedโ order to find out if the result matches our expectations.
Test output:
Consumed message t1 from account a1 Consumed message t2 from account a2 Consumed message t3 from account a3 Consumed message t4 from account a1 Consumed message t5 from account a2 Consumed message t6 from account a3 Consumed message t7 from account a1 Consumed message t8 from account a2 Consumed message t9 from account a1 Consumed message t10 from account a1
I think this is what you wanted ...
Michael lihs
source share