242 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			242 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package device
 | 
						||
 | 
						||
import (
 | 
						||
	"errors"
 | 
						||
	"fmt"
 | 
						||
	"time"
 | 
						||
 | 
						||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
						||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
						||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
						||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
 | 
						||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto"
 | 
						||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils/command_generater"
 | 
						||
 | 
						||
	"github.com/google/uuid"
 | 
						||
	gproto "google.golang.org/protobuf/proto"
 | 
						||
)
 | 
						||
 | 
						||
type GeneralDeviceService struct {
 | 
						||
	deviceRepo            repository.DeviceRepository
 | 
						||
	deviceCommandLogRepo  repository.DeviceCommandLogRepository
 | 
						||
	pendingCollectionRepo repository.PendingCollectionRepository
 | 
						||
	logger                *logs.Logger
 | 
						||
	comm                  transport.Communicator
 | 
						||
}
 | 
						||
 | 
						||
// NewGeneralDeviceService 创建一个通用设备服务
 | 
						||
func NewGeneralDeviceService(
 | 
						||
	deviceRepo repository.DeviceRepository,
 | 
						||
	deviceCommandLogRepo repository.DeviceCommandLogRepository,
 | 
						||
	pendingCollectionRepo repository.PendingCollectionRepository,
 | 
						||
	logger *logs.Logger,
 | 
						||
	comm transport.Communicator,
 | 
						||
) Service {
 | 
						||
	return &GeneralDeviceService{
 | 
						||
		deviceRepo:            deviceRepo,
 | 
						||
		deviceCommandLogRepo:  deviceCommandLogRepo,
 | 
						||
		pendingCollectionRepo: pendingCollectionRepo,
 | 
						||
		logger:                logger,
 | 
						||
		comm:                  comm,
 | 
						||
	}
 | 
						||
}
 | 
						||
 | 
						||
