HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于

深入理解调度器设计:编程领域最难的挑战之一

一、调度器到底有多难?

1.1 一个让人清醒的事实

从纯编码难度来看——是的,调度器(Scheduler)是编程里最难的一类东西之一。

而且不是"有点难",是维度级别的难度差异。

让我直接给你一个结论,这个结论能让你一句话就向别人解释清楚:

在工程实践中,最难的不是 CRUD,不是业务,不是 UI,而是——调度器。

特别是下面几类:

  • 资源调度器(GPU/CPU/MEM/SLOT 分配)
  • 任务调度器(Workflow/Job/回调/Retry/并发调度)
  • 分布式调度器(多节点任务协作)
  • 自愈型调度器(失败恢复、任务漂移、抢占)
  • 跨节点协同调度器(像 Kubernetes / Yarn / Slurm 这种)

本质:调度器要处理 不确定性 + 并发 + 状态机 + 分布式问题。

这些都是软件领域里顶级难度的问题。


二、为什么调度器这么难?

2.1 核心差异

CRUD 是"根据规则写代码",调度器是"根据混乱建立规则"。

让我们对比一下两个世界的差异:

CRUD 的世界:

  • ✅ 输入确定
  • ✅ 输出确定
  • ✅ 错误可控
  • ✅ 流程线性

调度器的世界:

  • ❌ 输入不确定
  • ❌ 资源不确定
  • ❌ 执行时间不确定
  • ❌ 节点状态不确定
  • ❌ 网络是否抖动不确定
  • ❌ 超时、失败、重试、回调都不确定
  • ❌ 任务之间依赖可能循环、缺失、失败、延期
  • ❌ 节点可能突然消失、恢复、半死不活
  • ❌ GPU 可能被占用、锁死、驱动异常

这是两个完全不一样的世界。

2.2 用一句极简的比喻让你完全记住

CRUD 就像做快餐店的柜台点单;调度器是做整个城市的交通信号系统。

快餐店点单:

  • 流水线
  • 确定性的
  • 不会乱

城市交通系统:

  • 数万节点
  • 随机事件
  • 信号灯失效
  • 交通堵塞
  • 急救车优先
  • 红绿灯协作
  • 无法预知的事件不断发生

调度器做的就是这种事。


三、调度器涉及哪些"超级难点"?

3.1 难点一:并发 + 状态机

一个任务可能经历:

Pending → Ready → Running → Success
              ↘︎ Retry → Fail

但一旦遇到网络抖动、死锁、资源不足,瞬间变成:

Pending → Scheduled → FailedScheduling → Backoff → Reschedule
Running → Lost → Recovered → Unknown → Success

状态组合指数级爆炸。

这不是简单的 if (status == "running") 能解决的。你需要:

  • 定义所有可能的状态
  • 定义状态之间允许的转换
  • 处理并发时多个调度器同时修改状态
  • 处理网络延迟导致的状态不一致
  • 处理节点上报的状态与调度器记录的状态冲突

每一个都是难题。


3.2 难点二:多节点协作问题(分布式系统最难点)

例如:

  • 节点 A 执行阶段 1
  • 节点 B 执行阶段 2
  • 节点 C 执行阶段 3
  • A 和 B 之间依赖关系
  • B 和 C 之间通信失败
  • 某个节点突然死掉

这比 "写一个接口存数据库" 难 100 倍。

你需要处理:

  • 依赖关系:A 没完成,B 不能开始
  • 通信失败:B 给 C 发消息失败了怎么办?
  • 节点宕机:A 执行到一半挂了,任务状态怎么办?
  • 数据一致性:多个节点看到的任务状态可能不一样
  • 死锁检测:A 等 B,B 等 C,C 等 A
  • 循环依赖:如何检测并报错?

3.3 难点三:资源调度(复杂度直冲天花板)

你看过 Kubernetes 的调度逻辑就知道了:

调度过程:

  1. 过滤节点(Predicate)

    • 节点是否有足够内存?
    • 节点是否有 GPU?
    • 节点是否满足亲和性要求?
    • 节点是否有磁盘空间?
    • 节点是否健康?
  2. 打分(Score)

    • 哪个节点资源最充足?
    • 哪个节点负载最低?
    • 哪个节点网络延迟最小?
    • 哪个节点符合用户偏好?
  3. 抢占(Preemption)

    • 如果没有合适的节点,要不要踢掉低优先级任务?
    • 踢掉哪个任务?
    • 如何保证被踢掉的任务能恢复?
  4. 亲和性 / 反亲和性

    • 某些任务必须在同一个节点
    • 某些任务不能在同一个节点
    • 某些任务要尽量分散
  5. 存储、网络、驱动约束

    • 节点是否有需要的存储卷?
    • 节点是否在正确的网络区域?
    • 节点是否有正确的 GPU 驱动?
  6. 优先级队列

    • 高优先级任务优先调度
    • 同优先级任务按什么顺序?
    • 长时间等待的任务要不要提升优先级?
  7. 接受 / 拒绝 / 回滚

    • 提交任务到节点后,节点拒绝了怎么办?
    • 节点接受了但启动失败怎么办?
    • 如何回滚到之前的状态?

任何一个环节都能让人抓狂。


3.4 难点四:不确定性(调度器的噩梦)

调度器面对的是混沌世界:

  • GPU 忽然满了
  • Pod 一直 Pending
  • driver 崩溃
  • 网络断了
  • 节点心跳丢了
  • 回调丢了
  • 任务死锁
  • 算法 job 跑了 10 小时突然挂掉
  • 所有任务在某一秒同时卡住

你只能靠补偿机制、重试策略、回滚来兜底。

这不是 CRUD 所能比的。

在 CRUD 里:

  • 用户提交表单 → 验证 → 写数据库 → 返回成功
  • 失败了?重试一次或者返回错误

在调度器里:

  • 任务提交 → 选节点 → 启动容器 → 运行 → 可能失败 → 重试 → 可能网络抖动 → 节点失联 → 恢复 → 继续运行 → 可能又失败 → 达到重试上限 → 标记失败 → 用户手动重试 → 重新调度...

每一步都可能出问题,每个问题都需要处理。


四、编程难度排行榜(只看编码,不看架构)

让我给你一个真实的难度排名:

🥇 第一名:调度器(Scheduler)

涉及:状态机 + 并发 + 分布式 + 资源分配 + 不确定性

绝对是最难的。

代表系统:

  • Kubernetes Scheduler
  • Airflow Scheduler
  • Ray Scheduler
  • Volcano Scheduler
  • Slurm Scheduler

为什么难:

  • 需要处理所有上面提到的难点
  • 需要保证高可用(调度器挂了整个系统就废了)
  • 需要处理海量任务(每秒可能有上千个任务状态变化)
  • 需要保证公平性(不能让某些任务饿死)
  • 需要保证实时性(任务不能等太久)

🥈 第二名:一致性协议(Raft / Paxos)

行业里最少人能真正写出来的代码。

为什么难:

  • 需要理解分布式一致性理论
  • 需要处理网络分区
  • 需要处理脑裂问题
  • 需要保证强一致性
  • Bug 很难复现和调试

但为什么比调度器简单一点:

  • 问题域相对单一(就是保证一致性)
  • 状态相对简单(主要是选举和日志复制)
  • 有明确的算法可以参考

🥉 第三名:高性能异步系统

例如:

  • WebSocket 集群
  • 消息聚合系统
  • 实时同步系统
  • 高性能代理

为什么难:

  • 需要处理高并发
  • 需要处理网络抖动
  • 需要保证消息顺序(如果需要的话)
  • 需要处理背压(backpressure)
  • 需要优化性能

🟡 第四名:复杂交易系统(订单/支付)

涉及:状态机、事务、补偿机制

为什么难:

  • 需要保证数据一致性
  • 需要处理并发(两个人同时买最后一件商品)
  • 需要处理失败回滚
  • 需要处理分布式事务

