Oozie 的多个action对一个action的依赖问题

现在的任务是,并行有25个action命名为merge_n执行shell做数据同步,执行结束后每一个merge_n执行对应的clean_n做数据清洗,在执行clean之前要依赖其中某一个(例如clean_1)先执行成功。
任务类似:
merge_1 ok to clean_1
Merge_2 ok to clean_2
。 . . .
以上是并行关系。我用的是fork/join
但是这个依赖怎么写呢,其他24个clean的shell依赖于第一个clean执行完成才可以执行
麻烦给出比较详细的代码说明

你的决策控制节点是怎么定义呢,给我们看看呢

下面是一个简单的Shell脚本示例,其中包括了25个并行执行的merge_n任务以及clean_n任务,其中clean_n任务依赖于第一个clean_1任务成功执行。


#!/bin/bash

# Function to execute a merge task
function merge_task() {
    merge_n=$1
    echo "Running merge_$merge_n"
    # Add your merge_n command here
    # Example: merge_command_$merge_n
    sleep 2  # Simulate merge task execution
    echo "merge_$merge_n completed"
}

# Function to execute a clean task
function clean_task() {
    clean_n=$1
    echo "Waiting for clean_1 to complete before running clean_$clean_n"
    wait_for_completion clean_1
    echo "Running clean_$clean_n"
    # Add your clean_n command here
    # Example: clean_command_$clean_n
    sleep 2  # Simulate clean task execution
    echo "clean_$clean_n completed"
}

# Function to wait for the completion of a task
function wait_for_completion() {
    task_name=$1
    # Add logic here to check if the task with name $task_name has completed
    # Example: Use a flag or check if a file exists indicating completion
    while [ ! -f "$task_name.completed" ]; do
        sleep 1
    done
}

# Execute merge tasks in parallel
for ((merge_n = 1; merge_n <= 25; merge_n++)); do
    merge_task $merge_n &
done

# Execute clean tasks in parallel with dependencies
for ((clean_n = 1; clean_n <= 25; clean_n++)); do
    if [ "$clean_n" -eq 1 ]; then
        clean_task $clean_n &
    else
        clean_task $clean_n &
        # Mark clean_1 as completed to allow others to proceed
        touch clean_1.completed
    fi
done

# Wait for all tasks to complete
wait

# Cleanup: Remove completion markers
rm *.completed

这个脚本中,首先定义了merge_task函数和clean_task函数来执行merge_n和clean_n任务。在clean_task函数中,它会等待clean_1任务完成后才会执行自身。在执行任务时,使用&来在后台执行任务,实现并行。

在脚本的最后,使用wait命令来等待所有任务完成。此外,在完成任务后,通过删除.completed文件来清除任务的完成标志。

参考结合GPT4.0、文心一言,如有帮助,恭请采纳。

首先,你可以使用fork操作来并行执行merge_n任务。然后,在每个merge_n任务之后,使用kill操作来等待clean_1任务完成。一旦clean_1任务成功完成,kill操作将释放其他clean_n任务继续执行。

以下是一个简单的示例,期望可以帮助到你:

<workflow-app xmlns="uri:oozie:workflow:0.1">  
    <start to="merge-node" />  
  
    <fork name="merge-node">  
        <path start="merge-1" />  
        <path start="merge-2" />  
        <!-- 添加其他merge_n任务 -->  
    </fork>  
  
    <action name="merge-1">  
        <shell xmlns="uri:oozie:shell-action:0.1">  
            <job-tracker>${jobTracker}</job-tracker>  
            <name-node>${nameNode}</name-node>  
            <exec>${merge_1_script}</exec>  
            <argument>${merge_1_arg}</argument>  
        </shell>  
        <ok to="clean-1" />  
        <error to="kill-node" />  
    </action>  
  
    <!-- 添加其他merge_n任务 -->  
  
    <fork name="clean-node">  
        <path start="clean-1" />  
        <path start="clean-2" />  
        <!-- 添加其他clean_n任务 -->  
    </fork>  
  
    <action name="clean-1">  
        <shell xmlns="uri:oozie:shell-action:0.1">  
            <job-tracker>${jobTracker}</job-tracker>  
            <name-node>${nameNode}</name-node>  
            <exec>${clean_1_script}</exec>  
            <argument>${clean_1_arg}</argument>  
        </shell>  
        <ok to="end" />  
        <error to="kill-node" />  
    </action>  
  
    <!-- 添加其他clean_n任务 -->  
  
    <join name="kill-node">  
        <ok to="end" />  
        <error to="end" />  
    </join>  
  
    <end name="end" />  
