HiHuo
首页
博客
手册
工具
首页
博客
手册
工具
  • 学习 Kafka

    • Kafka 学习手册 - 总览与导读
    • 01-核心概念与架构
    • 02-存储模块-日志与索引
    • 03-复制与ISR机制
    • 04-元数据管理与KRaft
    • 05-消费者组协调
    • 06-事务与Exactly-Once语义
    • 07-性能优化与调优
    • 08-高可用与容灾
    • 09-面试高频问题详解
    • 10-实战项目-Mini-Kafka实现

10-实战项目-Mini-Kafka实现

📋 本章概览

本章将带您动手实现一个简化版的Kafka,通过实际编码加深对Kafka核心机制的理解。我们将从项目设计、核心数据结构、关键功能实现等方面,逐步构建一个可运行的Mini-Kafka系统。

🎯 学习目标

  • 理解Kafka的核心设计思想
  • 掌握段文件、稀疏索引等关键机制
  • 实现基本的Producer和Consumer功能
  • 学习分布式系统的设计原则
  • 提升系统设计和编码能力

🏗️ 项目总体设计

系统架构

┌─────────────────────────────────────────────────────────────────┐
│                    Mini-Kafka 架构                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐         │
│  │  Producer   │    │   Broker    │    │  Consumer   │         │
│  │   Client    │    │   Server    │    │   Client    │         │
│  │             │    │             │    │             │         │
│  │ ┌─────────┐ │    │ ┌─────────┐ │    │ ┌─────────┐ │         │
│  │ │Message  │ │    │ │Topic    │ │    │ │Offset   │ │         │
│  │ │Builder  │ │    │ │Manager  │ │    │ │Manager  │ │         │
│  │ └─────────┘ │    │ └─────────┘ │    │ └─────────┘ │         │
│  │             │    │             │    │             │         │
│  │ ┌─────────┐ │    │ ┌─────────┐ │    │ ┌─────────┐ │         │
│  │ │Partition│ │    │ │Partition│ │    │ │Message  │ │         │
│  │ │Selector │ │    │ │Log      │ │    │ │Processor│ │         │
│  │ └─────────┘ │    │ └─────────┘ │    │ └─────────┘ │         │
│  └─────────────┘    └─────────────┘    └─────────────┘         │
│         │                   │                   │              │
│         │ 1. 发送消息        │ 2. 存储消息        │ 3. 消费消息   │
│         └───────────────────┼───────────────────┘              │
│                             │                                  │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │                    存储层                                   ││
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐        ││
│  │  │   Segment   │  │   Index     │  │   Offset    │        ││
│  │  │   Files     │  │   Files     │  │   Store     │        ││
│  │  └─────────────┘  └─────────────┘  └─────────────┘        ││
│  └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘

项目结构

mini-kafka/
├── cmd/
│   └── broker/
│       └── main.go              # 启动入口
├── internal/
│   ├── api/
│   │   ├── producer.go          # Producer API
│   │   ├── consumer.go          # Consumer API
│   │   └── server.go            # HTTP服务器
│   ├── log/
│   │   ├── segment.go           # 段文件管理
│   │   ├── index.go             # 稀疏索引
│   │   └── partition.go         # 分区日志
│   ├── meta/
│   │   ├── topic.go             # Topic管理
│   │   └── offset.go            # Offset管理
│   └── storage/
│       ├── file.go              # 文件操作
│       └── buffer.go            # 缓冲区管理
├── pkg/
│   ├── protocol/
│   │   ├── message.go           # 消息协议
│   │   └── request.go           # 请求协议
│   └── utils/
│       ├── hash.go              # 哈希函数
│       └── time.go              # 时间工具
├── go.mod
├── go.sum
└── README.md

📊 核心数据结构

消息结构

// 消息记录
type Record struct {
    Timestamp int64  `json:"timestamp"`
    Key       []byte `json:"key"`
    Value     []byte `json:"value"`
    Offset    int64  `json:"offset"`
}

// 消息批次
type RecordBatch struct {
    BaseOffset    int64     `json:"base_offset"`
    Length        int32     `json:"length"`
    CRC           uint32    `json:"crc"`
    Records       []Record  `json:"records"`
    Timestamp     int64     `json:"timestamp"`
}

