Creating Topics for Apache Kafka 0.9 Using Java

I am programming a client to work with Kafka 0.9 and want to know how to create a topic. This answer: "How to create a topic in Kafka via Java" is similar to what I am asking. However, that solution only works for Kafka 0.8.2, whose API is very different from the Kafka 0.9 API.

2 answers

I browsed the Scala API and various links on the Internet.

This is the solution I found:

Maven Dependencies:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.0</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.7</version>
</dependency>

The code:

import java.util.Properties;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample {

    public static void main(String[] args) {
        String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
        int sessionTimeoutMs = 10 * 1000;
        int connectionTimeoutMs = 8 * 1000;

        ZkClient zkClient = new ZkClient(
            zookeeperConnect,
            sessionTimeoutMs,
            connectionTimeoutMs,
            ZKStringSerializer$.MODULE$);

        // Security for Kafka was added in Kafka 0.9.0.0
        boolean isSecureKafkaCluster = false;
        // Since Kafka 0.9.0.0, the AdminUtils API takes a ZkUtils instance
        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);

        String topic = "my-topic";
        int partitions = 2;
        // The replication factor cannot exceed the number of live brokers
        int replication = 3;

        // Add topic configuration here
        Properties topicConfig = new Properties();

        AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
        zkClient.close();
    }
}
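
The empty topicConfig in the code above can carry per-topic overrides. Here is a minimal sketch of filling it in, assuming the topic-level keys retention.ms and cleanup.policy. The values and the class name TopicConfigSketch are illustrative and not part of the original answer:

```java
import java.util.Properties;

// Hypothetical helper, not part of the Kafka API.
final class TopicConfigSketch {

    // Builds per-topic overrides that could be passed as the last argument of createTopic.
    static Properties buildTopicConfig() {
        Properties topicConfig = new Properties();
        // Keep messages for one day (the value is in milliseconds).
        topicConfig.setProperty("retention.ms", String.valueOf(24L * 60 * 60 * 1000));
        // Delete old log segments instead of compacting them.
        topicConfig.setProperty("cleanup.policy", "delete");
        return topicConfig;
    }

    public static void main(String[] args) {
        // Print the overrides so they can be checked before creating the topic.
        buildTopicConfig().list(System.out);
    }
}
```

Passing the result of buildTopicConfig() instead of an empty Properties object should apply these settings when the topic is created.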

If you're wondering why the expression below doesn't look like Java:

ZKStringSerializer$.MODULE$

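This is because ZKStringSerializer is a Scala object. The Scala compiler turns an object into a class whose name ends in $, with its single instance stored in the static field MODULE$. You can read more about this here: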

Kafka ZKStringSerializer Java?
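
To make the ZKStringSerializer$.MODULE$ expression less mysterious, here is a rough Java analogue of what the Scala compiler generates for an object. The class StringSerializerSketch$ is hypothetical and written only for illustration; it is not Kafka's actual class:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a compiled Scala object: the class name ends in '$'.
final class StringSerializerSketch$ {

    // The single instance of the object; Java callers reach it through this static field.
    public static final StringSerializerSketch$ MODULE$ = new StringSerializerSketch$();

    // A private constructor keeps the instance unique.
    private StringSerializerSketch$() { }

    // Encodes a string as UTF-8 bytes.
    public byte[] serialize(String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }
}

final class ModuleFieldSketch {
    public static void main(String[] args) {
        // Same access pattern as ZKStringSerializer$.MODULE$ in the answer above.
        byte[] bytes = StringSerializerSketch$.MODULE$.serialize("my-topic");
        System.out.println(bytes.length + " bytes");
    }
}
```

From Java, a Scala object is reached the same way: through the static MODULE$ field of the class with the $ suffix.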

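Note: you must initialize ZkClient with ZKStringSerializer.
Otherwise, createTopic() will only appear to work (in other words: it returns without an error).
The topic will exist only in ZooKeeper, so it shows up when listing topics:

bin/kafka-topics.sh --list --zookeeper localhost:2181

However, Kafka itself does not create the topic. The describe command below, for example, reports an error:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

So make sure you initialize the client with ZKStringSerializer$.MODULE$.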

Reference: How can we create a topic in Kafka from the IDE using API

Chee Loong


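Regarding Chee Loong's answer: with Kafka 0.9.0.1 this is simpler. You do not need to pass ZKStringSerializer yourself. Use the ZkUtils API (instead of constructing a ZkClient directly):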

ZkUtils.apply(
    "zookeeper1:port1,zookeeper2:port2",
    sessionTimeoutMs,
    connectionTimeoutMs,
    false)