修改程序入口

This commit is contained in:
2025-11-05 16:00:43 +08:00
parent bee5104661
commit aaa2f1b22f
5 changed files with 129 additions and 106 deletions

View File

@@ -6,19 +6,19 @@
- **`internal/infra/logs/logs.go` (`logs.Logger` - 内部实现,非注入)** - **`internal/infra/logs/logs.go` (`logs.Logger` - 内部实现,非注入)**
- **核心包级函数实现 (根据 `implementation.md` 描述)**: - **核心包级函数实现 (根据 `implementation.md` 描述)**:
- [ ] 实现 `AddCompName(ctx context.Context, compName string) context.Context` - [x] 实现 `AddCompName(ctx context.Context, compName string) context.Context`
- [ ] 实现 - [x] 实现
`AddFuncName(upstreamCtx context.Context, selfCtx context.Context, funcName string) context.Context` `AddFuncName(upstreamCtx context.Context, selfCtx context.Context, funcName string) context.Context`
- [ ] 实现 `GetLogger(ctx context.Context) *Logger` - [x] 实现 `GetLogger(ctx context.Context) *Logger`
- [ ] 实现 - [x] 实现
`Trace(upstreamCtx context.Context, selfCtx context.Context, funcName string) (context.Context, *Logger)` `Trace(upstreamCtx context.Context, selfCtx context.Context, funcName string) (context.Context, *Logger)`
- **`Logger` 结构体改造**: - **`Logger` 结构体改造**:
- [ ] 无 - [x] 无
- **`GormLogger` 改造**: - **`GormLogger` 改造**:
- [ ] 修改 `GormLogger.Info` 方法,从传入的 `ctx` 中获取 `logger` 实例,并使用该实例进行日志记录。 - [x] 修改 `GormLogger.Info` 方法,从传入的 `ctx` 中获取 `logger` 实例,并使用该实例进行日志记录。
- [ ] 修改 `GormLogger.Warn` 方法,从传入的 `ctx` 中获取 `logger` 实例,并使用该实例进行日志记录。 - [x] 修改 `GormLogger.Warn` 方法,从传入的 `ctx` 中获取 `logger` 实例,并使用该实例进行日志记录。
- [ ] 修改 `GormLogger.Error` 方法,从传入的 `ctx` 中获取 `logger` 实例,并使用该实例进行日志记录。 - [x] 修改 `GormLogger.Error` 方法,从传入的 `ctx` 中获取 `logger` 实例,并使用该实例进行日志记录。
- [ ] 修改 `GormLogger.Trace` 方法,从传入的 `ctx` 中获取 `logger` 实例,并使用该实例进行日志记录。特别是 - [x] 修改 `GormLogger.Trace` 方法,从传入的 `ctx` 中获取 `logger` 实例,并使用该实例进行日志记录。特别是
`With(fields...)` 的调用需要调整。 `With(fields...)` 的调用需要调整。
--- ---
@@ -26,16 +26,16 @@
#### 2. 依赖注入与结构体改造 #### 2. 依赖注入与结构体改造
- **`internal/core/application.go`**: - **`internal/core/application.go`**:
- [ ] 移除 `Application` 结构体中的 `Logger *logs.Logger` 成员。 - [x] 移除 `Application` 结构体中的 `Logger *logs.Logger` 成员。
- [ ] 修改 `NewApplication` 函数,使其不再创建 `logger`,而是创建根 `context.Background()` - [x] 修改 `NewApplication` 函数,使其不再创建 `logger`,而是创建根 `context.Background()`
- [ ] 调整 `NewApplication`,将根 `context` 传递给 `initInfrastructure`, `initDomainServices`, `initAppServices` - [x] 调整 `NewApplication`,将根 `context` 传递给 `initInfrastructure`, `initDomainServices`, `initAppServices`
`api.NewAPI` `api.NewAPI`
- [ ] 移除 `Application` 结构体中所有对 `app.Logger` 的直接调用,改为通过 `context` 获取 `Logger` - [x] 移除 `Application` 结构体中所有对 `app.Logger` 的直接调用,改为通过 `context` 获取 `Logger`
- **`internal/core/component_initializers.go`**: - **`internal/core/component_initializers.go`**:
- [ ] **修改所有组件结构体定义**: 遍历所有相关组件Controllers, Services, Repositories 等),将其结构体中的 - [x] **修改所有组件结构体定义**: 遍历所有相关组件Controllers, Services, Repositories 等),将其结构体中的
`logger *logs.Logger` 成员变量替换为 `selfCtx context.Context` `logger *logs.Logger` 成员变量替换为 `selfCtx context.Context`
- [ ] **重构所有 `init...` 函数**: - [x] **重构所有 `init...` 函数**:
- 移除所有 `logger *logs.Logger` 参数,改为接收 `ctx context.Context` - 移除所有 `logger *logs.Logger` 参数,改为接收 `ctx context.Context`
- 在每个 `init...` 函数内部,为即将创建的组件生成其专属的 `selfCtx`。例如: - 在每个 `init...` 函数内部,为即将创建的组件生成其专属的 `selfCtx`。例如:
`selfCtx := logs.AddCompName(ctx, 'ComponentName')` `selfCtx := logs.AddCompName(ctx, 'ComponentName')`

