跳到主要内容
Agent Memory

第9章:端到端实战 —— 三套可跑示例 + LongMemEval 评测

Mem0 + LangGraph 个人助理、Zep 时间感知客服、Letta OS-style 长会话三个端到端示例,带完整代码和 LongMemEval 评测脚本

实战 Mem0 Letta Zep LangGraph Qdrant 评测

把前 8 章的知识落地到代码——本章给三套完全可跑的端到端示例,你可以在本地用 Python 3.10 + 一个 OpenAI API key 跑通。涵盖三种典型场景:个性化助理(Mem0 + LangGraph)、时间感知客服(Zep + Graphiti)、OS-style 长会话(Letta)。最后接 LongMemEval 给出对比评测脚本,以及一份上线 checklist。

📑 目录


0. 通用准备

0.1 环境

# Python 3.10+
python --version

# 创建独立环境
python -m venv .venv && source .venv/bin/activate

# 安装公共依赖
pip install openai python-dotenv

0.2 API key

# .env
OPENAI_API_KEY=sk-...
# 可选:
ANTHROPIC_API_KEY=sk-ant-...
ZEP_API_KEY=z_...        # 注册 https://www.getzep.com 拿

0.3 容器(Docker)

为了让 Qdrant / Neo4j 等开箱即用,建议 Docker:

# Qdrant
docker run -d --name qdrant -p 6333:6333 -v $(pwd)/qdrant_data:/qdrant/storage qdrant/qdrant

# Neo4j(场景 B 需要)
docker run -d --name neo4j -p 7474:7474 -p 7687:7687 \
  -e NEO4J_AUTH=neo4j/password \
  -v $(pwd)/neo4j_data:/data \
  neo4j:5.20

# Letta server(场景 C)
docker run -d --name letta -p 8283:8283 \
  -e OPENAI_API_KEY=$OPENAI_API_KEY \
  letta/letta:latest

1. 场景 A:个人助理(Mem0 + LangGraph + Qdrant)

目标:搭一个能记住用户偏好、跨 session 续聊的助理。

1.1 依赖

pip install langgraph langchain-openai mem0ai qdrant-client

1.2 完整代码 assistant_a.py

"""
场景 A:个人助理
- LangGraph 管对话主循环 + state
- Mem0 管 long-term memory(Qdrant 后端)
- 跨 session 持久化
"""
import os
from typing import TypedDict
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from mem0 import Memory

load_dotenv()

# === 1. 初始化 Mem0(Qdrant 后端) ===
mem0_config = {
    "vector_store": {
        "provider": "qdrant",
        "config": {
            "host": "localhost",
            "port": 6333,
            "collection_name": "assistant_a_memory",
            "embedding_model_dims": 1536,
        },
    },
    "llm": {
        "provider": "openai",
        "config": {"model": "gpt-4o-mini", "temperature": 0.1},
    },
    "embedder": {
        "provider": "openai",
        "config": {"model": "text-embedding-3-small"},
    },
}
memory = Memory.from_config(mem0_config)

# === 2. LangGraph state ===
class AgentState(TypedDict):
    messages: list
    user_id: str
    retrieved_memories: list

# === 3. 节点函数 ===
llm = ChatOpenAI(model="gpt-4o", temperature=0.3)

def retrieve_memory_node(state: AgentState):
    """从 Mem0 检索相关 long-term memory。"""
    last_user_msg = state["messages"][-1].content
    results = memory.search(
        query=last_user_msg,
        user_id=state["user_id"],
        limit=5,
    )
    return {"retrieved_memories": results.get("results", [])}

def respond_node(state: AgentState):
    """组装 system prompt + memory + history,生成回复。"""
    memories_text = "\n".join(
        f"- {m['memory']}" for m in state["retrieved_memories"]
    ) or "(暂无相关记忆)"
    
    sys_prompt = f"""你是一个友好的助理。以下是你关于这个用户的长期记忆:
{memories_text}

请基于这些记忆回答用户问题,优先用记忆里的偏好。"""
    
    response = llm.invoke([
        SystemMessage(content=sys_prompt),
        *state["messages"],
    ])
    return {"messages": state["messages"] + [response]}

