I am writing a custom decorator plugin for Flume, Cloudera's distributed log aggregation system. My Java code is below:
    package multiplex;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import com.cloudera.flume.conf.Context;
    import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder;
    import com.cloudera.flume.core.Event;
    import com.cloudera.flume.core.EventImpl;
    import com.cloudera.flume.core.EventSink;
    import com.cloudera.flume.core.EventSinkDecorator;
    import com.cloudera.util.Pair;
    import com.google.common.base.Preconditions;

    public class JsonMultiplexDecorator<S extends EventSink> extends EventSinkDecorator<S> {
        private final String serverName;
        private final String logType;

        public JsonMultiplexDecorator(S s, String serverName, String logType) {
            super(s);
            this.serverName = serverName;
            this.logType = logType;
        }

        @Override
        public void append(Event e) throws IOException {
            String body = new String(e.getBody()).replaceAll("\"", "\\\"");

            String json = "{ \"server\": \"" + this.serverName + "\","
                    + "\"log_type\": \"" + this.logType + "\", "
                    + "\"body\": \"" + body + "\" }";

            EventImpl e2 = new EventImpl(json.getBytes(), e.getTimestamp(),
                    e.getPriority(), e.getNanos(), e.getHost(), e.getAttrs());

            super.append(e2);
        }

        public static SinkDecoBuilder builder() {
            return new SinkDecoBuilder() {
                @Override
                public EventSinkDecorator<EventSink> build(Context context, String... argv) {
                    Preconditions.checkArgument(argv.length == 2,
                            "usage: multiplexDecorator(serverName, logType)");

                    return new JsonMultiplexDecorator<EventSink>(null, argv[0], argv[1]);
                }
            };
        }

        public static List<Pair<String, SinkDecoBuilder>> getDecoratorBuilders() {
            List<Pair<String, SinkDecoBuilder>> builders =
                    new ArrayList<Pair<String, SinkDecoBuilder>>();
            builders.add(new Pair<String, SinkDecoBuilder>("jsonMultiplexDecorator", builder()));
            return builders;
        }
    }
This compiles into a JAR file using ant, and I can load it into Flume at runtime and successfully configure nodes to use it. However, when an event actually passes through a node that has this plugin loaded, I get errors like the following in my log:
    2010-10-19 21:03:15,176 [logicalNode xxxxx] ERROR connector.DirectDriver: Driving src/sink failed! LazyOpenSource | LazyOpenDecorator because null
    java.lang.UnsupportedOperationException
        at java.util.Collections$UnmodifiableMap.put(Collections.java:1285)
        at com.cloudera.flume.core.EventBaseImpl.set(EventBaseImpl.java:65)
        at com.cloudera.flume.handlers.rolling.RollSink.append(RollSink.java:164)
        at com.cloudera.flume.agent.diskfailover.DiskFailoverDeco.append(DiskFailoverDeco.java:93)
        at com.cloudera.flume.core.BackOffFailOverSink.append(BackOffFailOverSink.java:144)
        at com.cloudera.flume.agent.AgentSink.append(AgentSink.java:109)
        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:58)
        at multiplex.JsonMultiplexDecorator.append(JsonMultiplexDecorator.java:56)
        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:58)
        at com.cloudera.flume.handlers.debug.LazyOpenDecorator.append(LazyOpenDecorator.java:69)
        at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:92)
([logicalNode xxxxx] is a placeholder for the EC2 internal DNS name.) I have little Java experience, so I'm not sure whether I am doing something wrong here or whether this is a Flume bug. It is worth mentioning that I wrote this using the HelloWorld plugin examples from the Flume source, and by borrowing from some of the built-in Flume decorators.
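As far as I can tell, the top two frames of the trace say that something downstream (EventBaseImpl.set, called from RollSink.append) tried to put() an entry into a map wrapped by Collections.unmodifiableMap. The stand-alone sketch below reproduces just that JDK behaviour without any Flume classes. It rests on an assumption I have not verified: that the attribute map my decorator passes along via e.getAttrs() is such a read-only view. The class name, the method name, and the "rolltag" key are all made up for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableAttrsDemo {

    // Tries to add an entry (hypothetical key "rolltag") and reports
    // whether the map rejected the write.
    public static boolean putIsRejected(Map<String, byte[]> attrs) {
        try {
            attrs.put("rolltag", new byte[0]);
            return false;
        } catch (UnsupportedOperationException ex) {
            // Same exception type as at the top of the stack trace.
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, byte[]> original = new HashMap<String, byte[]>();
        original.put("service", "web".getBytes());

        // Read-only view: every mutating call throws.
        Map<String, byte[]> readOnly = Collections.unmodifiableMap(original);

        // Independent mutable copy of the same entries.
        Map<String, byte[]> copy = new HashMap<String, byte[]>(readOnly);

        System.out.println("read-only view rejects put: " + putIsRejected(readOnly)); // true
        System.out.println("HashMap copy rejects put: " + putIsRejected(copy));       // false
    }
}
```

If that assumption holds, handing EventImpl a HashMap copy of the attributes instead of e.getAttrs() directly might avoid the exception, but I don't know whether that is the intended way to build a new event in Flume.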