Compare commits

...

11 Commits

Author SHA1 Message Date
5c83c19bce Merge pull request 'issue_22' (#41) from issue_22 into main
Reviewed-on: #41
2025-10-25 15:42:19 +08:00
86c9073da8 修bug 2025-10-25 15:41:49 +08:00
43c1839345 实现controller 2025-10-25 15:04:47 +08:00
f62cc1c4a9 实现service层 2025-10-25 14:36:24 +08:00
f6d2069e1a 发送通知时写入数据库 2025-10-25 14:17:17 +08:00
f33e14f60f 发送通知时写入数据库 2025-10-25 14:15:17 +08:00
d6f275b2d1 定义仓库方法 2025-10-25 13:35:43 +08:00
d8de5a68eb 定义通知model 2025-10-25 13:28:19 +08:00
bd8729d473 中文枚举 2025-10-24 21:38:52 +08:00
3fd97aa43f 日志发送逻辑及测试消息发送接口 2025-10-24 21:24:48 +08:00
9d6876684b 实现飞书/微信/邮件发送通知 2025-10-24 20:33:15 +08:00
26 changed files with 2360 additions and 9 deletions

93
config.example.yml Normal file
View File

@@ -0,0 +1,93 @@
# 应用基础配置
app:
name: "PigFarmController" # 应用名称
version: "1.0.0" # 应用版本
jwt_secret: "your_jwt_secret_key_here" # JWT 签名密钥,请务必修改为强密码
# 服务器配置
server:
port: 8080 # 服务器监听端口
mode: "debug" # 运行模式: debug, release, test
# 日志配置
log:
level: "info" # 日志级别: debug, info, warn, error, dpanic, panic, fatal
format: "console" # 日志输出格式: console, json
enable_file: true # 是否同时输出到文件
file_path: "app_logs/pig_farm_controller.log" # 日志文件路径
max_size: 100 # 单个日志文件最大大小 (MB)
max_backups: 7 # 最多保留的旧日志文件数量
max_age: 7 # 最多保留的旧日志文件天数
compress: true # 是否压缩旧日志文件
# 数据库配置
database:
host: "localhost" # 数据库主机地址
port: 5432 # 数据库端口
username: "postgres" # 数据库用户名
password: "your_db_password" # 数据库密码
dbname: "pig_farm_controller_db" # 数据库名称
sslmode: "disable" # SSL模式: disable, require, verify-ca, verify-full
is_timescaledb: false # 是否为 TimescaleDB
max_open_conns: 100 # 最大开放连接数
max_idle_conns: 10 # 最大空闲连接数
conn_max_lifetime: 300 # 连接最大生命周期 (秒)
# WebSocket配置
websocket:
timeout: 60 # WebSocket请求超时时间 (秒)
heartbeat_interval: 30 # 心跳检测间隔 (秒)
# 心跳配置
heartbeat:
interval: 10 # 心跳间隔 (秒)
concurrency: 5 # 请求并发数
# ChirpStack API 配置
chirp_stack:
api_host: "http://localhost:8080" # ChirpStack API 主机地址
api_token: "your_chirpstack_api_token" # ChirpStack API Token
fport: 10 # ChirpStack FPort
api_timeout: 5 # API 请求超时时间 (秒)
collection_request_timeout: 10 # 采集请求超时时间 (秒)
# 任务调度配置
task:
interval: 5 # 任务调度间隔 (秒)
num_workers: 5 # 任务执行器并发工作数量
# Lora 配置
lora:
mode: "lora_mesh" # Lora 运行模式: lora_wan, lora_mesh
# Lora Mesh 配置
lora_mesh:
uart_port: "/dev/ttyUSB0" # UART 串口路径
baud_rate: 115200 # 波特率
timeout: 5 # 超时时间 (秒)
lora_mesh_mode: "transparent" # Lora Mesh 模式: transparent, command
max_chunk_size: 200 # 最大数据块大小
reassembly_timeout: 10 # 重组超时时间 (秒)
# 通知服务配置
notify:
primary: "日志" # 首选通知渠道: "邮件", "企业微信", "飞书", "日志" (如果其他渠道未启用,"日志" 会自动成为首选)
failureThreshold: 2 # 连续失败多少次后触发广播模式
smtp:
enabled: false # 是否启用 SMTP 邮件通知
host: "smtp.example.com" # SMTP 服务器地址
port: 587 # SMTP 服务器端口
username: "your_email@example.com" # 发件人邮箱地址
password: "your_email_password" # 发件人邮箱授权码或密码
sender: "PigFarm Alarm <no-reply@example.com>" # 发件人名称和地址
wechat:
enabled: false # 是否启用企业微信通知
corpID: "wwxxxxxxxxxxxx" # 企业ID (CorpID)
agentID: "1000001" # 应用ID (AgentID)
secret: "your_wechat_app_secret" # 应用密钥 (Secret)
lark:
enabled: false # 是否启用飞书通知
appID: "cli_xxxxxxxxxx" # 应用 ID
appSecret: "your_lark_app_secret" # 应用密钥

View File

@@ -975,6 +975,149 @@ const docTemplate = `{
} }
} }
}, },
"/api/v1/monitor/notifications": {
"get": {
"security": [
{
"BearerAuth": []
}
],
"description": "根据提供的过滤条件,分页获取通知列表",
"produces": [
"application/json"
],
"tags": [
"数据监控"
],
"summary": "批量查询通知",
"parameters": [
{
"type": "string",
"name": "end_time",
"in": "query"
},
{
"enum": [
7,
-1,
0,
1,
2,
3,
4,
5,
-1,
5,
6
],
"type": "integer",
"format": "int32",
"x-enum-varnames": [
"_numLevels",
"DebugLevel",
"InfoLevel",
"WarnLevel",
"ErrorLevel",
"DPanicLevel",
"PanicLevel",
"FatalLevel",
"_minLevel",
"_maxLevel",
"InvalidLevel"
],
"name": "level",
"in": "query"
},
{
"enum": [
"邮件",
"企业微信",
"飞书",
"日志"
],
"type": "string",
"x-enum-varnames": [
"NotifierTypeSMTP",
"NotifierTypeWeChat",
"NotifierTypeLark",
"NotifierTypeLog"
],
"name": "notifier_type",
"in": "query"
},
{
"type": "string",
"name": "order_by",
"in": "query"
},
{
"type": "integer",
"name": "page",
"in": "query"
},
{
"type": "integer",
"name": "pageSize",
"in": "query"
},
{
"type": "string",
"name": "start_time",
"in": "query"
},
{
"enum": [
"发送成功",
"发送失败",
"已跳过"
],
"type": "string",
"x-enum-comments": {
"NotificationStatusFailed": "通知发送失败",
"NotificationStatusSkipped": "通知因某些原因被跳过(例如:用户未配置联系方式)",
"NotificationStatusSuccess": "通知已成功发送"
},
"x-enum-descriptions": [
"通知已成功发送",
"通知发送失败",
"通知因某些原因被跳过(例如:用户未配置联系方式)"
],
"x-enum-varnames": [
"NotificationStatusSuccess",
"NotificationStatusFailed",
"NotificationStatusSkipped"
],
"name": "status",
"in": "query"
},
{
"type": "integer",
"name": "user_id",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"allOf": [
{
"$ref": "#/definitions/controller.Response"
},
{
"type": "object",
"properties": {
"data": {
"$ref": "#/definitions/dto.ListNotificationResponse"
}
}
}
]
}
}
}
}
},
"/api/v1/monitor/pending-collections": { "/api/v1/monitor/pending-collections": {
"get": { "get": {
"security": [ "security": [
@@ -3923,6 +4066,64 @@ const docTemplate = `{
} }
} }
} }
},
"/api/v1/users/{id}/notifications/test": {
"post": {
"security": [
{
"BearerAuth": []
}
],
"description": "为指定用户发送一条特定渠道的测试消息,以验证其配置是否正确。",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"用户管理"
],
"summary": "发送测试通知",
"parameters": [
{
"type": "integer",
"description": "用户ID",
"name": "id",
"in": "path",
"required": true
},
{
"description": "请求体",
"name": "body",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/dto.SendTestNotificationRequest"
}
}
],
"responses": {
"200": {
"description": "成功响应",
"schema": {
"allOf": [
{
"$ref": "#/definitions/controller.Response"
},
{
"type": "object",
"properties": {
"data": {
"type": "string"
}
}
}
]
}
}
}
}
} }
}, },
"definitions": { "definitions": {
@@ -4431,6 +4632,20 @@ const docTemplate = `{
} }
} }
}, },
"dto.ListNotificationResponse": {
"type": "object",
"properties": {
"list": {
"type": "array",
"items": {
"$ref": "#/definitions/dto.NotificationDTO"
}
},
"pagination": {
"$ref": "#/definitions/dto.PaginationDTO"
}
}
},
"dto.ListPendingCollectionResponse": { "dto.ListPendingCollectionResponse": {
"type": "object", "type": "object",
"properties": { "properties": {
@@ -4739,6 +4954,47 @@ const docTemplate = `{
} }
} }
}, },
"dto.NotificationDTO": {
"type": "object",
"properties": {
"alarm_timestamp": {
"type": "string"
},
"created_at": {
"type": "string"
},
"error_message": {
"type": "string"
},
"id": {
"type": "integer"
},
"level": {
"$ref": "#/definitions/zapcore.Level"
},
"message": {
"type": "string"
},
"notifier_type": {
"$ref": "#/definitions/notify.NotifierType"
},
"status": {
"$ref": "#/definitions/models.NotificationStatus"
},
"title": {
"type": "string"
},
"to_address": {
"type": "string"
},
"updated_at": {
"type": "string"
},
"user_id": {
"type": "integer"
}
}
},
"dto.PaginationDTO": { "dto.PaginationDTO": {
"type": "object", "type": "object",
"properties": { "properties": {
@@ -5591,6 +5847,22 @@ const docTemplate = `{
} }
} }
}, },
"dto.SendTestNotificationRequest": {
"type": "object",
"required": [
"type"
],
"properties": {
"type": {
"description": "Type 指定要测试的通知渠道",
"allOf": [
{
"$ref": "#/definitions/notify.NotifierType"
}
]
}
}
},
"dto.SensorDataDTO": { "dto.SensorDataDTO": {
"type": "object", "type": "object",
"properties": { "properties": {
@@ -6201,6 +6473,29 @@ const docTemplate = `{
"ReasonTypeHealthCare" "ReasonTypeHealthCare"
] ]
}, },
"models.NotificationStatus": {
"type": "string",
"enum": [
"发送成功",
"发送失败",
"已跳过"
],
"x-enum-comments": {
"NotificationStatusFailed": "通知发送失败",
"NotificationStatusSkipped": "通知因某些原因被跳过(例如:用户未配置联系方式)",
"NotificationStatusSuccess": "通知已成功发送"
},
"x-enum-descriptions": [
"通知已成功发送",
"通知发送失败",
"通知因某些原因被跳过(例如:用户未配置联系方式)"
],
"x-enum-varnames": [
"NotificationStatusSuccess",
"NotificationStatusFailed",
"NotificationStatusSkipped"
]
},
"models.PenStatus": { "models.PenStatus": {
"type": "string", "type": "string",
"enum": [ "enum": [
@@ -6530,6 +6825,51 @@ const docTemplate = `{
"$ref": "#/definitions/models.SensorType" "$ref": "#/definitions/models.SensorType"
} }
} }
},
"notify.NotifierType": {
"type": "string",
"enum": [
"邮件",
"企业微信",
"飞书",
"日志"
],
"x-enum-varnames": [
"NotifierTypeSMTP",
"NotifierTypeWeChat",
"NotifierTypeLark",
"NotifierTypeLog"
]
},
"zapcore.Level": {
"type": "integer",
"format": "int32",
"enum": [
7,
-1,
0,
1,
2,
3,
4,
5,
-1,
5,
6
],
"x-enum-varnames": [
"_numLevels",
"DebugLevel",
"InfoLevel",
"WarnLevel",
"ErrorLevel",
"DPanicLevel",
"PanicLevel",
"FatalLevel",
"_minLevel",
"_maxLevel",
"InvalidLevel"
]
} }
}, },
"securityDefinitions": { "securityDefinitions": {

View File

@@ -967,6 +967,149 @@
} }
} }
}, },
"/api/v1/monitor/notifications": {
"get": {
"security": [
{
"BearerAuth": []
}
],
"description": "根据提供的过滤条件,分页获取通知列表",
"produces": [
"application/json"
],
"tags": [
"数据监控"
],
"summary": "批量查询通知",
"parameters": [
{
"type": "string",
"name": "end_time",
"in": "query"
},
{
"enum": [
7,
-1,
0,
1,
2,
3,
4,
5,
-1,
5,
6
],
"type": "integer",
"format": "int32",
"x-enum-varnames": [
"_numLevels",
"DebugLevel",
"InfoLevel",
"WarnLevel",
"ErrorLevel",
"DPanicLevel",
"PanicLevel",
"FatalLevel",
"_minLevel",
"_maxLevel",
"InvalidLevel"
],
"name": "level",
"in": "query"
},
{
"enum": [
"邮件",
"企业微信",
"飞书",
"日志"
],
"type": "string",
"x-enum-varnames": [
"NotifierTypeSMTP",
"NotifierTypeWeChat",
"NotifierTypeLark",
"NotifierTypeLog"
],
"name": "notifier_type",
"in": "query"
},
{
"type": "string",
"name": "order_by",
"in": "query"
},
{
"type": "integer",
"name": "page",
"in": "query"
},
{
"type": "integer",
"name": "pageSize",
"in": "query"
},
{
"type": "string",
"name": "start_time",
"in": "query"
},
{
"enum": [
"发送成功",
"发送失败",
"已跳过"
],
"type": "string",
"x-enum-comments": {
"NotificationStatusFailed": "通知发送失败",
"NotificationStatusSkipped": "通知因某些原因被跳过(例如:用户未配置联系方式)",
"NotificationStatusSuccess": "通知已成功发送"
},
"x-enum-descriptions": [
"通知已成功发送",
"通知发送失败",
"通知因某些原因被跳过(例如:用户未配置联系方式)"
],
"x-enum-varnames": [
"NotificationStatusSuccess",
"NotificationStatusFailed",
"NotificationStatusSkipped"
],
"name": "status",
"in": "query"
},
{
"type": "integer",
"name": "user_id",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"allOf": [
{
"$ref": "#/definitions/controller.Response"
},
{
"type": "object",
"properties": {
"data": {
"$ref": "#/definitions/dto.ListNotificationResponse"
}
}
}
]
}
}
}
}
},
"/api/v1/monitor/pending-collections": { "/api/v1/monitor/pending-collections": {
"get": { "get": {
"security": [ "security": [
@@ -3915,6 +4058,64 @@
} }
} }
} }
},
"/api/v1/users/{id}/notifications/test": {
"post": {
"security": [
{
"BearerAuth": []
}
],
"description": "为指定用户发送一条特定渠道的测试消息,以验证其配置是否正确。",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"用户管理"
],
"summary": "发送测试通知",
"parameters": [
{
"type": "integer",
"description": "用户ID",
"name": "id",
"in": "path",
"required": true
},
{
"description": "请求体",
"name": "body",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/dto.SendTestNotificationRequest"
}
}
],
"responses": {
"200": {
"description": "成功响应",
"schema": {
"allOf": [
{
"$ref": "#/definitions/controller.Response"
},
{
"type": "object",
"properties": {
"data": {
"type": "string"
}
}
}
]
}
}
}
}
} }
}, },
"definitions": { "definitions": {
@@ -4423,6 +4624,20 @@
} }
} }
}, },
"dto.ListNotificationResponse": {
"type": "object",
"properties": {
"list": {
"type": "array",
"items": {
"$ref": "#/definitions/dto.NotificationDTO"
}
},
"pagination": {
"$ref": "#/definitions/dto.PaginationDTO"
}
}
},
"dto.ListPendingCollectionResponse": { "dto.ListPendingCollectionResponse": {
"type": "object", "type": "object",
"properties": { "properties": {
@@ -4731,6 +4946,47 @@
} }
} }
}, },
"dto.NotificationDTO": {
"type": "object",
"properties": {
"alarm_timestamp": {
"type": "string"
},
"created_at": {
"type": "string"
},
"error_message": {
"type": "string"
},
"id": {
"type": "integer"
},
"level": {
"$ref": "#/definitions/zapcore.Level"
},
"message": {
"type": "string"
},
"notifier_type": {
"$ref": "#/definitions/notify.NotifierType"
},
"status": {
"$ref": "#/definitions/models.NotificationStatus"
},
"title": {
"type": "string"
},
"to_address": {
"type": "string"
},
"updated_at": {
"type": "string"
},
"user_id": {
"type": "integer"
}
}
},
"dto.PaginationDTO": { "dto.PaginationDTO": {
"type": "object", "type": "object",
"properties": { "properties": {
@@ -5583,6 +5839,22 @@
} }
} }
}, },
"dto.SendTestNotificationRequest": {
"type": "object",
"required": [
"type"
],
"properties": {
"type": {
"description": "Type 指定要测试的通知渠道",
"allOf": [
{
"$ref": "#/definitions/notify.NotifierType"
}
]
}
}
},
"dto.SensorDataDTO": { "dto.SensorDataDTO": {
"type": "object", "type": "object",
"properties": { "properties": {
@@ -6193,6 +6465,29 @@
"ReasonTypeHealthCare" "ReasonTypeHealthCare"
] ]
}, },
"models.NotificationStatus": {
"type": "string",
"enum": [
"发送成功",
"发送失败",
"已跳过"
],
"x-enum-comments": {
"NotificationStatusFailed": "通知发送失败",
"NotificationStatusSkipped": "通知因某些原因被跳过(例如:用户未配置联系方式)",
"NotificationStatusSuccess": "通知已成功发送"
},
"x-enum-descriptions": [
"通知已成功发送",
"通知发送失败",
"通知因某些原因被跳过(例如:用户未配置联系方式)"
],
"x-enum-varnames": [
"NotificationStatusSuccess",
"NotificationStatusFailed",
"NotificationStatusSkipped"
]
},
"models.PenStatus": { "models.PenStatus": {
"type": "string", "type": "string",
"enum": [ "enum": [
@@ -6522,6 +6817,51 @@
"$ref": "#/definitions/models.SensorType" "$ref": "#/definitions/models.SensorType"
} }
} }
},
"notify.NotifierType": {
"type": "string",
"enum": [
"邮件",
"企业微信",
"飞书",
"日志"
],
"x-enum-varnames": [
"NotifierTypeSMTP",
"NotifierTypeWeChat",
"NotifierTypeLark",
"NotifierTypeLog"
]
},
"zapcore.Level": {
"type": "integer",
"format": "int32",
"enum": [
7,
-1,
0,
1,
2,
3,
4,
5,
-1,
5,
6
],
"x-enum-varnames": [
"_numLevels",
"DebugLevel",
"InfoLevel",
"WarnLevel",
"ErrorLevel",
"DPanicLevel",
"PanicLevel",
"FatalLevel",
"_minLevel",
"_maxLevel",
"InvalidLevel"
]
} }
}, },
"securityDefinitions": { "securityDefinitions": {

View File

@@ -349,6 +349,15 @@ definitions:
pagination: pagination:
$ref: '#/definitions/dto.PaginationDTO' $ref: '#/definitions/dto.PaginationDTO'
type: object type: object
dto.ListNotificationResponse:
properties:
list:
items:
$ref: '#/definitions/dto.NotificationDTO'
type: array
pagination:
$ref: '#/definitions/dto.PaginationDTO'
type: object
dto.ListPendingCollectionResponse: dto.ListPendingCollectionResponse:
properties: properties:
list: list:
@@ -552,6 +561,33 @@ definitions:
- quantity - quantity
- toPenID - toPenID
type: object type: object
dto.NotificationDTO:
properties:
alarm_timestamp:
type: string
created_at:
type: string
error_message:
type: string
id:
type: integer
level:
$ref: '#/definitions/zapcore.Level'
message:
type: string
notifier_type:
$ref: '#/definitions/notify.NotifierType'
status:
$ref: '#/definitions/models.NotificationStatus'
title:
type: string
to_address:
type: string
updated_at:
type: string
user_id:
type: integer
type: object
dto.PaginationDTO: dto.PaginationDTO:
properties: properties:
page: page:
@@ -1125,6 +1161,15 @@ definitions:
- traderName - traderName
- unitPrice - unitPrice
type: object type: object
dto.SendTestNotificationRequest:
properties:
type:
allOf:
- $ref: '#/definitions/notify.NotifierType'
description: Type 指定要测试的通知渠道
required:
- type
type: object
dto.SensorDataDTO: dto.SensorDataDTO:
properties: properties:
data: data:
@@ -1548,6 +1593,24 @@ definitions:
- ReasonTypePreventive - ReasonTypePreventive
- ReasonTypeTreatment - ReasonTypeTreatment
- ReasonTypeHealthCare - ReasonTypeHealthCare
models.NotificationStatus:
enum:
- 发送成功
- 发送失败
- 已跳过
type: string
x-enum-comments:
NotificationStatusFailed: 通知发送失败
NotificationStatusSkipped: 通知因某些原因被跳过(例如:用户未配置联系方式)
NotificationStatusSuccess: 通知已成功发送
x-enum-descriptions:
- 通知已成功发送
- 通知发送失败
- 通知因某些原因被跳过(例如:用户未配置联系方式)
x-enum-varnames:
- NotificationStatusSuccess
- NotificationStatusFailed
- NotificationStatusSkipped
models.PenStatus: models.PenStatus:
enum: enum:
- 空闲 - 空闲
@@ -1816,6 +1879,45 @@ definitions:
type: type:
$ref: '#/definitions/models.SensorType' $ref: '#/definitions/models.SensorType'
type: object type: object
notify.NotifierType:
enum:
- 邮件
- 企业微信
- 飞书
- 日志
type: string
x-enum-varnames:
- NotifierTypeSMTP
- NotifierTypeWeChat
- NotifierTypeLark
- NotifierTypeLog
zapcore.Level:
enum:
- 7
- -1
- 0
- 1
- 2
- 3
- 4
- 5
- -1
- 5
- 6
format: int32
type: integer
x-enum-varnames:
- _numLevels
- DebugLevel
- InfoLevel
- WarnLevel
- ErrorLevel
- DPanicLevel
- PanicLevel
- FatalLevel
- _minLevel
- _maxLevel
- InvalidLevel
info: info:
contact: contact:
email: divano@example.com email: divano@example.com
@@ -2379,6 +2481,105 @@ paths:
summary: 获取用药记录列表 summary: 获取用药记录列表
tags: tags:
- 数据监控 - 数据监控
/api/v1/monitor/notifications:
get:
description: 根据提供的过滤条件,分页获取通知列表
parameters:
- in: query
name: end_time
type: string
- enum:
- 7
- -1
- 0
- 1
- 2
- 3
- 4
- 5
- -1
- 5
- 6
format: int32
in: query
name: level
type: integer
x-enum-varnames:
- _numLevels
- DebugLevel
- InfoLevel
- WarnLevel
- ErrorLevel
- DPanicLevel
- PanicLevel
- FatalLevel
- _minLevel
- _maxLevel
- InvalidLevel
- enum:
- 邮件
- 企业微信
- 飞书
- 日志
in: query
name: notifier_type
type: string
x-enum-varnames:
- NotifierTypeSMTP
- NotifierTypeWeChat
- NotifierTypeLark
- NotifierTypeLog
- in: query
name: order_by
type: string
- in: query
name: page
type: integer
- in: query
name: pageSize
type: integer
- in: query
name: start_time
type: string
- enum:
- 发送成功
- 发送失败
- 已跳过
in: query
name: status
type: string
x-enum-comments:
NotificationStatusFailed: 通知发送失败
NotificationStatusSkipped: 通知因某些原因被跳过(例如:用户未配置联系方式)
NotificationStatusSuccess: 通知已成功发送
x-enum-descriptions:
- 通知已成功发送
- 通知发送失败
- 通知因某些原因被跳过(例如:用户未配置联系方式)
x-enum-varnames:
- NotificationStatusSuccess
- NotificationStatusFailed
- NotificationStatusSkipped
- in: query
name: user_id
type: integer
produces:
- application/json
responses:
"200":
description: OK
schema:
allOf:
- $ref: '#/definitions/controller.Response'
- properties:
data:
$ref: '#/definitions/dto.ListNotificationResponse'
type: object
security:
- BearerAuth: []
summary: 批量查询通知
tags:
- 数据监控
/api/v1/monitor/pending-collections: /api/v1/monitor/pending-collections:
get: get:
description: 根据提供的过滤条件,分页获取待采集请求 description: 根据提供的过滤条件,分页获取待采集请求
@@ -4086,6 +4287,40 @@ paths:
summary: 获取指定用户的操作历史 summary: 获取指定用户的操作历史
tags: tags:
- 用户管理 - 用户管理
/api/v1/users/{id}/notifications/test:
post:
consumes:
- application/json
description: 为指定用户发送一条特定渠道的测试消息,以验证其配置是否正确。
parameters:
- description: 用户ID
in: path
name: id
required: true
type: integer
- description: 请求体
in: body
name: body
required: true
schema:
$ref: '#/definitions/dto.SendTestNotificationRequest'
produces:
- application/json
responses:
"200":
description: 成功响应
schema:
allOf:
- $ref: '#/definitions/controller.Response'
- properties:
data:
type: string
type: object
security:
- BearerAuth: []
summary: 发送测试通知
tags:
- 用户管理
/api/v1/users/login: /api/v1/users/login:
post: post:
consumes: consumes:

View File

@@ -28,6 +28,7 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/app/webhook" "git.huangwc.com/pig/pig-farm-controller/internal/app/webhook"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit" "git.huangwc.com/pig/pig-farm-controller/internal/domain/audit"
domain_device "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" domain_device "git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/task" "git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/token" "git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config" "git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
@@ -68,9 +69,9 @@ func NewAPI(cfg config.ServerConfig,
pigFarmService service.PigFarmService, pigFarmService service.PigFarmService,
pigBatchService service.PigBatchService, pigBatchService service.PigBatchService,
monitorService service.MonitorService, monitorService service.MonitorService,
userActionLogRepository repository.UserActionLogRepository,
tokenService token.TokenService, tokenService token.TokenService,
auditService audit.Service, auditService audit.Service,
notifyService domain_notify.Service,
deviceService domain_device.Service, deviceService domain_device.Service,
listenHandler webhook.ListenHandler, listenHandler webhook.ListenHandler,
analysisTaskManager *task.AnalysisPlanTaskManager) *API { analysisTaskManager *task.AnalysisPlanTaskManager) *API {
@@ -96,7 +97,7 @@ func NewAPI(cfg config.ServerConfig,
config: cfg, config: cfg,
listenHandler: listenHandler, listenHandler: listenHandler,
// 在 NewAPI 中初始化用户控制器,并将其作为 API 结构体的成员 // 在 NewAPI 中初始化用户控制器,并将其作为 API 结构体的成员
userController: user.NewController(userRepo, monitorService, logger, tokenService), userController: user.NewController(userRepo, monitorService, logger, tokenService, notifyService),
// 在 NewAPI 中初始化设备控制器,并将其作为 API 结构体的成员 // 在 NewAPI 中初始化设备控制器,并将其作为 API 结构体的成员
deviceController: device.NewController(deviceRepository, areaControllerRepository, deviceTemplateRepository, deviceService, logger), deviceController: device.NewController(deviceRepository, areaControllerRepository, deviceTemplateRepository, deviceService, logger),
// 在 NewAPI 中初始化计划控制器,并将其作为 API 结构体的成员 // 在 NewAPI 中初始化计划控制器,并将其作为 API 结构体的成员

View File

@@ -57,6 +57,7 @@ func (a *API) setupRoutes() {
userGroup := authGroup.Group("/users") userGroup := authGroup.Group("/users")
{ {
userGroup.GET("/:id/history", a.userController.ListUserHistory) // 获取用户操作历史 userGroup.GET("/:id/history", a.userController.ListUserHistory) // 获取用户操作历史
userGroup.POST("/:id/notifications/test", a.userController.SendTestNotification)
} }
a.logger.Info("用户相关接口注册成功 (需要认证和审计)") a.logger.Info("用户相关接口注册成功 (需要认证和审计)")
@@ -175,6 +176,7 @@ func (a *API) setupRoutes() {
monitorGroup.GET("/pig-sick-logs", a.monitorController.ListPigSickLogs) monitorGroup.GET("/pig-sick-logs", a.monitorController.ListPigSickLogs)
monitorGroup.GET("/pig-purchases", a.monitorController.ListPigPurchases) monitorGroup.GET("/pig-purchases", a.monitorController.ListPigPurchases)
monitorGroup.GET("/pig-sales", a.monitorController.ListPigSales) monitorGroup.GET("/pig-sales", a.monitorController.ListPigSales)
monitorGroup.GET("/notifications", a.monitorController.ListNotifications)
} }
a.logger.Info("数据监控相关接口注册成功 (需要认证和审计)") a.logger.Info("数据监控相关接口注册成功 (需要认证和审计)")
} }

View File

@@ -839,3 +839,50 @@ func (c *Controller) ListPigSales(ctx *gin.Context) {
c.logger.Infof("%s: 成功, 获取到 %d 条记录, 总计 %d 条", actionType, len(data), total) c.logger.Infof("%s: 成功, 获取到 %d 条记录, 总计 %d 条", actionType, len(data), total)
controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "获取猪只售卖记录成功", resp, actionType, "获取猪只售卖记录成功", req) controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "获取猪只售卖记录成功", resp, actionType, "获取猪只售卖记录成功", req)
} }
// ListNotifications godoc
// @Summary 批量查询通知
// @Description 根据提供的过滤条件,分页获取通知列表
// @Tags 数据监控
// @Security BearerAuth
// @Produce json
// @Param query query dto.ListNotificationRequest true "查询参数"
// @Success 200 {object} controller.Response{data=dto.ListNotificationResponse}
// @Router /api/v1/monitor/notifications [get]
func (c *Controller) ListNotifications(ctx *gin.Context) {
const actionType = "批量查询通知"
var req dto.ListNotificationRequest
if err := ctx.ShouldBindQuery(&req); err != nil {
c.logger.Errorf("%s: 参数绑定失败: %v", actionType, err)
controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "无效的查询参数: "+err.Error(), actionType, "参数绑定失败", req)
return
}
opts := repository.NotificationListOptions{
UserID: req.UserID,
NotifierType: req.NotifierType,
Level: req.Level,
StartTime: req.StartTime,
EndTime: req.EndTime,
OrderBy: req.OrderBy,
Status: req.Status,
}
data, total, err := c.monitorService.ListNotifications(opts, req.Page, req.PageSize)
if err != nil {
if errors.Is(err, repository.ErrInvalidPagination) {
c.logger.Warnf("%s: 无效的分页参数: %v", actionType, err)
controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "无效的分页参数: "+err.Error(), actionType, "无效分页参数", req)
return
}
c.logger.Errorf("%s: 服务层查询失败: %v", actionType, err)
controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "批量查询通知失败: "+err.Error(), actionType, "服务层查询失败", req)
return
}
resp := dto.NewListNotificationResponse(data, total, req.Page, req.PageSize)
c.logger.Infof("%s: 成功, 获取到 %d 条记录, 总计 %d 条", actionType, len(data), total)
controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "批量查询通知成功", resp, actionType, "批量查询通知成功", req)
}

View File

@@ -7,6 +7,7 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/app/controller" "git.huangwc.com/pig/pig-farm-controller/internal/app/controller"
"git.huangwc.com/pig/pig-farm-controller/internal/app/dto" "git.huangwc.com/pig/pig-farm-controller/internal/app/dto"
"git.huangwc.com/pig/pig-farm-controller/internal/app/service" "git.huangwc.com/pig/pig-farm-controller/internal/app/service"
domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/token" "git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
@@ -19,16 +20,24 @@ import (
type Controller struct { type Controller struct {
userRepo repository.UserRepository userRepo repository.UserRepository
monitorService service.MonitorService monitorService service.MonitorService
tokenService token.TokenService // 注入 token 服务 tokenService token.TokenService
notifyService domain_notify.Service
logger *logs.Logger logger *logs.Logger
} }
// NewController 创建用户控制器实例 // NewController 创建用户控制器实例
func NewController(userRepo repository.UserRepository, monitorService service.MonitorService, logger *logs.Logger, tokenService token.TokenService) *Controller { func NewController(
userRepo repository.UserRepository,
monitorService service.MonitorService,
logger *logs.Logger,
tokenService token.TokenService,
notifyService domain_notify.Service,
) *Controller {
return &Controller{ return &Controller{
userRepo: userRepo, userRepo: userRepo,
monitorService: monitorService, monitorService: monitorService,
tokenService: tokenService, tokenService: tokenService,
notifyService: notifyService,
logger: logger, logger: logger,
} }
} }
@@ -192,3 +201,46 @@ func (c *Controller) ListUserHistory(ctx *gin.Context) {
c.logger.Infof("%s: 成功获取用户 %d 的操作历史, 数量: %d", actionType, userID, len(data)) c.logger.Infof("%s: 成功获取用户 %d 的操作历史, 数量: %d", actionType, userID, len(data))
controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "获取用户操作历史成功", resp, actionType, "获取用户操作历史成功", opts) controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "获取用户操作历史成功", resp, actionType, "获取用户操作历史成功", opts)
} }
// SendTestNotification godoc
// @Summary 发送测试通知
// @Description 为指定用户发送一条特定渠道的测试消息,以验证其配置是否正确。
// @Tags 用户管理
// @Security BearerAuth
// @Accept json
// @Produce json
// @Param id path int true "用户ID"
// @Param body body dto.SendTestNotificationRequest true "请求体"
// @Success 200 {object} controller.Response{data=string} "成功响应"
// @Router /api/v1/users/{id}/notifications/test [post]
func (c *Controller) SendTestNotification(ctx *gin.Context) {
const actionType = "发送测试通知"
// 1. 从 URL 中获取用户 ID
userID, err := strconv.ParseUint(ctx.Param("id"), 10, 32)
if err != nil {
c.logger.Errorf("%s: 无效的用户ID格式: %v", actionType, err)
controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "无效的用户ID格式", actionType, "无效的用户ID格式", ctx.Param("id"))
return
}
// 2. 从请求体 (JSON Body) 中获取要测试的通知类型
var req dto.SendTestNotificationRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
c.logger.Errorf("%s: 参数绑定失败: %v", actionType, err)
controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "请求体格式错误或缺少 'type' 字段: "+err.Error(), actionType, "请求体绑定失败", req)
return
}
// 3. 调用领域服务
err = c.notifyService.SendTestMessage(uint(userID), req.Type)
if err != nil {
c.logger.Errorf("%s: 服务层调用失败: %v", actionType, err)
controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "发送测试消息失败: "+err.Error(), actionType, "服务层调用失败", gin.H{"userID": userID, "type": req.Type})
return
}
// 4. 返回成功响应
c.logger.Infof("%s: 成功为用户 %d 发送类型为 %s 的测试消息", actionType, userID, req.Type)
controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "测试消息已发送,请检查您的接收端。", nil, actionType, "测试消息发送成功", gin.H{"userID": userID, "type": req.Type})
}

View File

@@ -0,0 +1,36 @@
package dto
import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"go.uber.org/zap/zapcore"
)
// NewListNotificationResponse 从模型数据创建通知列表响应 DTO
func NewListNotificationResponse(data []models.Notification, total int64, page, pageSize int) *ListNotificationResponse {
dtos := make([]NotificationDTO, len(data))
for i, item := range data {
dtos[i] = NotificationDTO{
ID: item.ID,
CreatedAt: item.CreatedAt,
UpdatedAt: item.UpdatedAt,
NotifierType: item.NotifierType,
UserID: item.UserID,
Title: item.Title,
Message: item.Message,
Level: zapcore.Level(item.Level),
AlarmTimestamp: item.AlarmTimestamp,
ToAddress: item.ToAddress,
Status: item.Status,
ErrorMessage: item.ErrorMessage,
}
}
return &ListNotificationResponse{
List: dtos,
Pagination: PaginationDTO{
Total: total,
Page: page,
PageSize: pageSize,
},
}
}

View File

@@ -0,0 +1,50 @@
package dto
import (
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/notify"
"go.uber.org/zap/zapcore"
)
// SendTestNotificationRequest 定义了发送测试通知请求的 JSON 结构
type SendTestNotificationRequest struct {
// Type 指定要测试的通知渠道
Type notify.NotifierType `json:"type" binding:"required"`
}
// ListNotificationRequest 定义了获取通知列表的请求参数
type ListNotificationRequest struct {
Page int `form:"page,default=1"`
PageSize int `form:"pageSize,default=10"`
UserID *uint `form:"user_id"`
NotifierType *notify.NotifierType `form:"notifier_type"`
Status *models.NotificationStatus `form:"status"`
Level *zapcore.Level `form:"level"`
StartTime *time.Time `form:"start_time"`
EndTime *time.Time `form:"end_time"`
OrderBy string `form:"order_by"`
}
// NotificationDTO 是用于API响应的通知结构
type NotificationDTO struct {
ID uint `json:"id"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
NotifierType notify.NotifierType `json:"notifier_type"`
UserID uint `json:"user_id"`
Title string `json:"title"`
Message string `json:"message"`
Level zapcore.Level `json:"level"`
AlarmTimestamp time.Time `json:"alarm_timestamp"`
ToAddress string `json:"to_address"`
Status models.NotificationStatus `json:"status"`
ErrorMessage string `json:"error_message"`
}
// ListNotificationResponse 是获取通知列表的响应结构
type ListNotificationResponse struct {
List []NotificationDTO `json:"list"`
Pagination PaginationDTO `json:"pagination"`
}

View File

@@ -24,6 +24,7 @@ type MonitorService interface {
ListPigSickLogs(opts repository.PigSickLogListOptions, page, pageSize int) ([]models.PigSickLog, int64, error) ListPigSickLogs(opts repository.PigSickLogListOptions, page, pageSize int) ([]models.PigSickLog, int64, error)
ListPigPurchases(opts repository.PigPurchaseListOptions, page, pageSize int) ([]models.PigPurchase, int64, error) ListPigPurchases(opts repository.PigPurchaseListOptions, page, pageSize int) ([]models.PigPurchase, int64, error)
ListPigSales(opts repository.PigSaleListOptions, page, pageSize int) ([]models.PigSale, int64, error) ListPigSales(opts repository.PigSaleListOptions, page, pageSize int) ([]models.PigSale, int64, error)
ListNotifications(opts repository.NotificationListOptions, page, pageSize int) ([]models.Notification, int64, error)
} }
// monitorService 是 MonitorService 接口的具体实现 // monitorService 是 MonitorService 接口的具体实现
@@ -40,6 +41,7 @@ type monitorService struct {
pigTransferLogRepo repository.PigTransferLogRepository pigTransferLogRepo repository.PigTransferLogRepository
pigSickLogRepo repository.PigSickLogRepository pigSickLogRepo repository.PigSickLogRepository
pigTradeRepo repository.PigTradeRepository pigTradeRepo repository.PigTradeRepository
notificationRepo repository.NotificationRepository
} }
// NewMonitorService 创建一个新的 MonitorService 实例 // NewMonitorService 创建一个新的 MonitorService 实例
@@ -56,6 +58,7 @@ func NewMonitorService(
pigTransferLogRepo repository.PigTransferLogRepository, pigTransferLogRepo repository.PigTransferLogRepository,
pigSickLogRepo repository.PigSickLogRepository, pigSickLogRepo repository.PigSickLogRepository,
pigTradeRepo repository.PigTradeRepository, pigTradeRepo repository.PigTradeRepository,
notificationRepo repository.NotificationRepository,
) MonitorService { ) MonitorService {
return &monitorService{ return &monitorService{
sensorDataRepo: sensorDataRepo, sensorDataRepo: sensorDataRepo,
@@ -70,6 +73,7 @@ func NewMonitorService(
pigTransferLogRepo: pigTransferLogRepo, pigTransferLogRepo: pigTransferLogRepo,
pigSickLogRepo: pigSickLogRepo, pigSickLogRepo: pigSickLogRepo,
pigTradeRepo: pigTradeRepo, pigTradeRepo: pigTradeRepo,
notificationRepo: notificationRepo,
} }
} }
@@ -157,3 +161,8 @@ func (s *monitorService) ListPigPurchases(opts repository.PigPurchaseListOptions
func (s *monitorService) ListPigSales(opts repository.PigSaleListOptions, page, pageSize int) ([]models.PigSale, int64, error) { func (s *monitorService) ListPigSales(opts repository.PigSaleListOptions, page, pageSize int) ([]models.PigSale, int64, error) {
return s.pigTradeRepo.ListPigSales(opts, page, pageSize) return s.pigTradeRepo.ListPigSales(opts, page, pageSize)
} }
// ListNotifications 负责处理查询通知列表的业务逻辑
func (s *monitorService) ListNotifications(opts repository.NotificationListOptions, page, pageSize int) ([]models.Notification, int64, error) {
return s.notificationRepo.List(opts, page, pageSize)
}

View File

@@ -12,6 +12,7 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/app/webhook" "git.huangwc.com/pig/pig-farm-controller/internal/app/webhook"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit" "git.huangwc.com/pig/pig-farm-controller/internal/domain/audit"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device" "git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/pig" "git.huangwc.com/pig/pig-farm-controller/internal/domain/pig"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/task" "git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/token" "git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
@@ -19,6 +20,7 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/database" "git.huangwc.com/pig/pig-farm-controller/internal/infra/database"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/notify"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora"
@@ -41,6 +43,9 @@ type Application struct {
// Lora Mesh 监听器 // Lora Mesh 监听器
loraMeshCommunicator transport.Listener loraMeshCommunicator transport.Listener
// 通知服务
NotifyService domain_notify.Service
} }
// NewApplication 创建并初始化一个新的 Application 实例。 // NewApplication 创建并初始化一个新的 Application 实例。
@@ -85,6 +90,7 @@ func NewApplication(configPath string) (*Application, error) {
pigSickPigLogRepo := repository.NewGormPigSickLogRepository(storage.GetDB()) pigSickPigLogRepo := repository.NewGormPigSickLogRepository(storage.GetDB())
medicationLogRepo := repository.NewGormMedicationLogRepository(storage.GetDB()) medicationLogRepo := repository.NewGormMedicationLogRepository(storage.GetDB())
rawMaterialRepo := repository.NewGormRawMaterialRepository(storage.GetDB()) rawMaterialRepo := repository.NewGormRawMaterialRepository(storage.GetDB())
notificationRepo := repository.NewGormNotificationRepository(storage.GetDB())
// 初始化事务管理器 // 初始化事务管理器
unitOfWork := repository.NewGormUnitOfWork(storage.GetDB(), logger) unitOfWork := repository.NewGormUnitOfWork(storage.GetDB(), logger)
@@ -112,11 +118,18 @@ func NewApplication(configPath string) (*Application, error) {
pigTransferLogRepo, pigTransferLogRepo,
pigSickPigLogRepo, pigSickPigLogRepo,
pigTradeRepo, pigTradeRepo,
notificationRepo,
) )
// 初始化审计服务 // 初始化审计服务
auditService := audit.NewService(userActionLogRepo, logger) auditService := audit.NewService(userActionLogRepo, logger)
// 初始化通知服务
notifyService, err := initNotifyService(cfg.Notify, logger, userRepo, notificationRepo)
if err != nil {
return nil, fmt.Errorf("初始化通知服务失败: %w", err)
}
// --- 初始化 LoRa 相关组件 --- // --- 初始化 LoRa 相关组件 ---
var listenHandler webhook.ListenHandler var listenHandler webhook.ListenHandler
var comm transport.Communicator var comm transport.Communicator
@@ -176,9 +189,9 @@ func NewApplication(configPath string) (*Application, error) {
pigFarmService, pigFarmService,
pigBatchService, pigBatchService,
monitorService, monitorService,
userActionLogRepo,
tokenService, tokenService,
auditService, auditService,
notifyService,
generalDeviceService, generalDeviceService,
listenHandler, listenHandler,
analysisPlanTaskManager, analysisPlanTaskManager,
@@ -197,11 +210,94 @@ func NewApplication(configPath string) (*Application, error) {
pendingCollectionRepo: pendingCollectionRepo, pendingCollectionRepo: pendingCollectionRepo,
analysisPlanTaskManager: analysisPlanTaskManager, analysisPlanTaskManager: analysisPlanTaskManager,
loraMeshCommunicator: loraListener, loraMeshCommunicator: loraListener,
NotifyService: notifyService,
} }
return app, nil return app, nil
} }
// initNotifyService 根据配置初始化并返回一个通知领域服务。
// 它确保至少有一个 LogNotifier 总是可用,并根据配置启用其他通知器。
func initNotifyService(
cfg config.NotifyConfig,
log *logs.Logger,
userRepo repository.UserRepository,
notificationRepo repository.NotificationRepository,
) (domain_notify.Service, error) {
var availableNotifiers []notify.Notifier
// 1. 总是创建 LogNotifier 作为所有告警的最终记录渠道
logNotifier := notify.NewLogNotifier(log)
availableNotifiers = append(availableNotifiers, logNotifier)
log.Info("Log通知器已启用 (作为所有告警的最终记录渠道)")
// 2. 根据配置,按需创建并收集所有启用的其他 Notifier 实例
if cfg.SMTP.Enabled {
smtpNotifier := notify.NewSMTPNotifier(
cfg.SMTP.Host,
cfg.SMTP.Port,
cfg.SMTP.Username,
cfg.SMTP.Password,
cfg.SMTP.Sender,
)
availableNotifiers = append(availableNotifiers, smtpNotifier)
log.Info("SMTP通知器已启用")
}
if cfg.WeChat.Enabled {
wechatNotifier := notify.NewWechatNotifier(
cfg.WeChat.CorpID,
cfg.WeChat.AgentID,
cfg.WeChat.Secret,
)
availableNotifiers = append(availableNotifiers, wechatNotifier)
log.Info("企业微信通知器已启用")
}
if cfg.Lark.Enabled {
larkNotifier := notify.NewLarkNotifier(
cfg.Lark.AppID,
cfg.Lark.AppSecret,
)
availableNotifiers = append(availableNotifiers, larkNotifier)
log.Info("飞书通知器已启用")
}
// 3. 动态确定首选通知器
var primaryNotifier notify.Notifier
primaryNotifierType := notify.NotifierType(cfg.Primary)
// 检查用户指定的主渠道是否已启用
for _, n := range availableNotifiers {
if n.Type() == primaryNotifierType {
primaryNotifier = n
break
}
}
// 如果用户指定的主渠道未启用或未指定,则自动选择第一个可用的 (这将是 LogNotifier如果其他都未启用)
if primaryNotifier == nil {
primaryNotifier = availableNotifiers[0] // 确保总能找到一个,因为 LogNotifier 总是存在的
log.Warnf("配置的首选渠道 '%s' 未启用或未指定,已自动降级使用 '%s' 作为首选渠道。", cfg.Primary, primaryNotifier.Type())
}
// 4. 使用创建的 Notifier 列表和 notificationRepo 来组装领域服务
notifyService, err := domain_notify.NewFailoverService(
log,
userRepo,
availableNotifiers,
primaryNotifier.Type(),
cfg.FailureThreshold,
notificationRepo,
)
if err != nil {
return nil, fmt.Errorf("创建故障转移通知服务失败: %w", err)
}
log.Infof("通知服务初始化成功,首选渠道: %s, 故障阈值: %d", primaryNotifier.Type(), cfg.FailureThreshold)
return notifyService, nil
}
// Start 启动应用的所有组件并阻塞,直到接收到关闭信号。 // Start 启动应用的所有组件并阻塞,直到接收到关闭信号。
func (app *Application) Start() error { func (app *Application) Start() error {
app.Logger.Info("应用启动中...") app.Logger.Info("应用启动中...")

View File

@@ -0,0 +1,292 @@
package notify
import (
"fmt"
"strings"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/notify"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"go.uber.org/zap"
)
// Service 定义了通知领域的核心业务逻辑接口
type Service interface {
// SendBatchAlarm 向一批用户发送告警通知。它会并发地为每个用户执行带故障转移的发送逻辑。
SendBatchAlarm(userIDs []uint, content notify.AlarmContent) error
// BroadcastAlarm 向所有用户发送告警通知。它会并发地为每个用户执行带故障转移的发送逻辑。
BroadcastAlarm(content notify.AlarmContent) error
// SendTestMessage 向指定用户发送一条测试消息,用于手动验证特定通知渠道的配置。
SendTestMessage(userID uint, notifierType notify.NotifierType) error
}
// failoverService 是 Service 接口的实现,提供了故障转移功能
type failoverService struct {
log *logs.Logger
userRepo repository.UserRepository
notifiers map[notify.NotifierType]notify.Notifier
primaryNotifier notify.Notifier
failureThreshold int
failureCounters *sync.Map // 使用 sync.Map 来安全地并发读写失败计数, key: userID (uint), value: counter (int)
notificationRepo repository.NotificationRepository
}
// NewFailoverService 创建一个新的故障转移通知服务
func NewFailoverService(
log *logs.Logger,
userRepo repository.UserRepository,
notifiers []notify.Notifier,
primaryNotifierType notify.NotifierType,
failureThreshold int,
notificationRepo repository.NotificationRepository,
) (Service, error) {
notifierMap := make(map[notify.NotifierType]notify.Notifier)
for _, n := range notifiers {
notifierMap[n.Type()] = n
}
primaryNotifier, ok := notifierMap[primaryNotifierType]
if !ok {
return nil, fmt.Errorf("首选通知器类型 '%s' 在提供的通知器列表中不存在", primaryNotifierType)
}
return &failoverService{
log: log,
userRepo: userRepo,
notifiers: notifierMap,
primaryNotifier: primaryNotifier,
failureThreshold: failureThreshold,
failureCounters: &sync.Map{},
notificationRepo: notificationRepo,
}, nil
}
// SendBatchAlarm 实现了向多个用户并发发送告警的功能
func (s *failoverService) SendBatchAlarm(userIDs []uint, content notify.AlarmContent) error {
var wg sync.WaitGroup
var mu sync.Mutex
var allErrors []string
s.log.Infow("开始批量发送告警...", "userCount", len(userIDs))
for _, userID := range userIDs {
wg.Add(1)
go func(id uint) {
defer wg.Done()
if err := s.sendAlarmToUser(id, content); err != nil {
mu.Lock()
allErrors = append(allErrors, fmt.Sprintf("发送失败 (用户ID: %d): %v", id, err))
mu.Unlock()
}
}(userID)
}
wg.Wait()
if len(allErrors) > 0 {
finalError := fmt.Errorf("批量告警发送完成,但有 %d 个用户发送失败:\n%s", len(allErrors), strings.Join(allErrors, "\n"))
s.log.Error(finalError.Error())
return finalError
}
s.log.Info("批量发送告警成功完成,所有用户均已通知。")
return nil
}
// BroadcastAlarm 实现了向所有用户发送告警的功能
func (s *failoverService) BroadcastAlarm(content notify.AlarmContent) error {
users, err := s.userRepo.FindAll()
if err != nil {
s.log.Errorw("广播告警失败:查找所有用户时出错", "error", err)
return fmt.Errorf("广播告警失败:查找所有用户时出错: %w", err)
}
var userIDs []uint
for _, user := range users {
userIDs = append(userIDs, user.ID)
}
s.log.Infow("开始广播告警给所有用户", "totalUsers", len(userIDs))
// 复用 SendBatchAlarm 的逻辑进行并发发送和错误处理
return s.SendBatchAlarm(userIDs, content)
}
// sendAlarmToUser 是为单个用户发送告警的内部方法,包含了完整的故障转移逻辑
func (s *failoverService) sendAlarmToUser(userID uint, content notify.AlarmContent) error {
user, err := s.userRepo.FindByID(userID)
if err != nil {
s.log.Errorw("发送告警失败:查找用户时出错", "userID", userID, "error", err)
return fmt.Errorf("查找用户失败: %w", err)
}
counter, _ := s.failureCounters.LoadOrStore(userID, 0)
failureCount := counter.(int)
if failureCount < s.failureThreshold {
primaryType := s.primaryNotifier.Type()
addr := getAddressForNotifier(primaryType, user.Contact)
if addr == "" {
// 记录跳过通知
s.recordNotificationAttempt(userID, primaryType, content, "", models.NotificationStatusSkipped, fmt.Errorf("用户未配置首选通知方式 '%s' 的地址", primaryType))
return fmt.Errorf("用户未配置首选通知方式 '%s' 的地址", primaryType)
}
err = s.primaryNotifier.Send(content, addr)
if err == nil {
// 记录成功通知
s.recordNotificationAttempt(userID, primaryType, content, addr, models.NotificationStatusSuccess, nil)
if failureCount > 0 {
s.log.Infow("首选渠道发送恢复正常", "userID", userID, "notifierType", primaryType)
s.failureCounters.Store(userID, 0)
}
return nil
}
// 记录失败通知
s.recordNotificationAttempt(userID, primaryType, content, addr, models.NotificationStatusFailed, err)
newFailureCount := failureCount + 1
s.failureCounters.Store(userID, newFailureCount)
s.log.Warnw("首选渠道发送失败", "userID", userID, "notifierType", primaryType, "error", err, "failureCount", newFailureCount)
failureCount = newFailureCount
}
if failureCount >= s.failureThreshold {
s.log.Warnw("故障转移阈值已达到,开始广播通知", "userID", userID, "threshold", s.failureThreshold)
var lastErr error
for _, notifier := range s.notifiers {
addr := getAddressForNotifier(notifier.Type(), user.Contact)
if addr == "" {
// 记录跳过通知
s.recordNotificationAttempt(userID, notifier.Type(), content, "", models.NotificationStatusSkipped, fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifier.Type()))
continue
}
if err := notifier.Send(content, addr); err == nil {
// 记录成功通知
s.recordNotificationAttempt(userID, notifier.Type(), content, addr, models.NotificationStatusSuccess, nil)
s.log.Infow("广播通知成功", "userID", userID, "notifierType", notifier.Type())
s.failureCounters.Store(userID, 0)
return nil
}
// 记录失败通知
s.recordNotificationAttempt(userID, notifier.Type(), content, addr, models.NotificationStatusFailed, err)
lastErr = err
s.log.Warnw("广播通知:渠道发送失败", "userID", userID, "notifierType", notifier.Type(), "error", err)
}
return fmt.Errorf("所有渠道均发送失败,最后一个错误: %w", lastErr)
}
return nil
}
// SendTestMessage 实现了手动发送测试消息的功能
func (s *failoverService) SendTestMessage(userID uint, notifierType notify.NotifierType) error {
user, err := s.userRepo.FindByID(userID)
if err != nil {
s.log.Errorw("发送测试消息失败:查找用户时出错", "userID", userID, "error", err)
return fmt.Errorf("查找用户失败: %w", err)
}
notifier, ok := s.notifiers[notifierType]
if !ok {
s.log.Errorw("发送测试消息失败:通知器类型不存在", "userID", userID, "notifierType", notifierType)
return fmt.Errorf("指定的通知器类型 '%s' 不存在", notifierType)
}
addr := getAddressForNotifier(notifierType, user.Contact)
if addr == "" {
s.log.Warnw("发送测试消息失败:缺少地址", "userID", userID, "notifierType", notifierType)
// 记录跳过通知
s.recordNotificationAttempt(userID, notifierType, notify.AlarmContent{
Title: "通知服务测试",
Message: fmt.Sprintf("这是一条来自【%s】渠道的测试消息。如果您收到此消息说明您的配置正确。", notifierType),
Level: zap.InfoLevel,
Timestamp: time.Now(),
}, "", models.NotificationStatusFailed, fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifierType))
return fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifierType)
}
testContent := notify.AlarmContent{
Title: "通知服务测试",
Message: fmt.Sprintf("这是一条来自【%s】渠道的测试消息。如果您收到此消息说明您的配置正确。", notifierType),
Level: zap.InfoLevel,
Timestamp: time.Now(),
}
s.log.Infow("正在发送测试消息...", "userID", userID, "notifierType", notifierType, "address", addr)
err = notifier.Send(testContent, addr)
if err != nil {
s.log.Errorw("发送测试消息失败", "userID", userID, "notifierType", notifierType, "error", err)
// 记录失败通知
s.recordNotificationAttempt(userID, notifierType, testContent, addr, models.NotificationStatusFailed, err)
return err
}
s.log.Infow("发送测试消息成功", "userID", userID, "notifierType", notifierType)
// 记录成功通知
s.recordNotificationAttempt(userID, notifierType, testContent, addr, models.NotificationStatusSuccess, nil)
return nil
}
// getAddressForNotifier 是一个辅助函数,根据通知器类型从 ContactInfo 中获取对应的地址
func getAddressForNotifier(notifierType notify.NotifierType, contact models.ContactInfo) string {
switch notifierType {
case notify.NotifierTypeSMTP:
return contact.Email
case notify.NotifierTypeWeChat:
return contact.WeChat
case notify.NotifierTypeLark:
return contact.Feishu
case notify.NotifierTypeLog:
return "log" // LogNotifier不需要具体的地址但为了函数签名一致性返回一个无意义的非空字符串以绕过配置存在检查
default:
return ""
}
}
// recordNotificationAttempt 记录一次通知发送尝试的结果
// userID: 接收通知的用户ID
// notifierType: 使用的通知器类型
// content: 通知内容
// toAddress: 实际发送到的地址
// status: 发送尝试的状态 (成功、失败、跳过)
// err: 如果发送失败,记录的错误信息
func (s *failoverService) recordNotificationAttempt(
userID uint,
notifierType notify.NotifierType,
content notify.AlarmContent,
toAddress string,
status models.NotificationStatus,
err error,
) {
errorMessage := ""
if err != nil {
errorMessage = err.Error()
}
notification := &models.Notification{
NotifierType: notifierType,
UserID: userID,
Title: content.Title,
Message: content.Message,
Level: models.LogLevel(content.Level),
AlarmTimestamp: content.Timestamp,
ToAddress: toAddress,
Status: status,
ErrorMessage: errorMessage,
}
if saveErr := s.notificationRepo.Create(notification); saveErr != nil {
s.log.Errorw("无法保存通知发送记录到数据库",
"userID", userID,
"notifierType", notifierType,
"status", status,
"originalError", errorMessage,
"saveError", saveErr,
)
}
}

