diff --git a/internal/app/api/api.go b/internal/app/api/api.go index 89c48d7..6d2dbf9 100644 --- a/internal/app/api/api.go +++ b/internal/app/api/api.go @@ -53,8 +53,6 @@ func NewAPI(cfg config.ServerConfig, userRepo repository.UserRepository, deviceRepository repository.DeviceRepository, planRepository repository.PlanRepository, - sensorDataRepository repository.SensorDataRepository, - executionLogRepository repository.ExecutionLogRepository, tokenService token.TokenService, listenHandler transport.ListenHandler, analysisTaskManager *task.AnalysisPlanTaskManager) *API { diff --git a/internal/app/service/task/release_feed_weight_task.go b/internal/app/service/task/release_feed_weight_task.go index 7cf6df1..7d13685 100644 --- a/internal/app/service/task/release_feed_weight_task.go +++ b/internal/app/service/task/release_feed_weight_task.go @@ -9,7 +9,6 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" ) // ReleaseFeedWeightTaskParams 定义了 ReleaseFeedWeightTask 的参数结构 @@ -21,17 +20,14 @@ type ReleaseFeedWeightTaskParams struct { // ReleaseFeedWeightTask 是一个控制下料口释放指定重量的任务 type ReleaseFeedWeightTask struct { - deviceRepo repository.DeviceRepository - sensorDataRepo repository.SensorDataRepository - deviceCommandLogRepo repository.DeviceCommandLogRepository - pendingCollectionRepo repository.PendingCollectionRepository - claimedLog *models.TaskExecutionLog + deviceRepo repository.DeviceRepository + sensorDataRepo repository.SensorDataRepository + claimedLog *models.TaskExecutionLog - feedPortDevice *models.Device // 下料口基本信息 - releaseWeight float64 // 需要释放的重量 - mixingTankDeviceID uint // 搅拌罐称重传感器ID + feedPortDevice *models.Device + releaseWeight float64 + mixingTankDeviceID uint - comm transport.Communicator feedPort device.Service logger *logs.Logger @@ -40,21 +36,17 @@ type ReleaseFeedWeightTask struct { // NewReleaseFeedWeightTask 创建一个新的 ReleaseFeedWeightTask 实例 func NewReleaseFeedWeightTask( claimedLog *models.TaskExecutionLog, - deviceRepo repository.DeviceRepository, sensorDataRepo repository.SensorDataRepository, - deviceCommandLogRepo repository.DeviceCommandLogRepository, - pendingCollectionRepo repository.PendingCollectionRepository, - comm transport.Communicator, + deviceRepo repository.DeviceRepository, + deviceService device.Service, logger *logs.Logger, ) Task { return &ReleaseFeedWeightTask{ - claimedLog: claimedLog, - deviceRepo: deviceRepo, - sensorDataRepo: sensorDataRepo, - deviceCommandLogRepo: deviceCommandLogRepo, - pendingCollectionRepo: pendingCollectionRepo, - comm: comm, - logger: logger, + claimedLog: claimedLog, + deviceRepo: deviceRepo, + sensorDataRepo: sensorDataRepo, + feedPort: deviceService, // 直接注入 + logger: logger, } } @@ -152,7 +144,6 @@ func (r *ReleaseFeedWeightTask) parseParameters() error { r.releaseWeight = params.ReleaseWeight r.mixingTankDeviceID = params.MixingTankDeviceID - r.feedPort = device.NewGeneralDeviceService(r.deviceRepo, r.deviceCommandLogRepo, r.pendingCollectionRepo, r.logger, r.comm) r.feedPortDevice, err = r.deviceRepo.FindByID(params.FeedPortDeviceID) if err != nil { r.logger.Errorf("任务 %v: 获取设备信息失败: %v", r.claimedLog.TaskID, err) diff --git a/internal/app/service/task/scheduler.go b/internal/app/service/task/scheduler.go index 21a5cce..6c42be6 100644 --- a/internal/app/service/task/scheduler.go +++ b/internal/app/service/task/scheduler.go @@ -6,10 +6,10 @@ import ( "sync" "time" + "git.huangwc.com/pig/pig-farm-controller/internal/app/service/device" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" "github.com/panjf2000/ants/v2" "gorm.io/gorm" ) @@ -83,12 +83,10 @@ type Scheduler struct { executionLogRepo repository.ExecutionLogRepository deviceRepo repository.DeviceRepository sensorDataRepo repository.SensorDataRepository - deviceCommandLogRepo repository.DeviceCommandLogRepository - pendingCollectionRepo repository.PendingCollectionRepository planRepo repository.PlanRepository - comm transport.Communicator analysisPlanTaskManager *AnalysisPlanTaskManager progressTracker *ProgressTracker + deviceService device.Service pool *ants.Pool // 使用 ants 协程池来管理并发 wg sync.WaitGroup @@ -102,11 +100,9 @@ func NewScheduler( deviceRepo repository.DeviceRepository, sensorDataRepo repository.SensorDataRepository, planRepo repository.PlanRepository, - comm transport.Communicator, analysisPlanTaskManager *AnalysisPlanTaskManager, logger *logs.Logger, - deviceCommandLogRepo repository.DeviceCommandLogRepository, - pendingCollectionRepo repository.PendingCollectionRepository, + deviceService device.Service, interval time.Duration, numWorkers int, ) *Scheduler { @@ -116,11 +112,9 @@ func NewScheduler( deviceRepo: deviceRepo, sensorDataRepo: sensorDataRepo, planRepo: planRepo, - comm: comm, analysisPlanTaskManager: analysisPlanTaskManager, logger: logger, - deviceCommandLogRepo: deviceCommandLogRepo, - pendingCollectionRepo: pendingCollectionRepo, + deviceService: deviceService, pollingInterval: interval, workers: numWorkers, progressTracker: NewProgressTracker(), @@ -296,7 +290,7 @@ func (s *Scheduler) taskFactory(claimedLog *models.TaskExecutionLog) Task { case models.TaskTypeWaiting: return NewDelayTask(s.logger, claimedLog) case models.TaskTypeReleaseFeedWeight: - return NewReleaseFeedWeightTask(claimedLog, s.deviceRepo, s.sensorDataRepo, s.deviceCommandLogRepo, s.pendingCollectionRepo, s.comm, s.logger) + return NewReleaseFeedWeightTask(claimedLog, s.sensorDataRepo, s.deviceRepo, s.deviceService, s.logger) default: // TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 diff --git a/internal/core/application.go b/internal/core/application.go index cc6e808..216c205 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -8,6 +8,7 @@ import ( "time" "git.huangwc.com/pig/pig-farm-controller/internal/app/api" + "git.huangwc.com/pig/pig-farm-controller/internal/app/service/device" "git.huangwc.com/pig/pig-farm-controller/internal/app/service/task" "git.huangwc.com/pig/pig-farm-controller/internal/app/service/token" "git.huangwc.com/pig/pig-farm-controller/internal/app/service/transport" @@ -89,6 +90,15 @@ func NewApplication(configPath string) (*Application, error) { // 初始化设备通信器 (纯粹的通信客户端) comm := lora.NewChirpStackTransport(cfg.ChirpStack, logger) + // 初始化通用设备服务 + generalDeviceService := device.NewGeneralDeviceService( + deviceRepo, + deviceCommandLogRepo, + pendingCollectionRepo, + logger, + comm, + ) + // 初始化任务执行器 executor := task.NewScheduler( pendingTaskRepo, @@ -96,26 +106,32 @@ func NewApplication(configPath string) (*Application, error) { deviceRepo, sensorDataRepo, planRepo, - comm, analysisPlanTaskManager, logger, - deviceCommandLogRepo, - pendingCollectionRepo, + generalDeviceService, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers, ) // 初始化 API 服务器 - apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, sensorDataRepo, executionLogRepo, tokenService, listenHandler, analysisPlanTaskManager) + apiServer := api.NewAPI( + cfg.Server, + logger, + userRepo, + deviceRepo, + planRepo, + tokenService, + listenHandler, + analysisPlanTaskManager, + ) // 组装 Application 对象 app := &Application{ - Config: cfg, - Logger: logger, - Storage: storage, - Executor: executor, - API: apiServer, - // 填充新增的字段 + Config: cfg, + Logger: logger, + Storage: storage, + Executor: executor, + API: apiServer, planRepo: planRepo, pendingTaskRepo: pendingTaskRepo, executionLogRepo: executionLogRepo,