定义task接口和delay_task实现
This commit is contained in:
@@ -1,48 +1,63 @@
|
|||||||
package task
|
package task
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||||
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
ParamsDelayDuration = "delay_duration"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DelayTask 是一个用于模拟延迟的 Task 实现
|
// DelayTask 是一个用于模拟延迟的 Task 实现
|
||||||
type DelayTask struct {
|
type DelayTask struct {
|
||||||
id string
|
executionTask *models.TaskExecutionLog
|
||||||
duration time.Duration
|
duration time.Duration
|
||||||
priority int
|
logger *logs.Logger
|
||||||
done bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDelayTask 创建一个新的 DelayTask 实例
|
// NewDelayTask 创建一个新的 DelayTask 实例
|
||||||
func NewDelayTask(id string, duration time.Duration, priority int) *DelayTask {
|
func NewDelayTask(logger *logs.Logger, executionTask *models.TaskExecutionLog) *DelayTask {
|
||||||
return &DelayTask{
|
return &DelayTask{
|
||||||
id: id,
|
executionTask: executionTask,
|
||||||
duration: duration,
|
logger: logger,
|
||||||
priority: priority,
|
|
||||||
done: false,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Execute 执行延迟任务,等待指定的时间
|
// Execute 执行延迟任务,等待指定的时间
|
||||||
func (d *DelayTask) Execute() error {
|
func (d *DelayTask) Execute() error {
|
||||||
fmt.Printf("任务 %s (%s): 开始延迟 %s...\n", d.id, d.GetDescription(), d.duration)
|
d.logger.Infof("任务 %v: 开始延迟 %v...", d.executionTask.TaskID, d.duration)
|
||||||
time.Sleep(d.duration)
|
time.Sleep(d.duration)
|
||||||
fmt.Printf("任务 %s (%s): 延迟结束。\n", d.id, d.GetDescription())
|
d.logger.Infof("任务 %v: 延迟结束。\n", d.executionTask.TaskID)
|
||||||
d.done = true
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetID 获取任务ID
|
func (d *DelayTask) ParseParams() error {
|
||||||
func (d *DelayTask) GetID() string {
|
if d.executionTask.Task.Parameters == nil {
|
||||||
return d.id
|
d.logger.Errorf("任务 %v: 缺少参数", d.executionTask.TaskID)
|
||||||
|
return fmt.Errorf("任务 %v: 参数不全", d.executionTask.TaskID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsDone 检查任务是否已完成
|
var params map[string]interface{}
|
||||||
func (d *DelayTask) IsDone() bool {
|
if err := json.Unmarshal(d.executionTask.Task.Parameters, ¶ms); err != nil {
|
||||||
return d.done
|
d.logger.Errorf("任务 %v: 解析参数失败: %v", d.executionTask.TaskID, err)
|
||||||
|
return fmt.Errorf("任务 %v: 解析参数失败: %v", d.executionTask.TaskID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetDescription 获取任务说明,根据任务ID和延迟时间生成
|
duration, ok := params[ParamsDelayDuration].(float64)
|
||||||
func (d *DelayTask) GetDescription() string {
|
if !ok {
|
||||||
return fmt.Sprintf("延迟任务,ID: %s,延迟时间: %s", d.id, d.duration)
|
d.logger.Errorf("任务 %v: 参数 %v 不是数字或不存在", d.executionTask.TaskID, ParamsDelayDuration)
|
||||||
|
return fmt.Errorf("任务 %v: 参数 %v 不是数字或不存在", d.executionTask.TaskID, ParamsDelayDuration)
|
||||||
|
}
|
||||||
|
|
||||||
|
d.duration = time.Duration(duration)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DelayTask) OnFailure(executeErr error) {
|
||||||
|
d.logger.Errorf("任务 %v: 执行失败: %v", d.executionTask.TaskID, executeErr)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,5 +9,8 @@ func GetAllModels() []interface{} {
|
|||||||
&Plan{},
|
&Plan{},
|
||||||
&SubPlan{},
|
&SubPlan{},
|
||||||
&Task{},
|
&Task{},
|
||||||
|
&PlanExecutionLog{},
|
||||||
|
&TaskExecutionLog{},
|
||||||
|
&PendingTask{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1 +1,18 @@
|
|||||||
package task
|
package task
|
||||||
|
|
||||||
|
// Task 定义了所有可被调度器执行的任务必须实现的接口。
|
||||||
|
type Task interface {
|
||||||
|
// Execute 是任务的核心执行逻辑。
|
||||||
|
// ctx: 用于控制任务的超时或取消。
|
||||||
|
// log: 包含了当前任务执行的完整上下文信息,包括从数据库中加载的任务参数等。
|
||||||
|
// 返回的 error 表示任务是否执行成功。调度器会根据返回的 error 是否为 nil 来决定任务状态。
|
||||||
|
Execute() error
|
||||||
|
|
||||||
|
// ParseParams 解析参数
|
||||||
|
ParseParams() error
|
||||||
|
|
||||||
|
// OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑。
|
||||||
|
// log: 任务执行的上下文。
|
||||||
|
// executeErr: 从 Execute 方法返回的原始错误。
|
||||||
|
OnFailure(executeErr error)
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user