class documentation

Direct client to Temporal services.

Async Static Method connect Connect directly to Temporal services.
Method __init__ Initialize the base service client.
Async Method check_health Check whether the WorkflowService is up.
Method update_api_key Update service client's API key.
Method update_rpc_metadata Update service client's RPC metadata.
Instance Variable cloud_service Undocumented
Instance Variable config Undocumented
Instance Variable operator_service Undocumented
Instance Variable test_service Undocumented
Instance Variable workflow_service Undocumented
Property worker_service_client Underlying service client.
Method _new_call Undocumented
Async Method _rpc_call Undocumented
Instance Variable _check_health_call Undocumented

Connect directly to Temporal services.

def __init__(self, config: ConnectConfig): (source)

Initialize the base service client.

async def check_health(self, *, service: str = 'temporal.api.workflowservice.v1.WorkflowService', retry: bool = False, metadata: Mapping[str, str] = {}, timeout: timedelta | None = None) -> bool: (source)

Check whether the WorkflowService is up.

In addition to accepting which service to check health on, this accepts some of the same parameters as other RPC calls. See ServiceCall.__call__.

Returns
boolTrue when available, false if the server is running but the service is unavailable (rare), or raises an error if server/service cannot be reached.
@abstractmethod
def update_api_key(self, api_key: str | None): (source)

Update service client's API key.

@abstractmethod
def update_rpc_metadata(self, metadata: Mapping[str, str]): (source)

Update service client's RPC metadata.

cloud_service = (source)

Undocumented

Undocumented

operator_service = (source)

Undocumented

test_service = (source)

Undocumented

workflow_service = (source)

Undocumented

Underlying service client.

def _new_call(self, name: str, req_type: type[ServiceRequest], resp_type: type[ServiceResponse], *, service: str = 'workflow') -> ServiceCall[ServiceRequest, ServiceResponse]: (source)

Undocumented

@abstractmethod
async def _rpc_call(self, rpc: str, req: google.protobuf.message.Message, resp_type: type[ServiceResponse], *, service: str, retry: bool, metadata: Mapping[str, str], timeout: timedelta | None) -> ServiceResponse: (source)

Undocumented

_check_health_call = (source)

Undocumented