HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 微服务架构实战

    • 微服务架构设计手册
    • 第1章:微服务架构概述
    • 第2章:服务拆分与边界
    • 第3章:服务间通信
    • 第4章:数据一致性方案

第3章:服务间通信

通信模式概述

微服务通信的核心问题

单体应用:

Service A → 方法调用 → Service B

特点:
 进程内调用(纳秒级)
 可靠(不会丢失)
 事务保证(ACID)

微服务:

Service A → 网络调用 → Service B

挑战:
 网络延迟(毫秒级)
 网络故障(超时、丢包)
 分布式事务(复杂)
 服务发现(动态IP)

通信模式分类

1. 按交互方式

同步通信(Request-Response):

Client ───请求───> Server
       <───响应───

特点:
- 阻塞等待响应
- 实时性强
- 强耦合

示例:HTTP、gRPC

异步通信(Message-Based):

Producer ───消息───> Message Queue ───> Consumer

特点:
- 非阻塞
- 解耦
- 削峰填谷

示例:RabbitMQ、Kafka

2. 按通信模式

一对一(One-to-One):

Client → Server

模式:
- Request/Response(请求/响应)
- Async Request/Response(异步请求/响应)
- One-way Notification(单向通知)

一对多(One-to-Many):

Publisher → Topic → Subscriber1
                  → Subscriber2
                  → Subscriber3

模式:
- Publish/Subscribe(发布/订阅)
- Event Streaming(事件流)

同步通信

REST API

定义

REST(Representational State Transfer):基于HTTP的架构风格

核心原则:

1. 资源(Resource):一切皆资源
2. URI:唯一标识资源
3. HTTP方法:操作资源
   - GET:查询
   - POST:创建
   - PUT:更新(全量)
   - PATCH:更新(部分)
   - DELETE:删除
4. 无状态:每个请求独立
5. 统一接口

代码示例(Go + Gin)

Product Service API:

package main

import (
    "net/http"
    "github.com/gin-gonic/gin"
    "time"
)

// 商品模型
type Product struct {
    ID          string    `json:"id"`
    Name        string    `json:"name"`
    Description string    `json:"description"`
    Price       float64   `json:"price"`
    Stock       int       `json:"stock"`
    CreatedAt   time.Time `json:"created_at"`
}

// Product Service
type ProductService struct {
    products map[string]*Product
}

func NewProductService() *ProductService {
    return &ProductService{
        products: make(map[string]*Product),
    }
}

func main() {
    router := gin.Default()
    service := NewProductService()

    // RESTful API路由
    v1 := router.Group("/api/v1")
    {
        products := v1.Group("/products")
        {
            products.POST("", service.CreateProduct)        // 创建商品
            products.GET("/:id", service.GetProduct)        // 查询商品
            products.PUT("/:id", service.UpdateProduct)     // 更新商品
            products.DELETE("/:id", service.DeleteProduct)  // 删除商品
            products.GET("", service.ListProducts)          // 商品列表
        }
    }

    router.Run(":8080")
}

// 创建商品
func (s *ProductService) CreateProduct(c *gin.Context) {
    var product Product
    if err := c.ShouldBindJSON(&product); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
        return
    }

    product.ID = generateID()
    product.CreatedAt = time.Now()
    s.products[product.ID] = &product

    c.JSON(http.StatusCreated, product)
}

// 查询商品
func (s *ProductService) GetProduct(c *gin.Context) {
    id := c.Param("id")

    product, exists := s.products[id]
    if !exists {
        c.JSON(http.StatusNotFound, gin.H{"error": "product not found"})
        return
    }

    c.JSON(http.StatusOK, product)
}

// 更新商品
func (s *ProductService) UpdateProduct(c *gin.Context) {
    id := c.Param("id")

    product, exists := s.products[id]
    if !exists {
        c.JSON(http.StatusNotFound, gin.H{"error": "product not found"})
        return
    }

    var updateData Product
    if err := c.ShouldBindJSON(&updateData); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
        return
    }

    // 更新字段
    product.Name = updateData.Name
    product.Description = updateData.Description
    product.Price = updateData.Price
    product.Stock = updateData.Stock

    c.JSON(http.StatusOK, product)
}

// 删除商品
func (s *ProductService) DeleteProduct(c *gin.Context) {
    id := c.Param("id")

    if _, exists := s.products[id]; !exists {
        c.JSON(http.StatusNotFound, gin.H{"error": "product not found"})
        return
    }

    delete(s.products, id)
    c.JSON(http.StatusNoContent, nil)
}

// 商品列表
func (s *ProductService) ListProducts(c *gin.Context) {
    products := make([]*Product, 0, len(s.products))
    for _, p := range s.products {
        products = append(products, p)
    }

    c.JSON(http.StatusOK, gin.H{
        "total": len(products),
        "items": products,
    })
}

func generateID() string {
    return fmt.Sprintf("PROD-%d", time.Now().UnixNano())
}

HTTP Client调用:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "time"
)

// Product Client
type ProductClient struct {
    baseURL    string
    httpClient *http.Client
}

func NewProductClient(baseURL string) *ProductClient {
    return &ProductClient{
        baseURL: baseURL,
        httpClient: &http.Client{
            Timeout: 5 * time.Second,
        },
    }
}