但比调度器简单:

  • 状态相对少(订单就那么几个状态)
  • 不需要处理资源调度
  • 不需要处理节点失联等问题

🟢 第五名:普通业务系统

例如:

  • 登录、权限
  • 订单管理
  • 推荐系统
  • 帖子、评论

中高级开发者都能轻松搞定。


🟢 第六名:CRUD

无需解释。


五、调度器状态机到底怎么维护?

5.1 核心:单写、多读、乐观锁

状态机一定不能靠"随便改字段",而是必须满足三个核心规则。

规则 1:状态只允许从调度器写(Single Writer)

不能让节点、回调、用户乱改状态,否则会乱套。

所以:

  • 任务状态只能由调度器更新
  • 节点只上报心跳 / metrics,不得修改任务状态
  • 用户只能触发动作(重试、取消),有效动作必须由调度器处理

这样状态机才是**"单写者模型"(Single Writer Model)**。

为什么这么重要?

假设不遵守单写者模型,会发生什么:

时刻 T1: 调度器看到任务是 Pending,决定调度
时刻 T2: 节点上报任务已经 Running(因为上次调度的延迟)
时刻 T3: 调度器把任务标记为 Running
时刻 T4: 用户点击取消,直接改成 Canceled
时刻 T5: 节点上报任务 Success
时刻 T6: 现在任务是什么状态?

完全混乱了。

遵守单写者模型后:

时刻 T1: 调度器看到任务是 Pending,决定调度
时刻 T2: 节点上报心跳(不修改状态)
时刻 T3: 调度器根据心跳,更新状态为 Running
时刻 T4: 用户点击取消(只是发出取消请求)
时刻 T5: 调度器处理取消请求,更新状态为 Canceling
时刻 T6: 节点收到取消信号,停止任务
时刻 T7: 调度器根据节点反馈,更新状态为 Canceled

清晰、可控。


规则 2:每次更新必须带 version(乐观锁 CAS)

任务表 Task/WorkflowRun 需要两个字段:

version BIGINT
status VARCHAR

每次调度器更新时:

UPDATE tasks
SET status = 'Running', version = version + 1
WHERE id = xxx AND version = oldVersion;

如果 version 不一致 → 说明状态被别人改了 → 当前调度器跳过(防止竞争)。

这一步是所有工业级调度器的核心机制:

  • Kubernetes 的 resourceVersion
  • ETCD 的 CAS
  • Airflow 的 Dag Run version
  • Ray 的 task_state_revision

否则并发调度会把状态干乱。

5.2 深入理解:为什么需要 CAS?

让我们看一个实际的并发场景:

场景:两个调度器实例同时运行

时刻 T1: 调度器 A 读取任务 123,状态是 Pending,version = 5
时刻 T2: 调度器 B 读取任务 123,状态是 Pending,version = 5
时刻 T3: 调度器 A 决定调度任务,准备更新状态为 Scheduling
时刻 T4: 调度器 B 也决定调度任务,准备更新状态为 Scheduling

如果没有 version(错误做法):

-- 调度器 A 执行
UPDATE tasks SET status = 'Scheduling' WHERE id = 123;

-- 调度器 B 执行
UPDATE tasks SET status = 'Scheduling' WHERE id = 123;

结果:两个调度器都成功了!任务可能被调度两次!

有 version(正确做法):

-- 调度器 A 执行
UPDATE tasks
SET status = 'Scheduling', version = 6
WHERE id = 123 AND version = 5;
-- 影响行数:1(成功)

-- 调度器 B 执行
UPDATE tasks
SET status = 'Scheduling', version = 6
WHERE id = 123 AND version = 5;
-- 影响行数:0(失败,因为 version 已经是 6 了)

调度器 B 发现更新失败,知道任务已经被别人处理了,放弃处理。

这就是 CAS(Compare-And-Swap)的威力。


规则 3:状态机必须采用表驱动(Table Driven FSM)

写成 if-else 是最垃圾的做法。

让我对比一下两种写法:

❌ 垃圾写法(if-else 地狱):

func updateTaskStatus(task *Task, newStatus Status) error {
    if task.Status == Pending {
        if newStatus == Scheduling {
            // 执行调度逻辑
            scheduleTask(task)
            task.Status = Scheduling
        } else {
            return errors.New("invalid transition")
        }
    } else if task.Status == Scheduling {
        if newStatus == Running {
            // 启动任务
            startTask(task)
            task.Status = Running
        } else if newStatus == Failed {
            // 调度失败
            handleScheduleFailure(task)
            task.Status = Failed
        } else {
            return errors.New("invalid transition")
        }
    } else if task.Status == Running {
        if newStatus == Success {
            // 成功
            onSuccess(task)
            task.Status = Success
        } else if newStatus == Failed {
            // 失败
            onFailed(task)
            task.Status = Failed
        } else if newStatus == Lost {
            // 失联
            onLost(task)
            task.Status = Lost
        } else {
            return errors.New("invalid transition")
        }
    } else if task.Status == Failed {
        if newStatus == Retry {
            // 重试
            retryTask(task)
            task.Status = Retry
        } else {
            return errors.New("invalid transition")
        }
    }
    // ... 还有更多状态

    return updateDB(task)
}

问题:

  • 代码超长,难以维护
  • 新增状态需要修改多处
  • 难以测试
  • 容易出错

✅ 优雅写法(表驱动):

type Transition struct {
    From   []Status
    To     Status
    Action func(*Task) error
}

var transitions = []Transition{
    {
        From:   []Status{Pending, Retry},
        To:     Scheduling,
        Action: scheduleTask,
    },
    {
        From:   []Status{Scheduling},
        To:     Running,
        Action: startTask,
    },
    {
        From:   []Status{Running},
        To:     Success,
        Action: onSuccess,
    },
    {
        From:   []Status{Running},
        To:     Failed,
        Action: onFailed,
    },
    {
        From:   []Status{Running},
        To:     Lost,
        Action: onLost,
    },
    {
        From:   []Status{Failed},
        To:     Retry,
        Action: retryTask,
    },
}

func ApplyTransition(task *Task, to Status) error {
    for _, t := range transitions {
        if t.To == to && contains(t.From, task.Status) {
            // 执行副作用
            if err := t.Action(task); err != nil {
                return err
            }
            // 更新数据库(CAS)
            return updateStatus(task.ID, task.Version, to)
        }
    }
    return fmt.Errorf("invalid transition %s → %s", task.Status, to)
}

优势:

  • ✅ 无 if-else / switch 炸弹
  • ✅ 所有状态转换可配置
  • ✅ 扩展新状态不会动到旧代码
  • ✅ 可测试性极强(table-driven test)
  • ✅ 一眼就能看出所有允许的状态转换

新增状态只需要添加一条表项:

{
    From:   []Status{Lost},
    To:     Recovering,
    Action: startRecovery,
},

就这么简单。


六、状态转换之后要干什么?(Side Effects 副作用)

6.1 核心原则

优秀调度器的关键就是:

状态机负责"决定状态"
副作用负责"实际动作"
两者解耦

为什么要解耦?

因为:

  • 状态转换是纯逻辑的(Pending → Scheduling 是逻辑决策)
  • 副作用是有实际影响的(选节点、启动容器、发通知)
  • 副作用可能失败(节点满了、容器启动失败、网络断了)
  • 副作用可能耗时(启动容器可能需要几秒)

如果不解耦:

func updateStatus(task *Task) error {
    if task.Status == Pending {
        task.Status = Scheduling
        // 直接在这里做副作用
        selectNode(task)  // 如果这里失败了怎么办?
        startContainer(task)  // 如果这里失败了怎么办?
        // 状态已经改了,但副作用失败了,数据不一致!
    }
}

解耦后:

