跳到主要内容
多 Agent 并发与事务

第3章:从 STM 到 AgentSTM —— 把数据库 30 年的乐观并发控制移植到 Agent 工具调用层

用 30 年 STM 简史拉出版本号 / read-set / write-set / commit-time validation 这套原语,再一砖一瓦把它移植到 LLM Agent 的 ATVar / 事务上下文 / 重规划循环上

STM OCC ATVar TL2 AgentSTM CAS

数据库领域为”两个事务并发改一行数据”花了 30 年时间,沉淀出三套主流答案:2PL(悲观锁)、MVCC(多版本并发控制)、STM(软件事务内存)。其中 STM 是最年轻、也是与 LLM Agent 场景最契合的一支——它是 lock-free 的乐观并发:先执行、后验证、冲突就重做。本章不堆 STM 论文综述,只做一件事:把 STM 三十年沉淀的”版本号 / read-set / write-set / commit-time validation”四件套,一砖一瓦移植到 Agent 工具调用层,给出一个 800 行 Python 的可跑实现的核心骨架。读完你应该能从一份空 IDE 起手,自己实现 ATVar、AgentTransaction、ConflictDetector 三个核心类,并理解为什么 Agent 事务的”冲突后做什么”和数据库 STM 是分水岭。

📑 目录


1. STM 30 年简史:从 1995 到现在

1995 ────── 2003 ────── 2005 ────── 2006 ─────── 2008 ──── 2026
Shavit&     RSTM        Harris      TL2          Haskell    AgentSTM
Touitou     (Rochester) DSTM        (Sun Labs)   STM       (Agent 层)
"STM 概念"   "obstruction-free"     "时间戳 + commit                   ⭐
                                     -time validation"

四个里程碑:

  • STM (1995, Shavit & Touitou):第一次提出”软件事务内存”概念。让程序员用 atomic { ... } 块代替显式锁,运行时负责检测冲突
  • DSTM (2003, Herlihy et al.):Dynamic STM,支持运行时分配事务对象,引入 obstruction-free 进展保证
  • TL2 (2006, Dice/Shalev/Shavit, Sun Labs):Transactional Locking II,全局时间戳 + commit-time validation 范式确立。这是 AgentSTM 的直接祖先
  • Haskell STM (2008, Harris/Marlow/Peyton-Jones):第一个商业级语言原生 STM,证明工业界可用

🌟 三十年达成的共识OCC(乐观并发控制)+ commit-time 版本验证 是软件层并发的最佳范式。理由:

  • 锁是悲观的,并发度天花板低
  • MVCC 需要 DB 内核支持,对应用层不友好
  • STM/OCC 应用层就能实现,扩展到分布式、异构资源都自然

🍎 直觉比喻乐观并发就像”先吃饭再付账”——大部分时候没问题,少数时候账单错了再回头算。比”每吃一口饭都问服务员一次”(悲观锁)效率高得多。


2. STM 四件套:版本号、读集、写集、commit-time validation

无论 TL2、DSTM、还是 Haskell STM,核心机制都是这四件套:

2.1 版本号(Version Number)

每个共享变量带一个单调递增的版本号。每次成功写入版本号 +1。

变量 X:value=42, version=7
某事务读 X:返回 (42, 7),记下"我读的是 v7"

2.2 读集(Read Set)

事务内每读一个变量,记录 (变量, 读到的版本号)

事务 T1 的 read set:
  [(X, 7), (Y, 3), (Z, 12)]

含义:T1 在执行过程中,X 的快照是 v7,Y 是 v3,Z 是 v12。

2.3 写集(Write Set)

事务内每写一个变量,buffer 在事务内——不直接落盘。这是 OCC 的关键:写是”假写”,到 commit 才生效。

事务 T1 的 write set:
  [(X, new_value=43, was_v7), (Z, new_value="hello", was_v12)]

2.4 Commit-Time Validation

commit() 时,事务做两件事:

  1. 验证 read set:对每个 (变量, 我读的版本号),检查当前版本号是否仍等于”我读的”。任何一个不等 → 冲突 → 整个事务作废
  2. 验证 + 应用 write set:对每个待写变量,做 CAS(Compare-And-Swap):当前版本号 == 我读时的版本号?等就写入并把版本号 +1,不等就冲突

伪代码:

def commit(read_set, write_set):
    # 阶段 1:验证读
    for (var, read_v) in read_set:
        if var.current_version != read_v:
            return CONFLICT  # 有人改过我读的快照
    # 阶段 2:CAS 写
    for (var, new_val, expected_v) in write_set:
        ok = var.try_commit(new_val, expected_v)  # 原子 CAS
        if not ok:
            return CONFLICT
    return SUCCESS

