跳转至

如何使用 Graph API

本指南演示了 LangGraph 的 Graph API 基础用法。它将带你了解状态,以及如何组合常见的图结构,例如顺序分支循环。它还涵盖了 LangGraph 的控制功能,包括用于 Map-Reduce 工作流的 Send API 以及将状态更新与跨节点“跳转”相结合的 Command API

设置

安装 langgraph

pip install -U langgraph

设置 LangSmith 以获得更好的调试体验

注册使用 LangSmith,快速发现问题并提升你的 LangGraph 项目的性能。LangSmith 允许你利用追踪数据来调试、测试和监控基于 LangGraph 构建的 LLM 应用——如何入门请参见文档

定义并更新状态

这里展示如何在 LangGraph 中定义和更新状态。我们将演示:

  1. 如何使用状态来定义图的模式(schema)
  2. 如何使用归约器(reducers)来控制状态更新的处理方式。

定义状态

LangGraph 中的状态可以是 TypedDictPydantic 模型,或 dataclass。下面我们将使用 TypedDict。关于使用 Pydantic 的细节,请参见此部分

默认情况下,图会有相同的输入与输出模式,并且由状态决定此模式。关于如何定义不同的输入与输出模式,请参见此部分

我们来考虑一个使用消息的简单例子。这种状态表示对于许多 LLM 应用都很通用。更多细节见我们的概念页面

API Reference: AnyMessage

from langchain_core.messages import AnyMessage
from typing_extensions import TypedDict

class State(TypedDict):
    messages: list[AnyMessage]
    extra_field: int

该状态跟踪一组消息对象,以及一个额外的整数字段。

更新状态

我们来构建一个只有单个节点的示例图。我们的节点只是一个 Python 函数,它读取图的状态并对其进行更新。此函数的第一个参数始终是状态:

API Reference: AIMessage

from langchain_core.messages import AIMessage

def node(state: State):
    messages = state["messages"]
    new_message = AIMessage("Hello!")
    return {"messages": messages + [new_message], "extra_field": 10}

该节点简单地在消息列表中追加一条消息,并填充一个额外字段。

Important

节点应直接返回对状态的更新,而不是就地修改状态。

接下来我们定义一个包含该节点的简单图。我们使用 StateGraph 来定义一个在此状态上运行的图。随后使用 add_node 来填充图。

API Reference: StateGraph

from langgraph.graph import StateGraph

builder = StateGraph(State)
builder.add_node(node)
builder.set_entry_point("node")
graph = builder.compile()

LangGraph 提供了可视化图形的内置工具。让我们检查一下我们的图。关于可视化的更多细节见此部分

from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

仅含单节点的简单图

在这个例子中,我们的图只执行单个节点。让我们继续进行一次简单的调用:

API Reference: HumanMessage

from langchain_core.messages import HumanMessage

result = graph.invoke({"messages": [HumanMessage("Hi")]})
result
{'messages': [HumanMessage(content='Hi'), AIMessage(content='Hello!')], 'extra_field': 10}

注意:

  • 我们通过更新状态中的单个键来启动了调用。
  • 调用结果中我们收到了整个状态。

为了方便,我们经常通过 pretty-print 来检查消息对象的内容:

for message in result["messages"]:
    message.pretty_print()
================================ Human Message ================================

Hi
================================== Ai Message ==================================

Hello!

使用 reducer 处理状态更新

状态中的每个键都可以拥有独立的归约器(reducer)函数,用来控制节点返回的更新如何应用。如果未显式指定 reducer 函数,则默认所有对该键的更新都会覆盖其当前值。

对于 TypedDict 状态模式,我们可以通过为相应的状态字段添加注解来定义 reducer 函数。

在前面的例子中,我们的节点通过向其追加消息的方式更新了状态中的 "messages" 键。下面,我们为该键添加一个 reducer,使更新可以自动追加:

from typing_extensions import Annotated

def add(left, right):
    """Can also import `add` from the `operator` built-in."""
    return left + right

class State(TypedDict):
    messages: Annotated[list[AnyMessage], add]
    extra_field: int

现在我们的节点可以简化为:

def node(state: State):
    new_message = AIMessage("Hello!")
    return {"messages": [new_message], "extra_field": 10}

API Reference: START

from langgraph.graph import START

graph = StateGraph(State).add_node(node).add_edge(START, "node").compile()

result = graph.invoke({"messages": [HumanMessage("Hi")]})

for message in result["messages"]:
    message.pretty_print()
================================ Human Message ================================

Hi
================================== Ai Message ==================================

Hello!

MessagesState

在实践中,更新消息列表还有其他考虑:

  • 我们可能希望更新状态中已有的一条消息。
  • 我们可能希望接受消息格式的简写,例如 OpenAI 格式

LangGraph 包含内置的 reducer add_messages,可处理这些考虑点:

API Reference: add_messages

from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    extra_field: int

def node(state: State):
    new_message = AIMessage("Hello!")
    return {"messages": [new_message], "extra_field": 10}

graph = StateGraph(State).add_node(node).set_entry_point("node").compile()
input_message = {"role": "user", "content": "Hi"}

result = graph.invoke({"messages": [input_message]})

for message in result["messages"]:
    message.pretty_print()
================================ Human Message ================================

Hi
================================== Ai Message ==================================

Hello!

这是一种适用于涉及聊天模型的应用的通用状态表示。LangGraph 为方便起见包含了预构建的 MessagesState,因此我们可以这样写:

from langgraph.graph import MessagesState

class State(MessagesState):
    extra_field: int

定义输入与输出模式

默认情况下,StateGraph 使用单一模式,所有节点都应使用该模式进行通信。不过,也可以为图定义不同的输入与输出模式。

当指定了不同的模式后,仍将使用内部模式在节点之间通信。输入模式确保所提供的输入符合预期结构,而输出模式会将内部数据过滤,只返回与定义的输出模式相匹配的相关信息。

下面我们看看如何定义不同的输入与输出模式。

API Reference: StateGraph | START | END

from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

# Define the schema for the input
class InputState(TypedDict):
    question: str

# Define the schema for the output
class OutputState(TypedDict):
    answer: str

# Define the overall schema, combining both input and output
class OverallState(InputState, OutputState):
    pass

# Define the node that processes the input and generates an answer
def answer_node(state: InputState):
    # Example answer and an extra key
    return {"answer": "bye", "question": state["question"]}

