issue_42 #46
@@ -112,4 +112,4 @@ notify:
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
# 定时采集配置
 | 
					# 定时采集配置
 | 
				
			||||||
collection:
 | 
					collection:
 | 
				
			||||||
  interval: 300 # 采集间隔 (秒)
 | 
					  interval: 1 # 采集间隔 (分钟)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -12,7 +12,7 @@ server:
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
# 日志配置
 | 
					# 日志配置
 | 
				
			||||||
log:
 | 
					log:
 | 
				
			||||||
  level: "debug" # 日志级别: "debug", "info", "warn", "error", "dpanic", "panic", "fatal"
 | 
					  level: "info" # 日志级别: "debug", "info", "warn", "error", "dpanic", "panic", "fatal"
 | 
				
			||||||
  format: "console" # 日志格式: "console" 或 "json"
 | 
					  format: "console" # 日志格式: "console" 或 "json"
 | 
				
			||||||
  enable_file: true # 是否启用文件日志
 | 
					  enable_file: true # 是否启用文件日志
 | 
				
			||||||
  file_path: "./app_logs/app.log" # 日志文件路径
 | 
					  file_path: "./app_logs/app.log" # 日志文件路径
 | 
				
			||||||
@@ -90,4 +90,4 @@ lora_mesh:
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
# 定时采集配置
 | 
					# 定时采集配置
 | 
				
			||||||
collection:
 | 
					collection:
 | 
				
			||||||
  interval: 300 # 采集间隔 (秒)
 | 
					  interval: 1 # 采集间隔 (分钟)
 | 
				
			||||||
@@ -29,7 +29,7 @@ import (
 | 
				
			|||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit"
 | 
				
			||||||
	domain_device "git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
 | 
						domain_device "git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
 | 
				
			||||||
	domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
 | 
						domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
				
			||||||
@@ -40,21 +40,21 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// API 结构体定义了 HTTP 服务器及其依赖
 | 
					// API 结构体定义了 HTTP 服务器及其依赖
 | 
				
			||||||
type API struct {
 | 
					type API struct {
 | 
				
			||||||
	engine              *gin.Engine                    // Gin 引擎实例,用于处理 HTTP 请求
 | 
						engine              *gin.Engine                        // Gin 引擎实例,用于处理 HTTP 请求
 | 
				
			||||||
	logger              *logs.Logger                   // 日志记录器,用于输出日志信息
 | 
						logger              *logs.Logger                       // 日志记录器,用于输出日志信息
 | 
				
			||||||
	userRepo            repository.UserRepository      // 用户数据仓库接口,用于用户数据操作
 | 
						userRepo            repository.UserRepository          // 用户数据仓库接口,用于用户数据操作
 | 
				
			||||||
	tokenService        token.Service                  // Token 服务接口,用于 JWT token 的生成和解析
 | 
						tokenService        token.Service                      // Token 服务接口,用于 JWT token 的生成和解析
 | 
				
			||||||
	auditService        audit.Service                  // 审计服务,用于记录用户操作
 | 
						auditService        audit.Service                      // 审计服务,用于记录用户操作
 | 
				
			||||||
	httpServer          *http.Server                   // 标准库的 HTTP 服务器实例,用于启动和停止服务
 | 
						httpServer          *http.Server                       // 标准库的 HTTP 服务器实例,用于启动和停止服务
 | 
				
			||||||
	config              config.ServerConfig            // API 服务器的配置,使用 infra/config 包中的 ServerConfig
 | 
						config              config.ServerConfig                // API 服务器的配置,使用 infra/config 包中的 ServerConfig
 | 
				
			||||||
	userController      *user.Controller               // 用户控制器实例
 | 
						userController      *user.Controller                   // 用户控制器实例
 | 
				
			||||||
	deviceController    *device.Controller             // 设备控制器实例
 | 
						deviceController    *device.Controller                 // 设备控制器实例
 | 
				
			||||||
	planController      *plan.Controller               // 计划控制器实例
 | 
						planController      *plan.Controller                   // 计划控制器实例
 | 
				
			||||||
	pigFarmController   *management.PigFarmController  // 猪场管理控制器实例
 | 
						pigFarmController   *management.PigFarmController      // 猪场管理控制器实例
 | 
				
			||||||
	pigBatchController  *management.PigBatchController // 猪群控制器实例
 | 
						pigBatchController  *management.PigBatchController     // 猪群控制器实例
 | 
				
			||||||
	monitorController   *monitor.Controller            // 数据监控控制器实例
 | 
						monitorController   *monitor.Controller                // 数据监控控制器实例
 | 
				
			||||||
	listenHandler       webhook.ListenHandler          // 设备上行事件监听器
 | 
						listenHandler       webhook.ListenHandler              // 设备上行事件监听器
 | 
				
			||||||
	analysisTaskManager *task.AnalysisPlanTaskManager  // 计划触发器管理器实例
 | 
						analysisTaskManager *scheduler.AnalysisPlanTaskManager // 计划触发器管理器实例
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewAPI 创建并返回一个新的 API 实例
 | 
					// NewAPI 创建并返回一个新的 API 实例
 | 
				
			||||||
@@ -74,7 +74,7 @@ func NewAPI(cfg config.ServerConfig,
 | 
				
			|||||||
	notifyService domain_notify.Service,
 | 
						notifyService domain_notify.Service,
 | 
				
			||||||
	deviceService domain_device.Service,
 | 
						deviceService domain_device.Service,
 | 
				
			||||||
	listenHandler webhook.ListenHandler,
 | 
						listenHandler webhook.ListenHandler,
 | 
				
			||||||
	analysisTaskManager *task.AnalysisPlanTaskManager) *API {
 | 
						analysisTaskManager *scheduler.AnalysisPlanTaskManager) *API {
 | 
				
			||||||
	// 设置 Gin 模式,例如 gin.ReleaseMode (生产模式) 或 gin.DebugMode (开发模式)
 | 
						// 设置 Gin 模式,例如 gin.ReleaseMode (生产模式) 或 gin.DebugMode (开发模式)
 | 
				
			||||||
	// 从配置中获取 Gin 模式
 | 
						// 从配置中获取 Gin 模式
 | 
				
			||||||
	gin.SetMode(cfg.Mode)
 | 
						gin.SetMode(cfg.Mode)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -6,7 +6,7 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/app/controller"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/app/controller"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/app/dto"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/app/dto"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
				
			||||||
@@ -20,11 +20,11 @@ import (
 | 
				
			|||||||
type Controller struct {
 | 
					type Controller struct {
 | 
				
			||||||
	logger                  *logs.Logger
 | 
						logger                  *logs.Logger
 | 
				
			||||||
	planRepo                repository.PlanRepository
 | 
						planRepo                repository.PlanRepository
 | 
				
			||||||
	analysisPlanTaskManager *task.AnalysisPlanTaskManager
 | 
						analysisPlanTaskManager *scheduler.AnalysisPlanTaskManager
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewController 创建一个新的 Controller 实例
 | 
					// NewController 创建一个新的 Controller 实例
 | 
				
			||||||
func NewController(logger *logs.Logger, planRepo repository.PlanRepository, analysisPlanTaskManager *task.AnalysisPlanTaskManager) *Controller {
 | 
					func NewController(logger *logs.Logger, planRepo repository.PlanRepository, analysisPlanTaskManager *scheduler.AnalysisPlanTaskManager) *Controller {
 | 
				
			||||||
	return &Controller{
 | 
						return &Controller{
 | 
				
			||||||
		logger:                  logger,
 | 
							logger:                  logger,
 | 
				
			||||||
		planRepo:                planRepo,
 | 
							planRepo:                planRepo,
 | 
				
			||||||
@@ -61,7 +61,11 @@ func (c *Controller) CreatePlan(ctx *gin.Context) {
 | 
				
			|||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// --- 自动判断 ContentType ---
 | 
						// --- 业务规则处理 ---
 | 
				
			||||||
 | 
						// 1. 设置计划类型:用户创建的计划永远是自定义计划
 | 
				
			||||||
 | 
						planToCreate.PlanType = models.PlanTypeCustom
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 2. 自动判断 ContentType
 | 
				
			||||||
	if len(req.SubPlanIDs) > 0 {
 | 
						if len(req.SubPlanIDs) > 0 {
 | 
				
			||||||
		planToCreate.ContentType = models.PlanContentTypeSubPlans
 | 
							planToCreate.ContentType = models.PlanContentTypeSubPlans
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
@@ -145,16 +149,25 @@ func (c *Controller) GetPlan(ctx *gin.Context) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// ListPlans godoc
 | 
					// ListPlans godoc
 | 
				
			||||||
// @Summary      获取计划列表
 | 
					// @Summary      获取计划列表
 | 
				
			||||||
// @Description  获取所有计划的列表
 | 
					// @Description  获取所有计划的列表,支持按类型过滤和分页
 | 
				
			||||||
// @Tags         计划管理
 | 
					// @Tags         计划管理
 | 
				
			||||||
// @Security     BearerAuth
 | 
					// @Security     BearerAuth
 | 
				
			||||||
// @Produce      json
 | 
					// @Produce      json
 | 
				
			||||||
// @Success      200 {object} controller.Response{data=[]dto.PlanResponse} "业务码为200代表成功获取列表"
 | 
					// @Param        query query dto.ListPlansQuery false "查询参数"
 | 
				
			||||||
 | 
					// @Success      200 {object} controller.Response{data=dto.ListPlansResponse} "业务码为200代表成功获取列表"
 | 
				
			||||||
// @Router       /api/v1/plans [get]
 | 
					// @Router       /api/v1/plans [get]
 | 
				
			||||||
func (c *Controller) ListPlans(ctx *gin.Context) {
 | 
					func (c *Controller) ListPlans(ctx *gin.Context) {
 | 
				
			||||||
	const actionType = "获取计划列表"
 | 
						const actionType = "获取计划列表"
 | 
				
			||||||
 | 
						var query dto.ListPlansQuery
 | 
				
			||||||
 | 
						if err := ctx.ShouldBindQuery(&query); err != nil {
 | 
				
			||||||
 | 
							c.logger.Errorf("%s: 查询参数绑定失败: %v", actionType, err)
 | 
				
			||||||
 | 
							controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "无效的查询参数: "+err.Error(), actionType, "查询参数绑定失败", query)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 1. 调用仓库层获取所有计划
 | 
						// 1. 调用仓库层获取所有计划
 | 
				
			||||||
	plans, err := c.planRepo.ListBasicPlans()
 | 
						opts := repository.ListPlansOptions{PlanType: repository.PlanTypeFilter(query.PlanType)}
 | 
				
			||||||
 | 
						plans, total, err := c.planRepo.ListPlans(opts, query.Page, query.PageSize)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		c.logger.Errorf("%s: 数据库查询失败: %v", actionType, err)
 | 
							c.logger.Errorf("%s: 数据库查询失败: %v", actionType, err)
 | 
				
			||||||
		controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "获取计划列表时发生内部错误", actionType, "数据库查询失败", nil)
 | 
							controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "获取计划列表时发生内部错误", actionType, "数据库查询失败", nil)
 | 
				
			||||||
@@ -176,7 +189,7 @@ func (c *Controller) ListPlans(ctx *gin.Context) {
 | 
				
			|||||||
	// 3. 构造并发送成功响应
 | 
						// 3. 构造并发送成功响应
 | 
				
			||||||
	resp := dto.ListPlansResponse{
 | 
						resp := dto.ListPlansResponse{
 | 
				
			||||||
		Plans: planResponses,
 | 
							Plans: planResponses,
 | 
				
			||||||
		Total: len(planResponses),
 | 
							Total: total,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	c.logger.Infof("%s: 获取计划列表成功, 数量: %d", actionType, len(planResponses))
 | 
						c.logger.Infof("%s: 获取计划列表成功, 数量: %d", actionType, len(planResponses))
 | 
				
			||||||
	controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "获取计划列表成功", resp, actionType, "获取计划列表成功", resp)
 | 
						controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "获取计划列表成功", resp, actionType, "获取计划列表成功", resp)
 | 
				
			||||||
@@ -184,7 +197,7 @@ func (c *Controller) ListPlans(ctx *gin.Context) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// UpdatePlan godoc
 | 
					// UpdatePlan godoc
 | 
				
			||||||
// @Summary      更新计划
 | 
					// @Summary      更新计划
 | 
				
			||||||
// @Description  根据计划ID更新计划的详细信息。
 | 
					// @Description  根据计划ID更新计划的详细信息。系统计划不允许修改。
 | 
				
			||||||
// @Tags         计划管理
 | 
					// @Tags         计划管理
 | 
				
			||||||
// @Security     BearerAuth
 | 
					// @Security     BearerAuth
 | 
				
			||||||
// @Accept       json
 | 
					// @Accept       json
 | 
				
			||||||
@@ -212,7 +225,27 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) {
 | 
				
			|||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 3. 将请求转换为模型(转换函数带校验)
 | 
						// 3. 检查计划是否存在
 | 
				
			||||||
 | 
						existingPlan, err := c.planRepo.GetBasicPlanByID(uint(id))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							if errors.Is(err, gorm.ErrRecordNotFound) {
 | 
				
			||||||
 | 
								c.logger.Warnf("%s: 计划不存在, ID: %d", actionType, id)
 | 
				
			||||||
 | 
								controller.SendErrorWithAudit(ctx, controller.CodeNotFound, "计划不存在", actionType, "计划不存在", id)
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							c.logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, id)
 | 
				
			||||||
 | 
							controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "获取计划信息时发生内部错误", actionType, "数据库查询失败", id)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 4. 业务规则:系统计划不允许修改
 | 
				
			||||||
 | 
						if existingPlan.PlanType == models.PlanTypeSystem {
 | 
				
			||||||
 | 
							c.logger.Warnf("%s: 尝试修改系统计划, ID: %d", actionType, id)
 | 
				
			||||||
 | 
							controller.SendErrorWithAudit(ctx, controller.CodeForbidden, "系统计划不允许修改", actionType, "尝试修改系统计划", id)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 5. 将请求转换为模型(转换函数带校验)
 | 
				
			||||||
	planToUpdate, err := dto.NewPlanFromUpdateRequest(&req)
 | 
						planToUpdate, err := dto.NewPlanFromUpdateRequest(&req)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		c.logger.Errorf("%s: 计划数据校验失败: %v", actionType, err)
 | 
							c.logger.Errorf("%s: 计划数据校验失败: %v", actionType, err)
 | 
				
			||||||
@@ -229,20 +262,7 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) {
 | 
				
			|||||||
		planToUpdate.ContentType = models.PlanContentTypeTasks
 | 
							planToUpdate.ContentType = models.PlanContentTypeTasks
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 4. 检查计划是否存在
 | 
						// 6. 调用仓库方法更新计划
 | 
				
			||||||
	_, err = c.planRepo.GetBasicPlanByID(uint(id))
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		if errors.Is(err, gorm.ErrRecordNotFound) {
 | 
					 | 
				
			||||||
			c.logger.Warnf("%s: 计划不存在, ID: %d", actionType, id)
 | 
					 | 
				
			||||||
			controller.SendErrorWithAudit(ctx, controller.CodeNotFound, "计划不存在", actionType, "计划不存在", id)
 | 
					 | 
				
			||||||
			return
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		c.logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, id)
 | 
					 | 
				
			||||||
		controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "获取计划信息时发生内部错误", actionType, "数据库查询失败", id)
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// 5. 调用仓库方法更新计划
 | 
					 | 
				
			||||||
	// 只要是更新任务,就重置执行计数器
 | 
						// 只要是更新任务,就重置执行计数器
 | 
				
			||||||
	planToUpdate.ExecuteCount = 0 // 重置计数器
 | 
						planToUpdate.ExecuteCount = 0 // 重置计数器
 | 
				
			||||||
	c.logger.Infof("计划 #%d 被更新,执行计数器已重置为 0。", planToUpdate.ID)
 | 
						c.logger.Infof("计划 #%d 被更新,执行计数器已重置为 0。", planToUpdate.ID)
 | 
				
			||||||
@@ -259,7 +279,7 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) {
 | 
				
			|||||||
		c.logger.Errorf("为更新后的计划 %d 确保触发器任务定义失败: %v", planToUpdate.ID, err)
 | 
							c.logger.Errorf("为更新后的计划 %d 确保触发器任务定义失败: %v", planToUpdate.ID, err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 6. 获取更新后的完整计划用于响应
 | 
						// 7. 获取更新后的完整计划用于响应
 | 
				
			||||||
	updatedPlan, err := c.planRepo.GetPlanByID(uint(id))
 | 
						updatedPlan, err := c.planRepo.GetPlanByID(uint(id))
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		c.logger.Errorf("%s: 获取更新后计划详情失败: %v, ID: %d", actionType, err, id)
 | 
							c.logger.Errorf("%s: 获取更新后计划详情失败: %v, ID: %d", actionType, err, id)
 | 
				
			||||||
@@ -267,7 +287,7 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) {
 | 
				
			|||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 7. 将模型转换为响应 DTO
 | 
						// 8. 将模型转换为响应 DTO
 | 
				
			||||||
	resp, err := dto.NewPlanToResponse(updatedPlan)
 | 
						resp, err := dto.NewPlanToResponse(updatedPlan)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		c.logger.Errorf("%s: 序列化响应失败: %v, Updated Plan: %+v", actionType, err, updatedPlan)
 | 
							c.logger.Errorf("%s: 序列化响应失败: %v, Updated Plan: %+v", actionType, err, updatedPlan)
 | 
				
			||||||
@@ -275,14 +295,14 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) {
 | 
				
			|||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 8. 发送成功响应
 | 
						// 9. 发送成功响应
 | 
				
			||||||
	c.logger.Infof("%s: 计划更新成功, ID: %d", actionType, updatedPlan.ID)
 | 
						c.logger.Infof("%s: 计划更新成功, ID: %d", actionType, updatedPlan.ID)
 | 
				
			||||||
	controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "计划更新成功", resp, actionType, "计划更新成功", resp)
 | 
						controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "计划更新成功", resp, actionType, "计划更新成功", resp)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// DeletePlan godoc
 | 
					// DeletePlan godoc
 | 
				
			||||||
// @Summary      删除计划
 | 
					// @Summary      删除计划
 | 
				
			||||||
// @Description  根据计划ID删除计划。(软删除)
 | 
					// @Description  根据计划ID删除计划。(软删除)系统计划不允许删除。
 | 
				
			||||||
// @Tags         计划管理
 | 
					// @Tags         计划管理
 | 
				
			||||||
// @Security     BearerAuth
 | 
					// @Security     BearerAuth
 | 
				
			||||||
// @Produce      json
 | 
					// @Produce      json
 | 
				
			||||||
@@ -313,7 +333,14 @@ func (c *Controller) DeletePlan(ctx *gin.Context) {
 | 
				
			|||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 3. 停止这个计划
 | 
						// 3. 业务规则:系统计划不允许删除
 | 
				
			||||||
 | 
						if plan.PlanType == models.PlanTypeSystem {
 | 
				
			||||||
 | 
							c.logger.Warnf("%s: 尝试删除系统计划, ID: %d", actionType, id)
 | 
				
			||||||
 | 
							controller.SendErrorWithAudit(ctx, controller.CodeForbidden, "系统计划不允许删除", actionType, "尝试删除系统计划", id)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 4. 停止这个计划
 | 
				
			||||||
	if plan.Status == models.PlanStatusEnabled {
 | 
						if plan.Status == models.PlanStatusEnabled {
 | 
				
			||||||
		if err := c.planRepo.StopPlanTransactionally(uint(id)); err != nil {
 | 
							if err := c.planRepo.StopPlanTransactionally(uint(id)); err != nil {
 | 
				
			||||||
			c.logger.Errorf("%s: 停止计划失败: %v, ID: %d", actionType, err, id)
 | 
								c.logger.Errorf("%s: 停止计划失败: %v, ID: %d", actionType, err, id)
 | 
				
			||||||
@@ -322,21 +349,21 @@ func (c *Controller) DeletePlan(ctx *gin.Context) {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 4. 调用仓库层删除计划
 | 
						// 5. 调用仓库层删除计划
 | 
				
			||||||
	if err := c.planRepo.DeletePlan(uint(id)); err != nil {
 | 
						if err := c.planRepo.DeletePlan(uint(id)); err != nil {
 | 
				
			||||||
		c.logger.Errorf("%s: 数据库删除失败: %v, ID: %d", actionType, err, id)
 | 
							c.logger.Errorf("%s: 数据库删除失败: %v, ID: %d", actionType, err, id)
 | 
				
			||||||
		controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "删除计划时发生内部错误", actionType, "数据库删除失败", id)
 | 
							controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "删除计划时发生内部错误", actionType, "数据库删除失败", id)
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 5. 发送成功响应
 | 
						// 6. 发送成功响应
 | 
				
			||||||
	c.logger.Infof("%s: 计划删除成功, ID: %d", actionType, id)
 | 
						c.logger.Infof("%s: 计划删除成功, ID: %d", actionType, id)
 | 
				
			||||||
	controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "计划删除成功", nil, actionType, "计划删除成功", id)
 | 
						controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "计划删除成功", nil, actionType, "计划删除成功", id)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// StartPlan godoc
 | 
					// StartPlan godoc
 | 
				
			||||||
// @Summary      启动计划
 | 
					// @Summary      启动计划
 | 
				
			||||||
// @Description  根据计划ID启动一个计划的执行。
 | 
					// @Description  根据计划ID启动一个计划的执行。系统计划不允许手动启动。
 | 
				
			||||||
// @Tags         计划管理
 | 
					// @Tags         计划管理
 | 
				
			||||||
// @Security     BearerAuth
 | 
					// @Security     BearerAuth
 | 
				
			||||||
// @Produce      json
 | 
					// @Produce      json
 | 
				
			||||||
@@ -367,7 +394,12 @@ func (c *Controller) StartPlan(ctx *gin.Context) {
 | 
				
			|||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 3. 检查计划当前状态
 | 
						// 3. 业务规则检查
 | 
				
			||||||
 | 
						if plan.PlanType == models.PlanTypeSystem {
 | 
				
			||||||
 | 
							c.logger.Warnf("%s: 尝试手动启动系统计划, ID: %d", actionType, id)
 | 
				
			||||||
 | 
							controller.SendErrorWithAudit(ctx, controller.CodeForbidden, "系统计划不允许手动启动", actionType, "尝试手动启动系统计划", id)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	if plan.Status == models.PlanStatusEnabled {
 | 
						if plan.Status == models.PlanStatusEnabled {
 | 
				
			||||||
		c.logger.Warnf("%s: 计划已处于启动状态,无需重复操作, ID: %d", actionType, id)
 | 
							c.logger.Warnf("%s: 计划已处于启动状态,无需重复操作, ID: %d", actionType, id)
 | 
				
			||||||
		controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "计划已处于启动状态,无需重复操作", actionType, "计划已处于启动状态", id)
 | 
							controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "计划已处于启动状态,无需重复操作", actionType, "计划已处于启动状态", id)
 | 
				
			||||||
@@ -416,7 +448,7 @@ func (c *Controller) StartPlan(ctx *gin.Context) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// StopPlan godoc
 | 
					// StopPlan godoc
 | 
				
			||||||
// @Summary      停止计划
 | 
					// @Summary      停止计划
 | 
				
			||||||
// @Description  根据计划ID停止一个正在执行的计划。
 | 
					// @Description  根据计划ID停止一个正在执行的计划。系统计划不能被停止。
 | 
				
			||||||
// @Tags         计划管理
 | 
					// @Tags         计划管理
 | 
				
			||||||
// @Security     BearerAuth
 | 
					// @Security     BearerAuth
 | 
				
			||||||
// @Produce      json
 | 
					// @Produce      json
 | 
				
			||||||
@@ -447,21 +479,28 @@ func (c *Controller) StopPlan(ctx *gin.Context) {
 | 
				
			|||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 3. 检查计划当前状态
 | 
						// 3. 业务规则:系统计划不允许停止
 | 
				
			||||||
 | 
						if plan.PlanType == models.PlanTypeSystem {
 | 
				
			||||||
 | 
							c.logger.Warnf("%s: 尝试停止系统计划, ID: %d", actionType, id)
 | 
				
			||||||
 | 
							controller.SendErrorWithAudit(ctx, controller.CodeForbidden, "系统计划不允许停止", actionType, "尝试停止系统计划", id)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 4. 检查计划当前状态
 | 
				
			||||||
	if plan.Status != models.PlanStatusEnabled {
 | 
						if plan.Status != models.PlanStatusEnabled {
 | 
				
			||||||
		c.logger.Warnf("%s: 计划当前不是启用状态, ID: %d, Status: %s", actionType, id, plan.Status)
 | 
							c.logger.Warnf("%s: 计划当前不是启用状态, ID: %d, Status: %s", actionType, id, plan.Status)
 | 
				
			||||||
		controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "计划当前不是启用状态", actionType, "计划未启用", id)
 | 
							controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "计划当前不是启用状态", actionType, "计划未启用", id)
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 4. 调用仓库层方法,该方法内部处理事务
 | 
						// 5. 调用仓库层方法,该方法内部处理事务
 | 
				
			||||||
	if err := c.planRepo.StopPlanTransactionally(uint(id)); err != nil {
 | 
						if err := c.planRepo.StopPlanTransactionally(uint(id)); err != nil {
 | 
				
			||||||
		c.logger.Errorf("%s: 停止计划失败: %v, ID: %d", actionType, err, id)
 | 
							c.logger.Errorf("%s: 停止计划失败: %v, ID: %d", actionType, err, id)
 | 
				
			||||||
		controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "停止计划时发生内部错误: "+err.Error(), actionType, "停止计划失败", id)
 | 
							controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "停止计划时发生内部错误: "+err.Error(), actionType, "停止计划失败", id)
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 5. 发送成功响应
 | 
						// 6. 发送成功响应
 | 
				
			||||||
	c.logger.Infof("%s: 计划已成功停止, ID: %d", actionType, id)
 | 
						c.logger.Infof("%s: 计划已成功停止, ID: %d", actionType, id)
 | 
				
			||||||
	controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "计划已成功停止", nil, actionType, "计划已成功停止", id)
 | 
						controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "计划已成功停止", nil, actionType, "计划已成功停止", id)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -18,6 +18,7 @@ const (
 | 
				
			|||||||
	// 客户端错误状态码 (4000-4999)
 | 
						// 客户端错误状态码 (4000-4999)
 | 
				
			||||||
	CodeBadRequest   ResponseCode = 4000 // 请求参数错误
 | 
						CodeBadRequest   ResponseCode = 4000 // 请求参数错误
 | 
				
			||||||
	CodeUnauthorized ResponseCode = 4001 // 未授权
 | 
						CodeUnauthorized ResponseCode = 4001 // 未授权
 | 
				
			||||||
 | 
						CodeForbidden    ResponseCode = 4003 // 禁止访问
 | 
				
			||||||
	CodeNotFound     ResponseCode = 4004 // 资源未找到
 | 
						CodeNotFound     ResponseCode = 4004 // 资源未找到
 | 
				
			||||||
	CodeConflict     ResponseCode = 4009 // 资源冲突
 | 
						CodeConflict     ResponseCode = 4009 // 资源冲突
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -17,6 +17,7 @@ func NewPlanToResponse(plan *models.Plan) (*PlanResponse, error) {
 | 
				
			|||||||
		ID:             plan.ID,
 | 
							ID:             plan.ID,
 | 
				
			||||||
		Name:           plan.Name,
 | 
							Name:           plan.Name,
 | 
				
			||||||
		Description:    plan.Description,
 | 
							Description:    plan.Description,
 | 
				
			||||||
 | 
							PlanType:       plan.PlanType,
 | 
				
			||||||
		ExecutionType:  plan.ExecutionType,
 | 
							ExecutionType:  plan.ExecutionType,
 | 
				
			||||||
		Status:         plan.Status,
 | 
							Status:         plan.Status,
 | 
				
			||||||
		ExecuteNum:     plan.ExecuteNum,
 | 
							ExecuteNum:     plan.ExecuteNum,
 | 
				
			||||||
@@ -64,7 +65,7 @@ func NewPlanFromCreateRequest(req *CreatePlanRequest) (*models.Plan, error) {
 | 
				
			|||||||
		ExecutionType:  req.ExecutionType,
 | 
							ExecutionType:  req.ExecutionType,
 | 
				
			||||||
		ExecuteNum:     req.ExecuteNum,
 | 
							ExecuteNum:     req.ExecuteNum,
 | 
				
			||||||
		CronExpression: req.CronExpression,
 | 
							CronExpression: req.CronExpression,
 | 
				
			||||||
		// ContentType 在控制器中设置,此处不再处理
 | 
							// ContentType 和 PlanType 在控制器中设置,此处不再处理
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 处理子计划 (通过ID引用)
 | 
						// 处理子计划 (通过ID引用)
 | 
				
			||||||
@@ -116,7 +117,7 @@ func NewPlanFromUpdateRequest(req *UpdatePlanRequest) (*models.Plan, error) {
 | 
				
			|||||||
		ExecutionType:  req.ExecutionType,
 | 
							ExecutionType:  req.ExecutionType,
 | 
				
			||||||
		ExecuteNum:     req.ExecuteNum,
 | 
							ExecuteNum:     req.ExecuteNum,
 | 
				
			||||||
		CronExpression: req.CronExpression,
 | 
							CronExpression: req.CronExpression,
 | 
				
			||||||
		// ContentType 在控制器中设置,此处不再处理
 | 
							// ContentType 和 PlanType 在控制器中设置,此处不再处理
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 处理子计划 (通过ID引用)
 | 
						// 处理子计划 (通过ID引用)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -2,6 +2,13 @@ package dto
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
					import "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// ListPlansQuery 定义了获取计划列表时的查询参数
 | 
				
			||||||
 | 
					type ListPlansQuery struct {
 | 
				
			||||||
 | 
						PlanType string `form:"planType,default=custom"` // 计划类型 (all, custom, system),默认为 custom
 | 
				
			||||||
 | 
						Page     int    `form:"page,default=1"`          // 页码
 | 
				
			||||||
 | 
						PageSize int    `form:"pageSize,default=10"`     // 每页大小
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// CreatePlanRequest 定义创建计划请求的结构体
 | 
					// CreatePlanRequest 定义创建计划请求的结构体
 | 
				
			||||||
type CreatePlanRequest struct {
 | 
					type CreatePlanRequest struct {
 | 
				
			||||||
	Name           string                   `json:"name" binding:"required" example:"猪舍温度控制计划"`
 | 
						Name           string                   `json:"name" binding:"required" example:"猪舍温度控制计划"`
 | 
				
			||||||
@@ -18,6 +25,7 @@ type PlanResponse struct {
 | 
				
			|||||||
	ID             uint                     `json:"id" example:"1"`
 | 
						ID             uint                     `json:"id" example:"1"`
 | 
				
			||||||
	Name           string                   `json:"name" example:"猪舍温度控制计划"`
 | 
						Name           string                   `json:"name" example:"猪舍温度控制计划"`
 | 
				
			||||||
	Description    string                   `json:"description" example:"根据温度自动调节风扇和加热器"`
 | 
						Description    string                   `json:"description" example:"根据温度自动调节风扇和加热器"`
 | 
				
			||||||
 | 
						PlanType       models.PlanType          `json:"plan_type" example:"自定义任务"`
 | 
				
			||||||
	ExecutionType  models.PlanExecutionType `json:"execution_type" example:"自动"`
 | 
						ExecutionType  models.PlanExecutionType `json:"execution_type" example:"自动"`
 | 
				
			||||||
	Status         models.PlanStatus        `json:"status" example:"已启用"`
 | 
						Status         models.PlanStatus        `json:"status" example:"已启用"`
 | 
				
			||||||
	ExecuteNum     uint                     `json:"execute_num" example:"10"`
 | 
						ExecuteNum     uint                     `json:"execute_num" example:"10"`
 | 
				
			||||||
@@ -31,7 +39,7 @@ type PlanResponse struct {
 | 
				
			|||||||
// ListPlansResponse 定义获取计划列表响应的结构体
 | 
					// ListPlansResponse 定义获取计划列表响应的结构体
 | 
				
			||||||
type ListPlansResponse struct {
 | 
					type ListPlansResponse struct {
 | 
				
			||||||
	Plans []PlanResponse `json:"plans"`
 | 
						Plans []PlanResponse `json:"plans"`
 | 
				
			||||||
	Total int            `json:"total" example:"100"`
 | 
						Total int64          `json:"total" example:"100"`
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// UpdatePlanRequest 定义更新计划请求的结构体
 | 
					// UpdatePlanRequest 定义更新计划请求的结构体
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -9,7 +9,6 @@ import (
 | 
				
			|||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/app/api"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/app/api"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Application 是整个应用的核心,封装了所有组件和生命周期。
 | 
					// Application 是整个应用的核心,封装了所有组件和生命周期。
 | 
				
			||||||
@@ -90,7 +89,6 @@ func (app *Application) Start() error {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// 3. 启动后台工作协程
 | 
						// 3. 启动后台工作协程
 | 
				
			||||||
	app.Domain.Scheduler.Start()
 | 
						app.Domain.Scheduler.Start()
 | 
				
			||||||
	app.Domain.TimedCollector.Start()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 4. 启动 API 服务器
 | 
						// 4. 启动 API 服务器
 | 
				
			||||||
	app.API.Start()
 | 
						app.API.Start()
 | 
				
			||||||
@@ -114,9 +112,6 @@ func (app *Application) Stop() error {
 | 
				
			|||||||
	// 关闭任务执行器
 | 
						// 关闭任务执行器
 | 
				
			||||||
	app.Domain.Scheduler.Stop()
 | 
						app.Domain.Scheduler.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 关闭定时采集器
 | 
					 | 
				
			||||||
	app.Domain.TimedCollector.Stop()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// 断开数据库连接
 | 
						// 断开数据库连接
 | 
				
			||||||
	if err := app.Infra.Storage.Disconnect(); err != nil {
 | 
						if err := app.Infra.Storage.Disconnect(); err != nil {
 | 
				
			||||||
		app.Logger.Errorw("数据库连接断开失败", "error", err)
 | 
							app.Logger.Errorw("数据库连接断开失败", "error", err)
 | 
				
			||||||
@@ -133,133 +128,3 @@ func (app *Application) Stop() error {
 | 
				
			|||||||
	app.Logger.Info("应用已成功关闭")
 | 
						app.Logger.Info("应用已成功关闭")
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					 | 
				
			||||||
// initializeState 在应用启动时准备其初始数据状态。
 | 
					 | 
				
			||||||
// 这包括清理任何因上次异常关闭而留下的悬空任务或请求。
 | 
					 | 
				
			||||||
func (app *Application) initializeState() error {
 | 
					 | 
				
			||||||
	// 清理待采集任务 (非致命错误)
 | 
					 | 
				
			||||||
	if err := app.initializePendingCollections(); err != nil {
 | 
					 | 
				
			||||||
		app.Logger.Errorw("清理待采集任务时发生非致命错误", "error", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// 初始化待执行任务列表 (致命错误)
 | 
					 | 
				
			||||||
	if err := app.initializePendingTasks(); err != nil {
 | 
					 | 
				
			||||||
		return fmt.Errorf("初始化待执行任务列表失败: %w", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// initializePendingCollections 在应用启动时处理所有未完成的采集请求。
 | 
					 | 
				
			||||||
// 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。
 | 
					 | 
				
			||||||
// 这保证了系统在每次启动时都处于一个干净、确定的状态。
 | 
					 | 
				
			||||||
func (app *Application) initializePendingCollections() error {
 | 
					 | 
				
			||||||
	app.Logger.Info("开始清理所有未完成的采集请求...")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// 直接将所有 'pending' 状态的请求更新为 'timed_out'。
 | 
					 | 
				
			||||||
	count, err := app.Infra.Repos.PendingCollectionRepo.MarkAllPendingAsTimedOut()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return fmt.Errorf("清理未完成的采集请求失败: %v", err)
 | 
					 | 
				
			||||||
	} else if count > 0 {
 | 
					 | 
				
			||||||
		app.Logger.Infof("成功将 %d 个未完成的采集请求标记为超时。", count)
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		app.Logger.Info("没有需要清理的采集请求。")
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// initializePendingTasks 在应用启动时清理并刷新待执行任务列表。
 | 
					 | 
				
			||||||
func (app *Application) initializePendingTasks() error {
 | 
					 | 
				
			||||||
	logger := app.Logger
 | 
					 | 
				
			||||||
	planRepo := app.Infra.Repos.PlanRepo
 | 
					 | 
				
			||||||
	pendingTaskRepo := app.Infra.Repos.PendingTaskRepo
 | 
					 | 
				
			||||||
	executionLogRepo := app.Infra.Repos.ExecutionLogRepo
 | 
					 | 
				
			||||||
	analysisPlanTaskManager := app.Domain.AnalysisPlanTaskManager
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	logger.Info("开始初始化待执行任务列表...")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// 阶段一:修正因崩溃导致状态不一致的固定次数计划
 | 
					 | 
				
			||||||
	logger.Info("阶段一:开始修正因崩溃导致状态不一致的固定次数计划...")
 | 
					 | 
				
			||||||
	plansToCorrect, err := planRepo.FindPlansWithPendingTasks()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return fmt.Errorf("查找需要修正的计划失败: %w", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	for _, plan := range plansToCorrect {
 | 
					 | 
				
			||||||
		logger.Infof("发现需要修正的计划 #%d (名称: %s)。", plan.ID, plan.Name)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		// 更新计划的执行计数
 | 
					 | 
				
			||||||
		plan.ExecuteCount++
 | 
					 | 
				
			||||||
		logger.Infof("计划 #%d 执行计数已从 %d 更新为 %d。", plan.ID, plan.ExecuteCount-1, plan.ExecuteCount)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		if plan.ExecutionType == models.PlanExecutionTypeManual ||
 | 
					 | 
				
			||||||
			(plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteCount >= plan.ExecuteNum) {
 | 
					 | 
				
			||||||
			// 更新计划状态为已停止
 | 
					 | 
				
			||||||
			plan.Status = models.PlanStatusStopped
 | 
					 | 
				
			||||||
			logger.Infof("计划 #%d 状态已更新为 '执行完毕'。", plan.ID)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		// 保存更新后的计划
 | 
					 | 
				
			||||||
		if err := planRepo.UpdatePlan(plan); err != nil {
 | 
					 | 
				
			||||||
			logger.Errorf("修正计划 #%d 状态失败: %v", plan.ID, err)
 | 
					 | 
				
			||||||
			// 这是一个非阻塞性错误,继续处理其他计划
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	logger.Info("阶段一:固定次数计划修正完成。")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// 阶段二:清理所有待执行任务和相关日志
 | 
					 | 
				
			||||||
	logger.Info("阶段二:开始清理所有待执行任务和相关日志...")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// --- 新增逻辑:处理因崩溃导致状态不一致的计划主表状态 ---
 | 
					 | 
				
			||||||
	// 1. 查找所有未完成的计划执行日志 (状态为 Started 或 Waiting)
 | 
					 | 
				
			||||||
	incompletePlanLogs, err := executionLogRepo.FindIncompletePlanExecutionLogs()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return fmt.Errorf("查找未完成的计划执行日志失败: %w", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// 2. 收集所有受影响的唯一 PlanID
 | 
					 | 
				
			||||||
	affectedPlanIDs := make(map[uint]struct{})
 | 
					 | 
				
			||||||
	for _, log := range incompletePlanLogs {
 | 
					 | 
				
			||||||
		affectedPlanIDs[log.PlanID] = struct{}{}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// 3. 对于每个受影响的 PlanID,重置其 execute_count 并将其状态设置为 Failed
 | 
					 | 
				
			||||||
	for planID := range affectedPlanIDs {
 | 
					 | 
				
			||||||
		logger.Warnf("检测到计划 #%d 在应用崩溃前处于未完成状态,将重置其计数并标记为失败。", planID)
 | 
					 | 
				
			||||||
		// 使用 UpdatePlanStateAfterExecution 来更新主表状态,避免影响关联数据
 | 
					 | 
				
			||||||
		if err := planRepo.UpdatePlanStateAfterExecution(planID, 0, models.PlanStatusFailed); err != nil {
 | 
					 | 
				
			||||||
			logger.Errorf("重置计划 #%d 计数并标记为失败时出错: %v", planID, err)
 | 
					 | 
				
			||||||
			// 这是一个非阻塞性错误,继续处理其他计划
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	logger.Info("阶段二:计划主表状态修正完成。")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// 直接调用新的方法来更新计划执行日志状态为失败
 | 
					 | 
				
			||||||
	if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(); err != nil {
 | 
					 | 
				
			||||||
		logger.Errorf("更新所有未完成计划执行日志状态为失败失败: %v", err)
 | 
					 | 
				
			||||||
		// 这是一个非阻塞性错误,继续执行
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// 直接调用新的方法来更新任务执行日志状态为取消
 | 
					 | 
				
			||||||
	if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(); err != nil {
 | 
					 | 
				
			||||||
		logger.Errorf("更新所有未完成任务执行日志状态为取消失败: %v", err)
 | 
					 | 
				
			||||||
		// 这是一个非阻塞性错误,继续执行
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// 清空待执行列表
 | 
					 | 
				
			||||||
	if err := pendingTaskRepo.ClearAllPendingTasks(); err != nil {
 | 
					 | 
				
			||||||
		return fmt.Errorf("清空待执行任务列表失败: %w", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	logger.Info("阶段二:待执行任务和相关日志清理完成。")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// 阶段三:初始刷新
 | 
					 | 
				
			||||||
	logger.Info("阶段三:开始刷新待执行列表...")
 | 
					 | 
				
			||||||
	if err := analysisPlanTaskManager.Refresh(); err != nil {
 | 
					 | 
				
			||||||
		return fmt.Errorf("刷新待执行任务列表失败: %w", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	logger.Info("阶段三:待执行任务列表初始化完成。")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	logger.Info("待执行任务列表初始化完成。")
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 
 | 
				
			|||||||
@@ -7,10 +7,10 @@ import (
 | 
				
			|||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/app/service"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/app/service"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/app/webhook"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/app/webhook"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/domain/collection"
 | 
					 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
 | 
				
			||||||
	domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
 | 
						domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/domain/pig"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/pig"
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
 | 
				
			||||||
@@ -123,10 +123,10 @@ type DomainServices struct {
 | 
				
			|||||||
	PigTradeManager         pig.PigTradeManager
 | 
						PigTradeManager         pig.PigTradeManager
 | 
				
			||||||
	PigSickManager          pig.SickPigManager
 | 
						PigSickManager          pig.SickPigManager
 | 
				
			||||||
	PigBatchDomain          pig.PigBatchService
 | 
						PigBatchDomain          pig.PigBatchService
 | 
				
			||||||
	TimedCollector          collection.Collector
 | 
					 | 
				
			||||||
	GeneralDeviceService    device.Service
 | 
						GeneralDeviceService    device.Service
 | 
				
			||||||
	AnalysisPlanTaskManager *task.AnalysisPlanTaskManager
 | 
						taskFactory             scheduler.TaskFactory
 | 
				
			||||||
	Scheduler               *task.Scheduler
 | 
						AnalysisPlanTaskManager *scheduler.AnalysisPlanTaskManager
 | 
				
			||||||
 | 
						Scheduler               *scheduler.Scheduler
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// initDomainServices 初始化所有的领域服务。
 | 
					// initDomainServices 初始化所有的领域服务。
 | 
				
			||||||
@@ -148,30 +148,26 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs.
 | 
				
			|||||||
	)
 | 
						)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 计划任务管理器
 | 
						// 计划任务管理器
 | 
				
			||||||
	analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(infra.Repos.PlanRepo, infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, logger)
 | 
						analysisPlanTaskManager := scheduler.NewAnalysisPlanTaskManager(infra.Repos.PlanRepo, infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, logger)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 任务工厂
 | 
				
			||||||
 | 
						taskFactory := task.NewTaskFactory(logger, infra.Repos.SensorDataRepo, infra.Repos.DeviceRepo, generalDeviceService)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 任务执行器
 | 
						// 任务执行器
 | 
				
			||||||
	scheduler := task.NewScheduler(
 | 
						planScheduler := scheduler.NewScheduler(
 | 
				
			||||||
		infra.Repos.PendingTaskRepo,
 | 
							infra.Repos.PendingTaskRepo,
 | 
				
			||||||
		infra.Repos.ExecutionLogRepo,
 | 
							infra.Repos.ExecutionLogRepo,
 | 
				
			||||||
		infra.Repos.DeviceRepo,
 | 
							infra.Repos.DeviceRepo,
 | 
				
			||||||
		infra.Repos.SensorDataRepo,
 | 
							infra.Repos.SensorDataRepo,
 | 
				
			||||||
		infra.Repos.PlanRepo,
 | 
							infra.Repos.PlanRepo,
 | 
				
			||||||
		analysisPlanTaskManager,
 | 
							analysisPlanTaskManager,
 | 
				
			||||||
 | 
							taskFactory,
 | 
				
			||||||
		logger,
 | 
							logger,
 | 
				
			||||||
		generalDeviceService,
 | 
							generalDeviceService,
 | 
				
			||||||
		time.Duration(cfg.Task.Interval)*time.Second,
 | 
							time.Duration(cfg.Task.Interval)*time.Second,
 | 
				
			||||||
		cfg.Task.NumWorkers,
 | 
							cfg.Task.NumWorkers,
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 定时采集器
 | 
					 | 
				
			||||||
	timedCollector := collection.NewTimedCollector(
 | 
					 | 
				
			||||||
		infra.Repos.DeviceRepo,
 | 
					 | 
				
			||||||
		generalDeviceService,
 | 
					 | 
				
			||||||
		logger,
 | 
					 | 
				
			||||||
		time.Duration(cfg.Collection.Interval)*time.Second,
 | 
					 | 
				
			||||||
	)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return &DomainServices{
 | 
						return &DomainServices{
 | 
				
			||||||
		PigPenTransferManager:   pigPenTransferManager,
 | 
							PigPenTransferManager:   pigPenTransferManager,
 | 
				
			||||||
		PigTradeManager:         pigTradeManager,
 | 
							PigTradeManager:         pigTradeManager,
 | 
				
			||||||
@@ -179,8 +175,8 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs.
 | 
				
			|||||||
		PigBatchDomain:          pigBatchDomain,
 | 
							PigBatchDomain:          pigBatchDomain,
 | 
				
			||||||
		GeneralDeviceService:    generalDeviceService,
 | 
							GeneralDeviceService:    generalDeviceService,
 | 
				
			||||||
		AnalysisPlanTaskManager: analysisPlanTaskManager,
 | 
							AnalysisPlanTaskManager: analysisPlanTaskManager,
 | 
				
			||||||
		Scheduler:               scheduler,
 | 
							taskFactory:             taskFactory,
 | 
				
			||||||
		TimedCollector:          timedCollector,
 | 
							Scheduler:               planScheduler,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
							
								
								
									
										217
									
								
								internal/core/data_initializer.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										217
									
								
								internal/core/data_initializer.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,217 @@
 | 
				
			|||||||
 | 
					package core
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const (
 | 
				
			||||||
 | 
						// PlanNameTimedFullDataCollection 是定时全量数据采集计划的名称
 | 
				
			||||||
 | 
						PlanNameTimedFullDataCollection = "定时全量数据采集"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// initializeState 在应用启动时准备其初始数据状态。
 | 
				
			||||||
 | 
					// 这包括清理任何因上次异常关闭而留下的悬空任务或请求。
 | 
				
			||||||
 | 
					func (app *Application) initializeState() error {
 | 
				
			||||||
 | 
						// 初始化预定义系统计划 (致命错误)
 | 
				
			||||||
 | 
						if err := app.initializeSystemPlans(); err != nil {
 | 
				
			||||||
 | 
							return fmt.Errorf("初始化预定义系统计划失败: %w", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 清理待采集任务 (非致命错误)
 | 
				
			||||||
 | 
						if err := app.initializePendingCollections(); err != nil {
 | 
				
			||||||
 | 
							app.Logger.Errorw("清理待采集任务时发生非致命错误", "error", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 初始化待执行任务列表 (致命错误)
 | 
				
			||||||
 | 
						if err := app.initializePendingTasks(); err != nil {
 | 
				
			||||||
 | 
							return fmt.Errorf("初始化待执行任务列表失败: %w", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// initializeSystemPlans 确保预定义的系统计划在数据库中存在。
 | 
				
			||||||
 | 
					func (app *Application) initializeSystemPlans() error {
 | 
				
			||||||
 | 
						app.Logger.Info("开始检查并创建预定义的系统计划...")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 动态构建预定义计划列表
 | 
				
			||||||
 | 
						predefinedSystemPlans := app.getPredefinedSystemPlans()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 1. 获取所有已存在的系统计划
 | 
				
			||||||
 | 
						existingPlans, _, err := app.Infra.Repos.PlanRepo.ListPlans(repository.ListPlansOptions{
 | 
				
			||||||
 | 
							PlanType: repository.PlanTypeFilterSystem,
 | 
				
			||||||
 | 
						}, 1, 999) // 使用一个较大的 pageSize 来获取所有系统计划
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return fmt.Errorf("获取现有系统计划失败: %w", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 2. 为了方便查找, 将现有计划名放入一个 map
 | 
				
			||||||
 | 
						existingPlanNames := make(map[string]bool)
 | 
				
			||||||
 | 
						for _, p := range existingPlans {
 | 
				
			||||||
 | 
							existingPlanNames[p.Name] = true
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 3. 遍历预定义的计划列表
 | 
				
			||||||
 | 
						for _, predefinedPlan := range predefinedSystemPlans {
 | 
				
			||||||
 | 
							// 4. 如果计划不存在, 则创建
 | 
				
			||||||
 | 
							if !existingPlanNames[predefinedPlan.Name] {
 | 
				
			||||||
 | 
								app.Logger.Infof("预定义计划 '%s' 不存在,正在创建...", predefinedPlan.Name)
 | 
				
			||||||
 | 
								if err := app.Infra.Repos.PlanRepo.CreatePlan(&predefinedPlan); err != nil {
 | 
				
			||||||
 | 
									// 错误现在是致命的
 | 
				
			||||||
 | 
									return fmt.Errorf("创建预定义计划 '%s' 失败: %w", predefinedPlan.Name, err)
 | 
				
			||||||
 | 
								} else {
 | 
				
			||||||
 | 
									app.Logger.Infof("成功创建预定义计划 '%s'。", predefinedPlan.Name)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						app.Logger.Info("预定义系统计划检查完成。")
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// getPredefinedSystemPlans 返回一个基于当前配置的预定义系统计划列表。
 | 
				
			||||||
 | 
					func (app *Application) getPredefinedSystemPlans() []models.Plan {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 根据配置创建定时全量采集计划
 | 
				
			||||||
 | 
						interval := app.Config.Collection.Interval
 | 
				
			||||||
 | 
						if interval <= 0 {
 | 
				
			||||||
 | 
							interval = 1 // 确保间隔至少为1分钟
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						cronExpression := fmt.Sprintf("*/%d * * * *", interval)
 | 
				
			||||||
 | 
						timedCollectionPlan := models.Plan{
 | 
				
			||||||
 | 
							Name:           PlanNameTimedFullDataCollection,
 | 
				
			||||||
 | 
							Description:    fmt.Sprintf("这是一个系统预定义的计划, 每 %d 秒自动触发一次全量数据采集。", app.Config.Collection.Interval),
 | 
				
			||||||
 | 
							PlanType:       models.PlanTypeSystem,
 | 
				
			||||||
 | 
							ExecutionType:  models.PlanExecutionTypeAutomatic,
 | 
				
			||||||
 | 
							CronExpression: cronExpression,
 | 
				
			||||||
 | 
							Status:         models.PlanStatusEnabled,
 | 
				
			||||||
 | 
							ContentType:    models.PlanContentTypeTasks,
 | 
				
			||||||
 | 
							Tasks: []models.Task{
 | 
				
			||||||
 | 
								{
 | 
				
			||||||
 | 
									Name:           "全量采集",
 | 
				
			||||||
 | 
									Description:    "触发一次全量数据采集",
 | 
				
			||||||
 | 
									ExecutionOrder: 1,
 | 
				
			||||||
 | 
									Type:           models.TaskTypeFullCollection,
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return []models.Plan{timedCollectionPlan}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// initializePendingCollections 在应用启动时处理所有未完成的采集请求。
 | 
				
			||||||
 | 
					// 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。
 | 
				
			||||||
 | 
					// 这保证了系统在每次启动时都处于一个干净、确定的状态。
 | 
				
			||||||
 | 
					func (app *Application) initializePendingCollections() error {
 | 
				
			||||||
 | 
						app.Logger.Info("开始清理所有未完成的采集请求...")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 直接将所有 'pending' 状态的请求更新为 'timed_out'。
 | 
				
			||||||
 | 
						count, err := app.Infra.Repos.PendingCollectionRepo.MarkAllPendingAsTimedOut()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return fmt.Errorf("清理未完成的采集请求失败: %v", err)
 | 
				
			||||||
 | 
						} else if count > 0 {
 | 
				
			||||||
 | 
							app.Logger.Infof("成功将 %d 个未完成的采集请求标记为超时。", count)
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							app.Logger.Info("没有需要清理的采集请求。")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// initializePendingTasks 在应用启动时清理并刷新待执行任务列表。
 | 
				
			||||||
 | 
					func (app *Application) initializePendingTasks() error {
 | 
				
			||||||
 | 
						logger := app.Logger
 | 
				
			||||||
 | 
						planRepo := app.Infra.Repos.PlanRepo
 | 
				
			||||||
 | 
						pendingTaskRepo := app.Infra.Repos.PendingTaskRepo
 | 
				
			||||||
 | 
						executionLogRepo := app.Infra.Repos.ExecutionLogRepo
 | 
				
			||||||
 | 
						analysisPlanTaskManager := app.Domain.AnalysisPlanTaskManager
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						logger.Info("开始初始化待执行任务列表...")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 阶段一:修正因崩溃导致状态不一致的固定次数计划
 | 
				
			||||||
 | 
						logger.Info("阶段一:开始修正因崩溃导致状态不一致的固定次数计划...")
 | 
				
			||||||
 | 
						plansToCorrect, err := planRepo.FindPlansWithPendingTasks()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return fmt.Errorf("查找需要修正的计划失败: %w", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for _, plan := range plansToCorrect {
 | 
				
			||||||
 | 
							logger.Infof("发现需要修正的计划 #%d (名称: %s)。", plan.ID, plan.Name)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// 更新计划的执行计数
 | 
				
			||||||
 | 
							plan.ExecuteCount++
 | 
				
			||||||
 | 
							logger.Infof("计划 #%d 执行计数已从 %d 更新为 %d。", plan.ID, plan.ExecuteCount-1, plan.ExecuteCount)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if plan.ExecutionType == models.PlanExecutionTypeManual ||
 | 
				
			||||||
 | 
								(plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteCount >= plan.ExecuteNum) {
 | 
				
			||||||
 | 
								// 更新计划状态为已停止
 | 
				
			||||||
 | 
								plan.Status = models.PlanStatusStopped
 | 
				
			||||||
 | 
								logger.Infof("计划 #%d 状态已更新为 '执行完毕'。", plan.ID)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							// 保存更新后的计划
 | 
				
			||||||
 | 
							if err := planRepo.UpdatePlan(plan); err != nil {
 | 
				
			||||||
 | 
								logger.Errorf("修正计划 #%d 状态失败: %v", plan.ID, err)
 | 
				
			||||||
 | 
								// 这是一个非阻塞性错误,继续处理其他计划
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						logger.Info("阶段一:固定次数计划修正完成。")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 阶段二:清理所有待执行任务和相关日志
 | 
				
			||||||
 | 
						logger.Info("阶段二:开始清理所有待执行任务和相关日志...")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// --- 新增逻辑:处理因崩溃导致状态不一致的计划主表状态 ---
 | 
				
			||||||
 | 
						// 1. 查找所有未完成的计划执行日志 (状态为 Started 或 Waiting)
 | 
				
			||||||
 | 
						incompletePlanLogs, err := executionLogRepo.FindIncompletePlanExecutionLogs()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return fmt.Errorf("查找未完成的计划执行日志失败: %w", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 2. 收集所有受影响的唯一 PlanID
 | 
				
			||||||
 | 
						affectedPlanIDs := make(map[uint]struct{})
 | 
				
			||||||
 | 
						for _, log := range incompletePlanLogs {
 | 
				
			||||||
 | 
							affectedPlanIDs[log.PlanID] = struct{}{}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 3. 对于每个受影响的 PlanID,重置其 execute_count 并将其状态设置为 Failed
 | 
				
			||||||
 | 
						for planID := range affectedPlanIDs {
 | 
				
			||||||
 | 
							logger.Warnf("检测到计划 #%d 在应用崩溃前处于未完成状态,将重置其计数并标记为失败。", planID)
 | 
				
			||||||
 | 
							// 使用 UpdatePlanStateAfterExecution 来更新主表状态,避免影响关联数据
 | 
				
			||||||
 | 
							if err := planRepo.UpdatePlanStateAfterExecution(planID, 0, models.PlanStatusFailed); err != nil {
 | 
				
			||||||
 | 
								logger.Errorf("重置计划 #%d 计数并标记为失败时出错: %v", planID, err)
 | 
				
			||||||
 | 
								// 这是一个非阻塞性错误,继续处理其他计划
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						logger.Info("阶段二:计划主表状态修正完成。")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 直接调用新的方法来更新计划执行日志状态为失败
 | 
				
			||||||
 | 
						if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(); err != nil {
 | 
				
			||||||
 | 
							logger.Errorf("更新所有未完成计划执行日志状态为失败失败: %v", err)
 | 
				
			||||||
 | 
							// 这是一个非阻塞性错误,继续执行
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 直接调用新的方法来更新任务执行日志状态为取消
 | 
				
			||||||
 | 
						if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(); err != nil {
 | 
				
			||||||
 | 
							logger.Errorf("更新所有未完成任务执行日志状态为取消失败: %v", err)
 | 
				
			||||||
 | 
							// 这是一个非阻塞性错误,继续执行
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 清空待执行列表
 | 
				
			||||||
 | 
						if err := pendingTaskRepo.ClearAllPendingTasks(); err != nil {
 | 
				
			||||||
 | 
							return fmt.Errorf("清空待执行任务列表失败: %w", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						logger.Info("阶段二:待执行任务和相关日志清理完成。")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 阶段三:初始刷新
 | 
				
			||||||
 | 
						logger.Info("阶段三:开始刷新待执行列表...")
 | 
				
			||||||
 | 
						if err := analysisPlanTaskManager.Refresh(); err != nil {
 | 
				
			||||||
 | 
							return fmt.Errorf("刷新待执行任务列表失败: %w", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						logger.Info("阶段三:待执行任务列表初始化完成。")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						logger.Info("待执行任务列表初始化完成。")
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -1,6 +0,0 @@
 | 
				
			|||||||
package collection
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type Collector interface {
 | 
					 | 
				
			||||||
	Start()
 | 
					 | 
				
			||||||
	Stop()
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
@@ -1,89 +0,0 @@
 | 
				
			|||||||
package collection
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
import (
 | 
					 | 
				
			||||||
	"time"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
 | 
					 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
					 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
					 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// TimedCollector 实现了 Collector 接口,用于定时从数据库获取设备信息并下发采集指令
 | 
					 | 
				
			||||||
type TimedCollector struct {
 | 
					 | 
				
			||||||
	deviceRepo    repository.DeviceRepository
 | 
					 | 
				
			||||||
	deviceService device.Service
 | 
					 | 
				
			||||||
	logger        *logs.Logger
 | 
					 | 
				
			||||||
	interval      time.Duration
 | 
					 | 
				
			||||||
	ticker        *time.Ticker
 | 
					 | 
				
			||||||
	done          chan bool
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// NewTimedCollector 创建一个定时采集器实例
 | 
					 | 
				
			||||||
func NewTimedCollector(
 | 
					 | 
				
			||||||
	deviceRepo repository.DeviceRepository,
 | 
					 | 
				
			||||||
	deviceService device.Service,
 | 
					 | 
				
			||||||
	logger *logs.Logger,
 | 
					 | 
				
			||||||
	interval time.Duration,
 | 
					 | 
				
			||||||
) Collector {
 | 
					 | 
				
			||||||
	return &TimedCollector{
 | 
					 | 
				
			||||||
		deviceRepo:    deviceRepo,
 | 
					 | 
				
			||||||
		deviceService: deviceService,
 | 
					 | 
				
			||||||
		logger:        logger,
 | 
					 | 
				
			||||||
		interval:      interval,
 | 
					 | 
				
			||||||
		done:          make(chan bool),
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Start 开始定时采集
 | 
					 | 
				
			||||||
func (c *TimedCollector) Start() {
 | 
					 | 
				
			||||||
	c.logger.Infof("定时采集器启动,采集间隔: %s", c.interval)
 | 
					 | 
				
			||||||
	c.ticker = time.NewTicker(c.interval)
 | 
					 | 
				
			||||||
	go func() {
 | 
					 | 
				
			||||||
		for {
 | 
					 | 
				
			||||||
			select {
 | 
					 | 
				
			||||||
			case <-c.done:
 | 
					 | 
				
			||||||
				return
 | 
					 | 
				
			||||||
			case <-c.ticker.C:
 | 
					 | 
				
			||||||
				c.collect()
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}()
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Stop 停止定时采集
 | 
					 | 
				
			||||||
func (c *TimedCollector) Stop() {
 | 
					 | 
				
			||||||
	c.logger.Info("定时采集器停止")
 | 
					 | 
				
			||||||
	c.ticker.Stop()
 | 
					 | 
				
			||||||
	c.done <- true
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// collect 是核心的采集逻辑
 | 
					 | 
				
			||||||
func (c *TimedCollector) collect() {
 | 
					 | 
				
			||||||
	c.logger.Info("开始新一轮的设备数据采集")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	sensors, err := c.deviceRepo.ListAllSensors()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		c.logger.Errorf("采集周期: 从数据库获取所有传感器失败: %v", err)
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if len(sensors) == 0 {
 | 
					 | 
				
			||||||
		c.logger.Info("采集周期: 未发现任何传感器设备,跳过本次采集")
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	sensorsByController := make(map[uint][]*models.Device)
 | 
					 | 
				
			||||||
	for _, sensor := range sensors {
 | 
					 | 
				
			||||||
		sensorsByController[sensor.AreaControllerID] = append(sensorsByController[sensor.AreaControllerID], sensor)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	for controllerID, controllerSensors := range sensorsByController {
 | 
					 | 
				
			||||||
		c.logger.Infof("采集周期: 准备为区域主控 %d 下的 %d 个传感器下发采集指令", controllerID, len(controllerSensors))
 | 
					 | 
				
			||||||
		if err := c.deviceService.Collect(controllerID, controllerSensors); err != nil {
 | 
					 | 
				
			||||||
			c.logger.Errorf("采集周期: 为区域主控 %d 下发采集指令失败: %v", controllerID, err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	c.logger.Info("本轮设备数据采集完成")
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
@@ -1,4 +1,4 @@
 | 
				
			|||||||
package task
 | 
					package scheduler
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
@@ -1,4 +1,4 @@
 | 
				
			|||||||
package task
 | 
					package scheduler
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"errors"
 | 
						"errors"
 | 
				
			||||||
@@ -83,6 +83,7 @@ type Scheduler struct {
 | 
				
			|||||||
	deviceRepo              repository.DeviceRepository
 | 
						deviceRepo              repository.DeviceRepository
 | 
				
			||||||
	sensorDataRepo          repository.SensorDataRepository
 | 
						sensorDataRepo          repository.SensorDataRepository
 | 
				
			||||||
	planRepo                repository.PlanRepository
 | 
						planRepo                repository.PlanRepository
 | 
				
			||||||
 | 
						taskFactory             TaskFactory
 | 
				
			||||||
	analysisPlanTaskManager *AnalysisPlanTaskManager
 | 
						analysisPlanTaskManager *AnalysisPlanTaskManager
 | 
				
			||||||
	progressTracker         *ProgressTracker
 | 
						progressTracker         *ProgressTracker
 | 
				
			||||||
	deviceService           device.Service
 | 
						deviceService           device.Service
 | 
				
			||||||
@@ -100,6 +101,7 @@ func NewScheduler(
 | 
				
			|||||||
	sensorDataRepo repository.SensorDataRepository,
 | 
						sensorDataRepo repository.SensorDataRepository,
 | 
				
			||||||
	planRepo repository.PlanRepository,
 | 
						planRepo repository.PlanRepository,
 | 
				
			||||||
	analysisPlanTaskManager *AnalysisPlanTaskManager,
 | 
						analysisPlanTaskManager *AnalysisPlanTaskManager,
 | 
				
			||||||
 | 
						taskFactory TaskFactory,
 | 
				
			||||||
	logger *logs.Logger,
 | 
						logger *logs.Logger,
 | 
				
			||||||
	deviceService device.Service,
 | 
						deviceService device.Service,
 | 
				
			||||||
	interval time.Duration,
 | 
						interval time.Duration,
 | 
				
			||||||
@@ -112,6 +114,7 @@ func NewScheduler(
 | 
				
			|||||||
		sensorDataRepo:          sensorDataRepo,
 | 
							sensorDataRepo:          sensorDataRepo,
 | 
				
			||||||
		planRepo:                planRepo,
 | 
							planRepo:                planRepo,
 | 
				
			||||||
		analysisPlanTaskManager: analysisPlanTaskManager,
 | 
							analysisPlanTaskManager: analysisPlanTaskManager,
 | 
				
			||||||
 | 
							taskFactory:             taskFactory,
 | 
				
			||||||
		logger:                  logger,
 | 
							logger:                  logger,
 | 
				
			||||||
		deviceService:           deviceService,
 | 
							deviceService:           deviceService,
 | 
				
			||||||
		pollingInterval:         interval,
 | 
							pollingInterval:         interval,
 | 
				
			||||||
@@ -271,7 +274,7 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		// 执行普通任务
 | 
							// 执行普通任务
 | 
				
			||||||
		task := s.taskFactory(claimedLog)
 | 
							task := s.taskFactory.Production(claimedLog)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if err := task.Execute(); err != nil {
 | 
							if err := task.Execute(); err != nil {
 | 
				
			||||||
			s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
 | 
								s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
 | 
				
			||||||
@@ -283,20 +286,6 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error {
 | 
				
			|||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// taskFactory 会根据任务类型初始化对应任务
 | 
					 | 
				
			||||||
func (s *Scheduler) taskFactory(claimedLog *models.TaskExecutionLog) Task {
 | 
					 | 
				
			||||||
	switch claimedLog.Task.Type {
 | 
					 | 
				
			||||||
	case models.TaskTypeWaiting:
 | 
					 | 
				
			||||||
		return NewDelayTask(s.logger, claimedLog)
 | 
					 | 
				
			||||||
	case models.TaskTypeReleaseFeedWeight:
 | 
					 | 
				
			||||||
		return NewReleaseFeedWeightTask(claimedLog, s.sensorDataRepo, s.deviceRepo, s.deviceService, s.logger)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	default:
 | 
					 | 
				
			||||||
		// TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型
 | 
					 | 
				
			||||||
		panic("不支持的任务类型")
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中
 | 
					// analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中
 | 
				
			||||||
func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
 | 
					func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
 | 
				
			||||||
	// 创建Plan执行记录
 | 
						// 创建Plan执行记录
 | 
				
			||||||
							
								
								
									
										23
									
								
								internal/domain/scheduler/task.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										23
									
								
								internal/domain/scheduler/task.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,23 @@
 | 
				
			|||||||
 | 
					package scheduler
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Task 定义了所有可被调度器执行的任务必须实现的接口。
 | 
				
			||||||
 | 
					type Task interface {
 | 
				
			||||||
 | 
						// Execute 是任务的核心执行逻辑。
 | 
				
			||||||
 | 
						// ctx: 用于控制任务的超时或取消。
 | 
				
			||||||
 | 
						// log: 包含了当前任务执行的完整上下文信息,包括从数据库中加载的任务参数等。
 | 
				
			||||||
 | 
						// 返回的 error 表示任务是否执行成功。调度器会根据返回的 error 是否为 nil 来决定任务状态。
 | 
				
			||||||
 | 
						Execute() error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑。
 | 
				
			||||||
 | 
						// log: 任务执行的上下文。
 | 
				
			||||||
 | 
						// executeErr: 从 Execute 方法返回的原始错误。
 | 
				
			||||||
 | 
						OnFailure(executeErr error)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// TaskFactory 是一个工厂接口,用于根据任务执行日志创建任务实例。
 | 
				
			||||||
 | 
					type TaskFactory interface {
 | 
				
			||||||
 | 
						// Production 根据指定的任务执行日志创建一个任务实例。
 | 
				
			||||||
 | 
						Production(claimedLog *models.TaskExecutionLog) Task
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -4,6 +4,7 @@ import (
 | 
				
			|||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
@@ -19,7 +20,7 @@ type DelayTask struct {
 | 
				
			|||||||
	logger        *logs.Logger
 | 
						logger        *logs.Logger
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func NewDelayTask(logger *logs.Logger, executionTask *models.TaskExecutionLog) Task {
 | 
					func NewDelayTask(logger *logs.Logger, executionTask *models.TaskExecutionLog) scheduler.Task {
 | 
				
			||||||
	return &DelayTask{
 | 
						return &DelayTask{
 | 
				
			||||||
		executionTask: executionTask,
 | 
							executionTask: executionTask,
 | 
				
			||||||
		logger:        logger,
 | 
							logger:        logger,
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										93
									
								
								internal/domain/task/full_collection_task.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										93
									
								
								internal/domain/task/full_collection_task.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,93 @@
 | 
				
			|||||||
 | 
					package task
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// FullCollectionTask 实现了 scheduler.Task 接口,用于执行一次全量的设备数据采集
 | 
				
			||||||
 | 
					type FullCollectionTask struct {
 | 
				
			||||||
 | 
						log           *models.TaskExecutionLog
 | 
				
			||||||
 | 
						deviceRepo    repository.DeviceRepository
 | 
				
			||||||
 | 
						deviceService device.Service
 | 
				
			||||||
 | 
						logger        *logs.Logger
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewFullCollectionTask 创建一个全量采集任务实例
 | 
				
			||||||
 | 
					func NewFullCollectionTask(
 | 
				
			||||||
 | 
						log *models.TaskExecutionLog,
 | 
				
			||||||
 | 
						deviceRepo repository.DeviceRepository,
 | 
				
			||||||
 | 
						deviceService device.Service,
 | 
				
			||||||
 | 
						logger *logs.Logger,
 | 
				
			||||||
 | 
					) *FullCollectionTask {
 | 
				
			||||||
 | 
						return &FullCollectionTask{
 | 
				
			||||||
 | 
							log:           log,
 | 
				
			||||||
 | 
							deviceRepo:    deviceRepo,
 | 
				
			||||||
 | 
							deviceService: deviceService,
 | 
				
			||||||
 | 
							logger:        logger,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Execute 是任务的核心执行逻辑
 | 
				
			||||||
 | 
					func (t *FullCollectionTask) Execute() error {
 | 
				
			||||||
 | 
						t.logger.Infow("开始执行全量采集任务", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						sensors, err := t.deviceRepo.ListAllSensors()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return fmt.Errorf("全量采集任务: 从数据库获取所有传感器失败: %w", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if len(sensors) == 0 {
 | 
				
			||||||
 | 
							t.logger.Infow("全量采集任务: 未发现任何传感器设备,跳过本次采集", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID)
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						sensorsByController := make(map[uint][]*models.Device)
 | 
				
			||||||
 | 
						for _, sensor := range sensors {
 | 
				
			||||||
 | 
							sensorsByController[sensor.AreaControllerID] = append(sensorsByController[sensor.AreaControllerID], sensor)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var firstError error
 | 
				
			||||||
 | 
						for controllerID, controllerSensors := range sensorsByController {
 | 
				
			||||||
 | 
							t.logger.Infow("全量采集任务: 准备为区域主控下的传感器下发采集指令",
 | 
				
			||||||
 | 
								"task_id", t.log.TaskID,
 | 
				
			||||||
 | 
								"task_type", t.log.Task.Type,
 | 
				
			||||||
 | 
								"log_id", t.log.ID,
 | 
				
			||||||
 | 
								"controller_id", controllerID,
 | 
				
			||||||
 | 
								"sensor_count", len(controllerSensors),
 | 
				
			||||||
 | 
							)
 | 
				
			||||||
 | 
							if err := t.deviceService.Collect(controllerID, controllerSensors); err != nil {
 | 
				
			||||||
 | 
								t.logger.Errorw("全量采集任务: 为区域主控下发采集指令失败",
 | 
				
			||||||
 | 
									"task_id", t.log.TaskID,
 | 
				
			||||||
 | 
									"task_type", t.log.Task.Type,
 | 
				
			||||||
 | 
									"log_id", t.log.ID,
 | 
				
			||||||
 | 
									"controller_id", controllerID,
 | 
				
			||||||
 | 
									"error", err,
 | 
				
			||||||
 | 
								)
 | 
				
			||||||
 | 
								if firstError == nil {
 | 
				
			||||||
 | 
									firstError = err // 保存第一个错误
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if firstError != nil {
 | 
				
			||||||
 | 
							return fmt.Errorf("全量采集任务执行期间发生错误: %w", firstError)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						t.logger.Infow("全量采集任务执行完成", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID)
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑
 | 
				
			||||||
 | 
					func (t *FullCollectionTask) OnFailure(executeErr error) {
 | 
				
			||||||
 | 
						t.logger.Errorw("全量采集任务执行失败",
 | 
				
			||||||
 | 
							"task_id", t.log.TaskID,
 | 
				
			||||||
 | 
							"task_type", t.log.Task.Type,
 | 
				
			||||||
 | 
							"log_id", t.log.ID,
 | 
				
			||||||
 | 
							"error", executeErr,
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -6,6 +6,7 @@ import (
 | 
				
			|||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
				
			||||||
@@ -40,12 +41,12 @@ func NewReleaseFeedWeightTask(
 | 
				
			|||||||
	deviceRepo repository.DeviceRepository,
 | 
						deviceRepo repository.DeviceRepository,
 | 
				
			||||||
	deviceService device.Service,
 | 
						deviceService device.Service,
 | 
				
			||||||
	logger *logs.Logger,
 | 
						logger *logs.Logger,
 | 
				
			||||||
) Task {
 | 
					) scheduler.Task {
 | 
				
			||||||
	return &ReleaseFeedWeightTask{
 | 
						return &ReleaseFeedWeightTask{
 | 
				
			||||||
		claimedLog:     claimedLog,
 | 
							claimedLog:     claimedLog,
 | 
				
			||||||
		deviceRepo:     deviceRepo,
 | 
							deviceRepo:     deviceRepo,
 | 
				
			||||||
		sensorDataRepo: sensorDataRepo,
 | 
							sensorDataRepo: sensorDataRepo,
 | 
				
			||||||
		feedPort:       deviceService, // 直接注入
 | 
							feedPort:       deviceService,
 | 
				
			||||||
		logger:         logger,
 | 
							logger:         logger,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,30 +1,45 @@
 | 
				
			|||||||
package task
 | 
					package task
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler"
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Task 定义了所有可被调度器执行的任务必须实现的接口。
 | 
					type taskFactory struct {
 | 
				
			||||||
type Task interface {
 | 
						logger         *logs.Logger
 | 
				
			||||||
	// Execute 是任务的核心执行逻辑。
 | 
						sensorDataRepo repository.SensorDataRepository
 | 
				
			||||||
	// ctx: 用于控制任务的超时或取消。
 | 
						deviceRepo     repository.DeviceRepository
 | 
				
			||||||
	// log: 包含了当前任务执行的完整上下文信息,包括从数据库中加载的任务参数等。
 | 
						deviceService  device.Service
 | 
				
			||||||
	// 返回的 error 表示任务是否执行成功。调度器会根据返回的 error 是否为 nil 来决定任务状态。
 | 
					 | 
				
			||||||
	Execute() error
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑。
 | 
					 | 
				
			||||||
	// log: 任务执行的上下文。
 | 
					 | 
				
			||||||
	// executeErr: 从 Execute 方法返回的原始错误。
 | 
					 | 
				
			||||||
	OnFailure(executeErr error)
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// TaskFactory 是一个任务组装工厂, 可以根据Task类型获取到对应的初始化函数
 | 
					func NewTaskFactory(
 | 
				
			||||||
var TaskFactory = func(tt models.TaskType) Task {
 | 
						logger *logs.Logger,
 | 
				
			||||||
	switch tt {
 | 
						sensorDataRepo repository.SensorDataRepository,
 | 
				
			||||||
	case models.TaskTypeWaiting:
 | 
						deviceRepo repository.DeviceRepository,
 | 
				
			||||||
		return &DelayTask{}
 | 
						deviceService device.Service,
 | 
				
			||||||
	default:
 | 
					) scheduler.TaskFactory {
 | 
				
			||||||
		// 出现位置任务类型说明业务逻辑出现重大问题, 一个异常任务被创建了出来
 | 
						return &taskFactory{
 | 
				
			||||||
		panic("发现未知任务类型")
 | 
							logger:         logger,
 | 
				
			||||||
 | 
							sensorDataRepo: sensorDataRepo,
 | 
				
			||||||
 | 
							deviceRepo:     deviceRepo,
 | 
				
			||||||
 | 
							deviceService:  deviceService,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (t *taskFactory) Production(claimedLog *models.TaskExecutionLog) scheduler.Task {
 | 
				
			||||||
 | 
						switch claimedLog.Task.Type {
 | 
				
			||||||
 | 
						case models.TaskTypeWaiting:
 | 
				
			||||||
 | 
							return NewDelayTask(t.logger, claimedLog)
 | 
				
			||||||
 | 
						case models.TaskTypeReleaseFeedWeight:
 | 
				
			||||||
 | 
							return NewReleaseFeedWeightTask(claimedLog, t.sensorDataRepo, t.deviceRepo, t.deviceService, t.logger)
 | 
				
			||||||
 | 
						case models.TaskTypeFullCollection:
 | 
				
			||||||
 | 
							return NewFullCollectionTask(claimedLog, t.deviceRepo, t.deviceService, t.logger)
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
							// TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型
 | 
				
			||||||
 | 
							t.logger.Panicf("不支持的任务类型: %s", claimedLog.Task.Type)
 | 
				
			||||||
 | 
							panic("不支持的任务类型") // 显式panic防编译器报错
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -200,13 +200,18 @@ type LarkConfig struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// CollectionConfig 代表定时采集配置
 | 
					// CollectionConfig 代表定时采集配置
 | 
				
			||||||
type CollectionConfig struct {
 | 
					type CollectionConfig struct {
 | 
				
			||||||
 | 
						// Interval 采集间隔(分钟), 默认 1
 | 
				
			||||||
	Interval int `yaml:"interval"`
 | 
						Interval int `yaml:"interval"`
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewConfig 创建并返回一个新的配置实例
 | 
					// NewConfig 创建并返回一个新的配置实例
 | 
				
			||||||
func NewConfig() *Config {
 | 
					func NewConfig() *Config {
 | 
				
			||||||
	// 默认值可以在这里设置,但我们优先使用配置文件中的值
 | 
						// 默认值可以在这里设置,但我们优先使用配置文件中的值
 | 
				
			||||||
	return &Config{}
 | 
						return &Config{
 | 
				
			||||||
 | 
							Collection: CollectionConfig{
 | 
				
			||||||
 | 
								Interval: 1, // 默认为1分钟
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Load 从指定路径加载配置文件
 | 
					// Load 从指定路径加载配置文件
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -34,6 +34,7 @@ const (
 | 
				
			|||||||
	TaskPlanAnalysis          TaskType = "计划分析" // 解析Plan的Task列表并添加到待执行队列的特殊任务
 | 
						TaskPlanAnalysis          TaskType = "计划分析" // 解析Plan的Task列表并添加到待执行队列的特殊任务
 | 
				
			||||||
	TaskTypeWaiting           TaskType = "等待"   // 等待任务
 | 
						TaskTypeWaiting           TaskType = "等待"   // 等待任务
 | 
				
			||||||
	TaskTypeReleaseFeedWeight TaskType = "下料"   // 下料口释放指定重量任务
 | 
						TaskTypeReleaseFeedWeight TaskType = "下料"   // 下料口释放指定重量任务
 | 
				
			||||||
 | 
						TaskTypeFullCollection    TaskType = "全量采集" // 新增的全量采集任务
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// -- Task Parameters --
 | 
					// -- Task Parameters --
 | 
				
			||||||
@@ -52,12 +53,20 @@ const (
 | 
				
			|||||||
	PlanStatusFailed   PlanStatus = "执行失败" // 执行失败
 | 
						PlanStatusFailed   PlanStatus = "执行失败" // 执行失败
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type PlanType string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const (
 | 
				
			||||||
 | 
						PlanTypeCustom PlanType = "自定义任务"
 | 
				
			||||||
 | 
						PlanTypeSystem PlanType = "系统任务"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Plan 代表系统中的一个计划,可以包含子计划或任务
 | 
					// Plan 代表系统中的一个计划,可以包含子计划或任务
 | 
				
			||||||
type Plan struct {
 | 
					type Plan struct {
 | 
				
			||||||
	gorm.Model
 | 
						gorm.Model
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	Name          string            `gorm:"not null" json:"name"`
 | 
						Name          string            `gorm:"not null" json:"name"`
 | 
				
			||||||
	Description   string            `json:"description"`
 | 
						Description   string            `json:"description"`
 | 
				
			||||||
 | 
						PlanType      PlanType          `gorm:"not null;index" json:"plan_type"` // 任务类型, 包括系统任务和用户自定义任务
 | 
				
			||||||
	ExecutionType PlanExecutionType `gorm:"not null;index" json:"execution_type"`
 | 
						ExecutionType PlanExecutionType `gorm:"not null;index" json:"execution_type"`
 | 
				
			||||||
	Status        PlanStatus        `gorm:"default:'已禁用';index" json:"status"` // 计划是否被启动
 | 
						Status        PlanStatus        `gorm:"default:'已禁用';index" json:"status"` // 计划是否被启动
 | 
				
			||||||
	ExecuteNum    uint              `gorm:"default:0" json:"execute_num"`      // 计划预期执行次数
 | 
						ExecuteNum    uint              `gorm:"default:0" json:"execute_num"`      // 计划预期执行次数
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -21,11 +21,25 @@ var (
 | 
				
			|||||||
	ErrDeleteWithReferencedPlan = errors.New("禁止删除正在被引用的计划")
 | 
						ErrDeleteWithReferencedPlan = errors.New("禁止删除正在被引用的计划")
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// PlanTypeFilter 定义计划类型的过滤器
 | 
				
			||||||
 | 
					type PlanTypeFilter string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const (
 | 
				
			||||||
 | 
						PlanTypeFilterAll    PlanTypeFilter = "all"
 | 
				
			||||||
 | 
						PlanTypeFilterCustom PlanTypeFilter = "custom"
 | 
				
			||||||
 | 
						PlanTypeFilterSystem PlanTypeFilter = "system"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// ListPlansOptions 定义了查询计划时的可选参数
 | 
				
			||||||
 | 
					type ListPlansOptions struct {
 | 
				
			||||||
 | 
						PlanType PlanTypeFilter
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// PlanRepository 定义了与计划模型相关的数据库操作接口
 | 
					// PlanRepository 定义了与计划模型相关的数据库操作接口
 | 
				
			||||||
// 这是为了让业务逻辑层依赖于抽象,而不是具体的数据库实现
 | 
					// 这是为了让业务逻辑层依赖于抽象,而不是具体的数据库实现
 | 
				
			||||||
type PlanRepository interface {
 | 
					type PlanRepository interface {
 | 
				
			||||||
	// ListBasicPlans 获取所有计划的基本信息,不包含子计划和任务详情
 | 
						// ListPlans 获取计划列表,支持过滤和分页
 | 
				
			||||||
	ListBasicPlans() ([]models.Plan, error)
 | 
						ListPlans(opts ListPlansOptions, page, pageSize int) ([]models.Plan, int64, error)
 | 
				
			||||||
	// GetBasicPlanByID 根据ID获取计划的基本信息,不包含子计划和任务详情
 | 
						// GetBasicPlanByID 根据ID获取计划的基本信息,不包含子计划和任务详情
 | 
				
			||||||
	GetBasicPlanByID(id uint) (*models.Plan, error)
 | 
						GetBasicPlanByID(id uint) (*models.Plan, error)
 | 
				
			||||||
	// GetPlanByID 根据ID获取计划,包含子计划和任务详情
 | 
						// GetPlanByID 根据ID获取计划,包含子计划和任务详情
 | 
				
			||||||
@@ -81,15 +95,37 @@ func NewGormPlanRepository(db *gorm.DB) PlanRepository {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// ListBasicPlans 获取所有计划的基本信息,不包含子计划和任务详情
 | 
					// ListPlans 获取计划列表,支持过滤和分页
 | 
				
			||||||
func (r *gormPlanRepository) ListBasicPlans() ([]models.Plan, error) {
 | 
					func (r *gormPlanRepository) ListPlans(opts ListPlansOptions, page, pageSize int) ([]models.Plan, int64, error) {
 | 
				
			||||||
	var plans []models.Plan
 | 
						if page <= 0 || pageSize <= 0 {
 | 
				
			||||||
	// GORM 默认不会加载关联,除非使用 Preload,所以直接 Find 即可满足要求
 | 
							return nil, 0, ErrInvalidPagination
 | 
				
			||||||
	result := r.db.Find(&plans)
 | 
					 | 
				
			||||||
	if result.Error != nil {
 | 
					 | 
				
			||||||
		return nil, result.Error
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return plans, nil
 | 
					
 | 
				
			||||||
 | 
						var plans []models.Plan
 | 
				
			||||||
 | 
						var total int64
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						query := r.db.Model(&models.Plan{})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						switch opts.PlanType {
 | 
				
			||||||
 | 
						case PlanTypeFilterCustom:
 | 
				
			||||||
 | 
							query = query.Where("plan_type = ?", models.PlanTypeCustom)
 | 
				
			||||||
 | 
						case PlanTypeFilterSystem:
 | 
				
			||||||
 | 
							query = query.Where("plan_type = ?", models.PlanTypeSystem)
 | 
				
			||||||
 | 
						case PlanTypeFilterAll:
 | 
				
			||||||
 | 
							// 不添加 plan_type 的过滤条件
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
							// 默认查询自定义
 | 
				
			||||||
 | 
							query = query.Where("plan_type = ?", models.PlanTypeCustom)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if err := query.Count(&total).Error; err != nil {
 | 
				
			||||||
 | 
							return nil, 0, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						offset := (page - 1) * pageSize
 | 
				
			||||||
 | 
						err := query.Limit(pageSize).Offset(offset).Order("id DESC").Find(&plans).Error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return plans, total, err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// GetBasicPlanByID 根据ID获取计划的基本信息,不包含子计划和任务详情
 | 
					// GetBasicPlanByID 根据ID获取计划的基本信息,不包含子计划和任务详情
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user