293 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			293 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package notify
 | 
						||
 | 
						||
import (
 | 
						||
	"fmt"
 | 
						||
	"strings"
 | 
						||
	"sync"
 | 
						||
	"time"
 | 
						||
 | 
						||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
						||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
						||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/notify"
 | 
						||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
						||
	"go.uber.org/zap"
 | 
						||
)
 | 
						||
 | 
						||
// Service 定义了通知领域的核心业务逻辑接口
 | 
						||
type Service interface {
 | 
						||
	// SendBatchAlarm 向一批用户发送告警通知。它会并发地为每个用户执行带故障转移的发送逻辑。
 | 
						||
	SendBatchAlarm(userIDs []uint, content notify.AlarmContent) error
 | 
						||
 | 
						||
	// BroadcastAlarm 向所有用户发送告警通知。它会并发地为每个用户执行带故障转移的发送逻辑。
 | 
						||
	BroadcastAlarm(content notify.AlarmContent) error
 | 
						||
 | 
						||
	// SendTestMessage 向指定用户发送一条测试消息,用于手动验证特定通知渠道的配置。
 | 
						||
	SendTestMessage(userID uint, notifierType notify.NotifierType) error
 | 
						||
}
 | 
						||
 | 
						||
// failoverService 是 Service 接口的实现,提供了故障转移功能
 | 
						||
type failoverService struct {
 | 
						||
	log              *logs.Logger
 | 
						||
	userRepo         repository.UserRepository
 | 
						||
	notifiers        map[notify.NotifierType]notify.Notifier
 | 
						||
	primaryNotifier  notify.Notifier
 | 
						||
	failureThreshold int
 | 
						||
	failureCounters  *sync.Map // 使用 sync.Map 来安全地并发读写失败计数, key: userID (uint), value: counter (int)
 | 
						||
	notificationRepo repository.NotificationRepository
 | 
						||
}
 | 
						||
 | 
						||
// NewFailoverService 创建一个新的故障转移通知服务
 | 
						||
func NewFailoverService(
 | 
						||
	log *logs.Logger,
 | 
						||
	userRepo repository.UserRepository,
 | 
						||
	notifiers []notify.Notifier,
 | 
						||
	primaryNotifierType notify.NotifierType,
 | 
						||
	failureThreshold int,
 | 
						||
	notificationRepo repository.NotificationRepository,
 | 
						||
) (Service, error) {
 | 
						||
	notifierMap := make(map[notify.NotifierType]notify.Notifier)
 | 
						||
	for _, n := range notifiers {
 | 
						||
		notifierMap[n.Type()] = n
 | 
						||
	}
 | 
						||
 | 
						||
	primaryNotifier, ok := notifierMap[primaryNotifierType]
 | 
						||
	if !ok {
 | 
						||
		return nil, fmt.Errorf("首选通知器类型 '%s' 在提供的通知器列表中不存在", primaryNotifierType)
 | 
						||
	}
 | 
						||
 | 
						||
	return &failoverService{
 | 
						||
		log:              log,
 | 
						||
		userRepo:         userRepo,
 | 
						||
		notifiers:        notifierMap,
 | 
						||
		primaryNotifier:  primaryNotifier,
 | 
						||
		failureThreshold: failureThreshold,
 | 
						||
		failureCounters:  &sync.Map{},
 | 
						||
		notificationRepo: notificationRepo,
 | 
						||
	}, nil
 | 
						||
}
 | 
						||
 | 
						||
// SendBatchAlarm 实现了向多个用户并发发送告警的功能
 | 
						||
