【编程难度第一名】深入理解调度器设计:编程领域最难的挑战之一
本系列文章
➤ NO.1 调度器
➤ [NO.2 一致性协议](Paxos / Raft)
➤ [NO.3 高性能异步系统](消息队列、回调、重试)
➤ [NO.4 交易系统](钱的事不能错)
➤ [NO.5 普通业务系统](绝大多数人做的)
一、调度器到底有多难?
1.1 一个让人清醒的事实
从纯编码难度来看——是的,调度器(Scheduler)是编程里最难的一类东西之一。
而且不是"有点难",是维度级别的难度差异。
让我直接给你一个结论,这个结论能让你一句话就向别人解释清楚:
在工程实践中,最难的不是 CRUD,不是业务,不是 UI,而是——调度器。
特别是下面几类:
- 资源调度器(GPU/CPU/MEM/SLOT 分配)
- 任务调度器(Workflow/Job/回调/Retry/并发调度)
- 分布式调度器(多节点任务协作)
- 自愈型调度器(失败恢复、任务漂移、抢占)
- 跨节点协同调度器(像 Kubernetes / Yarn / Slurm 这种)
本质:调度器要处理 不确定性 + 并发 + 状态机 + 分布式问题。
这些都是软件领域里顶级难度的问题。
二、为什么调度器这么难?
2.1 核心差异
CRUD 是"根据规则写代码",调度器是"根据混乱建立规则"。
让我们对比一下两个世界的差异:
CRUD 的世界:
- 输入确定
- 输出确定
- 错误可控
- 流程线性
调度器的世界:
- 输入不确定
- 资源不确定
- 执行时间不确定
- 节点状态不确定
- 网络是否抖动不确定
- 超时、失败、重试、回调都不确定
- 任务之间依赖可能循环、缺失、失败、延期
- 节点可能突然消失、恢复、半死不活
- GPU 可能被占用、锁死、驱动异常
这是两个完全不一样的世界。
2.2 用一句极简的比喻让你完全记住
CRUD 就像做快餐店的柜台点单;调度器是做整个城市的交通信号系统。
快餐店点单:
- 流水线
- 确定性的
- 不会乱
城市交通系统:
- 数万节点
- 随机事件
- 信号灯失效
- 交通堵塞
- 急救车优先
- 红绿灯协作
- 无法预知的事件不断发生
调度器做的就是这种事。
三、调度器涉及哪些"超级难点"?
3.1 难点一:并发 + 状态机
一个任务可能经历:
Pending → Ready → Running → Success
↘︎ Retry → Fail
但一旦遇到网络抖动、死锁、资源不足,瞬间变成:
Pending → Scheduled → FailedScheduling → Backoff → Reschedule
Running → Lost → Recovered → Unknown → Success
状态组合指数级爆炸。
这不是简单的 if (status == "running") 能解决的。你需要:
- 定义所有可能的状态
- 定义状态之间允许的转换
- 处理并发时多个调度器同时修改状态
- 处理网络延迟导致的状态不一致
- 处理节点上报的状态与调度器记录的状态冲突
每一个都是难题。
3.2 难点二:多节点协作问题(分布式系统最难点)
例如:
- 节点 A 执行阶段 1
- 节点 B 执行阶段 2
- 节点 C 执行阶段 3
- A 和 B 之间依赖关系
- B 和 C 之间通信失败
- 某个节点突然死掉
这比 "写一个接口存数据库" 难 100 倍。
你需要处理:
- 依赖关系:A 没完成,B 不能开始
- 通信失败:B 给 C 发消息失败了怎么办?
- 节点宕机:A 执行到一半挂了,任务状态怎么办?
- 数据一致性:多个节点看到的任务状态可能不一样
- 死锁检测:A 等 B,B 等 C,C 等 A
- 循环依赖:如何检测并报错?
3.3 难点三:资源调度(复杂度直冲天花板)
你看过 Kubernetes 的调度逻辑就知道了:
调度过程:
过滤节点(Predicate)
- 节点是否有足够内存?
- 节点是否有 GPU?
- 节点是否满足亲和性要求?
- 节点是否有磁盘空间?
- 节点是否健康?
打分(Score)
- 哪个节点资源最充足?
- 哪个节点负载最低?
- 哪个节点网络延迟最小?
- 哪个节点符合用户偏好?
抢占(Preemption)
- 如果没有合适的节点,要不要踢掉低优先级任务?
- 踢掉哪个任务?
- 如何保证被踢掉的任务能恢复?
亲和性 / 反亲和性
- 某些任务必须在同一个节点
- 某些任务不能在同一个节点
- 某些任务要尽量分散
存储、网络、驱动约束
- 节点是否有需要的存储卷?
- 节点是否在正确的网络区域?
- 节点是否有正确的 GPU 驱动?
优先级队列
- 高优先级任务优先调度
- 同优先级任务按什么顺序?
- 长时间等待的任务要不要提升优先级?
接受 / 拒绝 / 回滚
- 提交任务到节点后,节点拒绝了怎么办?
- 节点接受了但启动失败怎么办?
- 如何回滚到之前的状态?
任何一个环节都能让人抓狂。
3.4 难点四:不确定性(调度器的噩梦)
调度器面对的是混沌世界:
- GPU 忽然满了
- Pod 一直 Pending
- driver 崩溃
- 网络断了
- 节点心跳丢了
- 回调丢了
- 任务死锁
- 算法 job 跑了 10 小时突然挂掉
- 所有任务在某一秒同时卡住
你只能靠补偿机制、重试策略、回滚来兜底。
这不是 CRUD 所能比的。
在 CRUD 里:
- 用户提交表单 → 验证 → 写数据库 → 返回成功
- 失败了?重试一次或者返回错误
在调度器里:
- 任务提交 → 选节点 → 启动容器 → 运行 → 可能失败 → 重试 → 可能网络抖动 → 节点失联 → 恢复 → 继续运行 → 可能又失败 → 达到重试上限 → 标记失败 → 用户手动重试 → 重新调度...
每一步都可能出问题,每个问题都需要处理。
四、编程难度排行榜(只看编码,不看架构)
让我给你一个真实的难度排名:
第一名:调度器(Scheduler)
涉及:状态机 + 并发 + 分布式 + 资源分配 + 不确定性
绝对是最难的。
代表系统:
- Kubernetes Scheduler
- Airflow Scheduler
- Ray Scheduler
- Volcano Scheduler
- Slurm Scheduler
为什么难:
- 需要处理所有上面提到的难点
- 需要保证高可用(调度器挂了整个系统就废了)
- 需要处理海量任务(每秒可能有上千个任务状态变化)
- 需要保证公平性(不能让某些任务饿死)
- 需要保证实时性(任务不能等太久)
第二名:一致性协议(Raft / Paxos)
行业里最少人能真正写出来的代码。
为什么难:
- 需要理解分布式一致性理论
- 需要处理网络分区
- 需要处理脑裂问题
- 需要保证强一致性
- Bug 很难复现和调试
但为什么比调度器简单一点:
- 问题域相对单一(就是保证一致性)
- 状态相对简单(主要是选举和日志复制)
- 有明确的算法可以参考
第三名:高性能异步系统
例如:
- WebSocket 集群
- 消息聚合系统
- 实时同步系统
- 高性能代理
为什么难:
- 需要处理高并发
- 需要处理网络抖动
- 需要保证消息顺序(如果需要的话)
- 需要处理背压(backpressure)
- 需要优化性能
第四名:复杂交易系统(订单/支付)
涉及:状态机、事务、补偿机制
为什么难:
- 需要保证数据一致性
- 需要处理并发(两个人同时买最后一件商品)
- 需要处理失败回滚
- 需要处理分布式事务
但比调度器简单:
- 状态相对少(订单就那么几个状态)
- 不需要处理资源调度
- 不需要处理节点失联等问题
第五名:普通业务系统
例如:
- 登录、权限
- 订单管理
- 推荐系统
- 帖子、评论
中高级开发者都能轻松搞定。
第六名:CRUD
无需解释。
五、调度器状态机到底怎么维护?
5.1 核心:单写、多读、乐观锁
状态机一定不能靠"随便改字段",而是必须满足三个核心规则。
规则 1:状态只允许从调度器写(Single Writer)
不能让节点、回调、用户乱改状态,否则会乱套。
所以:
- 任务状态只能由调度器更新
- 节点只上报心跳 / metrics,不得修改任务状态
- 用户只能触发动作(重试、取消),有效动作必须由调度器处理
这样状态机才是**"单写者模型"(Single Writer Model)**。
为什么这么重要?
假设不遵守单写者模型,会发生什么:
时刻 T1: 调度器看到任务是 Pending,决定调度
时刻 T2: 节点上报任务已经 Running(因为上次调度的延迟)
时刻 T3: 调度器把任务标记为 Running
时刻 T4: 用户点击取消,直接改成 Canceled
时刻 T5: 节点上报任务 Success
时刻 T6: 现在任务是什么状态?
完全混乱了。
遵守单写者模型后:
时刻 T1: 调度器看到任务是 Pending,决定调度
时刻 T2: 节点上报心跳(不修改状态)
时刻 T3: 调度器根据心跳,更新状态为 Running
时刻 T4: 用户点击取消(只是发出取消请求)
时刻 T5: 调度器处理取消请求,更新状态为 Canceling
时刻 T6: 节点收到取消信号,停止任务
时刻 T7: 调度器根据节点反馈,更新状态为 Canceled
清晰、可控。
规则 2:每次更新必须带 version(乐观锁 CAS)
任务表 Task/WorkflowRun 需要两个字段:
version BIGINT
status VARCHAR
每次调度器更新时:
UPDATE tasks
SET status = 'Running', version = version + 1
WHERE id = xxx AND version = oldVersion;
如果 version 不一致 → 说明状态被别人改了 → 当前调度器跳过(防止竞争)。
这一步是所有工业级调度器的核心机制:
- Kubernetes 的
resourceVersion - ETCD 的 CAS
- Airflow 的 Dag Run version
- Ray 的 task_state_revision
否则并发调度会把状态干乱。
5.2 深入理解:为什么需要 CAS?
让我们看一个实际的并发场景:
场景:两个调度器实例同时运行
时刻 T1: 调度器 A 读取任务 123,状态是 Pending,version = 5
时刻 T2: 调度器 B 读取任务 123,状态是 Pending,version = 5
时刻 T3: 调度器 A 决定调度任务,准备更新状态为 Scheduling
时刻 T4: 调度器 B 也决定调度任务,准备更新状态为 Scheduling
如果没有 version(错误做法):
-- 调度器 A 执行
UPDATE tasks SET status = 'Scheduling' WHERE id = 123;
-- 调度器 B 执行
UPDATE tasks SET status = 'Scheduling' WHERE id = 123;
结果:两个调度器都成功了!任务可能被调度两次!
有 version(正确做法):
-- 调度器 A 执行
UPDATE tasks
SET status = 'Scheduling', version = 6
WHERE id = 123 AND version = 5;
-- 影响行数:1(成功)
-- 调度器 B 执行
UPDATE tasks
SET status = 'Scheduling', version = 6
WHERE id = 123 AND version = 5;
-- 影响行数:0(失败,因为 version 已经是 6 了)
调度器 B 发现更新失败,知道任务已经被别人处理了,放弃处理。
这就是 CAS(Compare-And-Swap)的威力。
规则 3:状态机必须采用表驱动(Table Driven FSM)
写成 if-else 是最垃圾的做法。
让我对比一下两种写法:
** 垃圾写法(if-else 地狱):**
func updateTaskStatus(task *Task, newStatus Status) error {
if task.Status == Pending {
if newStatus == Scheduling {
// 执行调度逻辑
scheduleTask(task)
task.Status = Scheduling
} else {
return errors.New("invalid transition")
}
} else if task.Status == Scheduling {
if newStatus == Running {
// 启动任务
startTask(task)
task.Status = Running
} else if newStatus == Failed {
// 调度失败
handleScheduleFailure(task)
task.Status = Failed
} else {
return errors.New("invalid transition")
}
} else if task.Status == Running {
if newStatus == Success {
// 成功
onSuccess(task)
task.Status = Success
} else if newStatus == Failed {
// 失败
onFailed(task)
task.Status = Failed
} else if newStatus == Lost {
// 失联
onLost(task)
task.Status = Lost
} else {
return errors.New("invalid transition")
}
} else if task.Status == Failed {
if newStatus == Retry {
// 重试
retryTask(task)
task.Status = Retry
} else {
return errors.New("invalid transition")
}
}
// ... 还有更多状态
return updateDB(task)
}
问题:
- 代码超长,难以维护
- 新增状态需要修改多处
- 难以测试
- 容易出错
** 优雅写法(表驱动):**
type Transition struct {
From []Status
To Status
Action func(*Task) error
}
var transitions = []Transition{
{
From: []Status{Pending, Retry},
To: Scheduling,
Action: scheduleTask,
},
{
From: []Status{Scheduling},
To: Running,
Action: startTask,
},
{
From: []Status{Running},
To: Success,
Action: onSuccess,
},
{
From: []Status{Running},
To: Failed,
Action: onFailed,
},
{
From: []Status{Running},
To: Lost,
Action: onLost,
},
{
From: []Status{Failed},
To: Retry,
Action: retryTask,
},
}
func ApplyTransition(task *Task, to Status) error {
for _, t := range transitions {
if t.To == to && contains(t.From, task.Status) {
// 执行副作用
if err := t.Action(task); err != nil {
return err
}
// 更新数据库(CAS)
return updateStatus(task.ID, task.Version, to)
}
}
return fmt.Errorf("invalid transition %s → %s", task.Status, to)
}
优势:
- 无 if-else / switch 炸弹
- 所有状态转换可配置
- 扩展新状态不会动到旧代码
- 可测试性极强(table-driven test)
- 一眼就能看出所有允许的状态转换
新增状态只需要添加一条表项:
{
From: []Status{Lost},
To: Recovering,
Action: startRecovery,
},
就这么简单。
六、状态转换之后要干什么?(Side Effects 副作用)
6.1 核心原则
优秀调度器的关键就是:
状态机负责"决定状态"
副作用负责"实际动作"
两者解耦
为什么要解耦?
因为:
- 状态转换是纯逻辑的(Pending → Scheduling 是逻辑决策)
- 副作用是有实际影响的(选节点、启动容器、发通知)
- 副作用可能失败(节点满了、容器启动失败、网络断了)
- 副作用可能耗时(启动容器可能需要几秒)
如果不解耦:
func updateStatus(task *Task) error {
if task.Status == Pending {
task.Status = Scheduling
// 直接在这里做副作用
selectNode(task) // 如果这里失败了怎么办?
startContainer(task) // 如果这里失败了怎么办?
// 状态已经改了,但副作用失败了,数据不一致!
}
}
解耦后:
func ApplyTransition(task *Task, to Status) error {
// 1. 先执行副作用
if err := executeAction(task, to); err != nil {
return err // 副作用失败,状态不变
}
// 2. 副作用成功,才更新状态
return updateStatusCAS(task, to)
}
这样就保证了状态和副作用的一致性。
6.2 典型副作用详解
让我详细讲解每个状态转换的副作用。
Pending → Scheduling
副作用:
func scheduleTask(ctx context.Context, task *Task) error {
// 1. 资源过滤(Filter)
candidateNodes := filterNodes(task.Requirements)
if len(candidateNodes) == 0 {
return errors.New("no suitable node")
}
// 2. 节点打分(Score)
scoredNodes := scoreNodes(candidateNodes, task)
// 3. 选择最优节点
bestNode := selectBestNode(scoredNodes)
// 4. 写入调度结果
task.ScheduledNode = bestNode.Name
task.ScheduledAt = time.Now()
return nil
}
资源过滤示例:
func filterNodes(requirements Requirements) []Node {
var result []Node
for _, node := range allNodes {
// 检查内存
if node.AvailableMemory < requirements.Memory {
continue
}
// 检查 CPU
if node.AvailableCPU < requirements.CPU {
continue
}
// 检查 GPU
if requirements.GPU > 0 && node.AvailableGPU < requirements.GPU {
continue
}
// 检查节点健康状态
if !node.IsReady {
continue
}
// 检查标签匹配
if !matchLabels(node.Labels, requirements.NodeSelector) {
continue
}
result = append(result, node)
}
return result
}
节点打分示例:
func scoreNodes(nodes []Node, task *Task) []ScoredNode {
var result []ScoredNode
for _, node := range nodes {
score := 0
// 资源充足度(0-100 分)
memScore := (node.AvailableMemory / node.TotalMemory) * 100
cpuScore := (node.AvailableCPU / node.TotalCPU) * 100
score += int(memScore + cpuScore) / 2
// 负载均衡(0-100 分)
loadScore := (1 - node.CurrentLoad) * 100
score += int(loadScore)
// 亲和性(0-100 分)
if hasAffinity(node, task) {
score += 100
}
result = append(result, ScoredNode{
Node: node,
Score: score,
})
}
// 按分数排序
sort.Slice(result, func(i, j int) bool {
return result[i].Score > result[j].Score
})
return result
}
Scheduling → Running
副作用:
func startTask(ctx context.Context, task *Task) error {
// 1. 构建容器配置
containerConfig := buildContainerConfig(task)
// 2. 调用底层系统启动容器(Kubernetes/Volcano/Ray)
// 注意:这里必须幂等!
jobID := fmt.Sprintf("task-%d", task.ID)
err := clusterClient.CreateJob(ctx, jobID, containerConfig)
if err != nil {
// 如果是"已存在"错误,认为成功
if isAlreadyExistsError(err) {
// 验证任务确实在运行
job, err := clusterClient.GetJob(ctx, jobID)
if err != nil {
return err
}
if job.Status != "Running" {
return fmt.Errorf("job exists but not running: %s", job.Status)
}
} else {
return err
}
}
// 3. 记录启动时间
task.StartedAt = time.Now()
// 4. 初始化心跳
task.HeartbeatVersion = 1
task.LastHeartbeatAt = time.Now()
// 5. 注册监控
registerMetrics(task)
return nil
}
为什么要幂等?
因为可能发生这种情况:
时刻 T1: 调度器 A 调用 CreateJob,成功
时刻 T2: 调度器 A 准备更新数据库
时刻 T3: 调度器 A 网络断了,更新失败
时刻 T4: 调度器 B 接手任务
时刻 T5: 调度器 B 调用 CreateJob,发现已存在
时刻 T6: 调度器 B 验证任务确实在运行,继续
如果不幂等,调度器 B 会报错,任务卡住。
Running → Success
副作用:
func onSuccess(ctx context.Context, task *Task) error {
// 1. 记录完成时间
task.FinishedAt = time.Now()
// 2. 清理资源
err := cleanupResources(ctx, task)
if err != nil {
log.Error("cleanup failed", "taskID", task.ID, "err", err)
// 注意:清理失败不影响任务成功
}
// 3. 发送通知
notifyUser(task, "Task completed successfully")
// 4. 更新 Workflow 状态
updateWorkflowProgress(task.WorkflowID)
// 5. 记录 metrics
recordMetrics(task, "success")
// 6. 触发依赖任务
triggerDependentTasks(task)
return nil
}
Running → Failed
副作用:
func onFailed(ctx context.Context, task *Task) error {
// 1. 记录失败时间
task.FinishedAt = time.Now()
// 2. 收集错误信息
errorInfo := collectErrorInfo(task)
task.ErrorReason = errorInfo
// 3. 判断是否需要重试
if task.RetryCount < task.MaxRetries {
// 转到 RetryWait 状态
task.RetryCount++
task.Status = StatusRetryWait
// 计算下次重试时间(指数退避)
backoffDuration := calculateBackoff(task.RetryCount)
task.NextRetryAt = time.Now().Add(backoffDuration)
log.Info("task will retry",
"taskID", task.ID,
"retryCount", task.RetryCount,
"nextRetryAt", task.NextRetryAt)
} else {
// 已达重试上限,真正失败
log.Error("task failed permanently",
"taskID", task.ID,
"error", errorInfo)
// 发送告警
sendAlert(task, "Task failed after max retries")
}
// 4. 清理资源
cleanupResources(ctx, task)
// 5. 记录 metrics
recordMetrics(task, "failed")
return nil
}
func calculateBackoff(retryCount int) time.Duration {
// 指数退避:1s, 2s, 4s, 8s, 16s, ...
seconds := math.Pow(2, float64(retryCount))
// 最大 5 分钟
if seconds > 300 {
seconds = 300
}
return time.Duration(seconds) * time.Second
}
Running → Lost
副作用:
func onLost(ctx context.Context, task *Task) error {
// 1. 记录失联时间
task.LostAt = time.Now()
// 2. 尝试查询底层系统状态
job, err := clusterClient.GetJob(ctx, fmt.Sprintf("task-%d", task.ID))
if err == nil && job != nil {
// 任务其实还在运行,只是心跳丢了
if job.Status == "Running" {
log.Warn("task lost but still running", "taskID", task.ID)
// 恢复心跳
task.LastHeartbeatAt = time.Now()
task.Status = StatusRunning
return nil
}
// 任务已经结束
if job.Status == "Success" {
task.Status = StatusSuccess
return onSuccess(ctx, task)
}
if job.Status == "Failed" {
task.Status = StatusFailed
return onFailed(ctx, task)
}
}
// 3. 真的失联了,决定是否重试
if task.RetryCount < task.MaxRetries {
task.RetryCount++
task.Status = StatusRetryWait
task.NextRetryAt = time.Now().Add(calculateBackoff(task.RetryCount))
} else {
task.Status = StatusFailed
task.ErrorReason = "Task lost and exceeded max retries"
}
// 4. 清理资源
cleanupResources(ctx, task)
// 5. 发送告警
sendAlert(task, "Task lost")
return nil
}
6.3 副作用的关键原则
为了保持系统干净:
副作用必须外置,不得写在状态机内部。
副作用必须幂等。
副作用失败要有明确的错误处理。
七、调度的并发竞争怎么解决?(五大机制)
你现在系统里肯定已经遇到了:
- 多个调度循环抢同一个任务
- 同一个任务被两个节点执行
- Lost/Running 状态互相覆盖
- 回调与调度循环竞争
解决方案必须用五件武器。
1. 乐观锁 version(最重要)
前面讲了,这是调度器最关键的机制。
让我再深入讲解一下实现细节:
type TaskRepo interface {
UpdateStatusCAS(ctx context.Context,
taskID int64,
oldVersion int64,
newStatus Status,
newVersion int64) (bool, error)
}
func (r *PostgresTaskRepo) UpdateStatusCAS(
ctx context.Context,
taskID int64,
oldVersion int64,
newStatus Status,
newVersion int64,
) (bool, error) {
result, err := r.db.ExecContext(ctx, `
UPDATE task_runs
SET status = $1,
version = $2,
updated_at = NOW()
WHERE id = $3
AND version = $4
`, newStatus, newVersion, taskID, oldVersion)
if err != nil {
return false, err
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return false, err
}
// 如果影响行数为 0,说明 version 不匹配
return rowsAffected > 0, nil
}
使用示例:
func (s *Scheduler) handleTask(ctx context.Context, task *Task) error {
oldVersion := task.Version
newVersion := oldVersion + 1
// 执行副作用
if err := scheduleTask(ctx, task); err != nil {
return err
}
// CAS 更新
success, err := s.repo.UpdateStatusCAS(
ctx,
task.ID,
oldVersion,
StatusScheduling,
newVersion,
)
if err != nil {
return err
}
if !success {
// CAS 失败,说明任务已被别的调度器处理
log.Info("task already processed by another scheduler",
"taskID", task.ID)
return nil
}
// 更新本地任务对象
task.Status = StatusScheduling
task.Version = newVersion
return nil
}
2. 任务加锁(分布式锁或者 DB 行锁)
在多个调度器实例时,你要锁:
- Task ID
- Node ID
但是注意:
调度器不应该使用长锁(如 Redis lock),而是短锁(DB 行锁 / CAS)。
为什么?
Redis 锁会导致:
- 死锁:调度器 A 拿到锁后挂了,锁永远不释放
- 过期问题:锁设置了过期时间,但任务还没处理完,锁就过期了
- 时钟漂移:不同机器的时钟不一致,导致锁提前过期
正确做法:使用 DB 行锁
func (r *PostgresTaskRepo) PickRunnableTasks(
ctx context.Context,
limit int,
) ([]*Task, error) {
rows, err := r.db.QueryContext(ctx, `
SELECT id, status, version, ...
FROM task_runs
WHERE status IN ('Pending', 'RetryWait')
AND (next_retry_at IS NULL OR next_retry_at <= NOW())
AND is_paused = FALSE
ORDER BY priority DESC, id
FOR UPDATE SKIP LOCKED
LIMIT $1
`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var tasks []*Task
for rows.Next() {
var task Task
if err := rows.Scan(&task.ID, &task.Status, ...); err != nil {
return nil, err
}
tasks = append(tasks, &task)
}
return tasks, nil
}
FOR UPDATE SKIP LOCKED 的作用:
FOR UPDATE:锁定这些行,其他事务无法修改SKIP LOCKED:跳过已被锁定的行
效果:
调度器 A 查询:拿到任务 1, 2, 3(锁定它们)
调度器 B 查询:拿到任务 4, 5, 6(跳过 1, 2, 3)
调度器 C 查询:拿到任务 7, 8, 9(跳过 1-6)
完美的并发控制,不会重复处理!
3. 幂等副作用(例如启动容器必须幂等)
换句话说:
- 启动容器 → 多次调用,只会启动一个实例
- 创建任务目录 → 不会重复创建
- 心跳注册 → 幂等
- 模型加载 → 判断已加载则跳过
示例:幂等的容器启动
func (c *K8sClient) CreateJob(ctx context.Context, jobName string, config JobConfig) error {
// 1. 先检查是否已存在
existingJob, err := c.clientset.BatchV1().Jobs(namespace).Get(
ctx,
jobName,
metav1.GetOptions{},
)
if err == nil {
// 任务已存在
if existingJob.Status.Active > 0 {
// 正在运行,直接返回成功
return nil
}
if existingJob.Status.Succeeded > 0 {
// 已经成功,直接返回成功
return nil
}
// 失败了,删除重建
if existingJob.Status.Failed > 0 {
err = c.clientset.BatchV1().Jobs(namespace).Delete(
ctx,
jobName,
metav1.DeleteOptions{},
)
if err != nil {
return err
}
}
} else if !errors.IsNotFound(err) {
// 其他错误
return err
}
// 2. 创建任务
job := buildK8sJob(jobName, config)
_, err = c.clientset.BatchV1().Jobs(namespace).Create(
ctx,
job,
metav1.CreateOptions{},
)
if err != nil && errors.IsAlreadyExists(err) {
// 并发创建,已存在,认为成功
return nil
}
return err
}
幂等性保证了即使重复调用也不会出问题。
4. 事件订阅(watch 模式)避免重复扫描
WorkflowRun/Task 改变状态时推事件:
- 通知 scheduler
- scheduler 只处理变化的任务
减少竞争。
实现方式一:数据库触发器 + 消息队列
CREATE OR REPLACE FUNCTION notify_task_change()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify(
'task_change',
json_build_object(
'id', NEW.id,
'status', NEW.status,
'version', NEW.version
)::text
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER task_change_trigger
AFTER UPDATE ON task_runs
FOR EACH ROW
WHEN (OLD.status IS DISTINCT FROM NEW.status)
EXECUTE FUNCTION notify_task_change();
调度器监听事件:
func (s *Scheduler) watchTaskChanges(ctx context.Context) {
listener := pq.NewListener(
s.dbURL,
10*time.Second,
time.Minute,
func(ev pq.ListenerEventType, err error) {
if err != nil {
log.Error("listener error", "err", err)
}
},
)
err := listener.Listen("task_change")
if err != nil {
log.Fatal("listen failed", "err", err)
}
for {
select {
case notification := <-listener.Notify:
var event TaskChangeEvent
if err := json.Unmarshal([]byte(notification.Extra), &event); err != nil {
log.Error("unmarshal failed", "err", err)
continue
}
// 处理事件
s.handleTaskChange(ctx, event)
case <-ctx.Done():
return
}
}
}
优势:
- 不需要轮询数据库
- 实时响应状态变化
- 减少数据库压力
5. 反向补偿(Compensation)
在状态机中加入:
- Recovering 状态
- Backoff 状态
- Retry 状态
- Lost 状态
所有异常都可以靠补偿机制来恢复。
示例:Lost 状态的补偿
func (s *Scheduler) runRecoveryLoop(ctx context.Context) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.recoverLostTasks(ctx)
case <-ctx.Done():
return
}
}
}
func (s *Scheduler) recoverLostTasks(ctx context.Context) {
// 查找所有 Lost 状态的任务
lostTasks, err := s.repo.FindTasksByStatus(ctx, StatusLost)
if err != nil {
log.Error("find lost tasks failed", "err", err)
return
}
for _, task := range lostTasks {
// 查询底层系统状态
job, err := s.clusterClient.GetJob(
ctx,
fmt.Sprintf("task-%d", task.ID),
)
if err != nil {
log.Error("get job failed", "taskID", task.ID, "err", err)
continue
}
// 根据实际状态恢复
if job.Status == "Running" {
// 任务还在运行,恢复心跳
err = s.applyTransition(ctx, task, StatusRunning)
} else if job.Status == "Success" {
// 任务已成功
err = s.applyTransition(ctx, task, StatusSuccess)
} else if job.Status == "Failed" {
// 任务已失败
err = s.applyTransition(ctx, task, StatusFailed)
} else {
// 任务真的丢了,重试
if task.RetryCount < task.MaxRetries {
err = s.applyTransition(ctx, task, StatusRetryWait)
} else {
err = s.applyTransition(ctx, task, StatusFailed)
}
}
if err != nil {
log.Error("recover task failed", "taskID", task.ID, "err", err)
}
}
}
补偿机制保证了系统的自愈能力。
八、如何处理异常降级 & 暂停(Pause)?
8.1 异常暂停任务(Pause)
在任务结构加字段:
type Task struct {
// ...
IsPaused bool `db:"is_paused"`
PauseReason string `db:"pause_reason"`
PausedAt time.Time `db:"paused_at"`
PausedBy string `db:"paused_by"`
}
FSM 强制阻断:
任何状态 → Pause (允许)
Pause → 其他状态(不允许,除非 Resume)
实现:
func (s *Scheduler) PauseTask(
ctx context.Context,
taskID int64,
reason string,
operator string,
) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// 更新暂停标志
_, err = tx.ExecContext(ctx, `
UPDATE task_runs
SET is_paused = TRUE,
pause_reason = $1,
paused_at = NOW(),
paused_by = $2,
updated_at = NOW()
WHERE id = $3
`, reason, operator, taskID)
if err != nil {
return err
}
// 如果任务正在运行,尝试停止
task, err := s.repo.GetTask(ctx, taskID)
if err != nil {
return err
}
if task.Status == StatusRunning {
// 调用底层系统停止任务
err = s.clusterClient.StopJob(
ctx,
fmt.Sprintf("task-%d", taskID),
)
if err != nil {
log.Error("stop job failed", "taskID", taskID, "err", err)
// 继续,因为暂停标志已设置,下次调度会跳过
}
}
return tx.Commit()
}
Resume 动作:
func (s *Scheduler) ResumeTask(
ctx context.Context,
taskID int64,
operator string,
) error {
_, err := s.db.ExecContext(ctx, `
UPDATE task_runs
SET is_paused = FALSE,
pause_reason = NULL,
paused_at = NULL,
paused_by = NULL,
updated_at = NOW()
WHERE id = $1
`, taskID)
return err
}
调度器在选任务时跳过暂停的任务:
SELECT *
FROM task_runs
WHERE status IN ('Pending', 'RetryWait')
AND is_paused = FALSE -- 关键!
...
8.2 整个调度器降级(Degrade)
适用场景:
- GPU 紧张
- Node 大面积 Lost
- etcd 压力太大
- 数据库压力太大
- 调度器本身过载
调度器进入降级模式:
- 暂停所有新任务(Pending 不调度)
- Running/Lost 继续走
- 增加心跳超时阈值(避免误判)
- 减少调度频率
- 只处理高优先级任务
实现:
type Scheduler struct {
// ...
isDegraded bool
degradeMode DegradeMode
degradeUntil time.Time
}
type DegradeMode int
const (
DegradeModeNormal DegradeMode = iota
DegradeModePartial // 部分降级:只调度高优先级任务
DegraduModeFull // 完全降级:停止所有调度
)
func (s *Scheduler) enterDegradeMode(
mode DegradeMode,
duration time.Duration,
reason string,
) {
s.mu.Lock()
defer s.mu.Unlock()
s.isDegraded = true
s.degradeMode = mode
s.degradeUntil = time.Now().Add(duration)
log.Warn("entering degrade mode",
"mode", mode,
"duration", duration,
"reason", reason)
// 发送告警
s.alertManager.Send(Alert{
Level: "warning",
Title: "Scheduler Degraded",
Message: fmt.Sprintf("Reason: %s, Mode: %v, Duration: %v",
reason, mode, duration),
})
}
func (s *Scheduler) checkAutoDegrade(ctx context.Context) {
// 检查 GPU 资源
gpuUsage, err := s.getGPUUsage(ctx)
if err == nil && gpuUsage > 0.95 {
s.enterDegradeMode(
DegradeModePartial,
10*time.Minute,
"GPU usage > 95%",
)
return
}
// 检查 Node 健康度
healthyNodes, totalNodes := s.getNodeHealth(ctx)
if float64(healthyNodes)/float64(totalNodes) < 0.5 {
s.enterDegradeMode(
DegraduModeFull,
30*time.Minute,
"Less than 50% nodes healthy",
)
return
}
// 检查调度器负载
if s.metrics.PendingTasks > 10000 {
s.enterDegradeMode(
DegradeModePartial,
5*time.Minute,
"Too many pending tasks",
)
return
}
}
func (s *Scheduler) RunOnce(ctx context.Context) error {
// 检查是否需要降级
s.checkAutoDegrade(ctx)
// 检查是否还在降级期
if s.isDegraded && time.Now().After(s.degradeUntil) {
s.exitDegradeMode()
}
// 如果在降级模式
if s.isDegraded {
if s.degradeMode == DegraduModeFull {
// 完全降级:不调度任何任务
log.Info("in full degrade mode, skipping scheduling")
return nil
}
if s.degradeMode == DegradeModePartial {
// 部分降级:只调度高优先级任务
tasks, err := s.repo.PickHighPriorityTasks(ctx, 10)
if err != nil {
return err
}
return s.processTasks(ctx, tasks)
}
}
// 正常调度
tasks, err := s.repo.PickRunnableTasks(ctx, 100)
if err != nil {
return err
}
return s.processTasks(ctx, tasks)
}
非常实用,可以避免雪崩。
九、心跳怎么设计才是工业级?
9.1 当前方案的问题
常见做法:
节点 → 每隔 n 秒 → http/rpc 上报
这个方案能用,但不够稳定。
问题:
- 节点重启后,旧心跳可能覆盖新心跳
- 网络重试可能导致心跳乱序
- 无法区分"真失联"和"网络抖动"
- 无法处理"节点半死不活"的情况
9.2 工业级做法(Ray/K8s/Volcano 都类似)
核心思想:调度器下发心跳 token,节点定期上报 → 调度器校验版本
数据结构:
task_runs 表新增字段:
heartbeat_version BIGINT NOT NULL DEFAULT 0,
last_heartbeat_at TIMESTAMPTZ,
heartbeat_timeout INTERVAL NOT NULL DEFAULT INTERVAL '60 seconds'
节点运行任务后:
每 10 秒上报:
POST /api/v1/heartbeat
{
"taskID": 123,
"version": 9,
"metrics": {
"cpuUsage": 0.8,
"memoryUsage": 0.6,
"progress": 0.5
},
"logs": {
"lastLine": "Processing batch 100/200",
"lastUpdated": "2024-01-15T10:30:00Z"
}
}
调度器检查:
func (s *Server) HandleHeartbeat(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var req HeartbeatRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid request", 400)
return
}
// 查询任务
task, err := s.repo.GetTask(ctx, req.TaskID)
if err != nil {
http.Error(w, "task not found", 404)
return
}
// 关键:检查版本号
if req.Version < task.HeartbeatVersion {
// 旧心跳,丢弃
log.Warn("outdated heartbeat",
"taskID", req.TaskID,
"receivedVersion", req.Version,
"currentVersion", task.HeartbeatVersion)
// 返回 200,但不更新(避免节点重试)
w.WriteHeader(200)
return
}
// 更新心跳
err = s.repo.UpdateHeartbeat(ctx, req.TaskID, req.Version, time.Now())
if err != nil {
http.Error(w, "update failed", 500)
return
}
// 可选:更新 metrics
if req.Metrics != nil {
s.metricsStore.Update(req.TaskID, req.Metrics)
}
w.WriteHeader(200)
}
这样可以避免:
- 重启覆盖心跳
- 老节点误报
- 网络重试带来乱序
9.3 Lost 判定不只看超时
最重要:要同时看:
- 心跳超时(>30s)
- 节点状态(Ready/NotReady)
- 容器平台回调(failed/success)
- 任务运行日志最后更新时间
- 是否收到节点"退出"事件
多条件组合,减少误判。
func (s *Scheduler) CheckLostTasks(ctx context.Context) error {
now := time.Now()
// 查找所有 Running 任务
tasks, err := s.repo.FindTasksByStatus(ctx, StatusRunning)
if err != nil {
return err
}
for _, task := range tasks {
// 条件 1:心跳超时
heartbeatTimeout := task.LastHeartbeatAt.Add(task.HeartbeatTimeout)
if !now.After(heartbeatTimeout) {
// 心跳正常
continue
}
// 心跳超时了,进一步检查
// 条件 2:查询节点状态
node, err := s.clusterClient.GetNode(ctx, task.ScheduledNode)
if err != nil {
log.Error("get node failed", "node", task.ScheduledNode, "err", err)
} else {
if node.IsReady {
// 节点是健康的,可能只是任务心跳丢了
// 给一次宽限期
if now.Sub(task.LastHeartbeatAt) < 2*task.HeartbeatTimeout {
continue
}
}
}
// 条件 3:查询底层任务状态
job, err := s.clusterClient.GetJob(
ctx,
fmt.Sprintf("task-%d", task.ID),
)
if err != nil {
log.Error("get job failed", "taskID", task.ID, "err", err)
} else {
if job.Status == "Running" {
// 任务还在运行,只是心跳丢了
// 恢复心跳
log.Warn("task heartbeat lost but job still running",
"taskID", task.ID)
task.LastHeartbeatAt = now
s.repo.UpdateHeartbeat(ctx, task.ID, task.HeartbeatVersion, now)
continue
}
if job.Status == "Success" {
// 任务已成功,心跳没上报
log.Info("task success but heartbeat lost",
"taskID", task.ID)
s.applyTransition(ctx, task, StatusSuccess)
continue
}
if job.Status == "Failed" {
// 任务已失败
log.Info("task failed but heartbeat lost",
"taskID", task.ID)
s.applyTransition(ctx, task, StatusFailed)
continue
}
}
// 条件 4:检查日志更新时间
lastLogTime, err := s.logStore.GetLastUpdateTime(task.ID)
if err == nil {
if now.Sub(lastLogTime) < task.HeartbeatTimeout {
// 日志还在更新,任务可能还在跑
log.Info("task heartbeat lost but logs still updating",
"taskID", task.ID)
continue
}
}
// 所有条件都确认:任务真的失联了
log.Warn("task confirmed lost",
"taskID", task.ID,
"lastHeartbeat", task.LastHeartbeatAt,
"timeout", task.HeartbeatTimeout)
err = s.applyTransition(ctx, task, StatusLost)
if err != nil {
log.Error("mark task lost failed", "taskID", task.ID, "err", err)
}
}
return nil
}
Ray、K8s、Volcano 都是这么做的。
十、完整的工业级实现方案
现在,让我给你一套完整的、可落地的工业级调度器蓝图。
10.1 对象约定
统一叫法:
- WorkflowRun:一条工作流实例(比如一次大任务)
- TaskRun:工作流里的具体节点任务(调一次推理 / 训练 / 数据处理)
- 调度器只调 TaskRun,WorkflowRun 由上层按依赖控制即可。
下面主要以 TaskRun 为例讲状态机和代码。
10.2 状态机设计(ASCII 图 + 状态说明)
Task 状态枚举
Pending :待调度(新任务 / 等待首跑)
Scheduling :正在选节点 / 提交到底层系统
Running :底层已接收,任务在跑
Success :执行成功
Failed :执行失败(且不再重试)
RetryWait :失败后等待重试时间窗
Lost :失联(心跳超时 / 节点炸了)
Paused :人工或系统暂停
Canceled :人工取消
状态流转图(完整版)
+----------------+
| |
| RetryWait <--+<--------------------+
| | |
+-------+--------+ |
| |
v |
Pending --> Scheduling --> Running --> Success |
^ | ^ | |
| | | v |
| | | Failed ----------- +
| | | |
| | | v
| | | Lost
| | |
| | +----------------------+
| | |
| v |
+--------- Paused <-------- Canceled ---+
说明:
- Pending → Scheduling:调度器选中要执行的任务
- Scheduling → Running:成功提交到底层(K8s/Volcano/Ray)
- Running → Success/Failed/Lost:回调 or 心跳判定
- Failed → RetryWait:还有重试次数
- RetryWait → Pending:到达 next_retry_at 重新排队
- 任何状态 → Paused/Canceled:人工操作 or 系统降级
10.3 数据库表结构设计(完整版)
workflow_runs 表
CREATE TABLE workflow_runs (
id BIGSERIAL PRIMARY KEY,
project_id BIGINT NOT NULL,
name VARCHAR(256) NOT NULL,
status VARCHAR(32) NOT NULL,
version BIGINT NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
config JSONB, -- 工作流配置
error_reason TEXT,
-- 统计字段
total_tasks INT NOT NULL DEFAULT 0,
pending_tasks INT NOT NULL DEFAULT 0,
running_tasks INT NOT NULL DEFAULT 0,
succeeded_tasks INT NOT NULL DEFAULT 0,
failed_tasks INT NOT NULL DEFAULT 0
);
CREATE INDEX idx_workflow_status ON workflow_runs(status);
CREATE INDEX idx_workflow_project ON workflow_runs(project_id);
task_runs 表(核心)
CREATE TABLE task_runs (
id BIGSERIAL PRIMARY KEY,
workflow_run_id BIGINT NOT NULL REFERENCES workflow_runs(id),
name VARCHAR(128) NOT NULL,
task_type VARCHAR(64) NOT NULL, -- train/infer/register/eval
-- 状态相关
status VARCHAR(32) NOT NULL,
version BIGINT NOT NULL DEFAULT 0,
-- 调度相关
priority INT NOT NULL DEFAULT 0,
scheduled_node VARCHAR(128),
queue_name VARCHAR(64),
-- 重试相关
retry_count INT NOT NULL DEFAULT 0,
max_retries INT NOT NULL DEFAULT 3,
next_retry_at TIMESTAMPTZ,
-- 暂停/取消相关
is_paused BOOLEAN NOT NULL DEFAULT FALSE,
pause_reason TEXT,
paused_at TIMESTAMPTZ,
paused_by VARCHAR(128),
is_manual_cancel BOOLEAN NOT NULL DEFAULT FALSE,
-- 心跳相关
heartbeat_version BIGINT NOT NULL DEFAULT 0,
last_heartbeat_at TIMESTAMPTZ,
heartbeat_timeout INTERVAL NOT NULL DEFAULT INTERVAL '60 seconds',
-- 任务数据
payload JSONB, -- 任务参数
result JSONB, -- 结构化执行结果
error_reason TEXT,
error_stack TEXT,
-- 资源需求
cpu_request DECIMAL(10,2),
memory_request BIGINT, -- bytes
gpu_request INT,
-- 时间相关
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
scheduled_at TIMESTAMPTZ,
-- 依赖关系
depends_on BIGINT[], -- 依赖的任务 ID 列表
-- 元数据
labels JSONB, -- 标签
annotations JSONB -- 注解
);
CREATE INDEX idx_task_status_queue ON task_runs(status, queue_name);
CREATE INDEX idx_task_workflow ON task_runs(workflow_run_id);
CREATE INDEX idx_task_next_retry ON task_runs(status, next_retry_at)
WHERE next_retry_at IS NOT NULL;
CREATE INDEX idx_task_running ON task_runs(status, last_heartbeat_at)
WHERE status = 'Running';
10.4 表驱动状态机实现(完整 Go 代码)
状态和 Transition 结构
package fsm
import (
"context"
"fmt"
)
type Status string
const (
StatusPending Status = "Pending"
StatusScheduling Status = "Scheduling"
StatusRunning Status = "Running"
StatusSuccess Status = "Success"
StatusFailed Status = "Failed"
StatusRetryWait Status = "RetryWait"
StatusLost Status = "Lost"
StatusPaused Status = "Paused"
StatusCanceled Status = "Canceled"
)
type Task struct {
ID int64
Status Status
Version int64
RetryCount int
MaxRetries int
// ... 其他字段
}
type Transition struct {
From []Status
To Status
Action func(ctx context.Context, task *Task) error
}
type ActionFunc func(ctx context.Context, task *Task) error
状态转换表
package fsm
var transitions = []Transition{
{
From: []Status{StatusPending, StatusRetryWait},
To: StatusScheduling,
Action: scheduleTask, // 选节点、写 scheduled_node、下发到底层
},
{
From: []Status{StatusScheduling},
To: StatusRunning,
Action: startTask, // 已成功提交,记录 start_time / 注册心跳
},
{
From: []Status{StatusRunning},
To: StatusSuccess,
Action: onSuccess, // 清理资源、发通知、更新 metrics
},
{
From: []Status{StatusRunning},
To: StatusFailed,
Action: onFailed,
},
{
From: []Status{StatusRunning},
To: StatusLost,
Action: onLost,
},
{
From: []Status{StatusFailed, StatusLost},
To: StatusRetryWait,
Action: onRetryWait,
},
{
From: []Status{StatusRunning, StatusPending, StatusRetryWait},
To: StatusPaused,
Action: onPaused, // 记录原因,不再被调度
},
{
From: []Status{StatusPending, StatusRetryWait, StatusPaused},
To: StatusCanceled,
Action: onCanceled,
},
}
应用状态转换(带 CAS 乐观锁)
package fsm
func ApplyTransition(
ctx context.Context,
repo TaskRepo,
task *Task,
to Status,
) error {
// 1. 查找匹配的转换
var found *Transition
for i := range transitions {
t := &transitions[i]
if t.To != to {
continue
}
for _, from := range t.From {
if from == task.Status {
found = t
break
}
}
if found != nil {
break
}
}
if found == nil {
return fmt.Errorf("invalid transition %s -> %s", task.Status, to)
}
// 2. 先执行副作用(一般在一个 DB 事务里更安全)
if err := found.Action(ctx, task); err != nil {
return fmt.Errorf("action failed: %w", err)
}
// 3. CAS 更新状态 + version(单写者)
oldVersion := task.Version
task.Status = to
task.Version++
ok, err := repo.UpdateStatusCAS(
ctx,
task.ID,
oldVersion,
to,
task.Version,
)
if err != nil {
return fmt.Errorf("update status failed: %w", err)
}
if !ok {
return fmt.Errorf("cas conflict when updating task %d", task.ID)
}
return nil
}
func contains(statuses []Status, status Status) bool {
for _, s := range statuses {
if s == status {
return true
}
}
return false
}
10.5 调度循环核心逻辑(完整实现)
拉取可调度任务(FOR UPDATE SKIP LOCKED)
package repo
import (
"context"
"database/sql"
)
type TaskRepo interface {
PickRunnableTasks(ctx context.Context, limit int) ([]*Task, error)
UpdateStatusCAS(ctx context.Context, taskID int64, oldVersion int64,
newStatus Status, newVersion int64) (bool, error)
GetTask(ctx context.Context, taskID int64) (*Task, error)
// ... 其他方法
}
type PostgresTaskRepo struct {
db *sql.DB
}
func (r *PostgresTaskRepo) PickRunnableTasks(
ctx context.Context,
limit int,
) ([]*Task, error) {
query := `
SELECT
id, workflow_run_id, name, status, version,
priority, retry_count, max_retries,
heartbeat_version, last_heartbeat_at, heartbeat_timeout,
payload, created_at, updated_at
FROM task_runs
WHERE status IN ('Pending', 'RetryWait')
AND (next_retry_at IS NULL OR next_retry_at <= NOW())
AND is_paused = FALSE
ORDER BY priority DESC, id
FOR UPDATE SKIP LOCKED
LIMIT $1
`
rows, err := r.db.QueryContext(ctx, query, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var tasks []*Task
for rows.Next() {
task := &Task{}
err := rows.Scan(
&task.ID, &task.WorkflowRunID, &task.Name,
&task.Status, &task.Version,
&task.Priority, &task.RetryCount, &task.MaxRetries,
&task.HeartbeatVersion, &task.LastHeartbeatAt,
&task.HeartbeatTimeout,
&task.Payload, &task.CreatedAt, &task.UpdatedAt,
)
if err != nil {
return nil, err
}
tasks = append(tasks, task)
}
return tasks, rows.Err()
}
func (r *PostgresTaskRepo) UpdateStatusCAS(
ctx context.Context,
taskID int64,
oldVersion int64,
newStatus Status,
newVersion int64,
) (bool, error) {
result, err := r.db.ExecContext(ctx, `
UPDATE task_runs
SET status = $1,
version = $2,
updated_at = NOW()
WHERE id = $3
AND version = $4
`, newStatus, newVersion, taskID, oldVersion)
if err != nil {
return false, err
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return false, err
}
return rowsAffected > 0, nil
}
Go 调度循环
package scheduler
import (
"context"
"sync"
"time"
)
type Scheduler struct {
repo TaskRepo
clusterClient ClusterClient
workerPool *WorkerPool
logger Logger
// 降级相关
mu sync.RWMutex
isDegraded bool
degradeMode DegradeMode
degradeUntil time.Time
}
func (s *Scheduler) Run(ctx context.Context) error {
// 启动多个后台循环
g, ctx := errgroup.WithContext(ctx)
// 主调度循环
g.Go(func() error {
return s.runMainLoop(ctx)
})
// 心跳检查循环
g.Go(func() error {
return s.runHeartbeatCheckLoop(ctx)
})
// 恢复循环
g.Go(func() error {
return s.runRecoveryLoop(ctx)
})
// 降级检查循环
g.Go(func() error {
return s.runDegradeCheckLoop(ctx)
})
return g.Wait()
}
func (s *Scheduler) runMainLoop(ctx context.Context) error {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := s.RunOnce(ctx); err != nil {
s.logger.Error("schedule once failed", "err", err)
}
case <-ctx.Done():
return ctx.Err()
}
}
}
func (s *Scheduler) RunOnce(ctx context.Context) error {
// 检查降级
if s.shouldSkipScheduling() {
return nil
}
// 拉取任务
limit := s.getTaskLimit()
tasks, err := s.repo.PickRunnableTasks(ctx, limit)
if err != nil {
return err
}
if len(tasks) == 0 {
return nil
}
s.logger.Info("picked tasks", "count", len(tasks))
// 并发处理任务
for _, t := range tasks {
task := t // 避免闭包问题
s.workerPool.Submit(func() {
if err := s.handleTask(ctx, task); err != nil {
s.logger.Error("handleTask failed",
"taskID", task.ID, "err", err)
}
})
}
return nil
}
func (s *Scheduler) handleTask(ctx context.Context, task *Task) error {
s.logger.Info("handling task",
"taskID", task.ID,
"status", task.Status)
// Pending / RetryWait -> Scheduling
if err := ApplyTransition(ctx, s.repo, task, StatusScheduling); err != nil {
return fmt.Errorf("transition to Scheduling failed: %w", err)
}
s.logger.Info("task scheduled", "taskID", task.ID)
// Scheduling -> Running
if err := ApplyTransition(ctx, s.repo, task, StatusRunning); err != nil {
return fmt.Errorf("transition to Running failed: %w", err)
}
s.logger.Info("task started", "taskID", task.ID)
return nil
}
func (s *Scheduler) shouldSkipScheduling() bool {
s.mu.RLock()
defer s.mu.RUnlock()
if !s.isDegraded {
return false
}
// 检查是否过期
if time.Now().After(s.degradeUntil) {
return false
}
return s.degradeMode == DegraduModeFull
}
func (s *Scheduler) getTaskLimit() int {
s.mu.RLock()
defer s.mu.RUnlock()
if s.isDegraded && s.degradeMode == DegradeModePartial {
return 10 // 降级时只拉取 10 个
}
return 100 // 正常时拉取 100 个
}
十一、完整项目结构建议
scheduler/
├── main.go # 主入口
├── config/
│ └── config.go # 配置
├── fsm/ # 状态机
│ ├── fsm.go # 状态机核心
│ ├── transitions.go # 转换表
│ └── actions.go # 副作用函数
├── repo/ # 数据访问层
│ ├── task_repo.go
│ ├── workflow_repo.go
│ └── postgres.go
├── scheduler/ # 调度器
│ ├── scheduler.go # 主调度器
│ ├── heartbeat.go # 心跳检查
│ ├── recovery.go # 恢复逻辑
│ └── degrade.go # 降级逻辑
├── runtime/ # 运行时适配
│ ├── cluster_client.go # 集群客户端接口
│ ├── kubernetes.go # K8s 实现
│ ├── volcano.go # Volcano 实现
│ └── ray.go # Ray 实现
├── api/ # API 服务
│ ├── server.go
│ ├── heartbeat.go # 心跳接口
│ └── task.go # 任务管理接口
├── worker/ # 工作池
│ └── pool.go
├── metrics/ # 监控指标
│ └── metrics.go
└── pkg/ # 公共包
├── logger/
├── alert/
└── errors/
十二、总结:调度器最关键的七个要点
1. 为什么要"调度器写状态、其他都不能写"?
因为:
- 只有调度器知道"现在要干什么"
- 如果节点、用户、回调乱改状态 → 状态机会乱套
- 所以必须单点写状态(single writer)
这可以把 99% 状态错乱问题直接解决掉。
2. 为什么要 CAS(version 乐观锁)?
因为调度器可能多实例跑:
- 两个 scheduler 同时抢到任务 → 都想运行
- 如果没有 version → A 改 Running,B 改 Running,冲突无法检测
- 有 version → 谁先更新谁赢,后来的会失败
这是所有工业级调度系统的共同点(K8s、Ray、Airflow、TiDB 分布式调度等)。
3. 状态机用"表驱动"而不是 if-else 为什么更强?
表驱动状态机:
- 更清晰
- 更容易扩展
- 更容易测试
- 不会出现乱七八糟的 if/switch 炸弹
未来你要加状态(Backoff / Recovering / Pausing / Queued / Degraded / NodeLost),你只需要加一条表项,而不是修改一堆代码。
4. 状态机本身只负责"转状态",不负责"动作"
要把"状态的变更"和"动作的执行"分开:
- 状态机决定:从 A → B
- 副作用执行:落地这次动作(启动容器、重试、清理等等)
这样系统才不会乱。
5. 并发是调度器最难的问题(不是状态机本身)
并发竞争来自:
- 多个调度循环
- 节点回调
- 心跳
- 重试机制
- 用户触发取消/暂停动作
解决方式:
- CAS version(防止重复更新状态)
- 行级锁(短锁)(在临界区保护一次执行,避免双启动)
- 副作用必须幂等(多次调用不能出事故,如重复启动容器)
- 事件订阅模式(避免循环扫整个任务表)
- 补偿机制(自动恢复)(Lost、Failed、Retry、Recovering)
6. 心跳要用"版本号 + 多信号判断"才能可靠
不是收到心跳就说明健康,也不是 30 秒没心跳就 Lost。
要结合:
- 最后心跳时间
- 心跳 version
- 节点是否 NotReady
- K8s/Ray task 回调
- 日志更新时间
- 节点退出事件
才能得出真实的"Lost"判断。
否则会误杀任务。
7. Pause/Resume/Degrade 是高级调度器必须具备的能力
比如:
- GPU 不够 → 整体进入 Degrade
- 某个 Workflow 需要暂停 → Pause
- 恢复 → Resume
没有这些,你以后会很痛苦。
十三、极简版
调度器最关键的是:
状态机 + version CAS + 幂等副作用 + 可靠心跳 + 补偿机制
其他都是细节。
结语:为什么调度器值得深入学习?
调度器是编程领域的"珠穆朗玛峰"。
它不是最常见的工作,但一旦你需要写调度器,你会发现:
- CRUD 的经验完全不够用
- 普通业务系统的思维方式完全不适用
- 你需要重新理解"状态"、"并发"、"分布式"、"不确定性"
但是,一旦你掌握了调度器的设计原则:
- 你会对分布式系统有全新的理解
- 你会对状态机有深刻的认识
- 你会对并发控制有实战经验
- 你会成为团队里少数几个能解决"最难问题"的人
这不仅是技术能力的提升,更是思维方式的升级。
调度器很难,但征服它之后,你会发现自己已经站在了另一个高度。
本系列文章
➤ NO.1 调度器
➤ [NO.2 一致性协议](Paxos / Raft)
➤ [NO.3 高性能异步系统](消息队列、回调、重试)
➤ [NO.4 交易系统](钱的事不能错)
➤ [NO.5 普通业务系统](绝大多数人做的)