class documentation

class ActivityEnvironment:

Constructor: ActivityEnvironment()

Activity environment for testing activities.

This environment is used for running activity code that can access the functions in the temporalio.activity module. Use run to run an activity function or any function within an activity context.

Kind               Name               Description
Method             __init__           Create an ActivityEnvironment for running activity code.
Method             cancel             Cancel the activity.
Method             run                Run the given callable in an activity context.
Method             worker_shutdown    Notify the activity that the worker is shutting down.
Instance Variable  info               The info returned by the temporalio.activity.info function.
Instance Variable  metric_meter       Metric meter set on the activity context. This must be set before run. Changes after the activity has started do not take effect. Defaults to a no-op meter.
Instance Variable  on_heartbeat       Function called on each heartbeat invocation by the activity.
Instance Variable  payload_converter  Payload converter set on the activity context. This must be set before run. Changes after the activity has started do not take effect.
Instance Variable  _activities        Undocumented
Instance Variable  _cancelled         Undocumented
Instance Variable  _worker_shutdown   Undocumented

def __init__(self):

Create an ActivityEnvironment for running activity code.

def cancel(self):

Cancel the activity.

This only has an effect on the first call.

def run(self, fn: Callable[_Params, _Return], *args: _Params.args, **kwargs: _Params.kwargs) -> _Return:

Run the given callable in an activity context.

Parameters
  fn: Callable[_Params, _Return]    The function/callable to run.
  *args: _Params.args               All positional arguments to the callable.
  **kwargs: _Params.kwargs          All keyword arguments to the callable.
Returns
  _Return                           The callable's result.

def worker_shutdown(self):

Notify the activity that the worker is shutting down.

This only has an effect on the first call.

info

The info returned by the temporalio.activity.info function.

metric_meter

Metric meter set on the activity context. This must be set before run. Changes after the activity has started do not take effect. Defaults to a no-op meter.

on_heartbeat: Callable[..., None]

Function called on each heartbeat invocation by the activity.

payload_converter

Payload converter set on the activity context. This must be set before run. Changes after the activity has started do not take effect.

_activities: set[_Activity]

Undocumented

_cancelled: bool

Undocumented

_worker_shutdown: bool

Undocumented