1. 重写logs
2. logs增加单测 3. task更换新的log实例 4. 配置文件增加日志相关配置
This commit is contained in:
142
internal/config/config.go
Normal file
142
internal/config/config.go
Normal file
@@ -0,0 +1,142 @@
|
||||
// Package config 提供配置文件读取和解析功能
|
||||
// 支持YAML格式的配置文件解析
|
||||
// 包含服务器和数据库相关配置
|
||||
package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
// Config 代表应用的完整配置结构
|
||||
type Config struct {
|
||||
// App 应用基础配置
|
||||
App AppConfig `yaml:"app"`
|
||||
|
||||
// Server 服务器配置
|
||||
Server ServerConfig `yaml:"server"`
|
||||
|
||||
// Log 日志配置
|
||||
Log LogConfig `yaml:"log"`
|
||||
|
||||
// Database 数据库配置
|
||||
Database DatabaseConfig `yaml:"database"`
|
||||
|
||||
// WebSocket WebSocket配置
|
||||
WebSocket WebSocketConfig `yaml:"websocket"`
|
||||
|
||||
// Heartbeat 心跳配置
|
||||
Heartbeat HeartbeatConfig `yaml:"heartbeat"`
|
||||
}
|
||||
|
||||
// AppConfig 代表应用基础配置
|
||||
type AppConfig struct {
|
||||
Name string `yaml:"name"`
|
||||
Version string `yaml:"version"`
|
||||
}
|
||||
|
||||
// ServerConfig 代表服务器配置
|
||||
type ServerConfig struct {
|
||||
// Port 服务器监听端口
|
||||
Port int `yaml:"port"`
|
||||
// Mode 服务器运行模式
|
||||
Mode string `yaml:"mode"`
|
||||
}
|
||||
|
||||
// LogConfig 代表日志配置
|
||||
type LogConfig struct {
|
||||
Level string `yaml:"level"`
|
||||
Format string `yaml:"format"`
|
||||
EnableFile bool `yaml:"enable_file"`
|
||||
FilePath string `yaml:"file_path"`
|
||||
MaxSize int `yaml:"max_size"`
|
||||
MaxBackups int `yaml:"max_backups"`
|
||||
MaxAge int `yaml:"max_age"`
|
||||
Compress bool `yaml:"compress"`
|
||||
}
|
||||
|
||||
// DatabaseConfig 代表数据库配置
|
||||
type DatabaseConfig struct {
|
||||
// Host 数据库主机地址
|
||||
Host string `yaml:"host"`
|
||||
|
||||
// Port 数据库端口
|
||||
Port int `yaml:"port"`
|
||||
|
||||
// Username 数据库用户名
|
||||
Username string `yaml:"username"`
|
||||
|
||||
// Password 数据库密码
|
||||
Password string `yaml:"password"`
|
||||
|
||||
// DBName 数据库名称
|
||||
DBName string `yaml:"dbname"`
|
||||
|
||||
// SSLMode SSL模式
|
||||
SSLMode string `yaml:"sslmode"`
|
||||
|
||||
// MaxOpenConns 最大开放连接数
|
||||
MaxOpenConns int `yaml:"max_open_conns"`
|
||||
|
||||
// MaxIdleConns 最大空闲连接数
|
||||
MaxIdleConns int `yaml:"max_idle_conns"`
|
||||
|
||||
// ConnMaxLifetime 连接最大生命周期(秒)
|
||||
ConnMaxLifetime int `yaml:"conn_max_lifetime"`
|
||||
}
|
||||
|
||||
// WebSocketConfig 代表WebSocket配置
|
||||
type WebSocketConfig struct {
|
||||
// Timeout WebSocket请求超时时间(秒)
|
||||
Timeout int `yaml:"timeout"`
|
||||
|
||||
// HeartbeatInterval 心跳检测间隔(秒), 如果超过这个时间没有消息往来系统会自动发送一个心跳包维持长链接
|
||||
HeartbeatInterval int `yaml:"heartbeat_interval"`
|
||||
}
|
||||
|
||||
// HeartbeatConfig 代表心跳配置
|
||||
type HeartbeatConfig struct {
|
||||
// Interval 心跳间隔(秒)
|
||||
Interval int `yaml:"interval"`
|
||||
|
||||
// Concurrency 请求并发数
|
||||
Concurrency int `yaml:"concurrency"`
|
||||
}
|
||||
|
||||
// NewConfig 创建并返回一个新的配置实例
|
||||
func NewConfig() *Config {
|
||||
// 默认值可以在这里设置,但我们优先使用配置文件中的值
|
||||
return &Config{}
|
||||
}
|
||||
|
||||
// Load 从指定路径加载配置文件
|
||||
func (c *Config) Load(path string) error {
|
||||
// 读取配置文件
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("配置文件读取失败: %v", err)
|
||||
}
|
||||
|
||||
// 解析YAML配置
|
||||
if err := yaml.Unmarshal(data, c); err != nil {
|
||||
return fmt.Errorf("配置文件解析失败: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetDatabaseConnectionString 获取数据库连接字符串
|
||||
func (c *Config) GetDatabaseConnectionString() string {
|
||||
// 构建PostgreSQL连接字符串
|
||||
return fmt.Sprintf(
|
||||
"user=%s password=%s dbname=%s host=%s port=%d sslmode=%s",
|
||||
c.Database.Username,
|
||||
c.Database.Password,
|
||||
c.Database.DBName,
|
||||
c.Database.Host,
|
||||
c.Database.Port,
|
||||
c.Database.SSLMode,
|
||||
)
|
||||
}
|
||||
@@ -1,42 +1,165 @@
|
||||
// Package logs 提供统一的日志记录功能
|
||||
// 支持不同级别的日志记录和格式化输出
|
||||
// Package logs 提供了高度可配置的日志功能,基于 uber-go/zap 实现。
|
||||
// 它支持将日志同时输出到控制台和文件,并提供日志滚动归档功能。
|
||||
// 该包还特别为 Gin 和 GORM 框架提供了开箱即用的日志接管能力。
|
||||
package logs
|
||||
|
||||
import (
|
||||
"log"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/config"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"gopkg.in/natefinch/lumberjack.v2"
|
||||
gormlogger "gorm.io/gorm/logger"
|
||||
)
|
||||
|
||||
// Logger 代表日志记录器结构
|
||||
// Logger 是一个封装了 zap.SugaredLogger 的日志记录器。
|
||||
// 它提供了结构化日志记录的各种方法,并实现了 io.Writer 接口以兼容 Gin。
|
||||
type Logger struct {
|
||||
// logger 内部日志记录器
|
||||
logger *log.Logger
|
||||
*zap.SugaredLogger
|
||||
}
|
||||
|
||||
// NewLogger 创建并返回一个新的日志记录器实例
|
||||
func NewLogger() *Logger {
|
||||
return &Logger{
|
||||
logger: log.New(os.Stdout, "", 0),
|
||||
// NewLogger 根据提供的配置创建一个新的 Logger 实例。
|
||||
// 这是实现依赖注入的关键,在应用启动时调用一次。
|
||||
func NewLogger(cfg config.LogConfig) *Logger {
|
||||
// 1. 设置日志编码器
|
||||
encoder := getEncoder(cfg.Format)
|
||||
|
||||
// 2. 设置日志写入器 (支持文件和控制台)
|
||||
writeSyncer := getWriteSyncer(cfg)
|
||||
|
||||
// 3. 设置日志级别
|
||||
level := zap.NewAtomicLevel()
|
||||
if err := level.UnmarshalText([]byte(cfg.Level)); err != nil {
|
||||
level.SetLevel(zap.InfoLevel) // 解析失败则默认为 Info 级别
|
||||
}
|
||||
|
||||
// 4. 创建 Zap 核心
|
||||
core := zapcore.NewCore(encoder, writeSyncer, level)
|
||||
|
||||
// 5. 构建 Logger
|
||||
// zap.AddCaller() 会记录调用日志的代码行
|
||||
// zap.AddCallerSkip(1) 可以向上跳一层调用栈,如果我们将 logger.Info 等方法再封装一层,这个选项会很有用
|
||||
zapLogger := zap.New(core, zap.AddCaller(), zap.AddCallerSkip(1))
|
||||
|
||||
return &Logger{zapLogger.Sugar()}
|
||||
}
|
||||
|
||||
// getEncoder 根据指定的格式返回一个 zapcore.Encoder。
|
||||
func getEncoder(format string) zapcore.Encoder {
|
||||
encoderConfig := zap.NewProductionEncoderConfig()
|
||||
encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder // 时间格式: 2006-01-02T15:04:05.000Z0700
|
||||
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder // 日志级别大写: INFO
|
||||
|
||||
if format == "json" {
|
||||
return zapcore.NewJSONEncoder(encoderConfig)
|
||||
}
|
||||
// 默认或 "console"
|
||||
return zapcore.NewConsoleEncoder(encoderConfig)
|
||||
}
|
||||
|
||||
// getWriteSyncer 根据配置创建日志写入目标。
|
||||
func getWriteSyncer(cfg config.LogConfig) zapcore.WriteSyncer {
|
||||
writers := []zapcore.WriteSyncer{os.Stdout}
|
||||
|
||||
if cfg.EnableFile {
|
||||
// 使用 lumberjack 实现日志滚动
|
||||
fileWriter := &lumberjack.Logger{
|
||||
Filename: cfg.FilePath,
|
||||
MaxSize: cfg.MaxSize,
|
||||
MaxBackups: cfg.MaxBackups,
|
||||
MaxAge: cfg.MaxAge,
|
||||
Compress: cfg.Compress,
|
||||
}
|
||||
writers = append(writers, zapcore.AddSync(fileWriter))
|
||||
}
|
||||
|
||||
return zapcore.NewMultiWriteSyncer(writers...)
|
||||
}
|
||||
|
||||
// Write 实现了 io.Writer 接口,用于接管 Gin 的默认输出。
|
||||
// Gin 的日志(如 [GIN-debug] Listening and serving HTTP on :8080)会通过这个方法写入。
|
||||
func (l *Logger) Write(p []byte) (n int, err error) {
|
||||
msg := strings.TrimSpace(string(p))
|
||||
if msg != "" {
|
||||
l.Info(msg) // 使用我们自己的 logger 来打印 Gin 的日志
|
||||
}
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
// --- GORM 日志适配器 ---
|
||||
|
||||
// GormLogger 是一个实现了 gormlogger.Interface 的适配器,
|
||||
// 它将 GORM 的日志重定向到我们的 zap Logger 中。
|
||||
type GormLogger struct {
|
||||
ZapLogger *Logger
|
||||
SlowThreshold time.Duration
|
||||
SkipErrRecordNotFound bool // 是否跳过 "record not found" 错误
|
||||
}
|
||||
|
||||
// NewGormLogger 创建一个新的 GORM 日志记录器实例。
|
||||
func NewGormLogger(zapLogger *Logger) *GormLogger {
|
||||
return &GormLogger{
|
||||
ZapLogger: zapLogger,
|
||||
SlowThreshold: 200 * time.Millisecond, // 慢查询阈值,超过200ms则警告
|
||||
SkipErrRecordNotFound: true, // 通常我们不关心 "record not found" 错误
|
||||
}
|
||||
}
|
||||
|
||||
// Info 记录信息级别日志
|
||||
func (l *Logger) Info(message string) {
|
||||
l.logger.Printf("[信息] %s %s", time.Now().Format(time.RFC3339), message)
|
||||
// LogMode 设置日志模式,这里我们总是使用 zap 的级别控制,所以这个方法可以为空。
|
||||
func (g *GormLogger) LogMode(level gormlogger.LogLevel) gormlogger.Interface {
|
||||
// GORM 的 LogLevel 在这里不起作用,因为我们完全由 Zap 控制
|
||||
return g
|
||||
}
|
||||
|
||||
// Error 记录错误级别日志
|
||||
func (l *Logger) Error(message string) {
|
||||
l.logger.Printf("[错误] %s %s", time.Now().Format(time.RFC3339), message)
|
||||
// Info 打印 Info 级别的日志。
|
||||
func (g *GormLogger) Info(ctx context.Context, msg string, data ...interface{}) {
|
||||
g.ZapLogger.Infof(msg, data...)
|
||||
}
|
||||
|
||||
// Debug 记录调试级别日志
|
||||
func (l *Logger) Debug(message string) {
|
||||
l.logger.Printf("[调试] %s %s", time.Now().Format(time.RFC3339), message)
|
||||
// Warn 打印 Warn 级别的日志。
|
||||
func (g *GormLogger) Warn(ctx context.Context, msg string, data ...interface{}) {
|
||||
g.ZapLogger.Warnf(msg, data...)
|
||||
}
|
||||
|
||||
// Warn 记录警告级别日志
|
||||
func (l *Logger) Warn(message string) {
|
||||
l.logger.Printf("[警告] %s %s", time.Now().Format(time.RFC3339), message)
|
||||
// Error 打印 Error 级别的日志。
|
||||
func (g *GormLogger) Error(ctx context.Context, msg string, data ...interface{}) {
|
||||
g.ZapLogger.Errorf(msg, data...)
|
||||
}
|
||||
|
||||
// Trace 打印 SQL 查询日志,这是 GORM 日志的核心。
|
||||
func (g *GormLogger) Trace(ctx context.Context, begin time.Time, fc func() (sql string, rowsAffected int64), err error) {
|
||||
elapsed := time.Since(begin)
|
||||
sql, rows := fc()
|
||||
|
||||
fields := []interface{}{
|
||||
"sql", sql,
|
||||
"rows", rows,
|
||||
"elapsed", fmt.Sprintf("%.3fms", float64(elapsed.Nanoseconds())/1e6),
|
||||
}
|
||||
|
||||
// --- 逻辑修复开始 ---
|
||||
if err != nil {
|
||||
// 如果是 "record not found" 错误且我们配置了跳过,则直接返回
|
||||
if g.SkipErrRecordNotFound && strings.Contains(err.Error(), "record not found") {
|
||||
return
|
||||
}
|
||||
// 否则,记录为错误日志
|
||||
g.ZapLogger.With(fields...).Errorf("[GORM] error: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 如果查询时间超过慢查询阈值,则记录警告
|
||||
if g.SlowThreshold != 0 && elapsed > g.SlowThreshold {
|
||||
g.ZapLogger.With(fields...).Warnf("[GORM] slow query")
|
||||
return
|
||||
}
|
||||
|
||||
// 正常情况,记录 Debug 级别的 SQL 查询
|
||||
g.ZapLogger.With(fields...).Debugf("[GORM] trace")
|
||||
// --- 逻辑修复结束 ---
|
||||
}
|
||||
|
||||
128
internal/logs/logs_test.go
Normal file
128
internal/logs/logs_test.go
Normal file
@@ -0,0 +1,128 @@
|
||||
package logs_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/config"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
// captureOutput 是一个辅助函数,用于捕获 logger 的输出到内存缓冲区
|
||||
func captureOutput(cfg config.LogConfig) (*logs.Logger, *bytes.Buffer) {
|
||||
var buf bytes.Buffer
|
||||
// 使用一个简单的 Console Encoder 进行测试,方便断言字符串
|
||||
encoderConfig := zap.NewDevelopmentEncoderConfig()
|
||||
encoderConfig.EncodeTime = nil // 忽略时间,避免测试结果不一致
|
||||
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
|
||||
encoderConfig.EncodeCaller = nil // 忽略调用者信息
|
||||
encoder := zapcore.NewConsoleEncoder(encoderConfig)
|
||||
|
||||
writer := zapcore.AddSync(&buf)
|
||||
|
||||
level := zap.NewAtomicLevel()
|
||||
_ = level.UnmarshalText([]byte(cfg.Level))
|
||||
|
||||
core := zapcore.NewCore(encoder, writer, level)
|
||||
// 在测试中我们直接操作 zap Logger,而不是通过封装的 NewLogger,以注入内存 writer
|
||||
zapLogger := zap.New(core)
|
||||
|
||||
logger := &logs.Logger{SugaredLogger: zapLogger.Sugar()}
|
||||
return logger, &buf
|
||||
}
|
||||
|
||||
func TestNewLogger(t *testing.T) {
|
||||
t.Run("Constructor does not panic", func(t *testing.T) {
|
||||
// 测试 Console 格式
|
||||
cfgConsole := config.LogConfig{Level: "info", Format: "console"}
|
||||
assert.NotPanics(t, func() { logs.NewLogger(cfgConsole) })
|
||||
|
||||
// 测试 JSON 格式
|
||||
cfgJSON := config.LogConfig{Level: "info", Format: "json"}
|
||||
assert.NotPanics(t, func() { logs.NewLogger(cfgJSON) })
|
||||
|
||||
// 测试文件日志启用
|
||||
// 不实际写入文件,只确保构造函数能正常运行
|
||||
cfgFile := config.LogConfig{
|
||||
Level: "info",
|
||||
EnableFile: true,
|
||||
FilePath: "test.log",
|
||||
}
|
||||
assert.NotPanics(t, func() { logs.NewLogger(cfgFile) })
|
||||
})
|
||||
}
|
||||
|
||||
func TestLogger_Write_ForGin(t *testing.T) {
|
||||
logger, buf := captureOutput(config.LogConfig{Level: "info"})
|
||||
|
||||
ginLog := "[GIN-debug] Listening and serving HTTP on :8080\n"
|
||||
_, err := logger.Write([]byte(ginLog))
|
||||
|
||||
assert.NoError(t, err)
|
||||
output := buf.String()
|
||||
// logger.Write 会将 gin 的日志转为 info 级别
|
||||
assert.Contains(t, output, "INFO")
|
||||
assert.Contains(t, output, strings.TrimSpace(ginLog))
|
||||
}
|
||||
|
||||
func TestGormLogger(t *testing.T) {
|
||||
logger, buf := captureOutput(config.LogConfig{Level: "debug"}) // 设置为 debug 以捕获所有级别
|
||||
gormLogger := logs.NewGormLogger(logger)
|
||||
|
||||
// 模拟 GORM 的 Trace 调用参数
|
||||
ctx := context.Background()
|
||||
sql := "SELECT * FROM users WHERE id = 1"
|
||||
rows := int64(1)
|
||||
fc := func() (string, int64) {
|
||||
return sql, rows
|
||||
}
|
||||
|
||||
t.Run("Slow Query", func(t *testing.T) {
|
||||
buf.Reset()
|
||||
// 模拟一个耗时超过 200ms 的查询
|
||||
begin := time.Now().Add(-300 * time.Millisecond)
|
||||
gormLogger.Trace(ctx, begin, fc, nil)
|
||||
|
||||
output := buf.String()
|
||||
assert.Contains(t, output, "WARN", "应包含 WARN 级别")
|
||||
assert.Contains(t, output, "[GORM] slow query", "应包含慢查询信息")
|
||||
// 修复:不再检查严格的 JSON 格式,只检查关键内容
|
||||
assert.Contains(t, output, "SELECT * FROM users WHERE id = 1", "应包含 SQL 语句")
|
||||
})
|
||||
|
||||
t.Run("Error Query", func(t *testing.T) {
|
||||
buf.Reset()
|
||||
queryError := errors.New("syntax error")
|
||||
gormLogger.Trace(ctx, time.Now(), fc, queryError)
|
||||
|
||||
output := buf.String()
|
||||
assert.Contains(t, output, "ERROR")
|
||||
assert.Contains(t, output, "[GORM] error: syntax error")
|
||||
})
|
||||
|
||||
t.Run("Record Not Found Error is Skipped", func(t *testing.T) {
|
||||
buf.Reset()
|
||||
queryError := errors.New("record not found") // 模拟 GORM 的 RecordNotFound 错误
|
||||
gormLogger.Trace(ctx, time.Now(), fc, queryError)
|
||||
|
||||
// 在修复 logs.go 中的 bug 后,这里应该为空
|
||||
assert.Empty(t, buf.String(), "开启 SkipErrRecordNotFound 后,record not found 错误不应产生任何日志")
|
||||
})
|
||||
|
||||
t.Run("Normal Query", func(t *testing.T) {
|
||||
buf.Reset()
|
||||
// 模拟一个快速查询
|
||||
gormLogger.Trace(ctx, time.Now(), fc, nil)
|
||||
|
||||
output := buf.String()
|
||||
assert.Contains(t, output, "DEBUG") // 正常查询是 Debug 级别
|
||||
assert.Contains(t, output, "[GORM] trace")
|
||||
})
|
||||
}
|
||||
@@ -5,7 +5,6 @@ package task
|
||||
import (
|
||||
"container/heap"
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -47,14 +46,14 @@ type TaskQueue struct {
|
||||
logger *logs.Logger
|
||||
}
|
||||
|
||||
// NewTaskQueue 创建并返回一个新的任务队列实例
|
||||
func NewTaskQueue() *TaskQueue {
|
||||
// NewTaskQueue 创建并返回一个新的任务队列实例。
|
||||
func NewTaskQueue(logger *logs.Logger) *TaskQueue {
|
||||
pq := make(priorityQueue, 0)
|
||||
heap.Init(&pq)
|
||||
|
||||
return &TaskQueue{
|
||||
queue: &pq,
|
||||
logger: logs.NewLogger(),
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,7 +67,7 @@ func (tq *TaskQueue) AddTask(task Task) {
|
||||
priority: task.GetPriority(),
|
||||
}
|
||||
heap.Push(tq.queue, item)
|
||||
tq.logger.Info("任务已添加到队列: " + task.GetID())
|
||||
tq.logger.Infow("任务已添加到队列", "taskID", task.GetID())
|
||||
}
|
||||
|
||||
// GetNextTask 获取下一个要执行的任务(优先级最高的任务)
|
||||
@@ -80,9 +79,8 @@ func (tq *TaskQueue) GetNextTask() Task {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 获取优先级最高的任务
|
||||
item := heap.Pop(tq.queue).(*taskItem)
|
||||
tq.logger.Info("从队列中获取任务: " + item.task.GetID())
|
||||
tq.logger.Infow("从队列中获取任务", "taskID", item.task.GetID())
|
||||
return item.task
|
||||
}
|
||||
|
||||
@@ -99,7 +97,6 @@ type priorityQueue []*taskItem
|
||||
|
||||
func (pq priorityQueue) Len() int { return len(pq) }
|
||||
|
||||
// Less 优先级小的优先级更高
|
||||
func (pq priorityQueue) Less(i, j int) bool {
|
||||
return pq[i].priority < pq[j].priority
|
||||
}
|
||||
@@ -148,22 +145,22 @@ type Executor struct {
|
||||
logger *logs.Logger
|
||||
}
|
||||
|
||||
// NewExecutor 创建并返回一个新的任务执行器实例
|
||||
func NewExecutor(workers int) *Executor {
|
||||
// NewExecutor 创建并返回一个新的任务执行器实例。
|
||||
func NewExecutor(workers int, logger *logs.Logger) *Executor {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
return &Executor{
|
||||
taskQueue: NewTaskQueue(),
|
||||
taskQueue: NewTaskQueue(logger), // 将 logger 传递给 TaskQueue
|
||||
workers: workers,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
logger: logs.NewLogger(),
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Start 启动任务执行器
|
||||
func (e *Executor) Start() {
|
||||
e.logger.Info(fmt.Sprintf("正在启动任务执行器,工作协程数: %d", e.workers))
|
||||
e.logger.Infow("正在启动任务执行器", "workers", e.workers)
|
||||
|
||||
// 启动工作协程
|
||||
for i := 0; i < e.workers; i++ {
|
||||
@@ -190,31 +187,31 @@ func (e *Executor) Stop() {
|
||||
// SubmitTask 提交任务到执行器
|
||||
func (e *Executor) SubmitTask(task Task) {
|
||||
e.taskQueue.AddTask(task)
|
||||
e.logger.Info("任务已提交: " + task.GetID())
|
||||
e.logger.Infow("任务已提交", "taskID", task.GetID())
|
||||
}
|
||||
|
||||
// worker 工作协程
|
||||
func (e *Executor) worker(id int) {
|
||||
defer e.wg.Done()
|
||||
|
||||
e.logger.Info(fmt.Sprintf("工作协程(id = %d)已启动", id))
|
||||
e.logger.Infow("工作协程已启动", "workerID", id)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-e.ctx.Done():
|
||||
e.logger.Info(fmt.Sprintf("工作协程 %d 已停止", id))
|
||||
e.logger.Infow("工作协程已停止", "workerID", id)
|
||||
return
|
||||
default:
|
||||
// 获取下一个任务
|
||||
task := e.taskQueue.GetNextTask()
|
||||
if task != nil {
|
||||
e.logger.Info(fmt.Sprintf("工作协程 %d 正在执行任务: %s", id, task.GetID()))
|
||||
e.logger.Infow("工作协程正在执行任务", "workerID", id, "taskID", task.GetID())
|
||||
|
||||
// 执行任务
|
||||
if err := task.Execute(); err != nil {
|
||||
e.logger.Error("任务执行失败: " + task.GetID() + ", 错误: " + err.Error())
|
||||
e.logger.Errorw("任务执行失败", "workerID", id, "taskID", task.GetID(), "error", err)
|
||||
} else {
|
||||
e.logger.Info("任务执行成功: " + task.GetID())
|
||||
e.logger.Infow("任务执行成功", "workerID", id, "taskID", task.GetID())
|
||||
}
|
||||
} else {
|
||||
// 没有任务时短暂休眠
|
||||
|
||||
Reference in New Issue
Block a user