05-HuggingFace Transformers源码解析
概述
HuggingFace Transformers是目前最流行的预训练模型库之一。本章深入分析其核心架构设计,包括模型抽象层、Tokenizer实现、Trainer训练框架以及与分布式训练的集成,帮助读者理解如何构建一个灵活且高效的模型库。
整体架构
┌─────────────────────────────────────────────────────────────────────────────┐
│ HuggingFace Transformers 架构 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ Pipeline API │ │
│ │ text-generation │ text-classification │ question-answering │ ... │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ Trainer Framework │ │
│ │ Trainer │ Seq2SeqTrainer │ TrainingArguments │ Callbacks │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Auto Classes │ │ Tokenizers │ │ Generation │ │
│ │ ├─ AutoModel │ │ ├─ PreTrained │ │ ├─ Greedy │ │
│ │ ├─ AutoConfig │ │ ├─ Fast │ │ ├─ Beam │ │
│ │ └─ AutoToken │ │ └─ Rust-based │ │ └─ Sampling │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ PreTrainedModel Base │ │
│ │ from_pretrained │ save_pretrained │ resize_token_embeddings │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ Model Implementations │ │
│ │ BERT │ GPT-2 │ LLaMA │ T5 │ BLOOM │ Falcon │ Mistral │ ... │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
1. 模型抽象层设计
1.1 PreTrainedModel基类
# transformers/modeling_utils.py
class PreTrainedModel(nn.Module, ModuleUtilsMixin, GenerationMixin, PushToHubMixin):
"""
所有预训练模型的基类
提供模型加载、保存、权重初始化等通用功能
"""
# 类级别配置
config_class = None # 配置类
base_model_prefix = "" # 基础模型前缀
main_input_name = "input_ids" # 主输入名称
_auto_class = None # Auto类映射
_no_split_modules = None # 不可拆分的模块(用于模型并行)
_skip_keys_device_placement = None # 跳过设备放置的键
_keys_to_ignore_on_load_missing = None # 加载时忽略的缺失键
_keys_to_ignore_on_load_unexpected = None # 加载时忽略的意外键
_tied_weights_keys = None # 绑定权重的键
def __init__(self, config: PretrainedConfig, *inputs, **kwargs):
super().__init__()
# 保存配置
self.config = config
# 模型名称或路径
self.name_or_path = config.name_or_path
self.warnings_issued = {}
# 生成配置
self.generation_config = GenerationConfig.from_model_config(config)
@classmethod
def from_pretrained(
cls,
pretrained_model_name_or_path: Optional[Union[str, os.PathLike]],
*model_args,
config: Optional[Union[PretrainedConfig, str, os.PathLike]] = None,
cache_dir: Optional[Union[str, os.PathLike]] = None,
ignore_mismatched_sizes: bool = False,
force_download: bool = False,
local_files_only: bool = False,
token: Optional[Union[str, bool]] = None,
revision: str = "main",
use_safetensors: Optional[bool] = None,
**kwargs,
):
"""
从预训练模型加载
支持多种来源: HuggingFace Hub、本地路径、S3等
"""
# 步骤1: 加载配置
config, model_kwargs = cls.config_class.from_pretrained(
pretrained_model_name_or_path,
cache_dir=cache_dir,
return_unused_kwargs=True,
force_download=force_download,
**kwargs,
)
# 步骤2: 确定加载设备和数据类型
torch_dtype = kwargs.get("torch_dtype", None)
device_map = kwargs.get("device_map", None)
# 处理量化配置
quantization_config = kwargs.get("quantization_config", None)
if quantization_config is not None:
quantization_config = cls._prepare_quantization_config(
quantization_config, torch_dtype
)
# 步骤3: 初始化模型(init_contexts、state_dict、resolved_archive_file等变量由此处省略的权重文件解析逻辑产生)
with ContextManagers(init_contexts):
model = cls(config, *model_args, **model_kwargs)
# 步骤4: 加载权重
model = cls._load_pretrained_model(
model,
state_dict,
loaded_keys,
resolved_archive_file,
pretrained_model_name_or_path,
ignore_mismatched_sizes=ignore_mismatched_sizes,
sharded_metadata=sharded_metadata,
_fast_init=_fast_init,
low_cpu_mem_usage=low_cpu_mem_usage,
device_map=device_map,
offload_folder=offload_folder,
offload_state_dict=offload_state_dict,
dtype=torch_dtype,
keep_in_fp32_modules=keep_in_fp32_modules,
)
# 步骤5: 后处理
model.eval() # 默认设为评估模式
return model
@classmethod
def _load_pretrained_model(
cls,
model,
state_dict,
loaded_keys,
resolved_archive_file,
pretrained_model_name_or_path,
**kwargs,
):
"""
加载预训练权重到模型
处理键名映射、缺失键、意外键等情况
"""
# 获取模型期望的键
model_state_dict = model.state_dict()
expected_keys = list(model_state_dict.keys())
# 前缀处理
prefix = cls.base_model_prefix
if prefix:
# 移除或添加前缀
state_dict = cls._fix_key_prefix(state_dict, prefix)
# 检查缺失和意外的键
missing_keys = []
unexpected_keys = []
mismatched_keys = []
for key in expected_keys:
if key not in state_dict:
missing_keys.append(key)
for key in state_dict.keys():
if key not in expected_keys:
unexpected_keys.append(key)
# 检查尺寸不匹配
for key in list(state_dict.keys()):  # 先复制键列表,下面可能在循环中删除条目
if key in model_state_dict:
model_shape = model_state_dict[key].shape
ckpt_shape = state_dict[key].shape
if model_shape != ckpt_shape:
if kwargs.get("ignore_mismatched_sizes"):
mismatched_keys.append((key, ckpt_shape, model_shape))
del state_dict[key]
else:
raise RuntimeError(
f"Size mismatch for {key}: {ckpt_shape} vs {model_shape}"
)
# 加载状态字典
model.load_state_dict(state_dict, strict=False)
# 处理绑定权重
model.tie_weights()
return model
def save_pretrained(
self,
save_directory: Union[str, os.PathLike],
is_main_process: bool = True,
state_dict: Optional[dict] = None,
save_function: Callable = torch.save,
push_to_hub: bool = False,
max_shard_size: Union[int, str] = "5GB",
safe_serialization: bool = True,
**kwargs,
):
"""
保存模型到目录
支持分片保存大模型
"""
os.makedirs(save_directory, exist_ok=True)
# 保存配置
self.config.save_pretrained(save_directory)
# 获取状态字典
if state_dict is None:
state_dict = self.state_dict()
# 处理绑定权重
state_dict = self._get_state_dict_for_save(state_dict)
# 分片保存
if max_shard_size is not None:
shards, index = self._shard_checkpoint(state_dict, max_shard_size)
for shard_file, shard in shards.items():
if safe_serialization:
save_file(shard, os.path.join(save_directory, shard_file))
else:
torch.save(shard, os.path.join(save_directory, shard_file))
# 保存索引
if index is not None:
with open(os.path.join(save_directory, "model.safetensors.index.json"), "w") as f:
json.dump(index, f, indent=2)
else:
# 单文件保存
if safe_serialization:
save_file(state_dict, os.path.join(save_directory, "model.safetensors"))
else:
torch.save(state_dict, os.path.join(save_directory, "pytorch_model.bin"))
def tie_weights(self):
"""
绑定输入和输出embedding权重
减少参数量并提高一致性
"""
if getattr(self.config, "tie_word_embeddings", True):
output_embeddings = self.get_output_embeddings()
input_embeddings = self.get_input_embeddings()
if output_embeddings is not None and input_embeddings is not None:
output_embeddings.weight = input_embeddings.weight
if getattr(output_embeddings, "bias", None) is not None:
output_embeddings.bias.data = nn.functional.pad(
output_embeddings.bias.data,
(0, input_embeddings.weight.shape[0] - output_embeddings.bias.shape[0]),
"constant",
0,
)
def resize_token_embeddings(
self, new_num_tokens: Optional[int] = None
) -> nn.Embedding:
"""
调整token embedding大小
用于添加特殊token或适应新的tokenizer
"""
old_embeddings = self.get_input_embeddings()
if new_num_tokens is None:
return old_embeddings
new_embeddings = self._get_resized_embeddings(old_embeddings, new_num_tokens)
self.set_input_embeddings(new_embeddings)
# 同步调整输出层
if self.get_output_embeddings() is not None:
old_lm_head = self.get_output_embeddings()
new_lm_head = self._get_resized_lm_head(old_lm_head, new_num_tokens)
self.set_output_embeddings(new_lm_head)
# 更新配置
self.config.vocab_size = new_num_tokens
# 重新绑定权重
self.tie_weights()
return self.get_input_embeddings()
def _get_resized_embeddings(
self, old_embeddings: nn.Embedding, new_num_tokens: int
) -> nn.Embedding:
"""
创建新尺寸的embedding层
保留旧权重,新增部分随机初始化
"""
old_num_tokens, old_embedding_dim = old_embeddings.weight.size()
if new_num_tokens == old_num_tokens:
return old_embeddings
# 创建新embedding
new_embeddings = nn.Embedding(
new_num_tokens,
old_embedding_dim,
device=old_embeddings.weight.device,
dtype=old_embeddings.weight.dtype,
)
# 复制旧权重
n = min(old_num_tokens, new_num_tokens)
new_embeddings.weight.data[:n, :] = old_embeddings.weight.data[:n, :]
# 初始化新增部分
if new_num_tokens > old_num_tokens:
# 使用平均值初始化新token
new_embeddings.weight.data[n:, :] = old_embeddings.weight.data.mean(dim=0)
return new_embeddings
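下面给出一个串联上述接口的最小用法示例(模型名gpt2与输出目录仅作示意):
# 示例: from_pretrained → resize_token_embeddings → save_pretrained 的典型流程
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2", torch_dtype=torch.float32)

# 新增特殊token后,embedding需同步扩容(内部调用_get_resized_embeddings)
tokenizer.add_special_tokens({"pad_token": "<pad>"})
model.resize_token_embeddings(len(tokenizer))

# 保存: 默认safetensors格式,超过max_shard_size时自动分片并生成index文件
model.save_pretrained("./my-gpt2", max_shard_size="2GB")
tokenizer.save_pretrained("./my-gpt2")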
1.2 PretrainedConfig配置类
# transformers/configuration_utils.py
class PretrainedConfig:
"""
模型配置基类
存储模型超参数和架构信息
"""
model_type: str = ""
is_composition: bool = False
attribute_map: Dict[str, str] = {}
def __init__(self, **kwargs):
# 模型类型标识
self.name_or_path = kwargs.pop("name_or_path", "")
# 输出配置
self.output_hidden_states = kwargs.pop("output_hidden_states", False)
self.output_attentions = kwargs.pop("output_attentions", False)
self.return_dict = kwargs.pop("return_dict", True)
# 设备配置
self.torch_dtype = kwargs.pop("torch_dtype", None)
# 量化配置
self.quantization_config = kwargs.pop("quantization_config", None)
# Pruning配置
self.pruned_heads = kwargs.pop("pruned_heads", {})
# 绑定权重
self.tie_word_embeddings = kwargs.pop("tie_word_embeddings", True)
# 存储额外参数
for key, value in kwargs.items():
setattr(self, key, value)
@classmethod
def from_pretrained(
cls,
pretrained_model_name_or_path: Union[str, os.PathLike],
cache_dir: Optional[Union[str, os.PathLike]] = None,
force_download: bool = False,
**kwargs,
) -> "PretrainedConfig":
"""
从预训练配置加载
"""
# 解析配置文件路径
config_dict, kwargs = cls.get_config_dict(
pretrained_model_name_or_path, **kwargs
)
# 处理模型类型
if "model_type" in config_dict:
config_class = CONFIG_MAPPING[config_dict["model_type"]]
else:
config_class = cls
return config_class.from_dict(config_dict, **kwargs)
@classmethod
def get_config_dict(
cls, pretrained_model_name_or_path: Union[str, os.PathLike], **kwargs
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
"""
获取配置字典
支持从Hub下载或本地加载
"""
# 确定配置文件路径
if os.path.isdir(pretrained_model_name_or_path):
config_file = os.path.join(pretrained_model_name_or_path, "config.json")
elif os.path.isfile(pretrained_model_name_or_path):
config_file = pretrained_model_name_or_path
else:
# 从Hub下载
config_file = hf_hub_download(
repo_id=pretrained_model_name_or_path,
filename="config.json",
cache_dir=kwargs.get("cache_dir"),
force_download=kwargs.get("force_download", False),
token=kwargs.get("token"),
)
# 加载JSON
with open(config_file, "r", encoding="utf-8") as f:
config_dict = json.load(f)
return config_dict, kwargs
def save_pretrained(
self, save_directory: Union[str, os.PathLike], push_to_hub: bool = False, **kwargs
):
"""
保存配置到目录
"""
os.makedirs(save_directory, exist_ok=True)
config_dict = self.to_dict()
# 保存为JSON
config_file = os.path.join(save_directory, "config.json")
with open(config_file, "w", encoding="utf-8") as f:
json.dump(config_dict, f, indent=2, sort_keys=True)
def to_dict(self) -> Dict[str, Any]:
"""
转换为字典
"""
output = copy.deepcopy(self.__dict__)
if hasattr(self.__class__, "model_type"):
output["model_type"] = self.__class__.model_type
return output
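配置可以独立于权重进行读取、修改和保存,下面是一个最小示例(模型名与保存目录仅作示意):
# 示例: 加载配置、修改超参数、仅按配置构建随机初始化的模型
from transformers import AutoConfig, AutoModelForCausalLM

config = AutoConfig.from_pretrained("gpt2")
config.n_layer = 6                       # GPT-2配置中的层数字段,改小以得到更小的模型
config.save_pretrained("./small-gpt2")   # 只写出config.json

model = AutoModelForCausalLM.from_config(config)  # 不加载预训练权重,仅按配置建图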
1.3 Auto类自动推断
# transformers/models/auto/auto_factory.py
class _BaseAutoModelClass:
"""
Auto类的基类
根据配置自动选择正确的模型类
"""
_model_mapping = None
@classmethod
def from_config(cls, config, **kwargs):
"""
从配置创建模型
"""
trust_remote_code = kwargs.pop("trust_remote_code", None)
if type(config) in cls._model_mapping.keys():
model_class = cls._model_mapping[type(config)]
return model_class(config, **kwargs)
raise ValueError(f"Unrecognized configuration class {config.__class__}")
@classmethod
def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
"""
从预训练模型加载
自动推断模型类型
"""
config = kwargs.pop("config", None)
trust_remote_code = kwargs.get("trust_remote_code", None)
# 加载配置(如果未提供)
if not isinstance(config, PretrainedConfig):
config, kwargs = AutoConfig.from_pretrained(
pretrained_model_name_or_path,
return_unused_kwargs=True,
trust_remote_code=trust_remote_code,
**kwargs,
)
# 根据配置选择模型类
if type(config) in cls._model_mapping.keys():
model_class = cls._model_mapping[type(config)]
return model_class.from_pretrained(
pretrained_model_name_or_path,
*model_args,
config=config,
**kwargs,
)
raise ValueError(
f"Unrecognized configuration class {config.__class__} for this kind of AutoModel"
)
# 模型映射注册(此处以model_type字符串示意;实际源码通过_LazyAutoMapping以配置类为键,与上文type(config)查找方式一致)
MODEL_MAPPING = OrderedDict([
("llama", LlamaModel),
("gpt2", GPT2Model),
("bert", BertModel),
("t5", T5Model),
# ... 更多模型
])
MODEL_FOR_CAUSAL_LM_MAPPING = OrderedDict([
("llama", LlamaForCausalLM),
("gpt2", GPT2LMHeadModel),
("bloom", BloomForCausalLM),
# ... 更多模型
])
class AutoModel(_BaseAutoModelClass):
"""
自动模型基类
用于获取基础编码器/解码器
"""
_model_mapping = MODEL_MAPPING
class AutoModelForCausalLM(_BaseAutoModelClass):
"""
自动因果语言模型
用于文本生成任务
"""
_model_mapping = MODEL_FOR_CAUSAL_LM_MAPPING
class AutoConfig:
"""
自动配置类
"""
@classmethod
def from_pretrained(cls, pretrained_model_name_or_path, **kwargs):
config_dict, unused_kwargs = PretrainedConfig.get_config_dict(
pretrained_model_name_or_path, **kwargs
)
if "model_type" in config_dict:
config_class = CONFIG_MAPPING[config_dict["model_type"]]
else:
raise ValueError("Model type not found in config")
return config_class.from_dict(config_dict, **unused_kwargs)
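Auto体系是开放的: 自定义模型可以通过register接口接入,之后from_pretrained会按config.json中的model_type自动分发。下面的类名与model_type均为示意:
# 示例: 将自定义模型注册进Auto体系
from transformers import AutoConfig, AutoModelForCausalLM, PretrainedConfig, PreTrainedModel

class MyConfig(PretrainedConfig):
    model_type = "my-model"

class MyModelForCausalLM(PreTrainedModel):
    config_class = MyConfig

    def __init__(self, config):
        super().__init__(config)
        # ... 在此构建网络 ...

AutoConfig.register("my-model", MyConfig)
AutoModelForCausalLM.register(MyConfig, MyModelForCausalLM)
# 注册后, AutoModelForCausalLM.from_pretrained(<含model_type="my-model"的目录>) 即可命中MyModelForCausalLM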
2. Tokenizer实现
2.1 PreTrainedTokenizer基类
# transformers/tokenization_utils.py
class PreTrainedTokenizer(PreTrainedTokenizerBase):
"""
Python实现的Tokenizer基类
适用于需要自定义逻辑的场景
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
# 词表
self.vocab = {}
self.ids_to_tokens = {}
# 特殊token
self._bos_token = None
self._eos_token = None
self._unk_token = None
self._pad_token = None
self._cls_token = None
self._sep_token = None
self._mask_token = None
# 额外特殊token
self._additional_special_tokens = []
def tokenize(self, text: str, **kwargs) -> List[str]:
"""
将文本分词为token列表
子类必须实现此方法
"""
raise NotImplementedError
def _convert_token_to_id(self, token: str) -> int:
"""
将token转换为ID
"""
return self.vocab.get(token, self.vocab.get(self.unk_token))
def _convert_id_to_token(self, index: int) -> str:
"""
将ID转换为token
"""
return self.ids_to_tokens.get(index, self.unk_token)
def convert_tokens_to_ids(
self, tokens: Union[str, List[str]]
) -> Union[int, List[int]]:
"""
批量转换token到ID
"""
if isinstance(tokens, str):
return self._convert_token_to_id(tokens)
return [self._convert_token_to_id(token) for token in tokens]
def encode(
self,
text: Union[str, List[str]],
add_special_tokens: bool = True,
padding: Union[bool, str] = False,
truncation: Union[bool, str] = False,
max_length: Optional[int] = None,
return_tensors: Optional[str] = None,
**kwargs,
) -> Union[List[int], "torch.Tensor"]:
"""
编码文本为ID序列
"""
# 分词
tokens = self.tokenize(text)
# 转换为ID
ids = self.convert_tokens_to_ids(tokens)
# 添加特殊token
if add_special_tokens:
ids = self.build_inputs_with_special_tokens(ids)
# 截断
if truncation and max_length is not None:
ids = ids[:max_length]
# 填充
if padding:
ids = self.pad(ids, max_length=max_length, padding=padding)
# 转换为Tensor
if return_tensors == "pt":
import torch
ids = torch.tensor([ids])
return ids
def decode(
self,
token_ids: Union[int, List[int]],
skip_special_tokens: bool = False,
clean_up_tokenization_spaces: bool = True,
**kwargs,
) -> str:
"""
解码ID序列为文本
"""
if isinstance(token_ids, int):
token_ids = [token_ids]
# 过滤特殊token
if skip_special_tokens:
token_ids = [
tid for tid in token_ids
if tid not in self.all_special_ids
]
# 转换为token
tokens = [self._convert_id_to_token(tid) for tid in token_ids]
# 合并为文本
text = self.convert_tokens_to_string(tokens)
# 清理空格
if clean_up_tokenization_spaces:
text = self.clean_up_tokenization(text)
return text
class PreTrainedTokenizerFast(PreTrainedTokenizerBase):
"""
Rust实现的快速Tokenizer
基于tokenizers库
"""
slow_tokenizer_class = None
def __init__(self, *args, tokenizer_object=None, **kwargs):
if tokenizer_object is not None:
self._tokenizer = tokenizer_object
else:
# 从配置构建
self._tokenizer = self._build_tokenizer()
super().__init__(**kwargs)
@property
def backend_tokenizer(self):
return self._tokenizer
def tokenize(
self, text: str, pair: Optional[str] = None, **kwargs
) -> List[str]:
"""
使用Rust后端分词
"""
return self._tokenizer.encode(text, pair).tokens
def encode(
self,
text: Union[str, List[str]],
text_pair: Optional[Union[str, List[str]]] = None,
add_special_tokens: bool = True,
padding: Union[bool, str] = False,
truncation: Union[bool, str] = False,
max_length: Optional[int] = None,
**kwargs,
) -> List[int]:
"""
快速编码
"""
# 使用Rust后端
encoding = self._tokenizer.encode(
text,
text_pair,
add_special_tokens=add_special_tokens,
)
return encoding.ids
def __call__(
self,
text: Union[str, List[str]],
text_pair: Optional[Union[str, List[str]]] = None,
add_special_tokens: bool = True,
padding: Union[bool, str] = False,
truncation: Union[bool, str] = False,
max_length: Optional[int] = None,
return_tensors: Optional[str] = None,
return_attention_mask: bool = True,
return_token_type_ids: bool = False,
**kwargs,
) -> BatchEncoding:
"""
主调用接口
支持批量处理
"""
# 判断是否批量
is_batched = isinstance(text, (list, tuple))
if is_batched:
batch_text_or_text_pairs = list(zip(text, text_pair)) if text_pair else text
else:
batch_text_or_text_pairs = [(text, text_pair)] if text_pair else [text]
# 批量编码
encodings = self._tokenizer.encode_batch(
batch_text_or_text_pairs,
add_special_tokens=add_special_tokens,
)
# 构建输出
input_ids = [e.ids for e in encodings]
attention_mask = [e.attention_mask for e in encodings] if return_attention_mask else None
token_type_ids = [e.type_ids for e in encodings] if return_token_type_ids else None
# 填充
if padding:
max_len = max_length or max(len(ids) for ids in input_ids)
input_ids = [ids + [self.pad_token_id] * (max_len - len(ids)) for ids in input_ids]
if attention_mask:
attention_mask = [m + [0] * (max_len - len(m)) for m in attention_mask]
# 转换为Tensor
data = {"input_ids": input_ids}
if attention_mask:
data["attention_mask"] = attention_mask
if token_type_ids:
data["token_type_ids"] = token_type_ids
return BatchEncoding(data, tensor_type=return_tensors)
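__call__是日常使用中最常见的入口,一个典型调用如下(模型名仅作示意):
# 示例: 批量编码 + 填充 + 截断 + 返回PyTorch张量
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")  # 默认优先加载Fast版本
batch = tokenizer(
    ["hello world", "a longer sentence that may be truncated"],
    padding=True,          # 填充到batch内最长序列
    truncation=True,
    max_length=16,
    return_tensors="pt",
)
print(batch["input_ids"].shape)      # (2, seq_len)
print(batch["attention_mask"][0])    # 填充位置为0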
2.2 BPE Tokenizer实现
# tokenizers库的Python包装
class BPETokenizer:
"""
Byte Pair Encoding Tokenizer
经典BPE的简化实现(以"</w>"作词尾标记);GPT-2等模型实际使用字节级BPE,合并逻辑与此相同
"""
def __init__(
self,
vocab: Dict[str, int],
merges: List[Tuple[str, str]],
unk_token: str = "<unk>",
):
self.vocab = vocab
self.vocab_reversed = {v: k for k, v in vocab.items()}
self.merges = merges
self.unk_token = unk_token
# 构建BPE ranks
self.bpe_ranks = dict(zip(merges, range(len(merges))))
# 缓存
self.cache = {}
def bpe(self, token: str) -> str:
"""
对单个token应用BPE
"""
if token in self.cache:
return self.cache[token]
word = tuple(token)
pairs = self.get_pairs(word)
if not pairs:
return token
while True:
# 找到rank最低的pair
bigram = min(pairs, key=lambda pair: self.bpe_ranks.get(pair, float("inf")))
if bigram not in self.bpe_ranks:
break
# 合并
first, second = bigram
new_word = []
i = 0
while i < len(word):
try:
j = word.index(first, i)
new_word.extend(word[i:j])
i = j
except ValueError:
new_word.extend(word[i:])
break
if word[i] == first and i < len(word) - 1 and word[i + 1] == second:
new_word.append(first + second)
i += 2
else:
new_word.append(word[i])
i += 1
word = tuple(new_word)
if len(word) == 1:
break
pairs = self.get_pairs(word)
result = " ".join(word)
self.cache[token] = result
return result
@staticmethod
def get_pairs(word: Tuple[str, ...]) -> Set[Tuple[str, str]]:
"""
获取所有相邻字符对
"""
pairs = set()
prev_char = word[0]
for char in word[1:]:
pairs.add((prev_char, char))
prev_char = char
return pairs
def tokenize(self, text: str) -> List[str]:
"""
分词
"""
# 预处理
text = text.strip()
# 按空格分词后应用BPE
bpe_tokens = []
for token in text.split():
bpe_output = self.bpe(token)
bpe_tokens.extend(bpe_output.split())
return bpe_tokens
def encode(self, text: str) -> List[int]:
"""
编码为ID序列
"""
tokens = self.tokenize(text)
return [
self.vocab.get(token, self.vocab[self.unk_token])
for token in tokens
]
def decode(self, ids: List[int]) -> str:
"""
解码为文本
"""
tokens = [self.vocab_reversed[id] for id in ids]
return "".join(tokens).replace("</w>", " ").strip()
3. Trainer训练框架
3.1 Trainer核心实现
# transformers/trainer.py
class Trainer:
"""
HuggingFace通用训练器
封装训练循环,支持多种优化和分布式策略
"""
def __init__(
self,
model: Union[PreTrainedModel, nn.Module] = None,
args: TrainingArguments = None,
data_collator: Optional[DataCollator] = None,
train_dataset: Optional[Dataset] = None,
eval_dataset: Optional[Dataset] = None,
tokenizer: Optional[PreTrainedTokenizerBase] = None,
compute_metrics: Optional[Callable] = None,
callbacks: Optional[List[TrainerCallback]] = None,
optimizers: Tuple[torch.optim.Optimizer, torch.optim.lr_scheduler.LambdaLR] = (None, None),
):
self.args = args
self.model = model
self.tokenizer = tokenizer
self.data_collator = data_collator
self.train_dataset = train_dataset
self.eval_dataset = eval_dataset
self.compute_metrics = compute_metrics
self.optimizer, self.lr_scheduler = optimizers
# 回调管理
self.callback_handler = CallbackHandler(
callbacks, self.model, self.tokenizer, self.optimizer, self.lr_scheduler
)
# 状态跟踪
self.state = TrainerState()
self.control = TrainerControl()
# 设置分布式
self._setup_distributed()
def _setup_distributed(self):
"""
配置分布式训练
"""
if self.args.local_rank != -1:
# 分布式数据并行
if not torch.distributed.is_initialized():
torch.distributed.init_process_group(backend="nccl")
self.model = nn.parallel.DistributedDataParallel(
self.model,
device_ids=[self.args.local_rank],
output_device=self.args.local_rank,
find_unused_parameters=self.args.ddp_find_unused_parameters,
)
elif self.args.n_gpu > 1:
# 单机多卡DataParallel
self.model = nn.DataParallel(self.model)
def train(
self,
resume_from_checkpoint: Optional[str] = None,
trial: Union["optuna.Trial", Dict[str, Any]] = None,
**kwargs,
):
"""
主训练循环
"""
# 恢复检查点
if resume_from_checkpoint is not None:
self._load_from_checkpoint(resume_from_checkpoint)
# 准备数据加载器
train_dataloader = self.get_train_dataloader()
# 计算总步数
total_train_batch_size = (
self.args.train_batch_size
* self.args.gradient_accumulation_steps
* (torch.distributed.get_world_size() if self.args.local_rank != -1 else 1)
)
num_update_steps_per_epoch = len(train_dataloader) // self.args.gradient_accumulation_steps
max_steps = self.args.max_steps if self.args.max_steps > 0 else \
num_update_steps_per_epoch * self.args.num_train_epochs
# 创建优化器和调度器
self.create_optimizer_and_scheduler(num_training_steps=max_steps)
# 开始训练
self.state.is_training = True
self.callback_handler.on_train_begin(self.args, self.state, self.control)
metrics = {}  # 若训练中未触发评估,则返回空指标
for epoch in range(int(self.args.num_train_epochs)):
# Epoch开始回调
self.callback_handler.on_epoch_begin(self.args, self.state, self.control)
# 训练一个epoch
epoch_loss = self._train_epoch(train_dataloader, epoch)
# Epoch结束回调
self.callback_handler.on_epoch_end(self.args, self.state, self.control)
# 评估
if self.args.evaluation_strategy == "epoch":
metrics = self.evaluate()
self.callback_handler.on_evaluate(self.args, self.state, self.control, metrics)
# 保存检查点
if self.args.save_strategy == "epoch":
self._save_checkpoint(self.model, trial)
# 训练结束
self.callback_handler.on_train_end(self.args, self.state, self.control)
return TrainOutput(self.state.global_step, epoch_loss, metrics)
def _train_epoch(self, dataloader: DataLoader, epoch: int) -> float:
"""
训练一个epoch
"""
self.model.train()
total_loss = 0.0
for step, inputs in enumerate(dataloader):
# Step开始回调
self.callback_handler.on_step_begin(self.args, self.state, self.control)
# 前向传播
loss = self.training_step(self.model, inputs)
total_loss += loss.item()
# 梯度累积
if (step + 1) % self.args.gradient_accumulation_steps == 0:
# 梯度裁剪
if self.args.max_grad_norm is not None:
self._clip_gradients(self.args.max_grad_norm)
# 优化器步骤
self.optimizer.step()
self.lr_scheduler.step()
self.optimizer.zero_grad()
self.state.global_step += 1
# 日志记录
if self.state.global_step % self.args.logging_steps == 0:
self._log_metrics({"loss": loss.item(), "lr": self.lr_scheduler.get_last_lr()[0]})
# Step结束回调
self.callback_handler.on_step_end(self.args, self.state, self.control)
# 检查是否需要停止
if self.control.should_training_stop:
break
return total_loss / len(dataloader)
def training_step(
self, model: nn.Module, inputs: Dict[str, torch.Tensor]
) -> torch.Tensor:
"""
单个训练步骤
"""
model.train()
inputs = self._prepare_inputs(inputs)
# 混合精度训练
with self.autocast_smart_context_manager():
loss = self.compute_loss(model, inputs)
# 梯度累积: 将loss按累积步数缩放,保证等效batch的梯度量级一致
if self.args.gradient_accumulation_steps > 1:
loss = loss / self.args.gradient_accumulation_steps
# 反向传播
if self.use_amp:
self.scaler.scale(loss).backward()
else:
loss.backward()
return loss.detach()
def compute_loss(
self, model: nn.Module, inputs: Dict[str, torch.Tensor], return_outputs: bool = False
) -> Union[torch.Tensor, Tuple[torch.Tensor, Any]]:
"""
计算损失
"""
labels = inputs.pop("labels", None)
outputs = model(**inputs)
# 如果模型返回loss
if "loss" in outputs:
loss = outputs["loss"]
elif labels is not None:
# 手动计算loss
logits = outputs.get("logits", outputs[0])
loss = nn.functional.cross_entropy(
logits.view(-1, logits.size(-1)),
labels.view(-1),
)
else:
raise ValueError("No loss computed")
return (loss, outputs) if return_outputs else loss
def evaluate(
self,
eval_dataset: Optional[Dataset] = None,
ignore_keys: Optional[List[str]] = None,
) -> Dict[str, float]:
"""
评估模型
"""
eval_dataloader = self.get_eval_dataloader(eval_dataset)
self.model.eval()
total_loss = 0.0
all_preds = []
all_labels = []
with torch.no_grad():
for inputs in eval_dataloader:
inputs = self._prepare_inputs(inputs)
labels = inputs.get("labels")  # compute_loss会pop掉labels,先保存一份
with self.autocast_smart_context_manager():
loss, outputs = self.compute_loss(self.model, inputs, return_outputs=True)
total_loss += loss.item()
if self.compute_metrics is not None and labels is not None:
all_preds.append(outputs.logits.cpu())
all_labels.append(labels.cpu())
metrics = {"eval_loss": total_loss / len(eval_dataloader)}
# 计算自定义指标
if self.compute_metrics is not None:
preds = torch.cat(all_preds)
labels = torch.cat(all_labels)
metrics.update(self.compute_metrics(EvalPrediction(preds, labels)))
return metrics
def create_optimizer_and_scheduler(self, num_training_steps: int):
"""
创建优化器和学习率调度器
"""
if self.optimizer is None:
# 参数分组(weight decay)
decay_parameters = self._get_decay_parameter_names(self.model)
optimizer_grouped_parameters = [
{
"params": [p for n, p in self.model.named_parameters() if n in decay_parameters],
"weight_decay": self.args.weight_decay,
},
{
"params": [p for n, p in self.model.named_parameters() if n not in decay_parameters],
"weight_decay": 0.0,
},
]
# 选择优化器
optimizer_cls = AdamW
optimizer_kwargs = {
"lr": self.args.learning_rate,
"betas": (self.args.adam_beta1, self.args.adam_beta2),
"eps": self.args.adam_epsilon,
}
self.optimizer = optimizer_cls(optimizer_grouped_parameters, **optimizer_kwargs)
if self.lr_scheduler is None:
self.lr_scheduler = get_scheduler(
self.args.lr_scheduler_type,
optimizer=self.optimizer,
num_warmup_steps=self.args.warmup_steps,
num_training_steps=num_training_steps,
)
def get_train_dataloader(self) -> DataLoader:
"""
创建训练数据加载器
"""
if self.train_dataset is None:
raise ValueError("Trainer: training requires a train_dataset")
# 分布式采样器
if self.args.local_rank != -1:
sampler = DistributedSampler(
self.train_dataset,
num_replicas=torch.distributed.get_world_size(),
rank=self.args.local_rank,
shuffle=True,
)
else:
sampler = RandomSampler(self.train_dataset)
return DataLoader(
self.train_dataset,
batch_size=self.args.per_device_train_batch_size,
sampler=sampler,
collate_fn=self.data_collator,
num_workers=self.args.dataloader_num_workers,
pin_memory=self.args.dataloader_pin_memory,
)
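把上面的组件串起来,一个最小可跑的微调脚本大致如下(数据集imdb与模型名仅作示意,实际使用需按任务替换):
# 示例: 用Trainer微调一个序列分类模型
from datasets import load_dataset
from transformers import (AutoModelForSequenceClassification, AutoTokenizer,
                          Trainer, TrainingArguments)

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)

dataset = load_dataset("imdb")
dataset = dataset.map(
    lambda batch: tokenizer(batch["text"], truncation=True, max_length=256),
    batched=True,
)

args = TrainingArguments(
    output_dir="./out",
    per_device_train_batch_size=8,
    num_train_epochs=1,
    evaluation_strategy="epoch",
    logging_steps=50,
)
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    tokenizer=tokenizer,   # 提供tokenizer时,默认使用DataCollatorWithPadding做动态填充
)
trainer.train()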
3.2 TrainingArguments配置
# transformers/training_args.py
@dataclass
class TrainingArguments:
"""
训练参数配置
"""
# 输出设置
output_dir: str = field(
metadata={"help": "输出目录"}
)
overwrite_output_dir: bool = field(
default=False,
metadata={"help": "是否覆盖输出目录"}
)
# 训练超参数
num_train_epochs: float = field(
default=3.0,
metadata={"help": "训练epoch数"}
)
max_steps: int = field(
default=-1,
metadata={"help": "最大训练步数,设置后覆盖num_train_epochs"}
)
per_device_train_batch_size: int = field(
default=8,
metadata={"help": "每设备训练batch大小"}
)
per_device_eval_batch_size: int = field(
default=8,
metadata={"help": "每设备评估batch大小"}
)
gradient_accumulation_steps: int = field(
default=1,
metadata={"help": "梯度累积步数"}
)
# 优化器设置
learning_rate: float = field(
default=5e-5,
metadata={"help": "初始学习率"}
)
weight_decay: float = field(
default=0.0,
metadata={"help": "权重衰减"}
)
adam_beta1: float = field(
default=0.9,
metadata={"help": "Adam beta1"}
)
adam_beta2: float = field(
default=0.999,
metadata={"help": "Adam beta2"}
)
adam_epsilon: float = field(
default=1e-8,
metadata={"help": "Adam epsilon"}
)
max_grad_norm: float = field(
default=1.0,
metadata={"help": "梯度裁剪阈值"}
)
# 学习率调度
lr_scheduler_type: str = field(
default="linear",
metadata={"help": "学习率调度类型: linear, cosine, polynomial"}
)
warmup_ratio: float = field(
default=0.0,
metadata={"help": "warmup比例"}
)
warmup_steps: int = field(
default=0,
metadata={"help": "warmup步数"}
)
# 混合精度
fp16: bool = field(
default=False,
metadata={"help": "是否使用FP16混合精度"}
)
bf16: bool = field(
default=False,
metadata={"help": "是否使用BF16混合精度"}
)
# 分布式训练
local_rank: int = field(
default=-1,
metadata={"help": "本地rank,用于分布式训练"}
)
ddp_find_unused_parameters: bool = field(
default=False,
metadata={"help": "DDP是否查找未使用参数"}
)
# 评估和保存
evaluation_strategy: str = field(
default="no",
metadata={"help": "评估策略: no, steps, epoch"}
)
eval_steps: int = field(
default=500,
metadata={"help": "评估间隔步数"}
)
save_strategy: str = field(
default="steps",
metadata={"help": "保存策略: no, steps, epoch"}
)
save_steps: int = field(
default=500,
metadata={"help": "保存间隔步数"}
)
save_total_limit: Optional[int] = field(
default=None,
metadata={"help": "最多保留检查点数"}
)
# 日志
logging_dir: Optional[str] = field(
default=None,
metadata={"help": "TensorBoard日志目录"}
)
logging_steps: int = field(
default=500,
metadata={"help": "日志记录间隔"}
)
# DeepSpeed集成
deepspeed: Optional[str] = field(
default=None,
metadata={"help": "DeepSpeed配置文件路径"}
)
# FSDP集成
fsdp: Optional[str] = field(
default="",
metadata={"help": "FSDP配置"}
)
@property
def train_batch_size(self) -> int:
"""
实际训练batch大小
"""
return self.per_device_train_batch_size * max(1, self.n_gpu)
@property
def n_gpu(self) -> int:
"""
GPU数量
"""
return torch.cuda.device_count()
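几个batch相关字段的换算关系见下面的小例子: 全局有效batch = 每设备batch × 梯度累积步数 × 参与训练的进程数。
# 示例: 有效全局batch大小的简单换算(world_size假设为2,表示两张GPU)
per_device_train_batch_size = 8
gradient_accumulation_steps = 4
world_size = 2
effective_batch_size = per_device_train_batch_size * gradient_accumulation_steps * world_size
print(effective_batch_size)  # 64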
3.3 回调系统
# transformers/trainer_callback.py
class TrainerCallback:
"""
训练回调基类
用于在训练过程中插入自定义逻辑
"""
def on_init_end(self, args, state, control, **kwargs):
"""Trainer初始化结束"""
pass
def on_train_begin(self, args, state, control, **kwargs):
"""训练开始"""
pass
def on_train_end(self, args, state, control, **kwargs):
"""训练结束"""
pass
def on_epoch_begin(self, args, state, control, **kwargs):
"""Epoch开始"""
pass
def on_epoch_end(self, args, state, control, **kwargs):
"""Epoch结束"""
pass
def on_step_begin(self, args, state, control, **kwargs):
"""Step开始"""
pass
def on_step_end(self, args, state, control, **kwargs):
"""Step结束"""
pass
def on_evaluate(self, args, state, control, metrics, **kwargs):
"""评估完成"""
pass
def on_save(self, args, state, control, **kwargs):
"""保存检查点"""
pass
def on_log(self, args, state, control, logs, **kwargs):
"""记录日志"""
pass
class EarlyStoppingCallback(TrainerCallback):
"""
早停回调
"""
def __init__(
self,
early_stopping_patience: int = 1,
early_stopping_threshold: float = 0.0,
):
self.early_stopping_patience = early_stopping_patience
self.early_stopping_threshold = early_stopping_threshold
self.early_stopping_patience_counter = 0
self.best_metric = None
def on_evaluate(self, args, state, control, metrics, **kwargs):
metric_value = metrics.get(args.metric_for_best_model)
if metric_value is None:
return
# 比较指标
operator = np.greater if args.greater_is_better else np.less
if self.best_metric is None or operator(metric_value, self.best_metric):
self.best_metric = metric_value
self.early_stopping_patience_counter = 0
else:
self.early_stopping_patience_counter += 1
# 检查是否需要早停
if self.early_stopping_patience_counter >= self.early_stopping_patience:
control.should_training_stop = True
class TensorBoardCallback(TrainerCallback):
"""
TensorBoard日志回调
"""
def __init__(self, tb_writer=None):
self.tb_writer = tb_writer
def on_train_begin(self, args, state, control, **kwargs):
if self.tb_writer is None:
self.tb_writer = SummaryWriter(log_dir=args.logging_dir)
def on_log(self, args, state, control, logs, **kwargs):
for key, value in logs.items():
if isinstance(value, (int, float)):
self.tb_writer.add_scalar(key, value, state.global_step)
def on_train_end(self, args, state, control, **kwargs):
if self.tb_writer:
self.tb_writer.close()
class ProgressCallback(TrainerCallback):
"""
进度条回调
"""
def __init__(self):
self.training_bar = None
self.current_step = 0
def on_train_begin(self, args, state, control, **kwargs):
self.training_bar = tqdm(total=state.max_steps)
def on_step_end(self, args, state, control, **kwargs):
self.current_step = state.global_step
self.training_bar.update(1)
def on_train_end(self, args, state, control, **kwargs):
self.training_bar.close()
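自定义回调只需继承TrainerCallback并覆盖需要的钩子,然后通过callbacks参数传给Trainer。下例把训练日志追加写入本地文件,文件路径仅作示意:
# 示例: 自定义日志回调,并与内置回调组合使用
from transformers import TrainerCallback, EarlyStoppingCallback

class FileLoggingCallback(TrainerCallback):
    def __init__(self, path="./train_log.txt"):
        self.path = path

    def on_log(self, args, state, control, logs=None, **kwargs):
        with open(self.path, "a") as f:
            f.write(f"step={state.global_step} {logs}\n")

# trainer = Trainer(..., callbacks=[FileLoggingCallback(), EarlyStoppingCallback(early_stopping_patience=3)])
# 注意: EarlyStoppingCallback需要同时设置metric_for_best_model与load_best_model_at_end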
4. Generation生成系统
4.1 GenerationMixin
# transformers/generation/utils.py
class GenerationMixin:
"""
生成混入类
为PreTrainedModel添加文本生成能力
"""
@torch.no_grad()
def generate(
self,
inputs: Optional[torch.Tensor] = None,
generation_config: Optional[GenerationConfig] = None,
logits_processor: Optional[LogitsProcessorList] = None,
stopping_criteria: Optional[StoppingCriteriaList] = None,
prefix_allowed_tokens_fn: Optional[Callable] = None,
synced_gpus: bool = False,
**kwargs,
) -> Union[torch.Tensor, "GenerateOutput"]:
"""
文本生成主入口
"""
# 合并配置: update会消费属于GenerationConfig的kwargs,并返回剩余部分作为模型额外输入
generation_config = generation_config or self.generation_config
generation_config = copy.deepcopy(generation_config)
model_kwargs = generation_config.update(**kwargs)
# 准备input_ids
if inputs is None:
inputs = self._prepare_input_ids_for_generation(
bos_token_id=generation_config.bos_token_id
)
# 选择生成方法
if generation_config.num_beams == 1:
if generation_config.do_sample:
return self._sample(
inputs,
logits_processor=logits_processor,
stopping_criteria=stopping_criteria,
generation_config=generation_config,
**model_kwargs,
)
else:
return self._greedy_search(
inputs,
logits_processor=logits_processor,
stopping_criteria=stopping_criteria,
generation_config=generation_config,
**model_kwargs,
)
else:
return self._beam_search(
inputs,
logits_processor=logits_processor,
stopping_criteria=stopping_criteria,
generation_config=generation_config,
**model_kwargs,
)
def _greedy_search(
self,
input_ids: torch.Tensor,
logits_processor: LogitsProcessorList,
stopping_criteria: StoppingCriteriaList,
generation_config: GenerationConfig,
**model_kwargs,
) -> torch.Tensor:
"""
贪心搜索
每一步选择概率最高的token
"""
# 准备
unfinished_sequences = input_ids.new(input_ids.shape[0]).fill_(1)
while True:
# 前向传播
model_inputs = self.prepare_inputs_for_generation(input_ids, **model_kwargs)
outputs = self(**model_inputs, return_dict=True)
# 获取下一个token的logits
next_token_logits = outputs.logits[:, -1, :]
# 应用logits处理器
next_token_logits = logits_processor(input_ids, next_token_logits)
# 选择最高概率的token
next_tokens = torch.argmax(next_token_logits, dim=-1)
# 更新序列
next_tokens = next_tokens * unfinished_sequences + \
generation_config.pad_token_id * (1 - unfinished_sequences)
input_ids = torch.cat([input_ids, next_tokens[:, None]], dim=-1)
# 更新未完成标记
unfinished_sequences = unfinished_sequences & ~(
next_tokens == generation_config.eos_token_id
)
# 更新model_kwargs(KV cache等)
model_kwargs = self._update_model_kwargs_for_generation(
outputs, model_kwargs
)
# 检查停止条件
if stopping_criteria(input_ids, None) or unfinished_sequences.max() == 0:
break
return input_ids
def _sample(
self,
input_ids: torch.Tensor,
logits_processor: LogitsProcessorList,
stopping_criteria: StoppingCriteriaList,
generation_config: GenerationConfig,
**model_kwargs,
) -> torch.Tensor:
"""
采样生成
支持temperature、top-k、top-p
"""
unfinished_sequences = input_ids.new(input_ids.shape[0]).fill_(1)
while True:
# 前向传播
model_inputs = self.prepare_inputs_for_generation(input_ids, **model_kwargs)
outputs = self(**model_inputs, return_dict=True)
next_token_logits = outputs.logits[:, -1, :]
# 应用logits处理器(包含temperature、top-k、top-p)
next_token_logits = logits_processor(input_ids, next_token_logits)
# 计算概率分布
probs = nn.functional.softmax(next_token_logits, dim=-1)
# 采样
next_tokens = torch.multinomial(probs, num_samples=1).squeeze(1)
# 更新序列
next_tokens = next_tokens * unfinished_sequences + \
generation_config.pad_token_id * (1 - unfinished_sequences)
input_ids = torch.cat([input_ids, next_tokens[:, None]], dim=-1)
# 更新状态
unfinished_sequences = unfinished_sequences & ~(
next_tokens == generation_config.eos_token_id
)
model_kwargs = self._update_model_kwargs_for_generation(outputs, model_kwargs)
if stopping_criteria(input_ids, None) or unfinished_sequences.max() == 0:
break
return input_ids
def _beam_search(
self,
input_ids: torch.Tensor,
logits_processor: LogitsProcessorList,
stopping_criteria: StoppingCriteriaList,
generation_config: GenerationConfig,
**model_kwargs,
) -> torch.Tensor:
"""
束搜索
维护多个候选序列
"""
batch_size = input_ids.shape[0]
num_beams = generation_config.num_beams
# 扩展输入
input_ids = input_ids.unsqueeze(1).expand(batch_size, num_beams, -1)
input_ids = input_ids.contiguous().view(batch_size * num_beams, -1)
# beam scores
beam_scores = torch.zeros((batch_size, num_beams), device=input_ids.device)
beam_scores[:, 1:] = -1e9 # 初始只有第一个beam有效
beam_scores = beam_scores.view(-1)
while True:
model_inputs = self.prepare_inputs_for_generation(input_ids, **model_kwargs)
outputs = self(**model_inputs, return_dict=True)
next_token_logits = outputs.logits[:, -1, :]
next_token_logits = logits_processor(input_ids, next_token_logits)
# 计算下一步分数
next_token_scores = nn.functional.log_softmax(next_token_logits, dim=-1)
next_token_scores = next_token_scores + beam_scores[:, None]
# 重塑为 (batch_size, num_beams * vocab_size)
vocab_size = next_token_scores.shape[-1]
next_token_scores = next_token_scores.view(batch_size, num_beams * vocab_size)
# 选择top-k
next_token_scores, next_tokens = torch.topk(
next_token_scores, 2 * num_beams, dim=1, largest=True, sorted=True
)
# 计算beam索引和token索引
next_indices = next_tokens // vocab_size
next_tokens = next_tokens % vocab_size
# 更新beam
beam_outputs = self._beam_step(
next_token_scores,
next_tokens,
next_indices,
beam_scores,
input_ids,
num_beams,
generation_config,
)
input_ids, beam_scores = beam_outputs[:2]
# 更新KV cache
model_kwargs = self._update_model_kwargs_for_generation(
outputs, model_kwargs, beam_idx=next_indices
)
if stopping_criteria(input_ids, None):
break
return input_ids
@dataclass
class GenerationConfig:
"""
生成配置
"""
# 基础配置
max_length: int = 20
max_new_tokens: Optional[int] = None
min_length: int = 0
# 特殊token
bos_token_id: Optional[int] = None
eos_token_id: Optional[int] = None
pad_token_id: Optional[int] = None
# 采样配置
do_sample: bool = False
temperature: float = 1.0
top_k: int = 50
top_p: float = 1.0
typical_p: float = 1.0
# 束搜索
num_beams: int = 1
num_beam_groups: int = 1
diversity_penalty: float = 0.0
length_penalty: float = 1.0
# 惩罚
repetition_penalty: float = 1.0
no_repeat_ngram_size: int = 0
# 输出
num_return_sequences: int = 1
output_scores: bool = False
output_attentions: bool = False
output_hidden_states: bool = False
return_dict_in_generate: bool = False
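实际调用时,这些配置项可以直接作为generate的关键字参数传入。下面是一个示意(模型与提示文本仅作示例):
# 示例: 通过generate控制采样行为
from transformers import AutoModelForCausalLM, AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")

inputs = tokenizer("The key idea of beam search is", return_tensors="pt")
output_ids = model.generate(
    **inputs,
    max_new_tokens=40,
    do_sample=True,                        # 走_sample分支
    temperature=0.8,
    top_p=0.9,
    repetition_penalty=1.1,
    pad_token_id=tokenizer.eos_token_id,   # GPT-2没有pad token,用eos代替
)
print(tokenizer.decode(output_ids[0], skip_special_tokens=True))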
4.2 LogitsProcessor
# transformers/generation/logits_process.py
class LogitsProcessor:
"""
Logits处理器基类
用于在生成过程中修改logits
"""
def __call__(self, input_ids: torch.Tensor, scores: torch.Tensor) -> torch.Tensor:
raise NotImplementedError
class LogitsProcessorList(list):
"""
Logits处理器列表
"""
def __call__(self, input_ids: torch.Tensor, scores: torch.Tensor) -> torch.Tensor:
for processor in self:
scores = processor(input_ids, scores)
return scores
class TemperatureLogitsWarper(LogitsProcessor):
"""
Temperature缩放
temperature > 1: 更随机
temperature < 1: 更确定
"""
def __init__(self, temperature: float):
if temperature <= 0:
raise ValueError("temperature must be strictly positive")
self.temperature = temperature
def __call__(self, input_ids: torch.Tensor, scores: torch.Tensor) -> torch.Tensor:
return scores / self.temperature
class TopKLogitsWarper(LogitsProcessor):
"""
Top-K采样
只保留概率最高的K个token
"""
def __init__(self, top_k: int, filter_value: float = float("-inf")):
if top_k <= 0:
raise ValueError("top_k must be positive")
self.top_k = top_k
self.filter_value = filter_value
def __call__(self, input_ids: torch.Tensor, scores: torch.Tensor) -> torch.Tensor:
top_k = min(self.top_k, scores.size(-1))
# 找到第K大的值
indices_to_remove = scores < torch.topk(scores, top_k)[0][..., -1, None]
scores = scores.masked_fill(indices_to_remove, self.filter_value)
return scores
class TopPLogitsWarper(LogitsProcessor):
"""
Top-P (Nucleus) 采样
保留累积概率达到P的最小token集合
"""
def __init__(self, top_p: float, filter_value: float = float("-inf")):
if top_p < 0 or top_p > 1:
raise ValueError("top_p must be between 0 and 1")
self.top_p = top_p
self.filter_value = filter_value
def __call__(self, input_ids: torch.Tensor, scores: torch.Tensor) -> torch.Tensor:
# 排序
sorted_logits, sorted_indices = torch.sort(scores, descending=True)
cumulative_probs = torch.cumsum(
torch.softmax(sorted_logits, dim=-1), dim=-1
)
# 找到累积概率超过top_p的位置
sorted_indices_to_remove = cumulative_probs > self.top_p
# 保留第一个超过的token
sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
sorted_indices_to_remove[..., 0] = 0
# 还原顺序
indices_to_remove = sorted_indices_to_remove.scatter(
1, sorted_indices, sorted_indices_to_remove
)
scores = scores.masked_fill(indices_to_remove, self.filter_value)
return scores
class RepetitionPenaltyLogitsProcessor(LogitsProcessor):
"""
重复惩罚
降低已出现token的概率
"""
def __init__(self, penalty: float):
if penalty <= 0:
raise ValueError("penalty must be positive")
self.penalty = penalty
def __call__(self, input_ids: torch.Tensor, scores: torch.Tensor) -> torch.Tensor:
for i in range(scores.shape[0]):
for token_id in set(input_ids[i].tolist()):
if scores[i, token_id] < 0:
scores[i, token_id] *= self.penalty
else:
scores[i, token_id] /= self.penalty
return scores
class NoRepeatNGramLogitsProcessor(LogitsProcessor):
"""
N-gram重复惩罚
禁止生成已出现的n-gram
"""
def __init__(self, ngram_size: int):
if ngram_size <= 0:
raise ValueError("ngram_size must be positive")
self.ngram_size = ngram_size
def __call__(self, input_ids: torch.Tensor, scores: torch.Tensor) -> torch.Tensor:
batch_size = scores.shape[0]
for batch_idx in range(batch_size):
# 收集已出现的n-gram
generated_ngrams = self._get_generated_ngrams(
input_ids[batch_idx], self.ngram_size
)
# 获取当前n-gram前缀
if len(input_ids[batch_idx]) >= self.ngram_size - 1:
ngram_prefix = tuple(
input_ids[batch_idx][-(self.ngram_size - 1):].tolist()
)
# 禁止会导致重复n-gram的token
banned_tokens = generated_ngrams.get(ngram_prefix, [])
for token in banned_tokens:
scores[batch_idx, token] = float("-inf")
return scores
@staticmethod
def _get_generated_ngrams(input_ids: torch.Tensor, ngram_size: int) -> Dict[Tuple, List]:
generated_ngrams = {}
ids_list = input_ids.tolist()
for i in range(len(ids_list) - ngram_size + 1):
ngram = tuple(ids_list[i:i + ngram_size])
prefix = ngram[:-1]
token = ngram[-1]
if prefix not in generated_ngrams:
generated_ngrams[prefix] = []
generated_ngrams[prefix].append(token)
return generated_ngrams
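这些处理器可以像函数一样手动组合。下面的片段复现temperature→top-k→top-p的预处理链并完成一步采样(词表大小32000为假设值;也可直接使用上文的简化实现代替transformers中的同名类):
# 示例: 手动组合LogitsProcessor链
import torch
from transformers import (LogitsProcessorList, TemperatureLogitsWarper,
                          TopKLogitsWarper, TopPLogitsWarper)

processors = LogitsProcessorList([
    TemperatureLogitsWarper(0.7),
    TopKLogitsWarper(50),
    TopPLogitsWarper(0.9),
])

input_ids = torch.tensor([[1, 2, 3]])
scores = torch.randn(1, 32000)            # 假设词表大小为32000
scores = processors(input_ids, scores)    # 被过滤位置的分数变为-inf
probs = torch.softmax(scores, dim=-1)
next_token = torch.multinomial(probs, num_samples=1)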
5. 与分布式训练集成
5.1 Accelerate集成
# transformers/trainer.py (Accelerate集成部分)
class Trainer:
"""
与Accelerate库集成
提供统一的分布式训练接口
"""
def _setup_accelerator(self):
"""
设置Accelerate加速器
"""
from accelerate import Accelerator
from accelerate.utils import DistributedDataParallelKwargs
# DDP配置
ddp_kwargs = DistributedDataParallelKwargs(
find_unused_parameters=self.args.ddp_find_unused_parameters,
broadcast_buffers=self.args.ddp_broadcast_buffers,
)
# 创建加速器
self.accelerator = Accelerator(
mixed_precision=self._get_mixed_precision(),
gradient_accumulation_steps=self.args.gradient_accumulation_steps,
kwargs_handlers=[ddp_kwargs],
)
# 准备模型、优化器、数据加载器
self.model, self.optimizer, self.train_dataloader = self.accelerator.prepare(
self.model, self.optimizer, self.train_dataloader
)
def _get_mixed_precision(self) -> str:
"""
获取混合精度类型
"""
if self.args.bf16:
return "bf16"
elif self.args.fp16:
return "fp16"
return "no"
def _accelerator_backward(self, loss: torch.Tensor):
"""
使用Accelerate进行反向传播
自动处理梯度累积和混合精度
"""
self.accelerator.backward(loss)
# 与PEFT集成
class PeftModelForCausalLM:
"""
PEFT (Parameter-Efficient Fine-Tuning) 集成
支持LoRA、Prefix Tuning等
"""
@classmethod
def from_pretrained(
cls,
model_id: str,
peft_config: "PeftConfig",
**kwargs,
):
"""
从预训练模型加载并应用PEFT
"""
from peft import get_peft_model
# 加载基础模型
base_model = AutoModelForCausalLM.from_pretrained(model_id, **kwargs)
# 应用PEFT
model = get_peft_model(base_model, peft_config)
return model
def print_trainable_parameters(self):
"""
打印可训练参数统计
"""
trainable_params = 0
all_params = 0
for _, param in self.named_parameters():
all_params += param.numel()
if param.requires_grad:
trainable_params += param.numel()
print(
f"trainable params: {trainable_params} || "
f"all params: {all_params} || "
f"trainable%: {100 * trainable_params / all_params:.2f}%"
)
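脱离Trainer时,也可以直接用Accelerate写训练循环,核心只有prepare与backward两步。下面用一个玩具模型与随机数据演示(模型、数据均为示意):
# 示例: 基于Accelerate的最小训练循环
import torch
from torch.utils.data import DataLoader, TensorDataset
from accelerate import Accelerator

model = torch.nn.Linear(16, 2)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
dataset = TensorDataset(torch.randn(64, 16), torch.randint(0, 2, (64,)))
dataloader = DataLoader(dataset, batch_size=8)

accelerator = Accelerator(gradient_accumulation_steps=4)
model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)

model.train()
for x, y in dataloader:
    with accelerator.accumulate(model):   # 自动处理梯度累积的同步边界
        loss = torch.nn.functional.cross_entropy(model(x), y)
        accelerator.backward(loss)        # 代替loss.backward(),开启混合精度时自动做梯度缩放
        optimizer.step()
        optimizer.zero_grad()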
5.2 DeepSpeed集成
# transformers/deepspeed.py
class HfDeepSpeedConfig:
"""
DeepSpeed配置包装器
"""
def __init__(self, config_file_or_dict):
if isinstance(config_file_or_dict, str):
with open(config_file_or_dict, "r") as f:
config = json.load(f)
else:
config = config_file_or_dict
self.config = config
# 设置环境变量
self._set_env_vars()
def _set_env_vars(self):
"""
设置DeepSpeed相关环境变量
"""
if "zero_optimization" in self.config:
zero_config = self.config["zero_optimization"]
stage = zero_config.get("stage", 0)
if stage == 3:
# ZeRO Stage 3需要特殊处理
os.environ["DEEPSPEED_ZERO_STAGE"] = "3"
def is_zero3(self) -> bool:
"""
检查是否是ZeRO Stage 3
"""
if "zero_optimization" not in self.config:
return False
return self.config["zero_optimization"].get("stage", 0) == 3
def deepspeed_init(
trainer,
num_training_steps: int,
resume_from_checkpoint: Optional[str] = None,
):
"""
初始化DeepSpeed
"""
import deepspeed
# 准备配置
ds_config = trainer.args.deepspeed
if isinstance(ds_config, str):
with open(ds_config, "r") as f:
ds_config = json.load(f)
# 填充自动参数
ds_config = _fill_auto_values(ds_config, trainer.args)
# 初始化
model, optimizer, _, lr_scheduler = deepspeed.initialize(
model=trainer.model,
model_parameters=trainer.model.parameters(),
config=ds_config,
)
# 恢复检查点
if resume_from_checkpoint is not None:
model.load_checkpoint(resume_from_checkpoint)
return model, optimizer, lr_scheduler
def _fill_auto_values(ds_config: dict, args: TrainingArguments) -> dict:
"""
填充DeepSpeed配置中的auto值
"""
config = copy.deepcopy(ds_config)
# 训练batch size
if config.get("train_batch_size") == "auto":
config["train_batch_size"] = (
args.per_device_train_batch_size
* args.gradient_accumulation_steps
* args.world_size
)
# 学习率
if "optimizer" in config:
if config["optimizer"].get("params", {}).get("lr") == "auto":
config["optimizer"]["params"]["lr"] = args.learning_rate
# Scheduler
if "scheduler" in config:
scheduler_params = config["scheduler"].get("params", {})
if scheduler_params.get("warmup_num_steps") == "auto":
scheduler_params["warmup_num_steps"] = args.warmup_steps
return config
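一个最小的ZeRO Stage 2配置示例如下,其中的"auto"字段正是由上面_fill_auto_values这类逻辑按TrainingArguments填充的(具体取值仅作示意):
# 示例: 最小DeepSpeed ZeRO-2配置
ds_config = {
    "train_batch_size": "auto",
    "gradient_accumulation_steps": "auto",
    "fp16": {"enabled": True},
    "zero_optimization": {
        "stage": 2,
        "overlap_comm": True,
        "contiguous_gradients": True,
    },
}
# 使用方式: TrainingArguments(..., deepspeed="ds_config.json"),或在代码中直接传入该dict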
6. 架构设计模式总结
6.1 核心设计原则
┌─────────────────────────────────────────────────────────────────────┐
│ HuggingFace Transformers 设计原则 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 1. 抽象与复用 │
│ ├─ PreTrainedModel: 统一的模型基类 │
│ ├─ PretrainedConfig: 统一的配置管理 │
│ └─ Auto类: 自动类型推断 │
│ │
│ 2. 可扩展性 │
│ ├─ 回调系统: 插件式扩展训练流程 │
│ ├─ LogitsProcessor: 可组合的生成控制 │
│ └─ 模型注册: 新模型易于集成 │
│ │
│ 3. 易用性 │
│ ├─ Pipeline: 一行代码完成推理 │
│ ├─ Trainer: 简化训练流程 │
│ └─ from_pretrained: 统一的加载接口 │
│ │
│ 4. 性能 │
│ ├─ Fast Tokenizer: Rust实现的高性能分词 │
│ ├─ 混合精度: 自动FP16/BF16支持 │
│ └─ 分布式: 与DeepSpeed/FSDP无缝集成 │
│ │
└─────────────────────────────────────────────────────────────────────┘
6.2 类继承关系
nn.Module
│
└── PreTrainedModel (GenerationMixin, PushToHubMixin)
│
├── BertPreTrainedModel
│ ├── BertModel
│ ├── BertForSequenceClassification
│ └── BertForMaskedLM
│
├── GPT2PreTrainedModel
│ ├── GPT2Model
│ └── GPT2LMHeadModel
│
└── LlamaPreTrainedModel
├── LlamaModel
└── LlamaForCausalLM
PreTrainedTokenizerBase
│
├── PreTrainedTokenizer (Python实现)
│ ├── BertTokenizer
│ └── GPT2Tokenizer
│
└── PreTrainedTokenizerFast (Rust实现)
├── BertTokenizerFast
└── GPT2TokenizerFast
7. 最佳实践
7.1 模型加载优化
# 大模型加载最佳实践
# 1. 使用device_map自动分配
model = AutoModelForCausalLM.from_pretrained(
"meta-llama/Llama-2-70b",
device_map="auto", # 自动分配到多GPU
torch_dtype=torch.float16, # 使用FP16减少内存
)
# 2. 使用量化减少内存
from transformers import BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.float16,
bnb_4bit_use_double_quant=True,
bnb_4bit_quant_type="nf4",
)
model = AutoModelForCausalLM.from_pretrained(
"meta-llama/Llama-2-70b",
quantization_config=quantization_config,
device_map="auto",
)
# 3. 使用Flash Attention
model = AutoModelForCausalLM.from_pretrained(
"meta-llama/Llama-2-7b",
attn_implementation="flash_attention_2",
torch_dtype=torch.float16,
)
7.2 高效训练配置
# 使用PEFT进行高效微调
from peft import LoraConfig, get_peft_model
lora_config = LoraConfig(
r=16,
lora_alpha=32,
target_modules=["q_proj", "v_proj"],
lora_dropout=0.1,
bias="none",
task_type="CAUSAL_LM",
)
model = get_peft_model(model, lora_config)
model.print_trainable_parameters() # 查看可训练参数比例
# 训练配置
training_args = TrainingArguments(
output_dir="./results",
per_device_train_batch_size=4,
gradient_accumulation_steps=4, # 有效batch size = 16
num_train_epochs=3,
learning_rate=2e-4,
fp16=True, # 混合精度
logging_steps=10,
save_strategy="epoch",
warmup_ratio=0.03,
lr_scheduler_type="cosine",
optim="adamw_torch",
gradient_checkpointing=True, # 激活检查点
)
8. 面试高频问题
Q1: HuggingFace Transformers的Auto类是如何实现自动模型选择的?
答案要点:
- 配置映射: 维护model_type到模型类的映射字典
- 配置检测: 从config.json读取model_type字段
- 动态加载: 根据model_type选择对应的模型类
- 自定义支持: 支持trust_remote_code加载自定义模型
Q2: PreTrainedModel的from_pretrained流程是什么?
答案要点:
- 配置加载: 下载或读取config.json
- 模型初始化: 根据配置创建模型结构
- 权重加载: 下载并加载预训练权重
- 键名处理: 处理前缀、缺失键、意外键
- 后处理: 绑定权重、设为eval模式
Q3: Tokenizer的Fast版本和普通版本有什么区别?
答案要点:
- 实现语言: Fast基于Rust的tokenizers库,普通版为纯Python
- 性能: Fast通常快一个数量级,批量编码时优势更明显
- 功能: Fast额外支持offset mapping、word_ids等特性
- 兼容性: 两者API基本兼容,绝大多数情况下分词结果一致
Q4: Trainer的回调系统如何工作?
答案要点:
- 生命周期钩子: 提供train_begin/end、epoch_begin/end、step_begin/end等钩子
- TrainerControl: 通过control对象控制训练流程
- 组合模式: 多个回调可以组合使用
- 内置回调: 提供EarlyStopping、TensorBoard等常用回调
Q5: Generation的采样策略有哪些?如何实现?
答案要点:
- 贪心搜索: 每步选最高概率token
- 采样: 按概率分布随机选择
- Top-K: 只从前K个token中采样
- Top-P: 从累积概率达P的token中采样
- Beam Search: 维护多个候选序列
- LogitsProcessor: 通过处理器链修改logits
9. 学习资源
官方文档
- https://huggingface.co/docs/transformers
源码阅读
- modeling_utils.py - PreTrainedModel实现
- trainer.py - Trainer实现
- generation/utils.py - 生成系统
推荐学习路径
- 理解PreTrainedModel和Auto类设计
- 学习Tokenizer实现原理
- 掌握Trainer和回调系统
- 深入Generation采样策略
- 学习与分布式训练框架集成