</workflow-app>


该回答通过自己思路及引用到GPTᴼᴾᴱᴺᴬᴵ搜索,得到内容具体如下:
在Oozie中,可以使用工作流的依赖关系来处理这种并行执行的情况。在这种情况下,你需要使用fork/join工作流类型,并设置适当的依赖关系。
以下是一个示例的工作流定义文件(workflow.xml),展示了如何设置这种依赖关系:

<workflow-app xmlns="uri:oozie:workflow:0.5" name="multi_action_workflow">
    <start to="merge_n"/>

    <action name="merge_n">
        <shell xmlns="uri:oozie:shell-action:0.2">
            <!-- 在这里编写执行数据同步的代码 -->
        </shell>
        <ok to="clean_n"/>
        <error to="fail"/>
    </action>

    <action name="clean_n">
        <ok to="end"/>
        <fork to="check_clean_1"/>
        <onsuccess to="clean_1"/>
        <onfailure to="fail"/>
    </action>

    <action name="check_clean_1">
        <ok to="clean_1"/>
        <error to="fail"/>
    </action>

    <action name="clean_1">
        <!-- 在这里编写执行数据清洗的代码 -->
    </action>

    <kill name="fail">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <end name="end"/>
</workflow-app>

在这个示例中,我们定义了4个action:merge_nclean_ncheck_clean_1clean_1merge_n是主要的action,它执行数据同步操作。如果merge_n成功完成,则执行clean_n
clean_n是一个fork/join类型的action,它首先尝试执行check_clean_1。如果check_clean_1成功完成,则执行clean_1;否则,它将失败并返回错误消息。
通过这种方式,你可以确保每个clean_n操作都依赖于一个成功的clean_1操作之后才能执行。
请注意,上述示例中的代码部分需要根据你的具体需求进行修改,以实现实际的数据同步和清洗逻辑。


如果以上回答对您有所帮助,点击一下采纳该答案~谢谢



#!/bin/bash
# Function to execute a merge task
function merge_task() {
    merge_n=$1
    echo "Running merge_$merge_n"
    # Add your merge_n command here
    # Example: merge_command_$merge_n
    sleep 2  # Simulate merge task execution
    echo "merge_$merge_n completed"
}
# Function to execute a clean task
function clean_task() {
    clean_n=$1
    echo "Waiting for clean_1 to complete before running clean_$clean_n"
    wait_for_completion clean_1
    echo "Running clean_$clean_n"
    # Add your clean_n command here
    # Example: clean_command_$clean_n
    sleep 2  # Simulate clean task execution
    echo "clean_$clean_n completed"
}
# Function to wait for the completion of a task
function wait_for_completion() {
    task_name=$1
    # Add logic here to check if the task with name $task_name has completed
    # Example: Use a flag or check if a file exists indicating completion
    while [ ! -f "$task_name.completed" ]; do
        sleep 1
    done
}
# Execute merge tasks in parallel
for ((merge_n = 1; merge_n <= 25; merge_n++)); do
    merge_task $merge_n &
done
# Execute clean tasks in parallel with dependencies
for ((clean_n = 1; clean_n <= 25; clean_n++)); do
    if [ "$clean_n" -eq 1 ]; then
        clean_task $clean_n &
    else
        clean_task $clean_n &
        # Mark clean_1 as completed to allow others to proceed
        touch clean_1.completed
    fi
