13-容器生命周期管理
学习目标
- 理解容器生命周期的各个阶段
- 掌握容器状态机的设计和实现
- 能够实现容器的创建、启动、停止、删除
- 掌握容器日志收集和信号处理
- 理解容器运行时的高级功能
前置知识
- 容器基础原理
- Go 语言并发编程
- 系统编程基础
- 进程管理基础
一、容器生命周期概述
1.1 生命周期阶段
graph TD
A[Created] --> B[Running]
B --> C[Paused]
C --> B
B --> D[Stopped]
D --> E[Removed]
F[Error] --> D
B --> F
C --> F
1.2 状态转换
状态 | 描述 | 可转换到 |
---|---|---|
Created | 容器已创建但未启动 | Running, Removed |
Running | 容器正在运行 | Paused, Stopped, Error |
Paused | 容器已暂停 | Running, Stopped, Error |
Stopped | 容器已停止 | Removed |
Removed | 容器已删除 | - |
Error | 容器出错 | Stopped |
二、容器状态机
2.1 状态定义
package container
import (
"sync"
"time"
)
// ContainerStatus enumerates the lifecycle states a container can be in.
type ContainerStatus int

// Lifecycle states, numbered from zero in declaration order.
const (
	StatusCreated ContainerStatus = iota
	StatusRunning
	StatusPaused
	StatusStopped
	StatusRemoved
	StatusError
)

// String returns the lowercase name of the status. Any value outside the
// declared constants is rendered as "unknown".
func (s ContainerStatus) String() string {
	// Indexed by the constant values above.
	names := [...]string{
		StatusCreated: "created",
		StatusRunning: "running",
		StatusPaused:  "paused",
		StatusStopped: "stopped",
		StatusRemoved: "removed",
		StatusError:   "error",
	}
	if s < 0 || int(s) >= len(names) {
		return "unknown"
	}
	return names[s]
}
// ContainerState records the current status of a container together with the
// PID, timestamps, exit code and error text associated with it.
//
// The exported fields carry JSON tags. The methods below take mutex before
// touching the fields, so concurrent callers should go through the methods
// rather than reading or writing the fields directly.
type ContainerState struct {
	Status    ContainerStatus `json:"status"`
	PID       int             `json:"pid"`
	CreatedAt time.Time       `json:"created_at"`
	StartedAt time.Time       `json:"started_at"`
	PausedAt  time.Time       `json:"paused_at"`
	StoppedAt time.Time       `json:"stopped_at"`
	Error     string          `json:"error,omitempty"`
	ExitCode  int             `json:"exit_code"`
	mutex     sync.RWMutex
}

// NewContainerState returns a state in StatusCreated whose CreatedAt is the
// current time. All other fields are left at their zero values.
func NewContainerState() *ContainerState {
	return &ContainerState{
		Status:    StatusCreated,
		CreatedAt: time.Now(),
	}
}

// GetStatus returns the current status under the read lock.
func (cs *ContainerState) GetStatus() ContainerStatus {
	cs.mutex.RLock()
	defer cs.mutex.RUnlock()
	return cs.Status
}

// SetStatus stores status and stamps the matching timestamp:
// StartedAt for StatusRunning, PausedAt for StatusPaused and StoppedAt for
// StatusStopped. Other statuses update no timestamp.
//
// SetStatus performs no validation of the transition itself.
func (cs *ContainerState) SetStatus(status ContainerStatus) {
	cs.mutex.Lock()
	defer cs.mutex.Unlock()

	previous := cs.Status
	cs.Status = status

	now := time.Now()
	switch status {
	case StatusRunning:
		// Resuming from a pause is not a new start: keep the original
		// start time instead of overwriting it with the resume time.
		if previous != StatusPaused {
			cs.StartedAt = now
		}
	case StatusPaused:
		cs.PausedAt = now
	case StatusStopped:
		cs.StoppedAt = now
	}
}

// SetError records err as the error text and moves the state to StatusError.
// No timestamp is updated.
func (cs *ContainerState) SetError(err string) {
	cs.mutex.Lock()
	defer cs.mutex.Unlock()
	cs.Error = err
	cs.Status = StatusError
}

