第3章:从 STM 到 AgentSTM —— 把数据库 30 年的乐观并发控制移植到 Agent 工具调用层
用 30 年 STM 简史拉出版本号 / read-set / write-set / commit-time validation 这套原语,再一砖一瓦把它移植到 LLM Agent 的 ATVar / 事务上下文 / 重规划循环上
数据库领域为”两个事务并发改一行数据”花了 30 年时间,沉淀出三套主流答案:2PL(悲观锁)、MVCC(多版本并发控制)、STM(软件事务内存)。其中 STM 是最年轻、也是与 LLM Agent 场景最契合的一支——它是 lock-free 的乐观并发:先执行、后验证、冲突就重做。本章不堆 STM 论文综述,只做一件事:把 STM 三十年沉淀的”版本号 / read-set / write-set / commit-time validation”四件套,一砖一瓦移植到 Agent 工具调用层,给出一个 800 行 Python 的可跑实现的核心骨架。读完你应该能从一份空 IDE 起手,自己实现 ATVar、AgentTransaction、ConflictDetector 三个核心类,并理解为什么 Agent 事务的”冲突后做什么”和数据库 STM 是分水岭。
📑 目录
- 1. STM 30 年简史:从 1995 到现在
- 2. STM 四件套:版本号、读集、写集、commit-time validation
- 3. 移植第一步:ATVar —— Agent 的事务变量
- 4. 移植第二步:AgentTransaction —— 读集/写集的 Agent 化
- 5. 移植第三步:ConflictDetector —— commit-time validation
- 6. 关键分水岭:冲突后 DB 做什么 vs Agent 做什么
- 7. Backoff with Escalation:什么时候降级到悲观锁
- 8. 一个完整的执行轨迹
- 自我检验清单
- 参考资料
1. STM 30 年简史:从 1995 到现在
1995 ────── 2003 ────── 2005 ────── 2006 ─────── 2008 ──── 2026
Shavit& RSTM Harris TL2 Haskell AgentSTM
Touitou (Rochester) DSTM (Sun Labs) STM (Agent 层)
"STM 概念" "obstruction-free" "时间戳 + commit ⭐
-time validation"
四个里程碑:
- STM (1995, Shavit & Touitou):第一次提出”软件事务内存”概念。让程序员用
atomic { ... }块代替显式锁,运行时负责检测冲突 - DSTM (2003, Herlihy et al.):Dynamic STM,支持运行时分配事务对象,引入 obstruction-free 进展保证
- TL2 (2006, Dice/Shalev/Shavit, Sun Labs):Transactional Locking II,全局时间戳 + commit-time validation 范式确立。这是 AgentSTM 的直接祖先
- Haskell STM (2008, Harris/Marlow/Peyton-Jones):第一个商业级语言原生 STM,证明工业界可用
🌟 三十年达成的共识:OCC(乐观并发控制)+ commit-time 版本验证 是软件层并发的最佳范式。理由:
- 锁是悲观的,并发度天花板低
- MVCC 需要 DB 内核支持,对应用层不友好
- STM/OCC 应用层就能实现,扩展到分布式、异构资源都自然
🍎 直觉比喻:乐观并发就像”先吃饭再付账”——大部分时候没问题,少数时候账单错了再回头算。比”每吃一口饭都问服务员一次”(悲观锁)效率高得多。
2. STM 四件套:版本号、读集、写集、commit-time validation
无论 TL2、DSTM、还是 Haskell STM,核心机制都是这四件套:
2.1 版本号(Version Number)
每个共享变量带一个单调递增的版本号。每次成功写入版本号 +1。
变量 X:value=42, version=7
某事务读 X:返回 (42, 7),记下"我读的是 v7"
2.2 读集(Read Set)
事务内每读一个变量,记录 (变量, 读到的版本号)。
事务 T1 的 read set:
[(X, 7), (Y, 3), (Z, 12)]
含义:T1 在执行过程中,X 的快照是 v7,Y 是 v3,Z 是 v12。
2.3 写集(Write Set)
事务内每写一个变量,buffer 在事务内——不直接落盘。这是 OCC 的关键:写是”假写”,到 commit 才生效。
事务 T1 的 write set:
[(X, new_value=43, was_v7), (Z, new_value="hello", was_v12)]
2.4 Commit-Time Validation
到 commit() 时,事务做两件事:
- 验证 read set:对每个 (变量, 我读的版本号),检查当前版本号是否仍等于”我读的”。任何一个不等 → 冲突 → 整个事务作废
- 验证 + 应用 write set:对每个待写变量,做 CAS(Compare-And-Swap):当前版本号 == 我读时的版本号?等就写入并把版本号 +1,不等就冲突
伪代码:
def commit(read_set, write_set):
# 阶段 1:验证读
for (var, read_v) in read_set:
if var.current_version != read_v:
return CONFLICT # 有人改过我读的快照
# 阶段 2:CAS 写
for (var, new_val, expected_v) in write_set:
ok = var.try_commit(new_val, expected_v) # 原子 CAS
if not ok:
return CONFLICT
return SUCCESS
🧠 关键洞察:四件套合起来就是**“先按假设跑、最后验证假设是否仍成立”**——这是所有 OCC 系统(数据库 OCC、Git rebase、CRDT 部分)的共同灵魂。
3. 移植第一步:ATVar —— Agent 的事务变量
把 STM 移植到 Agent 层,第一件事是把”shared variable”概念扩展。Agent 的”共享变量”不是内存里的一个 int,而是:
- 一个文件(agent A 改 §2,agent B 改 §3)
- KV store 的一个 key(user profile, booking record)
- DB 的一行记录
- 一个外部 API 的调用结果(带状态的 webhook)
我们叫它 ATVar (Agent Transactional Variable)。它是一个统一的版本化资源 wrapper——不管底层是文件、KV、还是 DB,都暴露同样的接口:read() → (value, version) 和 try_commit(new_value, expected_version) → bool。
参考实现的核心代码:
@dataclass
class VersionedValue(Generic[T]):
value: T
version: int = 0
class ATVar(Generic[T]):
"""Agent Transactional Variable.
Thread-safe versioned container. Reads return (value, version);
writes are buffered in the transaction and only applied at commit.
"""
def __init__(self, name: str, initial_value: T) -> None:
self.name = name
self._lock = threading.Lock()
self._data = VersionedValue(
value=copy.deepcopy(initial_value), version=0
)
def read(self) -> tuple[T, int]:
"""Return (value, version) atomically."""
with self._lock:
return copy.deepcopy(self._data.value), self._data.version
def try_commit(self, new_value: T, expected_version: int) -> bool:
"""CAS: write new_value only if current version == expected_version."""
with self._lock:
if self._data.version != expected_version:
return False
self._data.value = copy.deepcopy(new_value)
self._data.version += 1
return True
ATVar 的子类适配不同后端:
class FileATVar(ATVar[str]):
"""ATVar backed by a file on disk."""
def __init__(self, path: str | Path) -> None:
self.path = Path(path)
content = self.path.read_text() if self.path.exists() else ""
super().__init__(name=str(self.path), initial_value=content)
def try_commit(self, new_value: str, expected_version: int) -> bool:
ok = super().try_commit(new_value, expected_version)
if ok:
self.path.write_text(new_value) # 落盘
return ok
class KVATVar(ATVar[dict[str, Any]]):
"""ATVar wrapping a shared key-value store."""
def __init__(self, name: str) -> None:
super().__init__(name=name, initial_value={})
...
🌟 结论:ATVar 把”异构外部资源”统一成”带版本号的事务变量”——这是 Agent STM 第一性的抽象。没有这一步,read-set/write-set 概念无从谈起。
⭕ 互补:copy.deepcopy 是为了防止”读到的对象在事务过程中被外部修改”——典型的事务隔离需求。在生产系统里这是性能瓶颈点,可以替换成 immutable struct(参考 Haskell STM)或 COW(Copy-On-Write)。
4. 移植第二步:AgentTransaction —— 读集/写集的 Agent 化
AgentTransaction 是 Agent 工具调用的”事务上下文”,对应 STM 的 transaction 概念。它做三件事:
- 拦截所有对 ATVar 的读写
- 维护 read set 和 write set
- 在
commit()时调用 ConflictDetector
骨架:
class AgentTransaction:
"""A single agent transaction with read/write tracking."""
def __init__(self, agent_id: str) -> None:
self.agent_id = agent_id
self.txn_id = str(uuid.uuid4())[:8]
self.status = TxnStatus.ACTIVE
self.read_set: list[ReadEntry] = []
self.write_set: list[WriteEntry] = []
self._read_cache: dict[str, tuple[Any, int]] = {}
def read(self, atvar: ATVar) -> Any:
"""Read an ATVar within this transaction (cached on first read)."""
if atvar.name in self._read_cache:
return self._read_cache[atvar.name][0] # repeatable read
value, version = atvar.read()
self.read_set.append(
ReadEntry(atvar=atvar, read_version=version)
)
self._read_cache[atvar.name] = (value, version)
return value
def write(self, atvar: ATVar, new_value: Any) -> None:
"""Buffer a write to an ATVar (applied at commit time)."""
# 写之前必须有读版本(否则 CAS 没法做)
if atvar.name not in self._read_cache:
self.read(atvar)
_, read_version = self._read_cache[atvar.name]
self.write_set.append(
WriteEntry(atvar=atvar, new_value=new_value, read_version=read_version)
)
# 后续读看见自己的写
self._read_cache[atvar.name] = (new_value, read_version)
4.1 三个细节决定能不能用
Repeatable Read
第一次读 ATVar 后,事务内同一变量的后续读必须返回同一个值——即使外部已经改了。这通过 _read_cache 实现。没有这个,agent 工具调用中”读了一次又算了一次”会得到不同结果,逻辑就错了。
写之前先读
write 函数里 if atvar.name not in self._read_cache: self.read(atvar) 这一步必须有——CAS 需要 expected_version,而 expected_version 来自”我读时的版本”。如果 agent 工具调用直接写没读,AgentSTM 会自动补一次读。
写后读自己
self._read_cache[atvar.name] = (new_value, read_version) 让事务内后续读看见自己的写——这是 SQL 的 “read your own writes” 语义。
🧠 关键洞察:这三条是 STM 教科书里被忽略最多的三条,但放到 Agent 层尤其要紧——Agent 的工具调用比 SQL 复杂得多,一个事务内 read-modify-write 序列可能反复多次,没有 repeatable read + read-your-own-writes,事务行为会非确定性。
5. 移植第三步:ConflictDetector —— commit-time validation
ConflictDetector 在 commit() 时验证 read-set 和应用 write-set。骨架:
@dataclass
class ConflictInfo:
conflicting_reads: list[tuple[str, int, int]] = field(default_factory=list)
# (atvar_name, read_version, current_version)
conflicting_writes: list[tuple[str, Any, Any]] = field(default_factory=list)
@property
def has_conflict(self) -> bool:
return bool(self.conflicting_reads) or bool(self.conflicting_writes)
class ConflictDetector:
@staticmethod
def validate(read_set, write_set) -> ConflictInfo:
info = ConflictInfo()
# 阶段 1:check read set
for entry in read_set:
current_v = entry.atvar.version
if current_v != entry.read_version:
info.conflicting_reads.append(
(entry.atvar.name, entry.read_version, current_v)
)
return info
@staticmethod
def try_apply_writes(write_set) -> bool:
for w in write_set:
ok = w.atvar.try_commit(w.new_value, w.read_version)
if not ok:
return False
return True
⭕ Two-Phase Commit 的影子:注意 try_apply_writes 是按顺序 CAS——任何一个失败立刻返回。这不是真正的原子提交(middle-of-write 失败会留下部分写)。生产实现里要么用 logged write-ahead(先写日志再回放),要么用更强的两阶段提交。AgentSTM 论文的 prototype 用的是简化版本,第 8 章实战会讨论怎么补强。
6. 关键分水岭:冲突后 DB 做什么 vs Agent 做什么
到这里 STM 的移植看起来一切顺利——我们有了 ATVar、有了 AgentTransaction、有了 ConflictDetector。但下一步走出了第一道大分水岭:冲突后到底做什么?
数据库 STM 的标准答案是 “identical retry”——重做同一个事务。理由:
- 数据库事务是”无状态”的逻辑(重做一次结果应该一样)
- 重做代价小(微秒级)
- 多次重做大概率会成功(争用是瞬时的)
Agent STM 不能用这个答案。理由:
- LLM 调用代价大(秒级 + 钱)
- Multi-Agent 持续争用(不是瞬时的)—— Bounded-Retry Impossibility 证明了 identical retry 必然有人永久失败
- LLM 有能力做更聪明的事——重写 plan
所以 AgentSTM 在冲突后做的是:
def execute(agent_id, task_description, task_fn):
current_task = task_description
for attempt in range(MAX_RETRIES + 1):
txn = AgentTransaction(agent_id)
try:
task_fn(txn, current_task) # agent 在事务上下文中跑
except Exception:
return ABORTED
conflict = txn.commit()
if conflict is None:
return COMMITTED
# 冲突!这里是 STM/AgentSTM 的分水岭:
# —— DB STM:重做相同任务
# —— AgentSTM:让 LLM 改写任务
if replan_fn is not None:
new_task = replan_fn(current_task, conflict, attempt + 1)
if new_task is not None:
current_task = new_task # ← intent preservation
else:
break # LLM 说"没辙了",escalate
# 否则 fallthrough 走 plan-rigid retry(基线)
# 退出循环 = 重试预算用尽,降级
return execute_with_lock(agent_id, current_task, task_fn)
🌟 结论:这一段 if 分支就是 AgentSTM 论文最关键的方法贡献——把 STM 的”冲突 → 重做”换成”冲突 → 让 LLM 重规划”。其余四件套都是把 STM 现有概念翻译过来。论文里 Theorem 2(Intent-Preserving Escape)就是为这一步背书的:在随机资源选择下,重规划的失败概率 ≤ ((n-1)/(|R|-1))^k,k 次重试后指数级趋零。
🧠 核心洞察:DB 的事务粒度是”一条 SQL”,重做是廉价的;Agent 的事务粒度是”一个 plan”,重做是昂贵的——但重写比重做更聪明。这是 AgentSTM 区别于所有 DB STM 工作的根本原因。
7. Backoff with Escalation:什么时候降级到悲观锁
光有”重规划”还不够——如果 LLM 反复说”我没辙了”或者重规划本身也接连撞墙,怎么办?AgentSTM 给的是 Backoff-with-Escalation 三段式:
attempt 1-2: 乐观执行 + LLM 重规划
attempt 3: 还冲突?降级到悲观锁,串行执行该 agent 的事务
核心代码:
def _execute_with_lock(self, agent_id, task_description, task_fn):
"""悲观降级路径:拿全局 mutex 串行执行该 agent。"""
with self._lock_lock: # 全局 mutex
txn = AgentTransaction(agent_id=agent_id)
try:
task_fn(txn, task_description)
conflict = txn.commit()
if conflict is None:
return COMMITTED
return ABORTED
except Exception:
txn.abort()
return ABORTED
为什么悲观锁是兜底而不是主路径?
| 路径 | 优点 | 缺点 |
|---|---|---|
| 全程乐观 + LLM 重规划 | 高并发、自适应 | 极端争用下可能反复重试 |
| 全程悲观锁 | 一定成功 | 完全放弃并发,吞吐 = 1 |
| 乐观 → 重规划 → 悲观(AgentSTM) | 99% 任务在乐观路径完成,1% 兜底 | 实现复杂度上升 |
⭕ 互补:第三条路是”前两条的折中”——大部分任务在乐观路径以高并发完成,少数硬骨头由悲观锁兜底保证完成。这就是 graceful degradation。
🍎 直觉比喻:像高速公路的应急车道——99% 时间没人用,1% 时间是救命的。
7.1 一个常见的实现陷阱
刚开始写 AgentSTM prototype 时容易犯的错:乐观路径的 commit 不和悲观路径的 mutex 互斥。结果是:
Agent A 走悲观路径:拿到 _lock_lock,开始执行
Agent B 走乐观路径:完全不知道 A 拿了锁,正常 commit
B 的 commit 在 A 还没结束时就完成了 → 违反"悲观锁应该独占"语义
修复方法在 txn_manager.py 里:乐观路径 commit 时也要拿 _lock_lock,让乐观 commit 和悲观执行互斥。
# 来自 txn_manager.py 的真实修复
with self._lock_lock:
conflict = txn.commit()
🌟 结论:乐观路径和悲观路径必须共享同一把 mutex,否则降级语义破坏。这是 systems 实现的细节,但决定了你的方案在 stress test 下会不会无声出错。
8. 一个完整的执行轨迹
把前面 7 节连起来,跑一个具体例子。两个 Agent 同时给同一个 booking 加 passenger:
T=0: Agent A 启动事务 (txn_a)
A 读 booking_42 → ({passengers: [], baggage: 0, flight: "AA100"}, v=0)
A 写 booking_42 → buffered new_value={passengers: ["Alice"], ...}, expected_v=0
read_set = [(booking_42, v=0)]
write_set = [(booking_42, ..., expected_v=0)]
T=0: Agent B 启动事务 (txn_b),并发
B 读 booking_42 → ({passengers: [], baggage: 0, flight: "AA100"}, v=0)
B 写 booking_42 → buffered new_value={passengers: ["Bob"], ...}, expected_v=0
T=1: A.commit()
- 验证 read_set:booking_42 当前 v=0 == 我读的 v=0 ✓
- try_apply_writes:CAS(booking_42, expected_v=0) → 成功,version 升到 v=1
A 返回 COMMITTED ✓
T=2: B.commit()
- 验证 read_set:booking_42 当前 v=1 ≠ 我读的 v=0 ✗
- 返回 ConflictInfo(conflicting_reads=[(booking_42, 0, 1)])
T=3: B 收到 conflict,调用 LLM 重规划
原 task: "Add Bob to booking_42 passengers"
conflict: "booking_42 was v0 when read, now v1; current value: {passengers: [Alice], ...}"
新 task: "Add Bob to booking_42 passengers, preserving existing entry [Alice]"
↑ intent preservation: 目标"加 Bob"不变,但 plan 加了一句"保留已有"
T=4: B 用新 task 重新执行
B 读 booking_42 → ({passengers: ["Alice"], ...}, v=1)
B 写 booking_42 → buffered new_value={passengers: ["Alice", "Bob"], ...}, expected_v=1
T=5: B.commit()
- 验证 read_set:booking_42 当前 v=1 == 我读的 v=1 ✓
- try_apply_writes:CAS(booking_42, expected_v=1) → 成功,v=2
B 返回 COMMITTED ✓
最终状态:booking_42 = {passengers: ["Alice", "Bob"], ...}, v=2
两个 Agent 的修改都被保留 ✓
这就是 AgentSTM 跑一遍的完整流程。关键的两步是 T=2 的 conflict detection 和 T=3 的 intent-preserving replan——前者是 STM 的标准动作,后者是 AgentSTM 的灵魂。
8.1 反例:plan-rigid retry 在同样场景下会怎样?
如果 T=3 没有 LLM replan,而是直接重做 B 的原始 task “Add Bob to booking_42 passengers”:
B 读 booking_42 → ({passengers: ["Alice"], ...}, v=1)
B 写 booking_42 → new_value={passengers: ["Bob"], ...} ← 因为原始 task 没说"保留 Alice"
B.commit() → COMMITTED v=2
最终状态:booking_42.passengers = ["Bob"] # ← Alice 被覆盖
虽然事务层面”成功”了,但语义上还是 lost update。这就是为什么 plan-rigid retry 即使做完,也救不了多 Agent 并发——它解决的是”事务原子性”,没解决”语义一致性”。AgentSTM 的 LLM 重规划解决的是后者。
🧠 关键洞察:事务原子性 ≠ 语义一致性。STM 给前者,LLM 重规划给后者。两者缺一不可。
✅ 自我检验清单
- STM 谱系:能用一句话说清 Shavit&Touitou (1995) / TL2 (2006) / Haskell STM (2008) 各自的贡献
- 四件套:能默写”版本号 / read-set / write-set / commit-time validation”四件套及它们的协作关系
- ATVar 设计:能解释为什么用
copy.deepcopy+threading.Lock包裹 VersionedValue,不能用裸 dict - 三个细节:能解释 repeatable read / 写之前先读 / 写后读自己 三个不变量为什么必须有
- CAS:能写出 try_commit 的 CAS 语义伪代码(compare expected_version, then write + version++)
- 分水岭:能讲清 DB STM “identical retry” 和 AgentSTM “LLM replan” 的本质差别
- 降级路径:能讲清为什么乐观 commit 必须和悲观执行共享 mutex(否则降级语义破坏)
- 完整轨迹:能复述 §8 两个 Agent 并发加 passenger 的执行流程,以及为什么 plan-rigid retry 在这场景下”事务成功 ≠ 语义正确”
📚 参考资料
概念入门
- STM 维基百科:Software transactional memory
- OCC 维基百科:Optimistic concurrency control
- Compare-And-Swap (CAS):Compare-and-swap —— 所有乐观并发的硬件原子原语
关键论文
- Software Transactional Memory(Shavit & Touitou, 1995):PODC 1995 —— STM 概念奠基
- Software Transactional Memory for Dynamic-Sized Data Structures / DSTM(Herlihy et al., 2003):PODC 2003 —— 动态分配 + obstruction-free
- Composable Memory Transactions / Haskell STM(Harris et al., 2005):PPoPP 2005 —— atomic 块的语言级支持
- Transactional Locking II / TL2(Dice, Shalev, Shavit, 2006):DISC 2006 —— AgentSTM 的直接祖先,全局时间戳 + commit-time validation
- Advanced Contention Management for Dynamic STM(Scherer III & Scott, 2005):PODC 2005 —— STM contention manager 设计原则,AgentSTM Backoff-with-Escalation 的灵感
- AgentSTM(Anonymous, ARR/EMNLP 2026) ⭐:本章方法论的来源;Theorem 1 (Bounded-Retry Impossibility) + Theorem 2 (Intent-Preserving Escape) 的形式化
行业讨论
- Why Haskell STM is rarely used in production —— 各类讨论指出 STM 在长事务和 IO 上有先天局限——这些局限在 Agent 层重新出现
- Database OCC vs MVCC vs 2PL 实测 —— 各 DBMS 厂商博客
框架文档
- AgentSTM 参考实现(开源 prototype):本教程第 8 章会从 0 实现一份骨架
- TL2 论文中的伪代码:DISC 2006 论文 §3 直接对照 AgentSTM 的 ConflictDetector