Implement your own websphere MQ with CoD on top of the Camel JMS component

I had a lot of difficulties implementing the WebSphere MQ (WMQ) connector with Apache CAMEL, which could process MQ Delivery Confirmation (CoD) reports without exception, nor the side effects of unwanted response datagrams. In the end, I started working the way I wanted in the standard and usual way, if you are used to writing your own MQ clients. I document the method in a post on the same topic, but I find the solution hyped with subtleties and really appreciate any advice or examples to make the implementation cleaner and more elegant.

I realized that the problem is rooted in the way MQ designed the message-reply-reply (MEP) messaging template compared to the JMS specification, compared to the CAMEL implementation of the MEP request-response message in its JMS component. Three different philosophies!

  • WMQ has a MessageType header (see MQMD fields and constants ), which has a value of 1 for the request, 2 for the response and 8 for the datagram (one-way MEP). In addition, a value of 4 is used to mark report messages in the form of CoD (Conf of of Delivery), PAN (Positive AckNowledge) and NAN (Negative AckNowledge), which - during the message flow - also add an additional message response. CoD, PAN, and NAN codes can be requested for Requests, Responses, or Datagram messages using another field in the Report header, in which flags for all report options can be combined. Additional header fields "ReplyToQ" and "ReplyToQMgr" specify the queue and queue manager for which reports and replies are expected from the original sender, and the fixed 24 byte field CorrelId - optional - can help correlate reports and responses with the original datagram or Request message. To make it more complex, you can actually send replies and reports with the same original message identifier and without CorrelID, or provide the original message identifier in CorrelId or return the CorrelId value if it is already specified in the original request or datagram. IBM provides the JMS API via WMQ , allowing you to either tunnel regular JMS exchanges through WMQ as a transport (using the optional MQRFH2 message header ) or map your own MQ messages to JMS messages and vice versa.
  • JMS specifications, on the other hand, provide the optional header field “JMSReplyTo” and “JMSCorrelationID”, but leave exact MEP semantics for client applications; namely, specifying in the specifications: "The answer can be optional, and it depends on the client."
  • CAMEL uses “routes” in XML or Java DSL and an internal Exchange object model to support EIP patterns , including Request-Reply . CAMEL then accepts in its JMS Component that if the JMSReplyTo field is set, it is a request waiting for a response, resulting in an Out (or changed Partly, if Out is empty) Exchange, which should be returned to the queue defined in JMSReplyTo.

I was ready to support my own MQ messaging with delivery confirmation reports (CoDs) through the Websphere remote queue manager, so that in addition to transactions and persistence (i.e. no loss, no duplicates), you can also track when a message is being consumed and will raise alerts in case of delays.

Incoming problem:

By default, Websphere Queue Manager generates CoD reports when the consumption of messages from the queue ends. Therefore, without any specific settings, the remote MQ client sending the datagram with the CoD flag (and then forced ReplyToQ) will receive the first response in the form of an MQ report from the queue manager when the CAMEL endpoint consumes a message, followed by a second (unexpected) response message explicitly returned by CAMEL and containing everything that remained in the Exchange object at the end of the CAMEL route, since CAMEL accepts an EIP response request, given that the JMSReplyTo field is present (displayed from MQ ReplyToQ and ReplyToQMgr, both of which are necessary to support the opposite CoD stream).

Outgoing problems:

Without specific settings, CAMEL also accepts a default EIP / MEP request-response in an outbound connection. The endpoint of CAMEL JMS / MQ will wait for 1 response back. When the OUTbound message is a JMS on top of MQ (hence with the MQRFH2 header), this works fine. When forcing plain vanilla MQ, i.e. Removing the MQRFH2 header, as shown below, I was not able to get the endpoint listener to match the correlated incoming MQ report, although the monitored values ​​seem correct (forcing 24 char correlation identifiers to truncate longer CorrelId values ​​or zero MQ padding cannot geoprocess the correlation filter) . Has anyone been able to solve this problem?

