module documentation

Undocumented

Function get_callable_name Return the name of a callable object.
Function get_operation_factory Return the Operation for the object along with the factory function.
Function get_workflow_run_start_method_input_and_output_type_annotations Return operation input and output types.
Function is_async_callable Return True if obj is an async callable.
Function set_operation_factory Set the OperationHandler factory for this object.
Type Variable ServiceHandlerT Undocumented
Function _get_start_method_input_and_output_type_annotations Undocumented
def get_callable_name(fn: Callable[..., Any]) -> str:

Return the name of a callable object.
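A minimal sketch of how a name might be derived for different kinds of callables. It is not the library's implementation: `callable_name_sketch` is a hypothetical helper, and the fallback to the class name for callable instances is an assumption made for illustration.

```python
from typing import Any, Callable


def callable_name_sketch(fn: Callable[..., Any]) -> str:
    """Hypothetical stand-in for illustration, not the library function."""
    # Plain functions and methods expose __name__ directly.
    name = getattr(fn, "__name__", None)
    # Assumption: callable instances have no __name__, so use the class name.
    if not name and callable(fn):
        name = type(fn).__name__
    if not name:
        raise TypeError(f"Could not determine a name for {fn!r}")
    return name


def greet() -> str:
    return "hello"


class Greeter:
    def __call__(self) -> str:
        return "hello"


print(callable_name_sketch(greet))      # greet
print(callable_name_sketch(Greeter()))  # Greeter
```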

def get_operation_factory(obj: Any) -> tuple[Callable[[Any], Any] | None, nexusrpc.Operation[Any, Any] | None]:

Return the factory function and the Operation for the object, in that order. Either element is None if it is not available.

obj should be a decorated operation start method.

def get_workflow_run_start_method_input_and_output_type_annotations(start: Callable[[ServiceHandlerT, WorkflowRunOperationContext, InputT], Awaitable[WorkflowHandle[OutputT]]]) -> tuple[type[InputT] | None, type[OutputT] | None]:

Return operation input and output types.

start must be a type-annotated start method that returns a temporalio.nexus.WorkflowHandle.
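The idea of reading input and output types from a type-annotated start method can be sketched with the standard library alone. Everything named here is a hypothetical stand-in: `FakeContext` and `FakeHandle` take the place of WorkflowRunOperationContext and WorkflowHandle, and `start_method_types_sketch` is an illustration under those assumptions, not the library's code.

```python
import inspect
from typing import Any, Callable, Generic, Optional, Tuple, TypeVar
from typing import get_args, get_origin, get_type_hints

T = TypeVar("T")


class FakeContext:
    """Hypothetical stand-in for the operation context type."""


class FakeHandle(Generic[T]):
    """Hypothetical stand-in for a generic workflow handle type."""


def start_method_types_sketch(
    start: Callable[..., Any],
) -> Tuple[Optional[type], Optional[type]]:
    hints = get_type_hints(start)
    params = list(inspect.signature(start).parameters)
    # Expect exactly (self, ctx, input); otherwise report nothing.
    if len(params) != 3:
        return None, None
    input_type = hints.get(params[2])
    # The output type is the single type argument of the returned handle.
    output_type = None
    return_hint = hints.get("return")
    if get_origin(return_hint) is FakeHandle:
        args = get_args(return_hint)
        if len(args) == 1:
            output_type = args[0]
    return input_type, output_type


class MyService:
    async def start(self, ctx: FakeContext, input: int) -> FakeHandle[str]:
        return FakeHandle()


input_type, output_type = start_method_types_sketch(MyService.start)
print(input_type.__name__, output_type.__name__)  # int str
```

In this sketch a missing annotation yields None for that element rather than an error, which mirrors the `| None` members of the documented return type.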

def is_async_callable(obj: Any) -> bool:

Return True if obj is an async callable.

Also supports partials, including partials that wrap instances of classes whose call method is async.
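One common way to implement such a check is sketched below, using only `functools` and `inspect` from the standard library. `is_async_callable_sketch` is a hypothetical name, and this is an assumption about the general technique rather than a copy of the library's code.

```python
import functools
import inspect
from typing import Any


def is_async_callable_sketch(obj: Any) -> bool:
    # Unwrap nested functools.partial objects to reach the wrapped callable.
    while isinstance(obj, functools.partial):
        obj = obj.func
    # Either a coroutine function, or an object whose __call__ is one.
    return inspect.iscoroutinefunction(obj) or (
        callable(obj)
        and inspect.iscoroutinefunction(getattr(obj, "__call__", None))
    )


async def fetch(x: int) -> int:
    return x


def fetch_sync(x: int) -> int:
    return x


class AsyncFetcher:
    async def __call__(self, x: int) -> int:
        return x


print(is_async_callable_sketch(fetch))                                 # True
print(is_async_callable_sketch(functools.partial(AsyncFetcher(), 1)))  # True
print(is_async_callable_sketch(fetch_sync))                            # False
```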

def set_operation_factory(obj: Any, operation_factory: Callable[[Any], Any]):

Set the OperationHandler factory for this object.

obj should be an operation start method.
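The set/get pairing can be illustrated with a small round trip. This sketch assumes the factory is stored as an attribute on the start method; the attribute names and the `*_sketch` helpers are hypothetical and chosen only for this example, not taken from the library.

```python
from typing import Any, Callable, Optional, Tuple

# Hypothetical attribute names, used only in this sketch.
_FACTORY_ATTR = "_sketch_operation_factory"
_OPERATION_ATTR = "_sketch_operation"


def set_operation_factory_sketch(
    obj: Any, operation_factory: Callable[[Any], Any]
) -> None:
    # Assumption: the factory is attached directly to the start method.
    setattr(obj, _FACTORY_ATTR, operation_factory)


def get_operation_factory_sketch(
    obj: Any,
) -> Tuple[Optional[Callable[[Any], Any]], Optional[Any]]:
    # Return (factory, operation); each is None if it was never set.
    return getattr(obj, _FACTORY_ATTR, None), getattr(obj, _OPERATION_ATTR, None)


def start(self, ctx, input):
    """Placeholder for an operation start method."""


def make_handler(service: Any) -> str:
    # A real factory would build an operation handler from the service.
    return f"handler for {service}"


set_operation_factory_sketch(start, make_handler)
factory, operation = get_operation_factory_sketch(start)
print(factory("my-service"))  # handler for my-service
print(operation)              # None
```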

ServiceHandlerT = TypeVar('ServiceHandlerT')

Undocumented

def _get_start_method_input_and_output_type_annotations(start: Callable[[ServiceHandlerT, WorkflowRunOperationContext, InputT], Awaitable[WorkflowHandle[OutputT]]]) -> tuple[type[InputT] | None, type[OutputT] | None]:

Undocumented