Customize elasticsearch apache-flume shell

This is my first time here, so sorry if I don't post the message in order and sorry for my poor English.

I am trying to configure Apache Flume and Elasticsearch streams. Everything is fine, it seems to be working fine, but there are two warnings when I start the agent; the following:

2015-11-16 09:11:22,122 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy: org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } - Exception follows. java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:143) at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77) at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48) at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:357) at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46) at org.apache.flume.SinkRunner.start(SinkRunner.java:79) at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 2015-11-16 09:11:22,137 (lifecycleSupervisor-1-3) [WARN - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:260)] Component SinkRunner: { policy: org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } stopped, since it could not besuccessfully started due to missing dependencies 

My agent configuration:

 # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink ES a1.sinks = k1 a1.sinks.k1.type = elasticsearch a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300 a1.sinks.k1.indexName = items a1.sinks.k1.indexType = item a1.sinks.k1.clusterName = elasticsearch a1.sinks.k1.batchSize = 500 a1.sinks.k1.ttl = 5d a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer a1.sinks.k1.channel = c1 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 

It starts netcat, and everything is fine, but I am afraid for the points of warnings, I do not understand this.

+7
elasticsearch flume
source share
3 answers

I found the reason, it seems that Apache Flume 1.6.0 and Elasticsearch 2.0 cannot communicate correctly.

I found a good third-person sink that I modified.

Here is the link

And this is my final configuration,

 # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink ES a1.sinks = k1 a1.sinks.k1.type = com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink a1.sinks.k1.hostNames = 127.0.0.1:9300 a1.sinks.k1.indexName = items a1.sinks.k1.indexType = item a1.sinks.k1.clusterName = elasticsearch a1.sinks.k1.batchSize = 500 a1.sinks.k1.ttl = 5d a1.sinks.k1.serializer = com.frontier45.flume.sink.elasticsearch2.ElasticSearchDynamicSerializer a1.sinks.k1.indexNameBuilder = com.frontier45.flume.sink.elasticsearch2.TimeBasedIndexNameBuilder a1.sinks.k1.channel = c1 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 

This works for me.

Thanks for answers.

PS yes, I had to move the libraries.

+2
source share

When visiting magazines, there is a problem with some missing dependency.

If you look at the ElasticSearchSink documentation, you will see the following:

The elasticsearch and lucene-core boxes required for your environment must be placed in the lib directory of the Apache Flume installation . Elasticsearch requires that the core version of the client JAR match the core version of the server and that both of them run the same minor version of the JVM. Serialization Exceptions will appear if this is not true. To select the required version, first determine the version of elasticsearch and the version of the JVM on which the target cluster is running. Then select the elasticsearch client library that matches the major version. Client 0.19.x can talk with cluster 0.19.x; 0.20.x can talk with 0.20.x and 0.90.x can talk with 0.90.x. After determining the elasticsearch version, read the pom.xml file to determine the correct version of the lucene-core JAR version to use. The Flume agent that runs ElasticSearchSink must also match the JVM to which the target cluster is running before the minor version.

Most likely, you have not placed the necessary Java banks, or the version is not suitable.

+1
source share

Added below 2 JARs only in flume / lib dir, and it worked, no need to add all other Lucene JARs:

elasticsearch-1.7.1.jar

Lucene-core-4.10.4.jar

run tray command:

 bin/flume-ng agent --conf conf --conf-file conf/flume-aggregator.conf --name agent2 -Dflume.root.logger=INFO,console 

be sure to add below flume-env.sh

 export JAVA_HOME=/usr/java/default export JAVA_OPTS="-Xms3072m -Xmx3072m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" FLUME_CLASSPATH="/usr/flume/flume1.6/apache-flume-1.6.0-bin/;/usr/flume/flume1.6/apache-flume-1.6.0-bin/lib" 

Garbage aggregator configuration for loading data into ES: flume-aggregator.conf

 agent2.sources = source1 agent2.sinks = sink1 agent2.channels = channel1 ################################################ # Describe Source ################################################ # Source Avro agent2.sources.source1.type = avro agent2.sources.source1.bind = 0.0.0.0 agent2.sources.source1.port = 9997 ################################################ # Describe Interceptors ################################################ # an example of nginx access log regex match # agent2.sources.source1.interceptors = interceptor1 # agent2.sources.source1.interceptors.interceptor1.type = regex_extractor # # agent2.sources.source1.interceptors.interceptor1.regex = "^(\\S+) \\[(.*?)\\] \"(.*?)\" (\\S+) (\\S+)( \"(.*?)\" \"(.*?)\")?" # # # agent2.sources.source1.interceptors.interceptor1.regex = ^(.*) ([a-zA-Z\\.\\@\\-\\+_%]+) ([a-zA-Z\\.\\@\\-\\+_%]+) \\[(.*)\\] \\"(POST|GET) ([A-Za-z0-9\\$\\.\\+\\@#%_\\/\\-]*)\\??(.*) (.*)\\" ([a-zA-Z0-9\\.\\/\\s\-]*) (.*) ([0-9]+) ([0-9]+) ([0-9\\.]+) # # agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11 s12 s13 # # agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 # agent2.sources.source1.interceptors.interceptor1.serializers.s1.name = clientip # agent2.sources.source1.interceptors.interceptor1.serializers.s2.name = datetime # agent2.sources.source1.interceptors.interceptor1.serializers.s3.name = method # agent2.sources.source1.interceptors.interceptor1.serializers.s4.name = request # agent2.sources.source1.interceptors.interceptor1.serializers.s5.name = response # agent2.sources.source1.interceptors.interceptor1.serializers.s6.name = status # agent2.sources.source1.interceptors.interceptor1.serializers.s7.name = bytes # agent2.sources.source1.interceptors.interceptor1.serializers.s8.name = requesttime # ################################################ # Describe Sink ################################################ # Sink ElasticSearch # Elasticsearch lib ---> flume/lib # elasticsearch/config/elasticsearch.yml cluster.name clusterName. data/clustername data. agent2.sinks.sink1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink agent2.sinks.sink1.hostNames = 10.20.156.16:9300,10.20.176.20:9300 agent2.sinks.sink1.indexName = pdi agent2.sinks.sink1.indexType = pdi_metrics agent2.sinks.sink1.clusterName = My-ES-CLUSTER agent2.sinks.sink1.batchSize = 1000 agent2.sinks.sink1.ttl = 2 #this serializer is crucial in order to use kibana agent2.sinks.sink1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer ################################################ # Describe Channel ################################################ # Channel Memory agent2.channels.channel1.type = memory agent2.channels.channel1.capacity = 10000000 agent2.channels.channel1.transactionCapacity = 1000 ################################################ # Bind the source and sink to the channel ################################################ agent2.sources.source1.channels = channel1 agent2.sinks.sink1.channel = channel1 
+1
source share

All Articles