实现设备阈值检查任务

This commit is contained in:
2025-11-09 21:37:35 +08:00
parent a35a9a1038
commit e54c1bbc97
6 changed files with 340 additions and 20 deletions

View File

@@ -1,18 +1,124 @@
package alarm
import "context"
import (
"context"
"errors"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"gorm.io/gorm"
)
// AlarmService 定义了告警领域服务接口。
type AlarmService interface {
// CreateAlarmIfNotExists 检查是否存在相同的活跃告警,如果不存在,则创建一条新的告警记录。
// "相同"的定义是SourceType, SourceID, 和 AlarmCode 都相同。
CreateAlarmIfNotExists(ctx context.Context, newAlarm *models.ActiveAlarm) error
// CloseAlarm 关闭一个活跃告警,将其归档到历史记录。
// 如果指定的告警当前不活跃,则不执行任何操作并返回 nil。
CloseAlarm(ctx context.Context, sourceType models.AlarmSourceType, sourceID uint, alarmCode models.AlarmCode, resolveMethod string, resolvedBy *uint) error
}
// alarmService 是 AlarmService 接口的具体实现。
type alarmService struct {
ctx context.Context
ctx context.Context
alarmRepo repository.AlarmRepository
uow repository.UnitOfWork
}
func NewAlarmService(ctx context.Context) AlarmService {
// NewAlarmService 创建一个新的 AlarmService 实例。
func NewAlarmService(ctx context.Context, alarmRepo repository.AlarmRepository, uow repository.UnitOfWork) AlarmService {
return &alarmService{
ctx: ctx,
ctx: ctx,
alarmRepo: alarmRepo,
uow: uow,
}
}
// CreateAlarmIfNotExists 实现了创建告警(如果不存在)的逻辑。
func (s *alarmService) CreateAlarmIfNotExists(ctx context.Context, newAlarm *models.ActiveAlarm) error {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "CreateAlarmIfNotExists")
// 1. 检查告警是否已处于活跃状态
isActive, err := s.alarmRepo.IsAlarmActiveInUse(serviceCtx, newAlarm.SourceType, newAlarm.SourceID, newAlarm.AlarmCode)
if err != nil {
logger.Errorf("检查告警活跃状态时发生数据库错误: %v", err)
return err // 直接返回数据库错误
}
if isActive {
// 2. 如果已活跃,则记录日志并忽略
logger.Infof("相同的告警已处于活跃状态,已忽略。来源: %s, ID: %d, 告警代码: %s", newAlarm.SourceType, newAlarm.SourceID, newAlarm.AlarmCode)
return nil
}
// 3. 如果不活跃,则创建新告警
logger.Infof("告警尚不活跃,正在创建新告警。来源: %s, ID: %d, 告警代码: %s", newAlarm.SourceType, newAlarm.SourceID, newAlarm.AlarmCode)
return s.alarmRepo.CreateActiveAlarm(serviceCtx, newAlarm)
}
// CloseAlarm 实现了关闭告警并将其归档的逻辑。
func (s *alarmService) CloseAlarm(ctx context.Context, sourceType models.AlarmSourceType, sourceID uint, alarmCode models.AlarmCode, resolveMethod string, resolvedBy *uint) error {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "CloseAlarm")
// 1. 在事务外进行快速只读检查,避免不必要的事务开销
isActive, err := s.alarmRepo.IsAlarmActiveInUse(serviceCtx, sourceType, sourceID, alarmCode)
if err != nil {
logger.Errorf("关闭告警失败:预检查告警活跃状态失败: %v", err)
return err
}
// 如果告警本就不活跃,则无需任何操作
if !isActive {
return nil
}
// 2. 确认告警存在后,再进入事务执行“移动”操作
logger.Infof("检测到活跃告警,正在执行关闭和归档操作。来源: %s, ID: %d, 告警代码: %s", sourceType, sourceID, alarmCode)
return s.uow.ExecuteInTransaction(serviceCtx, func(tx *gorm.DB) error {
// 在事务中再次查找,确保数据一致性并获取完整对象
activeAlarm, err := s.alarmRepo.GetActiveAlarmByUniqueFieldsTx(serviceCtx, tx, sourceType, sourceID, alarmCode)
if err != nil {
// 此时如果没找到,可能在预检查和本事务之间已被其他进程关闭,同样视为正常
if errors.Is(err, gorm.ErrRecordNotFound) {
logger.Infof("告警在事务开始前已被关闭,无需操作。")
return nil
}
logger.Errorf("关闭告警失败:在事务中查找活跃告警失败: %v", err)
return err
}
// 创建历史告警记录
historicalAlarm := &models.HistoricalAlarm{
SourceType: activeAlarm.SourceType,
SourceID: activeAlarm.SourceID,
AlarmCode: activeAlarm.AlarmCode,
AlarmSummary: activeAlarm.AlarmSummary,
Level: activeAlarm.Level,
AlarmDetails: activeAlarm.AlarmDetails,
TriggerTime: activeAlarm.TriggerTime,
ResolveTime: time.Now(),
ResolveMethod: resolveMethod,
ResolvedBy: resolvedBy,
}
// 在事务中插入历史告警
if err := s.alarmRepo.CreateHistoricalAlarmTx(serviceCtx, tx, historicalAlarm); err != nil {
logger.Errorf("关闭告警失败:归档告警 %d 到历史表失败: %v", activeAlarm.ID, err)
return err
}
// 在事务中删除活跃告警
if err := s.alarmRepo.DeleteActiveAlarmTx(serviceCtx, tx, activeAlarm.ID); err != nil {
logger.Errorf("关闭告警失败:从活跃表删除告警 %d 失败: %v", activeAlarm.ID, err)
return err
}
logger.Infof("告警 %d 已成功关闭并归档。", activeAlarm.ID)
return nil
})
}

