
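Chapter 9: End-to-End Build of a Production-Grade Order Agent

A complete, runnable order-processing agent: LangGraph orchestration + Temporal durable execution + MCP tools + Saga compensation + Mem0 long-term memory + LangSmith/OTel end-to-end observability + fault-injection tests

Hands-on LangGraph Temporal MCP Saga LangSmith Chaos Testing

This chapter ties together everything from the first 8 chapters of Module 6 in one complete, runnable end-to-end case: an order-processing agent. LangGraph orchestrates the control flow, Temporal provides durable execution, MCP servers expose three groups of business tools, the Saga pattern keeps multi-step operations transactionally consistent, Mem0 reuses the long-term memory from Module 5, LangSmith + OTel stitch together a full trace, and chaos tests verify that compensation is correct. It comes with a docker-compose file for one-command startup and about 600 lines of clearly structured code that you can adapt directly to your own business.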


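0. Business Scenario and Architecture Overview

0.1 Business scenario

An e-commerce order-processing agent:

User: "Order 2 units of SKU-001 for me, ship to the same address as last time"

Agent:
  1. Load user context (name, last address, VIP level)        ← Memory (Module 5)
  2. Check inventory                                          ← MCP tool: inventory.check
  3. Reserve inventory                                        ← MCP tool: inventory.reserve  ⚠️ side effect
  4. Charge payment                                           ← MCP tool: payment.charge     ⚠️ side effect
  5. Call the shipping API                                    ← MCP tool: shipping.create    ⚠️ side effect
  6. Notify the user (email / push)                           ← MCP tool: notify.send
  7. Write back to Memory: update purchase history            ← Memory

If any step fails → Saga compensates in reverse order (release_inventory, refund, cancel_shipping, ...)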

0.2 Tech stack

┌────────────────────────────────────────────────────────────────┐
│ Observability: LangSmith + OTel Collector → Tempo + Grafana    │
├────────────────────────────────────────────────────────────────┤
│ Runtime: Temporal Workflow (Saga + Durable)                    │
├────────────────────────────────────────────────────────────────┤
│ Agent: LangGraph (Supervisor + 4 workers)                      │
├────────────────────────────────────────────────────────────────┤
│ Memory: Mem0 + Qdrant (Module 5)                               │
├────────────────────────────────────────────────────────────────┤
│ Tools: 3 MCP servers (inventory / payment / shipping)          │
├────────────────────────────────────────────────────────────────┤
│ LLM: OpenAI (swappable for Anthropic / a local model)          │
└────────────────────────────────────────────────────────────────┘

0.3 Full architecture diagram

                        User
                          │
                          ▼
                   ┌──────────────┐
                   │   FastAPI    │  ← /api/order
                   └──────┬───────┘
                          │ submit task
                          ▼
                ┌───────────────────┐
                │ Temporal Workflow │  durable + replay
                │(OrderSagaWorkflow)│
                └──────┬─────┬──────┘
              activity │     │ activity
              ┌────────▼┐   ┌▼────────┐
              │LangGraph│   │ MCP /   │
              │Agent    │   │ Memory  │
              │Loop     │   │         │
              └────┬────┘   └────┬────┘
                   │ tool call   │
                   └─────────────┴──────► MCP servers
                                          ├ inventory_server
                                          ├ payment_server
                                          └ shipping_server
                                                  │
                                                  ▼
                                        (mocked external APIs)

1. Project Skeleton

order-agent/
├── docker-compose.yml         # Temporal + Postgres + Qdrant + OTel collector + Grafana
├── requirements.txt
├── .env.example
├── README.md
├── app/
│   ├── __init__.py
│   ├── main.py                 # FastAPI entry point
│   ├── worker.py               # Temporal worker entry point (section 8.1)
│   ├── workflows.py            # Temporal OrderSagaWorkflow
│   ├── activities.py           # Temporal activities (idempotent side effects)
│   ├── compensations.py        # Saga compensating actions
│   ├── agent.py                # Main LangGraph agent
│   ├── memory_client.py        # Mem0 wrapper
│   ├── mcp_clients.py          # Connections to the three MCP servers
│   ├── observability.py        # OTel setup
│   └── settings.py
├── mcp_servers/
│   ├── inventory_server.py     # MCP server: inventory
│   ├── payment_server.py       # MCP server: payment
│   └── shipping_server.py      # MCP server: shipping
├── tests/
│   ├── test_happy_path.py
│   ├── test_chaos.py           # Fault injection
│   └── test_idempotency.py
└── infra/
    ├── otel-collector.yaml
    └── grafana/
        └── dashboards/
            └── agent.json

2. docker-compose: Infrastructure

# docker-compose.yml
version: '3.8'

services:
  # Temporal cluster (simplified single-node setup)
  temporal:
    image: temporalio/auto-setup:1.24
    ports: ["7233:7233"]
    environment:
      - DB=postgres12
      - DB_PORT=5432
      - POSTGRES_USER=temporal
      - POSTGRES_PWD=temporal
      - POSTGRES_SEEDS=postgres
    depends_on: [postgres]
  
  temporal-ui:
    image: temporalio/ui:2.27
    ports: ["8080:8080"]
    environment:
      - TEMPORAL_ADDRESS=temporal:7233
    depends_on: [temporal]
  
  postgres:
    image: postgres:15
    environment:
      - POSTGRES_USER=temporal
      - POSTGRES_PASSWORD=temporal
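      # NOTE: the stock postgres image ignores this variable; creating the extra
      # databases needs an init script mounted under /docker-entrypoint-initdb.d/
      - POSTGRES_MULTIPLE_DATABASES=temporal,temporal_visibility,langgraph,mem0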
    volumes: ["pg-data:/var/lib/postgresql/data"]
  
  # Memory backend (Module 5)
  qdrant:
    image: qdrant/qdrant:v1.10
    ports: ["6333:6333"]
    volumes: ["qdrant-data:/qdrant/storage"]
  
  # Observability stack
  otel-collector:
    image: otel/opentelemetry-collector-contrib:0.110.0
    command: ["--config=/etc/otel-collector-config.yaml"]
    volumes: ["./infra/otel-collector.yaml:/etc/otel-collector-config.yaml"]
    ports: ["4317:4317", "4318:4318"]
  
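  tempo:
    image: grafana/tempo:2.6
    command: ["-config.file=/etc/tempo.yaml"]
    # Assumption: a Tempo config with an OTLP receiver lives at ./infra/tempo.yaml
    volumes: ["./infra/tempo.yaml:/etc/tempo.yaml"]
    ports: ["3200:3200"]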
  
  grafana:
    image: grafana/grafana:11
    ports: ["3000:3000"]
    volumes: ["./infra/grafana/dashboards:/var/lib/grafana/dashboards"]

volumes:
  pg-data:
  qdrant-data:

Start it with: docker-compose up -d
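Since `docker-compose up -d` returns before the services are actually ready, it can help to wait for their ports before starting the worker. The following is a minimal sketch, assuming the host ports published in the compose file above; the function names and the `SERVICES` mapping are illustrative and not part of the project.

```python
import socket
import time

# Host ports published by the compose file above (names are illustrative labels).
SERVICES = {"temporal": 7233, "qdrant": 6333, "otel-collector": 4317, "grafana": 3000}

def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

def wait_for_services(services: dict, host: str = "localhost",
                      deadline_s: float = 60.0, poll_s: float = 1.0) -> list:
    """Poll until every port accepts connections; return the names still down at the deadline."""
    end = time.monotonic() + deadline_s
    pending = dict(services)
    while pending and time.monotonic() < end:
        pending = {name: port for name, port in pending.items()
                   if not port_open(host, port)}
        if pending:
            time.sleep(poll_s)
    return sorted(pending)
```

Calling `wait_for_services(SERVICES)` returns an empty list once everything is reachable. An open port only shows that the process is listening, not that Temporal has finished its schema setup, so treat it as a coarse check.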


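3. MCP Servers: Exposing Three Groups of Business Tools

# mcp_servers/inventory_server.py
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("inventory")
_inventory = {"SKU-001": 10, "SKU-002": 5}   # in-memory stock, demo only
_reservations = {}  # idempotency table, keyed by order_id

@mcp.tool()
def check(sku: str) -> dict:
    """Look up available stock."""
    return {"sku": sku, "available": _inventory.get(sku, 0)}

@mcp.tool()
def reserve(order_id: str, sku: str, qty: int) -> dict:
    """Reserve stock (idempotent)."""
    if order_id in _reservations:
        # Retry of an order we already handled: return the stored result
        return _reservations[order_id]
    if _inventory.get(sku, 0) < qty:
        raise ValueError(f"Insufficient inventory: {sku}")
    _inventory[sku] -= qty
    _reservations[order_id] = {"order_id": order_id, "sku": sku, "qty": qty, "status": "reserved"}
    return _reservations[order_id]

@mcp.tool()
def release(order_id: str) -> dict:
    """Release reserved stock (compensation)."""
    if order_id not in _reservations:
        # Nothing to undo: releasing twice, or before reserving, is a no-op
        return {"order_id": order_id, "status": "noop"}
    res = _reservations.pop(order_id)
    _inventory[res["sku"]] += res["qty"]
    return {"order_id": order_id, "status": "released"}

if __name__ == "__main__":
    # mcp.run() defaults to the stdio transport. The HTTP URLs used in section 5.2
    # assume an HTTP transport instead, e.g. mcp.run(transport="streamable-http")
    # with the port set through FastMCP's settings; check your SDK version.
    mcp.run()

payment_server.py and shipping_server.py follow the same structure: every tool takes an idempotency_key and has a matching compensating action. They are omitted here.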
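To make the omitted servers more concrete, here is a rough sketch of what the core of a payment server could look like. It is a guess modeled on the inventory server, not the author's code: the `charge` / `refund` names come from the activities in the next section, order_id is used as the idempotency key, and the functions are plain Python so the logic can be tried without an MCP runtime (in payment_server.py each would carry `@mcp.tool()`).

```python
# Plain-Python core of a hypothetical payment server, in-memory and demo only.
_charges = {}   # idempotency table: order_id -> charge record

def charge(order_id: str, user_id: str, amount: int) -> dict:
    """Charge once per order_id; a retry returns the stored record."""
    if order_id in _charges:
        return _charges[order_id]
    if amount <= 0:
        raise ValueError(f"Invalid amount: {amount}")
    _charges[order_id] = {"order_id": order_id, "user_id": user_id,
                          "amount": amount, "status": "charged"}
    return _charges[order_id]

def refund(order_id: str) -> dict:
    """Compensation for charge; safe to call repeatedly or before any charge."""
    record = _charges.get(order_id)
    if record is None:
        return {"order_id": order_id, "status": "noop"}
    # Keep the record so that a late retry of charge cannot charge again.
    record["status"] = "refunded"
    return {"order_id": order_id, "status": "refunded", "amount": record["amount"]}
```

Unlike `release` in the inventory server, this `refund` keeps the record instead of popping it. That is a design choice of this sketch: a delayed `charge` retry arriving after the refund then sees the stored record rather than charging a second time.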


4. Temporal Activities: Idempotent Side Effects

# app/activities.py
from temporalio import activity
from app.mcp_clients import inv, pay, ship

@activity.defn
async def reserve_inventory(order_id: str, sku: str, qty: int) -> dict:
    """Reserve inventory through MCP (idempotent)."""
    return await inv.call("reserve", order_id=order_id, sku=sku, qty=qty)

@activity.defn
async def release_inventory(order_id: str) -> dict:
    return await inv.call("release", order_id=order_id)

@activity.defn
async def charge_payment(order_id: str, user_id: str, amount: int) -> dict:
    return await pay.call("charge", order_id=order_id, user_id=user_id, amount=amount)

@activity.defn
async def refund_payment(order_id: str) -> dict:
    return await pay.call("refund", order_id=order_id)

@activity.defn
async def create_shipping(order_id: str, address: str) -> dict:
    return await ship.call("create", order_id=order_id, address=address)

@activity.defn
async def cancel_shipping(order_id: str) -> dict:
    return await ship.call("cancel", order_id=order_id)

@activity.defn
async def notify_user(user_id: str, msg: str) -> dict:
    # Simplified: just print
    print(f"[NOTIFY] user={user_id}: {msg}")
    return {"sent": True}

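Each activity calls its server through the corresponding MCP client. Idempotency is enforced on the server side, with order_id as the deduplication key. The workflow and worker below also reference load_user_context, check_inventory_activity, write_purchase_memory, and mark_for_human_review, which this listing does not show.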
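The activities import `inv`, `pay`, and `ship` from app/mcp_clients.py, which is not shown. One possible shape for that module is sketched below, assuming the official `mcp` Python SDK with its streamable HTTP client. The `MCPToolClient` class, the `open_http_session` helper, and the URLs are all hypothetical; only the `call(tool, **kwargs)` shape is taken from the activities above.

```python
import json
from contextlib import asynccontextmanager

@asynccontextmanager
async def open_http_session(url: str):
    """Open an initialized MCP session over streamable HTTP (needs the `mcp` package)."""
    from mcp import ClientSession
    from mcp.client.streamable_http import streamablehttp_client
    async with streamablehttp_client(url) as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            yield session

class MCPToolClient:
    """Exposes the call(tool, **kwargs) shape used by the activities above."""

    def __init__(self, url: str, open_session=open_http_session):
        self._url = url
        self._open_session = open_session   # injectable so tests can pass a fake

    async def call(self, tool: str, **arguments) -> dict:
        # Simplification: one session per call; a real client would likely reuse it.
        async with self._open_session(self._url) as session:
            result = await session.call_tool(tool, arguments)
        if result.isError:
            raise RuntimeError(f"MCP tool {tool} failed: {result.content}")
        # Assumes the tool's dict result arrives as JSON in the first text content item.
        return json.loads(result.content[0].text)

# Hypothetical URLs; hosts, ports, and the /mcp path depend on how the servers are run.
inv = MCPToolClient("http://localhost:8001/mcp")
pay = MCPToolClient("http://localhost:8002/mcp")
ship = MCPToolClient("http://localhost:8003/mcp")
```

Raising on `isError` matters for the Saga: a failed tool call has to surface as an activity failure, otherwise the workflow would treat the step as successful and never compensate.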


5. LangGraph workflow + Saga

5.1 Temporal workflow wrapping the LangGraph agent

# app/workflows.py
from temporalio import workflow
from temporalio.common import RetryPolicy
from datetime import timedelta

with workflow.unsafe.imports_passed_through():
    from app import activities

@workflow.defn
class OrderSagaWorkflow:
    @workflow.run
    async def run(self, order_id: str, user_id: str, sku: str, qty: int) -> dict:
        compensations = []   # Saga compensation stack
        result = {"order_id": order_id, "status": "started"}
        
        try:
            # === Step 1: load memory ===
            user_ctx = await workflow.execute_activity(
                activities.load_user_context, user_id,
                start_to_close_timeout=timedelta(seconds=10),
            )
            address = user_ctx.get("default_address", "unknown address")
            
            # === Step 2: check inventory ===
            stock = await workflow.execute_activity(
                activities.check_inventory_activity, sku,
                start_to_close_timeout=timedelta(seconds=5),
            )
            if stock["available"] < qty:
                return {**result, "status": "out_of_stock"}
            
            # === Step 3: reserve inventory ⚠️ ===
            await workflow.execute_activity(
                activities.reserve_inventory, args=[order_id, sku, qty],
                start_to_close_timeout=timedelta(seconds=10),
                retry_policy=RetryPolicy(maximum_attempts=3),
            )
            compensations.append(("release_inventory", [order_id]))
            
            # === Step 4: charge payment ⚠️ ===
            amount = qty * 100  # simplified pricing
            await workflow.execute_activity(
                activities.charge_payment, args=[order_id, user_id, amount],
                start_to_close_timeout=timedelta(seconds=15),
                retry_policy=RetryPolicy(maximum_attempts=3),
            )
            compensations.append(("refund_payment", [order_id]))
            
            # === Step 5: create shipment ⚠️ ===
            await workflow.execute_activity(
                activities.create_shipping, args=[order_id, address],
                start_to_close_timeout=timedelta(seconds=20),
                retry_policy=RetryPolicy(maximum_attempts=3),
            )
            compensations.append(("cancel_shipping", [order_id]))
            
            # === Step 6: notify ===
            await workflow.execute_activity(
                activities.notify_user, args=[user_id, f"Order {order_id} has been created"],
                start_to_close_timeout=timedelta(seconds=5),
            )
            
            # === Step 7: write back to Memory ===
            await workflow.execute_activity(
                activities.write_purchase_memory,
                args=[user_id, sku, qty, address],
                start_to_close_timeout=timedelta(seconds=10),
            )
            
            return {**result, "status": "success", "amount": amount}
        
        except Exception as e:
            workflow.logger.error(f"Workflow failed: {e}")
            # === Saga: run compensations in reverse order ===
            for fn_name, args in reversed(compensations):
                try:
                    await workflow.execute_activity(
                        getattr(activities, fn_name), args=args,
                        start_to_close_timeout=timedelta(seconds=30),
                        retry_policy=RetryPolicy(maximum_attempts=5),  # compensations retry more aggressively
                    )
                except Exception as ce:
                    workflow.logger.error(f"Compensation {fn_name} failed: {ce}")
                    # Compensation failed → flag the order for human intervention
                    await workflow.execute_activity(
                        activities.mark_for_human_review,
                        args=[order_id, fn_name, str(ce)],
                        start_to_close_timeout=timedelta(seconds=10),
                    )
            return {**result, "status": "failed_and_compensated", "error": str(e)}

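🌟 Design notes:

  • Retries per activity: the three side-effecting steps cap retries at 3 attempts, so LLM or network jitter recovers automatically; steps without an explicit retry_policy fall back to Temporal's default policy
  • Compensation stack: every step that succeeds pushes its compensating action
  • try/except catches the failure: errors raised inside LangGraph surface through the activity and are caught here as well
  • Compensations retry more aggressively (5 attempts vs. 3)
  • Fallback when compensation fails: mark_for_human_review files a ticket for a human to pick up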
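The compensation-stack pattern from the design notes can be tried without Temporal. Below is a minimal asyncio sketch; `run_saga` and `on_unrecoverable` are illustrative names, not part of the project. It departs from the workflow above in one deliberate way: the compensation is registered before the step runs, on the assumption that every compensation is a safe no-op when there is nothing to undo (as `release` is in the inventory server). That covers the case where a step applied its side effect but the caller only saw a timeout.

```python
import asyncio

async def run_saga(steps, on_unrecoverable):
    """steps: list of (name, action, compensation); compensation may be None.

    Runs the actions in order. On failure, runs the registered compensations
    in reverse order and reports any compensation that itself fails.
    """
    compensations = []
    try:
        for name, action, compensation in steps:
            if compensation is not None:
                # Registered before the action runs (see the assumption above).
                compensations.append((name, compensation))
            await action()
        return {"status": "success"}
    except Exception as exc:
        for name, compensation in reversed(compensations):
            try:
                await compensation()
            except Exception as comp_exc:
                # Mirrors mark_for_human_review: record it and keep unwinding.
                await on_unrecoverable(name, comp_exc)
        return {"status": "failed_and_compensated", "error": str(exc)}
```

A failed compensation does not stop the loop: the remaining compensations still run, which matches the workflow's behavior of filing a ticket and moving on to the next entry on the stack.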

5.2 LangGraph agent as an activity

The LangGraph agent can run as a Temporal activity (optional; useful when decisions get more complex):

# app/agent.py
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_mcp_adapters.client import MultiServerMCPClient

async def build_agent():
    # Hostnames assume the MCP servers run as containers on the compose network.
    # FastMCP's streamable HTTP transport serves under /mcp by default, so the
    # URLs may need that path appended.
    client = MultiServerMCPClient({
        "inventory": {"url": "http://inventory-mcp:8001", "transport": "streamable_http"},
        "payment":   {"url": "http://payment-mcp:8002", "transport": "streamable_http"},
        "shipping":  {"url": "http://shipping-mcp:8003", "transport": "streamable_http"},
    })
    tools = await client.get_tools()
    return create_react_agent(ChatOpenAI(model="gpt-4o-mini"), tools=tools)

For simple flows, use the Temporal workflow above directly; for complex ones, let the LangGraph agent make the decisions internally and then invoke the workflow.


6. Memory Integration

# app/memory_client.py
import json

from mem0 import Memory
from openai import OpenAI

memory = Memory.from_config({
    "vector_store": {
        "provider": "qdrant",
        "config": {"host": "qdrant", "port": 6333,
                   "collection_name": "order_agent_mem"},
    },
    "llm": {"provider": "openai", "config": {"model": "gpt-4o-mini"}},
    "embedder": {"provider": "openai", "config": {"model": "text-embedding-3-small"}},
})

def load_user_context(user_id: str) -> dict:
    """Fetch the user's preferences and default address from mem0."""
    results = memory.search("user profile default address VIP level", user_id=user_id)
    facts = [m["memory"] for m in results.get("results", [])]
    
    # Simplified: use an LLM to extract a structured profile from the facts.
    # JSON mode requires the prompt itself to mention JSON.
    client = OpenAI()
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"Extract default_address and vip_level from the facts below and reply in JSON:\n{facts}"}],
        response_format={"type": "json_object"},
    )
    return json.loads(resp.choices[0].message.content)

def write_purchase_memory(user_id: str, sku: str, qty: int, address: str):
    memory.add(
        messages=[
            {"role": "user", "content": f"I bought {qty} x {sku}, shipped to {address}"},
        ],
        user_id=user_id,
    )
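The profile returned by the LLM feeds straight into the workflow, which reads `default_address` from it, so it may be worth parsing the reply defensively rather than trusting `json.loads` alone. A small sketch, assuming the two keys named in the prompt above; `parse_profile` and `DEFAULT_PROFILE` are illustrative names.

```python
import json

DEFAULT_PROFILE = {"default_address": None, "vip_level": None}

def parse_profile(raw) -> dict:
    """Parse the model's JSON reply, falling back to defaults on bad or partial output."""
    try:
        data = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return dict(DEFAULT_PROFILE)
    if not isinstance(data, dict):
        return dict(DEFAULT_PROFILE)
    profile = dict(DEFAULT_PROFILE)
    for key in DEFAULT_PROFILE:
        value = data.get(key)
        # Normalize to a stripped string; empty or missing values stay None.
        text = str(value).strip() if value is not None else ""
        if text:
            profile[key] = text
    return profile
```

Because a missing address comes back as `None` rather than as an absent key, a caller would write `profile["default_address"] or "unknown address"`; `dict.get` with a default does not help when the key exists with a `None` value.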

7. Observability: LangSmith + OTel

# app/observability.py
import os
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from openinference.instrumentation.langchain import LangChainInstrumentor

def setup_observability():
    # 1. OTel pipeline
    provider = TracerProvider()
    provider.add_span_processor(BatchSpanProcessor(
        OTLPSpanExporter(endpoint="http://otel-collector:4317", insecure=True)
    ))
    trace.set_tracer_provider(provider)
    
    # 2. Auto-instrument LangChain / LangGraph
    LangChainInstrumentor().instrument()
    
    # 3. LangSmith (optional; setting the environment variables is enough)
    if os.getenv("LANGSMITH_API_KEY"):
        os.environ["LANGSMITH_TRACING"] = "true"
        os.environ["LANGSMITH_PROJECT"] = "order-agent"

infra/otel-collector.yaml:

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

exporters:
  otlp/tempo:
    endpoint: tempo:4317
    tls:
      insecure: true

service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [otlp/tempo]

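After startup, open Grafana (http://localhost:3000) → Tempo datasource and search for any trace to see the full Workflow → Activity → MCP tool → DB chain. Note that the setup above only auto-instruments LangChain / LangGraph; to get workflow and activity spans as well, Temporal's OpenTelemetry interceptor (temporalio.contrib.opentelemetry.TracingInterceptor) also has to be passed to Client.connect.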


8. Startup and Conversation

8.1 Start the worker

# app/worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from app import workflows, activities
from app.observability import setup_observability

async def main():
    setup_observability()
    client = await Client.connect("temporal:7233")
    worker = Worker(
        client,
        task_queue="order-tasks",
        workflows=[workflows.OrderSagaWorkflow],
        activities=[
            activities.load_user_context,
            activities.check_inventory_activity,
            activities.reserve_inventory,
            activities.release_inventory,
            activities.charge_payment,
            activities.refund_payment,
            activities.create_shipping,
            activities.cancel_shipping,
            activities.notify_user,
            activities.write_purchase_memory,
            activities.mark_for_human_review,
        ],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

8.2 Start the FastAPI entry point

# app/main.py
from fastapi import FastAPI
from temporalio.client import Client
from app.workflows import OrderSagaWorkflow
from uuid import uuid4

app = FastAPI()
_client: Client | None = None  # set in the startup hook

@app.on_event("startup")
async def startup():
    global _client
    _client = await Client.connect("temporal:7233")

@app.post("/api/order")
async def create_order(user_id: str, sku: str, qty: int):
    order_id = f"order-{uuid4()}"
    handle = await _client.start_workflow(
        OrderSagaWorkflow.run,
        args=[order_id, user_id, sku, qty],
        id=order_id,
        task_queue="order-tasks",
    )
    result = await handle.result()
    return result
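One thing to be aware of in the endpoint above: `uuid4()` produces a fresh order_id on every request, so an HTTP retry of the same order starts a second workflow. A possible refinement, sketched here under the assumption that the client sends its own idempotency key (a hypothetical parameter, not in the original API), is to derive the order_id deterministically.

```python
import uuid

# Arbitrary fixed namespace for this sketch; any constant UUID works.
ORDER_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "https://example.com/order-agent")

def order_id_for(user_id: str, idempotency_key: str) -> str:
    """Map the same user and idempotency key to the same order id every time."""
    return f"order-{uuid.uuid5(ORDER_NAMESPACE, f'{user_id}:{idempotency_key}')}"
```

Since the order_id is also used as the Temporal workflow ID, a retry then targets the same workflow ID. Temporal refuses to start a second workflow with that ID while the first is still running; what happens once the first has finished depends on the workflow ID reuse policy.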

8.3 Run it

docker-compose up -d
pip install -r requirements.txt

# Start the Temporal worker
# (run on the host, it reaches Temporal at localhost:7233, not temporal:7233)
python -m app.worker &

# Start FastAPI
uvicorn app.main:app --port 8000 &

# Start the 3 MCP servers (assumes each script parses --port and serves over HTTP)
python mcp_servers/inventory_server.py --port 8001 &
python mcp_servers/payment_server.py   --port 8002 &
python mcp_servers/shipping_server.py  --port 8003 &

# Place an order
curl -X POST 'http://localhost:8000/api/order?user_id=alice&sku=SKU-001&qty=2'
# {"order_id": "order-...", "status": "success", "amount": 200}

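9. Chaos Testing: Fault Injection

9.1 Test 1: payment times out, inventory should roll back

# tests/test_chaos.py
import pytest
from unittest.mock import patch

# Assumptions: `client` (an async HTTP client for the FastAPI app) and `mcp_call`
# (a helper that calls an MCP tool directly) are fixtures from conftest.py, not shown.
# patch() only reaches activities executed in this process, so the worker has to run
# in-process and register its activities while the patch is active.

@pytest.mark.asyncio
async def test_payment_timeout_triggers_rollback(client, mcp_call):
    with patch("app.activities.charge_payment", side_effect=TimeoutError("PSP is down")):
        result = await client.post("/api/order", params={
            "user_id": "alice", "sku": "SKU-001", "qty": 2,
        })
        body = result.json()
        assert body["status"] == "failed_and_compensated"
    
    # Verify compensation: inventory should be back at its original value
    r = await mcp_call("inventory", "check", sku="SKU-001")
    assert r["available"] == 10  # original value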

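9.2 Test 2: shipping fails, payment should be refunded and inventory rolled back

Same as above: patch create_shipping to raise, then verify that both refund_payment and release_inventory are called.

9.3 Test 3: human fallback when compensation fails

Patch release_inventory to fail as well, then verify that mark_for_human_review is called and the ticket is written to the DB.

9.4 Test 4: idempotency

Call reserve_inventory twice with the same order_id and verify that inventory is deducted only once.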
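The idempotency test is most convincing when the retry happens after the side effect has already been applied, which is the case a lost response produces. The sketch below reproduces the inventory server's `reserve` logic in plain Python and injects that fault; `LostResponse`, `drop_first_replies`, and `call_with_retry` are hypothetical helpers for illustration only.

```python
class LostResponse(Exception):
    """The side effect happened but the caller never saw the reply."""

inventory = {"SKU-001": 10}
reservations = {}

def reserve(order_id, sku, qty):
    # Same logic as the inventory server's reserve tool.
    if order_id in reservations:
        return reservations[order_id]
    if inventory.get(sku, 0) < qty:
        raise ValueError(f"Insufficient inventory: {sku}")
    inventory[sku] -= qty
    reservations[order_id] = {"order_id": order_id, "sku": sku, "qty": qty, "status": "reserved"}
    return reservations[order_id]

def drop_first_replies(fn, n):
    """Wrap fn so that its first n calls execute but raise instead of returning."""
    remaining = [n]
    def wrapper(*args, **kwargs):
        result = fn(*args, **kwargs)
        if remaining[0] > 0:
            remaining[0] -= 1
            raise LostResponse()
        return result
    return wrapper

def call_with_retry(fn, *args, attempts=3):
    """Retry on LostResponse, re-raising after the last attempt."""
    for attempt in range(attempts):
        try:
            return fn(*args)
        except LostResponse:
            if attempt == attempts - 1:
                raise

def test_reserve_is_idempotent_under_retry():
    flaky = drop_first_replies(reserve, 2)
    result = call_with_retry(flaky, "order-1", "SKU-001", 2)
    assert result["status"] == "reserved"
    assert inventory["SKU-001"] == 8   # deducted once despite three executions
```

Without the `order_id in reservations` check, the same run would deduct stock three times, which is the failure this test is meant to catch.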

9.5 Test 5: kill the worker mid-workflow, recover after restart

# Start the worker and run a long task
python -m app.worker &
WORKER_PID=$!
curl -X POST 'http://localhost:8000/api/order?user_id=alice&sku=SKU-001&qty=2' &
sleep 3

# Kill the worker
kill -9 $WORKER_PID

# Restart the worker
python -m app.worker &

# Temporal's Event History lets the workflow resume from where it stopped, ending with status=success
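The recovery in this test rests on one idea: results of completed steps are recorded, and a restarted run reads the record instead of executing those steps again. The toy model below illustrates that idea with a JSON-lines journal. It is a deliberately simplified sketch and not how Temporal is implemented; `run_durably` and `demo` are made-up names.

```python
import json
import os
import tempfile

def run_durably(steps, journal_path):
    """Run (name, fn) steps in order, journaling results so that a rerun skips finished steps."""
    history = {}
    if os.path.exists(journal_path):
        with open(journal_path) as f:
            history = {e["step"]: e["result"] for e in map(json.loads, f)}
    results = {}
    for name, fn in steps:
        if name in history:
            results[name] = history[name]   # replayed from the journal, fn is not re-run
            continue
        results[name] = fn()
        with open(journal_path, "a") as f:
            f.write(json.dumps({"step": name, "result": results[name]}) + "\n")
    return results

def demo():
    """Simulate a crash inside the second step, then a restart."""
    path = os.path.join(tempfile.mkdtemp(), "journal.jsonl")
    calls, crash = [], {"armed": True}

    def reserve():
        calls.append("reserve")
        return "reserved"

    def charge():
        calls.append("charge")
        if crash["armed"]:
            raise RuntimeError("worker killed")
        return "charged"

    steps = [("reserve", reserve), ("charge", charge)]
    try:
        run_durably(steps, path)            # first run dies inside charge
    except RuntimeError:
        pass
    crash["armed"] = False
    return run_durably(steps, path), calls  # restart: reserve is replayed, charge runs again
```

In the demo, `reserve` executes once while `charge` executes twice, because the crash hit before its result was journaled. That is the reason the side-effecting tools need to be idempotent even with durable execution in place.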

🌟 The point of chaos testing is not to check "are there bugs" but to verify that "the system behaves as the Saga design intends under failure".


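10. Launch Checklist and Monitoring Dashboard

10.1 Launch checklist

  • Idempotency: every activity uses order_id / idempotency_key
  • Timeouts: every activity has a sensible start_to_close_timeout
  • Retry policy: transient errors retry automatically (start at 3 attempts, use 5 for compensations)
  • Compensation coverage: every side-effecting step has a matching compensating action
  • Fallback for failed compensation: a mark_for_human_review ticket flow exists
  • Asynchronous memory writes: they must not add latency to the main path
  • MCP server auth: use OAuth in production, never run unauthenticated
  • PII detection: run content through Presidio before writing to memory
  • Tracing fully on: every LLM call / tool call / activity has an OTel span
  • Alert rules: Saga compensation rate, error rate, and cost anomalies all alert
  • Manual override channel: operations staff can take over any workflow
  • Rollback plan: roll out agent versions gradually and roll back quickly on problems
  • Cost cap: throttle automatically when a user's daily cost exceeds a threshold
  • Rate limit: cap orders per user per minute
  • Audit log: log every workflow start / compensation / human intervention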
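10.2 Monitoring dashboard (Grafana)

Panel | Content
Workflow throughput | started / completed / failed / compensated per second
Latency P50 / P95 / P99 | end-to-end order processing time
Compensation rate | failed / total, alert above 5%
Compensation failure rate | comp_fail / comp_total, alert above 0.1%
Token cost (per model) | input/output tokens and USD per minute
Tool error rate (per tool) | inventory / payment / shipping, each tracked separately
Memory hit rate | share of mem0 searches that return results
Agent loop length distribution | mean / P95 step count, alert on abnormal spikes
Human-review queue length | cumulative mark_for_human_review count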
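The two compensation panels reduce to simple ratios with thresholds. A small sketch of that arithmetic, using the 5% and 0.1% thresholds from the table; the function name and the counter arguments are illustrative, and in practice the counts would come from your metrics backend.

```python
def saga_alerts(total: int, failed: int, comp_total: int, comp_failed: int,
                comp_rate_limit: float = 0.05, comp_fail_limit: float = 0.001) -> dict:
    """Compute both compensation ratios and whether each crosses its alert threshold."""
    comp_rate = failed / total if total else 0.0
    comp_fail_rate = comp_failed / comp_total if comp_total else 0.0
    return {
        "compensation_rate": comp_rate,
        "compensation_failure_rate": comp_fail_rate,
        "alert_compensation_rate": comp_rate > comp_rate_limit,
        "alert_compensation_failure": comp_fail_rate > comp_fail_limit,
    }
```

The failure-rate threshold is much tighter than the compensation-rate one because a failed compensation leaves an order in an inconsistent state until a human resolves it.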

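10.3 Cost estimate (for reference)

10K DAU, one order per user per day on average:

Item | Basis | Cost
Main LLM (GPT-4o) calls | 10K × 1 × ~5 LLM calls × 1500 tokens avg | ~$1.5/day
Memory extraction (GPT-4o-mini) | 10K × 1 × 800 tokens | ~$0.05/day
Embedding | 10K × a few calls × 100 tokens | ~$0.01/day
Temporal | self-hosted, EC2 m5.xlarge | ~$5/day
Postgres + Qdrant | EC2 c5.large × 2 | ~$5/day
OTel + Grafana | self-hosted | ~$2/day
Total | | ~$15/day ≈ $450/month

On these figures, cost per order is about $0.0015 ($15 / 10K orders), roughly 1000 times cheaper than a human agent at a few yuan per order. That is the economic value of an agent. Treat the LLM rows as rough, though: the main-LLM row alone works out to about 75M tokens per day, which at GPT-4o list prices costs far more than $1.5, so recompute those rows against current pricing before relying on the total.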
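To redo this estimate for your own traffic, the arithmetic can be written down as a few helper functions. This is a sketch with illustrative names; the per-token price is deliberately left as a parameter to fill in from your provider's current pricing page, and no price is assumed here.

```python
def daily_tokens(orders_per_day: int, calls_per_order: int, tokens_per_call: int) -> int:
    """Total tokens per day for one kind of LLM call."""
    return orders_per_day * calls_per_order * tokens_per_call

def daily_llm_cost(tokens: int, usd_per_million_tokens: float) -> float:
    """Dollar cost for a token volume at a given price per million tokens."""
    return tokens / 1_000_000 * usd_per_million_tokens

def cost_per_order(daily_total_usd: float, orders_per_day: int) -> float:
    """Average cost of one order given the daily total."""
    return daily_total_usd / orders_per_day

# Token volumes implied by the table above.
main_tokens = daily_tokens(10_000, 5, 1500)    # main LLM: 75,000,000 tokens/day
memory_tokens = daily_tokens(10_000, 1, 800)   # memory extraction: 8,000,000 tokens/day
```

With these, `cost_per_order(15, 10_000)` reproduces the $0.0015 figure, and `daily_llm_cost(main_tokens, price)` shows how sensitive the total is to the model price you plug in.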


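✅ Self-check list

  • Architecture diagram: can reproduce from memory the full flow User → FastAPI → Temporal → Activities → MCP → external APIs
  • docker-compose startup: can bring up Temporal + Postgres + Qdrant + OTel + Grafana
  • MCP server: can write an idempotent tool with FastMCP
  • Reverse-order Saga compensation: can write the try/except + reversed(compensations) pattern from memory
  • Fallback for failed compensation: can write the mark_for_human_review flow
  • Memory integration: can load user context from mem0 and write purchase facts back
  • OTel auto-instrumentation: can wire up LangChainInstrumentor in 5 lines of code
  • The 5 chaos tests: can write fault-injection tests with patch
  • kill -9 restart recovery: can manually verify that Temporal's Event History resumes the workflow
  • Launch checklist: can list at least 10 key checks
  • Cost estimate: can work out the monthly cost for your own business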

📚 References

Chaos / Testing

  • Chaos Monkey for Agent: borrows the ideas of Netflix's Chaos Engineering
  • Hypothesis (Python property-based testing): well suited to writing idempotency tests for compensations

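🎉 Congratulations on finishing Module 6, Agent Runtime!

After the 9 chapters plus the learning-path overview, you have covered:

  • Control-flow models (ReAct / Plan-Execute / Reflexion / Graph / Tool-as-Thought)
  • Multi-agent orchestration (Supervisor / Swarm / Hierarchical / Mesh / Pipeline)
  • The design philosophy and selection criteria of 8 mainstream frameworks
  • Durable Execution (LangGraph / Temporal / Restate)
  • Agent Transactions (SagaLLM / Saga / 2PC / Outbox)
  • The protocol layer (MCP + A2A)
  • Observability (OTel GenAI / LangSmith / Langfuse)
  • End-to-end practice (the full-stack order agent)

Module 5 (Memory) plus Module 6 (Runtime) together make up a complete, production-ready agent engineering stack. The rest of the work is in your own business: pick a real scenario, put these ideas into practice, run the benchmarks, and keep iterating. The "AI Infra engineer" of the agent era is you.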