Discard null stream events before sending to the channel

I read in the Flume book that if the event returns null in the interceptor method, the event will be deleted. Therefore, I created a special interceptor that, based on the condition, returns the event as null, for example:

public Event intercept(Event event) {
    // TODO Auto-generated method stub
    Event finalEvent = event;
    check = new String(event.getBody(),Charsets.UTF_8);

    if(check.matches("([0-9]-.+?-.+?-[0-9][0-9]+)")){

        try {
            fileWriter.append(new String(event.getBody(),Charsets.UTF_8)+ "\n");
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        finalEvent = null;
    }
    System.out.println("Event is : " + finalEvent);
    return finalEvent;
}

The interceptor emits a null event, but the file channel still passes it to the HDFS receiver as empty. Why doesn’t the event drop out? I use the Spooling directory as a source.

+4
source share
2 answers

intercept(Event event) , ( ), null , , intercept(List<Event> events) - , . intercept(List<Event> events):

public List<Event> intercept(List<Event> events) 
    {
          List<Event> interceptedEvents = new ArrayList<Event>(events.size());
          for (Event event : events) 
          {
              // Intercept any event
              Event interceptedEvent = intercept(event);
              if(interceptedEvent!=null)
                  interceptedEvents.add(interceptedEvent);
          }

          return interceptedEvents;
    }
0

, . , processEventBatch (), processEventBatch():

events = interceptorChain.intercept(events);//use your custom interceptor
...
eventQueue.add(event); // add user event to queue,even the event == null

processEvent(), : processEvent():

event = interceptorChain.intercept(event);
if (event == null) {
  //null event then return !! intercept works!!
  return;
}

modiry processEventBatch() :

if (event == null){
    //dont add to eventQueue
}
+2

All Articles