定义ack表
This commit is contained in:
@@ -126,28 +126,47 @@ func (ps *PostgresStorage) Migrate(models ...interface{}) error {
|
|||||||
|
|
||||||
// 如果是 TimescaleDB, 则将 sensor_data 转换为 hypertable
|
// 如果是 TimescaleDB, 则将 sensor_data 转换为 hypertable
|
||||||
if ps.isTimescaleDB {
|
if ps.isTimescaleDB {
|
||||||
ps.logger.Info("检测到 TimescaleDB, 准备转换 sensor_data 为超表")
|
ps.logger.Info("检测到 TimescaleDB, 准备进行超表转换")
|
||||||
|
if err := ps.creatingHyperTable(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// creatingHyperTable 用于在数据库是 TimescaleDB 时创建超表
|
||||||
|
func (ps *PostgresStorage) creatingHyperTable() error {
|
||||||
|
// 将 sensor_data 转换为超表
|
||||||
// 使用 if_not_exists => TRUE 保证幂等性
|
// 使用 if_not_exists => TRUE 保证幂等性
|
||||||
// 如果 sensor_data 已经是超表,此命令不会报错
|
|
||||||
// 'time' 是 SensorData 模型中定义的时间列
|
// 'time' 是 SensorData 模型中定义的时间列
|
||||||
sql := "SELECT create_hypertable('sensor_data', 'time', if_not_exists => TRUE);"
|
sqlSensorData := "SELECT create_hypertable('sensor_data', 'time', if_not_exists => TRUE);"
|
||||||
if err := ps.db.Exec(sql).Error; err != nil {
|
if err := ps.db.Exec(sqlSensorData).Error; err != nil {
|
||||||
ps.logger.Errorw("将 sensor_data 转换为超表失败", "error", err)
|
ps.logger.Errorw("将 sensor_data 转换为超表失败", "error", err)
|
||||||
return fmt.Errorf("将 sensor_data 转换为超表失败: %w", err)
|
return fmt.Errorf("将 sensor_data 转换为超表失败: %w", err)
|
||||||
}
|
}
|
||||||
ps.logger.Info("成功将 sensor_data 转换为超表 (或已转换)")
|
ps.logger.Info("成功将 sensor_data 转换为超表 (或已转换)")
|
||||||
|
|
||||||
|
// 将 downlink_task_records 转换为超表
|
||||||
|
// 'sent_at' 是 DownlinkTaskRecord 模型中定义的时间列
|
||||||
|
sqlDownlinkTaskRecords := "SELECT create_hypertable('downlink_task_records', 'sent_at', if_not_exists => TRUE);"
|
||||||
|
if err := ps.db.Exec(sqlDownlinkTaskRecords).Error; err != nil {
|
||||||
|
ps.logger.Errorw("将 downlink_task_records 转换为超表失败", "error", err)
|
||||||
|
return fmt.Errorf("将 downlink_task_records 转换为超表失败: %w", err)
|
||||||
}
|
}
|
||||||
|
ps.logger.Info("成功将 downlink_task_records 转换为超表 (或已转换)")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// creatingIndex 用于创建gorm无法处理的索引, 如gin索引
|
// creatingIndex 用于创建gorm无法处理的索引, 如gin索引
|
||||||
func (ps *PostgresStorage) creatingIndex() error {
|
func (ps *PostgresStorage) creatingIndex() error {
|
||||||
// 创建GIN索引(用于优化JSONB查询)
|
|
||||||
ps.logger.Info("正在为 sensor_data 表的 data 字段创建 GIN 索引")
|
|
||||||
// 使用 IF NOT EXISTS 保证幂等性
|
// 使用 IF NOT EXISTS 保证幂等性
|
||||||
// 如果索引已存在,此命令不会报错
|
// 如果索引已存在,此命令不会报错
|
||||||
ginIndexSQL := "CREATE INDEX IF NOT EXISTS idx_sensor_data_data_gin ON sensor_data USING GIN (data);"
|
|
||||||
if err := ps.db.Exec(ginIndexSQL).Error; err != nil {
|
// 为 sensor_data 表的 data 字段创建 GIN 索引
|
||||||
|
ps.logger.Info("正在为 sensor_data 表的 data 字段创建 GIN 索引")
|
||||||
|
ginSensorDataIndexSQL := "CREATE INDEX IF NOT EXISTS idx_sensor_data_data_gin ON sensor_data USING GIN (data);"
|
||||||
|
if err := ps.db.Exec(ginSensorDataIndexSQL).Error; err != nil {
|
||||||
ps.logger.Errorw("为 sensor_data 的 data 字段创建 GIN 索引失败", "error", err)
|
ps.logger.Errorw("为 sensor_data 的 data 字段创建 GIN 索引失败", "error", err)
|
||||||
return fmt.Errorf("为 sensor_data 的 data 字段创建 GIN 索引失败: %w", err)
|
return fmt.Errorf("为 sensor_data 的 data 字段创建 GIN 索引失败: %w", err)
|
||||||
}
|
}
|
||||||
@@ -161,5 +180,15 @@ func (ps *PostgresStorage) creatingIndex() error {
|
|||||||
return fmt.Errorf("为 tasks 的 parameters 字段创建 GIN 索引失败: %w", err)
|
return fmt.Errorf("为 tasks 的 parameters 字段创建 GIN 索引失败: %w", err)
|
||||||
}
|
}
|
||||||
ps.logger.Info("成功为 tasks 的 parameters 字段创建 GIN 索引 (或已存在)")
|
ps.logger.Info("成功为 tasks 的 parameters 字段创建 GIN 索引 (或已存在)")
|
||||||
|
|
||||||
|
// 为 devices 表的 properties 字段创建 GIN 索引
|
||||||
|
ps.logger.Info("正在为 devices 表的 properties 字段创建 GIN 索引")
|
||||||
|
ginDevicePropertiesIndexSQL := "CREATE INDEX IF NOT EXISTS idx_devices_properties_gin ON devices USING GIN (properties);"
|
||||||
|
if err := ps.db.Exec(ginDevicePropertiesIndexSQL).Error; err != nil {
|
||||||
|
ps.logger.Errorw("为 devices 的 properties 字段创建 GIN 索引失败", "error", err)
|
||||||
|
return fmt.Errorf("为 devices 的 properties 字段创建 GIN 索引失败: %w", err)
|
||||||
|
}
|
||||||
|
ps.logger.Info("成功为 devices 的 properties 字段创建 GIN 索引 (或已存在)")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
28
internal/infra/models/downlink_task_record.go
Normal file
28
internal/infra/models/downlink_task_record.go
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
package models
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DownlinkTaskRecord 记录下行任务的下发情况和设备确认状态
|
||||||
|
type DownlinkTaskRecord struct {
|
||||||
|
// MessageID 是下行消息的唯一标识符。
|
||||||
|
// 可以是 ChirpStack 的 DeduplicationID 或其他系统生成的ID。
|
||||||
|
MessageID string `gorm:"uniqueIndex;not null" json:"message_id"`
|
||||||
|
|
||||||
|
// DeviceID 是接收此下行任务的设备的ID。
|
||||||
|
// 对于 LoRaWAN,这通常是区域主控设备的ID。
|
||||||
|
DeviceID uint `gorm:"not null;index" json:"device_id"`
|
||||||
|
|
||||||
|
// SentAt 记录下行任务最初发送的时间。
|
||||||
|
SentAt time.Time `gorm:"not null" json:"sent_at"`
|
||||||
|
|
||||||
|
// AcknowledgedAt 记录设备确认收到下行消息的时间。
|
||||||
|
// 如果设备未确认,则为零值或 NULL。使用指针类型 *time.Time 允许 NULL 值。
|
||||||
|
AcknowledgedAt *time.Time `json:"acknowledged_at"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TableName 自定义 GORM 使用的数据库表名
|
||||||
|
func (DownlinkTaskRecord) TableName() string {
|
||||||
|
return "downlink_task_records"
|
||||||
|
}
|
||||||
@@ -13,5 +13,6 @@ func GetAllModels() []interface{} {
|
|||||||
&TaskExecutionLog{},
|
&TaskExecutionLog{},
|
||||||
&PendingTask{},
|
&PendingTask{},
|
||||||
&SensorData{},
|
&SensorData{},
|
||||||
|
&DownlinkTaskRecord{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user