// 消息编码
func (r *Record) Encode() ([]byte, error) {
    var buf bytes.Buffer
    
    // 时间戳 (8 bytes)
    if err := binary.Write(&buf, binary.BigEndian, r.Timestamp); err != nil {
        return nil, err
    }
    
    // 键长度 (4 bytes)
    keyLen := uint32(len(r.Key))
    if err := binary.Write(&buf, binary.BigEndian, keyLen); err != nil {
        return nil, err
    }
    
    // 值长度 (4 bytes)
    valueLen := uint32(len(r.Value))
    if err := binary.Write(&buf, binary.BigEndian, valueLen); err != nil {
        return nil, err
    }
    
    // 键内容
    if _, err := buf.Write(r.Key); err != nil {
        return nil, err
    }
    
    // 值内容
    if _, err := buf.Write(r.Value); err != nil {
        return nil, err
    }
    
    return buf.Bytes(), nil
}

// 消息解码
func (r *Record) Decode(data []byte) error {
    buf := bytes.NewReader(data)
    
    // 读取时间戳
    if err := binary.Read(buf, binary.BigEndian, &r.Timestamp); err != nil {
        return err
    }
    
    // 读取键长度
    var keyLen uint32
    if err := binary.Read(buf, binary.BigEndian, &keyLen); err != nil {
        return err
    }
    
    // 读取值长度
    var valueLen uint32
    if err := binary.Read(buf, binary.BigEndian, &valueLen); err != nil {
        return err
    }
    
    // 读取键内容
    r.Key = make([]byte, keyLen)
    if _, err := buf.Read(r.Key); err != nil {
        return err
    }
    
    // 读取值内容
    r.Value = make([]byte, valueLen)
    if _, err := buf.Read(r.Value); err != nil {
        return err
    }
    
    return nil
}

段文件结构

// 段文件
type Segment struct {
    BaseOffset    int64
    LogFile       *os.File
    IndexFile     *os.File
    Size          int64
    LastOffset    int64
    Index         []IndexEntry
    mu            sync.RWMutex
}

// 索引条目
type IndexEntry struct {
    RelativeOffset int32
    Position       int64
}

// 创建段文件
func NewSegment(dir string, baseOffset int64) (*Segment, error) {
    // 创建目录
    if err := os.MkdirAll(dir, 0755); err != nil {
        return nil, err
    }
    
    // 创建日志文件
    logPath := filepath.Join(dir, fmt.Sprintf("%020d.log", baseOffset))
    logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
    if err != nil {
        return nil, err
    }
    
    // 创建索引文件
    indexPath := filepath.Join(dir, fmt.Sprintf("%020d.idx", baseOffset))
    indexFile, err := os.OpenFile(indexPath, os.O_CREATE|os.O_RDWR, 0644)
    if err != nil {
        return nil, err
    }
    
    // 获取文件大小
    stat, err := logFile.Stat()
    if err != nil {
        return nil, err
    }
    
    segment := &Segment{
        BaseOffset: baseOffset,
        LogFile:    logFile,
        IndexFile:  indexFile,
        Size:       stat.Size(),
        LastOffset: baseOffset - 1,
        Index:      make([]IndexEntry, 0),
    }
    
    // 加载索引
    if err := segment.loadIndex(); err != nil {
        return nil, err
    }
    
    return segment, nil
}

// 加载索引
func (s *Segment) loadIndex() error {
    s.Index = make([]IndexEntry, 0)
    
    if _, err := s.IndexFile.Seek(0, io.SeekStart); err != nil {
        return err
    }
    
    buf := make([]byte, 12) // 4 bytes offset + 8 bytes position
    for {
        _, err := io.ReadFull(s.IndexFile, buf)
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        
        entry := IndexEntry{
            RelativeOffset: int32(binary.BigEndian.Uint32(buf[0:4])),
            Position:       int64(binary.BigEndian.Uint64(buf[4:12])),
        }
        
        s.Index = append(s.Index, entry)
    }
    
    return nil
}

// 写入索引条目
func (s *Segment) writeIndexEntry(relativeOffset int32, position int64) error {
    buf := make([]byte, 12)
    binary.BigEndian.PutUint32(buf[0:4], uint32(relativeOffset))
    binary.BigEndian.PutUint64(buf[4:12], uint64(position))
    
    _, err := s.IndexFile.Write(buf)
    return err
}

分区日志结构

