Files
pig-farm-controller/internal/service/heartbeat.go

298 lines
7.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package service 提供各种业务服务功能
package service
import (
"context"
"errors"
"fmt"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/config"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/model"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/websocket"
"github.com/panjf2000/ants/v2"
)
// HeartbeatService runs heartbeat rounds against relay devices and pushes the
// resulting device statuses into the shared status pool.
type HeartbeatService struct {
	// Collaborators.
	websocketManager *websocket.Manager    // sends heartbeat commands and waits for the reply
	deviceStatusPool *DeviceStatusPool     // receives the status snapshot built by each round
	deviceRepo       repository.DeviceRepo // lists the known devices
	logger           *logs.Logger          // service logger

	// Scheduling.
	heartbeatInterval time.Duration // period of the automatic rounds
	triggerChan       chan struct{} // carries manual round requests to the loop
	ticker            *time.Ticker  // drives the automatic rounds

	// Worker pool.
	poolSize int        // size passed to ants.NewPool
	pool     *ants.Pool // executes the per-relay heartbeat tasks

	// Lifecycle.
	ctx    context.Context    // service context
	cancel context.CancelFunc // cancels the loop started by Start
}
// NewHeartbeatService assembles a HeartbeatService from its collaborators and
// the heartbeat section of config.
//
// A configured interval <= 0 falls back to 30 and is interpreted as seconds;
// a configured concurrency <= 0 falls back to 10. The service gets its own
// logger and an unbuffered trigger channel. Nothing runs until Start is
// called.
func NewHeartbeatService(websocketManager *websocket.Manager, deviceStatusPool *DeviceStatusPool, deviceRepo repository.DeviceRepo, config *config.Config) *HeartbeatService {
	const (
		defaultIntervalSeconds = 30
		defaultConcurrency     = 10
	)

	heartbeatCfg := config.GetHeartbeatConfig()

	seconds := heartbeatCfg.Interval
	if seconds <= 0 {
		seconds = defaultIntervalSeconds
	}

	workers := heartbeatCfg.Concurrency
	if workers <= 0 {
		workers = defaultConcurrency
	}

	hs := &HeartbeatService{
		websocketManager: websocketManager,
		deviceStatusPool: deviceStatusPool,
		deviceRepo:       deviceRepo,
	}
	hs.logger = logs.NewLogger()
	hs.heartbeatInterval = time.Duration(seconds) * time.Second
	hs.poolSize = workers
	hs.triggerChan = make(chan struct{})
	return hs
}
// Start creates the worker pool, the cancellable context and the ticker, then
// launches the loop goroutine. The loop runs one heartbeat round on every tick
// and on every signal received from triggerChan, and exits when the context
// is cancelled.
//
// If the worker pool cannot be created the failure is logged and nothing is
// started: no ticker, no context and no goroutine are left behind.
func (hs *HeartbeatService) Start() {
	// Create the pool first so that a failure leaves nothing to clean up. A
	// loop running without a pool would panic on its first round.
	pool, err := ants.NewPool(hs.poolSize)
	if err != nil {
		hs.logger.Error("创建心跳线程池失败: " + err.Error())
		return
	}
	hs.pool = pool

	ctx, cancel := context.WithCancel(context.Background())
	hs.ctx = ctx
	hs.cancel = cancel

	hs.logger.Info(fmt.Sprintf("设置心跳间隔为 %d 秒", int(hs.heartbeatInterval.Seconds())))
	ticker := time.NewTicker(hs.heartbeatInterval)
	hs.ticker = ticker

	// The goroutine works on the local ticker and ctx so it never observes a
	// later reassignment of the struct fields.
	go func() {
		for {
			select {
			case <-ticker.C:
				hs.handleHeartbeatAll()
			case <-hs.triggerChan:
				hs.handleHeartbeatAll()
			case <-ctx.Done():
				hs.logger.Info("心跳服务已停止")
				return
			}
		}
	}()

	hs.logger.Info("心跳服务已启动")
}
// Stop halts the ticker, cancels the loop context and releases the worker
// pool, in that order, skipping whichever of them has not been set. It then
// logs that the stop request was issued. Calling Stop on a nil receiver is a
// no-op. Stop does not wait for the loop goroutine to exit.
func (hs *HeartbeatService) Stop() {
	if hs == nil {
		return
	}

	if ticker := hs.ticker; ticker != nil {
		ticker.Stop()
	}
	if cancelLoop := hs.cancel; cancelLoop != nil {
		cancelLoop()
	}
	if workers := hs.pool; workers != nil {
		workers.Release()
	}

	hs.logger.Info("[Heartbeat] 心跳任务停止指令已发送")
}
// TriggerManualHeartbeat requests an immediate heartbeat round by sending a
// signal on triggerChan, logging before and after the send. The call blocks
// until a receiver takes the signal.
//
// NOTE(review): the closing log line is written as soon as the signal has
// been handed over; nothing in this method waits for the round to finish.
func (hs *HeartbeatService) TriggerManualHeartbeat() {
	logger := hs.logger
	logger.Info("收到手动触发心跳检测请求")

	var signal struct{}
	hs.triggerChan <- signal

	logger.Info("手动心跳检测完成")
}
// TriggerManualHeartbeatAsync requests an immediate heartbeat round without
// blocking the caller: it logs the request and returns, while a separate
// goroutine sends the signal on triggerChan and logs once the send has gone
// through.
func (hs *HeartbeatService) TriggerManualHeartbeatAsync() {
	hs.logger.Info("收到手动触发异步心跳检测请求")

	deliver := func() {
		hs.triggerChan <- struct{}{}
		hs.logger.Info("手动心跳检测完成")
	}
	go deliver()
}
// handleHeartbeatAll runs one complete heartbeat round.
//
// It lists all devices, submits one heartbeat task per relay-type device to
// the worker pool, and waits for every accepted task to finish. The tasks
// record their findings in a temporary status map. Afterwards the device list
// is fetched again, every device that has no entry in the temporary map is
// marked inactive, and the map is handed to the device status pool.
//
// The round is abandoned (nothing is published) when the pool is missing or
// when either device listing fails.
func (hs *HeartbeatService) handleHeartbeatAll() {
	hs.logger.Debug("开始发送心跳包")

	// Without a pool there is nothing to submit to; calling Submit on a nil
	// pool would panic.
	if hs.pool == nil {
		hs.logger.Error("心跳线程池未初始化, 跳过本轮心跳")
		return
	}

	relays, err := hs.deviceRepo.ListAll()
	if err != nil {
		hs.logger.Error("获取设备列表失败: " + err.Error())
		return
	}

	// Scratch map shared by all tasks of this round; the zero-value mutex is
	// ready to use.
	tempStatusMap := &TempStatusMap{
		data: make(map[string]*DeviceStatus),
	}

	var wg sync.WaitGroup
	for _, relay := range relays {
		// Heartbeats are only sent to relay devices.
		if relay.Type != model.DeviceTypeRelay {
			continue
		}

		id := fmt.Sprintf("%v", relay.ID)
		name := relay.Name

		wg.Add(1)
		submitErr := hs.pool.Submit(func() {
			defer wg.Done()
			if err := hs.handleHeartbeatWithStatus(id, tempStatusMap); err != nil {
				hs.logger.Error("[Heartbeat] 向设备 " + name + "(id:" + id + ") 发送心跳包失败: " + err.Error())
			}
		})
		if submitErr != nil {
			// A rejected task never runs, so its deferred wg.Done never fires.
			// Balance the counter here or wg.Wait below blocks forever. The
			// relay gets no entry and is marked inactive by the fill step.
			wg.Done()
			hs.logger.Error("向设备 " + name + "(id:" + id + ") 发送心跳包失败(线程池异常): " + err2str(submitErr))
		}
	}
	wg.Wait()

	allDevices, err := hs.deviceRepo.ListAll()
	if err != nil {
		hs.logger.Error("获取所有设备列表失败: " + err.Error())
		return
	}

	// Every device that no task reported on is considered inactive.
	tempStatusMap.mu.Lock()
	for _, device := range allDevices {
		id := fmt.Sprintf("%v", device.ID)
		if _, exists := tempStatusMap.data[id]; !exists {
			tempStatusMap.data[id] = &DeviceStatus{
				Active: false,
			}
		}
	}
	tempStatusMap.mu.Unlock()

	// Publish the snapshot of this round.
	hs.deviceStatusPool.SetAllStatuses(tempStatusMap.data)
	hs.logger.Debug("心跳包发送完成")
}

