Compare commits
3 Commits
47ed819b9d
...
bd8729d473
| Author | SHA1 | Date | |
|---|---|---|---|
| bd8729d473 | |||
| 3fd97aa43f | |||
| 9d6876684b |
93
config.example.yml
Normal file
93
config.example.yml
Normal file
@@ -0,0 +1,93 @@
|
||||
# 应用基础配置
|
||||
app:
|
||||
name: "PigFarmController" # 应用名称
|
||||
version: "1.0.0" # 应用版本
|
||||
jwt_secret: "your_jwt_secret_key_here" # JWT 签名密钥,请务必修改为强密码
|
||||
|
||||
# 服务器配置
|
||||
server:
|
||||
port: 8080 # 服务器监听端口
|
||||
mode: "debug" # 运行模式: debug, release, test
|
||||
|
||||
# 日志配置
|
||||
log:
|
||||
level: "info" # 日志级别: debug, info, warn, error, dpanic, panic, fatal
|
||||
format: "console" # 日志输出格式: console, json
|
||||
enable_file: true # 是否同时输出到文件
|
||||
file_path: "app_logs/pig_farm_controller.log" # 日志文件路径
|
||||
max_size: 100 # 单个日志文件最大大小 (MB)
|
||||
max_backups: 7 # 最多保留的旧日志文件数量
|
||||
max_age: 7 # 最多保留的旧日志文件天数
|
||||
compress: true # 是否压缩旧日志文件
|
||||
|
||||
# 数据库配置
|
||||
database:
|
||||
host: "localhost" # 数据库主机地址
|
||||
port: 5432 # 数据库端口
|
||||
username: "postgres" # 数据库用户名
|
||||
password: "your_db_password" # 数据库密码
|
||||
dbname: "pig_farm_controller_db" # 数据库名称
|
||||
sslmode: "disable" # SSL模式: disable, require, verify-ca, verify-full
|
||||
is_timescaledb: false # 是否为 TimescaleDB
|
||||
max_open_conns: 100 # 最大开放连接数
|
||||
max_idle_conns: 10 # 最大空闲连接数
|
||||
conn_max_lifetime: 300 # 连接最大生命周期 (秒)
|
||||
|
||||
# WebSocket配置
|
||||
websocket:
|
||||
timeout: 60 # WebSocket请求超时时间 (秒)
|
||||
heartbeat_interval: 30 # 心跳检测间隔 (秒)
|
||||
|
||||
# 心跳配置
|
||||
heartbeat:
|
||||
interval: 10 # 心跳间隔 (秒)
|
||||
concurrency: 5 # 请求并发数
|
||||
|
||||
# ChirpStack API 配置
|
||||
chirp_stack:
|
||||
api_host: "http://localhost:8080" # ChirpStack API 主机地址
|
||||
api_token: "your_chirpstack_api_token" # ChirpStack API Token
|
||||
fport: 10 # ChirpStack FPort
|
||||
api_timeout: 5 # API 请求超时时间 (秒)
|
||||
collection_request_timeout: 10 # 采集请求超时时间 (秒)
|
||||
|
||||
# 任务调度配置
|
||||
task:
|
||||
interval: 5 # 任务调度间隔 (秒)
|
||||
num_workers: 5 # 任务执行器并发工作数量
|
||||
|
||||
# Lora 配置
|
||||
lora:
|
||||
mode: "lora_mesh" # Lora 运行模式: lora_wan, lora_mesh
|
||||
|
||||
# Lora Mesh 配置
|
||||
lora_mesh:
|
||||
uart_port: "/dev/ttyUSB0" # UART 串口路径
|
||||
baud_rate: 115200 # 波特率
|
||||
timeout: 5 # 超时时间 (秒)
|
||||
lora_mesh_mode: "transparent" # Lora Mesh 模式: transparent, command
|
||||
max_chunk_size: 200 # 最大数据块大小
|
||||
reassembly_timeout: 10 # 重组超时时间 (秒)
|
||||
|
||||
# 通知服务配置
|
||||
notify:
|
||||
primary: "日志" # 首选通知渠道: "邮件", "企业微信", "飞书", "日志" (如果其他渠道未启用,"日志" 会自动成为首选)
|
||||
failureThreshold: 2 # 连续失败多少次后触发广播模式
|
||||
smtp:
|
||||
enabled: false # 是否启用 SMTP 邮件通知
|
||||
host: "smtp.example.com" # SMTP 服务器地址
|
||||
port: 587 # SMTP 服务器端口
|
||||
username: "your_email@example.com" # 发件人邮箱地址
|
||||
password: "your_email_password" # 发件人邮箱授权码或密码
|
||||
sender: "PigFarm Alarm <no-reply@example.com>" # 发件人名称和地址
|
||||
|
||||
wechat:
|
||||
enabled: false # 是否启用企业微信通知
|
||||
corpID: "wwxxxxxxxxxxxx" # 企业ID (CorpID)
|
||||
agentID: "1000001" # 应用ID (AgentID)
|
||||
secret: "your_wechat_app_secret" # 应用密钥 (Secret)
|
||||
|
||||
lark:
|
||||
enabled: false # 是否启用飞书通知
|
||||
appID: "cli_xxxxxxxxxx" # 应用 ID
|
||||
appSecret: "your_lark_app_secret" # 应用密钥
|
||||
89
docs/docs.go
89
docs/docs.go
@@ -3923,6 +3923,64 @@ const docTemplate = `{
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/api/v1/users/{id}/notifications/test": {
|
||||
"post": {
|
||||
"security": [
|
||||
{
|
||||
"BearerAuth": []
|
||||
}
|
||||
],
|
||||
"description": "为指定用户发送一条特定渠道的测试消息,以验证其配置是否正确。",
|
||||
"consumes": [
|
||||
"application/json"
|
||||
],
|
||||
"produces": [
|
||||
"application/json"
|
||||
],
|
||||
"tags": [
|
||||
"用户管理"
|
||||
],
|
||||
"summary": "发送测试通知",
|
||||
"parameters": [
|
||||
{
|
||||
"type": "integer",
|
||||
"description": "用户ID",
|
||||
"name": "id",
|
||||
"in": "path",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"description": "请求体",
|
||||
"name": "body",
|
||||
"in": "body",
|
||||
"required": true,
|
||||
"schema": {
|
||||
"$ref": "#/definitions/dto.SendTestNotificationRequest"
|
||||
}
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "成功响应",
|
||||
"schema": {
|
||||
"allOf": [
|
||||
{
|
||||
"$ref": "#/definitions/controller.Response"
|
||||
},
|
||||
{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"data": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"definitions": {
|
||||
@@ -5591,6 +5649,22 @@ const docTemplate = `{
|
||||
}
|
||||
}
|
||||
},
|
||||
"dto.SendTestNotificationRequest": {
|
||||
"type": "object",
|
||||
"required": [
|
||||
"type"
|
||||
],
|
||||
"properties": {
|
||||
"type": {
|
||||
"description": "Type 指定要测试的通知渠道",
|
||||
"allOf": [
|
||||
{
|
||||
"$ref": "#/definitions/notify.NotifierType"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"dto.SensorDataDTO": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
@@ -6530,6 +6604,21 @@ const docTemplate = `{
|
||||
"$ref": "#/definitions/models.SensorType"
|
||||
}
|
||||
}
|
||||
},
|
||||
"notify.NotifierType": {
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"smtp",
|
||||
"wechat",
|
||||
"lark",
|
||||
"log"
|
||||
],
|
||||
"x-enum-varnames": [
|
||||
"NotifierTypeSMTP",
|
||||
"NotifierTypeWeChat",
|
||||
"NotifierTypeLark",
|
||||
"NotifierTypeLog"
|
||||
]
|
||||
}
|
||||
},
|
||||
"securityDefinitions": {
|
||||
|
||||
@@ -3915,6 +3915,64 @@
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/api/v1/users/{id}/notifications/test": {
|
||||
"post": {
|
||||
"security": [
|
||||
{
|
||||
"BearerAuth": []
|
||||
}
|
||||
],
|
||||
"description": "为指定用户发送一条特定渠道的测试消息,以验证其配置是否正确。",
|
||||
"consumes": [
|
||||
"application/json"
|
||||
],
|
||||
"produces": [
|
||||
"application/json"
|
||||
],
|
||||
"tags": [
|
||||
"用户管理"
|
||||
],
|
||||
"summary": "发送测试通知",
|
||||
"parameters": [
|
||||
{
|
||||
"type": "integer",
|
||||
"description": "用户ID",
|
||||
"name": "id",
|
||||
"in": "path",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"description": "请求体",
|
||||
"name": "body",
|
||||
"in": "body",
|
||||
"required": true,
|
||||
"schema": {
|
||||
"$ref": "#/definitions/dto.SendTestNotificationRequest"
|
||||
}
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "成功响应",
|
||||
"schema": {
|
||||
"allOf": [
|
||||
{
|
||||
"$ref": "#/definitions/controller.Response"
|
||||
},
|
||||
{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"data": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"definitions": {
|
||||
@@ -5583,6 +5641,22 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"dto.SendTestNotificationRequest": {
|
||||
"type": "object",
|
||||
"required": [
|
||||
"type"
|
||||
],
|
||||
"properties": {
|
||||
"type": {
|
||||
"description": "Type 指定要测试的通知渠道",
|
||||
"allOf": [
|
||||
{
|
||||
"$ref": "#/definitions/notify.NotifierType"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"dto.SensorDataDTO": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
@@ -6522,6 +6596,21 @@
|
||||
"$ref": "#/definitions/models.SensorType"
|
||||
}
|
||||
}
|
||||
},
|
||||
"notify.NotifierType": {
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"smtp",
|
||||
"wechat",
|
||||
"lark",
|
||||
"log"
|
||||
],
|
||||
"x-enum-varnames": [
|
||||
"NotifierTypeSMTP",
|
||||
"NotifierTypeWeChat",
|
||||
"NotifierTypeLark",
|
||||
"NotifierTypeLog"
|
||||
]
|
||||
}
|
||||
},
|
||||
"securityDefinitions": {
|
||||
|
||||
@@ -1125,6 +1125,15 @@ definitions:
|
||||
- traderName
|
||||
- unitPrice
|
||||
type: object
|
||||
dto.SendTestNotificationRequest:
|
||||
properties:
|
||||
type:
|
||||
allOf:
|
||||
- $ref: '#/definitions/notify.NotifierType'
|
||||
description: Type 指定要测试的通知渠道
|
||||
required:
|
||||
- type
|
||||
type: object
|
||||
dto.SensorDataDTO:
|
||||
properties:
|
||||
data:
|
||||
@@ -1816,6 +1825,18 @@ definitions:
|
||||
type:
|
||||
$ref: '#/definitions/models.SensorType'
|
||||
type: object
|
||||
notify.NotifierType:
|
||||
enum:
|
||||
- smtp
|
||||
- wechat
|
||||
- lark
|
||||
- log
|
||||
type: string
|
||||
x-enum-varnames:
|
||||
- NotifierTypeSMTP
|
||||
- NotifierTypeWeChat
|
||||
- NotifierTypeLark
|
||||
- NotifierTypeLog
|
||||
info:
|
||||
contact:
|
||||
email: divano@example.com
|
||||
@@ -4086,6 +4107,40 @@ paths:
|
||||
summary: 获取指定用户的操作历史
|
||||
tags:
|
||||
- 用户管理
|
||||
/api/v1/users/{id}/notifications/test:
|
||||
post:
|
||||
consumes:
|
||||
- application/json
|
||||
description: 为指定用户发送一条特定渠道的测试消息,以验证其配置是否正确。
|
||||
parameters:
|
||||
- description: 用户ID
|
||||
in: path
|
||||
name: id
|
||||
required: true
|
||||
type: integer
|
||||
- description: 请求体
|
||||
in: body
|
||||
name: body
|
||||
required: true
|
||||
schema:
|
||||
$ref: '#/definitions/dto.SendTestNotificationRequest'
|
||||
produces:
|
||||
- application/json
|
||||
responses:
|
||||
"200":
|
||||
description: 成功响应
|
||||
schema:
|
||||
allOf:
|
||||
- $ref: '#/definitions/controller.Response'
|
||||
- properties:
|
||||
data:
|
||||
type: string
|
||||
type: object
|
||||
security:
|
||||
- BearerAuth: []
|
||||
summary: 发送测试通知
|
||||
tags:
|
||||
- 用户管理
|
||||
/api/v1/users/login:
|
||||
post:
|
||||
consumes:
|
||||
|
||||
@@ -28,6 +28,7 @@ import (
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/webhook"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit"
|
||||
domain_device "git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
|
||||
domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
|
||||
@@ -68,9 +69,9 @@ func NewAPI(cfg config.ServerConfig,
|
||||
pigFarmService service.PigFarmService,
|
||||
pigBatchService service.PigBatchService,
|
||||
monitorService service.MonitorService,
|
||||
userActionLogRepository repository.UserActionLogRepository,
|
||||
tokenService token.TokenService,
|
||||
auditService audit.Service,
|
||||
notifyService domain_notify.Service,
|
||||
deviceService domain_device.Service,
|
||||
listenHandler webhook.ListenHandler,
|
||||
analysisTaskManager *task.AnalysisPlanTaskManager) *API {
|
||||
@@ -96,7 +97,7 @@ func NewAPI(cfg config.ServerConfig,
|
||||
config: cfg,
|
||||
listenHandler: listenHandler,
|
||||
// 在 NewAPI 中初始化用户控制器,并将其作为 API 结构体的成员
|
||||
userController: user.NewController(userRepo, monitorService, logger, tokenService),
|
||||
userController: user.NewController(userRepo, monitorService, logger, tokenService, notifyService),
|
||||
// 在 NewAPI 中初始化设备控制器,并将其作为 API 结构体的成员
|
||||
deviceController: device.NewController(deviceRepository, areaControllerRepository, deviceTemplateRepository, deviceService, logger),
|
||||
// 在 NewAPI 中初始化计划控制器,并将其作为 API 结构体的成员
|
||||
|
||||
@@ -57,6 +57,7 @@ func (a *API) setupRoutes() {
|
||||
userGroup := authGroup.Group("/users")
|
||||
{
|
||||
userGroup.GET("/:id/history", a.userController.ListUserHistory) // 获取用户操作历史
|
||||
userGroup.POST("/:id/notifications/test", a.userController.SendTestNotification)
|
||||
}
|
||||
a.logger.Info("用户相关接口注册成功 (需要认证和审计)")
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/controller"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/dto"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/service"
|
||||
domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||
@@ -19,16 +20,24 @@ import (
|
||||
type Controller struct {
|
||||
userRepo repository.UserRepository
|
||||
monitorService service.MonitorService
|
||||
tokenService token.TokenService // 注入 token 服务
|
||||
tokenService token.TokenService
|
||||
notifyService domain_notify.Service
|
||||
logger *logs.Logger
|
||||
}
|
||||
|
||||
// NewController 创建用户控制器实例
|
||||
func NewController(userRepo repository.UserRepository, monitorService service.MonitorService, logger *logs.Logger, tokenService token.TokenService) *Controller {
|
||||
func NewController(
|
||||
userRepo repository.UserRepository,
|
||||
monitorService service.MonitorService,
|
||||
logger *logs.Logger,
|
||||
tokenService token.TokenService,
|
||||
notifyService domain_notify.Service,
|
||||
) *Controller {
|
||||
return &Controller{
|
||||
userRepo: userRepo,
|
||||
monitorService: monitorService,
|
||||
tokenService: tokenService,
|
||||
notifyService: notifyService,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
@@ -192,3 +201,46 @@ func (c *Controller) ListUserHistory(ctx *gin.Context) {
|
||||
c.logger.Infof("%s: 成功获取用户 %d 的操作历史, 数量: %d", actionType, userID, len(data))
|
||||
controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "获取用户操作历史成功", resp, actionType, "获取用户操作历史成功", opts)
|
||||
}
|
||||
|
||||
// SendTestNotification godoc
|
||||
// @Summary 发送测试通知
|
||||
// @Description 为指定用户发送一条特定渠道的测试消息,以验证其配置是否正确。
|
||||
// @Tags 用户管理
|
||||
// @Security BearerAuth
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param id path int true "用户ID"
|
||||
// @Param body body dto.SendTestNotificationRequest true "请求体"
|
||||
// @Success 200 {object} controller.Response{data=string} "成功响应"
|
||||
// @Router /api/v1/users/{id}/notifications/test [post]
|
||||
func (c *Controller) SendTestNotification(ctx *gin.Context) {
|
||||
const actionType = "发送测试通知"
|
||||
|
||||
// 1. 从 URL 中获取用户 ID
|
||||
userID, err := strconv.ParseUint(ctx.Param("id"), 10, 32)
|
||||
if err != nil {
|
||||
c.logger.Errorf("%s: 无效的用户ID格式: %v", actionType, err)
|
||||
controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "无效的用户ID格式", actionType, "无效的用户ID格式", ctx.Param("id"))
|
||||
return
|
||||
}
|
||||
|
||||
// 2. 从请求体 (JSON Body) 中获取要测试的通知类型
|
||||
var req dto.SendTestNotificationRequest
|
||||
if err := ctx.ShouldBindJSON(&req); err != nil {
|
||||
c.logger.Errorf("%s: 参数绑定失败: %v", actionType, err)
|
||||
controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "请求体格式错误或缺少 'type' 字段: "+err.Error(), actionType, "请求体绑定失败", req)
|
||||
return
|
||||
}
|
||||
|
||||
// 3. 调用领域服务
|
||||
err = c.notifyService.SendTestMessage(uint(userID), req.Type)
|
||||
if err != nil {
|
||||
c.logger.Errorf("%s: 服务层调用失败: %v", actionType, err)
|
||||
controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "发送测试消息失败: "+err.Error(), actionType, "服务层调用失败", gin.H{"userID": userID, "type": req.Type})
|
||||
return
|
||||
}
|
||||
|
||||
// 4. 返回成功响应
|
||||
c.logger.Infof("%s: 成功为用户 %d 发送类型为 %s 的测试消息", actionType, userID, req.Type)
|
||||
controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "测试消息已发送,请检查您的接收端。", nil, actionType, "测试消息发送成功", gin.H{"userID": userID, "type": req.Type})
|
||||
}
|
||||
|
||||
9
internal/app/dto/notification_dto.go
Normal file
9
internal/app/dto/notification_dto.go
Normal file
@@ -0,0 +1,9 @@
|
||||
package dto
|
||||
|
||||
import "git.huangwc.com/pig/pig-farm-controller/internal/infra/notify"
|
||||
|
||||
// SendTestNotificationRequest 定义了发送测试通知请求的 JSON 结构
|
||||
type SendTestNotificationRequest struct {
|
||||
// Type 指定要测试的通知渠道
|
||||
Type notify.NotifierType `json:"type" binding:"required"`
|
||||
}
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/webhook"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
|
||||
domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/domain/pig"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
|
||||
@@ -19,6 +20,7 @@ import (
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/database"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/notify"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora"
|
||||
@@ -41,6 +43,9 @@ type Application struct {
|
||||
|
||||
// Lora Mesh 监听器
|
||||
loraMeshCommunicator transport.Listener
|
||||
|
||||
// 通知服务
|
||||
NotifyService domain_notify.Service
|
||||
}
|
||||
|
||||
// NewApplication 创建并初始化一个新的 Application 实例。
|
||||
@@ -117,6 +122,12 @@ func NewApplication(configPath string) (*Application, error) {
|
||||
// 初始化审计服务
|
||||
auditService := audit.NewService(userActionLogRepo, logger)
|
||||
|
||||
// 初始化通知服务
|
||||
notifyService, err := initNotifyService(cfg.Notify, logger, userRepo)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("初始化通知服务失败: %w", err)
|
||||
}
|
||||
|
||||
// --- 初始化 LoRa 相关组件 ---
|
||||
var listenHandler webhook.ListenHandler
|
||||
var comm transport.Communicator
|
||||
@@ -176,9 +187,9 @@ func NewApplication(configPath string) (*Application, error) {
|
||||
pigFarmService,
|
||||
pigBatchService,
|
||||
monitorService,
|
||||
userActionLogRepo,
|
||||
tokenService,
|
||||
auditService,
|
||||
notifyService,
|
||||
generalDeviceService,
|
||||
listenHandler,
|
||||
analysisPlanTaskManager,
|
||||
@@ -197,11 +208,92 @@ func NewApplication(configPath string) (*Application, error) {
|
||||
pendingCollectionRepo: pendingCollectionRepo,
|
||||
analysisPlanTaskManager: analysisPlanTaskManager,
|
||||
loraMeshCommunicator: loraListener,
|
||||
NotifyService: notifyService,
|
||||
}
|
||||
|
||||
return app, nil
|
||||
}
|
||||
|
||||
// initNotifyService 根据配置初始化并返回一个通知领域服务。
|
||||
// 它确保至少有一个 LogNotifier 总是可用,并根据配置启用其他通知器。
|
||||
func initNotifyService(
|
||||
cfg config.NotifyConfig,
|
||||
log *logs.Logger,
|
||||
userRepo repository.UserRepository,
|
||||
) (domain_notify.Service, error) {
|
||||
var availableNotifiers []notify.Notifier
|
||||
|
||||
// 1. 总是创建 LogNotifier 作为所有告警的最终记录渠道
|
||||
logNotifier := notify.NewLogNotifier(log)
|
||||
availableNotifiers = append(availableNotifiers, logNotifier)
|
||||
log.Info("Log通知器已启用 (作为所有告警的最终记录渠道)")
|
||||
|
||||
// 2. 根据配置,按需创建并收集所有启用的其他 Notifier 实例
|
||||
if cfg.SMTP.Enabled {
|
||||
smtpNotifier := notify.NewSMTPNotifier(
|
||||
cfg.SMTP.Host,
|
||||
cfg.SMTP.Port,
|
||||
cfg.SMTP.Username,
|
||||
cfg.SMTP.Password,
|
||||
cfg.SMTP.Sender,
|
||||
)
|
||||
availableNotifiers = append(availableNotifiers, smtpNotifier)
|
||||
log.Info("SMTP通知器已启用")
|
||||
}
|
||||
|
||||
if cfg.WeChat.Enabled {
|
||||
wechatNotifier := notify.NewWechatNotifier(
|
||||
cfg.WeChat.CorpID,
|
||||
cfg.WeChat.AgentID,
|
||||
cfg.WeChat.Secret,
|
||||
)
|
||||
availableNotifiers = append(availableNotifiers, wechatNotifier)
|
||||
log.Info("企业微信通知器已启用")
|
||||
}
|
||||
|
||||
if cfg.Lark.Enabled {
|
||||
larkNotifier := notify.NewLarkNotifier(
|
||||
cfg.Lark.AppID,
|
||||
cfg.Lark.AppSecret,
|
||||
)
|
||||
availableNotifiers = append(availableNotifiers, larkNotifier)
|
||||
log.Info("飞书通知器已启用")
|
||||
}
|
||||
|
||||
// 3. 动态确定首选通知器
|
||||
var primaryNotifier notify.Notifier
|
||||
primaryNotifierType := notify.NotifierType(cfg.Primary)
|
||||
|
||||
// 检查用户指定的主渠道是否已启用
|
||||
for _, n := range availableNotifiers {
|
||||
if n.Type() == primaryNotifierType {
|
||||
primaryNotifier = n
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// 如果用户指定的主渠道未启用或未指定,则自动选择第一个可用的 (这将是 LogNotifier,如果其他都未启用)
|
||||
if primaryNotifier == nil {
|
||||
primaryNotifier = availableNotifiers[0] // 确保总能找到一个,因为 LogNotifier 总是存在的
|
||||
log.Warnf("配置的首选渠道 '%s' 未启用或未指定,已自动降级使用 '%s' 作为首选渠道。", cfg.Primary, primaryNotifier.Type())
|
||||
}
|
||||
|
||||
// 4. 使用创建的 Notifier 列表来组装领域服务
|
||||
notifyService, err := domain_notify.NewFailoverService(
|
||||
log,
|
||||
userRepo,
|
||||
availableNotifiers,
|
||||
primaryNotifier.Type(),
|
||||
cfg.FailureThreshold,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("创建故障转移通知服务失败: %w", err)
|
||||
}
|
||||
|
||||
log.Infof("通知服务初始化成功,首选渠道: %s, 故障阈值: %d", primaryNotifier.Type(), cfg.FailureThreshold)
|
||||
return notifyService, nil
|
||||
}
|
||||
|
||||
// Start 启动应用的所有组件并阻塞,直到接收到关闭信号。
|
||||
func (app *Application) Start() error {
|
||||
app.Logger.Info("应用启动中...")
|
||||
|
||||
223
internal/domain/notify/notify.go
Normal file
223
internal/domain/notify/notify.go
Normal file
@@ -0,0 +1,223 @@
|
||||
package notify
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/notify"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Service 定义了通知领域的核心业务逻辑接口
|
||||
type Service interface {
|
||||
// SendBatchAlarm 向一批用户发送告警通知。它会并发地为每个用户执行带故障转移的发送逻辑。
|
||||
SendBatchAlarm(userIDs []uint, content notify.AlarmContent) error
|
||||
|
||||
// BroadcastAlarm 向所有用户发送告警通知。它会并发地为每个用户执行带故障转移的发送逻辑。
|
||||
BroadcastAlarm(content notify.AlarmContent) error
|
||||
|
||||
// SendTestMessage 向指定用户发送一条测试消息,用于手动验证特定通知渠道的配置。
|
||||
SendTestMessage(userID uint, notifierType notify.NotifierType) error
|
||||
}
|
||||
|
||||
// failoverService 是 Service 接口的实现,提供了故障转移功能
|
||||
type failoverService struct {
|
||||
log *logs.Logger
|
||||
userRepo repository.UserRepository
|
||||
notifiers map[notify.NotifierType]notify.Notifier
|
||||
primaryNotifier notify.Notifier
|
||||
failureThreshold int
|
||||
failureCounters *sync.Map // 使用 sync.Map 来安全地并发读写失败计数, key: userID (uint), value: counter (int)
|
||||
}
|
||||
|
||||
// NewFailoverService 创建一个新的故障转移通知服务
|
||||
func NewFailoverService(
|
||||
log *logs.Logger,
|
||||
userRepo repository.UserRepository,
|
||||
notifiers []notify.Notifier,
|
||||
primaryNotifierType notify.NotifierType,
|
||||
failureThreshold int,
|
||||
) (Service, error) {
|
||||
notifierMap := make(map[notify.NotifierType]notify.Notifier)
|
||||
for _, n := range notifiers {
|
||||
notifierMap[n.Type()] = n
|
||||
}
|
||||
|
||||
primaryNotifier, ok := notifierMap[primaryNotifierType]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("首选通知器类型 '%s' 在提供的通知器列表中不存在", primaryNotifierType)
|
||||
}
|
||||
|
||||
return &failoverService{
|
||||
log: log,
|
||||
userRepo: userRepo,
|
||||
notifiers: notifierMap,
|
||||
primaryNotifier: primaryNotifier,
|
||||
failureThreshold: failureThreshold,
|
||||
failureCounters: &sync.Map{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SendBatchAlarm 实现了向多个用户并发发送告警的功能
|
||||
func (s *failoverService) SendBatchAlarm(userIDs []uint, content notify.AlarmContent) error {
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex
|
||||
var allErrors []string
|
||||
|
||||
s.log.Infow("开始批量发送告警...", "userCount", len(userIDs))
|
||||
|
||||
for _, userID := range userIDs {
|
||||
wg.Add(1)
|
||||
go func(id uint) {
|
||||
defer wg.Done()
|
||||
if err := s.sendAlarmToUser(id, content); err != nil {
|
||||
mu.Lock()
|
||||
allErrors = append(allErrors, fmt.Sprintf("发送失败 (用户ID: %d): %v", id, err))
|
||||
mu.Unlock()
|
||||
}
|
||||
}(userID)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
if len(allErrors) > 0 {
|
||||
finalError := fmt.Errorf("批量告警发送完成,但有 %d 个用户发送失败:\n%s", len(allErrors), strings.Join(allErrors, "\n"))
|
||||
s.log.Error(finalError.Error())
|
||||
return finalError
|
||||
}
|
||||
|
||||
s.log.Info("批量发送告警成功完成,所有用户均已通知。")
|
||||
return nil
|
||||
}
|
||||
|
||||
// BroadcastAlarm 实现了向所有用户发送告警的功能
|
||||
func (s *failoverService) BroadcastAlarm(content notify.AlarmContent) error {
|
||||
users, err := s.userRepo.FindAll()
|
||||
if err != nil {
|
||||
s.log.Errorw("广播告警失败:查找所有用户时出错", "error", err)
|
||||
return fmt.Errorf("广播告警失败:查找所有用户时出错: %w", err)
|
||||
}
|
||||
|
||||
var userIDs []uint
|
||||
for _, user := range users {
|
||||
userIDs = append(userIDs, user.ID)
|
||||
}
|
||||
|
||||
s.log.Infow("开始广播告警给所有用户", "totalUsers", len(userIDs))
|
||||
// 复用 SendBatchAlarm 的逻辑进行并发发送和错误处理
|
||||
return s.SendBatchAlarm(userIDs, content)
|
||||
}
|
||||
|
||||
// sendAlarmToUser 是为单个用户发送告警的内部方法,包含了完整的故障转移逻辑
|
||||
func (s *failoverService) sendAlarmToUser(userID uint, content notify.AlarmContent) error {
|
||||
user, err := s.userRepo.FindByID(userID)
|
||||
if err != nil {
|
||||
s.log.Errorw("发送告警失败:查找用户时出错", "userID", userID, "error", err)
|
||||
return fmt.Errorf("查找用户失败: %w", err)
|
||||
}
|
||||
|
||||
counter, _ := s.failureCounters.LoadOrStore(userID, 0)
|
||||
failureCount := counter.(int)
|
||||
|
||||
if failureCount < s.failureThreshold {
|
||||
primaryType := s.primaryNotifier.Type()
|
||||
addr := getAddressForNotifier(primaryType, user.Contact)
|
||||
if addr == "" {
|
||||
return fmt.Errorf("用户未配置首选通知方式 '%s' 的地址", primaryType)
|
||||
}
|
||||
|
||||
err = s.primaryNotifier.Send(content, addr)
|
||||
if err == nil {
|
||||
if failureCount > 0 {
|
||||
s.log.Infow("首选渠道发送恢复正常", "userID", userID, "notifierType", primaryType)
|
||||
s.failureCounters.Store(userID, 0)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
newFailureCount := failureCount + 1
|
||||
s.failureCounters.Store(userID, newFailureCount)
|
||||
s.log.Warnw("首选渠道发送失败", "userID", userID, "notifierType", primaryType, "error", err, "failureCount", newFailureCount)
|
||||
failureCount = newFailureCount
|
||||
}
|
||||
|
||||
if failureCount >= s.failureThreshold {
|
||||
s.log.Warnw("故障转移阈值已达到,开始广播通知", "userID", userID, "threshold", s.failureThreshold)
|
||||
var lastErr error
|
||||
for _, notifier := range s.notifiers {
|
||||
addr := getAddressForNotifier(notifier.Type(), user.Contact)
|
||||
if addr == "" {
|
||||
continue
|
||||
}
|
||||
if err := notifier.Send(content, addr); err == nil {
|
||||
s.log.Infow("广播通知成功", "userID", userID, "notifierType", notifier.Type())
|
||||
s.failureCounters.Store(userID, 0)
|
||||
return nil
|
||||
}
|
||||
lastErr = err
|
||||
s.log.Warnw("广播通知:渠道发送失败", "userID", userID, "notifierType", notifier.Type(), "error", err)
|
||||
}
|
||||
return fmt.Errorf("所有渠道均发送失败,最后一个错误: %w", lastErr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendTestMessage 实现了手动发送测试消息的功能
|
||||
func (s *failoverService) SendTestMessage(userID uint, notifierType notify.NotifierType) error {
|
||||
user, err := s.userRepo.FindByID(userID)
|
||||
if err != nil {
|
||||
s.log.Errorw("发送测试消息失败:查找用户时出错", "userID", userID, "error", err)
|
||||
return fmt.Errorf("查找用户失败: %w", err)
|
||||
}
|
||||
|
||||
notifier, ok := s.notifiers[notifierType]
|
||||
if !ok {
|
||||
s.log.Errorw("发送测试消息失败:通知器类型不存在", "userID", userID, "notifierType", notifierType)
|
||||
return fmt.Errorf("指定的通知器类型 '%s' 不存在", notifierType)
|
||||
}
|
||||
|
||||
addr := getAddressForNotifier(notifierType, user.Contact)
|
||||
if addr == "" {
|
||||
s.log.Warnw("发送测试消息失败:缺少地址", "userID", userID, "notifierType", notifierType)
|
||||
return fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifierType)
|
||||
}
|
||||
|
||||
testContent := notify.AlarmContent{
|
||||
Title: "通知服务测试",
|
||||
Message: fmt.Sprintf("这是一条来自【%s】渠道的测试消息。如果您收到此消息,说明您的配置正确。", notifierType),
|
||||
Level: zap.InfoLevel,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
s.log.Infow("正在发送测试消息...", "userID", userID, "notifierType", notifierType, "address", addr)
|
||||
err = notifier.Send(testContent, addr)
|
||||
if err != nil {
|
||||
s.log.Errorw("发送测试消息失败", "userID", userID, "notifierType", notifierType, "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
s.log.Infow("发送测试消息成功", "userID", userID, "notifierType", notifierType)
|
||||
return nil
|
||||
}
|
||||
|
||||
// getAddressForNotifier 是一个辅助函数,根据通知器类型从 ContactInfo 中获取对应的地址
|
||||
func getAddressForNotifier(notifierType notify.NotifierType, contact models.ContactInfo) string {
|
||||
switch notifierType {
|
||||
case notify.NotifierTypeSMTP:
|
||||
return contact.Email
|
||||
case notify.NotifierTypeWeChat:
|
||||
return contact.WeChat
|
||||
case notify.NotifierTypeLark:
|
||||
return contact.Feishu
|
||||
case notify.NotifierTypeLog:
|
||||
return "log" // LogNotifier不需要具体的地址,但为了函数签名一致性,返回一个无意义的非空字符串以绕过配置存在检查
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
@@ -41,6 +41,9 @@ type Config struct {
|
||||
|
||||
// LoraMesh LoraMesh配置
|
||||
LoraMesh LoraMeshConfig `yaml:"lora_mesh"`
|
||||
|
||||
// Notify 通知服务配置
|
||||
Notify NotifyConfig `yaml:"notify"`
|
||||
}
|
||||
|
||||
// AppConfig 代表应用基础配置
|
||||
@@ -158,6 +161,40 @@ type LoraMeshConfig struct {
|
||||
ReassemblyTimeout int `yaml:"reassembly_timeout"`
|
||||
}
|
||||
|
||||
// NotifyConfig 包含了所有与通知服务相关的配置
|
||||
type NotifyConfig struct {
|
||||
Primary string `yaml:"primary"` // 首选通知渠道 (e.g., "邮件", "企业微信", "飞书", "日志")
|
||||
FailureThreshold int `yaml:"failureThreshold"` // 连续失败多少次后触发广播模式
|
||||
SMTP SMTPConfig `yaml:"smtp"`
|
||||
WeChat WeChatConfig `yaml:"wechat"`
|
||||
Lark LarkConfig `yaml:"lark"`
|
||||
}
|
||||
|
||||
// SMTPConfig SMTP邮件配置
|
||||
type SMTPConfig struct {
|
||||
Enabled bool `yaml:"enabled"`
|
||||
Host string `yaml:"host"`
|
||||
Port int `yaml:"port"`
|
||||
Username string `yaml:"username"`
|
||||
Password string `yaml:"password"`
|
||||
Sender string `yaml:"sender"`
|
||||
}
|
||||
|
||||
// WeChatConfig 企业微信应用配置
|
||||
type WeChatConfig struct {
|
||||
Enabled bool `yaml:"enabled"`
|
||||
CorpID string `yaml:"corpID"`
|
||||
AgentID string `yaml:"agentID"`
|
||||
Secret string `yaml:"secret"`
|
||||
}
|
||||
|
||||
// LarkConfig 飞书应用配置
|
||||
type LarkConfig struct {
|
||||
Enabled bool `yaml:"enabled"`
|
||||
AppID string `yaml:"appID"`
|
||||
AppSecret string `yaml:"appSecret"`
|
||||
}
|
||||
|
||||
// NewConfig 创建并返回一个新的配置实例
|
||||
func NewConfig() *Config {
|
||||
// 默认值可以在这里设置,但我们优先使用配置文件中的值
|
||||
|
||||
193
internal/infra/notify/lark.go
Normal file
193
internal/infra/notify/lark.go
Normal file
@@ -0,0 +1,193 @@
|
||||
package notify
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// 飞书获取 tenant_access_token 的 API 地址
|
||||
larkGetTokenURL = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
|
||||
// 飞书发送消息的 API 地址
|
||||
larkSendMessageURL = "https://open.feishu.cn/open-apis/im/v1/messages"
|
||||
)
|
||||
|
||||
// larkNotifier 实现了 Notifier 接口,用于通过飞书自建应用发送私聊消息。
|
||||
type larkNotifier struct {
|
||||
appID string // 应用 ID
|
||||
appSecret string // 应用密钥
|
||||
|
||||
// 用于线程安全地管理 tenant_access_token
|
||||
mu sync.Mutex
|
||||
accessToken string
|
||||
tokenExpiresAt time.Time
|
||||
}
|
||||
|
||||
// NewLarkNotifier 创建一个新的 larkNotifier 实例。
|
||||
// 调用者需要注入飞书应用的 AppID 和 AppSecret。
|
||||
func NewLarkNotifier(appID, appSecret string) Notifier {
|
||||
return &larkNotifier{
|
||||
appID: appID,
|
||||
appSecret: appSecret,
|
||||
}
|
||||
}
|
||||
|
||||
// Send 向指定用户发送一条飞书消息卡片。
|
||||
// toAddr 参数是接收者的邮箱地址。
|
||||
func (l *larkNotifier) Send(content AlarmContent, toAddr string) error {
|
||||
// 1. 获取有效的 tenant_access_token
|
||||
token, err := l.getAccessToken()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 2. 构建消息卡片 JSON
|
||||
// 飞书消息卡片结构复杂,这里构建一个简单的 Markdown 文本卡片
|
||||
cardContent := map[string]interface{}{
|
||||
"config": map[string]bool{
|
||||
"wide_screen_mode": true,
|
||||
},
|
||||
"elements": []map[string]interface{}{
|
||||
{
|
||||
"tag": "div",
|
||||
"text": map[string]string{
|
||||
"tag": "lark_md",
|
||||
"content": fmt.Sprintf("## %s\n**级别**: %s\n**时间**: %s\n\n%s",
|
||||
content.Title,
|
||||
content.Level.String(),
|
||||
content.Timestamp.Format(DefaultTimeFormat),
|
||||
content.Message,
|
||||
),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
cardJSON, err := json.Marshal(cardContent)
|
||||
if err != nil {
|
||||
return fmt.Errorf("序列化飞书卡片内容失败: %w", err)
|
||||
}
|
||||
|
||||
// 3. 构建请求的 JSON Body
|
||||
payload := larkMessagePayload{
|
||||
ReceiveID: toAddr,
|
||||
ReceiveIDType: "email", // 指定接收者类型为邮箱
|
||||
MsgType: "interactive", // 消息卡片类型
|
||||
Content: string(cardJSON),
|
||||
}
|
||||
|
||||
jsonBytes, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("序列化飞书消息失败: %w", err)
|
||||
}
|
||||
|
||||
// 4. 发送 HTTP POST 请求
|
||||
url := fmt.Sprintf("%s?receive_id_type=email", larkSendMessageURL) // 在 URL 中指定 receive_id_type
|
||||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes))
|
||||
if err != nil {
|
||||
return fmt.Errorf("创建飞书请求失败: %w", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Authorization", "Bearer "+token) // 携带 access_token
|
||||
|
||||
client := &http.Client{}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("发送飞书通知失败: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// 5. 检查响应
|
||||
var response larkResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
|
||||
return fmt.Errorf("解析飞书响应失败: %w", err)
|
||||
}
|
||||
|
||||
if response.Code != 0 {
|
||||
return fmt.Errorf("飞书API返回错误: code=%d, msg=%s", response.Code, response.Msg)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getAccessToken 获取并缓存 tenant_access_token,处理了线程安全和自动刷新。
|
||||
func (l *larkNotifier) getAccessToken() (string, error) {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
// 如果 token 存在且有效期还有5分钟以上,则直接返回缓存的 token
|
||||
if l.accessToken != "" && time.Now().Before(l.tokenExpiresAt.Add(-5*time.Minute)) {
|
||||
return l.accessToken, nil
|
||||
}
|
||||
|
||||
// 否则,重新获取 token
|
||||
payload := map[string]string{
|
||||
"app_id": l.appID,
|
||||
"app_secret": l.appSecret,
|
||||
}
|
||||
jsonBytes, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("序列化获取 token 请求体失败: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", larkGetTokenURL, bytes.NewBuffer(jsonBytes))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("创建获取 token 请求失败: %w", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
client := &http.Client{}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("获取 tenant_access_token 请求失败: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var tokenResp larkTokenResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
|
||||
return "", fmt.Errorf("解析 tenant_access_token 响应失败: %w", err)
|
||||
}
|
||||
|
||||
if tokenResp.Code != 0 {
|
||||
return "", fmt.Errorf("获取 tenant_access_token API 返回错误: code=%d, msg=%s", tokenResp.Code, tokenResp.Msg)
|
||||
}
|
||||
|
||||
// 缓存新的 token 和过期时间
|
||||
l.accessToken = tokenResp.TenantAccessToken
|
||||
l.tokenExpiresAt = time.Now().Add(time.Duration(tokenResp.Expire) * time.Second)
|
||||
|
||||
return l.accessToken, nil
|
||||
}
|
||||
|
||||
// Type 返回通知器的类型
|
||||
func (l *larkNotifier) Type() NotifierType {
|
||||
return NotifierTypeLark
|
||||
}
|
||||
|
||||
// --- API 数据结构 ---
|
||||
|
||||
// larkTokenResponse 是获取 tenant_access_token API 的响应结构体
|
||||
type larkTokenResponse struct {
|
||||
Code int `json:"code"`
|
||||
Msg string `json:"msg"`
|
||||
TenantAccessToken string `json:"tenant_access_token"`
|
||||
Expire int `json:"expire"` // 有效期,单位秒
|
||||
}
|
||||
|
||||
// larkMessagePayload 是发送消息 API 的请求体结构
|
||||
type larkMessagePayload struct {
|
||||
ReceiveID string `json:"receive_id"`
|
||||
ReceiveIDType string `json:"receive_id_type"`
|
||||
MsgType string `json:"msg_type"`
|
||||
Content string `json:"content"` // 对于 interactive 消息,这里是卡片的 JSON 字符串
|
||||
}
|
||||
|
||||
// larkResponse 是飞书 API 的通用响应结构体
|
||||
type larkResponse struct {
|
||||
Code int `json:"code"`
|
||||
Msg string `json:"msg"`
|
||||
}
|
||||
37
internal/infra/notify/log_notifier.go
Normal file
37
internal/infra/notify/log_notifier.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package notify
|
||||
|
||||
import (
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||
)
|
||||
|
||||
// logNotifier 实现了 Notifier 接口,用于将告警信息记录到日志中。
|
||||
type logNotifier struct {
|
||||
logger *logs.Logger
|
||||
}
|
||||
|
||||
// NewLogNotifier 创建一个新的 logNotifier 实例。
|
||||
// 它接收一个日志记录器,用于实际的日志输出。
|
||||
func NewLogNotifier(logger *logs.Logger) Notifier {
|
||||
return &logNotifier{
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Send 将告警内容以结构化的方式记录到日志中。
|
||||
// toAddr 参数在这里表示告警的预期接收者地址,也会被记录。
|
||||
func (l *logNotifier) Send(content AlarmContent, toAddr string) error {
|
||||
l.logger.Infow("告警已记录到日志",
|
||||
"notifierType", NotifierTypeLog,
|
||||
"title", content.Title,
|
||||
"message", content.Message,
|
||||
"level", content.Level.String(),
|
||||
"timestamp", content.Timestamp.Format(DefaultTimeFormat),
|
||||
"toAddr", toAddr,
|
||||
)
|
||||
return nil // 记录日志操作本身不应失败
|
||||
}
|
||||
|
||||
// Type 返回通知器的类型。
|
||||
func (l *logNotifier) Type() NotifierType {
|
||||
return NotifierTypeLog
|
||||
}
|
||||
44
internal/infra/notify/notify.go
Normal file
44
internal/infra/notify/notify.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package notify
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
// DefaultTimeFormat 定义了所有通知中统一使用的时间格式。
|
||||
const DefaultTimeFormat = "2006-01-02 15:04:05"
|
||||
|
||||
// NotifierType 定义了通知器的类型。
|
||||
type NotifierType string
|
||||
|
||||
const (
|
||||
// NotifierTypeSMTP 表示 SMTP 邮件通知器。
|
||||
NotifierTypeSMTP NotifierType = "邮件"
|
||||
// NotifierTypeWeChat 表示企业微信通知器。
|
||||
NotifierTypeWeChat NotifierType = "企业微信"
|
||||
// NotifierTypeLark 表示飞书通知器。
|
||||
NotifierTypeLark NotifierType = "飞书"
|
||||
// NotifierTypeLog 表示日志通知器,作为最终的告警记录渠道。
|
||||
NotifierTypeLog NotifierType = "日志"
|
||||
)
|
||||
|
||||
// AlarmContent 定义了通知的内容
|
||||
type AlarmContent struct {
|
||||
// 通知标题
|
||||
Title string
|
||||
// 通知信息
|
||||
Message string
|
||||
// 通知级别
|
||||
Level zapcore.Level
|
||||
// 通知时间
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
// Notifier 定义了通知发送器的接口
|
||||
type Notifier interface {
|
||||
// Send 发送通知
|
||||
Send(content AlarmContent, toAddr string) error
|
||||
// Type 返回通知器的类型
|
||||
Type() NotifierType
|
||||
}
|
||||
73
internal/infra/notify/smtp.go
Normal file
73
internal/infra/notify/smtp.go
Normal file
@@ -0,0 +1,73 @@
|
||||
package notify
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/smtp"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// smtpNotifier 实现了 Notifier 接口,用于通过 SMTP 发送邮件通知。
|
||||
type smtpNotifier struct {
|
||||
host string // SMTP 服务器地址
|
||||
port int // SMTP 服务器端口
|
||||
username string // 发件人邮箱地址
|
||||
password string // 发件人邮箱授权码
|
||||
sender string // 发件人名称或地址,显示在邮件的 \"From\" 字段
|
||||
}
|
||||
|
||||
// NewSMTPNotifier 创建一个新的 smtpNotifier 实例。
|
||||
// 调用者需要注入 SMTP 相关的配置。
|
||||
func NewSMTPNotifier(host string, port int, username, password, sender string) Notifier {
|
||||
return &smtpNotifier{
|
||||
host: host,
|
||||
port: port,
|
||||
username: username,
|
||||
password: password,
|
||||
sender: sender,
|
||||
}
|
||||
}
|
||||
|
||||
// Send 使用 net/smtp 包发送一封邮件。
|
||||
func (s *smtpNotifier) Send(content AlarmContent, toAddr string) error {
|
||||
// 1. 设置认证信息
|
||||
auth := smtp.PlainAuth("", s.username, s.password, s.host)
|
||||
|
||||
// 2. 构建邮件内容
|
||||
// 邮件头
|
||||
subject := content.Title
|
||||
contentType := "Content-Type: text/plain; charset=UTF-8"
|
||||
fromHeader := fmt.Sprintf("From: %s", s.sender)
|
||||
toHeader := fmt.Sprintf("To: %s", toAddr)
|
||||
subjectHeader := fmt.Sprintf("Subject: %s", subject)
|
||||
|
||||
// 邮件正文
|
||||
body := fmt.Sprintf("级别: %s\n时间: %s\n\n%s",
|
||||
content.Level.String(),
|
||||
content.Timestamp.Format(DefaultTimeFormat),
|
||||
content.Message,
|
||||
)
|
||||
|
||||
// 拼接完整的邮件报文
|
||||
msg := strings.Join([]string{
|
||||
fromHeader,
|
||||
toHeader,
|
||||
subjectHeader,
|
||||
contentType,
|
||||
"", // 邮件头和正文之间的空行
|
||||
body,
|
||||
}, "\r\n")
|
||||
|
||||
// 3. 发送邮件
|
||||
addr := fmt.Sprintf("%s:%d", s.host, s.port)
|
||||
err := smtp.SendMail(addr, auth, s.username, []string{toAddr}, []byte(msg))
|
||||
if err != nil {
|
||||
return fmt.Errorf("发送邮件失败: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Type 返回通知器的类型
|
||||
func (s *smtpNotifier) Type() NotifierType {
|
||||
return NotifierTypeSMTP
|
||||
}
|
||||
169
internal/infra/notify/wechat.go
Normal file
169
internal/infra/notify/wechat.go
Normal file
@@ -0,0 +1,169 @@
|
||||
package notify
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// 获取 access_token 的 API 地址
|
||||
getTokenURL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
|
||||
// 发送应用消息的 API 地址
|
||||
sendMessageURL = "https://qyapi.weixin.qq.com/cgi-bin/message/send"
|
||||
)
|
||||
|
||||
// wechatNotifier 实现了 Notifier 接口,用于通过企业微信自建应用发送私聊消息。
|
||||
type wechatNotifier struct {
|
||||
corpID string // 企业ID (CorpID)
|
||||
agentID string // 应用ID (AgentID)
|
||||
secret string // 应用密钥 (Secret)
|
||||
|
||||
// 用于线程安全地管理 access_token
|
||||
mu sync.Mutex
|
||||
accessToken string
|
||||
tokenExpiresAt time.Time
|
||||
}
|
||||
|
||||
// NewWechatNotifier 创建一个新的 wechatNotifier 实例。
|
||||
// 调用者需要注入企业微信应用的 CorpID, AgentID 和 Secret。
|
||||
func NewWechatNotifier(corpID, agentID, secret string) Notifier {
|
||||
return &wechatNotifier{
|
||||
corpID: corpID,
|
||||
agentID: agentID,
|
||||
secret: secret,
|
||||
}
|
||||
}
|
||||
|
||||
// Send 向指定用户发送一条 markdown 格式的私聊消息。
|
||||
// toAddr 参数是接收者的 UserID 列表,用逗号或竖线分隔。
|
||||
func (w *wechatNotifier) Send(content AlarmContent, toAddr string) error {
|
||||
// 1. 获取有效的 access_token
|
||||
token, err := w.getAccessToken()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 2. 构建 markdown 内容
|
||||
markdownContent := fmt.Sprintf("## %s\n> 级别: <font color=\"warning\">%s</font>\n> 时间: %s\n\n%s",
|
||||
content.Title,
|
||||
content.Level.String(),
|
||||
content.Timestamp.Format(DefaultTimeFormat),
|
||||
content.Message,
|
||||
)
|
||||
|
||||
// 3. 构建请求的 JSON Body
|
||||
// 将逗号分隔的 toAddr 转换为竖线分隔,以符合 API 要求
|
||||
userList := strings.ReplaceAll(toAddr, ",", "|")
|
||||
payload := wechatMessagePayload{
|
||||
ToUser: userList,
|
||||
MsgType: "markdown",
|
||||
AgentID: w.agentID,
|
||||
Markdown: struct {
|
||||
Content string `json:"content"`
|
||||
}{
|
||||
Content: markdownContent,
|
||||
},
|
||||
}
|
||||
|
||||
jsonBytes, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("序列化企业微信消息失败: %w", err)
|
||||
}
|
||||
|
||||
// 4. 发送 HTTP POST 请求
|
||||
url := fmt.Sprintf("%s?access_token=%s", sendMessageURL, token)
|
||||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes))
|
||||
if err != nil {
|
||||
return fmt.Errorf("创建企业微信请求失败: %w", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
client := &http.Client{}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("发送企业微信通知失败: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// 5. 检查响应
|
||||
var response wechatResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
|
||||
return fmt.Errorf("解析企业微信响应失败: %w", err)
|
||||
}
|
||||
|
||||
if response.ErrCode != 0 {
|
||||
return fmt.Errorf("企业微信API返回错误: code=%d, msg=%s", response.ErrCode, response.ErrMsg)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getAccessToken 获取并缓存 access_token,处理了线程安全和自动刷新。
|
||||
func (w *wechatNotifier) getAccessToken() (string, error) {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
// 如果 token 存在且有效期还有5分钟以上,则直接返回缓存的 token
|
||||
if w.accessToken != "" && time.Now().Before(w.tokenExpiresAt.Add(-5*time.Minute)) {
|
||||
return w.accessToken, nil
|
||||
}
|
||||
|
||||
// 否则,重新获取 token
|
||||
url := fmt.Sprintf("%s?corpid=%s&corpsecret=%s", getTokenURL, w.corpID, w.secret)
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("获取 access_token 请求失败: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var tokenResp tokenResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
|
||||
return "", fmt.Errorf("解析 access_token 响应失败: %w", err)
|
||||
}
|
||||
|
||||
if tokenResp.ErrCode != 0 {
|
||||
return "", fmt.Errorf("获取 access_token API 返回错误: code=%d, msg=%s", tokenResp.ErrCode, tokenResp.ErrMsg)
|
||||
}
|
||||
|
||||
// 缓存新的 token 和过期时间
|
||||
w.accessToken = tokenResp.AccessToken
|
||||
w.tokenExpiresAt = time.Now().Add(time.Duration(tokenResp.ExpiresIn) * time.Second)
|
||||
|
||||
return w.accessToken, nil
|
||||
}
|
||||
|
||||
// Type 返回通知器的类型
|
||||
func (w *wechatNotifier) Type() NotifierType {
|
||||
return NotifierTypeWeChat
|
||||
}
|
||||
|
||||
// --- API 数据结构 ---
|
||||
|
||||
// tokenResponse 是获取 access_token API 的响应结构体
|
||||
type tokenResponse struct {
|
||||
ErrCode int `json:"errcode"`
|
||||
ErrMsg string `json:"errmsg"`
|
||||
AccessToken string `json:"access_token"`
|
||||
ExpiresIn int `json:"expires_in"`
|
||||
}
|
||||
|
||||
// wechatMessagePayload 是发送应用消息 API 的请求体结构
|
||||
type wechatMessagePayload struct {
|
||||
ToUser string `json:"touser"`
|
||||
MsgType string `json:"msgtype"`
|
||||
AgentID string `json:"agentid"`
|
||||
Markdown struct {
|
||||
Content string `json:"content"`
|
||||
} `json:"markdown"`
|
||||
}
|
||||
|
||||
// wechatResponse 是企业微信 API 的通用响应结构体
|
||||
type wechatResponse struct {
|
||||
ErrCode int `json:"errcode"`
|
||||
ErrMsg string `json:"errmsg"`
|
||||
}
|
||||
@@ -13,6 +13,7 @@ type UserRepository interface {
|
||||
FindByUsername(username string) (*models.User, error)
|
||||
FindByID(id uint) (*models.User, error)
|
||||
FindUserForLogin(identifier string) (*models.User, error)
|
||||
FindAll() ([]*models.User, error)
|
||||
}
|
||||
|
||||
// gormUserRepository 是 UserRepository 的 GORM 实现
|
||||
@@ -64,3 +65,12 @@ func (r *gormUserRepository) FindByID(id uint) (*models.User, error) {
|
||||
}
|
||||
return &user, nil
|
||||
}
|
||||
|
||||
// FindAll 返回数据库中的所有用户
|
||||
func (r *gormUserRepository) FindAll() ([]*models.User, error) {
|
||||
var users []*models.User
|
||||
if err := r.db.Where("1 = 1").Find(&users).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return users, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user