196 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			196 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Package websocket provides WebSocket communication.
// It implements real-time communication between relay devices and the platform.
 | ||
| package websocket
 | ||
| 
 | ||
| import (
 | ||
| 	"fmt"
 | ||
| 	"net/http"
 | ||
| 	"sync"
 | ||
| 	"time"
 | ||
| 
 | ||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/logs"
 | ||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
 | ||
| 	"github.com/gorilla/websocket"
 | ||
| )
 | ||
| 
 | ||
// Message is the envelope exchanged over a WebSocket connection.
// The struct tags fix the JSON key used for each field.
type Message struct {
	// DeviceID identifies the device the message is associated with.
	DeviceID string `json:"device_id"`
	// Type is the message type label.
	Type string `json:"type"`
	// Data is the free-form payload; any JSON-encodable value may be stored.
	Data interface{} `json:"data"`
	// Timestamp is the time attached to the message.
	Timestamp time.Time `json:"timestamp"`
}
 | ||
| 
 | ||
| // Hub WebSocket中心,管理所有客户端连接
 | ||
| type Hub struct {
 | ||
| 	// 注册客户端的通道
 | ||
| 	register chan *Client
 | ||
| 
 | ||
| 	// 注销客户端的通道
 | ||
| 	unregister chan *Client
 | ||
| 
 | ||
| 	// 当前活跃的客户端映射
 | ||
| 	clients map[*Client]bool
 | ||
| 
 | ||
| 	// 广播消息通道
 | ||
| 	broadcast chan Message
 | ||
| 
 | ||
| 	// 设备ID到客户端的映射
 | ||
| 	deviceClients map[string]*Client
 | ||
| 
 | ||
| 	// 日志记录器
 | ||
| 	logger *logs.Logger
 | ||
| 
 | ||
| 	// 互斥锁保护映射
 | ||
| 	mutex sync.RWMutex
 | ||
| 
 | ||
| 	// deviceRepo 设备仓库
 | ||
| 	deviceRepo repository.DeviceRepo
 | ||
| }
 | ||
| 
 | ||
| // Client WebSocket客户端结构
 | ||
| type Client struct {
 | ||
| 	hub *Hub
 | ||
| 
 | ||
| 	// WebSocket连接
 | ||
| 	conn *websocket.Conn
 | ||
| 
 | ||
| 	// 发送缓冲区
 | ||
| 	send chan Message
 | ||
| 
 | ||
| 	// 设备ID
 | ||
| 	DeviceID string
 | ||
| 
 | ||
| 	// HTTP请求
 | ||
| 	Request *http.Request
 | ||
| 
 | ||
| 	// 日志记录器
 | ||
| 	logger *logs.Logger
 | ||
| }
 | ||
| 
 | ||
| // NewHub 创建新的WebSocket中心实例
 | ||
