It looks like I was setting the wrong properties on the client side, also my server.properties file had properties that were not intended for the client I used. Therefore, I decided to change the java client to version 0.9.0 using maven.
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.9.0.0</version> </dependency>
The server.properties file is shown below.
broker.id=0 port=9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 log.cleaner.enable=false zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=9000 delete.topic.enable=true advertised.host.name=<aws public Ip> advertised.port=9092
My manufacturer code looks like
import java.util.Properties; import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; public class HelloKafkaProducer { public static void main(String args[]) throws InterruptedException, ExecutionException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"IP:9092"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props); boolean sync = false; String topic="loader1"; String key = "mykey"; for(int i=0;i<1000;i++) { String value = "myvaluehasbeensent"+i+i; ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>(topic, key, value); if (sync) { producer.send(producerRecord).get(); } else { producer.send(producerRecord); } } producer.close(); } }
source share