// Package database 提供基于PostgreSQL的数据存储功能 // 使用GORM作为ORM库来操作数据库 // 实现与PostgreSQL数据库的连接和基本操作 package database import ( "fmt" "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "gorm.io/driver/postgres" "gorm.io/gorm" ) // PostgresStorage 代表基于PostgreSQL的存储实现 // 使用GORM作为ORM库 type PostgresStorage struct { db *gorm.DB isTimescaleDB bool connectionString string maxOpenConns int maxIdleConns int connMaxLifetime int logger *logs.Logger // 依赖注入的 logger } // NewPostgresStorage 创建并返回一个新的PostgreSQL存储实例 // 它接收一个 logger 实例,而不是自己创建 func NewPostgresStorage(connectionString string, isTimescaleDB bool, maxOpenConns, maxIdleConns, connMaxLifetime int, logger *logs.Logger) *PostgresStorage { return &PostgresStorage{ connectionString: connectionString, isTimescaleDB: isTimescaleDB, maxOpenConns: maxOpenConns, maxIdleConns: maxIdleConns, connMaxLifetime: connMaxLifetime, logger: logger, // 注入 logger } } // Connect 建立与PostgreSQL数据库的连接 // 使用GORM建立数据库连接,并使用自定义的 logger 接管 GORM 日志 func (ps *PostgresStorage) Connect() error { ps.logger.Info("正在连接PostgreSQL数据库") // 创建 GORM 的 logger 适配器 gormLogger := logs.NewGormLogger(ps.logger) var err error // 在 gorm.Open 时传入我们自定义的 logger ps.db, err = gorm.Open(postgres.Open(ps.connectionString), &gorm.Config{ Logger: gormLogger, }) if err != nil { ps.logger.Errorw("数据库连接失败", "error", err) return fmt.Errorf("数据库连接失败: %w", err) // 使用 %w 进行错误包装 } // 测试连接 sqlDB, err := ps.db.DB() if err != nil { ps.logger.Errorw("获取数据库实例失败", "error", err) return fmt.Errorf("获取数据库实例失败: %w", err) } if err = sqlDB.Ping(); err != nil { ps.logger.Errorw("数据库连接测试失败", "error", err) return fmt.Errorf("数据库连接测试失败: %w", err) } // 设置连接池参数 sqlDB.SetMaxOpenConns(ps.maxOpenConns) sqlDB.SetMaxIdleConns(ps.maxIdleConns) sqlDB.SetConnMaxLifetime(time.Duration(ps.connMaxLifetime) * time.Second) // gorm会根据字段名自动创建外键约束, 但触发器Task的PlanID是不存在的, 所以需要关闭, 这个关闭对 ps.db.DisableForeignKeyConstraintWhenMigrating = true ps.logger.Info("PostgreSQL数据库连接成功") return nil } // Disconnect 断开与PostgreSQL数据库的连接 // 安全地关闭所有数据库连接 func (ps *PostgresStorage) Disconnect() error { if ps.db != nil { ps.logger.Info("正在断开PostgreSQL数据库连接") sqlDB, err := ps.db.DB() if err != nil { ps.logger.Errorw("获取数据库实例失败", "error", err) return fmt.Errorf("获取数据库实例失败: %w", err) } if err := sqlDB.Close(); err != nil { ps.logger.Errorw("关闭数据库连接失败", "error", err) return fmt.Errorf("关闭数据库连接失败: %w", err) } ps.logger.Info("PostgreSQL数据库连接已断开") } return nil } // GetDB 获取GORM数据库实例 // 用于执行具体的数据库操作 func (ps *PostgresStorage) GetDB() *gorm.DB { return ps.db } // Migrate 执行数据库迁移 func (ps *PostgresStorage) Migrate(models ...interface{}) error { if len(models) == 0 { ps.logger.Info("没有需要迁移的数据库模型,跳过迁移步骤") return nil } ps.logger.Info("正在自动迁移数据库表结构") if err := ps.db.AutoMigrate(models...); err != nil { ps.logger.Errorw("数据库表结构迁移失败", "error", err) return fmt.Errorf("数据库表结构迁移失败: %w", err) } ps.logger.Info("数据库表结构迁移完成") // -- 处理gorm做不到的初始化逻辑 -- if err := ps.creatingIndex(); err != nil { return err } // 如果是 TimescaleDB, 则将部分表转换为 hypertable if ps.isTimescaleDB { ps.logger.Info("检测到 TimescaleDB, 准备进行超表转换和压缩策略配置") if err := ps.setupTimescaleDB(); err != nil { return err } } return nil } // setupTimescaleDB 统一处理所有 TimescaleDB 相关的设置 func (ps *PostgresStorage) setupTimescaleDB() error { if err := ps.creatingHyperTable(); err != nil { return err } if err := ps.applyCompressionPolicies(); err != nil { return err } return nil } // creatingHyperTable 用于在数据库是 TimescaleDB 时创建超表 func (ps *PostgresStorage) creatingHyperTable() error { // 定义一个辅助结构体来管理超表转换 tablesToConvert := []struct { model interface{ TableName() string } timeColumn string }{ {models.SensorData{}, "time"}, {models.DeviceCommandLog{}, "sent_at"}, {models.PlanExecutionLog{}, "created_at"}, {models.TaskExecutionLog{}, "created_at"}, {models.PendingCollection{}, "created_at"}, {models.UserActionLog{}, "time"}, } for _, table := range tablesToConvert { tableName := table.model.TableName() chunkInterval := "1 day" // 统一设置为1天 ps.logger.Infow("准备将表转换为超表", "table", tableName, "chunk_interval", chunkInterval) sql := fmt.Sprintf("SELECT create_hypertable('%s', '%s', chunk_time_interval => INTERVAL '%s', if_not_exists => TRUE);", tableName, table.timeColumn, chunkInterval) if err := ps.db.Exec(sql).Error; err != nil { ps.logger.Errorw("转换为超表失败", "table", tableName, "error", err) return fmt.Errorf("将 %s 转换为超表失败: %w", tableName, err) } ps.logger.Infow("成功将表转换为超表 (或已转换)", "table", tableName) } return nil } // applyCompressionPolicies 为超表配置自动压缩策略 func (ps *PostgresStorage) applyCompressionPolicies() error { policies := []struct { model interface{ TableName() string } segmentColumn string }{ {models.SensorData{}, "device_id"}, {models.DeviceCommandLog{}, "device_id"}, {models.PlanExecutionLog{}, "plan_id"}, {models.TaskExecutionLog{}, "task_id"}, {models.PendingCollection{}, "device_id"}, {models.UserActionLog{}, "user_id"}, } for _, policy := range policies { tableName := policy.model.TableName() compressAfter := "3 days" // 统一设置为2天后(即进入第3天)开始压缩 // 1. 开启表的压缩设置,并指定分段列 ps.logger.Infow("为表启用压缩设置", "table", tableName, "segment_by", policy.segmentColumn) alterSQL := fmt.Sprintf("ALTER TABLE %s SET (timescaledb.compress, timescaledb.compress_segmentby = '%s');", tableName, policy.segmentColumn) if err := ps.db.Exec(alterSQL).Error; err != nil { // 忽略错误,因为这个设置可能是不可变的,重复执行会报错 ps.logger.Warnw("启用压缩设置时遇到问题 (可能已设置,可忽略)", "table", tableName, "error", err) } // 2. 添加压缩策略 ps.logger.Infow("为表添加压缩策略", "table", tableName, "compress_after", compressAfter) policySQL := fmt.Sprintf("SELECT add_compression_policy('%s', INTERVAL '%s', if_not_exists => TRUE);", tableName, compressAfter) if err := ps.db.Exec(policySQL).Error; err != nil { ps.logger.Errorw("添加压缩策略失败", "table", tableName, "error", err) return fmt.Errorf("为 %s 添加压缩策略失败: %w", tableName, err) } ps.logger.Infow("成功为表添加压缩策略 (或已存在)", "table", tableName) } return nil } // creatingIndex 用于创建gorm无法处理的索引, 如gin索引 func (ps *PostgresStorage) creatingIndex() error { // 使用 IF NOT EXISTS 保证幂等性 // 如果索引已存在,此命令不会报错 // 为 sensor_data 表的 data 字段创建 GIN 索引 ps.logger.Info("正在为 sensor_data 表的 data 字段创建 GIN 索引") ginSensorDataIndexSQL := "CREATE INDEX IF NOT EXISTS idx_sensor_data_data_gin ON sensor_data USING GIN (data);" if err := ps.db.Exec(ginSensorDataIndexSQL).Error; err != nil { ps.logger.Errorw("为 sensor_data 的 data 字段创建 GIN 索引失败", "error", err) return fmt.Errorf("为 sensor_data 的 data 字段创建 GIN 索引失败: %w", err) } ps.logger.Info("成功为 sensor_data 的 data 字段创建 GIN 索引 (或已存在)") // 为 tasks.parameters 创建 GIN 索引 ps.logger.Info("正在为 tasks 表的 parameters 字段创建 GIN 索引") taskGinIndexSQL := "CREATE INDEX IF NOT EXISTS idx_tasks_parameters_gin ON tasks USING GIN (parameters);" if err := ps.db.Exec(taskGinIndexSQL).Error; err != nil { ps.logger.Errorw("为 tasks 的 parameters 字段创建 GIN 索引失败", "error", err) return fmt.Errorf("为 tasks 的 parameters 字段创建 GIN 索引失败: %w", err) } ps.logger.Info("成功为 tasks 的 parameters 字段创建 GIN 索引 (或已存在)") // 为 devices 表的 properties 字段创建 GIN 索引 //ps.logger.Info("正在为 devices 表的 properties 字段创建 GIN 索引") //ginDevicePropertiesIndexSQL := "CREATE INDEX IF NOT EXISTS idx_devices_properties_gin ON devices USING GIN (properties);" //if err := ps.db.Exec(ginDevicePropertiesIndexSQL).Error; err != nil { // ps.logger.Errorw("为 devices 的 properties 字段创建 GIN 索引失败", "error", err) // return fmt.Errorf("为 devices 的 properties 字段创建 GIN 索引失败: %w", err) //} //ps.logger.Info("成功为 devices 的 properties 字段创建 GIN 索引 (或已存在)") return nil }