// err2str returns the message of err; it keeps the long log concatenation in
// handleHeartbeatAll readable.
func err2str(err error) string {
	return err.Error()
}
// TempStatusMap is a mutex-guarded scratch map from device ID to device
// status, used to collect the results of one heartbeat round.
type TempStatusMap struct {
	data map[string]*DeviceStatus // statuses keyed by device ID
	mu   sync.RWMutex             // guards data
}
// SetStatus stores status under deviceID, replacing any previous entry, while
// holding the write lock.
//
// The backing map is allocated on first use, so a zero-value TempStatusMap
// can be written to without panicking on a nil map.
func (tsm *TempStatusMap) SetStatus(deviceID string, status *DeviceStatus) {
	tsm.mu.Lock()
	defer tsm.mu.Unlock()

	if tsm.data == nil {
		tsm.data = make(map[string]*DeviceStatus)
	}
	tsm.data[deviceID] = status
}
// GetStatus looks up deviceID under the read lock and returns the stored
// status together with a flag telling whether an entry exists.
func (tsm *TempStatusMap) GetStatus(deviceID string) (*DeviceStatus, bool) {
	tsm.mu.RLock()
	entry, found := tsm.data[deviceID]
	tsm.mu.RUnlock()

	return entry, found
}
// handleHeartbeatWithStatus sends one heartbeat command to the device
// identified by deviceID and records the outcome in tempStatusMap.
//
//   - If sending fails, the device is recorded as inactive and the send error
//     is returned.
//   - Any response marks the device itself as active, even if a later check
//     fails.
//   - If the response timestamp (Unix seconds) differs from the one sent, an
//     error is returned.
//   - If the response data cannot be parsed, the parse error is returned.
//   - Otherwise every device listed in the response is recorded as active when
//     its status is "running", "online" or "active", and inactive for any
//     other status.
//
// NOTE(review): the last argument of SendCommandAndWait is passed as 0; its
// meaning is defined by the websocket manager and is not visible here.
func (hs *HeartbeatService) handleHeartbeatWithStatus(deviceID string, tempStatusMap *TempStatusMap) error {
	// The payload carries only the send time, in Unix seconds.
	heartbeatData := map[string]interface{}{
		"timestamp": time.Now().Unix(),
	}

	response, err := hs.websocketManager.SendCommandAndWait(deviceID, "heartbeat", heartbeatData, 0)
	if err != nil {
		hs.logger.Error(fmt.Sprintf("向设备 %s 发送心跳包失败: %v", deviceID, err))
		tempStatusMap.SetStatus(deviceID, &DeviceStatus{Active: false})
		return err
	}

	hs.logger.Debug(fmt.Sprintf("收到来自设备 %s 的心跳响应: %+v", deviceID, response))

	// A reply of any kind means the relay itself is reachable.
	tempStatusMap.SetStatus(deviceID, &DeviceStatus{Active: true})

	// The reply has to echo the timestamp that was sent.
	if response.Timestamp.Unix() != heartbeatData["timestamp"] {
		hs.logger.Error(fmt.Sprintf("心跳响应时间戳校验失败: %v , 响应时间戳应当与发送的时间戳一致", response))
		return errors.New("心跳响应时间戳校验失败")
	}

	// Shape of the response data: the list of devices behind the relay.
	var payload struct {
		Devices []struct {
			DeviceID   string `json:"device_id"`
			DeviceType string `json:"device_type"`
			Status     string `json:"status"`
		} `json:"devices"`
	}
	if parseErr := response.ParseData(&payload); parseErr != nil {
		hs.logger.Error(fmt.Sprintf("解析设备 %s 的心跳响应数据失败: %v", deviceID, parseErr))
		return parseErr
	}

	for _, child := range payload.Devices {
		online := false
		switch child.Status {
		case "running", "online", "active":
			online = true
		}
		tempStatusMap.SetStatus(child.DeviceID, &DeviceStatus{Active: online})
	}

	return nil
}