Merge pull request 'issue_42' (#44) from issue_42 into main

Reviewed-on: #44
This commit is contained in:
2025-10-26 15:57:04 +08:00
12 changed files with 600 additions and 327 deletions

View File

@@ -48,8 +48,10 @@ chirp_stack:
api_host: "http://localhost:8080" # ChirpStack API 主机地址
api_token: "your_chirpstack_api_token" # ChirpStack API Token
fport: 10 # ChirpStack FPort
api_timeout: 5 # API 请求超时时间 (秒)
collection_request_timeout: 10 # 采集请求超时时间 (秒)
api_timeout: 10 # ChirpStack API请求超时时间(秒)
# 等待设备上行响应的超时时间(秒)
# 对于LoRaWAN这种延迟较高的网络建议设置为5分钟 (300秒) 或更长。
collection_request_timeout: 300
# 任务调度配置
task:
@@ -62,12 +64,28 @@ lora:
# Lora Mesh 配置
lora_mesh:
uart_port: "/dev/ttyUSB0" # UART 串口路径
baud_rate: 115200 # 波特率
timeout: 5 # 超时时间 (秒)
lora_mesh_mode: "transparent" # Lora Mesh 模式: transparent, command
max_chunk_size: 200 # 最大数据块大小
reassembly_timeout: 10 # 重组超时时间 (秒)
# 主节点串口
uart_port: "COM7"
# LoRa模块的通信波特率
baud_rate: 9600
# 等待LoRa模块AT指令响应的超时时间(ms)
timeout: 50
# LoRa Mesh 模块发送模式(EC: 透传; ED: 完整数据包)
# e.g.
# EC: 接收端只会接收到消息, 不会接收到请求头
# e.g. 发送: EC 05 02 01 48 65 6c 6c 6f
# (EC + 05(消息长度) + 0201(地址) + "Hello"(消息本体))
# 接收: 48 65 6c 6c 6f ("Hello")
# ED: 接收端会接收完整数据包,包含自定义协议头和地址信息。
# e.g. 发送: ED 05 12 34 01 00 01 02 03
# (ED(帧头) + 05(Length, 即 1(总包数)+1(当前包序号)+3(数据块)) + 12 34(目标地址) + 01(总包数) + 00(当前包序号) + 01 02 03(数据块))
# 接收: ED 05 12 34 01 00 01 02 03 56 78(56 78 是发送方地址,会自动拼接到消息末尾)
lora_mesh_mode: "ED"
# 单包最大用户数据数据长度, 模块限制240, 去掉两位自定义包头, 还剩238
max_chunk_size: 238
#分片重组超时时间(秒)。如果在一个分片到达后,超过这个时间
# 还没收到完整的包,则认为接收失败。
reassembly_timeout: 30
# 通知服务配置
notify:
@@ -91,3 +109,7 @@ notify:
enabled: false # 是否启用飞书通知
appID: "cli_xxxxxxxxxx" # 应用 ID
appSecret: "your_lark_app_secret" # 应用密钥
# 定时采集配置
collection:
interval: 300 # 采集间隔 (秒)

View File

@@ -86,4 +86,8 @@ lora_mesh:
max_chunk_size: 238
#分片重组超时时间(秒)。如果在一个分片到达后,超过这个时间
# 还没收到完整的包,则认为接收失败。
reassembly_timeout: 30
reassembly_timeout: 30
# 定时采集配置
collection:
interval: 300 # 采集间隔 (秒)

View File

@@ -43,7 +43,7 @@ type API struct {
engine *gin.Engine // Gin 引擎实例,用于处理 HTTP 请求
logger *logs.Logger // 日志记录器,用于输出日志信息
userRepo repository.UserRepository // 用户数据仓库接口,用于用户数据操作
tokenService token.TokenService // Token 服务接口,用于 JWT token 的生成和解析
tokenService token.Service // Token 服务接口,用于 JWT token 的生成和解析
auditService audit.Service // 审计服务,用于记录用户操作
httpServer *http.Server // 标准库的 HTTP 服务器实例,用于启动和停止服务
config config.ServerConfig // API 服务器的配置,使用 infra/config 包中的 ServerConfig
@@ -69,7 +69,7 @@ func NewAPI(cfg config.ServerConfig,
pigFarmService service.PigFarmService,
pigBatchService service.PigBatchService,
monitorService service.MonitorService,
tokenService token.TokenService,
tokenService token.Service,
auditService audit.Service,
notifyService domain_notify.Service,
deviceService domain_device.Service,

View File

@@ -20,7 +20,7 @@ import (
type Controller struct {
userRepo repository.UserRepository
monitorService service.MonitorService
tokenService token.TokenService
tokenService token.Service
notifyService domain_notify.Service
logger *logs.Logger
}
@@ -30,7 +30,7 @@ func NewController(
userRepo repository.UserRepository,
monitorService service.MonitorService,
logger *logs.Logger,
tokenService token.TokenService,
tokenService token.Service,
notifyService domain_notify.Service,
) *Controller {
return &Controller{

View File

@@ -14,7 +14,7 @@ import (
// AuthMiddleware 创建一个Gin中间件用于JWT身份验证
// 它依赖于 TokenService 来解析和验证 token并使用 UserRepository 来获取完整的用户信息
func AuthMiddleware(tokenService token.TokenService, userRepo repository.UserRepository) gin.HandlerFunc {
func AuthMiddleware(tokenService token.Service, userRepo repository.UserRepository) gin.HandlerFunc {
return func(c *gin.Context) {
// 从 Authorization header 获取 token
authHeader := c.GetHeader("Authorization")

View File

@@ -5,332 +5,97 @@ import (
"os"
"os/signal"
"syscall"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/app/api"
"git.huangwc.com/pig/pig-farm-controller/internal/app/service"
"git.huangwc.com/pig/pig-farm-controller/internal/app/webhook"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/pig"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/database"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/notify"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora"
)
// Application 是整个应用的核心,封装了所有组件和生命周期。
type Application struct {
Config *config.Config
Logger *logs.Logger
Storage database.Storage
Executor *task.Scheduler
API *api.API // 添加 API 对象
Config *config.Config
Logger *logs.Logger
API *api.API
// 新增的仓库和管理器字段,以便在 initializePendingTasks 中访问
planRepo repository.PlanRepository
pendingTaskRepo repository.PendingTaskRepository
executionLogRepo repository.ExecutionLogRepository
pendingCollectionRepo repository.PendingCollectionRepository
analysisPlanTaskManager *task.AnalysisPlanTaskManager
// Lora Mesh 监听器
loraMeshCommunicator transport.Listener
// 通知服务
NotifyService domain_notify.Service
Infra *Infrastructure
Domain *DomainServices
App *AppServices
}
// NewApplication 创建并初始化一个新的 Application 实例。
// 这是应用的“组合根”,所有依赖都在这里被创建和注入。
func NewApplication(configPath string) (*Application, error) {
// 加载配置
// 1. 初始化基本组件: 配置和日志
cfg := config.NewConfig()
if err := cfg.Load(configPath); err != nil {
return nil, fmt.Errorf("无法加载配置: %w", err)
}
// 初始化日志记录器
logger := logs.NewLogger(cfg.Log)
// 初始化数据库存储
storage, err := initStorage(cfg.Database, logger)
// 2. 初始化所有分层服务
infra, err := initInfrastructure(cfg, logger)
if err != nil {
return nil, err // 错误已在 initStorage 中被包装
return nil, fmt.Errorf("初始化基础设施失败: %w", err)
}
domain := initDomainServices(cfg, infra, logger)
appServices := initAppServices(infra, domain, logger)
// 初始化 Token 服务
tokenService := token.NewTokenService([]byte(cfg.App.JWTSecret))
// --- 仓库对象初始化 ---
userRepo := repository.NewGormUserRepository(storage.GetDB())
deviceRepo := repository.NewGormDeviceRepository(storage.GetDB())
areaControllerRepo := repository.NewGormAreaControllerRepository(storage.GetDB())
deviceTemplateRepo := repository.NewGormDeviceTemplateRepository(storage.GetDB())
planRepo := repository.NewGormPlanRepository(storage.GetDB())
pendingTaskRepo := repository.NewGormPendingTaskRepository(storage.GetDB())
executionLogRepo := repository.NewGormExecutionLogRepository(storage.GetDB())
sensorDataRepo := repository.NewGormSensorDataRepository(storage.GetDB())
deviceCommandLogRepo := repository.NewGormDeviceCommandLogRepository(storage.GetDB())
pendingCollectionRepo := repository.NewGormPendingCollectionRepository(storage.GetDB())
userActionLogRepo := repository.NewGormUserActionLogRepository(storage.GetDB())
pigBatchRepo := repository.NewGormPigBatchRepository(storage.GetDB())
pigBatchLogRepo := repository.NewGormPigBatchLogRepository(storage.GetDB())
pigFarmRepo := repository.NewGormPigFarmRepository(storage.GetDB())
pigPenRepo := repository.NewGormPigPenRepository(storage.GetDB())
pigTransferLogRepo := repository.NewGormPigTransferLogRepository(storage.GetDB())
pigTradeRepo := repository.NewGormPigTradeRepository(storage.GetDB())
pigSickPigLogRepo := repository.NewGormPigSickLogRepository(storage.GetDB())
medicationLogRepo := repository.NewGormMedicationLogRepository(storage.GetDB())
rawMaterialRepo := repository.NewGormRawMaterialRepository(storage.GetDB())
notificationRepo := repository.NewGormNotificationRepository(storage.GetDB())
// 初始化事务管理器
unitOfWork := repository.NewGormUnitOfWork(storage.GetDB(), logger)
// 初始化猪群管理领域
pigPenTransferManager := pig.NewPigPenTransferManager(pigPenRepo, pigTransferLogRepo, pigBatchRepo)
pigTradeManager := pig.NewPigTradeManager(pigTradeRepo)
pigSickManager := pig.NewSickPigManager(pigSickPigLogRepo, medicationLogRepo)
pigBatchDomain := pig.NewPigBatchService(pigBatchRepo, pigBatchLogRepo, unitOfWork,
pigPenTransferManager, pigTradeManager, pigSickManager)
// --- 业务逻辑处理器初始化 ---
pigFarmService := service.NewPigFarmService(pigFarmRepo, pigPenRepo, pigBatchRepo, pigBatchDomain, unitOfWork, logger)
pigBatchService := service.NewPigBatchService(pigBatchDomain, logger)
monitorService := service.NewMonitorService(
sensorDataRepo,
deviceCommandLogRepo,
executionLogRepo,
pendingCollectionRepo,
userActionLogRepo,
rawMaterialRepo,
medicationLogRepo,
pigBatchRepo,
pigBatchLogRepo,
pigTransferLogRepo,
pigSickPigLogRepo,
pigTradeRepo,
notificationRepo,
)
// 初始化审计服务
auditService := audit.NewService(userActionLogRepo, logger)
// 初始化通知服务
notifyService, err := initNotifyService(cfg.Notify, logger, userRepo, notificationRepo)
if err != nil {
return nil, fmt.Errorf("初始化通知服务失败: %w", err)
}
// --- 初始化 LoRa 相关组件 ---
var listenHandler webhook.ListenHandler
var comm transport.Communicator
var loraListener transport.Listener
if cfg.Lora.Mode == config.LoraMode_LoRaWAN {
logger.Info("当前运行模式: lora_wan。初始化 ChirpStack 监听器和传输层。")
listenHandler = webhook.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, areaControllerRepo, deviceCommandLogRepo, pendingCollectionRepo)
comm = lora.NewChirpStackTransport(cfg.ChirpStack, logger)
loraListener = lora.NewPlaceholderTransport(logger)
} else {
logger.Info("当前运行模式: lora_mesh。初始化 LoRa Mesh 传输层和占位符监听器。")
listenHandler = webhook.NewPlaceholderListener(logger)
tp, err := lora.NewLoRaMeshUartPassthroughTransport(cfg.LoraMesh, logger, areaControllerRepo, pendingCollectionRepo, deviceRepo, sensorDataRepo)
loraListener = tp
comm = tp
if err != nil {
return nil, fmt.Errorf("无法初始化 LoRa Mesh 模块: %w", err)
}
}
// 初始化计划触发器管理器
analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger)
// 初始化通用设备服务
generalDeviceService := device.NewGeneralDeviceService(
deviceRepo,
deviceCommandLogRepo,
pendingCollectionRepo,
logger,
comm,
)
// 初始化任务执行器
executor := task.NewScheduler(
pendingTaskRepo,
executionLogRepo,
deviceRepo,
sensorDataRepo,
planRepo,
analysisPlanTaskManager,
logger,
generalDeviceService,
time.Duration(cfg.Task.Interval)*time.Second,
cfg.Task.NumWorkers,
)
// 初始化 API 服务器
// 3. 初始化 API 入口点
apiServer := api.NewAPI(
cfg.Server,
logger,
userRepo,
deviceRepo,
areaControllerRepo,
deviceTemplateRepo,
planRepo,
pigFarmService,
pigBatchService,
monitorService,
tokenService,
auditService,
notifyService,
generalDeviceService,
listenHandler,
analysisPlanTaskManager,
infra.Repos.UserRepo,
infra.Repos.DeviceRepo,
infra.Repos.AreaControllerRepo,
infra.Repos.DeviceTemplateRepo,
infra.Repos.PlanRepo,
appServices.PigFarmService,
appServices.PigBatchService,
appServices.MonitorService,
infra.TokenService,
appServices.AuditService,
infra.NotifyService,
domain.GeneralDeviceService,
infra.Lora.ListenHandler,
domain.AnalysisPlanTaskManager,
)
// 组装 Application 对象
// 4. 组装 Application 对象
app := &Application{
Config: cfg,
Logger: logger,
Storage: storage,
Executor: executor,
API: apiServer,
planRepo: planRepo,
pendingTaskRepo: pendingTaskRepo,
executionLogRepo: executionLogRepo,
pendingCollectionRepo: pendingCollectionRepo,
analysisPlanTaskManager: analysisPlanTaskManager,
loraMeshCommunicator: loraListener,
NotifyService: notifyService,
Config: cfg,
Logger: logger,
API: apiServer,
Infra: infra,
Domain: domain,
App: appServices,
}
return app, nil
}
// initNotifyService 根据配置初始化并返回一个通知领域服务。
// 它确保至少有一个 LogNotifier 总是可用,并根据配置启用其他通知器。
func initNotifyService(
cfg config.NotifyConfig,
log *logs.Logger,
userRepo repository.UserRepository,
notificationRepo repository.NotificationRepository,
) (domain_notify.Service, error) {
var availableNotifiers []notify.Notifier
// 1. 总是创建 LogNotifier 作为所有告警的最终记录渠道
logNotifier := notify.NewLogNotifier(log)
availableNotifiers = append(availableNotifiers, logNotifier)
log.Info("Log通知器已启用 (作为所有告警的最终记录渠道)")
// 2. 根据配置,按需创建并收集所有启用的其他 Notifier 实例
if cfg.SMTP.Enabled {
smtpNotifier := notify.NewSMTPNotifier(
cfg.SMTP.Host,
cfg.SMTP.Port,
cfg.SMTP.Username,
cfg.SMTP.Password,
cfg.SMTP.Sender,
)
availableNotifiers = append(availableNotifiers, smtpNotifier)
log.Info("SMTP通知器已启用")
}
if cfg.WeChat.Enabled {
wechatNotifier := notify.NewWechatNotifier(
cfg.WeChat.CorpID,
cfg.WeChat.AgentID,
cfg.WeChat.Secret,
)
availableNotifiers = append(availableNotifiers, wechatNotifier)
log.Info("企业微信通知器已启用")
}
if cfg.Lark.Enabled {
larkNotifier := notify.NewLarkNotifier(
cfg.Lark.AppID,
cfg.Lark.AppSecret,
)
availableNotifiers = append(availableNotifiers, larkNotifier)
log.Info("飞书通知器已启用")
}
// 3. 动态确定首选通知器
var primaryNotifier notify.Notifier
primaryNotifierType := notify.NotifierType(cfg.Primary)
// 检查用户指定的主渠道是否已启用
for _, n := range availableNotifiers {
if n.Type() == primaryNotifierType {
primaryNotifier = n
break
}
}
// 如果用户指定的主渠道未启用或未指定,则自动选择第一个可用的 (这将是 LogNotifier如果其他都未启用)
if primaryNotifier == nil {
primaryNotifier = availableNotifiers[0] // 确保总能找到一个,因为 LogNotifier 总是存在的
log.Warnf("配置的首选渠道 '%s' 未启用或未指定,已自动降级使用 '%s' 作为首选渠道。", cfg.Primary, primaryNotifier.Type())
}
// 4. 使用创建的 Notifier 列表和 notificationRepo 来组装领域服务
notifyService, err := domain_notify.NewFailoverService(
log,
userRepo,
availableNotifiers,
primaryNotifier.Type(),
cfg.FailureThreshold,
notificationRepo,
)
if err != nil {
return nil, fmt.Errorf("创建故障转移通知服务失败: %w", err)
}
log.Infof("通知服务初始化成功,首选渠道: %s, 故障阈值: %d", primaryNotifier.Type(), cfg.FailureThreshold)
return notifyService, nil
}
// Start 启动应用的所有组件并阻塞,直到接收到关闭信号。
func (app *Application) Start() error {
app.Logger.Info("应用启动中...")
// -- 启动 LoRa Mesh 监听器
if err := app.loraMeshCommunicator.Listen(); err != nil {
// 1. 启动底层监听器
if err := app.Infra.Lora.LoraListener.Listen(); err != nil {
return fmt.Errorf("启动 LoRa Mesh 监听器失败: %w", err)
}
// --- 清理待采集任务 ---
if err := app.initializePendingCollections(); err != nil {
// 这是一个非致命错误,记录它,但应用应继续启动
app.Logger.Error(err)
// 2. 初始化应用状态 (清理、刷新任务等)
if err := app.initializeState(); err != nil {
return fmt.Errorf("初始化应用状态失败: %w", err)
}
// --- 初始化待执行任务列表 ---
if err := app.initializePendingTasks(
app.planRepo, // 传入 planRepo
app.pendingTaskRepo, // 传入 pendingTaskRepo
app.executionLogRepo, // 传入 executionLogRepo
app.analysisPlanTaskManager, // 传入 analysisPlanTaskManager
app.Logger, // 传入 logger
); err != nil {
return fmt.Errorf("初始化待执行任务列表失败: %w", err)
}
// 3. 启动后台工作协程
app.Domain.Scheduler.Start()
app.Domain.TimedCollector.Start()
// 启动任务执行
app.Executor.Start()
// 启动 API 服务器
// 4. 启动 API 服务
app.API.Start()
// 等待关闭信号
// 5. 等待关闭信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
@@ -347,15 +112,18 @@ func (app *Application) Stop() error {
app.API.Stop()
// 关闭任务执行器
app.Executor.Stop()
app.Domain.Scheduler.Stop()
// 关闭定时采集器
app.Domain.TimedCollector.Stop()
// 断开数据库连接
if err := app.Storage.Disconnect(); err != nil {
if err := app.Infra.Storage.Disconnect(); err != nil {
app.Logger.Errorw("数据库连接断开失败", "error", err)
}
// 关闭 LoRa Mesh 监听器
if err := app.loraMeshCommunicator.Stop(); err != nil {
if err := app.Infra.Lora.LoraListener.Stop(); err != nil {
app.Logger.Errorw("LoRa Mesh 监听器关闭失败", "error", err)
}
@@ -366,6 +134,22 @@ func (app *Application) Stop() error {
return nil
}
// initializeState 在应用启动时准备其初始数据状态。
// 这包括清理任何因上次异常关闭而留下的悬空任务或请求。
func (app *Application) initializeState() error {
// 清理待采集任务 (非致命错误)
if err := app.initializePendingCollections(); err != nil {
app.Logger.Errorw("清理待采集任务时发生非致命错误", "error", err)
}
// 初始化待执行任务列表 (致命错误)
if err := app.initializePendingTasks(); err != nil {
return fmt.Errorf("初始化待执行任务列表失败: %w", err)
}
return nil
}
// initializePendingCollections 在应用启动时处理所有未完成的采集请求。
// 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。
// 这保证了系统在每次启动时都处于一个干净、确定的状态。
@@ -373,7 +157,7 @@ func (app *Application) initializePendingCollections() error {
app.Logger.Info("开始清理所有未完成的采集请求...")
// 直接将所有 'pending' 状态的请求更新为 'timed_out'。
count, err := app.pendingCollectionRepo.MarkAllPendingAsTimedOut()
count, err := app.Infra.Repos.PendingCollectionRepo.MarkAllPendingAsTimedOut()
if err != nil {
return fmt.Errorf("清理未完成的采集请求失败: %v", err)
} else if count > 0 {
@@ -386,13 +170,13 @@ func (app *Application) initializePendingCollections() error {
}
// initializePendingTasks 在应用启动时清理并刷新待执行任务列表。
func (app *Application) initializePendingTasks(
planRepo repository.PlanRepository,
pendingTaskRepo repository.PendingTaskRepository,
executionLogRepo repository.ExecutionLogRepository,
analysisPlanTaskManager *task.AnalysisPlanTaskManager,
logger *logs.Logger,
) error {
func (app *Application) initializePendingTasks() error {
logger := app.Logger
planRepo := app.Infra.Repos.PlanRepo
pendingTaskRepo := app.Infra.Repos.PendingTaskRepo
executionLogRepo := app.Infra.Repos.ExecutionLogRepo
analysisPlanTaskManager := app.Domain.AnalysisPlanTaskManager
logger.Info("开始初始化待执行任务列表...")
// 阶段一:修正因崩溃导致状态不一致的固定次数计划
@@ -479,21 +263,3 @@ func (app *Application) initializePendingTasks(
logger.Info("待执行任务列表初始化完成。")
return nil
}
// initStorage 封装了数据库的初始化、连接和迁移逻辑。
func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Storage, error) {
// 创建存储实例
storage := database.NewStorage(cfg, logger)
if err := storage.Connect(); err != nil {
// 错误已在 Connect 内部被记录,这里只需包装并返回
return nil, fmt.Errorf("数据库连接失败: %w", err)
}
// 执行数据库迁移
if err := storage.Migrate(models.GetAllModels()...); err != nil {
return nil, fmt.Errorf("数据库迁移失败: %w", err)
}
logger.Info("数据库初始化完成。")
return storage, nil
}

View File

@@ -0,0 +1,362 @@
package core
import (
"fmt"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/app/service"
"git.huangwc.com/pig/pig-farm-controller/internal/app/webhook"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/collection"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/pig"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/database"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/notify"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora"
"gorm.io/gorm"
)
// Infrastructure 聚合了所有基础设施层的组件。
type Infrastructure struct {
Storage database.Storage
Repos *Repositories
Lora *LoraComponents
NotifyService domain_notify.Service
TokenService token.Service
}
// initInfrastructure 初始化所有基础设施层组件。
func initInfrastructure(cfg *config.Config, logger *logs.Logger) (*Infrastructure, error) {
storage, err := initStorage(cfg.Database, logger)
if err != nil {
return nil, err
}
repos := initRepositories(storage.GetDB(), logger)
lora, err := initLora(cfg, logger, repos)
if err != nil {
return nil, err
}
notifyService, err := initNotifyService(cfg.Notify, logger, repos.UserRepo, repos.NotificationRepo)
if err != nil {
return nil, fmt.Errorf("初始化通知服务失败: %w", err)
}
tokenService := token.NewTokenService([]byte(cfg.App.JWTSecret))
return &Infrastructure{
Storage: storage,
Repos: repos,
Lora: lora,
NotifyService: notifyService,
TokenService: tokenService,
}, nil
}
// Repositories 聚合了所有的仓库实例。
type Repositories struct {
UserRepo repository.UserRepository
DeviceRepo repository.DeviceRepository
AreaControllerRepo repository.AreaControllerRepository
DeviceTemplateRepo repository.DeviceTemplateRepository
PlanRepo repository.PlanRepository
PendingTaskRepo repository.PendingTaskRepository
ExecutionLogRepo repository.ExecutionLogRepository
SensorDataRepo repository.SensorDataRepository
DeviceCommandLogRepo repository.DeviceCommandLogRepository
PendingCollectionRepo repository.PendingCollectionRepository
UserActionLogRepo repository.UserActionLogRepository
PigBatchRepo repository.PigBatchRepository
PigBatchLogRepo repository.PigBatchLogRepository
PigFarmRepo repository.PigFarmRepository
PigPenRepo repository.PigPenRepository
PigTransferLogRepo repository.PigTransferLogRepository
PigTradeRepo repository.PigTradeRepository
PigSickPigLogRepo repository.PigSickLogRepository
MedicationLogRepo repository.MedicationLogRepository
RawMaterialRepo repository.RawMaterialRepository
NotificationRepo repository.NotificationRepository
UnitOfWork repository.UnitOfWork
}
// initRepositories 初始化所有的仓库。
func initRepositories(db *gorm.DB, logger *logs.Logger) *Repositories {
return &Repositories{
UserRepo: repository.NewGormUserRepository(db),
DeviceRepo: repository.NewGormDeviceRepository(db),
AreaControllerRepo: repository.NewGormAreaControllerRepository(db),
DeviceTemplateRepo: repository.NewGormDeviceTemplateRepository(db),
PlanRepo: repository.NewGormPlanRepository(db),
PendingTaskRepo: repository.NewGormPendingTaskRepository(db),
ExecutionLogRepo: repository.NewGormExecutionLogRepository(db),
SensorDataRepo: repository.NewGormSensorDataRepository(db),
DeviceCommandLogRepo: repository.NewGormDeviceCommandLogRepository(db),
PendingCollectionRepo: repository.NewGormPendingCollectionRepository(db),
UserActionLogRepo: repository.NewGormUserActionLogRepository(db),
PigBatchRepo: repository.NewGormPigBatchRepository(db),
PigBatchLogRepo: repository.NewGormPigBatchLogRepository(db),
PigFarmRepo: repository.NewGormPigFarmRepository(db),
PigPenRepo: repository.NewGormPigPenRepository(db),
PigTransferLogRepo: repository.NewGormPigTransferLogRepository(db),
PigTradeRepo: repository.NewGormPigTradeRepository(db),
PigSickPigLogRepo: repository.NewGormPigSickLogRepository(db),
MedicationLogRepo: repository.NewGormMedicationLogRepository(db),
RawMaterialRepo: repository.NewGormRawMaterialRepository(db),
NotificationRepo: repository.NewGormNotificationRepository(db),
UnitOfWork: repository.NewGormUnitOfWork(db, logger),
}
}
// DomainServices 聚合了所有的领域服务实例。
type DomainServices struct {
PigPenTransferManager pig.PigPenTransferManager
PigTradeManager pig.PigTradeManager
PigSickManager pig.SickPigManager
PigBatchDomain pig.PigBatchService
TimedCollector collection.Collector
GeneralDeviceService device.Service
AnalysisPlanTaskManager *task.AnalysisPlanTaskManager
Scheduler *task.Scheduler
}
// initDomainServices 初始化所有的领域服务。
func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs.Logger) *DomainServices {
// 猪群管理相关
pigPenTransferManager := pig.NewPigPenTransferManager(infra.Repos.PigPenRepo, infra.Repos.PigTransferLogRepo, infra.Repos.PigBatchRepo)
pigTradeManager := pig.NewPigTradeManager(infra.Repos.PigTradeRepo)
pigSickManager := pig.NewSickPigManager(infra.Repos.PigSickPigLogRepo, infra.Repos.MedicationLogRepo)
pigBatchDomain := pig.NewPigBatchService(infra.Repos.PigBatchRepo, infra.Repos.PigBatchLogRepo, infra.Repos.UnitOfWork,
pigPenTransferManager, pigTradeManager, pigSickManager)
// 通用设备服务
generalDeviceService := device.NewGeneralDeviceService(
infra.Repos.DeviceRepo,
infra.Repos.DeviceCommandLogRepo,
infra.Repos.PendingCollectionRepo,
logger,
infra.Lora.Comm,
)
// 计划任务管理器
analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(infra.Repos.PlanRepo, infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, logger)
// 任务执行器
scheduler := task.NewScheduler(
infra.Repos.PendingTaskRepo,
infra.Repos.ExecutionLogRepo,
infra.Repos.DeviceRepo,
infra.Repos.SensorDataRepo,
infra.Repos.PlanRepo,
analysisPlanTaskManager,
logger,
generalDeviceService,
time.Duration(cfg.Task.Interval)*time.Second,
cfg.Task.NumWorkers,
)
// 定时采集器
timedCollector := collection.NewTimedCollector(
infra.Repos.DeviceRepo,
generalDeviceService,
logger,
time.Duration(cfg.Collection.Interval)*time.Second,
)
return &DomainServices{
PigPenTransferManager: pigPenTransferManager,
PigTradeManager: pigTradeManager,
PigSickManager: pigSickManager,
PigBatchDomain: pigBatchDomain,
GeneralDeviceService: generalDeviceService,
AnalysisPlanTaskManager: analysisPlanTaskManager,
Scheduler: scheduler,
TimedCollector: timedCollector,
}
}
// AppServices 聚合了所有的应用服务实例。
type AppServices struct {
PigFarmService service.PigFarmService
PigBatchService service.PigBatchService
MonitorService service.MonitorService
AuditService audit.Service
}
// initAppServices 初始化所有的应用服务。
func initAppServices(infra *Infrastructure, domainServices *DomainServices, logger *logs.Logger) *AppServices {
pigFarmService := service.NewPigFarmService(infra.Repos.PigFarmRepo, infra.Repos.PigPenRepo, infra.Repos.PigBatchRepo, domainServices.PigBatchDomain, infra.Repos.UnitOfWork, logger)
pigBatchService := service.NewPigBatchService(domainServices.PigBatchDomain, logger)
monitorService := service.NewMonitorService(
infra.Repos.SensorDataRepo,
infra.Repos.DeviceCommandLogRepo,
infra.Repos.ExecutionLogRepo,
infra.Repos.PendingCollectionRepo,
infra.Repos.UserActionLogRepo,
infra.Repos.RawMaterialRepo,
infra.Repos.MedicationLogRepo,
infra.Repos.PigBatchRepo,
infra.Repos.PigBatchLogRepo,
infra.Repos.PigTransferLogRepo,
infra.Repos.PigSickPigLogRepo,
infra.Repos.PigTradeRepo,
infra.Repos.NotificationRepo,
)
auditService := audit.NewService(infra.Repos.UserActionLogRepo, logger)
return &AppServices{
PigFarmService: pigFarmService,
PigBatchService: pigBatchService,
MonitorService: monitorService,
AuditService: auditService,
}
}
// LoraComponents 聚合了所有 LoRa 相关组件。
type LoraComponents struct {
ListenHandler webhook.ListenHandler
Comm transport.Communicator
LoraListener transport.Listener
}
// initLora 根据配置初始化 LoRa 相关组件。
func initLora(
cfg *config.Config,
logger *logs.Logger,
repos *Repositories,
) (*LoraComponents, error) {
var listenHandler webhook.ListenHandler
var comm transport.Communicator
var loraListener transport.Listener
if cfg.Lora.Mode == config.LoraMode_LoRaWAN {
logger.Info("当前运行模式: lora_wan。初始化 ChirpStack 监听器和传输层。")
listenHandler = webhook.NewChirpStackListener(logger, repos.SensorDataRepo, repos.DeviceRepo, repos.AreaControllerRepo, repos.DeviceCommandLogRepo, repos.PendingCollectionRepo)
comm = lora.NewChirpStackTransport(cfg.ChirpStack, logger)
loraListener = lora.NewPlaceholderTransport(logger)
} else {
logger.Info("当前运行模式: lora_mesh。初始化 LoRa Mesh 传输层和占位符监听器。")
listenHandler = webhook.NewPlaceholderListener(logger)
tp, err := lora.NewLoRaMeshUartPassthroughTransport(cfg.LoraMesh, logger, repos.AreaControllerRepo, repos.PendingCollectionRepo, repos.DeviceRepo, repos.SensorDataRepo)
if err != nil {
return nil, fmt.Errorf("无法初始化 LoRa Mesh 模块: %w", err)
}
loraListener = tp
comm = tp
}
return &LoraComponents{
ListenHandler: listenHandler,
Comm: comm,
LoraListener: loraListener,
}, nil
}
// initNotifyService 根据配置初始化并返回一个通知领域服务。
// 它确保至少有一个 LogNotifier 总是可用,并根据配置启用其他通知器。
func initNotifyService(
cfg config.NotifyConfig,
log *logs.Logger,
userRepo repository.UserRepository,
notificationRepo repository.NotificationRepository,
) (domain_notify.Service, error) {
var availableNotifiers []notify.Notifier
// 1. 总是创建 LogNotifier 作为所有告警的最终记录渠道
logNotifier := notify.NewLogNotifier(log)
availableNotifiers = append(availableNotifiers, logNotifier)
log.Info("Log通知器已启用 (作为所有告警的最终记录渠道)")
// 2. 根据配置,按需创建并收集所有启用的其他 Notifier 实例
if cfg.SMTP.Enabled {
smtpNotifier := notify.NewSMTPNotifier(
cfg.SMTP.Host,
cfg.SMTP.Port,
cfg.SMTP.Username,
cfg.SMTP.Password,
cfg.SMTP.Sender,
)
availableNotifiers = append(availableNotifiers, smtpNotifier)
log.Info("SMTP通知器已启用")
}
if cfg.WeChat.Enabled {
wechatNotifier := notify.NewWechatNotifier(
cfg.WeChat.CorpID,
cfg.WeChat.AgentID,
cfg.WeChat.Secret,
)
availableNotifiers = append(availableNotifiers, wechatNotifier)
log.Info("企业微信通知器已启用")
}
if cfg.Lark.Enabled {
larkNotifier := notify.NewLarkNotifier(
cfg.Lark.AppID,
cfg.Lark.AppSecret,
)
availableNotifiers = append(availableNotifiers, larkNotifier)
log.Info("飞书通知器已启用")
}
// 3. 动态确定首选通知器
var primaryNotifier notify.Notifier
primaryNotifierType := notify.NotifierType(cfg.Primary)
// 检查用户指定的主渠道是否已启用
for _, n := range availableNotifiers {
if n.Type() == primaryNotifierType {
primaryNotifier = n
break
}
}
// 如果用户指定的主渠道未启用或未指定,则自动选择第一个可用的 (这将是 LogNotifier如果其他都未启用)
if primaryNotifier == nil {
primaryNotifier = availableNotifiers[0] // 确保总能找到一个,因为 LogNotifier 总是存在的
log.Warnf("配置的首选渠道 '%s' 未启用或未指定,已自动降级使用 '%s' 作为首选渠道。", cfg.Primary, primaryNotifier.Type())
}
// 4. 使用创建的 Notifier 列表和 notificationRepo 来组装领域服务
notifyService, err := domain_notify.NewFailoverService(
log,
userRepo,
availableNotifiers,
primaryNotifier.Type(),
cfg.FailureThreshold,
notificationRepo,
)
if err != nil {
return nil, fmt.Errorf("创建故障转移通知服务失败: %w", err)
}
log.Infof("通知服务初始化成功,首选渠道: %s, 故障阈值: %d", primaryNotifier.Type(), cfg.FailureThreshold)
return notifyService, nil
}
// initStorage 封装了数据库的初始化、连接和迁移逻辑。
func initStorage(cfg config.DatabaseConfig, logger *logs.Logger) (database.Storage, error) {
// 创建存储实例
storage := database.NewStorage(cfg, logger)
if err := storage.Connect(); err != nil {
// 错误已在 Connect 内部被记录,这里只需包装并返回
return nil, fmt.Errorf("数据库连接失败: %w", err)
}
// 执行数据库迁移
if err := storage.Migrate(models.GetAllModels()...); err != nil {
return nil, fmt.Errorf("数据库迁移失败: %w", err)
}
logger.Info("数据库初始化完成。")
return storage, nil
}

View File

@@ -0,0 +1,6 @@
package collection
type Collector interface {
Start()
Stop()
}

View File

@@ -0,0 +1,89 @@
package collection
import (
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
)
// TimedCollector 实现了 Collector 接口,用于定时从数据库获取设备信息并下发采集指令
type TimedCollector struct {
deviceRepo repository.DeviceRepository
deviceService device.Service
logger *logs.Logger
interval time.Duration
ticker *time.Ticker
done chan bool
}
// NewTimedCollector 创建一个定时采集器实例
func NewTimedCollector(
deviceRepo repository.DeviceRepository,
deviceService device.Service,
logger *logs.Logger,
interval time.Duration,
) Collector {
return &TimedCollector{
deviceRepo: deviceRepo,
deviceService: deviceService,
logger: logger,
interval: interval,
done: make(chan bool),
}
}
// Start 开始定时采集
func (c *TimedCollector) Start() {
c.logger.Infof("定时采集器启动,采集间隔: %s", c.interval)
c.ticker = time.NewTicker(c.interval)
go func() {
for {
select {
case <-c.done:
return
case <-c.ticker.C:
c.collect()
}
}
}()
}
// Stop 停止定时采集
func (c *TimedCollector) Stop() {
c.logger.Info("定时采集器停止")
c.ticker.Stop()
c.done <- true
}
// collect 是核心的采集逻辑
func (c *TimedCollector) collect() {
c.logger.Info("开始新一轮的设备数据采集")
sensors, err := c.deviceRepo.ListAllSensors()
if err != nil {
c.logger.Errorf("采集周期: 从数据库获取所有传感器失败: %v", err)
return
}
if len(sensors) == 0 {
c.logger.Info("采集周期: 未发现任何传感器设备,跳过本次采集")
return
}
sensorsByController := make(map[uint][]*models.Device)
for _, sensor := range sensors {
sensorsByController[sensor.AreaControllerID] = append(sensorsByController[sensor.AreaControllerID], sensor)
}
for controllerID, controllerSensors := range sensorsByController {
c.logger.Infof("采集周期: 准备为区域主控 %d 下的 %d 个传感器下发采集指令", controllerID, len(controllerSensors))
if err := c.deviceService.Collect(controllerID, controllerSensors); err != nil {
c.logger.Errorf("采集周期: 为区域主控 %d 下发采集指令失败: %v", controllerID, err)
}
}
c.logger.Info("本轮设备数据采集完成")
}

View File

@@ -13,19 +13,19 @@ type Claims struct {
jwt.RegisteredClaims
}
// TokenService 定义了 token 操作的接口
type TokenService interface {
// Service 定义了 token 操作的接口
type Service interface {
GenerateToken(userID uint) (string, error)
ParseToken(tokenString string) (*Claims, error)
}
// tokenService 是 TokenService 接口的实现
// tokenService 是 Service 接口的实现
type tokenService struct {
secret []byte
}
// NewTokenService 创建并返回一个新的 TokenService 实例
func NewTokenService(secret []byte) TokenService {
// NewTokenService 创建并返回一个新的 Service 实例
func NewTokenService(secret []byte) Service {
return &tokenService{secret: secret}
}

View File

@@ -44,6 +44,9 @@ type Config struct {
// Notify 通知服务配置
Notify NotifyConfig `yaml:"notify"`
// Collection 定时采集配置
Collection CollectionConfig `yaml:"collection"`
}
// AppConfig 代表应用基础配置
@@ -195,6 +198,11 @@ type LarkConfig struct {
AppSecret string `yaml:"appSecret"`
}
// CollectionConfig 代表定时采集配置
type CollectionConfig struct {
Interval int `yaml:"interval"`
}
// NewConfig 创建并返回一个新的配置实例
func NewConfig() *Config {
// 默认值可以在这里设置,但我们优先使用配置文件中的值

View File

@@ -23,6 +23,9 @@ type DeviceRepository interface {
// ListAll 获取所有设备的列表
ListAll() ([]*models.Device, error)
// ListAllSensors 获取所有传感器类型的设备列表
ListAllSensors() ([]*models.Device, error)
// ListByAreaControllerID 根据区域主控 ID 列出所有子设备。
ListByAreaControllerID(areaControllerID uint) ([]*models.Device, error)
@@ -84,6 +87,19 @@ func (r *gormDeviceRepository) ListAll() ([]*models.Device, error) {
return devices, nil
}
// ListAllSensors 检索归类为传感器的所有设备
func (r *gormDeviceRepository) ListAllSensors() ([]*models.Device, error) {
var sensors []*models.Device
err := r.db.Preload("AreaController").Preload("DeviceTemplate").
Joins("JOIN device_templates ON device_templates.id = devices.device_template_id").
Where("device_templates.category = ?", models.CategorySensor).
Find(&sensors).Error
if err != nil {
return nil, fmt.Errorf("查询所有传感器失败: %w", err)
}
return sensors, nil
}
// ListByAreaControllerID 根据区域主控 ID 列出所有子设备
func (r *gormDeviceRepository) ListByAreaControllerID(areaControllerID uint) ([]*models.Device, error) {
var devices []*models.Device