跳转至

持久化

LangGraph 具有通过检查点器(checkpointers)实现的内置持久化层。当你用检查点器编译图时,检查点器会在每个超级步(super-step)保存图状态的一个 checkpoint。这些检查点会被保存到一个 thread(线程)中,图执行结束后可以访问该线程。由于 threads 允许在执行后访问图的状态,因此可以实现诸如人与环(human-in-the-loop)、记忆(memory)、时间旅行(time travel)以及容错(fault-tolerance)等强大能力。下面我们将更详细地讨论这些概念。

检查点

LangGraph API 会自动处理检查点

使用 LangGraph API 时,无需手动实现或配置检查点器。API 会在幕后为你处理所有持久化基础设施。

线程

线程是分配给每个由检查点器保存的检查点的唯一 ID 或线程标识符。它包含一系列运行的累积状态。当一次运行被执行时,助手底层图的状态会持久化到该线程。

在使用检查点器调用图时,你必须在配置的 configurable 部分指定一个 thread_id

{"configurable": {"thread_id": "1"}}

可以检索线程的当前状态和历史状态。要持久化状态,必须在执行运行之前创建线程。LangGraph 平台 API 提供了多个用于创建和管理线程及线程状态的端点。更多详情请参见 API 参考

检查点

线程在某个特定时间点的状态称为检查点。检查点是在每个超级步保存的图状态快照,由 StateSnapshot 对象表示,具有以下关键属性:

  • config: 与该检查点关联的配置。
  • metadata: 与该检查点关联的元数据。
  • values: 此时刻状态通道的值。
  • next 图中接下来要执行的节点名称元组。
  • tasks: PregelTask 对象的元组,包含将要执行的下一个任务的信息。如果之前尝试过该步骤,它将包含错误信息。如果图在节点内部被动态中断,tasks 将包含与中断相关的附加数据。

检查点是持久化的,并可用于稍后恢复线程状态。

让我们看看在如下方式调用一个简单图时会保存哪些检查点:

API Reference: StateGraph | START | END | InMemorySaver

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: str
    bar: Annotated[list[str], add]

def node_a(state: State):
    return {"foo": "a", "bar": ["a"]}

def node_b(state: State):
    return {"foo": "b", "bar": ["b"]}


workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)

checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": ""}, config)

在我们运行该图之后,我们预计会看到恰好 4 个检查点:

  • 空检查点,START 为将要执行的下一个节点
  • 包含用户输入 {'foo': '', 'bar': []} 的检查点,node_a 为将要执行的下一个节点
  • 包含 node_a 输出 {'foo': 'a', 'bar': ['a']} 的检查点,node_b 为将要执行的下一个节点
  • 包含 node_b 输出 {'foo': 'b', 'bar': ['a', 'b']} 的检查点,且没有下一个要执行的节点

注意,bar 通道的值包含两个节点的输出,因为我们为 bar 通道定义了一个 reducer。

获取状态

与已保存的图状态交互时,你必须指定一个线程标识符。你可以通过调用 graph.get_state(config) 查看图的最新状态。这将返回一个 StateSnapshot 对象,该对象对应于配置中提供的线程 ID 关联的最新检查点;如果提供了线程的检查点 ID,也可返回该检查点对应的快照。

# 获取最新的状态快照
config = {"configurable": {"thread_id": "1"}}
graph.get_state(config)

# 获取特定 checkpoint_id 的状态快照
config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
graph.get_state(config)

在我们的示例中,get_state 的输出如下所示:

StateSnapshot(
    values={'foo': 'b', 'bar': ['a', 'b']},
    next=(),
    config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
    metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
    created_at='2024-08-29T19:19:38.821749+00:00',
    parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}}, tasks=()
)

获取状态历史

你可以通过调用 graph.get_state_history(config) 获取给定线程的完整图执行历史。这将返回与配置中提供的线程 ID 关联的 StateSnapshot 对象列表。需要强调的是,检查点将按时间顺序排列,最新的检查点 / StateSnapshot 会在列表的第一位。

config = {"configurable": {"thread_id": "1"}}
list(graph.get_state_history(config))

