267 lines
		
	
	
		
			9.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			267 lines
		
	
	
		
			9.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Package database 提供基于PostgreSQL的数据存储功能
 | ||
| // 使用GORM作为ORM库来操作数据库
 | ||
| // 实现与PostgreSQL数据库的连接和基本操作
 | ||
| package database
 | ||
| 
 | ||
| import (
 | ||
| 	"fmt"
 | ||
| 	"time"
 | ||
| 
 | ||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
 | ||
| 
 | ||
| 	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
 | ||
| 	"gorm.io/driver/postgres"
 | ||
| 	"gorm.io/gorm"
 | ||
| )
 | ||
| 
 | ||
| // PostgresStorage 代表基于PostgreSQL的存储实现
 | ||
| // 使用GORM作为ORM库
 | ||
| type PostgresStorage struct {
 | ||
| 	db               *gorm.DB
 | ||
| 	isTimescaleDB    bool
 | ||
| 	connectionString string
 | ||
| 	maxOpenConns     int
 | ||
| 	maxIdleConns     int
 | ||
| 	connMaxLifetime  int
 | ||
| 	logger           *logs.Logger // 依赖注入的 logger
 | ||
| }
 | ||
| 
 | ||
| // NewPostgresStorage 创建并返回一个新的PostgreSQL存储实例
 | ||
| // 它接收一个 logger 实例,而不是自己创建
 | ||