View File

@@ -1,6 +1,7 @@
package core package core
import ( import (
"context"
"fmt" "fmt"
"os" "os"
"os/signal" "os/signal"
@@ -14,7 +15,7 @@ import (
// Application 是整个应用的核心,封装了所有组件和生命周期。 // Application 是整个应用的核心,封装了所有组件和生命周期。
type Application struct { type Application struct {
Config *config.Config Config *config.Config
Logger *logs.Logger Ctx context.Context
API *api.API API *api.API
Infra *Infrastructure Infra *Infrastructure
@@ -30,20 +31,24 @@ func NewApplication(configPath string) (*Application, error) {
if err := cfg.Load(configPath); err != nil { if err := cfg.Load(configPath); err != nil {
return nil, fmt.Errorf("无法加载配置: %w", err) return nil, fmt.Errorf("无法加载配置: %w", err)
} }
logger := logs.NewLogger(cfg.Log) // 初始化全局日志记录器
logs.InitDefaultLogger(cfg.Log)
// 为 Application 本身创建 Ctx
selfCtx := logs.AddCompName(context.Background(), "Application")
ctx := logs.AddFuncName(selfCtx, selfCtx, "NewApplication")
// 2. 初始化所有分层服务 // 2. 初始化所有分层服务
infra, err := initInfrastructure(cfg, logger) infra, err := initInfrastructure(ctx, cfg)
if err != nil { if err != nil {
return nil, fmt.Errorf("初始化基础设施失败: %w", err) return nil, fmt.Errorf("初始化基础设施失败: %w", err)
} }
domain := initDomainServices(cfg, infra, logger) domain := initDomainServices(ctx, cfg, infra)
appServices := initAppServices(infra, domain, logger) appServices := initAppServices(ctx, infra, domain)
// 3. 初始化 API 入口点 // 3. 初始化 API 入口点
apiServer := api.NewAPI( apiServer := api.NewAPI(
cfg.Server, cfg.Server,
logger,
infra.repos.userRepo, infra.repos.userRepo,
appServices.pigFarmService, appServices.pigFarmService,
appServices.pigBatchService, appServices.pigBatchService,
@@ -59,7 +64,7 @@ func NewApplication(configPath string) (*Application, error) {
// 4. 组装 Application 对象 // 4. 组装 Application 对象
app := &Application{ app := &Application{
Config: cfg, Config: cfg,
Logger: logger, Ctx: selfCtx,
API: apiServer, API: apiServer,
Infra: infra, Infra: infra,
Domain: domain, Domain: domain,
@@ -71,7 +76,8 @@ func NewApplication(configPath string) (*Application, error) {
// Start 启动应用的所有组件并阻塞,直到接收到关闭信号。 // Start 启动应用的所有组件并阻塞,直到接收到关闭信号。
func (app *Application) Start() error { func (app *Application) Start() error {
app.Logger.Info("应用启动中...") startCtx, logger := logs.Trace(app.Ctx, app.Ctx, "Start")
logger.Info("应用启动中...")
// 1. 启动底层监听器 // 1. 启动底层监听器
if err := app.Infra.lora.loraListener.Listen(); err != nil { if err := app.Infra.lora.loraListener.Listen(); err != nil {
@@ -79,7 +85,7 @@ func (app *Application) Start() error {
} }
// 2. 初始化应用状态 (清理、刷新任务等) // 2. 初始化应用状态 (清理、刷新任务等)
if err := app.initializeState(); err != nil { if err := app.initializeState(startCtx); err != nil {
return fmt.Errorf("初始化应用状态失败: %w", err) return fmt.Errorf("初始化应用状态失败: %w", err)
} }
@@ -100,7 +106,8 @@ func (app *Application) Start() error {
// Stop 优雅地关闭应用的所有组件。 // Stop 优雅地关闭应用的所有组件。
func (app *Application) Stop() error { func (app *Application) Stop() error {
app.Logger.Info("应用关闭中...") logger := logs.TraceLogger(app.Ctx, app.Ctx, "Stop")
logger.Info("应用关闭中...")
// 关闭 API 服务器 // 关闭 API 服务器
app.API.Stop() app.API.Stop()
@@ -110,17 +117,17 @@ func (app *Application) Stop() error {
// 断开数据库连接 // 断开数据库连接
if err := app.Infra.storage.Disconnect(); err != nil { if err := app.Infra.storage.Disconnect(); err != nil {
app.Logger.Errorw("数据库连接断开失败", "error", err) logger.Errorw("数据库连接断开失败", "error", err)
} }
// 关闭 LoRa Mesh 监听器 // 关闭 LoRa Mesh 监听器
if err := app.Infra.lora.loraListener.Stop(); err != nil { if err := app.Infra.lora.loraListener.Stop(); err != nil {
app.Logger.Errorw("LoRa Mesh 监听器关闭失败", "error", err) logger.Errorw("LoRa Mesh 监听器关闭失败", "error", err)
} }
// 刷新日志缓冲区 // 刷新日志缓冲区
_ = app.Logger.Sync() _ = logger.Sync()
app.Logger.Info("应用已成功关闭") logger.Info("应用已成功关闭")
return nil return nil
} }

View File

@@ -1,6 +1,7 @@
package core package core
import ( import (
"context"
"fmt" "fmt"
"time" "time"
@@ -34,20 +35,20 @@ type Infrastructure struct {
} }
// initInfrastructure 初始化所有基础设施层组件。 // initInfrastructure 初始化所有基础设施层组件。
func initInfrastructure(cfg *config.Config, logger *logs.Logger) (*Infrastructure, error) { func initInfrastructure(ctx context.Context, cfg *config.Config) (*Infrastructure, error) {
storage, err := initStorage(cfg.Database, logger) storage, err := initStorage(ctx, cfg.Database)
if err != nil { if err != nil {
return nil, err return nil, err
} }
repos := initRepositories(storage.GetDB(), logger) repos := initRepositories(ctx, storage.GetDB())
lora, err := initLora(cfg, logger, repos) lora, err := initLora(ctx, cfg, repos)
if err != nil { if err != nil {
return nil, err return nil, err
} }
notifyService, err := initNotifyService(cfg.Notify, logger, repos.userRepo, repos.notificationRepo) notifyService, err := initNotifyService(ctx, cfg.Notify, repos.userRepo, repos.notificationRepo)
if err != nil { if err != nil {
return nil, fmt.Errorf("初始化通知服务失败: %w", err) return nil, fmt.Errorf("初始化通知服务失败: %w", err)
} }
@@ -90,30 +91,31 @@ type Repositories struct {
} }
// initRepositories 初始化所有的仓库。 // initRepositories 初始化所有的仓库。
func initRepositories(db *gorm.DB, logger *logs.Logger) *Repositories { func initRepositories(ctx context.Context, db *gorm.DB) *Repositories {
baseCtx := context.Background()
return &Repositories{ return &Repositories{
userRepo: repository.NewGormUserRepository(db), userRepo: repository.NewGormUserRepository(db, logs.AddCompName(baseCtx, "UserRepo")),
deviceRepo: repository.NewGormDeviceRepository(db), deviceRepo: repository.NewGormDeviceRepository(db, logs.AddCompName(baseCtx, "DeviceRepo")),
areaControllerRepo: repository.NewGormAreaControllerRepository(db), areaControllerRepo: repository.NewGormAreaControllerRepository(db, logs.AddCompName(baseCtx, "AreaControllerRepo")),
deviceTemplateRepo: repository.NewGormDeviceTemplateRepository(db), deviceTemplateRepo: repository.NewGormDeviceTemplateRepository(db, logs.AddCompName(baseCtx, "DeviceTemplateRepo")),
planRepo: repository.NewGormPlanRepository(db), planRepo: repository.NewGormPlanRepository(db, logs.AddCompName(baseCtx, "PlanRepo")),
pendingTaskRepo: repository.NewGormPendingTaskRepository(db), pendingTaskRepo: repository.NewGormPendingTaskRepository(db, logs.AddCompName(baseCtx, "PendingTaskRepo")),
executionLogRepo: repository.NewGormExecutionLogRepository(db), executionLogRepo: repository.NewGormExecutionLogRepository(db, logs.AddCompName(baseCtx, "ExecutionLogRepo")),
sensorDataRepo: repository.NewGormSensorDataRepository(db), sensorDataRepo: repository.NewGormSensorDataRepository(db, logs.AddCompName(baseCtx, "SensorDataRepo")),
deviceCommandLogRepo: repository.NewGormDeviceCommandLogRepository(db), deviceCommandLogRepo: repository.NewGormDeviceCommandLogRepository(db, logs.AddCompName(baseCtx, "DeviceCommandLogRepo")),
pendingCollectionRepo: repository.NewGormPendingCollectionRepository(db), pendingCollectionRepo: repository.NewGormPendingCollectionRepository(db, logs.AddCompName(baseCtx, "PendingCollectionRepo")),
userActionLogRepo: repository.NewGormUserActionLogRepository(db), userActionLogRepo: repository.NewGormUserActionLogRepository(db, logs.AddCompName(baseCtx, "UserActionLogRepo")),
pigBatchRepo: repository.NewGormPigBatchRepository(db), pigBatchRepo: repository.NewGormPigBatchRepository(db, logs.AddCompName(baseCtx, "PigBatchRepo")),
pigBatchLogRepo: repository.NewGormPigBatchLogRepository(db), pigBatchLogRepo: repository.NewGormPigBatchLogRepository(db, logs.AddCompName(baseCtx, "PigBatchLogRepo")),
pigFarmRepo: repository.NewGormPigFarmRepository(db), pigFarmRepo: repository.NewGormPigFarmRepository(db, logs.AddCompName(baseCtx, "PigFarmRepo")),
pigPenRepo: repository.NewGormPigPenRepository(db), pigPenRepo: repository.NewGormPigPenRepository(db, logs.AddCompName(baseCtx, "PigPenRepo")),
pigTransferLogRepo: repository.NewGormPigTransferLogRepository(db), pigTransferLogRepo: repository.NewGormPigTransferLogRepository(db, logs.AddCompName(baseCtx, "PigTransferLogRepo")),
pigTradeRepo: repository.NewGormPigTradeRepository(db), pigTradeRepo: repository.NewGormPigTradeRepository(db, logs.AddCompName(baseCtx, "PigTradeRepo")),
pigSickPigLogRepo: repository.NewGormPigSickLogRepository(db), pigSickPigLogRepo: repository.NewGormPigSickLogRepository(db, logs.AddCompName(baseCtx, "PigSickPigLogRepo")),
medicationLogRepo: repository.NewGormMedicationLogRepository(db), medicationLogRepo: repository.NewGormMedicationLogRepository(db, logs.AddCompName(baseCtx, "MedicationLogRepo")),
rawMaterialRepo: repository.NewGormRawMaterialRepository(db), rawMaterialRepo: repository.NewGormRawMaterialRepository(db, logs.AddCompName(baseCtx, "RawMaterialRepo")),
notificationRepo: repository.NewGormNotificationRepository(db), notificationRepo: repository.NewGormNotificationRepository(db, logs.AddCompName(baseCtx, "NotificationRepo")),
unitOfWork: repository.NewGormUnitOfWork(db, logger), unitOfWork: repository.NewGormUnitOfWork(db, logs.AddCompName(baseCtx, "UnitOfWork")),
} }
} }
@@ -131,7 +133,10 @@ type DomainServices struct {
} }
// initDomainServices 初始化所有的领域服务。 // initDomainServices 初始化所有的领域服务。
func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs.Logger) *DomainServices { func initDomainServices(ctx context.Context, cfg *config.Config, infra *Infrastructure) *DomainServices {
logger := logs.GetLogger(ctx)
baseCtx := context.Background()
// 猪群管理相关 // 猪群管理相关
pigPenTransferManager := pig.NewPigPenTransferManager(infra.repos.pigPenRepo, infra.repos.pigTransferLogRepo, infra.repos.pigBatchRepo) pigPenTransferManager := pig.NewPigPenTransferManager(infra.repos.pigPenRepo, infra.repos.pigTransferLogRepo, infra.repos.pigBatchRepo)
pigTradeManager := pig.NewPigTradeManager(infra.repos.pigTradeRepo) pigTradeManager := pig.NewPigTradeManager(infra.repos.pigTradeRepo)
@@ -144,15 +149,15 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs.
infra.repos.deviceRepo, infra.repos.deviceRepo,
infra.repos.deviceCommandLogRepo, infra.repos.deviceCommandLogRepo,
infra.repos.pendingCollectionRepo, infra.repos.pendingCollectionRepo,
logger, logs.AddCompName(baseCtx, "GeneralDeviceService"),
infra.lora.comm, infra.lora.comm,
) )
// 任务工厂 // 任务工厂
taskFactory := task.NewTaskFactory(logger, infra.repos.sensorDataRepo, infra.repos.deviceRepo, generalDeviceService) taskFactory := task.NewTaskFactory(logs.AddCompName(baseCtx, "TaskFactory"), infra.repos.sensorDataRepo, infra.repos.deviceRepo, generalDeviceService)
// 计划任务管理器 // 计划任务管理器
analysisPlanTaskManager := plan.NewAnalysisPlanTaskManager(infra.repos.planRepo, infra.repos.pendingTaskRepo, infra.repos.executionLogRepo, logger) analysisPlanTaskManager := plan.NewAnalysisPlanTaskManager(infra.repos.planRepo, infra.repos.pendingTaskRepo, infra.repos.executionLogRepo, logs.AddCompName(baseCtx, "AnalysisPlanTaskManager"))
// 任务执行器 // 任务执行器
planExecutionManager := plan.NewPlanExecutionManager( planExecutionManager := plan.NewPlanExecutionManager(
@@ -163,7 +168,7 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs.
infra.repos.planRepo, infra.repos.planRepo,
analysisPlanTaskManager, analysisPlanTaskManager,
taskFactory, taskFactory,
logger, logs.AddCompName(baseCtx, "PlanExecutionManager"),
generalDeviceService, generalDeviceService,
time.Duration(cfg.Task.Interval)*time.Second, time.Duration(cfg.Task.Interval)*time.Second,
cfg.Task.NumWorkers, cfg.Task.NumWorkers,
@@ -177,7 +182,7 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs.
infra.repos.deviceRepo, infra.repos.deviceRepo,
infra.repos.unitOfWork, infra.repos.unitOfWork,
taskFactory, taskFactory,
logger) logs.AddCompName(baseCtx, "PlanService"))
return &DomainServices{ return &DomainServices{
pigPenTransferManager: pigPenTransferManager, pigPenTransferManager: pigPenTransferManager,
@@ -204,9 +209,10 @@ type AppServices struct {
} }
// initAppServices 初始化所有的应用服务。 // initAppServices 初始化所有的应用服务。
func initAppServices(infra *Infrastructure, domainServices *DomainServices, logger *logs.Logger) *AppServices { func initAppServices(ctx context.Context, infra *Infrastructure, domainServices *DomainServices) *AppServices {
pigFarmService := service.NewPigFarmService(infra.repos.pigFarmRepo, infra.repos.pigPenRepo, infra.repos.pigBatchRepo, domainServices.pigBatchDomain, infra.repos.unitOfWork, logger) baseCtx := context.Background()
pigBatchService := service.NewPigBatchService(domainServices.pigBatchDomain, logger) pigFarmService := service.NewPigFarmService(infra.repos.pigFarmRepo, infra.repos.pigPenRepo, infra.repos.pigBatchRepo, domainServices.pigBatchDomain, infra.repos.unitOfWork, logs.AddCompName(baseCtx, "PigFarmService"))
pigBatchService := service.NewPigBatchService(domainServices.pigBatchDomain, logs.AddCompName(baseCtx, "PigBatchService"))
monitorService := service.NewMonitorService( monitorService := service.NewMonitorService(
infra.repos.sensorDataRepo, infra.repos.sensorDataRepo,
infra.repos.deviceCommandLogRepo, infra.repos.deviceCommandLogRepo,
@@ -229,9 +235,9 @@ func initAppServices(infra *Infrastructure, domainServices *DomainServices, logg
infra.repos.deviceTemplateRepo, infra.repos.deviceTemplateRepo,
domainServices.generalDeviceService, domainServices.generalDeviceService,
) )
auditService := audit.NewService(infra.repos.userActionLogRepo, logger) auditService := audit.NewService(infra.repos.userActionLogRepo, logs.AddCompName(baseCtx, "AuditService"))
planService := service.NewPlanService(logger, domainServices.planService) planService := service.NewPlanService(logs.AddCompName(baseCtx, "AppPlanService"), domainServices.planService)
userService := service.NewUserService(infra.repos.userRepo, infra.tokenService, infra.notifyService, logger) userService := service.NewUserService(infra.repos.userRepo, infra.tokenService, infra.notifyService, logs.AddCompName(baseCtx, "UserService"))
return &AppServices{ return &AppServices{
pigFarmService: pigFarmService, pigFarmService: pigFarmService,
@@ -253,23 +259,25 @@ type LoraComponents struct {
// initLora 根据配置初始化 LoRa 相关组件。 // initLora 根据配置初始化 LoRa 相关组件。
func initLora( func initLora(
ctx context.Context,
cfg *config.Config, cfg *config.Config,
logger *logs.Logger,
repos *Repositories, repos *Repositories,
) (*LoraComponents, error) { ) (*LoraComponents, error) {
var listenHandler webhook.ListenHandler var listenHandler webhook.ListenHandler
var comm transport.Communicator var comm transport.Communicator
var loraListener transport.Listener var loraListener transport.Listener
baseCtx := context.Background()
logger := logs.GetLogger(ctx)
if cfg.Lora.Mode == config.LoraMode_LoRaWAN { if cfg.Lora.Mode == config.LoraMode_LoRaWAN {
logger.Info("当前运行模式: lora_wan。初始化 ChirpStack 监听器和传输层。") logger.Info("当前运行模式: lora_wan。初始化 ChirpStack 监听器和传输层。")
listenHandler = webhook.NewChirpStackListener(logger, repos.sensorDataRepo, repos.deviceRepo, repos.areaControllerRepo, repos.deviceCommandLogRepo, repos.pendingCollectionRepo) listenHandler = webhook.NewChirpStackListener(logs.AddCompName(baseCtx, "ChirpStackListener"), repos.sensorDataRepo, repos.deviceRepo, repos.areaControllerRepo, repos.deviceCommandLogRepo, repos.pendingCollectionRepo)
comm = lora.NewChirpStackTransport(cfg.ChirpStack, logger) comm = lora.NewChirpStackTransport(cfg.ChirpStack, logs.AddCompName(baseCtx, "ChirpStackTransport"))
loraListener = lora.NewPlaceholderTransport(logger) loraListener = lora.NewPlaceholderTransport(logs.AddCompName(baseCtx, "PlaceholderTransport"))
} else { } else {
logger.Info("当前运行模式: lora_mesh。初始化 LoRa Mesh 传输层和占位符监听器。") logger.Info("当前运行模式: lora_mesh。初始化 LoRa Mesh 传输层和占位符监听器。")
listenHandler = webhook.NewPlaceholderListener(logger) listenHandler = webhook.NewPlaceholderListener(logs.AddCompName(baseCtx, "PlaceholderListener"))
tp, err := lora.NewLoRaMeshUartPassthroughTransport(cfg.LoraMesh, logger, repos.areaControllerRepo, repos.pendingCollectionRepo, repos.deviceRepo, repos.sensorDataRepo) tp, err := lora.NewLoRaMeshUartPassthroughTransport(cfg.LoraMesh, logs.AddCompName(baseCtx, "LoRaMeshTransport"), repos.areaControllerRepo, repos.pendingCollectionRepo, repos.deviceRepo, repos.sensorDataRepo)
if err != nil { if err != nil {
return nil, fmt.Errorf("无法初始化 LoRa Mesh 模块: %w", err) return nil, fmt.Errorf("无法初始化 LoRa Mesh 模块: %w", err)
} }
@@ -287,17 +295,19 @@ func initLora(
// initNotifyService 根据配置初始化并返回一个通知领域服务。 // initNotifyService 根据配置初始化并返回一个通知领域服务。
// 它确保至少有一个 LogNotifier 总是可用,并根据配置启用其他通知器。 // 它确保至少有一个 LogNotifier 总是可用,并根据配置启用其他通知器。
func initNotifyService( func initNotifyService(
ctx context.Context,
cfg config.NotifyConfig, cfg config.NotifyConfig,
log *logs.Logger,
userRepo repository.UserRepository, userRepo repository.UserRepository,
notificationRepo repository.NotificationRepository, notificationRepo repository.NotificationRepository,
) (domain_notify.Service, error) { ) (domain_notify.Service, error) {
var availableNotifiers []notify.Notifier var availableNotifiers []notify.Notifier
logger := logs.GetLogger(ctx)
baseCtx := context.Background()
// 1. 总是创建 LogNotifier 作为所有告警的最终记录渠道 // 1. 总是创建 LogNotifier 作为所有告警的最终记录渠道
logNotifier := notify.NewLogNotifier(log) logNotifier := notify.NewLogNotifier(logs.AddCompName(baseCtx, "LogNotifier"))
availableNotifiers = append(availableNotifiers, logNotifier) availableNotifiers = append(availableNotifiers, logNotifier)
log.Info("Log通知器已启用 (作为所有告警的最终记录渠道)") logger.Info("Log通知器已启用 (作为所有告警的最终记录渠道)")
// 2. 根据配置,按需创建并收集所有启用的其他 Notifier 实例 // 2. 根据配置,按需创建并收集所有启用的其他 Notifier 实例
if cfg.SMTP.Enabled { if cfg.SMTP.Enabled {
@@ -309,7 +319,7 @@ func initNotifyService(
cfg.SMTP.Sender, cfg.SMTP.Sender,
) )
availableNotifiers = append(availableNotifiers, smtpNotifier) availableNotifiers = append(availableNotifiers, smtpNotifier)
log.Info("SMTP通知器已启用") logger.Info("SMTP通知器已启用")
} }
if cfg.WeChat.Enabled { if cfg.WeChat.Enabled {
@@ -319,7 +329,7 @@ func initNotifyService(
cfg.WeChat.Secret, cfg.WeChat.Secret,
) )
availableNotifiers = append(availableNotifiers, wechatNotifier) availableNotifiers = append(availableNotifiers, wechatNotifier)
log.Info("企业微信通知器已启用") logger.Info("企业微信通知器已启用")
} }
if cfg.Lark.Enabled { if cfg.Lark.Enabled {
@@ -328,7 +338,7 @@ func initNotifyService(
cfg.Lark.AppSecret, cfg.Lark.AppSecret,
) )
availableNotifiers = append(availableNotifiers, larkNotifier) availableNotifiers = append(availableNotifiers, larkNotifier)
log.Info("飞书通知器已启用") logger.Info("飞书通知器已启用")
} }
// 3. 动态确定首选通知器 // 3. 动态确定首选通知器
@@ -346,12 +356,12 @@ func initNotifyService(
// 如果用户指定的主渠道未启用或未指定,则自动选择第一个可用的 (这将是 LogNotifier如果其他都未启用) // 如果用户指定的主渠道未启用或未指定,则自动选择第一个可用的 (这将是 LogNotifier如果其他都未启用)
if primaryNotifier == nil { if primaryNotifier == nil {
primaryNotifier = availableNotifiers[0] // 确保总能找到一个,因为 LogNotifier 总是存在的 primaryNotifier = availableNotifiers[0] // 确保总能找到一个,因为 LogNotifier 总是存在的
log.Warnf("配置的首选渠道 '%s' 未启用或未指定,已自动降级使用 '%s' 作为首选渠道。", cfg.Primary, primaryNotifier.Type()) logger.Warnf("配置的首选渠道 '%s' 未启用或未指定,已自动降级使用 '%s' 作为首选渠道。", cfg.Primary, primaryNotifier.Type())
} }
// 4. 使用创建的 Notifier 列表和 notificationRepo 来组装领域服务 // 4. 使用创建的 Notifier 列表和 notificationRepo 来组装领域服务
notifyService, err := domain_notify.NewFailoverService( notifyService, err := domain_notify.NewFailoverService(
log, logs.AddCompName(baseCtx, "FailoverNotifyService"),
userRepo, userRepo,
availableNotifiers, availableNotifiers,
primaryNotifier.Type(), primaryNotifier.Type(),
@@ -362,14 +372,14 @@ func initNotifyService(
return nil, fmt.Errorf("创建故障转移通知服务失败: %w", err) return nil, fmt.Errorf("创建故障转移通知服务失败: %w", err)
} }
log.Infof("通知服务初始化成功,首选渠道: %s, 故障阈值: %d", primaryNotifier.Type(), cfg.FailureThreshold) logger.Infof("通知服务初始化成功,首选渠道: %s, 故障阈值: %d", primaryNotifier.Type(), cfg.FailureThreshold)
return notifyService, nil return notifyService, nil
} }
// initStorage 封装了数据库的初始化、连接和迁移逻辑。 // initStorage 封装了数据库的初始化、连接和迁移逻辑。
func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Storage, error) { func initStorage(ctx context.Context, cfg config.DatabaseConfig) (database.Storage, error) {
// 创建存储实例 // 创建存储实例
storage := database.NewStorage(cfg, logger) storage := database.NewStorage(cfg, logs.AddCompName(context.Background(), "Storage"))
if err := storage.Connect(); err != nil { if err := storage.Connect(); err != nil {
// 错误已在 Connect 内部被记录,这里只需包装并返回 // 错误已在 Connect 内部被记录,这里只需包装并返回
return nil, fmt.Errorf("数据库连接失败: %w", err) return nil, fmt.Errorf("数据库连接失败: %w", err)
@@ -380,6 +390,6 @@ func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Stora
return nil, fmt.Errorf("数据库迁移失败: %w", err) return nil, fmt.Errorf("数据库迁移失败: %w", err)
} }
logger.Info("数据库初始化完成。") logs.GetLogger(ctx).Info("数据库初始化完成。")
return storage, nil return storage, nil
} }

View File

@@ -1,8 +1,10 @@
package core package core
import ( import (
"context"
"fmt" "fmt"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
) )
@@ -14,19 +16,20 @@ const (
// initializeState 在应用启动时准备其初始数据状态。 // initializeState 在应用启动时准备其初始数据状态。
// 这包括清理任何因上次异常关闭而留下的悬空任务或请求。 // 这包括清理任何因上次异常关闭而留下的悬空任务或请求。
func (app *Application) initializeState() error { func (app *Application) initializeState(ctx context.Context) error {
newCtx, logger := logs.Trace(ctx, app.Ctx, "InitializeState")
// 初始化预定义系统计划 (致命错误) // 初始化预定义系统计划 (致命错误)
if err := app.initializeSystemPlans(); err != nil { if err := app.initializeSystemPlans(ctx); err != nil {
return fmt.Errorf("初始化预定义系统计划失败: %w", err) return fmt.Errorf("初始化预定义系统计划失败: %w", err)
} }
// 清理待采集任务 (非致命错误) // 清理待采集任务 (非致命错误)
if err := app.initializePendingCollections(); err != nil { if err := app.initializePendingCollections(newCtx); err != nil {
app.Logger.Errorw("清理待采集任务时发生非致命错误", "error", err) logger.Errorw("清理待采集任务时发生非致命错误", "error", err)
} }
// 初始化待执行任务列表 (致命错误) // 初始化待执行任务列表 (致命错误)
if err := app.initializePendingTasks(); err != nil { if err := app.initializePendingTasks(newCtx); err != nil {
return fmt.Errorf("初始化待执行任务列表失败: %w", err) return fmt.Errorf("初始化待执行任务列表失败: %w", err)
} }
@@ -34,8 +37,9 @@ func (app *Application) initializeState() error {
} }
// initializeSystemPlans 确保预定义的系统计划在数据库中存在并保持最新。 // initializeSystemPlans 确保预定义的系统计划在数据库中存在并保持最新。
func (app *Application) initializeSystemPlans() error { func (app *Application) initializeSystemPlans(ctx context.Context) error {
app.Logger.Info("开始检查并更新预定义的系统计划...") logger := logs.TraceLogger(ctx, app.Ctx, "InitializeSystemPlans")
logger.Info("开始检查并更新预定义的系统计划...")
// 动态构建预定义计划列表 // 动态构建预定义计划列表
predefinedSystemPlans := app.getPredefinedSystemPlans() predefinedSystemPlans := app.getPredefinedSystemPlans()
@@ -60,7 +64,7 @@ func (app *Application) initializeSystemPlans() error {
if foundExistingPlan, ok := existingPlanMap[predefinedPlan.Name]; ok { if foundExistingPlan, ok := existingPlanMap[predefinedPlan.Name]; ok {
// 如果计划存在,则进行无差别更新 // 如果计划存在,则进行无差别更新
app.Logger.Infof("预定义计划 '%s' 已存在,正在进行无差别更新...", predefinedPlan.Name) logger.Infof("预定义计划 '%s' 已存在,正在进行无差别更新...", predefinedPlan.Name)
// 将数据库中已存在的计划的ID和运行时状态字段赋值给预定义计划 // 将数据库中已存在的计划的ID和运行时状态字段赋值给预定义计划
predefinedPlan.ID = foundExistingPlan.ID predefinedPlan.ID = foundExistingPlan.ID
@@ -69,20 +73,20 @@ func (app *Application) initializeSystemPlans() error {
if err := app.Infra.repos.planRepo.UpdatePlan(predefinedPlan); err != nil { if err := app.Infra.repos.planRepo.UpdatePlan(predefinedPlan); err != nil {
return fmt.Errorf("更新预定义计划 '%s' 失败: %w", predefinedPlan.Name, err) return fmt.Errorf("更新预定义计划 '%s' 失败: %w", predefinedPlan.Name, err)
} else { } else {
app.Logger.Infof("成功更新预定义计划 '%s'。", predefinedPlan.Name) logger.Infof("成功更新预定义计划 '%s'。", predefinedPlan.Name)
} }
} else { } else {
// 如果计划不存在, 则创建 // 如果计划不存在, 则创建
app.Logger.Infof("预定义计划 '%s' 不存在,正在创建...", predefinedPlan.Name) logger.Infof("预定义计划 '%s' 不存在,正在创建...", predefinedPlan.Name)
if err := app.Infra.repos.planRepo.CreatePlan(predefinedPlan); err != nil { if err := app.Infra.repos.planRepo.CreatePlan(predefinedPlan); err != nil {
return fmt.Errorf("创建预定义计划 '%s' 失败: %w", predefinedPlan.Name, err) return fmt.Errorf("创建预定义计划 '%s' 失败: %w", predefinedPlan.Name, err)
} else { } else {
app.Logger.Infof("成功创建预定义计划 '%s'。", predefinedPlan.Name) logger.Infof("成功创建预定义计划 '%s'。", predefinedPlan.Name)
} }
} }
} }
app.Logger.Info("预定义系统计划检查完成。") logger.Info("预定义系统计划检查完成。")
return nil return nil
} }
@@ -119,25 +123,26 @@ func (app *Application) getPredefinedSystemPlans() []models.Plan {
// initializePendingCollections 在应用启动时处理所有未完成的采集请求。 // initializePendingCollections 在应用启动时处理所有未完成的采集请求。
// 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。 // 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。
// 这保证了系统在每次启动时都处于一个干净、确定的状态。 // 这保证了系统在每次启动时都处于一个干净、确定的状态。
func (app *Application) initializePendingCollections() error { func (app *Application) initializePendingCollections(ctx context.Context) error {
app.Logger.Info("开始清理所有未完成的采集请求...") logger := logs.TraceLogger(ctx, app.Ctx, "InitializePendingCollections")
logger.Info("开始清理所有未完成的采集请求...")
// 直接将所有 'pending' 状态的请求更新为 'timed_out'。 // 直接将所有 'pending' 状态的请求更新为 'timed_out'。
count, err := app.Infra.repos.pendingCollectionRepo.MarkAllPendingAsTimedOut() count, err := app.Infra.repos.pendingCollectionRepo.MarkAllPendingAsTimedOut()
if err != nil { if err != nil {
return fmt.Errorf("清理未完成的采集请求失败: %v", err) return fmt.Errorf("清理未完成的采集请求失败: %v", err)
} else if count > 0 { } else if count > 0 {
app.Logger.Infof("成功将 %d 个未完成的采集请求标记为超时。", count) logger.Infof("成功将 %d 个未完成的采集请求标记为超时。", count)
} else { } else {
app.Logger.Info("没有需要清理的采集请求。") logger.Info("没有需要清理的采集请求。")
} }
return nil return nil
} }
// initializePendingTasks 在应用启动时清理并刷新待执行任务列表。 // initializePendingTasks 在应用启动时清理并刷新待执行任务列表。
func (app *Application) initializePendingTasks() error { func (app *Application) initializePendingTasks(ctx context.Context) error {
logger := app.Logger logger := logs.TraceLogger(ctx, app.Ctx, "InitializePendingTasks")
planRepo := app.Infra.repos.planRepo planRepo := app.Infra.repos.planRepo
pendingTaskRepo := app.Infra.repos.pendingTaskRepo pendingTaskRepo := app.Infra.repos.pendingTaskRepo
executionLogRepo := app.Infra.repos.executionLogRepo executionLogRepo := app.Infra.repos.executionLogRepo

View File

@@ -4,6 +4,7 @@ import (
"log" "log"
"git.huangwc.com/pig/pig-farm-controller/internal/core" "git.huangwc.com/pig/pig-farm-controller/internal/core"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
) )
func main() { func main() {
@@ -21,6 +22,6 @@ func main() {
if err := app.Start(); err != nil { if err := app.Start(); err != nil {
// 如果 Start 过程(主要是优雅关闭阶段)出现错误, // 如果 Start 过程(主要是优雅关闭阶段)出现错误,
// 此时我们的 logger 已经是可用的,可以用它来记录错误。 // 此时我们的 logger 已经是可用的,可以用它来记录错误。
app.Logger.Errorf("应用启动或关闭时发生错误: %v", err) logs.GetLogger(app.Ctx).Errorf("应用启动或关闭时发生错误: %v", err)
} }
} }