Does Spring 5 support the Java 9 Flow API in its reactive stack?

The Spring 5.0.0.RC4 reference documentation says:

Publisher or Flow.Publisher - Any type supporting Reactive Streams Publisher is supported.

https://docs.spring.io/spring/docs/5.0.0.RC4/spring-framework-reference/reactive-web.html#webflux

But when I created a simple project based on Spring 5.0.0.RC4, it failed when a controller method returned a Flow.Publisher. It seems that Flow.Publisher cannot be serialized by Jackson.

    org.springframework.core.codec.CodecException: Type definition error: [simple type, class java.util.concurrent.ForkJoinPool$DefaultForkJoinWorkerThreadFactory]; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class java.util.concurrent.ForkJoinPool$DefaultForkJoinWorkerThreadFactory and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: java.util.concurrent.SubmissionPublisher["executor"]->java.util.concurrent.ForkJoinPool["factory"])
        at org.springframework.http.codec.json.AbstractJackson2Encoder.encodeValue(AbstractJackson2Encoder.java:132)
        at org.springframework.http.codec.json.AbstractJackson2Encoder.lambda$encode$0(AbstractJackson2Encoder.java:96)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
        at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:91)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156)
        at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onSubscribe(ChannelSendOperator.java:143)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
        at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:68)
        at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
        at org.springframework.http.server.reactive.ChannelSendOperator.subscribe(ChannelSendOperator.java:76)
        at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1068)
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:72)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1068)
        at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:290)
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1625)
        at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onSubscribe(MonoIgnoreThen.java:279)
        at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:161)
        at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:53)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148)
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)
        at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74)
        at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74)
        at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:271)
        at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:798)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1625)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1439)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1313)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
        at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
        at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)
        at reactor.core.publisher.Mono.subscribe(Mono.java:2757)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:418)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:210)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:91)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:55)
        at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121)
        at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:40)
        at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44)
        at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60)
        at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60)
        at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
        at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
        at reactor.core.publisher.Mono.subscribe(Mono.java:2757)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:167)
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)
        at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
        at reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61)
        at reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:380)
        at reactor.ipc.netty.http.server.HttpServerOperations.onHandlerStart(HttpServerOperations.java:354)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at java.base/java.lang.Thread.run(Thread.java:844)
    Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class java.util.concurrent.ForkJoinPool$DefaultForkJoinWorkerThreadFactory and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: java.util.concurrent.SubmissionPublisher["executor"]->java.util.concurrent.ForkJoinPool["factory"])
        at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
        at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1191)
        at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:312)
        at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.failForEmpty(UnknownSerializer.java:71)
        at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.serialize(UnknownSerializer.java:33)
        at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:727)
        at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:719)
        at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:155)
        at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:727)
        at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:719)
        at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:155)
        at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
        at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
        at com.fasterxml.jackson.databind.ObjectWriter$Prefetch.serialize(ObjectWriter.java:1396)
        at com.fasterxml.jackson.databind.ObjectWriter._configAndWriteValue(ObjectWriter.java:1120)
        at com.fasterxml.jackson.databind.ObjectWriter.writeValue(ObjectWriter.java:950)
        at org.springframework.http.codec.json.AbstractJackson2Encoder.encodeValue(AbstractJackson2Encoder.java:129)
        ... 65 common frames omitted

Update 1: full source code for Java 9.

    @GetMapping
    public Flow.Publisher<Post> all() {
        SubmissionPublisher<Post> publisher = new SubmissionPublisher<>();
        publisher.submit(new Post(1L, "post one", "content of post one"));
        publisher.submit(new Post(2L, "post two", "content of post two"));
        return publisher;
    }

Update 2: In the Spring 5.0.0.RELEASE reference documentation, I can no longer find that statement.

Update 3: After updating to Spring 5.0.2 (managed by Spring Boot 2.0.0.M7), the code runs without any exceptions, but the request hangs when the endpoint is accessed. Check out my updated code examples.
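
One possible contributor to the hang, offered as an assumption rather than a confirmed diagnosis: a SubmissionPublisher only delivers items to subscribers that are already attached when submit() is called, and it only signals completion once close() is called. The controller above submits before anything has subscribed and never closes the publisher. The sketch below uses only the JDK to show that behavior in isolation; the class name SubmissionPublisherDemo and the Collector helper are hypothetical, and String items stand in for Post.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class SubmissionPublisherDemo {

    /** Hypothetical helper: records received items and whether a terminal signal arrived. */
    static final class Collector implements Flow.Subscriber<String> {
        final List<String> items = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { items.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }

        /** Waits up to the given time for onComplete/onError; returns false if none arrived. */
        boolean awaitDone(long millis) {
            try {
                return done.await(millis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Mirrors the controller: submit first, subscribe later. The items have no receiver. */
    static List<String> submitThenSubscribe() {
        Collector collector = new Collector();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.submit("post one");
        publisher.submit("post two");
        publisher.subscribe(collector);
        publisher.close(); // signals onComplete to the late subscriber
        collector.awaitDone(5_000);
        return collector.items;
    }

    /** Subscribe first, then submit and close: the subscriber receives every item. */
    static List<String> subscribeThenSubmit() {
        Collector collector = new Collector();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(collector);
        publisher.submit("post one");
        publisher.submit("post two");
        publisher.close();
        collector.awaitDone(5_000);
        return collector.items;
    }

    /** Without close() no terminal signal is sent, so a consumer waiting for the end waits forever. */
    static boolean completesWithoutClose() {
        Collector collector = new Collector();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(collector);
        publisher.submit("post one");
        return collector.awaitDone(500);
    }

    public static void main(String[] args) {
        System.out.println(submitThenSubscribe());   // prints []
        System.out.println(subscribeThenSubmit());   // prints [post one, post two]
        System.out.println(completesWithoutClose()); // prints false
    }
}
```

Whether this is what actually happens inside WebFlux depends on when the framework subscribes to the returned publisher, which the question does not show.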

+8
spring java-9 spring-webflux
3 answers

Actually, this is not supported at the moment.

Watch issue SPR-16052 to find out when it becomes available.

+5

If you read the first line of the stack trace carefully, you will see that the encoder cannot serialize an object of type java.util.concurrent.ForkJoinPool, which is referenced from SubmissionPublisher. Indeed, threads (and therefore thread pools) cannot be serialized. You can avoid the direct reference to ForkJoinPool by using a proxy executor:

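    // Wrap the common pool in a lambda so the publisher holds no direct ForkJoinPool reference.
    Executor proxyExecutor = command -> ForkJoinPool.commonPool().execute(command);
    // SubmissionPublisher has no single-argument constructor; a buffer capacity is required too.
    SubmissionPublisher<Post> publisher = new SubmissionPublisher<>(proxyExecutor, Flow.defaultBufferSize());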
0

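How about this:

    @RestController
    public class Controller {

        @GetMapping("/one")
        Flow.Publisher<String> one() {
            // JdkFlowAdapter (reactor.adapter) bridges a Reactive Streams Publisher to a Flow.Publisher.
            return JdkFlowAdapter.publisherToFlowPublisher(Flux.just("one", "two"));
        }
    }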
0