第9章:端到端实战 —— 用 verl 训练 Qwen2.5-7B Search Agent
完整可复现实战:在 8 卡 H100 上用 verl 训练 Qwen2.5-7B Search Agent,GRPO + Async Rollout + GAIA 评测,含监控/避坑/混沌测试
把模块七前 8 章的所有概念串起来——本章给一份完整可复现的端到端实战:在 8 卡 H100 上用 verl 训一个 Qwen2.5-7B 的 Search Agent,reward 设计 + GRPO 配置 + async rollout + 全套监控 + 故意触发 entropy collapse 看怎么救 + GAIA basic 评测目标 ≥ 60%。所有代码、YAML、启动脚本、监控仪表盘全部给出来,你可以直接拿到 8 卡集群上 bash run.sh 跑。
📑 目录
- 0. 实战目标与硬件
- 1. 项目骨架
- 2. 数据准备:HotpotQA + 2WikiMultihop
- 3. Search Tool & Verifier
- 4. verl GRPO 配置
- 5. 启动训练
- 6. 监控仪表盘:必盯曲线
- 7. 故障演示:故意触发 Entropy Collapse
- 8. GAIA 评测
- 9. 上线 checklist 与成本估算
- 自我检验清单
- 参考资料
0. 实战目标与硬件
0.1 训练目标
| 项 | 目标 |
|---|---|
| Base 模型 | Qwen/Qwen2.5-7B-Instruct |
| 算法 | GRPO + DAPO 改进(Clip-Higher、Dynamic Sampling) |
| 数据 | HotpotQA + 2WikiMultihopQA(共 ~120K 样本) |
| 训练目标 | EM 答案正确率 + 简洁 format reward |
| 训练时长 | ~3 天(8 卡 H100) |
| 评测 | GAIA(basic level)目标 ≥ 60%,HotpotQA dev 目标 ≥ 75% |
0.2 硬件需求
| 资源 | 要求 |
|---|---|
| GPU | 8× H100 80GB(最低 8× A100 80GB) |
| 内存 | ≥ 256GB |
| 磁盘 | ≥ 1TB SSD(checkpoint + 中间数据) |
| 网络 | NVLink 全连接(节点内)+ 100Gbps IB(跨节点可选) |
单机 8 卡 H100 是最小可行配置——少于 4 卡 7B GRPO 跑不动(LoRA 风格的 QLoRA 单卡见模块八第 6 章 Unsloth 部分)。
1. 项目骨架
search-agent-rl/
├── run.sh # 一键启动
├── requirements.txt
├── README.md
├── configs/
│ ├── grpo_qwen7b.yaml # verl 主配置
│ └── reward_config.yaml # reward 函数配置
├── data/
│ ├── prepare.py # 数据预处理
│ └── train.parquet # 输出训练集
├── src/
│ ├── search_tool.py # Search 工具实现
│ ├── reward.py # Verifier / reward 函数
│ ├── agent_rollout.py # multi-turn rollout 逻辑
│ └── eval_gaia.py # GAIA 评测
├── monitoring/
│ ├── wandb_alerts.py # 自动告警规则
│ └── dashboard.json # Grafana / wandb 仪表盘
└── tests/
├── test_reward.py # reward 函数单测
└── test_chaos.py # 混沌:故意触发 collapse
2. 数据准备:HotpotQA + 2WikiMultihop
# data/prepare.py
import json
import pandas as pd
from datasets import load_dataset
def prepare_data():
"""合并两个 multi-hop QA 数据集为统一 parquet。"""
examples = []
# HotpotQA
hotpot = load_dataset("hotpot_qa", "distractor", split="train")
for ex in hotpot:
examples.append({
"prompt": format_prompt(ex["question"]),
"gold_answer": ex["answer"],
"supporting_facts": ex["supporting_facts"],
"source": "hotpot",
})
# 2WikiMultihop
wiki = load_dataset("yixuantt/2WikiMultihopQA", split="train")
for ex in wiki:
examples.append({
"prompt": format_prompt(ex["question"]),
"gold_answer": ex["answer"],
"source": "2wiki",
})
# 训练用 ~100K,留 1K 给 val
df = pd.DataFrame(examples)
df.sample(frac=1, random_state=42).reset_index(drop=True)
df[:100000].to_parquet("data/train.parquet")
df[100000:101000].to_parquet("data/val.parquet")
print(f"Saved {len(df[:100000])} train, {len(df[100000:101000])} val")
def format_prompt(question):
return f"""你是一个搜索助手。回答问题前可以多次调用 search 工具查询事实,然后给出最终答案。
格式要求:
- 思考时用 <think>...</think>
- 调用搜索用 <search>查询关键词</search>,系统会返回结果在 <result>...</result>
- 最终答案用 <answer>...</answer> 包裹
Question: {question}
让我们一步步来想。"""
if __name__ == "__main__":
prepare_data()
3. Search Tool & Verifier
3.1 Search Tool(用本地 Wikipedia + BM25)
# src/search_tool.py
from rank_bm25 import BM25Okapi
import json
class WikiSearch:
def __init__(self, corpus_path):
self.docs = []
with open(corpus_path) as f:
for line in f:
self.docs.append(json.loads(line))
tokens = [d["text"].split() for d in self.docs]
self.bm25 = BM25Okapi(tokens)
def search(self, query: str, top_k: int = 3) -> list[dict]:
"""返回 top-k 文档片段。"""
scores = self.bm25.get_scores(query.split())
top_indices = scores.argsort()[-top_k:][::-1]
return [
{"title": self.docs[i]["title"],
"snippet": self.docs[i]["text"][:500]}
for i in top_indices
]
# 全局单例(verl rollout worker 共享)
_search = None
def get_searcher():
global _search
if _search is None:
_search = WikiSearch("/data/wiki_corpus.jsonl")
return _search
3.2 Verifier(reward 函数)
# src/reward.py
import re
def extract_answer(response: str) -> str | None:
"""从 <answer>...</answer> 中提取。"""
m = re.search(r'<answer>(.*?)</answer>', response, re.DOTALL)
return m.group(1).strip().lower() if m else None
def normalize(text: str) -> str:
"""归一化:去标点、去 a/the/an、小写。"""
text = re.sub(r'[\W_]+', ' ', text.lower()).strip()
text = re.sub(r'\b(a|an|the)\b', '', text).strip()
return ' '.join(text.split())
def search_agent_reward(response: str, gold_answer: str, **kwargs) -> dict:
"""
返回:
reward(总分)+ 各维度分数(给 wandb 监控)
"""
answer = extract_answer(response)
if not answer:
return {
"reward": 0.0,
"format": 0.0,
"correctness": 0.0,
"n_calls": 0,
}
# 1. Correctness(EM after normalize)
correctness = float(normalize(answer) == normalize(gold_answer))
# 2. Format
has_think = "<think>" in response and "</think>" in response
has_answer = "<answer>" in response
format_score = float(has_think and has_answer)
# 3. Tool call statistics(供监控)
n_calls = len(re.findall(r'<search>', response))
n_results = len(re.findall(r'<result>', response))
call_success = n_results / max(n_calls, 1)
# 4. Length penalty(超长惩罚,DAPO Overlong Reward Shaping)
n_tokens = len(response.split())
if n_tokens > 4000:
length_penalty = 0.5
elif n_tokens > 8000:
length_penalty = 0.0
else:
length_penalty = 1.0
# 总 reward(主 reward 70% + format 10% + length 20%)
reward = (
0.7 * correctness +
0.1 * format_score +
0.2 * length_penalty * correctness # length 只在 correct 时奖励
)
return {
"reward": reward,
"correctness": correctness,
"format": format_score,
"length_penalty": length_penalty,
"n_calls": float(n_calls),
"call_success": call_success,
}
3.3 Multi-turn rollout 逻辑(给 verl Agentic 接口)
# src/agent_rollout.py
import re
from src.search_tool import get_searcher
MAX_TURNS = 8
SEARCH_PATTERN = re.compile(r'<search>(.*?)</search>', re.DOTALL)
async def search_agent_rollout(prompt: str, model, sampling_params):
"""
Multi-turn rollout:LLM 生成 → 检测 <search> 标签 → 真正执行 search → 拼回 LLM。
"""
state = prompt
searcher = get_searcher()
for turn in range(MAX_TURNS):
# 让 LLM 继续生成
completion = await model.generate(state, **sampling_params)
state += completion
# 检测 search 调用
last_search = list(SEARCH_PATTERN.finditer(completion))
if not last_search:
# 无 search,可能已经 final answer
break
if "<answer>" in completion:
break
query = last_search[-1].group(1).strip()
results = searcher.search(query, top_k=3)
result_text = "\n".join(
f"[{r['title']}] {r['snippet']}" for r in results
)
state += f"<result>{result_text}</result>"
return state
4. verl GRPO 配置
4.1 主配置 configs/grpo_qwen7b.yaml
data:
train_files: data/train.parquet
val_files: data/val.parquet
prompt_key: prompt
reward_key: gold_answer
max_prompt_length: 1024
max_response_length: 4096
algorithm:
algo: GRPO
group_size: 16
use_kl_in_reward: false
kl_coef: 0.01
# DAPO 改进
clip_ratio_low: 0.2
clip_ratio_high: 0.28 # Clip-Higher
dynamic_sampling: true # 跳过全 0/全 1 group
overlong_penalty: true # 超长输出渐进式惩罚
actor_rollout_ref:
model:
path: Qwen/Qwen2.5-7B-Instruct
enable_gradient_checkpointing: true
actor:
optim:
lr: 1e-6
warmup_steps: 100
weight_decay: 0.0
fsdp_config:
param_offload: false
optimizer_offload: false
forward_prefetch: true
use_torch_compile: true
rollout:
name: vllm
n: 16 # 等于 group_size
temperature: 1.0
top_p: 0.9
max_num_batched_tokens: 8192
enable_prefix_caching: true
enable_chunked_prefill: true
gpu_memory_utilization: 0.85
# 自定义 multi-turn rollout
custom_rollout: src.agent_rollout.search_agent_rollout
ref:
fsdp_config:
param_offload: true # ref 不训,offload 省显存
custom_reward_function:
path: src.reward.search_agent_reward
trainer:
total_epochs: 3
total_training_steps: 5000
n_gpus_per_node: 8
nnodes: 1
fully_async:
enable: true
max_staleness: 4
save_freq: 200
test_freq: 200
log_freq: 10
default_local_dir: ./checkpoints/qwen7b_search_grpo
resume_mode: auto
# Monitoring
logger:
- console
- wandb
wandb:
project: search-agent-rl
name: qwen7b-grpo-v1
# Adaptive KL(防 KL drift)
adaptive_kl:
enable: true
target_kl: 2.0
kl_horizon: 10000
4.2 关键超参解读
| 项 | 值 | 理由 |
|---|---|---|
| LR 1e-6 | 保守起步,GRPO 通常 1e-7 ~ 5e-6 | |
| group_size 16 | 平衡信号质量和 rollout 成本 | |
| clip_low/high 0.2/0.28 | DAPO 不对称 clip,鼓励探索 | |
| KL coef 0.01 | 偏弱,搭配 adaptive_kl 动态调 | |
| max_response 4096 | 8 turn × ~500 token 已够 | |
| max_staleness 4 | rollout 落后 train 不超 4 步 | |
| save_freq 200 | 频繁存,出问题能回滚 |
5. 启动训练
5.1 一键脚本 run.sh
#!/bin/bash
set -e
# 1. 准备数据(只需第一次)
[ ! -f data/train.parquet ] && python data/prepare.py
# 2. 检查 GPU
nvidia-smi
[ -z "$(nvidia-smi -L | grep H100)" ] && echo "Warning: not H100"
# 3. 启动 Ray cluster(verl 用)
ray stop || true
ray start --head --port=6379
# 4. 启动训练
export VLLM_ATTENTION_BACKEND=FLASH_ATTN
export TOKENIZERS_PARALLELISM=false
export WANDB_PROJECT=search-agent-rl
python -m verl.trainer.main_ppo \
--config-path=configs \
--config-name=grpo_qwen7b \
2>&1 | tee logs/train_$(date +%Y%m%d_%H%M%S).log
5.2 启动后看到什么
正常启动 5 分钟内日志:
[INFO] Loading model Qwen/Qwen2.5-7B-Instruct ...
[INFO] vLLM engine ready, 4 worker
[INFO] Trainer ready, 4 worker
[INFO] Step 1: rollout 16 traj per prompt, batch 32 prompts ...
[INFO] Step 1: avg reward=0.18, entropy=2.85, kl=0.02
[INFO] Step 2: avg reward=0.22, entropy=2.81, kl=0.04
...
正常前 100 step:
- reward 0.15 → 0.30(在涨)
- entropy 2.8 → 2.5(缓慢下降,健康)
- KL < 0.5
- effective_group_ratio > 70%
6. 监控仪表盘:必盯曲线
按第 3 章给的 wandb 配置,重点盯下面几条:
6.1 健康曲线参考
0 ────── 500 ────── 1500 ────── 3000 ────── 5000
│ │ │ │ │
reward: 0.18 → 0.32 → 0.55 → 0.65 → 0.72(慢但稳)
entropy: 2.85 → 2.4 → 1.8 → 1.5 → 1.3(缓慢下)
KL: 0.02 → 0.5 → 1.5 → 2.5 → 3.0(逐步上升,< 5 安全)
EGR: 90% → 80% → 70% → 65% → 60%(渐降但不崩)
6.2 wandb 自定义告警
# monitoring/wandb_alerts.py
import wandb
def setup_alerts(run):
"""每个监控指标配自动告警。"""
# Entropy collapse
run.alert(
title="Entropy Collapse Risk",
text="Entropy dropped below threshold",
level=wandb.AlertLevel.WARN,
# 在 wandb 网页配阈值
)
# 训练里每 step 调用:
def check_and_alert(metrics, step, history):
alerts = []
if metrics["entropy"] < 0.3 * history.entropy_init:
alerts.append(("ENTROPY_COLLAPSE", "WARN"))
if metrics["effective_group_ratio"] < 0.4:
alerts.append(("ADV_COLLAPSE", "WARN"))
if metrics["kl_to_ref"] > 8:
alerts.append(("KL_DRIFT", "ERROR"))
if step > 500:
recent_logp = history.mean_log_pi[-300:]
if recent_logp[-1] / recent_logp[0] < 0.6: # 跌 40%
alerts.append(("LLD_RISK", "ERROR"))
for title, level in alerts:
wandb.alert(title=title, level=getattr(wandb.AlertLevel, level))
6.3 Grafana 仪表盘要素
┌─────────────────┬─────────────────┬─────────────────┐
│ Mean Reward │ Entropy │ KL to Ref │
│ ─── 趋势 ─── │ ─── 趋势 ─── │ ─── 趋势 ─── │
└─────────────────┴─────────────────┴─────────────────┘
┌─────────────────┬─────────────────┬─────────────────┐
│ Effective Group │ Mean log_pi │ Gradient Norm │
│ Ratio │ (LLD 监控) │ │
└─────────────────┴─────────────────┴─────────────────┘
┌─────────────────┬─────────────────┬─────────────────┐
│ Per-dim reward: │ Avg #search/ │ Avg response │
│ correctness/ │ trajectory │ length │
│ format/length │ │ │
└─────────────────┴─────────────────┴─────────────────┘
7. 故障演示:故意触发 Entropy Collapse
为了让你”亲眼看见”训练崩,这里给一段故意搞炸的配置:
7.1 错误配置
# configs/grpo_collapse_demo.yaml(差异)
algorithm:
kl_coef: 0.0001 # 太小,policy 自由跑
actor_rollout_ref:
actor:
optim:
lr: 5e-5 # 比 1e-6 大 50 倍,太激进
rollout:
temperature: 0.5 # 偏低,输出本身就尖锐
7.2 触发后的曲线(模拟)
Step: 0 100 300 500 800
Reward: 0.18 0.42 0.55 0.55 0.55 ← 看似在收敛
Entropy: 2.85 2.0 0.8 0.2 0.05 ← 急剧下降
EGR: 90% 80% 50% 30% 10% ← 信号即将消失
KL: 0.02 0.5 3.0 8.0 15 ← 漂得离谱
第 800 step 后:模型每个 prompt 采 16 个样本几乎一模一样,advantage = 0,梯度 = 0,训练实际上停滞。
7.3 救援步骤(实战流程)
发现 entropy 暴跌后:
# 1. 立即停止
ctrl+C
# 2. 回滚到最近 healthy checkpoint
ls checkpoints/qwen7b_search_grpo/
# 选 step 100 那个(entropy 还是 2.0)
# 3. 改配置
sed -i 's/lr: 5e-5/lr: 1e-6/' configs/grpo_qwen7b.yaml
sed -i 's/kl_coef: 0.0001/kl_coef: 0.01/' configs/grpo_qwen7b.yaml
# 加 entropy bonus
echo " entropy_coef: 0.005" >> configs/grpo_qwen7b.yaml
# 4. 从 healthy ckpt 继续
python -m verl.trainer.main_ppo \
--config-path=configs --config-name=grpo_qwen7b \
trainer.resume_path=checkpoints/qwen7b_search_grpo/step_100
第 3 章讲的”避坑工具箱”在这里完整复现一遍。
8. GAIA 评测
8.1 GAIA 简介
GAIA(General AI Assistants Benchmark):Princeton + HAL,Agent 真实场景评测——研究论文查找、跨文档推理、工具使用、计算等。Level 1 是基础,Level 3 是高难。
# 拉取 GAIA(需要 HF token)
pip install gaia
git clone https://huggingface.co/datasets/gaia-benchmark/GAIA
8.2 评测脚本
# src/eval_gaia.py
import json
from gaia import GAIA
from src.agent_rollout import search_agent_rollout
from vllm import LLM, SamplingParams
llm = LLM(model="checkpoints/qwen7b_search_grpo/step_5000",
tensor_parallel_size=8)
sampling = SamplingParams(temperature=0.1, max_tokens=4096)
gaia = GAIA(level=1) # basic
results = []
for task in gaia.tasks:
state = await search_agent_rollout(task.question, llm, sampling)
answer = extract_answer(state)
correct = gaia.verify(task.id, answer)
results.append({"id": task.id, "correct": correct})
acc = sum(r["correct"] for r in results) / len(results)
print(f"GAIA Level 1 accuracy: {acc:.2%}")
with open("eval_results.json", "w") as f:
json.dump(results, f, indent=2)
8.3 期望结果
按 Search-R1 论文范式,Qwen2.5-7B + 5K step GRPO 在 GAIA Level 1 应达到 60-65%(SFT-only baseline 约 35%)。
注意:GAIA 在 2026-04 被 UC Berkeley 揭示存在 reward hacking 风险,生产模型评测应同时跑多个 benchmark(WebArena、TAU-bench 等)交叉验证。
9. 上线 checklist 与成本估算
9.1 上线 checklist
- 健康曲线:reward 单调上升,entropy 缓慢下,KL < 5,EGR > 50%
- GAIA Level 1 ≥ 60%(主目标)
- HotpotQA dev ≥ 75%(子目标)
- 通用能力不退化:跑 MMLU / AlpacaEval,RL 后下降 < 3%
- 抽样人审 100 条 reward 高的 trajectory:无明显 reward hacking
- 多 verifier 交叉:跑 WebArena / TAU-bench 部分子集
- 推理部署:RL 模型用 vLLM 起 OpenAI 兼容 API
- 回滚预案:保留至少 3 个 healthy checkpoint(每 1000 step)
- 生产灰度:5% → 50% → 100% 三阶段灰度
- 监控:推理时 OTel trace(模块六第 8 章)+ reward 异常告警
9.2 成本估算(参考)
8 卡 H100(AWS p5.48xlarge,~$98/小时):
| 阶段 | 时长 | 成本 |
|---|---|---|
| 数据准备 | 4 小时 | $400 |
| Cold-start SFT(可选) | 12 小时 | $1.2K |
| GRPO 主训练 5K step | 60 小时 | $5.9K |
| RFT 整形 | 8 小时 | $800 |
| 评测 + 调试 | 16 小时 | $1.6K |
| 合计 | ~100 小时 | ~$10K |
🌟 **训一个 production-ready 7B Search Agent 大约 1M+)便宜 100 倍,但比纯 SFT($1K)贵 10 倍。ROI 在”是否需要 emergent reasoning capability”——需要就值,不需要就省。
9.3 自托管 vs 云
| 选项 | 月成本(连续训) | 适合 |
|---|---|---|
| AWS / GCP / Azure | ~$70K(8 卡 H100) | 短期实验、灵活 |
| Hyperstack / Lambda Cloud | ~$30K | 中期 |
| 自购 8× H100 服务器 | $250K 一次性 + 电费 | 长期 + 团队多人复用 |
✅ 自我检验清单
- 项目骨架:能默写 src / configs / data / monitoring 目录的文件
- 数据处理:能写 HotpotQA + 2WikiMultihop 合并 prompt 模板
- Search Tool 实现:能写一个 BM25 风格的 search 函数
- Reward 函数:能写多维度 reward(correctness / format / length penalty)
- Multi-turn rollout:能写”LLM 生成 → 检测 search 标签 → 调 tool → 拼回”
- verl YAML:能默写 GRPO + DAPO 改进 + async + adaptive_kl 关键配置
- 健康曲线:能描述 5K step 训练 reward / entropy / KL 的健康范围
- 故障救援:能复述 entropy collapse 出现后的 4 步救援
- GAIA 评测:能跑通 verifier 函数,目标 60%+
- 成本估算:能算 8 卡 H100 训 7B Search Agent 总成本约 $10K
📚 参考资料
复现路径
- Search-R1 paper:arXiv 2503.09516
- Search-R1 verl recipe:verl repo
- Verl-Tool:github
- OpenRLHF Search Agent example:github
数据集
- HotpotQA:HuggingFace
- 2WikiMultihopQA:GitHub
- GAIA:HuggingFace
- TAU-bench:GitHub
- WebArena:官网
监控与运维
- wandb GRPO 仪表盘模板:wandb 社区
- UC Berkeley RDI Reward Hacking:博文
🎉 恭喜完成模块七 Agentic RL!
走完 9 章 + 学习路线总览,你已经掌握:
- 算法:PG / Actor-Critic / PPO / DPO / GRPO / DAPO 五代核心
- 稳定性:Entropy / Advantage / KL collapse + LLD Death Spiral 失败模式
- Reward 设计:RLVR、PRM/ORM、Multi-objective、Reward Hacking 防御
- 论文:Search-R1 / ToolRL / Agent-R1 / VerlTool 等 7 篇里程碑
- 工程:Multi-turn + Async Rollout、verl Fully Async 范式
- 自我改进:RFT / ReST / STaR / Synthetic Data / Self-Play / SFT-RL Flywheel
- 框架:verl / OpenRLHF / TRL / NeMo-RL / Unsloth 8 框架横评
- 实战:8 卡 H100 训 Qwen2.5-7B Search Agent 完整可复现
模块五 Memory + 模块六 Runtime + 模块七 RL = 完整的 Agent 工程三件套。下一步该准备模块八 Agent Evaluation & Benchmarks——给所有这些 agent 系统一个”可比、可信、可重现”的尺子。