diff --git a/internal/infra/repository/pending_task_repository.go b/internal/infra/repository/pending_task_repository.go index 25f7d7e..0f8cc3d 100644 --- a/internal/infra/repository/pending_task_repository.go +++ b/internal/infra/repository/pending_task_repository.go @@ -114,15 +114,28 @@ func (r *gormPendingTaskRepository) ClaimNextAvailableTask(ctx context.Context, var pendingTask models.PendingTask err := r.db.WithContext(repoCtx).Transaction(func(tx *gorm.DB) error { - query := tx.WithContext(repoCtx).Clauses(clause.Locking{Strength: "UPDATE"}). - Where("execute_at <= ?", time.Now()). - Order("execute_at ASC") + // 从 pending_tasks 表开始构建查询 + query := tx.WithContext(repoCtx).Model(&models.PendingTask{}) + // JOIN task_execution_logs 表 + query = query.Joins("JOIN task_execution_logs ON task_execution_logs.id = pending_tasks.task_execution_log_id") + + // 添加行锁 + query = query.Clauses(clause.Locking{Strength: "UPDATE"}) + + // 添加基础查询条件,并明确指定表名 + query = query.Where("pending_tasks.execute_at <= ?", time.Now()) + + // 如果需要排除,则基于 JOIN 后的表进行过滤 if len(excludePlanIDs) > 0 { - query = query.Where("plan_execution_log_id NOT IN ?", excludePlanIDs) + query = query.Where("task_execution_logs.plan_execution_log_id NOT IN ?", excludePlanIDs) } - if err := query.First(&pendingTask).Error; err != nil { + // 按执行时间排序,并明确指定表名 + query = query.Order("pending_tasks.execute_at ASC") + + // 明确 SELECT pending_tasks 的所有列,并执行查询 + if err := query.Select("pending_tasks.*").First(&pendingTask).Error; err != nil { return err } @@ -138,7 +151,7 @@ func (r *gormPendingTaskRepository) ClaimNextAvailableTask(ctx context.Context, return err } - // 关键修改:在 Preload("Task") 时,使用 Unscoped() 来忽略 Task 的软删除状态 + // 在 Preload("Task") 时,使用 Unscoped() 来忽略 Task 的软删除状态 if err := tx.WithContext(repoCtx).Preload("Task", func(db *gorm.DB) *gorm.DB { return db.Unscoped() }).First(&log, pendingTask.TaskExecutionLogID).Error; err != nil {