2025-11-10 15:25:33 +08:00
|
|
|
|
package task
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"context"
|
|
|
|
|
|
"fmt"
|
|
|
|
|
|
"sync"
|
|
|
|
|
|
|
|
|
|
|
|
"git.huangwc.com/pig/pig-farm-controller/internal/domain/alarm"
|
|
|
|
|
|
"git.huangwc.com/pig/pig-farm-controller/internal/domain/plan"
|
|
|
|
|
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
|
|
|
|
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
|
|
|
|
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
// AreaThresholdCheckParams 定义了区域阈值检查任务的参数
|
|
|
|
|
|
type AreaThresholdCheckParams struct {
|
2025-11-10 17:35:22 +08:00
|
|
|
|
AreaControllerID uint `json:"area_controller_id"` // 区域主控ID
|
|
|
|
|
|
SensorType models.SensorType `json:"sensor_type"` // 传感器类型
|
|
|
|
|
|
Thresholds float64 `json:"thresholds"` // 阈值
|
|
|
|
|
|
Operator models.Operator `json:"operator"` // 操作符
|
|
|
|
|
|
Level models.SeverityLevel `json:"level"` // 告警级别
|
|
|
|
|
|
ExcludeDeviceIDs []uint `json:"exclude_device_ids"` // 排除的传感器ID
|
2025-11-10 15:25:33 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// AreaThresholdCheckTask 是一个任务,用于检查区域阈值并触发告警, 区域主控下的所有没有独立校验任务的设备都会受到此任务的检查
|
|
|
|
|
|
type AreaThresholdCheckTask struct {
|
|
|
|
|
|
ctx context.Context
|
|
|
|
|
|
onceParse sync.Once
|
|
|
|
|
|
|
|
|
|
|
|
taskLog *models.TaskExecutionLog
|
|
|
|
|
|
params AreaThresholdCheckParams
|
|
|
|
|
|
|
|
|
|
|
|
sensorDataRepo repository.SensorDataRepository
|
|
|
|
|
|
deviceRepo repository.DeviceRepository
|
|
|
|
|
|
alarmService alarm.AlarmService
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func NewAreaThresholdCheckTask(ctx context.Context, taskLog *models.TaskExecutionLog, sensorDataRepo repository.SensorDataRepository, deviceRepo repository.DeviceRepository, alarmService alarm.AlarmService) plan.Task {
|
|
|
|
|
|
return &AreaThresholdCheckTask{
|
|
|
|
|
|
ctx: ctx,
|
|
|
|
|
|
taskLog: taskLog,
|
|
|
|
|
|
sensorDataRepo: sensorDataRepo,
|
|
|
|
|
|
deviceRepo: deviceRepo,
|
|
|
|
|
|
alarmService: alarmService,
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Execute 执行区域阈值检查任务
|
|
|
|
|
|
func (a *AreaThresholdCheckTask) Execute(ctx context.Context) error {
|
|
|
|
|
|
taskCtx, logger := logs.Trace(ctx, a.ctx, "Execute")
|
|
|
|
|
|
err := a.parseParameters(taskCtx)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
logger.Errorf("任务 %v: 解析参数失败: %v", a.taskLog.TaskID, err)
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 1. 查询区域主控下所有设备
|
|
|
|
|
|
devices, err := a.deviceRepo.ListByAreaControllerID(taskCtx, a.params.AreaControllerID)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
logger.Errorf("任务 %v: 查询区域主控 %d 下设备失败: %v", a.taskLog.TaskID, a.params.AreaControllerID, err)
|
|
|
|
|
|
return fmt.Errorf("查询区域主控 %d 下设备失败: %w", a.params.AreaControllerID, err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 构建忽略设备ID的map,方便快速查找
|
|
|
|
|
|
ignoredMap := make(map[uint]struct{})
|
|
|
|
|
|
for _, id := range a.params.ExcludeDeviceIDs {
|
|
|
|
|
|
ignoredMap[id] = struct{}{}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 2. 遍历设备,排除忽略列表里的设备,并执行阈值检查
|
|
|
|
|
|
for _, device := range devices {
|
|
|
|
|
|
if _, ignored := ignoredMap[device.ID]; ignored {
|
|
|
|
|
|
logger.Debugf("任务 %v: 设备 %d 在忽略列表中,跳过检查。", a.taskLog.TaskID, device.ID)
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
task := a.taskLog.Task
|
|
|
|
|
|
err = task.SaveParameters(DeviceThresholdCheckParams{
|
|
|
|
|
|
DeviceID: device.ID,
|
|
|
|
|
|
SensorType: a.params.SensorType,
|
|
|
|
|
|
Thresholds: a.params.Thresholds,
|
2025-11-10 17:35:22 +08:00
|
|
|
|
Level: a.params.Level,
|
2025-11-10 15:25:33 +08:00
|
|
|
|
Operator: a.params.Operator,
|
|
|
|
|
|
})
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
logger.Errorf("任务 %v: 保存参数失败: %v", a.taskLog.TaskID, err)
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
// 创建一个临时的 DeviceThresholdCheckTask 实例来复用其核心逻辑
|
|
|
|
|
|
deviceCheckTask := NewDeviceThresholdCheckTask(
|
|
|
|
|
|
taskCtx,
|
|
|
|
|
|
&models.TaskExecutionLog{ // 为每个设备创建一个模拟的 TaskExecutionLog
|
|
|
|
|
|
TaskID: a.taskLog.TaskID,
|
|
|
|
|
|
Task: task,
|
|
|
|
|
|
},
|
|
|
|
|
|
a.sensorDataRepo,
|
|
|
|
|
|
a.alarmService,
|
|
|
|
|
|
).(*DeviceThresholdCheckTask) // 类型断言,以便访问内部参数
|
|
|
|
|
|
|
|
|
|
|
|
// 执行单设备的阈值检查
|
|
|
|
|
|
if err := deviceCheckTask.Execute(taskCtx); err != nil {
|
|
|
|
|
|
logger.Errorf("任务 %v: 设备 %d 阈值检查失败: %v", a.taskLog.TaskID, device.ID, err)
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (a *AreaThresholdCheckTask) OnFailure(ctx context.Context, executeErr error) {
|
|
|
|
|
|
logger := logs.TraceLogger(ctx, a.ctx, "OnFailure")
|
|
|
|
|
|
logger.Errorf("区域阈值检测任务执行失败, 任务ID: %v: 执行失败: %v", a.taskLog.TaskID, executeErr)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (a *AreaThresholdCheckTask) ResolveDeviceIDs(ctx context.Context) ([]uint, error) {
|
|
|
|
|
|
taskCtx := logs.AddFuncName(ctx, a.ctx, "ResolveDeviceIDs")
|
|
|
|
|
|
if err := a.parseParameters(taskCtx); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
// 排除列表也意味着关联
|
|
|
|
|
|
return a.params.ExcludeDeviceIDs, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// parseParameters 解析任务参数
|
|
|
|
|
|
func (a *AreaThresholdCheckTask) parseParameters(ctx context.Context) error {
|
|
|
|
|
|
logger := logs.TraceLogger(ctx, a.ctx, "parseParameters")
|
|
|
|
|
|
var err error
|
|
|
|
|
|
a.onceParse.Do(func() {
|
|
|
|
|
|
if a.taskLog.Task.Parameters == nil {
|
|
|
|
|
|
logger.Errorf("任务 %v: 缺少参数", a.taskLog.TaskID)
|
|
|
|
|
|
err = fmt.Errorf("任务 %v: 参数不全", a.taskLog.TaskID)
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
var params AreaThresholdCheckParams
|
|
|
|
|
|
err = a.taskLog.Task.ParseParameters(¶ms)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
logger.Errorf("任务 %v: 解析参数失败: %v", a.taskLog.TaskID, err)
|
|
|
|
|
|
err = fmt.Errorf("任务 %v: 解析参数失败: %v", a.taskLog.TaskID, err)
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if params.SensorType == "" {
|
|
|
|
|
|
err = fmt.Errorf("任务 %v: 未配置传感器类型", a.taskLog.TaskID)
|
|
|
|
|
|
}
|
|
|
|
|
|
if params.Operator == "" {
|
|
|
|
|
|
err = fmt.Errorf("任务 %v: 缺少操作符", a.taskLog.TaskID)
|
|
|
|
|
|
}
|
|
|
|
|
|
if params.Thresholds == 0 {
|
|
|
|
|
|
err = fmt.Errorf("任务 %v: 未配置阈值", a.taskLog.TaskID)
|
|
|
|
|
|
}
|
|
|
|
|
|
if params.AreaControllerID == 0 {
|
|
|
|
|
|
err = fmt.Errorf("任务 %v: 未配置区域主控ID", a.taskLog.TaskID)
|
|
|
|
|
|
}
|
2025-11-10 17:35:22 +08:00
|
|
|
|
if params.Level == "" {
|
|
|
|
|
|
params.Level = models.WarnLevel
|
|
|
|
|
|
}
|
2025-11-10 15:25:33 +08:00
|
|
|
|
if params.ExcludeDeviceIDs == nil {
|
|
|
|
|
|
params.ExcludeDeviceIDs = []uint{}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
a.params = params
|
|
|
|
|
|
|
|
|
|
|
|
})
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|