Here is the implementation I have for you:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.LineRecordReader; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.lib.CombineFileInputFormat; import org.apache.hadoop.mapred.lib.CombineFileRecordReader; import org.apache.hadoop.mapred.lib.CombineFileSplit; @SuppressWarnings("deprecation") public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> { @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException { return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class); } public static class myCombineFileRecordReader implements RecordReader<LongWritable, Text> { private final LineRecordReader linerecord; public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException { FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations()); linerecord = new LineRecordReader(conf, filesplit); } @Override public void close() throws IOException { linerecord.close(); } @Override public LongWritable createKey() {
In your job, first set the mapred.max.split.size parameter to the maximum size (in bytes) that the small input files should be combined into per split. For example, do something like the following in your run() method:
... if (argument != null) { conf.set("mapred.max.split.size", argument); } else { conf.set("mapred.max.split.size", "134217728");
Amar
source share