HDFS File Watcher

Can I have a file watcher on HDFS?

Scenario: files land on HDFS continuously. I want to start a Spark job once a threshold is reached (either a number of files or a total size of files).

Is it possible to implement a file watcher on HDFS to achieve this? If so, can anyone suggest a way to do it? What are the different options? Can Zookeeper or Oozie do this?

Any help would be appreciated. Thanks.

hadoop hdfs apache-spark file-watcher
4 answers

Hadoop 2.6 introduced DFSInotifyEventInputStream, which you can use for this. You can get an instance of it from HdfsAdmin and then just call .take() or .poll() to receive event batches. Event types include create, append, rename, and unlink (delete), which should cover what you are looking for.

Here is a basic example. Make sure you run it as the hdfs user, as the admin interface requires HDFS superuser privileges.

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.Event.CreateEvent;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;

public class HdfsWatcher {

    public static void main(String[] args) throws IOException, InterruptedException, MissingEventsException {
        // args[0] is the HDFS URI, e.g. hdfs://namenode:8020
        HdfsAdmin admin = new HdfsAdmin(URI.create(args[0]), new Configuration());
        DFSInotifyEventInputStream eventStream = admin.getInotifyEventStream();
        while (true) {
            // take() blocks until at least one event batch is available
            EventBatch events = eventStream.take();
            for (Event event : events.getEvents()) {
                System.out.println("event type = " + event.getEventType());
                switch (event.getEventType()) {
                    case CREATE:
                        CreateEvent createEvent = (CreateEvent) event;
                        System.out.println("  path = " + createEvent.getPath());
                        break;
                    default:
                        break;
                }
            }
        }
    }
}
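To connect this to the original question, here is a minimal sketch of a file-count threshold trigger built on the same event stream. The watched directory, the threshold value, and launchSparkJob() are hypothetical placeholders, not part of the original answer:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.Event.CreateEvent;
import org.apache.hadoop.hdfs.inotify.EventBatch;

public class ThresholdWatcher {

    private static final String WATCH_DIR = "/data/incoming/"; // hypothetical directory
    private static final int THRESHOLD = 100;                  // hypothetical file-count threshold

    public static void main(String[] args) throws Exception {
        HdfsAdmin admin = new HdfsAdmin(URI.create(args[0]), new Configuration());
        DFSInotifyEventInputStream eventStream = admin.getInotifyEventStream();
        int fileCount = 0;
        while (fileCount < THRESHOLD) {
            EventBatch events = eventStream.take();
            for (Event event : events.getEvents()) {
                if (event.getEventType() == Event.EventType.CREATE) {
                    CreateEvent create = (CreateEvent) event;
                    // count only plain files created under the watched directory
                    if (create.getPath().startsWith(WATCH_DIR)
                            && create.getiNodeType() == CreateEvent.INodeType.FILE) {
                        fileCount++;
                    }
                }
            }
        }
        launchSparkJob(); // placeholder: e.g. via SparkLauncher or a shell command
    }

    private static void launchSparkJob() {
        System.out.println("threshold reached - start the Spark job here");
    }
}

Note that this only counts events observed while the watcher is running; for a size-based threshold you could periodically call FileSystem.getContentSummary() on the directory instead (see the Oozie answer below for that approach).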

Here is a blog post that covers it in more detail:

http://johnjianfang.blogspot.com/2015/03/hdfs-6634-inotify-in-hdfs.html?m=1


Yes, you can do this with HDFS inotify. You just need to read the HDFS transaction details through the inotify event stream; the first answer shows a working example.


An Oozie Coordinator can do this. Oozie coordinator actions can be triggered based on data availability: write a data-dependency coordinator whose actions fire when a done-flag is set. A done-flag is nothing but an empty file. So, when your threshold is reached, write an empty flag file into the directory and the coordinator will launch your workflow (e.g. the Spark job); see the sketch below.
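As a minimal, hypothetical sketch (the directory, thresholds, and _DONE flag name are assumptions, not from the original answer), here is a small helper that checks a directory's file count and total size and writes the empty done-flag once either threshold is crossed:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DoneFlagWriter {

    public static void main(String[] args) throws Exception {
        // hypothetical values: adjust to your layout and thresholds
        Path dir = new Path("hdfs://namenode:8020/data/incoming");
        long fileThreshold = 100;
        long sizeThreshold = 10L * 1024 * 1024 * 1024; // 10 GB

        FileSystem fs = FileSystem.get(dir.toUri(), new Configuration());
        ContentSummary summary = fs.getContentSummary(dir);

        // write the empty done-flag once either threshold is reached;
        // the coordinator waiting on this flag will then trigger
        if (summary.getFileCount() >= fileThreshold || summary.getLength() >= sizeThreshold) {
            fs.create(new Path(dir, "_DONE")).close();
        }
        fs.close();
    }
}

You would run this check periodically (for example from cron or another coordinator) and point the coordinator dataset's <done-flag> element at _DONE.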


Old thread... in case someone wants to do it in Scala:

import java.net.URI
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.client.HdfsAdmin
import org.apache.hadoop.hdfs.inotify.Event.{AppendEvent, CreateEvent, RenameEvent}

object HDFSTest extends App {

  val admin = new HdfsAdmin(URI.create("hdfs://namenode:port"), new Configuration())
  val eventStream = admin.getInotifyEventStream()

  while (true) {
    // poll() returns null if no events arrived within the timeout
    val events = eventStream.poll(2L, TimeUnit.SECONDS)
    if (events != null) {
      events.getEvents.asScala.foreach { event =>
        println(s"event type = ${event.getEventType}")
        event match {
          case create: CreateEvent => println("CREATE: " + create.getPath)
          case rename: RenameEvent => println("RENAME: " + rename.getSrcPath + " => " + rename.getDstPath)
          case append: AppendEvent => println("APPEND: " + append.getPath)
          case other => println("other: " + other)
        }
      }
    }
  }
}

In case someone wants to run it as an impersonated user, set the environment variable HADOOP_USER_NAME=user-name.
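For example, a hypothetical invocation (classpath and class name are placeholders):

 HADOOP_USER_NAME=hdfs java -cp "$(hadoop classpath):." HdfsWatcher hdfs://namenode:8020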

