Spring RabbitMQ - using manual channel acknowledgment on a service with @RabbitListener configuration

How can I acknowledge messages manually instead of relying on automatic acknowledgment? Is there a way to do this together with the @RabbitListener and @EnableRabbit configuration style? Most documentation uses SimpleMessageListenerContainer together with ChannelAwareMessageListener. However, with that approach we lose the flexibility provided by annotations. I configured my service as shown below:

    @Service
    public class EventReceiver {

        @Autowired
        private MessageSender messageSender;

        @RabbitListener(queues = "${eventqueue}")
        public void receiveMessage(Order order) throws Exception {
            // code for processing order
        }
    }

My Rabbit configuration is below:

    @EnableRabbit
    public class RabbitApplication implements RabbitListenerConfigurer {

        public static void main(String[] args) {
            SpringApplication.run(RabbitApplication.class, args);
        }

        @Bean
        public MappingJackson2MessageConverter jackson2Converter() {
            MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
            return converter;
        }

        @Bean
        public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(rabbitConnectionFactory());
            factory.setMaxConcurrentConsumers(5);
            factory.setMessageConverter((MessageConverter) jackson2Converter());
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            return factory;
        }

        @Bean
        public ConnectionFactory rabbitConnectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setHost("localhost");
            return connectionFactory;
        }

        @Override
        public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
            registrar.setContainerFactory(myRabbitListenerContainerFactory());
        }

        @Autowired
        private EventReceiver receiver;
    }

Any help adapting manual channel acknowledgment to the above configuration style would be appreciated. If we implement ChannelAwareMessageListener, the onMessage signature changes. Can we implement ChannelAwareMessageListener on a service?

+10
3 answers

Add the Channel to the @RabbitListener method ...

    @RabbitListener(queues = "${eventqueue}")
    public void receiveMessage(Order order, Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        ...
    }

... and use the tag in basicAck, basicReject.
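As a rough sketch of how the tag might be used, the snippet below acks on success and rejects with requeue on failure. To keep it runnable without a broker, `AckChannel` is a hypothetical stand-in that mirrors only the `basicAck(long, boolean)` and `basicReject(long, boolean)` methods of `com.rabbitmq.client.Channel`. `RecordingChannel`, `OrderHandler`, and `process` are likewise illustrative names, not part of Spring AMQP or the RabbitMQ client.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in mirroring two methods of com.rabbitmq.client.Channel. */
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    void basicReject(long deliveryTag, boolean requeue) throws IOException;
}

/** Illustrative double that records calls instead of talking to a broker. */
class RecordingChannel implements AckChannel {
    final List<String> calls = new ArrayList<>();

    @Override
    public void basicAck(long deliveryTag, boolean multiple) {
        calls.add("ack:" + deliveryTag + ":" + multiple);
    }

    @Override
    public void basicReject(long deliveryTag, boolean requeue) {
        calls.add("reject:" + deliveryTag + ":" + requeue);
    }
}

class OrderHandler {
    /** Same parameter shape as the listener method: payload, channel, delivery tag. */
    static void receive(String payload, AckChannel channel, long tag) throws IOException {
        try {
            process(payload);
        } catch (RuntimeException e) {
            // Processing failed: hand the message back to the broker (requeue = true).
            channel.basicReject(tag, true);
            return;
        }
        // Processing succeeded: acknowledge only this delivery (multiple = false).
        channel.basicAck(tag, false);
    }

    /** Hypothetical business logic; a blank payload simulates a processing failure. */
    static void process(String payload) {
        if (payload == null || payload.trim().isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
    }
}
```

With the real Channel the two calls look the same. Note that rejecting with requeue = true asks the broker to redeliver, so a message that always fails will keep coming back unless it is rejected with requeue = false instead.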

EDIT

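    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.handler.annotation.Header;

    @SpringBootApplication
    @EnableRabbit
    public class So38728668Application {

        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context =
                    SpringApplication.run(So38728668Application.class, args);
            // Send one message to the queue via the default exchange
            context.getBean(RabbitTemplate.class).convertAndSend("", "so38728668", "foo");
            // Wait until the listener has received and acked it
            context.getBean(Listener.class).latch.await(60, TimeUnit.SECONDS);
            context.close();
        }

        @Bean
        public Queue so38728668() {
            return new Queue("so38728668");
        }

        @Bean
        public Listener listener() {
            return new Listener();
        }

        public static class Listener {

            private final CountDownLatch latch = new CountDownLatch(1);

            @RabbitListener(queues = "so38728668")
            public void receive(String payload, Channel channel,
                    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
                System.out.println(payload);
                channel.basicAck(tag, false);
                latch.countDown();
            }
        }
    }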

application.properties (in Spring Boot 2.x and later, the key is spring.rabbitmq.listener.simple.acknowledge-mode):

 spring.rabbitmq.listener.acknowledge-mode=manual 
+16

In case you need to use onMessage() from the ChannelAwareMessageListener interface, you can do it this way:

    @Component
    public class MyMessageListener implements ChannelAwareMessageListener {

        private static final Logger log = LoggerFactory.getLogger(MyMessageListener.class);

        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            log.info("Message received.");
            // do something with the message
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

And for the Rabbit configuration:

    @Configuration
    public class RabbitConfig {

        public static final String topicExchangeName = "exchange1";
        public static final String queueName = "queue1";
        public static final String routingKey = "queue1.route.#";

        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
            connectionFactory.setUsername("xxxx");
            connectionFactory.setPassword("xxxxxxxxxx");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("vHost1");
            return connectionFactory;
        }

        @Bean
        public RabbitTemplate rabbitTemplate() {
            return new RabbitTemplate(connectionFactory());
        }

        @Bean
        Queue queue() {
            return new Queue(queueName, true);
        }

        @Bean
        TopicExchange exchange() {
            return new TopicExchange(topicExchangeName);
        }

        @Bean
        Binding binding(Queue queue, TopicExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(routingKey);
        }

        @Bean
        public SimpleMessageListenerContainer listenerContainer(MyMessageListener myMessageListener) {
            SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
            listenerContainer.setConnectionFactory(connectionFactory());
            listenerContainer.setQueueNames(queueName);
            listenerContainer.setMessageListener(myMessageListener);
            listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            listenerContainer.setConcurrency("4");
            listenerContainer.setPrefetchCount(20);
            return listenerContainer;
        }
    }

+1

Thanks for the help, Harry. I finally solved the problem and am documenting it here for others. This should be covered in the standard Spring AMQP reference documentation. The service class is given below.

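    @Service
    public class Consumer {

        // Assumed field: the original snippet uses eventQueue without declaring it
        @Value("${eventqueue}")
        private String eventQueue;

        // The method name can be anything, but it should take the Channel as its second parameter
        @RabbitListener(queues = "${eventqueue}")
        public void receiveMessage(Order order, Channel channel) throws Exception {
            channel.basicConsume(eventQueue, false, channel.getDefaultConsumer());
            // Get the delivery tag.
            // Caution: basicGet pulls the next message from the queue, which is not
            // necessarily the one delivered to this method as 'order'.
            long deliveryTag = channel.basicGet(eventQueue, false).getEnvelope().getDeliveryTag();
            try {
                // code for processing order
            } catch (Exception e) {
                // handle exception
                channel.basicReject(deliveryTag, true);
                return; // do not ack a message that was just rejected
            }
            // If all logic is successful
            channel.basicAck(deliveryTag, false);
        }
    }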

The configuration has also been changed, as shown below:

    public class RabbitApplication implements RabbitListenerConfigurer {

        private static final Logger log = LoggerFactory.getLogger(RabbitApplication.class);

        public static void main(String[] args) {
            SpringApplication.run(RabbitApplication.class, args);
        }

        @Bean
        public MappingJackson2MessageConverter jackson2Converter() {
            MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
            return converter;
        }

        @Bean
        public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
            factory.setMessageConverter(jackson2Converter());
            return factory;
        }

        @Autowired
        private Consumer consumer;

        @Override
        public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
            registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
        }

        ...
    }

Note: There is no need to set up a Rabbit ConnectionFactory, a container factory, etc., since the annotation support takes care of all of this.

-3
