第3章:服务间通信
通信模式概述
微服务通信的核心问题
单体应用:
Service A → 方法调用 → Service B
特点:
进程内调用(纳秒级)
可靠(不会丢失)
事务保证(ACID)
微服务:
Service A → 网络调用 → Service B
挑战:
网络延迟(毫秒级)
网络故障(超时、丢包)
分布式事务(复杂)
服务发现(动态IP)
通信模式分类
1. 按交互方式
同步通信(Request-Response):
Client ───请求───> Server
<───响应───
特点:
- 阻塞等待响应
- 实时性强
- 强耦合
示例:HTTP、gRPC
异步通信(Message-Based):
Producer ───消息───> Message Queue ───> Consumer
特点:
- 非阻塞
- 解耦
- 削峰填谷
示例:RabbitMQ、Kafka
2. 按通信模式
一对一(One-to-One):
Client → Server
模式:
- Request/Response(请求/响应)
- Async Request/Response(异步请求/响应)
- One-way Notification(单向通知)
一对多(One-to-Many):
Publisher → Topic → Subscriber1
→ Subscriber2
→ Subscriber3
模式:
- Publish/Subscribe(发布/订阅)
- Event Streaming(事件流)
同步通信
REST API
定义
REST(Representational State Transfer):基于HTTP的架构风格
核心原则:
1. 资源(Resource):一切皆资源
2. URI:唯一标识资源
3. HTTP方法:操作资源
- GET:查询
- POST:创建
- PUT:更新(全量)
- PATCH:更新(部分)
- DELETE:删除
4. 无状态:每个请求独立
5. 统一接口
代码示例(Go + Gin)
Product Service API:
package main
import (
"net/http"
"github.com/gin-gonic/gin"
"time"
)
// Product is the JSON model of a catalog item served by the REST API.
type Product struct {
	ID          string    `json:"id"`          // set by CreateProduct from generateID
	Name        string    `json:"name"`        // display name
	Description string    `json:"description"` // free-form description
	Price       float64   `json:"price"`       // NOTE(review): currency/unit is not specified here
	Stock       int       `json:"stock"`       // quantity on hand
	CreatedAt   time.Time `json:"created_at"`  // set by CreateProduct from time.Now
}
// ProductService keeps products in an in-memory map keyed by product ID and
// exposes them through Gin handlers.
//
// NOTE(review): the map is not guarded by a lock while HTTP handlers may run
// concurrently; confirm whether this example is meant to be single-threaded.
type ProductService struct {
	products map[string]*Product
}

// NewProductService returns a ProductService with an empty, ready-to-use store.
func NewProductService() *ProductService {
	svc := new(ProductService)
	svc.products = map[string]*Product{}
	return svc
}
// main wires the product handlers under /api/v1/products and serves them on
// port 8080.
func main() {
	router := gin.Default()
	service := NewProductService()

	// RESTful API routes.
	v1 := router.Group("/api/v1")
	{
		products := v1.Group("/products")
		{
			products.POST("", service.CreateProduct)       // create a product
			products.GET("/:id", service.GetProduct)       // fetch one product
			products.PUT("/:id", service.UpdateProduct)    // replace a product's fields
			products.DELETE("/:id", service.DeleteProduct) // delete a product
			products.GET("", service.ListProducts)         // list all products
		}
	}

	// Run only returns on failure (for example, the port is already in use);
	// surface that instead of exiting silently with status 0.
	if err := router.Run(":8080"); err != nil {
		panic(err)
	}
}
// CreateProduct binds the JSON request body to a Product, assigns it a fresh
// ID and creation time, stores it, and replies 201 with the stored product.
// A body that fails to bind yields 400 with the binding error message.
func (s *ProductService) CreateProduct(c *gin.Context) {
	created := new(Product)
	if bindErr := c.ShouldBindJSON(created); bindErr != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": bindErr.Error()})
		return
	}

	// Any client-supplied id / created_at is overwritten here.
	created.ID = generateID()
	created.CreatedAt = time.Now()
	s.products[created.ID] = created

	c.JSON(http.StatusCreated, *created)
}
// GetProduct replies 200 with the product whose ID matches the :id path
// parameter, or 404 when no such product is stored.
func (s *ProductService) GetProduct(c *gin.Context) {
	if found, ok := s.products[c.Param("id")]; ok {
		c.JSON(http.StatusOK, found)
		return
	}
	c.JSON(http.StatusNotFound, gin.H{"error": "product not found"})
}
// UpdateProduct overwrites the name, description, price and stock of the
// product identified by :id with the values from the JSON body and replies 200
// with the updated product. The lookup happens before the body is parsed, so
// an unknown ID yields 404 even when the body is malformed; a malformed body
// for a known ID yields 400.
func (s *ProductService) UpdateProduct(c *gin.Context) {
	existing, ok := s.products[c.Param("id")]
	if !ok {
		c.JSON(http.StatusNotFound, gin.H{"error": "product not found"})
		return
	}

	incoming := Product{}
	if bindErr := c.ShouldBindJSON(&incoming); bindErr != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": bindErr.Error()})
		return
	}

	// Full replacement of the mutable fields; ID and CreatedAt are kept.
	existing.Name, existing.Description = incoming.Name, incoming.Description
	existing.Price, existing.Stock = incoming.Price, incoming.Stock

	c.JSON(http.StatusOK, existing)
}
// DeleteProduct removes the product identified by :id and replies 204, or
// replies 404 when the ID is unknown.
func (s *ProductService) DeleteProduct(c *gin.Context) {
	key := c.Param("id")
	_, ok := s.products[key]
	if !ok {
		c.JSON(http.StatusNotFound, gin.H{"error": "product not found"})
		return
	}

	delete(s.products, key)
	c.JSON(http.StatusNoContent, nil)
}
// ListProducts replies 200 with every stored product under "items" and their
// count under "total". The order follows Go map iteration and is therefore not
// stable between calls.
func (s *ProductService) ListProducts(c *gin.Context) {
	items := make([]*Product, 0, len(s.products))
	for id := range s.products {
		items = append(items, s.products[id])
	}

	body := gin.H{
		"total": len(items),
		"items": items,
	}
	c.JSON(http.StatusOK, body)
}
// generateID returns "PROD-" followed by the current Unix time in nanoseconds.
//
// NOTE(review): this function needs the "fmt" package, which the import block
// of this example does not list; add it there for the program to compile.
func generateID() string {
	nanos := time.Now().UnixNano()
	return "PROD-" + fmt.Sprint(nanos)
}
HTTP Client调用:
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
// ProductClient is a thin HTTP client for the product REST API.
type ProductClient struct {
	baseURL    string       // scheme://host[:port] prefix, without trailing path
	httpClient *http.Client // shared client; carries the request timeout
}

