1. 完善websocket通信逻辑

2. 实现Switch接口
This commit is contained in:
2025-09-08 14:59:42 +08:00
parent a6ee1013ca
commit 6a2d97b543
7 changed files with 499 additions and 19 deletions

271
RELAY_API.md Normal file
View File

@@ -0,0 +1,271 @@
# 中继程序接口定义
本文档定义了猪场控制器系统主动向中继程序发送请求时使用的接口规范。
## 1. 概述
中继程序作为猪场控制器系统与现场设备之间的桥梁,负责接收来自控制器系统的指令并转发给相应的设备,同时将设备的响应和状态信息回传给控制器系统。
通信方式采用WebSocket协议确保双向实时通信。
## 2. WebSocket连接
### 2.1 连接建立
设备通过WebSocket连接到平台连接地址格式
```
ws://[server_address]:[port]/ws/device?device_id=[device_id]
```
参数说明:
- `device_id`: 中继设备唯一标识符
### 2.2 心跳机制
为保持连接活跃状态,中继设备需要定期发送心跳消息:
**心跳请求**
```json
{
"type": "heartbeat",
"device_id": "relay-001",
"timestamp": "2023-01-01T12:00:00Z"
}
```
## 3. 指令接口
### 3.1 设备控制指令
平台向中继设备发送设备控制指令。
**请求格式**
```json
{
"type": "command",
"command": "control_device",
"data": {
"device_type": "fan",
"device_id": "fan-001",
"action": "on"
},
"timestamp": "2023-01-01T12:00:00Z"
}
```
**参数说明**
- `type`: 消息类型,固定为"command"
- `command`: 指令名称,固定为"control_device"
- `data`: 指令数据
- `device_type`: 设备类型 (如: fan, water_curtain)
- `device_id`: 设备唯一标识符
- `action`: 执行动作 (如: on, off)
- `timestamp`: 指令发送时间
### 3.2 查询设备状态指令
平台向中继设备发送查询设备状态指令。
**请求格式**
```json
{
"type": "command",
"command": "query_device_status",
"data": {
"device_id": "fan-001"
},
"timestamp": "2023-01-01T12:00:00Z"
}
```
**参数说明**
- `type`: 消息类型,固定为"command"
- `command`: 指令名称,固定为"query_device_status"
- `data`: 指令数据
- `device_id`: 设备唯一标识符
- `timestamp`: 指令发送时间
### 3.3 查询所有设备状态指令
平台向中继设备发送查询所有设备状态指令。
**请求格式**
```json
{
"type": "command",
"command": "query_all_device_status",
"timestamp": "2023-01-01T12:00:00Z"
}
```
**参数说明**
- `type`: 消息类型,固定为"command"
- `command`: 指令名称,固定为"query_all_device_status"
- `timestamp`: 指令发送时间
## 4. 响应接口
### 4.1 设备控制响应
中继设备执行控制指令后,向平台发送响应。
**响应格式**
```json
{
"type": "response",
"command": "control_device",
"data": {
"device_id": "fan-001",
"status": "success",
"message": "设备控制成功"
},
"timestamp": "2023-01-01T12:00:05Z"
}
```
### 4.2 设备状态响应
中继设备响应设备状态查询指令。
**响应格式**
```json
{
"type": "response",
"command": "query_device_status",
"data": {
"device_id": "fan-001",
"status": "running",
"power": 220,
"current": 5.2
},
"timestamp": "2023-01-01T12:00:05Z"
}
```
### 4.3 所有设备状态响应
中继设备响应所有设备状态查询指令。
**响应格式**
```json
{
"type": "response",
"command": "query_all_device_status",
"data": [
{
"device_id": "fan-001",
"device_type": "fan",
"status": "running"
},
{
"device_id": "curtain-001",
"device_type": "water_curtain",
"status": "stopped"
}
],
"timestamp": "2023-01-01T12:00:05Z"
}
```
## 5. 请求-响应机制
平台在发送指令后会等待中继设备的响应超时时间由配置文件决定默认为5秒。
### 5.1 同步等待响应
平台发送指令后会阻塞等待中继设备返回响应,直到:
1. 收到中继设备的响应消息
2. 超时由配置文件决定默认5秒
### 5.2 响应处理
平台接收到响应后会:
1. 解析响应数据
2. 根据响应结果更新设备控制记录
3. 向前端返回操作结果
### 5.3 超时处理
如果在指定时间内未收到中继设备的响应,平台会:
1. 记录超时错误日志
2. 向前端返回超时错误信息
3. 设备控制记录中标记为失败状态
## 6. 配置说明
WebSocket请求超时时间可以通过配置文件进行设置
```yaml
# WebSocket配置
websocket:
# WebSocket请求超时时间(秒)
timeout: 5
```
如果没有配置或配置值无效系统将使用默认的5秒超时时间。
## 7. 响应结构定义
平台提供统一的响应结构定义,用于处理中继设备返回的响应:
### 7.1 CommandResponse 结构体
```go
type CommandResponse struct {
// DeviceID 设备ID
DeviceID string
// Command 命令名称
Command string
// Data 响应数据
Data interface{}
// Status 响应状态
Status string
// Message 响应消息
Message string
// Timestamp 时间戳
Timestamp time.Time
}
```
### 7.2 响应处理规则
1. `Status` 字段:表示操作的整体状态,如 "success"、"failed" 等
2. `Message` 字段:提供人类可读的操作结果描述
3. `Data` 字段:包含具体的操作结果数据
4. 如果响应中的 `Data` 是一个对象且包含 `status``message` 字段,系统会自动提取并填充到对应的结构体字段中
## 8. 消息类型说明
| 类型 | 说明 |
|------|------|
| command | 平台向中继设备发送的指令 |
| response | 中继设备向平台发送的响应 |
| heartbeat | 心跳消息 |
## 9. 设备类型说明
| 类型 | 说明 |
|------|------|
| fan | 风机设备 |
| water_curtain | 水帘设备 |
## 10. 动作说明
| 动作 | 说明 |
|------|------|
| on | 开启设备 |
| off | 关闭设备 |
## 11. 状态说明
| 状态 | 说明 |
|------|------|
| success | 操作成功 |
| failed | 操作失败 |
| running | 设备运行中 |
| stopped | 设备已停止 |

