1. 增加任务调度器配置文件
2. 创建/更新计划会自动处理触发器
This commit is contained in:
@@ -48,3 +48,8 @@ chirp_stack:
|
||||
api_host: "http://192.168.5.16:8090" # ChirpStack API服务器地址
|
||||
api_token: "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJjaGlycHN0YWNrIiwiaXNzIjoiY2hpcnBzdGFjayIsInN1YiI6IjU2ZWRhNWQ3LTM4NzgtNDAwMC05MWMzLWYwZDk3M2YwODhjNiIsInR5cCI6ImtleSJ9.NxBxTrhPAnezKMqAYZR_Uq2mGQjJRlmVzg1ZDFCyaHQ" # ChirpStack API密钥, 请求头中需要设置 Grpc-Metadata-Authorization: Bearer <YOUR_API_TOKEN>
|
||||
api_timeout: 10 # ChirpStack API请求超时时间(秒)
|
||||
|
||||
# 任务调度器配置
|
||||
task:
|
||||
interval: 3
|
||||
num_workers: 5
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/controller/device"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/controller/plan"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/controller/user"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/task"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/token"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/transport"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
|
||||
@@ -43,11 +44,19 @@ type API struct {
|
||||
deviceController *device.Controller // 设备控制器实例
|
||||
planController *plan.Controller // 计划控制器实例
|
||||
listenHandler transport.ListenHandler // 设备上行事件监听器
|
||||
analysisTaskManager *task.AnalysisPlanTaskManager // 计划触发器管理器实例
|
||||
}
|
||||
|
||||
// NewAPI 创建并返回一个新的 API 实例
|
||||
// 负责初始化 Gin 引擎、设置全局中间件,并注入所有必要的依赖。
|
||||
func NewAPI(cfg config.ServerConfig, logger *logs.Logger, userRepo repository.UserRepository, deviceRepository repository.DeviceRepository, planRepository repository.PlanRepository, tokenService token.TokenService, listenHandler transport.ListenHandler) *API {
|
||||
func NewAPI(cfg config.ServerConfig,
|
||||
logger *logs.Logger,
|
||||
userRepo repository.UserRepository,
|
||||
deviceRepository repository.DeviceRepository,
|
||||
planRepository repository.PlanRepository,
|
||||
tokenService token.TokenService,
|
||||
listenHandler transport.ListenHandler,
|
||||
analysisTaskManager *task.AnalysisPlanTaskManager) *API {
|
||||
// 设置 Gin 模式,例如 gin.ReleaseMode (生产模式) 或 gin.DebugMode (开发模式)
|
||||
// 从配置中获取 Gin 模式
|
||||
gin.SetMode(cfg.Mode)
|
||||
@@ -73,7 +82,7 @@ func NewAPI(cfg config.ServerConfig, logger *logs.Logger, userRepo repository.Us
|
||||
// 在 NewAPI 中初始化设备控制器,并将其作为 API 结构体的成员
|
||||
deviceController: device.NewController(deviceRepository, logger),
|
||||
// 在 NewAPI 中初始化计划控制器,并将其作为 API 结构体的成员
|
||||
planController: plan.NewController(logger, planRepository),
|
||||
planController: plan.NewController(logger, planRepository, analysisTaskManager),
|
||||
}
|
||||
|
||||
api.setupRoutes() // 设置所有路由
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"strconv"
|
||||
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/controller"
|
||||
task "git.huangwc.com/pig/pig-farm-controller/internal/app/service/task"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
||||
@@ -74,7 +75,7 @@ type TaskRequest struct {
|
||||
|
||||
// TaskResponse 定义任务响应结构体
|
||||
type TaskResponse struct {
|
||||
ID uint `json:"id" example:"1"`
|
||||
ID int `json:"id" example:"1"`
|
||||
PlanID uint `json:"plan_id" example:"1"`
|
||||
Name string `json:"name" example:"打开风扇"`
|
||||
Description string `json:"description" example:"打开1号风扇"`
|
||||
@@ -89,13 +90,15 @@ type TaskResponse struct {
|
||||
type Controller struct {
|
||||
logger *logs.Logger
|
||||
planRepo repository.PlanRepository
|
||||
analysisPlanTaskManager *task.AnalysisPlanTaskManager
|
||||
}
|
||||
|
||||
// NewController 创建一个新的 Controller 实例
|
||||
func NewController(logger *logs.Logger, planRepo repository.PlanRepository) *Controller {
|
||||
func NewController(logger *logs.Logger, planRepo repository.PlanRepository, analysisPlanTaskManager *task.AnalysisPlanTaskManager) *Controller {
|
||||
return &Controller{
|
||||
logger: logger,
|
||||
planRepo: planRepo,
|
||||
analysisPlanTaskManager: analysisPlanTaskManager,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -131,6 +134,12 @@ func (c *Controller) CreatePlan(ctx *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// 创建成功后,调用 manager 创建或更新触发器
|
||||
if err := c.analysisPlanTaskManager.CreateOrUpdateTrigger(ctx, planToCreate.ID); err != nil {
|
||||
// 这是一个非阻塞性错误,我们只记录日志,因为主流程(创建计划)已经成功
|
||||
c.logger.Errorf("为新创建的计划 %d 创建触发器失败: %v", planToCreate.ID, err)
|
||||
}
|
||||
|
||||
// 使用已有的转换函数将创建后的模型转换为响应对象
|
||||
resp := PlanToResponse(planToCreate)
|
||||
|
||||
@@ -261,6 +270,12 @@ func (c *Controller) UpdatePlan(ctx *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// 更新成功后,调用 manager 创建或更新触发器
|
||||
if err := c.analysisPlanTaskManager.CreateOrUpdateTrigger(ctx, planToUpdate.ID); err != nil {
|
||||
// 这是一个非阻塞性错误,我们只记录日志
|
||||
c.logger.Errorf("为更新后的计划 %d 更新触发器失败: %v", planToUpdate.ID, err)
|
||||
}
|
||||
|
||||
// 6. 获取更新后的完整计划用于响应
|
||||
updatedPlan, err := c.planRepo.GetPlanByID(uint(id))
|
||||
if err != nil {
|
||||
|
||||
@@ -5,8 +5,10 @@ import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/api"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/task"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/token"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/transport"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
|
||||
@@ -14,7 +16,6 @@ import (
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/task"
|
||||
)
|
||||
|
||||
// Application 是整个应用的核心,封装了所有组件和生命周期。
|
||||
@@ -22,7 +23,7 @@ type Application struct {
|
||||
Config *config.Config
|
||||
Logger *logs.Logger
|
||||
Storage database.Storage
|
||||
Executor *task.Executor
|
||||
Executor *task.Scheduler
|
||||
API *api.API // 添加 API 对象
|
||||
}
|
||||
|
||||
@@ -44,9 +45,6 @@ func NewApplication(configPath string) (*Application, error) {
|
||||
return nil, err // 错误已在 initStorage 中被包装
|
||||
}
|
||||
|
||||
// 初始化任务执行器
|
||||
executor := task.NewExecutor(cfg.Heartbeat.Concurrency, logger)
|
||||
|
||||
// 初始化 Token 服务
|
||||
tokenService := token.NewTokenService([]byte(cfg.App.JWTSecret))
|
||||
|
||||
@@ -59,11 +57,23 @@ func NewApplication(configPath string) (*Application, error) {
|
||||
// 初始化计划仓库
|
||||
planRepo := repository.NewGormPlanRepository(storage.GetDB())
|
||||
|
||||
// 初始化待执行任务仓库
|
||||
pendingTaskRepo := repository.NewGormPendingTaskRepository(storage.GetDB())
|
||||
|
||||
// 初始化执行日志仓库
|
||||
executionLogRepo := repository.NewGormExecutionLogRepository(storage.GetDB())
|
||||
|
||||
// 初始化设备上行监听器
|
||||
listenHandler := transport.NewChirpStackListener(logger)
|
||||
|
||||
// 初始化计划触发器管理器
|
||||
analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger)
|
||||
|
||||
// 初始化任务执行器
|
||||
executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, planRepo, analysisPlanTaskManager, task.TaskFactory, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers)
|
||||
|
||||
// 初始化 API 服务器
|
||||
apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, tokenService, listenHandler)
|
||||
apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, tokenService, listenHandler, analysisPlanTaskManager)
|
||||
|
||||
// 组装 Application 对象
|
||||
app := &Application{
|
||||
|
||||
@@ -29,6 +29,9 @@ type Config struct {
|
||||
|
||||
// Heartbeat 心跳配置
|
||||
Heartbeat HeartbeatConfig `yaml:"heartbeat"`
|
||||
|
||||
// TaskConfig 任务调度配置
|
||||
Task TaskConfig `yaml:"task"`
|
||||
}
|
||||
|
||||
// AppConfig 代表应用基础配置
|
||||
@@ -106,6 +109,12 @@ type HeartbeatConfig struct {
|
||||
Concurrency int `yaml:"concurrency"`
|
||||
}
|
||||
|
||||
// TaskConfig 代表任务调度配置
|
||||
type TaskConfig struct {
|
||||
Interval int `yaml:"interval"`
|
||||
NumWorkers int `yaml:"num_workers"`
|
||||
}
|
||||
|
||||
// NewConfig 创建并返回一个新的配置实例
|
||||
func NewConfig() *Config {
|
||||
// 默认值可以在这里设置,但我们优先使用配置文件中的值
|
||||
|
||||
@@ -15,32 +15,32 @@ type ExecutionLogRepository interface {
|
||||
FindTaskExecutionLogByID(id uint) (*models.TaskExecutionLog, error)
|
||||
}
|
||||
|
||||
// executionLogRepository 是使用 GORM 的具体实现。
|
||||
type executionLogRepository struct {
|
||||
// gormExecutionLogRepository 是使用 GORM 的具体实现。
|
||||
type gormExecutionLogRepository struct {
|
||||
db *gorm.DB
|
||||
}
|
||||
|
||||
// NewExecutionLogRepository 创建一个新的执行日志仓库。
|
||||
// NewGormExecutionLogRepository 创建一个新的执行日志仓库。
|
||||
// 它接收一个 GORM DB 实例作为依赖。
|
||||
func NewExecutionLogRepository(db *gorm.DB) ExecutionLogRepository {
|
||||
return &executionLogRepository{db: db}
|
||||
func NewGormExecutionLogRepository(db *gorm.DB) ExecutionLogRepository {
|
||||
return &gormExecutionLogRepository{db: db}
|
||||
}
|
||||
|
||||
// CreatePlanExecutionLog 为一次计划执行创建一条新的日志条目。
|
||||
func (r *executionLogRepository) CreatePlanExecutionLog(log *models.PlanExecutionLog) error {
|
||||
func (r *gormExecutionLogRepository) CreatePlanExecutionLog(log *models.PlanExecutionLog) error {
|
||||
return r.db.Create(log).Error
|
||||
}
|
||||
|
||||
// UpdatePlanExecutionLog 使用 Updates 方法更新一个计划执行日志。
|
||||
// GORM 的 Updates 传入 struct 时,只会更新非零值字段。
|
||||
// 在这里,我们期望传入的对象一定包含一个有效的 ID。
|
||||
func (r *executionLogRepository) UpdatePlanExecutionLog(log *models.PlanExecutionLog) error {
|
||||
func (r *gormExecutionLogRepository) UpdatePlanExecutionLog(log *models.PlanExecutionLog) error {
|
||||
return r.db.Updates(log).Error
|
||||
}
|
||||
|
||||
// CreateTaskExecutionLogsInBatch 在一次数据库调用中创建多个任务执行日志条目。
|
||||
// 这是“预写日志”步骤的关键。
|
||||
func (r *executionLogRepository) CreateTaskExecutionLogsInBatch(logs []*models.TaskExecutionLog) error {
|
||||
func (r *gormExecutionLogRepository) CreateTaskExecutionLogsInBatch(logs []*models.TaskExecutionLog) error {
|
||||
// GORM 的 Create 传入一个切片指针会执行批量插入。
|
||||
return r.db.Create(&logs).Error
|
||||
}
|
||||
@@ -48,13 +48,13 @@ func (r *executionLogRepository) CreateTaskExecutionLogsInBatch(logs []*models.T
|
||||
// UpdateTaskExecutionLog 使用 Updates 方法更新一个任务执行日志。
|
||||
// GORM 的 Updates 传入 struct 时,只会更新非零值字段。
|
||||
// 这种方式代码更直观,上层服务可以直接修改模型对象后进行保存。
|
||||
func (r *executionLogRepository) UpdateTaskExecutionLog(log *models.TaskExecutionLog) error {
|
||||
func (r *gormExecutionLogRepository) UpdateTaskExecutionLog(log *models.TaskExecutionLog) error {
|
||||
return r.db.Updates(log).Error
|
||||
}
|
||||
|
||||
// FindTaskExecutionLogByID 根据 ID 查找单个任务执行日志。
|
||||
// 它会预加载关联的 Task 信息。
|
||||
func (r *executionLogRepository) FindTaskExecutionLogByID(id uint) (*models.TaskExecutionLog, error) {
|
||||
func (r *gormExecutionLogRepository) FindTaskExecutionLogByID(id uint) (*models.TaskExecutionLog, error) {
|
||||
var log models.TaskExecutionLog
|
||||
// 使用 Preload("Task") 来确保关联的任务信息被一并加载
|
||||
err := r.db.Preload("Task").First(&log, id).Error
|
||||
|
||||
@@ -18,23 +18,23 @@ type PendingTaskRepository interface {
|
||||
RequeueTask(originalPendingTask *models.PendingTask) error
|
||||
}
|
||||
|
||||
// pendingTaskRepository 是使用 GORM 的具体实现。
|
||||
type pendingTaskRepository struct {
|
||||
// gormPendingTaskRepository 是使用 GORM 的具体实现。
|
||||
type gormPendingTaskRepository struct {
|
||||
db *gorm.DB
|
||||
}
|
||||
|
||||
// NewPendingTaskRepository 创建一个新的待执行任务队列仓库。
|
||||
func NewPendingTaskRepository(db *gorm.DB) PendingTaskRepository {
|
||||
return &pendingTaskRepository{db: db}
|
||||
// NewGormPendingTaskRepository 创建一个新的待执行任务队列仓库。
|
||||
func NewGormPendingTaskRepository(db *gorm.DB) PendingTaskRepository {
|
||||
return &gormPendingTaskRepository{db: db}
|
||||
}
|
||||
|
||||
// CreatePendingTasksInBatch 在一次数据库调用中创建多个待执行任务条目。
|
||||
func (r *pendingTaskRepository) CreatePendingTasksInBatch(tasks []*models.PendingTask) error {
|
||||
func (r *gormPendingTaskRepository) CreatePendingTasksInBatch(tasks []*models.PendingTask) error {
|
||||
return r.db.Create(&tasks).Error
|
||||
}
|
||||
|
||||
// ClaimNextAvailableTask 以原子方式认领下一个可用的任务。
|
||||
func (r *pendingTaskRepository) ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error) {
|
||||
func (r *gormPendingTaskRepository) ClaimNextAvailableTask(excludePlanIDs []uint) (*models.TaskExecutionLog, *models.PendingTask, error) {
|
||||
var log models.TaskExecutionLog
|
||||
var pendingTask models.PendingTask
|
||||
|
||||
@@ -79,7 +79,7 @@ func (r *pendingTaskRepository) ClaimNextAvailableTask(excludePlanIDs []uint) (*
|
||||
|
||||
// RequeueTask 安全地将一个任务重新放回队列。
|
||||
// 它通过将原始 PendingTask 的 ID 重置为 0,并重新创建它来实现。
|
||||
func (r *pendingTaskRepository) RequeueTask(originalPendingTask *models.PendingTask) error {
|
||||
func (r *gormPendingTaskRepository) RequeueTask(originalPendingTask *models.PendingTask) error {
|
||||
return r.db.Transaction(func(tx *gorm.DB) error {
|
||||
// 1. 将日志状态恢复为 waiting
|
||||
if err := tx.Model(&models.TaskExecutionLog{}).Where("id = ?", originalPendingTask.TaskExecutionLogID).Update("status", models.ExecutionStatusWaiting).Error; err != nil {
|
||||
|
||||
Reference in New Issue
Block a user