I am trying to use <KStream>.process() with TimeWindows.of("name", 30000) to batch up some KTable values and send them on. It seems that 30 seconds exceeds the consumer session timeout, after which Kafka decides the consumer has died and releases the partition.
I tried increasing the poll frequency and the commit interval to avoid this:
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");
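For completeness, the surrounding setup is roughly this sketch (the application id and broker address are taken from the log output below; the string serdes are an assumption, just what I happen to use):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka_test1");        // shows up as the group name in the logs
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "12.34.56.7:9092"); // broker from the logs
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");
StreamsConfig streamsConfig = new StreamsConfig(config);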
Unfortunately, these errors still occur:
(many of them)
ERROR o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0
Followed by these:
INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask
Clearly, I need to send heartbeats to the server more often. How?
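My best guess, untested, is that consumer-level settings can be passed straight through the streams config, something like the sketch below, though I don't know whether they are actually honored here, or whether a heartbeat can even go out while punctuate() is blocking the poll loop:

import org.apache.kafka.clients.consumer.ConsumerConfig;

// Untested assumption: forward consumer settings through the streams config.
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");    // outlive a 30-second punctuate()
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000"); // heartbeat more often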
My topology:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

// Aggregate values by key into 30-second windows.
KTable<Windowed<String>, String> kt = lines.aggregateByKey(
        new DBAggregateInit(),
        new DBAggregate(),
        TimeWindows.of("write_aggregate2", 30000));

// Feed each windowed aggregate into the custom processor.
DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();
kt.toStream().process(dbProcessorSupplier);

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
The KTable groups values by key every 30 seconds. In Processor.init(), I call context.schedule(30000).
DBProcessorSupplier provides an instance of DBProcessor, an implementation of AbstractProcessor with every override implemented. All they do is LOG, so I know when each one is hit.
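To be concrete, here is a sketch of what DBProcessorSupplier and DBProcessor boil down to (bodies trimmed to just the logging; not the exact code):

import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DBProcessorSupplier implements ProcessorSupplier<Windowed<String>, String> {
    @Override
    public Processor<Windowed<String>, String> get() {
        return new DBProcessor();
    }
}

class DBProcessor extends AbstractProcessor<Windowed<String>, String> {
    private static final Logger LOG = LoggerFactory.getLogger(DBProcessor.class);

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        context.schedule(30000); // request punctuate() every 30 seconds
        LOG.info("init");
    }

    @Override
    public void process(Windowed<String> key, String value) {
        LOG.info("process: key={}", key); // fires per record
    }

    @Override
    public void punctuate(long timestamp) {
        LOG.info("punctuate: ts={}", timestamp); // the call that rarely fires
    }

    @Override
    public void close() {
        LOG.info("close");
    }
}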
This is a fairly simple topology, but clearly I'm missing a step somewhere.
Edit:
I get that I can configure this on the server side, but I'm hoping for a client-side solution. I like that partitions become available again fairly quickly when a client exits/dies.
Edit:
In an attempt to simplify the problem, I removed the aggregation step from the graph. It's now just consume > process(), roughly the sketch below. (If I send the stream directly to .print(), everything runs quickly, so I know it's OK.) (Similarly, if I output the aggregation (KTable) via .print(), that also looks fine.)
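Concretely, the simplified graph is roughly this (with the processor's key type changed from Windowed<String> to plain String to match the source stream):

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

// Straight from the source topic into the processor, no aggregation.
lines.process(new DBProcessorSupplier());

new KafkaStreams(kStreamBuilder, streamsConfig).start();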
I found that .process(), which should call .punctuate() every 30 seconds, actually blocks for varying lengths of time and prints out somewhat randomly (if at all).
Further:
I set the log level to 'debug' and re-ran. I see lots of messages like:
DEBUG o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord <info>
but a breakpoint in the .punctuate() function is never hit. So the thread is doing lots of work, but never gives me a chance to use it.