func ApplyTransition(task *Task, to Status) error {
    // 1. 先执行副作用
    if err := executeAction(task, to); err != nil {
        return err  // 副作用失败,状态不变
    }

    // 2. 副作用成功,才更新状态
    return updateStatusCAS(task, to)
}

这样就保证了状态和副作用的一致性。


6.2 典型副作用详解

让我详细讲解每个状态转换的副作用。

Pending → Scheduling

副作用:

func scheduleTask(ctx context.Context, task *Task) error {
    // 1. 资源过滤(Filter)
    candidateNodes := filterNodes(task.Requirements)
    if len(candidateNodes) == 0 {
        return errors.New("no suitable node")
    }

    // 2. 节点打分(Score)
    scoredNodes := scoreNodes(candidateNodes, task)

    // 3. 选择最优节点
    bestNode := selectBestNode(scoredNodes)

    // 4. 写入调度结果
    task.ScheduledNode = bestNode.Name
    task.ScheduledAt = time.Now()

    return nil
}

资源过滤示例:

func filterNodes(requirements Requirements) []Node {
    var result []Node

    for _, node := range allNodes {
        // 检查内存
        if node.AvailableMemory < requirements.Memory {
            continue
        }

        // 检查 CPU
        if node.AvailableCPU < requirements.CPU {
            continue
        }

        // 检查 GPU
        if requirements.GPU > 0 && node.AvailableGPU < requirements.GPU {
            continue
        }

        // 检查节点健康状态
        if !node.IsReady {
            continue
        }

        // 检查标签匹配
        if !matchLabels(node.Labels, requirements.NodeSelector) {
            continue
        }

        result = append(result, node)
    }

    return result
}

节点打分示例:

func scoreNodes(nodes []Node, task *Task) []ScoredNode {
    var result []ScoredNode

    for _, node := range nodes {
        score := 0

        // 资源充足度(0-100 分)
        memScore := (node.AvailableMemory / node.TotalMemory) * 100
        cpuScore := (node.AvailableCPU / node.TotalCPU) * 100
        score += int(memScore + cpuScore) / 2

        // 负载均衡(0-100 分)
        loadScore := (1 - node.CurrentLoad) * 100
        score += int(loadScore)

        // 亲和性(0-100 分)
        if hasAffinity(node, task) {
            score += 100
        }

        result = append(result, ScoredNode{
            Node:  node,
            Score: score,
        })
    }

    // 按分数排序
    sort.Slice(result, func(i, j int) bool {
        return result[i].Score > result[j].Score
    })

    return result
}

Scheduling → Running

副作用:

func startTask(ctx context.Context, task *Task) error {
    // 1. 构建容器配置
    containerConfig := buildContainerConfig(task)

    // 2. 调用底层系统启动容器(Kubernetes/Volcano/Ray)
    // 注意:这里必须幂等!
    jobID := fmt.Sprintf("task-%d", task.ID)
    err := clusterClient.CreateJob(ctx, jobID, containerConfig)
    if err != nil {
        // 如果是"已存在"错误,认为成功
        if isAlreadyExistsError(err) {
            // 验证任务确实在运行
            job, err := clusterClient.GetJob(ctx, jobID)
            if err != nil {
                return err
            }
            if job.Status != "Running" {
                return fmt.Errorf("job exists but not running: %s", job.Status)
            }
        } else {
            return err
        }
    }

    // 3. 记录启动时间
    task.StartedAt = time.Now()

    // 4. 初始化心跳
    task.HeartbeatVersion = 1
    task.LastHeartbeatAt = time.Now()

    // 5. 注册监控
    registerMetrics(task)

    return nil
}

为什么要幂等?

因为可能发生这种情况:

时刻 T1: 调度器 A 调用 CreateJob,成功
时刻 T2: 调度器 A 准备更新数据库
时刻 T3: 调度器 A 网络断了,更新失败
时刻 T4: 调度器 B 接手任务
时刻 T5: 调度器 B 调用 CreateJob,发现已存在
时刻 T6: 调度器 B 验证任务确实在运行,继续

如果不幂等,调度器 B 会报错,任务卡住。


Running → Success

副作用:

func onSuccess(ctx context.Context, task *Task) error {
    // 1. 记录完成时间
    task.FinishedAt = time.Now()

    // 2. 清理资源
    err := cleanupResources(ctx, task)
    if err != nil {
        log.Error("cleanup failed", "taskID", task.ID, "err", err)
        // 注意:清理失败不影响任务成功
    }

    // 3. 发送通知
    notifyUser(task, "Task completed successfully")

    // 4. 更新 Workflow 状态
    updateWorkflowProgress(task.WorkflowID)

    // 5. 记录 metrics
    recordMetrics(task, "success")

    // 6. 触发依赖任务
    triggerDependentTasks(task)

    return nil
}

Running → Failed

副作用:

func onFailed(ctx context.Context, task *Task) error {
    // 1. 记录失败时间
    task.FinishedAt = time.Now()

    // 2. 收集错误信息
    errorInfo := collectErrorInfo(task)
    task.ErrorReason = errorInfo

    // 3. 判断是否需要重试
    if task.RetryCount < task.MaxRetries {
        // 转到 RetryWait 状态
        task.RetryCount++
        task.Status = StatusRetryWait

        // 计算下次重试时间(指数退避)
        backoffDuration := calculateBackoff(task.RetryCount)
        task.NextRetryAt = time.Now().Add(backoffDuration)

        log.Info("task will retry",
            "taskID", task.ID,
            "retryCount", task.RetryCount,
            "nextRetryAt", task.NextRetryAt)
    } else {
        // 已达重试上限,真正失败
        log.Error("task failed permanently",
            "taskID", task.ID,
            "error", errorInfo)

        // 发送告警
        sendAlert(task, "Task failed after max retries")
    }

    // 4. 清理资源
    cleanupResources(ctx, task)

    // 5. 记录 metrics
    recordMetrics(task, "failed")

    return nil
}

func calculateBackoff(retryCount int) time.Duration {
    // 指数退避:1s, 2s, 4s, 8s, 16s, ...
    seconds := math.Pow(2, float64(retryCount))
    // 最大 5 分钟
    if seconds > 300 {
        seconds = 300
    }
    return time.Duration(seconds) * time.Second
}

Running → Lost

副作用:

func onLost(ctx context.Context, task *Task) error {
    // 1. 记录失联时间
    task.LostAt = time.Now()

    // 2. 尝试查询底层系统状态
    job, err := clusterClient.GetJob(ctx, fmt.Sprintf("task-%d", task.ID))
    if err == nil && job != nil {
        // 任务其实还在运行,只是心跳丢了
        if job.Status == "Running" {
            log.Warn("task lost but still running", "taskID", task.ID)
            // 恢复心跳
            task.LastHeartbeatAt = time.Now()
            task.Status = StatusRunning
            return nil
        }

        // 任务已经结束
        if job.Status == "Success" {
            task.Status = StatusSuccess
            return onSuccess(ctx, task)
        }

        if job.Status == "Failed" {
            task.Status = StatusFailed
            return onFailed(ctx, task)
        }
    }

    // 3. 真的失联了,决定是否重试
    if task.RetryCount < task.MaxRetries {
        task.RetryCount++
        task.Status = StatusRetryWait
        task.NextRetryAt = time.Now().Add(calculateBackoff(task.RetryCount))
    } else {
        task.Status = StatusFailed
        task.ErrorReason = "Task lost and exceeded max retries"
    }

    // 4. 清理资源
    cleanupResources(ctx, task)

    // 5. 发送告警
    sendAlert(task, "Task lost")

    return nil
}

6.3 副作用的关键原则

为了保持系统干净:

副作用必须外置,不得写在状态机内部。

副作用必须幂等。

副作用失败要有明确的错误处理。


七、调度的并发竞争怎么解决?(五大机制)

