实现定时采集
This commit is contained in:
		| @@ -48,8 +48,10 @@ chirp_stack: | |||||||
|   api_host: "http://localhost:8080" # ChirpStack API 主机地址 |   api_host: "http://localhost:8080" # ChirpStack API 主机地址 | ||||||
|   api_token: "your_chirpstack_api_token" # ChirpStack API Token |   api_token: "your_chirpstack_api_token" # ChirpStack API Token | ||||||
|   fport: 10 # ChirpStack FPort |   fport: 10 # ChirpStack FPort | ||||||
|   api_timeout: 5 # API 请求超时时间 (秒) |   api_timeout: 10 # ChirpStack API请求超时时间(秒) | ||||||
|   collection_request_timeout: 10 # 采集请求超时时间 (秒) |   # 等待设备上行响应的超时时间(秒)。 | ||||||
|  |   # 对于LoRaWAN这种延迟较高的网络,建议设置为5分钟 (300秒) 或更长。 | ||||||
|  |   collection_request_timeout: 300 | ||||||
|  |  | ||||||
| # 任务调度配置 | # 任务调度配置 | ||||||
| task: | task: | ||||||
| @@ -62,12 +64,28 @@ lora: | |||||||
|  |  | ||||||
| # Lora Mesh 配置 | # Lora Mesh 配置 | ||||||
| lora_mesh: | lora_mesh: | ||||||
|   uart_port: "/dev/ttyUSB0" # UART 串口路径 |   # 主节点串口 | ||||||
|   baud_rate: 115200 # 波特率 |   uart_port: "COM7" | ||||||
|   timeout: 5 # 超时时间 (秒) |   # LoRa模块的通信波特率 | ||||||
|   lora_mesh_mode: "transparent" # Lora Mesh 模式: transparent, command |   baud_rate: 9600 | ||||||
|   max_chunk_size: 200 # 最大数据块大小 |   # 等待LoRa模块AT指令响应的超时时间(ms) | ||||||
|   reassembly_timeout: 10 # 重组超时时间 (秒) |   timeout: 50 | ||||||
|  |   # LoRa Mesh 模块发送模式(EC: 透传; ED: 完整数据包) | ||||||
|  |   # e.g. | ||||||
|  |   #   EC: 接收端只会接收到消息, 不会接收到请求头 | ||||||
|  |   #       e.g. 发送: EC 05 02 01 48 65 6c 6c 6f | ||||||
|  |   #            (EC + 05(消息长度) + 0201(地址) + "Hello"(消息本体)) | ||||||
|  |   #            接收: 48 65 6c 6c 6f ("Hello") | ||||||
|  |   #   ED: 接收端会接收完整数据包,包含自定义协议头和地址信息。 | ||||||
|  |   #       e.g. 发送: ED 05 12 34 01 00 01 02 03 | ||||||
|  |   #            (ED(帧头) + 05(Length, 即 1(总包数)+1(当前包序号)+3(数据块)) + 12 34(目标地址) + 01(总包数) + 00(当前包序号) + 01 02 03(数据块)) | ||||||
|  |   #            接收: ED 05 12 34 01 00 01 02 03 56 78(56 78 是发送方地址,会自动拼接到消息末尾) | ||||||
|  |   lora_mesh_mode: "ED" | ||||||
|  |   # 单包最大用户数据数据长度, 模块限制240, 去掉两位自定义包头, 还剩238 | ||||||
|  |   max_chunk_size: 238 | ||||||
|  |   #分片重组超时时间(秒)。如果在一个分片到达后,超过这个时间 | ||||||
|  |   # 还没收到完整的包,则认为接收失败。 | ||||||
|  |   reassembly_timeout: 30 | ||||||
|  |  | ||||||
| # 通知服务配置 | # 通知服务配置 | ||||||
| notify: | notify: | ||||||
| @@ -91,3 +109,7 @@ notify: | |||||||
|     enabled: false # 是否启用飞书通知 |     enabled: false # 是否启用飞书通知 | ||||||
|     appID: "cli_xxxxxxxxxx" # 应用 ID |     appID: "cli_xxxxxxxxxx" # 应用 ID | ||||||
|     appSecret: "your_lark_app_secret" # 应用密钥 |     appSecret: "your_lark_app_secret" # 应用密钥 | ||||||
|  |  | ||||||
|  | # 定时采集配置 | ||||||
|  | collection: | ||||||
|  |   interval: 300 # 采集间隔 (秒) | ||||||
|   | |||||||
| @@ -87,3 +87,7 @@ lora_mesh: | |||||||
|   #分片重组超时时间(秒)。如果在一个分片到达后,超过这个时间 |   #分片重组超时时间(秒)。如果在一个分片到达后,超过这个时间 | ||||||
|   # 还没收到完整的包,则认为接收失败。 |   # 还没收到完整的包,则认为接收失败。 | ||||||
|   reassembly_timeout: 30 |   reassembly_timeout: 30 | ||||||
|  |  | ||||||
|  | # 定时采集配置 | ||||||
|  | collection: | ||||||
|  |   interval: 300 # 采集间隔 (秒) | ||||||
| @@ -11,6 +11,7 @@ import ( | |||||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/app/service" | 	"git.huangwc.com/pig/pig-farm-controller/internal/app/service" | ||||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/app/webhook" | 	"git.huangwc.com/pig/pig-farm-controller/internal/app/webhook" | ||||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit" | 	"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit" | ||||||
|  | 	"git.huangwc.com/pig/pig-farm-controller/internal/domain/collection" | ||||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/domain/device" | 	"git.huangwc.com/pig/pig-farm-controller/internal/domain/device" | ||||||
| 	domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" | 	domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" | ||||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/domain/pig" | 	"git.huangwc.com/pig/pig-farm-controller/internal/domain/pig" | ||||||
| @@ -32,7 +33,8 @@ type Application struct { | |||||||
| 	Logger    *logs.Logger | 	Logger    *logs.Logger | ||||||
| 	Storage   database.Storage | 	Storage   database.Storage | ||||||
| 	Executor  *task.Scheduler | 	Executor  *task.Scheduler | ||||||
| 	API      *api.API // 添加 API 对象 | 	API       *api.API | ||||||
|  | 	Collector collection.Collector | ||||||
|  |  | ||||||
| 	// 新增的仓库和管理器字段,以便在 initializePendingTasks 中访问 | 	// 新增的仓库和管理器字段,以便在 initializePendingTasks 中访问 | ||||||
| 	planRepo                repository.PlanRepository | 	planRepo                repository.PlanRepository | ||||||
| @@ -177,6 +179,14 @@ func NewApplication(configPath string) (*Application, error) { | |||||||
| 		cfg.Task.NumWorkers, | 		cfg.Task.NumWorkers, | ||||||
| 	) | 	) | ||||||
|  |  | ||||||
|  | 	// --- 初始化定时采集器 --- | ||||||
|  | 	timedCollector := collection.NewTimedCollector( | ||||||
|  | 		deviceRepo, | ||||||
|  | 		generalDeviceService, | ||||||
|  | 		logger, | ||||||
|  | 		time.Duration(cfg.Collection.Interval)*time.Second, | ||||||
|  | 	) | ||||||
|  |  | ||||||
| 	//  初始化 API 服务器 | 	//  初始化 API 服务器 | ||||||
| 	apiServer := api.NewAPI( | 	apiServer := api.NewAPI( | ||||||
| 		cfg.Server, | 		cfg.Server, | ||||||
| @@ -204,6 +214,7 @@ func NewApplication(configPath string) (*Application, error) { | |||||||
| 		Storage:                 storage, | 		Storage:                 storage, | ||||||
| 		Executor:                executor, | 		Executor:                executor, | ||||||
| 		API:                     apiServer, | 		API:                     apiServer, | ||||||
|  | 		Collector:               timedCollector, | ||||||
| 		planRepo:                planRepo, | 		planRepo:                planRepo, | ||||||
| 		pendingTaskRepo:         pendingTaskRepo, | 		pendingTaskRepo:         pendingTaskRepo, | ||||||
| 		executionLogRepo:        executionLogRepo, | 		executionLogRepo:        executionLogRepo, | ||||||
| @@ -327,6 +338,9 @@ func (app *Application) Start() error { | |||||||
| 	// 启动任务执行器 | 	// 启动任务执行器 | ||||||
| 	app.Executor.Start() | 	app.Executor.Start() | ||||||
|  |  | ||||||
|  | 	// 启动定时采集器 | ||||||
|  | 	app.Collector.Start() | ||||||
|  |  | ||||||
| 	// 启动 API 服务器 | 	// 启动 API 服务器 | ||||||
| 	app.API.Start() | 	app.API.Start() | ||||||
|  |  | ||||||
| @@ -349,6 +363,9 @@ func (app *Application) Stop() error { | |||||||
| 	// 关闭任务执行器 | 	// 关闭任务执行器 | ||||||
| 	app.Executor.Stop() | 	app.Executor.Stop() | ||||||
|  |  | ||||||
|  | 	// 关闭定时采集器 | ||||||
|  | 	app.Collector.Stop() | ||||||
|  |  | ||||||
| 	// 断开数据库连接 | 	// 断开数据库连接 | ||||||
| 	if err := app.Storage.Disconnect(); err != nil { | 	if err := app.Storage.Disconnect(); err != nil { | ||||||
| 		app.Logger.Errorw("数据库连接断开失败", "error", err) | 		app.Logger.Errorw("数据库连接断开失败", "error", err) | ||||||
|   | |||||||
							
								
								
									
										6
									
								
								internal/domain/collection/collector.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										6
									
								
								internal/domain/collection/collector.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,6 @@ | |||||||
|  | package collection | ||||||
|  |  | ||||||
|  | type Collector interface { | ||||||
|  | 	Start() | ||||||
|  | 	Stop() | ||||||
|  | } | ||||||
							
								
								
									
										89
									
								
								internal/domain/collection/timed_collector.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										89
									
								
								internal/domain/collection/timed_collector.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,89 @@ | |||||||
|  | package collection | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	"git.huangwc.com/pig/pig-farm-controller/internal/domain/device" | ||||||
|  | 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" | ||||||
|  | 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" | ||||||
|  | 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // TimedCollector 实现了 Collector 接口,用于定时从数据库获取设备信息并下发采集指令 | ||||||
|  | type TimedCollector struct { | ||||||
|  | 	deviceRepo    repository.DeviceRepository | ||||||
|  | 	deviceService device.Service | ||||||
|  | 	logger        *logs.Logger | ||||||
|  | 	interval      time.Duration | ||||||
|  | 	ticker        *time.Ticker | ||||||
|  | 	done          chan bool | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // NewTimedCollector 创建一个定时采集器实例 | ||||||
|  | func NewTimedCollector( | ||||||
|  | 	deviceRepo repository.DeviceRepository, | ||||||
|  | 	deviceService device.Service, | ||||||
|  | 	logger *logs.Logger, | ||||||
|  | 	interval time.Duration, | ||||||
|  | ) Collector { | ||||||
|  | 	return &TimedCollector{ | ||||||
|  | 		deviceRepo:    deviceRepo, | ||||||
|  | 		deviceService: deviceService, | ||||||
|  | 		logger:        logger, | ||||||
|  | 		interval:      interval, | ||||||
|  | 		done:          make(chan bool), | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Start 开始定时采集 | ||||||
|  | func (c *TimedCollector) Start() { | ||||||
|  | 	c.logger.Infof("定时采集器启动,采集间隔: %s", c.interval) | ||||||
|  | 	c.ticker = time.NewTicker(c.interval) | ||||||
|  | 	go func() { | ||||||
|  | 		for { | ||||||
|  | 			select { | ||||||
|  | 			case <-c.done: | ||||||
|  | 				return | ||||||
|  | 			case <-c.ticker.C: | ||||||
|  | 				c.collect() | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Stop 停止定时采集 | ||||||
|  | func (c *TimedCollector) Stop() { | ||||||
|  | 	c.logger.Info("定时采集器停止") | ||||||
|  | 	c.ticker.Stop() | ||||||
|  | 	c.done <- true | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // collect 是核心的采集逻辑 | ||||||
|  | func (c *TimedCollector) collect() { | ||||||
|  | 	c.logger.Info("开始新一轮的设备数据采集") | ||||||
|  |  | ||||||
|  | 	sensors, err := c.deviceRepo.ListAllSensors() | ||||||
|  | 	if err != nil { | ||||||
|  | 		c.logger.Errorf("采集周期: 从数据库获取所有传感器失败: %v", err) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if len(sensors) == 0 { | ||||||
|  | 		c.logger.Info("采集周期: 未发现任何传感器设备,跳过本次采集") | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	sensorsByController := make(map[uint][]*models.Device) | ||||||
|  | 	for _, sensor := range sensors { | ||||||
|  | 		sensorsByController[sensor.AreaControllerID] = append(sensorsByController[sensor.AreaControllerID], sensor) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	for controllerID, controllerSensors := range sensorsByController { | ||||||
|  | 		c.logger.Infof("采集周期: 准备为区域主控 %d 下的 %d 个传感器下发采集指令", controllerID, len(controllerSensors)) | ||||||
|  | 		if err := c.deviceService.Collect(controllerID, controllerSensors); err != nil { | ||||||
|  | 			c.logger.Errorf("采集周期: 为区域主控 %d 下发采集指令失败: %v", controllerID, err) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	c.logger.Info("本轮设备数据采集完成") | ||||||
|  | } | ||||||
| @@ -44,6 +44,9 @@ type Config struct { | |||||||
|  |  | ||||||
| 	// Notify 通知服务配置 | 	// Notify 通知服务配置 | ||||||
| 	Notify NotifyConfig `yaml:"notify"` | 	Notify NotifyConfig `yaml:"notify"` | ||||||
|  |  | ||||||
|  | 	// Collection 定时采集配置 | ||||||
|  | 	Collection CollectionConfig `yaml:"collection"` | ||||||
| } | } | ||||||
|  |  | ||||||
| // AppConfig 代表应用基础配置 | // AppConfig 代表应用基础配置 | ||||||
| @@ -195,6 +198,11 @@ type LarkConfig struct { | |||||||
| 	AppSecret string `yaml:"appSecret"` | 	AppSecret string `yaml:"appSecret"` | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // CollectionConfig 代表定时采集配置 | ||||||
|  | type CollectionConfig struct { | ||||||
|  | 	Interval int `yaml:"interval"` | ||||||
|  | } | ||||||
|  |  | ||||||
| // NewConfig 创建并返回一个新的配置实例 | // NewConfig 创建并返回一个新的配置实例 | ||||||
| func NewConfig() *Config { | func NewConfig() *Config { | ||||||
| 	// 默认值可以在这里设置,但我们优先使用配置文件中的值 | 	// 默认值可以在这里设置,但我们优先使用配置文件中的值 | ||||||
|   | |||||||
| @@ -23,6 +23,9 @@ type DeviceRepository interface { | |||||||
| 	// ListAll 获取所有设备的列表 | 	// ListAll 获取所有设备的列表 | ||||||
| 	ListAll() ([]*models.Device, error) | 	ListAll() ([]*models.Device, error) | ||||||
|  |  | ||||||
|  | 	// ListAllSensors 获取所有传感器类型的设备列表 | ||||||
|  | 	ListAllSensors() ([]*models.Device, error) | ||||||
|  |  | ||||||
| 	// ListByAreaControllerID 根据区域主控 ID 列出所有子设备。 | 	// ListByAreaControllerID 根据区域主控 ID 列出所有子设备。 | ||||||
| 	ListByAreaControllerID(areaControllerID uint) ([]*models.Device, error) | 	ListByAreaControllerID(areaControllerID uint) ([]*models.Device, error) | ||||||
|  |  | ||||||
| @@ -84,6 +87,19 @@ func (r *gormDeviceRepository) ListAll() ([]*models.Device, error) { | |||||||
| 	return devices, nil | 	return devices, nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // ListAllSensors 检索归类为传感器的所有设备 | ||||||
|  | func (r *gormDeviceRepository) ListAllSensors() ([]*models.Device, error) { | ||||||
|  | 	var sensors []*models.Device | ||||||
|  | 	err := r.db.Preload("AreaController").Preload("DeviceTemplate"). | ||||||
|  | 		Joins("JOIN device_templates ON device_templates.id = devices.device_template_id"). | ||||||
|  | 		Where("device_templates.category = ?", models.CategorySensor). | ||||||
|  | 		Find(&sensors).Error | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, fmt.Errorf("查询所有传感器失败: %w", err) | ||||||
|  | 	} | ||||||
|  | 	return sensors, nil | ||||||
|  | } | ||||||
|  |  | ||||||
| // ListByAreaControllerID 根据区域主控 ID 列出所有子设备 | // ListByAreaControllerID 根据区域主控 ID 列出所有子设备 | ||||||
| func (r *gormDeviceRepository) ListByAreaControllerID(areaControllerID uint) ([]*models.Device, error) { | func (r *gormDeviceRepository) ListByAreaControllerID(areaControllerID uint) ([]*models.Device, error) { | ||||||
| 	var devices []*models.Device | 	var devices []*models.Device | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user