实现飞书/微信/邮件发送通知
This commit is contained in:
		
							
								
								
									
										221
									
								
								internal/domain/notify/notify.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										221
									
								
								internal/domain/notify/notify.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,221 @@ | ||||
| package notify | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/notify" | ||||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" | ||||
| 	"go.uber.org/zap" | ||||
| ) | ||||
|  | ||||
| // Service 定义了通知领域的核心业务逻辑接口 | ||||
| type Service interface { | ||||
| 	// SendBatchAlarm 向一批用户发送告警通知。它会并发地为每个用户执行带故障转移的发送逻辑。 | ||||
| 	SendBatchAlarm(userIDs []uint, content notify.AlarmContent) error | ||||
|  | ||||
| 	// BroadcastAlarm 向所有用户发送告警通知。它会并发地为每个用户执行带故障转移的发送逻辑。 | ||||
| 	BroadcastAlarm(content notify.AlarmContent) error | ||||
|  | ||||
| 	// SendTestMessage 向指定用户发送一条测试消息,用于手动验证特定通知渠道的配置。 | ||||
| 	SendTestMessage(userID uint, notifierType notify.NotifierType) error | ||||
| } | ||||
|  | ||||
| // failoverService 是 Service 接口的实现,提供了故障转移功能 | ||||
| type failoverService struct { | ||||
| 	log              *logs.Logger | ||||
| 	userRepo         repository.UserRepository | ||||
| 	notifiers        map[notify.NotifierType]notify.Notifier | ||||
| 	primaryNotifier  notify.Notifier | ||||
| 	failureThreshold int | ||||
| 	failureCounters  *sync.Map // 使用 sync.Map 来安全地并发读写失败计数, key: userID (uint), value: counter (int) | ||||
| } | ||||
|  | ||||
| // NewFailoverService 创建一个新的故障转移通知服务 | ||||
| func NewFailoverService( | ||||
| 	log *logs.Logger, | ||||
| 	userRepo repository.UserRepository, | ||||
| 	notifiers []notify.Notifier, | ||||
| 	primaryNotifierType notify.NotifierType, | ||||
| 	failureThreshold int, | ||||
| ) (Service, error) { | ||||
| 	notifierMap := make(map[notify.NotifierType]notify.Notifier) | ||||
| 	for _, n := range notifiers { | ||||
| 		notifierMap[n.Type()] = n | ||||
| 	} | ||||
|  | ||||
| 	primaryNotifier, ok := notifierMap[primaryNotifierType] | ||||
| 	if !ok { | ||||
| 		return nil, fmt.Errorf("首选通知器类型 '%s' 在提供的通知器列表中不存在", primaryNotifierType) | ||||
| 	} | ||||
|  | ||||
| 	return &failoverService{ | ||||
| 		log:              log, | ||||
| 		userRepo:         userRepo, | ||||
| 		notifiers:        notifierMap, | ||||
| 		primaryNotifier:  primaryNotifier, | ||||
| 		failureThreshold: failureThreshold, | ||||
| 		failureCounters:  &sync.Map{}, | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| // SendBatchAlarm 实现了向多个用户并发发送告警的功能 | ||||
| func (s *failoverService) SendBatchAlarm(userIDs []uint, content notify.AlarmContent) error { | ||||
| 	var wg sync.WaitGroup | ||||
| 	var mu sync.Mutex | ||||
| 	var allErrors []string | ||||
|  | ||||
| 	s.log.Infow("开始批量发送告警...", "userCount", len(userIDs)) | ||||
|  | ||||
| 	for _, userID := range userIDs { | ||||
| 		wg.Add(1) | ||||
| 		go func(id uint) { | ||||
| 			defer wg.Done() | ||||
| 			if err := s.sendAlarmToUser(id, content); err != nil { | ||||
| 				mu.Lock() | ||||
| 				allErrors = append(allErrors, fmt.Sprintf("发送失败 (用户ID: %d): %v", id, err)) | ||||
| 				mu.Unlock() | ||||
| 			} | ||||
| 		}(userID) | ||||
| 	} | ||||
|  | ||||
| 	wg.Wait() | ||||
|  | ||||
| 	if len(allErrors) > 0 { | ||||
| 		finalError := fmt.Errorf("批量告警发送完成,但有 %d 个用户发送失败:\n%s", len(allErrors), strings.Join(allErrors, "\n")) | ||||
| 		s.log.Error(finalError.Error()) | ||||
| 		return finalError | ||||
| 	} | ||||
|  | ||||
| 	s.log.Info("批量发送告警成功完成,所有用户均已通知。") | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // BroadcastAlarm 实现了向所有用户发送告警的功能 | ||||
| func (s *failoverService) BroadcastAlarm(content notify.AlarmContent) error { | ||||
| 	users, err := s.userRepo.FindAll() | ||||
| 	if err != nil { | ||||
| 		s.log.Errorw("广播告警失败:查找所有用户时出错", "error", err) | ||||
| 		return fmt.Errorf("广播告警失败:查找所有用户时出错: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	var userIDs []uint | ||||
| 	for _, user := range users { | ||||
| 		userIDs = append(userIDs, user.ID) | ||||
| 	} | ||||
|  | ||||
| 	s.log.Infow("开始广播告警给所有用户", "totalUsers", len(userIDs)) | ||||
| 	// 复用 SendBatchAlarm 的逻辑进行并发发送和错误处理 | ||||
| 	return s.SendBatchAlarm(userIDs, content) | ||||
| } | ||||
|  | ||||
| // sendAlarmToUser 是为单个用户发送告警的内部方法,包含了完整的故障转移逻辑 | ||||
| func (s *failoverService) sendAlarmToUser(userID uint, content notify.AlarmContent) error { | ||||
| 	user, err := s.userRepo.FindByID(userID) | ||||
| 	if err != nil { | ||||
| 		s.log.Errorw("发送告警失败:查找用户时出错", "userID", userID, "error", err) | ||||
| 		return fmt.Errorf("查找用户失败: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	counter, _ := s.failureCounters.LoadOrStore(userID, 0) | ||||
| 	failureCount := counter.(int) | ||||
|  | ||||
| 	if failureCount < s.failureThreshold { | ||||
| 		primaryType := s.primaryNotifier.Type() | ||||
| 		addr := getAddressForNotifier(primaryType, user.Contact) | ||||
| 		if addr == "" { | ||||
| 			return fmt.Errorf("用户未配置首选通知方式 '%s' 的地址", primaryType) | ||||
| 		} | ||||
|  | ||||
| 		err = s.primaryNotifier.Send(content, addr) | ||||
| 		if err == nil { | ||||
| 			if failureCount > 0 { | ||||
| 				s.log.Infow("首选渠道发送恢复正常", "userID", userID, "notifierType", primaryType) | ||||
| 				s.failureCounters.Store(userID, 0) | ||||
| 			} | ||||
| 			return nil | ||||
| 		} | ||||
|  | ||||
| 		newFailureCount := failureCount + 1 | ||||
| 		s.failureCounters.Store(userID, newFailureCount) | ||||
| 		s.log.Warnw("首选渠道发送失败", "userID", userID, "notifierType", primaryType, "error", err, "failureCount", newFailureCount) | ||||
| 		failureCount = newFailureCount | ||||
| 	} | ||||
|  | ||||
| 	if failureCount >= s.failureThreshold { | ||||
| 		s.log.Warnw("故障转移阈值已达到,开始广播通知", "userID", userID, "threshold", s.failureThreshold) | ||||
| 		var lastErr error | ||||
| 		for _, notifier := range s.notifiers { | ||||
| 			addr := getAddressForNotifier(notifier.Type(), user.Contact) | ||||
| 			if addr == "" { | ||||
| 				continue | ||||
| 			} | ||||
| 			if err := notifier.Send(content, addr); err == nil { | ||||
| 				s.log.Infow("广播通知成功", "userID", userID, "notifierType", notifier.Type()) | ||||
| 				s.failureCounters.Store(userID, 0) | ||||
| 				return nil | ||||
| 			} | ||||
| 			lastErr = err | ||||
| 			s.log.Warnw("广播通知:渠道发送失败", "userID", userID, "notifierType", notifier.Type(), "error", err) | ||||
| 		} | ||||
| 		return fmt.Errorf("所有渠道均发送失败,最后一个错误: %w", lastErr) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // SendTestMessage 实现了手动发送测试消息的功能 | ||||
| func (s *failoverService) SendTestMessage(userID uint, notifierType notify.NotifierType) error { | ||||
| 	user, err := s.userRepo.FindByID(userID) | ||||
| 	if err != nil { | ||||
| 		s.log.Errorw("发送测试消息失败:查找用户时出错", "userID", userID, "error", err) | ||||
| 		return fmt.Errorf("查找用户失败: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	notifier, ok := s.notifiers[notifierType] | ||||
| 	if !ok { | ||||
| 		s.log.Errorw("发送测试消息失败:通知器类型不存在", "userID", userID, "notifierType", notifierType) | ||||
| 		return fmt.Errorf("指定的通知器类型 '%s' 不存在", notifierType) | ||||
| 	} | ||||
|  | ||||
| 	addr := getAddressForNotifier(notifierType, user.Contact) | ||||
| 	if addr == "" { | ||||
| 		s.log.Warnw("发送测试消息失败:缺少地址", "userID", userID, "notifierType", notifierType) | ||||
| 		return fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifierType) | ||||
| 	} | ||||
|  | ||||
| 	testContent := notify.AlarmContent{ | ||||
| 		Title:     "通知服务测试", | ||||
| 		Message:   fmt.Sprintf("这是一条来自【%s】渠道的测试消息。如果您收到此消息,说明您的配置正确。", notifierType), | ||||
| 		Level:     zap.InfoLevel, | ||||
| 		Timestamp: time.Now(), | ||||
| 	} | ||||
|  | ||||
| 	s.log.Infow("正在发送测试消息...", "userID", userID, "notifierType", notifierType, "address", addr) | ||||
| 	err = notifier.Send(testContent, addr) | ||||
| 	if err != nil { | ||||
| 		s.log.Errorw("发送测试消息失败", "userID", userID, "notifierType", notifierType, "error", err) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	s.log.Infow("发送测试消息成功", "userID", userID, "notifierType", notifierType) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // getAddressForNotifier 是一个辅助函数,根据通知器类型从 ContactInfo 中获取对应的地址 | ||||
| func getAddressForNotifier(notifierType notify.NotifierType, contact models.ContactInfo) string { | ||||
| 	switch notifierType { | ||||
| 	case notify.NotifierTypeSMTP: | ||||
| 		return contact.Email | ||||
| 	case notify.NotifierTypeWeChat: | ||||
| 		return contact.WeChat | ||||
| 	case notify.NotifierTypeLark: | ||||
| 		return contact.Feishu | ||||
| 	default: | ||||
| 		return "" | ||||
| 	} | ||||
| } | ||||
							
								
								
									
										194
									
								
								internal/infra/notify/lark.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										194
									
								
								internal/infra/notify/lark.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,194 @@ | ||||
| package notify | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"net/http" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	// 飞书获取 tenant_access_token 的 API 地址 | ||||
| 	larkGetTokenURL = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal" | ||||
| 	// 飞书发送消息的 API 地址 | ||||
| 	larkSendMessageURL = "https://open.feishu.cn/open-apis/im/v1/messages" | ||||
| ) | ||||
|  | ||||
| // larkNotifier 实现了 Notifier 接口,用于通过飞书自建应用发送私聊消息。 | ||||
| type larkNotifier struct { | ||||
| 	appID     string // 应用 ID | ||||
| 	appSecret string // 应用密钥 | ||||
|  | ||||
| 	// 用于线程安全地管理 tenant_access_token | ||||
| 	mu             sync.Mutex | ||||
| 	accessToken    string | ||||
| 	tokenExpiresAt time.Time | ||||
| } | ||||
|  | ||||
| // NewLarkNotifier 创建一个新的 larkNotifier 实例。 | ||||
| // 调用者需要注入飞书应用的 AppID 和 AppSecret。 | ||||
| func NewLarkNotifier(appID, appSecret string) Notifier { | ||||
| 	return &larkNotifier{ | ||||
| 		appID:     appID, | ||||
| 		appSecret: appSecret, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Send 向指定用户发送一条飞书消息卡片。 | ||||
| // toAddr 参数是接收者的邮箱地址。 | ||||
| func (l *larkNotifier) Send(content AlarmContent, toAddr string) error { | ||||
| 	// 1. 获取有效的 tenant_access_token | ||||
| 	token, err := l.getAccessToken() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// 2. 构建消息卡片 JSON | ||||
| 	// 飞书消息卡片结构复杂,这里构建一个简单的 Markdown 文本卡片 | ||||
| 	cardContent := map[string]interface{}{ | ||||
| 		"config": map[string]bool{ | ||||
| 			"wide_screen_mode": true, | ||||
| 		}, | ||||
| 		"elements": []map[string]interface{}{ | ||||
| 			{ | ||||
| 				"tag": "div", | ||||
| 				"text": map[string]string{ | ||||
| 					"tag": "lark_md", | ||||
| 					"content": fmt.Sprintf("## %s\n**级别**: %s\n**时间**: %s\n\n%s", | ||||
| 						content.Title, | ||||
| 						content.Level.String(), | ||||
| 						content.Timestamp.Format(DefaultTimeFormat), | ||||
| 						content.Message, | ||||
| 					), | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	cardJSON, err := json.Marshal(cardContent) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("序列化飞书卡片内容失败: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	// 3. 构建请求的 JSON Body | ||||
| 	payload := larkMessagePayload{ | ||||
| 		ReceiveID:     toAddr, | ||||
| 		ReceiveIDType: "email",       // 指定接收者类型为邮箱 | ||||
| 		MsgType:       "interactive", // 消息卡片类型 | ||||
| 		Content:       string(cardJSON), | ||||
| 	} | ||||
|  | ||||
| 	jsonBytes, err := json.Marshal(payload) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("序列化飞书消息失败: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	// 4. 发送 HTTP POST 请求 | ||||
| 	url := fmt.Sprintf("%s?receive_id_type=email", larkSendMessageURL) // 在 URL 中指定 receive_id_type | ||||
| 	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes)) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("创建飞书请求失败: %w", err) | ||||
| 	} | ||||
| 	req.Header.Set("Content-Type", "application/json") | ||||
| 	req.Header.Set("Authorization", "Bearer "+token) // 携带 access_token | ||||
|  | ||||
| 	client := &http.Client{} | ||||
| 	resp, err := client.Do(req) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("发送飞书通知失败: %w", err) | ||||
| 	} | ||||
| 	defer resp.Body.Close() | ||||
|  | ||||
| 	// 5. 检查响应 | ||||
| 	var response larkResponse | ||||
| 	if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { | ||||
| 		return fmt.Errorf("解析飞书响应失败: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	if response.Code != 0 { | ||||
| 		return fmt.Errorf("飞书API返回错误: code=%d, msg=%s", response.Code, response.Msg) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // getAccessToken 获取并缓存 tenant_access_token,处理了线程安全和自动刷新。 | ||||
| func (l *larkNotifier) getAccessToken() (string, error) { | ||||
| 	l.mu.Lock() | ||||
| 	defer l.mu.Unlock() | ||||
|  | ||||
| 	// 如果 token 存在且有效期还有5分钟以上,则直接返回缓存的 token | ||||
| 	if l.accessToken != "" && time.Now().Before(l.tokenExpiresAt.Add(-5*time.Minute)) { | ||||
| 		return l.accessToken, nil | ||||
| 	} | ||||
|  | ||||
| 	// 否则,重新获取 token | ||||
| 	payload := map[string]string{ | ||||
| 		"app_id":     l.appID, | ||||
| 		"app_secret": l.appSecret, | ||||
| 	} | ||||
| 	jsonBytes, err := json.Marshal(payload) | ||||
| 	if err != nil { | ||||
| 		return "", fmt.Errorf("序列化获取 token 请求体失败: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	req, err := http.NewRequest("POST", larkGetTokenURL, bytes.NewBuffer(jsonBytes)) | ||||
| 	if err != nil { | ||||
| 		return "", fmt.Errorf("创建获取 token 请求失败: %w", err) | ||||
| 	} | ||||
| 	req.Header.Set("Content-Type", "application/json") | ||||
|  | ||||
| 	client := &http.Client{} | ||||
| 	resp, err := client.Do(req) | ||||
| 	if err != nil { | ||||
| 		return "", fmt.Errorf("获取 tenant_access_token 请求失败: %w", err) | ||||
| 	} | ||||
| 	defer resp.Body.Close() | ||||
|  | ||||
| 	var tokenResp larkTokenResponse | ||||
| 	if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil { | ||||
| 		return "", fmt.Errorf("解析 tenant_access_token 响应失败: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	if tokenResp.Code != 0 { | ||||
| 		return "", fmt.Errorf("获取 tenant_access_token API 返回错误: code=%d, msg=%s", tokenResp.Code, tokenResp.Msg) | ||||
| 	} | ||||
|  | ||||
| 	// 缓存新的 token 和过期时间 | ||||
| 	l.accessToken = tokenResp.TenantAccessToken | ||||
| 	l.tokenExpiresAt = time.Now().Add(time.Duration(tokenResp.Expire) * time.Second) | ||||
|  | ||||
| 	return l.accessToken, nil | ||||
| } | ||||
|  | ||||
| // Type 返回通知器的类型 | ||||
| func (l *larkNotifier) Type() NotifierType { | ||||
| 	return NotifierTypeLark | ||||
| } | ||||
|  | ||||
| // --- API 数据结构 --- | ||||
|  | ||||
| // larkTokenResponse 是获取 tenant_access_token API 的响应结构体 | ||||
| type larkTokenResponse struct { | ||||
| 	Code              int    `json:"code"` | ||||
| 	Msg               string `json:"msg"` | ||||
| 	TenantAccessToken string `json:"tenant_access_token"` | ||||
| 	Expire            int    `json:"expire"` // 有效期,单位秒 | ||||
| } | ||||
|  | ||||
| // larkMessagePayload 是发送消息 API 的请求体结构 | ||||
| type larkMessagePayload struct { | ||||
| 	ReceiveID     string `json:"receive_id"` | ||||
| 	ReceiveIDType string `json:"receive_id_type"` | ||||
| 	MsgType       string `json:"msg_type"` | ||||
| 	Content       string `json:"content"` // 对于 interactive 消息,这里是卡片的 JSON 字符串 | ||||
| } | ||||
|  | ||||
| // larkResponse 是飞书 API 的通用响应结构体 | ||||
| type larkResponse struct { | ||||
| 	Code int    `json:"code"` | ||||
| 	Msg  string `json:"msg"` | ||||
| } | ||||
							
								
								
									
										42
									
								
								internal/infra/notify/notify.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										42
									
								
								internal/infra/notify/notify.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,42 @@ | ||||
| package notify | ||||
|  | ||||
| import ( | ||||
| 	"time" | ||||
|  | ||||
| 	"go.uber.org/zap/zapcore" | ||||
| ) | ||||
|  | ||||
| // DefaultTimeFormat 定义了所有通知中统一使用的时间格式。 | ||||
| const DefaultTimeFormat = "2006-01-02 15:04:05" | ||||
|  | ||||
| // NotifierType 定义了通知器的类型。 | ||||
| type NotifierType string | ||||
|  | ||||
| const ( | ||||
| 	// NotifierTypeSMTP 表示 SMTP 邮件通知器。 | ||||
| 	NotifierTypeSMTP NotifierType = "邮件" | ||||
| 	// NotifierTypeWeChat 表示企业微信通知器。 | ||||
| 	NotifierTypeWeChat NotifierType = "企业微信" | ||||
| 	// NotifierTypeLark 表示飞书通知器。 | ||||
| 	NotifierTypeLark NotifierType = "飞书" | ||||
| ) | ||||
|  | ||||
| // AlarmContent 定义了通知的内容 | ||||
| type AlarmContent struct { | ||||
| 	// 通知标题 | ||||
| 	Title string | ||||
| 	// 通知信息 | ||||
| 	Message string | ||||
| 	// 通知级别 | ||||
| 	Level zapcore.Level | ||||
| 	// 通知时间 | ||||
| 	Timestamp time.Time | ||||
| } | ||||
|  | ||||
| // Notifier 定义了通知发送器的接口 | ||||
| type Notifier interface { | ||||
| 	// Send 发送通知 | ||||
| 	Send(content AlarmContent, toAddr string) error | ||||
| 	// Type 返回通知器的类型 | ||||
| 	Type() NotifierType | ||||
| } | ||||
							
								
								
									
										73
									
								
								internal/infra/notify/smtp.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										73
									
								
								internal/infra/notify/smtp.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,73 @@ | ||||
| package notify | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"net/smtp" | ||||
| 	"strings" | ||||
| ) | ||||
|  | ||||
| // smtpNotifier 实现了 Notifier 接口,用于通过 SMTP 发送邮件通知。 | ||||
| type smtpNotifier struct { | ||||
| 	host     string // SMTP 服务器地址 | ||||
| 	port     int    // SMTP 服务器端口 | ||||
| 	username string // 发件人邮箱地址 | ||||
| 	password string // 发件人邮箱授权码 | ||||
| 	sender   string // 发件人名称或地址,显示在邮件的 \"From\" 字段 | ||||
| } | ||||
|  | ||||
| // NewSMTPNotifier 创建一个新的 smtpNotifier 实例。 | ||||
| // 调用者需要注入 SMTP 相关的配置。 | ||||
| func NewSMTPNotifier(host string, port int, username, password, sender string) Notifier { | ||||
| 	return &smtpNotifier{ | ||||
| 		host:     host, | ||||
| 		port:     port, | ||||
| 		username: username, | ||||
| 		password: password, | ||||
| 		sender:   sender, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Send 使用 net/smtp 包发送一封邮件。 | ||||
| func (s *smtpNotifier) Send(content AlarmContent, toAddr string) error { | ||||
| 	// 1. 设置认证信息 | ||||
| 	auth := smtp.PlainAuth("", s.username, s.password, s.host) | ||||
|  | ||||
| 	// 2. 构建邮件内容 | ||||
| 	// 邮件头 | ||||
| 	subject := content.Title | ||||
| 	contentType := "Content-Type: text/plain; charset=UTF-8" | ||||
| 	fromHeader := fmt.Sprintf("From: %s", s.sender) | ||||
| 	toHeader := fmt.Sprintf("To: %s", toAddr) | ||||
| 	subjectHeader := fmt.Sprintf("Subject: %s", subject) | ||||
|  | ||||
| 	// 邮件正文 | ||||
| 	body := fmt.Sprintf("级别: %s\n时间: %s\n\n%s", | ||||
| 		content.Level.String(), | ||||
| 		content.Timestamp.Format(DefaultTimeFormat), | ||||
| 		content.Message, | ||||
| 	) | ||||
|  | ||||
| 	// 拼接完整的邮件报文 | ||||
| 	msg := strings.Join([]string{ | ||||
| 		fromHeader, | ||||
| 		toHeader, | ||||
| 		subjectHeader, | ||||
| 		contentType, | ||||
| 		"", // 邮件头和正文之间的空行 | ||||
| 		body, | ||||
| 	}, "\r\n") | ||||
|  | ||||
| 	// 3. 发送邮件 | ||||
| 	addr := fmt.Sprintf("%s:%d", s.host, s.port) | ||||
| 	err := smtp.SendMail(addr, auth, s.username, []string{toAddr}, []byte(msg)) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("发送邮件失败: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Type 返回通知器的类型 | ||||
| func (s *smtpNotifier) Type() NotifierType { | ||||
| 	return NotifierTypeSMTP | ||||
| } | ||||
							
								
								
									
										169
									
								
								internal/infra/notify/wechat.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										169
									
								
								internal/infra/notify/wechat.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,169 @@ | ||||
| package notify | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"net/http" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	// 获取 access_token 的 API 地址 | ||||
| 	getTokenURL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken" | ||||
| 	// 发送应用消息的 API 地址 | ||||
| 	sendMessageURL = "https://qyapi.weixin.qq.com/cgi-bin/message/send" | ||||
| ) | ||||
|  | ||||
| // wechatNotifier 实现了 Notifier 接口,用于通过企业微信自建应用发送私聊消息。 | ||||
| type wechatNotifier struct { | ||||
| 	corpID  string // 企业ID (CorpID) | ||||
| 	agentID string // 应用ID (AgentID) | ||||
| 	secret  string // 应用密钥 (Secret) | ||||
|  | ||||
| 	// 用于线程安全地管理 access_token | ||||
| 	mu             sync.Mutex | ||||
| 	accessToken    string | ||||
| 	tokenExpiresAt time.Time | ||||
| } | ||||
|  | ||||
| // NewWechatNotifier 创建一个新的 wechatNotifier 实例。 | ||||
| // 调用者需要注入企业微信应用的 CorpID, AgentID 和 Secret。 | ||||
| func NewWechatNotifier(corpID, agentID, secret string) Notifier { | ||||
| 	return &wechatNotifier{ | ||||
| 		corpID:  corpID, | ||||
| 		agentID: agentID, | ||||
| 		secret:  secret, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Send 向指定用户发送一条 markdown 格式的私聊消息。 | ||||
| // toAddr 参数是接收者的 UserID 列表,用逗号或竖线分隔。 | ||||
| func (w *wechatNotifier) Send(content AlarmContent, toAddr string) error { | ||||
| 	// 1. 获取有效的 access_token | ||||
| 	token, err := w.getAccessToken() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// 2. 构建 markdown 内容 | ||||
| 	markdownContent := fmt.Sprintf("## %s\n> 级别: <font color=\"warning\">%s</font>\n> 时间: %s\n\n%s", | ||||
| 		content.Title, | ||||
| 		content.Level.String(), | ||||
| 		content.Timestamp.Format(DefaultTimeFormat), | ||||
| 		content.Message, | ||||
| 	) | ||||
|  | ||||
| 	// 3. 构建请求的 JSON Body | ||||
| 	// 将逗号分隔的 toAddr 转换为竖线分隔,以符合 API 要求 | ||||
| 	userList := strings.ReplaceAll(toAddr, ",", "|") | ||||
| 	payload := wechatMessagePayload{ | ||||
| 		ToUser:  userList, | ||||
| 		MsgType: "markdown", | ||||
| 		AgentID: w.agentID, | ||||
| 		Markdown: struct { | ||||
| 			Content string `json:"content"` | ||||
| 		}{ | ||||
| 			Content: markdownContent, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	jsonBytes, err := json.Marshal(payload) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("序列化企业微信消息失败: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	// 4. 发送 HTTP POST 请求 | ||||
| 	url := fmt.Sprintf("%s?access_token=%s", sendMessageURL, token) | ||||
| 	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes)) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("创建企业微信请求失败: %w", err) | ||||
| 	} | ||||
| 	req.Header.Set("Content-Type", "application/json") | ||||
|  | ||||
| 	client := &http.Client{} | ||||
| 	resp, err := client.Do(req) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("发送企业微信通知失败: %w", err) | ||||
| 	} | ||||
| 	defer resp.Body.Close() | ||||
|  | ||||
| 	// 5. 检查响应 | ||||
| 	var response wechatResponse | ||||
| 	if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { | ||||
| 		return fmt.Errorf("解析企业微信响应失败: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	if response.ErrCode != 0 { | ||||
| 		return fmt.Errorf("企业微信API返回错误: code=%d, msg=%s", response.ErrCode, response.ErrMsg) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // getAccessToken 获取并缓存 access_token,处理了线程安全和自动刷新。 | ||||
| func (w *wechatNotifier) getAccessToken() (string, error) { | ||||
| 	w.mu.Lock() | ||||
| 	defer w.mu.Unlock() | ||||
|  | ||||
| 	// 如果 token 存在且有效期还有5分钟以上,则直接返回缓存的 token | ||||
| 	if w.accessToken != "" && time.Now().Before(w.tokenExpiresAt.Add(-5*time.Minute)) { | ||||
| 		return w.accessToken, nil | ||||
| 	} | ||||
|  | ||||
| 	// 否则,重新获取 token | ||||
| 	url := fmt.Sprintf("%s?corpid=%s&corpsecret=%s", getTokenURL, w.corpID, w.secret) | ||||
| 	resp, err := http.Get(url) | ||||
| 	if err != nil { | ||||
| 		return "", fmt.Errorf("获取 access_token 请求失败: %w", err) | ||||
| 	} | ||||
| 	defer resp.Body.Close() | ||||
|  | ||||
| 	var tokenResp tokenResponse | ||||
| 	if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil { | ||||
| 		return "", fmt.Errorf("解析 access_token 响应失败: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	if tokenResp.ErrCode != 0 { | ||||
| 		return "", fmt.Errorf("获取 access_token API 返回错误: code=%d, msg=%s", tokenResp.ErrCode, tokenResp.ErrMsg) | ||||
| 	} | ||||
|  | ||||
| 	// 缓存新的 token 和过期时间 | ||||
| 	w.accessToken = tokenResp.AccessToken | ||||
| 	w.tokenExpiresAt = time.Now().Add(time.Duration(tokenResp.ExpiresIn) * time.Second) | ||||
|  | ||||
| 	return w.accessToken, nil | ||||
| } | ||||
|  | ||||
| // Type 返回通知器的类型 | ||||
| func (w *wechatNotifier) Type() NotifierType { | ||||
| 	return NotifierTypeWeChat | ||||
| } | ||||
|  | ||||
| // --- API 数据结构 --- | ||||
|  | ||||
| // tokenResponse 是获取 access_token API 的响应结构体 | ||||
| type tokenResponse struct { | ||||
| 	ErrCode     int    `json:"errcode"` | ||||
| 	ErrMsg      string `json:"errmsg"` | ||||
| 	AccessToken string `json:"access_token"` | ||||
| 	ExpiresIn   int    `json:"expires_in"` | ||||
| } | ||||
|  | ||||
| // wechatMessagePayload 是发送应用消息 API 的请求体结构 | ||||
| type wechatMessagePayload struct { | ||||
| 	ToUser   string `json:"touser"` | ||||
| 	MsgType  string `json:"msgtype"` | ||||
| 	AgentID  string `json:"agentid"` | ||||
| 	Markdown struct { | ||||
| 		Content string `json:"content"` | ||||
| 	} `json:"markdown"` | ||||
| } | ||||
|  | ||||
| // wechatResponse 是企业微信 API 的通用响应结构体 | ||||
| type wechatResponse struct { | ||||
| 	ErrCode int    `json:"errcode"` | ||||
| 	ErrMsg  string `json:"errmsg"` | ||||
| } | ||||
| @@ -13,6 +13,7 @@ type UserRepository interface { | ||||
| 	FindByUsername(username string) (*models.User, error) | ||||
| 	FindByID(id uint) (*models.User, error) | ||||
| 	FindUserForLogin(identifier string) (*models.User, error) | ||||
| 	FindAll() ([]*models.User, error) | ||||
| } | ||||
|  | ||||
| // gormUserRepository 是 UserRepository 的 GORM 实现 | ||||
| @@ -64,3 +65,12 @@ func (r *gormUserRepository) FindByID(id uint) (*models.User, error) { | ||||
| 	} | ||||
| 	return &user, nil | ||||
| } | ||||
|  | ||||
| // FindAll 返回数据库中的所有用户 | ||||
| func (r *gormUserRepository) FindAll() ([]*models.User, error) { | ||||
| 	var users []*models.User | ||||
| 	if err := r.db.Where("1 = 1").Find(&users).Error; err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return users, nil | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user