class Worker:
Constructor: Worker(client, task_queue, activities, workflows, ...)
Worker to process workflows and/or activities.
Once created, workers can be run and shut down explicitly via run and shutdown. Alternatively, workers can be used in an async with clause. See __aenter__ and __aexit__ for important details about fatal errors.
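As a rough illustration of the async with form described above, the sketch below registers a single activity and keeps the worker alive for a fixed period. The server address "localhost:7233", the task queue name, and the say_hello activity are assumptions made for the example and do not come from this reference. The SDK imports are placed inside main() only so the snippet can be loaded without a server or the SDK present.

```python
import asyncio


async def main() -> None:
    # Imported lazily so this module can be loaded without the SDK installed.
    from temporalio import activity
    from temporalio.client import Client
    from temporalio.worker import Worker

    @activity.defn
    async def say_hello(name: str) -> str:
        # Hypothetical activity, present only so the worker has something to register.
        return f"Hello, {name}!"

    # Assumed server address; adjust for the actual deployment.
    client = await Client.connect("localhost:7233")

    # Entering the block starts the worker; leaving it shuts the worker down.
    async with Worker(
        client,
        task_queue="example-task-queue",  # hypothetical queue name
        activities=[say_hello],
    ):
        # Placeholder for real application work while the worker polls.
        await asyncio.sleep(60)


# To try it against a running server: asyncio.run(main())
```

The same worker could instead be driven with explicit run and shutdown calls; the async with form mainly saves the caller from pairing those calls by hand.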
Async Method | __aenter__ | Start the worker and return self for use by async with. |
Async Method | __aexit__ | Same as shutdown for use by async with. |
Method | __init__ | Create a worker to process workflows and/or activities. |
Method | client | Update the client associated with the worker. |
Method | config | Config, as a dictionary, used to create this worker. |
Async Method | run | Run the worker and wait on it to be shut down. |
Async Method | shutdown | Initiate a worker shutdown and wait until complete. |
Property | client | Client currently set on the worker. |
Property | is_running | Whether the worker is running. |
Property | is_shutdown | Whether the worker has run and shut down. |
Property | task_queue | Task queue this worker is on. |
Instance Variable | _activity | Undocumented |
Instance Variable | _async | Undocumented |
Instance Variable | _async | Undocumented |
Instance Variable | _async | Undocumented |
Instance Variable | _bridge | Undocumented |
Instance Variable | _config | Undocumented |
Instance Variable | _runtime | Undocumented |
Instance Variable | _shutdown | Undocumented |
Instance Variable | _shutdown | Undocumented |
Instance Variable | _started | Undocumented |
Instance Variable | _workflow | Undocumented |
Start the worker and return self for use by async with.
This is a wrapper around run. Please review that method.
This takes a similar approach to asyncio.timeout in that it will cancel the current task if there is a fatal worker error and raise that error out of the context manager. However, if the inner async code swallows/wraps the asyncio.CancelledError, the exiting portion of the context manager will not raise the fatal worker error.
Same as shutdown for use by async with.
Note, this will raise the worker fatal error if one occurred and the inner task cancellation was not inadvertently swallowed/wrapped.
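The cancel-then-raise behavior described above can be sketched with plain asyncio. This is a toy model only, not the worker's implementation: CancelOnFatal and FatalError are hypothetical stand-ins, and the "fatal error" is simulated by a timer.

```python
import asyncio
from typing import Optional


class FatalError(Exception):
    """Hypothetical stand-in for a fatal worker error."""


class CancelOnFatal:
    """Toy context manager that cancels the enclosing task when its background job fails."""

    def __init__(self, fail_after: float) -> None:
        self._fail_after = fail_after
        self._error: Optional[FatalError] = None

    async def __aenter__(self) -> "CancelOnFatal":
        self._outer = asyncio.current_task()
        self._job = asyncio.create_task(self._run())
        return self

    async def _run(self) -> None:
        # Simulate a fatal error after a delay, then cancel the task inside the block.
        await asyncio.sleep(self._fail_after)
        self._error = FatalError("worker failed")
        self._outer.cancel()

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        self._job.cancel()
        await asyncio.gather(self._job, return_exceptions=True)
        # The stored error is surfaced only if the cancellation reached this point.
        cancelled = exc_type is not None and issubclass(exc_type, asyncio.CancelledError)
        if cancelled and self._error is not None:
            raise self._error from None
        return False


async def demo(swallow: bool) -> str:
    try:
        async with CancelOnFatal(fail_after=0.01):
            try:
                await asyncio.sleep(1)
            except asyncio.CancelledError:
                if not swallow:
                    raise
        return "no error surfaced"
    except FatalError as err:
        return f"raised: {err}"


print(asyncio.run(demo(swallow=False)))  # raised: worker failed
print(asyncio.run(demo(swallow=True)))   # no error surfaced
```

In the second call the inner code swallows the asyncio.CancelledError, so the exit path never sees a cancellation and the error goes unreported, which is the pitfall the note above warns about.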
def __init__(
    self,
    client: temporalio.client.Client,
    *,
    task_queue: str,
    activities: Sequence[Callable] = [],
    workflows: Sequence[Type] = [],
    activity_executor: Optional[concurrent.futures.Executor] = None,
    workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None,
    workflow_runner: WorkflowRunner = SandboxedWorkflowRunner(),
    unsandboxed_workflow_runner: WorkflowRunner = UnsandboxedWorkflowRunner(),
    interceptors: Sequence[Interceptor] = [],
    build_id: Optional[str] = None,
    identity: Optional[str] = None,
    max_cached_workflows: int = 1000,
    max_concurrent_workflow_tasks: Optional[int] = None,
    max_concurrent_activities: Optional[int] = None,
    max_concurrent_local_activities: Optional[int] = None,
    tuner: Optional[WorkerTuner] = None,
    max_concurrent_workflow_task_polls: int = 5,
    nonsticky_to_sticky_poll_ratio: float = 0.2,
    max_concurrent_activity_task_polls: int = 5,
    no_remote_activities: bool = False,
    sticky_queue_schedule_to_start_timeout: timedelta = timedelta(...),
    max_heartbeat_throttle_interval: timedelta = timedelta(...),
    default_heartbeat_throttle_interval: timedelta = timedelta(...),
    max_activities_per_second: Optional[float] = None,
    max_task_queue_activities_per_second: Optional[float] = None,
    graceful_shutdown_timeout: timedelta = timedelta(),
    workflow_failure_exception_types: Sequence[Type[BaseException]] = [],
    shared_state_manager: Optional[SharedStateManager] = None,
    debug_mode: bool = False,
    disable_eager_activity_execution: bool = False,
    on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]] = None,
    use_worker_versioning: bool = False,
    disable_safe_workflow_eviction: bool = False,
):
Create a worker to process workflows and/or activities.
Parameters | |
client: temporalio.client.Client | Client to use for this worker. This is required and must be the temporalio.client.Client instance or have a worker_service_client attribute with reference to the original client's underlying service client. This client cannot be "lazy". |
task_queue: str | Required task queue for this worker. |
activities: Sequence[Callable] | Set of activity callables decorated with @activity.defn. Activities may be async functions or non-async functions. |
workflows: Sequence[Type] | Set of workflow classes decorated with @workflow.defn. |
activity_executor: Optional[concurrent.futures.Executor] | Concurrent executor to use for non-async activities. This is required if any activities are non-async. concurrent.futures.ThreadPoolExecutor is recommended. If this is a concurrent.futures.ProcessPoolExecutor, all non-async activities must be picklable. max_workers on the executor should be at least max_concurrent_activities or a warning is issued. Note, a broken-executor failure from this executor will cause the worker to fail and shut down. |
workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor] | Thread pool executor for workflow tasks. If this is not present, a new concurrent.futures.ThreadPoolExecutor will be created with max_workers set to max(os.cpu_count(), 4). The default one will be properly shut down, but if one is provided, the caller is responsible for shutting it down after the worker is shut down. |
workflow_runner: WorkflowRunner | Runner for workflows. |
unsandboxed_workflow_runner: WorkflowRunner | Runner for workflows that opt out of sandboxing. |
interceptors: Sequence[Interceptor] | Collection of interceptors for this worker. Any interceptors already on the client that also implement Interceptor are prepended to this list and should not be explicitly given here. |
build_id: Optional[str] | Unique identifier for the current runtime. This is best set as a hash of all code and should change only when code does. If unset, a best-effort identifier is generated. |
identity: Optional[str] | Identity for this worker client. If unset, the client identity is used. |
max_cached_workflows: int | If nonzero, workflows will be cached and sticky task queues will be used. |
max_concurrent_workflow_tasks: Optional[int] | Maximum allowed number of workflow tasks that will ever be given to this worker at one time. Mutually exclusive with tuner. |
max_concurrent_activities: Optional[int] | Maximum number of activity tasks that will ever be given to this worker concurrently. Mutually exclusive with tuner. |
max_concurrent_local_activities: Optional[int] | Maximum number of local activity tasks that will ever be given to this worker concurrently. Mutually exclusive with tuner. |
tuner: Optional[WorkerTuner] | Provide a custom WorkerTuner. Mutually exclusive with max_concurrent_workflow_tasks, max_concurrent_activities, and max_concurrent_local_activities. WARNING: This argument is experimental. |
max_concurrent_workflow_task_polls: int | Maximum number of concurrent poll workflow task requests we will perform at a time on this worker's task queue. |
nonsticky_to_sticky_poll_ratio: float | max_concurrent_workflow_task_polls multiplied by this number gives the maximum number of pollers allowed for the nonsticky queue when sticky tasks are enabled. If both defaults are used, the sticky queue will allow at most 4 pollers while the nonsticky queue will allow 1. The minimum for either poller is 1, so if max_concurrent_workflow_task_polls is 1 and sticky queues are enabled, there will be 2 concurrent polls. |
max_concurrent_activity_task_polls: int | Maximum number of concurrent poll activity task requests we will perform at a time on this worker's task queue. |
no_remote_activities: bool | If true, this worker will only handle workflow tasks and local activities; it will not poll for activity tasks. |
sticky_queue_schedule_to_start_timeout: timedelta | How long a workflow task is allowed to sit on the sticky queue before it is timed out and moved to the non-sticky queue where it may be picked up by any worker. |
max_heartbeat_throttle_interval: timedelta | Longest interval for throttling activity heartbeats. |
default_heartbeat_throttle_interval: timedelta | Default interval for throttling activity heartbeats in case per-activity heartbeat timeout is unset. Otherwise, it's the per-activity heartbeat timeout * 0.8. |
max_activities_per_second: Optional[float] | Limits the number of activities per second that this worker will process. The worker will not poll for new activities if by doing so it might receive and execute an activity which would cause it to exceed this limit. |
max_task_queue_activities_per_second: Optional[float] | Sets the maximum number of activities per second the task queue will dispatch, controlled server-side. Note that this only takes effect upon an activity poll request. If multiple workers on the same queue have different values set, they will thrash with the last poller winning. |
graceful_shutdown_timeout: timedelta | Amount of time after shutdown is called that activities are given to complete before their tasks are cancelled. |
workflow_failure_exception_types: Sequence[Type[BaseException]] | The types of exceptions that, if a workflow-thrown exception extends, will cause the workflow/update to fail instead of suspending the workflow via task failure. These are applied in addition to ones set on the workflow.defn decorator. If Exception is set, it effectively will fail a workflow/update in all user exception cases. WARNING: This setting is experimental. |
shared_state_manager: Optional[SharedStateManager] | Used for obtaining cross-process friendly synchronization primitives. This is required for non-async activities where the activity_executor is not a concurrent.futures.ThreadPoolExecutor. Reuse of these across workers is encouraged. |
debug_mode: bool | If true, will disable deadlock detection and may disable sandboxing in order to make using a debugger easier. If false but the environment variable TEMPORAL_DEBUG is truthy, this will be set to true. |
disable_eager_activity_execution: bool | If true, will disable eager activity execution. Eager activity execution is an optimization on some servers that sends activities back to the same worker as the calling workflow if they can run there. This setting is experimental and may be removed in a future release. |
on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]] | An async function that can handle a failure before the worker shutdown commences. This cannot stop the shutdown and any exception raised is logged and ignored. |
use_worker_versioning: bool | If true, the build_id argument must be specified, and this worker opts into the worker versioning feature. This ensures it only receives workflow tasks for workflows which it claims to be compatible with. For more information, see https://docs.temporal.io/workers#worker-versioning. |
disable_safe_workflow_eviction: bool | If true, instead of letting the workflow collect its tasks properly, the worker will simply let the Python garbage collector collect the tasks. WARNING: Users should not set this value to true. The garbage collector will throw GeneratorExit in coroutines causing them to wake up in different threads and run finally and other code in the wrong workflow environment. |
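Two of the parameter descriptions above involve small calculations: the split of workflow pollers between the sticky and nonsticky queues, and the heartbeat throttle interval. The sketch below works through that arithmetic. The helper names are hypothetical; rounding down with int() and applying the longest interval as a cap with min() are assumptions about details the descriptions leave open; and the 30 and 60 second values are illustrative rather than taken from this page.

```python
from datetime import timedelta
from typing import Optional, Tuple


def split_workflow_pollers(max_polls: int, nonsticky_ratio: float) -> Tuple[int, int]:
    """Return (sticky, nonsticky) poller counts, each at least 1."""
    # Assumption: the product is rounded down before the minimum of 1 is applied.
    nonsticky = max(1, int(max_polls * nonsticky_ratio))
    sticky = max(1, max_polls - nonsticky)
    return sticky, nonsticky


def heartbeat_throttle(
    heartbeat_timeout: Optional[timedelta],
    default_interval: timedelta,
    max_interval: timedelta,
) -> timedelta:
    """Return the throttle interval for one activity."""
    if heartbeat_timeout is None:
        interval = default_interval
    else:
        interval = heartbeat_timeout * 0.8
    # Assumption: the longest allowed interval acts as an upper bound.
    return min(interval, max_interval)


print(split_workflow_pollers(5, 0.2))  # (4, 1), the documented default split
print(split_workflow_pollers(1, 0.2))  # (1, 1), i.e. 2 concurrent polls

# Illustrative values only.
default, longest = timedelta(seconds=30), timedelta(seconds=60)
print(heartbeat_throttle(timedelta(seconds=10), default, longest))  # 0:00:08
print(heartbeat_throttle(None, default, longest))                   # 0:00:30
```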
Update the client associated with the worker.
Changing the client will make sure the worker starts using it for the next calls it makes. However, outstanding client calls will still complete with the existing client. The new client cannot be "lazy" and must be using the same runtime as the current client.
Config, as a dictionary, used to create this worker.
Returns | |
WorkerConfig | Configuration, shallow-copied. |
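Because the returned configuration is a shallow copy, reassigning a top-level key on it leaves the worker's own configuration untouched, while mutable values inside it are still shared. The sketch below shows that distinction with an ordinary dict standing in for WorkerConfig; the keys, the list value, and the greet function are illustrative only.

```python
import copy


def greet() -> str:
    # Hypothetical callable used as a placeholder entry.
    return "hi"


# Plain dict standing in for the worker's configuration.
original = {"task_queue": "example-task-queue", "activities": [greet]}
snapshot = copy.copy(original)  # shallow copy, as the return value is described

snapshot["task_queue"] = "other-queue"  # rebinding a key affects only the copy
snapshot["activities"].append(greet)    # mutating a shared list affects both

print(original["task_queue"])       # example-task-queue
print(len(original["activities"]))  # 2
```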
Run the worker and wait on it to be shut down.
This will not return until shutdown is complete. This means that activities have all completed after being told to cancel after the graceful timeout period.
This method will raise if there is a worker fatal error. While shutdown does not need to be invoked in this case, it is harmless to do so. Otherwise, to shut down this worker, invoke shutdown.
Technically this worker can be shut down by issuing a cancel to this async function, assuming that it is currently running. A cancel could also cancel the shutdown process. Therefore users are encouraged to use explicit shutdown instead.
Initiate a worker shutdown and wait until complete.
This can be called before the worker has even started and is safe for repeated invocations. It simply sets a marker informing the worker to shut down as it runs.
This will not return until the worker has completed shutting down.
Whether the worker is running.
This is only True if the worker has been started and not yet shut down.
Whether the worker has run and shut down.
This is only True if the worker was once started and then shut down. This is not necessarily True after shutdown is first called because the shutdown process can take a bit.
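The interplay between run, shutdown, and the two status properties can be modeled with a pair of asyncio events. MiniWorker below is a hypothetical stand-in written for this illustration; it only mimics the documented behavior (shutdown sets a marker, waits for completion, and is safe to repeat) and says nothing about how the real worker is built.

```python
import asyncio
from typing import List, Tuple


class MiniWorker:
    """Toy model of the run/shutdown lifecycle; not the real worker."""

    def __init__(self) -> None:
        self._started = False
        self._shutdown_requested = asyncio.Event()
        self._shutdown_complete = asyncio.Event()

    @property
    def is_running(self) -> bool:
        return self._started and not self.is_shutdown

    @property
    def is_shutdown(self) -> bool:
        return self._shutdown_complete.is_set()

    async def run(self) -> None:
        self._started = True
        await self._shutdown_requested.wait()
        await asyncio.sleep(0.01)  # stand-in for draining in-flight work
        self._shutdown_complete.set()

    async def shutdown(self) -> None:
        self._shutdown_requested.set()  # marker observed by run()
        await self._shutdown_complete.wait()


async def demo() -> List[Tuple[bool, bool]]:
    worker = MiniWorker()
    states = [(worker.is_running, worker.is_shutdown)]  # before start
    run_task = asyncio.create_task(worker.run())
    await asyncio.sleep(0)
    states.append((worker.is_running, worker.is_shutdown))  # running
    shutdown_task = asyncio.create_task(worker.shutdown())
    await asyncio.sleep(0)
    states.append((worker.is_running, worker.is_shutdown))  # shutdown requested, still draining
    await shutdown_task
    await worker.shutdown()  # a repeated call returns immediately
    await run_task
    states.append((worker.is_running, worker.is_shutdown))  # fully shut down
    return states


print(asyncio.run(demo()))
# [(False, False), (True, False), (True, False), (False, True)]
```

The third state is the case the text above calls out: shutdown has been requested, yet is_shutdown is still False because draining has not finished.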