diff --git a/config.yml b/config.yml index ebd7001..69f64d0 100644 --- a/config.yml +++ b/config.yml @@ -48,3 +48,8 @@ chirp_stack: api_host: "http://192.168.5.16:8090" # ChirpStack API服务器地址 api_token: "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJjaGlycHN0YWNrIiwiaXNzIjoiY2hpcnBzdGFjayIsInN1YiI6IjU2ZWRhNWQ3LTM4NzgtNDAwMC05MWMzLWYwZDk3M2YwODhjNiIsInR5cCI6ImtleSJ9.NxBxTrhPAnezKMqAYZR_Uq2mGQjJRlmVzg1ZDFCyaHQ" # ChirpStack API密钥, 请求头中需要设置 Grpc-Metadata-Authorization: Bearer api_timeout: 10 # ChirpStack API请求超时时间(秒) + +# 任务调度器配置 +task: + interval: 3 + num_workers: 5 \ No newline at end of file diff --git a/internal/app/api/api.go b/internal/app/api/api.go index ce5bcc3..571a729 100644 --- a/internal/app/api/api.go +++ b/internal/app/api/api.go @@ -19,6 +19,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/app/controller/device" "git.huangwc.com/pig/pig-farm-controller/internal/app/controller/plan" "git.huangwc.com/pig/pig-farm-controller/internal/app/controller/user" + "git.huangwc.com/pig/pig-farm-controller/internal/app/service/task" "git.huangwc.com/pig/pig-farm-controller/internal/app/service/token" "git.huangwc.com/pig/pig-farm-controller/internal/app/service/transport" "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" @@ -33,21 +34,29 @@ import ( // API 结构体定义了 HTTP 服务器及其依赖 type API struct { - engine *gin.Engine // Gin 引擎实例,用于处理 HTTP 请求 - logger *logs.Logger // 日志记录器,用于输出日志信息 - userRepo repository.UserRepository // 用户数据仓库接口,用于用户数据操作 - tokenService token.TokenService // Token 服务接口,用于 JWT token 的生成和解析 - httpServer *http.Server // 标准库的 HTTP 服务器实例,用于启动和停止服务 - config config.ServerConfig // API 服务器的配置,使用 infra/config 包中的 ServerConfig - userController *user.Controller // 用户控制器实例 - deviceController *device.Controller // 设备控制器实例 - planController *plan.Controller // 计划控制器实例 - listenHandler transport.ListenHandler // 设备上行事件监听器 + engine *gin.Engine // Gin 引擎实例,用于处理 HTTP 请求 + logger *logs.Logger // 日志记录器,用于输出日志信息 + userRepo repository.UserRepository // 用户数据仓库接口,用于用户数据操作 + tokenService token.TokenService // Token 服务接口,用于 JWT token 的生成和解析 + httpServer *http.Server // 标准库的 HTTP 服务器实例,用于启动和停止服务 + config config.ServerConfig // API 服务器的配置,使用 infra/config 包中的 ServerConfig + userController *user.Controller // 用户控制器实例 + deviceController *device.Controller // 设备控制器实例 + planController *plan.Controller // 计划控制器实例 + listenHandler transport.ListenHandler // 设备上行事件监听器 + analysisTaskManager *task.AnalysisPlanTaskManager // 计划触发器管理器实例 } // NewAPI 创建并返回一个新的 API 实例 // 负责初始化 Gin 引擎、设置全局中间件,并注入所有必要的依赖。 -func NewAPI(cfg config.ServerConfig, logger *logs.Logger, userRepo repository.UserRepository, deviceRepository repository.DeviceRepository, planRepository repository.PlanRepository, tokenService token.TokenService, listenHandler transport.ListenHandler) *API { +func NewAPI(cfg config.ServerConfig, + logger *logs.Logger, + userRepo repository.UserRepository, + deviceRepository repository.DeviceRepository, + planRepository repository.PlanRepository, + tokenService token.TokenService, + listenHandler transport.ListenHandler, + analysisTaskManager *task.AnalysisPlanTaskManager) *API { // 设置 Gin 模式,例如 gin.ReleaseMode (生产模式) 或 gin.DebugMode (开发模式) // 从配置中获取 Gin 模式 gin.SetMode(cfg.Mode) @@ -73,7 +82,7 @@ func NewAPI(cfg config.ServerConfig, logger *logs.Logger, userRepo repository.Us // 在 NewAPI 中初始化设备控制器,并将其作为 API 结构体的成员 deviceController: device.NewController(deviceRepository, logger), // 在 NewAPI 中初始化计划控制器,并将其作为 API 结构体的成员 - planController: plan.NewController(logger, planRepository), + planController: plan.NewController(logger, planRepository, analysisTaskManager), } api.setupRoutes() // 设置所有路由 diff --git a/internal/app/controller/plan/plan_controller.go b/internal/app/controller/plan/plan_controller.go index cd6c0ab..d7c9576 100644 --- a/internal/app/controller/plan/plan_controller.go +++ b/internal/app/controller/plan/plan_controller.go @@ -5,6 +5,7 @@ import ( "strconv" "git.huangwc.com/pig/pig-farm-controller/internal/app/controller" + task "git.huangwc.com/pig/pig-farm-controller/internal/app/service/task" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" @@ -74,7 +75,7 @@ type TaskRequest struct { // TaskResponse 定义任务响应结构体 type TaskResponse struct { - ID uint `json:"id" example:"1"` + ID int `json:"id" example:"1"` PlanID uint `json:"plan_id" example:"1"` Name string `json:"name" example:"打开风扇"` Description string `json:"description" example:"打开1号风扇"` @@ -87,15 +88,17 @@ type TaskResponse struct { // Controller 定义了计划相关的控制器 type Controller struct { - logger *logs.Logger - planRepo repository.PlanRepository + logger *logs.Logger + planRepo repository.PlanRepository + analysisPlanTaskManager *task.AnalysisPlanTaskManager } // NewController 创建一个新的 Controller 实例 -func NewController(logger *logs.Logger, planRepo repository.PlanRepository) *Controller { +func NewController(logger *logs.Logger, planRepo repository.PlanRepository, analysisPlanTaskManager *task.AnalysisPlanTaskManager) *Controller { return &Controller{ - logger: logger, - planRepo: planRepo, + logger: logger, + planRepo: planRepo, + analysisPlanTaskManager: analysisPlanTaskManager, } } @@ -131,6 +134,12 @@ func (c *Controller) CreatePlan(ctx *gin.Context) { return } + // 创建成功后,调用 manager 创建或更新触发器 + if err := c.analysisPlanTaskManager.CreateOrUpdateTrigger(ctx, planToCreate.ID); err != nil { + // 这是一个非阻塞性错误,我们只记录日志,因为主流程(创建计划)已经成功 + c.logger.Errorf("为新创建的计划 %d 创建触发器失败: %v", planToCreate.ID, err) + } + // 使用已有的转换函数将创建后的模型转换为响应对象 resp := PlanToResponse(planToCreate) @@ -261,6 +270,12 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) { return } + // 更新成功后,调用 manager 创建或更新触发器 + if err := c.analysisPlanTaskManager.CreateOrUpdateTrigger(ctx, planToUpdate.ID); err != nil { + // 这是一个非阻塞性错误,我们只记录日志 + c.logger.Errorf("为更新后的计划 %d 更新触发器失败: %v", planToUpdate.ID, err) + } + // 6. 获取更新后的完整计划用于响应 updatedPlan, err := c.planRepo.GetPlanByID(uint(id)) if err != nil { diff --git a/internal/core/application.go b/internal/core/application.go index 1c627bb..0c36568 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -5,8 +5,10 @@ import ( "os" "os/signal" "syscall" + "time" "git.huangwc.com/pig/pig-farm-controller/internal/app/api" + "git.huangwc.com/pig/pig-farm-controller/internal/app/service/task" "git.huangwc.com/pig/pig-farm-controller/internal/app/service/token" "git.huangwc.com/pig/pig-farm-controller/internal/app/service/transport" "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" @@ -14,7 +16,6 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/task" ) // Application 是整个应用的核心,封装了所有组件和生命周期。 @@ -22,7 +23,7 @@ type Application struct { Config *config.Config Logger *logs.Logger Storage database.Storage - Executor *task.Executor + Executor *task.Scheduler API *api.API // 添加 API 对象 } @@ -44,9 +45,6 @@ func NewApplication(configPath string) (*Application, error) { return nil, err // 错误已在 initStorage 中被包装 } - // 初始化任务执行器 - executor := task.NewExecutor(cfg.Heartbeat.Concurrency, logger) - // 初始化 Token 服务 tokenService := token.NewTokenService([]byte(cfg.App.JWTSecret)) @@ -59,11 +57,23 @@ func NewApplication(configPath string) (*Application, error) { // 初始化计划仓库 planRepo := repository.NewGormPlanRepository(storage.GetDB()) + // 初始化待执行任务仓库 + pendingTaskRepo := repository.NewGormPendingTaskRepository(storage.GetDB()) + + // 初始化执行日志仓库 + executionLogRepo := repository.NewGormExecutionLogRepository(storage.GetDB()) + // 初始化设备上行监听器 listenHandler := transport.NewChirpStackListener(logger) + // 初始化计划触发器管理器 + analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger) + + // 初始化任务执行器 + executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, planRepo, analysisPlanTaskManager, task.TaskFactory, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers) + // 初始化 API 服务器 - apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, tokenService, listenHandler) + apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, tokenService, listenHandler, analysisPlanTaskManager) // 组装 Application 对象 app := &Application{ diff --git a/internal/infra/config/config.go b/internal/infra/config/config.go index 74679bf..273e37c 100644 --- a/internal/infra/config/config.go +++ b/internal/infra/config/config.go @@ -29,6 +29,9 @@ type Config struct { // Heartbeat 心跳配置 Heartbeat HeartbeatConfig `yaml:"heartbeat"` + + // TaskConfig 任务调度配置 + Task TaskConfig `yaml:"task"` } // AppConfig 代表应用基础配置 @@ -106,6 +109,12 @@ type HeartbeatConfig struct { Concurrency int `yaml:"concurrency"` } +// TaskConfig 代表任务调度配置 +type TaskConfig struct { + Interval int `yaml:"interval"` + NumWorkers int `yaml:"num_workers"` +} + // NewConfig 创建并返回一个新的配置实例 func NewConfig() *Config { // 默认值可以在这里设置,但我们优先使用配置文件中的值 diff --git a/internal/infra/repository/execution_log_repository.go b/internal/infra/repository/execution_log_repository.go index 195afd7..4503cb0 100644 --- a/internal/infra/repository/execution_log_repository.go +++ b/internal/infra/repository/execution_log_repository.go @@ -15,32 +15,32 @@ type ExecutionLogRepository interface { FindTaskExecutionLogByID(id uint) (*models.TaskExecutionLog, error) } -// executionLogRepository 是使用 GORM 的具体实现。 -type executionLogRepository struct { +// gormExecutionLogRepository 是使用 GORM 的具体实现。 +type gormExecutionLogRepository struct { db *gorm.DB } -// NewExecutionLogRepository 创建一个新的执行日志仓库。 +// NewGormExecutionLogRepository 创建一个新的执行日志仓库。 // 它接收一个 GORM DB 实例作为依赖。 -func NewExecutionLogRepository(db *gorm.DB) ExecutionLogRepository { - return &executionLogRepository{db: db} +func NewGormExecutionLogRepository(db *gorm.DB) ExecutionLogRepository { + return &gormExecutionLogRepository{db: db} } // CreatePlanExecutionLog 为一次计划执行创建一条新的日志条目。 -func (r *executionLogRepository) CreatePlanExecutionLog(log *models.PlanExecutionLog) error { +func (r *gormExecutionLogRepository) CreatePlanExecutionLog(log *models.PlanExecutionLog) error { return r.db.Create(log).Error } // UpdatePlanExecutionLog 使用 Updates 方法更新一个计划执行日志。 // GORM 的 Updates 传入 struct 时,只会更新非零值字段。 // 在这里,我们期望传入的对象一定包含一个有效的 ID。 -func (r *executionLogRepository) UpdatePlanExecutionLog(log *models.PlanExecutionLog) error { +func (r *gormExecutionLogRepository) UpdatePlanExecutionLog(log *models.PlanExecutionLog) error { return r.db.Updates(log).Error } // CreateTaskExecutionLogsInBatch 在一次数据库调用中创建多个任务执行日志条目。 // 这是“预写日志”步骤的关键。 -func (r *executionLogRepository) CreateTaskExecutionLogsInBatch(logs []*models.TaskExecutionLog) error { +func (r *gormExecutionLogRepository) CreateTaskExecutionLogsInBatch(logs []*models.TaskExecutionLog) error { // GORM 的 Create 传入一个切片指针会执行批量插入。 return r.db.Create(&logs).Error } @@ -48,13 +48,13 @@ func (r *executionLogRepository) CreateTaskExecutionLogsInBatch(logs []*models.T // UpdateTaskExecutionLog 使用 Updates 方法更新一个任务执行日志。 // GORM 的 Updates 传入 struct 时,只会更新非零值字段。 // 这种方式代码更直观,上层服务可以直接修改模型对象后进行保存。 -func (r *executionLogRepository) UpdateTaskExecutionLog(log *models.TaskExecutionLog) error { +func (r *gormExecutionLogRepository) UpdateTaskExecutionLog(log *models.TaskExecutionLog) error { return r.db.Updates(log).Error } // FindTaskExecutionLogByID 根据 ID 查找单个任务执行日志。 // 它会预加载关联的 Task 信息。 -func (r *executionLogRepository) FindTaskExecutionLogByID(id uint) (*models.TaskExecutionLog, error) { +func (r *gormExecutionLogRepository) FindTaskExecutionLogByID(id uint) (*models.TaskExecutionLog, error) { var log models.TaskExecutionLog // 使用 Preload("Task") 来确保关联的任务信息被一并加载 err := r.db.Preload("Task").First(&log, id).Error diff --git a/internal/infra/repository/pending_task_repository.go b/internal/infra/repository/pending_task_repository.go index dfee75b..689197b 100644 --- a/internal/infra/repository/pending_task_repository.go +++ b/internal/infra/repository/pending_task_repository.go @@ -18,23 +18,23 @@ type PendingTaskRepository interface { RequeueTask(originalPendingTask *models.PendingTask) error } -// pendingTaskRepository 是使用 GORM 的具体实现。 -type pendingTaskRepository struct { +// gormPendingTaskRepository 是使用 GORM 的具体实现。 +type gormPendingTaskRepository struct { db *gorm.DB } -// NewPendingTaskRepository 创建一个新的待执行任务队列仓库。 -func NewPendingTaskRepository(db *gorm.DB) PendingTaskRepository { - return &pendingTaskRepository{db: db} +// NewGormPendingTaskRepository 创建一个新的待执行任务队列仓库。 +func NewGormPendingTaskRepository(db *gorm.DB) PendingTaskRepository { + return &gormPendingTaskRepository{db: db} } // CreatePendingTasksInBatch 在一次数据库调用中创建多个待执行任务条目。 -func (r *pendingTaskRepository) CreatePendingTasksInBatch(tasks []*models.PendingTask) error { +func (r *gormPendingTaskRepository) CreatePendingTasksInBatch(tasks []*models.PendingTask) error { return r.db.Create(&tasks).Error } // ClaimNextAvailableTask 以原子方式认领下一个可用的任务。 -func (r *pendingTaskRepository) ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error) { +func (r *gormPendingTaskRepository) ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error) { var log models.TaskExecutionLog var pendingTask models.PendingTask @@ -79,7 +79,7 @@ func (r *pendingTaskRepository) ClaimNextAvailableTask(excludePlanIDs []uint) (* // RequeueTask 安全地将一个任务重新放回队列。 // 它通过将原始 PendingTask 的 ID 重置为 0,并重新创建它来实现。 -func (r *pendingTaskRepository) RequeueTask(originalPendingTask *models.PendingTask) error { +func (r *gormPendingTaskRepository) RequeueTask(originalPendingTask *models.PendingTask) error { return r.db.Transaction(func(tx *gorm.DB) error { // 1. 将日志状态恢复为 waiting if err := tx.Model(&models.TaskExecutionLog{}).Where("id = ?", originalPendingTask.TaskExecutionLogID).Update("status", models.ExecutionStatusWaiting).Error; err != nil {