// 分区日志
type PartitionLog struct {
    Topic           string
    Partition       int32
    Dir             string
    Segments        []*Segment
    ActiveSegment   *Segment
    MaxSegmentBytes int64
    IndexInterval   int64
    mu              sync.RWMutex
}

// 创建分区日志
func NewPartitionLog(topic string, partition int32, dir string) (*PartitionLog, error) {
    partitionDir := filepath.Join(dir, fmt.Sprintf("%s-%d", topic, partition))
    
    pl := &PartitionLog{
        Topic:           topic,
        Partition:       partition,
        Dir:             partitionDir,
        Segments:        make([]*Segment, 0),
        MaxSegmentBytes: 1 << 30, // 1GB
        IndexInterval:   1 << 20, // 1MB
    }
    
    // 加载现有段文件
    if err := pl.loadSegments(); err != nil {
        return nil, err
    }
    
    // 如果没有段文件,创建第一个
    if len(pl.Segments) == 0 {
        segment, err := NewSegment(partitionDir, 0)
        if err != nil {
            return nil, err
        }
        pl.Segments = append(pl.Segments, segment)
        pl.ActiveSegment = segment
    } else {
        pl.ActiveSegment = pl.Segments[len(pl.Segments)-1]
    }
    
    return pl, nil
}

// 加载段文件
func (pl *PartitionLog) loadSegments() error {
    entries, err := os.ReadDir(pl.Dir)
    if err != nil {
        return err
    }
    
    var baseOffsets []int64
    for _, entry := range entries {
        if strings.HasSuffix(entry.Name(), ".log") {
            baseStr := strings.TrimSuffix(entry.Name(), ".log")
            if base, err := strconv.ParseInt(baseStr, 10, 64); err == nil {
                baseOffsets = append(baseOffsets, base)
            }
        }
    }
    
    sort.Slice(baseOffsets, func(i, j int) bool {
        return baseOffsets[i] < baseOffsets[j]
    })
    
    for _, baseOffset := range baseOffsets {
        segment, err := NewSegment(pl.Dir, baseOffset)
        if err != nil {
            return err
        }
        pl.Segments = append(pl.Segments, segment)
    }
    
    return nil
}

🚀 核心功能实现

消息追加

// 追加消息
func (pl *PartitionLog) Append(records []Record) (int64, int64, error) {
    pl.mu.Lock()
    defer pl.mu.Unlock()
    
    if len(records) == 0 {
        return pl.ActiveSegment.LastOffset + 1, pl.ActiveSegment.LastOffset, nil
    }
    
    // 检查是否需要创建新段
    if err := pl.maybeRollSegment(); err != nil {
        return 0, 0, err
    }
    
    // 编码消息批次
    batch, err := pl.encodeBatch(records)
    if err != nil {
        return 0, 0, err
    }
    
    // 获取写入位置
    position, err := pl.ActiveSegment.LogFile.Seek(0, io.SeekEnd)
    if err != nil {
        return 0, 0, err
    }
    
    // 写入日志文件
    if _, err := pl.ActiveSegment.LogFile.Write(batch); err != nil {
        return 0, 0, err
    }
    
    // 更新段大小
    pl.ActiveSegment.Size += int64(len(batch))
    
    // 更新偏移量
    baseOffset := pl.ActiveSegment.LastOffset + 1
    pl.ActiveSegment.LastOffset += int64(len(records))
    
    // 写入索引
    if pl.shouldWriteIndex(position) {
        relativeOffset := int32(baseOffset - pl.ActiveSegment.BaseOffset)
        if err := pl.ActiveSegment.writeIndexEntry(relativeOffset, position); err != nil {
            return 0, 0, err
        }
        
        pl.ActiveSegment.Index = append(pl.ActiveSegment.Index, IndexEntry{
            RelativeOffset: relativeOffset,
            Position:       position,
        })
    }
    
    return baseOffset, pl.ActiveSegment.LastOffset, nil
}

