
LangGraph integration

Temporal's integration with LangGraph runs your LangGraph nodes and tasks as Temporal Activities, giving your AI agent workflows durable execution, automatic retries, and timeouts.

The plugin supports both the LangGraph Graph API (StateGraph with nodes and edges) and the Functional API (@entrypoint / @task decorators). Each graph node or task executes as a Temporal Activity, so you get configurable timeouts and retry policies out of the box.

info

The Temporal Python SDK integration with LangGraph is currently at an experimental release stage. The API may change in future versions.

Code snippets in this guide are taken from the LangGraph plugin samples. Refer to the samples for the complete code.

Prerequisites

Install the plugin

Install the Temporal Python SDK with LangGraph support:

uv add "temporalio[langgraph]"

or with pip:

pip install "temporalio[langgraph]"

Graph API

The Graph API uses StateGraph to define nodes and edges declaratively.

Define a graph and Workflow

Build a StateGraph, then retrieve it inside your Workflow with the graph() helper:

from datetime import timedelta

from langgraph.graph import START, StateGraph
from temporalio import workflow
from temporalio.contrib.langgraph import graph


async def process_query(query: str) -> str:
    """Process a query and return a response."""
    return f"Processed: {query}"


def build_graph() -> StateGraph:
    """Construct a single-node graph."""
    g = StateGraph(str)
    g.add_node(
        "process_query",
        process_query,
        metadata={"start_to_close_timeout": timedelta(seconds=10)},
    )
    g.add_edge(START, "process_query")
    return g


@workflow.defn
class HelloWorldWorkflow:
    @workflow.run
    async def run(self, query: str) -> str:
        return await graph("hello-world").compile().ainvoke(query)

Configure the Worker

Create a LangGraphPlugin with your graphs and pass it to the Worker:

import asyncio

from temporalio.client import Client
from temporalio.contrib.langgraph import LangGraphPlugin
from temporalio.worker import Worker


async def main() -> None:
    client = await Client.connect("localhost:7233")
    plugin = LangGraphPlugin(graphs={"hello-world": build_graph()})

    worker = Worker(
        client,
        task_queue="langgraph-hello-world",
        workflows=[HelloWorldWorkflow],
        plugins=[plugin],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())

Set Activity options

Pass Activity options as node metadata when calling add_node:

from datetime import timedelta
from temporalio.common import RetryPolicy

g = StateGraph(str)
g.add_node(
    "my_node",
    my_node,
    metadata={
        "start_to_close_timeout": timedelta(seconds=30),
        "retry_policy": RetryPolicy(maximum_attempts=3),
    },
)
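
To get a feel for what a retry policy means in wall-clock terms, the sketch below computes the wait before each retry under an exponential backoff rule: start at initial_interval, multiply by backoff_coefficient after each failed attempt, and cap at maximum_interval. It uses only the standard library. retry_delays is a hypothetical helper, not an SDK call, and the default values (1 second, coefficient 2.0, cap at 100 times the initial interval) are assumptions that you should verify against your SDK version.

```python
from datetime import timedelta
from typing import List, Optional


def retry_delays(
    maximum_attempts: int,
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: Optional[timedelta] = None,
) -> List[timedelta]:
    """Return the wait before each retry (hypothetical helper, not an SDK call)."""
    # This sketch only models a bounded number of attempts.
    if maximum_attempts < 1:
        raise ValueError("maximum_attempts must be at least 1 in this sketch")
    # Assumed default cap: 100x the initial interval.
    cap = maximum_interval if maximum_interval is not None else initial_interval * 100
    delays = []
    # maximum_attempts counts the first attempt, so there are at most
    # maximum_attempts - 1 retries.
    for retry in range(maximum_attempts - 1):
        delay = initial_interval * (backoff_coefficient ** retry)
        delays.append(min(delay, cap))
    return delays


schedule = retry_delays(maximum_attempts=3)
print([d.total_seconds() for d in schedule])  # [1.0, 2.0]
```

Under these assumptions, a node configured with three attempts and a 30-second start_to_close_timeout can take roughly 3 × 30 seconds of execution plus 1 + 2 seconds of backoff before it fails for good, ignoring scheduling latency.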

Functional API

The Functional API uses @entrypoint and @task decorators, which let you express agent loops with native Python control flow (while, if/else, for).

Define tasks and a Workflow

from datetime import timedelta

from langgraph.func import entrypoint as lg_entrypoint
from langgraph.func import task
from temporalio import workflow
from temporalio.contrib.langgraph import entrypoint


@task
def agent_think(query: str, history: list[str]) -> dict:
    """Decide the next action based on query and tool history."""
    tool_results = [h for h in history if h.startswith("[Tool]")]
    if len(tool_results) < 2:
        return {"action": "tool", "tool_name": "search", "tool_input": query}
    return {"action": "final", "answer": f"Found: {'; '.join(tool_results)}"}


@task
def execute_tool(tool_name: str, tool_input: str) -> str:
    """Execute a tool by name."""
    return f"[Tool] Result for {tool_name}({tool_input})"


@lg_entrypoint()
async def react_agent(query: str) -> dict:
    """ReAct agent loop: think -> act -> observe -> repeat."""
    history: list[str] = []
    while True:
        decision = await agent_think(query, history)
        if decision["action"] == "final":
            return {"answer": decision["answer"], "steps": len(history)}
        result = await execute_tool(decision["tool_name"], decision["tool_input"])
        history.append(result)


all_tasks = [agent_think, execute_tool]

activity_options = {
    t.func.__name__: {"start_to_close_timeout": timedelta(seconds=30)}
    for t in all_tasks
}


@workflow.defn
class ReactAgentWorkflow:
    @workflow.run
    async def run(self, query: str) -> dict:
        return await entrypoint("react-agent").ainvoke(query)

Configure the Worker with the Functional API

from temporalio.contrib.langgraph import LangGraphPlugin
from temporalio.worker import Worker

plugin = LangGraphPlugin(
    entrypoints={"react-agent": react_agent},
    tasks=all_tasks,
    activity_options=activity_options,
)

# `client` is a connected temporalio.client.Client, created as in the Graph API Worker example.
worker = Worker(
    client,
    task_queue="langgraph-react-agent",
    workflows=[ReactAgentWorkflow],
    plugins=[plugin],
)
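
The activity_options mapping in the task definitions gives every task the same timeout. If some tasks need different settings, one possible approach is to merge a shared default with per-task overrides before handing the result to LangGraphPlugin. The sketch below is standard-library only and works on plain functions keyed by __name__ (the guide's own code uses t.func.__name__ because its tasks are wrapped by @task). build_activity_options is a hypothetical helper, not part of the plugin.

```python
from datetime import timedelta
from typing import Any, Callable, Dict, List, Optional


def build_activity_options(
    task_functions: List[Callable[..., Any]],
    default: Dict[str, Any],
    overrides: Optional[Dict[str, Dict[str, Any]]] = None,
) -> Dict[str, Dict[str, Any]]:
    """Merge default options with per-task overrides (hypothetical helper)."""
    overrides = overrides or {}
    names = [fn.__name__ for fn in task_functions]
    # Fail fast on typos: an override for a task that does not exist is an error.
    unknown = set(overrides) - set(names)
    if unknown:
        raise KeyError(f"overrides for unknown tasks: {sorted(unknown)}")
    # Later keys win, so an override replaces the matching default entry.
    return {name: {**default, **overrides.get(name, {})} for name in names}


# Plain stand-ins for the guide's tasks, used only to supply names.
def agent_think(query, history): ...
def execute_tool(tool_name, tool_input): ...


options = build_activity_options(
    [agent_think, execute_tool],
    default={"start_to_close_timeout": timedelta(seconds=30)},
    overrides={"execute_tool": {"start_to_close_timeout": timedelta(minutes=2)}},
)
print(options["execute_tool"]["start_to_close_timeout"])  # 0:02:00
```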

Checkpointer

Use InMemorySaver as your checkpointer. Temporal handles durability, so third-party checkpointers (like PostgreSQL or Redis) are not needed.

import langgraph.checkpoint.memory

g = graph("my-graph").compile(
    checkpointer=langgraph.checkpoint.memory.InMemorySaver(),
)

Activity vs Workflow execution

By default, every graph node or @task runs as a Temporal Activity. You can override this by setting execute_in to "workflow", which runs the node directly inside the Workflow.

Choosing the right mode matters for both correctness and durability: Workflow code must be deterministic, while Activities get retries and timeouts.

When to use an Activity (default)

Use an Activity — the default — when a node does any of the following:

  • Makes network calls — LLM calls, HTTP requests, database queries, or any I/O. Activities can do I/O; Workflows cannot.
  • Has non-deterministic behavior — anything that can return different results on re-execution (random numbers, current time, external data). Workflows must be deterministic.
  • Is long-running or may fail — Activities get configurable timeouts, automatic retries, and heartbeating. If an LLM call times out or a service is unavailable, Temporal retries the Activity without re-running the entire Workflow.
  • Calls interrupt() — LangGraph's interrupt() is supported in Activity nodes. The plugin serializes the interrupt and propagates it back to the Workflow for human-in-the-loop patterns.

When to run in the Workflow

Use execute_in: "workflow" when a node:

  • Orchestrates other graphs — a node that calls graph("child").compile().ainvoke(state) to dispatch to a subgraph. The subgraph's own nodes still run as Activities, but the orchestration logic runs in the Workflow.
  • Performs pure state transformations — deterministic data reshaping, merging, or filtering with no I/O.
  • Is a lightweight routing step — when a node's only job is to decide what happens next and you want to avoid the overhead of an Activity round-trip.
danger

Workflow code must be deterministic. A node running in the Workflow must not make network calls, use random, read the system clock, or do file I/O. Violating this causes non-determinism errors on replay.
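
As an illustration of a node that is safe to run in the Workflow, the sketch below shows a pure state transformation: it depends only on its input, performs no I/O, and returns the same result every time it is called. The State shape and the dedupe_sources name are hypothetical, and the function is declared async on the assumption that Workflow-side functions follow the same async convention as the guide's other Workflow-side examples.

```python
import asyncio
from typing import Dict, List, TypedDict


class State(TypedDict):
    """Hypothetical graph state used only for this sketch."""
    messages: List[str]
    sources: List[str]


async def dedupe_sources(state: State) -> Dict[str, List[str]]:
    """Drop duplicate sources while keeping first-seen order."""
    # dict.fromkeys preserves insertion order, so the output is deterministic.
    # list(set(...)) would NOT be safe here: string hash randomization can
    # change the order from one process to the next, which breaks replay.
    return {"sources": list(dict.fromkeys(state["sources"]))}


# Registration would follow the metadata syntax described in this guide, e.g.:
# g.add_node("dedupe_sources", dedupe_sources, metadata={"execute_in": "workflow"})

state: State = {"messages": [], "sources": ["a", "b", "a", "c", "b"]}
print(asyncio.run(dedupe_sources(state)))  # {'sources': ['a', 'b', 'c']}
```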

Decision tree: should this node be an Activity?

Does this node make network calls (LLM, HTTP, DB)?
├── Yes → Activity (default)
└── No
    Does this node have non-deterministic behavior?
    ├── Yes → Activity (default)
    └── No
        Does this node call interrupt()?
        ├── Yes → Activity (default)
        └── No
            Is this node orchestrating a subgraph or doing a pure state transform?
            ├── Yes → execute_in: "workflow"
            └── No → Activity (default, the safe choice)

When in doubt, use the default (Activity). The Activity overhead is small, and it gives you retries, timeouts, and correctness guarantees.
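
The decision tree can also be written down as a small function, which is handy if you want to review node placement in code. This is a sketch only: node_metadata is a hypothetical helper that returns the metadata dict to pass to add_node, using an empty dict for the default (Activity) and the execute_in key from this guide for the Workflow case.

```python
from typing import Dict


def node_metadata(
    *,
    makes_network_calls: bool,
    is_non_deterministic: bool,
    calls_interrupt: bool,
    is_orchestration_or_pure_transform: bool,
) -> Dict[str, str]:
    """Return add_node metadata following the decision tree (hypothetical helper)."""
    # Any one of the first three answers forces the default Activity mode.
    if makes_network_calls or is_non_deterministic or calls_interrupt:
        return {}
    # Only orchestration and pure transforms qualify for in-Workflow execution.
    if is_orchestration_or_pure_transform:
        return {"execute_in": "workflow"}
    # When in doubt, stay with the default.
    return {}


print(node_metadata(
    makes_network_calls=False,
    is_non_deterministic=False,
    calls_interrupt=False,
    is_orchestration_or_pure_transform=True,
))  # {'execute_in': 'workflow'}
```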

Where LangGraph primitives run

Not all LangGraph primitives are node functions. Some run in the Workflow context regardless of the execute_in setting:

| Primitive | Runs in | Notes |
|---|---|---|
| Node functions | Activity (default) or Workflow | Controlled by execute_in |
| @task functions | Activity (default) or Workflow | Controlled by activity_options |
| Conditional edge functions (add_conditional_edges) | Workflow | Always runs in the Workflow. Must be deterministic and async (sync functions trigger run_in_executor, which is not allowed in the Temporal sandbox). |
| interrupt() | Activity | Call interrupt() inside Activity nodes. The plugin serializes the interrupt and propagates it to the Workflow. |
| Command(resume=...) | Workflow | Used from Workflow code to resume after an interrupt. |
| InMemorySaver checkpointer | Workflow | Runs in-process. Temporal handles durability, so external checkpointers are not needed. |
| Streaming (astream) | Workflow | The Workflow drives the stream; each node still executes as an Activity. |
tip

Conditional edge functions like should_continue must be async def, not plain def. Synchronous functions cause LangGraph to use run_in_executor, which is not supported inside Temporal's Workflow sandbox.

# ✅ Correct: async conditional edge function
async def should_continue(state: AgentState) -> str:
    if state["messages"][-1].startswith("[Agent]") and "Calling" in state["messages"][-1]:
        return "tools"
    return END

g.add_conditional_edges("agent", should_continue)
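
Because a plain def router is easy to add by accident, you may want a guard that rejects synchronous functions before they reach add_conditional_edges. The sketch below uses the standard library's inspect.iscoroutinefunction; require_async is a hypothetical helper, not part of LangGraph or the plugin.

```python
import inspect
from typing import Any, Callable


def require_async(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Return fn unchanged if it is an async def function, else raise TypeError."""
    if not inspect.iscoroutinefunction(fn):
        name = getattr(fn, "__name__", repr(fn))
        raise TypeError(f"{name} must be declared with 'async def'")
    return fn


async def async_router(state: dict) -> str:
    return "tools"


def sync_router(state: dict) -> str:
    return "tools"


require_async(async_router)  # passes and returns the function
try:
    require_async(sync_router)
except TypeError as err:
    print(err)  # sync_router must be declared with 'async def'

# Intended use (assumes a graph g as in the example above):
# g.add_conditional_edges("agent", require_async(should_continue))
```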

Syntax

# Graph API
g.add_node("my_node", my_node, metadata={"execute_in": "workflow"})

# Functional API
plugin = LangGraphPlugin(
    tasks=[my_task],
    activity_options={"my_task": {"execute_in": "workflow"}},
)

Example: subgraph orchestration

A common pattern is a parent node that runs in the Workflow and dispatches to a child graph whose nodes run as Activities:

async def parent_node(state: State) -> dict[str, str]:
    return await graph("child").compile().ainvoke(state)

parent = StateGraph(State)
parent.add_node("parent_node", parent_node, metadata={"execute_in": "workflow"})
parent.add_edge(START, "parent_node")

plugin = LangGraphPlugin(graphs={"parent": parent, "child": child})

Human-in-the-loop

LangGraph's interrupt() works with Temporal signals and queries to support human-in-the-loop patterns:

  1. A graph node calls interrupt(draft), pausing execution.
  2. The Workflow exposes the pending draft via a Temporal query.
  3. An external process (UI, CLI) queries the draft and sends approval via a Temporal signal.
  4. The graph resumes — interrupt() returns the signal value and the node completes.

See the human-in-the-loop samples for complete working examples using both Graph and Functional APIs.
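
The four steps above can be modeled with plain asyncio to show the shape of the handshake. This is a toy model, not Temporal or plugin code: ApprovalWorkflowModel, get_pending_draft, and submit_decision are hypothetical names that stand in for a Workflow, its query handler, and its signal handler. In a real Workflow these roles are played by handlers declared with @workflow.query and @workflow.signal, and the samples show how interrupt() ties into them.

```python
import asyncio
from typing import Optional


class ApprovalWorkflowModel:
    """Toy stand-in for a Workflow; not Temporal code."""

    def __init__(self) -> None:
        self._pending_draft: Optional[str] = None
        self._decision: Optional[str] = None
        self._decided = asyncio.Event()

    def get_pending_draft(self) -> Optional[str]:
        """Plays the role of a query: read-only view of the paused state."""
        return self._pending_draft

    def submit_decision(self, decision: str) -> None:
        """Plays the role of a signal: delivers the human's answer."""
        self._decision = decision
        self._decided.set()

    async def run(self, draft: str) -> str:
        self._pending_draft = draft   # step 1: the node pauses with a draft
        await self._decided.wait()    # steps 2-3 happen while we wait here
        self._pending_draft = None    # step 4: resume with the signal value
        return f"{self._decision}: {draft}"


async def main() -> str:
    wf = ApprovalWorkflowModel()
    run_task = asyncio.create_task(wf.run("Draft email to customer"))
    await asyncio.sleep(0)  # yield once so run() can publish the draft
    assert wf.get_pending_draft() == "Draft email to customer"
    wf.submit_decision("approved")
    return await run_task


print(asyncio.run(main()))  # approved: Draft email to customer
```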

Continue-as-new with caching

For long-running Workflows, use continue_as_new together with the task result cache so that the next execution does not re-run nodes that have already completed:

  1. Call cache() before continuing to capture completed task results.
  2. Pass the cache to the next Workflow execution.
  3. On the next execution, pass the cache to graph() to restore results.

See the continue-as-new samples for complete working examples.
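
The idea behind the cache can be modeled without Temporal. In the toy sketch below, each call to run_once stands for one Workflow execution: it restores earlier results from the cache, runs only the steps that are missing, and hands the cache on when its budget is used up. Every name here (run_once, make_step, max_new_steps) is hypothetical; the plugin's actual cache() and graph() signatures are not shown in this guide, so refer to the samples for real usage.

```python
from typing import Any, Callable, Dict, List, Tuple

Step = Tuple[str, Callable[[], Any]]


def run_once(
    steps: List[Step], cache: Dict[str, Any], max_new_steps: int
) -> Tuple[Dict[str, Any], bool]:
    """Model one execution: return (updated cache, whether all steps finished)."""
    results = dict(cache)  # restore results carried over from the previous run
    executed = 0
    for name, fn in steps:
        if name in results:
            continue  # completed in an earlier run, so skip it
        if executed == max_new_steps:
            return results, False  # budget used up: continue in a new run
        results[name] = fn()
        executed += 1
    return results, True


calls: List[str] = []


def make_step(name: str) -> Callable[[], str]:
    def step() -> str:
        calls.append(name)  # record every real execution
        return name.upper()
    return step


steps = [(n, make_step(n)) for n in ("fetch", "summarize", "publish")]

cache: Dict[str, Any] = {}
finished = False
runs = 0
while not finished:
    # Each loop iteration models continuing as new with the cache as input.
    cache, finished = run_once(steps, cache, max_new_steps=2)
    runs += 1

print(runs, calls)  # 2 ['fetch', 'summarize', 'publish']
```

Each step runs exactly once across the two modeled executions, which is the property the cache is meant to provide.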

Samples

The LangGraph plugin samples demonstrate all supported patterns across both APIs:

| Sample | Graph API | Functional API | Description |
|---|---|---|---|
| Hello World | Yes | Yes | Simplest possible single-node graph |
| Human-in-the-loop | Yes | Yes | interrupt() with Temporal signals and queries |
| Continue-as-new | Yes | Yes | Long-running workflows with task result caching |
| ReAct Agent | Yes | Yes | Tool-calling agent loop |
| Control Flow | | Yes | Parallel execution, loops, and branching |

To run any sample:

# Terminal 1: Start Temporal
temporal server start-dev

# Terminal 2: Start the worker
uv run langgraph_plugin/<api>/<sample>/run_worker.py

# Terminal 3: Run the workflow
uv run langgraph_plugin/<api>/<sample>/run_workflow.py