你现在系统里肯定已经遇到了:

  • 多个调度循环抢同一个任务
  • 同一个任务被两个节点执行
  • Lost/Running 状态互相覆盖
  • 回调与调度循环竞争

解决方案必须用五件武器。

1. 乐观锁 version(最重要)

前面讲了,这是调度器最关键的机制。

让我再深入讲解一下实现细节:

type TaskRepo interface {
    UpdateStatusCAS(ctx context.Context,
                    taskID int64,
                    oldVersion int64,
                    newStatus Status,
                    newVersion int64) (bool, error)
}

func (r *PostgresTaskRepo) UpdateStatusCAS(
    ctx context.Context,
    taskID int64,
    oldVersion int64,
    newStatus Status,
    newVersion int64,
) (bool, error) {
    result, err := r.db.ExecContext(ctx, `
        UPDATE task_runs
        SET status = $1,
            version = $2,
            updated_at = NOW()
        WHERE id = $3
          AND version = $4
    `, newStatus, newVersion, taskID, oldVersion)

    if err != nil {
        return false, err
    }

    rowsAffected, err := result.RowsAffected()
    if err != nil {
        return false, err
    }

    // 如果影响行数为 0,说明 version 不匹配
    return rowsAffected > 0, nil
}

使用示例:

func (s *Scheduler) handleTask(ctx context.Context, task *Task) error {
    oldVersion := task.Version
    newVersion := oldVersion + 1

    // 执行副作用
    if err := scheduleTask(ctx, task); err != nil {
        return err
    }

    // CAS 更新
    success, err := s.repo.UpdateStatusCAS(
        ctx,
        task.ID,
        oldVersion,
        StatusScheduling,
        newVersion,
    )

    if err != nil {
        return err
    }

    if !success {
        // CAS 失败,说明任务已被别的调度器处理
        log.Info("task already processed by another scheduler",
                 "taskID", task.ID)
        return nil
    }

    // 更新本地任务对象
    task.Status = StatusScheduling
    task.Version = newVersion

    return nil
}

2. 任务加锁(分布式锁或者 DB 行锁)

在多个调度器实例时,你要锁:

  • Task ID
  • Node ID

但是注意:

调度器不应该使用长锁(如 Redis lock),而是短锁(DB 行锁 / CAS)。

为什么?

Redis 锁会导致:

  • 死锁:调度器 A 拿到锁后挂了,锁永远不释放
  • 过期问题:锁设置了过期时间,但任务还没处理完,锁就过期了
  • 时钟漂移:不同机器的时钟不一致,导致锁提前过期

正确做法:使用 DB 行锁

func (r *PostgresTaskRepo) PickRunnableTasks(
    ctx context.Context,
    limit int,
) ([]*Task, error) {
    rows, err := r.db.QueryContext(ctx, `
        SELECT id, status, version, ...
        FROM task_runs
        WHERE status IN ('Pending', 'RetryWait')
          AND (next_retry_at IS NULL OR next_retry_at <= NOW())
          AND is_paused = FALSE
        ORDER BY priority DESC, id
        FOR UPDATE SKIP LOCKED
        LIMIT $1
    `, limit)

    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var tasks []*Task
    for rows.Next() {
        var task Task
        if err := rows.Scan(&task.ID, &task.Status, ...); err != nil {
            return nil, err
        }
        tasks = append(tasks, &task)
    }

    return tasks, nil
}

FOR UPDATE SKIP LOCKED 的作用:

  • FOR UPDATE:锁定这些行,其他事务无法修改
  • SKIP LOCKED:跳过已被锁定的行

效果:

调度器 A 查询:拿到任务 1, 2, 3(锁定它们)
调度器 B 查询:拿到任务 4, 5, 6(跳过 1, 2, 3)
调度器 C 查询:拿到任务 7, 8, 9(跳过 1-6)

完美的并发控制,不会重复处理!


3. 幂等副作用(例如启动容器必须幂等)

换句话说:

  • 启动容器 → 多次调用,只会启动一个实例
  • 创建任务目录 → 不会重复创建
  • 心跳注册 → 幂等
  • 模型加载 → 判断已加载则跳过

示例:幂等的容器启动

func (c *K8sClient) CreateJob(ctx context.Context, jobName string, config JobConfig) error {
    // 1. 先检查是否已存在
    existingJob, err := c.clientset.BatchV1().Jobs(namespace).Get(
        ctx,
        jobName,
        metav1.GetOptions{},
    )

    if err == nil {
        // 任务已存在
        if existingJob.Status.Active > 0 {
            // 正在运行,直接返回成功
            return nil
        }

        if existingJob.Status.Succeeded > 0 {
            // 已经成功,直接返回成功
            return nil
        }

        // 失败了,删除重建
        if existingJob.Status.Failed > 0 {
            err = c.clientset.BatchV1().Jobs(namespace).Delete(
                ctx,
                jobName,
                metav1.DeleteOptions{},
            )
            if err != nil {
                return err
            }
        }
    } else if !errors.IsNotFound(err) {
        // 其他错误
        return err
    }

    // 2. 创建任务
    job := buildK8sJob(jobName, config)
    _, err = c.clientset.BatchV1().Jobs(namespace).Create(
        ctx,
        job,
        metav1.CreateOptions{},
    )

    if err != nil && errors.IsAlreadyExists(err) {
        // 并发创建,已存在,认为成功
        return nil
    }

    return err
}

幂等性保证了即使重复调用也不会出问题。


4. 事件订阅(watch 模式)避免重复扫描

WorkflowRun/Task 改变状态时推事件:

  • 通知 scheduler
  • scheduler 只处理变化的任务

减少竞争。

实现方式一:数据库触发器 + 消息队列

CREATE OR REPLACE FUNCTION notify_task_change()
RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify(
        'task_change',
        json_build_object(
            'id', NEW.id,
            'status', NEW.status,
            'version', NEW.version
        )::text
    );
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER task_change_trigger
AFTER UPDATE ON task_runs
FOR EACH ROW
WHEN (OLD.status IS DISTINCT FROM NEW.status)
EXECUTE FUNCTION notify_task_change();

调度器监听事件:

func (s *Scheduler) watchTaskChanges(ctx context.Context) {
    listener := pq.NewListener(
        s.dbURL,
        10*time.Second,
        time.Minute,
        func(ev pq.ListenerEventType, err error) {
            if err != nil {
                log.Error("listener error", "err", err)
            }
        },
    )

    err := listener.Listen("task_change")
    if err != nil {
        log.Fatal("listen failed", "err", err)
    }

    for {
        select {
        case notification := <-listener.Notify:
            var event TaskChangeEvent
            if err := json.Unmarshal([]byte(notification.Extra), &event); err != nil {
                log.Error("unmarshal failed", "err", err)
                continue
            }

            // 处理事件
            s.handleTaskChange(ctx, event)

        case <-ctx.Done():
            return
        }
    }
}

优势:

  • 不需要轮询数据库
  • 实时响应状态变化
  • 减少数据库压力

5. 反向补偿(Compensation)

在状态机中加入:

  • Recovering 状态
  • Backoff 状态
  • Retry 状态
  • Lost 状态

所有异常都可以靠补偿机制来恢复。

示例:Lost 状态的补偿

func (s *Scheduler) runRecoveryLoop(ctx context.Context) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            s.recoverLostTasks(ctx)
        case <-ctx.Done():
            return
        }
    }
}

