LangSmith interceptor implementation for Temporal SDK.
| Constant | HEADER |
Undocumented |
| Variable | logger |
Undocumented |
| Class | _ |
Undocumented |
| Class | _ |
Instruments activity execution with LangSmith runs. |
| Class | _ |
Instruments all client-side calls with LangSmith runs. |
| Class | _ |
Instruments Nexus operations with LangSmith runs. |
| Class | _ |
Instruments workflow execution with LangSmith runs. |
| Class | _ |
Instruments all outbound calls from workflow code. |
| Class | _ |
Wrapper around a RunTree with replay-safe post, end, and patch. |
| Class | _ |
Factory that produces independent root _ReplaySafeRunTree instances with no parent link. |
| Function | _extract |
Extract LangSmith context from Temporal payload headers. |
| Function | _extract |
Extract LangSmith context from Nexus string headers. |
| Function | _get |
Get the current ambient run for context propagation. |
| Function | _get |
Get a deterministic random generator for the current workflow. |
| Function | _inject |
Inject LangSmith context into Temporal payload headers. |
| Function | _inject |
Inject the current ambient LangSmith context into Temporal payload headers. |
| Function | _inject |
Inject LangSmith context into Nexus string headers. |
| Function | _install |
Install the aio_to_thread override via LangSmith's official API. |
| Function | _is |
Check if an exception is a benign ApplicationError. |
| Function | _is |
Check if we're currently replaying workflow history. |
| Function | _maybe |
Create a LangSmith run, handling errors. |
| Async Function | _temporal |
Run LangSmith's aio_to_thread synchronously inside Temporal workflows. |
| Function | _uuid |
Generate a deterministic UUID4 from a workflow-bound random generator. |
| Constant | _BUILTIN |
Undocumented |
| Variable | _aio |
Undocumented |
Mapping[ str, Payload], executor: ThreadPoolExecutor, ls_client: langsmith.Client) -> _ReplaySafeRunTree | None:
(source)
¶
Extract LangSmith context from Temporal payload headers.
Reconstructs a RunTree from the _temporal-langsmith-context header on
the receiving side, wrapped in a _ReplaySafeRunTree so inbound
interceptors can establish a parent-child relationship with the sender's
run. Returns None if no header is present.
Mapping[ str, str], executor: ThreadPoolExecutor, ls_client: langsmith.Client) -> _ReplaySafeRunTree | None:
(source)
¶
Extract LangSmith context from Nexus string headers.
Get the current ambient run for context propagation.
Filters out _RootReplaySafeRunTreeFactory, which is internal scaffolding that should never be serialized into headers or used as parent runs.
Get a deterministic random generator for the current workflow.
Creates a workflow-safe random generator once via workflow.new_random() and stores it on the workflow instance so subsequent calls return the same generator. The generator is seeded from the workflow's deterministic seed, so it produces identical UUIDs across replays and worker restarts.
Returns None outside a workflow, in read-only (query) contexts, or when workflow APIs are mocked (unit tests).
Mapping[ str, Payload], run_tree: RunTree) -> dict[ str, Payload]:
(source)
¶
Inject LangSmith context into Temporal payload headers.
Serializes the run's trace context (trace ID, parent run ID, dotted order) into a Temporal header under _temporal-langsmith-context, enabling parent-child trace nesting across process boundaries (client → worker, workflow → activity).
Inject the current ambient LangSmith context into Temporal payload headers.
Reads _get_current_run_for_propagation() and injects if present. Returns headers unchanged if no context is active. Called unconditionally so that context propagation is independent of the add_temporal_runs toggle.
Mapping[ str, str], run_tree: RunTree) -> dict[ str, str]:
(source)
¶
Inject LangSmith context into Nexus string headers.
Install the aio_to_thread override via LangSmith's official API.
Safe to call multiple times; the override is only installed once.
langsmith.Client, name: str, *, add_temporal_runs: bool, run_type: str = 'chain', inputs: dict[ str, Any] | None = None, metadata: dict[ str, Any] | None = None, tags: list[ str] | None = None, parent: RunTree | None = None, project_name: str | None = None, executor: ThreadPoolExecutor) -> Iterator[ None]:
(source)
¶
Create a LangSmith run, handling errors.
- If add_temporal_runs is False or langsmith.utils.tracing_is_enabled() returns False, yields None (no run created). Context propagation is handled unconditionally by callers.
- When a run IS created, uses
_ReplaySafeRunTreefor replay and event loop safety, then sets it as ambient context via tracing_context(parent=run_tree) so get_current_run_tree() returns it and _inject_current_context() can inject it. - On exception: marks run as errored (unless benign ApplicationError), re-raises.
Note on tracing_is_enabled() and cross-process traces: tracing_is_enabled() checks for an active run tree in context before consulting the LANGSMITH_TRACING env var (langsmith semantics). If a parent run is propagated into this worker via headers from an upstream tracer, tracing continues regardless of LANGSMITH_TRACING=false. This matches langsmith's "continue mid-trace" model: the env var suppresses new local traces but does not break an inbound parent trace.
| Parameters | |
client:langsmith.Client | LangSmith client instance. |
name:str | Display name for the run. |
addbool | Whether to create Temporal-level trace runs. |
runstr | LangSmith run type (default "chain"). |
inputs:dict[ | Input data to record on the run. |
metadata:dict[ | Extra metadata to attach to the run. |
tags:list[ | Tags to attach to the run. |
parent:RunTree | None | Parent run for nesting. |
projectstr | None | LangSmith project name override. |
executor:ThreadPoolExecutor | ThreadPoolExecutor for background I/O. |
| Returns | |
Iterator[ | Undocumented |
Callable[ ..., Any], ctx: Any, func: Callable[ ..., Any], /, *args: Any, **kwargs: Any) -> Any:
(source)
¶
Run LangSmith's aio_to_thread synchronously inside Temporal workflows.
The @traceable decorator on async functions uses aio_to_thread() → loop.run_in_executor() for run setup/teardown. The Temporal workflow event loop does not support run_in_executor. This override runs those functions synchronously on the workflow thread when inside a workflow, and delegates to the default implementation outside workflows.
Registered via langsmith.set_runtime_overrides(aio_to_thread=...).