跳转至

持久化执行

持久化执行(Durable execution)是一种技术,使进程或工作流能够在关键点保存其进度,从而可以暂停并在之后精确地从停止处继续。这在需要人类参与的场景中特别有用——用户可以在继续前检查、验证或修改流程;也适用于运行时间较长且可能遭遇中断或错误(例如 LLM 调用超时)的任务。通过保留已完成的工作,持久化执行使流程在较长时间后(例如一周后)恢复时无需重新处理先前步骤。

LangGraph 内置的持久化(persistence)层为工作流提供持久化执行,确保每个执行步骤的状态都被保存到持久化存储中。借助这一能力,无论是系统故障中断还是为了人类参与而主动中止,工作流都可以从上次记录的状态恢复。

Tip

如果你在使用带有检查点(checkpointer)的 LangGraph,那么你已经启用了持久化执行。你可以在任意时刻暂停并恢复工作流,即使经历了中断或故障。 为了最大化持久化执行的效果,请确保你的工作流是确定性的并且是幂等的,并将所有副作用或非确定性操作封装到任务(tasks)中。你可以在StateGraph(图 API)函数式 API中使用任务(tasks)

要求

要在 LangGraph 中使用持久化执行,你需要:

  1. 在工作流中启用持久化,通过指定一个会保存工作流进度的检查点器(checkpointer)
  2. 在执行工作流时指定一个线程标识符(thread identifier)。它将跟踪该工作流实例的执行历史。

  3. 将任何非确定性操作(例如随机数生成)或具有副作用的操作(例如文件写入、API 调用)封装在 @[tasks][task] 中,以确保在恢复工作流时,这些操作不会在同一次运行中被重复执行,而是从持久化层检索其结果。更多信息请参见确定性与一致回放

确定性与一致回放

当你恢复某次工作流运行时,代码并不会从执行停止的那一行继续;而是会找到一个合适的起始点来接续执行。这意味着工作流会从该起始点开始回放所有步骤,直到抵达停止位置。

因此,在为持久化执行编写工作流时,必须将所有非确定性操作(例如随机数生成)以及所有具有副作用的操作(例如文件写入、API 调用)封装到任务(tasks)节点(nodes)中。

为确保你的工作流是确定性的并且可以一致地回放,请遵循以下指南:

  • 避免重复工作:如果一个节点包含多个具有副作用的操作(例如日志、文件写入或网络调用),请将每个操作封装为单独的任务(task)。这样在恢复时,这些操作不会被重复执行,而是从持久化层获取其结果。
  • 封装非确定性操作:将任何可能产生非确定性结果的代码(例如随机数生成)封装在任务(task)或节点中。这样在恢复时,工作流可以按照已记录的步骤序列获得相同的结果。
  • 使用幂等操作:尽量确保副作用操作(例如 API 调用、文件写入)是幂等的。这意味着在工作流失败后重试时,其效果与第一次执行相同。对于会导致数据写入的操作尤为重要。如果某个任务启动但未成功完成,工作流恢复时会重新运行该任务,并基于已记录的结果保持一致性。通过使用幂等键(idempotency keys)或检查已存在结果来避免意外重复,从而确保工作流顺畅且可预期地执行。

有关应避免的陷阱示例,请参见函数式 API 中的常见陷阱部分,其中展示了如何使用任务(tasks)来组织代码以避免这些问题。同样的原则也适用于 @[StateGraph (Graph API)][StateGraph]。

耐久性模式

LangGraph 支持三种耐久性模式,你可以根据应用的需求在性能与数据一致性之间进行权衡。从低到高分别是:

  • "exit"
  • "async"
  • "sync"

更高的耐久性模式会给工作流执行带来更多的开销。

在 v0.6.0 中新增

使用 durability 参数(v0.6.0 中弃用 checkpoint_during)来管理持久化策略:

  • durability="async" 取代 checkpoint_during=True
  • durability="exit" 取代 checkpoint_during=False

对应关系如下:

  • checkpoint_during=True -> durability="async"
  • checkpoint_during=False -> durability="exit"

"exit"

仅在图执行完成时(成功或出错)持久化变更。这为长时间运行的图提供最佳性能,但不会保存中间状态,因此无法从执行中途的故障恢复,也不能中断执行后再续跑。

"async"

