Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion examples/workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ expected_stdout_lines:
- "New counter value is: 11!"
- "Retry count value is: 0!"
- "Retry count value is: 1! This print statement verifies retry"
- "Infinite retry attempt: 1"
- "Infinite retry attempt: 11"
- "Appending 1 to child_orchestrator_string!"
- "Appending a to child_orchestrator_string!"
- "Appending a to child_orchestrator_string!"
Expand All @@ -53,7 +55,7 @@ expected_stdout_lines:
- "New counter value is: 111!"
- "New counter value is: 1111!"
- "Workflow completed! Result: Completed"
timeout_seconds: 30
timeout_seconds: 60
-->

```sh
Expand All @@ -69,6 +71,8 @@ The output of this example should look like this:
- "New counter value is: 11!"
- "Retry count value is: 0!"
- "Retry count value is: 1! This print statement verifies retry"
- "Infinite retry attempt: 1"
- "Infinite retry attempt: 11"
- "Appending 1 to child_orchestrator_string!"
- "Appending a to child_orchestrator_string!"
- "Appending a to child_orchestrator_string!"
Expand Down
33 changes: 31 additions & 2 deletions examples/workflow/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
child_orchestrator_count = 0
child_orchestrator_string = ''
child_act_retry_count = 0
# Attempt counter for hello_infinite_retryable_act; the activity fails while this
# is <= 10, so the infinite retry policy must drive it to 11 before succeeding.
infinite_retry_count = 0
instance_id = 'exampleInstanceID'
child_instance_id = 'childInstanceID'
workflow_name = 'hello_world_wf'
Expand All @@ -48,6 +49,11 @@
retry_timeout=timedelta(seconds=100),
)

# Retry forever: max_number_of_attempts=-1 means no attempt cap. Passed to
# call_activity for hello_infinite_retryable_act in hello_world_wf.
infinite_retry_policy = RetryPolicy(
    first_retry_interval=timedelta(seconds=1),
    max_number_of_attempts=-1,
)

wfr = WorkflowRuntime()


Expand All @@ -57,6 +63,7 @@ def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
yield ctx.call_activity(hello_act, input=1)
yield ctx.call_activity(hello_act, input=10)
yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
yield ctx.call_activity(hello_infinite_retryable_act, retry_policy=infinite_retry_policy)
yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)

# Change in event handling: Use when_any to handle both event and timeout
Expand Down Expand Up @@ -91,6 +98,15 @@ def hello_retryable_act(ctx: WorkflowActivityContext):
retry_count += 1


@wfr.activity(name='hello_infinite_retryable_act')
def hello_infinite_retryable_act(ctx: WorkflowActivityContext):
    """Activity that fails on its first 10 attempts and succeeds on the 11th.

    Exercises the infinite retry policy (max_number_of_attempts=-1): the
    runtime must keep rescheduling this activity until it stops raising.
    """
    global infinite_retry_count
    infinite_retry_count = infinite_retry_count + 1
    print(f'Infinite retry attempt: {infinite_retry_count}', flush=True)
    if infinite_retry_count > 10:
        return  # 11th attempt: succeed so the workflow can proceed
    raise ValueError('Retryable Error')


@wfr.workflow(name='child_retryable_wf')
def child_retryable_wf(ctx: DaprWorkflowContext):
global child_orchestrator_string, child_orchestrator_count
Expand Down Expand Up @@ -128,11 +144,21 @@ def main():

wf_client.wait_for_workflow_start(instance_id)

# Sleep to let the workflow run initial activities
sleep(12)
# Poll for the workflow to finish the initial activities, the infinite retries, and
# the child workflows. Polling instead of a fixed sleep keeps the example fast.
for _ in range(120): # up to ~60 seconds at 0.5s intervals
if (
counter == 11
and retry_count == 2
and infinite_retry_count == 11
and child_orchestrator_string == '1aa2bb3cc'
):
break
sleep(0.5)

assert counter == 11
assert retry_count == 2
assert infinite_retry_count == 11
assert child_orchestrator_string == '1aa2bb3cc'

# Pause Test
Expand All @@ -153,10 +179,13 @@ def main():
state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
if state.runtime_status.name == 'COMPLETED':
print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"')))
assert state.serialized_output.strip('"') == 'Completed'
else:
print(f'Workflow failed! Status: {state.runtime_status.name}')
raise AssertionError(f'Unexpected workflow status: {state.runtime_status.name}')
except TimeoutError:
print('*** Workflow timed out!')
raise

wf_client.purge_workflow(instance_id=instance_id)
try:
Expand Down
62 changes: 35 additions & 27 deletions ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,12 +456,37 @@ def __init__(
self._task_execution_id = task_execution_id
self._instance_id = instance_id
self._app_id = app_id
# Running sum of delays for attempts [1.._attempt_count - 1]. Maintained by
# increment_attempt_count so compute_next_delay stays O(1) per call, which matters
# when max_number_of_attempts is -1 (infinite retries).
self._accumulated_delay_seconds = 0.0

def _delay_for_attempt(self, attempt: int) -> float:
"""Compute the retry delay for a given attempt number (1-indexed), deterministically."""
if self._retry_policy.backoff_coefficient is None:
backoff_coefficient = 1.0
else:
backoff_coefficient = self._retry_policy.backoff_coefficient
delay = (
math.pow(backoff_coefficient, attempt - 1)
* self._retry_policy.first_retry_interval.total_seconds()
)
if self._retry_policy.max_retry_interval is not None:
delay = min(delay, self._retry_policy.max_retry_interval.total_seconds())
return delay

def increment_attempt_count(self) -> None:
    """Advance to the next attempt, folding the completed attempt's delay
    into the running total so compute_next_delay stays O(1) per call."""
    completed_attempt = self._attempt_count
    self._accumulated_delay_seconds += self._delay_for_attempt(completed_attempt)
    self._attempt_count = completed_attempt + 1

def compute_next_delay(self) -> Optional[timedelta]:
if self._attempt_count >= self._retry_policy.max_number_of_attempts:
# max_number_of_attempts == -1 means retry indefinitely; skip the attempt cap check.
if (
self._retry_policy.max_number_of_attempts != -1
and self._attempt_count >= self._retry_policy.max_number_of_attempts
):
Comment on lines +485 to +489
return None

retry_expiration: datetime = datetime.max
Expand All @@ -471,32 +496,15 @@ def compute_next_delay(self) -> Optional[timedelta]:
):
retry_expiration = self._start_time + self._retry_policy.retry_timeout

if self._retry_policy.backoff_coefficient is None:
backoff_coefficient = 1.0
else:
backoff_coefficient = self._retry_policy.backoff_coefficient

# Compute the next delay and the logical start time of the next attempt
# deterministically from accumulated delays, avoiding non-determinism during replay.
# range(1, attempt_count + 1) sums all delays up to and including the one about to
# be taken, so we're checking whether attempt N+1 would start within the timeout.
total_elapsed_seconds = 0.0
next_delay_f = 0.0
for i in range(1, self._attempt_count + 1):
next_delay_f = (
math.pow(backoff_coefficient, i - 1)
* self._retry_policy.first_retry_interval.total_seconds()
)
if self._retry_policy.max_retry_interval is not None:
next_delay_f = min(
next_delay_f,
self._retry_policy.max_retry_interval.total_seconds(),
)
total_elapsed_seconds += next_delay_f
# Delay for the current attempt plus the cached sum of all prior attempts gives
# the logical start time of the next attempt — identical semantics to the old
# per-call loop, but O(1) instead of O(n).
next_delay_seconds = self._delay_for_attempt(self._attempt_count)
total_elapsed_seconds = self._accumulated_delay_seconds + next_delay_seconds

logical_next_attempt_start = self._start_time + timedelta(seconds=total_elapsed_seconds)
if logical_next_attempt_start < retry_expiration:
return timedelta(seconds=next_delay_f)
return timedelta(seconds=next_delay_seconds)

return None

Expand Down Expand Up @@ -662,7 +670,7 @@ def __init__(
first_retry_interval : timedelta
The retry interval to use for the first retry attempt.
max_number_of_attempts : int
The maximum number of retry attempts.
The maximum number of retry attempts. Use ``-1`` for infinite retries.
backoff_coefficient : Optional[float]
The backoff coefficient to use for calculating the next retry interval.
max_retry_interval : Optional[timedelta]
Expand All @@ -678,8 +686,8 @@ def __init__(
# validate inputs
if first_retry_interval < timedelta(seconds=0):
raise ValueError('first_retry_interval must be >= 0')
if max_number_of_attempts < 1:
raise ValueError('max_number_of_attempts must be >= 1')
if max_number_of_attempts == 0 or max_number_of_attempts < -1:
raise ValueError('max_number_of_attempts must be >= 1 or -1 for infinite retries')
if backoff_coefficient is not None and backoff_coefficient < 1:
raise ValueError('backoff_coefficient must be >= 1')
if max_retry_interval is not None and max_retry_interval < timedelta(seconds=0):
Expand Down
5 changes: 3 additions & 2 deletions ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def __init__(
Args:
first_retry_interval(timedelta): The retry interval to use for the first retry attempt.
max_number_of_attempts(int): The maximum number of retry attempts.
Use ``-1`` for infinite retries.
backoff_coefficient(Optional[float]): The backoff coefficient to use for calculating
the next retry interval.
max_retry_interval(Optional[timedelta]): The maximum retry interval to use for any
Expand All @@ -50,8 +51,8 @@ def __init__(
# validate inputs
if first_retry_interval < timedelta(seconds=0):
raise ValueError('first_retry_interval must be >= 0')
if max_number_of_attempts < 1:
raise ValueError('max_number_of_attempts must be >= 1')
if max_number_of_attempts == 0 or max_number_of_attempts < -1:
raise ValueError('max_number_of_attempts must be >= 1 or -1 for infinite retries')
if backoff_coefficient is not None and backoff_coefficient < 1:
raise ValueError('backoff_coefficient must be >= 1')
if max_retry_interval is not None and max_retry_interval < timedelta(seconds=0):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2654,6 +2654,173 @@ def parent_orchestrator(ctx: task.OrchestrationContext, _):
assert retry_timer_2.childWorkflowRetry.instanceId != expected_second_child_id


def test_activity_infinite_retry_policy_succeeds_after_many_failures():
    """With max_number_of_attempts=-1, the executor keeps scheduling retries
    past any finite attempt cap and the workflow completes once the activity
    eventually succeeds.

    The activity is never actually run: each attempt's outcome is injected as
    history events (task failed / timer fired / task completed) and the test
    asserts on the actions the executor emits in response.
    """

    def dummy_activity(ctx, _):
        pass  # behavior is simulated via history events below

    def orchestrator(ctx: task.OrchestrationContext, _):
        result = yield ctx.call_activity(
            dummy_activity,
            retry_policy=task.RetryPolicy(
                first_retry_interval=timedelta(seconds=1),
                max_number_of_attempts=-1,
            ),
        )
        return result

    registry = worker._Registry()
    name = registry.add_orchestrator(orchestrator)

    current_timestamp = datetime(2020, 1, 1, 12, 0, 0)

    # History so far: workflow started and the first activity attempt scheduled.
    old_events: list = [
        helpers.new_workflow_started_event(timestamp=current_timestamp),
        helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None),
        helpers.new_task_scheduled_event(1, task.get_name(dummy_activity)),
    ]

    # Simulate 5 consecutive failures - far beyond any default finite cap.
    # Each failure must produce a retry timer, not a workflow completion.
    next_schedule_id = 1
    for failure_num in range(5):
        new_events = [
            helpers.new_workflow_started_event(current_timestamp),
            helpers.new_task_failed_event(next_schedule_id, ValueError('boom')),
        ]
        # NOTE(review): a fresh executor per execute() call replays from
        # old_events each time — presumably mirroring real replay semantics.
        executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
        result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
        assert len(result.actions) == 1, f'failure #{failure_num + 1} should create a retry timer'
        assert result.actions[0].HasField('createTimer'), (
            f'failure #{failure_num + 1} must schedule a retry timer, not complete the workflow'
        )

        timer_id = result.actions[0].id
        fire_at = result.actions[0].createTimer.fireAt.ToDatetime()

        # Fire the timer and observe the executor schedule the next attempt.
        old_events = old_events + new_events
        current_timestamp = fire_at
        new_events = [
            helpers.new_workflow_started_event(current_timestamp),
            helpers.new_timer_created_event(timer_id, current_timestamp),
            helpers.new_timer_fired_event(timer_id, current_timestamp),
        ]
        executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
        result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
        assert len(result.actions) == 1
        assert result.actions[0].HasField('scheduleTask'), (
            f'retry #{failure_num + 1} must re-schedule the activity'
        )
        # Record the new attempt in the history so the next iteration can fail it.
        next_schedule_id = result.actions[0].id
        old_events = (
            old_events
            + new_events
            + [
                helpers.new_task_scheduled_event(next_schedule_id, task.get_name(dummy_activity)),
            ]
        )

    # 6th attempt succeeds -> the workflow must complete successfully.
    new_events = [
        helpers.new_workflow_started_event(current_timestamp),
        helpers.new_task_completed_event(next_schedule_id, encoded_output=json.dumps('ok')),
    ]
    executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
    result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
    complete = get_and_validate_single_complete_workflow_action(result.actions)
    assert complete.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED
    assert json.loads(complete.result.value) == 'ok'


def test_sub_orchestration_infinite_retry_policy_succeeds_after_many_failures():
    """Parallel to the activity test: sub-orchestrations with max_number_of_attempts=-1
    keep retrying past any finite cap, and the parent completes when the child succeeds.

    The child orchestrator body never runs; child failures/completions are
    injected as history events and the test asserts on the parent's actions.
    """

    def child_orchestrator(ctx: task.OrchestrationContext, _):
        return 'child-ok'  # behavior is simulated via history events below

    def parent_orchestrator(ctx: task.OrchestrationContext, _):
        result = yield ctx.call_sub_orchestrator(
            'child_orchestrator',
            retry_policy=task.RetryPolicy(
                first_retry_interval=timedelta(seconds=1),
                max_number_of_attempts=-1,
            ),
        )
        return result

    registry = worker._Registry()
    name = registry.add_orchestrator(parent_orchestrator)
    registry.add_orchestrator(child_orchestrator)

    current_timestamp = datetime(2020, 1, 1, 12, 0, 0)

    # Child instance ids follow the '<parent>:NNNN' pattern used below.
    first_child_id = f'{TEST_INSTANCE_ID}:0001'
    old_events: list = [
        helpers.new_workflow_started_event(timestamp=current_timestamp),
        helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None),
        helpers.new_child_workflow_created_event(1, 'child_orchestrator', first_child_id),
    ]
    next_child_id = 1

    # Simulate 4 consecutive child-workflow failures - each must schedule a retry timer.
    for failure_num in range(4):
        new_events = [
            helpers.new_workflow_started_event(current_timestamp),
            helpers.new_child_workflow_failed_event(next_child_id, ValueError('child failed')),
        ]
        # Fresh executor per execute() call: each step replays from old_events.
        executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
        result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
        assert len(result.actions) == 1
        assert result.actions[0].HasField('createTimer'), (
            f'child failure #{failure_num + 1} must schedule a retry timer'
        )
        timer_id = result.actions[0].id
        fire_at = result.actions[0].createTimer.fireAt.ToDatetime()

        # Fire the retry timer; the parent must then create a new child workflow.
        old_events = old_events + new_events
        current_timestamp = fire_at
        new_events = [
            helpers.new_workflow_started_event(current_timestamp),
            helpers.new_timer_created_event(timer_id, current_timestamp),
            helpers.new_timer_fired_event(timer_id, current_timestamp),
        ]
        executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
        result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
        assert len(result.actions) == 1
        assert result.actions[0].HasField('createChildWorkflow'), (
            f'retry #{failure_num + 1} must create a new child workflow'
        )
        # Record the retried child in the history so the next iteration can fail it.
        next_child_id = result.actions[0].id
        retried_child_instance_id = f'{TEST_INSTANCE_ID}:{next_child_id:04d}'
        old_events = (
            old_events
            + new_events
            + [
                helpers.new_child_workflow_created_event(
                    next_child_id, 'child_orchestrator', retried_child_instance_id
                ),
            ]
        )

    # 5th attempt succeeds -> the parent workflow must complete with the child's result.
    new_events = [
        helpers.new_workflow_started_event(current_timestamp),
        helpers.new_child_workflow_completed_event(
            next_child_id, encoded_output=json.dumps('child-ok')
        ),
    ]
    executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
    result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
    complete = get_and_validate_single_complete_workflow_action(result.actions)
    assert complete.workflowStatus == pb.ORCHESTRATION_STATUS_COMPLETED
    assert json.loads(complete.result.value) == 'child-ok'


def get_and_validate_single_complete_workflow_action(
actions: list[pb.WorkflowAction],
) -> pb.CompleteWorkflowAction:
Expand Down
Loading
Loading