🧠 关键洞察:四件套合起来就是**“先按假设跑、最后验证假设是否仍成立”**——这是所有 OCC 系统(数据库 OCC、Git rebase、CRDT 部分)的共同灵魂。


3. 移植第一步:ATVar —— Agent 的事务变量

把 STM 移植到 Agent 层,第一件事是把”shared variable”概念扩展。Agent 的”共享变量”不是内存里的一个 int,而是:

  • 一个文件(agent A 改 §2,agent B 改 §3)
  • KV store 的一个 key(user profile, booking record)
  • DB 的一行记录
  • 一个外部 API 的调用结果(带状态的 webhook)

我们叫它 ATVar (Agent Transactional Variable)。它是一个统一的版本化资源 wrapper——不管底层是文件、KV、还是 DB,都暴露同样的接口:read() → (value, version)try_commit(new_value, expected_version) → bool

参考实现的核心代码:

@dataclass
class VersionedValue(Generic[T]):
    value: T
    version: int = 0


class ATVar(Generic[T]):
    """Agent Transactional Variable.
    Thread-safe versioned container. Reads return (value, version);
    writes are buffered in the transaction and only applied at commit.
    """

    def __init__(self, name: str, initial_value: T) -> None:
        self.name = name
        self._lock = threading.Lock()
        self._data = VersionedValue(
            value=copy.deepcopy(initial_value), version=0
        )

    def read(self) -> tuple[T, int]:
        """Return (value, version) atomically."""
        with self._lock:
            return copy.deepcopy(self._data.value), self._data.version

    def try_commit(self, new_value: T, expected_version: int) -> bool:
        """CAS: write new_value only if current version == expected_version."""
        with self._lock:
            if self._data.version != expected_version:
                return False
            self._data.value = copy.deepcopy(new_value)
            self._data.version += 1
            return True

ATVar 的子类适配不同后端:

class FileATVar(ATVar[str]):
    """ATVar backed by a file on disk."""
    def __init__(self, path: str | Path) -> None:
        self.path = Path(path)
        content = self.path.read_text() if self.path.exists() else ""
        super().__init__(name=str(self.path), initial_value=content)

    def try_commit(self, new_value: str, expected_version: int) -> bool:
        ok = super().try_commit(new_value, expected_version)
        if ok:
            self.path.write_text(new_value)  # 落盘
        return ok


class KVATVar(ATVar[dict[str, Any]]):
    """ATVar wrapping a shared key-value store."""
    def __init__(self, name: str) -> None:
        super().__init__(name=name, initial_value={})
    ...

🌟 结论:ATVar 把”异构外部资源”统一成”带版本号的事务变量”——这是 Agent STM 第一性的抽象。没有这一步,read-set/write-set 概念无从谈起。

互补copy.deepcopy 是为了防止”读到的对象在事务过程中被外部修改”——典型的事务隔离需求。在生产系统里这是性能瓶颈点,可以替换成 immutable struct(参考 Haskell STM)或 COW(Copy-On-Write)。


4. 移植第二步:AgentTransaction —— 读集/写集的 Agent 化

AgentTransaction 是 Agent 工具调用的”事务上下文”,对应 STM 的 transaction 概念。它做三件事:

  1. 拦截所有对 ATVar 的读写
  2. 维护 read set 和 write set
  3. commit() 时调用 ConflictDetector

骨架:

class AgentTransaction:
    """A single agent transaction with read/write tracking."""

    def __init__(self, agent_id: str) -> None:
        self.agent_id = agent_id
        self.txn_id = str(uuid.uuid4())[:8]
        self.status = TxnStatus.ACTIVE
        self.read_set: list[ReadEntry] = []
        self.write_set: list[WriteEntry] = []
        self._read_cache: dict[str, tuple[Any, int]] = {}

    def read(self, atvar: ATVar) -> Any:
        """Read an ATVar within this transaction (cached on first read)."""
        if atvar.name in self._read_cache:
            return self._read_cache[atvar.name][0]  # repeatable read

        value, version = atvar.read()
        self.read_set.append(
            ReadEntry(atvar=atvar, read_version=version)
        )
        self._read_cache[atvar.name] = (value, version)
        return value

    def write(self, atvar: ATVar, new_value: Any) -> None:
        """Buffer a write to an ATVar (applied at commit time)."""
        # 写之前必须有读版本(否则 CAS 没法做)
        if atvar.name not in self._read_cache:
            self.read(atvar)
        _, read_version = self._read_cache[atvar.name]
        self.write_set.append(
            WriteEntry(atvar=atvar, new_value=new_value, read_version=read_version)
        )
        # 后续读看见自己的写
        self._read_cache[atvar.name] = (new_value, read_version)

