05-Auto-Scheduling and Code Generation

Overview

Auto-scheduling and code generation are core techniques in deep learning compilers: they turn a high-level description of a computation into efficient low-level code. This chapter takes a deep look at the search algorithms and cost models behind auto-scheduling, as well as code-generation techniques for a range of backends.

The Auto-Scheduling Pipeline

┌──────────────────────────────────────────────────────────────────────┐
│                       Auto-Scheduling Pipeline                       │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  Compute definition (what to compute)                                │
│      │                                                               │
│      ▼                                                               │
│  ┌──────────────────────────────────────────────────────────────┐    │
│  │                        Schedule Space                        │    │
│  │   ├─ Loop transforms: tile, split, reorder, fuse             │    │
│  │   ├─ Parallelization: parallel, vectorize, unroll            │    │
│  │   ├─ Memory: cache_read, cache_write, compute_at             │    │
│  │   └─ Hardware mapping: bind (threadIdx, blockIdx)            │    │
│  └──────────────────────────────────────────────────────────────┘    │
│      │                                                               │
│      ▼                                                               │
│  ┌──────────────────────────────────────────────────────────────┐    │
│  │                       Search Algorithm                       │    │
│  │   ├─ Random search                                           │    │
│  │   ├─ Simulated annealing                                     │    │
│  │   ├─ Genetic algorithms                                      │    │
│  │   ├─ Bayesian optimization                                   │    │
│  │   └─ Reinforcement learning                                  │    │
│  └──────────────────────────────────────────────────────────────┘    │
│      │                                                               │
│      ▼                                                               │
│  ┌──────────────────────────────────────────────────────────────┐    │
│  │                          Cost Model                          │    │
│  │   ├─ Hardware measurement                                    │    │
│  │   └─ ML-based prediction                                     │    │
│  └──────────────────────────────────────────────────────────────┘    │
│      │                                                               │
│      ▼                                                               │
│  ┌──────────────────────────────────────────────────────────────┐    │
│  │                       Code Generation                        │    │
│  │   ├─ CUDA/PTX                                                │    │
│  │   ├─ LLVM IR → CPU                                           │    │
│  │   └─ Other backends                                          │    │
│  └──────────────────────────────────────────────────────────────┘    │
│      │                                                               │
│      ▼                                                               │
│  Optimized executable code                                           │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘

1. Schedule Primitives

1.1 Loop Transformation Primitives

# Schedule primitive examples, written in TVM's tensor-expression (te) API

import tvm
from tvm import te

# Define the computation: C = A @ B
M, N, K = 1024, 1024, 1024
A = te.placeholder((M, K), name='A')
B = te.placeholder((K, N), name='B')
k = te.reduce_axis((0, K), name='k')
C = te.compute((M, N),
               lambda i, j: te.sum(A[i, k] * B[k, j], axis=k),
               name='C')

s = te.create_schedule(C.op)

# NOTE: each primitive below is illustrated independently. Applying all of
# them to the same schedule object would conflict (an axis can only be split
# once, a fused axis can no longer be bound separately, etc.). Re-create the
# schedule with te.create_schedule(C.op) before trying each one.

# 1. split: break one loop into two nested loops
# for i in range(M):  →  for i_outer in range(M // factor):
#                            for i_inner in range(factor):
i, j = s[C].op.axis
i_outer, i_inner = s[C].split(i, factor=32)
j_outer, j_inner = s[C].split(j, factor=32)


# 2. reorder: change the loop nesting order
# original:  for i: for j: for k:
# reordered: for i_outer: for j_outer: for k: for i_inner: for j_inner:
k_axis = s[C].op.reduce_axis[0]
s[C].reorder(i_outer, j_outer, k_axis, i_inner, j_inner)


# 3. tile: a combination of split + reorder
# (TVM's tile returns the axes in outer, outer, inner, inner order)
i, j = s[C].op.axis
i_outer, j_outer, i_inner, j_inner = s[C].tile(i, j, x_factor=32, y_factor=32)


# 4. fuse: merge two loops into one
# for i: for j:  →  for fused in range(i * j):
fused = s[C].fuse(i_outer, j_outer)


# 5. unroll: unroll a loop
s[C].unroll(i_inner)


# 6. vectorize: vectorize a loop
s[C].vectorize(j_inner)


# 7. parallel: parallelize a loop across CPU threads
s[C].parallel(i_outer)


# GPU scheduling
# 8. bind: bind loops to GPU blocks / threads
block_x = te.thread_axis("blockIdx.x")
thread_x = te.thread_axis("threadIdx.x")
s[C].bind(i_outer, block_x)
s[C].bind(i_inner, thread_x)


# 9. compute_at: anchor a producer's computation at a specific loop
# level of its consumer
A_local = s.cache_read(A, "local", [C])
s[A_local].compute_at(s[C], k_axis)


# 10. cache_read / cache_write: stage data through faster memory
A_shared = s.cache_read(A, "shared", [C])
C_local = s.cache_write(C, "local")
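
To see the loop nest a schedule produces, it can be lowered to TIR and printed. A minimal sketch, assuming a TVM version that still ships the te.create_schedule API (it uses a fresh schedule with only tile applied, per the note above):

# Build a fresh schedule, apply one primitive, and inspect the result
s = te.create_schedule(C.op)
i, j = s[C].op.axis
i_outer, j_outer, i_inner, j_inner = s[C].tile(i, j, x_factor=32, y_factor=32)
print(tvm.lower(s, [A, B, C], simple_mode=True))  # prints the transformed TIR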

1.2 Representing the Schedule Space

# A formalized representation of the schedule space

class ScheduleSpace:
    """
    Defines the space of legal schedules for a computation.
    """

    def __init__(self, compute):
        self.compute = compute
        self.transforms = []

    def enumerate_transforms(self):
        """
        Enumerate all applicable transforms. (SplitTransform,
        ReorderTransform, etc. are assumed to be defined elsewhere.)
        """
        transforms = []

        # Split transforms
        for axis in self.compute.axes:
            for factor in [2, 4, 8, 16, 32, 64, 128, 256]:
                if axis.length % factor == 0:
                    transforms.append(SplitTransform(axis, factor))

        # Reorder transforms
        from itertools import permutations
        for perm in permutations(self.compute.axes):
            transforms.append(ReorderTransform(perm))

        # Fuse transforms
        for i in range(len(self.compute.axes) - 1):
            transforms.append(FuseTransform(
                self.compute.axes[i],
                self.compute.axes[i + 1]
            ))

        # Parallelization / vectorization
        for axis in self.compute.axes:
            transforms.append(ParallelTransform(axis))
            transforms.append(VectorizeTransform(axis))

        return transforms


class ScheduleConfig:
    """
    One concrete schedule configuration.
    """

    def __init__(self):
        self.tile_sizes = {}       # tile size per axis
        self.loop_order = []       # loop nesting order
        self.parallel_axes = []    # axes to parallelize
        self.vectorize_axes = []   # axes to vectorize
        self.unroll_factors = {}   # unroll factor per axis
        self.cache_locations = {}  # cache staging locations

    def apply(self, schedule):
        """
        Apply this configuration to a schedule.
        """
        # Apply tiling
        for axis, sizes in self.tile_sizes.items():
            schedule.split(axis, sizes)

        # Apply loop reordering
        schedule.reorder(*self.loop_order)

        # Apply parallelization
        for axis in self.parallel_axes:
            schedule.parallel(axis)

        # Apply vectorization
        for axis in self.vectorize_axes:
            schedule.vectorize(axis)

        return schedule


# Schedule templates
class ScheduleTemplate:
    """
    A schedule template: a high-level scheduling strategy
    with tunable knobs.
    """

    def __init__(self, name):
        self.name = name
        self.knobs = {}  # tunable parameters

    def add_knob(self, name, values):
        self.knobs[name] = values

    def instantiate(self, config):
        """
        Instantiate the template with a concrete configuration.
        """
        raise NotImplementedError


class MatMulTemplate(ScheduleTemplate):
    """
    A schedule template for matrix multiplication.
    """

    def __init__(self, M, N, K):
        super().__init__("matmul")
        self.M = M
        self.N = N
        self.K = K

        # Tunable knobs
        self.add_knob('tile_m', [32, 64, 128, 256])
        self.add_knob('tile_n', [32, 64, 128, 256])
        self.add_knob('tile_k', [8, 16, 32])
        self.add_knob('num_threads', [128, 256, 512, 1024])

    def instantiate(self, config):
        """
        Produce a concrete schedule from a knob configuration.
        """
        schedule = ScheduleConfig()

        # Tiling
        schedule.tile_sizes['i'] = config['tile_m']
        schedule.tile_sizes['j'] = config['tile_n']
        schedule.tile_sizes['k'] = config['tile_k']

        # Loop order
        # outer: i_outer, j_outer
        # inner: k, i_inner, j_inner
        schedule.loop_order = [
            'i_outer', 'j_outer', 'k', 'i_inner', 'j_inner'
        ]

        # Axes to parallelize (bound to GPU blocks on a GPU backend)
        schedule.parallel_axes = ['i_outer', 'j_outer']

        return schedule
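
A short usage sketch of the template above; the knob values are arbitrary picks from the candidate lists declared in MatMulTemplate:

# Instantiate the matmul template with one concrete knob assignment
template = MatMulTemplate(M=1024, N=1024, K=1024)
config = {'tile_m': 64, 'tile_n': 128, 'tile_k': 16, 'num_threads': 256}
schedule = template.instantiate(config)
print(schedule.tile_sizes)   # {'i': 64, 'j': 128, 'k': 16}
print(schedule.loop_order)   # ['i_outer', 'j_outer', 'k', 'i_inner', 'j_inner']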

2. Search Algorithms

2.1 Random Search and Simulated Annealing

import random
import math

class RandomSearcher:
    """
    Random search.
    """

    def __init__(self, space, measure_func):
        self.space = space
        self.measure_func = measure_func

    def search(self, n_trials):
        best_config = None
        best_cost = float('inf')

        for _ in range(n_trials):
            config = self.space.sample()
            cost = self.measure_func(config)

            if cost < best_cost:
                best_cost = cost
                best_config = config

        return best_config, best_cost


class SimulatedAnnealing:
    """
    Simulated-annealing search.
    """

    def __init__(self, space, measure_func,
                 initial_temp=100, cooling_rate=0.95):
        self.space = space
        self.measure_func = measure_func
        self.initial_temp = initial_temp
        self.cooling_rate = cooling_rate

    def search(self, n_trials):
        # Initial solution
        current = self.space.sample()
        current_cost = self.measure_func(current)

        best = current
        best_cost = current_cost

        temp = self.initial_temp

        for _ in range(n_trials):
            # Generate a neighboring solution
            neighbor = self._get_neighbor(current)
            neighbor_cost = self.measure_func(neighbor)

            # Acceptance probability
            delta = neighbor_cost - current_cost

            if delta < 0:
                # Better solution: always accept
                accept = True
            else:
                # Accept a worse solution with some probability
                accept = random.random() < math.exp(-delta / temp)

            if accept:
                current = neighbor
                current_cost = neighbor_cost

                if current_cost < best_cost:
                    best = current
                    best_cost = current_cost

            # Cool down
            temp *= self.cooling_rate

        return best, best_cost

    def _get_neighbor(self, config):
        """
        Generate a neighbor by mutating one randomly chosen parameter.
        """
        neighbor = config.copy()

        param = random.choice(list(self.space.params.keys()))
        values = self.space.params[param]
        neighbor[param] = random.choice(values)

        return neighbor
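
The searchers above only assume a space object with a sample() method and a params dict. The DictSpace class and fake_measure function below are hypothetical stand-ins for a real schedule space and on-device measurement, just enough to run the searchers end to end:

class DictSpace:
    """Toy search space: parameter name -> list of candidate values."""

    def __init__(self, params):
        self.params = params

    def sample(self):
        return {k: random.choice(v) for k, v in self.params.items()}


def fake_measure(config):
    # Stand-in for a real hardware measurement
    return abs(config['tile_m'] - 64) + abs(config['tile_n'] - 128)


space = DictSpace({'tile_m': [32, 64, 128, 256], 'tile_n': [32, 64, 128, 256]})
best_config, best_cost = SimulatedAnnealing(space, fake_measure).search(n_trials=100)
print(best_config, best_cost)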

2.2 Genetic Algorithms

class GeneticAlgorithm:
    """
    Genetic-algorithm search.
    """

    def __init__(self, space, measure_func,
                 population_size=50,
                 mutation_rate=0.1,
                 crossover_rate=0.8):
        self.space = space
        self.measure_func = measure_func
        self.population_size = population_size
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate

    def search(self, n_generations):
        # Initialize the population
        population = [self.space.sample() for _ in range(self.population_size)]

        for gen in range(n_generations):
            # Evaluate fitness
            fitness = [(config, self.measure_func(config)) for config in population]
            fitness.sort(key=lambda x: x[1])  # sort by measured cost, ascending

            # Keep the elite
            elite_size = self.population_size // 5
            new_population = [f[0] for f in fitness[:elite_size]]

            # Selection, crossover, and mutation produce the offspring
            while len(new_population) < self.population_size:
                # Parent selection (tournament)
                parent1 = self._tournament_select(fitness)
                parent2 = self._tournament_select(fitness)

                # Crossover
                if random.random() < self.crossover_rate:
                    child = self._crossover(parent1, parent2)
                else:
                    child = parent1.copy()

                # Mutation
                if random.random() < self.mutation_rate:
                    child = self._mutate(child)

                new_population.append(child)

            population = new_population

            # Progress report
            best = fitness[0]
            print(f"Generation {gen}: best cost = {best[1]:.4f}")

        # Best individual from the last evaluated generation
        return fitness[0]

    def _tournament_select(self, fitness, k=3):
        """
        Tournament selection.
        """
        candidates = random.sample(fitness, k)
        winner = min(candidates, key=lambda x: x[1])
        return winner[0]

    def _crossover(self, parent1, parent2):
        """
        Single-point crossover.
        """
        child = {}
        params = list(parent1.keys())
        crossover_point = random.randint(0, len(params))

        for i, param in enumerate(params):
            if i < crossover_point:
                child[param] = parent1[param]
            else:
                child[param] = parent2[param]

        return child

    def _mutate(self, config):
        """
        Random mutation of one parameter.
        """
        config = config.copy()
        param = random.choice(list(self.space.params.keys()))
        config[param] = random.choice(self.space.params[param])
        return config

2.3 Cost-Model-Guided Search (AutoTVM-style)

import numpy as np
from sklearn.ensemble import GradientBoostingRegressor

class CostModelSearcher:
    """
    Cost-model-guided search, similar in spirit to AutoTVM.
    """

    def __init__(self, space, measure_func):
        self.space = space
        self.measure_func = measure_func
        self.cost_model = None
        self.history = []  # (config, cost) pairs

    def search(self, n_trials, n_parallel=1):
        for trial in range(n_trials):
            # Pick candidates, using the cost model once it is trained
            if self.cost_model is not None and trial >= 20:
                candidates = self._model_based_sampling(n_candidates=1000)
            else:
                candidates = [self.space.sample() for _ in range(n_parallel)]

            # Measure on real hardware
            for config in candidates:
                cost = self.measure_func(config)
                self.history.append((config, cost))

            # Periodically retrain the cost model
            if trial % 10 == 0 and len(self.history) >= 20:
                self._update_cost_model()

        # Return the best configuration found
        best = min(self.history, key=lambda x: x[1])
        return best

    def _model_based_sampling(self, n_candidates):
        """
        Sample candidates and rank them with the cost model.
        """
        # Generate many candidates
        candidates = [self.space.sample() for _ in range(n_candidates)]

        # Predict the cost of each one
        features = [self._encode_config(c) for c in candidates]
        predictions = self.cost_model.predict(features)

        # Keep the candidate predicted to be best
        best_idx = np.argmin(predictions)
        return [candidates[best_idx]]

    def _update_cost_model(self):
        """
        Train / update the cost model.
        """
        # Prepare the training data
        X = [self._encode_config(config) for config, _ in self.history]
        y = [cost for _, cost in self.history]

        # Fit gradient-boosted trees (AutoTVM uses XGBoost; sklearn's
        # GradientBoostingRegressor serves the same role here)
        self.cost_model = GradientBoostingRegressor(
            n_estimators=100,
            max_depth=5,
            learning_rate=0.1
        )
        self.cost_model.fit(X, y)

    def _encode_config(self, config):
        """
        Encode a configuration as a feature vector.
        """
        features = []
        for param_name, values in self.space.params.items():
            value = config[param_name]
            # One-hot encoding
            one_hot = [1 if v == value else 0 for v in values]
            features.extend(one_hot)
        return features
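
For intuition, _encode_config concatenates one indicator vector per parameter. A worked example, assuming a space whose params are {'tile_m': [32, 64, 128], 'tile_k': [8, 16]}:

# config = {'tile_m': 64, 'tile_k': 8}
# 'tile_m' = 64 over [32, 64, 128]  ->  [0, 1, 0]
# 'tile_k' = 8  over [8, 16]        ->  [1, 0]
# concatenated feature vector       ->  [0, 1, 0, 1, 0]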

2.4 Reinforcement-Learning Search

import torch
import torch.nn as nn
import torch.optim as optim

class RLScheduleSearcher:
    """
    Reinforcement-learning-based schedule search (REINFORCE-style
    policy gradient). Environment-specific helpers (_get_state_size,
    _get_action_size, _get_initial_state, _step, _state_to_config,
    _greedy_sample, _max_steps) are omitted here.
    """

    def __init__(self, space, measure_func):
        self.space = space
        self.measure_func = measure_func

        # Policy network
        self.policy = PolicyNetwork(
            input_size=self._get_state_size(),
            output_size=self._get_action_size()
        )
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-3)

    def search(self, n_episodes):
        best_config = None
        best_cost = float('inf')

        for episode in range(n_episodes):
            # Collect one episode of experience
            states, actions, rewards = self._run_episode()

            # Compute discounted returns
            returns = self._compute_returns(rewards)

            # Policy-gradient update
            loss = self._policy_gradient_update(states, actions, returns)

            # Evaluate the current greedy policy
            config = self._greedy_sample()
            cost = self.measure_func(config)

            if cost < best_cost:
                best_cost = cost
                best_config = config

            print(f"Episode {episode}: loss={loss:.4f}, best_cost={best_cost:.4f}")

        return best_config, best_cost

    def _run_episode(self):
        """
        Run one episode: build a schedule step by step, then
        measure it once the schedule is complete.
        """
        states = []
        actions = []
        rewards = []

        state = self._get_initial_state()

        for step in range(self._max_steps):
            states.append(state)

            # Sample an action from the policy
            action = self._sample_action(state)
            actions.append(action)

            # Apply the action; get the next state
            state, done = self._step(state, action)

            # When the schedule is complete, measure its performance
            if done:
                config = self._state_to_config(state)
                cost = self.measure_func(config)
                reward = -cost  # negative cost as the reward
                rewards.append(reward)
                break
            else:
                rewards.append(0)

        return states, actions, rewards

    def _sample_action(self, state):
        """
        Sample an action from the policy distribution.
        """
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        probs = self.policy(state_tensor)
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        return action.item()

    def _compute_returns(self, rewards, gamma=0.99):
        """
        Compute discounted returns.
        """
        returns = []
        R = 0
        for r in reversed(rewards):
            R = r + gamma * R
            returns.insert(0, R)
        return returns

    def _policy_gradient_update(self, states, actions, returns):
        """
        REINFORCE policy-gradient update.
        """
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)
        returns = torch.FloatTensor(returns)

        # Normalize the returns
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)

        # Policy loss
        probs = self.policy(states)
        dist = torch.distributions.Categorical(probs)
        log_probs = dist.log_prob(actions)
        loss = -(log_probs * returns).mean()

        # Update
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()


class PolicyNetwork(nn.Module):
    """
    Policy network: maps a state encoding to a distribution
    over scheduling actions.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, output_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.softmax(self.fc3(x), dim=-1)
        return x

3. Code Generation

3.1 CUDA Code Generation

class CUDACodeGenerator:
    """
    CUDA code generator. Helper methods such as _generate_declarations
    and _generate_loop_body are assumed to exist and are omitted here.
    """

    def __init__(self):
        self.code = []
        self.indent_level = 0

    def emit(self, line):
        self.code.append("  " * self.indent_level + line)

    def generate(self, schedule, compute):
        """
        Generate CUDA code.
        """
        # Kernel signature
        self._generate_kernel_signature(compute)

        # Variable declarations
        self._generate_declarations(schedule, compute)

        # Thread-index computation
        self._generate_thread_index(schedule)

        # Main compute loop
        self._generate_compute_loop(schedule, compute)

        return "\n".join(self.code)

    def _generate_kernel_signature(self, compute):
        """
        Emit the kernel function signature.
        """
        params = []
        for input_tensor in compute.inputs:
            params.append(f"float* __restrict__ {input_tensor.name}")
        params.append(f"float* __restrict__ {compute.output.name}")

        self.emit(f"__global__ void kernel({', '.join(params)}) {{")
        self.indent_level += 1

    def _generate_thread_index(self, schedule):
        """
        Emit thread-index computation from the axis bindings.
        """
        for axis, thread_type in schedule.bindings.items():
            if thread_type == 'blockIdx.x':
                self.emit(f"int {axis} = blockIdx.x;")
            elif thread_type == 'threadIdx.x':
                self.emit(f"int {axis} = threadIdx.x;")
            elif thread_type == 'blockIdx.y':
                self.emit(f"int {axis} = blockIdx.y;")
            elif thread_type == 'threadIdx.y':
                self.emit(f"int {axis} = threadIdx.y;")

    def _generate_compute_loop(self, schedule, compute):
        """
        Emit the loop nest according to the schedule.
        """
        for loop in schedule.loops:
            if loop.parallel_type:
                # Parallel loops are already handled via thread indices
                continue

            if loop.unroll_factor:
                self.emit(f"#pragma unroll {loop.unroll_factor}")

            if loop.vectorize:
                self._generate_vectorized_loop(loop, compute)
            else:
                self._generate_scalar_loop(loop, compute)

    def _generate_scalar_loop(self, loop, compute):
        """
        Emit a scalar loop.
        """
        self.emit(f"for (int {loop.var} = {loop.start}; {loop.var} < {loop.end}; {loop.var} += {loop.step}) {{")
        self.indent_level += 1

        # Loop body
        self._generate_loop_body(compute)

        self.indent_level -= 1
        self.emit("}")

    def _generate_vectorized_loop(self, loop, compute):
        """
        Emit a vectorized loop (compute() stands in for the actual expression).
        """
        vec_width = 4  # float4
        self.emit(f"for (int {loop.var} = {loop.start}; {loop.var} < {loop.end}; {loop.var} += {vec_width}) {{")
        self.indent_level += 1

        # Load and store through float4
        self.emit(f"float4 val = *reinterpret_cast<float4*>(&input[{loop.var}]);")
        self.emit(f"val.x = compute(val.x);")
        self.emit(f"val.y = compute(val.y);")
        self.emit(f"val.z = compute(val.z);")
        self.emit(f"val.w = compute(val.w);")
        self.emit(f"*reinterpret_cast<float4*>(&output[{loop.var}]) = val;")

        self.indent_level -= 1
        self.emit("}")


# Example of generated CUDA code (BLOCK_SIZE is assumed to be defined, e.g. via #define)
GENERATED_CUDA_EXAMPLE = """
__global__ void matmul_kernel(
    float* __restrict__ A,
    float* __restrict__ B,
    float* __restrict__ C,
    int M, int N, int K
) {
    // Thread and block indices
    int bx = blockIdx.x;
    int by = blockIdx.y;
    int tx = threadIdx.x;
    int ty = threadIdx.y;

    // Shared memory
    __shared__ float As[BLOCK_SIZE][BLOCK_SIZE];
    __shared__ float Bs[BLOCK_SIZE][BLOCK_SIZE];

    // Global indices
    int row = by * BLOCK_SIZE + ty;
    int col = bx * BLOCK_SIZE + tx;

    float sum = 0.0f;

    // Tiled computation
    for (int k = 0; k < (K + BLOCK_SIZE - 1) / BLOCK_SIZE; k++) {
        // Cooperative load into shared memory
        if (row < M && k * BLOCK_SIZE + tx < K)
            As[ty][tx] = A[row * K + k * BLOCK_SIZE + tx];
        else
            As[ty][tx] = 0.0f;

        if (col < N && k * BLOCK_SIZE + ty < K)
            Bs[ty][tx] = B[(k * BLOCK_SIZE + ty) * N + col];
        else
            Bs[ty][tx] = 0.0f;

        __syncthreads();

        // Compute
        #pragma unroll
        for (int i = 0; i < BLOCK_SIZE; i++) {
            sum += As[ty][i] * Bs[i][tx];
        }

        __syncthreads();
    }

    // Write back
    if (row < M && col < N) {
        C[row * N + col] = sum;
    }
}
"""

3.2 LLVM IR Code Generation

class LLVMCodeGenerator:
    """
    LLVM IR code generator for the CPU backend (built on llvmlite).
    """

    def __init__(self):
        import llvmlite.ir as ir
        self.module = ir.Module(name="generated")
        self.builder = None

    def generate(self, schedule, compute):
        """
        Generate LLVM IR.
        """
        import llvmlite.ir as ir

        # Create the function type
        func_type = self._create_function_type(compute)
        func = ir.Function(self.module, func_type, name="kernel")

        # Create the entry block
        entry_block = func.append_basic_block(name="entry")
        self.builder = ir.IRBuilder(entry_block)

        # Generate the computation
        self._generate_compute(schedule, compute, func.args)

        # Return
        self.builder.ret_void()

        return str(self.module)

    def _create_function_type(self, compute):
        import llvmlite.ir as ir

        # Input and output pointers
        ptr_type = ir.PointerType(ir.FloatType())
        arg_types = [ptr_type] * (len(compute.inputs) + 1)  # inputs + output

        # Dimension arguments
        arg_types.extend([ir.IntType(64)] * len(compute.shape))

        return ir.FunctionType(ir.VoidType(), arg_types)

    def _generate_compute(self, schedule, compute, args):
        """
        Generate the compute code.
        """
        import llvmlite.ir as ir

        # Unpack the arguments
        input_ptrs = args[:len(compute.inputs)]
        output_ptr = args[len(compute.inputs)]
        dims = args[len(compute.inputs) + 1:]

        # Generate the loops
        for loop in schedule.loops:
            self._generate_loop(loop, compute, input_ptrs, output_ptr, dims)

    def _generate_loop(self, loop, compute, inputs, output, dims):
        """
        Generate one loop.
        """
        import llvmlite.ir as ir

        # Create the loop blocks
        loop_cond = self.builder.append_basic_block(name=f"{loop.var}_cond")
        loop_body = self.builder.append_basic_block(name=f"{loop.var}_body")
        loop_end = self.builder.append_basic_block(name=f"{loop.var}_end")

        # Initialize the loop variable
        loop_var = self.builder.alloca(ir.IntType(64), name=loop.var)
        self.builder.store(ir.Constant(ir.IntType(64), loop.start), loop_var)
        self.builder.branch(loop_cond)

        # Condition check
        self.builder.position_at_end(loop_cond)
        current = self.builder.load(loop_var)
        end_val = ir.Constant(ir.IntType(64), loop.end)
        cond = self.builder.icmp_signed('<', current, end_val)
        self.builder.cbranch(cond, loop_body, loop_end)

        # Loop body
        self.builder.position_at_end(loop_body)

        # Generate the computation in the innermost loop
        if loop.is_innermost:
            self._generate_elementwise_compute(compute, inputs, output, current)

        # Increment
        incremented = self.builder.add(current, ir.Constant(ir.IntType(64), loop.step))
        self.builder.store(incremented, loop_var)
        self.builder.branch(loop_cond)

        # After the loop
        self.builder.position_at_end(loop_end)

    def _generate_elementwise_compute(self, compute, inputs, output, index):
        """
        Generate an elementwise computation.
        """
        # Load the inputs
        input_vals = []
        for inp in inputs:
            ptr = self.builder.gep(inp, [index])
            val = self.builder.load(ptr)
            input_vals.append(val)

        # Compute
        result = self._emit_expression(compute.expression, input_vals)

        # Store the output
        out_ptr = self.builder.gep(output, [index])
        self.builder.store(result, out_ptr)

    def _emit_expression(self, expr, inputs):
        """
        Emit code for one expression.
        """
        if expr.op == 'add':
            return self.builder.fadd(inputs[0], inputs[1])
        elif expr.op == 'mul':
            return self.builder.fmul(inputs[0], inputs[1])
        elif expr.op == 'sin':
            # Call the llvm.sin intrinsic
            import llvmlite.ir as ir
            sin_func = self.module.declare_intrinsic('llvm.sin', [ir.FloatType()])
            return self.builder.call(sin_func, [inputs[0]])
        # ... more operations
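
Once the textual IR is produced, llvmlite's binding layer can verify and JIT-compile it for the host CPU. A minimal sketch using llvmlite's binding API, where generator stands for an LLVMCodeGenerator whose generate() has already been called:

import llvmlite.binding as llvm

llvm.initialize()
llvm.initialize_native_target()
llvm.initialize_native_asmprinter()

# Parse and verify the IR text emitted by LLVMCodeGenerator
mod = llvm.parse_assembly(str(generator.module))
mod.verify()

# JIT-compile and fetch a callable function pointer
target_machine = llvm.Target.from_default_triple().create_target_machine()
engine = llvm.create_mcjit_compiler(mod, target_machine)
engine.finalize_object()
func_ptr = engine.get_function_address("kernel")  # wrap with ctypes.CFUNCTYPE to call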

3.3 Triton Code Generation

class TritonCodeGenerator:
    """
    Triton code generator.
    """

    def __init__(self):
        self.code = []
        self.indent = 0

    def emit(self, line):
        self.code.append("    " * self.indent + line)

    def generate(self, schedule, compute):
        """
        Generate a Triton kernel.
        """
        # Imports
        self.emit("import triton")
        self.emit("import triton.language as tl")
        self.emit("")

        # Kernel
        self._generate_kernel_decorator(schedule)
        self._generate_kernel_function(schedule, compute)

        return "\n".join(self.code)

    def _generate_kernel_decorator(self, schedule):
        """
        Emit the @triton.jit decorator (plus @triton.autotune, if configured).
        """
        if schedule.autotune_configs:
            self.emit("@triton.autotune(")
            self.indent += 1
            self.emit("configs=[")
            self.indent += 1
            for config in schedule.autotune_configs:
                self.emit(f"triton.Config({config}),")
            self.indent -= 1
            self.emit("],")
            self.emit(f"key={schedule.autotune_key},")
            self.indent -= 1
            self.emit(")")

        self.emit("@triton.jit")

    def _generate_kernel_function(self, schedule, compute):
        """
        Emit the kernel function.
        """
        # Function signature
        params = []
        for inp in compute.inputs:
            params.append(f"{inp.name}_ptr")
        params.append("output_ptr")
        params.append("numel")  # element count, used for the bounds mask below

        # constexpr parameters
        for const in schedule.constexpr_params:
            params.append(f"{const}: tl.constexpr")

        self.emit(f"def kernel({', '.join(params)}):")
        self.indent += 1

        # Program ID
        self.emit("pid = tl.program_id(0)")

        # Offset computation
        self._generate_offsets(schedule)

        # Data loads
        self._generate_loads(compute)

        # Computation
        self._generate_compute(compute)

        # Result store
        self._generate_stores(compute)

        self.indent -= 1

    def _generate_offsets(self, schedule):
        """
        Emit offset and bounds-mask computation.
        """
        block_size = schedule.block_size
        self.emit(f"offset = pid * {block_size} + tl.arange(0, {block_size})")
        self.emit("mask = offset < numel")

    def _generate_loads(self, compute):
        """
        Emit masked loads for all inputs.
        """
        for inp in compute.inputs:
            self.emit(f"{inp.name} = tl.load({inp.name}_ptr + offset, mask=mask)")

    def _generate_compute(self, compute):
        """
        Emit the computation from the expression tree.
        """
        expr = compute.expression
        result = self._expr_to_triton(expr)
        self.emit(f"result = {result}")

    def _expr_to_triton(self, expr):
        """
        Translate an expression tree into Triton code.
        """
        if expr.op == 'add':
            return f"({self._expr_to_triton(expr.args[0])} + {self._expr_to_triton(expr.args[1])})"
        elif expr.op == 'mul':
            return f"({self._expr_to_triton(expr.args[0])} * {self._expr_to_triton(expr.args[1])})"
        elif expr.op == 'sin':
            return f"tl.sin({self._expr_to_triton(expr.args[0])})"
        elif expr.op == 'var':
            return expr.name
        # ... more operations

    def _generate_stores(self, compute):
        """
        Emit the masked store of the result.
        """
        self.emit("tl.store(output_ptr + offset, result, mask=mask)")
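
Mirroring GENERATED_CUDA_EXAMPLE above, the output of this generator for an elementwise x * sin(x) computation would look roughly like this (a hand-written illustration, not captured generator output):

GENERATED_TRITON_EXAMPLE = """
import triton
import triton.language as tl

@triton.jit
def kernel(x_ptr, output_ptr, numel, BLOCK_SIZE: tl.constexpr):
    pid = tl.program_id(0)
    offset = pid * BLOCK_SIZE + tl.arange(0, BLOCK_SIZE)
    mask = offset < numel
    x = tl.load(x_ptr + offset, mask=mask)
    result = (x * tl.sin(x))
    tl.store(output_ptr + offset, result, mask=mask)
"""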

4. Frequently Asked Interview Questions

Q1: What is auto-scheduling? How does it differ from manual scheduling?

Key points:

  1. Auto-scheduling: automatically searches for the best loop transformations and hardware mappings
  2. Manual scheduling: an expert hand-writes the scheduling strategy
  3. Differences:
    • Auto-scheduling reduces engineering effort
    • It can find optimizations a human would not think of
    • Search time is long
    • Manual scheduling can exploit domain knowledge

Q2: What search algorithms are commonly used for auto-scheduling?

Key points:

  1. Random search: simple but inefficient
  2. Simulated annealing: some global search capability
  3. Genetic algorithms: well suited to large search spaces
  4. Bayesian optimization: uses a cost model to guide the search
  5. Reinforcement learning: can learn a search policy

Q3: What is a cost model for, and how is it trained?

Key points:

  1. Purpose: predict a configuration's execution time to reduce the number of real measurements
  2. Training data: measured (configuration, time) pairs
  3. Model choices: XGBoost, neural networks, etc.
  4. Feature engineering: encode configuration parameters as feature vectors

Q4: What must CUDA code generation take into account?

Key points:

  1. Thread hierarchy: blockIdx / threadIdx mapping
  2. Memory hierarchy: global memory, shared memory, registers
  3. Synchronization: placement of __syncthreads()
  4. Vectorization: vector types such as float4
  5. Loop unrolling: #pragma unroll

Q5: What are Triton's advantages over writing CUDA directly?

Key points:

  1. Higher abstraction: Python syntax, easier to write
  2. Automatic optimization: memory access and synchronization handled automatically
  3. Portability: supports multiple hardware backends
  4. Fast iteration: quick compilation
  5. JIT compilation: code can be generated dynamically for each input

5. Learning Resources

Official Documentation

  • TVM Auto-Scheduler
  • Triton Programming Guide
  • LLVM IR Reference

Classic Papers

  • "Learning to Optimize Halide with Tree Search and Random Programs"
  • "Ansor: Generating High-Performance Tensor Programs for Deep Learning"
  • "Value Learning for Throughput Optimization of Deep Learning Workloads"

Open-Source Projects

  • TVM - reference implementation of auto-scheduling
  • Triton - GPU programming language
  • Halide - pioneer of schedule languages