在我们的示例中,get_state_history 的输出如下所示:

[
    StateSnapshot(
        values={'foo': 'b', 'bar': ['a', 'b']},
        next=(),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
        metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
        created_at='2024-08-29T19:19:38.821749+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        tasks=(),
    ),
    StateSnapshot(
        values={'foo': 'a', 'bar': ['a']},
        next=('node_b',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        metadata={'source': 'loop', 'writes': {'node_a': {'foo': 'a', 'bar': ['a']}}, 'step': 1},
        created_at='2024-08-29T19:19:38.819946+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        tasks=(PregelTask(id='6fb7314f-f114-5413-a1f3-d37dfe98ff44', name='node_b', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'foo': '', 'bar': []},
        next=('node_a',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        metadata={'source': 'loop', 'writes': None, 'step': 0},
        created_at='2024-08-29T19:19:38.817813+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        tasks=(PregelTask(id='f1b14528-5ee5-579c-949b-23ef9bfbed58', name='node_a', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'bar': []},
        next=('__start__',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        metadata={'source': 'input', 'writes': {'foo': ''}, 'step': -1},
        created_at='2024-08-29T19:19:38.816205+00:00',
        parent_config=None,
        tasks=(PregelTask(id='6d27aa2e-d72b-5504-a36f-8620e54a76dd', name='__start__', error=None, interrupts=()),),
    )
]

状态

回放

我们也可以回放之前的图执行。如果我们用 thread_idcheckpoint_idinvoke 一个图,那么我们会在对应 checkpoint_id 的检查点之前的步骤进行回放,而只执行该检查点之后的步骤。

  • thread_id 是线程的 ID。
  • checkpoint_id 是指向线程内某个特定检查点的标识符。

你必须在调用图时,将它们作为配置中 configurable 部分的一部分传入:

config = {"configurable": {"thread_id": "1", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}}
graph.invoke(None, config=config)

需要强调的是,LangGraph 知道某个特定步骤是否曾被执行过。如果是,LangGraph 仅会回放该图中的那个步骤,而不会重新执行它,但这仅适用于提供的 checkpoint_id 之前的步骤。所有在 checkpoint_id 之后的步骤都会被执行(即形成新的分支),即使它们之前也被执行过。参见这篇时间旅行教程,了解更多关于回放的信息

回放

更新状态

除了从特定 checkpoints 回放图之外,我们还可以编辑图状态。我们使用 graph.update_state() 来实现。这一方法接受三个不同的参数:

config

配置应包含 thread_id,用于指定要更新的线程。仅传入 thread_id 时,我们会更新(或分叉)当前状态。可选地,如果包含 checkpoint_id 字段,则会从该指定检查点进行分叉。

values

这些值将用于更新状态。请注意,此次更新与任何节点的更新完全等同。这意味着如果在图状态的一些通道上定义了 reducer 函数,这些值将会传递给 reducer。也就是说,update_state 并不会自动覆盖所有通道的值,而只会覆盖没有 reducer 的通道。下面通过一个示例说明。

假设你用如下模式定义了图的状态(完整示例见上文):

from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: int
    bar: Annotated[list[str], add]

现在假设当前图状态为

{"foo": 1, "bar": ["a"]}

如果按如下方式更新状态:

graph.update_state(config, {"foo": 2, "bar": ["b"]})

那么图的新状态将会是:

{"foo": 2, "bar": ["a", "b"]}

foo 键(通道)被完全改变(因为该通道没有指定 reducer,所以 update_state 会覆盖它)。然而,bar 键指定了 reducer,因此会将 "b" 追加到 bar 的状态中。

as_node

调用 update_state 时,你还可以选择性地指定 as_node。如果提供了该参数,更新将会被视为来自于节点 as_node。如果未提供 as_node,且不含歧义,它将被设置为上次更新状态的那个节点。这一点之所以重要,是因为接下来要执行的步骤取决于上一次提交更新的节点,因此你可以用它来控制接下来执行哪个节点。参见这篇时间旅行教程,了解更多关于分叉状态的信息

更新

内存存储

共享状态模型

状态模式指定了图执行时会被填充的一组键。如上所述,状态可以由检查点器在每个图步骤写入到线程,从而启用状态持久化。

但是,如果我们想要在不同线程之间保留一些信息该怎么办?考虑一个聊天机器人的场景:我们希望在与该用户的所有聊天会话(例如不同线程)之间保留该用户的特定信息!

仅依靠检查点器,我们无法在不同线程之间共享信息。这就引出了 Store 接口的需求。作为示例,我们可以定义一个 InMemoryStore 来在不同线程间存储有关用户的信息。我们像之前一样用检查点器编译图,同时加入新的 in_memory_store 变量。

LangGraph API 会自动处理存储

使用 LangGraph API 时,你无需手动实现或配置存储。API 会在幕后为你处理所有存储基础设施。

基本用法

首先,让我们先在不使用 LangGraph 的情况下单独演示。

from langgraph.store.memory import InMemoryStore
in_memory_store = InMemoryStore()

记忆按一个 tuple 命名空间划分,在本例中为 (<user_id>, "memories")。命名空间可以是任意长度,表示任何含义,不一定与用户相关。

user_id = "1"
namespace_for_memory = (user_id, "memories")

我们使用 store.put 方法将记忆保存到存储中的命名空间。执行时,我们指定上述命名空间,并提供记忆的键值对:键是该记忆的唯一标识符(memory_id),值(字典)是记忆本身。

memory_id = str(uuid.uuid4())
memory = {"food_preference" : "I like pizza"}
in_memory_store.put(namespace_for_memory, memory_id, memory)

我们可以使用 store.search 方法读取命名空间中的记忆,它会以列表形式返回某个用户的所有记忆。最新的记忆在列表最后一项。

memories = in_memory_store.search(namespace_for_memory)
memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2024-10-02T17:22:31.590602+00:00',
 'updated_at': '2024-10-02T17:22:31.590605+00:00'}

每种记忆类型都是一个 Python 类(Item),具有特定属性。我们可以像上面那样通过 .dict 转换为字典来访问。

它具有以下属性:

  • value: 此记忆的值(本身是字典)
  • key: 此命名空间中该记忆的唯一键
  • namespace: 字符串列表,即该记忆类型的命名空间
  • created_at: 创建该记忆的时间戳
  • updated_at: 更新该记忆的时间戳

语义搜索

除了简单检索外,存储还支持语义搜索,允许你基于含义而非精确匹配来查找记忆。要启用此功能,请用嵌入模型配置存储:

API Reference: init_embeddings

from langchain.embeddings import init_embeddings

store = InMemoryStore(
    index={
        "embed": init_embeddings("openai:text-embedding-3-small"),  # 嵌入提供方
        "dims": 1536,                              # 嵌入维度
        "fields": ["food_preference", "$"]              # 需要嵌入的字段
    }
)

现在进行搜索时,你可以使用自然语言查询来查找相关记忆:

# 查找关于食物偏好的记忆
# (可在将记忆写入存储后执行)
memories = store.search(
    namespace_for_memory,
    query="What does the user like to eat?",
    limit=3  # 返回匹配度最高的 3 条
)

你可以通过配置 fields 参数来控制哪些部分会被嵌入,或在存储记忆时通过指定 index 参数来控制:

# 仅对特定字段进行嵌入
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {
        "food_preference": "I love Italian cuisine",
        "context": "Discussing dinner plans"
    },
    index=["food_preference"]  # 仅对 "food_preference" 字段进行嵌入
)

# 不进行嵌入(仍可检索,但无法通过语义搜索命中)
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {"system_info": "Last updated: 2024-01-01"},
    index=False
)