| func NewPostgresStorage(connectionString string, isTimescaleDB bool, maxOpenConns, maxIdleConns, connMaxLifetime int, logger *logs.Logger) *PostgresStorage {
 | ||
| 	return &PostgresStorage{
 | ||
| 		connectionString: connectionString,
 | ||
| 		isTimescaleDB:    isTimescaleDB,
 | ||
| 		maxOpenConns:     maxOpenConns,
 | ||
| 		maxIdleConns:     maxIdleConns,
 | ||
| 		connMaxLifetime:  connMaxLifetime,
 | ||
| 		logger:           logger, // 注入 logger
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // Connect 建立与PostgreSQL数据库的连接
 | ||
| // 使用GORM建立数据库连接,并使用自定义的 logger 接管 GORM 日志
 | ||
| func (ps *PostgresStorage) Connect() error {
 | ||
| 	ps.logger.Info("正在连接PostgreSQL数据库")
 | ||
| 
 | ||
| 	// 创建 GORM 的 logger 适配器
 | ||
| 	gormLogger := logs.NewGormLogger(ps.logger)
 | ||
| 
 | ||
| 	var err error
 | ||
| 	// 在 gorm.Open 时传入我们自定义的 logger
 | ||
| 	ps.db, err = gorm.Open(postgres.Open(ps.connectionString), &gorm.Config{
 | ||
| 		Logger: gormLogger,
 | ||
| 	})
 | ||
| 	if err != nil {
 | ||
| 		ps.logger.Errorw("数据库连接失败", "error", err)
 | ||
| 		return fmt.Errorf("数据库连接失败: %w", err) // 使用 %w 进行错误包装
 | ||
| 	}
 | ||
| 
 | ||
| 	// 测试连接
 | ||
| 	sqlDB, err := ps.db.DB()
 | ||
| 	if err != nil {
 | ||
| 		ps.logger.Errorw("获取数据库实例失败", "error", err)
 | ||
| 		return fmt.Errorf("获取数据库实例失败: %w", err)
 | ||
| 	}
 | ||
| 
 | ||
| 	if err = sqlDB.Ping(); err != nil {
 | ||
| 		ps.logger.Errorw("数据库连接测试失败", "error", err)
 | ||
| 		return fmt.Errorf("数据库连接测试失败: %w", err)
 | ||
| 	}
 | ||
| 
 | ||
| 	// 设置连接池参数
 | ||
| 	sqlDB.SetMaxOpenConns(ps.maxOpenConns)
 | ||
| 	sqlDB.SetMaxIdleConns(ps.maxIdleConns)
 | ||
| 	sqlDB.SetConnMaxLifetime(time.Duration(ps.connMaxLifetime) * time.Second)
 | ||
| 
 | ||
| 	// gorm会根据字段名自动创建外键约束, 但触发器Task的PlanID是不存在的, 所以需要关闭, 这个关闭对
 | ||
| 	ps.db.DisableForeignKeyConstraintWhenMigrating = true
 | ||
| 
 | ||
| 	ps.logger.Info("PostgreSQL数据库连接成功")
 | ||
| 	return nil
 | ||
| }
 | ||
| 
 | ||
| // Disconnect 断开与PostgreSQL数据库的连接
 | ||
| // 安全地关闭所有数据库连接
 | ||
| func (ps *PostgresStorage) Disconnect() error {
 | ||
| 	if ps.db != nil {
 | ||
| 		ps.logger.Info("正在断开PostgreSQL数据库连接")
 | ||
| 
 | ||
| 		sqlDB, err := ps.db.DB()
 | ||
| 		if err != nil {
 | ||
| 			ps.logger.Errorw("获取数据库实例失败", "error", err)
 | ||
| 			return fmt.Errorf("获取数据库实例失败: %w", err)
 | ||
| 		}
 | ||
| 
 | ||
| 		if err := sqlDB.Close(); err != nil {
 | ||
| 			ps.logger.Errorw("关闭数据库连接失败", "error", err)
 | ||
| 			return fmt.Errorf("关闭数据库连接失败: %w", err)
 | ||
| 		}
 | ||
| 		ps.logger.Info("PostgreSQL数据库连接已断开")
 | ||
| 	}
 | ||
| 	return nil
 | ||
| }
 | ||
| 
 | ||
| // GetDB 获取GORM数据库实例
 | ||
| // 用于执行具体的数据库操作
 | ||
| func (ps *PostgresStorage) GetDB() *gorm.DB {
 | ||
| 	return ps.db
 | ||
| }
 | ||
| 
 | ||
| // Migrate 执行数据库迁移
 | ||
| func (ps *PostgresStorage) Migrate(models ...interface{}) error {
 | ||
| 	if len(models) == 0 {
 | ||
| 		ps.logger.Info("没有需要迁移的数据库模型,跳过迁移步骤")
 | ||
| 		return nil
 | ||
| 	}
 | ||
| 	ps.logger.Info("正在自动迁移数据库表结构")
 | ||
| 	if err := ps.db.AutoMigrate(models...); err != nil {
 | ||
| 		ps.logger.Errorw("数据库表结构迁移失败", "error", err)
 | ||
| 		return fmt.Errorf("数据库表结构迁移失败: %w", err)
 | ||
| 	}
 | ||
| 	ps.logger.Info("数据库表结构迁移完成")
 | ||
| 
 | ||
| 	// -- 处理gorm做不到的初始化逻辑 --
 | ||
| 	if err := ps.creatingIndex(); err != nil {
 | ||
| 		return err
 | ||
| 	}
 | ||
| 
 | ||
| 	// 如果是 TimescaleDB, 则将部分表转换为 hypertable
 | ||
| 	if ps.isTimescaleDB {
 | ||
| 		ps.logger.Info("检测到 TimescaleDB, 准备进行超表转换和压缩策略配置")
 | ||
| 		if err := ps.setupTimescaleDB(); err != nil {
 | ||
| 			return err
 | ||
| 		}
 | ||
| 	}
 | ||
| 	return nil
 | ||
| }
 | ||
| 
 | ||
| // setupTimescaleDB 统一处理所有 TimescaleDB 相关的设置
 | ||
| func (ps *PostgresStorage) setupTimescaleDB() error {
 | ||
| 	if err := ps.creatingHyperTable(); err != nil {
 | ||
| 		return err
 | ||
| 	}
 | ||
| 	if err := ps.applyCompressionPolicies(); err != nil {
 | ||
| 		return err
 | ||
| 	}
 | ||
| 	return nil
 | ||
| }
 | ||
| 
 | ||
| // creatingHyperTable 用于在数据库是 TimescaleDB 时创建超表
 | ||
| func (ps *PostgresStorage) creatingHyperTable() error {
 | ||
| 	// 定义一个辅助结构体来管理超表转换
 | ||
| 	tablesToConvert := []struct {
 | ||
| 		model      interface{ TableName() string }
 | ||
| 		timeColumn string
 | ||
| 	}{
 | ||
| 		{models.SensorData{}, "time"},
 | ||
| 		{models.DeviceCommandLog{}, "sent_at"},
 | ||
| 		{models.PlanExecutionLog{}, "created_at"},
 | ||
| 		{models.TaskExecutionLog{}, "created_at"},
 | ||
| 		{models.PendingCollection{}, "created_at"},
 | ||
| 		{models.UserActionLog{}, "time"},
 | ||
| 		{models.RawMaterialPurchase{}, "purchase_date"},
 | ||
| 		{models.RawMaterialStockLog{}, "happened_at"},
 | ||
| 		{models.FeedUsageRecord{}, "recorded_at"},
 | ||
| 		{models.MedicationLog{}, "happened_at"},
 | ||
| 		{models.PigBatchLog{}, "happened_at"},
 | ||
| 		{models.WeighingBatch{}, "weighing_time"},
 | ||
| 		{models.WeighingRecord{}, "weighing_time"},
 | ||
| 		{models.PigTransferLog{}, "transfer_time"},
 | ||
| 		{models.PigSickLog{}, "happened_at"},
 | ||
| 		{models.PigPurchase{}, "purchase_date"},
 | ||
| 		{models.PigSale{}, "sale_date"},
 | ||
| 	}
 | ||
| 
 | ||
| 	for _, table := range tablesToConvert {
 | ||
| 		tableName := table.model.TableName()
 | ||
| 		chunkInterval := "1 days" // 统一设置为1天
 | ||
| 		ps.logger.Infow("准备将表转换为超表", "table", tableName, "chunk_interval", chunkInterval)
 | ||
| 		sql := fmt.Sprintf("SELECT create_hypertable('%s', '%s', chunk_time_interval => INTERVAL '%s', if_not_exists => TRUE);", tableName, table.timeColumn, chunkInterval)
 | ||
| 		if err := ps.db.Exec(sql).Error; err != nil {
 | ||
| 			ps.logger.Errorw("转换为超表失败", "table", tableName, "error", err)
 | ||
| 			return fmt.Errorf("将 %s 转换为超表失败: %w", tableName, err)
 | ||
| 		}
 | ||
| 		ps.logger.Infow("成功将表转换为超表 (或已转换)", "table", tableName)
 | ||
| 	}
 | ||
| 
 | ||
| 	return nil
 | ||
| }
 | ||
| 
 | ||
| // applyCompressionPolicies 为超表配置自动压缩策略
 | ||
| func (ps *PostgresStorage) applyCompressionPolicies() error {
 | ||
| 	policies := []struct {
 | ||
| 		model         interface{ TableName() string }
 | ||
| 		segmentColumn string
 | ||
| 	}{
 | ||
| 		{models.SensorData{}, "device_id"},
 | ||
| 		{models.DeviceCommandLog{}, "device_id"},
 | ||
| 		{models.PlanExecutionLog{}, "plan_id"},
 | ||
| 		{models.TaskExecutionLog{}, "task_id"},
 | ||
| 		{models.PendingCollection{}, "device_id"},
 | ||
| 		{models.UserActionLog{}, "user_id"},
 | ||
| 		{models.RawMaterialPurchase{}, "raw_material_id"},
 | ||
| 		{models.RawMaterialStockLog{}, "raw_material_id"},
 | ||
| 		{models.FeedUsageRecord{}, "pen_id"},
 | ||
| 		{models.MedicationLog{}, "pig_batch_id"},
 | ||
| 		{models.PigBatchLog{}, "pig_batch_id"},
 | ||
| 		{models.WeighingBatch{}, "pig_batch_id"},
 | ||
| 		{models.WeighingRecord{}, "weighing_batch_id"},
 | ||
| 		{models.PigTransferLog{}, "pig_batch_id"},
 | ||
| 		{models.PigSickLog{}, "pig_batch_id"},
 | ||
| 		{models.PigPurchase{}, "pig_batch_id"},
 | ||
| 		{models.PigSale{}, "pig_batch_id"},
 | ||
| 	}
 | ||
| 
 | ||
| 	for _, policy := range policies {
 | ||
| 		tableName := policy.model.TableName()
 | ||
| 		compressAfter := "3 days" // 统一设置为2天后(即进入第3天)开始压缩
 | ||
| 
 | ||
| 		// 1. 开启表的压缩设置,并指定分段列
 | ||
| 		ps.logger.Infow("为表启用压缩设置", "table", tableName, "segment_by", policy.segmentColumn)
 | ||
| 		// 使用 + 而非Sprintf以规避goland静态检查报错
 | ||
| 		alterSQL := "ALTER TABLE" + " " + tableName + " SET (timescaledb.compress, timescaledb.compress_segmentby = '" + policy.segmentColumn + "');"
 | ||
| 		if err := ps.db.Exec(alterSQL).Error; err != nil {
 | ||
| 			// 忽略错误,因为这个设置可能是不可变的,重复执行会报错
 | ||
| 			ps.logger.Warnw("启用压缩设置时遇到问题 (可能已设置,可忽略)", "table", tableName, "error", err)
 | ||
| 		}
 | ||
| 
 | ||
| 		// 2. 添加压缩策略
 | ||
| 		ps.logger.Infow("为表添加压缩策略", "table", tableName, "compress_after", compressAfter)
 | ||
| 		policySQL := fmt.Sprintf("SELECT add_compression_policy('%s', INTERVAL '%s', if_not_exists => TRUE);", tableName, compressAfter)
 | ||
| 		if err := ps.db.Exec(policySQL).Error; err != nil {
 | ||
| 			ps.logger.Errorw("添加压缩策略失败", "table", tableName, "error", err)
 | ||
| 			return fmt.Errorf("为 %s 添加压缩策略失败: %w", tableName, err)
 | ||
| 		}
 | ||
| 		ps.logger.Infow("成功为表添加压缩策略 (或已存在)", "table", tableName)
 | ||
| 	}
 | ||
| 
 | ||
| 	return nil
 | ||
| }
 | ||
| 
 | ||
| // creatingIndex 用于创建gorm无法处理的索引, 如gin索引
 | ||
| func (ps *PostgresStorage) creatingIndex() error {
 | ||
| 	// 使用 IF NOT EXISTS 保证幂等性
 | ||
| 	// 如果索引已存在,此命令不会报错
 | ||
| 
 | ||
| 	// 为 sensor_data 表的 data 字段创建 GIN 索引
 | ||
| 	ps.logger.Info("正在为 sensor_data 表的 data 字段创建 GIN 索引")
 | ||
| 	ginSensorDataIndexSQL := "CREATE INDEX IF NOT EXISTS idx_sensor_data_data_gin ON sensor_data USING GIN (data);"
 | ||
| 	if err := ps.db.Exec(ginSensorDataIndexSQL).Error; err != nil {
 | ||
| 		ps.logger.Errorw("为 sensor_data 的 data 字段创建 GIN 索引失败", "error", err)
 | ||
| 		return fmt.Errorf("为 sensor_data 的 data 字段创建 GIN 索引失败: %w", err)
 | ||
| 	}
 | ||
| 	ps.logger.Info("成功为 sensor_data 的 data 字段创建 GIN 索引 (或已存在)")
 | ||
| 
 | ||
| 	// 为 tasks.parameters 创建 GIN 索引
 | ||
| 	ps.logger.Info("正在为 tasks 表的 parameters 字段创建 GIN 索引")
 | ||
| 	taskGinIndexSQL := "CREATE INDEX IF NOT EXISTS idx_tasks_parameters_gin ON tasks USING GIN (parameters);"
 | ||
| 	if err := ps.db.Exec(taskGinIndexSQL).Error; err != nil {
 | ||
| 		ps.logger.Errorw("为 tasks 的 parameters 字段创建 GIN 索引失败", "error", err)
 | ||
| 		return fmt.Errorf("为 tasks 的 parameters 字段创建 GIN 索引失败: %w", err)
 | ||
| 	}
 | ||
| 	ps.logger.Info("成功为 tasks 的 parameters 字段创建 GIN 索引 (或已存在)")
 | ||
| 
 | ||
| 	return nil
 | ||
| }
 |