From d4e8aba1fd062d1b7ed100bc56f596d4a956d1ba Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Mon, 10 Nov 2025 15:25:33 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=8C=BA=E5=9F=9F=E9=98=88?= =?UTF-8?q?=E5=80=BC=E5=91=8A=E8=AD=A6=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- design/exceeding-threshold-alarm/index.md | 3 +- .../domain/task/area_threshold_check_task.go | 161 ++++++++++++++++++ .../task/device_threshold_check_task.go | 28 +-- internal/infra/models/alarm.go | 11 ++ internal/infra/models/plan.go | 7 + 5 files changed, 190 insertions(+), 20 deletions(-) create mode 100644 internal/domain/task/area_threshold_check_task.go diff --git a/design/exceeding-threshold-alarm/index.md b/design/exceeding-threshold-alarm/index.md index 621db95..4959eac 100644 --- a/design/exceeding-threshold-alarm/index.md +++ b/design/exceeding-threshold-alarm/index.md @@ -138,4 +138,5 @@ 6. 实现设备阈值检查任务 7. 实现忽略告警和取消忽略告警接口及功能 8. 实现列表查询活跃告警和历史告警 -9. 系统初始化时健康计划调整(包括增加延时任务) \ No newline at end of file +9. 系统初始化时健康计划调整(包括增加延时任务) +10. 实现区域阈值告警任务 \ No newline at end of file diff --git a/internal/domain/task/area_threshold_check_task.go b/internal/domain/task/area_threshold_check_task.go new file mode 100644 index 0000000..0b94b85 --- /dev/null +++ b/internal/domain/task/area_threshold_check_task.go @@ -0,0 +1,161 @@ +package task + +import ( + "context" + "fmt" + "sync" + + "git.huangwc.com/pig/pig-farm-controller/internal/domain/alarm" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/plan" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" +) + +// AreaThresholdCheckParams 定义了区域阈值检查任务的参数 +type AreaThresholdCheckParams struct { + AreaControllerID uint `json:"area_controller_id"` // 区域主控ID + SensorType models.SensorType `json:"sensor_type"` // 传感器类型 + Thresholds float64 `json:"thresholds"` // 阈值 + Operator models.Operator `json:"operator"` // 操作符 + ExcludeDeviceIDs []uint `json:"exclude_device_ids"` // 排除的传感器ID +} + +// AreaThresholdCheckTask 是一个任务,用于检查区域阈值并触发告警, 区域主控下的所有没有独立校验任务的设备都会受到此任务的检查 +type AreaThresholdCheckTask struct { + ctx context.Context + onceParse sync.Once + + taskLog *models.TaskExecutionLog + params AreaThresholdCheckParams + + sensorDataRepo repository.SensorDataRepository + deviceRepo repository.DeviceRepository + alarmService alarm.AlarmService +} + +func NewAreaThresholdCheckTask(ctx context.Context, taskLog *models.TaskExecutionLog, sensorDataRepo repository.SensorDataRepository, deviceRepo repository.DeviceRepository, alarmService alarm.AlarmService) plan.Task { + return &AreaThresholdCheckTask{ + ctx: ctx, + taskLog: taskLog, + sensorDataRepo: sensorDataRepo, + deviceRepo: deviceRepo, + alarmService: alarmService, + } +} + +// Execute 执行区域阈值检查任务 +func (a *AreaThresholdCheckTask) Execute(ctx context.Context) error { + taskCtx, logger := logs.Trace(ctx, a.ctx, "Execute") + err := a.parseParameters(taskCtx) + if err != nil { + logger.Errorf("任务 %v: 解析参数失败: %v", a.taskLog.TaskID, err) + return err + } + + // 1. 查询区域主控下所有设备 + devices, err := a.deviceRepo.ListByAreaControllerID(taskCtx, a.params.AreaControllerID) + if err != nil { + logger.Errorf("任务 %v: 查询区域主控 %d 下设备失败: %v", a.taskLog.TaskID, a.params.AreaControllerID, err) + return fmt.Errorf("查询区域主控 %d 下设备失败: %w", a.params.AreaControllerID, err) + } + + // 构建忽略设备ID的map,方便快速查找 + ignoredMap := make(map[uint]struct{}) + for _, id := range a.params.ExcludeDeviceIDs { + ignoredMap[id] = struct{}{} + } + + // 2. 遍历设备,排除忽略列表里的设备,并执行阈值检查 + for _, device := range devices { + if _, ignored := ignoredMap[device.ID]; ignored { + logger.Debugf("任务 %v: 设备 %d 在忽略列表中,跳过检查。", a.taskLog.TaskID, device.ID) + continue + } + + task := a.taskLog.Task + err = task.SaveParameters(DeviceThresholdCheckParams{ + DeviceID: device.ID, + SensorType: a.params.SensorType, + Thresholds: a.params.Thresholds, + Operator: a.params.Operator, + }) + if err != nil { + logger.Errorf("任务 %v: 保存参数失败: %v", a.taskLog.TaskID, err) + continue + } + // 创建一个临时的 DeviceThresholdCheckTask 实例来复用其核心逻辑 + deviceCheckTask := NewDeviceThresholdCheckTask( + taskCtx, + &models.TaskExecutionLog{ // 为每个设备创建一个模拟的 TaskExecutionLog + TaskID: a.taskLog.TaskID, + Task: task, + }, + a.sensorDataRepo, + a.alarmService, + ).(*DeviceThresholdCheckTask) // 类型断言,以便访问内部参数 + + // 执行单设备的阈值检查 + if err := deviceCheckTask.Execute(taskCtx); err != nil { + logger.Errorf("任务 %v: 设备 %d 阈值检查失败: %v", a.taskLog.TaskID, device.ID, err) + continue + } + } + + return nil +} + +func (a *AreaThresholdCheckTask) OnFailure(ctx context.Context, executeErr error) { + logger := logs.TraceLogger(ctx, a.ctx, "OnFailure") + logger.Errorf("区域阈值检测任务执行失败, 任务ID: %v: 执行失败: %v", a.taskLog.TaskID, executeErr) +} + +func (a *AreaThresholdCheckTask) ResolveDeviceIDs(ctx context.Context) ([]uint, error) { + taskCtx := logs.AddFuncName(ctx, a.ctx, "ResolveDeviceIDs") + if err := a.parseParameters(taskCtx); err != nil { + return nil, err + } + // 排除列表也意味着关联 + return a.params.ExcludeDeviceIDs, nil +} + +// parseParameters 解析任务参数 +func (a *AreaThresholdCheckTask) parseParameters(ctx context.Context) error { + logger := logs.TraceLogger(ctx, a.ctx, "parseParameters") + var err error + a.onceParse.Do(func() { + if a.taskLog.Task.Parameters == nil { + logger.Errorf("任务 %v: 缺少参数", a.taskLog.TaskID) + err = fmt.Errorf("任务 %v: 参数不全", a.taskLog.TaskID) + return + } + + var params AreaThresholdCheckParams + err = a.taskLog.Task.ParseParameters(¶ms) + if err != nil { + logger.Errorf("任务 %v: 解析参数失败: %v", a.taskLog.TaskID, err) + err = fmt.Errorf("任务 %v: 解析参数失败: %v", a.taskLog.TaskID, err) + return + } + + if params.SensorType == "" { + err = fmt.Errorf("任务 %v: 未配置传感器类型", a.taskLog.TaskID) + } + if params.Operator == "" { + err = fmt.Errorf("任务 %v: 缺少操作符", a.taskLog.TaskID) + } + if params.Thresholds == 0 { + err = fmt.Errorf("任务 %v: 未配置阈值", a.taskLog.TaskID) + } + if params.AreaControllerID == 0 { + err = fmt.Errorf("任务 %v: 未配置区域主控ID", a.taskLog.TaskID) + } + if params.ExcludeDeviceIDs == nil { + params.ExcludeDeviceIDs = []uint{} + } + + a.params = params + + }) + return err +} diff --git a/internal/domain/task/device_threshold_check_task.go b/internal/domain/task/device_threshold_check_task.go index 365554b..d8c8a9b 100644 --- a/internal/domain/task/device_threshold_check_task.go +++ b/internal/domain/task/device_threshold_check_task.go @@ -13,24 +13,14 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" ) -type Operator string - -const ( - OperatorLessThan Operator = "<" - OperatorLessThanOrEqualTo Operator = "<=" - OperatorGreaterThan Operator = ">" - OperatorGreaterThanOrEqualTo Operator = ">=" - OperatorEqualTo Operator = "=" - OperatorNotEqualTo Operator = "!=" -) - type DeviceThresholdCheckParams struct { DeviceID uint `json:"device_id"` // 设备ID SensorType models.SensorType `json:"sensor_type"` // 传感器类型 Thresholds float64 `json:"thresholds"` // 阈值 - Operator Operator `json:"operator"` // 操作符 + Operator models.Operator `json:"operator"` // 操作符 } +// DeviceThresholdCheckTask 是一个任务,用于检查设备传感器数据是否满足阈值条件。 type DeviceThresholdCheckTask struct { ctx context.Context onceParse sync.Once @@ -132,19 +122,19 @@ func (d *DeviceThresholdCheckTask) Execute(ctx context.Context) error { } // checkThreshold 校验当前值是否满足阈值条件 -func (d *DeviceThresholdCheckTask) checkThreshold(currentValue float64, operator Operator, threshold float64) bool { +func (d *DeviceThresholdCheckTask) checkThreshold(currentValue float64, operator models.Operator, threshold float64) bool { switch operator { - case OperatorLessThan: + case models.OperatorLessThan: return currentValue < threshold - case OperatorLessThanOrEqualTo: + case models.OperatorLessThanOrEqualTo: return currentValue <= threshold - case OperatorGreaterThan: + case models.OperatorGreaterThan: return currentValue > threshold - case OperatorGreaterThanOrEqualTo: + case models.OperatorGreaterThanOrEqualTo: return currentValue >= threshold - case OperatorEqualTo: + case models.OperatorEqualTo: return currentValue == threshold - case OperatorNotEqualTo: + case models.OperatorNotEqualTo: return currentValue != threshold default: return false diff --git a/internal/infra/models/alarm.go b/internal/infra/models/alarm.go index 6ce054f..8306faa 100644 --- a/internal/infra/models/alarm.go +++ b/internal/infra/models/alarm.go @@ -34,6 +34,17 @@ const ( // (可在此处预留或添加) ) +type Operator string + +const ( + OperatorLessThan Operator = "<" + OperatorLessThanOrEqualTo Operator = "<=" + OperatorGreaterThan Operator = ">" + OperatorGreaterThanOrEqualTo Operator = ">=" + OperatorEqualTo Operator = "=" + OperatorNotEqualTo Operator = "!=" +) + // ActiveAlarm 活跃告警 // 活跃告警会被更新(如忽略状态),因此保留 gorm.Model 以包含所有标准字段。 type ActiveAlarm struct { diff --git a/internal/infra/models/plan.go b/internal/infra/models/plan.go index 0e74684..7efd507 100644 --- a/internal/infra/models/plan.go +++ b/internal/infra/models/plan.go @@ -11,6 +11,13 @@ import ( "gorm.io/gorm" ) +const ( + // PlanNamePeriodicSystemHealthCheck 是周期性系统健康检查计划的名称 + PlanNamePeriodicSystemHealthCheck = "周期性系统健康检查" + // PlanNameAlarmNotification 是告警通知发送计划的名称 + PlanNameAlarmNotification = "告警通知发送" +) + // PlanExecutionType 定义了计划的执行类型 type PlanExecutionType string