issue_42 #46
@@ -11,6 +11,7 @@ import (
 | 
				
			|||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
 | 
				
			||||||
	domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
 | 
						domain_notify "git.huangwc.com/pig/pig-farm-controller/internal/domain/notify"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/domain/pig"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/pig"
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/token"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
 | 
				
			||||||
@@ -125,8 +126,9 @@ type DomainServices struct {
 | 
				
			|||||||
	PigBatchDomain          pig.PigBatchService
 | 
						PigBatchDomain          pig.PigBatchService
 | 
				
			||||||
	TimedCollector          collection.Collector
 | 
						TimedCollector          collection.Collector
 | 
				
			||||||
	GeneralDeviceService    device.Service
 | 
						GeneralDeviceService    device.Service
 | 
				
			||||||
	AnalysisPlanTaskManager *task.AnalysisPlanTaskManager
 | 
						taskFactory             scheduler.TaskFactory
 | 
				
			||||||
	Scheduler               *task.Scheduler
 | 
						AnalysisPlanTaskManager *scheduler.AnalysisPlanTaskManager
 | 
				
			||||||
 | 
						Scheduler               *scheduler.Scheduler
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// initDomainServices 初始化所有的领域服务。
 | 
					// initDomainServices 初始化所有的领域服务。
 | 
				
			||||||
@@ -148,16 +150,20 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs.
 | 
				
			|||||||
	)
 | 
						)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 计划任务管理器
 | 
						// 计划任务管理器
 | 
				
			||||||
	analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(infra.Repos.PlanRepo, infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, logger)
 | 
						analysisPlanTaskManager := scheduler.NewAnalysisPlanTaskManager(infra.Repos.PlanRepo, infra.Repos.PendingTaskRepo, infra.Repos.ExecutionLogRepo, logger)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 任务工厂
 | 
				
			||||||
 | 
						taskFactory := task.NewTaskFactory(logger, infra.Repos.SensorDataRepo, infra.Repos.DeviceRepo, generalDeviceService)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 任务执行器
 | 
						// 任务执行器
 | 
				
			||||||
	scheduler := task.NewScheduler(
 | 
						planScheduler := scheduler.NewScheduler(
 | 
				
			||||||
		infra.Repos.PendingTaskRepo,
 | 
							infra.Repos.PendingTaskRepo,
 | 
				
			||||||
		infra.Repos.ExecutionLogRepo,
 | 
							infra.Repos.ExecutionLogRepo,
 | 
				
			||||||
		infra.Repos.DeviceRepo,
 | 
							infra.Repos.DeviceRepo,
 | 
				
			||||||
		infra.Repos.SensorDataRepo,
 | 
							infra.Repos.SensorDataRepo,
 | 
				
			||||||
		infra.Repos.PlanRepo,
 | 
							infra.Repos.PlanRepo,
 | 
				
			||||||
		analysisPlanTaskManager,
 | 
							analysisPlanTaskManager,
 | 
				
			||||||
 | 
							taskFactory,
 | 
				
			||||||
		logger,
 | 
							logger,
 | 
				
			||||||
		generalDeviceService,
 | 
							generalDeviceService,
 | 
				
			||||||
		time.Duration(cfg.Task.Interval)*time.Second,
 | 
							time.Duration(cfg.Task.Interval)*time.Second,
 | 
				
			||||||
@@ -179,7 +185,8 @@ func initDomainServices(cfg *config.Config, infra *Infrastructure, logger *logs.
 | 
				
			|||||||
		PigBatchDomain:          pigBatchDomain,
 | 
							PigBatchDomain:          pigBatchDomain,
 | 
				
			||||||
		GeneralDeviceService:    generalDeviceService,
 | 
							GeneralDeviceService:    generalDeviceService,
 | 
				
			||||||
		AnalysisPlanTaskManager: analysisPlanTaskManager,
 | 
							AnalysisPlanTaskManager: analysisPlanTaskManager,
 | 
				
			||||||
		Scheduler:               scheduler,
 | 
							taskFactory:             taskFactory,
 | 
				
			||||||
 | 
							Scheduler:               planScheduler,
 | 
				
			||||||
		TimedCollector:          timedCollector,
 | 
							TimedCollector:          timedCollector,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,4 +1,4 @@
 | 
				
			|||||||
package task
 | 
					package scheduler
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
@@ -1,4 +1,4 @@
 | 
				
			|||||||
package task
 | 
					package scheduler
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"errors"
 | 
						"errors"
 | 
				
			||||||
@@ -83,6 +83,7 @@ type Scheduler struct {
 | 
				
			|||||||
	deviceRepo              repository.DeviceRepository
 | 
						deviceRepo              repository.DeviceRepository
 | 
				
			||||||
	sensorDataRepo          repository.SensorDataRepository
 | 
						sensorDataRepo          repository.SensorDataRepository
 | 
				
			||||||
	planRepo                repository.PlanRepository
 | 
						planRepo                repository.PlanRepository
 | 
				
			||||||
 | 
						taskFactory             TaskFactory
 | 
				
			||||||
	analysisPlanTaskManager *AnalysisPlanTaskManager
 | 
						analysisPlanTaskManager *AnalysisPlanTaskManager
 | 
				
			||||||
	progressTracker         *ProgressTracker
 | 
						progressTracker         *ProgressTracker
 | 
				
			||||||
	deviceService           device.Service
 | 
						deviceService           device.Service
 | 
				
			||||||
@@ -100,6 +101,7 @@ func NewScheduler(
 | 
				
			|||||||
	sensorDataRepo repository.SensorDataRepository,
 | 
						sensorDataRepo repository.SensorDataRepository,
 | 
				
			||||||
	planRepo repository.PlanRepository,
 | 
						planRepo repository.PlanRepository,
 | 
				
			||||||
	analysisPlanTaskManager *AnalysisPlanTaskManager,
 | 
						analysisPlanTaskManager *AnalysisPlanTaskManager,
 | 
				
			||||||
 | 
						taskFactory TaskFactory,
 | 
				
			||||||
	logger *logs.Logger,
 | 
						logger *logs.Logger,
 | 
				
			||||||
	deviceService device.Service,
 | 
						deviceService device.Service,
 | 
				
			||||||
	interval time.Duration,
 | 
						interval time.Duration,
 | 
				
			||||||
@@ -112,6 +114,7 @@ func NewScheduler(
 | 
				
			|||||||
		sensorDataRepo:          sensorDataRepo,
 | 
							sensorDataRepo:          sensorDataRepo,
 | 
				
			||||||
		planRepo:                planRepo,
 | 
							planRepo:                planRepo,
 | 
				
			||||||
		analysisPlanTaskManager: analysisPlanTaskManager,
 | 
							analysisPlanTaskManager: analysisPlanTaskManager,
 | 
				
			||||||
 | 
							taskFactory:             taskFactory,
 | 
				
			||||||
		logger:                  logger,
 | 
							logger:                  logger,
 | 
				
			||||||
		deviceService:           deviceService,
 | 
							deviceService:           deviceService,
 | 
				
			||||||
		pollingInterval:         interval,
 | 
							pollingInterval:         interval,
 | 
				
			||||||
@@ -271,7 +274,7 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		// 执行普通任务
 | 
							// 执行普通任务
 | 
				
			||||||
		task := s.taskFactory(claimedLog)
 | 
							task := s.taskFactory.Production(claimedLog)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if err := task.Execute(); err != nil {
 | 
							if err := task.Execute(); err != nil {
 | 
				
			||||||
			s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
 | 
								s.logger.Errorf("[严重] 任务执行失败, 日志ID: %d, 错误: %v", claimedLog.ID, err)
 | 
				
			||||||
@@ -283,20 +286,6 @@ func (s *Scheduler) runTask(claimedLog *models.TaskExecutionLog) error {
 | 
				
			|||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// taskFactory 会根据任务类型初始化对应任务
 | 
					 | 
				
			||||||
func (s *Scheduler) taskFactory(claimedLog *models.TaskExecutionLog) Task {
 | 
					 | 
				
			||||||
	switch claimedLog.Task.Type {
 | 
					 | 
				
			||||||
	case models.TaskTypeWaiting:
 | 
					 | 
				
			||||||
		return NewDelayTask(s.logger, claimedLog)
 | 
					 | 
				
			||||||
	case models.TaskTypeReleaseFeedWeight:
 | 
					 | 
				
			||||||
		return NewReleaseFeedWeightTask(claimedLog, s.sensorDataRepo, s.deviceRepo, s.deviceService, s.logger)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	default:
 | 
					 | 
				
			||||||
		// TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型
 | 
					 | 
				
			||||||
		panic("不支持的任务类型")
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中
 | 
					// analysisPlan 解析Plan并将解析出的Task列表插入待执行队列中
 | 
				
			||||||
func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
 | 
					func (s *Scheduler) analysisPlan(claimedLog *models.TaskExecutionLog) error {
 | 
				
			||||||
	// 创建Plan执行记录
 | 
						// 创建Plan执行记录
 | 
				
			||||||
							
								
								
									
										23
									
								
								internal/domain/scheduler/task.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										23
									
								
								internal/domain/scheduler/task.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,23 @@
 | 
				
			|||||||
 | 
					package scheduler
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Task 定义了所有可被调度器执行的任务必须实现的接口。
 | 
				
			||||||
 | 
					type Task interface {
 | 
				
			||||||
 | 
						// Execute 是任务的核心执行逻辑。
 | 
				
			||||||
 | 
						// ctx: 用于控制任务的超时或取消。
 | 
				
			||||||
 | 
						// log: 包含了当前任务执行的完整上下文信息,包括从数据库中加载的任务参数等。
 | 
				
			||||||
 | 
						// 返回的 error 表示任务是否执行成功。调度器会根据返回的 error 是否为 nil 来决定任务状态。
 | 
				
			||||||
 | 
						Execute() error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑。
 | 
				
			||||||
 | 
						// log: 任务执行的上下文。
 | 
				
			||||||
 | 
						// executeErr: 从 Execute 方法返回的原始错误。
 | 
				
			||||||
 | 
						OnFailure(executeErr error)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// TaskFactory 是一个工厂接口,用于根据任务执行日志创建任务实例。
 | 
				
			||||||
 | 
					type TaskFactory interface {
 | 
				
			||||||
 | 
						// Production 根据指定的任务执行日志创建一个任务实例。
 | 
				
			||||||
 | 
						Production(claimedLog *models.TaskExecutionLog) Task
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -4,6 +4,7 @@ import (
 | 
				
			|||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
@@ -19,7 +20,7 @@ type DelayTask struct {
 | 
				
			|||||||
	logger        *logs.Logger
 | 
						logger        *logs.Logger
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func NewDelayTask(logger *logs.Logger, executionTask *models.TaskExecutionLog) Task {
 | 
					func NewDelayTask(logger *logs.Logger, executionTask *models.TaskExecutionLog) scheduler.Task {
 | 
				
			||||||
	return &DelayTask{
 | 
						return &DelayTask{
 | 
				
			||||||
		executionTask: executionTask,
 | 
							executionTask: executionTask,
 | 
				
			||||||
		logger:        logger,
 | 
							logger:        logger,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -6,6 +6,7 @@ import (
 | 
				
			|||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
				
			||||||
@@ -40,12 +41,12 @@ func NewReleaseFeedWeightTask(
 | 
				
			|||||||
	deviceRepo repository.DeviceRepository,
 | 
						deviceRepo repository.DeviceRepository,
 | 
				
			||||||
	deviceService device.Service,
 | 
						deviceService device.Service,
 | 
				
			||||||
	logger *logs.Logger,
 | 
						logger *logs.Logger,
 | 
				
			||||||
) Task {
 | 
					) scheduler.Task {
 | 
				
			||||||
	return &ReleaseFeedWeightTask{
 | 
						return &ReleaseFeedWeightTask{
 | 
				
			||||||
		claimedLog:     claimedLog,
 | 
							claimedLog:     claimedLog,
 | 
				
			||||||
		deviceRepo:     deviceRepo,
 | 
							deviceRepo:     deviceRepo,
 | 
				
			||||||
		sensorDataRepo: sensorDataRepo,
 | 
							sensorDataRepo: sensorDataRepo,
 | 
				
			||||||
		feedPort:       deviceService, // 直接注入
 | 
							feedPort:       deviceService,
 | 
				
			||||||
		logger:         logger,
 | 
							logger:         logger,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,30 +1,43 @@
 | 
				
			|||||||
package task
 | 
					package task
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/domain/scheduler"
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | 
				
			||||||
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Task 定义了所有可被调度器执行的任务必须实现的接口。
 | 
					type taskFactory struct {
 | 
				
			||||||
type Task interface {
 | 
						logger         *logs.Logger
 | 
				
			||||||
	// Execute 是任务的核心执行逻辑。
 | 
						sensorDataRepo repository.SensorDataRepository
 | 
				
			||||||
	// ctx: 用于控制任务的超时或取消。
 | 
						deviceRepo     repository.DeviceRepository
 | 
				
			||||||
	// log: 包含了当前任务执行的完整上下文信息,包括从数据库中加载的任务参数等。
 | 
						deviceService  device.Service
 | 
				
			||||||
	// 返回的 error 表示任务是否执行成功。调度器会根据返回的 error 是否为 nil 来决定任务状态。
 | 
					 | 
				
			||||||
	Execute() error
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑。
 | 
					 | 
				
			||||||
	// log: 任务执行的上下文。
 | 
					 | 
				
			||||||
	// executeErr: 从 Execute 方法返回的原始错误。
 | 
					 | 
				
			||||||
	OnFailure(executeErr error)
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// TaskFactory 是一个任务组装工厂, 可以根据Task类型获取到对应的初始化函数
 | 
					func NewTaskFactory(
 | 
				
			||||||
var TaskFactory = func(tt models.TaskType) Task {
 | 
						logger *logs.Logger,
 | 
				
			||||||
	switch tt {
 | 
						sensorDataRepo repository.SensorDataRepository,
 | 
				
			||||||
 | 
						deviceRepo repository.DeviceRepository,
 | 
				
			||||||
 | 
						deviceService device.Service,
 | 
				
			||||||
 | 
					) scheduler.TaskFactory {
 | 
				
			||||||
 | 
						return &taskFactory{
 | 
				
			||||||
 | 
							logger:         logger,
 | 
				
			||||||
 | 
							sensorDataRepo: sensorDataRepo,
 | 
				
			||||||
 | 
							deviceRepo:     deviceRepo,
 | 
				
			||||||
 | 
							deviceService:  deviceService,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (t *taskFactory) Production(claimedLog *models.TaskExecutionLog) scheduler.Task {
 | 
				
			||||||
 | 
						switch claimedLog.Task.Type {
 | 
				
			||||||
	case models.TaskTypeWaiting:
 | 
						case models.TaskTypeWaiting:
 | 
				
			||||||
		return &DelayTask{}
 | 
							return NewDelayTask(t.logger, claimedLog)
 | 
				
			||||||
 | 
						case models.TaskTypeReleaseFeedWeight:
 | 
				
			||||||
 | 
							return NewReleaseFeedWeightTask(claimedLog, t.sensorDataRepo, t.deviceRepo, t.deviceService, t.logger)
 | 
				
			||||||
	default:
 | 
						default:
 | 
				
			||||||
		// 出现位置任务类型说明业务逻辑出现重大问题, 一个异常任务被创建了出来
 | 
							// TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型
 | 
				
			||||||
		panic("发现未知任务类型")
 | 
							t.logger.Panicf("不支持的任务类型: %s", claimedLog.Task.Type)
 | 
				
			||||||
 | 
							panic("不支持的任务类型") // 显式panic防编译器报错
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user