// 编码消息批次
func (pl *PartitionLog) encodeBatch(records []Record) ([]byte, error) {
    var buf bytes.Buffer
    
    // 批次长度 (4 bytes) - 稍后填充
    lengthPos := buf.Len()
    buf.Write(make([]byte, 4))
    
    // 记录数量 (4 bytes)
    if err := binary.Write(&buf, binary.BigEndian, uint32(len(records))); err != nil {
        return nil, err
    }
    
    // 编码每条记录
    for _, record := range records {
        data, err := record.Encode()
        if err != nil {
            return nil, err
        }
        
        // 记录长度 (4 bytes)
        if err := binary.Write(&buf, binary.BigEndian, uint32(len(data))); err != nil {
            return nil, err
        }
        
        // 记录内容
        if _, err := buf.Write(data); err != nil {
            return nil, err
        }
    }
    
    // 计算批次长度
    batchData := buf.Bytes()
    batchLength := len(batchData) - 4
    binary.BigEndian.PutUint32(batchData[lengthPos:lengthPos+4], uint32(batchLength))
    
    return batchData, nil
}

// 检查是否需要创建新段
func (pl *PartitionLog) maybeRollSegment() error {
    if pl.ActiveSegment.Size >= pl.MaxSegmentBytes {
        return pl.rollSegment()
    }
    return nil
}

// 创建新段
func (pl *PartitionLog) rollSegment() error {
    baseOffset := pl.ActiveSegment.LastOffset + 1
    segment, err := NewSegment(pl.Dir, baseOffset)
    if err != nil {
        return err
    }
    
    pl.Segments = append(pl.Segments, segment)
    pl.ActiveSegment = segment
    
    return nil
}

// 检查是否需要写入索引
func (pl *PartitionLog) shouldWriteIndex(position int64) bool {
    if len(pl.ActiveSegment.Index) == 0 {
        return true
    }
    
    lastEntry := pl.ActiveSegment.Index[len(pl.ActiveSegment.Index)-1]
    return position-lastEntry.Position >= pl.IndexInterval
}

消息读取

// 读取消息
func (pl *PartitionLog) Read(offset int64, maxBytes int64) ([]Record, error) {
    pl.mu.RLock()
    defer pl.mu.RUnlock()
    
    // 查找包含指定偏移量的段
    segment := pl.findSegment(offset)
    if segment == nil {
        return nil, fmt.Errorf("offset %d not found", offset)
    }
    
    // 使用索引查找起始位置
    startPosition := pl.findStartPosition(segment, offset)
    
    // 从起始位置开始读取
    if _, err := segment.LogFile.Seek(startPosition, io.SeekStart); err != nil {
        return nil, err
    }
    
    var records []Record
    var bytesRead int64
    
    for bytesRead < maxBytes {
        // 读取批次长度
        var batchLength uint32
        if err := binary.Read(segment.LogFile, binary.BigEndian, &batchLength); err != nil {
            if err == io.EOF {
                break
            }
            return nil, err
        }
        
        // 读取批次数据
        batchData := make([]byte, batchLength)
        if _, err := io.ReadFull(segment.LogFile, batchData); err != nil {
            return nil, err
        }
        
        // 解码批次
        batchRecords, err := pl.decodeBatch(batchData)
        if err != nil {
            return nil, err
        }
        
        // 过滤指定偏移量之后的消息
        for _, record := range batchRecords {
            if record.Offset >= offset {
                records = append(records, record)
                bytesRead += int64(len(record.Key) + len(record.Value))
            }
        }
    }
    
    return records, nil
}

// 查找段
func (pl *PartitionLog) findSegment(offset int64) *Segment {
    for i := len(pl.Segments) - 1; i >= 0; i-- {
        segment := pl.Segments[i]
        if offset >= segment.BaseOffset {
            return segment
        }
    }
    return nil
}

// 查找起始位置
func (pl *PartitionLog) findStartPosition(segment *Segment, offset int64) int64 {
    // 使用索引查找最接近的位置
    for i := len(segment.Index) - 1; i >= 0; i-- {
        entry := segment.Index[i]
        absoluteOffset := segment.BaseOffset + int64(entry.RelativeOffset)
        if absoluteOffset <= offset {
            return entry.Position
        }
    }
    
    return 0
}

// 解码消息批次
func (pl *PartitionLog) decodeBatch(data []byte) ([]Record, error) {
    buf := bytes.NewReader(data)
    
    // 读取记录数量
    var recordCount uint32
    if err := binary.Read(buf, binary.BigEndian, &recordCount); err != nil {
        return nil, err
    }
    
    var records []Record
    for i := uint32(0); i < recordCount; i++ {
        // 读取记录长度
        var recordLength uint32
        if err := binary.Read(buf, binary.BigEndian, &recordLength); err != nil {
            return nil, err
        }
        
        // 读取记录数据
        recordData := make([]byte, recordLength)
        if _, err := buf.Read(recordData); err != nil {
            return nil, err
        }
        
        // 解码记录
        var record Record
        if err := record.Decode(recordData); err != nil {
            return nil, err
        }
        
        records = append(records, record)
    }
    
    return records, nil
}

