From 18c4747de63ea67900e38e2407021cdcc4683eae Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Sat, 27 Sep 2025 23:05:48 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=B6=85=E8=A1=A8,=20?= =?UTF-8?q?=E8=B6=85=E8=BF=87=E4=B8=A4=E5=A4=A9=E7=9A=84=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=85=A8=E9=83=A8=E5=8E=8B=E7=BC=A9,=20=E5=8E=8B=E7=BC=A9?= =?UTF-8?q?=E5=8F=AF=E4=BB=A5=E9=87=8A=E6=94=BE=E7=B4=A2=E5=BC=95=E5=87=8F?= =?UTF-8?q?=E5=B0=91=E5=86=85=E5=AD=98=E4=BD=BF=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/infra/database/postgres.go | 92 +++++++++++++++++++++++------ 1 file changed, 73 insertions(+), 19 deletions(-) diff --git a/internal/infra/database/postgres.go b/internal/infra/database/postgres.go index b2200c4..60d775f 100644 --- a/internal/infra/database/postgres.go +++ b/internal/infra/database/postgres.go @@ -7,6 +7,8 @@ import ( "fmt" "time" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "gorm.io/driver/postgres" "gorm.io/gorm" @@ -126,36 +128,88 @@ func (ps *PostgresStorage) Migrate(models ...interface{}) error { // 如果是 TimescaleDB, 则将部分表转换为 hypertable if ps.isTimescaleDB { - ps.logger.Info("检测到 TimescaleDB, 准备进行超表转换") - if err := ps.creatingHyperTable(); err != nil { + ps.logger.Info("检测到 TimescaleDB, 准备进行超表转换和压缩策略配置") + if err := ps.setupTimescaleDB(); err != nil { return err } } return nil } +// setupTimescaleDB 统一处理所有 TimescaleDB 相关的设置 +func (ps *PostgresStorage) setupTimescaleDB() error { + if err := ps.creatingHyperTable(); err != nil { + return err + } + if err := ps.applyCompressionPolicies(); err != nil { + return err + } + return nil +} + // creatingHyperTable 用于在数据库是 TimescaleDB 时创建超表 func (ps *PostgresStorage) creatingHyperTable() error { - // 将 sensor_data 转换为超表 - // 使用 if_not_exists => TRUE 保证幂等性 - // 'time' 是 SensorData 模型中定义的时间列 - // 设置 chunk_time_interval 为 1 天, 以优化按天查询的性能 - sqlSensorData := "SELECT create_hypertable('sensor_data', 'time', chunk_time_interval => INTERVAL '1 day', if_not_exists => TRUE);" - if err := ps.db.Exec(sqlSensorData).Error; err != nil { - ps.logger.Errorw("将 sensor_data 转换为超表失败", "error", err) - return fmt.Errorf("将 sensor_data 转换为超表失败: %w", err) + // 定义一个辅助结构体来管理超表转换 + tablesToConvert := []struct { + model interface{ TableName() string } + timeColumn string + }{ + {models.SensorData{}, "time"}, + {models.DeviceCommandLog{}, "sent_at"}, + {models.PlanExecutionLog{}, "started_at"}, + {models.TaskExecutionLog{}, "started_at"}, + {models.PendingCollection{}, "created_at"}, } - ps.logger.Info("成功将 sensor_data 转换为超表 (或已转换), chunk 间隔为 1 天") - // 将 device_command_log 转换为超表 - // 'sent_at' 是 DeviceCommandLog 模型中定义的时间列 - // 设置 chunk_time_interval 为 1 天 - sqlDeviceCommandLogs := "SELECT create_hypertable('device_command_log', 'sent_at', chunk_time_interval => INTERVAL '1 day', if_not_exists => TRUE);" - if err := ps.db.Exec(sqlDeviceCommandLogs).Error; err != nil { - ps.logger.Errorw("将 device_command_log 转换为超表失败", "error", err) - return fmt.Errorf("将 device_command_log 转换为超表失败: %w", err) + for _, table := range tablesToConvert { + tableName := table.model.TableName() + chunkInterval := "1 day" // 统一设置为1天 + ps.logger.Infow("准备将表转换为超表", "table", tableName, "chunk_interval", chunkInterval) + sql := fmt.Sprintf("SELECT create_hypertable('%s', '%s', chunk_time_interval => INTERVAL '%s', if_not_exists => TRUE);", tableName, table.timeColumn, chunkInterval) + if err := ps.db.Exec(sql).Error; err != nil { + ps.logger.Errorw("转换为超表失败", "table", tableName, "error", err) + return fmt.Errorf("将 %s 转换为超表失败: %w", tableName, err) + } + ps.logger.Infow("成功将表转换为超表 (或已转换)", "table", tableName) + } + + return nil +} + +// applyCompressionPolicies 为超表配置自动压缩策略 +func (ps *PostgresStorage) applyCompressionPolicies() error { + policies := []struct { + model interface{ TableName() string } + segmentColumn string + }{ + {models.SensorData{}, "device_id"}, + {models.DeviceCommandLog{}, "device_id"}, + {models.PlanExecutionLog{}, "plan_id"}, + {models.TaskExecutionLog{}, "task_id"}, + {models.PendingCollection{}, "device_id"}, + } + + for _, policy := range policies { + tableName := policy.model.TableName() + compressAfter := "3 days" // 统一设置为2天后(即进入第3天)开始压缩 + + // 1. 开启表的压缩设置,并指定分段列 + ps.logger.Infow("为表启用压缩设置", "table", tableName, "segment_by", policy.segmentColumn) + alterSQL := fmt.Sprintf("ALTER TABLE %s SET (timescaledb.compress, timescaledb.compress_segmentby = '%s');", tableName, policy.segmentColumn) + if err := ps.db.Exec(alterSQL).Error; err != nil { + // 忽略错误,因为这个设置可能是不可变的,重复执行会报错 + ps.logger.Warnw("启用压缩设置时遇到问题 (可能已设置,可忽略)", "table", tableName, "error", err) + } + + // 2. 添加压缩策略 + ps.logger.Infow("为表添加压缩策略", "table", tableName, "compress_after", compressAfter) + policySQL := fmt.Sprintf("SELECT add_compression_policy('%s', INTERVAL '%s', if_not_exists => TRUE);", tableName, compressAfter) + if err := ps.db.Exec(policySQL).Error; err != nil { + ps.logger.Errorw("添加压缩策略失败", "table", tableName, "error", err) + return fmt.Errorf("为 %s 添加压缩策略失败: %w", tableName, err) + } + ps.logger.Infow("成功为表添加压缩策略 (或已存在)", "table", tableName) } - ps.logger.Info("成功将 device_command_log 转换为超表 (或已转换)") return nil }