271 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			271 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package core
 | ||
| 
 | ||
| import (
 | ||
| 	"fmt"
 | ||
| 	"os"
 | ||
| 	"os/signal"
 | ||
| 	"syscall"
 | ||
| 	"time"
 | ||
| 
 | ||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/app/api"
 | ||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/app/service/task"
 | ||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/app/service/token"
 | ||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/app/service/transport"
 | ||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
 | ||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/database"
 | ||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | ||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | ||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | ||
| )
 | ||
| 
 | ||
| // Application 是整个应用的核心,封装了所有组件和生命周期。
 | ||
| type Application struct {
 | ||
| 	Config   *config.Config
 | ||
| 	Logger   *logs.Logger
 | ||
| 	Storage  database.Storage
 | ||
| 	Executor *task.Scheduler
 | ||
| 	API      *api.API // 添加 API 对象
 | ||
| 
 | ||
| 	// 新增的仓库和管理器字段,以便在 initializePendingTasks 中访问
 | ||
| 	planRepo                repository.PlanRepository
 | ||
| 	pendingTaskRepo         repository.PendingTaskRepository
 | ||
| 	executionLogRepo        repository.ExecutionLogRepository
 | ||
| 	analysisPlanTaskManager *task.AnalysisPlanTaskManager
 | ||
| }
 | ||
| 
 | ||
| // NewApplication 创建并初始化一个新的 Application 实例。
 | ||
| // 这是应用的“组合根”,所有依赖都在这里被创建和注入。
 | ||
| func NewApplication(configPath string) (*Application, error) {
 | ||
| 	//  加载配置
 | ||
| 	cfg := config.NewConfig()
 | ||
| 	if err := cfg.Load(configPath); err != nil {
 | ||
| 		return nil, fmt.Errorf("无法加载配置: %w", err)
 | ||
| 	}
 | ||
| 
 | ||
| 	//  初始化日志记录器
 | ||
| 	logger := logs.NewLogger(cfg.Log)
 | ||
| 
 | ||
| 	//  初始化数据库存储
 | ||
| 	storage, err := initStorage(cfg.Database, logger)
 | ||
| 	if err != nil {
 | ||
| 		return nil, err // 错误已在 initStorage 中被包装
 | ||
| 	}
 | ||
| 
 | ||
| 	//  初始化 Token 服务
 | ||
| 	tokenService := token.NewTokenService([]byte(cfg.App.JWTSecret))
 | ||
| 
 | ||
| 	//  初始化用户仓库
 | ||
| 	userRepo := repository.NewGormUserRepository(storage.GetDB())
 | ||
| 
 | ||
| 	//  初始化设备仓库
 | ||
| 	deviceRepo := repository.NewGormDeviceRepository(storage.GetDB())
 | ||
| 
 | ||
| 	//  初始化计划仓库
 | ||
| 	planRepo := repository.NewGormPlanRepository(storage.GetDB())
 | ||
| 
 | ||
| 	//	初始化待执行任务仓库
 | ||
| 	pendingTaskRepo := repository.NewGormPendingTaskRepository(storage.GetDB())
 | ||
| 
 | ||
| 	//  初始化执行日志仓库
 | ||
| 	executionLogRepo := repository.NewGormExecutionLogRepository(storage.GetDB())
 | ||
| 
 | ||
| 	//  初始化传感器数据仓库
 | ||
| 	sensorDataRepo := repository.NewGormSensorDataRepository(storage.GetDB())
 | ||
| 
 | ||
| 	// 初始化命令下发历史仓库
 | ||
| 	deviceCommandLogRepo := repository.NewGormDeviceCommandLogRepository(storage.GetDB())
 | ||
| 
 | ||
| 	// 初始化设备上行监听器
 | ||
| 	listenHandler := transport.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, deviceCommandLogRepo)
 | ||
| 
 | ||
| 	// 初始化计划触发器管理器
 | ||
| 	analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger)
 | ||
| 
 | ||
| 	//  初始化任务执行器
 | ||
| 	executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, planRepo, analysisPlanTaskManager, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers)
 | ||
| 
 | ||
| 	//  初始化 API 服务器
 | ||
| 	apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, sensorDataRepo, executionLogRepo, tokenService, listenHandler, analysisPlanTaskManager)
 | ||
| 
 | ||
| 	//  组装 Application 对象
 | ||
| 	app := &Application{
 | ||
| 		Config:   cfg,
 | ||
| 		Logger:   logger,
 | ||
| 		Storage:  storage,
 | ||
| 		Executor: executor,
 | ||
| 		API:      apiServer,
 | ||
| 		// 填充新增的字段
 | ||
| 		planRepo:                planRepo,
 | ||
| 		pendingTaskRepo:         pendingTaskRepo,
 | ||
| 		executionLogRepo:        executionLogRepo,
 | ||
| 		analysisPlanTaskManager: analysisPlanTaskManager,
 | ||
| 	}
 | ||
