跳到主要内容
Agent Runtime

第6章:Agent Transactions —— Saga / 2PC / Outbox / SagaLLM

为什么 agent 需要事务,Saga / 2PC / Outbox / Event Sourcing 四种模式对比,SagaLLM 论文精读,LangGraph + Temporal 的工程实现

Saga Transaction SagaLLM Compensation 2PC Outbox Event Sourcing

Agent 调了 5 个 tool 的第 4 个失败了,前 3 步副作用怎么办?LLM 第 7 步突然反悔,前 6 步要不要回滚?支付已成功但发货 API 超时——是再试还是退款?这些问题在分布式数据库里有 30 年的成熟答案——Saga / 2PC / Outbox / Event Sourcing。2025 年 SagaLLM 论文(VLDB)首次系统把它们带到 LLM agent 领域。本章把这套”agent 事务”工程一次讲透:论文精读 + 4 种模式对比 + LangGraph/Temporal 实现。这是模块六的灵魂章节。

📑 目录


1. 为什么 Agent 需要事务

1.1 一个让人头大的场景

电商订单 agent:

Step 1: 验证用户
Step 2: 查库存(可用 5 件)
Step 3: 扣库存(剩 4 件)        ← 副作用:库存表 -1
Step 4: 调支付(¥1000 已扣)     ← 副作用:支付系统记录
Step 5: 发货(快递 API)         ← 失败!(快递 API 挂了)
Step 6: 通知用户

Step 5 失败,前 4 步留下的副作用怎么办?

副作用不补偿的后果
Step 3 扣的库存库存少了 1,但实际没卖出 → 错账
Step 4 扣的支付用户付了钱没收到货 → 投诉、退款人工干预

🌟 Agent 的”工具调用链”本质就是一个分布式事务——每个 tool 都涉及外部状态修改,失败必须能正确回滚。

1.2 Agent 事务比传统更难

维度传统分布式事务Agent 事务
步骤数通常 < 5可能 10-50 步
路径静态LLM 动态决定
谁触发回滚程序检测LLM 决策 + 程序检测
补偿动作程序员预定义部分 LLM 生成
一致性要求强(金融)or 最终(电商)通常最终,但需要审计

LLM 的 non-deterministic 让 agent 事务更难——同样的 input 第二次跑可能走不同路径,补偿逻辑必须 robust 到任何路径


2. ACID 与 BASE:取舍光谱

性质ACID(传统数据库)BASE(分布式系统)
AAtomicity 原子性Basically Available 基本可用
CConsistency 一致性Soft state 软状态
IIsolation 隔离性Eventually consistent 最终一致
DDurability 持久性

🍎 Agent 几乎不可能 ACID(跨多个外部 API,无法分布式锁所有资源)→ 走 BASE。

但 BASE 不等于”啥都不管”——通过 Saga + 补偿动作 + 幂等性,可以做到”最终一致 + 可审计”


3. 2PC:Two-Phase Commit

3.1 流程

Phase 1 (Prepare):
  Coordinator → 所有参与者:"准备好了吗?"
  参与者:"准备好"(锁资源)/ "不行"
  
Phase 2 (Commit):
  如果所有都准备好 → "提交!"
  如果有任何一个不行 → "回滚!"

3.2 优劣

强一致性:要么全部成功,要么全部回滚 ❌ 阻塞:Phase 1 锁资源期间所有参与者等待 ❌ Coordinator 单点:挂了所有人卡住 ❌ 延迟高:两轮网络往返

3.3 Agent 中的 2PC

仅适用于:

  • 极少步骤(2-3 个)
  • 强一致性要求(金融转账级别)
  • 所有参与者都支持 2PC 协议

绝大多数 agent 场景 2PC 不实用——LLM 决策的 non-deterministic 性会让 prepare 阶段的”锁”持续时间不可控。


4. Saga:补偿事务

4.1 思想

由 Garcia-Molina & Salem 1987 年提出。每个步骤定义一个对应的”补偿动作”,任何步骤失败时按反序执行已完成步骤的补偿。

Saga = T1 ─ T2 ─ T3 ─ T4 ─ T5
        │    │    │    │    │
       C1   C2   C3   C4   C5

执行 T1, T2, T3 成功
T4 失败
→ 反序执行 C3, C2, C1

4.2 两种 Saga 编排方式

Orchestration(集中协调):一个 orchestrator 控制每一步、每个失败:

       ┌──────────────┐
       │ Orchestrator  │
       └───┬────┬────┬─┘
           │    │    │
       ┌───▼┐ ┌─▼──┐ ┌▼───┐
       │ T1 │ │ T2 │ │ T3 │
       └────┘ └────┘ └────┘

