Kafka Quickstart: What dependencies do I need?

I'm working through the Kafka quickstart:

http://kafka.apache.org/07/quickstart.html

and the basic Consumer Group example:

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

I coded up the Consumer and ConsumerThreadPool classes as described there:

    import kafka.consumer.KafkaStream;
    import kafka.consumer.ConsumerIterator;

    public class Consumer implements Runnable {

        private KafkaStream m_stream;
        private Integer m_threadNumber;

        public Consumer(KafkaStream a_stream, Integer a_threadNumber) {
            m_threadNumber = a_threadNumber;
            m_stream = a_stream;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            while (it.hasNext()) {
                System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
            }
            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }

One other detail: I'm using Spring to manage my ZooKeeper configuration:

    import javax.inject.Named;
    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    @ComponentScan("com.truecar.inventory.worker.core")
    public class AppConfig {

        @Bean
        @Named("consumerConfig")
        private static ConsumerConfig createConsumerConfig() {
            String zookeeperAddress = "127.0.0.1:2181";
            String groupId = "inventory";
            Properties props = new Properties();
            props.put("zookeeper.connect", zookeeperAddress);
            props.put("group.id", groupId);
            props.put("zookeeper.session.timeout.ms", "400");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            return new ConsumerConfig(props);
        }
    }

I'm building with Maven and the OneJar Maven plugin. However, when I compile and then run the resulting one-jar, I get the following error:

    Aug 26, 2013 6:15:41 PM org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider registerDefaultFilters
    INFO: JSR-330 'javax.inject.Named' annotation found and supported for component scanning
    Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at com.simontuffs.onejar.Boot.run(Boot.java:340)
        at com.simontuffs.onejar.Boot.main(Boot.java:166)
    Caused by: java.lang.NoClassDefFoundError: scala/ScalaObject
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:792)
        at com.simontuffs.onejar.JarClassLoader.defineClass(JarClassLoader.java:803)
        at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:710)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.getDeclaredMethods0(Native Method)
        at java.lang.Class.privateGetDeclaredMethods(Class.java:2521)
        at java.lang.Class.getDeclaredMethods(Class.java:1845)
        at org.springframework.core.type.StandardAnnotationMetadata.getAnnotatedMethods(StandardAnnotationMetadata.java:180)
        at org.springframework.context.annotation.ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClassParser.java:222)
        at org.springframework.context.annotation.ConfigurationClassParser.processConfigurationClass(ConfigurationClassParser.java:165)
        at org.springframework.context.annotation.ConfigurationClassParser.parse(ConfigurationClassParser.java:140)
        at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:282)
        at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:223)
        at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:630)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:461)
        at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73)
        at com.truecar.inventory.worker.core.consumer.ConsumerThreadPool.<clinit>(ConsumerThreadPool.java:31)
        at com.truecar.inventory.worker.core.application.Starter.main(Starter.java:20)
        ... 6 more
    Caused by: java.lang.ClassNotFoundException: scala.ScalaObject
        at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:713)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 27 more

Now, I know little about Kafka and nothing about Scala. How do I fix this? What should I try next? Is this a known issue? Do I need other dependencies? Here is the Kafka version in my pom.xml:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.2</artifactId>
        <version>0.8.0-beta1</version>
    </dependency>

Update: I contacted the Kafka dev mailing list, and they pointed me to some specific version requirements for the Scala dependencies. However, there is also an undocumented log4j dependency, which leads to a different exception, again at runtime rather than at compile time:

    Exception in thread "main" java.lang.reflect.InvocationTargetException
    Caused by: java.lang.NoSuchMethodError: ch.qos.logback.classic.Logger.filterAndLog(Ljava/lang/String;Lorg/slf4j/Marker;Lch/qos/logback/classic/Level;Ljava/lang/String;[Ljava/lang/Object;Ljava/lang/Throwable;)V
        at org.apache.log4j.Category.log(Category.java:333)
        at org.apache.commons.logging.impl.Log4JLogger.debug(Log4JLogger.java:177)
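The trace above shows `org.apache.log4j.Category` calling into `ch.qos.logback.classic.Logger`, which suggests (this is an inference from the frames, not something the thread confirms) that `Category` was loaded from some jar other than log4j itself. A minimal sketch for checking which jar a class actually came from, using only the JDK; the class name `WhereLoaded` and its methods are made up for illustration:

```java
import java.security.CodeSource;

/** Hypothetical helper: reports the jar or directory a class was loaded from. */
final class WhereLoaded {

    /** Returns the code source location of an already loaded class. */
    static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            // JDK classes loaded by the bootstrap loader have no code source
            return "(bootstrap or unknown)";
        }
        return source.getLocation().toString();
    }

    /** Looks the class up by name first; does not run its static initializers. */
    static String locationOf(String className) {
        try {
            ClassLoader loader = WhereLoaded.class.getClassLoader();
            return locationOf(Class.forName(className, false, loader));
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace above
        String[] names = {
            "org.apache.log4j.Category",
            "ch.qos.logback.classic.Logger"
        };
        for (String name : names) {
            System.out.println(name + " <- " + locationOf(name));
        }
    }
}
```

Run from inside the packaged application, this prints one location per class. What that location looks like under the One-JAR class loader may differ from a plain classpath, so treat the output as a hint rather than a definitive answer.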

Another update:

I found the correct log4j dependency:

    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>

But now I've hit an even more cryptic exception:

    Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at com.simontuffs.onejar.Boot.run(Boot.java:340)
        at com.simontuffs.onejar.Boot.main(Boot.java:166)
    Caused by: java.lang.NoClassDefFoundError: org/I0Itec/zkclient/IZkStateListener
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
        at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
        at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)

At that point I had a real WTF moment. So I added another dependency:

    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.3</version>
    </dependency>

But this revealed another runtime exception:

    Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at com.simontuffs.onejar.Boot.run(Boot.java:340)
        at com.simontuffs.onejar.Boot.main(Boot.java:166)
    Caused by: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
        at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:146)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:113)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
        at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
        at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)

I was hoping to get this baby example running; maybe this is the price of using beta products? Maybe I should switch to Apache Active MQ. But that doesn’t sound so much fun. Did I miss something?

+7
java noclassdeffounderror scala maven log4j
3 answers

The problem is that the Kafka beta was built in such a way that the POM published alongside the jar is invalid, so Maven cannot parse it and fails to resolve the transitive dependencies. We worked around this by declaring all the dependencies from that POM (Scala, zkclient, etc.) explicitly in our own POM. We are waiting for the next Kafka beta releases, in which the problem should be fixed.
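Because the transitive dependencies are not resolved, each missing jar only surfaces as a separate `NoClassDefFoundError` at runtime. A minimal sketch, assuming you want to check all of them in one go: it probes the classpath for the class names that appear in the question's stack traces. The class `ClasspathCheck` is hypothetical, and the list of names is taken from those traces, not from any official Kafka dependency list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: reports which classes cannot be loaded from the classpath. */
final class ClasspathCheck {

    /** Returns the subset of classNames that the current class loader cannot load. */
    static List<String> missing(List<String> classNames) {
        List<String> notFound = new ArrayList<String>();
        ClassLoader loader = ClasspathCheck.class.getClassLoader();
        for (String name : classNames) {
            try {
                // initialize=false: load the class without running static initializers
                Class.forName(name, false, loader);
            } catch (ClassNotFoundException | LinkageError e) {
                notFound.add(name);
            }
        }
        return notFound;
    }

    public static void main(String[] args) {
        // One representative class per jar, copied from the stack traces in the question
        List<String> required = Arrays.asList(
                "scala.ScalaObject",
                "org.apache.log4j.Category",
                "org.I0Itec.zkclient.IZkStateListener",
                "com.yammer.metrics.core.Gauge");
        List<String> notFound = missing(required);
        if (notFound.isEmpty()) {
            System.out.println("All probed classes are on the classpath.");
        } else {
            System.out.println("Missing classes: " + notFound);
        }
    }
}
```

Packaged into the same jar as the application and run as its main class, this lists every probed class that is still missing instead of failing on the first one.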

A complete list of dependencies is given below. Note that you need to change the Scala version so that it matches the suffix of your Kafka artifact (for example, kafka_2.9.2 needs Scala 2.9.2).

    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.15</version>
        <exclusions>
            <exclusion>
                <groupId>com.sun.jmx</groupId>
                <artifactId>jmxri</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.sun.jdmk</groupId>
                <artifactId>jmxtools</artifactId>
            </exclusion>
            <exclusion>
                <groupId>javax.jms</groupId>
                <artifactId>jms</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>net.sf.jopt-simple</groupId>
        <artifactId>jopt-simple</artifactId>
        <version>3.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.6.4</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.3</version>
    </dependency>
    <dependency>
        <groupId>com.yammer.metrics</groupId>
        <artifactId>metrics-core</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.yammer.metrics</groupId>
        <artifactId>metrics-annotation</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.easymock</groupId>
        <artifactId>easymock</artifactId>
        <version>3.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest</artifactId>
        <version>1.2</version>
        <scope>test</scope>
    </dependency>
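The note about matching the Scala version to the artifact suffix can be sketched as a small check. This assumes the suffix is whatever follows the last underscore of the artifactId, as in kafka_2.9.2; the class `ScalaSuffix` and its method names are hypothetical, and the prefix rule for shorter suffixes is an assumption of this sketch rather than something stated in the thread.

```java
/** Hypothetical helper: compares a Kafka artifactId suffix with a scala-library version. */
final class ScalaSuffix {

    /** Returns the text after the last '_' of the artifactId, e.g. "kafka_2.9.2" gives "2.9.2". */
    static String scalaVersionOf(String artifactId) {
        int idx = artifactId.lastIndexOf('_');
        if (idx < 0 || idx == artifactId.length() - 1) {
            throw new IllegalArgumentException("No Scala suffix in artifactId: " + artifactId);
        }
        return artifactId.substring(idx + 1);
    }

    /**
     * True when the scala-library version equals the suffix, or (assumption)
     * extends it with a further ".x" component.
     */
    static boolean matches(String kafkaArtifactId, String scalaLibraryVersion) {
        String suffix = scalaVersionOf(kafkaArtifactId);
        return scalaLibraryVersion.equals(suffix)
                || scalaLibraryVersion.startsWith(suffix + ".");
    }

    public static void main(String[] args) {
        System.out.println(matches("kafka_2.9.2", "2.9.2")); // prints true
        System.out.println(matches("kafka_2.9.2", "2.8.0")); // prints false
    }
}
```

With the question's kafka_2.9.2 artifact, the scala-library and scala-compiler entries in the list above would therefore need version 2.9.2 rather than 2.8.0.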

Regarding:

Maybe I should switch to Apache Active MQ. But that doesn’t sound so much fun. Did I miss something?

Well, don't forget that this is a beta release. Some bad things do happen, really, but we are currently running Kafka 0.7 without any trouble.

+9

I found that this dependency configuration works:

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>3.2.4.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>3.2.4.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.0-beta1</version>
        </dependency>
        <dependency>
            <groupId>javax.inject</groupId>
            <artifactId>javax.inject</artifactId>
            <version>1</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.3</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>
+3

It works:

    $ git clone https://github.com/buildlackey/cep
    $ cd cep/kafka-0.8.x
    $ mvn package
    $ mvn exec:java -Dexec.mainClass=TestKafkaProducer

(via "Where can I find the Maven repository for Kafka?")

0
