From 6a8e8f1f7dd8fd13d7b48ada9f784005134bc6f9 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Sun, 26 Oct 2025 15:10:38 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=AE=9A=E6=97=B6=E9=87=87?= =?UTF-8?q?=E9=9B=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.example.yml | 38 ++++++-- config.yml | 6 +- internal/core/application.go | 27 ++++-- internal/domain/collection/collector.go | 6 ++ internal/domain/collection/timed_collector.go | 89 +++++++++++++++++++ internal/infra/config/config.go | 8 ++ .../infra/repository/device_repository.go | 16 ++++ 7 files changed, 176 insertions(+), 14 deletions(-) create mode 100644 internal/domain/collection/collector.go create mode 100644 internal/domain/collection/timed_collector.go diff --git a/config.example.yml b/config.example.yml index 455fd53..ae486b5 100644 --- a/config.example.yml +++ b/config.example.yml @@ -48,8 +48,10 @@ chirp_stack: api_host: "http://localhost:8080" # ChirpStack API 主机地址 api_token: "your_chirpstack_api_token" # ChirpStack API Token fport: 10 # ChirpStack FPort - api_timeout: 5 # API 请求超时时间 (秒) - collection_request_timeout: 10 # 采集请求超时时间 (秒) + api_timeout: 10 # ChirpStack API请求超时时间(秒) + # 等待设备上行响应的超时时间(秒)。 + # 对于LoRaWAN这种延迟较高的网络,建议设置为5分钟 (300秒) 或更长。 + collection_request_timeout: 300 # 任务调度配置 task: @@ -62,12 +64,28 @@ lora: # Lora Mesh 配置 lora_mesh: - uart_port: "/dev/ttyUSB0" # UART 串口路径 - baud_rate: 115200 # 波特率 - timeout: 5 # 超时时间 (秒) - lora_mesh_mode: "transparent" # Lora Mesh 模式: transparent, command - max_chunk_size: 200 # 最大数据块大小 - reassembly_timeout: 10 # 重组超时时间 (秒) + # 主节点串口 + uart_port: "COM7" + # LoRa模块的通信波特率 + baud_rate: 9600 + # 等待LoRa模块AT指令响应的超时时间(ms) + timeout: 50 + # LoRa Mesh 模块发送模式(EC: 透传; ED: 完整数据包) + # e.g. + # EC: 接收端只会接收到消息, 不会接收到请求头 + # e.g. 发送: EC 05 02 01 48 65 6c 6c 6f + # (EC + 05(消息长度) + 0201(地址) + "Hello"(消息本体)) + # 接收: 48 65 6c 6c 6f ("Hello") + # ED: 接收端会接收完整数据包,包含自定义协议头和地址信息。 + # e.g. 发送: ED 05 12 34 01 00 01 02 03 + # (ED(帧头) + 05(Length, 即 1(总包数)+1(当前包序号)+3(数据块)) + 12 34(目标地址) + 01(总包数) + 00(当前包序号) + 01 02 03(数据块)) + # 接收: ED 05 12 34 01 00 01 02 03 56 78(56 78 是发送方地址,会自动拼接到消息末尾) + lora_mesh_mode: "ED" + # 单包最大用户数据数据长度, 模块限制240, 去掉两位自定义包头, 还剩238 + max_chunk_size: 238 + #分片重组超时时间(秒)。如果在一个分片到达后,超过这个时间 + # 还没收到完整的包,则认为接收失败。 + reassembly_timeout: 30 # 通知服务配置 notify: @@ -91,3 +109,7 @@ notify: enabled: false # 是否启用飞书通知 appID: "cli_xxxxxxxxxx" # 应用 ID appSecret: "your_lark_app_secret" # 应用密钥 + +# 定时采集配置 +collection: + interval: 300 # 采集间隔 (秒) diff --git a/config.yml b/config.yml index ea5b45c..ace399d 100644 --- a/config.yml +++ b/config.yml @@ -86,4 +86,8 @@ lora_mesh: max_chunk_size: 238 #分片重组超时时间(秒)。如果在一个分片到达后,超过这个时间 # 还没收到完整的包,则认为接收失败。 - reassembly_timeout: 30 \ No newline at end of file + reassembly_timeout: 30 + +# 定时采集配置 +collection: + interval: 300 # 采集间隔 (秒) \ No newline at end of file diff --git a/internal/core/application.go b/internal/core/application.go index 700b01a..454beee 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -11,6 +11,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/app/service" "git.huangwc.com/pig/pig-farm-controller/internal/app/webhook" "git.huangwc.com/pig/pig-farm-controller/internal/domain/audit" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/collection" "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" "git.huangwc.com/pig/pig-farm-controller/internal/domain/pig" @@ -28,11 +29,12 @@ import ( // Application 是整个应用的核心,封装了所有组件和生命周期。 type Application struct { - Config *config.Config - Logger *logs.Logger - Storage database.Storage - Executor *task.Scheduler - API *api.API // 添加 API 对象 + Config *config.Config + Logger *logs.Logger + Storage database.Storage + Executor *task.Scheduler + API *api.API + Collector collection.Collector // 新增的仓库和管理器字段,以便在 initializePendingTasks 中访问 planRepo repository.PlanRepository @@ -177,6 +179,14 @@ func NewApplication(configPath string) (*Application, error) { cfg.Task.NumWorkers, ) + // --- 初始化定时采集器 --- + timedCollector := collection.NewTimedCollector( + deviceRepo, + generalDeviceService, + logger, + time.Duration(cfg.Collection.Interval)*time.Second, + ) + // 初始化 API 服务器 apiServer := api.NewAPI( cfg.Server, @@ -204,6 +214,7 @@ func NewApplication(configPath string) (*Application, error) { Storage: storage, Executor: executor, API: apiServer, + Collector: timedCollector, planRepo: planRepo, pendingTaskRepo: pendingTaskRepo, executionLogRepo: executionLogRepo, @@ -327,6 +338,9 @@ func (app *Application) Start() error { // 启动任务执行器 app.Executor.Start() + // 启动定时采集器 + app.Collector.Start() + // 启动 API 服务器 app.API.Start() @@ -349,6 +363,9 @@ func (app *Application) Stop() error { // 关闭任务执行器 app.Executor.Stop() + // 关闭定时采集器 + app.Collector.Stop() + // 断开数据库连接 if err := app.Storage.Disconnect(); err != nil { app.Logger.Errorw("数据库连接断开失败", "error", err) diff --git a/internal/domain/collection/collector.go b/internal/domain/collection/collector.go new file mode 100644 index 0000000..ae28dd7 --- /dev/null +++ b/internal/domain/collection/collector.go @@ -0,0 +1,6 @@ +package collection + +type Collector interface { + Start() + Stop() +} diff --git a/internal/domain/collection/timed_collector.go b/internal/domain/collection/timed_collector.go new file mode 100644 index 0000000..f6855ad --- /dev/null +++ b/internal/domain/collection/timed_collector.go @@ -0,0 +1,89 @@ +package collection + +import ( + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" +) + +// TimedCollector 实现了 Collector 接口,用于定时从数据库获取设备信息并下发采集指令 +type TimedCollector struct { + deviceRepo repository.DeviceRepository + deviceService device.Service + logger *logs.Logger + interval time.Duration + ticker *time.Ticker + done chan bool +} + +// NewTimedCollector 创建一个定时采集器实例 +func NewTimedCollector( + deviceRepo repository.DeviceRepository, + deviceService device.Service, + logger *logs.Logger, + interval time.Duration, +) Collector { + return &TimedCollector{ + deviceRepo: deviceRepo, + deviceService: deviceService, + logger: logger, + interval: interval, + done: make(chan bool), + } +} + +// Start 开始定时采集 +func (c *TimedCollector) Start() { + c.logger.Infof("定时采集器启动,采集间隔: %s", c.interval) + c.ticker = time.NewTicker(c.interval) + go func() { + for { + select { + case <-c.done: + return + case <-c.ticker.C: + c.collect() + } + } + }() +} + +// Stop 停止定时采集 +func (c *TimedCollector) Stop() { + c.logger.Info("定时采集器停止") + c.ticker.Stop() + c.done <- true +} + +// collect 是核心的采集逻辑 +func (c *TimedCollector) collect() { + c.logger.Info("开始新一轮的设备数据采集") + + sensors, err := c.deviceRepo.ListAllSensors() + if err != nil { + c.logger.Errorf("采集周期: 从数据库获取所有传感器失败: %v", err) + return + } + + if len(sensors) == 0 { + c.logger.Info("采集周期: 未发现任何传感器设备,跳过本次采集") + return + } + + sensorsByController := make(map[uint][]*models.Device) + for _, sensor := range sensors { + sensorsByController[sensor.AreaControllerID] = append(sensorsByController[sensor.AreaControllerID], sensor) + } + + for controllerID, controllerSensors := range sensorsByController { + c.logger.Infof("采集周期: 准备为区域主控 %d 下的 %d 个传感器下发采集指令", controllerID, len(controllerSensors)) + if err := c.deviceService.Collect(controllerID, controllerSensors); err != nil { + c.logger.Errorf("采集周期: 为区域主控 %d 下发采集指令失败: %v", controllerID, err) + } + } + + c.logger.Info("本轮设备数据采集完成") +} diff --git a/internal/infra/config/config.go b/internal/infra/config/config.go index 1a1cfb4..b22f97b 100644 --- a/internal/infra/config/config.go +++ b/internal/infra/config/config.go @@ -44,6 +44,9 @@ type Config struct { // Notify 通知服务配置 Notify NotifyConfig `yaml:"notify"` + + // Collection 定时采集配置 + Collection CollectionConfig `yaml:"collection"` } // AppConfig 代表应用基础配置 @@ -195,6 +198,11 @@ type LarkConfig struct { AppSecret string `yaml:"appSecret"` } +// CollectionConfig 代表定时采集配置 +type CollectionConfig struct { + Interval int `yaml:"interval"` +} + // NewConfig 创建并返回一个新的配置实例 func NewConfig() *Config { // 默认值可以在这里设置,但我们优先使用配置文件中的值 diff --git a/internal/infra/repository/device_repository.go b/internal/infra/repository/device_repository.go index c22a584..96104cb 100644 --- a/internal/infra/repository/device_repository.go +++ b/internal/infra/repository/device_repository.go @@ -23,6 +23,9 @@ type DeviceRepository interface { // ListAll 获取所有设备的列表 ListAll() ([]*models.Device, error) + // ListAllSensors 获取所有传感器类型的设备列表 + ListAllSensors() ([]*models.Device, error) + // ListByAreaControllerID 根据区域主控 ID 列出所有子设备。 ListByAreaControllerID(areaControllerID uint) ([]*models.Device, error) @@ -84,6 +87,19 @@ func (r *gormDeviceRepository) ListAll() ([]*models.Device, error) { return devices, nil } +// ListAllSensors 检索归类为传感器的所有设备 +func (r *gormDeviceRepository) ListAllSensors() ([]*models.Device, error) { + var sensors []*models.Device + err := r.db.Preload("AreaController").Preload("DeviceTemplate"). + Joins("JOIN device_templates ON device_templates.id = devices.device_template_id"). + Where("device_templates.category = ?", models.CategorySensor). + Find(&sensors).Error + if err != nil { + return nil, fmt.Errorf("查询所有传感器失败: %w", err) + } + return sensors, nil +} + // ListByAreaControllerID 根据区域主控 ID 列出所有子设备 func (r *gormDeviceRepository) ListByAreaControllerID(areaControllerID uint) ([]*models.Device, error) { var devices []*models.Device