def write_memory_node(state: AgentState):
    """对话结束后,把这一轮的事实写入 Mem0。"""
    # 取最近一对 user/assistant 消息
    recent = state["messages"][-2:]
    msgs_for_mem = [
        {"role": "user" if isinstance(m, HumanMessage) else "assistant",
         "content": m.content}
        for m in recent
    ]
    memory.add(messages=msgs_for_mem, user_id=state["user_id"])
    return {}  # state 不变

# === 4. 编排 graph ===
graph = StateGraph(AgentState)
graph.add_node("retrieve", retrieve_memory_node)
graph.add_node("respond", respond_node)
graph.add_node("write_memory", write_memory_node)
graph.set_entry_point("retrieve")
graph.add_edge("retrieve", "respond")
graph.add_edge("respond", "write_memory")
graph.add_edge("write_memory", END)

app = graph.compile(checkpointer=MemorySaver())

# === 5. 主循环 ===
def chat(user_id: str, message: str, thread_id: str = "default"):
    state = {
        "messages": [HumanMessage(content=message)],
        "user_id": user_id,
        "retrieved_memories": [],
    }
    result = app.invoke(
        state,
        config={"configurable": {"thread_id": f"{user_id}_{thread_id}"}},
    )
    return result["messages"][-1].content

if __name__ == "__main__":
    # === Demo:Session 1 ===
    print("=== Session 1 ===")
    print(chat("alice", "我对乳糖不耐受,以后推荐餐厅请避开奶制品"))
    print(chat("alice", "我最爱意大利菜"))
    
    # === Demo:Session 2(模拟新进程的重启) ===
    print("\n=== Session 2(模拟新进程) ===")
    print(chat("alice", "推荐一家适合周末早午餐的店,不要太远"))
    # 期待:agent 主动避开奶制品并优先意大利菜

1.3 运行

python assistant_a.py

预期输出(Session 2):

推荐你试试 "Forno" 意大利餐厅,无奶选项很丰富,记得到时候和服务员
确认你的乳糖不耐受需求...

🌟 关键点:Session 2 是新进程,但 Mem0 的 Qdrant 持久化让 alice 的偏好跨进程保留。


2. 场景 B:时间感知客服(Zep + Graphiti)

目标:处理”用户搬家、改口味”这类事实演化,正确回答”用户当前 vs 历史”的问题。

2.1 依赖

pip install graphiti-core neo4j openai
# Neo4j 容器已启动(见 0.3)

2.2 完整代码 customer_service_b.py

"""
场景 B:时间感知客服
- Graphiti 自动建 KG + bi-temporal
- 演示事实演化:"用户搬家"
"""
import asyncio
import os
from datetime import datetime
from dotenv import load_dotenv
from graphiti_core import Graphiti
from graphiti_core.nodes import EpisodeType
from openai import OpenAI

load_dotenv()

graphiti = Graphiti(
    "bolt://localhost:7687", "neo4j", "password",
)

llm = OpenAI()

async def setup():
    """初始化索引(只需运行一次)。"""
    await graphiti.build_indices_and_constraints()

async def add_conversation(user_id: str, conv_text: str, ts: datetime):
    """把一段对话喂给 Graphiti,自动抽取事实。"""
    await graphiti.add_episode(
        name=f"conv_{ts.isoformat()}",
        episode_body=conv_text,
        source=EpisodeType.message,
        source_description=f"客服对话 user={user_id}",
        reference_time=ts,
        group_id=user_id,
    )

async def answer(user_id: str, question: str, as_of: datetime = None) -> str:
    """从 KG 检索相关 facts,生成回答。as_of 用于时间穿越查询。"""
    results = await graphiti.search(
        query=question,
        group_ids=[user_id],
        num_results=10,
    )
    
    facts_text = "\n".join(
        f"- {r.fact} (valid: {r.valid_at} ~ {r.invalid_at or 'now'})"
        for r in results
    ) or "(无相关 facts)"
    
    sys_prompt = f"""你是客服助手。基于以下 facts 回答:

{facts_text}

注意:facts 带时间戳 valid_at / invalid_at,以"现在"({as_of or datetime.now()})为准。
请只用还有效的 facts。"""
    
    resp = llm.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": sys_prompt},
            {"role": "user", "content": question},
        ],
    )
    return resp.choices[0].message.content

