CreateDeviceThresholdAlarm
UpdateDeviceThresholdAlarm CreateAreaThresholdAlarm UpdateAreaThresholdAlarm
This commit is contained in:
@@ -2,11 +2,15 @@ package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/dto"
|
||||
domainAlarm "git.huangwc.com/pig/pig-farm-controller/internal/domain/alarm"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/domain/plan"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
||||
)
|
||||
|
||||
@@ -21,21 +25,42 @@ type ThresholdAlarmService interface {
|
||||
ListActiveAlarms(ctx context.Context, req *dto.ListActiveAlarmRequest) (*dto.ListActiveAlarmResponse, error)
|
||||
// ListHistoricalAlarms 批量查询历史告警。
|
||||
ListHistoricalAlarms(ctx context.Context, req *dto.ListHistoricalAlarmRequest) (*dto.ListHistoricalAlarmResponse, error)
|
||||
// CreateDeviceThresholdAlarm 创建一个设备阈值告警。
|
||||
CreateDeviceThresholdAlarm(ctx context.Context, req *dto.CreateDeviceThresholdAlarmDTO) error
|
||||
// UpdateDeviceThresholdAlarm 更新一个设备阈值告警。
|
||||
UpdateDeviceThresholdAlarm(ctx context.Context, taskID int, req *dto.UpdateDeviceThresholdAlarmDTO) error
|
||||
// CreateAreaThresholdAlarm 创建一个区域阈值告警。
|
||||
CreateAreaThresholdAlarm(ctx context.Context, req *dto.CreateAreaThresholdAlarmDTO) error
|
||||
// UpdateAreaThresholdAlarm 更新一个区域阈值告警。
|
||||
UpdateAreaThresholdAlarm(ctx context.Context, taskID int, req *dto.UpdateAreaThresholdAlarmDTO) error
|
||||
}
|
||||
|
||||
// thresholdAlarmService 是 ThresholdAlarmService 接口的具体实现。
|
||||
type thresholdAlarmService struct {
|
||||
ctx context.Context
|
||||
alarmService domainAlarm.AlarmService
|
||||
alarmRepo repository.AlarmRepository
|
||||
planService plan.Service
|
||||
|
||||
alarmRepo repository.AlarmRepository
|
||||
planRepo repository.PlanRepository
|
||||
deviceRepo repository.DeviceRepository
|
||||
}
|
||||
|
||||
// NewThresholdAlarmService 创建一个新的 ThresholdAlarmService 实例。
|
||||
func NewThresholdAlarmService(ctx context.Context, alarmService domainAlarm.AlarmService, alarmRepo repository.AlarmRepository) ThresholdAlarmService {
|
||||
func NewThresholdAlarmService(ctx context.Context,
|
||||
alarmService domainAlarm.AlarmService,
|
||||
planService plan.Service,
|
||||
alarmRepo repository.AlarmRepository,
|
||||
planRepo repository.PlanRepository,
|
||||
deviceRepo repository.DeviceRepository,
|
||||
) ThresholdAlarmService {
|
||||
return &thresholdAlarmService{
|
||||
ctx: ctx,
|
||||
alarmService: alarmService,
|
||||
planService: planService,
|
||||
alarmRepo: alarmRepo,
|
||||
planRepo: planRepo,
|
||||
deviceRepo: deviceRepo,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,3 +120,257 @@ func (s *thresholdAlarmService) ListHistoricalAlarms(ctx context.Context, req *d
|
||||
|
||||
return dto.NewListHistoricalAlarmResponse(alarms, total, req.Page, req.PageSize), nil
|
||||
}
|
||||
|
||||
// CreateDeviceThresholdAlarm 实现了创建一个设备阈值告警的逻辑。
|
||||
func (s *thresholdAlarmService) CreateDeviceThresholdAlarm(ctx context.Context, req *dto.CreateDeviceThresholdAlarmDTO) error {
|
||||
serviceCtx, logger := logs.Trace(ctx, s.ctx, "CreateThresholdAlarm")
|
||||
|
||||
device, err := s.deviceRepo.FindByID(serviceCtx, req.DeviceID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取设备 %v 失败: %v", req.DeviceID, err)
|
||||
}
|
||||
|
||||
plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck)
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取系统计划 %v 失败: %v", models.PlanNamePeriodicSystemHealthCheck, err)
|
||||
}
|
||||
if plan == nil {
|
||||
logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck)
|
||||
}
|
||||
|
||||
// 系统计划肯定是子任务
|
||||
for i, t := range plan.Tasks {
|
||||
switch t.Type {
|
||||
case models.TaskTypeDeviceThresholdCheck: // 检查任务是否存在
|
||||
var params task.DeviceThresholdCheckParams
|
||||
err = t.ParseParameters(¶ms)
|
||||
if err != nil {
|
||||
return fmt.Errorf("任务 %v: 解析设备阈值检查任务参数失败: %v", t.ID, err)
|
||||
}
|
||||
if params.DeviceID == req.DeviceID && params.SensorType == req.SensorType {
|
||||
return fmt.Errorf("设备 %v: 该设备已存在阈值检查任务", req.DeviceID)
|
||||
}
|
||||
case models.TaskTypeAreaCollectorThresholdCheck: // 向区域阈值检查任务过滤列表中添加该设备
|
||||
params := task.AreaThresholdCheckParams{
|
||||
ExcludeDeviceIDs: []uint{},
|
||||
}
|
||||
err = t.ParseParameters(¶ms)
|
||||
if err != nil {
|
||||
return fmt.Errorf("任务 %v: 解析区域阈值检查任务参数失败: %v", t.ID, err)
|
||||
}
|
||||
if params.AreaControllerID == device.AreaControllerID {
|
||||
has := false
|
||||
for _, d := range params.ExcludeDeviceIDs {
|
||||
if d == req.DeviceID {
|
||||
has = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !has {
|
||||
params.ExcludeDeviceIDs = append(params.ExcludeDeviceIDs, req.DeviceID)
|
||||
err = plan.Tasks[i].SaveParameters(params)
|
||||
if err != nil {
|
||||
return fmt.Errorf("任务 %v: 保存任务参数失败: %v", t.ID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
t := models.Task{
|
||||
PlanID: plan.ID,
|
||||
Name: fmt.Sprintf("设备 %v 的阈值检测任务", req.DeviceID),
|
||||
Description: fmt.Sprintf("检测该设备 %v 是否 %v %v", req.SensorType, req.Operator, req.Thresholds),
|
||||
ExecutionOrder: len(plan.Tasks) + 1,
|
||||
Type: models.TaskTypeDeviceThresholdCheck,
|
||||
}
|
||||
err = t.SaveParameters(task.DeviceThresholdCheckParams{
|
||||
DeviceID: req.DeviceID,
|
||||
SensorType: req.SensorType,
|
||||
Thresholds: req.Thresholds,
|
||||
Level: req.Level,
|
||||
Operator: req.Operator,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("保存任务参数失败: %v", err)
|
||||
}
|
||||
|
||||
plan.Tasks = append(plan.Tasks, t)
|
||||
plan.ReorderSteps()
|
||||
_, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem)
|
||||
if err != nil {
|
||||
return fmt.Errorf("更新计划失败: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateDeviceThresholdAlarm 实现了更新一个设备阈值告警的逻辑。
|
||||
func (s *thresholdAlarmService) UpdateDeviceThresholdAlarm(ctx context.Context, taskID int, req *dto.UpdateDeviceThresholdAlarmDTO) error {
|
||||
serviceCtx, logger := logs.Trace(ctx, s.ctx, "UpdateDeviceThresholdAlarm")
|
||||
|
||||
// 1. 获取系统健康检查计划
|
||||
plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck)
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取系统健康检查计划失败: %w", err)
|
||||
}
|
||||
if plan == nil {
|
||||
// 这个系统计划必须存在
|
||||
logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck)
|
||||
}
|
||||
|
||||
// 2. 遍历任务列表,查找并更新目标任务
|
||||
taskFound := false
|
||||
for i, t := range plan.Tasks {
|
||||
if t.ID == taskID && t.Type == models.TaskTypeDeviceThresholdCheck {
|
||||
taskFound = true
|
||||
|
||||
var params task.DeviceThresholdCheckParams
|
||||
if err = t.ParseParameters(¶ms); err != nil {
|
||||
return fmt.Errorf("任务 %d: 解析现有参数失败: %w", taskID, err)
|
||||
}
|
||||
params.Thresholds = req.Thresholds
|
||||
params.Operator = req.Operator
|
||||
params.Level = req.Level
|
||||
// 刷新任务说明
|
||||
plan.Tasks[i].Description = fmt.Sprintf("检测该设备 %v 是否 %v %v", params.SensorType, params.Operator, params.Thresholds)
|
||||
|
||||
err = plan.Tasks[i].SaveParameters(params)
|
||||
if err != nil {
|
||||
return fmt.Errorf("任务 %d: 保存参数失败: %w", taskID, err)
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !taskFound {
|
||||
return fmt.Errorf("任务 %d: 不存在", taskID)
|
||||
}
|
||||
|
||||
// 全量更新计划
|
||||
_, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem)
|
||||
return err
|
||||
}
|
||||
|
||||
// CreateAreaThresholdAlarm 实现了创建一个区域阈值告警的逻辑。
|
||||
func (s *thresholdAlarmService) CreateAreaThresholdAlarm(ctx context.Context, req *dto.CreateAreaThresholdAlarmDTO) error {
|
||||
serviceCtx, logger := logs.Trace(ctx, s.ctx, "CreateAreaThresholdAlarm")
|
||||
|
||||
// 1. 获取系统健康检查计划
|
||||
plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck)
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取系统健康检查计划失败: %w", err)
|
||||
}
|
||||
if plan == nil {
|
||||
logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck)
|
||||
}
|
||||
|
||||
// 2. 获取目标区域下的所有设备,并建立快速查找表
|
||||
devicesInArea, err := s.deviceRepo.ListByAreaControllerID(serviceCtx, req.AreaControllerID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取区域 %d 下的设备列表失败: %w", req.AreaControllerID, err)
|
||||
}
|
||||
devicesInAreaMap := make(map[uint]struct{}, len(devicesInArea))
|
||||
for _, device := range devicesInArea {
|
||||
devicesInAreaMap[device.ID] = struct{}{}
|
||||
}
|
||||
|
||||
// 3. 遍历计划,检查存在性并收集需要排除的设备ID
|
||||
var excludeDeviceIDs []uint
|
||||
for _, t := range plan.Tasks {
|
||||
switch t.Type {
|
||||
case models.TaskTypeAreaCollectorThresholdCheck:
|
||||
var params task.AreaThresholdCheckParams
|
||||
if err := t.ParseParameters(¶ms); err != nil {
|
||||
return fmt.Errorf("任务 %d: 解析区域阈值检查任务参数失败: %w", t.ID, err)
|
||||
}
|
||||
if params.AreaControllerID == req.AreaControllerID && params.SensorType == req.SensorType {
|
||||
return fmt.Errorf("区域 %d: 该区域已存在针对 %s 的阈值检查任务", req.AreaControllerID, req.SensorType)
|
||||
}
|
||||
case models.TaskTypeDeviceThresholdCheck:
|
||||
var params task.DeviceThresholdCheckParams
|
||||
if err := t.ParseParameters(¶ms); err != nil {
|
||||
return fmt.Errorf("任务 %d: 解析设备阈值检查任务参数失败: %w", t.ID, err)
|
||||
}
|
||||
// 检查该设备是否属于目标区域
|
||||
if _, ok := devicesInAreaMap[params.DeviceID]; ok {
|
||||
excludeDeviceIDs = append(excludeDeviceIDs, params.DeviceID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 4. 创建新任务
|
||||
newTask := models.Task{
|
||||
PlanID: plan.ID,
|
||||
Name: fmt.Sprintf("区域 %d 的 %s 阈值检测任务", req.AreaControllerID, req.SensorType),
|
||||
Description: fmt.Sprintf("检测区域 %d 的 %s 是否 %v %v", req.AreaControllerID, req.SensorType, req.Operator, req.Thresholds),
|
||||
ExecutionOrder: len(plan.Tasks) + 1,
|
||||
Type: models.TaskTypeAreaCollectorThresholdCheck,
|
||||
}
|
||||
err = newTask.SaveParameters(task.AreaThresholdCheckParams{
|
||||
AreaControllerID: req.AreaControllerID,
|
||||
SensorType: req.SensorType,
|
||||
Thresholds: req.Thresholds,
|
||||
Operator: req.Operator,
|
||||
Level: req.Level,
|
||||
ExcludeDeviceIDs: excludeDeviceIDs,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("保存新区域任务的参数失败: %w", err)
|
||||
}
|
||||
|
||||
// 5. 更新计划
|
||||
plan.Tasks = append(plan.Tasks, newTask)
|
||||
plan.ReorderSteps()
|
||||
_, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem)
|
||||
return err
|
||||
}
|
||||
|
||||
// UpdateAreaThresholdAlarm 实现了更新一个区域阈值告警的逻辑。
|
||||
func (s *thresholdAlarmService) UpdateAreaThresholdAlarm(ctx context.Context, taskID int, req *dto.UpdateAreaThresholdAlarmDTO) error {
|
||||
serviceCtx, logger := logs.Trace(ctx, s.ctx, "UpdateAreaThresholdAlarm")
|
||||
|
||||
// 1. 获取系统健康检查计划
|
||||
plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck)
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取系统健康检查计划失败: %w", err)
|
||||
}
|
||||
if plan == nil {
|
||||
logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck)
|
||||
}
|
||||
|
||||
// 2. 遍历任务列表,查找并更新目标任务
|
||||
taskFound := false
|
||||
for i, t := range plan.Tasks {
|
||||
if t.ID == taskID && t.Type == models.TaskTypeAreaCollectorThresholdCheck {
|
||||
taskFound = true
|
||||
|
||||
var params task.AreaThresholdCheckParams
|
||||
if err = t.ParseParameters(¶ms); err != nil {
|
||||
return fmt.Errorf("任务 %d: 解析现有参数失败: %w", taskID, err)
|
||||
}
|
||||
params.Thresholds = req.Thresholds
|
||||
params.Operator = req.Operator
|
||||
params.Level = req.Level
|
||||
// 刷新任务说明
|
||||
plan.Tasks[i].Description = fmt.Sprintf("检测区域 %d 的 %s 是否 %v %v", params.AreaControllerID, params.SensorType, params.Operator, params.Thresholds)
|
||||
|
||||
err = plan.Tasks[i].SaveParameters(params)
|
||||
if err != nil {
|
||||
return fmt.Errorf("任务 %d: 保存参数失败: %w", taskID, err)
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !taskFound {
|
||||
return fmt.Errorf("任务 %d: 不存在或类型不匹配", taskID)
|
||||
}
|
||||
|
||||
// 全量更新计划
|
||||
_, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem)
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user