Spring-Integration-Kafka outbound-channel-adapter: sending a message

Using Spring-Integration-Kafka with an outbound channel adapter, I am trying to send messages to a topic called "test".

From the command line, I started ZooKeeper and Kafka and created a topic called "test".

Spring XML Configuration

<int:publish-subscribe-channel id="inputToKafka" />

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    kafka-template="template"
                                    sync="true"
                                    topic="test">
</int-kafka:outbound-channel-adapter>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>

JUnit Verification Code

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {
        "classpath:kafka-outbound-context.xml"
        })
public class ProducerTest{

    @Autowired
    @Qualifier("inputToKafka")
    MessageChannel channel;

    @Test
    public void test_send_message() {

        channel.send(MessageBuilder.withPayload("Test Message")
                .setHeader(KafkaHeaders.TOPIC, "test").build());

    }

}

The test case succeeds, and when debugging I can see that channel.send() returns true.

I then check the topic from the command line with the command below, but I do not see any messages in the test topic.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Can someone explain why I don't see any messages in my test topic?


With this configuration, creating the producer fails with:

Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.

In Java configuration, the serializers are set like this:

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

So you need to specify both key.serializer and value.serializer in the producer configuration.
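As a rough sketch of what the complete producer configuration might look like, the map below uses plain string keys, which are the same keys the `<map>` in the question's XML takes, so each `put` corresponds to one `<entry key="..." value="..." />`. The class name `ProducerProps` and the method `producerProps` are made up for illustration. The sketch assumes String keys and payloads, which is why Kafka's `StringSerializer` is named for both; it is given as a fully qualified class name string so the example compiles without the Kafka client on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class, not part of Spring or Kafka.
public class ProducerProps {

    // Builds the producer configuration with the two serializer entries
    // that the exception reports as missing.
    static Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        // Assumption: keys and values are Strings, so StringSerializer fits both.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the entries so they can be copied into the XML <map>.
        producerProps().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

A map like this would be passed as the constructor argument of `DefaultKafkaProducerFactory`, in place of the single-entry map in the question.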
