Browse the scala api and various links on the Internet.
This is the solution I found:
Maven Dependencies:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.7</version>
</dependency>
The code:
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
public class KafkaJavaExample {
public static void main(String[] args) {
String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
int sessionTimeoutMs = 10 * 1000;
int connectionTimeoutMs = 8 * 1000;
ZkClient zkClient = new ZkClient(
zookeeperConnect,
sessionTimeoutMs,
connectionTimeoutMs,
ZKStringSerializer$.MODULE$);
boolean isSecureKafkaCluster = false;
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
String topic = "my-topic";
int partitions = 2;
int replication = 3;
Properties topicConfig = new Properties();
AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
zkClient.close();
}
}
If you're wondering why the code below is not like Java:
ZKStringSerializer$.MODULE$
This is because ZkStringSerializer is a scala Object. You can read more about this here:
Kafka ZKStringSerializer Java?
. ZkClient ZKStringSerializer.
, createTopic()
( : ).
Zookeeper .
bin/kafka-topics.sh --list --zookeeper localhost:2181
.
, .
bin/kafka-topics.sh
, ZKStringSerializer $.MODULE $.
:
Kafka IDE API -the-ide-using-api
Chee Loong,