Files
pig-farm-controller/internal/infra/repository/pending_task_repository.go

97 lines
3.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package repository
import (
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// PendingTaskRepository 定义了与待执行任务队列交互的接口。
type PendingTaskRepository interface {
CreatePendingTasksInBatch(tasks []*models.PendingTask) error
// ClaimNextAvailableTask 原子地认领下一个可用的任务。
// 它会同时返回被认领任务对应的日志对象,以及被删除的待办任务对象的内存副本。
ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error)
// RequeueTask 安全地将一个任务重新放回队列。
RequeueTask(originalPendingTask *models.PendingTask) error
}
// pendingTaskRepository 是使用 GORM 的具体实现。
type pendingTaskRepository struct {
db *gorm.DB
}
// NewPendingTaskRepository 创建一个新的待执行任务队列仓库。
func NewPendingTaskRepository(db *gorm.DB) PendingTaskRepository {
return &pendingTaskRepository{db: db}
}
// CreatePendingTasksInBatch 在一次数据库调用中创建多个待执行任务条目。
func (r *pendingTaskRepository) CreatePendingTasksInBatch(tasks []*models.PendingTask) error {
return r.db.Create(&tasks).Error
}
// ClaimNextAvailableTask 以原子方式认领下一个可用的任务。
func (r *pendingTaskRepository) ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error) {
var log models.TaskExecutionLog
var pendingTask models.PendingTask
err := r.db.Transaction(func(tx *gorm.DB) error {
query := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
Where("execute_at <= ?", time.Now()).
Order("execute_at ASC")
if len(excludePlanIDs) > 0 {
query = query.Where("plan_execution_log_id NOT IN ?", excludePlanIDs)
}
if err := query.First(&pendingTask).Error; err != nil {
return err
}
if err := tx.Unscoped().Delete(&pendingTask).Error; err != nil {
return err
}
updates := map[string]interface{}{
"status": models.ExecutionStatusStarted,
"started_at": time.Now(),
}
if err := tx.Model(&models.TaskExecutionLog{}).Where("id = ?", pendingTask.TaskExecutionLogID).Updates(updates).Error; err != nil {
return err
}
if err := tx.Preload("Task").First(&log, pendingTask.TaskExecutionLogID).Error; err != nil {
return err
}
return nil
})
if err != nil {
return nil, nil, err
}
return &log, &pendingTask, nil
}
// RequeueTask 安全地将一个任务重新放回队列。
// 它通过将原始 PendingTask 的 ID 重置为 0并重新创建它来实现。
func (r *pendingTaskRepository) RequeueTask(originalPendingTask *models.PendingTask) error {
return r.db.Transaction(func(tx *gorm.DB) error {
// 1. 将日志状态恢复为 waiting
if err := tx.Model(&models.TaskExecutionLog{}).Where("id = ?", originalPendingTask.TaskExecutionLogID).Update("status", models.ExecutionStatusWaiting).Error; err != nil {
return err
}
// 2. 关键:将传入的 PendingTask 的 ID 重置为 0。
// 这会告诉 GORM这是一个需要创建INSERT的新记录而不是更新。
originalPendingTask.ID = 0
// 3. 重新创建待办任务。GORM 会忽略掉已被重置的 ID并让数据库生成一个新的主键。
return tx.Create(originalPendingTask).Error
})
}