Kafka KStreams - Processing Time

I am trying to use <KStream>.process() with TimeWindows.of("name", 30000) to collect some KTable values and send them on. It seems that 30 seconds exceeds the consumer timeout interval, after which Kafka considers the consumer dead and releases the partition.

I tried increasing the poll interval and the commit interval to avoid this:

 config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
 config.put(StreamsConfig.POLL_MS_CONFIG, "5000");

Unfortunately, these errors still occur:

(many of them)

 ERROR oakspinternals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog
 org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0

Followed by these:

 INFO oakcciAbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
 WARN oakspinternals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]:
 org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)

Clearly, I need to send heartbeats to the server more often. How?

My topology:

 KStreamBuilder kStreamBuilder = new KStreamBuilder();
 KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
 KTable<Windowed<String>, String> kt = lines.aggregateByKey(
         new DBAggregateInit(),
         new DBAggregate(),
         TimeWindows.of("write_aggregate2", 30000));
 DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();
 kt.toStream().process(dbProcessorSupplier);
 KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
 kafkaStreams.start();

The KTable groups values by key every 30 seconds. In Processor.init(), I call context.schedule(30000).

DBProcessorSupplier provides an instance of DBProcessor. This is an implementation of AbstractProcessor in which all the overrides are provided. All they do is log, so I know when each one is called.

This is a fairly simple topology, but it is clear that I am missing a step somewhere.


Edit:

I understand that I can configure this on the server side, but I am hoping for a client-side solution. I like that partitions become available fairly quickly when a client exits or dies.


Edit:

In an attempt to simplify the problem, I removed the aggregation step from the graph. Now it is just consumer -> processor(). (If I send the consumer directly to .print(), it works quickly, so I know that part is fine.) (Similarly, if I output the aggregation (KTable) via .print(), that also looks fine.)

I found that .process(), which should call .punctuate() every 30 seconds, actually blocks for variable lengths of time and produces output somewhat randomly (if at all).

Further:

I set the log level to 'debug' and reran. I see a lot of messages like this:

 DEBUG oakspinternals.StreamTask - Start processing one record [ConsumerRecord <info> 

but a breakpoint in the .punctuate() function is never hit. So it is doing a lot of work, but never gives me a chance to use it.

1 answer

A few clarifications:

  • StreamsConfig.COMMIT_INTERVAL_MS_CONFIG is a lower bound on the commit interval, i.e. after a commit, the next commit happens no earlier than this time. Basically, Kafka Streams tries to commit as soon as possible after this time has passed, but there is no guarantee how long it will actually take to perform the next commit.
  • StreamsConfig.POLL_MS_CONFIG is used for the internal KafkaConsumer#poll() call, to specify the maximum blocking time of that poll() call.

Thus, neither value helps to send heartbeats more often.

Kafka Streams follows a depth-first strategy when processing records. This means that after a poll(), all operators of the topology are executed for each record in turn. Suppose you have three consecutive map operations: all three will be called for the first record before the next (second) record is processed.
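To make the ordering concrete, here is a toy model of depth-first processing. It is only a sketch and does not use Kafka Streams at all; the class name DepthFirstDemo, the record names r1/r2 and the operator names map1..map3 are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: this is NOT Kafka Streams code, just an illustration of the visiting order.
class DepthFirstDemo {

    // Pushes each record through every operator before moving on to the next record,
    // and returns the order in which (operator, record) pairs were visited.
    static List<String> trace(List<String> records, List<String> operatorNames) {
        List<String> visited = new ArrayList<>();
        for (String record : records) {          // outer loop: one record at a time
            for (String op : operatorNames) {    // inner loop: the whole operator chain
                visited.add(op + "(" + record + ")");
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        // Hypothetical batch of two records and a chain of three map operations.
        List<String> order = trace(
                Arrays.asList("r1", "r2"),
                Arrays.asList("map1", "map2", "map3"));
        // prints [map1(r1), map2(r1), map3(r1), map1(r2), map2(r2), map3(r2)]
        System.out.println(order);
    }
}
```

The second record is not touched until the first one has passed through the whole chain, which is why a slow operator delays everything behind it.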

Thus, the next poll() call will be made only after all records of the previous poll() have been fully processed. If you want heartbeats to be sent more often, you need to make sure that a single poll() call fetches fewer records, so that processing all of them takes less time and the next poll() happens sooner.
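As a rough back-of-the-envelope check, the gap between two poll() calls can be estimated as the number of records per poll times the processing time per record. The sketch below assumes a constant per-record cost; the class name and all numbers (100 ms per record, a 30 second session timeout, batch sizes of 500 and 100) are hypothetical and only illustrate the arithmetic.

```java
// Rough estimate only; real processing times vary from record to record.
class PollGapEstimate {

    // Worst-case time between two poll() calls when every record of a batch
    // must be fully processed before the next poll().
    static long worstCaseGapMs(int recordsPerPoll, long perRecordMs) {
        return (long) recordsPerPoll * perRecordMs;
    }

    // True if the estimated gap is longer than the session timeout,
    // i.e. the consumer would be considered dead before it polls again.
    static boolean exceedsSessionTimeout(int recordsPerPoll, long perRecordMs, long sessionTimeoutMs) {
        return worstCaseGapMs(recordsPerPoll, perRecordMs) > sessionTimeoutMs;
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 30_000; // hypothetical session.timeout.ms
        long perRecordMs = 100;         // hypothetical processing cost per record

        // 500 records * 100 ms = 50,000 ms, longer than the timeout
        System.out.println(exceedsSessionTimeout(500, perRecordMs, sessionTimeoutMs));
        // 100 records * 100 ms = 10,000 ms, within the timeout
        System.out.println(exceedsSessionTimeout(100, perRecordMs, sessionTimeoutMs));
    }
}
```

With these made-up numbers, cutting the batch from 500 to 100 records brings the gap back under the timeout without changing the timeout itself.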

To do this, you can use the configuration parameters of KafkaConsumer, which you can specify via StreamsConfig (see https://kafka.apache.org/documentation.html#consumerconfigs):

 streamsConfig.put(ConsumerConfig.XXX, VALUE);

  • max.poll.records: if you decrease this value, fewer records will be fetched per poll()
  • session.timeout.ms: if you increase this value, there is more time to process the data (adding this for completeness, since it is actually a client setting and not a server/broker-side configuration, even if you already know about this solution and do not like it :))
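A minimal sketch of what setting these two parameters could look like. To stay self-contained it uses the plain property names as string keys in a java.util.Properties object instead of the ConsumerConfig constants; the class name and the values 50 and 60000 are hypothetical and would need tuning for a real workload.

```java
import java.util.Properties;

// Sketch only: builds the properties that would be passed on to StreamsConfig / KafkaStreams.
class StreamsConsumerTuning {

    static Properties tunedConfig() {
        Properties streamsConfig = new Properties();
        // Fetch fewer records per poll() so that a batch is processed faster (hypothetical value).
        streamsConfig.put("max.poll.records", "50");
        // Allow more time between poll() calls before the consumer is considered dead (hypothetical value).
        streamsConfig.put("session.timeout.ms", "60000");
        return streamsConfig;
    }

    public static void main(String[] args) {
        Properties config = tunedConfig();
        System.out.println("max.poll.records = " + config.getProperty("max.poll.records"));
        System.out.println("session.timeout.ms = " + config.getProperty("session.timeout.ms"));
    }
}
```

In a real application the same keys would typically be written as ConsumerConfig.MAX_POLL_RECORDS_CONFIG and ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, and the application's other required Streams settings would be added to the same object.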

EDIT

As of Kafka 0.10.1, it is possible (and recommended) to prefix consumer and producer configs within the Streams config. This avoids parameter conflicts, since some parameter names are used by both the consumer and the producer and cannot be distinguished otherwise (they would be applied to the consumer and the producer at the same time). To prefix a parameter, you can use StreamsConfig#consumerPrefix() or StreamsConfig#producerPrefix(), respectively. For example: streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARAMETER), VALUE);
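The effect of prefixing can be sketched without the Kafka library. The two helper methods below are local stand-ins, written on the assumption that StreamsConfig.consumerPrefix() and StreamsConfig.producerPrefix() prepend "consumer." and "producer." to the parameter name. The parameter request.timeout.ms is used because it exists for both clients; the class name and the values are hypothetical.

```java
import java.util.Properties;

// Sketch only: local stand-ins for the StreamsConfig prefix helpers (assumed behavior).
class PrefixedConfigDemo {

    static String consumerPrefix(String name) {
        return "consumer." + name; // assumption: mirrors StreamsConfig.consumerPrefix()
    }

    static String producerPrefix(String name) {
        return "producer." + name; // assumption: mirrors StreamsConfig.producerPrefix()
    }

    static Properties build() {
        Properties streamsConfig = new Properties();
        // The same parameter name, set independently for each client (hypothetical values).
        streamsConfig.put(consumerPrefix("request.timeout.ms"), "40000");
        streamsConfig.put(producerPrefix("request.timeout.ms"), "30000");
        return streamsConfig;
    }

    public static void main(String[] args) {
        Properties config = build();
        System.out.println(config.getProperty("consumer.request.timeout.ms"));
        System.out.println(config.getProperty("producer.request.timeout.ms"));
    }
}
```

Without the prefixes, a single request.timeout.ms entry would be handed to both the consumer and the producer, which is the conflict described above.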

One more thing to add: the scenario described in this question is a known issue, and there is already KIP-62, which introduces a background thread for KafkaConsumer that sends heartbeats, thereby decoupling heartbeats from poll() calls. Kafka Streams will make use of this new feature in upcoming releases.