// 创建商品
func (c *ProductClient) CreateProduct(product *Product) (*Product, error) {
    data, _ := json.Marshal(product)

    resp, err := c.httpClient.Post(
        c.baseURL+"/api/v1/products",
        "application/json",
        bytes.NewBuffer(data),
    )
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusCreated {
        return nil, fmt.Errorf("unexpected status: %d", resp.StatusCode)
    }

    var result Product
    json.NewDecoder(resp.Body).Decode(&result)
    return &result, nil
}

// 查询商品
func (c *ProductClient) GetProduct(id string) (*Product, error) {
    resp, err := c.httpClient.Get(c.baseURL + "/api/v1/products/" + id)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    if resp.StatusCode == http.StatusNotFound {
        return nil, fmt.Errorf("product not found")
    }

    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("unexpected status: %d", resp.StatusCode)
    }

    var product Product
    json.NewDecoder(resp.Body).Decode(&product)
    return &product, nil
}

// 使用示例
func main() {
    client := NewProductClient("http://localhost:8080")

    // 创建商品
    product := &Product{
        Name:        "iPhone 15",
        Description: "最新款iPhone",
        Price:       5999.00,
        Stock:       100,
    }

    created, err := client.CreateProduct(product)
    if err != nil {
        panic(err)
    }

    fmt.Printf("Created: %+v\n", created)

    // 查询商品
    fetched, err := client.GetProduct(created.ID)
    if err != nil {
        panic(err)
    }

    fmt.Printf("Fetched: %+v\n", fetched)
}

gRPC

定义

gRPC:Google开源的高性能RPC框架

特点:

 HTTP/2协议(多路复用)
 Protocol Buffers(高效序列化)
 强类型(类型安全)
 支持流式传输
 性能高(比REST快3-10倍)

Protocol Buffers定义

product.proto:

syntax = "proto3";

package product;

option go_package = "github.com/yourapp/product/pb";

// 商品服务
service ProductService {
  // 创建商品
  rpc CreateProduct(CreateProductRequest) returns (ProductResponse);

  // 查询商品
  rpc GetProduct(GetProductRequest) returns (ProductResponse);

  // 更新商品
  rpc UpdateProduct(UpdateProductRequest) returns (ProductResponse);

  // 删除商品
  rpc DeleteProduct(DeleteProductRequest) returns (DeleteProductResponse);

  // 商品列表(服务端流式)
  rpc ListProducts(ListProductsRequest) returns (stream ProductResponse);

  // 批量检查库存(客户端流式)
  rpc CheckStockBatch(stream CheckStockRequest) returns (CheckStockBatchResponse);

  // 实时库存监控(双向流式)
  rpc MonitorStock(stream MonitorStockRequest) returns (stream StockUpdateResponse);
}

// 商品消息
message Product {
  string id = 1;
  string name = 2;
  string description = 3;
  double price = 4;
  int32 stock = 5;
  int64 created_at = 6;
}

// 请求/响应消息
message CreateProductRequest {
  string name = 1;
  string description = 2;
  double price = 3;
  int32 stock = 4;
}

message GetProductRequest {
  string id = 1;
}

message UpdateProductRequest {
  string id = 1;
  string name = 2;
  string description = 3;
  double price = 4;
  int32 stock = 5;
}

message DeleteProductRequest {
  string id = 1;
}

message ProductResponse {
  Product product = 1;
}

message DeleteProductResponse {
  bool success = 1;
}

message ListProductsRequest {
  int32 page = 1;
  int32 page_size = 2;
}

message CheckStockRequest {
  string product_id = 1;
  int32 quantity = 2;
}

message CheckStockBatchResponse {
  bool all_available = 1;
  repeated string unavailable_products = 2;
}

message MonitorStockRequest {
  string product_id = 1;
}

message StockUpdateResponse {
  string product_id = 1;
  int32 current_stock = 2;
  int64 timestamp = 3;
}

gRPC Server实现

package main

import (
    "context"
    "fmt"
    "log"
    "net"
    "sync"
    "time"

    pb "github.com/yourapp/product/pb"
    "google.golang.org/grpc"
)

// gRPC服务实现
type ProductServer struct {
    pb.UnimplementedProductServiceServer
    products map[string]*pb.Product
    mu       sync.RWMutex
}

func NewProductServer() *ProductServer {
    return &ProductServer{
        products: make(map[string]*pb.Product),
    }
}

// 创建商品
func (s *ProductServer) CreateProduct(ctx context.Context, req *pb.CreateProductRequest) (*pb.ProductResponse, error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    product := &pb.Product{
        Id:          generateID(),
        Name:        req.Name,
        Description: req.Description,
        Price:       req.Price,
        Stock:       req.Stock,
        CreatedAt:   time.Now().Unix(),
    }

    s.products[product.Id] = product

    log.Printf("Created product: %s", product.Id)

    return &pb.ProductResponse{Product: product}, nil
}

// 查询商品
func (s *ProductServer) GetProduct(ctx context.Context, req *pb.GetProductRequest) (*pb.ProductResponse, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    product, exists := s.products[req.Id]
    if !exists {
        return nil, fmt.Errorf("product not found: %s", req.Id)
    }

    return &pb.ProductResponse{Product: product}, nil
}

