第12章:模型微调与LoRA技术
大语言模型的预训练需要海量数据和巨大算力,但预训练模型往往需要针对特定任务或领域进行调整才能发挥最佳效果。微调(Fine-tuning)是让预训练模型适应下游任务的关键技术。本章将深入探讨微调的原理、方法和实践,特别是参数高效微调技术LoRA。
12.1 微调基础
12.1.1 为什么需要微调
预训练模型虽然在大规模语料上学习了丰富的语言知识,但存在以下局限:
1. 通用性与专业性的矛盾
预训练模型追求通用能力,但在特定领域(如医疗、法律、金融)的表现可能不如专门训练的模型。例如,通用模型可能无法准确理解医学术语或法律条款的细微差别。
2. 任务格式的适配
预训练阶段主要是语言建模任务(预测下一个词),而实际应用中需要完成问答、摘要、翻译等特定格式的任务。模型需要学习如何理解和遵循指令。
3. 行为和风格的定制
不同应用场景对模型的回答风格、安全性、价值观有不同要求。例如,客服机器人需要礼貌友好,而代码助手需要简洁准确。
4. 知识的更新和补充
预训练数据有时效性限制,微调可以注入最新知识或私有领域知识。
12.1.2 预训练 vs 微调
让我们从数学角度理解两者的区别。
预训练阶段
目标是最大化语言建模的似然:
$$ \mathcal{L}{\text{pretrain}} = -\sum{i=1}^{N} \log P_\theta(x_i | x_{<i}) $$
其中:
- $x_i$ 是序列中的第 $i$ 个token
- $\theta$ 是模型参数
- $x_{<i}$ 是前文上下文
预训练使用海量无标注文本(TB级),学习语言的统计规律和知识。
微调阶段
目标是最大化特定任务的性能:
$$ \mathcal{L}{\text{finetune}} = -\sum{(x,y) \in \mathcal{D}} \log P_\theta(y | x) $$
其中:
- $(x, y)$ 是输入输出对
- $\mathcal{D}$ 是标注数据集(通常KB到GB级)
微调在预训练权重 $\theta_{\text{pretrain}}$ 基础上继续训练:
$$ \theta_{\text{finetuned}} = \theta_{\text{pretrain}} + \Delta\theta $$
关键差异:
| 维度 | 预训练 | 微调 |
|---|---|---|
| 数据量 | TB级 | MB到GB级 |
| 数据类型 | 无标注文本 | 标注的输入输出对 |
| 目标 | 通用语言能力 | 特定任务性能 |
| 计算成本 | 数千GPU-月 | 数GPU-天到GPU-月 |
| 学习率 | 较大(1e-4) | 较小(1e-5到1e-6) |
12.1.3 微调的成本和效果
成本分析
以LLaMA-7B为例:
# 模型参数量
params = 7_000_000_000 # 7B参数
# 全参数微调内存需求(FP32)
model_size = params * 4 # 4 bytes per parameter
optimizer_states = params * 8 # Adam需要2个状态(momentum, variance)
gradients = params * 4
activations_per_token = params * 4 * 2 # 粗略估计
# 批次大小为8,序列长度2048
batch_size = 8
seq_length = 2048
total_memory_gb = (model_size + optimizer_states + gradients +
activations_per_token * batch_size * seq_length) / (1024**3)
print(f"预估内存需求: {total_memory_gb:.1f} GB")
# 输出:预估内存需求: 112.0 GB
# 需要多少GPU?
gpu_memory = 80 # A100 80GB
num_gpus = (total_memory_gb + gpu_memory - 1) // gpu_memory
print(f"需要GPU数量: {num_gpus}")
# 输出:需要GPU数量: 2
训练时间估算
# 假设使用8xA100 GPU
gpus = 8
samples_per_gpu = 2
gradient_accumulation = 4
effective_batch_size = gpus * samples_per_gpu * gradient_accumulation # 64
# 数据集大小
dataset_size = 100_000 # 10万条指令数据
epochs = 3
# 训练步数
total_steps = (dataset_size * epochs) // effective_batch_size
print(f"总训练步数: {total_steps}")
# 输出: 总训练步数: 4687
# 每步时间(经验值)
seconds_per_step = 2.5
total_hours = (total_steps * seconds_per_step) / 3600
print(f"预计训练时间: {total_hours:.1f} 小时")
# 输出: 预计训练时间: 3.3 小时
# 成本估算(A100 $2/GPU/hour)
cost = total_hours * gpus * 2
print(f"云端训练成本: ${cost:.0f}")
# 输出: 云端训练成本: $53
效果评估
微调效果取决于多个因素:
- 数据质量:高质量的标注数据比数量更重要
- 数据量:通常1000-10000条数据即可见效
- 任务相关性:越接近预训练任务,效果越好
- 模型大小:大模型微调收益更明显
典型效果提升:
# MMLU基准测试(通用知识)
base_model_accuracy = 0.45
finetuned_model_accuracy = 0.52
improvement = (finetuned_model_accuracy - base_model_accuracy) / base_model_accuracy
print(f"通用任务提升: {improvement*100:.1f}%")
# 输出: 通用任务提升: 15.6%
# 特定领域任务
domain_base_accuracy = 0.35
domain_finetuned_accuracy = 0.78
domain_improvement = (domain_finetuned_accuracy - domain_base_accuracy) / domain_base_accuracy
print(f"领域任务提升: {domain_improvement*100:.1f}%")
# 输出: 领域任务提升: 122.9%
12.2 全参数微调(Full Fine-tuning)
12.2.1 完整训练流程
全参数微调更新模型的所有权重。以下是完整的训练流程:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForCausalLM, AdamW
from transformers import get_linear_schedule_with_warmup
import logging
from tqdm import tqdm
import json
from typing import Dict, List
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Config:
"""训练配置"""
# 模型配置
model_name = "meta-llama/Llama-2-7b-hf"
# 训练配置
num_epochs = 3
batch_size = 4
gradient_accumulation_steps = 8
learning_rate = 2e-5
weight_decay = 0.01
warmup_ratio = 0.03
max_grad_norm = 1.0
# 数据配置
max_seq_length = 2048
train_file = "train.jsonl"
val_file = "val.jsonl"
# 保存配置
output_dir = "./finetuned_model"
save_steps = 500
eval_steps = 500
logging_steps = 10
# 硬件配置
device = "cuda" if torch.cuda.is_available() else "cpu"
fp16 = True # 混合精度训练
class InstructionDataset(Dataset):
"""指令微调数据集"""
def __init__(self, data_path: str, tokenizer, max_length: int):
self.tokenizer = tokenizer
self.max_length = max_length
# 加载数据
self.examples = []
with open(data_path, 'r', encoding='utf-8') as f:
for line in f:
self.examples.append(json.loads(line))
logger.info(f"加载 {len(self.examples)} 条数据从 {data_path}")
def __len__(self):
return len(self.examples)
def __getitem__(self, idx) -> Dict[str, torch.Tensor]:
example = self.examples[idx]
# 构建输入格式
# Alpaca格式: ### Instruction: {instruction}\n### Input: {input}\n### Response: {output}
if example.get('input', '').strip():
prompt = f"### Instruction:\n{example['instruction']}\n\n### Input:\n{example['input']}\n\n### Response:\n"
else:
prompt = f"### Instruction:\n{example['instruction']}\n\n### Response:\n"
full_text = prompt + example['output']
# Tokenize
encodings = self.tokenizer(
full_text,
truncation=True,
max_length=self.max_length,
padding="max_length",
return_tensors="pt"
)
input_ids = encodings['input_ids'].squeeze()
attention_mask = encodings['attention_mask'].squeeze()
# 创建labels:只计算response部分的loss
prompt_length = len(self.tokenizer(prompt, truncation=True, max_length=self.max_length)['input_ids'])
labels = input_ids.clone()
labels[:prompt_length] = -100 # 忽略prompt部分的loss
return {
'input_ids': input_ids,
'attention_mask': attention_mask,
'labels': labels
}
class Trainer:
"""训练器"""
def __init__(self, config: Config):
self.config = config
# 初始化tokenizer
self.tokenizer = AutoTokenizer.from_pretrained(config.model_name)
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
# 初始化模型
logger.info(f"加载模型: {config.model_name}")
self.model = AutoModelForCausalLM.from_pretrained(
config.model_name,
torch_dtype=torch.float16 if config.fp16 else torch.float32,
device_map="auto"
)
# 统计参数量
total_params = sum(p.numel() for p in self.model.parameters())
trainable_params = sum(p.numel() for p in self.model.parameters() if p.requires_grad)
logger.info(f"总参数量: {total_params:,}")
logger.info(f"可训练参数: {trainable_params:,}")
# 加载数据集
self.train_dataset = InstructionDataset(
config.train_file, self.tokenizer, config.max_seq_length
)
self.val_dataset = InstructionDataset(
config.val_file, self.tokenizer, config.max_seq_length
)
# 创建DataLoader
self.train_loader = DataLoader(
self.train_dataset,
batch_size=config.batch_size,
shuffle=True,
num_workers=4,
pin_memory=True
)
self.val_loader = DataLoader(
self.val_dataset,
batch_size=config.batch_size,
shuffle=False,
num_workers=4,
pin_memory=True
)
# 初始化优化器
self.optimizer = self._create_optimizer()
# 初始化学习率调度器
total_steps = len(self.train_loader) * config.num_epochs // config.gradient_accumulation_steps
warmup_steps = int(total_steps * config.warmup_ratio)
self.scheduler = get_linear_schedule_with_warmup(
self.optimizer,
num_warmup_steps=warmup_steps,
num_training_steps=total_steps
)
# 混合精度训练
self.scaler = torch.cuda.amp.GradScaler(enabled=config.fp16)
self.global_step = 0
def _create_optimizer(self):
"""创建优化器,对不同参数使用不同的权重衰减"""
# 不对bias和LayerNorm参数进行权重衰减
no_decay = ["bias", "LayerNorm.weight", "layer_norm.weight"]
optimizer_grouped_parameters = [
{
"params": [p for n, p in self.model.named_parameters()
if not any(nd in n for nd in no_decay) and p.requires_grad],
"weight_decay": self.config.weight_decay,
},
{
"params": [p for n, p in self.model.named_parameters()
if any(nd in n for nd in no_decay) and p.requires_grad],
"weight_decay": 0.0,
},
]
return AdamW(optimizer_grouped_parameters, lr=self.config.learning_rate)
def train_epoch(self, epoch: int):
"""训练一个epoch"""
self.model.train()
total_loss = 0
progress_bar = tqdm(self.train_loader, desc=f"Epoch {epoch+1}")
for step, batch in enumerate(progress_bar):
# 移动到GPU
batch = {k: v.to(self.config.device) for k, v in batch.items()}
# 前向传播(使用混合精度)
with torch.cuda.amp.autocast(enabled=self.config.fp16):
outputs = self.model(**batch)
loss = outputs.loss
loss = loss / self.config.gradient_accumulation_steps
# 反向传播
self.scaler.scale(loss).backward()
total_loss += loss.item()
# 梯度累积
if (step + 1) % self.config.gradient_accumulation_steps == 0:
# 梯度裁剪
self.scaler.unscale_(self.optimizer)
torch.nn.utils.clip_grad_norm_(
self.model.parameters(),
self.config.max_grad_norm
)
# 更新参数
self.scaler.step(self.optimizer)
self.scaler.update()
self.scheduler.step()
self.optimizer.zero_grad()
self.global_step += 1
# 日志记录
if self.global_step % self.config.logging_steps == 0:
avg_loss = total_loss / self.config.logging_steps
lr = self.scheduler.get_last_lr()[0]
logger.info(
f"Step {self.global_step}: loss={avg_loss:.4f}, lr={lr:.2e}"
)
total_loss = 0
# 验证
if self.global_step % self.config.eval_steps == 0:
self.evaluate()
self.model.train()
# 保存检查点
if self.global_step % self.config.save_steps == 0:
self.save_checkpoint()
progress_bar.set_postfix({"loss": loss.item() * self.config.gradient_accumulation_steps})
def evaluate(self):
"""评估模型"""
self.model.eval()
total_loss = 0
total_steps = 0
with torch.no_grad():
for batch in tqdm(self.val_loader, desc="Evaluating"):
batch = {k: v.to(self.config.device) for k, v in batch.items()}
with torch.cuda.amp.autocast(enabled=self.config.fp16):
outputs = self.model(**batch)
loss = outputs.loss
total_loss += loss.item()
total_steps += 1
avg_loss = total_loss / total_steps
perplexity = torch.exp(torch.tensor(avg_loss))
logger.info(f"验证集 - Loss: {avg_loss:.4f}, Perplexity: {perplexity:.2f}")
return avg_loss
def save_checkpoint(self):
"""保存检查点"""
output_dir = f"{self.config.output_dir}/checkpoint-{self.global_step}"
logger.info(f"保存检查点到 {output_dir}")
self.model.save_pretrained(output_dir)
self.tokenizer.save_pretrained(output_dir)
def train(self):
"""完整训练流程"""
logger.info("开始训练...")
for epoch in range(self.config.num_epochs):
self.train_epoch(epoch)
# 保存最终模型
logger.info(f"保存最终模型到 {self.config.output_dir}")
self.model.save_pretrained(self.config.output_dir)
self.tokenizer.save_pretrained(self.config.output_dir)
logger.info("训练完成!")
# 使用示例
if __name__ == "__main__":
config = Config()
trainer = Trainer(config)
trainer.train()
12.2.2 数据集准备(指令微调格式)
指令微调数据集的格式设计至关重要。最常见的格式包括:
1. Alpaca格式
{
"instruction": "给定一个电影评论,判断情感是正面还是负面。",
"input": "这部电影真是太棒了!演员演技精湛,剧情引人入胜。",
"output": "正面"
}
2. ShareGPT格式
{
"conversations": [
{"from": "human", "value": "什么是机器学习?"},
{"from": "gpt", "value": "机器学习是人工智能的一个分支..."},
{"from": "human", "value": "能举个例子吗?"},
{"from": "gpt", "value": "当然可以。例如垃圾邮件过滤..."}
]
}
3. 自定义格式转换器
from typing import List, Dict
import json
class DatasetConverter:
"""数据集格式转换器"""
@staticmethod
def to_alpaca_format(instruction: str, input_text: str = "", output: str = "") -> Dict:
"""转换为Alpaca格式"""
return {
"instruction": instruction,
"input": input_text,
"output": output
}
@staticmethod
def sharegpt_to_alpaca(sharegpt_data: Dict) -> List[Dict]:
"""ShareGPT格式转Alpaca格式"""
alpaca_data = []
conversations = sharegpt_data['conversations']
# 将多轮对话拆分为多个单轮样本
for i in range(0, len(conversations) - 1, 2):
if i + 1 < len(conversations):
human_msg = conversations[i]['value']
gpt_msg = conversations[i + 1]['value']
# 构建上下文(包含之前的对话)
context = ""
if i > 0:
for j in range(0, i, 2):
context += f"Q: {conversations[j]['value']}\n"
context += f"A: {conversations[j+1]['value']}\n\n"
alpaca_data.append({
"instruction": "根据对话历史回答问题。",
"input": context + f"Q: {human_msg}",
"output": gpt_msg
})
return alpaca_data
@staticmethod
def convert_qa_pair(question: str, answer: str, context: str = "") -> Dict:
"""通用问答对转换"""
if context:
return {
"instruction": "根据给定的上下文回答问题。",
"input": f"上下文:{context}\n\n问题:{question}",
"output": answer
}
else:
return {
"instruction": question,
"input": "",
"output": answer
}
@staticmethod
def create_training_file(data: List[Dict], output_path: str):
"""创建训练文件"""
with open(output_path, 'w', encoding='utf-8') as f:
for item in data:
f.write(json.dumps(item, ensure_ascii=False) + '\n')
print(f"已保存 {len(data)} 条数据到 {output_path}")
# 使用示例
converter = DatasetConverter()
# 示例1:转换问答对
qa_data = [
converter.convert_qa_pair(
question="Python中的列表和元组有什么区别?",
answer="主要区别在于:1) 列表是可变的,元组是不可变的;2) 列表使用方括号[],元组使用圆括号();3) 列表支持修改、添加、删除元素,元组不支持。"
),
converter.convert_qa_pair(
question="什么是递归?",
answer="递归是函数调用自身的编程技术。它通常包含两部分:基础情况(终止条件)和递归情况(调用自身)。"
)
]
converter.create_training_file(qa_data, "qa_train.jsonl")
12.2.3 训练脚本
使用Hugging Face Transformers的高级API简化训练:
from transformers import (
AutoTokenizer,
AutoModelForCausalLM,
Trainer,
TrainingArguments,
DataCollatorForLanguageModeling
)
from datasets import load_dataset
import torch
def prepare_model_and_tokenizer(model_name: str):
"""准备模型和tokenizer"""
tokenizer = AutoTokenizer.from_pretrained(model_name)
# 设置pad token
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
tokenizer.pad_token_id = tokenizer.eos_token_id
model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16,
device_map="auto",
trust_remote_code=True
)
# 启用梯度检查点(节省内存)
model.gradient_checkpointing_enable()
return model, tokenizer
def tokenize_function(examples, tokenizer, max_length=2048):
"""Tokenize函数"""
# 构建完整文本
texts = []
for instruction, input_text, output in zip(
examples['instruction'],
examples['input'],
examples['output']
):
if input_text.strip():
prompt = f"### Instruction:\n{instruction}\n\n### Input:\n{input_text}\n\n### Response:\n"
else:
prompt = f"### Instruction:\n{instruction}\n\n### Response:\n"
full_text = prompt + output + tokenizer.eos_token
texts.append(full_text)
# Tokenize
tokenized = tokenizer(
texts,
truncation=True,
max_length=max_length,
padding=False, # 由DataCollator处理padding
return_tensors=None
)
# 创建labels(与input_ids相同)
tokenized['labels'] = tokenized['input_ids'].copy()
return tokenized
def main():
# 配置
model_name = "meta-llama/Llama-2-7b-hf"
train_file = "train.jsonl"
val_file = "val.jsonl"
output_dir = "./finetuned_llama2"
# 加载模型和tokenizer
model, tokenizer = prepare_model_and_tokenizer(model_name)
# 加载数据集
dataset = load_dataset('json', data_files={
'train': train_file,
'validation': val_file
})
# Tokenize数据集
tokenized_dataset = dataset.map(
lambda x: tokenize_function(x, tokenizer),
batched=True,
remove_columns=dataset['train'].column_names,
desc="Tokenizing"
)
# Data collator
data_collator = DataCollatorForLanguageModeling(
tokenizer=tokenizer,
mlm=False # 因果语言模型,不使用MLM
)
# 训练参数
training_args = TrainingArguments(
output_dir=output_dir,
overwrite_output_dir=True,
# 训练设置
num_train_epochs=3,
per_device_train_batch_size=4,
per_device_eval_batch_size=4,
gradient_accumulation_steps=8,
# 优化器设置
learning_rate=2e-5,
weight_decay=0.01,
adam_beta1=0.9,
adam_beta2=0.999,
adam_epsilon=1e-8,
max_grad_norm=1.0,
# 学习率调度
lr_scheduler_type="cosine",
warmup_ratio=0.03,
# 日志和保存
logging_steps=10,
save_steps=500,
eval_steps=500,
save_total_limit=3,
# 评估
evaluation_strategy="steps",
load_best_model_at_end=True,
metric_for_best_model="loss",
# 性能优化
fp16=True,
dataloader_num_workers=4,
dataloader_pin_memory=True,
# 其他
report_to=["tensorboard"],
seed=42
)
# 创建Trainer
trainer = Trainer(
model=model,
args=training_args,
train_dataset=tokenized_dataset['train'],
eval_dataset=tokenized_dataset['validation'],
data_collator=data_collator,
tokenizer=tokenizer
)
# 开始训练
print("开始训练...")
trainer.train()
# 保存最终模型
print(f"保存模型到 {output_dir}")
trainer.save_model(output_dir)
tokenizer.save_pretrained(output_dir)
print("训练完成!")
if __name__ == "__main__":
main()
12.2.4 超参数设置
超参数调优对微调效果至关重要。以下是关键超参数的设置指南:
1. 学习率(Learning Rate)
# 不同模型大小的推荐学习率
learning_rate_guide = {
"small (<1B)": 5e-5,
"medium (1B-10B)": 2e-5,
"large (>10B)": 1e-5,
}
# 学习率查找器
class LearningRateFinder:
"""学习率查找器(Leslie Smith方法)"""
def __init__(self, model, optimizer, criterion):
self.model = model
self.optimizer = optimizer
self.criterion = criterion
self.history = {'lr': [], 'loss': []}
def range_test(self, train_loader, start_lr=1e-7, end_lr=1, num_iter=100):
"""执行学习率范围测试"""
lr_mult = (end_lr / start_lr) ** (1 / num_iter)
lr = start_lr
for param_group in self.optimizer.param_groups:
param_group['lr'] = lr
self.model.train()
avg_loss = 0
best_loss = float('inf')
batch_iter = iter(train_loader)
for iteration in range(num_iter):
try:
batch = next(batch_iter)
except StopIteration:
batch_iter = iter(train_loader)
batch = next(batch_iter)
# 前向传播
batch = {k: v.to('cuda') for k, v in batch.items()}
outputs = self.model(**batch)
loss = outputs.loss
# 计算平滑loss
avg_loss = 0.98 * avg_loss + 0.02 * loss.item()
smoothed_loss = avg_loss / (1 - 0.98 ** (iteration + 1))
# 记录
self.history['lr'].append(lr)
self.history['loss'].append(smoothed_loss)
# 如果loss爆炸,停止
if smoothed_loss > 4 * best_loss or torch.isnan(loss):
break
if smoothed_loss < best_loss:
best_loss = smoothed_loss
# 反向传播
loss.backward()
self.optimizer.step()
self.optimizer.zero_grad()
# 更新学习率
lr *= lr_mult
for param_group in self.optimizer.param_groups:
param_group['lr'] = lr
return self.history
def plot(self):
"""绘制学习率vs损失曲线"""
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
plt.plot(self.history['lr'], self.history['loss'])
plt.xscale('log')
plt.xlabel('Learning Rate')
plt.ylabel('Loss')
plt.title('Learning Rate Finder')
plt.grid(True)
plt.show()
2. 批次大小(Batch Size)
def calculate_optimal_batch_size(
model_params: int,
gpu_memory_gb: int,
seq_length: int,
precision: str = "fp16"
) -> int:
"""计算最优批次大小"""
# 内存占用估算(单位:GB)
bytes_per_param = 2 if precision == "fp16" else 4
# 模型权重
model_memory = model_params * bytes_per_param / (1024**3)
# 优化器状态(Adam需要2倍参数量)
optimizer_memory = model_params * bytes_per_param * 2 / (1024**3)
# 梯度
gradient_memory = model_params * bytes_per_param / (1024**3)
# 固定开销
fixed_memory = model_memory + optimizer_memory + gradient_memory
# 可用内存(预留20%给CUDA)
available_memory = gpu_memory_gb * 0.8 - fixed_memory
# 每个样本的激活值内存(粗略估计)
activation_per_sample = (model_params * bytes_per_param * seq_length * 4) / (1024**3)
# 计算批次大小
batch_size = int(available_memory / activation_per_sample)
return max(1, batch_size)
# 示例
optimal_bs = calculate_optimal_batch_size(
model_params=7_000_000_000, # 7B
gpu_memory_gb=80, # A100
seq_length=2048,
precision="fp16"
)
print(f"推荐批次大小: {optimal_bs}")
3. 训练epoch数和早停
from transformers import TrainerCallback
import numpy as np
class EarlyStoppingCallback(TrainerCallback):
"""早停回调"""
def __init__(self, patience: int = 3, threshold: float = 0.001):
self.patience = patience
self.threshold = threshold
self.best_metric = None
self.patience_counter = 0
def on_evaluate(self, args, state, control, metrics, **kwargs):
"""评估后回调"""
current_metric = metrics.get('eval_loss')
if self.best_metric is None:
self.best_metric = current_metric
elif current_metric < self.best_metric - self.threshold:
# 有显著改善
self.best_metric = current_metric
self.patience_counter = 0
else:
# 没有改善
self.patience_counter += 1
if self.patience_counter >= self.patience:
print(f"早停触发!已经 {self.patience} 次评估没有改善。")
control.should_training_stop = True
return control
# 使用early stopping
early_stopping = EarlyStoppingCallback(patience=3, threshold=0.001)
4. 完整的超参数配置类
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class FinetuningConfig:
"""微调超参数配置"""
# 模型
model_name: str = "meta-llama/Llama-2-7b-hf"
# 数据
train_file: str = "train.jsonl"
val_file: str = "val.jsonl"
max_seq_length: int = 2048
# 训练
num_epochs: int = 3
batch_size: int = 4
gradient_accumulation_steps: int = 8
learning_rate: float = 2e-5
weight_decay: float = 0.01
warmup_ratio: float = 0.03
lr_scheduler_type: str = "cosine" # linear, cosine, constant
# 优化器
adam_beta1: float = 0.9
adam_beta2: float = 0.999
adam_epsilon: float = 1e-8
max_grad_norm: float = 1.0
# 正则化
dropout: float = 0.1
attention_dropout: float = 0.1
# 保存和日志
output_dir: str = "./output"
logging_steps: int = 10
save_steps: int = 500
eval_steps: int = 500
save_total_limit: int = 3
# 硬件
fp16: bool = True
bf16: bool = False
gradient_checkpointing: bool = True
# 早停
early_stopping: bool = True
early_stopping_patience: int = 3
early_stopping_threshold: float = 0.001
# 随机种子
seed: int = 42
def effective_batch_size(self) -> int:
"""有效批次大小"""
return self.batch_size * self.gradient_accumulation_steps
def training_steps(self, dataset_size: int) -> int:
"""总训练步数"""
return (dataset_size * self.num_epochs) // self.effective_batch_size()
def warmup_steps(self, dataset_size: int) -> int:
"""预热步数"""
return int(self.training_steps(dataset_size) * self.warmup_ratio)
# 使用示例
config = FinetuningConfig(
model_name="meta-llama/Llama-2-7b-hf",
learning_rate=2e-5,
num_epochs=3
)
print(f"有效批次大小: {config.effective_batch_size()}")
print(f"总训练步数: {config.training_steps(100000)}")
12.3 参数高效微调(PEFT)
全参数微调虽然效果好,但成本高昂。参数高效微调(Parameter-Efficient Fine-Tuning, PEFT)只更新少量参数,大幅降低计算和存储成本。
12.3.1 LoRA原理详解
LoRA(Low-Rank Adaptation)是最流行的PEFT方法,由微软研究院于2021年提出。
核心思想
预训练模型的权重矩阵在微调过程中的更新是低秩的。因此,我们可以用低秩分解来近似权重更新:
$$ W' = W_0 + \Delta W $$
其中:
- $W_0 \in \mathbb{R}^{d \times k}$ 是预训练权重(冻结)
- $\Delta W \in \mathbb{R}^{d \times k}$ 是权重更新
LoRA的关键洞察:$\Delta W$ 可以分解为两个低秩矩阵的乘积:
$$ \Delta W = BA $$
其中:
- $B \in \mathbb{R}^{d \times r}$
- $A \in \mathbb{R}^{r \times k}$
- $r \ll \min(d, k)$ 是秩(通常 $r \in [1, 64]$)
前向传播
原始: $$ h = W_0 x $$
LoRA: $$ h = W_0 x + \Delta W x = W_0 x + BAx $$
通过缩放因子 $\alpha$ 控制LoRA的影响:
$$ h = W_0 x + \frac{\alpha}{r} BAx $$
参数量对比
假设 $W_0 \in \mathbb{R}^{4096 \times 4096}$:
- 全参数微调:$4096 \times 4096 = 16,777,216$ 参数
- LoRA($r=8$):$4096 \times 8 + 8 \times 4096 = 65,536$ 参数
- 减少比例:$\frac{65,536}{16,777,216} = 0.39%$
对于7B模型:
- 全参数微调:7,000,000,000 参数
- LoRA(应用到所有线性层,$r=8$):约 8,000,000 参数
- 减少比例:0.11%
为什么LoRA有效?
理论依据来自于"内在维度假说"(Intrinsic Dimension Hypothesis):
- 任务适应的低维特性:模型适应新任务不需要改变所有参数,只需在低维子空间中调整
- 预训练的丰富表示:预训练模型已经学习了丰富的特征,微调只需要组合这些特征
- 正则化效应:低秩约束起到隐式正则化作用,防止过拟合
12.3.2 低秩分解数学推导
让我们深入推导LoRA的数学基础。
1. 矩阵秩和SVD
任意矩阵 $W \in \mathbb{R}^{m \times n}$ 可以进行奇异值分解(SVD):
$$ W = U\Sigma V^T $$
其中:
- $U \in \mathbb{R}^{m \times m}$ 是左奇异向量
- $\Sigma \in \mathbb{R}^{m \times n}$ 是奇异值对角矩阵
- $V \in \mathbb{R}^{n \times n}$ 是右奇异向量
秩-$r$ 近似:
$$ W_r = U_r \Sigma_r V_r^T $$
其中 $U_r, \Sigma_r, V_r$ 只保留前 $r$ 个最大奇异值对应的部分。
2. Frobenius范数误差界
根据Eckart-Young定理,秩-$r$ 近似是最优的:
$$ |W - W_r|F = \sqrt{\sum{i=r+1}^{\min(m,n)} \sigma_i^2} $$
这意味着如果奇异值快速衰减,低秩近似误差很小。
3. LoRA的参数化
LoRA不直接使用SVD,而是学习两个矩阵 $B$ 和 $A$:
$$ \Delta W = BA, \quad B \in \mathbb{R}^{d \times r}, A \in \mathbb{R}^{r \times k} $$
初始化策略:
- $A$:高斯分布初始化 $A \sim \mathcal{N}(0, \sigma^2)$
- $B$:零初始化 $B = 0$
这样初始时 $\Delta W = 0$,模型从预训练权重开始。
4. 梯度更新
损失函数:$\mathcal{L}(\theta)$
梯度:
$$ \frac{\partial \mathcal{L}}{\partial A} = B^T \frac{\partial \mathcal{L}}{\partial (\Delta W)} $$
$$ \frac{\partial \mathcal{L}}{\partial B} = \frac{\partial \mathcal{L}}{\partial (\Delta W)} A^T $$
其中 $\frac{\partial \mathcal{L}}{\partial (\Delta W)}$ 是对 $\Delta W$ 的梯度。
5. 计算复杂度分析
前向传播:
- 原始:$O(dk)$(矩阵-向量乘法)
- LoRA:$O(dk + dr + rk) = O(dk + r(d+k))$
当 $r \ll \min(d, k)$ 时,额外开销很小。
反向传播:
- 全参数微调:需要存储 $W_0$ 的梯度,$O(dk)$ 内存
- LoRA:只需要存储 $A, B$ 的梯度,$O(r(d+k))$ 内存
6. 秩的选择
秩 $r$ 的选择是trade-off:
- 太小($r < 4$):表达能力不足,性能下降
- 太大($r > 64$):参数量增加,效率降低
- 推荐范围:$r \in [8, 32]$
经验规律:
def suggest_lora_rank(task_complexity: str, model_size_b: float) -> int:
"""根据任务复杂度和模型大小建议LoRA秩"""
base_rank = {
"simple": 4, # 简单分类、情感分析
"moderate": 8, # 问答、摘要
"complex": 16, # 指令跟随、多任务
"very_complex": 32 # 复杂推理、代码生成
}[task_complexity]
# 大模型可以用稍大的秩
if model_size_b > 30:
base_rank *= 2
elif model_size_b > 10:
base_rank = int(base_rank * 1.5)
return base_rank
# 示例
print(suggest_lora_rank("complex", 7)) # 输出: 16
print(suggest_lora_rank("complex", 70)) # 输出: 32
12.3.3 LoRA代码实现
以下是从零开始实现LoRA的完整代码:
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional, Tuple
import math
class LoRALayer(nn.Module):
"""LoRA层实现"""
def __init__(
self,
in_features: int,
out_features: int,
r: int = 8,
lora_alpha: float = 16,
lora_dropout: float = 0.1
):
super().__init__()
self.r = r
self.lora_alpha = lora_alpha
self.scaling = lora_alpha / r # 缩放因子
# LoRA矩阵
self.lora_A = nn.Parameter(torch.zeros(in_features, r))
self.lora_B = nn.Parameter(torch.zeros(r, out_features))
# Dropout
self.lora_dropout = nn.Dropout(p=lora_dropout) if lora_dropout > 0 else nn.Identity()
# 初始化
nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
nn.init.zeros_(self.lora_B)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
前向传播
x: (batch_size, ..., in_features)
返回: (batch_size, ..., out_features)
"""
# x @ A @ B,使用scaling
lora_out = (self.lora_dropout(x) @ self.lora_A @ self.lora_B) * self.scaling
return lora_out
class LoRALinear(nn.Module):
"""带LoRA的线性层"""
def __init__(
self,
base_layer: nn.Linear,
r: int = 8,
lora_alpha: float = 16,
lora_dropout: float = 0.1
):
super().__init__()
# 冻结原始层
self.base_layer = base_layer
for param in self.base_layer.parameters():
param.requires_grad = False
# LoRA层
self.lora = LoRALayer(
in_features=base_layer.in_features,
out_features=base_layer.out_features,
r=r,
lora_alpha=lora_alpha,
lora_dropout=lora_dropout
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""前向传播:原始层 + LoRA"""
return self.base_layer(x) + self.lora(x)
def merge_weights(self):
"""合并LoRA权重到base layer(推理优化)"""
with torch.no_grad():
# W' = W + BA * scaling
delta_w = (self.lora.lora_A @ self.lora.lora_B) * self.lora.scaling
self.base_layer.weight.data += delta_w.T
def unmerge_weights(self):
"""分离LoRA权重(用于继续训练)"""
with torch.no_grad():
delta_w = (self.lora.lora_A @ self.lora.lora_B) * self.lora.scaling
self.base_layer.weight.data -= delta_w.T
class LoRAAttention(nn.Module):
"""对Attention层应用LoRA"""
def __init__(
self,
attention_layer,
r: int = 8,
lora_alpha: float = 16,
lora_dropout: float = 0.1,
target_modules: Tuple[str, ...] = ("q_proj", "v_proj")
):
super().__init__()
self.attention_layer = attention_layer
self.target_modules = target_modules
# 对指定的模块应用LoRA
for name in target_modules:
if hasattr(attention_layer, name):
base_layer = getattr(attention_layer, name)
if isinstance(base_layer, nn.Linear):
lora_layer = LoRALinear(
base_layer,
r=r,
lora_alpha=lora_alpha,
lora_dropout=lora_dropout
)
setattr(self, name, lora_layer)
def forward(self, *args, **kwargs):
"""前向传播(委托给原始attention层)"""
return self.attention_layer(*args, **kwargs)
def apply_lora_to_model(
model: nn.Module,
r: int = 8,
lora_alpha: float = 16,
lora_dropout: float = 0.1,
target_modules: Optional[list] = None
) -> nn.Module:
"""
对模型应用LoRA
Args:
model: 预训练模型
r: LoRA秩
lora_alpha: 缩放因子
lora_dropout: Dropout率
target_modules: 目标模块名称列表(如["q_proj", "v_proj", "k_proj", "o_proj"])
"""
if target_modules is None:
# 默认对query和value投影应用LoRA
target_modules = ["q_proj", "v_proj"]
def _apply_lora(module, name=""):
"""递归应用LoRA"""
for child_name, child in module.named_children():
full_name = f"{name}.{child_name}" if name else child_name
# 检查是否是目标模块
if any(target in child_name for target in target_modules):
if isinstance(child, nn.Linear):
# 替换为LoRALinear
lora_linear = LoRALinear(
child,
r=r,
lora_alpha=lora_alpha,
lora_dropout=lora_dropout
)
setattr(module, child_name, lora_linear)
print(f"应用LoRA到: {full_name}")
else:
# 递归处理子模块
_apply_lora(child, full_name)
_apply_lora(model)
return model
def count_lora_parameters(model: nn.Module) -> Tuple[int, int, int]:
"""
统计模型参数
返回: (总参数, 可训练参数, LoRA参数)
"""
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
lora_params = 0
for name, module in model.named_modules():
if isinstance(module, LoRALayer):
lora_params += module.lora_A.numel() + module.lora_B.numel()
return total_params, trainable_params, lora_params
# 使用示例
if __name__ == "__main__":
from transformers import AutoModelForCausalLM
# 加载预训练模型
model = AutoModelForCausalLM.from_pretrained(
"gpt2",
torch_dtype=torch.float32
)
print("原始模型参数:")
total, trainable, _ = count_lora_parameters(model)
print(f" 总参数: {total:,}")
print(f" 可训练: {trainable:,}")
# 应用LoRA
model = apply_lora_to_model(
model,
r=8,
lora_alpha=16,
target_modules=["c_attn", "c_proj"] # GPT-2的attention模块
)
print("\n应用LoRA后:")
total, trainable, lora = count_lora_parameters(model)
print(f" 总参数: {total:,}")
print(f" 可训练: {trainable:,}")
print(f" LoRA参数: {lora:,}")
print(f" 可训练比例: {trainable/total*100:.2f}%")
12.3.4 QLoRA(量化+LoRA)
QLoRA将量化技术与LoRA结合,进一步降低内存需求。核心思想:
- 4-bit量化:将预训练权重量化到4-bit
- NormalFloat (NF4):使用专门设计的量化数据类型
- 双重量化:对量化常数本身也进行量化
- 分页优化器:使用CPU内存存储优化器状态
QLoRA实现
import torch
import torch.nn as nn
from typing import Optional
import numpy as np
class NF4Quantizer:
"""NormalFloat 4-bit量化器"""
def __init__(self):
# NF4量化表(针对正态分布优化)
self.quantization_table = torch.tensor([
-1.0, -0.6962, -0.5251, -0.3949, -0.2844, -0.1848,
-0.0911, 0.0, 0.0911, 0.1848, 0.2844, 0.3949,
0.5251, 0.6962, 1.0, float('inf')
])
def quantize(self, tensor: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
"""
量化张量到4-bit
返回: (量化后的索引, 缩放因子)
"""
# 计算缩放因子(绝对值最大值)
scale = tensor.abs().max()
# 归一化到[-1, 1]
normalized = tensor / (scale + 1e-8)
# 查找最近的量化值
quantized_indices = torch.zeros_like(tensor, dtype=torch.uint8)
for i, val in enumerate(self.quantization_table[:-1]):
next_val = self.quantization_table[i + 1]
mask = (normalized >= val) & (normalized < next_val)
quantized_indices[mask] = i
return quantized_indices, scale
def dequantize(self, indices: torch.Tensor, scale: torch.Tensor) -> torch.Tensor:
"""反量化"""
quantized_values = self.quantization_table[indices.long()]
return quantized_values * scale
class QLoRALinear(nn.Module):
"""量化LoRA线性层"""
def __init__(
self,
base_layer: nn.Linear,
r: int = 8,
lora_alpha: float = 16,
lora_dropout: float = 0.1
):
super().__init__()
self.in_features = base_layer.in_features
self.out_features = base_layer.out_features
# 量化基础权重
self.quantizer = NF4Quantizer()
with torch.no_grad():
quantized_weight, scale = self.quantizer.quantize(base_layer.weight.data)
# 存储量化后的权重(4-bit,打包存储)
self.register_buffer('quantized_weight', quantized_weight)
self.register_buffer('scale', scale)
# 偏置(如果有)
if base_layer.bias is not None:
self.register_buffer('bias', base_layer.bias.data)
else:
self.bias = None
# LoRA层(使用16-bit)
self.lora_A = nn.Parameter(
torch.zeros(self.in_features, r, dtype=torch.float16)
)
self.lora_B = nn.Parameter(
torch.zeros(r, self.out_features, dtype=torch.float16)
)
self.r = r
self.lora_alpha = lora_alpha
self.scaling = lora_alpha / r
self.lora_dropout = nn.Dropout(p=lora_dropout) if lora_dropout > 0 else nn.Identity()
# 初始化LoRA
nn.init.kaiming_uniform_(self.lora_A, a=np.sqrt(5))
nn.init.zeros_(self.lora_B)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""前向传播"""
# 反量化基础权重
base_weight = self.quantizer.dequantize(self.quantized_weight, self.scale)
# 基础线性变换
output = F.linear(x, base_weight, self.bias)
# LoRA变换
lora_output = (self.lora_dropout(x) @ self.lora_A @ self.lora_B) * self.scaling
return output + lora_output
def memory_footprint(self) -> dict:
"""计算内存占用"""
# 量化权重:4 bits per parameter
quantized_mem = self.quantized_weight.numel() * 0.5 / (1024**2) # MB
# LoRA参数:16 bits per parameter
lora_mem = (self.lora_A.numel() + self.lora_B.numel()) * 2 / (1024**2) # MB
# 缩放因子
scale_mem = self.scale.numel() * 4 / (1024**2) # MB
return {
'quantized_weight': quantized_mem,
'lora_params': lora_mem,
'scale': scale_mem,
'total': quantized_mem + lora_mem + scale_mem
}
def convert_to_qlora(model: nn.Module, r: int = 8, lora_alpha: float = 16) -> nn.Module:
"""将模型转换为QLoRA"""
def _convert_layer(module, name=""):
for child_name, child in list(module.named_children()):
full_name = f"{name}.{child_name}" if name else child_name
if isinstance(child, nn.Linear) and "lm_head" not in full_name:
# 转换为QLoRA层
qlora_layer = QLoRALinear(
child,
r=r,
lora_alpha=lora_alpha
)
setattr(module, child_name, qlora_layer)
print(f"转换为QLoRA: {full_name}")
else:
_convert_layer(child, full_name)
_convert_layer(model)
return model
# 使用示例
if __name__ == "__main__":
from transformers import AutoModelForCausalLM
# 加载模型
model = AutoModelForCausalLM.from_pretrained("gpt2")
# 原始内存占用
original_mem = sum(p.numel() * p.element_size() for p in model.parameters()) / (1024**2)
print(f"原始模型内存: {original_mem:.2f} MB")
# 转换为QLoRA
model = convert_to_qlora(model, r=8, lora_alpha=16)
# QLoRA内存占用
qlora_mem = 0
for module in model.modules():
if isinstance(module, QLoRALinear):
footprint = module.memory_footprint()
qlora_mem += footprint['total']
print(f"QLoRA模型内存: {qlora_mem:.2f} MB")
print(f"内存减少: {(1 - qlora_mem/original_mem)*100:.1f}%")
QLoRA的优势
def compare_memory_usage():
"""比较不同方法的内存使用"""
model_size_b = 7 # 7B模型
params = model_size_b * 1e9
# 全参数微调 (FP16)
full_finetune_mem = (
params * 2 + # 模型权重
params * 2 + # 梯度
params * 8 # Adam优化器状态
) / (1024**3)
# LoRA (FP16 base + FP16 LoRA)
r = 8
lora_params = params * 0.01 # 假设1%参数是LoRA
lora_mem = (
params * 2 + # 冻结的模型权重
lora_params * 2 + # LoRA权重
lora_params * 2 + # LoRA梯度
lora_params * 8 # LoRA优化器状态
) / (1024**3)
# QLoRA (4-bit base + FP16 LoRA)
qlora_mem = (
params * 0.5 + # 4-bit量化权重
lora_params * 2 + # LoRA权重
lora_params * 2 + # LoRA梯度
lora_params * 8 # LoRA优化器状态
) / (1024**3)
print(f"7B模型微调内存需求:")
print(f" 全参数微调: {full_finetune_mem:.1f} GB")
print(f" LoRA: {lora_mem:.1f} GB")
print(f" QLoRA: {qlora_mem:.1f} GB")
print(f"\nQLoRA相比全参数微调减少: {(1 - qlora_mem/full_finetune_mem)*100:.1f}%")
compare_memory_usage()
输出:
7B模型微调内存需求:
全参数微调: 84.0 GB
LoRA: 15.0 GB
QLoRA: 4.5 GB
QLoRA相比全参数微调减少: 94.6%
12.4 其他PEFT方法
除了LoRA,还有多种参数高效微调方法。
12.4.1 Adapter
Adapter在Transformer层之间插入小型瓶颈层(bottleneck layers)。
架构
class AdapterLayer(nn.Module):
"""Adapter层"""
def __init__(
self,
hidden_size: int,
adapter_size: int = 64,
dropout: float = 0.1
):
super().__init__()
# 下投影(降维)
self.down_project = nn.Linear(hidden_size, adapter_size)
# 非线性激活
self.activation = nn.ReLU()
# 上投影(升维)
self.up_project = nn.Linear(adapter_size, hidden_size)
# Dropout
self.dropout = nn.Dropout(dropout)
# 初始化为近似恒等映射
nn.init.zeros_(self.up_project.weight)
nn.init.zeros_(self.up_project.bias)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
前向传播(残差连接)
x: (batch_size, seq_len, hidden_size)
"""
# Adapter变换
adapter_out = self.down_project(x)
adapter_out = self.activation(adapter_out)
adapter_out = self.up_project(adapter_out)
adapter_out = self.dropout(adapter_out)
# 残差连接
return x + adapter_out
class AdapterTransformerLayer(nn.Module):
"""带Adapter的Transformer层"""
def __init__(
self,
transformer_layer: nn.Module,
hidden_size: int,
adapter_size: int = 64
):
super().__init__()
self.transformer_layer = transformer_layer
# 冻结原始层
for param in self.transformer_layer.parameters():
param.requires_grad = False
# 在attention后添加adapter
self.adapter_attn = AdapterLayer(hidden_size, adapter_size)
# 在FFN后添加adapter
self.adapter_ffn = AdapterLayer(hidden_size, adapter_size)
def forward(self, hidden_states, *args, **kwargs):
"""前向传播"""
# 原始Transformer层
outputs = self.transformer_layer(hidden_states, *args, **kwargs)
if isinstance(outputs, tuple):
hidden_states = outputs[0]
extra_outputs = outputs[1:]
else:
hidden_states = outputs
extra_outputs = ()
# 应用adapter(这里简化了,实际需要根据具体模型结构调整)
hidden_states = self.adapter_attn(hidden_states)
hidden_states = self.adapter_ffn(hidden_states)
if extra_outputs:
return (hidden_states,) + extra_outputs
return hidden_states
12.4.2 Prefix Tuning
Prefix Tuning在输入序列前添加可学习的前缀向量。
class PrefixTuning(nn.Module):
"""Prefix Tuning实现"""
def __init__(
self,
num_layers: int,
num_heads: int,
head_dim: int,
prefix_length: int = 20,
prefix_hidden_size: int = 512
):
super().__init__()
self.num_layers = num_layers
self.num_heads = num_heads
self.head_dim = head_dim
self.prefix_length = prefix_length
# 前缀参数(先投影到较小维度,再投影回来)
# 这种re-parameterization可以提高训练稳定性
self.prefix_encoder = nn.Sequential(
nn.Linear(prefix_length, prefix_hidden_size),
nn.Tanh(),
nn.Linear(prefix_hidden_size, num_layers * 2 * num_heads * head_dim)
)
# 初始化前缀
self.prefix_tokens = nn.Parameter(
torch.randn(prefix_length)
)
def forward(self, batch_size: int) -> torch.Tensor:
"""
生成前缀key和value
返回: (num_layers, 2, batch_size, num_heads, prefix_length, head_dim)
"""
# 编码前缀
prefix = self.prefix_encoder(self.prefix_tokens)
# Reshape: (num_layers, 2, num_heads, prefix_length, head_dim)
prefix = prefix.view(
self.num_layers,
2, # key和value
self.num_heads,
self.prefix_length,
self.head_dim
)
# 扩展batch维度
prefix = prefix.unsqueeze(2).expand(
self.num_layers,
2,
batch_size,
self.num_heads,
self.prefix_length,
self.head_dim
)
return prefix
class PrefixAttention(nn.Module):
"""带Prefix的Attention层"""
def __init__(
self,
attention_layer: nn.Module,
prefix_tuning: PrefixTuning
):
super().__init__()
self.attention_layer = attention_layer
self.prefix_tuning = prefix_tuning
# 冻结attention层
for param in self.attention_layer.parameters():
param.requires_grad = False
def forward(
self,
hidden_states: torch.Tensor,
layer_idx: int,
*args,
**kwargs
):
"""前向传播"""
batch_size = hidden_states.size(0)
# 获取该层的prefix
# prefix: (2, batch_size, num_heads, prefix_length, head_dim)
all_prefix = self.prefix_tuning(batch_size)
layer_prefix = all_prefix[layer_idx]
prefix_key = layer_prefix[0] # (batch_size, num_heads, prefix_length, head_dim)
prefix_value = layer_prefix[1]
# 修改attention计算以包含prefix
# 这里简化了,实际需要根据具体模型实现调整
return self.attention_layer(hidden_states, *args, **kwargs)
12.4.3 P-Tuning
P-Tuning v2是Prefix Tuning的改进版本,更加简单高效。
class PTuningV2(nn.Module):
"""P-Tuning v2实现"""
def __init__(
self,
num_layers: int,
hidden_size: int,
num_virtual_tokens: int = 20
):
super().__init__()
self.num_layers = num_layers
self.num_virtual_tokens = num_virtual_tokens
# 每一层都有独立的prompt embeddings
self.prompt_embeddings = nn.ParameterList([
nn.Parameter(torch.randn(num_virtual_tokens, hidden_size))
for _ in range(num_layers)
])
# 初始化
for embedding in self.prompt_embeddings:
nn.init.xavier_uniform_(embedding)
def forward(
self,
hidden_states: torch.Tensor,
layer_idx: int
) -> torch.Tensor:
"""
将prompt embeddings添加到hidden states
Args:
hidden_states: (batch_size, seq_len, hidden_size)
layer_idx: 当前层索引
Returns:
(batch_size, num_virtual_tokens + seq_len, hidden_size)
"""
batch_size = hidden_states.size(0)
# 获取该层的prompt embeddings
prompt_emb = self.prompt_embeddings[layer_idx]
# 扩展batch维度
prompt_emb = prompt_emb.unsqueeze(0).expand(batch_size, -1, -1)
# 拼接到输入前面
return torch.cat([prompt_emb, hidden_states], dim=1)
12.4.4 方法对比
import pandas as pd
def compare_peft_methods():
"""比较不同PEFT方法"""
methods = {
"方法": ["Full Fine-tuning", "LoRA", "QLoRA", "Adapter", "Prefix Tuning", "P-Tuning v2"],
"可训练参数比例": ["100%", "0.1-1%", "0.1-1%", "1-5%", "0.1-1%", "0.1-1%"],
"内存需求": ["很高", "中", "低", "中", "中", "中"],
"训练速度": ["慢", "快", "中", "中", "快", "快"],
"推理速度": ["快", "快*", "中", "慢", "快", "快"],
"实现复杂度": ["简单", "中", "高", "中", "高", "中"],
"效果": ["最好", "很好", "很好", "好", "好", "很好"],
"适用场景": [
"大数据集",
"通用微调",
"显存受限",
"多任务学习",
"生成任务",
"理解任务"
]
}
df = pd.DataFrame(methods)
print(df.to_string(index=False))
print("\n* LoRA推理速度:合并权重后与全参数微调相同")
compare_peft_methods()
各方法的数学对比
def calculate_peft_params(
model_params: int,
hidden_size: int,
num_layers: int,
method: str,
**kwargs
) -> dict:
"""计算不同PEFT方法的参数量"""
if method == "lora":
r = kwargs.get('r', 8)
# 假设应用到4个矩阵(Q, K, V, O)
lora_params = num_layers * 4 * (hidden_size * r + r * hidden_size)
trainable_ratio = lora_params / model_params
elif method == "adapter":
adapter_size = kwargs.get('adapter_size', 64)
# 每层2个adapter(attention后和FFN后)
adapter_params = num_layers * 2 * (hidden_size * adapter_size + adapter_size * hidden_size)
trainable_ratio = adapter_params / model_params
elif method == "prefix":
prefix_length = kwargs.get('prefix_length', 20)
num_heads = kwargs.get('num_heads', 32)
head_dim = hidden_size // num_heads
# 每层的key和value prefix
prefix_params = num_layers * 2 * num_heads * prefix_length * head_dim
trainable_ratio = prefix_params / model_params
elif method == "ptuning_v2":
num_virtual_tokens = kwargs.get('num_virtual_tokens', 20)
# 每层的virtual token embeddings
ptuning_params = num_layers * num_virtual_tokens * hidden_size
trainable_ratio = ptuning_params / model_params
else:
raise ValueError(f"Unknown method: {method}")
return {
'trainable_params': eval(f"{method}_params"),
'trainable_ratio': trainable_ratio,
'trainable_percentage': f"{trainable_ratio * 100:.2f}%"
}7B LLaMA模型
model_config = {
'model_params': 7_000_000_000,
'hidden_size': 4096,
'num_layers': 32,
'num_heads': 32
}
print("7B模型各PEFT方法参数量对比:\n")
for method in ['lora', 'adapter', 'prefix', 'ptuning_v2']:
result = calculate_peft_params(**model_config, method=method, r=8, adapter_size=64, prefix_length=20, num_virtual_tokens=20)
print(f"{method.upper()}:")
print(f" 可训练参数: {result['trainable_params']:,}")
print(f" 占比: {result['trainable_percentage']}\n")
12.5 完整微调实战
12.5.1 LLaMA-2微调代码
以下是使用LoRA微调LLaMA-2的完整代码:
#!/usr/bin/env python3
"""
LLaMA-2 LoRA微调完整脚本
"""
import os
import torch
import transformers
from transformers import (
AutoTokenizer,
AutoModelForCausalLM,
TrainingArguments,
Trainer,
DataCollatorForLanguageModeling
)
from peft import (
LoraConfig,
get_peft_model,
prepare_model_for_kbit_training,
TaskType
)
from datasets import load_dataset
import bitsandbytes as bnb
from typing import Dict, List, Optional
import logging
# 配置日志
logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
class LLaMA2LoRATrainer:
"""LLaMA-2 LoRA微调器"""
def __init__(
self,
model_name: str = "meta-llama/Llama-2-7b-hf",
use_4bit: bool = True,
lora_r: int = 16,
lora_alpha: int = 32,
lora_dropout: float = 0.05,
target_modules: Optional[List[str]] = None
):
self.model_name = model_name
self.use_4bit = use_4bit
# LoRA配置
if target_modules is None:
# LLaMA-2的attention层
target_modules = [
"q_proj",
"k_proj",
"v_proj",
"o_proj",
"gate_proj",
"up_proj",
"down_proj"
]
self.lora_config = LoraConfig(
r=lora_r,
lora_alpha=lora_alpha,
target_modules=target_modules,
lora_dropout=lora_dropout,
bias="none",
task_type=TaskType.CAUSAL_LM
)
# 初始化模型和tokenizer
self.tokenizer = None
self.model = None
self._load_model()
def _load_model(self):
"""加载模型"""
logger.info(f"加载模型: {self.model_name}")
# 加载tokenizer
self.tokenizer = AutoTokenizer.from_pretrained(
self.model_name,
trust_remote_code=True
)
self.tokenizer.pad_token = self.tokenizer.eos_token
self.tokenizer.padding_side = "right"
# 4-bit量化配置
if self.use_4bit:
from transformers import BitsAndBytesConfig
bnb_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_use_double_quant=True,
bnb_4bit_quant_type="nf4",
bnb_4bit_compute_dtype=torch.bfloat16
)
# 加载模型
self.model = AutoModelForCausalLM.from_pretrained(
self.model_name,
quantization_config=bnb_config,
device_map="auto",
trust_remote_code=True
)
# 准备模型用于kbit训练
self.model = prepare_model_for_kbit_training(self.model)
else:
# 加载FP16模型
self.model = AutoModelForCausalLM.from_pretrained(
self.model_name,
torch_dtype=torch.float16,
device_map="auto",
trust_remote_code=True
)
# 启用梯度检查点
self.model.gradient_checkpointing_enable()
# 应用LoRA
self.model = get_peft_model(self.model, self.lora_config)
# 打印可训练参数
self.model.print_trainable_parameters()
def prepare_dataset(
self,
train_file: str,
val_file: Optional[str] = None,
max_length: int = 2048
):
"""准备数据集"""
logger.info("加载数据集...")
# 加载JSONL文件
data_files = {'train': train_file}
if val_file:
data_files['validation'] = val_file
dataset = load_dataset('json', data_files=data_files)
# Tokenize函数
def tokenize_function(examples):
# 构建提示词
prompts = []
for instruction, input_text, output in zip(
examples['instruction'],
examples.get('input', [''] * len(examples['instruction'])),
examples['output']
):
if input_text:
prompt = f"""<s>[INST] <<SYS>>
You are a helpful assistant.
<</SYS>>
{instruction}
Input: {input_text} [/INST] {output} </s>"""
else:
prompt = f"""<s>[INST] <<SYS>>
You are a helpful assistant.
<</SYS>>
{instruction} [/INST] {output} </s>"""
prompts.append(prompt)
# Tokenize
tokenized = self.tokenizer(
prompts,
truncation=True,
max_length=max_length,
padding=False
)
# Labels与input_ids相同
tokenized['labels'] = tokenized['input_ids'].copy()
return tokenized
# 应用tokenization
tokenized_dataset = dataset.map(
tokenize_function,
batched=True,
remove_columns=dataset['train'].column_names,
desc="Tokenizing"
)
logger.info(f"训练集大小: {len(tokenized_dataset['train'])}")
if 'validation' in tokenized_dataset:
logger.info(f"验证集大小: {len(tokenized_dataset['validation'])}")
return tokenized_dataset
def train(
self,
tokenized_dataset,
output_dir: str = "./llama2-lora",
num_epochs: int = 3,
per_device_train_batch_size: int = 4,
gradient_accumulation_steps: int = 4,
learning_rate: float = 2e-4,
warmup_ratio: float = 0.03,
logging_steps: int = 10,
save_steps: int = 500,
eval_steps: int = 500
):
"""训练模型"""
# 训练参数
training_args = TrainingArguments(
output_dir=output_dir,
overwrite_output_dir=True,
num_train_epochs=num_epochs,
per_device_train_batch_size=per_device_train_batch_size,
per_device_eval_batch_size=per_device_train_batch_size,
gradient_accumulation_steps=gradient_accumulation_steps,
learning_rate=learning_rate,
weight_decay=0.01,
warmup_ratio=warmup_ratio,
lr_scheduler_type="cosine",
logging_steps=logging_steps,
save_steps=save_steps,
eval_steps=eval_steps if 'validation' in tokenized_dataset else None,
evaluation_strategy="steps" if 'validation' in tokenized_dataset else "no",
save_total_limit=3,
load_best_model_at_end=True if 'validation' in tokenized_dataset else False,
fp16=True,
bf16=False,
optim="paged_adamw_32bit", # 使用分页优化器
group_by_length=True,
report_to="tensorboard",
seed=42
)
# Data collator
data_collator = DataCollatorForLanguageModeling(
tokenizer=self.tokenizer,
mlm=False
)
# 创建Trainer
trainer = Trainer(
model=self.model,
args=training_args,
train_dataset=tokenized_dataset['train'],
eval_dataset=tokenized_dataset.get('validation'),
data_collator=data_collator
)
# 开始训练
logger.info("开始训练...")
trainer.train()
# 保存模型
logger.info(f"保存模型到 {output_dir}")
trainer.save_model(output_dir)
self.tokenizer.save_pretrained(output_dir)
logger.info("训练完成!")
def inference(self, prompt: str, max_new_tokens: int = 256) -> str:
"""推理"""
# 构建完整提示词
full_prompt = f"""<s>[INST] <<SYS>>
You are a helpful assistant.
<</SYS>>
{prompt} [/INST]"""
# Tokenize
inputs = self.tokenizer(
full_prompt,
return_tensors="pt",
truncation=True,
max_length=2048
).to(self.model.device)
# 生成
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_new_tokens=max_new_tokens,
temperature=0.7,
top_p=0.9,
do_sample=True,
pad_token_id=self.tokenizer.eos_token_id
)
# 解码
generated_text = self.tokenizer.decode(
outputs[0][inputs['input_ids'].shape[1]:],
skip_special_tokens=True
)
return generated_text
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--model_name", type=str, default="meta-llama/Llama-2-7b-hf")
parser.add_argument("--train_file", type=str, required=True)
parser.add_argument("--val_file", type=str, default=None)
parser.add_argument("--output_dir", type=str, default="./llama2-lora")
parser.add_argument("--use_4bit", action="store_true")
parser.add_argument("--lora_r", type=int, default=16)
parser.add_argument("--lora_alpha", type=int, default=32)
parser.add_argument("--num_epochs", type=int, default=3)
parser.add_argument("--batch_size", type=int, default=4)
parser.add_argument("--learning_rate", type=float, default=2e-4)
args = parser.parse_args()
# 创建训练器
trainer = LLaMA2LoRATrainer(
model_name=args.model_name,
use_4bit=args.use_4bit,
lora_r=args.lora_r,
lora_alpha=args.lora_alpha
)
# 准备数据集
dataset = trainer.prepare_dataset(
train_file=args.train_file,
val_file=args.val_file
)
# 训练
trainer.train(
tokenized_dataset=dataset,
output_dir=args.output_dir,
num_epochs=args.num_epochs,
per_device_train_batch_size=args.batch_size,
learning_rate=args.learning_rate
)
# 测试推理
print("\n测试推理:")
test_prompt = "解释什么是机器学习?"
response = trainer.inference(test_prompt)
print(f"提示: {test_prompt}")
print(f"回答: {response}")
if __name__ == "__main__":
main()
运行脚本
# 安装依赖
pip install transformers peft datasets bitsandbytes accelerate
# 训练
python train_llama2_lora.py \
--model_name meta-llama/Llama-2-7b-hf \
--train_file train.jsonl \
--val_file val.jsonl \
--output_dir ./llama2-lora \
--use_4bit \
--lora_r 16 \
--lora_alpha 32 \
--num_epochs 3 \
--batch_size 4 \
--learning_rate 2e-4
12.5.2 Hugging Face Transformers
使用Transformers库的高级功能简化训练:
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel
import torch
class LLaMALoRAInference:
"""LLaMA LoRA推理类"""
def __init__(
self,
base_model_name: str,
lora_weights_path: str,
device: str = "cuda"
):
self.device = device
# 加载base model
print(f"加载基础模型: {base_model_name}")
self.tokenizer = AutoTokenizer.from_pretrained(base_model_name)
self.model = AutoModelForCausalLM.from_pretrained(
base_model_name,
torch_dtype=torch.float16,
device_map="auto"
)
# 加载LoRA权重
print(f"加载LoRA权重: {lora_weights_path}")
self.model = PeftModel.from_pretrained(
self.model,
lora_weights_path,
torch_dtype=torch.float16
)
# 合并权重以加速推理
print("合并LoRA权重到基础模型...")
self.model = self.model.merge_and_unload()
self.model.eval()
def generate(
self,
prompt: str,
max_new_tokens: int = 256,
temperature: float = 0.7,
top_p: float = 0.9,
top_k: int = 50,
repetition_penalty: float = 1.1
) -> str:
"""生成文本"""
inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_new_tokens=max_new_tokens,
temperature=temperature,
top_p=top_p,
top_k=top_k,
repetition_penalty=repetition_penalty,
do_sample=True,
pad_token_id=self.tokenizer.eos_token_id
)
generated_text = self.tokenizer.decode(
outputs[0][inputs['input_ids'].shape[1]:],
skip_special_tokens=True
)
return generated_text
def chat(self):
"""交互式对话"""
print("进入对话模式(输入'quit'退出)")
while True:
user_input = input("\n用户: ")
if user_input.lower() == 'quit':
break
response = self.generate(user_input)
print(f"助手: {response}")
# 使用示例
if __name__ == "__main__":
inferencer = LLaMALoRAInference(
base_model_name="meta-llama/Llama-2-7b-hf",
lora_weights_path="./llama2-lora"
)
# 单次生成
prompt = "编写一个Python函数来计算斐波那契数列。"
response = inferencer.generate(prompt)
print(f"提示: {prompt}")
print(f"回答: {response}")
# 交互式对话
# inferencer.chat()
12.5.3 PEFT库使用
PEFT库提供了统一的接口来使用各种参数高效微调方法:
from peft import (
get_peft_config,
get_peft_model,
LoraConfig,
PrefixTuningConfig,
PromptEncoderConfig,
TaskType
)
def create_peft_model(base_model, method: str = "lora", **kwargs):
"""创建PEFT模型"""
if method == "lora":
peft_config = LoraConfig(
task_type=TaskType.CAUSAL_LM,
r=kwargs.get('r', 16),
lora_alpha=kwargs.get('lora_alpha', 32),
lora_dropout=kwargs.get('lora_dropout', 0.05),
target_modules=kwargs.get('target_modules', ["q_proj", "v_proj"]),
bias="none"
)
elif method == "prefix":
peft_config = PrefixTuningConfig(
task_type=TaskType.CAUSAL_LM,
num_virtual_tokens=kwargs.get('num_virtual_tokens', 20),
encoder_hidden_size=kwargs.get('encoder_hidden_size', 128)
)
elif method == "p-tuning":
peft_config = PromptEncoderConfig(
task_type=TaskType.CAUSAL_LM,
num_virtual_tokens=kwargs.get('num_virtual_tokens', 20),
encoder_hidden_size=kwargs.get('encoder_hidden_size', 128)
)
else:
raise ValueError(f"Unknown method: {method}")
# 创建PEFT模型
peft_model = get_peft_model(base_model, peft_config)
peft_model.print_trainable_parameters()
return peft_model
# 使用示例
from transformers import AutoModelForCausalLM
base_model = AutoModelForCausalLM.from_pretrained(
"gpt2",
torch_dtype=torch.float16
)
# LoRA
lora_model = create_peft_model(
base_model,
method="lora",
r=8,
lora_alpha=16,
target_modules=["c_attn", "c_proj"]
)
# Prefix Tuning
prefix_model = create_peft_model(
base_model,
method="prefix",
num_virtual_tokens=20
)
12.5.4 训练监控和评估
import wandb
from transformers import TrainerCallback
import numpy as np
class WandbCallback(TrainerCallback):
"""Weights & Biases回调"""
def __init__(self, project_name: str = "llama-lora-finetune"):
wandb.init(project=project_name)
def on_log(self, args, state, control, logs=None, **kwargs):
"""记录训练指标"""
if logs:
wandb.log(logs)
def on_evaluate(self, args, state, control, metrics=None, **kwargs):
"""记录评估指标"""
if metrics:
wandb.log(metrics)
class PerplexityCallback(TrainerCallback):
"""困惑度计算回调"""
def on_evaluate(self, args, state, control, metrics=None, **kwargs):
"""计算困惑度"""
if metrics and 'eval_loss' in metrics:
perplexity = np.exp(metrics['eval_loss'])
metrics['eval_perplexity'] = perplexity
print(f"\nPerplexity: {perplexity:.2f}")
# 评估函数
def evaluate_model(model, tokenizer, eval_dataset, device="cuda"):
"""评估模型性能"""
from torch.utils.data import DataLoader
from tqdm import tqdm
model.eval()
eval_loader = DataLoader(
eval_dataset,
batch_size=8,
shuffle=False
)
total_loss = 0
total_tokens = 0
with torch.no_grad():
for batch in tqdm(eval_loader, desc="Evaluating"):
batch = {k: v.to(device) for k, v in batch.items()}
outputs = model(**batch)
loss = outputs.loss
# 累积损失
total_loss += loss.item() * batch['input_ids'].size(0)
total_tokens += batch['input_ids'].size(0)
avg_loss = total_loss / total_tokens
perplexity = np.exp(avg_loss)
return {
'loss': avg_loss,
'perplexity': perplexity
}
12.6 微调数据集制作
高质量的数据集是微调成功的关键。
12.6.1 指令数据格式(Alpaca、ShareGPT)
Alpaca格式示例
{
"instruction": "给定一个产品描述,生成一个吸引人的标题。",
"input": "这款智能手表具有心率监测、睡眠追踪、GPS定位等功能,续航可达7天。",
"output": "7天长续航智能手表:健康监测+GPS定位,您的全天候健康助手"
}
ShareGPT格式示例
{
"id": "unique_id_123",
"conversations": [
{
"from": "human",
"value": "如何学习编程?"
},
{
"from": "gpt",
"value": "学习编程可以从以下几个步骤开始:\n\n1. 选择一门编程语言(推荐Python作为入门语言)\n2. 学习基础语法和概念\n3. 通过实践项目巩固知识\n4. 阅读他人代码,学习最佳实践\n5. 参与开源项目\n\n持续练习是关键!"
},
{
"from": "human",
"value": "Python有哪些好的学习资源?"
},
{
"from": "gpt",
"value": "推荐以下Python学习资源:\n\n1. 官方文档(python.org)\n2. 《Python Crash Course》书籍\n3. Coursera上的Python课程\n4. LeetCode练习编程题\n5. GitHub上的优秀开源项目\n\n建议理论结合实践,边学边做。"
}
]
}
12.6.2 数据清洗和过滤
import json
import re
from typing import List, Dict
import unicodedata
class DataCleaner:
"""数据清洗器"""
def __init__(self):
# 定义过滤规则
self.min_length = 10
self.max_length = 2048
self.remove_patterns = [
r'http[s]?://\S+', # URL
r'\[removed\]', # Reddit删除标记
r'\[deleted\]' # 删除用户标记
]
def clean_text(self, text: str) -> str:
"""清洗单个文本"""
# 移除多余空白
text = ' '.join(text.split())
# 移除特殊模式
for pattern in self.remove_patterns:
text = re.sub(pattern, '', text)
# 归一化Unicode字符
text = unicodedata.normalize('NFKC', text)
# 移除控制字符
text = ''.join(char for char in text if unicodedata.category(char)[0] != 'C')
return text.strip()
def is_valid(self, item: Dict) -> bool:
"""检查数据是否有效"""
# 检查必需字段
if 'instruction' not in item or 'output' not in item:
return False
# 检查长度
instruction = item['instruction'].strip()
output = item['output'].strip()
if len(instruction) < self.min_length or len(output) < self.min_length:
return False
if len(instruction) > self.max_length or len(output) > self.max_length:
return False
# 检查语言(可选,这里简单检查是否包含中文或英文)
has_text = bool(re.search(r'[\u4e00-\u9fa5a-zA-Z]', instruction + output))
if not has_text:
return False
return True
def clean_dataset(self, input_file: str, output_file: str):
"""清洗整个数据集"""
valid_count = 0
invalid_count = 0
with open(input_file, 'r', encoding='utf-8') as fin, \
open(output_file, 'w', encoding='utf-8') as fout:
for line in fin:
try:
item = json.loads(line)
# 清洗文本
item['instruction'] = self.clean_text(item['instruction'])
if 'input' in item:
item['input'] = self.clean_text(item['input'])
item['output'] = self.clean_text(item['output'])
# 验证
if self.is_valid(item):
fout.write(json.dumps(item, ensure_ascii=False) + '\n')
valid_count += 1
else:
invalid_count += 1
except json.JSONDecodeError:
invalid_count += 1
continue
print(f"清洗完成:")
print(f" 有效数据: {valid_count}")
print(f" 无效数据: {invalid_count}")
print(f" 保留率: {valid_count/(valid_count+invalid_count)*100:.1f}%")
# 使用示例
cleaner = DataCleaner()
cleaner.clean_dataset("raw_data.jsonl", "cleaned_data.jsonl")
12.6.3 数据增强技巧
import random
from typing import List, Dict
import copy
class DataAugmentor:
"""数据增强器"""
def __init__(self):
# 同义改写模板
self.instruction_templates = [
"请{}",
"能否{}",
"帮我{}",
"{}",
]
def paraphrase_instruction(self, instruction: str) -> List[str]:
"""改写指令"""
paraphrases = []
for template in self.instruction_templates:
paraphrase = template.format(instruction)
if paraphrase != instruction:
paraphrases.append(paraphrase)
return paraphrases
def back_translation(self, text: str, target_lang: str = "en") -> str:
"""回译增强(需要翻译API)"""
# 这里是示意代码,实际需要调用翻译API
# translated = translate(text, src="zh", tgt=target_lang)
# back_translated = translate(translated, src=target_lang, tgt="zh")
# return back_translated
pass
def augment_by_paraphrasing(self, item: Dict) -> List[Dict]:
"""通过改写增强"""
augmented = [item] # 保留原始数据
# 改写指令
paraphrases = self.paraphrase_instruction(item['instruction'])
for paraphrase in paraphrases:
new_item = copy.deepcopy(item)
new_item['instruction'] = paraphrase
augmented.append(new_item)
return augmented
def augment_dataset(
self,
input_file: str,
output_file: str,
augmentation_factor: int = 2
):
"""增强整个数据集"""
augmented_data = []
with open(input_file, 'r', encoding='utf-8') as f:
for line in f:
item = json.loads(line)
# 应用增强
augmented_items = self.augment_by_paraphrasing(item)
# 限制增强数量
augmented_items = augmented_items[:augmentation_factor]
augmented_data.extend(augmented_items)
# 保存
with open(output_file, 'w', encoding='utf-8') as f:
for item in augmented_data:
f.write(json.dumps(item, ensure_ascii=False) + '\n')
print(f"数据增强完成:")
print(f" 原始数据: {len(augmented_data) // augmentation_factor}")
print(f" 增强后数据: {len(augmented_data)}")
print(f" 增强倍数: {augmentation_factor}x")
# 使用示例
augmentor = DataAugmentor()
augmentor.augment_dataset(
"cleaned_data.jsonl",
"augmented_data.jsonl",
augmentation_factor=2
)
数据集质量评估
class DatasetQualityAnalyzer:
"""数据集质量分析器"""
def __init__(self):
pass
def analyze(self, data_file: str):
"""分析数据集质量"""
import matplotlib.pyplot as plt
# 统计信息
num_samples = 0
instruction_lengths = []
output_lengths = []
has_input_count = 0
with open(data_file, 'r', encoding='utf-8') as f:
for line in f:
item = json.loads(line)
num_samples += 1
instruction_lengths.append(len(item['instruction']))
output_lengths.append(len(item['output']))
if item.get('input', '').strip():
has_input_count += 1
# 打印统计
print(f"数据集统计:")
print(f" 样本数量: {num_samples}")
print(f" 包含输入字段: {has_input_count} ({has_input_count/num_samples*100:.1f}%)")
print(f"\n指令长度:")
print(f" 平均: {np.mean(instruction_lengths):.1f}")
print(f" 中位数: {np.median(instruction_lengths):.1f}")
print(f" 最小/最大: {min(instruction_lengths)}/{max(instruction_lengths)}")
print(f"\n输出长度:")
print(f" 平均: {np.mean(output_lengths):.1f}")
print(f" 中位数: {np.median(output_lengths):.1f}")
print(f" 最小/最大: {min(output_lengths)}/{max(output_lengths)}")
# 绘制分布图
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
axes[0].hist(instruction_lengths, bins=50, edgecolor='black')
axes[0].set_title('指令长度分布')
axes[0].set_xlabel('字符数')
axes[0].set_ylabel('频数')
axes[1].hist(output_lengths, bins=50, edgecolor='black')
axes[1].set_title('输出长度分布')
axes[1].set_xlabel('字符数')
axes[1].set_ylabel('频数')
plt.tight_layout()
plt.savefig('dataset_analysis.png')
print(f"\n分布图已保存到 dataset_analysis.png")
# 使用示例
analyzer = DatasetQualityAnalyzer()
analyzer.analyze("augmented_data.jsonl")