// SetExitCode records the exit code.
func (cs *ContainerState) SetExitCode(code int) {
	cs.mutex.Lock()
	defer cs.mutex.Unlock()
	cs.ExitCode = code
}

// IsRunning reports whether the status is exactly StatusRunning.
func (cs *ContainerState) IsRunning() bool {
	cs.mutex.RLock()
	defer cs.mutex.RUnlock()
	return cs.Status == StatusRunning
}

// IsStopped reports whether the status is StatusStopped or StatusError.
func (cs *ContainerState) IsStopped() bool {
	cs.mutex.RLock()
	defer cs.mutex.RUnlock()
	return cs.Status == StatusStopped || cs.Status == StatusError
}
2.2 状态转换器
// StateTransition describes one permitted edge of the container state machine.
type StateTransition struct {
	From   ContainerStatus // state the edge leaves
	To     ContainerStatus // state the edge enters
	Action func() error    // hook run before the state is updated; an error aborts the transition
}

// StateMachine owns a ContainerState and only lets it move along the
// transitions registered in its table.
type StateMachine struct {
	currentState *ContainerState
	transitions  map[ContainerStatus][]StateTransition
	mutex        sync.RWMutex
}

// NewStateMachine returns a state machine holding a fresh ContainerState and
// the default transition table.
func NewStateMachine() *StateMachine {
	sm := &StateMachine{
		currentState: NewContainerState(),
		transitions:  make(map[ContainerStatus][]StateTransition),
	}
	sm.defineTransitions()
	return sm
}

// defineTransitions registers the default edges, each with a no-op action.
//
// The table follows the lifecycle diagram at the top of this document. That
// includes Error -> Stopped, without which a container that entered the error
// state could never be stopped or removed, and Paused -> Error.
func (sm *StateMachine) defineTransitions() {
	noop := func() error { return nil }

	edges := []struct{ from, to ContainerStatus }{
		{StatusCreated, StatusRunning},
		{StatusCreated, StatusRemoved},
		{StatusRunning, StatusPaused},
		{StatusRunning, StatusStopped},
		{StatusRunning, StatusError},
		{StatusPaused, StatusRunning},
		{StatusPaused, StatusStopped},
		{StatusPaused, StatusError},
		{StatusStopped, StatusRemoved},
		{StatusError, StatusStopped},
	}
	for _, e := range edges {
		sm.addTransition(e.from, e.to, noop)
	}
}

// addTransition appends the edge from -> to, with its action, to the table.
func (sm *StateMachine) addTransition(from, to ContainerStatus, action func() error) {
	sm.transitions[from] = append(sm.transitions[from], StateTransition{
		From:   from,
		To:     to,
		Action: action,
	})
}

// findTransition returns the first registered edge from -> to, if any.
func (sm *StateMachine) findTransition(from, to ContainerStatus) (StateTransition, bool) {
	for _, transition := range sm.transitions[from] {
		if transition.To == to {
			return transition, true
		}
	}
	return StateTransition{}, false
}

// Transition moves the machine from its current status to the status to.
//
// It returns an error, and leaves the state untouched, when no edge is
// registered for the move or when the edge's action fails. The action's error
// is wrapped with %w so callers can inspect it with errors.Is / errors.As.
// A nil action is treated as a no-op.
//
// The action runs while sm.mutex is held, so it must not call back into the
// same StateMachine.
func (sm *StateMachine) Transition(to ContainerStatus) error {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()

	from := sm.currentState.GetStatus()

	transition, ok := sm.findTransition(from, to)
	if !ok {
		return fmt.Errorf("invalid transition from %s to %s", from, to)
	}

	if transition.Action != nil {
		if err := transition.Action(); err != nil {
			return fmt.Errorf("transition action failed: %w", err)
		}
	}

	sm.currentState.SetStatus(to)
	return nil
}

// isValidTransition reports whether an edge from -> to is registered.
func (sm *StateMachine) isValidTransition(from, to ContainerStatus) bool {
	_, ok := sm.findTransition(from, to)
	return ok
}

