实现区域阈值告警任务

This commit is contained in:
2025-11-10 15:25:33 +08:00
parent 19d55eb09b
commit d4e8aba1fd
5 changed files with 190 additions and 20 deletions

View File

@@ -138,4 +138,5 @@
6. 实现设备阈值检查任务 6. 实现设备阈值检查任务
7. 实现忽略告警和取消忽略告警接口及功能 7. 实现忽略告警和取消忽略告警接口及功能
8. 实现列表查询活跃告警和历史告警 8. 实现列表查询活跃告警和历史告警
9. 系统初始化时健康计划调整(包括增加延时任务) 9. 系统初始化时健康计划调整(包括增加延时任务)
10. 实现区域阈值告警任务

View File

@@ -0,0 +1,161 @@
package task
import (
"context"
"fmt"
"sync"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/alarm"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/plan"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
)
// AreaThresholdCheckParams 定义了区域阈值检查任务的参数
type AreaThresholdCheckParams struct {
AreaControllerID uint `json:"area_controller_id"` // 区域主控ID
SensorType models.SensorType `json:"sensor_type"` // 传感器类型
Thresholds float64 `json:"thresholds"` // 阈值
Operator models.Operator `json:"operator"` // 操作符
ExcludeDeviceIDs []uint `json:"exclude_device_ids"` // 排除的传感器ID
}
// AreaThresholdCheckTask 是一个任务,用于检查区域阈值并触发告警, 区域主控下的所有没有独立校验任务的设备都会受到此任务的检查
type AreaThresholdCheckTask struct {
ctx context.Context
onceParse sync.Once
taskLog *models.TaskExecutionLog
params AreaThresholdCheckParams
sensorDataRepo repository.SensorDataRepository
deviceRepo repository.DeviceRepository
alarmService alarm.AlarmService
}
func NewAreaThresholdCheckTask(ctx context.Context, taskLog *models.TaskExecutionLog, sensorDataRepo repository.SensorDataRepository, deviceRepo repository.DeviceRepository, alarmService alarm.AlarmService) plan.Task {
return &AreaThresholdCheckTask{
ctx: ctx,
taskLog: taskLog,
sensorDataRepo: sensorDataRepo,
deviceRepo: deviceRepo,
alarmService: alarmService,
}
}
// Execute 执行区域阈值检查任务
func (a *AreaThresholdCheckTask) Execute(ctx context.Context) error {
taskCtx, logger := logs.Trace(ctx, a.ctx, "Execute")
err := a.parseParameters(taskCtx)
if err != nil {
logger.Errorf("任务 %v: 解析参数失败: %v", a.taskLog.TaskID, err)
return err
}
// 1. 查询区域主控下所有设备
devices, err := a.deviceRepo.ListByAreaControllerID(taskCtx, a.params.AreaControllerID)
if err != nil {
logger.Errorf("任务 %v: 查询区域主控 %d 下设备失败: %v", a.taskLog.TaskID, a.params.AreaControllerID, err)
return fmt.Errorf("查询区域主控 %d 下设备失败: %w", a.params.AreaControllerID, err)
}
// 构建忽略设备ID的map方便快速查找
ignoredMap := make(map[uint]struct{})
for _, id := range a.params.ExcludeDeviceIDs {
ignoredMap[id] = struct{}{}
}
// 2. 遍历设备,排除忽略列表里的设备,并执行阈值检查
for _, device := range devices {
if _, ignored := ignoredMap[device.ID]; ignored {
logger.Debugf("任务 %v: 设备 %d 在忽略列表中,跳过检查。", a.taskLog.TaskID, device.ID)
continue
}
task := a.taskLog.Task
err = task.SaveParameters(DeviceThresholdCheckParams{
DeviceID: device.ID,
SensorType: a.params.SensorType,
Thresholds: a.params.Thresholds,
Operator: a.params.Operator,
})
if err != nil {
logger.Errorf("任务 %v: 保存参数失败: %v", a.taskLog.TaskID, err)
continue
}
// 创建一个临时的 DeviceThresholdCheckTask 实例来复用其核心逻辑
deviceCheckTask := NewDeviceThresholdCheckTask(
taskCtx,
&models.TaskExecutionLog{ // 为每个设备创建一个模拟的 TaskExecutionLog
TaskID: a.taskLog.TaskID,
Task: task,
},
a.sensorDataRepo,
a.alarmService,
).(*DeviceThresholdCheckTask) // 类型断言,以便访问内部参数
// 执行单设备的阈值检查
if err := deviceCheckTask.Execute(taskCtx); err != nil {
logger.Errorf("任务 %v: 设备 %d 阈值检查失败: %v", a.taskLog.TaskID, device.ID, err)
continue
}
}
return nil
}
func (a *AreaThresholdCheckTask) OnFailure(ctx context.Context, executeErr error) {
logger := logs.TraceLogger(ctx, a.ctx, "OnFailure")
logger.Errorf("区域阈值检测任务执行失败, 任务ID: %v: 执行失败: %v", a.taskLog.TaskID, executeErr)
}
func (a *AreaThresholdCheckTask) ResolveDeviceIDs(ctx context.Context) ([]uint, error) {
taskCtx := logs.AddFuncName(ctx, a.ctx, "ResolveDeviceIDs")
if err := a.parseParameters(taskCtx); err != nil {
return nil, err
}
// 排除列表也意味着关联
return a.params.ExcludeDeviceIDs, nil
}
// parseParameters 解析任务参数
func (a *AreaThresholdCheckTask) parseParameters(ctx context.Context) error {
logger := logs.TraceLogger(ctx, a.ctx, "parseParameters")
var err error
a.onceParse.Do(func() {
if a.taskLog.Task.Parameters == nil {
logger.Errorf("任务 %v: 缺少参数", a.taskLog.TaskID)
err = fmt.Errorf("任务 %v: 参数不全", a.taskLog.TaskID)
return
}
var params AreaThresholdCheckParams
err = a.taskLog.Task.ParseParameters(&params)
if err != nil {
logger.Errorf("任务 %v: 解析参数失败: %v", a.taskLog.TaskID, err)
err = fmt.Errorf("任务 %v: 解析参数失败: %v", a.taskLog.TaskID, err)
return
}
if params.SensorType == "" {
err = fmt.Errorf("任务 %v: 未配置传感器类型", a.taskLog.TaskID)
}
if params.Operator == "" {
err = fmt.Errorf("任务 %v: 缺少操作符", a.taskLog.TaskID)
}
if params.Thresholds == 0 {
err = fmt.Errorf("任务 %v: 未配置阈值", a.taskLog.TaskID)
}
if params.AreaControllerID == 0 {
err = fmt.Errorf("任务 %v: 未配置区域主控ID", a.taskLog.TaskID)
}
if params.ExcludeDeviceIDs == nil {
params.ExcludeDeviceIDs = []uint{}
}
a.params = params
})
return err
}