# Build the graph with input and output schemas specified
builder = StateGraph(OverallState, input_schema=InputState, output_schema=OutputState)
builder.add_node(answer_node)  # Add the answer node
builder.add_edge(START, "answer_node")  # Define the starting edge
builder.add_edge("answer_node", END)  # Define the ending edge
graph = builder.compile()  # Compile the graph

# Invoke the graph with an input and print the result
print(graph.invoke({"question": "hi"}))
{'answer': 'bye'}

请注意,invoke 的输出只包含输出模式定义的字段。

在节点之间传递私有状态

某些情况下,你可能希望节点之间交换一些对中间逻辑至关重要但不需要成为图主模式一部分的信息。这些私有数据与图的整体输入/输出无关,只应在特定节点之间共享。

下面,我们将创建一个由三个节点(node_1、node_2 和 node_3)组成的顺序图,其中私有数据在前两步(node_1 和 node_2)之间传递,而第三步(node_3)仅能访问公共的整体状态。

API Reference: StateGraph | START | END

from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

# The overall state of the graph (this is the public state shared across nodes)
class OverallState(TypedDict):
    a: str

# Output from node_1 contains private data that is not part of the overall state
class Node1Output(TypedDict):
    private_data: str

# The private data is only shared between node_1 and node_2
def node_1(state: OverallState) -> Node1Output:
    output = {"private_data": "set by node_1"}
    print(f"Entered node `node_1`:\n\tInput: {state}.\n\tReturned: {output}")
    return output

# Node 2 input only requests the private data available after node_1
class Node2Input(TypedDict):
    private_data: str

def node_2(state: Node2Input) -> OverallState:
    output = {"a": "set by node_2"}
    print(f"Entered node `node_2`:\n\tInput: {state}.\n\tReturned: {output}")
    return output

# Node 3 only has access to the overall state (no access to private data from node_1)
def node_3(state: OverallState) -> OverallState:
    output = {"a": "set by node_3"}
    print(f"Entered node `node_3`:\n\tInput: {state}.\n\tReturned: {output}")
    return output

# Connect nodes in a sequence
# node_2 accepts private data from node_1, whereas
# node_3 does not see the private data.
builder = StateGraph(OverallState).add_sequence([node_1, node_2, node_3])
builder.add_edge(START, "node_1")
graph = builder.compile()

# Invoke the graph with the initial state
response = graph.invoke(
    {
        "a": "set at start",
    }
)

print()
print(f"Output of graph invocation: {response}")
Entered node `node_1`:
    Input: {'a': 'set at start'}.
    Returned: {'private_data': 'set by node_1'}
Entered node `node_2`:
    Input: {'private_data': 'set by node_1'}.
    Returned: {'a': 'set by node_2'}
Entered node `node_3`:
    Input: {'a': 'set by node_2'}.
    Returned: {'a': 'set by node_3'}

Output of graph invocation: {'a': 'set by node_3'}

将 Pydantic 模型用作图状态

StateGraph 在初始化时接受 state_schema 参数,用于指定图中节点可访问和更新的状态“形状”。

在我们的示例中,我们通常使用 Python 原生的 TypedDictdataclass 作为 state_schema,但 state_schema 可以是任意类型

这里我们将看到如何使用 Pydantic BaseModel 作为 state_schema,以对输入进行运行时校验。

已知限制

  • 目前,图的输出并不会是 pydantic 模型实例。
  • 运行时校验仅发生在进入节点的输入上,而不会校验节点输出。
  • 来自 pydantic 的校验错误追踪不会显示错误发生在哪个节点。
  • Pydantic 的递归校验可能较慢。对性能敏感的应用,你可能更想使用 dataclass

API Reference: StateGraph | START | END

from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from pydantic import BaseModel

# The overall state of the graph (this is the public state shared across nodes)
class OverallState(BaseModel):
    a: str

def node(state: OverallState):
    return {"a": "goodbye"}

# Build the state graph
builder = StateGraph(OverallState)
builder.add_node(node)  # node_1 is the first node
builder.add_edge(START, "node")  # Start the graph with node_1
builder.add_edge("node", END)  # End the graph after node_1
graph = builder.compile()

# Test the graph with a valid input
graph.invoke({"a": "hello"})

使用一个“无效”的输入调用图

try:
    graph.invoke({"a": 123})  # Should be a string
except Exception as e:
    print("An exception was raised because `a` is an integer rather than a string.")
    print(e)
An exception was raised because `a` is an integer rather than a string.
1 validation error for OverallState
a
  Input should be a valid string [type=string_type, input_value=123, input_type=int]
    For further information visit https://errors.pydantic.dev/2.9/v/string_type

关于 Pydantic 模型状态的更多特性见下方:

序列化行为

当使用 Pydantic 模型作为状态模式时,理解序列化行为很重要,尤其是以下场景: - 传递 Pydantic 对象作为输入 - 从图中接收输出 - 使用嵌套的 Pydantic 模型

我们来看下这些行为。

from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel

class NestedModel(BaseModel):
    value: str

class ComplexState(BaseModel):
    text: str
    count: int
    nested: NestedModel

def process_node(state: ComplexState):
    # Node receives a validated Pydantic object
    print(f"Input state type: {type(state)}")
    print(f"Nested type: {type(state.nested)}")
    # Return a dictionary update
    return {"text": state.text + " processed", "count": state.count + 1}

# Build the graph
builder = StateGraph(ComplexState)
builder.add_node("process", process_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)
graph = builder.compile()

# Create a Pydantic instance for input
input_state = ComplexState(text="hello", count=0, nested=NestedModel(value="test"))
print(f"Input object type: {type(input_state)}")

# Invoke graph with a Pydantic instance
result = graph.invoke(input_state)
print(f"Output type: {type(result)}")
print(f"Output content: {result}")

# Convert back to Pydantic model if needed
output_model = ComplexState(**result)
print(f"Converted back to Pydantic: {type(output_model)}")
运行时类型强制转换

Pydantic 会对某些数据类型进行运行时类型强制转换。这很有帮助,但如果不了解也可能带来意外行为。

from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel

class CoercionExample(BaseModel):
    # Pydantic will coerce string numbers to integers
    number: int
    # Pydantic will parse string booleans to bool
    flag: bool

def inspect_node(state: CoercionExample):
    print(f"number: {state.number} (type: {type(state.number)})")
    print(f"flag: {state.flag} (type: {type(state.flag)})")
    return {}

builder = StateGraph(CoercionExample)
builder.add_node("inspect", inspect_node)
builder.add_edge(START, "inspect")
builder.add_edge("inspect", END)
graph = builder.compile()