在 LangGraph 中使用

准备就绪后,我们在 LangGraph 中使用 in_memory_storein_memory_store 与检查点器配合工作:检查点器如上所述将状态保存到线程,而 in_memory_store 允许我们存储可跨线程访问的任意信息。我们在编译图时同时传入检查点器和 in_memory_store,如下所示。

API Reference: InMemorySaver

from langgraph.checkpoint.memory import InMemorySaver

# 需要启用线程(会话),因此需要检查点器
checkpointer = InMemorySaver()

# ... 定义图 ...

# 使用检查点器和存储编译图
graph = graph.compile(checkpointer=checkpointer, store=in_memory_store)

我们像之前一样用 thread_id 调用图,并额外传入 user_id,我们将使用它来将记忆命名空间限定到特定用户。

# 调用图
user_id = "1"
config = {"configurable": {"thread_id": "1", "user_id": user_id}}

# 首先让我们向 AI 打个招呼
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi"}]}, config, stream_mode="updates"
):
    print(update)

在任意节点中,我们都可以通过将 store: BaseStoreconfig: RunnableConfig 作为节点参数来访问 in_memory_storeuser_id。下面展示了如何在节点中使用语义搜索来查找相关记忆:

def update_memory(state: MessagesState, config: RunnableConfig, *, store: BaseStore):

    # 从配置中获取用户 ID
    user_id = config["configurable"]["user_id"]

    # 为记忆设置命名空间
    namespace = (user_id, "memories")

    # ... 分析对话并创建一条新记忆

    # 创建新的记忆 ID
    memory_id = str(uuid.uuid4())

    # 创建一条新记忆
    store.put(namespace, memory_id, {"memory": memory})

