class ActivityEnvironment:

Activity environment for testing activities.

This environment runs activity code that needs access to the functions in the temporalio.activity module. Use run to execute an activity function, or any other callable, inside an activity context.

Method             __init__          Create an ActivityEnvironment for running activity code.
Method             cancel            Cancel the activity.
Method             run               Run the given callable in an activity context.
Method             worker_shutdown   Notify the activity that the worker is shutting down.
Instance Variable  info              The info returned by the temporalio.activity.info function.
Instance Variable  on_heartbeat      Function called on each heartbeat invocation by the activity.
Instance Variable  _activities       Undocumented
Instance Variable  _cancelled        Undocumented
Instance Variable  _worker_shutdown  Undocumented

def __init__(self):

Create an ActivityEnvironment for running activity code.

def cancel(self):

Cancel the activity.

Only the first call has an effect; later calls do nothing.

def run(self, fn: Callable[_Params, _Return], *args: _Params.args, **kwargs: _Params.kwargs) -> _Return:

Run the given callable in an activity context.

Parameters
    fn: Callable[_Params, _Return]   The function/callable to run.
    *args: _Params.args              All positional arguments to the callable.
    **kwargs: _Params.kwargs         All keyword arguments to the callable.
Returns
    _Return                          The callable's result.
def worker_shutdown(self):

Notify the activity that the worker is shutting down.

Only the first call has an effect; later calls do nothing.

info

The info returned by the temporalio.activity.info function.

on_heartbeat: Callable[..., None]

Function called on each heartbeat invocation by the activity.

_activities: Set[_Activity]

Undocumented

_cancelled: bool

Undocumented

_worker_shutdown: bool

Undocumented