I am experimenting with Spring Reactor 3 and Spring Integration components to create a reactive stream (Flux) from a JMS queue.
I am trying to create a reactive thread (Spring Reactor 3 Flux) from a JMS queue (ActiveMQ using Spring Integration) for clients to receive JMS messages asynchronously. I believe that everything is connected correctly, but the client does not receive any JMS messages until the server stops. Then all messages are received "once" to the client once.
Any help would be appreciated.
Here is the configuration file that I use to configure JMS, integration components, and reactive publisher:
@Configuration @EnableJms @EnableIntegration public class JmsConfiguration { @Value("${spring.activemq.broker-url:tcp://localhost:61616}") private String defaultBrokerUrl; @Value("${queues.patient:patient}") private String patientQueue; @Autowired MessageListenerAdapter messageListenerAdapter; @Bean public DefaultJmsListenerContainerFactory myFactory( DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); configurer.configure(factory, jmsConnectionFactory()); return factory; } @Bean public Queue patientQueue() { return new ActiveMQQueue(patientQueue); } @Bean public ActiveMQConnectionFactory jmsConnectionFactory() { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); connectionFactory.setBrokerURL(defaultBrokerUrl); connectionFactory.setTrustedPackages(Arrays.asList("com.sapinero")); return connectionFactory; } // Set the jackson message converter @Bean public JmsTemplate jmsTemplate() { JmsTemplate template = new JmsTemplate(); template.setConnectionFactory(jmsConnectionFactory()); template.setDefaultDestinationName(patientQueue); template.setMessageConverter(jacksonJmsMessageConverter()); return template; } @Bean public MessageListenerAdapter messageListenerAdapter() { MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(); messageListenerAdapter.setMessageConverter(jacksonJmsMessageConverter()); return messageListenerAdapter; } @Bean public AbstractMessageListenerContainer messageListenerContainer() { DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer(); defaultMessageListenerContainer.setMessageConverter(jacksonJmsMessageConverter()); defaultMessageListenerContainer.setConnectionFactory(jmsConnectionFactory()); defaultMessageListenerContainer.setDestinationName(patientQueue); defaultMessageListenerContainer.setMessageListener(messageListenerAdapter()); defaultMessageListenerContainer.setCacheLevel(100); defaultMessageListenerContainer.setErrorHandler(new ErrorHandler() { @Override public void handleError(Throwable t) { t.printStackTrace(); } }); return defaultMessageListenerContainer; } @Bean // Serialize message content to json using TextMessage public MessageConverter jacksonJmsMessageConverter() { MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); converter.setTargetType(MessageType.TEXT); converter.setTypeIdPropertyName("_type"); return converter; } @Bean public MessageChannel jmsOutboundInboundReplyChannel() { return MessageChannels.queue().get(); } @Bean public Publisher<Message<String>> pollableReactiveFlow() { return IntegrationFlows .from(Jms.messageDrivenChannelAdapter(messageListenerContainer()).get()) .channel(MessageChannels.queue()) .log(LoggingHandler.Level.DEBUG) .log() .toReactivePublisher(); } @Bean public MessageChannel jmsChannel() { return new DirectChannel(); }
The controller creating the Flux:
@RestController @RequestMapping("patients") public class PatientChangePushController { private LocalDateTime lastTimePatientDataRetrieved = LocalDateTime.now(); private int durationInSeconds = 30; private Patient patient; AtomicReference<SignalType> checkFinally = new AtomicReference<>(); @Autowired PatientService patientService; @Autowired @Qualifier("pollableReactiveFlow") private Publisher<Message<String>> pollableReactiveFlow; @Autowired private JmsTemplate jmsTemplate; @Autowired private Queue patientQueue; @GetMapping(value = "/{id}/alerts", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<Message<String>> getPatientAlerts(@PathVariable Long id) { Flux<Message<String>> messageFlux = Flux.from(pollableReactiveFlow); return messageFlux; } @GetMapping(value = "/generate") public void generateJmsMessage() { for (long i = 0L; i < 100; i++) { Patient patient = new Patient(); patient.setId(i); send(patient); System.out.println("Message was sent to the Queue"); } } void send(Patient patient) { this.jmsTemplate.convertAndSend(this.patientQueue, patient); } }
If someone tells me why messages are not sent to the client until the server is killed, I would appreciate it.
java spring spring-integration reactive-programming project-reactor
T. Nash
source share