func (s *failoverService) SendBatchAlarm(userIDs []uint, content notify.AlarmContent) error {
 | 
						||
	var wg sync.WaitGroup
 | 
						||
	var mu sync.Mutex
 | 
						||
	var allErrors []string
 | 
						||
 | 
						||
	s.log.Infow("开始批量发送告警...", "userCount", len(userIDs))
 | 
						||
 | 
						||
	for _, userID := range userIDs {
 | 
						||
		wg.Add(1)
 | 
						||
		go func(id uint) {
 | 
						||
			defer wg.Done()
 | 
						||
			if err := s.sendAlarmToUser(id, content); err != nil {
 | 
						||
				mu.Lock()
 | 
						||
				allErrors = append(allErrors, fmt.Sprintf("发送失败 (用户ID: %d): %v", id, err))
 | 
						||
				mu.Unlock()
 | 
						||
			}
 | 
						||
		}(userID)
 | 
						||
	}
 | 
						||
 | 
						||
	wg.Wait()
 | 
						||
 | 
						||
	if len(allErrors) > 0 {
 | 
						||
		finalError := fmt.Errorf("批量告警发送完成,但有 %d 个用户发送失败:\n%s", len(allErrors), strings.Join(allErrors, "\n"))
 | 
						||
		s.log.Error(finalError.Error())
 | 
						||
		return finalError
 | 
						||
	}
 | 
						||
 | 
						||
	s.log.Info("批量发送告警成功完成,所有用户均已通知。")
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
// BroadcastAlarm 实现了向所有用户发送告警的功能
 | 
						||
func (s *failoverService) BroadcastAlarm(content notify.AlarmContent) error {
 | 
						||
	users, err := s.userRepo.FindAll()
 | 
						||
	if err != nil {
 | 
						||
		s.log.Errorw("广播告警失败:查找所有用户时出错", "error", err)
 | 
						||
		return fmt.Errorf("广播告警失败:查找所有用户时出错: %w", err)
 | 
						||
	}
 | 
						||
 | 
						||
	var userIDs []uint
 | 
						||
	for _, user := range users {
 | 
						||
		userIDs = append(userIDs, user.ID)
 | 
						||
	}
 | 
						||
 | 
						||
	s.log.Infow("开始广播告警给所有用户", "totalUsers", len(userIDs))
 | 
						||
	// 复用 SendBatchAlarm 的逻辑进行并发发送和错误处理
 | 
						||
	return s.SendBatchAlarm(userIDs, content)
 | 
						||
}
 | 
						||
 | 
						||
// sendAlarmToUser 是为单个用户发送告警的内部方法,包含了完整的故障转移逻辑
 | 
						||
func (s *failoverService) sendAlarmToUser(userID uint, content notify.AlarmContent) error {
 | 
						||
	user, err := s.userRepo.FindByID(userID)
 | 
						||
	if err != nil {
 | 
						||
		s.log.Errorw("发送告警失败:查找用户时出错", "userID", userID, "error", err)
 | 
						||
		return fmt.Errorf("查找用户失败: %w", err)
 | 
						||
	}
 | 
						||
 | 
						||
	counter, _ := s.failureCounters.LoadOrStore(userID, 0)
 | 
						||
	failureCount := counter.(int)
 | 
						||
 | 
						||
	if failureCount < s.failureThreshold {
 | 
						||
		primaryType := s.primaryNotifier.Type()
 | 
						||
		addr := getAddressForNotifier(primaryType, user.Contact)
 | 
						||
		if addr == "" {
 | 
						||
			// 记录跳过通知
 | 
						||
			s.recordNotificationAttempt(userID, primaryType, content, "", models.NotificationStatusSkipped, fmt.Errorf("用户未配置首选通知方式 '%s' 的地址", primaryType))
 | 
						||
			return fmt.Errorf("用户未配置首选通知方式 '%s' 的地址", primaryType)
 | 
						||
		}
 | 
						||
 | 
						||
		err = s.primaryNotifier.Send(content, addr)
 | 
						||
		if err == nil {
 | 
						||
			// 记录成功通知
 | 
						||
			s.recordNotificationAttempt(userID, primaryType, content, addr, models.NotificationStatusSuccess, nil)
 | 
						||
			if failureCount > 0 {
 | 
						||
				s.log.Infow("首选渠道发送恢复正常", "userID", userID, "notifierType", primaryType)
 | 
						||
				s.failureCounters.Store(userID, 0)
 | 
						||
			}
 | 
						||
			return nil
 | 
						||
		}
 | 
						||
 | 
						||
		// 记录失败通知
 | 
						||
		s.recordNotificationAttempt(userID, primaryType, content, addr, models.NotificationStatusFailed, err)
 | 
						||
		newFailureCount := failureCount + 1
 | 
						||
		s.failureCounters.Store(userID, newFailureCount)
 | 
						||
		s.log.Warnw("首选渠道发送失败", "userID", userID, "notifierType", primaryType, "error", err, "failureCount", newFailureCount)
 | 
						||
		failureCount = newFailureCount
 | 
						||
	}
 | 
						||
 | 
						||
	if failureCount >= s.failureThreshold {
 | 
						||
		s.log.Warnw("故障转移阈值已达到,开始广播通知", "userID", userID, "threshold", s.failureThreshold)
 | 
						||
		var lastErr error
 | 
						||
		for _, notifier := range s.notifiers {
 | 
						||
			addr := getAddressForNotifier(notifier.Type(), user.Contact)
 | 
						||
			if addr == "" {
 | 
						||
				// 记录跳过通知
 | 
						||
				s.recordNotificationAttempt(userID, notifier.Type(), content, "", models.NotificationStatusSkipped, fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifier.Type()))
 | 
						||
				continue
 | 
						||
			}
 | 
						||
			if err := notifier.Send(content, addr); err == nil {
 | 
						||
				// 记录成功通知
 | 
						||
				s.recordNotificationAttempt(userID, notifier.Type(), content, addr, models.NotificationStatusSuccess, nil)
 | 
						||
				s.log.Infow("广播通知成功", "userID", userID, "notifierType", notifier.Type())
 | 
						||
				s.failureCounters.Store(userID, 0)
 | 
						||
				return nil
 | 
						||
			}
 | 
						||
			// 记录失败通知
 | 
						||
			s.recordNotificationAttempt(userID, notifier.Type(), content, addr, models.NotificationStatusFailed, err)
 | 
						||
			lastErr = err
 | 
						||
			s.log.Warnw("广播通知:渠道发送失败", "userID", userID, "notifierType", notifier.Type(), "error", err)
 | 
						||
		}
 | 
						||
		return fmt.Errorf("所有渠道均发送失败,最后一个错误: %w", lastErr)
 | 
						||
	}
 | 
						||
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
// SendTestMessage 实现了手动发送测试消息的功能
 | 
						||
func (s *failoverService) SendTestMessage(userID uint, notifierType notify.NotifierType) error {
 | 
						||
	user, err := s.userRepo.FindByID(userID)
 | 
						||
	if err != nil {
 | 
						||
		s.log.Errorw("发送测试消息失败:查找用户时出错", "userID", userID, "error", err)
 | 
						||
		return fmt.Errorf("查找用户失败: %w", err)
 | 
						||
	}
 | 
						||
 | 
						||
	notifier, ok := s.notifiers[notifierType]
 | 
						||
	if !ok {
 | 
						||
		s.log.Errorw("发送测试消息失败:通知器类型不存在", "userID", userID, "notifierType", notifierType)
 | 
						||
		return fmt.Errorf("指定的通知器类型 '%s' 不存在", notifierType)
 | 
						||
	}
 | 
						||
 | 
						||
	addr := getAddressForNotifier(notifierType, user.Contact)
 | 
						||
	if addr == "" {
 | 
						||
		s.log.Warnw("发送测试消息失败:缺少地址", "userID", userID, "notifierType", notifierType)
 | 
						||
		// 记录跳过通知
 | 
						||
		s.recordNotificationAttempt(userID, notifierType, notify.AlarmContent{
 | 
						||
			Title:     "通知服务测试",
 | 
						||
			Message:   fmt.Sprintf("这是一条来自【%s】渠道的测试消息。如果您收到此消息,说明您的配置正确。", notifierType),
 | 
						||
			Level:     zap.InfoLevel,
 | 
						||
			Timestamp: time.Now(),
 | 
						||
		}, "", models.NotificationStatusFailed, fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifierType))
 | 
						||
		return fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifierType)
 | 
						||
	}
 | 
						||
 | 
						||
	testContent := notify.AlarmContent{
 | 
						||
		Title:     "通知服务测试",
 | 
						||
		Message:   fmt.Sprintf("这是一条来自【%s】渠道的测试消息。如果您收到此消息,说明您的配置正确。", notifierType),
 | 
						||
		Level:     zap.InfoLevel,
 | 
						||
		Timestamp: time.Now(),
 | 
						||
	}
 | 
						||
 | 
						||
	s.log.Infow("正在发送测试消息...", "userID", userID, "notifierType", notifierType, "address", addr)
 | 
						||
	err = notifier.Send(testContent, addr)
 | 
						||
	if err != nil {
 | 
						||
		s.log.Errorw("发送测试消息失败", "userID", userID, "notifierType", notifierType, "error", err)
 | 
						||
		// 记录失败通知
 | 
						||
		s.recordNotificationAttempt(userID, notifierType, testContent, addr, models.NotificationStatusFailed, err)
 | 
						||
		return err
 | 
						||
	}
 | 
						||
 | 
						||
	s.log.Infow("发送测试消息成功", "userID", userID, "notifierType", notifierType)
 | 
						||
	// 记录成功通知
 | 
						||
	s.recordNotificationAttempt(userID, notifierType, testContent, addr, models.NotificationStatusSuccess, nil)
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
// getAddressForNotifier 是一个辅助函数,根据通知器类型从 ContactInfo 中获取对应的地址
 | 
						||
func getAddressForNotifier(notifierType notify.NotifierType, contact models.ContactInfo) string {
 | 
						||
	switch notifierType {
 | 
						||
	case notify.NotifierTypeSMTP:
 | 
						||
		return contact.Email
 | 
						||
	case notify.NotifierTypeWeChat:
 | 
						||
		return contact.WeChat
 | 
						||
	case notify.NotifierTypeLark:
 | 
						||
		return contact.Feishu
 | 
						||
	case notify.NotifierTypeLog:
 | 
						||
		return "log" // LogNotifier不需要具体的地址,但为了函数签名一致性,返回一个无意义的非空字符串以绕过配置存在检查
 | 
						||
	default:
 | 
						||
		return ""
 | 
						||
	}
 | 
						||
}
 | 
						||
 | 
						||
// recordNotificationAttempt 记录一次通知发送尝试的结果
 | 
						||
// userID: 接收通知的用户ID
 | 
						||
// notifierType: 使用的通知器类型
 | 
						||
// content: 通知内容
 | 
						||
// toAddress: 实际发送到的地址
 | 
						||
// status: 发送尝试的状态 (成功、失败、跳过)
 | 
						||
// err: 如果发送失败,记录的错误信息
 | 
						||
func (s *failoverService) recordNotificationAttempt(
 | 
						||
	userID uint,
 | 
						||
	notifierType notify.NotifierType,
 | 
						||
	content notify.AlarmContent,
 | 
						||
	toAddress string,
 | 
						||
	status models.NotificationStatus,
 | 
						||
	err error,
 | 
						||
) {
 | 
						||
	errorMessage := ""
 | 
						||
	if err != nil {
 | 
						||
		errorMessage = err.Error()
 | 
						||
	}
 | 
						||
 | 
						||
	notification := &models.Notification{
 | 
						||
		NotifierType:   notifierType,
 | 
						||
		UserID:         userID,
 | 
						||
		Title:          content.Title,
 | 
						||
		Message:        content.Message,
 | 
						||
		Level:          models.LogLevel(content.Level),
 | 
						||
		AlarmTimestamp: content.Timestamp,
 | 
						||
		ToAddress:      toAddress,
 | 
						||
		Status:         status,
 | 
						||
		ErrorMessage:   errorMessage,
 | 
						||
	}
 | 
						||
 | 
						||
	if saveErr := s.notificationRepo.Create(notification); saveErr != nil {
 | 
						||
		s.log.Errorw("无法保存通知发送记录到数据库",
 | 
						||
			"userID", userID,
 | 
						||
			"notifierType", notifierType,
 | 
						||
			"status", status,
 | 
						||
			"originalError", errorMessage,
 | 
						||
			"saveError", saveErr,
 | 
						||
		)
 | 
						||
	}
 | 
						||
}
 |