class documentation

Defines a start-workflow operation used by update-with-start requests.

Update-With-Start lets you send an update to a workflow, starting the workflow first if necessary.

Warning

This API is experimental

Method: __init__ — Create a WithStartWorkflowOperation.
Async Method: workflow_handle — Wait until the workflow is running and return a WorkflowHandle.
Instance Variable: _start_workflow_input — Undocumented
Instance Variable: _used — Undocumented
Instance Variable: _workflow_handle — Undocumented
@overload
def __init__(self, workflow: MethodAsyncNoParam[SelfType, ReturnType], *, id: str, task_queue: str, id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy, execution_timeout: timedelta | None = None, run_timeout: timedelta | None = None, task_timeout: timedelta | None = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy: temporalio.common.RetryPolicy | None = None, cron_schedule: str = '', memo: Mapping[str, Any] | None = None, search_attributes: temporalio.common.TypedSearchAttributes | temporalio.common.SearchAttributes | None = None, static_summary: str | None = None, static_details: str | None = None, start_delay: timedelta | None = None, rpc_metadata: Mapping[str, str] = {}, rpc_timeout: timedelta | None = None):
@overload
def __init__(self, workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType, *, id: str, task_queue: str, id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy, execution_timeout: timedelta | None = None, run_timeout: timedelta | None = None, task_timeout: timedelta | None = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy: temporalio.common.RetryPolicy | None = None, cron_schedule: str = '', memo: Mapping[str, Any] | None = None, search_attributes: temporalio.common.TypedSearchAttributes | temporalio.common.SearchAttributes | None = None, static_summary: str | None = None, static_details: str | None = None, start_delay: timedelta | None = None, rpc_metadata: Mapping[str, str] = {}, rpc_timeout: timedelta | None = None):
@overload
def __init__(self, workflow: Callable[Concatenate[SelfType, MultiParamSpec], Awaitable[ReturnType]], *, args: Sequence[Any], id: str, task_queue: str, id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy, execution_timeout: timedelta | None = None, run_timeout: timedelta | None = None, task_timeout: timedelta | None = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy: temporalio.common.RetryPolicy | None = None, cron_schedule: str = '', memo: Mapping[str, Any] | None = None, search_attributes: temporalio.common.TypedSearchAttributes | temporalio.common.SearchAttributes | None = None, static_summary: str | None = None, static_details: str | None = None, start_delay: timedelta | None = None, rpc_metadata: Mapping[str, str] = {}, rpc_timeout: timedelta | None = None):
@overload
def __init__(self, workflow: str, arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], id: str, task_queue: str, id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy, result_type: type | None = None, execution_timeout: timedelta | None = None, run_timeout: timedelta | None = None, task_timeout: timedelta | None = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, retry_policy: temporalio.common.RetryPolicy | None = None, cron_schedule: str = '', memo: Mapping[str, Any] | None = None, search_attributes: temporalio.common.TypedSearchAttributes | temporalio.common.SearchAttributes | None = None, static_summary: str | None = None, static_details: str | None = None, start_delay: timedelta | None = None, rpc_metadata: Mapping[str, str] = {}, rpc_timeout: timedelta | None = None):
(source)

Create a WithStartWorkflowOperation.

Warning

This API is experimental

See temporalio.client.Client.start_workflow for documentation of the arguments.

async def workflow_handle(self) -> WorkflowHandle[SelfType, ReturnType]: (source)

Wait until the workflow is running and return a WorkflowHandle.

Warning

This API is experimental

_start_workflow_input = (source)

Undocumented

_used = (source)

Undocumented

_workflow_handle = (source)

Undocumented