Skip to content

Commit

Permalink
Merge pull request #200 from StackStorm/fix-task-retry
Browse files Browse the repository at this point in the history
Fix remediated task with retry
  • Loading branch information
m4dcoder authored Apr 24, 2020
2 parents afdb704 + 36a3475 commit 1e2d962
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 1 deletion.
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
Changelog
=========

In Development
--------------

Fixed
~~~~~

* Fix task retry where transition on error is also executed along with retry. (bug fix)

1.1.0
-----

Expand Down
2 changes: 1 addition & 1 deletion orquesta/conducting.py
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ def update_task_state(self, task_id, route, event):
# the condition if a retry for the task is required.
if (self.get_workflow_status() in statuses.ACTIVE_STATUSES and
self._evaluate_task_retry(task_state_entry, current_ctx)):
self.update_task_state(task_id, route, events.TaskRetryEvent())
return self.update_task_state(task_id, route, events.TaskRetryEvent())

# Evaluate task transitions if task is completed and status change is not processed.
if new_task_status in statuses.COMPLETED_STATUSES and new_task_status != old_task_status:
Expand Down
91 changes: 91 additions & 0 deletions orquesta/tests/unit/conducting/test_task_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,97 @@ def test_retries_exhausted(self):
actual_task_sequence = [item['id'] for item in conductor.workflow_state.sequence]
self.assertListEqual(expected_task_sequence, actual_task_sequence)

def test_retries_exhausted_and_task_remediated(self):
wf_def = """
version: 1.0
tasks:
task1:
action: core.echo message="$RANDOM"
retry:
count: 3
next:
- when: <% succeeded() %>
do: task2
- when: <% failed() %>
do: task3
task2:
action: core.noop
task3:
action: core.echo message="BOOM!"
next:
- do: fail
"""

expected_tk1_action_spec = {'action': 'core.echo', 'input': {'message': '$RANDOM'}}
expected_tk3_action_spec = {'action': 'core.echo', 'input': {'message': 'BOOM!'}}

spec = native_specs.WorkflowSpec(wf_def)
self.assertDictEqual(spec.inspect(), {})

conductor = conducting.WorkflowConductor(spec)
conductor.request_workflow_status(statuses.RUNNING)

# Failed execution for task1.
next_tasks = conductor.get_next_tasks()
self.assertEqual(len(next_tasks), 1)
self.assertEqual(next_tasks[0]['id'], 'task1')
self.assertDictEqual(next_tasks[0]['actions'][0], expected_tk1_action_spec)
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING, statuses.FAILED])

# Failed retry #1 for task1.
tk1_state = conductor.get_task_state_entry('task1', 0)
self.assertEqual(tk1_state['status'], statuses.RETRYING)
self.assertEqual(tk1_state['retry']['count'], 3)
self.assertEqual(tk1_state['retry']['tally'], 1)
next_tasks = conductor.get_next_tasks()
self.assertEqual(len(next_tasks), 1)
self.assertEqual(next_tasks[0]['id'], 'task1')
self.assertDictEqual(next_tasks[0]['actions'][0], expected_tk1_action_spec)
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING, statuses.FAILED])

# Failed retry #2 for task1.
tk1_state = conductor.get_task_state_entry('task1', 0)
self.assertEqual(tk1_state['status'], statuses.RETRYING)
self.assertEqual(tk1_state['retry']['tally'], 2)
next_tasks = conductor.get_next_tasks()
self.assertEqual(len(next_tasks), 1)
self.assertEqual(next_tasks[0]['id'], 'task1')
self.assertDictEqual(next_tasks[0]['actions'][0], expected_tk1_action_spec)
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING, statuses.FAILED])

# Failed retry #3 for task1.
tk1_state = conductor.get_task_state_entry('task1', 0)
self.assertEqual(tk1_state['status'], statuses.RETRYING)
self.assertEqual(tk1_state['retry']['tally'], 3)
next_tasks = conductor.get_next_tasks()
self.assertEqual(len(next_tasks), 1)
self.assertEqual(next_tasks[0]['id'], 'task1')
self.assertDictEqual(next_tasks[0]['actions'][0], expected_tk1_action_spec)
self.forward_task_statuses(conductor, 'task1', [statuses.RUNNING, statuses.FAILED])

# Assert task1 failed and the workflow execution progresses to task3.
tk1_state = conductor.get_task_state_entry('task1', 0)
self.assertEqual(tk1_state['status'], statuses.FAILED)
self.assertEqual(tk1_state['retry']['tally'], 3)
next_tasks = conductor.get_next_tasks()
self.assertEqual(len(next_tasks), 1)
self.assertEqual(next_tasks[0]['id'], 'task3')
self.assertDictEqual(next_tasks[0]['actions'][0], expected_tk3_action_spec)

# Successful execution for task3.
self.forward_task_statuses(conductor, 'task3', [statuses.RUNNING, statuses.SUCCEEDED])
tk3_state = conductor.get_task_state_entry('task3', 0)
self.assertEqual(tk3_state['status'], statuses.SUCCEEDED)

# Assert workflow failed (manual under task3).
self.assertEqual(conductor.get_workflow_status(), statuses.FAILED)

# Assert there is only a single task1 and a single task3 in the task sequences.
expected_task_sequence = ['task1', 'task3', 'fail']
actual_task_sequence = [item['id'] for item in conductor.workflow_state.sequence]
self.assertListEqual(expected_task_sequence, actual_task_sequence)

def test_retry_delay_with_task_delay_defined(self):
wf_def = """
version: 1.0
Expand Down

0 comments on commit 1e2d962

Please sign in to comment.