From ceba0c280ea237885f7102efe7428ce5e69bb547 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 17 Sep 2025 20:02:40 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E8=B0=83=E5=BA=A6=E5=99=A8?= =?UTF-8?q?=E5=85=B3=E4=BA=8E=E4=BB=BB=E5=8A=A1=E6=89=A7=E8=A1=8C=E9=83=A8?= =?UTF-8?q?=E5=88=86=E5=AE=9E=E7=8E=B0(=E6=B2=A1=E6=B5=8B=E6=B2=A1?= =?UTF-8?q?=E6=A3=80=E6=9F=A5,=20=E4=BD=86=E5=BA=94=E8=AF=A5=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E5=AE=8C=E4=BA=86)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TODO-List | 7 +- go.mod | 1 + go.sum | 2 + internal/infra/models/plan.go | 5 + .../repository/execution_log_repository.go | 24 +- internal/infra/repository/plan_repository.go | 123 ++++++++- internal/infra/task/scheduler.go | 244 ++++++++++++++++-- internal/infra/utils/time.go | 22 ++ 8 files changed, 392 insertions(+), 36 deletions(-) create mode 100644 internal/infra/utils/time.go diff --git a/TODO-List b/TODO-List index 00bfc63..fbe444e 100644 --- a/TODO-List +++ b/TODO-List @@ -1,9 +1,10 @@ // TODO 列表 // TODO 可以实现的问题 -plan执行到一半时如果用户删掉里面的task, 后续调度器执行task时可能会找不到这个任务的细节 -1. 可以用TimescaleDB代替PGSQL, 优化传感器数据存储性能 -2. 系统启动时应该检查一遍执行历史库, 将所有显示为执行中的任务都修正为执行失败并报错 +1. plan执行到一半时如果用户删掉里面的task, 后续调度器执行task时可能会找不到这个任务的细节 +2. 创建/更新plan的地方需要创建/更新触发器Task, 调度器那里不再创建新的Task, 只创建PendingTask +3. 可以用TimescaleDB代替PGSQL, 优化传感器数据存储性能 +4. 系统启动时应该检查一遍执行历史库, 将所有显示为执行中的任务都修正为执行失败并报错 // TODO 暂时实现不了的问题 1. 目前设备都只对应一个地址, 但实际上如电磁两位五通阀等设备是需要用两个开关控制的 diff --git a/go.mod b/go.mod index 16b6f8f..aa3209c 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/go-openapi/validate v0.24.0 github.com/golang-jwt/jwt/v5 v5.3.0 github.com/panjf2000/ants/v2 v2.11.3 + github.com/robfig/cron/v3 v3.0.1 github.com/stretchr/testify v1.11.1 github.com/swaggo/files v1.0.1 github.com/swaggo/gin-swagger v1.6.1 diff --git a/go.sum b/go.sum index eb67dba..b0bef21 100644 --- a/go.sum +++ b/go.sum @@ -123,6 +123,8 @@ github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6 github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/internal/infra/models/plan.go b/internal/infra/models/plan.go index e26aca2..fe3a186 100644 --- a/internal/infra/models/plan.go +++ b/internal/infra/models/plan.go @@ -33,6 +33,11 @@ const ( TaskTypeWaiting TaskType = "waiting" // 等待任务 ) +const ( + // 这个参数是 TaskPlanAnalysis 类型的 Task Parameters 中用于记录plan_id的字段的key + ParamsPlanID = "plan_id" +) + // Plan 代表系统中的一个计划,可以包含子计划或任务 type Plan struct { gorm.Model diff --git a/internal/infra/repository/execution_log_repository.go b/internal/infra/repository/execution_log_repository.go index b216651..195afd7 100644 --- a/internal/infra/repository/execution_log_repository.go +++ b/internal/infra/repository/execution_log_repository.go @@ -12,6 +12,7 @@ type ExecutionLogRepository interface { UpdatePlanExecutionLog(log *models.PlanExecutionLog) error CreateTaskExecutionLogsInBatch(logs []*models.TaskExecutionLog) error UpdateTaskExecutionLog(log *models.TaskExecutionLog) error + FindTaskExecutionLogByID(id uint) (*models.TaskExecutionLog, error) } // executionLogRepository 是使用 GORM 的具体实现。 @@ -30,11 +31,11 @@ func (r *executionLogRepository) CreatePlanExecutionLog(log *models.PlanExecutio return r.db.Create(log).Error } -// UpdatePlanExecutionLog 使用 Save 方法全量更新一个计划执行日志。 -// GORM 的 Save 会自动根据主键是否存在来决定是执行 UPDATE 还是 INSERT。 +// UpdatePlanExecutionLog 使用 Updates 方法更新一个计划执行日志。 +// GORM 的 Updates 传入 struct 时,只会更新非零值字段。 // 在这里,我们期望传入的对象一定包含一个有效的 ID。 func (r *executionLogRepository) UpdatePlanExecutionLog(log *models.PlanExecutionLog) error { - return r.db.Save(log).Error + return r.db.Updates(log).Error } // CreateTaskExecutionLogsInBatch 在一次数据库调用中创建多个任务执行日志条目。 @@ -44,8 +45,21 @@ func (r *executionLogRepository) CreateTaskExecutionLogsInBatch(logs []*models.T return r.db.Create(&logs).Error } -// UpdateTaskExecutionLog 使用 Save 方法全量更新一个任务执行日志。 +// UpdateTaskExecutionLog 使用 Updates 方法更新一个任务执行日志。 +// GORM 的 Updates 传入 struct 时,只会更新非零值字段。 // 这种方式代码更直观,上层服务可以直接修改模型对象后进行保存。 func (r *executionLogRepository) UpdateTaskExecutionLog(log *models.TaskExecutionLog) error { - return r.db.Save(log).Error + return r.db.Updates(log).Error +} + +// FindTaskExecutionLogByID 根据 ID 查找单个任务执行日志。 +// 它会预加载关联的 Task 信息。 +func (r *executionLogRepository) FindTaskExecutionLogByID(id uint) (*models.TaskExecutionLog, error) { + var log models.TaskExecutionLog + // 使用 Preload("Task") 来确保关联的任务信息被一并加载 + err := r.db.Preload("Task").First(&log, id).Error + if err != nil { + return nil, err + } + return &log, nil } diff --git a/internal/infra/repository/plan_repository.go b/internal/infra/repository/plan_repository.go index a91c235..efea8ce 100644 --- a/internal/infra/repository/plan_repository.go +++ b/internal/infra/repository/plan_repository.go @@ -34,6 +34,12 @@ type PlanRepository interface { UpdatePlan(plan *models.Plan) error // DeletePlan 根据ID删除计划,同时删除其关联的任务(非子任务)或子计划关联 DeletePlan(id uint) error + // FlattenPlanTasks 递归展开计划,返回按执行顺序排列的所有任务列表 + FlattenPlanTasks(planID uint) ([]models.Task, error) + // DeleteTask 根据ID删除任务 + DeleteTask(id int) error + // FindPlanAnalysisTaskByParamsPlanID 根据Parameters中的ParamsPlanID字段值查找TaskPlanAnalysis类型的Task + FindPlanAnalysisTaskByParamsPlanID(paramsPlanID uint) (*models.Task, error) } // gormPlanRepository 是 PlanRepository 的 GORM 实现 @@ -288,7 +294,7 @@ func (r *gormPlanRepository) reconcileTasks(tx *gorm.DB, plan *models.Plan) erro return err } - existingTaskMap := make(map[uint]bool) + existingTaskMap := make(map[int]bool) for _, task := range existingTasks { existingTaskMap[task.ID] = true } @@ -308,7 +314,7 @@ func (r *gormPlanRepository) reconcileTasks(tx *gorm.DB, plan *models.Plan) erro } } - var tasksToDelete []uint + var tasksToDelete []int for id := range existingTaskMap { tasksToDelete = append(tasksToDelete, id) } @@ -401,3 +407,116 @@ func (r *gormPlanRepository) DeletePlan(id uint) error { return nil }) } + +// FlattenPlanTasks 递归展开计划,返回按执行顺序排列的所有任务列表 +func (r *gormPlanRepository) FlattenPlanTasks(planID uint) ([]models.Task, error) { + plan, err := r.GetPlanByID(planID) + if err != nil { + return nil, fmt.Errorf("获取计划(ID: %d)失败: %w", planID, err) + } + + return r.flattenPlanTasksRecursive(plan) +} + +// flattenPlanTasksRecursive 递归展开计划的内部实现 +func (r *gormPlanRepository) flattenPlanTasksRecursive(plan *models.Plan) ([]models.Task, error) { + var tasks []models.Task + + switch plan.ContentType { + case models.PlanContentTypeTasks: + // 如果计划直接包含任务,直接返回这些任务 + // 由于GetPlanByID已经预加载并排序了任务,这里直接使用即可 + tasks = append(tasks, plan.Tasks...) + + case models.PlanContentTypeSubPlans: + // 如果计划包含子计划,则递归处理每个子计划 + for _, subPlan := range plan.SubPlans { + // 获取子计划的任务列表 + var subTasks []models.Task + var err error + + // 确保子计划已经被加载 + if subPlan.ChildPlan != nil { + subTasks, err = r.flattenPlanTasksRecursive(subPlan.ChildPlan) + } else { + // 如果子计划未加载,则从数据库获取并递归展开 + subTasks, err = r.FlattenPlanTasks(subPlan.ChildPlanID) + } + + if err != nil { + return nil, fmt.Errorf("展开子计划(ID: %d)失败: %w", subPlan.ChildPlanID, err) + } + + // 将子计划的任务添加到结果中 + tasks = append(tasks, subTasks...) + } + default: + return nil, fmt.Errorf("未知的计划内容类型: %v", plan.ContentType) + } + + return tasks, nil +} + +// DeleteTask 根据ID删除任务 +func (r *gormPlanRepository) DeleteTask(id int) error { + // 使用事务确保操作的原子性 + return r.db.Transaction(func(tx *gorm.DB) error { + // 1. 检查是否有待执行任务引用了这个任务 + var pendingTaskCount int64 + if err := tx.Model(&models.PendingTask{}).Where("task_id = ?", id).Count(&pendingTaskCount).Error; err != nil { + return fmt.Errorf("检查待执行任务时出错: %w", err) + } + + // 如果有待执行任务引用该任务,不能删除 + if pendingTaskCount > 0 { + return fmt.Errorf("无法删除任务(ID: %d),因为存在 %d 条待执行任务引用该任务", id, pendingTaskCount) + } + + // 2. 检查是否有计划仍在使用这个任务 + var planCount int64 + if err := tx.Model(&models.Plan{}).Joins("JOIN tasks ON plans.id = tasks.plan_id").Where("tasks.id = ?", id).Count(&planCount).Error; err != nil { + return fmt.Errorf("检查计划引用任务时出错: %w", err) + } + + // 如果有计划在使用该任务,不能删除 + if planCount > 0 { + return fmt.Errorf("无法删除任务(ID: %d),因为存在 %d 个计划仍在使用该任务", id, planCount) + } + + // 3. 执行删除操作 + result := tx.Delete(&models.Task{}, id) + if result.Error != nil { + return fmt.Errorf("删除任务失败: %w", result.Error) + } + + // 检查是否实际删除了记录 + if result.RowsAffected == 0 { + return gorm.ErrRecordNotFound + } + + return nil + }) +} + +// FindPlanAnalysisTaskByParamsPlanID 根据Parameters中的ParamsPlanID字段值查找TaskPlanAnalysis类型的Task +func (r *gormPlanRepository) FindPlanAnalysisTaskByParamsPlanID(paramsPlanID uint) (*models.Task, error) { + var task models.Task + + // 构造JSON查询条件,查找Parameters中包含指定ParamsPlanID且Type为TaskPlanAnalysis的任务 + // TODO 在JSON字段中查找特定键值的语法取决于数据库类型,这里使用PostgreSQL的语法 + // TODO 如果使用的是MySQL,则需要相应调整查询条件 + result := r.db.Where( + "type = ? AND parameters->>'plan_id' = ?", + models.TaskPlanAnalysis, + fmt.Sprintf("%d", paramsPlanID), + ).First(&task) + + if result.Error != nil { + if errors.Is(result.Error, gorm.ErrRecordNotFound) { + return nil, fmt.Errorf("未找到Parameters.PlanID为%d的TaskPlanAnalysis类型任务", paramsPlanID) + } + return nil, fmt.Errorf("查找任务时出错: %w", result.Error) + } + + return &task, nil +} diff --git a/internal/infra/task/scheduler.go b/internal/infra/task/scheduler.go index cfd610e..d2be868 100644 --- a/internal/infra/task/scheduler.go +++ b/internal/infra/task/scheduler.go @@ -9,6 +9,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/utils" "github.com/panjf2000/ants/v2" "gorm.io/gorm" ) @@ -33,6 +34,29 @@ func NewProgressTracker() *ProgressTracker { return t } +// AddNewPlan 添加一个新的计划,并初始化进度跟踪器 +func (t *ProgressTracker) AddNewPlan(planLogID uint, totalTasks int) { + t.mu.Lock() + t.totalTasks[planLogID] = totalTasks + t.completedTasks[planLogID] = 0 + t.mu.Unlock() +} + +// CompletedTask 通知计数器一个任务被完成 +func (t *ProgressTracker) CompletedTask(planLogID uint) { + t.mu.Lock() + t.completedTasks[planLogID]++ + t.mu.Unlock() +} + +// IsPlanOver 检查计划是否完成 +func (t *ProgressTracker) IsPlanOver(planLogID uint) bool { + t.mu.Lock() + defer t.mu.Unlock() + + return t.completedTasks[planLogID] >= t.totalTasks[planLogID] +} + // TryLock (非阻塞) 尝试锁定一个计划。如果计划未被锁定,则锁定并返回 true。 func (t *ProgressTracker) TryLock(planLogID uint) bool { t.mu.Lock() @@ -79,12 +103,14 @@ func (t *ProgressTracker) GetRunningPlanIDs() []uint { // Scheduler 是核心的、持久化的任务调度器 type Scheduler struct { - logger *logs.Logger - pollingInterval time.Duration - workers int - pendingTaskRepo repository.PendingTaskRepository - progressTracker *ProgressTracker - taskFactory func(taskType models.TaskType) Task // 调度器需要注入一个任务工厂,用于创建任务实例 + logger *logs.Logger + pollingInterval time.Duration + workers int + pendingTaskRepo repository.PendingTaskRepository + executionLogRepo repository.ExecutionLogRepository + planRepo repository.PlanRepository + progressTracker *ProgressTracker + taskFactory func(taskType models.TaskType) Task // 调度器需要注入一个任务工厂,用于创建任务实例 pool *ants.Pool // 使用 ants 协程池来管理并发 wg sync.WaitGroup @@ -93,17 +119,27 @@ type Scheduler struct { } // NewScheduler 创建一个新的调度器实例 -func NewScheduler(pendingTaskRepo repository.PendingTaskRepository, taskFactory func(taskType models.TaskType) Task, logger *logs.Logger, interval time.Duration, numWorkers int) *Scheduler { +func NewScheduler( + pendingTaskRepo repository.PendingTaskRepository, + executionLogRepo repository.ExecutionLogRepository, + planRepo repository.PlanRepository, + taskFactory func(taskType models.TaskType) Task, + logger *logs.Logger, + interval time.Duration, + numWorkers int) *Scheduler { ctx, cancel := context.WithCancel(context.Background()) + return &Scheduler{ - pendingTaskRepo: pendingTaskRepo, - logger: logger, - pollingInterval: interval, - workers: numWorkers, - progressTracker: NewProgressTracker(), - taskFactory: taskFactory, - ctx: ctx, - cancel: cancel, + pendingTaskRepo: pendingTaskRepo, + executionLogRepo: executionLogRepo, + planRepo: planRepo, + logger: logger, + pollingInterval: interval, + workers: numWorkers, + progressTracker: NewProgressTracker(), + taskFactory: taskFactory, + ctx: ctx, + cancel: cancel, } } @@ -203,18 +239,174 @@ func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) { s.logger.Warnf("开始处理任务, 日志ID: %d, 任务ID: %d, 任务名称: %s", claimedLog.ID, claimedLog.TaskID, claimedLog.Task.Name) - task := s.taskFactory(claimedLog.Task.Type) - if err := task.ParseParams(s.logger, claimedLog); err != nil { - s.logger.Errorf("[严重] 任务参数解析失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) + claimedLog.StartedAt = time.Now() + claimedLog.Status = models.ExecutionStatusCompleted // 先乐观假定任务成功, 后续失败了再改 + defer s.updateTaskExecutionLogStatus(claimedLog) + + // 执行任务 + err := s.runTask(claimedLog) + if err != nil { + claimedLog.Status = models.ExecutionStatusFailed + claimedLog.Output = err.Error() return } - - if err := task.Execute(); err != nil { - s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) - - task.OnFailure(err) - return - } - s.logger.Warnf("完成任务, 日志ID: %d", claimedLog.ID) + + // 任务计数器校验, Plan的任务全部执行完成后需要插入一个新的PlanAnalysisTask用于触发下一次Plan的执行 + if s.progressTracker.IsPlanOver(claimedLog.PlanExecutionLogID) { + err = s.createNewAnalysisPlanTask(claimedLog.Task.PlanID) + if err != nil { + s.logger.Errorf("[严重] 创建计划分析任务失败, 当前Plan(%v)将无法进行下次触发, 错误: %v", claimedLog.Task.PlanID, err) + } + } + +} + +// runTask 用于执行具体任务 +func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error { + // 这是个特殊任务, 用于解析Plan并将解析出的任务队列添加到待执行队列中 + if claimedLog.Task.Type == models.TaskPlanAnalysis { + // 解析plan + err := s.analysisPlan(claimedLog) + if err != nil { + // TODO 这里要处理一下, 比如再插一个新的触发器回去 + s.logger.Errorf("[严重] 计划解析失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) + return err + } + + } else { + // 执行普通任务 + task := s.taskFactory(claimedLog.Task.Type) + if err := task.ParseParams(s.logger, claimedLog); err != nil { + + s.logger.Errorf("[严重] 任务参数解析失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) + return err + } + + if err := task.Execute(); err != nil { + + s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) + + task.OnFailure(err) + return err + } + + } + return nil +} + +// analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中 +func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { + // 创建Plan执行记录 + planLog := &models.PlanExecutionLog{ + PlanID: claimedLog.Task.PlanID, + Status: models.ExecutionStatusStarted, + StartedAt: time.Now(), + } + if err := s.executionLogRepo.CreatePlanExecutionLog(planLog); err != nil { + s.logger.Errorf("[严重] 创建计划执行日志失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) + return err + } + + // 解析出Task列表 + tasks, err := s.planRepo.FlattenPlanTasks(claimedLog.Task.PlanID) + if err != nil { + s.logger.Errorf("[严重] 解析计划失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) + return err + } + + // 写入执行历史 + taskLogs := make([]*models.TaskExecutionLog, len(tasks)) + for _, task := range tasks { + taskLogs = append(taskLogs, &models.TaskExecutionLog{ + PlanExecutionLogID: planLog.ID, + TaskID: task.ID, + Status: models.ExecutionStatusWaiting, + }) + + } + err = s.executionLogRepo.CreateTaskExecutionLogsInBatch(taskLogs) + if err != nil { + s.logger.Errorf("[严重] 写入执行历史, 日志ID: %d, 错误: %v", claimedLog.ID, err) + return err + } + + // 写入待执行队列 + pendingTasks := make([]*models.PendingTask, len(tasks)) + for i, task := range tasks { + pendingTasks = append(pendingTasks, &models.PendingTask{ + TaskID: task.ID, + TaskExecutionLogID: pendingTasks[i].ID, + + // 待执行队列是通过任务触发时间排序的, 且只要在调度器获取的时间点之前的都可以被触发 + ExecuteAt: time.Now().Add(time.Duration(i) * time.Second), + }) + } + err = s.pendingTaskRepo.CreatePendingTasksInBatch(pendingTasks) + if err != nil { + s.logger.Errorf("[严重] 写入待执行队列, 日志ID: %d, 错误: %v", claimedLog.ID, err) + return err + } + + // 将Task列表加入待执行队列中 + s.progressTracker.AddNewPlan(claimedLog.PlanExecutionLogID, len(tasks)) + + return nil +} + +// updateTaskExecutionLogStatus 修改任务历史中的执行状态 +func (s *Scheduler) updateTaskExecutionLogStatus(claimedLog *models.TaskExecutionLog) error { + claimedLog.EndedAt = time.Now() + + if err := s.executionLogRepo.UpdateTaskExecutionLog(claimedLog); err != nil { + s.logger.Errorf("[严重] 更新任务执行日志失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) + return err + } + + return nil +} + +// createNewAnalysisPlanTask 创建一个新的Plan解析任务用于下次触发plan执行 +func (s *Scheduler) createNewAnalysisPlanTask(planID uint) error { + // 获取计划信息 + plan, err := s.planRepo.GetBasicPlanByID(planID) + if err != nil { + s.logger.Errorf("[严重] 获取计划失败, 错误: %v", err) + return err + } + + // 获取触发任务 + task, err := s.planRepo.FindPlanAnalysisTaskByParamsPlanID(planID) + if err != nil { + s.logger.Errorf("[严重] 获取计划解析任务失败, 错误: %v", err) + return err + } + + // 写入执行日志 + taskLog := &models.TaskExecutionLog{ + TaskID: task.ID, + Status: models.ExecutionStatusWaiting, + } + if err := s.executionLogRepo.CreateTaskExecutionLogsInBatch([]*models.TaskExecutionLog{taskLog}); err != nil { + s.logger.Errorf("[严重] 创建任务执行日志失败, 错误: %v", err) + return err + } + + // 写入待执行队列 + next, err := utils.GetNextCronTime(plan.CronExpression) + if err != nil { + s.logger.Errorf("[严重] 执行时间解析失败, 错误: %v", err) + return err + } + pendingTask := &models.PendingTask{ + TaskID: task.ID, + ExecuteAt: next, + TaskExecutionLogID: taskLog.ID, + } + err = s.pendingTaskRepo.CreatePendingTasksInBatch([]*models.PendingTask{pendingTask}) + if err != nil { + s.logger.Errorf("[严重] 创建待执行任务失败, 错误: %v", err) + return err + } + return nil } diff --git a/internal/infra/utils/time.go b/internal/infra/utils/time.go new file mode 100644 index 0000000..fed5705 --- /dev/null +++ b/internal/infra/utils/time.go @@ -0,0 +1,22 @@ +package utils + +import ( + "time" + + "github.com/robfig/cron/v3" +) + +// GetNextCronTime 根据传入的 Cron 表达式计算下一次执行的时间。 +// 它使用兼容大多数 Cron 实现的标准解析器。 +// 如果 Cron 表达式无效,它将返回一个错误。 +func GetNextCronTime(cronExpression string) (time.Time, error) { + // cron.ParseStandard() 返回一个支持标准5位或6位(带秒)格式的解析器。 + schedule, err := cron.ParseStandard(cronExpression) + if err != nil { + return time.Time{}, err // 返回零值时间和错误 + } + + // 从当前时间计算下一次执行时间 + nextTime := schedule.Next(time.Now()) + return nextTime, nil +}