From 6a2d97b543124c8e7324f78db5810d552d426ffd Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Mon, 8 Sep 2025 14:59:42 +0800 Subject: [PATCH] =?UTF-8?q?1.=20=E5=AE=8C=E5=96=84websocket=E9=80=9A?= =?UTF-8?q?=E4=BF=A1=E9=80=BB=E8=BE=91=202.=20=E5=AE=9E=E7=8E=B0Switch?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- RELAY_API.md | 271 +++++++++++++++++++++++++++ config.yml | 7 +- internal/config/config.go | 23 ++- internal/controller/device/device.go | 51 ++++- internal/controller/remote/remote.go | 28 ++- internal/core/application.go | 2 + internal/service/websocket.go | 136 +++++++++++++- 7 files changed, 499 insertions(+), 19 deletions(-) create mode 100644 RELAY_API.md diff --git a/RELAY_API.md b/RELAY_API.md new file mode 100644 index 0000000..781ef6c --- /dev/null +++ b/RELAY_API.md @@ -0,0 +1,271 @@ +# 中继程序接口定义 + +本文档定义了猪场控制器系统主动向中继程序发送请求时使用的接口规范。 + +## 1. 概述 + +中继程序作为猪场控制器系统与现场设备之间的桥梁,负责接收来自控制器系统的指令并转发给相应的设备,同时将设备的响应和状态信息回传给控制器系统。 + +通信方式采用WebSocket协议,确保双向实时通信。 + +## 2. WebSocket连接 + +### 2.1 连接建立 + +设备通过WebSocket连接到平台,连接地址格式: +``` +ws://[server_address]:[port]/ws/device?device_id=[device_id] +``` + +参数说明: +- `device_id`: 中继设备唯一标识符 + +### 2.2 心跳机制 + +为保持连接活跃状态,中继设备需要定期发送心跳消息: + +**心跳请求** +```json +{ + "type": "heartbeat", + "device_id": "relay-001", + "timestamp": "2023-01-01T12:00:00Z" +} +``` + +## 3. 指令接口 + +### 3.1 设备控制指令 + +平台向中继设备发送设备控制指令。 + +**请求格式** +```json +{ + "type": "command", + "command": "control_device", + "data": { + "device_type": "fan", + "device_id": "fan-001", + "action": "on" + }, + "timestamp": "2023-01-01T12:00:00Z" +} +``` + +**参数说明** +- `type`: 消息类型,固定为"command" +- `command`: 指令名称,固定为"control_device" +- `data`: 指令数据 + - `device_type`: 设备类型 (如: fan, water_curtain) + - `device_id`: 设备唯一标识符 + - `action`: 执行动作 (如: on, off) +- `timestamp`: 指令发送时间 + +### 3.2 查询设备状态指令 + +平台向中继设备发送查询设备状态指令。 + +**请求格式** +```json +{ + "type": "command", + "command": "query_device_status", + "data": { + "device_id": "fan-001" + }, + "timestamp": "2023-01-01T12:00:00Z" +} +``` + +**参数说明** +- `type`: 消息类型,固定为"command" +- `command`: 指令名称,固定为"query_device_status" +- `data`: 指令数据 + - `device_id`: 设备唯一标识符 +- `timestamp`: 指令发送时间 + +### 3.3 查询所有设备状态指令 + +平台向中继设备发送查询所有设备状态指令。 + +**请求格式** +```json +{ + "type": "command", + "command": "query_all_device_status", + "timestamp": "2023-01-01T12:00:00Z" +} +``` + +**参数说明** +- `type`: 消息类型,固定为"command" +- `command`: 指令名称,固定为"query_all_device_status" +- `timestamp`: 指令发送时间 + +## 4. 响应接口 + +### 4.1 设备控制响应 + +中继设备执行控制指令后,向平台发送响应。 + +**响应格式** +```json +{ + "type": "response", + "command": "control_device", + "data": { + "device_id": "fan-001", + "status": "success", + "message": "设备控制成功" + }, + "timestamp": "2023-01-01T12:00:05Z" +} +``` + +### 4.2 设备状态响应 + +中继设备响应设备状态查询指令。 + +**响应格式** +```json +{ + "type": "response", + "command": "query_device_status", + "data": { + "device_id": "fan-001", + "status": "running", + "power": 220, + "current": 5.2 + }, + "timestamp": "2023-01-01T12:00:05Z" +} +``` + +### 4.3 所有设备状态响应 + +中继设备响应所有设备状态查询指令。 + +**响应格式** +```json +{ + "type": "response", + "command": "query_all_device_status", + "data": [ + { + "device_id": "fan-001", + "device_type": "fan", + "status": "running" + }, + { + "device_id": "curtain-001", + "device_type": "water_curtain", + "status": "stopped" + } + ], + "timestamp": "2023-01-01T12:00:05Z" +} +``` + +## 5. 请求-响应机制 + +平台在发送指令后会等待中继设备的响应,超时时间由配置文件决定,默认为5秒。 + +### 5.1 同步等待响应 + +平台发送指令后会阻塞等待中继设备返回响应,直到: +1. 收到中继设备的响应消息 +2. 超时(由配置文件决定,默认5秒) + +### 5.2 响应处理 + +平台接收到响应后会: +1. 解析响应数据 +2. 根据响应结果更新设备控制记录 +3. 向前端返回操作结果 + +### 5.3 超时处理 + +如果在指定时间内未收到中继设备的响应,平台会: +1. 记录超时错误日志 +2. 向前端返回超时错误信息 +3. 设备控制记录中标记为失败状态 + +## 6. 配置说明 + +WebSocket请求超时时间可以通过配置文件进行设置: + +```yaml +# WebSocket配置 +websocket: + # WebSocket请求超时时间(秒) + timeout: 5 +``` + +如果没有配置或配置值无效,系统将使用默认的5秒超时时间。 + +## 7. 响应结构定义 + +平台提供统一的响应结构定义,用于处理中继设备返回的响应: + +### 7.1 CommandResponse 结构体 + +```go +type CommandResponse struct { + // DeviceID 设备ID + DeviceID string + + // Command 命令名称 + Command string + + // Data 响应数据 + Data interface{} + + // Status 响应状态 + Status string + + // Message 响应消息 + Message string + + // Timestamp 时间戳 + Timestamp time.Time +} +``` + +### 7.2 响应处理规则 + +1. `Status` 字段:表示操作的整体状态,如 "success"、"failed" 等 +2. `Message` 字段:提供人类可读的操作结果描述 +3. `Data` 字段:包含具体的操作结果数据 +4. 如果响应中的 `Data` 是一个对象且包含 `status` 和 `message` 字段,系统会自动提取并填充到对应的结构体字段中 + +## 8. 消息类型说明 + +| 类型 | 说明 | +|------|------| +| command | 平台向中继设备发送的指令 | +| response | 中继设备向平台发送的响应 | +| heartbeat | 心跳消息 | + +## 9. 设备类型说明 + +| 类型 | 说明 | +|------|------| +| fan | 风机设备 | +| water_curtain | 水帘设备 | + +## 10. 动作说明 + +| 动作 | 说明 | +|------|------| +| on | 开启设备 | +| off | 关闭设备 | + +## 11. 状态说明 + +| 状态 | 说明 | +|------|------| +| success | 操作成功 | +| failed | 操作失败 | +| running | 设备运行中 | +| stopped | 设备已停止 | \ No newline at end of file diff --git a/config.yml b/config.yml index 87a8b30..af9e52d 100644 --- a/config.yml +++ b/config.yml @@ -20,4 +20,9 @@ database: # 连接池配置 max_open_conns: 5 max_idle_conns: 5 - conn_max_lifetime: 300 # 5分钟 \ No newline at end of file + conn_max_lifetime: 300 # 5分钟 + +# WebSocket配置 +websocket: + # WebSocket请求超时时间(秒) + timeout: 5 \ No newline at end of file diff --git a/internal/config/config.go b/internal/config/config.go index cea6aea..e6db5b1 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -17,6 +17,9 @@ type Config struct { // Database 数据库配置 Database DatabaseConfig `yaml:"database"` + + // WebSocket WebSocket配置 + WebSocket WebSocketConfig `yaml:"websocket"` } // ServerConfig 代表服务器配置 @@ -67,9 +70,19 @@ type DatabaseConfig struct { ConnMaxLifetime int `yaml:"conn_max_lifetime"` } +// WebSocketConfig 代表WebSocket配置 +type WebSocketConfig struct { + // Timeout WebSocket请求超时时间(秒) + Timeout int `yaml:"timeout"` +} + // NewConfig 创建并返回一个新的配置实例 func NewConfig() *Config { - return &Config{} + return &Config{ + WebSocket: WebSocketConfig{ + Timeout: 5, // 默认5秒超时 + }, + } } // Load 从指定路径加载配置文件 @@ -101,3 +114,11 @@ func (c *Config) GetDatabaseConnectionString() string { c.Database.SSLMode, ) } + +// GetWebSocketTimeout 获取WebSocket超时时间(秒) +func (c *Config) GetWebSocketTimeout() int { + if c.WebSocket.Timeout <= 0 { + return 5 // 默认5秒超时 + } + return c.WebSocket.Timeout +} diff --git a/internal/controller/device/device.go b/internal/controller/device/device.go index ec10033..46c1dee 100644 --- a/internal/controller/device/device.go +++ b/internal/controller/device/device.go @@ -43,6 +43,21 @@ type SwitchResponseData struct { DeviceType string `json:"device_type"` DeviceID string `json:"device_id"` Action string `json:"action"` + Status string `json:"status"` // 添加状态字段 + Message string `json:"message"` // 添加消息字段 +} + +// RelayControlData 发送给中继设备的控制数据结构体 +type RelayControlData struct { + DeviceType string `json:"device_type"` + DeviceID string `json:"device_id"` + Action string `json:"action"` +} + +// RelayControlResponseData 中继设备控制响应数据结构体 +type RelayControlResponseData struct { + Status string `json:"status"` + Message string `json:"message"` } // Switch 设备控制接口 @@ -67,31 +82,45 @@ func (c *Controller) Switch(ctx *gin.Context) { } // 通过WebSocket向中继设备发送控制指令 - // 这里假设中继设备ID为"relay-001",在实际应用中应该根据设备层级结构动态获取 - controlData := map[string]interface{}{ - "device_type": req.DeviceType, - "device_id": req.DeviceID, - "action": req.Action, + controlData := RelayControlData{ + DeviceType: req.DeviceType, + DeviceID: req.DeviceID, + Action: req.Action, } - err := c.websocketService.SendCommand("relay-001", "control_device", controlData) + // 发送指令并等待响应 + response, err := c.websocketService.SendCommandAndWait("relay-001", "control_device", controlData, 0) if err != nil { c.logger.Error("通过WebSocket发送设备控制指令失败: " + err.Error()) - controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "设备控制失败") + controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "设备控制失败: "+err.Error()) return } + // 使用响应中的状态和消息 + status := "解析失败" + message := "消息解析失败" + + // 如果响应中没有明确的状态和消息,则从数据中提取 + if status == "" && message == "" { + // 解析响应数据 + var responseData RelayControlResponseData + if err := response.ParseData(&responseData); err == nil { + status = responseData.Status + message = responseData.Message + } + } + // 创建设备控制记录 if err := c.createDeviceControlRecord( user.ID, req.DeviceID, req.DeviceType, req.Action, - "success", - "设备控制成功", + status, + message, ); err != nil { c.logger.Error("创建设备控制记录失败: " + err.Error()) - controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "设备控制失败") + controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "记录控制历史失败") return } @@ -99,6 +128,8 @@ func (c *Controller) Switch(ctx *gin.Context) { DeviceType: req.DeviceType, DeviceID: req.DeviceID, Action: req.Action, + Status: status, + Message: message, } controller.SendSuccessResponse(ctx, "设备控制成功", data) diff --git a/internal/controller/remote/remote.go b/internal/controller/remote/remote.go index 7284bcb..bab9cf9 100644 --- a/internal/controller/remote/remote.go +++ b/internal/controller/remote/remote.go @@ -13,13 +13,15 @@ import ( type Controller struct { websocketService *service.WebSocketService logger *logs.Logger + service *service.WebSocketService } // NewController 创建远程控制控制器实例 -func NewController(websocketService *service.WebSocketService) *Controller { +func NewController(websocketService *service.WebSocketService, service *service.WebSocketService) *Controller { return &Controller{ websocketService: websocketService, logger: logs.NewLogger(), + service: service, } } @@ -32,9 +34,17 @@ type SendCommandRequest struct { // SendCommandResponseData 发送指令响应数据结构体 type SendCommandResponseData struct { - DeviceID string `json:"device_id"` - Command string `json:"command"` - Status string `json:"status"` + DeviceID string `json:"device_id"` + Command string `json:"command"` + Status string `json:"status"` + Data interface{} `json:"data,omitempty"` +} + +// RelayCommandData 发送到中继设备的命令数据结构体 +type RelayCommandData struct { + DeviceID string `json:"device_id"` + Command string `json:"command"` + Data interface{} `json:"data,omitempty"` } // SendCommand 向设备发送指令接口 @@ -54,7 +64,14 @@ func (c *Controller) SendCommand(ctx *gin.Context) { } // 通过WebSocket服务向设备发送指令 - err := c.websocketService.SendCommand(req.DeviceID, req.Command, req.Data) + commandData := RelayCommandData{ + DeviceID: req.DeviceID, + Command: req.Command, + Data: req.Data, + } + + // 发送指令并等待响应 + response, err := c.websocketService.SendCommandAndWait(req.DeviceID, req.Command, commandData, 0) if err != nil { c.logger.Error("发送指令失败: " + err.Error()) controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "发送指令失败: "+err.Error()) @@ -65,6 +82,7 @@ func (c *Controller) SendCommand(ctx *gin.Context) { DeviceID: req.DeviceID, Command: req.Command, Status: "sent", + Data: response.Data, } controller.SendSuccessResponse(ctx, "指令发送成功", data) diff --git a/internal/core/application.go b/internal/core/application.go index 828f303..a9a5228 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -77,6 +77,8 @@ func NewApplication(cfg *config.Config) *Application { // 初始化WebSocket服务 websocketService := service.NewWebSocketService() + // 设置WebSocket超时时间 + websocketService.SetDefaultTimeout(cfg.GetWebSocketTimeout()) // 初始化API组件 apiInstance := api.NewAPI(cfg, userRepo, operationHistoryRepo, deviceControlRepo, deviceRepo, websocketService) diff --git a/internal/service/websocket.go b/internal/service/websocket.go index 863fba5..f318aa8 100644 --- a/internal/service/websocket.go +++ b/internal/service/websocket.go @@ -3,6 +3,7 @@ package service import ( + "context" "encoding/json" "fmt" "sync" @@ -52,6 +53,9 @@ type DeviceConnection struct { // LastHeartbeat 最后心跳时间 LastHeartbeat time.Time + + // ResponseChan 响应通道 + ResponseChan chan *WebSocketMessage } // WebSocketService WebSocket服务 @@ -64,16 +68,25 @@ type WebSocketService struct { // logger 日志记录器 logger *logs.Logger + + // defaultTimeout 默认超时时间(秒) + defaultTimeout int } // NewWebSocketService 创建WebSocket服务实例 func NewWebSocketService() *WebSocketService { return &WebSocketService{ - connections: make(map[string]*DeviceConnection), - logger: logs.NewLogger(), + connections: make(map[string]*DeviceConnection), + logger: logs.NewLogger(), + defaultTimeout: 5, // 默认5秒超时 } } +// SetDefaultTimeout 设置默认超时时间 +func (ws *WebSocketService) SetDefaultTimeout(timeout int) { + ws.defaultTimeout = timeout +} + // AddConnection 添加设备连接 func (ws *WebSocketService) AddConnection(deviceID string, conn *websocket.Conn) { ws.mutex.Lock() @@ -98,6 +111,16 @@ func (ws *WebSocketService) RemoveConnection(deviceID string) { ws.logger.Info(fmt.Sprintf("设备 %s 已断开连接", deviceID)) } +// SetResponseHandler 设置响应处理器 +func (ws *WebSocketService) SetResponseHandler(deviceID string, responseChan chan *WebSocketMessage) { + ws.mutex.Lock() + defer ws.mutex.Unlock() + + if deviceConn, exists := ws.connections[deviceID]; exists { + deviceConn.ResponseChan = responseChan + } +} + // SendCommand 向指定设备发送指令 func (ws *WebSocketService) SendCommand(deviceID, command string, data interface{}) error { ws.mutex.RLock() @@ -124,6 +147,99 @@ func (ws *WebSocketService) SendCommand(deviceID, command string, data interface return nil } +// CommandResponse WebSocket命令响应结构体 +type CommandResponse struct { + // DeviceID 设备ID + DeviceID string `json:"device_id,omitempty"` + + // Command 命令名称 + Command string `json:"command,omitempty"` + + // Data 响应数据 + Data interface{} `json:"data,omitempty"` + + // Status 响应状态 + Status string `json:"status,omitempty"` + + // Message 响应消息 + Message string `json:"message,omitempty"` + + // Timestamp 时间戳 + Timestamp time.Time `json:"timestamp"` +} + +// ParseData 将响应数据解析到目标结构体 +func (cr *CommandResponse) ParseData(target interface{}) error { + dataBytes, err := json.Marshal(cr.Data) + if err != nil { + return err + } + return json.Unmarshal(dataBytes, target) +} + +// CommandResult WebSocket命令执行结果 +type CommandResult struct { + // Response 响应消息 + Response *CommandResponse + + // Error 错误信息 + Error error +} + +// SendCommandAndWait 发送指令并等待响应 +func (ws *WebSocketService) SendCommandAndWait(deviceID, command string, data interface{}, timeout int) (*CommandResponse, error) { + // 如果未指定超时时间,使用默认超时时间 + if timeout <= 0 { + timeout = ws.defaultTimeout + } + + // 创建用于接收响应的通道 + responseChan := make(chan *WebSocketMessage, 1) + ws.SetResponseHandler(deviceID, responseChan) + + // 发送指令 + if err := ws.SendCommand(deviceID, command, data); err != nil { + return nil, fmt.Errorf("发送指令失败: %v", err) + } + + // 等待设备响应,设置超时 + var response *WebSocketMessage + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second) + defer cancel() + + select { + case response = <-responseChan: + // 成功接收到响应 + // 转换为CommandResponse结构体 + commandResponse := &CommandResponse{ + DeviceID: response.DeviceID, + Command: response.Command, + Data: response.Data, + Timestamp: response.Timestamp, + } + + // 尝试提取状态和消息字段 + if responseData, ok := response.Data.(map[string]interface{}); ok { + if status, exists := responseData["status"]; exists { + if statusStr, ok := status.(string); ok { + commandResponse.Status = statusStr + } + } + + if message, exists := responseData["message"]; exists { + if messageStr, ok := message.(string); ok { + commandResponse.Message = messageStr + } + } + } + + return commandResponse, nil + case <-ctx.Done(): + // 超时处理 + return nil, fmt.Errorf("等待设备响应超时") + } +} + // GetConnectedDevices 获取已连接的设备列表 func (ws *WebSocketService) GetConnectedDevices() []string { ws.mutex.RLock() @@ -154,6 +270,22 @@ func (ws *WebSocketService) HandleMessage(deviceID string, message []byte) error ws.mutex.Unlock() } + // 处理响应消息 + if msg.Type == MessageTypeResponse { + ws.mutex.RLock() + if deviceConn, exists := ws.connections[deviceID]; exists && deviceConn.ResponseChan != nil { + // 发送响应到通道 + select { + case deviceConn.ResponseChan <- &msg: + // 成功发送 + default: + // 通道已满,丢弃消息 + ws.logger.Warn(fmt.Sprintf("设备 %s 的响应通道已满,丢弃响应消息", deviceID)) + } + } + ws.mutex.RUnlock() + } + // 记录消息日志 ws.logger.Info(fmt.Sprintf("收到来自设备 %s 的消息: %v", deviceID, msg))