【编程难度第五名】普通业务系统 - 中高级开发者的主战场
本系列文章
➤ [NO.1 调度器]
➤ [NO.2 一致性协议](Paxos / Raft)
➤ [NO.3 高性能异步系统](消息队列、回调、重试)
➤ [NO.4 交易系统](钱的事不能错)
➤ NO.5 普通业务系统(绝大多数人做的)
一、什么是普通业务系统?
1.1 定义和范围
代表系统:
- 登录注册系统: 用户认证、权限管理
- 内容管理系统: 文章、评论、点赞
- 推荐系统: Feed 流、个性化推荐
- 搜索系统: 全文搜索、筛选排序
- IM 系统: 聊天、消息推送
特点:
- 并发量适中(1000-10000 QPS)
- 延迟要求不高(100-500ms)
- 数据量适中(百万-千万级)
- 以 CRUD 为主,但有一定业务逻辑
- 需要考虑缓存、分页、搜索等
与其他系统的差异:
CRUD:
- 纯增删改查
- 无复杂逻辑
- 无缓存优化
普通业务系统:
- CRUD + 业务逻辑
- 需要缓存
- 需要优化性能
- 需要考虑并发
复杂系统(交易/异步/调度/一致性):
- 分布式事务
- 高并发(10 万+ QPS)
- 强一致性
- 状态机复杂
二、核心技术点
2.1 权限系统(RBAC)
什么是 RBAC?
RBAC: Role-Based Access Control(基于角色的访问控制)
用户 -> 角色 -> 权限 -> 资源
示例:
- 用户: 张三
- 角色: 编辑
- 权限: 文章管理(读、写、删除)
- 资源: /api/articles/*
数据库设计
-- 用户表
CREATE TABLE users (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(64) UNIQUE NOT NULL,
password VARCHAR(128) NOT NULL, -- bcrypt hash
email VARCHAR(128),
phone VARCHAR(32),
status VARCHAR(32) NOT NULL DEFAULT 'Active',
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_username (username),
INDEX idx_email (email)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 角色表
CREATE TABLE roles (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(64) UNIQUE NOT NULL,
description VARCHAR(256),
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
INDEX idx_name (name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 权限表
CREATE TABLE permissions (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(64) UNIQUE NOT NULL,
resource VARCHAR(128) NOT NULL, -- 资源路径,如 /api/articles
action VARCHAR(32) NOT NULL, -- 操作,如 read/write/delete
description VARCHAR(256),
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
INDEX idx_name (name),
INDEX idx_resource (resource)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 用户-角色关联表
CREATE TABLE user_roles (
user_id BIGINT NOT NULL,
role_id BIGINT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (user_id, role_id),
INDEX idx_user (user_id),
INDEX idx_role (role_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 角色-权限关联表
CREATE TABLE role_permissions (
role_id BIGINT NOT NULL,
permission_id BIGINT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (role_id, permission_id),
INDEX idx_role (role_id),
INDEX idx_permission (permission_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
代码实现
type RBACService struct {
db *sql.DB
redis *redis.Client
}
// 检查用户是否有权限
func (s *RBACService) HasPermission(
ctx context.Context,
userID int64,
resource string,
action string,
) (bool, error) {
// 1. 从缓存获取用户权限
cacheKey := fmt.Sprintf("user_permissions:%d", userID)
cached, err := s.redis.Get(ctx, cacheKey).Result()
if err == nil {
// 命中缓存
var permissions []Permission
json.Unmarshal([]byte(cached), &permissions)
return s.checkPermission(permissions, resource, action), nil
}
// 2. 从数据库查询
permissions, err := s.getUserPermissions(ctx, userID)
if err != nil {
return false, err
}
// 3. 写入缓存(1 小时)
data, _ := json.Marshal(permissions)
s.redis.Set(ctx, cacheKey, data, 1*time.Hour)
// 4. 检查权限
return s.checkPermission(permissions, resource, action), nil
}
func (s *RBACService) getUserPermissions(
ctx context.Context,
userID int64,
) ([]Permission, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT DISTINCT p.id, p.name, p.resource, p.action
FROM permissions p
JOIN role_permissions rp ON p.id = rp.permission_id
JOIN roles r ON rp.role_id = r.id
JOIN user_roles ur ON r.id = ur.role_id
WHERE ur.user_id = ?
`, userID)
if err != nil {
return nil, err
}
defer rows.Close()
var permissions []Permission
for rows.Next() {
var p Permission
if err := rows.Scan(&p.ID, &p.Name, &p.Resource, &p.Action); err != nil {
return nil, err
}
permissions = append(permissions, p)
}
return permissions, nil
}
func (s *RBACService) checkPermission(
permissions []Permission,
resource string,
action string,
) bool {
for _, p := range permissions {
// 支持通配符
// 例如: /api/articles/* 匹配 /api/articles/123
if matchPattern(p.Resource, resource) && p.Action == action {
return true
}
// 支持 * 操作(所有操作)
if matchPattern(p.Resource, resource) && p.Action == "*" {
return true
}
}
return false
}
func matchPattern(pattern, resource string) bool {
// 简单实现:支持 * 通配符
if pattern == "*" {
return true
}
if strings.HasSuffix(pattern, "/*") {
prefix := strings.TrimSuffix(pattern, "/*")
return strings.HasPrefix(resource, prefix)
}
return pattern == resource
}
// 分配角色给用户
func (s *RBACService) AssignRole(
ctx context.Context,
userID int64,
roleID int64,
) error {
_, err := s.db.ExecContext(ctx, `
INSERT INTO user_roles (user_id, role_id)
VALUES (?, ?)
ON DUPLICATE KEY UPDATE user_id = user_id
`, userID, roleID)
if err != nil {
return err
}
// 清除用户权限缓存
cacheKey := fmt.Sprintf("user_permissions:%d", userID)
s.redis.Del(ctx, cacheKey)
return nil
}
// 创建角色
func (s *RBACService) CreateRole(
ctx context.Context,
name string,
description string,
permissionIDs []int64,
) (int64, error) {
// 开启事务
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return 0, err
}
defer tx.Rollback()
// 1. 创建角色
result, err := tx.ExecContext(ctx, `
INSERT INTO roles (name, description)
VALUES (?, ?)
`, name, description)
if err != nil {
return 0, err
}
roleID, _ := result.LastInsertId()
// 2. 分配权限
for _, permID := range permissionIDs {
_, err = tx.ExecContext(ctx, `
INSERT INTO role_permissions (role_id, permission_id)
VALUES (?, ?)
`, roleID, permID)
if err != nil {
return 0, err
}
}
// 提交事务
if err := tx.Commit(); err != nil {
return 0, err
}
return roleID, nil
}
中间件集成
// 权限检查中间件
func PermissionMiddleware(rbac *RBACService) gin.HandlerFunc {
return func(c *gin.Context) {
// 1. 获取用户 ID(从 JWT token 或 session)
userID, exists := c.Get("user_id")
if !exists {
c.JSON(401, gin.H{"error": "Unauthorized"})
c.Abort()
return
}
// 2. 获取资源和操作
resource := c.Request.URL.Path
action := strings.ToLower(c.Request.Method)
// HTTP Method -> Action 映射
actionMap := map[string]string{
"get": "read",
"post": "write",
"put": "write",
"patch": "write",
"delete": "delete",
}
action = actionMap[action]
// 3. 检查权限
hasPermission, err := rbac.HasPermission(
c.Request.Context(),
userID.(int64),
resource,
action,
)
if err != nil {
c.JSON(500, gin.H{"error": "Internal server error"})
c.Abort()
return
}
if !hasPermission {
c.JSON(403, gin.H{"error": "Forbidden"})
c.Abort()
return
}
c.Next()
}
}
// 使用示例
func main() {
r := gin.Default()
// 应用权限中间件
r.Use(PermissionMiddleware(rbacService))
r.GET("/api/articles", listArticles)
r.POST("/api/articles", createArticle)
r.DELETE("/api/articles/:id", deleteArticle)
r.Run(":8080")
}
2.2 缓存策略
Cache-Aside 模式(最常用)
type ArticleService struct {
db *sql.DB
redis *redis.Client
}
// 读取文章(带缓存)
func (s *ArticleService) GetArticle(
ctx context.Context,
id int64,
) (*Article, error) {
// 1. 查缓存
cacheKey := fmt.Sprintf("article:%d", id)
cached, err := s.redis.Get(ctx, cacheKey).Result()
if err == nil {
// 命中缓存
var article Article
json.Unmarshal([]byte(cached), &article)
log.Debug("cache hit", "articleID", id)
return &article, nil
}
log.Debug("cache miss", "articleID", id)
// 2. 查数据库
var article Article
err = s.db.QueryRowContext(ctx, `
SELECT id, title, content, author_id, view_count, created_at
FROM articles
WHERE id = ?
`, id).Scan(
&article.ID,
&article.Title,
&article.Content,
&article.AuthorID,
&article.ViewCount,
&article.CreatedAt,
)
if err == sql.ErrNoRows {
// 缓存空值,防止缓存穿透
s.redis.Set(ctx, cacheKey, "null", 5*time.Minute)
return nil, errors.New("article not found")
}
if err != nil {
return nil, err
}
// 3. 写入缓存
data, _ := json.Marshal(article)
s.redis.Set(ctx, cacheKey, data, 1*time.Hour)
return &article, nil
}
// 更新文章(删除缓存)
func (s *ArticleService) UpdateArticle(
ctx context.Context,
article *Article,
) error {
// 1. 更新数据库
_, err := s.db.ExecContext(ctx, `
UPDATE articles
SET title = ?,
content = ?,
updated_at = NOW()
WHERE id = ?
`, article.Title, article.Content, article.ID)
if err != nil {
return err
}
// 2. 删除缓存(让下次读取时重新加载)
cacheKey := fmt.Sprintf("article:%d", article.ID)
s.redis.Del(ctx, cacheKey)
log.Info("article updated, cache deleted", "articleID", article.ID)
return nil
}
缓存穿透解决方案
问题: 大量请求不存在的 key,绕过缓存直接打到数据库
// 方案 1: 缓存空值
func (s *ArticleService) GetArticleSafe(
ctx context.Context,
id int64,
) (*Article, error) {
cacheKey := fmt.Sprintf("article:%d", id)
cached, err := s.redis.Get(ctx, cacheKey).Result()
if err == nil {
if cached == "null" {
// 空值缓存
return nil, errors.New("article not found")
}
var article Article
json.Unmarshal([]byte(cached), &article)
return &article, nil
}
// 查数据库
var article Article
err = s.db.QueryRowContext(ctx, `
SELECT id, title, content FROM articles WHERE id = ?
`, id).Scan(&article.ID, &article.Title, &article.Content)
if err == sql.ErrNoRows {
// 缓存空值(短时间)
s.redis.Set(ctx, cacheKey, "null", 5*time.Minute)
return nil, errors.New("article not found")
}
if err != nil {
return nil, err
}
data, _ := json.Marshal(article)
s.redis.Set(ctx, cacheKey, data, 1*time.Hour)
return &article, nil
}
// 方案 2: 布隆过滤器
type BloomFilter struct {
redis *redis.Client
key string
}
func (bf *BloomFilter) Add(id int64) error {
// 使用 Redis 的 Bloom Filter(需要 RedisBloom 模块)
return bf.redis.Do(
context.Background(),
"BF.ADD",
bf.key,
id,
).Err()
}
func (bf *BloomFilter) Exists(id int64) (bool, error) {
result, err := bf.redis.Do(
context.Background(),
"BF.EXISTS",
bf.key,
id,
).Int()
return result == 1, err
}
func (s *ArticleService) GetArticleWithBloom(
ctx context.Context,
id int64,
) (*Article, error) {
// 1. 布隆过滤器检查
exists, _ := s.bloomFilter.Exists(id)
if !exists {
// 一定不存在,直接返回
return nil, errors.New("article not found")
}
// 2. 可能存在,查缓存和数据库
return s.GetArticle(ctx, id)
}
缓存雪崩解决方案
问题: 大量缓存同时过期,请求全部打到数据库
// 方案: 随机过期时间
func (s *ArticleService) SetCacheWithRandomTTL(
ctx context.Context,
key string,
value interface{},
baseTTL time.Duration,
) error {
// 在基础 TTL 上增加随机时间(0-30 分钟)
randomExtra := time.Duration(rand.Intn(1800)) * time.Second
ttl := baseTTL + randomExtra
data, _ := json.Marshal(value)
return s.redis.Set(ctx, key, data, ttl).Err()
}
// 使用示例
func (s *ArticleService) GetArticleWithRandomTTL(
ctx context.Context,
id int64,
) (*Article, error) {
cacheKey := fmt.Sprintf("article:%d", id)
cached, err := s.redis.Get(ctx, cacheKey).Result()
if err == nil {
var article Article
json.Unmarshal([]byte(cached), &article)
return &article, nil
}
// 查数据库
var article Article
err = s.db.QueryRowContext(ctx, `...`).Scan(...)
if err != nil {
return nil, err
}
// 缓存(随机 TTL)
s.SetCacheWithRandomTTL(ctx, cacheKey, article, 1*time.Hour)
return &article, nil
}
2.3 分页查询
Offset 分页(最常用)
type PageRequest struct {
Page int `form:"page" binding:"min=1"`
PageSize int `form:"page_size" binding:"min=1,max=100"`
}
type PageResponse struct {
Total int64 `json:"total"`
Page int `json:"page"`
PageSize int `json:"page_size"`
Data interface{} `json:"data"`
}
func (s *ArticleService) ListArticles(
ctx context.Context,
req PageRequest,
) (*PageResponse, error) {
// 1. 计算偏移量
offset := (req.Page - 1) * req.PageSize
// 2. 查询总数(可以缓存)
var total int64
err := s.db.QueryRowContext(ctx, `
SELECT COUNT(*) FROM articles WHERE status = 'Published'
`).Scan(&total)
if err != nil {
return nil, err
}
// 3. 查询数据
rows, err := s.db.QueryContext(ctx, `
SELECT id, title, author_id, view_count, created_at
FROM articles
WHERE status = 'Published'
ORDER BY created_at DESC
LIMIT ? OFFSET ?
`, req.PageSize, offset)
if err != nil {
return nil, err
}
defer rows.Close()
var articles []Article
for rows.Next() {
var a Article
rows.Scan(&a.ID, &a.Title, &a.AuthorID, &a.ViewCount, &a.CreatedAt)
articles = append(articles, a)
}
return &PageResponse{
Total: total,
Page: req.Page,
PageSize: req.PageSize,
Data: articles,
}, nil
}
优点: 简单,支持跳页 缺点: 深度分页性能差(OFFSET 10000 很慢)
Cursor 分页(更好的性能)
type CursorRequest struct {
Cursor int64 `form:"cursor"` // 上一页最后一条的 ID
PageSize int `form:"page_size" binding:"min=1,max=100"`
}
type CursorResponse struct {
NextCursor int64 `json:"next_cursor"`
HasMore bool `json:"has_more"`
Data interface{} `json:"data"`
}
func (s *ArticleService) ListArticlesCursor(
ctx context.Context,
req CursorRequest,
) (*CursorResponse, error) {
// 查询数据(多查一条判断是否还有更多)
rows, err := s.db.QueryContext(ctx, `
SELECT id, title, author_id, view_count, created_at
FROM articles
WHERE status = 'Published'
AND id < ?
ORDER BY id DESC
LIMIT ?
`, req.Cursor, req.PageSize+1)
if err != nil {
return nil, err
}
defer rows.Close()
var articles []Article
for rows.Next() {
var a Article
rows.Scan(&a.ID, &a.Title, &a.AuthorID, &a.ViewCount, &a.CreatedAt)
articles = append(articles, a)
}
// 判断是否还有更多
hasMore := len(articles) > req.PageSize
if hasMore {
articles = articles[:req.PageSize]
}
// 下一页的 cursor
var nextCursor int64
if len(articles) > 0 {
nextCursor = articles[len(articles)-1].ID
}
return &CursorResponse{
NextCursor: nextCursor,
HasMore: hasMore,
Data: articles,
}, nil
}
优点: 性能好,适合深度分页 缺点: 不支持跳页
2.4 全文搜索
使用 Elasticsearch
type SearchService struct {
es *elasticsearch.Client
}
// 索引文章到 ES
func (s *SearchService) IndexArticle(article *Article) error {
doc := map[string]interface{}{
"id": article.ID,
"title": article.Title,
"content": article.Content,
"author_id": article.AuthorID,
"view_count": article.ViewCount,
"created_at": article.CreatedAt,
}
data, _ := json.Marshal(doc)
req := esapi.IndexRequest{
Index: "articles",
DocumentID: strconv.FormatInt(article.ID, 10),
Body: bytes.NewReader(data),
Refresh: "true",
}
res, err := req.Do(context.Background(), s.es)
if err != nil {
return err
}
defer res.Body.Close()
if res.IsError() {
return fmt.Errorf("index failed: %s", res.String())
}
return nil
}
// 搜索文章
func (s *SearchService) SearchArticles(
keyword string,
page int,
pageSize int,
) ([]Article, int64, error) {
// 构建查询
query := map[string]interface{}{
"query": map[string]interface{}{
"multi_match": map[string]interface{}{
"query": keyword,
"fields": []string{"title^3", "content"}, // title 权重更高
},
},
"from": (page - 1) * pageSize,
"size": pageSize,
"sort": []map[string]interface{}{
{"_score": "desc"},
{"created_at": "desc"},
},
}
data, _ := json.Marshal(query)
res, err := s.es.Search(
s.es.Search.WithContext(context.Background()),
s.es.Search.WithIndex("articles"),
s.es.Search.WithBody(bytes.NewReader(data)),
)
if err != nil {
return nil, 0, err
}
defer res.Body.Close()
// 解析结果
var result map[string]interface{}
json.NewDecoder(res.Body).Decode(&result)
hits := result["hits"].(map[string]interface{})
total := int64(hits["total"].(map[string]interface{})["value"].(float64))
var articles []Article
for _, hit := range hits["hits"].([]interface{}) {
h := hit.(map[string]interface{})
source := h["_source"].(map[string]interface{})
article := Article{
ID: int64(source["id"].(float64)),
Title: source["title"].(string),
Content: source["content"].(string),
AuthorID: int64(source["author_id"].(float64)),
ViewCount: int(source["view_count"].(float64)),
}
articles = append(articles, article)
}
return articles, total, nil
}
2.5 消息队列(异步处理)
使用 Redis Streams
type MessageQueue struct {
redis *redis.Client
}
// 发送消息
func (mq *MessageQueue) Publish(
ctx context.Context,
topic string,
message interface{},
) error {
data, _ := json.Marshal(message)
return mq.redis.XAdd(ctx, &redis.XAddArgs{
Stream: topic,
Values: map[string]interface{}{
"data": data,
},
}).Err()
}
// 消费消息
func (mq *MessageQueue) Subscribe(
ctx context.Context,
topic string,
group string,
consumer string,
handler func(interface{}) error,
) error {
// 创建消费者组
mq.redis.XGroupCreateMkStream(ctx, topic, group, "0")
for {
// 读取消息
streams, err := mq.redis.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: group,
Consumer: consumer,
Streams: []string{topic, ">"},
Count: 10,
Block: 5 * time.Second,
}).Result()
if err != nil {
if err == redis.Nil {
continue
}
return err
}
for _, stream := range streams {
for _, message := range stream.Messages {
data := message.Values["data"].(string)
var msg interface{}
json.Unmarshal([]byte(data), &msg)
// 处理消息
err := handler(msg)
if err != nil {
log.Error("handle message failed",
"messageID", message.ID,
"err", err)
continue
}
// ACK
mq.redis.XAck(ctx, topic, group, message.ID)
}
}
}
}
// 使用示例
func main() {
mq := &MessageQueue{redis: redisClient}
// 发布消息
go func() {
for {
mq.Publish(context.Background(), "email", map[string]interface{}{
"to": "user@example.com",
"subject": "Welcome",
"body": "Welcome to our platform!",
})
time.Sleep(1 * time.Second)
}
}()
// 消费消息
mq.Subscribe(
context.Background(),
"email",
"email-workers",
"worker-1",
func(msg interface{}) error {
// 发送邮件
log.Info("sending email", "msg", msg)
return nil
},
)
}
三、完整案例:博客系统
3.1 需求
- 用户注册、登录
- 发布文章、编辑、删除
- 评论、点赞
- 文章搜索
- Feed 流推荐
3.2 数据库设计
-- 用户表
CREATE TABLE users (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(64) UNIQUE NOT NULL,
email VARCHAR(128) UNIQUE NOT NULL,
password VARCHAR(128) NOT NULL,
avatar VARCHAR(256),
bio TEXT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
INDEX idx_username (username),
INDEX idx_email (email)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 文章表
CREATE TABLE articles (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
title VARCHAR(256) NOT NULL,
content TEXT NOT NULL,
author_id BIGINT NOT NULL,
status VARCHAR(32) NOT NULL DEFAULT 'Draft',
view_count INT NOT NULL DEFAULT 0,
like_count INT NOT NULL DEFAULT 0,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_author (author_id),
INDEX idx_status_created (status, created_at),
FULLTEXT idx_content (title, content)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 评论表
CREATE TABLE comments (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
article_id BIGINT NOT NULL,
user_id BIGINT NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
INDEX idx_article (article_id),
INDEX idx_user (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 点赞表
CREATE TABLE likes (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
article_id BIGINT NOT NULL,
user_id BIGINT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_article_user (article_id, user_id),
INDEX idx_article (article_id),
INDEX idx_user (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
3.3 核心功能实现
发布文章
func (s *ArticleService) PublishArticle(
ctx context.Context,
userID int64,
title string,
content string,
) (*Article, error) {
// 1. 创建文章
result, err := s.db.ExecContext(ctx, `
INSERT INTO articles (title, content, author_id, status)
VALUES (?, ?, ?, 'Published')
`, title, content, userID)
if err != nil {
return nil, err
}
articleID, _ := result.LastInsertId()
article := &Article{
ID: articleID,
Title: title,
Content: content,
AuthorID: userID,
Status: "Published",
CreatedAt: time.Now(),
}
// 2. 索引到 ES(异步)
go s.searchService.IndexArticle(article)
// 3. 推送到粉丝 Feed(异步)
go s.feedService.PushToFollowers(userID, articleID)
log.Info("article published",
"articleID", articleID,
"authorID", userID)
return article, nil
}
点赞文章
func (s *ArticleService) LikeArticle(
ctx context.Context,
userID int64,
articleID int64,
) error {
// 1. 检查是否已点赞(Redis)
likeKey := fmt.Sprintf("like:%d:%d", articleID, userID)
exists, _ := s.redis.Exists(ctx, likeKey).Result()
if exists > 0 {
return errors.New("already liked")
}
// 2. 插入点赞记录
_, err := s.db.ExecContext(ctx, `
INSERT INTO likes (article_id, user_id)
VALUES (?, ?)
`, articleID, userID)
if err != nil {
// 可能重复点赞(并发)
if strings.Contains(err.Error(), "Duplicate") {
return errors.New("already liked")
}
return err
}
// 3. 增加文章点赞数
s.db.ExecContext(ctx, `
UPDATE articles
SET like_count = like_count + 1
WHERE id = ?
`, articleID)
// 4. 缓存点赞状态(1 天)
s.redis.Set(ctx, likeKey, 1, 24*time.Hour)
// 5. 删除文章缓存
s.redis.Del(ctx, fmt.Sprintf("article:%d", articleID))
log.Info("article liked",
"articleID", articleID,
"userID", userID)
return nil
}
Feed 流推荐
type FeedService struct {
db *sql.DB
redis *redis.Client
}
// 推送文章到粉丝 Feed
func (s *FeedService) PushToFollowers(
authorID int64,
articleID int64,
) error {
// 1. 查询粉丝列表
rows, err := s.db.Query(`
SELECT user_id FROM follows WHERE author_id = ?
`, authorID)
if err != nil {
return err
}
defer rows.Close()
// 2. 推送到每个粉丝的 Feed
for rows.Next() {
var followerID int64
rows.Scan(&followerID)
// 使用 Redis Sorted Set(按时间排序)
feedKey := fmt.Sprintf("feed:%d", followerID)
score := float64(time.Now().Unix())
s.redis.ZAdd(context.Background(), feedKey, &redis.Z{
Score: score,
Member: articleID,
})
// 只保留最新 1000 条
s.redis.ZRemRangeByRank(context.Background(), feedKey, 0, -1001)
}
return nil
}
// 获取用户 Feed
func (s *FeedService) GetUserFeed(
userID int64,
page int,
pageSize int,
) ([]Article, error) {
feedKey := fmt.Sprintf("feed:%d", userID)
// 从 Redis 获取文章 ID 列表(倒序)
start := int64((page - 1) * pageSize)
stop := start + int64(pageSize) - 1
articleIDs, err := s.redis.ZRevRange(
context.Background(),
feedKey,
start,
stop,
).Result()
if err != nil {
return nil, err
}
// 批量查询文章
var articles []Article
for _, idStr := range articleIDs {
articleID, _ := strconv.ParseInt(idStr, 10, 64)
article, err := articleService.GetArticle(context.Background(), articleID)
if err == nil {
articles = append(articles, *article)
}
}
return articles, nil
}
四、性能优化技巧
4.1 数据库优化
-- 1. 添加索引
CREATE INDEX idx_articles_author_status ON articles(author_id, status);
CREATE INDEX idx_articles_created ON articles(created_at DESC);
-- 2. 优化查询
-- 差的查询
SELECT * FROM articles; -- 避免 SELECT *
-- 好的查询
SELECT id, title, author_id, created_at
FROM articles
WHERE status = 'Published'
ORDER BY created_at DESC
LIMIT 20;
-- 3. 使用 EXPLAIN 分析
EXPLAIN SELECT * FROM articles WHERE author_id = 123;
4.2 N+1 查询问题
// 差的做法(N+1 查询)
func listArticlesWithAuthor(ctx context.Context) ([]ArticleWithAuthor, error) {
// 1. 查询文章列表
rows, _ := db.Query("SELECT id, title, author_id FROM articles LIMIT 20")
defer rows.Close()
var results []ArticleWithAuthor
for rows.Next() {
var article Article
rows.Scan(&article.ID, &article.Title, &article.AuthorID)
// 2. 为每篇文章查询作者(N 次查询!)
var author User
db.QueryRow(
"SELECT id, username FROM users WHERE id = ?",
article.AuthorID,
).Scan(&author.ID, &author.Username)
results = append(results, ArticleWithAuthor{
Article: article,
Author: author,
})
}
return results, nil
}
// 好的做法(2 次查询)
func listArticlesWithAuthorOptimized(ctx context.Context) ([]ArticleWithAuthor, error) {
// 1. 查询文章列表
rows, _ := db.Query("SELECT id, title, author_id FROM articles LIMIT 20")
defer rows.Close()
var articles []Article
authorIDs := make(map[int64]bool)
for rows.Next() {
var article Article
rows.Scan(&article.ID, &article.Title, &article.AuthorID)
articles = append(articles, article)
authorIDs[article.AuthorID] = true
}
// 2. 批量查询作者(1 次查询)
ids := make([]int64, 0, len(authorIDs))
for id := range authorIDs {
ids = append(ids, id)
}
query := fmt.Sprintf(
"SELECT id, username FROM users WHERE id IN (%s)",
strings.Repeat("?,", len(ids)-1)+"?",
)
args := make([]interface{}, len(ids))
for i, id := range ids {
args[i] = id
}
rows2, _ := db.Query(query, args...)
defer rows2.Close()
authors := make(map[int64]User)
for rows2.Next() {
var author User
rows2.Scan(&author.ID, &author.Username)
authors[author.ID] = author
}
// 3. 组装结果
var results []ArticleWithAuthor
for _, article := range articles {
results = append(results, ArticleWithAuthor{
Article: article,
Author: authors[article.AuthorID],
})
}
return results, nil
}
五、总结
5.1 为什么普通业务系统排第五?
比 CRUD 难:
- 有业务逻辑
- 需要缓存优化
- 需要考虑性能
- 需要权限控制
比复杂系统简单:
- 无分布式事务
- 并发量适中
- 延迟要求不高
- 无复杂状态机
5.2 关键技术点
必备技能:
- RBAC 权限系统
- Redis 缓存
- 分页查询
- 全文搜索(ES)
- 消息队列
性能优化:
- 数据库索引
- 查询优化
- 缓存策略
- 批量查询
最佳实践:
- 缓存穿透/雪崩/击穿
- N+1 查询问题
- 深度分页优化
- 异步处理
5.3 学习路径
- 掌握基础: SQL、Redis、HTTP
- 学习框架: Gin/Echo、GORM
- 实践项目: 博客、论坛、电商
- 性能优化: 缓存、索引、分页
- 生产经验: 监控、日志、错误处理
普通业务系统是中高级开发者的主战场!
本系列文章
➤ [NO.1 调度器]
➤ [NO.2 一致性协议](Paxos / Raft)
➤ [NO.3 高性能异步系统](消息队列、回调、重试)
➤ [NO.4 交易系统](钱的事不能错)
➤ NO.5 普通业务系统(绝大多数人做的)