ActiveMQ Internet receiver / producer

I can’t find a way to listen for new producer and consumer connections (or connection terminations) in ActiveMQ (Java version). I want to be able to tell consumers (or they can find out for themselves) that the manufacturer’s connection has stopped. Another way (a producer finding out that a particular consumer is disconnected) is also required.

I would be grateful for the help.

+4
source share
2 answers

I think you want to listen to new producers and consumers at a specific destination (a specific lineup or topic). Is it correct?

You can instantiate ConsumerEventSource and ProducerEventSource and register your own listeners by calling their setConsumerListener and setProducerListener, respectively.

So:

Connection conn = yourconnection; // the connection your listener will use Destination dest = yourdestination; // the destination you're paying attention to ConsumerEventSource source = new ConsumerEventSource(conn, dest); source.setConsumerListener(new ConsumerListener() { public void onConsumerEvent(ConsumerEvent event) { if (event.isStarted()) { System.out.println("a new consumer has started - " + event.getConsumerId()); } else { System.out.println("a consumer has dropped - " + event.getConsumerId()); } } }); 

If you look at the code for ConsumerEventSource or ProducerEventSource, you will see that these are simple objects that use the AdvisorySupport methods to listen on a special advisory topic, the purpose of which is to transmit news about manufacturers and consumers. You can learn more by reading the source code for these classes.

Your use of the “connection” is potentially a problem; in an ActiveMQ zone (which is a subset of the JMS land), a “Connection” is a lower level entity that is not associated with a specific destination. A particular client creates a “session” from the connection — still not specific to the destination — and then creates the target QueueSender, QueueReceiver, TopicPublisher, or TopicSubscriber. When they are created or when the sessions ripening them die, these are the events that you want to hear about, and find out if you use the code above.

+4
source

All the necessary information is published in ActiveMQ consulting questions, such as "ActiveMQ.Advisory.Connection" or simply "ActiveMQ.Advisory ..>" for all of them. Even events that occur in the Stomp connection are published in ActiveMQ Advisory sections. The following example gives an example of this (verified using the Flex Client connected via Stomp):

 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("user", "password", ActiveMQConnection.DEFAULT_BROKER_URL); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(transacted, ackMode); connection.start(); Destination destinationAdvisory = session.createTopic("ActiveMQ.Advisory..>"); MessageConsumer consumerAdvisory = session.createConsumer(destinationAdvisory); consumerAdvisory.setMessageListener(new MessageListener() { public void onMessage(Message message) { if (message instanceof ActiveMQMessage) { ActiveMQMessage activeMessage = (ActiveMQMessage) message; Object command = activeMessage.getDataStructure(); if (command instanceof ConsumerInfo) { System.out.println("A consumer subscribed to a topic or queue: " + command); } else if (command instanceof RemoveInfo) { RemoveInfo removeInfo = (RemoveInfo) command; if (removeInfo.isConsumerRemove()) { System.out.println("A consumer unsubscribed from a topic or queue"); } else { System.out.println("RemoveInfo, a connection was closed: " + command); } } else if (command instanceof ConnectionInfo) { System.out.println("ConnectionInfo, a new connection was made: " + command); } else { System.out.println("Unknown command: " + command); } } } }); 
+2
source

All Articles