module documentation

Utilities that can decorate or be called inside workflows.

Class ActivityCancellationType How an activity cancellation should be handled.
Class ActivityConfig TypedDict of config that can be used for start_activity and execute_activity.
Class ActivityHandle Handle returned from start_activity and start_local_activity.
Class ChildWorkflowCancellationType How a child workflow cancellation should be handled.
Class ChildWorkflowConfig TypedDict of config that can be used for start_child_workflow and execute_child_workflow.
Class ChildWorkflowHandle Handle for interacting with a child workflow.
Class ExternalWorkflowHandle Handle for interacting with an external workflow.
Class Info Information about the running workflow.
Class LocalActivityConfig TypedDict of config that can be used for start_local_activity and execute_local_activity.
Class LoggerAdapter Adapter that adds details to the log about the running workflow.
Class ParentClosePolicy How a child workflow should be handled when the parent closes.
Class ParentInfo Information about the parent workflow.
Class unsafe Contains static methods that should not normally be called during workflow execution except in advanced cases.
Exception ContinueAsNewError Error thrown by continue_as_new.
Exception NondeterminismError Error that can be thrown during replay for non-deterministic workflow.
Function continue_as_new Stop the workflow immediately and continue as new.
Function defn Decorator for workflow classes.
Function deprecate_patch Mark a patch as deprecated.
Async Function execute_activity Start an activity and wait for completion.
Async Function execute_activity_class Start an activity from a callable class and wait for completion.
Async Function execute_activity_method Start an activity from a method and wait for completion.
Async Function execute_child_workflow Start a child workflow and wait for completion.
Async Function execute_local_activity Start a local activity and wait for completion.
Async Function execute_local_activity_class Start a local activity from a callable class and wait for completion.
Async Function execute_local_activity_method Start a local activity from a method and wait for completion.
Function extern_functions External functions available in the workflow sandbox.
Function get_dynamic_query_handler Get the dynamic query handler if any.
Function get_dynamic_signal_handler Get the dynamic signal handler if any.
Function get_external_workflow_handle Get a workflow handle to an existing workflow by its ID.
Function get_external_workflow_handle_for Get a typed workflow handle to an existing workflow by its ID.
Function get_query_handler Get the query handler for the given name if any.
Function get_signal_handler Get the signal handler for the given name if any.
Function info Current workflow's info.
Function memo Current workflow's memo values, converted without type hints.
Function memo_value Memo value for the given key, optional default, and optional type hint.
Function now Current time from the workflow perspective.
Function patched Patch a workflow.
Function query Decorator for a workflow query method.
Function random Get a deterministic pseudo-random number generator.
Function run Decorator for the workflow run method.
Function set_dynamic_query_handler Set or unset the dynamic query handler.
Function set_dynamic_signal_handler Set or unset the dynamic signal handler.
Function set_query_handler Set or unset the query handler for the given name.
Function set_signal_handler Set or unset the signal handler for the given name.
Function signal Decorator for a workflow signal method.
Function start_activity Start an activity and return its handle.
Function start_activity_class Start an activity from a callable class.
Function start_activity_method Start an activity from a method.
Async Function start_child_workflow Start a child workflow and return its handle.
Function start_local_activity Start a local activity and return its handle.
Function start_local_activity_class Start a local activity from a callable class.
Function start_local_activity_method Start a local activity from a method.
Function time Current seconds since the epoch from the workflow perspective.
Function time_ns Current nanoseconds since the epoch from the workflow perspective.
Function upsert_search_attributes Upsert search attributes for this workflow.
Function uuid4 Get a new, determinism-safe v4 UUID based on random.
Async Function wait_condition Wait on a callback to become true.
Variable logger Logger that will have contextual workflow details embedded.
Class _AsyncioTask Undocumented
Class _Definition Undocumented
Class _QueryDefinition Undocumented
Class _Runtime Undocumented
Class _SignalDefinition Undocumented
Exception _UnexpectedEvictionError Undocumented
Function _assert_dynamic_signature Undocumented
Function _bind_method Undocumented
Function _is_unbound_method_on_cls Undocumented
Variable _imports_passed_through Undocumented
Variable _in_sandbox Undocumented
Variable _sandbox_unrestricted Undocumented
@overload
def continue_as_new(arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], task_queue: Optional[str] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None) -> NoReturn:
@overload
def continue_as_new(*, workflow: MethodAsyncNoParam[SelfType, Any], task_queue: Optional[str] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None) -> NoReturn:
@overload
def continue_as_new(arg: ParamType, *, workflow: MethodAsyncSingleParam[SelfType, ParamType, Any], task_queue: Optional[str] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None) -> NoReturn:
@overload
def continue_as_new(*, workflow: Callable[Concatenate[SelfType, MultiParamSpec], Awaitable[Any]], args: Sequence[Any], task_queue: Optional[str] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None) -> NoReturn:
@overload
def continue_as_new(*, workflow: str, args: Sequence[Any] = [], task_queue: Optional[str] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None) -> NoReturn:
(source)

