定义执行历史和执行队列model, 以及基础的增删改查功能

This commit is contained in:
2025-09-16 23:11:07 +08:00
parent 8980c29b07
commit 3271f820d4
7 changed files with 264 additions and 6 deletions

View File

@@ -1,8 +1,12 @@
// TODO 列表 // TODO 列表
可以用TimescaleDB代替PGSQL, 优化传感器数据存储性能 // TODO 可以实现的问题
1. 可以用TimescaleDB代替PGSQL, 优化传感器数据存储性能
2. 系统启动时应该检查一遍执行历史库, 将所有显示为执行中的任务都修正为执行失败并报错
目前设备都只对应一个地址, 但实际上如电磁两位五通阀等设备是需要用两个开关控制的 // TODO 暂时实现不了的问题
Task调度器目前只能一个任务一个任务执行, 但实际上有些任务需要并发执行, 如开启下料口时需要不断从料筒称重传感器读取数据 1. 目前设备都只对应一个地址, 但实际上如电磁两位五通阀等设备是需要用两个开关控制的
ListenHandler 的实现遇到问题只能panic, 没有处理错误 2. Task调度器目前只能一个任务一个任务执行, 但实际上有些任务需要并发执行, 如开启下料口时需要不断从料筒称重传感器读取数据
暂时不考虑和区域主控间的同步消息, 假设所有消息都是异步的, 这可能导致无法知道指令是否执行成功 3. ListenHandler 的实现遇到问题只能panic, 没有处理错误
4. 暂时不考虑和区域主控间的同步消息, 假设所有消息都是异步的, 这可能导致无法知道指令是否执行成功
5. 如果系统停机时间很长, 待执行任务表中的任务过期了怎么办, 目前没有任务过期机制

View File

@@ -0,0 +1,9 @@
package task
// PlanAnalysisTask 用于在任务执行队列中触发一个plan的执行
// 该任务会解析plan生成扁平化的待执行任务表, 并将任务列表插入任务执行队列
// 该任务会预写入plan所有待执行任务的执行日志
// 每个plan执行完毕时 或 创建plan时 都应该重新创建一个 PlanAnalysisTask 以便触发下次plan执行
// 更新plan后应当更新对应 PlanAnalysisTask
type PlanAnalysisTask struct {
}

View File

@@ -0,0 +1,75 @@
package models
import (
"time"
"gorm.io/gorm"
)
// 定义系统任务的特殊ID
const (
SystemTaskIDResolvePlan int = -1 // 代表“解析计划”的系统任务
)
type ExecutionStatus string
const (
ExecutionStatusStarted ExecutionStatus = "started" // 开始执行
ExecutionStatusCompleted ExecutionStatus = "completed" // 执行完成
ExecutionStatusFailed ExecutionStatus = "failed" // 执行失败
ExecutionStatusCancelled ExecutionStatus = "cancelled" // 执行取消
ExecutionStatusWaiting ExecutionStatus = "waiting" // 等待执行 (用于预写日志)
)
// PlanExecutionLog 记录整个计划的一次执行历史
type PlanExecutionLog struct {
gorm.Model
PlanID uint `gorm:"index"`
Status ExecutionStatus
StartedAt time.Time
EndedAt time.Time
Error string
}
// TableName 自定义 GORM 使用的数据库表名
func (PlanExecutionLog) TableName() string {
return "plan_execution_logs"
}
// TaskExecutionLog 记录单个任务的一次执行历史
type TaskExecutionLog struct {
gorm.Model
PlanExecutionLogID uint `gorm:"index"` // 关联到某次计划执行
// TaskID 使用 int 类型以容纳特殊的负数ID代表系统任务
TaskID int `gorm:"index"`
// 关键改动:移除了 OnDelete 约束。
Task Task `gorm:"foreignKey:TaskID;constraint:OnUpdate:CASCADE;"`
Status ExecutionStatus
Output string // 任务执行的输出或错误信息
StartedAt time.Time
EndedAt time.Time
}
// TableName 自定义 GORM 使用的数据库表名
func (TaskExecutionLog) TableName() string {
return "task_execution_logs"
}
// AfterFind 是 GORM 的一个钩子,在查询数据后自动执行
// 我们用它来优雅地处理系统任务的“虚拟”Task定义
func (log *TaskExecutionLog) AfterFind(tx *gorm.DB) (err error) {
// 检查是否是我们的“解析计划”系统任务
if log.TaskID == SystemTaskIDResolvePlan {
// 如果是,手动创建一个写死的 Task 定义并绑定上去
// 这使得上层服务在处理日志时无需关心TaskID是否为负数
log.Task = Task{
// 注意:这里不能设置 ID否则 GORM 可能会混淆
Name: "系统:解析并启动计划",
Description: "这是一个由系统自动触发的内部任务,用于准备计划的执行。",
}
}
return
}

