// Package database 提供基于PostgreSQL的数据存储功能 // 使用GORM作为ORM库来操作数据库 // 实现与PostgreSQL数据库的连接和基本操作 package database import ( "context" "fmt" "strings" "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "gorm.io/driver/postgres" "gorm.io/gorm" ) // PostgresStorage 代表基于PostgreSQL的存储实现 // 使用GORM作为ORM库 type PostgresStorage struct { ctx context.Context db *gorm.DB isTimescaleDB bool connectionString string maxOpenConns int maxIdleConns int connMaxLifetime int } // NewPostgresStorage 创建并返回一个新的PostgreSQL存储实例 // 它接收一个 logger 实例,而不是自己创建 func NewPostgresStorage(ctx context.Context, connectionString string, isTimescaleDB bool, maxOpenConns, maxIdleConns, connMaxLifetime int) *PostgresStorage { return &PostgresStorage{ ctx: ctx, connectionString: connectionString, isTimescaleDB: isTimescaleDB, maxOpenConns: maxOpenConns, maxIdleConns: maxIdleConns, connMaxLifetime: connMaxLifetime, } } // Connect 建立与PostgreSQL数据库的连接 // 使用GORM建立数据库连接,并使用自定义的 logger 接管 GORM 日志 func (ps *PostgresStorage) Connect(ctx context.Context) error { storageCtx, logger := logs.Trace(ctx, ps.ctx, "Connect") logger.Info("正在连接PostgreSQL数据库") // 创建 GORM 的 logger 适配器 gormLogger := logs.NewGormLogger(logs.GetLogger(logs.AddCompName(context.Background(), "GORM"))) var err error // 在 gorm.Open 时传入我们自定义的 logger ps.db, err = gorm.Open(postgres.Open(ps.connectionString), &gorm.Config{ Logger: gormLogger, }) if err != nil { logger.Errorw("数据库连接失败", "error", err) return fmt.Errorf("数据库连接失败: %w", err) // 使用 %w 进行错误包装 } // 测试连接 sqlDB, err := ps.db.WithContext(storageCtx).DB() if err != nil { logger.Errorw("获取数据库实例失败", "error", err) return fmt.Errorf("获取数据库实例失败: %w", err) } if err = sqlDB.Ping(); err != nil { logger.Errorw("数据库连接测试失败", "error", err) return fmt.Errorf("数据库连接测试失败: %w", err) } // 设置连接池参数 sqlDB.SetMaxOpenConns(ps.maxOpenConns) sqlDB.SetMaxIdleConns(ps.maxIdleConns) sqlDB.SetConnMaxLifetime(time.Duration(ps.connMaxLifetime) * time.Second) // gorm会根据字段名自动创建外键约束, 但触发器Task的PlanID是不存在的, 所以需要关闭, 这个关闭对 ps.db.DisableForeignKeyConstraintWhenMigrating = true logger.Info("PostgreSQL数据库连接成功") return nil } // Disconnect 断开与PostgreSQL数据库的连接 // 安全地关闭所有数据库连接 func (ps *PostgresStorage) Disconnect(ctx context.Context) error { storageCtx, logger := logs.Trace(ctx, ps.ctx, "Disconnect") if ps.db != nil { logger.Info("正在断开PostgreSQL数据库连接") sqlDB, err := ps.db.WithContext(storageCtx).DB() if err != nil { logger.Errorw("获取数据库实例失败", "error", err) return fmt.Errorf("获取数据库实例失败: %w", err) } if err := sqlDB.Close(); err != nil { logger.Errorw("关闭数据库连接失败", "error", err) return fmt.Errorf("关闭数据库连接失败: %w", err) } logger.Info("PostgreSQL数据库连接已断开") } return nil } // GetDB 获取GORM数据库实例 // 用于执行具体的数据库操作 func (ps *PostgresStorage) GetDB(ctx context.Context) *gorm.DB { storageCtx := logs.AddFuncName(ctx, ps.ctx, "GetDB") return ps.db.WithContext(storageCtx) } // Migrate 执行数据库迁移 func (ps *PostgresStorage) Migrate(ctx context.Context, models ...interface{}) error { storageCtx, logger := logs.Trace(ctx, ps.ctx, "Migrate") if len(models) == 0 { logger.Info("没有需要迁移的数据库模型,跳过迁移步骤") return nil } logger.Info("正在自动迁移数据库表结构") if err := ps.db.WithContext(storageCtx).AutoMigrate(models...); err != nil { logger.Errorw("数据库表结构迁移失败", "error", err) return fmt.Errorf("数据库表结构迁移失败: %w", err) } logger.Info("数据库表结构迁移完成") // -- 处理gorm做不到的初始化逻辑 -- if err := ps.creatingIndex(storageCtx); err != nil { return err } // 如果是 TimescaleDB, 则将部分表转换为 hypertable if ps.isTimescaleDB { logger.Info("检测到 TimescaleDB, 准备进行超表转换和压缩策略配置") if err := ps.setupTimescaleDB(storageCtx); err != nil { return err } } return nil } // setupTimescaleDB 统一处理所有 TimescaleDB 相关的设置 func (ps *PostgresStorage) setupTimescaleDB(ctx context.Context) error { storageCtx := logs.AddFuncName(ctx, ps.ctx, "setupTimescaleDB") if err := ps.creatingHyperTable(storageCtx); err != nil { return err } if err := ps.applyCompressionPolicies(storageCtx); err != nil { return err } return nil } // creatingHyperTable 用于在数据库是 TimescaleDB 时创建超表 func (ps *PostgresStorage) creatingHyperTable(ctx context.Context) error { storageCtx, logger := logs.Trace(ctx, ps.ctx, "creatingHyperTable") // 定义一个辅助结构体来管理超表转换 tablesToConvert := []struct { model interface{ TableName() string } timeColumn string }{ {models.SensorData{}, "time"}, {models.DeviceCommandLog{}, "sent_at"}, {models.PlanExecutionLog{}, "created_at"}, {models.TaskExecutionLog{}, "created_at"}, {models.PendingCollection{}, "created_at"}, {models.UserActionLog{}, "time"}, {models.MedicationLog{}, "happened_at"}, {models.PigBatchLog{}, "happened_at"}, {models.WeighingBatch{}, "weighing_time"}, {models.WeighingRecord{}, "weighing_time"}, {models.PigTransferLog{}, "transfer_time"}, {models.PigSickLog{}, "happened_at"}, {models.PigPurchase{}, "purchase_date"}, {models.PigSale{}, "sale_date"}, {models.Notification{}, "alarm_timestamp"}, {models.HistoricalAlarm{}, "trigger_time"}, {models.RawMaterialStockLog{}, "happened_at"}, } for _, table := range tablesToConvert { tableName := table.model.TableName() chunkInterval := "1 days" // 统一设置为1天 logger.Debugw("准备将表转换为超表", "table", tableName, "chunk_interval", chunkInterval) sql := fmt.Sprintf("SELECT create_hypertable('%s', '%s', chunk_time_interval => INTERVAL '%s', if_not_exists => TRUE);", tableName, table.timeColumn, chunkInterval) if err := ps.db.WithContext(storageCtx).Exec(sql).Error; err != nil { logger.Errorw("转换为超表失败", "table", tableName, "error", err) return fmt.Errorf("将 %s 转换为超表失败: %w", tableName, err) } logger.Debugw("成功将表转换为超表 (或已转换)", "table", tableName) } return nil } // applyCompressionPolicies 为超表配置自动压缩策略 func (ps *PostgresStorage) applyCompressionPolicies(ctx context.Context) error { storageCtx, logger := logs.Trace(ctx, ps.ctx, "applyCompressionPolicies") policies := []struct { model interface{ TableName() string } segmentColumn string }{ {models.SensorData{}, "device_id"}, {models.DeviceCommandLog{}, "device_id"}, {models.PlanExecutionLog{}, "plan_id"}, {models.TaskExecutionLog{}, "task_id"}, {models.PendingCollection{}, "device_id"}, {models.UserActionLog{}, "user_id"}, {models.MedicationLog{}, "pig_batch_id"}, {models.PigBatchLog{}, "pig_batch_id"}, {models.WeighingBatch{}, "pig_batch_id"}, {models.WeighingRecord{}, "weighing_batch_id"}, {models.PigTransferLog{}, "pig_batch_id"}, {models.PigSickLog{}, "pig_batch_id"}, {models.PigPurchase{}, "pig_batch_id"}, {models.PigSale{}, "pig_batch_id"}, {models.Notification{}, "user_id"}, {models.HistoricalAlarm{}, "source_id"}, {models.RawMaterialStockLog{}, "raw_material_id"}, } for _, policy := range policies { tableName := policy.model.TableName() compressAfter := "3 days" // 统一设置为2天后(即进入第3天)开始压缩 // 1. 开启表的压缩设置,并指定分段列 logger.Debugw("为表启用压缩设置", "table", tableName, "segment_by", policy.segmentColumn) // 使用 + 而非Sprintf以规避goland静态检查报错 alterSQL := "ALTER TABLE" + " " + tableName + " SET (timescaledb.compress, timescaledb.compress_segmentby = '" + policy.segmentColumn + "');" if err := ps.db.WithContext(storageCtx).Exec(alterSQL).Error; err != nil { // 忽略错误,因为这个设置可能是不可变的,重复执行会报错 logger.Warnw("启用压缩设置时遇到问题 (可能已设置,可忽略)", "table", tableName, "error", err) } logger.Debugw("成功为表启用压缩设置 (或已启用)", "table", tableName) // 2. 添加压缩策略 logger.Debugw("为表添加压缩策略", "table", tableName, "compress_after", compressAfter) policySQL := fmt.Sprintf("SELECT add_compression_policy('%s', INTERVAL '%s', if_not_exists => TRUE);", tableName, compressAfter) if err := ps.db.WithContext(storageCtx).Exec(policySQL).Error; err != nil { logger.Errorw("添加压缩策略失败", "table", tableName, "error", err) return fmt.Errorf("为 %s 添加压缩策略失败: %w", tableName, err) } logger.Debugw("成功为表添加压缩策略 (或已存在)", "table", tableName) } return nil } // creatingIndex 用于创建gorm无法处理的索引, 如gin索引 func (ps *PostgresStorage) creatingIndex(ctx context.Context) error { storageCtx := logs.AddFuncName(ctx, ps.ctx, "creatingIndex") // 使用 IF NOT EXISTS 保证幂等性 // 如果索引已存在,此命令不会报错 if err := ps.creatingUniqueIndex(storageCtx); err != nil { return err } if err := ps.createGinIndexes(storageCtx); err != nil { return err } return nil } // uniqueIndexDefinition 结构体定义了唯一索引的详细信息 type uniqueIndexDefinition struct { tableName string // 索引所属的表名 columns []string // 构成唯一索引的列名 indexName string // 唯一索引的名称 whereClause string // 可选的 WHERE 子句,用于创建部分索引 description string // 索引的描述,用于日志记录 } // ginIndexDefinition 结构体定义了 GIN 索引的详细信息 type ginIndexDefinition struct { tableName string // 索引所属的表名 columnName string // 需要创建 GIN 索引的列名 indexName string // GIN 索引的名称 description string // 索引的描述,用于日志记录 } func (ps *PostgresStorage) creatingUniqueIndex(ctx context.Context) error { storageCtx, logger := logs.Trace(ctx, ps.ctx, "creatingUniqueIndex") // 定义所有需要创建的唯一索引 uniqueIndexesToCreate := []uniqueIndexDefinition{ { tableName: models.RawMaterialNutrient{}.TableName(), columns: []string{"raw_material_id", "nutrient_id"}, indexName: "idx_raw_material_nutrients_unique_when_not_deleted", whereClause: "WHERE deleted_at IS NULL", description: "确保同一原料中的每种营养成分不重复", }, { tableName: models.PigBreed{}.TableName(), columns: []string{"name"}, indexName: "idx_pig_breeds_unique_name_when_not_deleted", whereClause: "WHERE deleted_at IS NULL", description: "pig_breeds 表的部分唯一索引 (name 唯一)", }, { tableName: models.PigAgeStage{}.TableName(), columns: []string{"name"}, indexName: "idx_pig_age_stages_unique_name_when_not_deleted", whereClause: "WHERE deleted_at IS NULL", description: "pig_age_stages 表的部分唯一索引 (name 唯一)", }, { tableName: models.PigType{}.TableName(), columns: []string{"breed_id", "age_stage_id"}, indexName: "idx_pig_types_unique_breed_age_stage_when_not_deleted", whereClause: "WHERE deleted_at IS NULL", description: "pig_types 表的部分唯一索引 (breed_id, age_stage_id 组合唯一)", }, { tableName: models.PigNutrientRequirement{}.TableName(), columns: []string{"pig_type_id", "nutrient_id"}, indexName: "idx_pig_nutrient_requirements_unique_type_nutrient_when_not_deleted", whereClause: "WHERE deleted_at IS NULL", description: "pig_nutrient_requirements 表的部分唯一索引 (pig_type_id, nutrient_id 组合唯一)", }, { tableName: models.User{}.TableName(), columns: []string{"username"}, indexName: "idx_users_unique_username_when_not_deleted", whereClause: "WHERE deleted_at IS NULL", description: "users 表的部分唯一索引 (username 唯一)", }, { tableName: models.AreaController{}.TableName(), columns: []string{"name"}, indexName: "idx_area_controllers_unique_name_when_not_deleted", whereClause: "WHERE deleted_at IS NULL", description: "area_controllers 表的部分唯一索引 (Name 唯一)", }, { tableName: models.AreaController{}.TableName(), columns: []string{"network_id"}, indexName: "idx_area_controllers_unique_network_id_when_not_deleted", whereClause: "WHERE deleted_at IS NULL", description: "area_controllers 表的部分唯一索引 (NetworkID 唯一)", }, { tableName: models.DeviceTemplate{}.TableName(), columns: []string{"name"}, indexName: "idx_device_templates_unique_name_when_not_deleted", whereClause: "WHERE deleted_at IS NULL", description: "device_templates 表的部分唯一索引 (name 唯一)", }, { tableName: models.PigBatch{}.TableName(), columns: []string{"batch_number"}, indexName: "idx_pig_batches_unique_batch_number_when_not_deleted", whereClause: "WHERE deleted_at IS NULL", description: "pig_batches 表的部分唯一索引 (batch_number 唯一)", }, { tableName: models.PigHouse{}.TableName(), columns: []string{"name"}, indexName: "idx_pig_houses_unique_name_when_not_deleted", whereClause: "WHERE deleted_at IS NULL", description: "pig_houses 表的部分唯一索引 (name 唯一)", }, { tableName: models.RawMaterial{}.TableName(), columns: []string{"name"}, indexName: "idx_raw_materials_unique_name_when_not_deleted", whereClause: "WHERE deleted_at IS NULL", description: "raw_materials 表的部分唯一索引 (name 唯一)", }, { tableName: models.Nutrient{}.TableName(), columns: []string{"name"}, indexName: "idx_nutrients_unique_name_when_not_deleted", whereClause: "WHERE deleted_at IS NULL", description: "nutrients 表的部分唯一索引 (name 唯一)", }, { tableName: models.Recipe{}.TableName(), columns: []string{"name"}, indexName: "idx_recipes_unique_name_when_not_deleted", whereClause: "WHERE deleted_at IS NULL", description: "recipes 表的部分唯一索引 (name 唯一)", }, { tableName: models.RecipeIngredient{}.TableName(), columns: []string{"recipe_id", "raw_material_id"}, indexName: "idx_recipe_ingredients_unique_recipe_raw_material_when_not_deleted", whereClause: "WHERE deleted_at IS NULL", description: "recipe_ingredients 表的部分唯一索引 (recipe_id, raw_material_id 组合唯一)", }, } for _, indexDef := range uniqueIndexesToCreate { logger.Debugw("正在为表创建部分唯一索引", "表名", indexDef.tableName, "索引名", indexDef.indexName, "描述", indexDef.description) // 拼接列名字符串 columnsStr := strings.Join(indexDef.columns, ", ") // 构建 SQL 语句 sql := fmt.Sprintf("CREATE UNIQUE INDEX IF NOT EXISTS %s ON %s (%s) %s;", indexDef.indexName, indexDef.tableName, columnsStr, indexDef.whereClause) if err := ps.db.WithContext(storageCtx).Exec(sql).Error; err != nil { logger.Errorw("创建部分唯一索引失败", "表名", indexDef.tableName, "索引名", indexDef.indexName, "错误", err) return fmt.Errorf("为 %s 表创建部分唯一索引 %s 失败: %w", indexDef.tableName, indexDef.indexName, err) } logger.Debugw("成功为表创建部分唯一索引 (或已存在)", "表名", indexDef.tableName, "索引名", indexDef.indexName) } return nil } func (ps *PostgresStorage) createGinIndexes(ctx context.Context) error { storageCtx, logger := logs.Trace(ctx, ps.ctx, "createGinIndexes") // 定义所有需要创建的 GIN 索引 ginIndexesToCreate := []ginIndexDefinition{ { tableName: "sensor_data", columnName: "data", indexName: "idx_sensor_data_data_gin", description: "为 sensor_data 表的 data 字段创建 GIN 索引", }, { tableName: "tasks", columnName: "parameters", indexName: "idx_tasks_parameters_gin", description: "为 tasks 表的 parameters 字段创建 GIN 索引", }, } for _, indexDef := range ginIndexesToCreate { logger.Debugw("正在创建 GIN 索引", "表名", indexDef.tableName, "列名", indexDef.columnName, "描述", indexDef.description) // 构建 SQL 语句 sql := fmt.Sprintf("CREATE INDEX IF NOT EXISTS %s ON %s USING GIN (%s);", indexDef.indexName, indexDef.tableName, indexDef.columnName) if err := ps.db.WithContext(storageCtx).Exec(sql).Error; err != nil { logger.Errorw("创建 GIN 索引失败", "表名", indexDef.tableName, "索引名", indexDef.indexName, "错误", err) return fmt.Errorf("为 %s 表的 %s 字段创建 GIN 索引 %s 失败: %w", indexDef.tableName, indexDef.columnName, indexDef.indexName, err) } logger.Debugw("成功创建 GIN 索引 (或已存在)", "表名", indexDef.tableName, "索引名", indexDef.indexName) } return nil }