10-实战项目-Mini-Kafka实现
📋 本章概览
本章将带您动手实现一个简化版的Kafka,通过实际编码加深对Kafka核心机制的理解。我们将从项目设计、核心数据结构、关键功能实现等方面,逐步构建一个可运行的Mini-Kafka系统。
🎯 学习目标
- 理解Kafka的核心设计思想
- 掌握段文件、稀疏索引等关键机制
- 实现基本的Producer和Consumer功能
- 学习分布式系统的设计原则
- 提升系统设计和编码能力
🏗️ 项目总体设计
系统架构
┌─────────────────────────────────────────────────────────────────┐
│ Mini-Kafka 架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Producer │ │ Broker │ │ Consumer │ │
│ │ Client │ │ Server │ │ Client │ │
│ │ │ │ │ │ │ │
│ │ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │
│ │ │Message │ │ │ │Topic │ │ │ │Offset │ │ │
│ │ │Builder │ │ │ │Manager │ │ │ │Manager │ │ │
│ │ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │ │
│ │ │ │ │ │ │ │
│ │ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │
│ │ │Partition│ │ │ │Partition│ │ │ │Message │ │ │
│ │ │Selector │ │ │ │Log │ │ │ │Processor│ │ │
│ │ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ │ 1. 发送消息 │ 2. 存储消息 │ 3. 消费消息 │
│ └───────────────────┼───────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ 存储层 ││
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ││
│ │ │ Segment │ │ Index │ │ Offset │ ││
│ │ │ Files │ │ Files │ │ Store │ ││
│ │ └─────────────┘ └─────────────┘ └─────────────┘ ││
│ └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘
项目结构
mini-kafka/
├── cmd/
│ └── broker/
│ └── main.go # 启动入口
├── internal/
│ ├── api/
│ │ ├── producer.go # Producer API
│ │ ├── consumer.go # Consumer API
│ │ └── server.go # HTTP服务器
│ ├── log/
│ │ ├── segment.go # 段文件管理
│ │ ├── index.go # 稀疏索引
│ │ └── partition.go # 分区日志
│ ├── meta/
│ │ ├── topic.go # Topic管理
│ │ └── offset.go # Offset管理
│ └── storage/
│ ├── file.go # 文件操作
│ └── buffer.go # 缓冲区管理
├── pkg/
│ ├── protocol/
│ │ ├── message.go # 消息协议
│ │ └── request.go # 请求协议
│ └── utils/
│ ├── hash.go # 哈希函数
│ └── time.go # 时间工具
├── go.mod
├── go.sum
└── README.md
📊 核心数据结构
消息结构
// 消息记录
type Record struct {
Timestamp int64 `json:"timestamp"`
Key []byte `json:"key"`
Value []byte `json:"value"`
Offset int64 `json:"offset"`
}
// 消息批次
type RecordBatch struct {
BaseOffset int64 `json:"base_offset"`
Length int32 `json:"length"`
CRC uint32 `json:"crc"`
Records []Record `json:"records"`
Timestamp int64 `json:"timestamp"`
}
// 消息编码
func (r *Record) Encode() ([]byte, error) {
var buf bytes.Buffer
// 时间戳 (8 bytes)
if err := binary.Write(&buf, binary.BigEndian, r.Timestamp); err != nil {
return nil, err
}
// 键长度 (4 bytes)
keyLen := uint32(len(r.Key))
if err := binary.Write(&buf, binary.BigEndian, keyLen); err != nil {
return nil, err
}
// 值长度 (4 bytes)
valueLen := uint32(len(r.Value))
if err := binary.Write(&buf, binary.BigEndian, valueLen); err != nil {
return nil, err
}
// 键内容
if _, err := buf.Write(r.Key); err != nil {
return nil, err
}
// 值内容
if _, err := buf.Write(r.Value); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// 消息解码
func (r *Record) Decode(data []byte) error {
buf := bytes.NewReader(data)
// 读取时间戳
if err := binary.Read(buf, binary.BigEndian, &r.Timestamp); err != nil {
return err
}
// 读取键长度
var keyLen uint32
if err := binary.Read(buf, binary.BigEndian, &keyLen); err != nil {
return err
}
// 读取值长度
var valueLen uint32
if err := binary.Read(buf, binary.BigEndian, &valueLen); err != nil {
return err
}
// 读取键内容
r.Key = make([]byte, keyLen)
if _, err := buf.Read(r.Key); err != nil {
return err
}
// 读取值内容
r.Value = make([]byte, valueLen)
if _, err := buf.Read(r.Value); err != nil {
return err
}
return nil
}
段文件结构
// 段文件
type Segment struct {
BaseOffset int64
LogFile *os.File
IndexFile *os.File
Size int64
LastOffset int64
Index []IndexEntry
mu sync.RWMutex
}
// 索引条目
type IndexEntry struct {
RelativeOffset int32
Position int64
}
// 创建段文件
func NewSegment(dir string, baseOffset int64) (*Segment, error) {
// 创建目录
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, err
}
// 创建日志文件
logPath := filepath.Join(dir, fmt.Sprintf("%020d.log", baseOffset))
logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
// 创建索引文件
indexPath := filepath.Join(dir, fmt.Sprintf("%020d.idx", baseOffset))
indexFile, err := os.OpenFile(indexPath, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, err
}
// 获取文件大小
stat, err := logFile.Stat()
if err != nil {
return nil, err
}
segment := &Segment{
BaseOffset: baseOffset,
LogFile: logFile,
IndexFile: indexFile,
Size: stat.Size(),
LastOffset: baseOffset - 1,
Index: make([]IndexEntry, 0),
}
// 加载索引
if err := segment.loadIndex(); err != nil {
return nil, err
}
return segment, nil
}
// 加载索引
func (s *Segment) loadIndex() error {
s.Index = make([]IndexEntry, 0)
if _, err := s.IndexFile.Seek(0, io.SeekStart); err != nil {
return err
}
buf := make([]byte, 12) // 4 bytes offset + 8 bytes position
for {
_, err := io.ReadFull(s.IndexFile, buf)
if err == io.EOF {
break
}
if err != nil {
return err
}
entry := IndexEntry{
RelativeOffset: int32(binary.BigEndian.Uint32(buf[0:4])),
Position: int64(binary.BigEndian.Uint64(buf[4:12])),
}
s.Index = append(s.Index, entry)
}
return nil
}
// 写入索引条目
func (s *Segment) writeIndexEntry(relativeOffset int32, position int64) error {
buf := make([]byte, 12)
binary.BigEndian.PutUint32(buf[0:4], uint32(relativeOffset))
binary.BigEndian.PutUint64(buf[4:12], uint64(position))
_, err := s.IndexFile.Write(buf)
return err
}
分区日志结构
// 分区日志
type PartitionLog struct {
Topic string
Partition int32
Dir string
Segments []*Segment
ActiveSegment *Segment
MaxSegmentBytes int64
IndexInterval int64
mu sync.RWMutex
}
// 创建分区日志
func NewPartitionLog(topic string, partition int32, dir string) (*PartitionLog, error) {
partitionDir := filepath.Join(dir, fmt.Sprintf("%s-%d", topic, partition))
pl := &PartitionLog{
Topic: topic,
Partition: partition,
Dir: partitionDir,
Segments: make([]*Segment, 0),
MaxSegmentBytes: 1 << 30, // 1GB
IndexInterval: 1 << 20, // 1MB
}
// 加载现有段文件
if err := pl.loadSegments(); err != nil {
return nil, err
}
// 如果没有段文件,创建第一个
if len(pl.Segments) == 0 {
segment, err := NewSegment(partitionDir, 0)
if err != nil {
return nil, err
}
pl.Segments = append(pl.Segments, segment)
pl.ActiveSegment = segment
} else {
pl.ActiveSegment = pl.Segments[len(pl.Segments)-1]
}
return pl, nil
}
// 加载段文件
func (pl *PartitionLog) loadSegments() error {
entries, err := os.ReadDir(pl.Dir)
if err != nil {
return err
}
var baseOffsets []int64
for _, entry := range entries {
if strings.HasSuffix(entry.Name(), ".log") {
baseStr := strings.TrimSuffix(entry.Name(), ".log")
if base, err := strconv.ParseInt(baseStr, 10, 64); err == nil {
baseOffsets = append(baseOffsets, base)
}
}
}
sort.Slice(baseOffsets, func(i, j int) bool {
return baseOffsets[i] < baseOffsets[j]
})
for _, baseOffset := range baseOffsets {
segment, err := NewSegment(pl.Dir, baseOffset)
if err != nil {
return err
}
pl.Segments = append(pl.Segments, segment)
}
return nil
}
🚀 核心功能实现
消息追加
// 追加消息
func (pl *PartitionLog) Append(records []Record) (int64, int64, error) {
pl.mu.Lock()
defer pl.mu.Unlock()
if len(records) == 0 {
return pl.ActiveSegment.LastOffset + 1, pl.ActiveSegment.LastOffset, nil
}
// 检查是否需要创建新段
if err := pl.maybeRollSegment(); err != nil {
return 0, 0, err
}
// 编码消息批次
batch, err := pl.encodeBatch(records)
if err != nil {
return 0, 0, err
}
// 获取写入位置
position, err := pl.ActiveSegment.LogFile.Seek(0, io.SeekEnd)
if err != nil {
return 0, 0, err
}
// 写入日志文件
if _, err := pl.ActiveSegment.LogFile.Write(batch); err != nil {
return 0, 0, err
}
// 更新段大小
pl.ActiveSegment.Size += int64(len(batch))
// 更新偏移量
baseOffset := pl.ActiveSegment.LastOffset + 1
pl.ActiveSegment.LastOffset += int64(len(records))
// 写入索引
if pl.shouldWriteIndex(position) {
relativeOffset := int32(baseOffset - pl.ActiveSegment.BaseOffset)
if err := pl.ActiveSegment.writeIndexEntry(relativeOffset, position); err != nil {
return 0, 0, err
}
pl.ActiveSegment.Index = append(pl.ActiveSegment.Index, IndexEntry{
RelativeOffset: relativeOffset,
Position: position,
})
}
return baseOffset, pl.ActiveSegment.LastOffset, nil
}
// 编码消息批次
func (pl *PartitionLog) encodeBatch(records []Record) ([]byte, error) {
var buf bytes.Buffer
// 批次长度 (4 bytes) - 稍后填充
lengthPos := buf.Len()
buf.Write(make([]byte, 4))
// 记录数量 (4 bytes)
if err := binary.Write(&buf, binary.BigEndian, uint32(len(records))); err != nil {
return nil, err
}
// 编码每条记录
for _, record := range records {
data, err := record.Encode()
if err != nil {
return nil, err
}
// 记录长度 (4 bytes)
if err := binary.Write(&buf, binary.BigEndian, uint32(len(data))); err != nil {
return nil, err
}
// 记录内容
if _, err := buf.Write(data); err != nil {
return nil, err
}
}
// 计算批次长度
batchData := buf.Bytes()
batchLength := len(batchData) - 4
binary.BigEndian.PutUint32(batchData[lengthPos:lengthPos+4], uint32(batchLength))
return batchData, nil
}
// 检查是否需要创建新段
func (pl *PartitionLog) maybeRollSegment() error {
if pl.ActiveSegment.Size >= pl.MaxSegmentBytes {
return pl.rollSegment()
}
return nil
}
// 创建新段
func (pl *PartitionLog) rollSegment() error {
baseOffset := pl.ActiveSegment.LastOffset + 1
segment, err := NewSegment(pl.Dir, baseOffset)
if err != nil {
return err
}
pl.Segments = append(pl.Segments, segment)
pl.ActiveSegment = segment
return nil
}
// 检查是否需要写入索引
func (pl *PartitionLog) shouldWriteIndex(position int64) bool {
if len(pl.ActiveSegment.Index) == 0 {
return true
}
lastEntry := pl.ActiveSegment.Index[len(pl.ActiveSegment.Index)-1]
return position-lastEntry.Position >= pl.IndexInterval
}
消息读取
// 读取消息
func (pl *PartitionLog) Read(offset int64, maxBytes int64) ([]Record, error) {
pl.mu.RLock()
defer pl.mu.RUnlock()
// 查找包含指定偏移量的段
segment := pl.findSegment(offset)
if segment == nil {
return nil, fmt.Errorf("offset %d not found", offset)
}
// 使用索引查找起始位置
startPosition := pl.findStartPosition(segment, offset)
// 从起始位置开始读取
if _, err := segment.LogFile.Seek(startPosition, io.SeekStart); err != nil {
return nil, err
}
var records []Record
var bytesRead int64
for bytesRead < maxBytes {
// 读取批次长度
var batchLength uint32
if err := binary.Read(segment.LogFile, binary.BigEndian, &batchLength); err != nil {
if err == io.EOF {
break
}
return nil, err
}
// 读取批次数据
batchData := make([]byte, batchLength)
if _, err := io.ReadFull(segment.LogFile, batchData); err != nil {
return nil, err
}
// 解码批次
batchRecords, err := pl.decodeBatch(batchData)
if err != nil {
return nil, err
}
// 过滤指定偏移量之后的消息
for _, record := range batchRecords {
if record.Offset >= offset {
records = append(records, record)
bytesRead += int64(len(record.Key) + len(record.Value))
}
}
}
return records, nil
}
// 查找段
func (pl *PartitionLog) findSegment(offset int64) *Segment {
for i := len(pl.Segments) - 1; i >= 0; i-- {
segment := pl.Segments[i]
if offset >= segment.BaseOffset {
return segment
}
}
return nil
}
// 查找起始位置
func (pl *PartitionLog) findStartPosition(segment *Segment, offset int64) int64 {
// 使用索引查找最接近的位置
for i := len(segment.Index) - 1; i >= 0; i-- {
entry := segment.Index[i]
absoluteOffset := segment.BaseOffset + int64(entry.RelativeOffset)
if absoluteOffset <= offset {
return entry.Position
}
}
return 0
}
// 解码消息批次
func (pl *PartitionLog) decodeBatch(data []byte) ([]Record, error) {
buf := bytes.NewReader(data)
// 读取记录数量
var recordCount uint32
if err := binary.Read(buf, binary.BigEndian, &recordCount); err != nil {
return nil, err
}
var records []Record
for i := uint32(0); i < recordCount; i++ {
// 读取记录长度
var recordLength uint32
if err := binary.Read(buf, binary.BigEndian, &recordLength); err != nil {
return nil, err
}
// 读取记录数据
recordData := make([]byte, recordLength)
if _, err := buf.Read(recordData); err != nil {
return nil, err
}
// 解码记录
var record Record
if err := record.Decode(recordData); err != nil {
return nil, err
}
records = append(records, record)
}
return records, nil
}
HTTP API服务器
// HTTP服务器
type Server struct {
topicManager *TopicManager
offsetStore *OffsetStore
addr string
}
// 启动服务器
func (s *Server) Start() error {
http.HandleFunc("/topics", s.handleTopics)
http.HandleFunc("/topics/", s.handleTopic)
http.HandleFunc("/produce", s.handleProduce)
http.HandleFunc("/consume", s.handleConsume)
http.HandleFunc("/offsets", s.handleOffsets)
log.Printf("Server starting on %s", s.addr)
return http.ListenAndServe(s.addr, nil)
}
// 处理Topic列表
func (s *Server) handleTopics(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "GET":
s.listTopics(w, r)
case "POST":
s.createTopic(w, r)
default:
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
// 列出所有Topic
func (s *Server) listTopics(w http.ResponseWriter, r *http.Request) {
topics := s.topicManager.ListTopics()
response := map[string]interface{}{
"topics": topics,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// 创建Topic
func (s *Server) createTopic(w http.ResponseWriter, r *http.Request) {
var request struct {
Name string `json:"name"`
Partitions int `json:"partitions"`
}
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}
if err := s.topicManager.CreateTopic(request.Name, request.Partitions); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(map[string]string{"status": "created"})
}
// 处理生产请求
func (s *Server) handleProduce(w http.ResponseWriter, r *http.Request) {
var request struct {
Topic string `json:"topic"`
Partition int32 `json:"partition"`
Records []Record `json:"records"`
}
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}
// 获取分区日志
partitionLog, err := s.topicManager.GetPartitionLog(request.Topic, request.Partition)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
// 追加消息
baseOffset, lastOffset, err := partitionLog.Append(request.Records)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
response := map[string]interface{}{
"base_offset": baseOffset,
"last_offset": lastOffset,
"count": len(request.Records),
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// 处理消费请求
func (s *Server) handleConsume(w http.ResponseWriter, r *http.Request) {
var request struct {
Topic string `json:"topic"`
Partition int32 `json:"partition"`
Offset int64 `json:"offset"`
MaxBytes int64 `json:"max_bytes"`
}
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}
if request.MaxBytes == 0 {
request.MaxBytes = 1024 * 1024 // 1MB
}
// 获取分区日志
partitionLog, err := s.topicManager.GetPartitionLog(request.Topic, request.Partition)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
// 读取消息
records, err := partitionLog.Read(request.Offset, request.MaxBytes)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
response := map[string]interface{}{
"records": records,
"count": len(records),
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
Topic管理器
// Topic管理器
type TopicManager struct {
topics map[string]*Topic
dir string
mu sync.RWMutex
}
// Topic结构
type Topic struct {
Name string
Partitions map[int32]*PartitionLog
mu sync.RWMutex
}
// 创建Topic管理器
func NewTopicManager(dir string) *TopicManager {
return &TopicManager{
topics: make(map[string]*Topic),
dir: dir,
}
}
// 创建Topic
func (tm *TopicManager) CreateTopic(name string, partitionCount int) error {
tm.mu.Lock()
defer tm.mu.Unlock()
if _, exists := tm.topics[name]; exists {
return fmt.Errorf("topic %s already exists", name)
}
topic := &Topic{
Name: name,
Partitions: make(map[int32]*PartitionLog),
}
// 创建分区
for i := 0; i < partitionCount; i++ {
partitionLog, err := NewPartitionLog(name, int32(i), tm.dir)
if err != nil {
return err
}
topic.Partitions[int32(i)] = partitionLog
}
tm.topics[name] = topic
return nil
}
// 获取分区日志
func (tm *TopicManager) GetPartitionLog(topicName string, partition int32) (*PartitionLog, error) {
tm.mu.RLock()
defer tm.mu.RUnlock()
topic, exists := tm.topics[topicName]
if !exists {
return nil, fmt.Errorf("topic %s not found", topicName)
}
partitionLog, exists := topic.Partitions[partition]
if !exists {
return nil, fmt.Errorf("partition %d not found", partition)
}
return partitionLog, nil
}
// 列出所有Topic
func (tm *TopicManager) ListTopics() []string {
tm.mu.RLock()
defer tm.mu.RUnlock()
var topics []string
for name := range tm.topics {
topics = append(topics, name)
}
return topics
}
🧪 测试验证
单元测试
// 测试段文件
func TestSegment(t *testing.T) {
// 创建临时目录
dir, err := os.MkdirTemp("", "test-segment")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
// 创建段文件
segment, err := NewSegment(dir, 0)
if err != nil {
t.Fatal(err)
}
defer segment.LogFile.Close()
defer segment.IndexFile.Close()
// 测试写入和读取
records := []Record{
{Timestamp: time.Now().UnixMilli(), Key: []byte("key1"), Value: []byte("value1")},
{Timestamp: time.Now().UnixMilli(), Key: []byte("key2"), Value: []byte("value2")},
}
// 写入记录
baseOffset, lastOffset, err := segment.Append(records)
if err != nil {
t.Fatal(err)
}
if baseOffset != 0 {
t.Errorf("Expected base offset 0, got %d", baseOffset)
}
if lastOffset != 1 {
t.Errorf("Expected last offset 1, got %d", lastOffset)
}
// 读取记录
readRecords, err := segment.Read(0, 1024)
if err != nil {
t.Fatal(err)
}
if len(readRecords) != 2 {
t.Errorf("Expected 2 records, got %d", len(readRecords))
}
}
// 测试分区日志
func TestPartitionLog(t *testing.T) {
// 创建临时目录
dir, err := os.MkdirTemp("", "test-partition")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
// 创建分区日志
partitionLog, err := NewPartitionLog("test-topic", 0, dir)
if err != nil {
t.Fatal(err)
}
// 测试消息追加
records := []Record{
{Timestamp: time.Now().UnixMilli(), Key: []byte("key1"), Value: []byte("value1")},
{Timestamp: time.Now().UnixMilli(), Key: []byte("key2"), Value: []byte("value2")},
}
baseOffset, lastOffset, err := partitionLog.Append(records)
if err != nil {
t.Fatal(err)
}
// 测试消息读取
readRecords, err := partitionLog.Read(baseOffset, 1024)
if err != nil {
t.Fatal(err)
}
if len(readRecords) != 2 {
t.Errorf("Expected 2 records, got %d", len(readRecords))
}
}
集成测试
// 测试完整流程
func TestIntegration(t *testing.T) {
// 创建临时目录
dir, err := os.MkdirTemp("", "test-integration")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
// 创建Topic管理器
topicManager := NewTopicManager(dir)
// 创建Topic
if err := topicManager.CreateTopic("test-topic", 3); err != nil {
t.Fatal(err)
}
// 获取分区日志
partitionLog, err := topicManager.GetPartitionLog("test-topic", 0)
if err != nil {
t.Fatal(err)
}
// 生产消息
records := []Record{
{Timestamp: time.Now().UnixMilli(), Key: []byte("key1"), Value: []byte("value1")},
{Timestamp: time.Now().UnixMilli(), Key: []byte("key2"), Value: []byte("value2")},
}
baseOffset, lastOffset, err := partitionLog.Append(records)
if err != nil {
t.Fatal(err)
}
// 消费消息
readRecords, err := partitionLog.Read(baseOffset, 1024)
if err != nil {
t.Fatal(err)
}
// 验证消息
if len(readRecords) != 2 {
t.Errorf("Expected 2 records, got %d", len(readRecords))
}
if string(readRecords[0].Key) != "key1" {
t.Errorf("Expected key1, got %s", string(readRecords[0].Key))
}
if string(readRecords[0].Value) != "value1" {
t.Errorf("Expected value1, got %s", string(readRecords[0].Value))
}
}
🚀 快速运行指南
启动服务
# 编译项目
go build -o mini-kafka cmd/broker/main.go
# 启动服务
./mini-kafka -data ./data -addr :8080
测试API
# 创建Topic
curl -X POST http://localhost:8080/topics \
-H "Content-Type: application/json" \
-d '{"name": "test-topic", "partitions": 3}'
# 生产消息
curl -X POST http://localhost:8080/produce \
-H "Content-Type: application/json" \
-d '{
"topic": "test-topic",
"partition": 0,
"records": [
{"key": "key1", "value": "value1"},
{"key": "key2", "value": "value2"}
]
}'
# 消费消息
curl -X POST http://localhost:8080/consume \
-H "Content-Type: application/json" \
-d '{
"topic": "test-topic",
"partition": 0,
"offset": 0,
"max_bytes": 1024
}'
🔧 扩展方向
1. 多副本支持
// 副本管理器
type ReplicaManager struct {
replicas map[string]map[int32][]*Replica
mu sync.RWMutex
}
// 副本
type Replica struct {
BrokerID int32
IsLeader bool
IsInISR bool
Log *PartitionLog
}
2. 消费者组支持
// 消费者组
type ConsumerGroup struct {
GroupID string
Consumers []*Consumer
Coordinator *GroupCoordinator
mu sync.RWMutex
}
3. 事务支持
// 事务管理器
type TransactionManager struct {
transactions map[string]*Transaction
coordinator *TransactionCoordinator
mu sync.RWMutex
}
4. 压缩支持
// 压缩器
type Compressor struct {
algorithm string
level int
}
func (c *Compressor) Compress(data []byte) ([]byte, error) {
switch c.algorithm {
case "gzip":
return c.compressGzip(data)
case "snappy":
return c.compressSnappy(data)
default:
return data, nil
}
}
🎯 面试高频考点
1. 如何设计一个消息队列?
答案要点:
- 消息存储:段文件、稀疏索引
- 消息传输:网络协议、序列化
- 高可用:多副本、故障转移
- 性能优化:批量处理、压缩
2. 如何保证消息的顺序性?
答案要点:
- 分区内有序
- 顺序写入
- 顺序读取
- 偏移量管理
3. 如何实现高吞吐量?
答案要点:
- 顺序写入
- 批量处理
- 零拷贝
- 分区并行
4. 如何保证消息不丢失?
答案要点:
- 持久化存储
- 多副本机制
- 确认机制
- 故障恢复
📝 本章小结
本章通过实现一个简化版的Kafka,深入理解了:
- 核心设计思想: 段文件、稀疏索引、分区并行
- 关键数据结构: 消息、段文件、分区日志
- 核心功能实现: 消息追加、读取、索引管理
- 系统架构设计: 模块化、可扩展、可测试
- 实际编码经验: 错误处理、并发控制、性能优化
通过这个实战项目,您不仅加深了对Kafka的理解,还提升了系统设计和编码能力。这为后续学习更复杂的分布式系统奠定了坚实基础。
学习完成: 恭喜您完成了Kafka学习手册的全部内容!现在您已经具备了Kafka的全面知识体系,可以自信地应对相关技术挑战。