第9章:端到端实战 —— 三套可跑示例 + LongMemEval 评测
Mem0 + LangGraph 个人助理、Zep 时间感知客服、Letta OS-style 长会话三个端到端示例,带完整代码和 LongMemEval 评测脚本
把前 8 章的知识落地到代码——本章给三套完全可跑的端到端示例,你可以在本地用 Python 3.10 + 一个 OpenAI API key 跑通。涵盖三种典型场景:个性化助理(Mem0 + LangGraph)、时间感知客服(Zep + Graphiti)、OS-style 长会话(Letta)。最后接 LongMemEval 给出对比评测脚本,以及一份上线 checklist。
📑 目录
- 0. 通用准备
- 1. 场景 A:个人助理(Mem0 + LangGraph + Qdrant)
- 2. 场景 B:时间感知客服(Zep + Graphiti)
- 3. 场景 C:OS-style 长会话(Letta)
- 4. 接 LongMemEval 评测三套方案
- 5. 上线 checklist
- 6. 监控指标与成本估算
- 自我检验清单
- 参考资料
0. 通用准备
0.1 环境
# Python 3.10+
python --version
# 创建独立环境
python -m venv .venv && source .venv/bin/activate
# 安装公共依赖
pip install openai python-dotenv
0.2 API key
# .env
OPENAI_API_KEY=sk-...
# 可选:
ANTHROPIC_API_KEY=sk-ant-...
ZEP_API_KEY=z_... # 注册 https://www.getzep.com 拿
0.3 容器(Docker)
为了让 Qdrant / Neo4j 等开箱即用,建议 Docker:
# Qdrant
docker run -d --name qdrant -p 6333:6333 -v $(pwd)/qdrant_data:/qdrant/storage qdrant/qdrant
# Neo4j(场景 B 需要)
docker run -d --name neo4j -p 7474:7474 -p 7687:7687 \
-e NEO4J_AUTH=neo4j/password \
-v $(pwd)/neo4j_data:/data \
neo4j:5.20
# Letta server(场景 C)
docker run -d --name letta -p 8283:8283 \
-e OPENAI_API_KEY=$OPENAI_API_KEY \
letta/letta:latest
1. 场景 A:个人助理(Mem0 + LangGraph + Qdrant)
目标:搭一个能记住用户偏好、跨 session 续聊的助理。
1.1 依赖
pip install langgraph langchain-openai mem0ai qdrant-client
1.2 完整代码 assistant_a.py
"""
场景 A:个人助理
- LangGraph 管对话主循环 + state
- Mem0 管 long-term memory(Qdrant 后端)
- 跨 session 持久化
"""
import os
from typing import TypedDict
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from mem0 import Memory
load_dotenv()
# === 1. 初始化 Mem0(Qdrant 后端) ===
mem0_config = {
"vector_store": {
"provider": "qdrant",
"config": {
"host": "localhost",
"port": 6333,
"collection_name": "assistant_a_memory",
"embedding_model_dims": 1536,
},
},
"llm": {
"provider": "openai",
"config": {"model": "gpt-4o-mini", "temperature": 0.1},
},
"embedder": {
"provider": "openai",
"config": {"model": "text-embedding-3-small"},
},
}
memory = Memory.from_config(mem0_config)
# === 2. LangGraph state ===
class AgentState(TypedDict):
messages: list
user_id: str
retrieved_memories: list
# === 3. 节点函数 ===
llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
def retrieve_memory_node(state: AgentState):
"""从 Mem0 检索相关 long-term memory。"""
last_user_msg = state["messages"][-1].content
results = memory.search(
query=last_user_msg,
user_id=state["user_id"],
limit=5,
)
return {"retrieved_memories": results.get("results", [])}
def respond_node(state: AgentState):
"""组装 system prompt + memory + history,生成回复。"""
memories_text = "\n".join(
f"- {m['memory']}" for m in state["retrieved_memories"]
) or "(暂无相关记忆)"
sys_prompt = f"""你是一个友好的助理。以下是你关于这个用户的长期记忆:
{memories_text}
请基于这些记忆回答用户问题,优先用记忆里的偏好。"""
response = llm.invoke([
SystemMessage(content=sys_prompt),
*state["messages"],
])
return {"messages": state["messages"] + [response]}
def write_memory_node(state: AgentState):
"""对话结束后,把这一轮的事实写入 Mem0。"""
# 取最近一对 user/assistant 消息
recent = state["messages"][-2:]
msgs_for_mem = [
{"role": "user" if isinstance(m, HumanMessage) else "assistant",
"content": m.content}
for m in recent
]
memory.add(messages=msgs_for_mem, user_id=state["user_id"])
return {} # state 不变
# === 4. 编排 graph ===
graph = StateGraph(AgentState)
graph.add_node("retrieve", retrieve_memory_node)
graph.add_node("respond", respond_node)
graph.add_node("write_memory", write_memory_node)
graph.set_entry_point("retrieve")
graph.add_edge("retrieve", "respond")
graph.add_edge("respond", "write_memory")
graph.add_edge("write_memory", END)
app = graph.compile(checkpointer=MemorySaver())
# === 5. 主循环 ===
def chat(user_id: str, message: str, thread_id: str = "default"):
state = {
"messages": [HumanMessage(content=message)],
"user_id": user_id,
"retrieved_memories": [],
}
result = app.invoke(
state,
config={"configurable": {"thread_id": f"{user_id}_{thread_id}"}},
)
return result["messages"][-1].content
if __name__ == "__main__":
# === Demo:Session 1 ===
print("=== Session 1 ===")
print(chat("alice", "我对乳糖不耐受,以后推荐餐厅请避开奶制品"))
print(chat("alice", "我最爱意大利菜"))
# === Demo:Session 2(模拟新进程的重启) ===
print("\n=== Session 2(模拟新进程) ===")
print(chat("alice", "推荐一家适合周末早午餐的店,不要太远"))
# 期待:agent 主动避开奶制品并优先意大利菜
1.3 运行
python assistant_a.py
预期输出(Session 2):
推荐你试试 "Forno" 意大利餐厅,无奶选项很丰富,记得到时候和服务员
确认你的乳糖不耐受需求...
🌟 关键点:Session 2 是新进程,但 Mem0 的 Qdrant 持久化让 alice 的偏好跨进程保留。
2. 场景 B:时间感知客服(Zep + Graphiti)
目标:处理”用户搬家、改口味”这类事实演化,正确回答”用户当前 vs 历史”的问题。
2.1 依赖
pip install graphiti-core neo4j openai
# Neo4j 容器已启动(见 0.3)
2.2 完整代码 customer_service_b.py
"""
场景 B:时间感知客服
- Graphiti 自动建 KG + bi-temporal
- 演示事实演化:"用户搬家"
"""
import asyncio
import os
from datetime import datetime
from dotenv import load_dotenv
from graphiti_core import Graphiti
from graphiti_core.nodes import EpisodeType
from openai import OpenAI
load_dotenv()
graphiti = Graphiti(
"bolt://localhost:7687", "neo4j", "password",
)
llm = OpenAI()
async def setup():
"""初始化索引(只需运行一次)。"""
await graphiti.build_indices_and_constraints()
async def add_conversation(user_id: str, conv_text: str, ts: datetime):
"""把一段对话喂给 Graphiti,自动抽取事实。"""
await graphiti.add_episode(
name=f"conv_{ts.isoformat()}",
episode_body=conv_text,
source=EpisodeType.message,
source_description=f"客服对话 user={user_id}",
reference_time=ts,
group_id=user_id,
)
async def answer(user_id: str, question: str, as_of: datetime = None) -> str:
"""从 KG 检索相关 facts,生成回答。as_of 用于时间穿越查询。"""
results = await graphiti.search(
query=question,
group_ids=[user_id],
num_results=10,
)
facts_text = "\n".join(
f"- {r.fact} (valid: {r.valid_at} ~ {r.invalid_at or 'now'})"
for r in results
) or "(无相关 facts)"
sys_prompt = f"""你是客服助手。基于以下 facts 回答:
{facts_text}
注意:facts 带时间戳 valid_at / invalid_at,以"现在"({as_of or datetime.now()})为准。
请只用还有效的 facts。"""
resp = llm.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": sys_prompt},
{"role": "user", "content": question},
],
)
return resp.choices[0].message.content
async def demo():
await setup()
# === T1:用户首次告知地址 ===
await add_conversation(
user_id="bob",
conv_text="客户说: 我目前住在上海浦东张江,送货请到这个地址。",
ts=datetime(2024, 6, 1),
)
# === T2:几个月后用户搬家 ===
await add_conversation(
user_id="bob",
conv_text="客户说: 我上个月搬到了北京海淀,以后请寄到新地址。",
ts=datetime(2025, 1, 15),
)
# === Q1:当前住址 ===
print("\nQ: 用户当前的送货地址在哪?")
print("A:", await answer("bob", "用户当前的送货地址在哪?"))
# 期待:北京海淀
# === Q2:时间穿越——2024 年 8 月时的住址 ===
print("\nQ: 2024 年 8 月时,用户住哪?")
print("A:", await answer(
"bob", "2024 年 8 月时用户住哪里?",
as_of=datetime(2024, 8, 1),
))
# 期待:上海浦东张江
if __name__ == "__main__":
asyncio.run(demo())
2.3 关键能力
- 自动事实抽取 + 演化:“搬家”事件会让旧地址自动
t_invalid标记 - 时间穿越查询:
as_of参数让 agent 回答”过去某时刻的事实” - 审计:所有 fact 带时间戳,可追溯何时被记录
3. 场景 C:OS-style 长会话(Letta)
目标:跨周复用、agent 自我管理 memory(core_memory_append 等),适合长期助手。
3.1 依赖
pip install letta-client
# Letta server 已启动(见 0.3,容器跑在 8283 端口)
3.2 完整代码 long_session_c.py
"""
场景 C:OS-style 长会话
- Letta 提供完整 agent runtime
- agent 自己决定何时 append/replace/search memory
"""
import os
from dotenv import load_dotenv
from letta_client import Letta
load_dotenv()
client = Letta(base_url=os.getenv("LETTA_BASE_URL", "http://localhost:8283"))
def create_or_get_agent(name: str, persona: str, human_facts: str = ""):
"""创建一个 agent(若已存在则复用)。"""
existing = client.agents.list(name=name)
if existing:
return existing[0]
return client.agents.create(
name=name,
memory_blocks=[
{"label": "human", "value": human_facts or "(尚无)"},
{"label": "persona", "value": persona},
],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
)
def chat(agent_id: str, message: str) -> str:
"""单 turn 对话,Letta 内部自动 manage memory。"""
response = client.agents.messages.create(
agent_id=agent_id,
messages=[{"role": "user", "content": message}],
)
# response.messages 包含 agent 的 reasoning steps + tool calls + final text
final_text = ""
for msg in response.messages:
if msg.message_type == "assistant_message":
final_text = msg.content
return final_text
if __name__ == "__main__":
agent = create_or_get_agent(
name="alice_assistant",
persona="你是 Alice 的私人助理,温和、细致,擅长记住她的偏好。",
human_facts="姓名: Alice。职业: 数据科学家。",
)
# === Day 1 ===
print("=== Day 1 ===")
print(chat(agent.id, "我下周要去东京出差,有什么建议吗?"))
print(chat(agent.id, "顺便帮我记一下,我对花粉过敏,会议尽量安排在室内"))
# === Day 2(下次跑脚本时,agent 状态已持久化) ===
print("\n=== Day 2 ===")
print(chat(agent.id, "下周东京的天气怎样?"))
# 期待:agent 知道是出差、知道用户花粉过敏,
# 会主动提醒避开花粉高峰、推荐室内场所
3.3 观察 Memory 变化
# 拉 agent 的 core memory(应该看到 Letta 自动 append 的过敏信息)
curl -s http://localhost:8283/v1/agents/{agent_id}/core-memory | jq
# 拉 archival memory(长期事实库)
curl -s http://localhost:8283/v1/agents/{agent_id}/archival-memory | jq
🌟 关键体验:Letta 的 agent 会主动在内部 reasoning 中调用 core_memory_append("human", "用户对花粉过敏"),完全无需你写”写入 memory”的代码——这就是 OS-style 设计的优势。
4. 接 LongMemEval 评测三套方案
4.1 准备
git clone https://github.com/xiaowu0162/LongMemEval
cd LongMemEval
pip install -e .
4.2 评测脚本 bench.py
"""
统一评测三个 agent 在 LongMemEval-S 上的表现。
"""
import json
import time
from collections import defaultdict
from longmemeval.dataset import load_dataset
from longmemeval.evaluator import evaluate_answer
# 引入三套 agent(见上面的代码)
from assistant_a import chat as chat_mem0
from customer_service_b import answer as chat_zep
from long_session_c import create_or_get_agent, chat as chat_letta
# Letta agent 池(每用户一个)
letta_agents = {}
def get_letta_agent(user_id):
if user_id not in letta_agents:
letta_agents[user_id] = create_or_get_agent(
name=f"benchmark_{user_id}",
persona="你是一个助手,只回答用户问题,简洁直接。",
)
return letta_agents[user_id]
agents = {
"mem0": lambda uid, msg: chat_mem0(uid, msg),
# "zep": lambda uid, msg: asyncio.run(chat_zep(uid, msg)), # 略,zep async
"letta": lambda uid, msg: chat_letta(get_letta_agent(uid).id, msg),
}
def run_benchmark(agent_name: str, dataset, sample_size: int = 50):
chat_fn = agents[agent_name]
scores = defaultdict(list)
times = []
items = dataset[:sample_size]
for i, item in enumerate(items):
print(f"\n[{agent_name} {i+1}/{len(items)}] {item['question'][:60]}...")
uid = f"{agent_name}_{item['user_id']}"
# 1. Replay history sessions
for session in item["history"]:
for m in session["messages"]:
if m["role"] == "user":
chat_fn(uid, m["content"]) # 让 agent 建立 memory
# 2. Ask evaluation question
t0 = time.time()
try:
ans = chat_fn(uid, item["question"])
except Exception as e:
print(f" Error: {e}")
ans = ""
dt = time.time() - t0
times.append(dt)
# 3. Score
correct = evaluate_answer(ans, item["answer"], item["category"])
scores[item["category"]].append(int(correct))
scores["overall"].append(int(correct))
summary = {cat: sum(v) / len(v) for cat, v in scores.items()}
summary["avg_latency_s"] = sum(times) / len(times)
return summary
if __name__ == "__main__":
dataset = load_dataset("longmemeval_s")
results = {}
for name in ["mem0", "letta"]:
print(f"\n{'='*60}\n Benchmarking: {name}\n{'='*60}")
results[name] = run_benchmark(name, dataset, sample_size=30)
# 输出对比表
print("\n\n========== Results ==========")
print(f"{'Framework':<10} | {'Info':<6} | {'Multi':<6} | {'Temp':<6} | "
f"{'Update':<6} | {'Abst':<6} | {'Overall':<8} | {'Latency(s)':<10}")
print("-" * 90)
for name, s in results.items():
print(f"{name:<10} | {s.get('info_extraction', 0):.2f} | "
f"{s.get('multi_session', 0):.2f} | "
f"{s.get('temporal', 0):.2f} | "
f"{s.get('update', 0):.2f} | "
f"{s.get('abstention', 0):.2f} | "
f"{s['overall']:.2f} | {s['avg_latency_s']:.2f}")
with open("bench_results.json", "w") as f:
json.dump(results, f, indent=2)
print("\nSaved to bench_results.json")
4.3 运行
python bench.py
期望输出(数字仅参考,实际取决于 LLM 版本):
========== Results ==========
Framework | Info | Multi | Temp | Update | Abst | Overall | Latency(s)
------------------------------------------------------------------------------------------
mem0 | 0.78 | 0.55 | 0.40 | 0.50 | 0.65 | 0.58 | 1.85
letta | 0.80 | 0.62 | 0.55 | 0.55 | 0.70 | 0.64 | 2.40
5. 上线 checklist
- Memory schema 定型:确定 namespace 层级、必带字段、PII 字段
- PII 检测:写入前过 Presidio / 自研 detector
- GDPR:hard delete API、export API、audit log
- Embedding 模型固定:换模型要全库 reindex,代价大
- 冲突解决策略:LLM judge / latest wins / bi-temporal,文档化
- Decay 策略:确定 importance 计算 + 衰减半衰期
- Backup:vector store + KG 都要定期 snapshot
- 限流:每用户每分钟最多写 N 条,防 memory injection
- Reflection 调度:离线 batch job(凌晨) + 关键事件触发
- 监控 dashboard:见下一节
6. 监控指标与成本估算
6.1 监控指标
| 指标 | 采集方式 | 告警阈值 |
|---|---|---|
| Memory hit rate | retrieval 结果非空 / 总查询 | > 60% |
| Extraction error rate | LLM 抽取失败 / 总写入 | < 2% |
| Write latency P95 | timing | < 500ms(异步) |
| Read latency P95 | timing | < 100ms |
| Vector store size | 容量 | < 80% 容量 |
| 每用户 memory 条数 | 统计 | 异常飙升告警(可能 injection) |
| 冲突解决率 | UPDATE/DELETE / 总写入 | 监控趋势 |
| 检索准确率 | 在 benchmark 上每天跑 | 不退化 5%+ |
6.2 成本估算
以 Mem0 为例,1 万 DAU 每用户每天 20 条对话:
| 项 | 量 | 单价 | 月成本 |
|---|---|---|---|
| Extraction LLM 调用(GPT-4o-mini) | 10K × 20 × 30 = 6M /月 | $0.15/M token,平均 200 token | ~$180 |
| Embedding(text-embedding-3-small) | 同上 | $0.02/M token | ~$24 |
| Decision LLM(judge ADD/UPDATE) | 6M × 30%(只在有相关 memory 时) | 同 extraction | ~$54 |
| Qdrant 自托管(10M vectors) | — | EC2 c5.2xlarge ~$200/月 | ~$200 |
| Reflection(每用户每天 1 次) | 10K × 30 × 长 prompt | GPT-4o-mini 较长输入 | ~$100 |
| 小计 | ~$558/月 |
每月 ¥4000 左右支撑 1 万 DAU——实际上 90% 成本在 LLM,不在存储。优化方向:
- 用更便宜的 extraction 模型(Haiku、本地小模型)
- 减少 extraction 频率(每 N 条对话才抽一次)
- Schema-driven 替代 LLM 抽取
✅ 自我检验清单
- 三套环境:能在本地容器跑通 Qdrant / Neo4j / Letta
- 场景 A 改造:能把 demo 改成”加上 user 主动撤回偏好”的能力
- 场景 B 时间穿越:能用代码演示”现在的住址 vs 一年前的住址”两个查询
- 场景 C 自动 self-edit:能从 Letta 后台看到 agent 自动 append 的 core memory
- LongMemEval 评测:能跑通 bench.py 并解释五大能力的对比
- 上线 checklist:能列出至少 8 项,知道每项为什么重要
- 成本估算:能根据自己的 DAU 和对话量估算月度 LLM 成本
- 优化思路:能提出至少 3 种降低 extraction 成本的方案
📚 参考资料
官方文档与代码
- Mem0 + LangGraph 教程:Mem0 官方
- LangGraph Long-term Memory:官方教程
- Graphiti 快速开始:GitHub README
- Letta Quickstart:docs.letta.com
评测
- LongMemEval 仓库:github.com/xiaowu0162/LongMemEval
- LoCoMo:snap-research.github.io/locomo
生产经验
- Adding Long-Term Memory to LangGraph and LangChain Agents:Vectorize.io
- Powering Long-Term Memory For Agents With LangGraph And MongoDB:MongoDB Blog
- agentmemory 仓库 —— 真实项目模板:GitHub
- YourMemory(MCP server with Ebbinghaus):GitHub
🎉 恭喜完成 Agent Memory 模块!
走完 9 章,你已经从概念、分类、论文、框架、操作、多 agent、评测一路打到端到端实战。剩下的功夫在你自己的业务里——找一个真实场景,把这套思想落地,跑过 benchmark,持续迭代。Memory 是 Agent 时代最重要的工程底座之一,也是接下来 5 年都会持续演进的方向。期待看到你做出更精彩的 Agent 系统。