I am losing messages using spring-reactor, what is wrong with my setup?

I decided to try out the recently open-sourced Pivotal platform for a simple program I am writing, which needs multi-threaded processing in order to complete its work in a timely manner.

I wrote the following sample project to get to know the framework and play with it to understand how it is used:

Main.java:

package reactortest;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

/**
 * Application entry point: builds the Spring context described by
 * {@code MainConfiguration} and runs the {@code MyProducer} bean.
 */
public class Main {
    /**
     * Creates the annotation-based application context, looks up the producer bean and runs it.
     * The context is closed by try-with-resources once {@code run()} returns or throws.
     *
     * @param args command-line arguments (not used)
     * @throws InterruptedException propagated from {@code MyProducer.run()}
     */
    public static void main(String[] args) throws InterruptedException {
        try (AnnotationConfigApplicationContext applicationContext =
                new AnnotationConfigApplicationContext(MainConfiguration.class)) {
            applicationContext.getBean(MyProducer.class).run();
        }
    }
}

MyProducer.java:

package reactortest;

import java.util.concurrent.CountDownLatch;

import reactor.core.Reactor;
import reactor.event.Event;

/**
 * Publishes a fixed number of string events to a {@code Reactor}, then blocks on a
 * {@link CountDownLatch} and reports the latch's remaining count.
 */
public class MyProducer {
    private final Reactor eventReactor;
    private final Integer eventTotal;
    private final CountDownLatch completionLatch;

    /**
     * @param reactor         reactor the events are published to
     * @param messagesToPrint number of events to publish
     * @param countDownLatch  latch awaited after all events have been published
     */
    public MyProducer(final Reactor reactor, final Integer messagesToPrint, CountDownLatch countDownLatch) {
        this.eventReactor = reactor;
        this.eventTotal = messagesToPrint;
        this.completionLatch = countDownLatch;
    }

    /**
     * Publishes events "String event: 0" .. "String event: N-1", waits for the latch to
     * reach zero, then prints the remaining count.
     *
     * @throws InterruptedException if the thread is interrupted while awaiting the latch
     */
    public void run() throws InterruptedException {
        // Unbox once up front; the field is final so the bound cannot change mid-loop.
        final int total = eventTotal;
        int index = 0;
        while (index < total) {
            eventReactor.notify(Event.wrap("String event: " + index));
            index++;
        }

        // Blocks until the latch has been counted down to zero.
        completionLatch.await();
        System.out.println("Finished. Remaining count is: " + completionLatch.getCount());
    }
}

MyConsumer.java:

package reactortest;

import java.util.concurrent.CountDownLatch;

import reactor.event.Event;
import reactor.function.Consumer;

/**
 * Event consumer that prints every event it receives and counts down the
 * {@link CountDownLatch} it was constructed with.
 */
public class MyConsumer implements Consumer<Event<String>> {
    private final CountDownLatch latch;

    /**
     * @param countDownLatch latch decremented once per accepted event
     */
    public MyConsumer(CountDownLatch countDownLatch) {
        latch = countDownLatch;
    }

    /**
     * Prints the event to standard output, then decrements the latch.
     *
     * @param message the received event
     */
    @Override
    public void accept(Event<String> message) {
        System.out.println(message);
        latch.countDown();
    }
}

and finally MainConfiguration.java:

package reactortest;

import java.util.concurrent.CountDownLatch;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.spec.Reactors;
import reactor.spring.context.config.EnableReactor;

/**
 * Spring Java configuration that wires together the latch, the reactor, the producer
 * and the consumer used by the sample.
 *
 * <p>NOTE(review): the {@code @Bean} methods call each other directly; this assumes Spring's
 * {@code @Configuration} handling hands back the same bean instance on every call (so the
 * producer and the consumer see one latch) — confirm against the Spring version in use.
 */
@Configuration
@EnableReactor
public class MainConfiguration {
    /** Number of events the producer publishes; also the initial count of the latch. */
    private static final int MESSAGES_TO_PRINT = 10;

    @Autowired private Environment environment;

    /**
     * @return a latch whose initial count equals the number of events to publish
     */
    @Bean
    public CountDownLatch countDownLatch() {
        return new CountDownLatch(MESSAGES_TO_PRINT);
    }

    /**
     * Builds a reactor on the {@code Environment.THREAD_POOL} dispatcher with random event
     * routing and registers the consumer bean on it.
     *
     * @return the configured reactor
     */
    @Bean
    public Reactor reactor() {
        Reactor reactor = Reactors.reactor()
                .env(environment)
                .dispatcher(Environment.THREAD_POOL)
                .randomEventRouting()
                .get();
        // Registered without an explicit selector, matching the key-less notify(Event) in MyProducer.
        reactor.on(consumer());
        return reactor;
    }

    /**
     * @return the producer, given the reactor, the event count and the latch
     */
    @Bean
    public MyProducer producer() {
        return new MyProducer(reactor(), MESSAGES_TO_PRINT, countDownLatch());
    }

    /**
     * @return the consumer, given the latch it counts down
     */
    @Bean
    public MyConsumer consumer() {
        return new MyConsumer(countDownLatch());
    }
}

My problem is that the program never terminates. The consumer also prints a different subset of the events on each run. For three consecutive runs, it printed:

1st run:
Event{id=null, headers=null, replyTo=null, data=String event: 0}
Event{id=null, headers=null, replyTo=null, data=String event: 1}
Event{id=null, headers=null, replyTo=null, data=String event: 7}
Event{id=null, headers=null, replyTo=null, data=String event: 8}

2nd run:
Event{id=null, headers=null, replyTo=null, data=String event: 0}
Event{id=null, headers=null, replyTo=null, data=String event: 1}
Event{id=null, headers=null, replyTo=null, data=String event: 5}
Event{id=null, headers=null, replyTo=null, data=String event: 6}
Event{id=null, headers=null, replyTo=null, data=String event: 9}

3rd run:
Event{id=null, headers=null, replyTo=null, data=String event: 2}
Event{id=null, headers=null, replyTo=null, data=String event: 4}
Event{id=null, headers=null, replyTo=null, data=String event: 6}

, , , - , , javaconfig , , .

+4
1

, , ( ). , , , - .

- randomEventRouting() , . / , , , , , ,

Change the call to .on() so that the consumer is registered with an explicit selector:

// Registers consumer() under a selector built from the value returned by selector(),
// instead of the single-argument on(consumer) form.
reactor.on(Selectors.$(selector()), consumer());

where selector() is defined as:

/**
 * Exposes the fixed string that identifies the sample's events as a bean.
 *
 * @return the selector key string
 */
@Bean
public String selector() {
    return "My very special event";
}

and change the call to reactor.notify() so that it passes the selector as the key:

// Publishes the event under the `selector` key rather than with no key.
// NOTE(review): presumably `selector` holds the value of the selector() bean — confirm at the call site.
reactor.notify(selector, Event.wrap("String event: " + i));

.

, , ( ) , .:)

+5

All Articles