Wrap file watcher in reactive extensions

I looked at the wrapper of the file observer in the observable helper during event handling, but I have some problems figuring out how to get the behavior I want from it. The file watcher watches the directory where the files are placed. When a file is first placed in this directory, the generated event is fired in the file watcher. However, if the file is large or a slow network connection, the series of updates changes when the file is updated. I don’t want to process the file until it finishes writing, so I really need this timeline

|Created |Changed |Changed |Changed ________________________________________________ ^Write starts ^Write finishes ^Processing Starts 

I looked at several methods of filtering events in Rx, but I could not get what I need, that "runs the function after the file the file has not been changed for X seconds". The throttle is not suitable, as it will lose events in the middle. The buffer is not suitable, because events can occur at the boundary of the buffer.

I thought about using timeouts, but I wasn’t crazy that they threw an exception, and I wanted the processing to start with the files being written, and there were never more events.

There is a similar issue in Reactive Extensions vs FileSystemWatcher that has never really been resolved.

Is there any way that I can do this easily? I am sure this is not a common use case.

+7
source share
4 answers

Take a look at my BufferWithInactivity extension method on this answer .

I think you could use it to look for inactivity in changed events.

+1
source

EDIT: after watching, don’t think you want this ...

Mayhap I'm simplifying a bit, but could Throttle be here?

This is by no means β€œsimple,” but I think it does what you want closer than my previous idea:

(bonus: with a test case !;))

 void Main() { var pathToWatch = @"c:\temp\"; var fsw = new FileSystemWatcher(pathToWatch); // set up observables for create and changed var changedObs = Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>( dlgt => fsw.Changed += dlgt, dlgt => fsw.Changed -= dlgt); var createdObs = Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>( dlgt => fsw.Created += dlgt, dlgt => fsw.Created -= dlgt); // the longest we'll wait between last file write and calling it "changed" var maximumTimeBetweenWrites = TimeSpan.FromSeconds(1); // A "pulse" ticking off every 10ms (adjust this as desired) var timer = Observable .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(10)) .Select(i => DateTime.Now); var watcher = from creation in createdObs from change in changedObs // we only care about changes matching a create .Where(changeEvt => changeEvt.EventArgs.Name == creation.EventArgs.Name) // take latest of (pulse, changes) and select (event, time since last file write) .CombineLatest(timer, (evt, now) => new { Change = evt, DeltaFromLast = now.Subtract(new FileInfo(evt.EventArgs.FullPath).LastWriteTime)}) // skip all until we trigger than "time before considered changed" threshold .SkipWhile(evt => evt.DeltaFromLast < maximumTimeBetweenWrites) // Then lock on that until we change a diff file .Distinct(evt => evt.Change.EventArgs.FullPath) select change.Change; var disp = new CompositeDisposable(); // to show creates disp.Add( createdObs.Subscribe( evt => Console.WriteLine("New file:{0}", evt.EventArgs.FullPath))); // to show "final changes" disp.Add( watcher.Subscribe( evt => Console.WriteLine("{0}:{1}:{2}", evt.EventArgs.Name, evt.EventArgs.ChangeType, evt.EventArgs.FullPath))); fsw.EnableRaisingEvents = true; var rnd = new Random(); Enumerable.Range(0,10) .AsParallel() .ForAll(i => { var filename = Path.Combine(pathToWatch, "foo" + i + ".txt"); if(File.Exists(filename)) File.Delete(filename); foreach(var j in Enumerable.Range(0, 20)) { var writer = File.AppendText(filename); writer.WriteLine(j); writer.Close(); Thread.Sleep(rnd.Next(500)); } }); Console.WriteLine("Press enter to quit..."); Console.ReadLine(); disp.Dispose(); } 
+2
source

ObservableFileSystemWatcher - the observed wrapper around the FileSystemWatcher type - works fine. Add a NuGet package named ReactiveFileSystemWatcher and create a console application for testing as follows

 class Program { static void Main(string[] args) { using (var watcher = new ObservableFileSystemWatcher(c => { c.Path = @"C:\FolderToWatch\"; c.IncludeSubdirectories = true; })) { watcher.Created.Select(x => $"{x.Name} was {x.ChangeType}").Subscribe(Console.WriteLine); watcher.Changed.Select(x => $"{x.Name} was {x.ChangeType}").Subscribe(Console.WriteLine); watcher.Renamed.Select(x => $"{x.OldName} was {x.ChangeType} to {x.Name}").Subscribe(Console.WriteLine); watcher.Deleted.Select(x => $"{x.Name} was {x.ChangeType}").Subscribe(Console.WriteLine); watcher.Errors.Subscribe(Console.WriteLine); watcher.Start(); Console.ReadLine(); } } } 
+2
source

Check out the NuGet Reactive FileSystemWatcher package.

The source code is on the GitHub page.

0
source

All Articles