Kafka Spring Integration: Headers Not Suitable for Kafka Consumers

I am using Kafka Spring Integration to publish and consume messages using kafka. I see that Payload is correctly transferred from producer to consumer, but the header information is redefined somewhere.

@ServiceActivator(inputChannel = "fromKafka") public void processMessage(Message<?> message) throws InterruptedException, ExecutionException { try { System.out.println("Headers :" + message.getHeaders().toString()); } } catch (Exception e) { e.printStackTrace(); } } 

I get the following headers:

 Headers :{timestamp=1440013920609, id=f8c645f7-677b-ec32-dad0-a7b79082ef81} 

I create a message at the end of the producer as follows:

 Message<FeelDBMessage> message = MessageBuilder .withPayload(samplePayloadObj) .setHeader(KafkaHeaders.MESSAGE_KEY, "key") .setHeader(KafkaHeaders.TOPIC, "sampleTopic").build(); // publish the message publisher.publishMessage(message); 

and below - manufacturer's header information:

  headers={timestamp=1440013914085, id=c4159c1c-2c67-634b-ef8d-3fb026b1172e, kafka_messageKey=key, kafka_topic=sampleTopic} 

Any idea why the headers are overridden by a different value?

+4
source share
1 answer

Just because, by default, the Framework uses immutable GenericMessage .

Any manipulation of an existing message (e.g. MessageBuilder.withPayload ) will create a new instance of GenericMessage .

Kafka, on the other hand, does not support headers abstraction, such as JMS or AMQP. This is why KafkaProducerMessageHandler simply does this when it posts a message to Kafka:

 this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload()); 

As you can see, it does not send headers at all. Thus, the other side (consumer) simply deals with message from a topic like payload and some system parameters like headers , such as topic , partition , messageKey .

In a nutshell: we do not pass the headings over Kafka, because they do not support them.

+5
source

All Articles