Merge pull request 'issue_16' (#17) from issue_16 into main
Reviewed-on: #17
This commit is contained in:
		| @@ -48,6 +48,7 @@ heartbeat: | ||||
| chirp_stack: | ||||
|   api_host: "http://192.168.5.16:8090" # ChirpStack API服务器地址 | ||||
|   api_token: "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJjaGlycHN0YWNrIiwiaXNzIjoiY2hpcnBzdGFjayIsInN1YiI6IjU2ZWRhNWQ3LTM4NzgtNDAwMC05MWMzLWYwZDk3M2YwODhjNiIsInR5cCI6ImtleSJ9.NxBxTrhPAnezKMqAYZR_Uq2mGQjJRlmVzg1ZDFCyaHQ" # ChirpStack API密钥, 请求头中需要设置 Grpc-Metadata-Authorization: Bearer <YOUR_API_TOKEN> | ||||
|   fport: 1 | ||||
|   api_timeout: 10 # ChirpStack API请求超时时间(秒) | ||||
|  | ||||
| # 任务调度器配置 | ||||
|   | ||||
| @@ -17,29 +17,23 @@ type GeneralDeviceService struct { | ||||
| 	deviceRepo repository.DeviceRepository | ||||
| 	logger     *logs.Logger | ||||
|  | ||||
| 	deviceID uint // 区域主控的设备ID | ||||
|  | ||||
| 	// regionalController 是执行命令的区域主控, 所有的指令都会发往区域主控 | ||||
| 	regionalController *models.Device | ||||
|  | ||||
| 	comm transport.Communicator | ||||
| } | ||||
|  | ||||
| // NewGeneralDeviceService 创建一个通用设备服务 | ||||
| func NewGeneralDeviceService(deviceID uint, deviceRepo repository.DeviceRepository, logger *logs.Logger, comm transport.Communicator) *GeneralDeviceService { | ||||
| func NewGeneralDeviceService(deviceRepo repository.DeviceRepository, logger *logs.Logger, comm transport.Communicator) *GeneralDeviceService { | ||||
| 	return &GeneralDeviceService{ | ||||
| 		deviceID:   deviceID, | ||||
| 		deviceRepo: deviceRepo, | ||||
| 		logger:     logger, | ||||
| 		comm:       comm, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (g *GeneralDeviceService) Switch(device models.Device, action DeviceAction) error { | ||||
| func (g *GeneralDeviceService) Switch(device *models.Device, action DeviceAction) error { | ||||
|  | ||||
| 	// 校验设备参数及生成指令 | ||||
| 	if *device.ParentID != g.deviceID { | ||||
| 		return fmt.Errorf("设备 %v(id=%v) 的上级区域主控是(id=%v), 不是当前区域主控(id=%v)下属设备, 无法执行指令", device.Name, device.ID, device.ParentID, g.deviceID) | ||||
| 	if *device.ParentID == 0 { | ||||
| 		return fmt.Errorf("设备 %v(id=%v) 的上级区域主控(id=%v) ID不合理, 无法执行指令", device.Name, device.ID, *device.ParentID) | ||||
| 	} | ||||
|  | ||||
| 	if !device.SelfCheck() { | ||||
| @@ -80,9 +74,9 @@ func (g *GeneralDeviceService) Switch(device models.Device, action DeviceAction) | ||||
| 	} | ||||
|  | ||||
| 	// 获取自身LoRa设备ID, 因为可能变更, 所以每次都现获取 | ||||
| 	thisDevice, err := g.deviceRepo.FindByID(g.deviceID) | ||||
| 	thisDevice, err := g.deviceRepo.FindByID(*device.ParentID) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("获取区域主控(id=%v)信息失败: %v", g.deviceID, err) | ||||
| 		return fmt.Errorf("获取区域主控(id=%v)信息失败: %v", *device.ParentID, err) | ||||
| 	} | ||||
| 	if !thisDevice.SelfCheck() { | ||||
| 		return fmt.Errorf("区域主控 %v(id=%v) 缺少必要信息, 无法发送指令", thisDevice.Name, thisDevice.ID) | ||||
|   | ||||
| @@ -9,9 +9,9 @@ import ( | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	ParamsDelayDuration = "delay_duration" | ||||
| ) | ||||
| type DelayTaskParams struct { | ||||
| 	DelayDuration float64 `json:"delay_duration"` | ||||
| } | ||||
|  | ||||
| // DelayTask 是一个用于模拟延迟的 Task 实现 | ||||
| type DelayTask struct { | ||||
| @@ -20,36 +20,45 @@ type DelayTask struct { | ||||
| 	logger        *logs.Logger | ||||
| } | ||||
|  | ||||
| func NewDelayTask(logger *logs.Logger, executionTask *models.TaskExecutionLog) Task { | ||||
| 	return &DelayTask{ | ||||
| 		executionTask: executionTask, | ||||
| 		logger:        logger, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Execute 执行延迟任务,等待指定的时间 | ||||
| func (d *DelayTask) Execute() error { | ||||
| 	if err := d.parseParameters(); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	d.logger.Infof("任务 %v: 开始延迟 %v...", d.executionTask.TaskID, d.duration) | ||||
| 	time.Sleep(d.duration) | ||||
| 	d.logger.Infof("任务 %v: 延迟结束。", d.executionTask.TaskID) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (d *DelayTask) ParseParams(logger *logs.Logger, executionTask *models.TaskExecutionLog) error { | ||||
| 	d.logger = logger | ||||
| 	d.executionTask = executionTask | ||||
|  | ||||
| func (d *DelayTask) parseParameters() error { | ||||
| 	if d.executionTask.Task.Parameters == nil { | ||||
| 		d.logger.Errorf("任务 %v: 缺少参数", d.executionTask.TaskID) | ||||
| 		return fmt.Errorf("任务 %v: 参数不全", d.executionTask.TaskID) | ||||
| 	} | ||||
|  | ||||
| 	var params map[string]interface{} | ||||
| 	if err := json.Unmarshal(d.executionTask.Task.Parameters, ¶ms); err != nil { | ||||
| 	var params DelayTaskParams | ||||
| 	err := json.Unmarshal(d.executionTask.Task.Parameters, ¶ms) | ||||
| 	if err != nil { | ||||
| 		d.logger.Errorf("任务 %v: 解析参数失败: %v", d.executionTask.TaskID, err) | ||||
| 		return fmt.Errorf("任务 %v: 解析参数失败: %v", d.executionTask.TaskID, err) | ||||
| 	} | ||||
|  | ||||
| 	duration, ok := params[ParamsDelayDuration].(float64) | ||||
| 	if !ok { | ||||
| 		d.logger.Errorf("任务 %v: 参数 %v 不是数字或不存在", d.executionTask.TaskID, ParamsDelayDuration) | ||||
| 		return fmt.Errorf("任务 %v: 参数 %v 不是数字或不存在", d.executionTask.TaskID, ParamsDelayDuration) | ||||
| 	if params.DelayDuration <= 0 { | ||||
| 		d.logger.Errorf("任务 %v: 参数 delay_duration 缺失或无效 (必须大于0)", d.executionTask.TaskID) | ||||
| 		return fmt.Errorf("任务 %v: 参数 delay_duration 缺失或无效 (必须大于0)", d.executionTask.TaskID) | ||||
| 	} | ||||
|  | ||||
| 	d.duration = time.Duration(duration) * time.Second | ||||
| 	d.duration = time.Duration(params.DelayDuration) * time.Second | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
|   | ||||
							
								
								
									
										164
									
								
								internal/app/service/task/release_feed_weight_task.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										164
									
								
								internal/app/service/task/release_feed_weight_task.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,164 @@ | ||||
| package task | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"time" | ||||
|  | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/app/service/device" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" | ||||
| ) | ||||
|  | ||||
| // ReleaseFeedWeightTaskParams 定义了 ReleaseFeedWeightTask 的参数结构 | ||||
| type ReleaseFeedWeightTaskParams struct { | ||||
| 	ReleaseWeight      float64 `json:"release_weight"`        // 需要释放的重量 | ||||
| 	FeedPortDeviceID   uint    `json:"feed_port_device_id"`   // 下料口ID | ||||
| 	MixingTankDeviceID uint    `json:"mixing_tank_device_id"` // 称重传感器ID | ||||
| } | ||||
|  | ||||
| // ReleaseFeedWeightTask 是一个控制下料口释放指定重量的任务 | ||||
| type ReleaseFeedWeightTask struct { | ||||
| 	deviceRepo     repository.DeviceRepository | ||||
| 	sensorDataRepo repository.SensorDataRepository | ||||
| 	claimedLog     *models.TaskExecutionLog | ||||
|  | ||||
| 	feedPortDevice     *models.Device // 下料口基本信息 | ||||
| 	releaseWeight      float64        // 需要释放的重量 | ||||
| 	mixingTankDeviceID uint           // 搅拌罐称重传感器ID | ||||
|  | ||||
| 	comm     transport.Communicator | ||||
| 	feedPort *device.GeneralDeviceService // 下料口指令下发器 | ||||
|  | ||||
| 	logger *logs.Logger | ||||
| } | ||||
|  | ||||
| // NewReleaseFeedWeightTask 创建一个新的 ReleaseFeedWeightTask 实例 | ||||
| func NewReleaseFeedWeightTask(claimedLog *models.TaskExecutionLog, deviceRepo repository.DeviceRepository, sensorDataRepo repository.SensorDataRepository, comm transport.Communicator, logger *logs.Logger) Task { | ||||
| 	return &ReleaseFeedWeightTask{ | ||||
| 		claimedLog:     claimedLog, | ||||
| 		deviceRepo:     deviceRepo, | ||||
| 		sensorDataRepo: sensorDataRepo, | ||||
| 		comm:           comm, | ||||
| 		logger:         logger, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (r *ReleaseFeedWeightTask) Execute() error { | ||||
| 	r.logger.Infof("任务 %v: 开始执行, 日志ID: %v", r.claimedLog.TaskID, r.claimedLog.ID) | ||||
| 	if err := r.parseParameters(); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	weight, err := r.getNowWeight() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if err = r.feedPort.Switch(r.feedPortDevice, device.DeviceActionStart); err != nil { | ||||
| 		r.logger.Errorf("启动下料口(id=%v)失败: %v , 日志ID: %v", r.feedPortDevice.ID, err, r.claimedLog.ID) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	targetWeight := weight - r.releaseWeight | ||||
| 	errCount := 1 | ||||
|  | ||||
| 	// TODO 这个判断有延迟, 尤其是LoRa通信本身延迟较高, 可以考虑根据信号质量或其他指标提前发送停止命令 | ||||
| 	for targetWeight <= weight { | ||||
| 		weight, err = r.getNowWeight() | ||||
| 		if err != nil { | ||||
| 			errCount++ | ||||
| 			if errCount > 3 { // 如果连续三次没成功采集到重量数据,则认为计划执行失败 | ||||
| 				r.logger.Errorf("获取当前计划执行日志(id=%v)的当前搅拌罐重量失败: %v, 任务结束", r.claimedLog.ID, err) | ||||
| 				return err | ||||
| 			} | ||||
| 			r.logger.Warnf("第%v次尝试获取当前计划执行日志(id=%v)的当前搅拌罐重量失败: %v", errCount, r.claimedLog.ID, err) | ||||
| 			continue | ||||
| 		} | ||||
| 		time.Sleep(100 * time.Millisecond) | ||||
| 	} | ||||
|  | ||||
| 	if err = r.feedPort.Switch(r.feedPortDevice, device.DeviceActionStop); err != nil { | ||||
| 		r.logger.Errorf("关闭下料口(id=%v)失败: %v , 日志ID: %v", r.feedPortDevice.ID, err, r.claimedLog.ID) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	r.logger.Infof("完成计划执行日志(id=%v)的当前计划, 完成下料 %vkg, 搅拌罐剩余重量 %vkg", r.claimedLog.ID, r.releaseWeight, weight) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // 获取当前搅拌罐重量 | ||||
| func (r *ReleaseFeedWeightTask) getNowWeight() (float64, error) { | ||||
| 	sensorData, err := r.sensorDataRepo.GetLatestSensorDataByDeviceIDAndSensorType(r.mixingTankDeviceID, models.SensorDataTypeWeight) | ||||
| 	if err != nil { | ||||
| 		r.logger.Errorf("获取设备 %v 最新传感器数据失败: %v , 日志ID: %v", r.mixingTankDeviceID, err, r.claimedLog.ID) | ||||
| 		return 0, err | ||||
| 	} | ||||
|  | ||||
| 	if sensorData == nil { | ||||
| 		return 0, fmt.Errorf("未找到设备 %v 的最新重量传感器数据", r.mixingTankDeviceID) | ||||
| 	} | ||||
|  | ||||
| 	wg := &models.WeightData{} | ||||
| 	err = json.Unmarshal(sensorData.Data, wg) | ||||
| 	if err != nil { | ||||
| 		r.logger.Errorf("反序列化设备 %v 最新传感器数据失败: %v , 日志ID: %v", r.mixingTankDeviceID, err, r.claimedLog.ID) | ||||
| 		return 0, err | ||||
| 	} | ||||
|  | ||||
| 	return wg.WeightKilograms, nil | ||||
| } | ||||
|  | ||||
| func (r *ReleaseFeedWeightTask) parseParameters() error { | ||||
| 	if r.claimedLog.Task.Parameters == nil { | ||||
| 		r.logger.Errorf("任务 %v: 缺少参数", r.claimedLog.TaskID) | ||||
| 		return fmt.Errorf("任务 %v: 参数不全", r.claimedLog.TaskID) | ||||
| 	} | ||||
|  | ||||
| 	var params ReleaseFeedWeightTaskParams | ||||
| 	err := json.Unmarshal(r.claimedLog.Task.Parameters, ¶ms) | ||||
| 	if err != nil { | ||||
| 		r.logger.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err) | ||||
| 		return fmt.Errorf("任务 %v: 解析参数失败: %v", r.claimedLog.TaskID, err) | ||||
| 	} | ||||
|  | ||||
| 	// 校验参数是否存在 | ||||
| 	if params.ReleaseWeight == 0 { | ||||
| 		r.logger.Errorf("任务 %v: 参数 release_weight 缺失或无效", r.claimedLog.TaskID) | ||||
| 		return fmt.Errorf("任务 %v: 参数 release_weight 缺失或无效", r.claimedLog.TaskID) | ||||
| 	} | ||||
| 	if params.FeedPortDeviceID == 0 { | ||||
| 		r.logger.Errorf("任务 %v: 参数 feed_port_device_id 缺失或无效", r.claimedLog.TaskID) | ||||
| 		return fmt.Errorf("任务 %v: 参数 feed_port_device_id 缺失或无效", r.claimedLog.TaskID) | ||||
| 	} | ||||
| 	if params.MixingTankDeviceID == 0 { | ||||
| 		r.logger.Errorf("任务 %v: 参数 mixing_tank_device_id 缺失或无效", r.claimedLog.TaskID) | ||||
| 		return fmt.Errorf("任务 %v: 参数 mixing_tank_device_id 缺失或无效", r.claimedLog.TaskID) | ||||
| 	} | ||||
|  | ||||
| 	r.releaseWeight = params.ReleaseWeight | ||||
| 	r.mixingTankDeviceID = params.MixingTankDeviceID | ||||
| 	r.feedPort = device.NewGeneralDeviceService(r.deviceRepo, r.logger, r.comm) | ||||
| 	r.feedPortDevice, err = r.deviceRepo.FindByID(params.FeedPortDeviceID) | ||||
| 	if err != nil { | ||||
| 		r.logger.Errorf("任务 %v: 获取设备信息失败: %v", r.claimedLog.TaskID, err) | ||||
| 		return fmt.Errorf("任务 %v: 获取设备信息失败: %v", r.claimedLog.TaskID, err) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (r *ReleaseFeedWeightTask) OnFailure(executeErr error) { | ||||
| 	r.logger.Errorf("开始善后处理, 日志ID:%v", r.claimedLog.ID) | ||||
| 	if r.feedPort != nil { | ||||
| 		err := r.feedPort.Switch(r.feedPortDevice, device.DeviceActionStop) | ||||
| 		if err != nil { | ||||
| 			r.logger.Errorf("[严重] 下料口停止失败, 日志ID: %v, 错误: %v", r.claimedLog.ID, err) | ||||
| 		} | ||||
| 	} else { | ||||
| 		r.logger.Warnf("[警告] 下料口通信器尚未初始化, 不进行任何操作, 日志ID: %v", r.claimedLog.ID) | ||||
| 	} | ||||
| 	r.logger.Errorf("善后处理完成, 日志ID:%v", r.claimedLog.ID) | ||||
| } | ||||
| @@ -9,6 +9,7 @@ import ( | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" | ||||
| 	"github.com/panjf2000/ants/v2" | ||||
| 	"gorm.io/gorm" | ||||
| ) | ||||
| @@ -80,10 +81,12 @@ type Scheduler struct { | ||||
| 	workers                 int | ||||
| 	pendingTaskRepo         repository.PendingTaskRepository | ||||
| 	executionLogRepo        repository.ExecutionLogRepository | ||||
| 	deviceRepo              repository.DeviceRepository | ||||
| 	sensorDataRepo          repository.SensorDataRepository | ||||
| 	planRepo                repository.PlanRepository | ||||
| 	analysisPlanTaskManager *AnalysisPlanTaskManager // <--- 注入共享的 Manager | ||||
| 	comm                    transport.Communicator | ||||
| 	analysisPlanTaskManager *AnalysisPlanTaskManager | ||||
| 	progressTracker         *ProgressTracker | ||||
| 	taskFactory             func(taskType models.TaskType) Task // 调度器需要注入一个任务工厂,用于创建任务实例 | ||||
|  | ||||
| 	pool     *ants.Pool // 使用 ants 协程池来管理并发 | ||||
| 	wg       sync.WaitGroup | ||||
| @@ -94,22 +97,26 @@ type Scheduler struct { | ||||
| func NewScheduler( | ||||
| 	pendingTaskRepo repository.PendingTaskRepository, | ||||
| 	executionLogRepo repository.ExecutionLogRepository, | ||||
| 	deviceRepo repository.DeviceRepository, | ||||
| 	sensorDataRepo repository.SensorDataRepository, | ||||
| 	planRepo repository.PlanRepository, | ||||
| 	analysisPlanTaskManager *AnalysisPlanTaskManager, // <--- 注入 Manager | ||||
| 	taskFactory func(taskType models.TaskType) Task, | ||||
| 	comm transport.Communicator, | ||||
| 	analysisPlanTaskManager *AnalysisPlanTaskManager, | ||||
| 	logger *logs.Logger, | ||||
| 	interval time.Duration, | ||||
| 	numWorkers int) *Scheduler { | ||||
| 	return &Scheduler{ | ||||
| 		pendingTaskRepo:         pendingTaskRepo, | ||||
| 		executionLogRepo:        executionLogRepo, | ||||
| 		deviceRepo:              deviceRepo, | ||||
| 		sensorDataRepo:          sensorDataRepo, | ||||
| 		planRepo:                planRepo, | ||||
| 		analysisPlanTaskManager: analysisPlanTaskManager, // <--- 注入 Manager | ||||
| 		comm:                    comm, | ||||
| 		analysisPlanTaskManager: analysisPlanTaskManager, | ||||
| 		logger:                  logger, | ||||
| 		pollingInterval:         interval, | ||||
| 		workers:                 numWorkers, | ||||
| 		progressTracker:         NewProgressTracker(), | ||||
| 		taskFactory:             taskFactory, | ||||
| 		stopChan:                make(chan struct{}), // 初始化停止信号通道 | ||||
| 	} | ||||
| } | ||||
| @@ -264,17 +271,10 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error { | ||||
|  | ||||
| 	} else { | ||||
| 		// 执行普通任务 | ||||
| 		task := s.taskFactory(claimedLog.Task.Type) | ||||
| 		if err := task.ParseParams(s.logger, claimedLog); err != nil { | ||||
|  | ||||
| 			s.logger.Errorf("[严重] 任务参数解析失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) | ||||
| 			return err | ||||
| 		} | ||||
| 		task := s.taskFactory(claimedLog) | ||||
|  | ||||
| 		if err := task.Execute(); err != nil { | ||||
|  | ||||
| 			s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) | ||||
|  | ||||
| 			task.OnFailure(err) | ||||
| 			return err | ||||
| 		} | ||||
| @@ -283,6 +283,20 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // taskFactory 会根据任务类型初始化对应任务 | ||||
| func (s *Scheduler) taskFactory(claimedLog *models.TaskExecutionLog) Task { | ||||
| 	switch claimedLog.Task.Type { | ||||
| 	case models.TaskTypeWaiting: | ||||
| 		return NewDelayTask(s.logger, claimedLog) | ||||
| 	case models.TaskTypeReleaseFeedWeight: | ||||
| 		return NewReleaseFeedWeightTask(claimedLog, s.deviceRepo, s.sensorDataRepo, s.comm, s.logger) | ||||
|  | ||||
| 	default: | ||||
| 		// TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 | ||||
| 		panic("不支持的任务类型") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中 | ||||
| func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error { | ||||
| 	// 创建Plan执行记录 | ||||
|   | ||||
| @@ -1,7 +1,6 @@ | ||||
| package task | ||||
|  | ||||
| import ( | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" | ||||
| ) | ||||
|  | ||||
| @@ -13,9 +12,6 @@ type Task interface { | ||||
| 	// 返回的 error 表示任务是否执行成功。调度器会根据返回的 error 是否为 nil 来决定任务状态。 | ||||
| 	Execute() error | ||||
|  | ||||
| 	// ParseParams 解析及校验参数 | ||||
| 	ParseParams(logger *logs.Logger, claimedLog *models.TaskExecutionLog) error | ||||
|  | ||||
| 	// OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑。 | ||||
| 	// log: 任务执行的上下文。 | ||||
| 	// executeErr: 从 Execute 方法返回的原始错误。 | ||||
|   | ||||
| @@ -16,6 +16,7 @@ import ( | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora" | ||||
| ) | ||||
|  | ||||
| // Application 是整个应用的核心,封装了所有组件和生命周期。 | ||||
| @@ -81,8 +82,11 @@ func NewApplication(configPath string) (*Application, error) { | ||||
| 	// 初始化计划触发器管理器 | ||||
| 	analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger) | ||||
|  | ||||
| 	// 初始化设备通信器 | ||||
| 	comm := lora.NewChirpStackTransport(cfg.ChirpStack, logger, deviceCommandLogRepo, deviceRepo) | ||||
|  | ||||
| 	//  初始化任务执行器 | ||||
| 	executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, planRepo, analysisPlanTaskManager, task.TaskFactory, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers) | ||||
| 	executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, deviceRepo, sensorDataRepo, planRepo, comm, analysisPlanTaskManager, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers) | ||||
|  | ||||
| 	//  初始化 API 服务器 | ||||
| 	apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, sensorDataRepo, executionLogRepo, tokenService, listenHandler, analysisPlanTaskManager) | ||||
|   | ||||
| @@ -30,6 +30,9 @@ type Config struct { | ||||
| 	// Heartbeat 心跳配置 | ||||
| 	Heartbeat HeartbeatConfig `yaml:"heartbeat"` | ||||
|  | ||||
| 	// ChirpStack ChirpStack API 配置 | ||||
| 	ChirpStack ChirpStackConfig `yaml:"chirp_stack"` | ||||
|  | ||||
| 	// TaskConfig 任务调度配置 | ||||
| 	Task TaskConfig `yaml:"task"` | ||||
| } | ||||
| @@ -112,6 +115,14 @@ type HeartbeatConfig struct { | ||||
| 	Concurrency int `yaml:"concurrency"` | ||||
| } | ||||
|  | ||||
| // ChirpStackConfig 代表 ChirpStack API 配置 | ||||
| type ChirpStackConfig struct { | ||||
| 	APIHost    string `yaml:"api_host"` | ||||
| 	APIToken   string `yaml:"api_token"` | ||||
| 	FPort      int    `yaml:"fport"` | ||||
| 	APITimeout int    `yaml:"api_timeout"` | ||||
| } | ||||
|  | ||||
| // TaskConfig 代表任务调度配置 | ||||
| type TaskConfig struct { | ||||
| 	Interval   int `yaml:"interval"` | ||||
| @@ -139,3 +150,8 @@ func (c *Config) Load(path string) error { | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // GenerateAPIKey 用于补齐API Key作为请求头时缺失的部分 | ||||
| func (c ChirpStackConfig) GenerateAPIKey() string { | ||||
| 	return "Bearer " + c.APIToken | ||||
| } | ||||
|   | ||||
| @@ -124,7 +124,7 @@ func (ps *PostgresStorage) Migrate(models ...interface{}) error { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// 如果是 TimescaleDB, 则将 sensor_data 转换为 hypertable | ||||
| 	// 如果是 TimescaleDB, 则将部分表转换为 hypertable | ||||
| 	if ps.isTimescaleDB { | ||||
| 		ps.logger.Info("检测到 TimescaleDB, 准备进行超表转换") | ||||
| 		if err := ps.creatingHyperTable(); err != nil { | ||||
| @@ -139,16 +139,18 @@ func (ps *PostgresStorage) creatingHyperTable() error { | ||||
| 	// 将 sensor_data 转换为超表 | ||||
| 	// 使用 if_not_exists => TRUE 保证幂等性 | ||||
| 	// 'time' 是 SensorData 模型中定义的时间列 | ||||
| 	sqlSensorData := "SELECT create_hypertable('sensor_data', 'time', if_not_exists => TRUE);" | ||||
| 	// 设置 chunk_time_interval 为 1 天, 以优化按天查询的性能 | ||||
| 	sqlSensorData := "SELECT create_hypertable('sensor_data', 'time', chunk_time_interval => INTERVAL '1 day', if_not_exists => TRUE);" | ||||
| 	if err := ps.db.Exec(sqlSensorData).Error; err != nil { | ||||
| 		ps.logger.Errorw("将 sensor_data 转换为超表失败", "error", err) | ||||
| 		return fmt.Errorf("将 sensor_data 转换为超表失败: %w", err) | ||||
| 	} | ||||
| 	ps.logger.Info("成功将 sensor_data 转换为超表 (或已转换)") | ||||
| 	ps.logger.Info("成功将 sensor_data 转换为超表 (或已转换), chunk 间隔为 1 天") | ||||
|  | ||||
| 	// 将 device_command_log 转换为超表 | ||||
| 	// 'sent_at' 是 DeviceCommandLog 模型中定义的时间列 | ||||
| 	sqlDeviceCommandLogs := "SELECT create_hypertable('device_command_log', 'sent_at', if_not_exists => TRUE);" | ||||
| 	// 设置 chunk_time_interval 为 1 天 | ||||
| 	sqlDeviceCommandLogs := "SELECT create_hypertable('device_command_log', 'sent_at', chunk_time_interval => INTERVAL '1 day', if_not_exists => TRUE);" | ||||
| 	if err := ps.db.Exec(sqlDeviceCommandLogs).Error; err != nil { | ||||
| 		ps.logger.Errorw("将 device_command_log 转换为超表失败", "error", err) | ||||
| 		return fmt.Errorf("将 device_command_log 转换为超表失败: %w", err) | ||||
|   | ||||
| @@ -29,10 +29,12 @@ const ( | ||||
| type TaskType string | ||||
|  | ||||
| const ( | ||||
| 	TaskPlanAnalysis TaskType = "plan_analysis" // 解析Plan的Task列表并添加到待执行队列的特殊任务 | ||||
| 	TaskTypeWaiting  TaskType = "waiting"       // 等待任务 | ||||
| 	TaskPlanAnalysis          TaskType = "plan_analysis"       // 解析Plan的Task列表并添加到待执行队列的特殊任务 | ||||
| 	TaskTypeWaiting           TaskType = "waiting"             // 等待任务 | ||||
| 	TaskTypeReleaseFeedWeight TaskType = "release_feed_weight" // 下料口释放指定重量任务 | ||||
| ) | ||||
|  | ||||
| // -- Task Parameters -- | ||||
| const ( | ||||
| 	// 这个参数是 TaskPlanAnalysis 类型的 Task Parameters 中用于记录plan_id的字段的key | ||||
| 	ParamsPlanID = "plan_id" | ||||
|   | ||||
| @@ -50,22 +50,18 @@ type WeightData struct { | ||||
| // SensorData 存储所有类型的传感器数据,对应数据库中的 'sensor_data' 表。 | ||||
| type SensorData struct { | ||||
| 	// Time 是数据记录的时间戳,作为复合主键的一部分。 | ||||
| 	// GORM 会将其映射到 'time' TIMESTAMPTZ 列。 | ||||
| 	Time time.Time `gorm:"primaryKey" json:"time"` | ||||
|  | ||||
| 	// DeviceID 是传感器的唯一标识符,作为复合主键的另一部分。 | ||||
| 	// GORM 会将其映射到 'device_id' VARCHAR(50) 列。 | ||||
| 	DeviceID uint `gorm:"primaryKey" json:"device_id"` | ||||
|  | ||||
| 	// RegionalControllerID 是上报此数据的区域主控的ID。 | ||||
| 	// 我们为其添加了数据库索引以优化按区域查询的性能。 | ||||
| 	RegionalControllerID uint `json:"regional_controller_id"` | ||||
|  | ||||
| 	// SensorDataType 是传感数据的类型 | ||||
| 	SensorDataType SensorDataType `gorm:"not null;index" json:"sensor_data_type"` | ||||
|  | ||||
| 	// Data 存储一个或多个传感器读数,格式为 JSON。 | ||||
| 	// GORM 会使用 'jsonb' 类型来创建此列。 | ||||
| 	Data datatypes.JSON `gorm:"type:jsonb" json:"data"` | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -1,6 +1,8 @@ | ||||
| package repository | ||||
|  | ||||
| import ( | ||||
| 	"time" | ||||
|  | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" | ||||
| 	"gorm.io/gorm" | ||||
| ) | ||||
| @@ -8,6 +10,7 @@ import ( | ||||
| // SensorDataRepository 定义了与传感器数据相关的数据库操作接口。 | ||||
| type SensorDataRepository interface { | ||||
| 	Create(sensorData *models.SensorData) error | ||||
| 	GetLatestSensorDataByDeviceIDAndSensorType(deviceID uint, sensorType models.SensorDataType) (*models.SensorData, error) | ||||
| } | ||||
|  | ||||
| // gormSensorDataRepository 是 SensorDataRepository 的 GORM 实现。 | ||||
| @@ -25,3 +28,13 @@ func NewGormSensorDataRepository(db *gorm.DB) SensorDataRepository { | ||||
| func (r *gormSensorDataRepository) Create(sensorData *models.SensorData) error { | ||||
| 	return r.db.Create(sensorData).Error | ||||
| } | ||||
|  | ||||
| // GetLatestSensorDataByDeviceIDAndSensorType 根据设备ID和传感器类型查询最新的传感器数据。 | ||||
| func (r *gormSensorDataRepository) GetLatestSensorDataByDeviceIDAndSensorType(deviceID uint, sensorDataType models.SensorDataType) (*models.SensorData, error) { | ||||
| 	var sensorData models.SensorData | ||||
| 	// 增加一个时间范围来缩小查询范围, 从而加快查找速度, 当使用时序数据库时时间范围可以让数据库忽略时间靠前的分片 | ||||
| 	err := r.db.Where("device_id = ? AND sensor_data_type = ? AND time >=?", deviceID, sensorDataType, time.Now().Add(-24*time.Hour)). | ||||
| 		Order("time DESC"). | ||||
| 		First(&sensorData).Error | ||||
| 	return &sensorData, err | ||||
| } | ||||
|   | ||||
| @@ -3,6 +3,7 @@ package lora | ||||
| import ( | ||||
| 	"time" | ||||
|  | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/config" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" | ||||
| @@ -14,26 +15,11 @@ import ( | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora/chirp_stack_proto/client" | ||||
| ) | ||||
|  | ||||
| // ChirpStackConfig 保存连接到 ChirpStack API 所需的配置。 | ||||
| type ChirpStackConfig struct { | ||||
| 	// ServerAddress 是 ChirpStack API 服务器的地址,例如 "localhost:8080"。 | ||||
| 	ServerAddress string | ||||
| 	// APIKey 是用于认证的 API 密钥。 | ||||
| 	APIKey string | ||||
| 	// LoRaWAN 端口, 需要和设备一致 | ||||
| 	Fport int64 | ||||
| } | ||||
|  | ||||
| // GenerateAPIKey 用于补齐API Key作为请求头时缺失的部分 | ||||
| func (c ChirpStackConfig) GenerateAPIKey() string { | ||||
| 	return "Bearer " + c.APIKey | ||||
| } | ||||
|  | ||||
| // ChirpStackTransport 是一个客户端,用于封装与 ChirpStack REST API 的交互。 | ||||
| type ChirpStackTransport struct { | ||||
| 	client   *client.ChirpStackRESTAPI | ||||
| 	authInfo runtime.ClientAuthInfoWriter | ||||
| 	config   ChirpStackConfig | ||||
| 	config   config.ChirpStackConfig | ||||
|  | ||||
| 	deviceCommandLogRepo repository.DeviceCommandLogRepository | ||||
| 	deviceRepo           repository.DeviceRepository | ||||
| @@ -43,14 +29,14 @@ type ChirpStackTransport struct { | ||||
|  | ||||
| // NewChirpStackTransport 创建一个新的通信实例,用于与 ChirpStack 通信。 | ||||
| func NewChirpStackTransport( | ||||
| 	config ChirpStackConfig, | ||||
| 	config config.ChirpStackConfig, | ||||
| 	logger *logs.Logger, | ||||
| 	deviceCommandLogRepo repository.DeviceCommandLogRepository, | ||||
| 	deviceRepo repository.DeviceRepository, | ||||
| ) *ChirpStackTransport { | ||||
| 	// 使用配置中的服务器地址创建一个 HTTP transport。 | ||||
| 	// 它会使用生成的客户端中定义的默认 base path 和 schemes。 | ||||
| 	transport := httptransport.New(config.ServerAddress, client.DefaultBasePath, client.DefaultSchemes) | ||||
| 	transport := httptransport.New(config.APIHost, client.DefaultBasePath, client.DefaultSchemes) | ||||
|  | ||||
| 	// 使用 transport 和默认的字符串格式化器,创建一个 API 主客户端。 | ||||
| 	apiClient := client.New(transport, strfmt.Default) | ||||
| @@ -77,7 +63,7 @@ func (c *ChirpStackTransport) Send(address string, payload []byte) error { | ||||
| 		QueueItem: &device_service.DeviceServiceEnqueueParamsBodyQueueItem{ | ||||
| 			Confirmed: true, | ||||
| 			Data:      payload, | ||||
| 			FPort:     c.config.Fport, | ||||
| 			FPort:     int64(c.config.FPort), | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user