在执行下一步的同时异步持久化变更。这在性能与耐久性之间取得了良好平衡,但如果进程在执行期间崩溃,仍存在少量检查点未落盘的风险。

"sync"

在开始下一步之前同步持久化变更。它确保在继续执行前已写入每一个检查点,提供高耐久性,但会带来一定的性能开销。

你可以在调用任意图执行方法时指定耐久性模式:

graph.stream(
    {"input": "test"}, 
    durability="sync"
)

在节点中使用 tasks

如果一个节点包含多个操作,可能更容易将每个操作转换为一个任务(task),而不是将它们重构为多个独立节点。

<sup><i>API Reference: <a href="https://langchain-ai.github.io/langgraph/reference/checkpoints/#langgraph.checkpoint.memory.InMemorySaver">InMemorySaver</a> | <a href="https://langchain-ai.github.io/langgraph/reference/graphs/#langgraph.graph.state.StateGraph">StateGraph</a> | <a href="https://langchain-ai.github.io/langgraph/reference/constants/#langgraph.constants.START">START</a> | <a href="https://langchain-ai.github.io/langgraph/reference/constants/#langgraph.constants.END">END</a></i></sup>

```python hl_lines="16"
from typing import NotRequired
from typing_extensions import TypedDict
import uuid

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
import requests

# 定义用于表示状态的 TypedDict
class State(TypedDict):
    url: str
    result: NotRequired[str]

def call_api(state: State):
    """示例节点:发起一个 API 请求。"""
    result = requests.get(state['url']).text[:100]  # 具有副作用
    return {
        "result": result
    }

# 创建 StateGraph 构建器并为 call_api 函数添加一个节点
builder = StateGraph(State)
builder.add_node("call_api", call_api)

# 连接起始与结束节点到 call_api 节点
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)

# 指定检查点器
checkpointer = InMemorySaver()

# 使用检查点器编译图
graph = builder.compile(checkpointer=checkpointer)

# 定义带线程 ID 的配置
thread_id = uuid.uuid4()
config = {"configurable": {"thread_id": thread_id}}

# 调用图
graph.invoke({"url": "https://www.example.com"}, config)
=== "使用 task"
API Reference: InMemorySaver | task | StateGraph | START | END

from typing import NotRequired
from typing_extensions import TypedDict
import uuid

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import task
from langgraph.graph import StateGraph, START, END
import requests

# 定义用于表示状态的 TypedDict
class State(TypedDict):
    urls: list[str]
    result: NotRequired[list[str]]


@task
def _make_request(url: str):
    """发起请求。"""
    return requests.get(url).text[:100]

def call_api(state: State):
    """示例节点:发起一个 API 请求。"""
    requests = [_make_request(url) for url in state['urls']]
    results = [request.result() for request in requests]
    return {
        "results": results
    }

# 创建 StateGraph 构建器并为 call_api 函数添加一个节点
builder = StateGraph(State)
builder.add_node("call_api", call_api)

# 连接起始与结束节点到 call_api 节点
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)

# 指定检查点器
checkpointer = InMemorySaver()

# 使用检查点器编译图
graph = builder.compile(checkpointer=checkpointer)

# 定义带线程 ID 的配置
thread_id = uuid.uuid4()
config = {"configurable": {"thread_id": thread_id}}

# 调用图
graph.invoke({"urls": ["https://www.example.com"]}, config)
```

恢复工作流

启用工作流的持久化执行后,你可以在以下场景恢复执行:

  • 暂停与恢复工作流:使用 @[interrupt][interrupt] 函数在特定位置暂停工作流,并使用 @[Command] 原语在更新状态后恢复。详见人类参与(Human-in-the-Loop)
  • 从失败中恢复:在异常(例如 LLM 提供商故障)后,从最后一次成功的检查点自动恢复工作流。这需要使用相同的线程标识符,并将输入值设为 None 来执行工作流(参见函数式 API 的这个示例)。

恢复工作流的起点

  • 如果你使用 @[StateGraph (Graph API)][StateGraph],起点是执行停止处所在节点(node)的开始位置。
  • 如果你在某个节点内调用了子图,那么起点将是调用该被中止子图的父节点。 在子图内部,起点则是执行停止的那个具体节点(node)
  • 如果你使用函数式 API,起点是执行停止处所在入口函数(entrypoint)的开始位置。