diff --git a/internal/api/websocket.go b/internal/api/websocket.go deleted file mode 100644 index c839da9..0000000 --- a/internal/api/websocket.go +++ /dev/null @@ -1,180 +0,0 @@ -// Package api 提供统一的API接口层 -// 负责处理所有外部请求,包括HTTP和WebSocket接口 -// 将请求路由到相应的服务层进行处理 -package api - -import ( - "net/http" - "sync" - "time" - - "git.huangwc.com/pig/pig-farm-controller/internal/logs" - "git.huangwc.com/pig/pig-farm-controller/internal/service" - "github.com/gin-gonic/gin" - "github.com/gorilla/websocket" -) - -// WebSocket消息类型常量 -const ( - // MessageTypeCommand 平台向设备发送的指令 - MessageTypeCommand = "command" - - // MessageTypeResponse 设备向平台发送的响应 - MessageTypeResponse = "response" - - // MessageTypeHeartbeat 心跳消息 - MessageTypeHeartbeat = "heartbeat" -) - -// WebSocketMessage WebSocket消息结构 -type WebSocketMessage struct { - // Type 消息类型 - Type string `json:"type"` - - // DeviceID 设备ID - DeviceID string `json:"device_id,omitempty"` - - // Command 指令内容 - Command string `json:"command,omitempty"` - - // Data 消息数据 - Data interface{} `json:"data,omitempty"` - - // Timestamp 时间戳 - Timestamp time.Time `json:"timestamp"` -} - -// WebSocketManager WebSocket管理器 -type WebSocketManager struct { - // websocketService WebSocket服务 - websocketService *service.WebSocketService - - // logger 日志记录器 - logger *logs.Logger - - // upgrader WebSocket升级器 - upgrader websocket.Upgrader - - // mutex 互斥锁 - mutex sync.RWMutex - - // connections 设备连接映射 - connections map[string]*websocket.Conn -} - -// NewWebSocketManager 创建WebSocket管理器实例 -func NewWebSocketManager(websocketService *service.WebSocketService) *WebSocketManager { - return &WebSocketManager{ - websocketService: websocketService, - logger: logs.NewLogger(), - upgrader: websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { - // 允许所有跨域请求 - return true - }, - }, - connections: make(map[string]*websocket.Conn), - } -} - -// HandleConnection 处理WebSocket连接 -func (wm *WebSocketManager) HandleConnection(c *gin.Context) { - // 升级HTTP连接到WebSocket - conn, err := wm.upgrader.Upgrade(c.Writer, c.Request, nil) - if err != nil { - wm.logger.Error("WebSocket连接升级失败: " + err.Error()) - return - } - - // 获取设备ID - deviceID := c.Query("device_id") - if deviceID == "" { - wm.logger.Error("缺少设备ID参数") - conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "缺少设备ID参数")) - conn.Close() - return - } - - // 添加连接到映射 - wm.mutex.Lock() - wm.connections[deviceID] = conn - wm.mutex.Unlock() - - wm.logger.Info("设备 " + deviceID + " 已连接") - - // 发送连接成功消息 - successMsg := service.WebSocketMessage{ - Type: "system", - Command: "connected", - Timestamp: time.Now(), - } - conn.WriteJSON(successMsg) - - // 处理消息循环 - for { - // 读取消息 - messageType, message, err := conn.ReadMessage() - if err != nil { - wm.logger.Error("读取设备 " + deviceID + " 消息失败: " + err.Error()) - break - } - - // 只处理文本消息 - if messageType != websocket.TextMessage { - continue - } - - // 处理设备消息 - if err := wm.websocketService.HandleMessage(deviceID, message); err != nil { - wm.logger.Error("处理设备 " + deviceID + " 消息失败: " + err.Error()) - continue - } - } - - // 连接断开时清理 - wm.mutex.Lock() - delete(wm.connections, deviceID) - wm.mutex.Unlock() - - conn.Close() - wm.logger.Info("设备 " + deviceID + " 已断开连接") -} - -// SendCommand 向指定设备发送指令 -func (wm *WebSocketManager) SendCommand(deviceID, command string, data interface{}) error { - wm.mutex.RLock() - conn, exists := wm.connections[deviceID] - wm.mutex.RUnlock() - - if !exists { - return wm.websocketService.SendCommand(deviceID, command, data) - } - - // 构造消息 - msg := service.WebSocketMessage{ - Type: service.MessageTypeCommand, - Command: command, - Data: data, - Timestamp: time.Now(), - } - - // 发送消息 - if err := conn.WriteJSON(msg); err != nil { - return err - } - - return nil -} - -// GetConnectedDevices 获取已连接的设备列表 -func (wm *WebSocketManager) GetConnectedDevices() []string { - wm.mutex.RLock() - defer wm.mutex.RUnlock() - - devices := make([]string, 0, len(wm.connections)) - for deviceID := range wm.connections { - devices = append(devices, deviceID) - } - - return devices -} diff --git a/internal/core/websocket.go b/internal/core/websocket.go deleted file mode 100644 index 2513cdb..0000000 --- a/internal/core/websocket.go +++ /dev/null @@ -1,194 +0,0 @@ -// Package core 提供WebSocket服务功能 -// 实现中继设备和平台之间的双向通信 -package core - -import ( - "encoding/json" - "fmt" - "sync" - "time" - - "git.huangwc.com/pig/pig-farm-controller/internal/logs" - "git.huangwc.com/pig/pig-farm-controller/internal/model" - "git.huangwc.com/pig/pig-farm-controller/internal/storage/repository" - "github.com/gorilla/websocket" -) - -// WebSocket消息类型常量 -const ( - // MessageTypeCommand 平台向设备发送的指令 - MessageTypeCommand = "command" - - // MessageTypeResponse 设备向平台发送的响应 - MessageTypeResponse = "response" - - // MessageTypeHeartbeat 心跳消息 - MessageTypeHeartbeat = "heartbeat" -) - -// WebSocketMessage WebSocket消息结构 -type WebSocketMessage struct { - // Type 消息类型 - Type string `json:"type"` - - // DeviceID 设备ID - DeviceID string `json:"device_id,omitempty"` - - // Command 指令内容 - Command string `json:"command,omitempty"` - - // Data 消息数据 - Data interface{} `json:"data,omitempty"` - - // Timestamp 时间戳 - Timestamp time.Time `json:"timestamp"` -} - -// DeviceConnection 设备连接信息 -type DeviceConnection struct { - // DeviceID 设备ID - DeviceID string - - // Connection WebSocket连接 - Connection *websocket.Conn - - // LastHeartbeat 最后心跳时间 - LastHeartbeat time.Time - - // DeviceInfo 设备信息 - DeviceInfo *model.Device -} - -// WebSocketService WebSocket服务 -type WebSocketService struct { - // connections 设备连接映射 - connections map[string]*DeviceConnection - - // mutex 互斥锁 - mutex sync.RWMutex - - // logger 日志记录器 - logger *logs.Logger - - // deviceRepo 设备仓库 - deviceRepo repository.DeviceRepo -} - -// NewWebSocketService 创建WebSocket服务实例 -func NewWebSocketService(deviceRepo repository.DeviceRepo) *WebSocketService { - return &WebSocketService{ - connections: make(map[string]*DeviceConnection), - logger: logs.NewLogger(), - deviceRepo: deviceRepo, - } -} - -// getDeviceDisplayName 获取设备显示名称 -func (ws *WebSocketService) getDeviceDisplayName(deviceID string) string { - if ws.deviceRepo != nil { - if device, err := ws.deviceRepo.FindByIDString(deviceID); err == nil && device != nil { - return fmt.Sprintf("%s(id:%s)", device.Name, deviceID) - } - } - return fmt.Sprintf("未知设备(id:%s)", deviceID) -} - -// AddConnection 添加设备连接 -func (ws *WebSocketService) AddConnection(deviceID string, conn *websocket.Conn) { - ws.mutex.Lock() - defer ws.mutex.Unlock() - - ws.connections[deviceID] = &DeviceConnection{ - DeviceID: deviceID, - Connection: conn, - LastHeartbeat: time.Now(), - } - - deviceName := ws.getDeviceDisplayName(deviceID) - ws.logger.Info(fmt.Sprintf("设备 %s 已连接", deviceName)) -} - -// RemoveConnection 移除设备连接 -func (ws *WebSocketService) RemoveConnection(deviceID string) { - ws.mutex.Lock() - defer ws.mutex.Unlock() - - deviceName := ws.getDeviceDisplayName(deviceID) - - delete(ws.connections, deviceID) - - ws.logger.Info(fmt.Sprintf("设备 %s 已断开连接", deviceName)) -} - -// SendCommand 向指定设备发送指令 -func (ws *WebSocketService) SendCommand(deviceID, command string, data interface{}) error { - ws.mutex.RLock() - deviceConn, exists := ws.connections[deviceID] - ws.mutex.RUnlock() - - deviceName := ws.getDeviceDisplayName(deviceID) - - if !exists { - return fmt.Errorf("设备 %s 未连接", deviceName) - } - - // 构造消息 - msg := WebSocketMessage{ - Type: MessageTypeCommand, - Command: command, - Data: data, - Timestamp: time.Now(), - } - - // 发送消息 - if err := deviceConn.Connection.WriteJSON(msg); err != nil { - return fmt.Errorf("向设备 %s 发送指令失败: %v", deviceName, err) - } - - return nil -} - -// GetConnectedDevices 获取已连接的设备列表 -func (ws *WebSocketService) GetConnectedDevices() []string { - ws.mutex.RLock() - defer ws.mutex.RUnlock() - - devices := make([]string, 0, len(ws.connections)) - for deviceID := range ws.connections { - devices = append(devices, deviceID) - } - - return devices -} - -// HandleMessage 处理来自设备的消息 -func (ws *WebSocketService) HandleMessage(deviceID string, message []byte) error { - // 解析消息 - var msg WebSocketMessage - if err := json.Unmarshal(message, &msg); err != nil { - return fmt.Errorf("解析设备 %s 消息失败: %v", ws.getDeviceDisplayName(deviceID), err) - } - - // 更新心跳时间 - if msg.Type == MessageTypeHeartbeat { - ws.mutex.Lock() - if deviceConn, exists := ws.connections[deviceID]; exists { - deviceConn.LastHeartbeat = time.Now() - } - ws.mutex.Unlock() - } - - // 记录消息日志 - ws.logger.Info(fmt.Sprintf("收到来自设备 %s 的消息: %v", ws.getDeviceDisplayName(deviceID), msg)) - - return nil -} - -// GetDeviceConnection 获取设备连接信息 -func (ws *WebSocketService) GetDeviceConnection(deviceID string) (*DeviceConnection, bool) { - ws.mutex.RLock() - defer ws.mutex.RUnlock() - - deviceConn, exists := ws.connections[deviceID] - return deviceConn, exists -}