module documentation

Undocumented

Function nexus_link_to_workflow_event Convert a nexus link into a WorkflowEvent link
Function workflow_event_to_nexus_link Convert a WorkflowEvent link into a nexusrpc link
Function workflow_execution_started_event_link_from_workflow_handle Create a WorkflowEvent link corresponding to a started workflow
Constant LINK_EVENT_ID_PARAM_NAME Undocumented
Constant LINK_EVENT_TYPE_PARAM_NAME Undocumented
Variable logger Undocumented
Function _event_reference_to_query_params Undocumented
Function _event_type_constant_case_to_pascal_case Convert a CONSTANT_CASE string to PascalCase.
Function _event_type_pascal_case_to_constant_case Convert a PascalCase string to CONSTANT_CASE.
Function _query_params_to_event_reference Return an EventReference from the query params or raise ValueError.
Constant _LINK_URL_PATH_REGEX Undocumented
def nexus_link_to_workflow_event(link: nexusrpc.Link) -> temporalio.api.common.v1.Link.WorkflowEvent | None: (source)

Convert a nexus link into a WorkflowEvent link

This is used when propagating links from a Nexus start operation request to a StartWorklow request.

def workflow_event_to_nexus_link(workflow_event: temporalio.api.common.v1.Link.WorkflowEvent) -> nexusrpc.Link: (source)

Convert a WorkflowEvent link into a nexusrpc link

Used when propagating links from a StartWorkflow response to a Nexus start operation response.

def workflow_execution_started_event_link_from_workflow_handle(handle: temporalio.client.WorkflowHandle[Any, Any]) -> temporalio.api.common.v1.Link.WorkflowEvent: (source)

Create a WorkflowEvent link corresponding to a started workflow

LINK_EVENT_ID_PARAM_NAME: str = (source)

Undocumented

Value
'eventID'
LINK_EVENT_TYPE_PARAM_NAME: str = (source)

Undocumented

Value
'eventType'

Undocumented

def _event_reference_to_query_params(event_ref: temporalio.api.common.v1.Link.WorkflowEvent.EventReference) -> str: (source)

Undocumented

def _event_type_constant_case_to_pascal_case(s: str) -> str: (source)

Convert a CONSTANT_CASE string to PascalCase.

>>> _event_type_constant_case_to_pascal_case("NEXUS_OPERATION_SCHEDULED")
"NexusOperationScheduled"
def _event_type_pascal_case_to_constant_case(s: str) -> str: (source)

Convert a PascalCase string to CONSTANT_CASE.

>>> _event_type_pascal_case_to_constant_case("NexusOperationScheduled")
"NEXUS_OPERATION_SCHEDULED"
def _query_params_to_event_reference(raw_query_params: str) -> temporalio.api.common.v1.Link.WorkflowEvent.EventReference: (source)

Return an EventReference from the query params or raise ValueError.

_LINK_URL_PATH_REGEX = (source)

Undocumented

Value
re.compile(r'^/namespaces/(?P<namespace>[^/]+)/workflows/(?P<workflow_id>[^/]+)/
(?P<run_id>[^/]+)/history$')