缓存与消息队列
1. Redis缓存架构
1.1 缓存层级
┌─────────────────────────────────────────────────────────┐
│ 应用层 │
└────────────────────┬────────────────────────────────────┘
│
┌────────────────────┴────────────────────────────────────┐
│ L1缓存:本地内存 │
│ (LRU, 容量10000条目, TTL 1s) │
└────────────────────┬────────────────────────────────────┘
│ Cache Miss
┌────────────────────┴────────────────────────────────────┐
│ L2缓存:Redis │
│ (容量100GB, TTL 60s - 1hour) │
└────────────────────┬────────────────────────────────────┘
│ Cache Miss
┌────────────────────┴────────────────────────────────────┐
│ 数据库:MySQL │
└─────────────────────────────────────────────────────────┘
1.2 缓存实现
本地缓存(LRU):
package cache
import (
"container/list"
"sync"
"time"
)
// CacheItem is one entry in the LRU cache.
type CacheItem struct {
	key       string
	value     interface{}
	expiresAt time.Time // absolute deadline; checked lazily on Get
}

// LRUCache is a thread-safe, fixed-capacity LRU cache with per-entry TTL.
// Capacity is counted in entries, not bytes. Expired entries are evicted
// lazily, by the Get that discovers them.
type LRUCache struct {
	capacity int
	items    map[string]*list.Element // key -> node in `list`
	list     *list.List               // front = most recently used
	// A plain Mutex replaces the original RWMutex: Get mutates recency
	// order (MoveToFront) and evicts expired entries, so it always needed
	// the exclusive lock anyway and the RWMutex was misleading.
	mu sync.Mutex
}

// NewLRUCache returns an empty cache holding at most capacity entries.
func NewLRUCache(capacity int) *LRUCache {
	return &LRUCache{
		capacity: capacity,
		items:    make(map[string]*list.Element),
		list:     list.New(),
	}
}

// Get returns the live value for key, or (nil, false) if the key is
// missing or expired. A hit promotes the entry to most-recently-used.
func (lru *LRUCache) Get(key string) (interface{}, bool) {
	lru.mu.Lock()
	defer lru.mu.Unlock()

	elem, exists := lru.items[key]
	if !exists {
		return nil, false
	}
	item := elem.Value.(*CacheItem)

	// Lazy TTL eviction.
	if time.Now().After(item.expiresAt) {
		lru.removeElement(elem)
		return nil, false
	}

	lru.list.MoveToFront(elem) // mark as most recently used
	return item.value, true
}

// Set inserts or overwrites key with the given value and TTL. When the
// cache exceeds capacity, the least-recently-used entry is evicted.
func (lru *LRUCache) Set(key string, value interface{}, ttl time.Duration) {
	lru.mu.Lock()
	defer lru.mu.Unlock()

	// Existing key: refresh value/TTL and promote; size is unchanged.
	if elem, exists := lru.items[key]; exists {
		lru.list.MoveToFront(elem)
		item := elem.Value.(*CacheItem)
		item.value = value
		item.expiresAt = time.Now().Add(ttl)
		return
	}

	// New key: insert at the front (most recently used).
	elem := lru.list.PushFront(&CacheItem{
		key:       key,
		value:     value,
		expiresAt: time.Now().Add(ttl),
	})
	lru.items[key] = elem

	// Over capacity: drop the least-recently-used entry (list back).
	if lru.list.Len() > lru.capacity {
		if oldest := lru.list.Back(); oldest != nil {
			lru.removeElement(oldest)
		}
	}
}

