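Kubeflow Pipelines in Depth

Overview

Kubeflow Pipelines (KFP) is one of the most widely used ML workflow engines on Kubernetes and provides end-to-end orchestration for machine learning pipelines. This article takes a close look at KFP's architecture, component development, and best practices.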

KFP Architecture

Overall Architecture

┌─────────────────────────────────────────────────────────────────┐
│                 Kubeflow Pipelines Architecture                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  User interfaces                                                │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐ │  │
│  │  │  Web UI  │  │  Python  │  │   CLI    │  │  REST    │ │  │
│  │  │          │  │   SDK    │  │  (kfp)   │  │   API    │ │  │
│  │  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘ │  │
│  └───────┼─────────────┼─────────────┼─────────────┼────────┘  │
│          └─────────────┴──────┬──────┴─────────────┘           │
│                               │                                 │
│  ════════════════════════════╪════════════════════════════════ │
│                               │                                 │
│  KFP service layer            │                                 │
│  ┌────────────────────────────┴─────────────────────────────┐  │
│  │                                                          │  │
│  │  ┌─────────────────────────────────────────────────────┐ │  │
│  │  │                   KFP API Server                    │ │  │
│  │  │  • Pipeline CRUD                                    │ │  │
│  │  │  • Run management                                   │ │  │
│  │  │  • Experiment management                            │ │  │
│  │  │  • Artifact management                              │ │  │
│  │  └─────────────────────────────────────────────────────┘ │  │
│  │                                                          │  │
│  │  ┌───────────────────┐  ┌───────────────────────────┐   │  │
│  │  │   Persistence     │  │      Visualization       │   │  │
│  │  │   Agent           │  │      Server              │   │  │
│  │  │                   │  │                          │   │  │
│  │  │ • Persist state   │  │ • Render visualizations  │   │  │
│  │  │ • Event logging   │  │ • Metric charts          │   │  │
│  │  └───────────────────┘  └───────────────────────────┘   │  │
│  │                                                          │  │
│  │  ┌─────────────────────────────────────────────────────┐ │  │
│  │  │                 Scheduled Workflow                  │ │  │
│  │  │                   Controller                        │ │  │
│  │  │  • Scheduled triggers                               │ │  │
│  │  │  • Cron scheduling                                  │ │  │
│  │  └─────────────────────────────────────────────────────┘ │  │
│  │                                                          │  │
│  └──────────────────────────────────────────────────────────┘  │
│                               │                                 │
│  ════════════════════════════╪════════════════════════════════ │
│                               │                                 │
│  Execution (Argo Workflows)   │                                 │
│  ┌────────────────────────────┴─────────────────────────────┐  │
│  │                                                          │  │
│  │  ┌─────────────────────────────────────────────────────┐ │  │
│  │  │              Argo Workflow Controller               │ │  │
│  │  │  • DAG parsing and execution                        │ │  │
│  │  │  • Pod scheduling                                   │ │  │
│  │  │  • State management                                 │ │  │
│  │  └─────────────────────────────────────────────────────┘ │  │
│  │                                                          │  │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐    │  │
│  │  │ Step 1  │  │ Step 2  │  │ Step 3  │  │ Step N  │    │  │
│  │  │  Pod    │  │  Pod    │  │  Pod    │  │  Pod    │    │  │
│  │  └─────────┘  └─────────┘  └─────────┘  └─────────┘    │  │
│  │                                                          │  │
│  └──────────────────────────────────────────────────────────┘  │
│                               │                                 │
│  ════════════════════════════╪════════════════════════════════ │
│                               │                                 │
│  Storage layer                │                                 │
│  ┌────────────────────────────┴─────────────────────────────┐  │
│  │                                                          │  │
│  │  ┌───────────────┐  ┌───────────────┐  ┌──────────────┐ │  │
│  │  │    MySQL      │  │    MinIO      │  │   ML        │ │  │
│  │  │   Metadata    │  │   Artifacts   │  │  Metadata   │ │  │
│  │  │               │  │               │  │   (MLMD)    │ │  │
│  │  │ • Pipeline    │  │ • Model files │  │ • Lineage   │ │  │
│  │  │ • Run records │  │ • Datasets    │  │ • Versions  │ │  │
│  │  │ • Parameters  │  │ • Log files   │  │ • Metrics   │ │  │
│  │  └───────────────┘  └───────────────┘  └──────────────┘ │  │
│  │                                                          │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

KFP v2 SDK Architecture

┌─────────────────────────────────────────────────────────────────┐
│                     KFP v2 SDK Architecture                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Python code                                                    │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                                                          │  │
│  │  @dsl.component                                          │  │
│  │  def preprocess(data: Input[Dataset]) -> Output[Dataset]:│  │
│  │      ...                                                 │  │
│  │                                                          │  │
│  │  @dsl.pipeline                                           │  │
│  │  def ml_pipeline():                                      │  │
│  │      preprocess_task = preprocess(...)                   │  │
│  │      train_task = train(preprocess_task.output)          │  │
│  │                                                          │  │
│  └──────────────────────────────────────────────────────────┘  │
│                               │                                 │
│                               ▼                                 │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                     KFP Compiler                         │  │
│  │  ┌─────────────────────────────────────────────────────┐ │  │
│  │  │  • Type checking                                    │ │  │
│  │  │  • DAG construction                                 │ │  │
│  │  │  • IR generation                                    │ │  │
│  │  └─────────────────────────────────────────────────────┘ │  │
│  └──────────────────────────────────────────────────────────┘  │
│                               │                                 │
│                               ▼                                 │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │              Pipeline IR (YAML/JSON)                     │  │
│  │  ┌─────────────────────────────────────────────────────┐ │  │
│  │  │  pipelineSpec:                                      │ │  │
│  │  │    components:                                      │ │  │
│  │  │      - name: preprocess                             │ │  │
│  │  │        executorLabel: exec-preprocess               │ │  │
│  │  │    deploymentSpec:                                  │ │  │
│  │  │      executors:                                     │ │  │
│  │  │        exec-preprocess:                             │ │  │
│  │  │          container:                                 │ │  │
│  │  │            image: python:3.9                        │ │  │
│  │  └─────────────────────────────────────────────────────┘ │  │
│  └──────────────────────────────────────────────────────────┘  │
│                               │                                 │
│                               ▼                                 │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                   Argo Workflow                          │  │
│  │  ┌─────────────────────────────────────────────────────┐ │  │
│  │  │  apiVersion: argoproj.io/v1alpha1                   │ │  │
│  │  │  kind: Workflow                                     │ │  │
│  │  │  spec:                                              │ │  │
│  │  │    templates:                                       │ │  │
│  │  │      - name: preprocess                             │ │  │
│  │  │        container: ...                               │ │  │
│  │  └─────────────────────────────────────────────────────┘ │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
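
The DAG construction step in the compiler box can be pictured with a small sketch that does not depend on KFP: a task that consumes another task's output depends on it, and a topological sort yields a valid execution order. The task names and the `build_order` helper are illustrative assumptions, not part of the KFP SDK, and the real compiler does considerably more (type checking, IR generation).

```python
from graphlib import TopologicalSorter


def build_order(dependencies: dict) -> list:
    """Return one valid execution order for a task dependency graph.

    `dependencies` maps each task name to the set of task names whose
    outputs it consumes. A cycle raises graphlib.CycleError.
    """
    return list(TopologicalSorter(dependencies).static_order())


# Hypothetical tasks mirroring a preprocess -> train -> evaluate -> deploy flow.
deps = {
    "preprocess": set(),
    "train": {"preprocess"},
    "evaluate": {"train", "preprocess"},
    "deploy": {"evaluate", "train"},
}

order = build_order(deps)
print(order)
```

Because every task here depends on the one before it, only one ordering is valid; with independent branches the sorter is free to pick any order that respects the edges.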

Component Development

Basic Components

"""
KFP v2 component development
"""
from kfp import dsl
from kfp.dsl import (
    component,
    pipeline,
    Input,
    Output,
    Dataset,
    Model,
    Metrics,
    Artifact,
    InputPath,
    OutputPath,
)
from typing import NamedTuple


# 1. Lightweight Python component
@component(
    base_image="python:3.9",
    packages_to_install=["pandas", "scikit-learn"]
)
def preprocess_data(
    input_path: str,
    output_dataset: Output[Dataset],
    test_size: float = 0.2
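):
    """Data preprocessing component."""
    import pandas as pd
    from sklearn.model_selection import train_test_split

    # Read the data
    df = pd.read_csv(input_path)

    # Clean the data
    df = df.dropna()

    # Split into train and test sets
    train_df, test_df = train_test_split(df, test_size=test_size, random_state=42)

    # Save the processed splits.
    # Note: these are sibling files of output_dataset.path; whether they are
    # persisted depends on how the artifact store is mounted.
    train_df.to_csv(f"{output_dataset.path}_train.csv", index=False)
    test_df.to_csv(f"{output_dataset.path}_test.csv", index=False)

    # Set metadata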
    output_dataset.metadata["train_samples"] = len(train_df)
    output_dataset.metadata["test_samples"] = len(test_df)


# 2. Component with multiple outputs
@component(
    base_image="python:3.9",
    packages_to_install=["pandas", "scikit-learn"]
)
def train_model(
    train_data: Input[Dataset],
    model_output: Output[Model],
    metrics_output: Output[Metrics],
    learning_rate: float = 0.01,
    n_estimators: int = 100
) -> NamedTuple("TrainOutput", [("accuracy", float), ("f1_score", float)]):
    """Model training component."""
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score, f1_score
    import pickle

    # Load the data
    train_df = pd.read_csv(f"{train_data.path}_train.csv")

    X = train_df.drop("target", axis=1)
    y = train_df["target"]

    # Train the model
    model = RandomForestClassifier(
        n_estimators=n_estimators,
        random_state=42
    )
    model.fit(X, y)

    # Evaluate (on the training data, so these scores are optimistic)
    predictions = model.predict(X)
    acc = accuracy_score(y, predictions)
    f1 = f1_score(y, predictions, average="weighted")

    # Save the model
    with open(model_output.path, "wb") as f:
        pickle.dump(model, f)

    # Log metrics
    metrics_output.log_metric("accuracy", acc)
    metrics_output.log_metric("f1_score", f1)
    metrics_output.log_metric("n_estimators", n_estimators)

    # Set model metadata
    model_output.metadata["framework"] = "sklearn"
    model_output.metadata["model_type"] = "RandomForestClassifier"

    from collections import namedtuple
    output = namedtuple("TrainOutput", ["accuracy", "f1_score"])
    return output(acc, f1)


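# 3. Using a custom container image
@component(
    base_image="pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime",
    packages_to_install=["pandas"]  # pandas may not be bundled with this base image
)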
def train_pytorch_model(
    train_data: Input[Dataset],
    model_output: Output[Model],
    epochs: int = 10,
    batch_size: int = 32,
    learning_rate: float = 0.001
):
    """PyTorch model training."""
    import torch
    import torch.nn as nn
    import torch.optim as optim
    from torch.utils.data import DataLoader, TensorDataset
    import pandas as pd
    import numpy as np

    # Load the data
    df = pd.read_csv(f"{train_data.path}_train.csv")
    X = torch.tensor(df.drop("target", axis=1).values, dtype=torch.float32)
    y = torch.tensor(df["target"].values, dtype=torch.long)

    dataset = TensorDataset(X, y)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Define the model
    model = nn.Sequential(
        nn.Linear(X.shape[1], 128),
        nn.ReLU(),
        nn.Linear(128, 64),
        nn.ReLU(),
        nn.Linear(64, len(y.unique()))
    )

    if torch.cuda.is_available():
        model = model.cuda()
        X = X.cuda()
        y = y.cuda()

    # Train
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(epochs):
        for batch_X, batch_y in dataloader:
            if torch.cuda.is_available():
                batch_X, batch_y = batch_X.cuda(), batch_y.cuda()

            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()

    # Save the model
    torch.save(model.state_dict(), model_output.path)
    model_output.metadata["framework"] = "pytorch"
    model_output.metadata["epochs"] = epochs


# 4. Container component (for complex dependencies baked into an image)
@dsl.container_component
def distributed_training_op(
    world_size: int,
    epochs: int,
    model: Output[Model]
):
    # Inputs and outputs referenced here are replaced with placeholders at compile time
    return dsl.ContainerSpec(
        image="my-registry/distributed-training:v1.0",
        command=["python", "train_distributed.py"],
        args=[
            "--world-size", world_size,
            "--epochs", epochs,
            "--model-path", model.path
        ]
    )


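# 5. Components used by the conditional deployment step
@component(
    base_image="python:3.9",
    packages_to_install=["pandas", "scikit-learn"]
)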
def evaluate_model(
    model: Input[Model],
    test_data: Input[Dataset],
    metrics: Output[Metrics]
) -> float:
    """Model evaluation."""
    import pickle
    import pandas as pd
    from sklearn.metrics import accuracy_score

    # Load the model
    with open(model.path, "rb") as f:
        clf = pickle.load(f)

    # Load the test data
    test_df = pd.read_csv(f"{test_data.path}_test.csv")
    X_test = test_df.drop("target", axis=1)
    y_test = test_df["target"]

    # Predict
    predictions = clf.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)

    metrics.log_metric("test_accuracy", accuracy)
    return accuracy


@component
def deploy_model(
    model: Input[Model],
    endpoint_name: str,
    min_replicas: int = 1
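):
    """Deploy the model."""
    import subprocess

    # Deploy with kubectl (the component image must include kubectl, and the pod's
    # service account needs permission to create InferenceService objects)
    deploy_yaml = f"""
apiVersion: serving.kserve.io/v1beta1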
kind: InferenceService
metadata:
  name: {endpoint_name}
spec:
  predictor:
    sklearn:
      storageUri: {model.uri}
      resources:
        requests:
          cpu: 100m
          memory: 256Mi
    minReplicas: {min_replicas}
"""
    with open("/tmp/deploy.yaml", "w") as f:
        f.write(deploy_yaml)

    subprocess.run(["kubectl", "apply", "-f", "/tmp/deploy.yaml"], check=True)


# 6. Component with retries
@component(
    base_image="python:3.9",
    packages_to_install=["requests"]
)
def fetch_data_with_retry(
    url: str,
    output_data: Output[Dataset],
    max_retries: int = 3
):
    """Fetch data with retries."""
    import requests
    import time

    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()

            with open(output_data.path, "w") as f:
                f.write(response.text)

            output_data.metadata["source_url"] = url
            output_data.metadata["size_bytes"] = len(response.text)
            return

        except Exception as e:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
            else:
                raise RuntimeError(f"Failed after {max_retries} attempts: {e}")
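
The retry loop in `fetch_data_with_retry` can be factored into a reusable helper so the backoff logic is testable without a network. The sketch below is one possible shape, assuming a hypothetical `retry_with_backoff` function with an injectable `sleep` callable; it is not part of KFP.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func until it succeeds, doubling the delay after each failure."""
    last_error = None
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as exc:
            last_error = exc
            # Sleep only if another attempt will follow.
            if attempt < max_retries - 1:
                sleep(base_delay * 2 ** attempt)
    raise RuntimeError(f"Failed after {max_retries} attempts: {last_error}") from last_error


# Usage: a stand-in operation that fails twice and then succeeds.
calls = {"n": 0}
delays = []


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


# Recording the delays instead of sleeping keeps the example instant.
result = retry_with_backoff(flaky, max_retries=3, sleep=delays.append)
print(result, delays)
```

Passing `sleep` as a parameter is a design choice for testability; inside a component the default `time.sleep` would be used.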

Pipeline Definition

"""
KFP pipeline definition
"""
from kfp import dsl
from kfp import compiler
from kfp.client import Client


@dsl.pipeline(
    name="ml-training-pipeline",
    description="End-to-end ML training pipeline"
)
def ml_pipeline(
    data_path: str = "gs://bucket/data.csv",
    learning_rate: float = 0.01,
    epochs: int = 10,
    deploy_threshold: float = 0.9
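):
    """
    ML training pipeline.

    Args:
        data_path: Path to the input data
        learning_rate: Learning rate
        epochs: Number of training epochs
        deploy_threshold: Minimum test accuracy required to deploy
    """

    # 1. Data preprocessing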
    preprocess_task = preprocess_data(
        input_path=data_path,
        test_size=0.2
    )
    preprocess_task.set_display_name("Data Preprocessing")
    preprocess_task.set_cpu_limit("2")
    preprocess_task.set_memory_limit("4Gi")

    # 2. Model training (on a GPU)
    train_task = train_pytorch_model(
        train_data=preprocess_task.outputs["output_dataset"],
        epochs=epochs,
        learning_rate=learning_rate
    )
    train_task.set_display_name("Model Training")
    # set_gpu_limit is deprecated in the v2 SDK; request accelerators with the calls below
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_accelerator_limit(1)
    train_task.set_memory_limit("16Gi")

    # 3. Model evaluation
    evaluate_task = evaluate_model(
        model=train_task.outputs["model_output"],
        test_data=preprocess_task.outputs["output_dataset"]
    )
    evaluate_task.set_display_name("Model Evaluation")

    # 4. Conditional deployment
    with dsl.Condition(
        evaluate_task.output >= deploy_threshold,
        name="deploy-condition"
    ):
        deploy_task = deploy_model(
            model=train_task.outputs["model_output"],
            endpoint_name="ml-model-endpoint"
        )
        deploy_task.set_display_name("Deploy Model")


# Train multiple models in parallel
@dsl.pipeline(
    name="hyperparameter-search-pipeline",
    description="Hyperparameter search with parallel training"
)
def hyperparameter_search_pipeline(
    data_path: str,
    learning_rates: list = [0.001, 0.01, 0.1],
    n_estimators_list: list = [50, 100, 200]
):
    """Hyperparameter search pipeline."""

    # Data preprocessing
    preprocess_task = preprocess_data(input_path=data_path)

    # Parallel training
    with dsl.ParallelFor(
        items=learning_rates,
        parallelism=3
    ) as lr:
        with dsl.ParallelFor(
            items=n_estimators_list,
            parallelism=3
        ) as n_est:
            train_task = train_model(
                train_data=preprocess_task.outputs["output_dataset"],
                learning_rate=lr,
                n_estimators=n_est
            )

    # Note: aggregating ParallelFor outputs requires extra handling


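# Pipeline with an exit handler
@dsl.component(
    base_image="python:3.9",
    packages_to_install=["requests"]
)
def cleanup_task(status: dsl.PipelineTaskFinalStatus):
    """Cleanup task that runs when the guarded tasks finish, even on failure."""
    # KFP injects the final status of the ExitHandler group at runtime;
    # support for PipelineTaskFinalStatus depends on the KFP backend version.
    print(f"Pipeline finished with state: {status.state}")
    # Clean up temporary resources
    # Send a notification
    import requests
    requests.post(
        "https://hooks.slack.com/...",
        json={"text": f"Pipeline completed: {status.state}"}
    )


@dsl.pipeline(name="pipeline-with-exit-handler")
def pipeline_with_exit_handler(data_path: str):
    """Pipeline with an exit handler."""

    # Create the exit task first; its status argument is not passed explicitly
    exit_task = cleanup_task()

    # Only tasks defined inside the ExitHandler block are guarded by it
    with dsl.ExitHandler(exit_task=exit_task):
        preprocess = preprocess_data(input_path=data_path)
        train = train_model(train_data=preprocess.outputs["output_dataset"])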


# Compile and submit
def compile_and_submit():
    """Compile the pipeline and submit it."""

    # Compile to YAML
    compiler.Compiler().compile(
        pipeline_func=ml_pipeline,
        package_path="ml_pipeline.yaml"
    )

    # Submit to KFP
    client = Client(host="http://localhost:8080")

    # Create an experiment
    experiment = client.create_experiment(
        name="ml-experiments",
        description="ML training experiments"
    )

    # Create a run
    run = client.create_run_from_pipeline_func(
        ml_pipeline,
        experiment_name="ml-experiments",
        run_name="ml-pipeline-run-001",
        arguments={
            "data_path": "gs://my-bucket/data.csv",
            "learning_rate": 0.01,
            "epochs": 20
        }
    )

    print(f"Run created: {run.run_id}")

    # Wait for completion
    client.wait_for_run_completion(run.run_id, timeout=3600)


if __name__ == "__main__":
    compile_and_submit()
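
The nested `dsl.ParallelFor` loops in `hyperparameter_search_pipeline` fan out over every (learning rate, n_estimators) pair, and the comment there notes that aggregating the results needs extra handling. The plain-Python sketch below only illustrates that fan-out and fan-in shape; `fake_score` is a hypothetical stand-in for a training run. Recent KFP v2 SDK releases provide `dsl.Collected` for gathering loop outputs, but whether it is available depends on your SDK and backend versions.

```python
from itertools import product


def fake_score(learning_rate: float, n_estimators: int) -> float:
    """Hypothetical stand-in for a training run; higher is better."""
    return -(abs(learning_rate - 0.01) * 10 + abs(n_estimators - 100) / 1000)


learning_rates = [0.001, 0.01, 0.1]
n_estimators_list = [50, 100, 200]

# Fan-out: nested loops visit every combination, like the nested ParallelFor.
grid = list(product(learning_rates, n_estimators_list))

# Fan-in: collect one result per branch, then reduce to the best one.
results = [(lr, n, fake_score(lr, n)) for lr, n in grid]
best = max(results, key=lambda r: r[2])

print(len(grid), best[:2])
```

In a real pipeline the reduce step would be its own component that receives the collected metrics from all branches.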

Advanced Features

Caching and Reuse

"""
KFP caching and component reuse
"""
from kfp import dsl
from kfp.dsl import component


# Component with caching enabled
@component(
    base_image="python:3.9",
    packages_to_install=["pandas"]
)
def cacheable_preprocess(
    input_path: str,
    output_path: dsl.OutputPath(str)
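) -> str:
    """Cacheable preprocessing component."""
    import pandas as pd
    import hashlib

    df = pd.read_csv(input_path)
    df_processed = df.dropna()
    df_processed.to_csv(output_path, index=False)

    # Return a fingerprint of the processed data. KFP's own cache lookup is keyed on the
    # component definition and its inputs, so this value only helps downstream checks.
    return hashlib.md5(df_processed.to_json().encode()).hexdigest()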


@dsl.pipeline(name="cached-pipeline")
def cached_pipeline(data_path: str):
    """Pipeline that uses caching."""

    preprocess_task = cacheable_preprocess(input_path=data_path)

    # Set the caching policy for this task
    preprocess_task.set_caching_options(enable_caching=True)

    # Caching can also be disabled for a whole run
    # by passing enable_caching=False when submitting it


# Component reuse: build a reusable component library
class MLComponents:
    """ML component library."""

    @staticmethod
    @component(
        base_image="python:3.9",
        packages_to_install=["pandas", "scikit-learn"]
    )
    def standard_scaler(
        input_data: dsl.Input[dsl.Dataset],
        output_data: dsl.Output[dsl.Dataset],
        scaler_artifact: dsl.Output[dsl.Artifact]
    ):
        """Standardization component."""
        import pandas as pd
        from sklearn.preprocessing import StandardScaler
        import pickle

        df = pd.read_csv(input_data.path)
        scaler = StandardScaler()

        numeric_cols = df.select_dtypes(include=['float64', 'int64']).columns
        df[numeric_cols] = scaler.fit_transform(df[numeric_cols])

        df.to_csv(output_data.path, index=False)

        with open(scaler_artifact.path, 'wb') as f:
            pickle.dump(scaler, f)

    @staticmethod
    @component(
        base_image="python:3.9",
        packages_to_install=["pandas", "scikit-learn"]
    )
    def feature_selection(
        input_data: dsl.Input[dsl.Dataset],
        output_data: dsl.Output[dsl.Dataset],
        n_features: int = 10,
        method: str = "mutual_info"
    ):
        """Feature selection component."""
        import pandas as pd
        from sklearn.feature_selection import (
            SelectKBest,
            mutual_info_classif,
            f_classif
        )

        df = pd.read_csv(input_data.path)
        X = df.drop("target", axis=1)
        y = df["target"]

        selector_map = {
            "mutual_info": mutual_info_classif,
            "f_classif": f_classif
        }

        selector = SelectKBest(
            score_func=selector_map[method],
            k=n_features
        )
        X_selected = selector.fit_transform(X, y)

        selected_features = X.columns[selector.get_support()].tolist()
        result_df = pd.DataFrame(X_selected, columns=selected_features)
        result_df["target"] = y.values

        result_df.to_csv(output_data.path, index=False)

    @staticmethod
    @component(
        base_image="python:3.9",
        packages_to_install=["pandas", "scikit-learn", "imbalanced-learn"]
    )
    def handle_imbalance(
        input_data: dsl.Input[dsl.Dataset],
        output_data: dsl.Output[dsl.Dataset],
        method: str = "smote",
        sampling_strategy: str = "auto"
    ):
        """Handle class imbalance."""
        import pandas as pd
        from imblearn.over_sampling import SMOTE, RandomOverSampler
        from imblearn.under_sampling import RandomUnderSampler

        df = pd.read_csv(input_data.path)
        X = df.drop("target", axis=1)
        y = df["target"]

        sampler_map = {
            "smote": SMOTE(sampling_strategy=sampling_strategy),
            "oversample": RandomOverSampler(sampling_strategy=sampling_strategy),
            "undersample": RandomUnderSampler(sampling_strategy=sampling_strategy)
        }

        sampler = sampler_map[method]
        X_resampled, y_resampled = sampler.fit_resample(X, y)

        result_df = pd.DataFrame(X_resampled, columns=X.columns)
        result_df["target"] = y_resampled

        result_df.to_csv(output_data.path, index=False)


# Build a pipeline from the component library
@dsl.pipeline(name="feature-engineering-pipeline")
def feature_engineering_pipeline(data_path: str):
    """Feature engineering pipeline."""

    # Load the data
    load_task = preprocess_data(input_path=data_path)

    # Standardize
    scale_task = MLComponents.standard_scaler(
        input_data=load_task.outputs["output_dataset"]
    )

    # Handle class imbalance
    balance_task = MLComponents.handle_imbalance(
        input_data=scale_task.outputs["output_data"],
        method="smote"
    )

    # Feature selection
    select_task = MLComponents.feature_selection(
        input_data=balance_task.outputs["output_data"],
        n_features=20
    )

    # Train
    train_task = train_model(
        train_data=select_task.outputs["output_data"]
    )
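
Caching works by comparing a fingerprint of a step's definition and inputs against earlier executions; if nothing changed, the stored outputs are reused. The exact fields KFP hashes are an implementation detail, so the sketch below is only a conceptual model with a hypothetical `cache_key` helper, not KFP's algorithm.

```python
import hashlib
import json


def cache_key(image: str, command: list, inputs: dict) -> str:
    """Hash a step's definition and input values into a stable fingerprint."""
    payload = json.dumps(
        {"image": image, "command": command, "inputs": inputs},
        sort_keys=True,  # make the key independent of dict ordering
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


command = ["python", "preprocess.py"]

key_a = cache_key("python:3.9", command, {"input_path": "data.csv", "test_size": 0.2})
key_b = cache_key("python:3.9", command, {"test_size": 0.2, "input_path": "data.csv"})
key_c = cache_key("python:3.9", command, {"input_path": "data.csv", "test_size": 0.3})

print(key_a == key_b)  # same definition and inputs: cache hit
print(key_a == key_c)  # changed input value: cache miss
```

One consequence of this model is that changing the contents of a file without changing its path does not change the key, which is why the component above returns a data fingerprint of its own.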

Distributed Training Integration

"""
KFP distributed training integration
"""
from kfp import dsl
from kfp.dsl import component


# PyTorch distributed training component
@component(
    base_image="pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime",
    packages_to_install=["kubernetes"]
)
def create_pytorch_distributed_job(
    train_data: dsl.Input[dsl.Dataset],
    model_output: dsl.Output[dsl.Model],
    image: str,
    num_workers: int = 2,
    gpus_per_worker: int = 1,
    epochs: int = 10
):
    """Create a PyTorchJob for distributed training."""
    import uuid
    from kubernetes import client, config

    config.load_incluster_config()
    api = client.CustomObjectsApi()

    # Names from the module-level dsl import are not available inside the
    # component body, so build a unique job name here
    job_name = f"pytorch-train-{uuid.uuid4().hex[:8]}"

    pytorchjob = {
        "apiVersion": "kubeflow.org/v1",
        "kind": "PyTorchJob",
        "metadata": {
            "name": job_name
        },
        "spec": {
            "pytorchReplicaSpecs": {
                "Master": {
                    "replicas": 1,
                    "restartPolicy": "OnFailure",
                    "template": {
                        "spec": {
                            "containers": [{
                                "name": "pytorch",
                                "image": image,
                                "command": ["python", "train.py"],
                                "args": [
                                    # Pass artifact URIs: this pod's local artifact paths are not mounted in the job's pods
                                    "--data-path", train_data.uri,
                                    "--epochs", str(epochs),
                                    "--output-path", model_output.uri
                                ],
                                "resources": {
                                    "limits": {
                                        "nvidia.com/gpu": gpus_per_worker
                                    }
                                }
                            }]
                        }
                    }
                },
                "Worker": {
                    "replicas": num_workers - 1,
                    "restartPolicy": "OnFailure",
                    "template": {
                        "spec": {
                            "containers": [{
                                "name": "pytorch",
                                "image": image,
                                "command": ["python", "train.py"],
                                "args": [
                                    # Pass the artifact URI: this pod's local artifact path is not mounted in the job's pods
                                    "--data-path", train_data.uri,
                                    "--epochs", str(epochs)
                                ],
                                "resources": {
                                    "limits": {
                                        "nvidia.com/gpu": gpus_per_worker
                                    }
                                }
                            }]
                        }
                    }
                }
            }
        }
    }

    # Create the PyTorchJob
    api.create_namespaced_custom_object(
        group="kubeflow.org",
        version="v1",
        namespace="kubeflow",
        plural="pytorchjobs",
        body=pytorchjob
    )

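    # Wait for completion
    import time
    job_name = pytorchjob["metadata"]["name"]
    while True:
        job = api.get_namespaced_custom_object(
            group="kubeflow.org",
            version="v1",
            namespace="kubeflow",
            plural="pytorchjobs",
            name=job_name
        )

        # PyTorchJob reports progress through status.conditions rather than a phase field
        conditions = job.get("status", {}).get("conditions", [])
        finished = {c["type"] for c in conditions if c.get("status") == "True"}
        if "Succeeded" in finished:
            break
        elif "Failed" in finished:
            raise RuntimeError("PyTorchJob failed")

        time.sleep(30)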


# Hyperparameter optimization with Katib
@component(
    base_image="python:3.9",
    packages_to_install=["kubernetes"]
)
def run_katib_experiment(
    experiment_name: str,
    objective_metric: str,
    objective_type: str = "maximize",
    max_trial_count: int = 10,
    parallel_trial_count: int = 3
) -> str:
    """Run a Katib hyperparameter optimization experiment."""
    from kubernetes import client, config

    config.load_incluster_config()
    api = client.CustomObjectsApi()

    experiment = {
        "apiVersion": "kubeflow.org/v1beta1",
        "kind": "Experiment",
        "metadata": {
            "name": experiment_name,
            "namespace": "kubeflow"
        },
        "spec": {
            "objective": {
                "type": objective_type,
                "goal": 0.99 if objective_type == "maximize" else 0.01,
                "objectiveMetricName": objective_metric
            },
            "algorithm": {
                "algorithmName": "bayesianoptimization"
            },
            "maxTrialCount": max_trial_count,
            "parallelTrialCount": parallel_trial_count,
            "parameters": [
                {
                    "name": "learning_rate",
                    "parameterType": "double",
                    "feasibleSpace": {
                        "min": "0.0001",
                        "max": "0.1"
                    }
                },
                {
                    "name": "batch_size",
                    "parameterType": "categorical",
                    "feasibleSpace": {
                        "list": ["16", "32", "64", "128"]
                    }
                },
                {
                    "name": "num_layers",
                    "parameterType": "int",
                    "feasibleSpace": {
                        "min": "2",
                        "max": "10"
                    }
                }
            ],
            "trialTemplate": {
                "primaryContainerName": "training-container",
                "trialParameters": [
                    {"name": "learningRate", "reference": "learning_rate"},
                    {"name": "batchSize", "reference": "batch_size"},
                    {"name": "numLayers", "reference": "num_layers"}
                ],
                "trialSpec": {
                    "apiVersion": "batch/v1",
                    "kind": "Job",
                    "spec": {
                        "template": {
                            "spec": {
                                "containers": [{
                                    "name": "training-container",
                                    "image": "pytorch/pytorch:2.0.0",
                                    "command": ["python", "train.py"],
                                    "args": [
                                        "--lr=${trialParameters.learningRate}",
                                        "--batch-size=${trialParameters.batchSize}",
                                        "--layers=${trialParameters.numLayers}"
                                    ]
                                }],
                                "restartPolicy": "Never"
                            }
                        }
                    }
                }
            }
        }
    }

    # Create the experiment
    api.create_namespaced_custom_object(
        group="kubeflow.org",
        version="v1beta1",
        namespace="kubeflow",
        plural="experiments",
        body=experiment
    )

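    # Wait for completion and return the best parameters
    import json
    import time
    while True:
        exp = api.get_namespaced_custom_object(
            group="kubeflow.org",
            version="v1beta1",
            namespace="kubeflow",
            plural="experiments",
            name=experiment_name
        )

        status = exp.get("status", {})
        # completionTime is set once the experiment has finished
        if status.get("completionTime"):
            best_trial = status.get("currentOptimalTrial", {})
            # parameterAssignments is a list of {name, value} entries;
            # serialize it as JSON so downstream steps can parse it
            return json.dumps(best_trial.get("parameterAssignments", []))

        time.sleep(60)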

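Deployment Configuration

KFP Deployment YAML

# Full KFP deployment configuration
# Note: mysql-secret, minio-secret, mysql-pvc, minio-pvc, and a Service named
# "mysql" are referenced below but not defined in this listing; create them first.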
---
# Namespace
apiVersion: v1
kind: Namespace
metadata:
  name: kubeflow

---
# KFP API Server Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-pipeline-api-server
  namespace: kubeflow
spec:
  replicas: 2
  selector:
    matchLabels:
      app: ml-pipeline-api-server
  template:
    metadata:
      labels:
        app: ml-pipeline-api-server
    spec:
      containers:
      - name: api-server
        image: gcr.io/ml-pipeline/api-server:2.0.0
        ports:
        - containerPort: 8888
        - containerPort: 8887
        env:
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: DBCONFIG_USER
          valueFrom:
            secretKeyRef:
              name: mysql-secret
              key: username
        - name: DBCONFIG_PASSWORD
          valueFrom:
            secretKeyRef:
              name: mysql-secret
              key: password
        - name: DBCONFIG_DBNAME
          value: mlpipeline
        - name: DBCONFIG_HOST
          value: mysql
        - name: DBCONFIG_PORT
          value: "3306"
        - name: OBJECTSTORECONFIG_BUCKETNAME
          value: mlpipeline
        - name: OBJECTSTORECONFIG_HOST
          value: minio-service
        - name: OBJECTSTORECONFIG_PORT
          value: "9000"
        resources:
          requests:
            cpu: 250m
            memory: 500Mi
          limits:
            cpu: 500m
            memory: 1Gi
        livenessProbe:
          httpGet:
            path: /apis/v1beta1/healthz
            port: 8888
          initialDelaySeconds: 30
          periodSeconds: 10

---
# KFP UI Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-pipeline-ui
  namespace: kubeflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: ml-pipeline-ui
  template:
    metadata:
      labels:
        app: ml-pipeline-ui
    spec:
      containers:
      - name: ui
        image: gcr.io/ml-pipeline/frontend:2.0.0
        ports:
        - containerPort: 3000
        env:
        - name: VIEWER_TENSORBOARD_POD_TEMPLATE_SPEC_PATH
          value: /etc/config/viewer-pod-template.json
        - name: MINIO_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: MINIO_HOST
          value: minio-service
        - name: MINIO_PORT
          value: "9000"
        resources:
          requests:
            cpu: 100m
            memory: 256Mi

---
# Persistence Agent
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-pipeline-persistenceagent
  namespace: kubeflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: ml-pipeline-persistenceagent
  template:
    metadata:
      labels:
        app: ml-pipeline-persistenceagent
    spec:
      containers:
      - name: persistenceagent
        image: gcr.io/ml-pipeline/persistenceagent:2.0.0
        env:
        - name: NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: TTL_SECONDS_AFTER_WORKFLOW_FINISH
          value: "86400"
        resources:
          requests:
            cpu: 100m
            memory: 256Mi

---
# Scheduled Workflow controller
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-pipeline-scheduledworkflow
  namespace: kubeflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: ml-pipeline-scheduledworkflow
  template:
    metadata:
      labels:
        app: ml-pipeline-scheduledworkflow
    spec:
      containers:
      - name: scheduledworkflow
        image: gcr.io/ml-pipeline/scheduledworkflow:2.0.0
        env:
        - name: NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        resources:
          requests:
            cpu: 50m
            memory: 128Mi

---
# MySQL Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql
  namespace: kubeflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mysql
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
      - name: mysql
        image: mysql:8.0
        env:
        - name: MYSQL_ROOT_PASSWORD
          valueFrom:
            secretKeyRef:
              name: mysql-secret
              key: password
        - name: MYSQL_DATABASE
          value: mlpipeline
        ports:
        - containerPort: 3306
        volumeMounts:
        - name: mysql-data
          mountPath: /var/lib/mysql
        resources:
          requests:
            cpu: 100m
            memory: 512Mi
      volumes:
      - name: mysql-data
        persistentVolumeClaim:
          claimName: mysql-pvc

---
# MinIO Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: minio
  namespace: kubeflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: minio
  template:
    metadata:
      labels:
        app: minio
    spec:
      containers:
      - name: minio
        image: minio/minio:RELEASE.2023-09-04T19-57-37Z
        args:
        - server
        - /data
        - --console-address
        - ":9001"
        env:
        - name: MINIO_ROOT_USER
          valueFrom:
            secretKeyRef:
              name: minio-secret
              key: accesskey
        - name: MINIO_ROOT_PASSWORD
          valueFrom:
            secretKeyRef:
              name: minio-secret
              key: secretkey
        ports:
        - containerPort: 9000
        - containerPort: 9001
        volumeMounts:
        - name: minio-data
          mountPath: /data
        resources:
          requests:
            cpu: 100m
            memory: 512Mi
      volumes:
      - name: minio-data
        persistentVolumeClaim:
          claimName: minio-pvc

---
# Service definitions
apiVersion: v1
kind: Service
metadata:
  name: ml-pipeline-api-server
  namespace: kubeflow
spec:
  selector:
    app: ml-pipeline-api-server
  ports:
  - name: http
    port: 8888
  - name: grpc
    port: 8887

---
apiVersion: v1
kind: Service
metadata:
  name: ml-pipeline-ui
  namespace: kubeflow
spec:
  selector:
    app: ml-pipeline-ui
  ports:
  - port: 80
    targetPort: 3000
  type: LoadBalancer

---
apiVersion: v1
kind: Service
metadata:
  name: minio-service
  namespace: kubeflow
spec:
  selector:
    app: minio
  ports:
  - name: api
    port: 9000
  - name: console
    port: 9001

Best Practices

Component Design Principles

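# KFP component design best practices
component_design:
  # 1. Inputs and outputs
  io_design:
    - name: "Use strong typing"
      description: "Annotate artifacts with Input/Output types"
      good: "data: Input[Dataset]"
      bad: "data: str"

    - name: "Pass small data as parameters"
      description: "Pass simple values as parameters"
      limit: "< 1MB"

    - name: "Pass large data as Artifacts"
      description: "Pass files through Artifacts"
      types: [Dataset, Model, Metrics]

  # 2. Container images
  image_design:
    - name: "Pin image versions"
      good: "pytorch/pytorch:2.0.0-cuda11.7"
      bad: "pytorch/pytorch:latest"

    - name: "Keep images minimal"
      description: "Include only the dependencies the component needs"

    - name: "Use a shared base image"
      description: "Build and maintain an organization-internal base image"

  # 3. Error handling
  error_handling:
    - name: "Set a retry policy"
      config:
        max_retries: 3
        backoff_duration: "10s"

    - name: "Log in detail"
      description: "Use structured logging"

    - name: "Return meaningful errors"
      description: "Include context in error messages"

  # 4. Resource management
  resource_management:
    - name: "Declare resource requirements explicitly"
      fields: [cpu_request, memory_request, gpu_limit]

    - name: "Set timeouts"
      description: "Prevent tasks from running indefinitely"

    - name: "Clean up temporary files"
      description: "Clean up when the task finishes"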
---
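# Pipeline design principles
pipeline_design:
  # 1. Modularity
  modularity:
    - Split large pipelines into sub-pipelines
    - Use conditional branches to skip unnecessary steps
    - Reuse common components

  # 2. Observability
  observability:
    - Set a meaningful display_name
    - Record key metrics in Metrics
    - Use Artifacts to track data lineage

  # 3. Maintainability
  maintainability:
    - Parameterize all variable configuration
    - Keep pipeline definitions under version control
    - Write component tests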
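
Several of the error-handling and resource-management items apply inside the component body itself. The sketch below uses only the Python standard library to show structured (JSON) logging, an error message that carries context, and automatic cleanup of temporary files. `JsonFormatter`, `get_logger`, and `count_rows` are hypothetical names; the function stands in for the body of a real component.

```python
import json
import logging
import sys
import tempfile
from pathlib import Path


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "message": record.getMessage(),
            "step": getattr(record, "step", None),
            "rows": getattr(record, "rows", None),
        }
        return json.dumps(payload)


def get_logger(name: str = "component") -> logging.Logger:
    """Return a logger that writes JSON lines to stdout."""
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid adding duplicate handlers on repeated calls
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger


def count_rows(csv_text: str, step: str = "preprocess") -> int:
    """Hypothetical component body: count the data rows in CSV text."""
    logger = get_logger()
    # The temporary directory is removed when the block exits, even on error
    with tempfile.TemporaryDirectory() as workdir:
        scratch = Path(workdir) / "input.csv"
        scratch.write_text(csv_text)
        lines = [ln for ln in scratch.read_text().splitlines() if ln.strip()]
        if not lines:
            # The message says which step failed and what was expected
            raise ValueError(
                f"step={step}: input CSV is empty, expected a header row"
            )
        rows = len(lines) - 1  # exclude the header line
        logger.info("rows counted", extra={"step": step, "rows": rows})
        return rows


row_count = count_rows("a,b\n1,2\n3,4\n")
```

Because the function is plain Python, it can be unit-tested without a cluster, which also covers the "write component tests" item.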

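Summary

Kubeflow Pipelines is a powerful tool for building production-grade ML pipelines:

  1. Architecture: built on Argo Workflows and Kubernetes-native
  2. Component development: components are defined with Python decorators and are type-safe
  3. Advanced features: caching, distributed training, and hyperparameter optimization
  4. Deployment and operations: scalable, observable, and easy to integrate

KFP is well suited to building large-scale ML platforms that need full MLOps capabilities.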
