class documentation

class Worker:

Worker to process workflows and/or activities.

Once created, a worker can be run and shut down explicitly via run and shutdown, or it can be used in an async with block.

Async Method __aenter__ Start the worker and return self for use by async with.
Async Method __aexit__ Same as shutdown for use by async with.
Method __init__ Create a worker to process workflows and/or activities.
Method config Config, as a dictionary, used to create this worker.
Async Method run Run the worker and wait for it to be shut down.
Async Method shutdown Shut down the worker and wait until all activities have completed.
Property task_queue Task queue this worker is on.
Method _start Undocumented
Instance Variable _activity_worker Undocumented
Instance Variable _bridge_worker Undocumented
Instance Variable _config Undocumented
Instance Variable _task Undocumented
Instance Variable _workflow_worker Undocumented
async def __aenter__(self) -> Worker:

Start the worker and return self for use by async with.

Returns
Worker: Self.
async def __aexit__(self, *args):

Same as shutdown for use by async with.
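
The async with contract described for __aenter__ and __aexit__ can be illustrated with a small stand-in. FakeWorker below is hypothetical and uses only asyncio; it mimics the documented lifecycle (start on entry, return self, shut down on exit) and is not the SDK's implementation.

```python
import asyncio
from typing import List, Optional


class FakeWorker:
    """Hypothetical stand-in that mimics only the documented lifecycle."""

    def __init__(self) -> None:
        self._stop = asyncio.Event()
        self._task: Optional[asyncio.Task] = None
        self.events: List[str] = []

    async def run(self) -> None:
        # Run until shutdown() is called.
        self.events.append("running")
        await self._stop.wait()
        self.events.append("stopped")

    async def shutdown(self) -> None:
        # Signal the run loop, then wait for it to finish if it was started.
        self._stop.set()
        if self._task is not None:
            await self._task

    async def __aenter__(self) -> "FakeWorker":
        # Start run() in the background and hand back the worker itself.
        self._task = asyncio.create_task(self.run())
        return self

    async def __aexit__(self, *args: object) -> None:
        # Same as shutdown().
        await self.shutdown()


async def demo() -> List[str]:
    async with FakeWorker() as worker:
        await asyncio.sleep(0)  # yield once so the background task starts
        worker.events.append("work")
    return worker.events
```

Calling asyncio.run(demo()) exercises the whole cycle: the body of the async with block runs while the stand-in is "running", and leaving the block waits for it to stop.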

def __init__(
    self,
    client: temporalio.client.Client,
    *,
    task_queue: str,
    activities: Sequence[Callable] = [],
    workflows: Sequence[Type] = [],
    activity_executor: Optional[concurrent.futures.Executor] = None,
    workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None,
    workflow_runner: WorkflowRunner = UnsandboxedWorkflowRunner(),
    interceptors: Sequence[Interceptor] = [],
    build_id: Optional[str] = None,
    identity: Optional[str] = None,
    max_cached_workflows: int = 1000,
    max_concurrent_workflow_tasks: int = 100,
    max_concurrent_activities: int = 100,
    max_concurrent_local_activities: int = 100,
    max_concurrent_workflow_task_polls: int = 5,
    nonsticky_to_sticky_poll_ratio: float = 0.2,
    max_concurrent_activity_task_polls: int = 5,
    no_remote_activities: bool = False,
    sticky_queue_schedule_to_start_timeout: timedelta = timedelta(seconds=10),
    max_heartbeat_throttle_interval: timedelta = timedelta(seconds=60),
    default_heartbeat_throttle_interval: timedelta = timedelta(seconds=30),
    max_activities_per_second: Optional[float] = None,
    max_task_queue_activities_per_second: Optional[float] = None,
    graceful_shutdown_timeout: timedelta = timedelta(),
    shared_state_manager: Optional[SharedStateManager] = None,
    debug_mode: bool = False,
):

Create a worker to process workflows and/or activities.

Parameters
client (temporalio.client.Client): Client to use for this worker. This is required and must be the temporalio.client.Client instance or have a worker_service_client attribute with a reference to the original client's underlying service client. This client cannot be "lazy".
task_queue (str): Required task queue for this worker.
activities (Sequence[Callable]): Set of activity callables decorated with @activity.defn. Activities may be async functions or non-async functions.
workflows (Sequence[Type]): Set of workflow classes decorated with @workflow.defn.
activity_executor (Optional[concurrent.futures.Executor]): Concurrent executor to use for non-async activities. This is required if any activities are non-async. If this is a concurrent.futures.ProcessPoolExecutor, all non-async activities must be picklable.
workflow_task_executor (Optional[concurrent.futures.ThreadPoolExecutor]): Thread pool executor for workflow tasks. If not provided, a new concurrent.futures.ThreadPoolExecutor will be created with max_workers set to max(os.cpu_count(), 4). The default executor will be shut down properly, but if one is provided, the caller is responsible for shutting it down after the worker is shut down.
workflow_runner (WorkflowRunner): Runner for workflows.
interceptors (Sequence[Interceptor]): Collection of interceptors for this worker. Any interceptors already on the client that also implement Interceptor are prepended to this list and should not be explicitly given here.
build_id (Optional[str]): Unique identifier for the current runtime. This is best set as a hash of all code and should change only when code does. If unset, a best-effort identifier is generated.
identity (Optional[str]): Identity for this worker client. If unset, the client identity is used.
max_cached_workflows (int): If nonzero, workflows will be cached and sticky task queues will be used.
max_concurrent_workflow_tasks (int): Maximum number of workflow tasks that will ever be given to this worker at one time.
max_concurrent_activities (int): Maximum number of activity tasks that will ever be given to this worker concurrently.
max_concurrent_local_activities (int): Maximum number of local activity tasks that will ever be given to this worker concurrently.
max_concurrent_workflow_task_polls (int): Maximum number of concurrent poll workflow task requests performed at a time on this worker's task queue.
nonsticky_to_sticky_poll_ratio (float): Multiplied by max_concurrent_workflow_task_polls, this gives the maximum number of pollers allowed for the nonsticky queue when sticky tasks are enabled. If both defaults are used, the sticky queue will allow 4 pollers while the nonsticky queue will allow 1. The minimum for either poller is 1, so if max_concurrent_workflow_task_polls is 1 and sticky queues are enabled, there will be 2 concurrent polls.
max_concurrent_activity_task_polls (int): Maximum number of concurrent poll activity task requests performed at a time on this worker's task queue.
no_remote_activities (bool): If true, this worker will only handle workflow tasks and local activities; it will not poll for activity tasks.
sticky_queue_schedule_to_start_timeout (timedelta): How long a workflow task is allowed to sit on the sticky queue before it is timed out and moved to the non-sticky queue, where it may be picked up by any worker.
max_heartbeat_throttle_interval (timedelta): Longest interval for throttling activity heartbeats.
default_heartbeat_throttle_interval (timedelta): Default interval for throttling activity heartbeats when the per-activity heartbeat timeout is unset. Otherwise, the interval is the per-activity heartbeat timeout * 0.8.
max_activities_per_second (Optional[float]): Limits the number of activities per second that this worker will process. The worker will not poll for new activities if by doing so it might receive and execute an activity which would cause it to exceed this limit.
max_task_queue_activities_per_second (Optional[float]): Sets the maximum number of activities per second the task queue will dispatch, controlled server-side. Note that this only takes effect upon an activity poll request. If multiple workers on the same queue have different values set, they will thrash, with the last poller winning.
graceful_shutdown_timeout (timedelta): Amount of time after shutdown is called that activities are given to complete before their tasks are cancelled.
shared_state_manager (Optional[SharedStateManager]): Used for obtaining cross-process friendly synchronization primitives. This is required for non-async activities where the activity_executor is not a concurrent.futures.ThreadPoolExecutor. Reuse of these across workers is encouraged.
debug_mode (bool): If true, will disable deadlock detection and may disable sandboxing in order to make using a debugger easier. If false but the environment variable TEMPORAL_DEBUG is truthy, this will be set to true.
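
The arithmetic described for nonsticky_to_sticky_poll_ratio and the heartbeat throttle intervals can be worked through in a short sketch. Both helper functions below are hypothetical and are not part of the SDK. The rounding of the poller product and the use of max_heartbeat_throttle_interval as an upper cap are assumptions, since the descriptions above do not spell them out.

```python
from datetime import timedelta
from typing import Optional, Tuple


def split_workflow_pollers(max_polls: int, nonsticky_ratio: float) -> Tuple[int, int]:
    """Return (sticky, nonsticky) poller limits when sticky queues are enabled."""
    # Assumption: the product is truncated; the documentation does not state the rounding.
    nonsticky = max(1, int(max_polls * nonsticky_ratio))
    # Each queue gets at least one poller, per the documented minimum.
    sticky = max(1, max_polls - nonsticky)
    return sticky, nonsticky


def heartbeat_throttle(
    heartbeat_timeout: Optional[timedelta],
    default_interval: timedelta = timedelta(seconds=30),
    max_interval: timedelta = timedelta(seconds=60),
) -> timedelta:
    """Return the throttle interval for an activity's heartbeats."""
    if heartbeat_timeout is None:
        interval = default_interval
    else:
        interval = heartbeat_timeout * 0.8
    # Assumption: the maximum acts as an upper bound on the computed interval.
    return min(interval, max_interval)
```

With the documented defaults, split_workflow_pollers(5, 0.2) gives 4 sticky pollers and 1 nonsticky poller, and split_workflow_pollers(1, 0.2) gives 1 of each, matching the "2 concurrent polls" case described above.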
def config(self) -> WorkerConfig:

Config, as a dictionary, used to create this worker.

Returns
WorkerConfig: Configuration, shallow-copied.
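
What "shallow-copied" implies for the caller can be shown with a plain dictionary. The dictionary below is a hypothetical stand-in with made-up values, not an actual WorkerConfig; it only demonstrates shallow-copy semantics in Python.

```python
import copy

# Hypothetical stand-in for a worker's stored configuration.
original = {"task_queue": "orders", "activities": ["charge_card"]}

# A shallow copy duplicates the top-level mapping only.
snapshot = copy.copy(original)

# Rebinding a top-level key on the copy leaves the original untouched.
snapshot["task_queue"] = "orders-v2"

# Nested objects are shared, so an in-place mutation is visible through both.
snapshot["activities"].append("send_receipt")
```

The practical consequence is that replacing keys on the returned dictionary is safe, while mutating a nested list or object in place may also affect whatever the worker holds.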
async def run(self):

Run the worker and wait for it to be shut down.

async def shutdown(self):

Shut down the worker and wait until all activities have completed.

This will initiate a shutdown and optionally wait for a grace period before sending cancels to all activities.

This worker should not be used in any way once this is called.
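
The "grace period, then cancel" behavior can be sketched with plain asyncio tasks. This is an illustration of the general pattern only: shutdown_with_grace is a hypothetical helper, and the way the SDK actually delivers cancellation to activities may differ.

```python
import asyncio
from typing import Iterable, List


async def shutdown_with_grace(tasks: Iterable[asyncio.Task], grace_seconds: float) -> List[str]:
    """Wait up to grace_seconds for tasks, cancel the rest, report outcomes."""
    tasks = list(tasks)
    if tasks:
        # Give running tasks the grace period to finish on their own.
        _done, pending = await asyncio.wait(tasks, timeout=grace_seconds)
        # Anything still running after the grace period is cancelled.
        for task in pending:
            task.cancel()
        # Wait for the cancellations to be processed before returning.
        await asyncio.gather(*pending, return_exceptions=True)
    return ["cancelled" if t.cancelled() else "completed" for t in tasks]


async def demo() -> List[str]:
    quick = asyncio.create_task(asyncio.sleep(0.01))  # finishes within the grace period
    slow = asyncio.create_task(asyncio.sleep(10))     # outlives the grace period
    return await shutdown_with_grace([quick, slow], grace_seconds=0.1)
```

A grace period of zero, which is the documented default for graceful_shutdown_timeout, corresponds here to cancelling everything that has not already finished.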

@property
task_queue: str

Task queue this worker is on.

def _start(self) -> asyncio.Task:

Undocumented

_activity_worker

Undocumented

_bridge_worker

Undocumented

_config

Undocumented

_task

Undocumented

_workflow_worker

Undocumented