From 4d3bc301e5b86311687cc8e88bc11cb2a3a56dea Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Tue, 24 Feb 2026 22:01:00 +0100 Subject: [PATCH 1/6] chore: Use dapr-bot to create backport PRs, so they can trigger workflows (#931) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Albert Callarisa Signed-off-by: Matheus André <92062874+matheusandre1@users.noreply.github.com> --- .github/workflows/backport.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/backport.yaml b/.github/workflows/backport.yaml index 5be6f43d2..71b9716cb 100644 --- a/.github/workflows/backport.yaml +++ b/.github/workflows/backport.yaml @@ -34,4 +34,4 @@ jobs: steps: - uses: tibdex/backport@9565281eda0731b1d20c4025c43339fb0a23812e with: - github_token: ${{ secrets.GITHUB_TOKEN }} + github_token: ${{ secrets.DAPR_BOT_TOKEN }} From 6522c27f5fc11d14e713d9ddff9ca72dbe5b422a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matheus=20Andr=C3=A9?= <92062874+matheusandre1@users.noreply.github.com> Date: Fri, 27 Feb 2026 15:42:43 -0300 Subject: [PATCH 2/6] Allow infinite retries for workflows MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Matheus André <92062874+matheusandre1@users.noreply.github.com> --- examples/workflow/simple.py | 24 ++++++- .../dapr/ext/workflow/retry_policy.py | 5 +- .../dapr/ext/workflow/workflow_runtime.py | 5 +- .../tests/test_retry_policy.py | 63 +++++++++++++++++++ .../tests/test_workflow_runtime.py | 7 ++- 5 files changed, 96 insertions(+), 8 deletions(-) create mode 100644 ext/dapr-ext-workflow/tests/test_retry_policy.py diff --git a/examples/workflow/simple.py b/examples/workflow/simple.py index dc0ea0b6a..1123cf762 100644 --- a/examples/workflow/simple.py +++ b/examples/workflow/simple.py @@ -31,6 +31,7 @@ child_orchestrator_count = 0 child_orchestrator_string = '' child_act_retry_count = 0 +infinite_retry_count = 0 instance_id = 'exampleInstanceID' child_instance_id = 'childInstanceID' workflow_name = 'hello_world_wf' @@ -48,6 +49,11 @@ retry_timeout=timedelta(seconds=100), ) +infinite_retry_policy = RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=-1, +) + wfr = WorkflowRuntime() @@ -57,6 +63,7 @@ def hello_world_wf(ctx: DaprWorkflowContext, wf_input): yield ctx.call_activity(hello_act, input=1) yield ctx.call_activity(hello_act, input=10) yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy) + yield ctx.call_activity(hello_infinite_retryable_act, retry_policy=infinite_retry_policy) yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy) # Change in event handling: Use when_any to handle both event and timeout @@ -91,6 +98,15 @@ def hello_retryable_act(ctx: WorkflowActivityContext): retry_count += 1 +@wfr.activity(name='hello_infinite_retryable_act') +def hello_infinite_retryable_act(ctx: WorkflowActivityContext): + global infinite_retry_count + infinite_retry_count += 1 + print(f'Infinite retry attempt: {infinite_retry_count}', flush=True) + if infinite_retry_count <= 10: + raise ValueError('Retryable Error') + + @wfr.workflow(name='child_retryable_wf') def child_retryable_wf(ctx: DaprWorkflowContext): global child_orchestrator_string, child_orchestrator_count @@ -128,11 +144,12 @@ def main(): wf_client.wait_for_workflow_start(instance_id) - # Sleep to let the workflow run initial activities - sleep(12) + # Sleep to let the workflow run initial activities and infinite retries + sleep(24) assert counter == 11 assert retry_count == 2 + assert infinite_retry_count == 11 assert child_orchestrator_string == '1aa2bb3cc' # Pause Test @@ -153,10 +170,13 @@ def main(): state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30) if state.runtime_status.name == 'COMPLETED': print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"'))) + assert state.serialized_output.strip('"') == 'Completed' else: print(f'Workflow failed! Status: {state.runtime_status.name}') + raise AssertionError(f'Unexpected workflow status: {state.runtime_status.name}') except TimeoutError: print('*** Workflow timed out!') + raise wf_client.purge_workflow(instance_id=instance_id) try: diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py b/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py index aa12f479d..69c9a481d 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py @@ -40,6 +40,7 @@ def __init__( Args: first_retry_interval(timedelta): The retry interval to use for the first retry attempt. max_number_of_attempts(int): The maximum number of retry attempts. + Use ``-1`` for infinite retries. backoff_coefficient(Optional[float]): The backoff coefficient to use for calculating the next retry interval. max_retry_interval(Optional[timedelta]): The maximum retry interval to use for any @@ -50,8 +51,8 @@ def __init__( # validate inputs if first_retry_interval < timedelta(seconds=0): raise ValueError('first_retry_interval must be >= 0') - if max_number_of_attempts < 1: - raise ValueError('max_number_of_attempts must be >= 1') + if max_number_of_attempts == 0 or max_number_of_attempts < -1: + raise ValueError('max_number_of_attempts must be >= 1 or -1 for infinite retries') if backoff_coefficient is not None and backoff_coefficient < 1: raise ValueError('backoff_coefficient must be >= 1') if max_retry_interval is not None and max_retry_interval < timedelta(seconds=0): diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index 9f5edb2b4..72060a7b4 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -246,7 +246,10 @@ def start(self): try: is_ready = self.wait_for_worker_ready(timeout=self._worker_ready_timeout) if not is_ready: - raise RuntimeError('WorkflowRuntime worker and its stream are not ready') + self._logger.warning( + 'WorkflowRuntime worker and its stream are not ready. ' + 'Continuing; workflows scheduled immediately may not be received.' + ) else: self._logger.debug( 'WorkflowRuntime worker is ready and its stream can receive work items' diff --git a/ext/dapr-ext-workflow/tests/test_retry_policy.py b/ext/dapr-ext-workflow/tests/test_retry_policy.py new file mode 100644 index 000000000..193ee3ae6 --- /dev/null +++ b/ext/dapr-ext-workflow/tests/test_retry_policy.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2026 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import unittest +from datetime import timedelta + +from dapr.ext.workflow.retry_policy import RetryPolicy + + +class RetryPolicyTests(unittest.TestCase): + @staticmethod + def _execute_with_retry(operation, retry_policy: RetryPolicy): + attempts = 0 + while True: + attempts += 1 + try: + operation(attempts) + return attempts + except ValueError: + if ( + retry_policy.max_number_of_attempts != -1 + and attempts >= retry_policy.max_number_of_attempts + ): + raise + + def test_allow_infinite_max_number_of_attempts(self): + retry_policy = RetryPolicy( + first_retry_interval=timedelta(seconds=1), max_number_of_attempts=-1 + ) + + self.assertEqual(-1, retry_policy.max_number_of_attempts) + + def test_infinite_retries_succeeds_after_ten_failures(self): + retry_policy = RetryPolicy( + first_retry_interval=timedelta(seconds=1), max_number_of_attempts=-1 + ) + + def flaky_operation(attempt: int): + if attempt <= 10: + raise ValueError('Retryable Error') + + attempts = self._execute_with_retry(flaky_operation, retry_policy=retry_policy) + + self.assertEqual(11, attempts) + + def test_reject_invalid_max_number_of_attempts(self): + with self.assertRaises(ValueError): + RetryPolicy(first_retry_interval=timedelta(seconds=1), max_number_of_attempts=0) + + with self.assertRaises(ValueError): + RetryPolicy(first_retry_interval=timedelta(seconds=1), max_number_of_attempts=-2) diff --git a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py index 16eb4946f..a6db2df89 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py @@ -246,7 +246,7 @@ def test_start_logs_exception_when_worker_start_fails(self): mock_exception.assert_called_once() self.assertIn('did not start', mock_exception.call_args[0][0]) - def test_start_raises_when_worker_not_ready(self): + def test_start_logs_warning_when_worker_not_ready(self): listActivities.clear() listOrchestrators.clear() mock.patch('durabletask.worker._Registry', return_value=FakeTaskHubGrpcWorker()).start() @@ -254,9 +254,10 @@ def test_start_raises_when_worker_not_ready(self): mock_worker = mock.MagicMock() mock_worker.is_worker_ready.return_value = False runtime._WorkflowRuntime__worker = mock_worker - with self.assertRaises(RuntimeError) as ctx: + with mock.patch.object(runtime._logger, 'warning') as mock_warning: runtime.start() - self.assertIn('not ready', str(ctx.exception)) + mock_worker.start.assert_called_once() + self.assertGreaterEqual(mock_warning.call_count, 1) def test_start_logs_warning_when_no_is_worker_ready(self): mock_worker = mock.MagicMock(spec=['start', 'stop', '_registry']) From f94ba353d8d22190bc5996456a7920038f71ac63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matheus=20Andr=C3=A9?= <92062874+matheusandre1@users.noreply.github.com> Date: Wed, 4 Mar 2026 15:30:23 -0300 Subject: [PATCH 3/6] raise RuntimeError MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Matheus André <92062874+matheusandre1@users.noreply.github.com> --- ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index 72060a7b4..9f5edb2b4 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -246,10 +246,7 @@ def start(self): try: is_ready = self.wait_for_worker_ready(timeout=self._worker_ready_timeout) if not is_ready: - self._logger.warning( - 'WorkflowRuntime worker and its stream are not ready. ' - 'Continuing; workflows scheduled immediately may not be received.' - ) + raise RuntimeError('WorkflowRuntime worker and its stream are not ready') else: self._logger.debug( 'WorkflowRuntime worker is ready and its stream can receive work items' From 64132f13fac7e90e3e1ffc9c5c618adf40f267a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matheus=20Andr=C3=A9?= <92062874+matheusandre1@users.noreply.github.com> Date: Fri, 27 Feb 2026 14:19:05 -0300 Subject: [PATCH 4/6] fix: Align Workflow Multi App Naming Convention (#932) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Matheus André <92062874+matheusandre1@users.noreply.github.com> --- examples/workflow/README.md | 28 +++++++++---------- .../workflow/{cross-app1.py => multi-app1.py} | 0 .../workflow/{cross-app2.py => multi-app2.py} | 0 .../workflow/{cross-app3.py => multi-app3.py} | 0 .../ext/workflow/dapr_workflow_context.py | 6 ++-- .../dapr/ext/workflow/workflow_context.py | 4 +-- 6 files changed, 19 insertions(+), 19 deletions(-) rename examples/workflow/{cross-app1.py => multi-app1.py} (100%) rename examples/workflow/{cross-app2.py => multi-app2.py} (100%) rename examples/workflow/{cross-app3.py => multi-app3.py} (100%) diff --git a/examples/workflow/README.md b/examples/workflow/README.md index e27f4c78f..b1f1cfae9 100644 --- a/examples/workflow/README.md +++ b/examples/workflow/README.md @@ -338,7 +338,7 @@ When you run the example, you will see output like this: ``` -### Cross-app Workflow +### Multi-app Workflows This example demonstrates how to call child workflows and activities in different apps. The multiple Dapr CLI instances can be started using the following commands: @@ -361,9 +361,9 @@ sleep: 20 --> ```sh -dapr run --app-id wfexample3 python3 cross-app3.py & -dapr run --app-id wfexample2 python3 cross-app2.py & -dapr run --app-id wfexample1 python3 cross-app1.py +dapr run --app-id wfexample3 python3 multi-app3.py & +dapr run --app-id wfexample2 python3 multi-app2.py & +dapr run --app-id wfexample1 python3 multi-app1.py ``` @@ -379,9 +379,9 @@ among others. This shows that the workflow calls are working as expected. #### Error handling on activity calls -This example demonstrates how the error handling works on activity calls across apps. +This example demonstrates how the error handling works on activity calls in multi-app workflows. -Error handling on activity calls across apps works as normal workflow activity calls. +Error handling on activity calls in multi-app workflows works as normal workflow activity calls. In this example we run `app3` in failing mode, which makes the activity call return error constantly. The activity call from `app2` will fail after the retry policy is exhausted. @@ -404,9 +404,9 @@ sleep: 20 ```sh export ERROR_ACTIVITY_MODE=true -dapr run --app-id wfexample3 python3 cross-app3.py & -dapr run --app-id wfexample2 python3 cross-app2.py & -dapr run --app-id wfexample1 python3 cross-app1.py +dapr run --app-id wfexample3 python3 multi-app3.py & +dapr run --app-id wfexample2 python3 multi-app2.py & +dapr run --app-id wfexample1 python3 multi-app1.py ``` @@ -424,9 +424,9 @@ among others. This shows that the activity calls are failing as expected, and th #### Error handling on workflow calls -This example demonstrates how the error handling works on workflow calls across apps. +This example demonstrates how the error handling works on workflow calls in multi-app workflows. -Error handling on workflow calls across apps works as normal workflow calls. +Error handling on workflow calls in multi-app workflows works as normal workflow calls. In this example we run `app2` in failing mode, which makes the workflow call return error constantly. The workflow call from `app1` will fail after the retry policy is exhausted. @@ -445,9 +445,9 @@ sleep: 20 ```sh export ERROR_WORKFLOW_MODE=true -dapr run --app-id wfexample3 python3 cross-app3.py & -dapr run --app-id wfexample2 python3 cross-app2.py & -dapr run --app-id wfexample1 python3 cross-app1.py +dapr run --app-id wfexample3 python3 multi-app3.py & +dapr run --app-id wfexample2 python3 multi-app2.py & +dapr run --app-id wfexample1 python3 multi-app1.py ``` diff --git a/examples/workflow/cross-app1.py b/examples/workflow/multi-app1.py similarity index 100% rename from examples/workflow/cross-app1.py rename to examples/workflow/multi-app1.py diff --git a/examples/workflow/cross-app2.py b/examples/workflow/multi-app2.py similarity index 100% rename from examples/workflow/cross-app2.py rename to examples/workflow/multi-app2.py diff --git a/examples/workflow/cross-app3.py b/examples/workflow/multi-app3.py similarity index 100% rename from examples/workflow/cross-app3.py rename to examples/workflow/multi-app3.py diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py index cc0cfe8ba..d90c72dc2 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py @@ -68,12 +68,12 @@ def call_activity( retry_policy: Optional[RetryPolicy] = None, app_id: Optional[str] = None, ) -> task.Task[TOutput]: - # Handle string activity names for cross-app scenarios + # Handle string activity names for multi-app workflow scenarios if isinstance(activity, str): activity_name = activity if app_id is not None: self._logger.debug( - f'{self.instance_id}: Creating cross-app activity {activity_name} for app {app_id}' + f'{self.instance_id}: Creating multi-app workflow activity {activity_name} for app {app_id}' ) else: self._logger.debug(f'{self.instance_id}: Creating activity {activity_name}') @@ -106,7 +106,7 @@ def call_child_workflow( retry_policy: Optional[RetryPolicy] = None, app_id: Optional[str] = None, ) -> task.Task[TOutput]: - # Handle string workflow names for cross-app scenarios + # Handle string workflow names for multi-app workflow scenarios if isinstance(workflow, str): workflow_name = workflow self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow_name}') diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py index 8453e16ef..d41841472 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py @@ -118,7 +118,7 @@ def call_activity( Parameters ---------- activity: Activity[TInput, TOutput] | str - A reference to the activity function to call, or a string name for cross-app activities. + A reference to the activity function to call, or a string name for multi-app workflow activities. input: TInput | None The JSON-serializable input (or None) to pass to the activity. app_id: str | None @@ -145,7 +145,7 @@ def call_child_workflow( Parameters ---------- orchestrator: Orchestrator[TInput, TOutput] | str - A reference to the orchestrator function to call, or a string name for cross-app workflows. + A reference to the orchestrator function to call, or a string name for multi-app workflows. input: TInput The optional JSON-serializable input to pass to the orchestrator function. instance_id: str From 2d8acebbb92e87e9a64e4f9e199f56117f4b7d38 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Fri, 17 Apr 2026 11:07:00 +0200 Subject: [PATCH 5/6] Added infinite retry policy to durabletask, and extended testing Signed-off-by: Albert Callarisa --- examples/workflow/README.md | 6 +- .../dapr/ext/workflow/_durabletask/task.py | 12 +- .../test_orchestration_executor.py | 167 ++++++++++++++++++ .../tests/durabletask/test_task.py | 73 ++++++++ .../tests/test_workflow_runtime.py | 7 +- 5 files changed, 256 insertions(+), 9 deletions(-) diff --git a/examples/workflow/README.md b/examples/workflow/README.md index cf3cce610..03294f3f1 100644 --- a/examples/workflow/README.md +++ b/examples/workflow/README.md @@ -39,6 +39,8 @@ expected_stdout_lines: - "New counter value is: 11!" - "Retry count value is: 0!" - "Retry count value is: 1! This print statement verifies retry" + - "Infinite retry attempt: 1" + - "Infinite retry attempt: 11" - "Appending 1 to child_orchestrator_string!" - "Appending a to child_orchestrator_string!" - "Appending a to child_orchestrator_string!" @@ -53,7 +55,7 @@ expected_stdout_lines: - "New counter value is: 111!" - "New counter value is: 1111!" - "Workflow completed! Result: Completed" -timeout_seconds: 30 +timeout_seconds: 60 --> ```sh @@ -69,6 +71,8 @@ The output of this example should look like this: - "New counter value is: 11!" - "Retry count value is: 0!" - "Retry count value is: 1! This print statement verifies retry" + - "Infinite retry attempt: 1" + - "Infinite retry attempt: 11" - "Appending 1 to child_orchestrator_string!" - "Appending a to child_orchestrator_string!" - "Appending a to child_orchestrator_string!" diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py index 82bf062ac..f93885498 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py @@ -461,7 +461,11 @@ def increment_attempt_count(self) -> None: self._attempt_count += 1 def compute_next_delay(self) -> Optional[timedelta]: - if self._attempt_count >= self._retry_policy.max_number_of_attempts: + # max_number_of_attempts == -1 means retry indefinitely; skip the attempt cap check. + if ( + self._retry_policy.max_number_of_attempts != -1 + and self._attempt_count >= self._retry_policy.max_number_of_attempts + ): return None retry_expiration: datetime = datetime.max @@ -662,7 +666,7 @@ def __init__( first_retry_interval : timedelta The retry interval to use for the first retry attempt. max_number_of_attempts : int - The maximum number of retry attempts. + The maximum number of retry attempts. Use ``-1`` for infinite retries. backoff_coefficient : Optional[float] The backoff coefficient to use for calculating the next retry interval. max_retry_interval : Optional[timedelta] @@ -678,8 +682,8 @@ def __init__( # validate inputs if first_retry_interval < timedelta(seconds=0): raise ValueError('first_retry_interval must be >= 0') - if max_number_of_attempts < 1: - raise ValueError('max_number_of_attempts must be >= 1') + if max_number_of_attempts == 0 or max_number_of_attempts < -1: + raise ValueError('max_number_of_attempts must be >= 1 or -1 for infinite retries') if backoff_coefficient is not None and backoff_coefficient < 1: raise ValueError('backoff_coefficient must be >= 1') if max_retry_interval is not None and max_retry_interval < timedelta(seconds=0): diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py index 953155027..fae9edc3c 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py @@ -2654,6 +2654,173 @@ def parent_orchestrator(ctx: task.OrchestrationContext, _): assert retry_timer_2.childWorkflowRetry.instanceId != expected_second_child_id +def test_activity_infinite_retry_policy_succeeds_after_many_failures(): + """With max_number_of_attempts=-1, the executor keeps scheduling retries + past any finite attempt cap and the workflow completes once the activity + eventually succeeds.""" + + def dummy_activity(ctx, _): + pass # behavior is simulated via history events below + + def orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.call_activity( + dummy_activity, + retry_policy=task.RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=-1, + ), + ) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + current_timestamp = datetime(2020, 1, 1, 12, 0, 0) + + old_events: list = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_task_scheduled_event(1, task.get_name(dummy_activity)), + ] + + # Simulate 5 consecutive failures - far beyond any default finite cap. + # Each failure must produce a retry timer, not a workflow completion. + next_schedule_id = 1 + for failure_num in range(5): + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_task_failed_event(next_schedule_id, ValueError('boom')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + assert len(result.actions) == 1, f'failure #{failure_num + 1} should create a retry timer' + assert result.actions[0].HasField('createTimer'), ( + f'failure #{failure_num + 1} must schedule a retry timer, not complete the workflow' + ) + + timer_id = result.actions[0].id + fire_at = result.actions[0].createTimer.fireAt.ToDatetime() + + # Fire the timer and observe the executor schedule the next attempt. + old_events = old_events + new_events + current_timestamp = fire_at + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_timer_created_event(timer_id, current_timestamp), + helpers.new_timer_fired_event(timer_id, current_timestamp), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + assert len(result.actions) == 1 + assert result.actions[0].HasField('scheduleTask'), ( + f'retry #{failure_num + 1} must re-schedule the activity' + ) + next_schedule_id = result.actions[0].id + old_events = ( + old_events + + new_events + + [ + helpers.new_task_scheduled_event(next_schedule_id, task.get_name(dummy_activity)), + ] + ) + + # 6th attempt succeeds -> the workflow must complete successfully. + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_task_completed_event(next_schedule_id, encoded_output=json.dumps('ok')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + complete = get_and_validate_single_complete_workflow_action(result.actions) + assert complete.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert json.loads(complete.result.value) == 'ok' + + +def test_sub_orchestration_infinite_retry_policy_succeeds_after_many_failures(): + """Parallel to the activity test: sub-orchestrations with max_number_of_attempts=-1 + keep retrying past any finite cap, and the parent completes when the child succeeds.""" + + def child_orchestrator(ctx: task.OrchestrationContext, _): + return 'child-ok' # behavior is simulated via history events below + + def parent_orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.call_sub_orchestrator( + 'child_orchestrator', + retry_policy=task.RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=-1, + ), + ) + return result + + registry = worker._Registry() + name = registry.add_orchestrator(parent_orchestrator) + registry.add_orchestrator(child_orchestrator) + + current_timestamp = datetime(2020, 1, 1, 12, 0, 0) + + first_child_id = f'{TEST_INSTANCE_ID}:0001' + old_events: list = [ + helpers.new_workflow_started_event(timestamp=current_timestamp), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), + helpers.new_child_workflow_created_event(1, 'child_orchestrator', first_child_id), + ] + next_child_id = 1 + + # Simulate 4 consecutive child-workflow failures - each must schedule a retry timer. + for failure_num in range(4): + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_child_workflow_failed_event(next_child_id, ValueError('child failed')), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + assert len(result.actions) == 1 + assert result.actions[0].HasField('createTimer'), ( + f'child failure #{failure_num + 1} must schedule a retry timer' + ) + timer_id = result.actions[0].id + fire_at = result.actions[0].createTimer.fireAt.ToDatetime() + + old_events = old_events + new_events + current_timestamp = fire_at + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_timer_created_event(timer_id, current_timestamp), + helpers.new_timer_fired_event(timer_id, current_timestamp), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + assert len(result.actions) == 1 + assert result.actions[0].HasField('createChildWorkflow'), ( + f'retry #{failure_num + 1} must create a new child workflow' + ) + next_child_id = result.actions[0].id + retried_child_instance_id = f'{TEST_INSTANCE_ID}:{next_child_id:04d}' + old_events = ( + old_events + + new_events + + [ + helpers.new_child_workflow_created_event( + next_child_id, 'child_orchestrator', retried_child_instance_id + ), + ] + ) + + # 5th attempt succeeds -> the parent workflow must complete with the child's result. + new_events = [ + helpers.new_workflow_started_event(current_timestamp), + helpers.new_child_workflow_completed_event( + next_child_id, encoded_output=json.dumps('child-ok') + ), + ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + complete = get_and_validate_single_complete_workflow_action(result.actions) + assert complete.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED + assert json.loads(complete.result.value) == 'child-ok' + + def get_and_validate_single_complete_workflow_action( actions: list[pb.WorkflowAction], ) -> pb.CompleteWorkflowAction: diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_task.py b/ext/dapr-ext-workflow/tests/durabletask/test_task.py index a381ab3d1..6226cdc5c 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_task.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_task.py @@ -11,6 +11,8 @@ """Unit tests for durabletask.task primitives.""" +from datetime import datetime, timedelta + import dapr.ext.workflow._durabletask.internal.helpers as pbh import pytest from dapr.ext.workflow._durabletask import task @@ -195,3 +197,74 @@ def test_when_all_failure_propagates_to_parent(): # The parent WhenAnyTask should also have completed assert any_task.is_complete assert any_task.get_result() is all_task + + +def test_retry_policy_accepts_infinite_max_attempts(): + """RetryPolicy(max_number_of_attempts=-1) is allowed and means infinite retries.""" + policy = task.RetryPolicy(first_retry_interval=timedelta(seconds=1), max_number_of_attempts=-1) + assert policy.max_number_of_attempts == -1 + + +@pytest.mark.parametrize('invalid', [0, -2, -5]) +def test_retry_policy_rejects_invalid_max_attempts(invalid): + """RetryPolicy rejects zero and values below -1.""" + with pytest.raises(ValueError): + task.RetryPolicy(first_retry_interval=timedelta(seconds=1), max_number_of_attempts=invalid) + + +def test_retryable_task_infinite_keeps_computing_next_delay(): + """When max_number_of_attempts == -1, compute_next_delay never returns None + due to the attempt cap (subject only to retry_timeout).""" + policy = task.RetryPolicy(first_retry_interval=timedelta(seconds=1), max_number_of_attempts=-1) + retryable = task.RetryableTask( + retry_policy=policy, + start_time=datetime.utcnow(), + is_sub_orch=False, + task_name='activity', + ) + + # Simulate many failed attempts; delay should continue to be produced. + for _ in range(50): + retryable.increment_attempt_count() + assert retryable.compute_next_delay() is not None + + +def test_retryable_task_infinite_still_respects_retry_timeout(): + """When max_number_of_attempts == -1, retry_timeout must still cap the wall-clock + window so retries do not run forever.""" + policy = task.RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=-1, + retry_timeout=timedelta(seconds=3), + ) + retryable = task.RetryableTask( + retry_policy=policy, + start_time=datetime.utcnow(), + is_sub_orch=False, + task_name='activity', + ) + + # Each attempt adds 1s of delay. First few attempts stay inside the 3s window. + retryable.increment_attempt_count() # attempt 2 -> cumulative delay 2s + assert retryable.compute_next_delay() is not None + retryable.increment_attempt_count() # attempt 3 -> cumulative delay 3s + # At the boundary, the logical next start == retry_expiration, which is not "<" + assert retryable.compute_next_delay() is None + + +def test_retryable_task_stops_after_max_attempts(): + """With a finite cap, compute_next_delay returns None once the cap is reached.""" + policy = task.RetryPolicy(first_retry_interval=timedelta(seconds=1), max_number_of_attempts=3) + retryable = task.RetryableTask( + retry_policy=policy, + start_time=datetime.utcnow(), + is_sub_orch=False, + task_name='activity', + ) + + # attempt 1 -> has more retries + assert retryable.compute_next_delay() is not None + retryable.increment_attempt_count() # -> 2 + assert retryable.compute_next_delay() is not None + retryable.increment_attempt_count() # -> 3 + assert retryable.compute_next_delay() is None diff --git a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py index 3b6c94461..0c6c2afd3 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py @@ -250,7 +250,7 @@ def test_start_logs_exception_when_worker_start_fails(self): mock_exception.assert_called_once() self.assertIn('did not start', mock_exception.call_args[0][0]) - def test_start_logs_warning_when_worker_not_ready(self): + def test_start_raises_when_worker_not_ready(self): listActivities.clear() listOrchestrators.clear() mock.patch( @@ -260,10 +260,9 @@ def test_start_logs_warning_when_worker_not_ready(self): mock_worker = mock.MagicMock() mock_worker.is_worker_ready.return_value = False runtime._WorkflowRuntime__worker = mock_worker - with mock.patch.object(runtime._logger, 'warning') as mock_warning: + with self.assertRaises(RuntimeError) as ctx: runtime.start() - mock_worker.start.assert_called_once() - self.assertGreaterEqual(mock_warning.call_count, 1) + self.assertIn('not ready', str(ctx.exception)) def test_start_logs_warning_when_no_is_worker_ready(self): mock_worker = mock.MagicMock(spec=['start', 'stop', '_registry']) From f087c6c7fa5ef6a827ee6ea59bcaaa56da27b762 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Mon, 20 Apr 2026 10:54:13 +0200 Subject: [PATCH 6/6] Address PR comments Signed-off-by: Albert Callarisa --- examples/workflow/simple.py | 13 ++++- .../dapr/ext/workflow/_durabletask/task.py | 50 ++++++++++--------- 2 files changed, 38 insertions(+), 25 deletions(-) diff --git a/examples/workflow/simple.py b/examples/workflow/simple.py index 1123cf762..1d5e04439 100644 --- a/examples/workflow/simple.py +++ b/examples/workflow/simple.py @@ -144,8 +144,17 @@ def main(): wf_client.wait_for_workflow_start(instance_id) - # Sleep to let the workflow run initial activities and infinite retries - sleep(24) + # Poll for the workflow to finish the initial activities, the infinite retries, and + # the child workflows. Polling instead of a fixed sleep keeps the example fast. + for _ in range(120): # up to ~60 seconds at 0.5s intervals + if ( + counter == 11 + and retry_count == 2 + and infinite_retry_count == 11 + and child_orchestrator_string == '1aa2bb3cc' + ): + break + sleep(0.5) assert counter == 11 assert retry_count == 2 diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py index f93885498..6a427963e 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py @@ -456,8 +456,29 @@ def __init__( self._task_execution_id = task_execution_id self._instance_id = instance_id self._app_id = app_id + # Running sum of delays for attempts [1.._attempt_count - 1]. Maintained by + # increment_attempt_count so compute_next_delay stays O(1) per call, which matters + # when max_number_of_attempts is -1 (infinite retries). + self._accumulated_delay_seconds = 0.0 + + def _delay_for_attempt(self, attempt: int) -> float: + """Compute the retry delay for a given attempt number (1-indexed), deterministically.""" + if self._retry_policy.backoff_coefficient is None: + backoff_coefficient = 1.0 + else: + backoff_coefficient = self._retry_policy.backoff_coefficient + delay = ( + math.pow(backoff_coefficient, attempt - 1) + * self._retry_policy.first_retry_interval.total_seconds() + ) + if self._retry_policy.max_retry_interval is not None: + delay = min(delay, self._retry_policy.max_retry_interval.total_seconds()) + return delay def increment_attempt_count(self) -> None: + # Fold the delay for the just-completed attempt into the running total, so + # compute_next_delay does not need to re-sum all past delays. + self._accumulated_delay_seconds += self._delay_for_attempt(self._attempt_count) self._attempt_count += 1 def compute_next_delay(self) -> Optional[timedelta]: @@ -475,32 +496,15 @@ def compute_next_delay(self) -> Optional[timedelta]: ): retry_expiration = self._start_time + self._retry_policy.retry_timeout - if self._retry_policy.backoff_coefficient is None: - backoff_coefficient = 1.0 - else: - backoff_coefficient = self._retry_policy.backoff_coefficient - - # Compute the next delay and the logical start time of the next attempt - # deterministically from accumulated delays, avoiding non-determinism during replay. - # range(1, attempt_count + 1) sums all delays up to and including the one about to - # be taken, so we're checking whether attempt N+1 would start within the timeout. - total_elapsed_seconds = 0.0 - next_delay_f = 0.0 - for i in range(1, self._attempt_count + 1): - next_delay_f = ( - math.pow(backoff_coefficient, i - 1) - * self._retry_policy.first_retry_interval.total_seconds() - ) - if self._retry_policy.max_retry_interval is not None: - next_delay_f = min( - next_delay_f, - self._retry_policy.max_retry_interval.total_seconds(), - ) - total_elapsed_seconds += next_delay_f + # Delay for the current attempt plus the cached sum of all prior attempts gives + # the logical start time of the next attempt — identical semantics to the old + # per-call loop, but O(1) instead of O(n). + next_delay_seconds = self._delay_for_attempt(self._attempt_count) + total_elapsed_seconds = self._accumulated_delay_seconds + next_delay_seconds logical_next_attempt_start = self._start_time + timedelta(seconds=total_elapsed_seconds) if logical_next_attempt_start < retry_expiration: - return timedelta(seconds=next_delay_f) + return timedelta(seconds=next_delay_seconds) return None