Skip to content

Commit 2b86561

Browse files
authored
[Fix-17597][Master] Fix condition task node not run in workflow when pre-task node failed (#17606)
1 parent 6b75ea2 commit 2b86561

File tree

5 files changed

+329
-2
lines changed

5 files changed

+329
-2
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,8 @@ public boolean isEndOfTaskChain(final ITaskExecutionRunnable taskExecutionRunnab
290290
return successors.get(taskExecutionRunnable.getName()).isEmpty()
291291
|| isTaskExecutionRunnableKilled(taskExecutionRunnable)
292292
|| isTaskExecutionRunnablePaused(taskExecutionRunnable)
293-
|| isTaskExecutionRunnableFailed(taskExecutionRunnable);
293+
|| (isTaskExecutionRunnableFailed(taskExecutionRunnable)
294+
&& !isAllSuccessorsAreConditionTask(taskExecutionRunnable));
294295
}
295296

296297
@Override
@@ -335,7 +336,8 @@ public boolean isAllSuccessorsAreConditionTask(final ITaskExecutionRunnable task
335336
}
336337
return successors.stream().allMatch(
337338
successor -> isTaskExecutionRunnableSkipped(successor)
338-
|| TaskTypeUtils.isConditionTask(taskExecutionRunnable.getTaskInstance().getTaskType()));
339+
|| (TaskTypeUtils.isConditionTask(successor.getTaskDefinition().getTaskType())
340+
&& !isTaskExecutionRunnableForbidden(successor)));
339341
}
340342

