Using Spring AMQP with a headers exchange, every message is delivered to the queue regardless of the contents of its headers. To be more specific, I declare in XML that the queue should only receive messages with the header "betty" set to "rubble", but messages without that header are still delivered. What am I doing wrong? I would also like to be able to do this programmatically, so I tried the BindingBuilder class as well, but had no joy. There seem to be no examples of headers exchanges with Spring AMQP, or of BindingBuilder.
I have successfully routed and filtered on these headers with plain Java and the RabbitMQ client API, but Spring offers some elegant packaging that I would like to use.
I am using Spring Core 3.2.2.RELEASE, Spring AMQP 1.1.4.RELEASE and RabbitMQ 3.0.4, and I am not using Spring Integration. Here is my code in the form of a test (it may one day assert something :-)
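For context, a minimal sketch of what a programmatic headers binding might look like with BindingBuilder in Java config. The class name `HeadersBindingConfig` and the bean name `bettyRubbleBinding` are hypothetical; the sketch assumes the queue and exchange beans come from the XML context shown further down, that the XML `<rabbit:bindings>` element is removed so only this binding is declared, and that a `RabbitAdmin` in the context declares `Binding` beans on startup. It is a configuration fragment, not a confirmed fix.

```java
package uk.co.abc;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; picked up by component scanning of uk.co.abc.
@Configuration
public class HeadersBindingConfig {

    // Queue declared in the XML context as "mymessage.consumer".
    @Autowired
    @Qualifier("mymessage.consumer")
    private Queue consumerQueue;

    // Headers exchange declared in the XML context as "headers.mymessage.all".
    @Autowired
    private HeadersExchange headersExchange;

    // Bind the queue so that only messages carrying betty=rubble should match.
    @Bean
    public Binding bettyRubbleBinding() {
        return BindingBuilder.bind(consumerQueue)
                .to(headersExchange)
                .where("betty")
                .matches("rubble");
    }
}
```

A binding created by an earlier run stays on the broker until it is removed, so an old binding may need deleting before a new one can be judged on its own.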
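import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext-test-rabbit.xml" })
public class SpringRabbitTest {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Autowired
    private HeadersExchange headersExchange;

    @Autowired
    private MessageConverter converter;

    @Autowired
    @Qualifier("mymessage.consumer")
    private Queue consumerQ;

    @Test
    public void headersTest() throws InterruptedException {
        // None of these headers is betty=rubble, so the message should not be routed.
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("fred", "flintstone");
        messageProperties.setHeader("wilma", "flintstone");
        messageProperties.setHeader("barney", "rubble");

        MyMessage myMessage = new MyMessage("just an example");
        Message message = converter.toMessage(myMessage, messageProperties);

        // Headers exchanges ignore the routing key, so it is left empty.
        amqpTemplate.convertAndSend(headersExchange.getName(), "", message);

        // Give the listener time to receive (or not receive) the message.
        Thread.sleep(5000);
    }
}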
Application context (applicationContext-test-rabbit.xml):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.1.xsd">

    <context:component-scan base-package="uk.co.abc" />

    <rabbit:connection-factory id="connectionFactory" host="localhost"
                               username="guest" password="guest"
                               virtual-host="/" port="5672" />

    <bean id="jsonMessageConverter"
          class="org.springframework.amqp.support.converter.JsonMessageConverter"/>

    <rabbit:template connection-factory="connectionFactory" id="amqpTemplate"
                     message-converter="jsonMessageConverter"/>

    <rabbit:admin connection-factory="connectionFactory" />

    <rabbit:queue id="mymessage.consumer" name="mymessage.consumer"/>

    <rabbit:headers-exchange name="headers.mymessage.all" id="headers.mymessage.all">
        <rabbit:bindings>
            <rabbit:binding queue="mymessage.consumer" key="betty" value="rubble" />
        </rabbit:bindings>
    </rabbit:headers-exchange>

    <rabbit:listener-container connection-factory="connectionFactory"
                               message-converter="jsonMessageConverter"
                               error-handler="loggingErrorHandler">
        <rabbit:listener queues="mymessage.consumer" ref="myMessageHandler"
                         method="handleMyMessage" />
    </rabbit:listener-container>

</beans>
Message bean:
package uk.co.abc;

public class MyMessage {

    private String message;

    public MyMessage() {
    }

    public MyMessage(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        return "MyMessage [message=" + message + "]";
    }
}
Handler
package uk.co.abc;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component
@Qualifier("myMessageHandler")
public class MyMessageHandler {

    public void handleMyMessage(MyMessage myMessage) {
        System.out.println("Got it! " + myMessage);
    }
}
And for completeness, the pom:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <properties>
        <spring.amqp.version>1.1.4.RELEASE</spring.amqp.version>
        <rabbitmq.version>3.0.4</rabbitmq.version>
        <spring.version>3.2.2.RELEASE</spring.version>
        <log4j.version>1.2.16</log4j.version>
        <slf4j.version>1.5.2</slf4j.version>
    </properties>

    <modelVersion>4.0.0</modelVersion>
    <groupId>test</groupId>
    <artifactId>test</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-amqp</artifactId>
            <version>${spring.amqp.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>${spring.amqp.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-erlang</artifactId>
            <version>${spring.amqp.version}</version>
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>${rabbitmq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.12</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-core-lgpl</artifactId>
            <version>1.9.12</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <optional>true</optional>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
    </dependencies>
</project>
The debug log suggests that the binding is declared without any header arguments; only an empty routing key is logged.
DEBUG [org.springframework.amqp.rabbit.connection.CachingConnectionFactory] - Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,1)
DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1)
DEBUG [org.springframework.amqp.rabbit.core.RabbitAdmin] - declaring Exchange 'headers.mymessage.all'
DEBUG [org.springframework.amqp.rabbit.core.RabbitAdmin] - declaring Queue 'mymessage.consumer'
DEBUG [org.springframework.amqp.rabbit.core.RabbitAdmin] - Binding destination [mymessage.consumer (QUEUE)] to exchange [headers.mymessage.all] with routing key []
DEBUG [org.springframework.amqp.rabbit.core.RabbitAdmin] - Declarations finished
DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Started on queue 'mymessage.consumer': Consumer: tag=[null], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Retrieving delivery for Consumer: tag=[null], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
DEBUG [org.springframework.amqp.rabbit.connection.CachingConnectionFactory] - Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,2)
DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2)
DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Publishing message on exchange [headers.mymessage.all], routingKey = []
DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Storing delivery for Consumer: tag=[amq.ctag-JS6zwiLjp6cGSNbieRTlvw], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Received message: (Body:'{"message":"just an example"}'; ID:null; Content:application/json; Headers:{wilma=flintstone, fred=flintstone, __TypeId__=uk.co.abc.MyMessage, barney=rubble}; Exchange:headers.mymessage.all; RoutingKey:; Reply:null; DeliveryMode:PERSISTENT; DeliveryTag:1)
Got it! MyMessage [message=just an example]
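To make the expected behaviour concrete, here is a small plain-Java sketch of how a headers exchange is generally understood to decide whether a binding matches a message. It is an illustration only, not the broker's code: the class `HeaderMatchSketch` and the method `matches` are hypothetical names, and the boolean `matchAll` stands in for the `x-match` binding argument (`all` when true, `any` when false). It shows that a binding on betty=rubble should reject the message sent in the test, while a binding with no header arguments at all accepts everything under `all` matching, which would be consistent with the delivery seen in the log if the arguments never reached the broker.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class HeaderMatchSketch {

    // Returns true if the message headers satisfy the binding arguments.
    // matchAll = true models x-match=all, false models x-match=any.
    public static boolean matches(Map<String, Object> bindingArgs,
                                  Map<String, Object> headers,
                                  boolean matchAll) {
        boolean anyHit = false;
        for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
            boolean hit = headers.containsKey(arg.getKey())
                    && Objects.equals(arg.getValue(), headers.get(arg.getKey()));
            if (matchAll && !hit) {
                return false; // one missing or different header fails "all"
            }
            anyHit |= hit;
        }
        // "all" with no failures (including no arguments at all) matches;
        // "any" needs at least one hit.
        return matchAll || anyHit;
    }

    public static void main(String[] args) {
        // Headers sent by the test in this question.
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put("fred", "flintstone");
        headers.put("wilma", "flintstone");
        headers.put("barney", "rubble");

        // Binding as intended by the XML: betty=rubble.
        Map<String, Object> intended = new HashMap<String, Object>();
        intended.put("betty", "rubble");

        // Binding with no header arguments.
        Map<String, Object> empty = new HashMap<String, Object>();

        System.out.println(matches(intended, headers, true)); // prints false
        System.out.println(matches(empty, headers, true));    // prints true
    }
}
```

Listing the bindings on the broker (for example in the management UI) would show whether the binding on mymessage.consumer actually carries a betty=rubble argument.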