LangGraph integration
Temporal's integration with LangGraph runs your LangGraph nodes and tasks as Temporal Activities, giving your AI agent workflows durable execution, automatic retries, and timeouts.
The plugin supports both the LangGraph Graph API (StateGraph with nodes and edges) and the Functional API
(@entrypoint / @task decorators). Each graph node or task executes as a Temporal Activity, so you get configurable
timeouts and retry policies out of the box.
The Temporal Python SDK integration with LangGraph is currently at an experimental release stage. The API may change in future versions.
Code snippets in this guide are taken from the LangGraph plugin samples. Refer to the samples for the complete code.
Prerequisites
- This guide assumes you are already familiar with LangGraph. If you aren't, refer to the LangGraph documentation for more details.
- If you are new to Temporal, we recommend reading Understanding Temporal or taking the Temporal 101 course.
- Ensure you have set up your local development environment by following the Set up your local development environment guide. When you're done, leave the Temporal Development Server running if you want to test your code locally.
Install the plugin
Install the Temporal Python SDK with LangGraph support:
uv add "temporalio[langgraph]"
or with pip:
pip install "temporalio[langgraph]"
Graph API
The Graph API uses StateGraph to define nodes and edges declaratively.
Define a graph and Workflow
Build a StateGraph, then retrieve it inside your Workflow with the graph() helper:
from datetime import timedelta
from langgraph.graph import START, StateGraph
from temporalio import workflow
from temporalio.contrib.langgraph import graph
async def process_query(query: str) -> str:
"""Process a query and return a response."""
return f"Processed: {query}"
def build_graph() -> StateGraph:
"""Construct a single-node graph."""
g = StateGraph(str)
g.add_node(
"process_query",
process_query,
metadata={"start_to_close_timeout": timedelta(seconds=10)},
)
g.add_edge(START, "process_query")
return g
@workflow.defn
class HelloWorldWorkflow:
@workflow.run
async def run(self, query: str) -> str:
return await graph("hello-world").compile().ainvoke(query)
Configure the Worker
Create a LangGraphPlugin with your graphs and pass it to the Worker:
import asyncio
from temporalio.client import Client
from temporalio.contrib.langgraph import LangGraphPlugin
from temporalio.worker import Worker
async def main() -> None:
client = await Client.connect("localhost:7233")
plugin = LangGraphPlugin(graphs={"hello-world": build_graph()})
worker = Worker(
client,
task_queue="langgraph-hello-world",
workflows=[HelloWorldWorkflow],
plugins=[plugin],
)
await worker.run()
if __name__ == "__main__":
asyncio.run(main())
Set Activity options
Pass Activity options as node metadata when calling add_node:
from datetime import timedelta
from temporalio.common import RetryPolicy
g = StateGraph(str)
g.add_node(
"my_node",
my_node,
metadata={
"start_to_close_timeout": timedelta(seconds=30),
"retry_policy": RetryPolicy(maximum_attempts=3),
},
)
Functional API
The Functional API uses @entrypoint and @task decorators, which let you express agent loops with native Python
control flow (while, if/else, for).
Define tasks and a Workflow
from datetime import timedelta
from langgraph.func import entrypoint as lg_entrypoint
from langgraph.func import task
from temporalio import workflow
from temporalio.contrib.langgraph import entrypoint
@task
def agent_think(query: str, history: list[str]) -> dict:
"""Decide the next action based on query and tool history."""
tool_results = [h for h in history if h.startswith("[Tool]")]
if len(tool_results) < 2:
return {"action": "tool", "tool_name": "search", "tool_input": query}
return {"action": "final", "answer": f"Found: {'; '.join(tool_results)}"}
@task
def execute_tool(tool_name: str, tool_input: str) -> str:
"""Execute a tool by name."""
return f"[Tool] Result for {tool_name}({tool_input})"
@lg_entrypoint()
async def react_agent(query: str) -> dict:
"""ReAct agent loop: think -> act -> observe -> repeat."""
history: list[str] = []
while True:
decision = await agent_think(query, history)
if decision["action"] == "final":
return {"answer": decision["answer"], "steps": len(history)}
result = await execute_tool(decision["tool_name"], decision["tool_input"])
history.append(result)
all_tasks = [agent_think, execute_tool]
activity_options = {
t.func.__name__: {"start_to_close_timeout": timedelta(seconds=30)}
for t in all_tasks
}
@workflow.defn
class ReactAgentWorkflow:
@workflow.run
async def run(self, query: str) -> dict:
return await entrypoint("react-agent").ainvoke(query)
Configure the Worker with the Functional API
from temporalio.contrib.langgraph import LangGraphPlugin
plugin = LangGraphPlugin(
entrypoints={"react-agent": react_agent},
tasks=all_tasks,
activity_options=activity_options,
)
worker = Worker(
client,
task_queue="langgraph-react-agent",
workflows=[ReactAgentWorkflow],
plugins=[plugin],
)
Checkpointer
Use InMemorySaver as your checkpointer. Temporal handles durability, so third-party checkpointers (like PostgreSQL or
Redis) are not needed.
import langgraph.checkpoint.memory
g = graph("my-graph").compile(
checkpointer=langgraph.checkpoint.memory.InMemorySaver(),
)
Activity vs Workflow execution
By default, every graph node or @task runs as a Temporal Activity. You can override this by setting execute_in to
"workflow", which runs the node directly inside the Workflow.
Understanding when to use each mode is important for correctness and durability.
When to use an Activity (default)
Use an Activity — the default — when a node does any of the following:
- Makes network calls — LLM calls, HTTP requests, database queries, or any I/O. Activities can do I/O; Workflows cannot.
- Has non-deterministic behavior — anything that can return different results on re-execution (random numbers, current time, external data). Workflows must be deterministic.
- Is long-running or may fail — Activities get configurable timeouts, automatic retries, and heartbeating. If an LLM call times out or a service is unavailable, Temporal retries the Activity without re-running the entire Workflow.
- Calls
interrupt()— LangGraph'sinterrupt()is supported in Activity nodes. The plugin serializes the interrupt and propagates it back to the Workflow for human-in-the-loop patterns.
When to run in the Workflow
Use execute_in: "workflow" when a node:
- Orchestrates other graphs — a node that calls
graph("child").compile().ainvoke(state)to dispatch to a subgraph. The subgraph's own nodes still run as Activities, but the orchestration logic runs in the Workflow. - Performs pure state transformations — deterministic data reshaping, merging, or filtering with no I/O.
- Is a lightweight routing step — when a node's only job is to decide what happens next and you want to avoid the overhead of an Activity round-trip.
Workflow code must be deterministic. A node running in
the Workflow must not make network calls, use random, read the system clock, or do file I/O. Violating this causes
non-determinism errors on replay.
Decision tree: should this node be an Activity?
Does this node make network calls (LLM, HTTP, DB)?
├── Yes → Activity (default)
└── No
Does this node have non-deterministic behavior?
├── Yes → Activity (default)
└── No
Does this node call interrupt()?
├── Yes → Activity (default)
└── No
Is this node orchestrating a subgraph or doing a pure state transform?
├── Yes → execute_in: "workflow"
└── No → Activity (default, the safe choice)
When in doubt, use the default (Activity). The Activity overhead is small, and it gives you retries, timeouts, and correctness guarantees.
Where LangGraph primitives run
Not all LangGraph primitives are node functions. Some run in the Workflow context regardless of the execute_in setting:
| Primitive | Runs in | Notes |
|---|---|---|
| Node functions | Activity (default) or Workflow | Controlled by execute_in |
@task functions | Activity (default) or Workflow | Controlled by activity_options |
Conditional edge functions (add_conditional_edges) | Workflow | Always runs in the Workflow. Must be deterministic and async (sync functions trigger run_in_executor, which is not allowed in the Temporal sandbox). |
interrupt() | Activity | Call interrupt() inside Activity nodes. The plugin serializes the interrupt and propagates it to the Workflow. |
Command(resume=...) | Workflow | Used from Workflow code to resume after an interrupt. |
InMemorySaver checkpointer | Workflow | Runs in-process. Temporal handles durability — external checkpointers are not needed. |
Streaming (astream) | Workflow | The Workflow drives the stream; each node still executes as an Activity. |
Conditional edge functions like should_continue must be async def, not plain def. Synchronous functions cause
LangGraph to use run_in_executor, which is not supported inside Temporal's Workflow sandbox.
# ✅ Correct: async conditional edge function
async def should_continue(state: AgentState) -> str:
if state["messages"][-1].startswith("[Agent]") and "Calling" in state["messages"][-1]:
return "tools"
return END
g.add_conditional_edges("agent", should_continue)
Syntax
# Graph API
g.add_node("my_node", my_node, metadata={"execute_in": "workflow"})
# Functional API
plugin = LangGraphPlugin(
tasks=[my_task],
activity_options={"my_task": {"execute_in": "workflow"}},
)
Example: subgraph orchestration
A common pattern is a parent node that runs in the Workflow and dispatches to a child graph whose nodes run as Activities:
async def parent_node(state: State) -> dict[str, str]:
return await graph("child").compile().ainvoke(state)
parent = StateGraph(State)
parent.add_node("parent_node", parent_node, metadata={"execute_in": "workflow"})
parent.add_edge(START, "parent_node")
plugin = LangGraphPlugin(graphs={"parent": parent, "child": child})
Human-in-the-loop
LangGraph's interrupt() works with Temporal signals and queries to support human-in-the-loop patterns:
- A graph node calls
interrupt(draft), pausing execution. - The Workflow exposes the pending draft via a Temporal query.
- An external process (UI, CLI) queries the draft and sends approval via a Temporal signal.
- The graph resumes —
interrupt()returns the signal value and the node completes.
See the human-in-the-loop samples for complete working examples using both Graph and Functional APIs.
Continue-as-new with caching
For long-running workflows, use continue_as_new with the task result cache to avoid re-executing completed nodes:
- Call
cache()before continuing to capture completed task results. - Pass the cache to the next Workflow execution.
- On the next execution, pass the cache to
graph()to restore results.
See the continue-as-new samples for complete working examples.
Samples
The LangGraph plugin samples demonstrate all supported patterns across both APIs:
| Sample | Graph API | Functional API | Description |
|---|---|---|---|
| Hello World | Yes | Yes | Simplest possible single-node graph |
| Human-in-the-loop | Yes | Yes | interrupt() with Temporal signals and queries |
| Continue-as-new | Yes | Yes | Long-running workflows with task result caching |
| ReAct Agent | Yes | Yes | Tool-calling agent loop |
| Control Flow | — | Yes | Parallel execution, loops, and branching |
To run any sample:
# Terminal 1: Start Temporal
temporal server start-dev
# Terminal 2: Start the worker
uv run langgraph_plugin/<api>/<sample>/run_worker.py
# Terminal 3: Run the workflow
uv run langgraph_plugin/<api>/<sample>/run_workflow.py