Files
pig-farm-controller/internal/domain/task/area_threshold_check_task.go
2025-11-10 22:23:31 +08:00

167 lines
5.7 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package task
import (
"context"
"fmt"
"sync"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/alarm"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/plan"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
)
// AreaThresholdCheckParams 定义了区域阈值检查任务的参数
type AreaThresholdCheckParams struct {
AreaControllerID uint32 `json:"area_controller_id"` // 区域主控ID
SensorType models.SensorType `json:"sensor_type"` // 传感器类型
Thresholds float32 `json:"thresholds"` // 阈值
Operator models.Operator `json:"operator"` // 操作符
Level models.SeverityLevel `json:"level"` // 告警级别
ExcludeDeviceIDs []uint32 `json:"exclude_device_ids"` // 排除的传感器ID
}
// AreaThresholdCheckTask 是一个任务,用于检查区域阈值并触发告警, 区域主控下的所有没有独立校验任务的设备都会受到此任务的检查
type AreaThresholdCheckTask struct {
ctx context.Context
onceParse sync.Once
taskLog *models.TaskExecutionLog
params AreaThresholdCheckParams
sensorDataRepo repository.SensorDataRepository
deviceRepo repository.DeviceRepository
alarmService alarm.AlarmService
}
func NewAreaThresholdCheckTask(ctx context.Context, taskLog *models.TaskExecutionLog, sensorDataRepo repository.SensorDataRepository, deviceRepo repository.DeviceRepository, alarmService alarm.AlarmService) plan.Task {
return &AreaThresholdCheckTask{
ctx: ctx,
taskLog: taskLog,
sensorDataRepo: sensorDataRepo,
deviceRepo: deviceRepo,
alarmService: alarmService,
}
}
// Execute 执行区域阈值检查任务
func (a *AreaThresholdCheckTask) Execute(ctx context.Context) error {
taskCtx, logger := logs.Trace(ctx, a.ctx, "Execute")
err := a.parseParameters(taskCtx)
if err != nil {
logger.Errorf("任务 %v: 解析参数失败: %v", a.taskLog.TaskID, err)
return err
}
// 1. 查询区域主控下所有设备
devices, err := a.deviceRepo.ListByAreaControllerID(taskCtx, a.params.AreaControllerID)
if err != nil {
logger.Errorf("任务 %v: 查询区域主控 %d 下设备失败: %v", a.taskLog.TaskID, a.params.AreaControllerID, err)
return fmt.Errorf("查询区域主控 %d 下设备失败: %w", a.params.AreaControllerID, err)
}
// 构建忽略设备ID的map方便快速查找
ignoredMap := make(map[uint32]struct{})
for _, id := range a.params.ExcludeDeviceIDs {
ignoredMap[id] = struct{}{}
}
// 2. 遍历设备,排除忽略列表里的设备,并执行阈值检查
for _, device := range devices {
if _, ignored := ignoredMap[device.ID]; ignored {
logger.Debugf("任务 %v: 设备 %d 在忽略列表中,跳过检查。", a.taskLog.TaskID, device.ID)
continue
}
task := a.taskLog.Task
err = task.SaveParameters(DeviceThresholdCheckParams{
DeviceID: device.ID,
SensorType: a.params.SensorType,
Thresholds: a.params.Thresholds,
Level: a.params.Level,
Operator: a.params.Operator,
})
if err != nil {
logger.Errorf("任务 %v: 保存参数失败: %v", a.taskLog.TaskID, err)
continue
}
// 创建一个临时的 DeviceThresholdCheckTask 实例来复用其核心逻辑
deviceCheckTask := NewDeviceThresholdCheckTask(
taskCtx,
&models.TaskExecutionLog{ // 为每个设备创建一个模拟的 TaskExecutionLog
TaskID: a.taskLog.TaskID,
Task: task,
},
a.sensorDataRepo,
a.alarmService,
).(*DeviceThresholdCheckTask) // 类型断言,以便访问内部参数
// 执行单设备的阈值检查
if err := deviceCheckTask.Execute(taskCtx); err != nil {
logger.Errorf("任务 %v: 设备 %d 阈值检查失败: %v", a.taskLog.TaskID, device.ID, err)
continue
}
}
return nil
}
func (a *AreaThresholdCheckTask) OnFailure(ctx context.Context, executeErr error) {
logger := logs.TraceLogger(ctx, a.ctx, "OnFailure")
logger.Errorf("区域阈值检测任务执行失败, 任务ID: %v: 执行失败: %v", a.taskLog.TaskID, executeErr)
}
func (a *AreaThresholdCheckTask) ResolveDeviceIDs(ctx context.Context) ([]uint32, error) {
taskCtx := logs.AddFuncName(ctx, a.ctx, "ResolveDeviceIDs")
if err := a.parseParameters(taskCtx); err != nil {
return nil, err
}
// 排除列表也意味着关联
return a.params.ExcludeDeviceIDs, nil
}
// parseParameters 解析任务参数
func (a *AreaThresholdCheckTask) parseParameters(ctx context.Context) error {
logger := logs.TraceLogger(ctx, a.ctx, "parseParameters")
var err error
a.onceParse.Do(func() {
if a.taskLog.Task.Parameters == nil {
logger.Errorf("任务 %v: 缺少参数", a.taskLog.TaskID)
err = fmt.Errorf("任务 %v: 参数不全", a.taskLog.TaskID)
return
}
var params AreaThresholdCheckParams
err = a.taskLog.Task.ParseParameters(&params)
if err != nil {
logger.Errorf("任务 %v: 解析参数失败: %v", a.taskLog.TaskID, err)
err = fmt.Errorf("任务 %v: 解析参数失败: %v", a.taskLog.TaskID, err)
return
}
if params.SensorType == "" {
err = fmt.Errorf("任务 %v: 未配置传感器类型", a.taskLog.TaskID)
}
if params.Operator == "" {
err = fmt.Errorf("任务 %v: 缺少操作符", a.taskLog.TaskID)
}
if params.Thresholds == 0 {
err = fmt.Errorf("任务 %v: 未配置阈值", a.taskLog.TaskID)
}
if params.AreaControllerID == 0 {
err = fmt.Errorf("任务 %v: 未配置区域主控ID", a.taskLog.TaskID)
}
if params.Level == "" {
params.Level = models.WarnLevel
}
if params.ExcludeDeviceIDs == nil {
params.ExcludeDeviceIDs = []uint32{}
}
a.params = params
})
return err
}