async def demo():
    await setup()
    
    # === T1:用户首次告知地址 ===
    await add_conversation(
        user_id="bob",
        conv_text="客户说: 我目前住在上海浦东张江,送货请到这个地址。",
        ts=datetime(2024, 6, 1),
    )
    
    # === T2:几个月后用户搬家 ===
    await add_conversation(
        user_id="bob",
        conv_text="客户说: 我上个月搬到了北京海淀,以后请寄到新地址。",
        ts=datetime(2025, 1, 15),
    )
    
    # === Q1:当前住址 ===
    print("\nQ: 用户当前的送货地址在哪?")
    print("A:", await answer("bob", "用户当前的送货地址在哪?"))
    # 期待:北京海淀
    
    # === Q2:时间穿越——2024 年 8 月时的住址 ===
    print("\nQ: 2024 年 8 月时,用户住哪?")
    print("A:", await answer(
        "bob", "2024 年 8 月时用户住哪里?",
        as_of=datetime(2024, 8, 1),
    ))
    # 期待:上海浦东张江

if __name__ == "__main__":
    asyncio.run(demo())

2.3 关键能力

  • 自动事实抽取 + 演化:“搬家”事件会让旧地址自动 t_invalid 标记
  • 时间穿越查询:as_of 参数让 agent 回答”过去某时刻的事实”
  • 审计:所有 fact 带时间戳,可追溯何时被记录

3. 场景 C:OS-style 长会话(Letta)

目标:跨周复用、agent 自我管理 memory(core_memory_append 等),适合长期助手。

3.1 依赖

pip install letta-client
# Letta server 已启动(见 0.3,容器跑在 8283 端口)

3.2 完整代码 long_session_c.py

"""
场景 C:OS-style 长会话
- Letta 提供完整 agent runtime
- agent 自己决定何时 append/replace/search memory
"""
import os
from dotenv import load_dotenv
from letta_client import Letta

load_dotenv()

client = Letta(base_url=os.getenv("LETTA_BASE_URL", "http://localhost:8283"))

def create_or_get_agent(name: str, persona: str, human_facts: str = ""):
    """创建一个 agent(若已存在则复用)。"""
    existing = client.agents.list(name=name)
    if existing:
        return existing[0]
    
    return client.agents.create(
        name=name,
        memory_blocks=[
            {"label": "human", "value": human_facts or "(尚无)"},
            {"label": "persona", "value": persona},
        ],
        model="openai/gpt-4o-mini",
        embedding="openai/text-embedding-3-small",
    )

def chat(agent_id: str, message: str) -> str:
    """单 turn 对话,Letta 内部自动 manage memory。"""
    response = client.agents.messages.create(
        agent_id=agent_id,
        messages=[{"role": "user", "content": message}],
    )
    # response.messages 包含 agent 的 reasoning steps + tool calls + final text
    final_text = ""
    for msg in response.messages:
        if msg.message_type == "assistant_message":
            final_text = msg.content
    return final_text

if __name__ == "__main__":
    agent = create_or_get_agent(
        name="alice_assistant",
        persona="你是 Alice 的私人助理,温和、细致,擅长记住她的偏好。",
        human_facts="姓名: Alice。职业: 数据科学家。",
    )
    
    # === Day 1 ===
    print("=== Day 1 ===")
    print(chat(agent.id, "我下周要去东京出差,有什么建议吗?"))
    print(chat(agent.id, "顺便帮我记一下,我对花粉过敏,会议尽量安排在室内"))
    
    # === Day 2(下次跑脚本时,agent 状态已持久化) ===
    print("\n=== Day 2 ===")
    print(chat(agent.id, "下周东京的天气怎样?"))
    # 期待:agent 知道是出差、知道用户花粉过敏,
    #       会主动提醒避开花粉高峰、推荐室内场所

3.3 观察 Memory 变化

# 拉 agent 的 core memory(应该看到 Letta 自动 append 的过敏信息)
curl -s http://localhost:8283/v1/agents/{agent_id}/core-memory | jq

