Documentation Index
Fetch the complete documentation index at: https://langchain-zh.cn/llms.txt
Use this file to discover all available pages before exploring further.
Pregel 实现了 LangGraph 的运行时,负责管理 LangGraph 应用程序的执行。
编译一个 StateGraph 或创建一个 entrypoint 会产生一个 Pregel 实例,该实例可以通过输入进行调用。
本指南从高层次解释了运行时,并提供了直接使用 Pregel 实现应用程序的说明。
注意: Pregel 运行时以 Google 的 Pregel 算法 命名,该算法描述了一种使用图进行大规模并行计算的高效方法。
在 LangGraph 中,Pregel 将 执行器 和 通道 组合到一个应用程序中。执行器 从通道读取数据并向通道写入数据。Pregel 按照 Pregel 算法/批量同步并行 模型将应用程序的执行组织为多个步骤。
每个步骤包含三个阶段:
- 规划:确定在此步骤中要执行哪些 执行器。例如,在第一步中,选择订阅特殊 输入 通道的 执行器;在后续步骤中,选择订阅上一步中更新的通道的 执行器。
- 执行:并行执行所有选定的 执行器,直到全部完成、某个失败或达到超时。在此阶段,通道更新对执行器不可见,直到下一步。
- 更新:使用此步骤中 执行器 写入的值更新通道。
重复此过程,直到没有 执行器 被选中执行,或达到最大步骤数。
执行器
一个 执行器 是一个 PregelNode。它订阅通道,从中读取数据,并向其写入数据。可以将其视为 Pregel 算法中的 执行器。PregelNodes 实现了 LangChain 的 Runnable 接口。
通道用于在执行器(PregelNodes)之间进行通信。每个通道都有一个值类型、一个更新类型和一个更新函数——该函数接收一系列更新并修改存储的值。通道可用于将数据从一个链发送到另一个链,或在未来的步骤中将数据从一个链发送到自身。LangGraph 提供了许多内置通道:
LastValue:默认通道,存储发送到通道的最后一个值,适用于输入和输出值,或用于将数据从一个步骤发送到下一个步骤。
Topic:一个可配置的发布-订阅主题,适用于在 执行器 之间发送多个值,或用于累积输出。可以配置为去重值或在多个步骤中累积值。
BinaryOperatorAggregate:存储一个持久值,通过对当前值和发送到通道的每个更新应用二元运算符来更新,适用于在多个步骤中计算聚合;例如,total = BinaryOperatorAggregate(int, operator.add)
虽然大多数用户将通过 StateGraph API 或 entrypoint 装饰器与 Pregel 交互,但也可以直接与 Pregel 交互。
以下是几个不同的示例,让您了解 Pregel API。
import { EphemeralValue } from "@langchain/langgraph/channels";
import { Pregel, NodeBuilder } from "@langchain/langgraph/pregel";
const node1 = new NodeBuilder()
.subscribeOnly("a")
.do((x: string) => x + x)
.writeTo("b");
const app = new Pregel({
nodes: { node1 },
channels: {
a: new EphemeralValue<string>(),
b: new EphemeralValue<string>(),
},
inputChannels: ["a"],
outputChannels: ["b"],
});
await app.invoke({ a: "foo" });
import { LastValue, EphemeralValue } from "@langchain/langgraph/channels";
import { Pregel, NodeBuilder } from "@langchain/langgraph/pregel";
const node1 = new NodeBuilder()
.subscribeOnly("a")
.do((x: string) => x + x)
.writeTo("b");
const node2 = new NodeBuilder()
.subscribeOnly("b")
.do((x: string) => x + x)
.writeTo("c");
const app = new Pregel({
nodes: { node1, node2 },
channels: {
a: new EphemeralValue<string>(),
b: new LastValue<string>(),
c: new EphemeralValue<string>(),
},
inputChannels: ["a"],
outputChannels: ["b", "c"],
});
await app.invoke({ a: "foo" });
{ b: 'foofoo', c: 'foofoofoofoo' }
import { EphemeralValue, Topic } from "@langchain/langgraph/channels";
import { Pregel, NodeBuilder } from "@langchain/langgraph/pregel";
const node1 = new NodeBuilder()
.subscribeOnly("a")
.do((x: string) => x + x)
.writeTo("b", "c");
const node2 = new NodeBuilder()
.subscribeTo("b")
.do((x: { b: string }) => x.b + x.b)
.writeTo("c");
const app = new Pregel({
nodes: { node1, node2 },
channels: {
a: new EphemeralValue<string>(),
b: new EphemeralValue<string>(),
c: new Topic<string>({ accumulate: true }),
},
inputChannels: ["a"],
outputChannels: ["c"],
});
await app.invoke({ a: "foo" });
{ c: ['foofoo', 'foofoofoofoo'] }
此示例演示如何使用 BinaryOperatorAggregate 通道实现一个归约器。import { EphemeralValue, BinaryOperatorAggregate } from "@langchain/langgraph/channels";
import { Pregel, NodeBuilder } from "@langchain/langgraph/pregel";
const node1 = new NodeBuilder()
.subscribeOnly("a")
.do((x: string) => x + x)
.writeTo("b", "c");
const node2 = new NodeBuilder()
.subscribeOnly("b")
.do((x: string) => x + x)
.writeTo("c");
const reducer = (current: string, update: string) => {
if (current) {
return current + " | " + update;
} else {
return update;
}
};
const app = new Pregel({
nodes: { node1, node2 },
channels: {
a: new EphemeralValue<string>(),
b: new EphemeralValue<string>(),
c: new BinaryOperatorAggregate<string>({ operator: reducer }),
},
inputChannels: ["a"],
outputChannels: ["c"],
});
await app.invoke({ a: "foo" });
此示例演示如何通过让一个链写入它订阅的通道来在图中引入循环。执行将持续进行,直到向通道写入一个 null 值。import { EphemeralValue } from "@langchain/langgraph/channels";
import { Pregel, NodeBuilder, ChannelWriteEntry } from "@langchain/langgraph/pregel";
const exampleNode = new NodeBuilder()
.subscribeOnly("value")
.do((x: string) => x.length < 10 ? x + x : null)
.writeTo(new ChannelWriteEntry("value", { skipNone: true }));
const app = new Pregel({
nodes: { exampleNode },
channels: {
value: new EphemeralValue<string>(),
},
inputChannels: ["value"],
outputChannels: ["value"],
});
await app.invoke({ value: "a" });
{ value: 'aaaaaaaaaaaaaaaa' }
高级 API
LangGraph 提供了两个用于创建 Pregel 应用程序的高级 API:StateGraph(图 API) 和 函数式 API。
StateGraph(图 API)
函数式 API
StateGraph(图 API) 是一个更高级别的抽象,简化了 Pregel 应用程序的创建。它允许您定义节点和边的图。当您编译图时,StateGraph API 会自动为您创建 Pregel 应用程序。import { START, StateGraph } from "@langchain/langgraph";
interface Essay {
topic: string;
content?: string;
score?: number;
}
const writeEssay = (essay: Essay) => {
return {
content: `Essay about ${essay.topic}`,
};
};
const scoreEssay = (essay: Essay) => {
return {
score: 10
};
};
const builder = new StateGraph<Essay>({
channels: {
topic: null,
content: null,
score: null,
}
})
.addNode("writeEssay", writeEssay)
.addNode("scoreEssay", scoreEssay)
.addEdge(START, "writeEssay")
.addEdge("writeEssay", "scoreEssay");
// 编译图。
// 这将返回一个 Pregel 实例。
const graph = builder.compile();
编译后的 Pregel 实例将与一系列节点和通道相关联。您可以通过打印它们来检查节点和通道。console.log(graph.nodes);
您将看到类似这样的内容:{
__start__: PregelNode { ... },
writeEssay: PregelNode { ... },
scoreEssay: PregelNode { ... }
}
console.log(graph.channels);
您应该看到类似这样的内容{
topic: LastValue { ... },
content: LastValue { ... },
score: LastValue { ... },
__start__: EphemeralValue { ... },
writeEssay: EphemeralValue { ... },
scoreEssay: EphemeralValue { ... },
'branch:__start__:__self__:writeEssay': EphemeralValue { ... },
'branch:__start__:__self__:scoreEssay': EphemeralValue { ... },
'branch:writeEssay:__self__:writeEssay': EphemeralValue { ... },
'branch:writeEssay:__self__:scoreEssay': EphemeralValue { ... },
'branch:scoreEssay:__self__:writeEssay': EphemeralValue { ... },
'branch:scoreEssay:__self__:scoreEssay': EphemeralValue { ... },
'start:writeEssay': EphemeralValue { ... }
}
在 函数式 API 中,您可以使用 entrypoint 来创建 Pregel 应用程序。entrypoint 装饰器允许您定义一个接收输入并返回输出的函数。import { MemorySaver } from "@langchain/langgraph";
import { entrypoint } from "@langchain/langgraph/func";
interface Essay {
topic: string;
content?: string;
score?: number;
}
const checkpointer = new MemorySaver();
const writeEssay = entrypoint(
{ checkpointer, name: "writeEssay" },
async (essay: Essay) => {
return {
content: `Essay about ${essay.topic}`,
};
}
);
console.log("节点: ");
console.log(writeEssay.nodes);
console.log("通道: ");
console.log(writeEssay.channels);
节点:
{ writeEssay: PregelNode { ... } }
通道:
{
__start__: EphemeralValue { ... },
__end__: LastValue { ... },
__previous__: LastValue { ... }
}