| 
 | ||
| 	return app, nil
 | ||
| }
 | ||
| 
 | ||
| // Start 启动应用的所有组件并阻塞,直到接收到关闭信号。
 | ||
| func (app *Application) Start() error {
 | ||
| 	app.Logger.Info("应用启动中...")
 | ||
| 
 | ||
| 	// --- 新增逻辑:初始化待执行任务列表 ---
 | ||
| 	if err := app.initializePendingTasks(
 | ||
| 		app.planRepo,                // 传入 planRepo
 | ||
| 		app.pendingTaskRepo,         // 传入 pendingTaskRepo
 | ||
| 		app.executionLogRepo,        // 传入 executionLogRepo
 | ||
| 		app.analysisPlanTaskManager, // 传入 analysisPlanTaskManager
 | ||
| 		app.Logger,                  // 传入 logger
 | ||
| 	); err != nil {
 | ||
| 		return fmt.Errorf("初始化待执行任务列表失败: %w", err)
 | ||
| 	}
 | ||
| 
 | ||
| 	// 启动任务执行器
 | ||
| 	app.Executor.Start()
 | ||
| 
 | ||
| 	// 启动 API 服务器
 | ||
| 	app.API.Start()
 | ||
| 
 | ||
| 	// 等待关闭信号
 | ||
| 	quit := make(chan os.Signal, 1)
 | ||
| 	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
 | ||
| 	<-quit
 | ||
| 
 | ||
| 	// 接收到信号后,执行优雅关闭
 | ||
| 	return app.Stop()
 | ||
| }
 | ||
| 
 | ||
| // Stop 优雅地关闭应用的所有组件。
 | ||
| func (app *Application) Stop() error {
 | ||
| 	app.Logger.Info("应用关闭中...")
 | ||
| 
 | ||
| 	// 关闭 API 服务器
 | ||
| 	app.API.Stop()
 | ||
| 
 | ||
| 	// 关闭任务执行器
 | ||
| 	app.Executor.Stop()
 | ||
| 
 | ||
| 	// 断开数据库连接
 | ||
| 	if err := app.Storage.Disconnect(); err != nil {
 | ||
| 		app.Logger.Errorw("数据库连接断开失败", "error", err)
 | ||
| 	}
 | ||
| 
 | ||
| 	// 刷新日志缓冲区
 | ||
| 	_ = app.Logger.Sync()
 | ||
| 
 | ||
| 	app.Logger.Info("应用已成功关闭")
 | ||
| 	return nil
 | ||
| }
 | ||
| 
 | ||
| // initializePendingTasks 在应用启动时清理并刷新待执行任务列表。
 | ||