View File

@@ -21,3 +21,8 @@ database:
max_open_conns: 5
max_idle_conns: 5
conn_max_lifetime: 300 # 5分钟
# WebSocket配置
websocket:
# WebSocket请求超时时间(秒)
timeout: 5

View File

@@ -17,6 +17,9 @@ type Config struct {
// Database 数据库配置
Database DatabaseConfig `yaml:"database"`
// WebSocket WebSocket配置
WebSocket WebSocketConfig `yaml:"websocket"`
}
// ServerConfig 代表服务器配置
@@ -67,9 +70,19 @@ type DatabaseConfig struct {
ConnMaxLifetime int `yaml:"conn_max_lifetime"`
}
// WebSocketConfig 代表WebSocket配置
type WebSocketConfig struct {
// Timeout WebSocket请求超时时间(秒)
Timeout int `yaml:"timeout"`
}
// NewConfig 创建并返回一个新的配置实例
func NewConfig() *Config {
return &Config{}
return &Config{
WebSocket: WebSocketConfig{
Timeout: 5, // 默认5秒超时
},
}
}
// Load 从指定路径加载配置文件
@@ -101,3 +114,11 @@ func (c *Config) GetDatabaseConnectionString() string {
c.Database.SSLMode,
)
}
// GetWebSocketTimeout 获取WebSocket超时时间(秒)
func (c *Config) GetWebSocketTimeout() int {
if c.WebSocket.Timeout <= 0 {
return 5 // 默认5秒超时
}
return c.WebSocket.Timeout
}

View File

