diff --git a/RELAY_API.md b/RELAY_API.md index 2c0c1d6..fb799f5 100644 --- a/RELAY_API.md +++ b/RELAY_API.md @@ -103,6 +103,29 @@ ws://[server_address]:[port]/ws/device?device_id=[device_id] - `command`: 指令名称,固定为"query_all_device_status" - `timestamp`: 指令发送时间 +### 3.4 心跳包指令 + +平台向中继设备发送心跳包指令,用于检测设备连接状态并获取下级设备状态信息。 + +**请求格式** +```json +{ + "type": "command", + "command": "heartbeat", + "data": { + "timestamp": 1672545600 + }, + "timestamp": "2023-01-01T12:00:00Z" +} +``` + +**参数说明** +- `type`: 消息类型,固定为"command" +- `command`: 指令名称,固定为"heartbeat" +- `data`: 指令数据 + - `timestamp`: 时间戳(Unix时间戳格式) +- `timestamp`: 指令发送时间 + ## 4. 响应接口 ### 4.1 设备控制响应 @@ -167,6 +190,48 @@ ws://[server_address]:[port]/ws/device?device_id=[device_id] } ``` +### 4.4 心跳包响应 + +中继设备响应心跳包指令,返回自身及下级设备的状态信息。 + +**响应格式** +```json +{ + "type": "response", + "command": "heartbeat", + "data": { + "devices": [ + { + "device_id": "relay-001", + "device_type": "relay", + "status": "running" + }, + { + "device_id": "fan-001", + "device_type": "fan", + "status": "running" + }, + { + "device_id": "curtain-001", + "device_type": "water_curtain", + "status": "stopped" + } + ] + }, + "timestamp": "2023-01-01T12:00:05Z" +} +``` + +**参数说明** +- `type`: 消息类型,固定为"response" +- `command`: 指令名称,固定为"heartbeat" +- `data`: 响应数据 + - `devices`: 设备列表 + - `device_id`: 设备唯一标识符 + - `device_type`: 设备类型 + - `status`: 设备状态(如: running, stopped, online, offline等) +- `timestamp`: 平台发送的时间戳, 需要原封不动的返回 + ## 5. 请求-响应机制 平台在发送指令后会等待中继设备的响应,超时时间由配置文件决定,默认为5秒。 @@ -277,6 +342,7 @@ if err := response.ParseData(&status); err != nil { |------|------| | fan | 风机设备 | | water_curtain | 水帘设备 | +| relay | 中继设备 | ## 10. 动作说明 @@ -292,4 +358,7 @@ if err := response.ParseData(&status); err != nil { | success | 操作成功 | | failed | 操作失败 | | running | 设备运行中 | -| stopped | 设备已停止 | \ No newline at end of file +| stopped | 设备已停止 | +| online | 设备在线 | +| offline | 设备离线 | +| active | 设备激活 | \ No newline at end of file diff --git a/TODO-List b/TODO-List index 07243fc..89dd658 100644 --- a/TODO-List +++ b/TODO-List @@ -3,4 +3,4 @@ 1. websocket不是安全的wss 2. 添加设备时应该激活一下设备状态采集 3. 设备Model缺少硬件地址 - +4. 如果同时有两条请求发给同一个设备, 会不会导致接收到错误的回复 diff --git a/config.yml b/config.yml index 7e26f21..a635039 100644 --- a/config.yml +++ b/config.yml @@ -30,4 +30,6 @@ websocket: # 心跳配置 heartbeat: # 心跳间隔(秒) - interval: 30 \ No newline at end of file + interval: 30 + # 请求并发数 + concurrency: 5 \ No newline at end of file diff --git a/go.mod b/go.mod index 2b9514b..65476bb 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/gin-gonic/gin v1.9.1 github.com/golang-jwt/jwt/v5 v5.0.0 github.com/gorilla/websocket v1.5.0 + github.com/panjf2000/ants/v2 v2.11.3 golang.org/x/crypto v0.17.0 gopkg.in/yaml.v2 v2.4.0 gorm.io/driver/postgres v1.5.9 @@ -40,7 +41,7 @@ require ( github.com/ugorji/go/codec v1.2.11 // indirect golang.org/x/arch v0.3.0 // indirect golang.org/x/net v0.10.0 // indirect - golang.org/x/sync v0.1.0 // indirect + golang.org/x/sync v0.11.0 // indirect golang.org/x/sys v0.26.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/protobuf v1.30.0 // indirect diff --git a/go.sum b/go.sum index e902af7..0c21a1e 100644 --- a/go.sum +++ b/go.sum @@ -62,6 +62,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/panjf2000/ants/v2 v2.11.3 h1:AfI0ngBoXJmYOpDh9m516vjqoUu2sLrIVgppI9TZVpg= +github.com/panjf2000/ants/v2 v2.11.3/go.mod h1:8u92CYMUc6gyvTIw8Ru7Mt7+/ESnJahz5EVtqfrilek= github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -77,8 +79,9 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU= @@ -90,8 +93,8 @@ golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= +golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= diff --git a/internal/config/config.go b/internal/config/config.go index 32751c1..ca8e74a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -83,6 +83,9 @@ type WebSocketConfig struct { type HeartbeatConfig struct { // Interval 心跳间隔(秒) Interval int `yaml:"interval"` + + // Concurrency 请求并发数 + Concurrency int `yaml:"concurrency"` } // NewConfig 创建并返回一个新的配置实例 @@ -135,10 +138,7 @@ func (c *Config) GetWebSocketTimeout() int { return c.WebSocket.Timeout } -// GetHeartbeatInterval 获取心跳间隔(秒) -func (c *Config) GetHeartbeatInterval() int { - if c.Heartbeat.Interval <= 0 { - return 30 // 默认30秒心跳间隔 - } - return c.Heartbeat.Interval +// GetHeartbeatConfig 获取心跳配置 +func (c *Config) GetHeartbeatConfig() HeartbeatConfig { + return c.Heartbeat } diff --git a/internal/service/device_status.go b/internal/service/device_status.go index 807124f..05e5c0e 100644 --- a/internal/service/device_status.go +++ b/internal/service/device_status.go @@ -65,3 +65,19 @@ func (dsp *DeviceStatusPool) GetAllStatuses() map[string]*DeviceStatus { return result } + +// SetAllStatuses 全量更新设备状态池 +func (dsp *DeviceStatusPool) SetAllStatuses(statuses map[string]*DeviceStatus) { + dsp.mutex.Lock() + defer dsp.mutex.Unlock() + + // 清空现有状态 + for id := range dsp.statuses { + delete(dsp.statuses, id) + } + + // 添加新状态 + for id, status := range statuses { + dsp.statuses[id] = status + } +} diff --git a/internal/service/heartbeat.go b/internal/service/heartbeat.go index 4d3232a..885139a 100644 --- a/internal/service/heartbeat.go +++ b/internal/service/heartbeat.go @@ -3,12 +3,16 @@ package service import ( "context" + "errors" "fmt" + "sync" "time" "git.huangwc.com/pig/pig-farm-controller/internal/config" "git.huangwc.com/pig/pig-farm-controller/internal/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/model" "git.huangwc.com/pig/pig-farm-controller/internal/storage/repository" + "github.com/panjf2000/ants/v2" ) // HeartbeatService 心跳服务,负责管理设备的心跳检测 @@ -34,6 +38,12 @@ type HeartbeatService struct { // ticker 心跳定时器 ticker *time.Ticker + // poolSize 线程池大小 + poolSize int + + // pool 线程池 + pool *ants.Pool + // ctx 上下文 ctx context.Context @@ -43,12 +53,24 @@ type HeartbeatService struct { // NewHeartbeatService 创建心跳服务实例 func NewHeartbeatService(websocketService *WebSocketService, deviceStatusPool *DeviceStatusPool, deviceRepo repository.DeviceRepo, config *config.Config) *HeartbeatService { + + interval := config.GetHeartbeatConfig().Interval + if interval <= 0 { + interval = 30 // 默认30秒心跳间隔 + } + + concurrency := config.GetHeartbeatConfig().Concurrency + if concurrency <= 0 { + concurrency = 10 // 默认10个并发 + } + return &HeartbeatService{ websocketService: websocketService, deviceStatusPool: deviceStatusPool, deviceRepo: deviceRepo, logger: logs.NewLogger(), - heartbeatInterval: time.Duration(config.GetHeartbeatInterval()) * time.Second, + heartbeatInterval: time.Duration(interval) * time.Second, + poolSize: concurrency, triggerChan: make(chan struct{}), } } @@ -63,14 +85,17 @@ func (hs *HeartbeatService) Start() { hs.logger.Info(fmt.Sprintf("设置心跳间隔为 %d 秒", int(hs.heartbeatInterval.Seconds()))) hs.ticker = time.NewTicker(hs.heartbeatInterval) + // 创建线程池 + hs.pool, _ = ants.NewPool(hs.poolSize) + // 启动心跳goroutine go func() { for { select { case <-hs.ticker.C: - hs.handleHeartbeat() + hs.handleHeartbeatAll() case <-hs.triggerChan: - hs.handleHeartbeat() + hs.handleHeartbeatAll() case <-ctx.Done(): hs.logger.Info("心跳服务已停止") return @@ -95,6 +120,10 @@ func (hs *HeartbeatService) Stop() { hs.cancel() } + if hs.pool != nil { + hs.pool.Release() + } + hs.logger.Info("[Heartbeat] 心跳任务停止指令已发送") } @@ -114,35 +143,154 @@ func (hs *HeartbeatService) TriggerManualHeartbeatAsync() { }() } -// sendHeartbeat 发送心跳包到所有中继设备 -func (hs *HeartbeatService) handleHeartbeat() { +// sendHeartbeatAll 发送心跳包到所有中继设备 +func (hs *HeartbeatService) handleHeartbeatAll() { // 记录心跳开始日志 hs.logger.Debug("开始发送心跳包") - // 获取所有已连接的设备 - connectedDevices := hs.websocketService.GetConnectedDevices() + // 获取所有中继设备 + relays, err := hs.deviceRepo.ListAll() + if err != nil { + hs.logger.Error("获取设备列表失败: " + err.Error()) + return + } + + // 创建线程安全的临时map用于保存所有设备状态 + tempStatusMap := &TempStatusMap{ + data: make(map[string]*DeviceStatus), + mu: sync.RWMutex{}, + } // 遍历所有连接的设备并发送心跳包 - for _, deviceID := range connectedDevices { - // 发送心跳包到设备 - response, err := hs.websocketService.SendCommandAndWait(deviceID, "heartbeat", nil, 0) - if err != nil { - hs.logger.Error(fmt.Sprintf("向设备 %s 发送心跳包失败: %v", deviceID, err)) - // 更新设备状态为离线 - hs.deviceStatusPool.SetStatus(deviceID, &DeviceStatus{ - Active: false, - }) + wg := sync.WaitGroup{} + for _, relay := range relays { + // 心跳包之发送给中继设备 + if relay.Type != model.DeviceTypeRelay { continue } - // 记录收到心跳响应 - hs.logger.Debug(fmt.Sprintf("收到来自设备 %s 的心跳响应: %+v", deviceID, response)) + id := fmt.Sprintf("%v", relay.ID) + name := relay.Name - // 更新设备状态为在线 - hs.deviceStatusPool.SetStatus(deviceID, &DeviceStatus{ - Active: true, + wg.Add(1) + err := hs.pool.Submit(func() { + defer wg.Done() + + err := hs.handleHeartbeatWithStatus(id, tempStatusMap) + if err != nil { + hs.logger.Error("[Heartbeat] 向设备 " + name + "(id:" + id + ") 发送心跳包失败: " + err.Error()) + } }) + if err != nil { + hs.logger.Error("向设备 " + name + "(id:" + id + ") 发送心跳包失败(线程池异常): " + err.Error()) + } } + wg.Wait() + + // 获取所有设备列表 + allDevices, err := hs.deviceRepo.ListAll() + if err != nil { + hs.logger.Error("获取所有设备列表失败: " + err.Error()) + return + } + + // 补齐临时map中缺失的设备,缺失的设备全部设为离线状态 + tempStatusMap.mu.Lock() + for _, device := range allDevices { + id := fmt.Sprintf("%v", device.ID) + if _, exists := tempStatusMap.data[id]; !exists { + tempStatusMap.data[id] = &DeviceStatus{ + Active: false, + } + } + } + tempStatusMap.mu.Unlock() + + // 将临时状态更新到全局状态池 + hs.deviceStatusPool.SetAllStatuses(tempStatusMap.data) hs.logger.Debug("心跳包发送完成") } + +// TempStatusMap 线程安全的临时状态映射 +type TempStatusMap struct { + data map[string]*DeviceStatus + mu sync.RWMutex +} + +// SetStatus 设置设备状态 +func (tsm *TempStatusMap) SetStatus(deviceID string, status *DeviceStatus) { + tsm.mu.Lock() + defer tsm.mu.Unlock() + tsm.data[deviceID] = status +} + +// GetStatus 获取设备状态 +func (tsm *TempStatusMap) GetStatus(deviceID string) (*DeviceStatus, bool) { + tsm.mu.RLock() + defer tsm.mu.RUnlock() + status, exists := tsm.data[deviceID] + return status, exists +} + +// sendHeartbeat 发送心跳包到所有中继设备 +func (hs *HeartbeatService) handleHeartbeatWithStatus(deviceID string, tempStatusMap *TempStatusMap) error { + // 构造带时间戳的心跳包数据 + heartbeatData := map[string]interface{}{ + "timestamp": time.Now().Unix(), + } + + // 发送心跳包到设备 + response, err := hs.websocketService.SendCommandAndWait(deviceID, "heartbeat", heartbeatData, 0) + if err != nil { + hs.logger.Error(fmt.Sprintf("向设备 %s 发送心跳包失败: %v", deviceID, err)) + // 更新设备状态为离线 + tempStatusMap.SetStatus(deviceID, &DeviceStatus{ + Active: false, + }) + return err + } + + // 记录收到心跳响应 + hs.logger.Debug(fmt.Sprintf("收到来自设备 %s 的心跳响应: %+v", deviceID, response)) + + // 有响应中继设备就是在线 + tempStatusMap.SetStatus(deviceID, &DeviceStatus{ + Active: true, + }) + + // 时间戳校验 + if response.Timestamp != heartbeatData["timestamp"] { + hs.logger.Error(fmt.Sprintf("心跳响应时间戳校验失败: %v , 响应时间戳应当与发送的时间戳一致", response)) + return errors.New("心跳响应时间戳校验失败") + } + + // 解析响应中的下级设备状态 + type DeviceStatusInfo struct { + DeviceID string `json:"device_id"` + DeviceType string `json:"device_type"` + Status string `json:"status"` + } + + type HeartbeatResponseData struct { + Devices []DeviceStatusInfo `json:"devices"` + } + + var responseData HeartbeatResponseData + if err := response.ParseData(&responseData); err != nil { + hs.logger.Error(fmt.Sprintf("解析设备 %s 的心跳响应数据失败: %v", deviceID, err)) + return err + } + + // 更新所有下级设备的状态 + for _, device := range responseData.Devices { + // 根据设备状态确定Active值 + isActive := device.Status == "running" || device.Status == "online" || device.Status == "active" + + tempStatusMap.SetStatus(device.DeviceID, &DeviceStatus{ + Active: isActive, + }) + } + + return nil +} diff --git a/internal/storage/repository/device.go b/internal/storage/repository/device.go index d984c3b..d1f64db 100644 --- a/internal/storage/repository/device.go +++ b/internal/storage/repository/device.go @@ -34,6 +34,9 @@ type DeviceRepo interface { // ListAll 获取所有设备列表 ListAll() ([]model.Device, error) + + // FindRelayDevices 获取所有中继设备 + FindRelayDevices() ([]*model.Device, error) } // DeviceControlRepo 设备控制仓库接口 @@ -135,6 +138,16 @@ func (r *deviceRepo) FindByType(deviceType model.DeviceType) ([]*model.Device, e return devices, nil } +// FindRelayDevices 获取所有中继设备 +func (r *deviceRepo) FindRelayDevices() ([]*model.Device, error) { + var devices []*model.Device + result := r.db.Where("type = ?", model.DeviceTypeRelay).Find(&devices) + if result.Error != nil { + return nil, result.Error + } + return devices, nil +} + // Update 更新设备信息 func (r *deviceRepo) Update(device *model.Device) error { result := r.db.Save(device) diff --git a/vendor/modules.txt b/vendor/modules.txt index 3b62d62..043e0a9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -118,6 +118,10 @@ github.com/modern-go/concurrent # github.com/modern-go/reflect2 v1.0.2 ## explicit; go 1.12 github.com/modern-go/reflect2 +# github.com/panjf2000/ants/v2 v2.11.3 +## explicit; go 1.18 +github.com/panjf2000/ants/v2 +github.com/panjf2000/ants/v2/pkg/sync # github.com/pelletier/go-toml/v2 v2.0.8 ## explicit; go 1.16 github.com/pelletier/go-toml/v2 @@ -167,8 +171,9 @@ golang.org/x/net/http2 golang.org/x/net/http2/h2c golang.org/x/net/http2/hpack golang.org/x/net/idna -# golang.org/x/sync v0.1.0 -## explicit +# golang.org/x/sync v0.11.0 +## explicit; go 1.18 +golang.org/x/sync/errgroup golang.org/x/sync/semaphore # golang.org/x/sys v0.26.0 ## explicit; go 1.18