AI and Machine Learning Interview Questions
I. Deep Learning Fundamentals
1. Neural Network Fundamentals
Question: Explain the forward-propagation and backpropagation procedures of a feedforward neural network in detail.
Core concept:
A feedforward neural network is the most basic neural network architecture: information flows in one direction only, from input to output, with no cycles.
Mathematics and implementation:
import numpy as np

class NeuralNetwork:
    def __init__(self, layers):
        """
        Initialize the network.
        layers: list of layer sizes, e.g. [784, 128, 64, 10]
        """
        self.num_layers = len(layers)
        self.layers = layers
        # Initialize weights and biases.
        # He initialization: suited to ReLU activations.
        self.weights = [
            np.random.randn(layers[i], layers[i+1]) * np.sqrt(2.0 / layers[i])
            for i in range(self.num_layers - 1)
        ]
        # Biases start at zero.
        self.biases = [
            np.zeros((1, layers[i+1]))
            for i in range(self.num_layers - 1)
        ]

    def sigmoid(self, z):
        """Sigmoid activation."""
        return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))

    def sigmoid_derivative(self, z):
        """Derivative of the sigmoid."""
        s = self.sigmoid(z)
        return s * (1 - s)

    def relu(self, z):
        """ReLU activation."""
        return np.maximum(0, z)

    def relu_derivative(self, z):
        """Derivative of ReLU."""
        return (z > 0).astype(float)

    def softmax(self, z):
        """Softmax activation."""
        exp_z = np.exp(z - np.max(z, axis=1, keepdims=True))
        return exp_z / np.sum(exp_z, axis=1, keepdims=True)

    def forward(self, X):
        """
        Forward propagation.
        X: input data, shape (batch_size, input_dim)
        Returns: (activations, zs) — per-layer activations and pre-activations.
        """
        activations = [X]  # activations of every layer
        zs = []            # weighted inputs (pre-activations) of every layer
        for i in range(self.num_layers - 1):
            # Weighted input: z = xW + b
            z = np.dot(activations[-1], self.weights[i]) + self.biases[i]
            zs.append(z)
            # Apply the activation function
            if i == self.num_layers - 2:
                # Output layer: softmax
                activation = self.softmax(z)
            else:
                # Hidden layers: ReLU
                activation = self.relu(z)
            activations.append(activation)
        return activations, zs

    def backward(self, X, y, activations, zs, learning_rate=0.01):
        """
        Backpropagation.
        X: input data
        y: ground-truth labels (one-hot encoded)
        activations: activations from the forward pass
        zs: weighted inputs from the forward pass
        learning_rate: step size
        """
        m = X.shape[0]  # batch size
        # Gradient buffers
        weight_gradients = [np.zeros_like(w) for w in self.weights]
        bias_gradients = [np.zeros_like(b) for b in self.biases]
        # Output-layer error (softmax + cross-entropy): delta = a - y
        delta = activations[-1] - y
        # Walk backwards through the layers
        for i in range(self.num_layers - 2, -1, -1):
            # Weight gradient: dW = activations[i]^T @ delta / m
            weight_gradients[i] = np.dot(activations[i].T, delta) / m
            # Bias gradient: db = sum(delta) / m
            bias_gradients[i] = np.sum(delta, axis=0, keepdims=True) / m
            if i > 0:
                # Propagate the error to the previous layer: delta = (delta @ W^T) * f'(z)
                delta = np.dot(delta, self.weights[i].T)
                delta *= self.relu_derivative(zs[i-1])
        # Apply the gradient-descent update
        for i in range(self.num_layers - 1):
            self.weights[i] -= learning_rate * weight_gradients[i]
            self.biases[i] -= learning_rate * bias_gradients[i]
        return weight_gradients, bias_gradients

    def train(self, X, y, epochs=100, batch_size=32, learning_rate=0.01, verbose=True):
        """
        Train the network with mini-batch gradient descent.
        """
        n_samples = X.shape[0]
        losses = []
        for epoch in range(epochs):
            # Shuffle the data
            indices = np.random.permutation(n_samples)
            X_shuffled = X[indices]
            y_shuffled = y[indices]
            epoch_loss = 0
            # Mini-batch gradient descent
            for i in range(0, n_samples, batch_size):
                batch_X = X_shuffled[i:i+batch_size]
                batch_y = y_shuffled[i:i+batch_size]
                # Forward pass
                activations, zs = self.forward(batch_X)
                # Cross-entropy loss
                batch_loss = self.cross_entropy_loss(activations[-1], batch_y)
                epoch_loss += batch_loss * len(batch_X)
                # Backward pass
                self.backward(batch_X, batch_y, activations, zs, learning_rate)
            # Average loss over the epoch
            avg_loss = epoch_loss / n_samples
            losses.append(avg_loss)
            if verbose and (epoch + 1) % 10 == 0:
                accuracy = self.evaluate(X, y)
                print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}")
        return losses

    def cross_entropy_loss(self, predictions, targets):
        """
        Cross-entropy loss.
        """
        m = predictions.shape[0]
        # Avoid log(0)
        predictions = np.clip(predictions, 1e-10, 1 - 1e-10)
        loss = -np.sum(targets * np.log(predictions)) / m
        return loss

    def predict(self, X):
        """
        Predict class labels.
        """
        activations, _ = self.forward(X)
        return np.argmax(activations[-1], axis=1)

    def evaluate(self, X, y):
        """
        Compute accuracy.
        """
        predictions = self.predict(X)
        true_labels = np.argmax(y, axis=1)
        accuracy = np.mean(predictions == true_labels)
        return accuracy

# Usage example
if __name__ == "__main__":
    # Generate synthetic data
    np.random.seed(42)
    # 1000 samples with 20 features each
    X = np.random.randn(1000, 20)
    # 10 classes, one-hot encoded
    y_labels = np.random.randint(0, 10, 1000)
    y = np.eye(10)[y_labels]
    # Build the network: 20 -> 64 -> 32 -> 10
    nn = NeuralNetwork([20, 64, 32, 10])
    # Train
    losses = nn.train(X, y, epochs=100, batch_size=32, learning_rate=0.01)
    # Evaluate
    accuracy = nn.evaluate(X, y)
    print(f"\nFinal Accuracy: {accuracy:.4f}")
Key points:
Forward propagation
- The input layer receives the data.
- Each layer computes z = xW + b, then a = activation(z).
- The output layer applies softmax to produce a probability distribution.
Backpropagation
- Compute the output-layer error: delta = output - target.
- Propagate the error backwards layer by layer.
- Compute the gradients and update the parameters (a numerical check for this step is sketched below).
Choosing activation functions
- ReLU: hidden layers; mitigates vanishing gradients.
- Softmax: multi-class output layer.
- Sigmoid: binary-classification output layer.
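A standard way to validate a hand-written backward pass like the one above is a numerical gradient check: nudge one parameter, measure the change in loss, and compare against the analytic gradient. A minimal sketch against the NeuralNetwork class above (epsilon, the number of spot checks, and the batch size are illustrative choices, not fixed requirements):

def gradient_check(nn, X, y, epsilon=1e-5, num_checks=5):
    """Compare analytic gradients from backward() with finite differences."""
    # Analytic gradients; learning_rate=0.0 so parameters are not modified
    activations, zs = nn.forward(X)
    weight_grads, _ = nn.backward(X, y, activations, zs, learning_rate=0.0)
    for layer in range(len(nn.weights)):
        for _ in range(num_checks):
            i = np.random.randint(nn.weights[layer].shape[0])
            j = np.random.randint(nn.weights[layer].shape[1])
            original = nn.weights[layer][i, j]
            # Loss at theta + epsilon
            nn.weights[layer][i, j] = original + epsilon
            loss_plus = nn.cross_entropy_loss(nn.forward(X)[0][-1], y)
            # Loss at theta - epsilon
            nn.weights[layer][i, j] = original - epsilon
            loss_minus = nn.cross_entropy_loss(nn.forward(X)[0][-1], y)
            nn.weights[layer][i, j] = original  # restore
            numeric = (loss_plus - loss_minus) / (2 * epsilon)
            analytic = weight_grads[layer][i, j]
            rel_err = abs(numeric - analytic) / max(abs(numeric) + abs(analytic), 1e-10)
            print(f"layer {layer} ({i},{j}): relative error {rel_err:.2e}")

# Example: spot-check a small network on a few samples
np.random.seed(0)
X_small = np.random.randn(8, 20)
y_small = np.eye(10)[np.random.randint(0, 10, 8)]
gradient_check(NeuralNetwork([20, 16, 10]), X_small, y_small)

Relative errors around 1e-7 to 1e-5 usually indicate a correct implementation; values near 1e-2 or larger point to a bug (occasional outliers can occur at ReLU kinks).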
2. Optimization Algorithms
Question: Compare SGD, Momentum, Adam, and related optimizers, and explain the strengths and weaknesses of each.
Optimizer implementations:
class Optimizer:
    """Base class for optimizers."""
    def __init__(self, learning_rate=0.01):
        self.learning_rate = learning_rate

    def update(self, params, grads):
        raise NotImplementedError

class SGD(Optimizer):
    """
    Stochastic gradient descent.
    Pros: simple; well suited to convex problems.
    Cons: the learning rate is hard to tune; updates can oscillate.
    """
    def __init__(self, learning_rate=0.01):
        super().__init__(learning_rate)

    def update(self, params, grads):
        for param, grad in zip(params, grads):
            param -= self.learning_rate * grad

class Momentum(Optimizer):
    """
    Momentum optimizer.
    Pros: faster convergence; damps oscillations.
    Cons: introduces an extra hyperparameter.
    Update rule:
        v = momentum * v - learning_rate * grad
        param = param + v
    """
    def __init__(self, learning_rate=0.01, momentum=0.9):
        super().__init__(learning_rate)
        self.momentum = momentum
        self.velocities = None

    def update(self, params, grads):
        if self.velocities is None:
            self.velocities = [np.zeros_like(p) for p in params]
        for i, (param, grad) in enumerate(zip(params, grads)):
            self.velocities[i] = (
                self.momentum * self.velocities[i] -
                self.learning_rate * grad
            )
            param += self.velocities[i]

class AdaGrad(Optimizer):
    """
    Adaptive gradient optimizer.
    Pros: adapts the learning rate per parameter.
    Cons: the effective learning rate decays monotonically, so learning slows late in training.
    Update rule:
        cache = cache + grad^2
        param = param - learning_rate * grad / (sqrt(cache) + epsilon)
    """
    def __init__(self, learning_rate=0.01, epsilon=1e-8):
        super().__init__(learning_rate)
        self.epsilon = epsilon
        self.cache = None

    def update(self, params, grads):
        if self.cache is None:
            self.cache = [np.zeros_like(p) for p in params]
        for i, (param, grad) in enumerate(zip(params, grads)):
            self.cache[i] += grad ** 2
            param -= (
                self.learning_rate * grad /
                (np.sqrt(self.cache[i]) + self.epsilon)
            )

class RMSprop(Optimizer):
    """
    RMSprop optimizer.
    Pros: fixes AdaGrad's ever-shrinking learning rate by using a moving average.
    Cons: the initial learning rate still has to be set by hand.
    Update rule:
        cache = decay_rate * cache + (1 - decay_rate) * grad^2
        param = param - learning_rate * grad / (sqrt(cache) + epsilon)
    """
    def __init__(self, learning_rate=0.01, decay_rate=0.9, epsilon=1e-8):
        super().__init__(learning_rate)
        self.decay_rate = decay_rate
        self.epsilon = epsilon
        self.cache = None

    def update(self, params, grads):
        if self.cache is None:
            self.cache = [np.zeros_like(p) for p in params]
        for i, (param, grad) in enumerate(zip(params, grads)):
            self.cache[i] = (
                self.decay_rate * self.cache[i] +
                (1 - self.decay_rate) * grad ** 2
            )
            param -= (
                self.learning_rate * grad /
                (np.sqrt(self.cache[i]) + self.epsilon)
            )

class Adam(Optimizer):
    """
    Adam optimizer (the most widely used default).
    Pros: combines Momentum and RMSprop; adaptive learning rates; fast convergence.
    Cons: can fail to converge in some settings (AMSGrad is one proposed fix).
    Update rule:
        m = beta1 * m + (1 - beta1) * grad
        v = beta2 * v + (1 - beta2) * grad^2
        m_hat = m / (1 - beta1^t)
        v_hat = v / (1 - beta2^t)
        param = param - learning_rate * m_hat / (sqrt(v_hat) + epsilon)
    """
    def __init__(self, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8):
        super().__init__(learning_rate)
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        self.m = None  # first-moment estimate
        self.v = None  # second-moment estimate
        self.t = 0     # time step

    def update(self, params, grads):
        if self.m is None:
            self.m = [np.zeros_like(p) for p in params]
            self.v = [np.zeros_like(p) for p in params]
        self.t += 1
        for i, (param, grad) in enumerate(zip(params, grads)):
            # Update the first-moment estimate
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * grad
            # Update the second-moment estimate
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * grad ** 2
            # Bias correction
            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)
            # Parameter update
            param -= (
                self.learning_rate * m_hat /
                (np.sqrt(v_hat) + self.epsilon)
            )

class AdamW(Adam):
    """
    AdamW: Adam with decoupled weight decay.
    Pros: better generalization.
    Difference from Adam + L2 regularization:
    - L2 regularization adds a weight penalty to the loss, so it passes through the adaptive scaling.
    - Weight decay shrinks the parameters directly in the update step.
    """
    def __init__(self, learning_rate=0.001, beta1=0.9, beta2=0.999,
                 epsilon=1e-8, weight_decay=0.01):
        super().__init__(learning_rate, beta1, beta2, epsilon)
        self.weight_decay = weight_decay

    def update(self, params, grads):
        if self.m is None:
            self.m = [np.zeros_like(p) for p in params]
            self.v = [np.zeros_like(p) for p in params]
        self.t += 1
        for i, (param, grad) in enumerate(zip(params, grads)):
            # First-moment estimate
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * grad
            # Second-moment estimate
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * grad ** 2
            # Bias correction
            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)
            # Parameter update, including the decoupled weight-decay term
            param -= self.learning_rate * (
                m_hat / (np.sqrt(v_hat) + self.epsilon) +
                self.weight_decay * param
            )

# Learning-rate schedulers
class LearningRateScheduler:
    """Learning-rate schedules."""
    @staticmethod
    def step_decay(initial_lr, epoch, drop=0.5, epochs_drop=10):
        """Step decay."""
        return initial_lr * (drop ** (epoch // epochs_drop))

    @staticmethod
    def exponential_decay(initial_lr, epoch, decay_rate=0.95):
        """Exponential decay."""
        return initial_lr * (decay_rate ** epoch)

    @staticmethod
    def cosine_annealing(initial_lr, epoch, total_epochs):
        """Cosine annealing."""
        return initial_lr * (1 + np.cos(np.pi * epoch / total_epochs)) / 2

    @staticmethod
    def warmup_cosine(initial_lr, epoch, total_epochs, warmup_epochs=5):
        """Linear warmup followed by cosine annealing."""
        if epoch < warmup_epochs:
            # Linear warmup
            return initial_lr * (epoch + 1) / warmup_epochs
        else:
            # Cosine annealing
            progress = (epoch - warmup_epochs) / (total_epochs - warmup_epochs)
            return initial_lr * (1 + np.cos(np.pi * progress)) / 2

# Usage example
optimizer = Adam(learning_rate=0.001)
scheduler = LearningRateScheduler()
for epoch in range(100):
    # Adjust the learning rate
    lr = scheduler.warmup_cosine(0.001, epoch, 100)
    optimizer.learning_rate = lr
    # Training step
    # optimizer.update(params, grads)
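To make the trade-offs concrete, the optimizers above can be raced on a toy problem with a known minimum. The sketch below minimizes an ill-conditioned quadratic bowl; the curvatures, learning rates, and step count are arbitrary illustration choices:

# Ill-conditioned bowl: f(x) = 0.5 * (1*x1^2 + 50*x2^2), minimum at the origin
A = np.array([1.0, 50.0])

def loss_and_grad(x):
    return 0.5 * np.sum(A * x ** 2), A * x

for opt in [SGD(learning_rate=0.01), Momentum(learning_rate=0.01),
            RMSprop(learning_rate=0.01), Adam(learning_rate=0.1)]:
    params = [np.array([5.0, 5.0])]  # optimizers update parameters in place
    for step in range(200):
        _, grad = loss_and_grad(params[0])
        opt.update(params, [grad])
    final_loss = loss_and_grad(params[0])[0]
    print(f"{opt.__class__.__name__:>10}: final loss {final_loss:.6f}")

On problems like this, plain SGD creeps along the shallow axis while the adaptive methods rescale each coordinate and typically get much closer to the minimum in the same number of steps.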
II. The Transformer Architecture
3. The Self-Attention Mechanism
Question: Walk through the self-attention computation in detail and explain what multi-head attention adds.
Self-Attention implementation:
import numpy as np

class SelfAttention:
    """
    Self-attention mechanism.
    Formulas:
        Q = XW_q, K = XW_k, V = XW_v
        Attention(Q, K, V) = softmax(QK^T / sqrt(d_k))V
    """
    def __init__(self, d_model, d_k=None, d_v=None):
        """
        d_model: model dimension
        d_k: Key dimension; defaults to d_model
        d_v: Value dimension; defaults to d_model
        """
        self.d_model = d_model
        self.d_k = d_k or d_model
        self.d_v = d_v or d_model
        # Initialize the projection matrices
        self.W_q = np.random.randn(d_model, self.d_k) / np.sqrt(d_model)
        self.W_k = np.random.randn(d_model, self.d_k) / np.sqrt(d_model)
        self.W_v = np.random.randn(d_model, self.d_v) / np.sqrt(d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """
        Scaled dot-product attention.
        Parameters:
            Q: Query matrix, shape (batch_size, seq_len, d_k)
            K: Key matrix, shape (batch_size, seq_len, d_k)
            V: Value matrix, shape (batch_size, seq_len, d_v)
            mask: mask, shape (batch_size, seq_len, seq_len)
        Returns:
            output: attention output, shape (batch_size, seq_len, d_v)
            attention_weights: attention weights, shape (batch_size, seq_len, seq_len)
        """
        # Attention scores: QK^T / sqrt(d_k)
        scores = np.matmul(Q, K.transpose(0, 2, 1)) / np.sqrt(self.d_k)
        # Apply the mask, if any
        if mask is not None:
            scores = np.where(mask == 0, -1e9, scores)
        # Softmax normalization
        attention_weights = self.softmax(scores, axis=-1)
        # Weighted sum: attention_weights @ V
        output = np.matmul(attention_weights, V)
        return output, attention_weights

    def forward(self, X, mask=None):
        """
        Forward pass.
        Parameters:
            X: input, shape (batch_size, seq_len, d_model)
            mask: mask
        Returns:
            output: attention output
            attention_weights: attention weights
        """
        # Linear projections to Q, K, V
        Q = np.matmul(X, self.W_q)  # (batch_size, seq_len, d_k)
        K = np.matmul(X, self.W_k)  # (batch_size, seq_len, d_k)
        V = np.matmul(X, self.W_v)  # (batch_size, seq_len, d_v)
        # Compute attention
        output, attention_weights = self.scaled_dot_product_attention(
            Q, K, V, mask
        )
        return output, attention_weights

    @staticmethod
    def softmax(x, axis=-1):
        """Softmax function."""
        exp_x = np.exp(x - np.max(x, axis=axis, keepdims=True))
        return exp_x / np.sum(exp_x, axis=axis, keepdims=True)

class MultiHeadAttention:
    """
    Multi-head attention.
    Benefits:
    1. The model can attend to different representation subspaces at different positions.
    2. Greater expressive power than a single head.
    3. The heads are computed in parallel.
    """
    def __init__(self, d_model, num_heads):
        """
        d_model: model dimension; must be divisible by num_heads
        num_heads: number of attention heads
        """
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        # Combined projection matrices covering all heads
        self.W_q = np.random.randn(d_model, d_model) / np.sqrt(d_model)
        self.W_k = np.random.randn(d_model, d_model) / np.sqrt(d_model)
        self.W_v = np.random.randn(d_model, d_model) / np.sqrt(d_model)
        # Output projection matrix
        self.W_o = np.random.randn(d_model, d_model) / np.sqrt(d_model)

    def split_heads(self, x):
        """
        Split the input into multiple heads.
        Input:  (batch_size, seq_len, d_model)
        Output: (batch_size, num_heads, seq_len, d_k)
        """
        batch_size, seq_len, _ = x.shape
        # Reshape: (batch_size, seq_len, num_heads, d_k)
        x = x.reshape(batch_size, seq_len, self.num_heads, self.d_k)
        # Transpose: (batch_size, num_heads, seq_len, d_k)
        return x.transpose(0, 2, 1, 3)

    def combine_heads(self, x):
        """
        Merge the heads back together.
        Input:  (batch_size, num_heads, seq_len, d_k)
        Output: (batch_size, seq_len, d_model)
        """
        batch_size, _, seq_len, _ = x.shape
        # Transpose: (batch_size, seq_len, num_heads, d_k)
        x = x.transpose(0, 2, 1, 3)
        # Reshape: (batch_size, seq_len, d_model)
        return x.reshape(batch_size, seq_len, self.d_model)

    def forward(self, X, mask=None):
        """
        Forward pass.
        """
        batch_size = X.shape[0]
        # Linear projections
        Q = np.matmul(X, self.W_q)
        K = np.matmul(X, self.W_k)
        V = np.matmul(X, self.W_v)
        # Split into heads
        Q = self.split_heads(Q)  # (batch_size, num_heads, seq_len, d_k)
        K = self.split_heads(K)
        V = self.split_heads(V)
        # Broadcast the mask across the heads
        if mask is not None:
            mask = mask[:, np.newaxis, :, :]
        # Scaled dot-product attention
        scores = np.matmul(Q, K.transpose(0, 1, 3, 2)) / np.sqrt(self.d_k)
        if mask is not None:
            scores = np.where(mask == 0, -1e9, scores)
        attention_weights = self.softmax(scores, axis=-1)
        # Weighted sum
        attention_output = np.matmul(attention_weights, V)
        # Merge the heads
        attention_output = self.combine_heads(attention_output)
        # Output projection
        output = np.matmul(attention_output, self.W_o)
        return output, attention_weights

    @staticmethod
    def softmax(x, axis=-1):
        exp_x = np.exp(x - np.max(x, axis=axis, keepdims=True))
        return exp_x / np.sum(exp_x, axis=axis, keepdims=True)

# Usage example
if __name__ == "__main__":
    # Parameters
    batch_size = 2
    seq_len = 10
    d_model = 512
    num_heads = 8
    # Random input
    X = np.random.randn(batch_size, seq_len, d_model)
    # Causal mask (used in decoders)
    causal_mask = np.tril(np.ones((seq_len, seq_len)))
    causal_mask = causal_mask[np.newaxis, :, :]  # add a batch dimension
    # Single-head attention
    attention = SelfAttention(d_model)
    output, weights = attention.forward(X, mask=causal_mask)
    print("Single-head attention output shape:", output.shape)
    print("Attention weights shape:", weights.shape)
    # Multi-head attention
    mha = MultiHeadAttention(d_model, num_heads)
    output, weights = mha.forward(X, mask=causal_mask)
    print("\nMulti-head attention output shape:", output.shape)
    print("Attention weights shape:", weights.shape)
4. Complete Transformer Implementation
Question: Implement a complete Transformer encoder-decoder architecture.
Transformer implementation:
class PositionalEncoding:
    """
    Sinusoidal positional encoding.
    Formulas:
        PE(pos, 2i)   = sin(pos / 10000^(2i/d_model))
        PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model))
    """
    def __init__(self, d_model, max_len=5000):
        self.d_model = d_model
        # Build the positional-encoding matrix
        pe = np.zeros((max_len, d_model))
        position = np.arange(0, max_len)[:, np.newaxis]
        div_term = np.exp(
            np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model)
        )
        pe[:, 0::2] = np.sin(position * div_term)
        pe[:, 1::2] = np.cos(position * div_term)
        self.pe = pe

    def forward(self, X):
        """
        Add the positional encoding.
        X: (batch_size, seq_len, d_model)
        """
        seq_len = X.shape[1]
        return X + self.pe[np.newaxis, :seq_len, :]

class FeedForward:
    """
    Position-wise feed-forward network.
    FFN(x) = max(0, xW1 + b1)W2 + b2
    """
    def __init__(self, d_model, d_ff, dropout=0.1):
        self.d_model = d_model
        self.d_ff = d_ff
        self.dropout = dropout
        # Two fully connected layers
        self.W1 = np.random.randn(d_model, d_ff) / np.sqrt(d_model)
        self.b1 = np.zeros(d_ff)
        self.W2 = np.random.randn(d_ff, d_model) / np.sqrt(d_ff)
        self.b2 = np.zeros(d_model)

    def forward(self, X, training=True):
        # First layer + ReLU
        hidden = np.maximum(0, np.matmul(X, self.W1) + self.b1)
        # Inverted dropout
        if training and self.dropout > 0:
            mask = np.random.binomial(1, 1-self.dropout, hidden.shape)
            hidden = hidden * mask / (1 - self.dropout)
        # Second layer
        output = np.matmul(hidden, self.W2) + self.b2
        return output

class LayerNormalization:
    """
    Layer normalization.
    Formula:
        LN(x) = gamma * (x - mean) / sqrt(var + epsilon) + beta
    """
    def __init__(self, d_model, epsilon=1e-6):
        self.gamma = np.ones(d_model)
        self.beta = np.zeros(d_model)
        self.epsilon = epsilon

    def forward(self, X):
        mean = np.mean(X, axis=-1, keepdims=True)
        var = np.var(X, axis=-1, keepdims=True)
        X_norm = (X - mean) / np.sqrt(var + self.epsilon)
        return self.gamma * X_norm + self.beta

class TransformerEncoderLayer:
    """Transformer encoder layer."""
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff, dropout)
        self.ln1 = LayerNormalization(d_model)
        self.ln2 = LayerNormalization(d_model)
        self.dropout = dropout

    def forward(self, X, mask=None, training=True):
        # Multi-head self-attention + residual connection + layer norm
        attn_output, _ = self.mha.forward(X, mask)
        if training and self.dropout > 0:
            dropout_mask = np.random.binomial(
                1, 1-self.dropout, attn_output.shape
            )
            attn_output = attn_output * dropout_mask / (1 - self.dropout)
        X = self.ln1.forward(X + attn_output)
        # Feed-forward network + residual connection + layer norm
        ffn_output = self.ffn.forward(X, training)
        if training and self.dropout > 0:
            dropout_mask = np.random.binomial(
                1, 1-self.dropout, ffn_output.shape
            )
            ffn_output = ffn_output * dropout_mask / (1 - self.dropout)
        output = self.ln2.forward(X + ffn_output)
        return output

class TransformerEncoder:
    """Transformer encoder."""
    def __init__(self, num_layers, d_model, num_heads, d_ff,
                 vocab_size, max_len=5000, dropout=0.1):
        # Token embedding
        self.embedding = np.random.randn(vocab_size, d_model) / np.sqrt(d_model)
        # Positional encoding
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        # Encoder layers
        self.layers = [
            TransformerEncoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ]
        self.dropout = dropout

    def forward(self, X, mask=None, training=True):
        """
        X: input token IDs, shape (batch_size, seq_len)
        """
        # Token embedding
        X = self.embedding[X]
        # Add the positional encoding
        X = self.pos_encoding.forward(X)
        # Dropout
        if training and self.dropout > 0:
            dropout_mask = np.random.binomial(1, 1-self.dropout, X.shape)
            X = X * dropout_mask / (1 - self.dropout)
        # Run through the encoder layers
        for layer in self.layers:
            X = layer.forward(X, mask, training)
        return X

class TransformerDecoderLayer:
    """Transformer decoder layer."""
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        # Masked self-attention
        self.masked_mha = MultiHeadAttention(d_model, num_heads)
        # Encoder-decoder attention
        self.mha = MultiHeadAttention(d_model, num_heads)
        # Feed-forward network
        self.ffn = FeedForward(d_model, d_ff, dropout)
        # Layer norms
        self.ln1 = LayerNormalization(d_model)
        self.ln2 = LayerNormalization(d_model)
        self.ln3 = LayerNormalization(d_model)
        self.dropout = dropout

    def forward(self, X, encoder_output, self_mask=None,
                cross_mask=None, training=True):
        # Masked self-attention
        attn1, _ = self.masked_mha.forward(X, self_mask)
        if training and self.dropout > 0:
            dropout_mask = np.random.binomial(1, 1-self.dropout, attn1.shape)
            attn1 = attn1 * dropout_mask / (1 - self.dropout)
        X = self.ln1.forward(X + attn1)
        # Encoder-decoder (cross) attention
        # Q comes from the decoder; K and V come from the encoder
        attn2, _ = self.cross_attention(X, encoder_output, cross_mask)
        if training and self.dropout > 0:
            dropout_mask = np.random.binomial(1, 1-self.dropout, attn2.shape)
            attn2 = attn2 * dropout_mask / (1 - self.dropout)
        X = self.ln2.forward(X + attn2)
        # Feed-forward network
        ffn_output = self.ffn.forward(X, training)
        if training and self.dropout > 0:
            dropout_mask = np.random.binomial(1, 1-self.dropout, ffn_output.shape)
            ffn_output = ffn_output * dropout_mask / (1 - self.dropout)
        output = self.ln3.forward(X + ffn_output)
        return output

    def cross_attention(self, decoder_output, encoder_output, mask=None):
        """
        Cross-attention: the decoder attends to the encoder output.
        Q: from the decoder
        K, V: from the encoder
        """
        # Placeholder: this call ignores encoder_output entirely, because
        # MultiHeadAttention derives Q, K, and V from a single input.
        # A version that accepts separate query and key/value inputs is
        # sketched right after this class.
        return self.mha.forward(decoder_output, mask)
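As the comment notes, the placeholder above never consumes encoder_output. One way to support real cross-attention is a variant of MultiHeadAttention whose forward takes separate query and key/value inputs; a minimal sketch reusing the weight layout and helpers defined above (the class name is an illustrative choice):

class CrossMultiHeadAttention(MultiHeadAttention):
    """Multi-head attention with Q from one sequence and K, V from another (sketch)."""
    def forward(self, X_q, X_kv, mask=None):
        # Queries come from the decoder; keys/values from the encoder output
        Q = self.split_heads(np.matmul(X_q, self.W_q))
        K = self.split_heads(np.matmul(X_kv, self.W_k))
        V = self.split_heads(np.matmul(X_kv, self.W_v))
        # mask shape: (batch_size, q_len, kv_len), broadcast across heads
        if mask is not None:
            mask = mask[:, np.newaxis, :, :]
        scores = np.matmul(Q, K.transpose(0, 1, 3, 2)) / np.sqrt(self.d_k)
        if mask is not None:
            scores = np.where(mask == 0, -1e9, scores)
        weights = self.softmax(scores, axis=-1)
        # The output sequence length follows the query input
        output = self.combine_heads(np.matmul(weights, V))
        return np.matmul(output, self.W_o), weights

With this in place, TransformerDecoderLayer could instantiate self.mha as a CrossMultiHeadAttention and have cross_attention call self.mha.forward(decoder_output, encoder_output, mask).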
# Usage example
encoder = TransformerEncoder(
    num_layers=6,
    d_model=512,
    num_heads=8,
    d_ff=2048,
    vocab_size=10000,
    dropout=0.1
)
# Input sequence
input_ids = np.random.randint(0, 10000, (2, 20))  # (batch_size=2, seq_len=20)
# Encode
encoder_output = encoder.forward(input_ids, training=True)
print("Encoder output shape:", encoder_output.shape)
III. Large Language Models (LLMs)
5. GPT Architecture and Implementation
Question: Explain GPT's autoregressive generation process and the main decoding strategies.
GPT implementation and decoding strategies:
class GPTModel:
    """
    GPT: a decoder-only Transformer.
    Key properties:
    1. Unidirectional attention (causal mask)
    2. Autoregressive generation
    3. Pre-training + fine-tuning paradigm
    """
    def __init__(self, vocab_size, d_model=768, num_layers=12,
                 num_heads=12, d_ff=3072, max_len=1024, dropout=0.1):
        self.vocab_size = vocab_size
        self.d_model = d_model
        # Token embedding + positional encoding
        self.token_embedding = np.random.randn(vocab_size, d_model) / np.sqrt(d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        # Transformer blocks (an encoder layer plus a causal mask behaves as a GPT decoder block)
        self.layers = [
            TransformerEncoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ]
        # Output layer (language-model head)
        self.lm_head = np.random.randn(d_model, vocab_size) / np.sqrt(d_model)

    def create_causal_mask(self, seq_len):
        """
        Build the causal mask (a lower-triangular matrix) so the model
        cannot attend to future tokens.
        """
        mask = np.tril(np.ones((seq_len, seq_len)))
        return mask[np.newaxis, :, :]

    def forward(self, input_ids, training=True):
        """
        Forward pass.
        input_ids: (batch_size, seq_len)
        Returns: logits of shape (batch_size, seq_len, vocab_size)
        """
        batch_size, seq_len = input_ids.shape
        # Token embedding
        X = self.token_embedding[input_ids]
        # Positional encoding
        X = self.pos_encoding.forward(X)
        # Causal mask
        mask = self.create_causal_mask(seq_len)
        # Run through the Transformer blocks
        for layer in self.layers:
            X = layer.forward(X, mask, training)
        # Language-model head
        logits = np.matmul(X, self.lm_head)
        return logits

    def generate(self, input_ids, max_new_tokens=50,
                 method='greedy', temperature=1.0,
                 top_k=0, top_p=1.0):
        """
        Autoregressive generation.
        Decoding strategies:
        - greedy: greedy decoding
        - sample: plain sampling from the full distribution
        - top_k: top-k sampling
        - top_p: top-p (nucleus) sampling
        (Beam search is implemented separately in the BeamSearch class below.)
        """
        generated = input_ids.copy()
        for _ in range(max_new_tokens):
            # Forward pass
            logits = self.forward(generated, training=False)
            # Keep only the logits at the last position
            next_token_logits = logits[:, -1, :]  # (batch_size, vocab_size)
            # Apply temperature
            next_token_logits = next_token_logits / temperature
            # Pick the next token according to the decoding method
            if method == 'greedy':
                next_token = self._greedy_decode(next_token_logits)
            elif method == 'sample':
                next_token = self._sample_decode(next_token_logits)
            elif method == 'top_k':
                next_token = self._top_k_decode(next_token_logits, top_k)
            elif method == 'top_p':
                next_token = self._top_p_decode(next_token_logits, top_p)
            else:
                raise ValueError(f"Unknown method: {method}")
            # Append to the generated sequence
            generated = np.concatenate(
                [generated, next_token[:, np.newaxis]],
                axis=1
            )
        return generated

    def _greedy_decode(self, logits):
        """Greedy decoding: pick the most probable token."""
        return np.argmax(logits, axis=-1)

    def _sample_decode(self, logits):
        """Plain sampling: draw from the full probability distribution."""
        probs = self._softmax(logits)
        # Sample per batch element
        next_tokens = []
        for prob in probs:
            next_token = np.random.choice(len(prob), p=prob)
            next_tokens.append(next_token)
        return np.array(next_tokens)

    def _top_k_decode(self, logits, k):
        """
        Top-K sampling: sample only from the K most probable tokens.
        Benefit: avoids sampling from the low-probability tail.
        """
        # Indices of the top-k logits
        top_k_indices = np.argsort(logits, axis=-1)[:, -k:]
        # Build a mask that keeps only the top-k entries
        mask = np.zeros_like(logits)
        np.put_along_axis(mask, top_k_indices, 1, axis=-1)
        # Apply the mask
        masked_logits = np.where(mask == 1, logits, -1e9)
        return self._sample_decode(masked_logits)

    def _top_p_decode(self, logits, p):
        """
        Top-P (nucleus) sampling: sample from the smallest token set whose
        cumulative probability reaches p.
        Benefit: the candidate-set size adapts to the shape of the distribution.
        """
        # Compute probabilities and sort them in descending order
        probs = self._softmax(logits)
        sorted_indices = np.argsort(probs, axis=-1)[:, ::-1]
        sorted_probs = np.take_along_axis(probs, sorted_indices, axis=-1)
        # Cumulative probabilities
        cumsum_probs = np.cumsum(sorted_probs, axis=-1)
        # Keep tokens while the cumulative probability is still within p
        mask = cumsum_probs <= p
        # Always keep at least one token
        mask[:, 0] = True
        # Map the mask back to the original token order and filter the logits
        filtered_logits = np.where(
            np.take_along_axis(mask, np.argsort(sorted_indices, axis=-1), axis=-1),
            logits,
            -1e9
        )
        return self._sample_decode(filtered_logits)

    @staticmethod
    def _softmax(x, axis=-1):
        exp_x = np.exp(x - np.max(x, axis=axis, keepdims=True))
        return exp_x / np.sum(exp_x, axis=axis, keepdims=True)

class BeamSearch:
    """
    Beam-search decoding.
    Pros: explores the search space more globally than greedy decoding.
    Cons: tends to produce generic, repetitive text.
    """
    def __init__(self, model, beam_width=5, max_len=50):
        self.model = model
        self.beam_width = beam_width
        self.max_len = max_len

    def search(self, input_ids):
        """
        Beam search.
        Returns: the highest-scoring sequence.
        """
        batch_size = input_ids.shape[0]
        assert batch_size == 1, "This beam search only supports batch_size=1"
        # Initialize the beams: [(sequence, score)]
        beams = [(input_ids[0], 0.0)]
        for _ in range(self.max_len):
            candidates = []
            for seq, score in beams:
                # Forward pass
                logits = self.model.forward(
                    seq[np.newaxis, :],
                    training=False
                )
                # Log-probabilities at the last position
                log_probs = np.log(self._softmax(logits[0, -1, :]) + 1e-10)
                # Top-k candidate extensions
                top_k_indices = np.argsort(log_probs)[-self.beam_width:]
                for idx in top_k_indices:
                    new_seq = np.append(seq, idx)
                    new_score = score + log_probs[idx]
                    candidates.append((new_seq, new_score))
            # Keep the beam_width highest-scoring candidates
            beams = sorted(candidates, key=lambda x: x[1], reverse=True)[
                :self.beam_width
            ]
        # Return the highest-scoring sequence
        return beams[0][0][np.newaxis, :]

    @staticmethod
    def _softmax(x):
        exp_x = np.exp(x - np.max(x))
        return exp_x / np.sum(exp_x)

# Usage example
vocab_size = 50000
gpt = GPTModel(
    vocab_size=vocab_size,
    d_model=768,
    num_layers=12,
    num_heads=12,
    d_ff=3072
)
# Prompt
prompt = np.array([[1, 2, 3, 4, 5]])  # token IDs
# Greedy decoding
output_greedy = gpt.generate(prompt, max_new_tokens=20, method='greedy')
print("Greedy decoding:", output_greedy)
# Top-K sampling
output_topk = gpt.generate(
    prompt,
    max_new_tokens=20,
    method='top_k',
    temperature=0.8,
    top_k=50
)
print("Top-K sampling:", output_topk)
# Top-P sampling
output_topp = gpt.generate(
    prompt,
    max_new_tokens=20,
    method='top_p',
    temperature=0.9,
    top_p=0.9
)
print("Top-P sampling:", output_topp)
# Beam search
beam_search = BeamSearch(gpt, beam_width=5, max_len=20)
output_beam = beam_search.search(prompt)
print("Beam search:", output_beam)
6. LLM Inference Optimization
Question: How can the inference speed and memory footprint of a large language model be optimized?
Inference optimization techniques:
# 1. KV Cache
class GPTWithKVCache:
    """
    GPT with a KV cache.
    Idea: cache the Keys and Values already computed for the prefix instead
    of recomputing them at every decoding step.
    Speedup: per-token attention cost drops from O(n^2) to O(n).
    (Sketch: assumes the attributes of GPTModel above.)
    """
    def __init__(self, *args, **kwargs):
        # Model initialization omitted
        pass

    def forward_with_cache(self, input_ids, past_kv_cache=None):
        """
        Forward pass with caching.
        past_kv_cache: list with one (K, V) cache per layer
        Returns: (logits, new_kv_cache)
        """
        if past_kv_cache is None:
            # First call: run a normal forward pass over the whole prompt
            past_kv_cache = [None] * len(self.layers)
        X = self.token_embedding[input_ids]
        # Note: a real implementation must offset the positional encoding by
        # the current cache length; this sketch glosses over that detail.
        X = self.pos_encoding.forward(X)
        new_kv_cache = []
        for i, layer in enumerate(self.layers):
            # Reuse the cached K, V for this layer
            X, kv = layer.forward_with_cache(X, past_kv_cache[i])
            new_kv_cache.append(kv)
        logits = np.matmul(X, self.lm_head)
        return logits, new_kv_cache

    def generate_with_cache(self, input_ids, max_new_tokens=50):
        """Generation with the KV cache."""
        kv_cache = None
        generated = input_ids.copy()
        for i in range(max_new_tokens):
            if i == 0:
                # First step: process the full prompt
                current_input = generated
            else:
                # Subsequent steps: process only the newest token
                current_input = generated[:, -1:]
            # Forward pass
            logits, kv_cache = self.forward_with_cache(
                current_input,
                kv_cache
            )
            # Pick the next token
            next_token = np.argmax(logits[:, -1, :], axis=-1)
            # Append to the sequence
            generated = np.concatenate(
                [generated, next_token[:, np.newaxis]],
                axis=1
            )
        return generated
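The wrapper above assumes each layer exposes a forward_with_cache method, which the earlier layers do not define. Below is a sketch of what the cached attention step could look like for a single head; a multi-head version would add the head split/merge, and the caller must still handle the positional offset noted above:

class CachedSelfAttention:
    """Single-head self-attention with a growing K/V cache (sketch)."""
    def __init__(self, d_model):
        self.d_k = d_model
        self.W_q = np.random.randn(d_model, d_model) / np.sqrt(d_model)
        self.W_k = np.random.randn(d_model, d_model) / np.sqrt(d_model)
        self.W_v = np.random.randn(d_model, d_model) / np.sqrt(d_model)

    def forward_with_cache(self, X, past_kv=None):
        # X holds only the new token(s): (batch_size, new_len, d_model)
        Q = np.matmul(X, self.W_q)
        K_new = np.matmul(X, self.W_k)
        V_new = np.matmul(X, self.W_v)
        if past_kv is not None:
            # Reuse the cached keys/values instead of recomputing the prefix
            K = np.concatenate([past_kv[0], K_new], axis=1)
            V = np.concatenate([past_kv[1], V_new], axis=1)
        else:
            K, V = K_new, V_new
        # With new_len == 1 no causal mask is needed: the single query may
        # attend to every cached position. The prefill step (new_len > 1)
        # would still need the usual causal mask.
        scores = np.matmul(Q, K.transpose(0, 2, 1)) / np.sqrt(self.d_k)
        weights = np.exp(scores - scores.max(axis=-1, keepdims=True))
        weights = weights / weights.sum(axis=-1, keepdims=True)
        return np.matmul(weights, V), (K, V)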
# 2. Quantization
class QuantizedLinear:
    """
    Quantized linear layer.
    INT8 quantization: convert FP32 weights to INT8.
    Memory: 4x smaller.
    Speed: can exploit INT8 GEMM kernels.
    """
    def __init__(self, weight_fp32):
        """
        weight_fp32: FP32 weight matrix
        """
        self.scale, self.zero_point, self.weight_int8 = self.quantize(weight_fp32)

    def quantize(self, weight):
        """
        Symmetric quantization.
        Formulas:
            scale = max(abs(weight)) / 127
            weight_int8 = round(weight / scale)
        """
        max_val = np.max(np.abs(weight))
        scale = max_val / 127.0
        zero_point = 0
        weight_int8 = np.round(weight / scale).astype(np.int8)
        return scale, zero_point, weight_int8

    def dequantize(self):
        """Dequantize back to FP32."""
        return self.scale * self.weight_int8.astype(np.float32)

    def forward(self, X):
        """
        Quantization-aware matrix multiply.
        Option 1: dequantize, then multiply (simple but slow).
        Option 2: INT8 matmul + rescaling (fast, but needs hardware support).
        """
        # Option 1: dequantize
        weight_fp32 = self.dequantize()
        return np.matmul(X, weight_fp32)
        # Option 2: INT8 compute (pseudocode)
        # X_int8, X_scale = quantize(X)
        # result_int8 = matmul_int8(X_int8, self.weight_int8)
        # result_fp32 = dequantize(result_int8, X_scale * self.scale)
        # return result_fp32
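The class above uses a single scale for the entire matrix (per-tensor quantization). A common refinement is per-channel quantization, with one scale per output column, which usually lowers quantization error at almost no extra storage cost. A minimal sketch (the class name is an illustrative choice):

class PerChannelQuantizedLinear:
    """Symmetric INT8 quantization with one scale per output channel (sketch)."""
    def __init__(self, weight_fp32):
        # One scale per column of W, shape (1, d_out); guard against all-zero columns
        self.scales = np.maximum(
            np.max(np.abs(weight_fp32), axis=0, keepdims=True), 1e-12
        ) / 127.0
        self.weight_int8 = np.round(weight_fp32 / self.scales).astype(np.int8)

    def forward(self, X):
        # Reference path: dequantize column-wise, then multiply
        return np.matmul(X, self.weight_int8.astype(np.float32) * self.scales)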
# 3. Flash Attention
class FlashAttention:
    """
    Flash Attention: an IO-aware attention algorithm.
    Optimizations:
    1. Tiled computation cuts HBM <-> SRAM data movement.
    2. Memory: O(N^2) -> O(N).
    3. Speed: roughly 2-4x faster in practice.
    Reference: https://arxiv.org/abs/2205.14135
    """
    def __init__(self, d_k, block_size=64):
        self.d_k = d_k
        self.block_size = block_size

    def forward(self, Q, K, V):
        """
        Flash Attention forward pass.
        Core ideas:
        1. Split Q, K, V into blocks.
        2. Compute attention block by block.
        3. Maintain running softmax statistics (max and normalizer) online.
        """
        batch_size, seq_len, d_k = Q.shape
        num_blocks = (seq_len + self.block_size - 1) // self.block_size
        # Initialize the output and the running statistics
        O = np.zeros_like(V)
        l = np.zeros((batch_size, seq_len, 1))          # softmax normalizer
        m = np.full((batch_size, seq_len, 1), -np.inf)  # running max
        for i in range(num_blocks):
            # i-th block of Q
            q_start = i * self.block_size
            q_end = min((i + 1) * self.block_size, seq_len)
            Q_block = Q[:, q_start:q_end, :]
            for j in range(num_blocks):
                # j-th block of K, V
                kv_start = j * self.block_size
                kv_end = min((j + 1) * self.block_size, seq_len)
                K_block = K[:, kv_start:kv_end, :]
                V_block = V[:, kv_start:kv_end, :]
                # Attention scores for this tile
                S_block = np.matmul(Q_block, K_block.transpose(0, 2, 1)) / np.sqrt(self.d_k)
                # Online softmax update
                m_new = np.maximum(m[:, q_start:q_end, :], np.max(S_block, axis=2, keepdims=True))
                l_new = (
                    np.exp(m[:, q_start:q_end, :] - m_new) * l[:, q_start:q_end, :] +
                    np.sum(np.exp(S_block - m_new), axis=2, keepdims=True)
                )
                # Update the output; the previous (already normalized) output
                # must be rescaled by both the old normalizer l and the
                # max-shift correction before the new block is accumulated
                O[:, q_start:q_end, :] = (
                    np.exp(m[:, q_start:q_end, :] - m_new) * l[:, q_start:q_end, :] *
                    O[:, q_start:q_end, :] +
                    np.matmul(np.exp(S_block - m_new), V_block)
                ) / l_new
                # Update the running statistics
                m[:, q_start:q_end, :] = m_new
                l[:, q_start:q_end, :] = l_new
        return O
# 4. Model parallelism
class TensorParallel:
    """
    Tensor parallelism: split individual weight matrices across GPUs.
    Use case: the model is too large for a single GPU.
    """
    def __init__(self, weight, num_gpus=2):
        self.num_gpus = num_gpus
        # Split the weight matrix column-wise
        # W: (d_in, d_out) -> [(d_in, d_out/num_gpus), ...]
        self.weight_shards = np.split(weight, num_gpus, axis=1)

    def forward(self, X):
        """
        Parallel computation:
        1. Broadcast the input X to every GPU.
        2. Each GPU computes its slice of the output.
        3. All-Gather collects the slices.
        """
        # Simulate the parallel computation
        outputs = []
        for shard in self.weight_shards:
            output_shard = np.matmul(X, shard)
            outputs.append(output_shard)
        # Concatenate the partial results (simulates the All-Gather)
        return np.concatenate(outputs, axis=-1)
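Column splitting, as above, gathers the partial outputs by concatenation. The complementary scheme splits W by rows, gives each device the matching slice of the input, and sums the partial outputs (an All-Reduce); Megatron-style implementations pair a column-parallel layer with a row-parallel one so that a two-layer FFN needs only one communication step. A sketch of the row-parallel half:

class RowParallel:
    """Row-wise tensor parallelism: partial outputs are summed, not concatenated (sketch)."""
    def __init__(self, weight, num_gpus=2):
        self.num_gpus = num_gpus
        # W: (d_in, d_out) -> num_gpus shards of shape (d_in/num_gpus, d_out)
        self.weight_shards = np.split(weight, num_gpus, axis=0)

    def forward(self, X):
        # Each device holds only its slice of the input's last dimension
        x_shards = np.split(X, self.num_gpus, axis=-1)
        partials = [np.matmul(x, w) for x, w in zip(x_shards, self.weight_shards)]
        return np.sum(partials, axis=0)  # simulates the All-Reduce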
class PipelineParallel:
    """
    Pipeline parallelism: assign contiguous groups of layers to different GPUs.
    Use case: the model is too deep for a single GPU's memory.
    """
    def __init__(self, layers, num_stages=4):
        self.num_stages = num_stages
        # Split the layers into num_stages stages
        # (assumes len(layers) is divisible by num_stages)
        layers_per_stage = len(layers) // num_stages
        self.stages = [
            layers[i*layers_per_stage:(i+1)*layers_per_stage]
            for i in range(num_stages)
        ]

    def forward(self, X, micro_batches=4):
        """
        Pipelined execution:
        1. Split the batch into micro-batches.
        2. Run the stages so that different micro-batches occupy different
           stages at the same time.
        3. This reduces GPU idle time (pipeline bubbles).
        """
        # Simplified simulation: split into micro-batches and push each one
        # through all stages sequentially (on real hardware the stages overlap)
        outputs = []
        for mb in np.array_split(X, micro_batches, axis=0):
            for stage_layers in self.stages:
                for layer in stage_layers:
                    mb = layer.forward(mb)
            outputs.append(mb)
        return np.concatenate(outputs, axis=0)
# 5. Speculative Decoding
class SpeculativeDecoding:
    """
    Speculative decoding: a small model drafts, a large model verifies.
    Speedup: typically 2-3x.
    Idea: the small model quickly drafts several tokens, and the large model
    verifies all of them in a single batched forward pass.
    """
    def __init__(self, large_model, small_model, k=5):
        """
        large_model: the large (accurate) model
        small_model: the small (fast) draft model
        k: number of tokens to draft per round
        """
        self.large_model = large_model
        self.small_model = small_model
        self.k = k

    def generate(self, input_ids, max_new_tokens=50):
        """Generation with speculative decoding (greedy-match acceptance)."""
        generated = input_ids.copy()
        while generated.shape[1] - input_ids.shape[1] < max_new_tokens:
            # 1. The small model quickly drafts k tokens
            draft = self.small_model.generate(
                generated,
                max_new_tokens=self.k,
                method='greedy'
            )
            # 2. The large model verifies the whole draft in parallel
            large_logits = self.large_model.forward(draft, training=False)
            # 3. Verify the drafted tokens one by one
            verified_tokens = []
            for i in range(self.k):
                pos = generated.shape[1] + i
                # The large model's prediction at this position
                large_pred = np.argmax(large_logits[:, pos-1, :], axis=-1)
                # The small model's drafted token
                small_pred = draft[:, pos]
                if large_pred[0] == small_pred[0]:
                    # Accept the drafted token
                    verified_tokens.append(small_pred)
                else:
                    # Reject: substitute the large model's prediction and
                    # discard the remainder of the draft
                    verified_tokens.append(large_pred)
                    break
            # Append the verified tokens
            if verified_tokens:
                generated = np.concatenate([
                    generated,
                    np.array(verified_tokens).reshape(1, -1)
                ], axis=1)
            else:
                break
        return generated
IV. RAG Systems
7. RAG Architecture Design
Question: Design and implement a production-grade RAG (retrieval-augmented generation) system.
RAG implementation:
import numpy as np
from typing import List, Dict, Tuple

class VectorStore:
    """
    Vector database.
    Features:
    1. Store document vectors.
    2. Similarity search.
    3. Hybrid retrieval (vector + keyword).
    """
    def __init__(self, embedding_dim=768):
        self.embedding_dim = embedding_dim
        self.vectors = []    # stored embedding matrices
        self.documents = []  # stored documents
        self.metadata = []   # stored metadata

    def add_documents(self, documents: List[str], embeddings: np.ndarray,
                      metadata: List[Dict] = None):
        """
        Add documents.
        documents: list of document texts
        embeddings: document vectors, shape (n_docs, embedding_dim)
        metadata: list of metadata dicts
        """
        self.documents.extend(documents)
        self.vectors.append(embeddings)
        if metadata is None:
            metadata = [{}] * len(documents)
        self.metadata.extend(metadata)

    def cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> np.ndarray:
        """Cosine similarity between two sets of row vectors."""
        # Normalize
        a_norm = a / (np.linalg.norm(a, axis=1, keepdims=True) + 1e-10)
        b_norm = b / (np.linalg.norm(b, axis=1, keepdims=True) + 1e-10)
        # Similarity matrix
        return np.matmul(a_norm, b_norm.T)

    def search(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Tuple]:
        """
        Vector similarity search.
        Returns: [(document, score, metadata), ...]
        """
        if not self.vectors:
            return []
        # Stack all stored vectors
        all_vectors = np.concatenate(self.vectors, axis=0)
        # Similarities to the query
        similarities = self.cosine_similarity(
            query_embedding.reshape(1, -1),
            all_vectors
        )[0]
        # Take the top-k
        top_k_indices = np.argsort(similarities)[-top_k:][::-1]
        results = [
            (self.documents[i], similarities[i], self.metadata[i])
            for i in top_k_indices
        ]
        return results

    def hybrid_search(self, query_embedding: np.ndarray, query_text: str,
                      top_k: int = 5, alpha: float = 0.7) -> List[Tuple]:
        """
        Hybrid retrieval: vector search + BM25-style keyword scoring.
        alpha: weight of the vector score (0-1)
        """
        # Vector-search scores
        vector_scores = self.cosine_similarity(
            query_embedding.reshape(1, -1),
            np.concatenate(self.vectors, axis=0)
        )[0]
        # Keyword scores (simplified; see _bm25_score)
        bm25_scores = self._bm25_score(query_text)
        # Min-max normalize both score sets
        vector_scores = (vector_scores - vector_scores.min()) / (
            vector_scores.max() - vector_scores.min() + 1e-10
        )
        bm25_scores = (bm25_scores - bm25_scores.min()) / (
            bm25_scores.max() - bm25_scores.min() + 1e-10
        )
        # Weighted fusion
        final_scores = alpha * vector_scores + (1 - alpha) * bm25_scores
        # Take the top-k
        top_k_indices = np.argsort(final_scores)[-top_k:][::-1]
        results = [
            (self.documents[i], final_scores[i], self.metadata[i])
            for i in top_k_indices
        ]
        return results

    def _bm25_score(self, query: str) -> np.ndarray:
        """
        Keyword scoring stand-in for BM25.
        Note: this computes plain term frequency only; it has neither the IDF
        weighting nor the length normalization of real BM25 (a fuller sketch
        follows below). In production, use a dedicated BM25 library.
        """
        query_terms = query.lower().split()
        scores = np.zeros(len(self.documents))
        for i, doc in enumerate(self.documents):
            doc_terms = doc.lower().split()
            doc_len = len(doc_terms)
            for term in query_terms:
                tf = doc_terms.count(term) / (doc_len + 1e-10)
                scores[i] += tf
        return scores
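For reference, a closer-to-spec BM25 adds the IDF weighting and document-length normalization that the stand-in above omits. A minimal sketch (k1 and b are the usual BM25 hyperparameters; the whitespace tokenization is deliberately naive):

import math

def bm25_scores(query: str, documents: List[str],
                k1: float = 1.5, b: float = 0.75) -> np.ndarray:
    """Okapi BM25 with IDF and length normalization (reference sketch)."""
    docs = [doc.lower().split() for doc in documents]
    n = len(docs)
    avg_len = sum(len(d) for d in docs) / max(n, 1)
    scores = np.zeros(n)
    for term in query.lower().split():
        # Document frequency and the standard smoothed IDF
        df = sum(1 for d in docs if term in d)
        idf = math.log((n - df + 0.5) / (df + 0.5) + 1.0)
        for i, d in enumerate(docs):
            tf = d.count(term)
            denom = tf + k1 * (1 - b + b * len(d) / max(avg_len, 1e-10))
            scores[i] += idf * tf * (k1 + 1) / (denom + 1e-10)
    return scores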
class Reranker:
    """
    Reranker.
    Purpose: re-score the initial retrieval results with a finer-grained model.
    Options:
    1. Cross-encoder (accurate but slow).
    2. Rule-based reranking.
    """
    def __init__(self, model=None):
        self.model = model

    def rerank(self, query: str, documents: List[Tuple],
               top_k: int = 3) -> List[Tuple]:
        """
        Rerank the candidates.
        documents: [(document, score, metadata), ...]
        """
        if self.model is None:
            # Fallback: sort by the original retrieval score
            return sorted(documents, key=lambda x: x[1], reverse=True)[:top_k]
        # Rerank with the cross-encoder
        scores = []
        for doc, _, metadata in documents:
            # Query-document match score
            score = self._cross_encoder_score(query, doc)
            scores.append(score)
        # Sort by the new scores
        reranked = [
            (doc, score, metadata)
            for (doc, _, metadata), score in zip(documents, scores)
        ]
        reranked = sorted(reranked, key=lambda x: x[1], reverse=True)
        return reranked[:top_k]

    def _cross_encoder_score(self, query: str, document: str) -> float:
        """Cross-encoder scoring (simplified)."""
        # A real system would use a trained cross-encoder;
        # here it is approximated by term overlap.
        query_terms = set(query.lower().split())
        doc_terms = set(document.lower().split())
        if not query_terms:
            return 0.0
        overlap = len(query_terms & doc_terms)
        score = overlap / len(query_terms)
        return score
class RAGPipeline:
    """
    End-to-end RAG pipeline.
    Steps:
    1. Chunk the documents.
    2. Embed the chunks.
    3. Store them in the vector database.
    4. Retrieve.
    5. Rerank.
    6. Generate the answer.
    """
    def __init__(self, embedding_model, llm, chunk_size=512, chunk_overlap=50):
        """
        embedding_model: the embedding model
        llm: the large language model
        chunk_size: chunk size (in words)
        chunk_overlap: overlap between consecutive chunks
        """
        self.embedding_model = embedding_model
        self.llm = llm
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.vector_store = VectorStore()
        self.reranker = Reranker()

    def chunk_text(self, text: str) -> List[str]:
        """
        Document chunking. Common strategies:
        1. Fixed-size chunks (implemented here).
        2. Sentence-boundary chunks (sketched after this class).
        3. Semantic chunks.
        """
        chunks = []
        words = text.split()
        for i in range(0, len(words), self.chunk_size - self.chunk_overlap):
            chunk = ' '.join(words[i:i + self.chunk_size])
            chunks.append(chunk)
        return chunks
    def add_documents(self, documents: List[str], metadata: List[Dict] = None):
        """Add documents to the knowledge base."""
        all_chunks = []
        all_metadata = []
        for i, doc in enumerate(documents):
            chunks = self.chunk_text(doc)
            all_chunks.extend(chunks)
            # Attach metadata to every chunk
            doc_metadata = metadata[i] if metadata else {}
            chunk_metadata = [
                {**doc_metadata, 'chunk_id': j, 'doc_id': i}
                for j in range(len(chunks))
            ]
            all_metadata.extend(chunk_metadata)
        # Embed
        embeddings = self._embed_texts(all_chunks)
        # Store
        self.vector_store.add_documents(all_chunks, embeddings, all_metadata)

    def retrieve(self, query: str, top_k: int = 5,
                 use_rerank: bool = True) -> List[str]:
        """
        Retrieve relevant documents.
        Returns: list of relevant document chunks.
        """
        # Embed the query
        query_embedding = self._embed_texts([query])[0]
        # Retrieve (over-fetch to give the reranker more candidates)
        results = self.vector_store.search(query_embedding, top_k=top_k * 2)
        # Rerank
        if use_rerank:
            results = self.reranker.rerank(query, results, top_k=top_k)
        # Extract the document texts
        documents = [doc for doc, _, _ in results[:top_k]]
        return documents

    def generate(self, query: str, top_k: int = 3) -> str:
        """
        RAG generation:
        1. Retrieve the relevant documents.
        2. Build the prompt.
        3. Generate the answer.
        """
        # Retrieve
        relevant_docs = self.retrieve(query, top_k=top_k)
        # Build the prompt
        context = '\n\n'.join([f"Document {i+1}:\n{doc}"
                               for i, doc in enumerate(relevant_docs)])
        prompt = f"""Answer the question based on the context below.
Context:
{context}
Question: {query}
Answer:"""
        # Generate the answer
        answer = self._generate_answer(prompt)
        return answer

    def _embed_texts(self, texts: List[str]) -> np.ndarray:
        """Embed texts (simplified)."""
        # A real system would use an actual embedding model, e.g.
        # sentence-transformers; random vectors serve as a stand-in here.
        return np.random.randn(len(texts), self.vector_store.embedding_dim)

    def _generate_answer(self, prompt: str) -> str:
        """Generate the answer (simplified)."""
        # A real system would call an actual LLM here.
        return "This is a generated answer based on the retrieved context."
# Usage example
class SimpleEmbeddingModel:
    """Toy embedding model (example only)."""
    def embed(self, texts):
        return np.random.randn(len(texts), 768)

class SimpleLLM:
    """Toy LLM (example only)."""
    def generate(self, prompt):
        return "Generated answer"

# Build the RAG system
rag = RAGPipeline(
    embedding_model=SimpleEmbeddingModel(),
    llm=SimpleLLM(),
    chunk_size=512,
    chunk_overlap=50
)
# Add documents
documents = [
    "Transformers are a type of neural network architecture...",
    "GPT is a generative pre-trained transformer model...",
    "RAG combines retrieval with generation..."
]
metadata = [
    {'source': 'doc1.pdf', 'page': 1},
    {'source': 'doc2.pdf', 'page': 5},
    {'source': 'doc3.pdf', 'page': 10}
]
rag.add_documents(documents, metadata)
# Query
query = "What is a transformer?"
answer = rag.generate(query, top_k=3)
print(f"Query: {query}")
print(f"Answer: {answer}")