# Demonstrate coercion with string inputs that will be converted
result = graph.invoke({"number": "42", "flag": "true"})

# This would fail with a validation error
try:
    graph.invoke({"number": "not-a-number", "flag": "true"})
except Exception as e:
    print(f"\nExpected validation error: {e}")
与消息模型协作

当在状态模式中使用 LangChain 的消息类型时,有一些与序列化相关的重要注意事项。你应该使用 AnyMessage(而不是 BaseMessage),以便在使用消息对象进行跨进程传输时实现正确的序列化/反序列化。

from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel
from langchain_core.messages import HumanMessage, AIMessage, AnyMessage
from typing import List

class ChatState(BaseModel):
    messages: List[AnyMessage]
    context: str

def add_message(state: ChatState):
    return {"messages": state.messages + [AIMessage(content="Hello there!")]}

builder = StateGraph(ChatState)
builder.add_node("add_message", add_message)
builder.add_edge(START, "add_message")
builder.add_edge("add_message", END)
graph = builder.compile()

# Create input with a message
initial_state = ChatState(
    messages=[HumanMessage(content="Hi")], context="Customer support chat"
)

result = graph.invoke(initial_state)
print(f"Output: {result}")

# Convert back to Pydantic model to see message types
output_model = ChatState(**result)
for i, msg in enumerate(output_model.messages):
    print(f"Message {i}: {type(msg).__name__} - {msg.content}")

添加运行时配置

有时你希望在调用图时对其进行配置。例如,你可能希望在运行时指定使用哪个 LLM 或系统提示,而不把这些参数“污染”进图的状态中。

添加运行时配置的步骤:

  1. 为你的配置指定一个模式
  2. 将配置添加到节点或条件边的函数签名中
  3. 在运行时将配置传入图

下面是一个简单示例:

API Reference: END | StateGraph | START

from langgraph.graph import END, StateGraph, START
from langgraph.runtime import Runtime
from typing_extensions import TypedDict

# 1. Specify config schema
class ContextSchema(TypedDict):
    my_runtime_value: str

# 2. Define a graph that accesses the config in a node
class State(TypedDict):
    my_state_value: str

def node(state: State, runtime: Runtime[ContextSchema]):
    if runtime.context["my_runtime_value"] == "a":
        return {"my_state_value": 1}
    elif runtime.context["my_runtime_value"] == "b":
        return {"my_state_value": 2}
    else:
        raise ValueError("Unknown values.")

builder = StateGraph(State, context_schema=ContextSchema)
builder.add_node(node)
builder.add_edge(START, "node")
builder.add_edge("node", END)

graph = builder.compile()

# 3. Pass in configuration at runtime:
print(graph.invoke({}, context={"my_runtime_value": "a"}))
print(graph.invoke({}, context={"my_runtime_value": "b"}))
{'my_state_value': 1}
{'my_state_value': 2}
扩展示例:在运行时指定 LLM

下面演示在运行时配置使用哪个 LLM 的实际例子。我们将同时使用 OpenAI 和 Anthropic 模型。

from dataclasses import dataclass

from langchain.chat_models import init_chat_model
from langgraph.graph import MessagesState, END, StateGraph, START
from langgraph.runtime import Runtime
from typing_extensions import TypedDict

@dataclass
class ContextSchema:
    model_provider: str = "anthropic"

MODELS = {
    "anthropic": init_chat_model("anthropic:claude-3-5-haiku-latest"),
    "openai": init_chat_model("openai:gpt-4.1-mini"),
}

def call_model(state: MessagesState, runtime: Runtime[ContextSchema]):
    model = MODELS[runtime.context.model_provider]
    response = model.invoke(state["messages"])
    return {"messages": [response]}

builder = StateGraph(MessagesState, context_schema=ContextSchema)
builder.add_node("model", call_model)
builder.add_edge(START, "model")
builder.add_edge("model", END)

graph = builder.compile()

# Usage
input_message = {"role": "user", "content": "hi"}
# With no configuration, uses default (Anthropic)
response_1 = graph.invoke({"messages": [input_message]})["messages"][-1]
# Or, can set OpenAI
response_2 = graph.invoke({"messages": [input_message]}, context={"model_provider": "openai"})["messages"][-1]

print(response_1.response_metadata["model_name"])
print(response_2.response_metadata["model_name"])
claude-3-5-haiku-20241022
gpt-4.1-mini-2025-04-14

扩展示例:在运行时指定模型与系统消息

下面演示在运行时配置两个参数的实际例子:所用 LLM 和系统消息。

from dataclasses import dataclass
from typing import Optional
from langchain.chat_models import init_chat_model
from langchain_core.messages import SystemMessage
from langgraph.graph import END, MessagesState, StateGraph, START
from langgraph.runtime import Runtime
from typing_extensions import TypedDict

@dataclass
class ContextSchema:
    model_provider: str = "anthropic"
    system_message: str | None = None

MODELS = {
    "anthropic": init_chat_model("anthropic:claude-3-5-haiku-latest"),
    "openai": init_chat_model("openai:gpt-4.1-mini"),
}

def call_model(state: MessagesState, runtime: Runtime[ContextSchema]):
    model = MODELS[runtime.context.model_provider]
    messages = state["messages"]
    if (system_message := runtime.context.system_message):
        messages = [SystemMessage(system_message)] + messages
    response = model.invoke(messages)
    return {"messages": [response]}

builder = StateGraph(MessagesState, context_schema=ContextSchema)
builder.add_node("model", call_model)
builder.add_edge(START, "model")
builder.add_edge("model", END)

graph = builder.compile()

# Usage
input_message = {"role": "user", "content": "hi"}
response = graph.invoke({"messages": [input_message]}, context={"model_provider": "openai", "system_message": "Respond in Italian."})
for message in response["messages"]:
    message.pretty_print()
================================ Human Message ================================

hi
================================== Ai Message ==================================

Ciao! Come posso aiutarti oggi?

添加重试策略

在很多用例中,你可能希望为节点配置自定义重试策略,例如调用 API、查询数据库、调用 LLM 等。LangGraph 允许你为图中的节点添加重试策略。

要配置重试策略,将 retry_policy 参数传给 add_noderetry_policy 参数接受一个 RetryPolicy named tuple 对象。下面我们以默认参数实例化一个 RetryPolicy 对象并将其与节点关联:

from langgraph.pregel import RetryPolicy

builder.add_node(
    "node_name",
    node_function,
    retry_policy=RetryPolicy(),
)

