package core import ( "fmt" "os" "os/signal" "syscall" "time" "git.huangwc.com/pig/pig-farm-controller/internal/app/api" "git.huangwc.com/pig/pig-farm-controller/internal/app/service" "git.huangwc.com/pig/pig-farm-controller/internal/app/webhook" "git.huangwc.com/pig/pig-farm-controller/internal/domain/audit" "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" "git.huangwc.com/pig/pig-farm-controller/internal/domain/task" "git.huangwc.com/pig/pig-farm-controller/internal/domain/token" "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" "git.huangwc.com/pig/pig-farm-controller/internal/infra/database" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora" ) // Application 是整个应用的核心,封装了所有组件和生命周期。 type Application struct { Config *config.Config Logger *logs.Logger Storage database.Storage Executor *task.Scheduler API *api.API // 添加 API 对象 // 新增的仓库和管理器字段,以便在 initializePendingTasks 中访问 planRepo repository.PlanRepository pendingTaskRepo repository.PendingTaskRepository executionLogRepo repository.ExecutionLogRepository pendingCollectionRepo repository.PendingCollectionRepository analysisPlanTaskManager *task.AnalysisPlanTaskManager } // NewApplication 创建并初始化一个新的 Application 实例。 // 这是应用的“组合根”,所有依赖都在这里被创建和注入。 func NewApplication(configPath string) (*Application, error) { // 加载配置 cfg := config.NewConfig() if err := cfg.Load(configPath); err != nil { return nil, fmt.Errorf("无法加载配置: %w", err) } // 初始化日志记录器 logger := logs.NewLogger(cfg.Log) // 初始化数据库存储 storage, err := initStorage(cfg.Database, logger) if err != nil { return nil, err // 错误已在 initStorage 中被包装 } // 初始化 Token 服务 tokenService := token.NewTokenService([]byte(cfg.App.JWTSecret)) // --- 仓库对象初始化 --- userRepo := repository.NewGormUserRepository(storage.GetDB()) deviceRepo := repository.NewGormDeviceRepository(storage.GetDB()) areaControllerRepo := repository.NewGormAreaControllerRepository(storage.GetDB()) deviceTemplateRepo := repository.NewGormDeviceTemplateRepository(storage.GetDB()) planRepo := repository.NewGormPlanRepository(storage.GetDB()) pigFarmRepo := repository.NewGormPigFarmRepository(storage.GetDB()) pendingTaskRepo := repository.NewGormPendingTaskRepository(storage.GetDB()) executionLogRepo := repository.NewGormExecutionLogRepository(storage.GetDB()) sensorDataRepo := repository.NewGormSensorDataRepository(storage.GetDB()) deviceCommandLogRepo := repository.NewGormDeviceCommandLogRepository(storage.GetDB()) pendingCollectionRepo := repository.NewGormPendingCollectionRepository(storage.GetDB()) userActionLogRepo := repository.NewGormUserActionLogRepository(storage.GetDB()) pigBatchRepo := repository.NewGormPigBatchRepository(storage.GetDB()) // 初始化事务管理器 unitOfWork := repository.NewGormUnitOfWork(storage.GetDB()) // --- 业务逻辑处理器初始化 --- pigFarmService := service.NewPigFarmService(pigFarmRepo, logger) pigBatchService := service.NewPigBatchService(pigBatchRepo, pigFarmRepo, unitOfWork, logger) // 初始化审计服务 auditService := audit.NewService(userActionLogRepo, logger) // 初始化设备上行监听器 listenHandler := webhook.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, areaControllerRepo, deviceCommandLogRepo, pendingCollectionRepo) // 初始化计划触发器管理器 analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger) // 初始化设备通信器 (纯粹的通信客户端) comm := lora.NewChirpStackTransport(cfg.ChirpStack, logger) // 初始化通用设备服务 generalDeviceService := device.NewGeneralDeviceService( deviceRepo, deviceCommandLogRepo, pendingCollectionRepo, logger, comm, ) // 初始化任务执行器 executor := task.NewScheduler( pendingTaskRepo, executionLogRepo, deviceRepo, sensorDataRepo, planRepo, analysisPlanTaskManager, logger, generalDeviceService, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers, ) // 初始化 API 服务器 apiServer := api.NewAPI( cfg.Server, logger, userRepo, deviceRepo, areaControllerRepo, deviceTemplateRepo, planRepo, pigFarmService, pigBatchService, userActionLogRepo, tokenService, auditService, listenHandler, analysisPlanTaskManager, ) // 组装 Application 对象 app := &Application{ Config: cfg, Logger: logger, Storage: storage, Executor: executor, API: apiServer, planRepo: planRepo, pendingTaskRepo: pendingTaskRepo, executionLogRepo: executionLogRepo, pendingCollectionRepo: pendingCollectionRepo, analysisPlanTaskManager: analysisPlanTaskManager, } return app, nil } // Start 启动应用的所有组件并阻塞,直到接收到关闭信号。 func (app *Application) Start() error { app.Logger.Info("应用启动中...") // --- 清理待采集任务 --- if err := app.initializePendingCollections(); err != nil { // 这是一个非致命错误,记录它,但应用应继续启动 app.Logger.Error(err) } // --- 初始化待执行任务列表 --- if err := app.initializePendingTasks( app.planRepo, // 传入 planRepo app.pendingTaskRepo, // 传入 pendingTaskRepo app.executionLogRepo, // 传入 executionLogRepo app.analysisPlanTaskManager, // 传入 analysisPlanTaskManager app.Logger, // 传入 logger ); err != nil { return fmt.Errorf("初始化待执行任务列表失败: %w", err) } // 启动任务执行器 app.Executor.Start() // 启动 API 服务器 app.API.Start() // 等待关闭信号 quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit // 接收到信号后,执行优雅关闭 return app.Stop() } // Stop 优雅地关闭应用的所有组件。 func (app *Application) Stop() error { app.Logger.Info("应用关闭中...") // 关闭 API 服务器 app.API.Stop() // 关闭任务执行器 app.Executor.Stop() // 断开数据库连接 if err := app.Storage.Disconnect(); err != nil { app.Logger.Errorw("数据库连接断开失败", "error", err) } // 刷新日志缓冲区 _ = app.Logger.Sync() app.Logger.Info("应用已成功关闭") return nil } // initializePendingCollections 在应用启动时处理所有未完成的采集请求。 // 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。 // 这保证了系统在每次启动时都处于一个干净、确定的状态。 func (app *Application) initializePendingCollections() error { app.Logger.Info("开始清理所有未完成的采集请求...") // 直接将所有 'pending' 状态的请求更新为 'timed_out'。 count, err := app.pendingCollectionRepo.MarkAllPendingAsTimedOut() if err != nil { return fmt.Errorf("清理未完成的采集请求失败: %v", err) } else if count > 0 { app.Logger.Infof("成功将 %d 个未完成的采集请求标记为超时。", count) } else { app.Logger.Info("没有需要清理的采集请求。") } return nil } // initializePendingTasks 在应用启动时清理并刷新待执行任务列表。 func (app *Application) initializePendingTasks( planRepo repository.PlanRepository, pendingTaskRepo repository.PendingTaskRepository, executionLogRepo repository.ExecutionLogRepository, analysisPlanTaskManager *task.AnalysisPlanTaskManager, logger *logs.Logger, ) error { logger.Info("开始初始化待执行任务列表...") // 阶段一:修正因崩溃导致状态不一致的固定次数计划 logger.Info("阶段一:开始修正因崩溃导致状态不一致的固定次数计划...") plansToCorrect, err := planRepo.FindPlansWithPendingTasks() if err != nil { return fmt.Errorf("查找需要修正的计划失败: %w", err) } for _, plan := range plansToCorrect { logger.Infof("发现需要修正的计划 #%d (名称: %s)。", plan.ID, plan.Name) // 更新计划的执行计数 plan.ExecuteCount++ logger.Infof("计划 #%d 执行计数已从 %d 更新为 %d。", plan.ID, plan.ExecuteCount-1, plan.ExecuteCount) if plan.ExecutionType == models.PlanExecutionTypeManual || (plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteCount >= plan.ExecuteNum) { // 更新计划状态为已停止 plan.Status = models.PlanStatusStopped logger.Infof("计划 #%d 状态已更新为 '执行完毕'。", plan.ID) } // 保存更新后的计划 if err := planRepo.UpdatePlan(plan); err != nil { logger.Errorf("修正计划 #%d 状态失败: %v", plan.ID, err) // 这是一个非阻塞性错误,继续处理其他计划 } } logger.Info("阶段一:固定次数计划修正完成。") // 阶段二:清理所有待执行任务和相关日志 logger.Info("阶段二:开始清理所有待执行任务和相关日志...") // --- 新增逻辑:处理因崩溃导致状态不一致的计划主表状态 --- // 1. 查找所有未完成的计划执行日志 (状态为 Started 或 Waiting) incompletePlanLogs, err := executionLogRepo.FindIncompletePlanExecutionLogs() if err != nil { return fmt.Errorf("查找未完成的计划执行日志失败: %w", err) } // 2. 收集所有受影响的唯一 PlanID affectedPlanIDs := make(map[uint]struct{}) for _, log := range incompletePlanLogs { affectedPlanIDs[log.PlanID] = struct{}{} } // 3. 对于每个受影响的 PlanID,重置其 execute_count 并将其状态设置为 Failed for planID := range affectedPlanIDs { logger.Warnf("检测到计划 #%d 在应用崩溃前处于未完成状态,将重置其计数并标记为失败。", planID) // 使用 UpdatePlanStateAfterExecution 来更新主表状态,避免影响关联数据 if err := planRepo.UpdatePlanStateAfterExecution(planID, 0, models.PlanStatusFailed); err != nil { logger.Errorf("重置计划 #%d 计数并标记为失败时出错: %v", planID, err) // 这是一个非阻塞性错误,继续处理其他计划 } } logger.Info("阶段二:计划主表状态修正完成。") // 直接调用新的方法来更新计划执行日志状态为失败 if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(); err != nil { logger.Errorf("更新所有未完成计划执行日志状态为失败失败: %v", err) // 这是一个非阻塞性错误,继续执行 } // 直接调用新的方法来更新任务执行日志状态为取消 if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(); err != nil { logger.Errorf("更新所有未完成任务执行日志状态为取消失败: %v", err) // 这是一个非阻塞性错误,继续执行 } // 清空待执行列表 if err := pendingTaskRepo.ClearAllPendingTasks(); err != nil { return fmt.Errorf("清空待执行任务列表失败: %w", err) } logger.Info("阶段二:待执行任务和相关日志清理完成。") // 阶段三:初始刷新 logger.Info("阶段三:开始刷新待执行列表...") if err := analysisPlanTaskManager.Refresh(); err != nil { return fmt.Errorf("刷新待执行任务列表失败: %w", err) } logger.Info("阶段三:待执行任务列表初始化完成。") logger.Info("待执行任务列表初始化完成。") return nil } // initStorage 封装了数据库的初始化、连接和迁移逻辑。 func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Storage, error) { // 创建存储实例 storage := database.NewStorage(cfg, logger) if err := storage.Connect(); err != nil { // 错误已在 Connect 内部被记录,这里只需包装并返回 return nil, fmt.Errorf("数据库连接失败: %w", err) } // 执行数据库迁移 if err := storage.Migrate(models.GetAllModels()...); err != nil { return nil, fmt.Errorf("数据库迁移失败: %w", err) } logger.Info("数据库初始化完成。") return storage, nil }