Compare commits

...

7 Commits

Author SHA1 Message Date
74e42de7aa 重构AnalysisPlanTaskManager 2025-09-20 22:41:03 +08:00
b0eb135f44 重构AnalysisPlanTaskManager 2025-09-20 22:32:39 +08:00
056279bdc2 重构AnalysisPlanTaskManager 2025-09-20 22:32:00 +08:00
6711f55fba 重构AnalysisPlanTaskManager 2025-09-20 21:45:38 +08:00
e85d4f8ec3 重构AnalysisPlanTaskManager 2025-09-20 21:14:58 +08:00
40a892e09d todo 2025-09-20 18:11:48 +08:00
1f2d54d53e 修复bug 2025-09-20 17:11:04 +08:00
16 changed files with 731 additions and 218 deletions

View File

@@ -1,2 +0,0 @@
replace encoding/json.RawMessage object
replace git_huangwc_com_pig_pig-farm-controller_internal_app_controller_device.DeviceResponse device.DeviceResponse

View File

@@ -12,3 +12,6 @@
4. 暂时不考虑和区域主控间的同步消息, 假设所有消息都是异步的, 这可能导致无法知道指令是否执行成功
5. 如果系统停机时间很长, 待执行任务表中的任务过期了怎么办, 目前没有任务过期机制
6. 可以用TimescaleDB代替PGSQL, 优化传感器数据存储性能
任务调度器执行触发器任务时要修改一下对应计划的执行次数(如果是指定次数的计划)

View File