HTTP API服务器

// HTTP服务器
type Server struct {
    topicManager *TopicManager
    offsetStore  *OffsetStore
    addr         string
}

// 启动服务器
func (s *Server) Start() error {
    http.HandleFunc("/topics", s.handleTopics)
    http.HandleFunc("/topics/", s.handleTopic)
    http.HandleFunc("/produce", s.handleProduce)
    http.HandleFunc("/consume", s.handleConsume)
    http.HandleFunc("/offsets", s.handleOffsets)
    
    log.Printf("Server starting on %s", s.addr)
    return http.ListenAndServe(s.addr, nil)
}

// 处理Topic列表
func (s *Server) handleTopics(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case "GET":
        s.listTopics(w, r)
    case "POST":
        s.createTopic(w, r)
    default:
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
    }
}

// 列出所有Topic
func (s *Server) listTopics(w http.ResponseWriter, r *http.Request) {
    topics := s.topicManager.ListTopics()
    
    response := map[string]interface{}{
        "topics": topics,
    }
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(response)
}

// 创建Topic
func (s *Server) createTopic(w http.ResponseWriter, r *http.Request) {
    var request struct {
        Name       string `json:"name"`
        Partitions int    `json:"partitions"`
    }
    
    if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
        http.Error(w, "Invalid JSON", http.StatusBadRequest)
        return
    }
    
    if err := s.topicManager.CreateTopic(request.Name, request.Partitions); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    
    w.WriteHeader(http.StatusCreated)
    json.NewEncoder(w).Encode(map[string]string{"status": "created"})
}

// 处理生产请求
func (s *Server) handleProduce(w http.ResponseWriter, r *http.Request) {
    var request struct {
        Topic     string  `json:"topic"`
        Partition int32   `json:"partition"`
        Records   []Record `json:"records"`
    }
    
    if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
        http.Error(w, "Invalid JSON", http.StatusBadRequest)
        return
    }
    
    // 获取分区日志
    partitionLog, err := s.topicManager.GetPartitionLog(request.Topic, request.Partition)
    if err != nil {
        http.Error(w, err.Error(), http.StatusNotFound)
        return
    }
    
    // 追加消息
    baseOffset, lastOffset, err := partitionLog.Append(request.Records)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    
    response := map[string]interface{}{
        "base_offset": baseOffset,
        "last_offset": lastOffset,
        "count":       len(request.Records),
    }
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(response)
}

// 处理消费请求
func (s *Server) handleConsume(w http.ResponseWriter, r *http.Request) {
    var request struct {
        Topic     string `json:"topic"`
        Partition int32  `json:"partition"`
        Offset    int64  `json:"offset"`
        MaxBytes  int64  `json:"max_bytes"`
    }
    
    if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
        http.Error(w, "Invalid JSON", http.StatusBadRequest)
        return
    }
    
    if request.MaxBytes == 0 {
        request.MaxBytes = 1024 * 1024 // 1MB
    }
    
    // 获取分区日志
    partitionLog, err := s.topicManager.GetPartitionLog(request.Topic, request.Partition)
    if err != nil {
        http.Error(w, err.Error(), http.StatusNotFound)
        return
    }
    
    // 读取消息
    records, err := partitionLog.Read(request.Offset, request.MaxBytes)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    
    response := map[string]interface{}{
        "records": records,
        "count":   len(records),
    }
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(response)
}

Topic管理器

// Topic管理器
type TopicManager struct {
    topics map[string]*Topic
    dir    string
    mu     sync.RWMutex
}

// Topic结构
type Topic struct {
    Name       string
    Partitions map[int32]*PartitionLog
    mu         sync.RWMutex
}

// 创建Topic管理器
func NewTopicManager(dir string) *TopicManager {
    return &TopicManager{
        topics: make(map[string]*Topic),
        dir:    dir,
    }
}