默认情况下,retry_on 参数使用 default_retry_on 函数,它会对除以下异常外的任意异常进行重试:

  • ValueError
  • TypeError
  • ArithmeticError
  • ImportError
  • LookupError
  • NameError
  • SyntaxError
  • RuntimeError
  • ReferenceError
  • StopIteration
  • StopAsyncIteration
  • OSError

另外,对于来自常见 HTTP 请求库(如 requestshttpx)的异常,只有在状态码为 5xx 时才会重试。

扩展示例:自定义重试策略

考虑一个从 SQL 数据库读取的例子。下面我们为两个节点传入了不同的重试策略:

import sqlite3
from typing_extensions import TypedDict
from langchain.chat_models import init_chat_model
from langgraph.graph import END, MessagesState, StateGraph, START
from langgraph.pregel import RetryPolicy
from langchain_community.utilities import SQLDatabase
from langchain_core.messages import AIMessage

db = SQLDatabase.from_uri("sqlite:///:memory:")
model = init_chat_model("anthropic:claude-3-5-haiku-latest")

def query_database(state: MessagesState):
    query_result = db.run("SELECT * FROM Artist LIMIT 10;")
    return {"messages": [AIMessage(content=query_result)]}

def call_model(state: MessagesState):
    response = model.invoke(state["messages"])
    return {"messages": [response]}

# Define a new graph
builder = StateGraph(MessagesState)
builder.add_node(
    "query_database",
    query_database,
    retry_policy=RetryPolicy(retry_on=sqlite3.OperationalError),
)
builder.add_node("model", call_model, retry_policy=RetryPolicy(max_attempts=5))
builder.add_edge(START, "model")
builder.add_edge("model", "query_database")
builder.add_edge("query_database", END)
graph = builder.compile()

添加节点缓存

当你希望避免重复操作时(例如执行耗时或昂贵的操作),节点缓存就很有用。LangGraph 允许你为图中的节点添加个性化的缓存策略。

要配置缓存策略,将 cache_policy 参数传给 add_node 函数。下面的示例中,我们实例化了一个 CachePolicy 对象,设置了 120 秒的 TTL,并使用默认的 key_func 生成器,然后将其关联到一个节点:

from langgraph.types import CachePolicy

builder.add_node(
    "node_name",
    node_function,
    cache_policy=CachePolicy(ttl=120),
)

然后,为图启用节点级缓存时,在编译图时设置 cache 参数。下面的例子使用 InMemoryCache 来设置内存缓存,也可以使用 SqliteCache

from langgraph.cache.memory import InMemoryCache

graph = builder.compile(cache=InMemoryCache())

构建一系列步骤

前置知识

本指南默认你已经熟悉上面关于状态的章节。

这里我们展示如何构建一个简单的步骤序列。我们将展示:

  1. 如何构建一个顺序图
  2. 构建类似图结构的内置简写

要添加一系列节点,我们使用图的 .add_node.add_edge 方法(参见):

API Reference: START | StateGraph

from langgraph.graph import START, StateGraph

builder = StateGraph(State)

# Add nodes
builder.add_node(step_1)
builder.add_node(step_2)
builder.add_node(step_3)

# Add edges
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")

我们也可以使用内置简写 .add_sequence

builder = StateGraph(State).add_sequence([step_1, step_2, step_3])
builder.add_edge(START, "step_1")
为什么要用 LangGraph 将应用步骤拆成顺序执行?

LangGraph 让你的应用很容易加入底层的持久化层。 这允许状态在节点执行之间被检查点保存,因此你的 LangGraph 节点会决定:

- 状态更新如何被[检查点](../concepts/persistence.md)
- 在[人类参与](../concepts/human_in_the_loop.md)的工作流中如何恢复中断
- 如何使用 LangGraph 的[时间旅行](../concepts/time-travel.md)功能进行“回滚”和分支

它们还决定了执行步骤如何被[流式](../concepts/streaming.md)传输,以及如何使用 [LangGraph Studio](../concepts/langgraph_studio.md) 对你的应用进行可视化和调试。

让我们演示一个端到端的例子。我们将创建三步顺序:

  1. 在状态的某个键上填充值
  2. 更新该值
  3. 填充另一个值

首先定义我们的状态。它决定了图的模式(schema),并且还能指定如何应用更新。更多细节见此部分

我们的例子中将跟踪两个值:

from typing_extensions import TypedDict

class State(TypedDict):
    value_1: str
    value_2: int

我们的节点只是一些读取图状态并进行更新的 Python 函数。此函数的第一个参数始终是状态:

def step_1(state: State):
    return {"value_1": "a"}

def step_2(state: State):
    current_value_1 = state["value_1"]
    return {"value_1": f"{current_value_1} b"}

def step_3(state: State):
    return {"value_2": 10}

python 然后我们使用 add_nodeadd_edge 来填充图并定义其控制流。

API Reference: START | StateGraph

from langgraph.graph import START, StateGraph

builder = StateGraph(State)

# Add nodes
builder.add_node(step_1)
builder.add_node(step_2)
builder.add_node(step_3)

# Add edges
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
:::

自定义节点名称

你可以在 .add_node 中为节点指定自定义名称:

builder.add_node("my_node", step_1)

注意:

  • .add_edge 接受节点名称,对于函数默认是 node.__name__
  • 我们必须指定图的入口点。为此我们使用 START 节点 添加边。
  • 当没有更多节点可执行时,图会停止。

接着我们编译图。这将对图结构进行一些基本检查(例如识别孤立节点)。如果我们通过检查点器为应用添加持久化,也会在这里传入。

graph = builder.compile()

LangGraph 提供了图可视化的内置工具。让我们检查一下这个顺序。关于可视化的更多细节见本指南

from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

多步骤顺序图

让我们进行一次简单调用:

graph.invoke({"value_1": "c"})
{'value_1': 'a b', 'value_2': 10}

请注意:

  • 我们通过为单个状态键提供值来启动调用。必须至少为一个键提供值。
  • 我们传入的值在第一个节点中被覆盖。
  • 第二个节点对该值进行了更新。
  • 第三个节点填充了另一个值。

内置简写

langgraph>=0.2.46 包含了用于添加节点序列的内置简写 add_sequence。你可以如下方式编译同一个图:

builder = StateGraph(State).add_sequence([step_1, step_2, step_3])
builder.add_edge(START, "step_1")

graph = builder.compile()

graph.invoke({"value_1": "c"})

创建分支

