diff --git a/examples/workflow/README.md b/examples/workflow/README.md index cf3cce610..03294f3f1 100644 --- a/examples/workflow/README.md +++ b/examples/workflow/README.md @@ -39,6 +39,8 @@ expected_stdout_lines: - "New counter value is: 11!" - "Retry count value is: 0!" - "Retry count value is: 1! This print statement verifies retry" + - "Infinite retry attempt: 1" + - "Infinite retry attempt: 11" - "Appending 1 to child_orchestrator_string!" - "Appending a to child_orchestrator_string!" - "Appending a to child_orchestrator_string!" @@ -53,7 +55,7 @@ expected_stdout_lines: - "New counter value is: 111!" - "New counter value is: 1111!" - "Workflow completed! Result: Completed" -timeout_seconds: 30 +timeout_seconds: 60 --> ```sh @@ -69,6 +71,8 @@ The output of this example should look like this: - "New counter value is: 11!" - "Retry count value is: 0!" - "Retry count value is: 1! This print statement verifies retry" + - "Infinite retry attempt: 1" + - "Infinite retry attempt: 11" - "Appending 1 to child_orchestrator_string!" - "Appending a to child_orchestrator_string!" - "Appending a to child_orchestrator_string!" diff --git a/examples/workflow/simple.py b/examples/workflow/simple.py index dc0ea0b6a..1d5e04439 100644 --- a/examples/workflow/simple.py +++ b/examples/workflow/simple.py @@ -31,6 +31,7 @@ child_orchestrator_count = 0 child_orchestrator_string = '' child_act_retry_count = 0 +infinite_retry_count = 0 instance_id = 'exampleInstanceID' child_instance_id = 'childInstanceID' workflow_name = 'hello_world_wf' @@ -48,6 +49,11 @@ retry_timeout=timedelta(seconds=100), ) +infinite_retry_policy = RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=-1, +) + wfr = WorkflowRuntime() @@ -57,6 +63,7 @@ def hello_world_wf(ctx: DaprWorkflowContext, wf_input): yield ctx.call_activity(hello_act, input=1) yield ctx.call_activity(hello_act, input=10) yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy) + yield ctx.call_activity(hello_infinite_retryable_act, retry_policy=infinite_retry_policy) yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy) # Change in event handling: Use when_any to handle both event and timeout @@ -91,6 +98,15 @@ def hello_retryable_act(ctx: WorkflowActivityContext): retry_count += 1 +@wfr.activity(name='hello_infinite_retryable_act') +def hello_infinite_retryable_act(ctx: WorkflowActivityContext): + global infinite_retry_count + infinite_retry_count += 1 + print(f'Infinite retry attempt: {infinite_retry_count}', flush=True) + if infinite_retry_count <= 10: + raise ValueError('Retryable Error') + + @wfr.workflow(name='child_retryable_wf') def child_retryable_wf(ctx: DaprWorkflowContext): global child_orchestrator_string, child_orchestrator_count @@ -128,11 +144,21 @@ def main(): wf_client.wait_for_workflow_start(instance_id) - # Sleep to let the workflow run initial activities - sleep(12) + # Poll for the workflow to finish the initial activities, the infinite retries, and + # the child workflows. Polling instead of a fixed sleep keeps the example fast. + for _ in range(120): # up to ~60 seconds at 0.5s intervals + if ( + counter == 11 + and retry_count == 2 + and infinite_retry_count == 11 + and child_orchestrator_string == '1aa2bb3cc' + ): + break + sleep(0.5) assert counter == 11 assert retry_count == 2 + assert infinite_retry_count == 11 assert child_orchestrator_string == '1aa2bb3cc' # Pause Test @@ -153,10 +179,13 @@ def main(): state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30) if state.runtime_status.name == 'COMPLETED': print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"'))) + assert state.serialized_output.strip('"') == 'Completed' else: print(f'Workflow failed! Status: {state.runtime_status.name}') + raise AssertionError(f'Unexpected workflow status: {state.runtime_status.name}') except TimeoutError: print('*** Workflow timed out!') + raise wf_client.purge_workflow(instance_id=instance_id) try: diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py index 82bf062ac..6a427963e 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py @@ -456,12 +456,37 @@ def __init__( self._task_execution_id = task_execution_id self._instance_id = instance_id self._app_id = app_id + # Running sum of delays for attempts [1.._attempt_count - 1]. Maintained by + # increment_attempt_count so compute_next_delay stays O(1) per call, which matters + # when max_number_of_attempts is -1 (infinite retries). + self._accumulated_delay_seconds = 0.0 + + def _delay_for_attempt(self, attempt: int) -> float: + """Compute the retry delay for a given attempt number (1-indexed), deterministically.""" + if self._retry_policy.backoff_coefficient is None: + backoff_coefficient = 1.0 + else: + backoff_coefficient = self._retry_policy.backoff_coefficient + delay = ( + math.pow(backoff_coefficient, attempt - 1) + * self._retry_policy.first_retry_interval.total_seconds() + ) + if self._retry_policy.max_retry_interval is not None: + delay = min(delay, self._retry_policy.max_retry_interval.total_seconds()) + return delay def increment_attempt_count(self) -> None: + # Fold the delay for the just-completed attempt into the running total, so + # compute_next_delay does not need to re-sum all past delays. + self._accumulated_delay_seconds += self._delay_for_attempt(self._attempt_count) self._attempt_count += 1 def compute_next_delay(self) -> Optional[timedelta]: - if self._attempt_count >= self._retry_policy.max_number_of_attempts: + # max_number_of_attempts == -1 means retry indefinitely; skip the attempt cap check. + if ( + self._retry_policy.max_number_of_attempts != -1 + and self._attempt_count >= self._retry_policy.max_number_of_attempts + ): return None retry_expiration: datetime = datetime.max @@ -471,32 +496,15 @@ def compute_next_delay(self) -> Optional[timedelta]: ): retry_expiration = self._start_time + self._retry_policy.retry_timeout - if self._retry_policy.backoff_coefficient is None: - backoff_coefficient = 1.0 - else: - backoff_coefficient = self._retry_policy.backoff_coefficient - - # Compute the next delay and the logical start time of the next attempt - # deterministically from accumulated delays, avoiding non-determinism during replay. - # range(1, attempt_count + 1) sums all delays up to and including the one about to - # be taken, so we're checking whether attempt N+1 would start within the timeout. - total_elapsed_seconds = 0.0 - next_delay_f = 0.0 - for i in range(1, self._attempt_count + 1): - next_delay_f = ( - math.pow(backoff_coefficient, i - 1) - * self._retry_policy.first_retry_interval.total_seconds() - ) - if self._retry_policy.max_retry_interval is not None: - next_delay_f = min( - next_delay_f, - self._retry_policy.max_retry_interval.total_seconds(), - ) - total_elapsed_seconds += next_delay_f + # Delay for the current attempt plus the cached sum of all prior attempts gives + # the logical start time of the next attempt — identical semantics to the old + # per-call loop, but O(1) instead of O(n). + next_delay_seconds = self._delay_for_attempt(self._attempt_count) + total_elapsed_seconds = self._accumulated_delay_seconds + next_delay_seconds logical_next_attempt_start = self._start_time + timedelta(seconds=total_elapsed_seconds) if logical_next_attempt_start < retry_expiration: - return timedelta(seconds=next_delay_f) + return timedelta(seconds=next_delay_seconds) return None @@ -662,7 +670,7 @@ def __init__( first_retry_interval : timedelta The retry interval to use for the first retry attempt. max_number_of_attempts : int - The maximum number of retry attempts. + The maximum number of retry attempts. Use ``-1`` for infinite retries. backoff_coefficient : Optional[float] The backoff coefficient to use for calculating the next retry interval. max_retry_interval : Optional[timedelta] @@ -678,8 +686,8 @@ def __init__( # validate inputs if first_retry_interval < timedelta(seconds=0): raise ValueError('first_retry_interval must be >= 0') - if max_number_of_attempts < 1: - raise ValueError('max_number_of_attempts must be >= 1') + if max_number_of_attempts == 0 or max_number_of_attempts < -1: + raise ValueError('max_number_of_attempts must be >= 1 or -1 for infinite retries') if backoff_coefficient is not None and backoff_coefficient < 1: raise ValueError('backoff_coefficient must be >= 1') if max_retry_interval is not None and max_retry_interval < timedelta(seconds=0): diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py b/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py index 255e7bc6c..8329ac166 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py @@ -40,6 +40,7 @@ def __init__( Args: first_retry_interval(timedelta): The retry interval to use for the first retry attempt. max_number_of_attempts(int): The maximum number of retry attempts. + Use ``-1`` for infinite retries. backoff_coefficient(Optional[float]): The backoff coefficient to use for calculating the next retry interval. max_retry_interval(Optional[timedelta]): The maximum retry interval to use for any @@ -50,8 +51,8 @@ def __init__( # validate inputs if first_retry_interval < timedelta(seconds=0): raise ValueError('first_retry_interval must be >= 0') - if max_number_of_attempts < 1: - raise ValueError('max_number_of_attempts must be >= 1') + if max_number_of_attempts == 0 or max_number_of_attempts < -1: + raise ValueError('max_number_of_attempts must be >= 1 or -1 for infinite retries') if backoff_coefficient is not None and backoff_coefficient < 1: raise ValueError('backoff_coefficient must be >= 1') if max_retry_interval is not None and max_retry_interval < timedelta(seconds=0): diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py index 953155027..fae9edc3c 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py @@ -2654,6 +2654,173 @@ def parent_orchestrator(ctx: task.OrchestrationContext, _): assert retry_timer_2.childWorkflowRetry.instanceId != expected_second_child_id +def test_activity_infinite_retry_policy_succeeds_after_many_failures(): + """With max_number_of_attempts=-1, the executor keeps scheduling retries + past any finite attempt cap and the workflow completes once the activity + eventually succeeds.""" + + def dummy_activity(ctx, _): + pass # behavior is simulated via history events below + + def orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.call_activity( + dummy_activity, + retry_policy=task.RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=-1, + ), + ) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + current_timestamp = datetime(2020, 1, 1, 12, 0, 0) + + old_events: list = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_task_scheduled_event(1, task.get_name(dummy_activity)), + ] + + # Simulate 5 consecutive failures - far beyond any default finite cap. + # Each failure must produce a retry timer, not a workflow completion. + next_schedule_id = 1 + for failure_num in range(5): + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_task_failed_event(next_schedule_id, ValueError('boom')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + assert len(result.actions) == 1, f'failure #{failure_num + 1} should create a retry timer' + assert result.actions[0].HasField('createTimer'), ( + f'failure #{failure_num + 1} must schedule a retry timer, not complete the workflow' + ) + + timer_id = result.actions[0].id + fire_at = result.actions[0].createTimer.fireAt.ToDatetime() + + # Fire the timer and observe the executor schedule the next attempt. + old_events = old_events + new_events + current_timestamp = fire_at + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_timer_created_event(timer_id, current_timestamp), + helpers.new_timer_fired_event(timer_id, current_timestamp), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + assert len(result.actions) == 1 + assert result.actions[0].HasField('scheduleTask'), ( + f'retry #{failure_num + 1} must re-schedule the activity' + ) + next_schedule_id = result.actions[0].id + old_events = ( + old_events + + new_events + + [ + helpers.new_task_scheduled_event(next_schedule_id, task.get_name(dummy_activity)), + ] + ) + + # 6th attempt succeeds -> the workflow must complete successfully. + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_task_completed_event(next_schedule_id, encoded_output=json.dumps('ok')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + complete = get_and_validate_single_complete_workflow_action(result.actions) + assert complete.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert json.loads(complete.result.value) == 'ok' + + +def test_sub_orchestration_infinite_retry_policy_succeeds_after_many_failures(): + """Parallel to the activity test: sub-orchestrations with max_number_of_attempts=-1 + keep retrying past any finite cap, and the parent completes when the child succeeds.""" + + def child_orchestrator(ctx: task.OrchestrationContext, _): + return 'child-ok' # behavior is simulated via history events below + + def parent_orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.call_sub_orchestrator( + 'child_orchestrator', + retry_policy=task.RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=-1, + ), + ) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(parent_orchestrator) + registry.add_orchestrator(child_orchestrator) + + current_timestamp = datetime(2020, 1, 1, 12, 0, 0) + + first_child_id = f'{TEST_INSTANCE_ID}:0001' + old_events: list = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_child_workflow_created_event(1, 'child_orchestrator', first_child_id), + ] + next_child_id = 1 + + # Simulate 4 consecutive child-workflow failures - each must schedule a retry timer. + for failure_num in range(4): + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_child_workflow_failed_event(next_child_id, ValueError('child failed')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + assert len(result.actions) == 1 + assert result.actions[0].HasField('createTimer'), ( + f'child failure #{failure_num + 1} must schedule a retry timer' + ) + timer_id = result.actions[0].id + fire_at = result.actions[0].createTimer.fireAt.ToDatetime() + + old_events = old_events + new_events + current_timestamp = fire_at + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_timer_created_event(timer_id, current_timestamp), + helpers.new_timer_fired_event(timer_id, current_timestamp), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + assert len(result.actions) == 1 + assert result.actions[0].HasField('createChildWorkflow'), ( + f'retry #{failure_num + 1} must create a new child workflow' + ) + next_child_id = result.actions[0].id + retried_child_instance_id = f'{TEST_INSTANCE_ID}:{next_child_id:04d}' + old_events = ( + old_events + + new_events + + [ + helpers.new_child_workflow_created_event( + next_child_id, 'child_orchestrator', retried_child_instance_id + ), + ] + ) + + # 5th attempt succeeds -> the parent workflow must complete with the child's result. + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_child_workflow_completed_event( + next_child_id, encoded_output=json.dumps('child-ok') + ), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + complete = get_and_validate_single_complete_workflow_action(result.actions) + assert complete.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert json.loads(complete.result.value) == 'child-ok' + + def get_and_validate_single_complete_workflow_action( actions: list[pb.WorkflowAction], ) -> pb.CompleteWorkflowAction: diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_task.py b/ext/dapr-ext-workflow/tests/durabletask/test_task.py index a381ab3d1..6226cdc5c 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_task.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_task.py @@ -11,6 +11,8 @@ """Unit tests for durabletask.task primitives.""" +from datetime import datetime, timedelta + import dapr.ext.workflow._durabletask.internal.helpers as pbh import pytest from dapr.ext.workflow._durabletask import task @@ -195,3 +197,74 @@ def test_when_all_failure_propagates_to_parent(): # The parent WhenAnyTask should also have completed assert any_task.is_complete assert any_task.get_result() is all_task + + +def test_retry_policy_accepts_infinite_max_attempts(): + """RetryPolicy(max_number_of_attempts=-1) is allowed and means infinite retries.""" + policy = task.RetryPolicy(first_retry_interval=timedelta(seconds=1), max_number_of_attempts=-1) + assert policy.max_number_of_attempts == -1 + + +@pytest.mark.parametrize('invalid', [0, -2, -5]) +def test_retry_policy_rejects_invalid_max_attempts(invalid): + """RetryPolicy rejects zero and values below -1.""" + with pytest.raises(ValueError): + task.RetryPolicy(first_retry_interval=timedelta(seconds=1), max_number_of_attempts=invalid) + + +def test_retryable_task_infinite_keeps_computing_next_delay(): + """When max_number_of_attempts == -1, compute_next_delay never returns None + due to the attempt cap (subject only to retry_timeout).""" + policy = task.RetryPolicy(first_retry_interval=timedelta(seconds=1), max_number_of_attempts=-1) + retryable = task.RetryableTask( + retry_policy=policy, + start_time=datetime.utcnow(), + is_sub_orch=False, + task_name='activity', + ) + + # Simulate many failed attempts; delay should continue to be produced. + for _ in range(50): + retryable.increment_attempt_count() + assert retryable.compute_next_delay() is not None + + +def test_retryable_task_infinite_still_respects_retry_timeout(): + """When max_number_of_attempts == -1, retry_timeout must still cap the wall-clock + window so retries do not run forever.""" + policy = task.RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=-1, + retry_timeout=timedelta(seconds=3), + ) + retryable = task.RetryableTask( + retry_policy=policy, + start_time=datetime.utcnow(), + is_sub_orch=False, + task_name='activity', + ) + + # Each attempt adds 1s of delay. First few attempts stay inside the 3s window. + retryable.increment_attempt_count() # attempt 2 -> cumulative delay 2s + assert retryable.compute_next_delay() is not None + retryable.increment_attempt_count() # attempt 3 -> cumulative delay 3s + # At the boundary, the logical next start == retry_expiration, which is not "<" + assert retryable.compute_next_delay() is None + + +def test_retryable_task_stops_after_max_attempts(): + """With a finite cap, compute_next_delay returns None once the cap is reached.""" + policy = task.RetryPolicy(first_retry_interval=timedelta(seconds=1), max_number_of_attempts=3) + retryable = task.RetryableTask( + retry_policy=policy, + start_time=datetime.utcnow(), + is_sub_orch=False, + task_name='activity', + ) + + # attempt 1 -> has more retries + assert retryable.compute_next_delay() is not None + retryable.increment_attempt_count() # -> 2 + assert retryable.compute_next_delay() is not None + retryable.increment_attempt_count() # -> 3 + assert retryable.compute_next_delay() is None diff --git a/ext/dapr-ext-workflow/tests/test_retry_policy.py b/ext/dapr-ext-workflow/tests/test_retry_policy.py new file mode 100644 index 000000000..193ee3ae6 --- /dev/null +++ b/ext/dapr-ext-workflow/tests/test_retry_policy.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2026 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import unittest +from datetime import timedelta + +from dapr.ext.workflow.retry_policy import RetryPolicy + + +class RetryPolicyTests(unittest.TestCase): + @staticmethod + def _execute_with_retry(operation, retry_policy: RetryPolicy): + attempts = 0 + while True: + attempts += 1 + try: + operation(attempts) + return attempts + except ValueError: + if ( + retry_policy.max_number_of_attempts != -1 + and attempts >= retry_policy.max_number_of_attempts + ): + raise + + def test_allow_infinite_max_number_of_attempts(self): + retry_policy = RetryPolicy( + first_retry_interval=timedelta(seconds=1), max_number_of_attempts=-1 + ) + + self.assertEqual(-1, retry_policy.max_number_of_attempts) + + def test_infinite_retries_succeeds_after_ten_failures(self): + retry_policy = RetryPolicy( + first_retry_interval=timedelta(seconds=1), max_number_of_attempts=-1 + ) + + def flaky_operation(attempt: int): + if attempt <= 10: + raise ValueError('Retryable Error') + + attempts = self._execute_with_retry(flaky_operation, retry_policy=retry_policy) + + self.assertEqual(11, attempts) + + def test_reject_invalid_max_number_of_attempts(self): + with self.assertRaises(ValueError): + RetryPolicy(first_retry_interval=timedelta(seconds=1), max_number_of_attempts=0) + + with self.assertRaises(ValueError): + RetryPolicy(first_retry_interval=timedelta(seconds=1), max_number_of_attempts=-2)