// GetCurrentState returns the machine's ContainerState. The returned pointer
// is the live state, not a copy.
func (sm *StateMachine) GetCurrentState() *ContainerState {
	sm.mutex.RLock()
	defer sm.mutex.RUnlock()
	return sm.currentState
}
三、容器管理器
3.1 容器管理器结构
// ContainerManager keeps a registry of containers keyed by ID and logs the
// lifecycle operations performed on them.
type ContainerManager struct {
// containers maps a container ID to its Container.
containers map[string]*Container
// mutex guards access to the containers map.
mutex sync.RWMutex
// stateMachine is created by NewContainerManager.
// NOTE(review): none of the manager methods shown in this section read it — confirm whether it is still needed.
stateMachine *StateMachine
// logger receives the structured "Container created/started/stopped/removed" messages.
logger *logrus.Logger
}
// NewContainerManager builds a manager with an empty container registry, a
// fresh state machine and a new logrus logger.
func NewContainerManager() *ContainerManager {
	cm := new(ContainerManager)
	cm.containers = make(map[string]*Container)
	cm.stateMachine = NewStateMachine()
	cm.logger = logrus.New()
	return cm
}
// CreateContainer builds a container from config, validates it and registers
// it under a newly generated ID.
//
// It returns an error when config is nil or when the container fails
// validation; in both cases nothing is registered. The validation error is
// wrapped with %w so callers can inspect it.
func (cm *ContainerManager) CreateContainer(config *ContainerConfig) (*Container, error) {
	// Reject a nil config up front instead of panicking on the first field access.
	if config == nil {
		return nil, fmt.Errorf("container config is nil")
	}

	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	containerID := generateContainerID()

	container := NewContainer(containerID, config.Name, config.Rootfs, config.Cmd, config.Args)
	container.SetResourceLimits(config.Memory, config.CPU, config.Pids)
	container.SetNetworkConfig(config.NetworkMode, config.IP, config.Ports)
	container.SetMounts(config.Mounts)

	if err := container.Validate(); err != nil {
		return nil, fmt.Errorf("container validation failed: %w", err)
	}

	// Only a validated container is added to the registry.
	cm.containers[containerID] = container

	cm.logger.WithFields(logrus.Fields{
		"container_id": containerID,
		"name":         config.Name,
	}).Info("Container created")

	return container, nil
}
// StartContainer starts the container registered under containerID.
//
// It returns an error when the ID is unknown, when the container is not in
// StatusCreated, or when Start fails. On a Start failure the error text is
// also recorded on the container via SetError, and the returned error wraps
// the cause with %w.
func (cm *ContainerManager) StartContainer(containerID string) error {
	// The lookup only reads the map, so the shared read lock is sufficient
	// (the same locking StopContainer and GetContainer use).
	cm.mutex.RLock()
	container, exists := cm.containers[containerID]
	cm.mutex.RUnlock()

	if !exists {
		return fmt.Errorf("container %s not found", containerID)
	}

	if container.GetStatus() != StatusCreated {
		return fmt.Errorf("container %s is not in created state", containerID)
	}

	if err := container.Start(); err != nil {
		container.SetError(err.Error())
		return fmt.Errorf("failed to start container: %w", err)
	}

	cm.logger.WithFields(logrus.Fields{
		"container_id": containerID,
	}).Info("Container started")

	return nil
}
// StopContainer stops the container registered under containerID, passing
// timeout through to the container's Stop method.
//
// It fails when the ID is unknown, when the container is not running, or when
// Stop returns an error; in the last case the error text is also stored on the
// container via SetError.
func (cm *ContainerManager) StopContainer(containerID string, timeout int) error {
	cm.mutex.RLock()
	target, found := cm.containers[containerID]
	cm.mutex.RUnlock()

	switch {
	case !found:
		return fmt.Errorf("container %s not found", containerID)
	case !target.IsRunning():
		return fmt.Errorf("container %s is not running", containerID)
	}

	stopErr := target.Stop(timeout)
	if stopErr != nil {
		target.SetError(stopErr.Error())
		return fmt.Errorf("failed to stop container: %v", stopErr)
	}

	fields := logrus.Fields{
		"container_id": containerID,
	}
	cm.logger.WithFields(fields).Info("Container stopped")

	return nil
}
// RemoveContainer removes the container registered under containerID and
// drops it from the registry.
//
// It returns an error when the ID is unknown, when the container is running
// or paused, or when the container's Remove method fails (the cause is
// wrapped with %w). The registry entry is deleted only after Remove succeeds.
func (cm *ContainerManager) RemoveContainer(containerID string) error {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	container, exists := cm.containers[containerID]
	if !exists {
		return fmt.Errorf("container %s not found", containerID)
	}

	if container.IsRunning() {
		return fmt.Errorf("container %s is still running", containerID)
	}
	// The lifecycle table only allows Removed from Created or Stopped, so a
	// paused container has to be stopped first.
	if container.GetStatus() == StatusPaused {
		return fmt.Errorf("container %s is paused; stop it before removing", containerID)
	}

	if err := container.Remove(); err != nil {
		return fmt.Errorf("failed to remove container: %w", err)
	}

	delete(cm.containers, containerID)

	cm.logger.WithFields(logrus.Fields{
		"container_id": containerID,
	}).Info("Container removed")

	return nil
}
// ListContainers returns a snapshot slice of every registered container.
// The order follows map iteration and is therefore unspecified.
func (cm *ContainerManager) ListContainers() []*Container {
	cm.mutex.RLock()
	defer cm.mutex.RUnlock()

	snapshot := make([]*Container, len(cm.containers))
	next := 0
	for _, c := range cm.containers {
		snapshot[next] = c
		next++
	}
	return snapshot
}
// GetContainer looks up the container registered under containerID and
// returns an error when no such container exists.
func (cm *ContainerManager) GetContainer(containerID string) (*Container, error) {
	cm.mutex.RLock()
	found, ok := cm.containers[containerID]
	cm.mutex.RUnlock()

	if ok {
		return found, nil
	}
	return nil, fmt.Errorf("container %s not found", containerID)
}
3.2 容器配置
// ContainerConfig collects the settings used to create a container.
// NewContainerConfig returns a value pre-filled with defaults.
type ContainerConfig struct {
// Process settings.
Name string
Rootfs string
Cmd string
Args []string
Env []string
WorkingDir string
User string
Hostname string
// Resource limits. Memory and CPU are strings (the defaults are "128M" and
// "50000 100000"); their exact format is interpreted by the container's
// SetResourceLimits — TODO confirm the accepted syntax there.
Memory string
CPU string
Pids int
// Network configuration.
NetworkMode string
IP string
Ports []PortMapping
// Storage configuration.
Mounts []Mount
ReadOnly bool
// Runtime flags.
Detach bool
TTY bool
Interactive bool
}
// NewContainerConfig returns a configuration populated with the default
// values. Name, Rootfs and IP stay empty and all boolean flags stay false;
// the slice fields are initialised to empty, non-nil slices.
func NewContainerConfig() *ContainerConfig {
	cfg := new(ContainerConfig)

	// Process defaults.
	cfg.Cmd = "/bin/sh"
	cfg.Args = []string{}
	cfg.Env = []string{}
	cfg.WorkingDir = "/"
	cfg.User = "root"
	cfg.Hostname = "container"

	// Resource limit defaults.
	cfg.Memory = "128M"
	cfg.CPU = "50000 100000"
	cfg.Pids = 100

	// Network defaults.
	cfg.NetworkMode = "bridge"
	cfg.Ports = []PortMapping{}

	// Storage defaults.
	cfg.Mounts = []Mount{}

	return cfg
}
四、日志管理
4.1 日志收集器
// LogCollector appends a container's log output to a file on disk and reads
// it back on request.
type LogCollector struct {
// containerID identifies the container; NewLogCollector uses it as the log directory name.
containerID string
// logFile is the log file handle, opened in append mode by NewLogCollector.
logFile *os.File
// mutex serialises Write, Close and GetLogs.
mutex sync.Mutex
// logger is created with logrus.New() by NewLogCollector.
logger *logrus.Logger
}
// NewLogCollector creates the directory /var/log/containers/<containerID>
// (mode 0755) if needed and opens container.log inside it for appending,
// creating the file with mode 0644 when it does not exist.
func NewLogCollector(containerID string) (*LogCollector, error) {
	dir := filepath.Join("/var/log/containers", containerID)
	if mkErr := os.MkdirAll(dir, 0755); mkErr != nil {
		return nil, fmt.Errorf("failed to create log directory: %v", mkErr)
	}

	const flags = os.O_CREATE | os.O_WRONLY | os.O_APPEND
	path := filepath.Join(dir, "container.log")

	f, openErr := os.OpenFile(path, flags, 0644)
	if openErr != nil {
		return nil, fmt.Errorf("failed to open log file: %v", openErr)
	}

	collector := &LogCollector{
		containerID: containerID,
		logFile:     f,
		logger:      logrus.New(),
	}
	return collector, nil
}
// Write appends data to the log file and syncs the file to disk.
//
// It follows the io.Writer contract: the returned count is the number of
// bytes actually written, even when an error is returned. A failed write
// reports however many bytes made it to the file, and a failed Sync still
// reports the bytes that were written before it.
func (lc *LogCollector) Write(data []byte) (int, error) {
	lc.mutex.Lock()
	defer lc.mutex.Unlock()

	n, err := lc.logFile.Write(data)
	if err != nil {
		return n, err
	}

	// Flush to stable storage so the entry is not lost with the page cache.
	if err := lc.logFile.Sync(); err != nil {
		return n, err
	}

	return n, nil
}
// Close closes the underlying log file and returns the result of that call.
// It is a no-op returning nil when the collector has no file.
func (lc *LogCollector) Close() error {
	lc.mutex.Lock()
	defer lc.mutex.Unlock()

	if lc.logFile == nil {
		return nil
	}
	return lc.logFile.Close()
}
// GetLogs reads the log file back and returns the lines whose leading
// timestamp is strictly after since.
//
// A line qualifies only if its first 19 bytes parse with the layout
// "2006-01-02T15:04:05"; lines that are empty, shorter than that, or that do
// not start with such a timestamp are skipped. A line consisting of nothing
// but the timestamp is kept. When tail is positive, only the last tail
// matching lines are returned. The result is nil when nothing matches.
//
// The whole file is read into memory on every call.
func (lc *LogCollector) GetLogs(since time.Time, tail int) ([]string, error) {
	const timestampLayout = "2006-01-02T15:04:05"

	lc.mutex.Lock()
	defer lc.mutex.Unlock()

	data, err := os.ReadFile(lc.logFile.Name())
	if err != nil {
		return nil, err
	}

	var filtered []string
	for _, line := range strings.Split(string(data), "\n") {
		// A line needs at least a full timestamp; one that is exactly the
		// timestamp (an entry with an empty message) still counts.
		if len(line) < len(timestampLayout) {
			continue
		}
		stamp, err := time.Parse(timestampLayout, line[:len(timestampLayout)])
		if err != nil {
			continue
		}
		if stamp.After(since) {
			filtered = append(filtered, line)
		}
	}

	if tail > 0 && len(filtered) > tail {
		filtered = filtered[len(filtered)-tail:]
	}

	return filtered, nil
}
4.2 日志轮转
// LogRotator rotates a container's log file once it reaches a size threshold.
type LogRotator struct {
// containerID selects the log directory under /var/log/containers.
containerID string
// maxSize is the size in bytes below which Rotate leaves the log untouched.
maxSize int64
// maxFiles is the upper bound used by Rotate when shifting numbered backups.
maxFiles int
// logger is created with logrus.New() by NewLogRotator.
logger *logrus.Logger
}
// NewLogRotator returns a rotator for the given container with the supplied
// size threshold and backup count, and a new logrus logger.
func NewLogRotator(containerID string, maxSize int64, maxFiles int) *LogRotator {
	rotator := new(LogRotator)
	rotator.containerID = containerID
	rotator.maxSize = maxSize
	rotator.maxFiles = maxFiles
	rotator.logger = logrus.New()
	return rotator
}
// Rotate rotates /var/log/containers/<containerID>/container.log when its
// size has reached maxSize; below that threshold it does nothing.
//
// Rotation shifts the numbered backups up by one (container.log.N becomes
// container.log.N+1, highest index first), renames the current log to
// container.log.1 and creates a fresh, empty container.log. Backups that do
// not exist are skipped; any other rename failure aborts the rotation.
//
// NOTE(review): a LogCollector that already has container.log open keeps its
// handle to the renamed file and will continue writing to container.log.1
// until it is reopened — confirm how callers coordinate the two.
func (lr *LogRotator) Rotate() error {
	logFile := filepath.Join("/var/log/containers", lr.containerID, "container.log")

	stat, err := os.Stat(logFile)
	if err != nil {
		return err
	}
	if stat.Size() < lr.maxSize {
		return nil
	}

	// Walk from the oldest backup down so no file is overwritten before it
	// has been moved out of the way.
	for i := lr.maxFiles - 1; i > 0; i-- {
		oldFile := fmt.Sprintf("%s.%d", logFile, i)
		newFile := fmt.Sprintf("%s.%d", logFile, i+1)
		if err := os.Rename(oldFile, newFile); err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("failed to rotate %s: %w", oldFile, err)
		}
	}

	if err := os.Rename(logFile, logFile+".1"); err != nil {
		return err
	}

	// Create the new empty log and release the descriptor right away; the
	// rotator itself never writes to it.
	fresh, err := os.Create(logFile)
	if err != nil {
		return err
	}
	if err := fresh.Close(); err != nil {
		return err
	}

	lr.logger.WithFields(logrus.Fields{
		"container_id": lr.containerID,
		"file_size":    stat.Size(),
	}).Info("Log rotated")

	return nil
}
五、信号处理
5.1 信号处理器
// SignalHandler receives OS signals on behalf of a container and dispatches
// them to per-signal handlers.
type SignalHandler struct {
// containerID is attached to every log entry the handler emits.
containerID string
// signals is the channel registered with signal.Notify in Start;
// NewSignalHandler creates it with a buffer of one.
signals chan os.Signal
// logger is created with logrus.New() by NewSignalHandler.
logger *logrus.Logger
}
// NewSignalHandler returns a handler for the given container with a signal
// channel of capacity one and a new logrus logger. No signals are delivered
// until Start is called.
func NewSignalHandler(containerID string) *SignalHandler {
	handler := new(SignalHandler)
	handler.containerID = containerID
	handler.signals = make(chan os.Signal, 1)
	handler.logger = logrus.New()
	return handler
}
// Start subscribes the handler's channel to SIGTERM, SIGINT and SIGQUIT and
// launches a goroutine that logs each received signal and passes it to
// handleSignal. A handling failure is logged and the loop carries on.
//
// The goroutine runs until the signals channel is closed; nothing in this
// method closes it.
func (sh *SignalHandler) Start() {
	signal.Notify(sh.signals, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)

	go func() {
		for {
			sig, open := <-sh.signals
			if !open {
				return
			}

			received := logrus.Fields{
				"container_id": sh.containerID,
				"signal":       sig,
			}
			sh.logger.WithFields(received).Info("Received signal")

			err := sh.handleSignal(sig)
			if err == nil {
				continue
			}

			failed := logrus.Fields{
				"container_id": sh.containerID,
				"signal":       sig,
				"error":        err,
			}
			sh.logger.WithFields(failed).Error("Failed to handle signal")
		}
	}()
}
// handleSignal routes sig to its dedicated handler: SIGTERM, SIGINT and
// SIGQUIT are supported, anything else yields an "unhandled signal" error.
func (sh *SignalHandler) handleSignal(sig os.Signal) error {
	if sig == syscall.SIGTERM {
		return sh.handleSIGTERM()
	}
	if sig == syscall.SIGINT {
		return sh.handleSIGINT()
	}
	if sig == syscall.SIGQUIT {
		return sh.handleSIGQUIT()
	}
	return fmt.Errorf("unhandled signal: %v", sig)
}
// handleSIGTERM reacts to SIGTERM by asking for a graceful stop.
func (sh *SignalHandler) handleSIGTERM() error {
	return sh.gracefulStop()
}

// handleSIGINT reacts to SIGINT by stopping the container immediately.
func (sh *SignalHandler) handleSIGINT() error {
	return sh.immediateStop()
}

// handleSIGQUIT reacts to SIGQUIT by requesting a core dump.
func (sh *SignalHandler) handleSIGQUIT() error {
	return sh.generateCoreDump()
}
// gracefulStop is a placeholder for a graceful shutdown; it currently does
// nothing and returns nil.
func (sh *SignalHandler) gracefulStop() error {
	// TODO: send SIGTERM to the container process.
	// TODO: wait for the process to exit.
	// TODO: clean up resources.
	return nil
}

// immediateStop is a placeholder for a forced shutdown; it currently does
// nothing and returns nil.
func (sh *SignalHandler) immediateStop() error {
	// TODO: send SIGKILL to the container process.
	// TODO: clean up resources right away.
	return nil
}

// generateCoreDump is a placeholder for producing a core dump; it currently
// does nothing and returns nil.
func (sh *SignalHandler) generateCoreDump() error {
	// TODO: write the core dump file.
	// TODO: record debugging information.
	return nil
}
5.2 进程监控
// ProcessMonitor polls a container's process and reports when it has exited.
type ProcessMonitor struct {
	containerID string
	pid         int
	logger      *logrus.Logger
	stopCh      chan struct{} // closed by Stop to end the monitor goroutine
	stopOnce    sync.Once     // makes Stop safe to call more than once
}

