// Package websocket 提供WebSocket通信功能 // 实现中继设备与平台之间的实时通信 package websocket import ( "fmt" "net/http" "sync" "time" "git.huangwc.com/pig/pig-farm-controller/internal/logs" "git.huangwc.com/pig/pig-farm-controller/internal/storage/repository" "github.com/gorilla/websocket" ) // Message WebSocket消息结构 type Message struct { DeviceID string `json:"device_id"` Type string `json:"type"` Data interface{} `json:"data"` Timestamp time.Time `json:"timestamp"` } // Hub WebSocket中心,管理所有客户端连接 type Hub struct { // 注册客户端的通道 register chan *Client // 注销客户端的通道 unregister chan *Client // 当前活跃的客户端映射 clients map[*Client]bool // 广播消息通道 broadcast chan Message // 设备ID到客户端的映射 deviceClients map[string]*Client // 日志记录器 logger *logs.Logger // 互斥锁保护映射 mutex sync.RWMutex // deviceRepo 设备仓库 deviceRepo repository.DeviceRepo } // Client WebSocket客户端结构 type Client struct { hub *Hub // WebSocket连接 conn *websocket.Conn // 发送缓冲区 send chan Message // 设备ID DeviceID string // HTTP请求 Request *http.Request // 日志记录器 logger *logs.Logger } // NewHub 创建新的WebSocket中心实例 func NewHub(deviceRepo repository.DeviceRepo) *Hub { return &Hub{ register: make(chan *Client), unregister: make(chan *Client), clients: make(map[*Client]bool), broadcast: make(chan Message), deviceClients: make(map[string]*Client), logger: logs.NewLogger(), deviceRepo: deviceRepo, } } // getDeviceDisplayName 获取设备显示名称 func (h *Hub) getDeviceDisplayName(deviceID string) string { if h.deviceRepo != nil { if device, err := h.deviceRepo.FindByIDString(deviceID); err == nil && device != nil { return fmt.Sprintf("%s(id:%s)", device.Name, deviceID) } } return fmt.Sprintf("未知设备(id:%s)", deviceID) } // Run 启动WebSocket中心 func (h *Hub) Run() { for { select { case client := <-h.register: h.registerClient(client) case client := <-h.unregister: h.unregisterClient(client) case message := <-h.broadcast: h.broadcastMessage(message) } } } // registerClient 注册客户端 func (h *Hub) registerClient(client *Client) { h.mutex.Lock() defer h.mutex.Unlock() h.clients[client] = true if client.DeviceID != "" { h.deviceClients[client.DeviceID] = client } deviceName := h.getDeviceDisplayName(client.DeviceID) h.logger.Info("[WebSocket] 客户端 " + deviceName + " 已注册,当前客户端数: " + fmt.Sprintf("%d", len(h.clients))) } // unregisterClient 注销客户端 func (h *Hub) unregisterClient(client *Client) { h.mutex.Lock() defer h.mutex.Unlock() if _, ok := h.clients[client]; ok { delete(h.clients, client) if client.DeviceID != "" { delete(h.deviceClients, client.DeviceID) } close(client.send) } deviceName := h.getDeviceDisplayName(client.DeviceID) h.logger.Info("[WebSocket] 客户端 " + deviceName + " 已注销,当前客户端数: " + fmt.Sprintf("%d", len(h.clients))) } // broadcastMessage 广播消息 func (h *Hub) broadcastMessage(message Message) { h.mutex.RLock() defer h.mutex.RUnlock() if client, exists := h.deviceClients[message.DeviceID]; exists { select { case client.send <- message: default: close(client.send) delete(h.clients, client) delete(h.deviceClients, message.DeviceID) } } } // SendToDevice 向指定设备发送消息 func (h *Hub) SendToDevice(deviceID string, msgType string, data interface{}) error { h.mutex.RLock() defer h.mutex.RUnlock() deviceName := h.getDeviceDisplayName(deviceID) if client, exists := h.deviceClients[deviceID]; exists { message := Message{ DeviceID: deviceID, Type: msgType, Data: data, Timestamp: time.Now(), } select { case client.send <- message: h.logger.Info(fmt.Sprintf("[WebSocket] 向设备 %s 发送消息: %s", deviceName, msgType)) return nil default: h.logger.Error(fmt.Sprintf("[WebSocket] 设备 %s 消息通道已满", deviceName)) return fmt.Errorf("设备 %s 消息通道已满", deviceName) } } h.logger.Warn(fmt.Sprintf("[WebSocket] 设备 %s 未连接", deviceName)) return fmt.Errorf("设备 %s 未连接", deviceName) } // GetConnectedDevices 获取已连接的设备列表 func (h *Hub) GetConnectedDevices() []string { h.mutex.RLock() defer h.mutex.RUnlock() devices := make([]string, 0, len(h.deviceClients)) for deviceID := range h.deviceClients { devices = append(devices, deviceID) } return devices }