module documentation

LangSmith interceptor implementation for Temporal SDK.

Constant HEADER_KEY Undocumented
Variable logger Undocumented
Class _InputWithHeaders Undocumented
Class _LangSmithActivityInboundInterceptor Instruments activity execution with LangSmith runs.
Class _LangSmithClientOutboundInterceptor Instruments all client-side calls with LangSmith runs.
Class _LangSmithNexusOperationInboundInterceptor Instruments Nexus operations with LangSmith runs.
Class _LangSmithWorkflowInboundInterceptor Instruments workflow execution with LangSmith runs.
Class _LangSmithWorkflowOutboundInterceptor Instruments all outbound calls from workflow code.
Class _ReplaySafeRunTree Wrapper around a RunTree with replay-safe post, end, and patch.
Class _RootReplaySafeRunTreeFactory Factory that produces independent root _ReplaySafeRunTree instances with no parent link.
Function _extract_context Extract LangSmith context from Temporal payload headers.
Function _extract_nexus_context Extract LangSmith context from Nexus string headers.
Function _get_current_run_for_propagation Get the current ambient run for context propagation.
Function _get_workflow_random Get a deterministic random generator for the current workflow.
Function _inject_context Inject LangSmith context into Temporal payload headers.
Function _inject_current_context Inject the current ambient LangSmith context into Temporal payload headers.
Function _inject_nexus_context Inject LangSmith context into Nexus string headers.
Function _is_benign_error Check if an exception is a benign ApplicationError.
Function _is_replaying Check if we're currently replaying workflow history.
Function _maybe_run Create a LangSmith run, handling errors.
Function _patch_aio_to_thread Patch langsmith's aio_to_thread to run synchronously in workflows.
Function _uuid_from_random Generate a deterministic UUID4 from a workflow-bound random generator.
Constant _BUILTIN_QUERIES Undocumented
Variable _aio_to_thread_patched Undocumented
HEADER_KEY: str = '_temporal-langsmith-context'

Undocumented

Undocumented

def _extract_context(headers: Mapping[str, Payload], executor: ThreadPoolExecutor, ls_client: langsmith.Client) -> _ReplaySafeRunTree | None:

Extract LangSmith context from Temporal payload headers.

Reconstructs a RunTree from the _temporal-langsmith-context header on the receiving side, wrapped in a _ReplaySafeRunTree so inbound interceptors can establish a parent-child relationship with the sender's run. Returns None if no header is present.
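
The receiving side can be pictured roughly as follows. This is a minimal, standard-library-only sketch, not the module's implementation: it assumes the header value is a UTF-8 JSON object, and `TraceContext` and `extract_context` are hypothetical stand-ins for the reconstructed run and for `_extract_context`. The real function decodes a Temporal `Payload` and wraps the result in `_ReplaySafeRunTree`, neither of which is shown here.

```python
import json
from dataclasses import dataclass
from typing import Mapping, Optional

HEADER_KEY = "_temporal-langsmith-context"


@dataclass(frozen=True)
class TraceContext:
    """Hypothetical stand-in for the reconstructed parent run."""

    trace_id: str
    parent_run_id: str
    dotted_order: str


def extract_context(headers: Mapping[str, bytes]) -> Optional[TraceContext]:
    """Return the sender's trace context, or None if the header is absent."""
    raw = headers.get(HEADER_KEY)
    if raw is None:
        return None
    fields = json.loads(raw)  # assumed encoding: UTF-8 JSON object
    return TraceContext(
        trace_id=fields["trace_id"],
        parent_run_id=fields["parent_run_id"],
        dotted_order=fields["dotted_order"],
    )
```

An inbound interceptor would then use the returned value, when it is not None, as the parent of the run it creates.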

def _extract_nexus_context(headers: Mapping[str, str], executor: ThreadPoolExecutor, ls_client: langsmith.Client) -> _ReplaySafeRunTree | None:

Extract LangSmith context from Nexus string headers.

def _get_current_run_for_propagation() -> RunTree | None:

Get the current ambient run for context propagation.

Filters out _RootReplaySafeRunTreeFactory, which is internal scaffolding that should never be serialized into headers or used as parent runs.

def _get_workflow_random() -> random.Random | None:

Get a deterministic random generator for the current workflow.

Creates a workflow-safe random generator once via workflow.new_random() and stores it on the workflow instance so subsequent calls return the same generator. The generator is seeded from the workflow's deterministic seed, so it produces identical UUIDs across replays and worker restarts.

Returns None outside a workflow, in read-only (query) contexts, or when workflow APIs are mocked (unit tests).
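
The create-once-and-cache behavior can be sketched with the standard library alone. In this illustration, `FakeWorkflowInstance`, `_RNG_ATTR`, and the explicit `instance` argument are hypothetical. `random.Random(seed)` stands in for `workflow.new_random()`, and the read-only and mocked cases are not modeled.

```python
import random
from typing import Optional

_RNG_ATTR = "_sketch_langsmith_random"  # hypothetical attribute name


class FakeWorkflowInstance:
    """Hypothetical stand-in for a workflow instance with a fixed seed."""

    def __init__(self, seed: int) -> None:
        self.seed = seed


def get_workflow_random(
    instance: Optional[FakeWorkflowInstance],
) -> Optional[random.Random]:
    """Return the generator cached on the instance, creating it on first use."""
    if instance is None:
        return None  # models "outside a workflow"
    rng = getattr(instance, _RNG_ATTR, None)
    if rng is None:
        rng = random.Random(instance.seed)  # stands in for workflow.new_random()
        setattr(instance, _RNG_ATTR, rng)
    return rng
```

Because the generator is seeded deterministically, two instances built from the same seed produce the same sequence, which is the property that keeps generated IDs stable across replays.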

def _inject_context(headers: Mapping[str, Payload], run_tree: RunTree) -> dict[str, Payload]:

Inject LangSmith context into Temporal payload headers.

Serializes the run's trace context (trace ID, parent run ID, dotted order) into a Temporal header under _temporal-langsmith-context, enabling parent-child trace nesting across process boundaries (client → worker, workflow → activity).
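
A sending-side sketch, under stated assumptions: the header value is modeled as UTF-8 JSON bytes rather than a Temporal `Payload`, and `inject_context` with its three string parameters is a hypothetical simplification of `_inject_context(headers, run_tree)`. It illustrates one detail visible in the signature: the input is a read-only mapping and the result is a new dict.

```python
import json
from typing import Dict, Mapping

HEADER_KEY = "_temporal-langsmith-context"


def inject_context(
    headers: Mapping[str, bytes],
    trace_id: str,
    parent_run_id: str,
    dotted_order: str,
) -> Dict[str, bytes]:
    """Return a copy of headers with the trace context added."""
    encoded = json.dumps(
        {
            "trace_id": trace_id,
            "parent_run_id": parent_run_id,
            "dotted_order": dotted_order,
        }
    ).encode("utf-8")  # assumed encoding; the real wire format is not shown
    # Build a new dict so the caller's mapping is never mutated.
    return {**headers, HEADER_KEY: encoded}
```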

def _inject_current_context(headers: Mapping[str, Payload]) -> Mapping[str, Payload]:

Inject the current ambient LangSmith context into Temporal payload headers.

Calls _get_current_run_for_propagation() and injects the returned run if one is present. Returns headers unchanged if no context is active. Callers invoke this function unconditionally, so context propagation does not depend on the add_temporal_runs toggle.

def _inject_nexus_context(headers: Mapping[str, str], run_tree: RunTree) -> dict[str, str]:

Inject LangSmith context into Nexus string headers.

def _is_benign_error(exc: Exception) -> bool:

Check if an exception is a benign ApplicationError.

def _is_replaying() -> bool:

Check if we're currently replaying workflow history.

def _maybe_run(client: langsmith.Client, name: str, *, add_temporal_runs: bool, run_type: str = 'chain', inputs: dict[str, Any] | None = None, metadata: dict[str, Any] | None = None, tags: list[str] | None = None, parent: RunTree | None = None, project_name: str | None = None, executor: ThreadPoolExecutor) -> Iterator[None]:

Create a LangSmith run, handling errors.

  • If add_temporal_runs is False, yields None (no run created). Context propagation is handled unconditionally by callers.
  • When a run IS created, uses _ReplaySafeRunTree for replay and event loop safety, then sets it as ambient context via tracing_context(parent=run_tree) so get_current_run_tree() returns it and _inject_current_context() can inject it.
  • On exception: marks run as errored (unless benign ApplicationError), re-raises.

Parameters
client: langsmith.Client - LangSmith client instance.
name: str - Display name for the run.
add_temporal_runs: bool - Whether to create Temporal-level trace runs.
run_type: str - LangSmith run type (default "chain").
inputs: dict[str, Any] | None - Input data to record on the run.
metadata: dict[str, Any] | None - Extra metadata to attach to the run.
tags: list[str] | None - Tags to attach to the run.
parent: RunTree | None - Parent run for nesting.
project_name: str | None - LangSmith project name override.
executor: ThreadPoolExecutor - ThreadPoolExecutor for background I/O.
Returns
Iterator[None] - Undocumented

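
The control flow described for _maybe_run can be illustrated with a small context manager. This sketch is not the module's code: `maybe_run`, `BenignError`, and the `log` list are hypothetical, the list stands in for LangSmith run creation, and ambient-context handling is omitted. The sketch always closes the run in a `finally` block, which is an assumption; the description above does not say how a run is ended.

```python
from contextlib import contextmanager
from typing import Iterator, List


class BenignError(Exception):
    """Hypothetical stand-in for a benign ApplicationError."""


@contextmanager
def maybe_run(name: str, *, add_temporal_runs: bool, log: List[str]) -> Iterator[None]:
    """Record a run around the body unless run creation is disabled."""
    if not add_temporal_runs:
        yield None  # no run created; the body still executes
        return
    log.append(f"start:{name}")
    try:
        yield None
    except Exception as exc:
        if not isinstance(exc, BenignError):
            log.append(f"error:{name}:{exc}")  # mark the run as errored
        raise  # always re-raise so Temporal sees the failure
    finally:
        log.append(f"end:{name}")  # assumption: the run is always closed
```

Used as `with maybe_run("StartWorkflow", add_temporal_runs=True, log=events): ...`, a failing body records an error entry only when the exception is not benign, and the exception propagates either way.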
def _patch_aio_to_thread():

Patch langsmith's aio_to_thread to run synchronously in workflows.

The @traceable decorator on async functions uses aio_to_thread() → loop.run_in_executor() for run setup/teardown. The Temporal workflow event loop does not support run_in_executor. This patch runs those functions synchronously on the workflow thread when inside a workflow. Functions passed here must not perform blocking I/O.
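
One way to picture such a patch is the sketch below. Everything in it is an assumption made for illustration: `target` stands in for the module that owns `aio_to_thread`, `state.in_workflow` stands in for the real "am I inside a workflow" check, and `state.patched` plays the role of a one-time guard flag such as `_aio_to_thread_patched`.

```python
import asyncio
import functools
import types
from typing import Any, Callable


async def _original_aio_to_thread(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Stand-in for the library helper: runs func in the default executor."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))


# Hypothetical namespaces standing in for the patched module and its state.
target = types.SimpleNamespace(aio_to_thread=_original_aio_to_thread)
state = types.SimpleNamespace(in_workflow=False, patched=False)


def patch_aio_to_thread() -> None:
    """Install the wrapper once; later calls are no-ops."""
    if state.patched:
        return
    original = target.aio_to_thread

    async def patched(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if state.in_workflow:
            # Run inline on the current thread; func must not block.
            return func(*args, **kwargs)
        return await original(func, *args, **kwargs)

    target.aio_to_thread = patched
    state.patched = True
```

Outside a workflow the wrapper defers to the original helper, so non-workflow callers keep their threaded behavior.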

def _uuid_from_random(rng: random.Random) -> uuid.UUID:

Generate a deterministic UUID4 from a workflow-bound random generator.
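
A plausible way to derive a UUID4 from a seeded generator with the standard library is shown below. It is a sketch of the technique, not necessarily the module's exact code. `uuid_from_random` is an illustrative name.

```python
import random
import uuid


def uuid_from_random(rng: random.Random) -> uuid.UUID:
    """Draw 128 bits from rng and stamp them with the UUID4 version bits."""
    # Passing version=4 makes uuid.UUID overwrite the version and variant
    # bits, so the result is a well-formed UUID4 for any 128-bit input.
    return uuid.UUID(int=rng.getrandbits(128), version=4)
```

Two generators created with the same seed yield the same UUID sequence, unlike `uuid.uuid4()`, which draws from the operating system's entropy source and would differ on every replay.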

_BUILTIN_QUERIES: frozenset[str] = frozenset(set(['__stack_trace', '__enhanced_stack_trace']))

Undocumented

_aio_to_thread_patched: bool

Undocumented