func (s *Scheduler) recoverLostTasks(ctx context.Context) {
    // 查找所有 Lost 状态的任务
    lostTasks, err := s.repo.FindTasksByStatus(ctx, StatusLost)
    if err != nil {
        log.Error("find lost tasks failed", "err", err)
        return
    }

    for _, task := range lostTasks {
        // 查询底层系统状态
        job, err := s.clusterClient.GetJob(
            ctx,
            fmt.Sprintf("task-%d", task.ID),
        )

        if err != nil {
            log.Error("get job failed", "taskID", task.ID, "err", err)
            continue
        }

        // 根据实际状态恢复
        if job.Status == "Running" {
            // 任务还在运行,恢复心跳
            err = s.applyTransition(ctx, task, StatusRunning)
        } else if job.Status == "Success" {
            // 任务已成功
            err = s.applyTransition(ctx, task, StatusSuccess)
        } else if job.Status == "Failed" {
            // 任务已失败
            err = s.applyTransition(ctx, task, StatusFailed)
        } else {
            // 任务真的丢了,重试
            if task.RetryCount < task.MaxRetries {
                err = s.applyTransition(ctx, task, StatusRetryWait)
            } else {
                err = s.applyTransition(ctx, task, StatusFailed)
            }
        }

        if err != nil {
            log.Error("recover task failed", "taskID", task.ID, "err", err)
        }
    }
}

补偿机制保证了系统的自愈能力。


八、如何处理异常降级 & 暂停(Pause)?

8.1 异常暂停任务(Pause)

在任务结构加字段:

type Task struct {
    // ...
    IsPaused     bool      `db:"is_paused"`
    PauseReason  string    `db:"pause_reason"`
    PausedAt     time.Time `db:"paused_at"`
    PausedBy     string    `db:"paused_by"`
}

FSM 强制阻断:

任何状态 → Pause (允许)
Pause → 其他状态(不允许,除非 Resume)

实现:

func (s *Scheduler) PauseTask(
    ctx context.Context,
    taskID int64,
    reason string,
    operator string,
) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 更新暂停标志
    _, err = tx.ExecContext(ctx, `
        UPDATE task_runs
        SET is_paused = TRUE,
            pause_reason = $1,
            paused_at = NOW(),
            paused_by = $2,
            updated_at = NOW()
        WHERE id = $3
    `, reason, operator, taskID)

    if err != nil {
        return err
    }

    // 如果任务正在运行,尝试停止
    task, err := s.repo.GetTask(ctx, taskID)
    if err != nil {
        return err
    }

    if task.Status == StatusRunning {
        // 调用底层系统停止任务
        err = s.clusterClient.StopJob(
            ctx,
            fmt.Sprintf("task-%d", taskID),
        )
        if err != nil {
            log.Error("stop job failed", "taskID", taskID, "err", err)
            // 继续,因为暂停标志已设置,下次调度会跳过
        }
    }

    return tx.Commit()
}

Resume 动作:

func (s *Scheduler) ResumeTask(
    ctx context.Context,
    taskID int64,
    operator string,
) error {
    _, err := s.db.ExecContext(ctx, `
        UPDATE task_runs
        SET is_paused = FALSE,
            pause_reason = NULL,
            paused_at = NULL,
            paused_by = NULL,
            updated_at = NOW()
        WHERE id = $1
    `, taskID)

    return err
}

调度器在选任务时跳过暂停的任务:

SELECT *
FROM task_runs
WHERE status IN ('Pending', 'RetryWait')
  AND is_paused = FALSE  -- 关键!
  ...

8.2 整个调度器降级(Degrade)

适用场景:

  • GPU 紧张
  • Node 大面积 Lost
  • etcd 压力太大
  • 数据库压力太大
  • 调度器本身过载

调度器进入降级模式:

  1. 暂停所有新任务(Pending 不调度)
  2. Running/Lost 继续走
  3. 增加心跳超时阈值(避免误判)
  4. 减少调度频率
  5. 只处理高优先级任务

实现:

type Scheduler struct {
    // ...
    isDegraded   bool
    degradeMode  DegradeMode
    degradeUntil time.Time
}

type DegradeMode int

const (
    DegradeModeNormal DegradeMode = iota
    DegradeModePartial  // 部分降级:只调度高优先级任务
    DegraduModeFull     // 完全降级:停止所有调度
)

func (s *Scheduler) enterDegradeMode(
    mode DegradeMode,
    duration time.Duration,
    reason string,
) {
    s.mu.Lock()
    defer s.mu.Unlock()

    s.isDegraded = true
    s.degradeMode = mode
    s.degradeUntil = time.Now().Add(duration)

    log.Warn("entering degrade mode",
        "mode", mode,
        "duration", duration,
        "reason", reason)

    // 发送告警
    s.alertManager.Send(Alert{
        Level:   "warning",
        Title:   "Scheduler Degraded",
        Message: fmt.Sprintf("Reason: %s, Mode: %v, Duration: %v",
                            reason, mode, duration),
    })
}

func (s *Scheduler) checkAutoDegrade(ctx context.Context) {
    // 检查 GPU 资源
    gpuUsage, err := s.getGPUUsage(ctx)
    if err == nil && gpuUsage > 0.95 {
        s.enterDegradeMode(
            DegradeModePartial,
            10*time.Minute,
            "GPU usage > 95%",
        )
        return
    }

    // 检查 Node 健康度
    healthyNodes, totalNodes := s.getNodeHealth(ctx)
    if float64(healthyNodes)/float64(totalNodes) < 0.5 {
        s.enterDegradeMode(
            DegraduModeFull,
            30*time.Minute,
            "Less than 50% nodes healthy",
        )
        return
    }

    // 检查调度器负载
    if s.metrics.PendingTasks > 10000 {
        s.enterDegradeMode(
            DegradeModePartial,
            5*time.Minute,
            "Too many pending tasks",
        )
        return
    }
}

func (s *Scheduler) RunOnce(ctx context.Context) error {
    // 检查是否需要降级
    s.checkAutoDegrade(ctx)

    // 检查是否还在降级期
    if s.isDegraded && time.Now().After(s.degradeUntil) {
        s.exitDegradeMode()
    }

    // 如果在降级模式
    if s.isDegraded {
        if s.degradeMode == DegraduModeFull {
            // 完全降级:不调度任何任务
            log.Info("in full degrade mode, skipping scheduling")
            return nil
        }

        if s.degradeMode == DegradeModePartial {
            // 部分降级:只调度高优先级任务
            tasks, err := s.repo.PickHighPriorityTasks(ctx, 10)
            if err != nil {
                return err
            }
            return s.processTasks(ctx, tasks)
        }
    }

    // 正常调度
    tasks, err := s.repo.PickRunnableTasks(ctx, 100)
    if err != nil {
        return err
    }
    return s.processTasks(ctx, tasks)
}

非常实用,可以避免雪崩。


九、心跳怎么设计才是工业级?

9.1 当前方案的问题

常见做法:

节点 → 每隔 n 秒 → http/rpc 上报

这个方案能用,但不够稳定。

问题:

  • 节点重启后,旧心跳可能覆盖新心跳
  • 网络重试可能导致心跳乱序
  • 无法区分"真失联"和"网络抖动"
  • 无法处理"节点半死不活"的情况

9.2 工业级做法(Ray/K8s/Volcano 都类似)

核心思想:调度器下发心跳 token,节点定期上报 → 调度器校验版本

数据结构:

task_runs 表新增字段:
  heartbeat_version BIGINT NOT NULL DEFAULT 0,
  last_heartbeat_at TIMESTAMPTZ,
  heartbeat_timeout INTERVAL NOT NULL DEFAULT INTERVAL '60 seconds'

节点运行任务后:

每 10 秒上报:

POST /api/v1/heartbeat
{
  "taskID": 123,
  "version": 9,
  "metrics": {
    "cpuUsage": 0.8,
    "memoryUsage": 0.6,
    "progress": 0.5
  },
  "logs": {
    "lastLine": "Processing batch 100/200",
    "lastUpdated": "2024-01-15T10:30:00Z"
  }
}

调度器检查:

