持久化执行¶
持久化执行(Durable execution)是一种技术,使进程或工作流能够在关键点保存其进度,从而可以暂停并在之后精确地从停止处继续。这在需要人类参与的场景中特别有用——用户可以在继续前检查、验证或修改流程;也适用于运行时间较长且可能遭遇中断或错误(例如 LLM 调用超时)的任务。通过保留已完成的工作,持久化执行使流程在较长时间后(例如一周后)恢复时无需重新处理先前步骤。
LangGraph 内置的持久化(persistence)层为工作流提供持久化执行,确保每个执行步骤的状态都被保存到持久化存储中。借助这一能力,无论是系统故障中断还是为了人类参与而主动中止,工作流都可以从上次记录的状态恢复。
Tip
如果你在使用带有检查点(checkpointer)的 LangGraph,那么你已经启用了持久化执行。你可以在任意时刻暂停并恢复工作流,即使经历了中断或故障。 为了最大化持久化执行的效果,请确保你的工作流是确定性的并且是幂等的,并将所有副作用或非确定性操作封装到任务(tasks)中。你可以在StateGraph(图 API)和函数式 API中使用任务(tasks)。
要求¶
要在 LangGraph 中使用持久化执行,你需要:
- 在工作流中启用持久化,通过指定一个会保存工作流进度的检查点器(checkpointer)。
-
在执行工作流时指定一个线程标识符(thread identifier)。它将跟踪该工作流实例的执行历史。
-
将任何非确定性操作(例如随机数生成)或具有副作用的操作(例如文件写入、API 调用)封装在 @[tasks][task] 中,以确保在恢复工作流时,这些操作不会在同一次运行中被重复执行,而是从持久化层检索其结果。更多信息请参见确定性与一致回放。
确定性与一致回放¶
当你恢复某次工作流运行时,代码并不会从执行停止的那一行继续;而是会找到一个合适的起始点来接续执行。这意味着工作流会从该起始点开始回放所有步骤,直到抵达停止位置。
因此,在为持久化执行编写工作流时,必须将所有非确定性操作(例如随机数生成)以及所有具有副作用的操作(例如文件写入、API 调用)封装到任务(tasks)或节点(nodes)中。
为确保你的工作流是确定性的并且可以一致地回放,请遵循以下指南:
- 避免重复工作:如果一个节点包含多个具有副作用的操作(例如日志、文件写入或网络调用),请将每个操作封装为单独的任务(task)。这样在恢复时,这些操作不会被重复执行,而是从持久化层获取其结果。
- 封装非确定性操作:将任何可能产生非确定性结果的代码(例如随机数生成)封装在任务(task)或节点中。这样在恢复时,工作流可以按照已记录的步骤序列获得相同的结果。
- 使用幂等操作:尽量确保副作用操作(例如 API 调用、文件写入)是幂等的。这意味着在工作流失败后重试时,其效果与第一次执行相同。对于会导致数据写入的操作尤为重要。如果某个任务启动但未成功完成,工作流恢复时会重新运行该任务,并基于已记录的结果保持一致性。通过使用幂等键(idempotency keys)或检查已存在结果来避免意外重复,从而确保工作流顺畅且可预期地执行。
有关应避免的陷阱示例,请参见函数式 API 中的常见陷阱部分,其中展示了如何使用任务(tasks)来组织代码以避免这些问题。同样的原则也适用于 @[StateGraph (Graph API)][StateGraph]。
耐久性模式¶
LangGraph 支持三种耐久性模式,你可以根据应用的需求在性能与数据一致性之间进行权衡。从低到高分别是:
"exit"
"async"
"sync"
更高的耐久性模式会给工作流执行带来更多的开销。
在 v0.6.0 中新增
使用 durability
参数(v0.6.0 中弃用 checkpoint_during
)来管理持久化策略:
durability="async"
取代checkpoint_during=True
durability="exit"
取代checkpoint_during=False
对应关系如下:
checkpoint_during=True
->durability="async"
checkpoint_during=False
->durability="exit"
"exit"
¶
仅在图执行完成时(成功或出错)持久化变更。这为长时间运行的图提供最佳性能,但不会保存中间状态,因此无法从执行中途的故障恢复,也不能中断执行后再续跑。
"async"
¶
在执行下一步的同时异步持久化变更。这在性能与耐久性之间取得了良好平衡,但如果进程在执行期间崩溃,仍存在少量检查点未落盘的风险。
"sync"
¶
在开始下一步之前同步持久化变更。它确保在继续执行前已写入每一个检查点,提供高耐久性,但会带来一定的性能开销。
你可以在调用任意图执行方法时指定耐久性模式:
在节点中使用 tasks¶
如果一个节点包含多个操作,可能更容易将每个操作转换为一个任务(task),而不是将它们重构为多个独立节点。
<sup><i>API Reference: <a href="https://langchain-ai.github.io/langgraph/reference/checkpoints/#langgraph.checkpoint.memory.InMemorySaver">InMemorySaver</a> | <a href="https://langchain-ai.github.io/langgraph/reference/graphs/#langgraph.graph.state.StateGraph">StateGraph</a> | <a href="https://langchain-ai.github.io/langgraph/reference/constants/#langgraph.constants.START">START</a> | <a href="https://langchain-ai.github.io/langgraph/reference/constants/#langgraph.constants.END">END</a></i></sup>
```python hl_lines="16"
from typing import NotRequired
from typing_extensions import TypedDict
import uuid
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
import requests
# 定义用于表示状态的 TypedDict
class State(TypedDict):
url: str
result: NotRequired[str]
def call_api(state: State):
"""示例节点:发起一个 API 请求。"""
result = requests.get(state['url']).text[:100] # 具有副作用
return {
"result": result
}
# 创建 StateGraph 构建器并为 call_api 函数添加一个节点
builder = StateGraph(State)
builder.add_node("call_api", call_api)
# 连接起始与结束节点到 call_api 节点
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)
# 指定检查点器
checkpointer = InMemorySaver()
# 使用检查点器编译图
graph = builder.compile(checkpointer=checkpointer)
# 定义带线程 ID 的配置
thread_id = uuid.uuid4()
config = {"configurable": {"thread_id": thread_id}}
# 调用图
graph.invoke({"url": "https://www.example.com"}, config)
from typing import NotRequired
from typing_extensions import TypedDict
import uuid
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import task
from langgraph.graph import StateGraph, START, END
import requests
# 定义用于表示状态的 TypedDict
class State(TypedDict):
urls: list[str]
result: NotRequired[list[str]]
@task
def _make_request(url: str):
"""发起请求。"""
return requests.get(url).text[:100]
def call_api(state: State):
"""示例节点:发起一个 API 请求。"""
requests = [_make_request(url) for url in state['urls']]
results = [request.result() for request in requests]
return {
"results": results
}
# 创建 StateGraph 构建器并为 call_api 函数添加一个节点
builder = StateGraph(State)
builder.add_node("call_api", call_api)
# 连接起始与结束节点到 call_api 节点
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)
# 指定检查点器
checkpointer = InMemorySaver()
# 使用检查点器编译图
graph = builder.compile(checkpointer=checkpointer)
# 定义带线程 ID 的配置
thread_id = uuid.uuid4()
config = {"configurable": {"thread_id": thread_id}}
# 调用图
graph.invoke({"urls": ["https://www.example.com"]}, config)
恢复工作流¶
启用工作流的持久化执行后,你可以在以下场景恢复执行:
- 暂停与恢复工作流:使用 @[interrupt][interrupt] 函数在特定位置暂停工作流,并使用 @[Command] 原语在更新状态后恢复。详见人类参与(Human-in-the-Loop)。
- 从失败中恢复:在异常(例如 LLM 提供商故障)后,从最后一次成功的检查点自动恢复工作流。这需要使用相同的线程标识符,并将输入值设为
None
来执行工作流(参见函数式 API 的这个示例)。
恢复工作流的起点¶
- 如果你使用 @[StateGraph (Graph API)][StateGraph],起点是执行停止处所在节点(node)的开始位置。
- 如果你在某个节点内调用了子图,那么起点将是调用该被中止子图的父节点。 在子图内部,起点则是执行停止的那个具体节点(node)。
- 如果你使用函数式 API,起点是执行停止处所在入口函数(entrypoint)的开始位置。