节点的并行执行对于提升图整体运行速度至关重要。LangGraph 原生支持节点的并行执行,这可显著增强基于图的工作流性能。这种并行化通过“扇出”和“扇入”机制实现,既利用标准边也利用条件边。下面通过一些示例展示如何创建适合你的分支数据流。

并行运行图节点

在这个例子中,我们从 Node A 扇出到 B 和 C,然后在 D 处扇入。结合我们的状态,我们为该键指定了 add 这个 reducer 操作。这会对特定状态键进行合并/累加,而不是简单覆盖现有值。对于列表而言,这意味着把新列表与现有列表拼接。关于使用 reducer 更新状态的更多细节见上面状态 reducer部分。

API Reference: StateGraph | START | END

import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}

def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}

def d(state: State):
    print(f'Adding "D" to {state["aggregate"]}')
    return {"aggregate": ["D"]}

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

并行执行图

使用 reducer 后,你会看到每个节点中添加的值都被累积起来。

graph.invoke({"aggregate": []}, {"configurable": {"thread_id": "foo"}})
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "D" to ['A', 'B', 'C']

Note

在上面的例子中,节点 "b""c" 在同一个超级步(superstep)中并发执行。因为它们处于同一超级步,节点 "d" 会在 "b""c" 都完成后再执行。

需要注意的是,并行超级步中的更新可能无法保证一致的顺序。如果你需要并行超级步中更新具有一致的、预设的顺序,应将输出写入状态中的单独字段,并附带可用于排序的值。

异常处理?

LangGraph 在超级步(superstep)内执行节点,这意味着虽然并行分支会并行执行,但整个超级步是“事务性”的。如果任意分支抛出异常,状态更新将不会被应用(整个超级步报错)。

需要注意的是,在使用检查点器时,同一超级步中成功节点的结果会被保存,并且在恢复时不会重复执行。

如果你有容易出错的步骤(例如脆弱的 API 调用),LangGraph 提供两种方式来处理:

  1. 你可以在节点内编写常规的 Python 代码来捕获并处理异常。
  2. 你可以设置重试策略(retry_policy)来让图对抛出特定异常类型的节点进行重试。只有失败的分支会被重试,因此不用担心执行冗余工作。

二者结合,让你既能进行并行执行,又能完全控制异常处理。

延迟节点执行

当你希望延迟某个节点的执行,直到所有其他待处理任务完成时,“延迟执行”就很有用。这在分支长度不同的情况下尤其相关(例如常见的 Map-Reduce 流程)。

上面的例子展示了当每条路径都只有一个步骤时如何扇出和扇入。但如果某个分支有多于一个步骤呢?让我们在 "b" 分支添加一个节点 "b_2"

API Reference: StateGraph | START | END

import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}

def b_2(state: State):
    print(f'Adding "B_2" to {state["aggregate"]}')
    return {"aggregate": ["B_2"]}

def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}

def d(state: State):
    print(f'Adding "D" to {state["aggregate"]}')
    return {"aggregate": ["D"]}

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(b_2)
builder.add_node(c)
builder.add_node(d, defer=True)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b_2")
builder.add_edge("b_2", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

延迟执行图

graph.invoke({"aggregate": []})
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "B_2" to ['A', 'B', 'C']
Adding "D" to ['A', 'B', 'C', 'B_2']

在上述例子中,节点 "b""c" 在同一个超级步中并发执行。我们对节点 d 设置了 defer=True,这样它将不会执行,直到所有待处理任务完成。在这个例子中,这意味着 "d" 要等到整个 "b" 分支执行完才会执行。

条件分支

如果你的扇出应当在运行时根据状态变化,那么你可以使用 add_conditional_edges 来基于图状态选择一条或多条路径。下面的例子中,节点 a 生成的状态更新决定了后续的节点。

API Reference: StateGraph | START | END

import operator
from typing import Annotated, Literal, Sequence
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    aggregate: Annotated[list, operator.add]
    # Add a key to the state. We will set this key to determine
    # how we branch.
    which: str

def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"], "which": "c"}

def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}

def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_edge(START, "a")
builder.add_edge("b", END)
builder.add_edge("c", END)

def conditional_edge(state: State) -> Literal["b", "c"]:
    # Fill in arbitrary logic here that uses the state
    # to determine the next node
    return state["which"]

builder.add_conditional_edges("a", conditional_edge)

graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

条件分支图

result = graph.invoke({"aggregate": []})
print(result)
Adding "A" to []
Adding "C" to ['A']
{'aggregate': ['A', 'C'], 'which': 'c'}

Tip

你的条件边可以路由到多个目标节点。例如:

def route_bc_or_cd(state: State) -> Sequence[str]:
    if state["which"] == "cd":
        return ["c", "d"]
    return ["b", "c"]

Map-Reduce 与 Send API

LangGraph 通过 Send API 支持 Map-Reduce 以及其他高级分支模式。下面是一个使用示例:

API Reference: StateGraph | START | END | Send

from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from typing_extensions import TypedDict, Annotated
import operator

class OverallState(TypedDict):
    topic: str
    subjects: list[str]
    jokes: Annotated[list[str], operator.add]
    best_selected_joke: str

def generate_topics(state: OverallState):
    return {"subjects": ["lions", "elephants", "penguins"]}

def generate_joke(state: OverallState):
    joke_map = {
        "lions": "Why don't lions like fast food? Because they can't catch it!",
        "elephants": "Why don't elephants use computers? They're afraid of the mouse!",
        "penguins": "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice."
    }
    return {"jokes": [joke_map[state["subject"]]]}

def continue_to_jokes(state: OverallState):
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]

def best_joke(state: OverallState):
    return {"best_selected_joke": "penguins"}

builder = StateGraph(OverallState)
builder.add_node("generate_topics", generate_topics)
builder.add_node("generate_joke", generate_joke)
builder.add_node("best_joke", best_joke)
builder.add_edge(START, "generate_topics")
builder.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
builder.add_edge("generate_joke", "best_joke")
builder.add_edge("best_joke", END)
builder.add_edge("generate_topics", END)
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

带扇出的 Map-Reduce 图

# Call the graph: here we call it to generate a list of jokes
for step in graph.stream({"topic": "animals"}):
    print(step)
{'generate_topics': {'subjects': ['lions', 'elephants', 'penguins']}}
{'generate_joke': {'jokes': ["Why don't lions like fast food? Because they can't catch it!"]}}
{'generate_joke': {'jokes': ["Why don't elephants use computers? They're afraid of the mouse!"]}}
{'generate_joke': {'jokes': ['Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice.']}}
{'best_joke': {'best_selected_joke': 'penguins'}}

