Files
pig-farm-controller/internal/domain/task/device_threshold_check_task.go

198 lines
6.9 KiB
Go
Raw Normal View History

package task
import (
"context"
"fmt"
"sync"
2025-11-09 21:37:35 +08:00
"time"
2025-11-09 21:37:35 +08:00
"git.huangwc.com/pig/pig-farm-controller/internal/domain/alarm"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/plan"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
2025-11-09 21:37:35 +08:00
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
)
type DeviceThresholdCheckParams struct {
DeviceID uint `json:"device_id"` // 设备ID
SensorType models.SensorType `json:"sensor_type"` // 传感器类型
2025-11-10 21:42:46 +08:00
Thresholds float32 `json:"thresholds"` // 阈值
Operator models.Operator `json:"operator"` // 操作符
Level models.SeverityLevel `json:"level"` // 告警等级
}
2025-11-10 15:25:33 +08:00
// DeviceThresholdCheckTask 是一个任务,用于检查设备传感器数据是否满足阈值条件。
type DeviceThresholdCheckTask struct {
ctx context.Context
onceParse sync.Once
taskLog *models.TaskExecutionLog
params DeviceThresholdCheckParams
2025-11-09 21:37:35 +08:00
sensorDataRepo repository.SensorDataRepository
alarmService alarm.AlarmService
}
2025-11-09 21:37:35 +08:00
func NewDeviceThresholdCheckTask(ctx context.Context, taskLog *models.TaskExecutionLog, sensorDataRepo repository.SensorDataRepository, alarmService alarm.AlarmService) plan.Task {
return &DeviceThresholdCheckTask{
2025-11-09 21:37:35 +08:00
ctx: ctx,
taskLog: taskLog,
sensorDataRepo: sensorDataRepo,
alarmService: alarmService,
}
}
func (d *DeviceThresholdCheckTask) Execute(ctx context.Context) error {
2025-11-09 21:37:35 +08:00
taskCtx, logger := logs.Trace(ctx, d.ctx, "Execute")
err := d.parseParameters(taskCtx)
if err != nil {
return err
}
sensorData, err := d.sensorDataRepo.GetLatestSensorDataByDeviceIDAndSensorType(taskCtx, d.params.DeviceID, d.params.SensorType)
if err != nil {
logger.Errorf("任务 %v: 获取最新传感器数据失败: %v", d.taskLog.TaskID, err)
return fmt.Errorf("任务 %v: 获取最新传感器数据失败: %v", d.taskLog.TaskID, err)
}
2025-11-10 21:42:46 +08:00
var currentValue float32
2025-11-09 21:37:35 +08:00
var alarmCode models.AlarmCode
switch d.params.SensorType {
case models.SensorTypeTemperature:
var data models.TemperatureData
if err := sensorData.ParseData(&data); err != nil {
return fmt.Errorf("任务 %v: 解析温度数据失败: %v", d.taskLog.TaskID, err)
}
currentValue = data.TemperatureCelsius
alarmCode = models.AlarmCodeTemperature
case models.SensorTypeHumidity:
var data models.HumidityData
if err := sensorData.ParseData(&data); err != nil {
return fmt.Errorf("任务 %v: 解析湿度数据失败: %v", d.taskLog.TaskID, err)
}
currentValue = data.HumidityPercent
alarmCode = models.AlarmCodeHumidity
case models.SensorTypeWeight:
var data models.WeightData
if err := sensorData.ParseData(&data); err != nil {
return fmt.Errorf("任务 %v: 解析重量数据失败: %v", d.taskLog.TaskID, err)
}
currentValue = data.WeightKilograms
alarmCode = models.AlarmCodeWeight
default:
return fmt.Errorf("任务 %v: 不支持的传感器类型: %v", d.taskLog.TaskID, d.params.SensorType)
}
// 阈值检查未通过
isExceeded := !d.checkThreshold(currentValue, d.params.Operator, d.params.Thresholds)
if isExceeded {
// 状态一:检查未通过,确保告警开启
summary := fmt.Sprintf("设备 %d(%s) 不满足阈值条件 (%s %.2f)", d.params.DeviceID, d.params.SensorType, d.params.Operator, d.params.Thresholds)
details := fmt.Sprintf("当前检测值: %.2f", currentValue)
logger.Infof("任务 %v: %s。%s", d.taskLog.TaskID, summary, details)
newAlarm := &models.ActiveAlarm{
SourceType: models.AlarmSourceTypeDevice,
SourceID: d.params.DeviceID,
AlarmCode: alarmCode,
AlarmSummary: summary,
AlarmDetails: details,
Level: d.params.Level,
2025-11-09 21:37:35 +08:00
TriggerTime: time.Now(),
}
if err := d.alarmService.CreateAlarmIfNotExists(taskCtx, newAlarm); err != nil {
logger.Errorf("任务 %v: 创建告警失败: %v", d.taskLog.TaskID, err)
// 根据策略决定是否需要返回错误,这里选择不中断任务执行
}
} else {
// 状态二:检查已通过,确保告警关闭
resolveMethod := "系统自动解决:阈值恢复正常"
logger.Infof("任务 %v: 设备 %d 的 %s 阈值已恢复正常,正在尝试关闭告警。", d.taskLog.TaskID, d.params.DeviceID, d.params.SensorType)
if err := d.alarmService.CloseAlarm(taskCtx, models.AlarmSourceTypeDevice, d.params.DeviceID, alarmCode, resolveMethod, nil); err != nil {
logger.Errorf("任务 %v: 关闭告警失败: %v", d.taskLog.TaskID, err)
// 根据策略决定是否需要返回错误,这里选择不中断任务执行
}
}
return nil
}
// checkThreshold 校验当前值是否满足阈值条件
2025-11-10 21:42:46 +08:00
func (d *DeviceThresholdCheckTask) checkThreshold(currentValue float32, operator models.Operator, threshold float32) bool {
2025-11-09 21:37:35 +08:00
switch operator {
2025-11-10 15:25:33 +08:00
case models.OperatorLessThan:
2025-11-09 21:37:35 +08:00
return currentValue < threshold
2025-11-10 15:25:33 +08:00
case models.OperatorLessThanOrEqualTo:
2025-11-09 21:37:35 +08:00
return currentValue <= threshold
2025-11-10 15:25:33 +08:00
case models.OperatorGreaterThan:
2025-11-09 21:37:35 +08:00
return currentValue > threshold
2025-11-10 15:25:33 +08:00
case models.OperatorGreaterThanOrEqualTo:
2025-11-09 21:37:35 +08:00
return currentValue >= threshold
2025-11-10 15:25:33 +08:00
case models.OperatorEqualTo:
2025-11-09 21:37:35 +08:00
return currentValue == threshold
2025-11-10 15:25:33 +08:00
case models.OperatorNotEqualTo:
2025-11-09 21:37:35 +08:00
return currentValue != threshold
default:
return false
}
}
// parseParameters 解析任务参数
func (d *DeviceThresholdCheckTask) parseParameters(ctx context.Context) error {
logger := logs.TraceLogger(ctx, d.ctx, "parseParameters")
var err error
d.onceParse.Do(func() {
if d.taskLog.Task.Parameters == nil {
logger.Errorf("任务 %v: 缺少参数", d.taskLog.TaskID)
err = fmt.Errorf("任务 %v: 参数不全", d.taskLog.TaskID)
return
}
var params DeviceThresholdCheckParams
err = d.taskLog.Task.ParseParameters(&params)
if err != nil {
logger.Errorf("任务 %v: 解析参数失败: %v", d.taskLog.TaskID, err)
err = fmt.Errorf("任务 %v: 解析参数失败: %v", d.taskLog.TaskID, err)
return
}
2025-11-09 21:37:35 +08:00
if params.SensorType == "" {
err = fmt.Errorf("任务 %v: 未配置传感器类型", d.taskLog.TaskID)
}
if params.Operator == "" {
err = fmt.Errorf("任务 %v: 缺少操作符", d.taskLog.TaskID)
}
if params.Thresholds == 0 {
err = fmt.Errorf("任务 %v: 未配置阈值", d.taskLog.TaskID)
}
if params.DeviceID == 0 {
err = fmt.Errorf("任务 %v: 未配置设备ID", d.taskLog.TaskID)
}
if params.Level == "" {
params.Level = models.WarnLevel
}
2025-11-09 21:37:35 +08:00
d.params = params
})
return err
}
func (d *DeviceThresholdCheckTask) OnFailure(ctx context.Context, executeErr error) {
logger := logs.TraceLogger(ctx, d.ctx, "OnFailure")
logger.Errorf("设备阈值检测任务执行失败, 任务ID: %v: 执行失败: %v", d.taskLog.TaskID, executeErr)
}
func (d *DeviceThresholdCheckTask) ResolveDeviceIDs(ctx context.Context) ([]uint, error) {
taskCtx := logs.AddFuncName(ctx, d.ctx, "ResolveDeviceIDs")
if err := d.parseParameters(taskCtx); err != nil {
return nil, err
}
return []uint{d.params.DeviceID}, nil
}