# 拉 archival memory(长期事实库)
curl -s http://localhost:8283/v1/agents/{agent_id}/archival-memory | jq

🌟 关键体验:Letta 的 agent 会主动在内部 reasoning 中调用 core_memory_append("human", "用户对花粉过敏"),完全无需你写”写入 memory”的代码——这就是 OS-style 设计的优势。


4. 接 LongMemEval 评测三套方案

4.1 准备

git clone https://github.com/xiaowu0162/LongMemEval
cd LongMemEval
pip install -e .

4.2 评测脚本 bench.py

"""
统一评测三个 agent 在 LongMemEval-S 上的表现。
"""
import json
import time
from collections import defaultdict
from longmemeval.dataset import load_dataset
from longmemeval.evaluator import evaluate_answer

# 引入三套 agent(见上面的代码)
from assistant_a import chat as chat_mem0
from customer_service_b import answer as chat_zep
from long_session_c import create_or_get_agent, chat as chat_letta

# Letta agent 池(每用户一个)
letta_agents = {}

def get_letta_agent(user_id):
    if user_id not in letta_agents:
        letta_agents[user_id] = create_or_get_agent(
            name=f"benchmark_{user_id}",
            persona="你是一个助手,只回答用户问题,简洁直接。",
        )
    return letta_agents[user_id]

agents = {
    "mem0":  lambda uid, msg: chat_mem0(uid, msg),
    # "zep":   lambda uid, msg: asyncio.run(chat_zep(uid, msg)),  # 略,zep async
    "letta": lambda uid, msg: chat_letta(get_letta_agent(uid).id, msg),
}

def run_benchmark(agent_name: str, dataset, sample_size: int = 50):
    chat_fn = agents[agent_name]
    scores = defaultdict(list)
    times = []
    
    items = dataset[:sample_size]
    for i, item in enumerate(items):
        print(f"\n[{agent_name} {i+1}/{len(items)}] {item['question'][:60]}...")
        uid = f"{agent_name}_{item['user_id']}"
        
        # 1. Replay history sessions
        for session in item["history"]:
            for m in session["messages"]:
                if m["role"] == "user":
                    chat_fn(uid, m["content"])  # 让 agent 建立 memory
        
        # 2. Ask evaluation question
        t0 = time.time()
        try:
            ans = chat_fn(uid, item["question"])
        except Exception as e:
            print(f"  Error: {e}")
            ans = ""
        dt = time.time() - t0
        times.append(dt)
        
        # 3. Score
        correct = evaluate_answer(ans, item["answer"], item["category"])
        scores[item["category"]].append(int(correct))
        scores["overall"].append(int(correct))
    
    summary = {cat: sum(v) / len(v) for cat, v in scores.items()}
    summary["avg_latency_s"] = sum(times) / len(times)
    return summary

if __name__ == "__main__":
    dataset = load_dataset("longmemeval_s")
    
    results = {}
    for name in ["mem0", "letta"]:
        print(f"\n{'='*60}\n  Benchmarking: {name}\n{'='*60}")
        results[name] = run_benchmark(name, dataset, sample_size=30)
    
    # 输出对比表
    print("\n\n========== Results ==========")
    print(f"{'Framework':<10} | {'Info':<6} | {'Multi':<6} | {'Temp':<6} | "
          f"{'Update':<6} | {'Abst':<6} | {'Overall':<8} | {'Latency(s)':<10}")
    print("-" * 90)
    for name, s in results.items():
        print(f"{name:<10} | {s.get('info_extraction', 0):.2f}   | "
              f"{s.get('multi_session', 0):.2f}   | "
              f"{s.get('temporal', 0):.2f}   | "
              f"{s.get('update', 0):.2f}   | "
              f"{s.get('abstention', 0):.2f}   | "
              f"{s['overall']:.2f}     | {s['avg_latency_s']:.2f}")
    
    with open("bench_results.json", "w") as f:
        json.dump(results, f, indent=2)
    print("\nSaved to bench_results.json")

4.3 运行

python bench.py

期望输出(数字仅参考,实际取决于 LLM 版本):

