Spring 5 Jet crash when expanding Publisher stream / implementation and calling s.onNext () more than once

I was just starting to play with the new Spring 5 reactive support and wanted to simulate some asynchronous data generation by noting two erroneous behaviors:

1) Call s.onNext (String) more than once:

@GetMapping("/strings") public Publisher<String> getStrings(){ return new Publisher<String>() { @Override public void subscribe(Subscriber<? super String> s) { int i = 0; while(++i <= 5){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } s.onNext("message"); } s.onComplete(); } }; } 

In this case, stacktrace:

 2016-08-03 13:35:04.986 DEBUG 5136 --- [nio-8080-exec-1] swrrmaRequestMappingHandlerMapping : Looking up handler method for path /strings 2016-08-03 13:35:04.994 DEBUG 5136 --- [nio-8080-exec-1] swrrmaRequestMappingHandlerMapping : Returning handler method [public org.reactivestreams.Publisher<java.lang.String> com.codependent.spring5.playground.reactive.web.AccountsController.getStrings()] 2016-08-03 13:35:04.994 DEBUG 5136 --- [nio-8080-exec-1] osbfsDefaultListableBeanFactory : Returning cached instance of singleton bean 'accountsController' 2016-08-03 13:35:07.120 DEBUG 5136 --- [nio-8080-exec-1] oswshExceptionHandlingWebHandler : Could not complete request java.lang.IllegalStateException: RECEIVED at org.springframework.http.server.reactive.AbstractResponseBodyProcessor$State.onNext(AbstractResponseBodyProcessor.java:316) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:77) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:47) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] at org.springframework.http.server.reactive.ChannelSendOperator$WriteWithBarrier.doNext(ChannelSendOperator.java:97) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:88) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:123) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at com.codependent.spring5.playground.reactive.web.AccountsController$4.subscribe(AccountsController.java:107) [classes/:na] at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:59) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:73) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at org.springframework.http.server.reactive.ChannelSendOperator.subscribe(ChannelSendOperator.java:54) [spring-web-5.0.0.M1.jar:5.0.0.M1] at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.Operators$DeferredScalarSubscriber.complete(Operators.java:797) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoThenApply$MonoThenApplyManager$SecondSubscriber.onNext(MonoThenApply.java:203) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:75) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:130) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1293) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:186) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1010) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:70) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:100) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:169) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:51) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:69) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:71) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:383) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:192) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:96) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:60) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:116) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:45) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoOtherwiseIfEmpty.subscribe(MonoOtherwiseIfEmpty.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoThenSupply$MonoConcatIgnoreManager.drain(MonoThenSupply.java:167) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoThenSupply.subscribe(MonoThenSupply.java:55) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at org.springframework.http.server.reactive.ServletHttpHandlerAdapter.service(ServletHttpHandlerAdapter.java:93) [spring-web-5.0.0.M1.jar:5.0.0.M1] at javax.servlet.http.HttpServlet.service(HttpServlet.java:729) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:230) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:108) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:522) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:349) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:1110) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:785) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1425) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.4.jar:8.5.4] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45] at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.4.jar:8.5.4] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45] 2016-08-03 13:35:07.121 DEBUG 5136 --- [nio-8080-exec-1] oshsrServletServerHttpResponse : Can't set the status 500 because the HTTP response has already been committed 2016-08-03 13:35:08.127 ERROR 5136 --- [nio-8080-exec-1] accC[.[.0.0.0.[.[httpHandlerServlet] : Servlet.service() for servlet [httpHandlerServlet] in context with path [] threw exception reactor.core.Exceptions$BubblingException: java.lang.IllegalStateException: RECEIVED at reactor.core.Exceptions.bubble(Exceptions.java:97) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.Exceptions.onErrorDropped(Exceptions.java:263) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoThenApply$MonoThenApplyManager$SecondSubscriber.onError(MonoThenApply.java:209) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxResume$ResumeSubscriber.onError(FluxResume.java:105) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.OperatorAdapter.doOnSubscriberError(OperatorAdapter.java:113) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:91) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:123) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at com.codependent.spring5.playground.reactive.web.AccountsController$4.subscribe(AccountsController.java:107) ~[classes/:na] at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:59) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:73) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at org.springframework.http.server.reactive.ChannelSendOperator.subscribe(ChannelSendOperator.java:54) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.Operators$DeferredScalarSubscriber.complete(Operators.java:797) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoThenApply$MonoThenApplyManager$SecondSubscriber.onNext(MonoThenApply.java:203) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:75) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:130) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1293) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:186) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1010) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:70) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:100) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:169) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:51) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:69) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:71) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:383) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:192) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:96) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:60) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:116) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:45) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoOtherwiseIfEmpty.subscribe(MonoOtherwiseIfEmpty.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoThenSupply$MonoConcatIgnoreManager.drain(MonoThenSupply.java:167) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at reactor.core.publisher.MonoThenSupply.subscribe(MonoThenSupply.java:55) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] at org.springframework.http.server.reactive.ServletHttpHandlerAdapter.service(ServletHttpHandlerAdapter.java:93) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] at javax.servlet.http.HttpServlet.service(HttpServlet.java:729) ~[tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:230) ~[tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165) ~[tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) ~[tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:108) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:522) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:349) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:1110) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:785) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1425) [tomcat-embed-core-8.5.4.jar:8.5.4] at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.4.jar:8.5.4] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45] at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.4.jar:8.5.4] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45] Caused by: java.lang.IllegalStateException: RECEIVED at org.springframework.http.server.reactive.AbstractResponseBodyProcessor$State.onNext(AbstractResponseBodyProcessor.java:316) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:77) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:47) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] at org.springframework.http.server.reactive.ChannelSendOperator$WriteWithBarrier.doNext(ChannelSendOperator.java:97) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:88) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] ... 57 common frames omitted 