如上所示,我们也可以在任意节点中访问存储并使用 store.search 方法获取记忆。回顾一下,记忆以对象列表形式返回,可转换为字典。

memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2024-10-02T17:22:31.590602+00:00',
 'updated_at': '2024-10-02T17:22:31.590605+00:00'}

我们可以访问这些记忆,并将它们用于模型调用。

def call_model(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
    # 从配置中获取用户 ID
    user_id = config["configurable"]["user_id"]

    # 为记忆设置命名空间
    namespace = (user_id, "memories")

    # 基于最新一条消息进行搜索
    memories = store.search(
        namespace,
        query=state["messages"][-1].content,
        limit=3
    )
    info = "\n".join([d.value["memory"] for d in memories])

    # ... 在模型调用中使用记忆

如果我们创建一个新线程,只要 user_id 相同,仍然可以访问相同的记忆。

# 调用图
config = {"configurable": {"thread_id": "2", "user_id": "1"}}

# 我们再打个招呼
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi, tell me about my memories"}]}, config, stream_mode="updates"
):
    print(update)

在使用 LangGraph 平台时,无论是本地(例如在 LangGraph Studio)还是在 LangGraph 平台上,基础存储默认可用,无需在图编译期间指定。要启用语义搜索,你需要在 langgraph.json 文件中配置索引设置。例如:

{
    ...
    "store": {
        "index": {
            "embed": "openai:text-embeddings-3-small",
            "dims": 1536,
            "fields": ["$"]
        }
    }
}

更多详情和配置选项请参见部署指南

检查点器库

在底层,检查点由符合 BaseCheckpointSaver 接口的检查点器对象驱动。LangGraph 提供了若干检查点器实现,都通过独立可安装的库提供:

  • langgraph-checkpoint: 检查点器保存器的基础接口(BaseCheckpointSaver)以及序列化/反序列化接口(SerializerProtocol)。包含用于实验的内存检查点器实现(InMemorySaver)。LangGraph 默认包含 langgraph-checkpoint
  • langgraph-checkpoint-sqlite: 使用 SQLite 数据库的 LangGraph 检查点器实现(SqliteSaver / AsyncSqliteSaver)。适合实验和本地工作流。需单独安装。
  • langgraph-checkpoint-postgres: 使用 Postgres 数据库的高级检查点器(PostgresSaver / AsyncPostgresSaver),在 LangGraph 平台中使用。适合生产环境。需单独安装。

检查点器接口

每个检查点器都遵循 BaseCheckpointSaver 接口,并实现以下方法:

  • .put - 存储带有其配置和元数据的检查点。
  • .put_writes - 存储与检查点关联的中间写入(即挂起写入)。
  • .get_tuple - 使用给定配置(thread_idcheckpoint_id)获取检查点元组。用于在 graph.get_state() 中填充 StateSnapshot
  • .list - 列出符合给定配置和筛选条件的检查点。用于在 graph.get_state_history() 中填充状态历史。

