第9章:端到端实战 —— 生产级订单 Agent
完整可跑的订单处理 agent:LangGraph 编排 + Temporal durable + MCP 工具 + Saga 补偿 + Mem0 长期记忆 + LangSmith/OTel 全链路观测 + 故障注入测试
把模块六前 8 章学到的所有东西串起来——本章给一个完整可跑的端到端案例:订单处理 Agent。LangGraph 编排控制流、Temporal 提供 durable execution、MCP server 暴露三类业务工具、Saga 模式保证多步操作的事务一致性、Mem0 复用模块五的长期记忆、LangSmith + OTel 串起全链路 trace、混沌测试验证补偿正确性。配 docker-compose 一键启动,~600 行代码结构清晰,可以直接拿去改造你的业务。
📑 目录
- 0. 业务场景与架构总览
- 1. 项目骨架
- 2. docker-compose:基础设施
- 3. MCP server:暴露三类业务工具
- 4. Temporal Activities:幂等的副作用
- 5. LangGraph workflow + Saga
- 6. Memory 集成
- 7. Observability:LangSmith + OTel
- 8. 启动与对话
- 9. 混沌测试:故障注入
- 10. 上线 checklist 与监控仪表盘
- 自我检验清单
- 参考资料
0. 业务场景与架构总览
0.1 业务
电商订单处理 Agent:
User: "帮我下单 SKU-001,2 件,送上次的地址"
↓
Agent:
1. 加载用户上下文(姓名、上次地址、VIP 等级) ← Memory(模块五)
2. 验证库存 ← MCP tool: inventory.check
3. 扣库存(reserve) ← MCP tool: inventory.reserve ⚠️副作用
4. 调支付 ← MCP tool: payment.charge ⚠️副作用
5. 调发货 API ← MCP tool: shipping.create ⚠️副作用
6. 通知用户(邮件 / push) ← MCP tool: notify.send
7. 写回 Memory:更新购买历史 ← Memory
任何 step 失败 → Saga 反序补偿(release_inventory, refund, cancel_shipping, ...)
0.2 技术栈
┌────────────────────────────────────────────────────────────────┐
│ Observability:LangSmith + OTel Collector → Tempo + Grafana │
├────────────────────────────────────────────────────────────────┤
│ Runtime:Temporal Workflow(Saga + Durable) │
├────────────────────────────────────────────────────────────────┤
│ Agent:LangGraph(Supervisor + 4 个 worker) │
├────────────────────────────────────────────────────────────────┤
│ Memory:Mem0 + Qdrant(模块五) │
├────────────────────────────────────────────────────────────────┤
│ Tools:3 个 MCP server(inventory / payment / shipping) │
├────────────────────────────────────────────────────────────────┤
│ LLM:OpenAI(可换 Anthropic / 本地) │
└────────────────────────────────────────────────────────────────┘
0.3 完整架构图
User
│
▼
┌──────────────┐
│ FastAPI │ ← /api/order
└──────┬───────┘
│ 提交 task
▼
┌──────────────────┐
│ Temporal Workflow │ durable + replay
│ (OrderSagaWorkflow)│
└──────┬─────┬─────┘
activity │ │ activity
┌──────▼┐ ┌▼────────┐
│LangGraph│ │ MCP/ │
│Agent │ │ Memory │
│Loop │ │ │
└────┬────┘ └────┬───┘
│ tool call │
└────────────┴──────► MCP servers
├ inventory_server
├ payment_server
└ shipping_server
│
▼
(mocked external APIs)
1. 项目骨架
order-agent/
├── docker-compose.yml # Temporal + Postgres + Qdrant + OTel collector + Grafana
├── requirements.txt
├── .env.example
├── README.md
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI 入口
│ ├── workflows.py # Temporal OrderSagaWorkflow
│ ├── activities.py # Temporal activities(幂等的副作用)
│ ├── compensations.py # Saga 补偿动作
│ ├── agent.py # LangGraph 主 agent
│ ├── memory_client.py # Mem0 包装
│ ├── mcp_clients.py # 连接三个 MCP server
│ ├── observability.py # OTel 设置
│ └── settings.py
├── mcp_servers/
│ ├── inventory_server.py # MCP server:库存
│ ├── payment_server.py # MCP server:支付
│ └── shipping_server.py # MCP server:物流
├── tests/
│ ├── test_happy_path.py
│ ├── test_chaos.py # 故障注入
│ └── test_idempotency.py
└── infra/
    ├── otel-collector.yaml
    ├── tempo.yaml             # Tempo 配置(docker-compose 里挂载)
└── grafana/
└── dashboards/
└── agent.json
2. docker-compose:基础设施
# docker-compose.yml
version: '3.8'
services:
# Temporal cluster(单机简化版)
temporal:
image: temporalio/auto-setup:1.24
ports: ["7233:7233"]
environment:
- DB=postgres12
- DB_PORT=5432
- POSTGRES_USER=temporal
- POSTGRES_PWD=temporal
- POSTGRES_SEEDS=postgres
depends_on: [postgres]
temporal-ui:
image: temporalio/ui:2.27
ports: ["8080:8080"]
environment:
- TEMPORAL_ADDRESS=temporal:7233
depends_on: [temporal]
postgres:
image: postgres:15
environment:
- POSTGRES_USER=temporal
- POSTGRES_PASSWORD=temporal
- POSTGRES_MULTIPLE_DATABASES=temporal,temporal_visibility,langgraph,mem0
volumes: ["pg-data:/var/lib/postgresql/data"]
# Memory backend(模块五)
qdrant:
image: qdrant/qdrant:v1.10
ports: ["6333:6333"]
volumes: ["qdrant-data:/qdrant/storage"]
# Observability stack
otel-collector:
image: otel/opentelemetry-collector-contrib:0.110.0
command: ["--config=/etc/otel-collector-config.yaml"]
volumes: ["./infra/otel-collector.yaml:/etc/otel-collector-config.yaml"]
ports: ["4317:4317", "4318:4318"]
  tempo:
    image: grafana/tempo:2.6
    command: ["-config.file=/etc/tempo.yaml"]
    volumes: ["./infra/tempo.yaml:/etc/tempo.yaml"]   # 不挂配置文件 tempo 起不来
    ports: ["3200:3200"]
grafana:
image: grafana/grafana:11
ports: ["3000:3000"]
volumes: ["./infra/grafana/dashboards:/var/lib/grafana/dashboards"]
volumes:
pg-data:
qdrant-data:
启动:docker-compose up -d
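注意 compose 里的 POSTGRES_MULTIPLE_DATABASES:官方 postgres 镜像并不识别这个变量,需要自己挂一个 init 脚本到 /docker-entrypoint-initdb.d/(常见社区做法;脚本名 infra/pg-init.sh 是本文假设,需在 postgres service 里加对应 volumes 挂载):

```shell
#!/bin/sh
# infra/pg-init.sh —— postgres 首次初始化时按逗号分隔的列表建库
# compose 挂载示例:- ./infra/pg-init.sh:/docker-entrypoint-initdb.d/pg-init.sh
set -e
for db in $(echo "$POSTGRES_MULTIPLE_DATABASES" | tr ',' ' '); do
  echo "creating database: $db"
  # 已存在(如与 POSTGRES_USER 同名的默认库)则忽略错误继续
  psql -v ON_ERROR_STOP=0 -U "$POSTGRES_USER" -c "CREATE DATABASE $db;" || true
done
```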
3. MCP server:暴露三类业务工具
# mcp_servers/inventory_server.py
from mcp.server.fastmcp import FastMCP
import random
mcp = FastMCP("inventory")
_inventory = {"SKU-001": 10, "SKU-002": 5}
_reservations = {} # 幂等表
@mcp.tool()
def check(sku: str) -> dict:
"""查询库存。"""
return {"sku": sku, "available": _inventory.get(sku, 0)}
@mcp.tool()
def reserve(order_id: str, sku: str, qty: int) -> dict:
"""预留库存(幂等)。"""
if order_id in _reservations:
return _reservations[order_id]
if _inventory.get(sku, 0) < qty:
raise Exception(f"Insufficient inventory: {sku}")
_inventory[sku] -= qty
_reservations[order_id] = {"order_id": order_id, "sku": sku, "qty": qty, "status": "reserved"}
return _reservations[order_id]
@mcp.tool()
def release(order_id: str) -> dict:
"""释放库存(补偿)。"""
if order_id not in _reservations:
return {"order_id": order_id, "status": "noop"}
res = _reservations.pop(order_id)
_inventory[res["sku"]] += res["qty"]
return {"order_id": order_id, "status": "released"}
if __name__ == "__main__":
    # 默认的 stdio transport 无法被 HTTP client 连接;改成 streamable-http,
    # 端口从命令行读,对应 8.3 节的 --port 启动参数
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--port", type=int, default=8001)
    mcp.settings.port = parser.parse_args().port
    mcp.run(transport="streamable-http")
payment_server.py / shipping_server.py 结构类似——每个工具有 idempotency_key、有对应的 compensate 动作。略。
4. Temporal Activities:幂等的副作用
# app/activities.py
from temporalio import activity
from app.mcp_clients import inv, pay, ship
@activity.defn
async def reserve_inventory(order_id: str, sku: str, qty: int) -> dict:
"""通过 MCP 调库存预留(幂等)。"""
return await inv.call("reserve", order_id=order_id, sku=sku, qty=qty)
@activity.defn
async def release_inventory(order_id: str) -> dict:
return await inv.call("release", order_id=order_id)
@activity.defn
async def charge_payment(order_id: str, user_id: str, amount: int) -> dict:
return await pay.call("charge", order_id=order_id, user_id=user_id, amount=amount)
@activity.defn
async def refund_payment(order_id: str) -> dict:
return await pay.call("refund", order_id=order_id)
@activity.defn
async def create_shipping(order_id: str, address: str) -> dict:
return await ship.call("create", order_id=order_id, address=address)
@activity.defn
async def cancel_shipping(order_id: str) -> dict:
return await ship.call("cancel", order_id=order_id)
@activity.defn
async def notify_user(user_id: str, msg: str) -> dict:
    # 简化:print 代替真实邮件 / push
    print(f"[NOTIFY] user={user_id}: {msg}")
    return {"sent": True}

# 下面四个 activity 被 workflow / worker 引用,一并补全
import asyncio
from app import memory_client

@activity.defn
async def check_inventory_activity(sku: str) -> dict:
    return await inv.call("check", sku=sku)

@activity.defn
async def load_user_context(user_id: str) -> dict:
    # mem0 是同步 API,丢进线程池避免阻塞 worker 的 event loop
    return await asyncio.to_thread(memory_client.load_user_context, user_id)

@activity.defn
async def write_purchase_memory(user_id: str, sku: str, qty: int, address: str) -> dict:
    await asyncio.to_thread(memory_client.write_purchase_memory, user_id, sku, qty, address)
    return {"written": True}

@activity.defn
async def mark_for_human_review(order_id: str, failed_step: str, error: str) -> dict:
    # 简化:真实系统应写工单 DB / 触发 PagerDuty
    print(f"[HUMAN REVIEW] order={order_id} step={failed_step} err={error}")
    return {"ticket_id": f"ticket-{order_id}"}

每个副作用 activity 都通过对应 MCP client 调用 server——幂等性由 server 端保证(用 order_id 做去重 key);Memory 相关 activity 直接调 mem0,同步 API 用 asyncio.to_thread 包一层。
5. LangGraph workflow + Saga
5.1 Temporal workflow 包 LangGraph agent
# app/workflows.py
from temporalio import workflow
from temporalio.common import RetryPolicy
from datetime import timedelta
with workflow.unsafe.imports_passed_through():
from app import activities
@workflow.defn
class OrderSagaWorkflow:
@workflow.run
async def run(self, order_id: str, user_id: str, sku: str, qty: int) -> dict:
compensations = [] # Saga 补偿栈
result = {"order_id": order_id, "status": "started"}
try:
# === Step 1: 加载 memory ===
user_ctx = await workflow.execute_activity(
activities.load_user_context, user_id,
start_to_close_timeout=timedelta(seconds=10),
)
address = user_ctx.get("default_address", "未知地址")
# === Step 2: 检查库存 ===
stock = await workflow.execute_activity(
activities.check_inventory_activity, sku,
start_to_close_timeout=timedelta(seconds=5),
)
if stock["available"] < qty:
return {**result, "status": "out_of_stock"}
# === Step 3: 预留库存 ⚠️ ===
await workflow.execute_activity(
activities.reserve_inventory, args=[order_id, sku, qty],
start_to_close_timeout=timedelta(seconds=10),
retry_policy=RetryPolicy(maximum_attempts=3),
)
compensations.append(("release_inventory", [order_id]))
# === Step 4: 扣支付 ⚠️ ===
amount = qty * 100 # 简化定价
await workflow.execute_activity(
activities.charge_payment, args=[order_id, user_id, amount],
start_to_close_timeout=timedelta(seconds=15),
retry_policy=RetryPolicy(maximum_attempts=3),
)
compensations.append(("refund_payment", [order_id]))
# === Step 5: 发货 ⚠️ ===
await workflow.execute_activity(
activities.create_shipping, args=[order_id, address],
start_to_close_timeout=timedelta(seconds=20),
retry_policy=RetryPolicy(maximum_attempts=3),
)
compensations.append(("cancel_shipping", [order_id]))
# === Step 6: 通知 ===
await workflow.execute_activity(
activities.notify_user, args=[user_id, f"订单 {order_id} 已创建"],
start_to_close_timeout=timedelta(seconds=5),
)
# === Step 7: 写回 Memory ===
await workflow.execute_activity(
activities.write_purchase_memory,
args=[user_id, sku, qty, address],
start_to_close_timeout=timedelta(seconds=10),
)
return {**result, "status": "success", "amount": amount}
except Exception as e:
workflow.logger.error(f"Workflow failed: {e}")
# === Saga:反序执行补偿 ===
for fn_name, args in reversed(compensations):
try:
await workflow.execute_activity(
getattr(activities, fn_name), args=args,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=RetryPolicy(maximum_attempts=5), # 补偿更激进重试
)
except Exception as ce:
workflow.logger.error(f"Compensation {fn_name} failed: {ce}")
# 补偿失败 → 标记需要人工干预
await workflow.execute_activity(
activities.mark_for_human_review,
args=[order_id, fn_name, str(ce)],
start_to_close_timeout=timedelta(seconds=10),
)
return {**result, "status": "failed_and_compensated", "error": str(e)}
🌟 设计要点:
- 每个 activity 自己有 retry:LLM/网络抖动自动恢复
- compensations 栈:成功一步就 push 一个补偿动作
- try/except 捕获失败:LangGraph 内部错误也被 Temporal 捕获
- 补偿用更激进的 retry(5 次 vs 3 次)
- 补偿失败兜底:mark_for_human_review 写工单,等人工处理
5.2 LangGraph agent 作为 activity
LangGraph agent 作为一个 Temporal activity 执行(可选,更复杂决策时用):
# app/agent.py
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_mcp_adapters.client import MultiServerMCPClient
async def build_agent():
    client = MultiServerMCPClient({
        # 本地起服务用 localhost;容器化部署换成服务名。/mcp 是 streamable-http 的默认路径
        "inventory": {"url": "http://localhost:8001/mcp", "transport": "streamable_http"},
        "payment": {"url": "http://localhost:8002/mcp", "transport": "streamable_http"},
        "shipping": {"url": "http://localhost:8003/mcp", "transport": "streamable_http"},
    })
tools = await client.get_tools()
return create_react_agent(ChatOpenAI(model="gpt-4o-mini"), tools=tools)
简单业务直接用上面的 Temporal workflow,复杂业务可以让 LangGraph agent 内部决策再调 workflow。
6. Memory 集成
# app/memory_client.py
from mem0 import Memory
import os
memory = Memory.from_config({
"vector_store": {
"provider": "qdrant",
"config": {"host": "qdrant", "port": 6333,
"collection_name": "order_agent_mem"},
},
"llm": {"provider": "openai", "config": {"model": "gpt-4o-mini"}},
"embedder": {"provider": "openai", "config": {"model": "text-embedding-3-small"}},
})
def load_user_context(user_id: str) -> dict:
"""从 mem0 取用户偏好与默认地址。"""
results = memory.search("用户档案 默认地址 VIP 等级", user_id=user_id)
facts = [m["memory"] for m in results.get("results", [])]
# 简化:用 LLM 从 facts 抽出 structured profile
from openai import OpenAI
client = OpenAI()
resp = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": f"从下面 facts 抽取 default_address 和 vip_level:\n{facts}"}],
response_format={"type": "json_object"},
)
import json
return json.loads(resp.choices[0].message.content)
def write_purchase_memory(user_id: str, sku: str, qty: int, address: str):
memory.add(
messages=[
{"role": "user", "content": f"我买了 {qty} 件 {sku},送到 {address}"},
],
user_id=user_id,
)
7. Observability:LangSmith + OTel
# app/observability.py
import os
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from openinference.instrumentation.langchain import LangChainInstrumentor
def setup_observability():
# 1. OTel pipeline
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(
OTLPSpanExporter(endpoint="http://otel-collector:4317", insecure=True)
))
trace.set_tracer_provider(provider)
# 2. 自动 instrument LangChain / LangGraph
LangChainInstrumentor().instrument()
# 3. LangSmith(可选,设环境变量即可)
if os.getenv("LANGSMITH_API_KEY"):
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_PROJECT"] = "order-agent"
infra/otel-collector.yaml:
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
exporters:
otlp/tempo:
endpoint: tempo:4317
tls:
insecure: true
service:
pipelines:
traces:
receivers: [otlp]
exporters: [otlp/tempo]
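docker-compose 里 tempo 挂载了 /etc/tempo.yaml,但仓库树里没给出内容。一个能起本地单机 Tempo 的最小配置大致如下(草图,字段名以 Tempo 2.x 官方配置文档为准):

```yaml
# infra/tempo.yaml —— 单机、本地存储的最小配置
server:
  http_listen_port: 3200
distributor:
  receivers:
    otlp:
      protocols:
        grpc:
          endpoint: 0.0.0.0:4317   # 接收 otel-collector 转发的 trace
storage:
  trace:
    backend: local
    wal:
      path: /tmp/tempo/wal
    local:
      path: /tmp/tempo/blocks
```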
启动后,Grafana(http://localhost:3000)→ Tempo datasource → 搜任何 trace,完整看到 Workflow → Activity → MCP tool → DB 的全链路。
8. 启动与对话
8.1 启动 worker
# app/worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from app import workflows, activities
from app.observability import setup_observability
async def main():
setup_observability()
    client = await Client.connect("localhost:7233")  # worker 容器化部署时改为 temporal:7233
worker = Worker(
client,
task_queue="order-tasks",
workflows=[workflows.OrderSagaWorkflow],
activities=[
activities.load_user_context,
activities.check_inventory_activity,
activities.reserve_inventory,
activities.release_inventory,
activities.charge_payment,
activities.refund_payment,
activities.create_shipping,
activities.cancel_shipping,
activities.notify_user,
activities.write_purchase_memory,
activities.mark_for_human_review,
],
)
await worker.run()
if __name__ == "__main__":
asyncio.run(main())
8.2 启动 FastAPI 入口
# app/main.py
from fastapi import FastAPI
from temporalio.client import Client
from app.workflows import OrderSagaWorkflow
from uuid import uuid4
app = FastAPI()
_client: Client | None = None
@app.on_event("startup")
async def startup():
global _client
    _client = await Client.connect("localhost:7233")  # 容器化部署时改为 temporal:7233
@app.post("/api/order")
async def create_order(user_id: str, sku: str, qty: int):
order_id = f"order-{uuid4()}"
handle = await _client.start_workflow(
OrderSagaWorkflow.run,
args=[order_id, user_id, sku, qty],
id=order_id,
task_queue="order-tasks",
)
result = await handle.result()
return result
8.3 跑
docker-compose up -d
pip install -r requirements.txt
# 启动 Temporal worker
python -m app.worker &
# 启动 FastAPI
uvicorn app.main:app --port 8000 &
# 启动 3 个 MCP server
python mcp_servers/inventory_server.py --port 8001 &
python mcp_servers/payment_server.py --port 8002 &
python mcp_servers/shipping_server.py --port 8003 &
# 下单
curl -X POST 'http://localhost:8000/api/order?user_id=alice&sku=SKU-001&qty=2'
# {"order_id": "order-...", "status": "success", "amount": 200}
9. 混沌测试:故障注入
9.1 测试 1:支付超时,期望库存回滚
# tests/test_chaos.py(需要 pytest-asyncio)
# 注:对另起进程的 worker 做 unittest.mock.patch 是无效的——mock 只作用于当前进程。
# 正确做法:用 Temporal 测试环境在进程内跑 workflow,用同名 fake activity 覆盖真实实现。
import pytest
from temporalio import activity
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
from app import workflows

_inv = {"SKU-001": 10}
_res = {}

@activity.defn(name="load_user_context")
async def _ctx(user_id: str) -> dict: return {"default_address": "测试地址"}
@activity.defn(name="check_inventory_activity")
async def _check(sku: str) -> dict: return {"sku": sku, "available": _inv[sku]}
@activity.defn(name="reserve_inventory")
async def _reserve(order_id: str, sku: str, qty: int) -> dict:
    _inv[sku] -= qty; _res[order_id] = (sku, qty); return {"status": "reserved"}
@activity.defn(name="release_inventory")
async def _release(order_id: str) -> dict:
    sku, qty = _res.pop(order_id); _inv[sku] += qty; return {"status": "released"}
@activity.defn(name="charge_payment")
async def _charge(order_id: str, user_id: str, amount: int) -> dict:
    raise TimeoutError("PSP 挂了")  # 注入故障:支付恒定超时

@pytest.mark.asyncio
async def test_payment_timeout_triggers_rollback():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(env.client, task_queue="order-tasks",
                          workflows=[workflows.OrderSagaWorkflow],
                          activities=[_ctx, _check, _reserve, _release, _charge]):
            result = await env.client.execute_workflow(
                workflows.OrderSagaWorkflow.run, args=["order-t1", "alice", "SKU-001", 2],
                id="order-t1", task_queue="order-tasks")
    assert result["status"] == "failed_and_compensated"
    assert _inv["SKU-001"] == 10  # 补偿生效:库存回到原值
9.2 测试 2:发货失败,期望支付退款 + 库存回滚
类似上面,patch create_shipping 抛错,验证 refund_payment 和 release_inventory 都被调用。
9.3 测试 3:补偿失败时人工兜底
patch release_inventory 也失败,验证 mark_for_human_review 被调用,工单写到 DB。
9.4 测试 4:幂等性
同一 order_id 调两次 reserve_inventory,验证库存只扣一次。
9.5 测试 5:Workflow 中途 kill 进程,重启后恢复
# 启动 worker,跑一个长任务
python -m app.worker &
WORKER_PID=$!
curl -X POST 'http://localhost:8000/api/order?user_id=alice&sku=SKU-001&qty=2' &
sleep 3
# 杀 worker
kill -9 $WORKER_PID
# 重启 worker
python -m app.worker &
# Temporal Event History 让 workflow 从断点续跑,最终 status=success
🌟 混沌测试的重点不是测"有没有 bug",而是验证"系统在故障下行为符合 Saga 设计"。
10. 上线 checklist 与监控仪表盘
10.1 上线 checklist
- 幂等性:所有 activity 用 order_id / idempotency_key
- 超时:每个 activity 都有合理的 start_to_close_timeout
- 重试策略:transient error 自动重试(3 次起步,补偿用 5 次)
- 补偿动作覆盖:每个有副作用的步骤都有对应 compensate
- 补偿失败兜底:有 mark_for_human_review 工单流
- Memory 写入异步:不影响主路径延迟
- MCP server auth:生产用 OAuth,不裸跑
- PII 检测:写 memory 前过 Presidio
- Trace 全开:所有 LLM call / tool call / activity 都有 OTel span
- 告警规则:Saga 补偿率、错误率、cost 异常都有告警
- 手动 override 通道:运营可以人工接管任意 workflow
- 回滚预案:agent 版本灰度,出问题快速 rollback
- Cost 上限:单用户日成本超阈值自动 throttle
- Rate limit:每 user 每分钟下单上限
- Audit log:所有 workflow 启动 / 补偿 / 人工干预都 log
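checklist 里的 "Rate limit" 一项,单实例可以先用进程内滑动窗口顶上(草图;多实例生产环境应换 Redis 等共享存储实现):

```python
# 每 user 每分钟 N 次的滑动窗口限流器(进程内内存版)
import time
from collections import defaultdict, deque

class PerUserRateLimiter:
    def __init__(self, max_per_minute: int = 5):
        self.max = max_per_minute
        self.hits: dict[str, deque] = defaultdict(deque)

    def allow(self, user_id: str) -> bool:
        now = time.monotonic()
        q = self.hits[user_id]
        while q and now - q[0] > 60:  # 淘汰 60 秒前的请求记录
            q.popleft()
        if len(q) >= self.max:
            return False
        q.append(now)
        return True
```

用法:在 create_order 里先 `if not limiter.allow(user_id): raise HTTPException(429)`,再启动 workflow。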
10.2 监控仪表盘(Grafana)
| 面板 | 内容 |
|---|---|
| Workflow throughput | started / completed / failed / compensated 每秒数 |
| Latency P50 / P95 / P99 | 端到端订单处理时间 |
| Compensation rate | compensated / total,告警 > 5% |
| Compensation failure rate | comp_fail / comp_total,告警 > 0.1% |
| Token cost(分模型) | 每分钟 input/output tokens 和 USD |
| Tool error rate(分 tool) | inventory / payment / shipping 各自 |
| Memory hit rate | mem0 search 非空比 |
| Agent loop length 分布 | 平均 / P95 step 数,异常飙升告警 |
| 人工干预队列长度 | mark_for_human_review 累积数 |
10.3 成本估算(参考)
10K DAU,平均每天 1 次下单。单价按假设:GPT-4o 输入 $2.5 / 输出 $10 每百万 token:
| 项 | 量 | 成本 |
|---|---|---|
| 主 LLM(GPT-4o)调用 | 10K 单 × ~5 次 LLM call × ~1500 token/次 | ~$300/天 |
| 抽 memory(GPT-4o-mini) | 10K × ~800 token | ~$2/天 |
| Embedding | 10K × 几次 × ~100 token | ~$0.1/天 |
| Temporal | 自托管,EC2 m5.xlarge | ~$5/天 |
| Postgres + Qdrant | EC2 c5.large × 2 | ~$5/天 |
| OTel + Grafana | 自托管 | ~$2/天 |
| 总计 | | ~$315/天 ≈ $9500/月 |
每订单成本 ~$0.03,仍比人工客服一单几块钱低一个数量级以上;主模型换成 gpt-4o-mini 还能再降约一个数量级。这就是 Agent 的经济价值。
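这笔账可以写成几行代码自查(单价为假设值:GPT-4o 输入 $2.5、输出 $10 每百万 token):

```python
# 粗略的 LLM 日成本估算:DAU × 每单调用次数 × 每次 token × 单价
def llm_cost_per_day(dau: int, calls_per_order: int,
                     tokens_in: int, tokens_out: int,
                     price_in: float = 2.5, price_out: float = 10.0) -> float:
    """price_* 单位:USD / 百万 token;tokens_* 是单次 LLM call 的平均量。"""
    per_call = (tokens_in * price_in + tokens_out * price_out) / 1e6
    return dau * calls_per_order * per_call

# 10K DAU、每单 5 次调用、每次约 1200 in + 300 out token:
print(round(llm_cost_per_day(10_000, 5, 1200, 300), 2))  # → 300.0
```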
✅ 自我检验清单
- 架构图:能默写从 User → FastAPI → Temporal → Activities → MCP → 外部 API 的完整流
- docker-compose 启动:能跑通 Temporal + Postgres + Qdrant + OTel + Grafana
- MCP server:能用 FastMCP 写一个带幂等的 tool
- Saga 反序补偿:能默写 try/except + reversed(compensations) 模式
- 补偿失败兜底:能写 mark_for_human_review 流程
- Memory 集成:能从 mem0 取用户上下文 + 写回购买事实
- OTel 自动 instrument:能 5 行代码接入 LangChainInstrumentor
- 混沌测试 5 项:能写 patch 故障注入测试
- kill -9 重启恢复:能手动验证 Temporal Event History 续跑
- 上线 checklist:能列出至少 10 项关键检查
- 成本估算:能为自己业务算月度成本
📚 参考资料
官方教程
- LangGraph + Temporal samples:github.com/temporalio/samples-python
- Temporal AI agents 系列:temporal.io/blog
- MCP Python SDK:github.com/modelcontextprotocol/python-sdk
- langchain-mcp-adapters:github.com/langchain-ai/langchain-mcp-adapters
- OpenInference Instrumentations:github.com/Arize-ai/openinference
综合实战参考
- Agent Stack 系列:theagentstack.substack.com
- AWS Prescriptive Guidance: Agentic AI Patterns:AWS Docs
- Build durable AI agents with Pydantic and Temporal:Temporal
Chaos / 测试
- Chaos Monkey for Agent:借鉴 Netflix Chaos Engineering 思想
- Hypothesis(Python property-based testing):适合写补偿幂等性测试
🎉 恭喜完成模块六 Agent Runtime!
走完 9 章 + 学习路线总览,你已经掌握:
- 控制流模型(ReAct / Plan-Execute / Reflexion / Graph / Tool-as-Thought)
- 多 Agent 编排(Supervisor / Swarm / Hierarchical / Mesh / Pipeline)
- 8 个主流框架的设计哲学和选型
- Durable Execution(LangGraph / Temporal / Restate)
- Agent Transactions(SagaLLM / Saga / 2PC / Outbox)
- 协议层(MCP + A2A)
- Observability(OTel GenAI / LangSmith / Langfuse)
- 端到端实战(订单 agent 全栈)
模块五 Memory + 模块六 Runtime 合起来 = 一套完整的 production-ready Agent 工程体系。剩下的功夫在你自己的业务里——找一个真实场景,把这套思想落地,跑过基准,持续迭代。Agent 时代的"AI Infra 工程师"就是你。