// Package service 提供各种业务服务功能 package service import ( "context" "errors" "fmt" "sync" "time" "git.huangwc.com/pig/pig-farm-controller/internal/config" "git.huangwc.com/pig/pig-farm-controller/internal/logs" "git.huangwc.com/pig/pig-farm-controller/internal/model" "git.huangwc.com/pig/pig-farm-controller/internal/storage/repository" "git.huangwc.com/pig/pig-farm-controller/internal/websocket" "github.com/panjf2000/ants/v2" ) // HeartbeatService 心跳服务,负责管理设备的心跳检测 type HeartbeatService struct { // websocketManager WebSocket管理器 websocketManager *websocket.Manager // deviceStatusPool 设备状态池 deviceStatusPool *DeviceStatusPool // deviceRepo 设备仓库 deviceRepo repository.DeviceRepo // logger 日志记录器 logger *logs.Logger // 心跳间隔 heartbeatInterval time.Duration // 手动心跳触发器 triggerChan chan struct{} // ticker 心跳定时器 ticker *time.Ticker // poolSize 线程池大小 poolSize int // pool 线程池 pool *ants.Pool // ctx 上下文 ctx context.Context // cancel 取消函数 cancel context.CancelFunc } // NewHeartbeatService 创建心跳服务实例 func NewHeartbeatService(websocketManager *websocket.Manager, deviceStatusPool *DeviceStatusPool, deviceRepo repository.DeviceRepo, config *config.Config) *HeartbeatService { interval := config.GetHeartbeatConfig().Interval if interval <= 0 { interval = 30 // 默认30秒心跳间隔 } concurrency := config.GetHeartbeatConfig().Concurrency if concurrency <= 0 { concurrency = 10 // 默认10个并发 } return &HeartbeatService{ websocketManager: websocketManager, deviceStatusPool: deviceStatusPool, deviceRepo: deviceRepo, logger: logs.NewLogger(), heartbeatInterval: time.Duration(interval) * time.Second, poolSize: concurrency, triggerChan: make(chan struct{}), } } // Start 启动心跳服务 func (hs *HeartbeatService) Start() { // 创建上下文 ctx, cancel := context.WithCancel(context.Background()) hs.cancel = cancel // 创建定时器 hs.logger.Info(fmt.Sprintf("设置心跳间隔为 %d 秒", int(hs.heartbeatInterval.Seconds()))) hs.ticker = time.NewTicker(hs.heartbeatInterval) // 创建线程池 hs.pool, _ = ants.NewPool(hs.poolSize) // 启动心跳goroutine go func() { for { select { case <-hs.ticker.C: hs.handleHeartbeatAll() case <-hs.triggerChan: hs.handleHeartbeatAll() case <-ctx.Done(): hs.logger.Info("心跳服务已停止") return } } }() hs.logger.Info("心跳服务已启动") } // Stop 停止心跳服务 func (hs *HeartbeatService) Stop() { if hs == nil { return } if hs.ticker != nil { hs.ticker.Stop() } if hs.cancel != nil { hs.cancel() } if hs.pool != nil { hs.pool.Release() } hs.logger.Info("[Heartbeat] 心跳任务停止指令已发送") } // TriggerManualHeartbeat 手动触发心跳检测 func (hs *HeartbeatService) TriggerManualHeartbeat() { hs.logger.Info("收到手动触发心跳检测请求") hs.triggerChan <- struct{}{} hs.logger.Info("手动心跳检测完成") } // TriggerManualHeartbeatAsync 手动触发心跳检测且不等待检测结果 func (hs *HeartbeatService) TriggerManualHeartbeatAsync() { hs.logger.Info("收到手动触发异步心跳检测请求") go func() { hs.triggerChan <- struct{}{} hs.logger.Info("手动心跳检测完成") }() } // sendHeartbeatAll 发送心跳包到所有中继设备 func (hs *HeartbeatService) handleHeartbeatAll() { // 记录心跳开始日志 hs.logger.Debug("开始发送心跳包") // 获取所有中继设备 relays, err := hs.deviceRepo.ListAll() if err != nil { hs.logger.Error("获取设备列表失败: " + err.Error()) return } // 创建线程安全的临时map用于保存所有设备状态 tempStatusMap := &TempStatusMap{ data: make(map[string]*DeviceStatus), mu: sync.RWMutex{}, } // 遍历所有连接的设备并发送心跳包 wg := sync.WaitGroup{} for _, relay := range relays { // 心跳包之发送给中继设备 if relay.Type != model.DeviceTypeRelay { continue } id := fmt.Sprintf("%v", relay.ID) name := relay.Name wg.Add(1) err := hs.pool.Submit(func() { defer wg.Done() err := hs.handleHeartbeatWithStatus(id, tempStatusMap) if err != nil { hs.logger.Error("[Heartbeat] 向设备 " + name + "(id:" + id + ") 发送心跳包失败: " + err.Error()) } }) if err != nil { hs.logger.Error("向设备 " + name + "(id:" + id + ") 发送心跳包失败(线程池异常): " + err.Error()) } } wg.Wait() // 获取所有设备列表 allDevices, err := hs.deviceRepo.ListAll() if err != nil { hs.logger.Error("获取所有设备列表失败: " + err.Error()) return } // 补齐临时map中缺失的设备,缺失的设备全部设为离线状态 tempStatusMap.mu.Lock() for _, device := range allDevices { id := fmt.Sprintf("%v", device.ID) if _, exists := tempStatusMap.data[id]; !exists { tempStatusMap.data[id] = &DeviceStatus{ Active: false, } } } tempStatusMap.mu.Unlock() // 将临时状态更新到全局状态池 hs.deviceStatusPool.SetAllStatuses(tempStatusMap.data) hs.logger.Debug("心跳包发送完成") } // TempStatusMap 线程安全的临时状态映射 type TempStatusMap struct { data map[string]*DeviceStatus mu sync.RWMutex } // SetStatus 设置设备状态 func (tsm *TempStatusMap) SetStatus(deviceID string, status *DeviceStatus) { tsm.mu.Lock() defer tsm.mu.Unlock() tsm.data[deviceID] = status } // GetStatus 获取设备状态 func (tsm *TempStatusMap) GetStatus(deviceID string) (*DeviceStatus, bool) { tsm.mu.RLock() defer tsm.mu.RUnlock() status, exists := tsm.data[deviceID] return status, exists } // sendHeartbeat 发送心跳包到所有中继设备 func (hs *HeartbeatService) handleHeartbeatWithStatus(deviceID string, tempStatusMap *TempStatusMap) error { // 构造带时间戳的心跳包数据 heartbeatData := map[string]interface{}{ "timestamp": time.Now().Unix(), } // 发送心跳包到设备 response, err := hs.websocketManager.SendCommandAndWait(deviceID, "heartbeat", heartbeatData, 0) if err != nil { hs.logger.Error(fmt.Sprintf("向设备 %s 发送心跳包失败: %v", deviceID, err)) // 更新设备状态为离线 tempStatusMap.SetStatus(deviceID, &DeviceStatus{ Active: false, }) return err } // 记录收到心跳响应 hs.logger.Debug(fmt.Sprintf("收到来自设备 %s 的心跳响应: %+v", deviceID, response)) // 有响应中继设备就是在线 tempStatusMap.SetStatus(deviceID, &DeviceStatus{ Active: true, }) // 时间戳校验 if response.Timestamp.Unix() != heartbeatData["timestamp"] { hs.logger.Error(fmt.Sprintf("心跳响应时间戳校验失败: %v , 响应时间戳应当与发送的时间戳一致", response)) return errors.New("心跳响应时间戳校验失败") } // 解析响应中的下级设备状态 type DeviceStatusInfo struct { DeviceID string `json:"device_id"` DeviceType string `json:"device_type"` Status string `json:"status"` } type HeartbeatResponseData struct { Devices []DeviceStatusInfo `json:"devices"` } var responseData HeartbeatResponseData if err := response.ParseData(&responseData); err != nil { hs.logger.Error(fmt.Sprintf("解析设备 %s 的心跳响应数据失败: %v", deviceID, err)) return err } // 更新所有下级设备的状态 for _, device := range responseData.Devices { // 根据设备状态确定Active值 isActive := device.Status == "running" || device.Status == "online" || device.Status == "active" tempStatusMap.SetStatus(device.DeviceID, &DeviceStatus{ Active: isActive, }) } return nil }