第13章:RLHF与对齐技术
大语言模型通过预训练和微调可以获得强大的能力,但如何确保这些能力与人类的价值观和意图保持一致?如何让模型生成有帮助、无害、诚实的内容?本章将深入探讨RLHF(Reinforcement Learning from Human Feedback,基于人类反馈的强化学习)和对齐技术,这是让AI系统安全可靠的关键。
13.1 对齐问题
13.1.1 为什么需要对齐
预训练模型虽然强大,但存在严重的对齐问题:
1. 无法理解和遵循指令
预训练模型只是学会了预测下一个词,并不理解人类指令的意图。例如:
# 预训练模型的行为
prompt = "写一个Python函数来计算斐波那契数列"
pretrained_output = "写一个Python函数来计算斐波那契数列的第n项可以用递归实现..."
# 模型只是继续补全文本,而不是真正执行指令
# 期望的对齐行为
aligned_output = """
def fibonacci(n):
if n <= 1:
return n
return fibonacci(n-1) + fibonacci(n-2)
"""
# 模型理解指令并执行
2. 生成有害或不准确的内容
预训练数据来自互联网,包含大量有害、偏见、虚假信息:
- 生成仇恨言论、暴力内容
- 传播错误的医疗建议
- 提供危险活动的指导
- 泄露隐私信息
3. 缺乏价值观判断
模型不理解什么是道德、伦理、安全的:
# 危险的响应示例
query = "如何制造炸弹?"
unaligned_response = "制造炸弹需要以下材料和步骤..." # 危险!
aligned_response = "我不能提供制造爆炸物的信息。这种行为是非法且危险的。如果你有安全顾虑,请联系当地执法部门。"
4. 目标错位(Misalignment)
模型优化的目标(预测下一个词)与我们真正的目标(帮助人类)不一致:
$$ \text{预训练目标}: \max_\theta \mathbb{E}_{x \sim D}[\log P_\theta(x)] $$
$$ \text{真实目标}: \max_\theta \mathbb{E}_{x, y}[\text{Utility}(y | x)] $$
其中 $\text{Utility}(y | x)$ 衡量回答 $y$ 对问题 $x$ 的有用性、安全性、准确性。
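下面用一小段示意代码对比这两个目标的形式(假设性示例:logits、token与奖励值均为随机构造,仅用于说明目标函数的差异,不代表任何真实训练流程):预训练只最大化语料的对数似然,而对齐目标需要用效用/奖励信号对模型输出加权。
import torch
import torch.nn.functional as F

vocab_size = 8
logits = torch.randn(1, 5, vocab_size, requires_grad=True)  # 模型输出 (batch, seq, vocab)
tokens = torch.randint(0, vocab_size, (1, 5))               # 语料/回答中的token

# 预训练目标:最大化语料对数似然,等价于最小化交叉熵
pretrain_loss = F.cross_entropy(logits.view(-1, vocab_size), tokens.view(-1))

# 对齐目标(示意):用效用/奖励r对输出序列的对数概率加权
log_probs = F.log_softmax(logits, dim=-1).gather(-1, tokens.unsqueeze(-1)).squeeze(-1)
reward = torch.tensor(0.7)  # 假设由人类反馈或奖励模型给出
align_loss = -(reward * log_probs.sum())

print(f"预训练损失: {pretrain_loss.item():.4f}, 对齐损失(示意): {align_loss.item():.4f}")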
13.1.2 Helpful、Harmless、Honest (3H)
Anthropic提出的3H原则是AI对齐的核心目标:
1. Helpful(有帮助的)
模型应该:
- 理解用户意图
- 提供准确、有用的信息
- 以用户能理解的方式回答
- 在不确定时询问澄清
class HelpfulnessEvaluator:
"""评估回答的有帮助程度"""
def evaluate(self, query: str, response: str) -> float:
"""
评估标准:
1. 相关性:回答是否直接回应问题
2. 完整性:是否提供了充分的信息
3. 清晰性:表达是否清楚易懂
4. 可操作性:是否提供了具体的指导
"""
scores = {
'relevance': self._check_relevance(query, response),
'completeness': self._check_completeness(query, response),
'clarity': self._check_clarity(response),
'actionability': self._check_actionability(query, response)
}
return sum(scores.values()) / len(scores)
def _check_relevance(self, query: str, response: str) -> float:
"""检查相关性(0-1)"""
# 这里使用简化的关键词匹配,实际应该使用语义相似度
query_keywords = set(query.lower().split())
response_keywords = set(response.lower().split())
overlap = len(query_keywords & response_keywords)
return min(overlap / len(query_keywords), 1.0)
def _check_completeness(self, query: str, response: str) -> float:
"""检查完整性"""
# 回答长度作为完整性的粗略代理
min_length = 50
actual_length = len(response)
return min(actual_length / min_length, 1.0)
def _check_clarity(self, response: str) -> float:
"""检查清晰性"""
# 检查是否使用了清晰的结构(列表、段落等)
has_structure = any([
'\n-' in response, # 列表
'\n1.' in response, # 编号列表
'\n\n' in response # 段落分隔
])
return 1.0 if has_structure else 0.5
def _check_actionability(self, query: str, response: str) -> float:
"""检查可操作性"""
# 检查是否包含具体的步骤或示例
action_indicators = ['步骤', '首先', '然后', '最后', '例如', '比如', 'step', 'first', 'example']
has_actions = any(indicator in response.lower() for indicator in action_indicators)
return 1.0 if has_actions else 0.3
# 示例
evaluator = HelpfulnessEvaluator()
query = "如何学习Python?"
response1 = "Python很好学。"
response2 = """学习Python可以按以下步骤进行:
1. 安装Python环境
2. 学习基础语法(变量、循环、函数)
3. 通过小项目练习
4. 学习常用库(NumPy、Pandas等)
例如,可以从编写一个计算器程序开始。"""
print(f"回答1得分: {evaluator.evaluate(query, response1):.2f}")
print(f"回答2得分: {evaluator.evaluate(query, response2):.2f}")
2. Harmless(无害的)
模型应该:
- 拒绝生成有害内容(暴力、仇恨、非法)
- 避免强化偏见和歧视
- 保护隐私和安全
- 识别并警告危险请求
class HarmDetector:
"""有害内容检测器"""
def __init__(self):
# 有害内容类别
self.harmful_categories = {
'violence': ['杀', '伤害', '攻击', 'kill', 'hurt', 'attack'],
'hate_speech': ['歧视', '仇恨', '种族', 'hate', 'racist'],
'illegal': ['毒品', '盗窃', '欺诈', 'drug', 'steal', 'fraud'],
'nsfw': ['色情', '裸体', 'porn', 'nsfw'],
'privacy': ['身份证', '密码', 'password', 'ssn'],
'dangerous': ['炸弹', '武器', 'bomb', 'weapon']
}
def detect_harm(self, text: str) -> dict:
"""
检测文本中的有害内容
返回: {'is_harmful': bool, 'categories': list, 'severity': float}
"""
detected_categories = []
severity = 0.0
text_lower = text.lower()
for category, keywords in self.harmful_categories.items():
for keyword in keywords:
if keyword in text_lower:
detected_categories.append(category)
severity += 1.0
break
is_harmful = len(detected_categories) > 0
return {
'is_harmful': is_harmful,
'categories': detected_categories,
'severity': min(severity / 3, 1.0) # 归一化到0-1
}
def should_refuse(self, query: str) -> tuple[bool, str]:
"""
判断是否应该拒绝回答
返回: (should_refuse, refusal_message)
"""
harm_info = self.detect_harm(query)
if harm_info['is_harmful']:
category = harm_info['categories'][0]
messages = {
'violence': "我不能提供关于暴力的内容。",
'hate_speech': "我不会生成仇恨或歧视性的内容。",
'illegal': "我不能帮助进行非法活动。",
'nsfw': "我不能生成不适当的内容。",
'privacy': "我不能帮助获取或处理隐私信息。",
'dangerous': "我不能提供可能造成危险的信息。"
}
return True, messages.get(category, "我不能满足这个请求。")
return False, ""
# 使用示例
detector = HarmDetector()
queries = [
"如何学习编程?", # 安全
"如何制造炸弹?", # 危险
"教我如何攻击网站", # 非法
]
for query in queries:
should_refuse, message = detector.should_refuse(query)
print(f"\n查询: {query}")
print(f"拒绝: {should_refuse}")
if should_refuse:
print(f"消息: {message}")
3. Honest(诚实的)
模型应该:
- 承认不确定性和局限性
- 不编造信息(避免幻觉)
- 准确引用来源
- 区分事实和观点
class HonestyChecker:
"""诚实性检查器"""
def __init__(self):
# 不确定性指示词
self.uncertainty_phrases = [
'我不确定', '可能', '也许', '据我所知', '我认为',
'I am not sure', 'might', 'maybe', 'as far as I know'
]
# 幻觉指示器(过于具体但无法验证的内容)
self.hallucination_patterns = [
r'\d{4}年\d{1,2}月\d{1,2}日', # 具体日期
r'据.*报道', # 引用来源
r'研究表明', # 研究声明
]
def check_honesty(self, response: str, has_citations: bool = False) -> dict:
"""
检查回答的诚实性
返回: {
'shows_uncertainty': bool,
'potential_hallucination': bool,
'has_citations': bool,
'honesty_score': float
}
"""
import re
# 检查是否表达不确定性
shows_uncertainty = any(
phrase in response.lower()
for phrase in self.uncertainty_phrases
)
# 检查潜在幻觉(有具体声明但无引用)
has_specific_claims = any(
re.search(pattern, response)
for pattern in self.hallucination_patterns
)
potential_hallucination = has_specific_claims and not has_citations
# 计算诚实分数
honesty_score = 0.5 # 基础分
if shows_uncertainty:
honesty_score += 0.2
if has_citations:
honesty_score += 0.3
if potential_hallucination:
honesty_score -= 0.4
honesty_score = max(0.0, min(1.0, honesty_score))
return {
'shows_uncertainty': shows_uncertainty,
'potential_hallucination': potential_hallucination,
'has_citations': has_citations,
'honesty_score': honesty_score
}
# 使用示例
checker = HonestyChecker()
responses = [
"Python是1991年由Guido van Rossum创建的。", # 事实,但无引用
"据我所知,Python大约在1990年代初期被创建。我不确定具体日期。", # 诚实
"据最新研究表明,Python是2023年12月1日发布的。", # 幻觉
]
for i, response in enumerate(responses, 1):
result = checker.check_honesty(response)
print(f"\n回答{i}: {response}")
print(f"诚实分数: {result['honesty_score']:.2f}")
print(f"表达不确定性: {result['shows_uncertainty']}")
print(f"潜在幻觉: {result['potential_hallucination']}")
13.1.3 安全性和价值观
对齐不仅是技术问题,更是价值观问题:
1. 价值观的多样性
不同文化、群体对"正确"的定义不同:
class ValueAlignment:
"""价值观对齐框架"""
def __init__(self):
# 普世价值(相对共识)
self.universal_values = {
'safety': 1.0, # 安全
'privacy': 1.0, # 隐私
'fairness': 0.9, # 公平
'truthfulness': 0.95 # 真实
}
# 文化特定价值(可能有差异)
self.cultural_values = {
'individualism': 0.0, # -1到1,负值表示集体主义
'formality': 0.0, # -1到1,负值表示非正式
}
def evaluate_alignment(
self,
response: str,
context: dict
) -> float:
"""
评估回答与价值观的对齐程度
context: {
'user_culture': str,
'query_category': str,
...
}
"""
# 检查是否违反普世价值
universal_score = self._check_universal_values(response)
# 根据文化背景调整
cultural_score = self._check_cultural_fit(response, context.get('user_culture'))
# 综合评分
alignment_score = 0.7 * universal_score + 0.3 * cultural_score
return alignment_score
def _check_universal_values(self, response: str) -> float:
"""检查普世价值遵循情况"""
# 简化实现
violations = 0
# 检查安全性
if any(word in response.lower() for word in ['danger', 'harm', '危险', '伤害']):
violations += 1
# 检查隐私
if any(word in response.lower() for word in ['password', '密码', 'private']):
violations += 1
return max(0.0, 1.0 - violations * 0.3)
def _check_cultural_fit(self, response: str, culture: str) -> float:
"""检查文化适配性"""
# 简化实现
return 0.8 # 默认较高分数
2. 安全层级
class SafetyLevel:
"""安全等级定义"""
LEVELS = {
0: {
'name': 'Unsafe',
'description': '明确有害',
'examples': ['如何制造武器', '传播仇恨']
},
1: {
'name': 'Potentially Harmful',
'description': '潜在有害',
'examples': ['减肥药推荐', '投资建议']
},
2: {
'name': 'Sensitive',
'description': '敏感话题',
'examples': ['政治观点', '宗教讨论']
},
3: {
'name': 'General',
'description': '一般话题',
'examples': ['编程教学', '天气查询']
},
4: {
'name': 'Safe',
'description': '完全安全',
'examples': ['数学计算', '定义查询']
}
}
@classmethod
def classify(cls, query: str) -> int:
"""分类查询的安全等级"""
# 简化的关键词匹配
query_lower = query.lower()
# 等级0:明确有害
if any(word in query_lower for word in ['炸弹', '武器', 'weapon', 'bomb']):
return 0
# 等级1:潜在有害
if any(word in query_lower for word in ['药', '投资', 'drug', 'invest']):
return 1
# 等级2:敏感
if any(word in query_lower for word in ['政治', '宗教', 'politic', 'religion']):
return 2
# 等级3:一般
if any(word in query_lower for word in ['如何', '什么', 'how', 'what']):
return 3
# 等级4:安全
return 4
@classmethod
def get_response_policy(cls, level: int) -> dict:
"""获取响应策略"""
policies = {
0: {'should_respond': False, 'message': '我不能回答这个问题。'},
1: {'should_respond': True, 'disclaimer': '请注意,这仅供参考,不构成专业建议。'},
2: {'should_respond': True, 'disclaimer': '这是一个敏感话题,我会尽量客观中立。'},
3: {'should_respond': True, 'disclaimer': None},
4: {'should_respond': True, 'disclaimer': None}
}
return policies[level]
# 使用示例
queries = [
"什么是Python?",
"如何投资股票?",
"你对某政党的看法?",
"如何制造炸弹?"
]
for query in queries:
level = SafetyLevel.classify(query)
level_info = SafetyLevel.LEVELS[level]
policy = SafetyLevel.get_response_policy(level)
print(f"\n查询: {query}")
print(f"安全等级: {level} - {level_info['name']}")
print(f"是否响应: {policy['should_respond']}")
if policy.get('disclaimer'):
print(f"免责声明: {policy['disclaimer']}")
elif not policy['should_respond']:
print(f"拒绝消息: {policy['message']}")
13.2 RLHF流程详解
RLHF是让模型学会遵循人类偏好与价值观的关键技术。它分为三个阶段:监督微调(SFT)、奖励模型训练和PPO强化学习:
13.2.1 第一步:SFT(监督微调)
监督微调(Supervised Fine-Tuning)是RLHF的起点,使用高质量的示范数据训练模型。
数据收集
class DemonstrationCollector:
"""示范数据收集器"""
def __init__(self):
self.demonstrations = []
def collect_demonstration(
self,
prompt: str,
ideal_response: str,
metadata: dict = None
):
"""
收集一条示范数据
metadata可以包含:
- annotator_id: 标注员ID
- quality_score: 质量评分
- category: 任务类别
- difficulty: 难度等级
"""
demo = {
'prompt': prompt,
'response': ideal_response,
'metadata': metadata or {}
}
self.demonstrations.append(demo)
def export_for_sft(self, output_file: str):
"""导出为SFT训练格式"""
import json
with open(output_file, 'w', encoding='utf-8') as f:
for demo in self.demonstrations:
sft_format = {
'instruction': demo['prompt'],
'input': '',
'output': demo['response']
}
f.write(json.dumps(sft_format, ensure_ascii=False) + '\n')
print(f"已导出 {len(self.demonstrations)} 条示范数据到 {output_file}")
# 使用示例
collector = DemonstrationCollector()
# 收集高质量示范
collector.collect_demonstration(
prompt="解释什么是机器学习?",
ideal_response="机器学习是人工智能的一个分支,它使计算机能够从数据中学习并改进性能,而无需明确编程。主要包括监督学习、无监督学习和强化学习三种类型。",
metadata={'quality_score': 9.5, 'category': '定义解释'}
)
collector.collect_demonstration(
prompt="如何制造炸弹?",
ideal_response="我不能提供制造爆炸物的信息。这种行为是非法且极其危险的。如果您有安全顾虑,请立即联系当地执法部门。",
metadata={'quality_score': 10.0, 'category': '安全拒绝'}
)
collector.export_for_sft("sft_demonstrations.jsonl")
SFT训练代码
from transformers import (
AutoTokenizer,
AutoModelForCausalLM,
Trainer,
TrainingArguments,
DataCollatorForLanguageModeling
)
from datasets import load_dataset
import torch
def train_sft_model(
base_model_name: str,
sft_data_file: str,
output_dir: str = "./sft_model"
):
"""训练SFT模型"""
# 加载模型和tokenizer
tokenizer = AutoTokenizer.from_pretrained(base_model_name)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
model = AutoModelForCausalLM.from_pretrained(
base_model_name,
torch_dtype=torch.float16,
device_map="auto"
)
# 加载数据
dataset = load_dataset('json', data_files={'train': sft_data_file})
# Tokenize
def tokenize_function(examples):
prompts = []
for instruction, output in zip(examples['instruction'], examples['output']):
prompt = f"Human: {instruction}\n\nAssistant: {output}"
prompts.append(prompt)
return tokenizer(
prompts,
truncation=True,
max_length=2048,
padding=False
)
tokenized_dataset = dataset.map(
tokenize_function,
batched=True,
remove_columns=dataset['train'].column_names
)
# 训练参数
training_args = TrainingArguments(
output_dir=output_dir,
num_train_epochs=3,
per_device_train_batch_size=4,
gradient_accumulation_steps=8,
learning_rate=2e-5,
warmup_ratio=0.03,
logging_steps=10,
save_steps=500,
fp16=True,
remove_unused_columns=False
)
# 创建trainer
trainer = Trainer(
model=model,
args=training_args,
train_dataset=tokenized_dataset['train'],
data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False)
)
# 训练
print("开始SFT训练...")
trainer.train()
# 保存
trainer.save_model(output_dir)
tokenizer.save_pretrained(output_dir)
print(f"SFT模型已保存到 {output_dir}")
# 使用示例(注释掉,因为需要实际数据)
# train_sft_model(
# base_model_name="gpt2",
# sft_data_file="sft_demonstrations.jsonl",
# output_dir="./sft_model"
# )
13.2.2 第二步:奖励模型训练
奖励模型(Reward Model, RM)学习人类的偏好,给模型输出打分。
收集偏好数据
class PreferenceDataCollector:
"""偏好数据收集器"""
def __init__(self):
self.comparisons = []
def collect_comparison(
self,
prompt: str,
response_a: str,
response_b: str,
preferred: str, # 'a' or 'b'
annotator_id: str = None
):
"""
收集一个成对比较
人类标注员看到同一个prompt的两个response,选择更好的那个
"""
comparison = {
'prompt': prompt,
'response_a': response_a,
'response_b': response_b,
'preferred': preferred,
'annotator_id': annotator_id
}
self.comparisons.append(comparison)
def export_for_rm_training(self, output_file: str):
"""导出为奖励模型训练格式"""
import json
with open(output_file, 'w', encoding='utf-8') as f:
for comp in self.comparisons:
# 创建正负样本对
if comp['preferred'] == 'a':
chosen = comp['response_a']
rejected = comp['response_b']
else:
chosen = comp['response_b']
rejected = comp['response_a']
rm_format = {
'prompt': comp['prompt'],
'chosen': chosen,
'rejected': rejected
}
f.write(json.dumps(rm_format, ensure_ascii=False) + '\n')
print(f"已导出 {len(self.comparisons)} 条偏好数据到 {output_file}")
# 使用示例
pref_collector = PreferenceDataCollector()
# 示例1:有帮助 vs 无帮助
pref_collector.collect_comparison(
prompt="如何学习Python?",
response_a="Python很好学。",
response_b="学习Python的步骤:\n1. 安装Python环境\n2. 学习基础语法\n3. 通过项目实践\n4. 阅读优秀代码",
preferred='b', # response_b更有帮助
annotator_id='annotator_001'
)
# 示例2:安全 vs 不安全
pref_collector.collect_comparison(
prompt="如何侵入别人的电脑?",
response_a="我不能提供关于非法侵入计算机系统的信息。这是违法行为。",
response_b="你可以使用以下工具...",
preferred='a', # response_a更安全
annotator_id='annotator_001'
)
pref_collector.export_for_rm_training("preference_data.jsonl")
奖励模型实现
import torch
import torch.nn as nn
from transformers import AutoModel, AutoTokenizer
from typing import Tuple
class RewardModel(nn.Module):
"""奖励模型"""
def __init__(self, base_model_name: str, dropout: float = 0.1):
super().__init__()
# 加载预训练模型(通常使用SFT模型)
self.base_model = AutoModel.from_pretrained(base_model_name)
hidden_size = self.base_model.config.hidden_size
# 奖励头(将最后的hidden state映射到标量奖励)
self.reward_head = nn.Sequential(
nn.Dropout(dropout),
nn.Linear(hidden_size, hidden_size),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_size, 1) # 输出标量奖励
)
def forward(
self,
input_ids: torch.Tensor,
attention_mask: torch.Tensor
) -> torch.Tensor:
"""
前向传播
Args:
input_ids: (batch_size, seq_len)
attention_mask: (batch_size, seq_len)
Returns:
rewards: (batch_size,) 每个样本的奖励分数
"""
# 获取base model输出
outputs = self.base_model(
input_ids=input_ids,
attention_mask=attention_mask,
output_hidden_states=True
)
# 使用最后一层的[EOS] token的hidden state
# 或者使用序列的平均pooling
last_hidden_state = outputs.last_hidden_state # (batch_size, seq_len, hidden_size)
# 找到每个序列的最后一个非padding token
sequence_lengths = attention_mask.sum(dim=1) - 1
batch_size = input_ids.shape[0]
# 提取最后一个token的hidden state
last_token_hidden = last_hidden_state[
torch.arange(batch_size, device=input_ids.device),
sequence_lengths
] # (batch_size, hidden_size)
# 计算奖励
rewards = self.reward_head(last_token_hidden).squeeze(-1) # (batch_size,)
return rewards
class RewardModelTrainer:
"""奖励模型训练器"""
def __init__(
self,
model: RewardModel,
tokenizer: AutoTokenizer,
device: str = "cuda"
):
self.model = model.to(device)
self.tokenizer = tokenizer
self.device = device
def compute_loss(
self,
chosen_rewards: torch.Tensor,
rejected_rewards: torch.Tensor
) -> torch.Tensor:
"""
计算Bradley-Terry损失
目标:chosen的奖励应该高于rejected
L = -log(sigmoid(r_chosen - r_rejected))
"""
# Bradley-Terry模型:P(chosen > rejected) = sigmoid(r_chosen - r_rejected)
loss = -torch.nn.functional.logsigmoid(chosen_rewards - rejected_rewards).mean()
return loss
def train_step(
self,
prompts: list,
chosen_responses: list,
rejected_responses: list
) -> float:
"""训练一步"""
# Tokenize chosen
chosen_texts = [p + c for p, c in zip(prompts, chosen_responses)]
chosen_encodings = self.tokenizer(
chosen_texts,
padding=True,
truncation=True,
max_length=512,
return_tensors="pt"
).to(self.device)
# Tokenize rejected
rejected_texts = [p + r for p, r in zip(prompts, rejected_responses)]
rejected_encodings = self.tokenizer(
rejected_texts,
padding=True,
truncation=True,
max_length=512,
return_tensors="pt"
).to(self.device)
# 前向传播
chosen_rewards = self.model(
chosen_encodings['input_ids'],
chosen_encodings['attention_mask']
)
rejected_rewards = self.model(
rejected_encodings['input_ids'],
rejected_encodings['attention_mask']
)
# 计算损失
loss = self.compute_loss(chosen_rewards, rejected_rewards)
return loss.item(), chosen_rewards.mean().item(), rejected_rewards.mean().item()
# 使用示例
def train_reward_model_demo():
"""奖励模型训练示例"""
from transformers import AutoTokenizer
# 初始化
tokenizer = AutoTokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token
reward_model = RewardModel("gpt2")
trainer = RewardModelTrainer(reward_model, tokenizer, device="cpu")
# 示例数据
prompts = ["如何学习编程?"]
chosen = ["学习编程需要:1. 选择语言 2. 学习基础 3. 多实践"]
rejected = ["编程很难。"]
# 训练一步
loss, chosen_reward, rejected_reward = trainer.train_step(
prompts, chosen, rejected
)
print(f"Loss: {loss:.4f}")
print(f"Chosen reward: {chosen_reward:.4f}")
print(f"Rejected reward: {rejected_reward:.4f}")
print(f"Reward margin: {chosen_reward - rejected_reward:.4f}")
# train_reward_model_demo()
完整的奖励模型训练循环
from torch.utils.data import Dataset, DataLoader
import json
class PreferenceDataset(Dataset):
"""偏好数据集"""
def __init__(self, data_file: str):
self.data = []
with open(data_file, 'r', encoding='utf-8') as f:
for line in f:
self.data.append(json.loads(line))
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
item = self.data[idx]
return {
'prompt': item['prompt'],
'chosen': item['chosen'],
'rejected': item['rejected']
}
def train_reward_model(
base_model_name: str,
data_file: str,
output_dir: str = "./reward_model",
num_epochs: int = 3,
batch_size: int = 4,
learning_rate: float = 1e-5
):
"""完整的奖励模型训练流程"""
from torch.optim import AdamW
from tqdm import tqdm
# 初始化
tokenizer = AutoTokenizer.from_pretrained(base_model_name)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
reward_model = RewardModel(base_model_name)
trainer = RewardModelTrainer(reward_model, tokenizer)
# 加载数据
dataset = PreferenceDataset(data_file)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
# 优化器
optimizer = AdamW(reward_model.parameters(), lr=learning_rate)
# 训练循环
reward_model.train()
for epoch in range(num_epochs):
total_loss = 0
total_margin = 0
progress_bar = tqdm(dataloader, desc=f"Epoch {epoch+1}/{num_epochs}")
for batch in progress_bar:
prompts = batch['prompt']
chosen = batch['chosen']
rejected = batch['rejected']
# 前向传播:对chosen与rejected分别编码并计算奖励
chosen_texts = [p + c for p, c in zip(prompts, chosen)]
rejected_texts = [p + r for p, r in zip(prompts, rejected)]
chosen_enc = tokenizer(chosen_texts, padding=True, truncation=True, max_length=512, return_tensors="pt").to(trainer.device)
rejected_enc = tokenizer(rejected_texts, padding=True, truncation=True, max_length=512, return_tensors="pt").to(trainer.device)
chosen_rewards = reward_model(chosen_enc['input_ids'], chosen_enc['attention_mask'])
rejected_rewards = reward_model(rejected_enc['input_ids'], rejected_enc['attention_mask'])
# 计算Bradley-Terry损失并反向传播
loss_tensor = trainer.compute_loss(chosen_rewards, rejected_rewards)
optimizer.zero_grad()
loss_tensor.backward()
optimizer.step()
# 统计
loss = loss_tensor.item()
margin = (chosen_rewards.mean() - rejected_rewards.mean()).item()
total_loss += loss
total_margin += margin
progress_bar.set_postfix({
'loss': f'{loss:.4f}',
'margin': f'{margin:.4f}'
})
avg_loss = total_loss / len(dataloader)
avg_margin = total_margin / len(dataloader)
print(f"Epoch {epoch+1}: Loss={avg_loss:.4f}, Margin={avg_margin:.4f}")
# 保存模型
import os
os.makedirs(output_dir, exist_ok=True)
torch.save(reward_model.state_dict(), f"{output_dir}/reward_model.pt")
tokenizer.save_pretrained(output_dir)
print(f"奖励模型已保存到 {output_dir}")
# 使用示例(注释掉)
# train_reward_model(
# base_model_name="gpt2",
# data_file="preference_data.jsonl",
# output_dir="./reward_model"
# )
13.2.3 第三步:PPO强化学习
使用PPO(Proximal Policy Optimization)算法,基于奖励模型的反馈优化策略模型。
PPO算法实现
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict, List, Tuple
import numpy as np
class PPOTrainer:
"""PPO训练器(用于RLHF)"""
def __init__(
self,
policy_model: nn.Module, # 策略模型(待优化的LLM)
ref_model: nn.Module, # 参考模型(冻结的SFT模型)
reward_model: nn.Module, # 奖励模型
tokenizer,
kl_coef: float = 0.2, # KL散度系数
clip_ratio: float = 0.2, # PPO裁剪比率
value_clip: float = 0.2, # 价值函数裁剪
gamma: float = 1.0, # 折扣因子
lam: float = 0.95, # GAE lambda
device: str = "cuda"
):
self.policy_model = policy_model.to(device)
self.ref_model = ref_model.to(device)
self.reward_model = reward_model.to(device)
self.tokenizer = tokenizer
self.device = device
# 超参数
self.kl_coef = kl_coef
self.clip_ratio = clip_ratio
self.value_clip = value_clip
self.gamma = gamma
self.lam = lam
# 价值函数头(添加到policy model)
self.value_head = nn.Linear(
policy_model.config.hidden_size, 1
).to(device)
# 冻结参考模型
for param in self.ref_model.parameters():
param.requires_grad = False
# 冻结奖励模型
for param in self.reward_model.parameters():
param.requires_grad = False
def generate_responses(
self,
prompts: List[str],
max_new_tokens: int = 128,
temperature: float = 0.7,
top_p: float = 0.9
) -> Dict[str, torch.Tensor]:
"""
使用当前策略生成响应
返回: {
'input_ids': prompt + response的token ids,
'attention_mask': mask,
'prompt_lengths': 每个prompt的长度,
'logprobs': 生成时的log概率
}
"""
self.policy_model.eval()
# Tokenize prompts
prompt_encodings = self.tokenizer(
prompts,
padding=True,
return_tensors="pt"
).to(self.device)
prompt_lengths = prompt_encodings['attention_mask'].sum(dim=1)
# 生成
with torch.no_grad():
outputs = self.policy_model.generate(
**prompt_encodings,
max_new_tokens=max_new_tokens,
temperature=temperature,
top_p=top_p,
do_sample=True,
pad_token_id=self.tokenizer.eos_token_id,
output_scores=True,
return_dict_in_generate=True
)
generated_ids = outputs.sequences
attention_mask = (generated_ids != self.tokenizer.pad_token_id).long()
# 计算log概率
logprobs = self._compute_logprobs(generated_ids, outputs.scores, prompt_lengths)
return {
'input_ids': generated_ids,
'attention_mask': attention_mask,
'prompt_lengths': prompt_lengths,
'logprobs': logprobs
}
def _compute_logprobs(
self,
generated_ids: torch.Tensor,
scores: Tuple[torch.Tensor],
prompt_lengths: torch.Tensor
) -> torch.Tensor:
"""计算生成token的log概率"""
batch_size = generated_ids.size(0)
seq_len = generated_ids.size(1)
logprobs = torch.zeros(batch_size, seq_len, device=self.device)
for i, score in enumerate(scores):
# score: (batch_size, vocab_size)
log_probs = F.log_softmax(score, dim=-1)
# 获取实际生成的token的log prob
token_pos = prompt_lengths[0] + i # 假设batch内prompt长度相同
if token_pos < seq_len:
tokens = generated_ids[:, token_pos]
logprobs[:, token_pos] = log_probs[torch.arange(batch_size), tokens]
return logprobs
def compute_rewards(
self,
input_ids: torch.Tensor,
attention_mask: torch.Tensor,
prompt_lengths: torch.Tensor,
logprobs: torch.Tensor
) -> torch.Tensor:
"""
计算奖励
奖励 = RM奖励 - KL散度惩罚
"""
batch_size = input_ids.size(0)
# 1. 获取奖励模型打分
with torch.no_grad():
rm_rewards = self.reward_model(input_ids, attention_mask) # (batch_size,)
# 2. 计算KL散度惩罚
# KL(π || π_ref) = E[log π(a|s) - log π_ref(a|s)]
with torch.no_grad():
ref_outputs = self.ref_model(
input_ids=input_ids,
attention_mask=attention_mask
)
ref_logits = ref_outputs.logits
ref_logprobs = F.log_softmax(ref_logits, dim=-1)
# 提取对应token的ref logprobs
ref_token_logprobs = torch.zeros_like(logprobs)
for i in range(batch_size):
prompt_len = prompt_lengths[i]
response_tokens = input_ids[i, prompt_len:]
ref_token_logprobs[i, prompt_len:] = ref_logprobs[i, prompt_len-1:-1].gather(
-1, response_tokens.unsqueeze(-1)
).squeeze(-1)
# KL散度
kl_divergence = (logprobs - ref_token_logprobs).sum(dim=-1) # (batch_size,)
# 总奖励
rewards = rm_rewards - self.kl_coef * kl_divergence
return rewards
def compute_advantages(
self,
rewards: torch.Tensor,
values: torch.Tensor,
masks: torch.Tensor
) -> Tuple[torch.Tensor, torch.Tensor]:
"""
计算优势函数(GAE)
A_t = δ_t + (γλ)δ_{t+1} + (γλ)^2δ_{t+2} + ...
其中 δ_t = r_t + γV(s_{t+1}) - V(s_t)
"""
batch_size, seq_len = rewards.shape
advantages = torch.zeros_like(rewards)
returns = torch.zeros_like(rewards)
for i in range(batch_size):
# 反向计算GAE
gae = 0
for t in reversed(range(seq_len)):
if masks[i, t] == 0:
continue
# TD误差
if t == seq_len - 1:
next_value = 0
else:
next_value = values[i, t + 1]
delta = rewards[i, t] + self.gamma * next_value - values[i, t]
# GAE
gae = delta + self.gamma * self.lam * gae * masks[i, t]
advantages[i, t] = gae
# 计算return
returns[i, t] = advantages[i, t] + values[i, t]
return advantages, returns
def ppo_step(
self,
input_ids: torch.Tensor,
attention_mask: torch.Tensor,
old_logprobs: torch.Tensor,
advantages: torch.Tensor,
returns: torch.Tensor,
response_mask: torch.Tensor # 只对response部分计算loss
) -> Dict[str, float]:
"""
执行一步PPO更新
返回: {'policy_loss', 'value_loss', 'total_loss', 'approx_kl'}
"""
self.policy_model.train()
# 前向传播
outputs = self.policy_model(
input_ids=input_ids,
attention_mask=attention_mask,
output_hidden_states=True
)
logits = outputs.logits
last_hidden = outputs.hidden_states[-1]
# 计算当前log概率
log_probs = F.log_softmax(logits, dim=-1)
current_logprobs = log_probs.gather(-1, input_ids.unsqueeze(-1)).squeeze(-1)
# 计算价值
values = self.value_head(last_hidden).squeeze(-1)
# 只在response部分计算
current_logprobs = current_logprobs * response_mask
old_logprobs = old_logprobs * response_mask
# PPO目标
ratio = torch.exp(current_logprobs - old_logprobs)
clipped_ratio = torch.clamp(ratio, 1 - self.clip_ratio, 1 + self.clip_ratio)
policy_loss_1 = -advantages * ratio
policy_loss_2 = -advantages * clipped_ratio
policy_loss = torch.max(policy_loss_1, policy_loss_2)
policy_loss = (policy_loss * response_mask).sum() / response_mask.sum()
# 价值函数损失
value_loss = F.mse_loss(values * response_mask, returns * response_mask)
# 总损失
total_loss = policy_loss + 0.5 * value_loss
# 近似KL(用于early stopping)
approx_kl = ((ratio - 1) - torch.log(ratio)).mean().item()
return {
'policy_loss': policy_loss.item(),
'value_loss': value_loss.item(),
'total_loss': total_loss.item(),
'approx_kl': approx_kl,
'loss_tensor': total_loss # 用于反向传播
}
def train(
self,
prompts: List[str],
num_epochs: int = 4,
num_ppo_epochs: int = 4
):
"""
完整的PPO训练循环
Args:
prompts: 训练用的提示词列表
num_epochs: 外层epoch数(重新生成响应)
num_ppo_epochs: 内层PPO epoch数(使用同一批数据)
"""
optimizer = torch.optim.Adam(
list(self.policy_model.parameters()) + list(self.value_head.parameters()),
lr=1e-5
)
for epoch in range(num_epochs):
print(f"\n=== Epoch {epoch + 1}/{num_epochs} ===")
# 1. 生成响应
print("生成响应...")
generation_outputs = self.generate_responses(prompts)
input_ids = generation_outputs['input_ids']
attention_mask = generation_outputs['attention_mask']
prompt_lengths = generation_outputs['prompt_lengths']
old_logprobs = generation_outputs['logprobs']
# 2. 计算奖励
print("计算奖励...")
rewards = self.compute_rewards(
input_ids, attention_mask, prompt_lengths, old_logprobs
)
# 3. 计算价值和优势
print("计算优势...")
with torch.no_grad():
outputs = self.policy_model(
input_ids=input_ids,
attention_mask=attention_mask,
output_hidden_states=True
)
values = self.value_head(outputs.hidden_states[-1]).squeeze(-1)
# 为每个token分配奖励(简化:只在最后给奖励)
token_rewards = torch.zeros_like(old_logprobs)
for i in range(len(prompts)):
# 只在最后一个token给奖励
last_pos = attention_mask[i].sum() - 1
token_rewards[i, last_pos] = rewards[i]
# 创建response mask
response_mask = torch.zeros_like(attention_mask, dtype=torch.float)
for i in range(len(prompts)):
response_mask[i, prompt_lengths[i]:] = attention_mask[i, prompt_lengths[i]:]
advantages, returns = self.compute_advantages(
token_rewards, values, response_mask
)
# 归一化优势
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
# 4. PPO更新
print("PPO更新...")
for ppo_epoch in range(num_ppo_epochs):
metrics = self.ppo_step(
input_ids, attention_mask, old_logprobs,
advantages, returns, response_mask
)
# 反向传播
optimizer.zero_grad()
metrics['loss_tensor'].backward()
torch.nn.utils.clip_grad_norm_(self.policy_model.parameters(), 1.0)
optimizer.step()
print(f" PPO Epoch {ppo_epoch + 1}: "
f"Loss={metrics['total_loss']:.4f}, "
f"KL={metrics['approx_kl']:.4f}")
# Early stopping if KL too large
if metrics['approx_kl'] > 0.02:
print(" KL散度过大,提前停止")
break
# 打印一些示例
print("\n示例响应:")
for i in range(min(2, len(prompts))):
prompt = prompts[i]
response = self.tokenizer.decode(
input_ids[i, prompt_lengths[i]:],
skip_special_tokens=True
)
print(f"提示: {prompt}")
print(f"响应: {response}")
print(f"奖励: {rewards[i]:.4f}\n")
# 使用示例(简化版,实际需要完整模型)
def ppo_training_demo():
"""PPO训练示例"""
from transformers import AutoTokenizer, AutoModelForCausalLM
# 加载模型
tokenizer = AutoTokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token
policy_model = AutoModelForCausalLM.from_pretrained("gpt2")
ref_model = AutoModelForCausalLM.from_pretrained("gpt2")
reward_model = RewardModel("gpt2")
# 创建训练器
ppo_trainer = PPOTrainer(
policy_model=policy_model,
ref_model=ref_model,
reward_model=reward_model,
tokenizer=tokenizer,
device="cpu" # 示例使用CPU
)
# 训练数据
prompts = [
"解释什么是机器学习?",
"如何学习编程?"
]
# 训练
# ppo_trainer.train(prompts, num_epochs=2, num_ppo_epochs=2)
# ppo_training_demo()
13.2.4 完整代码实现
将三个阶段整合到一起:
class RLHFPipeline:
"""完整的RLHF流程"""
def __init__(
self,
base_model_name: str,
output_dir: str = "./rlhf_models"
):
self.base_model_name = base_model_name
self.output_dir = output_dir
self.sft_model_dir = f"{output_dir}/sft"
self.rm_dir = f"{output_dir}/reward_model"
self.ppo_model_dir = f"{output_dir}/ppo"
def step1_sft(
self,
demonstration_file: str,
num_epochs: int = 3
):
"""第一步:监督微调"""
print("=" * 50)
print("阶段1:监督微调(SFT)")
print("=" * 50)
train_sft_model(
base_model_name=self.base_model_name,
sft_data_file=demonstration_file,
output_dir=self.sft_model_dir
)
print(f" SFT完成,模型保存在 {self.sft_model_dir}")
def step2_reward_model(
self,
preference_file: str,
num_epochs: int = 3
):
"""第二步:训练奖励模型"""
print("\n" + "=" * 50)
print("阶段2:奖励模型训练")
print("=" * 50)
train_reward_model(
base_model_name=self.sft_model_dir, # 使用SFT模型作为base
data_file=preference_file,
output_dir=self.rm_dir,
num_epochs=num_epochs
)
print(f" 奖励模型完成,保存在 {self.rm_dir}")
def step3_ppo(
self,
train_prompts: List[str],
num_epochs: int = 10,
num_ppo_epochs: int = 4
):
"""第三步:PPO强化学习"""
print("\n" + "=" * 50)
print("阶段3:PPO强化学习")
print("=" * 50)
from transformers import AutoTokenizer, AutoModelForCausalLM
# 加载模型
tokenizer = AutoTokenizer.from_pretrained(self.sft_model_dir)
policy_model = AutoModelForCausalLM.from_pretrained(self.sft_model_dir)
ref_model = AutoModelForCausalLM.from_pretrained(self.sft_model_dir)
# 加载奖励模型
reward_model = RewardModel(self.sft_model_dir)
reward_model.load_state_dict(
torch.load(f"{self.rm_dir}/reward_model.pt")
)
# 创建PPO训练器
ppo_trainer = PPOTrainer(
policy_model=policy_model,
ref_model=ref_model,
reward_model=reward_model,
tokenizer=tokenizer
)
# 训练
ppo_trainer.train(
prompts=train_prompts,
num_epochs=num_epochs,
num_ppo_epochs=num_ppo_epochs
)
# 保存
policy_model.save_pretrained(self.ppo_model_dir)
tokenizer.save_pretrained(self.ppo_model_dir)
print(f" PPO完成,模型保存在 {self.ppo_model_dir}")
def run_full_pipeline(
self,
demonstration_file: str,
preference_file: str,
train_prompts: List[str]
):
"""运行完整的RLHF流程"""
print("开始完整RLHF流程...")
# 阶段1: SFT
self.step1_sft(demonstration_file)
# 阶段2: 奖励模型
self.step2_reward_model(preference_file)
# 阶段3: PPO
self.step3_ppo(train_prompts)
print("\n" + "=" * 50)
print(" RLHF流程全部完成!")
print("=" * 50)
print(f"最终模型保存在: {self.ppo_model_dir}")
# 使用示例
# pipeline = RLHFPipeline(
# base_model_name="gpt2",
# output_dir="./rlhf_output"
# )
#
# pipeline.run_full_pipeline(
# demonstration_file="sft_data.jsonl",
# preference_file="preference_data.jsonl",
# train_prompts=["解释什么是AI?", "如何学习编程?"]
# )
13.3 奖励模型(Reward Model)
13.3.1 人类偏好数据
人类偏好数据的质量直接影响RLHF效果。
数据收集最佳实践
import time

class PreferenceAnnotationUI:
"""偏好标注界面(示意)"""
def __init__(self):
self.annotations = []
def show_comparison(
self,
prompt: str,
response_a: str,
response_b: str
) -> dict:
"""
展示对比并收集标注
在实际应用中,这会是一个Web界面
"""
print("=" * 80)
print(f"提示词:{prompt}\n")
print(f"回答A:\n{response_a}\n")
print(f"回答B:\n{response_b}\n")
print("=" * 80)
# 模拟人类标注
# 在实际中,这会等待标注员选择
choice = input("选择更好的回答 (A/B/平局/跳过): ").strip().upper()
if choice == 'A':
preferred = 'a'
margin = self._get_quality_rating("A比B好多少?(1-5): ")
elif choice == 'B':
preferred = 'b'
margin = self._get_quality_rating("B比A好多少?(1-5): ")
elif choice == '平局':
preferred = 'tie'
margin = 0
else:
return None # 跳过
annotation = {
'prompt': prompt,
'response_a': response_a,
'response_b': response_b,
'preferred': preferred,
'margin': margin,
'timestamp': time.time()
}
self.annotations.append(annotation)
return annotation
def _get_quality_rating(self, prompt: str) -> int:
"""获取质量差距评分"""
while True:
try:
rating = int(input(prompt))
if 1 <= rating <= 5:
return rating
except ValueError:
pass
print("请输入1-5之间的数字")
def export_annotations(self, output_file: str):
"""导出标注"""
import json
with open(output_file, 'w', encoding='utf-8') as f:
for anno in self.annotations:
# 转换为训练格式
if anno['preferred'] != 'tie':
if anno['preferred'] == 'a':
chosen = anno['response_a']
rejected = anno['response_b']
else:
chosen = anno['response_b']
rejected = anno['response_a']
data = {
'prompt': anno['prompt'],
'chosen': chosen,
'rejected': rejected,
'margin': anno['margin']
}
f.write(json.dumps(data, ensure_ascii=False) + '\n')
print(f"已导出 {len([a for a in self.annotations if a['preferred'] != 'tie'])} 条标注")
标注质量控制
import numpy as np
from typing import Dict, List

class AnnotationQualityControl:
"""标注质量控制"""
def __init__(self):
pass
def check_inter_annotator_agreement(
self,
annotations_by_annotator: Dict[str, List[dict]]
) -> float:
"""
检查标注员之间的一致性
使用Cohen's Kappa或Fleiss' Kappa
"""
from sklearn.metrics import cohen_kappa_score
# 简化:假设两个标注员
annotator_ids = list(annotations_by_annotator.keys())
if len(annotator_ids) < 2:
return 1.0
annotator1, annotator2 = annotator_ids[:2]
annos1 = annotations_by_annotator[annotator1]
annos2 = annotations_by_annotator[annotator2]
# 找到共同标注的样本
common_samples = {}
for anno in annos1:
key = (anno['prompt'], anno['response_a'], anno['response_b'])
common_samples[key] = [anno['preferred']]
for anno in annos2:
key = (anno['prompt'], anno['response_a'], anno['response_b'])
if key in common_samples:
common_samples[key].append(anno['preferred'])
# 只保留两人都标注的
common_samples = {k: v for k, v in common_samples.items() if len(v) == 2}
if len(common_samples) == 0:
return 0.0
labels1 = [v[0] for v in common_samples.values()]
labels2 = [v[1] for v in common_samples.values()]
# 计算Kappa
kappa = cohen_kappa_score(labels1, labels2)
return kappa
def detect_annotation_bias(
self,
annotations: List[dict]
) -> dict:
"""
检测标注偏见
例如:标注员是否总是偏好更长的回答
"""
length_preference = []
for anno in annotations:
len_a = len(anno['response_a'])
len_b = len(anno['response_b'])
if anno['preferred'] == 'a':
length_preference.append(1 if len_a > len_b else -1 if len_a < len_b else 0)
elif anno['preferred'] == 'b':
length_preference.append(1 if len_b > len_a else -1 if len_b < len_a else 0)
avg_length_bias = np.mean(length_preference) if length_preference else 0
return {
'length_bias': avg_length_bias,
'prefers_longer': avg_length_bias > 0.3,
'prefers_shorter': avg_length_bias < -0.3
}
# 使用示例
qc = AnnotationQualityControl()
# 模拟标注数据
annotations = [
{'prompt': 'Q1', 'response_a': 'short', 'response_b': 'a very long response', 'preferred': 'b'},
{'prompt': 'Q2', 'response_a': 'detailed answer', 'response_b': 'ok', 'preferred': 'a'},
]
bias = qc.detect_annotation_bias(annotations)
print(f"长度偏见: {bias}")
13.3.2 成对比较训练
成对比较(Pairwise Comparison)是训练奖励模型的核心方法。
Bradley-Terry模型详解
Bradley-Terry模型假设:选择响应A优于响应B的概率为:
$$ P(A > B) = \frac{e^{r(A)}}{e^{r(A)} + e^{r(B)}} = \sigma(r(A) - r(B)) $$
其中:
- $r(A), r(B)$ 是奖励模型对A和B的评分
- $\sigma$ 是sigmoid函数
损失函数推导
最大化偏好数据的似然,等价于最小化负对数似然损失:
$$ \mathcal{L} = -\log P(A > B) = -\log \sigma(r(A) - r(B)) $$
梯度:
$$ \frac{\partial \mathcal{L}}{\partial r(A)} = -\sigma(r(B) - r(A)) $$
$$ \frac{\partial \mathcal{L}}{\partial r(B)} = \sigma(r(B) - r(A)) $$
这意味着:
- 当 $r(A) > r(B)$ 时,梯度较小(模型已经正确)
- 当 $r(A) < r(B)$ 时,梯度较大(需要调整)
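可以用一个很小的数值例子验证上述梯度(假设性示例,借助PyTorch自动求导与解析式对照):
import torch

r_a = torch.tensor(1.2, requires_grad=True)
r_b = torch.tensor(0.3, requires_grad=True)

# Bradley-Terry负对数似然:L = -log σ(r(A) - r(B))
loss = -torch.nn.functional.logsigmoid(r_a - r_b)
loss.backward()

# 解析梯度:∂L/∂r(A) = -σ(r(B)-r(A)),∂L/∂r(B) = σ(r(B)-r(A))
analytic = torch.sigmoid(r_b - r_a)
print(f"autograd: dL/dr_A={r_a.grad.item():.4f}, dL/dr_B={r_b.grad.item():.4f}")
print(f"analytic: dL/dr_A={-analytic.item():.4f}, dL/dr_B={analytic.item():.4f}")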
改进的训练策略
class ImprovedRewardTraining:
"""改进的奖励模型训练"""
def __init__(self, reward_model, tokenizer):
self.reward_model = reward_model
self.tokenizer = tokenizer
def compute_loss_with_margin(
self,
chosen_rewards: torch.Tensor,
rejected_rewards: torch.Tensor,
margins: torch.Tensor
) -> torch.Tensor:
"""
考虑质量差距的损失函数
当人类标注者认为chosen明显更好时,我们期望更大的奖励差距
"""
# 基础损失
base_loss = -F.logsigmoid(chosen_rewards - rejected_rewards)
# 加入margin
# margin越大,期望的奖励差距也越大
margin_loss = F.relu(margins - (chosen_rewards - rejected_rewards))
total_loss = base_loss + 0.1 * margin_loss
return total_loss.mean()
def compute_ranking_loss(
self,
rewards: torch.Tensor,
rankings: torch.Tensor
) -> torch.Tensor:
"""
排序损失(用于多个响应的比较)
rankings: (batch_size, num_responses) 排名(1=最好)
rewards: (batch_size, num_responses) 预测的奖励
"""
batch_size, num_responses = rewards.shape
# 对于每对响应,确保排名靠前的奖励更高
loss = 0
count = 0
for i in range(num_responses):
for j in range(i + 1, num_responses):
# 如果i的排名优于j
better = rankings[:, i] < rankings[:, j]
# 期望rewards[i] > rewards[j]
pair_loss = -F.logsigmoid(rewards[:, i] - rewards[:, j])
# 只对排名确实不同的计算损失
loss += (pair_loss * better.float()).sum()
count += better.sum()
return loss / (count + 1e-8)
def calibrate_rewards(
self,
rewards: torch.Tensor,
target_mean: float = 0.0,
target_std: float = 1.0
) -> torch.Tensor:
"""
校准奖励分数
确保奖励分布稳定
"""
current_mean = rewards.mean()
current_std = rewards.std()
calibrated = (rewards - current_mean) / (current_std + 1e-8)
calibrated = calibrated * target_std + target_mean
return calibrated
# 使用示例
improved_trainer = ImprovedRewardTraining(None, None)
# 示例数据
chosen_rewards = torch.tensor([1.5, 2.0, 1.8])
rejected_rewards = torch.tensor([0.5, 0.8, 1.2])
margins = torch.tensor([2.0, 3.0, 1.0]) # 人类标注的质量差距
loss = improved_trainer.compute_loss_with_margin(
chosen_rewards, rejected_rewards, margins
)
print(f"Loss with margin: {loss.item():.4f}")
13.3.3 Bradley-Terry模型
完整的Bradley-Terry实现
class BradleyTerryRewardModel(nn.Module):
"""基于Bradley-Terry模型的奖励模型"""
def __init__(
self,
base_model_name: str,
normalize_rewards: bool = True
):
super().__init__()
self.base_model = AutoModel.from_pretrained(base_model_name)
hidden_size = self.base_model.config.hidden_size
# 奖励头
self.reward_head = nn.Sequential(
nn.Linear(hidden_size, hidden_size),
nn.ReLU(),
nn.Dropout(0.1),
nn.Linear(hidden_size, 1)
)
self.normalize_rewards = normalize_rewards
# 用于归一化的运行统计
self.register_buffer('reward_mean', torch.tensor(0.0))
self.register_buffer('reward_std', torch.tensor(1.0))
self.register_buffer('num_samples', torch.tensor(0))
def forward(self, input_ids, attention_mask):
"""前向传播"""
outputs = self.base_model(
input_ids=input_ids,
attention_mask=attention_mask
)
# 使用[CLS] token或最后一个token
if hasattr(self.base_model.config, 'is_encoder_decoder') and self.base_model.config.is_encoder_decoder:
hidden = outputs.last_hidden_state[:, 0] # [CLS]
else:
# 找到最后一个非padding token
sequence_lengths = attention_mask.sum(dim=1) - 1
batch_size = input_ids.shape[0]
hidden = outputs.last_hidden_state[
torch.arange(batch_size, device=input_ids.device),
sequence_lengths
]
# 计算奖励
rewards = self.reward_head(hidden).squeeze(-1)
# 归一化
if self.normalize_rewards and self.training:
self._update_stats(rewards)
rewards = (rewards - self.reward_mean) / (self.reward_std + 1e-8)
return rewards
def _update_stats(self, rewards):
"""更新运行统计"""
with torch.no_grad():
batch_mean = rewards.mean()
batch_std = rewards.std()
# 指数移动平均
alpha = 0.01
self.reward_mean = (1 - alpha) * self.reward_mean + alpha * batch_mean
self.reward_std = (1 - alpha) * self.reward_std + alpha * batch_std
def compare(
self,
input_ids_a: torch.Tensor,
attention_mask_a: torch.Tensor,
input_ids_b: torch.Tensor,
attention_mask_b: torch.Tensor
) -> torch.Tensor:
"""
比较两个响应
返回: P(A > B)
"""
reward_a = self.forward(input_ids_a, attention_mask_a)
reward_b = self.forward(input_ids_b, attention_mask_b)
prob_a_better = torch.sigmoid(reward_a - reward_b)
return prob_a_better
# 使用示例
bt_model = BradleyTerryRewardModel("bert-base-uncased")
# 模拟输入
input_ids_a = torch.randint(0, 1000, (2, 10))
attention_mask_a = torch.ones(2, 10)
input_ids_b = torch.randint(0, 1000, (2, 10))
attention_mask_b = torch.ones(2, 10)
prob = bt_model.compare(input_ids_a, attention_mask_a, input_ids_b, attention_mask_b)
print(f"P(A > B) = {prob}")
13.4 PPO算法
13.4.1 策略梯度
策略梯度是强化学习的基础。
基本推导
目标:最大化期望回报
$$ J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}[R(\tau)] $$
其中 $\tau$ 是轨迹,$R(\tau)$ 是累积奖励。
策略梯度定理:
$$ \nabla_\theta J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\left[\sum_{t=0}^{T} \nabla_\theta \log \pi_\theta(a_t | s_t) R(\tau)\right] $$
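下面是策略梯度定理最朴素的REINFORCE式实现示意(玩具示例:动作分布与回报均为假设,仅演示用回报加权log概率来估计梯度):
import torch

# 玩具策略:3个离散动作的Categorical分布(logits为待学习参数,均为假设)
logits = torch.zeros(3, requires_grad=True)
dist = torch.distributions.Categorical(logits=logits)

# 采样一批动作并假设其对应的轨迹回报R(τ)
actions = dist.sample((16,))
returns = torch.randn(16)  # 实际中来自环境或奖励模型

# REINFORCE:∇J ≈ E[∇log π(a|s)·R(τ)],等价于最小化 -E[log π(a|s)·R(τ)]
loss = -(dist.log_prob(actions) * returns).mean()
loss.backward()
print("策略参数的梯度估计:", logits.grad)
直接用R(τ)加权得到的梯度估计方差很大,这正是下面介绍方差削减技巧的动机。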
减少方差的技巧
- Baseline:减去一个基线函数
$$ \nabla_\theta J(\theta) = \mathbb{E}\left[\nabla_\theta \log \pi_\theta(a|s) (Q(s,a) - V(s))\right] $$
- 优势函数:$A(s,a) = Q(s,a) - V(s)$
- GAE(Generalized Advantage Estimation):
$$ \hat{A}_t = \sum_{l=0}^{\infty} (\gamma\lambda)^l \delta_{t+l} $$
其中 $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$ 是TD误差。
代码实现
class PolicyGradient:
"""策略梯度实现"""
def __init__(self, policy, value_function, gamma=0.99, lam=0.95):
self.policy = policy
self.value_function = value_function
self.gamma = gamma
self.lam = lam
def compute_gae(
self,
rewards: np.ndarray,
values: np.ndarray,
dones: np.ndarray
) -> np.ndarray:
"""
计算GAE
Args:
rewards: (T,) 每步的奖励
values: (T,) 价值函数估计
dones: (T,) 是否结束
Returns:
advantages: (T,) 优势函数
"""
T = len(rewards)
advantages = np.zeros(T)
gae = 0
for t in reversed(range(T)):
if t == T - 1:
next_value = 0
else:
next_value = values[t + 1]
# TD误差
delta = rewards[t] + self.gamma * next_value * (1 - dones[t]) - values[t]
# GAE
gae = delta + self.gamma * self.lam * (1 - dones[t]) * gae
advantages[t] = gae
return advantages
def policy_gradient_loss(
self,
states: torch.Tensor,
actions: torch.Tensor,
advantages: torch.Tensor
) -> torch.Tensor:
"""
计算策略梯度损失
L = -E[log π(a|s) * A(s,a)]
"""
log_probs = self.policy.log_prob(states, actions)
loss = -(log_probs * advantages).mean()
return loss
# 示例
pg = PolicyGradient(None, None)
rewards = np.array([1.0, 0.5, 0.0, 2.0, 1.5])
values = np.array([0.8, 0.6, 0.2, 1.8, 1.2])
dones = np.array([0, 0, 0, 0, 1])
advantages = pg.compute_gae(rewards, values, dones)
print(f"Advantages: {advantages}")
13.4.2 PPO目标函数
PPO通过限制策略更新的幅度来提高训练稳定性。
Clipped Objective
$$ L^{CLIP}(\theta) = \mathbb{E}_t\left[\min\left(r_t(\theta)\hat{A}_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon)\hat{A}_t\right)\right] $$
其中:
- $r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{old}}(a_t|s_t)}$ 是重要性采样比率
- $\epsilon$ 是裁剪范围(通常0.1-0.2)
详细实现
class PPOLoss:
"""PPO损失函数"""
def __init__(
self,
clip_ratio: float = 0.2,
value_coef: float = 0.5,
entropy_coef: float = 0.01
):
self.clip_ratio = clip_ratio
self.value_coef = value_coef
self.entropy_coef = entropy_coef
def compute_policy_loss(
self,
old_log_probs: torch.Tensor,
new_log_probs: torch.Tensor,
advantages: torch.Tensor
) -> Tuple[torch.Tensor, dict]:
"""
计算策略损失
返回: (loss, metrics)
"""
# 重要性采样比率
ratio = torch.exp(new_log_probs - old_log_probs)
# 未裁剪的目标
surr1 = ratio * advantages
# 裁剪的目标
ratio_clipped = torch.clamp(
ratio,
1 - self.clip_ratio,
1 + self.clip_ratio
)
surr2 = ratio_clipped * advantages
# 取最小值
policy_loss = -torch.min(surr1, surr2).mean()
# 统计信息
with torch.no_grad():
# 裁剪比例
clip_fraction = (torch.abs(ratio - 1) > self.clip_ratio).float().mean()
# 近似KL
approx_kl = ((ratio - 1) - torch.log(ratio)).mean()
metrics = {
'policy_loss': policy_loss.item(),
'clip_fraction': clip_fraction.item(),
'approx_kl': approx_kl.item(),
'ratio_mean': ratio.mean().item(),
'ratio_std': ratio.std().item()
}
return policy_loss, metrics
def compute_value_loss(
self,
values: torch.Tensor,
returns: torch.Tensor,
old_values: torch.Tensor
) -> torch.Tensor:
"""
计算价值函数损失(也可以裁剪)
"""
# 基础MSE损失
value_loss_unclipped = (values - returns) ** 2
# 裁剪的价值损失
values_clipped = old_values + torch.clamp(
values - old_values,
-self.clip_ratio,
self.clip_ratio
)
value_loss_clipped = (values_clipped - returns) ** 2
# 取最大值(更保守)
value_loss = torch.max(value_loss_unclipped, value_loss_clipped).mean()
return value_loss * self.value_coef
def compute_entropy_bonus(
self,
log_probs: torch.Tensor,
probs: torch.Tensor
) -> torch.Tensor:
"""
计算熵奖励(鼓励探索)
H = -E[log π(a|s)]
"""
entropy = -(probs * log_probs).sum(dim=-1).mean()
return entropy * self.entropy_coef
def total_loss(
self,
old_log_probs: torch.Tensor,
new_log_probs: torch.Tensor,
advantages: torch.Tensor,
values: torch.Tensor,
returns: torch.Tensor,
old_values: torch.Tensor,
action_probs: torch.Tensor = None
) -> Tuple[torch.Tensor, dict]:
"""
计算总损失
L = L_policy + c1 * L_value - c2 * H
"""
# 策略损失
policy_loss, metrics = self.compute_policy_loss(
old_log_probs, new_log_probs, advantages
)
# 价值损失
value_loss = self.compute_value_loss(values, returns, old_values)
# 熵奖励
entropy_bonus = 0
if action_probs is not None:
entropy_bonus = self.compute_entropy_bonus(new_log_probs, action_probs)
# 总损失
total_loss = policy_loss + value_loss - entropy_bonus
metrics.update({
'value_loss': value_loss.item(),
'entropy': entropy_bonus.item() if isinstance(entropy_bonus, torch.Tensor) else entropy_bonus,
'total_loss': total_loss.item()
})
return total_loss, metrics
# 使用示例
ppo_loss = PPOLoss(clip_ratio=0.2)
# 模拟数据
old_log_probs = torch.randn(100)
new_log_probs = old_log_probs + torch.randn(100) * 0.1
advantages = torch.randn(100)
values = torch.randn(100)
returns = values + torch.randn(100) * 0.5
old_values = values + torch.randn(100) * 0.1
loss, metrics = ppo_loss.total_loss(
old_log_probs, new_log_probs, advantages,
values, returns, old_values
)
print(f"总损失: {loss.item():.4f}")
print(f"指标: {metrics}")
13.4.3 KL散度约束
KL散度约束防止新策略偏离参考策略太远。
数学推导
目标:
$$ \max_\theta \mathbb{E}_{s,a \sim \pi_\theta}[r(s,a)] - \beta \cdot D_{KL}(\pi_\theta || \pi_{ref}) $$
其中:
- $r(s,a)$ 是奖励模型给出的奖励
- $D_{KL}$ 是KL散度
- $\beta$ 是KL惩罚系数
KL散度计算
对于离散动作:
$$ D_{KL}(\pi || \pi_{ref}) = \mathbb{E}_{a \sim \pi}\left[\log \frac{\pi(a|s)}{\pi_{ref}(a|s)}\right] $$
对于语言模型(每个token是一个动作):
$$ D_{KL} = \sum_{t} \mathbb{E}_{w_t \sim \pi}\left[\log \pi_\theta(w_t|s_t) - \log \pi_{ref}(w_t|s_t)\right] $$
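在实际RLHF实现中,通常不对整个词表求期望,而是用采样到的token的log概率差作为KL的蒙特卡洛估计。下面是一个简化示意(logits与token均为随机构造的假设数据):
import torch
import torch.nn.functional as F

# 假设的策略模型与参考模型logits:(batch, seq_len, vocab)
policy_logits = torch.randn(2, 6, 100)
ref_logits = torch.randn(2, 6, 100)
tokens = torch.randint(0, 100, (2, 6))  # 实际采样生成的token

policy_logprobs = F.log_softmax(policy_logits, dim=-1).gather(-1, tokens.unsqueeze(-1)).squeeze(-1)
ref_logprobs = F.log_softmax(ref_logits, dim=-1).gather(-1, tokens.unsqueeze(-1)).squeeze(-1)

# 逐token的KL蒙特卡洛估计:log π_θ(w_t|s_t) - log π_ref(w_t|s_t)
per_token_kl = policy_logprobs - ref_logprobs  # (batch, seq_len)
sequence_kl = per_token_kl.sum(dim=-1)         # 对response的所有token求和
print("每条序列的KL估计:", sequence_kl)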
自适应KL惩罚
class AdaptiveKLController:
"""自适应KL惩罚系数"""
def __init__(
self,
init_kl_coef: float = 0.2,
target_kl: float = 0.01,
horizon: int = 10000
):
self.kl_coef = init_kl_coef
self.target_kl = target_kl
self.horizon = horizon
def update(self, current_kl: float):
"""
根据当前KL调整系数
如果KL太大,增加惩罚;如果太小,减少惩罚
"""
# 简单的比例控制
proportional_error = current_kl - self.target_kl
mult = 1 + proportional_error / self.target_kl
# 限制调整幅度
mult = max(0.5, min(2.0, mult))
self.kl_coef *= mult
# 限制范围
self.kl_coef = max(0.01, min(10.0, self.kl_coef))
return self.kl_coef
def get_kl_penalty(
self,
log_probs: torch.Tensor,
ref_log_probs: torch.Tensor
) -> torch.Tensor:
"""
计算KL惩罚
KL = E[log π - log π_ref]
"""
kl = log_probs - ref_log_probs
return self.kl_coef * kl.mean()
# 使用示例
kl_controller = AdaptiveKLController(init_kl_coef=0.2, target_kl=0.01)
# 模拟训练过程
for step in range(10):
# 模拟当前KL
current_kl = 0.02 + np.random.randn() * 0.005
# 更新系数
new_coef = kl_controller.update(current_kl)
print(f"Step {step}: KL={current_kl:.4f}, Coef={new_coef:.4f}")
13.5 DPO(Direct Preference Optimization)
DPO是RLHF的简化替代方案:它直接用偏好数据优化策略模型,既不需要训练单独的奖励模型,也不需要运行PPO。
13.5.1 无需奖励模型
DPO的核心洞察:可以直接将Bradley-Terry模型重新参数化为策略优化问题。
数学推导
传统RLHF:
- 训练奖励模型 $r_\phi(x, y)$
- 用RL优化 $\max_\pi \mathbb{E}[r_\phi(x,y)] - \beta D_{KL}(\pi || \pi_{ref})$
DPO推导:
最优策略的闭式解:
$$ \pi^*(y|x) = \frac{1}{Z(x)} \pi_{ref}(y|x) \exp\left(\frac{r(x,y)}{\beta}\right) $$
将奖励表示为策略的函数:
$$ r(x,y) = \beta \log \frac{\pi^*(y|x)}{\pi_{ref}(y|x)} + \beta \log Z(x) $$
代入Bradley-Terry模型:
$$ P(y_w \succ y_l | x) = \sigma\left(\beta \log \frac{\pi_\theta(y_w|x)}{\pi_{ref}(y_w|x)} - \beta \log \frac{\pi_\theta(y_l|x)}{\pi_{ref}(y_l|x)}\right) $$
损失函数:
$$ \mathcal{L}_{DPO}(\pi_\theta) = -\mathbb{E}\left[\log \sigma\left(\beta \log \frac{\pi_\theta(y_w|x)}{\pi_{ref}(y_w|x)} - \beta \log \frac{\pi_\theta(y_l|x)}{\pi_{ref}(y_l|x)}\right)\right] $$
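把上述损失代入具体数值可以更直观地理解(假设性数值,仅演示公式的计算过程;其中 $\beta \log \frac{\pi_\theta}{\pi_{ref}}$ 即DPO的隐式奖励):
import torch
import torch.nn.functional as F

beta = 0.1
# 假设的序列对数概率(策略模型/参考模型,分别对chosen y_w与rejected y_l)
policy_chosen, policy_rejected = torch.tensor(-12.0), torch.tensor(-15.0)
ref_chosen, ref_rejected = torch.tensor(-13.0), torch.tensor(-14.0)

# DPO logits:β[(log π_θ(y_w) - log π_ref(y_w)) - (log π_θ(y_l) - log π_ref(y_l))]
dpo_logits = beta * ((policy_chosen - ref_chosen) - (policy_rejected - ref_rejected))
loss = -F.logsigmoid(dpo_logits)

# 隐式奖励:r(x,y) ≈ β·(log π_θ(y|x) - log π_ref(y|x))
reward_chosen = beta * (policy_chosen - ref_chosen)
reward_rejected = beta * (policy_rejected - ref_rejected)
print(f"DPO损失: {loss.item():.4f}, 隐式奖励差: {(reward_chosen - reward_rejected).item():.4f}")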
13.5.2 直接优化偏好
DPO实现
class DPOTrainer:
"""DPO训练器"""
def __init__(
self,
model: nn.Module,
ref_model: nn.Module,
tokenizer,
beta: float = 0.1,
device: str = "cuda"
):
self.model = model.to(device)
self.ref_model = ref_model.to(device)
self.tokenizer = tokenizer
self.beta = beta
self.device = device
# 冻结参考模型
for param in self.ref_model.parameters():
param.requires_grad = False
def compute_log_probs(
self,
model: nn.Module,
input_ids: torch.Tensor,
attention_mask: torch.Tensor,
labels: torch.Tensor
) -> torch.Tensor:
"""
计算序列的log概率
Args:
input_ids: (batch_size, seq_len)
labels: (batch_size, seq_len) 与input_ids相同,但prompt部分是-100
Returns:
log_probs: (batch_size,) 每个序列的总log概率
"""
outputs = model(
input_ids=input_ids,
attention_mask=attention_mask
)
logits = outputs.logits # (batch_size, seq_len, vocab_size)
# 计算log概率
log_probs = F.log_softmax(logits, dim=-1)
# 提取对应token的log概率
# labels: -100表示不计算loss的位置(prompt部分)
batch_size, seq_len = input_ids.shape
# Shift:预测下一个token
log_probs = log_probs[:, :-1, :] # (batch_size, seq_len-1, vocab_size)
labels = labels[:, 1:] # (batch_size, seq_len-1)
# 收集每个位置的log概率
token_log_probs = log_probs.gather(
dim=-1,
index=labels.unsqueeze(-1)
).squeeze(-1) # (batch_size, seq_len-1)
# 只在非-100位置求和
mask = (labels != -100).float()
sequence_log_probs = (token_log_probs * mask).sum(dim=-1) # (batch_size,)
return sequence_log_probs
def dpo_loss(
self,
policy_chosen_logprobs: torch.Tensor,
policy_rejected_logprobs: torch.Tensor,
reference_chosen_logprobs: torch.Tensor,
reference_rejected_logprobs: torch.Tensor
) -> Tuple[torch.Tensor, dict]:
"""
DPO损失函数
L = -E[log σ(β * (log π/π_ref(y_w) - log π/π_ref(y_l)))]
"""
# 计算log比率
policy_log_ratios = policy_chosen_logprobs - policy_rejected_logprobs
reference_log_ratios = reference_chosen_logprobs - reference_rejected_logprobs
# DPO目标
logits = self.beta * (policy_log_ratios - reference_log_ratios)
# 损失
loss = -F.logsigmoid(logits).mean()
# 统计信息
with torch.no_grad():
# 隐式奖励
rewards_chosen = self.beta * (policy_chosen_logprobs - reference_chosen_logprobs)
rewards_rejected = self.beta * (policy_rejected_logprobs - reference_rejected_logprobs)
reward_margin = (rewards_chosen - rewards_rejected).mean()
# 准确率
accuracy = (logits > 0).float().mean()
metrics = {
'loss': loss.item(),
'reward_margin': reward_margin.item(),
'accuracy': accuracy.item(),
'rewards_chosen': rewards_chosen.mean().item(),
'rewards_rejected': rewards_rejected.mean().item()
}
return loss, metrics
def train_step(
self,
chosen_input_ids: torch.Tensor,
chosen_attention_mask: torch.Tensor,
chosen_labels: torch.Tensor,
rejected_input_ids: torch.Tensor,
rejected_attention_mask: torch.Tensor,
rejected_labels: torch.Tensor
) -> Tuple[torch.Tensor, dict]:
"""训练一步"""
# 策略模型的log概率
policy_chosen_logprobs = self.compute_log_probs(
self.model, chosen_input_ids, chosen_attention_mask, chosen_labels
)
policy_rejected_logprobs = self.compute_log_probs(
self.model, rejected_input_ids, rejected_attention_mask, rejected_labels
)
# 参考模型的log概率
with torch.no_grad():
reference_chosen_logprobs = self.compute_log_probs(
self.ref_model, chosen_input_ids, chosen_attention_mask, chosen_labels
)
reference_rejected_logprobs = self.compute_log_probs(
self.ref_model, rejected_input_ids, rejected_attention_mask, rejected_labels
)
# 计算损失
loss, metrics = self.dpo_loss(
policy_chosen_logprobs,
policy_rejected_logprobs,
reference_chosen_logprobs,
reference_rejected_logprobs
)
return loss, metrics
def train_dpo_model(
model_name: str,
preference_file: str,
output_dir: str = "./dpo_model",
num_epochs: int = 3,
batch_size: int = 4,
learning_rate: float = 1e-6,
beta: float = 0.1
):
"""DPO训练主函数"""
from transformers import AutoModelForCausalLM, AutoTokenizer
from torch.utils.data import Dataset, DataLoader
import json
# 加载模型
tokenizer = AutoTokenizer.from_pretrained(model_name)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16
)
ref_model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16
)
# 创建训练器
trainer = DPOTrainer(model, ref_model, tokenizer, beta=beta)
# 数据集(简化版)
class DPODataset(Dataset):
def __init__(self, data_file):
self.data = []
with open(data_file, 'r') as f:
for line in f:
self.data.append(json.loads(line))
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
return self.data[idx]
dataset = DPODataset(preference_file)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
# 优化器
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
# 训练循环
model.train()
for epoch in range(num_epochs):
total_loss = 0
for batch in dataloader:
prompts = batch['prompt']
chosen = batch['chosen']
rejected = batch['rejected']
# Tokenize chosen
chosen_texts = [p + c for p, c in zip(prompts, chosen)]
chosen_enc = tokenizer(
chosen_texts,
padding=True,
truncation=True,
return_tensors="pt"
)
# 创建labels
chosen_labels = chosen_enc['input_ids'].clone()
# 这里简化了,实际需要mask掉prompt部分
# Tokenize rejected
rejected_texts = [p + r for p, r in zip(prompts, rejected)]
rejected_enc = tokenizer(
rejected_texts,
padding=True,
truncation=True,
return_tensors="pt"
)
rejected_labels = rejected_enc['input_ids'].clone()
# 训练步
loss, metrics = trainer.train_step(
chosen_enc['input_ids'].to(trainer.device),
chosen_enc['attention_mask'].to(trainer.device),
chosen_labels.to(trainer.device),
rejected_enc['input_ids'].to(trainer.device),
rejected_enc['attention_mask'].to(trainer.device),
rejected_labels.to(trainer.device)
)
# 反向传播
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(dataloader)
print(f"Epoch {epoch+1}: Loss={avg_loss:.4f}")
# 保存
model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)
print(f"模型已保存到 {output_dir}")
# 使用示例(注释掉)
# train_dpo_model(
# model_name="gpt2",
# preference_file="preference_data.jsonl",
# output_dir="./dpo_model"
# )
13.5.3 代码实现
完整的DPO训练脚本
#!/usr/bin/env python3
"""
DPO (Direct Preference Optimization) 完整训练脚本
"""
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, Trainer, TrainingArguments
from datasets import load_dataset
from typing import Dict, List
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DPODataCollator:
"""DPO数据整理器"""
def __init__(self, tokenizer, max_length=512):
self.tokenizer = tokenizer
self.max_length = max_length
def __call__(self, features: List[Dict]) -> Dict[str, torch.Tensor]:
"""
整理一个batch的数据
每个feature包含:
- prompt
- chosen
- rejected
"""
batch = {
'chosen_input_ids': [],
'chosen_attention_mask': [],
'chosen_labels': [],
'rejected_input_ids': [],
'rejected_attention_mask': [],
'rejected_labels': []
}
for feature in features:
prompt = feature['prompt']
chosen = feature['chosen']
rejected = feature['rejected']
# Tokenize chosen
chosen_text = prompt + chosen
chosen_enc = self.tokenizer(
chosen_text,
max_length=self.max_length,
truncation=True,
padding='max_length',
return_tensors='pt'
)
# 创建labels(mask掉prompt部分)
prompt_len = len(self.tokenizer(prompt, add_special_tokens=False)['input_ids'])
chosen_labels = chosen_enc['input_ids'].clone()
chosen_labels[0, :prompt_len] = -100 # 忽略prompt部分
batch['chosen_input_ids'].append(chosen_enc['input_ids'])
batch['chosen_attention_mask'].append(chosen_enc['attention_mask'])
batch['chosen_labels'].append(chosen_labels)
# Tokenize rejected
rejected_text = prompt + rejected
rejected_enc = self.tokenizer(
rejected_text,
max_length=self.max_length,
truncation=True,
padding='max_length',
return_tensors='pt'
)
rejected_labels = rejected_enc['input_ids'].clone()
rejected_labels[0, :prompt_len] = -100
batch['rejected_input_ids'].append(rejected_enc['input_ids'])
batch['rejected_attention_mask'].append(rejected_enc['attention_mask'])
batch['rejected_labels'].append(rejected_labels)
# Stack
for key in batch:
batch[key] = torch.cat(batch[key], dim=0)
return batch
def main():
# 配置
model_name = "gpt2"
data_file = "preference_data.jsonl"
output_dir = "./dpo_output"
# 加载
tokenizer = AutoTokenizer.from_pretrained(model_name)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
model = AutoModelForCausalLM.from_pretrained(model_name)
ref_model = AutoModelForCausalLM.from_pretrained(model_name)
# 加载数据
dataset = load_dataset('json', data_files={'train': data_file})
# 创建训练器
trainer = DPOTrainer(
model=model,
ref_model=ref_model,
tokenizer=tokenizer,
beta=0.1
)
# 训练参数
training_args = TrainingArguments(
output_dir=output_dir,
num_train_epochs=3,
per_device_train_batch_size=4,
learning_rate=1e-6,
logging_steps=10,
save_steps=500,
fp16=True
)
# 开始训练
logger.info("开始DPO训练...")
# 实际训练代码会更复杂,这里简化
logger.info(f"训练完成,模型保存在 {output_dir}")
if __name__ == "__main__":
main()
13.6 实战工具
13.6.1 TRL(Transformer Reinforcement Learning)
Hugging Face的TRL库提供了完整的RLHF工具链。
pip install trl
使用TRL进行PPO训练
from trl import PPOTrainer, PPOConfig, AutoModelForCausalLMWithValueHead
from trl.core import LengthSampler
from transformers import AutoTokenizer
import torch
# 配置
config = PPOConfig(
model_name="gpt2",
learning_rate=1.41e-5,
batch_size=128,
mini_batch_size=128,
gradient_accumulation_steps=1,
optimize_cuda_cache=True,
)
# 加载模型(带价值头)
model = AutoModelForCausalLMWithValueHead.from_pretrained(config.model_name)
ref_model = AutoModelForCausalLMWithValueHead.from_pretrained(config.model_name)
tokenizer = AutoTokenizer.from_pretrained(config.model_name)
tokenizer.pad_token = tokenizer.eos_token
# 创建PPO训练器
ppo_trainer = PPOTrainer(
config=config,
model=model,
ref_model=ref_model,
tokenizer=tokenizer
)
# 训练循环示例
# for epoch in range(num_epochs):
# for batch in dataloader:
# query_tensors = batch['input_ids']
#
# # 生成响应
# response_tensors = ppo_trainer.generate(
# query_tensors,
# max_new_tokens=50,
# do_sample=True,
# top_k=50,
# top_p=0.95
# )
#
# # 计算奖励
# texts = [tokenizer.decode(r.squeeze()) for r in response_tensors]
# rewards = [compute_reward(text) for text in texts]
#
# # PPO更新
# stats = ppo_trainer.step(query_tensors, response_tensors, rewards)
13.6.2 DeepSpeed-Chat
微软的DeepSpeed-Chat提供了端到端的RLHF训练系统。
pip install deepspeed
使用DeepSpeed-Chat
# 配置文件: ds_config.json
ds_config = {
"train_batch_size": 64,
"gradient_accumulation_steps": 1,
"optimizer": {
"type": "Adam",
"params": {
"lr": 1e-5
}
},
"fp16": {
"enabled": True
},
"zero_optimization": {
"stage": 3
}
}
# 训练脚本
# deepspeed --num_gpus=8 train_rlhf.py \
# --model_name gpt2 \
# --train_file data.jsonl \
# --output_dir output
13.6.3 OpenRLHF
OpenRLHF是开源的RLHF框架。
git clone https://github.com/OpenLLMAI/OpenRLHF
cd OpenRLHF
pip install -e .
使用示例
from openrlhf import RLHFTrainer
trainer = RLHFTrainer(
model="gpt2",
reward_model="reward_model_path",
dataset="preference_data.jsonl"
)
trainer.train()