// NewProductClient returns a client that targets baseURL and gives every
// request a 5-second overall timeout.
func NewProductClient(baseURL string) *ProductClient {
	hc := &http.Client{Timeout: 5 * time.Second}
	pc := &ProductClient{httpClient: hc}
	pc.baseURL = baseURL
	return pc
}
// CreateProduct POSTs product as JSON to /api/v1/products and returns the
// product echoed back by the server.
//
// It returns an error when the request body cannot be encoded, the request
// fails, the server answers with anything other than 201 Created, or the
// response body cannot be decoded.
func (c *ProductClient) CreateProduct(product *Product) (*Product, error) {
	data, err := json.Marshal(product)
	if err != nil {
		return nil, fmt.Errorf("encode product: %w", err)
	}

	resp, err := c.httpClient.Post(
		c.baseURL+"/api/v1/products",
		"application/json",
		bytes.NewBuffer(data),
	)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusCreated {
		return nil, fmt.Errorf("unexpected status: %d", resp.StatusCode)
	}

	// A truncated or malformed body must not be reported as a zero-valued product.
	var result Product
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, fmt.Errorf("decode product: %w", err)
	}
	return &result, nil
}
// GetProduct fetches /api/v1/products/{id}.
//
// It returns a "product not found" error on 404, an "unexpected status" error
// on any other non-200 status, and a decode error when the response body is
// not a valid product.
//
// NOTE(review): id is concatenated into the path unescaped; callers passing
// arbitrary strings should escape it first.
func (c *ProductClient) GetProduct(id string) (*Product, error) {
	resp, err := c.httpClient.Get(c.baseURL + "/api/v1/products/" + id)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		// fall through to decoding below
	case http.StatusNotFound:
		return nil, fmt.Errorf("product not found")
	default:
		return nil, fmt.Errorf("unexpected status: %d", resp.StatusCode)
	}

	// A truncated or malformed body must not be reported as a zero-valued product.
	var product Product
	if err := json.NewDecoder(resp.Body).Decode(&product); err != nil {
		return nil, fmt.Errorf("decode product: %w", err)
	}
	return &product, nil
}
// main demonstrates the client: it creates one product and reads it back,
// panicking on the first error.
func main() {
	client := NewProductClient("http://localhost:8080")

	// Create a product.
	draft := Product{
		Name:        "iPhone 15",
		Description: "最新款iPhone",
		Price:       5999.00,
		Stock:       100,
	}
	created, err := client.CreateProduct(&draft)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Created: %+v\n", created)

	// Fetch it again by the server-assigned ID.
	fetched, err := client.GetProduct(created.ID)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Fetched: %+v\n", fetched)
}
gRPC
定义
gRPC:Google开源的高性能RPC框架
特点:
HTTP/2协议(多路复用)
Protocol Buffers(高效序列化)
强类型(类型安全)
支持流式传输
性能高(比REST快3-10倍)
Protocol Buffers定义
product.proto:
syntax = "proto3";
package product;
option go_package = "github.com/yourapp/product/pb";
// ProductService exposes product CRUD plus three streaming RPCs.
service ProductService {
  // CreateProduct creates a product (unary).
  rpc CreateProduct(CreateProductRequest) returns (ProductResponse);
  // GetProduct fetches one product by id (unary).
  rpc GetProduct(GetProductRequest) returns (ProductResponse);
  // UpdateProduct updates a product (unary).
  rpc UpdateProduct(UpdateProductRequest) returns (ProductResponse);
  // DeleteProduct deletes a product (unary).
  rpc DeleteProduct(DeleteProductRequest) returns (DeleteProductResponse);
  // ListProducts streams products to the client (server streaming).
  rpc ListProducts(ListProductsRequest) returns (stream ProductResponse);
  // CheckStockBatch receives a stream of stock checks and answers once
  // (client streaming).
  rpc CheckStockBatch(stream CheckStockRequest) returns (CheckStockBatchResponse);
  // MonitorStock exchanges subscriptions and stock updates
  // (bidirectional streaming).
  rpc MonitorStock(stream MonitorStockRequest) returns (stream StockUpdateResponse);
}
// Product is the wire representation of a catalog item.
message Product {
  string id = 1;
  string name = 2;
  string description = 3;
  double price = 4;
  int32 stock = 5;
  // NOTE(review): the Go server fills this with Unix seconds; confirm the unit
  // for other implementations.
  int64 created_at = 6;
}
// Request/response messages.

// CreateProductRequest carries the fields of a product to create; the id is
// not part of the request.
message CreateProductRequest {
  string name = 1;
  string description = 2;
  double price = 3;
  int32 stock = 4;
}

// GetProductRequest identifies the product to fetch.
message GetProductRequest {
  string id = 1;
}

// UpdateProductRequest identifies a product and carries its new field values.
message UpdateProductRequest {
  string id = 1;
  string name = 2;
  string description = 3;
  double price = 4;
  int32 stock = 5;
}

// DeleteProductRequest identifies the product to delete.
message DeleteProductRequest {
  string id = 1;
}

// ProductResponse wraps a single product.
message ProductResponse {
  Product product = 1;
}

// DeleteProductResponse reports the outcome of a delete.
message DeleteProductResponse {
  bool success = 1;
}

// ListProductsRequest carries paging parameters for listing.
message ListProductsRequest {
  int32 page = 1;
  int32 page_size = 2;
}

// CheckStockRequest asks whether quantity units of product_id are available.
message CheckStockRequest {
  string product_id = 1;
  int32 quantity = 2;
}

// CheckStockBatchResponse summarizes a batch of stock checks.
message CheckStockBatchResponse {
  bool all_available = 1;
  repeated string unavailable_products = 2;
}

// MonitorStockRequest subscribes to stock updates for one product.
message MonitorStockRequest {
  string product_id = 1;
}

// StockUpdateResponse is one stock reading for a monitored product.
message StockUpdateResponse {
  string product_id = 1;
  int32 current_stock = 2;
  int64 timestamp = 3;
}
gRPC Server实现
package main
import (
	"context"
	"fmt"
	"io"
	"log"
	"net"
	"sync"
	"time"

	pb "github.com/yourapp/product/pb"
	"google.golang.org/grpc"
)
// ProductServer implements pb.ProductServiceServer on top of an in-memory map.
type ProductServer struct {
	pb.UnimplementedProductServiceServer

	products map[string]*pb.Product // keyed by product id; guarded by mu
	mu       sync.RWMutex
}