View File

@@ -4,10 +4,13 @@ import (
"context"
"fmt"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/alarm"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/plan"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
)
type Operator string
@@ -22,9 +25,10 @@ const (
)
type DeviceThresholdCheckParams struct {
DeviceID uint `json:"device_id"` // 设备ID
Thresholds float64 `json:"thresholds"` // 阈值
Operator Operator `json:"operator"` // 操作符
DeviceID uint `json:"device_id"` // 设备ID
SensorType models.SensorType `json:"sensor_type"` // 传感器类型
Thresholds float64 `json:"thresholds"` // 阈值
Operator Operator `json:"operator"` // 操作符
}
type DeviceThresholdCheckTask struct {
@@ -33,18 +37,118 @@ type DeviceThresholdCheckTask struct {
taskLog *models.TaskExecutionLog
params DeviceThresholdCheckParams
sensorDataRepo repository.SensorDataRepository
alarmService alarm.AlarmService
}
func NewDeviceThresholdCheckTask(ctx context.Context, taskLog *models.TaskExecutionLog) plan.Task {
func NewDeviceThresholdCheckTask(ctx context.Context, taskLog *models.TaskExecutionLog, sensorDataRepo repository.SensorDataRepository, alarmService alarm.AlarmService) plan.Task {
return &DeviceThresholdCheckTask{
ctx: ctx,
taskLog: taskLog,
ctx: ctx,
taskLog: taskLog,
sensorDataRepo: sensorDataRepo,
alarmService: alarmService,
}
}
func (d *DeviceThresholdCheckTask) Execute(ctx context.Context) error {
//TODO implement me
panic("implement me")
taskCtx, logger := logs.Trace(ctx, d.ctx, "Execute")
err := d.parseParameters(taskCtx)
if err != nil {
return err
}
sensorData, err := d.sensorDataRepo.GetLatestSensorDataByDeviceIDAndSensorType(taskCtx, d.params.DeviceID, d.params.SensorType)
if err != nil {
logger.Errorf("任务 %v: 获取最新传感器数据失败: %v", d.taskLog.TaskID, err)
return fmt.Errorf("任务 %v: 获取最新传感器数据失败: %v", d.taskLog.TaskID, err)
}
var currentValue float64
var alarmCode models.AlarmCode
switch d.params.SensorType {
case models.SensorTypeTemperature:
var data models.TemperatureData
if err := sensorData.ParseData(&data); err != nil {
return fmt.Errorf("任务 %v: 解析温度数据失败: %v", d.taskLog.TaskID, err)
}
currentValue = data.TemperatureCelsius
alarmCode = models.AlarmCodeTemperature
case models.SensorTypeHumidity:
var data models.HumidityData
if err := sensorData.ParseData(&data); err != nil {
return fmt.Errorf("任务 %v: 解析湿度数据失败: %v", d.taskLog.TaskID, err)
}
currentValue = data.HumidityPercent
alarmCode = models.AlarmCodeHumidity
case models.SensorTypeWeight:
var data models.WeightData
if err := sensorData.ParseData(&data); err != nil {
return fmt.Errorf("任务 %v: 解析重量数据失败: %v", d.taskLog.TaskID, err)
}
currentValue = data.WeightKilograms
alarmCode = models.AlarmCodeWeight
default:
return fmt.Errorf("任务 %v: 不支持的传感器类型: %v", d.taskLog.TaskID, d.params.SensorType)
}
// 阈值检查未通过
isExceeded := !d.checkThreshold(currentValue, d.params.Operator, d.params.Thresholds)
if isExceeded {
// 状态一:检查未通过,确保告警开启
summary := fmt.Sprintf("设备 %d(%s) 不满足阈值条件 (%s %.2f)", d.params.DeviceID, d.params.SensorType, d.params.Operator, d.params.Thresholds)
details := fmt.Sprintf("当前检测值: %.2f", currentValue)
logger.Infof("任务 %v: %s。%s", d.taskLog.TaskID, summary, details)
newAlarm := &models.ActiveAlarm{
SourceType: models.AlarmSourceTypeDevice,
SourceID: d.params.DeviceID,
AlarmCode: alarmCode,
AlarmSummary: summary,
AlarmDetails: details,
Level: models.WarnLevel, // 默认告警等级,可后续根据需求调整
TriggerTime: time.Now(),
}
if err := d.alarmService.CreateAlarmIfNotExists(taskCtx, newAlarm); err != nil {
logger.Errorf("任务 %v: 创建告警失败: %v", d.taskLog.TaskID, err)
// 根据策略决定是否需要返回错误,这里选择不中断任务执行
}
} else {
// 状态二:检查已通过,确保告警关闭
resolveMethod := "系统自动解决:阈值恢复正常"
logger.Infof("任务 %v: 设备 %d 的 %s 阈值已恢复正常,正在尝试关闭告警。", d.taskLog.TaskID, d.params.DeviceID, d.params.SensorType)
if err := d.alarmService.CloseAlarm(taskCtx, models.AlarmSourceTypeDevice, d.params.DeviceID, alarmCode, resolveMethod, nil); err != nil {
logger.Errorf("任务 %v: 关闭告警失败: %v", d.taskLog.TaskID, err)
// 根据策略决定是否需要返回错误,这里选择不中断任务执行
}
}
return nil
}
// checkThreshold 校验当前值是否满足阈值条件
func (d *DeviceThresholdCheckTask) checkThreshold(currentValue float64, operator Operator, threshold float64) bool {
switch operator {
case OperatorLessThan:
return currentValue < threshold
case OperatorLessThanOrEqualTo:
return currentValue <= threshold
case OperatorGreaterThan:
return currentValue > threshold
case OperatorGreaterThanOrEqualTo:
return currentValue >= threshold
case OperatorEqualTo:
return currentValue == threshold
case OperatorNotEqualTo:
return currentValue != threshold
default:
return false
}
}
// parseParameters 解析任务参数
@@ -66,6 +170,19 @@ func (d *DeviceThresholdCheckTask) parseParameters(ctx context.Context) error {
return
}
if params.SensorType == "" {
err = fmt.Errorf("任务 %v: 未配置传感器类型", d.taskLog.TaskID)
}
if params.Operator == "" {
err = fmt.Errorf("任务 %v: 缺少操作符", d.taskLog.TaskID)
}
if params.Thresholds == 0 {
err = fmt.Errorf("任务 %v: 未配置阈值", d.taskLog.TaskID)
}
if params.DeviceID == 0 {
err = fmt.Errorf("任务 %v: 未配置设备ID", d.taskLog.TaskID)
}
d.params = params
})