How to use PathFilter in Apache Spark?

I have a simple file filter that selects files from a specific date. In Hadoop, I would set my PathFilter class on the InputFormat using setInputPathFilter. How can I accomplish this in Spark?

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class FilesFilter extends Configured implements PathFilter {

    private static final String FILE_DATE = "01.30.2015";

    @Override
    public boolean accept(Path path) {
        FileSystem fs;
        try {
            // Resolve the filesystem from the filter's configuration.
            fs = path.getFileSystem(getConf());
            // Always descend into directories so their files get checked.
            if (fs.isDirectory(path))
                return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }

        // Target day, expressed as whole days since the epoch.
        SimpleDateFormat sdf = new SimpleDateFormat("MM.dd.yyyy");
        Date date;
        try {
            date = sdf.parse(FILE_DATE);
        } catch (ParseException e) {
            e.printStackTrace();
            return false;
        }
        long targetDay = date.getTime() / (1000 * 3600 * 24);

        try {
            // Accept the file only if it was modified on the target day.
            FileStatus file = fs.getFileStatus(path);
            long modifiedDay = file.getModificationTime() / (1000 * 3600 * 24);
            return modifiedDay == targetDay;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }
}
1 answer

Use this to register your filter on the Hadoop configuration Spark passes to its input formats:

sc.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", classOf[TmpFileFilter], classOf[PathFilter])

Here is my TmpFileFilter.scala, which skips .tmp files:

import org.apache.hadoop.fs.{Path, PathFilter}

class TmpFileFilter extends PathFilter {
  override def accept(path : Path): Boolean = !path.getName.endsWith(".tmp")
}
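
Once the class is set, reads that go through the new-API FileInputFormat pick up the filter. A minimal usage sketch (the input path and app name are hypothetical):

import org.apache.hadoop.fs.PathFilter
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("path-filter-demo"))

// Register the filter before creating RDDs over the input path.
sc.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class",
  classOf[TmpFileFilter], classOf[PathFilter])

// Files ending in .tmp under the input directory are skipped.
val lines = sc
  .newAPIHadoopFile("/data/input", classOf[TextInputFormat],
    classOf[LongWritable], classOf[Text])
  .map(_._2.toString)

On Hadoop 2.x the old mapred API reads the same property, so a plain sc.textFile should honor the filter as well.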

You can define your own PathFilter.
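
For the date-based selection in the question, the same approach works. A minimal Scala sketch, assuming the MM.dd.yyyy format and target date from the original Java code (the class name DateFilter is illustrative):

import java.text.SimpleDateFormat

import org.apache.hadoop.conf.{Configuration, Configured}
import org.apache.hadoop.fs.{Path, PathFilter}

class DateFilter extends Configured with PathFilter {

  // Target day as whole days since the epoch, parsed once.
  private val targetDay: Long =
    new SimpleDateFormat("MM.dd.yyyy").parse("01.30.2015").getTime / (1000L * 3600 * 24)

  override def accept(path: Path): Boolean = {
    // Hadoop injects the configuration when it instantiates the filter;
    // fall back to a fresh one just in case.
    val fs = path.getFileSystem(Option(getConf).getOrElse(new Configuration()))
    if (fs.isDirectory(path)) {
      true // always descend into directories
    } else {
      // Keep only files whose modification time falls on the target day.
      fs.getFileStatus(path).getModificationTime / (1000L * 3600 * 24) == targetDay
    }
  }
}

Register it the same way as above, with classOf[DateFilter] in place of classOf[TmpFileFilter].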
