class documentation

class WorkflowEnvironment: (source)

View In Hierarchy

Workflow environment for testing workflows.

Most developers will want to use the static start_time_skipping to start a test server process that automatically skips time as needed. Alternatively, start_local may be used for a full, local Temporal server with more features. To use an existing server, use from_client.

This environment is an async context manager, so it can be used with async with to make sure it shuts down properly. Otherwise, shutdown can be manually called.

To use the environment, simply use the client on it.

Workflows invoked on the workflow environment are automatically configured to have assert failures fail the workflow with the assertion error.

Static Method from_client Create a workflow environment from the given client.
Async Static Method start_local Start a full Temporal server locally, downloading if necessary.
Async Static Method start_time_skipping Start a time skipping workflow environment.
Async Method __aenter__ Noop for async with support.
Async Method __aexit__ For async with support to just call shutdown.
Method __init__ Create a workflow environment from a client.
Method auto_time_skipping_disabled Disable any automatic time skipping if this is a time-skipping environment.
Async Method get_current_time Get the current time known to this environment.
Async Method shutdown Shut down this environment.
Async Method sleep Sleep in this environment.
Property client Client to this environment.
Property supports_time_skipping Whether this environment supports time skipping.
Instance Variable _client Undocumented

Create a workflow environment from the given client.

supports_time_skipping will always return False for this environment. sleep will sleep the actual amount of time and get_current_time will return the current time.

Parameters
client:temporalio.client.ClientThe client to use for the environment.
Returns
WorkflowEnvironmentThe workflow environment that runs against the given client.
@staticmethod
async def start_local(*, namespace: str = 'default', data_converter: temporalio.converter.DataConverter = temporalio.converter.default(), interceptors: Sequence[temporalio.client.Interceptor] = [], default_workflow_query_reject_condition: Optional[temporalio.common.QueryRejectCondition] = None, retry_config: Optional[temporalio.client.RetryConfig] = None, rpc_metadata: Mapping[str, str] = {}, identity: Optional[str] = None, ip: str = '127.0.0.1', port: Optional[int] = None, download_dest_dir: Optional[str] = None, ui: bool = False, temporalite_existing_path: Optional[str] = None, temporalite_database_filename: Optional[str] = None, temporalite_log_format: str = 'pretty', temporalite_log_level: Optional[str] = 'warn', temporalite_download_version: str = 'default', temporalite_extra_args: Sequence[str] = []) -> WorkflowEnvironment: (source)

Start a full Temporal server locally, downloading if necessary.

This environment is good for testing full server capabilities, but does not support time skipping like start_time_skipping does. supports_time_skipping will always return False for this environment. sleep will sleep the actual amount of time and get_current_time will return the current time.

Internally, this uses Temporalite. Which is a self-contained binary for Temporal using Sqlite persistence. This will download Temporalite to a temporary directory by default if it has not already been downloaded before and temporalite_existing_path is not set.

In the future, the Temporalite implementation may be changed to another implementation. Therefore, all temporalite_ prefixed parameters are Temporalite specific and may not apply to newer versions.

Parameters
namespace:strNamespace name to use for this environment.
data_converter:temporalio.converter.DataConverterSee parameter of the same name on temporalio.client.Client.connect.
interceptors:Sequence[temporalio.client.Interceptor]See parameter of the same name on temporalio.client.Client.connect.
default_workflow_query_reject_condition:Optional[temporalio.common.QueryRejectCondition]See parameter of the same name on temporalio.client.Client.connect.
retry_config:Optional[temporalio.client.RetryConfig]See parameter of the same name on temporalio.client.Client.connect.
rpc_metadata:Mapping[str, str]See parameter of the same name on temporalio.client.Client.connect.
identity:Optional[str]See parameter of the same name on temporalio.client.Client.connect.
ip:strIP address to bind to, or 127.0.0.1 by default.
port:Optional[int]Port number to bind to, or an OS-provided port by default.
download_dest_dir:Optional[str]Directory to download binary to if a download is needed. If unset, this is the system's temporary directory.
ui:boolIf True, will start a UI in Temporalite.
temporalite_existing_path:Optional[str]Existing path to the Temporalite binary. If present, no download will be attempted to fetch the binary.
temporalite_database_filename:Optional[str]Path to the Sqlite database to use for Temporalite. Unset default means only in-memory Sqlite will be used.
temporalite_log_format:strLog format for Temporalite.
temporalite_log_level:Optional[str]Log level to use for Temporalite. Default is warn, but if set to None this will translate the Python logger's level to a Temporalite level.
temporalite_download_version:strSpecific Temporalite version to download. Defaults to default which downloads the version known to work best with this SDK.
temporalite_extra_args:Sequence[str]Extra arguments for the Temporalite binary.
Returns
WorkflowEnvironmentThe started Temporalite workflow environment.
@staticmethod
async def start_time_skipping(*, data_converter: temporalio.converter.DataConverter = temporalio.converter.default(), interceptors: Sequence[temporalio.client.Interceptor] = [], default_workflow_query_reject_condition: Optional[temporalio.common.QueryRejectCondition] = None, retry_config: Optional[temporalio.client.RetryConfig] = None, rpc_metadata: Mapping[str, str] = {}, identity: Optional[str] = None, port: Optional[int] = None, download_dest_dir: Optional[str] = None, test_server_existing_path: Optional[str] = None, test_server_download_version: str = 'default', test_server_extra_args: Sequence[str] = []) -> WorkflowEnvironment: (source)