创建并控制循环

在创建带有循环的图时,我们需要一种终止执行的机制。最常见的方式是添加一条条件边,在达到某个终止条件时路由至 END 节点。

你还可以在调用或流式运行图时设置图的递归上限(recursion limit)。递归上限设置了图在抛出错误之前允许执行的超级步(supersteps)数量。关于递归上限的概念更多内容见这里

让我们通过一个简单的循环图来更好地理解这些机制。

Tip

若希望返回状态的最后值而不是收到递归上限错误,请参见下一节

在创建循环时,你可以包含一条条件边,指定终止条件:

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)

def route(state: State) -> Literal["b", END]:
    if termination_condition(state):
        return END
    else:
        return "b"

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()

要控制递归上限,在配置中指定 "recursionLimit"。这会抛出 GraphRecursionError,你可以捕获并处理它:

from langgraph.errors import GraphRecursionError

try:
    graph.invoke(inputs, {"recursion_limit": 3})
except GraphRecursionError:
    print("Recursion Error")

让我们定义一个带有简单循环的图。注意我们使用了条件边来实现终止条件。

API Reference: StateGraph | START | END

import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'Node A sees {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Node B sees {state["aggregate"]}')
    return {"aggregate": ["B"]}

# Define nodes
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)

# Define edges
def route(state: State) -> Literal["b", END]:
    if len(state["aggregate"]) < 7:
        return "b"
    else:
        return END

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

简单循环图

这种架构类似于一个React agent,其中节点 "a" 是一个工具调用模型,而节点 "b" 代表工具。

在我们的 route 条件边中,我们指定当状态中 "aggregate" 列表的长度超过阈值后应结束。

调用图时,我们会看到在达到终止条件之前,节点 "a""b" 交替执行。

graph.invoke({"aggregate": []})
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
Node B sees ['A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B']
Node B sees ['A', 'B', 'A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B', 'A', 'B']

施加递归上限

在某些应用中,我们不能保证一定会达到某个终止条件。在这些情况下,我们可以设置图的递归上限。这会在经过一定数量的超级步(supersteps)后抛出 GraphRecursionError。之后我们可以捕获并处理该异常:

from langgraph.errors import GraphRecursionError

try:
    graph.invoke({"aggregate": []}, {"recursion_limit": 4})
except GraphRecursionError:
    print("Recursion Error")
Node A sees []
Node B sees ['A']
Node C sees ['A', 'B']
Node D sees ['A', 'B']
Node A sees ['A', 'B', 'C', 'D']
Recursion Error
扩展示例:达到递归上限时返回状态

我们可以不抛出 GraphRecursionError,而是向状态中引入一个新键,用于记录距达到递归上限还有多少步。然后我们可以利用该键决定是否结束本次运行。

LangGraph 实现了一个特殊的 RemainingSteps 注解。其底层会创建一个 ManagedValue 通道——一种仅在图运行期间存在的状态通道。

import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.managed.is_last_step import RemainingSteps

class State(TypedDict):
    aggregate: Annotated[list, operator.add]
    remaining_steps: RemainingSteps

def a(state: State):
    print(f'Node A sees {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Node B sees {state["aggregate"]}')
    return {"aggregate": ["B"]}

# Define nodes
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)

# Define edges
def route(state: State) -> Literal["b", END]:
    if state["remaining_steps"] <= 2:
        return END
    else:
        return "b"

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()

# Test it out
result = graph.invoke({"aggregate": []}, {"recursion_limit": 4})
print(result)
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
{'aggregate': ['A', 'B', 'A']}

扩展示例:带分支的循环

为了更好地理解递归上限是如何工作的,让我们考虑一个更复杂的例子。下面我们实现一个循环,其中某一步会扇出到两个节点:

import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'Node A sees {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'Node B sees {state["aggregate"]}')
    return {"aggregate": ["B"]}

def c(state: State):
    print(f'Node C sees {state["aggregate"]}')
    return {"aggregate": ["C"]}

def d(state: State):
    print(f'Node D sees {state["aggregate"]}')
    return {"aggregate": ["D"]}

# Define nodes
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)

# Define edges
def route(state: State) -> Literal["b", END]:
    if len(state["aggregate"]) < 7:
        return "b"
    else:
        return END

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "c")
builder.add_edge("b", "d")
builder.add_edge(["c", "d"], "a")
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

带分支的复杂循环图

该图看起来复杂,但可以被视为一组超级步构成的循环:

  1. 节点 A
  2. 节点 B
  3. 节点 C 和 D
  4. 节点 A
  5. ...

我们有一个由四个超级步构成的循环,其中节点 C 和 D 并发执行。

像之前一样调用图,可以看到在达到终止条件前我们完成了两圈:

result = graph.invoke({"aggregate": []})
Node A sees []
Node B sees ['A']
Node D sees ['A', 'B']
Node C sees ['A', 'B']
Node A sees ['A', 'B', 'C', 'D']
Node B sees ['A', 'B', 'C', 'D', 'A']
Node D sees ['A', 'B', 'C', 'D', 'A', 'B']
Node C sees ['A', 'B', 'C', 'D', 'A', 'B']
Node A sees ['A', 'B', 'C', 'D', 'A', 'B', 'C', 'D']

然而,如果我们将递归上限设置为 4,由于每圈有四个超级步,我们只能完成一圈:

from langgraph.errors import GraphRecursionError

try:
    result = graph.invoke({"aggregate": []}, {"recursion_limit": 4})
except GraphRecursionError:
    print("Recursion Error")
Node A sees []
Node B sees ['A']
Node C sees ['A', 'B']
Node D sees ['A', 'B']
Node A sees ['A', 'B', 'C', 'D']
Recursion Error

异步

当并发运行IO 密集型代码(例如对聊天模型提供商的并发 API 请求)时,使用异步编程范式可以带来显著的性能提升。

要将图的 sync 实现转换为 async 实现,你需要:

  1. nodesdef 改为 async def
  2. 在函数内部适当使用 await
  3. 按需使用 .ainvoke.astream 来调用图。

由于许多 LangChain 对象实现了Runnable 协议,该协议对所有 sync 方法都有 async 变体,因此通常将 sync 图升级为 async 图所需工作并不多。

见下面的示例。为了演示对底层 LLM 的异步调用,我们会包含一个聊天模型:

pip install -U "langchain[openai]"
import os
from langchain.chat_models import init_chat_model

os.environ["OPENAI_API_KEY"] = "sk-..."

llm = init_chat_model("openai:gpt-4.1")

👉 Read the OpenAI integration docs

pip install -U "langchain[anthropic]"
import os
from langchain.chat_models import init_chat_model

os.environ["ANTHROPIC_API_KEY"] = "sk-..."

llm = init_chat_model("anthropic:claude-3-5-sonnet-latest")

👉 Read the Anthropic integration docs

pip install -U "langchain[openai]"
import os
from langchain.chat_models import init_chat_model

os.environ["AZURE_OPENAI_API_KEY"] = "..."
os.environ["AZURE_OPENAI_ENDPOINT"] = "..."
os.environ["OPENAI_API_VERSION"] = "2025-03-01-preview"

llm = init_chat_model(
    "azure_openai:gpt-4.1",
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"],
)

👉 Read the Azure integration docs

pip install -U "langchain[google-genai]"
import os
from langchain.chat_models import init_chat_model

os.environ["GOOGLE_API_KEY"] = "..."

llm = init_chat_model("google_genai:gemini-2.0-flash")

👉 Read the Google GenAI integration docs

pip install -U "langchain[aws]"
from langchain.chat_models import init_chat_model

# Follow the steps here to configure your credentials:
# https://docs.aws.amazon.com/bedrock/latest/userguide/getting-started.html

llm = init_chat_model(
    "anthropic.claude-3-5-sonnet-20240620-v1:0",
    model_provider="bedrock_converse",
)

👉 Read the AWS Bedrock integration docs

API Reference: init_chat_model | StateGraph

from langchain.chat_models import init_chat_model
from langgraph.graph import MessagesState, StateGraph

async def node(state: MessagesState): # (1)!
    new_message = await llm.ainvoke(state["messages"]) # (2)!
    return {"messages": [new_message]}

builder = StateGraph(MessagesState).add_node(node).set_entry_point("node")
graph = builder.compile()

input_message = {"role": "user", "content": "Hello"}
result = await graph.ainvoke({"messages": [input_message]}) # (3)!
  1. 将节点声明为异步函数。
  2. 在节点内部使用可用的异步调用。
  3. 在图对象上使用异步调用。

异步流式传输

关于异步流式的示例见流式传输指南

使用 Command 同时控制流程与更新状态

在某些情况下,将控制流(边)和状态更新(节点)结合起来很有用。例如,你可能希望在同一个节点中既进行状态更新,又决定下一个要前往的节点。LangGraph 提供了一种方式:从节点函数返回一个 Command 对象:

def my_node(state: State) -> Command[Literal["my_other_node"]]:
    return Command(
        # state update
        update={"foo": "bar"},
        # control flow
        goto="my_other_node"
    )

下面展示一个端到端的例子。我们创建一个简单的 3 节点图:A、B 和 C。我们首先执行节点 A,然后基于 A 的输出决定下一步走 B 还是 C。

API Reference: StateGraph | START | Command

import random
from typing_extensions import TypedDict, Literal
from langgraph.graph import StateGraph, START
from langgraph.types import Command

# Define graph state
class State(TypedDict):
    foo: str

# Define the nodes

def node_a(state: State) -> Command[Literal["node_b", "node_c"]]:
    print("Called A")
    value = random.choice(["b", "c"])
    # this is a replacement for a conditional edge function
    if value == "b":
        goto = "node_b"
    else:
        goto = "node_c"

    # note how Command allows you to BOTH update the graph state AND route to the next node
    return Command(
        # this is the state update
        update={"foo": value},
        # this is a replacement for an edge
        goto=goto,
    )

def node_b(state: State):
    print("Called B")
    return {"foo": state["foo"] + "b"}

def node_c(state: State):
    print("Called C")
    return {"foo": state["foo"] + "c"}

现在我们用上述节点创建 StateGraph。注意图中没有用于路由的条件边!这是因为控制流在 node_a 内通过 Command 定义了。

builder = StateGraph(State)
builder.add_edge(START, "node_a")
builder.add_node(node_a)
builder.add_node(node_b)
builder.add_node(node_c)
# NOTE: there are no edges between nodes A, B and C!

graph = builder.compile()

Important

你可能注意到我们使用了 Command 作为返回类型注解,例如 Command[Literal["node_b", "node_c"]]。这对于图渲染是必要的,并告诉 LangGraph,node_a 可以跳转到 node_bnode_c

from IPython.display import display, Image

display(Image(graph.get_graph().draw_mermaid_png()))

基于 Command 的图导航

如果我们多次运行该图,会看到它基于节点 A 中的随机选择走不同路径(A -> B 或 A -> C)。

graph.invoke({"foo": ""})
Called A
Called C

在父图中跳转到节点

如果你使用了子图,你可能希望从子图中的节点跳转到另一个子图(即父图中的不同节点)。为此,你可以在 Command 中指定 graph=Command.PARENT

def my_node(state: State) -> Command[Literal["my_other_node"]]:
    return Command(
        update={"foo": "bar"},
        goto="other_subgraph",  # where `other_subgraph` is a node in the parent graph
        graph=Command.PARENT
    )

让我们用上面的例子来演示。我们将把上述例子中的 nodeA 改造成一个单节点图,并将其作为子图添加到父图中。

Command.PARENT 中的状态更新

当你从子图节点向父图节点发送对某个键的更新,并且该键同时存在于父图与子图的状态模式中时,你必须在父图状态中为要更新的键定义一个reducer。见下方示例。

import operator
from typing_extensions import Annotated

class State(TypedDict):
    # NOTE: we define a reducer here
    foo: Annotated[str, operator.add]

def node_a(state: State):
    print("Called A")
    value = random.choice(["a", "b"])
    # this is a replacement for a conditional edge function
    if value == "a":
        goto = "node_b"
    else:
        goto = "node_c"

    # note how Command allows you to BOTH update the graph state AND route to the next node
    return Command(
        update={"foo": value},
        goto=goto,
        # this tells LangGraph to navigate to node_b or node_c in the parent graph
        # NOTE: this will navigate to the closest parent graph relative to the subgraph
        graph=Command.PARENT,
    )

subgraph = StateGraph(State).add_node(node_a).add_edge(START, "node_a").compile()

def node_b(state: State):
    print("Called B")
    # NOTE: since we've defined a reducer, we don't need to manually append
    # new characters to existing 'foo' value. instead, reducer will append these
    # automatically (via operator.add)
    return {"foo": "b"}

