Compare commits
	
		
			2 Commits
		
	
	
		
			cf00a74008
			...
			f9bc4d6326
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| f9bc4d6326 | |||
| 2aa2e9bbdd | 
							
								
								
									
										41
									
								
								internal/logs/logs.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										41
									
								
								internal/logs/logs.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,41 @@
 | 
				
			|||||||
 | 
					// Package logs 提供统一的日志记录功能
 | 
				
			||||||
 | 
					// 支持不同级别的日志记录和格式化输出
 | 
				
			||||||
 | 
					package logs
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"log"
 | 
				
			||||||
 | 
						"os"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Logger 代表日志记录器结构
 | 
				
			||||||
 | 
					type Logger struct {
 | 
				
			||||||
 | 
						// logger 内部日志记录器
 | 
				
			||||||
 | 
						logger *log.Logger
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewLogger 创建并返回一个新的日志记录器实例
 | 
				
			||||||
 | 
					func NewLogger() *Logger {
 | 
				
			||||||
 | 
						return &Logger{
 | 
				
			||||||
 | 
							logger: log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Info 记录信息级别日志
 | 
				
			||||||
 | 
					func (l *Logger) Info(message string) {
 | 
				
			||||||
 | 
						l.logger.Printf("[INFO] %s", message)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Error 记录错误级别日志
 | 
				
			||||||
 | 
					func (l *Logger) Error(message string) {
 | 
				
			||||||
 | 
						l.logger.Printf("[ERROR] %s", message)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Debug 记录调试级别日志
 | 
				
			||||||
 | 
					func (l *Logger) Debug(message string) {
 | 
				
			||||||
 | 
						l.logger.Printf("[DEBUG] %s", message)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Warn 记录警告级别日志
 | 
				
			||||||
 | 
					func (l *Logger) Warn(message string) {
 | 
				
			||||||
 | 
						l.logger.Printf("[WARN] %s", message)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -5,9 +5,9 @@ package db
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"log"
 | 
					 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/logs"
 | 
				
			||||||
	"gorm.io/driver/postgres"
 | 
						"gorm.io/driver/postgres"
 | 
				
			||||||
	"gorm.io/gorm"
 | 
						"gorm.io/gorm"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
@@ -29,6 +29,9 @@ type PostgresStorage struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// connMaxLifetime 连接最大生命周期(秒)
 | 
						// connMaxLifetime 连接最大生命周期(秒)
 | 
				
			||||||
	connMaxLifetime int
 | 
						connMaxLifetime int
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// logger 日志记录器
 | 
				
			||||||
 | 
						logger *logs.Logger
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewPostgresStorage 创建并返回一个新的PostgreSQL存储实例
 | 
					// NewPostgresStorage 创建并返回一个新的PostgreSQL存储实例
 | 
				
			||||||
@@ -39,25 +42,31 @@ func NewPostgresStorage(connectionString string, maxOpenConns, maxIdleConns, con
 | 
				
			|||||||
		maxOpenConns:     maxOpenConns,
 | 
							maxOpenConns:     maxOpenConns,
 | 
				
			||||||
		maxIdleConns:     maxIdleConns,
 | 
							maxIdleConns:     maxIdleConns,
 | 
				
			||||||
		connMaxLifetime:  connMaxLifetime,
 | 
							connMaxLifetime:  connMaxLifetime,
 | 
				
			||||||
 | 
							logger:           logs.NewLogger(),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Connect 建立与PostgreSQL数据库的连接
 | 
					// Connect 建立与PostgreSQL数据库的连接
 | 
				
			||||||
// 使用GORM建立数据库连接
 | 
					// 使用GORM建立数据库连接
 | 
				
			||||||
func (ps *PostgresStorage) Connect() error {
 | 
					func (ps *PostgresStorage) Connect() error {
 | 
				
			||||||
 | 
						ps.logger.Info("Connecting to PostgreSQL database")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	var err error
 | 
						var err error
 | 
				
			||||||
	ps.db, err = gorm.Open(postgres.Open(ps.connectionString), &gorm.Config{})
 | 
						ps.db, err = gorm.Open(postgres.Open(ps.connectionString), &gorm.Config{})
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
 | 
							ps.logger.Error(fmt.Sprintf("Failed to connect to database: %v", err))
 | 
				
			||||||
		return fmt.Errorf("failed to connect to database: %v", err)
 | 
							return fmt.Errorf("failed to connect to database: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 测试连接
 | 
						// 测试连接
 | 
				
			||||||
	sqlDB, err := ps.db.DB()
 | 
						sqlDB, err := ps.db.DB()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
 | 
							ps.logger.Error(fmt.Sprintf("Failed to get database instance: %v", err))
 | 
				
			||||||
		return fmt.Errorf("failed to get database instance: %v", err)
 | 
							return fmt.Errorf("failed to get database instance: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if err = sqlDB.Ping(); err != nil {
 | 
						if err = sqlDB.Ping(); err != nil {
 | 
				
			||||||
 | 
							ps.logger.Error(fmt.Sprintf("Failed to ping database: %v", err))
 | 
				
			||||||
		return fmt.Errorf("failed to ping database: %v", err)
 | 
							return fmt.Errorf("failed to ping database: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -66,7 +75,7 @@ func (ps *PostgresStorage) Connect() error {
 | 
				
			|||||||
	sqlDB.SetMaxIdleConns(ps.maxIdleConns)
 | 
						sqlDB.SetMaxIdleConns(ps.maxIdleConns)
 | 
				
			||||||
	sqlDB.SetConnMaxLifetime(time.Duration(ps.connMaxLifetime) * time.Second)
 | 
						sqlDB.SetConnMaxLifetime(time.Duration(ps.connMaxLifetime) * time.Second)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	log.Println("Successfully connected to PostgreSQL database")
 | 
						ps.logger.Info("Successfully connected to PostgreSQL database")
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -74,15 +83,19 @@ func (ps *PostgresStorage) Connect() error {
 | 
				
			|||||||
// 安全地关闭所有数据库连接
 | 
					// 安全地关闭所有数据库连接
 | 
				
			||||||
func (ps *PostgresStorage) Disconnect() error {
 | 
					func (ps *PostgresStorage) Disconnect() error {
 | 
				
			||||||
	if ps.db != nil {
 | 
						if ps.db != nil {
 | 
				
			||||||
 | 
							ps.logger.Info("Disconnecting from PostgreSQL database")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		sqlDB, err := ps.db.DB()
 | 
							sqlDB, err := ps.db.DB()
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
 | 
								ps.logger.Error(fmt.Sprintf("Failed to get database instance: %v", err))
 | 
				
			||||||
			return fmt.Errorf("failed to get database instance: %v", err)
 | 
								return fmt.Errorf("failed to get database instance: %v", err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if err := sqlDB.Close(); err != nil {
 | 
							if err := sqlDB.Close(); err != nil {
 | 
				
			||||||
 | 
								ps.logger.Error(fmt.Sprintf("Failed to close database connection: %v", err))
 | 
				
			||||||
			return fmt.Errorf("failed to close database connection: %v", err)
 | 
								return fmt.Errorf("failed to close database connection: %v", err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		log.Println("Successfully disconnected from PostgreSQL database")
 | 
							ps.logger.Info("Successfully disconnected from PostgreSQL database")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										129
									
								
								internal/task/README.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										129
									
								
								internal/task/README.md
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,129 @@
 | 
				
			|||||||
 | 
					# Task包使用说明
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					## 概述
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Task包提供了一个基本的任务队列和执行框架,用于管理、调度和执行各种控制任务。它支持并发执行、任务优先级和优雅的任务管理。
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					## 核心组件
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					### 1. Task接口
 | 
				
			||||||
 | 
					所有任务都需要实现Task接口,包含以下方法:
 | 
				
			||||||
 | 
					- `Execute() error` - 执行任务
 | 
				
			||||||
 | 
					- `GetID() string` - 获取任务ID
 | 
				
			||||||
 | 
					- `GetPriority() int` - 获取任务优先级
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					### 2. Executor(执行器)
 | 
				
			||||||
 | 
					负责管理任务队列和执行任务,支持并发执行。
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					### 3. TaskQueue(任务队列)
 | 
				
			||||||
 | 
					用于存储和管理待执行的任务。
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					## 使用方法
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					### 1. 实现任务
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					首先,需要实现Task接口来创建自定义任务:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					```go
 | 
				
			||||||
 | 
					type MyTask struct {
 | 
				
			||||||
 | 
					    id       string
 | 
				
			||||||
 | 
					    priority int
 | 
				
			||||||
 | 
					    // 其他任务特定字段
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (t *MyTask) Execute() error {
 | 
				
			||||||
 | 
					    // 实现任务逻辑
 | 
				
			||||||
 | 
					    return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (t *MyTask) GetID() string {
 | 
				
			||||||
 | 
					    return t.id
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (t *MyTask) GetPriority() int {
 | 
				
			||||||
 | 
					    return t.priority
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					```
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					### 2. 创建和启动执行器
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					```go
 | 
				
			||||||
 | 
					// 创建执行器,指定工作协程数量
 | 
				
			||||||
 | 
					executor := task.NewExecutor(5) // 5个工作协程
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// 启动执行器
 | 
				
			||||||
 | 
					executor.Start()
 | 
				
			||||||
 | 
					```
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					### 3. 提交任务
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					```go
 | 
				
			||||||
 | 
					// 创建任务实例
 | 
				
			||||||
 | 
					myTask := NewMyTask("task-1", 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// 提交任务到执行器
 | 
				
			||||||
 | 
					executor.SubmitTask(myTask)
 | 
				
			||||||
 | 
					```
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					### 4. 停止执行器
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					```go
 | 
				
			||||||
 | 
					// 停止执行器(会等待所有正在执行的任务完成)
 | 
				
			||||||
 | 
					executor.Stop()
 | 
				
			||||||
 | 
					```
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					## 处理定时循环任务
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					对于定时循环任务,建议采用以下方式:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					1. 使用`time.Ticker`定期创建任务
 | 
				
			||||||
 | 
					2. 将任务提交到执行器
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					```go
 | 
				
			||||||
 | 
					func RunScheduledTasks(executor *task.Executor) {
 | 
				
			||||||
 | 
					    // 启动一个协程来定期提交定时任务
 | 
				
			||||||
 | 
					    go func() {
 | 
				
			||||||
 | 
					        ticker := time.NewTicker(30 * time.Second) // 每30秒执行一次
 | 
				
			||||||
 | 
					        defer ticker.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        for {
 | 
				
			||||||
 | 
					            select {
 | 
				
			||||||
 | 
					            case <-ticker.C:
 | 
				
			||||||
 | 
					                // 创建定时任务并提交
 | 
				
			||||||
 | 
					                scheduledTask := NewScheduledTask("scheduled-task", 1)
 | 
				
			||||||
 | 
					                executor.SubmitTask(scheduledTask)
 | 
				
			||||||
 | 
					            case <-executor.ctx.Done():
 | 
				
			||||||
 | 
					                return // 执行器已停止
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					```
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					## 处理互不相关的独立任务
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					对于互不相关的独立任务,可以直接创建并提交:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					```go
 | 
				
			||||||
 | 
					// 创建多个独立任务
 | 
				
			||||||
 | 
					task1 := NewIndependentTask("task-1", "data1", 1)
 | 
				
			||||||
 | 
					task2 := NewIndependentTask("task-2", "data2", 2)
 | 
				
			||||||
 | 
					task3 := NewIndependentTask("task-3", "data3", 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// 提交所有任务
 | 
				
			||||||
 | 
					executor.SubmitTask(task1)
 | 
				
			||||||
 | 
					executor.SubmitTask(task2)
 | 
				
			||||||
 | 
					executor.SubmitTask(task3)
 | 
				
			||||||
 | 
					```
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					## 最佳实践
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					1. **合理设置工作协程数量**:根据系统资源和任务特性设置适当的工作协程数量
 | 
				
			||||||
 | 
					2. **正确处理任务错误**:在任务的Execute方法中正确处理和返回错误
 | 
				
			||||||
 | 
					3. **合理设置任务优先级**:重要的任务可以设置更高的优先级
 | 
				
			||||||
 | 
					4. **优雅关闭**:使用Stop方法确保所有任务都能正确完成
 | 
				
			||||||
 | 
					5. **避免任务阻塞**:任务执行时间过长会阻塞工作协程
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					## 示例
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					请参考 [example_task.go](./example_task.go) 和 [usage_example.go](./usage_example.go) 文件获取完整的使用示例。
 | 
				
			||||||
@@ -1,16 +0,0 @@
 | 
				
			|||||||
// Package task 提供任务执行器功能
 | 
					 | 
				
			||||||
// 负责执行各种控制任务,如环境调节、饲料投放等
 | 
					 | 
				
			||||||
// 管理任务队列和并发执行
 | 
					 | 
				
			||||||
package task
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Executor 代表任务执行器
 | 
					 | 
				
			||||||
// 负责调度和执行所有控制任务
 | 
					 | 
				
			||||||
type Executor struct {
 | 
					 | 
				
			||||||
	// TODO: 定义任务执行器结构
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// NewExecutor 创建并返回一个新的任务执行器实例
 | 
					 | 
				
			||||||
func NewExecutor() *Executor {
 | 
					 | 
				
			||||||
	// TODO: 实现任务执行器初始化
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
							
								
								
									
										222
									
								
								internal/task/task.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										222
									
								
								internal/task/task.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,222 @@
 | 
				
			|||||||
 | 
					// Package task 提供任务队列和执行框架
 | 
				
			||||||
 | 
					// 负责管理任务队列、调度和执行各种控制任务
 | 
				
			||||||
 | 
					package task
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"container/heap"
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"git.huangwc.com/pig/pig-farm-controller/internal/logs"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Task 代表一个任务接口
 | 
				
			||||||
 | 
					// 所有任务都需要实现此接口
 | 
				
			||||||
 | 
					type Task interface {
 | 
				
			||||||
 | 
						// Execute 执行任务
 | 
				
			||||||
 | 
						Execute() error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// GetID 获取任务ID
 | 
				
			||||||
 | 
						GetID() string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// GetPriority 获取任务优先级
 | 
				
			||||||
 | 
						GetPriority() int
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// taskItem 任务队列中的元素
 | 
				
			||||||
 | 
					type taskItem struct {
 | 
				
			||||||
 | 
						task     Task
 | 
				
			||||||
 | 
						priority int
 | 
				
			||||||
 | 
						index    int
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// TaskQueue 代表任务队列
 | 
				
			||||||
 | 
					type TaskQueue struct {
 | 
				
			||||||
 | 
						// queue 任务队列(按优先级排序)
 | 
				
			||||||
 | 
						queue *priorityQueue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// mutex 互斥锁
 | 
				
			||||||
 | 
						mutex sync.Mutex
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// logger 日志记录器
 | 
				
			||||||
 | 
						logger *logs.Logger
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewTaskQueue 创建并返回一个新的任务队列实例
 | 
				
			||||||
 | 
					func NewTaskQueue() *TaskQueue {
 | 
				
			||||||
 | 
						pq := make(priorityQueue, 0)
 | 
				
			||||||
 | 
						heap.Init(&pq)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return &TaskQueue{
 | 
				
			||||||
 | 
							queue:  &pq,
 | 
				
			||||||
 | 
							logger: logs.NewLogger(),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// AddTask 向队列中添加任务
 | 
				
			||||||
 | 
					func (tq *TaskQueue) AddTask(task Task) {
 | 
				
			||||||
 | 
						tq.mutex.Lock()
 | 
				
			||||||
 | 
						defer tq.mutex.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						item := &taskItem{
 | 
				
			||||||
 | 
							task:     task,
 | 
				
			||||||
 | 
							priority: task.GetPriority(),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						heap.Push(tq.queue, item)
 | 
				
			||||||
 | 
						tq.logger.Info("Task added to queue: " + task.GetID())
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// GetNextTask 获取下一个要执行的任务(优先级最高的任务)
 | 
				
			||||||
 | 
					func (tq *TaskQueue) GetNextTask() Task {
 | 
				
			||||||
 | 
						tq.mutex.Lock()
 | 
				
			||||||
 | 
						defer tq.mutex.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if tq.queue.Len() == 0 {
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 获取优先级最高的任务
 | 
				
			||||||
 | 
						item := heap.Pop(tq.queue).(*taskItem)
 | 
				
			||||||
 | 
						tq.logger.Info("Task retrieved from queue: " + item.task.GetID())
 | 
				
			||||||
 | 
						return item.task
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// GetTaskCount 获取队列中的任务数量
 | 
				
			||||||
 | 
					func (tq *TaskQueue) GetTaskCount() int {
 | 
				
			||||||
 | 
						tq.mutex.Lock()
 | 
				
			||||||
 | 
						defer tq.mutex.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return tq.queue.Len()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// priorityQueue 实现优先级队列
 | 
				
			||||||
 | 
					type priorityQueue []*taskItem
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (pq priorityQueue) Len() int { return len(pq) }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Less 优先级小的优先级更高
 | 
				
			||||||
 | 
					func (pq priorityQueue) Less(i, j int) bool {
 | 
				
			||||||
 | 
						return pq[i].priority < pq[j].priority
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (pq priorityQueue) Swap(i, j int) {
 | 
				
			||||||
 | 
						pq[i], pq[j] = pq[j], pq[i]
 | 
				
			||||||
 | 
						pq[i].index = i
 | 
				
			||||||
 | 
						pq[j].index = j
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (pq *priorityQueue) Push(x interface{}) {
 | 
				
			||||||
 | 
						n := len(*pq)
 | 
				
			||||||
 | 
						item := x.(*taskItem)
 | 
				
			||||||
 | 
						item.index = n
 | 
				
			||||||
 | 
						*pq = append(*pq, item)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (pq *priorityQueue) Pop() interface{} {
 | 
				
			||||||
 | 
						old := *pq
 | 
				
			||||||
 | 
						n := len(old)
 | 
				
			||||||
 | 
						item := old[n-1]
 | 
				
			||||||
 | 
						old[n-1] = nil  // 避免内存泄漏
 | 
				
			||||||
 | 
						item.index = -1 // 无效索引
 | 
				
			||||||
 | 
						*pq = old[0 : n-1]
 | 
				
			||||||
 | 
						return item
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Executor 代表任务执行器
 | 
				
			||||||
 | 
					type Executor struct {
 | 
				
			||||||
 | 
						// taskQueue 任务队列
 | 
				
			||||||
 | 
						taskQueue *TaskQueue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// workers 工作协程数量
 | 
				
			||||||
 | 
						workers int
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// ctx 执行上下文
 | 
				
			||||||
 | 
						ctx context.Context
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// cancel 取消函数
 | 
				
			||||||
 | 
						cancel context.CancelFunc
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// wg 等待组
 | 
				
			||||||
 | 
						wg sync.WaitGroup
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// logger 日志记录器
 | 
				
			||||||
 | 
						logger *logs.Logger
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewExecutor 创建并返回一个新的任务执行器实例
 | 
				
			||||||
 | 
					func NewExecutor(workers int) *Executor {
 | 
				
			||||||
 | 
						ctx, cancel := context.WithCancel(context.Background())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return &Executor{
 | 
				
			||||||
 | 
							taskQueue: NewTaskQueue(),
 | 
				
			||||||
 | 
							workers:   workers,
 | 
				
			||||||
 | 
							ctx:       ctx,
 | 
				
			||||||
 | 
							cancel:    cancel,
 | 
				
			||||||
 | 
							logger:    logs.NewLogger(),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Start 启动任务执行器
 | 
				
			||||||
 | 
					func (e *Executor) Start() {
 | 
				
			||||||
 | 
						e.logger.Info(fmt.Sprintf("Starting task executor with %d workers", e.workers))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 启动工作协程
 | 
				
			||||||
 | 
						for i := 0; i < e.workers; i++ {
 | 
				
			||||||
 | 
							e.wg.Add(1)
 | 
				
			||||||
 | 
							go e.worker(i)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						e.logger.Info("Task executor started successfully")
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Stop 停止任务执行器
 | 
				
			||||||
 | 
					func (e *Executor) Stop() {
 | 
				
			||||||
 | 
						e.logger.Info("Stopping task executor")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 取消上下文
 | 
				
			||||||
 | 
						e.cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 等待所有工作协程结束
 | 
				
			||||||
 | 
						e.wg.Wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						e.logger.Info("Task executor stopped successfully")
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// SubmitTask 提交任务到执行器
 | 
				
			||||||
 | 
					func (e *Executor) SubmitTask(task Task) {
 | 
				
			||||||
 | 
						e.taskQueue.AddTask(task)
 | 
				
			||||||
 | 
						e.logger.Info("Task submitted: " + task.GetID())
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// worker 工作协程
 | 
				
			||||||
 | 
					func (e *Executor) worker(id int) {
 | 
				
			||||||
 | 
						defer e.wg.Done()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						e.logger.Info(fmt.Sprintf("Worker %d started", id))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for {
 | 
				
			||||||
 | 
							select {
 | 
				
			||||||
 | 
							case <-e.ctx.Done():
 | 
				
			||||||
 | 
								e.logger.Info(fmt.Sprintf("Worker %d stopped", id))
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							default:
 | 
				
			||||||
 | 
								// 获取下一个任务
 | 
				
			||||||
 | 
								task := e.taskQueue.GetNextTask()
 | 
				
			||||||
 | 
								if task != nil {
 | 
				
			||||||
 | 
									e.logger.Info(fmt.Sprintf("Worker %d executing task: %s", id, task.GetID()))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									// 执行任务
 | 
				
			||||||
 | 
									if err := task.Execute(); err != nil {
 | 
				
			||||||
 | 
										e.logger.Error("Task execution failed: " + task.GetID() + ", error: " + err.Error())
 | 
				
			||||||
 | 
									} else {
 | 
				
			||||||
 | 
										e.logger.Info("Task executed successfully: " + task.GetID())
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								} else {
 | 
				
			||||||
 | 
									// 没有任务时短暂休眠
 | 
				
			||||||
 | 
									time.Sleep(100 * time.Millisecond)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Reference in New Issue
	
	Block a user