kafka-python: producer cannot connect

kafka-python (1.0.0) throws an error when connecting to a broker. At the same time, /usr/bin/kafka-console-producer and /usr/bin/kafka-console-consumer work fine.

The Python application used to work fine as well, but after ZooKeeper was restarted it can no longer connect.

I am using the bare-bones example from the docs:

    from kafka import KafkaProducer
    from kafka.common import KafkaError

    producer = KafkaProducer(bootstrap_servers=['hostname:9092'])

    # Asynchronous by default
    future = producer.send('test-topic', b'raw_bytes')

I get this error:

    Traceback (most recent call last):
      File "pp.py", line 4, in <module>
        producer = KafkaProducer(bootstrap_servers=['hostname:9092'])
      File "/usr/lib/python2.6/site-packages/kafka/producer/kafka.py", line 246, in __init__
        self.config['api_version'] = client.check_version()
      File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 629, in check_version
        connect(node_id)
      File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 592, in connect
        raise Errors.NodeNotReadyError(node_id)
    kafka.common.NodeNotReadyError: 0
    Exception AttributeError: "'KafkaProducer' object has no attribute '_closed'" in <bound method KafkaProducer.__del__ of <kafka.producer.kafka.KafkaProducer object at 0x7f6171294c50>> ignored

While stepping through /usr/lib/python2.6/site-packages/kafka/client_async.py, I noticed that the condition on line 270 evaluates to false:

    270 if not self._metadata_refresh_in_progress and not self.cluster.ttl() == 0:
    271     if self._can_send_request(node_id):
    272         return True
    273 return False

In my case, self._metadata_refresh_in_progress is False, but self.cluster.ttl() returns 0.

At the same time, the kafka-console-* tools happily push messages around:

    /usr/bin/kafka-console-producer --broker-list hostname:9092 --topic test-topic
    hello again
    hello2

Any tips?

+12
apache-kafka kafka-python
8 answers

I had the same problem, and none of the solutions above worked. Then I read the exception message, and it seems that api_version has to be specified explicitly, so:

 producer = KafkaProducer(bootstrap_servers=['localhost:9092'],api_version=(0,1,0)) 

Note: the tuple (1, 0, 0) corresponds to Kafka version 1.0.0.

It works fine (at least it completes without an exception; now I just need to convince it to accept messages ;))
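
If the broker version is known, the tuple can be derived from the version string instead of being typed by hand. A minimal sketch, assuming a plain dotted version such as "1.0.0"; parse_api_version and make_producer are hypothetical helpers for illustration, not part of kafka-python:

```python
def parse_api_version(version):
    """Turn a dotted version string such as '1.0.0' into a tuple of ints."""
    return tuple(int(part) for part in version.strip().split("."))


def make_producer(bootstrap_servers, broker_version):
    """Build a KafkaProducer with an explicit api_version.

    kafka-python is imported inside the function so that the parsing
    helper above can be used without the library installed.
    """
    from kafka import KafkaProducer
    return KafkaProducer(bootstrap_servers=bootstrap_servers,
                         api_version=parse_api_version(broker_version))
```

Calling make_producer(['localhost:9092'], '1.0.0') requires kafka-python to be installed; the version string should be the one your broker actually runs.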

+20

I had a similar problem. In my case, the broker's host name could not be resolved on the client side. Try setting advertised.host.name explicitly in the broker's configuration file.
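
One way to check this from the client machine is to ask the system resolver directly. A minimal sketch using only the standard library; can_resolve is a hypothetical helper name:

```python
import socket


def can_resolve(hostname):
    """Return True if this machine can resolve hostname to an address."""
    try:
        socket.getaddrinfo(hostname, None)
        return True
    except (socket.gaierror, UnicodeError):
        # gaierror: the resolver found nothing; UnicodeError: malformed name
        return False
```

Run it on the client with the host name the broker advertises; a False result points to DNS or /etc/hosts rather than to Kafka itself.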

+8

A host can have multiple DNS aliases. Any of these will work for the ssh or ping test. However, the kafka connection must use an alias that matches advertised.host.name in the broker's server.properties file.

I had used a different alias in the bootstrap_servers parameter, hence the error. After I changed the call to use the advertised.host.name value, the problem was resolved.
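
The comparison described above can be sketched as follows. This is an illustration under assumptions: both function names are hypothetical, the properties text is passed in as a string, and only plain host:port values are handled (no IPv6 literals):

```python
def read_properties(text):
    """Parse 'key = value' lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def bootstrap_matches_advertised(bootstrap_server, properties_text):
    """True if the host part of 'host:port' equals advertised.host.name."""
    host = bootstrap_server.rsplit(":", 1)[0]
    advertised = read_properties(properties_text).get("advertised.host.name")
    return advertised is not None and host.lower() == advertised.lower()
```

Feed it the contents of the broker's server.properties and the string you pass to bootstrap_servers; a False result means the client is using a different alias, or the property is not set at all.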

+3

I had the same problem.

I solved it using user3503929's hint.

The Kafka server was installed on Windows.

server.properties:

    ...
    host.name = 0.0.0.0
    ...

    producer = KafkaProducer(bootstrap_servers='192.168.1.3:9092', value_serializer=str.encode)
    producer.send('test', value='aaa')
    producer.close()
    print("DONE.")

The Kafka command-line clients had no problems. However, when I sent to a topic using kafka-python on Ubuntu, a NoBrokersAvailable exception was raised.

Add the following parameter to server.properties:

    ...
    advertised.host.name = 192.168.1.3
    ...

With that change, the same code works. I spent three hours on this.

thanks

+2

I had a similar problem; what helped me was removing the port from bootstrap_servers.

    consumer = KafkaConsumer('my_topic',
                             #group_id='x',
                             bootstrap_servers='kafka.com')
+1

Install kafka-python using pip install kafka-python

Steps to create a Kafka data pipeline:
1. Launch ZooKeeper with the shell command, or install zookeeperd with

    sudo apt-get install zookeeperd

This runs ZooKeeper as a daemon; by default it listens on port 2181.

2. Launch the Kafka server.
3. Run the producer.py and consumer.py scripts on separate consoles to view the data in real time.

Here are the commands to run:

    cd kafka-directory
    ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
    ./bin/kafka-server-start.sh ./config/server.properties

Now that you have ZooKeeper and the Kafka server running, run the producer.py and consumer.py scripts.

producer.py:

    from kafka import KafkaProducer
    import time

    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    topic = 'simple-text'

    # Implement file reading or any log reader here and put the result in `lines`.
    lines = []

    for line in lines:
        lst = line.split(" ")
        try:
            # Lines with fewer than 14 space-separated fields raise IndexError and are skipped.
            final_list = [lst[x] for x in range(14)]
            # Values must be bytes unless a value_serializer is configured.
            producer.send(topic, final_list[0].encode('utf-8')).get(timeout=10)
        except IndexError as e:
            print(e)
            continue

consumer.py:

    from kafka import KafkaConsumer

    topic = 'simple-text'
    consumer = KafkaConsumer(topic, bootstrap_servers=['localhost:9092'])

    for message in consumer:
        # message value and key are raw bytes -- decode if necessary!
        # e.g., for unicode: `message.value.decode('utf-8')`
        # print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
        #                                      message.offset, message.key,
        #                                      message.value))
        print(message)

Now run producer.py and consumer.py in separate terminals to see the data in real time.

Note: the producer.py script above runs only once. To keep it running forever, wrap it in a while loop and use the time module to pause between passes.
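
The note above can be sketched as a polling loop. This is a minimal sketch under assumptions: read_new_lines and send are hypothetical callables that you supply (for example a log reader and a small wrapper around producer.send), and max_rounds exists only so the loop can be stopped, for instance in a test:

```python
import time


def run_forever(read_new_lines, send, interval=1.0, max_rounds=None):
    """Repeatedly fetch new lines and pass each one to send().

    Loops until max_rounds polls have been made (forever when it is None)
    and returns the number of lines sent.
    """
    sent = 0
    rounds = 0
    while max_rounds is None or rounds < max_rounds:
        for line in read_new_lines():
            send(line)
            sent += 1
        rounds += 1
        time.sleep(interval)  # pause between polls to avoid busy-waiting
    return sent
```

With max_rounds left at None the function never returns, which is the "run forever" behaviour; stop it with Ctrl+C.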

+1

In the server.properties file, make sure the listener IP address is set to an IP address of your machine that the remote machine can reach. The default is localhost.

Update this line in server.properties:

 listeners=PLAINTEXT://<Your-IP-address>:9092 

Also, make sure you do not have a firewall that blocks other IP addresses from reaching you. If you have sudo privileges, try disabling the firewall:

 sudo systemctl stop firewalld 
0

I know this problem and its solution. If ping and ssh already work, try connecting to the server with telnet.
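
Where telnet is not installed, the same reachability test can be done from Python 3. A minimal sketch using only the standard library; port_is_open is a hypothetical helper name:

```python
import socket


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
    except OSError:
        # Covers refused connections, timeouts, and resolution failures.
        return False
    sock.close()
    return True
```

For example, port_is_open('hostname', 9092) run from the client machine; False while ping and ssh succeed suggests that something between the two machines is blocking the port.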

I think the firewall is blocking your messages. Try disabling the firewall. If your messages then get through, adjust the firewall rules instead of leaving it disabled.

 sudo ufw disable 

But this is a rather crude solution.

-2