func (s *Server) HandleHeartbeat(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()

    var req HeartbeatRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "invalid request", 400)
        return
    }

    // 查询任务
    task, err := s.repo.GetTask(ctx, req.TaskID)
    if err != nil {
        http.Error(w, "task not found", 404)
        return
    }

    // 关键:检查版本号
    if req.Version < task.HeartbeatVersion {
        // 旧心跳,丢弃
        log.Warn("outdated heartbeat",
            "taskID", req.TaskID,
            "receivedVersion", req.Version,
            "currentVersion", task.HeartbeatVersion)

        // 返回 200,但不更新(避免节点重试)
        w.WriteHeader(200)
        return
    }

    // 更新心跳
    err = s.repo.UpdateHeartbeat(ctx, req.TaskID, req.Version, time.Now())
    if err != nil {
        http.Error(w, "update failed", 500)
        return
    }

    // 可选:更新 metrics
    if req.Metrics != nil {
        s.metricsStore.Update(req.TaskID, req.Metrics)
    }

    w.WriteHeader(200)
}

这样可以避免:

  • ✅ 重启覆盖心跳
  • ✅ 老节点误报
  • ✅ 网络重试带来乱序

9.3 Lost 判定不只看超时

最重要:要同时看:

  • 心跳超时(>30s)
  • 节点状态(Ready/NotReady)
  • 容器平台回调(failed/success)
  • 任务运行日志最后更新时间
  • 是否收到节点"退出"事件

多条件组合,减少误判。

func (s *Scheduler) CheckLostTasks(ctx context.Context) error {
    now := time.Now()

    // 查找所有 Running 任务
    tasks, err := s.repo.FindTasksByStatus(ctx, StatusRunning)
    if err != nil {
        return err
    }

    for _, task := range tasks {
        // 条件 1:心跳超时
        heartbeatTimeout := task.LastHeartbeatAt.Add(task.HeartbeatTimeout)
        if !now.After(heartbeatTimeout) {
            // 心跳正常
            continue
        }

        // 心跳超时了,进一步检查

        // 条件 2:查询节点状态
        node, err := s.clusterClient.GetNode(ctx, task.ScheduledNode)
        if err != nil {
            log.Error("get node failed", "node", task.ScheduledNode, "err", err)
        } else {
            if node.IsReady {
                // 节点是健康的,可能只是任务心跳丢了
                // 给一次宽限期
                if now.Sub(task.LastHeartbeatAt) < 2*task.HeartbeatTimeout {
                    continue
                }
            }
        }

        // 条件 3:查询底层任务状态
        job, err := s.clusterClient.GetJob(
            ctx,
            fmt.Sprintf("task-%d", task.ID),
        )
        if err != nil {
            log.Error("get job failed", "taskID", task.ID, "err", err)
        } else {
            if job.Status == "Running" {
                // 任务还在运行,只是心跳丢了
                // 恢复心跳
                log.Warn("task heartbeat lost but job still running",
                    "taskID", task.ID)
                task.LastHeartbeatAt = now
                s.repo.UpdateHeartbeat(ctx, task.ID, task.HeartbeatVersion, now)
                continue
            }

            if job.Status == "Success" {
                // 任务已成功,心跳没上报
                log.Info("task success but heartbeat lost",
                    "taskID", task.ID)
                s.applyTransition(ctx, task, StatusSuccess)
                continue
            }

            if job.Status == "Failed" {
                // 任务已失败
                log.Info("task failed but heartbeat lost",
                    "taskID", task.ID)
                s.applyTransition(ctx, task, StatusFailed)
                continue
            }
        }

        // 条件 4:检查日志更新时间
        lastLogTime, err := s.logStore.GetLastUpdateTime(task.ID)
        if err == nil {
            if now.Sub(lastLogTime) < task.HeartbeatTimeout {
                // 日志还在更新,任务可能还在跑
                log.Info("task heartbeat lost but logs still updating",
                    "taskID", task.ID)
                continue
            }
        }

        // 所有条件都确认:任务真的失联了
        log.Warn("task confirmed lost",
            "taskID", task.ID,
            "lastHeartbeat", task.LastHeartbeatAt,
            "timeout", task.HeartbeatTimeout)

        err = s.applyTransition(ctx, task, StatusLost)
        if err != nil {
            log.Error("mark task lost failed", "taskID", task.ID, "err", err)
        }
    }

    return nil
}

Ray、K8s、Volcano 都是这么做的。


十、完整的工业级实现方案

现在,让我给你一套完整的、可落地的工业级调度器蓝图。

10.1 对象约定

统一叫法:

  • WorkflowRun:一条工作流实例(比如一次大任务)
  • TaskRun:工作流里的具体节点任务(调一次推理 / 训练 / 数据处理)
  • 调度器只调 TaskRun,WorkflowRun 由上层按依赖控制即可。

下面主要以 TaskRun 为例讲状态机和代码。


10.2 状态机设计(ASCII 图 + 状态说明)

Task 状态枚举

Pending      :待调度(新任务 / 等待首跑)
Scheduling   :正在选节点 / 提交到底层系统
Running      :底层已接收,任务在跑
Success      :执行成功
Failed       :执行失败(且不再重试)
RetryWait    :失败后等待重试时间窗
Lost         :失联(心跳超时 / 节点炸了)
Paused       :人工或系统暂停
Canceled     :人工取消

状态流转图(完整版)

        +----------------+
        |                |
        |   RetryWait <--+<--------------------+
        |                |                     |
        +-------+--------+                     |
                |                              |
                v                              |
Pending --> Scheduling --> Running --> Success |
   ^            |   ^         |                |
   |            |   |         v                |
   |            |   |       Failed ----------- +
   |            |   |         |
   |            |   |         v
   |            |   |       Lost
   |            |   |
   |            |   +----------------------+
   |            |                          |
   |            v                          |
   +--------- Paused <-------- Canceled ---+

说明:

  • Pending → Scheduling:调度器选中要执行的任务
  • Scheduling → Running:成功提交到底层(K8s/Volcano/Ray)
  • Running → Success/Failed/Lost:回调 or 心跳判定
  • Failed → RetryWait:还有重试次数
  • RetryWait → Pending:到达 next_retry_at 重新排队
  • 任何状态 → Paused/Canceled:人工操作 or 系统降级

10.3 数据库表结构设计(完整版)

workflow_runs 表

CREATE TABLE workflow_runs (
    id              BIGSERIAL PRIMARY KEY,
    project_id      BIGINT NOT NULL,
    name            VARCHAR(256) NOT NULL,
    status          VARCHAR(32) NOT NULL,
    version         BIGINT NOT NULL DEFAULT 0,

    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    started_at      TIMESTAMPTZ,
    finished_at     TIMESTAMPTZ,

    config          JSONB,       -- 工作流配置
    error_reason    TEXT,

    -- 统计字段
    total_tasks     INT NOT NULL DEFAULT 0,
    pending_tasks   INT NOT NULL DEFAULT 0,
    running_tasks   INT NOT NULL DEFAULT 0,
    succeeded_tasks INT NOT NULL DEFAULT 0,
    failed_tasks    INT NOT NULL DEFAULT 0
);

CREATE INDEX idx_workflow_status ON workflow_runs(status);
CREATE INDEX idx_workflow_project ON workflow_runs(project_id);

task_runs 表(核心)