如果在异步图执行中使用检查点器(例如通过 .ainvoke, .astream, .abatch 执行图),将使用上述方法的异步版本(.aput, .aput_writes, .aget_tuple, .alist)。

Note

要异步运行图,你可以使用 InMemorySaver,或使用 Sqlite/Postgres 检查点器的异步版本 —— AsyncSqliteSaver / AsyncPostgresSaver

序列化器

当检查点器保存图状态时,需要序列化状态中的通道值。这通过序列化器对象完成。

langgraph_checkpoint 定义了用于实现序列化器的 protocol,并提供了默认实现(JsonPlusSerializer),可处理多种类型,包括 LangChain 和 LangGraph 原语、日期时间、枚举等。

使用 pickle 进行序列化

默认序列化器 JsonPlusSerializer 底层使用 ormsgpack 和 JSON,对所有类型的对象并不都适用。

如果你想对当前 msgpack 编码器尚不支持的对象(如 Pandas DataFrame)回退到 pickle,可以在 JsonPlusSerializer 中使用 pickle_fallback 参数:

API Reference: InMemorySaver | JsonPlusSerializer

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer

# ... 定义图 ...
graph.compile(
    checkpointer=InMemorySaver(serde=JsonPlusSerializer(pickle_fallback=True))
)

加密

检查点器可以选择性地对所有持久化状态进行加密。要启用此功能,请将 EncryptedSerializer 的实例传给任意 BaseCheckpointSaver 实现的 serde 参数。创建加密序列化器最简单的方法是使用 from_pycryptodome_aes,它会从环境变量 LANGGRAPH_AES_KEY 读取 AES 密钥(或接受一个 key 参数):

API Reference: SqliteSaver

import sqlite3

from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
from langgraph.checkpoint.sqlite import SqliteSaver

serde = EncryptedSerializer.from_pycryptodome_aes()  # 读取 LANGGRAPH_AES_KEY
checkpointer = SqliteSaver(sqlite3.connect("checkpoint.db"), serde=serde)

API Reference: PostgresSaver

from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
from langgraph.checkpoint.postgres import PostgresSaver

serde = EncryptedSerializer.from_pycryptodome_aes()
checkpointer = PostgresSaver.from_conn_string("postgresql://...", serde=serde)
checkpointer.setup()

在 LangGraph 平台上运行时,只要存在 LANGGRAPH_AES_KEY,就会自动启用加密,因此你只需提供该环境变量。你也可以通过实现 CipherProtocol 并将其提供给 EncryptedSerializer 来使用其他加密方案。

能力

人在回路

首先,检查点器通过允许人类检查、打断并批准图步骤来促进人在回路工作流。这类工作流需要检查点器,因为人类需要能够在任意时间点查看图的状态,并且在人类对状态做出任何更新后,图需要能够恢复执行。示例请参见操作指南

记忆

其次,检查点器允许交互之间的“记忆”。在重复的人机交互(如对话)场景下,任何后续消息都可以发送到同一个线程,该线程将保留之前消息的记忆。查看添加记忆以了解如何使用检查点器添加和管理对话记忆。

时间旅行

第三,检查点器允许“时间旅行”,让用户可以回放之前的图执行,以审查和/或调试特定图步骤。此外,检查点器使得可以在任意检查点分叉图状态,以探索替代轨迹。

容错

最后,检查点也提供了容错和错误恢复:如果一个或多个节点在某个超级步失败,你可以从上一个成功步骤重新启动图。此外,当某个图节点在某个超级步执行中途失败时,LangGraph 会存储在该超级步中其他成功完成节点的挂起检查点写入,这样在我们从该超级步恢复图执行时,就不需要重新运行那些已经成功的节点。

挂起写入

此外,当某个图节点在某个超级步执行中途失败时,LangGraph 会存储在该超级步中其他成功完成节点的挂起检查点写入,这样在我们从该超级步恢复图执行时,就不需要重新运行那些已经成功的节点。