def node_c(state: State):
    print("Called C")
    return {"foo": "c"}

builder = StateGraph(State)
builder.add_edge(START, "subgraph")
builder.add_node("subgraph", subgraph)
builder.add_node(node_b)
builder.add_node(node_c)

graph = builder.compile()
graph.invoke({"foo": ""})
Called A
Called C

在工具内部使用

一个常见用例是在工具内部更新图状态。例如,在客服应用中,你可能希望在对话开始时根据账户号或 ID 查询用户信息。要从工具中更新图状态,你可以从工具返回 Command(update={"my_custom_key": "foo", "messages": [...]})

@tool
def lookup_user_info(tool_call_id: Annotated[str, InjectedToolCallId], config: RunnableConfig):
    """Use this to look up user information to better assist them with their questions."""
    user_info = get_user_info(config.get("configurable", {}).get("user_id"))
    return Command(
        update={
            # update the state keys
            "user_info": user_info,
            # update the message history
            "messages": [ToolMessage("Successfully looked up user information", tool_call_id=tool_call_id)]
        }
    )

Important

当从工具返回 Command 时,你必须在 Command.update 中包含 messages(或用于消息历史记录的任意状态键),并且 messages 列表中必须包含一个 ToolMessage。这对于确保结果消息历史的有效性是必要的(LLM 提供商要求带工具调用的 AI 消息之后紧跟工具结果消息)。

如果你使用通过 Command 更新状态的工具,推荐使用预构建的 ToolNode,它会自动处理工具返回的 Command 对象并将其传播到图状态中。如果你编写的是一个调用工具的自定义节点,则需要手动将工具返回的 Command 对象作为节点的更新向上游传播。

可视化你的图

这里展示如何对你创建的图进行可视化。

你可以可视化任意Graph,包括 StateGraph

来点乐子,画个分形图 :)。

API Reference: StateGraph | START | END | add_messages

import random
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages]

class MyNode:
    def __init__(self, name: str):
        self.name = name
    def __call__(self, state: State):
        return {"messages": [("assistant", f"Called node {self.name}")]}

def route(state) -> Literal["entry_node", "__end__"]:
    if len(state["messages"]) > 10:
        return "__end__"
    return "entry_node"

def add_fractal_nodes(builder, current_node, level, max_level):
    if level > max_level:
        return
    # Number of nodes to create at this level
    num_nodes = random.randint(1, 3)  # Adjust randomness as needed
    for i in range(num_nodes):
        nm = ["A", "B", "C"][i]
        node_name = f"node_{current_node}_{nm}"
        builder.add_node(node_name, MyNode(node_name))
        builder.add_edge(current_node, node_name)
        # Recursively add more nodes
        r = random.random()
        if r > 0.2 and level + 1 < max_level:
            add_fractal_nodes(builder, node_name, level + 1, max_level)
        elif r > 0.05:
            builder.add_conditional_edges(node_name, route, node_name)
        else:
            # End
            builder.add_edge(node_name, "__end__")

def build_fractal_graph(max_level: int):
    builder = StateGraph(State)
    entry_point = "entry_node"
    builder.add_node(entry_point, MyNode(entry_point))
    builder.add_edge(START, entry_point)
    add_fractal_nodes(builder, entry_point, 1, max_level)
    # Optional: set a finish point if required
    builder.add_edge(entry_point, END)  # or any specific node
    return builder.compile()

app = build_fractal_graph(3)

Mermaid

我们也可以将图类转成 Mermaid 语法。

print(app.get_graph().draw_mermaid())
%%{init: {'flowchart': {'curve': 'linear'}}}%%
graph TD;
    __start__([<p>__start__</p>]):::first
    entry_node(entry_node)
    node_entry_node_A(node_entry_node_A)
    node_entry_node_B(node_entry_node_B)
    node_node_entry_node_B_A(node_node_entry_node_B_A)
    node_node_entry_node_B_B(node_node_entry_node_B_B)
    node_node_entry_node_B_C(node_node_entry_node_B_C)
    __end__([<p>__end__</p>]):::last
    __start__ --> entry_node;
    entry_node --> __end__;
    entry_node --> node_entry_node_A;
    entry_node --> node_entry_node_B;
    node_entry_node_B --> node_node_entry_node_B_A;
    node_entry_node_B --> node_node_entry_node_B_B;
    node_entry_node_B --> node_node_entry_node_B_C;
    node_entry_node_A -.-> entry_node;
    node_entry_node_A -.-> __end__;
    node_node_entry_node_B_A -.-> entry_node;
    node_node_entry_node_B_A -.-> __end__;
    node_node_entry_node_B_B -.-> entry_node;
    node_node_entry_node_B_B -.-> __end__;
    node_node_entry_node_B_C -.-> entry_node;
    node_node_entry_node_B_C -.-> __end__;
    classDef default fill:#f2f0ff,line-height:1.2
    classDef first fill-opacity:0
    classDef last fill:#bfb6fc

PNG

如果需要,我们可以将图渲染为 .png。这里有三种方式:

  • 使用 Mermaid.ink API(不需要额外的包)
  • 使用 Mermaid + Pyppeteer(需要 pip install pyppeteer
  • 使用 graphviz(需要 pip install graphviz

使用 Mermaid.Ink

默认情况下,draw_mermaid_png() 使用 Mermaid.Ink 的 API 生成图表。

API Reference: CurveStyle | MermaidDrawMethod | NodeStyles

from IPython.display import Image, display
from langchain_core.runnables.graph import CurveStyle, MermaidDrawMethod, NodeStyles

display(Image(app.get_graph().draw_mermaid_png()))

分形图可视化

使用 Mermaid + Pyppeteer

import nest_asyncio

nest_asyncio.apply()  # Required for Jupyter Notebook to run async functions

display(
    Image(
        app.get_graph().draw_mermaid_png(
            curve_style=CurveStyle.LINEAR,
            node_colors=NodeStyles(first="#ffdfba", last="#baffc9", default="#fad7de"),
            wrap_label_n_words=9,
            output_file_path=None,
            draw_method=MermaidDrawMethod.PYPPETEER,
            background_color="white",
            padding=10,
        )
    )
)

使用 Graphviz

try:
    display(Image(app.get_graph().draw_png()))
except ImportError:
    print(
        "You likely need to install dependencies for pygraphviz, see more here https://github.com/pygraphviz/pygraphviz/blob/main/INSTALL.txt"
    )