JMS Message Listener Weblog Concurrency

I run a JMS test case and find that processing is sequential. When I fired 200 requests, the servlet that sends messages using the JMS and the receiver (messageListner) sends requests in sequence. How to receive parallel requests? Do we have any installation options? I read JMS tutorials and APIs that deliver messages in a single session in turn, even I create a new session for each send request, and 10 sessions at the receiving stage are still processed sequentially.

public class ProducerServlet extends javax.servlet.http.HttpServlet implements javax.servlet.Servlet { // Defines the JNDI context factory. public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory"; // Defines the JMS context factory. public final static String JMS_FACTORY = "jms/TestConnectionFactory"; // Defines the queue. public final static String QUEUE = "jms/TestJMSQueue"; public final static String TOPIC = "jms/TestTopic"; TestJMSListener jms = new TestJMSListener(); ConnectionFactory connectionFactory = null; Queue dest1 = null; Topic dest =null; Connection connection = null; MessageProducer producer = null; protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { try { connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(dest1); TextMessage message = session.createTextMessage(); message.setText("This is message from JMSSECOND DEMO " + request.getParameter("Num")); System.out.println("Sending message: " + message.getText()); producer.send(message); producer.send(session.createMessage()); } catch (Exception e) { System.out.println("Exception occurred: " + e.toString()); } } @Override public void init(ServletConfig arg0) throws ServletException { Context jndiContext = null; try { Hashtable env = new Hashtable(); env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY); env.put(Context.PROVIDER_URL, "http://localhost:7001"); jndiContext = new InitialContext(env); } catch (NamingException e) { System.out.println("Could not create JNDI API context: " + e.toString()); } try { connectionFactory = (ConnectionFactory) jndiContext .lookup(JMS_FACTORY); dest1 = (Queue) jndiContext.lookup(QUEUE); } catch (Exception e) { System.out.println("JNDI API lookup failed: " + e.toString()); e.printStackTrace(); } } } 

Listner implementation where, after receiving the message, I'm going to sleep (does something for a second).

 public class TestJMSListener implements MessageListener { // Defines the JNDI context factory. public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory"; // Defines the JMS context factory. public final static String JMS_FACTORY = "jms/TestConnectionFactory"; // Defines the queue. public final static String QUEUE = "jms/TestJMSQueue"; public final static String TOPIC = "jms/TestTopic"; public TestJMSListener() { System.out.println("********* Consumer check **********"); Context jndiContext = null; ConnectionFactory connectionFactory = null; Connection connection[] = null; Session session[] = null; Queue dest1 = null; Topic dest = null; MessageConsumer consumer[] = null; // TextMessage message = null; try { Hashtable env = new Hashtable(); env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY); env.put(Context.PROVIDER_URL, "http://localhost:7001"); jndiContext = new InitialContext(env); } catch (NamingException e) { System.out.println("Could not create JNDI API context: " + e.toString()); System.exit(1); } try { connectionFactory = (ConnectionFactory) jndiContext .lookup(JMS_FACTORY); dest1 = (Queue) jndiContext.lookup(QUEUE); } catch (Exception e) { System.out.println("JNDI API lookup failed: " + e.toString()); System.exit(1); } connection = new Connection[10]; session = new Session[10]; consumer = new MessageConsumer[10]; for (int i = 0; i < 10; i++) { try { connection[i] = connectionFactory.createConnection(); session[i] = connection[i].createSession(false, Session.AUTO_ACKNOWLEDGE); consumer[i] = session[i].createConsumer(dest); consumer[i].setMessageListener(this); connection[i].start(); } catch (JMSException e) { System.out.println("Exception occurred: " + e.toString()); } } } @Override public void onMessage(Message m) { if (m instanceof TextMessage) { TextMessage message = (TextMessage) m; try { System.out.println("Reading message from Listener: " + new Date() + message.getText()); Thread.sleep(1000); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } 

I am using Weblogic 11g with the default settings for ConnectionFactory and Queue. When I used Subject, it actually delivered only one message per second (i.e., after the completion of the first message), and for the queue it sent 2 to 3 messages per second. How to get my listener to support parallel processing.

Final decision

Added more listener objects that defined multiple sessions / consumers in the lists and solved this problem. Find the modified code below.

