Kafka consumer does not work from Eclipse

Kafka 0.8.2.2.3 and ZooKeeper are running inside a VM. I was able to run both the producer and the consumer inside the VM using kafka-console-producer.sh and kafka-console-consumer.sh respectively. I was even able to consume Kafka messages from the host machine using kafka-console-consumer.sh. But when I tried to start the consumer from Java in Eclipse, ZooKeeper logged the following error:

2015-06-26 03:06:26,323 - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /192.168.1.12:59549 (no session established for client)
2015-06-26 03:07:26,225 - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /192.168.1.12:59617
2015-06-26 03:07:26,226 - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x0, likely client has closed socket
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
    at java.lang.Thread.run(Thread.java:745)

Below is my Kafka consumer code:

package com.truckevent.producer;


import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;


public class KafkaConsumer {

    public static void main(String[] args) throws Exception {

        String group = "hello" ;


        Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.1.12:2181");
        props.put("group.id", group);
        props.put("zookeeper.session.timeout.ms", "20000");
        props.put("zookeeper.sync.time.ms", "2030");
        props.put("auto.commit.interval.ms", "10000");
        props.put("auto.offset.reset", "smallest");

        ConsumerConfig cf = new ConsumerConfig(props) ;

        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(cf) ;

        String topic = "event" ;

        // Request a single stream (one consumer thread) for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);


        KafkaStream<byte[],byte[]> stream = streams.get(0) ;

        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        int i = 1 ;
        while (it.hasNext()) {

            System.out.println(i + ": " + new String(it.next().message()));
            ++i;
        }
        consumer.shutdown(); 
    }
}

I am not sure why I cannot consume messages from Java code. Kafka listens on port 6667 and ZooKeeper on 2181.

1 answer
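  • First, check that ZooKeeper is listening (netstat -lntp) on 0.0.0.0 and not on localhost (inside the VM) (see clientPortBindAddress, ...)
  • Check the VM's network settings. For example, with Vagrant + VirtualBox, something like config.vm.network :private_network, ip: "192.168.1.12"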
  • , , - , , ( )
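
The reachability checks above can also be run from the host side in Java. This is only a minimal sketch: the class name PortCheck and the method isReachable are made up for illustration, and the address and ports (192.168.1.12, 2181, 6667) are the ones given in the question, so adjust them to your setup.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable: all treated as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: host and ports are taken from the question, not verified.
        String host = args.length > 0 ? args[0] : "192.168.1.12";
        System.out.println("ZooKeeper 2181 reachable: " + isReachable(host, 2181, 2000));
        System.out.println("Kafka 6667 reachable: " + isReachable(host, 6667, 2000));
    }
}
```

A result of true only means the port accepts TCP connections. The ZooKeeper log in the question already shows the connection being accepted and then closed without a session, so a passing check here does not rule out a problem later in the handshake.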