Trident Storm Topology with Kafka: Unexpected Tuple Error Received

We have a Trident topology that consumes and produces kafa using storm kafa OpaqueTridentKafkaSpout and TridentKafkaState. Everything works fine when it runs on our cluster of production storms, but when starting in local mode, we often get the following error:

java.lang.RuntimeException: java.lang.RuntimeException: Received unexpected tuple source: $mastercoord-bg1:2, stream: $commit, id: {-4957901903366351898=6364388931843393707}, [1:0]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at backtype.storm.daemon.executor$fn__4606$fn__4619$fn__4670.invoke(executor.clj:806) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at backtype.storm.util$async_loop$fn__543.invoke(util.clj:475) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
Caused by: java.lang.RuntimeException: Received unexpected tuple source: $mastercoord-bg1:2, stream: $commit, id: {-4957901903366351898=6364388931843393707}, [1:0]
    at storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:144) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at backtype.storm.daemon.executor$fn__4606$tuple_action_fn__4608.invoke(executor.clj:668) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__4529.invoke(executor.clj:424) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at backtype.storm.disruptor$clojure_handler$reify__1229.onEvent(disruptor.clj:58) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    ... 6 common frames omitted

We also see this error as part of the same error:

java.lang.RuntimeException: org.apache.storm.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /errors/<topology-name>/<bolt-name>-last-error
    at backtype.storm.util$wrap_in_runtime.invoke(util.clj:48) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at backtype.storm.zookeeper$create_node.invoke(zookeeper.clj:92) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at backtype.storm.cluster$mk_distributed_cluster_state$reify__2234.set_data(cluster.clj:104) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at backtype.storm.cluster$mk_storm_cluster_state$reify__2774.report_error(cluster.clj:450) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at backtype.storm.daemon.executor$throttled_report_error_fn$fn__4385.invoke(executor.clj:191) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at backtype.storm.daemon.executor$mk_executor_data$fn__4439$fn__4440.invoke(executor.clj:253) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at backtype.storm.util$async_loop$fn__543.invoke(util.clj:485) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at clojure.lang.AFn.run(AFn.java:22) ~[clojure-1.6.0.jar:na]
    at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_31]
Caused by: org.apache.storm.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /errors/<topology-name>/<bolt-name>-last-error
    at org.apache.storm.zookeeper.KeeperException.create(KeeperException.java:119) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at org.apache.storm.zookeeper.KeeperException.create(KeeperException.java:51) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at org.apache.storm.zookeeper.ZooKeeper.create(ZooKeeper.java:783) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at org.apache.storm.curator.framework.imps.CreateBuilderImpl$11.call(CreateBuilderImpl.java:676) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at org.apache.storm.curator.framework.imps.CreateBuilderImpl$11.call(CreateBuilderImpl.java:660) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at org.apache.storm.curator.RetryLoop.callWithRetry(RetryLoop.java:107) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at org.apache.storm.curator.framework.imps.CreateBuilderImpl.pathInForeground(CreateBuilderImpl.java:656) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at org.apache.storm.curator.framework.imps.CreateBuilderImpl.protectedPathInForeground(CreateBuilderImpl.java:441) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at org.apache.storm.curator.framework.imps.CreateBuilderImpl.forPath(CreateBuilderImpl.java:431) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at org.apache.storm.curator.framework.imps.CreateBuilderImpl$3.forPath(CreateBuilderImpl.java:239) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at org.apache.storm.curator.framework.imps.CreateBuilderImpl$3.forPath(CreateBuilderImpl.java:193) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_31]
    at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0_31]
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.6.0.jar:na]
    at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) ~[clojure-1.6.0.jar:na]
    at backtype.storm.zookeeper$create_node.invoke(zookeeper.clj:91) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    ... 7 common frames omitted

Currently, the latest storm is being used, 0.10.0-SNAPSHOT, built from github , but the same problem was related to the stable 0.9.3 release. Any help would be greatly appreciated, thanks.

+4
source share
2 answers

LocalCluster, , Spouts .

TestTopology: Spout1 → Bolt1 → Bolt2...

DummyTopology: Spout1 → Bolt1 → Bolt2...

KeeperException, Storm , Spout , ..

:

TestTopology: TestTopology-Spout1 → Bolt1 → Bolt2...

DummyTopology: DummyTopology-Spout1 → Bolt1 → Bolt2...

0

KeeperException$NodeExistsException, 2 ID. ID .

-1

All Articles