// 更新商品
func (s *ProductServer) UpdateProduct(ctx context.Context, req *pb.UpdateProductRequest) (*pb.ProductResponse, error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    product, exists := s.products[req.Id]
    if !exists {
        return nil, fmt.Errorf("product not found: %s", req.Id)
    }

    product.Name = req.Name
    product.Description = req.Description
    product.Price = req.Price
    product.Stock = req.Stock

    return &pb.ProductResponse{Product: product}, nil
}

// 删除商品
func (s *ProductServer) DeleteProduct(ctx context.Context, req *pb.DeleteProductRequest) (*pb.DeleteProductResponse, error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    delete(s.products, req.Id)

    return &pb.DeleteProductResponse{Success: true}, nil
}

// 商品列表(服务端流式)
func (s *ProductServer) ListProducts(req *pb.ListProductsRequest, stream pb.ProductService_ListProductsServer) error {
    s.mu.RLock()
    defer s.mu.RUnlock()

    for _, product := range s.products {
        if err := stream.Send(&pb.ProductResponse{Product: product}); err != nil {
            return err
        }
    }

    return nil
}

// 批量检查库存(客户端流式)
func (s *ProductServer) CheckStockBatch(stream pb.ProductService_CheckStockBatchServer) error {
    unavailable := []string{}

    for {
        req, err := stream.Recv()
        if err == io.EOF {
            // 客户端发送完毕
            return stream.SendAndClose(&pb.CheckStockBatchResponse{
                AllAvailable:        len(unavailable) == 0,
                UnavailableProducts: unavailable,
            })
        }
        if err != nil {
            return err
        }

        // 检查库存
        s.mu.RLock()
        product, exists := s.products[req.ProductId]
        s.mu.RUnlock()

        if !exists || product.Stock < req.Quantity {
            unavailable = append(unavailable, req.ProductId)
        }
    }
}

// 实时库存监控(双向流式)
func (s *ProductServer) MonitorStock(stream pb.ProductService_MonitorStockServer) error {
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }

        // 模拟定期发送库存更新
        go func(productID string) {
            ticker := time.NewTicker(2 * time.Second)
            defer ticker.Stop()

            for range ticker.C {
                s.mu.RLock()
                product, exists := s.products[productID]
                s.mu.RUnlock()

                if !exists {
                    return
                }

                stream.Send(&pb.StockUpdateResponse{
                    ProductId:    productID,
                    CurrentStock: product.Stock,
                    Timestamp:    time.Now().Unix(),
                })
            }
        }(req.ProductId)
    }
}

func main() {
    // 启动gRPC服务器
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }

    grpcServer := grpc.NewServer()
    pb.RegisterProductServiceServer(grpcServer, NewProductServer())

    log.Println("gRPC server listening on :50051")
    if err := grpcServer.Serve(lis); err != nil {
        log.Fatalf("Failed to serve: %v", err)
    }
}

gRPC Client调用

package main

import (
    "context"
    "io"
    "log"
    "time"

    pb "github.com/yourapp/product/pb"
    "google.golang.org/grpc"
)

// gRPC客户端
type ProductClient struct {
    client pb.ProductServiceClient
    conn   *grpc.ClientConn
}

func NewProductClient(address string) (*ProductClient, error) {
    conn, err := grpc.Dial(address, grpc.WithInsecure())
    if err != nil {
        return nil, err
    }

    return &ProductClient{
        client: pb.NewProductServiceClient(conn),
        conn:   conn,
    }, nil
}

func (c *ProductClient) Close() {
    c.conn.Close()
}

// 创建商品
func (c *ProductClient) CreateProduct(name, description string, price float64, stock int32) (*pb.Product, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    resp, err := c.client.CreateProduct(ctx, &pb.CreateProductRequest{
        Name:        name,
        Description: description,
        Price:       price,
        Stock:       stock,
    })

    if err != nil {
        return nil, err
    }

    return resp.Product, nil
}

// 查询商品
func (c *ProductClient) GetProduct(id string) (*pb.Product, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    resp, err := c.client.GetProduct(ctx, &pb.GetProductRequest{Id: id})
    if err != nil {
        return nil, err
    }

    return resp.Product, nil
}

// 商品列表(服务端流式)
func (c *ProductClient) ListProducts() ([]*pb.Product, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    stream, err := c.client.ListProducts(ctx, &pb.ListProductsRequest{})
    if err != nil {
        return nil, err
    }

    var products []*pb.Product
    for {
        resp, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return nil, err
        }

        products = append(products, resp.Product)
    }

    return products, nil
}

// 批量检查库存(客户端流式)
func (c *ProductClient) CheckStockBatch(items []struct {
    ProductID string
    Quantity  int32
}) (bool, []string, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    stream, err := c.client.CheckStockBatch(ctx)
    if err != nil {
        return false, nil, err
    }

    // 发送所有检查请求
    for _, item := range items {
        if err := stream.Send(&pb.CheckStockRequest{
            ProductId: item.ProductID,
            Quantity:  item.Quantity,
        }); err != nil {
            return false, nil, err
        }
    }

    // 接收结果
    resp, err := stream.CloseAndRecv()
    if err != nil {
        return false, nil, err
    }

    return resp.AllAvailable, resp.UnavailableProducts, nil
}

