class documentation

Workflow environment for testing workflows.

Most developers will want to use the static start_time_skipping to start a test server process that automatically skips time as needed. Alternatively, start_local may be used for a full, local Temporal server with more features. To use an existing server, use from_client.

This environment is an async context manager, so it can be used with async with to make sure it shuts down properly. Otherwise, shutdown can be manually called.

To use the environment, simply use the client on it.

Workflows invoked on the workflow environment are automatically configured to have assert failures fail the workflow with the assertion error.

Static Method from_client Create a workflow environment from the given client.
Async Static Method start_local Start a full Temporal server locally, downloading if necessary.
Async Static Method start_time_skipping Start a time skipping workflow environment.
Async Method __aenter__ Noop for async with support.
Async Method __aexit__ For async with support to just call shutdown.
Method __init__ Create a workflow environment from a client.
Method auto_time_skipping_disabled Disable any automatic time skipping if this is a time-skipping environment.
Async Method get_current_time Get the current time known to this environment.
Async Method shutdown Shut down this environment.
Async Method sleep Sleep in this environment.
Property client Client to this environment.
Property supports_time_skipping Whether this environment supports time skipping.
Instance Variable _client Undocumented

Create a workflow environment from the given client.

supports_time_skipping will always return False for this environment. sleep will sleep the actual amount of time and get_current_time will return the current time.

Parameters
client:temporalio.client.ClientThe client to use for the environment.
Returns
WorkflowEnvironmentThe workflow environment that runs against the given client.
@staticmethod
async def start_local(*, namespace: str = 'default', data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default, interceptors: Sequence[temporalio.client.Interceptor] = [], default_workflow_query_reject_condition: temporalio.common.QueryRejectCondition | None = None, retry_config: temporalio.client.RetryConfig | None = None, rpc_metadata: Mapping[str, str] = {}, identity: str | None = None, tls: bool | temporalio.client.TLSConfig = False, ip: str = '127.0.0.1', port: int | None = None, download_dest_dir: str | None = None, ui: bool = False, runtime: temporalio.runtime.Runtime | None = None, dev_server_existing_path: str | None = None, dev_server_database_filename: str | None = None, dev_server_log_format: str = 'pretty', dev_server_log_level: str | None = 'warn', dev_server_download_version: str = 'default', dev_server_extra_args: Sequence[str] = []) -> WorkflowEnvironment: (source)

Start a full Temporal server locally, downloading if necessary.

This environment is good for testing full server capabilities, but does not support time skipping like start_time_skipping does. supports_time_skipping will always return False for this environment. sleep will sleep the actual amount of time and get_current_time will return the current time.

Internally, this uses the Temporal CLI dev server from https://github.com/temporalio/cli. This is a self-contained binary for Temporal using Sqlite persistence. This call will download the CLI to a temporary directory by default if it has not already been downloaded before and dev_server_existing_path is not set.

In the future, the dev server implementation may be changed to another implementation. Therefore, all dev_server_ prefixed parameters are dev-server specific and may not apply to newer versions.

Parameters
namespace:strNamespace name to use for this environment.
data_converter:temporalio.converter.DataConverterSee parameter of the same name on temporalio.client.Client.connect.
interceptors:Sequence[temporalio.client.Interceptor]See parameter of the same name on temporalio.client.Client.connect.
default_workflow_query_reject_condition:temporalio.common.QueryRejectCondition | NoneSee parameter of the same name on temporalio.client.Client.connect.
retry_config:temporalio.client.RetryConfig | NoneSee parameter of the same name on temporalio.client.Client.connect.
rpc_metadata:Mapping[str, str]See parameter of the same name on temporalio.client.Client.connect.
identity:str | NoneSee parameter of the same name on temporalio.client.Client.connect.
tls:bool | temporalio.client.TLSConfigSee parameter of the same name on temporalio.client.Client.connect.
ip:strIP address to bind to, or 127.0.0.1 by default.
port:int | NonePort number to bind to, or an OS-provided port by default.
download_dest_dir:str | NoneDirectory to download binary to if a download is needed. If unset, this is the system's temporary directory.
ui:boolIf True, will start a UI in the dev server.
runtime:temporalio.runtime.Runtime | NoneSpecific runtime to use or default if unset.
dev_server_existing_path:str | NoneExisting path to the CLI binary. If present, no download will be attempted to fetch the binary.
dev_server_database_filename:str | NonePath to the Sqlite database to use for the dev server. Unset default means only in-memory Sqlite will be used.
dev_server_log_format:strLog format for the dev server.
dev_server_log_level:str | NoneLog level to use for the dev server. Default is warn, but if set to None this will translate the Python logger's level to a dev server log level.
dev_server_download_version:strSpecific CLI version to download. Defaults to default which downloads the version known to work best with this SDK.
dev_server_extra_args:Sequence[str]Extra arguments for the CLI binary.
Returns
WorkflowEnvironmentThe started CLI dev server workflow environment.
@staticmethod
async def start_time_skipping(*, data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default, interceptors: Sequence[temporalio.client.Interceptor] = [], default_workflow_query_reject_condition: temporalio.common.QueryRejectCondition | None = None, retry_config: temporalio.client.RetryConfig | None = None, rpc_metadata: Mapping[str, str] = {}, identity: str | None = None, port: int | None = None, download_dest_dir: str | None = None, runtime: temporalio.runtime.Runtime | None = None, test_server_existing_path: str | None = None, test_server_download_version: str = 'default', test_server_extra_args: Sequence[str] = []) -> WorkflowEnvironment: (source)

