Merge pull request 'issue_42' (#46) from issue_42 into main

Reviewed-on: #46
This commit is contained in:
2025-10-29 17:21:25 +08:00
22 changed files with 561 additions and 357 deletions

View File

@@ -112,4 +112,4 @@ notify:
# 定时采集配置 # 定时采集配置
collection: collection:
interval: 300 # 采集间隔 () interval: 1 # 采集间隔 (分钟)

View File

@@ -12,7 +12,7 @@ server:
# 日志配置 # 日志配置
log: log:
level: "debug" # 日志级别: "debug", "info", "warn", "error", "dpanic", "panic", "fatal" level: "info" # 日志级别: "debug", "info", "warn", "error", "dpanic", "panic", "fatal"
format: "console" # 日志格式: "console" 或 "json" format: "console" # 日志格式: "console" 或 "json"
enable_file: true # 是否启用文件日志 enable_file: true # 是否启用文件日志
file_path: "./app_logs/app.log" # 日志文件路径 file_path: "./app_logs/app.log" # 日志文件路径
@@ -90,4 +90,4 @@ lora_mesh:
# 定时采集配置 # 定时采集配置
collection: collection:
interval: 300 # 采集间隔 () interval: 1 # 采集间隔 (分钟)

View File

@@ -29,7 +29,7 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit" "git.huangwc.com/pig/pig-farm-controller/internal/domain/audit"
domain_device "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" domain_device "git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/task" "git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/token" "git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config" "git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
@@ -54,7 +54,7 @@ type API struct {
pigBatchController *management.PigBatchController // 猪群控制器实例 pigBatchController *management.PigBatchController // 猪群控制器实例
monitorController *monitor.Controller // 数据监控控制器实例 monitorController *monitor.Controller // 数据监控控制器实例
listenHandler webhook.ListenHandler // 设备上行事件监听器 listenHandler webhook.ListenHandler // 设备上行事件监听器
analysisTaskManager *task.AnalysisPlanTaskManager // 计划触发器管理器实例 analysisTaskManager *scheduler.AnalysisPlanTaskManager // 计划触发器管理器实例
} }
// NewAPI 创建并返回一个新的 API 实例 // NewAPI 创建并返回一个新的 API 实例
@@ -74,7 +74,7 @@ func NewAPI(cfg config.ServerConfig,
notifyService domain_notify.Service, notifyService domain_notify.Service,
deviceService domain_device.Service, deviceService domain_device.Service,
listenHandler webhook.ListenHandler, listenHandler webhook.ListenHandler,
analysisTaskManager *task.AnalysisPlanTaskManager) *API { analysisTaskManager *scheduler.AnalysisPlanTaskManager) *API {
// 设置 Gin 模式,例如 gin.ReleaseMode (生产模式) 或 gin.DebugMode (开发模式) // 设置 Gin 模式,例如 gin.ReleaseMode (生产模式) 或 gin.DebugMode (开发模式)
// 从配置中获取 Gin 模式 // 从配置中获取 Gin 模式
gin.SetMode(cfg.Mode) gin.SetMode(cfg.Mode)

View File

@@ -6,7 +6,7 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/app/controller" "git.huangwc.com/pig/pig-farm-controller/internal/app/controller"
"git.huangwc.com/pig/pig-farm-controller/internal/app/dto" "git.huangwc.com/pig/pig-farm-controller/internal/app/dto"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/task" "git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
@@ -20,11 +20,11 @@ import (
type Controller struct { type Controller struct {
logger *logs.Logger logger *logs.Logger
planRepo repository.PlanRepository planRepo repository.PlanRepository
analysisPlanTaskManager *task.AnalysisPlanTaskManager analysisPlanTaskManager *scheduler.AnalysisPlanTaskManager
} }
// NewController 创建一个新的 Controller 实例 // NewController 创建一个新的 Controller 实例
func NewController(logger *logs.Logger, planRepo repository.PlanRepository, analysisPlanTaskManager *task.AnalysisPlanTaskManager) *Controller { func NewController(logger *logs.Logger, planRepo repository.PlanRepository, analysisPlanTaskManager *scheduler.AnalysisPlanTaskManager) *Controller {
return &Controller{ return &Controller{
logger: logger, logger: logger,
planRepo: planRepo, planRepo: planRepo,
@@ -61,7 +61,11 @@ func (c *Controller) CreatePlan(ctx *gin.Context) {
return return
} }
// --- 自动判断 ContentType --- // --- 业务规则处理 ---
// 1. 设置计划类型:用户创建的计划永远是自定义计划
planToCreate.PlanType = models.PlanTypeCustom
// 2. 自动判断 ContentType
if len(req.SubPlanIDs) > 0 { if len(req.SubPlanIDs) > 0 {
planToCreate.ContentType = models.PlanContentTypeSubPlans planToCreate.ContentType = models.PlanContentTypeSubPlans
} else { } else {
@@ -145,16 +149,25 @@ func (c *Controller) GetPlan(ctx *gin.Context) {
// ListPlans godoc // ListPlans godoc
// @Summary 获取计划列表 // @Summary 获取计划列表
// @Description 获取所有计划的列表 // @Description 获取所有计划的列表,支持按类型过滤和分页
// @Tags 计划管理 // @Tags 计划管理
// @Security BearerAuth // @Security BearerAuth
// @Produce json // @Produce json
// @Success 200 {object} controller.Response{data=[]dto.PlanResponse} "业务码为200代表成功获取列表" // @Param query query dto.ListPlansQuery false "查询参数"
// @Success 200 {object} controller.Response{data=dto.ListPlansResponse} "业务码为200代表成功获取列表"
// @Router /api/v1/plans [get] // @Router /api/v1/plans [get]
func (c *Controller) ListPlans(ctx *gin.Context) { func (c *Controller) ListPlans(ctx *gin.Context) {
const actionType = "获取计划列表" const actionType = "获取计划列表"
var query dto.ListPlansQuery
if err := ctx.ShouldBindQuery(&query); err != nil {
c.logger.Errorf("%s: 查询参数绑定失败: %v", actionType, err)
controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "无效的查询参数: "+err.Error(), actionType, "查询参数绑定失败", query)
return
}
// 1. 调用仓库层获取所有计划 // 1. 调用仓库层获取所有计划
plans, err := c.planRepo.ListBasicPlans() opts := repository.ListPlansOptions{PlanType: repository.PlanTypeFilter(query.PlanType)}
plans, total, err := c.planRepo.ListPlans(opts, query.Page, query.PageSize)
if err != nil { if err != nil {
c.logger.Errorf("%s: 数据库查询失败: %v", actionType, err) c.logger.Errorf("%s: 数据库查询失败: %v", actionType, err)
controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "获取计划列表时发生内部错误", actionType, "数据库查询失败", nil) controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "获取计划列表时发生内部错误", actionType, "数据库查询失败", nil)
@@ -176,7 +189,7 @@ func (c *Controller) ListPlans(ctx *gin.Context) {
// 3. 构造并发送成功响应 // 3. 构造并发送成功响应
resp := dto.ListPlansResponse{ resp := dto.ListPlansResponse{
Plans: planResponses, Plans: planResponses,
Total: len(planResponses), Total: total,
} }
c.logger.Infof("%s: 获取计划列表成功, 数量: %d", actionType, len(planResponses)) c.logger.Infof("%s: 获取计划列表成功, 数量: %d", actionType, len(planResponses))
controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "获取计划列表成功", resp, actionType, "获取计划列表成功", resp) controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "获取计划列表成功", resp, actionType, "获取计划列表成功", resp)
@@ -184,7 +197,7 @@ func (c *Controller) ListPlans(ctx *gin.Context) {
// UpdatePlan godoc // UpdatePlan godoc
// @Summary 更新计划 // @Summary 更新计划
// @Description 根据计划ID更新计划的详细信息。 // @Description 根据计划ID更新计划的详细信息。系统计划不允许修改。
// @Tags 计划管理 // @Tags 计划管理
// @Security BearerAuth // @Security BearerAuth
// @Accept json // @Accept json
@@ -212,7 +225,27 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) {
return return
} }
// 3. 将请求转换为模型(转换函数带校验) // 3. 检查计划是否存在
existingPlan, err := c.planRepo.GetBasicPlanByID(uint(id))
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
c.logger.Warnf("%s: 计划不存在, ID: %d", actionType, id)
controller.SendErrorWithAudit(ctx, controller.CodeNotFound, "计划不存在", actionType, "计划不存在", id)
return
}
c.logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, id)
controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "获取计划信息时发生内部错误", actionType, "数据库查询失败", id)
return
}
// 4. 业务规则:系统计划不允许修改
if existingPlan.PlanType == models.PlanTypeSystem {
c.logger.Warnf("%s: 尝试修改系统计划, ID: %d", actionType, id)
controller.SendErrorWithAudit(ctx, controller.CodeForbidden, "系统计划不允许修改", actionType, "尝试修改系统计划", id)
return
}
// 5. 将请求转换为模型(转换函数带校验)
planToUpdate, err := dto.NewPlanFromUpdateRequest(&req) planToUpdate, err := dto.NewPlanFromUpdateRequest(&req)
if err != nil { if err != nil {
c.logger.Errorf("%s: 计划数据校验失败: %v", actionType, err) c.logger.Errorf("%s: 计划数据校验失败: %v", actionType, err)
@@ -229,20 +262,7 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) {
planToUpdate.ContentType = models.PlanContentTypeTasks planToUpdate.ContentType = models.PlanContentTypeTasks
} }
// 4. 检查计划是否存在 // 6. 调用仓库方法更新计划
_, err = c.planRepo.GetBasicPlanByID(uint(id))
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
c.logger.Warnf("%s: 计划不存在, ID: %d", actionType, id)
controller.SendErrorWithAudit(ctx, controller.CodeNotFound, "计划不存在", actionType, "计划不存在", id)
return
}
c.logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, id)
controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "获取计划信息时发生内部错误", actionType, "数据库查询失败", id)
return
}
// 5. 调用仓库方法更新计划
// 只要是更新任务,就重置执行计数器 // 只要是更新任务,就重置执行计数器
planToUpdate.ExecuteCount = 0 // 重置计数器 planToUpdate.ExecuteCount = 0 // 重置计数器
c.logger.Infof("计划 #%d 被更新,执行计数器已重置为 0。", planToUpdate.ID) c.logger.Infof("计划 #%d 被更新,执行计数器已重置为 0。", planToUpdate.ID)
@@ -259,7 +279,7 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) {
c.logger.Errorf("为更新后的计划 %d 确保触发器任务定义失败: %v", planToUpdate.ID, err) c.logger.Errorf("为更新后的计划 %d 确保触发器任务定义失败: %v", planToUpdate.ID, err)
} }
// 6. 获取更新后的完整计划用于响应 // 7. 获取更新后的完整计划用于响应
updatedPlan, err := c.planRepo.GetPlanByID(uint(id)) updatedPlan, err := c.planRepo.GetPlanByID(uint(id))
if err != nil { if err != nil {
c.logger.Errorf("%s: 获取更新后计划详情失败: %v, ID: %d", actionType, err, id) c.logger.Errorf("%s: 获取更新后计划详情失败: %v, ID: %d", actionType, err, id)
@@ -267,7 +287,7 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) {
return return
} }
// 7. 将模型转换为响应 DTO // 8. 将模型转换为响应 DTO
resp, err := dto.NewPlanToResponse(updatedPlan) resp, err := dto.NewPlanToResponse(updatedPlan)
if err != nil { if err != nil {
c.logger.Errorf("%s: 序列化响应失败: %v, Updated Plan: %+v", actionType, err, updatedPlan) c.logger.Errorf("%s: 序列化响应失败: %v, Updated Plan: %+v", actionType, err, updatedPlan)
@@ -275,14 +295,14 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) {
return return
} }
// 8. 发送成功响应 // 9. 发送成功响应
c.logger.Infof("%s: 计划更新成功, ID: %d", actionType, updatedPlan.ID) c.logger.Infof("%s: 计划更新成功, ID: %d", actionType, updatedPlan.ID)
controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "计划更新成功", resp, actionType, "计划更新成功", resp) controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "计划更新成功", resp, actionType, "计划更新成功", resp)
} }
// DeletePlan godoc // DeletePlan godoc
// @Summary 删除计划 // @Summary 删除计划
// @Description 根据计划ID删除计划。软删除 // @Description 根据计划ID删除计划。软删除系统计划不允许删除。
// @Tags 计划管理 // @Tags 计划管理
// @Security BearerAuth // @Security BearerAuth
// @Produce json // @Produce json
@@ -313,7 +333,14 @@ func (c *Controller) DeletePlan(ctx *gin.Context) {
return return
} }
// 3. 停止这个计划 // 3. 业务规则:系统计划不允许删除
if plan.PlanType == models.PlanTypeSystem {
c.logger.Warnf("%s: 尝试删除系统计划, ID: %d", actionType, id)
controller.SendErrorWithAudit(ctx, controller.CodeForbidden, "系统计划不允许删除", actionType, "尝试删除系统计划", id)
return
}
// 4. 停止这个计划
if plan.Status == models.PlanStatusEnabled { if plan.Status == models.PlanStatusEnabled {
if err := c.planRepo.StopPlanTransactionally(uint(id)); err != nil { if err := c.planRepo.StopPlanTransactionally(uint(id)); err != nil {
c.logger.Errorf("%s: 停止计划失败: %v, ID: %d", actionType, err, id) c.logger.Errorf("%s: 停止计划失败: %v, ID: %d", actionType, err, id)
@@ -322,21 +349,21 @@ func (c *Controller) DeletePlan(ctx *gin.Context) {
} }
} }
// 4. 调用仓库层删除计划 // 5. 调用仓库层删除计划
if err := c.planRepo.DeletePlan(uint(id)); err != nil { if err := c.planRepo.DeletePlan(uint(id)); err != nil {
c.logger.Errorf("%s: 数据库删除失败: %v, ID: %d", actionType, err, id) c.logger.Errorf("%s: 数据库删除失败: %v, ID: %d", actionType, err, id)
controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "删除计划时发生内部错误", actionType, "数据库删除失败", id) controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "删除计划时发生内部错误", actionType, "数据库删除失败", id)
return return
} }
// 5. 发送成功响应 // 6. 发送成功响应
c.logger.Infof("%s: 计划删除成功, ID: %d", actionType, id) c.logger.Infof("%s: 计划删除成功, ID: %d", actionType, id)
controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "计划删除成功", nil, actionType, "计划删除成功", id) controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "计划删除成功", nil, actionType, "计划删除成功", id)
} }
// StartPlan godoc // StartPlan godoc
// @Summary 启动计划 // @Summary 启动计划
// @Description 根据计划ID启动一个计划的执行。 // @Description 根据计划ID启动一个计划的执行。系统计划不允许手动启动。
// @Tags 计划管理 // @Tags 计划管理
// @Security BearerAuth // @Security BearerAuth
// @Produce json // @Produce json
@@ -367,7 +394,12 @@ func (c *Controller) StartPlan(ctx *gin.Context) {
return return
} }
// 3. 检查计划当前状态 // 3. 业务规则检查
if plan.PlanType == models.PlanTypeSystem {
c.logger.Warnf("%s: 尝试手动启动系统计划, ID: %d", actionType, id)
controller.SendErrorWithAudit(ctx, controller.CodeForbidden, "系统计划不允许手动启动", actionType, "尝试手动启动系统计划", id)
return
}
if plan.Status == models.PlanStatusEnabled { if plan.Status == models.PlanStatusEnabled {
c.logger.Warnf("%s: 计划已处于启动状态,无需重复操作, ID: %d", actionType, id) c.logger.Warnf("%s: 计划已处于启动状态,无需重复操作, ID: %d", actionType, id)
controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "计划已处于启动状态,无需重复操作", actionType, "计划已处于启动状态", id) controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "计划已处于启动状态,无需重复操作", actionType, "计划已处于启动状态", id)
@@ -416,7 +448,7 @@ func (c *Controller) StartPlan(ctx *gin.Context) {
// StopPlan godoc // StopPlan godoc
// @Summary 停止计划 // @Summary 停止计划
// @Description 根据计划ID停止一个正在执行的计划。 // @Description 根据计划ID停止一个正在执行的计划。系统计划不能被停止。
// @Tags 计划管理 // @Tags 计划管理
// @Security BearerAuth // @Security BearerAuth
// @Produce json // @Produce json
@@ -447,21 +479,28 @@ func (c *Controller) StopPlan(ctx *gin.Context) {
return return
} }
// 3. 检查计划当前状态 // 3. 业务规则:系统计划不允许停止
if plan.PlanType == models.PlanTypeSystem {
c.logger.Warnf("%s: 尝试停止系统计划, ID: %d", actionType, id)
controller.SendErrorWithAudit(ctx, controller.CodeForbidden, "系统计划不允许停止", actionType, "尝试停止系统计划", id)
return
}
// 4. 检查计划当前状态
if plan.Status != models.PlanStatusEnabled { if plan.Status != models.PlanStatusEnabled {
c.logger.Warnf("%s: 计划当前不是启用状态, ID: %d, Status: %s", actionType, id, plan.Status) c.logger.Warnf("%s: 计划当前不是启用状态, ID: %d, Status: %s", actionType, id, plan.Status)
controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "计划当前不是启用状态", actionType, "计划未启用", id) controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "计划当前不是启用状态", actionType, "计划未启用", id)
return return
} }
// 4. 调用仓库层方法,该方法内部处理事务 // 5. 调用仓库层方法,该方法内部处理事务
if err := c.planRepo.StopPlanTransactionally(uint(id)); err != nil { if err := c.planRepo.StopPlanTransactionally(uint(id)); err != nil {
c.logger.Errorf("%s: 停止计划失败: %v, ID: %d", actionType, err, id) c.logger.Errorf("%s: 停止计划失败: %v, ID: %d", actionType, err, id)
controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "停止计划时发生内部错误: "+err.Error(), actionType, "停止计划失败", id) controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "停止计划时发生内部错误: "+err.Error(), actionType, "停止计划失败", id)
return return
} }
// 5. 发送成功响应 // 6. 发送成功响应
c.logger.Infof("%s: 计划已成功停止, ID: %d", actionType, id) c.logger.Infof("%s: 计划已成功停止, ID: %d", actionType, id)
controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "计划已成功停止", nil, actionType, "计划已成功停止", id) controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "计划已成功停止", nil, actionType, "计划已成功停止", id)
} }

