class documentation
class LangGraphPlugin(SimplePlugin): (source)
Constructor: LangGraphPlugin(graphs, entrypoints, tasks, activity_options, ...)
LangGraph plugin for Temporal SDK.
Warning
This package is experimental and may change in future versions. Use with caution in production environments.
This plugin runs LangGraph nodes and tasks as Temporal Activities, giving your AI agent workflows durable execution, automatic retries, and timeouts. It supports both the LangGraph Graph API (StateGraph) and Functional API (@entrypoint / @task).
| Parameters | |
| graphs | Graph API graphs to make available to workflows, keyed by name.
Workflows retrieve them with graph and call
.compile() to get a runnable. Each node's metadata must
include execute_in ("activity" or "workflow") and
may include any kwarg accepted by
workflow.execute_activity (e.g. start_to_close_timeout,
retry_policy). |
| entrypoints | Functional API entrypoints to make available to
workflows, keyed by name. Workflows retrieve them with
entrypoint. |
| tasks | Functional API @task functions to wrap as Temporal Activities. |
| activity | Per-task activity options for the Functional
API, keyed by task function name. Each entry must include
execute_in and may include any
workflow.execute_activity kwarg. Used because LangGraph's
Functional API has no per-task metadata channel. |
| default | Activity options applied to every activity-bound node and task, overridable per-node (Graph API metadata) or per-task (activity_options[name]). |
| streaming | When set, langgraph.config.get_stream_writer()
inside a node publishes to this topic on the workflow's
WorkflowStream. The workflow must construct
WorkflowStream() in its @workflow.init (the plugin's
interceptor verifies this on workflow start). Nodes with
execute_in='activity' publish through
WorkflowStreamClient (signal); nodes with
execute_in='workflow' publish synchronously to the
in-workflow stream (no signal). |
| streaming | How often the activity-side stream client flushes buffered publishes into a single __temporal_workflow_stream_publish signal. Has no effect on workflow-side nodes (their publishes are synchronous in-memory log appends). Lower values reduce streaming latency at the cost of more signals (more workflow history events); higher values amortize signal cost but make chunks arrive in larger bursts. Default 100ms suits interactive token streaming; raise to 250–1000ms for non-interactive aggregation, lower toward 10–50ms only if you've measured the latency need and accept the history cost. |
| Method | __init__ |
Initialize the LangGraph plugin with graphs, entrypoints, and tasks. |
| Method | execute |
Prepare a node or task to execute as an activity or inline in the workflow. |
| Instance Variable | activities |
Undocumented |
| Instance Variable | _streaming |
Undocumented |
| Instance Variable | _streaming |
Undocumented |
Inherited from SimplePlugin:
| Method | configure |
See base class. |
| Method | configure |
See base class. |
| Method | configure |
See base class. |
| Async Method | connect |
See base class. |
| Method | name |
See base class. |
| Async Method | run |
See base class. |
| Async Method | run |
See base class. |
| Instance Variable | data |
Undocumented |
| Instance Variable | interceptors |
Undocumented |
| Instance Variable | nexus |
Undocumented |
| Instance Variable | run |
Undocumented |
| Instance Variable | workflow |
Undocumented |
| Instance Variable | workflow |
Undocumented |
| Instance Variable | workflows |
Undocumented |
| Instance Variable | _name |
Undocumented |
def __init__(self, graphs: milliseconds=100)):
(source)
¶
dict[ str, StateGraph[ Any, Any, Any, Any]] | None = None, entrypoints: dict[ str, Pregel[ Any, Any, Any, Any]] | None = None, tasks: list | None = None, activity_options: dict[ str, dict[ str, Any]] | None = None, default_activity_options: dict[ str, Any] | None = None, streaming_topic: str | None = None, streaming_batch_interval: timedelta = timedelta(overrides
temporalio.plugin.SimplePlugin.__init__Initialize the LangGraph plugin with graphs, entrypoints, and tasks.
Warning
Streaming support is experimental and may change in future versions.