// removeElement unlinks elem from both the list and the index map.
// Caller must hold lru.mu.
func (lru *LRUCache) removeElement(elem *list.Element) {
	lru.list.Remove(elem)
	delete(lru.items, elem.Value.(*CacheItem).key)
}
多级缓存服务:
// CacheService serves reads through two cache tiers:
// L1 in-process LRU -> L2 Redis -> MySQL (source of truth).
type CacheService struct {
l1Cache *LRUCache
redis *redis.Client
db *sql.DB
}
// NewCacheService wires the tiers together. The L1 cache holds up to
// 10000 entries (entry count, not bytes).
func NewCacheService(redisClient *redis.Client, db *sql.DB) *CacheService {
return &CacheService{
l1Cache: NewLRUCache(10000),
redis: redisClient,
db: db,
}
}
// GetTicker returns the ticker for symbol, trying the L1 cache, then
// Redis, then MySQL. On a miss the value is written back to both cache
// tiers (Redis TTL 5s, L1 TTL 1s).
func (cs *CacheService) GetTicker(symbol string) (*Ticker, error) {
	cacheKey := fmt.Sprintf("ticker:%s", symbol)

	// 1. L1: in-process cache.
	if value, ok := cs.l1Cache.Get(cacheKey); ok {
		return value.(*Ticker), nil
	}

	// 2. L2: Redis. A corrupt payload now falls through to the database
	// instead of being returned as a zero-valued ticker (the original
	// ignored the Unmarshal error).
	data, err := cs.redis.Get(context.Background(), cacheKey).Result()
	if err == nil {
		var ticker Ticker
		if uerr := json.Unmarshal([]byte(data), &ticker); uerr == nil {
			cs.l1Cache.Set(cacheKey, &ticker, 1*time.Second)
			return &ticker, nil
		}
	}

	// 3. Database. queryTickerFromDB returns *Ticker (see its other call
	// sites); the original assigned it to a Ticker value, which does not
	// compile.
	ticker, err := cs.queryTickerFromDB(symbol)
	if err != nil {
		return nil, err
	}

	// 4. Write back to both tiers; cache failures are non-fatal.
	if tickerJSON, merr := json.Marshal(ticker); merr == nil {
		cs.redis.Set(context.Background(), cacheKey, tickerJSON, 5*time.Second)
	}
	cs.l1Cache.Set(cacheKey, ticker, 1*time.Second)
	return ticker, nil
}
2. 缓存策略
2.1 Cache-Aside(旁路缓存)
读取流程:
// GetUserBalance returns the user's balance for a currency, cache-aside:
// read Redis first, fall back to MySQL and repopulate on a miss.
func (cs *CacheService) GetUserBalance(userID int64, currency string) (*Balance, error) {
	cacheKey := fmt.Sprintf("balance:%d:%s", userID, currency)

	// 1. Cache lookup. A hit with a corrupt payload now falls through to
	// the database; the original ignored the Unmarshal error and returned
	// a zero-valued Balance.
	data, err := cs.redis.Get(context.Background(), cacheKey).Result()
	if err == nil {
		var balance Balance
		if uerr := json.Unmarshal([]byte(data), &balance); uerr == nil {
			return &balance, nil
		}
	}

	// 2. Miss (or bad payload): authoritative read from MySQL.
	balance, err := cs.queryBalanceFromDB(userID, currency)
	if err != nil {
		return nil, err
	}

	// 3. Repopulate the cache (60s TTL); failures here are non-fatal.
	if balanceJSON, merr := json.Marshal(balance); merr == nil {
		cs.redis.Set(context.Background(), cacheKey, balanceJSON, 60*time.Second)
	}
	return balance, nil
}
更新流程:
// UpdateBalance applies delta in MySQL first, then invalidates the cached
// balance (cache-aside delete-on-write): the next read repopulates it.
func (cs *CacheService) UpdateBalance(userID int64, currency string, delta float64) error {
	if err := cs.updateBalanceInDB(userID, currency, delta); err != nil {
		return err
	}
	// Delete rather than rewrite the entry: concurrent writers cannot
	// leave a stale value behind this way.
	cacheKey := fmt.Sprintf("balance:%d:%s", userID, currency)
	cs.redis.Del(context.Background(), cacheKey)
	return nil
}
2.2 Write-Through(直写缓存)
// UpdateBalanceWriteThrough applies delta to MySQL and then writes the
// resulting balance straight into Redis (write-through), so readers see
// fresh data without a cache miss.
// NOTE(review): DB write and cache write are not atomic; a crash between
// them leaves the cache stale until the 60s TTL expires.
func (cs *CacheService) UpdateBalanceWriteThrough(userID int64, currency string, delta float64) error {
// 1. Apply the delta in the database first (source of truth).
balance, err := cs.updateBalanceInDB(userID, currency, delta)
if err != nil {
return err
}
// 2. Mirror the updated balance into the cache (60s TTL).
cacheKey := fmt.Sprintf("balance:%d:%s", userID, currency)
balanceJSON, _ := json.Marshal(balance)
cs.redis.Set(context.Background(), cacheKey, balanceJSON, 60*time.Second)
return nil
}
2.3 Write-Behind(异步回写)
// WriteBuffer implements write-behind caching for balances: Redis is
// updated synchronously, MySQL asynchronously in batches.
type WriteBuffer struct {
updates chan *BalanceUpdate // 10k-slot buffer; senders block when full
redis *redis.Client
db *sql.DB
}
// BalanceUpdate is one pending balance delta awaiting DB flush.
type BalanceUpdate struct {
UserID int64
Currency string
Delta float64
}
// NewWriteBuffer builds the buffer and starts the background flusher.
// NOTE(review): the worker has no stop signal; updates still buffered in
// the channel are lost if the process exits.
func NewWriteBuffer(redis *redis.Client, db *sql.DB) *WriteBuffer {
wb := &WriteBuffer{
updates: make(chan *BalanceUpdate, 10000),
redis: redis,
db: db,
}
// Start the background batch writer.
go wb.flushWorker()
return wb
}
// UpdateBalance applies the delta to Redis immediately (fast path for
// readers) and queues the same delta for asynchronous persistence.
// NOTE(review): if the process dies before the flush, Redis and MySQL
// diverge — this is the inherent write-behind trade-off.
func (wb *WriteBuffer) UpdateBalance(userID int64, currency string, delta float64) {
// 1. Update the cache first so the change is visible right away.
cacheKey := fmt.Sprintf("balance:%d:%s", userID, currency)
wb.redis.IncrByFloat(context.Background(), cacheKey, delta)
// 2. Queue for the batch DB writer (blocks if the 10k buffer is full).
wb.updates <- &BalanceUpdate{
UserID: userID,
Currency: currency,
Delta: delta,
}
}
// flushWorker drains the update channel and persists to MySQL either when
// a full batch of 100 accumulates or once per second, whichever comes
// first.
func (wb *WriteBuffer) flushWorker() {
	const maxBatch = 100
	flushTick := time.NewTicker(1 * time.Second)
	defer flushTick.Stop()

	pending := make([]*BalanceUpdate, 0, maxBatch)
	flush := func() {
		if len(pending) == 0 {
			return
		}
		wb.flushToDB(pending)
		pending = pending[:0]
	}

	for {
		select {
		case u := <-wb.updates:
			pending = append(pending, u)
			// Size-triggered flush.
			if len(pending) >= maxBatch {
				flush()
			}
		case <-flushTick.C:
			// Time-triggered flush of whatever has accumulated.
			flush()
		}
	}
}
// flushToDB persists a batch of balance deltas to MySQL.
// NOTE(review): each row is a separate statement and both the sql.Result
// and the error from Exec are discarded — a failed update is silently
// lost. Wrapping the batch in a transaction (or at least logging/retrying
// errors) would be safer.
func (wb *WriteBuffer) flushToDB(updates []*BalanceUpdate) {
// Apply each buffered delta individually.
for _, update := range updates {
wb.db.Exec(`
UPDATE account_balances
SET available = available + ?
WHERE user_id = ? AND currency = ?
`, update.Delta, update.UserID, update.Currency)
}
}
3. 缓存常见问题
3.1 缓存穿透
问题:查询不存在的数据,缓存和数据库都没有,导致每次都查数据库。
解决方案1:布隆过滤器
import "github.com/bits-and-blooms/bloom/v3"
// CacheWithBloom guards the user cache with a Bloom filter so lookups for
// IDs that certainly don't exist never reach Redis or MySQL
// (cache-penetration protection).
type CacheWithBloom struct {
	cache *redis.Client
	bloom *bloom.BloomFilter
	db    *sql.DB
}

