// Package websocket 提供WebSocket通信功能 // 实现中继设备和平台之间的双向通信 package websocket import ( "context" "encoding/json" "fmt" "net/http" "sync" "time" "git.huangwc.com/pig/pig-farm-controller/internal/logs" "git.huangwc.com/pig/pig-farm-controller/internal/storage/repository" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" ) // WebSocket消息类型常量 const ( // MessageTypeCommand 平台向设备发送的指令 MessageTypeCommand = "command" // MessageTypeResponse 设备向平台发送的响应 MessageTypeResponse = "response" // MessageTypeHeartbeat 心跳消息 MessageTypeHeartbeat = "heartbeat" ) // WebSocketMessage WebSocket消息结构 type WebSocketMessage struct { // Type 消息类型 Type string `json:"type"` // DeviceID 设备ID DeviceID string `json:"device_id,omitempty"` // Command 指令内容 Command string `json:"command,omitempty"` // Data 消息数据 Data interface{} `json:"data,omitempty"` // Timestamp 时间戳 Timestamp time.Time `json:"timestamp"` } // DeviceConnection 设备连接信息 type DeviceConnection struct { // DeviceID 设备ID DeviceID string // Connection WebSocket连接 Connection *websocket.Conn // LastHeartbeat 最后心跳时间 LastHeartbeat time.Time // ResponseChan 响应通道 ResponseChan chan *WebSocketMessage } // CommandResponse WebSocket命令响应结构体 type CommandResponse struct { // DeviceID 设备ID DeviceID string `json:"device_id,omitempty"` // Command 命令名称 Command string `json:"command,omitempty"` // Data 响应数据 Data interface{} `json:"data,omitempty"` // Status 响应状态 Status string `json:"status,omitempty"` // Message 响应消息 Message string `json:"message,omitempty"` // Timestamp 时间戳 Timestamp time.Time `json:"timestamp"` } // ParseData 将响应数据解析到目标结构体 func (cr *CommandResponse) ParseData(target interface{}) error { dataBytes, err := json.Marshal(cr.Data) if err != nil { return err } return json.Unmarshal(dataBytes, target) } // CommandResult WebSocket命令执行结果 type CommandResult struct { // Response 响应消息 Response *CommandResponse // Error 错误信息 Error error } // Manager WebSocket管理器 type Manager struct { // connections 设备连接映射 connections map[string]*DeviceConnection // mutex 互斥锁 mutex sync.RWMutex // logger 日志记录器 logger *logs.Logger // upgrader WebSocket升级器 upgrader websocket.Upgrader // defaultTimeout 默认超时时间(秒) defaultTimeout int // deviceRepo 设备仓库 deviceRepo repository.DeviceRepo } // NewManager 创建WebSocket管理器实例 func NewManager(deviceRepo repository.DeviceRepo) *Manager { return &Manager{ connections: make(map[string]*DeviceConnection), logger: logs.NewLogger(), defaultTimeout: 5, // 默认5秒超时 deviceRepo: deviceRepo, upgrader: websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { // 允许所有跨域请求 return true }, }, } } // SetDefaultTimeout 设置默认超时时间 func (wm *Manager) SetDefaultTimeout(timeout int) { wm.defaultTimeout = timeout } // getDeviceDisplayName 获取设备显示名称 func (wm *Manager) getDeviceDisplayName(deviceID string) string { if wm.deviceRepo != nil { if device, err := wm.deviceRepo.FindByIDString(deviceID); err == nil && device != nil { return fmt.Sprintf("%s(id:%s)", device.Name, deviceID) } } return fmt.Sprintf("未知设备(id:%s)", deviceID) } // AddConnection 添加设备连接 func (wm *Manager) AddConnection(deviceID string, conn *websocket.Conn) { wm.mutex.Lock() defer wm.mutex.Unlock() wm.connections[deviceID] = &DeviceConnection{ DeviceID: deviceID, Connection: conn, LastHeartbeat: time.Now(), } deviceName := wm.getDeviceDisplayName(deviceID) wm.logger.Info(fmt.Sprintf("设备 %s 已连接", deviceName)) } // RemoveConnection 移除设备连接 func (wm *Manager) RemoveConnection(deviceID string) { wm.mutex.Lock() defer wm.mutex.Unlock() deviceName := wm.getDeviceDisplayName(deviceID) delete(wm.connections, deviceID) wm.logger.Info(fmt.Sprintf("设备 %s 已断开连接", deviceName)) } // SetResponseHandler 设置响应处理器 func (wm *Manager) SetResponseHandler(deviceID string, responseChan chan *WebSocketMessage) { wm.mutex.Lock() defer wm.mutex.Unlock() if deviceConn, exists := wm.connections[deviceID]; exists { deviceConn.ResponseChan = responseChan } } // SendCommand 向指定设备发送指令 func (wm *Manager) SendCommand(deviceID, command string, data interface{}) error { wm.mutex.RLock() deviceConn, exists := wm.connections[deviceID] wm.mutex.RUnlock() deviceName := wm.getDeviceDisplayName(deviceID) if !exists { return fmt.Errorf("设备 %s 未连接", deviceName) } // 构造消息 msg := WebSocketMessage{ Type: MessageTypeCommand, Command: command, Data: data, Timestamp: time.Now(), } // 发送消息 if err := deviceConn.Connection.WriteJSON(msg); err != nil { return fmt.Errorf("向设备 %s 发送指令失败: %v", deviceName, err) } return nil } // SendCommandAndWait 发送指令并等待响应 func (wm *Manager) SendCommandAndWait(deviceID, command string, data interface{}, timeout int) (*CommandResponse, error) { deviceName := wm.getDeviceDisplayName(deviceID) // 如果未指定超时时间,使用默认超时时间 if timeout <= 0 { timeout = wm.defaultTimeout } // 创建用于接收响应的通道 responseChan := make(chan *WebSocketMessage, 1) wm.SetResponseHandler(deviceID, responseChan) // 发送指令 if err := wm.SendCommand(deviceID, command, data); err != nil { return nil, fmt.Errorf("发送指令失败: %v", err) } // 等待设备响应,设置超时 var response *WebSocketMessage ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second) defer cancel() select { case response = <-responseChan: // 成功接收到响应 // 转换为CommandResponse结构体 commandResponse := &CommandResponse{ DeviceID: response.DeviceID, Command: response.Command, Data: response.Data, Timestamp: response.Timestamp, } // 尝试提取状态和消息字段 if responseData, ok := response.Data.(map[string]interface{}); ok { if status, exists := responseData["status"]; exists { if statusStr, ok := status.(string); ok { commandResponse.Status = statusStr } } if message, exists := responseData["message"]; exists { if messageStr, ok := message.(string); ok { commandResponse.Message = messageStr } } } return commandResponse, nil case <-ctx.Done(): // 超时处理 return nil, fmt.Errorf("等待设备 %s 响应超时", deviceName) } } // GetConnectedDevices 获取已连接的设备列表 func (wm *Manager) GetConnectedDevices() []string { wm.mutex.RLock() defer wm.mutex.RUnlock() devices := make([]string, 0, len(wm.connections)) for deviceID := range wm.connections { devices = append(devices, deviceID) } return devices } // HandleMessage 处理来自设备的消息 func (wm *Manager) HandleMessage(deviceID string, message []byte) error { // 解析消息 var msg WebSocketMessage if err := json.Unmarshal(message, &msg); err != nil { return fmt.Errorf("解析设备 %s 消息失败: %v", wm.getDeviceDisplayName(deviceID), err) } // 更新心跳时间 if msg.Type == MessageTypeHeartbeat { wm.mutex.Lock() if deviceConn, exists := wm.connections[deviceID]; exists { deviceConn.LastHeartbeat = time.Now() } wm.mutex.Unlock() } // 处理响应消息 if msg.Type == MessageTypeResponse { wm.mutex.RLock() if deviceConn, exists := wm.connections[deviceID]; exists && deviceConn.ResponseChan != nil { // 发送响应到通道 select { case deviceConn.ResponseChan <- &msg: // 成功发送 default: // 通道已满,丢弃消息 wm.logger.Warn(fmt.Sprintf("设备 %s 的响应通道已满,丢弃响应消息", wm.getDeviceDisplayName(deviceID))) } } wm.mutex.RUnlock() } // 记录消息日志 wm.logger.Info(fmt.Sprintf("收到来自设备 %s 的消息: %v", wm.getDeviceDisplayName(deviceID), msg)) return nil } // GetDeviceConnection 获取设备连接信息 func (wm *Manager) GetDeviceConnection(deviceID string) (*DeviceConnection, bool) { wm.mutex.RLock() defer wm.mutex.RUnlock() deviceConn, exists := wm.connections[deviceID] return deviceConn, exists } // HandleConnection 处理WebSocket连接 func (wm *Manager) HandleConnection(c *gin.Context) { // 升级HTTP连接到WebSocket conn, err := wm.upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { wm.logger.Error(fmt.Sprintf("WebSocket连接升级失败: %v", err)) return } // 获取设备ID deviceID := c.Query("device_id") if deviceID == "" { wm.logger.Error("缺少设备ID参数") conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "缺少设备ID参数")) conn.Close() return } // 添加连接 wm.AddConnection(deviceID, conn) deviceName := wm.getDeviceDisplayName(deviceID) wm.logger.Info("设备 " + deviceName + " 已连接") // 发送连接成功消息 successMsg := WebSocketMessage{ Type: "system", Command: "connected", Timestamp: time.Now(), } conn.WriteJSON(successMsg) // 处理消息循环 for { // 读取消息 messageType, message, err := conn.ReadMessage() if err != nil { wm.logger.Error(fmt.Sprintf("读取设备 %s 消息失败: %v", deviceName, err)) break } // 只处理文本消息 if messageType != websocket.TextMessage { continue } // 处理设备消息 if err := wm.HandleMessage(deviceID, message); err != nil { wm.logger.Error(fmt.Sprintf("处理设备 %s 消息失败: %v", deviceName, err)) continue } } // 连接断开时清理 wm.RemoveConnection(deviceID) conn.Close() wm.logger.Info("设备 " + deviceName + " 已断开连接") }