删掉原来的定时采集线程
This commit is contained in:
@@ -89,7 +89,6 @@ func (app *Application) Start() error {
|
|||||||
|
|
||||||
// 3. 启动后台工作协程
|
// 3. 启动后台工作协程
|
||||||
app.Domain.Scheduler.Start()
|
app.Domain.Scheduler.Start()
|
||||||
app.Domain.TimedCollector.Start()
|
|
||||||
|
|
||||||
// 4. 启动 API 服务器
|
// 4. 启动 API 服务器
|
||||||
app.API.Start()
|
app.API.Start()
|
||||||
@@ -113,9 +112,6 @@ func (app *Application) Stop() error {
|
|||||||
// 关闭任务执行器
|
// 关闭任务执行器
|
||||||
app.Domain.Scheduler.Stop()
|
app.Domain.Scheduler.Stop()
|
||||||
|
|
||||||
// 关闭定时采集器
|
|
||||||
app.Domain.TimedCollector.Stop()
|
|
||||||
|
|
||||||
// 断开数据库连接
|
// 断开数据库连接
|
||||||
if err := app.Infra.Storage.Disconnect(); err != nil {
|
if err := app.Infra.Storage.Disconnect(); err != nil {
|
||||||
app.Logger.Errorw("数据库连接断开失败", "error", err)
|
app.Logger.Errorw("数据库连接断开失败", "error", err)
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ import (
|
|||||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/service"
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/service"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/webhook"
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/webhook"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit"
|
"git.huangwc.com/pig/pig-farm-controller/internal/domain/audit"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/domain/collection"
|
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
|
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
|
||||||
domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
|
domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/domain/pig"
|
"git.huangwc.com/pig/pig-farm-controller/internal/domain/pig"
|
||||||
@@ -124,7 +123,6 @@ type DomainServices struct {
|
|||||||
PigTradeManager pig.PigTradeManager
|
PigTradeManager pig.PigTradeManager
|
||||||
PigSickManager pig.SickPigManager
|
PigSickManager pig.SickPigManager
|
||||||
PigBatchDomain pig.PigBatchService
|
PigBatchDomain pig.PigBatchService
|
||||||
TimedCollector collection.Collector
|
|
||||||
GeneralDeviceService device.Service
|
GeneralDeviceService device.Service
|
||||||
taskFactory scheduler.TaskFactory
|
taskFactory scheduler.TaskFactory
|
||||||
AnalysisPlanTaskManager *scheduler.AnalysisPlanTaskManager
|
AnalysisPlanTaskManager *scheduler.AnalysisPlanTaskManager
|
||||||
@@ -170,14 +168,6 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs.
|
|||||||
cfg.Task.NumWorkers,
|
cfg.Task.NumWorkers,
|
||||||
)
|
)
|
||||||
|
|
||||||
// 定时采集器
|
|
||||||
timedCollector := collection.NewTimedCollector(
|
|
||||||
infra.Repos.DeviceRepo,
|
|
||||||
generalDeviceService,
|
|
||||||
logger,
|
|
||||||
time.Duration(cfg.Collection.Interval)*time.Second,
|
|
||||||
)
|
|
||||||
|
|
||||||
return &DomainServices{
|
return &DomainServices{
|
||||||
PigPenTransferManager: pigPenTransferManager,
|
PigPenTransferManager: pigPenTransferManager,
|
||||||
PigTradeManager: pigTradeManager,
|
PigTradeManager: pigTradeManager,
|
||||||
@@ -187,7 +177,6 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs.
|
|||||||
AnalysisPlanTaskManager: analysisPlanTaskManager,
|
AnalysisPlanTaskManager: analysisPlanTaskManager,
|
||||||
taskFactory: taskFactory,
|
taskFactory: taskFactory,
|
||||||
Scheduler: planScheduler,
|
Scheduler: planScheduler,
|
||||||
TimedCollector: timedCollector,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +0,0 @@
|
|||||||
package collection
|
|
||||||
|
|
||||||
type Collector interface {
|
|
||||||
Start()
|
|
||||||
Stop()
|
|
||||||
}
|
|
||||||
@@ -1,89 +0,0 @@
|
|||||||
package collection
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
|
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
|
||||||
)
|
|
||||||
|
|
||||||
// TimedCollector 实现了 Collector 接口,用于定时从数据库获取设备信息并下发采集指令
|
|
||||||
type TimedCollector struct {
|
|
||||||
deviceRepo repository.DeviceRepository
|
|
||||||
deviceService device.Service
|
|
||||||
logger *logs.Logger
|
|
||||||
interval time.Duration
|
|
||||||
ticker *time.Ticker
|
|
||||||
done chan bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewTimedCollector 创建一个定时采集器实例
|
|
||||||
func NewTimedCollector(
|
|
||||||
deviceRepo repository.DeviceRepository,
|
|
||||||
deviceService device.Service,
|
|
||||||
logger *logs.Logger,
|
|
||||||
interval time.Duration,
|
|
||||||
) Collector {
|
|
||||||
return &TimedCollector{
|
|
||||||
deviceRepo: deviceRepo,
|
|
||||||
deviceService: deviceService,
|
|
||||||
logger: logger,
|
|
||||||
interval: interval,
|
|
||||||
done: make(chan bool),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start 开始定时采集
|
|
||||||
func (c *TimedCollector) Start() {
|
|
||||||
c.logger.Infof("定时采集器启动,采集间隔: %s", c.interval)
|
|
||||||
c.ticker = time.NewTicker(c.interval)
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-c.done:
|
|
||||||
return
|
|
||||||
case <-c.ticker.C:
|
|
||||||
c.collect()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop 停止定时采集
|
|
||||||
func (c *TimedCollector) Stop() {
|
|
||||||
c.logger.Info("定时采集器停止")
|
|
||||||
c.ticker.Stop()
|
|
||||||
c.done <- true
|
|
||||||
}
|
|
||||||
|
|
||||||
// collect 是核心的采集逻辑
|
|
||||||
func (c *TimedCollector) collect() {
|
|
||||||
c.logger.Info("开始新一轮的设备数据采集")
|
|
||||||
|
|
||||||
sensors, err := c.deviceRepo.ListAllSensors()
|
|
||||||
if err != nil {
|
|
||||||
c.logger.Errorf("采集周期: 从数据库获取所有传感器失败: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(sensors) == 0 {
|
|
||||||
c.logger.Info("采集周期: 未发现任何传感器设备,跳过本次采集")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
sensorsByController := make(map[uint][]*models.Device)
|
|
||||||
for _, sensor := range sensors {
|
|
||||||
sensorsByController[sensor.AreaControllerID] = append(sensorsByController[sensor.AreaControllerID], sensor)
|
|
||||||
}
|
|
||||||
|
|
||||||
for controllerID, controllerSensors := range sensorsByController {
|
|
||||||
c.logger.Infof("采集周期: 准备为区域主控 %d 下的 %d 个传感器下发采集指令", controllerID, len(controllerSensors))
|
|
||||||
if err := c.deviceService.Collect(controllerID, controllerSensors); err != nil {
|
|
||||||
c.logger.Errorf("采集周期: 为区域主控 %d 下发采集指令失败: %v", controllerID, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
c.logger.Info("本轮设备数据采集完成")
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user