done
# Wait for all tasks to complete
wait
# Cleanup: Remove completion markers
rm *.completed

梳理好顺序再写。

援引讯飞星火:
在Oozie中,可以使用fork/join工作流来实现多个action之间的依赖关系。下面是一个示例的代码说明,演示了如何实现你描述的任务:

首先,你需要创建一个工作流定义文件(例如workflow.xml),其中包含所有的action和它们的执行顺序。在这个例子中,我们有两个action:merge_n和clean_n。

<workflow-app xmlns="uri:oozie:workflow:0.5" name="example-workflow">
    <start to="merge_1"/>
    <action name="merge_1">
        <shell xmlns="uri:oozie:shell-action:0.2">
            <!-- 这里是执行数据同步的shell命令 -->
        </shell>
        <ok to="clean_1"/>
        <error to="fail"/>
    </action>
    <action name="clean_1">
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <action name="clean_2">
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <!-- 其他23个clean_n action... -->
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

在这个工作流定义中,我们定义了一个起始action(start)来启动整个工作流。然后,我们定义了两个action:merge_1和clean_1。当merge_1执行成功时,它会将控制传递给clean_1,表示clean_1可以开始执行。如果merge_1执行失败,则控制会转移到fail节点,表示整个工作流失败。

对于每个clean_n action,你可以按照相同的方式定义它们,并将它们连接到相应的merge_n action。确保每个clean_n action在连接到下一个merge_n action之前先执行成功。

最后,我们定义了一个结束action(end),它将在所有action执行成功后触发。

通过这种方式,你可以使用Oozie的fork/join工作流来实现多个action之间的依赖关系。每个action都会等待其前一个action成功完成后再继续执行。

结合GPT给出回答如下请题主参考
可以使用Oozie的控制流功能来解决这个依赖问题。具体来说,可以将25个merge_n作为并行执行的子工作流(sub-workflow),其中每个子工作流包含两个action:一个是执行shell命令的merge_n,另一个是执行shell命令的clean_n。在子工作流的action之间添加控制节点,使得merge_n的执行结束后才会执行clean_n。同时,为了能够确定要依赖哪个clean_n,可以使用Oozie的决策节点(decision node)来根据先前的执行情况来确定依赖关系。以下是一个可能的Oozie工作流示例代码:

<workflow-app xmlns="uri:oozie:workflow:0.5" name="data-sync">
  <start to="merge-flow"/>
  <action name="merge-flow">
    <parallel>
      <workflow>
        <app-path>${appPath}/sub-workflow-1</app-path>
      </workflow>
      <workflow>
        <app-path>${appPath}/sub-workflow-2</app-path>
      </workflow>
      <!-- more workflows... -->
      <workflow>
        <app-path>${appPath}/sub-workflow-25</app-path>
      </workflow>
    </parallel>
    <ok to="clean-decision"/>
    <error to="fail"/>
  </action>
  <decision name="clean-decision">
    <switch>
      <case to="clean-1">${wf:actionData('sub-workflow-1')['status'] == 'OK'}</case>
      <case to="clean-2">${wf:actionData('sub-workflow-2')['status'] == 'OK'}</case>
      <!-- more cases... -->
      <case to="clean-25">${wf:actionData('sub-workflow-25')['status'] == 'OK'}</case>
      <default to="fail"/>
    </switch>
  </decision>
  <action name="clean-1">
    <shell xmlns="uri:oozie:shell-action:0.2">
      <!-- run clean-1 script -->
    </shell>
    <ok to="end"/>
    <error to="fail"/>
  </action>
  <action name="clean-2">
    <shell xmlns="uri:oozie:shell-action:0.2">
      <!-- run clean-2 script -->
    </shell>
    <ok to="end"/>
    <error to="fail"/>
  </action>
  <!-- more clean actions... -->
  <action name="clean-25">
    <shell xmlns="uri:oozie:shell-action:0.2">
      <!-- run clean-25 script -->
    </shell>
    <ok to="end"/>
    <error to="fail"/>
  </action>
  <end name="end"/>
  <kill name="fail">
    <message>Workflow failed</message>
  </kill>
