149 lines
5.2 KiB
Go
149 lines
5.2 KiB
Go
// Package database 提供基于PostgreSQL的数据存储功能
|
||
// 使用GORM作为ORM库来操作数据库
|
||
// 实现与PostgreSQL数据库的连接和基本操作
|
||
package database
|
||
|
||
import (
|
||
"fmt"
|
||
"time"
|
||
|
||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||
"gorm.io/driver/postgres"
|
||
"gorm.io/gorm"
|
||
)
|
||
|
||
// PostgresStorage 代表基于PostgreSQL的存储实现
|
||
// 使用GORM作为ORM库
|
||
type PostgresStorage struct {
|
||
db *gorm.DB
|
||
isTimescaleDB bool
|
||
connectionString string
|
||
maxOpenConns int
|
||
maxIdleConns int
|
||
connMaxLifetime int
|
||
logger *logs.Logger // 依赖注入的 logger
|
||
}
|
||
|
||
// NewPostgresStorage 创建并返回一个新的PostgreSQL存储实例
|
||
// 它接收一个 logger 实例,而不是自己创建
|
||
func NewPostgresStorage(connectionString string, isTimescaleDB bool, maxOpenConns, maxIdleConns, connMaxLifetime int, logger *logs.Logger) *PostgresStorage {
|
||
return &PostgresStorage{
|
||
connectionString: connectionString,
|
||
isTimescaleDB: isTimescaleDB,
|
||
maxOpenConns: maxOpenConns,
|
||
maxIdleConns: maxIdleConns,
|
||
connMaxLifetime: connMaxLifetime,
|
||
logger: logger, // 注入 logger
|
||
}
|
||
}
|
||
|
||
// Connect 建立与PostgreSQL数据库的连接
|
||
// 使用GORM建立数据库连接,并使用自定义的 logger 接管 GORM 日志
|
||
func (ps *PostgresStorage) Connect() error {
|
||
ps.logger.Info("正在连接PostgreSQL数据库")
|
||
|
||
// 创建 GORM 的 logger 适配器
|
||
gormLogger := logs.NewGormLogger(ps.logger)
|
||
|
||
var err error
|
||
// 在 gorm.Open 时传入我们自定义的 logger
|
||
ps.db, err = gorm.Open(postgres.Open(ps.connectionString), &gorm.Config{
|
||
Logger: gormLogger,
|
||
})
|
||
if err != nil {
|
||
ps.logger.Errorw("数据库连接失败", "error", err)
|
||
return fmt.Errorf("数据库连接失败: %w", err) // 使用 %w 进行错误包装
|
||
}
|
||
|
||
// 测试连接
|
||
sqlDB, err := ps.db.DB()
|
||
if err != nil {
|
||
ps.logger.Errorw("获取数据库实例失败", "error", err)
|
||
return fmt.Errorf("获取数据库实例失败: %w", err)
|
||
}
|
||
|
||
if err = sqlDB.Ping(); err != nil {
|
||
ps.logger.Errorw("数据库连接测试失败", "error", err)
|
||
return fmt.Errorf("数据库连接测试失败: %w", err)
|
||
}
|
||
|
||
// 设置连接池参数
|
||
sqlDB.SetMaxOpenConns(ps.maxOpenConns)
|
||
sqlDB.SetMaxIdleConns(ps.maxIdleConns)
|
||
sqlDB.SetConnMaxLifetime(time.Duration(ps.connMaxLifetime) * time.Second)
|
||
|
||
// gorm会根据字段名自动创建外键约束, 但触发器Task的PlanID是不存在的, 所以需要关闭, 这个关闭对
|
||
ps.db.DisableForeignKeyConstraintWhenMigrating = true
|
||
|
||
ps.logger.Info("PostgreSQL数据库连接成功")
|
||
return nil
|
||
}
|
||
|
||
// Disconnect 断开与PostgreSQL数据库的连接
|
||
// 安全地关闭所有数据库连接
|
||
func (ps *PostgresStorage) Disconnect() error {
|
||
if ps.db != nil {
|
||
ps.logger.Info("正在断开PostgreSQL数据库连接")
|
||
|
||
sqlDB, err := ps.db.DB()
|
||
if err != nil {
|
||
ps.logger.Errorw("获取数据库实例失败", "error", err)
|
||
return fmt.Errorf("获取数据库实例失败: %w", err)
|
||
}
|
||
|
||
if err := sqlDB.Close(); err != nil {
|
||
ps.logger.Errorw("关闭数据库连接失败", "error", err)
|
||
return fmt.Errorf("关闭数据库连接失败: %w", err)
|
||
}
|
||
ps.logger.Info("PostgreSQL数据库连接已断开")
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// GetDB 获取GORM数据库实例
|
||
// 用于执行具体的数据库操作
|
||
func (ps *PostgresStorage) GetDB() *gorm.DB {
|
||
return ps.db
|
||
}
|
||
|
||
// Migrate 执行数据库迁移
|
||
func (ps *PostgresStorage) Migrate(models ...interface{}) error {
|
||
if len(models) == 0 {
|
||
ps.logger.Info("没有需要迁移的数据库模型,跳过迁移步骤")
|
||
return nil
|
||
}
|
||
ps.logger.Info("正在自动迁移数据库表结构")
|
||
if err := ps.db.AutoMigrate(models...); err != nil {
|
||
ps.logger.Errorw("数据库表结构迁移失败", "error", err)
|
||
return fmt.Errorf("数据库表结构迁移失败: %w", err)
|
||
}
|
||
ps.logger.Info("数据库表结构迁移完成")
|
||
|
||
// -- 处理gorm做不到的初始化逻辑 --
|
||
// 创建GIN索引(用于优化JSONB查询)
|
||
ps.logger.Info("正在为 sensor_data 表的 data 字段创建 GIN 索引")
|
||
// 使用 IF NOT EXISTS 保证幂等性
|
||
// 如果索引已存在,此命令不会报错
|
||
ginIndexSQL := "CREATE INDEX IF NOT EXISTS idx_sensor_data_data_gin ON sensor_data USING GIN (data);"
|
||
if err := ps.db.Exec(ginIndexSQL).Error; err != nil {
|
||
ps.logger.Errorw("为 sensor_data 的 data 字段创建 GIN 索引失败", "error", err)
|
||
return fmt.Errorf("为 sensor_data 的 data 字段创建 GIN 索引失败: %w", err)
|
||
}
|
||
ps.logger.Info("成功为 sensor_data 的 data 字段创建 GIN 索引 (或已存在)")
|
||
|
||
// 如果是 TimescaleDB, 则将 sensor_data 转换为 hypertable
|
||
if ps.isTimescaleDB {
|
||
ps.logger.Info("检测到 TimescaleDB, 准备转换 sensor_data 为超表")
|
||
// 使用 if_not_exists => TRUE 保证幂等性
|
||
// 如果 sensor_data 已经是超表,此命令不会报错
|
||
// 'time' 是 SensorData 模型中定义的时间列
|
||
sql := "SELECT create_hypertable('sensor_data', 'time', if_not_exists => TRUE);"
|
||
if err := ps.db.Exec(sql).Error; err != nil {
|
||
ps.logger.Errorw("将 sensor_data 转换为超表失败", "error", err)
|
||
return fmt.Errorf("将 sensor_data 转换为超表失败: %w", err)
|
||
}
|
||
ps.logger.Info("成功将 sensor_data 转换为超表 (或已转换)")
|
||
}
|
||
return nil
|
||
}
|