FileInputFormat, where filename is KEY and text content is VALUE

I would like to pass each whole file to the mapper as a single record, with the file name as the key.
I read the following post: How to get the file / file name as the key / value input for the MAP when doing the Hadoop MapReduce job?
While the theory in the top answer is solid, no code or practical example is actually provided.

Here is my custom FileInputFormat and the corresponding RecordReader, which compile but do not produce ANY records.
Thanks for any help.

    public class CommentsInput extends FileInputFormat<Text,Text> {

        protected boolean isSplitable(FileSystem fs, Path filename) {
            return false;
        }

        @Override
        public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext ctx)
                throws IOException, InterruptedException {
            return new CommentFileRecordReader((FileSplit) split, ctx.getConfiguration());
        }
    }

///////////////////////

    public class CommentFileRecordReader extends RecordReader<Text,Text> {
        private InputStream in;
        private long start;
        private long length;
        private long position;
        private Text key;
        private Text value;
        private boolean processed;
        private FileSplit fileSplit;
        private Configuration conf;

        public CommentFileRecordReader(FileSplit fileSplit, Configuration conf) throws IOException {
            this.fileSplit = fileSplit;
            this.conf=conf;
        }

        /** Boilerplate initialization code for file input streams. */
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            fileSplit = (FileSplit) split;
            this.start = fileSplit.getStart();
            this.length = fileSplit.getLength();
            this.position = 0;
            this.processed = false;

            Path path = fileSplit.getPath();
            FileSystem fs = path.getFileSystem(conf);
            FSDataInputStream in = fs.open(path);

            CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
            CompressionCodec codec = codecs.getCodec(path);
            if (codec != null)
                this.in = codec.createInputStream(in);
            else
                this.in = in;

            // If using Writables:
            // key = new Text();
            // value = new Text();
        }

        public boolean next(Text key, Text value) throws IOException {
            if(!processed) {
                key = new Text(fileSplit.getPath().toString());
                Path file = fileSplit.getPath();
                FileSystem fs = file.getFileSystem(conf);
                FSDataInputStream in = null;
                byte[] contents = new byte[(int) fileSplit.getLength()];
                try {
                    in = fs.open(file);
                    IOUtils.readFully(in, contents, 0, contents.length);
                    value.set(contents.toString());
                } finally {
                    IOUtils.closeStream(in);
                }
                processed = true;
                return true;
            }
            return false;
        }

        @Override
        public boolean nextKeyValue() throws IOException {
            // TODO parse the next key value, update position and return true.
            return false;
        }

        @Override
        public Text getCurrentKey() {
            return key;
        }

        @Override
        public Text getCurrentValue() {
            return value;
        }

        /** Returns our progress within the split, as a float between 0 and 1. */
        @Override
        public float getProgress() {
            if (length == 0)
                return 0.0f;
            return Math.min(1.0f, position / (float)length);
        }

        @Override
        public void close() throws IOException {
            if (in != null)
                in.close();
        }
    }
1 answer

You need to define your own key class and make sure your classes use it. Look up how to define a custom key class. You can get the file name by calling the getName() method on your Path, and then use it to create your key.
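One more thing that may explain the empty output: in the new API (org.apache.hadoop.mapreduce) the framework drives a RecordReader through nextKeyValue(), and in the posted reader that method always returns false, while the old-style next(Text, Text) is never called. Below is a minimal sketch of the "one record per file" pattern. It is written against plain JDK types so it can be run without a cluster. The class name WholeFileReader and its use of java.nio.file.Path are illustrative assumptions and not part of Hadoop; only the method names mirror the RecordReader contract.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Hypothetical stand-in for a whole-file RecordReader (not a Hadoop class).
 * It emits exactly one record: key = file name, value = file content.
 */
class WholeFileReader {
    private final Path file;
    private String key;
    private String value;
    private boolean processed = false;

    WholeFileReader(Path file) {
        this.file = file;
    }

    /** Returns true once (after loading the record), then false. */
    boolean nextKeyValue() throws IOException {
        if (processed) {
            return false;
        }
        byte[] contents = Files.readAllBytes(file);
        // Last path component only, analogous to getName() on a Hadoop Path.
        key = file.getFileName().toString();
        // Decode the bytes explicitly. Calling toString() on a byte[] returns
        // an identity string for the array, not the file content.
        // UTF-8 is an assumption about the input files.
        value = new String(contents, StandardCharsets.UTF_8);
        processed = true;
        return true;
    }

    String getCurrentKey() {
        return key;
    }

    String getCurrentValue() {
        return value;
    }

    /** Progress is all-or-nothing because the file is a single record. */
    float getProgress() {
        return processed ? 1.0f : 0.0f;
    }
}
```

In the real reader, the same logic would go into nextKeyValue(), with the key and value stored in the Text fields that getCurrentKey() and getCurrentValue() return. Text has a set(byte[], int, int) overload that takes the raw bytes directly, which avoids the contents.toString() problem. Note also that the new-API FileInputFormat declares isSplitable(JobContext, Path), so the posted isSplitable(FileSystem, Path) probably does not override it and large files could still be split.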
