Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix remediated task with retry #200

Merged
merged 1 commit into from
Apr 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
Changelog
=========

In Development
Copy link
Member

@arm4b arm4b Apr 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll take it to st2 v3.2.0

@m4dcoder I guess we'll release v1.1.1 orquesta once its merged to include in st2 dependencies?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I'll do a release of v1.1.1.

--------------

Fixed
~~~~~

* Fix task retry where transition on error is also executed along with retry. (bug fix)

1.1.0
-----

Expand Down
2 changes: 1 addition & 1 deletion orquesta/conducting.py
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ def update_task_state(self, task_id, route, event):
# the condition if a retry for the task is required.
if (self.get_workflow_status() in statuses.ACTIVE_STATUSES and
self._evaluate_task_retry(task_state_entry, current_ctx)):
self.update_task_state(task_id, route, events.TaskRetryEvent())
return self.update_task_state(task_id, route, events.TaskRetryEvent())

# Evaluate task transitions if task is completed and status change is not processed.
if new_task_status in statuses.COMPLETED_STATUSES and new_task_status != old_task_status:
Expand Down
91 changes: 91 additions & 0 deletions orquesta/tests/unit/conducting/test_task_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,97 @@ def test_retries_exhausted(self):
actual_task_sequence = [item['id'] for item in conductor.workflow_state.sequence]
self.assertListEqual(expected_task_sequence, actual_task_sequence)

def test_retries_exhausted_and_task_remediated(self):
wf_def = """
version: 1.0

tasks:
task1:
action: core.echo message="$RANDOM"
retry:
count: 3
next:
- when: <% succeeded() %>
do: task2
- when: <% failed() %>
do: task3
task2:
action: core.noop
task3:
action: core.echo message="BOOM!"
next:
- do: fail
"""

expected_tk1_action_spec = {'action': 'core.echo', 'input': {'message': '$RANDOM'}}
expected_tk3_action_spec = {'action': 'core.echo', 'input': {'message': 'BOOM!'}}

spec = native_specs.WorkflowSpec(wf_def)
self.assertDictEqual(spec.inspect(), {})

conductor = conducting.WorkflowConductor(spec)
conductor.request_workflow_status(statuses.RUNNING)

# Failed execution for task1.
next_tasks = conductor.get_next_tasks()
self.assertEqual(len(next_tasks), 1)
self.assertEqual(next_tasks[0]['id'], 'task1')
self.assertDictEqual(next_tasks[0]['actions'][0], expected_tk1_action_spec)
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING, statuses.FAILED])

# Failed retry #1 for task1.
tk1_state = conductor.get_task_state_entry('task1', 0)
self.assertEqual(tk1_state['status'], statuses.RETRYING)
self.assertEqual(tk1_state['retry']['count'], 3)
self.assertEqual(tk1_state['retry']['tally'], 1)
next_tasks = conductor.get_next_tasks()
self.assertEqual(len(next_tasks), 1)
self.assertEqual(next_tasks[0]['id'], 'task1')
self.assertDictEqual(next_tasks[0]['actions'][0], expected_tk1_action_spec)
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING, statuses.FAILED])

# Failed retry #2 for task1.
tk1_state = conductor.get_task_state_entry('task1', 0)
self.assertEqual(tk1_state['status'], statuses.RETRYING)
self.assertEqual(tk1_state['retry']['tally'], 2)
next_tasks = conductor.get_next_tasks()
self.assertEqual(len(next_tasks), 1)
self.assertEqual(next_tasks[0]['id'], 'task1')
self.assertDictEqual(next_tasks[0]['actions'][0], expected_tk1_action_spec)
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING, statuses.FAILED])

# Failed retry #3 for task1.
tk1_state = conductor.get_task_state_entry('task1', 0)
self.assertEqual(tk1_state['status'], statuses.RETRYING)
self.assertEqual(tk1_state['retry']['tally'], 3)
next_tasks = conductor.get_next_tasks()
self.assertEqual(len(next_tasks), 1)
self.assertEqual(next_tasks[0]['id'], 'task1')
self.assertDictEqual(next_tasks[0]['actions'][0], expected_tk1_action_spec)
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING, statuses.FAILED])

# Assert task1 failed and the workflow execution progresses to task3.
tk1_state = conductor.get_task_state_entry('task1', 0)
self.assertEqual(tk1_state['status'], statuses.FAILED)
self.assertEqual(tk1_state['retry']['tally'], 3)
next_tasks = conductor.get_next_tasks()
self.assertEqual(len(next_tasks), 1)
self.assertEqual(next_tasks[0]['id'], 'task3')
self.assertDictEqual(next_tasks[0]['actions'][0], expected_tk3_action_spec)

# Successful execution for task3.
self.forward_task_statuses(conductor, 'task3', [statuses.RUNNING, statuses.SUCCEEDED])
tk3_state = conductor.get_task_state_entry('task3', 0)
self.assertEqual(tk3_state['status'], statuses.SUCCEEDED)

# Assert workflow failed (manual under task3).
self.assertEqual(conductor.get_workflow_status(), statuses.FAILED)

# Assert there is only a single task1 and a single task3 in the task sequences.
expected_task_sequence = ['task1', 'task3', 'fail']
actual_task_sequence = [item['id'] for item in conductor.workflow_state.sequence]
self.assertListEqual(expected_task_sequence, actual_task_sequence)

def test_retry_delay_with_task_delay_defined(self):
wf_def = """
version: 1.0
Expand Down