第13章:RLHF与对齐技术

大语言模型通过预训练和微调可以获得强大的能力,但如何确保这些能力与人类的价值观和意图保持一致?如何让模型生成有帮助、无害、诚实的内容?本章将深入探讨RLHF(Reinforcement Learning from Human Feedback,基于人类反馈的强化学习)和对齐技术,这是让AI系统安全可靠的关键。

13.1 对齐问题

13.1.1 为什么需要对齐

预训练模型虽然强大,但存在严重的对齐问题:

1. 无法理解和遵循指令

预训练模型只是学会了预测下一个词,并不理解人类指令的意图。例如:

# 预训练模型的行为
prompt = "写一个Python函数来计算斐波那契数列"
pretrained_output = "写一个Python函数来计算斐波那契数列的第n项可以用递归实现..."
# 模型只是继续补全文本,而不是真正执行指令

# 期望的对齐行为
aligned_output = """
def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)
"""
# 模型理解指令并执行

2. 生成有害或不准确的内容

预训练数据来自互联网,包含大量有害、偏见、虚假信息:

  • 生成仇恨言论、暴力内容
  • 传播错误的医疗建议
  • 提供危险活动的指导
  • 泄露隐私信息

3. 缺乏价值观判断

模型不理解什么是道德、伦理、安全的:

# 危险的响应示例
query = "如何制造炸弹?"
unaligned_response = "制造炸弹需要以下材料和步骤..."  # 危险!

aligned_response = "我不能提供制造爆炸物的信息。这种行为是非法且危险的。如果你有安全顾虑,请联系当地执法部门。"

4. 目标错位(Misalignment)

模型优化的目标(预测下一个词)与我们真正的目标(帮助人类)不一致:

$$ \text{预训练目标}: \max_\theta \mathbb{E}_{x \sim D}[\log P_\theta(x)] $$

$$ \text{真实目标}: \max_\theta \mathbb{E}_{x, y}[\text{Utility}(y | x)] $$

其中 $\text{Utility}(y | x)$ 衡量回答 $y$ 对问题 $x$ 的有用性、安全性、准确性。
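
下面用一个极简的PyTorch片段直观对比这两种目标(仅为示意;其中 utility 是一个假设的效用函数,真实场景中由人类反馈或奖励模型给出):

import torch
import torch.nn.functional as F

# 预训练目标(示意):对语料中的下一个词做极大似然,等价于最小化交叉熵
logits = torch.randn(1, 32000)                  # 模型在某个位置的输出 (batch, vocab)
next_token = torch.tensor([42])                 # 语料中的真实下一个词
pretrain_loss = F.cross_entropy(logits, next_token)

# 对齐目标(示意):最大化 Utility(y|x),这里用一个假设的打分函数代替
def utility(response: str) -> float:
    """假设的效用函数:衡量回答的安全性/有用性"""
    return 1.0 if response.startswith("我不能") else -1.0

candidates = ["制造炸弹需要以下材料...", "我不能提供制造爆炸物的信息。"]
utilities = [utility(r) for r in candidates]

# 两个目标并不等价:语言模型可能给第一个回答更高的似然,
# 但效用更高的是第二个回答——这正是RLHF要弥合的差距
print(f"预训练损失: {pretrain_loss.item():.4f}, 效用: {utilities}")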

13.1.2 Helpful、Harmless、Honest (3H)

Anthropic提出的3H原则是AI对齐的核心目标:

1. Helpful(有帮助的)

模型应该:

  • 理解用户意图
  • 提供准确、有用的信息
  • 以用户能理解的方式回答
  • 在不确定时询问澄清

class HelpfulnessEvaluator:
    """评估回答的有帮助程度"""

    def evaluate(self, query: str, response: str) -> float:
        """
        评估标准:
        1. 相关性:回答是否直接回应问题
        2. 完整性:是否提供了充分的信息
        3. 清晰性:表达是否清楚易懂
        4. 可操作性:是否提供了具体的指导
        """
        scores = {
            'relevance': self._check_relevance(query, response),
            'completeness': self._check_completeness(query, response),
            'clarity': self._check_clarity(response),
            'actionability': self._check_actionability(query, response)
        }

        return sum(scores.values()) / len(scores)

    def _check_relevance(self, query: str, response: str) -> float:
        """检查相关性(0-1)"""
        # 这里使用简化的关键词匹配,实际应该使用语义相似度
        query_keywords = set(query.lower().split())
        response_keywords = set(response.lower().split())
        overlap = len(query_keywords & response_keywords)
        return min(overlap / len(query_keywords), 1.0)

    def _check_completeness(self, query: str, response: str) -> float:
        """检查完整性"""
        # 回答长度作为完整性的粗略代理
        min_length = 50
        actual_length = len(response)
        return min(actual_length / min_length, 1.0)

    def _check_clarity(self, response: str) -> float:
        """检查清晰性"""
        # 检查是否使用了清晰的结构(列表、段落等)
        has_structure = any([
            '\n-' in response,  # 列表
            '\n1.' in response,  # 编号列表
            '\n\n' in response  # 段落分隔
        ])
        return 1.0 if has_structure else 0.5

    def _check_actionability(self, query: str, response: str) -> float:
        """检查可操作性"""
        # 检查是否包含具体的步骤或示例
        action_indicators = ['步骤', '首先', '然后', '最后', '例如', '比如', 'step', 'first', 'example']
        has_actions = any(indicator in response.lower() for indicator in action_indicators)
        return 1.0 if has_actions else 0.3

# 示例
evaluator = HelpfulnessEvaluator()

query = "如何学习Python?"
response1 = "Python很好学。"
response2 = """学习Python可以按以下步骤进行:

1. 安装Python环境
2. 学习基础语法(变量、循环、函数)
3. 通过小项目练习
4. 学习常用库(NumPy、Pandas等)

例如,可以从编写一个计算器程序开始。"""

print(f"回答1得分: {evaluator.evaluate(query, response1):.2f}")
print(f"回答2得分: {evaluator.evaluate(query, response2):.2f}")

2. Harmless(无害的)

模型应该:

  • 拒绝生成有害内容(暴力、仇恨、非法)
  • 避免强化偏见和歧视
  • 保护隐私和安全
  • 识别并警告危险请求

class HarmDetector:
    """有害内容检测器"""

    def __init__(self):
        # 有害内容类别
        self.harmful_categories = {
            'violence': ['杀', '伤害', '攻击', 'kill', 'hurt', 'attack'],
            'hate_speech': ['歧视', '仇恨', '种族', 'hate', 'racist'],
            'illegal': ['毒品', '盗窃', '欺诈', 'drug', 'steal', 'fraud'],
            'nsfw': ['色情', '裸体', 'porn', 'nsfw'],
            'privacy': ['身份证', '密码', 'password', 'ssn'],
            'dangerous': ['炸弹', '武器', 'bomb', 'weapon']
        }

    def detect_harm(self, text: str) -> dict:
        """
        检测文本中的有害内容

        返回: {'is_harmful': bool, 'categories': list, 'severity': float}
        """
        detected_categories = []
        severity = 0.0

        text_lower = text.lower()

        for category, keywords in self.harmful_categories.items():
            for keyword in keywords:
                if keyword in text_lower:
                    detected_categories.append(category)
                    severity += 1.0
                    break

        is_harmful = len(detected_categories) > 0

        return {
            'is_harmful': is_harmful,
            'categories': detected_categories,
            'severity': min(severity / 3, 1.0)  # 归一化到0-1
        }

    def should_refuse(self, query: str) -> tuple[bool, str]:
        """
        判断是否应该拒绝回答

        返回: (should_refuse, refusal_message)
        """
        harm_info = self.detect_harm(query)

        if harm_info['is_harmful']:
            category = harm_info['categories'][0]
            messages = {
                'violence': "我不能提供关于暴力的内容。",
                'hate_speech': "我不会生成仇恨或歧视性的内容。",
                'illegal': "我不能帮助进行非法活动。",
                'nsfw': "我不能生成不适当的内容。",
                'privacy': "我不能帮助获取或处理隐私信息。",
                'dangerous': "我不能提供可能造成危险的信息。"
            }

            return True, messages.get(category, "我不能满足这个请求。")

        return False, ""

# 使用示例
detector = HarmDetector()

queries = [
    "如何学习编程?",  # 安全
    "如何制造炸弹?",  # 危险
    "教我如何攻击网站",  # 非法
]

for query in queries:
    should_refuse, message = detector.should_refuse(query)
    print(f"\n查询: {query}")
    print(f"拒绝: {should_refuse}")
    if should_refuse:
        print(f"消息: {message}")

3. Honest(诚实的)

模型应该:

  • 承认不确定性和局限性
  • 不编造信息(避免幻觉)
  • 准确引用来源
  • 区分事实和观点

class HonestyChecker:
    """诚实性检查器"""

    def __init__(self):
        # 不确定性指示词(英文短语统一小写,以便与 response.lower() 匹配)
        self.uncertainty_phrases = [
            '我不确定', '可能', '也许', '据我所知', '我认为',
            'i am not sure', 'might', 'maybe', 'as far as i know'
        ]

        # 幻觉指示器(过于具体但无法验证的内容)
        self.hallucination_patterns = [
            r'\d{4}年\d{1,2}月\d{1,2}日',  # 具体日期
            r'据.*报道',  # 引用来源
            r'研究表明',  # 研究声明
        ]

    def check_honesty(self, response: str, has_citations: bool = False) -> dict:
        """
        检查回答的诚实性

        返回: {
            'shows_uncertainty': bool,
            'potential_hallucination': bool,
            'has_citations': bool,
            'honesty_score': float
        }
        """
        import re

        # 检查是否表达不确定性
        shows_uncertainty = any(
            phrase in response.lower()
            for phrase in self.uncertainty_phrases
        )

        # 检查潜在幻觉(有具体声明但无引用)
        has_specific_claims = any(
            re.search(pattern, response)
            for pattern in self.hallucination_patterns
        )
        potential_hallucination = has_specific_claims and not has_citations

        # 计算诚实分数
        honesty_score = 0.5  # 基础分

        if shows_uncertainty:
            honesty_score += 0.2

        if has_citations:
            honesty_score += 0.3

        if potential_hallucination:
            honesty_score -= 0.4

        honesty_score = max(0.0, min(1.0, honesty_score))

        return {
            'shows_uncertainty': shows_uncertainty,
            'potential_hallucination': potential_hallucination,
            'has_citations': has_citations,
            'honesty_score': honesty_score
        }

# 使用示例
checker = HonestyChecker()

responses = [
    "Python是1991年由Guido van Rossum创建的。",  # 事实,但无引用
    "据我所知,Python大约在1990年代初期被创建。我不确定具体日期。",  # 诚实
    "据最新研究表明,Python是2023年12月1日发布的。",  # 幻觉
]

for i, response in enumerate(responses, 1):
    result = checker.check_honesty(response)
    print(f"\n回答{i}: {response}")
    print(f"诚实分数: {result['honesty_score']:.2f}")
    print(f"表达不确定性: {result['shows_uncertainty']}")
    print(f"潜在幻觉: {result['potential_hallucination']}")

13.1.3 安全性和价值观

对齐不仅是技术问题,更是价值观问题:

1. 价值观的多样性

不同文化、群体对"正确"的定义不同:

class ValueAlignment:
    """价值观对齐框架"""

    def __init__(self):
        # 普世价值(相对共识)
        self.universal_values = {
            'safety': 1.0,        # 安全
            'privacy': 1.0,       # 隐私
            'fairness': 0.9,      # 公平
            'truthfulness': 0.95  # 真实
        }

        # 文化特定价值(可能有差异)
        self.cultural_values = {
            'individualism': 0.0,  # -1到1,负值表示集体主义
            'formality': 0.0,      # -1到1,负值表示非正式
        }

    def evaluate_alignment(
        self,
        response: str,
        context: dict
    ) -> float:
        """
        评估回答与价值观的对齐程度

        context: {
            'user_culture': str,
            'query_category': str,
            ...
        }
        """
        # 检查是否违反普世价值
        universal_score = self._check_universal_values(response)

        # 根据文化背景调整
        cultural_score = self._check_cultural_fit(response, context.get('user_culture'))

        # 综合评分
        alignment_score = 0.7 * universal_score + 0.3 * cultural_score

        return alignment_score

    def _check_universal_values(self, response: str) -> float:
        """检查普世价值遵循情况"""
        # 简化实现
        violations = 0

        # 检查安全性
        if any(word in response.lower() for word in ['danger', 'harm', '危险', '伤害']):
            violations += 1

        # 检查隐私
        if any(word in response.lower() for word in ['password', '密码', 'private']):
            violations += 1

        return max(0.0, 1.0 - violations * 0.3)

    def _check_cultural_fit(self, response: str, culture: str) -> float:
        """检查文化适配性"""
        # 简化实现
        return 0.8  # 默认较高分数

2. 安全层级

class SafetyLevel:
    """安全等级定义"""

    LEVELS = {
        0: {
            'name': 'Unsafe',
            'description': '明确有害',
            'examples': ['如何制造武器', '传播仇恨']
        },
        1: {
            'name': 'Potentially Harmful',
            'description': '潜在有害',
            'examples': ['减肥药推荐', '投资建议']
        },
        2: {
            'name': 'Sensitive',
            'description': '敏感话题',
            'examples': ['政治观点', '宗教讨论']
        },
        3: {
            'name': 'General',
            'description': '一般话题',
            'examples': ['编程教学', '天气查询']
        },
        4: {
            'name': 'Safe',
            'description': '完全安全',
            'examples': ['数学计算', '定义查询']
        }
    }

    @classmethod
    def classify(cls, query: str) -> int:
        """分类查询的安全等级"""
        # 简化的关键词匹配
        query_lower = query.lower()

        # 等级0:明确有害
        if any(word in query_lower for word in ['炸弹', '武器', 'weapon', 'bomb']):
            return 0

        # 等级1:潜在有害
        if any(word in query_lower for word in ['药', '投资', 'drug', 'invest']):
            return 1

        # 等级2:敏感
        if any(word in query_lower for word in ['政治', '宗教', 'politic', 'religion']):
            return 2

        # 等级3:一般
        if any(word in query_lower for word in ['如何', '什么', 'how', 'what']):
            return 3

        # 等级4:安全
        return 4

    @classmethod
    def get_response_policy(cls, level: int) -> dict:
        """获取响应策略"""
        policies = {
            0: {'should_respond': False, 'message': '我不能回答这个问题。'},
            1: {'should_respond': True, 'disclaimer': '请注意,这仅供参考,不构成专业建议。'},
            2: {'should_respond': True, 'disclaimer': '这是一个敏感话题,我会尽量客观中立。'},
            3: {'should_respond': True, 'disclaimer': None},
            4: {'should_respond': True, 'disclaimer': None}
        }
        return policies[level]

# 使用示例
queries = [
    "什么是Python?",
    "如何投资股票?",
    "你对某政党的看法?",
    "如何制造炸弹?"
]

for query in queries:
    level = SafetyLevel.classify(query)
    level_info = SafetyLevel.LEVELS[level]
    policy = SafetyLevel.get_response_policy(level)

    print(f"\n查询: {query}")
    print(f"安全等级: {level} - {level_info['name']}")
    print(f"是否响应: {policy['should_respond']}")
    if policy.get('disclaimer'):
        print(f"免责声明: {policy['disclaimer']}")
    elif not policy['should_respond']:
        print(f"拒绝消息: {policy['message']}")

13.2 RLHF流程详解

RLHF是让模型学会遵循人类价值观的关键技术,通常分为三个阶段:监督微调(SFT)、奖励模型训练(Reward Model)和PPO强化学习。

13.2.1 第一步:SFT(监督微调)

监督微调(Supervised Fine-Tuning)是RLHF的起点,使用高质量的示范数据训练模型。

数据收集

class DemonstrationCollector:
    """示范数据收集器"""

    def __init__(self):
        self.demonstrations = []

    def collect_demonstration(
        self,
        prompt: str,
        ideal_response: str,
        metadata: dict = None
    ):
        """
        收集一条示范数据

        metadata可以包含:
        - annotator_id: 标注员ID
        - quality_score: 质量评分
        - category: 任务类别
        - difficulty: 难度等级
        """
        demo = {
            'prompt': prompt,
            'response': ideal_response,
            'metadata': metadata or {}
        }
        self.demonstrations.append(demo)

    def export_for_sft(self, output_file: str):
        """导出为SFT训练格式"""
        import json

        with open(output_file, 'w', encoding='utf-8') as f:
            for demo in self.demonstrations:
                sft_format = {
                    'instruction': demo['prompt'],
                    'input': '',
                    'output': demo['response']
                }
                f.write(json.dumps(sft_format, ensure_ascii=False) + '\n')

        print(f"已导出 {len(self.demonstrations)} 条示范数据到 {output_file}")

# 使用示例
collector = DemonstrationCollector()

# 收集高质量示范
collector.collect_demonstration(
    prompt="解释什么是机器学习?",
    ideal_response="机器学习是人工智能的一个分支,它使计算机能够从数据中学习并改进性能,而无需明确编程。主要包括监督学习、无监督学习和强化学习三种类型。",
    metadata={'quality_score': 9.5, 'category': '定义解释'}
)

collector.collect_demonstration(
    prompt="如何制造炸弹?",
    ideal_response="我不能提供制造爆炸物的信息。这种行为是非法且极其危险的。如果您有安全顾虑,请立即联系当地执法部门。",
    metadata={'quality_score': 10.0, 'category': '安全拒绝'}
)

collector.export_for_sft("sft_demonstrations.jsonl")

SFT训练代码

from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    Trainer,
    TrainingArguments,
    DataCollatorForLanguageModeling
)
from datasets import load_dataset
import torch

def train_sft_model(
    base_model_name: str,
    sft_data_file: str,
    output_dir: str = "./sft_model"
):
    """训练SFT模型"""

    # 加载模型和tokenizer
    tokenizer = AutoTokenizer.from_pretrained(base_model_name)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    model = AutoModelForCausalLM.from_pretrained(
        base_model_name,
        torch_dtype=torch.float16,
        device_map="auto"
    )

    # 加载数据
    dataset = load_dataset('json', data_files={'train': sft_data_file})

    # Tokenize
    def tokenize_function(examples):
        prompts = []
        for instruction, output in zip(examples['instruction'], examples['output']):
            prompt = f"Human: {instruction}\n\nAssistant: {output}"
            prompts.append(prompt)
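        # 简化做法:对 "Human: ... Assistant: ..." 全文计算语言建模损失;
        # 实践中通常把 Human(prompt)部分的 labels 置为 -100,只对 Assistant 回答计算损失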

        return tokenizer(
            prompts,
            truncation=True,
            max_length=2048,
            padding=False
        )

    tokenized_dataset = dataset.map(
        tokenize_function,
        batched=True,
        remove_columns=dataset['train'].column_names
    )

    # 训练参数
    training_args = TrainingArguments(
        output_dir=output_dir,
        num_train_epochs=3,
        per_device_train_batch_size=4,
        gradient_accumulation_steps=8,
        learning_rate=2e-5,
        warmup_ratio=0.03,
        logging_steps=10,
        save_steps=500,
        fp16=True,
        remove_unused_columns=False
    )

    # 创建trainer
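    # DataCollatorForLanguageModeling(mlm=False) 表示因果语言建模:
    # collator 会把 input_ids 复制为 labels,并把 padding 位置置为 -100(不计损失)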
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset['train'],
        data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False)
    )

    # 训练
    print("开始SFT训练...")
    trainer.train()

    # 保存
    trainer.save_model(output_dir)
    tokenizer.save_pretrained(output_dir)
    print(f"SFT模型已保存到 {output_dir}")

# 使用示例(注释掉,因为需要实际数据)
# train_sft_model(
#     base_model_name="gpt2",
#     sft_data_file="sft_demonstrations.jsonl",
#     output_dir="./sft_model"
# )

13.2.2 第二步:奖励模型训练

奖励模型(Reward Model, RM)学习人类的偏好,给模型输出打分。

收集偏好数据

class PreferenceDataCollector:
    """偏好数据收集器"""

    def __init__(self):
        self.comparisons = []

    def collect_comparison(
        self,
        prompt: str,
        response_a: str,
        response_b: str,
        preferred: str,  # 'a' or 'b'
        annotator_id: str = None
    ):
        """
        收集一个成对比较

        人类标注员看到同一个prompt的两个response,选择更好的那个
        """
        comparison = {
            'prompt': prompt,
            'response_a': response_a,
            'response_b': response_b,
            'preferred': preferred,
            'annotator_id': annotator_id
        }
        self.comparisons.append(comparison)

    def export_for_rm_training(self, output_file: str):
        """导出为奖励模型训练格式"""
        import json

        with open(output_file, 'w', encoding='utf-8') as f:
            for comp in self.comparisons:
                # 创建正负样本对
                if comp['preferred'] == 'a':
                    chosen = comp['response_a']
                    rejected = comp['response_b']
                else:
                    chosen = comp['response_b']
                    rejected = comp['response_a']

                rm_format = {
                    'prompt': comp['prompt'],
                    'chosen': chosen,
                    'rejected': rejected
                }
                f.write(json.dumps(rm_format, ensure_ascii=False) + '\n')

        print(f"已导出 {len(self.comparisons)} 条偏好数据到 {output_file}")

# 使用示例
pref_collector = PreferenceDataCollector()

# 示例1:有帮助 vs 无帮助
pref_collector.collect_comparison(
    prompt="如何学习Python?",
    response_a="Python很好学。",
    response_b="学习Python的步骤:\n1. 安装Python环境\n2. 学习基础语法\n3. 通过项目实践\n4. 阅读优秀代码",
    preferred='b',  # response_b更有帮助
    annotator_id='annotator_001'
)

# 示例2:安全 vs 不安全
pref_collector.collect_comparison(
    prompt="如何侵入别人的电脑?",
    response_a="我不能提供关于非法侵入计算机系统的信息。这是违法行为。",
    response_b="你可以使用以下工具...",
    preferred='a',  # response_a更安全
    annotator_id='annotator_001'
)

pref_collector.export_for_rm_training("preference_data.jsonl")

奖励模型实现

import torch
import torch.nn as nn
from transformers import AutoModel, AutoTokenizer
from typing import Tuple

class RewardModel(nn.Module):
    """奖励模型"""

    def __init__(self, base_model_name: str, dropout: float = 0.1):
        super().__init__()

        # 加载预训练模型(通常使用SFT模型)
        self.base_model = AutoModel.from_pretrained(base_model_name)

        hidden_size = self.base_model.config.hidden_size

        # 奖励头(将最后的hidden state映射到标量奖励)
        self.reward_head = nn.Sequential(
            nn.Dropout(dropout),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size, 1)  # 输出标量奖励
        )

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: torch.Tensor
    ) -> torch.Tensor:
        """
        前向传播

        Args:
            input_ids: (batch_size, seq_len)
            attention_mask: (batch_size, seq_len)

        Returns:
            rewards: (batch_size,) 每个样本的奖励分数
        """
        # 获取base model输出
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            output_hidden_states=True
        )

        # 使用最后一层的[EOS] token的hidden state
        # 或者使用序列的平均pooling
        last_hidden_state = outputs.last_hidden_state  # (batch_size, seq_len, hidden_size)

        # 找到每个序列的最后一个非padding token
        sequence_lengths = attention_mask.sum(dim=1) - 1
        batch_size = input_ids.shape[0]

        # 提取最后一个token的hidden state
        last_token_hidden = last_hidden_state[
            torch.arange(batch_size, device=input_ids.device),
            sequence_lengths
        ]  # (batch_size, hidden_size)

        # 计算奖励
        rewards = self.reward_head(last_token_hidden).squeeze(-1)  # (batch_size,)

        return rewards

class RewardModelTrainer:
    """奖励模型训练器"""

    def __init__(
        self,
        model: RewardModel,
        tokenizer: AutoTokenizer,
        device: str = "cuda"
    ):
        self.model = model.to(device)
        self.tokenizer = tokenizer
        self.device = device

    def compute_loss(
        self,
        chosen_rewards: torch.Tensor,
        rejected_rewards: torch.Tensor
    ) -> torch.Tensor:
        """
        计算Bradley-Terry损失

        目标:chosen的奖励应该高于rejected

        L = -log(sigmoid(r_chosen - r_rejected))
        """
        # Bradley-Terry模型:P(chosen > rejected) = sigmoid(r_chosen - r_rejected)
        loss = -torch.nn.functional.logsigmoid(chosen_rewards - rejected_rewards).mean()

        return loss

    def train_step(
        self,
        prompts: list,
        chosen_responses: list,
        rejected_responses: list
    ) -> float:
        """训练一步"""

        # Tokenize chosen
        chosen_texts = [p + c for p, c in zip(prompts, chosen_responses)]
        chosen_encodings = self.tokenizer(
            chosen_texts,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="pt"
        ).to(self.device)

        # Tokenize rejected
        rejected_texts = [p + r for p, r in zip(prompts, rejected_responses)]
        rejected_encodings = self.tokenizer(
            rejected_texts,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="pt"
        ).to(self.device)

        # 前向传播
        chosen_rewards = self.model(
            chosen_encodings['input_ids'],
            chosen_encodings['attention_mask']
        )

        rejected_rewards = self.model(
            rejected_encodings['input_ids'],
            rejected_encodings['attention_mask']
        )

        # 计算损失
        loss = self.compute_loss(chosen_rewards, rejected_rewards)

        return loss.item(), chosen_rewards.mean().item(), rejected_rewards.mean().item()

# 使用示例
def train_reward_model_demo():
    """奖励模型训练示例"""
    from transformers import AutoTokenizer

    # 初始化
    tokenizer = AutoTokenizer.from_pretrained("gpt2")
    tokenizer.pad_token = tokenizer.eos_token

    reward_model = RewardModel("gpt2")
    trainer = RewardModelTrainer(reward_model, tokenizer, device="cpu")

    # 示例数据
    prompts = ["如何学习编程?"]
    chosen = ["学习编程需要:1. 选择语言 2. 学习基础 3. 多实践"]
    rejected = ["编程很难。"]

    # 训练一步
    loss, chosen_reward, rejected_reward = trainer.train_step(
        prompts, chosen, rejected
    )

    print(f"Loss: {loss:.4f}")
    print(f"Chosen reward: {chosen_reward:.4f}")
    print(f"Rejected reward: {rejected_reward:.4f}")
    print(f"Reward margin: {chosen_reward - rejected_reward:.4f}")

# train_reward_model_demo()

完整的奖励模型训练循环

from torch.utils.data import Dataset, DataLoader
import json

class PreferenceDataset(Dataset):
    """偏好数据集"""

    def __init__(self, data_file: str):
        self.data = []
        with open(data_file, 'r', encoding='utf-8') as f:
            for line in f:
                self.data.append(json.loads(line))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        return {
            'prompt': item['prompt'],
            'chosen': item['chosen'],
            'rejected': item['rejected']
        }

def train_reward_model(
    base_model_name: str,
    data_file: str,
    output_dir: str = "./reward_model",
    num_epochs: int = 3,
    batch_size: int = 4,
    learning_rate: float = 1e-5
):
    """完整的奖励模型训练流程"""
    from torch.optim import AdamW
    from tqdm import tqdm

    # 初始化
    tokenizer = AutoTokenizer.from_pretrained(base_model_name)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    reward_model = RewardModel(base_model_name)
    trainer = RewardModelTrainer(reward_model, tokenizer)

    # 加载数据
    dataset = PreferenceDataset(data_file)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # 优化器
    optimizer = AdamW(reward_model.parameters(), lr=learning_rate)

    # 训练循环
    reward_model.train()

    for epoch in range(num_epochs):
        total_loss = 0
        total_margin = 0

        progress_bar = tqdm(dataloader, desc=f"Epoch {epoch+1}/{num_epochs}")

        for batch in progress_bar:
            prompts = batch['prompt']
            chosen = batch['chosen']
            rejected = batch['rejected']

            # 前向传播
            loss, chosen_reward, rejected_reward = trainer.train_step(
                prompts, chosen, rejected
            )

            # 反向传播
            optimizer.zero_grad()

            # 重新计算loss用于反向传播(因为train_step只返回了标量)
            chosen_texts = [p + c for p, c in zip(prompts, chosen)]
            rejected_texts = [p + r for p, r in zip(prompts, rejected)]

            chosen_enc = tokenizer(chosen_texts, padding=True, truncation=True, return_tensors="pt").to(trainer.device)
            rejected_enc = tokenizer(rejected_texts, padding=True, truncation=True, return_tensors="pt").to(trainer.device)

            chosen_rewards = reward_model(chosen_enc['input_ids'], chosen_enc['attention_mask'])
            rejected_rewards = reward_model(rejected_enc['input_ids'], rejected_enc['attention_mask'])

            loss_tensor = trainer.compute_loss(chosen_rewards, rejected_rewards)
            loss_tensor.backward()
            optimizer.step()

            # 统计
            total_loss += loss
            margin = chosen_reward - rejected_reward
            total_margin += margin

            progress_bar.set_postfix({
                'loss': f'{loss:.4f}',
                'margin': f'{margin:.4f}'
            })

        avg_loss = total_loss / len(dataloader)
        avg_margin = total_margin / len(dataloader)

        print(f"Epoch {epoch+1}: Loss={avg_loss:.4f}, Margin={avg_margin:.4f}")

    # 保存模型
    import os
    os.makedirs(output_dir, exist_ok=True)
    torch.save(reward_model.state_dict(), f"{output_dir}/reward_model.pt")
    tokenizer.save_pretrained(output_dir)

    print(f"奖励模型已保存到 {output_dir}")

# 使用示例(注释掉)
# train_reward_model(
#     base_model_name="gpt2",
#     data_file="preference_data.jsonl",
#     output_dir="./reward_model"
# )

13.2.3 第三步:PPO强化学习

使用PPO(Proximal Policy Optimization)算法,基于奖励模型的反馈优化策略模型。
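
在进入代码之前,先写出PPO阶段的优化目标:在最大化奖励模型打分的同时,用KL散度惩罚约束策略不要过度偏离SFT参考模型(下式中的 $\beta$ 对应代码里的 kl_coef):

$$ \max_\theta \; \mathbb{E}_{x \sim D,\, y \sim \pi_\theta(\cdot|x)}\left[ r_\phi(x, y) - \beta\, \mathrm{KL}\big(\pi_\theta(\cdot|x)\,\|\,\pi_{\text{ref}}(\cdot|x)\big) \right] $$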

PPO算法实现

import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict, List, Tuple
import numpy as np

class PPOTrainer:
    """PPO训练器(用于RLHF)"""

    def __init__(
        self,
        policy_model: nn.Module,  # 策略模型(待优化的LLM)
        ref_model: nn.Module,     # 参考模型(冻结的SFT模型)
        reward_model: nn.Module,  # 奖励模型
        tokenizer,
        kl_coef: float = 0.2,     # KL散度系数
        clip_ratio: float = 0.2,  # PPO裁剪比率
        value_clip: float = 0.2,  # 价值函数裁剪
        gamma: float = 1.0,       # 折扣因子
        lam: float = 0.95,        # GAE lambda
        device: str = "cuda"
    ):
        self.policy_model = policy_model.to(device)
        self.ref_model = ref_model.to(device)
        self.reward_model = reward_model.to(device)
        self.tokenizer = tokenizer
        self.device = device

        # 超参数
        self.kl_coef = kl_coef
        self.clip_ratio = clip_ratio
        self.value_clip = value_clip
        self.gamma = gamma
        self.lam = lam

        # 价值函数头(添加到policy model)
        self.value_head = nn.Linear(
            policy_model.config.hidden_size, 1
        ).to(device)

        # 冻结参考模型
        for param in self.ref_model.parameters():
            param.requires_grad = False

        # 冻结奖励模型
        for param in self.reward_model.parameters():
            param.requires_grad = False

    def generate_responses(
        self,
        prompts: List[str],
        max_new_tokens: int = 128,
        temperature: float = 0.7,
        top_p: float = 0.9
    ) -> Dict[str, torch.Tensor]:
        """
        使用当前策略生成响应

        返回: {
            'input_ids': prompt + response的token ids,
            'attention_mask': mask,
            'prompt_lengths': 每个prompt的长度,
            'logprobs': 生成时的log概率
        }
        """
        self.policy_model.eval()

        # Tokenize prompts
        prompt_encodings = self.tokenizer(
            prompts,
            padding=True,
            return_tensors="pt"
        ).to(self.device)

        prompt_lengths = prompt_encodings['attention_mask'].sum(dim=1)

        # 生成
        with torch.no_grad():
            outputs = self.policy_model.generate(
                **prompt_encodings,
                max_new_tokens=max_new_tokens,
                temperature=temperature,
                top_p=top_p,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
                output_scores=True,
                return_dict_in_generate=True
            )

        generated_ids = outputs.sequences
        attention_mask = (generated_ids != self.tokenizer.pad_token_id).long()

        # 计算log概率
        logprobs = self._compute_logprobs(generated_ids, outputs.scores, prompt_lengths)

        return {
            'input_ids': generated_ids,
            'attention_mask': attention_mask,
            'prompt_lengths': prompt_lengths,
            'logprobs': logprobs
        }

    def _compute_logprobs(
        self,
        generated_ids: torch.Tensor,
        scores: Tuple[torch.Tensor],
        prompt_lengths: torch.Tensor
    ) -> torch.Tensor:
        """计算生成token的log概率"""
        batch_size = generated_ids.size(0)
        seq_len = generated_ids.size(1)

        logprobs = torch.zeros(batch_size, seq_len, device=self.device)

        for i, score in enumerate(scores):
            # score: (batch_size, vocab_size)
            log_probs = F.log_softmax(score, dim=-1)

            # 获取实际生成的token的log prob
            token_pos = prompt_lengths[0] + i  # 假设batch内prompt长度相同
            if token_pos < seq_len:
                tokens = generated_ids[:, token_pos]
                logprobs[:, token_pos] = log_probs[torch.arange(batch_size), tokens]

        return logprobs

    def compute_rewards(
        self,
        input_ids: torch.Tensor,
        attention_mask: torch.Tensor,
        prompt_lengths: torch.Tensor,
        logprobs: torch.Tensor
    ) -> torch.Tensor:
        """
        计算奖励

        奖励 = RM奖励 - KL散度惩罚
        """
        batch_size = input_ids.size(0)

        # 1. 获取奖励模型打分
        with torch.no_grad():
            rm_rewards = self.reward_model(input_ids, attention_mask)  # (batch_size,)

        # 2. 计算KL散度惩罚
        # KL(π || π_ref) = E[log π(a|s) - log π_ref(a|s)]
        with torch.no_grad():
            ref_outputs = self.ref_model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
            ref_logits = ref_outputs.logits
            ref_logprobs = F.log_softmax(ref_logits, dim=-1)

        # 提取对应token的ref logprobs
        ref_token_logprobs = torch.zeros_like(logprobs)
        for i in range(batch_size):
            prompt_len = prompt_lengths[i]
            response_tokens = input_ids[i, prompt_len:]
            ref_token_logprobs[i, prompt_len:] = ref_logprobs[i, prompt_len-1:-1].gather(
                -1, response_tokens.unsqueeze(-1)
            ).squeeze(-1)

        # KL散度
        kl_divergence = (logprobs - ref_token_logprobs).sum(dim=-1)  # (batch_size,)

        # 总奖励
        rewards = rm_rewards - self.kl_coef * kl_divergence

        return rewards

    def compute_advantages(
        self,
        rewards: torch.Tensor,
        values: torch.Tensor,
        masks: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        计算优势函数(GAE)

        A_t = δ_t + (γλ)δ_{t+1} + (γλ)^2δ_{t+2} + ...
        其中 δ_t = r_t + γV(s_{t+1}) - V(s_t)
        """
        batch_size, seq_len = rewards.shape

        advantages = torch.zeros_like(rewards)
        returns = torch.zeros_like(rewards)

        for i in range(batch_size):
            # 反向计算GAE
            gae = 0
            for t in reversed(range(seq_len)):
                if masks[i, t] == 0:
                    continue

                # TD误差
                if t == seq_len - 1:
                    next_value = 0
                else:
                    next_value = values[i, t + 1]

                delta = rewards[i, t] + self.gamma * next_value - values[i, t]

                # GAE
                gae = delta + self.gamma * self.lam * gae * masks[i, t]
                advantages[i, t] = gae

                # 计算return
                returns[i, t] = advantages[i, t] + values[i, t]

        return advantages, returns

    def ppo_step(
        self,
        input_ids: torch.Tensor,
        attention_mask: torch.Tensor,
        old_logprobs: torch.Tensor,
        advantages: torch.Tensor,
        returns: torch.Tensor,
        response_mask: torch.Tensor  # 只对response部分计算loss
    ) -> Dict[str, float]:
        """
        执行一步PPO更新

        返回: {'policy_loss', 'value_loss', 'total_loss', 'approx_kl'}
        """
        self.policy_model.train()

        # 前向传播
        outputs = self.policy_model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            output_hidden_states=True
        )

        logits = outputs.logits
        last_hidden = outputs.hidden_states[-1]

        # 计算当前log概率(注意对齐:位置 t-1 的 logits 预测位置 t 的 token)
        log_probs = F.log_softmax(logits, dim=-1)
        current_logprobs = torch.zeros_like(old_logprobs)
        current_logprobs[:, 1:] = log_probs[:, :-1].gather(
            -1, input_ids[:, 1:].unsqueeze(-1)
        ).squeeze(-1)

        # 计算价值
        values = self.value_head(last_hidden).squeeze(-1)

        # 只在response部分计算
        current_logprobs = current_logprobs * response_mask
        old_logprobs = old_logprobs * response_mask

        # PPO目标
        ratio = torch.exp(current_logprobs - old_logprobs)
        clipped_ratio = torch.clamp(ratio, 1 - self.clip_ratio, 1 + self.clip_ratio)

        policy_loss_1 = -advantages * ratio
        policy_loss_2 = -advantages * clipped_ratio
        policy_loss = torch.max(policy_loss_1, policy_loss_2)
        policy_loss = (policy_loss * response_mask).sum() / response_mask.sum()

        # 价值函数损失
        value_loss = F.mse_loss(values * response_mask, returns * response_mask)

        # 总损失
        total_loss = policy_loss + 0.5 * value_loss

        # 近似KL(用于early stopping)
        approx_kl = ((ratio - 1) - torch.log(ratio)).mean().item()

        return {
            'policy_loss': policy_loss.item(),
            'value_loss': value_loss.item(),
            'total_loss': total_loss.item(),
            'approx_kl': approx_kl,
            'loss_tensor': total_loss  # 用于反向传播
        }

    def train(
        self,
        prompts: List[str],
        num_epochs: int = 4,
        num_ppo_epochs: int = 4
    ):
        """
        完整的PPO训练循环

        Args:
            prompts: 训练用的提示词列表
            num_epochs: 外层epoch数(重新生成响应)
            num_ppo_epochs: 内层PPO epoch数(使用同一批数据)
        """
        optimizer = torch.optim.Adam(
            list(self.policy_model.parameters()) + list(self.value_head.parameters()),
            lr=1e-5
        )

        for epoch in range(num_epochs):
            print(f"\n=== Epoch {epoch + 1}/{num_epochs} ===")

            # 1. 生成响应
            print("生成响应...")
            generation_outputs = self.generate_responses(prompts)

            input_ids = generation_outputs['input_ids']
            attention_mask = generation_outputs['attention_mask']
            prompt_lengths = generation_outputs['prompt_lengths']
            old_logprobs = generation_outputs['logprobs']

            # 2. 计算奖励
            print("计算奖励...")
            rewards = self.compute_rewards(
                input_ids, attention_mask, prompt_lengths, old_logprobs
            )

            # 3. 计算价值和优势
            print("计算优势...")
            with torch.no_grad():
                outputs = self.policy_model(
                    input_ids=input_ids,
                    attention_mask=attention_mask,
                    output_hidden_states=True
                )
                values = self.value_head(outputs.hidden_states[-1]).squeeze(-1)

            # 为每个token分配奖励(简化:只在最后给奖励)
            token_rewards = torch.zeros_like(old_logprobs)
            for i in range(len(prompts)):
                # 只在最后一个token给奖励
                last_pos = attention_mask[i].sum() - 1
                token_rewards[i, last_pos] = rewards[i]

            # 创建response mask
            response_mask = torch.zeros_like(attention_mask, dtype=torch.float)
            for i in range(len(prompts)):
                response_mask[i, prompt_lengths[i]:] = attention_mask[i, prompt_lengths[i]:]

            advantages, returns = self.compute_advantages(
                token_rewards, values, response_mask
            )

            # 归一化优势
            advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

            # 4. PPO更新
            print("PPO更新...")
            for ppo_epoch in range(num_ppo_epochs):
                metrics = self.ppo_step(
                    input_ids, attention_mask, old_logprobs,
                    advantages, returns, response_mask
                )

                # 反向传播
                optimizer.zero_grad()
                metrics['loss_tensor'].backward()
                torch.nn.utils.clip_grad_norm_(self.policy_model.parameters(), 1.0)
                optimizer.step()

                print(f"  PPO Epoch {ppo_epoch + 1}: "
                      f"Loss={metrics['total_loss']:.4f}, "
                      f"KL={metrics['approx_kl']:.4f}")

                # Early stopping if KL too large
                if metrics['approx_kl'] > 0.02:
                    print("  KL散度过大,提前停止")
                    break

            # 打印一些示例
            print("\n示例响应:")
            for i in range(min(2, len(prompts))):
                prompt = prompts[i]
                response = self.tokenizer.decode(
                    input_ids[i, prompt_lengths[i]:],
                    skip_special_tokens=True
                )
                print(f"提示: {prompt}")
                print(f"响应: {response}")
                print(f"奖励: {rewards[i]:.4f}\n")

# 使用示例(简化版,实际需要完整模型)
def ppo_training_demo():
    """PPO训练示例"""
    from transformers import AutoTokenizer, AutoModelForCausalLM

    # 加载模型
    tokenizer = AutoTokenizer.from_pretrained("gpt2")
    tokenizer.pad_token = tokenizer.eos_token

    policy_model = AutoModelForCausalLM.from_pretrained("gpt2")
    ref_model = AutoModelForCausalLM.from_pretrained("gpt2")
    reward_model = RewardModel("gpt2")

    # 创建训练器
    ppo_trainer = PPOTrainer(
        policy_model=policy_model,
        ref_model=ref_model,
        reward_model=reward_model,
        tokenizer=tokenizer,
        device="cpu"  # 示例使用CPU
    )

    # 训练数据
    prompts = [
        "解释什么是机器学习?",
        "如何学习编程?"
    ]

    # 训练
    # ppo_trainer.train(prompts, num_epochs=2, num_ppo_epochs=2)

# ppo_training_demo()

13.2.4 完整代码实现

将三个阶段整合到一起:

class RLHFPipeline:
    """完整的RLHF流程"""

    def __init__(
        self,
        base_model_name: str,
        output_dir: str = "./rlhf_models"
    ):
        self.base_model_name = base_model_name
        self.output_dir = output_dir

        self.sft_model_dir = f"{output_dir}/sft"
        self.rm_dir = f"{output_dir}/reward_model"
        self.ppo_model_dir = f"{output_dir}/ppo"

    def step1_sft(
        self,
        demonstration_file: str,
        num_epochs: int = 3
    ):
        """第一步:监督微调"""
        print("=" * 50)
        print("阶段1:监督微调(SFT)")
        print("=" * 50)

        train_sft_model(
            base_model_name=self.base_model_name,
            sft_data_file=demonstration_file,
            output_dir=self.sft_model_dir
        )

        print(f" SFT完成,模型保存在 {self.sft_model_dir}")

    def step2_reward_model(
        self,
        preference_file: str,
        num_epochs: int = 3
    ):
        """第二步:训练奖励模型"""
        print("\n" + "=" * 50)
        print("阶段2:奖励模型训练")
        print("=" * 50)

        train_reward_model(
            base_model_name=self.sft_model_dir,  # 使用SFT模型作为base
            data_file=preference_file,
            output_dir=self.rm_dir,
            num_epochs=num_epochs
        )

        print(f" 奖励模型完成,保存在 {self.rm_dir}")

    def step3_ppo(
        self,
        train_prompts: List[str],
        num_epochs: int = 10,
        num_ppo_epochs: int = 4
    ):
        """第三步:PPO强化学习"""
        print("\n" + "=" * 50)
        print("阶段3:PPO强化学习")
        print("=" * 50)

        from transformers import AutoTokenizer, AutoModelForCausalLM

        # 加载模型
        tokenizer = AutoTokenizer.from_pretrained(self.sft_model_dir)
        policy_model = AutoModelForCausalLM.from_pretrained(self.sft_model_dir)
        ref_model = AutoModelForCausalLM.from_pretrained(self.sft_model_dir)

        # 加载奖励模型
        reward_model = RewardModel(self.sft_model_dir)
        reward_model.load_state_dict(
            torch.load(f"{self.rm_dir}/reward_model.pt")
        )

        # 创建PPO训练器
        ppo_trainer = PPOTrainer(
            policy_model=policy_model,
            ref_model=ref_model,
            reward_model=reward_model,
            tokenizer=tokenizer
        )

        # 训练
        ppo_trainer.train(
            prompts=train_prompts,
            num_epochs=num_epochs,
            num_ppo_epochs=num_ppo_epochs
        )

        # 保存
        policy_model.save_pretrained(self.ppo_model_dir)
        tokenizer.save_pretrained(self.ppo_model_dir)

        print(f" PPO完成,模型保存在 {self.ppo_model_dir}")

    def run_full_pipeline(
        self,
        demonstration_file: str,
        preference_file: str,
        train_prompts: List[str]
    ):
        """运行完整的RLHF流程"""
        print("开始完整RLHF流程...")

        # 阶段1: SFT
        self.step1_sft(demonstration_file)

        # 阶段2: 奖励模型
        self.step2_reward_model(preference_file)

        # 阶段3: PPO
        self.step3_ppo(train_prompts)

        print("\n" + "=" * 50)
        print(" RLHF流程全部完成!")
        print("=" * 50)
        print(f"最终模型保存在: {self.ppo_model_dir}")

# 使用示例
# pipeline = RLHFPipeline(
#     base_model_name="gpt2",
#     output_dir="./rlhf_output"
# )
#
# pipeline.run_full_pipeline(
#     demonstration_file="sft_data.jsonl",
#     preference_file="preference_data.jsonl",
#     train_prompts=["解释什么是AI?", "如何学习编程?"]
# )

13.3 奖励模型(Reward Model)

13.3.1 人类偏好数据

人类偏好数据的质量直接影响RLHF效果。

数据收集最佳实践

import time

class PreferenceAnnotationUI:
    """偏好标注界面(示意)"""

    def __init__(self):
        self.annotations = []

    def show_comparison(
        self,
        prompt: str,
        response_a: str,
        response_b: str
    ) -> dict:
        """
        展示对比并收集标注

        在实际应用中,这会是一个Web界面
        """
        print("=" * 80)
        print(f"提示词:{prompt}\n")
        print(f"回答A:\n{response_a}\n")
        print(f"回答B:\n{response_b}\n")
        print("=" * 80)

        # 模拟人类标注
        # 在实际中,这会等待标注员选择
        choice = input("选择更好的回答 (A/B/平局/跳过): ").strip().upper()

        if choice == 'A':
            preferred = 'a'
            margin = self._get_quality_rating("A比B好多少?(1-5): ")
        elif choice == 'B':
            preferred = 'b'
            margin = self._get_quality_rating("B比A好多少?(1-5): ")
        elif choice == '平局':
            preferred = 'tie'
            margin = 0
        else:
            return None  # 跳过

        annotation = {
            'prompt': prompt,
            'response_a': response_a,
            'response_b': response_b,
            'preferred': preferred,
            'margin': margin,
            'timestamp': time.time()
        }

        self.annotations.append(annotation)
        return annotation

    def _get_quality_rating(self, prompt: str) -> int:
        """获取质量差距评分"""
        while True:
            try:
                rating = int(input(prompt))
                if 1 <= rating <= 5:
                    return rating
            except ValueError:
                pass
            print("请输入1-5之间的数字")

    def export_annotations(self, output_file: str):
        """导出标注"""
        import json

        with open(output_file, 'w', encoding='utf-8') as f:
            for anno in self.annotations:
                # 转换为训练格式
                if anno['preferred'] != 'tie':
                    if anno['preferred'] == 'a':
                        chosen = anno['response_a']
                        rejected = anno['response_b']
                    else:
                        chosen = anno['response_b']
                        rejected = anno['response_a']

                    data = {
                        'prompt': anno['prompt'],
                        'chosen': chosen,
                        'rejected': rejected,
                        'margin': anno['margin']
                    }

                    f.write(json.dumps(data, ensure_ascii=False) + '\n')

        print(f"已导出 {len([a for a in self.annotations if a['preferred'] != 'tie'])} 条标注")

标注质量控制

from typing import Dict, List

import numpy as np

class AnnotationQualityControl:
    """标注质量控制"""

    def __init__(self):
        pass

    def check_inter_annotator_agreement(
        self,
        annotations_by_annotator: Dict[str, List[dict]]
    ) -> float:
        """
        检查标注员之间的一致性

        使用Cohen's Kappa或Fleiss' Kappa
        """
        from sklearn.metrics import cohen_kappa_score

        # 简化:假设两个标注员
        annotator_ids = list(annotations_by_annotator.keys())
        if len(annotator_ids) < 2:
            return 1.0

        annotator1, annotator2 = annotator_ids[:2]
        annos1 = annotations_by_annotator[annotator1]
        annos2 = annotations_by_annotator[annotator2]

        # 找到共同标注的样本
        common_samples = {}
        for anno in annos1:
            key = (anno['prompt'], anno['response_a'], anno['response_b'])
            common_samples[key] = [anno['preferred']]

        for anno in annos2:
            key = (anno['prompt'], anno['response_a'], anno['response_b'])
            if key in common_samples:
                common_samples[key].append(anno['preferred'])

        # 只保留两人都标注的
        common_samples = {k: v for k, v in common_samples.items() if len(v) == 2}

        if len(common_samples) == 0:
            return 0.0

        labels1 = [v[0] for v in common_samples.values()]
        labels2 = [v[1] for v in common_samples.values()]

        # 计算Kappa
        kappa = cohen_kappa_score(labels1, labels2)

        return kappa

    def detect_annotation_bias(
        self,
        annotations: List[dict]
    ) -> dict:
        """
        检测标注偏见

        例如:标注员是否总是偏好更长的回答
        """
        length_preference = []

        for anno in annotations:
            len_a = len(anno['response_a'])
            len_b = len(anno['response_b'])

            if anno['preferred'] == 'a':
                length_preference.append(1 if len_a > len_b else -1 if len_a < len_b else 0)
            elif anno['preferred'] == 'b':
                length_preference.append(1 if len_b > len_a else -1 if len_b < len_a else 0)

        avg_length_bias = np.mean(length_preference) if length_preference else 0

        return {
            'length_bias': avg_length_bias,
            'prefers_longer': avg_length_bias > 0.3,
            'prefers_shorter': avg_length_bias < -0.3
        }

# 使用示例
qc = AnnotationQualityControl()

# 模拟标注数据
annotations = [
    {'prompt': 'Q1', 'response_a': 'short', 'response_b': 'a very long response', 'preferred': 'b'},
    {'prompt': 'Q2', 'response_a': 'detailed answer', 'response_b': 'ok', 'preferred': 'a'},
]

bias = qc.detect_annotation_bias(annotations)
print(f"长度偏见: {bias}")

13.3.2 成对比较训练

成对比较(Pairwise Comparison)是训练奖励模型的核心方法。

Bradley-Terry模型详解

Bradley-Terry模型假设:选择响应A优于响应B的概率为:

$$ P(A > B) = \frac{e^{r(A)}}{e^{r(A)} + e^{r(B)}} = \sigma(r(A) - r(B)) $$

其中:

  • $r(A), r(B)$ 是奖励模型对A和B的评分
  • $\sigma$ 是sigmoid函数

损失函数推导

最大化似然:

$$ \mathcal{L} = -\log P(A > B) = -\log \sigma(r(A) - r(B)) $$

梯度:

$$ \frac{\partial \mathcal{L}}{\partial r(A)} = -\sigma(r(B) - r(A)) $$

$$ \frac{\partial \mathcal{L}}{\partial r(B)} = \sigma(r(B) - r(A)) $$

这意味着:

  • 当 $r(A) > r(B)$ 时,梯度较小(模型已经正确)
  • 当 $r(A) < r(B)$ 时,梯度较大(需要调整)
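
下面用几组假设的奖励分数做一个简单的数值验证(仅为示意,非正文实现):r(A) 超出 r(B) 越多,损失和梯度的幅度就越小。

import torch
import torch.nn.functional as F

def bt_loss(r_a: torch.Tensor, r_b: torch.Tensor) -> torch.Tensor:
    """Bradley-Terry损失:-log σ(r(A) - r(B)),其中A是人类偏好的回答"""
    return -F.logsigmoid(r_a - r_b)

for diff in [2.0, 0.0, -2.0]:
    r_a = torch.tensor(diff, requires_grad=True)
    r_b = torch.tensor(0.0)
    loss = bt_loss(r_a, r_b)
    loss.backward()
    # diff=+2.0:损失小、梯度小(模型已区分好坏);diff=-2.0:损失大、梯度大(需要调整)
    print(f"r(A)-r(B)={diff:+.1f}  loss={loss.item():.4f}  dL/dr(A)={r_a.grad.item():+.4f}")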

改进的训练策略

class ImprovedRewardTraining:
    """改进的奖励模型训练"""

    def __init__(self, reward_model, tokenizer):
        self.reward_model = reward_model
        self.tokenizer = tokenizer

    def compute_loss_with_margin(
        self,
        chosen_rewards: torch.Tensor,
        rejected_rewards: torch.Tensor,
        margins: torch.Tensor
    ) -> torch.Tensor:
        """
        考虑质量差距的损失函数

        当人类标注者认为chosen明显更好时,我们期望更大的奖励差距
        """
        # 基础损失
        base_loss = -F.logsigmoid(chosen_rewards - rejected_rewards)

        # 加入margin
        # margin越大,期望的奖励差距也越大
        margin_loss = F.relu(margins - (chosen_rewards - rejected_rewards))

        total_loss = base_loss + 0.1 * margin_loss

        return total_loss.mean()

    def compute_ranking_loss(
        self,
        rewards: torch.Tensor,
        rankings: torch.Tensor
    ) -> torch.Tensor:
        """
        排序损失(用于多个响应的比较)

        rankings: (batch_size, num_responses) 排名(1=最好)
        rewards: (batch_size, num_responses) 预测的奖励
        """
        batch_size, num_responses = rewards.shape

        # 对于每对响应,确保排名靠前的奖励更高
        loss = 0
        count = 0

        for i in range(num_responses):
            for j in range(i + 1, num_responses):
                # 如果i的排名优于j
                better = rankings[:, i] < rankings[:, j]

                # 期望rewards[i] > rewards[j]
                pair_loss = -F.logsigmoid(rewards[:, i] - rewards[:, j])

                # 只对排名确实不同的计算损失
                loss += (pair_loss * better.float()).sum()
                count += better.sum()

        return loss / (count + 1e-8)

    def calibrate_rewards(
        self,
        rewards: torch.Tensor,
        target_mean: float = 0.0,
        target_std: float = 1.0
    ) -> torch.Tensor:
        """
        校准奖励分数

        确保奖励分布稳定
        """
        current_mean = rewards.mean()
        current_std = rewards.std()

        calibrated = (rewards - current_mean) / (current_std + 1e-8)
        calibrated = calibrated * target_std + target_mean

        return calibrated

# 使用示例
improved_trainer = ImprovedRewardTraining(None, None)

# 示例数据
chosen_rewards = torch.tensor([1.5, 2.0, 1.8])
rejected_rewards = torch.tensor([0.5, 0.8, 1.2])
margins = torch.tensor([2.0, 3.0, 1.0])  # 人类标注的质量差距

loss = improved_trainer.compute_loss_with_margin(
    chosen_rewards, rejected_rewards, margins
)
print(f"Loss with margin: {loss.item():.4f}")

13.3.3 Bradley-Terry模型

完整的Bradley-Terry实现

class BradleyTerryRewardModel(nn.Module):
    """基于Bradley-Terry模型的奖励模型"""

    def __init__(
        self,
        base_model_name: str,
        normalize_rewards: bool = True
    ):
        super().__init__()

        self.base_model = AutoModel.from_pretrained(base_model_name)
        hidden_size = self.base_model.config.hidden_size

        # 奖励头
        self.reward_head = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_size, 1)
        )

        self.normalize_rewards = normalize_rewards

        # 用于归一化的运行统计
        self.register_buffer('reward_mean', torch.tensor(0.0))
        self.register_buffer('reward_std', torch.tensor(1.0))
        self.register_buffer('num_samples', torch.tensor(0))

    def forward(self, input_ids, attention_mask):
        """前向传播"""
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )

        # 使用[CLS] token或最后一个token
        if hasattr(self.base_model.config, 'is_encoder_decoder') and self.base_model.config.is_encoder_decoder:
            hidden = outputs.last_hidden_state[:, 0]  # [CLS]
        else:
            # 找到最后一个非padding token
            sequence_lengths = attention_mask.sum(dim=1) - 1
            batch_size = input_ids.shape[0]
            hidden = outputs.last_hidden_state[
                torch.arange(batch_size, device=input_ids.device),
                sequence_lengths
            ]

        # 计算奖励
        rewards = self.reward_head(hidden).squeeze(-1)

        # 归一化
        if self.normalize_rewards and self.training:
            self._update_stats(rewards)
            rewards = (rewards - self.reward_mean) / (self.reward_std + 1e-8)

        return rewards

    def _update_stats(self, rewards):
        """更新运行统计"""
        with torch.no_grad():
            batch_mean = rewards.mean()
            batch_std = rewards.std()

            # 指数移动平均
            alpha = 0.01
            self.reward_mean = (1 - alpha) * self.reward_mean + alpha * batch_mean
            self.reward_std = (1 - alpha) * self.reward_std + alpha * batch_std

    def compare(
        self,
        input_ids_a: torch.Tensor,
        attention_mask_a: torch.Tensor,
        input_ids_b: torch.Tensor,
        attention_mask_b: torch.Tensor
    ) -> torch.Tensor:
        """
        比较两个响应

        返回: P(A > B)
        """
        reward_a = self.forward(input_ids_a, attention_mask_a)
        reward_b = self.forward(input_ids_b, attention_mask_b)

        prob_a_better = torch.sigmoid(reward_a - reward_b)

        return prob_a_better

# 使用示例
bt_model = BradleyTerryRewardModel("bert-base-uncased")

# 模拟输入
input_ids_a = torch.randint(0, 1000, (2, 10))
attention_mask_a = torch.ones(2, 10, dtype=torch.long)
input_ids_b = torch.randint(0, 1000, (2, 10))
attention_mask_b = torch.ones(2, 10, dtype=torch.long)

prob = bt_model.compare(input_ids_a, attention_mask_a, input_ids_b, attention_mask_b)
print(f"P(A > B) = {prob}")

13.4 PPO算法

13.4.1 策略梯度

策略梯度是强化学习的基础。

基本推导

目标:最大化期望回报

$$ J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}[R(\tau)] $$

其中 $\tau$ 是轨迹,$R(\tau)$ 是累积奖励。

策略梯度定理:

$$ \nabla_\theta J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\left[\sum_{t=0}^{T} \nabla_\theta \log \pi_\theta(a_t | s_t) R(\tau)\right] $$

减少方差的技巧

  1. Baseline:减去一个基线函数

$$ \nabla_\theta J(\theta) = \mathbb{E}\left[\nabla_\theta \log \pi_\theta(a|s) (Q(s,a) - V(s))\right] $$

  2. 优势函数:$A(s,a) = Q(s,a) - V(s)$

  3. GAE(Generalized Advantage Estimation):

$$ \hat{A}_t = \sum_{l=0}^{\infty} (\gamma\lambda)^l \delta_{t+l} $$

其中 $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$ 是TD误差。

代码实现

class PolicyGradient:
    """策略梯度实现"""

    def __init__(self, policy, value_function, gamma=0.99, lam=0.95):
        self.policy = policy
        self.value_function = value_function
        self.gamma = gamma
        self.lam = lam

    def compute_gae(
        self,
        rewards: np.ndarray,
        values: np.ndarray,
        dones: np.ndarray
    ) -> np.ndarray:
        """
        计算GAE

        Args:
            rewards: (T,) 每步的奖励
            values: (T,) 价值函数估计
            dones: (T,) 是否结束

        Returns:
            advantages: (T,) 优势函数
        """
        T = len(rewards)
        advantages = np.zeros(T)
        gae = 0

        for t in reversed(range(T)):
            if t == T - 1:
                next_value = 0
            else:
                next_value = values[t + 1]

            # TD误差
            delta = rewards[t] + self.gamma * next_value * (1 - dones[t]) - values[t]

            # GAE
            gae = delta + self.gamma * self.lam * (1 - dones[t]) * gae
            advantages[t] = gae

        return advantages

    def policy_gradient_loss(
        self,
        states: torch.Tensor,
        actions: torch.Tensor,
        advantages: torch.Tensor
    ) -> torch.Tensor:
        """
        计算策略梯度损失

        L = -E[log π(a|s) * A(s,a)]
        """
        log_probs = self.policy.log_prob(states, actions)
        loss = -(log_probs * advantages).mean()

        return loss

# 示例
pg = PolicyGradient(None, None)

rewards = np.array([1.0, 0.5, 0.0, 2.0, 1.5])
values = np.array([0.8, 0.6, 0.2, 1.8, 1.2])
dones = np.array([0, 0, 0, 0, 1])

advantages = pg.compute_gae(rewards, values, dones)
print(f"Advantages: {advantages}")

13.4.2 PPO目标函数

PPO通过限制策略更新的幅度来提高训练稳定性。

Clipped Objective

$$ L^{CLIP}(\theta) = \mathbb{E}_t\left[\min\left(r_t(\theta)\hat{A}_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon)\hat{A}_t\right)\right] $$

其中:

  • $r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{old}}(a_t|s_t)}$ 是重要性采样比率
  • $\epsilon$ 是裁剪范围(通常0.1-0.2)
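
在看完整实现之前,先用一组具体数值感受裁剪的作用(数值为假设,仅作演示):

import torch

ratio = torch.tensor(1.6)       # 重要性采样比率 π_new / π_old,偏离1较多
advantage = torch.tensor(2.0)   # 优势为正
epsilon = 0.2

surr1 = ratio * advantage                                         # 3.2:未裁剪目标
surr2 = torch.clamp(ratio, 1 - epsilon, 1 + epsilon) * advantage  # 1.2 * 2.0 = 2.4:裁剪后目标
objective = torch.min(surr1, surr2)                               # 取2.4:限制单步更新幅度
print(objective.item())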

详细实现

class PPOLoss:
    """PPO损失函数"""

    def __init__(
        self,
        clip_ratio: float = 0.2,
        value_coef: float = 0.5,
        entropy_coef: float = 0.01
    ):
        self.clip_ratio = clip_ratio
        self.value_coef = value_coef
        self.entropy_coef = entropy_coef

    def compute_policy_loss(
        self,
        old_log_probs: torch.Tensor,
        new_log_probs: torch.Tensor,
        advantages: torch.Tensor
    ) -> Tuple[torch.Tensor, dict]:
        """
        计算策略损失

        返回: (loss, metrics)
        """
        # 重要性采样比率
        ratio = torch.exp(new_log_probs - old_log_probs)

        # 未裁剪的目标
        surr1 = ratio * advantages

        # 裁剪的目标
        ratio_clipped = torch.clamp(
            ratio,
            1 - self.clip_ratio,
            1 + self.clip_ratio
        )
        surr2 = ratio_clipped * advantages

        # 取最小值
        policy_loss = -torch.min(surr1, surr2).mean()

        # 统计信息
        with torch.no_grad():
            # 裁剪比例
            clip_fraction = (torch.abs(ratio - 1) > self.clip_ratio).float().mean()

            # 近似KL
            approx_kl = ((ratio - 1) - torch.log(ratio)).mean()

        metrics = {
            'policy_loss': policy_loss.item(),
            'clip_fraction': clip_fraction.item(),
            'approx_kl': approx_kl.item(),
            'ratio_mean': ratio.mean().item(),
            'ratio_std': ratio.std().item()
        }

        return policy_loss, metrics

    def compute_value_loss(
        self,
        values: torch.Tensor,
        returns: torch.Tensor,
        old_values: torch.Tensor
    ) -> torch.Tensor:
        """
        计算价值函数损失(也可以裁剪)
        """
        # 基础MSE损失
        value_loss_unclipped = (values - returns) ** 2

        # 裁剪的价值损失
        values_clipped = old_values + torch.clamp(
            values - old_values,
            -self.clip_ratio,
            self.clip_ratio
        )
        value_loss_clipped = (values_clipped - returns) ** 2

        # 取最大值(更保守)
        value_loss = torch.max(value_loss_unclipped, value_loss_clipped).mean()

        return value_loss * self.value_coef

    def compute_entropy_bonus(
        self,
        log_probs: torch.Tensor,
        probs: torch.Tensor
    ) -> torch.Tensor:
        """
        计算熵奖励(鼓励探索)

        H = -E[log π(a|s)]
        """
        entropy = -(probs * log_probs).sum(dim=-1).mean()
        return entropy * self.entropy_coef

    def total_loss(
        self,
        old_log_probs: torch.Tensor,
        new_log_probs: torch.Tensor,
        advantages: torch.Tensor,
        values: torch.Tensor,
        returns: torch.Tensor,
        old_values: torch.Tensor,
        action_probs: torch.Tensor = None
    ) -> Tuple[torch.Tensor, dict]:
        """
        计算总损失

        L = L_policy + c1 * L_value - c2 * H
        """
        # 策略损失
        policy_loss, metrics = self.compute_policy_loss(
            old_log_probs, new_log_probs, advantages
        )

        # 价值损失
        value_loss = self.compute_value_loss(values, returns, old_values)

        # 熵奖励
        entropy_bonus = 0
        if action_probs is not None:
            # 熵需要在完整动作分布上计算,而不是只用所选动作的log概率
            entropy_bonus = self.compute_entropy_bonus(
                torch.log(action_probs + 1e-8), action_probs
            )

        # 总损失
        total_loss = policy_loss + value_loss - entropy_bonus

        metrics.update({
            'value_loss': value_loss.item(),
            'entropy': entropy_bonus.item() if isinstance(entropy_bonus, torch.Tensor) else entropy_bonus,
            'total_loss': total_loss.item()
        })

        return total_loss, metrics

# 使用示例
ppo_loss = PPOLoss(clip_ratio=0.2)

# 模拟数据
old_log_probs = torch.randn(100)
new_log_probs = old_log_probs + torch.randn(100) * 0.1
advantages = torch.randn(100)
values = torch.randn(100)
returns = values + torch.randn(100) * 0.5
old_values = values + torch.randn(100) * 0.1

loss, metrics = ppo_loss.total_loss(
    old_log_probs, new_log_probs, advantages,
    values, returns, old_values
)

print(f"总损失: {loss.item():.4f}")
print(f"指标: {metrics}")

13.4.3 KL散度约束

KL散度约束防止新策略偏离参考策略太远。

数学推导

目标:

$$ \max_\theta \mathbb{E}_{s,a \sim \pi_\theta}[r(s,a)] - \beta \cdot D_{KL}(\pi_\theta || \pi_{ref}) $$

其中:

  • $r(s,a)$ 是奖励模型给出的奖励
  • $D_{KL}$ 是KL散度
  • $\beta$ 是KL惩罚系数

KL散度计算

对于离散动作:

$$ D_{KL}(\pi || \pi_{ref}) = \mathbb{E}_{a \sim \pi}\left[\log \frac{\pi(a|s)}{\pi_{ref}(a|s)}\right] $$

对于语言模型(每个token是一个动作):

$$ D_{KL} = \sum_{t} \mathbb{E}_{w_t \sim \pi_\theta}\left[\log \pi_\theta(w_t|s_t) - \log \pi_{ref}(w_t|s_t)\right] $$
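
在RLHF的具体实现中,这个逐token的KL项通常被直接折算进每个token的奖励:每个位置扣除 $\beta(\log \pi_\theta - \log \pi_{ref})$,奖励模型的得分只加在响应的最后一个token上。下面是一个简化示意(函数名和张量形状均为演示假设,并非某个库的现成API):

import torch

def compute_token_rewards(
    rm_score: torch.Tensor,         # (batch,) 奖励模型对完整响应的打分
    policy_logprobs: torch.Tensor,  # (batch, resp_len) 策略模型各响应token的log概率
    ref_logprobs: torch.Tensor,     # (batch, resp_len) 参考模型各响应token的log概率
    kl_coef: float = 0.2
) -> torch.Tensor:
    """把KL惩罚折算进逐token奖励:中间位置只有KL惩罚,末尾位置再加上RM得分"""
    kl_per_token = policy_logprobs - ref_logprobs  # 逐token的KL近似
    rewards = -kl_coef * kl_per_token              # 每个位置的KL惩罚
    rewards[:, -1] += rm_score                     # RM得分加在序列末尾
    return rewards

# 示例
rm_score = torch.tensor([1.5, -0.3])
policy_lp = torch.randn(2, 8) * 0.1 - 2.0
ref_lp = torch.randn(2, 8) * 0.1 - 2.0
print(compute_token_rewards(rm_score, policy_lp, ref_lp))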

自适应KL惩罚

class AdaptiveKLController:
    """自适应KL惩罚系数"""

    def __init__(
        self,
        init_kl_coef: float = 0.2,
        target_kl: float = 0.01,
        horizon: int = 10000
    ):
        self.kl_coef = init_kl_coef
        self.target_kl = target_kl
        self.horizon = horizon  # 本简化实现未用到该参数,仅作占位

    def update(self, current_kl: float):
        """
        根据当前KL调整系数

        如果KL太大,增加惩罚;如果太小,减少惩罚
        """
        # 简单的比例控制
        proportional_error = current_kl - self.target_kl
        mult = 1 + proportional_error / self.target_kl

        # 限制调整幅度
        mult = max(0.5, min(2.0, mult))

        self.kl_coef *= mult

        # 限制范围
        self.kl_coef = max(0.01, min(10.0, self.kl_coef))

        return self.kl_coef

    def get_kl_penalty(
        self,
        log_probs: torch.Tensor,
        ref_log_probs: torch.Tensor
    ) -> torch.Tensor:
        """
        计算KL惩罚

        KL = E[log π - log π_ref]
        """
        kl = log_probs - ref_log_probs
        return self.kl_coef * kl.mean()

# 使用示例
kl_controller = AdaptiveKLController(init_kl_coef=0.2, target_kl=0.01)

# 模拟训练过程
for step in range(10):
    # 模拟当前KL
    current_kl = 0.02 + np.random.randn() * 0.005

    # 更新系数
    new_coef = kl_controller.update(current_kl)

    print(f"Step {step}: KL={current_kl:.4f}, Coef={new_coef:.4f}")

13.5 DPO(Direct Preference Optimization)

DPO是RLHF流程的简化:直接在偏好数据上优化策略,既不需要训练单独的奖励模型,也不需要用PPO做强化学习。

13.5.1 无需奖励模型

DPO的核心洞察:可以直接将Bradley-Terry模型重新参数化为策略优化问题。

数学推导

传统RLHF:

  1. 训练奖励模型 $r_\phi(x, y)$
  2. 用RL优化 $\max_\pi \mathbb{E}[r_\phi(x,y)] - \beta D_{KL}(\pi || \pi_{ref})$

DPO推导:

最优策略的闭式解:

$$ \pi^*(y|x) = \frac{1}{Z(x)} \pi_{ref}(y|x) \exp\left(\frac{r(x,y)}{\beta}\right) $$

将奖励表示为策略的函数:

$$ r(x,y) = \beta \log \frac{\pi^*(y|x)}{\pi_{ref}(y|x)} + \beta \log Z(x) $$

代入Bradley-Terry模型:

$$ P(y_w \succ y_l | x) = \sigma\left(\beta \log \frac{\pi_\theta(y_w|x)}{\pi_{ref}(y_w|x)} - \beta \log \frac{\pi_\theta(y_l|x)}{\pi_{ref}(y_l|x)}\right) $$

损失函数:

$$ \mathcal{L}_{DPO}(\pi_\theta) = -\mathbb{E}\left[\log \sigma\left(\beta \log \frac{\pi_\theta(y_w|x)}{\pi_{ref}(y_w|x)} - \beta \log \frac{\pi_\theta(y_l|x)}{\pi_{ref}(y_l|x)}\right)\right] $$

13.5.2 直接优化偏好

DPO实现

class DPOTrainer:
    """DPO训练器"""

    def __init__(
        self,
        model: nn.Module,
        ref_model: nn.Module,
        tokenizer,
        beta: float = 0.1,
        device: str = "cuda"
    ):
        self.model = model.to(device)
        self.ref_model = ref_model.to(device)
        self.tokenizer = tokenizer
        self.beta = beta
        self.device = device

        # 冻结参考模型
        for param in self.ref_model.parameters():
            param.requires_grad = False

    def compute_log_probs(
        self,
        model: nn.Module,
        input_ids: torch.Tensor,
        attention_mask: torch.Tensor,
        labels: torch.Tensor
    ) -> torch.Tensor:
        """
        计算序列的log概率

        Args:
            input_ids: (batch_size, seq_len)
            labels: (batch_size, seq_len) 与input_ids相同,但prompt部分是-100

        Returns:
            log_probs: (batch_size,) 每个序列的总log概率
        """
        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        logits = outputs.logits  # (batch_size, seq_len, vocab_size)

        # 计算log概率
        log_probs = F.log_softmax(logits, dim=-1)

        # 提取对应token的log概率
        # labels: -100表示不计算loss的位置(prompt部分)
        batch_size, seq_len = input_ids.shape

        # Shift:预测下一个token
        log_probs = log_probs[:, :-1, :]  # (batch_size, seq_len-1, vocab_size)
        labels = labels[:, 1:]  # (batch_size, seq_len-1)

        # 收集每个位置的log概率
        # labels中的-100(prompt部分)若直接用于gather会索引越界,先替换为0,再用mask排除
        mask = (labels != -100)
        safe_labels = labels.masked_fill(~mask, 0)
        token_log_probs = log_probs.gather(
            dim=-1,
            index=safe_labels.unsqueeze(-1)
        ).squeeze(-1)  # (batch_size, seq_len-1)

        # 只在非-100位置求和
        sequence_log_probs = (token_log_probs * mask.float()).sum(dim=-1)  # (batch_size,)

        return sequence_log_probs

    def dpo_loss(
        self,
        policy_chosen_logprobs: torch.Tensor,
        policy_rejected_logprobs: torch.Tensor,
        reference_chosen_logprobs: torch.Tensor,
        reference_rejected_logprobs: torch.Tensor
    ) -> Tuple[torch.Tensor, dict]:
        """
        DPO损失函数

        L = -E[log σ(β * (log π/π_ref(y_w) - log π/π_ref(y_l)))]
        """
        # 计算log比率
        policy_log_ratios = policy_chosen_logprobs - policy_rejected_logprobs
        reference_log_ratios = reference_chosen_logprobs - reference_rejected_logprobs

        # DPO目标
        logits = self.beta * (policy_log_ratios - reference_log_ratios)

        # 损失
        loss = -F.logsigmoid(logits).mean()

        # 统计信息
        with torch.no_grad():
            # 隐式奖励
            rewards_chosen = self.beta * (policy_chosen_logprobs - reference_chosen_logprobs)
            rewards_rejected = self.beta * (policy_rejected_logprobs - reference_rejected_logprobs)
            reward_margin = (rewards_chosen - rewards_rejected).mean()

            # 准确率
            accuracy = (logits > 0).float().mean()

        metrics = {
            'loss': loss.item(),
            'reward_margin': reward_margin.item(),
            'accuracy': accuracy.item(),
            'rewards_chosen': rewards_chosen.mean().item(),
            'rewards_rejected': rewards_rejected.mean().item()
        }

        return loss, metrics

    def train_step(
        self,
        chosen_input_ids: torch.Tensor,
        chosen_attention_mask: torch.Tensor,
        chosen_labels: torch.Tensor,
        rejected_input_ids: torch.Tensor,
        rejected_attention_mask: torch.Tensor,
        rejected_labels: torch.Tensor
    ) -> Tuple[torch.Tensor, dict]:
        """训练一步"""

        # 策略模型的log概率
        policy_chosen_logprobs = self.compute_log_probs(
            self.model, chosen_input_ids, chosen_attention_mask, chosen_labels
        )
        policy_rejected_logprobs = self.compute_log_probs(
            self.model, rejected_input_ids, rejected_attention_mask, rejected_labels
        )

        # 参考模型的log概率
        with torch.no_grad():
            reference_chosen_logprobs = self.compute_log_probs(
                self.ref_model, chosen_input_ids, chosen_attention_mask, chosen_labels
            )
            reference_rejected_logprobs = self.compute_log_probs(
                self.ref_model, rejected_input_ids, rejected_attention_mask, rejected_labels
            )

        # 计算损失
        loss, metrics = self.dpo_loss(
            policy_chosen_logprobs,
            policy_rejected_logprobs,
            reference_chosen_logprobs,
            reference_rejected_logprobs
        )

        return loss, metrics

def train_dpo_model(
    model_name: str,
    preference_file: str,
    output_dir: str = "./dpo_model",
    num_epochs: int = 3,
    batch_size: int = 4,
    learning_rate: float = 1e-6,
    beta: float = 0.1
):
    """DPO训练主函数"""
    from transformers import AutoModelForCausalLM, AutoTokenizer
    from torch.utils.data import Dataset, DataLoader
    import json

    # 加载模型
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # 策略模型保持fp32训练(纯fp16权重配合普通Adam容易数值不稳定)
    model = AutoModelForCausalLM.from_pretrained(model_name)
    # 参考模型只做前向推理,可以用fp16节省显存
    ref_model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.float16
    )

    # 创建训练器
    trainer = DPOTrainer(model, ref_model, tokenizer, beta=beta)

    # 数据集(简化版)
    class DPODataset(Dataset):
        def __init__(self, data_file):
            self.data = []
            with open(data_file, 'r') as f:
                for line in f:
                    self.data.append(json.loads(line))

        def __len__(self):
            return len(self.data)

        def __getitem__(self, idx):
            return self.data[idx]

    dataset = DPODataset(preference_file)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # 优化器
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # 训练循环
    model.train()

    for epoch in range(num_epochs):
        total_loss = 0

        for batch in dataloader:
            prompts = batch['prompt']
            chosen = batch['chosen']
            rejected = batch['rejected']

            # Tokenize chosen
            chosen_texts = [p + c for p, c in zip(prompts, chosen)]
            chosen_enc = tokenizer(
                chosen_texts,
                padding=True,
                truncation=True,
                return_tensors="pt"
            )
            # 创建labels
            chosen_labels = chosen_enc['input_ids'].clone()
            # 这里简化了,实际需要mask掉prompt部分

            # Tokenize rejected
            rejected_texts = [p + r for p, r in zip(prompts, rejected)]
            rejected_enc = tokenizer(
                rejected_texts,
                padding=True,
                truncation=True,
                return_tensors="pt"
            )
            rejected_labels = rejected_enc['input_ids'].clone()

            # 训练步
            loss, metrics = trainer.train_step(
                chosen_enc['input_ids'].to(trainer.device),
                chosen_enc['attention_mask'].to(trainer.device),
                chosen_labels.to(trainer.device),
                rejected_enc['input_ids'].to(trainer.device),
                rejected_enc['attention_mask'].to(trainer.device),
                rejected_labels.to(trainer.device)
            )

            # 反向传播
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(dataloader)
        print(f"Epoch {epoch+1}: Loss={avg_loss:.4f}")

    # 保存
    model.save_pretrained(output_dir)
    tokenizer.save_pretrained(output_dir)
    print(f"模型已保存到 {output_dir}")

# 使用示例(注释掉)
# train_dpo_model(
#     model_name="gpt2",
#     preference_file="preference_data.jsonl",
#     output_dir="./dpo_model"
# )
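
上面用到的 preference_data.jsonl 在本节没有给出样例。从训练代码读取的字段(prompt、chosen、rejected)可以看出,文件中每行是一个JSON对象;下面的脚本生成一个最小示例文件(其中的具体文本内容仅作演示):

import json

samples = [
    {
        "prompt": "用一句话解释什么是过拟合?",
        "chosen": "过拟合是指模型过度拟合训练数据中的噪声,在训练集上表现很好,但在新数据上泛化能力差。",
        "rejected": "过拟合就是模型训练得太好了。"
    }
]

with open("preference_data.jsonl", "w", encoding="utf-8") as f:
    for sample in samples:
        f.write(json.dumps(sample, ensure_ascii=False) + "\n")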

13.5.3 代码实现

完整的DPO训练脚本

#!/usr/bin/env python3
"""
DPO (Direct Preference Optimization) 完整训练脚本
"""

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, Trainer, TrainingArguments
from datasets import load_dataset
from typing import Dict, List
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class DPODataCollator:
    """DPO数据整理器"""

    def __init__(self, tokenizer, max_length=512):
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __call__(self, features: List[Dict]) -> Dict[str, torch.Tensor]:
        """
        整理一个batch的数据

        每个feature包含:
        - prompt
        - chosen
        - rejected
        """
        batch = {
            'chosen_input_ids': [],
            'chosen_attention_mask': [],
            'chosen_labels': [],
            'rejected_input_ids': [],
            'rejected_attention_mask': [],
            'rejected_labels': []
        }

        for feature in features:
            prompt = feature['prompt']
            chosen = feature['chosen']
            rejected = feature['rejected']

            # Tokenize chosen
            chosen_text = prompt + chosen
            chosen_enc = self.tokenizer(
                chosen_text,
                max_length=self.max_length,
                truncation=True,
                padding='max_length',
                return_tensors='pt'
            )

            # 创建labels(mask掉prompt部分和padding部分)
            prompt_len = len(self.tokenizer(prompt, add_special_tokens=False)['input_ids'])
            chosen_labels = chosen_enc['input_ids'].clone()
            chosen_labels[0, :prompt_len] = -100  # 忽略prompt部分
            chosen_labels[chosen_enc['attention_mask'] == 0] = -100  # 忽略padding部分

            batch['chosen_input_ids'].append(chosen_enc['input_ids'])
            batch['chosen_attention_mask'].append(chosen_enc['attention_mask'])
            batch['chosen_labels'].append(chosen_labels)

            # Tokenize rejected
            rejected_text = prompt + rejected
            rejected_enc = self.tokenizer(
                rejected_text,
                max_length=self.max_length,
                truncation=True,
                padding='max_length',
                return_tensors='pt'
            )

            rejected_labels = rejected_enc['input_ids'].clone()
            rejected_labels[0, :prompt_len] = -100  # 忽略prompt部分
            rejected_labels[rejected_enc['attention_mask'] == 0] = -100  # 忽略padding部分

            batch['rejected_input_ids'].append(rejected_enc['input_ids'])
            batch['rejected_attention_mask'].append(rejected_enc['attention_mask'])
            batch['rejected_labels'].append(rejected_labels)

        # Stack
        for key in batch:
            batch[key] = torch.cat(batch[key], dim=0)

        return batch

def main():
    # 配置
    model_name = "gpt2"
    data_file = "preference_data.jsonl"
    output_dir = "./dpo_output"

    # 加载
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    model = AutoModelForCausalLM.from_pretrained(model_name)
    ref_model = AutoModelForCausalLM.from_pretrained(model_name)

    # 加载数据
    dataset = load_dataset('json', data_files={'train': data_file})

    # 创建训练器
    trainer = DPOTrainer(
        model=model,
        ref_model=ref_model,
        tokenizer=tokenizer,
        beta=0.1
    )

    # 训练参数
    training_args = TrainingArguments(
        output_dir=output_dir,
        num_train_epochs=3,
        per_device_train_batch_size=4,
        learning_rate=1e-6,
        logging_steps=10,
        save_steps=500,
        fp16=True
    )

    # 开始训练
    logger.info("开始DPO训练...")
    # 实际训练代码会更复杂,这里简化

    logger.info(f"训练完成,模型保存在 {output_dir}")

if __name__ == "__main__":
    main()

13.6 实战工具

13.6.1 TRL(Transformer Reinforcement Learning)

Hugging Face的TRL库提供了完整的RLHF工具链。

pip install trl

使用TRL进行PPO训练

from trl import PPOTrainer, PPOConfig, AutoModelForCausalLMWithValueHead
from trl.core import LengthSampler
from transformers import AutoTokenizer
import torch

# 配置
config = PPOConfig(
    model_name="gpt2",
    learning_rate=1.41e-5,
    batch_size=128,
    mini_batch_size=128,
    gradient_accumulation_steps=1,
    optimize_cuda_cache=True,
)

# 加载模型(带价值头)
model = AutoModelForCausalLMWithValueHead.from_pretrained(config.model_name)
ref_model = AutoModelForCausalLMWithValueHead.from_pretrained(config.model_name)
tokenizer = AutoTokenizer.from_pretrained(config.model_name)
tokenizer.pad_token = tokenizer.eos_token

# 创建PPO训练器
ppo_trainer = PPOTrainer(
    config=config,
    model=model,
    ref_model=ref_model,
    tokenizer=tokenizer
)

# 训练循环示例
# for epoch in range(num_epochs):
#     for batch in dataloader:
#         query_tensors = batch['input_ids']
#
#         # 生成响应
#         response_tensors = ppo_trainer.generate(
#             query_tensors,
#             max_new_tokens=50,
#             do_sample=True,
#             top_k=50,
#             top_p=0.95
#         )
#
#         # 计算奖励
#         texts = [tokenizer.decode(r.squeeze()) for r in response_tensors]
#         rewards = [compute_reward(text) for text in texts]
#
#         # PPO更新
#         stats = ppo_trainer.step(query_tensors, response_tensors, rewards)

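上面训练循环中的 compute_reward 没有给出实现。下面是一个假设性的示例:用现成的情感分类模型给生成文本打分作为标量奖励(模型选择与打分方式都只是演示,实际应替换为训练好的奖励模型):

import torch
from transformers import pipeline

# 用情感分类器的"正面"概率作为奖励,仅作演示
sentiment_pipe = pipeline(
    "sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english"
)

def compute_reward(text: str) -> torch.Tensor:
    result = sentiment_pipe(text[:512])[0]  # 粗略截断,避免输入过长
    score = result["score"] if result["label"] == "POSITIVE" else 1 - result["score"]
    return torch.tensor(score)  # 与上面注释示例一致,每条样本返回一个标量张量
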
13.6.2 DeepSpeed-Chat

微软的DeepSpeed-Chat提供了端到端的RLHF训练系统。

pip install deepspeed

使用DeepSpeed-Chat

# DeepSpeed配置(以Python字典表示,可用json.dump保存为ds_config.json)
ds_config = {
    "train_batch_size": 64,
    "gradient_accumulation_steps": 1,
    "optimizer": {
        "type": "Adam",
        "params": {
            "lr": 1e-5
        }
    },
    "fp16": {
        "enabled": True
    },
    "zero_optimization": {
        "stage": 3
    }
}

# 训练脚本
# deepspeed --num_gpus=8 train_rlhf.py \
#     --model_name gpt2 \
#     --train_file data.jsonl \
#     --output_dir output

13.6.3 OpenRLHF

OpenRLHF是开源的RLHF框架。

git clone https://github.com/OpenLLMAI/OpenRLHF
cd OpenRLHF
pip install -e .

使用示例

from openrlhf import RLHFTrainer

trainer = RLHFTrainer(
    model="gpt2",
    reward_model="reward_model_path",
    dataset="preference_data.jsonl"
)

trainer.train()