适合 Agent—— LangGraph / Temporal 都是 orchestrator 范式

Choreography(分布协作):每个服务监听事件,决定自己的动作:

T1 完成 → emit event → T2 监听并执行 → emit event → T3 ...
失败时 emit failure event,各服务监听并执行自己的补偿

适合微服务松耦合,不太适合 agent(agent 编排需要中心化决策)。

4.3 补偿动作的设计原则

原则说明
幂等(idempotent)同一补偿调多次不会出错
可逆(semantically)不一定物理 undo,但语义上恢复一致
必须成功(critical)补偿失败要有人工干预通道
不依赖状态(stateless preferred)给定原 transaction 信息就能跑

4.4 不能 undo 的怎么办

很多副作用物理上不能 undo:

  • 已发出的邮件(撤回?道歉?)
  • 已转走的钱(发起退款 transaction)
  • 已发的快递(联系快递员拦截 / 退货流程)

Saga 的”补偿”是”语义上的恢复”,不是”物理 undo”——发邮件失败的补偿是”发一封致歉邮件”,转账成功后回滚的补偿是”再发一笔反向转账”。


5. Outbox / Event Sourcing

5.1 Outbox Pattern

解决”修改 DB + 发消息”的原子性:

传统(有 bug):
  UPDATE order SET status='paid'
  publish_message(OrderPaid)        ← 如果这里失败,DB 改了消息没发

Outbox:
  BEGIN TX
    UPDATE order SET status='paid'
    INSERT INTO outbox (event='OrderPaid')   ← 同一事务
  COMMIT
  
  另一进程轮询 outbox,把 event 发到 broker,发完删除

保证”DB 改 + 消息发”原子可靠

5.2 Event Sourcing

把”状态”视为”事件流的累计”:

不存:Order(status='paid', amount=1000)
存:[OrderCreated(...), OrderPaid(...)]

需要 status 时,replay 所有 event 算出来

天然 audit trail + replay,Temporal 的 workflow event history 就是这个思想

5.3 在 Agent 中

  • Outbox:agent 写 memory + 触发外部 API,可以用 outbox 保证两者一致
  • Event Sourcing:agent 的所有 step 作为 event 持久化,既是 checkpoint 又是 audit

6. SagaLLM 论文精读 ⭐

Chang et al., “SagaLLM: Context Management, Validation, and Transaction Guarantees for Multi-Agent LLM Planning”, arXiv 2503.11951, VLDB 2025

6.1 核心问题

现有 multi-agent 框架(LangGraph、CrewAI 等)的工作流出错时,处理简单粗暴:重试整个 plan,或人工干预。没有 transaction 语义,缺乏:

  • Context isolation:多个 agent 共享 context 时,失败的 agent 不能”污染”其他人的 context
  • Validation:没有事中校验”这步结果是否合规”
  • Compensation:失败时没有自动 rollback

6.2 SagaLLM 架构

                      ┌─────────────────────┐
                      │  Orchestrator        │
                      │  (Saga Manager)      │
                      └─┬───────────────────┘

            ┌───────────┼───────────┬───────────┐
            ▼           ▼           ▼           ▼
       ┌────────┐  ┌────────┐  ┌────────┐  ┌────────┐
       │ Agent  │  │ Comp   │  │Validator│  │Persist │
       │ Node   │  │ Agent  │  │Agent    │  │Memory  │
       │ (T1)   │  │ (C1)   │  │         │  │        │
       └────────┘  └────────┘  └────────┘  └────────┘
       
       每个 workflow 节点都配:
       - 主 agent(执行 transaction)
       - 补偿 agent(执行 rollback)
       - validator agent(独立校验结果)
       - 共享 persistent memory(跨节点 context 一致)

6.3 4 大创新

① 节点 + 边都配补偿

不只 transaction node 有补偿,workflow 的”边”(transition)也有补偿——例如”从 A 到 B 的传递动作如果失败,需要 special handling”。

② LLM 自动生成补偿逻辑

传统 Saga 要程序员手写每个 compensate function。SagaLLM 用 LLM 自动生成:

prompt: "Action T_5 = ship(order_id, address). 
        If T_5 succeeded but later steps need rollback,
        write the compensating action."
LLM: "compensate_T_5(order_id) → 联系快递员 cancel,
       如已发出走退货流程,标记 order.status='shipped_then_canceled'"

LLM 还自动生成 state tracking schema、log schema、recovery orchestration 逻辑。

③ Independent Validator Agents

