Documentation Index
Fetch the complete documentation index at: https://langchain-zh.cn/llms.txt
Use this file to discover all available pages before exploring further.
本指南提供了快速开始使用 Drasi 工具的概述。有关所有 Drasi 功能、参数和配置的详细列表,请前往 Drasi 文档,以及 langchain_drasi 仓库。
Drasi 是一个变更检测平台,使检测和响应数据库中的变更变得简单高效。LangChain-Drasi 集成通过连接外部数据变更与工作流执行,创建响应式、变更驱动的 AI 智能体。这使得智能体能够通过桥接外部数据变更与智能体工作流,发现、订阅并响应实时查询更新。Drasi 连续查询流式传输实时更新,触发智能体状态转换、修改内存或动态控制工作流执行——将静态智能体转变为环境感知型长生命周期、响应式系统。
- 查询发现 - 自动识别可用的 Drasi 查询
- 实时订阅 - 监控连续查询更新
- 通知处理器 - 六种内置处理器用于不同用例
- 控制台
- 日志
- 内存
- 缓冲区
- LangChain 内存
- LangGraph 内存
- 自定义处理器 - 扩展基础处理器以进行特定领域的逻辑
要访问 Drasi 工具,您需要运行 Drasi 和 Drasi MCP 服务器。
先决条件
凭据(可选)
如果您的 Drasi MCP 服务器需要身份验证,您可以使用 Bearer token 或其他身份验证方法配置请求头:
from langchain_drasi import MCPConnectionConfig
config = MCPConnectionConfig(
server_url="http://localhost:8083",
headers={"Authorization": "Bearer your-token"},
timeout=30.0
)
Drasi 工具位于 langchain-drasi 包中:
pip install -U langchain-drasi
实例化
现在我们可以实例化一个 Drasi 工具的实例。您需要配置 MCP 连接,并可选地添加通知处理器来处理实时更新:
from langchain_drasi import create_drasi_tool, MCPConnectionConfig, ConsoleHandler
# Configure connection to Drasi MCP server
config = MCPConnectionConfig(
server_url="http://localhost:8083",
timeout=30.0
)
# Create a notification handler
handler = ConsoleHandler()
# Create the tool
tool = create_drasi_tool(
mcp_config=config,
notification_handlers=[handler]
)
直接调用
以下是直接调用工具的简单示例。
# Discover available queries
queries = await tool.discover_queries()
# Returns: [QueryInfo, QueryInfo, ...]
# Subscribe to a specific query
await tool.subscribe("hot-freezers")
# Notifications routed to registered handlers
# Read current results from a query
result = await tool.read_query("active-orders")
# Returns: QueryResult with current data
我们也可以使用模型生成的 ToolCall 来调用工具,在这种情况下将返回一个 ToolMessage。
在智能体内
我们可以在 LangGraph 智能体中使用 Drasi 工具来创建响应式、事件驱动的工作流。为此我们需要一个具有工具调用能力的模型。
from langchain_anthropic import ChatAnthropic
from langchain.agents import create_agent
# Initialize the model
model = ChatAnthropic(model="claude-sonnet-4-6")
# Create agent with Drasi tool
agent = create_agent(model, [tool])
# Run the agent
result = agent.invoke(
{"messages": [{"role": "user", "content": "What queries are available?"}]}
)
print(result["messages"][-1].content)
result = agent.invoke(
{"messages": [{"role": "user", "content": "Subscribe to the customer-orders query"}]}
)
print(result["messages"][-1].content)
通知处理器
Drasi 的关键功能之一是其内置的通知处理器,用于处理实时查询结果变更。您可以使用这些处理器根据数据变更采取特定操作。
内置处理器
ConsoleHandler - 将格式化的通知输出到 stdout:
from langchain_drasi import ConsoleHandler
handler = ConsoleHandler()
LoggingHandler - 使用 Python 的 logging 框架记录通知:
from langchain_drasi import LoggingHandler
import logging
handler = LoggingHandler(
logger_name="drasi.notifications",
log_level=logging.INFO
)
MemoryHandler - 将通知存储在内存中,支持可选过滤:
from langchain_drasi import MemoryHandler
handler = MemoryHandler(max_size=100)
# Retrieve notifications
all_notifs = handler.get_all()
freezer_notifs = handler.get_by_query("hot-freezers")
added_events = handler.get_by_type("added")
BufferHandler - 用于顺序处理的 FIFO 队列:
这对于缓冲传入的变更通知很有用,特别是当您的工作流正忙于其他任务时;然后您可以在工作流中设置循环,以便在工作流准备好时从缓冲区消耗通知。
from langchain_drasi import BufferHandler
handler = BufferHandler(max_size=100)
# Later, consume notifications
notification = handler.consume() # Remove and return next notification
notification = handler.peek() # View next notification without removing
LangGraphMemoryHandler - 直接将更新注入到 LangGraph 检查点:
from langchain_drasi import LangGraphMemoryHandler
from langgraph.checkpoint.memory import MemorySaver
checkpoint_manager = MemorySaver()
handler = LangGraphMemoryHandler(
checkpointer=checkpoint_manager,
thread_id="your-thread-id"
)
自定义处理器
您可以通过扩展 BaseDrasiNotificationHandler 来创建自定义处理器:
from langchain_drasi import BaseDrasiNotificationHandler
class CustomHandler(BaseDrasiNotificationHandler):
def on_result_added(self, query_name: str, added_data: dict):
# Handle new results
print(f"New result in {query_name}: {added_data}")
def on_result_updated(self, query_name: str, updated_data: dict):
# Handle updated results
print(f"Updated result in {query_name}: {updated_data}")
def on_result_deleted(self, query_name: str, deleted_data: dict):
# Handle deleted results
print(f"Deleted result in {query_name}: {deleted_data}")
handler = CustomHandler()
tool = create_drasi_tool(
mcp_config=config,
notification_handlers=[handler]
)
- 交互式聊天:使用 Drasi 进行实时内存更新的聊天应用程序。
- 终结者游戏:利用 Drasi 实现动态 NPC 行为的游戏。
Drasi 特别适用于构建需要响应实时数据变更的环境感知型智能体。一些示例用例包括:
- AI 副驾驶 - 监控并响应系统事件的助手
- AI 游戏玩家 - 适应游戏内事件的 NPC
- 物联网监控 - 处理传感器数据流的智能体
- 客户支持 - 对工单更新或客户操作做出反应的机器人
- DevOps 助手 - 监控基础设施变更的工具
- 协作编辑 - 响应文档或代码变更的系统
API 参考
有关所有 Drasi 功能和配置的详细文档,请前往 API 参考。