class documentation

LangGraph plugin for the Temporal SDK.

Warning

This package is experimental and may change in future versions. Use with caution in production environments.

This plugin runs LangGraph nodes and tasks as Temporal Activities, giving your AI agent workflows durable execution, automatic retries, and timeouts. It supports both the LangGraph Graph API (StateGraph) and Functional API (@entrypoint / @task).

Parameters
graphs: Graph API graphs to make available to workflows, keyed by name. Workflows retrieve them with graph and call .compile() to get a runnable. Each node's metadata must include execute_in ("activity" or "workflow") and may include any kwarg accepted by workflow.execute_activity (e.g. start_to_close_timeout, retry_policy).
entrypoints: Functional API entrypoints to make available to workflows, keyed by name. Workflows retrieve them with entrypoint.
tasks: Functional API @task functions to wrap as Temporal Activities.
activity_options: Per-task activity options for the Functional API, keyed by task function name. Each entry must include execute_in and may include any workflow.execute_activity kwarg. This parameter is needed because LangGraph's Functional API has no per-task metadata channel.
default_activity_options: Activity options applied to every activity-bound node and task, overridable per-node (Graph API metadata) or per-task (activity_options[name]).
streaming_topic: When set, langgraph.config.get_stream_writer() inside a node publishes to this topic on the workflow's WorkflowStream. The workflow must construct WorkflowStream() in its @workflow.init (the plugin's interceptor verifies this on workflow start). Nodes with execute_in='activity' publish through WorkflowStreamClient (signal); nodes with execute_in='workflow' publish synchronously to the in-workflow stream (no signal).
streaming_batch_interval: How often the activity-side stream client flushes buffered publishes into a single __temporal_workflow_stream_publish signal. It has no effect on workflow-side nodes, whose publishes are synchronous in-memory log appends. Lower values reduce streaming latency at the cost of more signals (more workflow history events); higher values amortize signal cost but make chunks arrive in larger bursts. The default of 100ms suits interactive token streaming; raise it to 250–1000ms for non-interactive aggregation, and lower it toward 10–50ms only if you have measured the latency need and accept the history cost.
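The override order between default_activity_options and activity_options can be pictured as a plain dictionary merge. The sketch below is an illustration only, not the plugin's implementation: resolve_activity_options is a hypothetical helper, the task names are invented, and it assumes a shallow, key-by-key merge in which the per-task entry wins and defaults are applied only to activity-bound tasks.

```python
from __future__ import annotations

from datetime import timedelta
from typing import Any


def resolve_activity_options(
    name: str,
    default_activity_options: dict[str, Any] | None = None,
    activity_options: dict[str, dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Hypothetical helper: compute the effective options for one task."""
    entry = (activity_options or {}).get(name, {})
    execute_in = entry.get("execute_in")
    # Every per-task entry must say where the task runs.
    if execute_in not in ("activity", "workflow"):
        raise ValueError(f"{name!r}: execute_in must be 'activity' or 'workflow'")
    if execute_in == "workflow":
        # Assumption: inline tasks take no activity options in this sketch.
        return {"execute_in": "workflow"}
    # Assumed shallow merge: per-task keys override the defaults.
    return {**(default_activity_options or {}), **entry}


# Invented task names, used only for illustration.
defaults = {"start_to_close_timeout": timedelta(seconds=30)}
per_task = {
    "call_model": {
        "execute_in": "activity",
        "start_to_close_timeout": timedelta(minutes=2),
    },
    "fetch_docs": {"execute_in": "activity"},
    "format_reply": {"execute_in": "workflow"},
}

call_model_opts = resolve_activity_options("call_model", defaults, per_task)
fetch_docs_opts = resolve_activity_options("fetch_docs", defaults, per_task)
format_reply_opts = resolve_activity_options("format_reply", defaults, per_task)
```

Here call_model keeps its own two-minute timeout, fetch_docs falls back to the 30-second default, and format_reply runs inline with no activity options. The same picture applies to Graph API nodes, with node metadata taking the place of the activity_options entry.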
Method __init__ Initialize the LangGraph plugin with graphs, entrypoints, and tasks.
Method execute Prepare a node or task to execute as an activity or inline in the workflow.
Instance Variable activities Undocumented
Instance Variable _streaming_batch_interval Undocumented
Instance Variable _streaming_topic Undocumented

Inherited from SimplePlugin:

Method configure_client See base class.
Method configure_replayer See base class.
Method configure_worker See base class.
Async Method connect_service_client See base class.
Method name See base class.
Async Method run_replayer See base class.
Async Method run_worker See base class.
Instance Variable data_converter Undocumented
Instance Variable interceptors Undocumented
Instance Variable nexus_service_handlers Undocumented
Instance Variable run_context Undocumented
Instance Variable workflow_failure_exception_types Undocumented
Instance Variable workflow_runner Undocumented
Instance Variable workflows Undocumented
Instance Variable _name Undocumented
def __init__(self, graphs: dict[str, StateGraph[Any, Any, Any, Any]] | None = None, entrypoints: dict[str, Pregel[Any, Any, Any, Any]] | None = None, tasks: list | None = None, activity_options: dict[str, dict[str, Any]] | None = None, default_activity_options: dict[str, Any] | None = None, streaming_topic: str | None = None, streaming_batch_interval: timedelta = timedelta(milliseconds=100)):

Initialize the LangGraph plugin with graphs, entrypoints, and tasks.

Warning

Streaming support is experimental and may change in future versions.
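To get a feel for the history cost behind streaming_batch_interval, a rough upper bound can be computed with plain arithmetic. This is a back-of-the-envelope sketch, not a measurement of the plugin: max_publish_signals is a hypothetical helper, and it assumes a single streaming activity that sends at most one publish signal per batch interval.

```python
import math
from datetime import timedelta


def max_publish_signals(stream_duration: timedelta, batch_interval: timedelta) -> int:
    """Upper bound on flush signals, assuming at most one signal per interval."""
    if batch_interval <= timedelta(0):
        raise ValueError("batch_interval must be positive")
    # timedelta / timedelta yields a float ratio; round up to whole flushes.
    return math.ceil(stream_duration / batch_interval)


one_minute = timedelta(seconds=60)
for ms in (10, 100, 1000):
    interval = timedelta(milliseconds=ms)
    bound = max_publish_signals(one_minute, interval)
    print(f"{ms:>4} ms -> at most {bound} signals per minute of streaming")
```

Under these assumptions, one minute of streaming costs at most 600 signals at the 100ms default, against 6000 at 10ms and 60 at 1000ms. Real counts may be lower if the client skips flushes when nothing is buffered, and higher in total when several activities stream concurrently.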

def execute(self, activity_name: str, func: Callable, kwargs: dict[str, Any] | None = None) -> Callable:

Prepare a node or task to execute as an activity or inline in the workflow.

_streaming_batch_interval

Undocumented

_streaming_topic

Undocumented