// 创建Topic
func (tm *TopicManager) CreateTopic(name string, partitionCount int) error {
    tm.mu.Lock()
    defer tm.mu.Unlock()
    
    if _, exists := tm.topics[name]; exists {
        return fmt.Errorf("topic %s already exists", name)
    }
    
    topic := &Topic{
        Name:       name,
        Partitions: make(map[int32]*PartitionLog),
    }
    
    // 创建分区
    for i := 0; i < partitionCount; i++ {
        partitionLog, err := NewPartitionLog(name, int32(i), tm.dir)
        if err != nil {
            return err
        }
        topic.Partitions[int32(i)] = partitionLog
    }
    
    tm.topics[name] = topic
    return nil
}

// 获取分区日志
func (tm *TopicManager) GetPartitionLog(topicName string, partition int32) (*PartitionLog, error) {
    tm.mu.RLock()
    defer tm.mu.RUnlock()
    
    topic, exists := tm.topics[topicName]
    if !exists {
        return nil, fmt.Errorf("topic %s not found", topicName)
    }
    
    partitionLog, exists := topic.Partitions[partition]
    if !exists {
        return nil, fmt.Errorf("partition %d not found", partition)
    }
    
    return partitionLog, nil
}

// 列出所有Topic
func (tm *TopicManager) ListTopics() []string {
    tm.mu.RLock()
    defer tm.mu.RUnlock()
    
    var topics []string
    for name := range tm.topics {
        topics = append(topics, name)
    }
    
    return topics
}

🧪 测试验证

单元测试

// 测试段文件
func TestSegment(t *testing.T) {
    // 创建临时目录
    dir, err := os.MkdirTemp("", "test-segment")
    if err != nil {
        t.Fatal(err)
    }
    defer os.RemoveAll(dir)
    
    // 创建段文件
    segment, err := NewSegment(dir, 0)
    if err != nil {
        t.Fatal(err)
    }
    defer segment.LogFile.Close()
    defer segment.IndexFile.Close()
    
    // 测试写入和读取
    records := []Record{
        {Timestamp: time.Now().UnixMilli(), Key: []byte("key1"), Value: []byte("value1")},
        {Timestamp: time.Now().UnixMilli(), Key: []byte("key2"), Value: []byte("value2")},
    }
    
    // 写入记录
    baseOffset, lastOffset, err := segment.Append(records)
    if err != nil {
        t.Fatal(err)
    }
    
    if baseOffset != 0 {
        t.Errorf("Expected base offset 0, got %d", baseOffset)
    }
    
    if lastOffset != 1 {
        t.Errorf("Expected last offset 1, got %d", lastOffset)
    }
    
    // 读取记录
    readRecords, err := segment.Read(0, 1024)
    if err != nil {
        t.Fatal(err)
    }
    
    if len(readRecords) != 2 {
        t.Errorf("Expected 2 records, got %d", len(readRecords))
    }
}

// 测试分区日志
func TestPartitionLog(t *testing.T) {
    // 创建临时目录
    dir, err := os.MkdirTemp("", "test-partition")
    if err != nil {
        t.Fatal(err)
    }
    defer os.RemoveAll(dir)
    
    // 创建分区日志
    partitionLog, err := NewPartitionLog("test-topic", 0, dir)
    if err != nil {
        t.Fatal(err)
    }
    
    // 测试消息追加
    records := []Record{
        {Timestamp: time.Now().UnixMilli(), Key: []byte("key1"), Value: []byte("value1")},
        {Timestamp: time.Now().UnixMilli(), Key: []byte("key2"), Value: []byte("value2")},
    }
    
    baseOffset, lastOffset, err := partitionLog.Append(records)
    if err != nil {
        t.Fatal(err)
    }
    
    // 测试消息读取
    readRecords, err := partitionLog.Read(baseOffset, 1024)
    if err != nil {
        t.Fatal(err)
    }
    
    if len(readRecords) != 2 {
        t.Errorf("Expected 2 records, got %d", len(readRecords))
    }
}

集成测试

