05 - Auto-Scheduling and Code Generation
Overview
Auto-scheduling and code generation are core techniques in deep learning compilers: they turn a high-level description of a computation into efficient low-level code. This chapter covers the search algorithms and cost models behind auto-scheduling, as well as code generation for several backends.
The Auto-Scheduling Pipeline
┌──────────────────────────────────────────────────────────────────────┐
│                       Auto-Scheduling Pipeline                       │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  Compute definition (what to compute)                                │
│        │                                                             │
│        ▼                                                             │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │ Schedule Space                                                 │  │
│  │  ├─ Loop transforms:   tile, split, reorder, fuse              │  │
│  │  ├─ Parallelism:       parallel, vectorize, unroll             │  │
│  │  ├─ Memory:            cache_read, cache_write, compute_at    │  │
│  │  └─ Hardware mapping:  bind (threadIdx, blockIdx)              │  │
│  └────────────────────────────────────────────────────────────────┘  │
│        │                                                             │
│        ▼                                                             │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │ Search Algorithm                                               │  │
│  │  ├─ Random search                                              │  │
│  │  ├─ Simulated annealing                                        │  │
│  │  ├─ Genetic algorithms                                         │  │
│  │  ├─ Bayesian optimization                                      │  │
│  │  └─ Reinforcement learning                                     │  │
│  └────────────────────────────────────────────────────────────────┘  │
│        │                                                             │
│        ▼                                                             │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │ Cost Model                                                     │  │
│  │  ├─ Hardware measurement                                       │  │
│  │  └─ ML-based prediction                                        │  │
│  └────────────────────────────────────────────────────────────────┘  │
│        │                                                             │
│        ▼                                                             │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │ Code Generation                                                │  │
│  │  ├─ CUDA/PTX                                                   │  │
│  │  ├─ LLVM IR → CPU                                              │  │
│  │  └─ Other backends                                             │  │
│  └────────────────────────────────────────────────────────────────┘  │
│        │                                                             │
│        ▼                                                             │
│  Optimized executable code                                           │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘
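These stages form a feedback loop: the search algorithm proposes schedule configurations, the cost model (or the hardware itself) scores them, and the best configuration found is lowered by the code generator. A minimal sketch of that driver loop, assuming hypothetical space, cost_model, measure_on_hardware, and codegen components (illustrative stand-ins, not any particular framework's API):

# Minimal auto-tuning driver loop over hypothetical pipeline components.
def auto_schedule(space, cost_model, measure_on_hardware, codegen, n_rounds=32):
    history = []  # measured (config, cost) pairs
    for _ in range(n_rounds):
        # The searcher proposes candidates; the cost model ranks them cheaply.
        candidates = [space.sample() for _ in range(64)]
        candidates.sort(key=cost_model.predict_cost)
        # Only the most promising candidate is timed on real hardware.
        config = candidates[0]
        history.append((config, measure_on_hardware(config)))
        cost_model.update(history)  # measurements refine the model
    best_config, _ = min(history, key=lambda pair: pair[1])
    return codegen(best_config)  # lower the winning schedule to code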
1. Schedule Primitives
1.1 Loop Transformation Primitives
# TVM-style schedule primitives, demonstrated on a matrix multiply
import tvm
from tvm import te

# Define the computation: C = A @ B
M, N, K = 1024, 1024, 1024
A = te.placeholder((M, K), name='A')
B = te.placeholder((K, N), name='B')
k = te.reduce_axis((0, K), name='k')
C = te.compute((M, N),
               lambda i, j: te.sum(A[i, k] * B[k, j], axis=k),
               name='C')

s = te.create_schedule(C.op)

# 1. split: break one loop into two nested loops
#    for i in range(M):  ->  for i_outer in range(M // factor):
#                                for i_inner in range(factor):
i, j = s[C].op.axis
i_outer, i_inner = s[C].split(i, factor=32)
j_outer, j_inner = s[C].split(j, factor=32)

# 2. reorder: change the loop nesting order
#    original:  for i: for j: for k:
#    reordered: for i_outer: for j_outer: for k: for i_inner: for j_inner:
k_axis = s[C].op.reduce_axis[0]
s[C].reorder(i_outer, j_outer, k_axis, i_inner, j_inner)

# 3. tile: split + reorder in a single primitive
#    (fresh schedule: i and j above were already split)
s = te.create_schedule(C.op)
i, j = s[C].op.axis
i_outer, j_outer, i_inner, j_inner = s[C].tile(i, j, x_factor=32, y_factor=32)

# 4. fuse: merge two adjacent loops into one
#    for io: for jo:  ->  for fused in range(extent(io) * extent(jo)):
fused = s[C].fuse(i_outer, j_outer)

# 5. unroll: fully unroll a loop
s[C].unroll(i_inner)

# 6. vectorize: emit vector instructions for the innermost loop
s[C].vectorize(j_inner)

# 7. parallel: spread a loop across CPU threads
#    (fuse consumed i_outer and j_outer, so parallelize the fused axis)
s[C].parallel(fused)

# GPU scheduling (fresh schedule: parallel and bind target different backends)
s = te.create_schedule(C.op)
i, j = s[C].op.axis
i_outer, i_inner = s[C].split(i, factor=32)

# 8. bind: map loops onto GPU blocks and threads
block_x = te.thread_axis("blockIdx.x")
thread_x = te.thread_axis("threadIdx.x")
s[C].bind(i_outer, block_x)
s[C].bind(i_inner, thread_x)

# 9. compute_at: anchor a stage's computation inside a consumer loop
A_local = s.cache_read(A, "local", [C])
k_axis = s[C].op.reduce_axis[0]
s[A_local].compute_at(s[C], k_axis)

# 10. cache_read / cache_write: stage tensors through faster memory scopes
A_shared = s.cache_read(A, "shared", [C])
C_local = s.cache_write(C, "local")
1.2 Representing the Schedule Space
# A formal representation of the schedule space. SplitTransform,
# ReorderTransform, FuseTransform, ParallelTransform and VectorizeTransform
# are assumed to be simple record types, one per transform kind.
class ScheduleSpace:
    """The space of legal schedules for one compute definition."""

    def __init__(self, compute):
        self.compute = compute
        self.transforms = []

    def enumerate_transforms(self):
        """Enumerate every applicable transform."""
        transforms = []
        # Split: only factors that evenly divide the loop extent
        for axis in self.compute.axes:
            for factor in [2, 4, 8, 16, 32, 64, 128, 256]:
                if axis.length % factor == 0:
                    transforms.append(SplitTransform(axis, factor))
        # Reorder: every permutation of the loop nest
        from itertools import permutations
        for perm in permutations(self.compute.axes):
            transforms.append(ReorderTransform(perm))
        # Fuse: each pair of adjacent loops
        for i in range(len(self.compute.axes) - 1):
            transforms.append(FuseTransform(
                self.compute.axes[i],
                self.compute.axes[i + 1]
            ))
        # Parallelization / vectorization of any axis
        for axis in self.compute.axes:
            transforms.append(ParallelTransform(axis))
            transforms.append(VectorizeTransform(axis))
        return transforms

class ScheduleConfig:
    """One concrete schedule configuration drawn from the space."""

    def __init__(self):
        self.tile_sizes = {}        # tiling factors per axis
        self.loop_order = []        # loop nesting order
        self.parallel_axes = []     # axes to parallelize
        self.vectorize_axes = []    # axes to vectorize
        self.unroll_factors = {}    # unroll factor per axis
        self.cache_locations = {}   # memory scope per tensor

    def apply(self, schedule):
        """Apply this configuration to a schedule."""
        # Tiling
        for axis, sizes in self.tile_sizes.items():
            schedule.split(axis, sizes)
        # Loop reordering
        schedule.reorder(*self.loop_order)
        # Parallelization
        for axis in self.parallel_axes:
            schedule.parallel(axis)
        # Vectorization
        for axis in self.vectorize_axes:
            schedule.vectorize(axis)
        return schedule

# Schedule templates
class ScheduleTemplate:
    """A schedule template: a high-level strategy with tunable knobs."""

    def __init__(self, name):
        self.name = name
        self.knobs = {}  # tunable parameters: name -> candidate values

    def add_knob(self, name, values):
        self.knobs[name] = values

    def instantiate(self, config):
        """Instantiate the template with a concrete knob assignment."""
        raise NotImplementedError

class MatMulTemplate(ScheduleTemplate):
    """Schedule template for matrix multiplication."""

    def __init__(self, M, N, K):
        super().__init__("matmul")
        self.M = M
        self.N = N
        self.K = K
        # Tunable knobs
        self.add_knob('tile_m', [32, 64, 128, 256])
        self.add_knob('tile_n', [32, 64, 128, 256])
        self.add_knob('tile_k', [8, 16, 32])
        self.add_knob('num_threads', [128, 256, 512, 1024])

    def instantiate(self, config):
        """Produce a concrete schedule from one knob assignment."""
        schedule = ScheduleConfig()
        # Tiling factors
        schedule.tile_sizes['i'] = config['tile_m']
        schedule.tile_sizes['j'] = config['tile_n']
        schedule.tile_sizes['k'] = config['tile_k']
        # Loop order: tile loops outside, reduction and intra-tile loops inside
        schedule.loop_order = [
            'i_outer', 'j_outer', 'k', 'i_inner', 'j_inner'
        ]
        # Axes to map onto parallel hardware
        schedule.parallel_axes = ['i_outer', 'j_outer']
        return schedule
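The knobs span a small Cartesian search space that a tuner can enumerate or sample. A usage sketch (exhaustive enumeration with itertools.product; compiling and timing each schedule is left abstract):

from itertools import product

template = MatMulTemplate(1024, 1024, 1024)

# Enumerate every knob combination: 4 * 4 * 3 * 4 = 192 configurations
names = list(template.knobs.keys())
for values in product(*(template.knobs[name] for name in names)):
    config = dict(zip(names, values))
    schedule = template.instantiate(config)
    # a real tuner would now compile, run, and time `schedule`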
2. Search Algorithms
2.1 Random Search and Simulated Annealing
import random
import math

# Both searchers assume `space` exposes sample() -> dict config and a
# `params` dict mapping parameter names to candidate values.
class RandomSearcher:
    """Random search: sample independently, keep the best."""

    def __init__(self, space, measure_func):
        self.space = space
        self.measure_func = measure_func

    def search(self, n_trials):
        best_config = None
        best_cost = float('inf')
        for _ in range(n_trials):
            config = self.space.sample()
            cost = self.measure_func(config)
            if cost < best_cost:
                best_cost = cost
                best_config = config
        return best_config, best_cost

class SimulatedAnnealing:
    """Simulated annealing: accept worse moves with decaying probability."""

    def __init__(self, space, measure_func,
                 initial_temp=100, cooling_rate=0.95):
        self.space = space
        self.measure_func = measure_func
        self.initial_temp = initial_temp
        self.cooling_rate = cooling_rate

    def search(self, n_trials):
        # Initial solution
        current = self.space.sample()
        current_cost = self.measure_func(current)
        best = current
        best_cost = current_cost
        temp = self.initial_temp
        for _ in range(n_trials):
            # Propose a neighboring solution
            neighbor = self._get_neighbor(current)
            neighbor_cost = self.measure_func(neighbor)
            # Acceptance probability
            delta = neighbor_cost - current_cost
            if delta < 0:
                # Strictly better: always accept
                accept = True
            else:
                # Worse: accept with probability exp(-delta / temp)
                accept = random.random() < math.exp(-delta / temp)
            if accept:
                current = neighbor
                current_cost = neighbor_cost
                if current_cost < best_cost:
                    best = current
                    best_cost = current_cost
            # Cool down
            temp *= self.cooling_rate
        return best, best_cost

    def _get_neighbor(self, config):
        """Mutate one randomly chosen parameter of the configuration."""
        neighbor = config.copy()
        param = random.choice(list(self.space.params.keys()))
        values = self.space.params[param]
        neighbor[param] = random.choice(values)
        return neighbor
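A toy end-to-end run of the searchers above, with a hypothetical TuningSpace and a synthetic measure function standing in for real compilation and timing (the genetic-algorithm searcher below uses the same space interface):

import random

class TuningSpace:
    """Toy search space: dict-valued configs over discrete parameters."""
    def __init__(self, params):
        self.params = params  # name -> list of candidate values

    def sample(self):
        return {name: random.choice(vals)
                for name, vals in self.params.items()}

space = TuningSpace({
    'tile_m': [32, 64, 128, 256],
    'tile_n': [32, 64, 128, 256],
    'tile_k': [8, 16, 32],
})

# Synthetic cost: pretend 64x64 tiles with tile_k = 16 are optimal
def measure(config):
    return (abs(config['tile_m'] - 64) + abs(config['tile_n'] - 64)
            + abs(config['tile_k'] - 16) + random.random())

best, cost = SimulatedAnnealing(space, measure).search(n_trials=200)
print(best, cost)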
2.2 Genetic Algorithm
class GeneticAlgorithm:
    """Genetic-algorithm search over schedule configurations."""

    def __init__(self, space, measure_func,
                 population_size=50,
                 mutation_rate=0.1,
                 crossover_rate=0.8):
        self.space = space
        self.measure_func = measure_func
        self.population_size = population_size
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate

    def search(self, n_generations):
        # Initialize the population
        population = [self.space.sample() for _ in range(self.population_size)]
        for gen in range(n_generations):
            # Evaluate fitness (lower cost is better)
            fitness = [(config, self.measure_func(config)) for config in population]
            fitness.sort(key=lambda x: x[1])
            # Keep the elite; this also carries the best-so-far forward
            elite_size = self.population_size // 5
            new_population = [f[0] for f in fitness[:elite_size]]
            # Selection, crossover, and mutation produce the rest
            while len(new_population) < self.population_size:
                # Parent selection (tournament)
                parent1 = self._tournament_select(fitness)
                parent2 = self._tournament_select(fitness)
                # Crossover
                if random.random() < self.crossover_rate:
                    child = self._crossover(parent1, parent2)
                else:
                    child = parent1.copy()
                # Mutation
                if random.random() < self.mutation_rate:
                    child = self._mutate(child)
                new_population.append(child)
            population = new_population
            # Progress report
            best = fitness[0]
            print(f"Generation {gen}: best cost = {best[1]:.4f}")
        return fitness[0]

    def _tournament_select(self, fitness, k=3):
        """Tournament selection: best of k random individuals."""
        candidates = random.sample(fitness, k)
        winner = min(candidates, key=lambda x: x[1])
        return winner[0]

    def _crossover(self, parent1, parent2):
        """Single-point crossover over the parameter list."""
        child = {}
        params = list(parent1.keys())
        crossover_point = random.randint(0, len(params))
        for i, param in enumerate(params):
            if i < crossover_point:
                child[param] = parent1[param]
            else:
                child[param] = parent2[param]
        return child

    def _mutate(self, config):
        """Randomly reassign one parameter."""
        config = config.copy()
        param = random.choice(list(self.space.params.keys()))
        config[param] = random.choice(self.space.params[param])
        return config
2.3 Cost-Model-Guided Search (AutoTVM-style)
import numpy as np
from sklearn.ensemble import GradientBoostingRegressor

class CostModelSearcher:
    """
    Cost-model-guided search, in the style of AutoTVM.
    """

    def __init__(self, space, measure_func):
        self.space = space
        self.measure_func = measure_func
        self.cost_model = None
        self.history = []  # (config, cost) pairs

    def search(self, n_trials, n_parallel=1):
        for trial in range(n_trials):
            # Once trained, let the cost model pick candidates
            if self.cost_model is not None and trial >= 20:
                candidates = self._model_based_sampling(n_candidates=1000)
            else:
                candidates = [self.space.sample() for _ in range(n_parallel)]
            # Measure on real hardware
            for config in candidates:
                cost = self.measure_func(config)
                self.history.append((config, cost))
            # Periodically retrain the cost model
            if trial % 10 == 0 and len(self.history) >= 20:
                self._update_cost_model()
        # Return the best measured configuration
        best = min(self.history, key=lambda x: x[1])
        return best

    def _model_based_sampling(self, n_candidates):
        """Sample candidates and keep the one the model predicts is best."""
        # Generate a large candidate pool
        candidates = [self.space.sample() for _ in range(n_candidates)]
        # Predict costs with the model (cheap compared to measurement)
        features = [self._encode_config(c) for c in candidates]
        predictions = self.cost_model.predict(features)
        # Keep the predicted-best candidate
        best_idx = np.argmin(predictions)
        return [candidates[best_idx]]

    def _update_cost_model(self):
        """Train / retrain the cost model on the measured history."""
        # Training data
        X = [self._encode_config(config) for config, _ in self.history]
        y = [cost for _, cost in self.history]
        # Gradient-boosted trees (AutoTVM itself uses XGBoost;
        # sklearn's GradientBoostingRegressor stands in here)
        self.cost_model = GradientBoostingRegressor(
            n_estimators=100,
            max_depth=5,
            learning_rate=0.1
        )
        self.cost_model.fit(X, y)

    def _encode_config(self, config):
        """Encode a configuration as a flat feature vector."""
        features = []
        for param_name, values in self.space.params.items():
            value = config[param_name]
            # One-hot encode this parameter
            one_hot = [1 if v == value else 0 for v in values]
            features.extend(one_hot)
        return features
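For concreteness, here is what _encode_config produces for the toy three-parameter space used earlier: each parameter contributes one one-hot block, so 4 + 4 + 3 candidate values yield an 11-dimensional binary vector.

config = {'tile_m': 64, 'tile_n': 256, 'tile_k': 8}
# tile_m over [32, 64, 128, 256] -> [0, 1, 0, 0]
# tile_n over [32, 64, 128, 256] -> [0, 0, 0, 1]
# tile_k over [8, 16, 32]        -> [1, 0, 0]
# concatenated feature vector    -> [0, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0]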
2.4 Reinforcement-Learning Search
import torch
import torch.nn as nn
import torch.optim as optim

class RLScheduleSearcher:
    """
    Reinforcement-learning schedule search: a schedule is built step by
    step, with each scheduling decision treated as an action. Task-specific
    hooks (_get_state_size, _get_action_size, _max_steps, _get_initial_state,
    _step, _state_to_config, _greedy_sample) are left abstract in this sketch.
    """

    def __init__(self, space, measure_func):
        self.space = space
        self.measure_func = measure_func
        # Policy network
        self.policy = PolicyNetwork(
            input_size=self._get_state_size(),
            output_size=self._get_action_size()
        )
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-3)

    def search(self, n_episodes):
        best_config = None
        best_cost = float('inf')
        for episode in range(n_episodes):
            # Collect one episode of experience
            states, actions, rewards = self._run_episode()
            # Discounted returns
            returns = self._compute_returns(rewards)
            # Policy-gradient update
            loss = self._policy_gradient_update(states, actions, returns)
            # Evaluate the current greedy schedule
            config = self._greedy_sample()
            cost = self.measure_func(config)
            if cost < best_cost:
                best_cost = cost
                best_config = config
            print(f"Episode {episode}: loss={loss:.4f}, best_cost={best_cost:.4f}")
        return best_config, best_cost

    def _run_episode(self):
        """Run one episode and collect (state, action, reward) trajectories."""
        states = []
        actions = []
        rewards = []
        state = self._get_initial_state()
        for step in range(self._max_steps):
            states.append(state)
            # Sample an action from the policy
            action = self._sample_action(state)
            actions.append(action)
            # Apply the action to obtain the next state
            state, done = self._step(state, action)
            # Once the schedule is complete, measure its performance
            if done:
                config = self._state_to_config(state)
                cost = self.measure_func(config)
                reward = -cost  # negative cost as the terminal reward
                rewards.append(reward)
                break
            else:
                rewards.append(0)
        return states, actions, rewards

    def _sample_action(self, state):
        """Sample an action from the current policy."""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        probs = self.policy(state_tensor)
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        return action.item()

    def _compute_returns(self, rewards, gamma=0.99):
        """Compute discounted returns, back to front."""
        returns = []
        R = 0
        for r in reversed(rewards):
            R = r + gamma * R
            returns.insert(0, R)
        return returns

    def _policy_gradient_update(self, states, actions, returns):
        """REINFORCE-style policy-gradient update."""
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)
        returns = torch.FloatTensor(returns)
        # Normalize returns for stability
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)
        # Policy loss
        probs = self.policy(states)
        dist = torch.distributions.Categorical(probs)
        log_probs = dist.log_prob(actions)
        loss = -(log_probs * returns).mean()
        # Gradient step
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()

class PolicyNetwork(nn.Module):
    """Small MLP policy: state -> action probabilities."""

    def __init__(self, input_size, output_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, output_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.softmax(self.fc3(x), dim=-1)
        return x
3. Code Generation
3.1 CUDA Code Generation
class CUDACodeGenerator:
    """
    CUDA code generator: walks a schedule and emits kernel source text.
    """

    def __init__(self):
        self.code = []
        self.indent_level = 0

    def emit(self, line):
        self.code.append("    " * self.indent_level + line)

    def generate(self, schedule, compute):
        """Generate CUDA source for one scheduled compute."""
        # Kernel signature
        self._generate_kernel_signature(compute)
        # Local declarations
        self._generate_declarations(schedule, compute)
        # Thread-index computation
        self._generate_thread_index(schedule)
        # Main compute loops
        self._generate_compute_loop(schedule, compute)
        return "\n".join(self.code)

    def _generate_kernel_signature(self, compute):
        """Emit the __global__ kernel signature."""
        params = []
        for input_tensor in compute.inputs:
            params.append(f"float* __restrict__ {input_tensor.name}")
        params.append(f"float* __restrict__ {compute.output.name}")
        self.emit(f"__global__ void kernel({', '.join(params)}) {{")
        self.indent_level += 1

    def _generate_declarations(self, schedule, compute):
        """Emit local variable declarations (elided in this sketch)."""
        pass

    def _generate_thread_index(self, schedule):
        """Emit index variables for axes bound to blocks/threads."""
        for axis, thread_type in schedule.bindings.items():
            if thread_type == 'blockIdx.x':
                self.emit(f"int {axis} = blockIdx.x;")
            elif thread_type == 'threadIdx.x':
                self.emit(f"int {axis} = threadIdx.x;")
            elif thread_type == 'blockIdx.y':
                self.emit(f"int {axis} = blockIdx.y;")
            elif thread_type == 'threadIdx.y':
                self.emit(f"int {axis} = threadIdx.y;")

    def _generate_compute_loop(self, schedule, compute):
        """Emit the loop nest dictated by the schedule."""
        for loop in schedule.loops:
            if loop.parallel_type:
                # Parallel loops are already handled via thread indices
                continue
            if loop.unroll_factor:
                self.emit(f"#pragma unroll {loop.unroll_factor}")
            if loop.vectorize:
                self._generate_vectorized_loop(loop, compute)
            else:
                self._generate_scalar_loop(loop, compute)

    def _generate_scalar_loop(self, loop, compute):
        """Emit an ordinary scalar loop."""
        self.emit(f"for (int {loop.var} = {loop.start}; {loop.var} < {loop.end}; {loop.var} += {loop.step}) {{")
        self.indent_level += 1
        # Loop body
        self._generate_loop_body(compute)
        self.indent_level -= 1
        self.emit("}")

    def _generate_loop_body(self, compute):
        """Emit the innermost loop body (elided in this sketch)."""
        pass

    def _generate_vectorized_loop(self, loop, compute):
        """Emit a vectorized loop using float4 loads and stores."""
        vec_width = 4  # float4
        self.emit(f"for (int {loop.var} = {loop.start}; {loop.var} < {loop.end}; {loop.var} += {vec_width}) {{")
        self.indent_level += 1
        # float4 load, per-lane compute, float4 store
        self.emit(f"float4 val = *reinterpret_cast<float4*>(&input[{loop.var}]);")
        self.emit(f"val.x = compute(val.x);")
        self.emit(f"val.y = compute(val.y);")
        self.emit(f"val.z = compute(val.z);")
        self.emit(f"val.w = compute(val.w);")
        self.emit(f"*reinterpret_cast<float4*>(&output[{loop.var}]) = val;")
        self.indent_level -= 1
        self.emit("}")
# Example of generated CUDA code
GENERATED_CUDA_EXAMPLE = """
// BLOCK_SIZE is assumed to be defined at compile time, e.g. -DBLOCK_SIZE=16
__global__ void matmul_kernel(
    float* __restrict__ A,
    float* __restrict__ B,
    float* __restrict__ C,
    int M, int N, int K
) {
    // Thread and block indices
    int bx = blockIdx.x;
    int by = blockIdx.y;
    int tx = threadIdx.x;
    int ty = threadIdx.y;

    // Shared-memory tiles
    __shared__ float As[BLOCK_SIZE][BLOCK_SIZE];
    __shared__ float Bs[BLOCK_SIZE][BLOCK_SIZE];

    // Global indices
    int row = by * BLOCK_SIZE + ty;
    int col = bx * BLOCK_SIZE + tx;
    float sum = 0.0f;

    // Tiled accumulation over K
    for (int k = 0; k < (K + BLOCK_SIZE - 1) / BLOCK_SIZE; k++) {
        // Cooperative load into shared memory
        if (row < M && k * BLOCK_SIZE + tx < K)
            As[ty][tx] = A[row * K + k * BLOCK_SIZE + tx];
        else
            As[ty][tx] = 0.0f;
        if (col < N && k * BLOCK_SIZE + ty < K)
            Bs[ty][tx] = B[(k * BLOCK_SIZE + ty) * N + col];
        else
            Bs[ty][tx] = 0.0f;
        __syncthreads();

        // Inner product over the tile
        #pragma unroll
        for (int i = 0; i < BLOCK_SIZE; i++) {
            sum += As[ty][i] * Bs[i][tx];
        }
        __syncthreads();
    }

    // Write back
    if (row < M && col < N) {
        C[row * N + col] = sum;
    }
}
"""
3.2 LLVM IR Code Generation
import llvmlite.ir as ir

class LLVMCodeGenerator:
    """
    LLVM IR code generator for the CPU backend.
    """

    def __init__(self):
        self.module = ir.Module(name="generated")
        self.builder = None

    def generate(self, schedule, compute):
        """Generate LLVM IR for one scheduled compute."""
        # Function type and declaration
        func_type = self._create_function_type(compute)
        func = ir.Function(self.module, func_type, name="kernel")
        # Entry block
        entry_block = func.append_basic_block(name="entry")
        self.builder = ir.IRBuilder(entry_block)
        # Compute body
        self._generate_compute(schedule, compute, func.args)
        # Return
        self.builder.ret_void()
        return str(self.module)

    def _create_function_type(self, compute):
        # Input and output pointers
        ptr_type = ir.PointerType(ir.FloatType())
        arg_types = [ptr_type] * (len(compute.inputs) + 1)  # inputs + output
        # Dimension arguments
        arg_types.extend([ir.IntType(64)] * len(compute.shape))
        return ir.FunctionType(ir.VoidType(), arg_types)

    def _generate_compute(self, schedule, compute, args):
        """Emit the loop nest and body."""
        # Unpack arguments
        input_ptrs = args[:len(compute.inputs)]
        output_ptr = args[len(compute.inputs)]
        dims = args[len(compute.inputs) + 1:]
        # Emit loops (a sketch: loops are emitted one after another here;
        # a full generator would recurse so that inner loops nest properly)
        for loop in schedule.loops:
            self._generate_loop(loop, compute, input_ptrs, output_ptr, dims)

    def _generate_loop(self, loop, compute, inputs, output, dims):
        """Emit one counted loop: condition, body, and exit blocks."""
        # Basic blocks for the loop
        loop_cond = self.builder.append_basic_block(name=f"{loop.var}_cond")
        loop_body = self.builder.append_basic_block(name=f"{loop.var}_body")
        loop_end = self.builder.append_basic_block(name=f"{loop.var}_end")
        # Initialize the loop variable
        loop_var = self.builder.alloca(ir.IntType(64), name=loop.var)
        self.builder.store(ir.Constant(ir.IntType(64), loop.start), loop_var)
        self.builder.branch(loop_cond)
        # Condition check
        self.builder.position_at_end(loop_cond)
        current = self.builder.load(loop_var)
        end_val = ir.Constant(ir.IntType(64), loop.end)
        cond = self.builder.icmp_signed('<', current, end_val)
        self.builder.cbranch(cond, loop_body, loop_end)
        # Loop body
        self.builder.position_at_end(loop_body)
        # The innermost loop carries the actual computation
        if loop.is_innermost:
            self._generate_elementwise_compute(compute, inputs, output, current)
        # Increment and loop back
        incremented = self.builder.add(current, ir.Constant(ir.IntType(64), loop.step))
        self.builder.store(incremented, loop_var)
        self.builder.branch(loop_cond)
        # Loop exit
        self.builder.position_at_end(loop_end)

    def _generate_elementwise_compute(self, compute, inputs, output, index):
        """Emit one element-wise computation at `index`."""
        # Load inputs
        input_vals = []
        for inp in inputs:
            ptr = self.builder.gep(inp, [index])
            val = self.builder.load(ptr)
            input_vals.append(val)
        # Compute
        result = self._emit_expression(compute.expression, input_vals)
        # Store the output
        out_ptr = self.builder.gep(output, [index])
        self.builder.store(result, out_ptr)

    def _emit_expression(self, expr, inputs):
        """Lower one expression node to LLVM instructions."""
        if expr.op == 'add':
            return self.builder.fadd(inputs[0], inputs[1])
        elif expr.op == 'mul':
            return self.builder.fmul(inputs[0], inputs[1])
        elif expr.op == 'sin':
            # Call the llvm.sin intrinsic
            sin_func = self.module.declare_intrinsic('llvm.sin', [ir.FloatType()])
            return self.builder.call(sin_func, [inputs[0]])
        # ... more operators
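The textual IR from generate still has to be compiled for the host CPU. A minimal sketch using llvmlite's binding layer to JIT the module (standard llvmlite MCJIT usage; error handling omitted):

import llvmlite.binding as llvm

llvm.initialize()
llvm.initialize_native_target()
llvm.initialize_native_asmprinter()

def jit_compile(llvm_ir: str):
    """Parse textual LLVM IR and JIT-compile it for the host CPU."""
    mod = llvm.parse_assembly(llvm_ir)
    mod.verify()
    target_machine = llvm.Target.from_default_triple().create_target_machine()
    engine = llvm.create_mcjit_compiler(mod, target_machine)
    engine.finalize_object()
    # Address of the compiled function; callable via ctypes
    return engine.get_function_address("kernel")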
3.3 Triton Code Generation
class TritonCodeGenerator:
    """
    Triton code generator: emits a Python-level Triton kernel as text.
    """

    def __init__(self):
        self.code = []
        self.indent = 0

    def emit(self, line):
        self.code.append("    " * self.indent + line)

    def generate(self, schedule, compute):
        """Generate a Triton kernel for one scheduled compute."""
        # Imports
        self.emit("import triton")
        self.emit("import triton.language as tl")
        self.emit("")
        # Kernel
        self._generate_kernel_decorator(schedule)
        self._generate_kernel_function(schedule, compute)
        return "\n".join(self.code)

    def _generate_kernel_decorator(self, schedule):
        """Emit @triton.autotune (if configured) and @triton.jit."""
        # Optional autotuning configurations
        if schedule.autotune_configs:
            self.emit("@triton.autotune(")
            self.indent += 1
            self.emit("configs=[")
            self.indent += 1
            for config in schedule.autotune_configs:
                self.emit(f"triton.Config({config}),")
            self.indent -= 1
            self.emit("],")
            self.emit(f"key={schedule.autotune_key},")
            self.indent -= 1
            self.emit(")")
        self.emit("@triton.jit")

    def _generate_kernel_function(self, schedule, compute):
        """Emit the kernel function body."""
        # Signature: input pointers, output pointer, element count, constexprs
        params = []
        for inp in compute.inputs:
            params.append(f"{inp.name}_ptr")
        params.append("output_ptr")
        params.append("numel")  # needed by the bounds mask below
        for const in schedule.constexpr_params:
            params.append(f"{const}: tl.constexpr")
        self.emit(f"def kernel({', '.join(params)}):")
        self.indent += 1
        # Program ID
        self.emit("pid = tl.program_id(0)")
        # Offsets
        self._generate_offsets(schedule)
        # Loads
        self._generate_loads(compute)
        # Compute
        self._generate_compute(compute)
        # Stores
        self._generate_stores(compute)
        self.indent -= 1

    def _generate_offsets(self, schedule):
        """Emit per-program offsets and the bounds mask."""
        block_size = schedule.block_size
        self.emit(f"offset = pid * {block_size} + tl.arange(0, {block_size})")
        self.emit("mask = offset < numel")

    def _generate_loads(self, compute):
        """Emit masked loads for every input."""
        for inp in compute.inputs:
            self.emit(f"{inp.name} = tl.load({inp.name}_ptr + offset, mask=mask)")

    def _generate_compute(self, compute):
        """Emit the computation expression."""
        expr = compute.expression
        result = self._expr_to_triton(expr)
        self.emit(f"result = {result}")

    def _expr_to_triton(self, expr):
        """Recursively translate an expression tree into Triton code."""
        if expr.op == 'add':
            return f"({self._expr_to_triton(expr.args[0])} + {self._expr_to_triton(expr.args[1])})"
        elif expr.op == 'mul':
            return f"({self._expr_to_triton(expr.args[0])} * {self._expr_to_triton(expr.args[1])})"
        elif expr.op == 'sin':
            return f"tl.sin({self._expr_to_triton(expr.args[0])})"
        elif expr.op == 'var':
            return expr.name
        # ... more operators

    def _generate_stores(self, compute):
        """Emit the masked store of the result."""
        self.emit("tl.store(output_ptr + offset, result, mask=mask)")
4. Frequently Asked Interview Questions
Q1: What is auto-scheduling, and how does it differ from manual scheduling?
Key points:
- Auto-scheduling: automatically searches for the best loop transformations and hardware mapping
- Manual scheduling: an expert hand-writes the schedule strategy
- Trade-offs:
  - auto-scheduling saves engineering effort
  - it can discover optimizations a human would not think of
  - the search itself takes significant time
  - manual scheduling can exploit domain knowledge directly
Q2: Which search algorithms are commonly used for auto-scheduling?
Key points:
- Random search: simple but inefficient
- Simulated annealing: some ability to escape local optima
- Genetic algorithms: well suited to large search spaces
- Bayesian optimization: uses a surrogate cost model to guide the search
- Reinforcement learning: can learn a scheduling policy
Q3: What is a cost model for, and how is it trained?
Key points:
- Purpose: predict a configuration's runtime, reducing the number of real measurements
- Training data: measured (configuration, runtime) pairs
- Model choice: XGBoost, neural networks, and similar regressors
- Feature engineering: encode configuration parameters as feature vectors
Q4: What must CUDA code generation take into account?
Key points:
- Thread hierarchy: the blockIdx / threadIdx mapping
- Memory hierarchy: global memory, shared memory, registers
- Synchronization: where to place __syncthreads()
- Vectorization: vector types such as float4
- Loop unrolling: #pragma unroll
Q5: What does Triton offer over hand-written CUDA?
Key points:
- Higher abstraction: Python syntax, much easier to write
- Automatic optimization: memory access and synchronization handled for you
- Portability: multiple hardware backends
- Fast iteration: quick compilation turnaround
- JIT compilation: kernels specialize to their inputs at run time
5. Learning Resources
Classic papers
- "Learning to Optimize Halide with Tree Search and Random Programs"
- "Ansor: Generating High-Performance Tensor Programs for Deep Learning"
- "Value Learning for Throughput Optimization of Deep Learning Workloads"