合并websocket逻辑 #1

Merged
huang merged 3 commits from 合并websocket逻辑 into main 2025-09-09 15:29:43 +08:00
9 changed files with 337 additions and 447 deletions

View File

@@ -26,6 +26,8 @@ database:
websocket:
# WebSocket请求超时时间(秒)
timeout: 5
# 心跳检测间隔(秒), 如果超过这个时间没有消息往来系统会自动发送一个心跳包维持长链接
heartbeat_interval: 54
# 心跳配置
heartbeat:

View File

@@ -53,9 +53,6 @@ type API struct {
// websocketManager WebSocket管理器
websocketManager *websocket.Manager
// websocketService WebSocket服务
websocketService *service.WebSocketService
// heartbeatService 心跳服务
heartbeatService *service.HeartbeatService
@@ -68,7 +65,7 @@ type API struct {
// NewAPI 创建并返回一个新的API实例
// 初始化Gin引擎和相关配置
func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRepo repository.OperationHistoryRepo, deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketService *service.WebSocketService, heartbeatService *service.HeartbeatService, deviceStatusPool *service.DeviceStatusPool) *API {
func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRepo repository.OperationHistoryRepo, deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketManager *websocket.Manager, heartbeatService *service.HeartbeatService, deviceStatusPool *service.DeviceStatusPool) *API {
// 设置Gin为发布模式
gin.SetMode(gin.DebugMode)
@@ -99,13 +96,10 @@ func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRe
operationController := operation.NewController(operationHistoryRepo)
// 创建设备控制控制器
deviceController := device.NewController(deviceControlRepo, deviceRepo, websocketService, heartbeatService, deviceStatusPool)
// 创建WebSocket管理器
websocketManager := websocket.NewManager(websocketService, deviceRepo)
deviceController := device.NewController(deviceControlRepo, deviceRepo, websocketManager, heartbeatService, deviceStatusPool)
// 创建远程控制控制器
remoteController := remote.NewController(websocketService)
remoteController := remote.NewController(websocketManager)
// 创建鉴权中间件
authMiddleware := middleware.NewAuthMiddleware(userRepo)
@@ -119,7 +113,6 @@ func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRe
remoteController: remoteController,
authMiddleware: authMiddleware,
websocketManager: websocketManager,
websocketService: websocketService,
heartbeatService: heartbeatService,
deviceStatusPool: deviceStatusPool,
logger: logs.NewLogger(),

View File

@@ -77,6 +77,9 @@ type DatabaseConfig struct {
type WebSocketConfig struct {
// Timeout WebSocket请求超时时间(秒)
Timeout int `yaml:"timeout"`
// HeartbeatInterval 心跳检测间隔(秒), 如果超过这个时间没有消息往来系统会自动发送一个心跳包维持长链接
HeartbeatInterval int `yaml:"heartbeat_interval"`
}
// HeartbeatConfig 代表心跳配置
@@ -130,12 +133,9 @@ func (c *Config) GetDatabaseConnectionString() string {
)
}
// GetWebSocketTimeout 获取WebSocket超时时间(秒)
func (c *Config) GetWebSocketTimeout() int {
if c.WebSocket.Timeout <= 0 {
return 5 // 默认5秒超时
}
return c.WebSocket.Timeout
// GetWebSocketConfig 获取WebSocket配置
func (c *Config) GetWebSocketConfig() WebSocketConfig {
return c.WebSocket
}
// GetHeartbeatConfig 获取心跳配置

View File

@@ -12,6 +12,7 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/model"
"git.huangwc.com/pig/pig-farm-controller/internal/service"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/websocket"
"github.com/gin-gonic/gin"
)
@@ -121,18 +122,18 @@ func (req *DeviceRequest) BindAndValidate(data []byte) error {
type Controller struct {
deviceControlRepo repository.DeviceControlRepo
deviceRepo repository.DeviceRepo
websocketService *service.WebSocketService
websocketManager *websocket.Manager
heartbeatService *service.HeartbeatService
deviceStatusPool *service.DeviceStatusPool
logger *logs.Logger
}
// NewController 创建设备控制控制器实例
func NewController(deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketService *service.WebSocketService, heartbeatService *service.HeartbeatService, deviceStatusPool *service.DeviceStatusPool) *Controller {
func NewController(deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketManager *websocket.Manager, heartbeatService *service.HeartbeatService, deviceStatusPool *service.DeviceStatusPool) *Controller {
return &Controller{
deviceControlRepo: deviceControlRepo,
deviceRepo: deviceRepo,
websocketService: websocketService,
websocketManager: websocketManager,
heartbeatService: heartbeatService,
deviceStatusPool: deviceStatusPool,
logger: logs.NewLogger(),
@@ -367,7 +368,7 @@ func (c *Controller) Switch(ctx *gin.Context) {
}
// 发送指令并等待响应
response, err := c.websocketService.SendCommandAndWait("relay-001", "control_device", controlData, 0)
response, err := c.websocketManager.SendCommandAndWait("relay-001", "control_device", controlData, 0)
if err != nil {
c.logger.Error("通过WebSocket发送设备控制指令失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "设备控制失败: "+err.Error())

View File

@@ -5,20 +5,20 @@ package remote
import (
"git.huangwc.com/pig/pig-farm-controller/internal/controller"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/service"
"git.huangwc.com/pig/pig-farm-controller/internal/websocket"
"github.com/gin-gonic/gin"
)
// Controller 远程控制控制器
type Controller struct {
websocketService *service.WebSocketService
websocketManager *websocket.Manager
logger *logs.Logger
}
// NewController 创建远程控制控制器实例
func NewController(websocketService *service.WebSocketService) *Controller {
func NewController(websocketManager *websocket.Manager) *Controller {
return &Controller{
websocketService: websocketService,
websocketManager: websocketManager,
logger: logs.NewLogger(),
}
}
@@ -69,7 +69,7 @@ func (c *Controller) SendCommand(ctx *gin.Context) {
}
// 发送指令并等待响应
response, err := c.websocketService.SendCommandAndWait(req.DeviceID, req.Command, commandData, 0)
response, err := c.websocketManager.SendCommandAndWait(req.DeviceID, req.Command, commandData, 0)
if err != nil {
c.logger.Error("发送指令失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "发送指令失败: "+err.Error())
@@ -100,7 +100,7 @@ type ListConnectedDevicesResponseData struct {
// @Router /api/v1/remote/devices [get]
func (c *Controller) ListConnectedDevices(ctx *gin.Context) {
// 获取已连接的设备列表
devices := c.websocketService.GetConnectedDevices()
devices := c.websocketManager.GetConnectedDevices()
data := ListConnectedDevicesResponseData{
Devices: devices,

View File

@@ -13,6 +13,7 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/storage/db"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/task"
"git.huangwc.com/pig/pig-farm-controller/internal/websocket"
)
// Application 代表核心应用结构
@@ -39,8 +40,8 @@ type Application struct {
// DeviceRepo 设备仓库实例
DeviceRepo repository.DeviceRepo
// WebSocketService WebSocket服务实例
WebSocketService *service.WebSocketService
// WebsocketManager WebSocket管理器
WebsocketManager *websocket.Manager
// DeviceStatusPool 设备状态池实例
DeviceStatusPool *service.DeviceStatusPool
@@ -100,17 +101,15 @@ func (app *Application) Start() error {
app.DeviceStatusPool = service.NewDeviceStatusPool()
// 初始化WebSocket服务
app.WebSocketService = service.NewWebSocketService(app.DeviceRepo)
// 设置设备状态池
app.WebSocketService.SetDeviceStatusPool(app.DeviceStatusPool)
app.WebsocketManager = websocket.NewManager(app.DeviceRepo)
// 设置WebSocket超时时间
app.WebSocketService.SetDefaultTimeout(app.Config.GetWebSocketTimeout())
app.WebsocketManager.SetDefaultTimeout(app.Config.GetWebSocketConfig().Timeout)
// 初始化心跳服务
app.HeartbeatService = service.NewHeartbeatService(app.WebSocketService, app.DeviceStatusPool, app.DeviceRepo, app.Config)
app.HeartbeatService = service.NewHeartbeatService(app.WebsocketManager, app.DeviceStatusPool, app.DeviceRepo, app.Config)
// 初始化API组件
app.API = api.NewAPI(app.Config, app.UserRepo, app.OperationHistoryRepo, app.DeviceControlRepo, app.DeviceRepo, app.WebSocketService, app.HeartbeatService, app.DeviceStatusPool)
app.API = api.NewAPI(app.Config, app.UserRepo, app.OperationHistoryRepo, app.DeviceControlRepo, app.DeviceRepo, app.WebsocketManager, app.HeartbeatService, app.DeviceStatusPool)
// 初始化任务执行器组件(使用5个工作协程)
app.TaskExecutor = task.NewExecutor(5)

View File

@@ -12,13 +12,14 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/model"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/websocket"
"github.com/panjf2000/ants/v2"
)
// HeartbeatService 心跳服务,负责管理设备的心跳检测
type HeartbeatService struct {
// websocketService WebSocket服务
websocketService *WebSocketService
// websocketManager WebSocket管理器
websocketManager *websocket.Manager
// deviceStatusPool 设备状态池
deviceStatusPool *DeviceStatusPool
@@ -52,7 +53,7 @@ type HeartbeatService struct {
}
// NewHeartbeatService 创建心跳服务实例
func NewHeartbeatService(websocketService *WebSocketService, deviceStatusPool *DeviceStatusPool, deviceRepo repository.DeviceRepo, config *config.Config) *HeartbeatService {
func NewHeartbeatService(websocketManager *websocket.Manager, deviceStatusPool *DeviceStatusPool, deviceRepo repository.DeviceRepo, config *config.Config) *HeartbeatService {
interval := config.GetHeartbeatConfig().Interval
if interval <= 0 {
@@ -65,7 +66,7 @@ func NewHeartbeatService(websocketService *WebSocketService, deviceStatusPool *D
}
return &HeartbeatService{
websocketService: websocketService,
websocketManager: websocketManager,
deviceStatusPool: deviceStatusPool,
deviceRepo: deviceRepo,
logger: logs.NewLogger(),
@@ -241,7 +242,7 @@ func (hs *HeartbeatService) handleHeartbeatWithStatus(deviceID string, tempStatu
}
// 发送心跳包到设备
response, err := hs.websocketService.SendCommandAndWait(deviceID, "heartbeat", heartbeatData, 0)
response, err := hs.websocketManager.SendCommandAndWait(deviceID, "heartbeat", heartbeatData, 0)
if err != nil {
hs.logger.Error(fmt.Sprintf("向设备 %s 发送心跳包失败: %v", deviceID, err))
// 更新设备状态为离线

View File

@@ -1,333 +0,0 @@
// Package service 提供WebSocket服务功能
// 实现中继设备和平台之间的双向通信
package service
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
"github.com/gorilla/websocket"
)
// WebSocket消息类型常量
const (
// MessageTypeCommand 平台向设备发送的指令
MessageTypeCommand = "command"
// MessageTypeResponse 设备向平台发送的响应
MessageTypeResponse = "response"
// MessageTypeHeartbeat 心跳消息
MessageTypeHeartbeat = "heartbeat"
)
// WebSocketMessage WebSocket消息结构
type WebSocketMessage struct {
// Type 消息类型
Type string `json:"type"`
// DeviceID 设备ID
DeviceID string `json:"device_id,omitempty"`
// Command 指令内容
Command string `json:"command,omitempty"`
// Data 消息数据
Data interface{} `json:"data,omitempty"`
// Timestamp 时间戳
Timestamp time.Time `json:"timestamp"`
}
// DeviceConnection 设备连接信息
type DeviceConnection struct {
// DeviceID 设备ID
DeviceID string
// Connection WebSocket连接
Connection *websocket.Conn
// LastHeartbeat 最后心跳时间
LastHeartbeat time.Time
// ResponseChan 响应通道
ResponseChan chan *WebSocketMessage
}
// WebSocketService WebSocket服务
type WebSocketService struct {
// connections 设备连接映射
connections map[string]*DeviceConnection
// mutex 互斥锁
mutex sync.RWMutex
// logger 日志记录器
logger *logs.Logger
// defaultTimeout 默认超时时间(秒)
defaultTimeout int
// deviceRepo 设备仓库
deviceRepo repository.DeviceRepo
// deviceStatusPool 设备状态池
deviceStatusPool *DeviceStatusPool
}
// SetDeviceStatusPool 设置设备状态池
func (ws *WebSocketService) SetDeviceStatusPool(pool *DeviceStatusPool) {
ws.deviceStatusPool = pool
}
// NewWebSocketService 创建WebSocket服务实例
func NewWebSocketService(deviceRepo repository.DeviceRepo) *WebSocketService {
return &WebSocketService{
connections: make(map[string]*DeviceConnection),
logger: logs.NewLogger(),
defaultTimeout: 5, // 默认5秒超时
deviceRepo: deviceRepo,
deviceStatusPool: NewDeviceStatusPool(),
}
}
// SetDefaultTimeout 设置默认超时时间
func (ws *WebSocketService) SetDefaultTimeout(timeout int) {
ws.defaultTimeout = timeout
}
// getDeviceDisplayName 获取设备显示名称
func (ws *WebSocketService) getDeviceDisplayName(deviceID string) string {
if ws.deviceRepo != nil {
if device, err := ws.deviceRepo.FindByIDString(deviceID); err == nil && device != nil {
return fmt.Sprintf("%s(id:%s)", device.Name, deviceID)
}
}
return fmt.Sprintf("未知设备(id:%s)", deviceID)
}
// AddConnection 添加设备连接
func (ws *WebSocketService) AddConnection(deviceID string, conn *websocket.Conn) {
ws.mutex.Lock()
defer ws.mutex.Unlock()
ws.connections[deviceID] = &DeviceConnection{
DeviceID: deviceID,
Connection: conn,
LastHeartbeat: time.Now(),
}
deviceName := ws.getDeviceDisplayName(deviceID)
ws.logger.Info(fmt.Sprintf("设备 %s 已连接", deviceName))
}
// RemoveConnection 移除设备连接
func (ws *WebSocketService) RemoveConnection(deviceID string) {
ws.mutex.Lock()
defer ws.mutex.Unlock()
deviceName := ws.getDeviceDisplayName(deviceID)
delete(ws.connections, deviceID)
ws.logger.Info(fmt.Sprintf("设备 %s 已断开连接", deviceName))
}
// SetResponseHandler 设置响应处理器
func (ws *WebSocketService) SetResponseHandler(deviceID string, responseChan chan *WebSocketMessage) {
ws.mutex.Lock()
defer ws.mutex.Unlock()
if deviceConn, exists := ws.connections[deviceID]; exists {
deviceConn.ResponseChan = responseChan
}
}
// SendCommand 向指定设备发送指令
func (ws *WebSocketService) SendCommand(deviceID, command string, data interface{}) error {
ws.mutex.RLock()
deviceConn, exists := ws.connections[deviceID]
ws.mutex.RUnlock()
deviceName := ws.getDeviceDisplayName(deviceID)
if !exists {
return fmt.Errorf("设备 %s 未连接", deviceName)
}
// 构造消息
msg := WebSocketMessage{
Type: MessageTypeCommand,
Command: command,
Data: data,
Timestamp: time.Now(),
}
// 发送消息
if err := deviceConn.Connection.WriteJSON(msg); err != nil {
return fmt.Errorf("向设备 %s 发送指令失败: %v", deviceName, err)
}
return nil
}
// CommandResponse WebSocket命令响应结构体
type CommandResponse struct {
// DeviceID 设备ID
DeviceID string `json:"device_id,omitempty"`
// Command 命令名称
Command string `json:"command,omitempty"`
// Data 响应数据
Data interface{} `json:"data,omitempty"`
// Status 响应状态
Status string `json:"status,omitempty"`
// Message 响应消息
Message string `json:"message,omitempty"`
// Timestamp 时间戳
Timestamp time.Time `json:"timestamp"`
}
// ParseData 将响应数据解析到目标结构体
func (cr *CommandResponse) ParseData(target interface{}) error {
dataBytes, err := json.Marshal(cr.Data)
if err != nil {
return err
}
return json.Unmarshal(dataBytes, target)
}
// CommandResult WebSocket命令执行结果
type CommandResult struct {
// Response 响应消息
Response *CommandResponse
// Error 错误信息
Error error
}
// SendCommandAndWait 发送指令并等待响应
func (ws *WebSocketService) SendCommandAndWait(deviceID, command string, data interface{}, timeout int) (*CommandResponse, error) {
deviceName := ws.getDeviceDisplayName(deviceID)
// 如果未指定超时时间,使用默认超时时间
if timeout <= 0 {
timeout = ws.defaultTimeout
}
// 创建用于接收响应的通道
responseChan := make(chan *WebSocketMessage, 1)
ws.SetResponseHandler(deviceID, responseChan)
// 发送指令
if err := ws.SendCommand(deviceID, command, data); err != nil {
return nil, fmt.Errorf("发送指令失败: %v", err)
}
// 等待设备响应,设置超时
var response *WebSocketMessage
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
defer cancel()
select {
case response = <-responseChan:
// 成功接收到响应
// 转换为CommandResponse结构体
commandResponse := &CommandResponse{
DeviceID: response.DeviceID,
Command: response.Command,
Data: response.Data,
Timestamp: response.Timestamp,
}
// 尝试提取状态和消息字段
if responseData, ok := response.Data.(map[string]interface{}); ok {
if status, exists := responseData["status"]; exists {
if statusStr, ok := status.(string); ok {
commandResponse.Status = statusStr
}
}
if message, exists := responseData["message"]; exists {
if messageStr, ok := message.(string); ok {
commandResponse.Message = messageStr
}
}
}
return commandResponse, nil
case <-ctx.Done():
// 超时处理
return nil, fmt.Errorf("等待设备 %s 响应超时", deviceName)
}
}
// GetConnectedDevices 获取已连接的设备列表
func (ws *WebSocketService) GetConnectedDevices() []string {
ws.mutex.RLock()
defer ws.mutex.RUnlock()
devices := make([]string, 0, len(ws.connections))
for deviceID := range ws.connections {
devices = append(devices, deviceID)
}
return devices
}
// HandleMessage 处理来自设备的消息
func (ws *WebSocketService) HandleMessage(deviceID string, message []byte) error {
// 解析消息
var msg WebSocketMessage
if err := json.Unmarshal(message, &msg); err != nil {
return fmt.Errorf("解析设备 %s 消息失败: %v", ws.getDeviceDisplayName(deviceID), err)
}
// 更新心跳时间
if msg.Type == MessageTypeHeartbeat {
ws.mutex.Lock()
if deviceConn, exists := ws.connections[deviceID]; exists {
deviceConn.LastHeartbeat = time.Now()
}
ws.mutex.Unlock()
}
// 处理响应消息
if msg.Type == MessageTypeResponse {
ws.mutex.RLock()
if deviceConn, exists := ws.connections[deviceID]; exists && deviceConn.ResponseChan != nil {
// 发送响应到通道
select {
case deviceConn.ResponseChan <- &msg:
// 成功发送
default:
// 通道已满,丢弃消息
ws.logger.Warn(fmt.Sprintf("设备 %s 的响应通道已满,丢弃响应消息", ws.getDeviceDisplayName(deviceID)))
}
}
ws.mutex.RUnlock()
}
// 记录消息日志
ws.logger.Info(fmt.Sprintf("收到来自设备 %s 的消息: %v", ws.getDeviceDisplayName(deviceID), msg))
return nil
}
// GetDeviceConnection 获取设备连接信息
func (ws *WebSocketService) GetDeviceConnection(deviceID string) (*DeviceConnection, bool) {
ws.mutex.RLock()
defer ws.mutex.RUnlock()
deviceConn, exists := ws.connections[deviceID]
return deviceConn, exists
}

View File

@@ -3,22 +3,110 @@
package websocket
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/service"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
// WebSocket消息类型常量
const (
// MessageTypeCommand 平台向设备发送的指令
MessageTypeCommand = "command"
// MessageTypeResponse 设备向平台发送的响应
MessageTypeResponse = "response"
// MessageTypeHeartbeat 心跳消息
MessageTypeHeartbeat = "heartbeat"
)
// WebSocketMessage WebSocket消息结构
type WebSocketMessage struct {
// Type 消息类型
Type string `json:"type"`
// DeviceID 设备ID
DeviceID string `json:"device_id,omitempty"`
// Command 指令内容
Command string `json:"command,omitempty"`
// Data 消息数据
Data interface{} `json:"data,omitempty"`
// Timestamp 时间戳
Timestamp time.Time `json:"timestamp"`
}
// DeviceConnection 设备连接信息
type DeviceConnection struct {
// DeviceID 设备ID
DeviceID string
// Connection WebSocket连接
Connection *websocket.Conn
// LastHeartbeat 最后心跳时间
LastHeartbeat time.Time
// ResponseChan 响应通道
ResponseChan chan *WebSocketMessage
}
// CommandResponse WebSocket命令响应结构体
type CommandResponse struct {
// DeviceID 设备ID
DeviceID string `json:"device_id,omitempty"`
// Command 命令名称
Command string `json:"command,omitempty"`
// Data 响应数据
Data interface{} `json:"data,omitempty"`
// Status 响应状态
Status string `json:"status,omitempty"`
// Message 响应消息
Message string `json:"message,omitempty"`
// Timestamp 时间戳
Timestamp time.Time `json:"timestamp"`
}
// ParseData 将响应数据解析到目标结构体
func (cr *CommandResponse) ParseData(target interface{}) error {
dataBytes, err := json.Marshal(cr.Data)
if err != nil {
return err
}
return json.Unmarshal(dataBytes, target)
}
// CommandResult WebSocket命令执行结果
type CommandResult struct {
// Response 响应消息
Response *CommandResponse
// Error 错误信息
Error error
}
// Manager WebSocket管理器
type Manager struct {
// websocketService WebSocket服务
websocketService *service.WebSocketService
// connections 设备连接映射
connections map[string]*DeviceConnection
// mutex 互斥锁
mutex sync.RWMutex
// logger 日志记录器
logger *logs.Logger
@@ -26,32 +114,34 @@ type Manager struct {
// upgrader WebSocket升级器
upgrader websocket.Upgrader
// mutex 互斥锁
mutex sync.RWMutex
// connections 设备连接映射
connections map[string]*websocket.Conn
// defaultTimeout 默认超时时间(秒)
defaultTimeout int
// deviceRepo 设备仓库
deviceRepo repository.DeviceRepo
}
// NewManager 创建WebSocket管理器实例
func NewManager(websocketService *service.WebSocketService, deviceRepo repository.DeviceRepo) *Manager {
func NewManager(deviceRepo repository.DeviceRepo) *Manager {
return &Manager{
websocketService: websocketService,
logger: logs.NewLogger(),
connections: make(map[string]*DeviceConnection),
logger: logs.NewLogger(),
defaultTimeout: 5, // 默认5秒超时
deviceRepo: deviceRepo,
upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
// 允许所有跨域请求
return true
},
},
connections: make(map[string]*websocket.Conn),
deviceRepo: deviceRepo,
}
}
// SetDefaultTimeout 设置默认超时时间
func (wm *Manager) SetDefaultTimeout(timeout int) {
wm.defaultTimeout = timeout
}
// getDeviceDisplayName 获取设备显示名称
func (wm *Manager) getDeviceDisplayName(deviceID string) string {
if wm.deviceRepo != nil {
@@ -62,97 +152,127 @@ func (wm *Manager) getDeviceDisplayName(deviceID string) string {
return fmt.Sprintf("未知设备(id:%s)", deviceID)
}
// HandleConnection 处理WebSocket连接
func (wm *Manager) HandleConnection(c *gin.Context) {
// 升级HTTP连接到WebSocket
conn, err := wm.upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
wm.logger.Error("WebSocket连接升级失败: " + err.Error())
return
}
// 获取设备ID
deviceID := c.Query("device_id")
if deviceID == "" {
wm.logger.Error("缺少设备ID参数")
conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "缺少设备ID参数"))
conn.Close()
return
}
// 添加连接到映射
// AddConnection 添加设备连接
func (wm *Manager) AddConnection(deviceID string, conn *websocket.Conn) {
wm.mutex.Lock()
wm.connections[deviceID] = conn
wm.mutex.Unlock()
defer wm.mutex.Unlock()
wm.connections[deviceID] = &DeviceConnection{
DeviceID: deviceID,
Connection: conn,
LastHeartbeat: time.Now(),
}
deviceName := wm.getDeviceDisplayName(deviceID)
wm.logger.Info("设备 " + deviceName + " 已连接")
wm.logger.Info(fmt.Sprintf("设备 %s 已连接", deviceName))
}
// 发送连接成功消息
successMsg := service.WebSocketMessage{
Type: "system",
Command: "connected",
Timestamp: time.Now(),
}
conn.WriteJSON(successMsg)
// 处理消息循环
for {
// 读取消息
messageType, message, err := conn.ReadMessage()
if err != nil {
wm.logger.Error("读取设备 " + deviceName + " 消息失败: " + err.Error())
break
}
// 只处理文本消息
if messageType != websocket.TextMessage {
continue
}
// 处理设备消息
if err := wm.websocketService.HandleMessage(deviceID, message); err != nil {
wm.logger.Error("处理设备 " + deviceName + " 消息失败: " + err.Error())
continue
}
}
// 连接断开时清理
// RemoveConnection 移除设备连接
func (wm *Manager) RemoveConnection(deviceID string) {
wm.mutex.Lock()
delete(wm.connections, deviceID)
wm.mutex.Unlock()
defer wm.mutex.Unlock()
conn.Close()
wm.logger.Info("设备 " + deviceName + " 已断开连接")
deviceName := wm.getDeviceDisplayName(deviceID)
delete(wm.connections, deviceID)
wm.logger.Info(fmt.Sprintf("设备 %s 已断开连接", deviceName))
}
// SetResponseHandler 设置响应处理器
func (wm *Manager) SetResponseHandler(deviceID string, responseChan chan *WebSocketMessage) {
wm.mutex.Lock()
defer wm.mutex.Unlock()
if deviceConn, exists := wm.connections[deviceID]; exists {
deviceConn.ResponseChan = responseChan
}
}
// SendCommand 向指定设备发送指令
func (wm *Manager) SendCommand(deviceID, command string, data interface{}) error {
wm.mutex.RLock()
conn, exists := wm.connections[deviceID]
deviceConn, exists := wm.connections[deviceID]
wm.mutex.RUnlock()
deviceName := wm.getDeviceDisplayName(deviceID)
if !exists {
return wm.websocketService.SendCommand(deviceID, command, data)
return fmt.Errorf("设备 %s 未连接", deviceName)
}
// 构造消息
msg := service.WebSocketMessage{
Type: service.MessageTypeCommand,
msg := WebSocketMessage{
Type: MessageTypeCommand,
Command: command,
Data: data,
Timestamp: time.Now(),
}
// 发送消息
if err := conn.WriteJSON(msg); err != nil {
deviceName := wm.getDeviceDisplayName(deviceID)
if err := deviceConn.Connection.WriteJSON(msg); err != nil {
return fmt.Errorf("向设备 %s 发送指令失败: %v", deviceName, err)
}
return nil
}
// SendCommandAndWait 发送指令并等待响应
func (wm *Manager) SendCommandAndWait(deviceID, command string, data interface{}, timeout int) (*CommandResponse, error) {
deviceName := wm.getDeviceDisplayName(deviceID)
// 如果未指定超时时间,使用默认超时时间
if timeout <= 0 {
timeout = wm.defaultTimeout
}
// 创建用于接收响应的通道
responseChan := make(chan *WebSocketMessage, 1)
wm.SetResponseHandler(deviceID, responseChan)
// 发送指令
if err := wm.SendCommand(deviceID, command, data); err != nil {
return nil, fmt.Errorf("发送指令失败: %v", err)
}
// 等待设备响应,设置超时
var response *WebSocketMessage
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
defer cancel()
select {
case response = <-responseChan:
// 成功接收到响应
// 转换为CommandResponse结构体
commandResponse := &CommandResponse{
DeviceID: response.DeviceID,
Command: response.Command,
Data: response.Data,
Timestamp: response.Timestamp,
}
// 尝试提取状态和消息字段
if responseData, ok := response.Data.(map[string]interface{}); ok {
if status, exists := responseData["status"]; exists {
if statusStr, ok := status.(string); ok {
commandResponse.Status = statusStr
}
}
if message, exists := responseData["message"]; exists {
if messageStr, ok := message.(string); ok {
commandResponse.Message = messageStr
}
}
}
return commandResponse, nil
case <-ctx.Done():
// 超时处理
return nil, fmt.Errorf("等待设备 %s 响应超时", deviceName)
}
}
// GetConnectedDevices 获取已连接的设备列表
func (wm *Manager) GetConnectedDevices() []string {
wm.mutex.RLock()
@@ -165,3 +285,110 @@ func (wm *Manager) GetConnectedDevices() []string {
return devices
}
// HandleMessage 处理来自设备的消息
func (wm *Manager) HandleMessage(deviceID string, message []byte) error {
// 解析消息
var msg WebSocketMessage
if err := json.Unmarshal(message, &msg); err != nil {
return fmt.Errorf("解析设备 %s 消息失败: %v", wm.getDeviceDisplayName(deviceID), err)
}
// 更新心跳时间
if msg.Type == MessageTypeHeartbeat {
wm.mutex.Lock()
if deviceConn, exists := wm.connections[deviceID]; exists {
deviceConn.LastHeartbeat = time.Now()
}
wm.mutex.Unlock()
}
// 处理响应消息
if msg.Type == MessageTypeResponse {
wm.mutex.RLock()
if deviceConn, exists := wm.connections[deviceID]; exists && deviceConn.ResponseChan != nil {
// 发送响应到通道
select {
case deviceConn.ResponseChan <- &msg:
// 成功发送
default:
// 通道已满,丢弃消息
wm.logger.Warn(fmt.Sprintf("设备 %s 的响应通道已满,丢弃响应消息", wm.getDeviceDisplayName(deviceID)))
}
}
wm.mutex.RUnlock()
}
// 记录消息日志
wm.logger.Info(fmt.Sprintf("收到来自设备 %s 的消息: %v", wm.getDeviceDisplayName(deviceID), msg))
return nil
}
// GetDeviceConnection 获取设备连接信息
func (wm *Manager) GetDeviceConnection(deviceID string) (*DeviceConnection, bool) {
wm.mutex.RLock()
defer wm.mutex.RUnlock()
deviceConn, exists := wm.connections[deviceID]
return deviceConn, exists
}
// HandleConnection 处理WebSocket连接
func (wm *Manager) HandleConnection(c *gin.Context) {
// 升级HTTP连接到WebSocket
conn, err := wm.upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
wm.logger.Error(fmt.Sprintf("WebSocket连接升级失败: %v", err))
return
}
// 获取设备ID
deviceID := c.Query("device_id")
if deviceID == "" {
wm.logger.Error("缺少设备ID参数")
conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "缺少设备ID参数"))
conn.Close()
return
}
// 添加连接
wm.AddConnection(deviceID, conn)
deviceName := wm.getDeviceDisplayName(deviceID)
wm.logger.Info("设备 " + deviceName + " 已连接")
// 发送连接成功消息
successMsg := WebSocketMessage{
Type: "system",
Command: "connected",
Timestamp: time.Now(),
}
conn.WriteJSON(successMsg)
// 处理消息循环
for {
// 读取消息
messageType, message, err := conn.ReadMessage()
if err != nil {
wm.logger.Error(fmt.Sprintf("读取设备 %s 消息失败: %v", deviceName, err))
break
}
// 只处理文本消息
if messageType != websocket.TextMessage {
continue
}
// 处理设备消息
if err := wm.HandleMessage(deviceID, message); err != nil {
wm.logger.Error(fmt.Sprintf("处理设备 %s 消息失败: %v", deviceName, err))
continue
}
}
// 连接断开时清理
wm.RemoveConnection(deviceID)
conn.Close()
wm.logger.Info("设备 " + deviceName + " 已断开连接")
}