issue_56 #58
@@ -1,6 +1,7 @@
|
|||||||
package webhook
|
package webhook
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io"
|
"io"
|
||||||
@@ -30,7 +31,7 @@ const (
|
|||||||
|
|
||||||
// ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件
|
// ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件
|
||||||
type ChirpStackListener struct {
|
type ChirpStackListener struct {
|
||||||
logger *logs.Logger
|
ctx context.Context
|
||||||
sensorDataRepo repository.SensorDataRepository
|
sensorDataRepo repository.SensorDataRepository
|
||||||
deviceRepo repository.DeviceRepository
|
deviceRepo repository.DeviceRepository
|
||||||
areaControllerRepo repository.AreaControllerRepository
|
areaControllerRepo repository.AreaControllerRepository
|
||||||
@@ -40,15 +41,15 @@ type ChirpStackListener struct {
|
|||||||
|
|
||||||
// NewChirpStackListener 创建一个新的 ChirpStackListener 实例
|
// NewChirpStackListener 创建一个新的 ChirpStackListener 实例
|
||||||
func NewChirpStackListener(
|
func NewChirpStackListener(
|
||||||
logger *logs.Logger,
|
ctx context.Context,
|
||||||
sensorDataRepo repository.SensorDataRepository,
|
sensorDataRepo repository.SensorDataRepository,
|
||||||
deviceRepo repository.DeviceRepository,
|
deviceRepo repository.DeviceRepository,
|
||||||
areaControllerRepo repository.AreaControllerRepository,
|
areaControllerRepo repository.AreaControllerRepository,
|
||||||
deviceCommandLogRepo repository.DeviceCommandLogRepository,
|
deviceCommandLogRepo repository.DeviceCommandLogRepository,
|
||||||
pendingCollectionRepo repository.PendingCollectionRepository,
|
pendingCollectionRepo repository.PendingCollectionRepository,
|
||||||
) ListenHandler { // 返回接口类型
|
) ListenHandler {
|
||||||
return &ChirpStackListener{
|
return &ChirpStackListener{
|
||||||
logger: logger,
|
ctx: ctx,
|
||||||
sensorDataRepo: sensorDataRepo,
|
sensorDataRepo: sensorDataRepo,
|
||||||
deviceRepo: deviceRepo,
|
deviceRepo: deviceRepo,
|
||||||
areaControllerRepo: areaControllerRepo,
|
areaControllerRepo: areaControllerRepo,
|
||||||
@@ -60,11 +61,13 @@ func NewChirpStackListener(
|
|||||||
// Handler 监听ChirpStack反馈的事件, 因为这是个Webhook, 所以直接回复掉再慢慢处理信息
|
// Handler 监听ChirpStack反馈的事件, 因为这是个Webhook, 所以直接回复掉再慢慢处理信息
|
||||||
func (c *ChirpStackListener) Handler() http.HandlerFunc {
|
func (c *ChirpStackListener) Handler() http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx, logger := logs.Trace(r.Context(), c.ctx, "ChirpStackListener")
|
||||||
|
|
||||||
defer r.Body.Close()
|
defer r.Body.Close()
|
||||||
|
|
||||||
b, err := io.ReadAll(r.Body)
|
b, err := io.ReadAll(r.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Errorf("读取请求体失败: %v", err)
|
logger.Errorf("读取请求体失败: %v", err)
|
||||||
http.Error(w, "failed to read body", http.StatusBadRequest)
|
http.Error(w, "failed to read body", http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -74,100 +77,102 @@ func (c *ChirpStackListener) Handler() http.HandlerFunc {
|
|||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
|
|
||||||
// 将异步处理逻辑委托给 handler 方法
|
// 将异步处理逻辑委托给 handler 方法
|
||||||
go c.handler(b, event)
|
go c.handler(ctx, b, event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// handler 用于处理 ChirpStack 发送的事件
|
// handler 用于处理 ChirpStack 发送的事件
|
||||||
func (c *ChirpStackListener) handler(data []byte, eventType string) {
|
func (c *ChirpStackListener) handler(ctx context.Context, data []byte, eventType string) {
|
||||||
|
reqCtx, logger := logs.Trace(ctx, c.ctx, "ChirpStackListener.handler")
|
||||||
switch eventType {
|
switch eventType {
|
||||||
case eventTypeUp:
|
case eventTypeUp:
|
||||||
var msg UpEvent
|
var msg UpEvent
|
||||||
if err := json.Unmarshal(data, &msg); err != nil {
|
if err := json.Unmarshal(data, &msg); err != nil {
|
||||||
c.logger.Errorf("解析 'up' 事件失败: %v, data: %s", err, string(data))
|
logger.Errorf("解析 'up' 事件失败: %v, data: %s", err, string(data))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.handleUpEvent(&msg)
|
c.handleUpEvent(reqCtx, &msg)
|
||||||
|
|
||||||
case eventTypeJoin:
|
case eventTypeJoin:
|
||||||
var msg JoinEvent
|
var msg JoinEvent
|
||||||
if err := json.Unmarshal(data, &msg); err != nil {
|
if err := json.Unmarshal(data, &msg); err != nil {
|
||||||
c.logger.Errorf("解析 'join' 事件失败: %v, data: %s", err, string(data))
|
logger.Errorf("解析 'join' 事件失败: %v, data: %s", err, string(data))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.handleJoinEvent(&msg)
|
c.handleJoinEvent(reqCtx, &msg)
|
||||||
|
|
||||||
case eventTypeAck:
|
case eventTypeAck:
|
||||||
var msg AckEvent
|
var msg AckEvent
|
||||||
if err := json.Unmarshal(data, &msg); err != nil {
|
if err := json.Unmarshal(data, &msg); err != nil {
|
||||||
c.logger.Errorf("解析 'ack' 事件失败: %v, data: %s", err, string(data))
|
logger.Errorf("解析 'ack' 事件失败: %v, data: %s", err, string(data))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.handleAckEvent(&msg)
|
c.handleAckEvent(reqCtx, &msg)
|
||||||
|
|
||||||
case eventTypeTxAck:
|
case eventTypeTxAck:
|
||||||
var msg TxAckEvent
|
var msg TxAckEvent
|
||||||
if err := json.Unmarshal(data, &msg); err != nil {
|
if err := json.Unmarshal(data, &msg); err != nil {
|
||||||
c.logger.Errorf("解析 'txack' 事件失败: %v, data: %s", err, string(data))
|
logger.Errorf("解析 'txack' 事件失败: %v, data: %s", err, string(data))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.handleTxAckEvent(&msg)
|
c.handleTxAckEvent(reqCtx, &msg)
|
||||||
|
|
||||||
case eventTypeStatus:
|
case eventTypeStatus:
|
||||||
var msg StatusEvent
|
var msg StatusEvent
|
||||||
if err := json.Unmarshal(data, &msg); err != nil {
|
if err := json.Unmarshal(data, &msg); err != nil {
|
||||||
c.logger.Errorf("解析 'status' 事件失败: %v, data: %s", err, string(data))
|
logger.Errorf("解析 'status' 事件失败: %v, data: %s", err, string(data))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.handleStatusEvent(&msg)
|
c.handleStatusEvent(reqCtx, &msg)
|
||||||
|
|
||||||
case eventTypeLog:
|
case eventTypeLog:
|
||||||
var msg LogEvent
|
var msg LogEvent
|
||||||
if err := json.Unmarshal(data, &msg); err != nil {
|
if err := json.Unmarshal(data, &msg); err != nil {
|
||||||
c.logger.Errorf("解析 'log' 事件失败: %v, data: %s", err, string(data))
|
logger.Errorf("解析 'log' 事件失败: %v, data: %s", err, string(data))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.handleLogEvent(&msg)
|
c.handleLogEvent(reqCtx, &msg)
|
||||||
|
|
||||||
case eventTypeLocation:
|
case eventTypeLocation:
|
||||||
var msg LocationEvent
|
var msg LocationEvent
|
||||||
if err := json.Unmarshal(data, &msg); err != nil {
|
if err := json.Unmarshal(data, &msg); err != nil {
|
||||||
c.logger.Errorf("解析 'location' 事件失败: %v, data: %s", err, string(data))
|
logger.Errorf("解析 'location' 事件失败: %v, data: %s", err, string(data))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.handleLocationEvent(&msg)
|
c.handleLocationEvent(reqCtx, &msg)
|
||||||
|
|
||||||
case eventTypeIntegration:
|
case eventTypeIntegration:
|
||||||
var msg IntegrationEvent
|
var msg IntegrationEvent
|
||||||
if err := json.Unmarshal(data, &msg); err != nil {
|
if err := json.Unmarshal(data, &msg); err != nil {
|
||||||
c.logger.Errorf("解析 'integration' 事件失败: %v, data: %s", err, string(data))
|
logger.Errorf("解析 'integration' 事件失败: %v, data: %s", err, string(data))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.handleIntegrationEvent(&msg)
|
c.handleIntegrationEvent(reqCtx, &msg)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
c.logger.Errorf("未知的ChirpStack事件: %s, data: %s", eventType, string(data))
|
logger.Errorf("未知的ChirpStack事件: %s, data: %s", eventType, string(data))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- 业务处理函数 ---
|
// --- 业务处理函数 ---
|
||||||
|
|
||||||
// handleUpEvent 处理上行数据事件
|
// handleUpEvent 处理上行数据事件
|
||||||
func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
|
func (c *ChirpStackListener) handleUpEvent(ctx context.Context, event *UpEvent) {
|
||||||
c.logger.Infof("开始处理 'up' 事件, DevEui: %s", event.DeviceInfo.DevEui)
|
reqCtx, logger := logs.Trace(ctx, c.ctx, "ChirpStackListener.handleUpEvent")
|
||||||
|
logger.Infof("开始处理 'up' 事件, DevEui: %s", event.DeviceInfo.DevEui)
|
||||||
|
|
||||||
// 1. 查找区域主控设备
|
// 1. 查找区域主控设备
|
||||||
regionalController, err := c.areaControllerRepo.FindByNetworkID(event.DeviceInfo.DevEui)
|
regionalController, err := c.areaControllerRepo.FindByNetworkID(reqCtx, event.DeviceInfo.DevEui)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err)
|
logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// 依赖 SelfCheck 确保区域主控有效
|
// 依赖 SelfCheck 确保区域主控有效
|
||||||
if err := regionalController.SelfCheck(); err != nil {
|
if err := regionalController.SelfCheck(); err != nil {
|
||||||
c.logger.Errorf("处理 'up' 事件失败:区域主控 %v(ID: %d) 未通过自检: %v", regionalController.Name, regionalController.ID, err)
|
logger.Errorf("处理 'up' 事件失败:区域主控 %v(ID: %d) 未通过自检: %v", regionalController.Name, regionalController.ID, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.logger.Infof("找到区域主控: %s (ID: %d)", regionalController.Name, regionalController.ID)
|
logger.Infof("找到区域主控: %s (ID: %d)", regionalController.Name, regionalController.ID)
|
||||||
|
|
||||||
// 2. 记录区域主控的信号强度 (如果存在)
|
// 2. 记录区域主控的信号强度 (如果存在)
|
||||||
if len(event.RxInfo) > 0 {
|
if len(event.RxInfo) > 0 {
|
||||||
@@ -182,29 +187,29 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 记录信号强度
|
// 记录信号强度
|
||||||
c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorTypeSignalMetrics, signalMetrics)
|
c.recordSensorData(reqCtx, regionalController.ID, regionalController.ID, event.Time, models.SensorTypeSignalMetrics, signalMetrics)
|
||||||
c.logger.Infof("已记录区域主控 (ID: %d) 的信号强度: RSSI=%d, SNR=%.2f", regionalController.ID, rx.Rssi, rx.Snr)
|
logger.Infof("已记录区域主控 (ID: %d) 的信号强度: RSSI=%d, SNR=%.2f", regionalController.ID, rx.Rssi, rx.Snr)
|
||||||
} else {
|
} else {
|
||||||
c.logger.Warnf("处理 'up' 事件时未找到 RxInfo,无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui)
|
logger.Warnf("处理 'up' 事件时未找到 RxInfo,无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. 处理上报的传感器数据
|
// 3. 处理上报的传感器数据
|
||||||
if event.Data == "" {
|
if event.Data == "" {
|
||||||
c.logger.Warnf("处理 'up' 事件时 Data 字段为空,无需记录上行数据。DevEui: %s", event.DeviceInfo.DevEui)
|
logger.Warnf("处理 'up' 事件时 Data 字段为空,无需记录上行数据。DevEui: %s", event.DeviceInfo.DevEui)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3.1 Base64 解码
|
// 3.1 Base64 解码
|
||||||
decodedData, err := base64.StdEncoding.DecodeString(event.Data)
|
decodedData, err := base64.StdEncoding.DecodeString(event.Data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Errorf("Base64 解码 'up' 事件的 Data 失败: %v, Data: %s", err, event.Data)
|
logger.Errorf("Base64 解码 'up' 事件的 Data 失败: %v, Data: %s", err, event.Data)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3.2 解析外层 "信封"
|
// 3.2 解析外层 "信封"
|
||||||
var instruction proto.Instruction
|
var instruction proto.Instruction
|
||||||
if err := gproto.Unmarshal(decodedData, &instruction); err != nil {
|
if err := gproto.Unmarshal(decodedData, &instruction); err != nil {
|
||||||
c.logger.Errorf("解析上行 Instruction Protobuf 失败: %v, Decoded Data: %x", err, decodedData)
|
logger.Errorf("解析上行 Instruction Protobuf 失败: %v, Decoded Data: %x", err, decodedData)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -215,29 +220,29 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
|
|||||||
collectResp = p.CollectResult
|
collectResp = p.CollectResult
|
||||||
default:
|
default:
|
||||||
// 如果上行的数据不是采集结果,记录日志并忽略
|
// 如果上行的数据不是采集结果,记录日志并忽略
|
||||||
c.logger.Infof("收到一个非采集响应的上行指令 (Type: %T),无需处理。", p)
|
logger.Infof("收到一个非采集响应的上行指令 (Type: %T),无需处理。", p)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 检查 collectResp 是否为 nil,虽然在 type switch 成功的情况下不太可能
|
// 检查 collectResp 是否为 nil,虽然在 type switch 成功的情况下不太可能
|
||||||
if collectResp == nil {
|
if collectResp == nil {
|
||||||
c.logger.Errorf("从 Instruction 中提取的 CollectResult 为 nil")
|
logger.Errorf("从 Instruction 中提取的 CollectResult 为 nil")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
correlationID := collectResp.CorrelationId
|
correlationID := collectResp.CorrelationId
|
||||||
c.logger.Infof("成功解析采集响应 (CorrelationID: %s),包含 %d 个值。", correlationID, len(collectResp.Values))
|
logger.Infof("成功解析采集响应 (CorrelationID: %s),包含 %d 个值。", correlationID, len(collectResp.Values))
|
||||||
|
|
||||||
// 4. 根据 CorrelationID 查找待处理请求
|
// 4. 根据 CorrelationID 查找待处理请求
|
||||||
pendingReq, err := c.pendingCollectionRepo.FindByCorrelationID(correlationID)
|
pendingReq, err := c.pendingCollectionRepo.FindByCorrelationID(reqCtx, correlationID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Errorf("处理采集响应失败:无法找到待处理请求 (CorrelationID: %s): %v", correlationID, err)
|
logger.Errorf("处理采集响应失败:无法找到待处理请求 (CorrelationID: %s): %v", correlationID, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 检查状态,防止重复处理
|
// 检查状态,防止重复处理
|
||||||
if pendingReq.Status != models.PendingStatusPending && pendingReq.Status != models.PendingStatusTimedOut {
|
if pendingReq.Status != models.PendingStatusPending && pendingReq.Status != models.PendingStatusTimedOut {
|
||||||
c.logger.Warnf("收到一个已处理过的采集响应 (CorrelationID: %s, Status: %s),将忽略。", correlationID, pendingReq.Status)
|
logger.Warnf("收到一个已处理过的采集响应 (CorrelationID: %s, Status: %s),将忽略。", correlationID, pendingReq.Status)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -245,11 +250,11 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
|
|||||||
deviceIDs := pendingReq.CommandMetadata
|
deviceIDs := pendingReq.CommandMetadata
|
||||||
values := collectResp.Values
|
values := collectResp.Values
|
||||||
if len(deviceIDs) != len(values) {
|
if len(deviceIDs) != len(values) {
|
||||||
c.logger.Errorf("数据不匹配:下行指令要求采集 %d 个设备,但上行响应包含 %d 个值 (CorrelationID: %s)", len(deviceIDs), len(values), correlationID)
|
logger.Errorf("数据不匹配:下行指令要求采集 %d 个设备,但上行响应包含 %d 个值 (CorrelationID: %s)", len(deviceIDs), len(values), correlationID)
|
||||||
// 即使数量不匹配,也更新状态为完成,以防止请求永远 pending
|
// 即使数量不匹配,也更新状态为完成,以防止请求永远 pending
|
||||||
err = c.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, event.Time)
|
err = c.pendingCollectionRepo.UpdateStatusToFulfilled(reqCtx, correlationID, event.Time)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Errorf("处理采集响应失败:无法更新待处理请求 (CorrelationID: %s) 的状态为完成: %v", correlationID, err)
|
logger.Errorf("处理采集响应失败:无法更新待处理请求 (CorrelationID: %s) 的状态为完成: %v", correlationID, err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -259,35 +264,35 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
|
|||||||
|
|
||||||
// 检查设备上报的值是否为 NaN (Not a Number),如果是则跳过
|
// 检查设备上报的值是否为 NaN (Not a Number),如果是则跳过
|
||||||
if math.IsNaN(float64(rawSensorValue)) {
|
if math.IsNaN(float64(rawSensorValue)) {
|
||||||
c.logger.Warnf("设备 (ID: %d) 上报了一个无效的 NaN 值,已跳过当前值的记录。", deviceID)
|
logger.Warnf("设备 (ID: %d) 上报了一个无效的 NaN 值,已跳过当前值的记录。", deviceID)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5.1 获取设备及其模板
|
// 5.1 获取设备及其模板
|
||||||
dev, err := c.deviceRepo.FindByID(deviceID)
|
dev, err := c.deviceRepo.FindByID(reqCtx, deviceID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Errorf("处理采集数据失败:无法找到设备 (ID: %d): %v", deviceID, err)
|
logger.Errorf("处理采集数据失败:无法找到设备 (ID: %d): %v", deviceID, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// 依赖 SelfCheck 确保设备和模板有效
|
// 依赖 SelfCheck 确保设备和模板有效
|
||||||
if err := dev.SelfCheck(); err != nil {
|
if err := dev.SelfCheck(); err != nil {
|
||||||
c.logger.Warnf("跳过设备 %d,因其未通过自检: %v", dev.ID, err)
|
logger.Warnf("跳过设备 %d,因其未通过自检: %v", dev.ID, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := dev.DeviceTemplate.SelfCheck(); err != nil {
|
if err := dev.DeviceTemplate.SelfCheck(); err != nil {
|
||||||
c.logger.Warnf("跳过设备 %d,因其设备模板未通过自检: %v", dev.ID, err)
|
logger.Warnf("跳过设备 %d,因其设备模板未通过自检: %v", dev.ID, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5.2 从设备模板中解析 ValueDescriptor
|
// 5.2 从设备模板中解析 ValueDescriptor
|
||||||
var valueDescriptors []*models.ValueDescriptor
|
var valueDescriptors []*models.ValueDescriptor
|
||||||
if err := dev.DeviceTemplate.ParseValues(&valueDescriptors); err != nil {
|
if err := dev.DeviceTemplate.ParseValues(&valueDescriptors); err != nil {
|
||||||
c.logger.Warnf("跳过设备 %d,因其设备模板的 Values 属性解析失败: %v", dev.ID, err)
|
logger.Warnf("跳过设备 %d,因其设备模板的 Values 属性解析失败: %v", dev.ID, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// 根据 DeviceTemplate.SelfCheck,这里应该只有一个 ValueDescriptor
|
// 根据 DeviceTemplate.SelfCheck,这里应该只有一个 ValueDescriptor
|
||||||
if len(valueDescriptors) == 0 {
|
if len(valueDescriptors) == 0 {
|
||||||
c.logger.Warnf("跳过设备 %d,因其设备模板缺少 ValueDescriptor 定义", dev.ID)
|
logger.Warnf("跳过设备 %d,因其设备模板缺少 ValueDescriptor 定义", dev.ID)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
valueDescriptor := valueDescriptors[0]
|
valueDescriptor := valueDescriptors[0]
|
||||||
@@ -306,31 +311,32 @@ func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
|
|||||||
dataToRecord = models.WeightData{WeightKilograms: parsedValue}
|
dataToRecord = models.WeightData{WeightKilograms: parsedValue}
|
||||||
default:
|
default:
|
||||||
// TODO 未知传感器的数据需要记录吗
|
// TODO 未知传感器的数据需要记录吗
|
||||||
c.logger.Warnf("未知的传感器类型 '%s',将使用通用格式记录", valueDescriptor.Type)
|
logger.Warnf("未知的传感器类型 '%s',将使用通用格式记录", valueDescriptor.Type)
|
||||||
dataToRecord = map[string]float64{"value": parsedValue}
|
dataToRecord = map[string]float64{"value": parsedValue}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5.5 记录传感器数据
|
// 5.5 记录传感器数据
|
||||||
c.recordSensorData(regionalController.ID, dev.ID, event.Time, valueDescriptor.Type, dataToRecord)
|
c.recordSensorData(reqCtx, regionalController.ID, dev.ID, event.Time, valueDescriptor.Type, dataToRecord)
|
||||||
c.logger.Infof("成功记录传感器数据: 设备ID=%d, 类型=%s, 原始值=%f, 解析值=%.2f", dev.ID, valueDescriptor.Type, rawSensorValue, parsedValue)
|
logger.Infof("成功记录传感器数据: 设备ID=%d, 类型=%s, 原始值=%f, 解析值=%.2f", dev.ID, valueDescriptor.Type, rawSensorValue, parsedValue)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 6. 更新请求状态为“已完成”
|
// 6. 更新请求状态为“已完成”
|
||||||
if err := c.pendingCollectionRepo.UpdateStatusToFulfilled(correlationID, event.Time); err != nil {
|
if err := c.pendingCollectionRepo.UpdateStatusToFulfilled(reqCtx, correlationID, event.Time); err != nil {
|
||||||
c.logger.Errorf("更新待采集请求状态为 'fulfilled' 失败 (CorrelationID: %s): %v", correlationID, err)
|
logger.Errorf("更新待采集请求状态为 'fulfilled' 失败 (CorrelationID: %s): %v", correlationID, err)
|
||||||
} else {
|
} else {
|
||||||
c.logger.Infof("成功完成并关闭采集请求 (CorrelationID: %s)", correlationID)
|
logger.Infof("成功完成并关闭采集请求 (CorrelationID: %s)", correlationID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleStatusEvent 处理设备状态事件
|
// handleStatusEvent 处理设备状态事件
|
||||||
func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) {
|
func (c *ChirpStackListener) handleStatusEvent(ctx context.Context, event *StatusEvent) {
|
||||||
c.logger.Infof("处接收到理 'status' 事件: %+v", event)
|
reqCtx, logger := logs.Trace(ctx, c.ctx, "handleStatusEvent")
|
||||||
|
logger.Infof("处接收到理 'status' 事件: %+v", event)
|
||||||
|
|
||||||
// 查找区域主控设备
|
// 查找区域主控设备
|
||||||
regionalController, err := c.areaControllerRepo.FindByNetworkID(event.DeviceInfo.DevEui)
|
regionalController, err := c.areaControllerRepo.FindByNetworkID(reqCtx, event.DeviceInfo.DevEui)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Errorf("处理 'status' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err)
|
logger.Errorf("处理 'status' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -338,8 +344,8 @@ func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) {
|
|||||||
signalMetrics := models.SignalMetrics{
|
signalMetrics := models.SignalMetrics{
|
||||||
MarginDb: event.Margin,
|
MarginDb: event.Margin,
|
||||||
}
|
}
|
||||||
c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorTypeSignalMetrics, signalMetrics)
|
c.recordSensorData(reqCtx, regionalController.ID, regionalController.ID, event.Time, models.SensorTypeSignalMetrics, signalMetrics)
|
||||||
c.logger.Infof("已记录区域主控 (ID: %d) 的信号状态: %+v", regionalController.ID, signalMetrics)
|
logger.Infof("已记录区域主控 (ID: %d) 的信号状态: %+v", regionalController.ID, signalMetrics)
|
||||||
|
|
||||||
// 记录电量
|
// 记录电量
|
||||||
batteryLevel := models.BatteryLevel{
|
batteryLevel := models.BatteryLevel{
|
||||||
@@ -347,68 +353,74 @@ func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) {
|
|||||||
BatteryLevelUnavailable: event.BatteryLevelUnavailable,
|
BatteryLevelUnavailable: event.BatteryLevelUnavailable,
|
||||||
ExternalPower: event.ExternalPower,
|
ExternalPower: event.ExternalPower,
|
||||||
}
|
}
|
||||||
c.recordSensorData(regionalController.ID, regionalController.ID, event.Time, models.SensorTypeBatteryLevel, batteryLevel)
|
c.recordSensorData(reqCtx, regionalController.ID, regionalController.ID, event.Time, models.SensorTypeBatteryLevel, batteryLevel)
|
||||||
c.logger.Infof("已记录区域主控 (ID: %d) 的电池状态: %+v", regionalController.ID, batteryLevel)
|
logger.Infof("已记录区域主控 (ID: %d) 的电池状态: %+v", regionalController.ID, batteryLevel)
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleAckEvent 处理下行确认事件
|
// handleAckEvent 处理下行确认事件
|
||||||
func (c *ChirpStackListener) handleAckEvent(event *AckEvent) {
|
func (c *ChirpStackListener) handleAckEvent(ctx context.Context, event *AckEvent) {
|
||||||
c.logger.Infof("接收到 'ack' 事件: %+v", event)
|
reqCtx, logger := logs.Trace(ctx, c.ctx, "handleAckEvent")
|
||||||
|
logger.Infof("接收到 'ack' 事件: %+v", event)
|
||||||
|
|
||||||
// 更新下行任务记录的确认时间及接收成功状态
|
// 更新下行任务记录的确认时间及接收成功状态
|
||||||
err := c.deviceCommandLogRepo.UpdateAcknowledgedAt(event.DeduplicationID, event.Time, event.Acknowledged)
|
err := c.deviceCommandLogRepo.UpdateAcknowledgedAt(reqCtx, event.DeduplicationID, event.Time, event.Acknowledged)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Errorf("更新下行任务记录的确认时间及接收成功状态失败 (MessageID: %s, DevEui: %s, Acknowledged: %t): %v",
|
logger.Errorf("更新下行任务记录的确认时间及接收成功状态失败 (MessageID: %s, DevEui: %s, Acknowledged: %t): %v",
|
||||||
event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, err)
|
event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.logger.Infof("成功更新下行任务记录确认时间及接收成功状态 (MessageID: %s, DevEui: %s, Acknowledged: %t, AcknowledgedAt: %s)",
|
logger.Infof("成功更新下行任务记录确认时间及接收成功状态 (MessageID: %s, DevEui: %s, Acknowledged: %t, AcknowledgedAt: %s)",
|
||||||
event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, event.Time.Format(time.RFC3339))
|
event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, event.Time.Format(time.RFC3339))
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleLogEvent 处理日志事件
|
// handleLogEvent 处理日志事件
|
||||||
func (c *ChirpStackListener) handleLogEvent(event *LogEvent) {
|
func (c *ChirpStackListener) handleLogEvent(ctx context.Context, event *LogEvent) {
|
||||||
|
logger := logs.TraceLogger(ctx, c.ctx, "handleLogEvent")
|
||||||
// 首先,打印完整的事件结构体,用于详细排查
|
// 首先,打印完整的事件结构体,用于详细排查
|
||||||
c.logger.Infof("接收到 'log' 事件的完整内容: %+v", event)
|
logger.Infof("接收到 'log' 事件的完整内容: %+v", event)
|
||||||
|
|
||||||
// 接着,根据 ChirpStack 日志的级别,使用我们自己的 logger 对应级别来打印核心信息
|
// 接着,根据 ChirpStack 日志的级别,使用我们自己的 logger 对应级别来打印核心信息
|
||||||
logMessage := "ChirpStack 日志: [%s] %s (DevEui: %s)"
|
logMessage := "ChirpStack 日志: [%s] %s (DevEui: %s)"
|
||||||
switch event.Level {
|
switch event.Level {
|
||||||
case "INFO":
|
case "INFO":
|
||||||
c.logger.Infof(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
|
logger.Infof(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
|
||||||
case "WARNING":
|
case "WARNING":
|
||||||
c.logger.Warnf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
|
logger.Warnf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
|
||||||
case "ERROR":
|
case "ERROR":
|
||||||
c.logger.Errorf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
|
logger.Errorf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
|
||||||
default:
|
default:
|
||||||
// 对于未知级别,使用 Warn 级别打印,并明确指出级别未知
|
// 对于未知级别,使用 Warn 级别打印,并明确指出级别未知
|
||||||
c.logger.Warnf("ChirpStack 日志: [未知级别: %s] %s %s (DevEui: %s)",
|
logger.Warnf("ChirpStack 日志: [未知级别: %s] %s %s (DevEui: %s)",
|
||||||
event.Level, event.Code, event.Description, event.DeviceInfo.DevEui)
|
event.Level, event.Code, event.Description, event.DeviceInfo.DevEui)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleJoinEvent 处理入网事件
|
// handleJoinEvent 处理入网事件
|
||||||
func (c *ChirpStackListener) handleJoinEvent(event *JoinEvent) {
|
func (c *ChirpStackListener) handleJoinEvent(ctx context.Context, event *JoinEvent) {
|
||||||
c.logger.Infof("接收到 'join' 事件: %+v", event)
|
logger := logs.TraceLogger(ctx, c.ctx, "handleJoinEvent")
|
||||||
|
logger.Infof("接收到 'join' 事件: %+v", event)
|
||||||
// 在这里添加您的业务逻辑
|
// 在这里添加您的业务逻辑
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleTxAckEvent 处理网关发送确认事件
|
// handleTxAckEvent 处理网关发送确认事件
|
||||||
func (c *ChirpStackListener) handleTxAckEvent(event *TxAckEvent) {
|
func (c *ChirpStackListener) handleTxAckEvent(ctx context.Context, event *TxAckEvent) {
|
||||||
c.logger.Infof("接收到 'txack' 事件: %+v", event)
|
logger := logs.TraceLogger(ctx, c.ctx, "handleTxAckEvent")
|
||||||
|
logger.Infof("接收到 'txack' 事件: %+v", event)
|
||||||
// 在这里添加您的业务逻辑
|
// 在这里添加您的业务逻辑
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleLocationEvent 处理位置事件
|
// handleLocationEvent 处理位置事件
|
||||||
func (c *ChirpStackListener) handleLocationEvent(event *LocationEvent) {
|
func (c *ChirpStackListener) handleLocationEvent(ctx context.Context, event *LocationEvent) {
|
||||||
c.logger.Infof("接收到 'location' 事件: %+v", event)
|
logger := logs.TraceLogger(ctx, c.ctx, "handleLocationEvent")
|
||||||
|
logger.Infof("接收到 'location' 事件: %+v", event)
|
||||||
// 在这里添加您的业务逻辑
|
// 在这里添加您的业务逻辑
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleIntegrationEvent 处理集成事件
|
// handleIntegrationEvent 处理集成事件
|
||||||
func (c *ChirpStackListener) handleIntegrationEvent(event *IntegrationEvent) {
|
func (c *ChirpStackListener) handleIntegrationEvent(ctx context.Context, event *IntegrationEvent) {
|
||||||
c.logger.Infof("接收到 'integration' 事件: %+v", event)
|
logger := logs.TraceLogger(ctx, c.ctx, "handleIntegrationEvent")
|
||||||
|
logger.Infof("接收到 'integration' 事件: %+v", event)
|
||||||
// 在这里添加您的业务逻辑
|
// 在这里添加您的业务逻辑
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -417,11 +429,12 @@ func (c *ChirpStackListener) handleIntegrationEvent(event *IntegrationEvent) {
|
|||||||
// sensorDeviceID: 实际产生传感器数据的普通设备的ID
|
// sensorDeviceID: 实际产生传感器数据的普通设备的ID
|
||||||
// sensorType: 传感器值的类型 (例如 models.SensorTypeTemperature)
|
// sensorType: 传感器值的类型 (例如 models.SensorTypeTemperature)
|
||||||
// data: 具体的传感器数据结构体实例 (例如 models.TemperatureData)
|
// data: 具体的传感器数据结构体实例 (例如 models.TemperatureData)
|
||||||
func (c *ChirpStackListener) recordSensorData(regionalControllerID uint, sensorDeviceID uint, eventTime time.Time, sensorType models.SensorType, data interface{}) {
|
func (c *ChirpStackListener) recordSensorData(ctx context.Context, regionalControllerID uint, sensorDeviceID uint, eventTime time.Time, sensorType models.SensorType, data interface{}) {
|
||||||
|
reqCtx, logger := logs.Trace(ctx, c.ctx, "recordSensorData")
|
||||||
// 1. 将传入的结构体序列化为 JSON
|
// 1. 将传入的结构体序列化为 JSON
|
||||||
jsonData, err := json.Marshal(data)
|
jsonData, err := json.Marshal(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Errorf("记录传感器数据失败:序列化数据为 JSON 时出错: %v", err)
|
logger.Errorf("记录传感器数据失败:序列化数据为 JSON 时出错: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -435,7 +448,7 @@ func (c *ChirpStackListener) recordSensorData(regionalControllerID uint, sensorD
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 3. 调用仓库创建记录
|
// 3. 调用仓库创建记录
|
||||||
if err := c.sensorDataRepo.Create(sensorData); err != nil {
|
if err := c.sensorDataRepo.Create(reqCtx, sensorData); err != nil {
|
||||||
c.logger.Errorf("记录传感器数据失败:存入数据库时出错: %v", err)
|
logger.Errorf("记录传感器数据失败:存入数据库时出错: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package webhook
|
package webhook
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||||
@@ -8,15 +9,14 @@ import (
|
|||||||
|
|
||||||
// PlaceholderListener 是一个占位符, 用于在非 LoRaWAN 配置下满足 ListenHandler 接口
|
// PlaceholderListener 是一个占位符, 用于在非 LoRaWAN 配置下满足 ListenHandler 接口
|
||||||
type PlaceholderListener struct {
|
type PlaceholderListener struct {
|
||||||
logger *logs.Logger
|
ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPlaceholderListener 创建一个新的 PlaceholderListener 实例
|
// NewPlaceholderListener 创建一个新的 PlaceholderListener 实例
|
||||||
// 它只打印一条日志, 表明 ChirpStack webhook 未被激活
|
// 它只打印一条日志, 表明 ChirpStack webhook 未被激活
|
||||||
func NewPlaceholderListener(logger *logs.Logger) ListenHandler {
|
func NewPlaceholderListener(ctx context.Context) ListenHandler {
|
||||||
logger.Info("当前配置非 LoRaWAN, ChirpStack webhook 监听器未激活。")
|
|
||||||
return &PlaceholderListener{
|
return &PlaceholderListener{
|
||||||
logger: logger,
|
ctx: ctx,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -24,7 +24,8 @@ func NewPlaceholderListener(logger *logs.Logger) ListenHandler {
|
|||||||
// 理论上, 在占位符生效的模式下, 这个 Handler 不应该被调用
|
// 理论上, 在占位符生效的模式下, 这个 Handler 不应该被调用
|
||||||
func (p *PlaceholderListener) Handler() http.HandlerFunc {
|
func (p *PlaceholderListener) Handler() http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
p.logger.Warn("PlaceholderListener 的 Handler 被调用, 这通常是意料之外的。")
|
logger := logs.TraceLogger(r.Context(), p.ctx, "PlaceholderListener")
|
||||||
|
logger.Warn("PlaceholderListener 的 Handler 被调用, 这通常是意料之外的。")
|
||||||
w.WriteHeader(http.StatusNotImplemented)
|
w.WriteHeader(http.StatusNotImplemented)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user