Hadoop: how to send MapReduce output to multiple directories

My MapReduce job processes data by date and needs to write its output to a specific folder structure. The expectation is to generate the following structure:

    2013
        01
        02
        ..
    2012
        01
        02
        ..

and so on.

At any given time I receive at most 12 months of data, so I use the MultipleOutputs class to create 12 named outputs with the following function in the driver:

    public void createOutputs() {
        Calendar c = Calendar.getInstance();
        String monthStr, pathStr;
        // Create multiple outputs for the last 12 months
        // TODO make 12 configurable
        for (int i = 0; i < 12; ++i) {
            // Get month and add 1, as month is a 0-based index
            int month = c.get(Calendar.MONTH) + 1;
            // Add leading 0 (>= so that October does not become "010")
            monthStr = month >= 10 ? "" + month : "0" + month;
            // Generate the named-output string, e.g. "201303etl"
            pathStr = c.get(Calendar.YEAR) + "" + monthStr + "etl";
            // Add the named output (the format/key/value classes were missing
            // in the original call; TextOutputFormat/Text/Text are assumed here)
            MultipleOutputs.addNamedOutput(config, pathStr,
                    TextOutputFormat.class, Text.class, Text.class);
            // Move to the previous month
            c.add(Calendar.MONTH, -1);
        }
    }

In the reducer, I added a cleanup function to move the generated output into the appropriate directories:

    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Custom function to recursively process data
        moveFiles(FileSystem.get(new Configuration()), new Path("/MyOutputPath"));
    }

Problem: the reducer's cleanup function runs before the output is moved from the _temporary directory to the final output directory. Because of this, the function above sees no output at runtime, since all the data is still sitting in _temporary.

What is the best way to achieve the desired functionality? Appreciate any ideas.

I am considering the following:

  • Is there a way to use a custom output committer? (A sketch follows the example log below.)
  • Is it better to chain a second job, or is that overcomplicating things?
  • Is there a simpler alternative that I just don't know about?

Here is an example file structure log from the cleanup function:

    MyMapReduce: filepath:hdfs://localhost:8020/dev/test
    MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs
    MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs/history/job_201310301015_0224_1383763613843_371979_HtmlEtl
    MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary
    MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0
    MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/201307etl-r-00000
    MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/part-r-00000
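
On the first bullet: a custom output committer can work. Below is a minimal, untested sketch, assuming the new mapreduce API; MovingFileOutputCommitter and DatedTextOutputFormat are hypothetical names, and moveFiles stands in for the recursive helper from the cleanup code above. The idea is to run the move only after super.commitJob has promoted the files out of _temporary:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    // Committer that reorganizes output only after the default committer
    // has promoted it out of _temporary.
    public class MovingFileOutputCommitter extends FileOutputCommitter {

        public MovingFileOutputCommitter(Path outputPath, TaskAttemptContext context)
                throws IOException {
            super(outputPath, context);
        }

        @Override
        public void commitJob(JobContext context) throws IOException {
            // First let Hadoop move everything from _temporary to the output dir...
            super.commitJob(context);
            // ...then rearrange files such as 201307etl-r-00000 into 2013/07/etl.
            FileSystem fs = FileSystem.get(context.getConfiguration());
            moveFiles(fs, new Path("/MyOutputPath")); // the question's helper
        }

        private void moveFiles(FileSystem fs, Path root) throws IOException {
            // The recursive move logic from the question would go here.
        }
    }

    // Hook the committer in through a custom output format (separate file).
    class DatedTextOutputFormat extends TextOutputFormat<Text, Text> {
        @Override
        public OutputCommitter getOutputCommitter(TaskAttemptContext context)
                throws IOException {
            return new MovingFileOutputCommitter(getOutputPath(context), context);
        }
    }

The driver would then register it with job.setOutputFormatClass(DatedTextOutputFormat.class).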
2 answers

You do not need a second job. I am currently using MultipleOutputs to create tons of output directories in one of my programs. Even though there are more than 30 directories, I only need a couple of MultipleOutputs objects. This is because you can set the output directory at write time, so it can be determined on the fly. You only really need more than one namedOutput if you want output in different formats (for example, one with key: Text.class, value: Text.class and one with key: Text.class, value: IntWritable.class), as sketched below.
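
For instance, a hypothetical sketch of that one case (the names "TextOut" and "CountOut" are made up) where two named outputs are genuinely needed:

    // Two named outputs, needed only because the value classes differ:
    MultipleOutputs.addNamedOutput(job, "TextOut",
            TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.addNamedOutput(job, "CountOut",
            TextOutputFormat.class, Text.class, IntWritable.class);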

Setup:

 MultipleOutputs.addNamedOutput(job, "Output", TextOutputFormat.class, Text.class, Text.class); 

Reducer setup:

 mout = new MultipleOutputs<Text, Text>(context); 

Calling mout in the reducer:

    String key;   // set to whatever the output key will be
    String value; // set to whatever the output value will be
    String outputFileName; // set to the absolute path of the file this should write to
    mout.write("Output", new Text(key), new Text(value), outputFileName);

You can have a separate piece of code determine the directory before each write. For example, suppose you want to specify the directory by year and month:

    int year;  // extract year from data
    int month; // extract month from data
    String baseFileName; // parent directory of all outputs from this job
    String outputFileName = baseFileName + "/" + year + "/" + month;
    mout.write("Output", new Text(key), new Text(value), outputFileName);
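
Putting it together, a reducer might look like this minimal sketch. MonthlyReducer, the etl.base.path configuration key, and the extractYear/extractMonth stubs are all hypothetical, and it assumes the "Output" named output registered in the setup line above; note that mout must be closed in cleanup:

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

    public class MonthlyReducer extends Reducer<Text, Text, Text, Text> {
        private MultipleOutputs<Text, Text> mout;
        private String baseFileName;

        @Override
        protected void setup(Context context) {
            mout = new MultipleOutputs<Text, Text>(context);
            // Hypothetical configuration key for the parent output directory
            baseFileName = context.getConfiguration().get("etl.base.path", "/MyOutputPath");
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                int year = extractYear(value);   // hypothetical helper
                int month = extractMonth(value); // hypothetical helper
                String outputFileName = baseFileName + "/" + year + "/" + month;
                mout.write("Output", key, value, outputFileName);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mout.close(); // flush and finalize all MultipleOutputs files
        }

        private int extractYear(Text value) { return 2013; }  // stub
        private int extractMonth(Text value) { return 1; }    // stub
    }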

Hope this helps.

EDIT: output file structure for the above example:

    Base
        2013
            01
            02
            03
            ...
        2012
            01
            ...
        ...

Most likely, you forgot to close mos in your cleanup.

If you have a setup method in your mapper or reducer like the one below:

    public void setup(Context context) {
        mos = new MultipleOutputs(context);
    }

You must close mos at the beginning of your cleanup, as shown below:

    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }