源码地址:https://github.com/k8scat/docker-log-driver-tencent-cls
在现代云原生架构中,容器化应用已经成为主流部署方式。随着容器数量的快速增长,如何高效地收集、存储和分析容器日志成为了一个关键挑战。传统的日志收集方式往往存在以下问题:
- 日志分散在各个容器中,难以统一管理
- 缺乏结构化的日志格式,不利于后续分析
- 日志存储成本高,且难以进行实时查询
- 缺乏统一的日志检索和监控机制
为了解决这些问题,我们开发了一个专门的 Docker 日志驱动,将容器日志直接发送到腾讯云的 CLS(Cloud Log Service)日志服务。这个驱动实现了与 Docker 日志系统的深度集成,提供了高性能、可靠的日志传输能力。
技术架构设计
整体架构
该日志驱动采用了模块化的设计架构,主要包含以下几个核心组件:
- Driver 模块:负责管理日志流和容器生命周期
- Logger 模块:处理日志格式化和发送逻辑
- Client 模块:封装腾讯云 CLS SDK 的调用
- Server 模块:提供 Docker 插件接口服务
- 配置管理模块:处理各种配置参数的解析和验证
这种分层架构确保了代码的可维护性和可扩展性,每个模块都有明确的职责边界。
核心数据结构
项目定义了多个关键的数据结构来支持日志驱动的功能:
type Driver struct {
streams map[string]*logStream
containerStreams map[string]*logStream
mu sync.RWMutex
fs fileSystem
newTencentCLSLogger newTencentCLSLoggerFunc
processLogs func(stream *logStream)
logger *zap.Logger
}
type TencentCLSLogger struct {
client client
formatter *messageFormatter
cfg *loggerConfig
buffer chan string
mu sync.Mutex
partialLogsBuffer *partialLogBuffer
wg sync.WaitGroup
closed chan struct{
}
logger *zap.Logger
}
这些数据结构的设计充分考虑了并发安全性和资源管理,确保了在高并发场景下的稳定运行。
核心功能实现
日志流管理
日志驱动的核心功能是管理容器的日志流。每个容器启动时,驱动会创建一个独立的日志流来处理该容器的所有日志输出:
func (d *Driver) StartLogging(streamPath string, containerDetails *ContainerDetails) (stream *logStream, err error) {
d.logger.Info("starting logging", zap.String("stream_path", streamPath), zap.Any("container_details", containerDetails))
d.mu.RLock()
if _, ok := d.streams[streamPath]; ok {
d.mu.RUnlock()
return nil, errors.New("already logging")
}
d.mu.RUnlock()
name := "container:" + containerDetails.ContainerName
stream = &logStream{
streamPath: streamPath,
containerDetails: containerDetails,
logger: d.logger.Named(name),
fs: d.fs,
stop: make(chan struct{
}),
}
// 初始化日志流
if err := d.initializeStream(stream); err != nil {
return nil, err
}
// 启动日志处理协程
go d.processLogs(stream)
return stream, nil
}
这种设计确保了每个容器的日志都能被独立处理,避免了不同容器之间的日志混淆。
日志处理流程
日志处理采用了异步非阻塞的设计模式,确保不会影响容器的正常运行:
func (d *Driver) defaultProcessLogs(stream *logStream, processedNotifier chan<- struct{
}) {
defer func() {
if err := stream.Close(); err != nil {
d.logger.Error("failed to close stream", zap.Error(err))
}
}()
logs := NewLogs(stream)
for logs.Next(