120 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			120 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package repository
 | 
						|
 | 
						|
import (
 | 
						|
	"time"
 | 
						|
 | 
						|
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
						|
	"gorm.io/gorm"
 | 
						|
)
 | 
						|
 | 
						|
// PendingCollectionListOptions 定义了查询待采集请求时的可选参数
 | 
						|
type PendingCollectionListOptions struct {
 | 
						|
	DeviceID  *uint
 | 
						|
	Status    *models.PendingCollectionStatus
 | 
						|
	StartTime *time.Time // 基于 created_at 字段
 | 
						|
	EndTime   *time.Time // 基于 created_at 字段
 | 
						|
	OrderBy   string     // 例如 "created_at asc"
 | 
						|
}
 | 
						|
 | 
						|
// PendingCollectionRepository 定义了与待采集请求相关的数据库操作接口。
 | 
						|
type PendingCollectionRepository interface {
 | 
						|
	// Create 创建一个新的待采集请求。
 | 
						|
	Create(req *models.PendingCollection) error
 | 
						|
 | 
						|
	// FindByCorrelationID 根据关联ID查找一个待采集请求。
 | 
						|
	FindByCorrelationID(correlationID string) (*models.PendingCollection, error)
 | 
						|
 | 
						|
	// UpdateStatusToFulfilled 将指定关联ID的请求状态更新为“已完成”。
 | 
						|
	UpdateStatusToFulfilled(correlationID string, fulfilledAt time.Time) error
 | 
						|
 | 
						|
	// MarkAllPendingAsTimedOut 将所有“待处理”请求更新为“已超时”。
 | 
						|
	MarkAllPendingAsTimedOut() (int64, error)
 | 
						|
 | 
						|
	// List 支持分页和过滤的列表查询
 | 
						|
	List(opts PendingCollectionListOptions, page, pageSize int) ([]models.PendingCollection, int64, error)
 | 
						|
}
 | 
						|
 | 
						|
// gormPendingCollectionRepository 是 PendingCollectionRepository 的 GORM 实现。
 | 
						|
type gormPendingCollectionRepository struct {
 | 
						|
	db *gorm.DB
 | 
						|
}
 | 
						|
 | 
						|
// NewGormPendingCollectionRepository 创建一个新的 PendingCollectionRepository GORM 实现实例。
 | 
						|
func NewGormPendingCollectionRepository(db *gorm.DB) PendingCollectionRepository {
 | 
						|
	return &gormPendingCollectionRepository{db: db}
 | 
						|
}
 | 
						|
 | 
						|
// Create 创建一个新的待采集请求。
 | 
						|
func (r *gormPendingCollectionRepository) Create(req *models.PendingCollection) error {
 | 
						|
	return r.db.Create(req).Error
 | 
						|
}
 | 
						|
 | 
						|
// FindByCorrelationID 根据关联ID查找一个待采集请求。
 | 
						|
func (r *gormPendingCollectionRepository) FindByCorrelationID(correlationID string) (*models.PendingCollection, error) {
 | 
						|
	var req models.PendingCollection
 | 
						|
	if err := r.db.First(&req, "correlation_id = ?", correlationID).Error; err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return &req, nil
 | 
						|
}
 | 
						|
 | 
						|
// UpdateStatusToFulfilled 将指定关联ID的请求状态更新为“已完成”。
 | 
						|
func (r *gormPendingCollectionRepository) UpdateStatusToFulfilled(correlationID string, fulfilledAt time.Time) error {
 | 
						|
	return r.db.Model(&models.PendingCollection{}).
 | 
						|
		Where("correlation_id = ?", correlationID).
 | 
						|
		Updates(map[string]interface{}{
 | 
						|
			"status":       models.PendingStatusFulfilled,
 | 
						|
			"fulfilled_at": &fulfilledAt,
 | 
						|
		}).Error
 | 
						|
}
 | 
						|
 | 
						|
// MarkAllPendingAsTimedOut 将所有状态为 'pending' 的记录更新为 'timed_out'。
 | 
						|
// 返回被更新的记录数量和错误。
 | 
						|
func (r *gormPendingCollectionRepository) MarkAllPendingAsTimedOut() (int64, error) {
 | 
						|
	result := r.db.Model(&models.PendingCollection{}).
 | 
						|
		Where("status = ?", models.PendingStatusPending).
 | 
						|
		Update("status", models.PendingStatusTimedOut)
 | 
						|
 | 
						|
	return result.RowsAffected, result.Error
 | 
						|
}
 | 
						|
 | 
						|
// List 实现了分页和过滤查询待采集请求的功能
 | 
						|
func (r *gormPendingCollectionRepository) List(opts PendingCollectionListOptions, page, pageSize int) ([]models.PendingCollection, int64, error) {
 | 
						|
	if page <= 0 || pageSize <= 0 {
 | 
						|
		return nil, 0, ErrInvalidPagination
 | 
						|
	}
 | 
						|
 | 
						|
	var results []models.PendingCollection
 | 
						|
	var total int64
 | 
						|
 | 
						|
	query := r.db.Model(&models.PendingCollection{})
 | 
						|
 | 
						|
	if opts.DeviceID != nil {
 | 
						|
		query = query.Where("device_id = ?", *opts.DeviceID)
 | 
						|
	}
 | 
						|
	if opts.Status != nil {
 | 
						|
		query = query.Where("status = ?", *opts.Status)
 | 
						|
	}
 | 
						|
	if opts.StartTime != nil {
 | 
						|
		query = query.Where("created_at >= ?", *opts.StartTime)
 | 
						|
	}
 | 
						|
	if opts.EndTime != nil {
 | 
						|
		query = query.Where("created_at <= ?", *opts.EndTime)
 | 
						|
	}
 | 
						|
 | 
						|
	if err := query.Count(&total).Error; err != nil {
 | 
						|
		return nil, 0, err
 | 
						|
	}
 | 
						|
 | 
						|
	orderBy := "created_at DESC"
 | 
						|
	if opts.OrderBy != "" {
 | 
						|
		orderBy = opts.OrderBy
 | 
						|
	}
 | 
						|
	query = query.Order(orderBy)
 | 
						|
 | 
						|
	offset := (page - 1) * pageSize
 | 
						|
	err := query.Limit(pageSize).Offset(offset).Find(&results).Error
 | 
						|
 | 
						|
	return results, total, err
 | 
						|
}
 |