class documentation

Worker to process workflows and/or activities.

Once created, workers can be run and shut down explicitly via run and shutdown. Alternatively, workers can be used in an async with clause. See __aenter__ and __aexit__ for important details about fatal errors.

Async Method __aenter__ Start the worker and return self for use by async with.
Async Method __aexit__ Same as shutdown for use by async with.
Method __init__ Create a worker to process workflows and/or activities.
Method config Config, as a dictionary, used to create this worker.
Async Method run Run the worker and wait on it to be shut down.
Async Method shutdown Initiate a worker shutdown and wait until complete.
Property is_running Whether the worker is running.
Property is_shutdown Whether the worker has run and shut down.
Property task_queue Task queue this worker is on.
Instance Variable _activity_worker Undocumented
Instance Variable _async_context_inner_task Undocumented
Instance Variable _async_context_run_exception Undocumented
Instance Variable _async_context_run_task Undocumented
Instance Variable _bridge_worker Undocumented
Instance Variable _config Undocumented
Instance Variable _shutdown_complete_event Undocumented
Instance Variable _shutdown_event Undocumented
Instance Variable _started Undocumented
Instance Variable _workflow_worker Undocumented
async def __aenter__(self) -> Worker: (source)

Start the worker and return self for use by async with.

This is a wrapper around run. Please review that method.

This takes a similar approach to asyncio.timeout in that it will cancel the current task if there is a fatal worker error and raise that error out of the context manager. However, if the inner async code swallows/wraps the asyncio.CancelledError, the exiting portion of the context manager will not raise the fatal worker error.

async def __aexit__(self, exc_type: Optional[Type[BaseException]], *args): (source)

Same as shutdown for use by async with.

Note, this will raise the worker fatal error if one occurred and the inner task cancellation was not inadvertently swallowed/wrapped.
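The cancel-and-re-raise behavior described for __aenter__ and __aexit__ can be illustrated with a standard-library sketch. SketchWorker below is a hypothetical stand-in written for this example, not the actual Worker implementation; it only mimics the documented behavior: a background failure cancels the task inside the async with block, and the failure is raised on exit only if that cancellation reaches the context manager.

```python
import asyncio
from typing import Optional


class SketchWorker:
    """Hypothetical stand-in that fails fatally after a short delay."""

    def __init__(self, fail_after: float) -> None:
        self._fail_after = fail_after
        self._run_exception: Optional[BaseException] = None
        self._inner_task: Optional[asyncio.Task] = None
        self._run_task: Optional[asyncio.Task] = None

    async def _run(self) -> None:
        await asyncio.sleep(self._fail_after)
        raise RuntimeError("fatal worker error")

    def _on_run_done(self, task: asyncio.Task) -> None:
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None and self._inner_task is not None:
            # Remember the fatal error and cancel the code inside "async with".
            self._run_exception = exc
            self._inner_task.cancel()

    async def __aenter__(self) -> "SketchWorker":
        self._inner_task = asyncio.current_task()
        self._run_task = asyncio.create_task(self._run())
        self._run_task.add_done_callback(self._on_run_done)
        return self

    async def __aexit__(self, exc_type, *args) -> None:
        if self._run_task is not None and not self._run_task.done():
            self._run_task.cancel()
        # Raise the fatal error only if the cancellation actually reached us.
        if exc_type is asyncio.CancelledError and self._run_exception is not None:
            raise self._run_exception


async def propagates() -> str:
    try:
        async with SketchWorker(fail_after=0.01):
            await asyncio.sleep(10)
    except RuntimeError as err:
        return f"raised: {err}"
    return "no error"


async def swallowed() -> str:
    try:
        async with SketchWorker(fail_after=0.01):
            try:
                await asyncio.sleep(10)
            except asyncio.CancelledError:
                pass  # swallowing the cancellation hides the fatal error
    except RuntimeError as err:
        return f"raised: {err}"
    return "no error"


if __name__ == "__main__":
    print(asyncio.run(propagates()))  # raised: fatal worker error
    print(asyncio.run(swallowed()))   # no error
```

The second function shows why code inside the async with block should let asyncio.CancelledError propagate instead of catching it.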

def __init__(self, client: temporalio.client.Client, *, task_queue: str, activities: Sequence[Callable] = [], workflows: Sequence[Type] = [], activity_executor: Optional[concurrent.futures.Executor] = None, workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None, workflow_runner: WorkflowRunner = SandboxedWorkflowRunner(), unsandboxed_workflow_runner: WorkflowRunner = UnsandboxedWorkflowRunner(), interceptors: Sequence[Interceptor] = [], build_id: Optional[str] = None, identity: Optional[str] = None, max_cached_workflows: int = 1000, max_concurrent_workflow_tasks: int = 100, max_concurrent_activities: int = 100, max_concurrent_local_activities: int = 100, max_concurrent_workflow_task_polls: int = 5, nonsticky_to_sticky_poll_ratio: float = 0.2, max_concurrent_activity_task_polls: int = 5, no_remote_activities: bool = False, sticky_queue_schedule_to_start_timeout: timedelta = timedelta(seconds=10), max_heartbeat_throttle_interval: timedelta = timedelta(seconds=60), default_heartbeat_throttle_interval: timedelta = timedelta(seconds=30), max_activities_per_second: Optional[float] = None, max_task_queue_activities_per_second: Optional[float] = None, graceful_shutdown_timeout: timedelta = timedelta(), shared_state_manager: Optional[SharedStateManager] = None, debug_mode: bool = False, disable_eager_activity_execution: bool = False, on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]] = None, use_worker_versioning: bool = False, disable_safe_workflow_eviction: bool = False): (source)

Create a worker to process workflows and/or activities.

Parameters
client: temporalio.client.Client - Client to use for this worker. This is required and must be a temporalio.client.Client instance or have a worker_service_client attribute that references the original client's underlying service client. This client cannot be "lazy".
task_queue: str - Required task queue for this worker.
activities: Sequence[Callable] - Set of activity callables decorated with @activity.defn. Activities may be async functions or non-async functions.
workflows: Sequence[Type] - Set of workflow classes decorated with @workflow.defn.
activity_executor: Optional[concurrent.futures.Executor] - Concurrent executor to use for non-async activities. This is required if any activities are non-async. concurrent.futures.ThreadPoolExecutor is recommended. If this is a concurrent.futures.ProcessPoolExecutor, all non-async activities must be picklable. max_workers on the executor should be at least max_concurrent_activities, or a warning is issued. Note that a broken-executor failure from this executor will cause the worker to fail and shut down.
workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor] - Thread pool executor for workflow tasks. If this is not present, a new concurrent.futures.ThreadPoolExecutor will be created with max_workers set to max(os.cpu_count(), 4). The default one will be properly shut down, but if one is provided, the caller is responsible for shutting it down after the worker is shut down.
workflow_runner: WorkflowRunner - Runner for workflows.
unsandboxed_workflow_runner: WorkflowRunner - Runner for workflows that opt out of sandboxing.
interceptors: Sequence[Interceptor] - Collection of interceptors for this worker. Any interceptors already on the client that also implement Interceptor are prepended to this list and should not be explicitly given here.
build_id: Optional[str] - Unique identifier for the current runtime. This is best set as a hash of all code and should change only when code does. If unset, a best-effort identifier is generated.
identity: Optional[str] - Identity for this worker client. If unset, the client identity is used.
max_cached_workflows: int - If nonzero, workflows will be cached and sticky task queues will be used.
max_concurrent_workflow_tasks: int - Maximum number of workflow tasks that will ever be given to this worker at one time.
max_concurrent_activities: int - Maximum number of activity tasks that will ever be given to this worker concurrently.
max_concurrent_local_activities: int - Maximum number of local activity tasks that will ever be given to this worker concurrently.
max_concurrent_workflow_task_polls: int - Maximum number of concurrent poll workflow task requests we will perform at a time on this worker's task queue.
nonsticky_to_sticky_poll_ratio: float - max_concurrent_workflow_task_polls multiplied by this number gives the maximum number of pollers allowed for the nonsticky queue when sticky tasks are enabled. If both defaults are used, the sticky queue will allow 4 max pollers while the nonsticky queue will allow 1. The minimum for either poller is 1, so if max_concurrent_workflow_task_polls is 1 and sticky queues are enabled, there will be 2 concurrent polls.
max_concurrent_activity_task_polls: int - Maximum number of concurrent poll activity task requests we will perform at a time on this worker's task queue.
no_remote_activities: bool - If true, this worker will only handle workflow tasks and local activities; it will not poll for activity tasks.
sticky_queue_schedule_to_start_timeout: timedelta - How long a workflow task is allowed to sit on the sticky queue before it is timed out and moved to the non-sticky queue where it may be picked up by any worker.
max_heartbeat_throttle_interval: timedelta - Longest interval for throttling activity heartbeats.
default_heartbeat_throttle_interval: timedelta - Default interval for throttling activity heartbeats when the per-activity heartbeat timeout is unset. Otherwise, the interval is the per-activity heartbeat timeout * 0.8.
max_activities_per_second: Optional[float] - Limits the number of activities per second that this worker will process. The worker will not poll for new activities if by doing so it might receive and execute an activity which would cause it to exceed this limit.
max_task_queue_activities_per_second: Optional[float] - Sets the maximum number of activities per second the task queue will dispatch, controlled server-side. Note that this only takes effect upon an activity poll request. If multiple workers on the same queue have different values set, they will thrash with the last poller winning.
graceful_shutdown_timeout: timedelta - Amount of time after shutdown is called that activities are given to complete before their tasks are cancelled.
shared_state_manager: Optional[SharedStateManager] - Used for obtaining cross-process friendly synchronization primitives. This is required for non-async activities where the activity_executor is not a concurrent.futures.ThreadPoolExecutor. Reuse of these across workers is encouraged.
debug_mode: bool - If true, will disable deadlock detection and may disable sandboxing in order to make using a debugger easier. If false but the environment variable TEMPORAL_DEBUG is truthy, this will be set to true.
disable_eager_activity_execution: bool - If true, will disable eager activity execution. Eager activity execution is an optimization on some servers that sends activities back to the same worker as the calling workflow if they can run there. This setting is experimental and may be removed in a future release.
on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]] - An async function that can handle a failure before the worker shutdown commences. This cannot stop the shutdown and any exception raised is logged and ignored.
use_worker_versioning: bool - If true, the build_id argument must be specified, and this worker opts into the worker versioning feature. This ensures it only receives workflow tasks for workflows which it claims to be compatible with. For more information, see https://docs.temporal.io/workers#worker-versioning.
disable_safe_workflow_eviction: bool - If true, instead of letting the workflow collect its tasks properly, the worker will simply let the Python garbage collector collect the tasks. WARNING: Users should not set this value to true. The garbage collector will throw GeneratorExit in coroutines, causing them to wake up in different threads and run finally blocks and other code in the wrong workflow environment.
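The arithmetic behind nonsticky_to_sticky_poll_ratio and the heartbeat throttle intervals can be checked with a small standard-library sketch. The helper names below are hypothetical and are not part of the worker API. Rounding the poller product down, and capping the default interval at the maximum, are assumptions; the documentation states only the resulting values for the defaults, the minimum of 1, and the 0.8 factor.

```python
import math
from datetime import timedelta
from typing import Optional, Tuple


def split_workflow_pollers(
    max_polls: int = 5, nonsticky_ratio: float = 0.2
) -> Tuple[int, int]:
    """Return (sticky, nonsticky) poller limits when sticky queues are enabled."""
    # Assumption: the product is rounded down; each queue gets at least 1 poller.
    nonsticky = max(1, math.floor(max_polls * nonsticky_ratio))
    sticky = max(1, max_polls - nonsticky)
    return sticky, nonsticky


def heartbeat_throttle_interval(
    heartbeat_timeout: Optional[timedelta],
    default_interval: timedelta = timedelta(seconds=30),
    max_interval: timedelta = timedelta(seconds=60),
) -> timedelta:
    """Return the throttle interval for one activity's heartbeats."""
    if heartbeat_timeout is None:
        # No per-activity heartbeat timeout: fall back to the default interval.
        interval = default_interval
    else:
        interval = heartbeat_timeout * 0.8
    # Never throttle for longer than the configured maximum.
    return min(interval, max_interval)


if __name__ == "__main__":
    print(split_workflow_pollers())        # (4, 1) with both defaults
    print(split_workflow_pollers(1, 0.2))  # (1, 1), i.e. 2 concurrent polls
    print(heartbeat_throttle_interval(timedelta(seconds=10)))   # 0:00:08
    print(heartbeat_throttle_interval(None))                    # 0:00:30
    print(heartbeat_throttle_interval(timedelta(seconds=120)))  # 0:01:00
```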
def config(self) -> WorkerConfig: (source)

Config, as a dictionary, used to create this worker.

Returns
WorkerConfigConfiguration, shallow-copied.
async def run(self): (source)

Run the worker and wait on it to be shut down.

This will not return until shutdown is complete. This means that all activities have completed, including any that were told to cancel once the graceful shutdown timeout elapsed.

This method will raise if there is a worker fatal error. While shutdown does not need to be invoked in this case, it is harmless to do so. Otherwise, to shut down this worker, invoke shutdown.

Technically, this worker can be shut down by cancelling this async function while it is running. However, a cancel could also interrupt the shutdown process itself, so users are encouraged to call shutdown explicitly instead.

async def shutdown(self): (source)

Initiate a worker shutdown and wait until complete.

This can be called before the worker has even started and is safe for repeated invocations. It simply sets a marker informing the worker to shut down as it runs.

This will not return until the worker has completed shutting down.
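The interplay of run, shutdown, is_running, and is_shutdown can be modeled with two asyncio events. ToyWorker below is a hypothetical stand-in for illustration only, not the actual Worker implementation; it assumes that shutdown just sets a marker and then waits for run to finish draining.

```python
import asyncio
from typing import List, Tuple


class ToyWorker:
    """Hypothetical model of the run/shutdown handshake."""

    def __init__(self) -> None:
        self._shutdown_event = asyncio.Event()
        self._shutdown_complete_event = asyncio.Event()
        self._started = False

    @property
    def is_running(self) -> bool:
        # True only between start and completed shutdown.
        return self._started and not self.is_shutdown

    @property
    def is_shutdown(self) -> bool:
        return self._shutdown_complete_event.is_set()

    async def run(self) -> None:
        if self._started:
            raise RuntimeError("Already started")
        self._started = True
        await self._shutdown_event.wait()
        await asyncio.sleep(0.01)  # stand-in for draining in-flight work
        self._shutdown_complete_event.set()

    async def shutdown(self) -> None:
        # Only sets a marker, so repeated calls are harmless.
        self._shutdown_event.set()
        await self._shutdown_complete_event.wait()


async def demo() -> List[Tuple[bool, bool]]:
    worker = ToyWorker()
    states = [(worker.is_running, worker.is_shutdown)]
    run_task = asyncio.create_task(worker.run())
    await asyncio.sleep(0)  # give run() a chance to start
    states.append((worker.is_running, worker.is_shutdown))
    await worker.shutdown()
    await worker.shutdown()  # second call returns immediately
    await run_task
    states.append((worker.is_running, worker.is_shutdown))
    return states


if __name__ == "__main__":
    # Each tuple is (is_running, is_shutdown).
    print(asyncio.run(demo()))  # [(False, False), (True, False), (False, True)]
```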

Property is_running

Whether the worker is running.

This is only True if the worker has been started and not yet shut down.

Property is_shutdown

Whether the worker has run and shut down.

This is only True if the worker was once started and then shut down. This is not necessarily True immediately after shutdown is first called, because the shutdown process can take some time.

Property task_queue

Task queue this worker is on.

_activity_worker = (source)

Undocumented

_async_context_inner_task = (source)

Undocumented

_async_context_run_exception: Optional[BaseException] = (source)

Undocumented

_async_context_run_task = (source)

Undocumented

_bridge_worker = (source)

Undocumented

_config = (source)

Undocumented

_shutdown_complete_event = (source)

Undocumented

_shutdown_event = (source)

Undocumented

_started: bool = (source)

Undocumented

_workflow_worker = (source)

Undocumented