diff --git a/internal/app/api/api.go b/internal/app/api/api.go index 835bf82..5200b7e 100644 --- a/internal/app/api/api.go +++ b/internal/app/api/api.go @@ -43,7 +43,7 @@ type API struct { engine *gin.Engine // Gin 引擎实例,用于处理 HTTP 请求 logger *logs.Logger // 日志记录器,用于输出日志信息 userRepo repository.UserRepository // 用户数据仓库接口,用于用户数据操作 - tokenService token.TokenService // Token 服务接口,用于 JWT token 的生成和解析 + tokenService token.Service // Token 服务接口,用于 JWT token 的生成和解析 auditService audit.Service // 审计服务,用于记录用户操作 httpServer *http.Server // 标准库的 HTTP 服务器实例,用于启动和停止服务 config config.ServerConfig // API 服务器的配置,使用 infra/config 包中的 ServerConfig @@ -69,7 +69,7 @@ func NewAPI(cfg config.ServerConfig, pigFarmService service.PigFarmService, pigBatchService service.PigBatchService, monitorService service.MonitorService, - tokenService token.TokenService, + tokenService token.Service, auditService audit.Service, notifyService domain_notify.Service, deviceService domain_device.Service, diff --git a/internal/app/controller/user/user_controller.go b/internal/app/controller/user/user_controller.go index 5fca24a..6ddd6d9 100644 --- a/internal/app/controller/user/user_controller.go +++ b/internal/app/controller/user/user_controller.go @@ -20,7 +20,7 @@ import ( type Controller struct { userRepo repository.UserRepository monitorService service.MonitorService - tokenService token.TokenService + tokenService token.Service notifyService domain_notify.Service logger *logs.Logger } @@ -30,7 +30,7 @@ func NewController( userRepo repository.UserRepository, monitorService service.MonitorService, logger *logs.Logger, - tokenService token.TokenService, + tokenService token.Service, notifyService domain_notify.Service, ) *Controller { return &Controller{ diff --git a/internal/app/middleware/auth.go b/internal/app/middleware/auth.go index e370954..d0fbdc1 100644 --- a/internal/app/middleware/auth.go +++ b/internal/app/middleware/auth.go @@ -14,7 +14,7 @@ import ( // AuthMiddleware 创建一个Gin中间件,用于JWT身份验证 // 它依赖于 TokenService 来解析和验证 token,并使用 UserRepository 来获取完整的用户信息 -func AuthMiddleware(tokenService token.TokenService, userRepo repository.UserRepository) gin.HandlerFunc { +func AuthMiddleware(tokenService token.Service, userRepo repository.UserRepository) gin.HandlerFunc { return func(c *gin.Context) { // 从 Authorization header 获取 token authHeader := c.GetHeader("Authorization") diff --git a/internal/core/application.go b/internal/core/application.go index 454beee..fcc0904 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -5,346 +5,97 @@ import ( "os" "os/signal" "syscall" - "time" "git.huangwc.com/pig/pig-farm-controller/internal/app/api" - "git.huangwc.com/pig/pig-farm-controller/internal/app/service" - "git.huangwc.com/pig/pig-farm-controller/internal/app/webhook" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/audit" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/collection" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" - domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/pig" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/task" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/token" "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/database" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/notify" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora" ) // Application 是整个应用的核心,封装了所有组件和生命周期。 type Application struct { - Config *config.Config - Logger *logs.Logger - Storage database.Storage - Executor *task.Scheduler - API *api.API - Collector collection.Collector + Config *config.Config + Logger *logs.Logger + API *api.API - // 新增的仓库和管理器字段,以便在 initializePendingTasks 中访问 - planRepo repository.PlanRepository - pendingTaskRepo repository.PendingTaskRepository - executionLogRepo repository.ExecutionLogRepository - pendingCollectionRepo repository.PendingCollectionRepository - analysisPlanTaskManager *task.AnalysisPlanTaskManager - - // Lora Mesh 监听器 - loraMeshCommunicator transport.Listener - - // 通知服务 - NotifyService domain_notify.Service + Infra *Infrastructure + Domain *DomainServices + App *AppServices } // NewApplication 创建并初始化一个新的 Application 实例。 // 这是应用的“组合根”,所有依赖都在这里被创建和注入。 func NewApplication(configPath string) (*Application, error) { - // 加载配置 + // 1. 初始化基本组件: 配置和日志 cfg := config.NewConfig() if err := cfg.Load(configPath); err != nil { return nil, fmt.Errorf("无法加载配置: %w", err) } - - // 初始化日志记录器 logger := logs.NewLogger(cfg.Log) - // 初始化数据库存储 - storage, err := initStorage(cfg.Database, logger) + // 2. 初始化所有分层服务 + infra, err := initInfrastructure(cfg, logger) if err != nil { - return nil, err // 错误已在 initStorage 中被包装 + return nil, fmt.Errorf("初始化基础设施失败: %w", err) } + domain := initDomainServices(cfg, infra, logger) + appServices := initAppServices(infra, domain, logger) - // 初始化 Token 服务 - tokenService := token.NewTokenService([]byte(cfg.App.JWTSecret)) - - // --- 仓库对象初始化 --- - userRepo := repository.NewGormUserRepository(storage.GetDB()) - deviceRepo := repository.NewGormDeviceRepository(storage.GetDB()) - areaControllerRepo := repository.NewGormAreaControllerRepository(storage.GetDB()) - deviceTemplateRepo := repository.NewGormDeviceTemplateRepository(storage.GetDB()) - planRepo := repository.NewGormPlanRepository(storage.GetDB()) - pendingTaskRepo := repository.NewGormPendingTaskRepository(storage.GetDB()) - executionLogRepo := repository.NewGormExecutionLogRepository(storage.GetDB()) - sensorDataRepo := repository.NewGormSensorDataRepository(storage.GetDB()) - deviceCommandLogRepo := repository.NewGormDeviceCommandLogRepository(storage.GetDB()) - pendingCollectionRepo := repository.NewGormPendingCollectionRepository(storage.GetDB()) - userActionLogRepo := repository.NewGormUserActionLogRepository(storage.GetDB()) - pigBatchRepo := repository.NewGormPigBatchRepository(storage.GetDB()) - pigBatchLogRepo := repository.NewGormPigBatchLogRepository(storage.GetDB()) - pigFarmRepo := repository.NewGormPigFarmRepository(storage.GetDB()) - pigPenRepo := repository.NewGormPigPenRepository(storage.GetDB()) - pigTransferLogRepo := repository.NewGormPigTransferLogRepository(storage.GetDB()) - pigTradeRepo := repository.NewGormPigTradeRepository(storage.GetDB()) - pigSickPigLogRepo := repository.NewGormPigSickLogRepository(storage.GetDB()) - medicationLogRepo := repository.NewGormMedicationLogRepository(storage.GetDB()) - rawMaterialRepo := repository.NewGormRawMaterialRepository(storage.GetDB()) - notificationRepo := repository.NewGormNotificationRepository(storage.GetDB()) - - // 初始化事务管理器 - unitOfWork := repository.NewGormUnitOfWork(storage.GetDB(), logger) - - // 初始化猪群管理领域 - pigPenTransferManager := pig.NewPigPenTransferManager(pigPenRepo, pigTransferLogRepo, pigBatchRepo) - pigTradeManager := pig.NewPigTradeManager(pigTradeRepo) - pigSickManager := pig.NewSickPigManager(pigSickPigLogRepo, medicationLogRepo) - pigBatchDomain := pig.NewPigBatchService(pigBatchRepo, pigBatchLogRepo, unitOfWork, - pigPenTransferManager, pigTradeManager, pigSickManager) - - // --- 业务逻辑处理器初始化 --- - pigFarmService := service.NewPigFarmService(pigFarmRepo, pigPenRepo, pigBatchRepo, pigBatchDomain, unitOfWork, logger) - pigBatchService := service.NewPigBatchService(pigBatchDomain, logger) - monitorService := service.NewMonitorService( - sensorDataRepo, - deviceCommandLogRepo, - executionLogRepo, - pendingCollectionRepo, - userActionLogRepo, - rawMaterialRepo, - medicationLogRepo, - pigBatchRepo, - pigBatchLogRepo, - pigTransferLogRepo, - pigSickPigLogRepo, - pigTradeRepo, - notificationRepo, - ) - - // 初始化审计服务 - auditService := audit.NewService(userActionLogRepo, logger) - - // 初始化通知服务 - notifyService, err := initNotifyService(cfg.Notify, logger, userRepo, notificationRepo) - if err != nil { - return nil, fmt.Errorf("初始化通知服务失败: %w", err) - } - - // --- 初始化 LoRa 相关组件 --- - var listenHandler webhook.ListenHandler - var comm transport.Communicator - var loraListener transport.Listener - - if cfg.Lora.Mode == config.LoraMode_LoRaWAN { - logger.Info("当前运行模式: lora_wan。初始化 ChirpStack 监听器和传输层。") - listenHandler = webhook.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, areaControllerRepo, deviceCommandLogRepo, pendingCollectionRepo) - comm = lora.NewChirpStackTransport(cfg.ChirpStack, logger) - loraListener = lora.NewPlaceholderTransport(logger) - } else { - logger.Info("当前运行模式: lora_mesh。初始化 LoRa Mesh 传输层和占位符监听器。") - listenHandler = webhook.NewPlaceholderListener(logger) - tp, err := lora.NewLoRaMeshUartPassthroughTransport(cfg.LoraMesh, logger, areaControllerRepo, pendingCollectionRepo, deviceRepo, sensorDataRepo) - loraListener = tp - comm = tp - if err != nil { - return nil, fmt.Errorf("无法初始化 LoRa Mesh 模块: %w", err) - } - } - - // 初始化计划触发器管理器 - analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger) - - // 初始化通用设备服务 - generalDeviceService := device.NewGeneralDeviceService( - deviceRepo, - deviceCommandLogRepo, - pendingCollectionRepo, - logger, - comm, - ) - - // 初始化任务执行器 - executor := task.NewScheduler( - pendingTaskRepo, - executionLogRepo, - deviceRepo, - sensorDataRepo, - planRepo, - analysisPlanTaskManager, - logger, - generalDeviceService, - time.Duration(cfg.Task.Interval)*time.Second, - cfg.Task.NumWorkers, - ) - - // --- 初始化定时采集器 --- - timedCollector := collection.NewTimedCollector( - deviceRepo, - generalDeviceService, - logger, - time.Duration(cfg.Collection.Interval)*time.Second, - ) - - // 初始化 API 服务器 + // 3. 初始化 API 入口点 apiServer := api.NewAPI( cfg.Server, logger, - userRepo, - deviceRepo, - areaControllerRepo, - deviceTemplateRepo, - planRepo, - pigFarmService, - pigBatchService, - monitorService, - tokenService, - auditService, - notifyService, - generalDeviceService, - listenHandler, - analysisPlanTaskManager, + infra.Repos.UserRepo, + infra.Repos.DeviceRepo, + infra.Repos.AreaControllerRepo, + infra.Repos.DeviceTemplateRepo, + infra.Repos.PlanRepo, + appServices.PigFarmService, + appServices.PigBatchService, + appServices.MonitorService, + infra.TokenService, + appServices.AuditService, + infra.NotifyService, + domain.GeneralDeviceService, + infra.Lora.ListenHandler, + domain.AnalysisPlanTaskManager, ) - // 组装 Application 对象 + // 4. 组装 Application 对象 app := &Application{ - Config: cfg, - Logger: logger, - Storage: storage, - Executor: executor, - API: apiServer, - Collector: timedCollector, - planRepo: planRepo, - pendingTaskRepo: pendingTaskRepo, - executionLogRepo: executionLogRepo, - pendingCollectionRepo: pendingCollectionRepo, - analysisPlanTaskManager: analysisPlanTaskManager, - loraMeshCommunicator: loraListener, - NotifyService: notifyService, + Config: cfg, + Logger: logger, + API: apiServer, + Infra: infra, + Domain: domain, + App: appServices, } return app, nil } -// initNotifyService 根据配置初始化并返回一个通知领域服务。 -// 它确保至少有一个 LogNotifier 总是可用,并根据配置启用其他通知器。 -func initNotifyService( - cfg config.NotifyConfig, - log *logs.Logger, - userRepo repository.UserRepository, - notificationRepo repository.NotificationRepository, -) (domain_notify.Service, error) { - var availableNotifiers []notify.Notifier - - // 1. 总是创建 LogNotifier 作为所有告警的最终记录渠道 - logNotifier := notify.NewLogNotifier(log) - availableNotifiers = append(availableNotifiers, logNotifier) - log.Info("Log通知器已启用 (作为所有告警的最终记录渠道)") - - // 2. 根据配置,按需创建并收集所有启用的其他 Notifier 实例 - if cfg.SMTP.Enabled { - smtpNotifier := notify.NewSMTPNotifier( - cfg.SMTP.Host, - cfg.SMTP.Port, - cfg.SMTP.Username, - cfg.SMTP.Password, - cfg.SMTP.Sender, - ) - availableNotifiers = append(availableNotifiers, smtpNotifier) - log.Info("SMTP通知器已启用") - } - - if cfg.WeChat.Enabled { - wechatNotifier := notify.NewWechatNotifier( - cfg.WeChat.CorpID, - cfg.WeChat.AgentID, - cfg.WeChat.Secret, - ) - availableNotifiers = append(availableNotifiers, wechatNotifier) - log.Info("企业微信通知器已启用") - } - - if cfg.Lark.Enabled { - larkNotifier := notify.NewLarkNotifier( - cfg.Lark.AppID, - cfg.Lark.AppSecret, - ) - availableNotifiers = append(availableNotifiers, larkNotifier) - log.Info("飞书通知器已启用") - } - - // 3. 动态确定首选通知器 - var primaryNotifier notify.Notifier - primaryNotifierType := notify.NotifierType(cfg.Primary) - - // 检查用户指定的主渠道是否已启用 - for _, n := range availableNotifiers { - if n.Type() == primaryNotifierType { - primaryNotifier = n - break - } - } - - // 如果用户指定的主渠道未启用或未指定,则自动选择第一个可用的 (这将是 LogNotifier,如果其他都未启用) - if primaryNotifier == nil { - primaryNotifier = availableNotifiers[0] // 确保总能找到一个,因为 LogNotifier 总是存在的 - log.Warnf("配置的首选渠道 '%s' 未启用或未指定,已自动降级使用 '%s' 作为首选渠道。", cfg.Primary, primaryNotifier.Type()) - } - - // 4. 使用创建的 Notifier 列表和 notificationRepo 来组装领域服务 - notifyService, err := domain_notify.NewFailoverService( - log, - userRepo, - availableNotifiers, - primaryNotifier.Type(), - cfg.FailureThreshold, - notificationRepo, - ) - if err != nil { - return nil, fmt.Errorf("创建故障转移通知服务失败: %w", err) - } - - log.Infof("通知服务初始化成功,首选渠道: %s, 故障阈值: %d", primaryNotifier.Type(), cfg.FailureThreshold) - return notifyService, nil -} - // Start 启动应用的所有组件并阻塞,直到接收到关闭信号。 func (app *Application) Start() error { app.Logger.Info("应用启动中...") - // -- 启动 LoRa Mesh 监听器 - if err := app.loraMeshCommunicator.Listen(); err != nil { + // 1. 启动底层监听器 + if err := app.Infra.Lora.LoraListener.Listen(); err != nil { return fmt.Errorf("启动 LoRa Mesh 监听器失败: %w", err) } - // --- 清理待采集任务 --- - if err := app.initializePendingCollections(); err != nil { - // 这是一个非致命错误,记录它,但应用应继续启动 - app.Logger.Error(err) + // 2. 初始化应用状态 (清理、刷新任务等) + if err := app.initializeState(); err != nil { + return fmt.Errorf("初始化应用状态失败: %w", err) } - // --- 初始化待执行任务列表 --- - if err := app.initializePendingTasks( - app.planRepo, // 传入 planRepo - app.pendingTaskRepo, // 传入 pendingTaskRepo - app.executionLogRepo, // 传入 executionLogRepo - app.analysisPlanTaskManager, // 传入 analysisPlanTaskManager - app.Logger, // 传入 logger - ); err != nil { - return fmt.Errorf("初始化待执行任务列表失败: %w", err) - } + // 3. 启动后台工作协程 + app.Domain.Scheduler.Start() + app.Domain.TimedCollector.Start() - // 启动任务执行器 - app.Executor.Start() - - // 启动定时采集器 - app.Collector.Start() - - // 启动 API 服务器 + // 4. 启动 API 服务器 app.API.Start() - // 等待关闭信号 + // 5. 等待关闭信号 quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit @@ -361,18 +112,18 @@ func (app *Application) Stop() error { app.API.Stop() // 关闭任务执行器 - app.Executor.Stop() + app.Domain.Scheduler.Stop() // 关闭定时采集器 - app.Collector.Stop() + app.Domain.TimedCollector.Stop() // 断开数据库连接 - if err := app.Storage.Disconnect(); err != nil { + if err := app.Infra.Storage.Disconnect(); err != nil { app.Logger.Errorw("数据库连接断开失败", "error", err) } // 关闭 LoRa Mesh 监听器 - if err := app.loraMeshCommunicator.Stop(); err != nil { + if err := app.Infra.Lora.LoraListener.Stop(); err != nil { app.Logger.Errorw("LoRa Mesh 监听器关闭失败", "error", err) } @@ -383,6 +134,22 @@ func (app *Application) Stop() error { return nil } +// initializeState 在应用启动时准备其初始数据状态。 +// 这包括清理任何因上次异常关闭而留下的悬空任务或请求。 +func (app *Application) initializeState() error { + // 清理待采集任务 (非致命错误) + if err := app.initializePendingCollections(); err != nil { + app.Logger.Errorw("清理待采集任务时发生非致命错误", "error", err) + } + + // 初始化待执行任务列表 (致命错误) + if err := app.initializePendingTasks(); err != nil { + return fmt.Errorf("初始化待执行任务列表失败: %w", err) + } + + return nil +} + // initializePendingCollections 在应用启动时处理所有未完成的采集请求。 // 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。 // 这保证了系统在每次启动时都处于一个干净、确定的状态。 @@ -390,7 +157,7 @@ func (app *Application) initializePendingCollections() error { app.Logger.Info("开始清理所有未完成的采集请求...") // 直接将所有 'pending' 状态的请求更新为 'timed_out'。 - count, err := app.pendingCollectionRepo.MarkAllPendingAsTimedOut() + count, err := app.Infra.Repos.PendingCollectionRepo.MarkAllPendingAsTimedOut() if err != nil { return fmt.Errorf("清理未完成的采集请求失败: %v", err) } else if count > 0 { @@ -403,13 +170,13 @@ func (app *Application) initializePendingCollections() error { } // initializePendingTasks 在应用启动时清理并刷新待执行任务列表。 -func (app *Application) initializePendingTasks( - planRepo repository.PlanRepository, - pendingTaskRepo repository.PendingTaskRepository, - executionLogRepo repository.ExecutionLogRepository, - analysisPlanTaskManager *task.AnalysisPlanTaskManager, - logger *logs.Logger, -) error { +func (app *Application) initializePendingTasks() error { + logger := app.Logger + planRepo := app.Infra.Repos.PlanRepo + pendingTaskRepo := app.Infra.Repos.PendingTaskRepo + executionLogRepo := app.Infra.Repos.ExecutionLogRepo + analysisPlanTaskManager := app.Domain.AnalysisPlanTaskManager + logger.Info("开始初始化待执行任务列表...") // 阶段一:修正因崩溃导致状态不一致的固定次数计划 @@ -496,21 +263,3 @@ func (app *Application) initializePendingTasks( logger.Info("待执行任务列表初始化完成。") return nil } - -// initStorage 封装了数据库的初始化、连接和迁移逻辑。 -func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Storage, error) { - // 创建存储实例 - storage := database.NewStorage(cfg, logger) - if err := storage.Connect(); err != nil { - // 错误已在 Connect 内部被记录,这里只需包装并返回 - return nil, fmt.Errorf("数据库连接失败: %w", err) - } - - // 执行数据库迁移 - if err := storage.Migrate(models.GetAllModels()...); err != nil { - return nil, fmt.Errorf("数据库迁移失败: %w", err) - } - - logger.Info("数据库初始化完成。") - return storage, nil -} diff --git a/internal/core/initializers.go b/internal/core/initializers.go new file mode 100644 index 0000000..b9819b3 --- /dev/null +++ b/internal/core/initializers.go @@ -0,0 +1,362 @@ +package core + +import ( + "fmt" + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/app/service" + "git.huangwc.com/pig/pig-farm-controller/internal/app/webhook" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/audit" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/collection" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" + domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/pig" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/task" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/token" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/database" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/notify" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora" + "gorm.io/gorm" +) + +// Infrastructure 聚合了所有基础设施层的组件。 +type Infrastructure struct { + Storage database.Storage + Repos *Repositories + Lora *LoraComponents + NotifyService domain_notify.Service + TokenService token.Service +} + +// initInfrastructure 初始化所有基础设施层组件。 +func initInfrastructure(cfg *config.Config, logger *logs.Logger) (*Infrastructure, error) { + storage, err := initStorage(cfg.Database, logger) + if err != nil { + return nil, err + } + + repos := initRepositories(storage.GetDB(), logger) + + lora, err := initLora(cfg, logger, repos) + if err != nil { + return nil, err + } + + notifyService, err := initNotifyService(cfg.Notify, logger, repos.UserRepo, repos.NotificationRepo) + if err != nil { + return nil, fmt.Errorf("初始化通知服务失败: %w", err) + } + + tokenService := token.NewTokenService([]byte(cfg.App.JWTSecret)) + + return &Infrastructure{ + Storage: storage, + Repos: repos, + Lora: lora, + NotifyService: notifyService, + TokenService: tokenService, + }, nil +} + +// Repositories 聚合了所有的仓库实例。 +type Repositories struct { + UserRepo repository.UserRepository + DeviceRepo repository.DeviceRepository + AreaControllerRepo repository.AreaControllerRepository + DeviceTemplateRepo repository.DeviceTemplateRepository + PlanRepo repository.PlanRepository + PendingTaskRepo repository.PendingTaskRepository + ExecutionLogRepo repository.ExecutionLogRepository + SensorDataRepo repository.SensorDataRepository + DeviceCommandLogRepo repository.DeviceCommandLogRepository + PendingCollectionRepo repository.PendingCollectionRepository + UserActionLogRepo repository.UserActionLogRepository + PigBatchRepo repository.PigBatchRepository + PigBatchLogRepo repository.PigBatchLogRepository + PigFarmRepo repository.PigFarmRepository + PigPenRepo repository.PigPenRepository + PigTransferLogRepo repository.PigTransferLogRepository + PigTradeRepo repository.PigTradeRepository + PigSickPigLogRepo repository.PigSickLogRepository + MedicationLogRepo repository.MedicationLogRepository + RawMaterialRepo repository.RawMaterialRepository + NotificationRepo repository.NotificationRepository + UnitOfWork repository.UnitOfWork +} + +// initRepositories 初始化所有的仓库。 +func initRepositories(db *gorm.DB, logger *logs.Logger) *Repositories { + return &Repositories{ + UserRepo: repository.NewGormUserRepository(db), + DeviceRepo: repository.NewGormDeviceRepository(db), + AreaControllerRepo: repository.NewGormAreaControllerRepository(db), + DeviceTemplateRepo: repository.NewGormDeviceTemplateRepository(db), + PlanRepo: repository.NewGormPlanRepository(db), + PendingTaskRepo: repository.NewGormPendingTaskRepository(db), + ExecutionLogRepo: repository.NewGormExecutionLogRepository(db), + SensorDataRepo: repository.NewGormSensorDataRepository(db), + DeviceCommandLogRepo: repository.NewGormDeviceCommandLogRepository(db), + PendingCollectionRepo: repository.NewGormPendingCollectionRepository(db), + UserActionLogRepo: repository.NewGormUserActionLogRepository(db), + PigBatchRepo: repository.NewGormPigBatchRepository(db), + PigBatchLogRepo: repository.NewGormPigBatchLogRepository(db), + PigFarmRepo: repository.NewGormPigFarmRepository(db), + PigPenRepo: repository.NewGormPigPenRepository(db), + PigTransferLogRepo: repository.NewGormPigTransferLogRepository(db), + PigTradeRepo: repository.NewGormPigTradeRepository(db), + PigSickPigLogRepo: repository.NewGormPigSickLogRepository(db), + MedicationLogRepo: repository.NewGormMedicationLogRepository(db), + RawMaterialRepo: repository.NewGormRawMaterialRepository(db), + NotificationRepo: repository.NewGormNotificationRepository(db), + UnitOfWork: repository.NewGormUnitOfWork(db, logger), + } +} + +// DomainServices 聚合了所有的领域服务实例。 +type DomainServices struct { + PigPenTransferManager pig.PigPenTransferManager + PigTradeManager pig.PigTradeManager + PigSickManager pig.SickPigManager + PigBatchDomain pig.PigBatchService + TimedCollector collection.Collector + GeneralDeviceService device.Service + AnalysisPlanTaskManager *task.AnalysisPlanTaskManager + Scheduler *task.Scheduler +} + +// initDomainServices 初始化所有的领域服务。 +func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs.Logger) *DomainServices { + // 猪群管理相关 + pigPenTransferManager := pig.NewPigPenTransferManager(infra.Repos.PigPenRepo, infra.Repos.PigTransferLogRepo, infra.Repos.PigBatchRepo) + pigTradeManager := pig.NewPigTradeManager(infra.Repos.PigTradeRepo) + pigSickManager := pig.NewSickPigManager(infra.Repos.PigSickPigLogRepo, infra.Repos.MedicationLogRepo) + pigBatchDomain := pig.NewPigBatchService(infra.Repos.PigBatchRepo, infra.Repos.PigBatchLogRepo, infra.Repos.UnitOfWork, + pigPenTransferManager, pigTradeManager, pigSickManager) + + // 通用设备服务 + generalDeviceService := device.NewGeneralDeviceService( + infra.Repos.DeviceRepo, + infra.Repos.DeviceCommandLogRepo, + infra.Repos.PendingCollectionRepo, + logger, + infra.Lora.Comm, + ) + + // 计划任务管理器 + analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(infra.Repos.PlanRepo, infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, logger) + + // 任务执行器 + scheduler := task.NewScheduler( + infra.Repos.PendingTaskRepo, + infra.Repos.ExecutionLogRepo, + infra.Repos.DeviceRepo, + infra.Repos.SensorDataRepo, + infra.Repos.PlanRepo, + analysisPlanTaskManager, + logger, + generalDeviceService, + time.Duration(cfg.Task.Interval)*time.Second, + cfg.Task.NumWorkers, + ) + + // 定时采集器 + timedCollector := collection.NewTimedCollector( + infra.Repos.DeviceRepo, + generalDeviceService, + logger, + time.Duration(cfg.Collection.Interval)*time.Second, + ) + + return &DomainServices{ + PigPenTransferManager: pigPenTransferManager, + PigTradeManager: pigTradeManager, + PigSickManager: pigSickManager, + PigBatchDomain: pigBatchDomain, + GeneralDeviceService: generalDeviceService, + AnalysisPlanTaskManager: analysisPlanTaskManager, + Scheduler: scheduler, + TimedCollector: timedCollector, + } +} + +// AppServices 聚合了所有的应用服务实例。 +type AppServices struct { + PigFarmService service.PigFarmService + PigBatchService service.PigBatchService + MonitorService service.MonitorService + AuditService audit.Service +} + +// initAppServices 初始化所有的应用服务。 +func initAppServices(infra *Infrastructure, domainServices *DomainServices, logger *logs.Logger) *AppServices { + pigFarmService := service.NewPigFarmService(infra.Repos.PigFarmRepo, infra.Repos.PigPenRepo, infra.Repos.PigBatchRepo, domainServices.PigBatchDomain, infra.Repos.UnitOfWork, logger) + pigBatchService := service.NewPigBatchService(domainServices.PigBatchDomain, logger) + monitorService := service.NewMonitorService( + infra.Repos.SensorDataRepo, + infra.Repos.DeviceCommandLogRepo, + infra.Repos.ExecutionLogRepo, + infra.Repos.PendingCollectionRepo, + infra.Repos.UserActionLogRepo, + infra.Repos.RawMaterialRepo, + infra.Repos.MedicationLogRepo, + infra.Repos.PigBatchRepo, + infra.Repos.PigBatchLogRepo, + infra.Repos.PigTransferLogRepo, + infra.Repos.PigSickPigLogRepo, + infra.Repos.PigTradeRepo, + infra.Repos.NotificationRepo, + ) + auditService := audit.NewService(infra.Repos.UserActionLogRepo, logger) + + return &AppServices{ + PigFarmService: pigFarmService, + PigBatchService: pigBatchService, + MonitorService: monitorService, + AuditService: auditService, + } +} + +// LoraComponents 聚合了所有 LoRa 相关组件。 +type LoraComponents struct { + ListenHandler webhook.ListenHandler + Comm transport.Communicator + LoraListener transport.Listener +} + +// initLora 根据配置初始化 LoRa 相关组件。 +func initLora( + cfg *config.Config, + logger *logs.Logger, + repos *Repositories, +) (*LoraComponents, error) { + var listenHandler webhook.ListenHandler + var comm transport.Communicator + var loraListener transport.Listener + + if cfg.Lora.Mode == config.LoraMode_LoRaWAN { + logger.Info("当前运行模式: lora_wan。初始化 ChirpStack 监听器和传输层。") + listenHandler = webhook.NewChirpStackListener(logger, repos.SensorDataRepo, repos.DeviceRepo, repos.AreaControllerRepo, repos.DeviceCommandLogRepo, repos.PendingCollectionRepo) + comm = lora.NewChirpStackTransport(cfg.ChirpStack, logger) + loraListener = lora.NewPlaceholderTransport(logger) + } else { + logger.Info("当前运行模式: lora_mesh。初始化 LoRa Mesh 传输层和占位符监听器。") + listenHandler = webhook.NewPlaceholderListener(logger) + tp, err := lora.NewLoRaMeshUartPassthroughTransport(cfg.LoraMesh, logger, repos.AreaControllerRepo, repos.PendingCollectionRepo, repos.DeviceRepo, repos.SensorDataRepo) + if err != nil { + return nil, fmt.Errorf("无法初始化 LoRa Mesh 模块: %w", err) + } + loraListener = tp + comm = tp + } + + return &LoraComponents{ + ListenHandler: listenHandler, + Comm: comm, + LoraListener: loraListener, + }, nil +} + +// initNotifyService 根据配置初始化并返回一个通知领域服务。 +// 它确保至少有一个 LogNotifier 总是可用,并根据配置启用其他通知器。 +func initNotifyService( + cfg config.NotifyConfig, + log *logs.Logger, + userRepo repository.UserRepository, + notificationRepo repository.NotificationRepository, +) (domain_notify.Service, error) { + var availableNotifiers []notify.Notifier + + // 1. 总是创建 LogNotifier 作为所有告警的最终记录渠道 + logNotifier := notify.NewLogNotifier(log) + availableNotifiers = append(availableNotifiers, logNotifier) + log.Info("Log通知器已启用 (作为所有告警的最终记录渠道)") + + // 2. 根据配置,按需创建并收集所有启用的其他 Notifier 实例 + if cfg.SMTP.Enabled { + smtpNotifier := notify.NewSMTPNotifier( + cfg.SMTP.Host, + cfg.SMTP.Port, + cfg.SMTP.Username, + cfg.SMTP.Password, + cfg.SMTP.Sender, + ) + availableNotifiers = append(availableNotifiers, smtpNotifier) + log.Info("SMTP通知器已启用") + } + + if cfg.WeChat.Enabled { + wechatNotifier := notify.NewWechatNotifier( + cfg.WeChat.CorpID, + cfg.WeChat.AgentID, + cfg.WeChat.Secret, + ) + availableNotifiers = append(availableNotifiers, wechatNotifier) + log.Info("企业微信通知器已启用") + } + + if cfg.Lark.Enabled { + larkNotifier := notify.NewLarkNotifier( + cfg.Lark.AppID, + cfg.Lark.AppSecret, + ) + availableNotifiers = append(availableNotifiers, larkNotifier) + log.Info("飞书通知器已启用") + } + + // 3. 动态确定首选通知器 + var primaryNotifier notify.Notifier + primaryNotifierType := notify.NotifierType(cfg.Primary) + + // 检查用户指定的主渠道是否已启用 + for _, n := range availableNotifiers { + if n.Type() == primaryNotifierType { + primaryNotifier = n + break + } + } + + // 如果用户指定的主渠道未启用或未指定,则自动选择第一个可用的 (这将是 LogNotifier,如果其他都未启用) + if primaryNotifier == nil { + primaryNotifier = availableNotifiers[0] // 确保总能找到一个,因为 LogNotifier 总是存在的 + log.Warnf("配置的首选渠道 '%s' 未启用或未指定,已自动降级使用 '%s' 作为首选渠道。", cfg.Primary, primaryNotifier.Type()) + } + + // 4. 使用创建的 Notifier 列表和 notificationRepo 来组装领域服务 + notifyService, err := domain_notify.NewFailoverService( + log, + userRepo, + availableNotifiers, + primaryNotifier.Type(), + cfg.FailureThreshold, + notificationRepo, + ) + if err != nil { + return nil, fmt.Errorf("创建故障转移通知服务失败: %w", err) + } + + log.Infof("通知服务初始化成功,首选渠道: %s, 故障阈值: %d", primaryNotifier.Type(), cfg.FailureThreshold) + return notifyService, nil +} + +// initStorage 封装了数据库的初始化、连接和迁移逻辑。 +func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Storage, error) { + // 创建存储实例 + storage := database.NewStorage(cfg, logger) + if err := storage.Connect(); err != nil { + // 错误已在 Connect 内部被记录,这里只需包装并返回 + return nil, fmt.Errorf("数据库连接失败: %w", err) + } + + // 执行数据库迁移 + if err := storage.Migrate(models.GetAllModels()...); err != nil { + return nil, fmt.Errorf("数据库迁移失败: %w", err) + } + + logger.Info("数据库初始化完成。") + return storage, nil +} diff --git a/internal/domain/token/token_service.go b/internal/domain/token/token_service.go index 361e305..782f041 100644 --- a/internal/domain/token/token_service.go +++ b/internal/domain/token/token_service.go @@ -13,19 +13,19 @@ type Claims struct { jwt.RegisteredClaims } -// TokenService 定义了 token 操作的接口 -type TokenService interface { +// Service 定义了 token 操作的接口 +type Service interface { GenerateToken(userID uint) (string, error) ParseToken(tokenString string) (*Claims, error) } -// tokenService 是 TokenService 接口的实现 +// tokenService 是 Service 接口的实现 type tokenService struct { secret []byte } -// NewTokenService 创建并返回一个新的 TokenService 实例 -func NewTokenService(secret []byte) TokenService { +// NewTokenService 创建并返回一个新的 Service 实例 +func NewTokenService(secret []byte) Service { return &tokenService{secret: secret} }