Details: Although the IBM JMS API accepts the transfer of certain JMS property values ​​WMQ_MESSAGE_BODY = {1 | 0} / WMQ_TARGET_CLIENT = {1 | 0} to control the presence of the JMS MQRFH2 header in the generated messages, these parameters become inoperative via CAMEL. You must use the CamelJmsDestinationName header ( as described in the CAMEL JMS document ) to provide the IBM queue URL for the destination that specifies the "targetClient = 1" parameter to get rid of the MQRFH2 header. But without this header, the CAMEL correlation in the CoD report or MQ response does not work.

The solution to the aforementioned problem is really connected with the construction of a specific CAMEL route for processing CoD reports returned by the remote side (as well as correlated MQ responses). Therefore, outgoing CAMEL messages must run as "InOnly" ExchangePattern and therefore do not wait for a response. But this will cause CAMEL to suppress too many ReplyTo fields. Then, if MQ CoD is requested on the outgoing MQ datagram, a CAMEL exception is thrown, which is caused by the MQException JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2027' ('MQRC_MISSING_REPLY_TO_Q') .

CAMEL documents the URI parameter "disableReplyTo = true" to disable listening for responses on exchange templates, but keep the ReplyTo fields visible, both for incoming and outgoing exchanges. But this option does not work (as you can see, I may be wrong) on ​​outgoing JMS exchanges, and instead, you need to use a much less intuitive version of preserveMessageQos.

Elegant solutions to these problems are welcome.

+7
apache-camel jms ibm-mq
source share
1 answer

The best I can get is described below, illustrated as the context of the Spring XML application, which itself contains the CAMEL context and routes. It works with its own MQ JCA resource adapter for IBM v7.5, CAMEL 2.15, Spring core 4.2. I can deploy it on both Glassfish and Weblogic servers.

Of course, DSL is used in a real implementation taking into account numerous variables. This example, based on the CAMEL XML DSL, is self-contained and easy to test.

Let's start with Spring and Camel declarations:

 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util" xmlns:context="http://www.springframework.org/schema/context" xmlns:camel="http://camel.apache.org/schema/spring" xmlns:jee="http://www.springframework.org/schema/jee" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd"> 

The CAMEL context follows with two routes: MQ - JMS and JMS - MQ, here is a chain to create a bridge to facilitate testing.

 <camel:camelContext id="mqBridgeCtxt"> <camel:route id="mq2jms" autoStartup="true"> 

Strange: when using your own MQ resource adapter, the only way to get (for example) 3 listeners is to use 3 connections (with 3 camels: from the operators in the sequence) with a maximum of 1 session each, otherwise error MQ: MQJCA1018: Only one session per connection is allowed . However, if you use MQ client banks instead, the concurentConsumers option in CAMEL JMS works fine.

  <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp; acknowledgementModeName=SESSION_TRANSACTED"/> <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp; acknowledgementModeName=SESSION_TRANSACTED"/> <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp; acknowledgementModeName=SESSION_TRANSACTED"/> 

The disable parameter disableReplyTo above ensures that CAMEL does not respond before we can check the MQ message type to 1 = Request (-reply) or 8 = datagram (one way!). This test and answer construct is not illustrated here.

Then we apply EIP to InOnly the next time we host it on a simple JMS so that it matches the Inbound MQ mode.

  <camel:setExchangePattern pattern="InOnly"/> <!-- camel:process ref="reference to your MQ message processing bean fits here" / --> <camel:to uri="ref:innerQueue" /> </camel:route> 

Next is the jms-to-MQ route:

  <camel:route id="jms2mq" autoStartup="true"> <camel:from uri="ref:innerQueue" /> <!-- remove inner message headers and properties to test without inbound side effects! --> <camel:removeHeaders pattern="*"/> <camel:removeProperties pattern="*" /> <!-- camel:process ref="reference to your MQ message preparation bean fits here" / --> 

