任务调度器关于任务执行部分实现(没测没检查, 但应该实现完了)

This commit is contained in:
2025-09-17 20:02:40 +08:00
parent e6047f6b6e
commit ceba0c280e
8 changed files with 392 additions and 36 deletions

View File

@@ -9,6 +9,7 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils"
"github.com/panjf2000/ants/v2"
"gorm.io/gorm"
)
@@ -33,6 +34,29 @@ func NewProgressTracker() *ProgressTracker {
return t
}
// AddNewPlan 添加一个新的计划,并初始化进度跟踪器
func (t *ProgressTracker) AddNewPlan(planLogID uint, totalTasks int) {
t.mu.Lock()
t.totalTasks[planLogID] = totalTasks
t.completedTasks[planLogID] = 0
t.mu.Unlock()
}
// CompletedTask 通知计数器一个任务被完成
func (t *ProgressTracker) CompletedTask(planLogID uint) {
t.mu.Lock()
t.completedTasks[planLogID]++
t.mu.Unlock()
}
// IsPlanOver 检查计划是否完成
func (t *ProgressTracker) IsPlanOver(planLogID uint) bool {
t.mu.Lock()
defer t.mu.Unlock()
return t.completedTasks[planLogID] >= t.totalTasks[planLogID]
}
// TryLock (非阻塞) 尝试锁定一个计划。如果计划未被锁定,则锁定并返回 true。
func (t *ProgressTracker) TryLock(planLogID uint) bool {
t.mu.Lock()
@@ -79,12 +103,14 @@ func (t *ProgressTracker) GetRunningPlanIDs() []uint {
// Scheduler 是核心的、持久化的任务调度器
type Scheduler struct {
logger *logs.Logger
pollingInterval time.Duration
workers int
pendingTaskRepo repository.PendingTaskRepository
progressTracker *ProgressTracker
taskFactory func(taskType models.TaskType) Task // 调度器需要注入一个任务工厂,用于创建任务实例
logger *logs.Logger
pollingInterval time.Duration
workers int
pendingTaskRepo repository.PendingTaskRepository
executionLogRepo repository.ExecutionLogRepository
planRepo repository.PlanRepository
progressTracker *ProgressTracker
taskFactory func(taskType models.TaskType) Task // 调度器需要注入一个任务工厂,用于创建任务实例
pool *ants.Pool // 使用 ants 协程池来管理并发
wg sync.WaitGroup
@@ -93,17 +119,27 @@ type Scheduler struct {
}
// NewScheduler 创建一个新的调度器实例
func NewScheduler(pendingTaskRepo repository.PendingTaskRepository, taskFactory func(taskType models.TaskType) Task, logger *logs.Logger, interval time.Duration, numWorkers int) *Scheduler {
func NewScheduler(
pendingTaskRepo repository.PendingTaskRepository,
executionLogRepo repository.ExecutionLogRepository,
planRepo repository.PlanRepository,
taskFactory func(taskType models.TaskType) Task,
logger *logs.Logger,
interval time.Duration,
numWorkers int) *Scheduler {
ctx, cancel := context.WithCancel(context.Background())
return &Scheduler{
pendingTaskRepo: pendingTaskRepo,
logger: logger,
pollingInterval: interval,
workers: numWorkers,
progressTracker: NewProgressTracker(),
taskFactory: taskFactory,
ctx: ctx,
cancel: cancel,
pendingTaskRepo: pendingTaskRepo,
executionLogRepo: executionLogRepo,
planRepo: planRepo,
logger: logger,
pollingInterval: interval,
workers: numWorkers,
progressTracker: NewProgressTracker(),
taskFactory: taskFactory,
ctx: ctx,
cancel: cancel,
}
}
@@ -203,18 +239,174 @@ func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) {
s.logger.Warnf("开始处理任务, 日志ID: %d, 任务ID: %d, 任务名称: %s",
claimedLog.ID, claimedLog.TaskID, claimedLog.Task.Name)
task := s.taskFactory(claimedLog.Task.Type)
if err := task.ParseParams(s.logger, claimedLog); err != nil {
s.logger.Errorf("[严重] 任务参数解析失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
claimedLog.StartedAt = time.Now()
claimedLog.Status = models.ExecutionStatusCompleted // 先乐观假定任务成功, 后续失败了再改
defer s.updateTaskExecutionLogStatus(claimedLog)
// 执行任务
err := s.runTask(claimedLog)
if err != nil {
claimedLog.Status = models.ExecutionStatusFailed
claimedLog.Output = err.Error()
return
}
if err := task.Execute(); err != nil {
s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
task.OnFailure(err)
return
}
s.logger.Warnf("完成任务, 日志ID: %d", claimedLog.ID)
// 任务计数器校验, Plan的任务全部执行完成后需要插入一个新的PlanAnalysisTask用于触发下一次Plan的执行
if s.progressTracker.IsPlanOver(claimedLog.PlanExecutionLogID) {
err = s.createNewAnalysisPlanTask(claimedLog.Task.PlanID)
if err != nil {
s.logger.Errorf("[严重] 创建计划分析任务失败, 当前Plan(%v)将无法进行下次触发, 错误: %v", claimedLog.Task.PlanID, err)
}
}
}
// runTask 用于执行具体任务
func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error {
// 这是个特殊任务, 用于解析Plan并将解析出的任务队列添加到待执行队列中
if claimedLog.Task.Type == models.TaskPlanAnalysis {
// 解析plan
err := s.analysisPlan(claimedLog)
if err != nil {
// TODO 这里要处理一下, 比如再插一个新的触发器回去
s.logger.Errorf("[严重] 计划解析失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
} else {
// 执行普通任务
task := s.taskFactory(claimedLog.Task.Type)
if err := task.ParseParams(s.logger, claimedLog); err != nil {
s.logger.Errorf("[严重] 任务参数解析失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
if err := task.Execute(); err != nil {
s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
task.OnFailure(err)
return err
}
}
return nil
}
// analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中
func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
// 创建Plan执行记录
planLog := &models.PlanExecutionLog{
PlanID: claimedLog.Task.PlanID,
Status: models.ExecutionStatusStarted,
StartedAt: time.Now(),
}
if err := s.executionLogRepo.CreatePlanExecutionLog(planLog); err != nil {
s.logger.Errorf("[严重] 创建计划执行日志失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
// 解析出Task列表
tasks, err := s.planRepo.FlattenPlanTasks(claimedLog.Task.PlanID)
if err != nil {
s.logger.Errorf("[严重] 解析计划失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
// 写入执行历史
taskLogs := make([]*models.TaskExecutionLog, len(tasks))
for _, task := range tasks {
taskLogs = append(taskLogs, &models.TaskExecutionLog{
PlanExecutionLogID: planLog.ID,
TaskID: task.ID,
Status: models.ExecutionStatusWaiting,
})
}
err = s.executionLogRepo.CreateTaskExecutionLogsInBatch(taskLogs)
if err != nil {
s.logger.Errorf("[严重] 写入执行历史, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
// 写入待执行队列
pendingTasks := make([]*models.PendingTask, len(tasks))
for i, task := range tasks {
pendingTasks = append(pendingTasks, &models.PendingTask{
TaskID: task.ID,
TaskExecutionLogID: pendingTasks[i].ID,
// 待执行队列是通过任务触发时间排序的, 且只要在调度器获取的时间点之前的都可以被触发
ExecuteAt: time.Now().Add(time.Duration(i) * time.Second),
})
}
err = s.pendingTaskRepo.CreatePendingTasksInBatch(pendingTasks)
if err != nil {
s.logger.Errorf("[严重] 写入待执行队列, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
// 将Task列表加入待执行队列中
s.progressTracker.AddNewPlan(claimedLog.PlanExecutionLogID, len(tasks))
return nil
}
// updateTaskExecutionLogStatus 修改任务历史中的执行状态
func (s *Scheduler) updateTaskExecutionLogStatus(claimedLog *models.TaskExecutionLog) error {
claimedLog.EndedAt = time.Now()
if err := s.executionLogRepo.UpdateTaskExecutionLog(claimedLog); err != nil {
s.logger.Errorf("[严重] 更新任务执行日志失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
return nil
}
// createNewAnalysisPlanTask 创建一个新的Plan解析任务用于下次触发plan执行
func (s *Scheduler) createNewAnalysisPlanTask(planID uint) error {
// 获取计划信息
plan, err := s.planRepo.GetBasicPlanByID(planID)
if err != nil {
s.logger.Errorf("[严重] 获取计划失败, 错误: %v", err)
return err
}
// 获取触发任务
task, err := s.planRepo.FindPlanAnalysisTaskByParamsPlanID(planID)
if err != nil {
s.logger.Errorf("[严重] 获取计划解析任务失败, 错误: %v", err)
return err
}
// 写入执行日志
taskLog := &models.TaskExecutionLog{
TaskID: task.ID,
Status: models.ExecutionStatusWaiting,
}
if err := s.executionLogRepo.CreateTaskExecutionLogsInBatch([]*models.TaskExecutionLog{taskLog}); err != nil {
s.logger.Errorf("[严重] 创建任务执行日志失败, 错误: %v", err)
return err
}
// 写入待执行队列
next, err := utils.GetNextCronTime(plan.CronExpression)
if err != nil {
s.logger.Errorf("[严重] 执行时间解析失败, 错误: %v", err)
return err
}
pendingTask := &models.PendingTask{
TaskID: task.ID,
ExecuteAt: next,
TaskExecutionLogID: taskLog.ID,
}
err = s.pendingTaskRepo.CreatePendingTasksInBatch([]*models.PendingTask{pendingTask})
if err != nil {
s.logger.Errorf("[严重] 创建待执行任务失败, 错误: %v", err)
return err
}
return nil
}