module documentation

Utilities that can decorate or be called inside workflows.

Class ActivityCancellationType How an activity cancellation should be handled.
Class ActivityConfig TypedDict of config that can be used for start_activity and execute_activity.
Class ActivityHandle Handle returned from start_activity and start_local_activity.
Class ChildWorkflowCancellationType How a child workflow cancellation should be handled.
Class ChildWorkflowConfig TypedDict of config that can be used for start_child_workflow and execute_child_workflow.
Class ChildWorkflowHandle Handle for interacting with a child workflow.
Class ExternalWorkflowHandle Handle for interacting with an external workflow.
Class HandlerUnfinishedPolicy Actions taken if a workflow terminates with running handlers.
Class Info Information about the running workflow.
Class LocalActivityConfig TypedDict of config that can be used for start_local_activity and execute_local_activity.
Class LoggerAdapter Adapter that adds details to the log about the running workflow.
Class ParentClosePolicy How a child workflow should be handled when the parent closes.
Class ParentInfo Information about the parent workflow.
Class unsafe Contains static methods that should not normally be called during workflow execution except in advanced cases.
Class UpdateInfo Information about a workflow update.
Class UpdateMethodMultiParam Decorated workflow update functions implement this.
Class VersioningIntent Indicates whether the user intends certain commands to be run on a compatible worker Build Id version or not.
Exception ContinueAsNewError Error thrown by continue_as_new.
Exception NondeterminismError Error that can be thrown during replay for non-deterministic workflow.
Exception ReadOnlyContextError Error thrown when trying to do mutable workflow calls in a read-only context like a query or update validator.
Exception UnfinishedSignalHandlersWarning The workflow exited before all signal handlers had finished executing.
Exception UnfinishedUpdateHandlersWarning The workflow exited before all update handlers had finished executing.
Function all_handlers_finished Whether update and signal handlers have finished executing.
Function as_completed Return an iterator whose values are coroutines.
Function continue_as_new Stop the workflow immediately and continue as new.
Function current_update_info Info for the current update if any.
Function defn Decorator for workflow classes.
Function deprecate_patch Mark a patch as deprecated.
Async Function execute_activity Start an activity and wait for completion.
Async Function execute_activity_class Start an activity from a callable class and wait for completion.
Async Function execute_activity_method Start an activity from a method and wait for completion.
Async Function execute_child_workflow Start a child workflow and wait for completion.
Async Function execute_local_activity Start a local activity and wait for completion.
Async Function execute_local_activity_class Start a local activity from a callable class and wait for completion.
Async Function execute_local_activity_method Start a local activity from a method and wait for completion.
Function extern_functions External functions available in the workflow sandbox.
Function get_current_details Get the current details of the workflow which may appear in the UI/CLI. Unlike static details set at start, this value can be updated throughout the life of the workflow and is independent of the static details...
Function get_dynamic_query_handler Get the dynamic query handler if any.
Function get_dynamic_signal_handler Get the dynamic signal handler if any.
Function get_dynamic_update_handler Get the dynamic update handler if any.
Function get_external_workflow_handle Get a workflow handle to an existing workflow by its ID.
Function get_external_workflow_handle_for Get a typed workflow handle to an existing workflow by its ID.
Function get_query_handler Get the query handler for the given name if any.
Function get_signal_handler Get the signal handler for the given name if any.
Function get_update_handler Get the update handler for the given name if any.
Function info Current workflow's info.
Function init Decorator for the workflow init method.
Function memo Current workflow's memo values, converted without type hints.
Function memo_value Memo value for the given key, optional default, and optional type hint.
Function metric_meter Get the metric meter for the current workflow.
Function now Current time from the workflow perspective.
Function patched Patch a workflow.
Function payload_converter Get the payload converter for the current workflow.
Function query Decorator for a workflow query method.
Function random Get a deterministic pseudo-random number generator.
Function run Decorator for the workflow run method.
Function set_current_details Set the current details of the workflow which may appear in the UI/CLI. Unlike static details set at start, this value can be updated throughout the life of the workflow and is independent of the static details...
Function set_dynamic_query_handler Set or unset the dynamic query handler.
Function set_dynamic_signal_handler Set or unset the dynamic signal handler.
Function set_dynamic_update_handler Set or unset the dynamic update handler.
Function set_query_handler Set or unset the query handler for the given name.
Function set_signal_handler Set or unset the signal handler for the given name.
Function set_update_handler Set or unset the update handler for the given name.
Function signal Decorator for a workflow signal method.
Async Function sleep Sleep for the given duration.
Function start_activity Start an activity and return its handle.
Function start_activity_class Start an activity from a callable class.
Function start_activity_method Start an activity from a method.
Async Function start_child_workflow Start a child workflow and return its handle.
Function start_local_activity Start a local activity and return its handle.
Function start_local_activity_class Start a local activity from a callable class.
Function start_local_activity_method Start a local activity from a method.
Function time Current seconds since the epoch from the workflow perspective.
Function time_ns Current nanoseconds since the epoch from the workflow perspective.
Function update Decorator for a workflow update handler method.
Function upsert_search_attributes Upsert search attributes for this workflow.
Function uuid4 Get a new, determinism-safe v4 UUID based on random.
Async Function wait Wait for the Futures or Tasks given by fs to complete.
Async Function wait_condition Wait on a callback to become true.
Variable logger Logger that will have contextual workflow details embedded.
Class _AsyncioTask Undocumented
Class _Definition Undocumented
Class _QueryDefinition Undocumented
Class _Runtime Undocumented
Class _SignalDefinition Undocumented
Class _UpdateDefinition Undocumented
Exception _NotInWorkflowEventLoopError Undocumented
Exception _UnexpectedEvictionError Undocumented
Function _assert_dynamic_handler_args Undocumented
Function _bind_method Undocumented
Function _is_unbound_method_on_cls Undocumented
Function _parameters_identical_up_to_naming Return True if the functions have identical parameter lists, ignoring parameter names.
Function _release_waiter Undocumented
Function _set_current_update_info Undocumented
Function _update_validator Decorator for a workflow update validator method.
Async Function _wait Undocumented
Type Variable _FT Undocumented
Variable _current_update_info Undocumented
Variable _imports_passed_through Undocumented
Variable _in_sandbox Undocumented
Variable _sandbox_unrestricted Undocumented
def all_handlers_finished() -> bool:

Whether update and signal handlers have finished executing.

Consider waiting on this condition before the workflow returns or continues as new, to prevent workflow exit from interrupting in-progress handlers:

    await workflow.wait_condition(lambda: workflow.all_handlers_finished())

Returns
bool: True if there are no in-progress update or signal handler executions.
def as_completed(fs: Iterable[Awaitable[AnyType]], *, timeout: Optional[float] = None) -> Iterator[Awaitable[AnyType]]:

Return an iterator whose values are coroutines.

This is a deterministic version of asyncio.as_completed. This function should be used instead of that one in workflows.
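
The iteration pattern is the same as for the standard-library function. The sketch below uses asyncio.as_completed so that it runs without a Temporal worker; inside workflow code the call would be swapped for workflow.as_completed, which takes the same fs and timeout parameters. The fetch helper and its delays are hypothetical and exist only for illustration.

```python
import asyncio
from typing import List


async def fetch(label: str, delay: float) -> str:
    # Hypothetical unit of work that finishes after `delay` seconds.
    await asyncio.sleep(delay)
    return label


async def collect_in_completion_order() -> List[str]:
    pending = [fetch("slow", 0.2), fetch("fast", 0.01)]
    results = []
    # Each value yielded by the iterator is an awaitable; awaiting it
    # returns the result of whichever task finishes next.
    # In a workflow, use workflow.as_completed here instead.
    for next_done in asyncio.as_completed(pending, timeout=5.0):
        results.append(await next_done)
    return results


completion_order = asyncio.run(collect_in_completion_order())
```

Results arrive in completion order rather than submission order, so the caller can act on the first finished item without waiting for the rest.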

@overload
def continue_as_new(arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], task_queue: Optional[str] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[Union[temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes]] = None, versioning_intent: Optional[VersioningIntent] = None) -> NoReturn:
@overload
def continue_as_new(*, workflow: MethodAsyncNoParam[SelfType, Any], task_queue: Optional[str] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[Union[temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes]] = None, versioning_intent: Optional[VersioningIntent] = None) -> NoReturn:
@overload
def continue_as_new(arg: ParamType, *, workflow: MethodAsyncSingleParam[SelfType, ParamType, Any], task_queue: Optional[str] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[Union[temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes]] = None, versioning_intent: Optional[VersioningIntent] = None) -> NoReturn:
@overload
def continue_as_new(*, workflow: Callable[Concatenate[SelfType, MultiParamSpec], Awaitable[Any]], args: Sequence[Any], task_queue: Optional[str] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[Union[temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes]] = None, versioning_intent: Optional[VersioningIntent] = None) -> NoReturn:
@overload
def continue_as_new(*, workflow: str, args: Sequence[Any] = [], task_queue: Optional[str] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[Union[temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes]] = None, versioning_intent: Optional[VersioningIntent] = None) -> NoReturn:

Stop the workflow immediately and continue as new.

Parameters
arg (Any): Single argument to the continued workflow.
args (Sequence[Any]): Multiple arguments to the continued workflow. Cannot be set if arg is.
workflow (Union[None, Callable, str]): Specific workflow to continue to. Defaults to the current workflow.
task_queue (Optional[str]): Task queue to run the workflow on. Defaults to the current workflow's task queue.
run_timeout (Optional[timedelta]): Timeout of a single workflow run. Defaults to the current workflow's run timeout.
task_timeout (Optional[timedelta]): Timeout of a single workflow task. Defaults to the current workflow's task timeout.
retry_policy (Optional[temporalio.common.RetryPolicy]): Undocumented
memo (Optional[Mapping[str, Any]]): Memo for the workflow. Defaults to the current workflow's memo.
search_attributes (Optional[Union[temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes]]): Search attributes for the workflow. Defaults to the current workflow's search attributes. The dictionary form of this is DEPRECATED.
versioning_intent (Optional[VersioningIntent]): When using the Worker Versioning feature, specifies whether this Workflow should Continue-as-New onto a worker with a compatible Build Id or not.
Returns
NoReturn: Never returns, always raises a ContinueAsNewError.
Raises
ContinueAsNewError: Always raised by this function. Should not be caught but instead be allowed to propagate.
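
The signatures above default arg to a private "unset" marker rather than None, which lets None itself be passed as a real argument, and the parameter notes say args cannot be set when arg is. A minimal sketch of that rule follows. It is not the library's implementation: _ARG_UNSET and gather_args are hypothetical names used only to illustrate the sentinel-default technique.

```python
from typing import Any, List, Sequence

# Hypothetical stand-in for a private "argument not provided" marker.
_ARG_UNSET = object()


def gather_args(arg: Any = _ARG_UNSET, args: Sequence[Any] = ()) -> List[Any]:
    """Normalize the single-argument and multi-argument forms into one list."""
    if arg is not _ARG_UNSET:
        # A single argument was given, so the multi-argument form must be empty.
        if args:
            raise ValueError("Cannot set both arg and args")
        return [arg]
    return list(args)
```

With this shape, gather_args(None) yields a one-element list containing None, while gather_args() yields an empty list, which a plain None default could not distinguish.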
def current_update_info() -> Optional[UpdateInfo]:

Info for the current update if any.

This is powered by contextvars so it is only valid within the update handler and coroutines/tasks it has started.

Warning

This API is experimental

Returns
Optional[UpdateInfo]: Info for the update handler within which the calling code is executing, if any.
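
The scoping described above follows from how contextvars interacts with asyncio tasks: a task copies the context of the code that created it. The sketch below demonstrates this with the standard library only. The variable current_update and the handler and child coroutines are hypothetical stand-ins, not the SDK's internals.

```python
import asyncio
import contextvars

# Hypothetical context variable playing the role of "current update info".
current_update = contextvars.ContextVar("current_update", default=None)


async def child():
    # Runs in a task started by the handler, so it sees the handler's value.
    return current_update.get()


async def handler(update_id: str):
    current_update.set(update_id)
    return await asyncio.create_task(child())


async def main():
    # The handler runs in its own task, so its set() does not leak back here.
    seen_inside = await asyncio.create_task(handler("update-1"))
    seen_outside = current_update.get()
    return seen_inside, seen_outside


inside_handler, outside_handler = asyncio.run(main())
```

The value is visible to the handler and to tasks it starts, and absent everywhere else, which is why the function returns None outside an update handler.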
@overload
def defn(cls: ClassType) -> ClassType:
@overload
def defn(*, name: Optional[str] = None, sandboxed: bool = True, failure_exception_types: Sequence[Type[BaseException]] = []) -> Callable[[ClassType], ClassType]:
@overload
def defn(*, sandboxed: bool = True, dynamic: bool = False) -> Callable[[ClassType], ClassType]:

Decorator for workflow classes.

This must be set on any registered workflow class (it is ignored if on a base class).

Parameters
cls (Optional[ClassType]): The class to decorate.
name (Optional[str]): Name to use for the workflow. Defaults to class __name__. This cannot be set if dynamic is set.
sandboxed (bool): Whether the workflow should run in a sandbox. Default is true.
dynamic (bool): If true, this workflow will be dynamic. Dynamic workflows have to accept a single 'Sequence[RawValue]' parameter. This cannot be set to true if name is present.
failure_exception_types (Sequence[Type[BaseException]]): Exception types that, when a workflow-thrown exception extends one of them, cause the workflow/update to fail instead of suspending the workflow via task failure. These are applied in addition to the ones set on the worker constructor. If Exception is set, the workflow/update effectively fails in all user exception cases. WARNING: This setting is experimental.
def deprecate_patch(id: str):

Mark a patch as deprecated.

This marks a patch that was introduced with patched in a previous version of the code as no longer applicable, because all workflows that use the old code path are done and will never be queried again. The old code path can therefore be removed as well.

Parameters
id (str): The identifier originally used with patched.
@overload
async def execute_activity(activity: CallableAsyncNoParam[ReturnType], *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:
@overload
async def execute_activity(activity: CallableSyncNoParam[ReturnType], *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:
@overload
async def execute_activity(activity: CallableAsyncSingleParam[ParamType, ReturnType], arg: ParamType, *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:
@overload
async def execute_activity(activity: CallableSyncSingleParam[ParamType, ReturnType], arg: ParamType, *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:
@overload
async def execute_activity(activity: Callable[..., Awaitable[ReturnType]], *, args: Sequence[Any], task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:
@overload
async def execute_activity(activity: Callable[..., ReturnType], *, args: Sequence[Any], task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:
@overload
async def execute_activity(activity: str, arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], task_queue: Optional[str] = None, result_type: Optional[Type] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> Any:

Start an activity and wait for completion.

This is a shortcut for await start_activity.
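
The relationship between the start and execute forms can be pictured with a plain asyncio analogy: the start form returns a handle at once, and the execute form simply awaits that handle. The sketch below is an analogy under that assumption and does not call the SDK; work, start_job, and execute_job are hypothetical names.

```python
import asyncio


async def work(name: str, delay: float) -> str:
    # Hypothetical stand-in for an activity body.
    await asyncio.sleep(delay)
    return name.upper()


def start_job(name: str, delay: float) -> asyncio.Task:
    # Returns a handle immediately; the work proceeds in the background.
    return asyncio.create_task(work(name, delay))


async def execute_job(name: str, delay: float) -> str:
    # The "execute" form is just "await" applied to the "start" form.
    return await start_job(name, delay)


async def main():
    handle_a = start_job("a", 0.02)  # both jobs are now running concurrently
    handle_b = start_job("b", 0.01)
    direct = await execute_job("c", 0.01)  # start and wait in one step
    return [await handle_a, await handle_b, direct]


job_results = asyncio.run(main())
```

Holding on to the handle is useful when several pieces of work should overlap; the execute form is more convenient when the result is needed right away.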

@overload
async def execute_activity_class(activity: Type[CallableAsyncNoParam[ReturnType]], *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:
@overload
async def execute_activity_class(activity: Type[CallableSyncNoParam[ReturnType]], *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:
@overload
async def execute_activity_class(activity: Type[CallableAsyncSingleParam[ParamType, ReturnType]], arg: ParamType, *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:
@overload
async def execute_activity_class(activity: Type[CallableSyncSingleParam[ParamType, ReturnType]], arg: ParamType, *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:
@overload
async def execute_activity_class(activity: Type[Callable[..., Awaitable[ReturnType]]], *, args: Sequence[Any], task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:
@overload
async def execute_activity_class(activity: Type[Callable[..., ReturnType]], *, args: Sequence[Any], task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:

Start an activity from a callable class and wait for completion.

This is a shortcut for await start_activity_class.

@overload
async def execute_activity_method(activity: MethodAsyncNoParam[SelfType, ReturnType], *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:
@overload
async def execute_activity_method(activity: MethodSyncNoParam[SelfType, ReturnType], *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:
@overload
async def execute_activity_method(activity: MethodAsyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:
@overload
async def execute_activity_method(activity: MethodSyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:
@overload
async def execute_activity_method(activity: Callable[Concatenate[SelfType, MultiParamSpec], Awaitable[ReturnType]], *, args: Sequence[Any], task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:
@overload
async def execute_activity_method(activity: Callable[Concatenate[SelfType, MultiParamSpec], ReturnType], *, args: Sequence[Any], task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:

Start an activity from a method and wait for completion.

This is a shortcut for await start_activity_method.

@overload
async def execute_child_workflow(workflow: MethodAsyncNoParam[SelfType, ReturnType], *, id: Optional[str] = None, task_queue: Optional[str] = None, cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE, execution_timeout: Optional[timedelta] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = '', memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[Union[temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes]] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:
@overload
async def execute_child_workflow(workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, id: Optional[str] = None, task_queue: Optional[str] = None, cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE, execution_timeout: Optional[timedelta] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = '', memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[Union[temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes]] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:
@overload
async def execute_child_workflow(workflow: Callable[Concatenate[SelfType, MultiParamSpec], Awaitable[ReturnType]], *, args: Sequence[Any], id: Optional[str] = None, task_queue: Optional[str] = None, cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE, execution_timeout: Optional[timedelta] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = '', memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[Union[temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes]] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ReturnType:
@overload
async def execute_child_workflow(workflow: str, arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], id: Optional[str] = None, task_queue: Optional[str] = None, result_type: Optional[Type] = None, cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE, execution_timeout: Optional[timedelta] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = '', memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[Union[temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes]] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> Any:

Start a child workflow and wait for completion.

This is a shortcut for await (await start_child_workflow).
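
Two awaits are needed because start_child_workflow is itself an async function that returns a handle, and the handle is then awaited for the result. The sketch below mirrors that shape with plain asyncio, on the assumption that the first await covers the child being started and the second covers its completion. It does not call the SDK; child, start_child, and execute_child are hypothetical names.

```python
import asyncio


async def child(value: int) -> int:
    # Hypothetical stand-in for a child workflow body.
    await asyncio.sleep(0.01)
    return value * 2


async def start_child(value: int) -> asyncio.Task:
    # First await: resolves once the child has been started, yielding a handle.
    task = asyncio.create_task(child(value))
    await asyncio.sleep(0)  # yield once so the child begins running
    return task


async def execute_child(value: int) -> int:
    # Second await: waits on the handle for the child's result.
    return await (await start_child(value))


child_result = asyncio.run(execute_child(21))
```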

@overload
async def execute_local_activity(activity: CallableAsyncNoParam[ReturnType], *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ReturnType:
@overload
async def execute_local_activity(activity: CallableSyncNoParam[ReturnType], *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ReturnType:
@overload
async def execute_local_activity(activity: CallableAsyncSingleParam[ParamType, ReturnType], arg: ParamType, *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ReturnType:
@overload
async def execute_local_activity(activity: CallableSyncSingleParam[ParamType, ReturnType], arg: ParamType, *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ReturnType:
@overload
async def execute_local_activity(activity: Callable[..., Awaitable[ReturnType]], *, args: Sequence[Any], schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ReturnType:
@overload
async def execute_local_activity(activity: Callable[..., ReturnType], *, args: Sequence[Any], schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ReturnType:
@overload
async def execute_local_activity(activity: str, arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], result_type: Optional[Type] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> Any:
(source)

Start a local activity and wait for completion.

This is a shortcut for await start_local_activity.

Warning

Local activities are currently experimental.

@overload
async def execute_local_activity_class(activity: Type[CallableAsyncNoParam[ReturnType]], *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ReturnType:
@overload
async def execute_local_activity_class(activity: Type[CallableSyncNoParam[ReturnType]], *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ReturnType:
@overload
async def execute_local_activity_class(activity: Type[CallableAsyncSingleParam[ParamType, ReturnType]], arg: ParamType, *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ReturnType:
@overload
async def execute_local_activity_class(activity: Type[CallableSyncSingleParam[ParamType, ReturnType]], arg: ParamType, *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ReturnType:
@overload
async def execute_local_activity_class(activity: Type[Callable[..., Awaitable[ReturnType]]], *, args: Sequence[Any], schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ReturnType:
@overload
async def execute_local_activity_class(activity: Type[Callable[..., ReturnType]], *, args: Sequence[Any], schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ReturnType:
(source)

Start a local activity from a callable class and wait for completion.

This is a shortcut for await start_local_activity_class.

Warning

Local activities are currently experimental.

@overload
async def execute_local_activity_method(activity: MethodAsyncNoParam[SelfType, ReturnType], *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ReturnType:
@overload
async def execute_local_activity_method(activity: MethodSyncNoParam[SelfType, ReturnType], *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ReturnType:
@overload
async def execute_local_activity_method(activity: MethodAsyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ReturnType:
@overload
async def execute_local_activity_method(activity: MethodSyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ReturnType:
@overload
async def execute_local_activity_method(activity: Callable[Concatenate[SelfType, MultiParamSpec], Awaitable[ReturnType]], *, args: Sequence[Any], schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ReturnType:
@overload
async def execute_local_activity_method(activity: Callable[Concatenate[SelfType, MultiParamSpec], ReturnType], *, args: Sequence[Any], schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ReturnType:
(source)

Start a local activity from a method and wait for completion.

This is a shortcut for await start_local_activity_method.

Warning

Local activities are currently experimental.

def extern_functions() -> Mapping[str, Callable]: (source)

External functions available in the workflow sandbox.

Returns
Mapping[str, Callable]: Mapping of external functions that can be called from inside a workflow sandbox.
def get_current_details() -> str: (source)

Get the current details of the workflow which may appear in the UI/CLI. Unlike static details set at start, this value can be updated throughout the life of the workflow and is independent of the static details. This can be in Temporal markdown format and can span multiple lines.

def get_dynamic_query_handler() -> Optional[Callable]: (source)

Get the dynamic query handler if any.

This includes dynamic handlers created via the @workflow.query decorator.

Returns
Optional[Callable]: Callable for the dynamic query handler if any.
def get_dynamic_signal_handler() -> Optional[Callable]: (source)

Get the dynamic signal handler if any.

This includes dynamic handlers created via the @workflow.signal decorator.

Returns
Optional[Callable]: Callable for the dynamic signal handler if any.
def get_dynamic_update_handler() -> Optional[Callable]: (source)

Get the dynamic update handler if any.

This includes dynamic handlers created via the @workflow.update decorator.

Returns
Optional[Callable]: Callable for the dynamic update handler if any.
def get_external_workflow_handle(workflow_id: str, *, run_id: Optional[str] = None) -> ExternalWorkflowHandle[Any]: (source)

Get a workflow handle to an existing workflow by its ID.

Parameters
workflow_id (str): Workflow ID to get a handle to.
run_id (Optional[str]): Optional run ID for the workflow.
Returns
ExternalWorkflowHandle[Any]: The external workflow handle.
def get_external_workflow_handle_for(workflow: Union[MethodAsyncNoParam[SelfType, Any], MethodAsyncSingleParam[SelfType, Any, Any]], workflow_id: str, *, run_id: Optional[str] = None) -> ExternalWorkflowHandle[SelfType]: (source)

Get a typed workflow handle to an existing workflow by its ID.

This is the same as get_external_workflow_handle but typed. Note, the workflow type given is not validated, it is only for typing.

Parameters
workflow (Union[MethodAsyncNoParam[SelfType, Any], MethodAsyncSingleParam[SelfType, Any, Any]]): The workflow run method to use for typing the handle.
workflow_id (str): Workflow ID to get a handle to.
run_id (Optional[str]): Optional run ID for the workflow.
Returns
ExternalWorkflowHandle[SelfType]: The external workflow handle.
def get_query_handler(name: str) -> Optional[Callable]: (source)

Get the query handler for the given name if any.

This includes handlers created via the @workflow.query decorator.

Parameters
name (str): Name of the query.
Returns
Optional[Callable]: Callable for the query if any. If a handler is not found for the name, this will not return the dynamic handler even if there is one.
def get_signal_handler(name: str) -> Optional[Callable]: (source)

Get the signal handler for the given name if any.

This includes handlers created via the @workflow.signal decorator.

Parameters
name (str): Name of the signal.
Returns
Optional[Callable]: Callable for the signal if any. If a handler is not found for the name, this will not return the dynamic handler even if there is one.
def get_update_handler(name: str) -> Optional[Callable]: (source)

Get the update handler for the given name if any.

This includes handlers created via the @workflow.update decorator.

Parameters
name (str): Name of the update.
Returns
Optional[Callable]: Callable for the update if any. If a handler is not found for the name, this will not return the dynamic handler even if there is one.
def info() -> Info: (source)

Current workflow's info.

Returns
Info: Info for the currently running workflow.

def init(init_fn: CallableType) -> CallableType: (source)

Decorator for the workflow init method.

This may be used on the __init__ method of the workflow class to specify that it accepts the same workflow input arguments as the @workflow.run method. If used, the parameters of your __init__ and @workflow.run methods must be identical.

Parameters
init_fn (CallableType): The __init__ method to decorate.
Returns
CallableType: The decorated __init__ method.
def memo() -> Mapping[str, Any]: (source)

Current workflow's memo values, converted without type hints.

Since type hints are not used, the default converted values will come back. For example, if the memo was originally created with a dataclass, the value will be a dict. To convert using proper type hints, use memo_value.

Returns
Mapping[str, Any]: Mapping of all memo keys and their values without type hints.
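The dataclass-to-dict behavior can be illustrated without the SDK. The sketch below uses plain json as a stand-in for the data converter, which is an assumption made for illustration only; OrderMeta is a hypothetical dataclass.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class OrderMeta:
    """Hypothetical value stored in a memo."""

    region: str
    priority: int


# Stand-in for what is stored when the memo is created from a dataclass.
stored = json.dumps(asdict(OrderMeta(region="eu", priority=2)))

# Without a type hint the value comes back as a plain dict,
# analogous to reading the key from memo().
untyped = json.loads(stored)

# With a type hint the original type can be rebuilt,
# analogous to memo_value(key, type_hint=OrderMeta).
typed = OrderMeta(**json.loads(stored))
```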
@overload
def memo_value(key: str, default: Any = temporalio.common._arg_unset) -> Any:
@overload
def memo_value(key: str, *, type_hint: Type[ParamType]) -> ParamType:
@overload
def memo_value(key: str, default: AnyType, *, type_hint: Type[ParamType]) -> Union[AnyType, ParamType]:
(source)

Memo value for the given key, optional default, and optional type hint.

Parameters
key (str): Key to get memo value for.
default (Any): Default to use if key is not present. If unset, a KeyError is raised when the key does not exist.
type_hint (Optional[Type]): Type hint to use when converting.
Returns
Any: Memo value, converted with the type hint if present.
Raises
KeyError: Key not present and default not set.

def metric_meter() -> temporalio.common.MetricMeter: (source)

Get the metric meter for the current workflow.

This meter is replay safe which means that metrics will not be recorded during replay.

Returns
temporalio.common.MetricMeter: Current metric meter for this workflow for recording metrics.

def now() -> datetime: (source)

Current time from the workflow perspective.

This is the workflow equivalent of datetime.now with the timezone.utc parameter.

Returns
datetime: UTC datetime for the current workflow time. The datetime does have UTC set as the time zone.
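One way to keep time-dependent logic testable is to pass the current time in as a parameter and supply the workflow time at the call site. The helper below is a hypothetical sketch using only the standard library; inside a workflow, the value passed as now would come from this function rather than from datetime.now.

```python
from datetime import datetime, timedelta, timezone


def is_expired(now: datetime, started_at: datetime, ttl: timedelta) -> bool:
    """Return True once at least `ttl` has elapsed since `started_at`."""
    return now - started_at >= ttl


# Fixed, timezone-aware values so the sketch runs standalone.
started = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
fixed_now = datetime(2024, 1, 1, 12, 30, tzinfo=timezone.utc)

expired = is_expired(fixed_now, started, timedelta(minutes=15))
still_valid = not is_expired(fixed_now, started, timedelta(hours=1))
```

Both datetimes carry UTC as their time zone, matching the return value described above, so the subtraction never mixes naive and aware values.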
def patched(id: str) -> bool: (source)

Patch a workflow.

When called, this returns true only if the code should take the newer path: either the workflow is not replaying, or it is replaying and has already seen this patch.

Use deprecate_patch when all workflows are done and will never be queried again. The old code path can be removed at that time too.

Parameters
id (str): The identifier for this patch. This identifier may be used repeatedly in the same workflow to represent the same patch.
Returns
bool: True if this should take the newer path, false if it should take the older path.
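The rule for which path is taken can be written as a small truth function. This is a standalone model of the description above, not the SDK's implementation; the function name and the patch ID in the comment are hypothetical.

```python
def takes_new_path(replaying: bool, patch_in_history: bool) -> bool:
    """Model of the documented rule: new path unless replaying history without the patch."""
    return (not replaying) or patch_in_history


# Typical call site inside a workflow (shown as a comment, not executed here):
#
#     if workflow.patched("use-v2-pricing"):
#         ...  # newer code path
#     else:
#         ...  # older code path, kept for histories recorded before the patch

first_execution = takes_new_path(replaying=False, patch_in_history=False)
replay_of_new_history = takes_new_path(replaying=True, patch_in_history=True)
replay_of_old_history = takes_new_path(replaying=True, patch_in_history=False)
```

Only the last case, replaying a history recorded before the patch existed, takes the older path.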

def payload_converter() -> temporalio.converter.PayloadConverter: (source)

Get the payload converter for the current workflow.

This is often used for dynamic workflows/signals/queries to convert payloads.

@overload
def query(fn: CallableType) -> CallableType:
@overload
def query(*, name: str, description: Optional[str] = None) -> Callable[[CallableType], CallableType]:
@overload
def query(*, dynamic: Literal[True], description: Optional[str] = None) -> Callable[[CallableType], CallableType]:
@overload
def query(*, description: str) -> Callable[[CallableType], CallableType]:
(source)

Decorator for a workflow query method.

This is used on any non-async method that expects to handle a query. If a function overrides one with this decorator, it too must be decorated.

Query methods can only have positional parameters. Best practice for non-dynamic query methods is to only take a single object/dataclass argument that can accept more fields later if needed. The return value is the resulting query value. Query methods must not mutate any workflow state.

Parameters
fn (Optional[CallableType]): The function to decorate.
name (Optional[str]): Query name. Defaults to method __name__. Cannot be present when dynamic is present.
dynamic (Optional[bool]): If true, this handles all queries not otherwise handled. The parameters of the method should be self, a string name, and a Sequence[RawValue]. An older form of this accepted vararg parameters which will now warn. Cannot be present when name is present.
description (Optional[str]): A short description of the query that may appear in the UI/CLI.
def random() -> Random: (source)

Get a deterministic pseudo-random number generator.

Note, this random number generator is not cryptographically safe and should not be used for security purposes.

Returns
Random: The deterministically-seeded pseudo-random number generator.
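Why deterministic seeding matters can be shown with the standard library alone. In the sketch below, two generators built from the same seed stand in for an original execution and its replay; the seed value is arbitrary, and how the SDK chooses its seed is not modeled here.

```python
from random import Random

SEED = 1234  # arbitrary seed chosen for this illustration

original = Random(SEED)   # stands in for the first execution
replayed = Random(SEED)   # stands in for a later replay

first_run = [original.randrange(8) for _ in range(5)]
replay = [replayed.randrange(8) for _ in range(5)]

# Identical seeds give identical sequences, so decisions based on them repeat.
same_decisions = first_run == replay
```

Inside a workflow, the generator returned by this function should be used instead of constructing Random directly or calling the module-level random functions.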

def run(fn: CallableAsyncType) -> CallableAsyncType: (source)

Decorator for the workflow run method.

This must be used on one and only one async method defined on the same class as @workflow.defn. This can be defined on a base class method but must then be explicitly overridden and defined on the workflow class.

Run methods can only have positional parameters. Best practice is to only take a single object/dataclass argument that can accept more fields later if needed.

Parameters
fn (CallableAsyncType): The function to decorate.
Returns
CallableAsyncType: The decorated function.
def set_current_details(description: str): (source)

Set the current details of the workflow which may appear in the UI/CLI. Unlike static details set at start, this value can be updated throughout the life of the workflow and is independent of the static details. This can be in Temporal markdown format and can span multiple lines.

def set_dynamic_query_handler(handler: Optional[Callable]): (source)

Set or unset the dynamic query handler.

This overrides the existing dynamic handler even if it was created via the @workflow.query decorator.

Parameters
handler (Optional[Callable]): Callable to set or None to unset.
def set_dynamic_signal_handler(handler: Optional[Callable]): (source)

Set or unset the dynamic signal handler.

This overrides the existing dynamic handler even if it was created via the @workflow.signal decorator.

When set, all unhandled past signals are immediately sent to the handler.

Parameters
handler (Optional[Callable]): Callable to set or None to unset.
def set_dynamic_update_handler(handler: Optional[Callable], *, validator: Optional[Callable] = None): (source)

Set or unset the dynamic update handler.

This overrides the existing dynamic handler even if it was created via the @workflow.update decorator.

Parameters
handler (Optional[Callable]): Callable to set or None to unset.
validator (Optional[Callable]): Callable to set or None to unset as the update validator.
def set_query_handler(name: str, handler: Optional[Callable]): (source)

Set or unset the query handler for the given name.

This overrides any existing handlers for the given name, including handlers created via the @workflow.query decorator.

Parameters
name (str): Name of the query.
handler (Optional[Callable]): Callable to set or None to unset.
def set_signal_handler(name: str, handler: Optional[Callable]): (source)

Set or unset the signal handler for the given name.

This overrides any existing handlers for the given name, including handlers created via the @workflow.signal decorator.

When set, all unhandled past signals for the given name are immediately sent to the handler.

Parameters
name (str): Name of the signal.
handler (Optional[Callable]): Callable to set or None to unset.
def set_update_handler(name: str, handler: Optional[Callable], *, validator: Optional[Callable] = None): (source)

Set or unset the update handler for the given name.

This overrides any existing handlers for the given name, including handlers created via the @workflow.update decorator.

Parameters
name (str): Name of the update.
handler (Optional[Callable]): Callable to set or None to unset.
validator (Optional[Callable]): Callable to set or None to unset as the update validator.

Decorator for a workflow signal method.

This is used on any async or non-async method that you wish to be called upon receiving a signal. If a function overrides one with this decorator, it too must be decorated.

Signal methods can only have positional parameters. Best practice for non-dynamic signal methods is to only take a single object/dataclass argument that can accept more fields later if needed. Return values from signal methods are ignored.

Parameters
fn (Optional[CallableSyncOrAsyncReturnNoneType]): The function to decorate.
name (Optional[str]): Signal name. Defaults to method __name__. Cannot be present when dynamic is present.
dynamic (Optional[bool]): If true, this handles all signals not otherwise handled. The parameters of the method must be self, a string name, and a *args positional varargs. Cannot be present when name is present.
unfinished_policy (HandlerUnfinishedPolicy): Actions taken if a workflow terminates with a running instance of this handler.
description (Optional[str]): A short description of the signal that may appear in the UI/CLI.
async def sleep(duration: Union[float, timedelta], *, summary: Optional[str] = None): (source)

Sleep for the given duration.

Parameters
duration (Union[float, timedelta]): Duration to sleep in seconds or as a timedelta.
summary (Optional[str]): A single-line fixed summary for this timer that may appear in UI/CLI. This can be in single-line Temporal markdown format.
@overload
def start_activity(activity: CallableAsyncNoParam[ReturnType], *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None) -> ActivityHandle[ReturnType]:
@overload
def start_activity(activity: CallableSyncNoParam[ReturnType], *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None) -> ActivityHandle[ReturnType]:
@overload
def start_activity(activity: CallableAsyncSingleParam[ParamType, ReturnType], arg: ParamType, *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None) -> ActivityHandle[ReturnType]:
@overload
def start_activity(activity: CallableSyncSingleParam[ParamType, ReturnType], arg: ParamType, *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None) -> ActivityHandle[ReturnType]:
@overload
def start_activity(activity: Callable[..., Awaitable[ReturnType]], *, args: Sequence[Any], task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None) -> ActivityHandle[ReturnType]:
@overload
def start_activity(activity: Callable[..., ReturnType], *, args: Sequence[Any], task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None) -> ActivityHandle[ReturnType]:
@overload
def start_activity(activity: str, arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], task_queue: Optional[str] = None, result_type: Optional[Type] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None) -> ActivityHandle[Any]:
(source)

Start an activity and return its handle.

At least one of schedule_to_close_timeout or start_to_close_timeout must be present.

Parameters
activity (Any): Activity name or function reference.
arg (Any): Single argument to the activity.
args (Sequence[Any]): Multiple arguments to the activity. Cannot be set if arg is.
task_queue (Optional[str]): Task queue to run the activity on. Defaults to the current workflow's task queue.
result_type (Optional[Type]): For string activities, this can set the specific result type hint to deserialize into.
schedule_to_close_timeout (Optional[timedelta]): Max amount of time the activity can take from first being scheduled to being completed before it times out. This is inclusive of all retries.
schedule_to_start_timeout (Optional[timedelta]): Max amount of time the activity can take to be started from first being scheduled.
start_to_close_timeout (Optional[timedelta]): Max amount of time a single activity run can take from when it starts to when it completes. This is per retry.
heartbeat_timeout (Optional[timedelta]): How frequently an activity must invoke heartbeat while running before it is considered timed out.
retry_policy (Optional[temporalio.common.RetryPolicy]): How an activity is retried on failure. If unset, a server-defined default is used. Set maximum attempts to 1 to disable retries.
cancellation_type (ActivityCancellationType): How the activity is treated when it is cancelled from the workflow.
activity_id (Optional[str]): Optional unique identifier for the activity. This is an advanced setting that should not be set unless users are sure they need to. Contact Temporal before setting this value.
versioning_intent (Optional[VersioningIntent]): When using the Worker Versioning feature, specifies whether this Activity should run on a worker with a compatible Build Id or not.
summary (Optional[str]): A single-line fixed summary for this activity that may appear in UI/CLI. This can be in single-line Temporal markdown format.
Returns
ActivityHandle[Any]: An activity handle to the activity which is an async task.
@overload
def start_activity_class(activity: Type[CallableAsyncNoParam[ReturnType]], *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_activity_class(activity: Type[CallableSyncNoParam[ReturnType]], *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_activity_class(activity: Type[CallableAsyncSingleParam[ParamType, ReturnType]], arg: ParamType, *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_activity_class(activity: Type[CallableSyncSingleParam[ParamType, ReturnType]], arg: ParamType, *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_activity_class(activity: Type[Callable[..., Awaitable[ReturnType]]], *, args: Sequence[Any], task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_activity_class(activity: Type[Callable[..., ReturnType]], *, args: Sequence[Any], task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ActivityHandle[ReturnType]:
(source)

Start an activity from a callable class.

See start_activity for parameter and return details.

@overload
def start_activity_method(activity: MethodAsyncNoParam[SelfType, ReturnType], *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_activity_method(activity: MethodSyncNoParam[SelfType, ReturnType], *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_activity_method(activity: MethodAsyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_activity_method(activity: MethodSyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_activity_method(activity: Callable[Concatenate[SelfType, MultiParamSpec], Awaitable[ReturnType]], *, args: Sequence[Any], task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_activity_method(activity: Callable[Concatenate[SelfType, MultiParamSpec], ReturnType], *, args: Sequence[Any], task_queue: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, heartbeat_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, versioning_intent: Optional[VersioningIntent] = None, summary: Optional[str] = None) -> ActivityHandle[ReturnType]:
(source)

Start an activity from a method.

See start_activity for parameter and return details.

@overload
async def start_child_workflow(workflow: MethodAsyncNoParam[SelfType, ReturnType], *, id: Optional[str] = None, task_queue: Optional[str] = None, cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE, execution_timeout: Optional[timedelta] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = '', memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[Union[temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes]] = None, versioning_intent: Optional[VersioningIntent] = None, static_summary: Optional[str] = None, static_details: Optional[str] = None) -> ChildWorkflowHandle[SelfType, ReturnType]:
@overload
async def start_child_workflow(workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, id: Optional[str] = None, task_queue: Optional[str] = None, cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE, execution_timeout: Optional[timedelta] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = '', memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[Union[temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes]] = None, versioning_intent: Optional[VersioningIntent] = None, static_summary: Optional[str] = None, static_details: Optional[str] = None) -> ChildWorkflowHandle[SelfType, ReturnType]:
@overload
async def start_child_workflow(workflow: Callable[Concatenate[SelfType, MultiParamSpec], Awaitable[ReturnType]], *, args: Sequence[Any], id: Optional[str] = None, task_queue: Optional[str] = None, cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE, execution_timeout: Optional[timedelta] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = '', memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[Union[temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes]] = None, versioning_intent: Optional[VersioningIntent] = None, static_summary: Optional[str] = None, static_details: Optional[str] = None) -> ChildWorkflowHandle[SelfType, ReturnType]:
@overload
async def start_child_workflow(workflow: str, arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], id: Optional[str] = None, task_queue: Optional[str] = None, result_type: Optional[Type] = None, cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE, execution_timeout: Optional[timedelta] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = '', memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[Union[temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes]] = None, versioning_intent: Optional[VersioningIntent] = None, static_summary: Optional[str] = None, static_details: Optional[str] = None) -> ChildWorkflowHandle[Any, Any]:
(source)

Start a child workflow and return its handle.

Parameters
workflow (Any): String name or class method decorated with @workflow.run for the workflow to start.
arg (Any): Single argument to the child workflow.
args (Sequence[Any]): Multiple arguments to the child workflow. Cannot be set if arg is.
id (Optional[str]): Optional unique identifier for the workflow execution. If not set, defaults to uuid4.
task_queue (Optional[str]): Task queue to run the workflow on. Defaults to the current workflow's task queue.
result_type (Optional[Type]): For string workflows, this can set the specific result type hint to deserialize into.
cancellation_type (ChildWorkflowCancellationType): How the child workflow will react to cancellation.
parent_close_policy (ParentClosePolicy): How to handle the child workflow when the parent workflow closes.
execution_timeout (Optional[timedelta]): Total workflow execution timeout including retries and continue as new.
run_timeout (Optional[timedelta]): Timeout of a single workflow run.
task_timeout (Optional[timedelta]): Timeout of a single workflow task.
id_reuse_policy (temporalio.common.WorkflowIDReusePolicy): How already-existing IDs are treated.
retry_policy (Optional[temporalio.common.RetryPolicy]): Retry policy for the workflow.
cron_schedule (str): See https://docs.temporal.io/docs/content/what-is-a-temporal-cron-job/
memo (Optional[Mapping[str, Any]]): Memo for the workflow.
search_attributes (Optional[Union[temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes]]): Search attributes for the workflow. The dictionary form of this is DEPRECATED.
versioning_intent (Optional[VersioningIntent]): When using the Worker Versioning feature, specifies whether this Child Workflow should run on a worker with a compatible Build Id or not.
static_summary (Optional[str]): A single-line fixed summary for this child workflow execution that may appear in the UI/CLI. This can be in single-line Temporal markdown format.
static_details (Optional[str]): General fixed details for this child workflow execution that may appear in the UI/CLI. This can be in Temporal markdown format and can span multiple lines. This is a fixed value on the workflow that cannot be updated. For details that can be updated, use Workflow.get_current_details within the workflow.
Returns
ChildWorkflowHandle[Any, Any]: A workflow handle to the started/existing workflow.
@overload
def start_local_activity(activity: CallableAsyncNoParam[ReturnType], *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity(activity: CallableSyncNoParam[ReturnType], *, activity_id: Optional[str] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity(activity: CallableAsyncSingleParam[ParamType, ReturnType], arg: ParamType, *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity(activity: CallableSyncSingleParam[ParamType, ReturnType], arg: ParamType, *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity(activity: Callable[..., Awaitable[ReturnType]], *, args: Sequence[Any], schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity(activity: Callable[..., ReturnType], *, args: Sequence[Any], schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity(activity: str, arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], result_type: Optional[Type] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ActivityHandle[Any]:
(source)

Start a local activity and return its handle.

At least one of schedule_to_close_timeout or start_to_close_timeout must be present.

Warning

Local activities are currently experimental.

Parameters
activity (Any): Activity name or function reference.
arg (Any): Single argument to the activity.
args (Sequence[Any]): Multiple arguments to the activity. Cannot be set if arg is.
result_type (Optional[Type]): For string activities, this can set the specific result type hint to deserialize into.
schedule_to_close_timeout (Optional[timedelta]): Max amount of time the activity can take from first being scheduled to being completed before it times out. This is inclusive of all retries.
schedule_to_start_timeout (Optional[timedelta]): Max amount of time the activity can take to be started from first being scheduled.
start_to_close_timeout (Optional[timedelta]): Max amount of time a single activity run can take from when it starts to when it completes. This is per retry.
retry_policy (Optional[temporalio.common.RetryPolicy]): How an activity is retried on failure. If unset, an SDK-defined default is used. Set maximum attempts to 1 to disable retries.
local_retry_threshold (Optional[timedelta]): Undocumented
cancellation_type (ActivityCancellationType): How the activity is treated when it is cancelled from the workflow.
activity_id (Optional[str]): Optional unique identifier for the activity. This is an advanced setting that should not be set unless users are sure they need to. Contact Temporal before setting this value.
Returns
ActivityHandle[Any]: An activity handle to the activity, which is an async task.
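
The two argument rules stated above (arg and args are mutually exclusive, and at least one of schedule_to_close_timeout or start_to_close_timeout must be present) can be sketched as a plain validation helper. This is only an illustration: check_local_activity_options and the _UNSET sentinel are hypothetical names, not part of the SDK, and the SDK's own checks and error types may differ.

```python
from datetime import timedelta
from typing import Any, List, Optional, Sequence

# Hypothetical sentinel meaning "no single positional argument was given".
_UNSET: Any = object()


def check_local_activity_options(
    arg: Any = _UNSET,
    *,
    args: Sequence[Any] = (),
    schedule_to_close_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
) -> List[Any]:
    """Validate the documented rules and return the effective argument list."""
    # arg and args cannot both be set.
    if arg is not _UNSET and args:
        raise ValueError("Cannot set both arg and args")
    # At least one of the two timeouts must be present.
    if schedule_to_close_timeout is None and start_to_close_timeout is None:
        raise ValueError(
            "Need schedule_to_close_timeout or start_to_close_timeout"
        )
    return [arg] if arg is not _UNSET else list(args)


print(check_local_activity_options("a", start_to_close_timeout=timedelta(seconds=5)))
```

The sketch raises ValueError for both violations purely for illustration.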
@overload
def start_local_activity_class(activity: Type[CallableAsyncNoParam[ReturnType]], *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_class(activity: Type[CallableSyncNoParam[ReturnType]], *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_class(activity: Type[CallableAsyncSingleParam[ParamType, ReturnType]], arg: ParamType, *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_class(activity: Type[CallableSyncSingleParam[ParamType, ReturnType]], arg: ParamType, *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_class(activity: Type[Callable[..., Awaitable[ReturnType]]], *, args: Sequence[Any], schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_class(activity: Type[Callable[..., ReturnType]], *, args: Sequence[Any], schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ActivityHandle[ReturnType]:
(source)

Start a local activity from a callable class.

See start_local_activity for parameter and return details.

Warning

Local activities are currently experimental.

@overload
def start_local_activity_method(activity: MethodAsyncNoParam[SelfType, ReturnType], *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_method(activity: MethodSyncNoParam[SelfType, ReturnType], *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_method(activity: MethodAsyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_method(activity: MethodSyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_method(activity: Callable[Concatenate[SelfType, MultiParamSpec], Awaitable[ReturnType]], *, args: Sequence[Any], schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ActivityHandle[ReturnType]:
@overload
def start_local_activity_method(activity: Callable[Concatenate[SelfType, MultiParamSpec], ReturnType], *, args: Sequence[Any], schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None) -> ActivityHandle[ReturnType]:
(source)

Start a local activity from a method.

See start_local_activity for parameter and return details.

Warning

Local activities are currently experimental.

def time() -> float: (source)

Current seconds since the epoch from the workflow perspective.

This is the workflow equivalent of time.time.

Returns
float: Seconds since the epoch as a float.
def time_ns() -> int: (source)

Current nanoseconds since the epoch from the workflow perspective.

This is the workflow equivalent of time.time_ns.

Returns
int: Nanoseconds since the epoch.
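
As a small illustration of how the two return values relate, the sketch below splits a nanosecond timestamp into whole seconds and a remainder, and converts it to float seconds and a UTC datetime. It uses only the standard library; the timestamp is a made-up value and split_ns is a hypothetical helper, not an SDK function.

```python
from datetime import datetime, timezone
from typing import Tuple


def split_ns(ns: int) -> Tuple[int, int]:
    """Split nanoseconds since the epoch into (whole seconds, leftover ns)."""
    return divmod(ns, 1_000_000_000)


# Made-up value standing in for what a time_ns-style call could return.
ns = 1_700_000_000_123_456_789
whole_seconds, remainder_ns = split_ns(ns)

# Float seconds, the shape a time-style call returns (loses some precision).
as_float_seconds = ns / 1e9

# Timezone-aware datetime built from the whole-second part.
as_datetime = datetime.fromtimestamp(whole_seconds, tz=timezone.utc)
print(whole_seconds, remainder_ns, as_datetime.isoformat())
```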

Decorator for a workflow update handler method.

This is used on any async or non-async method that you wish to be called upon receiving an update. If a function overrides one with this decorator, it too must be decorated.

You may also define a validator method that is called before the handler this decorator is applied to. Specify the validator with @update_handler_function_name.validator.

Update methods can only have positional parameters. Best practice for non-dynamic update methods is to only take a single object/dataclass argument that can accept more fields later if needed. The handler may return a serializable value which will be sent back to the caller of the update.

Warning

This API is experimental.

Parameters
fn (Optional[CallableSyncOrAsyncType]): The function to decorate.
name (Optional[str]): Update name. Defaults to method __name__. Cannot be present when dynamic is present.
dynamic (Optional[bool]): If true, this handles all updates not otherwise handled. The parameters of the method must be self, a string name, and *args positional varargs. Cannot be present when name is present.
unfinished_policy (HandlerUnfinishedPolicy): Actions taken if a workflow terminates with a running instance of this handler.
description (Optional[str]): A short description of the update that may appear in the UI/CLI.
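
The handler/validator pairing described above can be sketched with a plain Python wrapper. This is not the SDK's implementation: sketch_update and UpdateHandlerSketch are hypothetical names, the example uses module-level functions instead of workflow methods, and it only shows the idea that the validator runs first and can reject the call by raising.

```python
from typing import Any, Callable, Optional


class UpdateHandlerSketch:
    """Hypothetical wrapper pairing a handler with an optional validator."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        self._fn = fn
        self._validator: Optional[Callable[..., None]] = None
        # Mirrors the documented default: the name falls back to __name__.
        self.name = fn.__name__

    def validator(self, vfn: Callable[..., None]) -> Callable[..., None]:
        """Register vfn to run before the handler."""
        self._validator = vfn
        return vfn

    def __call__(self, *args: Any) -> Any:
        # The validator sees the same positional arguments and may raise.
        if self._validator is not None:
            self._validator(*args)
        return self._fn(*args)


def sketch_update(fn: Callable[..., Any]) -> UpdateHandlerSketch:
    return UpdateHandlerSketch(fn)


@sketch_update
def set_limit(limit: int) -> int:
    return limit


@set_limit.validator
def check_limit(limit: int) -> None:
    if limit < 0:
        raise ValueError("limit must be non-negative")


print(set_limit(5))
```

Here a call with a negative limit fails in check_limit before set_limit runs.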

Upsert search attributes for this workflow.

Parameters
attributes (Union[temporalio.common.SearchAttributes, Sequence[temporalio.common.SearchAttributeUpdate]]): The attributes to set. This should be a sequence of updates (i.e. values created via value_set and value_unset calls on search attribute keys). The dictionary form of attributes is DEPRECATED and, if used, results in invalid key types on the typed_search_attributes property in the info.
def uuid4() -> uuid.UUID: (source)

Get a new, determinism-safe v4 UUID based on random.

Note that this UUID is not cryptographically safe and should not be used for security purposes.

Returns
uuid.UUID: A deterministically-seeded v4 UUID.
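
A minimal sketch of the idea, assuming the UUID is built from 128 bits drawn from a seeded pseudo-random generator: the same seed yields the same UUID, which is what makes the value safe to regenerate on replay. seeded_uuid4 is a hypothetical helper, and the seed value 42 is arbitrary; how the SDK actually seeds its generator is not shown here.

```python
import random
import uuid


def seeded_uuid4(rng: random.Random) -> uuid.UUID:
    """Build a v4 UUID from 128 bits drawn from the given generator."""
    # Passing version=4 makes uuid.UUID overwrite the version and variant
    # bits, so the result is a well-formed v4 UUID whatever the raw bits are.
    return uuid.UUID(bytes=rng.getrandbits(128).to_bytes(16, "big"), version=4)


# Two generators with the same seed produce the same UUID.
first = seeded_uuid4(random.Random(42))
replayed = seeded_uuid4(random.Random(42))
print(first == replayed, first.version)
```

Because the bits come from a predictable generator, the result is reproducible but not suitable for security purposes, as noted above.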
@overload
async def wait(fs: Iterable[_FT], *, timeout: Optional[float] = None, return_when: str = asyncio.ALL_COMPLETED) -> Tuple[List[_FT], List[_FT]]:
@overload
(source)

Wait for the Futures or Tasks given by fs to complete.

This is a deterministic version of asyncio.wait. This function should be used instead of that one in workflows.
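
The calling pattern can be sketched with asyncio.wait itself, used here only as a stand-in so that the example runs outside a workflow. Inside a workflow the function documented here should be used instead. Note that asyncio.wait returns two sets, whereas the signature above returns two lists. The sleeper coroutine and its delays are made up for illustration.

```python
import asyncio
from typing import List, Tuple


async def sleeper(delay: float, label: str) -> str:
    await asyncio.sleep(delay)
    return label


async def first_completed() -> Tuple[List[str], int]:
    fast = asyncio.ensure_future(sleeper(0.01, "fast"))
    slow = asyncio.ensure_future(sleeper(5.0, "slow"))

    # Return as soon as any one of the futures finishes.
    done, pending = await asyncio.wait(
        [fast, slow], return_when=asyncio.FIRST_COMPLETED
    )

    # Cancel whatever is still running so nothing is left dangling.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    return sorted(task.result() for task in done), len(pending)


print(asyncio.run(first_completed()))
```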

async def wait_condition(fn: Callable[[], bool], *, timeout: Optional[Union[timedelta, float]] = None, timeout_summary: Optional[str] = None): (source)

Wait on a callback to become true.

This function returns once the callback, which is invoked on each event loop iteration, returns true. If a timeout is set and elapses first, asyncio.TimeoutError is raised.

Parameters
fn (Callable[[], bool]): Non-async callback that accepts no parameters and returns a boolean.
timeout (Optional[Union[timedelta, float]]): Optional duration to wait, as a timedelta or a number of seconds, before throwing asyncio.TimeoutError.
timeout_summary (Optional[str]): Optional simple string identifying the timer (created if timeout is present) that may be visible in the UI/CLI. While it can be normal text, it is best to treat it as a timer ID.
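
The behaviour described above can be approximated outside a workflow with plain asyncio. The sketch below is a hypothetical stand-in named wait_until: it re-checks the callback after yielding to the event loop and relies on asyncio.wait_for for the timeout. It is not how the SDK implements wait_condition, takes the timeout as float seconds only, and omits timeout_summary.

```python
import asyncio
from typing import Callable, Optional, Tuple


async def wait_until(
    fn: Callable[[], bool], *, timeout: Optional[float] = None
) -> None:
    """Hypothetical stand-in: re-check fn once per event loop iteration."""

    async def poll() -> None:
        while not fn():
            # Yield to the loop so other tasks can change the state.
            await asyncio.sleep(0)

    # wait_for raises asyncio.TimeoutError when the timeout elapses.
    await asyncio.wait_for(poll(), timeout)


async def demo() -> Tuple[bool, bool]:
    state = {"ready": False}

    async def flip() -> None:
        await asyncio.sleep(0.01)
        state["ready"] = True

    flipper = asyncio.ensure_future(flip())
    await wait_until(lambda: state["ready"], timeout=1.0)
    await flipper

    # A condition that never becomes true runs into the timeout.
    timed_out = False
    try:
        await wait_until(lambda: False, timeout=0.05)
    except asyncio.TimeoutError:
        timed_out = True
    return state["ready"], timed_out


print(asyncio.run(demo()))
```

The callback is a plain function with no parameters, matching the fn parameter above, so any state it reads has to be captured by closure or stored on an object.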

Logger that will have contextual workflow details embedded.

Logs are skipped during replay by default.

def _assert_dynamic_handler_args(fn: Callable, arg_types: Optional[List[Type]], is_method: bool) -> bool: (source)

Undocumented

def _bind_method(obj: Any, fn: Callable[..., Any]) -> Callable[..., Any]: (source)

Undocumented

def _is_unbound_method_on_cls(fn: Callable[..., Any], cls: Type) -> bool: (source)

Undocumented

def _parameters_identical_up_to_naming(fn1: Callable, fn2: Callable) -> bool: (source)

Return True if the functions have identical parameter lists, ignoring parameter names.

def _release_waiter(waiter: asyncio.Future[Any], *args): (source)

Undocumented

def _set_current_update_info(info: UpdateInfo): (source)

Undocumented

def _update_validator(update_def: _UpdateDefinition, fn: Optional[Callable[..., None]] = None) -> Optional[Callable[..., None]]: (source)

Decorator for a workflow update validator method.

async def _wait(fs: Iterable[Union[asyncio.Future, asyncio.Task]], timeout: Optional[float], return_when: str, loop: asyncio.AbstractEventLoop) -> Tuple[List, List]: (source)

Undocumented

Undocumented

Value
TypeVar('_FT',
        bound=asyncio.Future[Any])

Undocumented

_imports_passed_through = (source)

Undocumented

_in_sandbox = (source)

Undocumented

_sandbox_unrestricted = (source)

Undocumented