I would like each whole file to be passed to the map phase as a single
input record, with the file name as the key.
I read the following post: "How to get the file name / file contents as the key / value input for the map when running a Hadoop MapReduce job?"
While the theory in the top answer is solid, no code or practical example is actually provided.
Below are my custom FileInputFormat and the corresponding RecordReader. Both compile, but they do not produce ANY records.
Thanks for any help.
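public class CommentsInput extends FileInputFormat<Text, Text> {

    // Never split a file: each file must reach the mapper as a single record.
    // With the new API (org.apache.hadoop.mapreduce) the method to override takes a
    // JobContext; the (FileSystem, Path) signature belongs to the old mapred API
    // and would only be an unused overload here.
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext ctx)
            throws IOException, InterruptedException {
        return new CommentFileRecordReader((FileSplit) split, ctx.getConfiguration());
    }
}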
///////////////////////
public class CommentFileRecordReader extends RecordReader<Text, Text> {
    private InputStream in;
    private long start;
    private long length;
    private long position;
    private Text key;
    private Text value;
    private boolean processed;
    private FileSplit fileSplit;
    private Configuration conf;

    public CommentFileRecordReader(FileSplit fileSplit, Configuration conf) throws IOException {
        this.fileSplit = fileSplit;
        this.conf = conf;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        fileSplit = (FileSplit) split;
        this.start = fileSplit.getStart();
        this.length = fileSplit.getLength();
        this.position = 0;
        this.processed = false;

        Path path = fileSplit.getPath();
        FileSystem fs = path.getFileSystem(conf);
        FSDataInputStream in = fs.open(path);

        // Wrap the stream in a decompressor if the file extension maps to a codec.
        CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
        CompressionCodec codec = codecs.getCodec(path);
        if (codec != null)
            this.in = codec.createInputStream(in);
        else
            this.in = in;
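The reader as posted stops inside initialize(), so the part that actually emits a record (nextKeyValue() and the getters) is not shown. As a rough illustration of the read-once logic a whole-file reader usually needs, here is a Hadoop-free sketch. The class name WholeFileReaderSketch is hypothetical, String stands in for Text, and the constructor arguments stand in for the file name and the stream opened in initialize(); it is not the missing code from the post.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/** Hadoop-free model of a whole-file reader: emits exactly one (name, contents) record. */
final class WholeFileReaderSketch implements AutoCloseable {
    private final String fileName; // assumption: stands in for fileSplit.getPath().getName()
    private final InputStream in;  // assumption: stands in for the stream opened in initialize()
    private String key;
    private String value;
    private boolean processed = false;

    WholeFileReaderSketch(String fileName, InputStream in) {
        this.fileName = fileName;
        this.in = in;
    }

    /** Returns true the first time (after loading the whole stream), false afterwards. */
    boolean nextKeyValue() throws IOException {
        if (processed) {
            return false; // the single record has already been handed out
        }
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int n;
        while ((n = in.read(chunk)) != -1) {
            buffer.write(chunk, 0, n);
        }
        key = fileName;
        value = new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        processed = true;
        return true;
    }

    String getCurrentKey() { return key; }

    String getCurrentValue() { return value; }

    /** Progress is all-or-nothing because there is only one record. */
    float getProgress() { return processed ? 1.0f : 0.0f; }

    @Override
    public void close() throws IOException { in.close(); }
}
```

In the real RecordReader the same flow would presumably allocate the Text key and value before filling them, and return them from getCurrentKey() and getCurrentValue(). A reader whose nextKeyValue() never returns true, or whose key and value are never set, will yield no records.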
David