HiHuo
首页
博客
手册
工具
首页
博客
手册
工具
  • Go 架构进阶

    • Go 架构进阶学习手册 - 总目录
    • 01-GMP调度模型深度解析
    • 02-Channel源码剖析
    • 03-内存模型与GC机制
    • 04-垃圾回收器全链路
    • 05-并发模型与锁机制
    • 06-网络模型与Netpoll
    • 07-Runtime全景融合
    • 08-性能优化实战
    • 09-微服务架构实践

09-微服务架构实践

章节概述

微服务架构是现代分布式系统的主流设计模式,Go 语言凭借其高并发、低延迟的特性,成为微服务开发的首选语言。本章将深入解析微服务架构的设计原理,结合实际项目展示如何构建高性能的微服务框架。

学习目标

  • 理解微服务架构的设计原则
  • 掌握服务注册与发现机制
  • 了解熔断限流重试策略
  • 学会链路追踪和监控
  • 能够设计完整的微服务框架

️ 微服务架构设计

整体架构

┌─────────────────────────────────────────────────────────┐
│                   微服务架构                            │
├─────────────────────────────────────────────────────────┤
│  API 网关     服务注册中心   配置中心    监控中心        │
│  ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌────────┐ │
│  │ 路由转发  │  │ 服务发现  │  │ 配置管理  │  │ 监控   │ │
│  │ 负载均衡  │  │ 健康检查  │  │ 动态更新  │  │ 告警   │ │
│  └───────────┘  └───────────┘  └───────────┘  └────────┘ │
│                                                         │
│  业务服务 A   业务服务 B   业务服务 C   业务服务 D      │
│  ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌────────┐ │
│  │ 用户服务  │  │ 订单服务  │  │ 支付服务  │  │ 通知   │ │
│  │ 独立部署  │  │ 独立部署  │  │ 独立部署  │  │ 服务   │ │
│  └───────────┘  └───────────┘  └───────────┘  └────────┘ │
└─────────────────────────────────────────────────────────┘

核心组件

┌─────────────────────────────────────────────────────────┐
│                   核心组件                              │
├─────────────────────────────────────────────────────────┤
│  服务治理    通信机制    数据管理    监控运维            │
│  ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌────────┐ │
│  │ 注册发现  │  │ gRPC/HTTP │  │ 数据库    │  │ 监控   │ │
│  │ 负载均衡  │  │ 消息队列  │  │ 缓存      │  │ 日志   │ │
│  │ 熔断限流  │  │ 事件总线  │  │ 存储      │  │ 追踪   │ │
│  └───────────┘  └───────────┘  └───────────┘  └────────┘ │
└─────────────────────────────────────────────────────────┘

服务注册与发现

服务注册接口

package registry

import (
    "context"
    "time"
)

// 服务实例
type ServiceInstance struct {
    ID       string            `json:"id"`
    Name     string            `json:"name"`
    Address  string            `json:"address"`
    Port     int               `json:"port"`
    Metadata map[string]string `json:"metadata"`
    Tags     []string          `json:"tags"`
}

// 服务注册接口
type Registry interface {
    // 注册服务
    Register(ctx context.Context, instance *ServiceInstance) error
    
    // 注销服务
    Deregister(ctx context.Context, instanceID string) error
    
    // 发现服务
    Discover(ctx context.Context, serviceName string) ([]*ServiceInstance, error)
    
    // 监听服务变化
    Watch(ctx context.Context, serviceName string) (<-chan []*ServiceInstance, error)
    
    // 关闭注册中心
    Close() error
}

Consul 实现

package consul

import (
    "context"
    "fmt"
    "time"
    
    "github.com/hashicorp/consul/api"
    "github.com/hashicorp/consul/api/watch"
)

type ConsulRegistry struct {
    client *api.Client
    config *api.Config
}

func NewConsulRegistry(address string) (*ConsulRegistry, error) {
    config := api.DefaultConfig()
    config.Address = address
    
    client, err := api.NewClient(config)
    if err != nil {
        return nil, err
    }
    
    return &ConsulRegistry{
        client: client,
        config: config,
    }, nil
}

func (r *ConsulRegistry) Register(ctx context.Context, instance *ServiceInstance) error {
    registration := &api.AgentServiceRegistration{
        ID:      instance.ID,
        Name:    instance.Name,
        Address: instance.Address,
        Port:    instance.Port,
        Tags:    instance.Tags,
        Meta:    instance.Metadata,
        Check: &api.AgentServiceCheck{
            HTTP:                           fmt.Sprintf("http://%s:%d/health", instance.Address, instance.Port),
            Timeout:                        "3s",
            Interval:                       "10s",
            DeregisterCriticalServiceAfter: "30s",
        },
    }
    
    return r.client.Agent().ServiceRegister(registration)
}

func (r *ConsulRegistry) Deregister(ctx context.Context, instanceID string) error {
    return r.client.Agent().ServiceDeregister(instanceID)
}

func (r *ConsulRegistry) Discover(ctx context.Context, serviceName string) ([]*ServiceInstance, error) {
    services, _, err := r.client.Health().Service(serviceName, "", true, nil)
    if err != nil {
        return nil, err
    }
    
    instances := make([]*ServiceInstance, 0, len(services))
    for _, service := range services {
        instance := &ServiceInstance{
            ID:       service.Service.ID,
            Name:     service.Service.Service,
            Address:  service.Service.Address,
            Port:     service.Service.Port,
            Metadata: service.Service.Meta,
            Tags:     service.Service.Tags,
        }
        instances = append(instances, instance)
    }
    
    return instances, nil
}

func (r *ConsulRegistry) Watch(ctx context.Context, serviceName string) (<-chan []*ServiceInstance, error) {
    ch := make(chan []*ServiceInstance, 1)
    
    go func() {
        defer close(ch)
        
        for {
            select {
            case <-ctx.Done():
                return
            default:
                instances, err := r.Discover(ctx, serviceName)
                if err != nil {
                    continue
                }
                
                select {
                case ch <- instances:
                case <-ctx.Done():
                    return
                }
                
                time.Sleep(5 * time.Second)
            }
        }
    }()
    
    return ch, nil
}

func (r *ConsulRegistry) Close() error {
    return nil
}

负载均衡

负载均衡接口

package loadbalancer

import (
    "context"
    "math/rand"
    "sync"
    "time"
)

// 负载均衡策略
type Strategy int

const (
    RoundRobin Strategy = iota
    Random
    WeightedRoundRobin
    LeastConnections
)

// 负载均衡器
type LoadBalancer struct {
    strategy Strategy
    instances []*ServiceInstance
    current  int
    mutex    sync.RWMutex
}

func NewLoadBalancer(strategy Strategy) *LoadBalancer {
    return &LoadBalancer{
        strategy: strategy,
        instances: make([]*ServiceInstance, 0),
    }
}

func (lb *LoadBalancer) UpdateInstances(instances []*ServiceInstance) {
    lb.mutex.Lock()
    defer lb.mutex.Unlock()
    lb.instances = instances
}

func (lb *LoadBalancer) Select(ctx context.Context) (*ServiceInstance, error) {
    lb.mutex.RLock()
    defer lb.mutex.RUnlock()
    
    if len(lb.instances) == 0 {
        return nil, fmt.Errorf("no available instances")
    }
    
    switch lb.strategy {
    case RoundRobin:
        return lb.roundRobin()
    case Random:
        return lb.random()
    case WeightedRoundRobin:
        return lb.weightedRoundRobin()
    case LeastConnections:
        return lb.leastConnections()
    default:
        return lb.roundRobin()
    }
}

func (lb *LoadBalancer) roundRobin() (*ServiceInstance, error) {
    instance := lb.instances[lb.current]
    lb.current = (lb.current + 1) % len(lb.instances)
    return instance, nil
}

func (lb *LoadBalancer) random() (*ServiceInstance, error) {
    index := rand.Intn(len(lb.instances))
    return lb.instances[index], nil
}

func (lb *LoadBalancer) weightedRoundRobin() (*ServiceInstance, error) {
    // 简化的加权轮询实现
    totalWeight := 0
    for _, instance := range lb.instances {
        if weight, ok := instance.Metadata["weight"]; ok {
            totalWeight += parseWeight(weight)
        } else {
            totalWeight += 1
        }
    }
    
    if totalWeight == 0 {
        return lb.roundRobin()
    }
    
    current := lb.current
    lb.current = (lb.current + 1) % len(lb.instances)
    
    return lb.instances[current], nil
}

func (lb *LoadBalancer) leastConnections() (*ServiceInstance, error) {
    // 简化的最少连接实现
    minConnections := int64(^uint64(0) >> 1)
    var selected *ServiceInstance
    
    for _, instance := range lb.instances {
        if connections, ok := instance.Metadata["connections"]; ok {
            if connCount := parseConnections(connections); connCount < minConnections {
                minConnections = connCount
                selected = instance
            }
        } else {
            selected = instance
            break
        }
    }
    
    if selected == nil {
        return lb.roundRobin()
    }
    
    return selected, nil
}

func parseWeight(weight string) int {
    // 解析权重
    return 1
}

func parseConnections(connections string) int64 {
    // 解析连接数
    return 0
}

️ 熔断限流

熔断器实现

package circuitbreaker

import (
    "context"
    "sync"
    "time"
)

// 熔断器状态
type State int

const (
    Closed State = iota
    Open
    HalfOpen
)

// 熔断器配置
type Config struct {
    MaxRequests   uint32        // 半开状态最大请求数
    Interval      time.Duration // 统计时间窗口
    Timeout       time.Duration // 熔断超时时间
    ReadyToTrip   func(counts Counts) bool
    OnStateChange func(name string, from State, to State)
}

// 请求统计
type Counts struct {
    Requests             uint32
    TotalRequests        uint32
    ConsecutiveFailures  uint32
    ConsecutiveSuccesses uint32
}

// 熔断器
type CircuitBreaker struct {
    name          string
    maxRequests   uint32
    interval      time.Duration
    timeout       time.Duration
    readyToTrip   func(counts Counts) bool
    onStateChange func(name string, from State, to State)
    
    mutex      sync.Mutex
    state      State
    generation uint64
    counts     Counts
    expiry     time.Time
}

func NewCircuitBreaker(name string, config Config) *CircuitBreaker {
    cb := &CircuitBreaker{
        name:          name,
        maxRequests:   config.MaxRequests,
        interval:      config.Interval,
        timeout:       config.Timeout,
        readyToTrip:   config.ReadyToTrip,
        onStateChange: config.OnStateChange,
        state:         Closed,
    }
    
    if cb.readyToTrip == nil {
        cb.readyToTrip = defaultReadyToTrip
    }
    
    return cb
}

func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
    generation, err := cb.beforeRequest()
    if err != nil {
        return nil, err
    }
    
    defer func() {
        e := recover()
        if e != nil {
            cb.afterRequest(generation, false)
            panic(e)
        }
    }()
    
    result, err := req()
    cb.afterRequest(generation, err == nil)
    return result, err
}

func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
    cb.mutex.Lock()
    defer cb.mutex.Unlock()
    
    now := time.Now()
    state, generation := cb.currentState(now)
    
    if state == Open {
        return generation, fmt.Errorf("circuit breaker is open")
    }
    
    if state == HalfOpen && cb.counts.Requests >= cb.maxRequests {
        return generation, fmt.Errorf("circuit breaker is half-open and max requests reached")
    }
    
    cb.counts.onRequest()
    return generation, nil
}

func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
    cb.mutex.Lock()
    defer cb.mutex.Unlock()
    
    now := time.Now()
    state, generation := cb.currentState(now)
    
    if generation != before {
        return
    }
    
    if success {
        cb.onSuccess(state, now)
    } else {
        cb.onFailure(state, now)
    }
}

func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) {
    if cb.state == Open {
        if cb.expiry.Before(now) {
            cb.setState(HalfOpen, now)
            return HalfOpen, cb.generation
        }
        return Open, cb.generation
    }
    
    if cb.state == HalfOpen {
        return HalfOpen, cb.generation
    }
    
    return Closed, cb.generation
}

func (cb *CircuitBreaker) setState(state State, now time.Time) {
    if cb.state == state {
        return
    }
    
    prev := cb.state
    cb.state = state
    cb.generation++
    
    if cb.onStateChange != nil {
        cb.onStateChange(cb.name, prev, state)
    }
}

func (cb *CircuitBreaker) onSuccess(state State, now time.Time) {
    cb.counts.onSuccess()
    
    if state == HalfOpen {
        cb.setState(Closed, now)
    }
}

func (cb *CircuitBreaker) onFailure(state State, now time.Time) {
    cb.counts.onFailure()
    
    if state == Closed && cb.readyToTrip(cb.counts) {
        cb.setState(Open, now)
    }
}

func (cb *CircuitBreaker) onRequest() {
    cb.counts.Requests++
    cb.counts.TotalRequests++
}

func (cb *CircuitBreaker) onSuccess() {
    cb.counts.ConsecutiveSuccesses++
    cb.counts.ConsecutiveFailures = 0
}

func (cb *CircuitBreaker) onFailure() {
    cb.counts.ConsecutiveFailures++
    cb.counts.ConsecutiveSuccesses = 0
}

func defaultReadyToTrip(counts Counts) bool {
    return counts.ConsecutiveFailures >= 5
}

限流器实现

package ratelimit

import (
    "context"
    "sync"
    "time"
)

// 限流器接口
type RateLimiter interface {
    Allow() bool
    Wait(ctx context.Context) error
}

// 令牌桶限流器
type TokenBucket struct {
    capacity     int64
    tokens       int64
    refillRate   int64
    lastRefill   time.Time
    mutex        sync.Mutex
}

func NewTokenBucket(capacity, refillRate int64) *TokenBucket {
    return &TokenBucket{
        capacity:   capacity,
        tokens:     capacity,
        refillRate: refillRate,
        lastRefill: time.Now(),
    }
}

func (tb *TokenBucket) Allow() bool {
    tb.mutex.Lock()
    defer tb.mutex.Unlock()
    
    now := time.Now()
    elapsed := now.Sub(tb.lastRefill)
    tb.lastRefill = now
    
    // 添加令牌
    tokensToAdd := int64(elapsed.Seconds()) * tb.refillRate
    tb.tokens += tokensToAdd
    
    if tb.tokens > tb.capacity {
        tb.tokens = tb.capacity
    }
    
    if tb.tokens > 0 {
        tb.tokens--
        return true
    }
    
    return false
}

func (tb *TokenBucket) Wait(ctx context.Context) error {
    for {
        if tb.Allow() {
            return nil
        }
        
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(time.Millisecond):
            continue
        }
    }
}

// 滑动窗口限流器
type SlidingWindow struct {
    windowSize time.Duration
    limit      int64
    requests   []time.Time
    mutex      sync.Mutex
}

func NewSlidingWindow(windowSize time.Duration, limit int64) *SlidingWindow {
    return &SlidingWindow{
        windowSize: windowSize,
        limit:      limit,
        requests:   make([]time.Time, 0),
    }
}

func (sw *SlidingWindow) Allow() bool {
    sw.mutex.Lock()
    defer sw.mutex.Unlock()
    
    now := time.Now()
    cutoff := now.Add(-sw.windowSize)
    
    // 清理过期请求
    for len(sw.requests) > 0 && sw.requests[0].Before(cutoff) {
        sw.requests = sw.requests[1:]
    }
    
    if len(sw.requests) < int(sw.limit) {
        sw.requests = append(sw.requests, now)
        return true
    }
    
    return false
}

func (sw *SlidingWindow) Wait(ctx context.Context) error {
    for {
        if sw.Allow() {
            return nil
        }
        
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(time.Millisecond):
            continue
        }
    }
}

链路追踪

链路追踪实现

package tracing

import (
    "context"
    "fmt"
    "time"
)

// 链路追踪上下文
type TraceContext struct {
    TraceID string
    SpanID  string
    ParentID string
    Baggage  map[string]string
}

// 链路追踪器
type Tracer struct {
    serviceName string
    endpoint    string
}

func NewTracer(serviceName, endpoint string) *Tracer {
    return &Tracer{
        serviceName: serviceName,
        endpoint:    endpoint,
    }
}

func (t *Tracer) StartSpan(ctx context.Context, operationName string) (context.Context, *Span) {
    span := &Span{
        TraceID:      t.generateTraceID(),
        SpanID:       t.generateSpanID(),
        ParentID:     t.getParentID(ctx),
        OperationName: operationName,
        StartTime:    time.Now(),
        Tags:         make(map[string]string),
        Logs:         make([]Log, 0),
    }
    
    return context.WithValue(ctx, "span", span), span
}

func (t *Tracer) FinishSpan(span *Span) {
    span.EndTime = time.Now()
    span.Duration = span.EndTime.Sub(span.StartTime)
    
    // 发送到追踪系统
    t.sendSpan(span)
}

func (t *Tracer) generateTraceID() string {
    // 生成 TraceID
    return fmt.Sprintf("%d", time.Now().UnixNano())
}

func (t *Tracer) generateSpanID() string {
    // 生成 SpanID
    return fmt.Sprintf("%d", time.Now().UnixNano())
}

func (t *Tracer) getParentID(ctx context.Context) string {
    if span, ok := ctx.Value("span").(*Span); ok {
        return span.SpanID
    }
    return ""
}

func (t *Tracer) sendSpan(span *Span) {
    // 发送到追踪系统
    fmt.Printf("Sending span: %+v\n", span)
}

// 链路追踪 Span
type Span struct {
    TraceID      string
    SpanID       string
    ParentID     string
    OperationName string
    StartTime    time.Time
    EndTime      time.Time
    Duration     time.Duration
    Tags         map[string]string
    Logs         []Log
}

type Log struct {
    Timestamp time.Time
    Fields    map[string]interface{}
}

func (s *Span) SetTag(key, value string) {
    s.Tags[key] = value
}

func (s *Span) Log(fields map[string]interface{}) {
    s.Logs = append(s.Logs, Log{
        Timestamp: time.Now(),
        Fields:    fields,
    })
}

️ 微服务框架实现

框架核心

package microservice

import (
    "context"
    "fmt"
    "net/http"
    "time"
)

// 微服务框架
type Microservice struct {
    name        string
    version     string
    registry    Registry
    lb          LoadBalancer
    breaker     *CircuitBreaker
    limiter     RateLimiter
    tracer      *Tracer
    server      *http.Server
    handlers    map[string]http.HandlerFunc
}

func NewMicroservice(name, version string) *Microservice {
    return &Microservice{
        name:     name,
        version:  version,
        handlers: make(map[string]http.HandlerFunc),
    }
}

func (ms *Microservice) SetRegistry(registry Registry) {
    ms.registry = registry
}

func (ms *Microservice) SetLoadBalancer(lb LoadBalancer) {
    ms.lb = lb
}

func (ms *Microservice) SetCircuitBreaker(breaker *CircuitBreaker) {
    ms.breaker = breaker
}

func (ms *Microservice) SetRateLimiter(limiter RateLimiter) {
    ms.limiter = limiter
}

func (ms *Microservice) SetTracer(tracer *Tracer) {
    ms.tracer = tracer
}

func (ms *Microservice) RegisterHandler(path string, handler http.HandlerFunc) {
    ms.handlers[path] = handler
}

func (ms *Microservice) Start(port int) error {
    mux := http.NewServeMux()
    
    // 注册健康检查
    mux.HandleFunc("/health", ms.healthHandler)
    
    // 注册业务处理器
    for path, handler := range ms.handlers {
        mux.HandleFunc(path, ms.wrapHandler(handler))
    }
    
    ms.server = &http.Server{
        Addr:    fmt.Sprintf(":%d", port),
        Handler: mux,
    }
    
    // 注册到服务发现
    if ms.registry != nil {
        instance := &ServiceInstance{
            ID:      fmt.Sprintf("%s-%d", ms.name, time.Now().Unix()),
            Name:    ms.name,
            Address: "localhost",
            Port:    port,
            Metadata: map[string]string{
                "version": ms.version,
            },
        }
        
        if err := ms.registry.Register(context.Background(), instance); err != nil {
            return err
        }
    }
    
    return ms.server.ListenAndServe()
}

func (ms *Microservice) Stop() error {
    if ms.server != nil {
        return ms.server.Shutdown(context.Background())
    }
    return nil
}

func (ms *Microservice) healthHandler(w http.ResponseWriter, r *http.Request) {
    w.WriteHeader(http.StatusOK)
    w.Write([]byte("OK"))
}

func (ms *Microservice) wrapHandler(handler http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // 限流
        if ms.limiter != nil {
            if !ms.limiter.Allow() {
                http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
                return
            }
        }
        
        // 链路追踪
        var span *Span
        if ms.tracer != nil {
            ctx, s := ms.tracer.StartSpan(r.Context(), r.URL.Path)
            span = s
            r = r.WithContext(ctx)
        }
        
        // 熔断
        if ms.breaker != nil {
            result, err := ms.breaker.Execute(func() (interface{}, error) {
                handler(w, r)
                return nil, nil
            })
            
            if err != nil {
                http.Error(w, "Circuit breaker open", http.StatusServiceUnavailable)
                return
            }
            
            _ = result
        } else {
            handler(w, r)
        }
        
        // 完成链路追踪
        if span != nil {
            ms.tracer.FinishSpan(span)
        }
    }
}

服务调用

package microservice

import (
    "context"
    "fmt"
    "net/http"
    "time"
)

// 服务调用器
type ServiceCaller struct {
    registry Registry
    lb       LoadBalancer
    breaker  *CircuitBreaker
    limiter  RateLimiter
    tracer   *Tracer
    client   *http.Client
}

func NewServiceCaller(registry Registry, lb LoadBalancer) *ServiceCaller {
    return &ServiceCaller{
        registry: registry,
        lb:       lb,
        client: &http.Client{
            Timeout: 30 * time.Second,
        },
    }
}

func (sc *ServiceCaller) SetCircuitBreaker(breaker *CircuitBreaker) {
    sc.breaker = breaker
}

func (sc *ServiceCaller) SetRateLimiter(limiter RateLimiter) {
    sc.limiter = limiter
}

func (sc *ServiceCaller) SetTracer(tracer *Tracer) {
    sc.tracer = tracer
}

func (sc *ServiceCaller) Call(ctx context.Context, serviceName, path string) (*http.Response, error) {
    // 限流
    if sc.limiter != nil {
        if !sc.limiter.Allow() {
            return nil, fmt.Errorf("rate limit exceeded")
        }
    }
    
    // 链路追踪
    var span *Span
    if sc.tracer != nil {
        ctx, s := sc.tracer.StartSpan(ctx, fmt.Sprintf("call_%s", serviceName))
        span = s
    }
    
    // 熔断
    if sc.breaker != nil {
        result, err := sc.breaker.Execute(func() (interface{}, error) {
            return sc.doCall(ctx, serviceName, path)
        })
        
        if err != nil {
            return nil, err
        }
        
        return result.(*http.Response), nil
    }
    
    return sc.doCall(ctx, serviceName, path)
}

func (sc *ServiceCaller) doCall(ctx context.Context, serviceName, path string) (*http.Response, error) {
    // 服务发现
    instances, err := sc.registry.Discover(ctx, serviceName)
    if err != nil {
        return nil, err
    }
    
    if len(instances) == 0 {
        return nil, fmt.Errorf("no available instances for service %s", serviceName)
    }
    
    // 负载均衡
    instance, err := sc.lb.Select(ctx)
    if err != nil {
        return nil, err
    }
    
    // 构建请求 URL
    url := fmt.Sprintf("http://%s:%d%s", instance.Address, instance.Port, path)
    
    // 发送请求
    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
    if err != nil {
        return nil, err
    }
    
    resp, err := sc.client.Do(req)
    if err != nil {
        return nil, err
    }
    
    return resp, nil
}

️ 实战项目

用户服务实现

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"
    
    "github.com/hashicorp/consul/api"
)

type User struct {
    ID    int    `json:"id"`
    Name  string `json:"name"`
    Email string `json:"email"`
}

type UserService struct {
    users map[int]*User
}

func NewUserService() *UserService {
    return &UserService{
        users: make(map[int]*User),
    }
}

func (us *UserService) GetUser(w http.ResponseWriter, r *http.Request) {
    id := r.URL.Query().Get("id")
    if id == "" {
        http.Error(w, "Missing user ID", http.StatusBadRequest)
        return
    }
    
    // 模拟数据库查询
    time.Sleep(100 * time.Millisecond)
    
    user := &User{
        ID:    1,
        Name:  "John Doe",
        Email: "john@example.com",
    }
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(user)
}

func (us *UserService) CreateUser(w http.ResponseWriter, r *http.Request) {
    var user User
    if err := json.NewDecoder(r.Body).Decode(&user); err != nil {
        http.Error(w, "Invalid JSON", http.StatusBadRequest)
        return
    }
    
    // 模拟数据库插入
    time.Sleep(200 * time.Millisecond)
    
    user.ID = len(us.users) + 1
    us.users[user.ID] = &user
    
    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusCreated)
    json.NewEncoder(w).Encode(user)
}

func main() {
    // 创建用户服务
    userService := NewUserService()
    
    // 创建微服务
    ms := NewMicroservice("user-service", "1.0.0")
    
    // 注册处理器
    ms.RegisterHandler("/users", userService.GetUser)
    ms.RegisterHandler("/users/create", userService.CreateUser)
    
    // 设置服务发现
    registry, err := NewConsulRegistry("localhost:8500")
    if err != nil {
        log.Fatal(err)
    }
    ms.SetRegistry(registry)
    
    // 设置负载均衡
    lb := NewLoadBalancer(RoundRobin)
    ms.SetLoadBalancer(lb)
    
    // 设置熔断器
    breaker := NewCircuitBreaker("user-service", Config{
        MaxRequests: 10,
        Interval:    10 * time.Second,
        Timeout:     30 * time.Second,
    })
    ms.SetCircuitBreaker(breaker)
    
    // 设置限流器
    limiter := NewTokenBucket(100, 10)
    ms.SetRateLimiter(limiter)
    
    // 设置链路追踪
    tracer := NewTracer("user-service", "http://localhost:14268/api/traces")
    ms.SetTracer(tracer)
    
    // 启动服务
    log.Println("Starting user service on :8080")
    if err := ms.Start(8080); err != nil {
        log.Fatal(err)
    }
}

订单服务实现

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"
)

type Order struct {
    ID     int    `json:"id"`
    UserID int    `json:"user_id"`
    Items  []Item `json:"items"`
    Total  float64 `json:"total"`
}

type Item struct {
    ProductID int     `json:"product_id"`
    Quantity  int     `json:"quantity"`
    Price     float64 `json:"price"`
}

type OrderService struct {
    orders map[int]*Order
    caller *ServiceCaller
}

func NewOrderService(caller *ServiceCaller) *OrderService {
    return &OrderService{
        orders: make(map[int]*Order),
        caller: caller,
    }
}

func (os *OrderService) GetOrder(w http.ResponseWriter, r *http.Request) {
    id := r.URL.Query().Get("id")
    if id == "" {
        http.Error(w, "Missing order ID", http.StatusBadRequest)
        return
    }
    
    // 模拟数据库查询
    time.Sleep(100 * time.Millisecond)
    
    order := &Order{
        ID:     1,
        UserID: 1,
        Items: []Item{
            {ProductID: 1, Quantity: 2, Price: 10.0},
            {ProductID: 2, Quantity: 1, Price: 20.0},
        },
        Total: 40.0,
    }
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(order)
}

func (os *OrderService) CreateOrder(w http.ResponseWriter, r *http.Request) {
    var order Order
    if err := json.NewDecoder(r.Body).Decode(&order); err != nil {
        http.Error(w, "Invalid JSON", http.StatusBadRequest)
        return
    }
    
    // 调用用户服务验证用户
    resp, err := os.caller.Call(r.Context(), "user-service", "/users?id="+fmt.Sprintf("%d", order.UserID))
    if err != nil {
        http.Error(w, "User validation failed", http.StatusBadRequest)
        return
    }
    defer resp.Body.Close()
    
    if resp.StatusCode != http.StatusOK {
        http.Error(w, "Invalid user", http.StatusBadRequest)
        return
    }
    
    // 模拟数据库插入
    time.Sleep(200 * time.Millisecond)
    
    order.ID = len(os.orders) + 1
    os.orders[order.ID] = &order
    
    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusCreated)
    json.NewEncoder(w).Encode(order)
}

func main() {
    // 创建服务调用器
    registry, err := NewConsulRegistry("localhost:8500")
    if err != nil {
        log.Fatal(err)
    }
    
    lb := NewLoadBalancer(RoundRobin)
    caller := NewServiceCaller(registry, lb)
    
    // 设置熔断器
    breaker := NewCircuitBreaker("order-service", Config{
        MaxRequests: 10,
        Interval:    10 * time.Second,
        Timeout:     30 * time.Second,
    })
    caller.SetCircuitBreaker(breaker)
    
    // 设置限流器
    limiter := NewTokenBucket(100, 10)
    caller.SetRateLimiter(limiter)
    
    // 设置链路追踪
    tracer := NewTracer("order-service", "http://localhost:14268/api/traces")
    caller.SetTracer(tracer)
    
    // 创建订单服务
    orderService := NewOrderService(caller)
    
    // 创建微服务
    ms := NewMicroservice("order-service", "1.0.0")
    
    // 注册处理器
    ms.RegisterHandler("/orders", orderService.GetOrder)
    ms.RegisterHandler("/orders/create", orderService.CreateOrder)
    
    // 设置服务发现
    ms.SetRegistry(registry)
    ms.SetLoadBalancer(lb)
    ms.SetCircuitBreaker(breaker)
    ms.SetRateLimiter(limiter)
    ms.SetTracer(tracer)
    
    // 启动服务
    log.Println("Starting order service on :8081")
    if err := ms.Start(8081); err != nil {
        log.Fatal(err)
    }
}

性能测试

压力测试

package main

import (
    "fmt"
    "net/http"
    "sync"
    "time"
)

func stressTest(url string, concurrency, requests int) {
    var wg sync.WaitGroup
    start := time.Now()
    
    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            
            for j := 0; j < requests/concurrency; j++ {
                resp, err := http.Get(url)
                if err != nil {
                    fmt.Printf("Error: %v\n", err)
                    continue
                }
                resp.Body.Close()
            }
        }()
    }
    
    wg.Wait()
    duration := time.Since(start)
    
    fmt.Printf("Concurrency: %d, Requests: %d, Duration: %v, QPS: %.2f\n",
        concurrency, requests, duration, float64(requests)/duration.Seconds())
}

func main() {
    // 测试用户服务
    stressTest("http://localhost:8080/users?id=1", 100, 10000)
    
    // 测试订单服务
    stressTest("http://localhost:8081/orders?id=1", 100, 10000)
}

面试题库

基础问题

  1. 微服务架构的优势?

    • 独立部署
    • 技术栈灵活
    • 可扩展性强
    • 故障隔离
  2. 服务注册与发现的作用?

    • 服务定位
    • 健康检查
    • 负载均衡
    • 故障转移
  3. 熔断器的工作原理?

    • 监控请求失败率
    • 自动熔断保护
    • 半开状态恢复
    • 防止雪崩效应

进阶问题

  1. 如何设计高可用的微服务?

    • 服务注册发现
    • 负载均衡
    • 熔断限流
    • 链路追踪
    • 监控告警
  2. 微服务的监控指标?

    • 请求量
    • 响应时间
    • 错误率
    • 资源使用率
  3. 微服务的挑战?

    • 分布式复杂性
    • 数据一致性
    • 服务治理
    • 监控调试

源码问题

  1. 服务注册的实现原理?

    • 心跳机制
    • 健康检查
    • 服务发现
    • 故障转移
  2. 负载均衡的策略?

    • 轮询
    • 随机
    • 加权轮询
    • 最少连接

扩展阅读

  • 微服务架构设计模式
  • Go 微服务实践
  • 服务网格
  • 云原生架构

相关章节

  • 01-GMP调度模型深度解析 - 微服务中的并发处理
  • 02-Channel源码剖析 - 微服务间的通信
  • 03-内存模型与GC机制 - 微服务的内存管理
  • 04-垃圾回收器全链路 - 微服务的 GC 优化
  • 05-并发模型与锁机制 - 微服务的并发控制
  • 06-网络模型与Netpoll - 微服务的网络通信
  • 07-Runtime全景融合 - 微服务的运行时优化
  • 08-性能优化实战 - 微服务的性能调优

恭喜! 你已经完成了 Go 架构进阶学习手册的全部内容。通过系统性的学习,你现在应该能够:

  • 深入理解 Go Runtime 的底层机制
  • 掌握 GMP 调度、内存管理、GC 等核心组件
  • 具备性能调优和架构设计能力
  • 能够构建高性能的微服务系统

继续实践和探索,成为 Go 架构专家!

Prev
08-性能优化实战