1. 提取触发器逻辑

2. 创建/更新计划时自动生成对应触发器
This commit is contained in:
2025-09-17 22:43:35 +08:00
parent ceba0c280e
commit f7a5e4737d
6 changed files with 217 additions and 126 deletions

View File

@@ -0,0 +1,370 @@
package task
import (
"context"
"errors"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"github.com/panjf2000/ants/v2"
"gorm.io/gorm"
)
// ProgressTracker 在内存中跟踪计划的执行状态,包括进度和执行锁
type ProgressTracker struct {
mu sync.Mutex
cond *sync.Cond // 用于实现阻塞锁
totalTasks map[uint]int // key: planExecutionLogID, value: total tasks
completedTasks map[uint]int // key: planExecutionLogID, value: completed tasks
runningPlans map[uint]bool // key: planExecutionLogID, value: true (用作内存锁)
}
// NewProgressTracker 创建一个新的进度跟踪器
func NewProgressTracker() *ProgressTracker {
t := &ProgressTracker{
totalTasks: make(map[uint]int),
completedTasks: make(map[uint]int),
runningPlans: make(map[uint]bool),
}
t.cond = sync.NewCond(&t.mu)
return t
}
// AddNewPlan 添加一个新的计划,并初始化进度跟踪器
func (t *ProgressTracker) AddNewPlan(planLogID uint, totalTasks int) {
t.mu.Lock()
t.totalTasks[planLogID] = totalTasks
t.completedTasks[planLogID] = 0
t.mu.Unlock()
}
// CompletedTask 通知计数器一个任务被完成
func (t *ProgressTracker) CompletedTask(planLogID uint) {
t.mu.Lock()
t.completedTasks[planLogID]++
t.mu.Unlock()
}
// IsPlanOver 检查计划是否完成
func (t *ProgressTracker) IsPlanOver(planLogID uint) bool {
t.mu.Lock()
defer t.mu.Unlock()
return t.completedTasks[planLogID] >= t.totalTasks[planLogID]
}
// TryLock (非阻塞) 尝试锁定一个计划。如果计划未被锁定,则锁定并返回 true。
func (t *ProgressTracker) TryLock(planLogID uint) bool {
t.mu.Lock()
defer t.mu.Unlock()
if t.runningPlans[planLogID] {
return false // 已被锁定
}
t.runningPlans[planLogID] = true
return true
}
// Lock (阻塞) 获取一个计划的执行锁。如果锁已被占用,则会一直等待直到锁被释放。
func (t *ProgressTracker) Lock(planLogID uint) {
t.mu.Lock()
// 当计划正在运行时,调用 t.cond.Wait() 会原子地解锁 mu 并挂起当前协程。
// 当被唤醒时,它会重新锁定 mu 并再次检查循环条件。
for t.runningPlans[planLogID] {
t.cond.Wait()
}
// 获取到锁
t.runningPlans[planLogID] = true
t.mu.Unlock()
}
// Unlock 解锁一个计划,并唤醒所有正在等待此锁的协程。
func (t *ProgressTracker) Unlock(planLogID uint) {
t.mu.Lock()
defer t.mu.Unlock()
delete(t.runningPlans, planLogID)
// 唤醒所有在此条件上等待的协程
t.cond.Broadcast()
}
// GetRunningPlanIDs 获取当前所有正在执行的计划ID列表
func (t *ProgressTracker) GetRunningPlanIDs() []uint {
t.mu.Lock()
defer t.mu.Unlock()
ids := make([]uint, 0, len(t.runningPlans))
for id := range t.runningPlans {
ids = append(ids, id)
}
return ids
}
// Scheduler 是核心的、持久化的任务调度器
type Scheduler struct {
logger *logs.Logger
pollingInterval time.Duration
workers int
pendingTaskRepo repository.PendingTaskRepository
executionLogRepo repository.ExecutionLogRepository
planRepo repository.PlanRepository
analysisPlanTaskManager *AnalysisPlanTaskManager // <--- 注入共享的 Manager
progressTracker *ProgressTracker
taskFactory func(taskType models.TaskType) Task // 调度器需要注入一个任务工厂,用于创建任务实例
pool *ants.Pool // 使用 ants 协程池来管理并发
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
// NewScheduler 创建一个新的调度器实例
func NewScheduler(
pendingTaskRepo repository.PendingTaskRepository,
executionLogRepo repository.ExecutionLogRepository,
planRepo repository.PlanRepository,
analysisPlanTaskManager *AnalysisPlanTaskManager, // <--- 注入 Manager
taskFactory func(taskType models.TaskType) Task,
logger *logs.Logger,
interval time.Duration,
numWorkers int) *Scheduler {
ctx, cancel := context.WithCancel(context.Background())
return &Scheduler{
pendingTaskRepo: pendingTaskRepo,
executionLogRepo: executionLogRepo,
planRepo: planRepo,
analysisPlanTaskManager: analysisPlanTaskManager, // <--- 注入 Manager
logger: logger,
pollingInterval: interval,
workers: numWorkers,
progressTracker: NewProgressTracker(),
taskFactory: taskFactory,
ctx: ctx,
cancel: cancel,
}
}
// Start 启动调度器,包括初始化协程池和启动主轮询循环
func (s *Scheduler) Start() {
s.logger.Warnf("任务调度器正在启动,工作协程数: %d...", s.workers)
pool, err := ants.NewPool(s.workers, ants.WithPanicHandler(func(err interface{}) {
s.logger.Errorf("[严重] 任务执行时发生 panic: %v", err)
}))
if err != nil {
panic("初始化协程池失败: " + err.Error())
}
s.pool = pool
s.wg.Add(1)
go s.run()
s.logger.Warnf("任务调度器已成功启动")
}
// Stop 优雅地停止调度器
func (s *Scheduler) Stop() {
s.logger.Warnf("正在停止任务调度器...")
s.cancel() // 1. 发出取消信号,停止主循环
s.wg.Wait() // 2. 等待主循环完成
s.pool.Release() // 3. 释放 ants 池 (等待所有已提交的任务执行完毕)
s.logger.Warnf("任务调度器已安全停止")
}
// run 是主轮询循环,负责从数据库认领任务并提交到协程池
func (s *Scheduler) run() {
defer s.wg.Done()
ticker := time.NewTicker(s.pollingInterval)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
go s.claimAndSubmit()
}
}
}
// claimAndSubmit 实现了最终的“认领-锁定-执行 或 等待-放回”的健壮逻辑
func (s *Scheduler) claimAndSubmit() {
runningPlanIDs := s.progressTracker.GetRunningPlanIDs()
claimedLog, pendingTask, err := s.pendingTaskRepo.ClaimNextAvailableTask(runningPlanIDs)
if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
s.logger.Errorf("认领任务时发生错误: %v", err)
}
// gorm.ErrRecordNotFound 说明没任务要执行
return
}
// 尝试获取内存执行锁
if s.progressTracker.TryLock(claimedLog.PlanExecutionLogID) {
// 成功获取锁,正常派发任务
err = s.pool.Submit(func() {
defer s.progressTracker.Unlock(claimedLog.PlanExecutionLogID)
s.processTask(claimedLog)
})
if err != nil {
s.logger.Errorf("向协程池提交任务失败: %v", err)
// 提交失败,必须释放刚刚获取的锁
s.progressTracker.Unlock(claimedLog.PlanExecutionLogID)
// 同样需要将任务安全放回
s.handleRequeue(claimedLog.PlanExecutionLogID, pendingTask)
}
} else {
// 获取锁失败,说明有“兄弟”任务正在执行。执行“锁定并安全放回”逻辑。
s.handleRequeue(claimedLog.PlanExecutionLogID, pendingTask)
}
}
// handleRequeue 同步地、安全地将一个无法立即执行的任务放回队列。
func (s *Scheduler) handleRequeue(planExecutionLogID uint, taskToRequeue *models.PendingTask) {
s.logger.Warnf("计划 %d 正在执行,任务 %d (TaskID: %d) 将等待并重新入队...", planExecutionLogID, taskToRequeue.ID, taskToRequeue.TaskID)
// 1. 阻塞式地等待,直到可以获取到该计划的锁。
s.progressTracker.Lock(planExecutionLogID)
defer s.progressTracker.Unlock(planExecutionLogID)
// 2. 在持有锁的情况下,将任务安全地放回队列。
if err := s.pendingTaskRepo.RequeueTask(taskToRequeue); err != nil {
s.logger.Errorf("[严重] 任务重新入队失败, 原始PendingTaskID: %d, 错误: %v", taskToRequeue.ID, err)
return
}
s.logger.Warnf("任务 (原始ID: %d) 已成功重新入队,并已释放计划 %d 的锁。", taskToRequeue.ID, planExecutionLogID)
}
// processTask 处理单个任务的逻辑 (当前为占位符)
func (s *Scheduler) processTask(claimedLog *models.TaskExecutionLog) {
s.logger.Warnf("开始处理任务, 日志ID: %d, 任务ID: %d, 任务名称: %s",
claimedLog.ID, claimedLog.TaskID, claimedLog.Task.Name)
claimedLog.StartedAt = time.Now()
claimedLog.Status = models.ExecutionStatusCompleted // 先乐观假定任务成功, 后续失败了再改
defer s.updateTaskExecutionLogStatus(claimedLog)
// 执行任务
err := s.runTask(claimedLog)
if err != nil {
claimedLog.Status = models.ExecutionStatusFailed
claimedLog.Output = err.Error()
return
}
s.logger.Warnf("完成任务, 日志ID: %d", claimedLog.ID)
// 任务计数器校验, Plan的任务全部执行完成后需要插入一个新的PlanAnalysisTask用于触发下一次Plan的执行
if s.progressTracker.IsPlanOver(claimedLog.PlanExecutionLogID) {
// 调用共享的 Manager 来处理触发器更新逻辑
err = s.analysisPlanTaskManager.CreateOrUpdateTrigger(s.ctx, claimedLog.Task.PlanID)
if err != nil {
s.logger.Errorf("[严重] 创建计划分析任务失败, 当前Plan(%v)将无法进行下次触发, 错误: %v", claimedLog.Task.PlanID, err)
}
}
}
// runTask 用于执行具体任务
func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error {
// 这是个特殊任务, 用于解析Plan并将解析出的任务队列添加到待执行队列中
if claimedLog.Task.Type == models.TaskPlanAnalysis {
// 解析plan
err := s.analysisPlan(claimedLog)
if err != nil {
// TODO 这里要处理一下, 比如再插一个新的触发器回去
s.logger.Errorf("[严重] 计划解析失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
} else {
// 执行普通任务
task := s.taskFactory(claimedLog.Task.Type)
if err := task.ParseParams(s.logger, claimedLog); err != nil {
s.logger.Errorf("[严重] 任务参数解析失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
if err := task.Execute(); err != nil {
s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
task.OnFailure(err)
return err
}
}
return nil
}
// analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中
func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
// 创建Plan执行记录
planLog := &models.PlanExecutionLog{
PlanID: claimedLog.Task.PlanID,
Status: models.ExecutionStatusStarted,
StartedAt: time.Now(),
}
if err := s.executionLogRepo.CreatePlanExecutionLog(planLog); err != nil {
s.logger.Errorf("[严重] 创建计划执行日志失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
// 解析出Task列表
tasks, err := s.planRepo.FlattenPlanTasks(claimedLog.Task.PlanID)
if err != nil {
s.logger.Errorf("[严重] 解析计划失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
// 写入执行历史
taskLogs := make([]*models.TaskExecutionLog, len(tasks))
for _, task := range tasks {
taskLogs = append(taskLogs, &models.TaskExecutionLog{
PlanExecutionLogID: planLog.ID,
TaskID: task.ID,
Status: models.ExecutionStatusWaiting,
})
}
err = s.executionLogRepo.CreateTaskExecutionLogsInBatch(taskLogs)
if err != nil {
s.logger.Errorf("[严重] 写入执行历史, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
// 写入待执行队列
pendingTasks := make([]*models.PendingTask, len(tasks))
for i, task := range tasks {
pendingTasks = append(pendingTasks, &models.PendingTask{
TaskID: task.ID,
TaskExecutionLogID: pendingTasks[i].ID,
// 待执行队列是通过任务触发时间排序的, 且只要在调度器获取的时间点之前的都可以被触发
ExecuteAt: time.Now().Add(time.Duration(i) * time.Second),
})
}
err = s.pendingTaskRepo.CreatePendingTasksInBatch(pendingTasks)
if err != nil {
s.logger.Errorf("[严重] 写入待执行队列, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
// 将Task列表加入待执行队列中
s.progressTracker.AddNewPlan(claimedLog.PlanExecutionLogID, len(tasks))
return nil
}
// updateTaskExecutionLogStatus 修改任务历史中的执行状态
func (s *Scheduler) updateTaskExecutionLogStatus(claimedLog *models.TaskExecutionLog) error {
claimedLog.EndedAt = time.Now()
if err := s.executionLogRepo.UpdateTaskExecutionLog(claimedLog); err != nil {
s.logger.Errorf("[严重] 更新任务执行日志失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
return nil
}