341343
private void assertTaskExecutionRunnableState(final ITaskExecutionRunnable taskExecutionRunnable,

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ public void onFailedEvent(final IWorkflowExecutionRunnable workflowExecutionRunn
178178
// And the DAG will continue to execute.
179179
final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph();
180180
if (workflowExecutionGraph.isAllSuccessorsAreConditionTask(taskExecutionRunnable)) {
181+
mergeTaskVarPoolToWorkflow(workflowExecutionRunnable, taskExecutionRunnable);
181182
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
182183
return;
183184
}

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1103,4 +1103,84 @@ public void testStartWorkflow_withTaskSuccessorsIsForbidden() {
11031103
});
11041104
masterContainer.assertAllResourceReleased();
11051105
}
1106+
1107+
@Test
1108+
@DisplayName("Test start a workflow with one condition task(B) when one fake predecessor task(A) run success")
1109+
void testStartWorkflow_with_oneConditionTaskWithOneFakePredecessor_runSuccess() {
1110+
final String yaml = "/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_success.yaml";
1111+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
1112+
final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
1113+
1114+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
1115+
.workflowDefinition(parentWorkflow)
1116+
.runWorkflowCommandParam(new RunWorkflowCommandParam())
1117+
.build();
1118+
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
1119+
1120+
await()
1121+
.atMost(Duration.ofMinutes(1))
1122+
.untilAsserted(() -> {
1123+
Assertions
1124+
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
1125+
.matches(
1126+
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
1127+
1128+
Assertions
1129+
.assertThat(repository.queryTaskInstance(workflowInstanceId))
1130+
.hasSize(3)
1131+
.anySatisfy(taskInstance -> {
1132+
assertThat(taskInstance.getName()).isEqualTo("A");
1133+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
1134+
})
1135+
.anySatisfy(taskInstance -> {
1136+
assertThat(taskInstance.getName()).isEqualTo("B");
1137+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
1138+
})
1139+
.anySatisfy(taskInstance -> {
1140+
assertThat(taskInstance.getName()).isEqualTo("C");
1141+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
1142+
});
1143+
});
1144+
masterContainer.assertAllResourceReleased();
1145+
}
1146+
1147+
@Test
1148+
@DisplayName("Test start a workflow with one condition task(B) when one fake predecessor task(A) run failed")
1149+
void testStartWorkflow_with_oneConditionTaskWithOneFakePredecessor_runFailed() {
1150+
final String yaml = "/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_failed.yaml";
1151+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
1152+
final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
1153+
1154+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
1155+
.workflowDefinition(parentWorkflow)
1156+
.runWorkflowCommandParam(new RunWorkflowCommandParam())
1157+
.build();
1158+
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
1159+
1160+
await()
1161+
.atMost(Duration.ofMinutes(1))
1162+
.untilAsserted(() -> {
1163+
Assertions
1164+
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
1165+
.matches(
1166+
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
1167+
1168+
Assertions
1169+
.assertThat(repository.queryTaskInstance(workflowInstanceId))
1170+
.hasSize(3)
1171+
.anySatisfy(taskInstance -> {
1172+
assertThat(taskInstance.getName()).isEqualTo("A");
1173+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
1174+
})
1175+
.anySatisfy(taskInstance -> {
1176+
assertThat(taskInstance.getName()).isEqualTo("B");
1177+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
1178+
})
1179+
.anySatisfy(taskInstance -> {
1180+
assertThat(taskInstance.getName()).isEqualTo("D");
1181+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
1182+
});
1183+
});
1184+
masterContainer.assertAllResourceReleased();
1185+
}
11061186
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
# A(failed) -> B(success) -> D(success)
19+
project:
20+
name: MasterIntegrationTest
21+
code: 1
22+
description: This is a fake project
23+
userId: 1
24+
userName: admin
25+
createTime: 2024-08-12 00:00:00
26+
updateTime: 2021-08-12 00:00:00
27+
28+
workflows:
29+
- name: workflow_with_one_condition_task_with_one_fake_predecessor_failed
30+
code: 1
31+
version: 1
32+
projectCode: 1
33+
description: This is a fake workflow with one condition task which has one predecessor failed
34+
releaseState: ONLINE
35+
createTime: 2024-08-12 00:00:00
36+
updateTime: 2021-08-12 00:00:00
37+
userId: 1
38+
executionType: PARALLEL
39+
40+
tasks:
41+
- name: A
42+
code: 1
43+
version: 1
44+
projectCode: 1
45+
userId: 1
46+
taskType: LogicFakeTask
47+
taskParams: '{"localParams":null,"varPool":[],"shellScript":"error"}'
48+
workerGroup: default
49+
createTime: 2024-08-12 00:00:00
50+
updateTime: 2021-08-12 00:00:00
51+
taskExecuteType: BATCH
52+
- name: B
53+
code: 2
54+
version: 1
55+
projectCode: 1
56+
userId: 1
57+
taskType: CONDITIONS
58+
taskParams: '{"localParams":[],"resourceList":[],"dependence":{"relation":"AND","dependTaskList":[{"relation":"AND","dependItemList":[{"depTaskCode":1,"status":"SUCCESS"}]}]},"conditionResult":{"successNode":[3],"failedNode":[4]}},'
59+
workerGroup: default
60+
createTime: 2024-08-12 00:00:00
61+
updateTime: 2021-08-12 00:00:00
62+
taskExecuteType: BATCH
63+
- name: C
64+
code: 3
65+
version: 1
66+
projectCode: 1
67+
userId: 1
68+
taskType: LogicFakeTask
69+
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo success"}'
70+
workerGroup: default
71+
createTime: 2024-08-12 00:00:00
72+
updateTime: 2021-08-12 00:00:00
73+
taskExecuteType: BATCH
74+
- name: D
75+
code: 4
76+
version: 1
77+
projectCode: 1
78+
userId: 1
79+
taskType: LogicFakeTask
80+
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo failed"}'
81+
workerGroup: default
82+
createTime: 2024-08-12 00:00:00
83+
updateTime: 2021-08-12 00:00:00
84+
taskExecuteType: BATCH
85+
86+
taskRelations:
87+
- projectCode: 1
88+
workflowDefinitionCode: 1
89+
workflowDefinitionVersion: 1
90+
preTaskCode: 0
91+
preTaskVersion: 0
92+
postTaskCode: 1
93+
postTaskVersion: 1
94+
createTime: 2024-08-12 00:00:00
95+
updateTime: 2024-08-12 00:00:00
96+
- projectCode: 1
97+
workflowDefinitionCode: 1
98+
workflowDefinitionVersion: 1
99+
preTaskCode: 1
100+
preTaskVersion: 1
101+
postTaskCode: 2
102+
postTaskVersion: 1
103+
createTime: 2024-08-12 00:00:00
104+
updateTime: 2024-08-12 00:00:00
105+
- projectCode: 1
106+
workflowDefinitionCode: 1
107+
workflowDefinitionVersion: 1
108+
preTaskCode: 2
109+
preTaskVersion: 1
110+
postTaskCode: 3
111+
postTaskVersion: 1
112+
createTime: 2024-08-12 00:00:00
113+
updateTime: 2024-08-12 00:00:00
114+
- projectCode: 1
115+
workflowDefinitionCode: 1
116+
workflowDefinitionVersion: 1
117+
preTaskCode: 2
118+
preTaskVersion: 1
119+
postTaskCode: 4
120+
postTaskVersion: 1
121+
createTime: 2024-08-12 00:00:00
122+
updateTime: 2024-08-12 00:00:00
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
# A(success) -> B(success) -> C(success)
19+
project:
20+
name: MasterIntegrationTest
21+
code: 1
22+
description: This is a fake project
23+
userId: 1
24+
userName: admin
25+
createTime: 2024-08-12 00:00:00
26+
updateTime: 2021-08-12 00:00:00
27+
28+
workflows:
29+
- name: workflow_with_one_condition_task_with_one_fake_predecessor_success
30+
code: 1
31+
version: 1
32+
projectCode: 1
33+
description: This is a fake workflow with one condition task which has one predecessor success
34+
releaseState: ONLINE
35+
createTime: 2024-08-12 00:00:00
36+
updateTime: 2021-08-12 00:00:00
37+
userId: 1
38+
executionType: PARALLEL
39+
40+
tasks:
41+
- name: A
42+
code: 1
43+
version: 1
44+
projectCode: 1
45+
userId: 1
46+
taskType: LogicFakeTask
47+
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo success"}'
48+
workerGroup: default
49+
createTime: 2024-08-12 00:00:00
50+
updateTime: 2021-08-12 00:00:00
51+
taskExecuteType: BATCH
52+
- name: B
53+
code: 2
54+
version: 1
55+
projectCode: 1
56+
userId: 1
57+
taskType: CONDITIONS
58+
taskParams: '{"localParams":[],"resourceList":[],"dependence":{"relation":"AND","dependTaskList":[{"relation":"AND","dependItemList":[{"depTaskCode":1,"status":"SUCCESS"}]}]},"conditionResult":{"successNode":[3],"failedNode":[4]}},'
59+
workerGroup: default
60+
createTime: 2024-08-12 00:00:00
61+
updateTime: 2021-08-12 00:00:00
62+
taskExecuteType: BATCH
63+
- name: C
64+
code: 3
65+
version: 1
66+
projectCode: 1
67+
userId: 1
68+
taskType: LogicFakeTask
69+
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo success"}'
70+
workerGroup: default
71+
createTime: 2024-08-12 00:00:00
72+
updateTime: 2021-08-12 00:00:00
73+
taskExecuteType: BATCH
74+
- name: D
75+
code: 4
76+
version: 1
77+
projectCode: 1
78+
userId: 1
79+
taskType: LogicFakeTask
80+
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo failed"}'
81+
workerGroup: default
82+
createTime: 2024-08-12 00:00:00
83+
updateTime: 2021-08-12 00:00:00
84+
taskExecuteType: BATCH
85+
86+
taskRelations:
87+
- projectCode: 1
88+
workflowDefinitionCode: 1
89+
workflowDefinitionVersion: 1
90+
preTaskCode: 0
91+
preTaskVersion: 0
92+
postTaskCode: 1
93+
postTaskVersion: 1
94+
createTime: 2024-08-12 00:00:00
95+
updateTime: 2024-08-12 00:00:00
96+
- projectCode: 1
97+
workflowDefinitionCode: 1
98+
workflowDefinitionVersion: 1
99+
preTaskCode: 1
100+
preTaskVersion: 1
101+
postTaskCode: 2
102+
postTaskVersion: 1
103+
createTime: 2024-08-12 00:00:00
104+
updateTime: 2024-08-12 00:00:00
105+
- projectCode: 1
106+
workflowDefinitionCode: 1
107+
workflowDefinitionVersion: 1
108+
preTaskCode: 2
109+
preTaskVersion: 1
110+
postTaskCode: 3
111+
postTaskVersion: 1
112+
createTime: 2024-08-12 00:00:00
113+
updateTime: 2024-08-12 00:00:00
114+
- projectCode: 1
115+
workflowDefinitionCode: 1
116+
workflowDefinitionVersion: 1
117+
preTaskCode: 2
118+
preTaskVersion: 1
119+
postTaskCode: 4
120+
postTaskVersion: 1
121+
createTime: 2024-08-12 00:00:00
122+
updateTime: 2024-08-12 00:00:00

0 commit comments

Comments
 (0)