修改infra.repository包

This commit is contained in:
2025-11-05 23:00:07 +08:00
parent 97aea66f7c
commit 10b123ab93
25 changed files with 877 additions and 608 deletions

View File

@@ -40,7 +40,7 @@ func initInfrastructure(ctx context.Context, cfg *config.Config) (*Infrastructur
return nil, err
}
repos := initRepositories(ctx, storage.GetDB())
repos := initRepositories(ctx, storage.GetDB(ctx))
lora, err := initLora(ctx, cfg, repos)
if err != nil {
@@ -279,12 +279,12 @@ func initLora(
if cfg.Lora.Mode == config.LoraMode_LoRaWAN {
logger.Info("当前运行模式: lora_wan。初始化 ChirpStack 监听器和传输层。")
listenHandler = webhook.NewChirpStackListener(logs.AddCompName(baseCtx, "ChirpStackListener"), repos.sensorDataRepo, repos.deviceRepo, repos.areaControllerRepo, repos.deviceCommandLogRepo, repos.pendingCollectionRepo)
comm = lora.NewChirpStackTransport(cfg.ChirpStack, logs.AddCompName(baseCtx, "ChirpStackTransport"))
comm = lora.NewChirpStackTransport(logs.AddCompName(baseCtx, "ChirpStackTransport"), cfg.ChirpStack)
loraListener = lora.NewPlaceholderTransport(logs.AddCompName(baseCtx, "PlaceholderTransport"))
} else {
logger.Info("当前运行模式: lora_mesh。初始化 LoRa Mesh 传输层和占位符监听器。")
listenHandler = webhook.NewPlaceholderListener(logs.AddCompName(baseCtx, "PlaceholderListener"))
tp, err := lora.NewLoRaMeshUartPassthroughTransport(cfg.LoraMesh, logs.AddCompName(baseCtx, "LoRaMeshTransport"), repos.areaControllerRepo, repos.pendingCollectionRepo, repos.deviceRepo, repos.sensorDataRepo)
tp, err := lora.NewLoRaMeshUartPassthroughTransport(logs.AddCompName(baseCtx, "LoRaMeshTransport"), cfg.LoraMesh, repos.areaControllerRepo, repos.pendingCollectionRepo, repos.deviceRepo, repos.sensorDataRepo)
if err != nil {
return nil, fmt.Errorf("无法初始化 LoRa Mesh 模块: %w", err)
}
@@ -319,6 +319,7 @@ func initNotifyService(
// 2. 根据配置,按需创建并收集所有启用的其他 Notifier 实例
if cfg.SMTP.Enabled {
smtpNotifier := notify.NewSMTPNotifier(
logs.AddCompName(baseCtx, "SMTPNotifier"),
cfg.SMTP.Host,
cfg.SMTP.Port,
cfg.SMTP.Username,
@@ -331,6 +332,7 @@ func initNotifyService(
if cfg.WeChat.Enabled {
wechatNotifier := notify.NewWechatNotifier(
logs.AddCompName(baseCtx, "WechatNotifier"),
cfg.WeChat.CorpID,
cfg.WeChat.AgentID,
cfg.WeChat.Secret,
@@ -341,6 +343,7 @@ func initNotifyService(
if cfg.Lark.Enabled {
larkNotifier := notify.NewLarkNotifier(
logs.AddCompName(baseCtx, "LarkNotifier"),
cfg.Lark.AppID,
cfg.Lark.AppSecret,
)
@@ -387,7 +390,7 @@ func initNotifyService(
func initStorage(ctx context.Context, cfg config.DatabaseConfig) (database.Storage, error) {
// 创建存储实例
storage := database.NewStorage(logs.AddCompName(context.Background(), "Storage"), cfg)
if err := storage.Connect(); err != nil {
if err := storage.Connect(ctx); err != nil {
// 错误已在 Connect 内部被记录,这里只需包装并返回
return nil, fmt.Errorf("数据库连接失败: %w", err)
}

View File

@@ -17,19 +17,19 @@ const (
// initializeState 在应用启动时准备其初始数据状态。
// 这包括清理任何因上次异常关闭而留下的悬空任务或请求。
func (app *Application) initializeState(ctx context.Context) error {
newCtx, logger := logs.Trace(ctx, app.Ctx, "InitializeState")
appCtx, logger := logs.Trace(ctx, app.Ctx, "InitializeState")
// 初始化预定义系统计划 (致命错误)
if err := app.initializeSystemPlans(ctx); err != nil {
return fmt.Errorf("初始化预定义系统计划失败: %w", err)
}
// 清理待采集任务 (非致命错误)
if err := app.initializePendingCollections(newCtx); err != nil {
if err := app.initializePendingCollections(appCtx); err != nil {
logger.Errorw("清理待采集任务时发生非致命错误", "error", err)
}
// 初始化待执行任务列表 (致命错误)
if err := app.initializePendingTasks(newCtx); err != nil {
if err := app.initializePendingTasks(appCtx); err != nil {
return fmt.Errorf("初始化待执行任务列表失败: %w", err)
}
@@ -38,14 +38,14 @@ func (app *Application) initializeState(ctx context.Context) error {
// initializeSystemPlans 确保预定义的系统计划在数据库中存在并保持最新。
func (app *Application) initializeSystemPlans(ctx context.Context) error {
logger := logs.TraceLogger(ctx, app.Ctx, "InitializeSystemPlans")
appCtx, logger := logs.Trace(ctx, app.Ctx, "InitializeSystemPlans")
logger.Info("开始检查并更新预定义的系统计划...")
// 动态构建预定义计划列表
predefinedSystemPlans := app.getPredefinedSystemPlans()
// 1. 获取所有已存在的系统计划
existingPlans, _, err := app.Infra.repos.planRepo.ListPlans(repository.ListPlansOptions{
existingPlans, _, err := app.Infra.repos.planRepo.ListPlans(appCtx, repository.ListPlansOptions{
PlanType: repository.PlanTypeFilterSystem,
}, 1, 99999) // 使用一个较大的 pageSize 来获取所有系统计划
if err != nil {
@@ -70,7 +70,7 @@ func (app *Application) initializeSystemPlans(ctx context.Context) error {
predefinedPlan.ID = foundExistingPlan.ID
predefinedPlan.ExecuteCount = foundExistingPlan.ExecuteCount
if err := app.Infra.repos.planRepo.UpdatePlan(predefinedPlan); err != nil {
if err := app.Infra.repos.planRepo.UpdatePlan(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("更新预定义计划 '%s' 失败: %w", predefinedPlan.Name, err)
} else {
logger.Infof("成功更新预定义计划 '%s'。", predefinedPlan.Name)
@@ -78,7 +78,7 @@ func (app *Application) initializeSystemPlans(ctx context.Context) error {
} else {
// 如果计划不存在, 则创建
logger.Infof("预定义计划 '%s' 不存在,正在创建...", predefinedPlan.Name)
if err := app.Infra.repos.planRepo.CreatePlan(predefinedPlan); err != nil {
if err := app.Infra.repos.planRepo.CreatePlan(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("创建预定义计划 '%s' 失败: %w", predefinedPlan.Name, err)
} else {
logger.Infof("成功创建预定义计划 '%s'。", predefinedPlan.Name)
@@ -124,11 +124,11 @@ func (app *Application) getPredefinedSystemPlans() []models.Plan {
// 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。
// 这保证了系统在每次启动时都处于一个干净、确定的状态。
func (app *Application) initializePendingCollections(ctx context.Context) error {
logger := logs.TraceLogger(ctx, app.Ctx, "InitializePendingCollections")
appCtx, logger := logs.Trace(ctx, app.Ctx, "InitializePendingCollections")
logger.Info("开始清理所有未完成的采集请求...")
// 直接将所有 'pending' 状态的请求更新为 'timed_out'。
count, err := app.Infra.repos.pendingCollectionRepo.MarkAllPendingAsTimedOut()
count, err := app.Infra.repos.pendingCollectionRepo.MarkAllPendingAsTimedOut(appCtx)
if err != nil {
return fmt.Errorf("清理未完成的采集请求失败: %v", err)
} else if count > 0 {
@@ -142,7 +142,7 @@ func (app *Application) initializePendingCollections(ctx context.Context) error
// initializePendingTasks 在应用启动时清理并刷新待执行任务列表。
func (app *Application) initializePendingTasks(ctx context.Context) error {
logger := logs.TraceLogger(ctx, app.Ctx, "InitializePendingTasks")
appCtx, logger := logs.Trace(ctx, app.Ctx, "InitializePendingTasks")
planRepo := app.Infra.repos.planRepo
pendingTaskRepo := app.Infra.repos.pendingTaskRepo
executionLogRepo := app.Infra.repos.executionLogRepo
@@ -152,7 +152,7 @@ func (app *Application) initializePendingTasks(ctx context.Context) error {
// 阶段一:修正因崩溃导致状态不一致的固定次数计划
logger.Info("阶段一:开始修正因崩溃导致状态不一致的固定次数计划...")
plansToCorrect, err := planRepo.FindPlansWithPendingTasks()
plansToCorrect, err := planRepo.FindPlansWithPendingTasks(appCtx)
if err != nil {
return fmt.Errorf("查找需要修正的计划失败: %w", err)
}
@@ -172,7 +172,7 @@ func (app *Application) initializePendingTasks(ctx context.Context) error {
}
// 保存更新后的计划
if err := planRepo.UpdatePlan(plan); err != nil {
if err := planRepo.UpdatePlan(appCtx, plan); err != nil {
logger.Errorf("修正计划 #%d 状态失败: %v", plan.ID, err)
// 这是一个非阻塞性错误,继续处理其他计划
}
@@ -184,7 +184,7 @@ func (app *Application) initializePendingTasks(ctx context.Context) error {
// --- 新增逻辑:处理因崩溃导致状态不一致的计划主表状态 ---
// 1. 查找所有未完成的计划执行日志 (状态为 Started 或 Waiting)
incompletePlanLogs, err := executionLogRepo.FindIncompletePlanExecutionLogs()
incompletePlanLogs, err := executionLogRepo.FindIncompletePlanExecutionLogs(appCtx)
if err != nil {
return fmt.Errorf("查找未完成的计划执行日志失败: %w", err)
}
@@ -198,7 +198,7 @@ func (app *Application) initializePendingTasks(ctx context.Context) error {
// 3. 对于每个受影响的 PlanID重置其 execute_count 并将其状态设置为 Failed, 系统计划不受此影响
for planID := range affectedPlanIDs {
// 首先,获取计划的详细信息以判断其类型
plan, err := planRepo.GetBasicPlanByID(planID)
plan, err := planRepo.GetBasicPlanByID(appCtx, planID)
if err != nil {
logger.Errorf("在尝试修正计划状态时,获取计划 #%d 的基本信息失败: %v", planID, err)
continue // 获取失败,跳过此计划
@@ -213,7 +213,7 @@ func (app *Application) initializePendingTasks(ctx context.Context) error {
// 对于非系统计划,执行原有的失败标记逻辑
logger.Warnf("检测到计划 #%d 在应用崩溃前处于未完成状态,将重置其计数并标记为失败。", planID)
// 使用 UpdatePlanStateAfterExecution 来更新主表状态,避免影响关联数据
if err := planRepo.UpdatePlanStateAfterExecution(planID, 0, models.PlanStatusFailed); err != nil {
if err := planRepo.UpdatePlanStateAfterExecution(appCtx, planID, 0, models.PlanStatusFailed); err != nil {
logger.Errorf("重置计划 #%d 计数并标记为失败时出错: %v", planID, err)
// 这是一个非阻塞性错误,继续处理其他计划
}
@@ -221,26 +221,26 @@ func (app *Application) initializePendingTasks(ctx context.Context) error {
logger.Info("阶段二:计划主表状态修正完成。")
// 直接调用新的方法来更新计划执行日志状态为失败
if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(); err != nil {
if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(appCtx); err != nil {
logger.Errorf("更新所有未完成计划执行日志状态为失败失败: %v", err)
// 这是一个非阻塞性错误,继续执行
}
// 直接调用新的方法来更新任务执行日志状态为取消
if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(); err != nil {
if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(appCtx); err != nil {
logger.Errorf("更新所有未完成任务执行日志状态为取消失败: %v", err)
// 这是一个非阻塞性错误,继续执行
}
// 清空待执行列表
if err := pendingTaskRepo.ClearAllPendingTasks(); err != nil {
if err := pendingTaskRepo.ClearAllPendingTasks(appCtx); err != nil {
return fmt.Errorf("清空待执行任务列表失败: %w", err)
}
logger.Info("阶段二:待执行任务和相关日志清理完成。")
// 阶段三:初始刷新
logger.Info("阶段三:开始刷新待执行列表...")
if err := planService.RefreshPlanTriggers(); err != nil {
if err := planService.RefreshPlanTriggers(appCtx); err != nil {
return fmt.Errorf("刷新待执行任务列表失败: %w", err)
}
logger.Info("阶段三:待执行任务列表初始化完成。")