Oozie coordinator: how do I pass past data to MapReduce?

I am trying to create an Oozie coordinator. The problem is that I already have a backlog of data waiting to be processed with Oozie.

Imagine this situation.

  • Current date: 03/01/2013 (March 1, 2013)

  • I have these input directories:

    /staging/landing/source/xvlr/2013/02/01/00 (first of February 2013, first hour of the day)
    /staging/landing/source/xvlr/2013/02/01/01
    /staging/landing/source/xvlr/2013/02/01/02
    /staging/landing/source/xvlr/2013/02/01/03
    /staging/landing/source/xvlr/2013/02/01/04
    ....
    /staging/landing/source/xvlr/2013/02/28/00
    ...
    /staging/landing/source/xvlr/2013/02/28/23

I want my Oozie coordinator to consume ALL of the previously created landing data and produce this output:

    /masterdata/source/xvlr/2013/02/01/00
    /masterdata/source/xvlr/2013/02/01/01
    /masterdata/source/xvlr/2013/02/01/02
    /masterdata/source/xvlr/2013/02/01/03
    /masterdata/source/xvlr/2013/02/01/04
    ....
    /masterdata/source/xvlr/2013/02/28/00
    ...
    /masterdata/source/xvlr/2013/02/28/23

After that, I want the coordinator to run every hour and produce new output in masterdata.

How can I do this using the coordinator specification? Here is my coordinator. It does not do anything: it reaches the time I need and then waits. It does not start the work.

Please advise.

    <coordinator-app name="Xvlr-parser-coordinator" frequency="60"
                     start="2013-03-07T05:35Z" end="2113-01-01T00:35Z"
                     timezone="Europe/Moscow" xmlns="uri:oozie:coordinator:0.3">
      <controls>
        <timeout>5</timeout>
        <concurrency>4</concurrency>
      </controls>
      <datasets>
        <dataset name="xvlrInputDataset" frequency="${coord:hours(1)}"
                 initial-instance="2013-03-07T05:35Z" timezone="Europe/Moscow">
          <uri-template>${nameNode}/staging/landing/source/xvlr/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
          <done-flag></done-flag>
        </dataset>
        <dataset name="xvlrOutputDataset" frequency="${coord:hours(1)}"
                 initial-instance="2013-03-07T05:35Z" timezone="Europe/Moscow">
          <uri-template>${nameNode}/masterdata/source/xvlr/archive/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
          <done-flag></done-flag>
        </dataset>
      </datasets>
      <input-events>
        <data-in name="xvlrInputEvent" dataset="xvlrInputDataset">
          <instance>${coord:current(0)}</instance>
        </data-in>
      </input-events>
      <output-events>
        <data-out name="xvlrOutputEvent" dataset="xvlrOutputDataset">
          <instance>${coord:current(0)}</instance>
        </data-out>
      </output-events>
      <action>
        <workflow>
          <app-path>${oozieAppHomeCatalog}/sub-workflows/Xvlr-parser-subworkflow.xml</app-path>
          <configuration>
            <property>
              <name>inputDir</name>
              <value>${coord:dataIn('xvlrInputEvent')}</value>
            </property>
            <property>
              <name>outputDir</name>
              <value>${coord:dataOut('xvlrOutputEvent')}</value>
            </property>
          </configuration>
        </workflow>
      </action>
    </coordinator-app>
1 answer

Here is the correct solution (it has been working for several days now :)):

    <coordinator-app name="Xvlr-parser-coordinator" frequency="60"
                     start="2013-03-07T16:35Z" end="2113-01-01T00:35Z"
                     timezone="Europe/Moscow" xmlns="uri:oozie:coordinator:0.3">
      <controls>
        <timeout>3</timeout>
        <concurrency>1</concurrency>
      </controls>
      <datasets>
        <dataset name="xvlrInputDataset" frequency="${coord:hours(1)}"
                 initial-instance="2013-03-07T16:35Z" timezone="Europe/Moscow">
          <uri-template>${nameNode}/staging/landing/source/xvlr/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
          <done-flag></done-flag>
        </dataset>
        <dataset name="xvlrOutputDataset" frequency="${coord:hours(1)}"
                 initial-instance="2013-03-07T16:35Z" timezone="Europe/Moscow">
          <uri-template>${nameNode}/masterdata/source/xvlr/archive/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
          <done-flag></done-flag>
        </dataset>
      </datasets>
      <input-events>
        <data-in name="xvlrInputEvent" dataset="xvlrInputDataset">
          <instance>${coord:current(0)}</instance>
        </data-in>
      </input-events>
      <output-events>
        <data-out name="xvlrOutputEvent" dataset="xvlrOutputDataset">
          <instance>${coord:current(0)}</instance>
        </data-out>
      </output-events>
      <action>
        <workflow>
          <app-path>${oozieAppHomeCatalog}/sub-workflows/Xvlr-parser-subworkflow.xml</app-path>
          <configuration>
            <property>
              <name>inputDir</name>
              <value>${coord:dataIn('xvlrInputEvent')}</value>
            </property>
            <property>
              <name>outputDir</name>
              <value>${coord:dataOut('xvlrOutputEvent')}</value>
            </property>
          </configuration>
        </workflow>
      </action>
    </coordinator-app>

What does it do?

  • It started from 2013-03-07T16:35Z, so all the data that had already been collected was passed through the main workflow (an MR job call that does the parsing).
    • While it was working through past dataset instances (dataset time earlier than the current time), the workflows ran one after another: it consumed /pastdate/hour_00, then immediately started on /pastdate/hour_01, and so on.
    • When the coordinator caught up with the present time, it began starting the workflow every hour, as designed: 05:35, 06:35 ... 23:35.
    • Note the timeout declaration: some dataset instances were missing; for example, there was no data for hour 10 on March 1. In that case the workflow waited 3 minutes for its input and then died.

The problem is resolved.
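
For the February backlog described in the question, the same catch-up behaviour can be sketched by moving the coordinator start and the dataset initial-instance back to the first landing directory. This is only a sketch under assumptions: the value 2013-02-01T00:00Z is not taken from the answer; it is chosen to match the first directory listed in the question, and it assumes the directory names follow UTC. Everything else is copied from the coordinator above.

```xml
<!-- Sketch only. Assumed values: start and initial-instance (2013-02-01T00:00Z).
     All other settings are copied unchanged from the working coordinator. -->
<coordinator-app name="Xvlr-parser-coordinator" frequency="60"
                 start="2013-02-01T00:00Z" end="2113-01-01T00:35Z"
                 timezone="Europe/Moscow" xmlns="uri:oozie:coordinator:0.3">
  <controls>
    <!-- Minutes to wait for a missing input directory before the action times out. -->
    <timeout>3</timeout>
    <!-- Backlog hours are processed one at a time. -->
    <concurrency>1</concurrency>
  </controls>
  <datasets>
    <!-- initial-instance must not be later than the first instance the coordinator asks for. -->
    <dataset name="xvlrInputDataset" frequency="${coord:hours(1)}"
             initial-instance="2013-02-01T00:00Z" timezone="Europe/Moscow">
      <uri-template>${nameNode}/staging/landing/source/xvlr/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
      <!-- Empty done-flag: the existence of the directory itself marks the instance as ready. -->
      <done-flag></done-flag>
    </dataset>
    <dataset name="xvlrOutputDataset" frequency="${coord:hours(1)}"
             initial-instance="2013-02-01T00:00Z" timezone="Europe/Moscow">
      <uri-template>${nameNode}/masterdata/source/xvlr/archive/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
      <done-flag></done-flag>
    </dataset>
  </datasets>
  <input-events>
    <data-in name="xvlrInputEvent" dataset="xvlrInputDataset">
      <!-- current(0) is the dataset instance for the action's own nominal hour. -->
      <instance>${coord:current(0)}</instance>
    </data-in>
  </input-events>
  <output-events>
    <data-out name="xvlrOutputEvent" dataset="xvlrOutputDataset">
      <instance>${coord:current(0)}</instance>
    </data-out>
  </output-events>
  <action>
    <workflow>
      <app-path>${oozieAppHomeCatalog}/sub-workflows/Xvlr-parser-subworkflow.xml</app-path>
      <configuration>
        <property>
          <name>inputDir</name>
          <value>${coord:dataIn('xvlrInputEvent')}</value>
        </property>
        <property>
          <name>outputDir</name>
          <value>${coord:dataOut('xvlrOutputEvent')}</value>
        </property>
      </configuration>
    </workflow>
  </action>
</coordinator-app>
```

Under these assumptions the first action would have the nominal time 2013-02-01T00:00Z and read /staging/landing/source/xvlr/2013/02/01/00, with one action per hour after that until the coordinator catches up with the current time.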