2) Call s.onNext (Alert.class -any DTO-) more than once:

 @GetMapping("/alerts") public Publisher<Alert> getAlerts(){ return new Publisher<Alert>() { @Override public void subscribe(Subscriber<? super Alert> s) { int i = 0; while(++i <= 5){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } s.onNext(new Alert((long)1, "ms")); } s.onComplete(); } }; } 

Now it does not show errors in the logs, but the caller receives a 500 response code and the contents of '[' .

Log:

 2016-08-03 13:37:11.834 DEBUG 5136 --- [nio-8080-exec-3] osweb.reactive.DispatcherHandler : Processing GET request for [http://localhost:8080/alerts] 2016-08-03 13:37:11.835 DEBUG 5136 --- [nio-8080-exec-3] swrrmaRequestMappingHandlerMapping : Looking up handler method for path /alerts 2016-08-03 13:37:11.836 DEBUG 5136 --- [nio-8080-exec-3] swrrmaRequestMappingHandlerMapping : Returning handler method [public org.reactivestreams.Publisher<com.codependent.spring5.playground.reactive.dto.Alert> com.codependent.spring5.playground.reactive.web.AccountsController.getAlerts()] 2016-08-03 13:37:11.836 DEBUG 5136 --- [nio-8080-exec-3] osbfsDefaultListableBeanFactory : Returning cached instance of singleton bean 'accountsController' 

Why can't we call onNext () multiple times? How can we do this?

NOTE: I, if I just call onNext when it works fine:

 @Override public void subscribe(Subscriber<? super String> s) { s.onNext("my message" + Math.random()); s.onComplete(); } 

or

 @Override public void subscribe(Subscriber<? super Alert> s) { s.onNext(new Alert((long)1, "ms")); s.onComplete(); } 
+5
source share
2 answers

My Publisher implementation did not meet the jet stream specification, here is how I fixed it:

 @GetMapping(value="/strings", produces="text/event-stream") public Publisher<String> getStrings(){ return new Publisher<String>() { private int loops = 5; @Override public void subscribe(Subscriber<? super String> s) { s.onSubscribe(new Subscription() { @Override public void request(long n) { for (int i = 0; i < n; i++) { if(loops-- > 0){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } s.onNext("message"+Math.random()); }else{ s.onComplete(); } } } @Override public void cancel() { loops = 0; } }); } }; } 

If you want to know more about this, look at the problem that I discovered in Spring JIRA , and there is a useful comment I received.

+4
source

I haven't tested this yet, since I'm a little busy - I'll do it later if that doesn't work. :)

From the above comments, it seems that the problem is with creating Flux.

I assume Spring Reactive controllers can handle Flux, which emits multiple, without using WebSockets or SSE. Again, I will have a game a bit later.

Flux has many static building methods that will help you here.

How to do it as follows:

 return Flux.intervalMillis(1000) .map(l -> new new SensorRead(sensorId, Math.random())); 

But it will give you an endless stream, which may not be what you want.

Another option looks something like this:

 return Flux.range(1, 5) //Spit out 5 values starting from 1 .delayMillis(1000) //Delay the onNext calls to separate 1 second apart .map(l -> new new SensorRead(sensorId, Math.random())); 

Update

OK, so this question has changed significantly.

In response to "Why can't we call onNext () several times? How can we do this?"

Of course, I didn’t write an API, so the arguments that I can’t answer cannot answer, but IMO has ambiguity and complexity as to how to handle multiple outliers in a myriad in different ways that can be expressed,

HTTP 1.1 does not allow multiple responses to a request, so the only acceptable option is some collect in the list or at a low level, writing onNext to the output stream for each emission - both of which have difficulties around the content type (EG XML vs. JSON)

This is even more complicated when we present HTTP2, WebSockets and SSE, which can execute several forms of response on request - each time when it needs to be processed differently.

If you want to make multiple outliers, you need to look at WebSockets or SSE.

There are SSE classes in the Spring -Reactive project, so it looks like it is implemented.

E.G.

 @RequestMapping("/sse/event") Flux<SseEvent> sse() { return Flux.interval(Duration.ofMillis(100)).map(l -> { SseEvent event = new SseEvent(); event.setId(Long.toString(l)); event.setData("foo"); event.setComment("bar"); return event; }).take(2); } 

Take a look at the examples below:

https://github.com/spring-projects/spring-reactive/blob/master/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java

Hope this helps

+1
source

All Articles