View File

@@ -13,24 +13,14 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
) )
type Operator string
const (
OperatorLessThan Operator = "<"
OperatorLessThanOrEqualTo Operator = "<="
OperatorGreaterThan Operator = ">"
OperatorGreaterThanOrEqualTo Operator = ">="
OperatorEqualTo Operator = "="
OperatorNotEqualTo Operator = "!="
)
type DeviceThresholdCheckParams struct { type DeviceThresholdCheckParams struct {
DeviceID uint `json:"device_id"` // 设备ID DeviceID uint `json:"device_id"` // 设备ID
SensorType models.SensorType `json:"sensor_type"` // 传感器类型 SensorType models.SensorType `json:"sensor_type"` // 传感器类型
Thresholds float64 `json:"thresholds"` // 阈值 Thresholds float64 `json:"thresholds"` // 阈值
Operator Operator `json:"operator"` // 操作符 Operator models.Operator `json:"operator"` // 操作符
} }
// DeviceThresholdCheckTask 是一个任务,用于检查设备传感器数据是否满足阈值条件。
type DeviceThresholdCheckTask struct { type DeviceThresholdCheckTask struct {
ctx context.Context ctx context.Context
onceParse sync.Once onceParse sync.Once
@@ -132,19 +122,19 @@ func (d *DeviceThresholdCheckTask) Execute(ctx context.Context) error {
} }
// checkThreshold 校验当前值是否满足阈值条件 // checkThreshold 校验当前值是否满足阈值条件
func (d *DeviceThresholdCheckTask) checkThreshold(currentValue float64, operator Operator, threshold float64) bool { func (d *DeviceThresholdCheckTask) checkThreshold(currentValue float64, operator models.Operator, threshold float64) bool {
switch operator { switch operator {
case OperatorLessThan: case models.OperatorLessThan:
return currentValue < threshold return currentValue < threshold
case OperatorLessThanOrEqualTo: case models.OperatorLessThanOrEqualTo:
return currentValue <= threshold return currentValue <= threshold
case OperatorGreaterThan: case models.OperatorGreaterThan:
return currentValue > threshold return currentValue > threshold
case OperatorGreaterThanOrEqualTo: case models.OperatorGreaterThanOrEqualTo:
return currentValue >= threshold return currentValue >= threshold
case OperatorEqualTo: case models.OperatorEqualTo:
return currentValue == threshold return currentValue == threshold
case OperatorNotEqualTo: case models.OperatorNotEqualTo:
return currentValue != threshold return currentValue != threshold
default: default:
return false return false

View File

@@ -34,6 +34,17 @@ const (
// (可在此处预留或添加) // (可在此处预留或添加)
) )
type Operator string
const (
OperatorLessThan Operator = "<"
OperatorLessThanOrEqualTo Operator = "<="
OperatorGreaterThan Operator = ">"
OperatorGreaterThanOrEqualTo Operator = ">="
OperatorEqualTo Operator = "="
OperatorNotEqualTo Operator = "!="
)
// ActiveAlarm 活跃告警 // ActiveAlarm 活跃告警
// 活跃告警会被更新(如忽略状态),因此保留 gorm.Model 以包含所有标准字段。 // 活跃告警会被更新(如忽略状态),因此保留 gorm.Model 以包含所有标准字段。
type ActiveAlarm struct { type ActiveAlarm struct {

View File

@@ -11,6 +11,13 @@ import (
"gorm.io/gorm" "gorm.io/gorm"
) )
const (
// PlanNamePeriodicSystemHealthCheck 是周期性系统健康检查计划的名称
PlanNamePeriodicSystemHealthCheck = "周期性系统健康检查"
// PlanNameAlarmNotification 是告警通知发送计划的名称
PlanNameAlarmNotification = "告警通知发送"
)
// PlanExecutionType 定义了计划的执行类型 // PlanExecutionType 定义了计划的执行类型
type PlanExecutionType string type PlanExecutionType string