// NewCacheWithBloom builds the cache and seeds the Bloom filter with every
// existing user ID. Sizing: 1,000,000 expected elements at a 0.01%
// false-positive rate.
//
// NOTE(review): IDs created after startup must be added to the filter
// elsewhere, or lookups for them will be incorrectly rejected.
func NewCacheWithBloom(cache *redis.Client, db *sql.DB) *CacheWithBloom {
	bf := bloom.NewWithEstimates(1000000, 0.0001)

	// Seed from the users table. The original ignored the Query error and
	// then dereferenced the nil rows handle (rows.Next / deferred Close
	// both panic on nil *sql.Rows); fail loudly at startup instead.
	rows, err := db.Query("SELECT id FROM users")
	if err != nil {
		panic(fmt.Sprintf("seeding bloom filter: %v", err))
	}
	defer rows.Close()
	for rows.Next() {
		var userID int64
		if serr := rows.Scan(&userID); serr != nil {
			continue // skip an unreadable row rather than seeding garbage
		}
		bf.Add([]byte(fmt.Sprintf("%d", userID)))
	}
	if err := rows.Err(); err != nil {
		// A partially seeded filter would wrongly reject real users.
		panic(fmt.Sprintf("iterating users for bloom seed: %v", err))
	}

	return &CacheWithBloom{
		cache: cache,
		bloom: bf,
		db:    db,
	}
}
// GetUser fetches a user by ID through three gates: Bloom filter (rejects
// IDs that certainly don't exist), Redis, then MySQL with write-back.
func (cwb *CacheWithBloom) GetUser(userID int64) (*User, error) {
	key := []byte(fmt.Sprintf("%d", userID))

	// 1. Bloom filter: a negative is definitive; a positive may be a
	// false positive (~0.01%), so we still verify below.
	if !cwb.bloom.Test(key) {
		return nil, errors.New("user not found")
	}

	// 2. Redis. A corrupt payload now falls through to the database
	// instead of being returned as a zero-valued User (the original
	// ignored the Unmarshal error).
	cacheKey := fmt.Sprintf("user:%d", userID)
	data, err := cwb.cache.Get(context.Background(), cacheKey).Result()
	if err == nil {
		var user User
		if uerr := json.Unmarshal([]byte(data), &user); uerr == nil {
			return &user, nil
		}
	}

	// 3. MySQL (authoritative).
	user, err := cwb.queryUserFromDB(userID)
	if err != nil {
		return nil, err
	}

	// 4. Write back with a 10-minute TTL; cache failures are non-fatal.
	if userJSON, merr := json.Marshal(user); merr == nil {
		cwb.cache.Set(context.Background(), cacheKey, userJSON, 600*time.Second)
	}
	return user, nil
}
解决方案2:缓存空值
// GetUserWithNullCache fetches a user, caching the sentinel string "null"
// for IDs that do not exist so repeated misses don't hammer the database
// (cache-penetration protection).
func (cs *CacheService) GetUserWithNullCache(userID int64) (*User, error) {
	cacheKey := fmt.Sprintf("user:%d", userID)

	// 1. Cache lookup; "null" means a previously confirmed miss.
	data, err := cs.redis.Get(context.Background(), cacheKey).Result()
	if err == nil {
		if data == "null" {
			return nil, errors.New("user not found")
		}
		var user User
		if uerr := json.Unmarshal([]byte(data), &user); uerr == nil {
			return &user, nil
		}
	}

	// 2. Database. Only a definitive "no such row" is negative-cached.
	// The original cached "null" on ANY error, so a transient DB outage
	// made existing users appear deleted for 60 seconds.
	// (Assumes queryUserFromDB returns/wraps sql.ErrNoRows for a missing
	// row — TODO confirm against its implementation.)
	user, err := cs.queryUserFromDB(userID)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			// Confirmed absent: short-TTL negative cache.
			cs.redis.Set(context.Background(), cacheKey, "null", 60*time.Second)
		}
		return nil, err
	}

	// 3. Positive result: normal write-back (10-minute TTL).
	if userJSON, merr := json.Marshal(user); merr == nil {
		cs.redis.Set(context.Background(), cacheKey, userJSON, 600*time.Second)
	}
	return user, nil
}
3.2 缓存击穿
问题:热点数据过期瞬间,大量请求同时查数据库。
解决方案:分布式锁 + 双重检查
// GetTickerWithLock reads the ticker from Redis, falling back to the
// database behind a best-effort distributed lock so that only one caller
// rebuilds a hot key when it expires (cache-stampede protection).
func (cs *CacheService) GetTickerWithLock(symbol string) (*Ticker, error) {
	cacheKey := fmt.Sprintf("ticker:%s", symbol)
	lockKey := fmt.Sprintf("lock:ticker:%s", symbol)

	// Bounded retry loop instead of the original unbounded recursion,
	// which could recurse forever if the cache was never repopulated.
	const maxAttempts = 100
	for attempt := 0; attempt < maxAttempts; attempt++ {
		// 1. Fast path: cache hit.
		data, err := cs.redis.Get(context.Background(), cacheKey).Result()
		if err == nil {
			var ticker Ticker
			if uerr := json.Unmarshal([]byte(data), &ticker); uerr == nil {
				return &ticker, nil
			}
		}

		// 2. Miss: try to take the rebuild lock. The 10s TTL releases it
		// automatically if the holder dies. The original ignored the
		// SetNX error; surface it instead of spinning.
		locked, err := cs.redis.SetNX(context.Background(), lockKey, "1", 10*time.Second).Result()
		if err != nil {
			return nil, err
		}
		if !locked {
			// Another caller is rebuilding; wait briefly and re-check.
			time.Sleep(50 * time.Millisecond)
			continue
		}

		// NOTE(review): Del can release a lock that expired and was taken
		// by someone else if the rebuild exceeds 10s; a unique token plus
		// a compare-and-delete Lua script would be safer.
		return func() (*Ticker, error) {
			defer cs.redis.Del(context.Background(), lockKey)

			// Double-check: another goroutine may have repopulated the
			// cache between our miss and acquiring the lock.
			data, err := cs.redis.Get(context.Background(), cacheKey).Result()
			if err == nil {
				var ticker Ticker
				if uerr := json.Unmarshal([]byte(data), &ticker); uerr == nil {
					return &ticker, nil
				}
			}

			// Rebuild from the database and write back (5s TTL).
			ticker, err := cs.queryTickerFromDB(symbol)
			if err != nil {
				return nil, err
			}
			if tickerJSON, merr := json.Marshal(ticker); merr == nil {
				cs.redis.Set(context.Background(), cacheKey, tickerJSON, 5*time.Second)
			}
			return ticker, nil
		}()
	}
	return nil, errors.New("ticker cache rebuild: too many retries")
}
3.3 缓存雪崩
问题:大量缓存同时过期,导致数据库压力暴增。
解决方案1:随机TTL
// SetWithRandomTTL stores value with a TTL jittered uniformly in
// [0.8*baseTTL, 1.2*baseTTL) so that keys written together do not all
// expire at the same instant (cache-avalanche mitigation).
func (cs *CacheService) SetWithRandomTTL(key string, value interface{}, baseTTL time.Duration) {
	// Uniform in [0, 2/5*baseTTL) shifted down by baseTTL/5 gives ±20%.
	// The original computed ±10% while its comment promised ±20%.
	// (rand.Int63n panics for a non-positive bound, so baseTTL must be
	// at least a few nanoseconds — true for any realistic cache TTL.)
	jitter := time.Duration(rand.Int63n(int64(baseTTL)*2/5)) - baseTTL/5
	ttl := baseTTL + jitter
	valueJSON, err := json.Marshal(value)
	if err != nil {
		return // unmarshalable values are skipped rather than cached empty
	}
	cs.redis.Set(context.Background(), key, valueJSON, ttl)
}
解决方案2:双层过期策略
// CacheValue wraps a cached payload with a logical expiry timestamp that
// the application checks itself, independent of the Redis key TTL.
type CacheValue struct {
Data interface{} `json:"data"`
ExpiresAt int64 `json:"expires_at"` // logical expiry, Unix seconds
}
// SetWithLogicalExpire stores value with a logical TTL of ttl but a
// physical Redis TTL of 2*ttl, so the key survives its logical expiry and
// stale data can still be served while a refresh runs (avalanche/stampede
// mitigation).
func (cs *CacheService) SetWithLogicalExpire(key string, value interface{}, ttl time.Duration) {
cacheValue := &CacheValue{
Data: value,
ExpiresAt: time.Now().Add(ttl).Unix(),
}
valueJSON, _ := json.Marshal(cacheValue)
// Physical TTL is twice the logical TTL (see above).
cs.redis.Set(context.Background(), key, valueJSON, ttl*2)
}
// GetWithLogicalExpire reads a value stored by SetWithLogicalExpire. When
// the logical expiry has passed it still returns the stale data right away
// (avoiding a thundering herd) and triggers an asynchronous refresh.
func (cs *CacheService) GetWithLogicalExpire(key string) (interface{}, error) {
	data, err := cs.redis.Get(context.Background(), key).Result()
	if err != nil {
		return nil, err
	}

	var cached CacheValue
	json.Unmarshal([]byte(data), &cached)

	// Logically expired: refresh in the background, serve the stale copy.
	if cached.ExpiresAt < time.Now().Unix() {
		go cs.refreshCache(key)
	}
	return cached.Data, nil
}
4. Kafka消息队列
4.1 Kafka架构
┌──────────────────────────────────────────────────────────┐
│ Producer │
│ 撮合引擎、清算系统、风控系统 │
└────────────┬────────────┬─────────────┬──────────────────┘
│ │ │
↓ ↓ ↓
┌────────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Topic: │ │ Topic: │ │ Topic: │ │
│ │ trades │ │ orders │ │ balances │ │
│ │ (3 parts) │ │ (5 parts) │ │ (5 parts) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└────────────┬────────────┬─────────────┬──────────────┘
│ │ │
↓ ↓ ↓
┌────────────────────────────────────────────────────────┐
│ Consumers │
│ 行情服务、推送服务、数据归档、风控分析 │
└──────────────────────────────────────────────────────┘
4.2 Producer实现
package kafka
import (
"context"
"encoding/json"
"fmt"
"log"
"time"

"github.com/segmentio/kafka-go"
)
// TradeProducer publishes executed trades to the "trades" Kafka topic.
type TradeProducer struct {
writer *kafka.Writer
}
// NewTradeProducer builds a synchronous producer. The Hash balancer routes
// by message key, so all trades for one symbol land in one partition.
func NewTradeProducer(brokers []string) *TradeProducer {
writer := &kafka.Writer{
Addr: kafka.TCP(brokers...),
Topic: "trades",
Balancer: &kafka.Hash{}, // partition by message key
}
return &TradeProducer{writer: writer}
}
// PublishTrade sends one trade synchronously and returns any write error.
func (tp *TradeProducer) PublishTrade(trade *Trade) error {
tradeJSON, err := json.Marshal(trade)
if err != nil {
return err
}
// Key by symbol so Kafka preserves per-symbol ordering.
message := kafka.Message{
Key: []byte(trade.Symbol),
Value: tradeJSON,
}
return tp.writer.WriteMessages(context.Background(), message)
}
// Close flushes and shuts down the underlying Kafka writer.
func (tp *TradeProducer) Close() error {
return tp.writer.Close()
}
批量发送优化:
// BatchTradeProducer buffers trades in memory and writes them to Kafka in
// batches, trading a little latency for much higher throughput.
type BatchTradeProducer struct {
writer *kafka.Writer
buffer chan *Trade // 10k-slot queue; PublishTrade blocks when full
batchSize int
}
// NewBatchTradeProducer configures writer-side batching (BatchSize plus a
// 10ms BatchTimeout) and starts the draining goroutine.
// NOTE(review): the flush goroutine has no shutdown path; trades still
// buffered at process exit are dropped.
func NewBatchTradeProducer(brokers []string, batchSize int) *BatchTradeProducer {
writer := &kafka.Writer{
Addr: kafka.TCP(brokers...),
Topic: "trades",
Balancer: &kafka.Hash{},
BatchSize: batchSize,
BatchTimeout: 10 * time.Millisecond,
}
btp := &BatchTradeProducer{
writer: writer,
buffer: make(chan *Trade, 10000),
batchSize: batchSize,
}
go btp.flushWorker()
return btp
}
// PublishTrade enqueues a trade for asynchronous publication. It returns
// immediately unless the buffer is full, and reports no errors.
func (btp *BatchTradeProducer) PublishTrade(trade *Trade) {
btp.buffer <- trade
}
// flushWorker batches buffered trades and writes them to Kafka, flushing
// when batchSize messages accumulate or every 10ms, whichever comes first.
func (btp *BatchTradeProducer) flushWorker() {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	batch := make([]kafka.Message, 0, btp.batchSize)
	flush := func() {
		if len(batch) == 0 {
			return
		}
		// The original discarded this error, silently dropping trades;
		// at minimum the loss must be logged so it is observable.
		if err := btp.writer.WriteMessages(context.Background(), batch...); err != nil {
			log.Printf("kafka batch write failed (%d msgs): %v", len(batch), err)
		}
		batch = batch[:0]
	}

	for {
		select {
		case trade := <-btp.buffer:
			tradeJSON, err := json.Marshal(trade)
			if err != nil {
				// Unserializable trade: log and skip (was silently
				// enqueued as a nil payload before).
				log.Printf("marshal trade failed: %v", err)
				continue
			}
			batch = append(batch, kafka.Message{
				Key:   []byte(trade.Symbol), // per-symbol ordering
				Value: tradeJSON,
			})
			if len(batch) >= btp.batchSize {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}
4.3 Consumer实现
// TradeConsumer is a single-threaded consumer of the "trades" topic.
type TradeConsumer struct {
reader *kafka.Reader
}
// NewTradeConsumer joins the given consumer group. MinBytes/MaxBytes tune
// broker fetches: wait for at least 10KB, cap a single fetch at 10MB.
func NewTradeConsumer(brokers []string, groupID string) *TradeConsumer {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: "trades",
GroupID: groupID,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
return &TradeConsumer{reader: reader}
}
// Start consumes trades in a loop, invoking handler for each decoded
// message and committing the offset afterwards. It returns only on a
// fetch error.
//
// Uses FetchMessage rather than ReadMessage: with a GroupID configured,
// kafka-go's ReadMessage commits the offset automatically, which made the
// original's explicit CommitMessages redundant and meant offsets were
// committed before the handler had run.
func (tc *TradeConsumer) Start(handler func(*Trade) error) error {
	for {
		message, err := tc.reader.FetchMessage(context.Background())
		if err != nil {
			return err
		}

		var trade Trade
		if err := json.Unmarshal(message.Value, &trade); err != nil {
			// Poison message: log it, commit it, and move on so the
			// partition does not stall.
			log.Printf("unmarshal error: %v", err)
			tc.reader.CommitMessages(context.Background(), message)
			continue
		}

		if err := handler(&trade); err != nil {
			// Business decision: log and still commit below
			// (at-most-once); add retry/DLQ handling here if needed.
			log.Printf("handler error: %v", err)
		}

		// Commit only after the handler has run.
		tc.reader.CommitMessages(context.Background(), message)
	}
}
// Close shuts down the underlying Kafka reader, leaving the consumer
// group.
func (tc *TradeConsumer) Close() error {
return tc.reader.Close()
}
并发消费:
// ParallelTradeConsumer fans trades out to a fixed pool of workers via a
// buffered channel, decoupling Kafka fetching from processing.
type ParallelTradeConsumer struct {
reader *kafka.Reader
workers int
tradesChan chan *Trade // 10k-slot hand-off queue to the workers
}
// NewParallelTradeConsumer starts `workers` goroutines consuming from
// tradesChan. Start must still be called to begin fetching from Kafka.
// NOTE(review): the workers have no stop signal; tradesChan is never
// closed, so they live for the life of the process.
func NewParallelTradeConsumer(brokers []string, groupID string, workers int) *ParallelTradeConsumer {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: "trades",
GroupID: groupID,
})
ptc := &ParallelTradeConsumer{
reader: reader,
workers: workers,
tradesChan: make(chan *Trade, 10000),
}
// Spin up the worker pool.
for i := 0; i < workers; i++ {
go ptc.worker(i)
}
return ptc
}
// Start fetches messages and hands them to the worker pool, committing
// each offset as soon as the trade has been enqueued.
//
// NOTE(review): the offset is committed BEFORE a worker has processed the
// trade, so a crash loses in-flight trades (at-most-once delivery). Also,
// trades from one partition may be processed out of order once they are
// spread across workers.
func (ptc *ParallelTradeConsumer) Start() error {
for {
message, err := ptc.reader.FetchMessage(context.Background())
if err != nil {
return err
}
var trade Trade
err = json.Unmarshal(message.Value, &trade)
if err != nil {
// Poison message: log, commit, skip.
log.Printf("unmarshal error: %v", err)
ptc.reader.CommitMessages(context.Background(), message)
continue
}
// Hand off to a worker (blocks if all 10k slots are full).
ptc.tradesChan <- &trade
// Commit immediately after hand-off (see NOTE above).
ptc.reader.CommitMessages(context.Background(), message)
}
}
// worker drains tradesChan until it is closed, processing each trade and
// logging (but not retrying) failures.
func (ptc *ParallelTradeConsumer) worker(id int) {
	for {
		trade, ok := <-ptc.tradesChan
		if !ok {
			return
		}
		if err := ptc.processTrade(trade); err != nil {
			log.Printf("worker %d: process error: %v", id, err)
		}
	}
}

// processTrade holds the per-trade business logic (market-data updates,
// WebSocket fan-out, etc.); it is a stub here.
func (ptc *ParallelTradeConsumer) processTrade(trade *Trade) error {
	return nil
}
4.4 消息顺序性保证
问题:同一用户的订单消息必须有序处理。
解决方案:按userID分区
// OrderProducer publishes order events to the "orders" topic, partitioned
// by user ID so one user's orders stay in order.
type OrderProducer struct {
	writer *kafka.Writer
}

// NewOrderProducer builds a producer whose custom balancer hashes the
// message key (the user ID) to pick a partition.
func NewOrderProducer(brokers []string) *OrderProducer {
	writer := &kafka.Writer{
		Addr:  kafka.TCP(brokers...),
		Topic: "orders",
		// Custom partitioner: route by userID (carried in Message.Key).
		Balancer: &UserBalancer{},
	}
	return &OrderProducer{writer: writer}
}

// UserBalancer assigns messages to partitions by FNV-1a hashing the key.
type UserBalancer struct{}

// Balance implements kafka.Balancer. It must return an actual partition
// ID taken from the partitions slice — the original returned the raw
// index (hash % len), which is wrong whenever the available partition IDs
// are not the contiguous range 0..n-1.
func (ub *UserBalancer) Balance(msg kafka.Message, partitions ...int) int {
	hash := fnv.New32a()
	hash.Write(msg.Key) // key bytes are already the userID string
	return partitions[int(hash.Sum32())%len(partitions)]
}
// PublishOrder sends an order to Kafka keyed by user ID, so all of one
// user's orders land in the same partition (per-user ordering).
func (op *OrderProducer) PublishOrder(order *Order) error {
	// The original discarded this error and would have published a nil
	// payload for an unserializable order.
	orderJSON, err := json.Marshal(order)
	if err != nil {
		return err
	}
	message := kafka.Message{
		Key:   []byte(fmt.Sprintf("%d", order.UserID)), // userID as key
		Value: orderJSON,
	}
	return op.writer.WriteMessages(context.Background(), message)
}
4.5 消息幂等性
问题:消费者重启后可能重复消费消息。
解决方案1:唯一ID去重
// IdempotentConsumer deduplicates trade processing across restarts by
// recording processed trade IDs in Redis.
type IdempotentConsumer struct {
	reader       *kafka.Reader
	processedIDs *redis.Client
}

// ProcessTrade handles a trade at most once per 7-day window: it skips
// trades whose ID is already marked processed, runs the business logic,
// then records the ID.
//
// NOTE(review): the check and the mark are not atomic, so two concurrent
// consumers can both pass the check and process the same trade; use SetNX
// before processing if strict once-only semantics are required.
func (ic *IdempotentConsumer) ProcessTrade(trade *Trade) error {
	key := fmt.Sprintf("processed:trade:%s", trade.TradeID)

	// 1. Skip if already processed. A Redis error is surfaced so the
	// caller can retry, rather than silently treated as "not processed"
	// (the original ignored it, risking duplicate processing).
	exists, err := ic.processedIDs.Exists(context.Background(), key).Result()
	if err != nil {
		return err
	}
	if exists > 0 {
		log.Printf("trade %s already processed", trade.TradeID)
		return nil
	}

	// 2. Business logic.
	if err := ic.handleTrade(trade); err != nil {
		return err
	}

	// 3. Mark processed. The 7-day TTL bounds Redis memory; messages
	// redelivered after that window would be re-processed.
	ic.processedIDs.Set(context.Background(), key, "1", 7*24*time.Hour)
	return nil
}
解决方案2:数据库唯一约束
-- Dedup ledger: one row per trade ever processed by a consumer.
CREATE TABLE processed_trades (
trade_id VARCHAR(64) PRIMARY KEY,
processed_at DATETIME NOT NULL,
-- Secondary index supports purging/inspecting by time.
INDEX idx_processed_at (processed_at)
) ENGINE=InnoDB;
-- Dedup on insert via the primary-key constraint: INSERT IGNORE turns a
-- duplicate trade_id into a silent no-op (affected rows = 0) instead of
-- an error.
INSERT IGNORE INTO processed_trades (trade_id, processed_at)
VALUES (?, NOW());
5. 实战:成交流处理
5.1 完整流程
撮合引擎 → Kafka(trades) → 消费者 → [行情聚合、WebSocket推送、数据持久化]
5.2 实现
package main
import (
"context"
"encoding/json"
"log"
"time"
"github.com/go-redis/redis/v8"
"github.com/segmentio/kafka-go"
)
// TradeStreamProcessor consumes the trade stream and fans each trade out
// to market-data aggregation (ticker / kline / depth in Redis) and
// WebSocket push.
type TradeStreamProcessor struct {
kafka *kafka.Reader
redis *redis.Client
wsServer *WebSocketServer
}
// NewTradeStreamProcessor wires up the Kafka group reader
// ("trade-processor"), the Redis client, and the WebSocket broadcaster.
func NewTradeStreamProcessor(brokers []string, redisAddr string) *TradeStreamProcessor {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: "trades",
GroupID: "trade-processor",
})
redisClient := redis.NewClient(&redis.Options{
Addr: redisAddr,
})
return &TradeStreamProcessor{
kafka: reader,
redis: redisClient,
wsServer: NewWebSocketServer(),
}
}
// Start is the consume loop: decode each trade and update the downstream
// views concurrently.
//
// NOTE(review): four goroutines are spawned per message with no bound or
// completion tracking — under load this is unbounded concurrency, and
// updates for the same symbol may be applied out of order. ReadMessage
// with a GroupID also auto-commits, so a crash mid-fan-out drops those
// updates.
func (tsp *TradeStreamProcessor) Start() {
for {
message, err := tsp.kafka.ReadMessage(context.Background())
if err != nil {
log.Printf("read error: %v", err)
continue
}
var trade Trade
err = json.Unmarshal(message.Value, &trade)
if err != nil {
log.Printf("unmarshal error: %v", err)
continue
}
// Fan out to the independent views concurrently.
go tsp.updateTicker(&trade)
go tsp.updateKline(&trade)
go tsp.updateDepth(&trade)
go tsp.pushToWebSocket(&trade)
}
}
// updateTicker refreshes the per-symbol ticker hash in Redis: last price,
// accumulated volumes, and high/low watermarks.
// NOTE(review): high/low are monotonic watermarks that are never reset,
// so despite the surrounding "24h" naming they are not a true rolling
// 24-hour high/low.
func (tsp *TradeStreamProcessor) updateTicker(trade *Trade) {
key := fmt.Sprintf("ticker:%s", trade.Symbol)
// Latest traded price.
tsp.redis.HSet(context.Background(), key, "lastPrice", trade.Price)
// Accumulate base and quote volumes.
tsp.redis.HIncrByFloat(context.Background(), key, "volume", trade.Quantity)
tsp.redis.HIncrByFloat(context.Background(), key, "quoteVolume", trade.Amount)
// Atomically raise/lower the watermarks via Lua (read-compare-set).
tsp.redis.Eval(context.Background(), `
local high = redis.call('HGET', KEYS[1], 'highPrice')
if not high or tonumber(ARGV[1]) > tonumber(high) then
redis.call('HSET', KEYS[1], 'highPrice', ARGV[1])
end
local low = redis.call('HGET', KEYS[1], 'lowPrice')
if not low or tonumber(ARGV[1]) < tonumber(low) then
redis.call('HSET', KEYS[1], 'lowPrice', ARGV[1])
end
`, []string{key}, trade.Price)
// Expire the whole ticker a day after the most recent trade.
tsp.redis.Expire(context.Background(), key, 24*time.Hour)
}
// updateKline folds the trade into the current candle of every supported
// interval, atomically via a Lua script (single read-modify-write per key).
// The bucket start time is compared as a string inside Lua: both the
// stored field and ARGV[1] arrive as the decimal rendering of the same
// int64, so equality is consistent.
func (tsp *TradeStreamProcessor) updateKline(trade *Trade) {
	intervals := []string{"1m", "5m", "15m", "1h", "4h", "1d"}
	for _, interval := range intervals {
		key := fmt.Sprintf("kline:%s:%s", trade.Symbol, interval)
		// Start of the candle bucket containing this trade.
		timestamp := tsp.getKlineTimestamp(trade.Timestamp, interval)
		// FIX: volume is accumulated with HINCRBYFLOAT. The original used
		// HINCRBY, which Redis rejects for the fractional quantities a
		// trade carries ("hash value is not an integer"), so volume was
		// never accumulated.
		tsp.redis.Eval(context.Background(), `
local key = KEYS[1]
local timestamp = ARGV[1]
local price = tonumber(ARGV[2])
local quantity = tonumber(ARGV[3])
-- 获取当前K线
local kline = redis.call('HGETALL', key)
local klineMap = {}
for i = 1, #kline, 2 do
klineMap[kline[i]] = kline[i+1]
end
if klineMap['timestamp'] == timestamp then
-- 更新现有K线
local high = tonumber(klineMap['high'])
local low = tonumber(klineMap['low'])
redis.call('HSET', key, 'close', price)
redis.call('HSET', key, 'high', math.max(high, price))
redis.call('HSET', key, 'low', math.min(low, price))
redis.call('HINCRBYFLOAT', key, 'volume', quantity)
else
-- 新建K线
redis.call('HSET', key, 'timestamp', timestamp)
redis.call('HSET', key, 'open', price)
redis.call('HSET', key, 'high', price)
redis.call('HSET', key, 'low', price)
redis.call('HSET', key, 'close', price)
redis.call('HSET', key, 'volume', quantity)
end
redis.call('EXPIRE', key, 86400)
`, []string{key}, timestamp, trade.Price, trade.Quantity)
	}
}
// updateDepth decrements the filled quantity from the ask side of the
// order book for this trade's symbol.
// NOTE(review): this assumes depth is stored as a sorted set whose MEMBER
// is the formatted price and whose SCORE is the remaining quantity —
// verify against the depth writer. Only the ask side is adjusted here;
// fills that consume bid liquidity are not handled.
func (tsp *TradeStreamProcessor) updateDepth(trade *Trade) {
// A buy fill consumes ask-side liquidity.
askKey := fmt.Sprintf("depth:ask:%s", trade.Symbol)
tsp.redis.ZIncrBy(context.Background(), askKey, -trade.Quantity, fmt.Sprintf("%f", trade.Price))
// Drop price levels whose remaining quantity reached zero (or below).
tsp.redis.ZRemRangeByScore(context.Background(), askKey, "-inf", "0")
}
// pushToWebSocket broadcasts the trade to every WebSocket subscriber of
// its symbol.
func (tsp *TradeStreamProcessor) pushToWebSocket(trade *Trade) {
	payload := make(map[string]interface{}, 6)
	payload["event"] = "trade"
	payload["symbol"] = trade.Symbol
	payload["tradeId"] = trade.TradeID
	payload["price"] = trade.Price
	payload["quantity"] = trade.Quantity
	payload["timestamp"] = trade.Timestamp.Unix()
	tsp.wsServer.Broadcast(trade.Symbol, payload)
}
// getKlineTimestamp returns the Unix start time of the candle bucket that
// contains t for the given interval. Unknown intervals fall back to t
// itself.
func (tsp *TradeStreamProcessor) getKlineTimestamp(t time.Time, interval string) int64 {
	buckets := map[string]time.Duration{
		"1m":  time.Minute,
		"5m":  5 * time.Minute,
		"15m": 15 * time.Minute,
		"1h":  time.Hour,
		"4h":  4 * time.Hour,
		"1d":  24 * time.Hour,
	}
	if d, ok := buckets[interval]; ok {
		return t.Truncate(d).Unix()
	}
	return t.Unix()
}
// main wires the processor to local Kafka and Redis and runs the consume
// loop; Start never returns normally.
func main() {
processor := NewTradeStreamProcessor(
[]string{"localhost:9092"},
"localhost:6379",
)
log.Println("Trade stream processor started")
processor.Start()
}
小结
本章介绍了缓存与消息队列的设计:
- Redis缓存架构:多级缓存(本地 + Redis)、LRU实现
- 缓存策略:Cache-Aside、Write-Through、Write-Behind
- 缓存问题:穿透、击穿、雪崩的解决方案
- Kafka消息队列:Producer/Consumer实现、批量发送、并发消费
- 消息顺序性:分区策略保证顺序
- 消息幂等性:唯一ID去重、数据库约束
- 实战案例:成交流实时处理
下一章将讲解用户系统与KYC认证。