4.1 三个细节决定能不能用

Repeatable Read

第一次读 ATVar 后,事务内同一变量的后续读必须返回同一个值——即使外部已经改了。这通过 _read_cache 实现。没有这个,agent 工具调用中”读了一次又算了一次”会得到不同结果,逻辑就错了。

写之前先读

write 函数里 if atvar.name not in self._read_cache: self.read(atvar) 这一步必须有——CAS 需要 expected_version,而 expected_version 来自”我读时的版本”。如果 agent 工具调用直接写没读,AgentSTM 会自动补一次读。

写后读自己

self._read_cache[atvar.name] = (new_value, read_version) 让事务内后续读看见自己的写——这是 SQL 的 “read your own writes” 语义。

🧠 关键洞察:这三条是 STM 教科书里被忽略最多的三条,但放到 Agent 层尤其要紧——Agent 的工具调用比 SQL 复杂得多,一个事务内 read-modify-write 序列可能反复多次,没有 repeatable read + read-your-own-writes,事务行为会非确定性


5. 移植第三步:ConflictDetector —— commit-time validation

ConflictDetectorcommit() 时验证 read-set 和应用 write-set。骨架:

@dataclass
class ConflictInfo:
    conflicting_reads: list[tuple[str, int, int]] = field(default_factory=list)
    # (atvar_name, read_version, current_version)
    conflicting_writes: list[tuple[str, Any, Any]] = field(default_factory=list)

    @property
    def has_conflict(self) -> bool:
        return bool(self.conflicting_reads) or bool(self.conflicting_writes)


class ConflictDetector:
    @staticmethod
    def validate(read_set, write_set) -> ConflictInfo:
        info = ConflictInfo()
        # 阶段 1:check read set
        for entry in read_set:
            current_v = entry.atvar.version
            if current_v != entry.read_version:
                info.conflicting_reads.append(
                    (entry.atvar.name, entry.read_version, current_v)
                )
        return info

    @staticmethod
    def try_apply_writes(write_set) -> bool:
        for w in write_set:
            ok = w.atvar.try_commit(w.new_value, w.read_version)
            if not ok:
                return False
        return True

Two-Phase Commit 的影子:注意 try_apply_writes 是按顺序 CAS——任何一个失败立刻返回。这不是真正的原子提交(middle-of-write 失败会留下部分写)。生产实现里要么用 logged write-ahead(先写日志再回放),要么用更强的两阶段提交。AgentSTM 论文的 prototype 用的是简化版本,第 8 章实战会讨论怎么补强。


6. 关键分水岭:冲突后 DB 做什么 vs Agent 做什么

到这里 STM 的移植看起来一切顺利——我们有了 ATVar、有了 AgentTransaction、有了 ConflictDetector。但下一步走出了第一道大分水岭:冲突后到底做什么?

数据库 STM 的标准答案是 “identical retry”——重做同一个事务。理由:

  • 数据库事务是”无状态”的逻辑(重做一次结果应该一样)
  • 重做代价小(微秒级)
  • 多次重做大概率会成功(争用是瞬时的)

Agent STM 不能用这个答案。理由:

  • LLM 调用代价大(秒级 + 钱)
  • Multi-Agent 持续争用(不是瞬时的)—— Bounded-Retry Impossibility 证明了 identical retry 必然有人永久失败
  • LLM 有能力做更聪明的事——重写 plan

所以 AgentSTM 在冲突后做的是:

def execute(agent_id, task_description, task_fn):
    current_task = task_description
    for attempt in range(MAX_RETRIES + 1):
        txn = AgentTransaction(agent_id)
        try:
            task_fn(txn, current_task)  # agent 在事务上下文中跑
        except Exception:
            return ABORTED

        conflict = txn.commit()
        if conflict is None:
            return COMMITTED

        # 冲突!这里是 STM/AgentSTM 的分水岭:
        # —— DB STM:重做相同任务
        # —— AgentSTM:让 LLM 改写任务
        if replan_fn is not None:
            new_task = replan_fn(current_task, conflict, attempt + 1)
            if new_task is not None:
                current_task = new_task   # ← intent preservation
            else:
                break  # LLM 说"没辙了",escalate
        # 否则 fallthrough 走 plan-rigid retry(基线)
    # 退出循环 = 重试预算用尽,降级
    return execute_with_lock(agent_id, current_task, task_fn)