CREATE TABLE task_runs (
    id               BIGSERIAL PRIMARY KEY,
    workflow_run_id  BIGINT NOT NULL REFERENCES workflow_runs(id),
    name             VARCHAR(128) NOT NULL,
    task_type        VARCHAR(64) NOT NULL,  -- train/infer/register/eval

    -- 状态相关
    status           VARCHAR(32) NOT NULL,
    version          BIGINT NOT NULL DEFAULT 0,

    -- 调度相关
    priority         INT NOT NULL DEFAULT 0,
    scheduled_node   VARCHAR(128),
    queue_name       VARCHAR(64),

    -- 重试相关
    retry_count      INT NOT NULL DEFAULT 0,
    max_retries      INT NOT NULL DEFAULT 3,
    next_retry_at    TIMESTAMPTZ,

    -- 暂停/取消相关
    is_paused        BOOLEAN NOT NULL DEFAULT FALSE,
    pause_reason     TEXT,
    paused_at        TIMESTAMPTZ,
    paused_by        VARCHAR(128),
    is_manual_cancel BOOLEAN NOT NULL DEFAULT FALSE,

    -- 心跳相关
    heartbeat_version BIGINT NOT NULL DEFAULT 0,
    last_heartbeat_at TIMESTAMPTZ,
    heartbeat_timeout INTERVAL NOT NULL DEFAULT INTERVAL '60 seconds',

    -- 任务数据
    payload          JSONB,       -- 任务参数
    result           JSONB,       -- 结构化执行结果
    error_reason     TEXT,
    error_stack      TEXT,

    -- 资源需求
    cpu_request      DECIMAL(10,2),
    memory_request   BIGINT,       -- bytes
    gpu_request      INT,

    -- 时间相关
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    started_at       TIMESTAMPTZ,
    finished_at      TIMESTAMPTZ,
    scheduled_at     TIMESTAMPTZ,

    -- 依赖关系
    depends_on       BIGINT[],    -- 依赖的任务 ID 列表

    -- 元数据
    labels           JSONB,       -- 标签
    annotations      JSONB        -- 注解
);

CREATE INDEX idx_task_status_queue ON task_runs(status, queue_name);
CREATE INDEX idx_task_workflow ON task_runs(workflow_run_id);
CREATE INDEX idx_task_next_retry ON task_runs(status, next_retry_at)
    WHERE next_retry_at IS NOT NULL;
CREATE INDEX idx_task_running ON task_runs(status, last_heartbeat_at)
    WHERE status = 'Running';

10.4 表驱动状态机实现(完整 Go 代码)

状态和 Transition 结构

package fsm

import (
    "context"
    "fmt"
)

type Status string

const (
    StatusPending    Status = "Pending"
    StatusScheduling Status = "Scheduling"
    StatusRunning    Status = "Running"
    StatusSuccess    Status = "Success"
    StatusFailed     Status = "Failed"
    StatusRetryWait  Status = "RetryWait"
    StatusLost       Status = "Lost"
    StatusPaused     Status = "Paused"
    StatusCanceled   Status = "Canceled"
)

type Task struct {
    ID         int64
    Status     Status
    Version    int64
    RetryCount int
    MaxRetries int
    // ... 其他字段
}

type Transition struct {
    From   []Status
    To     Status
    Action func(ctx context.Context, task *Task) error
}

type ActionFunc func(ctx context.Context, task *Task) error

状态转换表

package fsm

var transitions = []Transition{
    {
        From:   []Status{StatusPending, StatusRetryWait},
        To:     StatusScheduling,
        Action: scheduleTask, // 选节点、写 scheduled_node、下发到底层
    },
    {
        From:   []Status{StatusScheduling},
        To:     StatusRunning,
        Action: startTask, // 已成功提交,记录 start_time / 注册心跳
    },
    {
        From:   []Status{StatusRunning},
        To:     StatusSuccess,
        Action: onSuccess, // 清理资源、发通知、更新 metrics
    },
    {
        From:   []Status{StatusRunning},
        To:     StatusFailed,
        Action: onFailed,
    },
    {
        From:   []Status{StatusRunning},
        To:     StatusLost,
        Action: onLost,
    },
    {
        From:   []Status{StatusFailed, StatusLost},
        To:     StatusRetryWait,
        Action: onRetryWait,
    },
    {
        From:   []Status{StatusRunning, StatusPending, StatusRetryWait},
        To:     StatusPaused,
        Action: onPaused, // 记录原因,不再被调度
    },
    {
        From:   []Status{StatusPending, StatusRetryWait, StatusPaused},
        To:     StatusCanceled,
        Action: onCanceled,
    },
}

应用状态转换(带 CAS 乐观锁)

package fsm

func ApplyTransition(
    ctx context.Context,
    repo TaskRepo,
    task *Task,
    to Status,
) error {
    // 1. 查找匹配的转换
    var found *Transition
    for i := range transitions {
        t := &transitions[i]
        if t.To != to {
            continue
        }
        for _, from := range t.From {
            if from == task.Status {
                found = t
                break
            }
        }
        if found != nil {
            break
        }
    }

    if found == nil {
        return fmt.Errorf("invalid transition %s -> %s", task.Status, to)
    }

    // 2. 先执行副作用(一般在一个 DB 事务里更安全)
    if err := found.Action(ctx, task); err != nil {
        return fmt.Errorf("action failed: %w", err)
    }

    // 3. CAS 更新状态 + version(单写者)
    oldVersion := task.Version
    task.Status = to
    task.Version++

    ok, err := repo.UpdateStatusCAS(
        ctx,
        task.ID,
        oldVersion,
        to,
        task.Version,
    )
    if err != nil {
        return fmt.Errorf("update status failed: %w", err)
    }

    if !ok {
        return fmt.Errorf("cas conflict when updating task %d", task.ID)
    }

    return nil
}

func contains(statuses []Status, status Status) bool {
    for _, s := range statuses {
        if s == status {
            return true
        }
    }
    return false
}

10.5 调度循环核心逻辑(完整实现)

拉取可调度任务(FOR UPDATE SKIP LOCKED)

package repo

import (
    "context"
    "database/sql"
)

type TaskRepo interface {
    PickRunnableTasks(ctx context.Context, limit int) ([]*Task, error)
    UpdateStatusCAS(ctx context.Context, taskID int64, oldVersion int64,
                    newStatus Status, newVersion int64) (bool, error)
    GetTask(ctx context.Context, taskID int64) (*Task, error)
    // ... 其他方法
}

type PostgresTaskRepo struct {
    db *sql.DB
}

func (r *PostgresTaskRepo) PickRunnableTasks(
    ctx context.Context,
    limit int,
) ([]*Task, error) {
    query := `
        SELECT
            id, workflow_run_id, name, status, version,
            priority, retry_count, max_retries,
            heartbeat_version, last_heartbeat_at, heartbeat_timeout,
            payload, created_at, updated_at
        FROM task_runs
        WHERE status IN ('Pending', 'RetryWait')
          AND (next_retry_at IS NULL OR next_retry_at <= NOW())
          AND is_paused = FALSE
        ORDER BY priority DESC, id
        FOR UPDATE SKIP LOCKED
        LIMIT $1
    `

    rows, err := r.db.QueryContext(ctx, query, limit)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var tasks []*Task
    for rows.Next() {
        task := &Task{}
        err := rows.Scan(
            &task.ID, &task.WorkflowRunID, &task.Name,
            &task.Status, &task.Version,
            &task.Priority, &task.RetryCount, &task.MaxRetries,
            &task.HeartbeatVersion, &task.LastHeartbeatAt,
            &task.HeartbeatTimeout,
            &task.Payload, &task.CreatedAt, &task.UpdatedAt,
        )
        if err != nil {
            return nil, err
        }
        tasks = append(tasks, task)
    }

    return tasks, rows.Err()
}

func (r *PostgresTaskRepo) UpdateStatusCAS(
    ctx context.Context,
    taskID int64,
    oldVersion int64,
    newStatus Status,
    newVersion int64,
) (bool, error) {
    result, err := r.db.ExecContext(ctx, `
        UPDATE task_runs
        SET status = $1,
            version = $2,
            updated_at = NOW()
        WHERE id = $3
          AND version = $4
    `, newStatus, newVersion, taskID, oldVersion)

    if err != nil {
        return false, err
    }

    rowsAffected, err := result.RowsAffected()
    if err != nil {
        return false, err
    }

    return rowsAffected > 0, nil
}

Go 调度循环

package scheduler

import (
    "context"
    "sync"
    "time"
)