// NewProductServer returns a server with an empty product store.
func NewProductServer() *ProductServer {
	srv := new(ProductServer)
	srv.products = map[string]*pb.Product{}
	return srv
}
// CreateProduct stores a new product built from req and returns it.
//
// The response carries its own copy of the product: the reply is serialized
// after this method returns (and therefore after the lock is released), so
// handing out the stored pointer would race with a concurrent UpdateProduct.
func (s *ProductServer) CreateProduct(ctx context.Context, req *pb.CreateProductRequest) (*pb.ProductResponse, error) {
	id := generateID()
	createdAt := time.Now().Unix()
	build := func() *pb.Product {
		return &pb.Product{
			Id:          id,
			Name:        req.Name,
			Description: req.Description,
			Price:       req.Price,
			Stock:       req.Stock,
			CreatedAt:   createdAt,
		}
	}

	s.mu.Lock()
	s.products[id] = build()
	s.mu.Unlock()

	log.Printf("Created product: %s", id)
	return &pb.ProductResponse{Product: build()}, nil
}
// GetProduct returns the product with id req.Id, or a "product not found"
// error when it is not stored.
//
// A copy taken under the read lock is returned: the reply is serialized after
// this method returns, so exposing the stored pointer would race with a
// concurrent UpdateProduct.
func (s *ProductServer) GetProduct(ctx context.Context, req *pb.GetProductRequest) (*pb.ProductResponse, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	product, exists := s.products[req.Id]
	if !exists {
		return nil, fmt.Errorf("product not found: %s", req.Id)
	}

	snapshot := &pb.Product{
		Id:          product.Id,
		Name:        product.Name,
		Description: product.Description,
		Price:       product.Price,
		Stock:       product.Stock,
		CreatedAt:   product.CreatedAt,
	}
	return &pb.ProductResponse{Product: snapshot}, nil
}
// UpdateProduct overwrites name, description, price and stock of the product
// with id req.Id and returns the updated product, or a "product not found"
// error when it is not stored.
//
// A copy taken under the write lock is returned: the reply is serialized after
// this method returns, so exposing the stored pointer would race with the next
// concurrent update.
func (s *ProductServer) UpdateProduct(ctx context.Context, req *pb.UpdateProductRequest) (*pb.ProductResponse, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	product, exists := s.products[req.Id]
	if !exists {
		return nil, fmt.Errorf("product not found: %s", req.Id)
	}

	product.Name = req.Name
	product.Description = req.Description
	product.Price = req.Price
	product.Stock = req.Stock

	snapshot := &pb.Product{
		Id:          product.Id,
		Name:        product.Name,
		Description: product.Description,
		Price:       product.Price,
		Stock:       product.Stock,
		CreatedAt:   product.CreatedAt,
	}
	return &pb.ProductResponse{Product: snapshot}, nil
}
// DeleteProduct removes the product with id req.Id.
//
// It returns a "product not found" error for an unknown id, matching
// GetProduct and UpdateProduct, instead of reporting success for a delete that
// removed nothing.
func (s *ProductServer) DeleteProduct(ctx context.Context, req *pb.DeleteProductRequest) (*pb.DeleteProductResponse, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if _, exists := s.products[req.Id]; !exists {
		return nil, fmt.Errorf("product not found: %s", req.Id)
	}

	delete(s.products, req.Id)
	return &pb.DeleteProductResponse{Success: true}, nil
}
// ListProducts streams every stored product to the client (server streaming).
// req.Page and req.PageSize are not consulted; the order follows Go map
// iteration and is not stable.
//
// The products are copied under the read lock and sent after it is released,
// so a slow client cannot block writers for the duration of the stream and the
// sent messages cannot race with UpdateProduct.
func (s *ProductServer) ListProducts(req *pb.ListProductsRequest, stream pb.ProductService_ListProductsServer) error {
	s.mu.RLock()
	snapshot := make([]*pb.Product, 0, len(s.products))
	for _, p := range s.products {
		snapshot = append(snapshot, &pb.Product{
			Id:          p.Id,
			Name:        p.Name,
			Description: p.Description,
			Price:       p.Price,
			Stock:       p.Stock,
			CreatedAt:   p.CreatedAt,
		})
	}
	s.mu.RUnlock()

	for _, p := range snapshot {
		if err := stream.Send(&pb.ProductResponse{Product: p}); err != nil {
			return err
		}
	}
	return nil
}
// CheckStockBatch reads stock checks until the client half-closes the stream
// (client streaming), then answers once with whether every requested quantity
// was available and the ids that were not. A product counts as unavailable
// when it is unknown or its stock is below the requested quantity.
func (s *ProductServer) CheckStockBatch(stream pb.ProductService_CheckStockBatchServer) error {
	var unavailable []string

	for {
		req, err := stream.Recv()
		if err == io.EOF {
			// The client has sent everything; reply and finish the RPC.
			return stream.SendAndClose(&pb.CheckStockBatchResponse{
				AllAvailable:        len(unavailable) == 0,
				UnavailableProducts: unavailable,
			})
		}
		if err != nil {
			return err
		}

		// Read the stock while still holding the lock; reading product.Stock
		// after RUnlock would race with UpdateProduct.
		s.mu.RLock()
		product, exists := s.products[req.ProductId]
		sufficient := exists && product.Stock >= req.Quantity
		s.mu.RUnlock()

		if !sufficient {
			unavailable = append(unavailable, req.ProductId)
		}
	}
}
// MonitorStock implements the bidirectional-streaming RPC: every request
// subscribes the caller to one product, and the server then sends that
// product's stock every two seconds until the product disappears or the RPC
// ends.
//
// The RPC ends (and all watchers stop) when the client half-closes the stream,
// Recv fails, or the stream context is cancelled. Watcher goroutines are
// always joined before this method returns because the stream must not be used
// afterwards, and sends are serialized because a gRPC stream does not allow
// concurrent Send calls from several goroutines.
func (s *ProductServer) MonitorStock(stream pb.ProductService_MonitorStockServer) error {
	ctx, cancel := context.WithCancel(stream.Context())

	var (
		watchers sync.WaitGroup
		sendMu   sync.Mutex // serializes stream.Send across watchers
	)
	// Deferred calls run last-in-first-out: cancel the watchers, then wait.
	defer watchers.Wait()
	defer cancel()

	for {
		req, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		watchers.Add(1)
		go func(productID string) {
			defer watchers.Done()

			ticker := time.NewTicker(2 * time.Second)
			defer ticker.Stop()

			for {
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
				}

				// Read the stock under the lock to avoid racing with updates.
				s.mu.RLock()
				product, exists := s.products[productID]
				var stock int32
				if exists {
					stock = product.Stock
				}
				s.mu.RUnlock()
				if !exists {
					return
				}

				sendMu.Lock()
				sendErr := stream.Send(&pb.StockUpdateResponse{
					ProductId:    productID,
					CurrentStock: stock,
					Timestamp:    time.Now().Unix(),
				})
				sendMu.Unlock()
				if sendErr != nil {
					// The stream is unusable; stop every watcher.
					cancel()
					return
				}
			}
		}(req.ProductId)
	}
}
// main listens on TCP port 50051 and serves ProductService over gRPC until
// Serve fails.
func main() {
	lis, listenErr := net.Listen("tcp", ":50051")
	if listenErr != nil {
		log.Fatalf("Failed to listen: %v", listenErr)
	}

	srv := grpc.NewServer()
	pb.RegisterProductServiceServer(srv, NewProductServer())
	log.Println("gRPC server listening on :50051")

	serveErr := srv.Serve(lis)
	if serveErr != nil {
		log.Fatalf("Failed to serve: %v", serveErr)
	}
}
gRPC Client调用
package main
import (
"context"
"io"
"log"
"time"
pb "github.com/yourapp/product/pb"
"google.golang.org/grpc"
)
// ProductClient wraps a gRPC connection and the generated ProductService stub.
type ProductClient struct {
	client pb.ProductServiceClient
	conn   *grpc.ClientConn
}

// NewProductClient dials address without transport security and returns a
// client bound to that connection.
//
// NOTE(review): grpc.WithInsecure is deprecated in current grpc-go releases;
// consider the credentials/insecure package when upgrading.
func NewProductClient(address string) (*ProductClient, error) {
	conn, dialErr := grpc.Dial(address, grpc.WithInsecure())
	if dialErr != nil {
		return nil, dialErr
	}

	pc := &ProductClient{conn: conn}
	pc.client = pb.NewProductServiceClient(conn)
	return pc, nil
}

