We have a Trident topology that consumes and produces kafa using storm kafa OpaqueTridentKafkaSpout and TridentKafkaState. Everything works fine when it runs on our cluster of production storms, but when starting in local mode, we often get the following error:
java.lang.RuntimeException: java.lang.RuntimeException: Received unexpected tuple source: $mastercoord-bg1:2, stream: $commit, id: {-4957901903366351898=6364388931843393707}, [1:0]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at backtype.storm.daemon.executor$fn__4606$fn__4619$fn__4670.invoke(executor.clj:806) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at backtype.storm.util$async_loop$fn__543.invoke(util.clj:475) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
Caused by: java.lang.RuntimeException: Received unexpected tuple source: $mastercoord-bg1:2, stream: $commit, id: {-4957901903366351898=6364388931843393707}, [1:0]
at storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:144) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at backtype.storm.daemon.executor$fn__4606$tuple_action_fn__4608.invoke(executor.clj:668) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at backtype.storm.daemon.executor$mk_task_receiver$fn__4529.invoke(executor.clj:424) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at backtype.storm.disruptor$clojure_handler$reify__1229.onEvent(disruptor.clj:58) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
... 6 common frames omitted
We also see this error as part of the same error:
java.lang.RuntimeException: org.apache.storm.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /errors/<topology-name>/<bolt-name>-last-error
at backtype.storm.util$wrap_in_runtime.invoke(util.clj:48) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at backtype.storm.zookeeper$create_node.invoke(zookeeper.clj:92) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at backtype.storm.cluster$mk_distributed_cluster_state$reify__2234.set_data(cluster.clj:104) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at backtype.storm.cluster$mk_storm_cluster_state$reify__2774.report_error(cluster.clj:450) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at backtype.storm.daemon.executor$throttled_report_error_fn$fn__4385.invoke(executor.clj:191) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at backtype.storm.daemon.executor$mk_executor_data$fn__4439$fn__4440.invoke(executor.clj:253) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at backtype.storm.util$async_loop$fn__543.invoke(util.clj:485) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at clojure.lang.AFn.run(AFn.java:22) ~[clojure-1.6.0.jar:na]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_31]
Caused by: org.apache.storm.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /errors/<topology-name>/<bolt-name>-last-error
at org.apache.storm.zookeeper.KeeperException.create(KeeperException.java:119) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at org.apache.storm.zookeeper.KeeperException.create(KeeperException.java:51) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at org.apache.storm.zookeeper.ZooKeeper.create(ZooKeeper.java:783) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at org.apache.storm.curator.framework.imps.CreateBuilderImpl$11.call(CreateBuilderImpl.java:676) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at org.apache.storm.curator.framework.imps.CreateBuilderImpl$11.call(CreateBuilderImpl.java:660) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at org.apache.storm.curator.RetryLoop.callWithRetry(RetryLoop.java:107) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at org.apache.storm.curator.framework.imps.CreateBuilderImpl.pathInForeground(CreateBuilderImpl.java:656) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at org.apache.storm.curator.framework.imps.CreateBuilderImpl.protectedPathInForeground(CreateBuilderImpl.java:441) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at org.apache.storm.curator.framework.imps.CreateBuilderImpl.forPath(CreateBuilderImpl.java:431) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at org.apache.storm.curator.framework.imps.CreateBuilderImpl$3.forPath(CreateBuilderImpl.java:239) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at org.apache.storm.curator.framework.imps.CreateBuilderImpl$3.forPath(CreateBuilderImpl.java:193) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_31]
at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0_31]
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.6.0.jar:na]
at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) ~[clojure-1.6.0.jar:na]
at backtype.storm.zookeeper$create_node.invoke(zookeeper.clj:91) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
... 7 common frames omitted
Currently, the latest storm is being used, 0.10.0-SNAPSHOT, built from github , but the same problem was related to the stable 0.9.3 release. Any help would be greatly appreciated, thanks.
source
share