diff --git a/config.yml b/config.yml index 2b428db..af5693c 100644 --- a/config.yml +++ b/config.yml @@ -48,6 +48,7 @@ heartbeat: chirp_stack: api_host: "http://192.168.5.16:8090" # ChirpStack API服务器地址 api_token: "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJjaGlycHN0YWNrIiwiaXNzIjoiY2hpcnBzdGFjayIsInN1YiI6IjU2ZWRhNWQ3LTM4NzgtNDAwMC05MWMzLWYwZDk3M2YwODhjNiIsInR5cCI6ImtleSJ9.NxBxTrhPAnezKMqAYZR_Uq2mGQjJRlmVzg1ZDFCyaHQ" # ChirpStack API密钥, 请求头中需要设置 Grpc-Metadata-Authorization: Bearer + fport: 1 api_timeout: 10 # ChirpStack API请求超时时间(秒) # 任务调度器配置 diff --git a/internal/app/service/device/general_device_service.go b/internal/app/service/device/general_device_service.go index 228e025..473ff03 100644 --- a/internal/app/service/device/general_device_service.go +++ b/internal/app/service/device/general_device_service.go @@ -17,29 +17,23 @@ type GeneralDeviceService struct { deviceRepo repository.DeviceRepository logger *logs.Logger - deviceID uint // 区域主控的设备ID - - // regionalController 是执行命令的区域主控, 所有的指令都会发往区域主控 - regionalController *models.Device - comm transport.Communicator } // NewGeneralDeviceService 创建一个通用设备服务 -func NewGeneralDeviceService(deviceID uint, deviceRepo repository.DeviceRepository, logger *logs.Logger, comm transport.Communicator) *GeneralDeviceService { +func NewGeneralDeviceService(deviceRepo repository.DeviceRepository, logger *logs.Logger, comm transport.Communicator) *GeneralDeviceService { return &GeneralDeviceService{ - deviceID: deviceID, deviceRepo: deviceRepo, logger: logger, comm: comm, } } -func (g *GeneralDeviceService) Switch(device models.Device, action DeviceAction) error { +func (g *GeneralDeviceService) Switch(device *models.Device, action DeviceAction) error { // 校验设备参数及生成指令 - if *device.ParentID != g.deviceID { - return fmt.Errorf("设备 %v(id=%v) 的上级区域主控是(id=%v), 不是当前区域主控(id=%v)下属设备, 无法执行指令", device.Name, device.ID, device.ParentID, g.deviceID) + if *device.ParentID == 0 { + return fmt.Errorf("设备 %v(id=%v) 的上级区域主控(id=%v) ID不合理, 无法执行指令", device.Name, device.ID, *device.ParentID) } if !device.SelfCheck() { @@ -80,9 +74,9 @@ func (g *GeneralDeviceService) Switch(device models.Device, action DeviceAction) } // 获取自身LoRa设备ID, 因为可能变更, 所以每次都现获取 - thisDevice, err := g.deviceRepo.FindByID(g.deviceID) + thisDevice, err := g.deviceRepo.FindByID(*device.ParentID) if err != nil { - return fmt.Errorf("获取区域主控(id=%v)信息失败: %v", g.deviceID, err) + return fmt.Errorf("获取区域主控(id=%v)信息失败: %v", *device.ParentID, err) } if !thisDevice.SelfCheck() { return fmt.Errorf("区域主控 %v(id=%v) 缺少必要信息, 无法发送指令", thisDevice.Name, thisDevice.ID) diff --git a/internal/app/service/task/delay_task.go b/internal/app/service/task/delay_task.go index 058260e..5d96a36 100644 --- a/internal/app/service/task/delay_task.go +++ b/internal/app/service/task/delay_task.go @@ -9,9 +9,9 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" ) -const ( - ParamsDelayDuration = "delay_duration" -) +type DelayTaskParams struct { + DelayDuration float64 `json:"delay_duration"` +} // DelayTask 是一个用于模拟延迟的 Task 实现 type DelayTask struct { @@ -20,36 +20,45 @@ type DelayTask struct { logger *logs.Logger } +func NewDelayTask(logger *logs.Logger, executionTask *models.TaskExecutionLog) Task { + return &DelayTask{ + executionTask: executionTask, + logger: logger, + } +} + // Execute 执行延迟任务,等待指定的时间 func (d *DelayTask) Execute() error { + if err := d.parseParameters(); err != nil { + return err + } + d.logger.Infof("任务 %v: 开始延迟 %v...", d.executionTask.TaskID, d.duration) time.Sleep(d.duration) d.logger.Infof("任务 %v: 延迟结束。", d.executionTask.TaskID) return nil } -func (d *DelayTask) ParseParams(logger *logs.Logger, executionTask *models.TaskExecutionLog) error { - d.logger = logger - d.executionTask = executionTask - +func (d *DelayTask) parseParameters() error { if d.executionTask.Task.Parameters == nil { d.logger.Errorf("任务 %v: 缺少参数", d.executionTask.TaskID) return fmt.Errorf("任务 %v: 参数不全", d.executionTask.TaskID) } - var params map[string]interface{} - if err := json.Unmarshal(d.executionTask.Task.Parameters, ¶ms); err != nil { + var params DelayTaskParams + err := json.Unmarshal(d.executionTask.Task.Parameters, ¶ms) + if err != nil { d.logger.Errorf("任务 %v: 解析参数失败: %v", d.executionTask.TaskID, err) return fmt.Errorf("任务 %v: 解析参数失败: %v", d.executionTask.TaskID, err) } - duration, ok := params[ParamsDelayDuration].(float64) - if !ok { - d.logger.Errorf("任务 %v: 参数 %v 不是数字或不存在", d.executionTask.TaskID, ParamsDelayDuration) - return fmt.Errorf("任务 %v: 参数 %v 不是数字或不存在", d.executionTask.TaskID, ParamsDelayDuration) + if params.DelayDuration <= 0 { + d.logger.Errorf("任务 %v: 参数 delay_duration 缺失或无效 (必须大于0)", d.executionTask.TaskID) + return fmt.Errorf("任务 %v: 参数 delay_duration 缺失或无效 (必须大于0)", d.executionTask.TaskID) } - d.duration = time.Duration(duration) * time.Second + d.duration = time.Duration(params.DelayDuration) * time.Second + return nil } diff --git a/internal/app/service/task/release_feed_weight_task.go b/internal/app/service/task/release_feed_weight_task.go new file mode 100644 index 0000000..d3a89a4 --- /dev/null +++ b/internal/app/service/task/release_feed_weight_task.go @@ -0,0 +1,164 @@ +package task + +import ( + "encoding/json" + "fmt" + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/app/service/device" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" +) + +// ReleaseFeedWeightTaskParams 定义了 ReleaseFeedWeightTask 的参数结构 +type ReleaseFeedWeightTaskParams struct { + ReleaseWeight float64 `json:"release_weight"` // 需要释放的重量 + FeedPortDeviceID uint `json:"feed_port_device_id"` // 下料口ID + MixingTankDeviceID uint `json:"mixing_tank_device_id"` // 称重传感器ID +} + +// ReleaseFeedWeightTask 是一个控制下料口释放指定重量的任务 +type ReleaseFeedWeightTask struct { + deviceRepo repository.DeviceRepository + sensorDataRepo repository.SensorDataRepository + claimedLog *models.TaskExecutionLog + + feedPortDevice *models.Device // 下料口基本信息 + releaseWeight float64 // 需要释放的重量 + mixingTankDeviceID uint // 搅拌罐称重传感器ID + + comm transport.Communicator + feedPort *device.GeneralDeviceService // 下料口指令下发器 + + logger *logs.Logger +} + +// NewReleaseFeedWeightTask 创建一个新的 ReleaseFeedWeightTask 实例 +func NewReleaseFeedWeightTask(claimedLog *models.TaskExecutionLog, deviceRepo repository.DeviceRepository, sensorDataRepo repository.SensorDataRepository, comm transport.Communicator, logger *logs.Logger) Task { + return &ReleaseFeedWeightTask{ + claimedLog: claimedLog, + deviceRepo: deviceRepo, + sensorDataRepo: sensorDataRepo, + comm: comm, + logger: logger, + } +} + +func (r *ReleaseFeedWeightTask) Execute() error { + r.logger.Infof("任务 %v: 开始执行, 日志ID: %v", r.claimedLog.TaskID, r.claimedLog.ID) + if err := r.parseParameters(); err != nil { + return err + } + + weight, err := r.getNowWeight() + if err != nil { + return err + } + + if err = r.feedPort.Switch(r.feedPortDevice, device.DeviceActionStart); err != nil { + r.logger.Errorf("启动下料口(id=%v)失败: %v , 日志ID: %v", r.feedPortDevice.ID, err, r.claimedLog.ID) + return err + } + + targetWeight := weight - r.releaseWeight + errCount := 1 + + // TODO 这个判断有延迟, 尤其是LoRa通信本身延迟较高, 可以考虑根据信号质量或其他指标提前发送停止命令 + for targetWeight <= weight { + weight, err = r.getNowWeight() + if err != nil { + errCount++ + if errCount > 3 { // 如果连续三次没成功采集到重量数据,则认为计划执行失败 + r.logger.Errorf("获取当前计划执行日志(id=%v)的当前搅拌罐重量失败: %v, 任务结束", r.claimedLog.ID, err) + return err + } + r.logger.Warnf("第%v次尝试获取当前计划执行日志(id=%v)的当前搅拌罐重量失败: %v", errCount, r.claimedLog.ID, err) + continue + } + time.Sleep(100 * time.Millisecond) + } + + if err = r.feedPort.Switch(r.feedPortDevice, device.DeviceActionStop); err != nil { + r.logger.Errorf("关闭下料口(id=%v)失败: %v , 日志ID: %v", r.feedPortDevice.ID, err, r.claimedLog.ID) + return err + } + + r.logger.Infof("完成计划执行日志(id=%v)的当前计划, 完成下料 %vkg, 搅拌罐剩余重量 %vkg", r.claimedLog.ID, r.releaseWeight, weight) + return nil +} + +// 获取当前搅拌罐重量 +func (r *ReleaseFeedWeightTask) getNowWeight() (float64, error) { + sensorData, err := r.sensorDataRepo.GetLatestSensorDataByDeviceIDAndSensorType(r.mixingTankDeviceID, models.SensorDataTypeWeight) + if err != nil { + r.logger.Errorf("获取设备 %v 最新传感器数据失败: %v , 日志ID: %v", r.mixingTankDeviceID, err, r.claimedLog.ID) + return 0, err + } + + if sensorData == nil { + return 0, fmt.Errorf("未找到设备 %v 的最新重量传感器数据", r.mixingTankDeviceID) + } + + wg := &models.WeightData{} + err = json.Unmarshal(sensorData.Data, wg) + if err != nil { + r.logger.Errorf("反序列化设备 %v 最新传感器数据失败: %v , 日志ID: %v", r.mixingTankDeviceID, err, r.claimedLog.ID) + return 0, err + } + + return wg.WeightKilograms, nil +} + +func (r *ReleaseFeedWeightTask) parseParameters() error { + if r.claimedLog.Task.Parameters == nil { + r.logger.Errorf("任务 %v: 缺少参数", r.claimedLog.TaskID) + return fmt.Errorf("任务 %v: 参数不全", r.claimedLog.TaskID) + } + + var params ReleaseFeedWeightTaskParams + err := json.Unmarshal(r.claimedLog.Task.Parameters, ¶ms) + if err != nil { + r.logger.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err) + return fmt.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err) + } + + // 校验参数是否存在 + if params.ReleaseWeight == 0 { + r.logger.Errorf("任务 %v: 参数 release_weight 缺失或无效", r.claimedLog.TaskID) + return fmt.Errorf("任务 %v: 参数 release_weight 缺失或无效", r.claimedLog.TaskID) + } + if params.FeedPortDeviceID == 0 { + r.logger.Errorf("任务 %v: 参数 feed_port_device_id 缺失或无效", r.claimedLog.TaskID) + return fmt.Errorf("任务 %v: 参数 feed_port_device_id 缺失或无效", r.claimedLog.TaskID) + } + if params.MixingTankDeviceID == 0 { + r.logger.Errorf("任务 %v: 参数 mixing_tank_device_id 缺失或无效", r.claimedLog.TaskID) + return fmt.Errorf("任务 %v: 参数 mixing_tank_device_id 缺失或无效", r.claimedLog.TaskID) + } + + r.releaseWeight = params.ReleaseWeight + r.mixingTankDeviceID = params.MixingTankDeviceID + r.feedPort = device.NewGeneralDeviceService(r.deviceRepo, r.logger, r.comm) + r.feedPortDevice, err = r.deviceRepo.FindByID(params.FeedPortDeviceID) + if err != nil { + r.logger.Errorf("任务 %v: 获取设备信息失败: %v", r.claimedLog.TaskID, err) + return fmt.Errorf("任务 %v: 获取设备信息失败: %v", r.claimedLog.TaskID, err) + } + + return nil +} + +func (r *ReleaseFeedWeightTask) OnFailure(executeErr error) { + r.logger.Errorf("开始善后处理, 日志ID:%v", r.claimedLog.ID) + if r.feedPort != nil { + err := r.feedPort.Switch(r.feedPortDevice, device.DeviceActionStop) + if err != nil { + r.logger.Errorf("[严重] 下料口停止失败, 日志ID: %v, 错误: %v", r.claimedLog.ID, err) + } + } else { + r.logger.Warnf("[警告] 下料口通信器尚未初始化, 不进行任何操作, 日志ID: %v", r.claimedLog.ID) + } + r.logger.Errorf("善后处理完成, 日志ID:%v", r.claimedLog.ID) +} diff --git a/internal/app/service/task/scheduler.go b/internal/app/service/task/scheduler.go index caa135e..53fe5ae 100644 --- a/internal/app/service/task/scheduler.go +++ b/internal/app/service/task/scheduler.go @@ -9,6 +9,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" "github.com/panjf2000/ants/v2" "gorm.io/gorm" ) @@ -80,10 +81,12 @@ type Scheduler struct { workers int pendingTaskRepo repository.PendingTaskRepository executionLogRepo repository.ExecutionLogRepository + deviceRepo repository.DeviceRepository + sensorDataRepo repository.SensorDataRepository planRepo repository.PlanRepository - analysisPlanTaskManager *AnalysisPlanTaskManager // <--- 注入共享的 Manager + comm transport.Communicator + analysisPlanTaskManager *AnalysisPlanTaskManager progressTracker *ProgressTracker - taskFactory func(taskType models.TaskType) Task // 调度器需要注入一个任务工厂,用于创建任务实例 pool *ants.Pool // 使用 ants 协程池来管理并发 wg sync.WaitGroup @@ -94,22 +97,26 @@ type Scheduler struct { func NewScheduler( pendingTaskRepo repository.PendingTaskRepository, executionLogRepo repository.ExecutionLogRepository, + deviceRepo repository.DeviceRepository, + sensorDataRepo repository.SensorDataRepository, planRepo repository.PlanRepository, - analysisPlanTaskManager *AnalysisPlanTaskManager, // <--- 注入 Manager - taskFactory func(taskType models.TaskType) Task, + comm transport.Communicator, + analysisPlanTaskManager *AnalysisPlanTaskManager, logger *logs.Logger, interval time.Duration, numWorkers int) *Scheduler { return &Scheduler{ pendingTaskRepo: pendingTaskRepo, executionLogRepo: executionLogRepo, + deviceRepo: deviceRepo, + sensorDataRepo: sensorDataRepo, planRepo: planRepo, - analysisPlanTaskManager: analysisPlanTaskManager, // <--- 注入 Manager + comm: comm, + analysisPlanTaskManager: analysisPlanTaskManager, logger: logger, pollingInterval: interval, workers: numWorkers, progressTracker: NewProgressTracker(), - taskFactory: taskFactory, stopChan: make(chan struct{}), // 初始化停止信号通道 } } @@ -264,17 +271,10 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error { } else { // 执行普通任务 - task := s.taskFactory(claimedLog.Task.Type) - if err := task.ParseParams(s.logger, claimedLog); err != nil { - - s.logger.Errorf("[严重] 任务参数解析失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) - return err - } + task := s.taskFactory(claimedLog) if err := task.Execute(); err != nil { - s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) - task.OnFailure(err) return err } @@ -283,6 +283,20 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error { return nil } +// taskFactory 会根据任务类型初始化对应任务 +func (s *Scheduler) taskFactory(claimedLog *models.TaskExecutionLog) Task { + switch claimedLog.Task.Type { + case models.TaskTypeWaiting: + return NewDelayTask(s.logger, claimedLog) + case models.TaskTypeReleaseFeedWeight: + return NewReleaseFeedWeightTask(claimedLog, s.deviceRepo, s.sensorDataRepo, s.comm, s.logger) + + default: + // TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 + panic("不支持的任务类型") + } +} + // analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中 func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { // 创建Plan执行记录 diff --git a/internal/app/service/task/task.go b/internal/app/service/task/task.go index 5426fc7..77623aa 100644 --- a/internal/app/service/task/task.go +++ b/internal/app/service/task/task.go @@ -1,7 +1,6 @@ package task import ( - "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" ) @@ -13,9 +12,6 @@ type Task interface { // 返回的 error 表示任务是否执行成功。调度器会根据返回的 error 是否为 nil 来决定任务状态。 Execute() error - // ParseParams 解析及校验参数 - ParseParams(logger *logs.Logger, claimedLog *models.TaskExecutionLog) error - // OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑。 // log: 任务执行的上下文。 // executeErr: 从 Execute 方法返回的原始错误。 diff --git a/internal/core/application.go b/internal/core/application.go index 2beddd6..fdc74bf 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -16,6 +16,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora" ) // Application 是整个应用的核心,封装了所有组件和生命周期。 @@ -81,8 +82,11 @@ func NewApplication(configPath string) (*Application, error) { // 初始化计划触发器管理器 analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger) + // 初始化设备通信器 + comm := lora.NewChirpStackTransport(cfg.ChirpStack, logger, deviceCommandLogRepo, deviceRepo) + // 初始化任务执行器 - executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, planRepo, analysisPlanTaskManager, task.TaskFactory, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers) + executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, deviceRepo, sensorDataRepo, planRepo, comm, analysisPlanTaskManager, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers) // 初始化 API 服务器 apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, sensorDataRepo, executionLogRepo, tokenService, listenHandler, analysisPlanTaskManager) diff --git a/internal/infra/config/config.go b/internal/infra/config/config.go index 4944f99..e0a0a40 100644 --- a/internal/infra/config/config.go +++ b/internal/infra/config/config.go @@ -30,6 +30,9 @@ type Config struct { // Heartbeat 心跳配置 Heartbeat HeartbeatConfig `yaml:"heartbeat"` + // ChirpStack ChirpStack API 配置 + ChirpStack ChirpStackConfig `yaml:"chirp_stack"` + // TaskConfig 任务调度配置 Task TaskConfig `yaml:"task"` } @@ -112,6 +115,14 @@ type HeartbeatConfig struct { Concurrency int `yaml:"concurrency"` } +// ChirpStackConfig 代表 ChirpStack API 配置 +type ChirpStackConfig struct { + APIHost string `yaml:"api_host"` + APIToken string `yaml:"api_token"` + FPort int `yaml:"fport"` + APITimeout int `yaml:"api_timeout"` +} + // TaskConfig 代表任务调度配置 type TaskConfig struct { Interval int `yaml:"interval"` @@ -139,3 +150,8 @@ func (c *Config) Load(path string) error { return nil } + +// GenerateAPIKey 用于补齐API Key作为请求头时缺失的部分 +func (c ChirpStackConfig) GenerateAPIKey() string { + return "Bearer " + c.APIToken +} diff --git a/internal/infra/database/postgres.go b/internal/infra/database/postgres.go index 27d0ecc..b2200c4 100644 --- a/internal/infra/database/postgres.go +++ b/internal/infra/database/postgres.go @@ -124,7 +124,7 @@ func (ps *PostgresStorage) Migrate(models ...interface{}) error { return err } - // 如果是 TimescaleDB, 则将 sensor_data 转换为 hypertable + // 如果是 TimescaleDB, 则将部分表转换为 hypertable if ps.isTimescaleDB { ps.logger.Info("检测到 TimescaleDB, 准备进行超表转换") if err := ps.creatingHyperTable(); err != nil { @@ -139,16 +139,18 @@ func (ps *PostgresStorage) creatingHyperTable() error { // 将 sensor_data 转换为超表 // 使用 if_not_exists => TRUE 保证幂等性 // 'time' 是 SensorData 模型中定义的时间列 - sqlSensorData := "SELECT create_hypertable('sensor_data', 'time', if_not_exists => TRUE);" + // 设置 chunk_time_interval 为 1 天, 以优化按天查询的性能 + sqlSensorData := "SELECT create_hypertable('sensor_data', 'time', chunk_time_interval => INTERVAL '1 day', if_not_exists => TRUE);" if err := ps.db.Exec(sqlSensorData).Error; err != nil { ps.logger.Errorw("将 sensor_data 转换为超表失败", "error", err) return fmt.Errorf("将 sensor_data 转换为超表失败: %w", err) } - ps.logger.Info("成功将 sensor_data 转换为超表 (或已转换)") + ps.logger.Info("成功将 sensor_data 转换为超表 (或已转换), chunk 间隔为 1 天") // 将 device_command_log 转换为超表 // 'sent_at' 是 DeviceCommandLog 模型中定义的时间列 - sqlDeviceCommandLogs := "SELECT create_hypertable('device_command_log', 'sent_at', if_not_exists => TRUE);" + // 设置 chunk_time_interval 为 1 天 + sqlDeviceCommandLogs := "SELECT create_hypertable('device_command_log', 'sent_at', chunk_time_interval => INTERVAL '1 day', if_not_exists => TRUE);" if err := ps.db.Exec(sqlDeviceCommandLogs).Error; err != nil { ps.logger.Errorw("将 device_command_log 转换为超表失败", "error", err) return fmt.Errorf("将 device_command_log 转换为超表失败: %w", err) diff --git a/internal/infra/models/plan.go b/internal/infra/models/plan.go index 516f907..bbb2315 100644 --- a/internal/infra/models/plan.go +++ b/internal/infra/models/plan.go @@ -29,10 +29,12 @@ const ( type TaskType string const ( - TaskPlanAnalysis TaskType = "plan_analysis" // 解析Plan的Task列表并添加到待执行队列的特殊任务 - TaskTypeWaiting TaskType = "waiting" // 等待任务 + TaskPlanAnalysis TaskType = "plan_analysis" // 解析Plan的Task列表并添加到待执行队列的特殊任务 + TaskTypeWaiting TaskType = "waiting" // 等待任务 + TaskTypeReleaseFeedWeight TaskType = "release_feed_weight" // 下料口释放指定重量任务 ) +// -- Task Parameters -- const ( // 这个参数是 TaskPlanAnalysis 类型的 Task Parameters 中用于记录plan_id的字段的key ParamsPlanID = "plan_id" diff --git a/internal/infra/models/sensor_data.go b/internal/infra/models/sensor_data.go index a667f65..0e1a6a9 100644 --- a/internal/infra/models/sensor_data.go +++ b/internal/infra/models/sensor_data.go @@ -50,22 +50,18 @@ type WeightData struct { // SensorData 存储所有类型的传感器数据,对应数据库中的 'sensor_data' 表。 type SensorData struct { // Time 是数据记录的时间戳,作为复合主键的一部分。 - // GORM 会将其映射到 'time' TIMESTAMPTZ 列。 Time time.Time `gorm:"primaryKey" json:"time"` // DeviceID 是传感器的唯一标识符,作为复合主键的另一部分。 - // GORM 会将其映射到 'device_id' VARCHAR(50) 列。 DeviceID uint `gorm:"primaryKey" json:"device_id"` // RegionalControllerID 是上报此数据的区域主控的ID。 - // 我们为其添加了数据库索引以优化按区域查询的性能。 RegionalControllerID uint `json:"regional_controller_id"` // SensorDataType 是传感数据的类型 SensorDataType SensorDataType `gorm:"not null;index" json:"sensor_data_type"` // Data 存储一个或多个传感器读数,格式为 JSON。 - // GORM 会使用 'jsonb' 类型来创建此列。 Data datatypes.JSON `gorm:"type:jsonb" json:"data"` } diff --git a/internal/infra/repository/sensor_data_repository.go b/internal/infra/repository/sensor_data_repository.go index b9b80a9..94f6f81 100644 --- a/internal/infra/repository/sensor_data_repository.go +++ b/internal/infra/repository/sensor_data_repository.go @@ -1,6 +1,8 @@ package repository import ( + "time" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "gorm.io/gorm" ) @@ -8,6 +10,7 @@ import ( // SensorDataRepository 定义了与传感器数据相关的数据库操作接口。 type SensorDataRepository interface { Create(sensorData *models.SensorData) error + GetLatestSensorDataByDeviceIDAndSensorType(deviceID uint, sensorType models.SensorDataType) (*models.SensorData, error) } // gormSensorDataRepository 是 SensorDataRepository 的 GORM 实现。 @@ -25,3 +28,13 @@ func NewGormSensorDataRepository(db *gorm.DB) SensorDataRepository { func (r *gormSensorDataRepository) Create(sensorData *models.SensorData) error { return r.db.Create(sensorData).Error } + +// GetLatestSensorDataByDeviceIDAndSensorType 根据设备ID和传感器类型查询最新的传感器数据。 +func (r *gormSensorDataRepository) GetLatestSensorDataByDeviceIDAndSensorType(deviceID uint, sensorDataType models.SensorDataType) (*models.SensorData, error) { + var sensorData models.SensorData + // 增加一个时间范围来缩小查询范围, 从而加快查找速度, 当使用时序数据库时时间范围可以让数据库忽略时间靠前的分片 + err := r.db.Where("device_id = ? AND sensor_data_type = ? AND time >=?", deviceID, sensorDataType, time.Now().Add(-24*time.Hour)). + Order("time DESC"). + First(&sensorData).Error + return &sensorData, err +} diff --git a/internal/infra/transport/lora/chirp_stack.go b/internal/infra/transport/lora/chirp_stack.go index 16ab842..4f653fa 100644 --- a/internal/infra/transport/lora/chirp_stack.go +++ b/internal/infra/transport/lora/chirp_stack.go @@ -3,6 +3,7 @@ package lora import ( "time" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" @@ -14,26 +15,11 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora/chirp_stack_proto/client" ) -// ChirpStackConfig 保存连接到 ChirpStack API 所需的配置。 -type ChirpStackConfig struct { - // ServerAddress 是 ChirpStack API 服务器的地址,例如 "localhost:8080"。 - ServerAddress string - // APIKey 是用于认证的 API 密钥。 - APIKey string - // LoRaWAN 端口, 需要和设备一致 - Fport int64 -} - -// GenerateAPIKey 用于补齐API Key作为请求头时缺失的部分 -func (c ChirpStackConfig) GenerateAPIKey() string { - return "Bearer " + c.APIKey -} - // ChirpStackTransport 是一个客户端,用于封装与 ChirpStack REST API 的交互。 type ChirpStackTransport struct { client *client.ChirpStackRESTAPI authInfo runtime.ClientAuthInfoWriter - config ChirpStackConfig + config config.ChirpStackConfig deviceCommandLogRepo repository.DeviceCommandLogRepository deviceRepo repository.DeviceRepository @@ -43,14 +29,14 @@ type ChirpStackTransport struct { // NewChirpStackTransport 创建一个新的通信实例,用于与 ChirpStack 通信。 func NewChirpStackTransport( - config ChirpStackConfig, + config config.ChirpStackConfig, logger *logs.Logger, deviceCommandLogRepo repository.DeviceCommandLogRepository, deviceRepo repository.DeviceRepository, ) *ChirpStackTransport { // 使用配置中的服务器地址创建一个 HTTP transport。 // 它会使用生成的客户端中定义的默认 base path 和 schemes。 - transport := httptransport.New(config.ServerAddress, client.DefaultBasePath, client.DefaultSchemes) + transport := httptransport.New(config.APIHost, client.DefaultBasePath, client.DefaultSchemes) // 使用 transport 和默认的字符串格式化器,创建一个 API 主客户端。 apiClient := client.New(transport, strfmt.Default) @@ -77,7 +63,7 @@ func (c *ChirpStackTransport) Send(address string, payload []byte) error { QueueItem: &device_service.DeviceServiceEnqueueParamsBodyQueueItem{ Confirmed: true, Data: payload, - FPort: c.config.Fport, + FPort: int64(c.config.FPort), }, }