| func NewHub(deviceRepo repository.DeviceRepo) *Hub {
 | ||
| 	return &Hub{
 | ||
| 		register:      make(chan *Client),
 | ||
| 		unregister:    make(chan *Client),
 | ||
| 		clients:       make(map[*Client]bool),
 | ||
| 		broadcast:     make(chan Message),
 | ||
| 		deviceClients: make(map[string]*Client),
 | ||
| 		logger:        logs.NewLogger(),
 | ||
| 		deviceRepo:    deviceRepo,
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // getDeviceDisplayName 获取设备显示名称
 | ||
| func (h *Hub) getDeviceDisplayName(deviceID string) string {
 | ||
| 	if h.deviceRepo != nil {
 | ||
| 		if device, err := h.deviceRepo.FindByIDString(deviceID); err == nil && device != nil {
 | ||
| 			return fmt.Sprintf("%s(id:%s)", device.Name, deviceID)
 | ||
| 		}
 | ||
| 	}
 | ||
| 	return fmt.Sprintf("未知设备(id:%s)", deviceID)
 | ||
| }
 | ||
| 
 | ||
| // Run 启动WebSocket中心
 | ||
| func (h *Hub) Run() {
 | ||
| 	for {
 | ||
| 		select {
 | ||
| 		case client := <-h.register:
 | ||
| 			h.registerClient(client)
 | ||
| 		case client := <-h.unregister:
 | ||
| 			h.unregisterClient(client)
 | ||
| 		case message := <-h.broadcast:
 | ||
| 			h.broadcastMessage(message)
 | ||
| 		}
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // registerClient 注册客户端
 | ||
| func (h *Hub) registerClient(client *Client) {
 | ||
| 	h.mutex.Lock()
 | ||
| 	defer h.mutex.Unlock()
 | ||
| 
 | ||
| 	h.clients[client] = true
 | ||
| 	if client.DeviceID != "" {
 | ||
| 		h.deviceClients[client.DeviceID] = client
 | ||
| 	}
 | ||
| 
 | ||
| 	deviceName := h.getDeviceDisplayName(client.DeviceID)
 | ||
| 	h.logger.Info("[WebSocket] 客户端 " + deviceName + " 已注册,当前客户端数: " + fmt.Sprintf("%d", len(h.clients)))
 | ||
| }
 | ||
| 
 | ||
| // unregisterClient 注销客户端
 | ||
| func (h *Hub) unregisterClient(client *Client) {
 | ||
| 	h.mutex.Lock()
 | ||
| 	defer h.mutex.Unlock()
 | ||
| 
 | ||
| 	if _, ok := h.clients[client]; ok {
 | ||
| 		delete(h.clients, client)
 | ||
| 		if client.DeviceID != "" {
 | ||
| 			delete(h.deviceClients, client.DeviceID)
 | ||
| 		}
 | ||
| 		close(client.send)
 | ||
| 	}
 | ||
| 
 | ||
| 	deviceName := h.getDeviceDisplayName(client.DeviceID)
 | ||
| 	h.logger.Info("[WebSocket] 客户端 " + deviceName + " 已注销,当前客户端数: " + fmt.Sprintf("%d", len(h.clients)))
 | ||
| }
 | ||
| 
 | ||
| // broadcastMessage 广播消息
 | ||
| func (h *Hub) broadcastMessage(message Message) {
 | ||
| 	h.mutex.RLock()
 | ||
| 	defer h.mutex.RUnlock()
 | ||
| 
 | ||
| 	if client, exists := h.deviceClients[message.DeviceID]; exists {
 | ||
| 		select {
 | ||
| 		case client.send <- message:
 | ||
| 		default:
 | ||
| 			close(client.send)
 | ||
| 			delete(h.clients, client)
 | ||
| 			delete(h.deviceClients, message.DeviceID)
 | ||
| 		}
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // SendToDevice 向指定设备发送消息
 | ||
| func (h *Hub) SendToDevice(deviceID string, msgType string, data interface{}) error {
 | ||
| 	h.mutex.RLock()
 | ||
| 	defer h.mutex.RUnlock()
 | ||
| 
 | ||
| 	deviceName := h.getDeviceDisplayName(deviceID)
 | ||
| 
 | ||
| 	if client, exists := h.deviceClients[deviceID]; exists {
 | ||
| 		message := Message{
 | ||
| 			DeviceID:  deviceID,
 | ||
| 			Type:      msgType,
 | ||
| 			Data:      data,
 | ||
| 			Timestamp: time.Now(),
 | ||
| 		}
 | ||
| 
 | ||
| 		select {
 | ||
| 		case client.send <- message:
 | ||
| 			h.logger.Info(fmt.Sprintf("[WebSocket] 向设备 %s 发送消息: %s", deviceName, msgType))
 | ||
| 			return nil
 | ||
| 		default:
 | ||
| 			h.logger.Error(fmt.Sprintf("[WebSocket] 设备 %s 消息通道已满", deviceName))
 | ||
| 			return fmt.Errorf("设备 %s 消息通道已满", deviceName)
 | ||
| 		}
 | ||
| 	}
 | ||
| 
 | ||
| 	h.logger.Warn(fmt.Sprintf("[WebSocket] 设备 %s 未连接", deviceName))
 | ||
| 	return fmt.Errorf("设备 %s 未连接", deviceName)
 | ||
| }
 | ||
| 
 | ||
| // GetConnectedDevices 获取已连接的设备列表
 | ||
| func (h *Hub) GetConnectedDevices() []string {
 | ||
| 	h.mutex.RLock()
 | ||
| 	defer h.mutex.RUnlock()
 | ||
| 
 | ||
| 	devices := make([]string, 0, len(h.deviceClients))
 | ||
| 	for deviceID := range h.deviceClients {
 | ||
| 		devices = append(devices, deviceID)
 | ||
| 	}
 | ||
| 
 | ||
| 	return devices
 | ||
| }
 |