
Oozie Notes

An Oozie workflow is composed of control flow nodes and action nodes.
The control flow nodes are the start, end and kill nodes, plus the decision, fork and join nodes.
When a workflow reaches an end node, the workflow has completed successfully. If several actions are running and one of them reaches the end node, the other actions are killed; the workflow is still considered to have completed successfully.
The kill node works the same way: when several actions are running and one of them reaches a kill node, the remaining actions are killed, and the workflow ends in the KILLED state, i.e. it failed.
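A minimal workflow skeleton showing the start, end and kill control nodes (all node names here are illustrative):
<workflow-app name="minimal-wf" xmlns="uri:oozie:workflow:0.1">
    <start to="myaction"/>
    <action name="myaction">
        ...
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>myaction failed</message>
    </kill>
    <end name="end"/>
</workflow-app>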

The decision control node works like a switch-case statement. Syntax:
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <decision name="[NODE-NAME]">
        <switch>
            <case to="[NODE_NAME]">[PREDICATE]</case>
            ...
            <case to="[NODE_NAME]">[PREDICATE]</case>
            <default to="[NODE_NAME]"/>
        </switch>
    </decision>
    ...
</workflow-app>

PREDICATE is an EL (Expression Language) expression that evaluates to true or false, for example ${fs:fileSize('/usr/foo/myinputdir') gt 10 * GB}.
Example: 
<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <decision name="mydecision">
        <switch>
            <case to="reconsolidatejob">
              ${fs:fileSize(secondjobOutputDir) gt 10 * GB}
            </case>
            <case to="rexpandjob">
              ${fs:fileSize(secondjobOutputDir) lt 100 * MB}
            </case>
            <case to="recomputejob">
              ${ hadoop:counters('secondjob')[RECORDS][REDUCE_OUT] lt 1000000 }
            </case>
            <default to="end"/>
        </switch>
    </decision>
    ...
</workflow-app>

The fork and join nodes must be used together to control parallel execution: fork splits execution into multiple concurrent paths, and the join node fires only when all of those paths have reached it, after which execution continues at the node named by the join's "to" attribute.
This is similar to the join concept in Java multithreaded programming.
Example: 
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <fork name="forking">
        <path start="firstparalleljob"/>
        <path start="secondparalleljob"/>
    </fork>
    <action name="firstparalleljob">
        <map-reduce>
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <job-xml>job1.xml</job-xml>
        </map-reduce>
        <ok to="joining"/>
        <error to="kill"/>
    </action>
    <action name="secondparalleljob">
        <map-reduce>
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <job-xml>job2.xml</job-xml>
        </map-reduce>
        <ok to="joining"/>
        <error to="kill"/>
    </action>
    <join name="joining" to="nextaction"/>
    ...
</workflow-app>

Before submitting a job, Oozie checks that each fork is valid and leads to a correct execution behavior; if the validation fails, the job is not submitted.
If you are sure your fork/join definition is correct, you can disable the check by setting oozie.wf.validate.ForkJoin to false in job.properties (affects only that job) or oozie.validate.ForkJoin to false in oozie-site.xml (global, affects all workflows).
The check consults both settings and runs only when both are true; if either one is false, the check is disabled.
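For example, a job.properties sketch that disables the check for a single workflow job:
# disable fork/join validation for this workflow job only
oozie.wf.validate.ForkJoin=false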

Actions are asynchronous remote tasks. Oozie tracks their progress through two mechanisms, callbacks and polling: Oozie gives every task a unique callback URL through which the task reports its status, and if the callback fails for some reason, Oozie falls back to polling the task for its status.

An action has two possible outcomes, ok and error. On success the workflow transitions to the node named by ok; on failure, to the node named by error. In the error case the action must provide error-code and error-message information to Oozie; a decision node can then switch on the error-code to implement different handling logic for different error codes.
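A sketch of per-error-code handling with a decision node (the node names and the error code value are illustrative):
<action name="myjob">
    ...
    <ok to="nextjob"/>
    <error to="check-error"/>
</action>
<decision name="check-error">
    <switch>
        <case to="retryjob">
            ${wf:errorCode('myjob') eq 'JA008'}
        </case>
        <default to="fail"/>
    </switch>
</decision>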

Map-Reduce Action: 
Syntax:
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="[NODE-NAME]">
        <map-reduce>
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
                <delete path="[PATH]"/>
                ...
                <mkdir path="[PATH]"/>
                ...
            </prepare>
            <streaming>
                <mapper>[MAPPER-PROCESS]</mapper>
                <reducer>[REDUCER-PROCESS]</reducer>
                <record-reader>[RECORD-READER-CLASS]</record-reader>
                <record-reader-mapping>[NAME=VALUE]</record-reader-mapping>
                ...
                <env>[NAME=VALUE]</env>
                ...
            </streaming>
<!-- Either streaming or pipes can be specified for an action, not both -->
            <pipes>
                <map>[MAPPER]</map>
                <reduce>[REDUCER]</reduce>
                <inputformat>[INPUTFORMAT]</inputformat>
                <partitioner>[PARTITIONER]</partitioner>
                <writer>[OUTPUTFORMAT]</writer>
                <program>[EXECUTABLE]</program>
            </pipes>
            <job-xml>[JOB-XML-FILE]</job-xml>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <file>[FILE-PATH]</file>
            ...
            <archive>[FILE-PATH]</archive>
            ...
        </map-reduce>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>
The MapReduce configuration properties are loaded in the following order: streaming, job-xml and configuration, with later values overriding earlier ones.
Streaming and inline property values can be parameterized (templatized) using EL expressions.
 
Files and archives can be made available to the job through the Hadoop distributed cache for the MapReduce tasks to use. Paths may be relative or absolute; a relative path is resolved against the workflow application directory.
If a file added to the distributed cache is a native library (e.g. an '.so' or a '.so.#' file), a symbolic link to it should be created.

Symbolic links
Every file stored in HDFS and placed in the distributed cache can be accessed through a symbolic link.
With the URI hdfs://namenode/test/input/file1#myfile, your program can access the file file1 directly through the name myfile; myfile is the symbolic link.
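For example, inside a map-reduce action the cached file and its symlink could be declared as follows (the paths are illustrative):
<map-reduce>
    ...
    <file>hdfs://namenode/test/input/file1#myfile</file>
    <archive>hdfs://namenode/test/lib/mylibs.zip#mylibs</archive>
</map-reduce>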

Fs (HDFS) action:
Syntax:
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.5">
    ...
    <action name="[NODE-NAME]">
        <fs>
            <delete path='[PATH]'/>
            ...
            <mkdir path='[PATH]'/>
            ...
            <move source='[SOURCE-PATH]' target='[TARGET-PATH]'/>
            ...
            <chmod path='[PATH]' permissions='[PERMISSIONS]' dir-files='false' />
            ...
            <touchz path='[PATH]' />
            ...
            <chgrp path='[PATH]' group='[GROUP]' dir-files='false' />
        </fs>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>

The supported Hadoop FS commands are: move, delete, mkdir, chmod, touchz and chgrp.
FS commands execute synchronously: each command starts only after the previous one has completed.
FS commands are not rolled back automatically: when a command fails, the HDFS operations performed by the previously successful commands are not undone. A common practice is therefore to define a separate action first that validates the source and target paths the upcoming FS action will operate on, and only then run the FS action, to avoid the risk of a mid-sequence failure.
If the path given to delete is a directory, the children are deleted recursively before the directory itself, similar to hadoop fs -rm -r.
mkdir creates all missing directories in the path, similar to hadoop fs -mkdir -p.
The source of a move must exist. The file system URI (e.g. hdfs://{nameNode}) may be omitted from the target path; it then defaults to the source's file system URI, but a different target file system URI can also be specified. The parent directory of the target path must exist. If the target path already exists, the source path is moved into it as a child of the target directory.
touchz creates a zero length file when the path does not exist; if the path exists, it performs a touch, i.e. it updates the file's timestamp.
<recursive/> makes the operation recursive, i.e. chmod -R or chgrp -R.
Example: 
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.5">
    ...
    <action name="hdfscommands">
         <fs>
            <delete path='hdfs://foo:8020/usr/tucu/temp-data'/>
            <mkdir path='archives/${wf:id()}'/>
            <move source='${jobInput}' target='archives/${wf:id()}/processed-input'/>
            <chmod path='${jobOutput}' permissions='-rwxrw-rw-' dir-files='true'><recursive/></chmod>
            <chgrp path='${jobOutput}' group='testgroup' dir-files='true'><recursive/></chgrp>
        </fs>
        <ok to="myotherjob"/>
        <error to="errorcleanup"/>
    </action>
    ...
</workflow-app>

Sub-workflow Action:
Syntax:
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="[NODE-NAME]">
        <sub-workflow>
            <app-path>[WF-APPLICATION-PATH]</app-path>
            <propagate-configuration/>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
        </sub-workflow>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>

<propagate-configuration/> indicates that the parent workflow's job configuration should be propagated to the child workflow.
A child workflow can inherit the jars referenced by its parent workflow by setting oozie.subworkflow.classpath.inheritance=true in oozie-site.xml (global) or oozie.wf.subworkflow.classpath.inheritance=true in job.properties (current workflow only); the job.properties setting takes precedence.
The nesting depth of sub-workflows is limited; the maximum depth is configured through oozie.action.subworkflow.max.depth in oozie-site.xml and defaults to 50.
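For instance, a job.properties sketch enabling classpath inheritance for a single workflow job:
# let sub-workflows of this job inherit the parent workflow's classpath
oozie.wf.subworkflow.classpath.inheritance=true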

Java Action:
It runs the main method of a Java class on the Hadoop cluster, as a map-reduce job with a single mapper task.
The main method must not call System.exit(int n); if it does, the java action always follows the error transition.
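A minimal java action sketch (the host names, class name and arguments are illustrative; the full syntax also supports prepare, configuration, java-opts, file and archive elements):
<action name="myjavajob">
    <java>
        <job-tracker>foo:8021</job-tracker>
        <name-node>bar:8020</name-node>
        <main-class>com.foo.MyMain</main-class>
        <arg>-input</arg>
        <arg>/usr/foo/input</arg>
        <capture-output/>
    </java>
    <ok to="nextaction"/>
    <error to="fail"/>
</action>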

Example of a parameterized workflow definition:
<workflow-app name='hello-wf' xmlns="uri:oozie:workflow:0.4">
    <parameters>
        <property>
            <name>inputDir</name>
        </property>
        <property>
            <name>outputDir</name>
            <value>out-dir</value>
        </property>
    </parameters>
    ...
    <action name='firstjob'>
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>com.foo.FirstMapper</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>com.foo.FirstReducer</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to='secondjob'/>
        <error to='killcleanup'/>
    </action>
    ...
</workflow-app>

If inputDir is not defined, Oozie prints an error message at job submission time; if outputDir is not defined, Oozie uses the default value for outputDir declared in the parameters section.
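At submission time the required parameter is supplied through job.properties, e.g. (the values are illustrative):
inputDir=/usr/foo/in-dir
# outputDir may be omitted; the default out-dir from the parameters section is then used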

Expression Language Functions:

Basic EL Constants

•KB: 1024, one kilobyte.
•MB: 1024 * KB, one megabyte.
•GB: 1024 * MB, one gigabyte.
•TB: 1024 * GB, one terabyte.
•PB: 1024 * TB, one petabyte.

All the above constants are of type long .

Basic EL Functions:

String firstNotNull(String value1, String value2) returns the first value that is not null, or "" (rather than null) if both are null.

String concat(String s1, String s2) concatenates the two strings.

String replaceAll(String src, String regex, String replacement) performs a regular-expression replacement: every substring of src matching regex is replaced with replacement.

String appendAll(String src, String append, String delimeter) appends the append string to each delimiter-separated element of src.
 E.g. appendAll("/a/b/,/c/b/,/c/d/", "ADD", ",") will return /a/b/ADD,/c/b/ADD,/c/d/ADD . 
  
String trim(String s) trims leading and trailing whitespace; a null argument returns "".

String urlEncode(String s) returns the URL-encoded (UTF-8) form of the string.

String timestamp() returns the current UTC date and time in W3C format, e.g. 1997-07-16T19:20:30.45Z.

String toJsonStr(Map) 

It returns an XML encoded JSON representation of a Map. This function is useful to encode as a single property the complete action-data of an action, wf:actionData(String actionName) , in order to pass it in full to another action.

String toPropertiesStr(Map) (since Oozie 3.3)

It returns an XML encoded Properties representation of a Map. This function is useful to encode as a single property the complete action-data of an action, wf:actionData(String actionName) , in order to pass it in full to another action.

String toConfigurationStr(Map) (since Oozie 3.3)

It returns an XML encoded Configuration representation of a Map. This function is useful to encode as a single property the complete action-data of an action, wf:actionData(String actionName) , in order to pass it in full to another action.
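These functions can be used inside any parameterized value; for example (the path is illustrative):
<property>
    <name>mapred.output.dir</name>
    <value>${concat('/usr/foo/out/', wf:id())}</value>
</property>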
 

Workflow EL Functions:

String wf:id() 

It returns the workflow job ID for the current workflow job.

String wf:name() 

It returns the workflow application name for the current workflow job.

String wf:appPath() 

It returns the workflow application path for the current workflow job.

String wf:conf(String name) 

It returns the value of the workflow job configuration property for the current workflow job, or an empty string if undefined.

String wf:user() 

It returns the user name that started the current workflow job.

String wf:group() 

It returns the group/ACL for the current workflow job.

String wf:callback(String stateVar) 

It returns the callback URL for the current workflow action node; stateVar can be a valid exit state (OK or ERROR) for the action, or a token to be replaced with the exit state by the remote system executing the task.

String wf:transition(String node) 

It returns the transition taken by the specified workflow action node, or an empty string if the action has not been executed or has not completed yet.

String wf:lastErrorNode() 

It returns the name of the last workflow action node that exited with an ERROR exit state, or an empty string if no action has exited with ERROR state in the current workflow job.

String wf:errorCode(String node) 

It returns the error code for the specified action node, or an empty string if the action node has not exited with ERROR state.

Each type of action node must define its complete error code list.

String wf:errorMessage(String message) 

It returns the error message for the specified action node, or an empty string if the action node has not exited with ERROR state.

The error message can be useful for debugging and notification purposes.

int wf:run() 

It returns the run number for the current workflow job; normally 0, unless the workflow job is re-run, in which case it indicates the current run.

Map wf:actionData(String node) 

This function is only applicable to action nodes that produce output data on completion.

The output data is in a Java Properties format and via this EL function it is available as a Map .

String wf:actionExternalId(String node) 

It returns the external Id for an action node, or an empty string if the action has not been executed or has not completed yet.

String wf:actionTrackerUri(String node) 

It returns the tracker URI for an action node, or an empty string if the action has not been executed or has not completed yet.

String wf:actionExternalStatus(String node) 

It returns the external status for an action node, or an empty string if the action has not been executed or has not completed yet.

Hadoop EL Constants

•RECORDS: Hadoop record counters group name.
•MAP_IN: Hadoop mapper input records counter name.
•MAP_OUT: Hadoop mapper output records counter name.
•REDUCE_IN: Hadoop reducer input records counter name.
•REDUCE_OUT: Hadoop reducer output records counter name.
•GROUPS: Hadoop mapper/reducer record groups counter name.

Hadoop EL Functions:

Map < String, Map < String, Long > > hadoop:counters(String node)

  Example of MR action stats: 

{
    "ACTION_TYPE": "MAP_REDUCE",
    "org.apache.hadoop.mapred.JobInProgress$Counter": {
        "TOTAL_LAUNCHED_REDUCES": 1,
        "TOTAL_LAUNCHED_MAPS": 1,
        "DATA_LOCAL_MAPS": 1
    },
    "FileSystemCounters": {
        "FILE_BYTES_READ": 1746,
        "HDFS_BYTES_READ": 1409,
        "FILE_BYTES_WRITTEN": 3524,
        "HDFS_BYTES_WRITTEN": 1547
    },
    "org.apache.hadoop.mapred.Task$Counter": {
        "REDUCE_INPUT_GROUPS": 33,
        "COMBINE_OUTPUT_RECORDS": 0,
        "MAP_INPUT_RECORDS": 33,
        "REDUCE_SHUFFLE_BYTES": 0,
        "REDUCE_OUTPUT_RECORDS": 33,
        "SPILLED_RECORDS": 66,
        "MAP_OUTPUT_BYTES": 1674,
        "MAP_INPUT_BYTES": 1409,
        "MAP_OUTPUT_RECORDS": 33,
        "COMBINE_INPUT_RECORDS": 0,
        "REDUCE_INPUT_RECORDS": 33
    }
}

Below is a workflow that shows how to access specific information from the MR stats using the hadoop:counters() EL function. Workflow XML:

<workflow-app xmlns="uri:oozie:workflow:0.2" name="map-reduce-wf">
    <start to="mr-node"/>
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/${outputDir}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.oozie.example.SampleMapper</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.oozie.example.SampleReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/${wf:user()}/${examplesRoot}/input-data/text</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/${wf:user()}/${examplesRoot}/output-data/${outputDir}</value>
                </property>
                <property>
                    <name>oozie.action.external.stats.write</name>
                    <value>true</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="java1"/>
        <error to="fail"/>
    </action>
    <action name="java1">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <main-class>MyTest</main-class>
            <arg>${hadoop:counters("mr-node")["FileSystemCounters"]["FILE_BYTES_READ"]}</arg>
            <capture-output/>
        </java>
        <ok to="end" />
        <error to="fail" />
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

HDFS EL Functions:
For all the functions in this section the path must include the FS URI. For example hdfs://foo:8020/user/tucu .

boolean fs:exists(String path) 

It returns true or false depending if the specified path URI exists or not.

boolean fs:isDir(String path) 

It returns true if the specified path URI exists and it is a directory, otherwise it returns false .

long fs:dirSize(String path) 

It returns the size in bytes of all the files in the specified path. If the path is not a directory, or if it does not exist, it returns -1. It does not work recursively; it only computes the size of the files directly under the specified path.

long fs:fileSize(String path) 

It returns the size in bytes of the specified file. If the path is not a file, or if it does not exist, it returns -1.

long fs:blockSize(String path) 

It returns the block size in bytes of the specified file. If the path is not a file, or if it does not exist, it returns -1.

Share lib override (why override: to extend the default actions and implement your own functionality as needed)
The default share libraries can be overridden in the following ways:
•action.sharelib.for.#ACTIONTYPE# in the action configuration
•action.sharelib.for.#ACTIONTYPE# in the job configuration
•action.sharelib.for.#ACTIONTYPE# in the oozie server configuration
•action's ActionExecutor getDefaultShareLibName() method
Multiple share library directories can be specified, separated by commas, for example:
action.sharelib.for.pig=pig,hcatalog
to include both the pig and hcatalog jars.

Appendix B: workflow example
Fork and Join Example
The workflow definition below runs four MapReduce jobs in three steps: after the first job completes, two jobs run in parallel, and then one final job runs.
The output of each step's jobs is used as the input of the next step's jobs.

Required parameters: 
•jobtracker : JobTracker HOST:PORT
•namenode : NameNode HOST:PORT
•input : input directory
•output : output directory

<workflow-app name='example-forkjoinwf' xmlns="uri:oozie:workflow:0.1">
    <start to='firstjob' />
    <action name="firstjob">
        <map-reduce>
            <job-tracker>${jobtracker}</job-tracker>
            <name-node>${namenode}</name-node>
            <configuration>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.hadoop.example.IdMapper</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.hadoop.example.IdReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${input}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/usr/foo/${wf:id()}/temp1</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="fork" />
        <error to="kill" />
    </action>
    <fork name='fork'>
        <path start='secondjob' />
        <path start='thirdjob' />
    </fork>
    <action name="secondjob">
        <map-reduce>
            <job-tracker>${jobtracker}</job-tracker>
            <name-node>${namenode}</name-node>
            <configuration>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.hadoop.example.IdMapper</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.hadoop.example.IdReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/usr/foo/${wf:id()}/temp1</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/usr/foo/${wf:id()}/temp2</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="join" />
        <error to="kill" />
    </action>
    <action name="thirdjob">
        <map-reduce>
            <job-tracker>${jobtracker}</job-tracker>
            <name-node>${namenode}</name-node>
            <configuration>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.hadoop.example.IdMapper</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.hadoop.example.IdReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/usr/foo/${wf:id()}/temp1</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/usr/foo/${wf:id()}/temp3</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="join" />
        <error to="kill" />
    </action>
    <join name='join' to='finaljob'/>
    <action name="finaljob">
        <map-reduce>
            <job-tracker>${jobtracker}</job-tracker>
            <name-node>${namenode}</name-node>
            <configuration>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.hadoop.example.IdMapper</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.hadoop.example.IdReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/usr/foo/${wf:id()}/temp2,/usr/foo/${wf:id()}/temp3
                    </value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${output}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end" />
        <error to="kill" />
    </action>
    <kill name="kill">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name='end'/>
</workflow-app>