Files
pig-farm-controller/internal/infra/repository/pending_task_repository.go
2025-11-16 21:49:28 +08:00

213 lines
9.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package repository
import (
"context"
"errors"
"fmt"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"gorm.io/gorm"
)
// PendingTaskRepository 定义了与待执行任务队列交互的接口。
type PendingTaskRepository interface {
FindAllPendingTasks(ctx context.Context) ([]models.PendingTask, error)
FindPendingTriggerByPlanID(ctx context.Context, planID uint32) (*models.PendingTask, error)
DeletePendingTasksByIDs(ctx context.Context, ids []uint32) error
CreatePendingTask(ctx context.Context, task *models.PendingTask) error
CreatePendingTasksInBatch(ctx context.Context, tasks []*models.PendingTask) error
// UpdatePendingTaskExecuteAt 更新指定待执行任务的执行时间
UpdatePendingTaskExecuteAt(ctx context.Context, id uint32, executeAt time.Time) error
// ClearAllPendingTasks 清空所有待执行任务
ClearAllPendingTasks(ctx context.Context) error
// ClaimNextAvailableTask 原子地认领下一个可用的任务。
// 它会同时返回被认领任务对应的日志对象,以及被删除的待办任务对象的内存副本。
ClaimNextAvailableTask(ctx context.Context, excludePlanIDs []uint32) (*models.TaskExecutionLog, *models.PendingTask, error)
// RequeueTask 安全地将一个任务重新放回队列。
RequeueTask(ctx context.Context, originalPendingTask *models.PendingTask) error
// FindPendingTasksByTaskLogIDs 根据 TaskExecutionLogID 列表查找对应的待执行任务
FindPendingTasksByTaskLogIDs(ctx context.Context, taskLogIDs []uint32) ([]models.PendingTask, error)
// DeletePendingTasksByPlanLogID 删除与指定计划执行日志ID相关的所有待执行任务
DeletePendingTasksByPlanLogID(ctx context.Context, planLogID uint32) error
}
// gormPendingTaskRepository 是使用 GORM 的具体实现。
type gormPendingTaskRepository struct {
ctx context.Context
db *gorm.DB
mutex sync.Mutex
}
// NewGormPendingTaskRepository 创建一个新的待执行任务队列仓库。
func NewGormPendingTaskRepository(ctx context.Context, db *gorm.DB) PendingTaskRepository {
return &gormPendingTaskRepository{ctx: ctx, db: db, mutex: sync.Mutex{}}
}
func (r *gormPendingTaskRepository) FindAllPendingTasks(ctx context.Context) ([]models.PendingTask, error) {
repoCtx := logs.AddFuncName(ctx, r.ctx, "FindAllPendingTasks")
var tasks []models.PendingTask
// 预加载 Task 以便后续访问 Task.PlanID
// 预加载 TaskExecutionLog 以便后续访问 PlanExecutionLogID
err := r.db.WithContext(repoCtx).Preload("Task").Preload("TaskExecutionLog").Find(&tasks).Error
return tasks, err
}
func (r *gormPendingTaskRepository) FindPendingTriggerByPlanID(ctx context.Context, planID uint32) (*models.PendingTask, error) {
repoCtx := logs.AddFuncName(ctx, r.ctx, "FindPendingTriggerByPlanID")
var pendingTask models.PendingTask
// 关键修改:通过 JOIN tasks 表并查询 parameters JSON 字段来查找触发器,而不是依赖 task.plan_id
err := r.db.WithContext(repoCtx).
Joins("JOIN tasks ON tasks.id = pending_tasks.task_id").
Where("tasks.type = ? AND tasks.parameters->>'plan_id' = ?", models.TaskPlanAnalysis, fmt.Sprintf("%d", planID)).
First(&pendingTask).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil // 未找到不是错误
}
return &pendingTask, err
}
func (r *gormPendingTaskRepository) DeletePendingTasksByIDs(ctx context.Context, ids []uint32) error {
repoCtx := logs.AddFuncName(ctx, r.ctx, "DeletePendingTasksByIDs")
if len(ids) == 0 {
return nil
}
return r.db.WithContext(repoCtx).Where("id IN ?", ids).Delete(&models.PendingTask{}).Error
}
func (r *gormPendingTaskRepository) CreatePendingTask(ctx context.Context, task *models.PendingTask) error {
repoCtx := logs.AddFuncName(ctx, r.ctx, "CreatePendingTask")
return r.db.WithContext(repoCtx).Create(task).Error
}
// CreatePendingTasksInBatch 在一次数据库调用中创建多个待执行任务条目。
func (r *gormPendingTaskRepository) CreatePendingTasksInBatch(ctx context.Context, tasks []*models.PendingTask) error {
repoCtx := logs.AddFuncName(ctx, r.ctx, "CreatePendingTasksInBatch")
if len(tasks) == 0 {
return nil
}
return r.db.WithContext(repoCtx).Create(&tasks).Error
}
// UpdatePendingTaskExecuteAt 更新指定待执行任务的执行时间
func (r *gormPendingTaskRepository) UpdatePendingTaskExecuteAt(ctx context.Context, id uint32, executeAt time.Time) error {
repoCtx := logs.AddFuncName(ctx, r.ctx, "UpdatePendingTaskExecuteAt")
return r.db.WithContext(repoCtx).Model(&models.PendingTask{}).Where("id = ?", id).Update("execute_at", executeAt).Error
}
// ClearAllPendingTasks 清空所有待执行任务
func (r *gormPendingTaskRepository) ClearAllPendingTasks(ctx context.Context) error {
repoCtx := logs.AddFuncName(ctx, r.ctx, "ClearAllPendingTasks")
return r.db.WithContext(repoCtx).Where("1 = 1").Delete(&models.PendingTask{}).Error
}
// ClaimNextAvailableTask 以原子方式认领下一个可用的任务。
func (r *gormPendingTaskRepository) ClaimNextAvailableTask(ctx context.Context, excludePlanIDs []uint32) (*models.TaskExecutionLog, *models.PendingTask, error) {
repoCtx := logs.AddFuncName(ctx, r.ctx, "ClaimNextAvailableTask")
var log models.TaskExecutionLog
var pendingTask models.PendingTask
err := r.db.WithContext(repoCtx).Transaction(func(tx *gorm.DB) error {
// TODO task_execution_logs 开启了压缩, 所以无法使用行锁控制资源, 暂时使用程序锁控制并发问题, 这是基于目前只会启动一个实例的前提
r.mutex.Lock()
defer r.mutex.Unlock()
// 从 pending_tasks 表开始构建查询
query := tx.WithContext(repoCtx).Model(&models.PendingTask{})
// JOIN task_execution_logs 表
query = query.Joins("JOIN task_execution_logs ON task_execution_logs.id = pending_tasks.task_execution_log_id")
// 添加基础查询条件,并明确指定表名
query = query.Where("pending_tasks.execute_at <= ?", time.Now())
// 如果需要排除,则基于 JOIN 后的表进行过滤
if len(excludePlanIDs) > 0 {
query = query.Where("task_execution_logs.plan_execution_log_id NOT IN ?", excludePlanIDs)
}
// 按执行时间排序,并明确指定表名
query = query.Order("pending_tasks.execute_at ASC")
// 明确 SELECT pending_tasks 的所有列,并执行查询
if err := query.Select("pending_tasks.*").First(&pendingTask).Error; err != nil {
return err
}
if err := tx.WithContext(repoCtx).Unscoped().Delete(&pendingTask).Error; err != nil {
return err
}
updates := map[string]interface{}{
"status": models.ExecutionStatusStarted,
"started_at": time.Now(),
}
if err := tx.WithContext(repoCtx).Model(&models.TaskExecutionLog{}).Where("id = ?", pendingTask.TaskExecutionLogID).Updates(updates).Error; err != nil {
return err
}
// 在 Preload("Tasks") 时,使用 Unscoped() 来忽略 Task 的软删除状态
if err := tx.WithContext(repoCtx).Preload("Tasks", func(db *gorm.DB) *gorm.DB {
return db.Unscoped()
}).First(&log, pendingTask.TaskExecutionLogID).Error; err != nil {
return err
}
return nil
})
if err != nil {
return nil, nil, err
}
return &log, &pendingTask, nil
}
// RequeueTask 安全地将一个任务重新放回队列。
// 它通过将原始 PendingTask 的 ID 重置为 0并重新创建它来实现。
func (r *gormPendingTaskRepository) RequeueTask(ctx context.Context, originalPendingTask *models.PendingTask) error {
repoCtx := logs.AddFuncName(ctx, r.ctx, "RequeueTask")
return r.db.WithContext(repoCtx).Transaction(func(tx *gorm.DB) error {
// 1. 将日志状态恢复为 waiting
if err := tx.WithContext(repoCtx).Model(&models.TaskExecutionLog{}).Where("id = ?", originalPendingTask.TaskExecutionLogID).Update("status", models.ExecutionStatusWaiting).Error; err != nil {
return err
}
// 2. 关键:将传入的 PendingTask 的 ID 重置为 0。
// 这会告诉 GORM这是一个需要创建INSERT的新记录而不是更新。
originalPendingTask.ID = 0
// 3. 重新创建待办任务。GORM 会忽略掉已被重置的 ID并让数据库生成一个新的主键。
return tx.WithContext(repoCtx).Create(originalPendingTask).Error
})
}
// FindPendingTasksByTaskLogIDs 根据 TaskExecutionLogID 列表查找对应的待执行任务
func (r *gormPendingTaskRepository) FindPendingTasksByTaskLogIDs(ctx context.Context, taskLogIDs []uint32) ([]models.PendingTask, error) {
repoCtx := logs.AddFuncName(ctx, r.ctx, "FindPendingTasksByTaskLogIDs")
if len(taskLogIDs) == 0 {
return []models.PendingTask{}, nil
}
var pendingTasks []models.PendingTask
err := r.db.WithContext(repoCtx).Where("task_execution_log_id IN ?", taskLogIDs).Find(&pendingTasks).Error
return pendingTasks, err
}
// DeletePendingTasksByPlanLogID 删除与指定计划执行日志ID相关的所有待执行任务
func (r *gormPendingTaskRepository) DeletePendingTasksByPlanLogID(ctx context.Context, planLogID uint32) error {
repoCtx := logs.AddFuncName(ctx, r.ctx, "DeletePendingTasksByPlanLogID")
// 使用子查询找到所有与 planLogID 相关的 task_execution_log_id
subQuery := r.db.WithContext(repoCtx).Model(&models.TaskExecutionLog{}).Select("id").Where("plan_execution_log_id = ?", planLogID)
// 使用子查询的结果来删除待执行任务
return r.db.WithContext(repoCtx).Where("task_execution_log_id IN (?)", subQuery).Delete(&models.PendingTask{}).Error
}