How to write an integration test for @RabbitListener annotation?

My question is really the next question for

RabbitMQ Integration Test and Threading

It says to wrap "your listeners" and pass it to CountDownLatch, and in the end, all threads will merge. This answer works if we manually create and insert a message listener, but for @RabbitListener annotations ... I'm not sure how to get into CountDownLatch. The frame automatically creates a message listener behind the scenes.

Are there any other approaches?

+3
spring-amqp spring-rabbit
source share
2 answers

With @Gary Russell, I was able to get an answer and used the following solution.

Conclusion: I have to admit that I am indifferent to this decision (it seems like a hack), but this is the only thing I could get, and as soon as you go to the initial time setting and actually understand the “workflow”, this not so painful. It basically boils down to defining (2) @ Beans and adding them to the Test Integration configuration.

An approximate solution is published below with explanations. Feel free to suggest improvements to this solution.

1. Define ProxyListenerBPP that spring initialization will listen for a specific clazz (that is, our test class containing @RabbitListener) and add our special request CountDownLatchListenerInterceptor, defined in the next step.

import org.aopalliance.aop.Advice; import org.springframework.aop.framework.ProxyFactoryBean; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.core.Ordered; import org.springframework.core.PriorityOrdered; /** * Implements BeanPostProcessor bean... during spring initialization we will * listen for a specified clazz * (ie our @RabbitListener annotated class) and * inject our custom CountDownLatchListenerInterceptor advice * @author sjacobs * */ public class ProxyListenerBPP implements BeanPostProcessor, BeanFactoryAware, Ordered, PriorityOrdered{ private BeanFactory beanFactory; private Class<?> clazz; public static final String ADVICE_BEAN_NAME = "wasCalled"; public ProxyListenerBPP(Class<?> clazz) { this.clazz = clazz; } @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; } @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (clazz.isAssignableFrom(bean.getClass())) { ProxyFactoryBean pfb = new ProxyFactoryBean(); pfb.setProxyTargetClass(true); // CGLIB, false for JDK proxy (interface needed) pfb.setTarget(bean); pfb.addAdvice(this.beanFactory.getBean(ADVICE_BEAN_NAME, Advice.class)); return pfb.getObject(); } else { return bean; } } @Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE - 1000; // Just before @RabbitListener post processor } 

2. Create a MethodInterceptor recommendation that will reference the CountDownLatch. CountDownLatch must be referenced both in the integration test stream and inside the async workflow in @RabbitListener. This way, we will be able to return to the integration test thread later , as soon as the asynchronous @RabbitListener stream has completed execution. No survey needed.

import java.util.concurrent.CountDownLatch;

 import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; /** * AOP MethodInterceptor that maps a <b>Single</b> CountDownLatch to one method and invokes * CountDownLatch.countDown() after the method has completed execution. The motivation behind this * is for integration testing purposes of Spring RabbitMq Async Worker threads to be able to merge * the Integration Test thread after an Async 'worker' thread completed its task. * @author sjacobs * */ public class CountDownLatchListenerInterceptor implements MethodInterceptor { private CountDownLatch countDownLatch = new CountDownLatch(1); private final String methodNameToInvokeCDL ; public CountDownLatchListenerInterceptor(String methodName) { this.methodNameToInvokeCDL = methodName; } @Override public Object invoke(MethodInvocation invocation) throws Throwable { String methodName = invocation.getMethod().getName(); if (this.methodNameToInvokeCDL.equals(methodName) ) { //invoke async work Object result = invocation.proceed(); //returns us back to the 'awaiting' thread inside the integration test this.countDownLatch.countDown(); //"reset" CountDownLatch for next @Test (if testing for more async worker) this.countDownLatch = new CountDownLatch(1); return result; } else return invocation.proceed(); } public CountDownLatch getCountDownLatch() { return countDownLatch; } } 

3. Next add to your integration test Configure the following @ Bean (s)

 public class SomeClassThatHasRabbitListenerAnnotationsITConfig extends BaseIntegrationTestConfig { // pass into the constructor the test Clazz that contains the @RabbitListener annotation into the constructor @Bean public static ProxyListenerBPP listenerProxier() { // note static return new ProxyListenerBPP(SomeClassThatHasRabbitListenerAnnotations.class); } // pass the method name that will be invoked by the async thread in SomeClassThatHasRabbitListenerAnnotations.Class // IE the method name annotated with @RabbitListener or @RabbitHandler // in our example 'listen' is the method name inside SomeClassThatHasRabbitListenerAnnotations.Class @Bean(name=ProxyListenerBPP.ADVICE_BEAN_NAME) public static Advice wasCalled() { String methodName = "listen"; return new CountDownLatchListenerInterceptor( methodName ); } // this is the @RabbitListener bean we are testing @Bean public SomeClassThatHasRabbitListenerAnnotations rabbitListener() { return new SomeClassThatHasRabbitListenerAnnotations(); } } 

4. Finally, in the integration call @Test ... after sending the message via rabbitTemplate to start the asynchronous stream ... the method CountDownLatch # wait (...) is now called from the interceptor and do not forget to pass the TimeUnit arguments so that it can time-out out in the case of a long process or something went wrong. As soon as the asynchronous flow of the integration test is notified (awakened), and now we can finally begin to actually check / verify / verify the results of the async operation.

 @ContextConfiguration(classes={ SomeClassThatHasRabbitListenerAnnotationsITConfig.class } ) public class SomeClassThatHasRabbitListenerAnnotationsIT extends BaseIntegrationTest{ @Inject private CountDownLatchListenerInterceptor interceptor; @Inject private RabbitTemplate rabbitTemplate; @Test public void shouldReturnBackAfterAsyncThreadIsFinished() throws Exception { MyObject payload = new MyObject(); rabbitTemplate.convertAndSend("some.defined.work.queue", payload); CountDownLatch cdl = interceptor.getCountDownLatch(); // wait for async thread to finish cdl.await(10, TimeUnit.SECONDS); // IMPORTANT: set timeout args. //Begin the actual testing of the results of the async work // check the database? // download a msg from another queue? // verify email was sent... // etc... } 
+2
source share

This is a bit more complicated with @RabbitListener , but the easiest way is to tell the listener.

With the factory custom destination container, just add factory advice to the test case.

The tip would be MethodInterceptor ; the call will have 2 arguments; channel and (unreacted) Message . Counseling should be introduced before the container (s) are created.

Alternatively, access the container using the registry and add advice later (but you need to call initialize() to force the new advice to apply).

An alternative would be to simply BeanPostProcessor proxy your listener class before it is injected into the container. So you will see the argumen (s) method after any conversion; you can also check for any result returned by the listener (for request / response scripts).

If you are not familiar with these methods, I can try to find some time to deploy a quick example for you.

EDIT

I issued a migration request to add an example to EnableRabbitIntegrationTests . This adds a bean listener with two annotated listening methods, BeanPostProcessor , which proxies the bean listener before it is injected into the listener container. An Advice added to the proxy server, which counts delays when receiving expected messages.

0
source share

All Articles