From 3cc02d3a98ef89b7adb9695ef3b2970185ca8240 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Tue, 9 Sep 2025 13:35:22 +0800 Subject: [PATCH 1/3] =?UTF-8?q?config=E5=A2=9E=E5=8A=A0=E5=BF=83=E8=B7=B3?= =?UTF-8?q?=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.yml | 2 ++ internal/config/config.go | 12 ++++++------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/config.yml b/config.yml index ad8f8ab..dd313bf 100644 --- a/config.yml +++ b/config.yml @@ -26,6 +26,8 @@ database: websocket: # WebSocket请求超时时间(秒) timeout: 5 + # 心跳检测间隔(秒), 如果超过这个时间没有消息往来系统会自动发送一个心跳包维持长链接 + heartbeat_interval: 54 # 心跳配置 heartbeat: diff --git a/internal/config/config.go b/internal/config/config.go index ca8e74a..0444a90 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -77,6 +77,9 @@ type DatabaseConfig struct { type WebSocketConfig struct { // Timeout WebSocket请求超时时间(秒) Timeout int `yaml:"timeout"` + + // HeartbeatInterval 心跳检测间隔(秒), 如果超过这个时间没有消息往来系统会自动发送一个心跳包维持长链接 + HeartbeatInterval int `yaml:"heartbeat_interval"` } // HeartbeatConfig 代表心跳配置 @@ -130,12 +133,9 @@ func (c *Config) GetDatabaseConnectionString() string { ) } -// GetWebSocketTimeout 获取WebSocket超时时间(秒) -func (c *Config) GetWebSocketTimeout() int { - if c.WebSocket.Timeout <= 0 { - return 5 // 默认5秒超时 - } - return c.WebSocket.Timeout +// GetWebSocketConfig 获取WebSocket配置 +func (c *Config) GetWebSocketConfig() WebSocketConfig { + return c.WebSocket } // GetHeartbeatConfig 获取心跳配置 -- 2.49.1 From adb9a12a9d50f2e9aca6984860e3daa30faec4c7 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Tue, 9 Sep 2025 15:19:34 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E5=90=88=E5=B9=B6websocket=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/service/websocket.go | 333 ------------------------------ internal/websocket/manager.go | 377 +++++++++++++++++++++++++++------- 2 files changed, 302 insertions(+), 408 deletions(-) delete mode 100644 internal/service/websocket.go diff --git a/internal/service/websocket.go b/internal/service/websocket.go deleted file mode 100644 index df613c2..0000000 --- a/internal/service/websocket.go +++ /dev/null @@ -1,333 +0,0 @@ -// Package service 提供WebSocket服务功能 -// 实现中继设备和平台之间的双向通信 -package service - -import ( - "context" - "encoding/json" - "fmt" - "sync" - "time" - - "git.huangwc.com/pig/pig-farm-controller/internal/logs" - "git.huangwc.com/pig/pig-farm-controller/internal/storage/repository" - "github.com/gorilla/websocket" -) - -// WebSocket消息类型常量 -const ( - // MessageTypeCommand 平台向设备发送的指令 - MessageTypeCommand = "command" - - // MessageTypeResponse 设备向平台发送的响应 - MessageTypeResponse = "response" - - // MessageTypeHeartbeat 心跳消息 - MessageTypeHeartbeat = "heartbeat" -) - -// WebSocketMessage WebSocket消息结构 -type WebSocketMessage struct { - // Type 消息类型 - Type string `json:"type"` - - // DeviceID 设备ID - DeviceID string `json:"device_id,omitempty"` - - // Command 指令内容 - Command string `json:"command,omitempty"` - - // Data 消息数据 - Data interface{} `json:"data,omitempty"` - - // Timestamp 时间戳 - Timestamp time.Time `json:"timestamp"` -} - -// DeviceConnection 设备连接信息 -type DeviceConnection struct { - // DeviceID 设备ID - DeviceID string - - // Connection WebSocket连接 - Connection *websocket.Conn - - // LastHeartbeat 最后心跳时间 - LastHeartbeat time.Time - - // ResponseChan 响应通道 - ResponseChan chan *WebSocketMessage -} - -// WebSocketService WebSocket服务 -type WebSocketService struct { - // connections 设备连接映射 - connections map[string]*DeviceConnection - - // mutex 互斥锁 - mutex sync.RWMutex - - // logger 日志记录器 - logger *logs.Logger - - // defaultTimeout 默认超时时间(秒) - defaultTimeout int - - // deviceRepo 设备仓库 - deviceRepo repository.DeviceRepo - - // deviceStatusPool 设备状态池 - deviceStatusPool *DeviceStatusPool -} - -// SetDeviceStatusPool 设置设备状态池 -func (ws *WebSocketService) SetDeviceStatusPool(pool *DeviceStatusPool) { - ws.deviceStatusPool = pool -} - -// NewWebSocketService 创建WebSocket服务实例 -func NewWebSocketService(deviceRepo repository.DeviceRepo) *WebSocketService { - return &WebSocketService{ - connections: make(map[string]*DeviceConnection), - logger: logs.NewLogger(), - defaultTimeout: 5, // 默认5秒超时 - deviceRepo: deviceRepo, - deviceStatusPool: NewDeviceStatusPool(), - } -} - -// SetDefaultTimeout 设置默认超时时间 -func (ws *WebSocketService) SetDefaultTimeout(timeout int) { - ws.defaultTimeout = timeout -} - -// getDeviceDisplayName 获取设备显示名称 -func (ws *WebSocketService) getDeviceDisplayName(deviceID string) string { - if ws.deviceRepo != nil { - if device, err := ws.deviceRepo.FindByIDString(deviceID); err == nil && device != nil { - return fmt.Sprintf("%s(id:%s)", device.Name, deviceID) - } - } - return fmt.Sprintf("未知设备(id:%s)", deviceID) -} - -// AddConnection 添加设备连接 -func (ws *WebSocketService) AddConnection(deviceID string, conn *websocket.Conn) { - ws.mutex.Lock() - defer ws.mutex.Unlock() - - ws.connections[deviceID] = &DeviceConnection{ - DeviceID: deviceID, - Connection: conn, - LastHeartbeat: time.Now(), - } - - deviceName := ws.getDeviceDisplayName(deviceID) - ws.logger.Info(fmt.Sprintf("设备 %s 已连接", deviceName)) -} - -// RemoveConnection 移除设备连接 -func (ws *WebSocketService) RemoveConnection(deviceID string) { - ws.mutex.Lock() - defer ws.mutex.Unlock() - - deviceName := ws.getDeviceDisplayName(deviceID) - - delete(ws.connections, deviceID) - - ws.logger.Info(fmt.Sprintf("设备 %s 已断开连接", deviceName)) -} - -// SetResponseHandler 设置响应处理器 -func (ws *WebSocketService) SetResponseHandler(deviceID string, responseChan chan *WebSocketMessage) { - ws.mutex.Lock() - defer ws.mutex.Unlock() - - if deviceConn, exists := ws.connections[deviceID]; exists { - deviceConn.ResponseChan = responseChan - } -} - -// SendCommand 向指定设备发送指令 -func (ws *WebSocketService) SendCommand(deviceID, command string, data interface{}) error { - ws.mutex.RLock() - deviceConn, exists := ws.connections[deviceID] - ws.mutex.RUnlock() - - deviceName := ws.getDeviceDisplayName(deviceID) - - if !exists { - return fmt.Errorf("设备 %s 未连接", deviceName) - } - - // 构造消息 - msg := WebSocketMessage{ - Type: MessageTypeCommand, - Command: command, - Data: data, - Timestamp: time.Now(), - } - - // 发送消息 - if err := deviceConn.Connection.WriteJSON(msg); err != nil { - return fmt.Errorf("向设备 %s 发送指令失败: %v", deviceName, err) - } - - return nil -} - -// CommandResponse WebSocket命令响应结构体 -type CommandResponse struct { - // DeviceID 设备ID - DeviceID string `json:"device_id,omitempty"` - - // Command 命令名称 - Command string `json:"command,omitempty"` - - // Data 响应数据 - Data interface{} `json:"data,omitempty"` - - // Status 响应状态 - Status string `json:"status,omitempty"` - - // Message 响应消息 - Message string `json:"message,omitempty"` - - // Timestamp 时间戳 - Timestamp time.Time `json:"timestamp"` -} - -// ParseData 将响应数据解析到目标结构体 -func (cr *CommandResponse) ParseData(target interface{}) error { - dataBytes, err := json.Marshal(cr.Data) - if err != nil { - return err - } - return json.Unmarshal(dataBytes, target) -} - -// CommandResult WebSocket命令执行结果 -type CommandResult struct { - // Response 响应消息 - Response *CommandResponse - - // Error 错误信息 - Error error -} - -// SendCommandAndWait 发送指令并等待响应 -func (ws *WebSocketService) SendCommandAndWait(deviceID, command string, data interface{}, timeout int) (*CommandResponse, error) { - deviceName := ws.getDeviceDisplayName(deviceID) - - // 如果未指定超时时间,使用默认超时时间 - if timeout <= 0 { - timeout = ws.defaultTimeout - } - - // 创建用于接收响应的通道 - responseChan := make(chan *WebSocketMessage, 1) - ws.SetResponseHandler(deviceID, responseChan) - - // 发送指令 - if err := ws.SendCommand(deviceID, command, data); err != nil { - return nil, fmt.Errorf("发送指令失败: %v", err) - } - - // 等待设备响应,设置超时 - var response *WebSocketMessage - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second) - defer cancel() - - select { - case response = <-responseChan: - // 成功接收到响应 - // 转换为CommandResponse结构体 - commandResponse := &CommandResponse{ - DeviceID: response.DeviceID, - Command: response.Command, - Data: response.Data, - Timestamp: response.Timestamp, - } - - // 尝试提取状态和消息字段 - if responseData, ok := response.Data.(map[string]interface{}); ok { - if status, exists := responseData["status"]; exists { - if statusStr, ok := status.(string); ok { - commandResponse.Status = statusStr - } - } - - if message, exists := responseData["message"]; exists { - if messageStr, ok := message.(string); ok { - commandResponse.Message = messageStr - } - } - } - - return commandResponse, nil - case <-ctx.Done(): - // 超时处理 - return nil, fmt.Errorf("等待设备 %s 响应超时", deviceName) - } -} - -// GetConnectedDevices 获取已连接的设备列表 -func (ws *WebSocketService) GetConnectedDevices() []string { - ws.mutex.RLock() - defer ws.mutex.RUnlock() - - devices := make([]string, 0, len(ws.connections)) - for deviceID := range ws.connections { - devices = append(devices, deviceID) - } - - return devices -} - -// HandleMessage 处理来自设备的消息 -func (ws *WebSocketService) HandleMessage(deviceID string, message []byte) error { - // 解析消息 - var msg WebSocketMessage - if err := json.Unmarshal(message, &msg); err != nil { - return fmt.Errorf("解析设备 %s 消息失败: %v", ws.getDeviceDisplayName(deviceID), err) - } - - // 更新心跳时间 - if msg.Type == MessageTypeHeartbeat { - ws.mutex.Lock() - if deviceConn, exists := ws.connections[deviceID]; exists { - deviceConn.LastHeartbeat = time.Now() - } - ws.mutex.Unlock() - } - - // 处理响应消息 - if msg.Type == MessageTypeResponse { - ws.mutex.RLock() - if deviceConn, exists := ws.connections[deviceID]; exists && deviceConn.ResponseChan != nil { - // 发送响应到通道 - select { - case deviceConn.ResponseChan <- &msg: - // 成功发送 - default: - // 通道已满,丢弃消息 - ws.logger.Warn(fmt.Sprintf("设备 %s 的响应通道已满,丢弃响应消息", ws.getDeviceDisplayName(deviceID))) - } - } - ws.mutex.RUnlock() - } - - // 记录消息日志 - ws.logger.Info(fmt.Sprintf("收到来自设备 %s 的消息: %v", ws.getDeviceDisplayName(deviceID), msg)) - - return nil -} - -// GetDeviceConnection 获取设备连接信息 -func (ws *WebSocketService) GetDeviceConnection(deviceID string) (*DeviceConnection, bool) { - ws.mutex.RLock() - defer ws.mutex.RUnlock() - - deviceConn, exists := ws.connections[deviceID] - return deviceConn, exists -} diff --git a/internal/websocket/manager.go b/internal/websocket/manager.go index c10e68d..4012f2a 100644 --- a/internal/websocket/manager.go +++ b/internal/websocket/manager.go @@ -3,22 +3,110 @@ package websocket import ( + "context" + "encoding/json" "fmt" "net/http" "sync" "time" "git.huangwc.com/pig/pig-farm-controller/internal/logs" - "git.huangwc.com/pig/pig-farm-controller/internal/service" "git.huangwc.com/pig/pig-farm-controller/internal/storage/repository" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" ) +// WebSocket消息类型常量 +const ( + // MessageTypeCommand 平台向设备发送的指令 + MessageTypeCommand = "command" + + // MessageTypeResponse 设备向平台发送的响应 + MessageTypeResponse = "response" + + // MessageTypeHeartbeat 心跳消息 + MessageTypeHeartbeat = "heartbeat" +) + +// WebSocketMessage WebSocket消息结构 +type WebSocketMessage struct { + // Type 消息类型 + Type string `json:"type"` + + // DeviceID 设备ID + DeviceID string `json:"device_id,omitempty"` + + // Command 指令内容 + Command string `json:"command,omitempty"` + + // Data 消息数据 + Data interface{} `json:"data,omitempty"` + + // Timestamp 时间戳 + Timestamp time.Time `json:"timestamp"` +} + +// DeviceConnection 设备连接信息 +type DeviceConnection struct { + // DeviceID 设备ID + DeviceID string + + // Connection WebSocket连接 + Connection *websocket.Conn + + // LastHeartbeat 最后心跳时间 + LastHeartbeat time.Time + + // ResponseChan 响应通道 + ResponseChan chan *WebSocketMessage +} + +// CommandResponse WebSocket命令响应结构体 +type CommandResponse struct { + // DeviceID 设备ID + DeviceID string `json:"device_id,omitempty"` + + // Command 命令名称 + Command string `json:"command,omitempty"` + + // Data 响应数据 + Data interface{} `json:"data,omitempty"` + + // Status 响应状态 + Status string `json:"status,omitempty"` + + // Message 响应消息 + Message string `json:"message,omitempty"` + + // Timestamp 时间戳 + Timestamp time.Time `json:"timestamp"` +} + +// ParseData 将响应数据解析到目标结构体 +func (cr *CommandResponse) ParseData(target interface{}) error { + dataBytes, err := json.Marshal(cr.Data) + if err != nil { + return err + } + return json.Unmarshal(dataBytes, target) +} + +// CommandResult WebSocket命令执行结果 +type CommandResult struct { + // Response 响应消息 + Response *CommandResponse + + // Error 错误信息 + Error error +} + // Manager WebSocket管理器 type Manager struct { - // websocketService WebSocket服务 - websocketService *service.WebSocketService + // connections 设备连接映射 + connections map[string]*DeviceConnection + + // mutex 互斥锁 + mutex sync.RWMutex // logger 日志记录器 logger *logs.Logger @@ -26,32 +114,34 @@ type Manager struct { // upgrader WebSocket升级器 upgrader websocket.Upgrader - // mutex 互斥锁 - mutex sync.RWMutex - - // connections 设备连接映射 - connections map[string]*websocket.Conn + // defaultTimeout 默认超时时间(秒) + defaultTimeout int // deviceRepo 设备仓库 deviceRepo repository.DeviceRepo } // NewManager 创建WebSocket管理器实例 -func NewManager(websocketService *service.WebSocketService, deviceRepo repository.DeviceRepo) *Manager { +func NewManager(deviceRepo repository.DeviceRepo) *Manager { return &Manager{ - websocketService: websocketService, - logger: logs.NewLogger(), + connections: make(map[string]*DeviceConnection), + logger: logs.NewLogger(), + defaultTimeout: 5, // 默认5秒超时 + deviceRepo: deviceRepo, upgrader: websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { // 允许所有跨域请求 return true }, }, - connections: make(map[string]*websocket.Conn), - deviceRepo: deviceRepo, } } +// SetDefaultTimeout 设置默认超时时间 +func (wm *Manager) SetDefaultTimeout(timeout int) { + wm.defaultTimeout = timeout +} + // getDeviceDisplayName 获取设备显示名称 func (wm *Manager) getDeviceDisplayName(deviceID string) string { if wm.deviceRepo != nil { @@ -62,97 +152,127 @@ func (wm *Manager) getDeviceDisplayName(deviceID string) string { return fmt.Sprintf("未知设备(id:%s)", deviceID) } -// HandleConnection 处理WebSocket连接 -func (wm *Manager) HandleConnection(c *gin.Context) { - // 升级HTTP连接到WebSocket - conn, err := wm.upgrader.Upgrade(c.Writer, c.Request, nil) - if err != nil { - wm.logger.Error("WebSocket连接升级失败: " + err.Error()) - return - } - - // 获取设备ID - deviceID := c.Query("device_id") - if deviceID == "" { - wm.logger.Error("缺少设备ID参数") - conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "缺少设备ID参数")) - conn.Close() - return - } - - // 添加连接到映射 +// AddConnection 添加设备连接 +func (wm *Manager) AddConnection(deviceID string, conn *websocket.Conn) { wm.mutex.Lock() - wm.connections[deviceID] = conn - wm.mutex.Unlock() + defer wm.mutex.Unlock() + + wm.connections[deviceID] = &DeviceConnection{ + DeviceID: deviceID, + Connection: conn, + LastHeartbeat: time.Now(), + } deviceName := wm.getDeviceDisplayName(deviceID) - wm.logger.Info("设备 " + deviceName + " 已连接") + wm.logger.Info(fmt.Sprintf("设备 %s 已连接", deviceName)) +} - // 发送连接成功消息 - successMsg := service.WebSocketMessage{ - Type: "system", - Command: "connected", - Timestamp: time.Now(), - } - conn.WriteJSON(successMsg) - - // 处理消息循环 - for { - // 读取消息 - messageType, message, err := conn.ReadMessage() - if err != nil { - wm.logger.Error("读取设备 " + deviceName + " 消息失败: " + err.Error()) - break - } - - // 只处理文本消息 - if messageType != websocket.TextMessage { - continue - } - - // 处理设备消息 - if err := wm.websocketService.HandleMessage(deviceID, message); err != nil { - wm.logger.Error("处理设备 " + deviceName + " 消息失败: " + err.Error()) - continue - } - } - - // 连接断开时清理 +// RemoveConnection 移除设备连接 +func (wm *Manager) RemoveConnection(deviceID string) { wm.mutex.Lock() - delete(wm.connections, deviceID) - wm.mutex.Unlock() + defer wm.mutex.Unlock() - conn.Close() - wm.logger.Info("设备 " + deviceName + " 已断开连接") + deviceName := wm.getDeviceDisplayName(deviceID) + + delete(wm.connections, deviceID) + + wm.logger.Info(fmt.Sprintf("设备 %s 已断开连接", deviceName)) +} + +// SetResponseHandler 设置响应处理器 +func (wm *Manager) SetResponseHandler(deviceID string, responseChan chan *WebSocketMessage) { + wm.mutex.Lock() + defer wm.mutex.Unlock() + + if deviceConn, exists := wm.connections[deviceID]; exists { + deviceConn.ResponseChan = responseChan + } } // SendCommand 向指定设备发送指令 func (wm *Manager) SendCommand(deviceID, command string, data interface{}) error { wm.mutex.RLock() - conn, exists := wm.connections[deviceID] + deviceConn, exists := wm.connections[deviceID] wm.mutex.RUnlock() + deviceName := wm.getDeviceDisplayName(deviceID) + if !exists { - return wm.websocketService.SendCommand(deviceID, command, data) + return fmt.Errorf("设备 %s 未连接", deviceName) } // 构造消息 - msg := service.WebSocketMessage{ - Type: service.MessageTypeCommand, + msg := WebSocketMessage{ + Type: MessageTypeCommand, Command: command, Data: data, Timestamp: time.Now(), } // 发送消息 - if err := conn.WriteJSON(msg); err != nil { - deviceName := wm.getDeviceDisplayName(deviceID) + if err := deviceConn.Connection.WriteJSON(msg); err != nil { return fmt.Errorf("向设备 %s 发送指令失败: %v", deviceName, err) } return nil } +// SendCommandAndWait 发送指令并等待响应 +func (wm *Manager) SendCommandAndWait(deviceID, command string, data interface{}, timeout int) (*CommandResponse, error) { + deviceName := wm.getDeviceDisplayName(deviceID) + + // 如果未指定超时时间,使用默认超时时间 + if timeout <= 0 { + timeout = wm.defaultTimeout + } + + // 创建用于接收响应的通道 + responseChan := make(chan *WebSocketMessage, 1) + wm.SetResponseHandler(deviceID, responseChan) + + // 发送指令 + if err := wm.SendCommand(deviceID, command, data); err != nil { + return nil, fmt.Errorf("发送指令失败: %v", err) + } + + // 等待设备响应,设置超时 + var response *WebSocketMessage + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second) + defer cancel() + + select { + case response = <-responseChan: + // 成功接收到响应 + // 转换为CommandResponse结构体 + commandResponse := &CommandResponse{ + DeviceID: response.DeviceID, + Command: response.Command, + Data: response.Data, + Timestamp: response.Timestamp, + } + + // 尝试提取状态和消息字段 + if responseData, ok := response.Data.(map[string]interface{}); ok { + if status, exists := responseData["status"]; exists { + if statusStr, ok := status.(string); ok { + commandResponse.Status = statusStr + } + } + + if message, exists := responseData["message"]; exists { + if messageStr, ok := message.(string); ok { + commandResponse.Message = messageStr + } + } + } + + return commandResponse, nil + case <-ctx.Done(): + // 超时处理 + return nil, fmt.Errorf("等待设备 %s 响应超时", deviceName) + } +} + // GetConnectedDevices 获取已连接的设备列表 func (wm *Manager) GetConnectedDevices() []string { wm.mutex.RLock() @@ -165,3 +285,110 @@ func (wm *Manager) GetConnectedDevices() []string { return devices } + +// HandleMessage 处理来自设备的消息 +func (wm *Manager) HandleMessage(deviceID string, message []byte) error { + // 解析消息 + var msg WebSocketMessage + if err := json.Unmarshal(message, &msg); err != nil { + return fmt.Errorf("解析设备 %s 消息失败: %v", wm.getDeviceDisplayName(deviceID), err) + } + + // 更新心跳时间 + if msg.Type == MessageTypeHeartbeat { + wm.mutex.Lock() + if deviceConn, exists := wm.connections[deviceID]; exists { + deviceConn.LastHeartbeat = time.Now() + } + wm.mutex.Unlock() + } + + // 处理响应消息 + if msg.Type == MessageTypeResponse { + wm.mutex.RLock() + if deviceConn, exists := wm.connections[deviceID]; exists && deviceConn.ResponseChan != nil { + // 发送响应到通道 + select { + case deviceConn.ResponseChan <- &msg: + // 成功发送 + default: + // 通道已满,丢弃消息 + wm.logger.Warn(fmt.Sprintf("设备 %s 的响应通道已满,丢弃响应消息", wm.getDeviceDisplayName(deviceID))) + } + } + wm.mutex.RUnlock() + } + + // 记录消息日志 + wm.logger.Info(fmt.Sprintf("收到来自设备 %s 的消息: %v", wm.getDeviceDisplayName(deviceID), msg)) + + return nil +} + +// GetDeviceConnection 获取设备连接信息 +func (wm *Manager) GetDeviceConnection(deviceID string) (*DeviceConnection, bool) { + wm.mutex.RLock() + defer wm.mutex.RUnlock() + + deviceConn, exists := wm.connections[deviceID] + return deviceConn, exists +} + +// HandleConnection 处理WebSocket连接 +func (wm *Manager) HandleConnection(c *gin.Context) { + // 升级HTTP连接到WebSocket + conn, err := wm.upgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + wm.logger.Error(fmt.Sprintf("WebSocket连接升级失败: %v", err)) + return + } + + // 获取设备ID + deviceID := c.Query("device_id") + if deviceID == "" { + wm.logger.Error("缺少设备ID参数") + conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "缺少设备ID参数")) + conn.Close() + return + } + + // 添加连接 + wm.AddConnection(deviceID, conn) + + deviceName := wm.getDeviceDisplayName(deviceID) + wm.logger.Info("设备 " + deviceName + " 已连接") + + // 发送连接成功消息 + successMsg := WebSocketMessage{ + Type: "system", + Command: "connected", + Timestamp: time.Now(), + } + conn.WriteJSON(successMsg) + + // 处理消息循环 + for { + // 读取消息 + messageType, message, err := conn.ReadMessage() + if err != nil { + wm.logger.Error(fmt.Sprintf("读取设备 %s 消息失败: %v", deviceName, err)) + break + } + + // 只处理文本消息 + if messageType != websocket.TextMessage { + continue + } + + // 处理设备消息 + if err := wm.HandleMessage(deviceID, message); err != nil { + wm.logger.Error(fmt.Sprintf("处理设备 %s 消息失败: %v", deviceName, err)) + continue + } + } + + // 连接断开时清理 + wm.RemoveConnection(deviceID) + conn.Close() + wm.logger.Info("设备 " + deviceName + " 已断开连接") +} -- 2.49.1 From 4c9f059af293174311433a4557ae7396bab02451 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Tue, 9 Sep 2025 15:27:53 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E5=90=88=E5=B9=B6websocket=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/api/api.go | 13 +++---------- internal/controller/device/device.go | 9 +++++---- internal/controller/remote/remote.go | 12 ++++++------ internal/core/application.go | 15 +++++++-------- internal/service/heartbeat.go | 11 ++++++----- 5 files changed, 27 insertions(+), 33 deletions(-) diff --git a/internal/api/api.go b/internal/api/api.go index e9aa30d..8939ea4 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -53,9 +53,6 @@ type API struct { // websocketManager WebSocket管理器 websocketManager *websocket.Manager - // websocketService WebSocket服务 - websocketService *service.WebSocketService - // heartbeatService 心跳服务 heartbeatService *service.HeartbeatService @@ -68,7 +65,7 @@ type API struct { // NewAPI 创建并返回一个新的API实例 // 初始化Gin引擎和相关配置 -func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRepo repository.OperationHistoryRepo, deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketService *service.WebSocketService, heartbeatService *service.HeartbeatService, deviceStatusPool *service.DeviceStatusPool) *API { +func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRepo repository.OperationHistoryRepo, deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketManager *websocket.Manager, heartbeatService *service.HeartbeatService, deviceStatusPool *service.DeviceStatusPool) *API { // 设置Gin为发布模式 gin.SetMode(gin.DebugMode) @@ -99,13 +96,10 @@ func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRe operationController := operation.NewController(operationHistoryRepo) // 创建设备控制控制器 - deviceController := device.NewController(deviceControlRepo, deviceRepo, websocketService, heartbeatService, deviceStatusPool) - - // 创建WebSocket管理器 - websocketManager := websocket.NewManager(websocketService, deviceRepo) + deviceController := device.NewController(deviceControlRepo, deviceRepo, websocketManager, heartbeatService, deviceStatusPool) // 创建远程控制控制器 - remoteController := remote.NewController(websocketService) + remoteController := remote.NewController(websocketManager) // 创建鉴权中间件 authMiddleware := middleware.NewAuthMiddleware(userRepo) @@ -119,7 +113,6 @@ func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRe remoteController: remoteController, authMiddleware: authMiddleware, websocketManager: websocketManager, - websocketService: websocketService, heartbeatService: heartbeatService, deviceStatusPool: deviceStatusPool, logger: logs.NewLogger(), diff --git a/internal/controller/device/device.go b/internal/controller/device/device.go index 9b30e5f..3f7f3eb 100644 --- a/internal/controller/device/device.go +++ b/internal/controller/device/device.go @@ -12,6 +12,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/model" "git.huangwc.com/pig/pig-farm-controller/internal/service" "git.huangwc.com/pig/pig-farm-controller/internal/storage/repository" + "git.huangwc.com/pig/pig-farm-controller/internal/websocket" "github.com/gin-gonic/gin" ) @@ -121,18 +122,18 @@ func (req *DeviceRequest) BindAndValidate(data []byte) error { type Controller struct { deviceControlRepo repository.DeviceControlRepo deviceRepo repository.DeviceRepo - websocketService *service.WebSocketService + websocketManager *websocket.Manager heartbeatService *service.HeartbeatService deviceStatusPool *service.DeviceStatusPool logger *logs.Logger } // NewController 创建设备控制控制器实例 -func NewController(deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketService *service.WebSocketService, heartbeatService *service.HeartbeatService, deviceStatusPool *service.DeviceStatusPool) *Controller { +func NewController(deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketManager *websocket.Manager, heartbeatService *service.HeartbeatService, deviceStatusPool *service.DeviceStatusPool) *Controller { return &Controller{ deviceControlRepo: deviceControlRepo, deviceRepo: deviceRepo, - websocketService: websocketService, + websocketManager: websocketManager, heartbeatService: heartbeatService, deviceStatusPool: deviceStatusPool, logger: logs.NewLogger(), @@ -367,7 +368,7 @@ func (c *Controller) Switch(ctx *gin.Context) { } // 发送指令并等待响应 - response, err := c.websocketService.SendCommandAndWait("relay-001", "control_device", controlData, 0) + response, err := c.websocketManager.SendCommandAndWait("relay-001", "control_device", controlData, 0) if err != nil { c.logger.Error("通过WebSocket发送设备控制指令失败: " + err.Error()) controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "设备控制失败: "+err.Error()) diff --git a/internal/controller/remote/remote.go b/internal/controller/remote/remote.go index 2c305b4..7c90e0e 100644 --- a/internal/controller/remote/remote.go +++ b/internal/controller/remote/remote.go @@ -5,20 +5,20 @@ package remote import ( "git.huangwc.com/pig/pig-farm-controller/internal/controller" "git.huangwc.com/pig/pig-farm-controller/internal/logs" - "git.huangwc.com/pig/pig-farm-controller/internal/service" + "git.huangwc.com/pig/pig-farm-controller/internal/websocket" "github.com/gin-gonic/gin" ) // Controller 远程控制控制器 type Controller struct { - websocketService *service.WebSocketService + websocketManager *websocket.Manager logger *logs.Logger } // NewController 创建远程控制控制器实例 -func NewController(websocketService *service.WebSocketService) *Controller { +func NewController(websocketManager *websocket.Manager) *Controller { return &Controller{ - websocketService: websocketService, + websocketManager: websocketManager, logger: logs.NewLogger(), } } @@ -69,7 +69,7 @@ func (c *Controller) SendCommand(ctx *gin.Context) { } // 发送指令并等待响应 - response, err := c.websocketService.SendCommandAndWait(req.DeviceID, req.Command, commandData, 0) + response, err := c.websocketManager.SendCommandAndWait(req.DeviceID, req.Command, commandData, 0) if err != nil { c.logger.Error("发送指令失败: " + err.Error()) controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "发送指令失败: "+err.Error()) @@ -100,7 +100,7 @@ type ListConnectedDevicesResponseData struct { // @Router /api/v1/remote/devices [get] func (c *Controller) ListConnectedDevices(ctx *gin.Context) { // 获取已连接的设备列表 - devices := c.websocketService.GetConnectedDevices() + devices := c.websocketManager.GetConnectedDevices() data := ListConnectedDevicesResponseData{ Devices: devices, diff --git a/internal/core/application.go b/internal/core/application.go index 3da5336..ce71b9a 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -13,6 +13,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/storage/db" "git.huangwc.com/pig/pig-farm-controller/internal/storage/repository" "git.huangwc.com/pig/pig-farm-controller/internal/task" + "git.huangwc.com/pig/pig-farm-controller/internal/websocket" ) // Application 代表核心应用结构 @@ -39,8 +40,8 @@ type Application struct { // DeviceRepo 设备仓库实例 DeviceRepo repository.DeviceRepo - // WebSocketService WebSocket服务实例 - WebSocketService *service.WebSocketService + // WebsocketManager WebSocket管理器 + WebsocketManager *websocket.Manager // DeviceStatusPool 设备状态池实例 DeviceStatusPool *service.DeviceStatusPool @@ -100,17 +101,15 @@ func (app *Application) Start() error { app.DeviceStatusPool = service.NewDeviceStatusPool() // 初始化WebSocket服务 - app.WebSocketService = service.NewWebSocketService(app.DeviceRepo) - // 设置设备状态池 - app.WebSocketService.SetDeviceStatusPool(app.DeviceStatusPool) + app.WebsocketManager = websocket.NewManager(app.DeviceRepo) // 设置WebSocket超时时间 - app.WebSocketService.SetDefaultTimeout(app.Config.GetWebSocketTimeout()) + app.WebsocketManager.SetDefaultTimeout(app.Config.GetWebSocketConfig().Timeout) // 初始化心跳服务 - app.HeartbeatService = service.NewHeartbeatService(app.WebSocketService, app.DeviceStatusPool, app.DeviceRepo, app.Config) + app.HeartbeatService = service.NewHeartbeatService(app.WebsocketManager, app.DeviceStatusPool, app.DeviceRepo, app.Config) // 初始化API组件 - app.API = api.NewAPI(app.Config, app.UserRepo, app.OperationHistoryRepo, app.DeviceControlRepo, app.DeviceRepo, app.WebSocketService, app.HeartbeatService, app.DeviceStatusPool) + app.API = api.NewAPI(app.Config, app.UserRepo, app.OperationHistoryRepo, app.DeviceControlRepo, app.DeviceRepo, app.WebsocketManager, app.HeartbeatService, app.DeviceStatusPool) // 初始化任务执行器组件(使用5个工作协程) app.TaskExecutor = task.NewExecutor(5) diff --git a/internal/service/heartbeat.go b/internal/service/heartbeat.go index 885139a..adf3ee7 100644 --- a/internal/service/heartbeat.go +++ b/internal/service/heartbeat.go @@ -12,13 +12,14 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/logs" "git.huangwc.com/pig/pig-farm-controller/internal/model" "git.huangwc.com/pig/pig-farm-controller/internal/storage/repository" + "git.huangwc.com/pig/pig-farm-controller/internal/websocket" "github.com/panjf2000/ants/v2" ) // HeartbeatService 心跳服务,负责管理设备的心跳检测 type HeartbeatService struct { - // websocketService WebSocket服务 - websocketService *WebSocketService + // websocketManager WebSocket管理器 + websocketManager *websocket.Manager // deviceStatusPool 设备状态池 deviceStatusPool *DeviceStatusPool @@ -52,7 +53,7 @@ type HeartbeatService struct { } // NewHeartbeatService 创建心跳服务实例 -func NewHeartbeatService(websocketService *WebSocketService, deviceStatusPool *DeviceStatusPool, deviceRepo repository.DeviceRepo, config *config.Config) *HeartbeatService { +func NewHeartbeatService(websocketManager *websocket.Manager, deviceStatusPool *DeviceStatusPool, deviceRepo repository.DeviceRepo, config *config.Config) *HeartbeatService { interval := config.GetHeartbeatConfig().Interval if interval <= 0 { @@ -65,7 +66,7 @@ func NewHeartbeatService(websocketService *WebSocketService, deviceStatusPool *D } return &HeartbeatService{ - websocketService: websocketService, + websocketManager: websocketManager, deviceStatusPool: deviceStatusPool, deviceRepo: deviceRepo, logger: logs.NewLogger(), @@ -241,7 +242,7 @@ func (hs *HeartbeatService) handleHeartbeatWithStatus(deviceID string, tempStatu } // 发送心跳包到设备 - response, err := hs.websocketService.SendCommandAndWait(deviceID, "heartbeat", heartbeatData, 0) + response, err := hs.websocketManager.SendCommandAndWait(deviceID, "heartbeat", heartbeatData, 0) if err != nil { hs.logger.Error(fmt.Sprintf("向设备 %s 发送心跳包失败: %v", deviceID, err)) // 更新设备状态为离线 -- 2.49.1