diff --git a/config.example.yml b/config.example.yml new file mode 100644 index 0000000..455fd53 --- /dev/null +++ b/config.example.yml @@ -0,0 +1,93 @@ +# 应用基础配置 +app: + name: "PigFarmController" # 应用名称 + version: "1.0.0" # 应用版本 + jwt_secret: "your_jwt_secret_key_here" # JWT 签名密钥,请务必修改为强密码 + +# 服务器配置 +server: + port: 8080 # 服务器监听端口 + mode: "debug" # 运行模式: debug, release, test + +# 日志配置 +log: + level: "info" # 日志级别: debug, info, warn, error, dpanic, panic, fatal + format: "console" # 日志输出格式: console, json + enable_file: true # 是否同时输出到文件 + file_path: "app_logs/pig_farm_controller.log" # 日志文件路径 + max_size: 100 # 单个日志文件最大大小 (MB) + max_backups: 7 # 最多保留的旧日志文件数量 + max_age: 7 # 最多保留的旧日志文件天数 + compress: true # 是否压缩旧日志文件 + +# 数据库配置 +database: + host: "localhost" # 数据库主机地址 + port: 5432 # 数据库端口 + username: "postgres" # 数据库用户名 + password: "your_db_password" # 数据库密码 + dbname: "pig_farm_controller_db" # 数据库名称 + sslmode: "disable" # SSL模式: disable, require, verify-ca, verify-full + is_timescaledb: false # 是否为 TimescaleDB + max_open_conns: 100 # 最大开放连接数 + max_idle_conns: 10 # 最大空闲连接数 + conn_max_lifetime: 300 # 连接最大生命周期 (秒) + +# WebSocket配置 +websocket: + timeout: 60 # WebSocket请求超时时间 (秒) + heartbeat_interval: 30 # 心跳检测间隔 (秒) + +# 心跳配置 +heartbeat: + interval: 10 # 心跳间隔 (秒) + concurrency: 5 # 请求并发数 + +# ChirpStack API 配置 +chirp_stack: + api_host: "http://localhost:8080" # ChirpStack API 主机地址 + api_token: "your_chirpstack_api_token" # ChirpStack API Token + fport: 10 # ChirpStack FPort + api_timeout: 5 # API 请求超时时间 (秒) + collection_request_timeout: 10 # 采集请求超时时间 (秒) + +# 任务调度配置 +task: + interval: 5 # 任务调度间隔 (秒) + num_workers: 5 # 任务执行器并发工作数量 + +# Lora 配置 +lora: + mode: "lora_mesh" # Lora 运行模式: lora_wan, lora_mesh + +# Lora Mesh 配置 +lora_mesh: + uart_port: "/dev/ttyUSB0" # UART 串口路径 + baud_rate: 115200 # 波特率 + timeout: 5 # 超时时间 (秒) + lora_mesh_mode: "transparent" # Lora Mesh 模式: transparent, command + max_chunk_size: 200 # 最大数据块大小 + reassembly_timeout: 10 # 重组超时时间 (秒) + +# 通知服务配置 +notify: + primary: "日志" # 首选通知渠道: "邮件", "企业微信", "飞书", "日志" (如果其他渠道未启用,"日志" 会自动成为首选) + failureThreshold: 2 # 连续失败多少次后触发广播模式 + smtp: + enabled: false # 是否启用 SMTP 邮件通知 + host: "smtp.example.com" # SMTP 服务器地址 + port: 587 # SMTP 服务器端口 + username: "your_email@example.com" # 发件人邮箱地址 + password: "your_email_password" # 发件人邮箱授权码或密码 + sender: "PigFarm Alarm " # 发件人名称和地址 + + wechat: + enabled: false # 是否启用企业微信通知 + corpID: "wwxxxxxxxxxxxx" # 企业ID (CorpID) + agentID: "1000001" # 应用ID (AgentID) + secret: "your_wechat_app_secret" # 应用密钥 (Secret) + + lark: + enabled: false # 是否启用飞书通知 + appID: "cli_xxxxxxxxxx" # 应用 ID + appSecret: "your_lark_app_secret" # 应用密钥 diff --git a/docs/docs.go b/docs/docs.go index 460adbc..e31d51e 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -975,6 +975,149 @@ const docTemplate = `{ } } }, + "/api/v1/monitor/notifications": { + "get": { + "security": [ + { + "BearerAuth": [] + } + ], + "description": "根据提供的过滤条件,分页获取通知列表", + "produces": [ + "application/json" + ], + "tags": [ + "数据监控" + ], + "summary": "批量查询通知", + "parameters": [ + { + "type": "string", + "name": "end_time", + "in": "query" + }, + { + "enum": [ + 7, + -1, + 0, + 1, + 2, + 3, + 4, + 5, + -1, + 5, + 6 + ], + "type": "integer", + "format": "int32", + "x-enum-varnames": [ + "_numLevels", + "DebugLevel", + "InfoLevel", + "WarnLevel", + "ErrorLevel", + "DPanicLevel", + "PanicLevel", + "FatalLevel", + "_minLevel", + "_maxLevel", + "InvalidLevel" + ], + "name": "level", + "in": "query" + }, + { + "enum": [ + "邮件", + "企业微信", + "飞书", + "日志" + ], + "type": "string", + "x-enum-varnames": [ + "NotifierTypeSMTP", + "NotifierTypeWeChat", + "NotifierTypeLark", + "NotifierTypeLog" + ], + "name": "notifier_type", + "in": "query" + }, + { + "type": "string", + "name": "order_by", + "in": "query" + }, + { + "type": "integer", + "name": "page", + "in": "query" + }, + { + "type": "integer", + "name": "pageSize", + "in": "query" + }, + { + "type": "string", + "name": "start_time", + "in": "query" + }, + { + "enum": [ + "发送成功", + "发送失败", + "已跳过" + ], + "type": "string", + "x-enum-comments": { + "NotificationStatusFailed": "通知发送失败", + "NotificationStatusSkipped": "通知因某些原因被跳过(例如:用户未配置联系方式)", + "NotificationStatusSuccess": "通知已成功发送" + }, + "x-enum-descriptions": [ + "通知已成功发送", + "通知发送失败", + "通知因某些原因被跳过(例如:用户未配置联系方式)" + ], + "x-enum-varnames": [ + "NotificationStatusSuccess", + "NotificationStatusFailed", + "NotificationStatusSkipped" + ], + "name": "status", + "in": "query" + }, + { + "type": "integer", + "name": "user_id", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "allOf": [ + { + "$ref": "#/definitions/controller.Response" + }, + { + "type": "object", + "properties": { + "data": { + "$ref": "#/definitions/dto.ListNotificationResponse" + } + } + } + ] + } + } + } + } + }, "/api/v1/monitor/pending-collections": { "get": { "security": [ @@ -3923,6 +4066,64 @@ const docTemplate = `{ } } } + }, + "/api/v1/users/{id}/notifications/test": { + "post": { + "security": [ + { + "BearerAuth": [] + } + ], + "description": "为指定用户发送一条特定渠道的测试消息,以验证其配置是否正确。", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "用户管理" + ], + "summary": "发送测试通知", + "parameters": [ + { + "type": "integer", + "description": "用户ID", + "name": "id", + "in": "path", + "required": true + }, + { + "description": "请求体", + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/dto.SendTestNotificationRequest" + } + } + ], + "responses": { + "200": { + "description": "成功响应", + "schema": { + "allOf": [ + { + "$ref": "#/definitions/controller.Response" + }, + { + "type": "object", + "properties": { + "data": { + "type": "string" + } + } + } + ] + } + } + } + } } }, "definitions": { @@ -4431,6 +4632,20 @@ const docTemplate = `{ } } }, + "dto.ListNotificationResponse": { + "type": "object", + "properties": { + "list": { + "type": "array", + "items": { + "$ref": "#/definitions/dto.NotificationDTO" + } + }, + "pagination": { + "$ref": "#/definitions/dto.PaginationDTO" + } + } + }, "dto.ListPendingCollectionResponse": { "type": "object", "properties": { @@ -4739,6 +4954,47 @@ const docTemplate = `{ } } }, + "dto.NotificationDTO": { + "type": "object", + "properties": { + "alarm_timestamp": { + "type": "string" + }, + "created_at": { + "type": "string" + }, + "error_message": { + "type": "string" + }, + "id": { + "type": "integer" + }, + "level": { + "$ref": "#/definitions/zapcore.Level" + }, + "message": { + "type": "string" + }, + "notifier_type": { + "$ref": "#/definitions/notify.NotifierType" + }, + "status": { + "$ref": "#/definitions/models.NotificationStatus" + }, + "title": { + "type": "string" + }, + "to_address": { + "type": "string" + }, + "updated_at": { + "type": "string" + }, + "user_id": { + "type": "integer" + } + } + }, "dto.PaginationDTO": { "type": "object", "properties": { @@ -5591,6 +5847,22 @@ const docTemplate = `{ } } }, + "dto.SendTestNotificationRequest": { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "description": "Type 指定要测试的通知渠道", + "allOf": [ + { + "$ref": "#/definitions/notify.NotifierType" + } + ] + } + } + }, "dto.SensorDataDTO": { "type": "object", "properties": { @@ -6201,6 +6473,29 @@ const docTemplate = `{ "ReasonTypeHealthCare" ] }, + "models.NotificationStatus": { + "type": "string", + "enum": [ + "发送成功", + "发送失败", + "已跳过" + ], + "x-enum-comments": { + "NotificationStatusFailed": "通知发送失败", + "NotificationStatusSkipped": "通知因某些原因被跳过(例如:用户未配置联系方式)", + "NotificationStatusSuccess": "通知已成功发送" + }, + "x-enum-descriptions": [ + "通知已成功发送", + "通知发送失败", + "通知因某些原因被跳过(例如:用户未配置联系方式)" + ], + "x-enum-varnames": [ + "NotificationStatusSuccess", + "NotificationStatusFailed", + "NotificationStatusSkipped" + ] + }, "models.PenStatus": { "type": "string", "enum": [ @@ -6530,6 +6825,51 @@ const docTemplate = `{ "$ref": "#/definitions/models.SensorType" } } + }, + "notify.NotifierType": { + "type": "string", + "enum": [ + "邮件", + "企业微信", + "飞书", + "日志" + ], + "x-enum-varnames": [ + "NotifierTypeSMTP", + "NotifierTypeWeChat", + "NotifierTypeLark", + "NotifierTypeLog" + ] + }, + "zapcore.Level": { + "type": "integer", + "format": "int32", + "enum": [ + 7, + -1, + 0, + 1, + 2, + 3, + 4, + 5, + -1, + 5, + 6 + ], + "x-enum-varnames": [ + "_numLevels", + "DebugLevel", + "InfoLevel", + "WarnLevel", + "ErrorLevel", + "DPanicLevel", + "PanicLevel", + "FatalLevel", + "_minLevel", + "_maxLevel", + "InvalidLevel" + ] } }, "securityDefinitions": { diff --git a/docs/swagger.json b/docs/swagger.json index 067a487..ec1dcb9 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -967,6 +967,149 @@ } } }, + "/api/v1/monitor/notifications": { + "get": { + "security": [ + { + "BearerAuth": [] + } + ], + "description": "根据提供的过滤条件,分页获取通知列表", + "produces": [ + "application/json" + ], + "tags": [ + "数据监控" + ], + "summary": "批量查询通知", + "parameters": [ + { + "type": "string", + "name": "end_time", + "in": "query" + }, + { + "enum": [ + 7, + -1, + 0, + 1, + 2, + 3, + 4, + 5, + -1, + 5, + 6 + ], + "type": "integer", + "format": "int32", + "x-enum-varnames": [ + "_numLevels", + "DebugLevel", + "InfoLevel", + "WarnLevel", + "ErrorLevel", + "DPanicLevel", + "PanicLevel", + "FatalLevel", + "_minLevel", + "_maxLevel", + "InvalidLevel" + ], + "name": "level", + "in": "query" + }, + { + "enum": [ + "邮件", + "企业微信", + "飞书", + "日志" + ], + "type": "string", + "x-enum-varnames": [ + "NotifierTypeSMTP", + "NotifierTypeWeChat", + "NotifierTypeLark", + "NotifierTypeLog" + ], + "name": "notifier_type", + "in": "query" + }, + { + "type": "string", + "name": "order_by", + "in": "query" + }, + { + "type": "integer", + "name": "page", + "in": "query" + }, + { + "type": "integer", + "name": "pageSize", + "in": "query" + }, + { + "type": "string", + "name": "start_time", + "in": "query" + }, + { + "enum": [ + "发送成功", + "发送失败", + "已跳过" + ], + "type": "string", + "x-enum-comments": { + "NotificationStatusFailed": "通知发送失败", + "NotificationStatusSkipped": "通知因某些原因被跳过(例如:用户未配置联系方式)", + "NotificationStatusSuccess": "通知已成功发送" + }, + "x-enum-descriptions": [ + "通知已成功发送", + "通知发送失败", + "通知因某些原因被跳过(例如:用户未配置联系方式)" + ], + "x-enum-varnames": [ + "NotificationStatusSuccess", + "NotificationStatusFailed", + "NotificationStatusSkipped" + ], + "name": "status", + "in": "query" + }, + { + "type": "integer", + "name": "user_id", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "allOf": [ + { + "$ref": "#/definitions/controller.Response" + }, + { + "type": "object", + "properties": { + "data": { + "$ref": "#/definitions/dto.ListNotificationResponse" + } + } + } + ] + } + } + } + } + }, "/api/v1/monitor/pending-collections": { "get": { "security": [ @@ -3915,6 +4058,64 @@ } } } + }, + "/api/v1/users/{id}/notifications/test": { + "post": { + "security": [ + { + "BearerAuth": [] + } + ], + "description": "为指定用户发送一条特定渠道的测试消息,以验证其配置是否正确。", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "用户管理" + ], + "summary": "发送测试通知", + "parameters": [ + { + "type": "integer", + "description": "用户ID", + "name": "id", + "in": "path", + "required": true + }, + { + "description": "请求体", + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/dto.SendTestNotificationRequest" + } + } + ], + "responses": { + "200": { + "description": "成功响应", + "schema": { + "allOf": [ + { + "$ref": "#/definitions/controller.Response" + }, + { + "type": "object", + "properties": { + "data": { + "type": "string" + } + } + } + ] + } + } + } + } } }, "definitions": { @@ -4423,6 +4624,20 @@ } } }, + "dto.ListNotificationResponse": { + "type": "object", + "properties": { + "list": { + "type": "array", + "items": { + "$ref": "#/definitions/dto.NotificationDTO" + } + }, + "pagination": { + "$ref": "#/definitions/dto.PaginationDTO" + } + } + }, "dto.ListPendingCollectionResponse": { "type": "object", "properties": { @@ -4731,6 +4946,47 @@ } } }, + "dto.NotificationDTO": { + "type": "object", + "properties": { + "alarm_timestamp": { + "type": "string" + }, + "created_at": { + "type": "string" + }, + "error_message": { + "type": "string" + }, + "id": { + "type": "integer" + }, + "level": { + "$ref": "#/definitions/zapcore.Level" + }, + "message": { + "type": "string" + }, + "notifier_type": { + "$ref": "#/definitions/notify.NotifierType" + }, + "status": { + "$ref": "#/definitions/models.NotificationStatus" + }, + "title": { + "type": "string" + }, + "to_address": { + "type": "string" + }, + "updated_at": { + "type": "string" + }, + "user_id": { + "type": "integer" + } + } + }, "dto.PaginationDTO": { "type": "object", "properties": { @@ -5583,6 +5839,22 @@ } } }, + "dto.SendTestNotificationRequest": { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "description": "Type 指定要测试的通知渠道", + "allOf": [ + { + "$ref": "#/definitions/notify.NotifierType" + } + ] + } + } + }, "dto.SensorDataDTO": { "type": "object", "properties": { @@ -6193,6 +6465,29 @@ "ReasonTypeHealthCare" ] }, + "models.NotificationStatus": { + "type": "string", + "enum": [ + "发送成功", + "发送失败", + "已跳过" + ], + "x-enum-comments": { + "NotificationStatusFailed": "通知发送失败", + "NotificationStatusSkipped": "通知因某些原因被跳过(例如:用户未配置联系方式)", + "NotificationStatusSuccess": "通知已成功发送" + }, + "x-enum-descriptions": [ + "通知已成功发送", + "通知发送失败", + "通知因某些原因被跳过(例如:用户未配置联系方式)" + ], + "x-enum-varnames": [ + "NotificationStatusSuccess", + "NotificationStatusFailed", + "NotificationStatusSkipped" + ] + }, "models.PenStatus": { "type": "string", "enum": [ @@ -6522,6 +6817,51 @@ "$ref": "#/definitions/models.SensorType" } } + }, + "notify.NotifierType": { + "type": "string", + "enum": [ + "邮件", + "企业微信", + "飞书", + "日志" + ], + "x-enum-varnames": [ + "NotifierTypeSMTP", + "NotifierTypeWeChat", + "NotifierTypeLark", + "NotifierTypeLog" + ] + }, + "zapcore.Level": { + "type": "integer", + "format": "int32", + "enum": [ + 7, + -1, + 0, + 1, + 2, + 3, + 4, + 5, + -1, + 5, + 6 + ], + "x-enum-varnames": [ + "_numLevels", + "DebugLevel", + "InfoLevel", + "WarnLevel", + "ErrorLevel", + "DPanicLevel", + "PanicLevel", + "FatalLevel", + "_minLevel", + "_maxLevel", + "InvalidLevel" + ] } }, "securityDefinitions": { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 0953c91..93c6e86 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -349,6 +349,15 @@ definitions: pagination: $ref: '#/definitions/dto.PaginationDTO' type: object + dto.ListNotificationResponse: + properties: + list: + items: + $ref: '#/definitions/dto.NotificationDTO' + type: array + pagination: + $ref: '#/definitions/dto.PaginationDTO' + type: object dto.ListPendingCollectionResponse: properties: list: @@ -552,6 +561,33 @@ definitions: - quantity - toPenID type: object + dto.NotificationDTO: + properties: + alarm_timestamp: + type: string + created_at: + type: string + error_message: + type: string + id: + type: integer + level: + $ref: '#/definitions/zapcore.Level' + message: + type: string + notifier_type: + $ref: '#/definitions/notify.NotifierType' + status: + $ref: '#/definitions/models.NotificationStatus' + title: + type: string + to_address: + type: string + updated_at: + type: string + user_id: + type: integer + type: object dto.PaginationDTO: properties: page: @@ -1125,6 +1161,15 @@ definitions: - traderName - unitPrice type: object + dto.SendTestNotificationRequest: + properties: + type: + allOf: + - $ref: '#/definitions/notify.NotifierType' + description: Type 指定要测试的通知渠道 + required: + - type + type: object dto.SensorDataDTO: properties: data: @@ -1548,6 +1593,24 @@ definitions: - ReasonTypePreventive - ReasonTypeTreatment - ReasonTypeHealthCare + models.NotificationStatus: + enum: + - 发送成功 + - 发送失败 + - 已跳过 + type: string + x-enum-comments: + NotificationStatusFailed: 通知发送失败 + NotificationStatusSkipped: 通知因某些原因被跳过(例如:用户未配置联系方式) + NotificationStatusSuccess: 通知已成功发送 + x-enum-descriptions: + - 通知已成功发送 + - 通知发送失败 + - 通知因某些原因被跳过(例如:用户未配置联系方式) + x-enum-varnames: + - NotificationStatusSuccess + - NotificationStatusFailed + - NotificationStatusSkipped models.PenStatus: enum: - 空闲 @@ -1816,6 +1879,45 @@ definitions: type: $ref: '#/definitions/models.SensorType' type: object + notify.NotifierType: + enum: + - 邮件 + - 企业微信 + - 飞书 + - 日志 + type: string + x-enum-varnames: + - NotifierTypeSMTP + - NotifierTypeWeChat + - NotifierTypeLark + - NotifierTypeLog + zapcore.Level: + enum: + - 7 + - -1 + - 0 + - 1 + - 2 + - 3 + - 4 + - 5 + - -1 + - 5 + - 6 + format: int32 + type: integer + x-enum-varnames: + - _numLevels + - DebugLevel + - InfoLevel + - WarnLevel + - ErrorLevel + - DPanicLevel + - PanicLevel + - FatalLevel + - _minLevel + - _maxLevel + - InvalidLevel info: contact: email: divano@example.com @@ -2379,6 +2481,105 @@ paths: summary: 获取用药记录列表 tags: - 数据监控 + /api/v1/monitor/notifications: + get: + description: 根据提供的过滤条件,分页获取通知列表 + parameters: + - in: query + name: end_time + type: string + - enum: + - 7 + - -1 + - 0 + - 1 + - 2 + - 3 + - 4 + - 5 + - -1 + - 5 + - 6 + format: int32 + in: query + name: level + type: integer + x-enum-varnames: + - _numLevels + - DebugLevel + - InfoLevel + - WarnLevel + - ErrorLevel + - DPanicLevel + - PanicLevel + - FatalLevel + - _minLevel + - _maxLevel + - InvalidLevel + - enum: + - 邮件 + - 企业微信 + - 飞书 + - 日志 + in: query + name: notifier_type + type: string + x-enum-varnames: + - NotifierTypeSMTP + - NotifierTypeWeChat + - NotifierTypeLark + - NotifierTypeLog + - in: query + name: order_by + type: string + - in: query + name: page + type: integer + - in: query + name: pageSize + type: integer + - in: query + name: start_time + type: string + - enum: + - 发送成功 + - 发送失败 + - 已跳过 + in: query + name: status + type: string + x-enum-comments: + NotificationStatusFailed: 通知发送失败 + NotificationStatusSkipped: 通知因某些原因被跳过(例如:用户未配置联系方式) + NotificationStatusSuccess: 通知已成功发送 + x-enum-descriptions: + - 通知已成功发送 + - 通知发送失败 + - 通知因某些原因被跳过(例如:用户未配置联系方式) + x-enum-varnames: + - NotificationStatusSuccess + - NotificationStatusFailed + - NotificationStatusSkipped + - in: query + name: user_id + type: integer + produces: + - application/json + responses: + "200": + description: OK + schema: + allOf: + - $ref: '#/definitions/controller.Response' + - properties: + data: + $ref: '#/definitions/dto.ListNotificationResponse' + type: object + security: + - BearerAuth: [] + summary: 批量查询通知 + tags: + - 数据监控 /api/v1/monitor/pending-collections: get: description: 根据提供的过滤条件,分页获取待采集请求 @@ -4086,6 +4287,40 @@ paths: summary: 获取指定用户的操作历史 tags: - 用户管理 + /api/v1/users/{id}/notifications/test: + post: + consumes: + - application/json + description: 为指定用户发送一条特定渠道的测试消息,以验证其配置是否正确。 + parameters: + - description: 用户ID + in: path + name: id + required: true + type: integer + - description: 请求体 + in: body + name: body + required: true + schema: + $ref: '#/definitions/dto.SendTestNotificationRequest' + produces: + - application/json + responses: + "200": + description: 成功响应 + schema: + allOf: + - $ref: '#/definitions/controller.Response' + - properties: + data: + type: string + type: object + security: + - BearerAuth: [] + summary: 发送测试通知 + tags: + - 用户管理 /api/v1/users/login: post: consumes: diff --git a/internal/app/api/api.go b/internal/app/api/api.go index 6037bbb..835bf82 100644 --- a/internal/app/api/api.go +++ b/internal/app/api/api.go @@ -28,6 +28,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/app/webhook" "git.huangwc.com/pig/pig-farm-controller/internal/domain/audit" domain_device "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" + domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" "git.huangwc.com/pig/pig-farm-controller/internal/domain/task" "git.huangwc.com/pig/pig-farm-controller/internal/domain/token" "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" @@ -68,9 +69,9 @@ func NewAPI(cfg config.ServerConfig, pigFarmService service.PigFarmService, pigBatchService service.PigBatchService, monitorService service.MonitorService, - userActionLogRepository repository.UserActionLogRepository, tokenService token.TokenService, auditService audit.Service, + notifyService domain_notify.Service, deviceService domain_device.Service, listenHandler webhook.ListenHandler, analysisTaskManager *task.AnalysisPlanTaskManager) *API { @@ -96,7 +97,7 @@ func NewAPI(cfg config.ServerConfig, config: cfg, listenHandler: listenHandler, // 在 NewAPI 中初始化用户控制器,并将其作为 API 结构体的成员 - userController: user.NewController(userRepo, monitorService, logger, tokenService), + userController: user.NewController(userRepo, monitorService, logger, tokenService, notifyService), // 在 NewAPI 中初始化设备控制器,并将其作为 API 结构体的成员 deviceController: device.NewController(deviceRepository, areaControllerRepository, deviceTemplateRepository, deviceService, logger), // 在 NewAPI 中初始化计划控制器,并将其作为 API 结构体的成员 diff --git a/internal/app/api/router.go b/internal/app/api/router.go index e5ac36e..56e595c 100644 --- a/internal/app/api/router.go +++ b/internal/app/api/router.go @@ -57,6 +57,7 @@ func (a *API) setupRoutes() { userGroup := authGroup.Group("/users") { userGroup.GET("/:id/history", a.userController.ListUserHistory) // 获取用户操作历史 + userGroup.POST("/:id/notifications/test", a.userController.SendTestNotification) } a.logger.Info("用户相关接口注册成功 (需要认证和审计)") @@ -175,6 +176,7 @@ func (a *API) setupRoutes() { monitorGroup.GET("/pig-sick-logs", a.monitorController.ListPigSickLogs) monitorGroup.GET("/pig-purchases", a.monitorController.ListPigPurchases) monitorGroup.GET("/pig-sales", a.monitorController.ListPigSales) + monitorGroup.GET("/notifications", a.monitorController.ListNotifications) } a.logger.Info("数据监控相关接口注册成功 (需要认证和审计)") } diff --git a/internal/app/controller/monitor/monitor_controller.go b/internal/app/controller/monitor/monitor_controller.go index 9b409fe..8c336d5 100644 --- a/internal/app/controller/monitor/monitor_controller.go +++ b/internal/app/controller/monitor/monitor_controller.go @@ -839,3 +839,50 @@ func (c *Controller) ListPigSales(ctx *gin.Context) { c.logger.Infof("%s: 成功, 获取到 %d 条记录, 总计 %d 条", actionType, len(data), total) controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "获取猪只售卖记录成功", resp, actionType, "获取猪只售卖记录成功", req) } + +// ListNotifications godoc +// @Summary 批量查询通知 +// @Description 根据提供的过滤条件,分页获取通知列表 +// @Tags 数据监控 +// @Security BearerAuth +// @Produce json +// @Param query query dto.ListNotificationRequest true "查询参数" +// @Success 200 {object} controller.Response{data=dto.ListNotificationResponse} +// @Router /api/v1/monitor/notifications [get] +func (c *Controller) ListNotifications(ctx *gin.Context) { + const actionType = "批量查询通知" + + var req dto.ListNotificationRequest + if err := ctx.ShouldBindQuery(&req); err != nil { + c.logger.Errorf("%s: 参数绑定失败: %v", actionType, err) + controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "无效的查询参数: "+err.Error(), actionType, "参数绑定失败", req) + return + } + + opts := repository.NotificationListOptions{ + UserID: req.UserID, + NotifierType: req.NotifierType, + Level: req.Level, + StartTime: req.StartTime, + EndTime: req.EndTime, + OrderBy: req.OrderBy, + Status: req.Status, + } + + data, total, err := c.monitorService.ListNotifications(opts, req.Page, req.PageSize) + if err != nil { + if errors.Is(err, repository.ErrInvalidPagination) { + c.logger.Warnf("%s: 无效的分页参数: %v", actionType, err) + controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "无效的分页参数: "+err.Error(), actionType, "无效分页参数", req) + return + } + + c.logger.Errorf("%s: 服务层查询失败: %v", actionType, err) + controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "批量查询通知失败: "+err.Error(), actionType, "服务层查询失败", req) + return + } + + resp := dto.NewListNotificationResponse(data, total, req.Page, req.PageSize) + c.logger.Infof("%s: 成功, 获取到 %d 条记录, 总计 %d 条", actionType, len(data), total) + controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "批量查询通知成功", resp, actionType, "批量查询通知成功", req) +} diff --git a/internal/app/controller/user/user_controller.go b/internal/app/controller/user/user_controller.go index 5ec7d55..5fca24a 100644 --- a/internal/app/controller/user/user_controller.go +++ b/internal/app/controller/user/user_controller.go @@ -7,6 +7,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/app/controller" "git.huangwc.com/pig/pig-farm-controller/internal/app/dto" "git.huangwc.com/pig/pig-farm-controller/internal/app/service" + domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" "git.huangwc.com/pig/pig-farm-controller/internal/domain/token" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" @@ -19,16 +20,24 @@ import ( type Controller struct { userRepo repository.UserRepository monitorService service.MonitorService - tokenService token.TokenService // 注入 token 服务 + tokenService token.TokenService + notifyService domain_notify.Service logger *logs.Logger } // NewController 创建用户控制器实例 -func NewController(userRepo repository.UserRepository, monitorService service.MonitorService, logger *logs.Logger, tokenService token.TokenService) *Controller { +func NewController( + userRepo repository.UserRepository, + monitorService service.MonitorService, + logger *logs.Logger, + tokenService token.TokenService, + notifyService domain_notify.Service, +) *Controller { return &Controller{ userRepo: userRepo, monitorService: monitorService, tokenService: tokenService, + notifyService: notifyService, logger: logger, } } @@ -192,3 +201,46 @@ func (c *Controller) ListUserHistory(ctx *gin.Context) { c.logger.Infof("%s: 成功获取用户 %d 的操作历史, 数量: %d", actionType, userID, len(data)) controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "获取用户操作历史成功", resp, actionType, "获取用户操作历史成功", opts) } + +// SendTestNotification godoc +// @Summary 发送测试通知 +// @Description 为指定用户发送一条特定渠道的测试消息,以验证其配置是否正确。 +// @Tags 用户管理 +// @Security BearerAuth +// @Accept json +// @Produce json +// @Param id path int true "用户ID" +// @Param body body dto.SendTestNotificationRequest true "请求体" +// @Success 200 {object} controller.Response{data=string} "成功响应" +// @Router /api/v1/users/{id}/notifications/test [post] +func (c *Controller) SendTestNotification(ctx *gin.Context) { + const actionType = "发送测试通知" + + // 1. 从 URL 中获取用户 ID + userID, err := strconv.ParseUint(ctx.Param("id"), 10, 32) + if err != nil { + c.logger.Errorf("%s: 无效的用户ID格式: %v", actionType, err) + controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "无效的用户ID格式", actionType, "无效的用户ID格式", ctx.Param("id")) + return + } + + // 2. 从请求体 (JSON Body) 中获取要测试的通知类型 + var req dto.SendTestNotificationRequest + if err := ctx.ShouldBindJSON(&req); err != nil { + c.logger.Errorf("%s: 参数绑定失败: %v", actionType, err) + controller.SendErrorWithAudit(ctx, controller.CodeBadRequest, "请求体格式错误或缺少 'type' 字段: "+err.Error(), actionType, "请求体绑定失败", req) + return + } + + // 3. 调用领域服务 + err = c.notifyService.SendTestMessage(uint(userID), req.Type) + if err != nil { + c.logger.Errorf("%s: 服务层调用失败: %v", actionType, err) + controller.SendErrorWithAudit(ctx, controller.CodeInternalError, "发送测试消息失败: "+err.Error(), actionType, "服务层调用失败", gin.H{"userID": userID, "type": req.Type}) + return + } + + // 4. 返回成功响应 + c.logger.Infof("%s: 成功为用户 %d 发送类型为 %s 的测试消息", actionType, userID, req.Type) + controller.SendSuccessWithAudit(ctx, controller.CodeSuccess, "测试消息已发送,请检查您的接收端。", nil, actionType, "测试消息发送成功", gin.H{"userID": userID, "type": req.Type}) +} diff --git a/internal/app/dto/notification_converter.go b/internal/app/dto/notification_converter.go new file mode 100644 index 0000000..5df25a8 --- /dev/null +++ b/internal/app/dto/notification_converter.go @@ -0,0 +1,36 @@ +package dto + +import ( + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "go.uber.org/zap/zapcore" +) + +// NewListNotificationResponse 从模型数据创建通知列表响应 DTO +func NewListNotificationResponse(data []models.Notification, total int64, page, pageSize int) *ListNotificationResponse { + dtos := make([]NotificationDTO, len(data)) + for i, item := range data { + dtos[i] = NotificationDTO{ + ID: item.ID, + CreatedAt: item.CreatedAt, + UpdatedAt: item.UpdatedAt, + NotifierType: item.NotifierType, + UserID: item.UserID, + Title: item.Title, + Message: item.Message, + Level: zapcore.Level(item.Level), + AlarmTimestamp: item.AlarmTimestamp, + ToAddress: item.ToAddress, + Status: item.Status, + ErrorMessage: item.ErrorMessage, + } + } + + return &ListNotificationResponse{ + List: dtos, + Pagination: PaginationDTO{ + Total: total, + Page: page, + PageSize: pageSize, + }, + } +} diff --git a/internal/app/dto/notification_dto.go b/internal/app/dto/notification_dto.go new file mode 100644 index 0000000..c7b9856 --- /dev/null +++ b/internal/app/dto/notification_dto.go @@ -0,0 +1,50 @@ +package dto + +import ( + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/notify" + "go.uber.org/zap/zapcore" +) + +// SendTestNotificationRequest 定义了发送测试通知请求的 JSON 结构 +type SendTestNotificationRequest struct { + // Type 指定要测试的通知渠道 + Type notify.NotifierType `json:"type" binding:"required"` +} + +// ListNotificationRequest 定义了获取通知列表的请求参数 +type ListNotificationRequest struct { + Page int `form:"page,default=1"` + PageSize int `form:"pageSize,default=10"` + UserID *uint `form:"user_id"` + NotifierType *notify.NotifierType `form:"notifier_type"` + Status *models.NotificationStatus `form:"status"` + Level *zapcore.Level `form:"level"` + StartTime *time.Time `form:"start_time"` + EndTime *time.Time `form:"end_time"` + OrderBy string `form:"order_by"` +} + +// NotificationDTO 是用于API响应的通知结构 +type NotificationDTO struct { + ID uint `json:"id"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + NotifierType notify.NotifierType `json:"notifier_type"` + UserID uint `json:"user_id"` + Title string `json:"title"` + Message string `json:"message"` + Level zapcore.Level `json:"level"` + AlarmTimestamp time.Time `json:"alarm_timestamp"` + ToAddress string `json:"to_address"` + Status models.NotificationStatus `json:"status"` + ErrorMessage string `json:"error_message"` +} + +// ListNotificationResponse 是获取通知列表的响应结构 +type ListNotificationResponse struct { + List []NotificationDTO `json:"list"` + Pagination PaginationDTO `json:"pagination"` +} diff --git a/internal/app/service/monitor_service.go b/internal/app/service/monitor_service.go index eed0f4d..aa5666b 100644 --- a/internal/app/service/monitor_service.go +++ b/internal/app/service/monitor_service.go @@ -24,6 +24,7 @@ type MonitorService interface { ListPigSickLogs(opts repository.PigSickLogListOptions, page, pageSize int) ([]models.PigSickLog, int64, error) ListPigPurchases(opts repository.PigPurchaseListOptions, page, pageSize int) ([]models.PigPurchase, int64, error) ListPigSales(opts repository.PigSaleListOptions, page, pageSize int) ([]models.PigSale, int64, error) + ListNotifications(opts repository.NotificationListOptions, page, pageSize int) ([]models.Notification, int64, error) } // monitorService 是 MonitorService 接口的具体实现 @@ -40,6 +41,7 @@ type monitorService struct { pigTransferLogRepo repository.PigTransferLogRepository pigSickLogRepo repository.PigSickLogRepository pigTradeRepo repository.PigTradeRepository + notificationRepo repository.NotificationRepository } // NewMonitorService 创建一个新的 MonitorService 实例 @@ -56,6 +58,7 @@ func NewMonitorService( pigTransferLogRepo repository.PigTransferLogRepository, pigSickLogRepo repository.PigSickLogRepository, pigTradeRepo repository.PigTradeRepository, + notificationRepo repository.NotificationRepository, ) MonitorService { return &monitorService{ sensorDataRepo: sensorDataRepo, @@ -70,6 +73,7 @@ func NewMonitorService( pigTransferLogRepo: pigTransferLogRepo, pigSickLogRepo: pigSickLogRepo, pigTradeRepo: pigTradeRepo, + notificationRepo: notificationRepo, } } @@ -157,3 +161,8 @@ func (s *monitorService) ListPigPurchases(opts repository.PigPurchaseListOptions func (s *monitorService) ListPigSales(opts repository.PigSaleListOptions, page, pageSize int) ([]models.PigSale, int64, error) { return s.pigTradeRepo.ListPigSales(opts, page, pageSize) } + +// ListNotifications 负责处理查询通知列表的业务逻辑 +func (s *monitorService) ListNotifications(opts repository.NotificationListOptions, page, pageSize int) ([]models.Notification, int64, error) { + return s.notificationRepo.List(opts, page, pageSize) +} diff --git a/internal/core/application.go b/internal/core/application.go index 573cad6..700b01a 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -12,6 +12,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/app/webhook" "git.huangwc.com/pig/pig-farm-controller/internal/domain/audit" "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" + domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" "git.huangwc.com/pig/pig-farm-controller/internal/domain/pig" "git.huangwc.com/pig/pig-farm-controller/internal/domain/task" "git.huangwc.com/pig/pig-farm-controller/internal/domain/token" @@ -19,6 +20,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/infra/database" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/notify" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora" @@ -41,6 +43,9 @@ type Application struct { // Lora Mesh 监听器 loraMeshCommunicator transport.Listener + + // 通知服务 + NotifyService domain_notify.Service } // NewApplication 创建并初始化一个新的 Application 实例。 @@ -85,6 +90,7 @@ func NewApplication(configPath string) (*Application, error) { pigSickPigLogRepo := repository.NewGormPigSickLogRepository(storage.GetDB()) medicationLogRepo := repository.NewGormMedicationLogRepository(storage.GetDB()) rawMaterialRepo := repository.NewGormRawMaterialRepository(storage.GetDB()) + notificationRepo := repository.NewGormNotificationRepository(storage.GetDB()) // 初始化事务管理器 unitOfWork := repository.NewGormUnitOfWork(storage.GetDB(), logger) @@ -112,11 +118,18 @@ func NewApplication(configPath string) (*Application, error) { pigTransferLogRepo, pigSickPigLogRepo, pigTradeRepo, + notificationRepo, ) // 初始化审计服务 auditService := audit.NewService(userActionLogRepo, logger) + // 初始化通知服务 + notifyService, err := initNotifyService(cfg.Notify, logger, userRepo, notificationRepo) + if err != nil { + return nil, fmt.Errorf("初始化通知服务失败: %w", err) + } + // --- 初始化 LoRa 相关组件 --- var listenHandler webhook.ListenHandler var comm transport.Communicator @@ -176,9 +189,9 @@ func NewApplication(configPath string) (*Application, error) { pigFarmService, pigBatchService, monitorService, - userActionLogRepo, tokenService, auditService, + notifyService, generalDeviceService, listenHandler, analysisPlanTaskManager, @@ -197,11 +210,94 @@ func NewApplication(configPath string) (*Application, error) { pendingCollectionRepo: pendingCollectionRepo, analysisPlanTaskManager: analysisPlanTaskManager, loraMeshCommunicator: loraListener, + NotifyService: notifyService, } return app, nil } +// initNotifyService 根据配置初始化并返回一个通知领域服务。 +// 它确保至少有一个 LogNotifier 总是可用,并根据配置启用其他通知器。 +func initNotifyService( + cfg config.NotifyConfig, + log *logs.Logger, + userRepo repository.UserRepository, + notificationRepo repository.NotificationRepository, +) (domain_notify.Service, error) { + var availableNotifiers []notify.Notifier + + // 1. 总是创建 LogNotifier 作为所有告警的最终记录渠道 + logNotifier := notify.NewLogNotifier(log) + availableNotifiers = append(availableNotifiers, logNotifier) + log.Info("Log通知器已启用 (作为所有告警的最终记录渠道)") + + // 2. 根据配置,按需创建并收集所有启用的其他 Notifier 实例 + if cfg.SMTP.Enabled { + smtpNotifier := notify.NewSMTPNotifier( + cfg.SMTP.Host, + cfg.SMTP.Port, + cfg.SMTP.Username, + cfg.SMTP.Password, + cfg.SMTP.Sender, + ) + availableNotifiers = append(availableNotifiers, smtpNotifier) + log.Info("SMTP通知器已启用") + } + + if cfg.WeChat.Enabled { + wechatNotifier := notify.NewWechatNotifier( + cfg.WeChat.CorpID, + cfg.WeChat.AgentID, + cfg.WeChat.Secret, + ) + availableNotifiers = append(availableNotifiers, wechatNotifier) + log.Info("企业微信通知器已启用") + } + + if cfg.Lark.Enabled { + larkNotifier := notify.NewLarkNotifier( + cfg.Lark.AppID, + cfg.Lark.AppSecret, + ) + availableNotifiers = append(availableNotifiers, larkNotifier) + log.Info("飞书通知器已启用") + } + + // 3. 动态确定首选通知器 + var primaryNotifier notify.Notifier + primaryNotifierType := notify.NotifierType(cfg.Primary) + + // 检查用户指定的主渠道是否已启用 + for _, n := range availableNotifiers { + if n.Type() == primaryNotifierType { + primaryNotifier = n + break + } + } + + // 如果用户指定的主渠道未启用或未指定,则自动选择第一个可用的 (这将是 LogNotifier,如果其他都未启用) + if primaryNotifier == nil { + primaryNotifier = availableNotifiers[0] // 确保总能找到一个,因为 LogNotifier 总是存在的 + log.Warnf("配置的首选渠道 '%s' 未启用或未指定,已自动降级使用 '%s' 作为首选渠道。", cfg.Primary, primaryNotifier.Type()) + } + + // 4. 使用创建的 Notifier 列表和 notificationRepo 来组装领域服务 + notifyService, err := domain_notify.NewFailoverService( + log, + userRepo, + availableNotifiers, + primaryNotifier.Type(), + cfg.FailureThreshold, + notificationRepo, + ) + if err != nil { + return nil, fmt.Errorf("创建故障转移通知服务失败: %w", err) + } + + log.Infof("通知服务初始化成功,首选渠道: %s, 故障阈值: %d", primaryNotifier.Type(), cfg.FailureThreshold) + return notifyService, nil +} + // Start 启动应用的所有组件并阻塞,直到接收到关闭信号。 func (app *Application) Start() error { app.Logger.Info("应用启动中...") diff --git a/internal/domain/notify/notify.go b/internal/domain/notify/notify.go new file mode 100644 index 0000000..2cfde52 --- /dev/null +++ b/internal/domain/notify/notify.go @@ -0,0 +1,292 @@ +package notify + +import ( + "fmt" + "strings" + "sync" + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/notify" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + "go.uber.org/zap" +) + +// Service 定义了通知领域的核心业务逻辑接口 +type Service interface { + // SendBatchAlarm 向一批用户发送告警通知。它会并发地为每个用户执行带故障转移的发送逻辑。 + SendBatchAlarm(userIDs []uint, content notify.AlarmContent) error + + // BroadcastAlarm 向所有用户发送告警通知。它会并发地为每个用户执行带故障转移的发送逻辑。 + BroadcastAlarm(content notify.AlarmContent) error + + // SendTestMessage 向指定用户发送一条测试消息,用于手动验证特定通知渠道的配置。 + SendTestMessage(userID uint, notifierType notify.NotifierType) error +} + +// failoverService 是 Service 接口的实现,提供了故障转移功能 +type failoverService struct { + log *logs.Logger + userRepo repository.UserRepository + notifiers map[notify.NotifierType]notify.Notifier + primaryNotifier notify.Notifier + failureThreshold int + failureCounters *sync.Map // 使用 sync.Map 来安全地并发读写失败计数, key: userID (uint), value: counter (int) + notificationRepo repository.NotificationRepository +} + +// NewFailoverService 创建一个新的故障转移通知服务 +func NewFailoverService( + log *logs.Logger, + userRepo repository.UserRepository, + notifiers []notify.Notifier, + primaryNotifierType notify.NotifierType, + failureThreshold int, + notificationRepo repository.NotificationRepository, +) (Service, error) { + notifierMap := make(map[notify.NotifierType]notify.Notifier) + for _, n := range notifiers { + notifierMap[n.Type()] = n + } + + primaryNotifier, ok := notifierMap[primaryNotifierType] + if !ok { + return nil, fmt.Errorf("首选通知器类型 '%s' 在提供的通知器列表中不存在", primaryNotifierType) + } + + return &failoverService{ + log: log, + userRepo: userRepo, + notifiers: notifierMap, + primaryNotifier: primaryNotifier, + failureThreshold: failureThreshold, + failureCounters: &sync.Map{}, + notificationRepo: notificationRepo, + }, nil +} + +// SendBatchAlarm 实现了向多个用户并发发送告警的功能 +func (s *failoverService) SendBatchAlarm(userIDs []uint, content notify.AlarmContent) error { + var wg sync.WaitGroup + var mu sync.Mutex + var allErrors []string + + s.log.Infow("开始批量发送告警...", "userCount", len(userIDs)) + + for _, userID := range userIDs { + wg.Add(1) + go func(id uint) { + defer wg.Done() + if err := s.sendAlarmToUser(id, content); err != nil { + mu.Lock() + allErrors = append(allErrors, fmt.Sprintf("发送失败 (用户ID: %d): %v", id, err)) + mu.Unlock() + } + }(userID) + } + + wg.Wait() + + if len(allErrors) > 0 { + finalError := fmt.Errorf("批量告警发送完成,但有 %d 个用户发送失败:\n%s", len(allErrors), strings.Join(allErrors, "\n")) + s.log.Error(finalError.Error()) + return finalError + } + + s.log.Info("批量发送告警成功完成,所有用户均已通知。") + return nil +} + +// BroadcastAlarm 实现了向所有用户发送告警的功能 +func (s *failoverService) BroadcastAlarm(content notify.AlarmContent) error { + users, err := s.userRepo.FindAll() + if err != nil { + s.log.Errorw("广播告警失败:查找所有用户时出错", "error", err) + return fmt.Errorf("广播告警失败:查找所有用户时出错: %w", err) + } + + var userIDs []uint + for _, user := range users { + userIDs = append(userIDs, user.ID) + } + + s.log.Infow("开始广播告警给所有用户", "totalUsers", len(userIDs)) + // 复用 SendBatchAlarm 的逻辑进行并发发送和错误处理 + return s.SendBatchAlarm(userIDs, content) +} + +// sendAlarmToUser 是为单个用户发送告警的内部方法,包含了完整的故障转移逻辑 +func (s *failoverService) sendAlarmToUser(userID uint, content notify.AlarmContent) error { + user, err := s.userRepo.FindByID(userID) + if err != nil { + s.log.Errorw("发送告警失败:查找用户时出错", "userID", userID, "error", err) + return fmt.Errorf("查找用户失败: %w", err) + } + + counter, _ := s.failureCounters.LoadOrStore(userID, 0) + failureCount := counter.(int) + + if failureCount < s.failureThreshold { + primaryType := s.primaryNotifier.Type() + addr := getAddressForNotifier(primaryType, user.Contact) + if addr == "" { + // 记录跳过通知 + s.recordNotificationAttempt(userID, primaryType, content, "", models.NotificationStatusSkipped, fmt.Errorf("用户未配置首选通知方式 '%s' 的地址", primaryType)) + return fmt.Errorf("用户未配置首选通知方式 '%s' 的地址", primaryType) + } + + err = s.primaryNotifier.Send(content, addr) + if err == nil { + // 记录成功通知 + s.recordNotificationAttempt(userID, primaryType, content, addr, models.NotificationStatusSuccess, nil) + if failureCount > 0 { + s.log.Infow("首选渠道发送恢复正常", "userID", userID, "notifierType", primaryType) + s.failureCounters.Store(userID, 0) + } + return nil + } + + // 记录失败通知 + s.recordNotificationAttempt(userID, primaryType, content, addr, models.NotificationStatusFailed, err) + newFailureCount := failureCount + 1 + s.failureCounters.Store(userID, newFailureCount) + s.log.Warnw("首选渠道发送失败", "userID", userID, "notifierType", primaryType, "error", err, "failureCount", newFailureCount) + failureCount = newFailureCount + } + + if failureCount >= s.failureThreshold { + s.log.Warnw("故障转移阈值已达到,开始广播通知", "userID", userID, "threshold", s.failureThreshold) + var lastErr error + for _, notifier := range s.notifiers { + addr := getAddressForNotifier(notifier.Type(), user.Contact) + if addr == "" { + // 记录跳过通知 + s.recordNotificationAttempt(userID, notifier.Type(), content, "", models.NotificationStatusSkipped, fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifier.Type())) + continue + } + if err := notifier.Send(content, addr); err == nil { + // 记录成功通知 + s.recordNotificationAttempt(userID, notifier.Type(), content, addr, models.NotificationStatusSuccess, nil) + s.log.Infow("广播通知成功", "userID", userID, "notifierType", notifier.Type()) + s.failureCounters.Store(userID, 0) + return nil + } + // 记录失败通知 + s.recordNotificationAttempt(userID, notifier.Type(), content, addr, models.NotificationStatusFailed, err) + lastErr = err + s.log.Warnw("广播通知:渠道发送失败", "userID", userID, "notifierType", notifier.Type(), "error", err) + } + return fmt.Errorf("所有渠道均发送失败,最后一个错误: %w", lastErr) + } + + return nil +} + +// SendTestMessage 实现了手动发送测试消息的功能 +func (s *failoverService) SendTestMessage(userID uint, notifierType notify.NotifierType) error { + user, err := s.userRepo.FindByID(userID) + if err != nil { + s.log.Errorw("发送测试消息失败:查找用户时出错", "userID", userID, "error", err) + return fmt.Errorf("查找用户失败: %w", err) + } + + notifier, ok := s.notifiers[notifierType] + if !ok { + s.log.Errorw("发送测试消息失败:通知器类型不存在", "userID", userID, "notifierType", notifierType) + return fmt.Errorf("指定的通知器类型 '%s' 不存在", notifierType) + } + + addr := getAddressForNotifier(notifierType, user.Contact) + if addr == "" { + s.log.Warnw("发送测试消息失败:缺少地址", "userID", userID, "notifierType", notifierType) + // 记录跳过通知 + s.recordNotificationAttempt(userID, notifierType, notify.AlarmContent{ + Title: "通知服务测试", + Message: fmt.Sprintf("这是一条来自【%s】渠道的测试消息。如果您收到此消息,说明您的配置正确。", notifierType), + Level: zap.InfoLevel, + Timestamp: time.Now(), + }, "", models.NotificationStatusFailed, fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifierType)) + return fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifierType) + } + + testContent := notify.AlarmContent{ + Title: "通知服务测试", + Message: fmt.Sprintf("这是一条来自【%s】渠道的测试消息。如果您收到此消息,说明您的配置正确。", notifierType), + Level: zap.InfoLevel, + Timestamp: time.Now(), + } + + s.log.Infow("正在发送测试消息...", "userID", userID, "notifierType", notifierType, "address", addr) + err = notifier.Send(testContent, addr) + if err != nil { + s.log.Errorw("发送测试消息失败", "userID", userID, "notifierType", notifierType, "error", err) + // 记录失败通知 + s.recordNotificationAttempt(userID, notifierType, testContent, addr, models.NotificationStatusFailed, err) + return err + } + + s.log.Infow("发送测试消息成功", "userID", userID, "notifierType", notifierType) + // 记录成功通知 + s.recordNotificationAttempt(userID, notifierType, testContent, addr, models.NotificationStatusSuccess, nil) + return nil +} + +// getAddressForNotifier 是一个辅助函数,根据通知器类型从 ContactInfo 中获取对应的地址 +func getAddressForNotifier(notifierType notify.NotifierType, contact models.ContactInfo) string { + switch notifierType { + case notify.NotifierTypeSMTP: + return contact.Email + case notify.NotifierTypeWeChat: + return contact.WeChat + case notify.NotifierTypeLark: + return contact.Feishu + case notify.NotifierTypeLog: + return "log" // LogNotifier不需要具体的地址,但为了函数签名一致性,返回一个无意义的非空字符串以绕过配置存在检查 + default: + return "" + } +} + +// recordNotificationAttempt 记录一次通知发送尝试的结果 +// userID: 接收通知的用户ID +// notifierType: 使用的通知器类型 +// content: 通知内容 +// toAddress: 实际发送到的地址 +// status: 发送尝试的状态 (成功、失败、跳过) +// err: 如果发送失败,记录的错误信息 +func (s *failoverService) recordNotificationAttempt( + userID uint, + notifierType notify.NotifierType, + content notify.AlarmContent, + toAddress string, + status models.NotificationStatus, + err error, +) { + errorMessage := "" + if err != nil { + errorMessage = err.Error() + } + + notification := &models.Notification{ + NotifierType: notifierType, + UserID: userID, + Title: content.Title, + Message: content.Message, + Level: models.LogLevel(content.Level), + AlarmTimestamp: content.Timestamp, + ToAddress: toAddress, + Status: status, + ErrorMessage: errorMessage, + } + + if saveErr := s.notificationRepo.Create(notification); saveErr != nil { + s.log.Errorw("无法保存通知发送记录到数据库", + "userID", userID, + "notifierType", notifierType, + "status", status, + "originalError", errorMessage, + "saveError", saveErr, + ) + } +} diff --git a/internal/infra/config/config.go b/internal/infra/config/config.go index 0fd56ff..1a1cfb4 100644 --- a/internal/infra/config/config.go +++ b/internal/infra/config/config.go @@ -41,6 +41,9 @@ type Config struct { // LoraMesh LoraMesh配置 LoraMesh LoraMeshConfig `yaml:"lora_mesh"` + + // Notify 通知服务配置 + Notify NotifyConfig `yaml:"notify"` } // AppConfig 代表应用基础配置 @@ -158,6 +161,40 @@ type LoraMeshConfig struct { ReassemblyTimeout int `yaml:"reassembly_timeout"` } +// NotifyConfig 包含了所有与通知服务相关的配置 +type NotifyConfig struct { + Primary string `yaml:"primary"` // 首选通知渠道 (e.g., "邮件", "企业微信", "飞书", "日志") + FailureThreshold int `yaml:"failureThreshold"` // 连续失败多少次后触发广播模式 + SMTP SMTPConfig `yaml:"smtp"` + WeChat WeChatConfig `yaml:"wechat"` + Lark LarkConfig `yaml:"lark"` +} + +// SMTPConfig SMTP邮件配置 +type SMTPConfig struct { + Enabled bool `yaml:"enabled"` + Host string `yaml:"host"` + Port int `yaml:"port"` + Username string `yaml:"username"` + Password string `yaml:"password"` + Sender string `yaml:"sender"` +} + +// WeChatConfig 企业微信应用配置 +type WeChatConfig struct { + Enabled bool `yaml:"enabled"` + CorpID string `yaml:"corpID"` + AgentID string `yaml:"agentID"` + Secret string `yaml:"secret"` +} + +// LarkConfig 飞书应用配置 +type LarkConfig struct { + Enabled bool `yaml:"enabled"` + AppID string `yaml:"appID"` + AppSecret string `yaml:"appSecret"` +} + // NewConfig 创建并返回一个新的配置实例 func NewConfig() *Config { // 默认值可以在这里设置,但我们优先使用配置文件中的值 diff --git a/internal/infra/database/postgres.go b/internal/infra/database/postgres.go index 4069a5d..fb58a30 100644 --- a/internal/infra/database/postgres.go +++ b/internal/infra/database/postgres.go @@ -171,6 +171,7 @@ func (ps *PostgresStorage) creatingHyperTable() error { {models.PigSickLog{}, "happened_at"}, {models.PigPurchase{}, "purchase_date"}, {models.PigSale{}, "sale_date"}, + {models.Notification{}, "alarm_timestamp"}, } for _, table := range tablesToConvert { @@ -211,6 +212,7 @@ func (ps *PostgresStorage) applyCompressionPolicies() error { {models.PigSickLog{}, "pig_batch_id"}, {models.PigPurchase{}, "pig_batch_id"}, {models.PigSale{}, "pig_batch_id"}, + {models.Notification{}, "user_id"}, } for _, policy := range policies { diff --git a/internal/infra/models/models.go b/internal/infra/models/models.go index 7344a8a..43b0edb 100644 --- a/internal/infra/models/models.go +++ b/internal/infra/models/models.go @@ -59,6 +59,9 @@ func GetAllModels() []interface{} { // Medication Models &Medication{}, &MedicationLog{}, + + // Notification Models + &Notification{}, } } diff --git a/internal/infra/models/notify.go b/internal/infra/models/notify.go new file mode 100644 index 0000000..291ee9b --- /dev/null +++ b/internal/infra/models/notify.go @@ -0,0 +1,77 @@ +package models + +import ( + "database/sql/driver" + "errors" + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/infra/notify" + "go.uber.org/zap/zapcore" + "gorm.io/gorm" +) + +// NotificationStatus 定义了通知发送尝试的状态枚举。 +type NotificationStatus string + +const ( + NotificationStatusSuccess NotificationStatus = "发送成功" // 通知已成功发送 + NotificationStatusFailed NotificationStatus = "发送失败" // 通知发送失败 + NotificationStatusSkipped NotificationStatus = "已跳过" // 通知因某些原因被跳过(例如:用户未配置联系方式) +) + +// LogLevel is a custom type for zapcore.Level to handle database scanning and valuing. +type LogLevel zapcore.Level + +// Scan implements the sql.Scanner interface. +func (l *LogLevel) Scan(value interface{}) error { + var s string + switch v := value.(type) { + case []byte: + s = string(v) + case string: + s = v + default: + return errors.New("LogLevel的类型无效") + } + + var zl zapcore.Level + if err := zl.UnmarshalText([]byte(s)); err != nil { + return err + } + *l = LogLevel(zl) + return nil +} + +// Value implements the driver.Valuer interface. +func (l LogLevel) Value() (driver.Value, error) { + return (zapcore.Level)(l).String(), nil +} + +// Notification 表示已发送或尝试发送的通知记录。 +type Notification struct { + gorm.Model + + // NotifierType 通知器类型 (例如:"邮件", "企业微信", "飞书", "日志") + NotifierType notify.NotifierType `gorm:"type:varchar(20);not null;index" json:"notifier_type"` + // UserID 接收通知的用户ID,用于追溯通知记录到特定用户 + UserID uint `gorm:"index" json:"user_id"` // 增加 UserID 字段,并添加索引 + // Title 通知标题 + Title string `gorm:"type:varchar(255);not null" json:"title"` + // Message 通知内容 + Message string `gorm:"type:text;not null" json:"message"` + // Level 通知级别 (例如:INFO, WARN, ERROR) + Level LogLevel `gorm:"type:varchar(10);not null" json:"level"` + // AlarmTimestamp 通知内容生成时的时间戳,与 ID 构成复合主键 + AlarmTimestamp time.Time `gorm:"primaryKey;not null" json:"alarm_timestamp"` + // ToAddress 接收地址 (例如:邮箱地址, 企业微信ID, 日志标识符) + ToAddress string `gorm:"type:varchar(255);not null" json:"to_address"` + // Status 通知发送尝试的状态 (例如:"待发送", "发送成功", "发送失败", "已跳过") + Status NotificationStatus `gorm:"type:varchar(20);not null;default:'待发送'" json:"status"` + // ErrorMessage 如果通知发送失败,此字段存储错误信息 + ErrorMessage string `gorm:"type:text" json:"error_message"` +} + +// TableName 指定 Notification 模型的表名。 +func (Notification) TableName() string { + return "notifications" +} diff --git a/internal/infra/notify/lark.go b/internal/infra/notify/lark.go new file mode 100644 index 0000000..586aa91 --- /dev/null +++ b/internal/infra/notify/lark.go @@ -0,0 +1,193 @@ +package notify + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" +) + +const ( + // 飞书获取 tenant_access_token 的 API 地址 + larkGetTokenURL = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal" + // 飞书发送消息的 API 地址 + larkSendMessageURL = "https://open.feishu.cn/open-apis/im/v1/messages" +) + +// larkNotifier 实现了 Notifier 接口,用于通过飞书自建应用发送私聊消息。 +type larkNotifier struct { + appID string // 应用 ID + appSecret string // 应用密钥 + + // 用于线程安全地管理 tenant_access_token + mu sync.Mutex + accessToken string + tokenExpiresAt time.Time +} + +// NewLarkNotifier 创建一个新的 larkNotifier 实例。 +// 调用者需要注入飞书应用的 AppID 和 AppSecret。 +func NewLarkNotifier(appID, appSecret string) Notifier { + return &larkNotifier{ + appID: appID, + appSecret: appSecret, + } +} + +// Send 向指定用户发送一条飞书消息卡片。 +// toAddr 参数是接收者的邮箱地址。 +func (l *larkNotifier) Send(content AlarmContent, toAddr string) error { + // 1. 获取有效的 tenant_access_token + token, err := l.getAccessToken() + if err != nil { + return err + } + + // 2. 构建消息卡片 JSON + // 飞书消息卡片结构复杂,这里构建一个简单的 Markdown 文本卡片 + cardContent := map[string]interface{}{ + "config": map[string]bool{ + "wide_screen_mode": true, + }, + "elements": []map[string]interface{}{ + { + "tag": "div", + "text": map[string]string{ + "tag": "lark_md", + "content": fmt.Sprintf("## %s\n**级别**: %s\n**时间**: %s\n\n%s", + content.Title, + content.Level.String(), + content.Timestamp.Format(DefaultTimeFormat), + content.Message, + ), + }, + }, + }, + } + + cardJSON, err := json.Marshal(cardContent) + if err != nil { + return fmt.Errorf("序列化飞书卡片内容失败: %w", err) + } + + // 3. 构建请求的 JSON Body + payload := larkMessagePayload{ + ReceiveID: toAddr, + ReceiveIDType: "email", // 指定接收者类型为邮箱 + MsgType: "interactive", // 消息卡片类型 + Content: string(cardJSON), + } + + jsonBytes, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("序列化飞书消息失败: %w", err) + } + + // 4. 发送 HTTP POST 请求 + url := fmt.Sprintf("%s?receive_id_type=email", larkSendMessageURL) // 在 URL 中指定 receive_id_type + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes)) + if err != nil { + return fmt.Errorf("创建飞书请求失败: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+token) // 携带 access_token + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("发送飞书通知失败: %w", err) + } + defer resp.Body.Close() + + // 5. 检查响应 + var response larkResponse + if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { + return fmt.Errorf("解析飞书响应失败: %w", err) + } + + if response.Code != 0 { + return fmt.Errorf("飞书API返回错误: code=%d, msg=%s", response.Code, response.Msg) + } + + return nil +} + +// getAccessToken 获取并缓存 tenant_access_token,处理了线程安全和自动刷新。 +func (l *larkNotifier) getAccessToken() (string, error) { + l.mu.Lock() + defer l.mu.Unlock() + + // 如果 token 存在且有效期还有5分钟以上,则直接返回缓存的 token + if l.accessToken != "" && time.Now().Before(l.tokenExpiresAt.Add(-5*time.Minute)) { + return l.accessToken, nil + } + + // 否则,重新获取 token + payload := map[string]string{ + "app_id": l.appID, + "app_secret": l.appSecret, + } + jsonBytes, err := json.Marshal(payload) + if err != nil { + return "", fmt.Errorf("序列化获取 token 请求体失败: %w", err) + } + + req, err := http.NewRequest("POST", larkGetTokenURL, bytes.NewBuffer(jsonBytes)) + if err != nil { + return "", fmt.Errorf("创建获取 token 请求失败: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return "", fmt.Errorf("获取 tenant_access_token 请求失败: %w", err) + } + defer resp.Body.Close() + + var tokenResp larkTokenResponse + if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil { + return "", fmt.Errorf("解析 tenant_access_token 响应失败: %w", err) + } + + if tokenResp.Code != 0 { + return "", fmt.Errorf("获取 tenant_access_token API 返回错误: code=%d, msg=%s", tokenResp.Code, tokenResp.Msg) + } + + // 缓存新的 token 和过期时间 + l.accessToken = tokenResp.TenantAccessToken + l.tokenExpiresAt = time.Now().Add(time.Duration(tokenResp.Expire) * time.Second) + + return l.accessToken, nil +} + +// Type 返回通知器的类型 +func (l *larkNotifier) Type() NotifierType { + return NotifierTypeLark +} + +// --- API 数据结构 --- + +// larkTokenResponse 是获取 tenant_access_token API 的响应结构体 +type larkTokenResponse struct { + Code int `json:"code"` + Msg string `json:"msg"` + TenantAccessToken string `json:"tenant_access_token"` + Expire int `json:"expire"` // 有效期,单位秒 +} + +// larkMessagePayload 是发送消息 API 的请求体结构 +type larkMessagePayload struct { + ReceiveID string `json:"receive_id"` + ReceiveIDType string `json:"receive_id_type"` + MsgType string `json:"msg_type"` + Content string `json:"content"` // 对于 interactive 消息,这里是卡片的 JSON 字符串 +} + +// larkResponse 是飞书 API 的通用响应结构体 +type larkResponse struct { + Code int `json:"code"` + Msg string `json:"msg"` +} diff --git a/internal/infra/notify/log_notifier.go b/internal/infra/notify/log_notifier.go new file mode 100644 index 0000000..887e524 --- /dev/null +++ b/internal/infra/notify/log_notifier.go @@ -0,0 +1,37 @@ +package notify + +import ( + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" +) + +// logNotifier 实现了 Notifier 接口,用于将告警信息记录到日志中。 +type logNotifier struct { + logger *logs.Logger +} + +// NewLogNotifier 创建一个新的 logNotifier 实例。 +// 它接收一个日志记录器,用于实际的日志输出。 +func NewLogNotifier(logger *logs.Logger) Notifier { + return &logNotifier{ + logger: logger, + } +} + +// Send 将告警内容以结构化的方式记录到日志中。 +// toAddr 参数在这里表示告警的预期接收者地址,也会被记录。 +func (l *logNotifier) Send(content AlarmContent, toAddr string) error { + l.logger.Infow("告警已记录到日志", + "notifierType", NotifierTypeLog, + "title", content.Title, + "message", content.Message, + "level", content.Level.String(), + "timestamp", content.Timestamp.Format(DefaultTimeFormat), + "toAddr", toAddr, + ) + return nil // 记录日志操作本身不应失败 +} + +// Type 返回通知器的类型。 +func (l *logNotifier) Type() NotifierType { + return NotifierTypeLog +} diff --git a/internal/infra/notify/notify.go b/internal/infra/notify/notify.go new file mode 100644 index 0000000..36b4a48 --- /dev/null +++ b/internal/infra/notify/notify.go @@ -0,0 +1,44 @@ +package notify + +import ( + "time" + + "go.uber.org/zap/zapcore" +) + +// DefaultTimeFormat 定义了所有通知中统一使用的时间格式。 +const DefaultTimeFormat = "2006-01-02 15:04:05" + +// NotifierType 定义了通知器的类型。 +type NotifierType string + +const ( + // NotifierTypeSMTP 表示 SMTP 邮件通知器。 + NotifierTypeSMTP NotifierType = "邮件" + // NotifierTypeWeChat 表示企业微信通知器。 + NotifierTypeWeChat NotifierType = "企业微信" + // NotifierTypeLark 表示飞书通知器。 + NotifierTypeLark NotifierType = "飞书" + // NotifierTypeLog 表示日志通知器,作为最终的告警记录渠道。 + NotifierTypeLog NotifierType = "日志" +) + +// AlarmContent 定义了通知的内容 +type AlarmContent struct { + // 通知标题 + Title string + // 通知信息 + Message string + // 通知级别 + Level zapcore.Level + // 通知时间 + Timestamp time.Time +} + +// Notifier 定义了通知发送器的接口 +type Notifier interface { + // Send 发送通知 + Send(content AlarmContent, toAddr string) error + // Type 返回通知器的类型 + Type() NotifierType +} diff --git a/internal/infra/notify/smtp.go b/internal/infra/notify/smtp.go new file mode 100644 index 0000000..f889556 --- /dev/null +++ b/internal/infra/notify/smtp.go @@ -0,0 +1,73 @@ +package notify + +import ( + "fmt" + "net/smtp" + "strings" +) + +// smtpNotifier 实现了 Notifier 接口,用于通过 SMTP 发送邮件通知。 +type smtpNotifier struct { + host string // SMTP 服务器地址 + port int // SMTP 服务器端口 + username string // 发件人邮箱地址 + password string // 发件人邮箱授权码 + sender string // 发件人名称或地址,显示在邮件的 \"From\" 字段 +} + +// NewSMTPNotifier 创建一个新的 smtpNotifier 实例。 +// 调用者需要注入 SMTP 相关的配置。 +func NewSMTPNotifier(host string, port int, username, password, sender string) Notifier { + return &smtpNotifier{ + host: host, + port: port, + username: username, + password: password, + sender: sender, + } +} + +// Send 使用 net/smtp 包发送一封邮件。 +func (s *smtpNotifier) Send(content AlarmContent, toAddr string) error { + // 1. 设置认证信息 + auth := smtp.PlainAuth("", s.username, s.password, s.host) + + // 2. 构建邮件内容 + // 邮件头 + subject := content.Title + contentType := "Content-Type: text/plain; charset=UTF-8" + fromHeader := fmt.Sprintf("From: %s", s.sender) + toHeader := fmt.Sprintf("To: %s", toAddr) + subjectHeader := fmt.Sprintf("Subject: %s", subject) + + // 邮件正文 + body := fmt.Sprintf("级别: %s\n时间: %s\n\n%s", + content.Level.String(), + content.Timestamp.Format(DefaultTimeFormat), + content.Message, + ) + + // 拼接完整的邮件报文 + msg := strings.Join([]string{ + fromHeader, + toHeader, + subjectHeader, + contentType, + "", // 邮件头和正文之间的空行 + body, + }, "\r\n") + + // 3. 发送邮件 + addr := fmt.Sprintf("%s:%d", s.host, s.port) + err := smtp.SendMail(addr, auth, s.username, []string{toAddr}, []byte(msg)) + if err != nil { + return fmt.Errorf("发送邮件失败: %w", err) + } + + return nil +} + +// Type 返回通知器的类型 +func (s *smtpNotifier) Type() NotifierType { + return NotifierTypeSMTP +} diff --git a/internal/infra/notify/wechat.go b/internal/infra/notify/wechat.go new file mode 100644 index 0000000..9e7e79e --- /dev/null +++ b/internal/infra/notify/wechat.go @@ -0,0 +1,169 @@ +package notify + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "strings" + "sync" + "time" +) + +const ( + // 获取 access_token 的 API 地址 + getTokenURL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken" + // 发送应用消息的 API 地址 + sendMessageURL = "https://qyapi.weixin.qq.com/cgi-bin/message/send" +) + +// wechatNotifier 实现了 Notifier 接口,用于通过企业微信自建应用发送私聊消息。 +type wechatNotifier struct { + corpID string // 企业ID (CorpID) + agentID string // 应用ID (AgentID) + secret string // 应用密钥 (Secret) + + // 用于线程安全地管理 access_token + mu sync.Mutex + accessToken string + tokenExpiresAt time.Time +} + +// NewWechatNotifier 创建一个新的 wechatNotifier 实例。 +// 调用者需要注入企业微信应用的 CorpID, AgentID 和 Secret。 +func NewWechatNotifier(corpID, agentID, secret string) Notifier { + return &wechatNotifier{ + corpID: corpID, + agentID: agentID, + secret: secret, + } +} + +// Send 向指定用户发送一条 markdown 格式的私聊消息。 +// toAddr 参数是接收者的 UserID 列表,用逗号或竖线分隔。 +func (w *wechatNotifier) Send(content AlarmContent, toAddr string) error { + // 1. 获取有效的 access_token + token, err := w.getAccessToken() + if err != nil { + return err + } + + // 2. 构建 markdown 内容 + markdownContent := fmt.Sprintf("## %s\n> 级别: %s\n> 时间: %s\n\n%s", + content.Title, + content.Level.String(), + content.Timestamp.Format(DefaultTimeFormat), + content.Message, + ) + + // 3. 构建请求的 JSON Body + // 将逗号分隔的 toAddr 转换为竖线分隔,以符合 API 要求 + userList := strings.ReplaceAll(toAddr, ",", "|") + payload := wechatMessagePayload{ + ToUser: userList, + MsgType: "markdown", + AgentID: w.agentID, + Markdown: struct { + Content string `json:"content"` + }{ + Content: markdownContent, + }, + } + + jsonBytes, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("序列化企业微信消息失败: %w", err) + } + + // 4. 发送 HTTP POST 请求 + url := fmt.Sprintf("%s?access_token=%s", sendMessageURL, token) + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes)) + if err != nil { + return fmt.Errorf("创建企业微信请求失败: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("发送企业微信通知失败: %w", err) + } + defer resp.Body.Close() + + // 5. 检查响应 + var response wechatResponse + if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { + return fmt.Errorf("解析企业微信响应失败: %w", err) + } + + if response.ErrCode != 0 { + return fmt.Errorf("企业微信API返回错误: code=%d, msg=%s", response.ErrCode, response.ErrMsg) + } + + return nil +} + +// getAccessToken 获取并缓存 access_token,处理了线程安全和自动刷新。 +func (w *wechatNotifier) getAccessToken() (string, error) { + w.mu.Lock() + defer w.mu.Unlock() + + // 如果 token 存在且有效期还有5分钟以上,则直接返回缓存的 token + if w.accessToken != "" && time.Now().Before(w.tokenExpiresAt.Add(-5*time.Minute)) { + return w.accessToken, nil + } + + // 否则,重新获取 token + url := fmt.Sprintf("%s?corpid=%s&corpsecret=%s", getTokenURL, w.corpID, w.secret) + resp, err := http.Get(url) + if err != nil { + return "", fmt.Errorf("获取 access_token 请求失败: %w", err) + } + defer resp.Body.Close() + + var tokenResp tokenResponse + if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil { + return "", fmt.Errorf("解析 access_token 响应失败: %w", err) + } + + if tokenResp.ErrCode != 0 { + return "", fmt.Errorf("获取 access_token API 返回错误: code=%d, msg=%s", tokenResp.ErrCode, tokenResp.ErrMsg) + } + + // 缓存新的 token 和过期时间 + w.accessToken = tokenResp.AccessToken + w.tokenExpiresAt = time.Now().Add(time.Duration(tokenResp.ExpiresIn) * time.Second) + + return w.accessToken, nil +} + +// Type 返回通知器的类型 +func (w *wechatNotifier) Type() NotifierType { + return NotifierTypeWeChat +} + +// --- API 数据结构 --- + +// tokenResponse 是获取 access_token API 的响应结构体 +type tokenResponse struct { + ErrCode int `json:"errcode"` + ErrMsg string `json:"errmsg"` + AccessToken string `json:"access_token"` + ExpiresIn int `json:"expires_in"` +} + +// wechatMessagePayload 是发送应用消息 API 的请求体结构 +type wechatMessagePayload struct { + ToUser string `json:"touser"` + MsgType string `json:"msgtype"` + AgentID string `json:"agentid"` + Markdown struct { + Content string `json:"content"` + } `json:"markdown"` +} + +// wechatResponse 是企业微信 API 的通用响应结构体 +type wechatResponse struct { + ErrCode int `json:"errcode"` + ErrMsg string `json:"errmsg"` +} diff --git a/internal/infra/repository/notification_repository.go b/internal/infra/repository/notification_repository.go new file mode 100644 index 0000000..5843502 --- /dev/null +++ b/internal/infra/repository/notification_repository.go @@ -0,0 +1,111 @@ +package repository + +import ( + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/notify" + "go.uber.org/zap/zapcore" + "gorm.io/gorm" +) + +// NotificationListOptions 定义了查询通知列表时的可选参数 +type NotificationListOptions struct { + UserID *uint // 按用户ID过滤 + NotifierType *notify.NotifierType // 按通知器类型过滤 + Status *models.NotificationStatus // 按通知状态过滤 (例如:"success", "failed") + Level *zapcore.Level // 按通知等级过滤 (例如:"info", "warning", "error") + StartTime *time.Time // 通知内容生成时间范围 - 开始时间 (对应 AlarmTimestamp) + EndTime *time.Time // 通知内容生成时间范围 - 结束时间 (对应 AlarmTimestamp) + OrderBy string // 排序字段,例如 "alarm_timestamp DESC" +} + +// NotificationRepository 定义了与通知记录相关的数据库操作接口。 +type NotificationRepository interface { + // Create 将一条新的通知记录插入数据库。 + Create(notification *models.Notification) error + // CreateInTx 在给定的事务中插入一条新的通知记录。 + CreateInTx(tx *gorm.DB, notification *models.Notification) error + // BatchCreate 批量插入多条通知记录。 + BatchCreate(notifications []*models.Notification) error + // List 支持分页和过滤的通知列表查询。 + // 返回通知列表、总记录数和错误。 + List(opts NotificationListOptions, page, pageSize int) ([]models.Notification, int64, error) +} + +// gormNotificationRepository 是 NotificationRepository 的 GORM 实现。 +type gormNotificationRepository struct { + db *gorm.DB +} + +// NewGormNotificationRepository 创建一个新的 NotificationRepository GORM 实现实例。 +func NewGormNotificationRepository(db *gorm.DB) NotificationRepository { + return &gormNotificationRepository{db: db} +} + +// Create 将一条新的通知记录插入数据库。 +func (r *gormNotificationRepository) Create(notification *models.Notification) error { + return r.db.Create(notification).Error +} + +// CreateInTx 在给定的事务中插入一条新的通知记录。 +func (r *gormNotificationRepository) CreateInTx(tx *gorm.DB, notification *models.Notification) error { + return tx.Create(notification).Error +} + +// BatchCreate 批量插入多条通知记录。 +func (r *gormNotificationRepository) BatchCreate(notifications []*models.Notification) error { + // GORM 的 Create 方法在传入切片时会自动进行批量插入 + return r.db.Create(¬ifications).Error +} + +// List 实现了分页和过滤查询通知记录的功能 +func (r *gormNotificationRepository) List(opts NotificationListOptions, page, pageSize int) ([]models.Notification, int64, error) { + // --- 校验分页参数 --- + if page <= 0 || pageSize <= 0 { + return nil, 0, ErrInvalidPagination // 复用已定义的错误 + } + + var results []models.Notification + var total int64 + + query := r.db.Model(&models.Notification{}) + + // --- 应用过滤条件 --- + if opts.UserID != nil { + query = query.Where("user_id = ?", *opts.UserID) + } + if opts.NotifierType != nil { + query = query.Where("notifier_type = ?", *opts.NotifierType) + } + if opts.Status != nil { + query = query.Where("status = ?", *opts.Status) + } + if opts.Level != nil { + query = query.Where("level = ?", opts.Level.String()) + } + if opts.StartTime != nil { + query = query.Where("alarm_timestamp >= ?", *opts.StartTime) + } + if opts.EndTime != nil { + query = query.Where("alarm_timestamp <= ?", *opts.EndTime) + } + + // --- 计算总数 --- + if err := query.Count(&total).Error; err != nil { + return nil, 0, err + } + + // --- 应用排序条件 --- + orderBy := "alarm_timestamp DESC" // 默认按时间倒序 + if opts.OrderBy != "" { + orderBy = opts.OrderBy + } + query = query.Order(orderBy) + + // --- 分页 --- + offset := (page - 1) * pageSize + err := query.Limit(pageSize).Offset(offset).Find(&results).Error + + return results, total, err +} diff --git a/internal/infra/repository/repository.go b/internal/infra/repository/repository.go new file mode 100644 index 0000000..9b7ebd4 --- /dev/null +++ b/internal/infra/repository/repository.go @@ -0,0 +1,6 @@ +package repository + +import "errors" + +// ErrInvalidPagination 表示分页参数无效 +var ErrInvalidPagination = errors.New("无效的分页参数:page和pageSize必须为大于0") diff --git a/internal/infra/repository/sensor_data_repository.go b/internal/infra/repository/sensor_data_repository.go index cc66db9..ee01117 100644 --- a/internal/infra/repository/sensor_data_repository.go +++ b/internal/infra/repository/sensor_data_repository.go @@ -1,16 +1,12 @@ package repository import ( - "errors" "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "gorm.io/gorm" ) -// ErrInvalidPagination 表示分页参数无效 -var ErrInvalidPagination = errors.New("无效的分页参数:page和pageSize必须为大于0") - // SensorDataListOptions 定义了查询传感器数据列表时的可选参数 type SensorDataListOptions struct { DeviceID *uint diff --git a/internal/infra/repository/user_repository.go b/internal/infra/repository/user_repository.go index 8a290cf..71c2503 100644 --- a/internal/infra/repository/user_repository.go +++ b/internal/infra/repository/user_repository.go @@ -13,6 +13,7 @@ type UserRepository interface { FindByUsername(username string) (*models.User, error) FindByID(id uint) (*models.User, error) FindUserForLogin(identifier string) (*models.User, error) + FindAll() ([]*models.User, error) } // gormUserRepository 是 UserRepository 的 GORM 实现 @@ -64,3 +65,12 @@ func (r *gormUserRepository) FindByID(id uint) (*models.User, error) { } return &user, nil } + +// FindAll 返回数据库中的所有用户 +func (r *gormUserRepository) FindAll() ([]*models.User, error) { + var users []*models.User + if err := r.db.Where("1 = 1").Find(&users).Error; err != nil { + return nil, err + } + return users, nil +}