跳到主要内容
Agentic RL

第9章:端到端实战 —— 用 verl 训练 Qwen2.5-7B Search Agent

完整可复现实战:在 8 卡 H100 上用 verl 训练 Qwen2.5-7B Search Agent,GRPO + Async Rollout + GAIA 评测,含监控/避坑/混沌测试

verl Search-R1 GRPO Qwen2.5 GAIA 实战

把模块七前 8 章的所有概念串起来——本章给一份完整可复现的端到端实战:在 8 卡 H100 上用 verl 训一个 Qwen2.5-7B 的 Search Agent,reward 设计 + GRPO 配置 + async rollout + 全套监控 + 故意触发 entropy collapse 看怎么救 + GAIA basic 评测目标 ≥ 60%。所有代码、YAML、启动脚本、监控仪表盘全部给出来,你可以直接拿到 8 卡集群上 bash run.sh 跑。

📑 目录


0. 实战目标与硬件

0.1 训练目标

目标
Base 模型Qwen/Qwen2.5-7B-Instruct
算法GRPO + DAPO 改进(Clip-Higher、Dynamic Sampling)
数据HotpotQA + 2WikiMultihopQA(共 ~120K 样本)
训练目标EM 答案正确率 + 简洁 format reward
训练时长~3 天(8 卡 H100)
评测GAIA(basic level)目标 ≥ 60%,HotpotQA dev 目标 ≥ 75%

0.2 硬件需求

资源要求
GPU8× H100 80GB(最低 8× A100 80GB)
内存≥ 256GB
磁盘≥ 1TB SSD(checkpoint + 中间数据)
网络NVLink 全连接(节点内)+ 100Gbps IB(跨节点可选)

单机 8 卡 H100 是最小可行配置——少于 4 卡 7B GRPO 跑不动(LoRA 风格的 QLoRA 单卡见模块八第 6 章 Unsloth 部分)。


1. 项目骨架

search-agent-rl/
├── run.sh                    # 一键启动
├── requirements.txt
├── README.md
├── configs/
│   ├── grpo_qwen7b.yaml      # verl 主配置
│   └── reward_config.yaml    # reward 函数配置
├── data/
│   ├── prepare.py            # 数据预处理
│   └── train.parquet         # 输出训练集
├── src/
│   ├── search_tool.py        # Search 工具实现
│   ├── reward.py             # Verifier / reward 函数
│   ├── agent_rollout.py      # multi-turn rollout 逻辑
│   └── eval_gaia.py          # GAIA 评测
├── monitoring/
│   ├── wandb_alerts.py       # 自动告警规则
│   └── dashboard.json        # Grafana / wandb 仪表盘
└── tests/
    ├── test_reward.py        # reward 函数单测
    └── test_chaos.py         # 混沌:故意触发 collapse

2. 数据准备:HotpotQA + 2WikiMultihop

# data/prepare.py
import json
import pandas as pd
from datasets import load_dataset

def prepare_data():
    """合并两个 multi-hop QA 数据集为统一 parquet。"""
    examples = []
    
    # HotpotQA
    hotpot = load_dataset("hotpot_qa", "distractor", split="train")
    for ex in hotpot:
        examples.append({
            "prompt": format_prompt(ex["question"]),
            "gold_answer": ex["answer"],
            "supporting_facts": ex["supporting_facts"],
            "source": "hotpot",
        })
    
    # 2WikiMultihop
    wiki = load_dataset("yixuantt/2WikiMultihopQA", split="train")
    for ex in wiki:
        examples.append({
            "prompt": format_prompt(ex["question"]),
            "gold_answer": ex["answer"],
            "source": "2wiki",
        })
    
    # 训练用 ~100K,留 1K 给 val
    df = pd.DataFrame(examples)
    df.sample(frac=1, random_state=42).reset_index(drop=True)
    df[:100000].to_parquet("data/train.parquet")
    df[100000:101000].to_parquet("data/val.parquet")
    print(f"Saved {len(df[:100000])} train, {len(df[100000:101000])} val")

