第13章:消息队列设计
| > 面试频率: 特性 | Kafka | RabbitMQ |
|---|---|---|
| 定位 | 分布式流处理平台 | 传统消息队列 |
| 吞吐量 | 百万QPS | 万级QPS |
| 延迟 | ms级 | μs级 |
| 消息顺序 | Partition内有序 | 队列内有序 |
| 持久化 | 磁盘(日志) | 内存+磁盘 |
| 消费模式 | Pull(拉) | Push(推) |
| 协议 | 自定义二进制 | AMQP |
| 场景 | 日志、流处理、大数据 | 任务队列、RPC |
选择建议:
- 高吞吐、大数据场景 → Kafka
- 低延迟、复杂路由 → RabbitMQ
如何实现消息的幂等性?
答案:
方案1:唯一键约束
CREATE TABLE orders (
order_id VARCHAR(64) PRIMARY KEY,
...
);
-- INSERT重复会失败,自动去重
方案2:消息表
// 记录已处理的消息ID
type MessageLog struct {
MessageID string `gorm:"primaryKey"`
Status string
}
func ProcessIdempotent(msg *kafka.Message) {
msgID := getMessageID(msg)
// 检查是否已处理
var log MessageLog
if db.Where("message_id = ?", msgID).First(&log).Error == nil {
return // 已处理,跳过
}
// 开启事务
tx := db.Begin()
tx.Create(&MessageLog{MessageID: msgID, Status: "processing"})
doBusiness(msg)
tx.Model(&log).Update("status", "completed")
tx.Commit()
}
方案3:业务ID去重
// 使用业务自带的ID(订单ID、支付ID)
payment := Payment{
PaymentID: msg.PaymentID, // 全局唯一
Amount: msg.Amount,
}
db.Create(&payment) // 重复会失败
如何设计延迟队列?
答案:
方案1:时间轮
type TimeWheel struct {
slots [][]*DelayMessage
currentSlot int
slotDuration time.Duration // 每格时间
}
// 60个slot,每个1秒 = 1分钟时间轮
tw := NewTimeWheel(1*time.Second, 60)
// 添加延迟消息
msg := &DelayMessage{
Data: "order_cancel",
DelayUntil: time.Now().Add(30 * time.Minute),
}
tw.AddMessage(msg)
// 时间轮转动,到期消息自动发送
方案2:RocketMQ延迟级别
// RocketMQ内置18个延迟级别
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg := &Message{
Topic: "order-cancel",
Body: []byte("order_123"),
DelayLevel: 3, // 10秒延迟
}
方案3:定时任务扫描
-- 订单表加字段
ALTER TABLE orders ADD COLUMN expire_at DATETIME;
-- 定时任务(每分钟扫描)
SELECT * FROM orders
WHERE status = 'pending'
AND expire_at < NOW()
LIMIT 1000;
-- 取消超时订单
Consumer消费速度跟不上怎么办?
答案:
方案1:增加Consumer
Topic有4个Partition
Consumer Group只有2个Consumer
→ 增加到4个Consumer(每个消费1个Partition)
方案2:批量消费
batch := make([]*kafka.Message, 0, 100)
for {
msg, _ := consumer.ReadMessage(100)
batch = append(batch, msg)
if len(batch) >= 100 {
processBatch(batch) // 批量处理,提升效率
consumer.Commit()
batch = batch[:0]
}
}
方案3:异步处理
workers := 10
jobs := make(chan *kafka.Message, 1000)
// 启动worker池
for i := 0; i < workers; i++ {
go func() {
for msg := range jobs {
processMessage(msg)
}
}()
}
// 主线程只负责拉取消息
for {
msg, _ := consumer.ReadMessage(-1)
jobs <- msg
}
方案4:增加Partition
# 增加Partition数量(只能增加,不能减少)
kafka-topics.sh --alter --topic orders --partitions 8
如何监控Kafka集群?
答案:
核心指标:
- Producer指标
- record-send-rate: 发送速率(QPS)
- record-error-rate: 发送失败率
- request-latency-avg: 请求延迟
- Broker指标
- messages-in-per-sec: 消息生产速率
- bytes-in-per-sec: 入流量
- under-replicated-partitions: 未完全复制的Partition数
- Consumer指标
- records-lag: 消费延迟(最重要!)
- records-consumed-rate: 消费速率
- 告警
# Consumer Lag > 1000
alert: HighConsumerLag
expr: kafka_consumer_lag > 1000
for: 5m
# ISR不足
alert: UnderReplicatedPartitions
expr: kafka_under_replicated_partitions > 0
监控工具:
- Kafka Manager(Yahoo开源)
- Confluent Control Center
- Prometheus + Grafana