I am using Spring Integration Kafka to publish and consume messages with Kafka. The payload is transferred correctly from producer to consumer, but the header information gets replaced somewhere along the way. The consumer looks like this:
@ServiceActivator(inputChannel = "fromKafka")
public void processMessage(Message<?> message) throws InterruptedException, ExecutionException {
    try {
        // Print every header that arrived with the consumed message
        System.out.println("Headers :" + message.getHeaders().toString());
    } catch (Exception e) {
        e.printStackTrace();
    }
}
I get the following headers:
Headers :{timestamp=1440013920609, id=f8c645f7-677b-ec32-dad0-a7b79082ef81}
On the producer side, I create the message as follows:
Message<FeelDBMessage> message = MessageBuilder
        .withPayload(samplePayloadObj)
        .setHeader(KafkaHeaders.MESSAGE_KEY, "key")
        .setHeader(KafkaHeaders.TOPIC, "sampleTopic")
        .build();
and these are the headers on the producer side:
headers={timestamp=1440013914085, id=c4159c1c-2c67-634b-ef8d-3fb026b1172e, kafka_messageKey=key, kafka_topic=sampleTopic}
Any idea why the headers are replaced with different values on the consumer side?
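To make the symptom easier to reason about, here is a minimal sketch in plain Java that uses no Spring or Kafka classes. It rests on two assumptions that are not confirmed for this setup: that the Kafka version in use transports only a key and a value (record headers were added to Kafka later, in 0.11), and that the consumer side builds a brand-new message, which therefore gets a fresh id and timestamp. `SimpleMessage`, `WireRecord`, `toWire` and `fromWire` are hypothetical names made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

public class HeaderLossSketch {

    // Hypothetical stand-in for a framework message: a payload plus headers.
    static final class SimpleMessage {
        final String payload;
        final Map<String, Object> headers;

        SimpleMessage(String payload, Map<String, Object> customHeaders) {
            this.payload = payload;
            this.headers = new LinkedHashMap<>(customHeaders);
            // Every new message instance gets its own id and timestamp.
            this.headers.put("id", UUID.randomUUID());
            this.headers.put("timestamp", System.currentTimeMillis());
        }
    }

    // Hypothetical stand-in for the record on the wire: key and value only.
    static final class WireRecord {
        final String key;
        final String value;

        WireRecord(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Producer side: only the key header and the payload make it into the record.
    static WireRecord toWire(SimpleMessage message) {
        return new WireRecord((String) message.headers.get("kafka_messageKey"), message.payload);
    }

    // Consumer side: a new message is built from the record, with no custom headers.
    static SimpleMessage fromWire(WireRecord record) {
        return new SimpleMessage(record.value, Map.of());
    }

    public static void main(String[] args) {
        SimpleMessage sent = new SimpleMessage("payload",
                Map.of("kafka_messageKey", "key", "kafka_topic", "sampleTopic"));
        SimpleMessage received = fromWire(toWire(sent));

        System.out.println("Producer headers: " + sent.headers);
        System.out.println("Consumer headers: " + received.headers);
    }
}
```

Under those assumptions the consumer-side message carries only a new id and timestamp, which matches the output above. If that is what is happening here, any value the consumer needs would have to travel inside the payload or the key.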