========== Results ==========
Framework  | Info   | Multi  | Temp   | Update | Abst   | Overall  | Latency(s)
------------------------------------------------------------------------------------------
mem0       | 0.78   | 0.55   | 0.40   | 0.50   | 0.65   | 0.58     | 1.85
letta      | 0.80   | 0.62   | 0.55   | 0.55   | 0.70   | 0.64     | 2.40

5. 上线 checklist

  • Memory schema 定型:确定 namespace 层级、必带字段、PII 字段
  • PII 检测:写入前过 Presidio / 自研 detector
  • GDPR:hard delete API、export API、audit log
  • Embedding 模型固定:换模型要全库 reindex,代价大
  • 冲突解决策略:LLM judge / latest wins / bi-temporal,文档化
  • Decay 策略:确定 importance 计算 + 衰减半衰期
  • Backup:vector store + KG 都要定期 snapshot
  • 限流:每用户每分钟最多写 N 条,防 memory injection
  • Reflection 调度:离线 batch job(凌晨) + 关键事件触发
  • 监控 dashboard:见下一节

6. 监控指标与成本估算

6.1 监控指标

指标采集方式告警阈值
Memory hit rateretrieval 结果非空 / 总查询> 60%
Extraction error rateLLM 抽取失败 / 总写入< 2%
Write latency P95timing< 500ms(异步)
Read latency P95timing< 100ms
Vector store size容量< 80% 容量
每用户 memory 条数统计异常飙升告警(可能 injection)
冲突解决率UPDATE/DELETE / 总写入监控趋势
检索准确率在 benchmark 上每天跑不退化 5%+

6.2 成本估算

以 Mem0 为例,1 万 DAU 每用户每天 20 条对话:

单价月成本
Extraction LLM 调用(GPT-4o-mini)10K × 20 × 30 = 6M /月$0.15/M token,平均 200 token~$180
Embedding(text-embedding-3-small)同上$0.02/M token~$24
Decision LLM(judge ADD/UPDATE)6M × 30%(只在有相关 memory 时)同 extraction~$54
Qdrant 自托管(10M vectors)EC2 c5.2xlarge ~$200/月~$200
Reflection(每用户每天 1 次)10K × 30 × 长 promptGPT-4o-mini 较长输入~$100
小计~$558/月

每月 ¥4000 左右支撑 1 万 DAU——实际上 90% 成本在 LLM,不在存储。优化方向:

  1. 用更便宜的 extraction 模型(Haiku、本地小模型)
  2. 减少 extraction 频率(每 N 条对话才抽一次)
  3. Schema-driven 替代 LLM 抽取

✅ 自我检验清单

  • 三套环境:能在本地容器跑通 Qdrant / Neo4j / Letta
  • 场景 A 改造:能把 demo 改成”加上 user 主动撤回偏好”的能力
  • 场景 B 时间穿越:能用代码演示”现在的住址 vs 一年前的住址”两个查询
  • 场景 C 自动 self-edit:能从 Letta 后台看到 agent 自动 append 的 core memory
  • LongMemEval 评测:能跑通 bench.py 并解释五大能力的对比
  • 上线 checklist:能列出至少 8 项,知道每项为什么重要
  • 成本估算:能根据自己的 DAU 和对话量估算月度 LLM 成本
  • 优化思路:能提出至少 3 种降低 extraction 成本的方案

📚 参考资料

官方文档与代码

评测

生产经验

  • Adding Long-Term Memory to LangGraph and LangChain Agents:Vectorize.io
  • Powering Long-Term Memory For Agents With LangGraph And MongoDB:MongoDB Blog
  • agentmemory 仓库 —— 真实项目模板:GitHub
  • YourMemory(MCP server with Ebbinghaus):GitHub

🎉 恭喜完成 Agent Memory 模块!

走完 9 章,你已经从概念、分类、论文、框架、操作、多 agent、评测一路打到端到端实战。剩下的功夫在你自己的业务里——找一个真实场景,把这套思想落地,跑过 benchmark,持续迭代。Memory 是 Agent 时代最重要的工程底座之一,也是接下来 5 年都会持续演进的方向。期待看到你做出更精彩的 Agent 系统。