@@ -3,12 +3,16 @@ package service
import (
"context"
"errors"
"fmt"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/config"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/model"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
"github.com/panjf2000/ants/v2"
)
// HeartbeatService 心跳服务,负责管理设备的心跳检测
@@ -34,6 +38,12 @@ type HeartbeatService struct {
// ticker 心跳定时器
ticker * time . Ticker
// poolSize 线程池大小
poolSize int
// pool 线程池
pool * ants . Pool
// ctx 上下文
ctx context . Context
@@ -43,12 +53,24 @@ type HeartbeatService struct {
// NewHeartbeatService 创建心跳服务实例
func NewHeartbeatService ( websocketService * WebSocketService , deviceStatusPool * DeviceStatusPool , deviceRepo repository . DeviceRepo , config * config . Config ) * HeartbeatService {
interval := config . GetHeartbeatConfig ( ) . Interval
if interval <= 0 {
interval = 30 // 默认30秒心跳间隔
}
concurrency := config . GetHeartbeatConfig ( ) . Concurrency
if concurrency <= 0 {
concurrency = 10 // 默认10个并发
}
return & HeartbeatService {
websocketService : websocketService ,
deviceStatusPool : deviceStatusPool ,
deviceRepo : deviceRepo ,
logger : logs . NewLogger ( ) ,
heartbeatInterval : time . Duration ( config . GetHeartbeatI nterval( ) ) * time . Second ,
heartbeatInterval : time . Duration ( i nterval) * time . Second ,
poolSize : concurrency ,
triggerChan : make ( chan struct { } ) ,
}
}
@@ -63,14 +85,17 @@ func (hs *HeartbeatService) Start() {
hs . logger . Info ( fmt . Sprintf ( "设置心跳间隔为 %d 秒" , int ( hs . heartbeatInterval . Seconds ( ) ) ) )
hs . ticker = time . NewTicker ( hs . heartbeatInterval )
// 创建线程池
hs . pool , _ = ants . NewPool ( hs . poolSize )
// 启动心跳goroutine
go func ( ) {
for {
select {
case <- hs . ticker . C :
hs . handleHeartbeat ( )
hs . handleHeartbeatAll ( )
case <- hs . triggerChan :
hs . handleHeartbeat ( )
hs . handleHeartbeatAll ( )
case <- ctx . Done ( ) :
hs . logger . Info ( "心跳服务已停止" )
return
@@ -95,6 +120,10 @@ func (hs *HeartbeatService) Stop() {
hs . cancel ( )
}
if hs . pool != nil {
hs . pool . Release ( )
}
hs . logger . Info ( "[Heartbeat] 心跳任务停止指令已发送" )
}
@@ -114,35 +143,154 @@ func (hs *HeartbeatService) TriggerManualHeartbeatAsync() {
} ( )
}
// sendHeartbeat 发送心跳包到所有中继设备
func ( hs * HeartbeatService ) handleHeartbeat ( ) {
// sendHeartbeatAll 发送心跳包到所有中继设备
func ( hs * HeartbeatService ) handleHeartbeatAll ( ) {
// 记录心跳开始日志
hs . logger . Debug ( "开始发送心跳包" )
// 获取所有已连接的 设备
connectedDevices := hs . websocketService . GetConnectedDevices ( )
// 获取所有中继 设备
relays , err := hs . deviceRepo . ListAll ( )
if err != nil {
hs . logger . Error ( "获取设备列表失败: " + err . Error ( ) )
return
}
// 创建线程安全的临时map用于保存所有设备状态
tempStatusMap := & TempStatusMap {
data : make ( map [ string ] * DeviceStatus ) ,
mu : sync . RWMutex { } ,
}
// 遍历所有连接的设备并发送心跳包
for _ , deviceID := range connectedDevices {
// 发送心跳包到设备
response , err := hs . websocketService . SendCommandAndWait ( deviceID , "heartbeat" , nil , 0 )
if err != nil {
hs . logger . Error ( fmt . Sprintf ( "向设备 %s 发送心跳包失败: %v" , deviceID , err ) )
// 更新设备状态为离线
hs . deviceStatusPool . SetStatus ( deviceID , & DeviceStatus {
Active : false ,
} )
wg := sync . WaitGroup { }
for _ , relay := range relays {
// 心跳包之发送给中继设备
if relay . Type != model . DeviceTypeRelay {
continue
}
// 记录收到心跳响应
hs . logger . Debug ( fmt . Sprintf ( "收到来自设备 %s 的心跳响应: %+v" , deviceID , response ) )
id := fmt . Sprintf ( "%v" , relay . ID )
name := relay . Name
// 更新设备状态为在线
hs . deviceStatusPool . SetStatus ( deviceID , & DeviceStatus {
Active : true ,
wg . Add ( 1 )
err := hs . pool . Submit ( func ( ) {
defer wg . Done ( )
err := hs . handleHeartbeatWithStatus ( id , tempStatusMap )
if err != nil {
hs . logger . Error ( "[Heartbeat] 向设备 " + name + "(id:" + id + ") 发送心跳包失败: " + err . Error ( ) )
}
} )
if err != nil {
hs . logger . Error ( "向设备 " + name + "(id:" + id + ") 发送心跳包失败(线程池异常): " + err . Error ( ) )
}
}
wg . Wait ( )
// 获取所有设备列表
allDevices , err := hs . deviceRepo . ListAll ( )
if err != nil {
hs . logger . Error ( "获取所有设备列表失败: " + err . Error ( ) )
return
}
// 补齐临时map中缺失的设备, 缺失的设备全部设为离线状态
tempStatusMap . mu . Lock ( )
for _ , device := range allDevices {
id := fmt . Sprintf ( "%v" , device . ID )
if _ , exists := tempStatusMap . data [ id ] ; ! exists {
tempStatusMap . data [ id ] = & DeviceStatus {
Active : false ,
}
}
}
tempStatusMap . mu . Unlock ( )
// 将临时状态更新到全局状态池
hs . deviceStatusPool . SetAllStatuses ( tempStatusMap . data )
hs . logger . Debug ( "心跳包发送完成" )
}
// TempStatusMap 线程安全的临时状态映射
type TempStatusMap struct {
data map [ string ] * DeviceStatus
mu sync . RWMutex
}
// SetStatus 设置设备状态
func ( tsm * TempStatusMap ) SetStatus ( deviceID string , status * DeviceStatus ) {
tsm . mu . Lock ( )
defer tsm . mu . Unlock ( )
tsm . data [ deviceID ] = status
}
// GetStatus 获取设备状态
func ( tsm * TempStatusMap ) GetStatus ( deviceID string ) ( * DeviceStatus , bool ) {
tsm . mu . RLock ( )
defer tsm . mu . RUnlock ( )
status , exists := tsm . data [ deviceID ]
return status , exists
}
// sendHeartbeat 发送心跳包到所有中继设备
func ( hs * HeartbeatService ) handleHeartbeatWithStatus ( deviceID string , tempStatusMap * TempStatusMap ) error {
// 构造带时间戳的心跳包数据
heartbeatData := map [ string ] interface { } {
"timestamp" : time . Now ( ) . Unix ( ) ,
}
// 发送心跳包到设备
response , err := hs . websocketService . SendCommandAndWait ( deviceID , "heartbeat" , heartbeatData , 0 )
if err != nil {
hs . logger . Error ( fmt . Sprintf ( "向设备 %s 发送心跳包失败: %v" , deviceID , err ) )
// 更新设备状态为离线
tempStatusMap . SetStatus ( deviceID , & DeviceStatus {
Active : false ,
} )
return err
}
// 记录收到心跳响应
hs . logger . Debug ( fmt . Sprintf ( "收到来自设备 %s 的心跳响应: %+v" , deviceID , response ) )
// 有响应中继设备就是在线
tempStatusMap . SetStatus ( deviceID , & DeviceStatus {
Active : true ,
} )
// 时间戳校验
if response . Timestamp != heartbeatData [ "timestamp" ] {
hs . logger . Error ( fmt . Sprintf ( "心跳响应时间戳校验失败: %v , 响应时间戳应当与发送的时间戳一致" , response ) )
return errors . New ( "心跳响应时间戳校验失败" )
}
// 解析响应中的下级设备状态
type DeviceStatusInfo struct {
DeviceID string ` json:"device_id" `
DeviceType string ` json:"device_type" `
Status string ` json:"status" `
}
type HeartbeatResponseData struct {
Devices [ ] DeviceStatusInfo ` json:"devices" `
}
var responseData HeartbeatResponseData
if err := response . ParseData ( & responseData ) ; err != nil {
hs . logger . Error ( fmt . Sprintf ( "解析设备 %s 的心跳响应数据失败: %v" , deviceID , err ) )
return err
}
// 更新所有下级设备的状态
for _ , device := range responseData . Devices {
// 根据设备状态确定Active值
isActive := device . Status == "running" || device . Status == "online" || device . Status == "active"
tempStatusMap . SetStatus ( device . DeviceID , & DeviceStatus {
Active : isActive ,
} )
}
return nil
}