增加超表, 超过两天的数据全部压缩, 压缩可以释放索引减少内存使用
This commit is contained in:
@@ -7,6 +7,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||||
|
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||||
"gorm.io/driver/postgres"
|
"gorm.io/driver/postgres"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
@@ -126,36 +128,88 @@ func (ps *PostgresStorage) Migrate(models ...interface{}) error {
|
|||||||
|
|
||||||
// 如果是 TimescaleDB, 则将部分表转换为 hypertable
|
// 如果是 TimescaleDB, 则将部分表转换为 hypertable
|
||||||
if ps.isTimescaleDB {
|
if ps.isTimescaleDB {
|
||||||
ps.logger.Info("检测到 TimescaleDB, 准备进行超表转换")
|
ps.logger.Info("检测到 TimescaleDB, 准备进行超表转换和压缩策略配置")
|
||||||
if err := ps.creatingHyperTable(); err != nil {
|
if err := ps.setupTimescaleDB(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setupTimescaleDB 统一处理所有 TimescaleDB 相关的设置
|
||||||
|
func (ps *PostgresStorage) setupTimescaleDB() error {
|
||||||
|
if err := ps.creatingHyperTable(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := ps.applyCompressionPolicies(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// creatingHyperTable 用于在数据库是 TimescaleDB 时创建超表
|
// creatingHyperTable 用于在数据库是 TimescaleDB 时创建超表
|
||||||
func (ps *PostgresStorage) creatingHyperTable() error {
|
func (ps *PostgresStorage) creatingHyperTable() error {
|
||||||
// 将 sensor_data 转换为超表
|
// 定义一个辅助结构体来管理超表转换
|
||||||
// 使用 if_not_exists => TRUE 保证幂等性
|
tablesToConvert := []struct {
|
||||||
// 'time' 是 SensorData 模型中定义的时间列
|
model interface{ TableName() string }
|
||||||
// 设置 chunk_time_interval 为 1 天, 以优化按天查询的性能
|
timeColumn string
|
||||||
sqlSensorData := "SELECT create_hypertable('sensor_data', 'time', chunk_time_interval => INTERVAL '1 day', if_not_exists => TRUE);"
|
}{
|
||||||
if err := ps.db.Exec(sqlSensorData).Error; err != nil {
|
{models.SensorData{}, "time"},
|
||||||
ps.logger.Errorw("将 sensor_data 转换为超表失败", "error", err)
|
{models.DeviceCommandLog{}, "sent_at"},
|
||||||
return fmt.Errorf("将 sensor_data 转换为超表失败: %w", err)
|
{models.PlanExecutionLog{}, "started_at"},
|
||||||
|
{models.TaskExecutionLog{}, "started_at"},
|
||||||
|
{models.PendingCollection{}, "created_at"},
|
||||||
}
|
}
|
||||||
ps.logger.Info("成功将 sensor_data 转换为超表 (或已转换), chunk 间隔为 1 天")
|
|
||||||
|
|
||||||
// 将 device_command_log 转换为超表
|
for _, table := range tablesToConvert {
|
||||||
// 'sent_at' 是 DeviceCommandLog 模型中定义的时间列
|
tableName := table.model.TableName()
|
||||||
// 设置 chunk_time_interval 为 1 天
|
chunkInterval := "1 day" // 统一设置为1天
|
||||||
sqlDeviceCommandLogs := "SELECT create_hypertable('device_command_log', 'sent_at', chunk_time_interval => INTERVAL '1 day', if_not_exists => TRUE);"
|
ps.logger.Infow("准备将表转换为超表", "table", tableName, "chunk_interval", chunkInterval)
|
||||||
if err := ps.db.Exec(sqlDeviceCommandLogs).Error; err != nil {
|
sql := fmt.Sprintf("SELECT create_hypertable('%s', '%s', chunk_time_interval => INTERVAL '%s', if_not_exists => TRUE);", tableName, table.timeColumn, chunkInterval)
|
||||||
ps.logger.Errorw("将 device_command_log 转换为超表失败", "error", err)
|
if err := ps.db.Exec(sql).Error; err != nil {
|
||||||
return fmt.Errorf("将 device_command_log 转换为超表失败: %w", err)
|
ps.logger.Errorw("转换为超表失败", "table", tableName, "error", err)
|
||||||
|
return fmt.Errorf("将 %s 转换为超表失败: %w", tableName, err)
|
||||||
|
}
|
||||||
|
ps.logger.Infow("成功将表转换为超表 (或已转换)", "table", tableName)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// applyCompressionPolicies 为超表配置自动压缩策略
|
||||||
|
func (ps *PostgresStorage) applyCompressionPolicies() error {
|
||||||
|
policies := []struct {
|
||||||
|
model interface{ TableName() string }
|
||||||
|
segmentColumn string
|
||||||
|
}{
|
||||||
|
{models.SensorData{}, "device_id"},
|
||||||
|
{models.DeviceCommandLog{}, "device_id"},
|
||||||
|
{models.PlanExecutionLog{}, "plan_id"},
|
||||||
|
{models.TaskExecutionLog{}, "task_id"},
|
||||||
|
{models.PendingCollection{}, "device_id"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, policy := range policies {
|
||||||
|
tableName := policy.model.TableName()
|
||||||
|
compressAfter := "3 days" // 统一设置为2天后(即进入第3天)开始压缩
|
||||||
|
|
||||||
|
// 1. 开启表的压缩设置,并指定分段列
|
||||||
|
ps.logger.Infow("为表启用压缩设置", "table", tableName, "segment_by", policy.segmentColumn)
|
||||||
|
alterSQL := fmt.Sprintf("ALTER TABLE %s SET (timescaledb.compress, timescaledb.compress_segmentby = '%s');", tableName, policy.segmentColumn)
|
||||||
|
if err := ps.db.Exec(alterSQL).Error; err != nil {
|
||||||
|
// 忽略错误,因为这个设置可能是不可变的,重复执行会报错
|
||||||
|
ps.logger.Warnw("启用压缩设置时遇到问题 (可能已设置,可忽略)", "table", tableName, "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 添加压缩策略
|
||||||
|
ps.logger.Infow("为表添加压缩策略", "table", tableName, "compress_after", compressAfter)
|
||||||
|
policySQL := fmt.Sprintf("SELECT add_compression_policy('%s', INTERVAL '%s', if_not_exists => TRUE);", tableName, compressAfter)
|
||||||
|
if err := ps.db.Exec(policySQL).Error; err != nil {
|
||||||
|
ps.logger.Errorw("添加压缩策略失败", "table", tableName, "error", err)
|
||||||
|
return fmt.Errorf("为 %s 添加压缩策略失败: %w", tableName, err)
|
||||||
|
}
|
||||||
|
ps.logger.Infow("成功为表添加压缩策略 (或已存在)", "table", tableName)
|
||||||
}
|
}
|
||||||
ps.logger.Info("成功将 device_command_log 转换为超表 (或已转换)")
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user