🌟 结论这一段 if 分支就是 AgentSTM 论文最关键的方法贡献——把 STM 的”冲突 → 重做”换成”冲突 → 让 LLM 重规划”。其余四件套都是把 STM 现有概念翻译过来。论文里 Theorem 2(Intent-Preserving Escape)就是为这一步背书的:在随机资源选择下,重规划的失败概率 ≤ ((n-1)/(|R|-1))^k,k 次重试后指数级趋零

🧠 核心洞察:DB 的事务粒度是”一条 SQL”,重做是廉价的;Agent 的事务粒度是”一个 plan”,重做是昂贵的——但重写比重做更聪明。这是 AgentSTM 区别于所有 DB STM 工作的根本原因。


7. Backoff with Escalation:什么时候降级到悲观锁

光有”重规划”还不够——如果 LLM 反复说”我没辙了”或者重规划本身也接连撞墙,怎么办?AgentSTM 给的是 Backoff-with-Escalation 三段式:

attempt 1-2: 乐观执行 + LLM 重规划
attempt 3: 还冲突?降级到悲观锁,串行执行该 agent 的事务

核心代码:

def _execute_with_lock(self, agent_id, task_description, task_fn):
    """悲观降级路径:拿全局 mutex 串行执行该 agent。"""
    with self._lock_lock:  # 全局 mutex
        txn = AgentTransaction(agent_id=agent_id)
        try:
            task_fn(txn, task_description)
            conflict = txn.commit()
            if conflict is None:
                return COMMITTED
            return ABORTED
        except Exception:
            txn.abort()
            return ABORTED

为什么悲观锁是兜底而不是主路径?

路径优点缺点
全程乐观 + LLM 重规划高并发、自适应极端争用下可能反复重试
全程悲观锁一定成功完全放弃并发,吞吐 = 1
乐观 → 重规划 → 悲观(AgentSTM)99% 任务在乐观路径完成,1% 兜底实现复杂度上升

互补:第三条路是”前两条的折中”——大部分任务在乐观路径以高并发完成,少数硬骨头由悲观锁兜底保证完成。这就是 graceful degradation。

🍎 直觉比喻像高速公路的应急车道——99% 时间没人用,1% 时间是救命的。

7.1 一个常见的实现陷阱

刚开始写 AgentSTM prototype 时容易犯的错:乐观路径的 commit 不和悲观路径的 mutex 互斥。结果是:

Agent A 走悲观路径:拿到 _lock_lock,开始执行
Agent B 走乐观路径:完全不知道 A 拿了锁,正常 commit
B 的 commit 在 A 还没结束时就完成了 → 违反"悲观锁应该独占"语义

修复方法在 txn_manager.py 里:乐观路径 commit 时也要拿 _lock_lock,让乐观 commit 和悲观执行互斥。

# 来自 txn_manager.py 的真实修复
with self._lock_lock:
    conflict = txn.commit()

🌟 结论乐观路径和悲观路径必须共享同一把 mutex,否则降级语义破坏。这是 systems 实现的细节,但决定了你的方案在 stress test 下会不会无声出错。


8. 一个完整的执行轨迹

把前面 7 节连起来,跑一个具体例子。两个 Agent 同时给同一个 booking 加 passenger:

T=0: Agent A 启动事务 (txn_a)
     A 读 booking_42 → ({passengers: [], baggage: 0, flight: "AA100"}, v=0)
     A 写 booking_42 → buffered new_value={passengers: ["Alice"], ...}, expected_v=0
     read_set = [(booking_42, v=0)]
     write_set = [(booking_42, ..., expected_v=0)]

T=0: Agent B 启动事务 (txn_b),并发
     B 读 booking_42 → ({passengers: [], baggage: 0, flight: "AA100"}, v=0)
     B 写 booking_42 → buffered new_value={passengers: ["Bob"], ...}, expected_v=0

T=1: A.commit()
     - 验证 read_set:booking_42 当前 v=0 == 我读的 v=0 ✓
     - try_apply_writes:CAS(booking_42, expected_v=0) → 成功,version 升到 v=1
     A 返回 COMMITTED ✓

T=2: B.commit()
     - 验证 read_set:booking_42 当前 v=1 ≠ 我读的 v=0 ✗
     - 返回 ConflictInfo(conflicting_reads=[(booking_42, 0, 1)])

T=3: B 收到 conflict,调用 LLM 重规划
     原 task: "Add Bob to booking_42 passengers"
     conflict: "booking_42 was v0 when read, now v1; current value: {passengers: [Alice], ...}"
     新 task: "Add Bob to booking_42 passengers, preserving existing entry [Alice]"
     ↑ intent preservation: 目标"加 Bob"不变,但 plan 加了一句"保留已有"

