diff --git a/internal/app/service/task/delay_task.go b/internal/app/service/task/delay_task.go index a39779a..d6be350 100644 --- a/internal/app/service/task/delay_task.go +++ b/internal/app/service/task/delay_task.go @@ -1,48 +1,63 @@ package task import ( + "encoding/json" "fmt" "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" +) + +const ( + ParamsDelayDuration = "delay_duration" ) // DelayTask 是一个用于模拟延迟的 Task 实现 type DelayTask struct { - id string - duration time.Duration - priority int - done bool + executionTask *models.TaskExecutionLog + duration time.Duration + logger *logs.Logger } // NewDelayTask 创建一个新的 DelayTask 实例 -func NewDelayTask(id string, duration time.Duration, priority int) *DelayTask { +func NewDelayTask(logger *logs.Logger, executionTask *models.TaskExecutionLog) *DelayTask { return &DelayTask{ - id: id, - duration: duration, - priority: priority, - done: false, + executionTask: executionTask, + logger: logger, } } // Execute 执行延迟任务,等待指定的时间 func (d *DelayTask) Execute() error { - fmt.Printf("任务 %s (%s): 开始延迟 %s...\n", d.id, d.GetDescription(), d.duration) + d.logger.Infof("任务 %v: 开始延迟 %v...", d.executionTask.TaskID, d.duration) time.Sleep(d.duration) - fmt.Printf("任务 %s (%s): 延迟结束。\n", d.id, d.GetDescription()) - d.done = true + d.logger.Infof("任务 %v: 延迟结束。\n", d.executionTask.TaskID) return nil } -// GetID 获取任务ID -func (d *DelayTask) GetID() string { - return d.id +func (d *DelayTask) ParseParams() error { + if d.executionTask.Task.Parameters == nil { + d.logger.Errorf("任务 %v: 缺少参数", d.executionTask.TaskID) + return fmt.Errorf("任务 %v: 参数不全", d.executionTask.TaskID) + } + + var params map[string]interface{} + if err := json.Unmarshal(d.executionTask.Task.Parameters, ¶ms); err != nil { + d.logger.Errorf("任务 %v: 解析参数失败: %v", d.executionTask.TaskID, err) + return fmt.Errorf("任务 %v: 解析参数失败: %v", d.executionTask.TaskID, err) + } + + duration, ok := params[ParamsDelayDuration].(float64) + if !ok { + d.logger.Errorf("任务 %v: 参数 %v 不是数字或不存在", d.executionTask.TaskID, ParamsDelayDuration) + return fmt.Errorf("任务 %v: 参数 %v 不是数字或不存在", d.executionTask.TaskID, ParamsDelayDuration) + } + + d.duration = time.Duration(duration) + return nil } -// IsDone 检查任务是否已完成 -func (d *DelayTask) IsDone() bool { - return d.done -} - -// GetDescription 获取任务说明,根据任务ID和延迟时间生成 -func (d *DelayTask) GetDescription() string { - return fmt.Sprintf("延迟任务,ID: %s,延迟时间: %s", d.id, d.duration) +func (d *DelayTask) OnFailure(executeErr error) { + d.logger.Errorf("任务 %v: 执行失败: %v", d.executionTask.TaskID, executeErr) } diff --git a/internal/infra/models/models.go b/internal/infra/models/models.go index 37cc5c5..4d542da 100644 --- a/internal/infra/models/models.go +++ b/internal/infra/models/models.go @@ -9,5 +9,8 @@ func GetAllModels() []interface{} { &Plan{}, &SubPlan{}, &Task{}, + &PlanExecutionLog{}, + &TaskExecutionLog{}, + &PendingTask{}, } } diff --git a/internal/infra/task/task.go b/internal/infra/task/task.go index b028dfd..3455e20 100644 --- a/internal/infra/task/task.go +++ b/internal/infra/task/task.go @@ -1 +1,18 @@ package task + +// Task 定义了所有可被调度器执行的任务必须实现的接口。 +type Task interface { + // Execute 是任务的核心执行逻辑。 + // ctx: 用于控制任务的超时或取消。 + // log: 包含了当前任务执行的完整上下文信息,包括从数据库中加载的任务参数等。 + // 返回的 error 表示任务是否执行成功。调度器会根据返回的 error 是否为 nil 来决定任务状态。 + Execute() error + + // ParseParams 解析参数 + ParseParams() error + + // OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑。 + // log: 任务执行的上下文。 + // executeErr: 从 Execute 方法返回的原始错误。 + OnFailure(executeErr error) +}