增加全量采集任务
This commit is contained in:
		
							
								
								
									
										93
									
								
								internal/domain/task/full_collection_task.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										93
									
								
								internal/domain/task/full_collection_task.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,93 @@ | ||||
| package task | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
|  | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/domain/device" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" | ||||
| ) | ||||
|  | ||||
| // FullCollectionTask 实现了 scheduler.Task 接口,用于执行一次全量的设备数据采集 | ||||
| type FullCollectionTask struct { | ||||
| 	log           *models.TaskExecutionLog | ||||
| 	deviceRepo    repository.DeviceRepository | ||||
| 	deviceService device.Service | ||||
| 	logger        *logs.Logger | ||||
| } | ||||
|  | ||||
| // NewFullCollectionTask 创建一个全量采集任务实例 | ||||
| func NewFullCollectionTask( | ||||
| 	log *models.TaskExecutionLog, | ||||
| 	deviceRepo repository.DeviceRepository, | ||||
| 	deviceService device.Service, | ||||
| 	logger *logs.Logger, | ||||
| ) *FullCollectionTask { | ||||
| 	return &FullCollectionTask{ | ||||
| 		log:           log, | ||||
| 		deviceRepo:    deviceRepo, | ||||
| 		deviceService: deviceService, | ||||
| 		logger:        logger, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Execute 是任务的核心执行逻辑 | ||||
| func (t *FullCollectionTask) Execute() error { | ||||
| 	t.logger.Infow("开始执行全量采集任务", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID) | ||||
|  | ||||
| 	sensors, err := t.deviceRepo.ListAllSensors() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("全量采集任务: 从数据库获取所有传感器失败: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	if len(sensors) == 0 { | ||||
| 		t.logger.Infow("全量采集任务: 未发现任何传感器设备,跳过本次采集", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID) | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	sensorsByController := make(map[uint][]*models.Device) | ||||
| 	for _, sensor := range sensors { | ||||
| 		sensorsByController[sensor.AreaControllerID] = append(sensorsByController[sensor.AreaControllerID], sensor) | ||||
| 	} | ||||
|  | ||||
| 	var firstError error | ||||
| 	for controllerID, controllerSensors := range sensorsByController { | ||||
| 		t.logger.Infow("全量采集任务: 准备为区域主控下的传感器下发采集指令", | ||||
| 			"task_id", t.log.TaskID, | ||||
| 			"task_type", t.log.Task.Type, | ||||
| 			"log_id", t.log.ID, | ||||
| 			"controller_id", controllerID, | ||||
| 			"sensor_count", len(controllerSensors), | ||||
| 		) | ||||
| 		if err := t.deviceService.Collect(controllerID, controllerSensors); err != nil { | ||||
| 			t.logger.Errorw("全量采集任务: 为区域主控下发采集指令失败", | ||||
| 				"task_id", t.log.TaskID, | ||||
| 				"task_type", t.log.Task.Type, | ||||
| 				"log_id", t.log.ID, | ||||
| 				"controller_id", controllerID, | ||||
| 				"error", err, | ||||
| 			) | ||||
| 			if firstError == nil { | ||||
| 				firstError = err // 保存第一个错误 | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if firstError != nil { | ||||
| 		return fmt.Errorf("全量采集任务执行期间发生错误: %w", firstError) | ||||
| 	} | ||||
|  | ||||
| 	t.logger.Infow("全量采集任务执行完成", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑 | ||||
| func (t *FullCollectionTask) OnFailure(executeErr error) { | ||||
| 	t.logger.Errorw("全量采集任务执行失败", | ||||
| 		"task_id", t.log.TaskID, | ||||
| 		"task_type", t.log.Task.Type, | ||||
| 		"log_id", t.log.ID, | ||||
| 		"error", executeErr, | ||||
| 	) | ||||
| } | ||||
| @@ -35,6 +35,8 @@ func (t *taskFactory) Production(claimedLog *models.TaskExecutionLog) scheduler. | ||||
| 		return NewDelayTask(t.logger, claimedLog) | ||||
| 	case models.TaskTypeReleaseFeedWeight: | ||||
| 		return NewReleaseFeedWeightTask(claimedLog, t.sensorDataRepo, t.deviceRepo, t.deviceService, t.logger) | ||||
| 	case models.TaskTypeFullCollection: | ||||
| 		return NewFullCollectionTask(claimedLog, t.deviceRepo, t.deviceService, t.logger) | ||||
| 	default: | ||||
| 		// TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 | ||||
| 		t.logger.Panicf("不支持的任务类型: %s", claimedLog.Task.Type) | ||||
|   | ||||
| @@ -34,6 +34,7 @@ const ( | ||||
| 	TaskPlanAnalysis          TaskType = "计划分析" // 解析Plan的Task列表并添加到待执行队列的特殊任务 | ||||
| 	TaskTypeWaiting           TaskType = "等待"   // 等待任务 | ||||
| 	TaskTypeReleaseFeedWeight TaskType = "下料"   // 下料口释放指定重量任务 | ||||
| 	TaskTypeFullCollection    TaskType = "全量采集" // 新增的全量采集任务 | ||||
| ) | ||||
|  | ||||
| // -- Task Parameters -- | ||||
|   | ||||
		Reference in New Issue
	
	Block a user