@@ -560,9 +560,6 @@ const docTemplate = `{
}
},
"definitions": {
"controller.Properties": {
"type": "object"
},
"controller.Response": {
"type": "object",
"properties": {
@@ -596,7 +593,8 @@ const docTemplate = `{
"type": "integer"
},
"properties": {
"$ref": "#/definitions/controller.Properties"
"type": "object",
"additionalProperties": true
},
"sub_type": {
"$ref": "#/definitions/models.DeviceSubType"
@@ -623,7 +621,8 @@ const docTemplate = `{
"type": "integer"
},
"properties": {
"$ref": "#/definitions/controller.Properties"
"type": "object",
"additionalProperties": true
},
"sub_type": {
"$ref": "#/definitions/models.DeviceSubType"
@@ -652,7 +651,8 @@ const docTemplate = `{
"type": "integer"
},
"properties": {
"$ref": "#/definitions/controller.Properties"
"type": "object",
"additionalProperties": true
},
"sub_type": {
"$ref": "#/definitions/models.DeviceSubType"
@@ -952,7 +952,8 @@ const docTemplate = `{
"example": "打开风扇"
},
"parameters": {
"$ref": "#/definitions/controller.Properties"
"type": "object",
"additionalProperties": true
},
"type": {
"allOf": [
@@ -984,7 +985,8 @@ const docTemplate = `{
"example": "打开风扇"
},
"parameters": {
"$ref": "#/definitions/controller.Properties"
"type": "object",
"additionalProperties": true
},
"plan_id": {
"type": "integer",

View File

@@ -549,9 +549,6 @@
}
},
"definitions": {
"controller.Properties": {
"type": "object"
},
"controller.Response": {
"type": "object",
"properties": {
@@ -585,7 +582,8 @@
"type": "integer"
},
"properties": {
"$ref": "#/definitions/controller.Properties"
"type": "object",
"additionalProperties": true
},
"sub_type": {
"$ref": "#/definitions/models.DeviceSubType"
@@ -612,7 +610,8 @@
"type": "integer"
},
"properties": {
"$ref": "#/definitions/controller.Properties"
"type": "object",
"additionalProperties": true
},
"sub_type": {
"$ref": "#/definitions/models.DeviceSubType"
@@ -641,7 +640,8 @@
"type": "integer"
},
"properties": {
"$ref": "#/definitions/controller.Properties"
"type": "object",
"additionalProperties": true
},
"sub_type": {
"$ref": "#/definitions/models.DeviceSubType"
@@ -941,7 +941,8 @@
"example": "打开风扇"
},
"parameters": {
"$ref": "#/definitions/controller.Properties"
"type": "object",
"additionalProperties": true
},
"type": {
"allOf": [
@@ -973,7 +974,8 @@
"example": "打开风扇"
},
"parameters": {
"$ref": "#/definitions/controller.Properties"
"type": "object",
"additionalProperties": true
},
"plan_id": {
"type": "integer",

View File

@@ -1,6 +1,4 @@
definitions:
controller.Properties:
type: object
controller.Response:
properties:
code:
@@ -21,7 +19,8 @@ definitions:
parent_id:
type: integer
properties:
$ref: '#/definitions/controller.Properties'
additionalProperties: true
type: object
sub_type:
$ref: '#/definitions/models.DeviceSubType'
type:
@@ -39,7 +38,8 @@ definitions:
parent_id:
type: integer
properties:
$ref: '#/definitions/controller.Properties'
additionalProperties: true
type: object
sub_type:
$ref: '#/definitions/models.DeviceSubType'
type:
@@ -61,7 +61,8 @@ definitions:
parent_id:
type: integer
properties:
$ref: '#/definitions/controller.Properties'
additionalProperties: true
type: object
sub_type:
$ref: '#/definitions/models.DeviceSubType'
type:
@@ -271,7 +272,8 @@ definitions:
example: 打开风扇
type: string
parameters:
$ref: '#/definitions/controller.Properties'
additionalProperties: true
type: object
type:
allOf:
- $ref: '#/definitions/models.TaskType'
@@ -292,7 +294,8 @@ definitions:
example: 打开风扇
type: string
parameters:
$ref: '#/definitions/controller.Properties'
additionalProperties: true
type: object
plan_id:
example: 1
type: integer

View File

@@ -1,7 +1,9 @@
package device
import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"
@@ -11,7 +13,6 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"github.com/gin-gonic/gin"
"gorm.io/datatypes"
"gorm.io/gorm"
)
@@ -38,7 +39,7 @@ type CreateDeviceRequest struct {
SubType models.DeviceSubType `json:"sub_type,omitempty"`
ParentID *uint `json:"parent_id,omitempty"`
Location string `json:"location,omitempty"`
Properties controller.Properties `json:"properties,omitempty"`
Properties map[string]interface{} `json:"properties,omitempty"`
}
// UpdateDeviceRequest 定义了更新设备时需要传入的参数
@@ -48,7 +49,7 @@ type UpdateDeviceRequest struct {
SubType models.DeviceSubType `json:"sub_type,omitempty"`
ParentID *uint `json:"parent_id,omitempty"`
Location string `json:"location,omitempty"`
Properties controller.Properties `json:"properties,omitempty"`
Properties map[string]interface{} `json:"properties,omitempty"`
}
// --- Response DTOs ---
@@ -61,7 +62,7 @@ type DeviceResponse struct {
SubType models.DeviceSubType `json:"sub_type"`
ParentID *uint `json:"parent_id"`
Location string `json:"location"`
Properties controller.Properties `json:"properties"`
Properties map[string]interface{} `json:"properties"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}
@@ -69,10 +70,18 @@ type DeviceResponse struct {
// --- DTO 转换函数 ---
// newDeviceResponse 从数据库模型创建一个新的设备响应 DTO
func newDeviceResponse(device *models.Device) *DeviceResponse {
func newDeviceResponse(device *models.Device) (*DeviceResponse, error) {
if device == nil {
return nil
return nil, nil
}
var props map[string]interface{}
if len(device.Properties) > 0 && string(device.Properties) != "null" {
if err := json.Unmarshal(device.Properties, &props); err != nil {
return nil, fmt.Errorf("解析设备属性失败 (ID: %d): %w", device.ID, err)
}
}
return &DeviceResponse{
ID: device.ID,
Name: device.Name,
@@ -80,19 +89,23 @@ func newDeviceResponse(device *models.Device) *DeviceResponse {
SubType: device.SubType,
ParentID: device.ParentID,
Location: device.Location,
Properties: controller.Properties(device.Properties),
Properties: props,
CreatedAt: device.CreatedAt.Format(time.RFC3339),
UpdatedAt: device.UpdatedAt.Format(time.RFC3339),
}
}, nil
}
// newListDeviceResponse 从数据库模型切片创建一个新的设备列表响应 DTO 切片
func newListDeviceResponse(devices []*models.Device) []*DeviceResponse {
func newListDeviceResponse(devices []*models.Device) ([]*DeviceResponse, error) {
list := make([]*DeviceResponse, 0, len(devices))
for _, device := range devices {
list = append(list, newDeviceResponse(device))
resp, err := newDeviceResponse(device)
if err != nil {
return nil, err
}
return list
list = append(list, resp)
}
return list, nil
}
// --- Controller Methods ---
@@ -114,13 +127,20 @@ func (c *Controller) CreateDevice(ctx *gin.Context) {
return
}
propertiesJSON, err := json.Marshal(req.Properties)
if err != nil {
c.logger.Errorf("创建设备: 序列化属性失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeBadRequest, "属性字段格式错误")
return
}
device := &models.Device{
Name: req.Name,
Type: req.Type,
SubType: req.SubType,
ParentID: req.ParentID,
Location: req.Location,
Properties: datatypes.JSON(req.Properties),
Properties: propertiesJSON,
}
if err := c.repo.Create(device); err != nil {
@@ -129,7 +149,14 @@ func (c *Controller) CreateDevice(ctx *gin.Context) {
return
}
controller.SendResponse(ctx, controller.CodeCreated, "设备创建成功", newDeviceResponse(device))
resp, err := newDeviceResponse(device)
if err != nil {
c.logger.Errorf("创建设备: 序列化响应失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeInternalError, "设备创建成功,但响应生成失败")
return
}
controller.SendResponse(ctx, controller.CodeCreated, "设备创建成功", resp)
}
// GetDevice godoc
@@ -158,7 +185,14 @@ func (c *Controller) GetDevice(ctx *gin.Context) {
return
}
controller.SendResponse(ctx, controller.CodeSuccess, "获取设备信息成功", newDeviceResponse(device))
resp, err := newDeviceResponse(device)
if err != nil {
c.logger.Errorf("获取设备: 序列化响应失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeInternalError, "获取设备信息失败: 内部数据格式错误")
return
}
controller.SendResponse(ctx, controller.CodeSuccess, "获取设备信息成功", resp)
}
// ListDevices godoc
@@ -176,7 +210,14 @@ func (c *Controller) ListDevices(ctx *gin.Context) {
return
}
controller.SendResponse(ctx, controller.CodeSuccess, "获取设备列表成功", newListDeviceResponse(devices))
resp, err := newListDeviceResponse(devices)
if err != nil {
c.logger.Errorf("获取设备列表: 序列化响应失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeInternalError, "获取设备列表失败: 内部数据格式错误")
return
}
controller.SendResponse(ctx, controller.CodeSuccess, "获取设备列表成功", resp)
}
// UpdateDevice godoc
@@ -216,13 +257,20 @@ func (c *Controller) UpdateDevice(ctx *gin.Context) {
return
}
propertiesJSON, err := json.Marshal(req.Properties)
if err != nil {
c.logger.Errorf("更新设备: 序列化属性失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeBadRequest, "属性字段格式错误")
return
}
// 3. 更新从数据库中查出的现有设备对象的字段
existingDevice.Name = req.Name
existingDevice.Type = req.Type
existingDevice.SubType = req.SubType
existingDevice.ParentID = req.ParentID
existingDevice.Location = req.Location
existingDevice.Properties = datatypes.JSON(req.Properties)
existingDevice.Properties = propertiesJSON
// 4. 将修改后的 existingDevice 对象保存回数据库
if err := c.repo.Update(existingDevice); err != nil {
@@ -231,7 +279,14 @@ func (c *Controller) UpdateDevice(ctx *gin.Context) {
return
}
controller.SendResponse(ctx, controller.CodeSuccess, "设备更新成功", newDeviceResponse(existingDevice))
resp, err := newDeviceResponse(existingDevice)
if err != nil {
c.logger.Errorf("更新设备: 序列化响应失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeInternalError, "设备更新成功,但响应生成失败")
return
}
controller.SendResponse(ctx, controller.CodeSuccess, "设备更新成功", resp)
}
// DeleteDevice godoc

View File

@@ -1,15 +1,16 @@
package plan
import (
"git.huangwc.com/pig/pig-farm-controller/internal/app/controller"
"encoding/json"
"fmt"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"gorm.io/datatypes"
)
// PlanToResponse 将Plan模型转换为PlanResponse
func PlanToResponse(plan *models.Plan) *PlanResponse {
func PlanToResponse(plan *models.Plan) (*PlanResponse, error) {
if plan == nil {
return nil
return nil, nil
}
response := &PlanResponse{
@@ -28,7 +29,11 @@ func PlanToResponse(plan *models.Plan) *PlanResponse {
if plan.ContentType == models.PlanContentTypeSubPlans {
response.SubPlans = make([]SubPlanResponse, len(plan.SubPlans))
for i, subPlan := range plan.SubPlans {
response.SubPlans[i] = SubPlanToResponse(&subPlan)
subPlanResp, err := SubPlanToResponse(&subPlan)
if err != nil {
return nil, err
}
response.SubPlans[i] = subPlanResp
}
}
@@ -36,11 +41,15 @@ func PlanToResponse(plan *models.Plan) *PlanResponse {
if plan.ContentType == models.PlanContentTypeTasks {
response.Tasks = make([]TaskResponse, len(plan.Tasks))
for i, task := range plan.Tasks {
response.Tasks[i] = TaskToResponse(&task)
taskResp, err := TaskToResponse(&task)
if err != nil {
return nil, err
}
response.Tasks[i] = taskResp
}
}
return response
return response, nil
}
// PlanFromCreateRequest 将CreatePlanRequest转换为Plan模型并进行业务规则验证
@@ -73,8 +82,11 @@ func PlanFromCreateRequest(req *CreatePlanRequest) (*models.Plan, error) {
if req.ContentType == models.PlanContentTypeTasks && req.Tasks != nil {
plan.Tasks = make([]models.Task, len(req.Tasks))
for i, taskReq := range req.Tasks {
// 使用来自请求的ExecutionOrder
plan.Tasks[i] = TaskFromRequest(&taskReq)
task, err := TaskFromRequest(&taskReq)
if err != nil {
return nil, err
}
plan.Tasks[i] = task
}
}
@@ -120,8 +132,11 @@ func PlanFromUpdateRequest(req *UpdatePlanRequest) (*models.Plan, error) {
if req.ContentType == models.PlanContentTypeTasks && req.Tasks != nil {
plan.Tasks = make([]models.Task, len(req.Tasks))
for i, taskReq := range req.Tasks {
// 使用来自请求的ExecutionOrder
plan.Tasks[i] = TaskFromRequest(&taskReq)
task, err := TaskFromRequest(&taskReq)
if err != nil {
return nil, err
}
plan.Tasks[i] = task
}
}
@@ -138,9 +153,9 @@ func PlanFromUpdateRequest(req *UpdatePlanRequest) (*models.Plan, error) {
}
// SubPlanToResponse 将SubPlan模型转换为SubPlanResponse
func SubPlanToResponse(subPlan *models.SubPlan) SubPlanResponse {
func SubPlanToResponse(subPlan *models.SubPlan) (SubPlanResponse, error) {
if subPlan == nil {
return SubPlanResponse{}
return SubPlanResponse{}, nil
}
response := SubPlanResponse{
@@ -152,16 +167,27 @@ func SubPlanToResponse(subPlan *models.SubPlan) SubPlanResponse {
// 如果有完整的子计划数据,也进行转换
if subPlan.ChildPlan != nil {
response.ChildPlan = PlanToResponse(subPlan.ChildPlan)
childPlanResp, err := PlanToResponse(subPlan.ChildPlan)
if err != nil {
return SubPlanResponse{}, err
}
response.ChildPlan = childPlanResp
}
return response
return response, nil
}
// TaskToResponse 将Task模型转换为TaskResponse
func TaskToResponse(task *models.Task) TaskResponse {
func TaskToResponse(task *models.Task) (TaskResponse, error) {
if task == nil {
return TaskResponse{}
return TaskResponse{}, nil
}
var params map[string]interface{}
if len(task.Parameters) > 0 && string(task.Parameters) != "null" {
if err := json.Unmarshal(task.Parameters, &params); err != nil {
return TaskResponse{}, fmt.Errorf("parsing task parameters failed (ID: %d): %w", task.ID, err)
}
}
return TaskResponse{
@@ -171,14 +197,19 @@ func TaskToResponse(task *models.Task) TaskResponse {
Description: task.Description,
ExecutionOrder: task.ExecutionOrder,
Type: task.Type,
Parameters: controller.Properties(task.Parameters),
}
Parameters: params,
}, nil
}
// TaskFromRequest 将TaskRequest转换为Task模型
func TaskFromRequest(req *TaskRequest) models.Task {
func TaskFromRequest(req *TaskRequest) (models.Task, error) {
if req == nil {
return models.Task{}
return models.Task{}, nil
}
paramsJSON, err := json.Marshal(req.Parameters)
if err != nil {
return models.Task{}, fmt.Errorf("serializing task parameters failed: %w", err)
}
return models.Task{
@@ -186,6 +217,6 @@ func TaskFromRequest(req *TaskRequest) models.Task {
Description: req.Description,
ExecutionOrder: req.ExecutionOrder,
Type: req.Type,
Parameters: datatypes.JSON(req.Parameters),
}
Parameters: paramsJSON,
}, nil
}

View File

@@ -75,7 +75,7 @@ type TaskRequest struct {
Description string `json:"description" example:"打开1号风扇"`
ExecutionOrder int `json:"execution_order" example:"1"`
Type models.TaskType `json:"type" example:"waiting"`
Parameters controller.Properties `json:"parameters,omitempty"`
Parameters map[string]interface{} `json:"parameters,omitempty"`
}
// TaskResponse 定义任务响应结构体
@@ -86,7 +86,7 @@ type TaskResponse struct {
Description string `json:"description" example:"打开1号风扇"`
ExecutionOrder int `json:"execution_order" example:"1"`
Type models.TaskType `json:"type" example:"waiting"`
Parameters controller.Properties `json:"parameters,omitempty"`
Parameters map[string]interface{} `json:"parameters,omitempty"`
}
// --- Controller 定义 ---
@@ -138,14 +138,19 @@ func (c *Controller) CreatePlan(ctx *gin.Context) {
return
}
// 创建成功后,调用 manager 创建或更新触发器
if err := c.analysisPlanTaskManager.CreateOrUpdateTrigger(ctx, planToCreate.ID); err != nil {
// 创建成功后,调用 manager 确保触发器任务定义存在,但不立即加入待执行队列
if err := c.analysisPlanTaskManager.EnsureAnalysisTaskDefinition(planToCreate.ID); err != nil {
// 这是一个非阻塞性错误,我们只记录日志,因为主流程(创建计划)已经成功
c.logger.Errorf("为新创建的计划 %d 创建触发器失败: %v", planToCreate.ID, err)
c.logger.Errorf("为新创建的计划 %d 确保触发器任务定义失败: %v", planToCreate.ID, err)
}
// 使用已有的转换函数将创建后的模型转换为响应对象
resp := PlanToResponse(planToCreate)
resp, err := PlanToResponse(planToCreate)
if err != nil {
c.logger.Errorf("创建计划: 序列化响应失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeInternalError, "计划创建成功,但响应生成失败")
return
}
// 使用统一的成功响应函数
controller.SendResponse(ctx, controller.CodeCreated, "计划创建成功", resp)
@@ -183,7 +188,12 @@ func (c *Controller) GetPlan(ctx *gin.Context) {
}
// 3. 将模型转换为响应 DTO
resp := PlanToResponse(plan)
resp, err := PlanToResponse(plan)
if err != nil {
c.logger.Errorf("获取计划详情: 序列化响应失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeInternalError, "获取计划详情失败: 内部数据格式错误")
return
}
// 4. 发送成功响应
controller.SendResponse(ctx, controller.CodeSuccess, "获取计划详情成功", resp)
@@ -208,7 +218,13 @@ func (c *Controller) ListPlans(ctx *gin.Context) {
// 2. 将模型转换为响应 DTO
planResponses := make([]PlanResponse, 0, len(plans))
for _, p := range plans {
planResponses = append(planResponses, *PlanToResponse(&p))
resp, err := PlanToResponse(&p)
if err != nil {
c.logger.Errorf("获取计划列表: 序列化响应失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeInternalError, "获取计划列表失败: 内部数据格式错误")
return
}
planResponses = append(planResponses, *resp)
}
// 3. 构造并发送成功响应
@@ -271,10 +287,10 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) {
return
}
// 更新成功后,调用 manager 创建或更新触发器
if err := c.analysisPlanTaskManager.CreateOrUpdateTrigger(ctx, planToUpdate.ID); err != nil {
// 更新成功后,调用 manager 确保触发器任务定义存在
if err := c.analysisPlanTaskManager.EnsureAnalysisTaskDefinition(planToUpdate.ID); err != nil {
// 这是一个非阻塞性错误,我们只记录日志
c.logger.Errorf("为更新后的计划 %d 更新触发器失败: %v", planToUpdate.ID, err)
c.logger.Errorf("为更新后的计划 %d 确保触发器任务定义失败: %v", planToUpdate.ID, err)
}
// 6. 获取更新后的完整计划用于响应
@@ -286,7 +302,12 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) {
}
// 7. 将模型转换为响应 DTO
resp := PlanToResponse(updatedPlan)
resp, err := PlanToResponse(updatedPlan)
if err != nil {
c.logger.Errorf("更新计划: 序列化响应失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeInternalError, "计划更新成功,但响应生成失败")
return
}
// 8. 发送成功响应
controller.SendResponse(ctx, controller.CodeSuccess, "计划更新成功", resp)

View File

@@ -1,7 +1,6 @@
package controller
import (
"encoding/json"
"net/http"
"github.com/gin-gonic/gin"
@@ -46,6 +45,3 @@ func SendResponse(ctx *gin.Context, code int, message string, data interface{})
func SendErrorResponse(ctx *gin.Context, code int, message string) {
SendResponse(ctx, code, message, nil)
}
// Properties 是一个自定义类型,用于在 Swagger 中正确表示 JSON 对象
type Properties json.RawMessage

View File

@@ -1,7 +1,9 @@
package task
import (
"context"
"fmt"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
@@ -9,13 +11,15 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils"
)
// AnalysisPlanTaskManager 封装了创建和更新计划分析任务(即触发器)的逻辑
// 这是一个可被 Scheduler 和其他应用服务(如 PlanService共享的无状态组件
// AnalysisPlanTaskManager 负责管理分析计划的触发器任务
// 它确保数据库中可执行的计划在待执行队列中有对应的触发器,并移除无效的触发器
// 这是一个有状态的组件,包含一个互斥锁以确保并发安全。
type AnalysisPlanTaskManager struct {
planRepo repository.PlanRepository
pendingTaskRepo repository.PendingTaskRepository
executionLogRepo repository.ExecutionLogRepository
logger *logs.Logger
mu sync.Mutex
}
// NewAnalysisPlanTaskManager 是 AnalysisPlanTaskManager 的构造函数。
@@ -33,50 +37,281 @@ func NewAnalysisPlanTaskManager(
}
}
// CreateOrUpdateTrigger 为给定的 planID 创建或更新其关联的下一次触发任务。
// 这个方法是幂等的,可以安全地被多次调用
func (m *AnalysisPlanTaskManager) CreateOrUpdateTrigger(ctx context.Context, planID uint) error {
// 获取计划信息
plan, err := m.planRepo.GetBasicPlanByID(planID)
// Refresh 同步数据库中的计划状态和待执行队列中的触发任务。
// 这是一个编排方法,将复杂的逻辑分解到多个内部方法中
func (m *AnalysisPlanTaskManager) Refresh() error {
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Info("开始同步计划任务管理器...")
// 1. 一次性获取所有需要的数据
runnablePlans, invalidPlanIDs, pendingTasks, err := m.getRefreshData()
if err != nil {
m.logger.Errorf("[严重] 获取计划失败, 错误: %v", err)
return err
return fmt.Errorf("获取刷新数据失败: %w", err)
}
// 获取触发任务
task, err := m.planRepo.FindPlanAnalysisTaskByParamsPlanID(planID)
if err != nil {
m.logger.Errorf("[严重] 获取计划解析任务失败, 错误: %v", err)
return err
// 2. 清理所有与失效计划相关的待执行任务
if err := m.cleanupInvalidTasks(invalidPlanIDs, pendingTasks); err != nil {
// 仅记录错误,清理失败不应阻止新任务的添加
m.logger.Errorf("清理无效任务时出错: %v", err)
}
// 写入执行日志
taskLog := &models.TaskExecutionLog{
TaskID: task.ID,
Status: models.ExecutionStatusWaiting,
}
if err := m.executionLogRepo.CreateTaskExecutionLogsInBatch([]*models.TaskExecutionLog{taskLog}); err != nil {
m.logger.Errorf("[严重] 创建任务执行日志失败, 错误: %v", err)
return err
// 3. 添加或更新触发器
if err := m.addOrUpdateTriggers(runnablePlans, pendingTasks); err != nil {
return fmt.Errorf("添加或更新触发器时出错: %w", err)
}
// 写入待执行队列
next, err := utils.GetNextCronTime(plan.CronExpression)
if err != nil {
m.logger.Errorf("[严重] 执行时间解析失败, 错误: %v", err)
return err
}
pendingTask := &models.PendingTask{
TaskID: task.ID,
ExecuteAt: next,
TaskExecutionLogID: taskLog.ID,
}
err = m.pendingTaskRepo.CreatePendingTasksInBatch([]*models.PendingTask{pendingTask})
if err != nil {
m.logger.Errorf("[严重] 创建待执行任务失败, 错误: %v", err)
return err
}
m.logger.Infof("成功为 Plan %d 创建/更新了下一次的触发任务,执行时间: %v", planID, next)
m.logger.Info("计划任务管理器同步完成.")
return nil
}
// CreateOrUpdateTrigger 为给定的 planID 创建其关联的触发任务。
// 这个方法是幂等的:如果一个有效的触发器已存在,它将不会重复创建。
// 关键修改:如果触发器已存在,会根据计划类型更新其执行时间。
func (m *AnalysisPlanTaskManager) CreateOrUpdateTrigger(planID uint) error {
m.mu.Lock()
defer m.mu.Unlock()
// 检查计划是否可执行
plan, err := m.planRepo.GetBasicPlanByID(planID)
if err != nil {
return fmt.Errorf("获取计划基本信息失败: %w", err)
}
if plan.Status != models.PlanStatusEnabled {
return fmt.Errorf("计划 #%d 当前状态为 '%d',无法创建更新触发器", planID, plan.Status)
}
// 查找现有触发器
existingTrigger, err := m.pendingTaskRepo.FindPendingTriggerByPlanID(planID)
if err != nil {
return fmt.Errorf("查找现有触发器失败: %w", err)
}
// 如果触发器已存在,则根据计划类型更新其执行时间
if existingTrigger != nil {
var expectedExecuteAt time.Time
if plan.ExecutionType == models.PlanExecutionTypeManual {
// 手动计划,如果再次触发,则立即执行
expectedExecuteAt = time.Now()
} else { // 自动计划
// 自动计划,根据 Cron 表达式计算下一次执行时间
next, err := utils.GetNextCronTime(plan.CronExpression)
if err != nil {
m.logger.Errorf("为计划 #%d 解析Cron表达式失败无法更新触发器: %v", plan.ID, err)
return fmt.Errorf("解析 Cron 表达式失败: %w", err)
}
expectedExecuteAt = next
}
// 如果计算出的执行时间与当前待执行任务的时间不一致,则更新
if !existingTrigger.ExecuteAt.Equal(expectedExecuteAt) {
m.logger.Infof("计划 #%d 的执行时间已变更,正在更新触发器 #%d 的执行时间从 %v 到 %v...", plan.ID, existingTrigger.ID, existingTrigger.ExecuteAt, expectedExecuteAt)
if err := m.pendingTaskRepo.UpdatePendingTaskExecuteAt(existingTrigger.ID, expectedExecuteAt); err != nil {
m.logger.Errorf("更新触发器 #%d 的执行时间失败: %v", existingTrigger.ID, err)
return fmt.Errorf("更新触发器执行时间失败: %w", err)
}
} else {
m.logger.Infof("计划 #%d 的触发器已存在且执行时间无需更新。", plan.ID)
}
return nil // 触发器已存在且已处理更新,直接返回
}
// 如果触发器不存在,则创建新的触发器
m.logger.Infof("为计划 #%d 创建新的触发器...", planID)
return m.createTriggerTask(plan)
}
// EnsureAnalysisTaskDefinition 确保计划的分析任务定义存在于 tasks 表中。
// 如果不存在,则会自动创建。此方法不涉及待执行队列。
func (m *AnalysisPlanTaskManager) EnsureAnalysisTaskDefinition(planID uint) error {
m.mu.Lock()
defer m.mu.Unlock()
plan, err := m.planRepo.GetBasicPlanByID(planID)
if err != nil {
return fmt.Errorf("确保分析任务定义失败:获取计划 #%d 基本信息时出错: %w", planID, err)
}
analysisTask, err := m.planRepo.FindPlanAnalysisTaskByPlanID(plan.ID)
if err != nil {
return fmt.Errorf("确保分析任务定义失败:查找计划 #%d 的分析任务时出错: %w", plan.ID, err)
}
if analysisTask == nil {
m.logger.Infof("未找到计划 #%d 关联的 'plan_analysis' 任务定义,将自动创建...", plan.ID)
_, err := m.planRepo.CreatePlanAnalysisTask(plan) // CreatePlanAnalysisTask returns *models.Task, error
if err != nil {
return fmt.Errorf("自动创建 'plan_analysis' 任务定义失败: %w", err)
}
m.logger.Infof("已成功为计划 #%d 创建 'plan_analysis' 任务定义。", plan.ID)
} else {
m.logger.Infof("计划 #%d 的 'plan_analysis' 任务定义已存在。", plan.ID)
}
return nil
}
// --- 内部私有方法 ---
// getRefreshData 从数据库获取刷新所需的所有数据。
func (m *AnalysisPlanTaskManager) getRefreshData() (runnablePlans []*models.Plan, invalidPlanIDs []uint, pendingTasks []models.PendingTask, err error) {
runnablePlans, err = m.planRepo.FindRunnablePlans()
if err != nil {
m.logger.Errorf("获取可执行计划列表失败: %v", err)
return
}
invalidPlans, err := m.planRepo.FindDisabledAndStoppedPlans()
if err != nil {
m.logger.Errorf("获取失效计划列表失败: %v", err)
return
}
invalidPlanIDs = make([]uint, len(invalidPlans))
for i, p := range invalidPlans {
invalidPlanIDs[i] = p.ID
}
pendingTasks, err = m.pendingTaskRepo.FindAllPendingTasks()
if err != nil {
m.logger.Errorf("获取所有待执行任务失败: %v", err)
return
}
return
}
// cleanupInvalidTasks 清理所有与失效计划相关的待执行任务。
func (m *AnalysisPlanTaskManager) cleanupInvalidTasks(invalidPlanIDs []uint, allPendingTasks []models.PendingTask) error {
if len(invalidPlanIDs) == 0 {
return nil // 没有需要清理的计划
}
invalidPlanIDSet := make(map[uint]struct{}, len(invalidPlanIDs))
for _, id := range invalidPlanIDs {
invalidPlanIDSet[id] = struct{}{}
}
var tasksToDeleteIDs []uint
var logsToCancelIDs []uint
for _, pt := range allPendingTasks {
if pt.Task == nil { // 防御性编程,确保 Task 被预加载
continue
}
if _, isInvalid := invalidPlanIDSet[pt.Task.PlanID]; isInvalid {
tasksToDeleteIDs = append(tasksToDeleteIDs, pt.ID)
logsToCancelIDs = append(logsToCancelIDs, pt.TaskExecutionLogID)
}
}
if len(tasksToDeleteIDs) == 0 {
return nil // 没有找到需要清理的任务
}
m.logger.Infof("准备从待执行队列中清理 %d 个与失效计划相关的任务...", len(tasksToDeleteIDs))
// 批量删除待执行任务
if err := m.pendingTaskRepo.DeletePendingTasksByIDs(tasksToDeleteIDs); err != nil {
return fmt.Errorf("批量删除待执行任务失败: %w", err)
}
// 批量更新相关执行日志状态为“已取消”
if err := m.executionLogRepo.UpdateLogStatusByIDs(logsToCancelIDs, models.ExecutionStatusCancelled); err != nil {
// 这是一个非关键性错误,只记录日志
m.logger.Warnf("批量更新日志状态为 'Cancelled' 失败: %v", err)
}
return nil
}
// addOrUpdateTriggers 检查、更新或创建触发器。
func (m *AnalysisPlanTaskManager) addOrUpdateTriggers(runnablePlans []*models.Plan, allPendingTasks []models.PendingTask) error {
// 创建一个映射,存放所有已在队列中的计划触发器
pendingTriggersMap := make(map[uint]models.PendingTask)
for _, pt := range allPendingTasks {
if pt.Task != nil && pt.Task.Type == models.TaskPlanAnalysis {
pendingTriggersMap[pt.Task.PlanID] = pt
}
}
for _, plan := range runnablePlans {
existingTrigger, exists := pendingTriggersMap[plan.ID]
if exists {
// --- 新增逻辑:检查并更新现有触发器 ---
// 只对自动计划检查时间更新
if plan.ExecutionType == models.PlanExecutionTypeAutomatic {
next, err := utils.GetNextCronTime(plan.CronExpression)
if err != nil {
m.logger.Errorf("为计划 #%d 解析Cron表达式失败跳过更新: %v", plan.ID, err)
continue
}
// 如果数据库中记录的执行时间与根据当前Cron表达式计算出的下一次时间不一致则更新
if !existingTrigger.ExecuteAt.Equal(next) {
m.logger.Infof("计划 #%d 的执行时间已变更,正在更新触发器 #%d 的执行时间从 %v 到 %v...", plan.ID, existingTrigger.ID, existingTrigger.ExecuteAt, next)
if err := m.pendingTaskRepo.UpdatePendingTaskExecuteAt(existingTrigger.ID, next); err != nil {
m.logger.Errorf("更新触发器 #%d 的执行时间失败: %v", existingTrigger.ID, err)
}
}
}
} else {
// --- 原有逻辑:为缺失的计划创建新触发器 ---
m.logger.Infof("发现应执行但队列中缺失的计划 #%d正在为其创建触发器...", plan.ID)
if err := m.createTriggerTask(plan); err != nil {
m.logger.Errorf("为计划 #%d 创建触发器失败: %v", plan.ID, err)
// 继续处理下一个,不因单点失败而中断
}
}
}
return nil
}
// createTriggerTask 是创建触发器任务的内部核心逻辑。
func (m *AnalysisPlanTaskManager) createTriggerTask(plan *models.Plan) error {
analysisTask, err := m.planRepo.FindPlanAnalysisTaskByPlanID(plan.ID)
if err != nil {
return fmt.Errorf("查找计划分析任务失败: %w", err)
}
// --- 如果触发器任务定义不存在,则自动创建 ---
if analysisTask == nil {
m.logger.Warnf("未找到计划 #%d 关联的 'plan_analysis' 任务定义,将自动创建...", plan.ID)
newAnalysisTask, err := m.planRepo.CreatePlanAnalysisTask(plan)
if err != nil {
return fmt.Errorf("自动创建 'plan_analysis' 任务定义失败: %w", err)
}
analysisTask = newAnalysisTask
m.logger.Infof("已成功为计划 #%d 创建 'plan_analysis' 任务定义 (ID: %d)", plan.ID, analysisTask.ID)
}
var executeAt time.Time
if plan.ExecutionType == models.PlanExecutionTypeManual {
executeAt = time.Now()
} else {
next, err := utils.GetNextCronTime(plan.CronExpression)
if err != nil {
return fmt.Errorf("解析 Cron 表达式 '%s' 失败: %w", plan.CronExpression, err)
}
executeAt = next
}
taskLog := &models.TaskExecutionLog{
TaskID: analysisTask.ID,
Status: models.ExecutionStatusWaiting,
}
if err := m.executionLogRepo.CreateTaskExecutionLog(taskLog); err != nil {
return fmt.Errorf("创建任务执行日志失败: %w", err)
}
pendingTask := &models.PendingTask{
TaskID: analysisTask.ID,
ExecuteAt: executeAt,
TaskExecutionLogID: taskLog.ID,
}
if err := m.pendingTaskRepo.CreatePendingTask(pendingTask); err != nil {
return fmt.Errorf("创建待执行任务失败: %w", err)
}
m.logger.Infof("成功为计划 #%d 创建触发器 (任务ID: %d),执行时间: %v", plan.ID, analysisTask.ID, executeAt)
return nil
}

View File

@@ -1,7 +1,7 @@
package task
import (
"context"
"encoding/json"
"errors"
"sync"
"time"
@@ -114,8 +114,7 @@ type Scheduler struct {
pool *ants.Pool // 使用 ants 协程池来管理并发
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
stopChan chan struct{} // 用于停止主循环的信号通道
}
// NewScheduler 创建一个新的调度器实例
@@ -128,8 +127,6 @@ func NewScheduler(
logger *logs.Logger,
interval time.Duration,
numWorkers int) *Scheduler {
ctx, cancel := context.WithCancel(context.Background())
return &Scheduler{
pendingTaskRepo: pendingTaskRepo,
executionLogRepo: executionLogRepo,
@@ -140,8 +137,7 @@ func NewScheduler(
workers: numWorkers,
progressTracker: NewProgressTracker(),
taskFactory: taskFactory,
ctx: ctx,
cancel: cancel,
stopChan: make(chan struct{}), // 初始化停止信号通道
}
}
@@ -164,7 +160,7 @@ func (s *Scheduler) Start() {
// Stop 优雅地停止调度器
func (s *Scheduler) Stop() {
s.logger.Warnf("正在停止任务调度器...")
s.cancel() // 1. 发出取消信号,停止主循环
close(s.stopChan) // 1. 发出停止信号,停止主循环
s.wg.Wait() // 2. 等待主循环完成
s.pool.Release() // 3. 释放 ants 池 (等待所有已提交的任务执行完毕)
s.logger.Warnf("任务调度器已安全停止")
@@ -178,9 +174,11 @@ func (s *Scheduler) run() {
for {
select {
case <-s.ctx.Done():
case <-s.stopChan:
// 收到停止信号,退出循环
return
case <-ticker.C:
// 定时触发任务认领和提交
go s.claimAndSubmit()
}
}
@@ -236,7 +234,7 @@ func (s *Scheduler) handleRequeue(planExecutionLogID uint, taskToRequeue *models
s.logger.Warnf("任务 (原始ID: %d) 已成功重新入队,并已释放计划 %d 的锁。", taskToRequeue.ID, planExecutionLogID)
}
// processTask 处理单个任务的逻辑 (当前为占位符)
// processTask 处理单个任务的逻辑
func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) {
s.logger.Warnf("开始处理任务, 日志ID: %d, 任务ID: %d, 任务名称: %s",
claimedLog.ID, claimedLog.TaskID, claimedLog.Task.Name)
@@ -256,10 +254,40 @@ func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) {
// 任务计数器校验, Plan的任务全部执行完成后需要插入一个新的PlanAnalysisTask用于触发下一次Plan的执行
if s.progressTracker.IsPlanOver(claimedLog.PlanExecutionLogID) {
// 调用共享的 Manager 来处理触发器更新逻辑
err = s.analysisPlanTaskManager.CreateOrUpdateTrigger(s.ctx, claimedLog.Task.PlanID)
// --- 新增逻辑:更新计划执行次数并判断是否需要触发下一次执行 ---
planID := claimedLog.Task.PlanID
// 获取计划的最新数据
plan, err := s.planRepo.GetBasicPlanByID(planID)
if err != nil {
s.logger.Errorf("[严重] 创建计划分析任务失败, 当前Plan(%v)将无法进行下次触发, 错误: %v", claimedLog.Task.PlanID, err)
s.logger.Errorf("获取计划 %d 的基本信息失败: %v", planID, err)
return
}
// 更新计划的执行计数器
plan.ExecuteCount++
// 如果是自动计划且达到执行次数上限,则更新计划状态为已停止
if (plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteNum > 0 && plan.ExecuteCount >= plan.ExecuteNum) || plan.ExecutionType == models.PlanExecutionTypeManual {
plan.Status = models.PlanStatusStopeed
s.logger.Infof("计划 %d (自动执行) 已达到最大执行次数 %d状态更新为 '执行完毕'。", planID, plan.ExecuteNum)
}
// 保存更新后的计划状态和执行计数
if err := s.planRepo.UpdatePlan(plan); err != nil { // UpdatePlan 可以更新整个 Plan 对象
s.logger.Errorf("更新计划 %d 的执行计数和状态失败: %v", planID, err)
return
}
// 更新计划执行日志状态为完成
if err := s.executionLogRepo.UpdatePlanExecutionLogStatus(claimedLog.PlanExecutionLogID, models.ExecutionStatusCompleted); err != nil {
s.logger.Errorf("更新计划执行日志 %d 状态为 '完成' 失败: %v", claimedLog.PlanExecutionLogID, err)
// 这是一个非阻塞性错误,不中断后续流程
}
// 调用共享的 Manager 来处理触发器更新逻辑 (Manager 会根据最新的 Plan 状态决定是否创建新触发器)
if err := s.analysisPlanTaskManager.CreateOrUpdateTrigger(planID); err != nil {
s.logger.Errorf("为计划 %d 创建/更新触发器失败: %v", planID, err)
}
}
@@ -301,8 +329,18 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error {
// analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中
func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
// 创建Plan执行记录
// 从任务的 Parameters 中解析出真实的 PlanID
var params struct {
PlanID uint `json:"plan_id"`
}
if err := json.Unmarshal(claimedLog.Task.Parameters, &params); err != nil {
s.logger.Errorf("解析任务参数中的计划ID失败日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
realPlanID := params.PlanID
planLog := &models.PlanExecutionLog{
PlanID: claimedLog.Task.PlanID,
PlanID: realPlanID, // 使用从参数中解析出的真实 PlanID
Status: models.ExecutionStatusStarted,
StartedAt: time.Now(),
}
@@ -312,7 +350,7 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
}
// 解析出Task列表
tasks, err := s.planRepo.FlattenPlanTasks(claimedLog.Task.PlanID)
tasks, err := s.planRepo.FlattenPlanTasks(realPlanID)
if err != nil {
s.logger.Errorf("[严重] 解析计划失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
@@ -320,12 +358,12 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
// 写入执行历史
taskLogs := make([]*models.TaskExecutionLog, len(tasks))
for _, task := range tasks {
taskLogs = append(taskLogs, &models.TaskExecutionLog{
for i, task := range tasks {
taskLogs[i] = &models.TaskExecutionLog{
PlanExecutionLogID: planLog.ID,
TaskID: task.ID,
Status: models.ExecutionStatusWaiting,
})
}
}
err = s.executionLogRepo.CreateTaskExecutionLogsInBatch(taskLogs)
@@ -337,13 +375,13 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
// 写入待执行队列
pendingTasks := make([]*models.PendingTask, len(tasks))
for i, task := range tasks {
pendingTasks = append(pendingTasks, &models.PendingTask{
pendingTasks[i] = &models.PendingTask{
TaskID: task.ID,
TaskExecutionLogID: pendingTasks[i].ID,
TaskExecutionLogID: taskLogs[i].ID, // 使用正确的 TaskExecutionLogID
// 待执行队列是通过任务触发时间排序的, 且只要在调度器获取的时间点之前的都可以被触发
ExecuteAt: time.Now().Add(time.Duration(i) * time.Second),
})
}
}
err = s.pendingTaskRepo.CreatePendingTasksInBatch(pendingTasks)
if err != nil {
@@ -352,7 +390,7 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
}
// 将Task列表加入待执行队列中
s.progressTracker.AddNewPlan(claimedLog.PlanExecutionLogID, len(tasks))
s.progressTracker.AddNewPlan(planLog.ID, len(tasks))
return nil
}

View File

@@ -70,7 +70,7 @@ type Device struct {
gorm.Model
// Name 是设备的业务名称,应清晰可读,例如 "1号猪舍温度传感器" 或 "做料车间主控"
Name string `gorm:"unique;not null" json:"name"`
Name string `gorm:"not null" json:"name"`
// Type 是设备的高级类别,用于区分区域主控和普通设备。建立索引以优化按类型查询。
Type DeviceType `gorm:"not null;index" json:"type"`

View File

@@ -16,11 +16,13 @@ type PendingTask struct {
// TaskID 使用 int 类型以容纳特殊的负数ID代表系统任务
TaskID int `gorm:"index"`
// Task 字段,用于在代码中访问关联的任务详情
// GORM 会根据 TaskID 字段自动填充此关联
Task *Task `gorm:"foreignKey:TaskID"`
ExecuteAt time.Time `gorm:"index"` // 任务执行时间
TaskExecutionLogID uint `gorm:"unique;not null"` // 对应的执行历史记录ID
// 关联关系定义
// 通过 TaskExecutionLogID 关联到唯一的 TaskExecutionLog 记录
// ON DELETE CASCADE 确保如果日志被删除,这个待办任务也会被自动清理
TaskExecutionLog TaskExecutionLog `gorm:"foreignKey:TaskExecutionLogID;references:ID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE;"`

View File

@@ -8,11 +8,16 @@ import (
// ExecutionLogRepository 定义了与执行日志交互的接口。
// 这为服务层提供了一个清晰的契约,并允许在测试中轻松地进行模拟。
type ExecutionLogRepository interface {
UpdateLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error
UpdateLogStatus(logID uint, status models.ExecutionStatus) error
CreateTaskExecutionLog(log *models.TaskExecutionLog) error
CreatePlanExecutionLog(log *models.PlanExecutionLog) error
UpdatePlanExecutionLog(log *models.PlanExecutionLog) error
CreateTaskExecutionLogsInBatch(logs []*models.TaskExecutionLog) error
UpdateTaskExecutionLog(log *models.TaskExecutionLog) error
FindTaskExecutionLogByID(id uint) (*models.TaskExecutionLog, error)
// UpdatePlanExecutionLogStatus 更新计划执行日志的状态
UpdatePlanExecutionLogStatus(logID uint, status models.ExecutionStatus) error
}
// gormExecutionLogRepository 是使用 GORM 的具体实现。
@@ -26,6 +31,23 @@ func NewGormExecutionLogRepository(db *gorm.DB) ExecutionLogRepository {
return &gormExecutionLogRepository{db: db}
}
func (r *gormExecutionLogRepository) UpdateLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error {
if len(logIDs) == 0 {
return nil
}
return r.db.Model(&models.TaskExecutionLog{}).
Where("id IN ?", logIDs).
Update("status", status).Error
}
func (r *gormExecutionLogRepository) UpdateLogStatus(logID uint, status models.ExecutionStatus) error {
return r.db.Model(&models.TaskExecutionLog{}).Where("id = ?", logID).Update("status", status).Error
}
func (r *gormExecutionLogRepository) CreateTaskExecutionLog(log *models.TaskExecutionLog) error {
return r.db.Create(log).Error
}
// CreatePlanExecutionLog 为一次计划执行创建一条新的日志条目。
func (r *gormExecutionLogRepository) CreatePlanExecutionLog(log *models.PlanExecutionLog) error {
return r.db.Create(log).Error
@@ -41,6 +63,9 @@ func (r *gormExecutionLogRepository) UpdatePlanExecutionLog(log *models.PlanExec
// CreateTaskExecutionLogsInBatch 在一次数据库调用中创建多个任务执行日志条目。
// 这是“预写日志”步骤的关键。
func (r *gormExecutionLogRepository) CreateTaskExecutionLogsInBatch(logs []*models.TaskExecutionLog) error {
if len(logs) == 0 {
return nil
}
// GORM 的 Create 传入一个切片指针会执行批量插入。
return r.db.Create(&logs).Error
}
@@ -63,3 +88,8 @@ func (r *gormExecutionLogRepository) FindTaskExecutionLogByID(id uint) (*models.
}
return &log, nil
}
// UpdatePlanExecutionLogStatus 更新计划执行日志的状态
func (r *gormExecutionLogRepository) UpdatePlanExecutionLogStatus(logID uint, status models.ExecutionStatus) error {
return r.db.Model(&models.PlanExecutionLog{}).Where("id = ?", logID).Update("status", status).Error
}

View File

@@ -1,6 +1,8 @@
package repository
import (
"errors"
"fmt"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
@@ -10,7 +12,15 @@ import (
// PendingTaskRepository 定义了与待执行任务队列交互的接口。
type PendingTaskRepository interface {
FindAllPendingTasks() ([]models.PendingTask, error)
FindPendingTriggerByPlanID(planID uint) (*models.PendingTask, error)
DeletePendingTasksByIDs(ids []uint) error
CreatePendingTask(task *models.PendingTask) error
CreatePendingTasksInBatch(tasks []*models.PendingTask) error
// UpdatePendingTaskExecuteAt 更新指定待执行任务的执行时间
UpdatePendingTaskExecuteAt(id uint, executeAt time.Time) error
// ClaimNextAvailableTask 原子地认领下一个可用的任务。
// 它会同时返回被认领任务对应的日志对象,以及被删除的待办任务对象的内存副本。
ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error)
@@ -28,11 +38,50 @@ func NewGormPendingTaskRepository(db *gorm.DB) PendingTaskRepository {
return &gormPendingTaskRepository{db: db}
}
func (r *gormPendingTaskRepository) FindAllPendingTasks() ([]models.PendingTask, error) {
var tasks []models.PendingTask
// 预加载 Task 以便后续访问 Task.PlanID
err := r.db.Preload("Task").Find(&tasks).Error
return tasks, err
}
func (r *gormPendingTaskRepository) FindPendingTriggerByPlanID(planID uint) (*models.PendingTask, error) {
var pendingTask models.PendingTask
// 关键修改:通过 JOIN tasks 表并查询 parameters JSON 字段来查找触发器,而不是依赖 task.plan_id
err := r.db.
Joins("JOIN tasks ON tasks.id = pending_tasks.task_id").
Where("tasks.type = ? AND tasks.parameters->>'plan_id' = ?", models.TaskPlanAnalysis, fmt.Sprintf("%d", planID)).
First(&pendingTask).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil // 未找到不是错误
}
return &pendingTask, err
}
func (r *gormPendingTaskRepository) DeletePendingTasksByIDs(ids []uint) error {
if len(ids) == 0 {
return nil
}
return r.db.Where("id IN ?", ids).Delete(&models.PendingTask{}).Error
}
func (r *gormPendingTaskRepository) CreatePendingTask(task *models.PendingTask) error {
return r.db.Create(task).Error
}
// CreatePendingTasksInBatch 在一次数据库调用中创建多个待执行任务条目。
func (r *gormPendingTaskRepository) CreatePendingTasksInBatch(tasks []*models.PendingTask) error {
if len(tasks) == 0 {
return nil
}
return r.db.Create(&tasks).Error
}
// UpdatePendingTaskExecuteAt 更新指定待执行任务的执行时间
func (r *gormPendingTaskRepository) UpdatePendingTaskExecuteAt(id uint, executeAt time.Time) error {
return r.db.Model(&models.PendingTask{}).Where("id = ?", id).Update("execute_at", executeAt).Error
}
// ClaimNextAvailableTask 以原子方式认领下一个可用的任务。
func (r *gormPendingTaskRepository) ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error) {
var log models.TaskExecutionLog

View File

@@ -42,6 +42,15 @@ type PlanRepository interface {
DeleteTask(id int) error
// FindPlanAnalysisTaskByParamsPlanID 根据Parameters中的ParamsPlanID字段值查找TaskPlanAnalysis类型的Task
FindPlanAnalysisTaskByParamsPlanID(paramsPlanID uint) (*models.Task, error)
// FindRunnablePlans 获取所有应执行的计划
FindRunnablePlans() ([]*models.Plan, error)
// FindDisabledAndStoppedPlans 获取所有已禁用或已停止的计划
FindDisabledAndStoppedPlans() ([]*models.Plan, error)
// FindPlanAnalysisTaskByPlanID 根据 PlanID 找到其关联的 'plan_analysis' 任务
FindPlanAnalysisTaskByPlanID(planID uint) (*models.Task, error)
// CreatePlanAnalysisTask 创建一个 plan_analysis 类型的任务并返回它
CreatePlanAnalysisTask(plan *models.Plan) (*models.Task, error)
}
// gormPlanRepository 是 PlanRepository 的 GORM 实现
@@ -174,7 +183,9 @@ func (r *gormPlanRepository) CreatePlan(plan *models.Plan) error {
}
// 3. 创建触发器Task
if err := r.createPlanAnalysisTask(tx, plan); err != nil {
// 关键修改:调用 createPlanAnalysisTask 并处理其返回的 Task 对象
_, err := r.createPlanAnalysisTask(tx, plan)
if err != nil {
return err
}
return nil
@@ -517,63 +528,100 @@ func (r *gormPlanRepository) deleteTask(tx *gorm.DB, id int) error {
// FindPlanAnalysisTaskByParamsPlanID 根据Parameters中的ParamsPlanID字段值查找TaskPlanAnalysis类型的Task
func (r *gormPlanRepository) FindPlanAnalysisTaskByParamsPlanID(paramsPlanID uint) (*models.Task, error) {
return r.findPlanAnalysisTaskByParamsPlanID(r.db, paramsPlanID)
}
// findPlanAnalysisTaskByParamsPlanID 使用指定db根据Parameters中的ParamsPlanID字段值查找TaskPlanAnalysis类型的Task
func (r *gormPlanRepository) findPlanAnalysisTaskByParamsPlanID(tx *gorm.DB, paramsPlanID uint) (*models.Task, error) {
var task models.Task
// 构造JSON查询条件查找Parameters中包含指定ParamsPlanID且Type为TaskPlanAnalysis的任务
// TODO 在JSON字段中查找特定键值的语法取决于数据库类型这里使用PostgreSQL的语法
// TODO 如果使用的是MySQL则需要相应调整查询条件
result := tx.Where(
"type = ? AND parameters->>'plan_id' = ?",
models.TaskPlanAnalysis,
fmt.Sprintf("%d", paramsPlanID),
).First(&task)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, fmt.Errorf("未找到Parameters.PlanID为%d的TaskPlanAnalysis类型任务", paramsPlanID)
}
return nil, fmt.Errorf("查找任务时出错: %w", result.Error)
}
return &task, nil
return r.findPlanAnalysisTask(r.db, paramsPlanID)
}
// createPlanAnalysisTask 用于创建一个TaskPlanAnalysis类型的Task
func (r *gormPlanRepository) createPlanAnalysisTask(tx *gorm.DB, plan *models.Plan) error {
// 关键修改Task.PlanID 设置为 0实际 PlanID 存储在 Parameters 中,并返回创建的 Task
func (r *gormPlanRepository) createPlanAnalysisTask(tx *gorm.DB, plan *models.Plan) (*models.Task, error) {
m := map[string]interface{}{
models.ParamsPlanID: plan.ID,
}
parameters, err := json.Marshal(m)
if err != nil {
return err
return nil, err
}
task := &models.Task{
PlanID: plan.ID,
Name: fmt.Sprintf("'%v'计划触发器", plan.Name),
Description: fmt.Sprintf("计划名: %v, 计划ID: %v", plan.Name, plan.ID),
ExecutionOrder: 0,
PlanID: 0, // 关键:设置为 0避免被常规 PlanID 查询查到
Name: fmt.Sprintf("'%s'计划触发器", plan.Name),
Description: fmt.Sprintf("计划名: %s, 计划ID: %d", plan.Name, plan.ID),
ExecutionOrder: 0, // 触发器任务的执行顺序通常为0或不关心
Type: models.TaskPlanAnalysis,
Parameters: datatypes.JSON(parameters),
}
return tx.Create(task).Error
if err := tx.Create(task).Error; err != nil {
return nil, err
}
return task, nil
}
// updatePlanAnalysisTask 使用简单粗暴的删除再创建方式实现更新, 以控制AnalysisPlanTask的定义全部在createPlanAnalysisTask方法中
// updatePlanAnalysisTask 使用更安全的方式更新触发器任务
func (r *gormPlanRepository) updatePlanAnalysisTask(tx *gorm.DB, plan *models.Plan) error {
task, err := r.findPlanAnalysisTaskByParamsPlanID(tx, plan.ID)
if err != nil {
task, err := r.findPlanAnalysisTask(tx, plan.ID)
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
return fmt.Errorf("查找现有计划分析任务失败: %w", err)
}
// 如果触发器任务不存在,则创建一个
if task == nil {
_, err := r.createPlanAnalysisTask(tx, plan)
return err
}
err = r.deleteTask(tx, task.ID)
if err != nil {
return err
}
return r.createPlanAnalysisTask(tx, plan)
// 如果存在,则更新它的名称和描述以反映计划的最新信息
task.Name = fmt.Sprintf("'%s'计划触发器", plan.Name)
task.Description = fmt.Sprintf("计划名: %s, 计划ID: %d", plan.Name, plan.ID)
return tx.Save(task).Error
}
func (r *gormPlanRepository) FindRunnablePlans() ([]*models.Plan, error) {
var plans []*models.Plan
err := r.db.
Where("status = ?", models.PlanStatusEnabled).
Where(
r.db.Where("execution_type = ?", models.PlanExecutionTypeManual).
Or("execution_type = ? AND (execute_num = 0 OR execute_count < execute_num)", models.PlanExecutionTypeAutomatic),
).
Find(&plans).Error
return plans, err
}
func (r *gormPlanRepository) FindDisabledAndStoppedPlans() ([]*models.Plan, error) {
var plans []*models.Plan
err := r.db.
Where("status = ? OR status = ?", models.PlanStatusDisabled, models.PlanStatusStopeed).
Find(&plans).Error
return plans, err
}
// findPlanAnalysisTask 是一个内部使用的、更高效的查找方法
// 关键修改:通过查询 parameters JSON 字段来查找
func (r *gormPlanRepository) findPlanAnalysisTask(tx *gorm.DB, planID uint) (*models.Task, error) {
var task models.Task
err := tx.Where("type = ? AND parameters->>'plan_id' = ?", models.TaskPlanAnalysis, fmt.Sprintf("%d", planID)).First(&task).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil // 未找到不是错误返回nil, nil
}
return &task, err
}
// FindPlanAnalysisTaskByPlanID 是暴露给外部的公共方法
// 关键修改:通过查询 parameters JSON 字段来查找
func (r *gormPlanRepository) FindPlanAnalysisTaskByPlanID(planID uint) (*models.Task, error) {
return r.findPlanAnalysisTask(r.db, planID)
}
// CreatePlanAnalysisTask 创建一个 plan_analysis 类型的任务并返回它
// 这个方法是公开的,主要由 TaskManager 在发现触发器任务定义丢失时调用。
func (r *gormPlanRepository) CreatePlanAnalysisTask(plan *models.Plan) (*models.Task, error) {
var createdTask *models.Task
err := r.db.Transaction(func(tx *gorm.DB) error {
var err error
createdTask, err = r.createPlanAnalysisTask(tx, plan)
return err
})
return createdTask, err
}