修改infra除repository包

This commit is contained in:
2025-11-05 22:22:46 +08:00
parent 07d8c719ac
commit 97aea66f7c
13 changed files with 293 additions and 242 deletions

View File

@@ -4,12 +4,13 @@
package database
import (
"context"
"fmt"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
@@ -17,35 +18,36 @@ import (
// PostgresStorage 代表基于PostgreSQL的存储实现
// 使用GORM作为ORM库
type PostgresStorage struct {
ctx context.Context
db *gorm.DB
isTimescaleDB bool
connectionString string
maxOpenConns int
maxIdleConns int
connMaxLifetime int
logger *logs.Logger // 依赖注入的 logger
}
// NewPostgresStorage 创建并返回一个新的PostgreSQL存储实例
// 它接收一个 logger 实例,而不是自己创建
func NewPostgresStorage(connectionString string, isTimescaleDB bool, maxOpenConns, maxIdleConns, connMaxLifetime int, logger *logs.Logger) *PostgresStorage {
func NewPostgresStorage(ctx context.Context, connectionString string, isTimescaleDB bool, maxOpenConns, maxIdleConns, connMaxLifetime int) *PostgresStorage {
return &PostgresStorage{
ctx: ctx,
connectionString: connectionString,
isTimescaleDB: isTimescaleDB,
maxOpenConns: maxOpenConns,
maxIdleConns: maxIdleConns,
connMaxLifetime: connMaxLifetime,
logger: logger, // 注入 logger
}
}
// Connect 建立与PostgreSQL数据库的连接
// 使用GORM建立数据库连接并使用自定义的 logger 接管 GORM 日志
func (ps *PostgresStorage) Connect() error {
ps.logger.Info("正在连接PostgreSQL数据库")
func (ps *PostgresStorage) Connect(ctx context.Context) error {
storageCtx, logger := logs.Trace(ctx, ps.ctx, "Connect")
logger.Info("正在连接PostgreSQL数据库")
// 创建 GORM 的 logger 适配器
gormLogger := logs.NewGormLogger(ps.logger)
gormLogger := logs.NewGormLogger(logger)
var err error
// 在 gorm.Open 时传入我们自定义的 logger
@@ -53,19 +55,19 @@ func (ps *PostgresStorage) Connect() error {
Logger: gormLogger,
})
if err != nil {
ps.logger.Errorw("数据库连接失败", "error", err)
logger.Errorw("数据库连接失败", "error", err)
return fmt.Errorf("数据库连接失败: %w", err) // 使用 %w 进行错误包装
}
// 测试连接
sqlDB, err := ps.db.DB()
sqlDB, err := ps.db.WithContext(storageCtx).DB()
if err != nil {
ps.logger.Errorw("获取数据库实例失败", "error", err)
logger.Errorw("获取数据库实例失败", "error", err)
return fmt.Errorf("获取数据库实例失败: %w", err)
}
if err = sqlDB.Ping(); err != nil {
ps.logger.Errorw("数据库连接测试失败", "error", err)
logger.Errorw("数据库连接测试失败", "error", err)
return fmt.Errorf("数据库连接测试失败: %w", err)
}
@@ -77,59 +79,62 @@ func (ps *PostgresStorage) Connect() error {
// gorm会根据字段名自动创建外键约束, 但触发器Task的PlanID是不存在的, 所以需要关闭, 这个关闭对
ps.db.DisableForeignKeyConstraintWhenMigrating = true
ps.logger.Info("PostgreSQL数据库连接成功")
logger.Info("PostgreSQL数据库连接成功")
return nil
}
// Disconnect 断开与PostgreSQL数据库的连接
// 安全地关闭所有数据库连接
func (ps *PostgresStorage) Disconnect() error {
func (ps *PostgresStorage) Disconnect(ctx context.Context) error {
storageCtx, logger := logs.Trace(ctx, ps.ctx, "Disconnect")
if ps.db != nil {
ps.logger.Info("正在断开PostgreSQL数据库连接")
logger.Info("正在断开PostgreSQL数据库连接")
sqlDB, err := ps.db.DB()
sqlDB, err := ps.db.WithContext(storageCtx).DB()
if err != nil {
ps.logger.Errorw("获取数据库实例失败", "error", err)
logger.Errorw("获取数据库实例失败", "error", err)
return fmt.Errorf("获取数据库实例失败: %w", err)
}
if err := sqlDB.Close(); err != nil {
ps.logger.Errorw("关闭数据库连接失败", "error", err)
logger.Errorw("关闭数据库连接失败", "error", err)
return fmt.Errorf("关闭数据库连接失败: %w", err)
}
ps.logger.Info("PostgreSQL数据库连接已断开")
logger.Info("PostgreSQL数据库连接已断开")
}
return nil
}
// GetDB 获取GORM数据库实例
// 用于执行具体的数据库操作
func (ps *PostgresStorage) GetDB() *gorm.DB {
return ps.db
func (ps *PostgresStorage) GetDB(ctx context.Context) *gorm.DB {
storageCtx := logs.AddFuncName(ctx, ps.ctx, "GetDB")
return ps.db.WithContext(storageCtx)
}
// Migrate 执行数据库迁移
func (ps *PostgresStorage) Migrate(models ...interface{}) error {
func (ps *PostgresStorage) Migrate(ctx context.Context, models ...interface{}) error {
storageCtx, logger := logs.Trace(ctx, ps.ctx, "Migrate")
if len(models) == 0 {
ps.logger.Info("没有需要迁移的数据库模型,跳过迁移步骤")
logger.Info("没有需要迁移的数据库模型,跳过迁移步骤")
return nil
}
ps.logger.Info("正在自动迁移数据库表结构")
if err := ps.db.AutoMigrate(models...); err != nil {
ps.logger.Errorw("数据库表结构迁移失败", "error", err)
logger.Info("正在自动迁移数据库表结构")
if err := ps.db.WithContext(storageCtx).AutoMigrate(models...); err != nil {
logger.Errorw("数据库表结构迁移失败", "error", err)
return fmt.Errorf("数据库表结构迁移失败: %w", err)
}
ps.logger.Info("数据库表结构迁移完成")
logger.Info("数据库表结构迁移完成")
// -- 处理gorm做不到的初始化逻辑 --
if err := ps.creatingIndex(); err != nil {
if err := ps.creatingIndex(storageCtx); err != nil {
return err
}
// 如果是 TimescaleDB, 则将部分表转换为 hypertable
if ps.isTimescaleDB {
ps.logger.Info("检测到 TimescaleDB, 准备进行超表转换和压缩策略配置")
if err := ps.setupTimescaleDB(); err != nil {
logger.Info("检测到 TimescaleDB, 准备进行超表转换和压缩策略配置")
if err := ps.setupTimescaleDB(storageCtx); err != nil {
return err
}
}
@@ -137,18 +142,20 @@ func (ps *PostgresStorage) Migrate(models ...interface{}) error {
}
// setupTimescaleDB 统一处理所有 TimescaleDB 相关的设置
func (ps *PostgresStorage) setupTimescaleDB() error {
if err := ps.creatingHyperTable(); err != nil {
func (ps *PostgresStorage) setupTimescaleDB(ctx context.Context) error {
storageCtx := logs.AddFuncName(ctx, ps.ctx, "setupTimescaleDB")
if err := ps.creatingHyperTable(storageCtx); err != nil {
return err
}
if err := ps.applyCompressionPolicies(); err != nil {
if err := ps.applyCompressionPolicies(storageCtx); err != nil {
return err
}
return nil
}
// creatingHyperTable 用于在数据库是 TimescaleDB 时创建超表
func (ps *PostgresStorage) creatingHyperTable() error {
func (ps *PostgresStorage) creatingHyperTable(ctx context.Context) error {
storageCtx, logger := logs.Trace(ctx, ps.ctx, "creatingHyperTable")
// 定义一个辅助结构体来管理超表转换
tablesToConvert := []struct {
model interface{ TableName() string }
@@ -177,20 +184,21 @@ func (ps *PostgresStorage) creatingHyperTable() error {
for _, table := range tablesToConvert {
tableName := table.model.TableName()
chunkInterval := "1 days" // 统一设置为1天
ps.logger.Debugw("准备将表转换为超表", "table", tableName, "chunk_interval", chunkInterval)
logger.Debugw("准备将表转换为超表", "table", tableName, "chunk_interval", chunkInterval)
sql := fmt.Sprintf("SELECT create_hypertable('%s', '%s', chunk_time_interval => INTERVAL '%s', if_not_exists => TRUE);", tableName, table.timeColumn, chunkInterval)
if err := ps.db.Exec(sql).Error; err != nil {
ps.logger.Errorw("转换为超表失败", "table", tableName, "error", err)
if err := ps.db.WithContext(storageCtx).Exec(sql).Error; err != nil {
logger.Errorw("转换为超表失败", "table", tableName, "error", err)
return fmt.Errorf("将 %s 转换为超表失败: %w", tableName, err)
}
ps.logger.Debugw("成功将表转换为超表 (或已转换)", "table", tableName)
logger.Debugw("成功将表转换为超表 (或已转换)", "table", tableName)
}
return nil
}
// applyCompressionPolicies 为超表配置自动压缩策略
func (ps *PostgresStorage) applyCompressionPolicies() error {
func (ps *PostgresStorage) applyCompressionPolicies(ctx context.Context) error {
storageCtx, logger := logs.Trace(ctx, ps.ctx, "applyCompressionPolicies")
policies := []struct {
model interface{ TableName() string }
segmentColumn string
@@ -220,50 +228,51 @@ func (ps *PostgresStorage) applyCompressionPolicies() error {
compressAfter := "3 days" // 统一设置为2天后即进入第3天开始压缩
// 1. 开启表的压缩设置,并指定分段列
ps.logger.Debugw("为表启用压缩设置", "table", tableName, "segment_by", policy.segmentColumn)
logger.Debugw("为表启用压缩设置", "table", tableName, "segment_by", policy.segmentColumn)
// 使用 + 而非Sprintf以规避goland静态检查报错
alterSQL := "ALTER TABLE" + " " + tableName + " SET (timescaledb.compress, timescaledb.compress_segmentby = '" + policy.segmentColumn + "');"
if err := ps.db.Exec(alterSQL).Error; err != nil {
if err := ps.db.WithContext(storageCtx).Exec(alterSQL).Error; err != nil {
// 忽略错误,因为这个设置可能是不可变的,重复执行会报错
ps.logger.Warnw("启用压缩设置时遇到问题 (可能已设置,可忽略)", "table", tableName, "error", err)
logger.Warnw("启用压缩设置时遇到问题 (可能已设置,可忽略)", "table", tableName, "error", err)
}
ps.logger.Debugw("成功为表启用压缩设置 (或已启用)", "table", tableName)
logger.Debugw("成功为表启用压缩设置 (或已启用)", "table", tableName)
// 2. 添加压缩策略
ps.logger.Debugw("为表添加压缩策略", "table", tableName, "compress_after", compressAfter)
logger.Debugw("为表添加压缩策略", "table", tableName, "compress_after", compressAfter)
policySQL := fmt.Sprintf("SELECT add_compression_policy('%s', INTERVAL '%s', if_not_exists => TRUE);", tableName, compressAfter)
if err := ps.db.Exec(policySQL).Error; err != nil {
ps.logger.Errorw("添加压缩策略失败", "table", tableName, "error", err)
if err := ps.db.WithContext(storageCtx).Exec(policySQL).Error; err != nil {
logger.Errorw("添加压缩策略失败", "table", tableName, "error", err)
return fmt.Errorf("为 %s 添加压缩策略失败: %w", tableName, err)
}
ps.logger.Debugw("成功为表添加压缩策略 (或已存在)", "table", tableName)
logger.Debugw("成功为表添加压缩策略 (或已存在)", "table", tableName)
}
return nil
}
// creatingIndex 用于创建gorm无法处理的索引, 如gin索引
func (ps *PostgresStorage) creatingIndex() error {
func (ps *PostgresStorage) creatingIndex(ctx context.Context) error {
storageCtx, logger := logs.Trace(ctx, ps.ctx, "creatingIndex")
// 使用 IF NOT EXISTS 保证幂等性
// 如果索引已存在,此命令不会报错
// 为 sensor_data 表的 data 字段创建 GIN 索引
ps.logger.Debug("正在为 sensor_data 表的 data 字段创建 GIN 索引")
logger.Debug("正在为 sensor_data 表的 data 字段创建 GIN 索引")
ginSensorDataIndexSQL := "CREATE INDEX IF NOT EXISTS idx_sensor_data_data_gin ON sensor_data USING GIN (data);"
if err := ps.db.Exec(ginSensorDataIndexSQL).Error; err != nil {
ps.logger.Errorw("为 sensor_data 的 data 字段创建 GIN 索引失败", "error", err)
if err := ps.db.WithContext(storageCtx).Exec(ginSensorDataIndexSQL).Error; err != nil {
logger.Errorw("为 sensor_data 的 data 字段创建 GIN 索引失败", "error", err)
return fmt.Errorf("为 sensor_data 的 data 字段创建 GIN 索引失败: %w", err)
}
ps.logger.Debug("成功为 sensor_data 的 data 字段创建 GIN 索引 (或已存在)")
logger.Debug("成功为 sensor_data 的 data 字段创建 GIN 索引 (或已存在)")
// 为 tasks.parameters 创建 GIN 索引
ps.logger.Debug("正在为 tasks 表的 parameters 字段创建 GIN 索引")
logger.Debug("正在为 tasks 表的 parameters 字段创建 GIN 索引")
taskGinIndexSQL := "CREATE INDEX IF NOT EXISTS idx_tasks_parameters_gin ON tasks USING GIN (parameters);"
if err := ps.db.Exec(taskGinIndexSQL).Error; err != nil {
ps.logger.Errorw("为 tasks 的 parameters 字段创建 GIN 索引失败", "error", err)
if err := ps.db.WithContext(storageCtx).Exec(taskGinIndexSQL).Error; err != nil {
logger.Errorw("为 tasks 的 parameters 字段创建 GIN 索引失败", "error", err)
return fmt.Errorf("为 tasks 的 parameters 字段创建 GIN 索引失败: %w", err)
}
ps.logger.Debug("成功为 tasks 的 parameters 字段创建 GIN 索引 (或已存在)")
logger.Debug("成功为 tasks 的 parameters 字段创建 GIN 索引 (或已存在)")
return nil
}

View File

@@ -4,10 +4,12 @@
package database
import (
"context"
"fmt"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"gorm.io/gorm"
)
@@ -15,22 +17,22 @@ import (
// 所有存储实现都需要实现此接口定义的方法
type Storage interface {
// Connect 建立与存储后端的连接
Connect() error
Connect(ctx context.Context) error
// Disconnect 断开与存储后端的连接
Disconnect() error
Disconnect(ctx context.Context) error
// GetDB 获取数据库实例
GetDB() *gorm.DB
GetDB(ctx context.Context) *gorm.DB
// Migrate 执行数据库迁移
// 参数为需要迁移的 GORM 模型
Migrate(models ...interface{}) error
Migrate(ctx context.Context, models ...interface{}) error
}
// NewStorage 创建并返回一个存储实例
// 根据配置返回相应的存储实现
func NewStorage(cfg config.DatabaseConfig, logger *logs.Logger) Storage {
func NewStorage(ctx context.Context, cfg config.DatabaseConfig) Storage {
// 构建数据库连接字符串
connectionString := fmt.Sprintf(
"user=%s password=%s dbname=%s host=%s port=%d sslmode=%s",
@@ -45,11 +47,11 @@ func NewStorage(cfg config.DatabaseConfig, logger *logs.Logger) Storage {
// 当前默认返回PostgreSQL存储实现并将 logger 注入
// 当前默认返回PostgreSQL存储实现并将 logger 注入
return NewPostgresStorage(
logs.AddCompName(context.Background(), "PostgresStorage"),
connectionString,
cfg.IsTimescaleDB, // <--- 添加 IsTimescaleDB
cfg.IsTimescaleDB,
cfg.MaxOpenConns,
cfg.MaxIdleConns,
cfg.ConnMaxLifetime,
logger,
)
}