Start a time skipping workflow environment.

By default, this environment will automatically skip to the next events in time when a workflow's temporalio.client.WorkflowHandle.result is awaited on (which includes temporalio.client.Client.execute_workflow). Before the result is awaited on, time can be manually skipped forward using sleep. The currently known time can be obtained via get_current_time.

Internally, this environment lazily downloads a test-server binary for the current OS/arch into the temp directory if it is not already there. Then the executable is started and will be killed when shutdown is called (which is implicitly done if this is started via async with await WorkflowEnvironment.start_time_skipping()).

Users can reuse this environment for testing multiple independent workflows, but not concurrently. Time skipping, which is automatically done when awaiting a workflow result and manually done on sleep, is global to the environment, not to the workflow under test.

In the future, the test server implementation may be changed to another implementation. Therefore, all test_server_ prefixed parameters are test server specific and may not apply to newer versions.

Parameters
data_converter:temporalio.converter.DataConverterSee parameter of the same name on temporalio.client.Client.connect.
interceptors:Sequence[temporalio.client.Interceptor]See parameter of the same name on temporalio.client.Client.connect.
default_workflow_query_reject_condition:Optional[temporalio.common.QueryRejectCondition]See parameter of the same name on temporalio.client.Client.connect.
retry_config:Optional[temporalio.client.RetryConfig]See parameter of the same name on temporalio.client.Client.connect.
rpc_metadata:Mapping[str, str]See parameter of the same name on temporalio.client.Client.connect.
identity:Optional[str]See parameter of the same name on temporalio.client.Client.connect.
port:Optional[int]Port number to bind to, or an OS-provided port by default.
download_dest_dir:Optional[str]Directory to download binary to if a download is needed. If unset, this is the system's temporary directory.
test_server_existing_path:Optional[str]Existing path to the test server binary. If present, no download will be attempted to fetch the binary.
test_server_download_version:strSpecific test server version to download. Defaults to default which downloads the version known to work best with this SDK.
test_server_extra_args:Sequence[str]Extra arguments for the test server binary.
Returns
WorkflowEnvironmentThe started workflow environment with time skipping.
async def __aenter__(self) -> WorkflowEnvironment: (source)

Noop for async with support.

async def __aexit__(self, *args): (source)

For async with support to just call shutdown.

def __init__(self, client: temporalio.client.Client): (source)

Create a workflow environment from a client.

Most users would use a static method instead.

@contextmanager
def auto_time_skipping_disabled(self) -> Iterator[None]: (source)

Disable any automatic time skipping if this is a time-skipping environment.

This is a context manager for use via with. Usually in time-skipping environments, waiting on a workflow result causes time to automatically skip until the next event. This can disable that. However, this only applies to results awaited inside this context. This will not disable automatic time skipping on previous results.

This has no effect on non-time-skipping environments.

async def get_current_time(self) -> datetime: (source)

Get the current time known to this environment.

For non-time-skipping environments this is simply the system time. For time-skipping environments this is whatever time has been skipped to.

async def shutdown(self): (source)

Shut down this environment.

async def sleep(self, duration: Union[timedelta, float]): (source)

Sleep in this environment.

This awaits a regular asyncio.sleep in regular environments, or manually skips time in time-skipping environments.

Parameters
duration:Union[timedelta, float]Amount of time to sleep.

Client to this environment.

@property
supports_time_skipping: bool = (source)

Whether this environment supports time skipping.

_client = (source)

Undocumented