HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 系统设计实战

    • 系统设计面试教程
    • 系统设计方法论
    • 01-短链系统设计
    • 02 - 秒杀系统设计
    • 03 - IM 即时通讯系统设计
    • 04 - Feed 流系统设计
    • 05 - 分布式 ID 生成器设计
    • 06 - 限流系统设计
    • 第7章:搜索引擎设计
    • 08 - 推荐系统设计
    • 09 - 支付系统设计
    • 10 - 电商系统设计
    • 11 - 直播系统设计
    • 第12章:缓存系统设计
    • 第13章:消息队列设计
    • 第14章:分布式事务
    • 15 - 监控系统设计

第13章:消息队列设计

> 面试频率: 特性KafkaRabbitMQ
定位分布式流处理平台传统消息队列
吞吐量百万QPS万级QPS
延迟ms级μs级
消息顺序Partition内有序队列内有序
持久化磁盘(日志)内存+磁盘
消费模式Pull(拉)Push(推)
协议自定义二进制AMQP
场景日志、流处理、大数据任务队列、RPC

选择建议:

  • 高吞吐、大数据场景 → Kafka
  • 低延迟、复杂路由 → RabbitMQ

如何实现消息的幂等性?

答案:

方案1:唯一键约束

CREATE TABLE orders (
  order_id VARCHAR(64) PRIMARY KEY,
  ...
);

-- INSERT重复会失败,自动去重

方案2:消息表

// 记录已处理的消息ID
type MessageLog struct {
	MessageID string `gorm:"primaryKey"`
	Status    string
}

func ProcessIdempotent(msg *kafka.Message) {
	msgID := getMessageID(msg)
	
	// 检查是否已处理
	var log MessageLog
	if db.Where("message_id = ?", msgID).First(&log).Error == nil {
		return  // 已处理,跳过
	}
	
	// 开启事务
	tx := db.Begin()
	tx.Create(&MessageLog{MessageID: msgID, Status: "processing"})
	doBusiness(msg)
	tx.Model(&log).Update("status", "completed")
	tx.Commit()
}

方案3:业务ID去重

// 使用业务自带的ID(订单ID、支付ID)
payment := Payment{
	PaymentID: msg.PaymentID,  // 全局唯一
	Amount:    msg.Amount,
}
db.Create(&payment)  // 重复会失败

如何设计延迟队列?

答案:

方案1:时间轮

type TimeWheel struct {
	slots        [][]*DelayMessage
	currentSlot  int
	slotDuration time.Duration  // 每格时间
}

// 60个slot,每个1秒 = 1分钟时间轮
tw := NewTimeWheel(1*time.Second, 60)

// 添加延迟消息
msg := &DelayMessage{
	Data:       "order_cancel",
	DelayUntil: time.Now().Add(30 * time.Minute),
}
tw.AddMessage(msg)

// 时间轮转动,到期消息自动发送

方案2:RocketMQ延迟级别

// RocketMQ内置18个延迟级别
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

msg := &Message{
	Topic:      "order-cancel",
	Body:       []byte("order_123"),
	DelayLevel: 3,  // 10秒延迟
}

方案3:定时任务扫描

-- 订单表加字段
ALTER TABLE orders ADD COLUMN expire_at DATETIME;

-- 定时任务(每分钟扫描)
SELECT * FROM orders 
WHERE status = 'pending' 
  AND expire_at < NOW()
LIMIT 1000;

-- 取消超时订单

Consumer消费速度跟不上怎么办?

答案:

方案1:增加Consumer

Topic有4个Partition
Consumer Group只有2个Consumer
→ 增加到4个Consumer(每个消费1个Partition)

方案2:批量消费

batch := make([]*kafka.Message, 0, 100)

for {
	msg, _ := consumer.ReadMessage(100)
	batch = append(batch, msg)
	
	if len(batch) >= 100 {
		processBatch(batch)  // 批量处理,提升效率
		consumer.Commit()
		batch = batch[:0]
	}
}

方案3:异步处理

workers := 10
jobs := make(chan *kafka.Message, 1000)

// 启动worker池
for i := 0; i < workers; i++ {
	go func() {
		for msg := range jobs {
			processMessage(msg)
		}
	}()
}

// 主线程只负责拉取消息
for {
	msg, _ := consumer.ReadMessage(-1)
	jobs <- msg
}

方案4:增加Partition

# 增加Partition数量(只能增加,不能减少)
kafka-topics.sh --alter --topic orders --partitions 8

如何监控Kafka集群?

答案:

核心指标:

  1. Producer指标
- record-send-rate: 发送速率(QPS)
- record-error-rate: 发送失败率
- request-latency-avg: 请求延迟
  1. Broker指标
- messages-in-per-sec: 消息生产速率
- bytes-in-per-sec: 入流量
- under-replicated-partitions: 未完全复制的Partition数
  1. Consumer指标
- records-lag: 消费延迟(最重要!)
- records-consumed-rate: 消费速率
  1. 告警
# Consumer Lag > 1000
alert: HighConsumerLag
expr: kafka_consumer_lag > 1000
for: 5m

# ISR不足
alert: UnderReplicatedPartitions
expr: kafka_under_replicated_partitions > 0

监控工具:

  • Kafka Manager(Yahoo开源)
  • Confluent Control Center
  • Prometheus + Grafana

Prev
第12章:缓存系统设计
Next
第14章:分布式事务