func (g *GeneralDeviceService) Switch(device *models.Device, action DeviceAction) error {
 | 
						||
	// 1. 依赖模型自身的 SelfCheck 进行全面校验
 | 
						||
	if err := device.SelfCheck(); err != nil {
 | 
						||
		return fmt.Errorf("设备 %v(id=%v) 未通过自检: %w", device.Name, device.ID, err)
 | 
						||
	}
 | 
						||
	if err := device.DeviceTemplate.SelfCheck(); err != nil {
 | 
						||
		return fmt.Errorf("设备 %v(id=%v) 的模板未通过自检: %w", device.Name, device.ID, err)
 | 
						||
	}
 | 
						||
 | 
						||
	// 2. 检查预加载的 AreaController 是否有效
 | 
						||
	areaController := &device.AreaController
 | 
						||
	if err := areaController.SelfCheck(); err != nil {
 | 
						||
		return fmt.Errorf("区域主控 %v(id=%v) 未通过自检: %w", areaController.Name, areaController.ID, err)
 | 
						||
	}
 | 
						||
 | 
						||
	// 3. 使用模型层预定义的 Bus485Properties 结构体解析设备属性
 | 
						||
	var deviceProps models.Bus485Properties
 | 
						||
	if err := device.ParseProperties(&deviceProps); err != nil {
 | 
						||
		return fmt.Errorf("解析设备 %v(id=%v) 的属性失败: %w", device.Name, device.ID, err)
 | 
						||
	}
 | 
						||
 | 
						||
	// 4. 解析设备模板中的开关指令参数
 | 
						||
	var switchCmd models.SwitchCommands
 | 
						||
	if err := device.DeviceTemplate.ParseCommands(&switchCmd); err != nil {
 | 
						||
		return fmt.Errorf("解析设备 %v(id=%v) 的开关指令失败: %w", device.Name, device.ID, err)
 | 
						||
	}
 | 
						||
 | 
						||
	// 5. 根据 action 生成 Modbus RTU 写入指令
 | 
						||
	onOffState := true              // 默认为开启
 | 
						||
	if action == DeviceActionStop { // 如果是停止动作,则设置为关闭
 | 
						||
		onOffState = false
 | 
						||
	}
 | 
						||
 | 
						||
	modbusCommandBytes, err := command_generater.GenerateModbusRTUSwitchCommand(
 | 
						||
		deviceProps.BusAddress,
 | 
						||
		switchCmd.ModbusStartAddress,
 | 
						||
		onOffState,
 | 
						||
	)
 | 
						||
	if err != nil {
 | 
						||
		return fmt.Errorf("生成Modbus RTU写入指令失败: %w", err)
 | 
						||
	}
 | 
						||
 | 
						||
	// 6. 构建 Protobuf Raw485Command,包含总线号
 | 
						||
	raw485Cmd := &proto.Raw485Command{
 | 
						||
		BusNumber:    int32(deviceProps.BusNumber), // 添加总线号
 | 
						||
		CommandBytes: modbusCommandBytes,
 | 
						||
	}
 | 
						||
 | 
						||
	instruction := &proto.Instruction{
 | 
						||
		Payload: &proto.Instruction_Raw_485Command{
 | 
						||
			Raw_485Command: raw485Cmd,
 | 
						||
		},
 | 
						||
	}
 | 
						||
 | 
						||
	message, err := gproto.Marshal(instruction)
 | 
						||
	if err != nil {
 | 
						||
		return fmt.Errorf("序列化指令失败: %w", err)
 | 
						||
	}
 | 
						||
 | 
						||
	// 7. 发送指令
 | 
						||
	networkID := areaController.NetworkID
 | 
						||
	sendResult, err := g.comm.Send(networkID, message)
 | 
						||
	if err != nil {
 | 
						||
		return fmt.Errorf("发送指令到 %s 失败: %w", networkID, err)
 | 
						||
	}
 | 
						||
 | 
						||
	// 8. 创建并保存命令日志
 | 
						||
	logRecord := &models.DeviceCommandLog{
 | 
						||
		MessageID: sendResult.MessageID,
 | 
						||
		DeviceID:  areaController.ID,
 | 
						||
		SentAt:    time.Now(),
 | 
						||
	}
 | 
						||
	if err := g.deviceCommandLogRepo.Create(logRecord); err != nil {
 | 
						||
		// 记录日志失败是一个需要关注的问题,但可能不应该中断主流程。
 | 
						||
		// 我们记录一个错误日志,然后成功返回。
 | 
						||
		g.logger.Errorf("创建指令日志失败 (MessageID: %s): %v", sendResult.MessageID, err)
 | 
						||
	}
 | 
						||
 | 
						||
	g.logger.Infof("成功发送指令到 %s 并创建日志 (MessageID: %s)", networkID, sendResult.MessageID)
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
// Collect 实现了 Service 接口,用于发起对指定区域主控下的多个设备的批量采集请求。
 | 
						||
func (g *GeneralDeviceService) Collect(regionalControllerID uint, devicesToCollect []*models.Device) error {
 | 
						||
	if len(devicesToCollect) == 0 {
 | 
						||
		g.logger.Info("待采集设备列表为空,无需执行采集任务。")
 | 
						||
		return nil
 | 
						||
	}
 | 
						||
 | 
						||
	// 1. 从设备列表中获取预加载的区域主控,并进行校验
 | 
						||
	regionalController := &devicesToCollect[0].AreaController
 | 
						||
	if regionalController.ID != regionalControllerID {
 | 
						||
		return fmt.Errorf("设备列表与指定的区域主控ID (%d) 不匹配", regionalControllerID)
 | 
						||
	}
 | 
						||
	if err := regionalController.SelfCheck(); err != nil {
 | 
						||
		return fmt.Errorf("区域主控 (ID: %d) 未通过自检: %w", regionalControllerID, err)
 | 
						||
	}
 | 
						||
 | 
						||
	// 2. 准备采集任务列表
 | 
						||
	var childDeviceIDs []uint
 | 
						||
	var collectTasks []*proto.CollectTask
 | 
						||
 | 
						||
	for _, dev := range devicesToCollect {
 | 
						||
		// 依赖模型自身的 SelfCheck 进行全面校验
 | 
						||
		if err := dev.SelfCheck(); err != nil {
 | 
						||
			g.logger.Warnf("跳过设备 %d,因其未通过自检: %v", dev.ID, err)
 | 
						||
			continue
 | 
						||
		}
 | 
						||
		if err := dev.DeviceTemplate.SelfCheck(); err != nil {
 | 
						||
			g.logger.Warnf("跳过设备 %d,因其设备模板未通过自检: %v", dev.ID, err)
 | 
						||
			continue
 | 
						||
		}
 | 
						||
 | 
						||
		// 使用模板的 ParseCommands 方法获取传感器指令参数
 | 
						||
		var sensorCmd models.SensorCommands
 | 
						||
		if err := dev.DeviceTemplate.ParseCommands(&sensorCmd); err != nil {
 | 
						||
			g.logger.Warnf("跳过设备 %d,因其模板指令无法解析为 SensorCommands: %v", dev.ID, err)
 | 
						||
			continue
 | 
						||
		}
 | 
						||
 | 
						||
		// 使用模型层预定义的 Bus485Properties 结构体解析设备属性
 | 
						||
		var deviceProps models.Bus485Properties
 | 
						||
		if err := dev.ParseProperties(&deviceProps); err != nil {
 | 
						||
			g.logger.Warnf("跳过设备 %d,因其属性解析失败: %v", dev.ID, err)
 | 
						||
			continue
 | 
						||
		}
 | 
						||
 | 
						||
		// 生成 Modbus RTU 读取指令
 | 
						||
		modbusCommandBytes, err := command_generater.GenerateModbusRTUReadCommand(
 | 
						||
			deviceProps.BusAddress,
 | 
						||
			sensorCmd.ModbusFunctionCode,
 | 
						||
			sensorCmd.ModbusStartAddress,
 | 
						||
			sensorCmd.ModbusQuantity,
 | 
						||
		)
 | 
						||
		if err != nil {
 | 
						||
			g.logger.Warnf("跳过设备 %d,因生成Modbus RTU读取指令失败: %v", dev.ID, err)
 | 
						||
			continue
 | 
						||
		}
 | 
						||
		g.logger.Debugf("生成485指令: %v", modbusCommandBytes)
 | 
						||
 | 
						||
		// 构建 Raw485Command,包含总线号
 | 
						||
		raw485Cmd := &proto.Raw485Command{
 | 
						||
			BusNumber:    int32(deviceProps.BusNumber), // 添加总线号
 | 
						||
			CommandBytes: modbusCommandBytes,
 | 
						||
		}
 | 
						||
 | 
						||
		collectTasks = append(collectTasks, &proto.CollectTask{
 | 
						||
			Command: raw485Cmd,
 | 
						||
		})
 | 
						||
		childDeviceIDs = append(childDeviceIDs, dev.ID)
 | 
						||
	}
 | 
						||
 | 
						||
	if len(childDeviceIDs) == 0 {
 | 
						||
		return errors.New("经过滤后,没有可通过自检的有效设备")
 | 
						||
	}
 | 
						||
 | 
						||
	// 3. 构建并发送指令
 | 
						||
	networkID := regionalController.NetworkID
 | 
						||
 | 
						||
	// 4. 创建待处理请求记录
 | 
						||
	correlationID := uuid.New().String()
 | 
						||
	pendingReq := &models.PendingCollection{
 | 
						||
		CorrelationID:   correlationID,
 | 
						||
		DeviceID:        regionalController.ID,
 | 
						||
		CommandMetadata: childDeviceIDs,
 | 
						||
		Status:          models.PendingStatusPending,
 | 
						||
		CreatedAt:       time.Now(),
 | 
						||
	}
 | 
						||
	if err := g.pendingCollectionRepo.Create(pendingReq); err != nil {
 | 
						||
		g.logger.Errorf("创建待采集请求失败 (CorrelationID: %s): %v", correlationID, err)
 | 
						||
		return err
 | 
						||
	}
 | 
						||
	g.logger.Infof("成功创建待采集请求 (CorrelationID: %s, DeviceID: %d)", correlationID, regionalController.ID)
 | 
						||
 | 
						||
	// 5. 构建最终的空中载荷
 | 
						||
	batchCmd := &proto.BatchCollectCommand{
 | 
						||
		CorrelationId: correlationID,
 | 
						||
		Tasks:         collectTasks,
 | 
						||
	}
 | 
						||
	instruction := &proto.Instruction{
 | 
						||
		Payload: &proto.Instruction_BatchCollectCommand{
 | 
						||
			BatchCollectCommand: batchCmd,
 | 
						||
		},
 | 
						||
	}
 | 
						||
	payload, err := gproto.Marshal(instruction)
 | 
						||
	if err != nil {
 | 
						||
		g.logger.Errorf("序列化采集指令失败 (CorrelationID: %s): %v", correlationID, err)
 | 
						||
		return err
 | 
						||
	}
 | 
						||
	g.logger.Infof("构造空中载荷成功: networkID: %v, payload: %v", networkID, instruction)
 | 
						||
	if _, err := g.comm.Send(networkID, payload); err != nil {
 | 
						||
		g.logger.DPanicf("待采集请求 (CorrelationID: %s) 已创建,但发送到设备失败: %v。数据可能不一致!", correlationID, err)
 | 
						||
		return err
 | 
						||
	}
 | 
						||
 | 
						||
	g.logger.Infof("成功将采集请求 (CorrelationID: %s) 发送到设备 %s", correlationID, networkID)
 | 
						||
	return nil
 | 
						||
}
 |