package repository import ( "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "gorm.io/gorm" ) // PendingCollectionListOptions 定义了查询待采集请求时的可选参数 type PendingCollectionListOptions struct { DeviceID *uint Status *models.PendingCollectionStatus StartTime *time.Time // 基于 created_at 字段 EndTime *time.Time // 基于 created_at 字段 OrderBy string // 例如 "created_at asc" } // PendingCollectionRepository 定义了与待采集请求相关的数据库操作接口。 type PendingCollectionRepository interface { // Create 创建一个新的待采集请求。 Create(req *models.PendingCollection) error // FindByCorrelationID 根据关联ID查找一个待采集请求。 FindByCorrelationID(correlationID string) (*models.PendingCollection, error) // UpdateStatusToFulfilled 将指定关联ID的请求状态更新为“已完成”。 UpdateStatusToFulfilled(correlationID string, fulfilledAt time.Time) error // MarkAllPendingAsTimedOut 将所有“待处理”请求更新为“已超时”。 MarkAllPendingAsTimedOut() (int64, error) // List 支持分页和过滤的列表查询 List(opts PendingCollectionListOptions, page, pageSize int) ([]models.PendingCollection, int64, error) } // gormPendingCollectionRepository 是 PendingCollectionRepository 的 GORM 实现。 type gormPendingCollectionRepository struct { db *gorm.DB } // NewGormPendingCollectionRepository 创建一个新的 PendingCollectionRepository GORM 实现实例。 func NewGormPendingCollectionRepository(db *gorm.DB) PendingCollectionRepository { return &gormPendingCollectionRepository{db: db} } // Create 创建一个新的待采集请求。 func (r *gormPendingCollectionRepository) Create(req *models.PendingCollection) error { return r.db.Create(req).Error } // FindByCorrelationID 根据关联ID查找一个待采集请求。 func (r *gormPendingCollectionRepository) FindByCorrelationID(correlationID string) (*models.PendingCollection, error) { var req models.PendingCollection if err := r.db.First(&req, "correlation_id = ?", correlationID).Error; err != nil { return nil, err } return &req, nil } // UpdateStatusToFulfilled 将指定关联ID的请求状态更新为“已完成”。 func (r *gormPendingCollectionRepository) UpdateStatusToFulfilled(correlationID string, fulfilledAt time.Time) error { return r.db.Model(&models.PendingCollection{}). Where("correlation_id = ?", correlationID). Updates(map[string]interface{}{ "status": models.PendingStatusFulfilled, "fulfilled_at": &fulfilledAt, }).Error } // MarkAllPendingAsTimedOut 将所有状态为 'pending' 的记录更新为 'timed_out'。 // 返回被更新的记录数量和错误。 func (r *gormPendingCollectionRepository) MarkAllPendingAsTimedOut() (int64, error) { result := r.db.Model(&models.PendingCollection{}). Where("status = ?", models.PendingStatusPending). Update("status", models.PendingStatusTimedOut) return result.RowsAffected, result.Error } // List 实现了分页和过滤查询待采集请求的功能 func (r *gormPendingCollectionRepository) List(opts PendingCollectionListOptions, page, pageSize int) ([]models.PendingCollection, int64, error) { if page <= 0 || pageSize <= 0 { return nil, 0, ErrInvalidPagination } var results []models.PendingCollection var total int64 query := r.db.Model(&models.PendingCollection{}) if opts.DeviceID != nil { query = query.Where("device_id = ?", *opts.DeviceID) } if opts.Status != nil { query = query.Where("status = ?", *opts.Status) } if opts.StartTime != nil { query = query.Where("created_at >= ?", *opts.StartTime) } if opts.EndTime != nil { query = query.Where("created_at <= ?", *opts.EndTime) } if err := query.Count(&total).Error; err != nil { return nil, 0, err } orderBy := "created_at DESC" if opts.OrderBy != "" { orderBy = opts.OrderBy } query = query.Order(orderBy) offset := (page - 1) * pageSize err := query.Limit(pageSize).Offset(offset).Find(&results).Error return results, total, err }