// 使用示例
func main() {
    client, err := NewProductClient("localhost:50051")
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer client.Close()

    // 1. 创建商品
    product, err := client.CreateProduct(
        "iPhone 15",
        "最新款iPhone",
        5999.00,
        100,
    )
    if err != nil {
        log.Fatalf("CreateProduct failed: %v", err)
    }
    log.Printf("Created: %+v", product)

    // 2. 查询商品
    fetched, err := client.GetProduct(product.Id)
    if err != nil {
        log.Fatalf("GetProduct failed: %v", err)
    }
    log.Printf("Fetched: %+v", fetched)

    // 3. 商品列表(流式)
    products, err := client.ListProducts()
    if err != nil {
        log.Fatalf("ListProducts failed: %v", err)
    }
    log.Printf("Total products: %d", len(products))

    // 4. 批量检查库存
    available, unavailable, err := client.CheckStockBatch([]struct {
        ProductID string
        Quantity  int32
    }{
        {product.Id, 10},
        {product.Id, 200}, // 库存不足
    })
    if err != nil {
        log.Fatalf("CheckStockBatch failed: %v", err)
    }
    log.Printf("All available: %v, Unavailable: %v", available, unavailable)
}

REST vs gRPC对比

维度RESTgRPC
协议HTTP/1.1HTTP/2
数据格式JSON/XMLProtocol Buffers
性能中等高(3-10倍)
可读性高(文本)低(二进制)
浏览器支持原生支持需要grpc-web
流式传输
类型安全弱(JSON)强(protobuf)
工具支持成熟(Postman)较少
学习曲线低中等

选择建议:

REST适用:
 外部API(浏览器/移动端)
 公共API
 简单CRUD
 快速开发

gRPC适用:
 内部服务间通信
 高性能要求
 实时流式传输
 强类型约束

异步通信

消息队列

核心概念

Producer-Consumer模式:

Producer → Message Queue → Consumer

特点:
 解耦:生产者和消费者独立
 异步:生产者不等待消费者
 削峰:缓冲高峰流量
 可靠性:消息持久化

发布-订阅模式:

Publisher → Topic → Subscriber 1
                  → Subscriber 2
                  → Subscriber 3

特点:
 一对多
 广播消息
 解耦订阅者

RabbitMQ示例

Producer(生产者):

package main

import (
    "encoding/json"
    "log"

    "github.com/streadway/amqp"
)

// 订单创建事件
type OrderCreatedEvent struct {
    OrderID string  `json:"order_id"`
    UserID  string  `json:"user_id"`
    Amount  float64 `json:"amount"`
}

// RabbitMQ Producer
type RabbitMQProducer struct {
    conn    *amqp.Connection
    channel *amqp.Channel
}

func NewRabbitMQProducer(url string) (*RabbitMQProducer, error) {
    conn, err := amqp.Dial(url)
    if err != nil {
        return nil, err
    }

    ch, err := conn.Channel()
    if err != nil {
        return nil, err
    }

    return &RabbitMQProducer{
        conn:    conn,
        channel: ch,
    }, nil
}

func (p *RabbitMQProducer) Close() {
    p.channel.Close()
    p.conn.Close()
}

// 发布订单创建事件
func (p *RabbitMQProducer) PublishOrderCreated(event OrderCreatedEvent) error {
    // 1. 声明交换机
    err := p.channel.ExchangeDeclare(
        "orders",  // 交换机名称
        "topic",   // 类型:topic
        true,      // 持久化
        false,     // 不自动删除
        false,     // 非internal
        false,     // 不等待
        nil,       // 参数
    )
    if err != nil {
        return err
    }

    // 2. 序列化消息
    body, err := json.Marshal(event)
    if err != nil {
        return err
    }

    // 3. 发布消息
    err = p.channel.Publish(
        "orders",            // 交换机
        "order.created",     // routing key
        false,               // mandatory
        false,               // immediate
        amqp.Publishing{
            ContentType:  "application/json",
            Body:         body,
            DeliveryMode: amqp.Persistent, // 持久化
        },
    )

    if err != nil {
        return err
    }

    log.Printf("Published order created: %s", event.OrderID)
    return nil
}

