I wrote a Java application that sends messages to RabbitMQ. Flume then picks the messages up from the RabbitMQ queue. I want to make sure that nothing pulls messages off the queue except Flume.
My application uses the Spring AMQP Java plugin.
Problem:
With the code below, the message enters the RabbitMQ queue and stays "Unacknowledged" forever. As I understand it, RabbitMQ is waiting for an ACK from the MessageListener, but the MessageListener never sends one. Does anyone know how to fix this?
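To make the "Unacknowledged" state concrete, here is a rough toy model of how I understand the broker's bookkeeping. It is plain Java with no RabbitMQ or Spring classes involved. `ToyQueue`, `deliver`, `ack` and `consumerDisconnected` are names made up for this sketch and are not part of any real API. A delivered message moves from "ready" to "unacked" and stays there until the consumer acknowledges it or goes away.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeMap;

/** Toy in-memory model of acknowledgement bookkeeping (hypothetical, not RabbitMQ's API). */
class ToyQueue {
    private final Deque<String> ready = new ArrayDeque<>();
    // Delivered but not yet acknowledged, keyed by delivery tag.
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    private long nextTag = 1;

    void publish(String body) {
        ready.addLast(body);
    }

    /** Hands the next ready message to a consumer; returns its delivery tag, or -1 if none is ready. */
    long deliver() {
        String body = ready.pollFirst();
        if (body == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, body);
        return tag;
    }

    /** The consumer confirms processing; the message is dropped for good. */
    boolean ack(long tag) {
        return unacked.remove(tag) != null;
    }

    /** The consumer goes away without acking; outstanding messages go back to the head of the queue. */
    void consumerDisconnected() {
        for (String body : unacked.descendingMap().values()) {
            ready.addFirst(body);
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }
}

class ToyQueueDemo {
    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.publish("hello");
        queue.deliver(); // delivered, but never acknowledged
        System.out.println("ready=" + queue.readyCount() + " unacked=" + queue.unackedCount());
        queue.consumerDisconnected();
        System.out.println("ready=" + queue.readyCount() + " unacked=" + queue.unackedCount());
    }
}
```

In this model the first line printed is `ready=0 unacked=1`, which matches what I see in the management UI: the message has been handed to a consumer that has not acknowledged it.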
The code:
public class MyAmqpConfiguration {

    @Autowired
    ConnectionFactory connectionFactory;

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(activityLogsQueue());
        container.setMessageListener(MyMessageListener());
        container.setConcurrentConsumers(3);
        return container;
    }

    @Bean(name="myTemplate")
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(MyMessageConverter());
        return template;
    }
}

public class MyMessageListener implements MessageListener {

    public MyMessageListener(MessageConverter converter, MyMessageHandler<MyObject> messageHandler) {
        this.converter = converter;
        this.messageHandler = messageHandler;
    }

    @Override
    public void onMessage(Message message) {
        this.messageHandler.doThings();
    }
}

public class MyMessageHandler {

    @Autowired
    @Qualifier("myTemplate")
    RabbitTemplate template;

    @Override
    public void handleMessage(MyObject thing) {
        template.convertAndSend(exchange, routingKey, thing);
    }
}

public class MyMessageConverter extends JsonMessageConverter {

    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
sunny