Is there a way to check whether Kafka is up when using kafka-net?

I use the kafka-net client to send messages to Kafka. Is there a way to check whether the Kafka server is up and able to receive messages? When I shut Kafka down, the producer is still created successfully, and SendMessageAsync hangs for quite some time. I tried passing a timeout, but nothing changed. I am using kafka-net 0.9. Everything works fine when the Kafka server is up and running.

2 answers

Each broker registers its ID in ZooKeeper (/brokers/ids/[brokerId]) as an ephemeral node, which allows other brokers and consumers to detect failures. (Currently, the definition of health is rather naive: if a broker is registered under /brokers/ids/[brokerId], it is considered healthy; otherwise it is considered dead.)

The ZooKeeper ephemeral node exists only while the broker's session is active.

You can check whether a broker is running with ZkUtils.getSortedBrokerList(zkClient), which returns the sorted list of all active broker IDs under /brokers/ids:

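import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;

// ZKStringSerializer makes the client read znode data as plain strings.
ZkClient zkClient = new ZkClient(properties.getProperty("zkQuorum"), zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer$.MODULE$);
// IDs of all brokers currently registered under /brokers/ids
ZkUtils.getSortedBrokerList(zkClient);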
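
The health rule described above can be illustrated without a ZooKeeper connection. The following is only a minimal sketch: BrokerRegistry, sortedBrokerIds, and isAlive are hypothetical names, and the list of child names is assumed to be a snapshot of what is registered under /brokers/ids at one moment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper illustrating the health rule; it does not talk to ZooKeeper.
class BrokerRegistry {

    /** Converts the child names of /brokers/ids (e.g. "0", "2") into sorted broker IDs. */
    static List<Integer> sortedBrokerIds(List<String> childNames) {
        List<Integer> ids = new ArrayList<>();
        for (String name : childNames) {
            ids.add(Integer.parseInt(name));
        }
        Collections.sort(ids);
        return ids;
    }

    /** A broker counts as alive only if its ID is currently registered. */
    static boolean isAlive(List<String> childNames, int brokerId) {
        return sortedBrokerIds(childNames).contains(brokerId);
    }

    public static void main(String[] args) {
        // Assumed snapshot: brokers 2 and 0 are registered, broker 1 is not.
        List<String> children = Arrays.asList("2", "0");
        System.out.println(sortedBrokerIds(children)); // prints [0, 2]
        System.out.println(isAlive(children, 1));      // prints false
    }
}
```

Because the nodes are ephemeral, such a snapshot can go stale as soon as a broker's session expires, so the check would need to be repeated (or replaced by a watch) rather than cached.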

Link: Kafka data structures in ZooKeeper


Try this.

In your constructor, put:

options = new KafkaOptions(uri);
var endpoint = new DefaultKafkaConnectionFactory().Resolve(options.KafkaServerUri.First(), options.Log);
client = new KafkaTcpSocket(new DefaultTraceLog(), endpoint);

and then, before sending each message:

// Test whether the broker is alive by requesting metadata for the topic.
var request = new MetadataRequest { Topics = new List<string> { Topic } };
Task<KafkaDataPayload> task = client.WriteAsync(request.Encode());
// Wait returns false if the write has not completed within 30 seconds.
if (!task.Wait(30000))
{
    throw new TimeoutException("Timeout while sending message to kafka broker!");
}
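
A cheaper pre-check is to test whether the broker's port accepts a TCP connection within a timeout. This is a different technique from the metadata request above, and the sketch is in Java rather than C#. BrokerProbe and isReachable are hypothetical names, not part of kafka-net or any Kafka client, and localhost:9092 is only an assumed broker address.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of kafka-net or the Kafka client libraries.
class BrokerProbe {

    /**
     * Returns true if a TCP connection to host:port can be opened within
     * timeoutMillis. This only shows that something is listening on the port,
     * not that the broker can actually accept messages.
     */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable host: treat as unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // localhost:9092 is an assumed broker address; adjust to your setup.
        boolean up = isReachable("localhost", 9092, 3000);
        System.out.println(up ? "broker port is reachable" : "broker port is not reachable");
    }
}
```

A successful connect does not guarantee that a later send will succeed, so the timeout around the send itself is still needed.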

