定义事件结构体和接收器

This commit is contained in:
2025-09-24 19:13:15 +08:00
parent 3a030f5bca
commit 17e2c6471a
2 changed files with 306 additions and 18 deletions

View File

@@ -1,12 +1,25 @@
package transport
import (
"encoding/json"
"io"
"net/http"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
)
// ChirpStackListener 主动发送的请求的event字段, 这个字段代表事件类型
const (
eventUp = "up" // 上行数据事件:当接收到设备发送的数据时触发,这是最核心的事件。
eventStatus = "status" // 设备状态事件:当设备报告其状态时触发(例如电池电量、信号强度)。
eventJoin = "join" // 入网事件:当设备成功加入网络时触发。
eventAck = "ack" // 下行确认事件:当设备确认收到下行消息时触发。
eventTxAck = "txack" // 网关发送确认事件:当网关确认已发送下行消息时触发(不代表设备已收到)。
eventLog = "log" // 日志事件:当设备或 ChirpStack 产生日志信息时触发。
eventLocation = "location" // 位置事件:当设备的位置被解析或更新时触发。
eventIntegration = "integration" // 集成事件:当其他集成(如第三方服务)处理数据后触发。
)
// ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件
type ChirpStackListener struct {
logger *logs.Logger
@@ -18,35 +31,145 @@ func NewChirpStackListener(logger *logs.Logger) *ChirpStackListener {
}
}
// Handler 监听ChirpStack反馈的事件, 因为这是个Webhook, 所以直接回复掉再慢慢处理信息
func (c *ChirpStackListener) Handler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
b, err := io.ReadAll(r.Body)
if err != nil {
c.logger.Errorf("读取请求体失败: %v", err)
// TODO 直接崩溃不太合适
panic(err)
http.Error(w, "failed to read body", http.StatusBadRequest)
return
}
event := r.URL.Query().Get("event")
switch event {
case "up": // 链路上行事件
err = c.up(b)
if err != nil {
c.logger.Errorf("处理链路上行事件失败: %v", err)
w.WriteHeader(http.StatusOK)
// TODO 直接崩溃不太合适
panic(err)
}
default:
c.logger.Errorf("未知的ChirpStack事件: %s", event)
}
// 将异步处理逻辑委托给 handler 方法
go c.handler(b, event)
}
}
// up 处理链路上行事件
func (c *ChirpStackListener) up(data []byte) error {
// TODO implement me
panic("implement me")
// handler 用于处理 ChirpStack 发送的事件
func (c *ChirpStackListener) handler(data []byte, eventType string) {
switch eventType {
case eventUp:
var msg UpEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'up' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleUpEvent(&msg)
case eventJoin:
var msg JoinEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'join' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleJoinEvent(&msg)
case eventAck:
var msg AckEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'ack' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleAckEvent(&msg)
case eventTxAck:
var msg TxAckEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'txack' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleTxAckEvent(&msg)
case eventStatus:
var msg StatusEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'status' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleStatusEvent(&msg)
case eventLog:
var msg LogEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'log' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleLogEvent(&msg)
case eventLocation:
var msg LocationEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'location' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleLocationEvent(&msg)
case eventIntegration:
var msg IntegrationEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'integration' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleIntegrationEvent(&msg)
default:
c.logger.Errorf("未知的ChirpStack事件: %s, data: %s", eventType, string(data))
}
}
// --- 业务处理函数 ---
// handleUpEvent 处理上行数据事件
func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
c.logger.Infof("处理 'up' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// handleStatusEvent 处理设备状态事件
func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) {
c.logger.Infof("处接收到理 'status' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// handleAckEvent 处理下行确认事件
func (c *ChirpStackListener) handleAckEvent(event *AckEvent) {
c.logger.Infof("接收到 'ack' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// handleLogEvent 处理日志事件
func (c *ChirpStackListener) handleLogEvent(event *LogEvent) {
c.logger.Infof("接收到 'log' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// handleJoinEvent 处理入网事件
func (c *ChirpStackListener) handleJoinEvent(event *JoinEvent) {
c.logger.Infof("接收到 'join' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// handleTxAckEvent 处理网关发送确认事件
func (c *ChirpStackListener) handleTxAckEvent(event *TxAckEvent) {
c.logger.Infof("接收到 'txack' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// handleLocationEvent 处理位置事件
func (c *ChirpStackListener) handleLocationEvent(event *LocationEvent) {
c.logger.Infof("接收到 'location' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// handleIntegrationEvent 处理集成事件
func (c *ChirpStackListener) handleIntegrationEvent(event *IntegrationEvent) {
c.logger.Infof("接收到 'integration' 事件: %+v", event)
// 在这里添加您的业务逻辑
}

View File

@@ -0,0 +1,165 @@
package transport
import (
"encoding/json"
"time"
)
// --- 通用结构体 ---
// DeviceInfo 包含了所有事件中通用的设备信息。
type DeviceInfo struct {
TenantID string `json:"tenantId"`
TenantName string `json:"tenantName"`
ApplicationID string `json:"applicationId"`
ApplicationName string `json:"applicationName"`
DeviceProfileID string `json:"deviceProfileId"`
DeviceProfileName string `json:"deviceProfileName"`
DeviceName string `json:"deviceName"`
DevEui string `json:"devEui"`
Tags map[string]string `json:"tags"`
}
// Location 包含了地理位置信息。
type Location struct {
Latitude float64 `json:"latitude"`
Longitude float64 `json:"longitude"`
Altitude float64 `json:"altitude"`
}
// --- 可复用的子结构体 ---
// UplinkRxInfo 包含了上行接收信息。
type UplinkRxInfo struct {
GatewayID string `json:"gatewayId"`
UplinkID uint32 `json:"uplinkId"`
Time time.Time `json:"time"`
Rssi int `json:"rssi"`
Snr float64 `json:"snr"`
Channel int `json:"channel"`
Location *Location `json:"location"`
Context string `json:"context"`
Metadata map[string]string `json:"metadata"`
}
// LoraModulationInfo 包含了 LoRa 调制的具体参数。
type LoraModulationInfo struct {
Bandwidth int `json:"bandwidth"`
SpreadingFactor int `json:"spreadingFactor"`
CodeRate string `json:"codeRate"`
Polarization bool `json:"polarizationInvert,omitempty"` // omitempty 因为只在下行中出现
}
// Modulation 包含了具体的调制信息。
type Modulation struct {
Lora LoraModulationInfo `json:"lora"`
}
// UplinkTxInfo 包含了上行发送信息。
type UplinkTxInfo struct {
Frequency int `json:"frequency"`
Modulation Modulation `json:"modulation"`
}
// DownlinkTxInfo 包含了下行发送信息。
type DownlinkTxInfo struct {
Frequency int `json:"frequency"`
Power int `json:"power"`
Modulation Modulation `json:"modulation"`
}
// ResolvedLocation 包含了地理位置解析结果。
type ResolvedLocation struct {
Latitude float64 `json:"latitude"`
Longitude float64 `json:"longitude"`
Altitude float64 `json:"altitude"`
Source string `json:"source"` // e.g. "GEO_RESOLVER_TDOA"
Accuracy int `json:"accuracy"`
}
// --- 事件专属结构体 ---
// UpEvent 对应 ChirpStack 的 "up" 事件。
type UpEvent struct {
DeduplicationID string `json:"deduplicationId"`
Time time.Time `json:"time"`
DeviceInfo DeviceInfo `json:"deviceInfo"`
DevAddr string `json:"devAddr"`
ADR bool `json:"adr"`
DR int `json:"dr"`
FCnt uint32 `json:"fCnt"`
FPort uint8 `json:"fPort"`
Confirmed bool `json:"confirmed"`
Data string `json:"data"` // Base64 编码的原始数据
Object json.RawMessage `json:"object"` // Codec 解码后的 JSON 对象
RxInfo []UplinkRxInfo `json:"rxInfo"`
TxInfo UplinkTxInfo `json:"txInfo"`
}
// JoinEvent 对应 ChirpStack 的 "join" 事件。
type JoinEvent struct {
DeduplicationID string `json:"deduplicationId"`
Time time.Time `json:"time"`
DeviceInfo DeviceInfo `json:"deviceInfo"`
DevAddr string `json:"devAddr"`
}
// AckEvent 对应 ChirpStack 的 "ack" 事件。
type AckEvent struct {
DeduplicationID string `json:"deduplicationId"`
Time time.Time `json:"time"`
DeviceInfo DeviceInfo `json:"deviceInfo"`
Acknowledged bool `json:"acknowledged"`
FCntDown uint32 `json:"fCntDown"`
QueueItemID string `json:"queueItemId"` // 关键字段,用于关联下行指令
}
// TxAckEvent 对应 ChirpStack 的 "txack" 事件。
type TxAckEvent struct {
DeduplicationID string `json:"deduplicationId"`
Time time.Time `json:"time"`
DeviceInfo DeviceInfo `json:"deviceInfo"`
FCntDown uint32 `json:"fCntDown"`
GatewayID string `json:"gatewayId"`
QueueItemID string `json:"queueItemId"` // 关键字段,用于关联下行指令
TxInfo DownlinkTxInfo `json:"txInfo"`
}
// StatusEvent 对应 ChirpStack 的 "status" 事件。
type StatusEvent struct {
DeduplicationID string `json:"deduplicationId"`
Time time.Time `json:"time"`
DeviceInfo DeviceInfo `json:"deviceInfo"`
Margin int `json:"margin"` // 信号余量,可以近似看作 SNR
ExternalPower bool `json:"externalPowerSource"`
BatteryLevel float32 `json:"batteryLevel"` // 电池电量百分比
BatteryLevelUnavailable bool `json:"batteryLevelUnavailable"`
}
// LogEvent 对应 ChirpStack 的 "log" 事件。
type LogEvent struct {
DeduplicationID string `json:"deduplicationId"`
Time time.Time `json:"time"`
DeviceInfo DeviceInfo `json:"deviceInfo"`
Level string `json:"level"` // 日志级别, e.g., "INFO", "WARNING", "ERROR"
Code string `json:"code"` // 日志代码, e.g., "UPLINK_F_CNT_RETRANSMISSION"
Description string `json:"description"`
Context map[string]string `json:"context"`
}
// LocationEvent 对应 ChirpStack 的 "location" 事件。
type LocationEvent struct {
DeduplicationID string `json:"deduplicationId"`
Time time.Time `json:"time"`
DeviceInfo DeviceInfo `json:"deviceInfo"`
Location ResolvedLocation `json:"location"`
}
// IntegrationEvent 对应 ChirpStack 的 "integration" 事件。
type IntegrationEvent struct {
DeduplicationID string `json:"deduplicationId"`
Time time.Time `json:"time"`
DeviceInfo DeviceInfo `json:"deviceInfo"`
IntegrationName string `json:"integrationName"`
Object json.RawMessage `json:"object"`
}