type Scheduler struct {
    repo         TaskRepo
    clusterClient ClusterClient
    workerPool   *WorkerPool
    logger       Logger

    // 降级相关
    mu           sync.RWMutex
    isDegraded   bool
    degradeMode  DegradeMode
    degradeUntil time.Time
}

func (s *Scheduler) Run(ctx context.Context) error {
    // 启动多个后台循环
    g, ctx := errgroup.WithContext(ctx)

    // 主调度循环
    g.Go(func() error {
        return s.runMainLoop(ctx)
    })

    // 心跳检查循环
    g.Go(func() error {
        return s.runHeartbeatCheckLoop(ctx)
    })

    // 恢复循环
    g.Go(func() error {
        return s.runRecoveryLoop(ctx)
    })

    // 降级检查循环
    g.Go(func() error {
        return s.runDegradeCheckLoop(ctx)
    })

    return g.Wait()
}

func (s *Scheduler) runMainLoop(ctx context.Context) error {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            if err := s.RunOnce(ctx); err != nil {
                s.logger.Error("schedule once failed", "err", err)
            }
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

func (s *Scheduler) RunOnce(ctx context.Context) error {
    // 检查降级
    if s.shouldSkipScheduling() {
        return nil
    }

    // 拉取任务
    limit := s.getTaskLimit()
    tasks, err := s.repo.PickRunnableTasks(ctx, limit)
    if err != nil {
        return err
    }

    if len(tasks) == 0 {
        return nil
    }

    s.logger.Info("picked tasks", "count", len(tasks))

    // 并发处理任务
    for _, t := range tasks {
        task := t // 避免闭包问题
        s.workerPool.Submit(func() {
            if err := s.handleTask(ctx, task); err != nil {
                s.logger.Error("handleTask failed",
                    "taskID", task.ID, "err", err)
            }
        })
    }

    return nil
}

func (s *Scheduler) handleTask(ctx context.Context, task *Task) error {
    s.logger.Info("handling task",
        "taskID", task.ID,
        "status", task.Status)

    // Pending / RetryWait -> Scheduling
    if err := ApplyTransition(ctx, s.repo, task, StatusScheduling); err != nil {
        return fmt.Errorf("transition to Scheduling failed: %w", err)
    }

    s.logger.Info("task scheduled", "taskID", task.ID)

    // Scheduling -> Running
    if err := ApplyTransition(ctx, s.repo, task, StatusRunning); err != nil {
        return fmt.Errorf("transition to Running failed: %w", err)
    }

    s.logger.Info("task started", "taskID", task.ID)

    return nil
}

func (s *Scheduler) shouldSkipScheduling() bool {
    s.mu.RLock()
    defer s.mu.RUnlock()

    if !s.isDegraded {
        return false
    }

    // 检查是否过期
    if time.Now().After(s.degradeUntil) {
        return false
    }

    return s.degradeMode == DegraduModeFull
}

func (s *Scheduler) getTaskLimit() int {
    s.mu.RLock()
    defer s.mu.RUnlock()

    if s.isDegraded && s.degradeMode == DegradeModePartial {
        return 10  // 降级时只拉取 10 个
    }

    return 100  // 正常时拉取 100 个
}

十一、完整项目结构建议

scheduler/
├── main.go                  # 主入口
├── config/
│   └── config.go           # 配置
├── fsm/                    # 状态机
│   ├── fsm.go             # 状态机核心
│   ├── transitions.go     # 转换表
│   └── actions.go         # 副作用函数
├── repo/                   # 数据访问层
│   ├── task_repo.go
│   ├── workflow_repo.go
│   └── postgres.go
├── scheduler/              # 调度器
│   ├── scheduler.go       # 主调度器
│   ├── heartbeat.go       # 心跳检查
│   ├── recovery.go        # 恢复逻辑
│   └── degrade.go         # 降级逻辑
├── runtime/                # 运行时适配
│   ├── cluster_client.go  # 集群客户端接口
│   ├── kubernetes.go      # K8s 实现
│   ├── volcano.go         # Volcano 实现
│   └── ray.go             # Ray 实现
├── api/                    # API 服务
│   ├── server.go
│   ├── heartbeat.go       # 心跳接口
│   └── task.go            # 任务管理接口
├── worker/                 # 工作池
│   └── pool.go
├── metrics/                # 监控指标
│   └── metrics.go
└── pkg/                    # 公共包
    ├── logger/
    ├── alert/
    └── errors/

十二、总结:调度器最关键的七个要点

1. 为什么要"调度器写状态、其他都不能写"?

因为:

  • 只有调度器知道"现在要干什么"
  • 如果节点、用户、回调乱改状态 → 状态机会乱套
  • 所以必须单点写状态(single writer)

这可以把 99% 状态错乱问题直接解决掉。


2. 为什么要 CAS(version 乐观锁)?

因为调度器可能多实例跑:

  • 两个 scheduler 同时抢到任务 → 都想运行
  • 如果没有 version → A 改 Running,B 改 Running,冲突无法检测
  • 有 version → 谁先更新谁赢,后来的会失败

这是所有工业级调度系统的共同点(K8s、Ray、Airflow、TiDB 分布式调度等)。


3. 状态机用"表驱动"而不是 if-else 为什么更强?

表驱动状态机:

  • 更清晰
  • 更容易扩展
  • 更容易测试
  • 不会出现乱七八糟的 if/switch 炸弹

未来你要加状态(Backoff / Recovering / Pausing / Queued / Degraded / NodeLost),你只需要加一条表项,而不是修改一堆代码。


4. 状态机本身只负责"转状态",不负责"动作"

要把"状态的变更"和"动作的执行"分开:

  • 状态机决定:从 A → B
  • 副作用执行:落地这次动作(启动容器、重试、清理等等)

这样系统才不会乱。


5. 并发是调度器最难的问题(不是状态机本身)

并发竞争来自:

  • 多个调度循环
  • 节点回调
  • 心跳
  • 重试机制
  • 用户触发取消/暂停动作

解决方式:

  • CAS version(防止重复更新状态)
  • 行级锁(短锁)(在临界区保护一次执行,避免双启动)
  • 副作用必须幂等(多次调用不能出事故,如重复启动容器)
  • 事件订阅模式(避免循环扫整个任务表)
  • 补偿机制(自动恢复)(Lost、Failed、Retry、Recovering)

6. 心跳要用"版本号 + 多信号判断"才能可靠

不是收到心跳就说明健康,也不是 30 秒没心跳就 Lost。

要结合:

  • 最后心跳时间
  • 心跳 version
  • 节点是否 NotReady
  • K8s/Ray task 回调
  • 日志更新时间
  • 节点退出事件

才能得出真实的"Lost"判断。

否则会误杀任务。


7. Pause/Resume/Degrade 是高级调度器必须具备的能力

比如:

  • GPU 不够 → 整体进入 Degrade
  • 某个 Workflow 需要暂停 → Pause
  • 恢复 → Resume

没有这些,你以后会很痛苦。


十三、极简版

调度器最关键的是:

状态机 + version CAS + 幂等副作用 + 可靠心跳 + 补偿机制

其他都是细节。


结语:为什么调度器值得深入学习?

调度器是编程领域的"珠穆朗玛峰"。

它不是最常见的工作,但一旦你需要写调度器,你会发现:

  • CRUD 的经验完全不够用
  • 普通业务系统的思维方式完全不适用
  • 你需要重新理解"状态"、"并发"、"分布式"、"不确定性"

但是,一旦你掌握了调度器的设计原则:

  • 你会对分布式系统有全新的理解
  • 你会对状态机有深刻的认识
  • 你会对并发控制有实战经验
  • 你会成为团队里少数几个能解决"最难问题"的人

这不仅是技术能力的提升,更是思维方式的升级。

调度器很难,但征服它之后,你会发现自己已经站在了另一个高度。