Merge pull request 'issue_10' (#12) from issue_10 into main
Reviewed-on: #12
This commit is contained in:
31
docs/docs.go
31
docs/docs.go
@@ -741,22 +741,26 @@ const docTemplate = `{
|
|||||||
"enum": [
|
"enum": [
|
||||||
0,
|
0,
|
||||||
1,
|
1,
|
||||||
2
|
2,
|
||||||
|
3
|
||||||
],
|
],
|
||||||
"x-enum-comments": {
|
"x-enum-comments": {
|
||||||
"PlanStatusDisabled": "禁用计划",
|
"PlanStatusDisabled": "禁用计划",
|
||||||
"PlanStatusEnabled": "启用计划",
|
"PlanStatusEnabled": "启用计划",
|
||||||
|
"PlanStatusFailed": "执行失败",
|
||||||
"PlanStatusStopeed": "执行完毕"
|
"PlanStatusStopeed": "执行完毕"
|
||||||
},
|
},
|
||||||
"x-enum-descriptions": [
|
"x-enum-descriptions": [
|
||||||
"启用计划",
|
|
||||||
"禁用计划",
|
"禁用计划",
|
||||||
"执行完毕"
|
"启用计划",
|
||||||
|
"执行完毕",
|
||||||
|
"执行失败"
|
||||||
],
|
],
|
||||||
"x-enum-varnames": [
|
"x-enum-varnames": [
|
||||||
"PlanStatusEnabled",
|
|
||||||
"PlanStatusDisabled",
|
"PlanStatusDisabled",
|
||||||
"PlanStatusStopeed"
|
"PlanStatusEnabled",
|
||||||
|
"PlanStatusStopeed",
|
||||||
|
"PlanStatusFailed"
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
"models.TaskType": {
|
"models.TaskType": {
|
||||||
@@ -781,19 +785,10 @@ const docTemplate = `{
|
|||||||
"plan.CreatePlanRequest": {
|
"plan.CreatePlanRequest": {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"required": [
|
"required": [
|
||||||
"content_type",
|
|
||||||
"execution_type",
|
"execution_type",
|
||||||
"name"
|
"name"
|
||||||
],
|
],
|
||||||
"properties": {
|
"properties": {
|
||||||
"content_type": {
|
|
||||||
"allOf": [
|
|
||||||
{
|
|
||||||
"$ref": "#/definitions/models.PlanContentType"
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"example": "tasks"
|
|
||||||
},
|
|
||||||
"cron_expression": {
|
"cron_expression": {
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"example": "0 0 6 * * *"
|
"example": "0 0 6 * * *"
|
||||||
@@ -1005,14 +1000,6 @@ const docTemplate = `{
|
|||||||
"plan.UpdatePlanRequest": {
|
"plan.UpdatePlanRequest": {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"properties": {
|
"properties": {
|
||||||
"content_type": {
|
|
||||||
"allOf": [
|
|
||||||
{
|
|
||||||
"$ref": "#/definitions/models.PlanContentType"
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"example": "tasks"
|
|
||||||
},
|
|
||||||
"cron_expression": {
|
"cron_expression": {
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"example": "0 0 6 * * *"
|
"example": "0 0 6 * * *"
|
||||||
|
|||||||
@@ -730,22 +730,26 @@
|
|||||||
"enum": [
|
"enum": [
|
||||||
0,
|
0,
|
||||||
1,
|
1,
|
||||||
2
|
2,
|
||||||
|
3
|
||||||
],
|
],
|
||||||
"x-enum-comments": {
|
"x-enum-comments": {
|
||||||
"PlanStatusDisabled": "禁用计划",
|
"PlanStatusDisabled": "禁用计划",
|
||||||
"PlanStatusEnabled": "启用计划",
|
"PlanStatusEnabled": "启用计划",
|
||||||
|
"PlanStatusFailed": "执行失败",
|
||||||
"PlanStatusStopeed": "执行完毕"
|
"PlanStatusStopeed": "执行完毕"
|
||||||
},
|
},
|
||||||
"x-enum-descriptions": [
|
"x-enum-descriptions": [
|
||||||
"启用计划",
|
|
||||||
"禁用计划",
|
"禁用计划",
|
||||||
"执行完毕"
|
"启用计划",
|
||||||
|
"执行完毕",
|
||||||
|
"执行失败"
|
||||||
],
|
],
|
||||||
"x-enum-varnames": [
|
"x-enum-varnames": [
|
||||||
"PlanStatusEnabled",
|
|
||||||
"PlanStatusDisabled",
|
"PlanStatusDisabled",
|
||||||
"PlanStatusStopeed"
|
"PlanStatusEnabled",
|
||||||
|
"PlanStatusStopeed",
|
||||||
|
"PlanStatusFailed"
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
"models.TaskType": {
|
"models.TaskType": {
|
||||||
@@ -770,19 +774,10 @@
|
|||||||
"plan.CreatePlanRequest": {
|
"plan.CreatePlanRequest": {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"required": [
|
"required": [
|
||||||
"content_type",
|
|
||||||
"execution_type",
|
"execution_type",
|
||||||
"name"
|
"name"
|
||||||
],
|
],
|
||||||
"properties": {
|
"properties": {
|
||||||
"content_type": {
|
|
||||||
"allOf": [
|
|
||||||
{
|
|
||||||
"$ref": "#/definitions/models.PlanContentType"
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"example": "tasks"
|
|
||||||
},
|
|
||||||
"cron_expression": {
|
"cron_expression": {
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"example": "0 0 6 * * *"
|
"example": "0 0 6 * * *"
|
||||||
@@ -994,14 +989,6 @@
|
|||||||
"plan.UpdatePlanRequest": {
|
"plan.UpdatePlanRequest": {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"properties": {
|
"properties": {
|
||||||
"content_type": {
|
|
||||||
"allOf": [
|
|
||||||
{
|
|
||||||
"$ref": "#/definitions/models.PlanContentType"
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"example": "tasks"
|
|
||||||
},
|
|
||||||
"cron_expression": {
|
"cron_expression": {
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"example": "0 0 6 * * *"
|
"example": "0 0 6 * * *"
|
||||||
|
|||||||
@@ -129,20 +129,24 @@ definitions:
|
|||||||
- 0
|
- 0
|
||||||
- 1
|
- 1
|
||||||
- 2
|
- 2
|
||||||
|
- 3
|
||||||
format: int32
|
format: int32
|
||||||
type: integer
|
type: integer
|
||||||
x-enum-comments:
|
x-enum-comments:
|
||||||
PlanStatusDisabled: 禁用计划
|
PlanStatusDisabled: 禁用计划
|
||||||
PlanStatusEnabled: 启用计划
|
PlanStatusEnabled: 启用计划
|
||||||
|
PlanStatusFailed: 执行失败
|
||||||
PlanStatusStopeed: 执行完毕
|
PlanStatusStopeed: 执行完毕
|
||||||
x-enum-descriptions:
|
x-enum-descriptions:
|
||||||
- 启用计划
|
|
||||||
- 禁用计划
|
- 禁用计划
|
||||||
|
- 启用计划
|
||||||
- 执行完毕
|
- 执行完毕
|
||||||
|
- 执行失败
|
||||||
x-enum-varnames:
|
x-enum-varnames:
|
||||||
- PlanStatusEnabled
|
|
||||||
- PlanStatusDisabled
|
- PlanStatusDisabled
|
||||||
|
- PlanStatusEnabled
|
||||||
- PlanStatusStopeed
|
- PlanStatusStopeed
|
||||||
|
- PlanStatusFailed
|
||||||
models.TaskType:
|
models.TaskType:
|
||||||
enum:
|
enum:
|
||||||
- plan_analysis
|
- plan_analysis
|
||||||
@@ -159,10 +163,6 @@ definitions:
|
|||||||
- TaskTypeWaiting
|
- TaskTypeWaiting
|
||||||
plan.CreatePlanRequest:
|
plan.CreatePlanRequest:
|
||||||
properties:
|
properties:
|
||||||
content_type:
|
|
||||||
allOf:
|
|
||||||
- $ref: '#/definitions/models.PlanContentType'
|
|
||||||
example: tasks
|
|
||||||
cron_expression:
|
cron_expression:
|
||||||
example: 0 0 6 * * *
|
example: 0 0 6 * * *
|
||||||
type: string
|
type: string
|
||||||
@@ -188,7 +188,6 @@ definitions:
|
|||||||
$ref: '#/definitions/plan.TaskRequest'
|
$ref: '#/definitions/plan.TaskRequest'
|
||||||
type: array
|
type: array
|
||||||
required:
|
required:
|
||||||
- content_type
|
|
||||||
- execution_type
|
- execution_type
|
||||||
- name
|
- name
|
||||||
type: object
|
type: object
|
||||||
@@ -306,10 +305,6 @@ definitions:
|
|||||||
type: object
|
type: object
|
||||||
plan.UpdatePlanRequest:
|
plan.UpdatePlanRequest:
|
||||||
properties:
|
properties:
|
||||||
content_type:
|
|
||||||
allOf:
|
|
||||||
- $ref: '#/definitions/models.PlanContentType'
|
|
||||||
example: tasks
|
|
||||||
cron_expression:
|
cron_expression:
|
||||||
example: 0 0 6 * * *
|
example: 0 0 6 * * *
|
||||||
type: string
|
type: string
|
||||||
|
|||||||
@@ -64,13 +64,14 @@ func PlanFromCreateRequest(req *CreatePlanRequest) (*models.Plan, error) {
|
|||||||
ExecutionType: req.ExecutionType,
|
ExecutionType: req.ExecutionType,
|
||||||
ExecuteNum: req.ExecuteNum,
|
ExecuteNum: req.ExecuteNum,
|
||||||
CronExpression: req.CronExpression,
|
CronExpression: req.CronExpression,
|
||||||
ContentType: req.ContentType,
|
// ContentType 在控制器中设置,此处不再处理
|
||||||
}
|
}
|
||||||
|
|
||||||
// 处理子计划 (通过ID引用)
|
// 处理子计划 (通过ID引用)
|
||||||
if req.ContentType == models.PlanContentTypeSubPlans && req.SubPlanIDs != nil {
|
if req.SubPlanIDs != nil {
|
||||||
plan.SubPlans = make([]models.SubPlan, len(req.SubPlanIDs))
|
subPlanSlice := req.SubPlanIDs
|
||||||
for i, childPlanID := range req.SubPlanIDs {
|
plan.SubPlans = make([]models.SubPlan, len(subPlanSlice))
|
||||||
|
for i, childPlanID := range subPlanSlice {
|
||||||
plan.SubPlans[i] = models.SubPlan{
|
plan.SubPlans[i] = models.SubPlan{
|
||||||
ChildPlanID: childPlanID,
|
ChildPlanID: childPlanID,
|
||||||
ExecutionOrder: i, // 默认执行顺序, ReorderSteps会再次确认
|
ExecutionOrder: i, // 默认执行顺序, ReorderSteps会再次确认
|
||||||
@@ -79,9 +80,10 @@ func PlanFromCreateRequest(req *CreatePlanRequest) (*models.Plan, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 处理任务
|
// 处理任务
|
||||||
if req.ContentType == models.PlanContentTypeTasks && req.Tasks != nil {
|
if req.Tasks != nil {
|
||||||
plan.Tasks = make([]models.Task, len(req.Tasks))
|
taskSlice := req.Tasks
|
||||||
for i, taskReq := range req.Tasks {
|
plan.Tasks = make([]models.Task, len(taskSlice))
|
||||||
|
for i, taskReq := range taskSlice {
|
||||||
task, err := TaskFromRequest(&taskReq)
|
task, err := TaskFromRequest(&taskReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -114,13 +116,14 @@ func PlanFromUpdateRequest(req *UpdatePlanRequest) (*models.Plan, error) {
|
|||||||
ExecutionType: req.ExecutionType,
|
ExecutionType: req.ExecutionType,
|
||||||
ExecuteNum: req.ExecuteNum,
|
ExecuteNum: req.ExecuteNum,
|
||||||
CronExpression: req.CronExpression,
|
CronExpression: req.CronExpression,
|
||||||
ContentType: req.ContentType,
|
// ContentType 在控制器中设置,此处不再处理
|
||||||
}
|
}
|
||||||
|
|
||||||
// 处理子计划 (通过ID引用)
|
// 处理子计划 (通过ID引用)
|
||||||
if req.ContentType == models.PlanContentTypeSubPlans && req.SubPlanIDs != nil {
|
if req.SubPlanIDs != nil {
|
||||||
plan.SubPlans = make([]models.SubPlan, len(req.SubPlanIDs))
|
subPlanSlice := req.SubPlanIDs
|
||||||
for i, childPlanID := range req.SubPlanIDs {
|
plan.SubPlans = make([]models.SubPlan, len(subPlanSlice))
|
||||||
|
for i, childPlanID := range subPlanSlice {
|
||||||
plan.SubPlans[i] = models.SubPlan{
|
plan.SubPlans[i] = models.SubPlan{
|
||||||
ChildPlanID: childPlanID,
|
ChildPlanID: childPlanID,
|
||||||
ExecutionOrder: i, // 默认执行顺序, ReorderSteps会再次确认
|
ExecutionOrder: i, // 默认执行顺序, ReorderSteps会再次确认
|
||||||
@@ -129,9 +132,10 @@ func PlanFromUpdateRequest(req *UpdatePlanRequest) (*models.Plan, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 处理任务
|
// 处理任务
|
||||||
if req.ContentType == models.PlanContentTypeTasks && req.Tasks != nil {
|
if req.Tasks != nil {
|
||||||
plan.Tasks = make([]models.Task, len(req.Tasks))
|
taskSlice := req.Tasks
|
||||||
for i, taskReq := range req.Tasks {
|
plan.Tasks = make([]models.Task, len(taskSlice))
|
||||||
|
for i, taskReq := range taskSlice {
|
||||||
task, err := TaskFromRequest(&taskReq)
|
task, err := TaskFromRequest(&taskReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
@@ -22,7 +22,6 @@ type CreatePlanRequest struct {
|
|||||||
ExecutionType models.PlanExecutionType `json:"execution_type" binding:"required" example:"automatic"`
|
ExecutionType models.PlanExecutionType `json:"execution_type" binding:"required" example:"automatic"`
|
||||||
ExecuteNum uint `json:"execute_num,omitempty" example:"10"`
|
ExecuteNum uint `json:"execute_num,omitempty" example:"10"`
|
||||||
CronExpression string `json:"cron_expression" example:"0 0 6 * * *"`
|
CronExpression string `json:"cron_expression" example:"0 0 6 * * *"`
|
||||||
ContentType models.PlanContentType `json:"content_type" binding:"required" example:"tasks"`
|
|
||||||
SubPlanIDs []uint `json:"sub_plan_ids,omitempty"`
|
SubPlanIDs []uint `json:"sub_plan_ids,omitempty"`
|
||||||
Tasks []TaskRequest `json:"tasks,omitempty"`
|
Tasks []TaskRequest `json:"tasks,omitempty"`
|
||||||
}
|
}
|
||||||
@@ -55,7 +54,6 @@ type UpdatePlanRequest struct {
|
|||||||
ExecutionType models.PlanExecutionType `json:"execution_type" example:"automatic"`
|
ExecutionType models.PlanExecutionType `json:"execution_type" example:"automatic"`
|
||||||
ExecuteNum uint `json:"execute_num,omitempty" example:"10"`
|
ExecuteNum uint `json:"execute_num,omitempty" example:"10"`
|
||||||
CronExpression string `json:"cron_expression" example:"0 0 6 * * *"`
|
CronExpression string `json:"cron_expression" example:"0 0 6 * * *"`
|
||||||
ContentType models.PlanContentType `json:"content_type" example:"tasks"`
|
|
||||||
SubPlanIDs []uint `json:"sub_plan_ids,omitempty"`
|
SubPlanIDs []uint `json:"sub_plan_ids,omitempty"`
|
||||||
Tasks []TaskRequest `json:"tasks,omitempty"`
|
Tasks []TaskRequest `json:"tasks,omitempty"`
|
||||||
}
|
}
|
||||||
@@ -132,6 +130,14 @@ func (c *Controller) CreatePlan(ctx *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- 自动判断 ContentType ---
|
||||||
|
if len(req.SubPlanIDs) > 0 {
|
||||||
|
planToCreate.ContentType = models.PlanContentTypeSubPlans
|
||||||
|
} else {
|
||||||
|
// 如果 SubPlanIDs 未提供,则默认为 Tasks 类型(即使 Tasks 字段也未提供)
|
||||||
|
planToCreate.ContentType = models.PlanContentTypeTasks
|
||||||
|
}
|
||||||
|
|
||||||
// 调用仓库方法创建计划
|
// 调用仓库方法创建计划
|
||||||
if err := c.planRepo.CreatePlan(planToCreate); err != nil {
|
if err := c.planRepo.CreatePlan(planToCreate); err != nil {
|
||||||
controller.SendErrorResponse(ctx, controller.CodeBadRequest, "创建计划失败: "+err.Error())
|
controller.SendErrorResponse(ctx, controller.CodeBadRequest, "创建计划失败: "+err.Error())
|
||||||
@@ -269,6 +275,14 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) {
|
|||||||
}
|
}
|
||||||
planToUpdate.ID = uint(id) // 确保ID被设置
|
planToUpdate.ID = uint(id) // 确保ID被设置
|
||||||
|
|
||||||
|
// --- 自动判断 ContentType ---
|
||||||
|
if len(req.SubPlanIDs) > 0 {
|
||||||
|
planToUpdate.ContentType = models.PlanContentTypeSubPlans
|
||||||
|
} else {
|
||||||
|
// 如果 SubPlanIDs 未提供,则默认为 Tasks 类型(即使 Tasks 字段也未提供)
|
||||||
|
planToUpdate.ContentType = models.PlanContentTypeTasks
|
||||||
|
}
|
||||||
|
|
||||||
// 4. 检查计划是否存在
|
// 4. 检查计划是否存在
|
||||||
_, err = c.planRepo.GetBasicPlanByID(uint(id))
|
_, err = c.planRepo.GetBasicPlanByID(uint(id))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -161,7 +161,7 @@ func (m *AnalysisPlanTaskManager) getRefreshData() (runnablePlans []*models.Plan
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
invalidPlans, err := m.planRepo.FindDisabledAndStoppedPlans()
|
invalidPlans, err := m.planRepo.FindInactivePlans()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.logger.Errorf("获取失效计划列表失败: %v", err)
|
m.logger.Errorf("获取失效计划列表失败: %v", err)
|
||||||
return
|
return
|
||||||
@@ -215,7 +215,7 @@ func (m *AnalysisPlanTaskManager) cleanupInvalidTasks(invalidPlanIDs []uint, all
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 批量更新相关执行日志状态为“已取消”
|
// 批量更新相关执行日志状态为“已取消”
|
||||||
if err := m.executionLogRepo.UpdateLogStatusByIDs(logsToCancelIDs, models.ExecutionStatusCancelled); err != nil {
|
if err := m.executionLogRepo.UpdateTaskExecutionLogStatusByIDs(logsToCancelIDs, models.ExecutionStatusCancelled); err != nil {
|
||||||
// 这是一个非关键性错误,只记录日志
|
// 这是一个非关键性错误,只记录日志
|
||||||
m.logger.Warnf("批量更新日志状态为 'Cancelled' 失败: %v", err)
|
m.logger.Warnf("批量更新日志状态为 'Cancelled' 失败: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,29 +0,0 @@
|
|||||||
package task
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
|
||||||
)
|
|
||||||
|
|
||||||
// PlanAnalysisTask 用于在任务执行队列中触发一个plan的执行
|
|
||||||
// 该任务会解析plan生成扁平化的待执行任务表, 并将任务列表插入任务执行队列
|
|
||||||
// 该任务会预写入plan所有待执行任务的执行日志
|
|
||||||
// 每个plan执行完毕时 或 创建plan时 都应该重新创建一个 PlanAnalysisTask 以便触发下次plan执行
|
|
||||||
// 更新plan后应当更新对应 PlanAnalysisTask
|
|
||||||
type PlanAnalysisTask struct {
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *PlanAnalysisTask) Execute() error {
|
|
||||||
//TODO implement me
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *PlanAnalysisTask) ParseParams(logger *logs.Logger, claimedLog *models.TaskExecutionLog) error {
|
|
||||||
//TODO implement me
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *PlanAnalysisTask) OnFailure(executeErr error) {
|
|
||||||
//TODO implement me
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
@@ -13,49 +13,22 @@ import (
|
|||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ProgressTracker 在内存中跟踪计划的执行状态,包括进度和执行锁
|
// ProgressTracker 仅用于在内存中提供计划执行的并发锁
|
||||||
type ProgressTracker struct {
|
type ProgressTracker struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
cond *sync.Cond // 用于实现阻塞锁
|
cond *sync.Cond // 用于实现阻塞锁
|
||||||
totalTasks map[uint]int // key: planExecutionLogID, value: total tasks
|
runningPlans map[uint]bool // key: planExecutionLogID, value: true (用作内存锁)
|
||||||
completedTasks map[uint]int // key: planExecutionLogID, value: completed tasks
|
|
||||||
runningPlans map[uint]bool // key: planExecutionLogID, value: true (用作内存锁)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewProgressTracker 创建一个新的进度跟踪器
|
// NewProgressTracker 创建一个新的进度跟踪器
|
||||||
func NewProgressTracker() *ProgressTracker {
|
func NewProgressTracker() *ProgressTracker {
|
||||||
t := &ProgressTracker{
|
t := &ProgressTracker{
|
||||||
totalTasks: make(map[uint]int),
|
runningPlans: make(map[uint]bool),
|
||||||
completedTasks: make(map[uint]int),
|
|
||||||
runningPlans: make(map[uint]bool),
|
|
||||||
}
|
}
|
||||||
t.cond = sync.NewCond(&t.mu)
|
t.cond = sync.NewCond(&t.mu)
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddNewPlan 添加一个新的计划,并初始化进度跟踪器
|
|
||||||
func (t *ProgressTracker) AddNewPlan(planLogID uint, totalTasks int) {
|
|
||||||
t.mu.Lock()
|
|
||||||
t.totalTasks[planLogID] = totalTasks
|
|
||||||
t.completedTasks[planLogID] = 0
|
|
||||||
t.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// CompletedTask 通知计数器一个任务被完成
|
|
||||||
func (t *ProgressTracker) CompletedTask(planLogID uint) {
|
|
||||||
t.mu.Lock()
|
|
||||||
t.completedTasks[planLogID]++
|
|
||||||
t.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsPlanOver 检查计划是否完成
|
|
||||||
func (t *ProgressTracker) IsPlanOver(planLogID uint) bool {
|
|
||||||
t.mu.Lock()
|
|
||||||
defer t.mu.Unlock()
|
|
||||||
|
|
||||||
return t.completedTasks[planLogID] >= t.totalTasks[planLogID]
|
|
||||||
}
|
|
||||||
|
|
||||||
// TryLock (非阻塞) 尝试锁定一个计划。如果计划未被锁定,则锁定并返回 true。
|
// TryLock (非阻塞) 尝试锁定一个计划。如果计划未被锁定,则锁定并返回 true。
|
||||||
func (t *ProgressTracker) TryLock(planLogID uint) bool {
|
func (t *ProgressTracker) TryLock(planLogID uint) bool {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
@@ -248,62 +221,33 @@ func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
claimedLog.Status = models.ExecutionStatusFailed
|
claimedLog.Status = models.ExecutionStatusFailed
|
||||||
claimedLog.Output = err.Error()
|
claimedLog.Output = err.Error()
|
||||||
|
|
||||||
|
// 任务失败时,调用统一的终止服务
|
||||||
|
s.handlePlanTermination(claimedLog.PlanExecutionLogID, "子任务执行失败: "+err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.logger.Warnf("完成任务, 日志ID: %d", claimedLog.ID)
|
|
||||||
|
|
||||||
// 任务计数器校验, Plan的任务全部执行完成后需要插入一个新的PlanAnalysisTask用于触发下一次Plan的执行
|
// 如果是计划分析任务,它的职责是解析和分发任务,到此即完成,不参与后续的计划完成度检查。
|
||||||
if s.progressTracker.IsPlanOver(claimedLog.PlanExecutionLogID) {
|
if claimedLog.Task.Type == models.TaskPlanAnalysis {
|
||||||
// --- 新增逻辑:更新计划执行次数并判断是否需要触发下一次执行 ---
|
s.logger.Warnf("完成计划分析任务, 日志ID: %d", claimedLog.ID)
|
||||||
var planID uint
|
return
|
||||||
// 根据任务类型获取正确的 PlanID
|
|
||||||
if claimedLog.Task.Type == models.TaskPlanAnalysis {
|
|
||||||
var params struct {
|
|
||||||
PlanID uint `json:"plan_id"`
|
|
||||||
}
|
|
||||||
if err := json.Unmarshal(claimedLog.Task.Parameters, ¶ms); err != nil {
|
|
||||||
s.logger.Errorf("解析任务参数中的计划ID失败,日志ID: %d, 错误: %v", claimedLog.ID, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
planID = params.PlanID
|
|
||||||
} else {
|
|
||||||
planID = claimedLog.Task.PlanID
|
|
||||||
}
|
|
||||||
|
|
||||||
// 获取计划的最新数据
|
|
||||||
plan, err := s.planRepo.GetPlanByID(planID) // Changed to GetPlanByID to include sub-plans
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Errorf("获取计划 %d 的完整信息失败: %v", planID, err) // Updated error message
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// 更新计划的执行计数器
|
|
||||||
plan.ExecuteCount++
|
|
||||||
|
|
||||||
// 如果是自动计划且达到执行次数上限,则更新计划状态为已停止
|
|
||||||
if (plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteNum > 0 && plan.ExecuteCount >= plan.ExecuteNum) || plan.ExecutionType == models.PlanExecutionTypeManual {
|
|
||||||
plan.Status = models.PlanStatusStopeed
|
|
||||||
s.logger.Infof("计划 %d (自动执行) 已达到最大执行次数 %d,状态更新为 '执行完毕'。", planID, plan.ExecuteNum)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 保存更新后的计划状态和执行计数
|
|
||||||
if err := s.planRepo.UpdatePlan(plan); err != nil { // UpdatePlan 可以更新整个 Plan 对象
|
|
||||||
s.logger.Errorf("更新计划 %d 的执行计数和状态失败: %v", planID, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// 更新计划执行日志状态为完成
|
|
||||||
if err := s.executionLogRepo.UpdatePlanExecutionLogStatus(claimedLog.PlanExecutionLogID, models.ExecutionStatusCompleted); err != nil {
|
|
||||||
s.logger.Errorf("更新计划执行日志 %d 状态为 '完成' 失败: %v", claimedLog.PlanExecutionLogID, err)
|
|
||||||
// 这是一个非阻塞性错误,不中断后续流程
|
|
||||||
}
|
|
||||||
|
|
||||||
// 调用共享的 Manager 来处理触发器更新逻辑 (Manager 会根据最新的 Plan 状态决定是否创建新触发器)
|
|
||||||
if err := s.analysisPlanTaskManager.CreateOrUpdateTrigger(planID); err != nil {
|
|
||||||
s.logger.Errorf("为计划 %d 创建/更新触发器失败: %v", planID, err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- 以下是常规任务的完成逻辑 ---
|
||||||
|
s.logger.Warnf("完成任务, 日志ID: %d", claimedLog.ID)
|
||||||
|
|
||||||
|
// 检查是否是最后一个任务
|
||||||
|
incompleteCount, err := s.executionLogRepo.CountIncompleteTasksByPlanLogID(claimedLog.PlanExecutionLogID)
|
||||||
|
if err != nil {
|
||||||
|
s.logger.Errorf("检查计划 %d 的未完成任务数时出错: %v", claimedLog.PlanExecutionLogID, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果此计划执行中,未完成的任务只剩下当前这一个(因为当前任务的状态此时在数据库中仍为 'started'),
|
||||||
|
// 则认为整个计划已完成。
|
||||||
|
if incompleteCount == 1 {
|
||||||
|
s.handlePlanCompletion(claimedLog.PlanExecutionLogID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// runTask 用于执行具体任务
|
// runTask 用于执行具体任务
|
||||||
@@ -402,8 +346,12 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// 将Task列表加入待执行队列中
|
// --- 处理空计划的边缘情况 ---
|
||||||
s.progressTracker.AddNewPlan(planLog.ID, len(tasks))
|
// 如果一个计划被解析后,发现其任务列表为空,
|
||||||
|
// 那么它实际上已经“执行”完毕了,我们需要在这里手动为它创建下一次的触发器。
|
||||||
|
if len(tasks) == 0 {
|
||||||
|
s.handlePlanCompletion(planLog.ID)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -419,3 +367,82 @@ func (s *Scheduler) updateTaskExecutionLogStatus(claimedLog *models.TaskExecutio
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handlePlanTermination 集中处理计划的终止逻辑(失败或取消)
|
||||||
|
func (s *Scheduler) handlePlanTermination(planLogID uint, reason string) {
|
||||||
|
// 1. 从待执行队列中删除所有相关的子任务
|
||||||
|
if err := s.pendingTaskRepo.DeletePendingTasksByPlanLogID(planLogID); err != nil {
|
||||||
|
s.logger.Errorf("从待执行队列中删除计划 %d 的后续任务时出错: %v", planLogID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 将父计划的执行日志标记为失败
|
||||||
|
if err := s.executionLogRepo.FailPlanExecution(planLogID, reason); err != nil {
|
||||||
|
s.logger.Errorf("标记计划执行日志 %d 为失败时出错: %v", planLogID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 将所有未完成的子任务日志标记为已取消
|
||||||
|
if err := s.executionLogRepo.CancelIncompleteTasksByPlanLogID(planLogID, "父计划失败或被取消"); err != nil {
|
||||||
|
s.logger.Errorf("取消计划 %d 的后续任务日志时出错: %v", planLogID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. 将计划本身的状态更新为失败
|
||||||
|
planLog, err := s.executionLogRepo.FindPlanExecutionLogByID(planLogID)
|
||||||
|
if err != nil {
|
||||||
|
s.logger.Errorf("无法找到计划执行日志 %d 以更新父计划状态: %v", planLogID, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := s.planRepo.UpdatePlanStatus(planLog.PlanID, models.PlanStatusFailed); err != nil {
|
||||||
|
s.logger.Errorf("更新计划 %d 状态为 '失败' 时出错: %v", planLog.PlanID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handlePlanCompletion 集中处理计划成功完成后的所有逻辑
|
||||||
|
func (s *Scheduler) handlePlanCompletion(planLogID uint) {
|
||||||
|
s.logger.Infof("计划执行 %d 的所有任务已完成,开始处理计划完成逻辑...", planLogID)
|
||||||
|
|
||||||
|
// 1. 通过 PlanExecutionLog 反查正确的顶层 PlanID
|
||||||
|
planExecutionLog, err := s.executionLogRepo.FindPlanExecutionLogByID(planLogID)
|
||||||
|
if err != nil {
|
||||||
|
s.logger.Errorf("获取计划执行日志 %d 失败: %v", planLogID, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
topLevelPlanID := planExecutionLog.PlanID // 这才是正确的顶层计划ID
|
||||||
|
|
||||||
|
// 2. 获取计划的最新数据,这里我们只需要基本信息来判断执行类型和次数
|
||||||
|
plan, err := s.planRepo.GetBasicPlanByID(topLevelPlanID)
|
||||||
|
if err != nil {
|
||||||
|
s.logger.Errorf("获取计划 %d 的基本信息失败: %v", topLevelPlanID, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 在内存中计算新的计数值和状态
|
||||||
|
newExecuteCount := plan.ExecuteCount + 1
|
||||||
|
newStatus := plan.Status // 默认为当前状态
|
||||||
|
|
||||||
|
// 如果是自动计划且达到执行次数上限,或计划是手动类型,则更新计划状态为已停止
|
||||||
|
if (plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteNum > 0 && newExecuteCount >= plan.ExecuteNum) || plan.ExecutionType == models.PlanExecutionTypeManual {
|
||||||
|
newStatus = models.PlanStatusStopeed
|
||||||
|
s.logger.Infof("计划 %d 已完成执行,状态更新为 '执行完毕'。", topLevelPlanID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. 使用专门的方法来原子性地更新计数值和状态
|
||||||
|
if err := s.planRepo.UpdatePlanStateAfterExecution(topLevelPlanID, newExecuteCount, newStatus); err != nil {
|
||||||
|
s.logger.Errorf("更新计划 %d 的执行后状态失败: %v", topLevelPlanID, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. 更新计划执行日志状态为完成
|
||||||
|
if err := s.executionLogRepo.UpdatePlanExecutionLogStatus(planLogID, models.ExecutionStatusCompleted); err != nil {
|
||||||
|
s.logger.Errorf("更新计划执行日志 %d 状态为 '完成' 失败: %v", planLogID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 6. 调用共享的 Manager 来处理触发器更新逻辑
|
||||||
|
// 只有当计划在本次执行后仍然是 Enabled 状态时,才需要创建下一次的触发器。
|
||||||
|
if newStatus == models.PlanStatusEnabled {
|
||||||
|
if err := s.analysisPlanTaskManager.CreateOrUpdateTrigger(topLevelPlanID); err != nil {
|
||||||
|
s.logger.Errorf("为计划 %d 创建/更新触发器失败: %v", topLevelPlanID, err)
|
||||||
|
} else {
|
||||||
|
s.logger.Infof("计划 %d 状态为 '%d',无需创建下一次触发器。", topLevelPlanID, newStatus)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -191,44 +191,17 @@ func (app *Application) initializePendingTasks(
|
|||||||
|
|
||||||
// 阶段二:清理所有待执行任务和相关日志
|
// 阶段二:清理所有待执行任务和相关日志
|
||||||
logger.Info("阶段二:开始清理所有待执行任务和相关日志...")
|
logger.Info("阶段二:开始清理所有待执行任务和相关日志...")
|
||||||
pendingTasks, err := pendingTaskRepo.FindAllPendingTasks()
|
|
||||||
if err != nil {
|
// 直接调用新的方法来更新计划执行日志状态为失败
|
||||||
return fmt.Errorf("获取待执行任务失败: %w", err)
|
if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(); err != nil {
|
||||||
|
logger.Errorf("更新所有未完成计划执行日志状态为失败失败: %v", err)
|
||||||
|
// 这是一个非阻塞性错误,继续执行
|
||||||
}
|
}
|
||||||
|
|
||||||
var taskLogIDsToCancel []uint
|
// 直接调用新的方法来更新任务执行日志状态为取消
|
||||||
var planLogIDsToFail []uint
|
if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(); err != nil {
|
||||||
|
logger.Errorf("更新所有未完成任务执行日志状态为取消失败: %v", err)
|
||||||
for _, pt := range pendingTasks {
|
// 这是一个非阻塞性错误,继续执行
|
||||||
// 确保 Task 和 TaskExecutionLog 已预加载
|
|
||||||
if pt.Task == nil || pt.TaskExecutionLog.ID == 0 { // TaskExecutionLog.ID为零说明没加载
|
|
||||||
logger.Warnf("待执行任务 %d 缺少关联的 Task 或 TaskExecutionLog,跳过处理。", pt.ID)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// 收集任务执行日志ID,所有未完成的任务都标记为取消
|
|
||||||
taskLogIDsToCancel = append(taskLogIDsToCancel, pt.TaskExecutionLog.ID)
|
|
||||||
|
|
||||||
// 收集计划执行日志ID
|
|
||||||
if pt.TaskExecutionLog.PlanExecutionLogID != 0 {
|
|
||||||
planLogIDsToFail = append(planLogIDsToFail, pt.TaskExecutionLog.PlanExecutionLogID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 批量更新 TaskExecutionLog 状态为取消
|
|
||||||
if len(taskLogIDsToCancel) > 0 {
|
|
||||||
if err := executionLogRepo.UpdateLogStatusByIDs(taskLogIDsToCancel, models.ExecutionStatusCancelled); err != nil {
|
|
||||||
logger.Errorf("批量更新任务执行日志状态为取消失败: %v", err)
|
|
||||||
// 这是一个非阻塞性错误,继续执行
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 批量更新 PlanExecutionLog 状态为失败
|
|
||||||
if len(planLogIDsToFail) > 0 {
|
|
||||||
if err := executionLogRepo.UpdatePlanExecutionLogsStatusByIDs(planLogIDsToFail, models.ExecutionStatusFailed); err != nil {
|
|
||||||
logger.Errorf("批量更新计划执行日志状态为失败失败: %v", err)
|
|
||||||
// 这是一个非阻塞性错误,继续执行
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 清空待执行列表
|
// 清空待执行列表
|
||||||
|
|||||||
@@ -44,6 +44,7 @@ const (
|
|||||||
PlanStatusDisabled PlanStatus = 0 // 禁用计划
|
PlanStatusDisabled PlanStatus = 0 // 禁用计划
|
||||||
PlanStatusEnabled PlanStatus = 1 // 启用计划
|
PlanStatusEnabled PlanStatus = 1 // 启用计划
|
||||||
PlanStatusStopeed PlanStatus = 2 // 执行完毕
|
PlanStatusStopeed PlanStatus = 2 // 执行完毕
|
||||||
|
PlanStatusFailed PlanStatus = 3 // 执行失败
|
||||||
)
|
)
|
||||||
|
|
||||||
// Plan 代表系统中的一个计划,可以包含子计划或任务
|
// Plan 代表系统中的一个计划,可以包含子计划或任务
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package repository
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
@@ -10,8 +11,8 @@ import (
|
|||||||
// ExecutionLogRepository 定义了与执行日志交互的接口。
|
// ExecutionLogRepository 定义了与执行日志交互的接口。
|
||||||
// 这为服务层提供了一个清晰的契约,并允许在测试中轻松地进行模拟。
|
// 这为服务层提供了一个清晰的契约,并允许在测试中轻松地进行模拟。
|
||||||
type ExecutionLogRepository interface {
|
type ExecutionLogRepository interface {
|
||||||
UpdateLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error
|
UpdateTaskExecutionLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error
|
||||||
UpdateLogStatus(logID uint, status models.ExecutionStatus) error
|
UpdateTaskExecutionLogStatus(logID uint, status models.ExecutionStatus) error
|
||||||
CreateTaskExecutionLog(log *models.TaskExecutionLog) error
|
CreateTaskExecutionLog(log *models.TaskExecutionLog) error
|
||||||
CreatePlanExecutionLog(log *models.PlanExecutionLog) error
|
CreatePlanExecutionLog(log *models.PlanExecutionLog) error
|
||||||
UpdatePlanExecutionLog(log *models.PlanExecutionLog) error
|
UpdatePlanExecutionLog(log *models.PlanExecutionLog) error
|
||||||
@@ -30,6 +31,23 @@ type ExecutionLogRepository interface {
|
|||||||
FindInProgressPlanExecutionLogByPlanID(planID uint) (*models.PlanExecutionLog, error)
|
FindInProgressPlanExecutionLogByPlanID(planID uint) (*models.PlanExecutionLog, error)
|
||||||
// FindIncompleteTaskExecutionLogsByPlanLogID 根据计划日志ID查找所有未完成的任务日志
|
// FindIncompleteTaskExecutionLogsByPlanLogID 根据计划日志ID查找所有未完成的任务日志
|
||||||
FindIncompleteTaskExecutionLogsByPlanLogID(planLogID uint) ([]models.TaskExecutionLog, error)
|
FindIncompleteTaskExecutionLogsByPlanLogID(planLogID uint) ([]models.TaskExecutionLog, error)
|
||||||
|
|
||||||
|
// FailAllIncompletePlanExecutionLogs 将所有状态为 ExecutionStatusStarted 和 ExecutionStatusWaiting 的计划状态都修改为 ExecutionStatusFailed
|
||||||
|
FailAllIncompletePlanExecutionLogs() error
|
||||||
|
// CancelAllIncompleteTaskExecutionLogs 将所有状态为 ExecutionStatusStarted 和 ExecutionStatusWaiting 的任务状态修改为 ExecutionStatusCancelled
|
||||||
|
CancelAllIncompleteTaskExecutionLogs() error
|
||||||
|
|
||||||
|
// FindPlanExecutionLogByID 根据ID查找计划执行日志
|
||||||
|
FindPlanExecutionLogByID(id uint) (*models.PlanExecutionLog, error)
|
||||||
|
|
||||||
|
// CountIncompleteTasksByPlanLogID 计算一个计划执行中未完成的任务数量
|
||||||
|
CountIncompleteTasksByPlanLogID(planLogID uint) (int64, error)
|
||||||
|
|
||||||
|
// FailPlanExecution 将指定的计划执行标记为失败
|
||||||
|
FailPlanExecution(planLogID uint, errorMessage string) error
|
||||||
|
|
||||||
|
// CancelIncompleteTasksByPlanLogID 取消一个计划执行中的所有未完成任务
|
||||||
|
CancelIncompleteTasksByPlanLogID(planLogID uint, reason string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// gormExecutionLogRepository 是使用 GORM 的具体实现。
|
// gormExecutionLogRepository 是使用 GORM 的具体实现。
|
||||||
@@ -43,7 +61,7 @@ func NewGormExecutionLogRepository(db *gorm.DB) ExecutionLogRepository {
|
|||||||
return &gormExecutionLogRepository{db: db}
|
return &gormExecutionLogRepository{db: db}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *gormExecutionLogRepository) UpdateLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error {
|
func (r *gormExecutionLogRepository) UpdateTaskExecutionLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error {
|
||||||
if len(logIDs) == 0 {
|
if len(logIDs) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -52,7 +70,7 @@ func (r *gormExecutionLogRepository) UpdateLogStatusByIDs(logIDs []uint, status
|
|||||||
Update("status", status).Error
|
Update("status", status).Error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *gormExecutionLogRepository) UpdateLogStatus(logID uint, status models.ExecutionStatus) error {
|
func (r *gormExecutionLogRepository) UpdateTaskExecutionLogStatus(logID uint, status models.ExecutionStatus) error {
|
||||||
return r.db.Model(&models.TaskExecutionLog{}).Where("id = ?", logID).Update("status", status).Error
|
return r.db.Model(&models.TaskExecutionLog{}).Where("id = ?", logID).Update("status", status).Error
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -117,7 +135,7 @@ func (r *gormExecutionLogRepository) UpdatePlanExecutionLogsStatusByIDs(logIDs [
|
|||||||
// FindIncompletePlanExecutionLogs 查找所有未完成的计划执行日志
|
// FindIncompletePlanExecutionLogs 查找所有未完成的计划执行日志
|
||||||
func (r *gormExecutionLogRepository) FindIncompletePlanExecutionLogs() ([]models.PlanExecutionLog, error) {
|
func (r *gormExecutionLogRepository) FindIncompletePlanExecutionLogs() ([]models.PlanExecutionLog, error) {
|
||||||
var logs []models.PlanExecutionLog
|
var logs []models.PlanExecutionLog
|
||||||
err := r.db.Where("status = ?", models.ExecutionStatusStarted).Find(&logs).Error
|
err := r.db.Where("status = ? OR status = ?", models.ExecutionStatusStarted, models.ExecutionStatusWaiting).Find(&logs).Error
|
||||||
return logs, err
|
return logs, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -143,3 +161,60 @@ func (r *gormExecutionLogRepository) FindIncompleteTaskExecutionLogsByPlanLogID(
|
|||||||
planLogID, models.ExecutionStatusWaiting, models.ExecutionStatusStarted).Find(&logs).Error
|
planLogID, models.ExecutionStatusWaiting, models.ExecutionStatusStarted).Find(&logs).Error
|
||||||
return logs, err
|
return logs, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FailAllIncompletePlanExecutionLogs 将所有状态为 ExecutionStatusStarted 和 ExecutionStatusWaiting 的计划状态都修改为 ExecutionStatusFailed
|
||||||
|
func (r *gormExecutionLogRepository) FailAllIncompletePlanExecutionLogs() error {
|
||||||
|
return r.db.Model(&models.PlanExecutionLog{}).
|
||||||
|
Where("status IN (?, ?)", models.ExecutionStatusStarted, models.ExecutionStatusWaiting).
|
||||||
|
Updates(map[string]interface{}{"status": models.ExecutionStatusFailed, "ended_at": time.Now(), "error": "系统中断"}).Error
|
||||||
|
}
|
||||||
|
|
||||||
|
// CancelAllIncompleteTaskExecutionLogs 将所有状态为 ExecutionStatusStarted 和 ExecutionStatusWaiting 的任务状态修改为 ExecutionStatusCancelled
|
||||||
|
func (r *gormExecutionLogRepository) CancelAllIncompleteTaskExecutionLogs() error {
|
||||||
|
return r.db.Model(&models.TaskExecutionLog{}).
|
||||||
|
Where("status IN (?, ?)", models.ExecutionStatusStarted, models.ExecutionStatusWaiting).
|
||||||
|
Updates(map[string]interface{}{"status": models.ExecutionStatusCancelled, "ended_at": time.Now(), "output": "系统中断"}).Error
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindPlanExecutionLogByID 根据ID查找计划执行日志
|
||||||
|
func (r *gormExecutionLogRepository) FindPlanExecutionLogByID(id uint) (*models.PlanExecutionLog, error) {
|
||||||
|
var log models.PlanExecutionLog
|
||||||
|
err := r.db.First(&log, id).Error
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &log, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CountIncompleteTasksByPlanLogID 计算一个计划执行中未完成的任务数量
|
||||||
|
func (r *gormExecutionLogRepository) CountIncompleteTasksByPlanLogID(planLogID uint) (int64, error) {
|
||||||
|
var count int64
|
||||||
|
err := r.db.Model(&models.TaskExecutionLog{}).
|
||||||
|
Where("plan_execution_log_id = ? AND status IN (?, ?)",
|
||||||
|
planLogID, models.ExecutionStatusWaiting, models.ExecutionStatusStarted).
|
||||||
|
Count(&count).Error
|
||||||
|
return count, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// FailPlanExecution 将指定的计划执行标记为失败
|
||||||
|
func (r *gormExecutionLogRepository) FailPlanExecution(planLogID uint, errorMessage string) error {
|
||||||
|
return r.db.Model(&models.PlanExecutionLog{}).
|
||||||
|
Where("id = ?", planLogID).
|
||||||
|
Updates(map[string]interface{}{
|
||||||
|
"status": models.ExecutionStatusFailed,
|
||||||
|
"error": errorMessage,
|
||||||
|
"ended_at": time.Now(),
|
||||||
|
}).Error
|
||||||
|
}
|
||||||
|
|
||||||
|
// CancelIncompleteTasksByPlanLogID 取消一个计划执行中的所有未完成任务
|
||||||
|
func (r *gormExecutionLogRepository) CancelIncompleteTasksByPlanLogID(planLogID uint, reason string) error {
|
||||||
|
return r.db.Model(&models.TaskExecutionLog{}).
|
||||||
|
Where("plan_execution_log_id = ? AND status IN (?, ?)",
|
||||||
|
planLogID, models.ExecutionStatusWaiting, models.ExecutionStatusStarted).
|
||||||
|
Updates(map[string]interface{}{
|
||||||
|
"status": models.ExecutionStatusCancelled,
|
||||||
|
"output": reason,
|
||||||
|
"ended_at": time.Now(),
|
||||||
|
}).Error
|
||||||
|
}
|
||||||
|
|||||||
@@ -31,6 +31,9 @@ type PendingTaskRepository interface {
|
|||||||
RequeueTask(originalPendingTask *models.PendingTask) error
|
RequeueTask(originalPendingTask *models.PendingTask) error
|
||||||
// FindPendingTasksByTaskLogIDs 根据 TaskExecutionLogID 列表查找对应的待执行任务
|
// FindPendingTasksByTaskLogIDs 根据 TaskExecutionLogID 列表查找对应的待执行任务
|
||||||
FindPendingTasksByTaskLogIDs(taskLogIDs []uint) ([]models.PendingTask, error)
|
FindPendingTasksByTaskLogIDs(taskLogIDs []uint) ([]models.PendingTask, error)
|
||||||
|
|
||||||
|
// DeletePendingTasksByPlanLogID 删除与指定计划执行日志ID相关的所有待执行任务
|
||||||
|
DeletePendingTasksByPlanLogID(planLogID uint) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// gormPendingTaskRepository 是使用 GORM 的具体实现。
|
// gormPendingTaskRepository 是使用 GORM 的具体实现。
|
||||||
@@ -164,3 +167,12 @@ func (r *gormPendingTaskRepository) FindPendingTasksByTaskLogIDs(taskLogIDs []ui
|
|||||||
err := r.db.Where("task_execution_log_id IN ?", taskLogIDs).Find(&pendingTasks).Error
|
err := r.db.Where("task_execution_log_id IN ?", taskLogIDs).Find(&pendingTasks).Error
|
||||||
return pendingTasks, err
|
return pendingTasks, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeletePendingTasksByPlanLogID 删除与指定计划执行日志ID相关的所有待执行任务
|
||||||
|
func (r *gormPendingTaskRepository) DeletePendingTasksByPlanLogID(planLogID uint) error {
|
||||||
|
// 使用子查询找到所有与 planLogID 相关的 task_execution_log_id
|
||||||
|
subQuery := r.db.Model(&models.TaskExecutionLog{}).Select("id").Where("plan_execution_log_id = ?", planLogID)
|
||||||
|
|
||||||
|
// 使用子查询的结果来删除待执行任务
|
||||||
|
return r.db.Where("task_execution_log_id IN (?)", subQuery).Delete(&models.PendingTask{}).Error
|
||||||
|
}
|
||||||
|
|||||||
@@ -46,8 +46,8 @@ type PlanRepository interface {
|
|||||||
FindPlanAnalysisTaskByParamsPlanID(paramsPlanID uint) (*models.Task, error)
|
FindPlanAnalysisTaskByParamsPlanID(paramsPlanID uint) (*models.Task, error)
|
||||||
// FindRunnablePlans 获取所有应执行的计划
|
// FindRunnablePlans 获取所有应执行的计划
|
||||||
FindRunnablePlans() ([]*models.Plan, error)
|
FindRunnablePlans() ([]*models.Plan, error)
|
||||||
// FindDisabledAndStoppedPlans 获取所有已禁用或已停止的计划
|
// FindInactivePlans 获取所有已禁用或已停止的计划
|
||||||
FindDisabledAndStoppedPlans() ([]*models.Plan, error)
|
FindInactivePlans() ([]*models.Plan, error)
|
||||||
// FindPlanAnalysisTaskByPlanID 根据 PlanID 找到其关联的 'plan_analysis' 任务
|
// FindPlanAnalysisTaskByPlanID 根据 PlanID 找到其关联的 'plan_analysis' 任务
|
||||||
FindPlanAnalysisTaskByPlanID(planID uint) (*models.Task, error)
|
FindPlanAnalysisTaskByPlanID(planID uint) (*models.Task, error)
|
||||||
|
|
||||||
@@ -62,6 +62,9 @@ type PlanRepository interface {
|
|||||||
|
|
||||||
// StopPlanTransactionally 停止一个计划的执行,包括更新状态、移除待执行任务和更新执行日志
|
// StopPlanTransactionally 停止一个计划的执行,包括更新状态、移除待执行任务和更新执行日志
|
||||||
StopPlanTransactionally(planID uint) error
|
StopPlanTransactionally(planID uint) error
|
||||||
|
|
||||||
|
// UpdatePlanStateAfterExecution 更新计划执行后的状态(计数和状态)
|
||||||
|
UpdatePlanStateAfterExecution(planID uint, newCount uint, newStatus models.PlanStatus) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// gormPlanRepository 是 PlanRepository 的 GORM 实现
|
// gormPlanRepository 是 PlanRepository 的 GORM 实现
|
||||||
@@ -600,10 +603,10 @@ func (r *gormPlanRepository) FindRunnablePlans() ([]*models.Plan, error) {
|
|||||||
return plans, err
|
return plans, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *gormPlanRepository) FindDisabledAndStoppedPlans() ([]*models.Plan, error) {
|
func (r *gormPlanRepository) FindInactivePlans() ([]*models.Plan, error) {
|
||||||
var plans []*models.Plan
|
var plans []*models.Plan
|
||||||
err := r.db.
|
err := r.db.
|
||||||
Where("status = ? OR status = ?", models.PlanStatusDisabled, models.PlanStatusStopeed).
|
Where("status != ?", models.PlanStatusEnabled).
|
||||||
Find(&plans).Error
|
Find(&plans).Error
|
||||||
return plans, err
|
return plans, err
|
||||||
}
|
}
|
||||||
@@ -694,7 +697,7 @@ func (r *gormPlanRepository) StopPlanTransactionally(planID uint) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 3.1 批量更新任务执行日志状态为“已取消”
|
// 3.1 批量更新任务执行日志状态为“已取消”
|
||||||
if err := executionLogRepoTx.UpdateLogStatusByIDs(taskLogIDs, models.ExecutionStatusCancelled); err != nil {
|
if err := executionLogRepoTx.UpdateTaskExecutionLogStatusByIDs(taskLogIDs, models.ExecutionStatusCancelled); err != nil {
|
||||||
return fmt.Errorf("批量更新任务执行日志状态为 '已取消' 失败: %w", err)
|
return fmt.Errorf("批量更新任务执行日志状态为 '已取消' 失败: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -735,3 +738,10 @@ func (r *gormPlanRepository) UpdatePlanStatus(id uint, status models.PlanStatus)
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *gormPlanRepository) UpdatePlanStateAfterExecution(planID uint, newCount uint, newStatus models.PlanStatus) error {
|
||||||
|
return r.db.Model(&models.Plan{}).Where("id = ?", planID).Updates(map[string]interface{}{
|
||||||
|
"execute_count": newCount,
|
||||||
|
"status": newStatus,
|
||||||
|
}).Error
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user