From 675711cdcf9504e763790054ef99e805af6ac98f Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 29 Oct 2025 15:30:16 +0800 Subject: [PATCH 1/7] =?UTF-8?q?=E6=8B=86=E5=88=86task=E5=8C=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/core/initializers.go | 17 ++++-- .../analysis_plan_task_manager.go | 2 +- .../domain/{task => scheduler}/scheduler.go | 21 ++------ internal/domain/scheduler/task.go | 23 ++++++++ internal/domain/task/delay_task.go | 3 +- .../domain/task/release_feed_weight_task.go | 5 +- internal/domain/task/task.go | 53 ++++++++++++------- 7 files changed, 79 insertions(+), 45 deletions(-) rename internal/domain/{task => scheduler}/analysis_plan_task_manager.go (99%) rename internal/domain/{task => scheduler}/scheduler.go (96%) create mode 100644 internal/domain/scheduler/task.go diff --git a/internal/core/initializers.go b/internal/core/initializers.go index b9819b3..91d8a6a 100644 --- a/internal/core/initializers.go +++ b/internal/core/initializers.go @@ -11,6 +11,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" "git.huangwc.com/pig/pig-farm-controller/internal/domain/pig" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler" "git.huangwc.com/pig/pig-farm-controller/internal/domain/task" "git.huangwc.com/pig/pig-farm-controller/internal/domain/token" "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" @@ -125,8 +126,9 @@ type DomainServices struct { PigBatchDomain pig.PigBatchService TimedCollector collection.Collector GeneralDeviceService device.Service - AnalysisPlanTaskManager *task.AnalysisPlanTaskManager - Scheduler *task.Scheduler + taskFactory scheduler.TaskFactory + AnalysisPlanTaskManager *scheduler.AnalysisPlanTaskManager + Scheduler *scheduler.Scheduler } // initDomainServices 初始化所有的领域服务。 @@ -148,16 +150,20 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs. ) // 计划任务管理器 - analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(infra.Repos.PlanRepo, infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, logger) + analysisPlanTaskManager := scheduler.NewAnalysisPlanTaskManager(infra.Repos.PlanRepo, infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, logger) + + // 任务工厂 + taskFactory := task.NewTaskFactory(logger, infra.Repos.SensorDataRepo, infra.Repos.DeviceRepo, generalDeviceService) // 任务执行器 - scheduler := task.NewScheduler( + planScheduler := scheduler.NewScheduler( infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, infra.Repos.DeviceRepo, infra.Repos.SensorDataRepo, infra.Repos.PlanRepo, analysisPlanTaskManager, + taskFactory, logger, generalDeviceService, time.Duration(cfg.Task.Interval)*time.Second, @@ -179,7 +185,8 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs. PigBatchDomain: pigBatchDomain, GeneralDeviceService: generalDeviceService, AnalysisPlanTaskManager: analysisPlanTaskManager, - Scheduler: scheduler, + taskFactory: taskFactory, + Scheduler: planScheduler, TimedCollector: timedCollector, } } diff --git a/internal/domain/task/analysis_plan_task_manager.go b/internal/domain/scheduler/analysis_plan_task_manager.go similarity index 99% rename from internal/domain/task/analysis_plan_task_manager.go rename to internal/domain/scheduler/analysis_plan_task_manager.go index 2a77f16..a443e78 100644 --- a/internal/domain/task/analysis_plan_task_manager.go +++ b/internal/domain/scheduler/analysis_plan_task_manager.go @@ -1,4 +1,4 @@ -package task +package scheduler import ( "fmt" diff --git a/internal/domain/task/scheduler.go b/internal/domain/scheduler/scheduler.go similarity index 96% rename from internal/domain/task/scheduler.go rename to internal/domain/scheduler/scheduler.go index ebb8ca6..9ef01a9 100644 --- a/internal/domain/task/scheduler.go +++ b/internal/domain/scheduler/scheduler.go @@ -1,4 +1,4 @@ -package task +package scheduler import ( "errors" @@ -83,6 +83,7 @@ type Scheduler struct { deviceRepo repository.DeviceRepository sensorDataRepo repository.SensorDataRepository planRepo repository.PlanRepository + taskFactory TaskFactory analysisPlanTaskManager *AnalysisPlanTaskManager progressTracker *ProgressTracker deviceService device.Service @@ -100,6 +101,7 @@ func NewScheduler( sensorDataRepo repository.SensorDataRepository, planRepo repository.PlanRepository, analysisPlanTaskManager *AnalysisPlanTaskManager, + taskFactory TaskFactory, logger *logs.Logger, deviceService device.Service, interval time.Duration, @@ -112,6 +114,7 @@ func NewScheduler( sensorDataRepo: sensorDataRepo, planRepo: planRepo, analysisPlanTaskManager: analysisPlanTaskManager, + taskFactory: taskFactory, logger: logger, deviceService: deviceService, pollingInterval: interval, @@ -271,7 +274,7 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error { } else { // 执行普通任务 - task := s.taskFactory(claimedLog) + task := s.taskFactory.Production(claimedLog) if err := task.Execute(); err != nil { s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) @@ -283,20 +286,6 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error { return nil } -// taskFactory 会根据任务类型初始化对应任务 -func (s *Scheduler) taskFactory(claimedLog *models.TaskExecutionLog) Task { - switch claimedLog.Task.Type { - case models.TaskTypeWaiting: - return NewDelayTask(s.logger, claimedLog) - case models.TaskTypeReleaseFeedWeight: - return NewReleaseFeedWeightTask(claimedLog, s.sensorDataRepo, s.deviceRepo, s.deviceService, s.logger) - - default: - // TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 - panic("不支持的任务类型") - } -} - // analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中 func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { // 创建Plan执行记录 diff --git a/internal/domain/scheduler/task.go b/internal/domain/scheduler/task.go new file mode 100644 index 0000000..a8019be --- /dev/null +++ b/internal/domain/scheduler/task.go @@ -0,0 +1,23 @@ +package scheduler + +import "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + +// Task 定义了所有可被调度器执行的任务必须实现的接口。 +type Task interface { + // Execute 是任务的核心执行逻辑。 + // ctx: 用于控制任务的超时或取消。 + // log: 包含了当前任务执行的完整上下文信息,包括从数据库中加载的任务参数等。 + // 返回的 error 表示任务是否执行成功。调度器会根据返回的 error 是否为 nil 来决定任务状态。 + Execute() error + + // OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑。 + // log: 任务执行的上下文。 + // executeErr: 从 Execute 方法返回的原始错误。 + OnFailure(executeErr error) +} + +// TaskFactory 是一个工厂接口,用于根据任务执行日志创建任务实例。 +type TaskFactory interface { + // Production 根据指定的任务执行日志创建一个任务实例。 + Production(claimedLog *models.TaskExecutionLog) Task +} diff --git a/internal/domain/task/delay_task.go b/internal/domain/task/delay_task.go index dcfdb41..62eb53b 100644 --- a/internal/domain/task/delay_task.go +++ b/internal/domain/task/delay_task.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" ) @@ -19,7 +20,7 @@ type DelayTask struct { logger *logs.Logger } -func NewDelayTask(logger *logs.Logger, executionTask *models.TaskExecutionLog) Task { +func NewDelayTask(logger *logs.Logger, executionTask *models.TaskExecutionLog) scheduler.Task { return &DelayTask{ executionTask: executionTask, logger: logger, diff --git a/internal/domain/task/release_feed_weight_task.go b/internal/domain/task/release_feed_weight_task.go index a8c7d15..0e0a3bd 100644 --- a/internal/domain/task/release_feed_weight_task.go +++ b/internal/domain/task/release_feed_weight_task.go @@ -6,6 +6,7 @@ import ( "time" "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" @@ -40,12 +41,12 @@ func NewReleaseFeedWeightTask( deviceRepo repository.DeviceRepository, deviceService device.Service, logger *logs.Logger, -) Task { +) scheduler.Task { return &ReleaseFeedWeightTask{ claimedLog: claimedLog, deviceRepo: deviceRepo, sensorDataRepo: sensorDataRepo, - feedPort: deviceService, // 直接注入 + feedPort: deviceService, logger: logger, } } diff --git a/internal/domain/task/task.go b/internal/domain/task/task.go index 77623aa..6a5dba4 100644 --- a/internal/domain/task/task.go +++ b/internal/domain/task/task.go @@ -1,30 +1,43 @@ package task import ( + "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" ) -// Task 定义了所有可被调度器执行的任务必须实现的接口。 -type Task interface { - // Execute 是任务的核心执行逻辑。 - // ctx: 用于控制任务的超时或取消。 - // log: 包含了当前任务执行的完整上下文信息,包括从数据库中加载的任务参数等。 - // 返回的 error 表示任务是否执行成功。调度器会根据返回的 error 是否为 nil 来决定任务状态。 - Execute() error - - // OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑。 - // log: 任务执行的上下文。 - // executeErr: 从 Execute 方法返回的原始错误。 - OnFailure(executeErr error) +type taskFactory struct { + logger *logs.Logger + sensorDataRepo repository.SensorDataRepository + deviceRepo repository.DeviceRepository + deviceService device.Service } -// TaskFactory 是一个任务组装工厂, 可以根据Task类型获取到对应的初始化函数 -var TaskFactory = func(tt models.TaskType) Task { - switch tt { - case models.TaskTypeWaiting: - return &DelayTask{} - default: - // 出现位置任务类型说明业务逻辑出现重大问题, 一个异常任务被创建了出来 - panic("发现未知任务类型") +func NewTaskFactory( + logger *logs.Logger, + sensorDataRepo repository.SensorDataRepository, + deviceRepo repository.DeviceRepository, + deviceService device.Service, +) scheduler.TaskFactory { + return &taskFactory{ + logger: logger, + sensorDataRepo: sensorDataRepo, + deviceRepo: deviceRepo, + deviceService: deviceService, + } +} + +func (t *taskFactory) Production(claimedLog *models.TaskExecutionLog) scheduler.Task { + switch claimedLog.Task.Type { + case models.TaskTypeWaiting: + return NewDelayTask(t.logger, claimedLog) + case models.TaskTypeReleaseFeedWeight: + return NewReleaseFeedWeightTask(claimedLog, t.sensorDataRepo, t.deviceRepo, t.deviceService, t.logger) + default: + // TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 + t.logger.Panicf("不支持的任务类型: %s", claimedLog.Task.Type) + panic("不支持的任务类型") // 显式panic防编译器报错 } } From 94e8768424f79a9f284ccdabb287a52056ba9658 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 29 Oct 2025 15:48:49 +0800 Subject: [PATCH 2/7] =?UTF-8?q?plan=E5=A2=9E=E5=8A=A0=E4=B8=80=E4=B8=AA?= =?UTF-8?q?=E7=B1=BB=E5=9E=8B=E5=AD=97=E6=AE=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/app/controller/plan/plan_controller.go | 6 +++--- internal/infra/models/plan.go | 8 ++++++++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/internal/app/controller/plan/plan_controller.go b/internal/app/controller/plan/plan_controller.go index cad8afe..5e30d13 100644 --- a/internal/app/controller/plan/plan_controller.go +++ b/internal/app/controller/plan/plan_controller.go @@ -6,7 +6,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/app/controller" "git.huangwc.com/pig/pig-farm-controller/internal/app/dto" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/task" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" @@ -20,11 +20,11 @@ import ( type Controller struct { logger *logs.Logger planRepo repository.PlanRepository - analysisPlanTaskManager *task.AnalysisPlanTaskManager + analysisPlanTaskManager *scheduler.AnalysisPlanTaskManager } // NewController 创建一个新的 Controller 实例 -func NewController(logger *logs.Logger, planRepo repository.PlanRepository, analysisPlanTaskManager *task.AnalysisPlanTaskManager) *Controller { +func NewController(logger *logs.Logger, planRepo repository.PlanRepository, analysisPlanTaskManager *scheduler.AnalysisPlanTaskManager) *Controller { return &Controller{ logger: logger, planRepo: planRepo, diff --git a/internal/infra/models/plan.go b/internal/infra/models/plan.go index 15189fc..33a6cb7 100644 --- a/internal/infra/models/plan.go +++ b/internal/infra/models/plan.go @@ -52,12 +52,20 @@ const ( PlanStatusFailed PlanStatus = "执行失败" // 执行失败 ) +type PlanType string + +const ( + PlanTypeCustom PlanType = "自定义任务" + PlanTypeSystem PlanType = "系统任务" +) + // Plan 代表系统中的一个计划,可以包含子计划或任务 type Plan struct { gorm.Model Name string `gorm:"not null" json:"name"` Description string `json:"description"` + PlanType PlanType `gorm:"not null;index" json:"plan_type"` // 任务类型, 包括系统任务和用户自定义任务 ExecutionType PlanExecutionType `gorm:"not null;index" json:"execution_type"` Status PlanStatus `gorm:"default:'已禁用';index" json:"status"` // 计划是否被启动 ExecuteNum uint `gorm:"default:0" json:"execute_num"` // 计划预期执行次数 From 1ee3e638f74349817a183e7fd335af755b57fa89 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 29 Oct 2025 16:25:39 +0800 Subject: [PATCH 3/7] =?UTF-8?q?controller=E8=B0=83=E6=95=B4,=20=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E8=AE=A1=E5=88=92=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/app/api/api.go | 34 +++--- .../app/controller/plan/plan_controller.go | 107 ++++++++++++------ internal/app/controller/response.go | 1 + internal/app/dto/plan_converter.go | 5 +- internal/app/dto/plan_dto.go | 10 +- internal/infra/repository/plan_repository.go | 56 +++++++-- 6 files changed, 149 insertions(+), 64 deletions(-) diff --git a/internal/app/api/api.go b/internal/app/api/api.go index 5200b7e..024b40f 100644 --- a/internal/app/api/api.go +++ b/internal/app/api/api.go @@ -29,7 +29,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/domain/audit" domain_device "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/task" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler" "git.huangwc.com/pig/pig-farm-controller/internal/domain/token" "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" @@ -40,21 +40,21 @@ import ( // API 结构体定义了 HTTP 服务器及其依赖 type API struct { - engine *gin.Engine // Gin 引擎实例,用于处理 HTTP 请求 - logger *logs.Logger // 日志记录器,用于输出日志信息 - userRepo repository.UserRepository // 用户数据仓库接口,用于用户数据操作 - tokenService token.Service // Token 服务接口,用于 JWT token 的生成和解析 - auditService audit.Service // 审计服务,用于记录用户操作 - httpServer *http.Server // 标准库的 HTTP 服务器实例,用于启动和停止服务 - config config.ServerConfig // API 服务器的配置,使用 infra/config 包中的 ServerConfig - userController *user.Controller // 用户控制器实例 - deviceController *device.Controller // 设备控制器实例 - planController *plan.Controller // 计划控制器实例 - pigFarmController *management.PigFarmController // 猪场管理控制器实例 - pigBatchController *management.PigBatchController // 猪群控制器实例 - monitorController *monitor.Controller // 数据监控控制器实例 - listenHandler webhook.ListenHandler // 设备上行事件监听器 - analysisTaskManager *task.AnalysisPlanTaskManager // 计划触发器管理器实例 + engine *gin.Engine // Gin 引擎实例,用于处理 HTTP 请求 + logger *logs.Logger // 日志记录器,用于输出日志信息 + userRepo repository.UserRepository // 用户数据仓库接口,用于用户数据操作 + tokenService token.Service // Token 服务接口,用于 JWT token 的生成和解析 + auditService audit.Service // 审计服务,用于记录用户操作 + httpServer *http.Server // 标准库的 HTTP 服务器实例,用于启动和停止服务 + config config.ServerConfig // API 服务器的配置,使用 infra/config 包中的 ServerConfig + userController *user.Controller // 用户控制器实例 + deviceController *device.Controller // 设备控制器实例 + planController *plan.Controller // 计划控制器实例 + pigFarmController *management.PigFarmController // 猪场管理控制器实例 + pigBatchController *management.PigBatchController // 猪群控制器实例 + monitorController *monitor.Controller // 数据监控控制器实例 + listenHandler webhook.ListenHandler // 设备上行事件监听器 + analysisTaskManager *scheduler.AnalysisPlanTaskManager // 计划触发器管理器实例 } // NewAPI 创建并返回一个新的 API 实例 @@ -74,7 +74,7 @@ func NewAPI(cfg config.ServerConfig, notifyService domain_notify.Service, deviceService domain_device.Service, listenHandler webhook.ListenHandler, - analysisTaskManager *task.AnalysisPlanTaskManager) *API { + analysisTaskManager *scheduler.AnalysisPlanTaskManager) *API { // 设置 Gin 模式,例如 gin.ReleaseMode (生产模式) 或 gin.DebugMode (开发模式) // 从配置中获取 Gin 模式 gin.SetMode(cfg.Mode) diff --git a/internal/app/controller/plan/plan_controller.go b/internal/app/controller/plan/plan_controller.go index 5e30d13..1671d96 100644 --- a/internal/app/controller/plan/plan_controller.go +++ b/internal/app/controller/plan/plan_controller.go @@ -61,7 +61,11 @@ func (c *Controller) CreatePlan(ctx *gin.Context) { return } - // --- 自动判断 ContentType --- + // --- 业务规则处理 --- + // 1. 设置计划类型:用户创建的计划永远是自定义计划 + planToCreate.PlanType = models.PlanTypeCustom + + // 2. 自动判断 ContentType if len(req.SubPlanIDs) > 0 { planToCreate.ContentType = models.PlanContentTypeSubPlans } else { @@ -145,16 +149,25 @@ func (c *Controller) GetPlan(ctx *gin.Context) { // ListPlans godoc // @Summary 获取计划列表 -// @Description 获取所有计划的列表 +// @Description 获取所有计划的列表,支持按类型过滤和分页 // @Tags 计划管理 // @Security BearerAuth // @Produce json -// @Success 200 {object} controller.Response{data=[]dto.PlanResponse} "业务码为200代表成功获取列表" +// @Param query query dto.ListPlansQuery false "查询参数" +// @Success 200 {object} controller.Response{data=dto.ListPlansResponse} "业务码为200代表成功获取列表" // @Router /api/v1/plans [get] func (c *Controller) ListPlans(ctx *gin.Context) { const actionType = "获取计划列表" + var query dto.ListPlansQuery + if err := ctx.ShouldBindQuery(&query); err != nil { + c.logger.Errorf("%s: 查询参数绑定失败: %v", actionType, err) + controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "无效的查询参数: "+err.Error(), actionType, "查询参数绑定失败", query) + return + } + // 1. 调用仓库层获取所有计划 - plans, err := c.planRepo.ListBasicPlans() + opts := repository.ListPlansOptions{PlanType: repository.PlanTypeFilter(query.PlanType)} + plans, total, err := c.planRepo.ListPlans(opts, query.Page, query.PageSize) if err != nil { c.logger.Errorf("%s: 数据库查询失败: %v", actionType, err) controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "获取计划列表时发生内部错误", actionType, "数据库查询失败", nil) @@ -176,7 +189,7 @@ func (c *Controller) ListPlans(ctx *gin.Context) { // 3. 构造并发送成功响应 resp := dto.ListPlansResponse{ Plans: planResponses, - Total: len(planResponses), + Total: total, } c.logger.Infof("%s: 获取计划列表成功, 数量: %d", actionType, len(planResponses)) controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "获取计划列表成功", resp, actionType, "获取计划列表成功", resp) @@ -184,7 +197,7 @@ func (c *Controller) ListPlans(ctx *gin.Context) { // UpdatePlan godoc // @Summary 更新计划 -// @Description 根据计划ID更新计划的详细信息。 +// @Description 根据计划ID更新计划的详细信息。系统计划不允许修改。 // @Tags 计划管理 // @Security BearerAuth // @Accept json @@ -212,7 +225,27 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) { return } - // 3. 将请求转换为模型(转换函数带校验) + // 3. 检查计划是否存在 + existingPlan, err := c.planRepo.GetBasicPlanByID(uint(id)) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + c.logger.Warnf("%s: 计划不存在, ID: %d", actionType, id) + controller.SendErrorWithAudit(ctx, controller.CodeNotFound, "计划不存在", actionType, "计划不存在", id) + return + } + c.logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, id) + controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "获取计划信息时发生内部错误", actionType, "数据库查询失败", id) + return + } + + // 4. 业务规则:系统计划不允许修改 + if existingPlan.PlanType == models.PlanTypeSystem { + c.logger.Warnf("%s: 尝试修改系统计划, ID: %d", actionType, id) + controller.SendErrorWithAudit(ctx, controller.CodeForbidden, "系统计划不允许修改", actionType, "尝试修改系统计划", id) + return + } + + // 5. 将请求转换为模型(转换函数带校验) planToUpdate, err := dto.NewPlanFromUpdateRequest(&req) if err != nil { c.logger.Errorf("%s: 计划数据校验失败: %v", actionType, err) @@ -229,20 +262,7 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) { planToUpdate.ContentType = models.PlanContentTypeTasks } - // 4. 检查计划是否存在 - _, err = c.planRepo.GetBasicPlanByID(uint(id)) - if err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - c.logger.Warnf("%s: 计划不存在, ID: %d", actionType, id) - controller.SendErrorWithAudit(ctx, controller.CodeNotFound, "计划不存在", actionType, "计划不存在", id) - return - } - c.logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, id) - controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "获取计划信息时发生内部错误", actionType, "数据库查询失败", id) - return - } - - // 5. 调用仓库方法更新计划 + // 6. 调用仓库方法更新计划 // 只要是更新任务,就重置执行计数器 planToUpdate.ExecuteCount = 0 // 重置计数器 c.logger.Infof("计划 #%d 被更新,执行计数器已重置为 0。", planToUpdate.ID) @@ -259,7 +279,7 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) { c.logger.Errorf("为更新后的计划 %d 确保触发器任务定义失败: %v", planToUpdate.ID, err) } - // 6. 获取更新后的完整计划用于响应 + // 7. 获取更新后的完整计划用于响应 updatedPlan, err := c.planRepo.GetPlanByID(uint(id)) if err != nil { c.logger.Errorf("%s: 获取更新后计划详情失败: %v, ID: %d", actionType, err, id) @@ -267,7 +287,7 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) { return } - // 7. 将模型转换为响应 DTO + // 8. 将模型转换为响应 DTO resp, err := dto.NewPlanToResponse(updatedPlan) if err != nil { c.logger.Errorf("%s: 序列化响应失败: %v, Updated Plan: %+v", actionType, err, updatedPlan) @@ -275,14 +295,14 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) { return } - // 8. 发送成功响应 + // 9. 发送成功响应 c.logger.Infof("%s: 计划更新成功, ID: %d", actionType, updatedPlan.ID) controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "计划更新成功", resp, actionType, "计划更新成功", resp) } // DeletePlan godoc // @Summary 删除计划 -// @Description 根据计划ID删除计划。(软删除) +// @Description 根据计划ID删除计划。(软删除)系统计划不允许删除。 // @Tags 计划管理 // @Security BearerAuth // @Produce json @@ -313,7 +333,14 @@ func (c *Controller) DeletePlan(ctx *gin.Context) { return } - // 3. 停止这个计划 + // 3. 业务规则:系统计划不允许删除 + if plan.PlanType == models.PlanTypeSystem { + c.logger.Warnf("%s: 尝试删除系统计划, ID: %d", actionType, id) + controller.SendErrorWithAudit(ctx, controller.CodeForbidden, "系统计划不允许删除", actionType, "尝试删除系统计划", id) + return + } + + // 4. 停止这个计划 if plan.Status == models.PlanStatusEnabled { if err := c.planRepo.StopPlanTransactionally(uint(id)); err != nil { c.logger.Errorf("%s: 停止计划失败: %v, ID: %d", actionType, err, id) @@ -322,21 +349,21 @@ func (c *Controller) DeletePlan(ctx *gin.Context) { } } - // 4. 调用仓库层删除计划 + // 5. 调用仓库层删除计划 if err := c.planRepo.DeletePlan(uint(id)); err != nil { c.logger.Errorf("%s: 数据库删除失败: %v, ID: %d", actionType, err, id) controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "删除计划时发生内部错误", actionType, "数据库删除失败", id) return } - // 5. 发送成功响应 + // 6. 发送成功响应 c.logger.Infof("%s: 计划删除成功, ID: %d", actionType, id) controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "计划删除成功", nil, actionType, "计划删除成功", id) } // StartPlan godoc // @Summary 启动计划 -// @Description 根据计划ID启动一个计划的执行。 +// @Description 根据计划ID启动一个计划的执行。系统计划不允许手动启动。 // @Tags 计划管理 // @Security BearerAuth // @Produce json @@ -367,7 +394,12 @@ func (c *Controller) StartPlan(ctx *gin.Context) { return } - // 3. 检查计划当前状态 + // 3. 业务规则检查 + if plan.PlanType == models.PlanTypeSystem { + c.logger.Warnf("%s: 尝试手动启动系统计划, ID: %d", actionType, id) + controller.SendErrorWithAudit(ctx, controller.CodeForbidden, "系统计划不允许手动启动", actionType, "尝试手动启动系统计划", id) + return + } if plan.Status == models.PlanStatusEnabled { c.logger.Warnf("%s: 计划已处于启动状态,无需重复操作, ID: %d", actionType, id) controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "计划已处于启动状态,无需重复操作", actionType, "计划已处于启动状态", id) @@ -416,7 +448,7 @@ func (c *Controller) StartPlan(ctx *gin.Context) { // StopPlan godoc // @Summary 停止计划 -// @Description 根据计划ID停止一个正在执行的计划。 +// @Description 根据计划ID停止一个正在执行的计划。系统计划不能被停止。 // @Tags 计划管理 // @Security BearerAuth // @Produce json @@ -447,21 +479,28 @@ func (c *Controller) StopPlan(ctx *gin.Context) { return } - // 3. 检查计划当前状态 + // 3. 业务规则:系统计划不允许停止 + if plan.PlanType == models.PlanTypeSystem { + c.logger.Warnf("%s: 尝试停止系统计划, ID: %d", actionType, id) + controller.SendErrorWithAudit(ctx, controller.CodeForbidden, "系统计划不允许停止", actionType, "尝试停止系统计划", id) + return + } + + // 4. 检查计划当前状态 if plan.Status != models.PlanStatusEnabled { c.logger.Warnf("%s: 计划当前不是启用状态, ID: %d, Status: %s", actionType, id, plan.Status) controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "计划当前不是启用状态", actionType, "计划未启用", id) return } - // 4. 调用仓库层方法,该方法内部处理事务 + // 5. 调用仓库层方法,该方法内部处理事务 if err := c.planRepo.StopPlanTransactionally(uint(id)); err != nil { c.logger.Errorf("%s: 停止计划失败: %v, ID: %d", actionType, err, id) controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "停止计划时发生内部错误: "+err.Error(), actionType, "停止计划失败", id) return } - // 5. 发送成功响应 + // 6. 发送成功响应 c.logger.Infof("%s: 计划已成功停止, ID: %d", actionType, id) controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "计划已成功停止", nil, actionType, "计划已成功停止", id) } diff --git a/internal/app/controller/response.go b/internal/app/controller/response.go index 7f180c7..cb06298 100644 --- a/internal/app/controller/response.go +++ b/internal/app/controller/response.go @@ -18,6 +18,7 @@ const ( // 客户端错误状态码 (4000-4999) CodeBadRequest ResponseCode = 4000 // 请求参数错误 CodeUnauthorized ResponseCode = 4001 // 未授权 + CodeForbidden ResponseCode = 4003 // 禁止访问 CodeNotFound ResponseCode = 4004 // 资源未找到 CodeConflict ResponseCode = 4009 // 资源冲突 diff --git a/internal/app/dto/plan_converter.go b/internal/app/dto/plan_converter.go index 2ead166..7763ecf 100644 --- a/internal/app/dto/plan_converter.go +++ b/internal/app/dto/plan_converter.go @@ -17,6 +17,7 @@ func NewPlanToResponse(plan *models.Plan) (*PlanResponse, error) { ID: plan.ID, Name: plan.Name, Description: plan.Description, + PlanType: plan.PlanType, ExecutionType: plan.ExecutionType, Status: plan.Status, ExecuteNum: plan.ExecuteNum, @@ -64,7 +65,7 @@ func NewPlanFromCreateRequest(req *CreatePlanRequest) (*models.Plan, error) { ExecutionType: req.ExecutionType, ExecuteNum: req.ExecuteNum, CronExpression: req.CronExpression, - // ContentType 在控制器中设置,此处不再处理 + // ContentType 和 PlanType 在控制器中设置,此处不再处理 } // 处理子计划 (通过ID引用) @@ -116,7 +117,7 @@ func NewPlanFromUpdateRequest(req *UpdatePlanRequest) (*models.Plan, error) { ExecutionType: req.ExecutionType, ExecuteNum: req.ExecuteNum, CronExpression: req.CronExpression, - // ContentType 在控制器中设置,此处不再处理 + // ContentType 和 PlanType 在控制器中设置,此处不再处理 } // 处理子计划 (通过ID引用) diff --git a/internal/app/dto/plan_dto.go b/internal/app/dto/plan_dto.go index 37935a2..c84a3ed 100644 --- a/internal/app/dto/plan_dto.go +++ b/internal/app/dto/plan_dto.go @@ -2,6 +2,13 @@ package dto import "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" +// ListPlansQuery 定义了获取计划列表时的查询参数 +type ListPlansQuery struct { + PlanType string `form:"planType,default=custom"` // 计划类型 (all, custom, system),默认为 custom + Page int `form:"page,default=1"` // 页码 + PageSize int `form:"pageSize,default=10"` // 每页大小 +} + // CreatePlanRequest 定义创建计划请求的结构体 type CreatePlanRequest struct { Name string `json:"name" binding:"required" example:"猪舍温度控制计划"` @@ -18,6 +25,7 @@ type PlanResponse struct { ID uint `json:"id" example:"1"` Name string `json:"name" example:"猪舍温度控制计划"` Description string `json:"description" example:"根据温度自动调节风扇和加热器"` + PlanType models.PlanType `json:"plan_type" example:"自定义任务"` ExecutionType models.PlanExecutionType `json:"execution_type" example:"自动"` Status models.PlanStatus `json:"status" example:"已启用"` ExecuteNum uint `json:"execute_num" example:"10"` @@ -31,7 +39,7 @@ type PlanResponse struct { // ListPlansResponse 定义获取计划列表响应的结构体 type ListPlansResponse struct { Plans []PlanResponse `json:"plans"` - Total int `json:"total" example:"100"` + Total int64 `json:"total" example:"100"` } // UpdatePlanRequest 定义更新计划请求的结构体 diff --git a/internal/infra/repository/plan_repository.go b/internal/infra/repository/plan_repository.go index 6c6876a..c877b8d 100644 --- a/internal/infra/repository/plan_repository.go +++ b/internal/infra/repository/plan_repository.go @@ -21,11 +21,25 @@ var ( ErrDeleteWithReferencedPlan = errors.New("禁止删除正在被引用的计划") ) +// PlanTypeFilter 定义计划类型的过滤器 +type PlanTypeFilter string + +const ( + PlanTypeFilterAll PlanTypeFilter = "all" + PlanTypeFilterCustom PlanTypeFilter = "custom" + PlanTypeFilterSystem PlanTypeFilter = "system" +) + +// ListPlansOptions 定义了查询计划时的可选参数 +type ListPlansOptions struct { + PlanType PlanTypeFilter +} + // PlanRepository 定义了与计划模型相关的数据库操作接口 // 这是为了让业务逻辑层依赖于抽象,而不是具体的数据库实现 type PlanRepository interface { - // ListBasicPlans 获取所有计划的基本信息,不包含子计划和任务详情 - ListBasicPlans() ([]models.Plan, error) + // ListPlans 获取计划列表,支持过滤和分页 + ListPlans(opts ListPlansOptions, page, pageSize int) ([]models.Plan, int64, error) // GetBasicPlanByID 根据ID获取计划的基本信息,不包含子计划和任务详情 GetBasicPlanByID(id uint) (*models.Plan, error) // GetPlanByID 根据ID获取计划,包含子计划和任务详情 @@ -81,15 +95,37 @@ func NewGormPlanRepository(db *gorm.DB) PlanRepository { } } -// ListBasicPlans 获取所有计划的基本信息,不包含子计划和任务详情 -func (r *gormPlanRepository) ListBasicPlans() ([]models.Plan, error) { - var plans []models.Plan - // GORM 默认不会加载关联,除非使用 Preload,所以直接 Find 即可满足要求 - result := r.db.Find(&plans) - if result.Error != nil { - return nil, result.Error +// ListPlans 获取计划列表,支持过滤和分页 +func (r *gormPlanRepository) ListPlans(opts ListPlansOptions, page, pageSize int) ([]models.Plan, int64, error) { + if page <= 0 || pageSize <= 0 { + return nil, 0, ErrInvalidPagination } - return plans, nil + + var plans []models.Plan + var total int64 + + query := r.db.Model(&models.Plan{}) + + switch opts.PlanType { + case PlanTypeFilterCustom: + query = query.Where("plan_type = ?", models.PlanTypeCustom) + case PlanTypeFilterSystem: + query = query.Where("plan_type = ?", models.PlanTypeSystem) + case PlanTypeFilterAll: + // 不添加 plan_type 的过滤条件 + default: + // 默认查询自定义 + query = query.Where("plan_type = ?", models.PlanTypeCustom) + } + + if err := query.Count(&total).Error; err != nil { + return nil, 0, err + } + + offset := (page - 1) * pageSize + err := query.Limit(pageSize).Offset(offset).Order("id DESC").Find(&plans).Error + + return plans, total, err } // GetBasicPlanByID 根据ID获取计划的基本信息,不包含子计划和任务详情 From 5050f76066fca1a0a988651e5b3d27f72664a669 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 29 Oct 2025 16:37:05 +0800 Subject: [PATCH 4/7] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=85=A8=E9=87=8F?= =?UTF-8?q?=E9=87=87=E9=9B=86=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/domain/task/full_collection_task.go | 93 ++++++++++++++++++++ internal/domain/task/task.go | 2 + internal/infra/models/plan.go | 1 + 3 files changed, 96 insertions(+) create mode 100644 internal/domain/task/full_collection_task.go diff --git a/internal/domain/task/full_collection_task.go b/internal/domain/task/full_collection_task.go new file mode 100644 index 0000000..8802600 --- /dev/null +++ b/internal/domain/task/full_collection_task.go @@ -0,0 +1,93 @@ +package task + +import ( + "fmt" + + "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" +) + +// FullCollectionTask 实现了 scheduler.Task 接口,用于执行一次全量的设备数据采集 +type FullCollectionTask struct { + log *models.TaskExecutionLog + deviceRepo repository.DeviceRepository + deviceService device.Service + logger *logs.Logger +} + +// NewFullCollectionTask 创建一个全量采集任务实例 +func NewFullCollectionTask( + log *models.TaskExecutionLog, + deviceRepo repository.DeviceRepository, + deviceService device.Service, + logger *logs.Logger, +) *FullCollectionTask { + return &FullCollectionTask{ + log: log, + deviceRepo: deviceRepo, + deviceService: deviceService, + logger: logger, + } +} + +// Execute 是任务的核心执行逻辑 +func (t *FullCollectionTask) Execute() error { + t.logger.Infow("开始执行全量采集任务", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID) + + sensors, err := t.deviceRepo.ListAllSensors() + if err != nil { + return fmt.Errorf("全量采集任务: 从数据库获取所有传感器失败: %w", err) + } + + if len(sensors) == 0 { + t.logger.Infow("全量采集任务: 未发现任何传感器设备,跳过本次采集", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID) + return nil + } + + sensorsByController := make(map[uint][]*models.Device) + for _, sensor := range sensors { + sensorsByController[sensor.AreaControllerID] = append(sensorsByController[sensor.AreaControllerID], sensor) + } + + var firstError error + for controllerID, controllerSensors := range sensorsByController { + t.logger.Infow("全量采集任务: 准备为区域主控下的传感器下发采集指令", + "task_id", t.log.TaskID, + "task_type", t.log.Task.Type, + "log_id", t.log.ID, + "controller_id", controllerID, + "sensor_count", len(controllerSensors), + ) + if err := t.deviceService.Collect(controllerID, controllerSensors); err != nil { + t.logger.Errorw("全量采集任务: 为区域主控下发采集指令失败", + "task_id", t.log.TaskID, + "task_type", t.log.Task.Type, + "log_id", t.log.ID, + "controller_id", controllerID, + "error", err, + ) + if firstError == nil { + firstError = err // 保存第一个错误 + } + } + } + + if firstError != nil { + return fmt.Errorf("全量采集任务执行期间发生错误: %w", firstError) + } + + t.logger.Infow("全量采集任务执行完成", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID) + return nil +} + +// OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑 +func (t *FullCollectionTask) OnFailure(executeErr error) { + t.logger.Errorw("全量采集任务执行失败", + "task_id", t.log.TaskID, + "task_type", t.log.Task.Type, + "log_id", t.log.ID, + "error", executeErr, + ) +} diff --git a/internal/domain/task/task.go b/internal/domain/task/task.go index 6a5dba4..54d23e1 100644 --- a/internal/domain/task/task.go +++ b/internal/domain/task/task.go @@ -35,6 +35,8 @@ func (t *taskFactory) Production(claimedLog *models.TaskExecutionLog) scheduler. return NewDelayTask(t.logger, claimedLog) case models.TaskTypeReleaseFeedWeight: return NewReleaseFeedWeightTask(claimedLog, t.sensorDataRepo, t.deviceRepo, t.deviceService, t.logger) + case models.TaskTypeFullCollection: + return NewFullCollectionTask(claimedLog, t.deviceRepo, t.deviceService, t.logger) default: // TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 t.logger.Panicf("不支持的任务类型: %s", claimedLog.Task.Type) diff --git a/internal/infra/models/plan.go b/internal/infra/models/plan.go index 33a6cb7..87bbca9 100644 --- a/internal/infra/models/plan.go +++ b/internal/infra/models/plan.go @@ -34,6 +34,7 @@ const ( TaskPlanAnalysis TaskType = "计划分析" // 解析Plan的Task列表并添加到待执行队列的特殊任务 TaskTypeWaiting TaskType = "等待" // 等待任务 TaskTypeReleaseFeedWeight TaskType = "下料" // 下料口释放指定重量任务 + TaskTypeFullCollection TaskType = "全量采集" // 新增的全量采集任务 ) // -- Task Parameters -- From 85bd5254c101fb0679dab467b4b3ab38133870d7 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 29 Oct 2025 17:10:48 +0800 Subject: [PATCH 5/7] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=85=A8=E9=87=8F?= =?UTF-8?q?=E9=87=87=E9=9B=86=E7=B3=BB=E7=BB=9F=E8=AE=A1=E5=88=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.example.yml | 2 +- config.yml | 2 +- internal/core/application.go | 131 ----------- ...tializers.go => component_initializers.go} | 0 internal/core/data_initializer.go | 217 ++++++++++++++++++ internal/infra/config/config.go | 7 +- 6 files changed, 225 insertions(+), 134 deletions(-) rename internal/core/{initializers.go => component_initializers.go} (100%) create mode 100644 internal/core/data_initializer.go diff --git a/config.example.yml b/config.example.yml index ae486b5..1a74a77 100644 --- a/config.example.yml +++ b/config.example.yml @@ -112,4 +112,4 @@ notify: # 定时采集配置 collection: - interval: 300 # 采集间隔 (秒) + interval: 1 # 采集间隔 (分钟) diff --git a/config.yml b/config.yml index ace399d..d3eaee2 100644 --- a/config.yml +++ b/config.yml @@ -90,4 +90,4 @@ lora_mesh: # 定时采集配置 collection: - interval: 300 # 采集间隔 (秒) \ No newline at end of file + interval: 1 # 采集间隔 (分钟) \ No newline at end of file diff --git a/internal/core/application.go b/internal/core/application.go index fcc0904..74ee2da 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -9,7 +9,6 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/app/api" "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" ) // Application 是整个应用的核心,封装了所有组件和生命周期。 @@ -133,133 +132,3 @@ func (app *Application) Stop() error { app.Logger.Info("应用已成功关闭") return nil } - -// initializeState 在应用启动时准备其初始数据状态。 -// 这包括清理任何因上次异常关闭而留下的悬空任务或请求。 -func (app *Application) initializeState() error { - // 清理待采集任务 (非致命错误) - if err := app.initializePendingCollections(); err != nil { - app.Logger.Errorw("清理待采集任务时发生非致命错误", "error", err) - } - - // 初始化待执行任务列表 (致命错误) - if err := app.initializePendingTasks(); err != nil { - return fmt.Errorf("初始化待执行任务列表失败: %w", err) - } - - return nil -} - -// initializePendingCollections 在应用启动时处理所有未完成的采集请求。 -// 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。 -// 这保证了系统在每次启动时都处于一个干净、确定的状态。 -func (app *Application) initializePendingCollections() error { - app.Logger.Info("开始清理所有未完成的采集请求...") - - // 直接将所有 'pending' 状态的请求更新为 'timed_out'。 - count, err := app.Infra.Repos.PendingCollectionRepo.MarkAllPendingAsTimedOut() - if err != nil { - return fmt.Errorf("清理未完成的采集请求失败: %v", err) - } else if count > 0 { - app.Logger.Infof("成功将 %d 个未完成的采集请求标记为超时。", count) - } else { - app.Logger.Info("没有需要清理的采集请求。") - } - - return nil -} - -// initializePendingTasks 在应用启动时清理并刷新待执行任务列表。 -func (app *Application) initializePendingTasks() error { - logger := app.Logger - planRepo := app.Infra.Repos.PlanRepo - pendingTaskRepo := app.Infra.Repos.PendingTaskRepo - executionLogRepo := app.Infra.Repos.ExecutionLogRepo - analysisPlanTaskManager := app.Domain.AnalysisPlanTaskManager - - logger.Info("开始初始化待执行任务列表...") - - // 阶段一:修正因崩溃导致状态不一致的固定次数计划 - logger.Info("阶段一:开始修正因崩溃导致状态不一致的固定次数计划...") - plansToCorrect, err := planRepo.FindPlansWithPendingTasks() - if err != nil { - return fmt.Errorf("查找需要修正的计划失败: %w", err) - } - - for _, plan := range plansToCorrect { - logger.Infof("发现需要修正的计划 #%d (名称: %s)。", plan.ID, plan.Name) - - // 更新计划的执行计数 - plan.ExecuteCount++ - logger.Infof("计划 #%d 执行计数已从 %d 更新为 %d。", plan.ID, plan.ExecuteCount-1, plan.ExecuteCount) - - if plan.ExecutionType == models.PlanExecutionTypeManual || - (plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteCount >= plan.ExecuteNum) { - // 更新计划状态为已停止 - plan.Status = models.PlanStatusStopped - logger.Infof("计划 #%d 状态已更新为 '执行完毕'。", plan.ID) - - } - // 保存更新后的计划 - if err := planRepo.UpdatePlan(plan); err != nil { - logger.Errorf("修正计划 #%d 状态失败: %v", plan.ID, err) - // 这是一个非阻塞性错误,继续处理其他计划 - } - } - logger.Info("阶段一:固定次数计划修正完成。") - - // 阶段二:清理所有待执行任务和相关日志 - logger.Info("阶段二:开始清理所有待执行任务和相关日志...") - - // --- 新增逻辑:处理因崩溃导致状态不一致的计划主表状态 --- - // 1. 查找所有未完成的计划执行日志 (状态为 Started 或 Waiting) - incompletePlanLogs, err := executionLogRepo.FindIncompletePlanExecutionLogs() - if err != nil { - return fmt.Errorf("查找未完成的计划执行日志失败: %w", err) - } - - // 2. 收集所有受影响的唯一 PlanID - affectedPlanIDs := make(map[uint]struct{}) - for _, log := range incompletePlanLogs { - affectedPlanIDs[log.PlanID] = struct{}{} - } - - // 3. 对于每个受影响的 PlanID,重置其 execute_count 并将其状态设置为 Failed - for planID := range affectedPlanIDs { - logger.Warnf("检测到计划 #%d 在应用崩溃前处于未完成状态,将重置其计数并标记为失败。", planID) - // 使用 UpdatePlanStateAfterExecution 来更新主表状态,避免影响关联数据 - if err := planRepo.UpdatePlanStateAfterExecution(planID, 0, models.PlanStatusFailed); err != nil { - logger.Errorf("重置计划 #%d 计数并标记为失败时出错: %v", planID, err) - // 这是一个非阻塞性错误,继续处理其他计划 - } - } - logger.Info("阶段二:计划主表状态修正完成。") - - // 直接调用新的方法来更新计划执行日志状态为失败 - if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(); err != nil { - logger.Errorf("更新所有未完成计划执行日志状态为失败失败: %v", err) - // 这是一个非阻塞性错误,继续执行 - } - - // 直接调用新的方法来更新任务执行日志状态为取消 - if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(); err != nil { - logger.Errorf("更新所有未完成任务执行日志状态为取消失败: %v", err) - // 这是一个非阻塞性错误,继续执行 - } - - // 清空待执行列表 - if err := pendingTaskRepo.ClearAllPendingTasks(); err != nil { - return fmt.Errorf("清空待执行任务列表失败: %w", err) - } - logger.Info("阶段二:待执行任务和相关日志清理完成。") - - // 阶段三:初始刷新 - logger.Info("阶段三:开始刷新待执行列表...") - if err := analysisPlanTaskManager.Refresh(); err != nil { - return fmt.Errorf("刷新待执行任务列表失败: %w", err) - } - logger.Info("阶段三:待执行任务列表初始化完成。") - - logger.Info("待执行任务列表初始化完成。") - return nil -} diff --git a/internal/core/initializers.go b/internal/core/component_initializers.go similarity index 100% rename from internal/core/initializers.go rename to internal/core/component_initializers.go diff --git a/internal/core/data_initializer.go b/internal/core/data_initializer.go new file mode 100644 index 0000000..6be1ae0 --- /dev/null +++ b/internal/core/data_initializer.go @@ -0,0 +1,217 @@ +package core + +import ( + "fmt" + + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" +) + +const ( + // PlanNameTimedFullDataCollection 是定时全量数据采集计划的名称 + PlanNameTimedFullDataCollection = "定时全量数据采集" +) + +// initializeState 在应用启动时准备其初始数据状态。 +// 这包括清理任何因上次异常关闭而留下的悬空任务或请求。 +func (app *Application) initializeState() error { + // 初始化预定义系统计划 (致命错误) + if err := app.initializeSystemPlans(); err != nil { + return fmt.Errorf("初始化预定义系统计划失败: %w", err) + } + + // 清理待采集任务 (非致命错误) + if err := app.initializePendingCollections(); err != nil { + app.Logger.Errorw("清理待采集任务时发生非致命错误", "error", err) + } + + // 初始化待执行任务列表 (致命错误) + if err := app.initializePendingTasks(); err != nil { + return fmt.Errorf("初始化待执行任务列表失败: %w", err) + } + + return nil +} + +// initializeSystemPlans 确保预定义的系统计划在数据库中存在。 +func (app *Application) initializeSystemPlans() error { + app.Logger.Info("开始检查并创建预定义的系统计划...") + + // 动态构建预定义计划列表 + predefinedSystemPlans := app.getPredefinedSystemPlans() + + // 1. 获取所有已存在的系统计划 + existingPlans, _, err := app.Infra.Repos.PlanRepo.ListPlans(repository.ListPlansOptions{ + PlanType: repository.PlanTypeFilterSystem, + }, 1, 999) // 使用一个较大的 pageSize 来获取所有系统计划 + if err != nil { + return fmt.Errorf("获取现有系统计划失败: %w", err) + } + + // 2. 为了方便查找, 将现有计划名放入一个 map + existingPlanNames := make(map[string]bool) + for _, p := range existingPlans { + existingPlanNames[p.Name] = true + } + + // 3. 遍历预定义的计划列表 + for _, predefinedPlan := range predefinedSystemPlans { + // 4. 如果计划不存在, 则创建 + if !existingPlanNames[predefinedPlan.Name] { + app.Logger.Infof("预定义计划 '%s' 不存在,正在创建...", predefinedPlan.Name) + if err := app.Infra.Repos.PlanRepo.CreatePlan(&predefinedPlan); err != nil { + // 错误现在是致命的 + return fmt.Errorf("创建预定义计划 '%s' 失败: %w", predefinedPlan.Name, err) + } else { + app.Logger.Infof("成功创建预定义计划 '%s'。", predefinedPlan.Name) + } + } + } + + app.Logger.Info("预定义系统计划检查完成。") + return nil +} + +// getPredefinedSystemPlans 返回一个基于当前配置的预定义系统计划列表。 +func (app *Application) getPredefinedSystemPlans() []models.Plan { + + // 根据配置创建定时全量采集计划 + interval := app.Config.Collection.Interval + if interval <= 0 { + interval = 1 // 确保间隔至少为1分钟 + } + cronExpression := fmt.Sprintf("*/%d * * * *", interval) + timedCollectionPlan := models.Plan{ + Name: PlanNameTimedFullDataCollection, + Description: fmt.Sprintf("这是一个系统预定义的计划, 每 %d 秒自动触发一次全量数据采集。", app.Config.Collection.Interval), + PlanType: models.PlanTypeSystem, + ExecutionType: models.PlanExecutionTypeAutomatic, + CronExpression: cronExpression, + Status: models.PlanStatusEnabled, + ContentType: models.PlanContentTypeTasks, + Tasks: []models.Task{ + { + Name: "全量采集", + Description: "触发一次全量数据采集", + ExecutionOrder: 1, + Type: models.TaskTypeFullCollection, + }, + }, + } + + return []models.Plan{timedCollectionPlan} +} + +// initializePendingCollections 在应用启动时处理所有未完成的采集请求。 +// 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。 +// 这保证了系统在每次启动时都处于一个干净、确定的状态。 +func (app *Application) initializePendingCollections() error { + app.Logger.Info("开始清理所有未完成的采集请求...") + + // 直接将所有 'pending' 状态的请求更新为 'timed_out'。 + count, err := app.Infra.Repos.PendingCollectionRepo.MarkAllPendingAsTimedOut() + if err != nil { + return fmt.Errorf("清理未完成的采集请求失败: %v", err) + } else if count > 0 { + app.Logger.Infof("成功将 %d 个未完成的采集请求标记为超时。", count) + } else { + app.Logger.Info("没有需要清理的采集请求。") + } + + return nil +} + +// initializePendingTasks 在应用启动时清理并刷新待执行任务列表。 +func (app *Application) initializePendingTasks() error { + logger := app.Logger + planRepo := app.Infra.Repos.PlanRepo + pendingTaskRepo := app.Infra.Repos.PendingTaskRepo + executionLogRepo := app.Infra.Repos.ExecutionLogRepo + analysisPlanTaskManager := app.Domain.AnalysisPlanTaskManager + + logger.Info("开始初始化待执行任务列表...") + + // 阶段一:修正因崩溃导致状态不一致的固定次数计划 + logger.Info("阶段一:开始修正因崩溃导致状态不一致的固定次数计划...") + plansToCorrect, err := planRepo.FindPlansWithPendingTasks() + if err != nil { + return fmt.Errorf("查找需要修正的计划失败: %w", err) + } + + for _, plan := range plansToCorrect { + logger.Infof("发现需要修正的计划 #%d (名称: %s)。", plan.ID, plan.Name) + + // 更新计划的执行计数 + plan.ExecuteCount++ + logger.Infof("计划 #%d 执行计数已从 %d 更新为 %d。", plan.ID, plan.ExecuteCount-1, plan.ExecuteCount) + + if plan.ExecutionType == models.PlanExecutionTypeManual || + (plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteCount >= plan.ExecuteNum) { + // 更新计划状态为已停止 + plan.Status = models.PlanStatusStopped + logger.Infof("计划 #%d 状态已更新为 '执行完毕'。", plan.ID) + + } + // 保存更新后的计划 + if err := planRepo.UpdatePlan(plan); err != nil { + logger.Errorf("修正计划 #%d 状态失败: %v", plan.ID, err) + // 这是一个非阻塞性错误,继续处理其他计划 + } + } + logger.Info("阶段一:固定次数计划修正完成。") + + // 阶段二:清理所有待执行任务和相关日志 + logger.Info("阶段二:开始清理所有待执行任务和相关日志...") + + // --- 新增逻辑:处理因崩溃导致状态不一致的计划主表状态 --- + // 1. 查找所有未完成的计划执行日志 (状态为 Started 或 Waiting) + incompletePlanLogs, err := executionLogRepo.FindIncompletePlanExecutionLogs() + if err != nil { + return fmt.Errorf("查找未完成的计划执行日志失败: %w", err) + } + + // 2. 收集所有受影响的唯一 PlanID + affectedPlanIDs := make(map[uint]struct{}) + for _, log := range incompletePlanLogs { + affectedPlanIDs[log.PlanID] = struct{}{} + } + + // 3. 对于每个受影响的 PlanID,重置其 execute_count 并将其状态设置为 Failed + for planID := range affectedPlanIDs { + logger.Warnf("检测到计划 #%d 在应用崩溃前处于未完成状态,将重置其计数并标记为失败。", planID) + // 使用 UpdatePlanStateAfterExecution 来更新主表状态,避免影响关联数据 + if err := planRepo.UpdatePlanStateAfterExecution(planID, 0, models.PlanStatusFailed); err != nil { + logger.Errorf("重置计划 #%d 计数并标记为失败时出错: %v", planID, err) + // 这是一个非阻塞性错误,继续处理其他计划 + } + } + logger.Info("阶段二:计划主表状态修正完成。") + + // 直接调用新的方法来更新计划执行日志状态为失败 + if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(); err != nil { + logger.Errorf("更新所有未完成计划执行日志状态为失败失败: %v", err) + // 这是一个非阻塞性错误,继续执行 + } + + // 直接调用新的方法来更新任务执行日志状态为取消 + if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(); err != nil { + logger.Errorf("更新所有未完成任务执行日志状态为取消失败: %v", err) + // 这是一个非阻塞性错误,继续执行 + } + + // 清空待执行列表 + if err := pendingTaskRepo.ClearAllPendingTasks(); err != nil { + return fmt.Errorf("清空待执行任务列表失败: %w", err) + } + logger.Info("阶段二:待执行任务和相关日志清理完成。") + + // 阶段三:初始刷新 + logger.Info("阶段三:开始刷新待执行列表...") + if err := analysisPlanTaskManager.Refresh(); err != nil { + return fmt.Errorf("刷新待执行任务列表失败: %w", err) + } + logger.Info("阶段三:待执行任务列表初始化完成。") + + logger.Info("待执行任务列表初始化完成。") + return nil +} diff --git a/internal/infra/config/config.go b/internal/infra/config/config.go index b22f97b..81256b5 100644 --- a/internal/infra/config/config.go +++ b/internal/infra/config/config.go @@ -200,13 +200,18 @@ type LarkConfig struct { // CollectionConfig 代表定时采集配置 type CollectionConfig struct { + // Interval 采集间隔(分钟), 默认 1 Interval int `yaml:"interval"` } // NewConfig 创建并返回一个新的配置实例 func NewConfig() *Config { // 默认值可以在这里设置,但我们优先使用配置文件中的值 - return &Config{} + return &Config{ + Collection: CollectionConfig{ + Interval: 1, // 默认为1分钟 + }, + } } // Load 从指定路径加载配置文件 From 403d46b77766d4fb8ffde22e6f02d671ea2f6fdf Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 29 Oct 2025 17:13:03 +0800 Subject: [PATCH 6/7] =?UTF-8?q?=E5=88=A0=E6=8E=89=E5=8E=9F=E6=9D=A5?= =?UTF-8?q?=E7=9A=84=E5=AE=9A=E6=97=B6=E9=87=87=E9=9B=86=E7=BA=BF=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/core/application.go | 4 - internal/core/component_initializers.go | 11 --- internal/domain/collection/collector.go | 6 -- internal/domain/collection/timed_collector.go | 89 ------------------- 4 files changed, 110 deletions(-) delete mode 100644 internal/domain/collection/collector.go delete mode 100644 internal/domain/collection/timed_collector.go diff --git a/internal/core/application.go b/internal/core/application.go index 74ee2da..3448082 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -89,7 +89,6 @@ func (app *Application) Start() error { // 3. 启动后台工作协程 app.Domain.Scheduler.Start() - app.Domain.TimedCollector.Start() // 4. 启动 API 服务器 app.API.Start() @@ -113,9 +112,6 @@ func (app *Application) Stop() error { // 关闭任务执行器 app.Domain.Scheduler.Stop() - // 关闭定时采集器 - app.Domain.TimedCollector.Stop() - // 断开数据库连接 if err := app.Infra.Storage.Disconnect(); err != nil { app.Logger.Errorw("数据库连接断开失败", "error", err) diff --git a/internal/core/component_initializers.go b/internal/core/component_initializers.go index 91d8a6a..68c354f 100644 --- a/internal/core/component_initializers.go +++ b/internal/core/component_initializers.go @@ -7,7 +7,6 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/app/service" "git.huangwc.com/pig/pig-farm-controller/internal/app/webhook" "git.huangwc.com/pig/pig-farm-controller/internal/domain/audit" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/collection" "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" "git.huangwc.com/pig/pig-farm-controller/internal/domain/pig" @@ -124,7 +123,6 @@ type DomainServices struct { PigTradeManager pig.PigTradeManager PigSickManager pig.SickPigManager PigBatchDomain pig.PigBatchService - TimedCollector collection.Collector GeneralDeviceService device.Service taskFactory scheduler.TaskFactory AnalysisPlanTaskManager *scheduler.AnalysisPlanTaskManager @@ -170,14 +168,6 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs. cfg.Task.NumWorkers, ) - // 定时采集器 - timedCollector := collection.NewTimedCollector( - infra.Repos.DeviceRepo, - generalDeviceService, - logger, - time.Duration(cfg.Collection.Interval)*time.Second, - ) - return &DomainServices{ PigPenTransferManager: pigPenTransferManager, PigTradeManager: pigTradeManager, @@ -187,7 +177,6 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs. AnalysisPlanTaskManager: analysisPlanTaskManager, taskFactory: taskFactory, Scheduler: planScheduler, - TimedCollector: timedCollector, } } diff --git a/internal/domain/collection/collector.go b/internal/domain/collection/collector.go deleted file mode 100644 index ae28dd7..0000000 --- a/internal/domain/collection/collector.go +++ /dev/null @@ -1,6 +0,0 @@ -package collection - -type Collector interface { - Start() - Stop() -} diff --git a/internal/domain/collection/timed_collector.go b/internal/domain/collection/timed_collector.go deleted file mode 100644 index f6855ad..0000000 --- a/internal/domain/collection/timed_collector.go +++ /dev/null @@ -1,89 +0,0 @@ -package collection - -import ( - "time" - - "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" -) - -// TimedCollector 实现了 Collector 接口,用于定时从数据库获取设备信息并下发采集指令 -type TimedCollector struct { - deviceRepo repository.DeviceRepository - deviceService device.Service - logger *logs.Logger - interval time.Duration - ticker *time.Ticker - done chan bool -} - -// NewTimedCollector 创建一个定时采集器实例 -func NewTimedCollector( - deviceRepo repository.DeviceRepository, - deviceService device.Service, - logger *logs.Logger, - interval time.Duration, -) Collector { - return &TimedCollector{ - deviceRepo: deviceRepo, - deviceService: deviceService, - logger: logger, - interval: interval, - done: make(chan bool), - } -} - -// Start 开始定时采集 -func (c *TimedCollector) Start() { - c.logger.Infof("定时采集器启动,采集间隔: %s", c.interval) - c.ticker = time.NewTicker(c.interval) - go func() { - for { - select { - case <-c.done: - return - case <-c.ticker.C: - c.collect() - } - } - }() -} - -// Stop 停止定时采集 -func (c *TimedCollector) Stop() { - c.logger.Info("定时采集器停止") - c.ticker.Stop() - c.done <- true -} - -// collect 是核心的采集逻辑 -func (c *TimedCollector) collect() { - c.logger.Info("开始新一轮的设备数据采集") - - sensors, err := c.deviceRepo.ListAllSensors() - if err != nil { - c.logger.Errorf("采集周期: 从数据库获取所有传感器失败: %v", err) - return - } - - if len(sensors) == 0 { - c.logger.Info("采集周期: 未发现任何传感器设备,跳过本次采集") - return - } - - sensorsByController := make(map[uint][]*models.Device) - for _, sensor := range sensors { - sensorsByController[sensor.AreaControllerID] = append(sensorsByController[sensor.AreaControllerID], sensor) - } - - for controllerID, controllerSensors := range sensorsByController { - c.logger.Infof("采集周期: 准备为区域主控 %d 下的 %d 个传感器下发采集指令", controllerID, len(controllerSensors)) - if err := c.deviceService.Collect(controllerID, controllerSensors); err != nil { - c.logger.Errorf("采集周期: 为区域主控 %d 下发采集指令失败: %v", controllerID, err) - } - } - - c.logger.Info("本轮设备数据采集完成") -} From b09d32b1d7b6182ae798178d911e2b9aaa0fc2ee Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 29 Oct 2025 17:21:23 +0800 Subject: [PATCH 7/7] =?UTF-8?q?=E4=BF=AE=E6=94=B9config.yml?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.yml b/config.yml index d3eaee2..bf282ac 100644 --- a/config.yml +++ b/config.yml @@ -12,7 +12,7 @@ server: # 日志配置 log: - level: "debug" # 日志级别: "debug", "info", "warn", "error", "dpanic", "panic", "fatal" + level: "info" # 日志级别: "debug", "info", "warn", "error", "dpanic", "panic", "fatal" format: "console" # 日志格式: "console" 或 "json" enable_file: true # 是否启用文件日志 file_path: "./app_logs/app.log" # 日志文件路径