class documentation
class TestService: (source)
Known subclasses: temporalio.service.TestService
Constructor: TestService(client)
RPC calls for the TestService.
Method | __init__ |
Initialize service with the provided ServiceClient. |
Async Method | get |
Invokes the TestService.get_current_time rpc method. |
Async Method | lock |
Invokes the TestService.lock_time_skipping rpc method. |
Async Method | sleep |
Invokes the TestService.sleep rpc method. |
Async Method | sleep |
Invokes the TestService.sleep_until rpc method. |
Async Method | unlock |
Invokes the TestService.unlock_time_skipping rpc method. |
Async Method | unlock |
Invokes the TestService.unlock_time_skipping_with_sleep rpc method. |
Instance Variable | _client |
Undocumented |
Instance Variable | _service |
Undocumented |
async def get_current_time(self, req:
google.protobuf.empty_pb2.Empty
, retry: bool
= False, metadata: Mapping[ str, str | bytes]
= {}, timeout: timedelta | None
= None) -> temporalio.api.testservice.v1.GetCurrentTimeResponse
:
(source)
¶
Invokes the TestService.get_current_time rpc method.
async def lock_time_skipping(self, req:
temporalio.api.testservice.v1.LockTimeSkippingRequest
, retry: bool
= False, metadata: Mapping[ str, str | bytes]
= {}, timeout: timedelta | None
= None) -> temporalio.api.testservice.v1.LockTimeSkippingResponse
:
(source)
¶
Invokes the TestService.lock_time_skipping rpc method.
async def sleep(self, req:
temporalio.api.testservice.v1.SleepRequest
, retry: bool
= False, metadata: Mapping[ str, str | bytes]
= {}, timeout: timedelta | None
= None) -> temporalio.api.testservice.v1.SleepResponse
:
(source)
¶
Invokes the TestService.sleep rpc method.
async def sleep_until(self, req:
temporalio.api.testservice.v1.SleepUntilRequest
, retry: bool
= False, metadata: Mapping[ str, str | bytes]
= {}, timeout: timedelta | None
= None) -> temporalio.api.testservice.v1.SleepResponse
:
(source)
¶
Invokes the TestService.sleep_until rpc method.
async def unlock_time_skipping(self, req:
temporalio.api.testservice.v1.UnlockTimeSkippingRequest
, retry: bool
= False, metadata: Mapping[ str, str | bytes]
= {}, timeout: timedelta | None
= None) -> temporalio.api.testservice.v1.UnlockTimeSkippingResponse
:
(source)
¶
Invokes the TestService.unlock_time_skipping rpc method.
async def unlock_time_skipping_with_sleep(self, req:
temporalio.api.testservice.v1.SleepRequest
, retry: bool
= False, metadata: Mapping[ str, str | bytes]
= {}, timeout: timedelta | None
= None) -> temporalio.api.testservice.v1.SleepResponse
:
(source)
¶
Invokes the TestService.unlock_time_skipping_with_sleep rpc method.