View File

@@ -18,6 +18,7 @@ const (
// 客户端错误状态码 (4000-4999) // 客户端错误状态码 (4000-4999)
CodeBadRequest ResponseCode = 4000 // 请求参数错误 CodeBadRequest ResponseCode = 4000 // 请求参数错误
CodeUnauthorized ResponseCode = 4001 // 未授权 CodeUnauthorized ResponseCode = 4001 // 未授权
CodeForbidden ResponseCode = 4003 // 禁止访问
CodeNotFound ResponseCode = 4004 // 资源未找到 CodeNotFound ResponseCode = 4004 // 资源未找到
CodeConflict ResponseCode = 4009 // 资源冲突 CodeConflict ResponseCode = 4009 // 资源冲突

View File

@@ -17,6 +17,7 @@ func NewPlanToResponse(plan *models.Plan) (*PlanResponse, error) {
ID: plan.ID, ID: plan.ID,
Name: plan.Name, Name: plan.Name,
Description: plan.Description, Description: plan.Description,
PlanType: plan.PlanType,
ExecutionType: plan.ExecutionType, ExecutionType: plan.ExecutionType,
Status: plan.Status, Status: plan.Status,
ExecuteNum: plan.ExecuteNum, ExecuteNum: plan.ExecuteNum,
@@ -64,7 +65,7 @@ func NewPlanFromCreateRequest(req *CreatePlanRequest) (*models.Plan, error) {
ExecutionType: req.ExecutionType, ExecutionType: req.ExecutionType,
ExecuteNum: req.ExecuteNum, ExecuteNum: req.ExecuteNum,
CronExpression: req.CronExpression, CronExpression: req.CronExpression,
// ContentType 在控制器中设置,此处不再处理 // ContentType 和 PlanType 在控制器中设置,此处不再处理
} }
// 处理子计划 (通过ID引用) // 处理子计划 (通过ID引用)
@@ -116,7 +117,7 @@ func NewPlanFromUpdateRequest(req *UpdatePlanRequest) (*models.Plan, error) {
ExecutionType: req.ExecutionType, ExecutionType: req.ExecutionType,
ExecuteNum: req.ExecuteNum, ExecuteNum: req.ExecuteNum,
CronExpression: req.CronExpression, CronExpression: req.CronExpression,
// ContentType 在控制器中设置,此处不再处理 // ContentType 和 PlanType 在控制器中设置,此处不再处理
} }
// 处理子计划 (通过ID引用) // 处理子计划 (通过ID引用)

View File

@@ -2,6 +2,13 @@ package dto
import "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" import "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
// ListPlansQuery 定义了获取计划列表时的查询参数
type ListPlansQuery struct {
PlanType string `form:"planType,default=custom"` // 计划类型 (all, custom, system),默认为 custom
Page int `form:"page,default=1"` // 页码
PageSize int `form:"pageSize,default=10"` // 每页大小
}
// CreatePlanRequest 定义创建计划请求的结构体 // CreatePlanRequest 定义创建计划请求的结构体
type CreatePlanRequest struct { type CreatePlanRequest struct {
Name string `json:"name" binding:"required" example:"猪舍温度控制计划"` Name string `json:"name" binding:"required" example:"猪舍温度控制计划"`
@@ -18,6 +25,7 @@ type PlanResponse struct {
ID uint `json:"id" example:"1"` ID uint `json:"id" example:"1"`
Name string `json:"name" example:"猪舍温度控制计划"` Name string `json:"name" example:"猪舍温度控制计划"`
Description string `json:"description" example:"根据温度自动调节风扇和加热器"` Description string `json:"description" example:"根据温度自动调节风扇和加热器"`
PlanType models.PlanType `json:"plan_type" example:"自定义任务"`
ExecutionType models.PlanExecutionType `json:"execution_type" example:"自动"` ExecutionType models.PlanExecutionType `json:"execution_type" example:"自动"`
Status models.PlanStatus `json:"status" example:"已启用"` Status models.PlanStatus `json:"status" example:"已启用"`
ExecuteNum uint `json:"execute_num" example:"10"` ExecuteNum uint `json:"execute_num" example:"10"`
@@ -31,7 +39,7 @@ type PlanResponse struct {
// ListPlansResponse 定义获取计划列表响应的结构体 // ListPlansResponse 定义获取计划列表响应的结构体
type ListPlansResponse struct { type ListPlansResponse struct {
Plans []PlanResponse `json:"plans"` Plans []PlanResponse `json:"plans"`
Total int `json:"total" example:"100"` Total int64 `json:"total" example:"100"`
} }
// UpdatePlanRequest 定义更新计划请求的结构体 // UpdatePlanRequest 定义更新计划请求的结构体

View File

@@ -9,7 +9,6 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/app/api" "git.huangwc.com/pig/pig-farm-controller/internal/app/api"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config" "git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
) )
// Application 是整个应用的核心,封装了所有组件和生命周期。 // Application 是整个应用的核心,封装了所有组件和生命周期。
@@ -90,7 +89,6 @@ func (app *Application) Start() error {
// 3. 启动后台工作协程 // 3. 启动后台工作协程
app.Domain.Scheduler.Start() app.Domain.Scheduler.Start()
app.Domain.TimedCollector.Start()
// 4. 启动 API 服务器 // 4. 启动 API 服务器
app.API.Start() app.API.Start()
@@ -114,9 +112,6 @@ func (app *Application) Stop() error {
// 关闭任务执行器 // 关闭任务执行器
app.Domain.Scheduler.Stop() app.Domain.Scheduler.Stop()
// 关闭定时采集器
app.Domain.TimedCollector.Stop()
// 断开数据库连接 // 断开数据库连接
if err := app.Infra.Storage.Disconnect(); err != nil { if err := app.Infra.Storage.Disconnect(); err != nil {
app.Logger.Errorw("数据库连接断开失败", "error", err) app.Logger.Errorw("数据库连接断开失败", "error", err)
@@ -133,133 +128,3 @@ func (app *Application) Stop() error {
app.Logger.Info("应用已成功关闭") app.Logger.Info("应用已成功关闭")
return nil return nil
} }
// initializeState 在应用启动时准备其初始数据状态。
// 这包括清理任何因上次异常关闭而留下的悬空任务或请求。
func (app *Application) initializeState() error {
// 清理待采集任务 (非致命错误)
if err := app.initializePendingCollections(); err != nil {
app.Logger.Errorw("清理待采集任务时发生非致命错误", "error", err)
}
// 初始化待执行任务列表 (致命错误)
if err := app.initializePendingTasks(); err != nil {
return fmt.Errorf("初始化待执行任务列表失败: %w", err)
}
return nil
}
// initializePendingCollections 在应用启动时处理所有未完成的采集请求。
// 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。
// 这保证了系统在每次启动时都处于一个干净、确定的状态。
func (app *Application) initializePendingCollections() error {
app.Logger.Info("开始清理所有未完成的采集请求...")
// 直接将所有 'pending' 状态的请求更新为 'timed_out'。
count, err := app.Infra.Repos.PendingCollectionRepo.MarkAllPendingAsTimedOut()
if err != nil {
return fmt.Errorf("清理未完成的采集请求失败: %v", err)
} else if count > 0 {
app.Logger.Infof("成功将 %d 个未完成的采集请求标记为超时。", count)
} else {
app.Logger.Info("没有需要清理的采集请求。")
}
return nil
}
// initializePendingTasks 在应用启动时清理并刷新待执行任务列表。
func (app *Application) initializePendingTasks() error {
logger := app.Logger
planRepo := app.Infra.Repos.PlanRepo
pendingTaskRepo := app.Infra.Repos.PendingTaskRepo
executionLogRepo := app.Infra.Repos.ExecutionLogRepo
analysisPlanTaskManager := app.Domain.AnalysisPlanTaskManager
logger.Info("开始初始化待执行任务列表...")
// 阶段一:修正因崩溃导致状态不一致的固定次数计划
logger.Info("阶段一:开始修正因崩溃导致状态不一致的固定次数计划...")
plansToCorrect, err := planRepo.FindPlansWithPendingTasks()
if err != nil {
return fmt.Errorf("查找需要修正的计划失败: %w", err)
}
for _, plan := range plansToCorrect {
logger.Infof("发现需要修正的计划 #%d (名称: %s)。", plan.ID, plan.Name)
// 更新计划的执行计数
plan.ExecuteCount++
logger.Infof("计划 #%d 执行计数已从 %d 更新为 %d。", plan.ID, plan.ExecuteCount-1, plan.ExecuteCount)
if plan.ExecutionType == models.PlanExecutionTypeManual ||
(plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteCount >= plan.ExecuteNum) {
// 更新计划状态为已停止
plan.Status = models.PlanStatusStopped
logger.Infof("计划 #%d 状态已更新为 '执行完毕'。", plan.ID)
}
// 保存更新后的计划
if err := planRepo.UpdatePlan(plan); err != nil {
logger.Errorf("修正计划 #%d 状态失败: %v", plan.ID, err)
// 这是一个非阻塞性错误,继续处理其他计划
}
}
logger.Info("阶段一:固定次数计划修正完成。")
// 阶段二:清理所有待执行任务和相关日志
logger.Info("阶段二:开始清理所有待执行任务和相关日志...")
// --- 新增逻辑:处理因崩溃导致状态不一致的计划主表状态 ---
// 1. 查找所有未完成的计划执行日志 (状态为 Started 或 Waiting)
incompletePlanLogs, err := executionLogRepo.FindIncompletePlanExecutionLogs()
if err != nil {
return fmt.Errorf("查找未完成的计划执行日志失败: %w", err)
}
// 2. 收集所有受影响的唯一 PlanID
affectedPlanIDs := make(map[uint]struct{})
for _, log := range incompletePlanLogs {
affectedPlanIDs[log.PlanID] = struct{}{}
}
// 3. 对于每个受影响的 PlanID重置其 execute_count 并将其状态设置为 Failed
for planID := range affectedPlanIDs {
logger.Warnf("检测到计划 #%d 在应用崩溃前处于未完成状态,将重置其计数并标记为失败。", planID)
// 使用 UpdatePlanStateAfterExecution 来更新主表状态,避免影响关联数据
if err := planRepo.UpdatePlanStateAfterExecution(planID, 0, models.PlanStatusFailed); err != nil {
logger.Errorf("重置计划 #%d 计数并标记为失败时出错: %v", planID, err)
// 这是一个非阻塞性错误,继续处理其他计划
}
}
logger.Info("阶段二:计划主表状态修正完成。")
// 直接调用新的方法来更新计划执行日志状态为失败
if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(); err != nil {
logger.Errorf("更新所有未完成计划执行日志状态为失败失败: %v", err)
// 这是一个非阻塞性错误,继续执行
}
// 直接调用新的方法来更新任务执行日志状态为取消
if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(); err != nil {
logger.Errorf("更新所有未完成任务执行日志状态为取消失败: %v", err)
// 这是一个非阻塞性错误,继续执行
}
// 清空待执行列表
if err := pendingTaskRepo.ClearAllPendingTasks(); err != nil {
return fmt.Errorf("清空待执行任务列表失败: %w", err)
}
logger.Info("阶段二:待执行任务和相关日志清理完成。")
// 阶段三:初始刷新
logger.Info("阶段三:开始刷新待执行列表...")
if err := analysisPlanTaskManager.Refresh(); err != nil {
return fmt.Errorf("刷新待执行任务列表失败: %w", err)
}
logger.Info("阶段三:待执行任务列表初始化完成。")
logger.Info("待执行任务列表初始化完成。")
return nil
}

View File

@@ -7,10 +7,10 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/app/service" "git.huangwc.com/pig/pig-farm-controller/internal/app/service"
"git.huangwc.com/pig/pig-farm-controller/internal/app/webhook" "git.huangwc.com/pig/pig-farm-controller/internal/app/webhook"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit" "git.huangwc.com/pig/pig-farm-controller/internal/domain/audit"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/collection"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device" "git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/pig" "git.huangwc.com/pig/pig-farm-controller/internal/domain/pig"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/task" "git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/token" "git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config" "git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
@@ -123,10 +123,10 @@ type DomainServices struct {
PigTradeManager pig.PigTradeManager PigTradeManager pig.PigTradeManager
PigSickManager pig.SickPigManager PigSickManager pig.SickPigManager
PigBatchDomain pig.PigBatchService PigBatchDomain pig.PigBatchService
TimedCollector collection.Collector
GeneralDeviceService device.Service GeneralDeviceService device.Service
AnalysisPlanTaskManager *task.AnalysisPlanTaskManager taskFactory scheduler.TaskFactory
Scheduler *task.Scheduler AnalysisPlanTaskManager *scheduler.AnalysisPlanTaskManager
Scheduler *scheduler.Scheduler
} }
// initDomainServices 初始化所有的领域服务。 // initDomainServices 初始化所有的领域服务。
@@ -148,30 +148,26 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs.
) )
// 计划任务管理器 // 计划任务管理器
analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(infra.Repos.PlanRepo, infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, logger) analysisPlanTaskManager := scheduler.NewAnalysisPlanTaskManager(infra.Repos.PlanRepo, infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, logger)
// 任务工厂
taskFactory := task.NewTaskFactory(logger, infra.Repos.SensorDataRepo, infra.Repos.DeviceRepo, generalDeviceService)
// 任务执行器 // 任务执行器
scheduler := task.NewScheduler( planScheduler := scheduler.NewScheduler(
infra.Repos.PendingTaskRepo, infra.Repos.PendingTaskRepo,
infra.Repos.ExecutionLogRepo, infra.Repos.ExecutionLogRepo,
infra.Repos.DeviceRepo, infra.Repos.DeviceRepo,
infra.Repos.SensorDataRepo, infra.Repos.SensorDataRepo,
infra.Repos.PlanRepo, infra.Repos.PlanRepo,
analysisPlanTaskManager, analysisPlanTaskManager,
taskFactory,
logger, logger,
generalDeviceService, generalDeviceService,
time.Duration(cfg.Task.Interval)*time.Second, time.Duration(cfg.Task.Interval)*time.Second,
cfg.Task.NumWorkers, cfg.Task.NumWorkers,
) )
// 定时采集器
timedCollector := collection.NewTimedCollector(
infra.Repos.DeviceRepo,
generalDeviceService,
logger,
time.Duration(cfg.Collection.Interval)*time.Second,
)
return &DomainServices{ return &DomainServices{
PigPenTransferManager: pigPenTransferManager, PigPenTransferManager: pigPenTransferManager,
PigTradeManager: pigTradeManager, PigTradeManager: pigTradeManager,
@@ -179,8 +175,8 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs.
PigBatchDomain: pigBatchDomain, PigBatchDomain: pigBatchDomain,
GeneralDeviceService: generalDeviceService, GeneralDeviceService: generalDeviceService,
AnalysisPlanTaskManager: analysisPlanTaskManager, AnalysisPlanTaskManager: analysisPlanTaskManager,
Scheduler: scheduler, taskFactory: taskFactory,
TimedCollector: timedCollector, Scheduler: planScheduler,
} }
} }

View File

@@ -0,0 +1,217 @@
package core
import (
"fmt"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
)
const (
// PlanNameTimedFullDataCollection 是定时全量数据采集计划的名称
PlanNameTimedFullDataCollection = "定时全量数据采集"
)
// initializeState 在应用启动时准备其初始数据状态。
// 这包括清理任何因上次异常关闭而留下的悬空任务或请求。
func (app *Application) initializeState() error {
// 初始化预定义系统计划 (致命错误)
if err := app.initializeSystemPlans(); err != nil {
return fmt.Errorf("初始化预定义系统计划失败: %w", err)
}
// 清理待采集任务 (非致命错误)
if err := app.initializePendingCollections(); err != nil {
app.Logger.Errorw("清理待采集任务时发生非致命错误", "error", err)
}
// 初始化待执行任务列表 (致命错误)
if err := app.initializePendingTasks(); err != nil {
return fmt.Errorf("初始化待执行任务列表失败: %w", err)
}
return nil
}
// initializeSystemPlans 确保预定义的系统计划在数据库中存在。
func (app *Application) initializeSystemPlans() error {
app.Logger.Info("开始检查并创建预定义的系统计划...")
// 动态构建预定义计划列表
predefinedSystemPlans := app.getPredefinedSystemPlans()
// 1. 获取所有已存在的系统计划
existingPlans, _, err := app.Infra.Repos.PlanRepo.ListPlans(repository.ListPlansOptions{
PlanType: repository.PlanTypeFilterSystem,
}, 1, 999) // 使用一个较大的 pageSize 来获取所有系统计划
if err != nil {
return fmt.Errorf("获取现有系统计划失败: %w", err)
}
// 2. 为了方便查找, 将现有计划名放入一个 map
existingPlanNames := make(map[string]bool)
for _, p := range existingPlans {
existingPlanNames[p.Name] = true
}
// 3. 遍历预定义的计划列表
for _, predefinedPlan := range predefinedSystemPlans {
// 4. 如果计划不存在, 则创建
if !existingPlanNames[predefinedPlan.Name] {
app.Logger.Infof("预定义计划 '%s' 不存在,正在创建...", predefinedPlan.Name)
if err := app.Infra.Repos.PlanRepo.CreatePlan(&predefinedPlan); err != nil {
// 错误现在是致命的
return fmt.Errorf("创建预定义计划 '%s' 失败: %w", predefinedPlan.Name, err)
} else {
app.Logger.Infof("成功创建预定义计划 '%s'。", predefinedPlan.Name)
}
}
}
app.Logger.Info("预定义系统计划检查完成。")
return nil
}
// getPredefinedSystemPlans 返回一个基于当前配置的预定义系统计划列表。
func (app *Application) getPredefinedSystemPlans() []models.Plan {
// 根据配置创建定时全量采集计划
interval := app.Config.Collection.Interval
if interval <= 0 {
interval = 1 // 确保间隔至少为1分钟
}
cronExpression := fmt.Sprintf("*/%d * * * *", interval)
timedCollectionPlan := models.Plan{
Name: PlanNameTimedFullDataCollection,
Description: fmt.Sprintf("这是一个系统预定义的计划, 每 %d 秒自动触发一次全量数据采集。", app.Config.Collection.Interval),
PlanType: models.PlanTypeSystem,
ExecutionType: models.PlanExecutionTypeAutomatic,
CronExpression: cronExpression,
Status: models.PlanStatusEnabled,
ContentType: models.PlanContentTypeTasks,
Tasks: []models.Task{
{
Name: "全量采集",
Description: "触发一次全量数据采集",
ExecutionOrder: 1,
Type: models.TaskTypeFullCollection,
},
},
}
return []models.Plan{timedCollectionPlan}
}
// initializePendingCollections 在应用启动时处理所有未完成的采集请求。
// 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。
// 这保证了系统在每次启动时都处于一个干净、确定的状态。
func (app *Application) initializePendingCollections() error {
app.Logger.Info("开始清理所有未完成的采集请求...")
// 直接将所有 'pending' 状态的请求更新为 'timed_out'。
count, err := app.Infra.Repos.PendingCollectionRepo.MarkAllPendingAsTimedOut()
if err != nil {
return fmt.Errorf("清理未完成的采集请求失败: %v", err)
} else if count > 0 {
app.Logger.Infof("成功将 %d 个未完成的采集请求标记为超时。", count)
} else {
app.Logger.Info("没有需要清理的采集请求。")
}
return nil
}
// initializePendingTasks 在应用启动时清理并刷新待执行任务列表。
func (app *Application) initializePendingTasks() error {
logger := app.Logger
planRepo := app.Infra.Repos.PlanRepo
pendingTaskRepo := app.Infra.Repos.PendingTaskRepo
executionLogRepo := app.Infra.Repos.ExecutionLogRepo
analysisPlanTaskManager := app.Domain.AnalysisPlanTaskManager
logger.Info("开始初始化待执行任务列表...")
// 阶段一:修正因崩溃导致状态不一致的固定次数计划
logger.Info("阶段一:开始修正因崩溃导致状态不一致的固定次数计划...")
plansToCorrect, err := planRepo.FindPlansWithPendingTasks()
if err != nil {
return fmt.Errorf("查找需要修正的计划失败: %w", err)
}
for _, plan := range plansToCorrect {
logger.Infof("发现需要修正的计划 #%d (名称: %s)。", plan.ID, plan.Name)
// 更新计划的执行计数
plan.ExecuteCount++
logger.Infof("计划 #%d 执行计数已从 %d 更新为 %d。", plan.ID, plan.ExecuteCount-1, plan.ExecuteCount)
if plan.ExecutionType == models.PlanExecutionTypeManual ||
(plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteCount >= plan.ExecuteNum) {
// 更新计划状态为已停止
plan.Status = models.PlanStatusStopped
logger.Infof("计划 #%d 状态已更新为 '执行完毕'。", plan.ID)
}
// 保存更新后的计划
if err := planRepo.UpdatePlan(plan); err != nil {
logger.Errorf("修正计划 #%d 状态失败: %v", plan.ID, err)
// 这是一个非阻塞性错误,继续处理其他计划
}
}
logger.Info("阶段一:固定次数计划修正完成。")
// 阶段二:清理所有待执行任务和相关日志
logger.Info("阶段二:开始清理所有待执行任务和相关日志...")
// --- 新增逻辑:处理因崩溃导致状态不一致的计划主表状态 ---
// 1. 查找所有未完成的计划执行日志 (状态为 Started 或 Waiting)
incompletePlanLogs, err := executionLogRepo.FindIncompletePlanExecutionLogs()
if err != nil {
return fmt.Errorf("查找未完成的计划执行日志失败: %w", err)
}
// 2. 收集所有受影响的唯一 PlanID
affectedPlanIDs := make(map[uint]struct{})
for _, log := range incompletePlanLogs {
affectedPlanIDs[log.PlanID] = struct{}{}
}
// 3. 对于每个受影响的 PlanID重置其 execute_count 并将其状态设置为 Failed
for planID := range affectedPlanIDs {
logger.Warnf("检测到计划 #%d 在应用崩溃前处于未完成状态,将重置其计数并标记为失败。", planID)
// 使用 UpdatePlanStateAfterExecution 来更新主表状态,避免影响关联数据
if err := planRepo.UpdatePlanStateAfterExecution(planID, 0, models.PlanStatusFailed); err != nil {
logger.Errorf("重置计划 #%d 计数并标记为失败时出错: %v", planID, err)
// 这是一个非阻塞性错误,继续处理其他计划
}
}
logger.Info("阶段二:计划主表状态修正完成。")
// 直接调用新的方法来更新计划执行日志状态为失败
if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(); err != nil {
logger.Errorf("更新所有未完成计划执行日志状态为失败失败: %v", err)
// 这是一个非阻塞性错误,继续执行
}
// 直接调用新的方法来更新任务执行日志状态为取消
if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(); err != nil {
logger.Errorf("更新所有未完成任务执行日志状态为取消失败: %v", err)
// 这是一个非阻塞性错误,继续执行
}
// 清空待执行列表
if err := pendingTaskRepo.ClearAllPendingTasks(); err != nil {
return fmt.Errorf("清空待执行任务列表失败: %w", err)
}
logger.Info("阶段二:待执行任务和相关日志清理完成。")
// 阶段三:初始刷新
logger.Info("阶段三:开始刷新待执行列表...")
if err := analysisPlanTaskManager.Refresh(); err != nil {
return fmt.Errorf("刷新待执行任务列表失败: %w", err)
}
logger.Info("阶段三:待执行任务列表初始化完成。")
logger.Info("待执行任务列表初始化完成。")
return nil
}

View File

@@ -1,6 +0,0 @@
package collection
type Collector interface {
Start()
Stop()
}

View File

@@ -1,89 +0,0 @@
package collection
import (
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
)
// TimedCollector 实现了 Collector 接口,用于定时从数据库获取设备信息并下发采集指令
type TimedCollector struct {
deviceRepo repository.DeviceRepository
deviceService device.Service
logger *logs.Logger
interval time.Duration
ticker *time.Ticker
done chan bool
}
// NewTimedCollector 创建一个定时采集器实例
func NewTimedCollector(
deviceRepo repository.DeviceRepository,
deviceService device.Service,
logger *logs.Logger,
interval time.Duration,
) Collector {
return &TimedCollector{
deviceRepo: deviceRepo,
deviceService: deviceService,
logger: logger,
interval: interval,
done: make(chan bool),
}
}
// Start 开始定时采集
func (c *TimedCollector) Start() {
c.logger.Infof("定时采集器启动,采集间隔: %s", c.interval)
c.ticker = time.NewTicker(c.interval)
go func() {
for {
select {
case <-c.done:
return
case <-c.ticker.C:
c.collect()
}
}
}()
}
// Stop 停止定时采集
func (c *TimedCollector) Stop() {
c.logger.Info("定时采集器停止")
c.ticker.Stop()
c.done <- true
}
// collect 是核心的采集逻辑
func (c *TimedCollector) collect() {
c.logger.Info("开始新一轮的设备数据采集")
sensors, err := c.deviceRepo.ListAllSensors()
if err != nil {
c.logger.Errorf("采集周期: 从数据库获取所有传感器失败: %v", err)
return
}
if len(sensors) == 0 {
c.logger.Info("采集周期: 未发现任何传感器设备,跳过本次采集")
return
}
sensorsByController := make(map[uint][]*models.Device)
for _, sensor := range sensors {
sensorsByController[sensor.AreaControllerID] = append(sensorsByController[sensor.AreaControllerID], sensor)
}
for controllerID, controllerSensors := range sensorsByController {
c.logger.Infof("采集周期: 准备为区域主控 %d 下的 %d 个传感器下发采集指令", controllerID, len(controllerSensors))
if err := c.deviceService.Collect(controllerID, controllerSensors); err != nil {
c.logger.Errorf("采集周期: 为区域主控 %d 下发采集指令失败: %v", controllerID, err)
}
}
c.logger.Info("本轮设备数据采集完成")
}

View File

@@ -1,4 +1,4 @@
package task package scheduler
import ( import (
"fmt" "fmt"

View File

@@ -1,4 +1,4 @@
package task package scheduler
import ( import (
"errors" "errors"
@@ -83,6 +83,7 @@ type Scheduler struct {
deviceRepo repository.DeviceRepository deviceRepo repository.DeviceRepository
sensorDataRepo repository.SensorDataRepository sensorDataRepo repository.SensorDataRepository
planRepo repository.PlanRepository planRepo repository.PlanRepository
taskFactory TaskFactory
analysisPlanTaskManager *AnalysisPlanTaskManager analysisPlanTaskManager *AnalysisPlanTaskManager
progressTracker *ProgressTracker progressTracker *ProgressTracker
deviceService device.Service deviceService device.Service
@@ -100,6 +101,7 @@ func NewScheduler(
sensorDataRepo repository.SensorDataRepository, sensorDataRepo repository.SensorDataRepository,
planRepo repository.PlanRepository, planRepo repository.PlanRepository,
analysisPlanTaskManager *AnalysisPlanTaskManager, analysisPlanTaskManager *AnalysisPlanTaskManager,
taskFactory TaskFactory,
logger *logs.Logger, logger *logs.Logger,
deviceService device.Service, deviceService device.Service,
interval time.Duration, interval time.Duration,
@@ -112,6 +114,7 @@ func NewScheduler(
sensorDataRepo: sensorDataRepo, sensorDataRepo: sensorDataRepo,
planRepo: planRepo, planRepo: planRepo,
analysisPlanTaskManager: analysisPlanTaskManager, analysisPlanTaskManager: analysisPlanTaskManager,
taskFactory: taskFactory,
logger: logger, logger: logger,
deviceService: deviceService, deviceService: deviceService,
pollingInterval: interval, pollingInterval: interval,
@@ -271,7 +274,7 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error {
} else { } else {
// 执行普通任务 // 执行普通任务
task := s.taskFactory(claimedLog) task := s.taskFactory.Production(claimedLog)
if err := task.Execute(); err != nil { if err := task.Execute(); err != nil {
s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
@@ -283,20 +286,6 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error {
return nil return nil
} }
// taskFactory 会根据任务类型初始化对应任务
func (s *Scheduler) taskFactory(claimedLog *models.TaskExecutionLog) Task {
switch claimedLog.Task.Type {
case models.TaskTypeWaiting:
return NewDelayTask(s.logger, claimedLog)
case models.TaskTypeReleaseFeedWeight:
return NewReleaseFeedWeightTask(claimedLog, s.sensorDataRepo, s.deviceRepo, s.deviceService, s.logger)
default:
// TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型
panic("不支持的任务类型")
}
}
// analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中 // analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中
func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
// 创建Plan执行记录 // 创建Plan执行记录

View File

@@ -0,0 +1,23 @@
package scheduler
import "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
// Task 定义了所有可被调度器执行的任务必须实现的接口。
type Task interface {
// Execute 是任务的核心执行逻辑。
// ctx: 用于控制任务的超时或取消。
// log: 包含了当前任务执行的完整上下文信息,包括从数据库中加载的任务参数等。
// 返回的 error 表示任务是否执行成功。调度器会根据返回的 error 是否为 nil 来决定任务状态。
Execute() error
// OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑。
// log: 任务执行的上下文。
// executeErr: 从 Execute 方法返回的原始错误。
OnFailure(executeErr error)
}
// TaskFactory 是一个工厂接口,用于根据任务执行日志创建任务实例。
type TaskFactory interface {
// Production 根据指定的任务执行日志创建一个任务实例。
Production(claimedLog *models.TaskExecutionLog) Task
}

View File

@@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"time" "time"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
) )
@@ -19,7 +20,7 @@ type DelayTask struct {
logger *logs.Logger logger *logs.Logger
} }
func NewDelayTask(logger *logs.Logger, executionTask *models.TaskExecutionLog) Task { func NewDelayTask(logger *logs.Logger, executionTask *models.TaskExecutionLog) scheduler.Task {
return &DelayTask{ return &DelayTask{
executionTask: executionTask, executionTask: executionTask,
logger: logger, logger: logger,

View File

@@ -0,0 +1,93 @@
package task
import (
"fmt"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
)
// FullCollectionTask 实现了 scheduler.Task 接口,用于执行一次全量的设备数据采集
type FullCollectionTask struct {
log *models.TaskExecutionLog
deviceRepo repository.DeviceRepository
deviceService device.Service
logger *logs.Logger
}
// NewFullCollectionTask 创建一个全量采集任务实例
func NewFullCollectionTask(
log *models.TaskExecutionLog,
deviceRepo repository.DeviceRepository,
deviceService device.Service,
logger *logs.Logger,
) *FullCollectionTask {
return &FullCollectionTask{
log: log,
deviceRepo: deviceRepo,
deviceService: deviceService,
logger: logger,
}
}
// Execute 是任务的核心执行逻辑
func (t *FullCollectionTask) Execute() error {
t.logger.Infow("开始执行全量采集任务", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID)
sensors, err := t.deviceRepo.ListAllSensors()
if err != nil {
return fmt.Errorf("全量采集任务: 从数据库获取所有传感器失败: %w", err)
}
if len(sensors) == 0 {
t.logger.Infow("全量采集任务: 未发现任何传感器设备,跳过本次采集", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID)
return nil
}
sensorsByController := make(map[uint][]*models.Device)
for _, sensor := range sensors {
sensorsByController[sensor.AreaControllerID] = append(sensorsByController[sensor.AreaControllerID], sensor)
}
var firstError error
for controllerID, controllerSensors := range sensorsByController {
t.logger.Infow("全量采集任务: 准备为区域主控下的传感器下发采集指令",
"task_id", t.log.TaskID,
"task_type", t.log.Task.Type,
"log_id", t.log.ID,
"controller_id", controllerID,
"sensor_count", len(controllerSensors),
)
if err := t.deviceService.Collect(controllerID, controllerSensors); err != nil {
t.logger.Errorw("全量采集任务: 为区域主控下发采集指令失败",
"task_id", t.log.TaskID,
"task_type", t.log.Task.Type,
"log_id", t.log.ID,
"controller_id", controllerID,
"error", err,
)
if firstError == nil {
firstError = err // 保存第一个错误
}
}
}
if firstError != nil {
return fmt.Errorf("全量采集任务执行期间发生错误: %w", firstError)
}
t.logger.Infow("全量采集任务执行完成", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID)
return nil
}
// OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑
func (t *FullCollectionTask) OnFailure(executeErr error) {
t.logger.Errorw("全量采集任务执行失败",
"task_id", t.log.TaskID,
"task_type", t.log.Task.Type,
"log_id", t.log.ID,
"error", executeErr,
)
}

View File

@@ -6,6 +6,7 @@ import (
"time" "time"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device" "git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
@@ -40,12 +41,12 @@ func NewReleaseFeedWeightTask(
deviceRepo repository.DeviceRepository, deviceRepo repository.DeviceRepository,
deviceService device.Service, deviceService device.Service,
logger *logs.Logger, logger *logs.Logger,
) Task { ) scheduler.Task {
return &ReleaseFeedWeightTask{ return &ReleaseFeedWeightTask{
claimedLog: claimedLog, claimedLog: claimedLog,
deviceRepo: deviceRepo, deviceRepo: deviceRepo,
sensorDataRepo: sensorDataRepo, sensorDataRepo: sensorDataRepo,
feedPort: deviceService, // 直接注入 feedPort: deviceService,
logger: logger, logger: logger,
} }
} }

View File

@@ -1,30 +1,45 @@
package task package task
import ( import (
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
) )
// Task 定义了所有可被调度器执行的任务必须实现的接口。 type taskFactory struct {
type Task interface { logger *logs.Logger
// Execute 是任务的核心执行逻辑。 sensorDataRepo repository.SensorDataRepository
// ctx: 用于控制任务的超时或取消。 deviceRepo repository.DeviceRepository
// log: 包含了当前任务执行的完整上下文信息,包括从数据库中加载的任务参数等。 deviceService device.Service
// 返回的 error 表示任务是否执行成功。调度器会根据返回的 error 是否为 nil 来决定任务状态。
Execute() error
// OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑。
// log: 任务执行的上下文。
// executeErr: 从 Execute 方法返回的原始错误。
OnFailure(executeErr error)
} }
// TaskFactory 是一个任务组装工厂, 可以根据Task类型获取到对应的初始化函数 func NewTaskFactory(
var TaskFactory = func(tt models.TaskType) Task { logger *logs.Logger,
switch tt { sensorDataRepo repository.SensorDataRepository,
deviceRepo repository.DeviceRepository,
deviceService device.Service,
) scheduler.TaskFactory {
return &taskFactory{
logger: logger,
sensorDataRepo: sensorDataRepo,
deviceRepo: deviceRepo,
deviceService: deviceService,
}
}
func (t *taskFactory) Production(claimedLog *models.TaskExecutionLog) scheduler.Task {
switch claimedLog.Task.Type {
case models.TaskTypeWaiting: case models.TaskTypeWaiting:
return &DelayTask{} return NewDelayTask(t.logger, claimedLog)
case models.TaskTypeReleaseFeedWeight:
return NewReleaseFeedWeightTask(claimedLog, t.sensorDataRepo, t.deviceRepo, t.deviceService, t.logger)
case models.TaskTypeFullCollection:
return NewFullCollectionTask(claimedLog, t.deviceRepo, t.deviceService, t.logger)
default: default:
// 出现位置任务类型说明业务逻辑出现重大问题, 一个异常任务被创建了出来 // TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常任务类型
panic("发现未知任务类型") t.logger.Panicf("不支持的任务类型: %s", claimedLog.Task.Type)
panic("不支持的任务类型") // 显式panic防编译器报错
} }
} }

View File

@@ -200,13 +200,18 @@ type LarkConfig struct {
// CollectionConfig 代表定时采集配置 // CollectionConfig 代表定时采集配置
type CollectionConfig struct { type CollectionConfig struct {
// Interval 采集间隔(分钟), 默认 1
Interval int `yaml:"interval"` Interval int `yaml:"interval"`
} }
// NewConfig 创建并返回一个新的配置实例 // NewConfig 创建并返回一个新的配置实例
func NewConfig() *Config { func NewConfig() *Config {
// 默认值可以在这里设置,但我们优先使用配置文件中的值 // 默认值可以在这里设置,但我们优先使用配置文件中的值
return &Config{} return &Config{
Collection: CollectionConfig{
Interval: 1, // 默认为1分钟
},
}
} }
// Load 从指定路径加载配置文件 // Load 从指定路径加载配置文件

View File

@@ -34,6 +34,7 @@ const (
TaskPlanAnalysis TaskType = "计划分析" // 解析Plan的Task列表并添加到待执行队列的特殊任务 TaskPlanAnalysis TaskType = "计划分析" // 解析Plan的Task列表并添加到待执行队列的特殊任务
TaskTypeWaiting TaskType = "等待" // 等待任务 TaskTypeWaiting TaskType = "等待" // 等待任务
TaskTypeReleaseFeedWeight TaskType = "下料" // 下料口释放指定重量任务 TaskTypeReleaseFeedWeight TaskType = "下料" // 下料口释放指定重量任务
TaskTypeFullCollection TaskType = "全量采集" // 新增的全量采集任务
) )
// -- Task Parameters -- // -- Task Parameters --
@@ -52,12 +53,20 @@ const (
PlanStatusFailed PlanStatus = "执行失败" // 执行失败 PlanStatusFailed PlanStatus = "执行失败" // 执行失败
) )
type PlanType string
const (
PlanTypeCustom PlanType = "自定义任务"
PlanTypeSystem PlanType = "系统任务"
)
// Plan 代表系统中的一个计划,可以包含子计划或任务 // Plan 代表系统中的一个计划,可以包含子计划或任务
type Plan struct { type Plan struct {
gorm.Model gorm.Model
Name string `gorm:"not null" json:"name"` Name string `gorm:"not null" json:"name"`
Description string `json:"description"` Description string `json:"description"`
PlanType PlanType `gorm:"not null;index" json:"plan_type"` // 任务类型, 包括系统任务和用户自定义任务
ExecutionType PlanExecutionType `gorm:"not null;index" json:"execution_type"` ExecutionType PlanExecutionType `gorm:"not null;index" json:"execution_type"`
Status PlanStatus `gorm:"default:'已禁用';index" json:"status"` // 计划是否被启动 Status PlanStatus `gorm:"default:'已禁用';index" json:"status"` // 计划是否被启动
ExecuteNum uint `gorm:"default:0" json:"execute_num"` // 计划预期执行次数 ExecuteNum uint `gorm:"default:0" json:"execute_num"` // 计划预期执行次数

View File

@@ -21,11 +21,25 @@ var (
ErrDeleteWithReferencedPlan = errors.New("禁止删除正在被引用的计划") ErrDeleteWithReferencedPlan = errors.New("禁止删除正在被引用的计划")
) )
// PlanTypeFilter 定义计划类型的过滤器
type PlanTypeFilter string
const (
PlanTypeFilterAll PlanTypeFilter = "all"
PlanTypeFilterCustom PlanTypeFilter = "custom"
PlanTypeFilterSystem PlanTypeFilter = "system"
)
// ListPlansOptions 定义了查询计划时的可选参数
type ListPlansOptions struct {
PlanType PlanTypeFilter
}
// PlanRepository 定义了与计划模型相关的数据库操作接口 // PlanRepository 定义了与计划模型相关的数据库操作接口
// 这是为了让业务逻辑层依赖于抽象,而不是具体的数据库实现 // 这是为了让业务逻辑层依赖于抽象,而不是具体的数据库实现
type PlanRepository interface { type PlanRepository interface {
// ListBasicPlans 获取所有计划的基本信息,不包含子计划和任务详情 // ListPlans 获取计划列表,支持过滤和分页
ListBasicPlans() ([]models.Plan, error) ListPlans(opts ListPlansOptions, page, pageSize int) ([]models.Plan, int64, error)
// GetBasicPlanByID 根据ID获取计划的基本信息不包含子计划和任务详情 // GetBasicPlanByID 根据ID获取计划的基本信息不包含子计划和任务详情
GetBasicPlanByID(id uint) (*models.Plan, error) GetBasicPlanByID(id uint) (*models.Plan, error)
// GetPlanByID 根据ID获取计划包含子计划和任务详情 // GetPlanByID 根据ID获取计划包含子计划和任务详情
@@ -81,15 +95,37 @@ func NewGormPlanRepository(db *gorm.DB) PlanRepository {
} }
} }
// ListBasicPlans 获取所有计划的基本信息,不包含子计划和任务详情 // ListPlans 获取计划列表,支持过滤和分页
func (r *gormPlanRepository) ListBasicPlans() ([]models.Plan, error) { func (r *gormPlanRepository) ListPlans(opts ListPlansOptions, page, pageSize int) ([]models.Plan, int64, error) {
var plans []models.Plan if page <= 0 || pageSize <= 0 {
// GORM 默认不会加载关联,除非使用 Preload所以直接 Find 即可满足要求 return nil, 0, ErrInvalidPagination
result := r.db.Find(&plans)
if result.Error != nil {
return nil, result.Error
} }
return plans, nil
var plans []models.Plan
var total int64
query := r.db.Model(&models.Plan{})
switch opts.PlanType {
case PlanTypeFilterCustom:
query = query.Where("plan_type = ?", models.PlanTypeCustom)
case PlanTypeFilterSystem:
query = query.Where("plan_type = ?", models.PlanTypeSystem)
case PlanTypeFilterAll:
// 不添加 plan_type 的过滤条件
default:
// 默认查询自定义
query = query.Where("plan_type = ?", models.PlanTypeCustom)
}
if err := query.Count(&total).Error; err != nil {
return nil, 0, err
}
offset := (page - 1) * pageSize
err := query.Limit(pageSize).Offset(offset).Order("id DESC").Find(&plans).Error
return plans, total, err
} }
// GetBasicPlanByID 根据ID获取计划的基本信息不包含子计划和任务详情 // GetBasicPlanByID 根据ID获取计划的基本信息不包含子计划和任务详情