Stop the workflow immediately and continue as new.

Parameters
arg:AnySingle argument to the continued workflow.
args:Sequence[Any]Multiple arguments to the continued workflow. Cannot be set if arg is.
workflow:Union[None, Callable, str]Specific workflow to continue to. Defaults to the current workflow.
task_queue:Optional[str]Task queue to run the workflow on. Defaults to the current workflow's task queue.
run_timeout:Optional[timedelta]Timeout of a single workflow run. Defaults to the current workflow's run timeout.
task_timeout:Optional[timedelta]Timeout of a single workflow task. Defaults to the current workflow's task timeout.
retry_policy:Optional[temporalio.common.RetryPolicy]Undocumented
memo:Optional[Mapping[str, Any]]Memo for the workflow. Defaults to the current workflow's memo.
search_attributes:Optional[temporalio.common.SearchAttributes]Search attributes for the workflow. Defaults to the current workflow's search attributes.
Returns
NoReturnNever returns, always raises a ContinueAsNewError.
Raises
ContinueAsNewErrorAlways raised by this function. Should not be caught but instead be allowed to
@overload
def defn(cls: ClassType) -> ClassType:
@overload
def defn(*, name: Optional[str] = None, sandboxed: bool = True) -> Callable[[ClassType], ClassType]:
(source)

Decorator for workflow classes.

This must be set on any registered workflow class (it is ignored if on a base class).

Parameters
cls:Optional[ClassType]The class to decorate.
name:Optional[str]Name to use for the workflow. Defaults to class __name__.
sandboxed:boolWhether the workflow should run in a sandbox. Default is true.
def deprecate_patch(id: str): (source)

Mark a patch as deprecated.

This marks a workflow that had patched in a previous version of the code as no longer applicable because all workflows that use the old code path are done and will never be queried again. Therefore the old code path is removed as well.

