跳转至

LangGraph 运行时

Pregel 实现了 LangGraph 的运行时,负责管理 LangGraph 应用的执行。

编译 StateGraph 或创建 entrypoint 会生成一个 Pregel 实例,它可以用输入进行调用。

本指南从高层次介绍运行时,并提供使用 Pregel 直接实现应用的说明。

注意: Pregel 运行时以 Google 的 Pregel 算法 命名,该算法描述了使用图进行大规模并行计算的高效方法。

概览

在 LangGraph 中,Pregel 将参与者(actors)和**通道(channels)组合为一个应用。**参与者**从通道读取数据并向通道写入数据。Pregel 遵循**Pregel 算法/**批量同步并行(Bulk Synchronous Parallel)**模型,将应用的执行组织为多个步骤。

每个步骤包含三个阶段:

  • 计划(Plan):确定在本步骤中要执行哪些**参与者**。例如,在第一步,选择订阅特殊**输入**通道的**参与者**;在随后的步骤中,选择订阅上一步已更新通道的**参与者**。
  • 执行(Execution):并行执行所有选定的**参与者**,直到全部完成、或其中一个失败、或达到超时时间。在此阶段,此步骤中的通道更新对参与者不可见,直到下一步才可见。
  • 更新(Update):用本步骤中**参与者**写入的值更新通道。

重复上述步骤,直到没有**参与者**被选中执行,或达到最大步骤数。

参与者

一个**参与者**是一个 PregelNode。它订阅通道,从中读取数据,并向其写入数据。它可被视为 Pregel 算法中的一个**参与者**。PregelNode 实现了 LangChain 的 Runnable 接口。

通道

通道用于在参与者(PregelNode)之间通信。每个通道具有一个值类型、一个更新类型,以及一个更新函数——该函数接收一系列更新并修改存储的值。通道可用于在一个链与另一个链之间传递数据,或在将来的步骤中让一个链向自身传递数据。LangGraph 提供了若干内置通道:

  • LastValue:默认通道,存储发送到该通道的最后一个值;适用于输入和输出值,或用于将数据从一步传递到下一步。
  • Topic:可配置的发布-订阅主题(PubSub Topic),适合在**参与者**之间发送多个值或用于累积输出。可以配置为对值去重,或在多个步骤中累积值。
  • BinaryOperatorAggregate:存储一个持久值,通过将二元运算符依次应用到当前值和每个发送到通道的更新来进行更新;适用于跨多个步骤计算聚合;例如:total = BinaryOperatorAggregate(int, operator.add)

示例

虽然大多数用户会通过 StateGraph API 或 entrypoint 装饰器与 Pregel 交互,但也可以直接与 Pregel 交互。

下面通过几个不同的示例来帮助你了解 Pregel API。

from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
)

app = Pregel(
    nodes={"node1": node1},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b"],
)

app.invoke({"a": "foo"})
{'b': 'foofoo'}
from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
)

node2 = (
    NodeBuilder().subscribe_only("b")
    .do(lambda x: x + x)
    .write_to("c")
)


app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": LastValue(str),
        "c": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b", "c"],
)

app.invoke({"a": "foo"})
{'b': 'foofoo', 'c': 'foofoofoofoo'}
from langgraph.channels import EphemeralValue, Topic
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b", "c")
)

node2 = (
    NodeBuilder().subscribe_to("b")
    .do(lambda x: x["b"] + x["b"])
    .write_to("c")
)

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": Topic(str, accumulate=True),
    },
    input_channels=["a"],
    output_channels=["c"],
)

app.invoke({"a": "foo"})
{'c': ['foofoo', 'foofoofoofoo']}

此示例演示如何使用 BinaryOperatorAggregate 通道来实现一个 reducer。

from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, NodeBuilder


node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b", "c")
)

node2 = (
    NodeBuilder().subscribe_only("b")
    .do(lambda x: x + x)
    .write_to("c")
)

def reducer(current, update):
    if current:
        return current + " | " + update
    else:
        return update

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": BinaryOperatorAggregate(str, operator=reducer),
    },
    input_channels=["a"],
    output_channels=["c"],
)

app.invoke({"a": "foo"})

此示例演示如何通过让某个链写入它所订阅的通道来在图中引入一个环。执行将持续进行,直到向该通道写入 None 值。

from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder, ChannelWriteEntry

example_node = (
    NodeBuilder().subscribe_only("value")
    .do(lambda x: x + x if len(x) < 10 else None)
    .write_to(ChannelWriteEntry("value", skip_none=True))
)

app = Pregel(
    nodes={"example_node": example_node},
    channels={
        "value": EphemeralValue(str),
    },
    input_channels=["value"],
    output_channels=["value"],
)

app.invoke({"value": "a"})
{'value': 'aaaaaaaaaaaaaaaa'}

高级 API

LangGraph 提供两种用于创建 Pregel 应用的高级 API:StateGraph(图 API)函数式 API

StateGraph(图 API) 是一个更高层次的抽象,简化了 Pregel 应用的创建。它允许你定义由节点和边组成的图。当你编译该图时,StateGraph API 会为你自动创建 Pregel 应用。

from typing import TypedDict, Optional

from langgraph.constants import START
from langgraph.graph import StateGraph

class Essay(TypedDict):
    topic: str
    content: Optional[str]
    score: Optional[float]

def write_essay(essay: Essay):
    return {
        "content": f"Essay about {essay['topic']}",
    }

def score_essay(essay: Essay):
    return {
        "score": 10
    }

builder = StateGraph(Essay)
builder.add_node(write_essay)
builder.add_node(score_essay)
builder.add_edge(START, "write_essay")

# Compile the graph.
# This will return a Pregel instance.
graph = builder.compile()

编译后的 Pregel 实例将关联一组节点和通道。你可以通过打印它们来查看这些节点和通道。

print(graph.nodes)

你会看到类似如下的内容:

{'__start__': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1810>,
 'write_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba14d0>,
 'score_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1710>}
print(graph.channels)

你应该会看到类似如下的内容

{'topic': <langgraph.channels.last_value.LastValue at 0x7d05e3294d80>,
 'content': <langgraph.channels.last_value.LastValue at 0x7d05e3295040>,
 'score': <langgraph.channels.last_value.LastValue at 0x7d05e3295980>,
 '__start__': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3297e00>,
 'write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32960c0>,
 'score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ab80>,
 'branch:__start__:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32941c0>,
 'branch:__start__:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d88800>,
 'branch:write_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3295ec0>,
 'branch:write_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ac00>,
 'branch:score_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d89700>,
 'branch:score_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b400>,
 'start:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b280>}

函数式 API 中,你可以使用 entrypoint 创建一个 Pregel 应用。entrypoint 装饰器允许你定义一个接收输入并返回输出的函数。

from typing import TypedDict, Optional

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint

class Essay(TypedDict):
    topic: str
    content: Optional[str]
    score: Optional[float]


checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def write_essay(essay: Essay):
    return {
        "content": f"Essay about {essay['topic']}",
    }

print("Nodes: ")
print(write_essay.nodes)
print("Channels: ")
print(write_essay.channels)
Nodes:
{'write_essay': <langgraph.pregel.read.PregelNode object at 0x7d05e2f9aad0>}
Channels:
{'__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7d05e2c906c0>, '__end__': <langgraph.channels.last_value.LastValue object at 0x7d05e2c90c40>, '__previous__': <langgraph.channels.last_value.LastValue object at 0x7d05e1007280>}