Compare commits

...

7 Commits

Author SHA1 Message Date
74e42de7aa 重构AnalysisPlanTaskManager 2025-09-20 22:41:03 +08:00
b0eb135f44 重构AnalysisPlanTaskManager 2025-09-20 22:32:39 +08:00
056279bdc2 重构AnalysisPlanTaskManager 2025-09-20 22:32:00 +08:00
6711f55fba 重构AnalysisPlanTaskManager 2025-09-20 21:45:38 +08:00
e85d4f8ec3 重构AnalysisPlanTaskManager 2025-09-20 21:14:58 +08:00
40a892e09d todo 2025-09-20 18:11:48 +08:00
1f2d54d53e 修复bug 2025-09-20 17:11:04 +08:00
16 changed files with 731 additions and 218 deletions

View File

@@ -1,2 +0,0 @@
replace encoding/json.RawMessage object
replace git_huangwc_com_pig_pig-farm-controller_internal_app_controller_device.DeviceResponse device.DeviceResponse

View File

@@ -12,3 +12,6 @@
4. 暂时不考虑和区域主控间的同步消息, 假设所有消息都是异步的, 这可能导致无法知道指令是否执行成功 4. 暂时不考虑和区域主控间的同步消息, 假设所有消息都是异步的, 这可能导致无法知道指令是否执行成功
5. 如果系统停机时间很长, 待执行任务表中的任务过期了怎么办, 目前没有任务过期机制 5. 如果系统停机时间很长, 待执行任务表中的任务过期了怎么办, 目前没有任务过期机制
6. 可以用TimescaleDB代替PGSQL, 优化传感器数据存储性能 6. 可以用TimescaleDB代替PGSQL, 优化传感器数据存储性能
任务调度器执行触发器任务时要修改一下对应计划的执行次数(如果是指定次数的计划)

View File