T=4: B 用新 task 重新执行
     B 读 booking_42 → ({passengers: ["Alice"], ...}, v=1)
     B 写 booking_42 → buffered new_value={passengers: ["Alice", "Bob"], ...}, expected_v=1

T=5: B.commit()
     - 验证 read_set:booking_42 当前 v=1 == 我读的 v=1 ✓
     - try_apply_writes:CAS(booking_42, expected_v=1) → 成功,v=2
     B 返回 COMMITTED ✓

最终状态:booking_42 = {passengers: ["Alice", "Bob"], ...}, v=2
两个 Agent 的修改都被保留 ✓

这就是 AgentSTM 跑一遍的完整流程。关键的两步是 T=2 的 conflict detection 和 T=3 的 intent-preserving replan——前者是 STM 的标准动作,后者是 AgentSTM 的灵魂。

8.1 反例:plan-rigid retry 在同样场景下会怎样?

如果 T=3 没有 LLM replan,而是直接重做 B 的原始 task “Add Bob to booking_42 passengers”:

B 读 booking_42 → ({passengers: ["Alice"], ...}, v=1)
B 写 booking_42 → new_value={passengers: ["Bob"], ...}  ← 因为原始 task 没说"保留 Alice"
B.commit() → COMMITTED v=2

最终状态:booking_42.passengers = ["Bob"]  # ← Alice 被覆盖

虽然事务层面”成功”了,但语义上还是 lost update。这就是为什么 plan-rigid retry 即使做完,也救不了多 Agent 并发——它解决的是”事务原子性”,没解决”语义一致性”。AgentSTM 的 LLM 重规划解决的是后者。

🧠 关键洞察事务原子性 ≠ 语义一致性。STM 给前者,LLM 重规划给后者。两者缺一不可。


✅ 自我检验清单

  • STM 谱系:能用一句话说清 Shavit&Touitou (1995) / TL2 (2006) / Haskell STM (2008) 各自的贡献
  • 四件套:能默写”版本号 / read-set / write-set / commit-time validation”四件套及它们的协作关系
  • ATVar 设计:能解释为什么用 copy.deepcopy + threading.Lock 包裹 VersionedValue,不能用裸 dict
  • 三个细节:能解释 repeatable read / 写之前先读 / 写后读自己 三个不变量为什么必须有
  • CAS:能写出 try_commit 的 CAS 语义伪代码(compare expected_version, then write + version++)
  • 分水岭:能讲清 DB STM “identical retry” 和 AgentSTM “LLM replan” 的本质差别
  • 降级路径:能讲清为什么乐观 commit 必须和悲观执行共享 mutex(否则降级语义破坏)
  • 完整轨迹:能复述 §8 两个 Agent 并发加 passenger 的执行流程,以及为什么 plan-rigid retry 在这场景下”事务成功 ≠ 语义正确”

📚 参考资料

概念入门

关键论文

  • Software Transactional Memory(Shavit & Touitou, 1995):PODC 1995 —— STM 概念奠基
  • Software Transactional Memory for Dynamic-Sized Data Structures / DSTM(Herlihy et al., 2003):PODC 2003 —— 动态分配 + obstruction-free
  • Composable Memory Transactions / Haskell STM(Harris et al., 2005):PPoPP 2005 —— atomic 块的语言级支持
  • Transactional Locking II / TL2(Dice, Shalev, Shavit, 2006):DISC 2006 —— AgentSTM 的直接祖先,全局时间戳 + commit-time validation
  • Advanced Contention Management for Dynamic STM(Scherer III & Scott, 2005):PODC 2005 —— STM contention manager 设计原则,AgentSTM Backoff-with-Escalation 的灵感
  • AgentSTM(Anonymous, ARR/EMNLP 2026) ⭐:本章方法论的来源;Theorem 1 (Bounded-Retry Impossibility) + Theorem 2 (Intent-Preserving Escape) 的形式化

行业讨论

  • Why Haskell STM is rarely used in production —— 各类讨论指出 STM 在长事务和 IO 上有先天局限——这些局限在 Agent 层重新出现
  • Database OCC vs MVCC vs 2PL 实测 —— 各 DBMS 厂商博客

框架文档

  • AgentSTM 参考实现(开源 prototype):本教程第 8 章会从 0 实现一份骨架
  • TL2 论文中的伪代码:DISC 2006 论文 §3 直接对照 AgentSTM 的 ConflictDetector