Files
pig-farm-controller/internal/core/application.go

239 lines
8.2 KiB
Go

package core
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/app/api"
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/task"
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/token"
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/transport"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/database"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
)
// Application 是整个应用的核心,封装了所有组件和生命周期。
type Application struct {
Config *config.Config
Logger *logs.Logger
Storage database.Storage
Executor *task.Scheduler
API *api.API // 添加 API 对象
// 新增的仓库和管理器字段,以便在 initializePendingTasks 中访问
planRepo repository.PlanRepository
pendingTaskRepo repository.PendingTaskRepository
executionLogRepo repository.ExecutionLogRepository
analysisPlanTaskManager *task.AnalysisPlanTaskManager
}
// NewApplication 创建并初始化一个新的 Application 实例。
// 这是应用的“组合根”,所有依赖都在这里被创建和注入。
func NewApplication(configPath string) (*Application, error) {
// 加载配置
cfg := config.NewConfig()
if err := cfg.Load(configPath); err != nil {
return nil, fmt.Errorf("无法加载配置: %w", err)
}
// 初始化日志记录器
logger := logs.NewLogger(cfg.Log)
// 初始化数据库存储
storage, err := initStorage(cfg.Database, logger)
if err != nil {
return nil, err // 错误已在 initStorage 中被包装
}
// 初始化 Token 服务
tokenService := token.NewTokenService([]byte(cfg.App.JWTSecret))
// 初始化用户仓库
userRepo := repository.NewGormUserRepository(storage.GetDB())
// 初始化设备仓库
deviceRepo := repository.NewGormDeviceRepository(storage.GetDB())
// 初始化计划仓库
planRepo := repository.NewGormPlanRepository(storage.GetDB())
// 初始化待执行任务仓库
pendingTaskRepo := repository.NewGormPendingTaskRepository(storage.GetDB())
// 初始化执行日志仓库
executionLogRepo := repository.NewGormExecutionLogRepository(storage.GetDB())
// 初始化设备上行监听器
listenHandler := transport.NewChirpStackListener(logger)
// 初始化计划触发器管理器
analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger)
// 初始化任务执行器
executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, planRepo, analysisPlanTaskManager, task.TaskFactory, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers)
// 初始化 API 服务器
apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, tokenService, listenHandler, analysisPlanTaskManager)
// 组装 Application 对象
app := &Application{
Config: cfg,
Logger: logger,
Storage: storage,
Executor: executor,
API: apiServer,
// 填充新增的字段
planRepo: planRepo,
pendingTaskRepo: pendingTaskRepo,
executionLogRepo: executionLogRepo,
analysisPlanTaskManager: analysisPlanTaskManager,
}
return app, nil
}
// Start 启动应用的所有组件并阻塞,直到接收到关闭信号。
func (app *Application) Start() error {
app.Logger.Info("应用启动中...")
// --- 新增逻辑:初始化待执行任务列表 ---
if err := app.initializePendingTasks(
app.planRepo, // 传入 planRepo
app.pendingTaskRepo, // 传入 pendingTaskRepo
app.executionLogRepo, // 传入 executionLogRepo
app.analysisPlanTaskManager, // 传入 analysisPlanTaskManager
app.Logger, // 传入 logger
); err != nil {
return fmt.Errorf("初始化待执行任务列表失败: %w", err)
}
// 启动任务执行器
app.Executor.Start()
// 启动 API 服务器
app.API.Start()
// 等待关闭信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
// 接收到信号后,执行优雅关闭
return app.Stop()
}
// Stop 优雅地关闭应用的所有组件。
func (app *Application) Stop() error {
app.Logger.Info("应用关闭中...")
// 关闭 API 服务器
app.API.Stop()
// 关闭任务执行器
app.Executor.Stop()
// 断开数据库连接
if err := app.Storage.Disconnect(); err != nil {
app.Logger.Errorw("数据库连接断开失败", "error", err)
}
// 刷新日志缓冲区
_ = app.Logger.Sync()
app.Logger.Info("应用已成功关闭")
return nil
}
// initializePendingTasks 在应用启动时清理并刷新待执行任务列表。
func (app *Application) initializePendingTasks(
planRepo repository.PlanRepository,
pendingTaskRepo repository.PendingTaskRepository,
executionLogRepo repository.ExecutionLogRepository,
analysisPlanTaskManager *task.AnalysisPlanTaskManager,
logger *logs.Logger,
) error {
logger.Info("开始初始化待执行任务列表...")
// 阶段一:修正因崩溃导致状态不一致的固定次数计划
logger.Info("阶段一:开始修正因崩溃导致状态不一致的固定次数计划...")
plansToCorrect, err := planRepo.FindPlansWithPendingTasks()
if err != nil {
return fmt.Errorf("查找需要修正的计划失败: %w", err)
}
for _, plan := range plansToCorrect {
logger.Infof("发现需要修正的计划 #%d (名称: %s)。", plan.ID, plan.Name)
// 更新计划的执行计数
plan.ExecuteCount++
logger.Infof("计划 #%d 执行计数已从 %d 更新为 %d。", plan.ID, plan.ExecuteCount-1, plan.ExecuteCount)
if plan.ExecutionType == models.PlanExecutionTypeManual ||
(plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteCount >= plan.ExecuteNum) {
// 更新计划状态为已停止
plan.Status = models.PlanStatusStopeed
logger.Infof("计划 #%d 状态已更新为 '执行完毕'。", plan.ID)
}
// 保存更新后的计划
if err := planRepo.UpdatePlan(plan); err != nil {
logger.Errorf("修正计划 #%d 状态失败: %v", plan.ID, err)
// 这是一个非阻塞性错误,继续处理其他计划
}
}
logger.Info("阶段一:固定次数计划修正完成。")
// 阶段二:清理所有待执行任务和相关日志
logger.Info("阶段二:开始清理所有待执行任务和相关日志...")
// 直接调用新的方法来更新计划执行日志状态为失败
if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(); err != nil {
logger.Errorf("更新所有未完成计划执行日志状态为失败失败: %v", err)
// 这是一个非阻塞性错误,继续执行
}
// 直接调用新的方法来更新任务执行日志状态为取消
if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(); err != nil {
logger.Errorf("更新所有未完成任务执行日志状态为取消失败: %v", err)
// 这是一个非阻塞性错误,继续执行
}
// 清空待执行列表
if err := pendingTaskRepo.ClearAllPendingTasks(); err != nil {
return fmt.Errorf("清空待执行任务列表失败: %w", err)
}
logger.Info("阶段二:待执行任务和相关日志清理完成。")
// 阶段三:初始刷新
logger.Info("阶段三:开始刷新待执行列表...")
if err := analysisPlanTaskManager.Refresh(); err != nil {
return fmt.Errorf("刷新待执行任务列表失败: %w", err)
}
logger.Info("阶段三:待执行任务列表初始化完成。")
logger.Info("待执行任务列表初始化完成。")
return nil
}
// initStorage 封装了数据库的初始化、连接和迁移逻辑。
func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Storage, error) {
// 创建存储实例
storage := database.NewStorage(cfg, logger)
if err := storage.Connect(); err != nil {
// 错误已在 Connect 内部被记录,这里只需包装并返回
return nil, fmt.Errorf("数据库连接失败: %w", err)
}
// 执行数据库迁移
if err := storage.Migrate(models.GetAllModels()...); err != nil {
return nil, fmt.Errorf("数据库迁移失败: %w", err)
}
return storage, nil
}