A request flag now appears for the MQ CoD report, which should be sent by the remote recipient. We also apply the MQ message to the datagram type (value 8).

  <camel:setHeader headerName="JMS_IBM_Report_COD"><camel:simple resultType="java.lang.Integer">2048</camel:simple></camel:setHeader> <camel:setHeader headerName="JMS_IBM_Report_Pass_Correl_ID"><camel:simple resultType="java.lang.Integer">64</camel:simple></camel:setHeader> <camel:setHeader headerName="JMS_IBM_MsgType"><camel:simple resultType="java.lang.Integer">8</camel:simple></camel:setHeader> 

The ReplyTo queue can be specified either through the ReplyTo uri parameter, or as a header, as shown below.

Next, we use the CamelJmsDestinationName header to force the suppression of the JQ MQ MQRFH2 JQ message header (using the targetClient MQ URL parameter value). In other words, we want to send a normal binary message with vanilla MQ (i.e., only the MQMD message descriptor followed by the payload).

  <camel:setHeader headerName="JMSReplyTo"><camel:constant>TEST.REPLYTOQ</camel:constant></camel:setHeader> <camel:setHeader headerName="CamelJmsDestinationName"><camel:constant>queue://MYQMGR/TEST.Q2?targetClient=1</camel:constant></camel:setHeader> 

Additional MQMD fields can be controlled using the reserved JMS properties , as shown below. See limitations in the IBM documentation.

  <camel:setHeader headerName="JMS_IBM_Format"><camel:constant>MQSTR </camel:constant></camel:setHeader> <camel:setHeader headerName="JMSCorrelationID"><camel:constant>_PLACEHOLDER_24_CHARS_ID_</camel:constant></camel:setHeader> 

The target queue in the URI is overwritten by CamelJmsDestinationName above, so the queue name in the URI becomes a placeholder.

The preserveMessageQos URI parameter is one that has been observed to send a message with the ReplyTo settings set (to get an MQ CoD report), but it does not allow CAMEL to instantiate a message listener using InOnly MEP.

  <camel:to uri="wmq:queue:PLACEHOLDER.Q.NAME?concurrentConsumers=1&amp; exchangePattern=InOnly&amp;preserveMessageQos=true&amp; includeSentJMSMessageID=true" /> </camel:route> </camel:camelContext> 

In your context, you need to adjust the following. It provides queue factories for both its own JMS provider and Websphere MQ (through its own IBM WMQ JCA Resource Adapter). We use JNDI requests for administrative objects here.

 <camel:endpoint id="innerQueue" uri="jmsloc:queue:transitQueue"> </camel:endpoint> <jee:jndi-lookup id="mqQCFBean" jndi-name="jms/MYQMGR_QCF"/> <jee:jndi-lookup id="jmsraQCFBean" jndi-name="jms/jmsra_QCF"/> <bean id="jmsloc" class="org.apache.camel.component.jms.JmsComponent"> <property name="connectionFactory" ref="jmsraQCFBean" /> </bean> <bean id="wmq" class="org.apache.camel.component.jms.JmsComponent"> <property name="connectionFactory" ref="mqQCFBean" /> </bean> </beans> 

Alternatively, if you use MQ client banners instead of a resource adapter, you will declare the factory beans connection as (instead of JNDI requests, as mentioned above):

 <bean id="mqCFBean" class="com.ibm.mq.jms.MQXAConnectionFactory"> <property name="hostName" value="${mqHost}"/> <property name="port" value="${mqPort}"/> <property name="queueManager" value="${mqQueueManager}"/> <property name="channel" value="${mqChannel}"/> <property name="transportType" value="1"/> <!-- This parameter is fixed and compulsory to work with pure MQI java libraries --> <property name="appName" value="${connectionName}"/> </bean> <bean id="wmq" class="org.apache.camel.component.jms.JmsComponent"> <property name="connectionFactory" ref="mqCFBean"/> <property name="transacted" value="true"/> <property name="acknowledgementModeName" value="AUTO_ACKNOWLEDGE"/> </bean> 

Comments and improvements are welcome.

+5
source share

All Articles