 public class ProducerServlet extends javax.servlet.http.HttpServlet implements javax.servlet.Servlet { // Defines the JNDI context factory. public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory"; // Defines the JMS context factory. public final static String JMS_FACTORY = "jms/TestConnectionFactory"; // Defines the queue. public final static String QUEUE = "jms/TestJMSQueue"; public final static String TOPIC = "jms/TestTopic"; TestJMSListener listeners[] = new TestJMSListener[20]; ConnectionFactory connectionFactory = null; Queue dest1 = null; Topic dest =null; Connection connection = null; MessageProducer producer = null; protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { try { connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(dest1); TextMessage message = session.createTextMessage(); message.setText("This is message from JMSSECOND DEMO " + request.getParameter("Num")); System.out.println("Sending message: " + message.getText()); producer.send(message); producer.send(session.createMessage()); } catch (Exception e) { System.out.println("Exception occurred: " + e.toString()); } } @Override public void init(ServletConfig arg0) throws ServletException { Context jndiContext = null; try { Hashtable env = new Hashtable(); env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY); env.put(Context.PROVIDER_URL, "http://localhost:7001"); jndiContext = new InitialContext(env); } catch (NamingException e) { System.out.println("Could not create JNDI API context: " + e.toString()); } try { connectionFactory = (ConnectionFactory) jndiContext .lookup(JMS_FACTORY); dest1 = (Queue) jndiContext.lookup(QUEUE); for(int i=0;i<listeners.length;i++ ){ listeners[i]=new TestJMSListener(Integer.toString(i+1)); } } catch (Exception e) { System.out.println("JNDI API lookup failed: " + e.toString()); e.printStackTrace(); } } } public class TestJMSListener implements MessageListener { // Defines the JNDI context factory. public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory"; // Defines the JMS context factory. public final static String JMS_FACTORY = "jms/TestConnectionFactory"; // Defines the queue. public final static String QUEUE = "jms/TestJMSQueue"; public final static String TOPIC = "jms/TestTopic"; public String listnerNum = ""; public TestJMSListener(String listerNo) { super(); System.out.println("********* Consumer check **********"); listnerNum = listerNo; Context jndiContext = null; ConnectionFactory connectionFactory = null; Connection connection = null; Session session = null; Queue dest1 = null; Topic dest = null; MessageConsumer consumer = null; // TextMessage message = null; try { Hashtable env = new Hashtable(); env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY); env.put(Context.PROVIDER_URL, "http://localhost:7001"); jndiContext = new InitialContext(env); } catch (NamingException e) { System.out.println("Could not create JNDI API context: " + e.toString()); System.exit(1); } try { connectionFactory = (ConnectionFactory) jndiContext .lookup(JMS_FACTORY); dest1 = (Queue) jndiContext.lookup(QUEUE); } catch (Exception e) { System.out.println("JNDI API lookup failed: " + e.toString()); System.exit(1); } try{ connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer = session.createConsumer(dest1); consumer.setMessageListener(this); connection.start(); } catch (JMSException e) { System.out.println("Exception occurred: " + e.toString()); } } @Override public void onMessage(Message m) { if (m instanceof TextMessage) { TextMessage message = (TextMessage) m; try { System.out.println("Reading message from Listener: "+listnerNum+ " : " + new Date() + message.getText()); Thread.sleep(1000); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } 
+7
java concurrency jms
source share
3 answers

In your code, you only have one Listener instance (created when you created the Servlet instance), so you will receive messages only sequentially, regardless of how many sender sessions you have. This is just the turn.
If you want to receive at the same time, then you may need several listeners, and only one message will be delivered to any of the listeners.
If you want to process messages at the same time, as soon as it is delivered sequentially, create a thread pool and process the process in a separate thread and return to listening mode.
Note ** in this mode, you cannot normally control the Ack mode, since you are not executing the Message process.

+3
source share

I looked at your solution and realized that you have several parallel sessions, users, etc., but one queue to handle all this. The queue, well, the queue, everything goes in line through the pipe in sequence. If you have only one of them, then you have one thread of execution at this point, and everything goes sequentially, because the queue does not allow simultaneous actions.

If you implement multiple queues in different threads, your machine can handle multiple calls at once. Multiple queues may involve using different queue names, etc., but for this problem you can use a load balancing solution like Apache Camel to actually make the queue a choice for you. At least this closed post makes me realize that such a combination of queues and threads is possible.

Then balancing selects a separate queue for each request, and each queue does its own sequential work to process the request. Then the number of concurrent sessions is a configuration issue.

+3
source share

I'm not an expert at WebLogic JMS, but your code looks fine (with the exception of the few connections you create, which is optional, just a few sessions are enough). I even think that several compounds can have a negative effect, since they consume threads), it should consume at the same time. Since you say that with the queue you receive 2-3 messages per second, you actually receive them at the same time (since each listener sleeps a second).

Since you say you get 2-3 per second, I think you really get 4-6, because every second message is not printed (because it is not TextMessage), since the producer sends TextMessage ('producer .send (message) 'and empty' manufacturer.send (session.createMessage ()) '). Other than that, I would check the server configuration for ConnectionFactpry. I remember that you need to configure WorkManager streams for MDB, but, unfortunately, are not sure about the "manual" JMS clients.

So, I will work as follows:

  • remove double submission ('producer .send (session.createMessage ())')
  • Use one connection (but keep a new session) for the manufacturer (instead of re-creating each request)
  • Use one connection on the client (but several sessions)
  • Close the producer session correctly (may consume threads)
  • Make sure there are enough posts.
  • Check Administrative Restrictions

Hope this helps.

Hi,

Messi

0
source share

All Articles