增加心跳

This commit is contained in:
2025-09-09 10:51:36 +08:00
parent 8b22514aad
commit fc657d7448
6 changed files with 211 additions and 8 deletions

View File

@@ -25,4 +25,9 @@ database:
# WebSocket配置
websocket:
# WebSocket请求超时时间(秒)
timeout: 5
timeout: 5
# 心跳配置
heartbeat:
# 心跳间隔(秒)
interval: 30

View File

@@ -56,6 +56,9 @@ type API struct {
// websocketService WebSocket服务
websocketService *service.WebSocketService
// heartbeatService 心跳服务
heartbeatService *service.HeartbeatService
// deviceStatusPool 设备状态池
deviceStatusPool *service.DeviceStatusPool
@@ -65,7 +68,7 @@ type API struct {
// NewAPI 创建并返回一个新的API实例
// 初始化Gin引擎和相关配置
func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRepo repository.OperationHistoryRepo, deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketService *service.WebSocketService, deviceStatusPool *service.DeviceStatusPool) *API {
func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRepo repository.OperationHistoryRepo, deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketService *service.WebSocketService, heartbeatService *service.HeartbeatService, deviceStatusPool *service.DeviceStatusPool) *API {
// 设置Gin为发布模式
gin.SetMode(gin.DebugMode)
@@ -96,7 +99,7 @@ func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRe
operationController := operation.NewController(operationHistoryRepo)
// 创建设备控制控制器
deviceController := device.NewController(deviceControlRepo, deviceRepo, websocketService, deviceStatusPool)
deviceController := device.NewController(deviceControlRepo, deviceRepo, websocketService, heartbeatService, deviceStatusPool)
// 创建WebSocket管理器
websocketManager := websocket.NewManager(websocketService, deviceRepo)
@@ -117,6 +120,7 @@ func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRe
authMiddleware: authMiddleware,
websocketManager: websocketManager,
websocketService: websocketService,
heartbeatService: heartbeatService,
deviceStatusPool: deviceStatusPool,
logger: logs.NewLogger(),
}

View File

@@ -20,6 +20,9 @@ type Config struct {
// WebSocket WebSocket配置
WebSocket WebSocketConfig `yaml:"websocket"`
// Heartbeat 心跳配置
Heartbeat HeartbeatConfig `yaml:"heartbeat"`
}
// ServerConfig 代表服务器配置
@@ -76,12 +79,21 @@ type WebSocketConfig struct {
Timeout int `yaml:"timeout"`
}
// HeartbeatConfig 代表心跳配置
type HeartbeatConfig struct {
// Interval 心跳间隔(秒)
Interval int `yaml:"interval"`
}
// NewConfig 创建并返回一个新的配置实例
func NewConfig() *Config {
return &Config{
WebSocket: WebSocketConfig{
Timeout: 5, // 默认5秒超时
},
Heartbeat: HeartbeatConfig{
Interval: 30, // 默认30秒心跳间隔
},
}
}
@@ -122,3 +134,11 @@ func (c *Config) GetWebSocketTimeout() int {
}
return c.WebSocket.Timeout
}
// GetHeartbeatInterval 获取心跳间隔(秒)
func (c *Config) GetHeartbeatInterval() int {
if c.Heartbeat.Interval <= 0 {
return 30 // 默认30秒心跳间隔
}
return c.Heartbeat.Interval
}

View File

@@ -122,16 +122,18 @@ type Controller struct {
deviceControlRepo repository.DeviceControlRepo
deviceRepo repository.DeviceRepo
websocketService *service.WebSocketService
heartbeatService *service.HeartbeatService
deviceStatusPool *service.DeviceStatusPool
logger *logs.Logger
}
// NewController 创建设备控制控制器实例
func NewController(deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketService *service.WebSocketService, deviceStatusPool *service.DeviceStatusPool) *Controller {
func NewController(deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketService *service.WebSocketService, heartbeatService *service.HeartbeatService, deviceStatusPool *service.DeviceStatusPool) *Controller {
return &Controller{
deviceControlRepo: deviceControlRepo,
deviceRepo: deviceRepo,
websocketService: websocketService,
heartbeatService: heartbeatService,
deviceStatusPool: deviceStatusPool,
logger: logs.NewLogger(),
}
@@ -199,6 +201,9 @@ func (c *Controller) Create(ctx *gin.Context) {
return
}
// 刷新设备状态
c.heartbeatService.TriggerManualHeartbeatAsync()
controller.SendSuccessResponse(ctx, "创建设备成功", device)
}
@@ -253,9 +258,6 @@ func (c *Controller) Update(ctx *gin.Context) {
req.BusNumber != nil && req.DeviceAddress != nil {
device.Set485Address(*req.BusNumber, *req.DeviceAddress)
}
// TODO: 设备状态应该由系统自动获取,而不是由用户指定
// 这里保持设备原有状态,后续需要实现自动状态检测
// 设备状态现在只在内存中维护,不持久化到数据库
if err := c.deviceRepo.Update(device); err != nil {
c.logger.Error("更新设备失败: " + err.Error())
@@ -263,6 +265,9 @@ func (c *Controller) Update(ctx *gin.Context) {
return
}
// 刷新设备状态
c.heartbeatService.TriggerManualHeartbeatAsync()
controller.SendSuccessResponse(ctx, "更新设备成功", device)
}
@@ -291,6 +296,9 @@ func (c *Controller) Delete(ctx *gin.Context) {
return
}
// 刷新设备状态
c.heartbeatService.TriggerManualHeartbeatAsync()
controller.SendSuccessResponse(ctx, "删除设备成功", nil)
}
@@ -397,6 +405,9 @@ func (c *Controller) Switch(ctx *gin.Context) {
Message: message,
}
// 刷新设备状态
c.heartbeatService.TriggerManualHeartbeatAsync()
controller.SendSuccessResponse(ctx, "设备控制成功", data)
}
@@ -474,6 +485,8 @@ func (c *Controller) GetDeviceStatus(ctx *gin.Context) {
return
}
// TODO 需要刷新设备状态吗? 刷新的话这个接口可能会很慢
// 从设备状态池中获取设备状态
status, exists := c.deviceStatusPool.GetStatus(deviceID)
if !exists {

View File

@@ -45,6 +45,9 @@ type Application struct {
// DeviceStatusPool 设备状态池实例
DeviceStatusPool *service.DeviceStatusPool
// HeartbeatService 心跳服务实例
HeartbeatService *service.HeartbeatService
// Config 应用配置
Config *config.Config
@@ -103,8 +106,11 @@ func (app *Application) Start() error {
// 设置WebSocket超时时间
app.WebSocketService.SetDefaultTimeout(app.Config.GetWebSocketTimeout())
// 初始化心跳服务
app.HeartbeatService = service.NewHeartbeatService(app.WebSocketService, app.DeviceStatusPool, app.DeviceRepo, app.Config)
// 初始化API组件
app.API = api.NewAPI(app.Config, app.UserRepo, app.OperationHistoryRepo, app.DeviceControlRepo, app.DeviceRepo, app.WebSocketService, app.DeviceStatusPool)
app.API = api.NewAPI(app.Config, app.UserRepo, app.OperationHistoryRepo, app.DeviceControlRepo, app.DeviceRepo, app.WebSocketService, app.HeartbeatService, app.DeviceStatusPool)
// 初始化任务执行器组件(使用5个工作协程)
app.TaskExecutor = task.NewExecutor(5)
@@ -119,6 +125,9 @@ func (app *Application) Start() error {
app.logger.Info("启动任务执行器")
app.TaskExecutor.Start()
// 启动心跳服务
app.logger.Info("启动心跳服务")
app.HeartbeatService.Start()
return nil
}
@@ -134,6 +143,10 @@ func (app *Application) Stop() error {
app.logger.Info("停止任务执行器")
app.TaskExecutor.Stop()
// 停止心跳服务
app.logger.Info("停止心跳服务")
app.HeartbeatService.Stop()
// 停止存储组件
if err := app.Storage.Disconnect(); err != nil {
return fmt.Errorf("存储断开连接失败: %v", err)

View File

@@ -0,0 +1,148 @@
// Package service 提供各种业务服务功能
package service
import (
"context"
"fmt"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/config"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
)
// HeartbeatService 心跳服务,负责管理设备的心跳检测
type HeartbeatService struct {
// websocketService WebSocket服务
websocketService *WebSocketService
// deviceStatusPool 设备状态池
deviceStatusPool *DeviceStatusPool
// deviceRepo 设备仓库
deviceRepo repository.DeviceRepo
// logger 日志记录器
logger *logs.Logger
// 心跳间隔
heartbeatInterval time.Duration
// 手动心跳触发器
triggerChan chan struct{}
// ticker 心跳定时器
ticker *time.Ticker
// ctx 上下文
ctx context.Context
// cancel 取消函数
cancel context.CancelFunc
}
// NewHeartbeatService 创建心跳服务实例
func NewHeartbeatService(websocketService *WebSocketService, deviceStatusPool *DeviceStatusPool, deviceRepo repository.DeviceRepo, config *config.Config) *HeartbeatService {
return &HeartbeatService{
websocketService: websocketService,
deviceStatusPool: deviceStatusPool,
deviceRepo: deviceRepo,
logger: logs.NewLogger(),
heartbeatInterval: time.Duration(config.GetHeartbeatInterval()) * time.Second,
triggerChan: make(chan struct{}),
}
}
// Start 启动心跳服务
func (hs *HeartbeatService) Start() {
// 创建上下文
ctx, cancel := context.WithCancel(context.Background())
hs.cancel = cancel
// 创建定时器
hs.logger.Info(fmt.Sprintf("设置心跳间隔为 %d 秒", int(hs.heartbeatInterval.Seconds())))
hs.ticker = time.NewTicker(hs.heartbeatInterval)
// 启动心跳goroutine
go func() {
for {
select {
case <-hs.ticker.C:
hs.handleHeartbeat()
case <-hs.triggerChan:
hs.handleHeartbeat()
case <-ctx.Done():
hs.logger.Info("心跳服务已停止")
return
}
}
}()
hs.logger.Info("心跳服务已启动")
}
// Stop 停止心跳服务
func (hs *HeartbeatService) Stop() {
if hs == nil {
return
}
if hs.ticker != nil {
hs.ticker.Stop()
}
if hs.cancel != nil {
hs.cancel()
}
hs.logger.Info("[Heartbeat] 心跳任务停止指令已发送")
}
// TriggerManualHeartbeat 手动触发心跳检测
func (hs *HeartbeatService) TriggerManualHeartbeat() {
hs.logger.Info("收到手动触发心跳检测请求")
hs.triggerChan <- struct{}{}
hs.logger.Info("手动心跳检测完成")
}
// TriggerManualHeartbeatAsync 手动触发心跳检测且不等待检测结果
func (hs *HeartbeatService) TriggerManualHeartbeatAsync() {
hs.logger.Info("收到手动触发异步心跳检测请求")
go func() {
hs.triggerChan <- struct{}{}
hs.logger.Info("手动心跳检测完成")
}()
}
// sendHeartbeat 发送心跳包到所有中继设备
func (hs *HeartbeatService) handleHeartbeat() {
// 记录心跳开始日志
hs.logger.Debug("开始发送心跳包")
// 获取所有已连接的设备
connectedDevices := hs.websocketService.GetConnectedDevices()
// 遍历所有连接的设备并发送心跳包
for _, deviceID := range connectedDevices {
// 发送心跳包到设备
response, err := hs.websocketService.SendCommandAndWait(deviceID, "heartbeat", nil, 0)
if err != nil {
hs.logger.Error(fmt.Sprintf("向设备 %s 发送心跳包失败: %v", deviceID, err))
// 更新设备状态为离线
hs.deviceStatusPool.SetStatus(deviceID, &DeviceStatus{
Active: false,
})
continue
}
// 记录收到心跳响应
hs.logger.Debug(fmt.Sprintf("收到来自设备 %s 的心跳响应: %+v", deviceID, response))
// 更新设备状态为在线
hs.deviceStatusPool.SetStatus(deviceID, &DeviceStatus{
Active: true,
})
}
hs.logger.Debug("心跳包发送完成")
}