issue_10 #12

Merged
huang merged 11 commits from issue_10 into main 2025-09-23 21:35:54 +08:00
13 changed files with 289 additions and 233 deletions

View File

@@ -741,22 +741,26 @@ const docTemplate = `{
"enum": [
0,
1,
2
2,
3
],
"x-enum-comments": {
"PlanStatusDisabled": "禁用计划",
"PlanStatusEnabled": "启用计划",
"PlanStatusFailed": "执行失败",
"PlanStatusStopeed": "执行完毕"
},
"x-enum-descriptions": [
"启用计划",
"禁用计划",
"执行完毕"
"启用计划",
"执行完毕",
"执行失败"
],
"x-enum-varnames": [
"PlanStatusEnabled",
"PlanStatusDisabled",
"PlanStatusStopeed"
"PlanStatusEnabled",
"PlanStatusStopeed",
"PlanStatusFailed"
]
},
"models.TaskType": {
@@ -781,19 +785,10 @@ const docTemplate = `{
"plan.CreatePlanRequest": {
"type": "object",
"required": [
"content_type",
"execution_type",
"name"
],
"properties": {
"content_type": {
"allOf": [
{
"$ref": "#/definitions/models.PlanContentType"
}
],
"example": "tasks"
},
"cron_expression": {
"type": "string",
"example": "0 0 6 * * *"
@@ -1005,14 +1000,6 @@ const docTemplate = `{
"plan.UpdatePlanRequest": {
"type": "object",
"properties": {
"content_type": {
"allOf": [
{
"$ref": "#/definitions/models.PlanContentType"
}
],
"example": "tasks"
},
"cron_expression": {
"type": "string",
"example": "0 0 6 * * *"

View File

@@ -730,22 +730,26 @@
"enum": [
0,
1,
2
2,
3
],
"x-enum-comments": {
"PlanStatusDisabled": "禁用计划",
"PlanStatusEnabled": "启用计划",
"PlanStatusFailed": "执行失败",
"PlanStatusStopeed": "执行完毕"
},
"x-enum-descriptions": [
"启用计划",
"禁用计划",
"执行完毕"
"启用计划",
"执行完毕",
"执行失败"
],
"x-enum-varnames": [
"PlanStatusEnabled",
"PlanStatusDisabled",
"PlanStatusStopeed"
"PlanStatusEnabled",
"PlanStatusStopeed",
"PlanStatusFailed"
]
},
"models.TaskType": {
@@ -770,19 +774,10 @@
"plan.CreatePlanRequest": {
"type": "object",
"required": [
"content_type",
"execution_type",
"name"
],
"properties": {
"content_type": {
"allOf": [
{
"$ref": "#/definitions/models.PlanContentType"
}
],
"example": "tasks"
},
"cron_expression": {
"type": "string",
"example": "0 0 6 * * *"
@@ -994,14 +989,6 @@
"plan.UpdatePlanRequest": {
"type": "object",
"properties": {
"content_type": {
"allOf": [
{
"$ref": "#/definitions/models.PlanContentType"
}
],
"example": "tasks"
},
"cron_expression": {
"type": "string",
"example": "0 0 6 * * *"

View File

@@ -129,20 +129,24 @@ definitions:
- 0
- 1
- 2
- 3
format: int32
type: integer
x-enum-comments:
PlanStatusDisabled: 禁用计划
PlanStatusEnabled: 启用计划
PlanStatusFailed: 执行失败
PlanStatusStopeed: 执行完毕
x-enum-descriptions:
- 启用计划
- 禁用计划
- 启用计划
- 执行完毕
- 执行失败
x-enum-varnames:
- PlanStatusEnabled
- PlanStatusDisabled
- PlanStatusEnabled
- PlanStatusStopeed
- PlanStatusFailed
models.TaskType:
enum:
- plan_analysis
@@ -159,10 +163,6 @@ definitions:
- TaskTypeWaiting
plan.CreatePlanRequest:
properties:
content_type:
allOf:
- $ref: '#/definitions/models.PlanContentType'
example: tasks
cron_expression:
example: 0 0 6 * * *
type: string
@@ -188,7 +188,6 @@ definitions:
$ref: '#/definitions/plan.TaskRequest'
type: array
required:
- content_type
- execution_type
- name
type: object
@@ -306,10 +305,6 @@ definitions:
type: object
plan.UpdatePlanRequest:
properties:
content_type:
allOf:
- $ref: '#/definitions/models.PlanContentType'
example: tasks
cron_expression:
example: 0 0 6 * * *
type: string

View File

@@ -64,13 +64,14 @@ func PlanFromCreateRequest(req *CreatePlanRequest) (*models.Plan, error) {
ExecutionType: req.ExecutionType,
ExecuteNum: req.ExecuteNum,
CronExpression: req.CronExpression,
ContentType: req.ContentType,
// ContentType 在控制器中设置,此处不再处理
}
// 处理子计划 (通过ID引用)
if req.ContentType == models.PlanContentTypeSubPlans && req.SubPlanIDs != nil {
plan.SubPlans = make([]models.SubPlan, len(req.SubPlanIDs))
for i, childPlanID := range req.SubPlanIDs {
if req.SubPlanIDs != nil {
subPlanSlice := req.SubPlanIDs
plan.SubPlans = make([]models.SubPlan, len(subPlanSlice))
for i, childPlanID := range subPlanSlice {
plan.SubPlans[i] = models.SubPlan{
ChildPlanID: childPlanID,
ExecutionOrder: i, // 默认执行顺序, ReorderSteps会再次确认
@@ -79,9 +80,10 @@ func PlanFromCreateRequest(req *CreatePlanRequest) (*models.Plan, error) {
}
// 处理任务
if req.ContentType == models.PlanContentTypeTasks && req.Tasks != nil {
plan.Tasks = make([]models.Task, len(req.Tasks))
for i, taskReq := range req.Tasks {
if req.Tasks != nil {
taskSlice := req.Tasks
plan.Tasks = make([]models.Task, len(taskSlice))
for i, taskReq := range taskSlice {
task, err := TaskFromRequest(&taskReq)
if err != nil {
return nil, err
@@ -114,13 +116,14 @@ func PlanFromUpdateRequest(req *UpdatePlanRequest) (*models.Plan, error) {
ExecutionType: req.ExecutionType,
ExecuteNum: req.ExecuteNum,
CronExpression: req.CronExpression,
ContentType: req.ContentType,
// ContentType 在控制器中设置,此处不再处理
}
// 处理子计划 (通过ID引用)
if req.ContentType == models.PlanContentTypeSubPlans && req.SubPlanIDs != nil {
plan.SubPlans = make([]models.SubPlan, len(req.SubPlanIDs))
for i, childPlanID := range req.SubPlanIDs {
if req.SubPlanIDs != nil {
subPlanSlice := req.SubPlanIDs
plan.SubPlans = make([]models.SubPlan, len(subPlanSlice))
for i, childPlanID := range subPlanSlice {
plan.SubPlans[i] = models.SubPlan{
ChildPlanID: childPlanID,
ExecutionOrder: i, // 默认执行顺序, ReorderSteps会再次确认
@@ -129,9 +132,10 @@ func PlanFromUpdateRequest(req *UpdatePlanRequest) (*models.Plan, error) {
}
// 处理任务
if req.ContentType == models.PlanContentTypeTasks && req.Tasks != nil {
plan.Tasks = make([]models.Task, len(req.Tasks))
for i, taskReq := range req.Tasks {
if req.Tasks != nil {
taskSlice := req.Tasks
plan.Tasks = make([]models.Task, len(taskSlice))
for i, taskReq := range taskSlice {
task, err := TaskFromRequest(&taskReq)
if err != nil {
return nil, err

View File

@@ -22,7 +22,6 @@ type CreatePlanRequest struct {
ExecutionType models.PlanExecutionType `json:"execution_type" binding:"required" example:"automatic"`
ExecuteNum uint `json:"execute_num,omitempty" example:"10"`
CronExpression string `json:"cron_expression" example:"0 0 6 * * *"`
ContentType models.PlanContentType `json:"content_type" binding:"required" example:"tasks"`
SubPlanIDs []uint `json:"sub_plan_ids,omitempty"`
Tasks []TaskRequest `json:"tasks,omitempty"`
}
@@ -55,7 +54,6 @@ type UpdatePlanRequest struct {
ExecutionType models.PlanExecutionType `json:"execution_type" example:"automatic"`
ExecuteNum uint `json:"execute_num,omitempty" example:"10"`
CronExpression string `json:"cron_expression" example:"0 0 6 * * *"`
ContentType models.PlanContentType `json:"content_type" example:"tasks"`
SubPlanIDs []uint `json:"sub_plan_ids,omitempty"`
Tasks []TaskRequest `json:"tasks,omitempty"`
}
@@ -132,6 +130,14 @@ func (c *Controller) CreatePlan(ctx *gin.Context) {
return
}
// --- 自动判断 ContentType ---
if len(req.SubPlanIDs) > 0 {
planToCreate.ContentType = models.PlanContentTypeSubPlans
} else {
// 如果 SubPlanIDs 未提供,则默认为 Tasks 类型(即使 Tasks 字段也未提供)
planToCreate.ContentType = models.PlanContentTypeTasks
}
// 调用仓库方法创建计划
if err := c.planRepo.CreatePlan(planToCreate); err != nil {
controller.SendErrorResponse(ctx, controller.CodeBadRequest, "创建计划失败: "+err.Error())
@@ -269,6 +275,14 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) {
}
planToUpdate.ID = uint(id) // 确保ID被设置
// --- 自动判断 ContentType ---
if len(req.SubPlanIDs) > 0 {
planToUpdate.ContentType = models.PlanContentTypeSubPlans
} else {
// 如果 SubPlanIDs 未提供,则默认为 Tasks 类型(即使 Tasks 字段也未提供)
planToUpdate.ContentType = models.PlanContentTypeTasks
}
// 4. 检查计划是否存在
_, err = c.planRepo.GetBasicPlanByID(uint(id))
if err != nil {

View File

@@ -161,7 +161,7 @@ func (m *AnalysisPlanTaskManager) getRefreshData() (runnablePlans []*models.Plan
return
}
invalidPlans, err := m.planRepo.FindDisabledAndStoppedPlans()
invalidPlans, err := m.planRepo.FindInactivePlans()
if err != nil {
m.logger.Errorf("获取失效计划列表失败: %v", err)
return
@@ -215,7 +215,7 @@ func (m *AnalysisPlanTaskManager) cleanupInvalidTasks(invalidPlanIDs []uint, all
}
// 批量更新相关执行日志状态为“已取消”
if err := m.executionLogRepo.UpdateLogStatusByIDs(logsToCancelIDs, models.ExecutionStatusCancelled); err != nil {
if err := m.executionLogRepo.UpdateTaskExecutionLogStatusByIDs(logsToCancelIDs, models.ExecutionStatusCancelled); err != nil {
// 这是一个非关键性错误,只记录日志
m.logger.Warnf("批量更新日志状态为 'Cancelled' 失败: %v", err)
}

View File

@@ -1,29 +0,0 @@
package task
import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
)
// PlanAnalysisTask 用于在任务执行队列中触发一个plan的执行
// 该任务会解析plan生成扁平化的待执行任务表, 并将任务列表插入任务执行队列
// 该任务会预写入plan所有待执行任务的执行日志
// 每个plan执行完毕时 或 创建plan时 都应该重新创建一个 PlanAnalysisTask 以便触发下次plan执行
// 更新plan后应当更新对应 PlanAnalysisTask
type PlanAnalysisTask struct {
}
func (p *PlanAnalysisTask) Execute() error {
//TODO implement me
panic("implement me")
}
func (p *PlanAnalysisTask) ParseParams(logger *logs.Logger, claimedLog *models.TaskExecutionLog) error {
//TODO implement me
panic("implement me")
}
func (p *PlanAnalysisTask) OnFailure(executeErr error) {
//TODO implement me
panic("implement me")
}

View File

@@ -13,49 +13,22 @@ import (
"gorm.io/gorm"
)
// ProgressTracker 在内存中跟踪计划执行状态,包括进度和执行
// ProgressTracker 仅用于在内存中提供计划执行的并发
type ProgressTracker struct {
mu sync.Mutex
cond *sync.Cond // 用于实现阻塞锁
totalTasks map[uint]int // key: planExecutionLogID, value: total tasks
completedTasks map[uint]int // key: planExecutionLogID, value: completed tasks
runningPlans map[uint]bool // key: planExecutionLogID, value: true (用作内存锁)
mu sync.Mutex
cond *sync.Cond // 用于实现阻塞锁
runningPlans map[uint]bool // key: planExecutionLogID, value: true (用作内存锁)
}
// NewProgressTracker 创建一个新的进度跟踪器
func NewProgressTracker() *ProgressTracker {
t := &ProgressTracker{
totalTasks: make(map[uint]int),
completedTasks: make(map[uint]int),
runningPlans: make(map[uint]bool),
runningPlans: make(map[uint]bool),
}
t.cond = sync.NewCond(&t.mu)
return t
}
// AddNewPlan 添加一个新的计划,并初始化进度跟踪器
func (t *ProgressTracker) AddNewPlan(planLogID uint, totalTasks int) {
t.mu.Lock()
t.totalTasks[planLogID] = totalTasks
t.completedTasks[planLogID] = 0
t.mu.Unlock()
}
// CompletedTask 通知计数器一个任务被完成
func (t *ProgressTracker) CompletedTask(planLogID uint) {
t.mu.Lock()
t.completedTasks[planLogID]++
t.mu.Unlock()
}
// IsPlanOver 检查计划是否完成
func (t *ProgressTracker) IsPlanOver(planLogID uint) bool {
t.mu.Lock()
defer t.mu.Unlock()
return t.completedTasks[planLogID] >= t.totalTasks[planLogID]
}
// TryLock (非阻塞) 尝试锁定一个计划。如果计划未被锁定,则锁定并返回 true。
func (t *ProgressTracker) TryLock(planLogID uint) bool {
t.mu.Lock()
@@ -248,62 +221,33 @@ func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) {
if err != nil {
claimedLog.Status = models.ExecutionStatusFailed
claimedLog.Output = err.Error()
// 任务失败时,调用统一的终止服务
s.handlePlanTermination(claimedLog.PlanExecutionLogID, "子任务执行失败: "+err.Error())
return
}
s.logger.Warnf("完成任务, 日志ID: %d", claimedLog.ID)
// 任务计数器校验, Plan的任务全部执行完成后需要插入一个新的PlanAnalysisTask用于触发下一次Plan的执行
if s.progressTracker.IsPlanOver(claimedLog.PlanExecutionLogID) {
// --- 新增逻辑:更新计划执行次数并判断是否需要触发下一次执行 ---
var planID uint
// 根据任务类型获取正确的 PlanID
if claimedLog.Task.Type == models.TaskPlanAnalysis {
var params struct {
PlanID uint `json:"plan_id"`
}
if err := json.Unmarshal(claimedLog.Task.Parameters, &params); err != nil {
s.logger.Errorf("解析任务参数中的计划ID失败日志ID: %d, 错误: %v", claimedLog.ID, err)
return
}
planID = params.PlanID
} else {
planID = claimedLog.Task.PlanID
}
// 获取计划的最新数据
plan, err := s.planRepo.GetPlanByID(planID) // Changed to GetPlanByID to include sub-plans
if err != nil {
s.logger.Errorf("获取计划 %d 的完整信息失败: %v", planID, err) // Updated error message
return
}
// 更新计划的执行计数器
plan.ExecuteCount++
// 如果是自动计划且达到执行次数上限,则更新计划状态为已停止
if (plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteNum > 0 && plan.ExecuteCount >= plan.ExecuteNum) || plan.ExecutionType == models.PlanExecutionTypeManual {
plan.Status = models.PlanStatusStopeed
s.logger.Infof("计划 %d (自动执行) 已达到最大执行次数 %d状态更新为 '执行完毕'。", planID, plan.ExecuteNum)
}
// 保存更新后的计划状态和执行计数
if err := s.planRepo.UpdatePlan(plan); err != nil { // UpdatePlan 可以更新整个 Plan 对象
s.logger.Errorf("更新计划 %d 的执行计数和状态失败: %v", planID, err)
return
}
// 更新计划执行日志状态为完成
if err := s.executionLogRepo.UpdatePlanExecutionLogStatus(claimedLog.PlanExecutionLogID, models.ExecutionStatusCompleted); err != nil {
s.logger.Errorf("更新计划执行日志 %d 状态为 '完成' 失败: %v", claimedLog.PlanExecutionLogID, err)
// 这是一个非阻塞性错误,不中断后续流程
}
// 调用共享的 Manager 来处理触发器更新逻辑 (Manager 会根据最新的 Plan 状态决定是否创建新触发器)
if err := s.analysisPlanTaskManager.CreateOrUpdateTrigger(planID); err != nil {
s.logger.Errorf("为计划 %d 创建/更新触发器失败: %v", planID, err)
}
// 如果是计划分析任务,它的职责是解析和分发任务,到此即完成,不参与后续的计划完成度检查。
if claimedLog.Task.Type == models.TaskPlanAnalysis {
s.logger.Warnf("完成计划分析任务, 日志ID: %d", claimedLog.ID)
return
}
// --- 以下是常规任务的完成逻辑 ---
s.logger.Warnf("完成任务, 日志ID: %d", claimedLog.ID)
// 检查是否是最后一个任务
incompleteCount, err := s.executionLogRepo.CountIncompleteTasksByPlanLogID(claimedLog.PlanExecutionLogID)
if err != nil {
s.logger.Errorf("检查计划 %d 的未完成任务数时出错: %v", claimedLog.PlanExecutionLogID, err)
return
}
// 如果此计划执行中,未完成的任务只剩下当前这一个(因为当前任务的状态此时在数据库中仍为 'started'
// 则认为整个计划已完成。
if incompleteCount == 1 {
s.handlePlanCompletion(claimedLog.PlanExecutionLogID)
}
}
// runTask 用于执行具体任务
@@ -402,8 +346,12 @@ func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
return err
}
// 将Task列表加入待执行队列中
s.progressTracker.AddNewPlan(planLog.ID, len(tasks))
// --- 处理空计划的边缘情况 ---
// 如果一个计划被解析后,发现其任务列表为空,
// 那么它实际上已经“执行”完毕了,我们需要在这里手动为它创建下一次的触发器。
if len(tasks) == 0 {
s.handlePlanCompletion(planLog.ID)
}
return nil
}
@@ -419,3 +367,82 @@ func (s *Scheduler) updateTaskExecutionLogStatus(claimedLog *models.TaskExecutio
return nil
}
// handlePlanTermination 集中处理计划的终止逻辑(失败或取消)
func (s *Scheduler) handlePlanTermination(planLogID uint, reason string) {
// 1. 从待执行队列中删除所有相关的子任务
if err := s.pendingTaskRepo.DeletePendingTasksByPlanLogID(planLogID); err != nil {
s.logger.Errorf("从待执行队列中删除计划 %d 的后续任务时出错: %v", planLogID, err)
}
// 2. 将父计划的执行日志标记为失败
if err := s.executionLogRepo.FailPlanExecution(planLogID, reason); err != nil {
s.logger.Errorf("标记计划执行日志 %d 为失败时出错: %v", planLogID, err)
}
// 3. 将所有未完成的子任务日志标记为已取消
if err := s.executionLogRepo.CancelIncompleteTasksByPlanLogID(planLogID, "父计划失败或被取消"); err != nil {
s.logger.Errorf("取消计划 %d 的后续任务日志时出错: %v", planLogID, err)
}
// 4. 将计划本身的状态更新为失败
planLog, err := s.executionLogRepo.FindPlanExecutionLogByID(planLogID)
if err != nil {
s.logger.Errorf("无法找到计划执行日志 %d 以更新父计划状态: %v", planLogID, err)
return
}
if err := s.planRepo.UpdatePlanStatus(planLog.PlanID, models.PlanStatusFailed); err != nil {
s.logger.Errorf("更新计划 %d 状态为 '失败' 时出错: %v", planLog.PlanID, err)
}
}
// handlePlanCompletion 集中处理计划成功完成后的所有逻辑
func (s *Scheduler) handlePlanCompletion(planLogID uint) {
s.logger.Infof("计划执行 %d 的所有任务已完成,开始处理计划完成逻辑...", planLogID)
// 1. 通过 PlanExecutionLog 反查正确的顶层 PlanID
planExecutionLog, err := s.executionLogRepo.FindPlanExecutionLogByID(planLogID)
if err != nil {
s.logger.Errorf("获取计划执行日志 %d 失败: %v", planLogID, err)
return
}
topLevelPlanID := planExecutionLog.PlanID // 这才是正确的顶层计划ID
// 2. 获取计划的最新数据,这里我们只需要基本信息来判断执行类型和次数
plan, err := s.planRepo.GetBasicPlanByID(topLevelPlanID)
if err != nil {
s.logger.Errorf("获取计划 %d 的基本信息失败: %v", topLevelPlanID, err)
return
}
// 3. 在内存中计算新的计数值和状态
newExecuteCount := plan.ExecuteCount + 1
newStatus := plan.Status // 默认为当前状态
// 如果是自动计划且达到执行次数上限,或计划是手动类型,则更新计划状态为已停止
if (plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteNum > 0 && newExecuteCount >= plan.ExecuteNum) || plan.ExecutionType == models.PlanExecutionTypeManual {
newStatus = models.PlanStatusStopeed
s.logger.Infof("计划 %d 已完成执行,状态更新为 '执行完毕'。", topLevelPlanID)
}
// 4. 使用专门的方法来原子性地更新计数值和状态
if err := s.planRepo.UpdatePlanStateAfterExecution(topLevelPlanID, newExecuteCount, newStatus); err != nil {
s.logger.Errorf("更新计划 %d 的执行后状态失败: %v", topLevelPlanID, err)
return
}
// 5. 更新计划执行日志状态为完成
if err := s.executionLogRepo.UpdatePlanExecutionLogStatus(planLogID, models.ExecutionStatusCompleted); err != nil {
s.logger.Errorf("更新计划执行日志 %d 状态为 '完成' 失败: %v", planLogID, err)
}
// 6. 调用共享的 Manager 来处理触发器更新逻辑
// 只有当计划在本次执行后仍然是 Enabled 状态时,才需要创建下一次的触发器。
if newStatus == models.PlanStatusEnabled {
if err := s.analysisPlanTaskManager.CreateOrUpdateTrigger(topLevelPlanID); err != nil {
s.logger.Errorf("为计划 %d 创建/更新触发器失败: %v", topLevelPlanID, err)
} else {
s.logger.Infof("计划 %d 状态为 '%d',无需创建下一次触发器。", topLevelPlanID, newStatus)
}
}
}

View File

@@ -191,44 +191,17 @@ func (app *Application) initializePendingTasks(
// 阶段二:清理所有待执行任务和相关日志
logger.Info("阶段二:开始清理所有待执行任务和相关日志...")
pendingTasks, err := pendingTaskRepo.FindAllPendingTasks()
if err != nil {
return fmt.Errorf("获取待执行任务失败: %w", err)
// 直接调用新的方法来更新计划执行日志状态为失败
if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(); err != nil {
logger.Errorf("更新所有未完成计划执行日志状态为失败失败: %v", err)
// 这是一个非阻塞性错误,继续执行
}
var taskLogIDsToCancel []uint
var planLogIDsToFail []uint
for _, pt := range pendingTasks {
// 确保 Task 和 TaskExecutionLog 已预加载
if pt.Task == nil || pt.TaskExecutionLog.ID == 0 { // TaskExecutionLog.ID为零说明没加载
logger.Warnf("待执行任务 %d 缺少关联的 Task 或 TaskExecutionLog跳过处理。", pt.ID)
continue
}
// 收集任务执行日志ID所有未完成的任务都标记为取消
taskLogIDsToCancel = append(taskLogIDsToCancel, pt.TaskExecutionLog.ID)
// 收集计划执行日志ID
if pt.TaskExecutionLog.PlanExecutionLogID != 0 {
planLogIDsToFail = append(planLogIDsToFail, pt.TaskExecutionLog.PlanExecutionLogID)
}
}
// 批量更新 TaskExecutionLog 状态为取消
if len(taskLogIDsToCancel) > 0 {
if err := executionLogRepo.UpdateLogStatusByIDs(taskLogIDsToCancel, models.ExecutionStatusCancelled); err != nil {
logger.Errorf("批量更新任务执行日志状态为取消失败: %v", err)
// 这是一个非阻塞性错误,继续执行
}
}
// 批量更新 PlanExecutionLog 状态为失败
if len(planLogIDsToFail) > 0 {
if err := executionLogRepo.UpdatePlanExecutionLogsStatusByIDs(planLogIDsToFail, models.ExecutionStatusFailed); err != nil {
logger.Errorf("批量更新计划执行日志状态为失败失败: %v", err)
// 这是一个非阻塞性错误,继续执行
}
// 直接调用新的方法来更新任务执行日志状态为取消
if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(); err != nil {
logger.Errorf("更新所有未完成任务执行日志状态为取消失败: %v", err)
// 这是一个非阻塞性错误,继续执行
}
// 清空待执行列表

View File

@@ -44,6 +44,7 @@ const (
PlanStatusDisabled PlanStatus = 0 // 禁用计划
PlanStatusEnabled PlanStatus = 1 // 启用计划
PlanStatusStopeed PlanStatus = 2 // 执行完毕
PlanStatusFailed PlanStatus = 3 // 执行失败
)
// Plan 代表系统中的一个计划,可以包含子计划或任务

View File

@@ -2,6 +2,7 @@ package repository
import (
"errors"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"gorm.io/gorm"
@@ -10,8 +11,8 @@ import (
// ExecutionLogRepository 定义了与执行日志交互的接口。
// 这为服务层提供了一个清晰的契约,并允许在测试中轻松地进行模拟。
type ExecutionLogRepository interface {
UpdateLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error
UpdateLogStatus(logID uint, status models.ExecutionStatus) error
UpdateTaskExecutionLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error
UpdateTaskExecutionLogStatus(logID uint, status models.ExecutionStatus) error
CreateTaskExecutionLog(log *models.TaskExecutionLog) error
CreatePlanExecutionLog(log *models.PlanExecutionLog) error
UpdatePlanExecutionLog(log *models.PlanExecutionLog) error
@@ -30,6 +31,23 @@ type ExecutionLogRepository interface {
FindInProgressPlanExecutionLogByPlanID(planID uint) (*models.PlanExecutionLog, error)
// FindIncompleteTaskExecutionLogsByPlanLogID 根据计划日志ID查找所有未完成的任务日志
FindIncompleteTaskExecutionLogsByPlanLogID(planLogID uint) ([]models.TaskExecutionLog, error)
// FailAllIncompletePlanExecutionLogs 将所有状态为 ExecutionStatusStarted 和 ExecutionStatusWaiting 的计划状态都修改为 ExecutionStatusFailed
FailAllIncompletePlanExecutionLogs() error
// CancelAllIncompleteTaskExecutionLogs 将所有状态为 ExecutionStatusStarted 和 ExecutionStatusWaiting 的任务状态修改为 ExecutionStatusCancelled
CancelAllIncompleteTaskExecutionLogs() error
// FindPlanExecutionLogByID 根据ID查找计划执行日志
FindPlanExecutionLogByID(id uint) (*models.PlanExecutionLog, error)
// CountIncompleteTasksByPlanLogID 计算一个计划执行中未完成的任务数量
CountIncompleteTasksByPlanLogID(planLogID uint) (int64, error)
// FailPlanExecution 将指定的计划执行标记为失败
FailPlanExecution(planLogID uint, errorMessage string) error
// CancelIncompleteTasksByPlanLogID 取消一个计划执行中的所有未完成任务
CancelIncompleteTasksByPlanLogID(planLogID uint, reason string) error
}
// gormExecutionLogRepository 是使用 GORM 的具体实现。
@@ -43,7 +61,7 @@ func NewGormExecutionLogRepository(db *gorm.DB) ExecutionLogRepository {
return &gormExecutionLogRepository{db: db}
}
func (r *gormExecutionLogRepository) UpdateLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error {
func (r *gormExecutionLogRepository) UpdateTaskExecutionLogStatusByIDs(logIDs []uint, status models.ExecutionStatus) error {
if len(logIDs) == 0 {
return nil
}
@@ -52,7 +70,7 @@ func (r *gormExecutionLogRepository) UpdateLogStatusByIDs(logIDs []uint, status
Update("status", status).Error
}
func (r *gormExecutionLogRepository) UpdateLogStatus(logID uint, status models.ExecutionStatus) error {
func (r *gormExecutionLogRepository) UpdateTaskExecutionLogStatus(logID uint, status models.ExecutionStatus) error {
return r.db.Model(&models.TaskExecutionLog{}).Where("id = ?", logID).Update("status", status).Error
}
@@ -117,7 +135,7 @@ func (r *gormExecutionLogRepository) UpdatePlanExecutionLogsStatusByIDs(logIDs [
// FindIncompletePlanExecutionLogs 查找所有未完成的计划执行日志
func (r *gormExecutionLogRepository) FindIncompletePlanExecutionLogs() ([]models.PlanExecutionLog, error) {
var logs []models.PlanExecutionLog
err := r.db.Where("status = ?", models.ExecutionStatusStarted).Find(&logs).Error
err := r.db.Where("status = ? OR status = ?", models.ExecutionStatusStarted, models.ExecutionStatusWaiting).Find(&logs).Error
return logs, err
}
@@ -143,3 +161,60 @@ func (r *gormExecutionLogRepository) FindIncompleteTaskExecutionLogsByPlanLogID(
planLogID, models.ExecutionStatusWaiting, models.ExecutionStatusStarted).Find(&logs).Error
return logs, err
}
// FailAllIncompletePlanExecutionLogs 将所有状态为 ExecutionStatusStarted 和 ExecutionStatusWaiting 的计划状态都修改为 ExecutionStatusFailed
func (r *gormExecutionLogRepository) FailAllIncompletePlanExecutionLogs() error {
return r.db.Model(&models.PlanExecutionLog{}).
Where("status IN (?, ?)", models.ExecutionStatusStarted, models.ExecutionStatusWaiting).
Updates(map[string]interface{}{"status": models.ExecutionStatusFailed, "ended_at": time.Now(), "error": "系统中断"}).Error
}
// CancelAllIncompleteTaskExecutionLogs 将所有状态为 ExecutionStatusStarted 和 ExecutionStatusWaiting 的任务状态修改为 ExecutionStatusCancelled
func (r *gormExecutionLogRepository) CancelAllIncompleteTaskExecutionLogs() error {
return r.db.Model(&models.TaskExecutionLog{}).
Where("status IN (?, ?)", models.ExecutionStatusStarted, models.ExecutionStatusWaiting).
Updates(map[string]interface{}{"status": models.ExecutionStatusCancelled, "ended_at": time.Now(), "output": "系统中断"}).Error
}
// FindPlanExecutionLogByID 根据ID查找计划执行日志
func (r *gormExecutionLogRepository) FindPlanExecutionLogByID(id uint) (*models.PlanExecutionLog, error) {
var log models.PlanExecutionLog
err := r.db.First(&log, id).Error
if err != nil {
return nil, err
}
return &log, nil
}
// CountIncompleteTasksByPlanLogID 计算一个计划执行中未完成的任务数量
func (r *gormExecutionLogRepository) CountIncompleteTasksByPlanLogID(planLogID uint) (int64, error) {
var count int64
err := r.db.Model(&models.TaskExecutionLog{}).
Where("plan_execution_log_id = ? AND status IN (?, ?)",
planLogID, models.ExecutionStatusWaiting, models.ExecutionStatusStarted).
Count(&count).Error
return count, err
}
// FailPlanExecution 将指定的计划执行标记为失败
func (r *gormExecutionLogRepository) FailPlanExecution(planLogID uint, errorMessage string) error {
return r.db.Model(&models.PlanExecutionLog{}).
Where("id = ?", planLogID).
Updates(map[string]interface{}{
"status": models.ExecutionStatusFailed,
"error": errorMessage,
"ended_at": time.Now(),
}).Error
}
// CancelIncompleteTasksByPlanLogID 取消一个计划执行中的所有未完成任务
func (r *gormExecutionLogRepository) CancelIncompleteTasksByPlanLogID(planLogID uint, reason string) error {
return r.db.Model(&models.TaskExecutionLog{}).
Where("plan_execution_log_id = ? AND status IN (?, ?)",
planLogID, models.ExecutionStatusWaiting, models.ExecutionStatusStarted).
Updates(map[string]interface{}{
"status": models.ExecutionStatusCancelled,
"output": reason,
"ended_at": time.Now(),
}).Error
}

View File

@@ -31,6 +31,9 @@ type PendingTaskRepository interface {
RequeueTask(originalPendingTask *models.PendingTask) error
// FindPendingTasksByTaskLogIDs 根据 TaskExecutionLogID 列表查找对应的待执行任务
FindPendingTasksByTaskLogIDs(taskLogIDs []uint) ([]models.PendingTask, error)
// DeletePendingTasksByPlanLogID 删除与指定计划执行日志ID相关的所有待执行任务
DeletePendingTasksByPlanLogID(planLogID uint) error
}
// gormPendingTaskRepository 是使用 GORM 的具体实现。
@@ -164,3 +167,12 @@ func (r *gormPendingTaskRepository) FindPendingTasksByTaskLogIDs(taskLogIDs []ui
err := r.db.Where("task_execution_log_id IN ?", taskLogIDs).Find(&pendingTasks).Error
return pendingTasks, err
}
// DeletePendingTasksByPlanLogID 删除与指定计划执行日志ID相关的所有待执行任务
func (r *gormPendingTaskRepository) DeletePendingTasksByPlanLogID(planLogID uint) error {
// 使用子查询找到所有与 planLogID 相关的 task_execution_log_id
subQuery := r.db.Model(&models.TaskExecutionLog{}).Select("id").Where("plan_execution_log_id = ?", planLogID)
// 使用子查询的结果来删除待执行任务
return r.db.Where("task_execution_log_id IN (?)", subQuery).Delete(&models.PendingTask{}).Error
}

View File

@@ -46,8 +46,8 @@ type PlanRepository interface {
FindPlanAnalysisTaskByParamsPlanID(paramsPlanID uint) (*models.Task, error)
// FindRunnablePlans 获取所有应执行的计划
FindRunnablePlans() ([]*models.Plan, error)
// FindDisabledAndStoppedPlans 获取所有已禁用或已停止的计划
FindDisabledAndStoppedPlans() ([]*models.Plan, error)
// FindInactivePlans 获取所有已禁用或已停止的计划
FindInactivePlans() ([]*models.Plan, error)
// FindPlanAnalysisTaskByPlanID 根据 PlanID 找到其关联的 'plan_analysis' 任务
FindPlanAnalysisTaskByPlanID(planID uint) (*models.Task, error)
@@ -62,6 +62,9 @@ type PlanRepository interface {
// StopPlanTransactionally 停止一个计划的执行,包括更新状态、移除待执行任务和更新执行日志
StopPlanTransactionally(planID uint) error
// UpdatePlanStateAfterExecution 更新计划执行后的状态(计数和状态)
UpdatePlanStateAfterExecution(planID uint, newCount uint, newStatus models.PlanStatus) error
}
// gormPlanRepository 是 PlanRepository 的 GORM 实现
@@ -600,10 +603,10 @@ func (r *gormPlanRepository) FindRunnablePlans() ([]*models.Plan, error) {
return plans, err
}
func (r *gormPlanRepository) FindDisabledAndStoppedPlans() ([]*models.Plan, error) {
func (r *gormPlanRepository) FindInactivePlans() ([]*models.Plan, error) {
var plans []*models.Plan
err := r.db.
Where("status = ? OR status = ?", models.PlanStatusDisabled, models.PlanStatusStopeed).
Where("status != ?", models.PlanStatusEnabled).
Find(&plans).Error
return plans, err
}
@@ -694,7 +697,7 @@ func (r *gormPlanRepository) StopPlanTransactionally(planID uint) error {
}
// 3.1 批量更新任务执行日志状态为“已取消”
if err := executionLogRepoTx.UpdateLogStatusByIDs(taskLogIDs, models.ExecutionStatusCancelled); err != nil {
if err := executionLogRepoTx.UpdateTaskExecutionLogStatusByIDs(taskLogIDs, models.ExecutionStatusCancelled); err != nil {
return fmt.Errorf("批量更新任务执行日志状态为 '已取消' 失败: %w", err)
}
@@ -735,3 +738,10 @@ func (r *gormPlanRepository) UpdatePlanStatus(id uint, status models.PlanStatus)
}
return nil
}
func (r *gormPlanRepository) UpdatePlanStateAfterExecution(planID uint, newCount uint, newStatus models.PlanStatus) error {
return r.db.Model(&models.Plan{}).Where("id = ?", planID).Updates(map[string]interface{}{
"execute_count": newCount,
"status": newStatus,
}).Error
}