What are some queuing mechanisms for implementing round robin queues?

I have several task producers that add work to a queue, and several consumers that service that queue. Because these queues are FIFO, tasks are dequeued in the same order in which they were added.

In my scenario, tasks are added to the queue from HTTP requests. Each task is associated with an account, and there is no rate limiting. It is therefore possible for tasks from a single account to flood the message queue.

To solve this, I have been looking for a queue implementation that lets me process tasks from multiple accounts fairly, in round-robin fashion.

I have currently resorted to using Redis with some Lua scripts to emulate a round-robin queue, but I was wondering whether there are any existing queue topologies that do this.

+10
java amqp
5 answers

I usually do it like this:

  • Instead of putting tasks directly on the work queue, create a separate task queue for each account. Each request puts a task on the queue of its account, and when an account queue goes from empty to non-empty, put that account queue on the global work queue.

  • Workers take account queues from the work queue when they are ready for more work. When a worker takes an account queue, it removes the first task and immediately puts the account queue back at the end of the work queue if it is not empty. Then the worker performs the task.

With this system, each account queue is in the work queue at most once, and all accounts with pending work are equally represented in the work queue.

This is pretty simple to implement, but you need to be careful about detecting when an account queue has to be put on the work queue, since two threads may make that decision at the same time, and you don't want the account queue to end up in the work queue twice.

I handle it simply:

  • Each account queue has an atomic boolean that tracks whether it is in the work queue. The worker sets it to false immediately after removing the account queue from the work queue. Anyone who finds the account queue non-empty can try to CAS this boolean to true and, if that succeeds, put the account queue on the work queue.

  • There is a small chance that an account queue ends up in the work queue while it is empty. Make sure this is harmless: if a worker cannot get a task from the account queue, it should just forget about it and take the next account queue from the work queue.
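The scheme above could be sketched in Java roughly as follows. This is only a minimal illustration under my own assumptions, not the answerer's actual code: the names `FairWorkQueue`, `AccountQueue`, `submit` and `poll` are hypothetical, tasks are plain values of type `T`, and account queues are never removed from the map.

```java
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a "queue of account queues" with a CAS-guarded flag.
class FairWorkQueue<T> {

    // One FIFO per account, plus a flag telling whether it sits in the work queue.
    private static final class AccountQueue<T> {
        final Queue<T> tasks = new ConcurrentLinkedQueue<>();
        final AtomicBoolean inWorkQueue = new AtomicBoolean(false);
    }

    // Assumption: accounts are kept forever; a real system would evict idle ones.
    private final ConcurrentHashMap<String, AccountQueue<T>> accounts = new ConcurrentHashMap<>();
    private final BlockingQueue<AccountQueue<T>> workQueue = new LinkedBlockingQueue<>();

    // Producer side: enqueue the task on its account queue, then schedule that queue.
    public void submit(String account, T task) {
        AccountQueue<T> q = accounts.computeIfAbsent(account, a -> new AccountQueue<>());
        q.tasks.add(task);
        scheduleIfNeeded(q);
    }

    // Only the thread that wins the CAS puts the account queue on the work queue,
    // so an account queue is never in the work queue more than once.
    private void scheduleIfNeeded(AccountQueue<T> q) {
        if (!q.tasks.isEmpty() && q.inWorkQueue.compareAndSet(false, true)) {
            workQueue.add(q);
        }
    }

    // Worker side: returns the next task, or null if no work is pending.
    // A blocking variant could call workQueue.take() instead of poll().
    public T poll() {
        AccountQueue<T> q;
        while ((q = workQueue.poll()) != null) {
            q.inWorkQueue.set(false);   // cleared right after removal from the work queue
            T task = q.tasks.poll();
            scheduleIfNeeded(q);        // back to the end of the line if tasks remain
            if (task != null) {
                return task;
            }
            // The account queue was empty (rare race): ignore it and try the next one.
        }
        return null;
    }

    public static void main(String[] args) {
        FairWorkQueue<String> queue = new FairWorkQueue<>();
        queue.submit("a1", "t1");
        queue.submit("a1", "t2");
        queue.submit("a1", "t3");
        queue.submit("a2", "t4");
        queue.submit("a3", "t5");
        String task;
        while ((task = queue.poll()) != null) {
            System.out.println(task);   // prints t1, t4, t5, t2, t3 (one per line)
        }
    }
}
```

In the single-threaded demo, account a1 submits three tasks first, yet a2 and a3 are each served before a1's second task, which is the fairness property described above.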

+11

With a RabbitMQ direct exchange and Spring AMQP, you can implement a queue topology that contains one queue per account, all bound to a single exchange. If you send messages to the exchange with the account name as the routing key and have a single consumer listening on all of the queues, the consumer receives the messages round-robin (see "Direct exchange and load balancing").

The problem with this setup is that you can end up with quite a lot of queues (one per account) and, at least in my implementation (attached below as a simple Spring Boot application), you have to restart the consumer every time a new account arrives, since that means there is a new queue to attach the consumer to. I don't know whether this scales or works well. Check this post on the maximum number of queues in RabbitMQ to see whether this could affect you.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.test.context.junit4.SpringRunner;

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = RoundRobin.RoundRobinQueueConfiguration.class)
    public class RoundRobin {

        private static final String EXCHANGE = "round-robin-exchange";

        private final List<String> tasks = Arrays.asList(
                // account(a):task(t) where t holds the expected order of consumption
                "a1:t1", "a2:t2", "a3:t3",          // make sure a queue for every account (a) exists
                "a1:t4", "a1:t7", "a1:t9", "a1:t10", // add "many" tasks (t) for account 1
                "a2:t5", "a2:t8", "a3:t6");          // add further tasks for other accounts, such that a1 has to "wait"

        private final List<String> declaredQueues = new ArrayList<>();

        @Autowired
        private RabbitTemplate rabbitTemplate;

        @Autowired
        private RabbitAdmin rabbitAdmin;

        @Autowired
        private DirectExchange directExchange;

        @Autowired
        private SimpleMessageListenerContainer listenerContainer;

        @Test
        public void enqueuedTasksAreProcessedRoundRobin() {
            tasks.forEach(task -> {
                String[] accountAndTask = task.split(":");
                declareQueue(accountAndTask[0]);
                // The account name is the routing key; the exchange is the template's default.
                rabbitTemplate.convertAndSend(accountAndTask[0],
                        accountAndTask[1] + " from account " + accountAndTask[0]);
            });
        }

        // Declares and binds a queue for a new account, then restarts the consumer to attach it.
        private void declareQueue(String routingKey) {
            if (!declaredQueues.contains(routingKey)) {
                Queue queue = new Queue(routingKey);
                rabbitAdmin.declareQueue(queue);
                rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(directExchange).with(routingKey));
                listenerContainer.stop();
                listenerContainer.addQueues(queue);
                listenerContainer.start();
                declaredQueues.add(routingKey);
            }
        }

        @Configuration
        public static class RoundRobinQueueConfiguration {

            @Bean
            public ConnectionFactory connectionFactory() {
                return new CachingConnectionFactory("localhost");
            }

            @Bean
            public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
                RabbitTemplate template = new RabbitTemplate(connectionFactory);
                template.setExchange(EXCHANGE);
                return template;
            }

            @Bean
            public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
                return new RabbitAdmin(connectionFactory);
            }

            @Bean
            public DirectExchange directExchange(RabbitAdmin rabbitAdmin) {
                DirectExchange directExchange = new DirectExchange(EXCHANGE);
                rabbitAdmin.declareExchange(directExchange);
                return directExchange;
            }

            @Bean
            public SimpleMessageListenerContainer simpleMessageListenerContainer(
                    ConnectionFactory connectionFactory, RabbitAdmin rabbitAdmin) {
                Queue queue = new Queue("dummy-queue"); // we need a queue to get the container started...
                rabbitAdmin.declareQueue(queue);
                SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
                container.setMessageListener(new RoundRobinMessageListener());
                container.setQueues(new Queue("dummy-queue"));
                container.start();
                return container;
            }
        }

        public static class RoundRobinMessageListener implements MessageListener {

            @Override
            public void onMessage(Message message) {
                System.out.println("Consumed message " + (new String(message.getBody())));
            }
        }
    }

In this example, the numbering of the tasks is arbitrary, but I wanted to encode the expected order of consumption in it to see whether the output matches our expectations.

Test output:

    Consumed message t1 from account a1
    Consumed message t2 from account a2
    Consumed message t3 from account a3
    Consumed message t4 from account a1
    Consumed message t5 from account a2
    Consumed message t6 from account a3
    Consumed message t7 from account a1
    Consumed message t8 from account a2
    Consumed message t9 from account a1
    Consumed message t10 from account a1

I think this is what you wanted ...

+4

Any packaged solution comes with a lot of extraneous overhead. I believe this is the pattern you want to focus on. For example, RabbitMQ has a routing queue solution, and ActiveMQ supports this pattern as well.

I would write it myself, though; it is not that difficult to do.

+2

I know that WebSphere in version 5.1 (very old) provided a queue that could serve sub-queues. In your case, you would create a sub-queue for each client and then ask each sub-queue in turn, round-robin, for its next task. I do not know the details, though, and I do not recommend WebSphere at all (speaking from experience).

I assume that you can do the same programmatically by maintaining a list of queues, or a queue of queues, where each lower-level queue holds the tasks of one specific client. You can then use your own logic to pick tasks in a fair order from the proper queue. Of course, you will need to manage the queues yourself: remove empty queues and, when a new task arrives, check whether its client already has a dedicated queue and add the task to the existing or to a new queue accordingly.
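As a rough illustration of the "list of queues" variant, the following single-threaded sketch keeps one queue per client in a list and walks it with a cursor. It is not based on WebSphere or any existing library; the names `ClientQueueList`, `ClientQueue`, `add` and `next` are hypothetical, and thread safety is deliberately left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, single-threaded sketch: a list of per-client queues visited round-robin.
class ClientQueueList<T> {

    private static final class ClientQueue<T> {
        final String client;
        final Deque<T> tasks = new ArrayDeque<>();
        ClientQueue(String client) { this.client = client; }
    }

    // Invariant: every queue in the list holds at least one task.
    private final List<ClientQueue<T>> queues = new ArrayList<>();
    private final Map<String, ClientQueue<T>> byClient = new HashMap<>();
    private int cursor = 0; // index of the queue to serve next

    // Adds the task to the client's existing queue, or creates a new queue for it.
    public void add(String client, T task) {
        ClientQueue<T> q = byClient.get(client);
        if (q == null) {
            q = new ClientQueue<>(client);
            byClient.put(client, q);
            queues.add(q); // appended at the end, so the cursor stays valid
        }
        q.tasks.addLast(task);
    }

    // Returns the next task in round-robin order, or null if nothing is pending.
    public T next() {
        if (queues.isEmpty()) {
            return null;
        }
        if (cursor >= queues.size()) {
            cursor = 0; // wrap around to the first client
        }
        ClientQueue<T> q = queues.get(cursor);
        T task = q.tasks.pollFirst();
        if (q.tasks.isEmpty()) {
            // Clean up the empty queue; the cursor now points at the following queue.
            queues.remove(cursor);
            byClient.remove(q.client);
        } else {
            cursor++;
        }
        return task;
    }

    public static void main(String[] args) {
        ClientQueueList<String> list = new ClientQueueList<>();
        list.add("c1", "TaskC11");
        list.add("c1", "TaskC12");
        list.add("c1", "TaskC13");
        list.add("c2", "TaskC21");
        list.add("c3", "TaskC31");
        String task;
        while ((task = list.next()) != null) {
            System.out.println(task); // prints TaskC11, TaskC21, TaskC31, TaskC12, TaskC13
        }
    }
}
```

Removing a queue as soon as it becomes empty keeps the list short, at the cost of re-creating the queue when the same client submits again.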

+1

I would suggest using a Guava Multimap:

    import com.google.common.collect.TreeMultimap;
    import java.util.LinkedHashSet;

    public class QueueTest {
        public static void main(String[] args) {
            // TreeMultimap keeps keys and values sorted (natural ordering unless comparators are supplied).
            TreeMultimap<String, String> multimap = TreeMultimap.create();
            multimap.put("c1", "TaskC11");
            multimap.put("c1", "TaskC12");
            multimap.put("c1", "TaskC13");
            multimap.put("c2", "TaskC21");
            multimap.put("c3", "TaskC31");

            while (multimap.size() > 0) {
                // Iterate over a copy of the key set: polling a customer's last task removes its key.
                for (String customer : new LinkedHashSet<>(multimap.keySet())) {
                    String taskToProcess = multimap.get(customer).pollFirst();
                    System.out.println(taskToProcess);
                }
            }
        }
    }

Result:

    TaskC11
    TaskC21
    TaskC31
    TaskC12
    TaskC13

You can also add your own comparators to manage priorities for each client.

0