
AI and Machine Learning Interview Questions

I. Deep Learning Fundamentals

1. Neural Network Fundamentals

Question: Walk through the forward-propagation and back-propagation process of a feedforward neural network in detail.

Core concepts:

A feedforward neural network is the most basic neural network architecture: information flows in one direction only, from the input layer through the hidden layers to the output layer, with no cycles.

Mathematical principles: each layer l computes z^(l) = a^(l-1) W^(l) + b^(l) followed by a^(l) = f(z^(l)); back-propagation applies the chain rule to carry the output error delta backward through the layers, producing the gradients dW^(l) and db^(l) used to update the parameters. A NumPy implementation:

import numpy as np

class NeuralNetwork:
    def __init__(self, layers):
        """
        Initialize the network.
        layers: list of layer sizes, e.g. [784, 128, 64, 10]
        """
        self.num_layers = len(layers)
        self.layers = layers

        # He initialization for the weights (suited to ReLU activations)
        self.weights = [
            np.random.randn(layers[i], layers[i+1]) * np.sqrt(2.0 / layers[i])
            for i in range(self.num_layers - 1)
        ]

        # Biases start at zero
        self.biases = [
            np.zeros((1, layers[i+1]))
            for i in range(self.num_layers - 1)
        ]

    def sigmoid(self, z):
        """Sigmoid activation"""
        return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))

    def sigmoid_derivative(self, z):
        """Derivative of sigmoid"""
        s = self.sigmoid(z)
        return s * (1 - s)

    def relu(self, z):
        """ReLU activation"""
        return np.maximum(0, z)

    def relu_derivative(self, z):
        """Derivative of ReLU"""
        return (z > 0).astype(float)

    def softmax(self, z):
        """Softmax activation (numerically stabilized)"""
        exp_z = np.exp(z - np.max(z, axis=1, keepdims=True))
        return exp_z / np.sum(exp_z, axis=1, keepdims=True)

    def forward(self, X):
        """
        Forward pass.
        X: input batch, shape (batch_size, input_dim)
        Returns: (activations per layer, pre-activations per layer)
        """
        activations = [X]  # activation of each layer
        zs = []  # weighted input of each layer

        for i in range(self.num_layers - 1):
            # Weighted input: z = xW + b
            z = np.dot(activations[-1], self.weights[i]) + self.biases[i]
            zs.append(z)

            # Apply the activation function
            if i == self.num_layers - 2:
                # Output layer: Softmax
                activation = self.softmax(z)
            else:
                # Hidden layers: ReLU
                activation = self.relu(z)

            activations.append(activation)

        return activations, zs

    def backward(self, X, y, activations, zs, learning_rate=0.01):
        """
        Backward pass.
        X: input batch
        y: true labels (one-hot encoded)
        activations: activations from the forward pass
        zs: weighted inputs from the forward pass
        learning_rate: step size for the parameter update
        """
        m = X.shape[0]  # batch size

        # Gradient buffers
        weight_gradients = [np.zeros_like(w) for w in self.weights]
        bias_gradients = [np.zeros_like(b) for b in self.biases]

        # Output-layer error for softmax + cross-entropy: delta = a - y
        delta = activations[-1] - y

        # Walk backward through the layers
        for i in range(self.num_layers - 2, -1, -1):
            # Weight gradient: dW = a_prev^T * delta / m
            weight_gradients[i] = np.dot(activations[i].T, delta) / m

            # Bias gradient: db = sum(delta) / m
            bias_gradients[i] = np.sum(delta, axis=0, keepdims=True) / m

            if i > 0:
                # Propagate the error to the previous layer: delta = (delta @ W^T) * f'(z)
                delta = np.dot(delta, self.weights[i].T)
                delta *= self.relu_derivative(zs[i-1])

        # Update weights and biases
        for i in range(self.num_layers - 1):
            self.weights[i] -= learning_rate * weight_gradients[i]
            self.biases[i] -= learning_rate * bias_gradients[i]

        return weight_gradients, bias_gradients

    def train(self, X, y, epochs=100, batch_size=32, learning_rate=0.01, verbose=True):
        """
        Train with mini-batch gradient descent.
        """
        n_samples = X.shape[0]
        losses = []

        for epoch in range(epochs):
            # Shuffle the data each epoch
            indices = np.random.permutation(n_samples)
            X_shuffled = X[indices]
            y_shuffled = y[indices]

            epoch_loss = 0

            # Mini-batch gradient descent
            for i in range(0, n_samples, batch_size):
                batch_X = X_shuffled[i:i+batch_size]
                batch_y = y_shuffled[i:i+batch_size]

                # Forward pass
                activations, zs = self.forward(batch_X)

                # Cross-entropy loss
                batch_loss = self.cross_entropy_loss(activations[-1], batch_y)
                epoch_loss += batch_loss * len(batch_X)

                # Backward pass
                self.backward(batch_X, batch_y, activations, zs, learning_rate)

            # Average loss over the epoch
            avg_loss = epoch_loss / n_samples
            losses.append(avg_loss)

            if verbose and (epoch + 1) % 10 == 0:
                accuracy = self.evaluate(X, y)
                print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}")

        return losses

    def cross_entropy_loss(self, predictions, targets):
        """
        Cross-entropy loss.
        """
        m = predictions.shape[0]
        # Avoid log(0)
        predictions = np.clip(predictions, 1e-10, 1 - 1e-10)
        loss = -np.sum(targets * np.log(predictions)) / m
        return loss

    def predict(self, X):
        """
        Predict class indices.
        """
        activations, _ = self.forward(X)
        return np.argmax(activations[-1], axis=1)

    def evaluate(self, X, y):
        """
        Compute classification accuracy.
        """
        predictions = self.predict(X)
        true_labels = np.argmax(y, axis=1)
        accuracy = np.mean(predictions == true_labels)
        return accuracy

# Usage example
if __name__ == "__main__":
    # Generate synthetic data
    np.random.seed(42)

    # 1000 samples with 20 features each
    X = np.random.randn(1000, 20)

    # 10 classes, one-hot encoded
    y_labels = np.random.randint(0, 10, 1000)
    y = np.eye(10)[y_labels]

    # Build the network: 20 -> 64 -> 32 -> 10
    nn = NeuralNetwork([20, 64, 32, 10])

    # Train
    losses = nn.train(X, y, epochs=100, batch_size=32, learning_rate=0.01)

    # Evaluate
    accuracy = nn.evaluate(X, y)
    print(f"\nFinal Accuracy: {accuracy:.4f}")

Key points:

  1. Forward pass

    • The input layer receives the data
    • Each layer computes z = xW + b, then a = activation(z)
    • The output layer applies Softmax to produce a probability distribution
  2. Backward pass (a numerical check of this is sketched right after this list)

    • Compute the output-layer error: delta = output - target
    • Propagate the error backward layer by layer
    • Compute the gradients and update the parameters
  3. Choosing activation functions

    • ReLU: hidden layers; mitigates vanishing gradients
    • Softmax: output layer for multi-class classification
    • Sigmoid: output layer for binary classification
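
How would you verify that a hand-written backward pass is correct? Numerical gradient checking is the standard answer: perturb one weight, estimate the loss slope with central differences, and compare it against the analytic gradient. A minimal sketch against the NeuralNetwork class above (gradient_check is an illustrative helper, not part of the original code):

def gradient_check(nn, X, y, layer=0, i=0, j=0, eps=1e-5):
    """Compare the backprop gradient of one weight against a finite difference."""
    activations, zs = nn.forward(X)
    # learning_rate=0.0 makes the parameter update a no-op, so backward()
    # only reports gradients here
    weight_grads, _ = nn.backward(X, y, activations, zs, learning_rate=0.0)
    analytic = weight_grads[layer][i, j]

    original = nn.weights[layer][i, j]
    nn.weights[layer][i, j] = original + eps
    loss_plus = nn.cross_entropy_loss(nn.forward(X)[0][-1], y)
    nn.weights[layer][i, j] = original - eps
    loss_minus = nn.cross_entropy_loss(nn.forward(X)[0][-1], y)
    nn.weights[layer][i, j] = original  # restore

    numerical = (loss_plus - loss_minus) / (2 * eps)
    return abs(analytic - numerical) / max(abs(analytic) + abs(numerical), 1e-10)

# The relative error should be tiny (~1e-7); ReLU kinks can occasionally inflate it.
# print(gradient_check(nn, X[:32], y[:32]))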

2. Optimization Algorithms

Question: Compare SGD, Momentum, Adam, and related optimizers, and explain the strengths and weaknesses of each.

Optimizer implementations:

class Optimizer:
    """Base class for optimizers"""
    def __init__(self, learning_rate=0.01):
        self.learning_rate = learning_rate

    def update(self, params, grads):
        raise NotImplementedError

class SGD(Optimizer):
    """
    Stochastic gradient descent
    Pros: simple; works well on convex problems
    Cons: learning rate is hard to tune; prone to oscillation
    """
    def __init__(self, learning_rate=0.01):
        super().__init__(learning_rate)

    def update(self, params, grads):
        for param, grad in zip(params, grads):
            param -= self.learning_rate * grad

class Momentum(Optimizer):
    """
    Momentum optimizer
    Pros: faster convergence; damps oscillation
    Cons: introduces an extra hyperparameter

    Update rule:
    v = momentum * v - learning_rate * grad
    param = param + v
    """
    def __init__(self, learning_rate=0.01, momentum=0.9):
        super().__init__(learning_rate)
        self.momentum = momentum
        self.velocities = None

    def update(self, params, grads):
        if self.velocities is None:
            self.velocities = [np.zeros_like(p) for p in params]

        for i, (param, grad) in enumerate(zip(params, grads)):
            self.velocities[i] = (
                self.momentum * self.velocities[i] -
                self.learning_rate * grad
            )
            param += self.velocities[i]

class AdaGrad(Optimizer):
    """
    Adaptive gradient optimizer
    Pros: adapts the learning rate per parameter
    Cons: the effective learning rate only ever shrinks, so learning stalls late in training

    Update rule:
    cache = cache + grad^2
    param = param - learning_rate * grad / (sqrt(cache) + epsilon)
    """
    def __init__(self, learning_rate=0.01, epsilon=1e-8):
        super().__init__(learning_rate)
        self.epsilon = epsilon
        self.cache = None

    def update(self, params, grads):
        if self.cache is None:
            self.cache = [np.zeros_like(p) for p in params]

        for i, (param, grad) in enumerate(zip(params, grads)):
            self.cache[i] += grad ** 2
            param -= (
                self.learning_rate * grad /
                (np.sqrt(self.cache[i]) + self.epsilon)
            )

class RMSprop(Optimizer):
    """
    RMSprop optimizer
    Pros: fixes AdaGrad's ever-shrinking learning rate with an exponential moving average
    Cons: the base learning rate still has to be tuned by hand

    Update rule:
    cache = decay_rate * cache + (1 - decay_rate) * grad^2
    param = param - learning_rate * grad / (sqrt(cache) + epsilon)
    """
    def __init__(self, learning_rate=0.01, decay_rate=0.9, epsilon=1e-8):
        super().__init__(learning_rate)
        self.decay_rate = decay_rate
        self.epsilon = epsilon
        self.cache = None

    def update(self, params, grads):
        if self.cache is None:
            self.cache = [np.zeros_like(p) for p in params]

        for i, (param, grad) in enumerate(zip(params, grads)):
            self.cache[i] = (
                self.decay_rate * self.cache[i] +
                (1 - self.decay_rate) * grad ** 2
            )
            param -= (
                self.learning_rate * grad /
                (np.sqrt(self.cache[i]) + self.epsilon)
            )

class Adam(Optimizer):
    """
    Adam optimizer (the most common default today)
    Pros: combines Momentum and RMSprop; adaptive learning rates; fast convergence
    Cons: can fail to converge in some settings (AMSGrad is one fix)

    Update rule:
    m = beta1 * m + (1 - beta1) * grad
    v = beta2 * v + (1 - beta2) * grad^2
    m_hat = m / (1 - beta1^t)
    v_hat = v / (1 - beta2^t)
    param = param - learning_rate * m_hat / (sqrt(v_hat) + epsilon)
    """
    def __init__(self, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8):
        super().__init__(learning_rate)
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        self.m = None  # first-moment estimate
        self.v = None  # second-moment estimate
        self.t = 0     # time step

    def update(self, params, grads):
        if self.m is None:
            self.m = [np.zeros_like(p) for p in params]
            self.v = [np.zeros_like(p) for p in params]

        self.t += 1

        for i, (param, grad) in enumerate(zip(params, grads)):
            # First-moment update
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * grad

            # Second-moment update
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * grad ** 2

            # Bias correction
            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)

            # Parameter update
            param -= (
                self.learning_rate * m_hat /
                (np.sqrt(v_hat) + self.epsilon)
            )

class AdamW(Adam):
    """
    AdamW: Adam with decoupled weight decay
    Pros: better generalization than Adam + L2 in practice

    Difference from Adam + L2 regularization:
    - L2 regularization adds a weight penalty to the loss, so it flows through
      the adaptive moments
    - Decoupled weight decay shrinks the parameters directly in the update step
    """
    def __init__(self, learning_rate=0.001, beta1=0.9, beta2=0.999,
                 epsilon=1e-8, weight_decay=0.01):
        super().__init__(learning_rate, beta1, beta2, epsilon)
        self.weight_decay = weight_decay

    def update(self, params, grads):
        if self.m is None:
            self.m = [np.zeros_like(p) for p in params]
            self.v = [np.zeros_like(p) for p in params]

        self.t += 1

        for i, (param, grad) in enumerate(zip(params, grads)):
            # First-moment estimate
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * grad

            # Second-moment estimate
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * grad ** 2

            # Bias correction
            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)

            # Parameter update, with decoupled weight decay
            param -= self.learning_rate * (
                m_hat / (np.sqrt(v_hat) + self.epsilon) +
                self.weight_decay * param
            )

# Learning-rate schedulers
class LearningRateScheduler:
    """Learning-rate schedules"""

    @staticmethod
    def step_decay(initial_lr, epoch, drop=0.5, epochs_drop=10):
        """Step decay"""
        return initial_lr * (drop ** (epoch // epochs_drop))

    @staticmethod
    def exponential_decay(initial_lr, epoch, decay_rate=0.95):
        """Exponential decay"""
        return initial_lr * (decay_rate ** epoch)

    @staticmethod
    def cosine_annealing(initial_lr, epoch, total_epochs):
        """Cosine annealing"""
        return initial_lr * (1 + np.cos(np.pi * epoch / total_epochs)) / 2

    @staticmethod
    def warmup_cosine(initial_lr, epoch, total_epochs, warmup_epochs=5):
        """Linear warmup followed by cosine annealing"""
        if epoch < warmup_epochs:
            # Linear warmup
            return initial_lr * (epoch + 1) / warmup_epochs
        else:
            # Cosine annealing
            progress = (epoch - warmup_epochs) / (total_epochs - warmup_epochs)
            return initial_lr * (1 + np.cos(np.pi * progress)) / 2

# Usage example
optimizer = Adam(learning_rate=0.001)
scheduler = LearningRateScheduler()

for epoch in range(100):
    # Adjust the learning rate
    lr = scheduler.warmup_cosine(0.001, epoch, 100)
    optimizer.learning_rate = lr

    # Training step
    # optimizer.update(params, grads)
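
A quick way to make these trade-offs concrete is to race the optimizers on a shared objective. The ill-conditioned quadratic below is an illustrative assumption (not from the original article), chosen because its 50x curvature gap separates the three behaviors:

A = np.diag([1.0, 50.0])  # f(w) = 0.5 * w^T A w, minimized at w = 0

for opt in [SGD(learning_rate=0.01), Momentum(learning_rate=0.01), Adam(learning_rate=0.1)]:
    w = np.array([5.0, 5.0])
    params = [w]
    for _ in range(200):
        grads = [A @ params[0]]  # exact gradient of the quadratic
        opt.update(params, grads)
    loss = 0.5 * params[0] @ A @ params[0]
    print(f"{type(opt).__name__}: final loss {loss:.2e}")

# Typical outcome: plain SGD crawls along the low-curvature axis, Momentum
# accelerates it, and Adam's per-coordinate scaling converges fastest here.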

II. The Transformer Architecture

3. The Self-Attention Mechanism

Question: Explain the self-attention computation in detail and the role of multi-head attention.

Self-Attention implementation:

import numpy as np

class SelfAttention:
    """
    Self-attention.

    Formulas:
    Q = XW_q, K = XW_k, V = XW_v
    Attention(Q, K, V) = softmax(QK^T / sqrt(d_k))V
    """
    def __init__(self, d_model, d_k=None, d_v=None):
        """
        d_model: model dimension
        d_k: key dimension, defaults to d_model
        d_v: value dimension, defaults to d_model
        """
        self.d_model = d_model
        self.d_k = d_k or d_model
        self.d_v = d_v or d_model

        # Initialize the projection matrices
        self.W_q = np.random.randn(d_model, self.d_k) / np.sqrt(d_model)
        self.W_k = np.random.randn(d_model, self.d_k) / np.sqrt(d_model)
        self.W_v = np.random.randn(d_model, self.d_v) / np.sqrt(d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """
        Scaled dot-product attention.

        Arguments:
        Q: query matrix, shape (batch_size, seq_len, d_k)
        K: key matrix, shape (batch_size, seq_len, d_k)
        V: value matrix, shape (batch_size, seq_len, d_v)
        mask: attention mask, shape (batch_size, seq_len, seq_len)

        Returns:
        output: attention output, shape (batch_size, seq_len, d_v)
        attention_weights: attention weights, shape (batch_size, seq_len, seq_len)
        """
        # Attention scores: QK^T / sqrt(d_k)
        scores = np.matmul(Q, K.transpose(0, 2, 1)) / np.sqrt(self.d_k)

        # Apply the mask, if any
        if mask is not None:
            scores = np.where(mask == 0, -1e9, scores)

        # Softmax normalization
        attention_weights = self.softmax(scores, axis=-1)

        # Weighted sum: Attention * V
        output = np.matmul(attention_weights, V)

        return output, attention_weights

    def forward(self, X, mask=None):
        """
        Forward pass.

        Arguments:
        X: input, shape (batch_size, seq_len, d_model)
        mask: attention mask

        Returns:
        output: attention output
        attention_weights: attention weights
        """
        # Linear projections to Q, K, V
        Q = np.matmul(X, self.W_q)  # (batch_size, seq_len, d_k)
        K = np.matmul(X, self.W_k)  # (batch_size, seq_len, d_k)
        V = np.matmul(X, self.W_v)  # (batch_size, seq_len, d_v)

        # Compute attention
        output, attention_weights = self.scaled_dot_product_attention(
            Q, K, V, mask
        )

        return output, attention_weights

    @staticmethod
    def softmax(x, axis=-1):
        """Numerically stable softmax"""
        exp_x = np.exp(x - np.max(x, axis=axis, keepdims=True))
        return exp_x / np.sum(exp_x, axis=axis, keepdims=True)

class MultiHeadAttention:
    """
    Multi-head attention.

    Why multiple heads:
    1. Each head can attend to different positions in a different
       representation subspace
    2. Greater expressive power than a single head
    3. The heads are computed in parallel
    """
    def __init__(self, d_model, num_heads):
        """
        d_model: model dimension; must be divisible by num_heads
        num_heads: number of attention heads
        """
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        # Shared projection matrices covering all heads
        self.W_q = np.random.randn(d_model, d_model) / np.sqrt(d_model)
        self.W_k = np.random.randn(d_model, d_model) / np.sqrt(d_model)
        self.W_v = np.random.randn(d_model, d_model) / np.sqrt(d_model)

        # Output projection
        self.W_o = np.random.randn(d_model, d_model) / np.sqrt(d_model)

    def split_heads(self, x):
        """
        Split the input into multiple heads.

        Input: (batch_size, seq_len, d_model)
        Output: (batch_size, num_heads, seq_len, d_k)
        """
        batch_size, seq_len, _ = x.shape

        # Reshape: (batch_size, seq_len, num_heads, d_k)
        x = x.reshape(batch_size, seq_len, self.num_heads, self.d_k)

        # Transpose: (batch_size, num_heads, seq_len, d_k)
        return x.transpose(0, 2, 1, 3)

    def combine_heads(self, x):
        """
        Merge the heads back together.

        Input: (batch_size, num_heads, seq_len, d_k)
        Output: (batch_size, seq_len, d_model)
        """
        batch_size, _, seq_len, _ = x.shape

        # Transpose: (batch_size, seq_len, num_heads, d_k)
        x = x.transpose(0, 2, 1, 3)

        # Reshape: (batch_size, seq_len, d_model)
        return x.reshape(batch_size, seq_len, self.d_model)

    def forward(self, X, mask=None):
        """
        Forward pass.
        """
        batch_size = X.shape[0]

        # Linear projections
        Q = np.matmul(X, self.W_q)
        K = np.matmul(X, self.W_k)
        V = np.matmul(X, self.W_v)

        # Split into heads
        Q = self.split_heads(Q)  # (batch_size, num_heads, seq_len, d_k)
        K = self.split_heads(K)
        V = self.split_heads(V)

        # Broadcast the mask across heads
        if mask is not None:
            mask = mask[:, np.newaxis, :, :]

        # Scaled dot-product attention
        scores = np.matmul(Q, K.transpose(0, 1, 3, 2)) / np.sqrt(self.d_k)

        if mask is not None:
            scores = np.where(mask == 0, -1e9, scores)

        attention_weights = self.softmax(scores, axis=-1)

        # Weighted sum
        attention_output = np.matmul(attention_weights, V)

        # Merge heads
        attention_output = self.combine_heads(attention_output)

        # Output projection
        output = np.matmul(attention_output, self.W_o)

        return output, attention_weights

    @staticmethod
    def softmax(x, axis=-1):
        exp_x = np.exp(x - np.max(x, axis=axis, keepdims=True))
        return exp_x / np.sum(exp_x, axis=axis, keepdims=True)

# Usage example
if __name__ == "__main__":
    # Hyperparameters
    batch_size = 2
    seq_len = 10
    d_model = 512
    num_heads = 8

    # Random input
    X = np.random.randn(batch_size, seq_len, d_model)

    # Causal mask (used in decoders)
    causal_mask = np.tril(np.ones((seq_len, seq_len)))
    causal_mask = causal_mask[np.newaxis, :, :]  # add a batch dimension

    # Single-head attention
    attention = SelfAttention(d_model)
    output, weights = attention.forward(X, mask=causal_mask)
    print("Single-head attention output shape:", output.shape)
    print("Attention weights shape:", weights.shape)

    # Multi-head attention
    mha = MultiHeadAttention(d_model, num_heads)
    output, weights = mha.forward(X, mask=causal_mask)
    print("\nMulti-head attention output shape:", output.shape)
    print("Attention weights shape:", weights.shape)

4. A Complete Transformer

Question: Implement a complete Transformer encoder-decoder architecture.

Transformer implementation:

class PositionalEncoding:
    """
    Sinusoidal positional encoding.

    Formulas:
    PE(pos, 2i) = sin(pos / 10000^(2i/d_model))
    PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model))
    """
    def __init__(self, d_model, max_len=5000):
        self.d_model = d_model

        # Build the positional-encoding matrix
        pe = np.zeros((max_len, d_model))
        position = np.arange(0, max_len)[:, np.newaxis]
        div_term = np.exp(
            np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model)
        )

        pe[:, 0::2] = np.sin(position * div_term)
        pe[:, 1::2] = np.cos(position * div_term)

        self.pe = pe

    def forward(self, X):
        """
        Add positional encodings.
        X: (batch_size, seq_len, d_model)
        """
        seq_len = X.shape[1]
        return X + self.pe[np.newaxis, :seq_len, :]

class FeedForward:
    """
    Position-wise feed-forward network.

    FFN(x) = max(0, xW1 + b1)W2 + b2
    """
    def __init__(self, d_model, d_ff, dropout=0.1):
        self.d_model = d_model
        self.d_ff = d_ff
        self.dropout = dropout

        # Two fully connected layers
        self.W1 = np.random.randn(d_model, d_ff) / np.sqrt(d_model)
        self.b1 = np.zeros(d_ff)
        self.W2 = np.random.randn(d_ff, d_model) / np.sqrt(d_ff)
        self.b2 = np.zeros(d_model)

    def forward(self, X, training=True):
        # First layer + ReLU
        hidden = np.maximum(0, np.matmul(X, self.W1) + self.b1)

        # Inverted dropout
        if training and self.dropout > 0:
            mask = np.random.binomial(1, 1-self.dropout, hidden.shape)
            hidden = hidden * mask / (1 - self.dropout)

        # Second layer
        output = np.matmul(hidden, self.W2) + self.b2

        return output

class LayerNormalization:
    """
    Layer normalization.

    Formula:
    LN(x) = gamma * (x - mean) / sqrt(var + epsilon) + beta
    """
    def __init__(self, d_model, epsilon=1e-6):
        self.gamma = np.ones(d_model)
        self.beta = np.zeros(d_model)
        self.epsilon = epsilon

    def forward(self, X):
        mean = np.mean(X, axis=-1, keepdims=True)
        var = np.var(X, axis=-1, keepdims=True)

        X_norm = (X - mean) / np.sqrt(var + self.epsilon)
        return self.gamma * X_norm + self.beta

class TransformerEncoderLayer:
    """One Transformer encoder layer"""
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff, dropout)
        self.ln1 = LayerNormalization(d_model)
        self.ln2 = LayerNormalization(d_model)
        self.dropout = dropout

    def forward(self, X, mask=None, training=True):
        # Multi-head self-attention + residual connection + layer norm
        attn_output, _ = self.mha.forward(X, mask)

        if training and self.dropout > 0:
            dropout_mask = np.random.binomial(
                1, 1-self.dropout, attn_output.shape
            )
            attn_output = attn_output * dropout_mask / (1 - self.dropout)

        X = self.ln1.forward(X + attn_output)

        # Feed-forward network + residual connection + layer norm
        ffn_output = self.ffn.forward(X, training)

        if training and self.dropout > 0:
            dropout_mask = np.random.binomial(
                1, 1-self.dropout, ffn_output.shape
            )
            ffn_output = ffn_output * dropout_mask / (1 - self.dropout)

        output = self.ln2.forward(X + ffn_output)

        return output

class TransformerEncoder:
    """Stack of Transformer encoder layers"""
    def __init__(self, num_layers, d_model, num_heads, d_ff,
                 vocab_size, max_len=5000, dropout=0.1):
        # Token embedding
        self.embedding = np.random.randn(vocab_size, d_model) / np.sqrt(d_model)

        # Positional encoding
        self.pos_encoding = PositionalEncoding(d_model, max_len)

        # Encoder layers
        self.layers = [
            TransformerEncoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ]

        self.dropout = dropout

    def forward(self, X, mask=None, training=True):
        """
        X: input token IDs, shape (batch_size, seq_len)
        """
        # Token embedding
        X = self.embedding[X]

        # Add positional encodings
        X = self.pos_encoding.forward(X)

        # Dropout
        if training and self.dropout > 0:
            dropout_mask = np.random.binomial(1, 1-self.dropout, X.shape)
            X = X * dropout_mask / (1 - self.dropout)

        # Run through the encoder layers
        for layer in self.layers:
            X = layer.forward(X, mask, training)

        return X

class TransformerDecoderLayer:
    """One Transformer decoder layer"""
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        # Masked self-attention
        self.masked_mha = MultiHeadAttention(d_model, num_heads)

        # Encoder-decoder (cross) attention
        self.mha = MultiHeadAttention(d_model, num_heads)

        # Feed-forward network
        self.ffn = FeedForward(d_model, d_ff, dropout)

        # Layer norms
        self.ln1 = LayerNormalization(d_model)
        self.ln2 = LayerNormalization(d_model)
        self.ln3 = LayerNormalization(d_model)

        self.dropout = dropout

    def forward(self, X, encoder_output, self_mask=None,
                cross_mask=None, training=True):
        # Masked self-attention
        attn1, _ = self.masked_mha.forward(X, self_mask)

        if training and self.dropout > 0:
            dropout_mask = np.random.binomial(1, 1-self.dropout, attn1.shape)
            attn1 = attn1 * dropout_mask / (1 - self.dropout)

        X = self.ln1.forward(X + attn1)

        # Encoder-decoder (cross) attention:
        # Q comes from the decoder, K and V come from the encoder
        attn2, _ = self.cross_attention(X, encoder_output, cross_mask)

        if training and self.dropout > 0:
            dropout_mask = np.random.binomial(1, 1-self.dropout, attn2.shape)
            attn2 = attn2 * dropout_mask / (1 - self.dropout)

        X = self.ln2.forward(X + attn2)

        # Feed-forward network
        ffn_output = self.ffn.forward(X, training)

        if training and self.dropout > 0:
            dropout_mask = np.random.binomial(1, 1-self.dropout, ffn_output.shape)
            ffn_output = ffn_output * dropout_mask / (1 - self.dropout)

        output = self.ln3.forward(X + ffn_output)

        return output

    def cross_attention(self, decoder_output, encoder_output, mask=None):
        """
        Cross-attention: the decoder attends to the encoder output.
        Q: from the decoder
        K, V: from the encoder
        """
        # Simplified here: MultiHeadAttention above derives Q, K, V from a
        # single input, so a faithful implementation would extend it to accept
        # separate Q and K/V sources; encoder_output is ignored in this sketch.
        return self.mha.forward(decoder_output, mask)

# Usage example
encoder = TransformerEncoder(
    num_layers=6,
    d_model=512,
    num_heads=8,
    d_ff=2048,
    vocab_size=10000,
    dropout=0.1
)

# Input sequence
input_ids = np.random.randint(0, 10000, (2, 20))  # (batch_size=2, seq_len=20)

# Encode
encoder_output = encoder.forward(input_ids, training=True)
print("Encoder output shape:", encoder_output.shape)

III. Large Language Models (LLMs)

5. GPT Architecture and Implementation

Question: Explain GPT's autoregressive generation process and the common decoding strategies.

GPT implementation and decoding strategies:

class GPTModel:
    """
    GPT: a decoder-only Transformer.

    Key properties:
    1. Unidirectional attention (causal mask)
    2. Autoregressive generation
    3. Pretrain-then-finetune paradigm
    """
    def __init__(self, vocab_size, d_model=768, num_layers=12,
                 num_heads=12, d_ff=3072, max_len=1024, dropout=0.1):
        self.vocab_size = vocab_size
        self.d_model = d_model

        # Token embedding + positional encoding
        self.token_embedding = np.random.randn(vocab_size, d_model) / np.sqrt(d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)

        # Transformer blocks (the encoder layer above, made causal by the mask)
        self.layers = [
            TransformerEncoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ]

        # Output projection (language-model head)
        self.lm_head = np.random.randn(d_model, vocab_size) / np.sqrt(d_model)

    def create_causal_mask(self, seq_len):
        """
        Build the causal mask (a lower-triangular matrix) so the model
        cannot attend to future tokens.
        """
        mask = np.tril(np.ones((seq_len, seq_len)))
        return mask[np.newaxis, :, :]

    def forward(self, input_ids, training=True):
        """
        Forward pass.

        input_ids: (batch_size, seq_len)
        Returns: logits of shape (batch_size, seq_len, vocab_size)
        """
        batch_size, seq_len = input_ids.shape

        # Token embedding
        X = self.token_embedding[input_ids]

        # Positional encoding
        X = self.pos_encoding.forward(X)

        # Causal mask
        mask = self.create_causal_mask(seq_len)

        # Run through the Transformer blocks
        for layer in self.layers:
            X = layer.forward(X, mask, training)

        # Language-model head
        logits = np.matmul(X, self.lm_head)

        return logits

    def generate(self, input_ids, max_new_tokens=50,
                 method='greedy', temperature=1.0,
                 top_k=0, top_p=1.0):
        """
        Autoregressive generation.

        Decoding strategies:
        - greedy: greedy decoding
        - sample: pure sampling
        - top_k: top-K sampling
        - top_p: top-P (nucleus) sampling
        (beam search is implemented separately in BeamSearch below)
        """
        generated = input_ids.copy()

        for _ in range(max_new_tokens):
            # Forward pass over the whole sequence so far
            logits = self.forward(generated, training=False)

            # Only the logits at the last position matter for the next token
            next_token_logits = logits[:, -1, :]  # (batch_size, vocab_size)

            # Apply temperature
            next_token_logits = next_token_logits / temperature

            # Pick the next token according to the decoding method
            if method == 'greedy':
                next_token = self._greedy_decode(next_token_logits)
            elif method == 'sample':
                next_token = self._sample_decode(next_token_logits)
            elif method == 'top_k':
                next_token = self._top_k_decode(next_token_logits, top_k)
            elif method == 'top_p':
                next_token = self._top_p_decode(next_token_logits, top_p)
            else:
                raise ValueError(f"Unknown method: {method}")

            # Append to the generated sequence
            generated = np.concatenate(
                [generated, next_token[:, np.newaxis]],
                axis=1
            )

        return generated

    def _greedy_decode(self, logits):
        """Greedy decoding: always take the highest-probability token"""
        return np.argmax(logits, axis=-1)

    def _sample_decode(self, logits):
        """Pure sampling: draw from the full probability distribution"""
        probs = self._softmax(logits)

        # Sample one token per batch element
        next_tokens = []
        for prob in probs:
            next_token = np.random.choice(len(prob), p=prob)
            next_tokens.append(next_token)

        return np.array(next_tokens)

    def _top_k_decode(self, logits, k):
        """
        Top-K sampling: sample only from the K highest-probability tokens.
        Benefit: avoids sampling from the low-probability tail.
        """
        # Indices of the top-k logits
        top_k_indices = np.argsort(logits, axis=-1)[:, -k:]

        # Mask that keeps only the top-k entries
        mask = np.zeros_like(logits)
        np.put_along_axis(mask, top_k_indices, 1, axis=-1)

        # Apply the mask
        masked_logits = np.where(mask == 1, logits, -1e9)

        return self._sample_decode(masked_logits)

    def _top_p_decode(self, logits, p):
        """
        Top-P (nucleus) sampling: sample from the smallest token set whose
        cumulative probability reaches p.
        Benefit: the candidate-set size adapts to the distribution.
        """
        # Probabilities, sorted in descending order
        probs = self._softmax(logits)
        sorted_indices = np.argsort(probs, axis=-1)[:, ::-1]
        sorted_probs = np.take_along_axis(probs, sorted_indices, axis=-1)

        # Cumulative probabilities
        cumsum_probs = np.cumsum(sorted_probs, axis=-1)

        # Keep tokens while the cumulative probability is within p
        mask = cumsum_probs <= p

        # Always keep at least one token
        mask[:, 0] = True

        # Scatter the sorted-order mask back to vocabulary order
        filtered_logits = np.where(
            np.take_along_axis(mask, np.argsort(sorted_indices, axis=-1), axis=-1),
            logits,
            -1e9
        )

        return self._sample_decode(filtered_logits)

    @staticmethod
    def _softmax(x, axis=-1):
        exp_x = np.exp(x - np.max(x, axis=axis, keepdims=True))
        return exp_x / np.sum(exp_x, axis=axis, keepdims=True)

class BeamSearch:
    """
    Beam-search decoding.

    Pros: explores more of the search space than greedy decoding
    Cons: tends to produce generic, repetitive text
    """
    def __init__(self, model, beam_width=5, max_len=50):
        self.model = model
        self.beam_width = beam_width
        self.max_len = max_len

    def search(self, input_ids):
        """
        Run beam search.

        Returns: the highest-scoring sequence
        """
        batch_size = input_ids.shape[0]
        assert batch_size == 1, "this beam search only supports batch_size=1"

        # Initialize the beams: [(sequence, score)]
        beams = [(input_ids[0], 0.0)]

        for _ in range(self.max_len):
            candidates = []

            for seq, score in beams:
                # Forward pass
                logits = self.model.forward(
                    seq[np.newaxis, :],
                    training=False
                )

                # Log-probabilities at the last position
                log_probs = np.log(self._softmax(logits[0, -1, :]) + 1e-10)

                # Top-K expansions of this beam
                top_k_indices = np.argsort(log_probs)[-self.beam_width:]

                for idx in top_k_indices:
                    new_seq = np.append(seq, idx)
                    new_score = score + log_probs[idx]
                    candidates.append((new_seq, new_score))

            # Keep the beam_width highest-scoring candidates
            beams = sorted(candidates, key=lambda x: x[1], reverse=True)[
                :self.beam_width
            ]

        # Return the highest-scoring sequence
        return beams[0][0][np.newaxis, :]

    @staticmethod
    def _softmax(x):
        exp_x = np.exp(x - np.max(x))
        return exp_x / np.sum(exp_x)

# Usage example
vocab_size = 50000
gpt = GPTModel(
    vocab_size=vocab_size,
    d_model=768,
    num_layers=12,
    num_heads=12,
    d_ff=3072
)

# Input prompt
prompt = np.array([[1, 2, 3, 4, 5]])  # token IDs

# Greedy decoding
output_greedy = gpt.generate(prompt, max_new_tokens=20, method='greedy')
print("Greedy decoding:", output_greedy)

# Top-K sampling
output_topk = gpt.generate(
    prompt,
    max_new_tokens=20,
    method='top_k',
    temperature=0.8,
    top_k=50
)
print("Top-K sampling:", output_topk)

# Top-P sampling
output_topp = gpt.generate(
    prompt,
    max_new_tokens=20,
    method='top_p',
    temperature=0.9,
    top_p=0.9
)
print("Top-P sampling:", output_topp)

# Beam search
beam_search = BeamSearch(gpt, beam_width=5, max_len=20)
output_beam = beam_search.search(prompt)
print("Beam search:", output_beam)

6. LLM Inference Optimization

Question: How can you speed up large-language-model inference and reduce its memory footprint?

Inference-optimization techniques:

# 1. KV Cache
class GPTWithKVCache:
    """
    GPT with a KV cache.

    Idea: cache the Keys and Values already computed for the prefix so they
    are not recomputed for every new token.
    Speedup: per-token cost drops from reprocessing the whole prefix
    (O(n^2) overall) to a single-token step (O(n)).
    """
    def __init__(self, *args, **kwargs):
        # Model initialization omitted; assume the same fields as GPTModel
        pass

    def forward_with_cache(self, input_ids, past_kv_cache=None):
        """
        Forward pass with caching.

        past_kv_cache: list holding each layer's (K, V) cache
        Returns: (logits, new_kv_cache)
        """
        if past_kv_cache is None:
            # First call: run a normal forward pass
            past_kv_cache = [None] * len(self.layers)

        X = self.token_embedding[input_ids]
        X = self.pos_encoding.forward(X)

        new_kv_cache = []

        for i, layer in enumerate(self.layers):
            # Reuse the cached K, V for this layer
            X, kv = layer.forward_with_cache(X, past_kv_cache[i])
            new_kv_cache.append(kv)

        logits = np.matmul(X, self.lm_head)

        return logits, new_kv_cache

    def generate_with_cache(self, input_ids, max_new_tokens=50):
        """Generation with the KV cache"""
        kv_cache = None
        generated = input_ids.copy()

        for i in range(max_new_tokens):
            if i == 0:
                # First step: process the full prompt
                current_input = generated
            else:
                # Later steps: process only the newest token
                current_input = generated[:, -1:]

            # Forward pass
            logits, kv_cache = self.forward_with_cache(
                current_input,
                kv_cache
            )

            # Pick the next token
            next_token = np.argmax(logits[:, -1, :], axis=-1)

            # Append it to the sequence
            generated = np.concatenate(
                [generated, next_token[:, np.newaxis]],
                axis=1
            )

        return generated
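
# A minimal sketch of the per-layer cache update that forward_with_cache
# assumes above (attend_with_cache is a hypothetical helper, not part of the
# original classes): the new token's K/V rows are appended to the cache, and
# its query attends over the full cached sequence.
def attend_with_cache(q_new, k_new, v_new, cache, d_k):
    if cache is not None:
        K = np.concatenate([cache[0], k_new], axis=1)  # (batch, cached+new, d_k)
        V = np.concatenate([cache[1], v_new], axis=1)
    else:
        K, V = k_new, v_new
    scores = np.matmul(q_new, K.transpose(0, 2, 1)) / np.sqrt(d_k)
    weights = np.exp(scores - scores.max(axis=-1, keepdims=True))
    weights = weights / weights.sum(axis=-1, keepdims=True)
    return np.matmul(weights, V), (K, V)  # attention output + updated cache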

# 2. Quantization
class QuantizedLinear:
    """
    Quantized linear layer.

    INT8 quantization converts FP32 weights to INT8:
    Memory: 4x smaller
    Speed: can use INT8 GEMM kernels
    """
    def __init__(self, weight_fp32):
        """
        weight_fp32: FP32 weight matrix
        """
        self.scale, self.zero_point, self.weight_int8 = self.quantize(weight_fp32)

    def quantize(self, weight):
        """
        Symmetric quantization.

        Formulas:
        scale = max(abs(weight)) / 127
        weight_int8 = round(weight / scale)
        """
        max_val = np.max(np.abs(weight))
        scale = max_val / 127.0
        zero_point = 0

        weight_int8 = np.round(weight / scale).astype(np.int8)

        return scale, zero_point, weight_int8

    def dequantize(self):
        """Dequantize back to FP32"""
        return self.scale * self.weight_int8.astype(np.float32)

    def forward(self, X):
        """
        Quantization-aware matrix multiplication.

        Option 1: dequantize, then compute (simple but slow)
        Option 2: INT8 matmul + rescale (fast, needs hardware support)
        """
        # Option 1: dequantize
        weight_fp32 = self.dequantize()
        return np.matmul(X, weight_fp32)

        # Option 2: INT8 computation (pseudocode)
        # X_int8, X_scale = quantize(X)
        # result_int8 = matmul_int8(X_int8, self.weight_int8)
        # result_fp32 = dequantize(result_int8, X_scale * self.scale)
        # return result_fp32
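
# Quick round-trip check (illustrative): with symmetric INT8 quantization the
# per-weight error is bounded by scale / 2.
W_demo = np.random.randn(64, 64).astype(np.float32)
qlin = QuantizedLinear(W_demo)
print("max abs error:", np.abs(W_demo - qlin.dequantize()).max())
print("error bound  :", qlin.scale / 2)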

# 3. Flash Attention
class FlashAttention:
    """
    Flash Attention: an IO-aware attention algorithm.

    Optimizations:
    1. Tiled computation cuts HBM <-> SRAM data transfer
    2. Memory: O(N^2) -> O(N)
    3. Speed: 2-4x faster in practice

    Reference: https://arxiv.org/abs/2205.14135
    """
    def __init__(self, d_k, block_size=64):
        self.d_k = d_k
        self.block_size = block_size

    def forward(self, Q, K, V):
        """
        Flash Attention forward pass.

        Core ideas:
        1. Split Q, K, V into blocks
        2. Compute attention block by block
        3. Maintain the softmax statistics online
        """
        batch_size, seq_len, d_k = Q.shape
        num_blocks = (seq_len + self.block_size - 1) // self.block_size

        # Output and running statistics
        O = np.zeros_like(V)
        l = np.zeros((batch_size, seq_len, 1))  # softmax normalizer
        m = np.full((batch_size, seq_len, 1), -np.inf)  # running row max

        for i in range(num_blocks):
            # i-th block of Q
            q_start = i * self.block_size
            q_end = min((i + 1) * self.block_size, seq_len)
            Q_block = Q[:, q_start:q_end, :]

            for j in range(num_blocks):
                # j-th block of K, V
                kv_start = j * self.block_size
                kv_end = min((j + 1) * self.block_size, seq_len)
                K_block = K[:, kv_start:kv_end, :]
                V_block = V[:, kv_start:kv_end, :]

                # Attention scores for this tile
                S_block = np.matmul(Q_block, K_block.transpose(0, 2, 1)) / np.sqrt(self.d_k)

                # Online softmax update
                m_new = np.maximum(m[:, q_start:q_end, :], np.max(S_block, axis=2, keepdims=True))
                l_new = (
                    np.exp(m[:, q_start:q_end, :] - m_new) * l[:, q_start:q_end, :] +
                    np.sum(np.exp(S_block - m_new), axis=2, keepdims=True)
                )

                # Update the output; the previous (normalized) output must be
                # rescaled by its old normalizer l before re-normalizing
                O[:, q_start:q_end, :] = (
                    np.exp(m[:, q_start:q_end, :] - m_new) * l[:, q_start:q_end, :] *
                    O[:, q_start:q_end, :] +
                    np.matmul(np.exp(S_block - m_new), V_block)
                ) / l_new

                # Update the running statistics
                m[:, q_start:q_end, :] = m_new
                l[:, q_start:q_end, :] = l_new

        return O
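
# Sanity check against naive attention (illustrative): the online-softmax
# bookkeeping should reproduce standard attention up to float rounding.
Q_t = np.random.randn(1, 128, 32)
K_t = np.random.randn(1, 128, 32)
V_t = np.random.randn(1, 128, 32)
flash_out = FlashAttention(d_k=32, block_size=32).forward(Q_t, K_t, V_t)
s = np.matmul(Q_t, K_t.transpose(0, 2, 1)) / np.sqrt(32)
w = np.exp(s - s.max(axis=-1, keepdims=True))
naive_out = np.matmul(w / w.sum(axis=-1, keepdims=True), V_t)
print("max deviation from naive attention:", np.abs(flash_out - naive_out).max())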

# 4. Model parallelism
class TensorParallel:
    """
    Tensor parallelism: split individual weight matrices across GPUs.

    Use case: the model is too large for a single GPU.
    """
    def __init__(self, weight, num_gpus=2):
        self.num_gpus = num_gpus

        # Split the weight matrix by columns
        # W: (d_in, d_out) -> [(d_in, d_out/num_gpus), ...]
        self.weight_shards = np.split(weight, num_gpus, axis=1)

    def forward(self, X):
        """
        Parallel computation:

        1. Broadcast the input X to every GPU
        2. Each GPU computes its slice of the output
        3. All-gather the results
        """
        # Simulate the parallel computation
        outputs = []
        for shard in self.weight_shards:
            output_shard = np.matmul(X, shard)
            outputs.append(output_shard)

        # Concatenate the shard outputs
        return np.concatenate(outputs, axis=-1)

class PipelineParallel:
    """
    Pipeline parallelism: assign groups of layers to different GPUs.

    Use case: the model is too deep; reduces per-GPU memory usage.
    """
    def __init__(self, layers, num_stages=4):
        self.num_stages = num_stages

        # Split the layers into num_stages stages
        layers_per_stage = len(layers) // num_stages
        self.stages = [
            layers[i*layers_per_stage:(i+1)*layers_per_stage]
            for i in range(num_stages)
        ]

    def forward(self, X, micro_batches=4):
        """
        Pipelined execution:

        1. Split the batch into micro-batches
        2. Run the stages in a pipelined fashion
        3. Reduces GPU idle time ("bubbles")
        """
        # Simplified: run the stages sequentially
        for stage_layers in self.stages:
            for layer in stage_layers:
                X = layer.forward(X)

        return X
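
# Equivalence check for the column-sharded matmul (illustrative): gathering
# the shard outputs must reproduce the full-matrix product.
W_full = np.random.randn(16, 8)
X_in = np.random.randn(4, 16)
tp = TensorParallel(W_full, num_gpus=2)
assert np.allclose(tp.forward(X_in), X_in @ W_full)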

# 5. Speculative decoding
class SpeculativeDecoding:
    """
    Speculative decoding: a small model drafts, a large model verifies.

    Speedup: roughly 2-3x
    Idea: the small model quickly drafts several tokens; the large model
    verifies them in a single batched forward pass.
    """
    def __init__(self, large_model, small_model, k=5):
        """
        large_model: the large (accurate) model
        small_model: the small (fast) draft model
        k: draft length
        """
        self.large_model = large_model
        self.small_model = small_model
        self.k = k

    def generate(self, input_ids, max_new_tokens=50):
        """Generate with speculative decoding (simplified greedy verification)"""
        generated = input_ids.copy()

        while generated.shape[1] - input_ids.shape[1] < max_new_tokens:
            # 1. The small model quickly drafts k tokens
            draft = self.small_model.generate(
                generated,
                max_new_tokens=self.k,
                method='greedy'
            )

            # 2. The large model verifies the draft in one forward pass
            large_logits = self.large_model.forward(draft, training=False)

            # 3. Verify the drafted tokens one by one
            verified_tokens = []
            for i in range(self.k):
                pos = generated.shape[1] + i

                # Large model's prediction
                large_pred = np.argmax(large_logits[:, pos-1, :], axis=-1)

                # Small model's prediction
                small_pred = draft[:, pos]

                if large_pred == small_pred:
                    # Accept
                    verified_tokens.append(small_pred)
                else:
                    # Reject: take the large model's token and stop verifying
                    verified_tokens.append(large_pred)
                    break

            # Append the accepted tokens
            if verified_tokens:
                generated = np.concatenate([
                    generated,
                    np.array(verified_tokens).reshape(1, -1)
                ], axis=1)
            else:
                break

        return generated
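
# Illustrative wiring (assumes the gpt model, prompt, and vocab_size from the
# GPT usage example above; the draft model's size is an arbitrary choice):
draft_gpt = GPTModel(vocab_size, d_model=128, num_layers=2, num_heads=4, d_ff=256)
spec = SpeculativeDecoding(large_model=gpt, small_model=draft_gpt, k=5)
output_spec = spec.generate(prompt, max_new_tokens=10)
print("Speculative decoding:", output_spec)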

IV. RAG Systems

7. RAG Architecture Design

Question: Design and implement a production-grade RAG system.

RAG implementation:

import numpy as np
from typing import List, Dict, Tuple

class VectorStore:
    """
    In-memory vector store.

    Features:
    1. Store document vectors
    2. Similarity search
    3. Hybrid retrieval (vector + keyword)
    """
    def __init__(self, embedding_dim=768):
        self.embedding_dim = embedding_dim
        self.vectors = []  # stored embedding arrays
        self.documents = []  # stored document texts
        self.metadata = []  # stored metadata

    def add_documents(self, documents: List[str], embeddings: np.ndarray,
                     metadata: List[Dict] = None):
        """
        Add documents.

        documents: list of document texts
        embeddings: document vectors, shape (n_docs, embedding_dim)
        metadata: list of metadata dicts
        """
        self.documents.extend(documents)
        self.vectors.append(embeddings)

        if metadata is None:
            metadata = [{}] * len(documents)
        self.metadata.extend(metadata)

    def cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> np.ndarray:
        """Cosine similarity between row vectors"""
        # Normalize
        a_norm = a / (np.linalg.norm(a, axis=1, keepdims=True) + 1e-10)
        b_norm = b / (np.linalg.norm(b, axis=1, keepdims=True) + 1e-10)

        # Similarity matrix
        return np.matmul(a_norm, b_norm.T)

    def search(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Tuple]:
        """
        Vector similarity search.

        Returns: [(document, score, metadata), ...]
        """
        if not self.vectors:
            return []

        # Merge all stored vectors
        all_vectors = np.concatenate(self.vectors, axis=0)

        # Similarities to the query
        similarities = self.cosine_similarity(
            query_embedding.reshape(1, -1),
            all_vectors
        )[0]

        # Take the top-k
        top_k_indices = np.argsort(similarities)[-top_k:][::-1]

        results = [
            (self.documents[i], similarities[i], self.metadata[i])
            for i in top_k_indices
        ]

        return results

    def hybrid_search(self, query_embedding: np.ndarray, query_text: str,
                     top_k: int = 5, alpha: float = 0.7) -> List[Tuple]:
        """
        Hybrid retrieval: vector search + BM25.

        alpha: weight of the vector score (0-1)
        """
        # Vector-similarity scores
        vector_scores = self.cosine_similarity(
            query_embedding.reshape(1, -1),
            np.concatenate(self.vectors, axis=0)
        )[0]

        # BM25 scores (simplified implementation)
        bm25_scores = self._bm25_score(query_text)

        # Min-max normalize both score sets
        vector_scores = (vector_scores - vector_scores.min()) / (
            vector_scores.max() - vector_scores.min() + 1e-10
        )
        bm25_scores = (bm25_scores - bm25_scores.min()) / (
            bm25_scores.max() - bm25_scores.min() + 1e-10
        )

        # Weighted fusion
        final_scores = alpha * vector_scores + (1 - alpha) * bm25_scores

        # Take the top-k
        top_k_indices = np.argsort(final_scores)[-top_k:][::-1]

        results = [
            (self.documents[i], final_scores[i], self.metadata[i])
            for i in top_k_indices
        ]

        return results

    def _bm25_score(self, query: str) -> np.ndarray:
        """
        Keyword scoring in the spirit of BM25.

        Heavily simplified (plain term frequency, without IDF or length
        saturation); a real system should use a proper BM25 library.
        """
        query_terms = query.lower().split()
        scores = np.zeros(len(self.documents))

        for i, doc in enumerate(self.documents):
            doc_terms = doc.lower().split()
            doc_len = len(doc_terms)

            for term in query_terms:
                tf = doc_terms.count(term) / (doc_len + 1e-10)
                scores[i] += tf

        return scores

class Reranker:
    """
    Reranker.

    Purpose: precisely re-rank the first-stage retrieval results.
    Methods:
    1. Cross-encoder (accurate but slow)
    2. Rule-based re-ranking
    """
    def __init__(self, model=None):
        self.model = model

    def rerank(self, query: str, documents: List[Tuple],
               top_k: int = 3) -> List[Tuple]:
        """
        Re-rank retrieval results.

        documents: [(document, score, metadata), ...]
        """
        if self.model is None:
            # Fallback: sort by the original scores
            return sorted(documents, key=lambda x: x[1], reverse=True)[:top_k]

        # Re-rank with the cross-encoder
        scores = []
        for doc, _, metadata in documents:
            # Query-document relevance score
            score = self._cross_encoder_score(query, doc)
            scores.append(score)

        # Sort by the new scores
        reranked = [
            (doc, score, metadata)
            for (doc, _, metadata), score in zip(documents, scores)
        ]
        reranked = sorted(reranked, key=lambda x: x[1], reverse=True)

        return reranked[:top_k]

    def _cross_encoder_score(self, query: str, document: str) -> float:
        """Cross-encoder scoring (simplified)"""
        # A real system would use a trained cross-encoder;
        # term overlap stands in for it here
        query_terms = set(query.lower().split())
        doc_terms = set(document.lower().split())

        if not query_terms:
            return 0.0

        overlap = len(query_terms & doc_terms)
        score = overlap / len(query_terms)

        return score

class RAGPipeline:
    """
    End-to-end RAG pipeline.

    Steps:
    1. Chunk the documents
    2. Embed the chunks
    3. Store them in the vector store
    4. Retrieve
    5. Re-rank
    6. Generate the answer
    """
    def __init__(self, embedding_model, llm, chunk_size=512, chunk_overlap=50):
        """
        embedding_model: embedding model
        llm: large language model
        chunk_size: chunk size in words
        chunk_overlap: overlap between consecutive chunks
        """
        self.embedding_model = embedding_model
        self.llm = llm
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        self.vector_store = VectorStore()
        self.reranker = Reranker()

    def chunk_text(self, text: str) -> List[str]:
        """
        Split a document into chunks.

        Common strategies (fixed-size chunking is implemented below):
        1. Fixed-size chunks
        2. Sentence-boundary chunks
        3. Semantic chunks
        """
        chunks = []
        words = text.split()

        for i in range(0, len(words), self.chunk_size - self.chunk_overlap):
            chunk = ' '.join(words[i:i + self.chunk_size])
            chunks.append(chunk)

        return chunks

    def add_documents(self, documents: List[str], metadata: List[Dict] = None):
        """Add documents to the knowledge base"""
        all_chunks = []
        all_metadata = []

        for i, doc in enumerate(documents):
            chunks = self.chunk_text(doc)
            all_chunks.extend(chunks)

            # Attach metadata to every chunk
            doc_metadata = metadata[i] if metadata else {}
            chunk_metadata = [
                {**doc_metadata, 'chunk_id': j, 'doc_id': i}
                for j in range(len(chunks))
            ]
            all_metadata.extend(chunk_metadata)

        # Embed
        embeddings = self._embed_texts(all_chunks)

        # Store
        self.vector_store.add_documents(all_chunks, embeddings, all_metadata)

    def retrieve(self, query: str, top_k: int = 5,
                 use_rerank: bool = True) -> List[str]:
        """
        Retrieve relevant documents.

        Returns: list of relevant document chunks
        """
        # Embed the query
        query_embedding = self._embed_texts([query])[0]

        # Retrieve a larger candidate set for the reranker
        results = self.vector_store.search(query_embedding, top_k=top_k * 2)

        # Re-rank
        if use_rerank:
            results = self.reranker.rerank(query, results, top_k=top_k)

        # Extract the document texts
        documents = [doc for doc, _, _ in results[:top_k]]

        return documents

    def generate(self, query: str, top_k: int = 3) -> str:
        """
        RAG generation:

        1. Retrieve relevant documents
        2. Build the prompt
        3. Generate the answer
        """
        # Retrieve
        relevant_docs = self.retrieve(query, top_k=top_k)

        # Build the prompt
        context = '\n\n'.join([f"Document {i+1}:\n{doc}"
                               for i, doc in enumerate(relevant_docs)])

        prompt = f"""Answer the question based on the context below.

Context:
{context}

Question: {query}

Answer:"""

        # Generate the answer
        answer = self._generate_answer(prompt)

        return answer

    def _embed_texts(self, texts: List[str]) -> np.ndarray:
        """Embed texts with the injected embedding model"""
        # A real system would use a trained embedding model such as
        # sentence-transformers; SimpleEmbeddingModel below returns random
        # vectors purely for demonstration
        return self.embedding_model.embed(texts)

    def _generate_answer(self, prompt: str) -> str:
        """Generate the answer with the injected LLM"""
        # A real system would call an actual LLM here
        return self.llm.generate(prompt)

# Usage example
class SimpleEmbeddingModel:
    """Toy embedding model (example only)"""
    def embed(self, texts):
        return np.random.randn(len(texts), 768)

class SimpleLLM:
    """Toy LLM (example only)"""
    def generate(self, prompt):
        return "Generated answer"

# Build the RAG system
rag = RAGPipeline(
    embedding_model=SimpleEmbeddingModel(),
    llm=SimpleLLM(),
    chunk_size=512,
    chunk_overlap=50
)

# Add documents
documents = [
    "Transformers are a type of neural network architecture...",
    "GPT is a generative pre-trained transformer model...",
    "RAG combines retrieval with generation..."
]

metadata = [
    {'source': 'doc1.pdf', 'page': 1},
    {'source': 'doc2.pdf', 'page': 5},
    {'source': 'doc3.pdf', 'page': 10}
]

rag.add_documents(documents, metadata)

# Query
query = "What is a transformer?"
answer = rag.generate(query, top_k=3)
print(f"Query: {query}")
print(f"Answer: {answer}")