def format_prompt(question):
    return f"""你是一个搜索助手。回答问题前可以多次调用 search 工具查询事实,然后给出最终答案。

格式要求:
- 思考时用 <think>...</think>
- 调用搜索用 <search>查询关键词</search>,系统会返回结果在 <result>...</result>
- 最终答案用 <answer>...</answer> 包裹

Question: {question}

让我们一步步来想。"""

if __name__ == "__main__":
    prepare_data()

3. Search Tool & Verifier

3.1 Search Tool(用本地 Wikipedia + BM25)

# src/search_tool.py
from rank_bm25 import BM25Okapi
import json

class WikiSearch:
    def __init__(self, corpus_path):
        self.docs = []
        with open(corpus_path) as f:
            for line in f:
                self.docs.append(json.loads(line))
        tokens = [d["text"].split() for d in self.docs]
        self.bm25 = BM25Okapi(tokens)
    
    def search(self, query: str, top_k: int = 3) -> list[dict]:
        """返回 top-k 文档片段。"""
        scores = self.bm25.get_scores(query.split())
        top_indices = scores.argsort()[-top_k:][::-1]
        return [
            {"title": self.docs[i]["title"],
             "snippet": self.docs[i]["text"][:500]}
            for i in top_indices
        ]

# 全局单例(verl rollout worker 共享)
_search = None
def get_searcher():
    global _search
    if _search is None:
        _search = WikiSearch("/data/wiki_corpus.jsonl")
    return _search

3.2 Verifier(reward 函数)

# src/reward.py
import re

def extract_answer(response: str) -> str | None:
    """从 <answer>...</answer> 中提取。"""
    m = re.search(r'<answer>(.*?)</answer>', response, re.DOTALL)
    return m.group(1).strip().lower() if m else None

def normalize(text: str) -> str:
    """归一化:去标点、去 a/the/an、小写。"""
    text = re.sub(r'[\W_]+', ' ', text.lower()).strip()
    text = re.sub(r'\b(a|an|the)\b', '', text).strip()
    return ' '.join(text.split())

def search_agent_reward(response: str, gold_answer: str, **kwargs) -> dict:
    """
    返回:
      reward(总分)+ 各维度分数(给 wandb 监控)
    """
    answer = extract_answer(response)
    if not answer:
        return {
            "reward": 0.0,
            "format": 0.0,
            "correctness": 0.0,
            "n_calls": 0,
        }
    
    # 1. Correctness(EM after normalize)
    correctness = float(normalize(answer) == normalize(gold_answer))
    
    # 2. Format
    has_think = "<think>" in response and "</think>" in response
    has_answer = "<answer>" in response
    format_score = float(has_think and has_answer)
    
    # 3. Tool call statistics(供监控)
    n_calls = len(re.findall(r'<search>', response))
    n_results = len(re.findall(r'<result>', response))
    call_success = n_results / max(n_calls, 1)
    
    # 4. Length penalty(超长惩罚,DAPO Overlong Reward Shaping)
    n_tokens = len(response.split())
    if n_tokens > 4000:
        length_penalty = 0.5
    elif n_tokens > 8000:
        length_penalty = 0.0
    else:
        length_penalty = 1.0
    
    # 总 reward(主 reward 70% + format 10% + length 20%)
    reward = (
        0.7 * correctness +
        0.1 * format_score +
        0.2 * length_penalty * correctness   # length 只在 correct 时奖励
    )
    
    return {
        "reward": reward,
        "correctness": correctness,
        "format": format_score,
        "length_penalty": length_penalty,
        "n_calls": float(n_calls),
        "call_success": call_success,
    }

3.3 Multi-turn rollout 逻辑(给 verl Agentic 接口)

# src/agent_rollout.py
import re
from src.search_tool import get_searcher

