Files
pig-farm-controller/internal/infra/repository/pending_task_repository.go
2025-09-23 17:11:31 +08:00

179 lines
6.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package repository
import (
"errors"
"fmt"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// PendingTaskRepository 定义了与待执行任务队列交互的接口。
type PendingTaskRepository interface {
FindAllPendingTasks() ([]models.PendingTask, error)
FindPendingTriggerByPlanID(planID uint) (*models.PendingTask, error)
DeletePendingTasksByIDs(ids []uint) error
CreatePendingTask(task *models.PendingTask) error
CreatePendingTasksInBatch(tasks []*models.PendingTask) error
// UpdatePendingTaskExecuteAt 更新指定待执行任务的执行时间
UpdatePendingTaskExecuteAt(id uint, executeAt time.Time) error
// ClearAllPendingTasks 清空所有待执行任务
ClearAllPendingTasks() error
// ClaimNextAvailableTask 原子地认领下一个可用的任务。
// 它会同时返回被认领任务对应的日志对象,以及被删除的待办任务对象的内存副本。
ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error)
// RequeueTask 安全地将一个任务重新放回队列。
RequeueTask(originalPendingTask *models.PendingTask) error
// FindPendingTasksByTaskLogIDs 根据 TaskExecutionLogID 列表查找对应的待执行任务
FindPendingTasksByTaskLogIDs(taskLogIDs []uint) ([]models.PendingTask, error)
// DeletePendingTasksByPlanLogID 删除与指定计划执行日志ID相关的所有待执行任务
DeletePendingTasksByPlanLogID(planLogID uint) error
}
// gormPendingTaskRepository 是使用 GORM 的具体实现。
type gormPendingTaskRepository struct {
db *gorm.DB
}
// NewGormPendingTaskRepository 创建一个新的待执行任务队列仓库。
func NewGormPendingTaskRepository(db *gorm.DB) PendingTaskRepository {
return &gormPendingTaskRepository{db: db}
}
func (r *gormPendingTaskRepository) FindAllPendingTasks() ([]models.PendingTask, error) {
var tasks []models.PendingTask
// 预加载 Task 以便后续访问 Task.PlanID
// 预加载 TaskExecutionLog 以便后续访问 PlanExecutionLogID
err := r.db.Preload("Task").Preload("TaskExecutionLog").Find(&tasks).Error
return tasks, err
}
func (r *gormPendingTaskRepository) FindPendingTriggerByPlanID(planID uint) (*models.PendingTask, error) {
var pendingTask models.PendingTask
// 关键修改:通过 JOIN tasks 表并查询 parameters JSON 字段来查找触发器,而不是依赖 task.plan_id
err := r.db.
Joins("JOIN tasks ON tasks.id = pending_tasks.task_id").
Where("tasks.type = ? AND tasks.parameters->>'plan_id' = ?", models.TaskPlanAnalysis, fmt.Sprintf("%d", planID)).
First(&pendingTask).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil // 未找到不是错误
}
return &pendingTask, err
}
func (r *gormPendingTaskRepository) DeletePendingTasksByIDs(ids []uint) error {
if len(ids) == 0 {
return nil
}
return r.db.Where("id IN ?", ids).Delete(&models.PendingTask{}).Error
}
func (r *gormPendingTaskRepository) CreatePendingTask(task *models.PendingTask) error {
return r.db.Create(task).Error
}
// CreatePendingTasksInBatch 在一次数据库调用中创建多个待执行任务条目。
func (r *gormPendingTaskRepository) CreatePendingTasksInBatch(tasks []*models.PendingTask) error {
if len(tasks) == 0 {
return nil
}
return r.db.Create(&tasks).Error
}
// UpdatePendingTaskExecuteAt 更新指定待执行任务的执行时间
func (r *gormPendingTaskRepository) UpdatePendingTaskExecuteAt(id uint, executeAt time.Time) error {
return r.db.Model(&models.PendingTask{}).Where("id = ?", id).Update("execute_at", executeAt).Error
}
// ClearAllPendingTasks 清空所有待执行任务
func (r *gormPendingTaskRepository) ClearAllPendingTasks() error {
return r.db.Where("1 = 1").Delete(&models.PendingTask{}).Error
}
// ClaimNextAvailableTask 以原子方式认领下一个可用的任务。
func (r *gormPendingTaskRepository) ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error) {
var log models.TaskExecutionLog
var pendingTask models.PendingTask
err := r.db.Transaction(func(tx *gorm.DB) error {
query := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
Where("execute_at <= ?", time.Now()).
Order("execute_at ASC")
if len(excludePlanIDs) > 0 {
query = query.Where("plan_execution_log_id NOT IN ?", excludePlanIDs)
}
if err := query.First(&pendingTask).Error; err != nil {
return err
}
if err := tx.Unscoped().Delete(&pendingTask).Error; err != nil {
return err
}
updates := map[string]interface{}{
"status": models.ExecutionStatusStarted,
"started_at": time.Now(),
}
if err := tx.Model(&models.TaskExecutionLog{}).Where("id = ?", pendingTask.TaskExecutionLogID).Updates(updates).Error; err != nil {
return err
}
if err := tx.Preload("Task").First(&log, pendingTask.TaskExecutionLogID).Error; err != nil {
return err
}
return nil
})
if err != nil {
return nil, nil, err
}
return &log, &pendingTask, nil
}
// RequeueTask 安全地将一个任务重新放回队列。
// 它通过将原始 PendingTask 的 ID 重置为 0并重新创建它来实现。
func (r *gormPendingTaskRepository) RequeueTask(originalPendingTask *models.PendingTask) error {
return r.db.Transaction(func(tx *gorm.DB) error {
// 1. 将日志状态恢复为 waiting
if err := tx.Model(&models.TaskExecutionLog{}).Where("id = ?", originalPendingTask.TaskExecutionLogID).Update("status", models.ExecutionStatusWaiting).Error; err != nil {
return err
}
// 2. 关键:将传入的 PendingTask 的 ID 重置为 0。
// 这会告诉 GORM这是一个需要创建INSERT的新记录而不是更新。
originalPendingTask.ID = 0
// 3. 重新创建待办任务。GORM 会忽略掉已被重置的 ID并让数据库生成一个新的主键。
return tx.Create(originalPendingTask).Error
})
}
// FindPendingTasksByTaskLogIDs 根据 TaskExecutionLogID 列表查找对应的待执行任务
func (r *gormPendingTaskRepository) FindPendingTasksByTaskLogIDs(taskLogIDs []uint) ([]models.PendingTask, error) {
if len(taskLogIDs) == 0 {
return []models.PendingTask{}, nil
}
var pendingTasks []models.PendingTask
err := r.db.Where("task_execution_log_id IN ?", taskLogIDs).Find(&pendingTasks).Error
return pendingTasks, err
}
// DeletePendingTasksByPlanLogID 删除与指定计划执行日志ID相关的所有待执行任务
func (r *gormPendingTaskRepository) DeletePendingTasksByPlanLogID(planLogID uint) error {
// 使用子查询找到所有与 planLogID 相关的 task_execution_log_id
subQuery := r.db.Model(&models.TaskExecutionLog{}).Select("id").Where("plan_execution_log_id = ?", planLogID)
// 使用子查询的结果来删除待执行任务
return r.db.Where("task_execution_log_id IN (?)", subQuery).Delete(&models.PendingTask{}).Error
}