// Close closes the underlying connection; the close error is discarded.
func (c *ProductClient) Close() {
	c.conn.Close()
}
// CreateProduct calls the CreateProduct RPC with a 5-second deadline and
// returns the product from the response.
func (c *ProductClient) CreateProduct(name, description string, price float64, stock int32) (*pb.Product, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	req := &pb.CreateProductRequest{
		Name:        name,
		Description: description,
		Price:       price,
		Stock:       stock,
	}
	resp, rpcErr := c.client.CreateProduct(ctx, req)
	if rpcErr != nil {
		return nil, rpcErr
	}
	return resp.Product, nil
}
// GetProduct calls the GetProduct RPC with a 5-second deadline and returns the
// product from the response.
func (c *ProductClient) GetProduct(id string) (*pb.Product, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	req := &pb.GetProductRequest{Id: id}
	resp, rpcErr := c.client.GetProduct(ctx, req)
	if rpcErr != nil {
		return nil, rpcErr
	}
	return resp.Product, nil
}
// ListProducts opens the server-streaming ListProducts RPC with a 10-second
// deadline and collects every product until the server ends the stream. Any
// receive error other than io.EOF discards the partial result.
func (c *ProductClient) ListProducts() ([]*pb.Product, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	stream, openErr := c.client.ListProducts(ctx, &pb.ListProductsRequest{})
	if openErr != nil {
		return nil, openErr
	}

	var collected []*pb.Product
	for {
		resp, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			return collected, nil
		case recvErr != nil:
			return nil, recvErr
		}
		collected = append(collected, resp.Product)
	}
}
// CheckStockBatch sends one CheckStockRequest per item over the
// client-streaming RPC (10-second deadline), then closes the send side and
// returns the server's verdict: whether everything is available and the ids
// that are not.
func (c *ProductClient) CheckStockBatch(items []struct {
	ProductID string
	Quantity  int32
}) (bool, []string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	stream, openErr := c.client.CheckStockBatch(ctx)
	if openErr != nil {
		return false, nil, openErr
	}

	// Send every check before asking for the result.
	for i := range items {
		check := &pb.CheckStockRequest{
			ProductId: items[i].ProductID,
			Quantity:  items[i].Quantity,
		}
		if sendErr := stream.Send(check); sendErr != nil {
			return false, nil, sendErr
		}
	}

	// Half-close and wait for the single response.
	result, recvErr := stream.CloseAndRecv()
	if recvErr != nil {
		return false, nil, recvErr
	}
	return result.AllAvailable, result.UnavailableProducts, nil
}
// main walks through the client API against localhost:50051: create, get,
// list (streaming) and a batch stock check, aborting on the first error.
func main() {
	client, err := NewProductClient("localhost:50051")
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer client.Close()

	// 1. Create a product.
	product, err := client.CreateProduct("iPhone 15", "最新款iPhone", 5999.00, 100)
	if err != nil {
		log.Fatalf("CreateProduct failed: %v", err)
	}
	log.Printf("Created: %+v", product)

	// 2. Fetch it by id.
	fetched, err := client.GetProduct(product.Id)
	if err != nil {
		log.Fatalf("GetProduct failed: %v", err)
	}
	log.Printf("Fetched: %+v", fetched)

	// 3. List products over the server stream.
	products, err := client.ListProducts()
	if err != nil {
		log.Fatalf("ListProducts failed: %v", err)
	}
	log.Printf("Total products: %d", len(products))

	// 4. Batch stock check; the second entry asks for more than was stocked.
	type stockCheck = struct {
		ProductID string
		Quantity  int32
	}
	checks := []stockCheck{
		{ProductID: product.Id, Quantity: 10},
		{ProductID: product.Id, Quantity: 200},
	}
	available, unavailable, err := client.CheckStockBatch(checks)
	if err != nil {
		log.Fatalf("CheckStockBatch failed: %v", err)
	}
	log.Printf("All available: %v, Unavailable: %v", available, unavailable)
}
REST vs gRPC对比
| 维度 | REST | gRPC |
|---|---|---|
| 协议 | HTTP/1.1 | HTTP/2 |
| 数据格式 | JSON/XML | Protocol Buffers |
| 性能 | 中等 | 高(3-10倍) |
| 可读性 | 高(文本) | 低(二进制) |
| 浏览器支持 | 原生支持 | 需要grpc-web |
| 流式传输 | 不原生支持(需SSE/WebSocket等) | 原生支持(服务端流、客户端流、双向流) |
| 类型安全 | 弱(JSON) | 强(protobuf) |
| 工具支持 | 成熟(Postman) | 较少 |
| 学习曲线 | 低 | 中等 |
选择建议:
REST适用:
外部API(浏览器/移动端)
公共API
简单CRUD
快速开发
gRPC适用:
内部服务间通信
高性能要求
实时流式传输
强类型约束
异步通信
消息队列
核心概念
Producer-Consumer模式:
Producer → Message Queue → Consumer
特点:
解耦:生产者和消费者独立
异步:生产者不等待消费者
削峰:缓冲高峰流量
可靠性:消息持久化
发布-订阅模式:
Publisher → Topic → Subscriber 1
→ Subscriber 2
→ Subscriber 3
特点:
一对多
广播消息
解耦订阅者
RabbitMQ示例
Producer(生产者):
package main
import (
"encoding/json"
"log"
"github.com/streadway/amqp"
)
// OrderCreatedEvent is the JSON payload published when an order is created.
type OrderCreatedEvent struct {
	OrderID string  `json:"order_id"`
	UserID  string  `json:"user_id"`
	Amount  float64 `json:"amount"` // NOTE(review): currency/unit is not specified here
}
// RabbitMQProducer publishes events over one AMQP connection and channel.
type RabbitMQProducer struct {
	conn    *amqp.Connection
	channel *amqp.Channel
}

// NewRabbitMQProducer dials url and opens a channel on the new connection.
// When the channel cannot be opened the connection is closed again so it is
// not leaked.
func NewRabbitMQProducer(url string) (*RabbitMQProducer, error) {
	conn, err := amqp.Dial(url)
	if err != nil {
		return nil, err
	}

	ch, err := conn.Channel()
	if err != nil {
		// Best effort: the channel error is the one worth reporting.
		_ = conn.Close()
		return nil, err
	}

	return &RabbitMQProducer{
		conn:    conn,
		channel: ch,
	}, nil
}

// Close closes the channel and then the connection; close errors are
// discarded.
func (p *RabbitMQProducer) Close() {
	p.channel.Close()
	p.conn.Close()
}
// PublishOrderCreated declares the durable topic exchange "orders" and
// publishes event to it as persistent JSON under the routing key
// "order.created". The exchange is (re)declared on every call.
func (p *RabbitMQProducer) PublishOrderCreated(event OrderCreatedEvent) error {
	const (
		exchange   = "orders"
		routingKey = "order.created"
	)

	// 1. Make sure the exchange exists: durable, not auto-deleted, not
	// internal, and wait for the broker's confirmation (noWait=false).
	if declErr := p.channel.ExchangeDeclare(exchange, "topic", true, false, false, false, nil); declErr != nil {
		return declErr
	}

	// 2. Serialize the event.
	payload, encErr := json.Marshal(event)
	if encErr != nil {
		return encErr
	}

	// 3. Publish (mandatory=false, immediate=false) as a persistent message.
	message := amqp.Publishing{
		ContentType:  "application/json",
		Body:         payload,
		DeliveryMode: amqp.Persistent,
	}
	if pubErr := p.channel.Publish(exchange, routingKey, false, false, message); pubErr != nil {
		return pubErr
	}

	log.Printf("Published order created: %s", event.OrderID)
	return nil
}
// main connects to the local broker with the default guest account and
// publishes one sample order-created event.
func main() {
	producer, connErr := NewRabbitMQProducer("amqp://guest:guest@localhost:5672/")
	if connErr != nil {
		log.Fatal(connErr)
	}
	defer producer.Close()

	// Publish a sample order-created event.
	sample := OrderCreatedEvent{OrderID: "ORD-001", UserID: "USER-001", Amount: 199.99}
	pubErr := producer.PublishOrderCreated(sample)
	if pubErr != nil {
		log.Fatal(pubErr)
	}
}
Consumer(消费者):
package main
import (
"encoding/json"
"log"
"github.com/streadway/amqp"
)
// RabbitMQConsumer consumes events over one AMQP connection and channel.
type RabbitMQConsumer struct {
	conn    *amqp.Connection
	channel *amqp.Channel
}