Parameters
id:strThe identifier originally used with patched.
@overload
async def execute_activity(activity: CallableAsyncNoParam[ReturnType], *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_activity(activity: CallableSyncNoParam[ReturnType], *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_activity(activity: CallableAsyncSingleParam[ParamType, ReturnType], arg: ParamType, *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_activity(activity: CallableSyncSingleParam[ParamType, ReturnType], arg: ParamType, *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_activity(activity: Callable[..., Awaitable[ReturnType]], *, args: Sequence[Any], activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_activity(activity: Callable[..., ReturnType], *, args: Sequence[Any], activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_activity(activity: str, arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> Any:
(source)

Start an activity and wait for completion.

This is a shortcut for await start_activity.

@overload
async def execute_activity_class(activity: Type[CallableAsyncNoParam[ReturnType]], *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_activity_class(activity: Type[CallableSyncNoParam[ReturnType]], *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_activity_class(activity: Type[CallableAsyncSingleParam[ParamType, ReturnType]], arg: ParamType, *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_activity_class(activity: Type[CallableSyncSingleParam[ParamType, ReturnType]], arg: ParamType, *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_activity_class(activity: Type[Callable[..., Awaitable[ReturnType]]], *, args: Sequence[Any], activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_activity_class(activity: Type[Callable[..., ReturnType]], *, args: Sequence[Any], activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
(source)

Start an activity from a callable class and wait for completion.

This is a shortcut for await start_activity_class.

@overload
async def execute_activity_method(activity: MethodAsyncNoParam[SelfType, ReturnType], *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_activity_method(activity: MethodSyncNoParam[SelfType, ReturnType], *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_activity_method(activity: MethodAsyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_activity_method(activity: MethodSyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_activity_method(activity: Callable[Concatenate[SelfType, MultiParamSpec], Awaitable[ReturnType]], *, args: Sequence[Any], activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_activity_method(activity: Callable[Concatenate[SelfType, MultiParamSpec], ReturnType], *, args: Sequence[Any], activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
(source)

Start an activity from a method and wait for completion.

This is a shortcut for await start_activity_method.

@overload
@overload
async def execute_child_workflow(workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, id: Optional[str] = None, task_queue: Optional[str] = None, cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE, execution_timeout: Optional[timedelta] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = '', memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None) -> ReturnType:
@overload
async def execute_child_workflow(workflow: Callable[Concatenate[SelfType, MultiParamSpec], Awaitable[ReturnType]], *, args: Sequence[Any], id: Optional[str] = None, task_queue: Optional[str] = None, cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE, execution_timeout: Optional[timedelta] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = '', memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None) -> ReturnType:
@overload
async def execute_child_workflow(workflow: str, arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], id: Optional[str] = None, task_queue: Optional[str] = None, cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE, execution_timeout: Optional[timedelta] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = '', memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None) -> Any:
(source)

Start a child workflow and wait for completion.

This is a shortcut for await start_child_workflow.

@overload
async def execute_local_activity(activity: CallableAsyncNoParam[ReturnType], *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_local_activity(activity: CallableSyncNoParam[ReturnType], *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_local_activity(activity: CallableAsyncSingleParam[ParamType, ReturnType], arg: ParamType, *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_local_activity(activity: CallableSyncSingleParam[ParamType, ReturnType], arg: ParamType, *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_local_activity(activity: Callable[..., Awaitable[ReturnType]], *, args: Sequence[Any], activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_local_activity(activity: Callable[..., ReturnType], *, args: Sequence[Any], activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_local_activity(activity: str, arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> Any:
(source)

Start a local activity and wait for completion.

This is a shortcut for await start_local_activity.

Warning

Local activities are currently experimental.

@overload
async def execute_local_activity_class(activity: Type[CallableAsyncNoParam[ReturnType]], *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_local_activity_class(activity: Type[CallableSyncNoParam[ReturnType]], *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_local_activity_class(activity: Type[CallableAsyncSingleParam[ParamType, ReturnType]], arg: ParamType, *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_local_activity_class(activity: Type[CallableSyncSingleParam[ParamType, ReturnType]], arg: ParamType, *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_local_activity_class(activity: Type[Callable[..., Awaitable[ReturnType]]], *, args: Sequence[Any], activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_local_activity_class(activity: Type[Callable[..., ReturnType]], *, args: Sequence[Any], activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
(source)

Start a local activity from a callable class and wait for completion.

This is a shortcut for await start_local_activity_class.

Warning

Local activities are currently experimental.

@overload
async def execute_local_activity_method(activity: MethodAsyncNoParam[SelfType, ReturnType], *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_local_activity_method(activity: MethodSyncNoParam[SelfType, ReturnType], *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_local_activity_method(activity: MethodAsyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_local_activity_method(activity: MethodSyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_local_activity_method(activity: Callable[Concatenate[SelfType, MultiParamSpec], Awaitable[ReturnType]], *, args: Sequence[Any], activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
@overload
async def execute_local_activity_method(activity: Callable[Concatenate[SelfType, MultiParamSpec], ReturnType], *, args: Sequence[Any], activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ReturnType:
(source)

Start a local activity from a method and wait for completion.

This is a shortcut for await start_local_activity_method.

Warning

Local activities are currently experimental.

def extern_functions() -> Mapping[str, Callable]: (source)

External functions available in the workflow sandbox.

Returns
Mapping[str, Callable]Mapping of external functions that can be called from inside a workflow sandbox.
def get_dynamic_query_handler() -> Optional[Callable]: (source)

Get the dynamic query handler if any.

This includes dynamic handlers created via the @workflow.query decorator.

Returns
Optional[Callable]Callable for the dynamic query handler if any.
def get_dynamic_signal_handler() -> Optional[Callable]: (source)

Get the dynamic signal handler if any.

This includes dynamic handlers created via the @workflow.signal decorator.

Returns
Optional[Callable]Callable for the dynamic signal handler if any.
def get_external_workflow_handle(workflow_id: str, *, run_id: Optional[str] = None) -> ExternalWorkflowHandle[Any]: (source)

Get a workflow handle to an existing workflow by its ID.

Parameters
workflow_id:strWorkflow ID to get a handle to.
run_id:Optional[str]Optional run ID for the workflow.
Returns
ExternalWorkflowHandle[Any]The external workflow handle.
def get_external_workflow_handle_for(workflow: Union[MethodAsyncNoParam[SelfType, Any], MethodAsyncSingleParam[SelfType, Any, Any]], workflow_id: str, *, run_id: Optional[str] = None) -> ExternalWorkflowHandle[SelfType]: (source)

Get a typed workflow handle to an existing workflow by its ID.

This is the same as get_external_workflow_handle but typed. Note, the workflow type given is not validated, it is only for typing.

Parameters
workflow:Union[MethodAsyncNoParam[SelfType, Any], MethodAsyncSingleParam[SelfType, Any, Any]]The workflow run method to use for typing the handle.
workflow_id:strWorkflow ID to get a handle to.
run_id:Optional[str]Optional run ID for the workflow.
Returns
ExternalWorkflowHandle[SelfType]The external workflow handle.
def get_query_handler(name: str) -> Optional[Callable]: (source)

Get the query handler for the given name if any.

This includes handlers created via the @workflow.query decorator.

Parameters
name:strName of the query.
Returns
Optional[Callable]Callable for the query if any. If a handler is not found for the name, this will not return the dynamic handler even if there is one.
def get_signal_handler(name: str) -> Optional[Callable]: (source)

Get the signal handler for the given name if any.

This includes handlers created via the @workflow.signal decorator.

Parameters
name:strName of the signal.
Returns
Optional[Callable]Callable for the signal if any. If a handler is not found for the name, this will not return the dynamic handler even if there is one.
def info() -> Info: (source)

Current workflow's info.

Returns
InfoInfo for the currently running workflow.
def memo() -> Mapping[str, Any]: (source)

Current workflow's memo values, converted without type hints.

Since type hints are not used, the default converted values will come back. For example, if the memo was originally created with a dataclass, the value will be a dict. To convert using proper type hints, use memo_value.

Returns
Mapping[str, Any]Mapping of all memo keys and they values without type hints.
@overload
def memo_value(key: str, default: Any = temporalio.common._arg_unset) -> Any:
@overload
def memo_value(key: str, *, type_hint: Type[ParamType]) -> ParamType:
@overload
def memo_value(key: str, default: AnyType, *, type_hint: Type[ParamType]) -> Union[AnyType, ParamType]:
(source)

Memo value for the given key, optional default, and optional type hint.

Parameters
key:strKey to get memo value for.
default:AnyDefault to use if key is not present. If unset, a KeyError is raised when the key does not exist.
type_hint:Optional[Type]Type hint to use when converting.
Returns
AnyMemo value, converted with the type hint if present.
Raises
KeyErrorKey not present and default not set.

Current time from the workflow perspective.

This is the workflow equivalent of datetime.now with the timezone.utc parameter.

Returns
datetimeUTC datetime for the current workflow time. The datetime does have UTC set as the time zone.
def patched(id: str) -> bool: (source)

Patch a workflow.

When called, this will only return true if code should take the newer path which means this is either not replaying or is replaying and has seen this patch before.

Use deprecate_patch when all workflows are done and will never be queried again. The old code path can be used at that time too.

Parameters
id:strThe identifier for this patch. This identifier may be used repeatedly in the same workflow to represent the same patch
Returns
boolTrue if this should take the newer path, false if it should take the older path.
@overload
def query(fn: CallableType) -> CallableType:
@overload
def query(*, name: str) -> Callable[[CallableType], CallableType]:
@overload
def query(*, dynamic: Literal[True]) -> Callable[[CallableType], CallableType]:
(source)

Decorator for a workflow query method.

This is set on any async or non-async method that expects to handle a query. If a function overrides one with this decorator, it too must be decorated.

Query methods can only have positional parameters. Best practice for non-dynamic query methods is to only take a single object/dataclass argument that can accept more fields later if needed. The return value is the resulting query value. Query methods must not mutate any workflow state.

Parameters
fn:Optional[CallableType]The function to decorate.
name:Optional[str]Query name. Defaults to method __name__. Cannot be present when dynamic is present.
dynamic:Optional[bool]If true, this handles all queries not otherwise handled. The parameters of the method must be self, a string name, and a *args positional varargs. Cannot be present when name is present.
def random() -> Random: (source)

Get a deterministic pseudo-random number generator.

Note, this random number generator is not cryptographically safe and should not be used for security purposes.

Returns
RandomThe deterministically-seeded pseudo-random number generator.

Decorator for the workflow run method.

This must be set on one and only one async method defined on the same class as @workflow.defn. This can be defined on a base class method but must then be explicitly overridden and defined on the workflow class.

Run methods can only have positional parameters. Best practice is to only take a single object/dataclass argument that can accept more fields later if needed.

Parameters
fn:CallableAsyncTypeThe function to decorate.
def set_dynamic_query_handler(handler: Optional[Callable]): (source)

Set or unset the dynamic query handler.

This overrides the existing dynamic handler even if it was created via the @workflow.query decorator.

Parameters
handler:Optional[Callable]Callable to set or None to unset.
def set_dynamic_signal_handler(handler: Optional[Callable]): (source)

Set or unset the dynamic signal handler.

This overrides the existing dynamic handler even if it was created via the @workflow.signal decorator.

When set, all unhandled past signals are immediately sent to the handler.

Parameters
handler:Optional[Callable]Callable to set or None to unset.
def set_query_handler(name: str, handler: Optional[Callable]): (source)

Set or unset the query handler for the given name.

This overrides any existing handlers for the given name, including handlers created via the @workflow.query decorator.

Parameters
name:strName of the query.
handler:Optional[Callable]Callable to set or None to unset.
def set_signal_handler(name: str, handler: Optional[Callable]): (source)

Set or unset the signal handler for the given name.

This overrides any existing handlers for the given name, including handlers created via the @workflow.signal decorator.

When set, all unhandled past signals for the given name are immediately sent to the handler.

Parameters
name:strName of the signal.
handler:Optional[Callable]Callable to set or None to unset.

Decorator for a workflow signal method.

This is set on any async or non-async method that you wish to be called upon receiving a signal. If a function overrides one with this decorator, it too must be decorated.

Signal methods can only have positional parameters. Best practice for non-dynamic signal methods is to only take a single object/dataclass argument that can accept more fields later if needed. Return values from signal methods are ignored.

Parameters
fn:Optional[CallableSyncOrAsyncReturnNoneType]The function to decorate.
name:Optional[str]Signal name. Defaults to method __name__. Cannot be present when dynamic is present.
dynamic:Optional[bool]If true, this handles all signals not otherwise handled. The parameters of the method must be self, a string name, and a *args positional varargs. Cannot be present when name is present.
@overload
def start_activity(activity: CallableAsyncNoParam[ReturnType], *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_activity(activity: CallableSyncNoParam[ReturnType], *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_activity(activity: CallableAsyncSingleParam[ParamType, ReturnType], arg: ParamType, *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_activity(activity: CallableSyncSingleParam[ParamType, ReturnType], arg: ParamType, *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_activity(activity: Callable[..., Awaitable[ReturnType]], *, args: Sequence[Any], activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_activity(activity: Callable[..., ReturnType], *, args: Sequence[Any], activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_activity(activity: str, arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[Any]:
(source)

Start an activity and return its handle.

At least one of schedule_to_close_timeout or start_to_close_timeout must be present.

Parameters
activity:AnyActivity name or function reference.
arg:AnySingle argument to the activity.
args:Sequence[Any]Multiple arguments to the activity. Cannot be set if arg is.
activity_id:Optional[str]Optional unique identifier for the activity.
task_queue:Optional[str]Task queue to run the activity on. Defaults to the current workflow's task queue.
schedule_to_close_timeout:Optional[timedelta]Max amount of time the activity can take from first being scheduled to being completed before it times out. This is inclusive of all retries.
schedule_to_start_timeout:Optional[timedelta]Max amount of time the activity can take to be started from first being scheduled.
start_to_close_timeout:Optional[timedelta]Max amount of time a single activity run can take from when it starts to when it completes. This is per retry.
heartbeat_timeout:Optional[timedelta]How frequently an activity must invoke heartbeat while running before it is considered timed out.
retry_policy:Optional[temporalio.common.RetryPolicy]How an activity is retried on failure. If unset, a server-defined default is used. Set maximum attempts to 1 to disable retries.
cancellation_type:ActivityCancellationTypeHow the activity is treated when it is cancelled from the workflow.
Returns
ActivityHandle[Any]An activity handle to the activity which is an async task.
@overload
def start_activity_class(activity: Type[CallableAsyncNoParam[ReturnType]], *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_activity_class(activity: Type[CallableSyncNoParam[ReturnType]], *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_activity_class(activity: Type[CallableAsyncSingleParam[ParamType, ReturnType]], arg: ParamType, *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_activity_class(activity: Type[CallableSyncSingleParam[ParamType, ReturnType]], arg: ParamType, *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_activity_class(activity: Type[Callable[..., Awaitable[ReturnType]]], *, args: Sequence[Any], activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_activity_class(activity: Type[Callable[..., ReturnType]], *, args: Sequence[Any], activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
(source)

Start an activity from a callable class.

See start_activity for parameter and return details.

@overload
def start_activity_method(activity: MethodAsyncNoParam[SelfType, ReturnType], *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_activity_method(activity: MethodSyncNoParam[SelfType, ReturnType], *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_activity_method(activity: MethodAsyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_activity_method(activity: MethodSyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_activity_method(activity: Callable[Concatenate[SelfType, MultiParamSpec], Awaitable[ReturnType]], *, args: Sequence[Any], activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_activity_method(activity: Callable[Concatenate[SelfType, MultiParamSpec], ReturnType], *, args: Sequence[Any], activity_id: Optional[str] = None, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
(source)

Start an activity from a method.

See start_activity for parameter and return details.

@overload
async def start_child_workflow(workflow: MethodAsyncNoParam[SelfType, ReturnType], *, id: Optional[str] = None, task_queue: Optional[str] = None, cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE, execution_timeout: Optional[timedelta] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = '', memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None) -> ChildWorkflowHandle[SelfType, ReturnType]:
@overload
async def start_child_workflow(workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, id: Optional[str] = None, task_queue: Optional[str] = None, cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE, execution_timeout: Optional[timedelta] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = '', memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None) -> ChildWorkflowHandle[SelfType, ReturnType]:
@overload
async def start_child_workflow(workflow: Callable[Concatenate[SelfType, MultiParamSpec], Awaitable[ReturnType]], *, args: Sequence[Any], id: Optional[str] = None, task_queue: Optional[str] = None, cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE, execution_timeout: Optional[timedelta] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = '', memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None) -> ChildWorkflowHandle[SelfType, ReturnType]:
@overload
async def start_child_workflow(workflow: str, arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], id: Optional[str] = None, task_queue: Optional[str] = None, cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE, execution_timeout: Optional[timedelta] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = '', memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None) -> ChildWorkflowHandle[Any, Any]:
(source)

Start a child workflow and return its handle.

Parameters
workflow:AnyString name or class method decorated with @workflow.run for the workflow to start.
arg:AnySingle argument to the child workflow.
args:Sequence[Any]Multiple arguments to the child workflow. Cannot be set if arg is.
id:Optional[str]Optional unique identifier for the workflow execution. If not set, defaults to uuid4.
task_queue:Optional[str]Task queue to run the workflow on. Defaults to the current workflow's task queue.
cancellation_type:ChildWorkflowCancellationTypeHow the child workflow will react to cancellation.
parent_close_policy:ParentClosePolicyHow to handle the child workflow when the parent workflow closes.
execution_timeout:Optional[timedelta]Total workflow execution timeout including retries and continue as new.
run_timeout:Optional[timedelta]Timeout of a single workflow run.
task_timeout:Optional[timedelta]Timeout of a single workflow task.
id_reuse_policy:temporalio.common.WorkflowIDReusePolicyHow already-existing IDs are treated.
retry_policy:Optional[temporalio.common.RetryPolicy]Retry policy for the workflow.
cron_schedule:strSee https://docs.temporal.io/docs/content/what-is-a-temporal-cron-job/
memo:Optional[Mapping[str, Any]]Memo for the workflow.
search_attributes:Optional[temporalio.common.SearchAttributes]Search attributes for the workflow.
Returns
ChildWorkflowHandle[Any, Any]A workflow handle to the started/existing workflow.
@overload
def start_local_activity(activity: CallableAsyncNoParam[ReturnType], *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity(activity: CallableSyncNoParam[ReturnType], *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity(activity: CallableAsyncSingleParam[ParamType, ReturnType], arg: ParamType, *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity(activity: CallableSyncSingleParam[ParamType, ReturnType], arg: ParamType, *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity(activity: Callable[..., Awaitable[ReturnType]], *, args: Sequence[Any], activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity(activity: Callable[..., ReturnType], *, args: Sequence[Any], activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity(activity: str, arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[Any]:
(source)

Start a local activity and return its handle.

At least one of schedule_to_close_timeout or start_to_close_timeout must be present.

Warning

Local activities are currently experimental.

Parameters
activity:AnyActivity name or function reference.
arg:AnySingle argument to the activity.
args:Sequence[Any]Multiple arguments to the activity. Cannot be set if arg is.
activity_id:Optional[str]Optional unique identifier for the activity.
schedule_to_close_timeout:Optional[timedelta]Max amount of time the activity can take from first being scheduled to being completed before it times out. This is inclusive of all retries.
schedule_to_start_timeout:Optional[timedelta]Max amount of time the activity can take to be started from first being scheduled.
start_to_close_timeout:Optional[timedelta]Max amount of time a single activity run can take from when it starts to when it completes. This is per retry.
retry_policy:Optional[temporalio.common.RetryPolicy]How an activity is retried on failure. If unset, an SDK-defined default is used. Set maximum attempts to 1 to disable retries.
local_retry_threshold:Optional[timedelta]Undocumented
cancellation_type:ActivityCancellationTypeHow the activity is treated when it is cancelled from the workflow.
Returns
ActivityHandle[Any]An activity handle to the activity which is an async task.
@overload
def start_local_activity_class(activity: Type[CallableAsyncNoParam[ReturnType]], *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_class(activity: Type[CallableSyncNoParam[ReturnType]], *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_class(activity: Type[CallableAsyncSingleParam[ParamType, ReturnType]], arg: ParamType, *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_class(activity: Type[CallableSyncSingleParam[ParamType, ReturnType]], arg: ParamType, *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_class(activity: Type[Callable[..., Awaitable[ReturnType]]], *, args: Sequence[Any], activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_class(activity: Type[Callable[..., ReturnType]], *, args: Sequence[Any], activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
(source)

Start a local activity from a callable class.

See start_local_activity for parameter and return details.

Warning

Local activities are currently experimental.

@overload
def start_local_activity_method(activity: MethodAsyncNoParam[SelfType, ReturnType], *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_method(activity: MethodSyncNoParam[SelfType, ReturnType], *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_method(activity: MethodAsyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_method(activity: MethodSyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_method(activity: Callable[Concatenate[SelfType, MultiParamSpec], Awaitable[ReturnType]], *, args: Sequence[Any], activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_method(activity: Callable[Concatenate[SelfType, MultiParamSpec], ReturnType], *, args: Sequence[Any], activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
(source)

Start a local activity from a method.

See start_local_activity for parameter and return details.

Warning

Local activities are currently experimental.

def time() -> float: (source)

Current seconds since the epoch from the workflow perspective.

This is the workflow equivalent of time.time.

Returns
floatSeconds since the epoch as a float.
def time_ns() -> int: (source)

Current nanoseconds since the epoch from the workflow perspective.

This is the workflow equivalent of time.time_ns.

Returns
intNanoseconds since the epoch
def upsert_search_attributes(attributes: temporalio.common.SearchAttributes): (source)

Upsert search attributes for this workflow.

The keys will be added or replaced on top of the existing search attributes in the same manner as dict.update. Info.search_attributes will also be updated with these values.

Technically an existing search attribute cannot be deleted, but an empty list can be provided for the values which is effectively the same thing.

Parameters
attributes:temporalio.common.SearchAttributesThe attributes to set. Keys are search attribute names and values are a list of values.
def uuid4() -> uuid.UUID: (source)

Get a new, determinism-safe v4 UUID based on random.

Note, this UUID is not cryptographically safe and should not be used for security purposes.

Returns
uuid.UUIDA deterministically-seeded v4 UUID.
async def wait_condition(fn: Callable[[], bool], *, timeout: Optional[Union[timedelta, float]] = None): (source)

Wait on a callback to become true.

This function returns when the callback returns true (invoked each loop iteration) or the timeout has been reached.

Parameters
fn:Callable[[], bool]Non-async callback that accepts no parameters and returns a boolean.
timeout:Optional[Union[timedelta, float]]Optional number of seconds to wait until throwing asyncio.TimeoutError.

Logger that will have contextual workflow details embedded.

Logs are skipped during replay by default.

def _assert_dynamic_signature(fn: Callable): (source)

Undocumented

def _bind_method(obj: Any, fn: Callable[..., Any]) -> Callable[..., Any]: (source)

Undocumented

def _is_unbound_method_on_cls(fn: Callable[..., Any], cls: Type) -> bool: (source)

Undocumented

_imports_passed_through = (source)

Undocumented

_in_sandbox = (source)

Undocumented

_sandbox_unrestricted = (source)

Undocumented