Files
pig-farm-controller/internal/app/service/transport/chirp_stack.go
2025-09-24 21:53:18 +08:00

261 lines
9.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package transport
import (
"encoding/json"
"io"
"net/http"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"gorm.io/datatypes"
)
// ChirpStackListener 主动发送的请求的event字段, 这个字段代表事件类型
const (
eventUp = "up" // 上行数据事件:当接收到设备发送的数据时触发,这是最核心的事件。
eventStatus = "status" // 设备状态事件:当设备报告其状态时触发(例如电池电量、信号强度)。
eventJoin = "join" // 入网事件:当设备成功加入网络时触发。
eventAck = "ack" // 下行确认事件:当设备确认收到下行消息时触发。
eventTxAck = "txack" // 网关发送确认事件:当网关确认已发送下行消息时触发(不代表设备已收到)。
eventLog = "log" // 日志事件:当设备或 ChirpStack 产生日志信息时触发。
eventLocation = "location" // 位置事件:当设备的位置被解析或更新时触发。
eventIntegration = "integration" // 集成事件:当其他集成(如第三方服务)处理数据后触发。
)
// ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件
type ChirpStackListener struct {
logger *logs.Logger
sensorDataRepo repository.SensorDataRepository
deviceRepo repository.DeviceRepository
}
func NewChirpStackListener(logger *logs.Logger, sensorDataRepo repository.SensorDataRepository, deviceRepo repository.DeviceRepository) *ChirpStackListener {
return &ChirpStackListener{
logger: logger,
sensorDataRepo: sensorDataRepo,
deviceRepo: deviceRepo,
}
}
// Handler 监听ChirpStack反馈的事件, 因为这是个Webhook, 所以直接回复掉再慢慢处理信息
func (c *ChirpStackListener) Handler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
b, err := io.ReadAll(r.Body)
if err != nil {
c.logger.Errorf("读取请求体失败: %v", err)
http.Error(w, "failed to read body", http.StatusBadRequest)
return
}
event := r.URL.Query().Get("event")
w.WriteHeader(http.StatusOK)
// 将异步处理逻辑委托给 handler 方法
go c.handler(b, event)
}
}
// handler 用于处理 ChirpStack 发送的事件
func (c *ChirpStackListener) handler(data []byte, eventType string) {
switch eventType {
case eventUp:
var msg UpEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'up' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleUpEvent(&msg)
case eventJoin:
var msg JoinEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'join' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleJoinEvent(&msg)
case eventAck:
var msg AckEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'ack' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleAckEvent(&msg)
case eventTxAck:
var msg TxAckEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'txack' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleTxAckEvent(&msg)
case eventStatus:
var msg StatusEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'status' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleStatusEvent(&msg)
case eventLog:
var msg LogEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'log' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleLogEvent(&msg)
case eventLocation:
var msg LocationEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'location' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleLocationEvent(&msg)
case eventIntegration:
var msg IntegrationEvent
if err := json.Unmarshal(data, &msg); err != nil {
c.logger.Errorf("解析 'integration' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleIntegrationEvent(&msg)
default:
c.logger.Errorf("未知的ChirpStack事件: %s, data: %s", eventType, string(data))
}
}
// --- 业务处理函数 ---
// handleUpEvent 处理上行数据事件
func (c *ChirpStackListener) handleUpEvent(event *UpEvent) {
c.logger.Infof("处理 'up' 事件: %+v", event)
// 记录信号强度
// 根据业务逻辑,一个猪场只有一个网关,所以 RxInfo 中通常只有一个元素,或者 gateway_id 都是相同的。
// 因此,我们只取第一个 RxInfo 中的信号数据即可。
if len(event.RxInfo) > 0 {
rx := event.RxInfo[0] // 取第一个接收到的网关信息
// 构建 SignalMetrics 结构体
signalMetrics := models.SignalMetrics{
RSSI: rx.Rssi,
SNR: rx.Snr,
}
c.recordSensorData(event.DeviceInfo.DevEui, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics)
} else {
c.logger.Warnf("处理 'up' 事件时未找到 RxInfo无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui)
}
// 在这里添加您的业务逻辑
}
// handleStatusEvent 处理设备状态事件
func (c *ChirpStackListener) handleStatusEvent(event *StatusEvent) {
c.logger.Infof("处接收到理 'status' 事件: %+v", event)
// 记录信号强度
signalMetrics := models.SignalMetrics{
Margin: event.Margin,
}
c.recordSensorData(event.DeviceInfo.DevEui, event.Time, models.SensorDataTypeSignalMetrics, signalMetrics)
// 记录 电量
batteryLevel := models.BatteryLevel{
BatteryLevel: event.BatteryLevel,
BatteryLevelUnavailable: event.BatteryLevelUnavailable,
ExternalPower: event.ExternalPower,
}
c.recordSensorData(event.DeviceInfo.DevEui, event.Time, models.SensorDataTypeBatteryLevel, batteryLevel)
}
// handleAckEvent 处理下行确认事件
func (c *ChirpStackListener) handleAckEvent(event *AckEvent) {
c.logger.Infof("接收到 'ack' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// handleLogEvent 处理日志事件
func (c *ChirpStackListener) handleLogEvent(event *LogEvent) {
// 首先,打印完整的事件结构体,用于详细排查
c.logger.Infof("接收到 'log' 事件的完整内容: %+v", event)
// 接着,根据 ChirpStack 日志的级别,使用我们自己的 logger 对应级别来打印核心信息
logMessage := "ChirpStack 日志: [%s] %s (DevEui: %s)"
switch event.Level {
case "INFO":
c.logger.Infof(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
case "WARNING":
c.logger.Warnf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
case "ERROR":
c.logger.Errorf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
default:
// 对于未知级别,使用 Warn 级别打印,并明确指出级别未知
c.logger.Warnf("ChirpStack 日志: [未知级别: %s] %s %s (DevEui: %s)",
event.Level, event.Code, event.Description, event.DeviceInfo.DevEui)
}
}
// handleJoinEvent 处理入网事件
func (c *ChirpStackListener) handleJoinEvent(event *JoinEvent) {
c.logger.Infof("接收到 'join' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// handleTxAckEvent 处理网关发送确认事件
func (c *ChirpStackListener) handleTxAckEvent(event *TxAckEvent) {
c.logger.Infof("接收到 'txack' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// handleLocationEvent 处理位置事件
func (c *ChirpStackListener) handleLocationEvent(event *LocationEvent) {
c.logger.Infof("接收到 'location' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// handleIntegrationEvent 处理集成事件
func (c *ChirpStackListener) handleIntegrationEvent(event *IntegrationEvent) {
c.logger.Infof("接收到 'integration' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// recordSensorData 是一个通用方法,用于将传感器数据存入数据库。
func (c *ChirpStackListener) recordSensorData(devEui string, eventTime time.Time, dataType models.SensorDataType, data interface{}) {
// 1. 查找设备
device, err := c.deviceRepo.FindByDevEui(devEui)
if err != nil {
c.logger.Warnf("记录传感器数据失败:无法通过 DevEui '%s' 找到设备: %v", devEui, err)
return
}
// 2. 序列化数据结构体为 JSON
jsonData, err := json.Marshal(data)
if err != nil {
c.logger.Errorf("记录传感器数据失败:序列化数据为 JSON 时出错: %v", err)
return
}
// 3. 构建 SensorData 模型
sensorData := &models.SensorData{
Time: eventTime,
DeviceID: device.ID,
RegionalControllerID: *device.ParentID,
SensorDataType: dataType, // 设置传感器数据类型
Data: datatypes.JSON(jsonData),
}
// 4. 调用仓库创建记录
if err := c.sensorDataRepo.Create(sensorData); err != nil {
c.logger.Errorf("记录传感器数据失败:存入数据库时出错: %v", err)
}
}