My MapReduce job processes data by date and needs to write its output to a specific folder structure. The expected structure is:
2013
    01
    02
    ..
2012
    01
    02
    ..
and so on.
At any time, I receive only up to 12 months of data. So, I use the MultipleOutputs class to create 12 outputs using the following function in the driver:
public void createOutputs() {
    Calendar c = Calendar.getInstance();
    String monthStr, pathStr;
    // Create multiple outputs for the last 12 months
    // TODO make 12 configurable
    for (int i = 0; i < 12; ++i) {
        // Get the month and add 1, as Calendar months are 0-based
        int month = c.get(Calendar.MONTH) + 1;
        // Add a leading 0 to single-digit months (>= 10, so October stays "10")
        monthStr = month >= 10 ? "" + month : "0" + month;
        // Generate the output name in the format 201303etl
        pathStr = c.get(Calendar.YEAR) + "" + monthStr + "etl";
        // Add the named output
        MultipleOutputs.addNamedOutput(config, pathStr);
        // Move to the previous month
        c.add(Calendar.MONTH, -1);
    }
}
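As a minimal sketch of the same naming logic, the loop above could also be written with java.time, which avoids the manual zero-padding and takes the number of months as a parameter (the TODO above). The class name NamedOutputs and the method lastMonths are hypothetical, and the sketch only builds the names; registering them with MultipleOutputs is left to the driver.

```java
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original job.
final class NamedOutputs {
    // "uuuuMM" always yields a four-digit year and a two-digit month.
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("uuuuMM");

    /** Returns names such as "201303etl" for the given month and the months before it, newest first. */
    static List<String> lastMonths(YearMonth latest, int count) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            // minusMonths rolls over year boundaries (2013-01 -> 2012-12)
            names.add(latest.minusMonths(i).format(FMT) + "etl");
        }
        return names;
    }
}
```

In the driver, this might be called as NamedOutputs.lastMonths(YearMonth.now(), 12), with each returned name passed to MultipleOutputs.addNamedOutput.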
In the reducer, I added a cleanup function to move the generated output to the appropriate directories:
protected void cleanup(Context context) throws IOException, InterruptedException {
Problem: The reducer's cleanup function is executed before the output is moved from the _temporary directory to the output directory. Because of this, the function above does not see any output at runtime, since all the data is still in the _temporary directory.
What is the best way to achieve the desired functionality? Appreciate any ideas.
Thinking of the following:
- Is there a way to use a custom output committer?
- Is it better to chain another job, or is that too complicated?
- Is there a simpler alternative that I just don't know about?
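One possibly simpler alternative, as far as I understand the API, is the three-argument MultipleOutputs.write(key, value, baseOutputPath) overload in org.apache.hadoop.mapreduce.lib.output. Its baseOutputPath may contain "/" and is resolved relative to the job output directory, so the files should land in subdirectories when the task is committed and no move in cleanup would be needed. The sketch below only builds such a path from a record's date; the class name OutputPaths, the LocalDate input and the "part" file prefix are assumptions for illustration.

```java
import java.time.LocalDate;
import java.util.Locale;

// Hypothetical helper, not part of the original job.
final class OutputPaths {
    /** Builds a relative path such as "2013/07/etl/part" for the date of a record. */
    static String baseOutputPath(LocalDate recordDate) {
        // Locale.ROOT keeps the digits ASCII regardless of the JVM's default locale.
        return String.format(Locale.ROOT, "%04d/%02d/etl/part",
                recordDate.getYear(), recordDate.getMonthValue());
    }
}

// Assumed usage inside the reducer, with mos being a MultipleOutputs instance:
//   mos.write(key, value, OutputPaths.baseOutputPath(recordDate));
```

With this approach the named outputs registered in the driver would no longer be needed, since the path is computed per record.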
Here is an example file structure log from the cleanup function:
MyMapReduce: filepath:hdfs://localhost:8020/dev/test
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs/history/job_201310301015_0224_1383763613843_371979_HtmlEtl
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/201307etl-r-00000
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/part-r-00000
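If the move were instead done from the driver once the job has finished (by which point the output should have left _temporary), file names like 201307etl-r-00000 in the log above would need to be mapped to their target directories. A rough sketch of that mapping follows; the class name TargetPaths and the target layout 2013/07/etl/part-r-00000 are assumptions, and the actual rename on HDFS is not shown.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the original job.
final class TargetPaths {
    // Matches names like "201307etl-r-00000": 4-digit year, 2-digit month, "etl", then the task suffix.
    private static final Pattern NAMED =
            Pattern.compile("(\\d{4})(\\d{2})etl-([mr]-\\d{5})");

    /** Maps "201307etl-r-00000" to "2013/07/etl/part-r-00000"; returns empty for any other name. */
    static Optional<String> targetFor(String fileName) {
        Matcher m = NAMED.matcher(fileName);
        if (!m.matches()) {
            return Optional.empty(); // e.g. the default "part-r-00000" file is left alone
        }
        return Optional.of(m.group(1) + "/" + m.group(2) + "/etl/part-" + m.group(3));
    }
}
```

Files for which targetFor returns empty, such as _logs entries or the default part files, would simply be skipped.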
Kiran