记录任务下发历史和接收是否成功
This commit is contained in:
@@ -30,13 +30,20 @@ type ChirpStackListener struct {
|
|||||||
logger *logs.Logger
|
logger *logs.Logger
|
||||||
sensorDataRepo repository.SensorDataRepository
|
sensorDataRepo repository.SensorDataRepository
|
||||||
deviceRepo repository.DeviceRepository
|
deviceRepo repository.DeviceRepository
|
||||||
|
deviceCommandLogRepo repository.DeviceCommandLogRepository
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewChirpStackListener(logger *logs.Logger, sensorDataRepo repository.SensorDataRepository, deviceRepo repository.DeviceRepository) *ChirpStackListener {
|
func NewChirpStackListener(
|
||||||
|
logger *logs.Logger,
|
||||||
|
sensorDataRepo repository.SensorDataRepository,
|
||||||
|
deviceRepo repository.DeviceRepository,
|
||||||
|
deviceCommandLogRepo repository.DeviceCommandLogRepository,
|
||||||
|
) *ChirpStackListener {
|
||||||
return &ChirpStackListener{
|
return &ChirpStackListener{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
sensorDataRepo: sensorDataRepo,
|
sensorDataRepo: sensorDataRepo,
|
||||||
deviceRepo: deviceRepo,
|
deviceRepo: deviceRepo,
|
||||||
|
deviceCommandLogRepo: deviceCommandLogRepo,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -258,7 +265,17 @@ func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) {
|
|||||||
// handleAckEvent 处理下行确认事件
|
// handleAckEvent 处理下行确认事件
|
||||||
func (c *ChirpStackListener) handleAckEvent(event *AckEvent) {
|
func (c *ChirpStackListener) handleAckEvent(event *AckEvent) {
|
||||||
c.logger.Infof("接收到 'ack' 事件: %+v", event)
|
c.logger.Infof("接收到 'ack' 事件: %+v", event)
|
||||||
// 在这里添加您的业务逻辑
|
|
||||||
|
// 更新下行任务记录的确认时间及接收成功状态
|
||||||
|
err := c.deviceCommandLogRepo.UpdateAcknowledgedAt(event.DeduplicationID, event.Time, event.Acknowledged)
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Errorf("更新下行任务记录的确认时间及接收成功状态失败 (MessageID: %s, DevEui: %s, Acknowledged: %t): %v",
|
||||||
|
event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.logger.Infof("成功更新下行任务记录确认时间及接收成功状态 (MessageID: %s, DevEui: %s, Acknowledged: %t, AcknowledgedAt: %s)",
|
||||||
|
event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, event.Time.Format(time.RFC3339))
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleLogEvent 处理日志事件
|
// handleLogEvent 处理日志事件
|
||||||
|
|||||||
@@ -72,8 +72,11 @@ func NewApplication(configPath string) (*Application, error) {
|
|||||||
// 初始化传感器数据仓库
|
// 初始化传感器数据仓库
|
||||||
sensorDataRepo := repository.NewGormSensorDataRepository(storage.GetDB())
|
sensorDataRepo := repository.NewGormSensorDataRepository(storage.GetDB())
|
||||||
|
|
||||||
|
// 初始化命令下发历史仓库
|
||||||
|
deviceCommandLogRepo := repository.NewGormDeviceCommandLogRepository(storage.GetDB())
|
||||||
|
|
||||||
// 初始化设备上行监听器
|
// 初始化设备上行监听器
|
||||||
listenHandler := transport.NewChirpStackListener(logger, sensorDataRepo, deviceRepo)
|
listenHandler := transport.NewChirpStackListener(logger, sensorDataRepo, deviceRepo, deviceCommandLogRepo)
|
||||||
|
|
||||||
// 初始化计划触发器管理器
|
// 初始化计划触发器管理器
|
||||||
analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger)
|
analysisPlanTaskManager := task.NewAnalysisPlanTaskManager(planRepo, pendingTaskRepo, executionLogRepo, logger)
|
||||||
|
|||||||
@@ -146,14 +146,14 @@ func (ps *PostgresStorage) creatingHyperTable() error {
|
|||||||
}
|
}
|
||||||
ps.logger.Info("成功将 sensor_data 转换为超表 (或已转换)")
|
ps.logger.Info("成功将 sensor_data 转换为超表 (或已转换)")
|
||||||
|
|
||||||
// 将 downlink_task_records 转换为超表
|
// 将 device_command_log 转换为超表
|
||||||
// 'sent_at' 是 DownlinkTaskRecord 模型中定义的时间列
|
// 'sent_at' 是 DeviceCommandLog 模型中定义的时间列
|
||||||
sqlDownlinkTaskRecords := "SELECT create_hypertable('downlink_task_records', 'sent_at', if_not_exists => TRUE);"
|
sqlDeviceCommandLogs := "SELECT create_hypertable('device_command_log', 'sent_at', if_not_exists => TRUE);"
|
||||||
if err := ps.db.Exec(sqlDownlinkTaskRecords).Error; err != nil {
|
if err := ps.db.Exec(sqlDeviceCommandLogs).Error; err != nil {
|
||||||
ps.logger.Errorw("将 downlink_task_records 转换为超表失败", "error", err)
|
ps.logger.Errorw("将 device_command_log 转换为超表失败", "error", err)
|
||||||
return fmt.Errorf("将 downlink_task_records 转换为超表失败: %w", err)
|
return fmt.Errorf("将 device_command_log 转换为超表失败: %w", err)
|
||||||
}
|
}
|
||||||
ps.logger.Info("成功将 downlink_task_records 转换为超表 (或已转换)")
|
ps.logger.Info("成功将 device_command_log 转换为超表 (或已转换)")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,11 +4,11 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DownlinkTaskRecord 记录下行任务的下发情况和设备确认状态
|
// DeviceCommandLog 记录下行任务的下发情况和设备确认状态
|
||||||
type DownlinkTaskRecord struct {
|
type DeviceCommandLog struct {
|
||||||
// MessageID 是下行消息的唯一标识符。
|
// MessageID 是下行消息的唯一标识符。
|
||||||
// 可以是 ChirpStack 的 DeduplicationID 或其他系统生成的ID。
|
// 可以是 ChirpStack 的 DeduplicationID 或其他系统生成的ID。
|
||||||
MessageID string `gorm:"uniqueIndex;not null" json:"message_id"`
|
MessageID string `gorm:"primaryKey" json:"message_id"`
|
||||||
|
|
||||||
// DeviceID 是接收此下行任务的设备的ID。
|
// DeviceID 是接收此下行任务的设备的ID。
|
||||||
// 对于 LoRaWAN,这通常是区域主控设备的ID。
|
// 对于 LoRaWAN,这通常是区域主控设备的ID。
|
||||||
@@ -20,9 +20,13 @@ type DownlinkTaskRecord struct {
|
|||||||
// AcknowledgedAt 记录设备确认收到下行消息的时间。
|
// AcknowledgedAt 记录设备确认收到下行消息的时间。
|
||||||
// 如果设备未确认,则为零值或 NULL。使用指针类型 *time.Time 允许 NULL 值。
|
// 如果设备未确认,则为零值或 NULL。使用指针类型 *time.Time 允许 NULL 值。
|
||||||
AcknowledgedAt *time.Time `json:"acknowledged_at"`
|
AcknowledgedAt *time.Time `json:"acknowledged_at"`
|
||||||
|
|
||||||
|
// ReceivedSuccess 表示设备是否成功接收到下行消息。
|
||||||
|
// true 表示设备已确认收到,false 表示设备未确认收到或下发失败。
|
||||||
|
ReceivedSuccess bool `gorm:"not null" json:"received_success"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// TableName 自定义 GORM 使用的数据库表名
|
// TableName 自定义 GORM 使用的数据库表名
|
||||||
func (DownlinkTaskRecord) TableName() string {
|
func (DeviceCommandLog) TableName() string {
|
||||||
return "downlink_task_records"
|
return "device_command_log"
|
||||||
}
|
}
|
||||||
@@ -13,6 +13,6 @@ func GetAllModels() []interface{} {
|
|||||||
&TaskExecutionLog{},
|
&TaskExecutionLog{},
|
||||||
&PendingTask{},
|
&PendingTask{},
|
||||||
&SensorData{},
|
&SensorData{},
|
||||||
&DownlinkTaskRecord{},
|
&DeviceCommandLog{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
52
internal/infra/repository/device_command_log_repository.go
Normal file
52
internal/infra/repository/device_command_log_repository.go
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
package repository
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DeviceCommandLogRepository 定义了设备下行命令历史记录的数据访问接口
|
||||||
|
type DeviceCommandLogRepository interface {
|
||||||
|
Create(record *models.DeviceCommandLog) error
|
||||||
|
FindByMessageID(messageID string) (*models.DeviceCommandLog, error)
|
||||||
|
// UpdateAcknowledgedAt 用于更新指定 MessageID 的下行命令记录的确认时间及接收成功状态。
|
||||||
|
// AcknowledgedAt 和 ReceivedSuccess 字段会被更新。
|
||||||
|
UpdateAcknowledgedAt(messageID string, acknowledgedAt time.Time, receivedSuccess bool) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// gormDeviceCommandLogRepository 是 DeviceCommandLogRepository 接口的 GORM 实现
|
||||||
|
type gormDeviceCommandLogRepository struct {
|
||||||
|
db *gorm.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewGormDeviceCommandLogRepository 创建一个新的 DeviceCommandLogRepository GORM 实现
|
||||||
|
func NewGormDeviceCommandLogRepository(db *gorm.DB) DeviceCommandLogRepository {
|
||||||
|
return &gormDeviceCommandLogRepository{db: db}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create 实现 DeviceCommandLogRepository 接口的 Create 方法
|
||||||
|
func (r *gormDeviceCommandLogRepository) Create(record *models.DeviceCommandLog) error {
|
||||||
|
return r.db.Create(record).Error
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindByMessageID 实现 DeviceCommandLogRepository 接口的 FindByMessageID 方法
|
||||||
|
func (r *gormDeviceCommandLogRepository) FindByMessageID(messageID string) (*models.DeviceCommandLog, error) {
|
||||||
|
var record models.DeviceCommandLog
|
||||||
|
if err := r.db.Where("message_id = ?", messageID).First(&record).Error; err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &record, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateAcknowledgedAt 实现 DeviceCommandLogRepository 接口的 UpdateAcknowledgedAt 方法
|
||||||
|
func (r *gormDeviceCommandLogRepository) UpdateAcknowledgedAt(messageID string, acknowledgedAt time.Time, receivedSuccess bool) error {
|
||||||
|
// 使用 Updates 方法更新指定字段
|
||||||
|
return r.db.Model(&models.DeviceCommandLog{}).
|
||||||
|
Where("message_id = ?", messageID).
|
||||||
|
Updates(map[string]interface{}{
|
||||||
|
"acknowledged_at": acknowledgedAt,
|
||||||
|
"received_success": receivedSuccess,
|
||||||
|
}).Error
|
||||||
|
}
|
||||||
@@ -4,6 +4,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||||
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||||
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora/chirp_stack_proto/client/device_service"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora/chirp_stack_proto/client/device_service"
|
||||||
"github.com/go-openapi/runtime"
|
"github.com/go-openapi/runtime"
|
||||||
httptransport "github.com/go-openapi/runtime/client"
|
httptransport "github.com/go-openapi/runtime/client"
|
||||||
@@ -33,11 +35,19 @@ type ChirpStackTransport struct {
|
|||||||
authInfo runtime.ClientAuthInfoWriter
|
authInfo runtime.ClientAuthInfoWriter
|
||||||
config ChirpStackConfig
|
config ChirpStackConfig
|
||||||
|
|
||||||
|
deviceCommandLogRepo repository.DeviceCommandLogRepository
|
||||||
|
deviceRepo repository.DeviceRepository
|
||||||
|
|
||||||
logger *logs.Logger
|
logger *logs.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewChirpStackTransport 创建一个新的通信实例,用于与 ChirpStack 通信。
|
// NewChirpStackTransport 创建一个新的通信实例,用于与 ChirpStack 通信。
|
||||||
func NewChirpStackTransport(config ChirpStackConfig, logger *logs.Logger) *ChirpStackTransport {
|
func NewChirpStackTransport(
|
||||||
|
config ChirpStackConfig,
|
||||||
|
logger *logs.Logger,
|
||||||
|
deviceCommandLogRepo repository.DeviceCommandLogRepository,
|
||||||
|
deviceRepo repository.DeviceRepository,
|
||||||
|
) *ChirpStackTransport {
|
||||||
// 使用配置中的服务器地址创建一个 HTTP transport。
|
// 使用配置中的服务器地址创建一个 HTTP transport。
|
||||||
// 它会使用生成的客户端中定义的默认 base path 和 schemes。
|
// 它会使用生成的客户端中定义的默认 base path 和 schemes。
|
||||||
transport := httptransport.New(config.ServerAddress, client.DefaultBasePath, client.DefaultSchemes)
|
transport := httptransport.New(config.ServerAddress, client.DefaultBasePath, client.DefaultSchemes)
|
||||||
@@ -53,6 +63,8 @@ func NewChirpStackTransport(config ChirpStackConfig, logger *logs.Logger) *Chirp
|
|||||||
authInfo: authInfo,
|
authInfo: authInfo,
|
||||||
config: config,
|
config: config,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
|
deviceCommandLogRepo: deviceCommandLogRepo,
|
||||||
|
deviceRepo: deviceRepo,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -80,12 +92,56 @@ func (c *ChirpStackTransport) Send(address string, payload []byte) error {
|
|||||||
|
|
||||||
// 3. 调用生成的客户端方法来发送请求。
|
// 3. 调用生成的客户端方法来发送请求。
|
||||||
// c.authInfo 是您在 NewChirpStackTransport 中创建的认证信息。
|
// c.authInfo 是您在 NewChirpStackTransport 中创建的认证信息。
|
||||||
_, err := c.client.DeviceService.DeviceServiceEnqueue(params, c.authInfo)
|
resp, err := c.client.DeviceService.DeviceServiceEnqueue(params, c.authInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Errorf("设备 %s 调用ChirpStack Enqueue失败: %v", address, err)
|
c.logger.Errorf("设备 %s 调用ChirpStack Enqueue失败: %v", address, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
c.logger.Infof("设备 %s 调用ChirpStack Enqueue成功", address)
|
// 4. 成功发送后,尝试记录下行任务
|
||||||
|
messageID := ""
|
||||||
|
if resp != nil && resp.Payload != nil && resp.Payload.ID != "" { // 根据实际结构,使用 resp.Payload.ID
|
||||||
|
messageID = resp.Payload.ID
|
||||||
|
} else {
|
||||||
|
c.logger.Warnf("ChirpStack Enqueue 响应未包含 MessageID (ID),无法记录下行任务。设备: %s", address)
|
||||||
|
// 即使无法获取 MessageID,也认为发送成功,因为 ChirpStack Enqueue 成功了
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 调用私有方法记录下行任务
|
||||||
|
if err := c.recordDownlinkTask(address, messageID); err != nil {
|
||||||
|
// 记录失败不影响下行命令的发送成功
|
||||||
|
c.logger.Errorf("记录下行任务失败 (MessageID: %s, DevEui: %s): %v", messageID, address, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
c.logger.Infof("设备 %s 调用ChirpStack Enqueue成功,并创建下行任务记录 (MessageID: %s)", address, messageID)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// recordDownlinkTask 记录下行任务到数据库
|
||||||
|
func (c *ChirpStackTransport) recordDownlinkTask(devEui string, messageID string) error {
|
||||||
|
// 获取区域主控的内部 DeviceID
|
||||||
|
regionalController, err := c.deviceRepo.FindByDevEui(devEui)
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Errorf("记录下行任务失败:无法通过 DevEui '%s' 找到区域主控设备: %v", devEui, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建 DeviceCommandLog
|
||||||
|
record := &models.DeviceCommandLog{
|
||||||
|
MessageID: messageID,
|
||||||
|
DeviceID: regionalController.ID,
|
||||||
|
SentAt: time.Now(),
|
||||||
|
AcknowledgedAt: nil, // 初始状态为未确认
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.deviceCommandLogRepo.Create(record); err != nil {
|
||||||
|
c.logger.Errorf("创建下行任务记录失败 (MessageID: %s, DeviceID: %d): %v", messageID, regionalController.ID, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
c.logger.Infof("成功创建下行任务记录 (MessageID: %s, DeviceID: %d)", messageID, regionalController.ID)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user