View File

@@ -3,6 +3,7 @@ package models
import ( import (
"fmt" "fmt"
"sort" "sort"
"time"
"gorm.io/datatypes" "gorm.io/datatypes"
"gorm.io/gorm" "gorm.io/gorm"
@@ -133,7 +134,11 @@ func (SubPlan) TableName() string {
// Task 代表计划中的一个任务,具有执行顺序 // Task 代表计划中的一个任务,具有执行顺序
type Task struct { type Task struct {
gorm.Model // 手动定义字段以将 ID 类型设置为 int以匹配 TaskExecutionLog 中的 TaskID
ID int `gorm:"primarykey"`
CreatedAt time.Time
UpdatedAt time.Time
DeletedAt gorm.DeletedAt `gorm:"index"` // 保持软删除功能
PlanID uint `gorm:"not null;index" json:"plan_id"` // 此任务所属计划的ID PlanID uint `gorm:"not null;index" json:"plan_id"` // 此任务所属计划的ID
Name string `gorm:"not null" json:"name"` Name string `gorm:"not null" json:"name"`

View File

@@ -0,0 +1,32 @@
package models
import (
"time"
)
// PendingTask 是一个待执行任务队列, 里面会储存待执行的Task以及这个Task什么时候执行
// 它是一个纯粹的工作队列,任务被认领后即被删除。
type PendingTask struct {
// 手动填充必须字段以实现硬删除,不内嵌 gorm.Model
ID uint `gorm:"primarykey"`
CreatedAt time.Time
UpdatedAt time.Time
// 业务字段
// TaskID 使用 int 类型以容纳特殊的负数ID代表系统任务
TaskID int `gorm:"index"`
ExecuteAt time.Time `gorm:"index"` // 任务执行时间
TaskExecutionLogID uint `gorm:"unique;not null"` // 对应的执行历史记录ID
// 关联关系定义
// 通过 TaskExecutionLogID 关联到唯一的 TaskExecutionLog 记录
// ON DELETE CASCADE 确保如果日志被删除,这个待办任务也会被自动清理
TaskExecutionLog TaskExecutionLog `gorm:"foreignKey:TaskExecutionLogID;references:ID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE;"`
}
// TableName 自定义 GORM 使用的数据库表名
func (PendingTask) TableName() string {
return "pending_tasks"
}

View File

@@ -0,0 +1,51 @@
package repository
import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"gorm.io/gorm"
)
// ExecutionLogRepository 定义了与执行日志交互的接口。
// 这为服务层提供了一个清晰的契约,并允许在测试中轻松地进行模拟。
type ExecutionLogRepository interface {
CreatePlanExecutionLog(log *models.PlanExecutionLog) error
UpdatePlanExecutionLog(log *models.PlanExecutionLog) error
CreateTaskExecutionLogsInBatch(logs []*models.TaskExecutionLog) error
UpdateTaskExecutionLog(log *models.TaskExecutionLog) error
}
// executionLogRepository 是使用 GORM 的具体实现。
type executionLogRepository struct {
db *gorm.DB
}
// NewExecutionLogRepository 创建一个新的执行日志仓库。
// 它接收一个 GORM DB 实例作为依赖。
func NewExecutionLogRepository(db *gorm.DB) ExecutionLogRepository {
return &executionLogRepository{db: db}
}
// CreatePlanExecutionLog 为一次计划执行创建一条新的日志条目。
func (r *executionLogRepository) CreatePlanExecutionLog(log *models.PlanExecutionLog) error {
return r.db.Create(log).Error
}
// UpdatePlanExecutionLog 使用 Save 方法全量更新一个计划执行日志。
// GORM 的 Save 会自动根据主键是否存在来决定是执行 UPDATE 还是 INSERT。
// 在这里,我们期望传入的对象一定包含一个有效的 ID。
func (r *executionLogRepository) UpdatePlanExecutionLog(log *models.PlanExecutionLog) error {
return r.db.Save(log).Error
}
// CreateTaskExecutionLogsInBatch 在一次数据库调用中创建多个任务执行日志条目。
// 这是“预写日志”步骤的关键。
func (r *executionLogRepository) CreateTaskExecutionLogsInBatch(logs []*models.TaskExecutionLog) error {
// GORM 的 Create 传入一个切片指针会执行批量插入。
return r.db.Create(&logs).Error
}
// UpdateTaskExecutionLog 使用 Save 方法全量更新一个任务执行日志。
// 这种方式代码更直观,上层服务可以直接修改模型对象后进行保存。
func (r *executionLogRepository) UpdateTaskExecutionLog(log *models.TaskExecutionLog) error {
return r.db.Save(log).Error
}

View File

@@ -0,0 +1,82 @@
package repository
import (
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// PendingTaskRepository 定义了与待执行任务队列交互的接口。
type PendingTaskRepository interface {
CreatePendingTasksInBatch(tasks []*models.PendingTask) error
ClaimNextDueTask() (*models.TaskExecutionLog, error)
}
// pendingTaskRepository 是使用 GORM 的具体实现。
type pendingTaskRepository struct {
db *gorm.DB
}
// NewPendingTaskRepository 创建一个新的待执行任务队列仓库。
// 它接收一个 GORM DB 实例作为依赖。
func NewPendingTaskRepository(db *gorm.DB) PendingTaskRepository {
return &pendingTaskRepository{db: db}
}
// CreatePendingTasksInBatch 在一次数据库调用中创建多个待执行任务条目。
func (r *pendingTaskRepository) CreatePendingTasksInBatch(tasks []*models.PendingTask) error {
return r.db.Create(&tasks).Error
}
// ClaimNextDueTask 以原子方式认领下一个到期任务。
// 它在一个事务中完成三件事:
// 1. 查找并锁定一个到期的待办任务。
// 2. 从待办队列中将其删除。
// 3. 将其在执行日志表中的状态更新为 'running'。
// 最后返回更新后的执行日志对象,作为执行的唯一凭证。
func (r *pendingTaskRepository) ClaimNextDueTask() (*models.TaskExecutionLog, error) {
var log models.TaskExecutionLog
err := r.db.Transaction(func(tx *gorm.DB) error {
var pendingTask models.PendingTask
// 1. 查找并用 FOR UPDATE 锁定一个到期的待办任务
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
Where("execute_at <= ?", time.Now()).
Order("execute_at ASC").
First(&pendingTask).Error; err != nil {
return err // 没找到是正常情况,事务将回滚
}
// 2. 在同一个事务中,立刻硬删除这个待办任务
if err := tx.Unscoped().Delete(&pendingTask).Error; err != nil {
return err
}
// 3. 在同一个事务中,更新其在日志表中的孪生兄弟的状态
updates := map[string]interface{}{
"status": models.ExecutionStatusStarted,
"started_at": time.Now(),
}
if err := tx.Model(&models.TaskExecutionLog{}).Where("id = ?", pendingTask.TaskExecutionLogID).Updates(updates).Error; err != nil {
return err
}
// 4. 获取更新后的完整日志对象,以返回给调用者
if err := tx.First(&log, pendingTask.TaskExecutionLogID).Error; err != nil {
return err
}
return nil
})
if err != nil {
// 如果错误是 `gorm.ErrRecordNotFound`,它仅表示当前没有到期的任务。
// 服务层应该优雅地处理这种情况,而不是将其视为需要立即处理的严重错误。
return nil, err
}
return &log, nil
}