From 6a8e8f1f7dd8fd13d7b48ada9f784005134bc6f9 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Sun, 26 Oct 2025 15:10:38 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E9=87=87=E9=9B=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.example.yml | 38 ++++++-- config.yml | 6 +- internal/core/application.go | 27 ++++-- internal/domain/collection/collector.go | 6 ++ internal/domain/collection/timed_collector.go | 89 +++++++++++++++++++ internal/infra/config/config.go | 8 ++ .../infra/repository/device_repository.go | 16 ++++ 7 files changed, 176 insertions(+), 14 deletions(-) create mode 100644 internal/domain/collection/collector.go create mode 100644 internal/domain/collection/timed_collector.go diff --git a/config.example.yml b/config.example.yml index 455fd53..ae486b5 100644 --- a/config.example.yml +++ b/config.example.yml @@ -48,8 +48,10 @@ chirp_stack: api_host: "http://localhost:8080" # ChirpStack API 主机地址 api_token: "your_chirpstack_api_token" # ChirpStack API Token fport: 10 # ChirpStack FPort - api_timeout: 5 # API 请求超时时间 (秒) - collection_request_timeout: 10 # 采集请求超时时间 (秒) + api_timeout: 10 # ChirpStack API请求超时时间(秒) + # 等待设备上行响应的超时时间(秒)。 + # 对于LoRaWAN这种延迟较高的网络,建议设置为5分钟 (300秒) 或更长。 + collection_request_timeout: 300 # 任务调度配置 task: @@ -62,12 +64,28 @@ lora: # Lora Mesh 配置 lora_mesh: - uart_port: "/dev/ttyUSB0" # UART 串口路径 - baud_rate: 115200 # 波特率 - timeout: 5 # 超时时间 (秒) - lora_mesh_mode: "transparent" # Lora Mesh 模式: transparent, command - max_chunk_size: 200 # 最大数据块大小 - reassembly_timeout: 10 # 重组超时时间 (秒) + # 主节点串口 + uart_port: "COM7" + # LoRa模块的通信波特率 + baud_rate: 9600 + # 等待LoRa模块AT指令响应的超时时间(ms) + timeout: 50 + # LoRa Mesh 模块发送模式(EC: 透传; ED: 完整数据包) + # e.g. + # EC: 接收端只会接收到消息, 不会接收到请求头 + # e.g. 发送: EC 05 02 01 48 65 6c 6c 6f + # (EC + 05(消息长度) + 0201(地址) + "Hello"(消息本体)) + # 接收: 48 65 6c 6c 6f ("Hello") + # ED: 接收端会接收完整数据包,包含自定义协议头和地址信息。 + # e.g. 发送: ED 05 12 34 01 00 01 02 03 + # (ED(帧头) + 05(Length, 即 1(总包数)+1(当前包序号)+3(数据块)) + 12 34(目标地址) + 01(总包数) + 00(当前包序号) + 01 02 03(数据块)) + # 接收: ED 05 12 34 01 00 01 02 03 56 78(56 78 是发送方地址,会自动拼接到消息末尾) + lora_mesh_mode: "ED" + # 单包最大用户数据数据长度, 模块限制240, 去掉两位自定义包头, 还剩238 + max_chunk_size: 238 + #分片重组超时时间(秒)。如果在一个分片到达后,超过这个时间 + # 还没收到完整的包,则认为接收失败。 + reassembly_timeout: 30 # 通知服务配置 notify: @@ -91,3 +109,7 @@ notify: enabled: false # 是否启用飞书通知 appID: "cli_xxxxxxxxxx" # 应用 ID appSecret: "your_lark_app_secret" # 应用密钥 + +# 定时采集配置 +collection: + interval: 300 # 采集间隔 (秒) diff --git a/config.yml b/config.yml index ea5b45c..ace399d 100644 --- a/config.yml +++ b/config.yml @@ -86,4 +86,8 @@ lora_mesh: max_chunk_size: 238 #分片重组超时时间(秒)。如果在一个分片到达后,超过这个时间 # 还没收到完整的包,则认为接收失败。 - reassembly_timeout: 30 \ No newline at end of file + reassembly_timeout: 30 + +# 定时采集配置 +collection: + interval: 300 # 采集间隔 (秒) \ No newline at end of file diff --git a/internal/core/application.go b/internal/core/application.go index 700b01a..454beee 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -11,6 +11,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/app/service" "git.huangwc.com/pig/pig-farm-controller/internal/app/webhook" "git.huangwc.com/pig/pig-farm-controller/internal/domain/audit" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/collection" "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" "git.huangwc.com/pig/pig-farm-controller/internal/domain/pig" @@ -28,11 +29,12 @@ import ( // Application 是整个应用的核心,封装了所有组件和生命周期。 type Application struct { - Config *config.Config - Logger *logs.Logger - Storage database.Storage - Executor *task.Scheduler - API *api.API // 添加 API 对象 + Config *config.Config + Logger *logs.Logger + Storage database.Storage + Executor *task.Scheduler + API *api.API + Collector collection.Collector // 新增的仓库和管理器字段,以便在 initializePendingTasks 中访问 planRepo repository.PlanRepository @@ -177,6 +179,14 @@ func NewApplication(configPath string) (*Application, error) { cfg.Task.NumWorkers, ) + // --- 初始化定时采集器 --- + timedCollector := collection.NewTimedCollector( + deviceRepo, + generalDeviceService, + logger, + time.Duration(cfg.Collection.Interval)*time.Second, + ) + // 初始化 API 服务器 apiServer := api.NewAPI( cfg.Server, @@ -204,6 +214,7 @@ func NewApplication(configPath string) (*Application, error) { Storage: storage, Executor: executor, API: apiServer, + Collector: timedCollector, planRepo: planRepo, pendingTaskRepo: pendingTaskRepo, executionLogRepo: executionLogRepo, @@ -327,6 +338,9 @@ func (app *Application) Start() error { // 启动任务执行器 app.Executor.Start() + // 启动定时采集器 + app.Collector.Start() + // 启动 API 服务器 app.API.Start() @@ -349,6 +363,9 @@ func (app *Application) Stop() error { // 关闭任务执行器 app.Executor.Stop() + // 关闭定时采集器 + app.Collector.Stop() + // 断开数据库连接 if err := app.Storage.Disconnect(); err != nil { app.Logger.Errorw("数据库连接断开失败", "error", err) diff --git a/internal/domain/collection/collector.go b/internal/domain/collection/collector.go new file mode 100644 index 0000000..ae28dd7 --- /dev/null +++ b/internal/domain/collection/collector.go @@ -0,0 +1,6 @@ +package collection + +type Collector interface { + Start() + Stop() +} diff --git a/internal/domain/collection/timed_collector.go b/internal/domain/collection/timed_collector.go new file mode 100644 index 0000000..f6855ad --- /dev/null +++ b/internal/domain/collection/timed_collector.go @@ -0,0 +1,89 @@ +package collection + +import ( + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" +) + +// TimedCollector 实现了 Collector 接口,用于定时从数据库获取设备信息并下发采集指令 +type TimedCollector struct { + deviceRepo repository.DeviceRepository + deviceService device.Service + logger *logs.Logger + interval time.Duration + ticker *time.Ticker + done chan bool +} + +// NewTimedCollector 创建一个定时采集器实例 +func NewTimedCollector( + deviceRepo repository.DeviceRepository, + deviceService device.Service, + logger *logs.Logger, + interval time.Duration, +) Collector { + return &TimedCollector{ + deviceRepo: deviceRepo, + deviceService: deviceService, + logger: logger, + interval: interval, + done: make(chan bool), + } +} + +// Start 开始定时采集 +func (c *TimedCollector) Start() { + c.logger.Infof("定时采集器启动,采集间隔: %s", c.interval) + c.ticker = time.NewTicker(c.interval) + go func() { + for { + select { + case <-c.done: + return + case <-c.ticker.C: + c.collect() + } + } + }() +} + +// Stop 停止定时采集 +func (c *TimedCollector) Stop() { + c.logger.Info("定时采集器停止") + c.ticker.Stop() + c.done <- true +} + +// collect 是核心的采集逻辑 +func (c *TimedCollector) collect() { + c.logger.Info("开始新一轮的设备数据采集") + + sensors, err := c.deviceRepo.ListAllSensors() + if err != nil { + c.logger.Errorf("采集周期: 从数据库获取所有传感器失败: %v", err) + return + } + + if len(sensors) == 0 { + c.logger.Info("采集周期: 未发现任何传感器设备,跳过本次采集") + return + } + + sensorsByController := make(map[uint][]*models.Device) + for _, sensor := range sensors { + sensorsByController[sensor.AreaControllerID] = append(sensorsByController[sensor.AreaControllerID], sensor) + } + + for controllerID, controllerSensors := range sensorsByController { + c.logger.Infof("采集周期: 准备为区域主控 %d 下的 %d 个传感器下发采集指令", controllerID, len(controllerSensors)) + if err := c.deviceService.Collect(controllerID, controllerSensors); err != nil { + c.logger.Errorf("采集周期: 为区域主控 %d 下发采集指令失败: %v", controllerID, err) + } + } + + c.logger.Info("本轮设备数据采集完成") +} diff --git a/internal/infra/config/config.go b/internal/infra/config/config.go index 1a1cfb4..b22f97b 100644 --- a/internal/infra/config/config.go +++ b/internal/infra/config/config.go @@ -44,6 +44,9 @@ type Config struct { // Notify 通知服务配置 Notify NotifyConfig `yaml:"notify"` + + // Collection 定时采集配置 + Collection CollectionConfig `yaml:"collection"` } // AppConfig 代表应用基础配置 @@ -195,6 +198,11 @@ type LarkConfig struct { AppSecret string `yaml:"appSecret"` } +// CollectionConfig 代表定时采集配置 +type CollectionConfig struct { + Interval int `yaml:"interval"` +} + // NewConfig 创建并返回一个新的配置实例 func NewConfig() *Config { // 默认值可以在这里设置,但我们优先使用配置文件中的值 diff --git a/internal/infra/repository/device_repository.go b/internal/infra/repository/device_repository.go index c22a584..96104cb 100644 --- a/internal/infra/repository/device_repository.go +++ b/internal/infra/repository/device_repository.go @@ -23,6 +23,9 @@ type DeviceRepository interface { // ListAll 获取所有设备的列表 ListAll() ([]*models.Device, error) + // ListAllSensors 获取所有传感器类型的设备列表 + ListAllSensors() ([]*models.Device, error) + // ListByAreaControllerID 根据区域主控 ID 列出所有子设备。 ListByAreaControllerID(areaControllerID uint) ([]*models.Device, error) @@ -84,6 +87,19 @@ func (r *gormDeviceRepository) ListAll() ([]*models.Device, error) { return devices, nil } +// ListAllSensors 检索归类为传感器的所有设备 +func (r *gormDeviceRepository) ListAllSensors() ([]*models.Device, error) { + var sensors []*models.Device + err := r.db.Preload("AreaController").Preload("DeviceTemplate"). + Joins("JOIN device_templates ON device_templates.id = devices.device_template_id"). + Where("device_templates.category = ?", models.CategorySensor). + Find(&sensors).Error + if err != nil { + return nil, fmt.Errorf("查询所有传感器失败: %w", err) + } + return sensors, nil +} + // ListByAreaControllerID 根据区域主控 ID 列出所有子设备 func (r *gormDeviceRepository) ListByAreaControllerID(areaControllerID uint) ([]*models.Device, error) { var devices []*models.Device From 40eb57ee47eb9ed1f8bba5e8ef8fc0fa0960685c Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Sun, 26 Oct 2025 15:48:38 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E9=87=8D=E6=9E=84core=E5=8C=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/app/api/api.go | 4 +- .../app/controller/user/user_controller.go | 4 +- internal/app/middleware/auth.go | 2 +- internal/core/application.go | 395 ++++-------------- internal/core/initializers.go | 362 ++++++++++++++++ internal/domain/token/token_service.go | 10 +- 6 files changed, 444 insertions(+), 333 deletions(-) create mode 100644 internal/core/initializers.go diff --git a/internal/app/api/api.go b/internal/app/api/api.go index 835bf82..5200b7e 100644 --- a/internal/app/api/api.go +++ b/internal/app/api/api.go @@ -43,7 +43,7 @@ type API struct { engine *gin.Engine // Gin 引擎实例,用于处理 HTTP 请求 logger *logs.Logger // 日志记录器,用于输出日志信息 userRepo repository.UserRepository // 用户数据仓库接口,用于用户数据操作 - tokenService token.TokenService // Token 服务接口,用于 JWT token 的生成和解析 + tokenService token.Service // Token 服务接口,用于 JWT token 的生成和解析 auditService audit.Service // 审计服务,用于记录用户操作 httpServer *http.Server // 标准库的 HTTP 服务器实例,用于启动和停止服务 config config.ServerConfig // API 服务器的配置,使用 infra/config 包中的 ServerConfig @@ -69,7 +69,7 @@ func NewAPI(cfg config.ServerConfig, pigFarmService service.PigFarmService, pigBatchService service.PigBatchService, monitorService service.MonitorService, - tokenService token.TokenService, + tokenService token.Service, auditService audit.Service, notifyService domain_notify.Service, deviceService domain_device.Service, diff --git a/internal/app/controller/user/user_controller.go b/internal/app/controller/user/user_controller.go index 5fca24a..6ddd6d9 100644 --- a/internal/app/controller/user/user_controller.go +++ b/internal/app/controller/user/user_controller.go @@ -20,7 +20,7 @@ import ( type Controller struct { userRepo repository.UserRepository monitorService service.MonitorService - tokenService token.TokenService + tokenService token.Service notifyService domain_notify.Service logger *logs.Logger } @@ -30,7 +30,7 @@ func NewController( userRepo repository.UserRepository, monitorService service.MonitorService, logger *logs.Logger, - tokenService token.TokenService, + tokenService token.Service, notifyService domain_notify.Service, ) *Controller { return &Controller{ diff --git a/internal/app/middleware/auth.go b/internal/app/middleware/auth.go index e370954..d0fbdc1 100644 --- a/internal/app/middleware/auth.go +++ b/internal/app/middleware/auth.go @@ -14,7 +14,7 @@ import ( // AuthMiddleware 创建一个Gin中间件,用于JWT身份验证 // 它依赖于 TokenService 来解析和验证 token,并使用 UserRepository 来获取完整的用户信息 -func AuthMiddleware(tokenService token.TokenService, userRepo repository.UserRepository) gin.HandlerFunc { +func AuthMiddleware(tokenService token.Service, userRepo repository.UserRepository) gin.HandlerFunc { return func(c *gin.Context) { // 从 Authorization header 获取 token authHeader := c.GetHeader("Authorization") diff --git a/internal/core/application.go b/internal/core/application.go index 454beee..fcc0904 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -5,346 +5,97 @@ import ( "os" "os/signal" "syscall" - "time" "git.huangwc.com/pig/pig-farm-controller/internal/app/api" - "git.huangwc.com/pig/pig-farm-controller/internal/app/service" - "git.huangwc.com/pig/pig-farm-controller/internal/app/webhook" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/audit" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/collection" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" - domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/pig" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/task" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/token" "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/database" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/notify" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora" ) // Application 是整个应用的核心,封装了所有组件和生命周期。 type Application struct { - Config *config.Config - Logger *logs.Logger - Storage database.Storage - Executor *task.Scheduler - API *api.API - Collector collection.Collector + Config *config.Config + Logger *logs.Logger + API *api.API - // 新增的仓库和管理器字段,以便在 initializePendingTasks 中访问 - planRepo repository.PlanRepository - pendingTaskRepo repository.PendingTaskRepository - executionLogRepo repository.ExecutionLogRepository - pendingCollectionRepo repository.PendingCollectionRepository - analysisPlanTaskManager *task.AnalysisPlanTaskManager - - // Lora Mesh 监听器 - loraMeshCommunicator transport.Listener - - // 通知服务 - NotifyService domain_notify.Service + Infra *Infrastructure + Domain *DomainServices + App *AppServices } // NewApplication 创建并初始化一个新的 Application 实例。 // 这是应用的“组合根”,所有依赖都在这里被创建和注入。 func NewApplication(configPath string) (*Application, error) { - // 加载配置 + // 1. 初始化基本组件: 配置和日志 cfg := config.NewConfig() if err := cfg.Load(configPath); err != nil { return nil, fmt.Errorf("无法加载配置: %w", err) } - - // 初始化日志记录器 logger := logs.NewLogger(cfg.Log) - // 初始化数据库存储 - storage, err := initStorage(cfg.Database, logger) + // 2. 初始化所有分层服务 + infra, err := initInfrastructure(cfg, logger) if err != nil { - return nil, err // 错误已在 initStorage 中被包装 + return nil, fmt.Errorf("初始化基础设施失败: %w", err) } + domain := initDomainServices(cfg, infra, logger) + appServices := initAppServices(infra, domain, logger) - // 初始化 Token 服务 - tokenService := token.NewTokenService([]byte(cfg.App.JWTSecret)) - - // --- 仓库对象初始化 --- - userRepo := repository.NewGormUserRepository(storage.GetDB()) - deviceRepo := repository.NewGormDeviceRepository(storage.GetDB()) - areaControllerRepo := repository.NewGormAreaControllerRepository(storage.GetDB()) - deviceTemplateRepo := repository.NewGormDeviceTemplateRepository(storage.GetDB()) - planRepo := repository.NewGormPlanRepository(storage.GetDB()) - pendingTaskRepo := repository.NewGormPendingTaskRepository(storage.GetDB()) - executionLogRepo := repository.NewGormExecutionLogRepository(storage.GetDB()) - sensorDataRepo := repository.NewGormSensorDataRepository(storage.GetDB()) - deviceCommandLogRepo := repository.NewGormDeviceCommandLogRepository(storage.GetDB()) - pendingCollectionRepo := repository.NewGormPendingCollectionRepository(storage.GetDB()) - userActionLogRepo := repository.NewGormUserActionLogRepository(storage.GetDB()) - pigBatchRepo := repository.NewGormPigBatchRepository(storage.GetDB()) - pigBatchLogRepo := repository.NewGormPigBatchLogRepository(storage.GetDB()) - pigFarmRepo := repository.NewGormPigFarmRepository(storage.GetDB()) - pigPenRepo := repository.NewGormPigPenRepository(storage.GetDB()) - pigTransferLogRepo := repository.NewGormPigTransferLogRepository(storage.GetDB()) - pigTradeRepo := repository.NewGormPigTradeRepository(storage.GetDB()) - pigSickPigLogRepo := repository.NewGormPigSickLogRepository(storage.GetDB()) - medicationLogRepo := repository.NewGormMedicationLogRepository(storage.GetDB()) - rawMaterialRepo := repository.NewGormRawMaterialRepository(storage.GetDB()) - notificationRepo := repository.NewGormNotificationRepository(storage.GetDB()) - - // 初始化事务管理器 - unitOfWork := repository.NewGormUnitOfWork(storage.GetDB(), logger) - - // 初始化猪群管理领域 - pigPenTransferManager := pig.NewPigPenTransferManager(pigPenRepo, pigTransferLogRepo, pigBatchRepo) - pigTradeManager := pig.NewPigTradeManager(pigTradeRepo) - pigSickManager := pig.NewSickPigManager(pigSickPigLogRepo, medicationLogRepo) - pigBatchDomain := pig.NewPigBatchService(pigBatchRepo, pigBatchLogRepo, unitOfWork, - pigPenTransferManager, pigTradeManager, pigSickManager) - - // --- 业务逻辑处理器初始化 --- - pigFarmService := service.NewPigFarmService(pigFarmRepo, pigPenRepo, pigBatchRepo, pigBatchDomain, unitOfWork, logger) - pigBatchService := service.NewPigBatchService(pigBatchDomain, logger) - monitorService := service.NewMonitorService( - sensorDataRepo, - deviceCommandLogRepo, - executionLogRepo, - pendingCollectionRepo, - userActionLogRepo, - rawMaterialRepo, - medicationLogRepo, - pigBatchRepo, - pigBatchLogRepo, - pigTransferLogRepo, - pigSickPigLogRepo, - pigTradeRepo, - notificationRepo, - ) - - // 初始化审计服务 - auditService := audit.NewService(userActionLogRepo, logger) - - // 初始化通知服务 - notifyService, err := initNotifyService(cfg.Notify, logger, userRepo, notificationRepo) - if err != nil { - return nil, fmt.Errorf("初始化通知服务失败: %w", err) - } - - // --- 初始化 LoRa 相关组件 --- - var listenHandler webhook.ListenHandler - var comm transport.Communicator - var loraListener transport.Listener - - if cfg.Lora.Mode == config.LoraMode_LoRaWAN { - logger.Info("当前运行模式: lora_wan。初始化 ChirpStack 监听器和传输层。") - listenHandler = webhook.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, areaControllerRepo, deviceCommandLogRepo, pendingCollectionRepo) - comm = lora.NewChirpStackTransport(cfg.ChirpStack, logger) - loraListener = lora.NewPlaceholderTransport(logger) - } else { - logger.Info("当前运行模式: lora_mesh。初始化 LoRa Mesh 传输层和占位符监听器。") - listenHandler = webhook.NewPlaceholderListener(logger) - tp, err := lora.NewLoRaMeshUartPassthroughTransport(cfg.LoraMesh, logger, areaControllerRepo, pendingCollectionRepo, deviceRepo, sensorDataRepo) - loraListener = tp - comm = tp - if err != nil { - return nil, fmt.Errorf("无法初始化 LoRa Mesh 模块: %w", err) - } - } - - // 初始化计划触发器管理器 - analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger) - - // 初始化通用设备服务 - generalDeviceService := device.NewGeneralDeviceService( - deviceRepo, - deviceCommandLogRepo, - pendingCollectionRepo, - logger, - comm, - ) - - // 初始化任务执行器 - executor := task.NewScheduler( - pendingTaskRepo, - executionLogRepo, - deviceRepo, - sensorDataRepo, - planRepo, - analysisPlanTaskManager, - logger, - generalDeviceService, - time.Duration(cfg.Task.Interval)*time.Second, - cfg.Task.NumWorkers, - ) - - // --- 初始化定时采集器 --- - timedCollector := collection.NewTimedCollector( - deviceRepo, - generalDeviceService, - logger, - time.Duration(cfg.Collection.Interval)*time.Second, - ) - - // 初始化 API 服务器 + // 3. 初始化 API 入口点 apiServer := api.NewAPI( cfg.Server, logger, - userRepo, - deviceRepo, - areaControllerRepo, - deviceTemplateRepo, - planRepo, - pigFarmService, - pigBatchService, - monitorService, - tokenService, - auditService, - notifyService, - generalDeviceService, - listenHandler, - analysisPlanTaskManager, + infra.Repos.UserRepo, + infra.Repos.DeviceRepo, + infra.Repos.AreaControllerRepo, + infra.Repos.DeviceTemplateRepo, + infra.Repos.PlanRepo, + appServices.PigFarmService, + appServices.PigBatchService, + appServices.MonitorService, + infra.TokenService, + appServices.AuditService, + infra.NotifyService, + domain.GeneralDeviceService, + infra.Lora.ListenHandler, + domain.AnalysisPlanTaskManager, ) - // 组装 Application 对象 + // 4. 组装 Application 对象 app := &Application{ - Config: cfg, - Logger: logger, - Storage: storage, - Executor: executor, - API: apiServer, - Collector: timedCollector, - planRepo: planRepo, - pendingTaskRepo: pendingTaskRepo, - executionLogRepo: executionLogRepo, - pendingCollectionRepo: pendingCollectionRepo, - analysisPlanTaskManager: analysisPlanTaskManager, - loraMeshCommunicator: loraListener, - NotifyService: notifyService, + Config: cfg, + Logger: logger, + API: apiServer, + Infra: infra, + Domain: domain, + App: appServices, } return app, nil } -// initNotifyService 根据配置初始化并返回一个通知领域服务。 -// 它确保至少有一个 LogNotifier 总是可用,并根据配置启用其他通知器。 -func initNotifyService( - cfg config.NotifyConfig, - log *logs.Logger, - userRepo repository.UserRepository, - notificationRepo repository.NotificationRepository, -) (domain_notify.Service, error) { - var availableNotifiers []notify.Notifier - - // 1. 总是创建 LogNotifier 作为所有告警的最终记录渠道 - logNotifier := notify.NewLogNotifier(log) - availableNotifiers = append(availableNotifiers, logNotifier) - log.Info("Log通知器已启用 (作为所有告警的最终记录渠道)") - - // 2. 根据配置,按需创建并收集所有启用的其他 Notifier 实例 - if cfg.SMTP.Enabled { - smtpNotifier := notify.NewSMTPNotifier( - cfg.SMTP.Host, - cfg.SMTP.Port, - cfg.SMTP.Username, - cfg.SMTP.Password, - cfg.SMTP.Sender, - ) - availableNotifiers = append(availableNotifiers, smtpNotifier) - log.Info("SMTP通知器已启用") - } - - if cfg.WeChat.Enabled { - wechatNotifier := notify.NewWechatNotifier( - cfg.WeChat.CorpID, - cfg.WeChat.AgentID, - cfg.WeChat.Secret, - ) - availableNotifiers = append(availableNotifiers, wechatNotifier) - log.Info("企业微信通知器已启用") - } - - if cfg.Lark.Enabled { - larkNotifier := notify.NewLarkNotifier( - cfg.Lark.AppID, - cfg.Lark.AppSecret, - ) - availableNotifiers = append(availableNotifiers, larkNotifier) - log.Info("飞书通知器已启用") - } - - // 3. 动态确定首选通知器 - var primaryNotifier notify.Notifier - primaryNotifierType := notify.NotifierType(cfg.Primary) - - // 检查用户指定的主渠道是否已启用 - for _, n := range availableNotifiers { - if n.Type() == primaryNotifierType { - primaryNotifier = n - break - } - } - - // 如果用户指定的主渠道未启用或未指定,则自动选择第一个可用的 (这将是 LogNotifier,如果其他都未启用) - if primaryNotifier == nil { - primaryNotifier = availableNotifiers[0] // 确保总能找到一个,因为 LogNotifier 总是存在的 - log.Warnf("配置的首选渠道 '%s' 未启用或未指定,已自动降级使用 '%s' 作为首选渠道。", cfg.Primary, primaryNotifier.Type()) - } - - // 4. 使用创建的 Notifier 列表和 notificationRepo 来组装领域服务 - notifyService, err := domain_notify.NewFailoverService( - log, - userRepo, - availableNotifiers, - primaryNotifier.Type(), - cfg.FailureThreshold, - notificationRepo, - ) - if err != nil { - return nil, fmt.Errorf("创建故障转移通知服务失败: %w", err) - } - - log.Infof("通知服务初始化成功,首选渠道: %s, 故障阈值: %d", primaryNotifier.Type(), cfg.FailureThreshold) - return notifyService, nil -} - // Start 启动应用的所有组件并阻塞,直到接收到关闭信号。 func (app *Application) Start() error { app.Logger.Info("应用启动中...") - // -- 启动 LoRa Mesh 监听器 - if err := app.loraMeshCommunicator.Listen(); err != nil { + // 1. 启动底层监听器 + if err := app.Infra.Lora.LoraListener.Listen(); err != nil { return fmt.Errorf("启动 LoRa Mesh 监听器失败: %w", err) } - // --- 清理待采集任务 --- - if err := app.initializePendingCollections(); err != nil { - // 这是一个非致命错误,记录它,但应用应继续启动 - app.Logger.Error(err) + // 2. 初始化应用状态 (清理、刷新任务等) + if err := app.initializeState(); err != nil { + return fmt.Errorf("初始化应用状态失败: %w", err) } - // --- 初始化待执行任务列表 --- - if err := app.initializePendingTasks( - app.planRepo, // 传入 planRepo - app.pendingTaskRepo, // 传入 pendingTaskRepo - app.executionLogRepo, // 传入 executionLogRepo - app.analysisPlanTaskManager, // 传入 analysisPlanTaskManager - app.Logger, // 传入 logger - ); err != nil { - return fmt.Errorf("初始化待执行任务列表失败: %w", err) - } + // 3. 启动后台工作协程 + app.Domain.Scheduler.Start() + app.Domain.TimedCollector.Start() - // 启动任务执行器 - app.Executor.Start() - - // 启动定时采集器 - app.Collector.Start() - - // 启动 API 服务器 + // 4. 启动 API 服务器 app.API.Start() - // 等待关闭信号 + // 5. 等待关闭信号 quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit @@ -361,18 +112,18 @@ func (app *Application) Stop() error { app.API.Stop() // 关闭任务执行器 - app.Executor.Stop() + app.Domain.Scheduler.Stop() // 关闭定时采集器 - app.Collector.Stop() + app.Domain.TimedCollector.Stop() // 断开数据库连接 - if err := app.Storage.Disconnect(); err != nil { + if err := app.Infra.Storage.Disconnect(); err != nil { app.Logger.Errorw("数据库连接断开失败", "error", err) } // 关闭 LoRa Mesh 监听器 - if err := app.loraMeshCommunicator.Stop(); err != nil { + if err := app.Infra.Lora.LoraListener.Stop(); err != nil { app.Logger.Errorw("LoRa Mesh 监听器关闭失败", "error", err) } @@ -383,6 +134,22 @@ func (app *Application) Stop() error { return nil } +// initializeState 在应用启动时准备其初始数据状态。 +// 这包括清理任何因上次异常关闭而留下的悬空任务或请求。 +func (app *Application) initializeState() error { + // 清理待采集任务 (非致命错误) + if err := app.initializePendingCollections(); err != nil { + app.Logger.Errorw("清理待采集任务时发生非致命错误", "error", err) + } + + // 初始化待执行任务列表 (致命错误) + if err := app.initializePendingTasks(); err != nil { + return fmt.Errorf("初始化待执行任务列表失败: %w", err) + } + + return nil +} + // initializePendingCollections 在应用启动时处理所有未完成的采集请求。 // 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。 // 这保证了系统在每次启动时都处于一个干净、确定的状态。 @@ -390,7 +157,7 @@ func (app *Application) initializePendingCollections() error { app.Logger.Info("开始清理所有未完成的采集请求...") // 直接将所有 'pending' 状态的请求更新为 'timed_out'。 - count, err := app.pendingCollectionRepo.MarkAllPendingAsTimedOut() + count, err := app.Infra.Repos.PendingCollectionRepo.MarkAllPendingAsTimedOut() if err != nil { return fmt.Errorf("清理未完成的采集请求失败: %v", err) } else if count > 0 { @@ -403,13 +170,13 @@ func (app *Application) initializePendingCollections() error { } // initializePendingTasks 在应用启动时清理并刷新待执行任务列表。 -func (app *Application) initializePendingTasks( - planRepo repository.PlanRepository, - pendingTaskRepo repository.PendingTaskRepository, - executionLogRepo repository.ExecutionLogRepository, - analysisPlanTaskManager *task.AnalysisPlanTaskManager, - logger *logs.Logger, -) error { +func (app *Application) initializePendingTasks() error { + logger := app.Logger + planRepo := app.Infra.Repos.PlanRepo + pendingTaskRepo := app.Infra.Repos.PendingTaskRepo + executionLogRepo := app.Infra.Repos.ExecutionLogRepo + analysisPlanTaskManager := app.Domain.AnalysisPlanTaskManager + logger.Info("开始初始化待执行任务列表...") // 阶段一:修正因崩溃导致状态不一致的固定次数计划 @@ -496,21 +263,3 @@ func (app *Application) initializePendingTasks( logger.Info("待执行任务列表初始化完成。") return nil } - -// initStorage 封装了数据库的初始化、连接和迁移逻辑。 -func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Storage, error) { - // 创建存储实例 - storage := database.NewStorage(cfg, logger) - if err := storage.Connect(); err != nil { - // 错误已在 Connect 内部被记录,这里只需包装并返回 - return nil, fmt.Errorf("数据库连接失败: %w", err) - } - - // 执行数据库迁移 - if err := storage.Migrate(models.GetAllModels()...); err != nil { - return nil, fmt.Errorf("数据库迁移失败: %w", err) - } - - logger.Info("数据库初始化完成。") - return storage, nil -} diff --git a/internal/core/initializers.go b/internal/core/initializers.go new file mode 100644 index 0000000..b9819b3 --- /dev/null +++ b/internal/core/initializers.go @@ -0,0 +1,362 @@ +package core + +import ( + "fmt" + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/app/service" + "git.huangwc.com/pig/pig-farm-controller/internal/app/webhook" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/audit" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/collection" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" + domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/pig" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/task" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/token" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/database" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/notify" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora" + "gorm.io/gorm" +) + +// Infrastructure 聚合了所有基础设施层的组件。 +type Infrastructure struct { + Storage database.Storage + Repos *Repositories + Lora *LoraComponents + NotifyService domain_notify.Service + TokenService token.Service +} + +// initInfrastructure 初始化所有基础设施层组件。 +func initInfrastructure(cfg *config.Config, logger *logs.Logger) (*Infrastructure, error) { + storage, err := initStorage(cfg.Database, logger) + if err != nil { + return nil, err + } + + repos := initRepositories(storage.GetDB(), logger) + + lora, err := initLora(cfg, logger, repos) + if err != nil { + return nil, err + } + + notifyService, err := initNotifyService(cfg.Notify, logger, repos.UserRepo, repos.NotificationRepo) + if err != nil { + return nil, fmt.Errorf("初始化通知服务失败: %w", err) + } + + tokenService := token.NewTokenService([]byte(cfg.App.JWTSecret)) + + return &Infrastructure{ + Storage: storage, + Repos: repos, + Lora: lora, + NotifyService: notifyService, + TokenService: tokenService, + }, nil +} + +// Repositories 聚合了所有的仓库实例。 +type Repositories struct { + UserRepo repository.UserRepository + DeviceRepo repository.DeviceRepository + AreaControllerRepo repository.AreaControllerRepository + DeviceTemplateRepo repository.DeviceTemplateRepository + PlanRepo repository.PlanRepository + PendingTaskRepo repository.PendingTaskRepository + ExecutionLogRepo repository.ExecutionLogRepository + SensorDataRepo repository.SensorDataRepository + DeviceCommandLogRepo repository.DeviceCommandLogRepository + PendingCollectionRepo repository.PendingCollectionRepository + UserActionLogRepo repository.UserActionLogRepository + PigBatchRepo repository.PigBatchRepository + PigBatchLogRepo repository.PigBatchLogRepository + PigFarmRepo repository.PigFarmRepository + PigPenRepo repository.PigPenRepository + PigTransferLogRepo repository.PigTransferLogRepository + PigTradeRepo repository.PigTradeRepository + PigSickPigLogRepo repository.PigSickLogRepository + MedicationLogRepo repository.MedicationLogRepository + RawMaterialRepo repository.RawMaterialRepository + NotificationRepo repository.NotificationRepository + UnitOfWork repository.UnitOfWork +} + +// initRepositories 初始化所有的仓库。 +func initRepositories(db *gorm.DB, logger *logs.Logger) *Repositories { + return &Repositories{ + UserRepo: repository.NewGormUserRepository(db), + DeviceRepo: repository.NewGormDeviceRepository(db), + AreaControllerRepo: repository.NewGormAreaControllerRepository(db), + DeviceTemplateRepo: repository.NewGormDeviceTemplateRepository(db), + PlanRepo: repository.NewGormPlanRepository(db), + PendingTaskRepo: repository.NewGormPendingTaskRepository(db), + ExecutionLogRepo: repository.NewGormExecutionLogRepository(db), + SensorDataRepo: repository.NewGormSensorDataRepository(db), + DeviceCommandLogRepo: repository.NewGormDeviceCommandLogRepository(db), + PendingCollectionRepo: repository.NewGormPendingCollectionRepository(db), + UserActionLogRepo: repository.NewGormUserActionLogRepository(db), + PigBatchRepo: repository.NewGormPigBatchRepository(db), + PigBatchLogRepo: repository.NewGormPigBatchLogRepository(db), + PigFarmRepo: repository.NewGormPigFarmRepository(db), + PigPenRepo: repository.NewGormPigPenRepository(db), + PigTransferLogRepo: repository.NewGormPigTransferLogRepository(db), + PigTradeRepo: repository.NewGormPigTradeRepository(db), + PigSickPigLogRepo: repository.NewGormPigSickLogRepository(db), + MedicationLogRepo: repository.NewGormMedicationLogRepository(db), + RawMaterialRepo: repository.NewGormRawMaterialRepository(db), + NotificationRepo: repository.NewGormNotificationRepository(db), + UnitOfWork: repository.NewGormUnitOfWork(db, logger), + } +} + +// DomainServices 聚合了所有的领域服务实例。 +type DomainServices struct { + PigPenTransferManager pig.PigPenTransferManager + PigTradeManager pig.PigTradeManager + PigSickManager pig.SickPigManager + PigBatchDomain pig.PigBatchService + TimedCollector collection.Collector + GeneralDeviceService device.Service + AnalysisPlanTaskManager *task.AnalysisPlanTaskManager + Scheduler *task.Scheduler +} + +// initDomainServices 初始化所有的领域服务。 +func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs.Logger) *DomainServices { + // 猪群管理相关 + pigPenTransferManager := pig.NewPigPenTransferManager(infra.Repos.PigPenRepo, infra.Repos.PigTransferLogRepo, infra.Repos.PigBatchRepo) + pigTradeManager := pig.NewPigTradeManager(infra.Repos.PigTradeRepo) + pigSickManager := pig.NewSickPigManager(infra.Repos.PigSickPigLogRepo, infra.Repos.MedicationLogRepo) + pigBatchDomain := pig.NewPigBatchService(infra.Repos.PigBatchRepo, infra.Repos.PigBatchLogRepo, infra.Repos.UnitOfWork, + pigPenTransferManager, pigTradeManager, pigSickManager) + + // 通用设备服务 + generalDeviceService := device.NewGeneralDeviceService( + infra.Repos.DeviceRepo, + infra.Repos.DeviceCommandLogRepo, + infra.Repos.PendingCollectionRepo, + logger, + infra.Lora.Comm, + ) + + // 计划任务管理器 + analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(infra.Repos.PlanRepo, infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, logger) + + // 任务执行器 + scheduler := task.NewScheduler( + infra.Repos.PendingTaskRepo, + infra.Repos.ExecutionLogRepo, + infra.Repos.DeviceRepo, + infra.Repos.SensorDataRepo, + infra.Repos.PlanRepo, + analysisPlanTaskManager, + logger, + generalDeviceService, + time.Duration(cfg.Task.Interval)*time.Second, + cfg.Task.NumWorkers, + ) + + // 定时采集器 + timedCollector := collection.NewTimedCollector( + infra.Repos.DeviceRepo, + generalDeviceService, + logger, + time.Duration(cfg.Collection.Interval)*time.Second, + ) + + return &DomainServices{ + PigPenTransferManager: pigPenTransferManager, + PigTradeManager: pigTradeManager, + PigSickManager: pigSickManager, + PigBatchDomain: pigBatchDomain, + GeneralDeviceService: generalDeviceService, + AnalysisPlanTaskManager: analysisPlanTaskManager, + Scheduler: scheduler, + TimedCollector: timedCollector, + } +} + +// AppServices 聚合了所有的应用服务实例。 +type AppServices struct { + PigFarmService service.PigFarmService + PigBatchService service.PigBatchService + MonitorService service.MonitorService + AuditService audit.Service +} + +// initAppServices 初始化所有的应用服务。 +func initAppServices(infra *Infrastructure, domainServices *DomainServices, logger *logs.Logger) *AppServices { + pigFarmService := service.NewPigFarmService(infra.Repos.PigFarmRepo, infra.Repos.PigPenRepo, infra.Repos.PigBatchRepo, domainServices.PigBatchDomain, infra.Repos.UnitOfWork, logger) + pigBatchService := service.NewPigBatchService(domainServices.PigBatchDomain, logger) + monitorService := service.NewMonitorService( + infra.Repos.SensorDataRepo, + infra.Repos.DeviceCommandLogRepo, + infra.Repos.ExecutionLogRepo, + infra.Repos.PendingCollectionRepo, + infra.Repos.UserActionLogRepo, + infra.Repos.RawMaterialRepo, + infra.Repos.MedicationLogRepo, + infra.Repos.PigBatchRepo, + infra.Repos.PigBatchLogRepo, + infra.Repos.PigTransferLogRepo, + infra.Repos.PigSickPigLogRepo, + infra.Repos.PigTradeRepo, + infra.Repos.NotificationRepo, + ) + auditService := audit.NewService(infra.Repos.UserActionLogRepo, logger) + + return &AppServices{ + PigFarmService: pigFarmService, + PigBatchService: pigBatchService, + MonitorService: monitorService, + AuditService: auditService, + } +} + +// LoraComponents 聚合了所有 LoRa 相关组件。 +type LoraComponents struct { + ListenHandler webhook.ListenHandler + Comm transport.Communicator + LoraListener transport.Listener +} + +// initLora 根据配置初始化 LoRa 相关组件。 +func initLora( + cfg *config.Config, + logger *logs.Logger, + repos *Repositories, +) (*LoraComponents, error) { + var listenHandler webhook.ListenHandler + var comm transport.Communicator + var loraListener transport.Listener + + if cfg.Lora.Mode == config.LoraMode_LoRaWAN { + logger.Info("当前运行模式: lora_wan。初始化 ChirpStack 监听器和传输层。") + listenHandler = webhook.NewChirpStackListener(logger, repos.SensorDataRepo, repos.DeviceRepo, repos.AreaControllerRepo, repos.DeviceCommandLogRepo, repos.PendingCollectionRepo) + comm = lora.NewChirpStackTransport(cfg.ChirpStack, logger) + loraListener = lora.NewPlaceholderTransport(logger) + } else { + logger.Info("当前运行模式: lora_mesh。初始化 LoRa Mesh 传输层和占位符监听器。") + listenHandler = webhook.NewPlaceholderListener(logger) + tp, err := lora.NewLoRaMeshUartPassthroughTransport(cfg.LoraMesh, logger, repos.AreaControllerRepo, repos.PendingCollectionRepo, repos.DeviceRepo, repos.SensorDataRepo) + if err != nil { + return nil, fmt.Errorf("无法初始化 LoRa Mesh 模块: %w", err) + } + loraListener = tp + comm = tp + } + + return &LoraComponents{ + ListenHandler: listenHandler, + Comm: comm, + LoraListener: loraListener, + }, nil +} + +// initNotifyService 根据配置初始化并返回一个通知领域服务。 +// 它确保至少有一个 LogNotifier 总是可用,并根据配置启用其他通知器。 +func initNotifyService( + cfg config.NotifyConfig, + log *logs.Logger, + userRepo repository.UserRepository, + notificationRepo repository.NotificationRepository, +) (domain_notify.Service, error) { + var availableNotifiers []notify.Notifier + + // 1. 总是创建 LogNotifier 作为所有告警的最终记录渠道 + logNotifier := notify.NewLogNotifier(log) + availableNotifiers = append(availableNotifiers, logNotifier) + log.Info("Log通知器已启用 (作为所有告警的最终记录渠道)") + + // 2. 根据配置,按需创建并收集所有启用的其他 Notifier 实例 + if cfg.SMTP.Enabled { + smtpNotifier := notify.NewSMTPNotifier( + cfg.SMTP.Host, + cfg.SMTP.Port, + cfg.SMTP.Username, + cfg.SMTP.Password, + cfg.SMTP.Sender, + ) + availableNotifiers = append(availableNotifiers, smtpNotifier) + log.Info("SMTP通知器已启用") + } + + if cfg.WeChat.Enabled { + wechatNotifier := notify.NewWechatNotifier( + cfg.WeChat.CorpID, + cfg.WeChat.AgentID, + cfg.WeChat.Secret, + ) + availableNotifiers = append(availableNotifiers, wechatNotifier) + log.Info("企业微信通知器已启用") + } + + if cfg.Lark.Enabled { + larkNotifier := notify.NewLarkNotifier( + cfg.Lark.AppID, + cfg.Lark.AppSecret, + ) + availableNotifiers = append(availableNotifiers, larkNotifier) + log.Info("飞书通知器已启用") + } + + // 3. 动态确定首选通知器 + var primaryNotifier notify.Notifier + primaryNotifierType := notify.NotifierType(cfg.Primary) + + // 检查用户指定的主渠道是否已启用 + for _, n := range availableNotifiers { + if n.Type() == primaryNotifierType { + primaryNotifier = n + break + } + } + + // 如果用户指定的主渠道未启用或未指定,则自动选择第一个可用的 (这将是 LogNotifier,如果其他都未启用) + if primaryNotifier == nil { + primaryNotifier = availableNotifiers[0] // 确保总能找到一个,因为 LogNotifier 总是存在的 + log.Warnf("配置的首选渠道 '%s' 未启用或未指定,已自动降级使用 '%s' 作为首选渠道。", cfg.Primary, primaryNotifier.Type()) + } + + // 4. 使用创建的 Notifier 列表和 notificationRepo 来组装领域服务 + notifyService, err := domain_notify.NewFailoverService( + log, + userRepo, + availableNotifiers, + primaryNotifier.Type(), + cfg.FailureThreshold, + notificationRepo, + ) + if err != nil { + return nil, fmt.Errorf("创建故障转移通知服务失败: %w", err) + } + + log.Infof("通知服务初始化成功,首选渠道: %s, 故障阈值: %d", primaryNotifier.Type(), cfg.FailureThreshold) + return notifyService, nil +} + +// initStorage 封装了数据库的初始化、连接和迁移逻辑。 +func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Storage, error) { + // 创建存储实例 + storage := database.NewStorage(cfg, logger) + if err := storage.Connect(); err != nil { + // 错误已在 Connect 内部被记录,这里只需包装并返回 + return nil, fmt.Errorf("数据库连接失败: %w", err) + } + + // 执行数据库迁移 + if err := storage.Migrate(models.GetAllModels()...); err != nil { + return nil, fmt.Errorf("数据库迁移失败: %w", err) + } + + logger.Info("数据库初始化完成。") + return storage, nil +} diff --git a/internal/domain/token/token_service.go b/internal/domain/token/token_service.go index 361e305..782f041 100644 --- a/internal/domain/token/token_service.go +++ b/internal/domain/token/token_service.go @@ -13,19 +13,19 @@ type Claims struct { jwt.RegisteredClaims } -// TokenService 定义了 token 操作的接口 -type TokenService interface { +// Service 定义了 token 操作的接口 +type Service interface { GenerateToken(userID uint) (string, error) ParseToken(tokenString string) (*Claims, error) } -// tokenService 是 TokenService 接口的实现 +// tokenService 是 Service 接口的实现 type tokenService struct { secret []byte } -// NewTokenService 创建并返回一个新的 TokenService 实例 -func NewTokenService(secret []byte) TokenService { +// NewTokenService 创建并返回一个新的 Service 实例 +func NewTokenService(secret []byte) Service { return &tokenService{secret: secret} }