每个 transaction 完成后,一个独立的 validator agent(不参与决策)检查结果:

  • 数据是否符合 schema?
  • 是否违反业务约束?
  • 与历史是否一致?

validator 也用 LLM,但 prompt 专门面向”挑刺”——降低主 agent 自我验证的偏见。

④ Persistent Memory for Context Consistency

多 agent 协作时容易 context drift——A 改了一个事实,B 不知道,导致后续决策矛盾。SagaLLM 引入 persistent shared memory(类似模块五的共享 KG),所有 agent 写入立即对其他 agent 可见。

6.4 实验结果

TravelPlanner 和自定义 multi-agent benchmark 上:

  • Success rate:+15% 到 +40%(vs vanilla LangGraph / CrewAI)
  • Recovery time:从”重跑全部” 到”只重跑失败 + 补偿”,缩短 60%
  • Context consistency:跨 agent 决策矛盾率下降 70%

6.5 工业启示

  • LLM 自动生成补偿是最大创新——之前必须手写,现在 LLM 给一阶,人工 review
  • Validator agent 是低成本高收益——一个便宜小 LLM 做”挑刺”,拦下大量错误
  • Persistent memory 是必需品(模块五),不只是 nice-to-have

7. LangGraph 实现 Saga

7.1 基本模式

from langgraph.graph import StateGraph, END
from typing import TypedDict, List

class OrderState(TypedDict):
    order_id: str
    completed_steps: List[str]   # 已完成步骤,用于反序补偿
    error: str | None

# 主 transaction
def reserve_inventory(state: OrderState):
    inventory_api.reserve(state["order_id"])
    return {"completed_steps": state["completed_steps"] + ["inventory"]}

def charge_payment(state: OrderState):
    try:
        payment_api.charge(state["order_id"])
        return {"completed_steps": state["completed_steps"] + ["payment"]}
    except Exception as e:
        return {"error": str(e)}

def ship_order(state: OrderState):
    try:
        shipping_api.create(state["order_id"])
        return {"completed_steps": state["completed_steps"] + ["shipping"]}
    except Exception as e:
        return {"error": str(e)}

# 补偿
def compensate(state: OrderState):
    """反序执行已完成步骤的补偿。"""
    for step in reversed(state["completed_steps"]):
        if step == "inventory":
            inventory_api.release(state["order_id"])
        elif step == "payment":
            payment_api.refund(state["order_id"])
        elif step == "shipping":
            shipping_api.cancel(state["order_id"])
    return {"completed_steps": []}

# 路由:有 error 走补偿
def route(state: OrderState):
    return "compensate" if state.get("error") else "next"

graph = StateGraph(OrderState)
graph.add_node("reserve", reserve_inventory)
graph.add_node("charge", charge_payment)
graph.add_node("ship", ship_order)
graph.add_node("compensate", compensate)
graph.add_node("done", lambda s: s)

graph.set_entry_point("reserve")
graph.add_conditional_edges("reserve", route, {"compensate": "compensate", "next": "charge"})
graph.add_conditional_edges("charge", route, {"compensate": "compensate", "next": "ship"})
graph.add_conditional_edges("ship", route, {"compensate": "compensate", "next": "done"})
graph.add_edge("compensate", END)
graph.add_edge("done", END)

app = graph.compile(checkpointer=PostgresSaver(...))

7.2 关键设计

  • state 跟踪 completed_steps:补偿时知道”之前做了什么”
  • 每个节点 catch exception 写入 error 字段:不直接 raise,让 graph 决定怎么处理
  • conditional edges 路由到 compensate:不需要在每个节点写 try/except 调补偿
  • checkpointer 持久化:即使补偿过程中崩溃,重启后能继续补偿(idempotent!)

7.3 SagaLLM 风格的扩展

加 validator + compensation agent:

def validate_node(state: OrderState):
    result = validator_llm.invoke(f"Step {state['last_step']} 的结果是否合规? state: {state}")
    if "INVALID" in result.content:
        return {"error": "validation_failed"}
    return {}

# 在每个 transaction 后插入 validate
graph.add_edge("charge", "validate_charge")
graph.add_conditional_edges("validate_charge", route, {...})

8. Temporal 实现 Saga

Temporal 有专门的 Saga helper(各语言 SDK):

from temporalio import workflow
from temporalio.common import RetryPolicy
from datetime import timedelta