View File

@@ -41,6 +41,9 @@ type Config struct {
// LoraMesh LoraMesh配置 // LoraMesh LoraMesh配置
LoraMesh LoraMeshConfig `yaml:"lora_mesh"` LoraMesh LoraMeshConfig `yaml:"lora_mesh"`
// Notify 通知服务配置
Notify NotifyConfig `yaml:"notify"`
} }
// AppConfig 代表应用基础配置 // AppConfig 代表应用基础配置
@@ -158,6 +161,40 @@ type LoraMeshConfig struct {
ReassemblyTimeout int `yaml:"reassembly_timeout"` ReassemblyTimeout int `yaml:"reassembly_timeout"`
} }
// NotifyConfig 包含了所有与通知服务相关的配置
type NotifyConfig struct {
Primary string `yaml:"primary"` // 首选通知渠道 (e.g., "邮件", "企业微信", "飞书", "日志")
FailureThreshold int `yaml:"failureThreshold"` // 连续失败多少次后触发广播模式
SMTP SMTPConfig `yaml:"smtp"`
WeChat WeChatConfig `yaml:"wechat"`
Lark LarkConfig `yaml:"lark"`
}
// SMTPConfig SMTP邮件配置
type SMTPConfig struct {
Enabled bool `yaml:"enabled"`
Host string `yaml:"host"`
Port int `yaml:"port"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Sender string `yaml:"sender"`
}
// WeChatConfig 企业微信应用配置
type WeChatConfig struct {
Enabled bool `yaml:"enabled"`
CorpID string `yaml:"corpID"`
AgentID string `yaml:"agentID"`
Secret string `yaml:"secret"`
}
// LarkConfig 飞书应用配置
type LarkConfig struct {
Enabled bool `yaml:"enabled"`
AppID string `yaml:"appID"`
AppSecret string `yaml:"appSecret"`
}
// NewConfig 创建并返回一个新的配置实例 // NewConfig 创建并返回一个新的配置实例
func NewConfig() *Config { func NewConfig() *Config {
// 默认值可以在这里设置,但我们优先使用配置文件中的值 // 默认值可以在这里设置,但我们优先使用配置文件中的值

View File

@@ -171,6 +171,7 @@ func (ps *PostgresStorage) creatingHyperTable() error {
{models.PigSickLog{}, "happened_at"}, {models.PigSickLog{}, "happened_at"},
{models.PigPurchase{}, "purchase_date"}, {models.PigPurchase{}, "purchase_date"},
{models.PigSale{}, "sale_date"}, {models.PigSale{}, "sale_date"},
{models.Notification{}, "alarm_timestamp"},
} }
for _, table := range tablesToConvert { for _, table := range tablesToConvert {
@@ -211,6 +212,7 @@ func (ps *PostgresStorage) applyCompressionPolicies() error {
{models.PigSickLog{}, "pig_batch_id"}, {models.PigSickLog{}, "pig_batch_id"},
{models.PigPurchase{}, "pig_batch_id"}, {models.PigPurchase{}, "pig_batch_id"},
{models.PigSale{}, "pig_batch_id"}, {models.PigSale{}, "pig_batch_id"},
{models.Notification{}, "user_id"},
} }
for _, policy := range policies { for _, policy := range policies {

View File

@@ -59,6 +59,9 @@ func GetAllModels() []interface{} {
// Medication Models // Medication Models
&Medication{}, &Medication{},
&MedicationLog{}, &MedicationLog{},
// Notification Models
&Notification{},
} }
} }

View File

@@ -0,0 +1,77 @@
package models
import (
"database/sql/driver"
"errors"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/notify"
"go.uber.org/zap/zapcore"
"gorm.io/gorm"
)
// NotificationStatus 定义了通知发送尝试的状态枚举。
type NotificationStatus string
const (
NotificationStatusSuccess NotificationStatus = "发送成功" // 通知已成功发送
NotificationStatusFailed NotificationStatus = "发送失败" // 通知发送失败
NotificationStatusSkipped NotificationStatus = "已跳过" // 通知因某些原因被跳过(例如:用户未配置联系方式)
)
// LogLevel is a custom type for zapcore.Level to handle database scanning and valuing.
type LogLevel zapcore.Level
// Scan implements the sql.Scanner interface.
func (l *LogLevel) Scan(value interface{}) error {
var s string
switch v := value.(type) {
case []byte:
s = string(v)
case string:
s = v
default:
return errors.New("LogLevel的类型无效")
}
var zl zapcore.Level
if err := zl.UnmarshalText([]byte(s)); err != nil {
return err
}
*l = LogLevel(zl)
return nil
}
// Value implements the driver.Valuer interface.
func (l LogLevel) Value() (driver.Value, error) {
return (zapcore.Level)(l).String(), nil
}
// Notification 表示已发送或尝试发送的通知记录。
type Notification struct {
gorm.Model
// NotifierType 通知器类型 (例如:"邮件", "企业微信", "飞书", "日志")
NotifierType notify.NotifierType `gorm:"type:varchar(20);not null;index" json:"notifier_type"`
// UserID 接收通知的用户ID用于追溯通知记录到特定用户
UserID uint `gorm:"index" json:"user_id"` // 增加 UserID 字段,并添加索引
// Title 通知标题
Title string `gorm:"type:varchar(255);not null" json:"title"`
// Message 通知内容
Message string `gorm:"type:text;not null" json:"message"`
// Level 通知级别 (例如INFO, WARN, ERROR)
Level LogLevel `gorm:"type:varchar(10);not null" json:"level"`
// AlarmTimestamp 通知内容生成时的时间戳,与 ID 构成复合主键
AlarmTimestamp time.Time `gorm:"primaryKey;not null" json:"alarm_timestamp"`
// ToAddress 接收地址 (例如:邮箱地址, 企业微信ID, 日志标识符)
ToAddress string `gorm:"type:varchar(255);not null" json:"to_address"`
// Status 通知发送尝试的状态 (例如:"待发送", "发送成功", "发送失败", "已跳过")
Status NotificationStatus `gorm:"type:varchar(20);not null;default:'待发送'" json:"status"`
// ErrorMessage 如果通知发送失败,此字段存储错误信息
ErrorMessage string `gorm:"type:text" json:"error_message"`
}
// TableName 指定 Notification 模型的表名。
func (Notification) TableName() string {
return "notifications"
}

View File

@@ -0,0 +1,193 @@
package notify
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
)
const (
// 飞书获取 tenant_access_token 的 API 地址
larkGetTokenURL = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
// 飞书发送消息的 API 地址
larkSendMessageURL = "https://open.feishu.cn/open-apis/im/v1/messages"
)
// larkNotifier 实现了 Notifier 接口,用于通过飞书自建应用发送私聊消息。
type larkNotifier struct {
appID string // 应用 ID
appSecret string // 应用密钥
// 用于线程安全地管理 tenant_access_token
mu sync.Mutex
accessToken string
tokenExpiresAt time.Time
}
// NewLarkNotifier 创建一个新的 larkNotifier 实例。
// 调用者需要注入飞书应用的 AppID 和 AppSecret。
func NewLarkNotifier(appID, appSecret string) Notifier {
return &larkNotifier{
appID: appID,
appSecret: appSecret,
}
}
// Send 向指定用户发送一条飞书消息卡片。
// toAddr 参数是接收者的邮箱地址。
func (l *larkNotifier) Send(content AlarmContent, toAddr string) error {
// 1. 获取有效的 tenant_access_token
token, err := l.getAccessToken()
if err != nil {
return err
}
// 2. 构建消息卡片 JSON
// 飞书消息卡片结构复杂,这里构建一个简单的 Markdown 文本卡片
cardContent := map[string]interface{}{
"config": map[string]bool{
"wide_screen_mode": true,
},
"elements": []map[string]interface{}{
{
"tag": "div",
"text": map[string]string{
"tag": "lark_md",
"content": fmt.Sprintf("## %s\n**级别**: %s\n**时间**: %s\n\n%s",
content.Title,
content.Level.String(),
content.Timestamp.Format(DefaultTimeFormat),
content.Message,
),
},
},
},
}
cardJSON, err := json.Marshal(cardContent)
if err != nil {
return fmt.Errorf("序列化飞书卡片内容失败: %w", err)
}
// 3. 构建请求的 JSON Body
payload := larkMessagePayload{
ReceiveID: toAddr,
ReceiveIDType: "email", // 指定接收者类型为邮箱
MsgType: "interactive", // 消息卡片类型
Content: string(cardJSON),
}
jsonBytes, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("序列化飞书消息失败: %w", err)
}
// 4. 发送 HTTP POST 请求
url := fmt.Sprintf("%s?receive_id_type=email", larkSendMessageURL) // 在 URL 中指定 receive_id_type
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes))
if err != nil {
return fmt.Errorf("创建飞书请求失败: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+token) // 携带 access_token
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("发送飞书通知失败: %w", err)
}
defer resp.Body.Close()
// 5. 检查响应
var response larkResponse
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
return fmt.Errorf("解析飞书响应失败: %w", err)
}
if response.Code != 0 {
return fmt.Errorf("飞书API返回错误: code=%d, msg=%s", response.Code, response.Msg)
}
return nil
}
// getAccessToken 获取并缓存 tenant_access_token处理了线程安全和自动刷新。
func (l *larkNotifier) getAccessToken() (string, error) {
l.mu.Lock()
defer l.mu.Unlock()
// 如果 token 存在且有效期还有5分钟以上则直接返回缓存的 token
if l.accessToken != "" && time.Now().Before(l.tokenExpiresAt.Add(-5*time.Minute)) {
return l.accessToken, nil
}
// 否则,重新获取 token
payload := map[string]string{
"app_id": l.appID,
"app_secret": l.appSecret,
}
jsonBytes, err := json.Marshal(payload)
if err != nil {
return "", fmt.Errorf("序列化获取 token 请求体失败: %w", err)
}
req, err := http.NewRequest("POST", larkGetTokenURL, bytes.NewBuffer(jsonBytes))
if err != nil {
return "", fmt.Errorf("创建获取 token 请求失败: %w", err)
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("获取 tenant_access_token 请求失败: %w", err)
}
defer resp.Body.Close()
var tokenResp larkTokenResponse
if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
return "", fmt.Errorf("解析 tenant_access_token 响应失败: %w", err)
}
if tokenResp.Code != 0 {
return "", fmt.Errorf("获取 tenant_access_token API 返回错误: code=%d, msg=%s", tokenResp.Code, tokenResp.Msg)
}
// 缓存新的 token 和过期时间
l.accessToken = tokenResp.TenantAccessToken
l.tokenExpiresAt = time.Now().Add(time.Duration(tokenResp.Expire) * time.Second)
return l.accessToken, nil
}
// Type 返回通知器的类型
func (l *larkNotifier) Type() NotifierType {
return NotifierTypeLark
}
// --- API 数据结构 ---
// larkTokenResponse 是获取 tenant_access_token API 的响应结构体
type larkTokenResponse struct {
Code int `json:"code"`
Msg string `json:"msg"`
TenantAccessToken string `json:"tenant_access_token"`
Expire int `json:"expire"` // 有效期,单位秒
}
// larkMessagePayload 是发送消息 API 的请求体结构
type larkMessagePayload struct {
ReceiveID string `json:"receive_id"`
ReceiveIDType string `json:"receive_id_type"`
MsgType string `json:"msg_type"`
Content string `json:"content"` // 对于 interactive 消息,这里是卡片的 JSON 字符串
}
// larkResponse 是飞书 API 的通用响应结构体
type larkResponse struct {
Code int `json:"code"`
Msg string `json:"msg"`
}

View File

@@ -0,0 +1,37 @@
package notify
import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
)
// logNotifier 实现了 Notifier 接口,用于将告警信息记录到日志中。
type logNotifier struct {
logger *logs.Logger
}
// NewLogNotifier 创建一个新的 logNotifier 实例。
// 它接收一个日志记录器,用于实际的日志输出。
func NewLogNotifier(logger *logs.Logger) Notifier {
return &logNotifier{
logger: logger,
}
}
// Send 将告警内容以结构化的方式记录到日志中。
// toAddr 参数在这里表示告警的预期接收者地址,也会被记录。
func (l *logNotifier) Send(content AlarmContent, toAddr string) error {
l.logger.Infow("告警已记录到日志",
"notifierType", NotifierTypeLog,
"title", content.Title,
"message", content.Message,
"level", content.Level.String(),
"timestamp", content.Timestamp.Format(DefaultTimeFormat),
"toAddr", toAddr,
)
return nil // 记录日志操作本身不应失败
}
// Type 返回通知器的类型。
func (l *logNotifier) Type() NotifierType {
return NotifierTypeLog
}

View File

@@ -0,0 +1,44 @@
package notify
import (
"time"
"go.uber.org/zap/zapcore"
)
// DefaultTimeFormat 定义了所有通知中统一使用的时间格式。
const DefaultTimeFormat = "2006-01-02 15:04:05"
// NotifierType 定义了通知器的类型。
type NotifierType string
const (
// NotifierTypeSMTP 表示 SMTP 邮件通知器。
NotifierTypeSMTP NotifierType = "邮件"
// NotifierTypeWeChat 表示企业微信通知器。
NotifierTypeWeChat NotifierType = "企业微信"
// NotifierTypeLark 表示飞书通知器。
NotifierTypeLark NotifierType = "飞书"
// NotifierTypeLog 表示日志通知器,作为最终的告警记录渠道。
NotifierTypeLog NotifierType = "日志"
)
// AlarmContent 定义了通知的内容
type AlarmContent struct {
// 通知标题
Title string
// 通知信息
Message string
// 通知级别
Level zapcore.Level
// 通知时间
Timestamp time.Time
}
// Notifier 定义了通知发送器的接口
type Notifier interface {
// Send 发送通知
Send(content AlarmContent, toAddr string) error
// Type 返回通知器的类型
Type() NotifierType
}

View File

@@ -0,0 +1,73 @@
package notify
import (
"fmt"
"net/smtp"
"strings"
)
// smtpNotifier 实现了 Notifier 接口,用于通过 SMTP 发送邮件通知。
type smtpNotifier struct {
host string // SMTP 服务器地址
port int // SMTP 服务器端口
username string // 发件人邮箱地址
password string // 发件人邮箱授权码
sender string // 发件人名称或地址,显示在邮件的 \"From\" 字段
}
// NewSMTPNotifier 创建一个新的 smtpNotifier 实例。
// 调用者需要注入 SMTP 相关的配置。
func NewSMTPNotifier(host string, port int, username, password, sender string) Notifier {
return &smtpNotifier{
host: host,
port: port,
username: username,
password: password,
sender: sender,
}
}
// Send 使用 net/smtp 包发送一封邮件。
func (s *smtpNotifier) Send(content AlarmContent, toAddr string) error {
// 1. 设置认证信息
auth := smtp.PlainAuth("", s.username, s.password, s.host)
// 2. 构建邮件内容
// 邮件头
subject := content.Title
contentType := "Content-Type: text/plain; charset=UTF-8"
fromHeader := fmt.Sprintf("From: %s", s.sender)
toHeader := fmt.Sprintf("To: %s", toAddr)
subjectHeader := fmt.Sprintf("Subject: %s", subject)
// 邮件正文
body := fmt.Sprintf("级别: %s\n时间: %s\n\n%s",
content.Level.String(),
content.Timestamp.Format(DefaultTimeFormat),
content.Message,
)
// 拼接完整的邮件报文
msg := strings.Join([]string{
fromHeader,
toHeader,
subjectHeader,
contentType,
"", // 邮件头和正文之间的空行
body,
}, "\r\n")
// 3. 发送邮件
addr := fmt.Sprintf("%s:%d", s.host, s.port)
err := smtp.SendMail(addr, auth, s.username, []string{toAddr}, []byte(msg))
if err != nil {
return fmt.Errorf("发送邮件失败: %w", err)
}
return nil
}
// Type 返回通知器的类型
func (s *smtpNotifier) Type() NotifierType {
return NotifierTypeSMTP
}

View File

@@ -0,0 +1,169 @@
package notify
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"
)
const (
// 获取 access_token 的 API 地址
getTokenURL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
// 发送应用消息的 API 地址
sendMessageURL = "https://qyapi.weixin.qq.com/cgi-bin/message/send"
)
// wechatNotifier 实现了 Notifier 接口,用于通过企业微信自建应用发送私聊消息。
type wechatNotifier struct {
corpID string // 企业ID (CorpID)
agentID string // 应用ID (AgentID)
secret string // 应用密钥 (Secret)
// 用于线程安全地管理 access_token
mu sync.Mutex
accessToken string
tokenExpiresAt time.Time
}
// NewWechatNotifier 创建一个新的 wechatNotifier 实例。
// 调用者需要注入企业微信应用的 CorpID, AgentID 和 Secret。
func NewWechatNotifier(corpID, agentID, secret string) Notifier {
return &wechatNotifier{
corpID: corpID,
agentID: agentID,
secret: secret,
}
}
// Send 向指定用户发送一条 markdown 格式的私聊消息。
// toAddr 参数是接收者的 UserID 列表,用逗号或竖线分隔。
func (w *wechatNotifier) Send(content AlarmContent, toAddr string) error {
// 1. 获取有效的 access_token
token, err := w.getAccessToken()
if err != nil {
return err
}
// 2. 构建 markdown 内容
markdownContent := fmt.Sprintf("## %s\n> 级别: <font color=\"warning\">%s</font>\n> 时间: %s\n\n%s",
content.Title,
content.Level.String(),
content.Timestamp.Format(DefaultTimeFormat),
content.Message,
)
// 3. 构建请求的 JSON Body
// 将逗号分隔的 toAddr 转换为竖线分隔,以符合 API 要求
userList := strings.ReplaceAll(toAddr, ",", "|")
payload := wechatMessagePayload{
ToUser: userList,
MsgType: "markdown",
AgentID: w.agentID,
Markdown: struct {
Content string `json:"content"`
}{
Content: markdownContent,
},
}
jsonBytes, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("序列化企业微信消息失败: %w", err)
}
// 4. 发送 HTTP POST 请求
url := fmt.Sprintf("%s?access_token=%s", sendMessageURL, token)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes))
if err != nil {
return fmt.Errorf("创建企业微信请求失败: %w", err)
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("发送企业微信通知失败: %w", err)
}
defer resp.Body.Close()
// 5. 检查响应
var response wechatResponse
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
return fmt.Errorf("解析企业微信响应失败: %w", err)
}
if response.ErrCode != 0 {
return fmt.Errorf("企业微信API返回错误: code=%d, msg=%s", response.ErrCode, response.ErrMsg)
}
return nil
}
// getAccessToken 获取并缓存 access_token处理了线程安全和自动刷新。
func (w *wechatNotifier) getAccessToken() (string, error) {
w.mu.Lock()
defer w.mu.Unlock()
// 如果 token 存在且有效期还有5分钟以上则直接返回缓存的 token
if w.accessToken != "" && time.Now().Before(w.tokenExpiresAt.Add(-5*time.Minute)) {
return w.accessToken, nil
}
// 否则,重新获取 token
url := fmt.Sprintf("%s?corpid=%s&corpsecret=%s", getTokenURL, w.corpID, w.secret)
resp, err := http.Get(url)
if err != nil {
return "", fmt.Errorf("获取 access_token 请求失败: %w", err)
}
defer resp.Body.Close()
var tokenResp tokenResponse
if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
return "", fmt.Errorf("解析 access_token 响应失败: %w", err)
}
if tokenResp.ErrCode != 0 {
return "", fmt.Errorf("获取 access_token API 返回错误: code=%d, msg=%s", tokenResp.ErrCode, tokenResp.ErrMsg)
}
// 缓存新的 token 和过期时间
w.accessToken = tokenResp.AccessToken
w.tokenExpiresAt = time.Now().Add(time.Duration(tokenResp.ExpiresIn) * time.Second)
return w.accessToken, nil
}
// Type 返回通知器的类型
func (w *wechatNotifier) Type() NotifierType {
return NotifierTypeWeChat
}
// --- API 数据结构 ---
// tokenResponse 是获取 access_token API 的响应结构体
type tokenResponse struct {
ErrCode int `json:"errcode"`
ErrMsg string `json:"errmsg"`
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
}
// wechatMessagePayload 是发送应用消息 API 的请求体结构
type wechatMessagePayload struct {
ToUser string `json:"touser"`
MsgType string `json:"msgtype"`
AgentID string `json:"agentid"`
Markdown struct {
Content string `json:"content"`
} `json:"markdown"`
}
// wechatResponse 是企业微信 API 的通用响应结构体
type wechatResponse struct {
ErrCode int `json:"errcode"`
ErrMsg string `json:"errmsg"`
}

View File

@@ -0,0 +1,111 @@
package repository
import (
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/notify"
"go.uber.org/zap/zapcore"
"gorm.io/gorm"
)
// NotificationListOptions 定义了查询通知列表时的可选参数
type NotificationListOptions struct {
UserID *uint // 按用户ID过滤
NotifierType *notify.NotifierType // 按通知器类型过滤
Status *models.NotificationStatus // 按通知状态过滤 (例如:"success", "failed")
Level *zapcore.Level // 按通知等级过滤 (例如:"info", "warning", "error")
StartTime *time.Time // 通知内容生成时间范围 - 开始时间 (对应 AlarmTimestamp)
EndTime *time.Time // 通知内容生成时间范围 - 结束时间 (对应 AlarmTimestamp)
OrderBy string // 排序字段,例如 "alarm_timestamp DESC"
}
// NotificationRepository 定义了与通知记录相关的数据库操作接口。
type NotificationRepository interface {
// Create 将一条新的通知记录插入数据库。
Create(notification *models.Notification) error
// CreateInTx 在给定的事务中插入一条新的通知记录。
CreateInTx(tx *gorm.DB, notification *models.Notification) error
// BatchCreate 批量插入多条通知记录。
BatchCreate(notifications []*models.Notification) error
// List 支持分页和过滤的通知列表查询。
// 返回通知列表、总记录数和错误。
List(opts NotificationListOptions, page, pageSize int) ([]models.Notification, int64, error)
}
// gormNotificationRepository 是 NotificationRepository 的 GORM 实现。
type gormNotificationRepository struct {
db *gorm.DB
}
// NewGormNotificationRepository 创建一个新的 NotificationRepository GORM 实现实例。
func NewGormNotificationRepository(db *gorm.DB) NotificationRepository {
return &gormNotificationRepository{db: db}
}
// Create 将一条新的通知记录插入数据库。
func (r *gormNotificationRepository) Create(notification *models.Notification) error {
return r.db.Create(notification).Error
}
// CreateInTx 在给定的事务中插入一条新的通知记录。
func (r *gormNotificationRepository) CreateInTx(tx *gorm.DB, notification *models.Notification) error {
return tx.Create(notification).Error
}
// BatchCreate 批量插入多条通知记录。
func (r *gormNotificationRepository) BatchCreate(notifications []*models.Notification) error {
// GORM 的 Create 方法在传入切片时会自动进行批量插入
return r.db.Create(&notifications).Error
}
// List 实现了分页和过滤查询通知记录的功能
func (r *gormNotificationRepository) List(opts NotificationListOptions, page, pageSize int) ([]models.Notification, int64, error) {
// --- 校验分页参数 ---
if page <= 0 || pageSize <= 0 {
return nil, 0, ErrInvalidPagination // 复用已定义的错误
}
var results []models.Notification
var total int64
query := r.db.Model(&models.Notification{})
// --- 应用过滤条件 ---
if opts.UserID != nil {
query = query.Where("user_id = ?", *opts.UserID)
}
if opts.NotifierType != nil {
query = query.Where("notifier_type = ?", *opts.NotifierType)
}
if opts.Status != nil {
query = query.Where("status = ?", *opts.Status)
}
if opts.Level != nil {
query = query.Where("level = ?", opts.Level.String())
}
if opts.StartTime != nil {
query = query.Where("alarm_timestamp >= ?", *opts.StartTime)
}
if opts.EndTime != nil {
query = query.Where("alarm_timestamp <= ?", *opts.EndTime)
}
// --- 计算总数 ---
if err := query.Count(&total).Error; err != nil {
return nil, 0, err
}
// --- 应用排序条件 ---
orderBy := "alarm_timestamp DESC" // 默认按时间倒序
if opts.OrderBy != "" {
orderBy = opts.OrderBy
}
query = query.Order(orderBy)
// --- 分页 ---
offset := (page - 1) * pageSize
err := query.Limit(pageSize).Offset(offset).Find(&results).Error
return results, total, err
}

View File

@@ -0,0 +1,6 @@
package repository
import "errors"
// ErrInvalidPagination 表示分页参数无效
var ErrInvalidPagination = errors.New("无效的分页参数page和pageSize必须为大于0")

View File

@@ -1,16 +1,12 @@
package repository package repository
import ( import (
"errors"
"time" "time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"gorm.io/gorm" "gorm.io/gorm"
) )
// ErrInvalidPagination 表示分页参数无效
var ErrInvalidPagination = errors.New("无效的分页参数page和pageSize必须为大于0")
// SensorDataListOptions 定义了查询传感器数据列表时的可选参数 // SensorDataListOptions 定义了查询传感器数据列表时的可选参数
type SensorDataListOptions struct { type SensorDataListOptions struct {
DeviceID *uint DeviceID *uint

View File

@@ -13,6 +13,7 @@ type UserRepository interface {
FindByUsername(username string) (*models.User, error) FindByUsername(username string) (*models.User, error)
FindByID(id uint) (*models.User, error) FindByID(id uint) (*models.User, error)
FindUserForLogin(identifier string) (*models.User, error) FindUserForLogin(identifier string) (*models.User, error)
FindAll() ([]*models.User, error)
} }
// gormUserRepository 是 UserRepository 的 GORM 实现 // gormUserRepository 是 UserRepository 的 GORM 实现
@@ -64,3 +65,12 @@ func (r *gormUserRepository) FindByID(id uint) (*models.User, error) {
} }
return &user, nil return &user, nil
} }
// FindAll 返回数据库中的所有用户
func (r *gormUserRepository) FindAll() ([]*models.User, error) {
var users []*models.User
if err := r.db.Where("1 = 1").Find(&users).Error; err != nil {
return nil, err
}
return users, nil
}