重构core包

This commit is contained in:
2025-10-26 15:48:38 +08:00
parent 6a8e8f1f7d
commit 40eb57ee47
6 changed files with 444 additions and 333 deletions

View File

@@ -5,346 +5,97 @@ import (
"os"
"os/signal"
"syscall"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/app/api"
"git.huangwc.com/pig/pig-farm-controller/internal/app/service"
"git.huangwc.com/pig/pig-farm-controller/internal/app/webhook"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/collection"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/pig"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/database"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/notify"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora"
)
// Application 是整个应用的核心,封装了所有组件和生命周期。
type Application struct {
Config *config.Config
Logger *logs.Logger
Storage database.Storage
Executor *task.Scheduler
API *api.API
Collector collection.Collector
Config *config.Config
Logger *logs.Logger
API *api.API
// 新增的仓库和管理器字段,以便在 initializePendingTasks 中访问
planRepo repository.PlanRepository
pendingTaskRepo repository.PendingTaskRepository
executionLogRepo repository.ExecutionLogRepository
pendingCollectionRepo repository.PendingCollectionRepository
analysisPlanTaskManager *task.AnalysisPlanTaskManager
// Lora Mesh 监听器
loraMeshCommunicator transport.Listener
// 通知服务
NotifyService domain_notify.Service
Infra *Infrastructure
Domain *DomainServices
App *AppServices
}
// NewApplication 创建并初始化一个新的 Application 实例。
// 这是应用的“组合根”,所有依赖都在这里被创建和注入。
func NewApplication(configPath string) (*Application, error) {
// 加载配置
// 1. 初始化基本组件: 配置和日志
cfg := config.NewConfig()
if err := cfg.Load(configPath); err != nil {
return nil, fmt.Errorf("无法加载配置: %w", err)
}
// 初始化日志记录器
logger := logs.NewLogger(cfg.Log)
// 初始化数据库存储
storage, err := initStorage(cfg.Database, logger)
// 2. 初始化所有分层服务
infra, err := initInfrastructure(cfg, logger)
if err != nil {
return nil, err // 错误已在 initStorage 中被包装
return nil, fmt.Errorf("初始化基础设施失败: %w", err)
}
domain := initDomainServices(cfg, infra, logger)
appServices := initAppServices(infra, domain, logger)
// 初始化 Token 服务
tokenService := token.NewTokenService([]byte(cfg.App.JWTSecret))
// --- 仓库对象初始化 ---
userRepo := repository.NewGormUserRepository(storage.GetDB())
deviceRepo := repository.NewGormDeviceRepository(storage.GetDB())
areaControllerRepo := repository.NewGormAreaControllerRepository(storage.GetDB())
deviceTemplateRepo := repository.NewGormDeviceTemplateRepository(storage.GetDB())
planRepo := repository.NewGormPlanRepository(storage.GetDB())
pendingTaskRepo := repository.NewGormPendingTaskRepository(storage.GetDB())
executionLogRepo := repository.NewGormExecutionLogRepository(storage.GetDB())
sensorDataRepo := repository.NewGormSensorDataRepository(storage.GetDB())
deviceCommandLogRepo := repository.NewGormDeviceCommandLogRepository(storage.GetDB())
pendingCollectionRepo := repository.NewGormPendingCollectionRepository(storage.GetDB())
userActionLogRepo := repository.NewGormUserActionLogRepository(storage.GetDB())
pigBatchRepo := repository.NewGormPigBatchRepository(storage.GetDB())
pigBatchLogRepo := repository.NewGormPigBatchLogRepository(storage.GetDB())
pigFarmRepo := repository.NewGormPigFarmRepository(storage.GetDB())
pigPenRepo := repository.NewGormPigPenRepository(storage.GetDB())
pigTransferLogRepo := repository.NewGormPigTransferLogRepository(storage.GetDB())
pigTradeRepo := repository.NewGormPigTradeRepository(storage.GetDB())
pigSickPigLogRepo := repository.NewGormPigSickLogRepository(storage.GetDB())
medicationLogRepo := repository.NewGormMedicationLogRepository(storage.GetDB())
rawMaterialRepo := repository.NewGormRawMaterialRepository(storage.GetDB())
notificationRepo := repository.NewGormNotificationRepository(storage.GetDB())
// 初始化事务管理器
unitOfWork := repository.NewGormUnitOfWork(storage.GetDB(), logger)
// 初始化猪群管理领域
pigPenTransferManager := pig.NewPigPenTransferManager(pigPenRepo, pigTransferLogRepo, pigBatchRepo)
pigTradeManager := pig.NewPigTradeManager(pigTradeRepo)
pigSickManager := pig.NewSickPigManager(pigSickPigLogRepo, medicationLogRepo)
pigBatchDomain := pig.NewPigBatchService(pigBatchRepo, pigBatchLogRepo, unitOfWork,
pigPenTransferManager, pigTradeManager, pigSickManager)
// --- 业务逻辑处理器初始化 ---
pigFarmService := service.NewPigFarmService(pigFarmRepo, pigPenRepo, pigBatchRepo, pigBatchDomain, unitOfWork, logger)
pigBatchService := service.NewPigBatchService(pigBatchDomain, logger)
monitorService := service.NewMonitorService(
sensorDataRepo,
deviceCommandLogRepo,
executionLogRepo,
pendingCollectionRepo,
userActionLogRepo,
rawMaterialRepo,
medicationLogRepo,
pigBatchRepo,
pigBatchLogRepo,
pigTransferLogRepo,
pigSickPigLogRepo,
pigTradeRepo,
notificationRepo,
)
// 初始化审计服务
auditService := audit.NewService(userActionLogRepo, logger)
// 初始化通知服务
notifyService, err := initNotifyService(cfg.Notify, logger, userRepo, notificationRepo)
if err != nil {
return nil, fmt.Errorf("初始化通知服务失败: %w", err)
}
// --- 初始化 LoRa 相关组件 ---
var listenHandler webhook.ListenHandler
var comm transport.Communicator
var loraListener transport.Listener
if cfg.Lora.Mode == config.LoraMode_LoRaWAN {
logger.Info("当前运行模式: lora_wan。初始化 ChirpStack 监听器和传输层。")
listenHandler = webhook.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, areaControllerRepo, deviceCommandLogRepo, pendingCollectionRepo)
comm = lora.NewChirpStackTransport(cfg.ChirpStack, logger)
loraListener = lora.NewPlaceholderTransport(logger)
} else {
logger.Info("当前运行模式: lora_mesh。初始化 LoRa Mesh 传输层和占位符监听器。")
listenHandler = webhook.NewPlaceholderListener(logger)
tp, err := lora.NewLoRaMeshUartPassthroughTransport(cfg.LoraMesh, logger, areaControllerRepo, pendingCollectionRepo, deviceRepo, sensorDataRepo)
loraListener = tp
comm = tp
if err != nil {
return nil, fmt.Errorf("无法初始化 LoRa Mesh 模块: %w", err)
}
}
// 初始化计划触发器管理器
analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger)
// 初始化通用设备服务
generalDeviceService := device.NewGeneralDeviceService(
deviceRepo,
deviceCommandLogRepo,
pendingCollectionRepo,
logger,
comm,
)
// 初始化任务执行器
executor := task.NewScheduler(
pendingTaskRepo,
executionLogRepo,
deviceRepo,
sensorDataRepo,
planRepo,
analysisPlanTaskManager,
logger,
generalDeviceService,
time.Duration(cfg.Task.Interval)*time.Second,
cfg.Task.NumWorkers,
)
// --- 初始化定时采集器 ---
timedCollector := collection.NewTimedCollector(
deviceRepo,
generalDeviceService,
logger,
time.Duration(cfg.Collection.Interval)*time.Second,
)
// 初始化 API 服务器
// 3. 初始化 API 入口点
apiServer := api.NewAPI(
cfg.Server,
logger,
userRepo,
deviceRepo,
areaControllerRepo,
deviceTemplateRepo,
planRepo,
pigFarmService,
pigBatchService,
monitorService,
tokenService,
auditService,
notifyService,
generalDeviceService,
listenHandler,
analysisPlanTaskManager,
infra.Repos.UserRepo,
infra.Repos.DeviceRepo,
infra.Repos.AreaControllerRepo,
infra.Repos.DeviceTemplateRepo,
infra.Repos.PlanRepo,
appServices.PigFarmService,
appServices.PigBatchService,
appServices.MonitorService,
infra.TokenService,
appServices.AuditService,
infra.NotifyService,
domain.GeneralDeviceService,
infra.Lora.ListenHandler,
domain.AnalysisPlanTaskManager,
)
// 组装 Application 对象
// 4. 组装 Application 对象
app := &Application{
Config: cfg,
Logger: logger,
Storage: storage,
Executor: executor,
API: apiServer,
Collector: timedCollector,
planRepo: planRepo,
pendingTaskRepo: pendingTaskRepo,
executionLogRepo: executionLogRepo,
pendingCollectionRepo: pendingCollectionRepo,
analysisPlanTaskManager: analysisPlanTaskManager,
loraMeshCommunicator: loraListener,
NotifyService: notifyService,
Config: cfg,
Logger: logger,
API: apiServer,
Infra: infra,
Domain: domain,
App: appServices,
}
return app, nil
}
// initNotifyService 根据配置初始化并返回一个通知领域服务。
// 它确保至少有一个 LogNotifier 总是可用,并根据配置启用其他通知器。
func initNotifyService(
cfg config.NotifyConfig,
log *logs.Logger,
userRepo repository.UserRepository,
notificationRepo repository.NotificationRepository,
) (domain_notify.Service, error) {
var availableNotifiers []notify.Notifier
// 1. 总是创建 LogNotifier 作为所有告警的最终记录渠道
logNotifier := notify.NewLogNotifier(log)
availableNotifiers = append(availableNotifiers, logNotifier)
log.Info("Log通知器已启用 (作为所有告警的最终记录渠道)")
// 2. 根据配置,按需创建并收集所有启用的其他 Notifier 实例
if cfg.SMTP.Enabled {
smtpNotifier := notify.NewSMTPNotifier(
cfg.SMTP.Host,
cfg.SMTP.Port,
cfg.SMTP.Username,
cfg.SMTP.Password,
cfg.SMTP.Sender,
)
availableNotifiers = append(availableNotifiers, smtpNotifier)
log.Info("SMTP通知器已启用")
}
if cfg.WeChat.Enabled {
wechatNotifier := notify.NewWechatNotifier(
cfg.WeChat.CorpID,
cfg.WeChat.AgentID,
cfg.WeChat.Secret,
)
availableNotifiers = append(availableNotifiers, wechatNotifier)
log.Info("企业微信通知器已启用")
}
if cfg.Lark.Enabled {
larkNotifier := notify.NewLarkNotifier(
cfg.Lark.AppID,
cfg.Lark.AppSecret,
)
availableNotifiers = append(availableNotifiers, larkNotifier)
log.Info("飞书通知器已启用")
}
// 3. 动态确定首选通知器
var primaryNotifier notify.Notifier
primaryNotifierType := notify.NotifierType(cfg.Primary)
// 检查用户指定的主渠道是否已启用
for _, n := range availableNotifiers {
if n.Type() == primaryNotifierType {
primaryNotifier = n
break
}
}
// 如果用户指定的主渠道未启用或未指定,则自动选择第一个可用的 (这将是 LogNotifier如果其他都未启用)
if primaryNotifier == nil {
primaryNotifier = availableNotifiers[0] // 确保总能找到一个,因为 LogNotifier 总是存在的
log.Warnf("配置的首选渠道 '%s' 未启用或未指定,已自动降级使用 '%s' 作为首选渠道。", cfg.Primary, primaryNotifier.Type())
}
// 4. 使用创建的 Notifier 列表和 notificationRepo 来组装领域服务
notifyService, err := domain_notify.NewFailoverService(
log,
userRepo,
availableNotifiers,
primaryNotifier.Type(),
cfg.FailureThreshold,
notificationRepo,
)
if err != nil {
return nil, fmt.Errorf("创建故障转移通知服务失败: %w", err)
}
log.Infof("通知服务初始化成功,首选渠道: %s, 故障阈值: %d", primaryNotifier.Type(), cfg.FailureThreshold)
return notifyService, nil
}
// Start 启动应用的所有组件并阻塞,直到接收到关闭信号。
func (app *Application) Start() error {
app.Logger.Info("应用启动中...")
// -- 启动 LoRa Mesh 监听器
if err := app.loraMeshCommunicator.Listen(); err != nil {
// 1. 启动底层监听器
if err := app.Infra.Lora.LoraListener.Listen(); err != nil {
return fmt.Errorf("启动 LoRa Mesh 监听器失败: %w", err)
}
// --- 清理待采集任务 ---
if err := app.initializePendingCollections(); err != nil {
// 这是一个非致命错误,记录它,但应用应继续启动
app.Logger.Error(err)
// 2. 初始化应用状态 (清理、刷新任务等)
if err := app.initializeState(); err != nil {
return fmt.Errorf("初始化应用状态失败: %w", err)
}
// --- 初始化待执行任务列表 ---
if err := app.initializePendingTasks(
app.planRepo, // 传入 planRepo
app.pendingTaskRepo, // 传入 pendingTaskRepo
app.executionLogRepo, // 传入 executionLogRepo
app.analysisPlanTaskManager, // 传入 analysisPlanTaskManager
app.Logger, // 传入 logger
); err != nil {
return fmt.Errorf("初始化待执行任务列表失败: %w", err)
}
// 3. 启动后台工作协程
app.Domain.Scheduler.Start()
app.Domain.TimedCollector.Start()
// 启动任务执行
app.Executor.Start()
// 启动定时采集器
app.Collector.Start()
// 启动 API 服务器
// 4. 启动 API 服务
app.API.Start()
// 等待关闭信号
// 5. 等待关闭信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
@@ -361,18 +112,18 @@ func (app *Application) Stop() error {
app.API.Stop()
// 关闭任务执行器
app.Executor.Stop()
app.Domain.Scheduler.Stop()
// 关闭定时采集器
app.Collector.Stop()
app.Domain.TimedCollector.Stop()
// 断开数据库连接
if err := app.Storage.Disconnect(); err != nil {
if err := app.Infra.Storage.Disconnect(); err != nil {
app.Logger.Errorw("数据库连接断开失败", "error", err)
}
// 关闭 LoRa Mesh 监听器
if err := app.loraMeshCommunicator.Stop(); err != nil {
if err := app.Infra.Lora.LoraListener.Stop(); err != nil {
app.Logger.Errorw("LoRa Mesh 监听器关闭失败", "error", err)
}
@@ -383,6 +134,22 @@ func (app *Application) Stop() error {
return nil
}
// initializeState 在应用启动时准备其初始数据状态。
// 这包括清理任何因上次异常关闭而留下的悬空任务或请求。
func (app *Application) initializeState() error {
// 清理待采集任务 (非致命错误)
if err := app.initializePendingCollections(); err != nil {
app.Logger.Errorw("清理待采集任务时发生非致命错误", "error", err)
}
// 初始化待执行任务列表 (致命错误)
if err := app.initializePendingTasks(); err != nil {
return fmt.Errorf("初始化待执行任务列表失败: %w", err)
}
return nil
}
// initializePendingCollections 在应用启动时处理所有未完成的采集请求。
// 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。
// 这保证了系统在每次启动时都处于一个干净、确定的状态。
@@ -390,7 +157,7 @@ func (app *Application) initializePendingCollections() error {
app.Logger.Info("开始清理所有未完成的采集请求...")
// 直接将所有 'pending' 状态的请求更新为 'timed_out'。
count, err := app.pendingCollectionRepo.MarkAllPendingAsTimedOut()
count, err := app.Infra.Repos.PendingCollectionRepo.MarkAllPendingAsTimedOut()
if err != nil {
return fmt.Errorf("清理未完成的采集请求失败: %v", err)
} else if count > 0 {
@@ -403,13 +170,13 @@ func (app *Application) initializePendingCollections() error {
}
// initializePendingTasks 在应用启动时清理并刷新待执行任务列表。
func (app *Application) initializePendingTasks(
planRepo repository.PlanRepository,
pendingTaskRepo repository.PendingTaskRepository,
executionLogRepo repository.ExecutionLogRepository,
analysisPlanTaskManager *task.AnalysisPlanTaskManager,
logger *logs.Logger,
) error {
func (app *Application) initializePendingTasks() error {
logger := app.Logger
planRepo := app.Infra.Repos.PlanRepo
pendingTaskRepo := app.Infra.Repos.PendingTaskRepo
executionLogRepo := app.Infra.Repos.ExecutionLogRepo
analysisPlanTaskManager := app.Domain.AnalysisPlanTaskManager
logger.Info("开始初始化待执行任务列表...")
// 阶段一:修正因崩溃导致状态不一致的固定次数计划
@@ -496,21 +263,3 @@ func (app *Application) initializePendingTasks(
logger.Info("待执行任务列表初始化完成。")
return nil
}
// initStorage 封装了数据库的初始化、连接和迁移逻辑。
func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Storage, error) {
// 创建存储实例
storage := database.NewStorage(cfg, logger)
if err := storage.Connect(); err != nil {
// 错误已在 Connect 内部被记录,这里只需包装并返回
return nil, fmt.Errorf("数据库连接失败: %w", err)
}
// 执行数据库迁移
if err := storage.Migrate(models.GetAllModels()...); err != nil {
return nil, fmt.Errorf("数据库迁移失败: %w", err)
}
logger.Info("数据库初始化完成。")
return storage, nil
}