class documentation
class LangSmithInterceptor(temporalio.client.Interceptor, temporalio.worker.Interceptor): (source)
Constructor: LangSmithInterceptor(client, project_name, add_temporal_runs, default_metadata, ...)
Interceptor that supports client and worker LangSmith run creation and context propagation.
| Method | __init__ | Initialize the LangSmith interceptor with tracing configuration. |
| Method | intercept_activity | Create an activity inbound interceptor for LangSmith tracing. |
| Method | intercept_client | Create a client outbound interceptor for LangSmith tracing. |
| Method | intercept_nexus_operation | Create a Nexus operation inbound interceptor for LangSmith tracing. |
| Method | maybe_run | Create a LangSmith run with this interceptor's config already applied. |
| Method | workflow_interceptor_class | Return the workflow interceptor class with config bound. |
| Instance Variable | _add_temporal_runs | Undocumented |
| Instance Variable | _client | Undocumented |
| Instance Variable | _default_metadata | Undocumented |
| Instance Variable | _default_tags | Undocumented |
| Instance Variable | _executor | Undocumented |
| Instance Variable | _project_name | Undocumented |
def __init__(self, *, client: langsmith.Client | None = None, project_name: str | None = None, add_temporal_runs: bool = False, default_metadata: dict[str, Any] | None = None, default_tags: list[str] | None = None):
(source)
¶
Initialize the LangSmith interceptor with tracing configuration.
def intercept_activity(self, next: temporalio.worker.ActivityInboundInterceptor) -> temporalio.worker.ActivityInboundInterceptor:
(source)
¶
Create an activity inbound interceptor for LangSmith tracing.
def intercept_client(self, next: temporalio.client.OutboundInterceptor) -> temporalio.client.OutboundInterceptor:
(source)
¶
Create a client outbound interceptor for LangSmith tracing.
def intercept_nexus_operation(self, next: temporalio.worker.NexusOperationInboundInterceptor) -> temporalio.worker.NexusOperationInboundInterceptor:
(source)
¶
Create a Nexus operation inbound interceptor for LangSmith tracing.
@contextmanager
def maybe_run(self, name: str, *, run_type: str = 'chain', parent: RunTree | None = None, extra_metadata: dict[str, Any] | None = None) -> Iterator[None]:
(source)
¶
Create a LangSmith run with this interceptor's config already applied.
def workflow_interceptor_class(self, input: temporalio.worker.WorkflowInterceptorClassInput) -> type[_LangSmithWorkflowInboundInterceptor]:
(source)
¶
Return the workflow interceptor class with config bound.