package notify import ( "fmt" "strings" "sync" "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/notify" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "go.uber.org/zap" ) // Service 定义了通知领域的核心业务逻辑接口 type Service interface { // SendBatchAlarm 向一批用户发送告警通知。它会并发地为每个用户执行带故障转移的发送逻辑。 SendBatchAlarm(userIDs []uint, content notify.AlarmContent) error // BroadcastAlarm 向所有用户发送告警通知。它会并发地为每个用户执行带故障转移的发送逻辑。 BroadcastAlarm(content notify.AlarmContent) error // SendTestMessage 向指定用户发送一条测试消息,用于手动验证特定通知渠道的配置。 SendTestMessage(userID uint, notifierType notify.NotifierType) error } // failoverService 是 Service 接口的实现,提供了故障转移功能 type failoverService struct { log *logs.Logger userRepo repository.UserRepository notifiers map[notify.NotifierType]notify.Notifier primaryNotifier notify.Notifier failureThreshold int failureCounters *sync.Map // 使用 sync.Map 来安全地并发读写失败计数, key: userID (uint), value: counter (int) notificationRepo repository.NotificationRepository } // NewFailoverService 创建一个新的故障转移通知服务 func NewFailoverService( log *logs.Logger, userRepo repository.UserRepository, notifiers []notify.Notifier, primaryNotifierType notify.NotifierType, failureThreshold int, notificationRepo repository.NotificationRepository, ) (Service, error) { notifierMap := make(map[notify.NotifierType]notify.Notifier) for _, n := range notifiers { notifierMap[n.Type()] = n } primaryNotifier, ok := notifierMap[primaryNotifierType] if !ok { return nil, fmt.Errorf("首选通知器类型 '%s' 在提供的通知器列表中不存在", primaryNotifierType) } return &failoverService{ log: log, userRepo: userRepo, notifiers: notifierMap, primaryNotifier: primaryNotifier, failureThreshold: failureThreshold, failureCounters: &sync.Map{}, notificationRepo: notificationRepo, }, nil } // SendBatchAlarm 实现了向多个用户并发发送告警的功能 func (s *failoverService) SendBatchAlarm(userIDs []uint, content notify.AlarmContent) error { var wg sync.WaitGroup var mu sync.Mutex var allErrors []string s.log.Infow("开始批量发送告警...", "userCount", len(userIDs)) for _, userID := range userIDs { wg.Add(1) go func(id uint) { defer wg.Done() if err := s.sendAlarmToUser(id, content); err != nil { mu.Lock() allErrors = append(allErrors, fmt.Sprintf("发送失败 (用户ID: %d): %v", id, err)) mu.Unlock() } }(userID) } wg.Wait() if len(allErrors) > 0 { finalError := fmt.Errorf("批量告警发送完成,但有 %d 个用户发送失败:\n%s", len(allErrors), strings.Join(allErrors, "\n")) s.log.Error(finalError.Error()) return finalError } s.log.Info("批量发送告警成功完成,所有用户均已通知。") return nil } // BroadcastAlarm 实现了向所有用户发送告警的功能 func (s *failoverService) BroadcastAlarm(content notify.AlarmContent) error { users, err := s.userRepo.FindAll() if err != nil { s.log.Errorw("广播告警失败:查找所有用户时出错", "error", err) return fmt.Errorf("广播告警失败:查找所有用户时出错: %w", err) } var userIDs []uint for _, user := range users { userIDs = append(userIDs, user.ID) } s.log.Infow("开始广播告警给所有用户", "totalUsers", len(userIDs)) // 复用 SendBatchAlarm 的逻辑进行并发发送和错误处理 return s.SendBatchAlarm(userIDs, content) } // sendAlarmToUser 是为单个用户发送告警的内部方法,包含了完整的故障转移逻辑 func (s *failoverService) sendAlarmToUser(userID uint, content notify.AlarmContent) error { user, err := s.userRepo.FindByID(userID) if err != nil { s.log.Errorw("发送告警失败:查找用户时出错", "userID", userID, "error", err) return fmt.Errorf("查找用户失败: %w", err) } counter, _ := s.failureCounters.LoadOrStore(userID, 0) failureCount := counter.(int) if failureCount < s.failureThreshold { primaryType := s.primaryNotifier.Type() addr := getAddressForNotifier(primaryType, user.Contact) if addr == "" { // 记录跳过通知 s.recordNotificationAttempt(userID, primaryType, content, "", models.NotificationStatusSkipped, fmt.Errorf("用户未配置首选通知方式 '%s' 的地址", primaryType)) return fmt.Errorf("用户未配置首选通知方式 '%s' 的地址", primaryType) } err = s.primaryNotifier.Send(content, addr) if err == nil { // 记录成功通知 s.recordNotificationAttempt(userID, primaryType, content, addr, models.NotificationStatusSuccess, nil) if failureCount > 0 { s.log.Infow("首选渠道发送恢复正常", "userID", userID, "notifierType", primaryType) s.failureCounters.Store(userID, 0) } return nil } // 记录失败通知 s.recordNotificationAttempt(userID, primaryType, content, addr, models.NotificationStatusFailed, err) newFailureCount := failureCount + 1 s.failureCounters.Store(userID, newFailureCount) s.log.Warnw("首选渠道发送失败", "userID", userID, "notifierType", primaryType, "error", err, "failureCount", newFailureCount) failureCount = newFailureCount } if failureCount >= s.failureThreshold { s.log.Warnw("故障转移阈值已达到,开始广播通知", "userID", userID, "threshold", s.failureThreshold) var lastErr error for _, notifier := range s.notifiers { addr := getAddressForNotifier(notifier.Type(), user.Contact) if addr == "" { // 记录跳过通知 s.recordNotificationAttempt(userID, notifier.Type(), content, "", models.NotificationStatusSkipped, fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifier.Type())) continue } if err := notifier.Send(content, addr); err == nil { // 记录成功通知 s.recordNotificationAttempt(userID, notifier.Type(), content, addr, models.NotificationStatusSuccess, nil) s.log.Infow("广播通知成功", "userID", userID, "notifierType", notifier.Type()) s.failureCounters.Store(userID, 0) return nil } // 记录失败通知 s.recordNotificationAttempt(userID, notifier.Type(), content, addr, models.NotificationStatusFailed, err) lastErr = err s.log.Warnw("广播通知:渠道发送失败", "userID", userID, "notifierType", notifier.Type(), "error", err) } return fmt.Errorf("所有渠道均发送失败,最后一个错误: %w", lastErr) } return nil } // SendTestMessage 实现了手动发送测试消息的功能 func (s *failoverService) SendTestMessage(userID uint, notifierType notify.NotifierType) error { user, err := s.userRepo.FindByID(userID) if err != nil { s.log.Errorw("发送测试消息失败:查找用户时出错", "userID", userID, "error", err) return fmt.Errorf("查找用户失败: %w", err) } notifier, ok := s.notifiers[notifierType] if !ok { s.log.Errorw("发送测试消息失败:通知器类型不存在", "userID", userID, "notifierType", notifierType) return fmt.Errorf("指定的通知器类型 '%s' 不存在", notifierType) } addr := getAddressForNotifier(notifierType, user.Contact) if addr == "" { s.log.Warnw("发送测试消息失败:缺少地址", "userID", userID, "notifierType", notifierType) // 记录跳过通知 s.recordNotificationAttempt(userID, notifierType, notify.AlarmContent{ Title: "通知服务测试", Message: fmt.Sprintf("这是一条来自【%s】渠道的测试消息。如果您收到此消息,说明您的配置正确。", notifierType), Level: zap.InfoLevel, Timestamp: time.Now(), }, "", models.NotificationStatusFailed, fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifierType)) return fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifierType) } testContent := notify.AlarmContent{ Title: "通知服务测试", Message: fmt.Sprintf("这是一条来自【%s】渠道的测试消息。如果您收到此消息,说明您的配置正确。", notifierType), Level: zap.InfoLevel, Timestamp: time.Now(), } s.log.Infow("正在发送测试消息...", "userID", userID, "notifierType", notifierType, "address", addr) err = notifier.Send(testContent, addr) if err != nil { s.log.Errorw("发送测试消息失败", "userID", userID, "notifierType", notifierType, "error", err) // 记录失败通知 s.recordNotificationAttempt(userID, notifierType, testContent, addr, models.NotificationStatusFailed, err) return err } s.log.Infow("发送测试消息成功", "userID", userID, "notifierType", notifierType) // 记录成功通知 s.recordNotificationAttempt(userID, notifierType, testContent, addr, models.NotificationStatusSuccess, nil) return nil } // getAddressForNotifier 是一个辅助函数,根据通知器类型从 ContactInfo 中获取对应的地址 func getAddressForNotifier(notifierType notify.NotifierType, contact models.ContactInfo) string { switch notifierType { case notify.NotifierTypeSMTP: return contact.Email case notify.NotifierTypeWeChat: return contact.WeChat case notify.NotifierTypeLark: return contact.Feishu case notify.NotifierTypeLog: return "log" // LogNotifier不需要具体的地址,但为了函数签名一致性,返回一个无意义的非空字符串以绕过配置存在检查 default: return "" } } // recordNotificationAttempt 记录一次通知发送尝试的结果 // userID: 接收通知的用户ID // notifierType: 使用的通知器类型 // content: 通知内容 // toAddress: 实际发送到的地址 // status: 发送尝试的状态 (成功、失败、跳过) // err: 如果发送失败,记录的错误信息 func (s *failoverService) recordNotificationAttempt( userID uint, notifierType notify.NotifierType, content notify.AlarmContent, toAddress string, status models.NotificationStatus, err error, ) { errorMessage := "" if err != nil { errorMessage = err.Error() } notification := &models.Notification{ NotifierType: notifierType, UserID: userID, Title: content.Title, Message: content.Message, Level: content.Level, AlarmTimestamp: content.Timestamp, ToAddress: toAddress, Status: status, ErrorMessage: errorMessage, } if saveErr := s.notificationRepo.Create(notification); saveErr != nil { s.log.Errorw("无法保存通知发送记录到数据库", "userID", userID, "notifierType", notifierType, "status", status, "originalError", errorMessage, "saveError", saveErr, ) } }