第6章:Agent Transactions —— Saga / 2PC / Outbox / SagaLLM
为什么 agent 需要事务,Saga / 2PC / Outbox / Event Sourcing 四种模式对比,SagaLLM 论文精读,LangGraph + Temporal 的工程实现
Agent 调了 5 个 tool 的第 4 个失败了,前 3 步副作用怎么办?LLM 第 7 步突然反悔,前 6 步要不要回滚?支付已成功但发货 API 超时——是再试还是退款?这些问题在分布式数据库里有 30 年的成熟答案——Saga / 2PC / Outbox / Event Sourcing。2025 年 SagaLLM 论文(VLDB)首次系统把它们带到 LLM agent 领域。本章把这套”agent 事务”工程一次讲透:论文精读 + 4 种模式对比 + LangGraph/Temporal 实现。这是模块六的灵魂章节。
📑 目录
- 1. 为什么 Agent 需要事务
- 2. ACID 与 BASE:取舍光谱
- 3. 2PC:Two-Phase Commit
- 4. Saga:补偿事务
- 5. Outbox / Event Sourcing
- 6. SagaLLM 论文精读 ⭐
- 7. LangGraph 实现 Saga
- 8. Temporal 实现 Saga
- 9. 工程实战要点
- 自我检验清单
- 参考资料
1. 为什么 Agent 需要事务
1.1 一个让人头大的场景
电商订单 agent:
Step 1: 验证用户
Step 2: 查库存(可用 5 件)
Step 3: 扣库存(剩 4 件) ← 副作用:库存表 -1
Step 4: 调支付(¥1000 已扣) ← 副作用:支付系统记录
Step 5: 发货(快递 API) ← 失败!(快递 API 挂了)
Step 6: 通知用户
Step 5 失败,前 4 步留下的副作用怎么办?
| 副作用 | 不补偿的后果 |
|---|---|
| Step 3 扣的库存 | 库存少了 1,但实际没卖出 → 错账 |
| Step 4 扣的支付 | 用户付了钱没收到货 → 投诉、退款人工干预 |
🌟 Agent 的”工具调用链”本质就是一个分布式事务——每个 tool 都涉及外部状态修改,失败必须能正确回滚。
1.2 Agent 事务比传统更难
| 维度 | 传统分布式事务 | Agent 事务 |
|---|---|---|
| 步骤数 | 通常 < 5 | 可能 10-50 步 |
| 路径 | 静态 | LLM 动态决定 |
| 谁触发回滚 | 程序检测 | LLM 决策 + 程序检测 |
| 补偿动作 | 程序员预定义 | 部分 LLM 生成 |
| 一致性要求 | 强(金融)or 最终(电商) | 通常最终,但需要审计 |
LLM 的 non-deterministic 让 agent 事务更难——同样的 input 第二次跑可能走不同路径,补偿逻辑必须 robust 到任何路径。
2. ACID 与 BASE:取舍光谱
| 性质 | ACID(传统数据库) | BASE(分布式系统) |
|---|---|---|
| A | Atomicity 原子性 | Basically Available 基本可用 |
| C | Consistency 一致性 | Soft state 软状态 |
| I | Isolation 隔离性 | Eventually consistent 最终一致 |
| D | Durability 持久性 | — |
🍎 Agent 几乎不可能 ACID(跨多个外部 API,无法分布式锁所有资源)→ 走 BASE。
但 BASE 不等于”啥都不管”——通过 Saga + 补偿动作 + 幂等性,可以做到”最终一致 + 可审计”。
3. 2PC:Two-Phase Commit
3.1 流程
Phase 1 (Prepare):
Coordinator → 所有参与者:"准备好了吗?"
参与者:"准备好"(锁资源)/ "不行"
Phase 2 (Commit):
如果所有都准备好 → "提交!"
如果有任何一个不行 → "回滚!"
3.2 优劣
✅ 强一致性:要么全部成功,要么全部回滚 ❌ 阻塞:Phase 1 锁资源期间所有参与者等待 ❌ Coordinator 单点:挂了所有人卡住 ❌ 延迟高:两轮网络往返
3.3 Agent 中的 2PC
仅适用于:
- 极少步骤(2-3 个)
- 强一致性要求(金融转账级别)
- 所有参与者都支持 2PC 协议
绝大多数 agent 场景 2PC 不实用——LLM 决策的 non-deterministic 性会让 prepare 阶段的”锁”持续时间不可控。
4. Saga:补偿事务
4.1 思想
由 Garcia-Molina & Salem 1987 年提出。每个步骤定义一个对应的”补偿动作”,任何步骤失败时按反序执行已完成步骤的补偿。
Saga = T1 ─ T2 ─ T3 ─ T4 ─ T5
│ │ │ │ │
C1 C2 C3 C4 C5
执行 T1, T2, T3 成功
T4 失败
→ 反序执行 C3, C2, C1
4.2 两种 Saga 编排方式
Orchestration(集中协调):一个 orchestrator 控制每一步、每个失败:
┌──────────────┐
│ Orchestrator │
└───┬────┬────┬─┘
│ │ │
┌───▼┐ ┌─▼──┐ ┌▼───┐
│ T1 │ │ T2 │ │ T3 │
└────┘ └────┘ └────┘
适合 Agent—— LangGraph / Temporal 都是 orchestrator 范式。
Choreography(分布协作):每个服务监听事件,决定自己的动作:
T1 完成 → emit event → T2 监听并执行 → emit event → T3 ...
失败时 emit failure event,各服务监听并执行自己的补偿
适合微服务松耦合,不太适合 agent(agent 编排需要中心化决策)。
4.3 补偿动作的设计原则
| 原则 | 说明 |
|---|---|
| 幂等(idempotent) | 同一补偿调多次不会出错 |
| 可逆(semantically) | 不一定物理 undo,但语义上恢复一致 |
| 必须成功(critical) | 补偿失败要有人工干预通道 |
| 不依赖状态(stateless preferred) | 给定原 transaction 信息就能跑 |
4.4 不能 undo 的怎么办
很多副作用物理上不能 undo:
- 已发出的邮件(撤回?道歉?)
- 已转走的钱(发起退款 transaction)
- 已发的快递(联系快递员拦截 / 退货流程)
Saga 的”补偿”是”语义上的恢复”,不是”物理 undo”——发邮件失败的补偿是”发一封致歉邮件”,转账成功后回滚的补偿是”再发一笔反向转账”。
5. Outbox / Event Sourcing
5.1 Outbox Pattern
解决”修改 DB + 发消息”的原子性:
传统(有 bug):
UPDATE order SET status='paid'
publish_message(OrderPaid) ← 如果这里失败,DB 改了消息没发
Outbox:
BEGIN TX
UPDATE order SET status='paid'
INSERT INTO outbox (event='OrderPaid') ← 同一事务
COMMIT
另一进程轮询 outbox,把 event 发到 broker,发完删除
保证”DB 改 + 消息发”原子可靠。
5.2 Event Sourcing
把”状态”视为”事件流的累计”:
不存:Order(status='paid', amount=1000)
存:[OrderCreated(...), OrderPaid(...)]
需要 status 时,replay 所有 event 算出来
天然 audit trail + replay,Temporal 的 workflow event history 就是这个思想。
5.3 在 Agent 中
- Outbox:agent 写 memory + 触发外部 API,可以用 outbox 保证两者一致
- Event Sourcing:agent 的所有 step 作为 event 持久化,既是 checkpoint 又是 audit
6. SagaLLM 论文精读 ⭐
Chang et al., “SagaLLM: Context Management, Validation, and Transaction Guarantees for Multi-Agent LLM Planning”, arXiv 2503.11951, VLDB 2025
6.1 核心问题
现有 multi-agent 框架(LangGraph、CrewAI 等)的工作流出错时,处理简单粗暴:重试整个 plan,或人工干预。没有 transaction 语义,缺乏:
- Context isolation:多个 agent 共享 context 时,失败的 agent 不能”污染”其他人的 context
- Validation:没有事中校验”这步结果是否合规”
- Compensation:失败时没有自动 rollback
6.2 SagaLLM 架构
┌─────────────────────┐
│ Orchestrator │
│ (Saga Manager) │
└─┬───────────────────┘
│
┌───────────┼───────────┬───────────┐
▼ ▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│ Agent │ │ Comp │ │Validator│ │Persist │
│ Node │ │ Agent │ │Agent │ │Memory │
│ (T1) │ │ (C1) │ │ │ │ │
└────────┘ └────────┘ └────────┘ └────────┘
每个 workflow 节点都配:
- 主 agent(执行 transaction)
- 补偿 agent(执行 rollback)
- validator agent(独立校验结果)
- 共享 persistent memory(跨节点 context 一致)
6.3 4 大创新
① 节点 + 边都配补偿
不只 transaction node 有补偿,workflow 的”边”(transition)也有补偿——例如”从 A 到 B 的传递动作如果失败,需要 special handling”。
② LLM 自动生成补偿逻辑
传统 Saga 要程序员手写每个 compensate function。SagaLLM 用 LLM 自动生成:
prompt: "Action T_5 = ship(order_id, address).
If T_5 succeeded but later steps need rollback,
write the compensating action."
LLM: "compensate_T_5(order_id) → 联系快递员 cancel,
如已发出走退货流程,标记 order.status='shipped_then_canceled'"
LLM 还自动生成 state tracking schema、log schema、recovery orchestration 逻辑。
③ Independent Validator Agents
每个 transaction 完成后,一个独立的 validator agent(不参与决策)检查结果:
- 数据是否符合 schema?
- 是否违反业务约束?
- 与历史是否一致?
validator 也用 LLM,但 prompt 专门面向”挑刺”——降低主 agent 自我验证的偏见。
④ Persistent Memory for Context Consistency
多 agent 协作时容易 context drift——A 改了一个事实,B 不知道,导致后续决策矛盾。SagaLLM 引入 persistent shared memory(类似模块五的共享 KG),所有 agent 写入立即对其他 agent 可见。
6.4 实验结果
在 TravelPlanner 和自定义 multi-agent benchmark 上:
- Success rate:+15% 到 +40%(vs vanilla LangGraph / CrewAI)
- Recovery time:从”重跑全部” 到”只重跑失败 + 补偿”,缩短 60%
- Context consistency:跨 agent 决策矛盾率下降 70%
6.5 工业启示
- LLM 自动生成补偿是最大创新——之前必须手写,现在 LLM 给一阶,人工 review
- Validator agent 是低成本高收益——一个便宜小 LLM 做”挑刺”,拦下大量错误
- Persistent memory 是必需品(模块五),不只是 nice-to-have
7. LangGraph 实现 Saga
7.1 基本模式
from langgraph.graph import StateGraph, END
from typing import TypedDict, List
class OrderState(TypedDict):
order_id: str
completed_steps: List[str] # 已完成步骤,用于反序补偿
error: str | None
# 主 transaction
def reserve_inventory(state: OrderState):
inventory_api.reserve(state["order_id"])
return {"completed_steps": state["completed_steps"] + ["inventory"]}
def charge_payment(state: OrderState):
try:
payment_api.charge(state["order_id"])
return {"completed_steps": state["completed_steps"] + ["payment"]}
except Exception as e:
return {"error": str(e)}
def ship_order(state: OrderState):
try:
shipping_api.create(state["order_id"])
return {"completed_steps": state["completed_steps"] + ["shipping"]}
except Exception as e:
return {"error": str(e)}
# 补偿
def compensate(state: OrderState):
"""反序执行已完成步骤的补偿。"""
for step in reversed(state["completed_steps"]):
if step == "inventory":
inventory_api.release(state["order_id"])
elif step == "payment":
payment_api.refund(state["order_id"])
elif step == "shipping":
shipping_api.cancel(state["order_id"])
return {"completed_steps": []}
# 路由:有 error 走补偿
def route(state: OrderState):
return "compensate" if state.get("error") else "next"
graph = StateGraph(OrderState)
graph.add_node("reserve", reserve_inventory)
graph.add_node("charge", charge_payment)
graph.add_node("ship", ship_order)
graph.add_node("compensate", compensate)
graph.add_node("done", lambda s: s)
graph.set_entry_point("reserve")
graph.add_conditional_edges("reserve", route, {"compensate": "compensate", "next": "charge"})
graph.add_conditional_edges("charge", route, {"compensate": "compensate", "next": "ship"})
graph.add_conditional_edges("ship", route, {"compensate": "compensate", "next": "done"})
graph.add_edge("compensate", END)
graph.add_edge("done", END)
app = graph.compile(checkpointer=PostgresSaver(...))
7.2 关键设计
- state 跟踪
completed_steps:补偿时知道”之前做了什么” - 每个节点 catch exception 写入
error字段:不直接 raise,让 graph 决定怎么处理 - conditional edges 路由到 compensate:不需要在每个节点写 try/except 调补偿
- checkpointer 持久化:即使补偿过程中崩溃,重启后能继续补偿(idempotent!)
7.3 SagaLLM 风格的扩展
加 validator + compensation agent:
def validate_node(state: OrderState):
result = validator_llm.invoke(f"Step {state['last_step']} 的结果是否合规? state: {state}")
if "INVALID" in result.content:
return {"error": "validation_failed"}
return {}
# 在每个 transaction 后插入 validate
graph.add_edge("charge", "validate_charge")
graph.add_conditional_edges("validate_charge", route, {...})
8. Temporal 实现 Saga
Temporal 有专门的 Saga helper(各语言 SDK):
from temporalio import workflow
from temporalio.common import RetryPolicy
from datetime import timedelta
@workflow.defn
class OrderSagaWorkflow:
@workflow.run
async def run(self, order_id: str) -> str:
compensations = []
try:
# T1: 库存
await workflow.execute_activity(
reserve_inventory, order_id,
start_to_close_timeout=timedelta(seconds=30),
)
compensations.append(lambda: workflow.execute_activity(release_inventory, order_id, ...))
# T2: 支付
await workflow.execute_activity(charge_payment, order_id, ...)
compensations.append(lambda: workflow.execute_activity(refund_payment, order_id, ...))
# T3: 发货
await workflow.execute_activity(ship_order, order_id, ...)
compensations.append(lambda: workflow.execute_activity(cancel_shipping, order_id, ...))
return "success"
except Exception as e:
# 反序执行补偿
for compensate in reversed(compensations):
try:
await compensate()
except Exception as ce:
# 补偿失败 → 记录 + 人工干预
workflow.logger.error(f"Compensation failed: {ce}")
await workflow.execute_activity(notify_humans, ...)
raise
8.1 Temporal Saga 的优势
- 每个 activity 自动重试(network blip 不需要补偿)
- 补偿本身也走 activity(也有 retry / timeout)
- Event history 完整审计
- 跨进程持续(机器重启不丢)
8.2 跨语言 SDK
Temporal 的 Saga 在 Python / Go / Java / TS 都有 helper,多语言团队特别合适。
9. 工程实战要点
9.1 幂等性是基石
所有 transaction 和 compensate 必须幂等——重试或重跑不会出错:
# ❌ 不幂等
def charge(order_id):
payment_api.create_charge(order_id, amount=100) # 重试两次扣两次钱
# ✅ 幂等
def charge(order_id):
# 用 idempotency_key,服务端去重
payment_api.create_charge(
order_id,
amount=100,
idempotency_key=f"order_{order_id}_charge",
)
外部 API 不支持 idempotency_key 时,自己加 dedup 表:
def charge(order_id):
if dedup_table.exists(f"charge_{order_id}"):
return # 已扣过
payment_api.create_charge(order_id, amount=100)
dedup_table.insert(f"charge_{order_id}")
9.2 补偿失败怎么办
补偿不能假设永远成功(支付系统挂了你怎么退款?):
| 策略 | 适合 |
|---|---|
| 指数退避重试 | 临时故障 |
| Dead letter queue | 多次重试失败,人工 |
| Manual override | 关键场景必须有兜底 |
| Compensation of compensation | 高级场景,补偿失败再触发更高层补偿 |
9.3 critical path vs nice-to-have
不是所有步骤都需要 Saga 严格补偿:
| 类型 | 例子 | 策略 |
|---|---|---|
| Critical(钱、库存) | 扣款、出库 | 严格 Saga 补偿 |
| Important(用户体验) | 发邮件、积分 | 重试 + log,补偿宽松 |
| Best effort(辅助) | 推荐列表更新 | 失败忽略 |
严格补偿的代价是工程复杂度,只在 critical path 上花。
9.4 与 LangGraph time travel 配合
LangGraph 的 time travel(第 5 章)+ Saga = 强力组合:
- 失败时不只补偿,还能”回到失败前一步分叉,换个策略再试”
- 例:支付失败 → 补偿 → 回到”选支付方式”那一步,换支付宝再试
9.5 监控与告警
| 指标 | 告警阈值 |
|---|---|
| Saga compensation rate | > 5% (异常多回滚) |
| Compensation failure rate | > 0.1% (补偿失败必须低) |
| Manual override count | > 0 立即告警 |
| Average steps per saga | 趋势监控(突增可能是 bug) |
✅ 自我检验清单
- 场景痛点:能用一个具体场景讲清”为什么 agent 需要事务”
- 2PC vs Saga:能讲清两者的优劣和适用边界
- 补偿设计原则:能默写幂等、可逆、必须成功、stateless 4 个原则
- 不能 undo 怎么办:能给 3 个例子(邮件、转账、发货)说明”语义补偿”
- SagaLLM 4 大创新:能默写并解释每一个
- LangGraph Saga 代码:能不查资料写一个 3 步 + 补偿的 graph
- Temporal Saga 代码:能写一个 try/except 反序补偿的 workflow
- 幂等性:能解释 idempotency_key 的作用,以及没有时怎么自己实现
- 补偿失败兜底:能列出至少 3 种应对策略
- critical path 取舍:能说出哪些 step 必须严格补偿、哪些可以宽松
📚 参考资料
经典论文
- Sagas (Garcia-Molina & Salem, 1987) —— Saga 模式鼻祖论文
- Pat Helland — Life Beyond Distributed Transactions —— 分布式事务理论必读
Agent Transactions 论文
- SagaLLM (Chang et al., VLDB 2025):arXiv 2503.11951 ⭐
- transactional-ai npm 包(参考实现):npm
工业实践
- Temporal: Saga Compensating Transactions:Temporal Blog
- Microsoft: Compensating Transaction Pattern:Azure Architecture Center
- AWS: Prompt Chaining Saga Patterns:AWS Prescriptive Guidance
- Compensating Transactions — SAGA:Medium
- Compensation Transaction Patterns — Orkes:Orkes Blog
Outbox / Event Sourcing 背景
- Microservices Patterns (Chris Richardson) - Saga + Outbox
- Designing Data-Intensive Applications (Kleppmann) Chapter 7-12