Storm KafkaSpout has stopped using messages from Kafka Topic

My problem is that Storm KafkaSpout stopped consuming messages from the Kafka theme after a while. When debug is turned on in a storm, I get a log file as follows:

2016-07-05 03: 58: 26.097 oasdtask [INFO] Emit: sleep_pack __metrics [#object [org.apache.storm.metric.api.IMetricsConsumer $ TaskInfo 0x2c35b34f "org.apache.storm.metric.api.IMetricsConsumer $ TaskInfo @ 2c35b34f "] [#object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x798f1e35" [__ack-count = {default = 0}] "] #object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x230867ec "[__sendqueue = {sojourn_time_ms = 0.0, write_pos = 5411461, read_pos = 5411461, overflow = 0, arrival_rate_secs = 0.0, capacity = 1024, population = 0}]"] #object [org.apache. storm.metric.api.IMetricsConsumer $ DataPoint 0x7cdec8eb "[__complete-latency = {default = 0.0}]"] #object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x658fc59 "[__skipped-max-spout = 0 ] "] #object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x3c1f3a50 "[__receive = {sojourn_time_ms = 4790.5, write_pos = 2468305, read_pos = 2468304, overflow = 0, arrival_rate_secs = 0.20874647740319383, capacity = 1024, population = 1}]" rg.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x262d7906 "[__skipped-inactive = 0]"] #object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x73648c7e "[kafkaPartition = {Partition {host = slave103: 9092, topic = package, partition = 12} / fetchAPICallCount = 0, section {host = slave103: 9092, topic = package, partition = 12} / fetchAPILatencyMax = null, section {host = slave103: 9092, topic = package, partition = 12} / lostMessageCount = 0, Partition {host = slave103: 9092, topic = package, partition = 12} / fetchAPILatencyMean = null, Partition {host = slave103: 9092, topic = package,partition = 12} / fetchAPIMessageCount = 0}] "] #object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x4e43df61" [kafkaOffset = {package / totalLatestCompletedOffset = 154305947, package / partition _12 / spoutLag = 82472754 / totalEarliestTimeOffset = 233919465, packet / partition_12 / earliestTimeOffset = 233919465, package / partition _12 / latestEmittedOffset = 154307691, package / partition _12 / latestTimeOffset = 236778701, package / totalLatestEmittedOffset = 154307 6914 latest, total package # 1514304124fted latest = 154304 6914 latest = 236778701, package / totalSpoutLag = 82472754}] "] #object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x49fe816b" [__transfer-count = {__ack_init = 0, default = 0, __metrics = 0}] "] #object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x63e2bdc0 "[__fail-count = {}]"] #object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x3b17bb7b "[__skipped-throttle = 1086120]"] #object [org.apache.storm .metric.api.IMetricsConsumer $ DataPoint 0x1315a68c "[__emit-count = {__ack_init = 0, default = 0, __metrics = 0}]"]]]

2016-07-05 03: 58: 55.042 oasdexecutor [INFO] FOR -2 TUPLE: source: __system: -1, stream: __tick, id: {}, [ 30]

2016-07-05 03: 59: 25.042 oasdexecutor [INFO] FOR -2 TUPLE: source: __system: -1, stream: __tick, id: {}, [ 30]

2016-07-05 03: 59: 25.946 oasdexecutor [INFO] FOR -2 TUPLE: source: __system: -1, stream: __metrics_tick, id: {}, [ 60]

: One KafkaSpout Counter Bolt. , FOR TUPLE ; , . , FOR -2 TUPLE ?

, :

: Red Hat Enterprise Linux Server 7.0 (Maipo)
: 0.10.0.0
Storm: 1.0.1

+4
2

KafkaSpout . .

config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2048);
config.put(Config.TOPOLOGY_BACKPRESSURE_ENABLE, false);
config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);

, 20k-50k 1 . 2048 .

3 node, 4 64 .

200M .

+2
  • , .
  • , Kafka . Telnet.
  • Zookeeper? , Telnet.

: KafkaSpout Kafka

, :

Kafka . , .

, : , , , , .

: Storm-kafka ,

0

All Articles