// NewRabbitMQConsumer dials url and opens a channel on the new connection.
// When the channel cannot be opened the connection is closed again so it is
// not leaked.
func NewRabbitMQConsumer(url string) (*RabbitMQConsumer, error) {
	conn, err := amqp.Dial(url)
	if err != nil {
		return nil, err
	}

	ch, err := conn.Channel()
	if err != nil {
		// Best effort: the channel error is the one worth reporting.
		_ = conn.Close()
		return nil, err
	}

	return &RabbitMQConsumer{
		conn:    conn,
		channel: ch,
	}, nil
}

// Close closes the channel and then the connection; close errors are
// discarded.
func (c *RabbitMQConsumer) Close() {
	c.channel.Close()
	c.conn.Close()
}
// SubscribeOrderCreated declares the "orders" topic exchange and the durable
// queue "inventory-service-order-created", binds the queue to the routing key
// "order.created", and starts a goroutine that feeds each delivery to handler.
//
// Deliveries are acknowledged manually: undecodable messages are rejected
// without requeue, handler failures are requeued, successes are acked. The
// method returns once consumption has started; the goroutine runs until the
// delivery channel is closed.
//
// NOTE(review): a message whose handler always fails is requeued indefinitely.
func (c *RabbitMQConsumer) SubscribeOrderCreated(handler func(OrderCreatedEvent) error) error {
	const (
		exchange   = "orders"
		routingKey = "order.created"
	)

	// 1. Exchange: durable, not auto-deleted, not internal, noWait=false.
	if declErr := c.channel.ExchangeDeclare(exchange, "topic", true, false, false, false, nil); declErr != nil {
		return declErr
	}

	// 2. Queue: durable, not auto-deleted, not exclusive, noWait=false.
	queue, queueErr := c.channel.QueueDeclare("inventory-service-order-created", true, false, false, false, nil)
	if queueErr != nil {
		return queueErr
	}

	// 3. Bind the queue to the exchange.
	if bindErr := c.channel.QueueBind(queue.Name, routingKey, exchange, false, nil); bindErr != nil {
		return bindErr
	}

	// 4. Start consuming: empty consumer tag, manual ack, not exclusive.
	deliveries, consumeErr := c.channel.Consume(queue.Name, "", false, false, false, false, nil)
	if consumeErr != nil {
		return consumeErr
	}

	log.Println("Waiting for messages...")

	// 5. Decode and dispatch each delivery.
	process := func(msg amqp.Delivery) {
		var event OrderCreatedEvent
		if decodeErr := json.Unmarshal(msg.Body, &event); decodeErr != nil {
			log.Printf("Failed to unmarshal: %v", decodeErr)
			msg.Nack(false, false) // reject without requeue
			return
		}

		if handleErr := handler(event); handleErr != nil {
			log.Printf("Handler failed: %v", handleErr)
			msg.Nack(false, true) // requeue for another attempt
			return
		}

		msg.Ack(false)
		log.Printf("Processed order: %s", event.OrderID)
	}
	go func() {
		for msg := range deliveries {
			process(msg)
		}
	}()

	return nil
}
// main connects to the local broker, subscribes a logging handler to
// order-created events, and then blocks forever.
func main() {
	consumer, connErr := NewRabbitMQConsumer("amqp://guest:guest@localhost:5672/")
	if connErr != nil {
		log.Fatal(connErr)
	}
	defer consumer.Close()

	// Business logic placeholder: deduct stock for the order.
	deductStock := func(event OrderCreatedEvent) error {
		log.Printf("Deducting stock for order: %s", event.OrderID)
		return nil
	}
	if subErr := consumer.SubscribeOrderCreated(deductStock); subErr != nil {
		log.Fatal(subErr)
	}

	// Block the main goroutine so the consumer keeps running.
	select {}
}
Kafka示例
Producer:
package main
import (
"encoding/json"
"log"
"github.com/Shopify/sarama"
)
// KafkaProducer publishes events through a synchronous sarama producer.
type KafkaProducer struct {
	producer sarama.SyncProducer
}

// NewKafkaProducer connects to brokers with a producer that waits for all
// in-sync replicas to acknowledge each message and retries up to 5 times.
func NewKafkaProducer(brokers []string) (*KafkaProducer, error) {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Retry.Max = 5

	syncProducer, newErr := sarama.NewSyncProducer(brokers, cfg)
	if newErr != nil {
		return nil, newErr
	}
	return &KafkaProducer{producer: syncProducer}, nil
}

// Close closes the underlying producer; the close error is discarded.
func (p *KafkaProducer) Close() {
	p.producer.Close()
}
// PublishOrderCreated sends event as JSON to the "order-created" topic, keyed
// by the order ID, and logs the partition and offset it was written to.
func (p *KafkaProducer) PublishOrderCreated(event OrderCreatedEvent) error {
	// 1. Serialize.
	payload, encErr := json.Marshal(event)
	if encErr != nil {
		return encErr
	}

	// 2. Build the message; the order ID is used as the message key.
	record := new(sarama.ProducerMessage)
	record.Topic = "order-created"
	record.Key = sarama.StringEncoder(event.OrderID)
	record.Value = sarama.ByteEncoder(payload)

	// 3. Send and wait for the broker's answer.
	partition, offset, sendErr := p.producer.SendMessage(record)
	if sendErr != nil {
		return sendErr
	}

	log.Printf("Message sent to partition %d at offset %d", partition, offset)
	return nil
}
// main connects to the local Kafka broker and publishes one sample
// order-created event.
func main() {
	producer, connErr := NewKafkaProducer([]string{"localhost:9092"})
	if connErr != nil {
		log.Fatal(connErr)
	}
	defer producer.Close()

	sample := OrderCreatedEvent{OrderID: "ORD-001", UserID: "USER-001", Amount: 199.99}
	pubErr := producer.PublishOrderCreated(sample)
	if pubErr != nil {
		log.Fatal(pubErr)
	}
}
Consumer:
package main
import (
"context"
"encoding/json"
"log"
"github.com/Shopify/sarama"
)
// KafkaConsumer consumes events as a member of a sarama consumer group.
type KafkaConsumer struct {
	consumer sarama.ConsumerGroup
}

// NewKafkaConsumer joins the consumer group named group on brokers. Errors are
// delivered on the group's Errors channel, and a group without committed
// offsets starts from the oldest available message.
func NewKafkaConsumer(brokers []string, group string) (*KafkaConsumer, error) {
	cfg := sarama.NewConfig()
	cfg.Consumer.Return.Errors = true
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest

	cg, newErr := sarama.NewConsumerGroup(brokers, group, cfg)
	if newErr != nil {
		return nil, newErr
	}
	return &KafkaConsumer{consumer: cg}, nil
}

// Close leaves the consumer group; the close error is discarded.
func (c *KafkaConsumer) Close() {
	c.consumer.Close()
}
// ConsumerGroupHandler adapts a plain event callback to
// sarama.ConsumerGroupHandler.
type ConsumerGroupHandler struct {
	handler func(OrderCreatedEvent) error
}

// Setup is a no-op hook run at the start of a session.
func (h ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }

// Cleanup is a no-op hook run at the end of a session.
func (h ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim decodes each message of the claim, passes it to the handler,
// and marks it as consumed when the handler succeeds. It returns when the
// claim's message channel is closed or the session context is cancelled (for
// example during a rebalance), so the session can shut down promptly.
func (h ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case <-session.Context().Done():
			return nil

		case message, ok := <-claim.Messages():
			if !ok {
				return nil
			}

			var event OrderCreatedEvent
			if err := json.Unmarshal(message.Value, &event); err != nil {
				log.Printf("Failed to unmarshal: %v", err)
				continue
			}

			if err := h.handler(event); err != nil {
				log.Printf("Handler failed: %v", err)
				// The message is left unmarked, but that alone does NOT
				// trigger a redelivery: marking any later message on this
				// partition moves the committed offset past it. Add an
				// explicit retry or dead-letter step if failed events must
				// not be lost.
				continue
			}

			session.MarkMessage(message, "") // mark as consumed
			log.Printf("Processed order: %s", event.OrderID)
		}
	}
}
// SubscribeOrderCreated consumes the "order-created" topic with handler until
// Consume reports an error, which is returned. Consume is called in a loop
// because it returns whenever the group rebalances.
//
// The consumer is configured with Consumer.Return.Errors enabled, so
// asynchronous errors are only observable on the Errors channel; they are
// logged here instead of being dropped unread. The logging goroutine ends when
// the consumer group is closed.
func (c *KafkaConsumer) SubscribeOrderCreated(handler func(OrderCreatedEvent) error) error {
	topics := []string{"order-created"}
	h := ConsumerGroupHandler{handler: handler}
	ctx := context.Background()

	go func() {
		for err := range c.consumer.Errors() {
			log.Printf("Consumer group error: %v", err)
		}
	}()

	for {
		if err := c.consumer.Consume(ctx, topics, h); err != nil {
			return err
		}
	}
}
// main joins the "inventory-service" consumer group on the local broker and
// processes order-created events with a logging handler until consumption
// fails.
func main() {
	consumer, connErr := NewKafkaConsumer([]string{"localhost:9092"}, "inventory-service")
	if connErr != nil {
		log.Fatal(connErr)
	}
	defer consumer.Close()

	// Business logic placeholder: deduct stock for the order.
	deductStock := func(event OrderCreatedEvent) error {
		log.Printf("Deducting stock for order: %s", event.OrderID)
		return nil
	}
	if subErr := consumer.SubscribeOrderCreated(deductStock); subErr != nil {
		log.Fatal(subErr)
	}
}
协议选择
决策树
需要低延迟、高性能?
├─ 是 → gRPC
└─ 否 → 需要浏览器直接调用?
├─ 是 → REST
└─ 否 → 需要解耦、异步?
├─ 是 → 消息队列
└─ 否 → REST/gRPC
场景选择
| 场景 | 推荐协议 | 原因 |
|---|---|---|
| 外部API | REST | 浏览器友好、工具成熟 |
| 移动App API | REST/gRPC | REST简单,gRPC省流量 |
| 内部服务调用 | gRPC | 高性能、强类型 |
| 实时通信 | gRPC Stream/WebSocket | 双向流式 |
| 事件通知 | 消息队列 | 解耦、可靠 |
| 批量处理 | 消息队列 | 削峰填谷 |
| 日志收集 | Kafka | 高吞吐 |
服务网格
什么是Service Mesh
定义:专门处理服务间通信的基础设施层
核心功能:
服务发现
负载均衡
熔断降级
限流
超时重试
链路追踪
流量管理
安全(mTLS)
架构
┌────────────────────────────────────────┐
│ Control Plane │
│ (Istio Pilot, Mixer, Citadel) │
└────────────────────────────────────────┘
↓ 配置
┌──────────────────────────────────────────┐
│ Data Plane (Sidecar Proxy) │
│ │
│ ┌──────┐ ←→ Proxy ←→ Proxy ←→ ┌──────┐│
│ │Service│ Envoy │Service││
│ │ A │ │ B ││
│ └──────┘ └──────┘│
└──────────────────────────────────────────┘
所有服务间通信都通过Sidecar Proxy
应用无感知,无需修改代码
Istio示例
部署服务到Istio:
# productservice.yaml
# Service exposing the product pods on port 8080. The port is named "http" so
# that Istio can detect the protocol.
apiVersion: v1
kind: Service
metadata:
  name: product-service
spec:
  ports:
    - port: 8080
      name: http
  selector:
    app: product-service
---
# Deployment running three replicas of product-service v1.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: product-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: product-service
  template:
    metadata:
      labels:
        app: product-service
        # Required by the DestinationRule subsets, which select pods by their
        # "version" label; without it the v1 subset matches no pod.
        version: v1
    spec:
      containers:
        - name: product-service
          image: product-service:v1
          ports:
            - containerPort: 8080
流量管理(VirtualService):
# Canary release: 90% of traffic goes to v1 and 10% to v2.
# Requests carrying the header "user: beta-tester" always go to v2.
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: product-service
spec:
  hosts:
    - product-service
  http:
    # Rule 1: beta testers are pinned to v2.
    - match:
        - headers:
            user:
              exact: beta-tester
      route:
        - destination:
            host: product-service
            subset: v2
    # Rule 2: everyone else is split by weight.
    - route:
        - destination:
            host: product-service
            subset: v1
          weight: 90
        - destination:
            host: product-service
            subset: v2
          weight: 10
---
# Defines the v1/v2 subsets referenced by the VirtualService; each subset
# selects pods by their "version" label.
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: product-service
spec:
  host: product-service
  subsets:
    - name: v1
      labels:
        version: v1
    - name: v2
      labels:
        version: v2
超时和重试:
# Timeout and retry policy for calls to product-service.
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: product-service
spec:
  hosts:
    - product-service
  http:
    - route:
        - destination:
            host: product-service
      # Overall deadline for the request, retries included.
      timeout: 3s
      retries:
        # Up to 3 retries, each limited to 1s, for the listed failure classes.
        attempts: 3
        perTryTimeout: 1s
        retryOn: 5xx,connect-failure,refused-stream
熔断:
# Circuit breaking: connection-pool limits plus outlier detection.
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: product-service
spec:
  host: product-service
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 50
        http2MaxRequests: 100
        maxRequestsPerConnection: 2
    outlierDetection:
      # consecutiveErrors is deprecated; consecutive5xxErrors is its
      # replacement for ejecting a host after N consecutive 5xx responses.
      consecutive5xxErrors: 5
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
      minHealthPercent: 50
实战案例
订单-库存-支付的通信设计
架构:
┌──────────────┐
│ API GW │
└──────────────┘
↓ REST
┌──────────────┐
│Order Service │
└──────────────┘
↓ gRPC ↓ 消息队列
┌──────────────┐ ↓
│Product │ ↓
│Service │ ↓
└──────────────┘ ↓
↓
┌──────────────┐
│Event Bus │
│(Kafka) │
└──────────────┘
↓
┌─────────┴─────────┐
↓ ↓
┌──────────────┐ ┌──────────────┐
│Inventory │ │Payment │
│Service │ │Service │
└──────────────┘ └──────────────┘
通信选择:
API Gateway → Order Service:REST
- 原因:外部接口,浏览器友好
Order Service → Product Service:gRPC
- 原因:内部同步调用,需要低延迟
Order Service → Inventory/Payment:消息队列
- 原因:异步处理,解耦,保证最终一致性
代码实现:
// OrderService creates orders: it validates items against the product service
// over gRPC and announces new orders on Kafka.
//
// NOTE(review): CreateOrder also uses svc.orderRepo, which is not declared in
// this struct as shown; the field appears to have been elided from the example.
type OrderService struct {
	productClient pb.ProductServiceClient // gRPC client for the product service
	eventBus      *KafkaProducer          // Kafka producer for order events
}
// CreateOrder validates every item against the product service, persists a
// pending order, and publishes an OrderCreatedEvent.
//
// It returns an error when a product lookup fails (the underlying RPC error is
// wrapped so that timeouts are distinguishable from missing products), when an
// item's price differs from the catalog price, or when the order cannot be
// saved. A failure to publish the event is only logged: the order has already
// been saved at that point.
//
// NOTE(review): because the publish is best-effort, a saved order can exist
// without its event ever reaching Kafka.
func (svc *OrderService) CreateOrder(ctx context.Context, req CreateOrderRequest) (*Order, error) {
	// Prices are floats; treat differences below half a cent as equal instead
	// of requiring bit-for-bit equality.
	const priceTolerance = 0.005

	// 1. Synchronously check each item with the product service (gRPC).
	for _, item := range req.Items {
		resp, err := svc.productClient.GetProduct(ctx, &pb.GetProductRequest{
			Id: item.ProductID,
		})
		if err != nil {
			return nil, fmt.Errorf("get product %s: %w", item.ProductID, err)
		}

		diff := resp.Product.Price - item.Price
		if diff < 0 {
			diff = -diff
		}
		if diff > priceTolerance {
			return nil, fmt.Errorf("price mismatch")
		}
	}

	// 2. Build the order.
	order := &Order{
		ID:              generateID(),
		UserID:          req.UserID,
		Items:           req.Items,
		Status:          OrderStatusPending,
		ShippingAddress: req.ShippingAddress,
		CreatedAt:       time.Now(),
	}

	// 3. Persist it.
	if err := svc.orderRepo.Save(order); err != nil {
		return nil, err
	}

	// 4. Publish the order-created event (Kafka).
	event := OrderCreatedEvent{
		OrderID: order.ID,
		UserID:  order.UserID,
		Items:   order.Items,
		Amount:  calculateTotal(order.Items),
	}
	if err := svc.eventBus.PublishOrderCreated(event); err != nil {
		// Logged only; the order itself was created successfully.
		log.Printf("Failed to publish event: %v", err)
	}

	return order, nil
}
// HandleOrderCreated is the inventory service's reaction to an order-created
// event: it deducts stock for every item and then publishes a
// StockDeductedEvent. On the first failed deduction it publishes a
// StockDeductionFailedEvent (always with the reason "insufficient stock") and
// returns the error.
//
// NOTE(review): items deducted before the failing one are not restored here,
// and the results of the Publish* calls are not checked.
func (svc *InventoryService) HandleOrderCreated(event OrderCreatedEvent) error {
	// 1. Deduct stock item by item.
	for _, item := range event.Items {
		deductErr := svc.inventoryRepo.DeductStock(item.ProductID, item.Quantity)
		if deductErr == nil {
			continue
		}

		failure := StockDeductionFailedEvent{
			OrderID:   event.OrderID,
			ProductID: item.ProductID,
			Reason:    "insufficient stock",
		}
		svc.eventBus.PublishStockDeductionFailed(failure)
		return deductErr
	}

	// 2. Announce the successful deduction.
	success := StockDeductedEvent{
		OrderID: event.OrderID,
		Items:   event.Items,
	}
	svc.eventBus.PublishStockDeducted(success)

	log.Printf("Stock deducted for order: %s", event.OrderID)
	return nil
}
// HandleStockDeducted is the payment service's reaction to a stock-deducted
// event: it saves a pending payment, calls the payment gateway, and publishes
// either a PaymentFailedEvent (returning the gateway error) or a
// PaymentSuccessEvent.
//
// NOTE(review): the results of the Publish* calls are not checked.
func (svc *PaymentService) HandleStockDeducted(event StockDeductedEvent) error {
	// 1. Create and persist the payment record.
	payment := new(Payment)
	payment.OrderID = event.OrderID
	payment.Amount = calculateAmount(event.Items)
	payment.Status = PaymentStatusPending
	payment.CreatedAt = time.Now()

	if saveErr := svc.paymentRepo.Save(payment); saveErr != nil {
		return saveErr
	}

	// 2. Charge through the third-party gateway.
	gatewayErr := svc.callPaymentGateway(payment)
	if gatewayErr != nil {
		failure := PaymentFailedEvent{
			OrderID: event.OrderID,
			Reason:  gatewayErr.Error(),
		}
		svc.eventBus.PublishPaymentFailed(failure)
		return gatewayErr
	}

	// 3. Announce the successful payment.
	success := PaymentSuccessEvent{
		OrderID: event.OrderID,
		Amount:  payment.Amount,
	}
	svc.eventBus.PublishPaymentSuccess(success)

	log.Printf("Payment completed for order: %s", event.OrderID)
	return nil
}
面试问答
同步通信和异步通信如何选择?
答案:
同步通信适用:
需要立即响应(查询、验证)
强依赖(必须等待结果)
实时性要求高
示例:
- 查询商品详情
- 验证用户权限
- 检查库存
异步通信适用:
非实时(可延迟处理)
解耦(生产者不关心消费者)
削峰填谷(缓冲高峰流量)
事件通知(一对多)
示例:
- 发送通知邮件
- 生成订单报表
- 更新搜索索引
- 事件溯源
实践建议:
// CreateOrder mixes both styles: a synchronous stock check that must finish
// before the order exists, followed by an asynchronous notification.
//
// A failed stock check is returned as-is so that an unreachable inventory
// service is not misreported as "insufficient stock".
func (svc *OrderService) CreateOrder(req CreateOrderRequest) (*Order, error) {
	// 1. Synchronous: check stock (the result is required to continue).
	available, err := svc.inventoryClient.CheckStock(req.Items)
	if err != nil {
		return nil, err
	}
	if !available {
		return nil, errors.New("insufficient stock")
	}

	// 2. Create the order.
	order := createOrder(req)

	// 3. Asynchronous: publish the event (e.g. for the confirmation e-mail)
	// without waiting for its consumers.
	svc.eventBus.Publish(OrderCreatedEvent{OrderID: order.ID})

	return order, nil
}
REST和gRPC有什么区别?如何选择?
答案:
核心区别:
| 维度 | REST | gRPC |
|---|---|---|
| 协议 | HTTP/1.1 | HTTP/2 |
| 数据格式 | JSON(文本) | Protobuf(二进制) |
| 性能 | 中等 | 高(3-10倍) |
| 可读性 | 高 | 低 |
| 浏览器支持 | 原生 | 需grpc-web |
| 流式传输 | 不原生支持 | 原生支持 |
性能对比:
同样请求:
REST:
- Payload: 1KB (JSON)
- 延迟: 10ms
gRPC:
- Payload: 200B(Protobuf二进制编码,体积约为JSON的1/5)
- 延迟: 2ms(HTTP/2多路复用)
性能提升:约3-10倍(以上数值仅为示意,实际取决于负载与网络环境)
选择建议:
REST:
外部API(浏览器/移动端)
公共API(开放给第三方)
简单CRUD
快速开发
gRPC:
内部服务间通信
高性能要求
流式传输(实时数据)
多语言(protobuf跨语言)
如何保证消息队列的可靠性?
答案:
1. 消息持久化
// RabbitMQ persistence: publish the message as persistent...
err := ch.Publish(
	"orders",
	"order.created",
	false,
	false,
	amqp.Publishing{
		DeliveryMode: amqp.Persistent, // persistent message
		Body:         body,
	},
)

// ...and declare the queue as durable as well.
queue, err := ch.QueueDeclare(
	"order-queue",
	true, // durable: the queue definition is persisted
	false,
	false,
	false,
	nil,
)
2. 消费确认(ACK)
// Manual acknowledgement: consume with autoAck disabled and confirm each
// delivery explicitly.
msgs, err := ch.Consume(
	queue.Name,
	"",
	false, // autoAck: false (acknowledge manually)
	false,
	false,
	false,
	nil,
)

for msg := range msgs {
	if err := handleMessage(msg); err != nil {
		msg.Nack(false, true) // processing failed: requeue
	} else {
		msg.Ack(false) // processing succeeded: acknowledge
	}
}
3. 消息幂等
// Idempotent consumption: every event carries a unique ID that is recorded
// once the event has been processed.
type OrderCreatedEvent struct {
	EventID string // unique event ID
	OrderID string
}

// HandleOrderCreated processes event unless its EventID is already recorded in
// the event store, and records the EventID after successful processing.
//
// NOTE(review): the check and the later Save are separate steps, so two
// concurrent deliveries of the same event could both pass the check; the
// result of Save is also not inspected. Confirm the event store's guarantees.
func (h *Handler) HandleOrderCreated(event OrderCreatedEvent) error {
	// Skip events that were already handled.
	if h.eventStore.Exists(event.EventID) {
		log.Printf("Event already processed: %s", event.EventID)
		return nil // duplicate: nothing to do
	}

	// Run the business logic.
	if err := h.processOrder(event.OrderID); err != nil {
		return err
	}

	// Remember that this event has been handled.
	h.eventStore.Save(event.EventID)
	return nil
}
4. 死信队列
# RabbitMQ死信队列配置
args := amqp.Table{
"x-dead-letter-exchange": "dlx",
"x-dead-letter-routing-key": "order.failed",
"x-message-ttl": 300000, // 5分钟
}
queue, err := ch.QueueDeclare(
"order-queue",
true,
false,
false,
false,
args,
)
什么是Service Mesh?解决了什么问题?
答案:
定义:
Service Mesh是专门处理服务间通信的基础设施层
通过Sidecar Proxy(边车代理)拦截所有网络流量
架构:
传统微服务:
Service A → 直接调用 → Service B
- 每个服务自己实现:重试、超时、熔断、限流
- 重复代码,维护困难
Service Mesh:
Service A → Sidecar Proxy → Sidecar Proxy → Service B
↑ ↑
└─────── Control Plane ────────┘
- 通信逻辑下沉到Proxy
- 应用无感知,零侵入
解决的问题:
- 服务发现
无需硬编码服务地址
Sidecar自动发现服务实例
- 负载均衡
智能路由:轮询、权重、一致性哈希
自动剔除故障实例
- 熔断限流
# Istio configuration; no application code changes are needed.
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: product-service
spec:
  host: product-service
  trafficPolicy:
    outlierDetection:
      # consecutiveErrors is deprecated; consecutive5xxErrors replaces it.
      consecutive5xxErrors: 5
      baseEjectionTime: 30s
- 安全
自动mTLS加密
无需修改应用代码
- 可观测性
自动收集:
- 指标(Prometheus)
- 日志(Fluentd)
- 链路追踪(Jaeger)
主流方案:
Istio:功能最全,复杂
Linkerd:轻量级,易用
Consul Connect:Consul生态
微服务通信如何做超时和重试?
答案:
1. 超时控制
// Bound the call with a context deadline of 3 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

resp, err := client.GetProduct(ctx, &pb.GetProductRequest{Id: productID})
if err != nil {
	// Tell a deadline expiry apart from other failures before returning.
	if ctx.Err() == context.DeadlineExceeded {
		log.Println("Request timeout")
	}
	return err
}
2. 重试策略
// retryWithBackoff calls fn until it succeeds, returns a non-retryable error,
// or has been attempted maxRetries times, sleeping between attempts with
// exponential backoff (1s, 2s, 4s, ... capped at 30s).
//
// fn is always attempted at least once, even when maxRetries is zero or
// negative. A non-retryable error is returned unchanged. When all attempts
// fail, the returned error wraps the last error from fn, so callers can still
// inspect it with errors.Is / errors.As. No sleep happens after the final
// attempt.
func retryWithBackoff(fn func() error, maxRetries int) error {
	const maxBackoff = 30 * time.Second

	if maxRetries < 1 {
		maxRetries = 1
	}

	backoff := time.Second
	var lastErr error
	for i := 0; i < maxRetries; i++ {
		lastErr = fn()
		if lastErr == nil {
			return nil
		}

		// Some errors can never succeed on retry.
		if isNonRetryableError(lastErr) {
			return lastErr
		}

		// Waiting after the last attempt would only delay the failure.
		if i == maxRetries-1 {
			break
		}

		log.Printf("Retry %d/%d after %v", i+1, maxRetries, backoff)
		time.Sleep(backoff)

		// Exponential backoff: 1s, 2s, 4s, 8s...
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}

	return fmt.Errorf("max retries exceeded: %w", lastErr)
}
// Usage: allow up to 3 attempts of the service call.
err := retryWithBackoff(func() error {
	return client.CallService()
}, 3)
3. 重试最佳实践
// RetryConfig describes a retry policy with multiplicative backoff.
type RetryConfig struct {
	MaxRetries      int           // maximum number of attempts; values below 1 mean a single attempt
	InitialBackoff  time.Duration // wait before the second attempt
	MaxBackoff      time.Duration // upper bound applied to every computed wait after the first
	Multiplier      float64       // factor applied to the wait after each failed attempt
	RetryableErrors []error       // errors (matched with errors.Is) that justify another attempt
}

// Execute calls fn until it succeeds, fails with an error that is not
// retryable, or has been attempted MaxRetries times. It returns nil on success
// and otherwise the last error returned by fn.
//
// fn is always attempted at least once, so a zero-value or misconfigured
// RetryConfig still runs the operation instead of failing without trying.
// No wait happens after the final attempt.
func (c *RetryConfig) Execute(fn func() error) error {
	attempts := c.MaxRetries
	if attempts < 1 {
		attempts = 1
	}

	backoff := c.InitialBackoff
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}

		// Give up on errors outside the retryable set, and on the last attempt.
		if !c.isRetryable(err) || i == attempts-1 {
			return err
		}

		time.Sleep(backoff)

		// Grow the wait for the next round, bounded by MaxBackoff.
		backoff = time.Duration(float64(backoff) * c.Multiplier)
		if backoff > c.MaxBackoff {
			backoff = c.MaxBackoff
		}
	}

	return err
}

// isRetryable reports whether err matches any entry of RetryableErrors.
func (c *RetryConfig) isRetryable(err error) bool {
	for _, retryableErr := range c.RetryableErrors {
		if errors.Is(err, retryableErr) {
			return true
		}
	}
	return false
}
4. Istio自动重试
# Retries configured at the mesh level; no application code changes are needed.
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: product-service
spec:
  hosts:
    - product-service
  http:
    - route:
        - destination:
            host: product-service
      # Overall deadline for the request, retries included.
      timeout: 3s
      retries:
        # Up to 3 retries, each limited to 1s, for the listed failure classes.
        attempts: 3
        perTryTimeout: 1s
        retryOn: 5xx,reset,connect-failure
注意事项:
⚠️ 幂等性:重试必须保证操作幂等
⚠️ 超时传递:下游超时要小于上游
⚠️ 重试风暴:限制同时重试数量
⚠️ 熔断配合:重试失败多次触发熔断