</workflow-app>

在上述示例中,一共有两个控制节点:parallel节点和decision节点。parallel节点用来并行执行25个子工作流,每个子工作流中包含merge_n和clean_n两个action。当所有子工作流中的merge_n都执行完毕后,控制流会进入decision节点。decision节点使用switch语句来根据先前的执行情况来确定依赖关系,具体来说,当某个子工作流中的merge_n执行成功(即status为'OK')时,就会进入相应的clean_n执行。如果所有子工作流中的merge_n都执行完毕后,仍然有clean_n未被执行,就会进入default分支,从而使得整个工作流失败。

大数据云计算——Oozie组件
可以参考下


Oozie Ssh Action问题排查_ywheel1989的博客-CSDN博客 最近在我们的其中一个现网环境中部署MR程序,MR程序的调度自然是用Oozie了。在Oozie的Workflow中,我们使用ssh action登录到一台节点上,并且在该节点上部署了脚本做数据库的建表操作。问题出在ssh action中,并且抛出了一个Cannot run program "scp": error=error=2, No such file or directory的错误 https://blog.csdn.net/ywheel1989/article/details/51931057/

引用 皆我百晓生 小程序回复内容作答:
要实现其他24个clean的shell依赖于第一个clean执行完成才能执行的依赖关系,可以使用Oozie的decision节点来判断条件并控制工作流的执行流程。以下提供了一种实现方式:

<workflow-app xmlns="uri:oozie:workflow:0.5" name="my_workflow">
  
  <!-- 定义fork元素,将25个merge_n动作放在fork中 -->
  <fork name="fork_node">
    <path start="merge_1" />
    <path start="merge_2" />
    <path start="merge_3" />
    <!-- 添加其他merge_n动作路径 -->
  </fork>
  
  <!-- 定义merge_n动作 -->
  <action name="merge_1">
    <!-- 设置merge_1所需的配置(执行shell等) -->
    <shell xmlns="uri:oozie:shell-action:0.3">
      <!-- 设置shell脚本命令 -->
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <!-- 设置其他属性和参数 -->
    </shell>
    <ok to="check_clean_1" />
    <error to="error_handling_node" />
  </action>

  <!-- 定义decision节点来判断是否执行clean_1 -->
  <decision name="check_clean_1">
    <switch>
      <case to="clean_1">${wf:lastErrorNode() eq 'merge_1'}</case>
      <default to="end_node" />
    </switch>
  </decision>

  <!-- 定义clean_1动作 -->
  <action name="clean_1">
    <!-- 设置clean_1所需的配置(执行shell等) -->
    <shell xmlns="uri:oozie:shell-action:0.3">
      <!-- 设置shell脚本命令 -->
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <!-- 设置其他属性和参数 -->
    </shell>
    <ok to="join_node" />
    <error to="error_handling_node" />
  </action>

  <!-- 定义join元素,将24个clean_n动作放在join中 -->
  <join name="join_node" to="end_node" />
  
  <!-- 其他clean_n动作 -->
  
  <!-- 定义结束节点 -->
  <kill name="error_handling_node">
    <message>Error occurred, check logs</message>
  </kill>
  
  <end name="end_node" />
</workflow-app>

在上述代码中:

  1. 使用fork节点将25个merge_n动作放在fork中,并行执行。
  2. merge_1动作完成后,使用决策节点(decision node)check_clean_1来判断是否要继续执行clean_1。
  3. 如果上一步执行的是merge_1(通过wf:lastErrorNode()函数判断),则执行clean_1。否则,直接进入结束节点。
  4. clean_1完成后,使用join节点将其他24个clean_n动作放在join中,只有当所有的clean_n动作完成后,流程才会进入结束节点。

请替换示例代码中的占位符${jobTracker}${nameNode}为实际的JobTracker和NameNode的URL或配置。

使用以上代码,就可以实现其他24个clean的shell依赖于第一个clean执行成功后才能执行的依赖关系。