I run a JMS test case and find that processing is sequential. When I fired 200 requests, the servlet that sends messages using the JMS and the receiver (messageListner) sends requests in sequence. How to receive parallel requests? Do we have any installation options? I read JMS tutorials and APIs that deliver messages in a single session in turn, even I create a new session for each send request, and 10 sessions at the receiving stage are still processed sequentially.
public class ProducerServlet extends javax.servlet.http.HttpServlet implements javax.servlet.Servlet { // Defines the JNDI context factory. public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory"; // Defines the JMS context factory. public final static String JMS_FACTORY = "jms/TestConnectionFactory"; // Defines the queue. public final static String QUEUE = "jms/TestJMSQueue"; public final static String TOPIC = "jms/TestTopic"; TestJMSListener jms = new TestJMSListener(); ConnectionFactory connectionFactory = null; Queue dest1 = null; Topic dest =null; Connection connection = null; MessageProducer producer = null; protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { try { connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(dest1); TextMessage message = session.createTextMessage(); message.setText("This is message from JMSSECOND DEMO " + request.getParameter("Num")); System.out.println("Sending message: " + message.getText()); producer.send(message); producer.send(session.createMessage()); } catch (Exception e) { System.out.println("Exception occurred: " + e.toString()); } } @Override public void init(ServletConfig arg0) throws ServletException { Context jndiContext = null; try { Hashtable env = new Hashtable(); env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY); env.put(Context.PROVIDER_URL, "http://localhost:7001"); jndiContext = new InitialContext(env); } catch (NamingException e) { System.out.println("Could not create JNDI API context: " + e.toString()); } try { connectionFactory = (ConnectionFactory) jndiContext .lookup(JMS_FACTORY); dest1 = (Queue) jndiContext.lookup(QUEUE); } catch (Exception e) { System.out.println("JNDI API lookup failed: " + e.toString()); e.printStackTrace(); } } }
Listner implementation where, after receiving the message, I'm going to sleep (does something for a second).
public class TestJMSListener implements MessageListener { // Defines the JNDI context factory. public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory"; // Defines the JMS context factory. public final static String JMS_FACTORY = "jms/TestConnectionFactory"; // Defines the queue. public final static String QUEUE = "jms/TestJMSQueue"; public final static String TOPIC = "jms/TestTopic"; public TestJMSListener() { System.out.println("********* Consumer check **********"); Context jndiContext = null; ConnectionFactory connectionFactory = null; Connection connection[] = null; Session session[] = null; Queue dest1 = null; Topic dest = null; MessageConsumer consumer[] = null; // TextMessage message = null; try { Hashtable env = new Hashtable(); env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY); env.put(Context.PROVIDER_URL, "http://localhost:7001"); jndiContext = new InitialContext(env); } catch (NamingException e) { System.out.println("Could not create JNDI API context: " + e.toString()); System.exit(1); } try { connectionFactory = (ConnectionFactory) jndiContext .lookup(JMS_FACTORY); dest1 = (Queue) jndiContext.lookup(QUEUE); } catch (Exception e) { System.out.println("JNDI API lookup failed: " + e.toString()); System.exit(1); } connection = new Connection[10]; session = new Session[10]; consumer = new MessageConsumer[10]; for (int i = 0; i < 10; i++) { try { connection[i] = connectionFactory.createConnection(); session[i] = connection[i].createSession(false, Session.AUTO_ACKNOWLEDGE); consumer[i] = session[i].createConsumer(dest); consumer[i].setMessageListener(this); connection[i].start(); } catch (JMSException e) { System.out.println("Exception occurred: " + e.toString()); } } } @Override public void onMessage(Message m) { if (m instanceof TextMessage) { TextMessage message = (TextMessage) m; try { System.out.println("Reading message from Listener: " + new Date() + message.getText()); Thread.sleep(1000); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
I am using Weblogic 11g with the default settings for ConnectionFactory and Queue. When I used Subject, it actually delivered only one message per second (i.e., after the completion of the first message), and for the queue it sent 2 to 3 messages per second. How to get my listener to support parallel processing.
Final decision
Added more listener objects that defined multiple sessions / consumers in the lists and solved this problem. Find the modified code below.
public class ProducerServlet extends javax.servlet.http.HttpServlet implements javax.servlet.Servlet { // Defines the JNDI context factory. public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory"; // Defines the JMS context factory. public final static String JMS_FACTORY = "jms/TestConnectionFactory"; // Defines the queue. public final static String QUEUE = "jms/TestJMSQueue"; public final static String TOPIC = "jms/TestTopic"; TestJMSListener listeners[] = new TestJMSListener[20]; ConnectionFactory connectionFactory = null; Queue dest1 = null; Topic dest =null; Connection connection = null; MessageProducer producer = null; protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { try { connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(dest1); TextMessage message = session.createTextMessage(); message.setText("This is message from JMSSECOND DEMO " + request.getParameter("Num")); System.out.println("Sending message: " + message.getText()); producer.send(message); producer.send(session.createMessage()); } catch (Exception e) { System.out.println("Exception occurred: " + e.toString()); } } @Override public void init(ServletConfig arg0) throws ServletException { Context jndiContext = null; try { Hashtable env = new Hashtable(); env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY); env.put(Context.PROVIDER_URL, "http://localhost:7001"); jndiContext = new InitialContext(env); } catch (NamingException e) { System.out.println("Could not create JNDI API context: " + e.toString()); } try { connectionFactory = (ConnectionFactory) jndiContext .lookup(JMS_FACTORY); dest1 = (Queue) jndiContext.lookup(QUEUE); for(int i=0;i<listeners.length;i++ ){ listeners[i]=new TestJMSListener(Integer.toString(i+1)); } } catch (Exception e) { System.out.println("JNDI API lookup failed: " + e.toString()); e.printStackTrace(); } } } public class TestJMSListener implements MessageListener { // Defines the JNDI context factory. public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory"; // Defines the JMS context factory. public final static String JMS_FACTORY = "jms/TestConnectionFactory"; // Defines the queue. public final static String QUEUE = "jms/TestJMSQueue"; public final static String TOPIC = "jms/TestTopic"; public String listnerNum = ""; public TestJMSListener(String listerNo) { super(); System.out.println("********* Consumer check **********"); listnerNum = listerNo; Context jndiContext = null; ConnectionFactory connectionFactory = null; Connection connection = null; Session session = null; Queue dest1 = null; Topic dest = null; MessageConsumer consumer = null; // TextMessage message = null; try { Hashtable env = new Hashtable(); env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY); env.put(Context.PROVIDER_URL, "http://localhost:7001"); jndiContext = new InitialContext(env); } catch (NamingException e) { System.out.println("Could not create JNDI API context: " + e.toString()); System.exit(1); } try { connectionFactory = (ConnectionFactory) jndiContext .lookup(JMS_FACTORY); dest1 = (Queue) jndiContext.lookup(QUEUE); } catch (Exception e) { System.out.println("JNDI API lookup failed: " + e.toString()); System.exit(1); } try{ connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer = session.createConsumer(dest1); consumer.setMessageListener(this); connection.start(); } catch (JMSException e) { System.out.println("Exception occurred: " + e.toString()); } } @Override public void onMessage(Message m) { if (m instanceof TextMessage) { TextMessage message = (TextMessage) m; try { System.out.println("Reading message from Listener: "+listnerNum+ " : " + new Date() + message.getText()); Thread.sleep(1000); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }