09-微服务架构实践
章节概述
微服务架构是现代分布式系统的主流设计模式,Go 语言凭借其高并发、低延迟的特性,成为微服务开发的首选语言。本章将深入解析微服务架构的设计原理,结合实际项目展示如何构建高性能的微服务框架。
学习目标
- 理解微服务架构的设计原则
- 掌握服务注册与发现机制
- 了解熔断、限流与重试策略
- 学会链路追踪和监控
- 能够设计完整的微服务框架
微服务架构设计
整体架构
┌─────────────────────────────────────────────────────────┐
│ 微服务架构 │
├─────────────────────────────────────────────────────────┤
│ API 网关 服务注册中心 配置中心 监控中心 │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌────────┐ │
│ │ 路由转发 │ │ 服务发现 │ │ 配置管理 │ │ 监控 │ │
│ │ 负载均衡 │ │ 健康检查 │ │ 动态更新 │ │ 告警 │ │
│ └───────────┘ └───────────┘ └───────────┘ └────────┘ │
│ │
│ 业务服务 A 业务服务 B 业务服务 C 业务服务 D │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌────────┐ │
│ │ 用户服务 │ │ 订单服务 │ │ 支付服务 │ │ 通知 │ │
│ │ 独立部署 │ │ 独立部署 │ │ 独立部署 │ │ 服务 │ │
│ └───────────┘ └───────────┘ └───────────┘ └────────┘ │
└─────────────────────────────────────────────────────────┘
核心组件
┌─────────────────────────────────────────────────────────┐
│ 核心组件 │
├─────────────────────────────────────────────────────────┤
│ 服务治理 通信机制 数据管理 监控运维 │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌────────┐ │
│ │ 注册发现 │ │ gRPC/HTTP │ │ 数据库 │ │ 监控 │ │
│ │ 负载均衡 │ │ 消息队列 │ │ 缓存 │ │ 日志 │ │
│ │ 熔断限流 │ │ 事件总线 │ │ 存储 │ │ 追踪 │ │
│ └───────────┘ └───────────┘ └───────────┘ └────────┘ │
└─────────────────────────────────────────────────────────┘
服务注册与发现
服务注册接口
package registry
import (
"context"
"time"
)
// ServiceInstance describes a single instance of a service as it is handed
// to, and returned by, a registry.
type ServiceInstance struct {
	// ID identifies this particular instance.
	ID string `json:"id"`
	// Name is the service name the instance belongs to.
	Name string `json:"name"`
	// Address is the host or IP the instance is reachable on.
	Address string `json:"address"`
	// Port is the TCP port the instance listens on.
	Port int `json:"port"`
	// Metadata carries free-form key/value attributes.
	Metadata map[string]string `json:"metadata"`
	// Tags carries free-form labels.
	Tags []string `json:"tags"`
}
// Registry is the contract a service-registry backend has to fulfil.
type Registry interface {
	// Register publishes instance to the registry.
	Register(ctx context.Context, instance *ServiceInstance) error

	// Deregister removes the instance with the given ID from the registry.
	Deregister(ctx context.Context, instanceID string) error

	// Discover returns the instances currently known for serviceName.
	Discover(ctx context.Context, serviceName string) ([]*ServiceInstance, error)

	// Watch returns a channel on which instance lists for serviceName are
	// delivered.
	Watch(ctx context.Context, serviceName string) (<-chan []*ServiceInstance, error)

	// Close shuts the registry client down.
	Close() error
}
Consul 实现
package consul
import (
"context"
"fmt"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
)
// ConsulRegistry is a registry implementation that talks to Consul through
// the official Go client.
type ConsulRegistry struct {
	client *api.Client // client used for all agent and health calls
	config *api.Config // configuration the client was created from
}

// NewConsulRegistry builds a ConsulRegistry whose client targets the Consul
// endpoint at address. All other settings come from api.DefaultConfig. The
// error from api.NewClient is returned unchanged.
func NewConsulRegistry(address string) (*ConsulRegistry, error) {
	cfg := api.DefaultConfig()
	cfg.Address = address

	cli, err := api.NewClient(cfg)
	if err != nil {
		return nil, err
	}

	reg := &ConsulRegistry{client: cli, config: cfg}
	return reg, nil
}
// Register publishes instance through the Consul agent and attaches an HTTP
// health check pointed at http://<Address>:<Port>/health (10s interval, 3s
// timeout, deregistration requested after 30s in the critical state).
//
// ctx is accepted for interface compatibility; this implementation does not
// forward it to the Consul client.
func (r *ConsulRegistry) Register(ctx context.Context, instance *ServiceInstance) error {
	healthURL := fmt.Sprintf("http://%s:%d/health", instance.Address, instance.Port)

	check := &api.AgentServiceCheck{
		HTTP:                           healthURL,
		Timeout:                        "3s",
		Interval:                       "10s",
		DeregisterCriticalServiceAfter: "30s",
	}

	reg := &api.AgentServiceRegistration{
		ID:      instance.ID,
		Name:    instance.Name,
		Address: instance.Address,
		Port:    instance.Port,
		Tags:    instance.Tags,
		Meta:    instance.Metadata,
		Check:   check,
	}

	agent := r.client.Agent()
	return agent.ServiceRegister(reg)
}
// Deregister asks the Consul agent to remove the service instance with the
// given ID. ctx is accepted for interface compatibility and is not forwarded
// to the Consul client.
func (r *ConsulRegistry) Deregister(ctx context.Context, instanceID string) error {
	agent := r.client.Agent()
	return agent.ServiceDeregister(instanceID)
}
// Discover returns the instances of serviceName whose health checks are
// currently passing.
//
// ctx is attached to the Consul query so that cancelling it aborts the
// request. When a service was registered without its own address, the address
// of the node it runs on is used instead, so callers never receive an empty
// Address for a reachable instance.
//
// The returned slice is empty (not nil) when no healthy instance exists. Any
// client error is wrapped with the service name.
func (r *ConsulRegistry) Discover(ctx context.Context, serviceName string) ([]*ServiceInstance, error) {
	opts := (&api.QueryOptions{}).WithContext(ctx)

	// passingOnly=true: only instances with passing health checks are returned.
	entries, _, err := r.client.Health().Service(serviceName, "", true, opts)
	if err != nil {
		return nil, fmt.Errorf("discovering service %q: %w", serviceName, err)
	}

	instances := make([]*ServiceInstance, 0, len(entries))
	for _, entry := range entries {
		if entry == nil || entry.Service == nil {
			continue
		}

		addr := entry.Service.Address
		if addr == "" && entry.Node != nil {
			// Services registered without an explicit address are reachable
			// on their node's address.
			addr = entry.Node.Address
		}

		instances = append(instances, &ServiceInstance{
			ID:       entry.Service.ID,
			Name:     entry.Service.Service,
			Address:  addr,
			Port:     entry.Service.Port,
			Metadata: entry.Service.Meta,
			Tags:     entry.Service.Tags,
		})
	}
	return instances, nil
}
// Watch polls Discover for serviceName and delivers each successful result on
// the returned channel. The first poll happens immediately; subsequent polls
// are spaced five seconds apart, including after a failed poll, so an
// unreachable Consul never causes a tight retry loop.
//
// The polling goroutine stops and the channel is closed as soon as ctx is
// cancelled, whether it is waiting for the next poll or blocked delivering a
// result. Failed polls are skipped silently. The returned error is always nil.
func (r *ConsulRegistry) Watch(ctx context.Context, serviceName string) (<-chan []*ServiceInstance, error) {
	const pollInterval = 5 * time.Second

	ch := make(chan []*ServiceInstance, 1)

	go func() {
		defer close(ch)

		// A zero-duration timer makes the first poll fire right away.
		timer := time.NewTimer(0)
		defer timer.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-timer.C:
			}

			// Both cases may have been ready; cancellation takes priority.
			if ctx.Err() != nil {
				return
			}

			if instances, err := r.Discover(ctx, serviceName); err == nil {
				select {
				case ch <- instances:
				case <-ctx.Done():
					return
				}
			}

			// timer.C was drained above, so Reset is safe here.
			timer.Reset(pollInterval)
		}
	}()

	return ch, nil
}
// Close satisfies the registry interface. This implementation holds nothing
// that needs releasing, so it does nothing and always reports success.
func (r *ConsulRegistry) Close() error {
	return nil
}
负载均衡
负载均衡接口
package loadbalancer
import (
	"context"
	"fmt"
	"math/rand"
	"strconv"
	"sync"
	"time"
)
// Strategy selects the algorithm a LoadBalancer uses to pick an instance.
type Strategy int

// Supported load-balancing strategies.
const (
	// RoundRobin hands out instances in list order, wrapping around.
	RoundRobin Strategy = 0
	// Random picks an instance at random.
	Random Strategy = 1
	// WeightedRoundRobin is round robin that consults the "weight" metadata.
	WeightedRoundRobin Strategy = 2
	// LeastConnections consults the "connections" metadata.
	LeastConnections Strategy = 3
)
// LoadBalancer picks one instance out of a replaceable instance list
// according to a fixed Strategy.
type LoadBalancer struct {
	strategy  Strategy           // algorithm used by Select
	instances []*ServiceInstance // current candidate list
	current   int                // rotation cursor used by the round-robin strategies
	mutex     sync.RWMutex       // guards the fields above
}

// NewLoadBalancer returns a LoadBalancer that uses strategy and starts with
// an empty, non-nil instance list.
func NewLoadBalancer(strategy Strategy) *LoadBalancer {
	lb := new(LoadBalancer)
	lb.strategy = strategy
	lb.instances = make([]*ServiceInstance, 0)
	return lb
}
// UpdateInstances replaces the candidate list with instances under the write
// lock. The slice is stored as given, not copied.
func (lb *LoadBalancer) UpdateInstances(instances []*ServiceInstance) {
	lb.mutex.Lock()
	lb.instances = instances
	lb.mutex.Unlock()
}
// Select returns one instance chosen by the balancer's strategy, or an error
// when the instance list is empty. Unknown strategies fall back to round
// robin.
//
// The exclusive lock is taken rather than the read lock because the
// round-robin strategies advance lb.current; doing that under RLock would let
// concurrent callers race on the cursor.
func (lb *LoadBalancer) Select(ctx context.Context) (*ServiceInstance, error) {
	lb.mutex.Lock()
	defer lb.mutex.Unlock()

	if len(lb.instances) == 0 {
		return nil, fmt.Errorf("no available instances")
	}

	switch lb.strategy {
	case Random:
		return lb.random()
	case WeightedRoundRobin:
		return lb.weightedRoundRobin()
	case LeastConnections:
		return lb.leastConnections()
	default: // RoundRobin and any unrecognised value
		return lb.roundRobin()
	}
}
// roundRobin returns the instance under the cursor and advances the cursor,
// wrapping at the end of the list. The caller must hold lb.mutex exclusively.
//
// The cursor is reduced modulo the current list length first: the list can be
// replaced by a shorter one between calls, and using the stale cursor as an
// index would panic.
func (lb *LoadBalancer) roundRobin() (*ServiceInstance, error) {
	n := len(lb.instances)
	if n == 0 {
		return nil, fmt.Errorf("no available instances")
	}

	idx := lb.current % n
	if idx < 0 {
		idx = 0
	}
	lb.current = (idx + 1) % n
	return lb.instances[idx], nil
}
// random returns a uniformly chosen instance using math/rand. The instance
// list must be non-empty (rand.Intn panics on 0).
func (lb *LoadBalancer) random() (*ServiceInstance, error) {
	return lb.instances[rand.Intn(len(lb.instances))], nil
}
// weightedRoundRobin cycles through the instances so that, over one full
// cycle, each instance is returned as many times as its weight. The weight is
// read from the "weight" metadata entry via parseWeight; instances without
// that entry weigh 1 and negative results are treated as 0.
//
// The cursor lb.current walks the range [0, totalWeight); the instance whose
// cumulative weight interval contains the cursor is returned. This is the
// plain (non-smooth) variant: an instance with weight 3 is returned three
// times in a row. When every weight is 0 the method falls back to roundRobin.
//
// The caller must hold lb.mutex exclusively.
func (lb *LoadBalancer) weightedRoundRobin() (*ServiceInstance, error) {
	n := len(lb.instances)
	if n == 0 {
		return nil, fmt.Errorf("no available instances")
	}

	weights := make([]int, n)
	total := 0
	for i, inst := range lb.instances {
		w := 1
		if raw, ok := inst.Metadata["weight"]; ok {
			w = parseWeight(raw)
		}
		if w < 0 {
			w = 0
		}
		weights[i] = w
		total += w
	}

	if total == 0 {
		// Keep the cursor a valid index before delegating.
		lb.current %= n
		if lb.current < 0 {
			lb.current = 0
		}
		return lb.roundRobin()
	}

	pos := lb.current % total
	if pos < 0 {
		pos = 0
	}
	lb.current = (pos + 1) % total

	for i, w := range weights {
		if pos < w {
			return lb.instances[i], nil
		}
		pos -= w
	}

	// Not reachable: pos < total guarantees a hit above.
	return lb.instances[n-1], nil
}
// leastConnections scans the instances in order, using the "connections"
// metadata entry (via parseConnections) as the load figure. The first
// instance that has no such entry is chosen immediately; otherwise the
// instance with the strictly smallest figure seen wins. If nothing was
// chosen, the call is delegated to roundRobin.
func (lb *LoadBalancer) leastConnections() (*ServiceInstance, error) {
	// Largest int64; any real figure below it replaces the initial value.
	const maxInt64 = int64(^uint64(0) >> 1)

	var (
		best   *ServiceInstance
		fewest = maxInt64
	)

	for _, candidate := range lb.instances {
		raw, reported := candidate.Metadata["connections"]
		if !reported {
			best = candidate
			break
		}
		if n := parseConnections(raw); n < fewest {
			fewest, best = n, candidate
		}
	}

	if best != nil {
		return best, nil
	}
	return lb.roundRobin()
}
// parseWeight converts the textual "weight" metadata value to an int.
//
// A valid non-negative decimal integer is returned as is (0 is allowed).
// Anything that does not parse, or a negative number, yields the default
// weight 1.
func parseWeight(weight string) int {
	w, err := strconv.Atoi(weight)
	if err != nil || w < 0 {
		return 1
	}
	return w
}
// parseConnections converts the textual "connections" metadata value to an
// int64 connection count.
//
// A valid non-negative decimal integer is returned as is. Anything that does
// not parse, or a negative number, yields 0.
func parseConnections(connections string) int64 {
	n, err := strconv.ParseInt(connections, 10, 64)
	if err != nil || n < 0 {
		return 0
	}
	return n
}
熔断限流
熔断器实现
package circuitbreaker
import (
"context"
"sync"
"time"
)
// State is the operating mode of a CircuitBreaker.
type State int

// The three states of the breaker.
const (
	// Closed lets every request through while outcomes are counted.
	Closed State = iota
	// Open rejects every request until the cool-down has elapsed.
	Open
	// HalfOpen admits a limited number of trial requests to probe recovery.
	HalfOpen
)

// Config carries the tunables for NewCircuitBreaker.
type Config struct {
	// MaxRequests is the number of requests admitted while half-open.
	// Zero is replaced by 1.
	MaxRequests uint32
	// Interval is how often the counters are cleared while closed.
	// Zero means they are only cleared on a state change.
	Interval time.Duration
	// Timeout is how long the breaker stays open before it becomes
	// half-open. Zero or negative is replaced by 60 seconds.
	Timeout time.Duration
	// ReadyToTrip decides, after a failure in the closed state, whether the
	// breaker should open. Nil selects a default of five consecutive failures.
	ReadyToTrip func(counts Counts) bool
	// OnStateChange, when non-nil, is invoked on every state transition. It
	// runs while the breaker's lock is held and must not call back into the
	// same breaker.
	OnStateChange func(name string, from State, to State)
}

// Counts holds the request statistics of a breaker.
type Counts struct {
	Requests             uint32 // requests admitted in the current generation
	TotalRequests        uint32 // requests admitted since the breaker was created
	ConsecutiveFailures  uint32 // failures since the last success in this generation
	ConsecutiveSuccesses uint32 // successes since the last failure in this generation
}

// onRequest records an admitted request.
func (c *Counts) onRequest() {
	c.Requests++
	c.TotalRequests++
}

// onSuccess records a successful request and ends any failure streak.
func (c *Counts) onSuccess() {
	c.ConsecutiveSuccesses++
	c.ConsecutiveFailures = 0
}

// onFailure records a failed request and ends any success streak.
func (c *Counts) onFailure() {
	c.ConsecutiveFailures++
	c.ConsecutiveSuccesses = 0
}

// reset clears the per-generation counters; TotalRequests is kept.
func (c *Counts) reset() {
	c.Requests = 0
	c.ConsecutiveFailures = 0
	c.ConsecutiveSuccesses = 0
}

// CircuitBreaker guards a call site with a closed/open/half-open state
// machine. All methods are safe for concurrent use.
type CircuitBreaker struct {
	name          string
	maxRequests   uint32
	interval      time.Duration
	timeout       time.Duration
	readyToTrip   func(counts Counts) bool
	onStateChange func(name string, from State, to State)

	mutex      sync.Mutex
	state      State
	generation uint64    // bumped whenever counters are cleared; stale results are ignored
	counts     Counts    // statistics of the current generation
	expiry     time.Time // end of the current interval (closed) or cool-down (open); zero = none
}

// NewCircuitBreaker returns a closed breaker configured from config, applying
// the defaults documented on Config.
func NewCircuitBreaker(name string, config Config) *CircuitBreaker {
	const (
		fallbackMaxRequests = 1
		fallbackTimeout     = 60 * time.Second
	)

	cb := &CircuitBreaker{
		name:          name,
		maxRequests:   config.MaxRequests,
		interval:      config.Interval,
		timeout:       config.Timeout,
		readyToTrip:   config.ReadyToTrip,
		onStateChange: config.OnStateChange,
		state:         Closed,
	}
	if cb.maxRequests == 0 {
		// With 0 the half-open state could never admit a probe and the
		// breaker would stay half-open forever.
		cb.maxRequests = fallbackMaxRequests
	}
	if cb.timeout <= 0 {
		cb.timeout = fallbackTimeout
	}
	if cb.readyToTrip == nil {
		cb.readyToTrip = defaultReadyToTrip
	}
	cb.newGeneration(time.Now())
	return cb
}

// Execute runs req if the breaker admits it and records the outcome.
//
// When the breaker is open, or half-open with its trial quota used up, req is
// not called and an error is returned. Otherwise req's result and error are
// returned unchanged; a non-nil error counts as a failure. A panic in req is
// recorded as a failure and then re-raised.
func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
	generation, err := cb.beforeRequest()
	if err != nil {
		return nil, err
	}

	defer func() {
		if e := recover(); e != nil {
			cb.afterRequest(generation, false)
			panic(e)
		}
	}()

	result, err := req()
	cb.afterRequest(generation, err == nil)
	return result, err
}

// beforeRequest decides whether a request may proceed. On admission the
// request is counted and the current generation is returned so the outcome
// can later be matched against it.
func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	state, generation := cb.currentState(time.Now())

	switch {
	case state == Open:
		return generation, fmt.Errorf("circuit breaker is open")
	case state == HalfOpen && cb.counts.Requests >= cb.maxRequests:
		return generation, fmt.Errorf("circuit breaker is half-open and max requests reached")
	}

	cb.counts.onRequest()
	return generation, nil
}

// afterRequest records the outcome of a request admitted in generation
// before. Outcomes from an earlier generation are dropped because the
// counters they belong to have already been cleared.
func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	now := time.Now()
	state, generation := cb.currentState(now)
	if generation != before {
		return
	}

	if success {
		cb.onSuccess(state, now)
	} else {
		cb.onFailure(state, now)
	}
}

// currentState applies any time-driven change that is due at now (end of the
// counting interval while closed, end of the cool-down while open) and
// returns the resulting state and generation. The caller must hold cb.mutex.
func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) {
	switch cb.state {
	case Closed:
		if !cb.expiry.IsZero() && cb.expiry.Before(now) {
			cb.newGeneration(now)
		}
	case Open:
		if cb.expiry.Before(now) {
			cb.setState(HalfOpen, now)
		}
	}
	return cb.state, cb.generation
}

// setState moves the breaker to state, starts a new generation and notifies
// onStateChange. It is a no-op when the state does not change. The caller
// must hold cb.mutex.
func (cb *CircuitBreaker) setState(state State, now time.Time) {
	if cb.state == state {
		return
	}

	prev := cb.state
	cb.state = state
	cb.newGeneration(now)

	if cb.onStateChange != nil {
		cb.onStateChange(cb.name, prev, state)
	}
}

// newGeneration clears the per-generation counters and computes the expiry
// that belongs to the current state. The caller must hold cb.mutex.
func (cb *CircuitBreaker) newGeneration(now time.Time) {
	cb.generation++
	cb.counts.reset()

	switch cb.state {
	case Closed:
		if cb.interval > 0 {
			cb.expiry = now.Add(cb.interval)
		} else {
			cb.expiry = time.Time{}
		}
	case Open:
		cb.expiry = now.Add(cb.timeout)
	default: // HalfOpen has no deadline
		cb.expiry = time.Time{}
	}
}

// onSuccess records a success; a success while half-open closes the breaker.
// The caller must hold cb.mutex.
func (cb *CircuitBreaker) onSuccess(state State, now time.Time) {
	cb.counts.onSuccess()
	if state == HalfOpen {
		cb.setState(Closed, now)
	}
}

// onFailure records a failure. While closed the breaker opens once
// readyToTrip says so; while half-open a single failed probe reopens it.
// The caller must hold cb.mutex.
func (cb *CircuitBreaker) onFailure(state State, now time.Time) {
	cb.counts.onFailure()

	switch state {
	case Closed:
		if cb.readyToTrip(cb.counts) {
			cb.setState(Open, now)
		}
	case HalfOpen:
		cb.setState(Open, now)
	}
}

// defaultReadyToTrip opens the breaker after five consecutive failures.
func defaultReadyToTrip(counts Counts) bool {
	return counts.ConsecutiveFailures >= 5
}
限流器实现
package ratelimit
import (
"context"
"sync"
"time"
)
// RateLimiter is the behaviour shared by the rate limiters in this package.
type RateLimiter interface {
	// Allow reports whether one request may proceed right now.
	Allow() bool

	// Wait blocks until a request may proceed or ctx ends, in which case it
	// returns an error.
	Wait(ctx context.Context) error
}
// TokenBucket is a token-bucket rate limiter: the bucket holds at most
// capacity tokens, earns refillRate tokens per second, and every admitted
// request consumes one token. It is safe for concurrent use.
type TokenBucket struct {
	capacity   int64      // maximum number of tokens
	tokens     int64      // tokens currently available
	refillRate int64      // tokens earned per second
	lastRefill time.Time  // instant up to which earned tokens have been credited
	mutex      sync.Mutex // guards the fields above
}

// NewTokenBucket returns a bucket that starts full with capacity tokens and
// earns refillRate tokens per second.
func NewTokenBucket(capacity, refillRate int64) *TokenBucket {
	return &TokenBucket{
		capacity:   capacity,
		tokens:     capacity,
		refillRate: refillRate,
		lastRefill: time.Now(),
	}
}

// Allow credits the tokens earned since the last refill and, if at least one
// token is available, consumes it and returns true.
func (tb *TokenBucket) Allow() bool {
	tb.mutex.Lock()
	defer tb.mutex.Unlock()

	tb.refill(time.Now())

	if tb.tokens > 0 {
		tb.tokens--
		return true
	}
	return false
}

// refill credits whole tokens earned between lastRefill and now. The caller
// must hold tb.mutex.
//
// lastRefill is advanced only by the time that the credited tokens account
// for. Time that has not yet added up to a whole token therefore stays
// banked; resetting the clock on every call would mean a bucket polled more
// often than once per token interval never refills.
func (tb *TokenBucket) refill(now time.Time) {
	if tb.tokens >= tb.capacity {
		// A full bucket earns nothing; restart the clock.
		tb.lastRefill = now
		return
	}

	elapsed := now.Sub(tb.lastRefill)
	if elapsed <= 0 || tb.refillRate <= 0 {
		return
	}

	// Split into whole seconds and remainder to keep the products small.
	secs := int64(elapsed / time.Second)
	rem := int64(elapsed % time.Second)
	earned := secs*tb.refillRate + rem*tb.refillRate/int64(time.Second)
	if earned <= 0 {
		return
	}

	if earned >= tb.capacity-tb.tokens {
		// Bucket is full; surplus time is discarded.
		tb.tokens = tb.capacity
		tb.lastRefill = now
		return
	}

	tb.tokens += earned

	// Time needed to earn exactly `earned` tokens, rounded down.
	rate := time.Duration(tb.refillRate)
	credited := time.Duration(earned/tb.refillRate)*time.Second +
		time.Duration(earned%tb.refillRate)*time.Second/rate
	tb.lastRefill = tb.lastRefill.Add(credited)
}

// Wait blocks until Allow succeeds, re-checking once per millisecond, or
// until ctx ends, in which case ctx.Err() is returned. A token that is
// already available is taken even if ctx has ended.
func (tb *TokenBucket) Wait(ctx context.Context) error {
	if tb.Allow() {
		return nil
	}

	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if tb.Allow() {
				return nil
			}
		}
	}
}
// SlidingWindow is a rate limiter that admits at most limit requests within
// any trailing period of length windowSize. It remembers the timestamp of
// every admitted request and is safe for concurrent use.
type SlidingWindow struct {
	windowSize time.Duration // length of the trailing window
	limit      int64         // maximum admitted requests per window
	requests   []time.Time   // timestamps of admitted requests, oldest first
	mutex      sync.Mutex    // guards requests
}

// NewSlidingWindow returns a limiter that admits up to limit requests per
// windowSize.
func NewSlidingWindow(windowSize time.Duration, limit int64) *SlidingWindow {
	sw := new(SlidingWindow)
	sw.windowSize = windowSize
	sw.limit = limit
	sw.requests = make([]time.Time, 0)
	return sw
}

// Allow forgets timestamps older than the window, then admits the request
// (recording its timestamp) if fewer than limit remain.
func (sw *SlidingWindow) Allow() bool {
	sw.mutex.Lock()
	defer sw.mutex.Unlock()

	now := time.Now()
	oldest := now.Add(-sw.windowSize)

	// Timestamps are in order, so expired ones form a prefix.
	expired := 0
	for expired < len(sw.requests) && sw.requests[expired].Before(oldest) {
		expired++
	}
	sw.requests = sw.requests[expired:]

	if len(sw.requests) >= int(sw.limit) {
		return false
	}
	sw.requests = append(sw.requests, now)
	return true
}

// Wait retries Allow about once per millisecond until it succeeds, or returns
// ctx.Err() once ctx ends.
func (sw *SlidingWindow) Wait(ctx context.Context) error {
	for !sw.Allow() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Millisecond):
		}
	}
	return nil
}
链路追踪
链路追踪实现
package tracing
import (
"context"
"fmt"
"time"
)
// TraceContext bundles the identifiers that tie a span to its trace, plus
// free-form baggage.
type TraceContext struct {
	// TraceID, SpanID and ParentID identify the trace, the span itself and
	// the span's parent.
	TraceID, SpanID, ParentID string
	// Baggage holds arbitrary key/value pairs.
	Baggage map[string]string
}
// Tracer creates spans for one service and hands finished spans to the
// tracing backend.
type Tracer struct {
	serviceName string // name of the service producing the spans
	endpoint    string // address of the tracing backend
}

// NewTracer returns a Tracer for serviceName that reports to endpoint.
func NewTracer(serviceName, endpoint string) *Tracer {
	return &Tracer{
		serviceName: serviceName,
		endpoint:    endpoint,
	}
}

// StartSpan begins a span named operationName and returns it together with a
// context that carries it (under the context key "span").
//
// If ctx already carries a span, the new span becomes its child: it inherits
// the parent's TraceID and records the parent's SpanID as ParentID, so all
// spans of one request share a trace. Otherwise a new trace is started.
func (t *Tracer) StartSpan(ctx context.Context, operationName string) (context.Context, *Span) {
	span := &Span{
		SpanID:        t.generateSpanID(),
		OperationName: operationName,
		StartTime:     time.Now(),
		Tags:          make(map[string]string),
		Logs:          make([]Log, 0),
	}

	if parent, ok := ctx.Value("span").(*Span); ok && parent != nil {
		span.TraceID = parent.TraceID
		span.ParentID = parent.SpanID
	} else {
		span.TraceID = t.generateTraceID()
	}

	return context.WithValue(ctx, "span", span), span
}

// FinishSpan stamps the end time and duration on span and sends it to the
// tracing backend. A nil span is ignored.
func (t *Tracer) FinishSpan(span *Span) {
	if span == nil {
		return
	}
	span.EndTime = time.Now()
	span.Duration = span.EndTime.Sub(span.StartTime)
	t.sendSpan(span)
}

// generateTraceID derives a trace ID from the current time in nanoseconds.
// NOTE(review): IDs generated within the same clock tick are identical.
func (t *Tracer) generateTraceID() string {
	return fmt.Sprintf("%d", time.Now().UnixNano())
}

// generateSpanID derives a span ID from the current time in nanoseconds.
// NOTE(review): IDs generated within the same clock tick are identical.
func (t *Tracer) generateSpanID() string {
	return fmt.Sprintf("%d", time.Now().UnixNano())
}

// getParentID returns the SpanID of the span carried by ctx, or "" when ctx
// carries none.
func (t *Tracer) getParentID(ctx context.Context) string {
	if span, ok := ctx.Value("span").(*Span); ok && span != nil {
		return span.SpanID
	}
	return ""
}

// sendSpan delivers span to the tracing backend. This implementation only
// prints the span to standard output.
func (t *Tracer) sendSpan(span *Span) {
	fmt.Printf("Sending span: %+v\n", span)
}

// Span is one timed operation within a trace.
type Span struct {
	TraceID       string            // trace the span belongs to
	SpanID        string            // identifier of this span
	ParentID      string            // SpanID of the parent; "" for a root span
	OperationName string            // name given to StartSpan
	StartTime     time.Time         // set by StartSpan
	EndTime       time.Time         // set by FinishSpan
	Duration      time.Duration     // EndTime - StartTime, set by FinishSpan
	Tags          map[string]string // key/value annotations
	Logs          []Log             // timestamped events
}

// Log is a timestamped set of fields attached to a span.
type Log struct {
	Timestamp time.Time
	Fields    map[string]interface{}
}

// SetTag stores value under key, creating the tag map if the span was not
// created by StartSpan.
func (s *Span) SetTag(key, value string) {
	if s.Tags == nil {
		s.Tags = make(map[string]string)
	}
	s.Tags[key] = value
}

// Log appends fields to the span's log, stamped with the current time.
func (s *Span) Log(fields map[string]interface{}) {
	s.Logs = append(s.Logs, Log{
		Timestamp: time.Now(),
		Fields:    fields,
	})
}
微服务框架实现
框架核心
package microservice
import (
"context"
"fmt"
"net/http"
"time"
)
// Microservice is a small HTTP service skeleton that wires together service
// registration, rate limiting, tracing and circuit breaking around
// user-supplied handlers.
//
// A Microservice is configured with the Set* methods and RegisterHandler
// before Start is called. It does no locking of its own, so configuration,
// Start and Stop should not run concurrently with each other.
type Microservice struct {
	name     string
	version  string
	registry Registry
	// lb is held by pointer: a LoadBalancer contains a mutex and must not be
	// copied.
	lb       *LoadBalancer
	breaker  *CircuitBreaker
	limiter  RateLimiter
	tracer   *Tracer
	server   *http.Server
	handlers map[string]http.HandlerFunc
	// instanceID is the ID this process registered under; empty while
	// unregistered.
	instanceID string
}

// NewMicroservice returns an unconfigured service with the given name and
// version.
func NewMicroservice(name, version string) *Microservice {
	return &Microservice{
		name:     name,
		version:  version,
		handlers: make(map[string]http.HandlerFunc),
	}
}

// SetRegistry sets the registry the service announces itself to on Start.
func (ms *Microservice) SetRegistry(registry Registry) {
	ms.registry = registry
}

// SetLoadBalancer sets the load balancer associated with the service.
func (ms *Microservice) SetLoadBalancer(lb *LoadBalancer) {
	ms.lb = lb
}

// SetCircuitBreaker sets the breaker every business handler runs under.
func (ms *Microservice) SetCircuitBreaker(breaker *CircuitBreaker) {
	ms.breaker = breaker
}

// SetRateLimiter sets the limiter consulted before every business handler.
func (ms *Microservice) SetRateLimiter(limiter RateLimiter) {
	ms.limiter = limiter
}

// SetTracer sets the tracer used to create one span per handled request.
func (ms *Microservice) SetTracer(tracer *Tracer) {
	ms.tracer = tracer
}

// RegisterHandler maps path to handler. Handlers registered after Start has
// built its mux are not served.
func (ms *Microservice) RegisterHandler(path string, handler http.HandlerFunc) {
	ms.handlers[path] = handler
}

// Start registers the service (when a registry is set) and serves HTTP on
// port until the server stops.
//
// The instance is registered with the address "localhost" and an ID made of
// the service name and the current Unix time. /health is always served; the
// business handlers are wrapped by wrapHandler.
//
// Start returns nil after a graceful Stop. If registration fails its error is
// returned and nothing is served; if the server fails, the registration is
// withdrawn again and the server error is returned.
func (ms *Microservice) Start(port int) error {
	mux := http.NewServeMux()
	mux.HandleFunc("/health", ms.healthHandler)
	for path, handler := range ms.handlers {
		mux.HandleFunc(path, ms.wrapHandler(handler))
	}

	ms.server = &http.Server{
		Addr:    fmt.Sprintf(":%d", port),
		Handler: mux,
		// Bound the time a client may take to send headers so that idle
		// connections cannot pin goroutines indefinitely.
		ReadHeaderTimeout: 10 * time.Second,
	}

	if ms.registry != nil {
		instance := &ServiceInstance{
			ID:      fmt.Sprintf("%s-%d", ms.name, time.Now().Unix()),
			Name:    ms.name,
			Address: "localhost",
			Port:    port,
			Metadata: map[string]string{
				"version": ms.version,
			},
		}
		if err := ms.registry.Register(context.Background(), instance); err != nil {
			return fmt.Errorf("registering %s: %w", ms.name, err)
		}
		ms.instanceID = instance.ID
	}

	err := ms.server.ListenAndServe()
	if err == http.ErrServerClosed {
		// Graceful shutdown through Stop, not a failure.
		return nil
	}

	// The server never came up or died: do not leave a stale registration.
	// The serve error is the one worth reporting.
	_ = ms.deregister()
	return err
}

// Stop withdraws the registry entry and gracefully shuts the HTTP server
// down. A shutdown error takes precedence over a deregistration error.
func (ms *Microservice) Stop() error {
	deregErr := ms.deregister()

	if ms.server != nil {
		if err := ms.server.Shutdown(context.Background()); err != nil {
			return err
		}
	}
	return deregErr
}

// deregister removes this process's registry entry, if there is one.
func (ms *Microservice) deregister() error {
	if ms.registry == nil || ms.instanceID == "" {
		return nil
	}
	id := ms.instanceID
	ms.instanceID = ""
	return ms.registry.Deregister(context.Background(), id)
}

// healthHandler answers 200 "OK".
func (ms *Microservice) healthHandler(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
	// A failed write means the client went away; nothing to do about it.
	_, _ = w.Write([]byte("OK"))
}

// wrapHandler decorates handler with, in order: rate limiting (429 when the
// limiter refuses), a tracing span named after the request path, and the
// circuit breaker (503 when the breaker refuses). Each stage is skipped when
// the corresponding component is not set.
//
// The span is finished when the wrapped handler returns, on every path that
// started one, including a breaker rejection.
func (ms *Microservice) wrapHandler(handler http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if ms.limiter != nil && !ms.limiter.Allow() {
			http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
			return
		}

		if ms.tracer != nil {
			ctx, span := ms.tracer.StartSpan(r.Context(), r.URL.Path)
			defer ms.tracer.FinishSpan(span)
			r = r.WithContext(ctx)
		}

		if ms.breaker == nil {
			handler(w, r)
			return
		}

		// The handler itself reports no error, so an error from Execute means
		// the breaker refused the request before the handler ran.
		_, err := ms.breaker.Execute(func() (interface{}, error) {
			handler(w, r)
			return nil, nil
		})
		if err != nil {
			http.Error(w, "Circuit breaker open", http.StatusServiceUnavailable)
		}
	}
}
服务调用
package microservice
import (
"context"
"fmt"
"net/http"
"time"
)
// ServiceCaller issues HTTP GET requests to other services, resolving the
// target through the registry and load balancer and optionally applying rate
// limiting, tracing and circuit breaking.
//
// The load balancer's instance list is replaced on every call with the
// instances discovered for that call's service. A ServiceCaller (and its load
// balancer) should therefore be dedicated to a single target service when
// used concurrently.
type ServiceCaller struct {
	registry Registry
	// lb is held by pointer: a LoadBalancer contains a mutex and must not be
	// copied.
	lb      *LoadBalancer
	breaker *CircuitBreaker
	limiter RateLimiter
	tracer  *Tracer
	client  *http.Client
}

// NewServiceCaller returns a caller that resolves targets through registry
// and lb and uses an HTTP client with a 30 second timeout.
func NewServiceCaller(registry Registry, lb *LoadBalancer) *ServiceCaller {
	return &ServiceCaller{
		registry: registry,
		lb:       lb,
		client: &http.Client{
			Timeout: 30 * time.Second,
		},
	}
}

// SetCircuitBreaker sets the breaker outgoing calls run under.
func (sc *ServiceCaller) SetCircuitBreaker(breaker *CircuitBreaker) {
	sc.breaker = breaker
}

// SetRateLimiter sets the limiter consulted before every outgoing call.
func (sc *ServiceCaller) SetRateLimiter(limiter RateLimiter) {
	sc.limiter = limiter
}

// SetTracer sets the tracer used to create one span per outgoing call.
func (sc *ServiceCaller) SetTracer(tracer *Tracer) {
	sc.tracer = tracer
}

// Call sends GET path to an instance of serviceName and returns the response.
// The caller owns the response and must close its body.
//
// Order of processing: rate limiter (error when it refuses), tracing span
// "call_<serviceName>" whose context is used for the request, then the
// circuit breaker around the actual call. Only transport-level errors count
// as failures for the breaker; HTTP status codes are not inspected.
func (sc *ServiceCaller) Call(ctx context.Context, serviceName, path string) (*http.Response, error) {
	if sc.limiter != nil && !sc.limiter.Allow() {
		return nil, fmt.Errorf("rate limit exceeded")
	}

	if sc.tracer != nil {
		// Assign to the outer ctx (no :=) so the request carries the span.
		var span *Span
		ctx, span = sc.tracer.StartSpan(ctx, fmt.Sprintf("call_%s", serviceName))
		defer sc.tracer.FinishSpan(span)
	}

	if sc.breaker == nil {
		return sc.doCall(ctx, serviceName, path)
	}

	result, err := sc.breaker.Execute(func() (interface{}, error) {
		return sc.doCall(ctx, serviceName, path)
	})
	if err != nil {
		return nil, err
	}

	resp, ok := result.(*http.Response)
	if !ok {
		return nil, fmt.Errorf("unexpected result type %T from circuit breaker", result)
	}
	return resp, nil
}

// doCall discovers the instances of serviceName, feeds them to the load
// balancer, picks one and performs the GET request against it.
func (sc *ServiceCaller) doCall(ctx context.Context, serviceName, path string) (*http.Response, error) {
	instances, err := sc.registry.Discover(ctx, serviceName)
	if err != nil {
		return nil, err
	}
	if len(instances) == 0 {
		return nil, fmt.Errorf("no available instances for service %s", serviceName)
	}

	// Without this the balancer would keep selecting from whatever list it
	// was last given, initially none at all.
	sc.lb.UpdateInstances(instances)

	instance, err := sc.lb.Select(ctx)
	if err != nil {
		return nil, err
	}

	target := fmt.Sprintf("http://%s:%d%s", instance.Address, instance.Port, path)

	req, err := http.NewRequestWithContext(ctx, "GET", target, nil)
	if err != nil {
		return nil, err
	}

	return sc.client.Do(req)
}
实战项目
用户服务实现
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"github.com/hashicorp/consul/api"
)
// User is the JSON representation of a user.
type User struct {
	ID    int    `json:"id"`
	Name  string `json:"name"`
	Email string `json:"email"`
}

// UserService serves the user endpoints and keeps created users in memory.
//
// NOTE(review): users is written by CreateUser without synchronisation while
// HTTP handlers run concurrently; guard it with a mutex before real use.
type UserService struct {
	users map[int]*User // created users by ID
}

// NewUserService returns a UserService with an empty user store.
func NewUserService() *UserService {
	svc := new(UserService)
	svc.users = make(map[int]*User)
	return svc
}

// GetUser answers 400 when the "id" query parameter is missing. Otherwise it
// waits 100ms to imitate a database lookup and responds with a fixed sample
// user as JSON; the value of "id" is not used.
func (us *UserService) GetUser(w http.ResponseWriter, r *http.Request) {
	if r.URL.Query().Get("id") == "" {
		http.Error(w, "Missing user ID", http.StatusBadRequest)
		return
	}

	// Simulated database query.
	time.Sleep(100 * time.Millisecond)

	sample := &User{ID: 1, Name: "John Doe", Email: "john@example.com"}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(sample)
}

// CreateUser decodes a User from the request body (400 on malformed JSON),
// waits 200ms to imitate a database insert, assigns the next ID (number of
// stored users plus one), stores the user and echoes it back with status 201.
func (us *UserService) CreateUser(w http.ResponseWriter, r *http.Request) {
	var created User
	decodeErr := json.NewDecoder(r.Body).Decode(&created)
	if decodeErr != nil {
		http.Error(w, "Invalid JSON", http.StatusBadRequest)
		return
	}

	// Simulated database insert.
	time.Sleep(200 * time.Millisecond)

	nextID := len(us.users) + 1
	created.ID = nextID
	us.users[nextID] = &created

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusCreated)
	json.NewEncoder(w).Encode(created)
}
func main() {
// 创建用户服务
userService := NewUserService()
// 创建微服务
ms := NewMicroservice("user-service", "1.0.0")
// 注册处理器
ms.RegisterHandler("/users", userService.GetUser)
ms.RegisterHandler("/users/create", userService.CreateUser)
// 设置服务发现
registry, err := NewConsulRegistry("localhost:8500")
if err != nil {
log.Fatal(err)
}
ms.SetRegistry(registry)
// 设置负载均衡
lb := NewLoadBalancer(RoundRobin)
ms.SetLoadBalancer(lb)
// 设置熔断器
breaker := NewCircuitBreaker("user-service", Config{
MaxRequests: 10,
Interval: 10 * time.Second,
Timeout: 30 * time.Second,
})
ms.SetCircuitBreaker(breaker)
// 设置限流器
limiter := NewTokenBucket(100, 10)
ms.SetRateLimiter(limiter)
// 设置链路追踪
tracer := NewTracer("user-service", "http://localhost:14268/api/traces")
ms.SetTracer(tracer)
// 启动服务
log.Println("Starting user service on :8080")
if err := ms.Start(8080); err != nil {
log.Fatal(err)
}
}
订单服务实现
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"time"
)
// Item is one line of an order.
type Item struct {
	ProductID int     `json:"product_id"`
	Quantity  int     `json:"quantity"`
	Price     float64 `json:"price"`
}

// Order is the JSON representation of an order.
type Order struct {
	ID     int     `json:"id"`
	UserID int     `json:"user_id"`
	Items  []Item  `json:"items"`
	Total  float64 `json:"total"`
}
// OrderService serves the order endpoints, keeps created orders in memory and
// validates users through the user service.
//
// NOTE(review): orders is written by CreateOrder without synchronisation
// while HTTP handlers run concurrently; guard it with a mutex before real use.
type OrderService struct {
	orders map[int]*Order // created orders by ID
	caller *ServiceCaller // client for calls to other services
}

// NewOrderService returns an OrderService with an empty order store that
// reaches other services through caller.
func NewOrderService(caller *ServiceCaller) *OrderService {
	svc := new(OrderService)
	svc.orders = make(map[int]*Order)
	svc.caller = caller
	return svc
}

// GetOrder answers 400 when the "id" query parameter is missing. Otherwise it
// waits 100ms to imitate a database lookup and responds with a fixed sample
// order as JSON; the value of "id" is not used.
func (svc *OrderService) GetOrder(w http.ResponseWriter, r *http.Request) {
	if r.URL.Query().Get("id") == "" {
		http.Error(w, "Missing order ID", http.StatusBadRequest)
		return
	}

	// Simulated database query.
	time.Sleep(100 * time.Millisecond)

	sample := &Order{
		ID:     1,
		UserID: 1,
		Items: []Item{
			{ProductID: 1, Quantity: 2, Price: 10.0},
			{ProductID: 2, Quantity: 1, Price: 20.0},
		},
		Total: 40.0,
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(sample)
}

// CreateOrder decodes an Order from the request body (400 on malformed JSON)
// and asks user-service for the order's user. A failed call or a non-200
// answer is reported to the client as 400. Otherwise it waits 200ms to
// imitate a database insert, assigns the next ID (number of stored orders
// plus one), stores the order and echoes it back with status 201.
func (svc *OrderService) CreateOrder(w http.ResponseWriter, r *http.Request) {
	var created Order
	decodeErr := json.NewDecoder(r.Body).Decode(&created)
	if decodeErr != nil {
		http.Error(w, "Invalid JSON", http.StatusBadRequest)
		return
	}

	// Validate the user through the user service.
	userPath := fmt.Sprintf("/users?id=%d", created.UserID)
	resp, err := svc.caller.Call(r.Context(), "user-service", userPath)
	if err != nil {
		http.Error(w, "User validation failed", http.StatusBadRequest)
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		http.Error(w, "Invalid user", http.StatusBadRequest)
		return
	}

	// Simulated database insert.
	time.Sleep(200 * time.Millisecond)

	nextID := len(svc.orders) + 1
	created.ID = nextID
	svc.orders[nextID] = &created

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusCreated)
	json.NewEncoder(w).Encode(created)
}
// main assembles the order service and serves it on port 8081. The registry,
// load balancer, circuit breaker, limiter and tracer are shared between the
// outgoing service caller and the HTTP service itself. Any setup or serve
// error terminates the process.
func main() {
	registry, err := NewConsulRegistry("localhost:8500")
	if err != nil {
		log.Fatal(err)
	}

	// Shared components.
	lb := NewLoadBalancer(RoundRobin)
	breaker := NewCircuitBreaker("order-service", Config{
		MaxRequests: 10,
		Interval:    10 * time.Second,
		Timeout:     30 * time.Second,
	})
	limiter := NewTokenBucket(100, 10)
	tracer := NewTracer("order-service", "http://localhost:14268/api/traces")

	// Client used for calls to other services.
	caller := NewServiceCaller(registry, lb)
	caller.SetCircuitBreaker(breaker)
	caller.SetRateLimiter(limiter)
	caller.SetTracer(tracer)

	// The service itself.
	orderService := NewOrderService(caller)
	ms := NewMicroservice("order-service", "1.0.0")
	ms.RegisterHandler("/orders", orderService.GetOrder)
	ms.RegisterHandler("/orders/create", orderService.CreateOrder)

	ms.SetRegistry(registry)
	ms.SetLoadBalancer(lb)
	ms.SetCircuitBreaker(breaker)
	ms.SetRateLimiter(limiter)
	ms.SetTracer(tracer)

	log.Println("Starting order service on :8081")
	if err := ms.Start(8081); err != nil {
		log.Fatal(err)
	}
}
性能测试
压力测试
package main
import (
"fmt"
"net/http"
"sync"
"time"
)
// stressTest sends `requests` GET requests to url using `concurrency` worker
// goroutines and prints the elapsed time and resulting requests per second.
//
// The requests are divided evenly between the workers; when the division
// leaves a remainder, the first workers send one request more, so exactly
// `requests` requests are attempted. A concurrency below 1 is treated as 1.
// Failed requests are reported on standard output and still count as
// attempted.
func stressTest(url string, concurrency, requests int) {
	workers := concurrency
	if workers < 1 {
		workers = 1
	}

	base, extra := requests/workers, requests%workers

	var wg sync.WaitGroup
	start := time.Now()

	for i := 0; i < workers; i++ {
		quota := base
		if i < extra {
			quota++
		}
		if quota <= 0 {
			continue
		}

		wg.Add(1)
		go func(quota int) {
			defer wg.Done()
			for j := 0; j < quota; j++ {
				resp, err := http.Get(url)
				if err != nil {
					fmt.Printf("Error: %v\n", err)
					continue
				}
				resp.Body.Close()
			}
		}(quota)
	}

	wg.Wait()
	duration := time.Since(start)

	fmt.Printf("Concurrency: %d, Requests: %d, Duration: %v, QPS: %.2f\n",
		concurrency, requests, duration, float64(requests)/duration.Seconds())
}
// main runs the stress test against the user service and then the order
// service, each with 100 workers and 10000 requests.
func main() {
	targets := []string{
		"http://localhost:8080/users?id=1",  // user service
		"http://localhost:8081/orders?id=1", // order service
	}
	for _, target := range targets {
		stressTest(target, 100, 10000)
	}
}
面试题库
基础问题
微服务架构的优势?
- 独立部署
- 技术栈灵活
- 可扩展性强
- 故障隔离
服务注册与发现的作用?
- 服务定位
- 健康检查
- 负载均衡
- 故障转移
熔断器的工作原理?
- 监控请求失败率
- 自动熔断保护
- 半开状态恢复
- 防止雪崩效应
进阶问题
如何设计高可用的微服务?
- 服务注册发现
- 负载均衡
- 熔断限流
- 链路追踪
- 监控告警
微服务的监控指标?
- 请求量
- 响应时间
- 错误率
- 资源使用率
微服务的挑战?
- 分布式复杂性
- 数据一致性
- 服务治理
- 监控调试
源码问题
服务注册的实现原理?
- 心跳机制
- 健康检查
- 服务发现
- 故障转移
负载均衡的策略?
- 轮询
- 随机
- 加权轮询
- 最少连接
扩展阅读
相关章节
- 01-GMP调度模型深度解析 - 微服务中的并发处理
- 02-Channel源码剖析 - 微服务间的通信
- 03-内存模型与GC机制 - 微服务的内存管理
- 04-垃圾回收器全链路 - 微服务的 GC 优化
- 05-并发模型与锁机制 - 微服务的并发控制
- 06-网络模型与Netpoll - 微服务的网络通信
- 07-Runtime全景融合 - 微服务的运行时优化
- 08-性能优化实战 - 微服务的性能调优
恭喜! 你已经完成了 Go 架构进阶学习手册的全部内容。通过系统性的学习,你现在应该能够:
- 深入理解 Go Runtime 的底层机制
- 掌握 GMP 调度、内存管理、GC 等核心组件
- 具备性能调优和架构设计能力
- 能够构建高性能的微服务系统
继续实践和探索,成为 Go 架构专家!