From fc657d7448aa174e5bf4e94f1d5001d13643df2e Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Tue, 9 Sep 2025 10:51:36 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=BF=83=E8=B7=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.yml | 7 +- internal/api/api.go | 8 +- internal/config/config.go | 20 ++++ internal/controller/device/device.go | 21 +++- internal/core/application.go | 15 ++- internal/service/heartbeat.go | 148 +++++++++++++++++++++++++++ 6 files changed, 211 insertions(+), 8 deletions(-) create mode 100644 internal/service/heartbeat.go diff --git a/config.yml b/config.yml index af9e52d..7e26f21 100644 --- a/config.yml +++ b/config.yml @@ -25,4 +25,9 @@ database: # WebSocket配置 websocket: # WebSocket请求超时时间(秒) - timeout: 5 \ No newline at end of file + timeout: 5 + +# 心跳配置 +heartbeat: + # 心跳间隔(秒) + interval: 30 \ No newline at end of file diff --git a/internal/api/api.go b/internal/api/api.go index e1787b7..e9aa30d 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -56,6 +56,9 @@ type API struct { // websocketService WebSocket服务 websocketService *service.WebSocketService + // heartbeatService 心跳服务 + heartbeatService *service.HeartbeatService + // deviceStatusPool 设备状态池 deviceStatusPool *service.DeviceStatusPool @@ -65,7 +68,7 @@ type API struct { // NewAPI 创建并返回一个新的API实例 // 初始化Gin引擎和相关配置 -func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRepo repository.OperationHistoryRepo, deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketService *service.WebSocketService, deviceStatusPool *service.DeviceStatusPool) *API { +func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRepo repository.OperationHistoryRepo, deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketService *service.WebSocketService, heartbeatService *service.HeartbeatService, deviceStatusPool *service.DeviceStatusPool) *API { // 设置Gin为发布模式 gin.SetMode(gin.DebugMode) @@ -96,7 +99,7 @@ func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRe operationController := operation.NewController(operationHistoryRepo) // 创建设备控制控制器 - deviceController := device.NewController(deviceControlRepo, deviceRepo, websocketService, deviceStatusPool) + deviceController := device.NewController(deviceControlRepo, deviceRepo, websocketService, heartbeatService, deviceStatusPool) // 创建WebSocket管理器 websocketManager := websocket.NewManager(websocketService, deviceRepo) @@ -117,6 +120,7 @@ func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRe authMiddleware: authMiddleware, websocketManager: websocketManager, websocketService: websocketService, + heartbeatService: heartbeatService, deviceStatusPool: deviceStatusPool, logger: logs.NewLogger(), } diff --git a/internal/config/config.go b/internal/config/config.go index e6db5b1..32751c1 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -20,6 +20,9 @@ type Config struct { // WebSocket WebSocket配置 WebSocket WebSocketConfig `yaml:"websocket"` + + // Heartbeat 心跳配置 + Heartbeat HeartbeatConfig `yaml:"heartbeat"` } // ServerConfig 代表服务器配置 @@ -76,12 +79,21 @@ type WebSocketConfig struct { Timeout int `yaml:"timeout"` } +// HeartbeatConfig 代表心跳配置 +type HeartbeatConfig struct { + // Interval 心跳间隔(秒) + Interval int `yaml:"interval"` +} + // NewConfig 创建并返回一个新的配置实例 func NewConfig() *Config { return &Config{ WebSocket: WebSocketConfig{ Timeout: 5, // 默认5秒超时 }, + Heartbeat: HeartbeatConfig{ + Interval: 30, // 默认30秒心跳间隔 + }, } } @@ -122,3 +134,11 @@ func (c *Config) GetWebSocketTimeout() int { } return c.WebSocket.Timeout } + +// GetHeartbeatInterval 获取心跳间隔(秒) +func (c *Config) GetHeartbeatInterval() int { + if c.Heartbeat.Interval <= 0 { + return 30 // 默认30秒心跳间隔 + } + return c.Heartbeat.Interval +} diff --git a/internal/controller/device/device.go b/internal/controller/device/device.go index 0892e57..9b30e5f 100644 --- a/internal/controller/device/device.go +++ b/internal/controller/device/device.go @@ -122,16 +122,18 @@ type Controller struct { deviceControlRepo repository.DeviceControlRepo deviceRepo repository.DeviceRepo websocketService *service.WebSocketService + heartbeatService *service.HeartbeatService deviceStatusPool *service.DeviceStatusPool logger *logs.Logger } // NewController 创建设备控制控制器实例 -func NewController(deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketService *service.WebSocketService, deviceStatusPool *service.DeviceStatusPool) *Controller { +func NewController(deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketService *service.WebSocketService, heartbeatService *service.HeartbeatService, deviceStatusPool *service.DeviceStatusPool) *Controller { return &Controller{ deviceControlRepo: deviceControlRepo, deviceRepo: deviceRepo, websocketService: websocketService, + heartbeatService: heartbeatService, deviceStatusPool: deviceStatusPool, logger: logs.NewLogger(), } @@ -199,6 +201,9 @@ func (c *Controller) Create(ctx *gin.Context) { return } + // 刷新设备状态 + c.heartbeatService.TriggerManualHeartbeatAsync() + controller.SendSuccessResponse(ctx, "创建设备成功", device) } @@ -253,9 +258,6 @@ func (c *Controller) Update(ctx *gin.Context) { req.BusNumber != nil && req.DeviceAddress != nil { device.Set485Address(*req.BusNumber, *req.DeviceAddress) } - // TODO: 设备状态应该由系统自动获取,而不是由用户指定 - // 这里保持设备原有状态,后续需要实现自动状态检测 - // 设备状态现在只在内存中维护,不持久化到数据库 if err := c.deviceRepo.Update(device); err != nil { c.logger.Error("更新设备失败: " + err.Error()) @@ -263,6 +265,9 @@ func (c *Controller) Update(ctx *gin.Context) { return } + // 刷新设备状态 + c.heartbeatService.TriggerManualHeartbeatAsync() + controller.SendSuccessResponse(ctx, "更新设备成功", device) } @@ -291,6 +296,9 @@ func (c *Controller) Delete(ctx *gin.Context) { return } + // 刷新设备状态 + c.heartbeatService.TriggerManualHeartbeatAsync() + controller.SendSuccessResponse(ctx, "删除设备成功", nil) } @@ -397,6 +405,9 @@ func (c *Controller) Switch(ctx *gin.Context) { Message: message, } + // 刷新设备状态 + c.heartbeatService.TriggerManualHeartbeatAsync() + controller.SendSuccessResponse(ctx, "设备控制成功", data) } @@ -474,6 +485,8 @@ func (c *Controller) GetDeviceStatus(ctx *gin.Context) { return } + // TODO 需要刷新设备状态吗? 刷新的话这个接口可能会很慢 + // 从设备状态池中获取设备状态 status, exists := c.deviceStatusPool.GetStatus(deviceID) if !exists { diff --git a/internal/core/application.go b/internal/core/application.go index 9bfb42e..3da5336 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -45,6 +45,9 @@ type Application struct { // DeviceStatusPool 设备状态池实例 DeviceStatusPool *service.DeviceStatusPool + // HeartbeatService 心跳服务实例 + HeartbeatService *service.HeartbeatService + // Config 应用配置 Config *config.Config @@ -103,8 +106,11 @@ func (app *Application) Start() error { // 设置WebSocket超时时间 app.WebSocketService.SetDefaultTimeout(app.Config.GetWebSocketTimeout()) + // 初始化心跳服务 + app.HeartbeatService = service.NewHeartbeatService(app.WebSocketService, app.DeviceStatusPool, app.DeviceRepo, app.Config) + // 初始化API组件 - app.API = api.NewAPI(app.Config, app.UserRepo, app.OperationHistoryRepo, app.DeviceControlRepo, app.DeviceRepo, app.WebSocketService, app.DeviceStatusPool) + app.API = api.NewAPI(app.Config, app.UserRepo, app.OperationHistoryRepo, app.DeviceControlRepo, app.DeviceRepo, app.WebSocketService, app.HeartbeatService, app.DeviceStatusPool) // 初始化任务执行器组件(使用5个工作协程) app.TaskExecutor = task.NewExecutor(5) @@ -119,6 +125,9 @@ func (app *Application) Start() error { app.logger.Info("启动任务执行器") app.TaskExecutor.Start() + // 启动心跳服务 + app.logger.Info("启动心跳服务") + app.HeartbeatService.Start() return nil } @@ -134,6 +143,10 @@ func (app *Application) Stop() error { app.logger.Info("停止任务执行器") app.TaskExecutor.Stop() + // 停止心跳服务 + app.logger.Info("停止心跳服务") + app.HeartbeatService.Stop() + // 停止存储组件 if err := app.Storage.Disconnect(); err != nil { return fmt.Errorf("存储断开连接失败: %v", err) diff --git a/internal/service/heartbeat.go b/internal/service/heartbeat.go new file mode 100644 index 0000000..4d3232a --- /dev/null +++ b/internal/service/heartbeat.go @@ -0,0 +1,148 @@ +// Package service 提供各种业务服务功能 +package service + +import ( + "context" + "fmt" + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/config" + "git.huangwc.com/pig/pig-farm-controller/internal/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/storage/repository" +) + +// HeartbeatService 心跳服务,负责管理设备的心跳检测 +type HeartbeatService struct { + // websocketService WebSocket服务 + websocketService *WebSocketService + + // deviceStatusPool 设备状态池 + deviceStatusPool *DeviceStatusPool + + // deviceRepo 设备仓库 + deviceRepo repository.DeviceRepo + + // logger 日志记录器 + logger *logs.Logger + + // 心跳间隔 + heartbeatInterval time.Duration + + // 手动心跳触发器 + triggerChan chan struct{} + + // ticker 心跳定时器 + ticker *time.Ticker + + // ctx 上下文 + ctx context.Context + + // cancel 取消函数 + cancel context.CancelFunc +} + +// NewHeartbeatService 创建心跳服务实例 +func NewHeartbeatService(websocketService *WebSocketService, deviceStatusPool *DeviceStatusPool, deviceRepo repository.DeviceRepo, config *config.Config) *HeartbeatService { + return &HeartbeatService{ + websocketService: websocketService, + deviceStatusPool: deviceStatusPool, + deviceRepo: deviceRepo, + logger: logs.NewLogger(), + heartbeatInterval: time.Duration(config.GetHeartbeatInterval()) * time.Second, + triggerChan: make(chan struct{}), + } +} + +// Start 启动心跳服务 +func (hs *HeartbeatService) Start() { + // 创建上下文 + ctx, cancel := context.WithCancel(context.Background()) + hs.cancel = cancel + + // 创建定时器 + hs.logger.Info(fmt.Sprintf("设置心跳间隔为 %d 秒", int(hs.heartbeatInterval.Seconds()))) + hs.ticker = time.NewTicker(hs.heartbeatInterval) + + // 启动心跳goroutine + go func() { + for { + select { + case <-hs.ticker.C: + hs.handleHeartbeat() + case <-hs.triggerChan: + hs.handleHeartbeat() + case <-ctx.Done(): + hs.logger.Info("心跳服务已停止") + return + } + } + }() + + hs.logger.Info("心跳服务已启动") +} + +// Stop 停止心跳服务 +func (hs *HeartbeatService) Stop() { + if hs == nil { + return + } + + if hs.ticker != nil { + hs.ticker.Stop() + } + + if hs.cancel != nil { + hs.cancel() + } + + hs.logger.Info("[Heartbeat] 心跳任务停止指令已发送") +} + +// TriggerManualHeartbeat 手动触发心跳检测 +func (hs *HeartbeatService) TriggerManualHeartbeat() { + hs.logger.Info("收到手动触发心跳检测请求") + hs.triggerChan <- struct{}{} + hs.logger.Info("手动心跳检测完成") +} + +// TriggerManualHeartbeatAsync 手动触发心跳检测且不等待检测结果 +func (hs *HeartbeatService) TriggerManualHeartbeatAsync() { + hs.logger.Info("收到手动触发异步心跳检测请求") + go func() { + hs.triggerChan <- struct{}{} + hs.logger.Info("手动心跳检测完成") + }() +} + +// sendHeartbeat 发送心跳包到所有中继设备 +func (hs *HeartbeatService) handleHeartbeat() { + // 记录心跳开始日志 + hs.logger.Debug("开始发送心跳包") + + // 获取所有已连接的设备 + connectedDevices := hs.websocketService.GetConnectedDevices() + + // 遍历所有连接的设备并发送心跳包 + for _, deviceID := range connectedDevices { + // 发送心跳包到设备 + response, err := hs.websocketService.SendCommandAndWait(deviceID, "heartbeat", nil, 0) + if err != nil { + hs.logger.Error(fmt.Sprintf("向设备 %s 发送心跳包失败: %v", deviceID, err)) + // 更新设备状态为离线 + hs.deviceStatusPool.SetStatus(deviceID, &DeviceStatus{ + Active: false, + }) + continue + } + + // 记录收到心跳响应 + hs.logger.Debug(fmt.Sprintf("收到来自设备 %s 的心跳响应: %+v", deviceID, response)) + + // 更新设备状态为在线 + hs.deviceStatusPool.SetStatus(deviceID, &DeviceStatus{ + Active: true, + }) + } + + hs.logger.Debug("心跳包发送完成") +}