MAX_TURNS = 8
SEARCH_PATTERN = re.compile(r'<search>(.*?)</search>', re.DOTALL)

async def search_agent_rollout(prompt: str, model, sampling_params):
    """
    Multi-turn rollout:LLM 生成 → 检测 <search> 标签 → 真正执行 search → 拼回 LLM。
    """
    state = prompt
    searcher = get_searcher()
    
    for turn in range(MAX_TURNS):
        # 让 LLM 继续生成
        completion = await model.generate(state, **sampling_params)
        state += completion
        
        # 检测 search 调用
        last_search = list(SEARCH_PATTERN.finditer(completion))
        if not last_search:
            # 无 search,可能已经 final answer
            break
        if "<answer>" in completion:
            break
        
        query = last_search[-1].group(1).strip()
        results = searcher.search(query, top_k=3)
        result_text = "\n".join(
            f"[{r['title']}] {r['snippet']}" for r in results
        )
        state += f"<result>{result_text}</result>"
    
    return state

4. verl GRPO 配置

4.1 主配置 configs/grpo_qwen7b.yaml

data:
  train_files: data/train.parquet
  val_files: data/val.parquet
  prompt_key: prompt
  reward_key: gold_answer
  max_prompt_length: 1024
  max_response_length: 4096

algorithm:
  algo: GRPO
  group_size: 16
  use_kl_in_reward: false
  kl_coef: 0.01
  
  # DAPO 改进
  clip_ratio_low: 0.2
  clip_ratio_high: 0.28           # Clip-Higher
  dynamic_sampling: true            # 跳过全 0/全 1 group
  overlong_penalty: true            # 超长输出渐进式惩罚

actor_rollout_ref:
  model:
    path: Qwen/Qwen2.5-7B-Instruct
    enable_gradient_checkpointing: true
  
  actor:
    optim:
      lr: 1e-6
      warmup_steps: 100
      weight_decay: 0.0
    fsdp_config:
      param_offload: false
      optimizer_offload: false
      forward_prefetch: true
    use_torch_compile: true
  
  rollout:
    name: vllm
    n: 16                            # 等于 group_size
    temperature: 1.0
    top_p: 0.9
    max_num_batched_tokens: 8192
    enable_prefix_caching: true
    enable_chunked_prefill: true
    gpu_memory_utilization: 0.85
    
    # 自定义 multi-turn rollout
    custom_rollout: src.agent_rollout.search_agent_rollout
  
  ref:
    fsdp_config:
      param_offload: true            # ref 不训,offload 省显存

custom_reward_function:
  path: src.reward.search_agent_reward

trainer:
  total_epochs: 3
  total_training_steps: 5000
  n_gpus_per_node: 8
  nnodes: 1
  
  fully_async:
    enable: true
    max_staleness: 4
  
  save_freq: 200
  test_freq: 200
  log_freq: 10
  
  default_local_dir: ./checkpoints/qwen7b_search_grpo
  resume_mode: auto
  
  # Monitoring
  logger:
    - console
    - wandb
  wandb:
    project: search-agent-rl
    name: qwen7b-grpo-v1
    
# Adaptive KL(防 KL drift)
adaptive_kl:
  enable: true
  target_kl: 2.0
  kl_horizon: 10000

4.2 关键超参解读

理由
LR 1e-6保守起步,GRPO 通常 1e-7 ~ 5e-6
group_size 16平衡信号质量和 rollout 成本
clip_low/high 0.2/0.28DAPO 不对称 clip,鼓励探索
KL coef 0.01偏弱,搭配 adaptive_kl 动态调
max_response 40968 turn × ~500 token 已够
max_staleness 4rollout 落后 train 不超 4 步
save_freq 200频繁存,出问题能回滚

5. 启动训练

5.1 一键脚本 run.sh

#!/bin/bash
set -e

# 1. 准备数据(只需第一次)
[ ! -f data/train.parquet ] && python data/prepare.py