// NewProcessMonitor returns a monitor for the process pid belonging to the
// given container. Monitoring does not begin until Start is called.
func NewProcessMonitor(containerID string, pid int) *ProcessMonitor {
	return &ProcessMonitor{
		containerID: containerID,
		pid:         pid,
		logger:      logrus.New(),
		stopCh:      make(chan struct{}),
	}
}

// Start launches the monitoring loop in its own goroutine.
func (pm *ProcessMonitor) Start() {
	go pm.monitor()
}

// Stop signals the monitoring goroutine to exit by closing stopCh.
// It is idempotent and safe for concurrent use: only the first call closes
// the channel, so repeated calls no longer panic.
func (pm *ProcessMonitor) Stop() {
	pm.stopOnce.Do(func() {
		close(pm.stopCh)
	})
}
// monitor checks once per second whether the process is still running.
// When it is not, monitor logs "Process exited", calls notifyProcessExit and
// returns. It also returns as soon as stopCh is closed.
func (pm *ProcessMonitor) monitor() {
	tick := time.NewTicker(1 * time.Second)
	defer tick.Stop()

	for {
		select {
		case <-pm.stopCh:
			return

		case <-tick.C:
			if pm.isProcessRunning() {
				continue
			}

			exited := logrus.Fields{
				"container_id": pm.containerID,
				"pid":          pm.pid,
			}
			pm.logger.WithFields(exited).Info("Process exited")

			// Hand the exit over to whoever tracks container state.
			pm.notifyProcessExit()
			return
		}
	}
}
// isProcessRunning reports whether the monitored process exists, probing it
// with signal 0 (which performs the existence and permission checks without
// delivering a signal).
//
// A non-positive pid is reported as not running: kill(2) would interpret it
// as a process group or "every process" rather than a single process.
// EPERM counts as running, because it means the process exists but the
// caller may not signal it.
func (pm *ProcessMonitor) isProcessRunning() bool {
	if pm.pid <= 0 {
		return false
	}
	err := syscall.Kill(pm.pid, 0)
	return err == nil || err == syscall.EPERM
}
// notifyProcessExit is called by monitor once the process is found to have
// exited. It is a placeholder with an empty body.
func (pm *ProcessMonitor) notifyProcessExit() {
// TODO: notify the container manager that the process has exited.
// TODO: update the container state.
}
六、验证检查清单
基础功能
- [ ] 能够实现容器状态机
- [ ] 能够管理容器生命周期
- [ ] 能够收集容器日志
- [ ] 能够处理容器信号
高级功能
- [ ] 能够实现容器管理器
- [ ] 能够处理容器错误
- [ ] 能够监控容器进程
- [ ] 能够实现日志轮转
调试技能
- [ ] 能够调试状态转换
- [ ] 能够调试信号处理
- [ ] 能够调试日志收集
- [ ] 能够调试进程监控
相关链接
- 12-Go实现完整容器 - 完整功能实现
- 14-调试技术与工具 - 调试技术详解
- 15-OCI规范与标准化 - 标准化实现
下一步:让我们学习调试技术与工具,这是容器开发的重要技能!