| func (app *Application) initializePendingTasks(
 | ||
| 	planRepo repository.PlanRepository,
 | ||
| 	pendingTaskRepo repository.PendingTaskRepository,
 | ||
| 	executionLogRepo repository.ExecutionLogRepository,
 | ||
| 	analysisPlanTaskManager *task.AnalysisPlanTaskManager,
 | ||
| 	logger *logs.Logger,
 | ||
| ) error {
 | ||
| 	logger.Info("开始初始化待执行任务列表...")
 | ||
| 
 | ||
| 	// 阶段一:修正因崩溃导致状态不一致的固定次数计划
 | ||
| 	logger.Info("阶段一:开始修正因崩溃导致状态不一致的固定次数计划...")
 | ||
| 	plansToCorrect, err := planRepo.FindPlansWithPendingTasks()
 | ||
| 	if err != nil {
 | ||
| 		return fmt.Errorf("查找需要修正的计划失败: %w", err)
 | ||
| 	}
 | ||
| 
 | ||
| 	for _, plan := range plansToCorrect {
 | ||
| 		logger.Infof("发现需要修正的计划 #%d (名称: %s)。", plan.ID, plan.Name)
 | ||
| 
 | ||
| 		// 更新计划的执行计数
 | ||
| 		plan.ExecuteCount++
 | ||
| 		logger.Infof("计划 #%d 执行计数已从 %d 更新为 %d。", plan.ID, plan.ExecuteCount-1, plan.ExecuteCount)
 | ||
| 
 | ||
| 		if plan.ExecutionType == models.PlanExecutionTypeManual ||
 | ||
| 			(plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteCount >= plan.ExecuteNum) {
 | ||
| 			// 更新计划状态为已停止
 | ||
| 			plan.Status = models.PlanStatusStopeed
 | ||
| 			logger.Infof("计划 #%d 状态已更新为 '执行完毕'。", plan.ID)
 | ||
| 
 | ||
| 		}
 | ||
| 		// 保存更新后的计划
 | ||
| 		if err := planRepo.UpdatePlan(plan); err != nil {
 | ||
| 			logger.Errorf("修正计划 #%d 状态失败: %v", plan.ID, err)
 | ||
| 			// 这是一个非阻塞性错误,继续处理其他计划
 | ||
| 		}
 | ||
| 	}
 | ||
| 	logger.Info("阶段一:固定次数计划修正完成。")
 | ||
| 
 | ||
| 	// 阶段二:清理所有待执行任务和相关日志
 | ||
| 	logger.Info("阶段二:开始清理所有待执行任务和相关日志...")
 | ||
| 
 | ||
| 	// --- 新增逻辑:处理因崩溃导致状态不一致的计划主表状态 ---
 | ||
| 	// 1. 查找所有未完成的计划执行日志 (状态为 Started 或 Waiting)
 | ||
| 	incompletePlanLogs, err := executionLogRepo.FindIncompletePlanExecutionLogs()
 | ||
| 	if err != nil {
 | ||
| 		return fmt.Errorf("查找未完成的计划执行日志失败: %w", err)
 | ||
| 	}
 | ||
| 
 | ||
| 	// 2. 收集所有受影响的唯一 PlanID
 | ||
| 	affectedPlanIDs := make(map[uint]struct{})
 | ||
| 	for _, log := range incompletePlanLogs {
 | ||
| 		affectedPlanIDs[log.PlanID] = struct{}{}
 | ||
| 	}
 | ||
| 
 | ||
| 	// 3. 对于每个受影响的 PlanID,重置其 execute_count 并将其状态设置为 Failed
 | ||
| 	for planID := range affectedPlanIDs {
 | ||
| 		logger.Warnf("检测到计划 #%d 在应用崩溃前处于未完成状态,将重置其计数并标记为失败。", planID)
 | ||
| 		// 使用 UpdatePlanStateAfterExecution 来更新主表状态,避免影响关联数据
 | ||
| 		if err := planRepo.UpdatePlanStateAfterExecution(planID, 0, models.PlanStatusFailed); err != nil {
 | ||
| 			logger.Errorf("重置计划 #%d 计数并标记为失败时出错: %v", planID, err)
 | ||
| 			// 这是一个非阻塞性错误,继续处理其他计划
 | ||
| 		}
 | ||
| 	}
 | ||
| 	logger.Info("阶段二:计划主表状态修正完成。")
 | ||
| 
 | ||
| 	// 直接调用新的方法来更新计划执行日志状态为失败
 | ||
| 	if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(); err != nil {
 | ||
| 		logger.Errorf("更新所有未完成计划执行日志状态为失败失败: %v", err)
 | ||
| 		// 这是一个非阻塞性错误,继续执行
 | ||
| 	}
 | ||
| 
 | ||
| 	// 直接调用新的方法来更新任务执行日志状态为取消
 | ||
| 	if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(); err != nil {
 | ||
| 		logger.Errorf("更新所有未完成任务执行日志状态为取消失败: %v", err)
 | ||
| 		// 这是一个非阻塞性错误,继续执行
 | ||
| 	}
 | ||
| 
 | ||
| 	// 清空待执行列表
 | ||
| 	if err := pendingTaskRepo.ClearAllPendingTasks(); err != nil {
 | ||
| 		return fmt.Errorf("清空待执行任务列表失败: %w", err)
 | ||
| 	}
 | ||
| 	logger.Info("阶段二:待执行任务和相关日志清理完成。")
 | ||
| 
 | ||
| 	// 阶段三:初始刷新
 | ||
| 	logger.Info("阶段三:开始刷新待执行列表...")
 | ||
| 	if err := analysisPlanTaskManager.Refresh(); err != nil {
 | ||
| 		return fmt.Errorf("刷新待执行任务列表失败: %w", err)
 | ||
| 	}
 | ||
| 	logger.Info("阶段三:待执行任务列表初始化完成。")
 | ||
| 
 | ||
| 	logger.Info("待执行任务列表初始化完成。")
 | ||
| 	return nil
 | ||
| }
 | ||
| 
 | ||
| // initStorage 封装了数据库的初始化、连接和迁移逻辑。
 | ||
| func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Storage, error) {
 | ||
| 	// 创建存储实例
 | ||
| 	storage := database.NewStorage(cfg, logger)
 | ||
| 	if err := storage.Connect(); err != nil {
 | ||
| 		// 错误已在 Connect 内部被记录,这里只需包装并返回
 | ||
| 		return nil, fmt.Errorf("数据库连接失败: %w", err)
 | ||
| 	}
 | ||
| 
 | ||
| 	// 执行数据库迁移
 | ||
| 	if err := storage.Migrate(models.GetAllModels()...); err != nil {
 | ||
| 		return nil, fmt.Errorf("数据库迁移失败: %w", err)
 | ||
| 	}
 | ||
| 
 | ||
| 	logger.Info("数据库初始化完成。")
 | ||
| 	return storage, nil
 | ||
| }
 |