# 2. 检查 GPU
nvidia-smi
[ -z "$(nvidia-smi -L | grep H100)" ] && echo "Warning: not H100" 

# 3. 启动 Ray cluster(verl 用)
ray stop || true
ray start --head --port=6379

# 4. 启动训练
export VLLM_ATTENTION_BACKEND=FLASH_ATTN
export TOKENIZERS_PARALLELISM=false
export WANDB_PROJECT=search-agent-rl

python -m verl.trainer.main_ppo \
    --config-path=configs \
    --config-name=grpo_qwen7b \
    2>&1 | tee logs/train_$(date +%Y%m%d_%H%M%S).log

5.2 启动后看到什么

正常启动 5 分钟内日志:

[INFO] Loading model Qwen/Qwen2.5-7B-Instruct ...
[INFO] vLLM engine ready, 4 worker
[INFO] Trainer ready, 4 worker
[INFO] Step 1: rollout 16 traj per prompt, batch 32 prompts ...
[INFO] Step 1: avg reward=0.18, entropy=2.85, kl=0.02
[INFO] Step 2: avg reward=0.22, entropy=2.81, kl=0.04
...

正常前 100 step:

  • reward 0.15 → 0.30(在涨)
  • entropy 2.8 → 2.5(缓慢下降,健康)
  • KL < 0.5
  • effective_group_ratio > 70%

6. 监控仪表盘:必盯曲线

按第 3 章给的 wandb 配置,重点盯下面几条:

6.1 健康曲线参考

0  ──────  500 ──────  1500 ────── 3000 ────── 5000
│         │           │            │           │
reward:   0.18 → 0.32 → 0.55 → 0.65 → 0.72(慢但稳)
entropy:  2.85 → 2.4 → 1.8 → 1.5 → 1.3(缓慢下)
KL:       0.02 → 0.5 → 1.5 → 2.5 → 3.0(逐步上升,< 5 安全)
EGR:      90% → 80% → 70% → 65% → 60%(渐降但不崩)

6.2 wandb 自定义告警

# monitoring/wandb_alerts.py
import wandb

def setup_alerts(run):
    """每个监控指标配自动告警。"""
    # Entropy collapse
    run.alert(
        title="Entropy Collapse Risk",
        text="Entropy dropped below threshold",
        level=wandb.AlertLevel.WARN,
        # 在 wandb 网页配阈值
    )

# 训练里每 step 调用:
def check_and_alert(metrics, step, history):
    alerts = []
    if metrics["entropy"] < 0.3 * history.entropy_init:
        alerts.append(("ENTROPY_COLLAPSE", "WARN"))
    if metrics["effective_group_ratio"] < 0.4:
        alerts.append(("ADV_COLLAPSE", "WARN"))
    if metrics["kl_to_ref"] > 8:
        alerts.append(("KL_DRIFT", "ERROR"))
    if step > 500:
        recent_logp = history.mean_log_pi[-300:]
        if recent_logp[-1] / recent_logp[0] < 0.6:  # 跌 40%
            alerts.append(("LLD_RISK", "ERROR"))
    
    for title, level in alerts:
        wandb.alert(title=title, level=getattr(wandb.AlertLevel, level))

6.3 Grafana 仪表盘要素

┌─────────────────┬─────────────────┬─────────────────┐
│ Mean Reward      │ Entropy          │ KL to Ref         │
│ ─── 趋势 ───      │ ─── 趋势 ───      │ ─── 趋势 ───       │
└─────────────────┴─────────────────┴─────────────────┘
┌─────────────────┬─────────────────┬─────────────────┐
│ Effective Group │ Mean log_pi      │ Gradient Norm    │
│ Ratio            │ (LLD 监控)       │                   │
└─────────────────┴─────────────────┴─────────────────┘
┌─────────────────┬─────────────────┬─────────────────┐
│ Per-dim reward:  │ Avg #search/    │ Avg response     │
│ correctness/     │ trajectory       │ length            │
│ format/length    │                  │                   │
└─────────────────┴─────────────────┴─────────────────┘