@workflow.defn
class OrderSagaWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> str:
        compensations = []
        
        try:
            # T1: 库存
            await workflow.execute_activity(
                reserve_inventory, order_id,
                start_to_close_timeout=timedelta(seconds=30),
            )
            compensations.append(lambda: workflow.execute_activity(release_inventory, order_id, ...))
            
            # T2: 支付
            await workflow.execute_activity(charge_payment, order_id, ...)
            compensations.append(lambda: workflow.execute_activity(refund_payment, order_id, ...))
            
            # T3: 发货
            await workflow.execute_activity(ship_order, order_id, ...)
            compensations.append(lambda: workflow.execute_activity(cancel_shipping, order_id, ...))
            
            return "success"
        
        except Exception as e:
            # 反序执行补偿
            for compensate in reversed(compensations):
                try:
                    await compensate()
                except Exception as ce:
                    # 补偿失败 → 记录 + 人工干预
                    workflow.logger.error(f"Compensation failed: {ce}")
                    await workflow.execute_activity(notify_humans, ...)
            raise

8.1 Temporal Saga 的优势

  • 每个 activity 自动重试(network blip 不需要补偿)
  • 补偿本身也走 activity(也有 retry / timeout)
  • Event history 完整审计
  • 跨进程持续(机器重启不丢)

8.2 跨语言 SDK

Temporal 的 Saga 在 Python / Go / Java / TS 都有 helper,多语言团队特别合适


9. 工程实战要点

9.1 幂等性是基石

所有 transaction 和 compensate 必须幂等——重试或重跑不会出错:

# ❌ 不幂等
def charge(order_id):
    payment_api.create_charge(order_id, amount=100)   # 重试两次扣两次钱

# ✅ 幂等
def charge(order_id):
    # 用 idempotency_key,服务端去重
    payment_api.create_charge(
        order_id,
        amount=100,
        idempotency_key=f"order_{order_id}_charge",
    )

外部 API 不支持 idempotency_key 时,自己加 dedup 表:

def charge(order_id):
    if dedup_table.exists(f"charge_{order_id}"):
        return  # 已扣过
    payment_api.create_charge(order_id, amount=100)
    dedup_table.insert(f"charge_{order_id}")

9.2 补偿失败怎么办

补偿不能假设永远成功(支付系统挂了你怎么退款?):

策略适合
指数退避重试临时故障
Dead letter queue多次重试失败,人工
Manual override关键场景必须有兜底
Compensation of compensation高级场景,补偿失败再触发更高层补偿

9.3 critical path vs nice-to-have

不是所有步骤都需要 Saga 严格补偿:

类型例子策略
Critical(钱、库存)扣款、出库严格 Saga 补偿
Important(用户体验)发邮件、积分重试 + log,补偿宽松
Best effort(辅助)推荐列表更新失败忽略

严格补偿的代价是工程复杂度,只在 critical path 上花

9.4 与 LangGraph time travel 配合

LangGraph 的 time travel(第 5 章)+ Saga = 强力组合:

  • 失败时不只补偿,还能”回到失败前一步分叉,换个策略再试”
  • 例:支付失败 → 补偿 → 回到”选支付方式”那一步,换支付宝再试

9.5 监控与告警

指标告警阈值
Saga compensation rate> 5% (异常多回滚)
Compensation failure rate> 0.1% (补偿失败必须低)
Manual override count> 0 立即告警
Average steps per saga趋势监控(突增可能是 bug)

✅ 自我检验清单

  • 场景痛点:能用一个具体场景讲清”为什么 agent 需要事务”
  • 2PC vs Saga:能讲清两者的优劣和适用边界
  • 补偿设计原则:能默写幂等、可逆、必须成功、stateless 4 个原则
  • 不能 undo 怎么办:能给 3 个例子(邮件、转账、发货)说明”语义补偿”
  • SagaLLM 4 大创新:能默写并解释每一个
  • LangGraph Saga 代码:能不查资料写一个 3 步 + 补偿的 graph
  • Temporal Saga 代码:能写一个 try/except 反序补偿的 workflow
  • 幂等性:能解释 idempotency_key 的作用,以及没有时怎么自己实现
  • 补偿失败兜底:能列出至少 3 种应对策略
  • critical path 取舍:能说出哪些 step 必须严格补偿、哪些可以宽松

📚 参考资料

经典论文

  • Sagas (Garcia-Molina & Salem, 1987) —— Saga 模式鼻祖论文
  • Pat Helland — Life Beyond Distributed Transactions —— 分布式事务理论必读

Agent Transactions 论文

  • SagaLLM (Chang et al., VLDB 2025):arXiv 2503.11951
  • transactional-ai npm 包(参考实现):npm

工业实践

Outbox / Event Sourcing 背景

  • Microservices Patterns (Chris Richardson) - Saga + Outbox
  • Designing Data-Intensive Applications (Kleppmann) Chapter 7-12