// 测试完整流程
func TestIntegration(t *testing.T) {
    // 创建临时目录
    dir, err := os.MkdirTemp("", "test-integration")
    if err != nil {
        t.Fatal(err)
    }
    defer os.RemoveAll(dir)
    
    // 创建Topic管理器
    topicManager := NewTopicManager(dir)
    
    // 创建Topic
    if err := topicManager.CreateTopic("test-topic", 3); err != nil {
        t.Fatal(err)
    }
    
    // 获取分区日志
    partitionLog, err := topicManager.GetPartitionLog("test-topic", 0)
    if err != nil {
        t.Fatal(err)
    }
    
    // 生产消息
    records := []Record{
        {Timestamp: time.Now().UnixMilli(), Key: []byte("key1"), Value: []byte("value1")},
        {Timestamp: time.Now().UnixMilli(), Key: []byte("key2"), Value: []byte("value2")},
    }
    
    baseOffset, lastOffset, err := partitionLog.Append(records)
    if err != nil {
        t.Fatal(err)
    }
    
    // 消费消息
    readRecords, err := partitionLog.Read(baseOffset, 1024)
    if err != nil {
        t.Fatal(err)
    }
    
    // 验证消息
    if len(readRecords) != 2 {
        t.Errorf("Expected 2 records, got %d", len(readRecords))
    }
    
    if string(readRecords[0].Key) != "key1" {
        t.Errorf("Expected key1, got %s", string(readRecords[0].Key))
    }
    
    if string(readRecords[0].Value) != "value1" {
        t.Errorf("Expected value1, got %s", string(readRecords[0].Value))
    }
}

🚀 快速运行指南

启动服务

# 编译项目
go build -o mini-kafka cmd/broker/main.go

# 启动服务
./mini-kafka -data ./data -addr :8080

测试API

# 创建Topic
curl -X POST http://localhost:8080/topics \
  -H "Content-Type: application/json" \
  -d '{"name": "test-topic", "partitions": 3}'

# 生产消息
curl -X POST http://localhost:8080/produce \
  -H "Content-Type: application/json" \
  -d '{
    "topic": "test-topic",
    "partition": 0,
    "records": [
      {"key": "key1", "value": "value1"},
      {"key": "key2", "value": "value2"}
    ]
  }'

# 消费消息
curl -X POST http://localhost:8080/consume \
  -H "Content-Type: application/json" \
  -d '{
    "topic": "test-topic",
    "partition": 0,
    "offset": 0,
    "max_bytes": 1024
  }'

🔧 扩展方向

1. 多副本支持

// 副本管理器
type ReplicaManager struct {
    replicas map[string]map[int32][]*Replica
    mu       sync.RWMutex
}

// 副本
type Replica struct {
    BrokerID int32
    IsLeader bool
    IsInISR  bool
    Log      *PartitionLog
}

2. 消费者组支持

// 消费者组
type ConsumerGroup struct {
    GroupID   string
    Consumers []*Consumer
    Coordinator *GroupCoordinator
    mu        sync.RWMutex
}

3. 事务支持

// 事务管理器
type TransactionManager struct {
    transactions map[string]*Transaction
    coordinator  *TransactionCoordinator
    mu           sync.RWMutex
}

4. 压缩支持

// 压缩器
type Compressor struct {
    algorithm string
    level     int
}

func (c *Compressor) Compress(data []byte) ([]byte, error) {
    switch c.algorithm {
    case "gzip":
        return c.compressGzip(data)
    case "snappy":
        return c.compressSnappy(data)
    default:
        return data, nil
    }
}

🎯 面试高频考点

1. 如何设计一个消息队列?

答案要点:

  • 消息存储:段文件、稀疏索引
  • 消息传输:网络协议、序列化
  • 高可用:多副本、故障转移
  • 性能优化:批量处理、压缩

2. 如何保证消息的顺序性?

答案要点:

  • 分区内有序
  • 顺序写入
  • 顺序读取
  • 偏移量管理

3. 如何实现高吞吐量?

答案要点:

  • 顺序写入
  • 批量处理
  • 零拷贝
  • 分区并行

4. 如何保证消息不丢失?

答案要点:

  • 持久化存储
  • 多副本机制
  • 确认机制
  • 故障恢复

📝 本章小结

本章通过实现一个简化版的Kafka,深入理解了:

  1. 核心设计思想: 段文件、稀疏索引、分区并行
  2. 关键数据结构: 消息、段文件、分区日志
  3. 核心功能实现: 消息追加、读取、索引管理
  4. 系统架构设计: 模块化、可扩展、可测试
  5. 实际编码经验: 错误处理、并发控制、性能优化

通过这个实战项目,您不仅加深了对Kafka的理解,还提升了系统设计和编码能力。这为后续学习更复杂的分布式系统奠定了坚实基础。


学习完成: 恭喜您完成了Kafka学习手册的全部内容!现在您已经具备了Kafka的全面知识体系,可以自信地应对相关技术挑战。

Prev
09-面试高频问题详解