Compare commits

...

14 Commits

Author SHA1 Message Date
e66ee67cf7 Merge pull request 'issue_42' (#44) from issue_42 into main
Reviewed-on: #44
2025-10-26 15:57:04 +08:00
40eb57ee47 重构core包 2025-10-26 15:48:38 +08:00
6a8e8f1f7d 实现定时采集 2025-10-26 15:10:38 +08:00
5c83c19bce Merge pull request 'issue_22' (#41) from issue_22 into main
Reviewed-on: #41
2025-10-25 15:42:19 +08:00
86c9073da8 修bug 2025-10-25 15:41:49 +08:00
43c1839345 实现controller 2025-10-25 15:04:47 +08:00
f62cc1c4a9 实现service层 2025-10-25 14:36:24 +08:00
f6d2069e1a 发送通知时写入数据库 2025-10-25 14:17:17 +08:00
f33e14f60f 发送通知时写入数据库 2025-10-25 14:15:17 +08:00
d6f275b2d1 定义仓库方法 2025-10-25 13:35:43 +08:00
d8de5a68eb 定义通知model 2025-10-25 13:28:19 +08:00
bd8729d473 中文枚举 2025-10-24 21:38:52 +08:00
3fd97aa43f 日志发送逻辑及测试消息发送接口 2025-10-24 21:24:48 +08:00
9d6876684b 实现飞书/微信/邮件发送通知 2025-10-24 20:33:15 +08:00
33 changed files with 2853 additions and 229 deletions

115
config.example.yml Normal file
View File

@@ -0,0 +1,115 @@
# 应用基础配置
app:
name: "PigFarmController" # 应用名称
version: "1.0.0" # 应用版本
jwt_secret: "your_jwt_secret_key_here" # JWT 签名密钥,请务必修改为强密码
# 服务器配置
server:
port: 8080 # 服务器监听端口
mode: "debug" # 运行模式: debug, release, test
# 日志配置
log:
level: "info" # 日志级别: debug, info, warn, error, dpanic, panic, fatal
format: "console" # 日志输出格式: console, json
enable_file: true # 是否同时输出到文件
file_path: "app_logs/pig_farm_controller.log" # 日志文件路径
max_size: 100 # 单个日志文件最大大小 (MB)
max_backups: 7 # 最多保留的旧日志文件数量
max_age: 7 # 最多保留的旧日志文件天数
compress: true # 是否压缩旧日志文件
# 数据库配置
database:
host: "localhost" # 数据库主机地址
port: 5432 # 数据库端口
username: "postgres" # 数据库用户名
password: "your_db_password" # 数据库密码
dbname: "pig_farm_controller_db" # 数据库名称
sslmode: "disable" # SSL模式: disable, require, verify-ca, verify-full
is_timescaledb: false # 是否为 TimescaleDB
max_open_conns: 100 # 最大开放连接数
max_idle_conns: 10 # 最大空闲连接数
conn_max_lifetime: 300 # 连接最大生命周期 (秒)
# WebSocket配置
websocket:
timeout: 60 # WebSocket请求超时时间 (秒)
heartbeat_interval: 30 # 心跳检测间隔 (秒)
# 心跳配置
heartbeat:
interval: 10 # 心跳间隔 (秒)
concurrency: 5 # 请求并发数
# ChirpStack API 配置
chirp_stack:
api_host: "http://localhost:8080" # ChirpStack API 主机地址
api_token: "your_chirpstack_api_token" # ChirpStack API Token
fport: 10 # ChirpStack FPort
api_timeout: 10 # ChirpStack API请求超时时间(秒)
# 等待设备上行响应的超时时间(秒)。
# 对于LoRaWAN这种延迟较高的网络建议设置为5分钟 (300秒) 或更长。
collection_request_timeout: 300
# 任务调度配置
task:
interval: 5 # 任务调度间隔 (秒)
num_workers: 5 # 任务执行器并发工作数量
# Lora 配置
lora:
mode: "lora_mesh" # Lora 运行模式: lora_wan, lora_mesh
# Lora Mesh 配置
lora_mesh:
# 主节点串口
uart_port: "COM7"
# LoRa模块的通信波特率
baud_rate: 9600
# 等待LoRa模块AT指令响应的超时时间(ms)
timeout: 50
# LoRa Mesh 模块发送模式(EC: 透传; ED: 完整数据包)
# e.g.
# EC: 接收端只会接收到消息, 不会接收到请求头
# e.g. 发送: EC 05 02 01 48 65 6c 6c 6f
# (EC + 05(消息长度) + 0201(地址) + "Hello"(消息本体))
# 接收: 48 65 6c 6c 6f ("Hello")
# ED: 接收端会接收完整数据包,包含自定义协议头和地址信息。
# e.g. 发送: ED 05 12 34 01 00 01 02 03
# (ED(帧头) + 05(Length, 即 1(总包数)+1(当前包序号)+3(数据块)) + 12 34(目标地址) + 01(总包数) + 00(当前包序号) + 01 02 03(数据块))
# 接收: ED 05 12 34 01 00 01 02 03 56 78(56 78 是发送方地址,会自动拼接到消息末尾)
lora_mesh_mode: "ED"
# 单包最大用户数据数据长度, 模块限制240, 去掉两位自定义包头, 还剩238
max_chunk_size: 238
#分片重组超时时间(秒)。如果在一个分片到达后,超过这个时间
# 还没收到完整的包,则认为接收失败。
reassembly_timeout: 30
# 通知服务配置
notify:
primary: "日志" # 首选通知渠道: "邮件", "企业微信", "飞书", "日志" (如果其他渠道未启用,"日志" 会自动成为首选)
failureThreshold: 2 # 连续失败多少次后触发广播模式
smtp:
enabled: false # 是否启用 SMTP 邮件通知
host: "smtp.example.com" # SMTP 服务器地址
port: 587 # SMTP 服务器端口
username: "your_email@example.com" # 发件人邮箱地址
password: "your_email_password" # 发件人邮箱授权码或密码
sender: "PigFarm Alarm <no-reply@example.com>" # 发件人名称和地址
wechat:
enabled: false # 是否启用企业微信通知
corpID: "wwxxxxxxxxxxxx" # 企业ID (CorpID)
agentID: "1000001" # 应用ID (AgentID)
secret: "your_wechat_app_secret" # 应用密钥 (Secret)
lark:
enabled: false # 是否启用飞书通知
appID: "cli_xxxxxxxxxx" # 应用 ID
appSecret: "your_lark_app_secret" # 应用密钥
# 定时采集配置
collection:
interval: 300 # 采集间隔 (秒)

View File

@@ -86,4 +86,8 @@ lora_mesh:
max_chunk_size: 238
#分片重组超时时间(秒)。如果在一个分片到达后,超过这个时间
# 还没收到完整的包,则认为接收失败。
reassembly_timeout: 30
reassembly_timeout: 30
# 定时采集配置
collection:
interval: 300 # 采集间隔 (秒)

View File

@@ -975,6 +975,149 @@ const docTemplate = `{
}
}
},
"/api/v1/monitor/notifications": {
"get": {
"security": [
{
"BearerAuth": []
}
],
"description": "根据提供的过滤条件,分页获取通知列表",
"produces": [
"application/json"
],
"tags": [
"数据监控"
],
"summary": "批量查询通知",
"parameters": [
{
"type": "string",
"name": "end_time",
"in": "query"
},
{
"enum": [
7,
-1,
0,
1,
2,
3,
4,
5,
-1,
5,
6
],
"type": "integer",
"format": "int32",
"x-enum-varnames": [
"_numLevels",
"DebugLevel",
"InfoLevel",
"WarnLevel",
"ErrorLevel",
"DPanicLevel",
"PanicLevel",
"FatalLevel",
"_minLevel",
"_maxLevel",
"InvalidLevel"
],
"name": "level",
"in": "query"
},
{
"enum": [
"邮件",
"企业微信",
"飞书",
"日志"
],
"type": "string",
"x-enum-varnames": [
"NotifierTypeSMTP",
"NotifierTypeWeChat",
"NotifierTypeLark",
"NotifierTypeLog"
],
"name": "notifier_type",
"in": "query"
},
{
"type": "string",
"name": "order_by",
"in": "query"
},
{
"type": "integer",
"name": "page",
"in": "query"
},
{
"type": "integer",
"name": "pageSize",
"in": "query"
},
{
"type": "string",
"name": "start_time",
"in": "query"
},
{
"enum": [
"发送成功",
"发送失败",
"已跳过"
],
"type": "string",
"x-enum-comments": {
"NotificationStatusFailed": "通知发送失败",
"NotificationStatusSkipped": "通知因某些原因被跳过(例如:用户未配置联系方式)",
"NotificationStatusSuccess": "通知已成功发送"
},
"x-enum-descriptions": [
"通知已成功发送",
"通知发送失败",
"通知因某些原因被跳过(例如:用户未配置联系方式)"
],
"x-enum-varnames": [
"NotificationStatusSuccess",
"NotificationStatusFailed",
"NotificationStatusSkipped"
],
"name": "status",
"in": "query"
},
{
"type": "integer",
"name": "user_id",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"allOf": [
{
"$ref": "#/definitions/controller.Response"
},
{
"type": "object",
"properties": {
"data": {
"$ref": "#/definitions/dto.ListNotificationResponse"
}
}
}
]
}
}
}
}
},
"/api/v1/monitor/pending-collections": {
"get": {
"security": [
@@ -3923,6 +4066,64 @@ const docTemplate = `{
}
}
}
},
"/api/v1/users/{id}/notifications/test": {
"post": {
"security": [
{
"BearerAuth": []
}
],
"description": "为指定用户发送一条特定渠道的测试消息,以验证其配置是否正确。",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"用户管理"
],
"summary": "发送测试通知",
"parameters": [
{
"type": "integer",
"description": "用户ID",
"name": "id",
"in": "path",
"required": true
},
{
"description": "请求体",
"name": "body",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/dto.SendTestNotificationRequest"
}
}
],
"responses": {
"200": {
"description": "成功响应",
"schema": {
"allOf": [
{
"$ref": "#/definitions/controller.Response"
},
{
"type": "object",
"properties": {
"data": {
"type": "string"
}
}
}
]
}
}
}
}
}
},
"definitions": {
@@ -4431,6 +4632,20 @@ const docTemplate = `{
}
}
},
"dto.ListNotificationResponse": {
"type": "object",
"properties": {
"list": {
"type": "array",
"items": {
"$ref": "#/definitions/dto.NotificationDTO"
}
},
"pagination": {
"$ref": "#/definitions/dto.PaginationDTO"
}
}
},
"dto.ListPendingCollectionResponse": {
"type": "object",
"properties": {
@@ -4739,6 +4954,47 @@ const docTemplate = `{
}
}
},
"dto.NotificationDTO": {
"type": "object",
"properties": {
"alarm_timestamp": {
"type": "string"
},
"created_at": {
"type": "string"
},
"error_message": {
"type": "string"
},
"id": {
"type": "integer"
},
"level": {
"$ref": "#/definitions/zapcore.Level"
},
"message": {
"type": "string"
},
"notifier_type": {
"$ref": "#/definitions/notify.NotifierType"
},
"status": {
"$ref": "#/definitions/models.NotificationStatus"
},
"title": {
"type": "string"
},
"to_address": {
"type": "string"
},
"updated_at": {
"type": "string"
},
"user_id": {
"type": "integer"
}
}
},
"dto.PaginationDTO": {
"type": "object",
"properties": {
@@ -5591,6 +5847,22 @@ const docTemplate = `{
}
}
},
"dto.SendTestNotificationRequest": {
"type": "object",
"required": [
"type"
],
"properties": {
"type": {
"description": "Type 指定要测试的通知渠道",
"allOf": [
{
"$ref": "#/definitions/notify.NotifierType"
}
]
}
}
},
"dto.SensorDataDTO": {
"type": "object",
"properties": {
@@ -6201,6 +6473,29 @@ const docTemplate = `{
"ReasonTypeHealthCare"
]
},
"models.NotificationStatus": {
"type": "string",
"enum": [
"发送成功",
"发送失败",
"已跳过"
],
"x-enum-comments": {
"NotificationStatusFailed": "通知发送失败",
"NotificationStatusSkipped": "通知因某些原因被跳过(例如:用户未配置联系方式)",
"NotificationStatusSuccess": "通知已成功发送"
},
"x-enum-descriptions": [
"通知已成功发送",
"通知发送失败",
"通知因某些原因被跳过(例如:用户未配置联系方式)"
],
"x-enum-varnames": [
"NotificationStatusSuccess",
"NotificationStatusFailed",
"NotificationStatusSkipped"
]
},
"models.PenStatus": {
"type": "string",
"enum": [
@@ -6530,6 +6825,51 @@ const docTemplate = `{
"$ref": "#/definitions/models.SensorType"
}
}
},
"notify.NotifierType": {
"type": "string",
"enum": [
"邮件",
"企业微信",
"飞书",
"日志"
],
"x-enum-varnames": [
"NotifierTypeSMTP",
"NotifierTypeWeChat",
"NotifierTypeLark",
"NotifierTypeLog"
]
},
"zapcore.Level": {
"type": "integer",
"format": "int32",
"enum": [
7,
-1,
0,
1,
2,
3,
4,
5,
-1,
5,
6
],
"x-enum-varnames": [
"_numLevels",
"DebugLevel",
"InfoLevel",
"WarnLevel",
"ErrorLevel",
"DPanicLevel",
"PanicLevel",
"FatalLevel",
"_minLevel",
"_maxLevel",
"InvalidLevel"
]
}
},
"securityDefinitions": {

View File

@@ -967,6 +967,149 @@
}
}
},
"/api/v1/monitor/notifications": {
"get": {
"security": [
{
"BearerAuth": []
}
],
"description": "根据提供的过滤条件,分页获取通知列表",
"produces": [
"application/json"
],
"tags": [
"数据监控"
],
"summary": "批量查询通知",
"parameters": [
{
"type": "string",
"name": "end_time",
"in": "query"
},
{
"enum": [
7,
-1,
0,
1,
2,
3,
4,
5,
-1,
5,
6
],
"type": "integer",
"format": "int32",
"x-enum-varnames": [
"_numLevels",
"DebugLevel",
"InfoLevel",
"WarnLevel",
"ErrorLevel",
"DPanicLevel",
"PanicLevel",
"FatalLevel",
"_minLevel",
"_maxLevel",
"InvalidLevel"
],
"name": "level",
"in": "query"
},
{
"enum": [
"邮件",
"企业微信",
"飞书",
"日志"
],
"type": "string",
"x-enum-varnames": [
"NotifierTypeSMTP",
"NotifierTypeWeChat",
"NotifierTypeLark",
"NotifierTypeLog"
],
"name": "notifier_type",
"in": "query"
},
{
"type": "string",
"name": "order_by",
"in": "query"
},
{
"type": "integer",
"name": "page",
"in": "query"
},
{
"type": "integer",
"name": "pageSize",
"in": "query"
},
{
"type": "string",
"name": "start_time",
"in": "query"
},
{
"enum": [
"发送成功",
"发送失败",
"已跳过"
],
"type": "string",
"x-enum-comments": {
"NotificationStatusFailed": "通知发送失败",
"NotificationStatusSkipped": "通知因某些原因被跳过(例如:用户未配置联系方式)",
"NotificationStatusSuccess": "通知已成功发送"
},
"x-enum-descriptions": [
"通知已成功发送",
"通知发送失败",
"通知因某些原因被跳过(例如:用户未配置联系方式)"
],
"x-enum-varnames": [
"NotificationStatusSuccess",
"NotificationStatusFailed",
"NotificationStatusSkipped"
],
"name": "status",
"in": "query"
},
{
"type": "integer",
"name": "user_id",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"allOf": [
{
"$ref": "#/definitions/controller.Response"
},
{
"type": "object",
"properties": {
"data": {
"$ref": "#/definitions/dto.ListNotificationResponse"
}
}
}
]
}
}
}
}
},
"/api/v1/monitor/pending-collections": {
"get": {
"security": [
@@ -3915,6 +4058,64 @@
}
}
}
},
"/api/v1/users/{id}/notifications/test": {
"post": {
"security": [
{
"BearerAuth": []
}
],
"description": "为指定用户发送一条特定渠道的测试消息,以验证其配置是否正确。",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"用户管理"
],
"summary": "发送测试通知",
"parameters": [
{
"type": "integer",
"description": "用户ID",
"name": "id",
"in": "path",
"required": true
},
{
"description": "请求体",
"name": "body",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/dto.SendTestNotificationRequest"
}
}
],
"responses": {
"200": {
"description": "成功响应",
"schema": {
"allOf": [
{
"$ref": "#/definitions/controller.Response"
},
{
"type": "object",
"properties": {
"data": {
"type": "string"
}
}
}
]
}
}
}
}
}
},
"definitions": {
@@ -4423,6 +4624,20 @@
}
}
},
"dto.ListNotificationResponse": {
"type": "object",
"properties": {
"list": {
"type": "array",
"items": {
"$ref": "#/definitions/dto.NotificationDTO"
}
},
"pagination": {
"$ref": "#/definitions/dto.PaginationDTO"
}
}
},
"dto.ListPendingCollectionResponse": {
"type": "object",
"properties": {
@@ -4731,6 +4946,47 @@
}
}
},
"dto.NotificationDTO": {
"type": "object",
"properties": {
"alarm_timestamp": {
"type": "string"
},
"created_at": {
"type": "string"
},
"error_message": {
"type": "string"
},
"id": {
"type": "integer"
},
"level": {
"$ref": "#/definitions/zapcore.Level"
},
"message": {
"type": "string"
},
"notifier_type": {
"$ref": "#/definitions/notify.NotifierType"
},
"status": {
"$ref": "#/definitions/models.NotificationStatus"
},
"title": {
"type": "string"
},
"to_address": {
"type": "string"
},
"updated_at": {
"type": "string"
},
"user_id": {
"type": "integer"
}
}
},
"dto.PaginationDTO": {
"type": "object",
"properties": {
@@ -5583,6 +5839,22 @@
}
}
},
"dto.SendTestNotificationRequest": {
"type": "object",
"required": [
"type"
],
"properties": {
"type": {
"description": "Type 指定要测试的通知渠道",
"allOf": [
{
"$ref": "#/definitions/notify.NotifierType"
}
]
}
}
},
"dto.SensorDataDTO": {
"type": "object",
"properties": {
@@ -6193,6 +6465,29 @@
"ReasonTypeHealthCare"
]
},
"models.NotificationStatus": {
"type": "string",
"enum": [
"发送成功",
"发送失败",
"已跳过"
],
"x-enum-comments": {
"NotificationStatusFailed": "通知发送失败",
"NotificationStatusSkipped": "通知因某些原因被跳过(例如:用户未配置联系方式)",
"NotificationStatusSuccess": "通知已成功发送"
},
"x-enum-descriptions": [
"通知已成功发送",
"通知发送失败",
"通知因某些原因被跳过(例如:用户未配置联系方式)"
],
"x-enum-varnames": [
"NotificationStatusSuccess",
"NotificationStatusFailed",
"NotificationStatusSkipped"
]
},
"models.PenStatus": {
"type": "string",
"enum": [
@@ -6522,6 +6817,51 @@
"$ref": "#/definitions/models.SensorType"
}
}
},
"notify.NotifierType": {
"type": "string",
"enum": [
"邮件",
"企业微信",
"飞书",
"日志"
],
"x-enum-varnames": [
"NotifierTypeSMTP",
"NotifierTypeWeChat",
"NotifierTypeLark",
"NotifierTypeLog"
]
},
"zapcore.Level": {
"type": "integer",
"format": "int32",
"enum": [
7,
-1,
0,
1,
2,
3,
4,
5,
-1,
5,
6
],
"x-enum-varnames": [
"_numLevels",
"DebugLevel",
"InfoLevel",
"WarnLevel",
"ErrorLevel",
"DPanicLevel",
"PanicLevel",
"FatalLevel",
"_minLevel",
"_maxLevel",
"InvalidLevel"
]
}
},
"securityDefinitions": {

View File

@@ -349,6 +349,15 @@ definitions:
pagination:
$ref: '#/definitions/dto.PaginationDTO'
type: object
dto.ListNotificationResponse:
properties:
list:
items:
$ref: '#/definitions/dto.NotificationDTO'
type: array
pagination:
$ref: '#/definitions/dto.PaginationDTO'
type: object
dto.ListPendingCollectionResponse:
properties:
list:
@@ -552,6 +561,33 @@ definitions:
- quantity
- toPenID
type: object
dto.NotificationDTO:
properties:
alarm_timestamp:
type: string
created_at:
type: string
error_message:
type: string
id:
type: integer
level:
$ref: '#/definitions/zapcore.Level'
message:
type: string
notifier_type:
$ref: '#/definitions/notify.NotifierType'
status:
$ref: '#/definitions/models.NotificationStatus'
title:
type: string
to_address:
type: string
updated_at:
type: string
user_id:
type: integer
type: object
dto.PaginationDTO:
properties:
page:
@@ -1125,6 +1161,15 @@ definitions:
- traderName
- unitPrice
type: object
dto.SendTestNotificationRequest:
properties:
type:
allOf:
- $ref: '#/definitions/notify.NotifierType'
description: Type 指定要测试的通知渠道
required:
- type
type: object
dto.SensorDataDTO:
properties:
data:
@@ -1548,6 +1593,24 @@ definitions:
- ReasonTypePreventive
- ReasonTypeTreatment
- ReasonTypeHealthCare
models.NotificationStatus:
enum:
- 发送成功
- 发送失败
- 已跳过
type: string
x-enum-comments:
NotificationStatusFailed: 通知发送失败
NotificationStatusSkipped: 通知因某些原因被跳过(例如:用户未配置联系方式)
NotificationStatusSuccess: 通知已成功发送
x-enum-descriptions:
- 通知已成功发送
- 通知发送失败
- 通知因某些原因被跳过(例如:用户未配置联系方式)
x-enum-varnames:
- NotificationStatusSuccess
- NotificationStatusFailed
- NotificationStatusSkipped
models.PenStatus:
enum:
- 空闲
@@ -1816,6 +1879,45 @@ definitions:
type:
$ref: '#/definitions/models.SensorType'
type: object
notify.NotifierType:
enum:
- 邮件
- 企业微信
- 飞书
- 日志
type: string
x-enum-varnames:
- NotifierTypeSMTP
- NotifierTypeWeChat
- NotifierTypeLark
- NotifierTypeLog
zapcore.Level:
enum:
- 7
- -1
- 0
- 1
- 2
- 3
- 4
- 5
- -1
- 5
- 6
format: int32
type: integer
x-enum-varnames:
- _numLevels
- DebugLevel
- InfoLevel
- WarnLevel
- ErrorLevel
- DPanicLevel
- PanicLevel
- FatalLevel
- _minLevel
- _maxLevel
- InvalidLevel
info:
contact:
email: divano@example.com
@@ -2379,6 +2481,105 @@ paths:
summary: 获取用药记录列表
tags:
- 数据监控
/api/v1/monitor/notifications:
get:
description: 根据提供的过滤条件,分页获取通知列表
parameters:
- in: query
name: end_time
type: string
- enum:
- 7
- -1
- 0
- 1
- 2
- 3
- 4
- 5
- -1
- 5
- 6
format: int32
in: query
name: level
type: integer
x-enum-varnames:
- _numLevels
- DebugLevel
- InfoLevel
- WarnLevel
- ErrorLevel
- DPanicLevel
- PanicLevel
- FatalLevel
- _minLevel
- _maxLevel
- InvalidLevel
- enum:
- 邮件
- 企业微信
- 飞书
- 日志
in: query
name: notifier_type
type: string
x-enum-varnames:
- NotifierTypeSMTP
- NotifierTypeWeChat
- NotifierTypeLark
- NotifierTypeLog
- in: query
name: order_by
type: string
- in: query
name: page
type: integer
- in: query
name: pageSize
type: integer
- in: query
name: start_time
type: string
- enum:
- 发送成功
- 发送失败
- 已跳过
in: query
name: status
type: string
x-enum-comments:
NotificationStatusFailed: 通知发送失败
NotificationStatusSkipped: 通知因某些原因被跳过(例如:用户未配置联系方式)
NotificationStatusSuccess: 通知已成功发送
x-enum-descriptions:
- 通知已成功发送
- 通知发送失败
- 通知因某些原因被跳过(例如:用户未配置联系方式)
x-enum-varnames:
- NotificationStatusSuccess
- NotificationStatusFailed
- NotificationStatusSkipped
- in: query
name: user_id
type: integer
produces:
- application/json
responses:
"200":
description: OK
schema:
allOf:
- $ref: '#/definitions/controller.Response'
- properties:
data:
$ref: '#/definitions/dto.ListNotificationResponse'
type: object
security:
- BearerAuth: []
summary: 批量查询通知
tags:
- 数据监控
/api/v1/monitor/pending-collections:
get:
description: 根据提供的过滤条件,分页获取待采集请求
@@ -4086,6 +4287,40 @@ paths:
summary: 获取指定用户的操作历史
tags:
- 用户管理
/api/v1/users/{id}/notifications/test:
post:
consumes:
- application/json
description: 为指定用户发送一条特定渠道的测试消息,以验证其配置是否正确。
parameters:
- description: 用户ID
in: path
name: id
required: true
type: integer
- description: 请求体
in: body
name: body
required: true
schema:
$ref: '#/definitions/dto.SendTestNotificationRequest'
produces:
- application/json
responses:
"200":
description: 成功响应
schema:
allOf:
- $ref: '#/definitions/controller.Response'
- properties:
data:
type: string
type: object
security:
- BearerAuth: []
summary: 发送测试通知
tags:
- 用户管理
/api/v1/users/login:
post:
consumes:

View File

@@ -28,6 +28,7 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/app/webhook"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit"
domain_device "git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
@@ -42,7 +43,7 @@ type API struct {
engine *gin.Engine // Gin 引擎实例,用于处理 HTTP 请求
logger *logs.Logger // 日志记录器,用于输出日志信息
userRepo repository.UserRepository // 用户数据仓库接口,用于用户数据操作
tokenService token.TokenService // Token 服务接口,用于 JWT token 的生成和解析
tokenService token.Service // Token 服务接口,用于 JWT token 的生成和解析
auditService audit.Service // 审计服务,用于记录用户操作
httpServer *http.Server // 标准库的 HTTP 服务器实例,用于启动和停止服务
config config.ServerConfig // API 服务器的配置,使用 infra/config 包中的 ServerConfig
@@ -68,9 +69,9 @@ func NewAPI(cfg config.ServerConfig,
pigFarmService service.PigFarmService,
pigBatchService service.PigBatchService,
monitorService service.MonitorService,
userActionLogRepository repository.UserActionLogRepository,
tokenService token.TokenService,
tokenService token.Service,
auditService audit.Service,
notifyService domain_notify.Service,
deviceService domain_device.Service,
listenHandler webhook.ListenHandler,
analysisTaskManager *task.AnalysisPlanTaskManager) *API {
@@ -96,7 +97,7 @@ func NewAPI(cfg config.ServerConfig,
config: cfg,
listenHandler: listenHandler,
// 在 NewAPI 中初始化用户控制器,并将其作为 API 结构体的成员
userController: user.NewController(userRepo, monitorService, logger, tokenService),
userController: user.NewController(userRepo, monitorService, logger, tokenService, notifyService),
// 在 NewAPI 中初始化设备控制器,并将其作为 API 结构体的成员
deviceController: device.NewController(deviceRepository, areaControllerRepository, deviceTemplateRepository, deviceService, logger),
// 在 NewAPI 中初始化计划控制器,并将其作为 API 结构体的成员

View File

@@ -57,6 +57,7 @@ func (a *API) setupRoutes() {
userGroup := authGroup.Group("/users")
{
userGroup.GET("/:id/history", a.userController.ListUserHistory) // 获取用户操作历史
userGroup.POST("/:id/notifications/test", a.userController.SendTestNotification)
}
a.logger.Info("用户相关接口注册成功 (需要认证和审计)")
@@ -175,6 +176,7 @@ func (a *API) setupRoutes() {
monitorGroup.GET("/pig-sick-logs", a.monitorController.ListPigSickLogs)
monitorGroup.GET("/pig-purchases", a.monitorController.ListPigPurchases)
monitorGroup.GET("/pig-sales", a.monitorController.ListPigSales)
monitorGroup.GET("/notifications", a.monitorController.ListNotifications)
}
a.logger.Info("数据监控相关接口注册成功 (需要认证和审计)")
}

View File

@@ -839,3 +839,50 @@ func (c *Controller) ListPigSales(ctx *gin.Context) {
c.logger.Infof("%s: 成功, 获取到 %d 条记录, 总计 %d 条", actionType, len(data), total)
controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "获取猪只售卖记录成功", resp, actionType, "获取猪只售卖记录成功", req)
}
// ListNotifications godoc
// @Summary 批量查询通知
// @Description 根据提供的过滤条件,分页获取通知列表
// @Tags 数据监控
// @Security BearerAuth
// @Produce json
// @Param query query dto.ListNotificationRequest true "查询参数"
// @Success 200 {object} controller.Response{data=dto.ListNotificationResponse}
// @Router /api/v1/monitor/notifications [get]
func (c *Controller) ListNotifications(ctx *gin.Context) {
const actionType = "批量查询通知"
var req dto.ListNotificationRequest
if err := ctx.ShouldBindQuery(&req); err != nil {
c.logger.Errorf("%s: 参数绑定失败: %v", actionType, err)
controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "无效的查询参数: "+err.Error(), actionType, "参数绑定失败", req)
return
}
opts := repository.NotificationListOptions{
UserID: req.UserID,
NotifierType: req.NotifierType,
Level: req.Level,
StartTime: req.StartTime,
EndTime: req.EndTime,
OrderBy: req.OrderBy,
Status: req.Status,
}
data, total, err := c.monitorService.ListNotifications(opts, req.Page, req.PageSize)
if err != nil {
if errors.Is(err, repository.ErrInvalidPagination) {
c.logger.Warnf("%s: 无效的分页参数: %v", actionType, err)
controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "无效的分页参数: "+err.Error(), actionType, "无效分页参数", req)
return
}
c.logger.Errorf("%s: 服务层查询失败: %v", actionType, err)
controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "批量查询通知失败: "+err.Error(), actionType, "服务层查询失败", req)
return
}
resp := dto.NewListNotificationResponse(data, total, req.Page, req.PageSize)
c.logger.Infof("%s: 成功, 获取到 %d 条记录, 总计 %d 条", actionType, len(data), total)
controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "批量查询通知成功", resp, actionType, "批量查询通知成功", req)
}

View File

@@ -7,6 +7,7 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/app/controller"
"git.huangwc.com/pig/pig-farm-controller/internal/app/dto"
"git.huangwc.com/pig/pig-farm-controller/internal/app/service"
domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
@@ -19,16 +20,24 @@ import (
type Controller struct {
userRepo repository.UserRepository
monitorService service.MonitorService
tokenService token.TokenService // 注入 token 服务
tokenService token.Service
notifyService domain_notify.Service
logger *logs.Logger
}
// NewController 创建用户控制器实例
func NewController(userRepo repository.UserRepository, monitorService service.MonitorService, logger *logs.Logger, tokenService token.TokenService) *Controller {
func NewController(
userRepo repository.UserRepository,
monitorService service.MonitorService,
logger *logs.Logger,
tokenService token.Service,
notifyService domain_notify.Service,
) *Controller {
return &Controller{
userRepo: userRepo,
monitorService: monitorService,
tokenService: tokenService,
notifyService: notifyService,
logger: logger,
}
}
@@ -192,3 +201,46 @@ func (c *Controller) ListUserHistory(ctx *gin.Context) {
c.logger.Infof("%s: 成功获取用户 %d 的操作历史, 数量: %d", actionType, userID, len(data))
controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "获取用户操作历史成功", resp, actionType, "获取用户操作历史成功", opts)
}
// SendTestNotification godoc
// @Summary 发送测试通知
// @Description 为指定用户发送一条特定渠道的测试消息,以验证其配置是否正确。
// @Tags 用户管理
// @Security BearerAuth
// @Accept json
// @Produce json
// @Param id path int true "用户ID"
// @Param body body dto.SendTestNotificationRequest true "请求体"
// @Success 200 {object} controller.Response{data=string} "成功响应"
// @Router /api/v1/users/{id}/notifications/test [post]
func (c *Controller) SendTestNotification(ctx *gin.Context) {
const actionType = "发送测试通知"
// 1. 从 URL 中获取用户 ID
userID, err := strconv.ParseUint(ctx.Param("id"), 10, 32)
if err != nil {
c.logger.Errorf("%s: 无效的用户ID格式: %v", actionType, err)
controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "无效的用户ID格式", actionType, "无效的用户ID格式", ctx.Param("id"))
return
}
// 2. 从请求体 (JSON Body) 中获取要测试的通知类型
var req dto.SendTestNotificationRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
c.logger.Errorf("%s: 参数绑定失败: %v", actionType, err)
controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "请求体格式错误或缺少 'type' 字段: "+err.Error(), actionType, "请求体绑定失败", req)
return
}
// 3. 调用领域服务
err = c.notifyService.SendTestMessage(uint(userID), req.Type)
if err != nil {
c.logger.Errorf("%s: 服务层调用失败: %v", actionType, err)
controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "发送测试消息失败: "+err.Error(), actionType, "服务层调用失败", gin.H{"userID": userID, "type": req.Type})
return
}
// 4. 返回成功响应
c.logger.Infof("%s: 成功为用户 %d 发送类型为 %s 的测试消息", actionType, userID, req.Type)
controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "测试消息已发送,请检查您的接收端。", nil, actionType, "测试消息发送成功", gin.H{"userID": userID, "type": req.Type})
}

View File

@@ -0,0 +1,36 @@
package dto
import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"go.uber.org/zap/zapcore"
)
// NewListNotificationResponse 从模型数据创建通知列表响应 DTO
func NewListNotificationResponse(data []models.Notification, total int64, page, pageSize int) *ListNotificationResponse {
dtos := make([]NotificationDTO, len(data))
for i, item := range data {
dtos[i] = NotificationDTO{
ID: item.ID,
CreatedAt: item.CreatedAt,
UpdatedAt: item.UpdatedAt,
NotifierType: item.NotifierType,
UserID: item.UserID,
Title: item.Title,
Message: item.Message,
Level: zapcore.Level(item.Level),
AlarmTimestamp: item.AlarmTimestamp,
ToAddress: item.ToAddress,
Status: item.Status,
ErrorMessage: item.ErrorMessage,
}
}
return &ListNotificationResponse{
List: dtos,
Pagination: PaginationDTO{
Total: total,
Page: page,
PageSize: pageSize,
},
}
}

View File

@@ -0,0 +1,50 @@
package dto
import (
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/notify"
"go.uber.org/zap/zapcore"
)
// SendTestNotificationRequest 定义了发送测试通知请求的 JSON 结构
type SendTestNotificationRequest struct {
// Type 指定要测试的通知渠道
Type notify.NotifierType `json:"type" binding:"required"`
}
// ListNotificationRequest 定义了获取通知列表的请求参数
type ListNotificationRequest struct {
Page int `form:"page,default=1"`
PageSize int `form:"pageSize,default=10"`
UserID *uint `form:"user_id"`
NotifierType *notify.NotifierType `form:"notifier_type"`
Status *models.NotificationStatus `form:"status"`
Level *zapcore.Level `form:"level"`
StartTime *time.Time `form:"start_time"`
EndTime *time.Time `form:"end_time"`
OrderBy string `form:"order_by"`
}
// NotificationDTO 是用于API响应的通知结构
type NotificationDTO struct {
ID uint `json:"id"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
NotifierType notify.NotifierType `json:"notifier_type"`
UserID uint `json:"user_id"`
Title string `json:"title"`
Message string `json:"message"`
Level zapcore.Level `json:"level"`
AlarmTimestamp time.Time `json:"alarm_timestamp"`
ToAddress string `json:"to_address"`
Status models.NotificationStatus `json:"status"`
ErrorMessage string `json:"error_message"`
}
// ListNotificationResponse 是获取通知列表的响应结构
type ListNotificationResponse struct {
List []NotificationDTO `json:"list"`
Pagination PaginationDTO `json:"pagination"`
}

View File

@@ -14,7 +14,7 @@ import (
// AuthMiddleware 创建一个Gin中间件用于JWT身份验证
// 它依赖于 TokenService 来解析和验证 token并使用 UserRepository 来获取完整的用户信息
func AuthMiddleware(tokenService token.TokenService, userRepo repository.UserRepository) gin.HandlerFunc {
func AuthMiddleware(tokenService token.Service, userRepo repository.UserRepository) gin.HandlerFunc {
return func(c *gin.Context) {
// 从 Authorization header 获取 token
authHeader := c.GetHeader("Authorization")

View File

@@ -24,6 +24,7 @@ type MonitorService interface {
ListPigSickLogs(opts repository.PigSickLogListOptions, page, pageSize int) ([]models.PigSickLog, int64, error)
ListPigPurchases(opts repository.PigPurchaseListOptions, page, pageSize int) ([]models.PigPurchase, int64, error)
ListPigSales(opts repository.PigSaleListOptions, page, pageSize int) ([]models.PigSale, int64, error)
ListNotifications(opts repository.NotificationListOptions, page, pageSize int) ([]models.Notification, int64, error)
}
// monitorService 是 MonitorService 接口的具体实现
@@ -40,6 +41,7 @@ type monitorService struct {
pigTransferLogRepo repository.PigTransferLogRepository
pigSickLogRepo repository.PigSickLogRepository
pigTradeRepo repository.PigTradeRepository
notificationRepo repository.NotificationRepository
}
// NewMonitorService 创建一个新的 MonitorService 实例
@@ -56,6 +58,7 @@ func NewMonitorService(
pigTransferLogRepo repository.PigTransferLogRepository,
pigSickLogRepo repository.PigSickLogRepository,
pigTradeRepo repository.PigTradeRepository,
notificationRepo repository.NotificationRepository,
) MonitorService {
return &monitorService{
sensorDataRepo: sensorDataRepo,
@@ -70,6 +73,7 @@ func NewMonitorService(
pigTransferLogRepo: pigTransferLogRepo,
pigSickLogRepo: pigSickLogRepo,
pigTradeRepo: pigTradeRepo,
notificationRepo: notificationRepo,
}
}
@@ -157,3 +161,8 @@ func (s *monitorService) ListPigPurchases(opts repository.PigPurchaseListOptions
func (s *monitorService) ListPigSales(opts repository.PigSaleListOptions, page, pageSize int) ([]models.PigSale, int64, error) {
return s.pigTradeRepo.ListPigSales(opts, page, pageSize)
}
// ListNotifications 负责处理查询通知列表的业务逻辑
func (s *monitorService) ListNotifications(opts repository.NotificationListOptions, page, pageSize int) ([]models.Notification, int64, error) {
return s.notificationRepo.List(opts, page, pageSize)
}

View File

@@ -5,198 +5,70 @@ import (
"os"
"os/signal"
"syscall"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/app/api"
"git.huangwc.com/pig/pig-farm-controller/internal/app/service"
"git.huangwc.com/pig/pig-farm-controller/internal/app/webhook"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/pig"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/database"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora"
)
// Application 是整个应用的核心,封装了所有组件和生命周期。
type Application struct {
Config *config.Config
Logger *logs.Logger
Storage database.Storage
Executor *task.Scheduler
API *api.API // 添加 API 对象
Config *config.Config
Logger *logs.Logger
API *api.API
// 新增的仓库和管理器字段,以便在 initializePendingTasks 中访问
planRepo repository.PlanRepository
pendingTaskRepo repository.PendingTaskRepository
executionLogRepo repository.ExecutionLogRepository
pendingCollectionRepo repository.PendingCollectionRepository
analysisPlanTaskManager *task.AnalysisPlanTaskManager
// Lora Mesh 监听器
loraMeshCommunicator transport.Listener
Infra *Infrastructure
Domain *DomainServices
App *AppServices
}
// NewApplication 创建并初始化一个新的 Application 实例。
// 这是应用的“组合根”,所有依赖都在这里被创建和注入。
func NewApplication(configPath string) (*Application, error) {
// 加载配置
// 1. 初始化基本组件: 配置和日志
cfg := config.NewConfig()
if err := cfg.Load(configPath); err != nil {
return nil, fmt.Errorf("无法加载配置: %w", err)
}
// 初始化日志记录器
logger := logs.NewLogger(cfg.Log)
// 初始化数据库存储
storage, err := initStorage(cfg.Database, logger)
// 2. 初始化所有分层服务
infra, err := initInfrastructure(cfg, logger)
if err != nil {
return nil, err // 错误已在 initStorage 中被包装
return nil, fmt.Errorf("初始化基础设施失败: %w", err)
}
domain := initDomainServices(cfg, infra, logger)
appServices := initAppServices(infra, domain, logger)
// 初始化 Token 服务
tokenService := token.NewTokenService([]byte(cfg.App.JWTSecret))
// --- 仓库对象初始化 ---
userRepo := repository.NewGormUserRepository(storage.GetDB())
deviceRepo := repository.NewGormDeviceRepository(storage.GetDB())
areaControllerRepo := repository.NewGormAreaControllerRepository(storage.GetDB())
deviceTemplateRepo := repository.NewGormDeviceTemplateRepository(storage.GetDB())
planRepo := repository.NewGormPlanRepository(storage.GetDB())
pendingTaskRepo := repository.NewGormPendingTaskRepository(storage.GetDB())
executionLogRepo := repository.NewGormExecutionLogRepository(storage.GetDB())
sensorDataRepo := repository.NewGormSensorDataRepository(storage.GetDB())
deviceCommandLogRepo := repository.NewGormDeviceCommandLogRepository(storage.GetDB())
pendingCollectionRepo := repository.NewGormPendingCollectionRepository(storage.GetDB())
userActionLogRepo := repository.NewGormUserActionLogRepository(storage.GetDB())
pigBatchRepo := repository.NewGormPigBatchRepository(storage.GetDB())
pigBatchLogRepo := repository.NewGormPigBatchLogRepository(storage.GetDB())
pigFarmRepo := repository.NewGormPigFarmRepository(storage.GetDB())
pigPenRepo := repository.NewGormPigPenRepository(storage.GetDB())
pigTransferLogRepo := repository.NewGormPigTransferLogRepository(storage.GetDB())
pigTradeRepo := repository.NewGormPigTradeRepository(storage.GetDB())
pigSickPigLogRepo := repository.NewGormPigSickLogRepository(storage.GetDB())
medicationLogRepo := repository.NewGormMedicationLogRepository(storage.GetDB())
rawMaterialRepo := repository.NewGormRawMaterialRepository(storage.GetDB())
// 初始化事务管理器
unitOfWork := repository.NewGormUnitOfWork(storage.GetDB(), logger)
// 初始化猪群管理领域
pigPenTransferManager := pig.NewPigPenTransferManager(pigPenRepo, pigTransferLogRepo, pigBatchRepo)
pigTradeManager := pig.NewPigTradeManager(pigTradeRepo)
pigSickManager := pig.NewSickPigManager(pigSickPigLogRepo, medicationLogRepo)
pigBatchDomain := pig.NewPigBatchService(pigBatchRepo, pigBatchLogRepo, unitOfWork,
pigPenTransferManager, pigTradeManager, pigSickManager)
// --- 业务逻辑处理器初始化 ---
pigFarmService := service.NewPigFarmService(pigFarmRepo, pigPenRepo, pigBatchRepo, pigBatchDomain, unitOfWork, logger)
pigBatchService := service.NewPigBatchService(pigBatchDomain, logger)
monitorService := service.NewMonitorService(
sensorDataRepo,
deviceCommandLogRepo,
executionLogRepo,
pendingCollectionRepo,
userActionLogRepo,
rawMaterialRepo,
medicationLogRepo,
pigBatchRepo,
pigBatchLogRepo,
pigTransferLogRepo,
pigSickPigLogRepo,
pigTradeRepo,
)
// 初始化审计服务
auditService := audit.NewService(userActionLogRepo, logger)
// --- 初始化 LoRa 相关组件 ---
var listenHandler webhook.ListenHandler
var comm transport.Communicator
var loraListener transport.Listener
if cfg.Lora.Mode == config.LoraMode_LoRaWAN {
logger.Info("当前运行模式: lora_wan。初始化 ChirpStack 监听器和传输层。")
listenHandler = webhook.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, areaControllerRepo, deviceCommandLogRepo, pendingCollectionRepo)
comm = lora.NewChirpStackTransport(cfg.ChirpStack, logger)
loraListener = lora.NewPlaceholderTransport(logger)
} else {
logger.Info("当前运行模式: lora_mesh。初始化 LoRa Mesh 传输层和占位符监听器。")
listenHandler = webhook.NewPlaceholderListener(logger)
tp, err := lora.NewLoRaMeshUartPassthroughTransport(cfg.LoraMesh, logger, areaControllerRepo, pendingCollectionRepo, deviceRepo, sensorDataRepo)
loraListener = tp
comm = tp
if err != nil {
return nil, fmt.Errorf("无法初始化 LoRa Mesh 模块: %w", err)
}
}
// 初始化计划触发器管理器
analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger)
// 初始化通用设备服务
generalDeviceService := device.NewGeneralDeviceService(
deviceRepo,
deviceCommandLogRepo,
pendingCollectionRepo,
logger,
comm,
)
// 初始化任务执行器
executor := task.NewScheduler(
pendingTaskRepo,
executionLogRepo,
deviceRepo,
sensorDataRepo,
planRepo,
analysisPlanTaskManager,
logger,
generalDeviceService,
time.Duration(cfg.Task.Interval)*time.Second,
cfg.Task.NumWorkers,
)
// 初始化 API 服务器
// 3. 初始化 API 入口点
apiServer := api.NewAPI(
cfg.Server,
logger,
userRepo,
deviceRepo,
areaControllerRepo,
deviceTemplateRepo,
planRepo,
pigFarmService,
pigBatchService,
monitorService,
userActionLogRepo,
tokenService,
auditService,
generalDeviceService,
listenHandler,
analysisPlanTaskManager,
infra.Repos.UserRepo,
infra.Repos.DeviceRepo,
infra.Repos.AreaControllerRepo,
infra.Repos.DeviceTemplateRepo,
infra.Repos.PlanRepo,
appServices.PigFarmService,
appServices.PigBatchService,
appServices.MonitorService,
infra.TokenService,
appServices.AuditService,
infra.NotifyService,
domain.GeneralDeviceService,
infra.Lora.ListenHandler,
domain.AnalysisPlanTaskManager,
)
// 组装 Application 对象
// 4. 组装 Application 对象
app := &Application{
Config: cfg,
Logger: logger,
Storage: storage,
Executor: executor,
API: apiServer,
planRepo: planRepo,
pendingTaskRepo: pendingTaskRepo,
executionLogRepo: executionLogRepo,
pendingCollectionRepo: pendingCollectionRepo,
analysisPlanTaskManager: analysisPlanTaskManager,
loraMeshCommunicator: loraListener,
Config: cfg,
Logger: logger,
API: apiServer,
Infra: infra,
Domain: domain,
App: appServices,
}
return app, nil
@@ -206,35 +78,24 @@ func NewApplication(configPath string) (*Application, error) {
func (app *Application) Start() error {
app.Logger.Info("应用启动中...")
// -- 启动 LoRa Mesh 监听器
if err := app.loraMeshCommunicator.Listen(); err != nil {
// 1. 启动底层监听器
if err := app.Infra.Lora.LoraListener.Listen(); err != nil {
return fmt.Errorf("启动 LoRa Mesh 监听器失败: %w", err)
}
// --- 清理待采集任务 ---
if err := app.initializePendingCollections(); err != nil {
// 这是一个非致命错误,记录它,但应用应继续启动
app.Logger.Error(err)
// 2. 初始化应用状态 (清理、刷新任务等)
if err := app.initializeState(); err != nil {
return fmt.Errorf("初始化应用状态失败: %w", err)
}
// --- 初始化待执行任务列表 ---
if err := app.initializePendingTasks(
app.planRepo, // 传入 planRepo
app.pendingTaskRepo, // 传入 pendingTaskRepo
app.executionLogRepo, // 传入 executionLogRepo
app.analysisPlanTaskManager, // 传入 analysisPlanTaskManager
app.Logger, // 传入 logger
); err != nil {
return fmt.Errorf("初始化待执行任务列表失败: %w", err)
}
// 3. 启动后台工作协程
app.Domain.Scheduler.Start()
app.Domain.TimedCollector.Start()
// 启动任务执行
app.Executor.Start()
// 启动 API 服务器
// 4. 启动 API 服务
app.API.Start()
// 等待关闭信号
// 5. 等待关闭信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
@@ -251,15 +112,18 @@ func (app *Application) Stop() error {
app.API.Stop()
// 关闭任务执行器
app.Executor.Stop()
app.Domain.Scheduler.Stop()
// 关闭定时采集器
app.Domain.TimedCollector.Stop()
// 断开数据库连接
if err := app.Storage.Disconnect(); err != nil {
if err := app.Infra.Storage.Disconnect(); err != nil {
app.Logger.Errorw("数据库连接断开失败", "error", err)
}
// 关闭 LoRa Mesh 监听器
if err := app.loraMeshCommunicator.Stop(); err != nil {
if err := app.Infra.Lora.LoraListener.Stop(); err != nil {
app.Logger.Errorw("LoRa Mesh 监听器关闭失败", "error", err)
}
@@ -270,6 +134,22 @@ func (app *Application) Stop() error {
return nil
}
// initializeState 在应用启动时准备其初始数据状态。
// 这包括清理任何因上次异常关闭而留下的悬空任务或请求。
func (app *Application) initializeState() error {
// 清理待采集任务 (非致命错误)
if err := app.initializePendingCollections(); err != nil {
app.Logger.Errorw("清理待采集任务时发生非致命错误", "error", err)
}
// 初始化待执行任务列表 (致命错误)
if err := app.initializePendingTasks(); err != nil {
return fmt.Errorf("初始化待执行任务列表失败: %w", err)
}
return nil
}
// initializePendingCollections 在应用启动时处理所有未完成的采集请求。
// 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。
// 这保证了系统在每次启动时都处于一个干净、确定的状态。
@@ -277,7 +157,7 @@ func (app *Application) initializePendingCollections() error {
app.Logger.Info("开始清理所有未完成的采集请求...")
// 直接将所有 'pending' 状态的请求更新为 'timed_out'。
count, err := app.pendingCollectionRepo.MarkAllPendingAsTimedOut()
count, err := app.Infra.Repos.PendingCollectionRepo.MarkAllPendingAsTimedOut()
if err != nil {
return fmt.Errorf("清理未完成的采集请求失败: %v", err)
} else if count > 0 {
@@ -290,13 +170,13 @@ func (app *Application) initializePendingCollections() error {
}
// initializePendingTasks 在应用启动时清理并刷新待执行任务列表。
func (app *Application) initializePendingTasks(
planRepo repository.PlanRepository,
pendingTaskRepo repository.PendingTaskRepository,
executionLogRepo repository.ExecutionLogRepository,
analysisPlanTaskManager *task.AnalysisPlanTaskManager,
logger *logs.Logger,
) error {
func (app *Application) initializePendingTasks() error {
logger := app.Logger
planRepo := app.Infra.Repos.PlanRepo
pendingTaskRepo := app.Infra.Repos.PendingTaskRepo
executionLogRepo := app.Infra.Repos.ExecutionLogRepo
analysisPlanTaskManager := app.Domain.AnalysisPlanTaskManager
logger.Info("开始初始化待执行任务列表...")
// 阶段一:修正因崩溃导致状态不一致的固定次数计划
@@ -383,21 +263,3 @@ func (app *Application) initializePendingTasks(
logger.Info("待执行任务列表初始化完成。")
return nil
}
// initStorage 封装了数据库的初始化、连接和迁移逻辑。
func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Storage, error) {
// 创建存储实例
storage := database.NewStorage(cfg, logger)
if err := storage.Connect(); err != nil {
// 错误已在 Connect 内部被记录,这里只需包装并返回
return nil, fmt.Errorf("数据库连接失败: %w", err)
}
// 执行数据库迁移
if err := storage.Migrate(models.GetAllModels()...); err != nil {
return nil, fmt.Errorf("数据库迁移失败: %w", err)
}
logger.Info("数据库初始化完成。")
return storage, nil
}

View File

@@ -0,0 +1,362 @@
package core
import (
"fmt"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/app/service"
"git.huangwc.com/pig/pig-farm-controller/internal/app/webhook"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/collection"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/pig"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/database"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/notify"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora"
"gorm.io/gorm"
)
// Infrastructure 聚合了所有基础设施层的组件。
type Infrastructure struct {
Storage database.Storage
Repos *Repositories
Lora *LoraComponents
NotifyService domain_notify.Service
TokenService token.Service
}
// initInfrastructure 初始化所有基础设施层组件。
func initInfrastructure(cfg *config.Config, logger *logs.Logger) (*Infrastructure, error) {
storage, err := initStorage(cfg.Database, logger)
if err != nil {
return nil, err
}
repos := initRepositories(storage.GetDB(), logger)
lora, err := initLora(cfg, logger, repos)
if err != nil {
return nil, err
}
notifyService, err := initNotifyService(cfg.Notify, logger, repos.UserRepo, repos.NotificationRepo)
if err != nil {
return nil, fmt.Errorf("初始化通知服务失败: %w", err)
}
tokenService := token.NewTokenService([]byte(cfg.App.JWTSecret))
return &Infrastructure{
Storage: storage,
Repos: repos,
Lora: lora,
NotifyService: notifyService,
TokenService: tokenService,
}, nil
}
// Repositories 聚合了所有的仓库实例。
type Repositories struct {
UserRepo repository.UserRepository
DeviceRepo repository.DeviceRepository
AreaControllerRepo repository.AreaControllerRepository
DeviceTemplateRepo repository.DeviceTemplateRepository
PlanRepo repository.PlanRepository
PendingTaskRepo repository.PendingTaskRepository
ExecutionLogRepo repository.ExecutionLogRepository
SensorDataRepo repository.SensorDataRepository
DeviceCommandLogRepo repository.DeviceCommandLogRepository
PendingCollectionRepo repository.PendingCollectionRepository
UserActionLogRepo repository.UserActionLogRepository
PigBatchRepo repository.PigBatchRepository
PigBatchLogRepo repository.PigBatchLogRepository
PigFarmRepo repository.PigFarmRepository
PigPenRepo repository.PigPenRepository
PigTransferLogRepo repository.PigTransferLogRepository
PigTradeRepo repository.PigTradeRepository
PigSickPigLogRepo repository.PigSickLogRepository
MedicationLogRepo repository.MedicationLogRepository
RawMaterialRepo repository.RawMaterialRepository
NotificationRepo repository.NotificationRepository
UnitOfWork repository.UnitOfWork
}
// initRepositories 初始化所有的仓库。
func initRepositories(db *gorm.DB, logger *logs.Logger) *Repositories {
return &Repositories{
UserRepo: repository.NewGormUserRepository(db),
DeviceRepo: repository.NewGormDeviceRepository(db),
AreaControllerRepo: repository.NewGormAreaControllerRepository(db),
DeviceTemplateRepo: repository.NewGormDeviceTemplateRepository(db),
PlanRepo: repository.NewGormPlanRepository(db),
PendingTaskRepo: repository.NewGormPendingTaskRepository(db),
ExecutionLogRepo: repository.NewGormExecutionLogRepository(db),
SensorDataRepo: repository.NewGormSensorDataRepository(db),
DeviceCommandLogRepo: repository.NewGormDeviceCommandLogRepository(db),
PendingCollectionRepo: repository.NewGormPendingCollectionRepository(db),
UserActionLogRepo: repository.NewGormUserActionLogRepository(db),
PigBatchRepo: repository.NewGormPigBatchRepository(db),
PigBatchLogRepo: repository.NewGormPigBatchLogRepository(db),
PigFarmRepo: repository.NewGormPigFarmRepository(db),
PigPenRepo: repository.NewGormPigPenRepository(db),
PigTransferLogRepo: repository.NewGormPigTransferLogRepository(db),
PigTradeRepo: repository.NewGormPigTradeRepository(db),
PigSickPigLogRepo: repository.NewGormPigSickLogRepository(db),
MedicationLogRepo: repository.NewGormMedicationLogRepository(db),
RawMaterialRepo: repository.NewGormRawMaterialRepository(db),
NotificationRepo: repository.NewGormNotificationRepository(db),
UnitOfWork: repository.NewGormUnitOfWork(db, logger),
}
}
// DomainServices 聚合了所有的领域服务实例。
type DomainServices struct {
PigPenTransferManager pig.PigPenTransferManager
PigTradeManager pig.PigTradeManager
PigSickManager pig.SickPigManager
PigBatchDomain pig.PigBatchService
TimedCollector collection.Collector
GeneralDeviceService device.Service
AnalysisPlanTaskManager *task.AnalysisPlanTaskManager
Scheduler *task.Scheduler
}
// initDomainServices 初始化所有的领域服务。
func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs.Logger) *DomainServices {
// 猪群管理相关
pigPenTransferManager := pig.NewPigPenTransferManager(infra.Repos.PigPenRepo, infra.Repos.PigTransferLogRepo, infra.Repos.PigBatchRepo)
pigTradeManager := pig.NewPigTradeManager(infra.Repos.PigTradeRepo)
pigSickManager := pig.NewSickPigManager(infra.Repos.PigSickPigLogRepo, infra.Repos.MedicationLogRepo)
pigBatchDomain := pig.NewPigBatchService(infra.Repos.PigBatchRepo, infra.Repos.PigBatchLogRepo, infra.Repos.UnitOfWork,
pigPenTransferManager, pigTradeManager, pigSickManager)
// 通用设备服务
generalDeviceService := device.NewGeneralDeviceService(
infra.Repos.DeviceRepo,
infra.Repos.DeviceCommandLogRepo,
infra.Repos.PendingCollectionRepo,
logger,
infra.Lora.Comm,
)
// 计划任务管理器
analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(infra.Repos.PlanRepo, infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, logger)
// 任务执行器
scheduler := task.NewScheduler(
infra.Repos.PendingTaskRepo,
infra.Repos.ExecutionLogRepo,
infra.Repos.DeviceRepo,
infra.Repos.SensorDataRepo,
infra.Repos.PlanRepo,
analysisPlanTaskManager,
logger,
generalDeviceService,
time.Duration(cfg.Task.Interval)*time.Second,
cfg.Task.NumWorkers,
)
// 定时采集器
timedCollector := collection.NewTimedCollector(
infra.Repos.DeviceRepo,
generalDeviceService,
logger,
time.Duration(cfg.Collection.Interval)*time.Second,
)
return &DomainServices{
PigPenTransferManager: pigPenTransferManager,
PigTradeManager: pigTradeManager,
PigSickManager: pigSickManager,
PigBatchDomain: pigBatchDomain,
GeneralDeviceService: generalDeviceService,
AnalysisPlanTaskManager: analysisPlanTaskManager,
Scheduler: scheduler,
TimedCollector: timedCollector,
}
}
// AppServices 聚合了所有的应用服务实例。
type AppServices struct {
PigFarmService service.PigFarmService
PigBatchService service.PigBatchService
MonitorService service.MonitorService
AuditService audit.Service
}
// initAppServices 初始化所有的应用服务。
func initAppServices(infra *Infrastructure, domainServices *DomainServices, logger *logs.Logger) *AppServices {
pigFarmService := service.NewPigFarmService(infra.Repos.PigFarmRepo, infra.Repos.PigPenRepo, infra.Repos.PigBatchRepo, domainServices.PigBatchDomain, infra.Repos.UnitOfWork, logger)
pigBatchService := service.NewPigBatchService(domainServices.PigBatchDomain, logger)
monitorService := service.NewMonitorService(
infra.Repos.SensorDataRepo,
infra.Repos.DeviceCommandLogRepo,
infra.Repos.ExecutionLogRepo,
infra.Repos.PendingCollectionRepo,
infra.Repos.UserActionLogRepo,
infra.Repos.RawMaterialRepo,
infra.Repos.MedicationLogRepo,
infra.Repos.PigBatchRepo,
infra.Repos.PigBatchLogRepo,
infra.Repos.PigTransferLogRepo,
infra.Repos.PigSickPigLogRepo,
infra.Repos.PigTradeRepo,
infra.Repos.NotificationRepo,
)
auditService := audit.NewService(infra.Repos.UserActionLogRepo, logger)
return &AppServices{
PigFarmService: pigFarmService,
PigBatchService: pigBatchService,
MonitorService: monitorService,
AuditService: auditService,
}
}
// LoraComponents 聚合了所有 LoRa 相关组件。
type LoraComponents struct {
ListenHandler webhook.ListenHandler
Comm transport.Communicator
LoraListener transport.Listener
}
// initLora 根据配置初始化 LoRa 相关组件。
func initLora(
cfg *config.Config,
logger *logs.Logger,
repos *Repositories,
) (*LoraComponents, error) {
var listenHandler webhook.ListenHandler
var comm transport.Communicator
var loraListener transport.Listener
if cfg.Lora.Mode == config.LoraMode_LoRaWAN {
logger.Info("当前运行模式: lora_wan。初始化 ChirpStack 监听器和传输层。")
listenHandler = webhook.NewChirpStackListener(logger, repos.SensorDataRepo, repos.DeviceRepo, repos.AreaControllerRepo, repos.DeviceCommandLogRepo, repos.PendingCollectionRepo)
comm = lora.NewChirpStackTransport(cfg.ChirpStack, logger)
loraListener = lora.NewPlaceholderTransport(logger)
} else {
logger.Info("当前运行模式: lora_mesh。初始化 LoRa Mesh 传输层和占位符监听器。")
listenHandler = webhook.NewPlaceholderListener(logger)
tp, err := lora.NewLoRaMeshUartPassthroughTransport(cfg.LoraMesh, logger, repos.AreaControllerRepo, repos.PendingCollectionRepo, repos.DeviceRepo, repos.SensorDataRepo)
if err != nil {
return nil, fmt.Errorf("无法初始化 LoRa Mesh 模块: %w", err)
}
loraListener = tp
comm = tp
}
return &LoraComponents{
ListenHandler: listenHandler,
Comm: comm,
LoraListener: loraListener,
}, nil
}
// initNotifyService 根据配置初始化并返回一个通知领域服务。
// 它确保至少有一个 LogNotifier 总是可用,并根据配置启用其他通知器。
func initNotifyService(
cfg config.NotifyConfig,
log *logs.Logger,
userRepo repository.UserRepository,
notificationRepo repository.NotificationRepository,
) (domain_notify.Service, error) {
var availableNotifiers []notify.Notifier
// 1. 总是创建 LogNotifier 作为所有告警的最终记录渠道
logNotifier := notify.NewLogNotifier(log)
availableNotifiers = append(availableNotifiers, logNotifier)
log.Info("Log通知器已启用 (作为所有告警的最终记录渠道)")
// 2. 根据配置,按需创建并收集所有启用的其他 Notifier 实例
if cfg.SMTP.Enabled {
smtpNotifier := notify.NewSMTPNotifier(
cfg.SMTP.Host,
cfg.SMTP.Port,
cfg.SMTP.Username,
cfg.SMTP.Password,
cfg.SMTP.Sender,
)
availableNotifiers = append(availableNotifiers, smtpNotifier)
log.Info("SMTP通知器已启用")
}
if cfg.WeChat.Enabled {
wechatNotifier := notify.NewWechatNotifier(
cfg.WeChat.CorpID,
cfg.WeChat.AgentID,
cfg.WeChat.Secret,
)
availableNotifiers = append(availableNotifiers, wechatNotifier)
log.Info("企业微信通知器已启用")
}
if cfg.Lark.Enabled {
larkNotifier := notify.NewLarkNotifier(
cfg.Lark.AppID,
cfg.Lark.AppSecret,
)
availableNotifiers = append(availableNotifiers, larkNotifier)
log.Info("飞书通知器已启用")
}
// 3. 动态确定首选通知器
var primaryNotifier notify.Notifier
primaryNotifierType := notify.NotifierType(cfg.Primary)
// 检查用户指定的主渠道是否已启用
for _, n := range availableNotifiers {
if n.Type() == primaryNotifierType {
primaryNotifier = n
break
}
}
// 如果用户指定的主渠道未启用或未指定,则自动选择第一个可用的 (这将是 LogNotifier如果其他都未启用)
if primaryNotifier == nil {
primaryNotifier = availableNotifiers[0] // 确保总能找到一个,因为 LogNotifier 总是存在的
log.Warnf("配置的首选渠道 '%s' 未启用或未指定,已自动降级使用 '%s' 作为首选渠道。", cfg.Primary, primaryNotifier.Type())
}
// 4. 使用创建的 Notifier 列表和 notificationRepo 来组装领域服务
notifyService, err := domain_notify.NewFailoverService(
log,
userRepo,
availableNotifiers,
primaryNotifier.Type(),
cfg.FailureThreshold,
notificationRepo,
)
if err != nil {
return nil, fmt.Errorf("创建故障转移通知服务失败: %w", err)
}
log.Infof("通知服务初始化成功,首选渠道: %s, 故障阈值: %d", primaryNotifier.Type(), cfg.FailureThreshold)
return notifyService, nil
}
// initStorage 封装了数据库的初始化、连接和迁移逻辑。
func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Storage, error) {
// 创建存储实例
storage := database.NewStorage(cfg, logger)
if err := storage.Connect(); err != nil {
// 错误已在 Connect 内部被记录,这里只需包装并返回
return nil, fmt.Errorf("数据库连接失败: %w", err)
}
// 执行数据库迁移
if err := storage.Migrate(models.GetAllModels()...); err != nil {
return nil, fmt.Errorf("数据库迁移失败: %w", err)
}
logger.Info("数据库初始化完成。")
return storage, nil
}

View File

@@ -0,0 +1,6 @@
package collection
type Collector interface {
Start()
Stop()
}

View File

@@ -0,0 +1,89 @@
package collection
import (
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
)
// TimedCollector 实现了 Collector 接口,用于定时从数据库获取设备信息并下发采集指令
type TimedCollector struct {
deviceRepo repository.DeviceRepository
deviceService device.Service
logger *logs.Logger
interval time.Duration
ticker *time.Ticker
done chan bool
}
// NewTimedCollector 创建一个定时采集器实例
func NewTimedCollector(
deviceRepo repository.DeviceRepository,
deviceService device.Service,
logger *logs.Logger,
interval time.Duration,
) Collector {
return &TimedCollector{
deviceRepo: deviceRepo,
deviceService: deviceService,
logger: logger,
interval: interval,
done: make(chan bool),
}
}
// Start 开始定时采集
func (c *TimedCollector) Start() {
c.logger.Infof("定时采集器启动,采集间隔: %s", c.interval)
c.ticker = time.NewTicker(c.interval)
go func() {
for {
select {
case <-c.done:
return
case <-c.ticker.C:
c.collect()
}
}
}()
}
// Stop 停止定时采集
func (c *TimedCollector) Stop() {
c.logger.Info("定时采集器停止")
c.ticker.Stop()
c.done <- true
}
// collect 是核心的采集逻辑
func (c *TimedCollector) collect() {
c.logger.Info("开始新一轮的设备数据采集")
sensors, err := c.deviceRepo.ListAllSensors()
if err != nil {
c.logger.Errorf("采集周期: 从数据库获取所有传感器失败: %v", err)
return
}
if len(sensors) == 0 {
c.logger.Info("采集周期: 未发现任何传感器设备,跳过本次采集")
return
}
sensorsByController := make(map[uint][]*models.Device)
for _, sensor := range sensors {
sensorsByController[sensor.AreaControllerID] = append(sensorsByController[sensor.AreaControllerID], sensor)
}
for controllerID, controllerSensors := range sensorsByController {
c.logger.Infof("采集周期: 准备为区域主控 %d 下的 %d 个传感器下发采集指令", controllerID, len(controllerSensors))
if err := c.deviceService.Collect(controllerID, controllerSensors); err != nil {
c.logger.Errorf("采集周期: 为区域主控 %d 下发采集指令失败: %v", controllerID, err)
}
}
c.logger.Info("本轮设备数据采集完成")
}

View File

@@ -0,0 +1,292 @@
package notify
import (
"fmt"
"strings"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/notify"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"go.uber.org/zap"
)
// Service 定义了通知领域的核心业务逻辑接口
type Service interface {
// SendBatchAlarm 向一批用户发送告警通知。它会并发地为每个用户执行带故障转移的发送逻辑。
SendBatchAlarm(userIDs []uint, content notify.AlarmContent) error
// BroadcastAlarm 向所有用户发送告警通知。它会并发地为每个用户执行带故障转移的发送逻辑。
BroadcastAlarm(content notify.AlarmContent) error
// SendTestMessage 向指定用户发送一条测试消息,用于手动验证特定通知渠道的配置。
SendTestMessage(userID uint, notifierType notify.NotifierType) error
}
// failoverService 是 Service 接口的实现,提供了故障转移功能
type failoverService struct {
log *logs.Logger
userRepo repository.UserRepository
notifiers map[notify.NotifierType]notify.Notifier
primaryNotifier notify.Notifier
failureThreshold int
failureCounters *sync.Map // 使用 sync.Map 来安全地并发读写失败计数, key: userID (uint), value: counter (int)
notificationRepo repository.NotificationRepository
}
// NewFailoverService 创建一个新的故障转移通知服务
func NewFailoverService(
log *logs.Logger,
userRepo repository.UserRepository,
notifiers []notify.Notifier,
primaryNotifierType notify.NotifierType,
failureThreshold int,
notificationRepo repository.NotificationRepository,
) (Service, error) {
notifierMap := make(map[notify.NotifierType]notify.Notifier)
for _, n := range notifiers {
notifierMap[n.Type()] = n
}
primaryNotifier, ok := notifierMap[primaryNotifierType]
if !ok {
return nil, fmt.Errorf("首选通知器类型 '%s' 在提供的通知器列表中不存在", primaryNotifierType)
}
return &failoverService{
log: log,
userRepo: userRepo,
notifiers: notifierMap,
primaryNotifier: primaryNotifier,
failureThreshold: failureThreshold,
failureCounters: &sync.Map{},
notificationRepo: notificationRepo,
}, nil
}
// SendBatchAlarm 实现了向多个用户并发发送告警的功能
func (s *failoverService) SendBatchAlarm(userIDs []uint, content notify.AlarmContent) error {
var wg sync.WaitGroup
var mu sync.Mutex
var allErrors []string
s.log.Infow("开始批量发送告警...", "userCount", len(userIDs))
for _, userID := range userIDs {
wg.Add(1)
go func(id uint) {
defer wg.Done()
if err := s.sendAlarmToUser(id, content); err != nil {
mu.Lock()
allErrors = append(allErrors, fmt.Sprintf("发送失败 (用户ID: %d): %v", id, err))
mu.Unlock()
}
}(userID)
}
wg.Wait()
if len(allErrors) > 0 {
finalError := fmt.Errorf("批量告警发送完成,但有 %d 个用户发送失败:\n%s", len(allErrors), strings.Join(allErrors, "\n"))
s.log.Error(finalError.Error())
return finalError
}
s.log.Info("批量发送告警成功完成,所有用户均已通知。")
return nil
}
// BroadcastAlarm 实现了向所有用户发送告警的功能
func (s *failoverService) BroadcastAlarm(content notify.AlarmContent) error {
users, err := s.userRepo.FindAll()
if err != nil {
s.log.Errorw("广播告警失败:查找所有用户时出错", "error", err)
return fmt.Errorf("广播告警失败:查找所有用户时出错: %w", err)
}
var userIDs []uint
for _, user := range users {
userIDs = append(userIDs, user.ID)
}
s.log.Infow("开始广播告警给所有用户", "totalUsers", len(userIDs))
// 复用 SendBatchAlarm 的逻辑进行并发发送和错误处理
return s.SendBatchAlarm(userIDs, content)
}
// sendAlarmToUser 是为单个用户发送告警的内部方法,包含了完整的故障转移逻辑
func (s *failoverService) sendAlarmToUser(userID uint, content notify.AlarmContent) error {
user, err := s.userRepo.FindByID(userID)
if err != nil {
s.log.Errorw("发送告警失败:查找用户时出错", "userID", userID, "error", err)
return fmt.Errorf("查找用户失败: %w", err)
}
counter, _ := s.failureCounters.LoadOrStore(userID, 0)
failureCount := counter.(int)
if failureCount < s.failureThreshold {
primaryType := s.primaryNotifier.Type()
addr := getAddressForNotifier(primaryType, user.Contact)
if addr == "" {
// 记录跳过通知
s.recordNotificationAttempt(userID, primaryType, content, "", models.NotificationStatusSkipped, fmt.Errorf("用户未配置首选通知方式 '%s' 的地址", primaryType))
return fmt.Errorf("用户未配置首选通知方式 '%s' 的地址", primaryType)
}
err = s.primaryNotifier.Send(content, addr)
if err == nil {
// 记录成功通知
s.recordNotificationAttempt(userID, primaryType, content, addr, models.NotificationStatusSuccess, nil)
if failureCount > 0 {
s.log.Infow("首选渠道发送恢复正常", "userID", userID, "notifierType", primaryType)
s.failureCounters.Store(userID, 0)
}
return nil
}
// 记录失败通知
s.recordNotificationAttempt(userID, primaryType, content, addr, models.NotificationStatusFailed, err)
newFailureCount := failureCount + 1
s.failureCounters.Store(userID, newFailureCount)
s.log.Warnw("首选渠道发送失败", "userID", userID, "notifierType", primaryType, "error", err, "failureCount", newFailureCount)
failureCount = newFailureCount
}
if failureCount >= s.failureThreshold {
s.log.Warnw("故障转移阈值已达到,开始广播通知", "userID", userID, "threshold", s.failureThreshold)
var lastErr error
for _, notifier := range s.notifiers {
addr := getAddressForNotifier(notifier.Type(), user.Contact)
if addr == "" {
// 记录跳过通知
s.recordNotificationAttempt(userID, notifier.Type(), content, "", models.NotificationStatusSkipped, fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifier.Type()))
continue
}
if err := notifier.Send(content, addr); err == nil {
// 记录成功通知
s.recordNotificationAttempt(userID, notifier.Type(), content, addr, models.NotificationStatusSuccess, nil)
s.log.Infow("广播通知成功", "userID", userID, "notifierType", notifier.Type())
s.failureCounters.Store(userID, 0)
return nil
}
// 记录失败通知
s.recordNotificationAttempt(userID, notifier.Type(), content, addr, models.NotificationStatusFailed, err)
lastErr = err
s.log.Warnw("广播通知:渠道发送失败", "userID", userID, "notifierType", notifier.Type(), "error", err)
}
return fmt.Errorf("所有渠道均发送失败,最后一个错误: %w", lastErr)
}
return nil
}
// SendTestMessage 实现了手动发送测试消息的功能
func (s *failoverService) SendTestMessage(userID uint, notifierType notify.NotifierType) error {
user, err := s.userRepo.FindByID(userID)
if err != nil {
s.log.Errorw("发送测试消息失败:查找用户时出错", "userID", userID, "error", err)
return fmt.Errorf("查找用户失败: %w", err)
}
notifier, ok := s.notifiers[notifierType]
if !ok {
s.log.Errorw("发送测试消息失败:通知器类型不存在", "userID", userID, "notifierType", notifierType)
return fmt.Errorf("指定的通知器类型 '%s' 不存在", notifierType)
}
addr := getAddressForNotifier(notifierType, user.Contact)
if addr == "" {
s.log.Warnw("发送测试消息失败:缺少地址", "userID", userID, "notifierType", notifierType)
// 记录跳过通知
s.recordNotificationAttempt(userID, notifierType, notify.AlarmContent{
Title: "通知服务测试",
Message: fmt.Sprintf("这是一条来自【%s】渠道的测试消息。如果您收到此消息说明您的配置正确。", notifierType),
Level: zap.InfoLevel,
Timestamp: time.Now(),
}, "", models.NotificationStatusFailed, fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifierType))
return fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifierType)
}
testContent := notify.AlarmContent{
Title: "通知服务测试",
Message: fmt.Sprintf("这是一条来自【%s】渠道的测试消息。如果您收到此消息说明您的配置正确。", notifierType),
Level: zap.InfoLevel,
Timestamp: time.Now(),
}
s.log.Infow("正在发送测试消息...", "userID", userID, "notifierType", notifierType, "address", addr)
err = notifier.Send(testContent, addr)
if err != nil {
s.log.Errorw("发送测试消息失败", "userID", userID, "notifierType", notifierType, "error", err)
// 记录失败通知
s.recordNotificationAttempt(userID, notifierType, testContent, addr, models.NotificationStatusFailed, err)
return err
}
s.log.Infow("发送测试消息成功", "userID", userID, "notifierType", notifierType)
// 记录成功通知
s.recordNotificationAttempt(userID, notifierType, testContent, addr, models.NotificationStatusSuccess, nil)
return nil
}
// getAddressForNotifier 是一个辅助函数,根据通知器类型从 ContactInfo 中获取对应的地址
func getAddressForNotifier(notifierType notify.NotifierType, contact models.ContactInfo) string {
switch notifierType {
case notify.NotifierTypeSMTP:
return contact.Email
case notify.NotifierTypeWeChat:
return contact.WeChat
case notify.NotifierTypeLark:
return contact.Feishu
case notify.NotifierTypeLog:
return "log" // LogNotifier不需要具体的地址但为了函数签名一致性返回一个无意义的非空字符串以绕过配置存在检查
default:
return ""
}
}
// recordNotificationAttempt 记录一次通知发送尝试的结果
// userID: 接收通知的用户ID
// notifierType: 使用的通知器类型
// content: 通知内容
// toAddress: 实际发送到的地址
// status: 发送尝试的状态 (成功、失败、跳过)
// err: 如果发送失败,记录的错误信息
func (s *failoverService) recordNotificationAttempt(
userID uint,
notifierType notify.NotifierType,
content notify.AlarmContent,
toAddress string,
status models.NotificationStatus,
err error,
) {
errorMessage := ""
if err != nil {
errorMessage = err.Error()
}
notification := &models.Notification{
NotifierType: notifierType,
UserID: userID,
Title: content.Title,
Message: content.Message,
Level: models.LogLevel(content.Level),
AlarmTimestamp: content.Timestamp,
ToAddress: toAddress,
Status: status,
ErrorMessage: errorMessage,
}
if saveErr := s.notificationRepo.Create(notification); saveErr != nil {
s.log.Errorw("无法保存通知发送记录到数据库",
"userID", userID,
"notifierType", notifierType,
"status", status,
"originalError", errorMessage,
"saveError", saveErr,
)
}
}

View File

@@ -13,19 +13,19 @@ type Claims struct {
jwt.RegisteredClaims
}
// TokenService 定义了 token 操作的接口
type TokenService interface {
// Service 定义了 token 操作的接口
type Service interface {
GenerateToken(userID uint) (string, error)
ParseToken(tokenString string) (*Claims, error)
}
// tokenService 是 TokenService 接口的实现
// tokenService 是 Service 接口的实现
type tokenService struct {
secret []byte
}
// NewTokenService 创建并返回一个新的 TokenService 实例
func NewTokenService(secret []byte) TokenService {
// NewTokenService 创建并返回一个新的 Service 实例
func NewTokenService(secret []byte) Service {
return &tokenService{secret: secret}
}

View File

@@ -41,6 +41,12 @@ type Config struct {
// LoraMesh LoraMesh配置
LoraMesh LoraMeshConfig `yaml:"lora_mesh"`
// Notify 通知服务配置
Notify NotifyConfig `yaml:"notify"`
// Collection 定时采集配置
Collection CollectionConfig `yaml:"collection"`
}
// AppConfig 代表应用基础配置
@@ -158,6 +164,45 @@ type LoraMeshConfig struct {
ReassemblyTimeout int `yaml:"reassembly_timeout"`
}
// NotifyConfig 包含了所有与通知服务相关的配置
type NotifyConfig struct {
Primary string `yaml:"primary"` // 首选通知渠道 (e.g., "邮件", "企业微信", "飞书", "日志")
FailureThreshold int `yaml:"failureThreshold"` // 连续失败多少次后触发广播模式
SMTP SMTPConfig `yaml:"smtp"`
WeChat WeChatConfig `yaml:"wechat"`
Lark LarkConfig `yaml:"lark"`
}
// SMTPConfig SMTP邮件配置
type SMTPConfig struct {
Enabled bool `yaml:"enabled"`
Host string `yaml:"host"`
Port int `yaml:"port"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Sender string `yaml:"sender"`
}
// WeChatConfig 企业微信应用配置
type WeChatConfig struct {
Enabled bool `yaml:"enabled"`
CorpID string `yaml:"corpID"`
AgentID string `yaml:"agentID"`
Secret string `yaml:"secret"`
}
// LarkConfig 飞书应用配置
type LarkConfig struct {
Enabled bool `yaml:"enabled"`
AppID string `yaml:"appID"`
AppSecret string `yaml:"appSecret"`
}
// CollectionConfig 代表定时采集配置
type CollectionConfig struct {
Interval int `yaml:"interval"`
}
// NewConfig 创建并返回一个新的配置实例
func NewConfig() *Config {
// 默认值可以在这里设置,但我们优先使用配置文件中的值

View File

@@ -171,6 +171,7 @@ func (ps *PostgresStorage) creatingHyperTable() error {
{models.PigSickLog{}, "happened_at"},
{models.PigPurchase{}, "purchase_date"},
{models.PigSale{}, "sale_date"},
{models.Notification{}, "alarm_timestamp"},
}
for _, table := range tablesToConvert {
@@ -211,6 +212,7 @@ func (ps *PostgresStorage) applyCompressionPolicies() error {
{models.PigSickLog{}, "pig_batch_id"},
{models.PigPurchase{}, "pig_batch_id"},
{models.PigSale{}, "pig_batch_id"},
{models.Notification{}, "user_id"},
}
for _, policy := range policies {

View File

@@ -59,6 +59,9 @@ func GetAllModels() []interface{} {
// Medication Models
&Medication{},
&MedicationLog{},
// Notification Models
&Notification{},
}
}

View File

@@ -0,0 +1,77 @@
package models
import (
"database/sql/driver"
"errors"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/notify"
"go.uber.org/zap/zapcore"
"gorm.io/gorm"
)
// NotificationStatus 定义了通知发送尝试的状态枚举。
type NotificationStatus string
const (
NotificationStatusSuccess NotificationStatus = "发送成功" // 通知已成功发送
NotificationStatusFailed NotificationStatus = "发送失败" // 通知发送失败
NotificationStatusSkipped NotificationStatus = "已跳过" // 通知因某些原因被跳过(例如:用户未配置联系方式)
)
// LogLevel is a custom type for zapcore.Level to handle database scanning and valuing.
type LogLevel zapcore.Level
// Scan implements the sql.Scanner interface.
func (l *LogLevel) Scan(value interface{}) error {
var s string
switch v := value.(type) {
case []byte:
s = string(v)
case string:
s = v
default:
return errors.New("LogLevel的类型无效")
}
var zl zapcore.Level
if err := zl.UnmarshalText([]byte(s)); err != nil {
return err
}
*l = LogLevel(zl)
return nil
}
// Value implements the driver.Valuer interface.
func (l LogLevel) Value() (driver.Value, error) {
return (zapcore.Level)(l).String(), nil
}
// Notification 表示已发送或尝试发送的通知记录。
type Notification struct {
gorm.Model
// NotifierType 通知器类型 (例如:"邮件", "企业微信", "飞书", "日志")
NotifierType notify.NotifierType `gorm:"type:varchar(20);not null;index" json:"notifier_type"`
// UserID 接收通知的用户ID用于追溯通知记录到特定用户
UserID uint `gorm:"index" json:"user_id"` // 增加 UserID 字段,并添加索引
// Title 通知标题
Title string `gorm:"type:varchar(255);not null" json:"title"`
// Message 通知内容
Message string `gorm:"type:text;not null" json:"message"`
// Level 通知级别 (例如INFO, WARN, ERROR)
Level LogLevel `gorm:"type:varchar(10);not null" json:"level"`
// AlarmTimestamp 通知内容生成时的时间戳,与 ID 构成复合主键
AlarmTimestamp time.Time `gorm:"primaryKey;not null" json:"alarm_timestamp"`
// ToAddress 接收地址 (例如:邮箱地址, 企业微信ID, 日志标识符)
ToAddress string `gorm:"type:varchar(255);not null" json:"to_address"`
// Status 通知发送尝试的状态 (例如:"待发送", "发送成功", "发送失败", "已跳过")
Status NotificationStatus `gorm:"type:varchar(20);not null;default:'待发送'" json:"status"`
// ErrorMessage 如果通知发送失败,此字段存储错误信息
ErrorMessage string `gorm:"type:text" json:"error_message"`
}
// TableName 指定 Notification 模型的表名。
func (Notification) TableName() string {
return "notifications"
}

View File

@@ -0,0 +1,193 @@
package notify
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
)
const (
// 飞书获取 tenant_access_token 的 API 地址
larkGetTokenURL = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
// 飞书发送消息的 API 地址
larkSendMessageURL = "https://open.feishu.cn/open-apis/im/v1/messages"
)
// larkNotifier 实现了 Notifier 接口,用于通过飞书自建应用发送私聊消息。
type larkNotifier struct {
appID string // 应用 ID
appSecret string // 应用密钥
// 用于线程安全地管理 tenant_access_token
mu sync.Mutex
accessToken string
tokenExpiresAt time.Time
}
// NewLarkNotifier 创建一个新的 larkNotifier 实例。
// 调用者需要注入飞书应用的 AppID 和 AppSecret。
func NewLarkNotifier(appID, appSecret string) Notifier {
return &larkNotifier{
appID: appID,
appSecret: appSecret,
}
}
// Send 向指定用户发送一条飞书消息卡片。
// toAddr 参数是接收者的邮箱地址。
func (l *larkNotifier) Send(content AlarmContent, toAddr string) error {
// 1. 获取有效的 tenant_access_token
token, err := l.getAccessToken()
if err != nil {
return err
}
// 2. 构建消息卡片 JSON
// 飞书消息卡片结构复杂,这里构建一个简单的 Markdown 文本卡片
cardContent := map[string]interface{}{
"config": map[string]bool{
"wide_screen_mode": true,
},
"elements": []map[string]interface{}{
{
"tag": "div",
"text": map[string]string{
"tag": "lark_md",
"content": fmt.Sprintf("## %s\n**级别**: %s\n**时间**: %s\n\n%s",
content.Title,
content.Level.String(),
content.Timestamp.Format(DefaultTimeFormat),
content.Message,
),
},
},
},
}
cardJSON, err := json.Marshal(cardContent)
if err != nil {
return fmt.Errorf("序列化飞书卡片内容失败: %w", err)
}
// 3. 构建请求的 JSON Body
payload := larkMessagePayload{
ReceiveID: toAddr,
ReceiveIDType: "email", // 指定接收者类型为邮箱
MsgType: "interactive", // 消息卡片类型
Content: string(cardJSON),
}
jsonBytes, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("序列化飞书消息失败: %w", err)
}
// 4. 发送 HTTP POST 请求
url := fmt.Sprintf("%s?receive_id_type=email", larkSendMessageURL) // 在 URL 中指定 receive_id_type
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes))
if err != nil {
return fmt.Errorf("创建飞书请求失败: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+token) // 携带 access_token
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("发送飞书通知失败: %w", err)
}
defer resp.Body.Close()
// 5. 检查响应
var response larkResponse
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
return fmt.Errorf("解析飞书响应失败: %w", err)
}
if response.Code != 0 {
return fmt.Errorf("飞书API返回错误: code=%d, msg=%s", response.Code, response.Msg)
}
return nil
}
// getAccessToken 获取并缓存 tenant_access_token处理了线程安全和自动刷新。
func (l *larkNotifier) getAccessToken() (string, error) {
l.mu.Lock()
defer l.mu.Unlock()
// 如果 token 存在且有效期还有5分钟以上则直接返回缓存的 token
if l.accessToken != "" && time.Now().Before(l.tokenExpiresAt.Add(-5*time.Minute)) {
return l.accessToken, nil
}
// 否则,重新获取 token
payload := map[string]string{
"app_id": l.appID,
"app_secret": l.appSecret,
}
jsonBytes, err := json.Marshal(payload)
if err != nil {
return "", fmt.Errorf("序列化获取 token 请求体失败: %w", err)
}
req, err := http.NewRequest("POST", larkGetTokenURL, bytes.NewBuffer(jsonBytes))
if err != nil {
return "", fmt.Errorf("创建获取 token 请求失败: %w", err)
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("获取 tenant_access_token 请求失败: %w", err)
}
defer resp.Body.Close()
var tokenResp larkTokenResponse
if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
return "", fmt.Errorf("解析 tenant_access_token 响应失败: %w", err)
}
if tokenResp.Code != 0 {
return "", fmt.Errorf("获取 tenant_access_token API 返回错误: code=%d, msg=%s", tokenResp.Code, tokenResp.Msg)
}
// 缓存新的 token 和过期时间
l.accessToken = tokenResp.TenantAccessToken
l.tokenExpiresAt = time.Now().Add(time.Duration(tokenResp.Expire) * time.Second)
return l.accessToken, nil
}
// Type 返回通知器的类型
func (l *larkNotifier) Type() NotifierType {
return NotifierTypeLark
}
// --- API 数据结构 ---
// larkTokenResponse 是获取 tenant_access_token API 的响应结构体
type larkTokenResponse struct {
Code int `json:"code"`
Msg string `json:"msg"`
TenantAccessToken string `json:"tenant_access_token"`
Expire int `json:"expire"` // 有效期,单位秒
}
// larkMessagePayload 是发送消息 API 的请求体结构
type larkMessagePayload struct {
ReceiveID string `json:"receive_id"`
ReceiveIDType string `json:"receive_id_type"`
MsgType string `json:"msg_type"`
Content string `json:"content"` // 对于 interactive 消息,这里是卡片的 JSON 字符串
}
// larkResponse 是飞书 API 的通用响应结构体
type larkResponse struct {
Code int `json:"code"`
Msg string `json:"msg"`
}

View File

@@ -0,0 +1,37 @@
package notify
import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
)
// logNotifier 实现了 Notifier 接口,用于将告警信息记录到日志中。
type logNotifier struct {
logger *logs.Logger
}
// NewLogNotifier 创建一个新的 logNotifier 实例。
// 它接收一个日志记录器,用于实际的日志输出。
func NewLogNotifier(logger *logs.Logger) Notifier {
return &logNotifier{
logger: logger,
}
}
// Send 将告警内容以结构化的方式记录到日志中。
// toAddr 参数在这里表示告警的预期接收者地址,也会被记录。
func (l *logNotifier) Send(content AlarmContent, toAddr string) error {
l.logger.Infow("告警已记录到日志",
"notifierType", NotifierTypeLog,
"title", content.Title,
"message", content.Message,
"level", content.Level.String(),
"timestamp", content.Timestamp.Format(DefaultTimeFormat),
"toAddr", toAddr,
)
return nil // 记录日志操作本身不应失败
}
// Type 返回通知器的类型。
func (l *logNotifier) Type() NotifierType {
return NotifierTypeLog
}

View File

@@ -0,0 +1,44 @@
package notify
import (
"time"
"go.uber.org/zap/zapcore"
)
// DefaultTimeFormat 定义了所有通知中统一使用的时间格式。
const DefaultTimeFormat = "2006-01-02 15:04:05"
// NotifierType 定义了通知器的类型。
type NotifierType string
const (
// NotifierTypeSMTP 表示 SMTP 邮件通知器。
NotifierTypeSMTP NotifierType = "邮件"
// NotifierTypeWeChat 表示企业微信通知器。
NotifierTypeWeChat NotifierType = "企业微信"
// NotifierTypeLark 表示飞书通知器。
NotifierTypeLark NotifierType = "飞书"
// NotifierTypeLog 表示日志通知器,作为最终的告警记录渠道。
NotifierTypeLog NotifierType = "日志"
)
// AlarmContent 定义了通知的内容
type AlarmContent struct {
// 通知标题
Title string
// 通知信息
Message string
// 通知级别
Level zapcore.Level
// 通知时间
Timestamp time.Time
}
// Notifier 定义了通知发送器的接口
type Notifier interface {
// Send 发送通知
Send(content AlarmContent, toAddr string) error
// Type 返回通知器的类型
Type() NotifierType
}

View File

@@ -0,0 +1,73 @@
package notify
import (
"fmt"
"net/smtp"
"strings"
)
// smtpNotifier 实现了 Notifier 接口,用于通过 SMTP 发送邮件通知。
type smtpNotifier struct {
host string // SMTP 服务器地址
port int // SMTP 服务器端口
username string // 发件人邮箱地址
password string // 发件人邮箱授权码
sender string // 发件人名称或地址,显示在邮件的 \"From\" 字段
}
// NewSMTPNotifier 创建一个新的 smtpNotifier 实例。
// 调用者需要注入 SMTP 相关的配置。
func NewSMTPNotifier(host string, port int, username, password, sender string) Notifier {
return &smtpNotifier{
host: host,
port: port,
username: username,
password: password,
sender: sender,
}
}
// Send 使用 net/smtp 包发送一封邮件。
func (s *smtpNotifier) Send(content AlarmContent, toAddr string) error {
// 1. 设置认证信息
auth := smtp.PlainAuth("", s.username, s.password, s.host)
// 2. 构建邮件内容
// 邮件头
subject := content.Title
contentType := "Content-Type: text/plain; charset=UTF-8"
fromHeader := fmt.Sprintf("From: %s", s.sender)
toHeader := fmt.Sprintf("To: %s", toAddr)
subjectHeader := fmt.Sprintf("Subject: %s", subject)
// 邮件正文
body := fmt.Sprintf("级别: %s\n时间: %s\n\n%s",
content.Level.String(),
content.Timestamp.Format(DefaultTimeFormat),
content.Message,
)
// 拼接完整的邮件报文
msg := strings.Join([]string{
fromHeader,
toHeader,
subjectHeader,
contentType,
"", // 邮件头和正文之间的空行
body,
}, "\r\n")
// 3. 发送邮件
addr := fmt.Sprintf("%s:%d", s.host, s.port)
err := smtp.SendMail(addr, auth, s.username, []string{toAddr}, []byte(msg))
if err != nil {
return fmt.Errorf("发送邮件失败: %w", err)
}
return nil
}
// Type 返回通知器的类型
func (s *smtpNotifier) Type() NotifierType {
return NotifierTypeSMTP
}

View File

@@ -0,0 +1,169 @@
package notify
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"
)
const (
// 获取 access_token 的 API 地址
getTokenURL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
// 发送应用消息的 API 地址
sendMessageURL = "https://qyapi.weixin.qq.com/cgi-bin/message/send"
)
// wechatNotifier 实现了 Notifier 接口,用于通过企业微信自建应用发送私聊消息。
type wechatNotifier struct {
corpID string // 企业ID (CorpID)
agentID string // 应用ID (AgentID)
secret string // 应用密钥 (Secret)
// 用于线程安全地管理 access_token
mu sync.Mutex
accessToken string
tokenExpiresAt time.Time
}
// NewWechatNotifier 创建一个新的 wechatNotifier 实例。
// 调用者需要注入企业微信应用的 CorpID, AgentID 和 Secret。
func NewWechatNotifier(corpID, agentID, secret string) Notifier {
return &wechatNotifier{
corpID: corpID,
agentID: agentID,
secret: secret,
}
}
// Send 向指定用户发送一条 markdown 格式的私聊消息。
// toAddr 参数是接收者的 UserID 列表,用逗号或竖线分隔。
func (w *wechatNotifier) Send(content AlarmContent, toAddr string) error {
// 1. 获取有效的 access_token
token, err := w.getAccessToken()
if err != nil {
return err
}
// 2. 构建 markdown 内容
markdownContent := fmt.Sprintf("## %s\n> 级别: <font color=\"warning\">%s</font>\n> 时间: %s\n\n%s",
content.Title,
content.Level.String(),
content.Timestamp.Format(DefaultTimeFormat),
content.Message,
)
// 3. 构建请求的 JSON Body
// 将逗号分隔的 toAddr 转换为竖线分隔,以符合 API 要求
userList := strings.ReplaceAll(toAddr, ",", "|")
payload := wechatMessagePayload{
ToUser: userList,
MsgType: "markdown",
AgentID: w.agentID,
Markdown: struct {
Content string `json:"content"`
}{
Content: markdownContent,
},
}
jsonBytes, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("序列化企业微信消息失败: %w", err)
}
// 4. 发送 HTTP POST 请求
url := fmt.Sprintf("%s?access_token=%s", sendMessageURL, token)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes))
if err != nil {
return fmt.Errorf("创建企业微信请求失败: %w", err)
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("发送企业微信通知失败: %w", err)
}
defer resp.Body.Close()
// 5. 检查响应
var response wechatResponse
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
return fmt.Errorf("解析企业微信响应失败: %w", err)
}
if response.ErrCode != 0 {
return fmt.Errorf("企业微信API返回错误: code=%d, msg=%s", response.ErrCode, response.ErrMsg)
}
return nil
}
// getAccessToken 获取并缓存 access_token处理了线程安全和自动刷新。
func (w *wechatNotifier) getAccessToken() (string, error) {
w.mu.Lock()
defer w.mu.Unlock()
// 如果 token 存在且有效期还有5分钟以上则直接返回缓存的 token
if w.accessToken != "" && time.Now().Before(w.tokenExpiresAt.Add(-5*time.Minute)) {
return w.accessToken, nil
}
// 否则,重新获取 token
url := fmt.Sprintf("%s?corpid=%s&corpsecret=%s", getTokenURL, w.corpID, w.secret)
resp, err := http.Get(url)
if err != nil {
return "", fmt.Errorf("获取 access_token 请求失败: %w", err)
}
defer resp.Body.Close()
var tokenResp tokenResponse
if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
return "", fmt.Errorf("解析 access_token 响应失败: %w", err)
}
if tokenResp.ErrCode != 0 {
return "", fmt.Errorf("获取 access_token API 返回错误: code=%d, msg=%s", tokenResp.ErrCode, tokenResp.ErrMsg)
}
// 缓存新的 token 和过期时间
w.accessToken = tokenResp.AccessToken
w.tokenExpiresAt = time.Now().Add(time.Duration(tokenResp.ExpiresIn) * time.Second)
return w.accessToken, nil
}
// Type 返回通知器的类型
func (w *wechatNotifier) Type() NotifierType {
return NotifierTypeWeChat
}
// --- API 数据结构 ---
// tokenResponse 是获取 access_token API 的响应结构体
type tokenResponse struct {
ErrCode int `json:"errcode"`
ErrMsg string `json:"errmsg"`
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
}
// wechatMessagePayload 是发送应用消息 API 的请求体结构
type wechatMessagePayload struct {
ToUser string `json:"touser"`
MsgType string `json:"msgtype"`
AgentID string `json:"agentid"`
Markdown struct {
Content string `json:"content"`
} `json:"markdown"`
}
// wechatResponse 是企业微信 API 的通用响应结构体
type wechatResponse struct {
ErrCode int `json:"errcode"`
ErrMsg string `json:"errmsg"`
}

View File

@@ -23,6 +23,9 @@ type DeviceRepository interface {
// ListAll 获取所有设备的列表
ListAll() ([]*models.Device, error)
// ListAllSensors 获取所有传感器类型的设备列表
ListAllSensors() ([]*models.Device, error)
// ListByAreaControllerID 根据区域主控 ID 列出所有子设备。
ListByAreaControllerID(areaControllerID uint) ([]*models.Device, error)
@@ -84,6 +87,19 @@ func (r *gormDeviceRepository) ListAll() ([]*models.Device, error) {
return devices, nil
}
// ListAllSensors 检索归类为传感器的所有设备
func (r *gormDeviceRepository) ListAllSensors() ([]*models.Device, error) {
var sensors []*models.Device
err := r.db.Preload("AreaController").Preload("DeviceTemplate").
Joins("JOIN device_templates ON device_templates.id = devices.device_template_id").
Where("device_templates.category = ?", models.CategorySensor).
Find(&sensors).Error
if err != nil {
return nil, fmt.Errorf("查询所有传感器失败: %w", err)
}
return sensors, nil
}
// ListByAreaControllerID 根据区域主控 ID 列出所有子设备
func (r *gormDeviceRepository) ListByAreaControllerID(areaControllerID uint) ([]*models.Device, error) {
var devices []*models.Device

View File

@@ -0,0 +1,111 @@
package repository
import (
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/notify"
"go.uber.org/zap/zapcore"
"gorm.io/gorm"
)
// NotificationListOptions 定义了查询通知列表时的可选参数
type NotificationListOptions struct {
UserID *uint // 按用户ID过滤
NotifierType *notify.NotifierType // 按通知器类型过滤
Status *models.NotificationStatus // 按通知状态过滤 (例如:"success", "failed")
Level *zapcore.Level // 按通知等级过滤 (例如:"info", "warning", "error")
StartTime *time.Time // 通知内容生成时间范围 - 开始时间 (对应 AlarmTimestamp)
EndTime *time.Time // 通知内容生成时间范围 - 结束时间 (对应 AlarmTimestamp)
OrderBy string // 排序字段,例如 "alarm_timestamp DESC"
}
// NotificationRepository 定义了与通知记录相关的数据库操作接口。
type NotificationRepository interface {
// Create 将一条新的通知记录插入数据库。
Create(notification *models.Notification) error
// CreateInTx 在给定的事务中插入一条新的通知记录。
CreateInTx(tx *gorm.DB, notification *models.Notification) error
// BatchCreate 批量插入多条通知记录。
BatchCreate(notifications []*models.Notification) error
// List 支持分页和过滤的通知列表查询。
// 返回通知列表、总记录数和错误。
List(opts NotificationListOptions, page, pageSize int) ([]models.Notification, int64, error)
}
// gormNotificationRepository 是 NotificationRepository 的 GORM 实现。
type gormNotificationRepository struct {
db *gorm.DB
}
// NewGormNotificationRepository 创建一个新的 NotificationRepository GORM 实现实例。
func NewGormNotificationRepository(db *gorm.DB) NotificationRepository {
return &gormNotificationRepository{db: db}
}
// Create 将一条新的通知记录插入数据库。
func (r *gormNotificationRepository) Create(notification *models.Notification) error {
return r.db.Create(notification).Error
}
// CreateInTx 在给定的事务中插入一条新的通知记录。
func (r *gormNotificationRepository) CreateInTx(tx *gorm.DB, notification *models.Notification) error {
return tx.Create(notification).Error
}
// BatchCreate 批量插入多条通知记录。
func (r *gormNotificationRepository) BatchCreate(notifications []*models.Notification) error {
// GORM 的 Create 方法在传入切片时会自动进行批量插入
return r.db.Create(&notifications).Error
}
// List 实现了分页和过滤查询通知记录的功能
func (r *gormNotificationRepository) List(opts NotificationListOptions, page, pageSize int) ([]models.Notification, int64, error) {
// --- 校验分页参数 ---
if page <= 0 || pageSize <= 0 {
return nil, 0, ErrInvalidPagination // 复用已定义的错误
}
var results []models.Notification
var total int64
query := r.db.Model(&models.Notification{})
// --- 应用过滤条件 ---
if opts.UserID != nil {
query = query.Where("user_id = ?", *opts.UserID)
}
if opts.NotifierType != nil {
query = query.Where("notifier_type = ?", *opts.NotifierType)
}
if opts.Status != nil {
query = query.Where("status = ?", *opts.Status)
}
if opts.Level != nil {
query = query.Where("level = ?", opts.Level.String())
}
if opts.StartTime != nil {
query = query.Where("alarm_timestamp >= ?", *opts.StartTime)
}
if opts.EndTime != nil {
query = query.Where("alarm_timestamp <= ?", *opts.EndTime)
}
// --- 计算总数 ---
if err := query.Count(&total).Error; err != nil {
return nil, 0, err
}
// --- 应用排序条件 ---
orderBy := "alarm_timestamp DESC" // 默认按时间倒序
if opts.OrderBy != "" {
orderBy = opts.OrderBy
}
query = query.Order(orderBy)
// --- 分页 ---
offset := (page - 1) * pageSize
err := query.Limit(pageSize).Offset(offset).Find(&results).Error
return results, total, err
}

View File

@@ -0,0 +1,6 @@
package repository
import "errors"
// ErrInvalidPagination 表示分页参数无效
var ErrInvalidPagination = errors.New("无效的分页参数page和pageSize必须为大于0")

View File

@@ -1,16 +1,12 @@
package repository
import (
"errors"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"gorm.io/gorm"
)
// ErrInvalidPagination 表示分页参数无效
var ErrInvalidPagination = errors.New("无效的分页参数page和pageSize必须为大于0")
// SensorDataListOptions 定义了查询传感器数据列表时的可选参数
type SensorDataListOptions struct {
DeviceID *uint

View File

@@ -13,6 +13,7 @@ type UserRepository interface {
FindByUsername(username string) (*models.User, error)
FindByID(id uint) (*models.User, error)
FindUserForLogin(identifier string) (*models.User, error)
FindAll() ([]*models.User, error)
}
// gormUserRepository 是 UserRepository 的 GORM 实现
@@ -64,3 +65,12 @@ func (r *gormUserRepository) FindByID(id uint) (*models.User, error) {
}
return &user, nil
}
// FindAll 返回数据库中的所有用户
func (r *gormUserRepository) FindAll() ([]*models.User, error) {
var users []*models.User
if err := r.db.Where("1 = 1").Find(&users).Error; err != nil {
return nil, err
}
return users, nil
}