Start a time skipping workflow environment.

By default, this environment will automatically skip to the next events in time when a workflow's temporalio.client.WorkflowHandle.result is awaited on (which includes temporalio.client.Client.execute_workflow). Before the result is awaited on, time can be manually skipped forward using sleep. The currently known time can be obtained via get_current_time.

Internally, this environment lazily downloads a test-server binary for the current OS/arch into the temp directory if it is not already there. Then the executable is started and will be killed when shutdown is called (which is implicitly done if this is started via async with await WorkflowEnvironment.start_time_skipping()).

Users can reuse this environment for testing multiple independent workflows, but not concurrently. Time skipping, which is automatically done when awaiting a workflow result and manually done on sleep, is global to the environment, not to the workflow under test.

In the future, the test server implementation may be changed to another implementation. Therefore, all test_server_ prefixed parameters are test server specific and may not apply to newer versions.

Parameters
data_converter:temporalio.converter.DataConverterSee parameter of the same name on temporalio.client.Client.connect.
interceptors:Sequence[temporalio.client.Interceptor]See parameter of the same name on temporalio.client.Client.connect.
default_workflow_query_reject_condition:temporalio.common.QueryRejectCondition | NoneSee parameter of the same name on temporalio.client.Client.connect.
retry_config:temporalio.client.RetryConfig | NoneSee parameter of the same name on temporalio.client.Client.connect.
rpc_metadata:Mapping[str, str]See parameter of the same name on temporalio.client.Client.connect.
identity:str | NoneSee parameter of the same name on temporalio.client.Client.connect.
port:int | NonePort number to bind to, or an OS-provided port by default.
download_dest_dir:str | NoneDirectory to download binary to if a download is needed. If unset, this is the system's temporary directory.
runtime:temporalio.runtime.Runtime | NoneSpecific runtime to use or default if unset.
test_server_existing_path:str | NoneExisting path to the test server binary. If present, no download will be attempted to fetch the binary.
test_server_download_version:strSpecific test server version to download. Defaults to default which downloads the version known to work best with this SDK.
test_server_extra_args:Sequence[str]Extra arguments for the test server binary.
Returns
WorkflowEnvironmentThe started workflow environment with time skipping.
async def __aenter__(self) -> WorkflowEnvironment: (source)

Noop for async with support.

async def __aexit__(self, *args): (source)

For async with support to just call shutdown.

def __init__(self, client: temporalio.client.Client): (source)

Create a workflow environment from a client.

Most users would use a static method instead.

@contextmanager
def auto_time_skipping_disabled(self) -> Iterator[None]: (source)

Disable any automatic time skipping if this is a time-skipping environment.

This is a context manager for use via with. Usually in time-skipping environments, waiting on a workflow result causes time to automatically skip until the next event. This can disable that. However, this only applies to results awaited inside this context. This will not disable automatic time skipping on previous results.

This has no effect on non-time-skipping environments.

async def get_current_time(self) -> datetime: (source)

Get the current time known to this environment.

For non-time-skipping environments this is simply the system time. For time-skipping environments this is whatever time has been skipped to.

async def shutdown(self): (source)

Shut down this environment.

async def sleep(self, duration: timedelta | float): (source)

Sleep in this environment.

This awaits a regular asyncio.sleep in regular environments, or manually skips time in time-skipping environments.

Parameters
duration:timedelta | floatAmount of time to sleep.

Client to this environment.

@property
supports_time_skipping: bool = (source)

Whether this environment supports time skipping.

Undocumented