Consumer does not receive messages from ActiveMQ

We are facing a random issue with ActiveMQ and its consumers. We observe that only a few consumers do not receive messages, although they are connected to the ActiveMQ queue. But it works fine after rebooting the consumer.

We have a queue called testQueue from ActiveMQ. The consumer is trying to disconnect messages from this queue. For this purpose we use Spring DefaultMessageListenerContainer. The message is delivered to the consumer node from ActiveMQ Broker. From tcpdump, it was also clear that the message reaches the consumer node, but the actual consumer code cannot see the message. In other words, the message seems to be stuck in either ActiveMQ consumer code or Spring's DefaultMessageListenerContainer.

See picture below. for more clarity on this. The message reaches the consumer node, but it does not reach the "Actual consumer class", which means that the message is stuck in either the consumer AMQ code or the Spring DMLC.

enter image description here

The following are data received from the ActiveMQ administrator.

Queue-name / pending-message-counter / consumer-count / messages-in-line / messages-dropped testQueue / 9/1/9/0

See below for more details.

Connection-ID / SessionId / Selector / Enqueues / Dequeues / Dispatched / Dispatched-Queue / Prefetch ID: bearsvir52-45176-1375519181268-3: 5/1 // 9/0/9/9/250

From the second table it is obvious that the messages are delivered to the consumer, but the consumer does not confirm the message. Consequently, messages are stuck in the Sent Queue on the broker's side.

A few points for your notice:

1) Time difference b / w Broker node and consumer node.

2) Watched tcpdump from the consumer. We can see that the MessageDispatch (Openwire) package is being passed to the node consumer, but could not find MessageAck (Openwire) for it.

3) Sometimes it works with node, and sometimes it creates a problem on the same node.

+7
spring-jms activemq
source share
2 answers

It took a long time to figure out the solution. There seems to be some problem with the org.apache.activemq.ActiveMQConnection.java class in case AMQ fails. In such cases, the connection object does not start on the consumer side.

Below is the fix that I added to the ActiveMQConnection.java file and compiled the sources to create activeemq-core-xxxjar

private final Object startMutex = new Object(); 

added check of createSession method

 public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { synchronized (startMutex) { if(!isStarted()) { start(); } } 
+2
source share

One of the reasons for this may be incorrect use of the CachingConnectionFactory (with cached consumers) with a listener container that dynamically configures consumers (maximum consumers> consumers). You may end up with a cached consumer just sitting in the pool and not actively using it. You never need to cache users with a listener container.

For such tasks, I usually recommend working with TRACE logging, and you can see all user activity.

+1
source share

All Articles