How to use Ack or Nack in Spring AMQP

I am new to Spring AMQP. I have an application that is a manufacturer sending messages to another application that is a consumer.

As soon as the consumer receives the message, we will check the data.

If the data is correct, we need an ACK, and the message must be removed from the queue. If the data is incorrect, we need NACK (Negative Acknowledge) data so that it is reordered in RabbitMQ .

I stumbled upon

**factory.setDefaultRequeueRejected(false);** (it will not request a message at all)

**factory.setDefaultRequeueRejected(true);** (It will request a message when an exception occurs)

But in my case, I will confirm the verification based message. Then he must delete the message. If NACK then requests a message.

I read on the RabbitMQ website

The AMQP specification defines the basic.reject method, which allows customers to reject individual, delivered messages, instructing the broker to either drop them or request them

How to achieve the above scenario? Please provide some examples.

I tried a small program

  logger.info("Job Queue Handler::::::::::" + new Date()); try { }catch(Exception e){ logger.info("Activity Object Not Found Exception so message should be Re-queued the Message::::::::::::::"); } factory.setErrorHandler(new ConditionalRejectingErrorHandler(cause ->{ return cause instanceof XMLException; })); 

The message does not reload for another factory.setDefaultRequeueRejected exception (true)

09: 46: 38,854 ERROR [stderr] (SimpleAsyncTaskExecutor-1) org.activiti.engine.ActivitiObjectNotFoundException : no processes deployed with the key 'WF89012'

09: 46: 39,102 INFO [Com.example.bip.rabbitmq.handler.ErrorQueueHandler] (SimpleAsyncTaskExecutor-1), obtained from the error queue: {ERROR = Could not commit JPA transactions; javax.persistence.RollbackException nested exception : transaction marked as rollbackOnly }

+9
source share
1 answer

See the documentation .

By default (with defaultRequeueRejected=true ), the container will acknowledge the message (causing it to be deleted) if the listener completes normally, or reject (and demand) it if the listener throws an exception.

If the listener (or error handler) throws an AmqpRejectAndDontRequeueException , the default behavior is overridden and the message is discarded (or sent to DLX / DLQ, if configured) - the container calls basicReject(false) instead of basicReject(true) .

So, if your check is not an AmqpRejectAndDontRequeueException , AmqpRejectAndDontRequeueException . Or configure the listener using a special error handler to convert the exception into an AmqpRejectAndDontRequeueException .

This is described in this answer .

If you really want to take charge of the hack, set the confirmation mode to MANUAL and use the ChannelAwareMessageListener or this method if you use @RabbitListener .

But most people just let the container take care of things (as soon as they understand what is going on). Typically, the use of manual confirmations is intended for special use cases, such as pending confirmation or early confirmation.

EDIT

The answer I pointed out to you had a mistake (now fixed); You should look at the cause of ListenerExecutionFailedException . I just checked this and it works as expected ...

 @SpringBootApplication public class So39530787Application { private static final String QUEUE = "So39530787"; public static void main(String[] args) throws Exception { ConfigurableApplicationContext context = SpringApplication.run(So39530787Application.class, args); RabbitTemplate template = context.getBean(RabbitTemplate.class); template.convertAndSend(QUEUE, "foo"); template.convertAndSend(QUEUE, "bar"); template.convertAndSend(QUEUE, "baz"); So39530787Application bean = context.getBean(So39530787Application.class); bean.latch.await(10, TimeUnit.SECONDS); System.out.println("Expect 1 foo:" + bean.fooCount); System.out.println("Expect 3 bar:" + bean.barCount); System.out.println("Expect 1 baz:" + bean.bazCount); context.close(); } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setErrorHandler(new ConditionalRejectingErrorHandler( t -> t instanceof ListenerExecutionFailedException && t.getCause() instanceof FooException)); return factory; } @Bean public Queue queue() { return new Queue(QUEUE, false, false, true); } private int fooCount; private int barCount; private int bazCount; private final CountDownLatch latch = new CountDownLatch(5); @RabbitListener(queues = QUEUE) public void handle(String in) throws Exception { System.out.println(in); latch.countDown(); if ("foo".equals(in) && ++this.fooCount < 3) { throw new FooException(); } else if ("bar".equals(in) && ++this.barCount < 3) { throw new BarException(); } else if ("baz".equals(in)) { this.bazCount++; } } @SuppressWarnings("serial") public static class FooException extends Exception { } @SuppressWarnings("serial") public static class BarException extends Exception { } } 

Result:

 Expect 1 foo:1 Expect 3 bar:3 Expect 1 baz:1 
+10
source

All Articles