Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions examples/workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -508,3 +508,31 @@ dapr run --app-id wf-versioning-example -- python3 versioning.py part1
dapr run --app-id wf-versioning-example --log-level debug -- python3 versioning.py part2
```
<!--END_STEP-->

### Pydantic models as workflow/activity inputs

This example shows how to pass [Pydantic](https://docs.pydantic.dev/) `BaseModel`
instances directly as workflow and activity inputs. When a workflow or activity
annotates its input parameter with a `BaseModel` subclass, the runtime
reconstructs the model from the decoded JSON payload automatically — no manual
`model_validate` call is needed on the receiving side.

The wire format remains plain JSON, so workflows and activities stay
interop-friendly with non-Python Dapr apps. Outputs coming back from activities
arrive as dicts; reconstructing them into a typed instance is a one-liner
(`OrderResult.model_validate(...)`).

<!--STEP
name: Run the pydantic models example
expected_stdout_lines:
- "[workflow] received order O-100 for Acme amount=42.0"
- "[activity] approving order O-100"
- "[workflow] activity returned approved=True"
- "[client] workflow output: order_id=O-100 approved=True message=auto-approved"
timeout_seconds: 60
-->

```sh
dapr run --app-id wf-pydantic-example -- python3 pydantic_models.py
```
<!--END_STEP-->
97 changes: 97 additions & 0 deletions examples/workflow/pydantic_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
# Copyright 2026 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Native Pydantic model support in Dapr workflows and activities.

Inputs annotated with a Pydantic BaseModel are reconstructed automatically on
the receiving side — no manual serialization is needed. Outputs are emitted
as plain JSON so the wire format stays interop-friendly with non-Python Dapr
apps.
"""

from dapr.ext.workflow import (
DaprWorkflowClient,
DaprWorkflowContext,
WorkflowActivityContext,
WorkflowRuntime,
)
from pydantic import BaseModel


class OrderRequest(BaseModel):
    """Input payload for the workflow and activity: one order to approve."""

    order_id: str
    customer: str
    amount: float


class OrderResult(BaseModel):
    """Output payload: the approval decision for one order."""

    order_id: str
    approved: bool
    message: str


# Module-level runtime: the @wfr.workflow / @wfr.activity decorators below
# register against this instance at import time.
wfr = WorkflowRuntime()
# Fixed instance id shared by schedule / wait / purge in main().
instance_id = 'pydantic-demo'


@wfr.workflow(name='order_workflow')
def order_workflow(ctx: DaprWorkflowContext, order: OrderRequest):
    """Orchestrate one order: log it, run the approval activity, return the result.

    `order` arrives here as a real OrderRequest instance — the runtime reads
    the parameter annotation and rebuilds the model from the decoded JSON
    payload automatically.
    """
    if not ctx.is_replaying:
        received = (
            f'[workflow] received order {order.order_id} '
            f'for {order.customer} amount={order.amount}'
        )
        print(received, flush=True)

    # The activity result comes back as a plain dict; one validate call
    # turns it into a typed OrderResult.
    raw_result = yield ctx.call_activity(approve_order, input=order)
    result = OrderResult.model_validate(raw_result)

    if not ctx.is_replaying:
        print(f'[workflow] activity returned approved={result.approved}', flush=True)
    return result


@wfr.activity(name='approve_order')
def approve_order(ctx: WorkflowActivityContext, order: OrderRequest) -> OrderResult:
    """Approve small orders automatically; flag larger ones for review.

    As in the workflow, `order` is already a reconstructed OrderRequest
    instance when this activity runs.
    """
    print(f'[activity] approving order {order.order_id}', flush=True)
    auto_ok = order.amount <= 100.0
    verdict = 'auto-approved' if auto_ok else 'needs review'
    return OrderResult(order_id=order.order_id, approved=auto_ok, message=verdict)


def main():
    """Run the example: start the runtime, execute one workflow, print its output.

    Fixes over the naive version:
    - wfr.shutdown() runs in a finally block, so the runtime worker is
      stopped even when scheduling/completion raises.
    - a None completion state (e.g. timeout) raises a clear RuntimeError
      instead of failing later with an AttributeError.
    """
    wfr.start()
    try:
        client = DaprWorkflowClient()

        order = OrderRequest(order_id='O-100', customer='Acme', amount=42.0)
        client.schedule_new_workflow(
            workflow=order_workflow, input=order, instance_id=instance_id
        )
        state = client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
        if state is None:
            raise RuntimeError(f'Workflow {instance_id} did not complete in time')

        # state.serialized_output is a JSON string — reconstruct a typed instance.
        output = OrderResult.model_validate_json(state.serialized_output)
        print(
            f'[client] workflow output: order_id={output.order_id} '
            f'approved={output.approved} message={output.message}',
            flush=True,
        )

        client.purge_workflow(instance_id)
    finally:
        # Always stop the runtime worker, even if the workflow run failed.
        wfr.shutdown()


if __name__ == '__main__':
    main()
1 change: 1 addition & 0 deletions examples/workflow/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
dapr-ext-workflow>=1.17.0.dev
dapr>=1.17.0.dev
pydantic>=2.0
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing import Any, Optional, Sequence, Union

import grpc
from dapr.ext.workflow import _model_protocol

ClientInterceptor = Union[
grpc.UnaryUnaryClientInterceptor,
Expand Down Expand Up @@ -156,6 +157,16 @@ def encode(self, obj: Any) -> str:
return super().encode(obj)

def default(self, obj):
# Dapr-specific: objects implementing the duck-typed model protocol
# (model_dump + model_validate) are emitted as plain JSON objects with
# no AUTO_SERIALIZED marker, so the payload stays readable by
# non-Python SDKs and by workflows/activities that don't import the
# same class. Type-directed reconstruction happens at the
# activity/workflow input boundary in
# dapr.ext.workflow.workflow_runtime. No pydantic dependency — any
# class matching the protocol works (Pydantic v2, SQLModel, custom).
if _model_protocol.is_model(obj):
return _model_protocol.dump_model(obj)
if dataclasses.is_dataclass(obj):
# Dataclasses are not serializable by default, so we convert them to a dict and mark them for
# automatic deserialization by the receiver
Expand Down
151 changes: 151 additions & 0 deletions ext/dapr-ext-workflow/dapr/ext/workflow/_model_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# -*- coding: utf-8 -*-

"""
Copyright 2026 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from __future__ import annotations

import inspect
import typing
from functools import lru_cache
from types import SimpleNamespace
from typing import Any, Callable, Optional

# A "model" here is anything that implements the Pydantic v2 shape:
# - model_dump(self, ...) -> dict
# - cls.model_validate(value) -> instance
# We duck-type on these names rather than importing pydantic so the SDK has no
# hard dependency on pydantic (or any specific version of it). SQLModel,
# FastAPI response models, and custom classes mirroring the protocol all work.


def is_model(obj: Any) -> bool:
    """Return True when obj's class duck-types the model protocol.

    The protocol is the Pydantic v2 shape: a callable ``model_dump``
    attribute plus a callable ``model_validate`` attribute on the class.
    """
    cls = type(obj)
    return (
        inspect.isclass(cls)
        and callable(getattr(cls, 'model_dump', None))
        and callable(getattr(cls, 'model_validate', None))
    )


def is_model_class(cls: Any) -> bool:
    """Return True when cls is a class exposing callable model_dump and model_validate."""
    if not inspect.isclass(cls):
        return False
    dump = getattr(cls, 'model_dump', None)
    validate = getattr(cls, 'model_validate', None)
    return callable(dump) and callable(validate)


@lru_cache(maxsize=None)
def _supports_mode_kwarg(cls: type) -> bool:
"""Whether cls.model_dump accepts a `mode` keyword (Pydantic v2 signature)."""
try:
sig = inspect.signature(cls.model_dump)
except (TypeError, ValueError):
return False
params = sig.parameters
if 'mode' in params:
return True
return any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params.values())


def dump_model(model: Any) -> Any:
    """Serialize a model instance into a JSON-compatible primitive graph.

    Uses model_dump(mode='json') when the class supports the mode kwarg so
    nested datetimes, enums, and UUIDs come out as JSON-safe primitives.
    Protocol-compatible classes without the kwarg fall back to a bare
    model_dump() and are themselves responsible for JSON-safe output.

    Raises TypeError for objects that do not implement the model protocol.
    """
    if not is_model(model):
        raise TypeError(
            f'Expected a model-like object with model_dump/model_validate, '
            f'got {type(model).__name__}'
        )
    dumper = model.model_dump
    if _supports_mode_kwarg(type(model)):
        return dumper(mode='json')
    return dumper()


def coerce_to_model(value: Any, cls: type) -> Any:
    """Reconstruct a model instance from a decoded JSON payload.

    Accepts dicts, SimpleNamespace (from the InternalJSONDecoder's
    AUTO_SERIALIZED path), or already-instantiated models. Because the
    decoder's object_hook applies at every nesting level, nested
    SimpleNamespace values are converted to plain dicts recursively so
    nested models validate without needing `from_attributes`. Any other
    shape raises TypeError so the failure surfaces at the activity/workflow
    boundary rather than later as an attribute access error.
    """
    if not is_model_class(cls):
        raise TypeError(f'{cls!r} is not a model class (no model_dump/model_validate)')
    if isinstance(value, cls):
        return value
    if isinstance(value, SimpleNamespace):
        value = _namespaces_to_dicts(value)
    if isinstance(value, dict):
        return cls.model_validate(value)
    raise TypeError(
        f'Cannot coerce value of type {type(value).__name__} into {cls.__name__}; '
        'expected a dict, SimpleNamespace, or existing model instance.'
    )


def _namespaces_to_dicts(value: Any) -> Any:
    """Recursively replace SimpleNamespace nodes (incl. inside lists) with dicts."""
    if isinstance(value, SimpleNamespace):
        return {k: _namespaces_to_dicts(v) for k, v in vars(value).items()}
    if isinstance(value, list):
        return [_namespaces_to_dicts(v) for v in value]
    return value


def resolve_input(fn: Callable[..., Any]) -> tuple[bool, Optional[type]]:
    """Inspect fn's input parameter (the one after the context).

    Returns (accepts_input, model_class):
    - accepts_input: True when fn declares a second positional parameter —
      the runtime must then forward the input even when it is None, so
      `Optional[Model]` works without a default value.
    - model_class: the model class annotated on that parameter, unwrapped
      from Optional[Model] / Model | None, or None when the parameter is
      unannotated or not annotated with a model class.
    """
    try:
        params = list(inspect.signature(fn).parameters.values())
    except (TypeError, ValueError):
        # Signature not introspectable (builtins, some C callables).
        return False, None

    if len(params) <= 1:
        return False, None

    input_param = params[1]
    ann = input_param.annotation
    if ann is inspect.Parameter.empty:
        return True, None

    # String annotations (e.g. `from __future__ import annotations`) must be
    # resolved to real objects before they can be matched against the protocol.
    if isinstance(ann, str):
        try:
            ann = typing.get_type_hints(fn).get(input_param.name, ann)
        except Exception:
            return True, None

    ann = _unwrap_optional(ann)
    return True, ann if is_model_class(ann) else None


def _unwrap_optional(annotation: Any) -> Any:
"""Unwrap Optional[X] / X | None to X. Leaves other annotations unchanged."""
origin = typing.get_origin(annotation)
if origin is typing.Union or _is_pep604_union(origin):
args = [a for a in typing.get_args(annotation) if a is not type(None)]
if len(args) == 1:
return args[0]
return annotation


def _is_pep604_union(origin: Any) -> bool:
try:
from types import UnionType # type: ignore[attr-defined]

return origin is UnionType
except ImportError:
return False
28 changes: 25 additions & 3 deletions ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
from dapr.conf import settings
from dapr.conf.helpers import GrpcEndpoint

from . import _model_protocol

T = TypeVar('T')
TInput = TypeVar('TInput')
TOutput = TypeVar('TOutput')
Expand Down Expand Up @@ -89,15 +91,23 @@
effective_name = name or fn.__name__
self._logger.info(f"Registering workflow '{effective_name}' with runtime")

accepts_input, input_model = _model_protocol.resolve_input(fn)

def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None):
"""Responsible to call Workflow function in orchestrationWrapper"""
instance_id = getattr(ctx, 'instance_id', 'unknown')

try:
daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options())
if inp is None:
if not accepts_input:
result = fn(daprWfContext)
else:
if (
(inp is not None)
and (input_model is not None)
and not isinstance(inp, input_model)
):
inp = _model_protocol.coerce_to_model(inp, input_model)
result = fn(daprWfContext, inp)
return result
except Exception as e:
Expand Down Expand Up @@ -131,11 +141,15 @@
f"Registering version {version_name} of workflow '{effective_name}' with runtime"
)

accepts_input, input_model = _model_protocol.resolve_input(fn)

def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None):
"""Responsible to call Workflow function in orchestrationWrapper"""
daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options())
if inp is None:
if not accepts_input:
return fn(daprWfContext)
if (inp is not None) and (input_model is not None) and not isinstance(inp, input_model):
inp = _model_protocol.coerce_to_model(inp, input_model)
return fn(daprWfContext, inp)

if hasattr(fn, '_workflow_registered'):
Expand Down Expand Up @@ -167,15 +181,23 @@
effective_name = name or fn.__name__
self._logger.info(f"Registering activity '{effective_name}' with runtime")

accepts_input, input_model = _model_protocol.resolve_input(fn)

def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None):
"""Responsible to call Activity function in activityWrapper"""
activity_id = getattr(ctx, 'task_id', 'unknown')

try:
wfActivityContext = WorkflowActivityContext(ctx)
if inp is None:
if not accepts_input:
result = fn(wfActivityContext)
else:
if (
(inp is not None)
and (input_model is not None)
and not isinstance(inp, input_model)
):
inp = _model_protocol.coerce_to_model(inp, input_model)
result = fn(wfActivityContext, inp)
return result
except Exception as e:
Expand Down Expand Up @@ -249,7 +271,7 @@
try:
is_ready = self.wait_for_worker_ready(timeout=self._worker_ready_timeout)
if not is_ready:
raise RuntimeError('WorkflowRuntime worker and its stream are not ready')

Check failure on line 274 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.10)

WorkflowRuntime worker and its stream are not ready

Check failure on line 274 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.10)

WorkflowRuntime worker and its stream are not ready

Check failure on line 274 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.14)

WorkflowRuntime worker and its stream are not ready

Check failure on line 274 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.14)

WorkflowRuntime worker and its stream are not ready

Check failure on line 274 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.12)

WorkflowRuntime worker and its stream are not ready

Check failure on line 274 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.12)

WorkflowRuntime worker and its stream are not ready

Check failure on line 274 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.11)

WorkflowRuntime worker and its stream are not ready

Check failure on line 274 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.11)

WorkflowRuntime worker and its stream are not ready

Check failure on line 274 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.13)

WorkflowRuntime worker and its stream are not ready

Check failure on line 274 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.13)

WorkflowRuntime worker and its stream are not ready
else:
self._logger.debug(
'WorkflowRuntime worker is ready and its stream can receive work items'
Expand Down
Loading
Loading