I am trying to connect a stream with n sub-flows. To do that, I build a Source from an outlet of a Broadcast. However, this throws UnsupportedOperationException: cannot replace the shape of the EmptyModule. I tried googling this exception, but I could not find anything similar.
Here is my code:
    val aggFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
      val broadcast = builder.add(Broadcast[MonitoringMetricEvent](2))
      val bc = builder.add(Broadcast[Long](1))
      val zip = builder.add(ZipWith[StreamMeasurement, Long, (StreamMeasurement, Long)]((value, ewma) => (value, ewma)))
      val merge = builder.add(Merge[Seq[StreamMeasurement]](1))

      broadcast.out(1) ~> identityFlow ~> maxFlow ~> bc

      val source = Source.fromGraph(GraphDSL.create() { implicit bl =>
        SourceShape(bc.out(0))
      })

      broadcast.out(0) ~> identityFlow ~> topicFlow.groupBy(MAX_SUB_STREAMS, _._1)
        .map(_._2)
        .zip[Long](source)
        .takeWhile(deciderFunction)
        .map(_._1)
        .fold[Seq[StreamMeasurement]](Seq.empty[StreamMeasurement])((seq, sm) => seq :+ sm)
        .mergeSubstreams ~> merge

      FlowShape(broadcast.in, merge.out)
    })
And here is the exception I get:
    Exception in thread "main" java.lang.ExceptionInInitializerError
        at xxx$.main(Processor.scala:80)
        at xxx.Processor.main(Processor.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
    Caused by: java.lang.UnsupportedOperationException: cannot replace the shape of the EmptyModule
        at akka.stream.impl.StreamLayout$EmptyModule$.replaceShape(StreamLayout.scala:322)
        at akka.stream.scaladsl.GraphApply$class.create(GraphApply.scala:18)
        at akka.stream.scaladsl.GraphDSL$.create(Graph.scala:801)
        at xxx.logic$$anonfun$22.apply(logic.scala:156)
        at xxx.logic$$anonfun$22.apply(logic.scala:146)
        at akka.stream.scaladsl.GraphApply$class.create(GraphApply.scala:17)
        at akka.stream.scaladsl.GraphDSL$.create(Graph.scala:801)
        at xxx.logic$.<init>(logic.scala:146)
        at xxx.logic$.<clinit>(logic.scala)
        ... 7 more
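As far as I can tell from the trace, the failure comes from the nested GraphDSL.create() call (logic.scala:156): nothing is added to the inner builder bl, so its module is still empty, and the returned SourceShape refers to bc.out(0), a port that belongs to the outer builder. For comparison, here is a minimal sketch of an inner Source graph that does build without this error, because the inner builder owns the stage behind the returned outlet. It is only an illustration under assumptions, not a fix for the flow above: the names InnerSourceSketch and firstThree and the Source.repeat(1L) stand-in are hypothetical, and the ActorMaterializer setup assumes the Akka 2.4/2.5-style API.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{GraphDSL, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical sketch: an inner graph whose builder owns the port it exposes.
object InnerSourceSketch {

  // The inner builder adds a stage of its own, so its module is not empty
  // and the SourceShape is built from an outlet that this builder knows.
  val source: Source[Long, NotUsed] =
    Source.fromGraph(GraphDSL.create() { implicit b =>
      // Stand-in for the real stream of Long values (an assumption).
      val values = b.add(Source.repeat(1L))
      SourceShape(values.out)
    })

  // Runs the source and collects its first three elements.
  def firstThree(): Seq[Long] = {
    implicit val system: ActorSystem = ActorSystem("inner-source-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try Await.result(source.take(3).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(firstThree())
}
```

This does not feed the result of maxFlow into each substream, which is what I actually need; it only shows the shape of an inner graph that GraphDSL.create() accepts.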
The key issue can be found here: akka-stream Zipping Flows with SubFlows