7. 故障演示:故意触发 Entropy Collapse

为了让你”亲眼看见”训练崩,这里给一段故意搞炸的配置:

7.1 错误配置

# configs/grpo_collapse_demo.yaml(差异)
algorithm:
  kl_coef: 0.0001            # 太小,policy 自由跑
actor_rollout_ref:
  actor:
    optim:
      lr: 5e-5                 # 比 1e-6 大 50 倍,太激进
  rollout:
    temperature: 0.5           # 偏低,输出本身就尖锐

7.2 触发后的曲线(模拟)

Step:    0   100   300   500   800
Reward:  0.18 0.42 0.55 0.55 0.55  ← 看似在收敛
Entropy: 2.85 2.0  0.8  0.2  0.05  ← 急剧下降
EGR:     90%  80%  50%  30%  10%   ← 信号即将消失
KL:      0.02 0.5  3.0  8.0  15    ← 漂得离谱

第 800 step 后:模型每个 prompt 采 16 个样本几乎一模一样,advantage = 0,梯度 = 0,训练实际上停滞

7.3 救援步骤(实战流程)

发现 entropy 暴跌后:

# 1. 立即停止
ctrl+C

# 2. 回滚到最近 healthy checkpoint
ls checkpoints/qwen7b_search_grpo/
# 选 step 100 那个(entropy 还是 2.0)

# 3. 改配置
sed -i 's/lr: 5e-5/lr: 1e-6/' configs/grpo_qwen7b.yaml
sed -i 's/kl_coef: 0.0001/kl_coef: 0.01/' configs/grpo_qwen7b.yaml

# 加 entropy bonus
echo "  entropy_coef: 0.005" >> configs/grpo_qwen7b.yaml

# 4. 从 healthy ckpt 继续
python -m verl.trainer.main_ppo \
    --config-path=configs --config-name=grpo_qwen7b \
    trainer.resume_path=checkpoints/qwen7b_search_grpo/step_100

第 3 章讲的”避坑工具箱”在这里完整复现一遍。


8. GAIA 评测

8.1 GAIA 简介

GAIA(General AI Assistants Benchmark):Princeton + HAL,Agent 真实场景评测——研究论文查找、跨文档推理、工具使用、计算等。Level 1 是基础,Level 3 是高难。

# 拉取 GAIA(需要 HF token)
pip install gaia
git clone https://huggingface.co/datasets/gaia-benchmark/GAIA

8.2 评测脚本

# src/eval_gaia.py
import json
from gaia import GAIA
from src.agent_rollout import search_agent_rollout
from vllm import LLM, SamplingParams

llm = LLM(model="checkpoints/qwen7b_search_grpo/step_5000",
          tensor_parallel_size=8)
sampling = SamplingParams(temperature=0.1, max_tokens=4096)

gaia = GAIA(level=1)   # basic
results = []

for task in gaia.tasks:
    state = await search_agent_rollout(task.question, llm, sampling)
    answer = extract_answer(state)
    correct = gaia.verify(task.id, answer)
    results.append({"id": task.id, "correct": correct})

acc = sum(r["correct"] for r in results) / len(results)
print(f"GAIA Level 1 accuracy: {acc:.2%}")

with open("eval_results.json", "w") as f:
    json.dump(results, f, indent=2)

8.3 期望结果

按 Search-R1 论文范式,Qwen2.5-7B + 5K step GRPO 在 GAIA Level 1 应达到 60-65%(SFT-only baseline 约 35%)。

注意:GAIA 在 2026-04 被 UC Berkeley 揭示存在 reward hacking 风险,生产模型评测应同时跑多个 benchmark(WebArena、TAU-bench 等)交叉验证。


9. 上线 checklist 与成本估算

