Compare commits

..

2 Commits

Author SHA1 Message Date
00822427ca 实现在心跳中采集各设备信息 2025-09-09 11:51:13 +08:00
fc657d7448 增加心跳 2025-09-09 10:51:36 +08:00
13 changed files with 476 additions and 16 deletions

View File

@@ -103,6 +103,29 @@ ws://[server_address]:[port]/ws/device?device_id=[device_id]
- `command`: 指令名称,固定为"query_all_device_status"
- `timestamp`: 指令发送时间
### 3.4 心跳包指令
平台向中继设备发送心跳包指令,用于检测设备连接状态并获取下级设备状态信息。
**请求格式**
```json
{
"type": "command",
"command": "heartbeat",
"data": {
"timestamp": 1672545600
},
"timestamp": "2023-01-01T12:00:00Z"
}
```
**参数说明**
- `type`: 消息类型,固定为"command"
- `command`: 指令名称,固定为"heartbeat"
- `data`: 指令数据
- `timestamp`: 时间戳Unix时间戳格式
- `timestamp`: 指令发送时间
## 4. 响应接口
### 4.1 设备控制响应
@@ -167,6 +190,48 @@ ws://[server_address]:[port]/ws/device?device_id=[device_id]
}
```
### 4.4 心跳包响应
中继设备响应心跳包指令,返回自身及下级设备的状态信息。
**响应格式**
```json
{
"type": "response",
"command": "heartbeat",
"data": {
"devices": [
{
"device_id": "relay-001",
"device_type": "relay",
"status": "running"
},
{
"device_id": "fan-001",
"device_type": "fan",
"status": "running"
},
{
"device_id": "curtain-001",
"device_type": "water_curtain",
"status": "stopped"
}
]
},
"timestamp": "2023-01-01T12:00:05Z"
}
```
**参数说明**
- `type`: 消息类型,固定为"response"
- `command`: 指令名称,固定为"heartbeat"
- `data`: 响应数据
- `devices`: 设备列表
- `device_id`: 设备唯一标识符
- `device_type`: 设备类型
- `status`: 设备状态(如: running, stopped, online, offline等
- `timestamp`: 平台发送的时间戳, 需要原封不动的返回
## 5. 请求-响应机制
平台在发送指令后会等待中继设备的响应超时时间由配置文件决定默认为5秒。
@@ -277,6 +342,7 @@ if err := response.ParseData(&status); err != nil {
|------|------|
| fan | 风机设备 |
| water_curtain | 水帘设备 |
| relay | 中继设备 |
## 10. 动作说明
@@ -292,4 +358,7 @@ if err := response.ParseData(&status); err != nil {
| success | 操作成功 |
| failed | 操作失败 |
| running | 设备运行中 |
| stopped | 设备已停止 |
| stopped | 设备已停止 |
| online | 设备在线 |
| offline | 设备离线 |
| active | 设备激活 |

View File

@@ -3,4 +3,4 @@
1. websocket不是安全的wss
2. 添加设备时应该激活一下设备状态采集
3. 设备Model缺少硬件地址
4. 如果同时有两条请求发给同一个设备, 会不会导致接收到错误的回复

View File

@@ -25,4 +25,11 @@ database:
# WebSocket配置
websocket:
# WebSocket请求超时时间(秒)
timeout: 5
timeout: 5
# 心跳配置
heartbeat:
# 心跳间隔(秒)
interval: 30
# 请求并发数
concurrency: 5

3
go.mod
View File

@@ -6,6 +6,7 @@ require (
github.com/gin-gonic/gin v1.9.1
github.com/golang-jwt/jwt/v5 v5.0.0
github.com/gorilla/websocket v1.5.0
github.com/panjf2000/ants/v2 v2.11.3
golang.org/x/crypto v0.17.0
gopkg.in/yaml.v2 v2.4.0
gorm.io/driver/postgres v1.5.9
@@ -40,7 +41,7 @@ require (
github.com/ugorji/go/codec v1.2.11 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sync v0.11.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect

9
go.sum
View File

@@ -62,6 +62,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/panjf2000/ants/v2 v2.11.3 h1:AfI0ngBoXJmYOpDh9m516vjqoUu2sLrIVgppI9TZVpg=
github.com/panjf2000/ants/v2 v2.11.3/go.mod h1:8u92CYMUc6gyvTIw8Ru7Mt7+/ESnJahz5EVtqfrilek=
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -77,8 +79,9 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
@@ -90,8 +93,8 @@ golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=

View File

@@ -56,6 +56,9 @@ type API struct {
// websocketService WebSocket服务
websocketService *service.WebSocketService
// heartbeatService 心跳服务
heartbeatService *service.HeartbeatService
// deviceStatusPool 设备状态池
deviceStatusPool *service.DeviceStatusPool
@@ -65,7 +68,7 @@ type API struct {
// NewAPI 创建并返回一个新的API实例
// 初始化Gin引擎和相关配置
func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRepo repository.OperationHistoryRepo, deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketService *service.WebSocketService, deviceStatusPool *service.DeviceStatusPool) *API {
func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRepo repository.OperationHistoryRepo, deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketService *service.WebSocketService, heartbeatService *service.HeartbeatService, deviceStatusPool *service.DeviceStatusPool) *API {
// 设置Gin为发布模式
gin.SetMode(gin.DebugMode)
@@ -96,7 +99,7 @@ func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRe
operationController := operation.NewController(operationHistoryRepo)
// 创建设备控制控制器
deviceController := device.NewController(deviceControlRepo, deviceRepo, websocketService, deviceStatusPool)
deviceController := device.NewController(deviceControlRepo, deviceRepo, websocketService, heartbeatService, deviceStatusPool)
// 创建WebSocket管理器
websocketManager := websocket.NewManager(websocketService, deviceRepo)
@@ -117,6 +120,7 @@ func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRe
authMiddleware: authMiddleware,
websocketManager: websocketManager,
websocketService: websocketService,
heartbeatService: heartbeatService,
deviceStatusPool: deviceStatusPool,
logger: logs.NewLogger(),
}

View File

@@ -20,6 +20,9 @@ type Config struct {
// WebSocket WebSocket配置
WebSocket WebSocketConfig `yaml:"websocket"`
// Heartbeat 心跳配置
Heartbeat HeartbeatConfig `yaml:"heartbeat"`
}
// ServerConfig 代表服务器配置
@@ -76,12 +79,24 @@ type WebSocketConfig struct {
Timeout int `yaml:"timeout"`
}
// HeartbeatConfig 代表心跳配置
type HeartbeatConfig struct {
// Interval 心跳间隔(秒)
Interval int `yaml:"interval"`
// Concurrency 请求并发数
Concurrency int `yaml:"concurrency"`
}
// NewConfig 创建并返回一个新的配置实例
func NewConfig() *Config {
return &Config{
WebSocket: WebSocketConfig{
Timeout: 5, // 默认5秒超时
},
Heartbeat: HeartbeatConfig{
Interval: 30, // 默认30秒心跳间隔
},
}
}
@@ -122,3 +137,8 @@ func (c *Config) GetWebSocketTimeout() int {
}
return c.WebSocket.Timeout
}
// GetHeartbeatConfig 获取心跳配置
func (c *Config) GetHeartbeatConfig() HeartbeatConfig {
return c.Heartbeat
}

View File

@@ -122,16 +122,18 @@ type Controller struct {
deviceControlRepo repository.DeviceControlRepo
deviceRepo repository.DeviceRepo
websocketService *service.WebSocketService
heartbeatService *service.HeartbeatService
deviceStatusPool *service.DeviceStatusPool
logger *logs.Logger
}
// NewController 创建设备控制控制器实例
func NewController(deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketService *service.WebSocketService, deviceStatusPool *service.DeviceStatusPool) *Controller {
func NewController(deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketService *service.WebSocketService, heartbeatService *service.HeartbeatService, deviceStatusPool *service.DeviceStatusPool) *Controller {
return &Controller{
deviceControlRepo: deviceControlRepo,
deviceRepo: deviceRepo,
websocketService: websocketService,
heartbeatService: heartbeatService,
deviceStatusPool: deviceStatusPool,
logger: logs.NewLogger(),
}
@@ -199,6 +201,9 @@ func (c *Controller) Create(ctx *gin.Context) {
return
}
// 刷新设备状态
c.heartbeatService.TriggerManualHeartbeatAsync()
controller.SendSuccessResponse(ctx, "创建设备成功", device)
}
@@ -253,9 +258,6 @@ func (c *Controller) Update(ctx *gin.Context) {
req.BusNumber != nil && req.DeviceAddress != nil {
device.Set485Address(*req.BusNumber, *req.DeviceAddress)
}
// TODO: 设备状态应该由系统自动获取,而不是由用户指定
// 这里保持设备原有状态,后续需要实现自动状态检测
// 设备状态现在只在内存中维护,不持久化到数据库
if err := c.deviceRepo.Update(device); err != nil {
c.logger.Error("更新设备失败: " + err.Error())
@@ -263,6 +265,9 @@ func (c *Controller) Update(ctx *gin.Context) {
return
}
// 刷新设备状态
c.heartbeatService.TriggerManualHeartbeatAsync()
controller.SendSuccessResponse(ctx, "更新设备成功", device)
}
@@ -291,6 +296,9 @@ func (c *Controller) Delete(ctx *gin.Context) {
return
}
// 刷新设备状态
c.heartbeatService.TriggerManualHeartbeatAsync()
controller.SendSuccessResponse(ctx, "删除设备成功", nil)
}
@@ -397,6 +405,9 @@ func (c *Controller) Switch(ctx *gin.Context) {
Message: message,
}
// 刷新设备状态
c.heartbeatService.TriggerManualHeartbeatAsync()
controller.SendSuccessResponse(ctx, "设备控制成功", data)
}
@@ -474,6 +485,8 @@ func (c *Controller) GetDeviceStatus(ctx *gin.Context) {
return
}
// TODO 需要刷新设备状态吗? 刷新的话这个接口可能会很慢
// 从设备状态池中获取设备状态
status, exists := c.deviceStatusPool.GetStatus(deviceID)
if !exists {

View File

@@ -45,6 +45,9 @@ type Application struct {
// DeviceStatusPool 设备状态池实例
DeviceStatusPool *service.DeviceStatusPool
// HeartbeatService 心跳服务实例
HeartbeatService *service.HeartbeatService
// Config 应用配置
Config *config.Config
@@ -103,8 +106,11 @@ func (app *Application) Start() error {
// 设置WebSocket超时时间
app.WebSocketService.SetDefaultTimeout(app.Config.GetWebSocketTimeout())
// 初始化心跳服务
app.HeartbeatService = service.NewHeartbeatService(app.WebSocketService, app.DeviceStatusPool, app.DeviceRepo, app.Config)
// 初始化API组件
app.API = api.NewAPI(app.Config, app.UserRepo, app.OperationHistoryRepo, app.DeviceControlRepo, app.DeviceRepo, app.WebSocketService, app.DeviceStatusPool)
app.API = api.NewAPI(app.Config, app.UserRepo, app.OperationHistoryRepo, app.DeviceControlRepo, app.DeviceRepo, app.WebSocketService, app.HeartbeatService, app.DeviceStatusPool)
// 初始化任务执行器组件(使用5个工作协程)
app.TaskExecutor = task.NewExecutor(5)
@@ -119,6 +125,9 @@ func (app *Application) Start() error {
app.logger.Info("启动任务执行器")
app.TaskExecutor.Start()
// 启动心跳服务
app.logger.Info("启动心跳服务")
app.HeartbeatService.Start()
return nil
}
@@ -134,6 +143,10 @@ func (app *Application) Stop() error {
app.logger.Info("停止任务执行器")
app.TaskExecutor.Stop()
// 停止心跳服务
app.logger.Info("停止心跳服务")
app.HeartbeatService.Stop()
// 停止存储组件
if err := app.Storage.Disconnect(); err != nil {
return fmt.Errorf("存储断开连接失败: %v", err)

View File

@@ -65,3 +65,19 @@ func (dsp *DeviceStatusPool) GetAllStatuses() map[string]*DeviceStatus {
return result
}
// SetAllStatuses 全量更新设备状态池
func (dsp *DeviceStatusPool) SetAllStatuses(statuses map[string]*DeviceStatus) {
dsp.mutex.Lock()
defer dsp.mutex.Unlock()
// 清空现有状态
for id := range dsp.statuses {
delete(dsp.statuses, id)
}
// 添加新状态
for id, status := range statuses {
dsp.statuses[id] = status
}
}

View File

@@ -0,0 +1,296 @@
// Package service 提供各种业务服务功能
package service
import (
"context"
"errors"
"fmt"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/config"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/model"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
"github.com/panjf2000/ants/v2"
)
// HeartbeatService 心跳服务,负责管理设备的心跳检测
type HeartbeatService struct {
// websocketService WebSocket服务
websocketService *WebSocketService
// deviceStatusPool 设备状态池
deviceStatusPool *DeviceStatusPool
// deviceRepo 设备仓库
deviceRepo repository.DeviceRepo
// logger 日志记录器
logger *logs.Logger
// 心跳间隔
heartbeatInterval time.Duration
// 手动心跳触发器
triggerChan chan struct{}
// ticker 心跳定时器
ticker *time.Ticker
// poolSize 线程池大小
poolSize int
// pool 线程池
pool *ants.Pool
// ctx 上下文
ctx context.Context
// cancel 取消函数
cancel context.CancelFunc
}
// NewHeartbeatService 创建心跳服务实例
func NewHeartbeatService(websocketService *WebSocketService, deviceStatusPool *DeviceStatusPool, deviceRepo repository.DeviceRepo, config *config.Config) *HeartbeatService {
interval := config.GetHeartbeatConfig().Interval
if interval <= 0 {
interval = 30 // 默认30秒心跳间隔
}
concurrency := config.GetHeartbeatConfig().Concurrency
if concurrency <= 0 {
concurrency = 10 // 默认10个并发
}
return &HeartbeatService{
websocketService: websocketService,
deviceStatusPool: deviceStatusPool,
deviceRepo: deviceRepo,
logger: logs.NewLogger(),
heartbeatInterval: time.Duration(interval) * time.Second,
poolSize: concurrency,
triggerChan: make(chan struct{}),
}
}
// Start 启动心跳服务
func (hs *HeartbeatService) Start() {
// 创建上下文
ctx, cancel := context.WithCancel(context.Background())
hs.cancel = cancel
// 创建定时器
hs.logger.Info(fmt.Sprintf("设置心跳间隔为 %d 秒", int(hs.heartbeatInterval.Seconds())))
hs.ticker = time.NewTicker(hs.heartbeatInterval)
// 创建线程池
hs.pool, _ = ants.NewPool(hs.poolSize)
// 启动心跳goroutine
go func() {
for {
select {
case <-hs.ticker.C:
hs.handleHeartbeatAll()
case <-hs.triggerChan:
hs.handleHeartbeatAll()
case <-ctx.Done():
hs.logger.Info("心跳服务已停止")
return
}
}
}()
hs.logger.Info("心跳服务已启动")
}
// Stop 停止心跳服务
func (hs *HeartbeatService) Stop() {
if hs == nil {
return
}
if hs.ticker != nil {
hs.ticker.Stop()
}
if hs.cancel != nil {
hs.cancel()
}
if hs.pool != nil {
hs.pool.Release()
}
hs.logger.Info("[Heartbeat] 心跳任务停止指令已发送")
}
// TriggerManualHeartbeat 手动触发心跳检测
func (hs *HeartbeatService) TriggerManualHeartbeat() {
hs.logger.Info("收到手动触发心跳检测请求")
hs.triggerChan <- struct{}{}
hs.logger.Info("手动心跳检测完成")
}
// TriggerManualHeartbeatAsync 手动触发心跳检测且不等待检测结果
func (hs *HeartbeatService) TriggerManualHeartbeatAsync() {
hs.logger.Info("收到手动触发异步心跳检测请求")
go func() {
hs.triggerChan <- struct{}{}
hs.logger.Info("手动心跳检测完成")
}()
}
// sendHeartbeatAll 发送心跳包到所有中继设备
func (hs *HeartbeatService) handleHeartbeatAll() {
// 记录心跳开始日志
hs.logger.Debug("开始发送心跳包")
// 获取所有中继设备
relays, err := hs.deviceRepo.ListAll()
if err != nil {
hs.logger.Error("获取设备列表失败: " + err.Error())
return
}
// 创建线程安全的临时map用于保存所有设备状态
tempStatusMap := &TempStatusMap{
data: make(map[string]*DeviceStatus),
mu: sync.RWMutex{},
}
// 遍历所有连接的设备并发送心跳包
wg := sync.WaitGroup{}
for _, relay := range relays {
// 心跳包之发送给中继设备
if relay.Type != model.DeviceTypeRelay {
continue
}
id := fmt.Sprintf("%v", relay.ID)
name := relay.Name
wg.Add(1)
err := hs.pool.Submit(func() {
defer wg.Done()
err := hs.handleHeartbeatWithStatus(id, tempStatusMap)
if err != nil {
hs.logger.Error("[Heartbeat] 向设备 " + name + "(id:" + id + ") 发送心跳包失败: " + err.Error())
}
})
if err != nil {
hs.logger.Error("向设备 " + name + "(id:" + id + ") 发送心跳包失败(线程池异常): " + err.Error())
}
}
wg.Wait()
// 获取所有设备列表
allDevices, err := hs.deviceRepo.ListAll()
if err != nil {
hs.logger.Error("获取所有设备列表失败: " + err.Error())
return
}
// 补齐临时map中缺失的设备缺失的设备全部设为离线状态
tempStatusMap.mu.Lock()
for _, device := range allDevices {
id := fmt.Sprintf("%v", device.ID)
if _, exists := tempStatusMap.data[id]; !exists {
tempStatusMap.data[id] = &DeviceStatus{
Active: false,
}
}
}
tempStatusMap.mu.Unlock()
// 将临时状态更新到全局状态池
hs.deviceStatusPool.SetAllStatuses(tempStatusMap.data)
hs.logger.Debug("心跳包发送完成")
}
// TempStatusMap 线程安全的临时状态映射
type TempStatusMap struct {
data map[string]*DeviceStatus
mu sync.RWMutex
}
// SetStatus 设置设备状态
func (tsm *TempStatusMap) SetStatus(deviceID string, status *DeviceStatus) {
tsm.mu.Lock()
defer tsm.mu.Unlock()
tsm.data[deviceID] = status
}
// GetStatus 获取设备状态
func (tsm *TempStatusMap) GetStatus(deviceID string) (*DeviceStatus, bool) {
tsm.mu.RLock()
defer tsm.mu.RUnlock()
status, exists := tsm.data[deviceID]
return status, exists
}
// sendHeartbeat 发送心跳包到所有中继设备
func (hs *HeartbeatService) handleHeartbeatWithStatus(deviceID string, tempStatusMap *TempStatusMap) error {
// 构造带时间戳的心跳包数据
heartbeatData := map[string]interface{}{
"timestamp": time.Now().Unix(),
}
// 发送心跳包到设备
response, err := hs.websocketService.SendCommandAndWait(deviceID, "heartbeat", heartbeatData, 0)
if err != nil {
hs.logger.Error(fmt.Sprintf("向设备 %s 发送心跳包失败: %v", deviceID, err))
// 更新设备状态为离线
tempStatusMap.SetStatus(deviceID, &DeviceStatus{
Active: false,
})
return err
}
// 记录收到心跳响应
hs.logger.Debug(fmt.Sprintf("收到来自设备 %s 的心跳响应: %+v", deviceID, response))
// 有响应中继设备就是在线
tempStatusMap.SetStatus(deviceID, &DeviceStatus{
Active: true,
})
// 时间戳校验
if response.Timestamp != heartbeatData["timestamp"] {
hs.logger.Error(fmt.Sprintf("心跳响应时间戳校验失败: %v , 响应时间戳应当与发送的时间戳一致", response))
return errors.New("心跳响应时间戳校验失败")
}
// 解析响应中的下级设备状态
type DeviceStatusInfo struct {
DeviceID string `json:"device_id"`
DeviceType string `json:"device_type"`
Status string `json:"status"`
}
type HeartbeatResponseData struct {
Devices []DeviceStatusInfo `json:"devices"`
}
var responseData HeartbeatResponseData
if err := response.ParseData(&responseData); err != nil {
hs.logger.Error(fmt.Sprintf("解析设备 %s 的心跳响应数据失败: %v", deviceID, err))
return err
}
// 更新所有下级设备的状态
for _, device := range responseData.Devices {
// 根据设备状态确定Active值
isActive := device.Status == "running" || device.Status == "online" || device.Status == "active"
tempStatusMap.SetStatus(device.DeviceID, &DeviceStatus{
Active: isActive,
})
}
return nil
}

View File

@@ -34,6 +34,9 @@ type DeviceRepo interface {
// ListAll 获取所有设备列表
ListAll() ([]model.Device, error)
// FindRelayDevices 获取所有中继设备
FindRelayDevices() ([]*model.Device, error)
}
// DeviceControlRepo 设备控制仓库接口
@@ -135,6 +138,16 @@ func (r *deviceRepo) FindByType(deviceType model.DeviceType) ([]*model.Device, e
return devices, nil
}
// FindRelayDevices 获取所有中继设备
func (r *deviceRepo) FindRelayDevices() ([]*model.Device, error) {
var devices []*model.Device
result := r.db.Where("type = ?", model.DeviceTypeRelay).Find(&devices)
if result.Error != nil {
return nil, result.Error
}
return devices, nil
}
// Update 更新设备信息
func (r *deviceRepo) Update(device *model.Device) error {
result := r.db.Save(device)

9
vendor/modules.txt vendored
View File

@@ -118,6 +118,10 @@ github.com/modern-go/concurrent
# github.com/modern-go/reflect2 v1.0.2
## explicit; go 1.12
github.com/modern-go/reflect2
# github.com/panjf2000/ants/v2 v2.11.3
## explicit; go 1.18
github.com/panjf2000/ants/v2
github.com/panjf2000/ants/v2/pkg/sync
# github.com/pelletier/go-toml/v2 v2.0.8
## explicit; go 1.16
github.com/pelletier/go-toml/v2
@@ -167,8 +171,9 @@ golang.org/x/net/http2
golang.org/x/net/http2/h2c
golang.org/x/net/http2/hpack
golang.org/x/net/idna
# golang.org/x/sync v0.1.0
## explicit
# golang.org/x/sync v0.11.0
## explicit; go 1.18
golang.org/x/sync/errgroup
golang.org/x/sync/semaphore
# golang.org/x/sys v0.26.0
## explicit; go 1.18