diff --git a/internal/app/service/transport/chirp_stack.go b/internal/app/service/transport/chirp_stack.go index 9828313..9bfb555 100644 --- a/internal/app/service/transport/chirp_stack.go +++ b/internal/app/service/transport/chirp_stack.go @@ -1,12 +1,25 @@ package transport import ( + "encoding/json" "io" "net/http" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" ) +// ChirpStackListener 主动发送的请求的event字段, 这个字段代表事件类型 +const ( + eventUp = "up" // 上行数据事件:当接收到设备发送的数据时触发,这是最核心的事件。 + eventStatus = "status" // 设备状态事件:当设备报告其状态时触发(例如电池电量、信号强度)。 + eventJoin = "join" // 入网事件:当设备成功加入网络时触发。 + eventAck = "ack" // 下行确认事件:当设备确认收到下行消息时触发。 + eventTxAck = "txack" // 网关发送确认事件:当网关确认已发送下行消息时触发(不代表设备已收到)。 + eventLog = "log" // 日志事件:当设备或 ChirpStack 产生日志信息时触发。 + eventLocation = "location" // 位置事件:当设备的位置被解析或更新时触发。 + eventIntegration = "integration" // 集成事件:当其他集成(如第三方服务)处理数据后触发。 +) + // ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件 type ChirpStackListener struct { logger *logs.Logger @@ -18,35 +31,145 @@ func NewChirpStackListener(logger *logs.Logger) *ChirpStackListener { } } +// Handler 监听ChirpStack反馈的事件, 因为这是个Webhook, 所以直接回复掉再慢慢处理信息 func (c *ChirpStackListener) Handler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + b, err := io.ReadAll(r.Body) if err != nil { c.logger.Errorf("读取请求体失败: %v", err) - - // TODO 直接崩溃不太合适 - panic(err) + http.Error(w, "failed to read body", http.StatusBadRequest) + return } event := r.URL.Query().Get("event") - switch event { - case "up": // 链路上行事件 - err = c.up(b) - if err != nil { - c.logger.Errorf("处理链路上行事件失败: %v", err) + w.WriteHeader(http.StatusOK) - // TODO 直接崩溃不太合适 - panic(err) - } - default: - c.logger.Errorf("未知的ChirpStack事件: %s", event) - } + // 将异步处理逻辑委托给 handler 方法 + go c.handler(b, event) } } -// up 处理链路上行事件 -func (c *ChirpStackListener) up(data []byte) error { - // TODO implement me - panic("implement me") +// handler 用于处理 ChirpStack 发送的事件 +func (c *ChirpStackListener) handler(data []byte, eventType string) { + switch eventType { + case eventUp: + var msg UpEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'up' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleUpEvent(&msg) + + case eventJoin: + var msg JoinEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'join' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleJoinEvent(&msg) + + case eventAck: + var msg AckEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'ack' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleAckEvent(&msg) + + case eventTxAck: + var msg TxAckEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'txack' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleTxAckEvent(&msg) + + case eventStatus: + var msg StatusEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'status' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleStatusEvent(&msg) + + case eventLog: + var msg LogEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'log' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleLogEvent(&msg) + + case eventLocation: + var msg LocationEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'location' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleLocationEvent(&msg) + + case eventIntegration: + var msg IntegrationEvent + if err := json.Unmarshal(data, &msg); err != nil { + c.logger.Errorf("解析 'integration' 事件失败: %v, data: %s", err, string(data)) + return + } + c.handleIntegrationEvent(&msg) + + default: + c.logger.Errorf("未知的ChirpStack事件: %s, data: %s", eventType, string(data)) + } +} + +// --- 业务处理函数 --- + +// handleUpEvent 处理上行数据事件 +func (c *ChirpStackListener) handleUpEvent(event *UpEvent) { + c.logger.Infof("处理 'up' 事件: %+v", event) + // 在这里添加您的业务逻辑 +} + +// handleStatusEvent 处理设备状态事件 +func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) { + c.logger.Infof("处接收到理 'status' 事件: %+v", event) + // 在这里添加您的业务逻辑 +} + +// handleAckEvent 处理下行确认事件 +func (c *ChirpStackListener) handleAckEvent(event *AckEvent) { + c.logger.Infof("接收到 'ack' 事件: %+v", event) + // 在这里添加您的业务逻辑 +} + +// handleLogEvent 处理日志事件 +func (c *ChirpStackListener) handleLogEvent(event *LogEvent) { + c.logger.Infof("接收到 'log' 事件: %+v", event) + // 在这里添加您的业务逻辑 +} + +// handleJoinEvent 处理入网事件 +func (c *ChirpStackListener) handleJoinEvent(event *JoinEvent) { + c.logger.Infof("接收到 'join' 事件: %+v", event) + // 在这里添加您的业务逻辑 +} + +// handleTxAckEvent 处理网关发送确认事件 +func (c *ChirpStackListener) handleTxAckEvent(event *TxAckEvent) { + c.logger.Infof("接收到 'txack' 事件: %+v", event) + // 在这里添加您的业务逻辑 +} + +// handleLocationEvent 处理位置事件 +func (c *ChirpStackListener) handleLocationEvent(event *LocationEvent) { + c.logger.Infof("接收到 'location' 事件: %+v", event) + // 在这里添加您的业务逻辑 +} + +// handleIntegrationEvent 处理集成事件 +func (c *ChirpStackListener) handleIntegrationEvent(event *IntegrationEvent) { + c.logger.Infof("接收到 'integration' 事件: %+v", event) + // 在这里添加您的业务逻辑 } diff --git a/internal/app/service/transport/chirp_stack_types.go b/internal/app/service/transport/chirp_stack_types.go new file mode 100644 index 0000000..4de752a --- /dev/null +++ b/internal/app/service/transport/chirp_stack_types.go @@ -0,0 +1,165 @@ +package transport + +import ( + "encoding/json" + "time" +) + +// --- 通用结构体 --- + +// DeviceInfo 包含了所有事件中通用的设备信息。 +type DeviceInfo struct { + TenantID string `json:"tenantId"` + TenantName string `json:"tenantName"` + ApplicationID string `json:"applicationId"` + ApplicationName string `json:"applicationName"` + DeviceProfileID string `json:"deviceProfileId"` + DeviceProfileName string `json:"deviceProfileName"` + DeviceName string `json:"deviceName"` + DevEui string `json:"devEui"` + Tags map[string]string `json:"tags"` +} + +// Location 包含了地理位置信息。 +type Location struct { + Latitude float64 `json:"latitude"` + Longitude float64 `json:"longitude"` + Altitude float64 `json:"altitude"` +} + +// --- 可复用的子结构体 --- + +// UplinkRxInfo 包含了上行接收信息。 +type UplinkRxInfo struct { + GatewayID string `json:"gatewayId"` + UplinkID uint32 `json:"uplinkId"` + Time time.Time `json:"time"` + Rssi int `json:"rssi"` + Snr float64 `json:"snr"` + Channel int `json:"channel"` + Location *Location `json:"location"` + Context string `json:"context"` + Metadata map[string]string `json:"metadata"` +} + +// LoraModulationInfo 包含了 LoRa 调制的具体参数。 +type LoraModulationInfo struct { + Bandwidth int `json:"bandwidth"` + SpreadingFactor int `json:"spreadingFactor"` + CodeRate string `json:"codeRate"` + Polarization bool `json:"polarizationInvert,omitempty"` // omitempty 因为只在下行中出现 +} + +// Modulation 包含了具体的调制信息。 +type Modulation struct { + Lora LoraModulationInfo `json:"lora"` +} + +// UplinkTxInfo 包含了上行发送信息。 +type UplinkTxInfo struct { + Frequency int `json:"frequency"` + Modulation Modulation `json:"modulation"` +} + +// DownlinkTxInfo 包含了下行发送信息。 +type DownlinkTxInfo struct { + Frequency int `json:"frequency"` + Power int `json:"power"` + Modulation Modulation `json:"modulation"` +} + +// ResolvedLocation 包含了地理位置解析结果。 +type ResolvedLocation struct { + Latitude float64 `json:"latitude"` + Longitude float64 `json:"longitude"` + Altitude float64 `json:"altitude"` + Source string `json:"source"` // e.g. "GEO_RESOLVER_TDOA" + Accuracy int `json:"accuracy"` +} + +// --- 事件专属结构体 --- + +// UpEvent 对应 ChirpStack 的 "up" 事件。 +type UpEvent struct { + DeduplicationID string `json:"deduplicationId"` + Time time.Time `json:"time"` + DeviceInfo DeviceInfo `json:"deviceInfo"` + DevAddr string `json:"devAddr"` + ADR bool `json:"adr"` + DR int `json:"dr"` + FCnt uint32 `json:"fCnt"` + FPort uint8 `json:"fPort"` + Confirmed bool `json:"confirmed"` + Data string `json:"data"` // Base64 编码的原始数据 + Object json.RawMessage `json:"object"` // Codec 解码后的 JSON 对象 + RxInfo []UplinkRxInfo `json:"rxInfo"` + TxInfo UplinkTxInfo `json:"txInfo"` +} + +// JoinEvent 对应 ChirpStack 的 "join" 事件。 +type JoinEvent struct { + DeduplicationID string `json:"deduplicationId"` + Time time.Time `json:"time"` + DeviceInfo DeviceInfo `json:"deviceInfo"` + DevAddr string `json:"devAddr"` +} + +// AckEvent 对应 ChirpStack 的 "ack" 事件。 +type AckEvent struct { + DeduplicationID string `json:"deduplicationId"` + Time time.Time `json:"time"` + DeviceInfo DeviceInfo `json:"deviceInfo"` + Acknowledged bool `json:"acknowledged"` + FCntDown uint32 `json:"fCntDown"` + QueueItemID string `json:"queueItemId"` // 关键字段,用于关联下行指令 +} + +// TxAckEvent 对应 ChirpStack 的 "txack" 事件。 +type TxAckEvent struct { + DeduplicationID string `json:"deduplicationId"` + Time time.Time `json:"time"` + DeviceInfo DeviceInfo `json:"deviceInfo"` + FCntDown uint32 `json:"fCntDown"` + GatewayID string `json:"gatewayId"` + QueueItemID string `json:"queueItemId"` // 关键字段,用于关联下行指令 + TxInfo DownlinkTxInfo `json:"txInfo"` +} + +// StatusEvent 对应 ChirpStack 的 "status" 事件。 +type StatusEvent struct { + DeduplicationID string `json:"deduplicationId"` + Time time.Time `json:"time"` + DeviceInfo DeviceInfo `json:"deviceInfo"` + Margin int `json:"margin"` // 信号余量,可以近似看作 SNR + ExternalPower bool `json:"externalPowerSource"` + BatteryLevel float32 `json:"batteryLevel"` // 电池电量百分比 + BatteryLevelUnavailable bool `json:"batteryLevelUnavailable"` +} + +// LogEvent 对应 ChirpStack 的 "log" 事件。 +type LogEvent struct { + DeduplicationID string `json:"deduplicationId"` + Time time.Time `json:"time"` + DeviceInfo DeviceInfo `json:"deviceInfo"` + Level string `json:"level"` // 日志级别, e.g., "INFO", "WARNING", "ERROR" + Code string `json:"code"` // 日志代码, e.g., "UPLINK_F_CNT_RETRANSMISSION" + Description string `json:"description"` + Context map[string]string `json:"context"` +} + +// LocationEvent 对应 ChirpStack 的 "location" 事件。 +type LocationEvent struct { + DeduplicationID string `json:"deduplicationId"` + Time time.Time `json:"time"` + DeviceInfo DeviceInfo `json:"deviceInfo"` + Location ResolvedLocation `json:"location"` +} + +// IntegrationEvent 对应 ChirpStack 的 "integration" 事件。 +type IntegrationEvent struct { + DeduplicationID string `json:"deduplicationId"` + Time time.Time `json:"time"` + DeviceInfo DeviceInfo `json:"deviceInfo"` + IntegrationName string `json:"integrationName"` + Object json.RawMessage `json:"object"` +}