9.1 上线 checklist

  • 健康曲线:reward 单调上升,entropy 缓慢下,KL < 5,EGR > 50%
  • GAIA Level 1 ≥ 60%(主目标)
  • HotpotQA dev ≥ 75%(子目标)
  • 通用能力不退化:跑 MMLU / AlpacaEval,RL 后下降 < 3%
  • 抽样人审 100 条 reward 高的 trajectory:无明显 reward hacking
  • 多 verifier 交叉:跑 WebArena / TAU-bench 部分子集
  • 推理部署:RL 模型用 vLLM 起 OpenAI 兼容 API
  • 回滚预案:保留至少 3 个 healthy checkpoint(每 1000 step)
  • 生产灰度:5% → 50% → 100% 三阶段灰度
  • 监控:推理时 OTel trace(模块六第 8 章)+ reward 异常告警

9.2 成本估算(参考)

8 卡 H100(AWS p5.48xlarge,~$98/小时):

阶段时长成本
数据准备4 小时$400
Cold-start SFT(可选)12 小时$1.2K
GRPO 主训练 5K step60 小时$5.9K
RFT 整形8 小时$800
评测 + 调试16 小时$1.6K
合计~100 小时~$10K

🌟 **训一个 production-ready 7B Search Agent 大约 10K——比从零训basemodel(10K**——比从零训 base model(1M+)便宜 100 倍,但比纯 SFT($1K)贵 10 倍。ROI 在”是否需要 emergent reasoning capability”——需要就值,不需要就省。

9.3 自托管 vs 云

选项月成本(连续训)适合
AWS / GCP / Azure~$70K(8 卡 H100)短期实验、灵活
Hyperstack / Lambda Cloud~$30K中期
自购 8× H100 服务器$250K 一次性 + 电费长期 + 团队多人复用

✅ 自我检验清单

  • 项目骨架:能默写 src / configs / data / monitoring 目录的文件
  • 数据处理:能写 HotpotQA + 2WikiMultihop 合并 prompt 模板
  • Search Tool 实现:能写一个 BM25 风格的 search 函数
  • Reward 函数:能写多维度 reward(correctness / format / length penalty)
  • Multi-turn rollout:能写”LLM 生成 → 检测 search 标签 → 调 tool → 拼回”
  • verl YAML:能默写 GRPO + DAPO 改进 + async + adaptive_kl 关键配置
  • 健康曲线:能描述 5K step 训练 reward / entropy / KL 的健康范围
  • 故障救援:能复述 entropy collapse 出现后的 4 步救援
  • GAIA 评测:能跑通 verifier 函数,目标 60%+
  • 成本估算:能算 8 卡 H100 训 7B Search Agent 总成本约 $10K

📚 参考资料

复现路径

数据集

监控与运维

  • wandb GRPO 仪表盘模板:wandb 社区
  • UC Berkeley RDI Reward Hacking:博文

🎉 恭喜完成模块七 Agentic RL!

走完 9 章 + 学习路线总览,你已经掌握:

  • 算法:PG / Actor-Critic / PPO / DPO / GRPO / DAPO 五代核心
  • 稳定性:Entropy / Advantage / KL collapse + LLD Death Spiral 失败模式
  • Reward 设计:RLVR、PRM/ORM、Multi-objective、Reward Hacking 防御
  • 论文:Search-R1 / ToolRL / Agent-R1 / VerlTool 等 7 篇里程碑
  • 工程:Multi-turn + Async Rollout、verl Fully Async 范式
  • 自我改进:RFT / ReST / STaR / Synthetic Data / Self-Play / SFT-RL Flywheel
  • 框架:verl / OpenRLHF / TRL / NeMo-RL / Unsloth 8 框架横评
  • 实战:8 卡 H100 训 Qwen2.5-7B Search Agent 完整可复现

模块五 Memory + 模块六 Runtime + 模块七 RL = 完整的 Agent 工程三件套。下一步该准备模块八 Agent Evaluation & Benchmarks——给所有这些 agent 系统一个”可比、可信、可重现”的尺子。