import json
import os
import uuid
from datetime import datetime, timezone
from typing import Dict, List
import requests
from requests_toolbelt import MultipartEncoder
from uuid_utils.compat import uuid7
def create_dotted_order(
start_time: datetime | None = None,
run_id: uuid.UUID | None = None
) -> str:
"""为运行排序和层次结构创建点序字符串。
点序用于建立运行之间的顺序和关系。
它结合了时间戳和唯一标识符,以确保正确的排序和追踪。
"""
st = start_time or datetime.now(timezone.utc)
id_ = run_id or uuid7()
return f"{st.strftime('%Y%m%dT%H%M%S%fZ')}{id_}"
def create_run_base(
    name: str,
    run_type: str,
    inputs: dict,
    start_time: datetime
) -> dict:
    """Build the base dictionary for a new run.

    A fresh UUIDv7 is generated and used for both ``id`` and ``trace_id``,
    i.e. the run starts out as the root of its own trace.

    Args:
        name: Name stored under ``"name"``.
        run_type: Value stored under ``"run_type"``.
        inputs: Input payload stored under ``"inputs"`` (not copied).
        start_time: Start time; stored as its ISO-8601 string.

    Returns:
        A dict with the keys ``id``, ``trace_id``, ``name``, ``start_time``,
        ``inputs`` and ``run_type``.
    """
    identifier = str(uuid7())
    base = dict(
        id=identifier,
        trace_id=identifier,
        name=name,
        start_time=start_time.isoformat(),
        inputs=inputs,
        run_type=run_type,
    )
    return base
def construct_run(
    name: str,
    run_type: str,
    inputs: dict,
    parent_dotted_order: str | None = None,
) -> dict:
    """Construct a run dictionary, optionally nested under a parent run.

    The run gets a unique id and its own dotted-order segment. When
    ``parent_dotted_order`` is given, the new segment is appended to it and
    the trace id / parent run id are recovered from the parent's segments.

    Args:
        name: Name of the run.
        run_type: Type of the run.
        inputs: Input payload of the run.
        parent_dotted_order: Dotted order of the parent run, or ``None`` (or
            an empty string) for a root run.

    Returns:
        The run dictionary, including its ``dotted_order``.
    """
    now = datetime.now(timezone.utc)
    run = create_run_base(name, run_type, inputs, now)
    own_segment = create_dotted_order(now, uuid.UUID(run["id"]))

    # Root run: its dotted order is just its own segment.
    if not parent_dotted_order:
        run["dotted_order"] = own_segment
        return run

    # Each segment looks like "<timestamp>Z<run id>", so the text after the
    # "Z" is the id. The first segment belongs to the trace root and the last
    # one to the direct parent.
    segments = parent_dotted_order.split(".")
    run["trace_id"] = segments[0].split("Z")[1]
    run["parent_run_id"] = segments[-1].split("Z")[1]
    run["dotted_order"] = f"{parent_dotted_order}.{own_segment}"
    return run
def _json_part(part_name: str, payload) -> tuple:
    """Encode ``payload`` as JSON and wrap it as one multipart field tuple."""
    body = json.dumps(payload).encode("utf-8")
    return (
        part_name,
        (
            None,
            body,
            "application/json",
            {"Content-Length": str(len(body))},
        ),
    )


def serialize_run(operation: str, run_data: dict) -> List[tuple]:
    """Serialize one run into the parts of a multipart request.

    The main run data and the optional ``inputs``, ``outputs`` and ``events``
    fields are serialized as separate JSON parts. ``run_data`` itself is not
    modified, so the same dictionary can safely be serialized again (for
    example on a retry) or reused to build a later update.

    Args:
        operation: Prefix used in every part name (the callers in this file
            pass ``"post"`` or ``"patch"``).
        run_data: The run dictionary. Its ``"id"`` is used in the part names;
            if the key is absent a new UUIDv7 string is generated for the
            names only.

    Returns:
        A list of ``(part_name, (filename, body, content_type, headers))``
        tuples: first the main part named ``"{operation}.{run_id}"``, then one
        ``"{operation}.{run_id}.{key}"`` part per non-empty optional field.
    """
    # Only generate a fallback id when the run really has none.
    run_id = run_data["id"] if "id" in run_data else str(uuid7())

    optional_keys = ("inputs", "outputs", "events")
    # Build the main payload from a filtered copy instead of popping keys off
    # the caller's dict, which would silently strip its inputs/outputs/events.
    main_payload = {
        key: value for key, value in run_data.items() if key not in optional_keys
    }

    parts = [_json_part(f"{operation}.{run_id}", main_payload)]
    for key in optional_keys:
        value = run_data.get(key)
        # Missing or empty optional fields are not sent at all.
        if value:
            parts.append(_json_part(f"{operation}.{run_id}.{key}", value))
    return parts
def batch_ingest_runs(
api_url: str,
api_key: str,
posts: list[dict] | None = None,
patches: list[dict] | None = None,
) -> None:
"""在单个批量请求中摄取多个运行。
此函数处理创建新运行(发布)和更新现有运行(修补)。
与单独的 API 调用相比,它在摄取多个运行时效率更高。
"""
boundary = uuid.uuid4().hex
all_parts = []
for operation, runs in zip(("post", "patch"), (posts, patches)):
if runs:
all_parts.extend(
[part for run in runs for part in serialize_run(operation, run)]
)
encoder = MultipartEncoder(fields=all_parts, boundary=boundary)
headers = {"Content-Type": encoder.content_type, "x-api-key": api_key}
try:
response = requests.post(
f"{api_url}/runs/multipart",
data=encoder,
headers=headers
)
response.raise_for_status()
print("成功摄取运行。")
except requests.RequestException as e:
print(f"摄取运行时出错:{e}")
# 在生产环境中,您可能希望记录此错误或更稳健地处理它
# Configure the API URL and key.
# For production use, consider a configuration file or environment variables.
api_url = "https://api.smith.langchain.com"
api_key = os.getenv("LANGSMITH_API_KEY")
if not api_key:
    raise ValueError("未设置 LANGSMITH_API_KEY 环境变量")

# Build the parent run.
parent_run = construct_run(
    name="父运行",
    run_type="chain",
    inputs={"main_question": "告诉我关于法国"},
)

# Build a child run linked to the parent through the parent's dotted order.
child_run = construct_run(
    name="子运行",
    run_type="llm",
    inputs={"question": "法国的首都是什么?"},
    parent_dotted_order=parent_run["dotted_order"],
)

# Step 1: post both runs to create them.
posts = [parent_run, child_run]
batch_ingest_runs(api_url, api_key, posts=posts)

# Step 2: patch the runs with their end time and outputs.
child_run_update = dict(
    child_run,
    end_time=datetime.now(timezone.utc).isoformat(),
    outputs={"answer": "巴黎是法国的首都。"},
)
parent_run_update = dict(
    parent_run,
    end_time=datetime.now(timezone.utc).isoformat(),
    outputs={"summary": "关于法国的讨论,包括其首都。"},
)
patches = [parent_run_update, child_run_update]
batch_ingest_runs(api_url, api_key, patches=patches)

# Note: this example requires the `requests` and `requests_toolbelt` libraries.
# Install them with pip:
#     pip install requests requests_toolbelt