From 0d6d1db290ae5f4e4733f99b5d0ab34297e7bda1 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Thu, 25 Sep 2025 09:44:32 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9A=E4=B9=89ReleaseFeedWeightTask?= =?UTF-8?q?=E5=B9=B6=E6=B3=A8=E5=85=A5=E4=BE=9D=E8=B5=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.yml | 1 + internal/app/service/task/delay_task.go | 21 +++++------ .../service/task/release_feed_weight_task.go | 36 +++++++++++++++++++ internal/app/service/task/scheduler.go | 31 ++++++++++------ internal/app/service/task/task.go | 4 --- internal/core/application.go | 6 +++- internal/infra/config/config.go | 16 +++++++++ internal/infra/models/plan.go | 5 +-- internal/infra/transport/lora/chirp_stack.go | 24 +++---------- 9 files changed, 95 insertions(+), 49 deletions(-) create mode 100644 internal/app/service/task/release_feed_weight_task.go diff --git a/config.yml b/config.yml index 2b428db..af5693c 100644 --- a/config.yml +++ b/config.yml @@ -48,6 +48,7 @@ heartbeat: chirp_stack: api_host: "http://192.168.5.16:8090" # ChirpStack API服务器地址 api_token: "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJjaGlycHN0YWNrIiwiaXNzIjoiY2hpcnBzdGFjayIsInN1YiI6IjU2ZWRhNWQ3LTM4NzgtNDAwMC05MWMzLWYwZDk3M2YwODhjNiIsInR5cCI6ImtleSJ9.NxBxTrhPAnezKMqAYZR_Uq2mGQjJRlmVzg1ZDFCyaHQ" # ChirpStack API密钥, 请求头中需要设置 Grpc-Metadata-Authorization: Bearer + fport: 1 api_timeout: 10 # ChirpStack API请求超时时间(秒) # 任务调度器配置 diff --git a/internal/app/service/task/delay_task.go b/internal/app/service/task/delay_task.go index 6e57de3..eae2e00 100644 --- a/internal/app/service/task/delay_task.go +++ b/internal/app/service/task/delay_task.go @@ -20,22 +20,15 @@ type DelayTask struct { logger *logs.Logger } -func NewDelayTask() Task { - return &DelayTask{} +func NewDelayTask(logger *logs.Logger, executionTask *models.TaskExecutionLog) Task { + return &DelayTask{ + executionTask: executionTask, + logger: logger, + } } // Execute 执行延迟任务,等待指定的时间 func (d *DelayTask) Execute() error { - d.logger.Infof("任务 %v: 开始延迟 %v...", d.executionTask.TaskID, d.duration) - time.Sleep(d.duration) - d.logger.Infof("任务 %v: 延迟结束。", d.executionTask.TaskID) - return nil -} - -func (d *DelayTask) ParseParams(logger *logs.Logger, executionTask *models.TaskExecutionLog) error { - d.logger = logger - d.executionTask = executionTask - if d.executionTask.Task.Parameters == nil { d.logger.Errorf("任务 %v: 缺少参数", d.executionTask.TaskID) return fmt.Errorf("任务 %v: 参数不全", d.executionTask.TaskID) @@ -54,6 +47,10 @@ func (d *DelayTask) ParseParams(logger *logs.Logger, executionTask *models.TaskE } d.duration = time.Duration(duration) * time.Second + + d.logger.Infof("任务 %v: 开始延迟 %v...", d.executionTask.TaskID, d.duration) + time.Sleep(d.duration) + d.logger.Infof("任务 %v: 延迟结束。", d.executionTask.TaskID) return nil } diff --git a/internal/app/service/task/release_feed_weight_task.go b/internal/app/service/task/release_feed_weight_task.go new file mode 100644 index 0000000..02350e6 --- /dev/null +++ b/internal/app/service/task/release_feed_weight_task.go @@ -0,0 +1,36 @@ +package task + +import ( + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" +) + +// ReleaseFeedWeightTask 是一个控制下料口释放指定重量的任务 +type ReleaseFeedWeightTask struct { + deviceRepo repository.DeviceRepository + sensorDataRepo repository.SensorDataRepository + + comm transport.Communicator + + logger *logs.Logger +} + +func (r *ReleaseFeedWeightTask) Execute() error { + //TODO implement me + panic("implement me") +} + +func (r *ReleaseFeedWeightTask) OnFailure(executeErr error) { + //TODO implement me + panic("implement me") +} + +// NewReleaseFeedWeightTask 创建一个新的 ReleaseFeedWeightTask 实例 +func NewReleaseFeedWeightTask(deviceRepo repository.DeviceRepository, sensorDataRepo repository.SensorDataRepository, comm transport.Communicator) Task { + return &ReleaseFeedWeightTask{ + deviceRepo: deviceRepo, + sensorDataRepo: sensorDataRepo, + comm: comm, + } +} diff --git a/internal/app/service/task/scheduler.go b/internal/app/service/task/scheduler.go index 0208eea..6f546f6 100644 --- a/internal/app/service/task/scheduler.go +++ b/internal/app/service/task/scheduler.go @@ -9,6 +9,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" "github.com/panjf2000/ants/v2" "gorm.io/gorm" ) @@ -80,8 +81,11 @@ type Scheduler struct { workers int pendingTaskRepo repository.PendingTaskRepository executionLogRepo repository.ExecutionLogRepository + deviceRepo repository.DeviceRepository + sensorDataRepo repository.SensorDataRepository planRepo repository.PlanRepository - analysisPlanTaskManager *AnalysisPlanTaskManager // <--- 注入共享的 Manager + comm transport.Communicator + analysisPlanTaskManager *AnalysisPlanTaskManager progressTracker *ProgressTracker pool *ants.Pool // 使用 ants 协程池来管理并发 @@ -93,16 +97,22 @@ type Scheduler struct { func NewScheduler( pendingTaskRepo repository.PendingTaskRepository, executionLogRepo repository.ExecutionLogRepository, + deviceRepo repository.DeviceRepository, + sensorDataRepo repository.SensorDataRepository, planRepo repository.PlanRepository, - analysisPlanTaskManager *AnalysisPlanTaskManager, // <--- 注入 Manager + comm transport.Communicator, + analysisPlanTaskManager *AnalysisPlanTaskManager, logger *logs.Logger, interval time.Duration, numWorkers int) *Scheduler { return &Scheduler{ pendingTaskRepo: pendingTaskRepo, executionLogRepo: executionLogRepo, + deviceRepo: deviceRepo, + sensorDataRepo: sensorDataRepo, planRepo: planRepo, - analysisPlanTaskManager: analysisPlanTaskManager, // <--- 注入 Manager + comm: comm, + analysisPlanTaskManager: analysisPlanTaskManager, logger: logger, pollingInterval: interval, workers: numWorkers, @@ -261,11 +271,7 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error { } else { // 执行普通任务 - task := s.taskFactory(claimedLog.Task.Type) - if err := task.ParseParams(s.logger, claimedLog); err != nil { - s.logger.Errorf("[严重] 任务参数解析失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) - return err - } + task := s.taskFactory(claimedLog) if err := task.Execute(); err != nil { s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) @@ -278,10 +284,13 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error { } // taskFactory 会根据任务类型初始化对应任务 -func (s *Scheduler) taskFactory(taskType models.TaskType) Task { - switch taskType { +func (s *Scheduler) taskFactory(claimedLog *models.TaskExecutionLog) Task { + switch claimedLog.Task.Type { case models.TaskTypeWaiting: - return NewDelayTask() + return NewDelayTask(s.logger, claimedLog) + case models.TaskTypeReleaseFeedWeight: + return NewReleaseFeedWeightTask(s.deviceRepo, s.sensorDataRepo, s.comm) + default: // TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 panic("不支持的任务类型") diff --git a/internal/app/service/task/task.go b/internal/app/service/task/task.go index 5426fc7..77623aa 100644 --- a/internal/app/service/task/task.go +++ b/internal/app/service/task/task.go @@ -1,7 +1,6 @@ package task import ( - "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" ) @@ -13,9 +12,6 @@ type Task interface { // 返回的 error 表示任务是否执行成功。调度器会根据返回的 error 是否为 nil 来决定任务状态。 Execute() error - // ParseParams 解析及校验参数 - ParseParams(logger *logs.Logger, claimedLog *models.TaskExecutionLog) error - // OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑。 // log: 任务执行的上下文。 // executeErr: 从 Execute 方法返回的原始错误。 diff --git a/internal/core/application.go b/internal/core/application.go index d9af392..fdc74bf 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -16,6 +16,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora" ) // Application 是整个应用的核心,封装了所有组件和生命周期。 @@ -81,8 +82,11 @@ func NewApplication(configPath string) (*Application, error) { // 初始化计划触发器管理器 analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger) + // 初始化设备通信器 + comm := lora.NewChirpStackTransport(cfg.ChirpStack, logger, deviceCommandLogRepo, deviceRepo) + // 初始化任务执行器 - executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, planRepo, analysisPlanTaskManager, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers) + executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, deviceRepo, sensorDataRepo, planRepo, comm, analysisPlanTaskManager, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers) // 初始化 API 服务器 apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, sensorDataRepo, executionLogRepo, tokenService, listenHandler, analysisPlanTaskManager) diff --git a/internal/infra/config/config.go b/internal/infra/config/config.go index 4944f99..e0a0a40 100644 --- a/internal/infra/config/config.go +++ b/internal/infra/config/config.go @@ -30,6 +30,9 @@ type Config struct { // Heartbeat 心跳配置 Heartbeat HeartbeatConfig `yaml:"heartbeat"` + // ChirpStack ChirpStack API 配置 + ChirpStack ChirpStackConfig `yaml:"chirp_stack"` + // TaskConfig 任务调度配置 Task TaskConfig `yaml:"task"` } @@ -112,6 +115,14 @@ type HeartbeatConfig struct { Concurrency int `yaml:"concurrency"` } +// ChirpStackConfig 代表 ChirpStack API 配置 +type ChirpStackConfig struct { + APIHost string `yaml:"api_host"` + APIToken string `yaml:"api_token"` + FPort int `yaml:"fport"` + APITimeout int `yaml:"api_timeout"` +} + // TaskConfig 代表任务调度配置 type TaskConfig struct { Interval int `yaml:"interval"` @@ -139,3 +150,8 @@ func (c *Config) Load(path string) error { return nil } + +// GenerateAPIKey 用于补齐API Key作为请求头时缺失的部分 +func (c ChirpStackConfig) GenerateAPIKey() string { + return "Bearer " + c.APIToken +} diff --git a/internal/infra/models/plan.go b/internal/infra/models/plan.go index 516f907..5dee1e1 100644 --- a/internal/infra/models/plan.go +++ b/internal/infra/models/plan.go @@ -29,8 +29,9 @@ const ( type TaskType string const ( - TaskPlanAnalysis TaskType = "plan_analysis" // 解析Plan的Task列表并添加到待执行队列的特殊任务 - TaskTypeWaiting TaskType = "waiting" // 等待任务 + TaskPlanAnalysis TaskType = "plan_analysis" // 解析Plan的Task列表并添加到待执行队列的特殊任务 + TaskTypeWaiting TaskType = "waiting" // 等待任务 + TaskTypeReleaseFeedWeight TaskType = "release_feed_weight" // 下料口释放指定重量任务 ) const ( diff --git a/internal/infra/transport/lora/chirp_stack.go b/internal/infra/transport/lora/chirp_stack.go index 16ab842..4f653fa 100644 --- a/internal/infra/transport/lora/chirp_stack.go +++ b/internal/infra/transport/lora/chirp_stack.go @@ -3,6 +3,7 @@ package lora import ( "time" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" @@ -14,26 +15,11 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora/chirp_stack_proto/client" ) -// ChirpStackConfig 保存连接到 ChirpStack API 所需的配置。 -type ChirpStackConfig struct { - // ServerAddress 是 ChirpStack API 服务器的地址,例如 "localhost:8080"。 - ServerAddress string - // APIKey 是用于认证的 API 密钥。 - APIKey string - // LoRaWAN 端口, 需要和设备一致 - Fport int64 -} - -// GenerateAPIKey 用于补齐API Key作为请求头时缺失的部分 -func (c ChirpStackConfig) GenerateAPIKey() string { - return "Bearer " + c.APIKey -} - // ChirpStackTransport 是一个客户端,用于封装与 ChirpStack REST API 的交互。 type ChirpStackTransport struct { client *client.ChirpStackRESTAPI authInfo runtime.ClientAuthInfoWriter - config ChirpStackConfig + config config.ChirpStackConfig deviceCommandLogRepo repository.DeviceCommandLogRepository deviceRepo repository.DeviceRepository @@ -43,14 +29,14 @@ type ChirpStackTransport struct { // NewChirpStackTransport 创建一个新的通信实例,用于与 ChirpStack 通信。 func NewChirpStackTransport( - config ChirpStackConfig, + config config.ChirpStackConfig, logger *logs.Logger, deviceCommandLogRepo repository.DeviceCommandLogRepository, deviceRepo repository.DeviceRepository, ) *ChirpStackTransport { // 使用配置中的服务器地址创建一个 HTTP transport。 // 它会使用生成的客户端中定义的默认 base path 和 schemes。 - transport := httptransport.New(config.ServerAddress, client.DefaultBasePath, client.DefaultSchemes) + transport := httptransport.New(config.APIHost, client.DefaultBasePath, client.DefaultSchemes) // 使用 transport 和默认的字符串格式化器,创建一个 API 主客户端。 apiClient := client.New(transport, strfmt.Default) @@ -77,7 +63,7 @@ func (c *ChirpStackTransport) Send(address string, payload []byte) error { QueueItem: &device_service.DeviceServiceEnqueueParamsBodyQueueItem{ Confirmed: true, Data: payload, - FPort: c.config.Fport, + FPort: int64(c.config.FPort), }, }