func main() {
    producer, err := NewRabbitMQProducer("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // 发布订单创建事件
    event := OrderCreatedEvent{
        OrderID: "ORD-001",
        UserID:  "USER-001",
        Amount:  199.99,
    }

    if err := producer.PublishOrderCreated(event); err != nil {
        log.Fatal(err)
    }
}

Consumer(消费者):

package main

import (
    "encoding/json"
    "log"

    "github.com/streadway/amqp"
)

// RabbitMQ Consumer
type RabbitMQConsumer struct {
    conn    *amqp.Connection
    channel *amqp.Channel
}

func NewRabbitMQConsumer(url string) (*RabbitMQConsumer, error) {
    conn, err := amqp.Dial(url)
    if err != nil {
        return nil, err
    }

    ch, err := conn.Channel()
    if err != nil {
        return nil, err
    }

    return &RabbitMQConsumer{
        conn:    conn,
        channel: ch,
    }, nil
}

func (c *RabbitMQConsumer) Close() {
    c.channel.Close()
    c.conn.Close()
}

// 订阅订单创建事件
func (c *RabbitMQConsumer) SubscribeOrderCreated(handler func(OrderCreatedEvent) error) error {
    // 1. 声明交换机
    err := c.channel.ExchangeDeclare(
        "orders",
        "topic",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return err
    }

    // 2. 声明队列
    queue, err := c.channel.QueueDeclare(
        "inventory-service-order-created", // 队列名称
        true,                               // 持久化
        false,                              // 不自动删除
        false,                              // 非排他
        false,                              // 不等待
        nil,                                // 参数
    )
    if err != nil {
        return err
    }

    // 3. 绑定队列到交换机
    err = c.channel.QueueBind(
        queue.Name,      // 队列名称
        "order.created", // routing key
        "orders",        // 交换机
        false,           // 不等待
        nil,             // 参数
    )
    if err != nil {
        return err
    }

    // 4. 消费消息
    msgs, err := c.channel.Consume(
        queue.Name, // 队列
        "",         // consumer tag
        false,      // 手动ACK
        false,      // 非排他
        false,      // no-local
        false,      // 不等待
        nil,        // 参数
    )
    if err != nil {
        return err
    }

    log.Println("Waiting for messages...")

    // 5. 处理消息
    go func() {
        for msg := range msgs {
            var event OrderCreatedEvent
            if err := json.Unmarshal(msg.Body, &event); err != nil {
                log.Printf("Failed to unmarshal: %v", err)
                msg.Nack(false, false) // 拒绝消息
                continue
            }

            // 处理事件
            if err := handler(event); err != nil {
                log.Printf("Handler failed: %v", err)
                msg.Nack(false, true) // 重新入队
            } else {
                msg.Ack(false) // 确认消息
                log.Printf("Processed order: %s", event.OrderID)
            }
        }
    }()

    return nil
}

func main() {
    consumer, err := NewRabbitMQConsumer("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // 订阅订单创建事件
    err = consumer.SubscribeOrderCreated(func(event OrderCreatedEvent) error {
        // 业务逻辑:扣减库存
        log.Printf("Deducting stock for order: %s", event.OrderID)
        return nil
    })

    if err != nil {
        log.Fatal(err)
    }

    // 阻塞主线程
    select {}
}

Kafka示例

Producer:

package main

import (
    "encoding/json"
    "log"

    "github.com/Shopify/sarama"
)

// Kafka Producer
type KafkaProducer struct {
    producer sarama.SyncProducer
}

func NewKafkaProducer(brokers []string) (*KafkaProducer, error) {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本
    config.Producer.Retry.Max = 5

    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        return nil, err
    }

    return &KafkaProducer{producer: producer}, nil
}

func (p *KafkaProducer) Close() {
    p.producer.Close()
}

// 发布订单创建事件
func (p *KafkaProducer) PublishOrderCreated(event OrderCreatedEvent) error {
    // 1. 序列化
    value, err := json.Marshal(event)
    if err != nil {
        return err
    }

    // 2. 构造消息
    msg := &sarama.ProducerMessage{
        Topic: "order-created",
        Key:   sarama.StringEncoder(event.OrderID), // 相同key保证顺序
        Value: sarama.ByteEncoder(value),
    }

    // 3. 发送消息
    partition, offset, err := p.producer.SendMessage(msg)
    if err != nil {
        return err
    }

    log.Printf("Message sent to partition %d at offset %d", partition, offset)
    return nil
}

func main() {
    producer, err := NewKafkaProducer([]string{"localhost:9092"})
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    event := OrderCreatedEvent{
        OrderID: "ORD-001",
        UserID:  "USER-001",
        Amount:  199.99,
    }

    if err := producer.PublishOrderCreated(event); err != nil {
        log.Fatal(err)
    }
}

Consumer:

package main

import (
    "context"
    "encoding/json"
    "log"

    "github.com/Shopify/sarama"
)

// Kafka Consumer
type KafkaConsumer struct {
    consumer sarama.ConsumerGroup
}

func NewKafkaConsumer(brokers []string, group string) (*KafkaConsumer, error) {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    config.Consumer.Offsets.Initial = sarama.OffsetOldest // 从最早的消息开始

    consumer, err := sarama.NewConsumerGroup(brokers, group, config)
    if err != nil {
        return nil, err
    }

    return &KafkaConsumer{consumer: consumer}, nil
}

func (c *KafkaConsumer) Close() {
    c.consumer.Close()
}

// 消费者处理器
type ConsumerGroupHandler struct {
    handler func(OrderCreatedEvent) error
}

func (h ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (h ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

func (h ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        var event OrderCreatedEvent
        if err := json.Unmarshal(message.Value, &event); err != nil {
            log.Printf("Failed to unmarshal: %v", err)
            continue
        }

        // 处理事件
        if err := h.handler(event); err != nil {
            log.Printf("Handler failed: %v", err)
            // 可以选择不标记为已消费,触发重试
        } else {
            session.MarkMessage(message, "") // 标记为已消费
            log.Printf("Processed order: %s", event.OrderID)
        }
    }

    return nil
}

// 订阅订单创建事件
func (c *KafkaConsumer) SubscribeOrderCreated(handler func(OrderCreatedEvent) error) error {
    topics := []string{"order-created"}
    h := ConsumerGroupHandler{handler: handler}

    ctx := context.Background()
    for {
        if err := c.consumer.Consume(ctx, topics, h); err != nil {
            return err
        }
    }
}

func main() {
    consumer, err := NewKafkaConsumer([]string{"localhost:9092"}, "inventory-service")
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // 订阅订单创建事件
    err = consumer.SubscribeOrderCreated(func(event OrderCreatedEvent) error {
        // 业务逻辑:扣减库存
        log.Printf("Deducting stock for order: %s", event.OrderID)
        return nil
    })

    if err != nil {
        log.Fatal(err)
    }
}

协议选择

决策树

需要低延迟、高性能?
├─ 是 → gRPC
└─ 否 → 需要浏览器直接调用?
         ├─ 是 → REST
         └─ 否 → 需要解耦、异步?
                  ├─ 是 → 消息队列
                  └─ 否 → REST/gRPC

场景选择

场景推荐协议原因
外部APIREST浏览器友好、工具成熟
移动App APIREST/gRPCREST简单,gRPC省流量
内部服务调用gRPC高性能、强类型
实时通信gRPC Stream/WebSocket双向流式
事件通知消息队列解耦、可靠
批量处理消息队列削峰填谷
日志收集Kafka高吞吐

服务网格

什么是Service Mesh

定义:专门处理服务间通信的基础设施层

核心功能:

 服务发现
 负载均衡
 熔断降级
 限流
 超时重试
 链路追踪
 流量管理
 安全(mTLS)

架构

┌────────────────────────────────────────┐
│          Control Plane                 │
│  (Istio Pilot, Mixer, Citadel)        │
└────────────────────────────────────────┘
             ↓ 配置
┌──────────────────────────────────────────┐
│          Data Plane (Sidecar Proxy)      │
│                                          │
│  ┌──────┐ ←→ Proxy ←→ Proxy ←→ ┌──────┐│
│  │Service│           Envoy      │Service││
│  │  A   │                       │  B   ││
│  └──────┘                       └──────┘│
└──────────────────────────────────────────┘

所有服务间通信都通过Sidecar Proxy
应用无感知,无需修改代码

Istio示例

部署服务到Istio:

# productservice.yaml
apiVersion: v1
kind: Service
metadata:
  name: product-service
spec:
  ports:
  - port: 8080
    name: http
  selector:
    app: product-service
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: product-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: product-service
  template:
    metadata:
      labels:
        app: product-service
    spec:
      containers:
      - name: product-service
        image: product-service:v1
        ports:
        - containerPort: 8080

流量管理(VirtualService):

# 金丝雀发布:90%流量到v1,10%流量到v2
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: product-service
spec:
  hosts:
  - product-service
  http:
  - match:
    - headers:
        user:
          exact: beta-tester
    route:
    - destination:
        host: product-service
        subset: v2
  - route:
    - destination:
        host: product-service
        subset: v1
      weight: 90
    - destination:
        host: product-service
        subset: v2
      weight: 10
---
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: product-service
spec:
  host: product-service
  subsets:
  - name: v1
    labels:
      version: v1
  - name: v2
    labels:
      version: v2

超时和重试:

apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: product-service
spec:
  hosts:
  - product-service
  http:
  - route:
    - destination:
        host: product-service
    timeout: 3s
    retries:
      attempts: 3
      perTryTimeout: 1s
      retryOn: 5xx,connect-failure,refused-stream

熔断:

apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: product-service
spec:
  host: product-service
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 50
        http2MaxRequests: 100
        maxRequestsPerConnection: 2
    outlierDetection:
      consecutiveErrors: 5
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
      minHealthPercent: 50

实战案例

订单-库存-支付的通信设计

架构:

┌──────────────┐
│   API GW     │
└──────────────┘
       ↓ REST
┌──────────────┐
│Order Service │
└──────────────┘
   ↓ gRPC         ↓ 消息队列
┌──────────────┐  ↓
│Product       │  ↓
│Service       │  ↓
└──────────────┘  ↓
                  ↓
            ┌──────────────┐
            │Event Bus     │
            │(Kafka)       │
            └──────────────┘
                  ↓
        ┌─────────┴─────────┐
        ↓                   ↓
┌──────────────┐    ┌──────────────┐
│Inventory     │    │Payment       │
│Service       │    │Service       │
└──────────────┘    └──────────────┘

通信选择:

  1. API Gateway → Order Service:REST

    • 原因:外部接口,浏览器友好
  2. Order Service → Product Service:gRPC

    • 原因:内部同步调用,需要低延迟
  3. Order Service → Inventory/Payment:消息队列

    • 原因:异步处理,解耦,保证最终一致性

代码实现:

// Order Service
type OrderService struct {
    productClient pb.ProductServiceClient  // gRPC客户端
    eventBus      *KafkaProducer           // Kafka生产者
}

func (svc *OrderService) CreateOrder(ctx context.Context, req CreateOrderRequest) (*Order, error) {
    // 1. 同步调用Product Service检查商品信息(gRPC)
    for _, item := range req.Items {
        product, err := svc.productClient.GetProduct(ctx, &pb.GetProductRequest{
            Id: item.ProductID,
        })
        if err != nil {
            return nil, fmt.Errorf("product not found: %s", item.ProductID)
        }

        // 验证价格
        if product.Product.Price != item.Price {
            return nil, fmt.Errorf("price mismatch")
        }
    }

    // 2. 创建订单
    order := &Order{
        ID:              generateID(),
        UserID:          req.UserID,
        Items:           req.Items,
        Status:          OrderStatusPending,
        ShippingAddress: req.ShippingAddress,
        CreatedAt:       time.Now(),
    }

    // 3. 保存订单
    if err := svc.orderRepo.Save(order); err != nil {
        return nil, err
    }

    // 4. 发布订单创建事件(Kafka)
    event := OrderCreatedEvent{
        OrderID: order.ID,
        UserID:  order.UserID,
        Items:   order.Items,
        Amount:  calculateTotal(order.Items),
    }

    if err := svc.eventBus.PublishOrderCreated(event); err != nil {
        // 记录日志,但不影响订单创建
        log.Printf("Failed to publish event: %v", err)
    }

    return order, nil
}
// Inventory Service(监听订单创建事件)
func (svc *InventoryService) HandleOrderCreated(event OrderCreatedEvent) error {
    // 1. 扣减库存
    for _, item := range event.Items {
        if err := svc.inventoryRepo.DeductStock(item.ProductID, item.Quantity); err != nil {
            // 库存不足,发布失败事件
            svc.eventBus.PublishStockDeductionFailed(StockDeductionFailedEvent{
                OrderID:   event.OrderID,
                ProductID: item.ProductID,
                Reason:    "insufficient stock",
            })
            return err
        }
    }

    // 2. 发布库存扣减成功事件
    svc.eventBus.PublishStockDeducted(StockDeductedEvent{
        OrderID: event.OrderID,
        Items:   event.Items,
    })

    log.Printf("Stock deducted for order: %s", event.OrderID)
    return nil
}
// Payment Service(监听库存扣减成功事件)
func (svc *PaymentService) HandleStockDeducted(event StockDeductedEvent) error {
    // 1. 创建支付订单
    payment := &Payment{
        OrderID:   event.OrderID,
        Amount:    calculateAmount(event.Items),
        Status:    PaymentStatusPending,
        CreatedAt: time.Now(),
    }

    if err := svc.paymentRepo.Save(payment); err != nil {
        return err
    }

    // 2. 调用第三方支付
    if err := svc.callPaymentGateway(payment); err != nil {
        // 支付失败,发布事件
        svc.eventBus.PublishPaymentFailed(PaymentFailedEvent{
            OrderID: event.OrderID,
            Reason:  err.Error(),
        })
        return err
    }

    // 3. 发布支付成功事件
    svc.eventBus.PublishPaymentSuccess(PaymentSuccessEvent{
        OrderID: event.OrderID,
        Amount:  payment.Amount,
    })

    log.Printf("Payment completed for order: %s", event.OrderID)
    return nil
}

面试问答

同步通信和异步通信如何选择?

答案:

同步通信适用:

 需要立即响应(查询、验证)
 强依赖(必须等待结果)
 实时性要求高

示例:
- 查询商品详情
- 验证用户权限
- 检查库存

异步通信适用:

 非实时(可延迟处理)
 解耦(生产者不关心消费者)
 削峰填谷(缓冲高峰流量)
 事件通知(一对多)

示例:
- 发送通知邮件
- 生成订单报表
- 更新搜索索引
- 事件溯源

实践建议:

// 混合使用
func (svc *OrderService) CreateOrder(req CreateOrderRequest) (*Order, error) {
    // 1. 同步:检查库存(必须等待)
    available, err := svc.inventoryClient.CheckStock(req.Items)
    if err != nil || !available {
        return nil, errors.New("insufficient stock")
    }

    // 2. 创建订单
    order := createOrder(req)

    // 3. 异步:发送确认邮件(不等待)
    svc.eventBus.Publish(OrderCreatedEvent{OrderID: order.ID})

    return order, nil
}

REST和gRPC有什么区别?如何选择?

答案:

核心区别:

维度RESTgRPC
协议HTTP/1.1HTTP/2
数据格式JSON(文本)Protobuf(二进制)
性能中等高(3-10倍)
可读性高低
浏览器支持原生需grpc-web
流式传输

性能对比:

同样请求:

REST:
- Payload: 1KB (JSON)
- 延迟: 10ms

gRPC:
- Payload: 200B (Protobuf,压缩5倍)
- 延迟: 2ms(HTTP/2多路复用)

性能提升:5-10倍

选择建议:

REST:
 外部API(浏览器/移动端)
 公共API(开放给第三方)
 简单CRUD
 快速开发

gRPC:
 内部服务间通信
 高性能要求
 流式传输(实时数据)
 多语言(protobuf跨语言)

如何保证消息队列的可靠性?

答案:

1. 消息持久化

// RabbitMQ持久化
err := ch.Publish(
    "orders",
    "order.created",
    false,
    false,
    amqp.Publishing{
        DeliveryMode: amqp.Persistent, // 持久化消息
        Body:         body,
    },
)

// 队列也要持久化
queue, err := ch.QueueDeclare(
    "order-queue",
    true,  // durable: 持久化队列
    false,
    false,
    false,
    nil,
)

2. 消费确认(ACK)

// 手动ACK
msgs, err := ch.Consume(
    queue.Name,
    "",
    false,  // autoAck: false(手动确认)
    false,
    false,
    false,
    nil,
)

for msg := range msgs {
    if err := handleMessage(msg); err != nil {
        msg.Nack(false, true)  // 处理失败,重新入队
    } else {
        msg.Ack(false)  // 处理成功,确认
    }
}

3. 消息幂等

// 使用唯一ID防止重复消费
type OrderCreatedEvent struct {
    EventID string  // 唯一事件ID
    OrderID string
}

func (h *Handler) HandleOrderCreated(event OrderCreatedEvent) error {
    // 检查是否已处理
    if h.eventStore.Exists(event.EventID) {
        log.Printf("Event already processed: %s", event.EventID)
        return nil  // 直接返回,不重复处理
    }

    // 处理业务逻辑
    if err := h.processOrder(event.OrderID); err != nil {
        return err
    }

    // 记录已处理
    h.eventStore.Save(event.EventID)
    return nil
}

4. 死信队列

# RabbitMQ死信队列配置
args := amqp.Table{
    "x-dead-letter-exchange": "dlx",
    "x-dead-letter-routing-key": "order.failed",
    "x-message-ttl": 300000,  // 5分钟
}

queue, err := ch.QueueDeclare(
    "order-queue",
    true,
    false,
    false,
    false,
    args,
)

什么是Service Mesh?解决了什么问题?

答案:

定义:

Service Mesh是专门处理服务间通信的基础设施层
通过Sidecar Proxy(边车代理)拦截所有网络流量

架构:

传统微服务:
Service A → 直接调用 → Service B
- 每个服务自己实现:重试、超时、熔断、限流
- 重复代码,维护困难

Service Mesh:
Service A → Sidecar Proxy → Sidecar Proxy → Service B
           ↑                              ↑
           └─────── Control Plane ────────┘
- 通信逻辑下沉到Proxy
- 应用无感知,零侵入

解决的问题:

  1. 服务发现
无需硬编码服务地址
Sidecar自动发现服务实例
  1. 负载均衡
智能路由:轮询、权重、一致性哈希
自动剔除故障实例
  1. 熔断限流
# Istio配置,无需修改代码
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
spec:
  trafficPolicy:
    outlierDetection:
      consecutiveErrors: 5
      baseEjectionTime: 30s
  1. 安全
自动mTLS加密
无需修改应用代码
  1. 可观测性
自动收集:
- 指标(Prometheus)
- 日志(Fluentd)
- 链路追踪(Jaeger)

主流方案:

Istio:功能最全,复杂
Linkerd:轻量级,易用
Consul Connect:Consul生态

微服务通信如何做超时和重试?

答案:

1. 超时控制

// 使用context超时
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

resp, err := client.GetProduct(ctx, &pb.GetProductRequest{Id: productID})
if err != nil {
    if ctx.Err() == context.DeadlineExceeded {
        log.Println("Request timeout")
    }
    return err
}

2. 重试策略

// 指数退避重试
func retryWithBackoff(fn func() error, maxRetries int) error {
    backoff := time.Second

    for i := 0; i < maxRetries; i++ {
        err := fn()
        if err == nil {
            return nil
        }

        // 特定错误不重试
        if isNonRetryableError(err) {
            return err
        }

        log.Printf("Retry %d/%d after %v", i+1, maxRetries, backoff)
        time.Sleep(backoff)

        // 指数退避:1s, 2s, 4s, 8s...
        backoff *= 2
        if backoff > 30*time.Second {
            backoff = 30 * time.Second
        }
    }

    return fmt.Errorf("max retries exceeded")
}

// 使用
err := retryWithBackoff(func() error {
    return client.CallService()
}, 3)

3. 重试最佳实践

// 完整重试策略
type RetryConfig struct {
    MaxRetries     int
    InitialBackoff time.Duration
    MaxBackoff     time.Duration
    Multiplier     float64
    RetryableErrors []error
}

func (c *RetryConfig) Execute(fn func() error) error {
    backoff := c.InitialBackoff

    for i := 0; i < c.MaxRetries; i++ {
        err := fn()
        if err == nil {
            return nil
        }

        // 检查是否可重试
        if !c.isRetryable(err) {
            return err
        }

        // 最后一次重试不需要等待
        if i == c.MaxRetries-1 {
            return err
        }

        // 等待
        time.Sleep(backoff)

        // 计算下次退避时间
        backoff = time.Duration(float64(backoff) * c.Multiplier)
        if backoff > c.MaxBackoff {
            backoff = c.MaxBackoff
        }
    }

    return fmt.Errorf("max retries exceeded")
}

func (c *RetryConfig) isRetryable(err error) bool {
    for _, retryableErr := range c.RetryableErrors {
        if errors.Is(err, retryableErr) {
            return true
        }
    }
    return false
}

4. Istio自动重试

# 配置级别的重试,无需修改代码
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: product-service
spec:
  hosts:
  - product-service
  http:
  - route:
    - destination:
        host: product-service
    timeout: 3s
    retries:
      attempts: 3
      perTryTimeout: 1s
      retryOn: 5xx,reset,connect-failure

注意事项:

⚠️ 幂等性:重试必须保证操作幂等
⚠️ 超时传递:下游超时要小于上游
⚠️ 重试风暴:限制同时重试数量
⚠️ 熔断配合:重试失败多次触发熔断

参考资料

  • gRPC Documentation
  • Protocol Buffers
  • RabbitMQ Tutorials
  • Apache Kafka Documentation
  • Istio Documentation
  • Martin Fowler - Microservices
Prev
第2章:服务拆分与边界
Next
第4章:数据一致性方案