定义ReleaseFeedWeightTask并注入依赖

This commit is contained in:
2025-09-25 09:44:32 +08:00
parent e1a1b29a0f
commit 0d6d1db290
9 changed files with 95 additions and 49 deletions

View File

@@ -48,6 +48,7 @@ heartbeat:
chirp_stack: chirp_stack:
api_host: "http://192.168.5.16:8090" # ChirpStack API服务器地址 api_host: "http://192.168.5.16:8090" # ChirpStack API服务器地址
api_token: "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJjaGlycHN0YWNrIiwiaXNzIjoiY2hpcnBzdGFjayIsInN1YiI6IjU2ZWRhNWQ3LTM4NzgtNDAwMC05MWMzLWYwZDk3M2YwODhjNiIsInR5cCI6ImtleSJ9.NxBxTrhPAnezKMqAYZR_Uq2mGQjJRlmVzg1ZDFCyaHQ" # ChirpStack API密钥, 请求头中需要设置 Grpc-Metadata-Authorization: Bearer <YOUR_API_TOKEN> api_token: "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJjaGlycHN0YWNrIiwiaXNzIjoiY2hpcnBzdGFjayIsInN1YiI6IjU2ZWRhNWQ3LTM4NzgtNDAwMC05MWMzLWYwZDk3M2YwODhjNiIsInR5cCI6ImtleSJ9.NxBxTrhPAnezKMqAYZR_Uq2mGQjJRlmVzg1ZDFCyaHQ" # ChirpStack API密钥, 请求头中需要设置 Grpc-Metadata-Authorization: Bearer <YOUR_API_TOKEN>
fport: 1
api_timeout: 10 # ChirpStack API请求超时时间(秒) api_timeout: 10 # ChirpStack API请求超时时间(秒)
# 任务调度器配置 # 任务调度器配置

View File

@@ -20,22 +20,15 @@ type DelayTask struct {
logger *logs.Logger logger *logs.Logger
} }
func NewDelayTask() Task { func NewDelayTask(logger *logs.Logger, executionTask *models.TaskExecutionLog) Task {
return &DelayTask{} return &DelayTask{
executionTask: executionTask,
logger: logger,
}
} }
// Execute 执行延迟任务,等待指定的时间 // Execute 执行延迟任务,等待指定的时间
func (d *DelayTask) Execute() error { func (d *DelayTask) Execute() error {
d.logger.Infof("任务 %v: 开始延迟 %v...", d.executionTask.TaskID, d.duration)
time.Sleep(d.duration)
d.logger.Infof("任务 %v: 延迟结束。", d.executionTask.TaskID)
return nil
}
func (d *DelayTask) ParseParams(logger *logs.Logger, executionTask *models.TaskExecutionLog) error {
d.logger = logger
d.executionTask = executionTask
if d.executionTask.Task.Parameters == nil { if d.executionTask.Task.Parameters == nil {
d.logger.Errorf("任务 %v: 缺少参数", d.executionTask.TaskID) d.logger.Errorf("任务 %v: 缺少参数", d.executionTask.TaskID)
return fmt.Errorf("任务 %v: 参数不全", d.executionTask.TaskID) return fmt.Errorf("任务 %v: 参数不全", d.executionTask.TaskID)
@@ -54,6 +47,10 @@ func (d *DelayTask) ParseParams(logger *logs.Logger, executionTask *models.TaskE
} }
d.duration = time.Duration(duration) * time.Second d.duration = time.Duration(duration) * time.Second
d.logger.Infof("任务 %v: 开始延迟 %v...", d.executionTask.TaskID, d.duration)
time.Sleep(d.duration)
d.logger.Infof("任务 %v: 延迟结束。", d.executionTask.TaskID)
return nil return nil
} }

View File

@@ -0,0 +1,36 @@
package task
import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
)
// ReleaseFeedWeightTask 是一个控制下料口释放指定重量的任务
type ReleaseFeedWeightTask struct {
deviceRepo repository.DeviceRepository
sensorDataRepo repository.SensorDataRepository
comm transport.Communicator
logger *logs.Logger
}
func (r *ReleaseFeedWeightTask) Execute() error {
//TODO implement me
panic("implement me")
}
func (r *ReleaseFeedWeightTask) OnFailure(executeErr error) {
//TODO implement me
panic("implement me")
}
// NewReleaseFeedWeightTask 创建一个新的 ReleaseFeedWeightTask 实例
func NewReleaseFeedWeightTask(deviceRepo repository.DeviceRepository, sensorDataRepo repository.SensorDataRepository, comm transport.Communicator) Task {
return &ReleaseFeedWeightTask{
deviceRepo: deviceRepo,
sensorDataRepo: sensorDataRepo,
comm: comm,
}
}