@@ -43,6 +43,21 @@ type SwitchResponseData struct {
DeviceType string `json:"device_type"`
DeviceID string `json:"device_id"`
Action string `json:"action"`
Status string `json:"status"` // 添加状态字段
Message string `json:"message"` // 添加消息字段
}
// RelayControlData 发送给中继设备的控制数据结构体
type RelayControlData struct {
DeviceType string `json:"device_type"`
DeviceID string `json:"device_id"`
Action string `json:"action"`
}
// RelayControlResponseData 中继设备控制响应数据结构体
type RelayControlResponseData struct {
Status string `json:"status"`
Message string `json:"message"`
}
// Switch 设备控制接口
@@ -67,31 +82,45 @@ func (c *Controller) Switch(ctx *gin.Context) {
}
// 通过WebSocket向中继设备发送控制指令
// 这里假设中继设备ID为"relay-001",在实际应用中应该根据设备层级结构动态获取
controlData := map[string]interface{}{
"device_type": req.DeviceType,
"device_id": req.DeviceID,
"action": req.Action,
controlData := RelayControlData{
DeviceType: req.DeviceType,
DeviceID: req.DeviceID,
Action: req.Action,
}
err := c.websocketService.SendCommand("relay-001", "control_device", controlData)
// 发送指令并等待响应
response, err := c.websocketService.SendCommandAndWait("relay-001", "control_device", controlData, 0)
if err != nil {
c.logger.Error("通过WebSocket发送设备控制指令失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "设备控制失败")
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "设备控制失败: "+err.Error())
return
}
// 使用响应中的状态和消息
status := "解析失败"
message := "消息解析失败"
// 如果响应中没有明确的状态和消息,则从数据中提取
if status == "" && message == "" {
// 解析响应数据
var responseData RelayControlResponseData
if err := response.ParseData(&responseData); err == nil {
status = responseData.Status
message = responseData.Message
}
}
// 创建设备控制记录
if err := c.createDeviceControlRecord(
user.ID,
req.DeviceID,
req.DeviceType,
req.Action,
"success",
"设备控制成功",
status,
message,
); err != nil {
c.logger.Error("创建设备控制记录失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "设备控制失败")
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "记录控制历史失败")
return
}
@@ -99,6 +128,8 @@ func (c *Controller) Switch(ctx *gin.Context) {
DeviceType: req.DeviceType,
DeviceID: req.DeviceID,
Action: req.Action,
Status: status,
Message: message,
}
controller.SendSuccessResponse(ctx, "设备控制成功", data)

View File

@@ -13,13 +13,15 @@ import (
type Controller struct {
websocketService *service.WebSocketService
logger *logs.Logger
service *service.WebSocketService
}
// NewController 创建远程控制控制器实例
func NewController(websocketService *service.WebSocketService) *Controller {
func NewController(websocketService *service.WebSocketService, service *service.WebSocketService) *Controller {
return &Controller{
websocketService: websocketService,
logger: logs.NewLogger(),
service: service,
}
}
@@ -35,6 +37,14 @@ type SendCommandResponseData struct {
DeviceID string `json:"device_id"`
Command string `json:"command"`
Status string `json:"status"`
Data interface{} `json:"data,omitempty"`
}
// RelayCommandData 发送到中继设备的命令数据结构体
type RelayCommandData struct {
DeviceID string `json:"device_id"`
Command string `json:"command"`
Data interface{} `json:"data,omitempty"`
}
// SendCommand 向设备发送指令接口
@@ -54,7 +64,14 @@ func (c *Controller) SendCommand(ctx *gin.Context) {
}
// 通过WebSocket服务向设备发送指令
err := c.websocketService.SendCommand(req.DeviceID, req.Command, req.Data)
commandData := RelayCommandData{
DeviceID: req.DeviceID,
Command: req.Command,
Data: req.Data,
}
// 发送指令并等待响应
response, err := c.websocketService.SendCommandAndWait(req.DeviceID, req.Command, commandData, 0)
if err != nil {
c.logger.Error("发送指令失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "发送指令失败: "+err.Error())
@@ -65,6 +82,7 @@ func (c *Controller) SendCommand(ctx *gin.Context) {
DeviceID: req.DeviceID,
Command: req.Command,
Status: "sent",
Data: response.Data,
}
controller.SendSuccessResponse(ctx, "指令发送成功", data)

View File

@@ -77,6 +77,8 @@ func NewApplication(cfg *config.Config) *Application {
// 初始化WebSocket服务
websocketService := service.NewWebSocketService()
// 设置WebSocket超时时间
websocketService.SetDefaultTimeout(cfg.GetWebSocketTimeout())
// 初始化API组件
apiInstance := api.NewAPI(cfg, userRepo, operationHistoryRepo, deviceControlRepo, deviceRepo, websocketService)

View File

@@ -3,6 +3,7 @@
package service
import (
"context"
"encoding/json"
"fmt"
"sync"
@@ -52,6 +53,9 @@ type DeviceConnection struct {
// LastHeartbeat 最后心跳时间
LastHeartbeat time.Time
// ResponseChan 响应通道
ResponseChan chan *WebSocketMessage
}
// WebSocketService WebSocket服务
@@ -64,6 +68,9 @@ type WebSocketService struct {
// logger 日志记录器
logger *logs.Logger
// defaultTimeout 默认超时时间(秒)
defaultTimeout int
}
// NewWebSocketService 创建WebSocket服务实例
@@ -71,9 +78,15 @@ func NewWebSocketService() *WebSocketService {
return &WebSocketService{
connections: make(map[string]*DeviceConnection),
logger: logs.NewLogger(),
defaultTimeout: 5, // 默认5秒超时
}
}
// SetDefaultTimeout 设置默认超时时间
func (ws *WebSocketService) SetDefaultTimeout(timeout int) {
ws.defaultTimeout = timeout
}
// AddConnection 添加设备连接
func (ws *WebSocketService) AddConnection(deviceID string, conn *websocket.Conn) {
ws.mutex.Lock()
@@ -98,6 +111,16 @@ func (ws *WebSocketService) RemoveConnection(deviceID string) {
ws.logger.Info(fmt.Sprintf("设备 %s 已断开连接", deviceID))
}
// SetResponseHandler 设置响应处理器
func (ws *WebSocketService) SetResponseHandler(deviceID string, responseChan chan *WebSocketMessage) {
ws.mutex.Lock()
defer ws.mutex.Unlock()
if deviceConn, exists := ws.connections[deviceID]; exists {
deviceConn.ResponseChan = responseChan
}
}
// SendCommand 向指定设备发送指令
func (ws *WebSocketService) SendCommand(deviceID, command string, data interface{}) error {
ws.mutex.RLock()
@@ -124,6 +147,99 @@ func (ws *WebSocketService) SendCommand(deviceID, command string, data interface
return nil
}
// CommandResponse WebSocket命令响应结构体
type CommandResponse struct {
// DeviceID 设备ID
DeviceID string `json:"device_id,omitempty"`
// Command 命令名称
Command string `json:"command,omitempty"`
// Data 响应数据
Data interface{} `json:"data,omitempty"`
// Status 响应状态
Status string `json:"status,omitempty"`
// Message 响应消息
Message string `json:"message,omitempty"`
// Timestamp 时间戳
Timestamp time.Time `json:"timestamp"`
}
// ParseData 将响应数据解析到目标结构体
func (cr *CommandResponse) ParseData(target interface{}) error {
dataBytes, err := json.Marshal(cr.Data)
if err != nil {
return err
}
return json.Unmarshal(dataBytes, target)
}
// CommandResult WebSocket命令执行结果
type CommandResult struct {
// Response 响应消息
Response *CommandResponse
// Error 错误信息
Error error
}
// SendCommandAndWait 发送指令并等待响应
func (ws *WebSocketService) SendCommandAndWait(deviceID, command string, data interface{}, timeout int) (*CommandResponse, error) {
// 如果未指定超时时间,使用默认超时时间
if timeout <= 0 {
timeout = ws.defaultTimeout
}
// 创建用于接收响应的通道
responseChan := make(chan *WebSocketMessage, 1)
ws.SetResponseHandler(deviceID, responseChan)
// 发送指令
if err := ws.SendCommand(deviceID, command, data); err != nil {
return nil, fmt.Errorf("发送指令失败: %v", err)
}
// 等待设备响应,设置超时
var response *WebSocketMessage
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
defer cancel()
select {
case response = <-responseChan:
// 成功接收到响应
// 转换为CommandResponse结构体
commandResponse := &CommandResponse{
DeviceID: response.DeviceID,
Command: response.Command,
Data: response.Data,
Timestamp: response.Timestamp,
}
// 尝试提取状态和消息字段
if responseData, ok := response.Data.(map[string]interface{}); ok {
if status, exists := responseData["status"]; exists {
if statusStr, ok := status.(string); ok {
commandResponse.Status = statusStr
}
}
if message, exists := responseData["message"]; exists {
if messageStr, ok := message.(string); ok {
commandResponse.Message = messageStr
}
}
}
return commandResponse, nil
case <-ctx.Done():
// 超时处理
return nil, fmt.Errorf("等待设备响应超时")
}
}
// GetConnectedDevices 获取已连接的设备列表
func (ws *WebSocketService) GetConnectedDevices() []string {
ws.mutex.RLock()
@@ -154,6 +270,22 @@ func (ws *WebSocketService) HandleMessage(deviceID string, message []byte) error
ws.mutex.Unlock()
}
// 处理响应消息
if msg.Type == MessageTypeResponse {
ws.mutex.RLock()
if deviceConn, exists := ws.connections[deviceID]; exists && deviceConn.ResponseChan != nil {
// 发送响应到通道
select {
case deviceConn.ResponseChan <- &msg:
// 成功发送
default:
// 通道已满,丢弃消息
ws.logger.Warn(fmt.Sprintf("设备 %s 的响应通道已满,丢弃响应消息", deviceID))
}
}
ws.mutex.RUnlock()
}
// 记录消息日志
ws.logger.Info(fmt.Sprintf("收到来自设备 %s 的消息: %v", deviceID, msg))