@@ -560,9 +560,6 @@ const docTemplate = `{
} }
}, },
"definitions": { "definitions": {
"controller.Properties": {
"type": "object"
},
"controller.Response": { "controller.Response": {
"type": "object", "type": "object",
"properties": { "properties": {
@@ -596,7 +593,8 @@ const docTemplate = `{
"type": "integer" "type": "integer"
}, },
"properties": { "properties": {
"$ref": "#/definitions/controller.Properties" "type": "object",
"additionalProperties": true
}, },
"sub_type": { "sub_type": {
"$ref": "#/definitions/models.DeviceSubType" "$ref": "#/definitions/models.DeviceSubType"
@@ -623,7 +621,8 @@ const docTemplate = `{
"type": "integer" "type": "integer"
}, },
"properties": { "properties": {
"$ref": "#/definitions/controller.Properties" "type": "object",
"additionalProperties": true
}, },
"sub_type": { "sub_type": {
"$ref": "#/definitions/models.DeviceSubType" "$ref": "#/definitions/models.DeviceSubType"
@@ -652,7 +651,8 @@ const docTemplate = `{
"type": "integer" "type": "integer"
}, },
"properties": { "properties": {
"$ref": "#/definitions/controller.Properties" "type": "object",
"additionalProperties": true
}, },
"sub_type": { "sub_type": {
"$ref": "#/definitions/models.DeviceSubType" "$ref": "#/definitions/models.DeviceSubType"
@@ -952,7 +952,8 @@ const docTemplate = `{
"example": "打开风扇" "example": "打开风扇"
}, },
"parameters": { "parameters": {
"$ref": "#/definitions/controller.Properties" "type": "object",
"additionalProperties": true
}, },
"type": { "type": {
"allOf": [ "allOf": [
@@ -984,7 +985,8 @@ const docTemplate = `{
"example": "打开风扇" "example": "打开风扇"
}, },
"parameters": { "parameters": {
"$ref": "#/definitions/controller.Properties" "type": "object",
"additionalProperties": true
}, },
"plan_id": { "plan_id": {
"type": "integer", "type": "integer",

View File

@@ -549,9 +549,6 @@
} }
}, },
"definitions": { "definitions": {
"controller.Properties": {
"type": "object"
},
"controller.Response": { "controller.Response": {
"type": "object", "type": "object",
"properties": { "properties": {
@@ -585,7 +582,8 @@
"type": "integer" "type": "integer"
}, },
"properties": { "properties": {
"$ref": "#/definitions/controller.Properties" "type": "object",
"additionalProperties": true
}, },
"sub_type": { "sub_type": {
"$ref": "#/definitions/models.DeviceSubType" "$ref": "#/definitions/models.DeviceSubType"
@@ -612,7 +610,8 @@
"type": "integer" "type": "integer"
}, },
"properties": { "properties": {
"$ref": "#/definitions/controller.Properties" "type": "object",
"additionalProperties": true
}, },
"sub_type": { "sub_type": {
"$ref": "#/definitions/models.DeviceSubType" "$ref": "#/definitions/models.DeviceSubType"
@@ -641,7 +640,8 @@
"type": "integer" "type": "integer"
}, },
"properties": { "properties": {
"$ref": "#/definitions/controller.Properties" "type": "object",
"additionalProperties": true
}, },
"sub_type": { "sub_type": {
"$ref": "#/definitions/models.DeviceSubType" "$ref": "#/definitions/models.DeviceSubType"
@@ -941,7 +941,8 @@
"example": "打开风扇" "example": "打开风扇"
}, },
"parameters": { "parameters": {
"$ref": "#/definitions/controller.Properties" "type": "object",
"additionalProperties": true
}, },
"type": { "type": {
"allOf": [ "allOf": [
@@ -973,7 +974,8 @@
"example": "打开风扇" "example": "打开风扇"
}, },
"parameters": { "parameters": {
"$ref": "#/definitions/controller.Properties" "type": "object",
"additionalProperties": true
}, },
"plan_id": { "plan_id": {
"type": "integer", "type": "integer",

View File

@@ -1,6 +1,4 @@
definitions: definitions:
controller.Properties:
type: object
controller.Response: controller.Response:
properties: properties:
code: code:
@@ -21,7 +19,8 @@ definitions:
parent_id: parent_id:
type: integer type: integer
properties: properties:
$ref: '#/definitions/controller.Properties' additionalProperties: true
type: object
sub_type: sub_type:
$ref: '#/definitions/models.DeviceSubType' $ref: '#/definitions/models.DeviceSubType'
type: type:
@@ -39,7 +38,8 @@ definitions:
parent_id: parent_id:
type: integer type: integer
properties: properties:
$ref: '#/definitions/controller.Properties' additionalProperties: true
type: object
sub_type: sub_type:
$ref: '#/definitions/models.DeviceSubType' $ref: '#/definitions/models.DeviceSubType'
type: type:
@@ -61,7 +61,8 @@ definitions:
parent_id: parent_id:
type: integer type: integer
properties: properties:
$ref: '#/definitions/controller.Properties' additionalProperties: true
type: object
sub_type: sub_type:
$ref: '#/definitions/models.DeviceSubType' $ref: '#/definitions/models.DeviceSubType'
type: type:
@@ -271,7 +272,8 @@ definitions:
example: 打开风扇 example: 打开风扇
type: string type: string
parameters: parameters:
$ref: '#/definitions/controller.Properties' additionalProperties: true
type: object
type: type:
allOf: allOf:
- $ref: '#/definitions/models.TaskType' - $ref: '#/definitions/models.TaskType'
@@ -292,7 +294,8 @@ definitions:
example: 打开风扇 example: 打开风扇
type: string type: string
parameters: parameters:
$ref: '#/definitions/controller.Properties' additionalProperties: true
type: object
plan_id: plan_id:
example: 1 example: 1
type: integer type: integer

View File

@@ -1,7 +1,9 @@
package device package device
import ( import (
"encoding/json"
"errors" "errors"
"fmt"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -11,7 +13,6 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"gorm.io/datatypes"
"gorm.io/gorm" "gorm.io/gorm"
) )
@@ -33,46 +34,54 @@ func NewController(repo repository.DeviceRepository, logger *logs.Logger) *Contr
// CreateDeviceRequest 定义了创建设备时需要传入的参数 // CreateDeviceRequest 定义了创建设备时需要传入的参数
type CreateDeviceRequest struct { type CreateDeviceRequest struct {
Name string `json:"name" binding:"required"` Name string `json:"name" binding:"required"`
Type models.DeviceType `json:"type" binding:"required"` Type models.DeviceType `json:"type" binding:"required"`
SubType models.DeviceSubType `json:"sub_type,omitempty"` SubType models.DeviceSubType `json:"sub_type,omitempty"`
ParentID *uint `json:"parent_id,omitempty"` ParentID *uint `json:"parent_id,omitempty"`
Location string `json:"location,omitempty"` Location string `json:"location,omitempty"`
Properties controller.Properties `json:"properties,omitempty"` Properties map[string]interface{} `json:"properties,omitempty"`
} }
// UpdateDeviceRequest 定义了更新设备时需要传入的参数 // UpdateDeviceRequest 定义了更新设备时需要传入的参数
type UpdateDeviceRequest struct { type UpdateDeviceRequest struct {
Name string `json:"name" binding:"required"` Name string `json:"name" binding:"required"`
Type models.DeviceType `json:"type" binding:"required"` Type models.DeviceType `json:"type" binding:"required"`
SubType models.DeviceSubType `json:"sub_type,omitempty"` SubType models.DeviceSubType `json:"sub_type,omitempty"`
ParentID *uint `json:"parent_id,omitempty"` ParentID *uint `json:"parent_id,omitempty"`
Location string `json:"location,omitempty"` Location string `json:"location,omitempty"`
Properties controller.Properties `json:"properties,omitempty"` Properties map[string]interface{} `json:"properties,omitempty"`
} }
// --- Response DTOs --- // --- Response DTOs ---
// DeviceResponse 定义了返回给客户端的单个设备信息的结构 // DeviceResponse 定义了返回给客户端的单个设备信息的结构
type DeviceResponse struct { type DeviceResponse struct {
ID uint `json:"id"` ID uint `json:"id"`
Name string `json:"name"` Name string `json:"name"`
Type models.DeviceType `json:"type"` Type models.DeviceType `json:"type"`
SubType models.DeviceSubType `json:"sub_type"` SubType models.DeviceSubType `json:"sub_type"`
ParentID *uint `json:"parent_id"` ParentID *uint `json:"parent_id"`
Location string `json:"location"` Location string `json:"location"`
Properties controller.Properties `json:"properties"` Properties map[string]interface{} `json:"properties"`
CreatedAt string `json:"created_at"` CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"` UpdatedAt string `json:"updated_at"`
} }
// --- DTO 转换函数 --- // --- DTO 转换函数 ---
// newDeviceResponse 从数据库模型创建一个新的设备响应 DTO // newDeviceResponse 从数据库模型创建一个新的设备响应 DTO
func newDeviceResponse(device *models.Device) *DeviceResponse { func newDeviceResponse(device *models.Device) (*DeviceResponse, error) {
if device == nil { if device == nil {
return nil return nil, nil
} }
var props map[string]interface{}
if len(device.Properties) > 0 && string(device.Properties) != "null" {
if err := json.Unmarshal(device.Properties, &props); err != nil {
return nil, fmt.Errorf("解析设备属性失败 (ID: %d): %w", device.ID, err)
}
}
return &DeviceResponse{ return &DeviceResponse{
ID: device.ID, ID: device.ID,
Name: device.Name, Name: device.Name,
@@ -80,19 +89,23 @@ func newDeviceResponse(device *models.Device) *DeviceResponse {
SubType: device.SubType, SubType: device.SubType,
ParentID: device.ParentID, ParentID: device.ParentID,
Location: device.Location, Location: device.Location,
Properties: controller.Properties(device.Properties), Properties: props,
CreatedAt: device.CreatedAt.Format(time.RFC3339), CreatedAt: device.CreatedAt.Format(time.RFC3339),
UpdatedAt: device.UpdatedAt.Format(time.RFC3339), UpdatedAt: device.UpdatedAt.Format(time.RFC3339),
} }, nil
} }
// newListDeviceResponse 从数据库模型切片创建一个新的设备列表响应 DTO 切片 // newListDeviceResponse 从数据库模型切片创建一个新的设备列表响应 DTO 切片
func newListDeviceResponse(devices []*models.Device) []*DeviceResponse { func newListDeviceResponse(devices []*models.Device) ([]*DeviceResponse, error) {
list := make([]*DeviceResponse, 0, len(devices)) list := make([]*DeviceResponse, 0, len(devices))
for _, device := range devices { for _, device := range devices {
list = append(list, newDeviceResponse(device)) resp, err := newDeviceResponse(device)
if err != nil {
return nil, err
}
list = append(list, resp)
} }
return list return list, nil
} }
// --- Controller Methods --- // --- Controller Methods ---
@@ -114,13 +127,20 @@ func (c *Controller) CreateDevice(ctx *gin.Context) {
return return
} }
propertiesJSON, err := json.Marshal(req.Properties)
if err != nil {
c.logger.Errorf("创建设备: 序列化属性失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeBadRequest, "属性字段格式错误")
return
}
device := &models.Device{ device := &models.Device{
Name: req.Name, Name: req.Name,
Type: req.Type, Type: req.Type,
SubType: req.SubType, SubType: req.SubType,
ParentID: req.ParentID, ParentID: req.ParentID,
Location: req.Location, Location: req.Location,
Properties: datatypes.JSON(req.Properties), Properties: propertiesJSON,
} }
if err := c.repo.Create(device); err != nil { if err := c.repo.Create(device); err != nil {
@@ -129,7 +149,14 @@ func (c *Controller) CreateDevice(ctx *gin.Context) {
return return
} }
controller.SendResponse(ctx, controller.CodeCreated, "设备创建成功", newDeviceResponse(device)) resp, err := newDeviceResponse(device)
if err != nil {
c.logger.Errorf("创建设备: 序列化响应失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeInternalError, "设备创建成功,但响应生成失败")
return
}
controller.SendResponse(ctx, controller.CodeCreated, "设备创建成功", resp)
} }
// GetDevice godoc // GetDevice godoc
@@ -158,7 +185,14 @@ func (c *Controller) GetDevice(ctx *gin.Context) {
return return
} }
controller.SendResponse(ctx, controller.CodeSuccess, "获取设备信息成功", newDeviceResponse(device)) resp, err := newDeviceResponse(device)
if err != nil {
c.logger.Errorf("获取设备: 序列化响应失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeInternalError, "获取设备信息失败: 内部数据格式错误")
return
}
controller.SendResponse(ctx, controller.CodeSuccess, "获取设备信息成功", resp)
} }
// ListDevices godoc // ListDevices godoc
@@ -176,7 +210,14 @@ func (c *Controller) ListDevices(ctx *gin.Context) {
return return
} }
controller.SendResponse(ctx, controller.CodeSuccess, "获取设备列表成功", newListDeviceResponse(devices)) resp, err := newListDeviceResponse(devices)
if err != nil {
c.logger.Errorf("获取设备列表: 序列化响应失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeInternalError, "获取设备列表失败: 内部数据格式错误")
return
}
controller.SendResponse(ctx, controller.CodeSuccess, "获取设备列表成功", resp)
} }
// UpdateDevice godoc // UpdateDevice godoc
@@ -216,13 +257,20 @@ func (c *Controller) UpdateDevice(ctx *gin.Context) {
return return
} }
propertiesJSON, err := json.Marshal(req.Properties)
if err != nil {
c.logger.Errorf("更新设备: 序列化属性失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeBadRequest, "属性字段格式错误")
return
}
// 3. 更新从数据库中查出的现有设备对象的字段 // 3. 更新从数据库中查出的现有设备对象的字段
existingDevice.Name = req.Name existingDevice.Name = req.Name
existingDevice.Type = req.Type existingDevice.Type = req.Type
existingDevice.SubType = req.SubType existingDevice.SubType = req.SubType
existingDevice.ParentID = req.ParentID existingDevice.ParentID = req.ParentID
existingDevice.Location = req.Location existingDevice.Location = req.Location
existingDevice.Properties = datatypes.JSON(req.Properties) existingDevice.Properties = propertiesJSON
// 4. 将修改后的 existingDevice 对象保存回数据库 // 4. 将修改后的 existingDevice 对象保存回数据库
if err := c.repo.Update(existingDevice); err != nil { if err := c.repo.Update(existingDevice); err != nil {
@@ -231,7 +279,14 @@ func (c *Controller) UpdateDevice(ctx *gin.Context) {
return return
} }
controller.SendResponse(ctx, controller.CodeSuccess, "设备更新成功", newDeviceResponse(existingDevice)) resp, err := newDeviceResponse(existingDevice)
if err != nil {
c.logger.Errorf("更新设备: 序列化响应失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeInternalError, "设备更新成功,但响应生成失败")
return
}
controller.SendResponse(ctx, controller.CodeSuccess, "设备更新成功", resp)
} }
// DeleteDevice godoc // DeleteDevice godoc

View File

@@ -1,15 +1,16 @@
package plan package plan
import ( import (
"git.huangwc.com/pig/pig-farm-controller/internal/app/controller" "encoding/json"
"fmt"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"gorm.io/datatypes"
) )
// PlanToResponse 将Plan模型转换为PlanResponse // PlanToResponse 将Plan模型转换为PlanResponse
func PlanToResponse(plan *models.Plan) *PlanResponse { func PlanToResponse(plan *models.Plan) (*PlanResponse, error) {
if plan == nil { if plan == nil {
return nil return nil, nil
} }
response := &PlanResponse{ response := &PlanResponse{
@@ -28,7 +29,11 @@ func PlanToResponse(plan *models.Plan) *PlanResponse {
if plan.ContentType == models.PlanContentTypeSubPlans { if plan.ContentType == models.PlanContentTypeSubPlans {
response.SubPlans = make([]SubPlanResponse, len(plan.SubPlans)) response.SubPlans = make([]SubPlanResponse, len(plan.SubPlans))
for i, subPlan := range plan.SubPlans { for i, subPlan := range plan.SubPlans {
response.SubPlans[i] = SubPlanToResponse(&subPlan) subPlanResp, err := SubPlanToResponse(&subPlan)
if err != nil {
return nil, err
}
response.SubPlans[i] = subPlanResp
} }
} }
@@ -36,11 +41,15 @@ func PlanToResponse(plan *models.Plan) *PlanResponse {
if plan.ContentType == models.PlanContentTypeTasks { if plan.ContentType == models.PlanContentTypeTasks {
response.Tasks = make([]TaskResponse, len(plan.Tasks)) response.Tasks = make([]TaskResponse, len(plan.Tasks))
for i, task := range plan.Tasks { for i, task := range plan.Tasks {
response.Tasks[i] = TaskToResponse(&task) taskResp, err := TaskToResponse(&task)
if err != nil {
return nil, err
}
response.Tasks[i] = taskResp
} }
} }
return response return response, nil
} }
// PlanFromCreateRequest 将CreatePlanRequest转换为Plan模型并进行业务规则验证 // PlanFromCreateRequest 将CreatePlanRequest转换为Plan模型并进行业务规则验证
@@ -73,8 +82,11 @@ func PlanFromCreateRequest(req *CreatePlanRequest) (*models.Plan, error) {
if req.ContentType == models.PlanContentTypeTasks && req.Tasks != nil { if req.ContentType == models.PlanContentTypeTasks && req.Tasks != nil {
plan.Tasks = make([]models.Task, len(req.Tasks)) plan.Tasks = make([]models.Task, len(req.Tasks))
for i, taskReq := range req.Tasks { for i, taskReq := range req.Tasks {
// 使用来自请求的ExecutionOrder task, err := TaskFromRequest(&taskReq)
plan.Tasks[i] = TaskFromRequest(&taskReq) if err != nil {
return nil, err
}
plan.Tasks[i] = task
} }
} }
@@ -120,8 +132,11 @@ func PlanFromUpdateRequest(req *UpdatePlanRequest) (*models.Plan, error) {
if req.ContentType == models.PlanContentTypeTasks && req.Tasks != nil { if req.ContentType == models.PlanContentTypeTasks && req.Tasks != nil {
plan.Tasks = make([]models.Task, len(req.Tasks)) plan.Tasks = make([]models.Task, len(req.Tasks))
for i, taskReq := range req.Tasks { for i, taskReq := range req.Tasks {
// 使用来自请求的ExecutionOrder task, err := TaskFromRequest(&taskReq)
plan.Tasks[i] = TaskFromRequest(&taskReq) if err != nil {
return nil, err
}
plan.Tasks[i] = task
} }
} }
@@ -138,9 +153,9 @@ func PlanFromUpdateRequest(req *UpdatePlanRequest) (*models.Plan, error) {
} }
// SubPlanToResponse 将SubPlan模型转换为SubPlanResponse // SubPlanToResponse 将SubPlan模型转换为SubPlanResponse
func SubPlanToResponse(subPlan *models.SubPlan) SubPlanResponse { func SubPlanToResponse(subPlan *models.SubPlan) (SubPlanResponse, error) {
if subPlan == nil { if subPlan == nil {
return SubPlanResponse{} return SubPlanResponse{}, nil
} }
response := SubPlanResponse{ response := SubPlanResponse{
@@ -152,16 +167,27 @@ func SubPlanToResponse(subPlan *models.SubPlan) SubPlanResponse {
// 如果有完整的子计划数据,也进行转换 // 如果有完整的子计划数据,也进行转换
if subPlan.ChildPlan != nil { if subPlan.ChildPlan != nil {
response.ChildPlan = PlanToResponse(subPlan.ChildPlan) childPlanResp, err := PlanToResponse(subPlan.ChildPlan)
if err != nil {
return SubPlanResponse{}, err
}
response.ChildPlan = childPlanResp
} }
return response return response, nil
} }
// TaskToResponse 将Task模型转换为TaskResponse // TaskToResponse 将Task模型转换为TaskResponse
func TaskToResponse(task *models.Task) TaskResponse { func TaskToResponse(task *models.Task) (TaskResponse, error) {
if task == nil { if task == nil {
return TaskResponse{} return TaskResponse{}, nil
}
var params map[string]interface{}
if len(task.Parameters) > 0 && string(task.Parameters) != "null" {
if err := json.Unmarshal(task.Parameters, &params); err != nil {
return TaskResponse{}, fmt.Errorf("parsing task parameters failed (ID: %d): %w", task.ID, err)
}
} }
return TaskResponse{ return TaskResponse{
@@ -171,14 +197,19 @@ func TaskToResponse(task *models.Task) TaskResponse {
Description: task.Description, Description: task.Description,
ExecutionOrder: task.ExecutionOrder, ExecutionOrder: task.ExecutionOrder,
Type: task.Type, Type: task.Type,
Parameters: controller.Properties(task.Parameters), Parameters: params,
} }, nil
} }
// TaskFromRequest 将TaskRequest转换为Task模型 // TaskFromRequest 将TaskRequest转换为Task模型
func TaskFromRequest(req *TaskRequest) models.Task { func TaskFromRequest(req *TaskRequest) (models.Task, error) {
if req == nil { if req == nil {
return models.Task{} return models.Task{}, nil
}
paramsJSON, err := json.Marshal(req.Parameters)
if err != nil {
return models.Task{}, fmt.Errorf("serializing task parameters failed: %w", err)
} }
return models.Task{ return models.Task{
@@ -186,6 +217,6 @@ func TaskFromRequest(req *TaskRequest) models.Task {
Description: req.Description, Description: req.Description,
ExecutionOrder: req.ExecutionOrder, ExecutionOrder: req.ExecutionOrder,
Type: req.Type, Type: req.Type,
Parameters: datatypes.JSON(req.Parameters), Parameters: paramsJSON,
} }, nil
} }

View File

@@ -71,22 +71,22 @@ type SubPlanResponse struct {
// TaskRequest 定义任务请求结构体 // TaskRequest 定义任务请求结构体
type TaskRequest struct { type TaskRequest struct {
Name string `json:"name" example:"打开风扇"` Name string `json:"name" example:"打开风扇"`
Description string `json:"description" example:"打开1号风扇"` Description string `json:"description" example:"打开1号风扇"`
ExecutionOrder int `json:"execution_order" example:"1"` ExecutionOrder int `json:"execution_order" example:"1"`
Type models.TaskType `json:"type" example:"waiting"` Type models.TaskType `json:"type" example:"waiting"`
Parameters controller.Properties `json:"parameters,omitempty"` Parameters map[string]interface{} `json:"parameters,omitempty"`
} }
// TaskResponse 定义任务响应结构体 // TaskResponse 定义任务响应结构体
type TaskResponse struct { type TaskResponse struct {
ID int `json:"id" example:"1"` ID int `json:"id" example:"1"`
PlanID uint `json:"plan_id" example:"1"` PlanID uint `json:"plan_id" example:"1"`
Name string `json:"name" example:"打开风扇"` Name string `json:"name" example:"打开风扇"`
Description string `json:"description" example:"打开1号风扇"` Description string `json:"description" example:"打开1号风扇"`
ExecutionOrder int `json:"execution_order" example:"1"` ExecutionOrder int `json:"execution_order" example:"1"`
Type models.TaskType `json:"type" example:"waiting"` Type models.TaskType `json:"type" example:"waiting"`
Parameters controller.Properties `json:"parameters,omitempty"` Parameters map[string]interface{} `json:"parameters,omitempty"`
} }
// --- Controller 定义 --- // --- Controller 定义 ---
@@ -138,14 +138,19 @@ func (c *Controller) CreatePlan(ctx *gin.Context) {
return return
} }
// 创建成功后,调用 manager 创建或更新触发器 // 创建成功后,调用 manager 确保触发器任务定义存在,但不立即加入待执行队列
if err := c.analysisPlanTaskManager.CreateOrUpdateTrigger(ctx, planToCreate.ID); err != nil { if err := c.analysisPlanTaskManager.EnsureAnalysisTaskDefinition(planToCreate.ID); err != nil {
// 这是一个非阻塞性错误,我们只记录日志,因为主流程(创建计划)已经成功 // 这是一个非阻塞性错误,我们只记录日志,因为主流程(创建计划)已经成功
c.logger.Errorf("为新创建的计划 %d 创建触发器失败: %v", planToCreate.ID, err) c.logger.Errorf("为新创建的计划 %d 确保触发器任务定义失败: %v", planToCreate.ID, err)
} }
// 使用已有的转换函数将创建后的模型转换为响应对象 // 使用已有的转换函数将创建后的模型转换为响应对象
resp := PlanToResponse(planToCreate) resp, err := PlanToResponse(planToCreate)
if err != nil {
c.logger.Errorf("创建计划: 序列化响应失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeInternalError, "计划创建成功,但响应生成失败")
return
}
// 使用统一的成功响应函数 // 使用统一的成功响应函数
controller.SendResponse(ctx, controller.CodeCreated, "计划创建成功", resp) controller.SendResponse(ctx, controller.CodeCreated, "计划创建成功", resp)
@@ -183,7 +188,12 @@ func (c *Controller) GetPlan(ctx *gin.Context) {
} }
// 3. 将模型转换为响应 DTO // 3. 将模型转换为响应 DTO
resp := PlanToResponse(plan) resp, err := PlanToResponse(plan)
if err != nil {
c.logger.Errorf("获取计划详情: 序列化响应失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeInternalError, "获取计划详情失败: 内部数据格式错误")
return
}
// 4. 发送成功响应 // 4. 发送成功响应
controller.SendResponse(ctx, controller.CodeSuccess, "获取计划详情成功", resp) controller.SendResponse(ctx, controller.CodeSuccess, "获取计划详情成功", resp)
@@ -208,7 +218,13 @@ func (c *Controller) ListPlans(ctx *gin.Context) {
// 2. 将模型转换为响应 DTO // 2. 将模型转换为响应 DTO
planResponses := make([]PlanResponse, 0, len(plans)) planResponses := make([]PlanResponse, 0, len(plans))
for _, p := range plans { for _, p := range plans {
planResponses = append(planResponses, *PlanToResponse(&p)) resp, err := PlanToResponse(&p)
if err != nil {
c.logger.Errorf("获取计划列表: 序列化响应失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeInternalError, "获取计划列表失败: 内部数据格式错误")
return
}
planResponses = append(planResponses, *resp)
} }
// 3. 构造并发送成功响应 // 3. 构造并发送成功响应
@@ -271,10 +287,10 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) {
return return
} }
// 更新成功后,调用 manager 创建或更新触发器 // 更新成功后,调用 manager 确保触发器任务定义存在
if err := c.analysisPlanTaskManager.CreateOrUpdateTrigger(ctx, planToUpdate.ID); err != nil { if err := c.analysisPlanTaskManager.EnsureAnalysisTaskDefinition(planToUpdate.ID); err != nil {
// 这是一个非阻塞性错误,我们只记录日志 // 这是一个非阻塞性错误,我们只记录日志
c.logger.Errorf("为更新后的计划 %d 更新触发器失败: %v", planToUpdate.ID, err) c.logger.Errorf("为更新后的计划 %d 确保触发器任务定义失败: %v", planToUpdate.ID, err)
} }
// 6. 获取更新后的完整计划用于响应 // 6. 获取更新后的完整计划用于响应
@@ -286,7 +302,12 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) {
} }
// 7. 将模型转换为响应 DTO // 7. 将模型转换为响应 DTO
resp := PlanToResponse(updatedPlan) resp, err := PlanToResponse(updatedPlan)
if err != nil {
c.logger.Errorf("更新计划: 序列化响应失败: %v", err)
controller.SendErrorResponse(ctx, controller.CodeInternalError, "计划更新成功,但响应生成失败")
return
}
// 8. 发送成功响应 // 8. 发送成功响应
controller.SendResponse(ctx, controller.CodeSuccess, "计划更新成功", resp) controller.SendResponse(ctx, controller.CodeSuccess, "计划更新成功", resp)

View File

@@ -1,7 +1,6 @@
package controller package controller
import ( import (
"encoding/json"
"net/http" "net/http"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@@ -46,6 +45,3 @@ func SendResponse(ctx *gin.Context, code int, message string, data interface{})
func SendErrorResponse(ctx *gin.Context, code int, message string) { func SendErrorResponse(ctx *gin.Context, code int, message string) {
SendResponse(ctx, code, message, nil) SendResponse(ctx, code, message, nil)
} }
// Properties 是一个自定义类型,用于在 Swagger 中正确表示 JSON 对象
type Properties json.RawMessage

View File

@@ -1,7 +1,9 @@
package task package task
import ( import (
"context" "fmt"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
@@ -9,13 +11,15 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils" "git.huangwc.com/pig/pig-farm-controller/internal/infra/utils"
) )
// AnalysisPlanTaskManager 封装了创建和更新计划分析任务(即触发器)的逻辑 // AnalysisPlanTaskManager 负责管理分析计划的触发器任务
// 这是一个可被 Scheduler 和其他应用服务(如 PlanService共享的无状态组件 // 它确保数据库中可执行的计划在待执行队列中有对应的触发器,并移除无效的触发器
// 这是一个有状态的组件,包含一个互斥锁以确保并发安全。
type AnalysisPlanTaskManager struct { type AnalysisPlanTaskManager struct {
planRepo repository.PlanRepository planRepo repository.PlanRepository
pendingTaskRepo repository.PendingTaskRepository pendingTaskRepo repository.PendingTaskRepository
executionLogRepo repository.ExecutionLogRepository executionLogRepo repository.ExecutionLogRepository
logger *logs.Logger logger *logs.Logger
mu sync.Mutex
} }
// NewAnalysisPlanTaskManager 是 AnalysisPlanTaskManager 的构造函数。 // NewAnalysisPlanTaskManager 是 AnalysisPlanTaskManager 的构造函数。
@@ -33,50 +37,281 @@ func NewAnalysisPlanTaskManager(
} }
} }
// CreateOrUpdateTrigger 为给定的 planID 创建或更新其关联的下一次触发任务。 // Refresh 同步数据库中的计划状态和待执行队列中的触发任务。
// 这个方法是幂等的,可以安全地被多次调用 // 这是一个编排方法,将复杂的逻辑分解到多个内部方法中
func (m *AnalysisPlanTaskManager) CreateOrUpdateTrigger(ctx context.Context, planID uint) error { func (m *AnalysisPlanTaskManager) Refresh() error {
// 获取计划信息 m.mu.Lock()
plan, err := m.planRepo.GetBasicPlanByID(planID) defer m.mu.Unlock()
m.logger.Info("开始同步计划任务管理器...")
// 1. 一次性获取所有需要的数据
runnablePlans, invalidPlanIDs, pendingTasks, err := m.getRefreshData()
if err != nil { if err != nil {
m.logger.Errorf("[严重] 获取计划失败, 错误: %v", err) return fmt.Errorf("获取刷新数据失败: %w", err)
return err
} }
// 获取触发任务 // 2. 清理所有与失效计划相关的待执行任务
task, err := m.planRepo.FindPlanAnalysisTaskByParamsPlanID(planID) if err := m.cleanupInvalidTasks(invalidPlanIDs, pendingTasks); err != nil {
if err != nil { // 仅记录错误,清理失败不应阻止新任务的添加
m.logger.Errorf("[严重] 获取计划解析任务失败, 错误: %v", err) m.logger.Errorf("清理无效任务时出错: %v", err)
return err
} }
// 写入执行日志 // 3. 添加或更新触发器
taskLog := &models.TaskExecutionLog{ if err := m.addOrUpdateTriggers(runnablePlans, pendingTasks); err != nil {
TaskID: task.ID, return fmt.Errorf("添加或更新触发器时出错: %w", err)
Status: models.ExecutionStatusWaiting,
}
if err := m.executionLogRepo.CreateTaskExecutionLogsInBatch([]*models.TaskExecutionLog{taskLog}); err != nil {
m.logger.Errorf("[严重] 创建任务执行日志失败, 错误: %v", err)
return err
} }
// 写入待执行队列 m.logger.Info("计划任务管理器同步完成.")
next, err := utils.GetNextCronTime(plan.CronExpression) return nil
if err != nil { }
m.logger.Errorf("[严重] 执行时间解析失败, 错误: %v", err)
return err // CreateOrUpdateTrigger 为给定的 planID 创建其关联的触发任务。
} // 这个方法是幂等的:如果一个有效的触发器已存在,它将不会重复创建。
pendingTask := &models.PendingTask{ // 关键修改:如果触发器已存在,会根据计划类型更新其执行时间。
TaskID: task.ID, func (m *AnalysisPlanTaskManager) CreateOrUpdateTrigger(planID uint) error {
ExecuteAt: next, m.mu.Lock()
TaskExecutionLogID: taskLog.ID, defer m.mu.Unlock()
}
err = m.pendingTaskRepo.CreatePendingTasksInBatch([]*models.PendingTask{pendingTask}) // 检查计划是否可执行
if err != nil { plan, err := m.planRepo.GetBasicPlanByID(planID)
m.logger.Errorf("[严重] 创建待执行任务失败, 错误: %v", err) if err != nil {
return err return fmt.Errorf("获取计划基本信息失败: %w", err)
} }
if plan.Status != models.PlanStatusEnabled {
m.logger.Infof("成功为 Plan %d 创建/更新了下一次的触发任务,执行时间: %v", planID, next) return fmt.Errorf("计划 #%d 当前状态为 '%d',无法创建更新触发器", planID, plan.Status)
}
// 查找现有触发器
existingTrigger, err := m.pendingTaskRepo.FindPendingTriggerByPlanID(planID)
if err != nil {
return fmt.Errorf("查找现有触发器失败: %w", err)
}
// 如果触发器已存在,则根据计划类型更新其执行时间
if existingTrigger != nil {
var expectedExecuteAt time.Time
if plan.ExecutionType == models.PlanExecutionTypeManual {
// 手动计划,如果再次触发,则立即执行
expectedExecuteAt = time.Now()
} else { // 自动计划
// 自动计划,根据 Cron 表达式计算下一次执行时间
next, err := utils.GetNextCronTime(plan.CronExpression)
if err != nil {
m.logger.Errorf("为计划 #%d 解析Cron表达式失败无法更新触发器: %v", plan.ID, err)
return fmt.Errorf("解析 Cron 表达式失败: %w", err)
}
expectedExecuteAt = next
}
// 如果计算出的执行时间与当前待执行任务的时间不一致,则更新
if !existingTrigger.ExecuteAt.Equal(expectedExecuteAt) {
m.logger.Infof("计划 #%d 的执行时间已变更,正在更新触发器 #%d 的执行时间从 %v 到 %v...", plan.ID, existingTrigger.ID, existingTrigger.ExecuteAt, expectedExecuteAt)
if err := m.pendingTaskRepo.UpdatePendingTaskExecuteAt(existingTrigger.ID, expectedExecuteAt); err != nil {
m.logger.Errorf("更新触发器 #%d 的执行时间失败: %v", existingTrigger.ID, err)
return fmt.Errorf("更新触发器执行时间失败: %w", err)
}
} else {
m.logger.Infof("计划 #%d 的触发器已存在且执行时间无需更新。", plan.ID)
}
return nil // 触发器已存在且已处理更新,直接返回
}
// 如果触发器不存在,则创建新的触发器
m.logger.Infof("为计划 #%d 创建新的触发器...", planID)
return m.createTriggerTask(plan)
}
// EnsureAnalysisTaskDefinition 确保计划的分析任务定义存在于 tasks 表中。
// 如果不存在,则会自动创建。此方法不涉及待执行队列。
func (m *AnalysisPlanTaskManager) EnsureAnalysisTaskDefinition(planID uint) error {
m.mu.Lock()
defer m.mu.Unlock()
plan, err := m.planRepo.GetBasicPlanByID(planID)
if err != nil {
return fmt.Errorf("确保分析任务定义失败:获取计划 #%d 基本信息时出错: %w", planID, err)
}
analysisTask, err := m.planRepo.FindPlanAnalysisTaskByPlanID(plan.ID)
if err != nil {
return fmt.Errorf("确保分析任务定义失败:查找计划 #%d 的分析任务时出错: %w", plan.ID, err)
}
if analysisTask == nil {
m.logger.Infof("未找到计划 #%d 关联的 'plan_analysis' 任务定义,将自动创建...", plan.ID)
_, err := m.planRepo.CreatePlanAnalysisTask(plan) // CreatePlanAnalysisTask returns *models.Task, error
if err != nil {
return fmt.Errorf("自动创建 'plan_analysis' 任务定义失败: %w", err)
}
m.logger.Infof("已成功为计划 #%d 创建 'plan_analysis' 任务定义。", plan.ID)
} else {
m.logger.Infof("计划 #%d 的 'plan_analysis' 任务定义已存在。", plan.ID)
}
return nil
}
// --- 内部私有方法 ---
// getRefreshData 从数据库获取刷新所需的所有数据。
func (m *AnalysisPlanTaskManager) getRefreshData() (runnablePlans []*models.Plan, invalidPlanIDs []uint, pendingTasks []models.PendingTask, err error) {
runnablePlans, err = m.planRepo.FindRunnablePlans()
if err != nil {
m.logger.Errorf("获取可执行计划列表失败: %v", err)
return
}
invalidPlans, err := m.planRepo.FindDisabledAndStoppedPlans()
if err != nil {
m.logger.Errorf("获取失效计划列表失败: %v", err)
return
}
invalidPlanIDs = make([]uint, len(invalidPlans))
for i, p := range invalidPlans {
invalidPlanIDs[i] = p.ID
}
pendingTasks, err = m.pendingTaskRepo.FindAllPendingTasks()
if err != nil {
m.logger.Errorf("获取所有待执行任务失败: %v", err)
return
}
return
}
// cleanupInvalidTasks 清理所有与失效计划相关的待执行任务。
func (m *AnalysisPlanTaskManager) cleanupInvalidTasks(invalidPlanIDs []uint, allPendingTasks []models.PendingTask) error {
if len(invalidPlanIDs) == 0 {
return nil // 没有需要清理的计划
}
invalidPlanIDSet := make(map[uint]struct{}, len(invalidPlanIDs))
for _, id := range invalidPlanIDs {
invalidPlanIDSet[id] = struct{}{}
}
var tasksToDeleteIDs []uint
var logsToCancelIDs []uint
for _, pt := range allPendingTasks {
if pt.Task == nil { // 防御性编程,确保 Task 被预加载
continue
}
if _, isInvalid := invalidPlanIDSet[pt.Task.PlanID]; isInvalid {
tasksToDeleteIDs = append(tasksToDeleteIDs, pt.ID)
logsToCancelIDs = append(logsToCancelIDs, pt.TaskExecutionLogID)
}
}
if len(tasksToDeleteIDs) == 0 {
return nil // 没有找到需要清理的任务
}
m.logger.Infof("准备从待执行队列中清理 %d 个与失效计划相关的任务...", len(tasksToDeleteIDs))
// 批量删除待执行任务
if err := m.pendingTaskRepo.DeletePendingTasksByIDs(tasksToDeleteIDs); err != nil {
return fmt.Errorf("批量删除待执行任务失败: %w", err)
}
// 批量更新相关执行日志状态为“已取消”
if err := m.executionLogRepo.UpdateLogStatusByIDs(logsToCancelIDs, models.ExecutionStatusCancelled); err != nil {
// 这是一个非关键性错误,只记录日志
m.logger.Warnf("批量更新日志状态为 'Cancelled' 失败: %v", err)
}
return nil
}
// addOrUpdateTriggers 检查、更新或创建触发器。
func (m *AnalysisPlanTaskManager) addOrUpdateTriggers(runnablePlans []*models.Plan, allPendingTasks []models.PendingTask) error {
// 创建一个映射,存放所有已在队列中的计划触发器
pendingTriggersMap := make(map[uint]models.PendingTask)
for _, pt := range allPendingTasks {
if pt.Task != nil && pt.Task.Type == models.TaskPlanAnalysis {
pendingTriggersMap[pt.Task.PlanID] = pt
}
}
for _, plan := range runnablePlans {
existingTrigger, exists := pendingTriggersMap[plan.ID]
if exists {
// --- 新增逻辑:检查并更新现有触发器 ---
// 只对自动计划检查时间更新
if plan.ExecutionType == models.PlanExecutionTypeAutomatic {
next, err := utils.GetNextCronTime(plan.CronExpression)
if err != nil {
m.logger.Errorf("为计划 #%d 解析Cron表达式失败跳过更新: %v", plan.ID, err)
continue
}
// 如果数据库中记录的执行时间与根据当前Cron表达式计算出的下一次时间不一致则更新
if !existingTrigger.ExecuteAt.Equal(next) {
m.logger.Infof("计划 #%d 的执行时间已变更,正在更新触发器 #%d 的执行时间从 %v 到 %v...", plan.ID, existingTrigger.ID, existingTrigger.ExecuteAt, next)
if err := m.pendingTaskRepo.UpdatePendingTaskExecuteAt(existingTrigger.ID, next); err != nil {
m.logger.Errorf("更新触发器 #%d 的执行时间失败: %v", existingTrigger.ID, err)
}
}
}
} else {
// --- 原有逻辑:为缺失的计划创建新触发器 ---
m.logger.Infof("发现应执行但队列中缺失的计划 #%d正在为其创建触发器...", plan.ID)
if err := m.createTriggerTask(plan); err != nil {
m.logger.Errorf("为计划 #%d 创建触发器失败: %v", plan.ID, err)
// 继续处理下一个,不因单点失败而中断
}
}
}
return nil
}
// createTriggerTask 是创建触发器任务的内部核心逻辑。
func (m *AnalysisPlanTaskManager) createTriggerTask(plan *models.Plan) error {
analysisTask, err := m.planRepo.FindPlanAnalysisTaskByPlanID(plan.ID)
if err != nil {
return fmt.Errorf("查找计划分析任务失败: %w", err)
}
// --- 如果触发器任务定义不存在,则自动创建 ---
if analysisTask == nil {
m.logger.Warnf("未找到计划 #%d 关联的 'plan_analysis' 任务定义,将自动创建...", plan.ID)
newAnalysisTask, err := m.planRepo.CreatePlanAnalysisTask(plan)
if err != nil {
return fmt.Errorf("自动创建 'plan_analysis' 任务定义失败: %w", err)
}
analysisTask = newAnalysisTask
m.logger.Infof("已成功为计划 #%d 创建 'plan_analysis' 任务定义 (ID: %d)", plan.ID, analysisTask.ID)
}
var executeAt time.Time
if plan.ExecutionType == models.PlanExecutionTypeManual {
executeAt = time.Now()
} else {
next, err := utils.GetNextCronTime(plan.CronExpression)
if err != nil {
return fmt.Errorf("解析 Cron 表达式 '%s' 失败: %w", plan.CronExpression, err)
}
executeAt = next
}
taskLog := &models.TaskExecutionLog{
TaskID: analysisTask.ID,
Status: models.ExecutionStatusWaiting,
}
if err := m.executionLogRepo.CreateTaskExecutionLog(taskLog); err != nil {
return fmt.Errorf("创建任务执行日志失败: %w", err)
}
pendingTask := &models.PendingTask{
TaskID: analysisTask.ID,
ExecuteAt: executeAt,
TaskExecutionLogID: taskLog.ID,
}
if err := m.pendingTaskRepo.CreatePendingTask(pendingTask); err != nil {
return fmt.Errorf("创建待执行任务失败: %w", err)
}
m.logger.Infof("成功为计划 #%d 创建触发器 (任务ID: %d),执行时间: %v", plan.ID, analysisTask.ID, executeAt)
return nil return nil
} }

View File

@@ -1,7 +1,7 @@
package task package task
import ( import (
"context" "encoding/json"
"errors" "errors"
"sync" "sync"
"time" "time"
@@ -112,10 +112,9 @@ type Scheduler struct {
progressTracker *ProgressTracker progressTracker *ProgressTracker
taskFactory func(taskType models.TaskType) Task // 调度器需要注入一个任务工厂,用于创建任务实例 taskFactory func(taskType models.TaskType) Task // 调度器需要注入一个任务工厂,用于创建任务实例
pool *ants.Pool // 使用 ants 协程池来管理并发 pool *ants.Pool // 使用 ants 协程池来管理并发
wg sync.WaitGroup wg sync.WaitGroup
ctx context.Context stopChan chan struct{} // 用于停止主循环的信号通道
cancel context.CancelFunc
} }
// NewScheduler 创建一个新的调度器实例 // NewScheduler 创建一个新的调度器实例
@@ -128,8 +127,6 @@ func NewScheduler(
logger *logs.Logger, logger *logs.Logger,
interval time.Duration, interval time.Duration,
numWorkers int) *Scheduler { numWorkers int) *Scheduler {
ctx, cancel := context.WithCancel(context.Background())
return &Scheduler{ return &Scheduler{
pendingTaskRepo: pendingTaskRepo, pendingTaskRepo: pendingTaskRepo,
executionLogRepo: executionLogRepo, executionLogRepo: executionLogRepo,
@@ -140,8 +137,7 @@ func NewScheduler(
workers: numWorkers, workers: numWorkers,
progressTracker: NewProgressTracker(), progressTracker: NewProgressTracker(),
taskFactory: taskFactory, taskFactory: taskFactory,
ctx: ctx, stopChan: make(chan struct{}), // 初始化停止信号通道
cancel: cancel,
} }
} }
@@ -164,9 +160,9 @@ func (s *Scheduler) Start() {
// Stop 优雅地停止调度器 // Stop 优雅地停止调度器
func (s *Scheduler) Stop() { func (s *Scheduler) Stop() {
s.logger.Warnf("正在停止任务调度器...") s.logger.Warnf("正在停止任务调度器...")
s.cancel() // 1. 发出取消信号,停止主循环 close(s.stopChan) // 1. 发出停止信号,停止主循环
s.wg.Wait() // 2. 等待主循环完成 s.wg.Wait() // 2. 等待主循环完成
s.pool.Release() // 3. 释放 ants 池 (等待所有已提交的任务执行完毕) s.pool.Release() // 3. 释放 ants 池 (等待所有已提交的任务执行完毕)
s.logger.Warnf("任务调度器已安全停止") s.logger.Warnf("任务调度器已安全停止")
} }
@@ -178,9 +174,11 @@ func (s *Scheduler) run() {
for { for {
select { select {
case <-s.ctx.Done(): case <-s.stopChan:
// 收到停止信号,退出循环
return return
case <-ticker.C: case <-ticker.C:
// 定时触发任务认领和提交
go s.claimAndSubmit() go s.claimAndSubmit()
} }
} }
@@ -236,7 +234,7 @@ func (s *Scheduler) handleRequeue(planExecutionLogID uint, taskToRequeue *models
s.logger.Warnf("任务 (原始ID: %d) 已成功重新入队,并已释放计划 %d 的锁。", taskToRequeue.ID, planExecutionLogID) s.logger.Warnf("任务 (原始ID: %d) 已成功重新入队,并已释放计划 %d 的锁。", taskToRequeue.ID, planExecutionLogID)
} }
// processTask 处理单个任务的逻辑 (当前为占位符) // processTask 处理单个任务的逻辑
func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) {
s.logger.Warnf("开始处理任务, 日志ID: %d, 任务ID: %d, 任务名称: %s", s.logger.Warnf("开始处理任务, 日志ID: %d, 任务ID: %d, 任务名称: %s",
claimedLog.ID, claimedLog.TaskID, claimedLog.Task.Name) claimedLog.ID, claimedLog.TaskID, claimedLog.Task.Name)
@@ -256,10 +254,40 @@ func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) {
// 任务计数器校验, Plan的任务全部执行完成后需要插入一个新的PlanAnalysisTask用于触发下一次Plan的执行 // 任务计数器校验, Plan的任务全部执行完成后需要插入一个新的PlanAnalysisTask用于触发下一次Plan的执行
if s.progressTracker.IsPlanOver(claimedLog.PlanExecutionLogID) { if s.progressTracker.IsPlanOver(claimedLog.PlanExecutionLogID) {
// 调用共享的 Manager 来处理触发器更新逻辑 // --- 新增逻辑:更新计划执行次数并判断是否需要触发下一次执行 ---
err = s.analysisPlanTaskManager.CreateOrUpdateTrigger(s.ctx, claimedLog.Task.PlanID) planID := claimedLog.Task.PlanID
// 获取计划的最新数据
plan, err := s.planRepo.GetBasicPlanByID(planID)
if err != nil { if err != nil {
s.logger.Errorf("[严重] 创建计划分析任务失败, 当前Plan(%v)将无法进行下次触发, 错误: %v", claimedLog.Task.PlanID, err) s.logger.Errorf("获取计划 %d 的基本信息失败: %v", planID, err)
return
}
// 更新计划的执行计数器
plan.ExecuteCount++
// 如果是自动计划且达到执行次数上限,则更新计划状态为已停止
if (plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteNum > 0 && plan.ExecuteCount >= plan.ExecuteNum) || plan.ExecutionType == models.PlanExecutionTypeManual {
plan.Status = models.PlanStatusStopeed
s.logger.Infof("计划 %d (自动执行) 已达到最大执行次数 %d状态更新为 '执行完毕'。", planID, plan.ExecuteNum)
}
// 保存更新后的计划状态和执行计数
if err := s.planRepo.UpdatePlan(plan); err != nil { // UpdatePlan 可以更新整个 Plan 对象
s.logger.Errorf("更新计划 %d 的执行计数和状态失败: %v", planID, err)
return
}
// 更新计划执行日志状态为完成
if err := s.executionLogRepo.UpdatePlanExecutionLogStatus(claimedLog.PlanExecutionLogID, models.ExecutionStatusCompleted); err != nil {
s.logger.Errorf("更新计划执行日志 %d 状态为 '完成' 失败: %v", claimedLog.PlanExecutionLogID, err)
// 这是一个非阻塞性错误,不中断后续流程
}
// 调用共享的 Manager 来处理触发器更新逻辑 (Manager 会根据最新的 Plan 状态决定是否创建新触发器)
if err := s.analysisPlanTaskManager.CreateOrUpdateTrigger(planID); err != nil {
s.logger.Errorf("为计划 %d 创建/更新触发器失败: %v", planID, err)
} }
} }
@@ -301,8 +329,18 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error {
// analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中 // analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中
func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
// 创建Plan执行记录 // 创建Plan执行记录
// 从任务的 Parameters 中解析出真实的 PlanID
var params struct {
PlanID uint `json:"plan_id"`
}
if err := json.Unmarshal(claimedLog.Task.Parameters, &params); err != nil {
s.logger.Errorf("解析任务参数中的计划ID失败日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
realPlanID := params.PlanID
planLog := &models.PlanExecutionLog{ planLog := &models.PlanExecutionLog{
PlanID: claimedLog.Task.PlanID, PlanID: realPlanID, // 使用从参数中解析出的真实 PlanID
Status: models.ExecutionStatusStarted, Status: models.ExecutionStatusStarted,
StartedAt: time.Now(), StartedAt: time.Now(),
} }
@@ -312,7 +350,7 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
} }
// 解析出Task列表 // 解析出Task列表
tasks, err := s.planRepo.FlattenPlanTasks(claimedLog.Task.PlanID) tasks, err := s.planRepo.FlattenPlanTasks(realPlanID)
if err != nil { if err != nil {
s.logger.Errorf("[严重] 解析计划失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) s.logger.Errorf("[严重] 解析计划失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err return err
@@ -320,12 +358,12 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
// 写入执行历史 // 写入执行历史
taskLogs := make([]*models.TaskExecutionLog, len(tasks)) taskLogs := make([]*models.TaskExecutionLog, len(tasks))
for _, task := range tasks { for i, task := range tasks {
taskLogs = append(taskLogs, &models.TaskExecutionLog{ taskLogs[i] = &models.TaskExecutionLog{
PlanExecutionLogID: planLog.ID, PlanExecutionLogID: planLog.ID,
TaskID: task.ID, TaskID: task.ID,
Status: models.ExecutionStatusWaiting, Status: models.ExecutionStatusWaiting,
}) }
} }
err = s.executionLogRepo.CreateTaskExecutionLogsInBatch(taskLogs) err = s.executionLogRepo.CreateTaskExecutionLogsInBatch(taskLogs)
@@ -337,13 +375,13 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
// 写入待执行队列 // 写入待执行队列
pendingTasks := make([]*models.PendingTask, len(tasks)) pendingTasks := make([]*models.PendingTask, len(tasks))
for i, task := range tasks { for i, task := range tasks {
pendingTasks = append(pendingTasks, &models.PendingTask{ pendingTasks[i] = &models.PendingTask{
TaskID: task.ID, TaskID: task.ID,
TaskExecutionLogID: pendingTasks[i].ID, TaskExecutionLogID: taskLogs[i].ID, // 使用正确的 TaskExecutionLogID
// 待执行队列是通过任务触发时间排序的, 且只要在调度器获取的时间点之前的都可以被触发 // 待执行队列是通过任务触发时间排序的, 且只要在调度器获取的时间点之前的都可以被触发
ExecuteAt: time.Now().Add(time.Duration(i) * time.Second), ExecuteAt: time.Now().Add(time.Duration(i) * time.Second),
}) }
} }
err = s.pendingTaskRepo.CreatePendingTasksInBatch(pendingTasks) err = s.pendingTaskRepo.CreatePendingTasksInBatch(pendingTasks)
if err != nil { if err != nil {
@@ -352,7 +390,7 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
} }
// 将Task列表加入待执行队列中 // 将Task列表加入待执行队列中
s.progressTracker.AddNewPlan(claimedLog.PlanExecutionLogID, len(tasks)) s.progressTracker.AddNewPlan(planLog.ID, len(tasks))
return nil return nil
} }

View File

@@ -70,7 +70,7 @@ type Device struct {
gorm.Model gorm.Model
// Name 是设备的业务名称,应清晰可读,例如 "1号猪舍温度传感器" 或 "做料车间主控" // Name 是设备的业务名称,应清晰可读,例如 "1号猪舍温度传感器" 或 "做料车间主控"
Name string `gorm:"unique;not null" json:"name"` Name string `gorm:"not null" json:"name"`
// Type 是设备的高级类别,用于区分区域主控和普通设备。建立索引以优化按类型查询。 // Type 是设备的高级类别,用于区分区域主控和普通设备。建立索引以优化按类型查询。
Type DeviceType `gorm:"not null;index" json:"type"` Type DeviceType `gorm:"not null;index" json:"type"`

View File

@@ -16,11 +16,13 @@ type PendingTask struct {
// TaskID 使用 int 类型以容纳特殊的负数ID代表系统任务 // TaskID 使用 int 类型以容纳特殊的负数ID代表系统任务
TaskID int `gorm:"index"` TaskID int `gorm:"index"`
// Task 字段,用于在代码中访问关联的任务详情
// GORM 会根据 TaskID 字段自动填充此关联
Task *Task `gorm:"foreignKey:TaskID"`
ExecuteAt time.Time `gorm:"index"` // 任务执行时间 ExecuteAt time.Time `gorm:"index"` // 任务执行时间
TaskExecutionLogID uint `gorm:"unique;not null"` // 对应的执行历史记录ID TaskExecutionLogID uint `gorm:"unique;not null"` // 对应的执行历史记录ID
// 关联关系定义
// 通过 TaskExecutionLogID 关联到唯一的 TaskExecutionLog 记录 // 通过 TaskExecutionLogID 关联到唯一的 TaskExecutionLog 记录
// ON DELETE CASCADE 确保如果日志被删除,这个待办任务也会被自动清理 // ON DELETE CASCADE 确保如果日志被删除,这个待办任务也会被自动清理
TaskExecutionLog TaskExecutionLog `gorm:"foreignKey:TaskExecutionLogID;references:ID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE;"` TaskExecutionLog TaskExecutionLog `gorm:"foreignKey:TaskExecutionLogID;references:ID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE;"`

View File

@@ -8,11 +8,16 @@ import (
// ExecutionLogRepository 定义了与执行日志交互的接口。 // ExecutionLogRepository 定义了与执行日志交互的接口。
// 这为服务层提供了一个清晰的契约,并允许在测试中轻松地进行模拟。 // 这为服务层提供了一个清晰的契约,并允许在测试中轻松地进行模拟。
type ExecutionLogRepository interface { type ExecutionLogRepository interface {
UpdateLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error
UpdateLogStatus(logID uint, status models.ExecutionStatus) error
CreateTaskExecutionLog(log *models.TaskExecutionLog) error
CreatePlanExecutionLog(log *models.PlanExecutionLog) error CreatePlanExecutionLog(log *models.PlanExecutionLog) error
UpdatePlanExecutionLog(log *models.PlanExecutionLog) error UpdatePlanExecutionLog(log *models.PlanExecutionLog) error
CreateTaskExecutionLogsInBatch(logs []*models.TaskExecutionLog) error CreateTaskExecutionLogsInBatch(logs []*models.TaskExecutionLog) error
UpdateTaskExecutionLog(log *models.TaskExecutionLog) error UpdateTaskExecutionLog(log *models.TaskExecutionLog) error
FindTaskExecutionLogByID(id uint) (*models.TaskExecutionLog, error) FindTaskExecutionLogByID(id uint) (*models.TaskExecutionLog, error)
// UpdatePlanExecutionLogStatus 更新计划执行日志的状态
UpdatePlanExecutionLogStatus(logID uint, status models.ExecutionStatus) error
} }
// gormExecutionLogRepository 是使用 GORM 的具体实现。 // gormExecutionLogRepository 是使用 GORM 的具体实现。
@@ -26,6 +31,23 @@ func NewGormExecutionLogRepository(db *gorm.DB) ExecutionLogRepository {
return &gormExecutionLogRepository{db: db} return &gormExecutionLogRepository{db: db}
} }
func (r *gormExecutionLogRepository) UpdateLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error {
if len(logIDs) == 0 {
return nil
}
return r.db.Model(&models.TaskExecutionLog{}).
Where("id IN ?", logIDs).
Update("status", status).Error
}
func (r *gormExecutionLogRepository) UpdateLogStatus(logID uint, status models.ExecutionStatus) error {
return r.db.Model(&models.TaskExecutionLog{}).Where("id = ?", logID).Update("status", status).Error
}
func (r *gormExecutionLogRepository) CreateTaskExecutionLog(log *models.TaskExecutionLog) error {
return r.db.Create(log).Error
}
// CreatePlanExecutionLog 为一次计划执行创建一条新的日志条目。 // CreatePlanExecutionLog 为一次计划执行创建一条新的日志条目。
func (r *gormExecutionLogRepository) CreatePlanExecutionLog(log *models.PlanExecutionLog) error { func (r *gormExecutionLogRepository) CreatePlanExecutionLog(log *models.PlanExecutionLog) error {
return r.db.Create(log).Error return r.db.Create(log).Error
@@ -41,6 +63,9 @@ func (r *gormExecutionLogRepository) UpdatePlanExecutionLog(log *models.PlanExec
// CreateTaskExecutionLogsInBatch 在一次数据库调用中创建多个任务执行日志条目。 // CreateTaskExecutionLogsInBatch 在一次数据库调用中创建多个任务执行日志条目。
// 这是“预写日志”步骤的关键。 // 这是“预写日志”步骤的关键。
func (r *gormExecutionLogRepository) CreateTaskExecutionLogsInBatch(logs []*models.TaskExecutionLog) error { func (r *gormExecutionLogRepository) CreateTaskExecutionLogsInBatch(logs []*models.TaskExecutionLog) error {
if len(logs) == 0 {
return nil
}
// GORM 的 Create 传入一个切片指针会执行批量插入。 // GORM 的 Create 传入一个切片指针会执行批量插入。
return r.db.Create(&logs).Error return r.db.Create(&logs).Error
} }
@@ -63,3 +88,8 @@ func (r *gormExecutionLogRepository) FindTaskExecutionLogByID(id uint) (*models.
} }
return &log, nil return &log, nil
} }
// UpdatePlanExecutionLogStatus 更新计划执行日志的状态
func (r *gormExecutionLogRepository) UpdatePlanExecutionLogStatus(logID uint, status models.ExecutionStatus) error {
return r.db.Model(&models.PlanExecutionLog{}).Where("id = ?", logID).Update("status", status).Error
}

View File

@@ -1,6 +1,8 @@
package repository package repository
import ( import (
"errors"
"fmt"
"time" "time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
@@ -10,7 +12,15 @@ import (
// PendingTaskRepository 定义了与待执行任务队列交互的接口。 // PendingTaskRepository 定义了与待执行任务队列交互的接口。
type PendingTaskRepository interface { type PendingTaskRepository interface {
FindAllPendingTasks() ([]models.PendingTask, error)
FindPendingTriggerByPlanID(planID uint) (*models.PendingTask, error)
DeletePendingTasksByIDs(ids []uint) error
CreatePendingTask(task *models.PendingTask) error
CreatePendingTasksInBatch(tasks []*models.PendingTask) error CreatePendingTasksInBatch(tasks []*models.PendingTask) error
// UpdatePendingTaskExecuteAt 更新指定待执行任务的执行时间
UpdatePendingTaskExecuteAt(id uint, executeAt time.Time) error
// ClaimNextAvailableTask 原子地认领下一个可用的任务。 // ClaimNextAvailableTask 原子地认领下一个可用的任务。
// 它会同时返回被认领任务对应的日志对象,以及被删除的待办任务对象的内存副本。 // 它会同时返回被认领任务对应的日志对象,以及被删除的待办任务对象的内存副本。
ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error) ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error)
@@ -28,11 +38,50 @@ func NewGormPendingTaskRepository(db *gorm.DB) PendingTaskRepository {
return &gormPendingTaskRepository{db: db} return &gormPendingTaskRepository{db: db}
} }
func (r *gormPendingTaskRepository) FindAllPendingTasks() ([]models.PendingTask, error) {
var tasks []models.PendingTask
// 预加载 Task 以便后续访问 Task.PlanID
err := r.db.Preload("Task").Find(&tasks).Error
return tasks, err
}
func (r *gormPendingTaskRepository) FindPendingTriggerByPlanID(planID uint) (*models.PendingTask, error) {
var pendingTask models.PendingTask
// 关键修改:通过 JOIN tasks 表并查询 parameters JSON 字段来查找触发器,而不是依赖 task.plan_id
err := r.db.
Joins("JOIN tasks ON tasks.id = pending_tasks.task_id").
Where("tasks.type = ? AND tasks.parameters->>'plan_id' = ?", models.TaskPlanAnalysis, fmt.Sprintf("%d", planID)).
First(&pendingTask).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil // 未找到不是错误
}
return &pendingTask, err
}
func (r *gormPendingTaskRepository) DeletePendingTasksByIDs(ids []uint) error {
if len(ids) == 0 {
return nil
}
return r.db.Where("id IN ?", ids).Delete(&models.PendingTask{}).Error
}
func (r *gormPendingTaskRepository) CreatePendingTask(task *models.PendingTask) error {
return r.db.Create(task).Error
}
// CreatePendingTasksInBatch 在一次数据库调用中创建多个待执行任务条目。 // CreatePendingTasksInBatch 在一次数据库调用中创建多个待执行任务条目。
func (r *gormPendingTaskRepository) CreatePendingTasksInBatch(tasks []*models.PendingTask) error { func (r *gormPendingTaskRepository) CreatePendingTasksInBatch(tasks []*models.PendingTask) error {
if len(tasks) == 0 {
return nil
}
return r.db.Create(&tasks).Error return r.db.Create(&tasks).Error
} }
// UpdatePendingTaskExecuteAt 更新指定待执行任务的执行时间
func (r *gormPendingTaskRepository) UpdatePendingTaskExecuteAt(id uint, executeAt time.Time) error {
return r.db.Model(&models.PendingTask{}).Where("id = ?", id).Update("execute_at", executeAt).Error
}
// ClaimNextAvailableTask 以原子方式认领下一个可用的任务。 // ClaimNextAvailableTask 以原子方式认领下一个可用的任务。
func (r *gormPendingTaskRepository) ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error) { func (r *gormPendingTaskRepository) ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error) {
var log models.TaskExecutionLog var log models.TaskExecutionLog

View File

@@ -42,6 +42,15 @@ type PlanRepository interface {
DeleteTask(id int) error DeleteTask(id int) error
// FindPlanAnalysisTaskByParamsPlanID 根据Parameters中的ParamsPlanID字段值查找TaskPlanAnalysis类型的Task // FindPlanAnalysisTaskByParamsPlanID 根据Parameters中的ParamsPlanID字段值查找TaskPlanAnalysis类型的Task
FindPlanAnalysisTaskByParamsPlanID(paramsPlanID uint) (*models.Task, error) FindPlanAnalysisTaskByParamsPlanID(paramsPlanID uint) (*models.Task, error)
// FindRunnablePlans 获取所有应执行的计划
FindRunnablePlans() ([]*models.Plan, error)
// FindDisabledAndStoppedPlans 获取所有已禁用或已停止的计划
FindDisabledAndStoppedPlans() ([]*models.Plan, error)
// FindPlanAnalysisTaskByPlanID 根据 PlanID 找到其关联的 'plan_analysis' 任务
FindPlanAnalysisTaskByPlanID(planID uint) (*models.Task, error)
// CreatePlanAnalysisTask 创建一个 plan_analysis 类型的任务并返回它
CreatePlanAnalysisTask(plan *models.Plan) (*models.Task, error)
} }
// gormPlanRepository 是 PlanRepository 的 GORM 实现 // gormPlanRepository 是 PlanRepository 的 GORM 实现
@@ -174,7 +183,9 @@ func (r *gormPlanRepository) CreatePlan(plan *models.Plan) error {
} }
// 3. 创建触发器Task // 3. 创建触发器Task
if err := r.createPlanAnalysisTask(tx, plan); err != nil { // 关键修改:调用 createPlanAnalysisTask 并处理其返回的 Task 对象
_, err := r.createPlanAnalysisTask(tx, plan)
if err != nil {
return err return err
} }
return nil return nil
@@ -517,63 +528,100 @@ func (r *gormPlanRepository) deleteTask(tx *gorm.DB, id int) error {
// FindPlanAnalysisTaskByParamsPlanID 根据Parameters中的ParamsPlanID字段值查找TaskPlanAnalysis类型的Task // FindPlanAnalysisTaskByParamsPlanID 根据Parameters中的ParamsPlanID字段值查找TaskPlanAnalysis类型的Task
func (r *gormPlanRepository) FindPlanAnalysisTaskByParamsPlanID(paramsPlanID uint) (*models.Task, error) { func (r *gormPlanRepository) FindPlanAnalysisTaskByParamsPlanID(paramsPlanID uint) (*models.Task, error) {
return r.findPlanAnalysisTaskByParamsPlanID(r.db, paramsPlanID) return r.findPlanAnalysisTask(r.db, paramsPlanID)
}
// findPlanAnalysisTaskByParamsPlanID 使用指定db根据Parameters中的ParamsPlanID字段值查找TaskPlanAnalysis类型的Task
func (r *gormPlanRepository) findPlanAnalysisTaskByParamsPlanID(tx *gorm.DB, paramsPlanID uint) (*models.Task, error) {
var task models.Task
// 构造JSON查询条件查找Parameters中包含指定ParamsPlanID且Type为TaskPlanAnalysis的任务
// TODO 在JSON字段中查找特定键值的语法取决于数据库类型这里使用PostgreSQL的语法
// TODO 如果使用的是MySQL则需要相应调整查询条件
result := tx.Where(
"type = ? AND parameters->>'plan_id' = ?",
models.TaskPlanAnalysis,
fmt.Sprintf("%d", paramsPlanID),
).First(&task)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, fmt.Errorf("未找到Parameters.PlanID为%d的TaskPlanAnalysis类型任务", paramsPlanID)
}
return nil, fmt.Errorf("查找任务时出错: %w", result.Error)
}
return &task, nil
} }
// createPlanAnalysisTask 用于创建一个TaskPlanAnalysis类型的Task // createPlanAnalysisTask 用于创建一个TaskPlanAnalysis类型的Task
func (r *gormPlanRepository) createPlanAnalysisTask(tx *gorm.DB, plan *models.Plan) error { // 关键修改Task.PlanID 设置为 0实际 PlanID 存储在 Parameters 中,并返回创建的 Task
func (r *gormPlanRepository) createPlanAnalysisTask(tx *gorm.DB, plan *models.Plan) (*models.Task, error) {
m := map[string]interface{}{ m := map[string]interface{}{
models.ParamsPlanID: plan.ID, models.ParamsPlanID: plan.ID,
} }
parameters, err := json.Marshal(m) parameters, err := json.Marshal(m)
if err != nil { if err != nil {
return err return nil, err
} }
task := &models.Task{ task := &models.Task{
PlanID: plan.ID, PlanID: 0, // 关键:设置为 0避免被常规 PlanID 查询查到
Name: fmt.Sprintf("'%v'计划触发器", plan.Name), Name: fmt.Sprintf("'%s'计划触发器", plan.Name),
Description: fmt.Sprintf("计划名: %v, 计划ID: %v", plan.Name, plan.ID), Description: fmt.Sprintf("计划名: %s, 计划ID: %d", plan.Name, plan.ID),
ExecutionOrder: 0, ExecutionOrder: 0, // 触发器任务的执行顺序通常为0或不关心
Type: models.TaskPlanAnalysis, Type: models.TaskPlanAnalysis,
Parameters: datatypes.JSON(parameters), Parameters: datatypes.JSON(parameters),
} }
return tx.Create(task).Error if err := tx.Create(task).Error; err != nil {
return nil, err
}
return task, nil
} }
// updatePlanAnalysisTask 使用简单粗暴的删除再创建方式实现更新, 以控制AnalysisPlanTask的定义全部在createPlanAnalysisTask方法中 // updatePlanAnalysisTask 使用更安全的方式更新触发器任务
func (r *gormPlanRepository) updatePlanAnalysisTask(tx *gorm.DB, plan *models.Plan) error { func (r *gormPlanRepository) updatePlanAnalysisTask(tx *gorm.DB, plan *models.Plan) error {
task, err := r.findPlanAnalysisTaskByParamsPlanID(tx, plan.ID) task, err := r.findPlanAnalysisTask(tx, plan.ID)
if err != nil { if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
return fmt.Errorf("查找现有计划分析任务失败: %w", err)
}
// 如果触发器任务不存在,则创建一个
if task == nil {
_, err := r.createPlanAnalysisTask(tx, plan)
return err return err
} }
err = r.deleteTask(tx, task.ID)
if err != nil { // 如果存在,则更新它的名称和描述以反映计划的最新信息
return err task.Name = fmt.Sprintf("'%s'计划触发器", plan.Name)
} task.Description = fmt.Sprintf("计划名: %s, 计划ID: %d", plan.Name, plan.ID)
return r.createPlanAnalysisTask(tx, plan)
return tx.Save(task).Error
}
func (r *gormPlanRepository) FindRunnablePlans() ([]*models.Plan, error) {
var plans []*models.Plan
err := r.db.
Where("status = ?", models.PlanStatusEnabled).
Where(
r.db.Where("execution_type = ?", models.PlanExecutionTypeManual).
Or("execution_type = ? AND (execute_num = 0 OR execute_count < execute_num)", models.PlanExecutionTypeAutomatic),
).
Find(&plans).Error
return plans, err
}
func (r *gormPlanRepository) FindDisabledAndStoppedPlans() ([]*models.Plan, error) {
var plans []*models.Plan
err := r.db.
Where("status = ? OR status = ?", models.PlanStatusDisabled, models.PlanStatusStopeed).
Find(&plans).Error
return plans, err
}
// findPlanAnalysisTask 是一个内部使用的、更高效的查找方法
// 关键修改:通过查询 parameters JSON 字段来查找
func (r *gormPlanRepository) findPlanAnalysisTask(tx *gorm.DB, planID uint) (*models.Task, error) {
var task models.Task
err := tx.Where("type = ? AND parameters->>'plan_id' = ?", models.TaskPlanAnalysis, fmt.Sprintf("%d", planID)).First(&task).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil // 未找到不是错误返回nil, nil
}
return &task, err
}
// FindPlanAnalysisTaskByPlanID 是暴露给外部的公共方法
// 关键修改:通过查询 parameters JSON 字段来查找
func (r *gormPlanRepository) FindPlanAnalysisTaskByPlanID(planID uint) (*models.Task, error) {
return r.findPlanAnalysisTask(r.db, planID)
}
// CreatePlanAnalysisTask 创建一个 plan_analysis 类型的任务并返回它
// 这个方法是公开的,主要由 TaskManager 在发现触发器任务定义丢失时调用。
func (r *gormPlanRepository) CreatePlanAnalysisTask(plan *models.Plan) (*models.Task, error) {
var createdTask *models.Task
err := r.db.Transaction(func(tx *gorm.DB) error {
var err error
createdTask, err = r.createPlanAnalysisTask(tx, plan)
return err
})
return createdTask, err
} }