View File

@@ -9,6 +9,7 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
"github.com/panjf2000/ants/v2" "github.com/panjf2000/ants/v2"
"gorm.io/gorm" "gorm.io/gorm"
) )
@@ -80,8 +81,11 @@ type Scheduler struct {
workers int workers int
pendingTaskRepo repository.PendingTaskRepository pendingTaskRepo repository.PendingTaskRepository
executionLogRepo repository.ExecutionLogRepository executionLogRepo repository.ExecutionLogRepository
deviceRepo repository.DeviceRepository
sensorDataRepo repository.SensorDataRepository
planRepo repository.PlanRepository planRepo repository.PlanRepository
analysisPlanTaskManager *AnalysisPlanTaskManager // <--- 注入共享的 Manager comm transport.Communicator
analysisPlanTaskManager *AnalysisPlanTaskManager
progressTracker *ProgressTracker progressTracker *ProgressTracker
pool *ants.Pool // 使用 ants 协程池来管理并发 pool *ants.Pool // 使用 ants 协程池来管理并发
@@ -93,16 +97,22 @@ type Scheduler struct {
func NewScheduler( func NewScheduler(
pendingTaskRepo repository.PendingTaskRepository, pendingTaskRepo repository.PendingTaskRepository,
executionLogRepo repository.ExecutionLogRepository, executionLogRepo repository.ExecutionLogRepository,
deviceRepo repository.DeviceRepository,
sensorDataRepo repository.SensorDataRepository,
planRepo repository.PlanRepository, planRepo repository.PlanRepository,
analysisPlanTaskManager *AnalysisPlanTaskManager, // <--- 注入 Manager comm transport.Communicator,
analysisPlanTaskManager *AnalysisPlanTaskManager,
logger *logs.Logger, logger *logs.Logger,
interval time.Duration, interval time.Duration,
numWorkers int) *Scheduler { numWorkers int) *Scheduler {
return &Scheduler{ return &Scheduler{
pendingTaskRepo: pendingTaskRepo, pendingTaskRepo: pendingTaskRepo,
executionLogRepo: executionLogRepo, executionLogRepo: executionLogRepo,
deviceRepo: deviceRepo,
sensorDataRepo: sensorDataRepo,
planRepo: planRepo, planRepo: planRepo,
analysisPlanTaskManager: analysisPlanTaskManager, // <--- 注入 Manager comm: comm,
analysisPlanTaskManager: analysisPlanTaskManager,
logger: logger, logger: logger,
pollingInterval: interval, pollingInterval: interval,
workers: numWorkers, workers: numWorkers,
@@ -261,11 +271,7 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error {
} else { } else {
// 执行普通任务 // 执行普通任务
task := s.taskFactory(claimedLog.Task.Type) task := s.taskFactory(claimedLog)
if err := task.ParseParams(s.logger, claimedLog); err != nil {
s.logger.Errorf("[严重] 任务参数解析失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
return err
}
if err := task.Execute(); err != nil { if err := task.Execute(); err != nil {
s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err) s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
@@ -278,10 +284,13 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error {
} }
// taskFactory 会根据任务类型初始化对应任务 // taskFactory 会根据任务类型初始化对应任务
func (s *Scheduler) taskFactory(taskType models.TaskType) Task { func (s *Scheduler) taskFactory(claimedLog *models.TaskExecutionLog) Task {
switch taskType { switch claimedLog.Task.Type {
case models.TaskTypeWaiting: case models.TaskTypeWaiting:
return NewDelayTask() return NewDelayTask(s.logger, claimedLog)
case models.TaskTypeReleaseFeedWeight:
return NewReleaseFeedWeightTask(s.deviceRepo, s.sensorDataRepo, s.comm)
default: default:
// TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 // TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型
panic("不支持的任务类型") panic("不支持的任务类型")

View File

@@ -1,7 +1,6 @@
package task package task
import ( import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
) )
@@ -13,9 +12,6 @@ type Task interface {
// 返回的 error 表示任务是否执行成功。调度器会根据返回的 error 是否为 nil 来决定任务状态。 // 返回的 error 表示任务是否执行成功。调度器会根据返回的 error 是否为 nil 来决定任务状态。
Execute() error Execute() error
// ParseParams 解析及校验参数
ParseParams(logger *logs.Logger, claimedLog *models.TaskExecutionLog) error
// OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑。 // OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑。
// log: 任务执行的上下文。 // log: 任务执行的上下文。
// executeErr: 从 Execute 方法返回的原始错误。 // executeErr: 从 Execute 方法返回的原始错误。

View File

@@ -16,6 +16,7 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora"
) )
// Application 是整个应用的核心,封装了所有组件和生命周期。 // Application 是整个应用的核心,封装了所有组件和生命周期。
@@ -81,8 +82,11 @@ func NewApplication(configPath string) (*Application, error) {
// 初始化计划触发器管理器 // 初始化计划触发器管理器
analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger) analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger)
// 初始化设备通信器
comm := lora.NewChirpStackTransport(cfg.ChirpStack, logger, deviceCommandLogRepo, deviceRepo)
// 初始化任务执行器 // 初始化任务执行器
executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, planRepo, analysisPlanTaskManager, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers) executor := task.NewScheduler(pendingTaskRepo, executionLogRepo, deviceRepo, sensorDataRepo, planRepo, comm, analysisPlanTaskManager, logger, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers)
// 初始化 API 服务器 // 初始化 API 服务器
apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, sensorDataRepo, executionLogRepo, tokenService, listenHandler, analysisPlanTaskManager) apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, sensorDataRepo, executionLogRepo, tokenService, listenHandler, analysisPlanTaskManager)

View File

@@ -30,6 +30,9 @@ type Config struct {
// Heartbeat 心跳配置 // Heartbeat 心跳配置
Heartbeat HeartbeatConfig `yaml:"heartbeat"` Heartbeat HeartbeatConfig `yaml:"heartbeat"`
// ChirpStack ChirpStack API 配置
ChirpStack ChirpStackConfig `yaml:"chirp_stack"`
// TaskConfig 任务调度配置 // TaskConfig 任务调度配置
Task TaskConfig `yaml:"task"` Task TaskConfig `yaml:"task"`
} }
@@ -112,6 +115,14 @@ type HeartbeatConfig struct {
Concurrency int `yaml:"concurrency"` Concurrency int `yaml:"concurrency"`
} }
// ChirpStackConfig 代表 ChirpStack API 配置
type ChirpStackConfig struct {
APIHost string `yaml:"api_host"`
APIToken string `yaml:"api_token"`
FPort int `yaml:"fport"`
APITimeout int `yaml:"api_timeout"`
}
// TaskConfig 代表任务调度配置 // TaskConfig 代表任务调度配置
type TaskConfig struct { type TaskConfig struct {
Interval int `yaml:"interval"` Interval int `yaml:"interval"`
@@ -139,3 +150,8 @@ func (c *Config) Load(path string) error {
return nil return nil
} }
// GenerateAPIKey 用于补齐API Key作为请求头时缺失的部分
func (c ChirpStackConfig) GenerateAPIKey() string {
return "Bearer " + c.APIToken
}

View File

@@ -31,6 +31,7 @@ type TaskType string
const ( const (
TaskPlanAnalysis TaskType = "plan_analysis" // 解析Plan的Task列表并添加到待执行队列的特殊任务 TaskPlanAnalysis TaskType = "plan_analysis" // 解析Plan的Task列表并添加到待执行队列的特殊任务
TaskTypeWaiting TaskType = "waiting" // 等待任务 TaskTypeWaiting TaskType = "waiting" // 等待任务
TaskTypeReleaseFeedWeight TaskType = "release_feed_weight" // 下料口释放指定重量任务
) )
const ( const (

View File

@@ -3,6 +3,7 @@ package lora
import ( import (
"time" "time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
@@ -14,26 +15,11 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora/chirp_stack_proto/client" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora/chirp_stack_proto/client"
) )
// ChirpStackConfig 保存连接到 ChirpStack API 所需的配置。
type ChirpStackConfig struct {
// ServerAddress 是 ChirpStack API 服务器的地址,例如 "localhost:8080"。
ServerAddress string
// APIKey 是用于认证的 API 密钥。
APIKey string
// LoRaWAN 端口, 需要和设备一致
Fport int64
}
// GenerateAPIKey 用于补齐API Key作为请求头时缺失的部分
func (c ChirpStackConfig) GenerateAPIKey() string {
return "Bearer " + c.APIKey
}
// ChirpStackTransport 是一个客户端,用于封装与 ChirpStack REST API 的交互。 // ChirpStackTransport 是一个客户端,用于封装与 ChirpStack REST API 的交互。
type ChirpStackTransport struct { type ChirpStackTransport struct {
client *client.ChirpStackRESTAPI client *client.ChirpStackRESTAPI
authInfo runtime.ClientAuthInfoWriter authInfo runtime.ClientAuthInfoWriter
config ChirpStackConfig config config.ChirpStackConfig
deviceCommandLogRepo repository.DeviceCommandLogRepository deviceCommandLogRepo repository.DeviceCommandLogRepository
deviceRepo repository.DeviceRepository deviceRepo repository.DeviceRepository
@@ -43,14 +29,14 @@ type ChirpStackTransport struct {
// NewChirpStackTransport 创建一个新的通信实例,用于与 ChirpStack 通信。 // NewChirpStackTransport 创建一个新的通信实例,用于与 ChirpStack 通信。
func NewChirpStackTransport( func NewChirpStackTransport(
config ChirpStackConfig, config config.ChirpStackConfig,
logger *logs.Logger, logger *logs.Logger,
deviceCommandLogRepo repository.DeviceCommandLogRepository, deviceCommandLogRepo repository.DeviceCommandLogRepository,
deviceRepo repository.DeviceRepository, deviceRepo repository.DeviceRepository,
) *ChirpStackTransport { ) *ChirpStackTransport {
// 使用配置中的服务器地址创建一个 HTTP transport。 // 使用配置中的服务器地址创建一个 HTTP transport。
// 它会使用生成的客户端中定义的默认 base path 和 schemes。 // 它会使用生成的客户端中定义的默认 base path 和 schemes。
transport := httptransport.New(config.ServerAddress, client.DefaultBasePath, client.DefaultSchemes) transport := httptransport.New(config.APIHost, client.DefaultBasePath, client.DefaultSchemes)
// 使用 transport 和默认的字符串格式化器,创建一个 API 主客户端。 // 使用 transport 和默认的字符串格式化器,创建一个 API 主客户端。
apiClient := client.New(transport, strfmt.Default) apiClient := client.New(transport, strfmt.Default)
@@ -77,7 +63,7 @@ func (c *ChirpStackTransport) Send(address string, payload []byte) error {
QueueItem: &device_service.DeviceServiceEnqueueParamsBodyQueueItem{ QueueItem: &device_service.DeviceServiceEnqueueParamsBodyQueueItem{
Confirmed: true, Confirmed: true,
Data: payload, Data: payload,
FPort: c.config.Fport, FPort: int64(c.config.FPort),
}, },
} }