Files
pig-farm-controller/internal/app/service/threshold_alarm_service.go
2025-11-16 16:30:26 +08:00

797 lines
29 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"context"
"fmt"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/app/dto"
domainAlarm "git.huangwc.com/pig/pig-farm-controller/internal/domain/alarm"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/plan"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
)
// ThresholdAlarmService 定义了阈值告警配置服务的接口。
// 该服务负责管理阈值告警任务的配置,并将其与计划进行联动。
type ThresholdAlarmService interface {
// SnoozeThresholdAlarm 忽略一个阈值告警,或更新其忽略时间。
SnoozeThresholdAlarm(ctx context.Context, alarmID uint32, durationMinutes uint32) error
// CancelSnoozeThresholdAlarm 取消对一个阈值告警的忽略状态。
CancelSnoozeThresholdAlarm(ctx context.Context, alarmID uint32) error
// ListActiveAlarms 批量查询活跃告警。
ListActiveAlarms(ctx context.Context, req *dto.ListActiveAlarmRequest) (*dto.ListActiveAlarmResponse, error)
// ListHistoricalAlarms 批量查询历史告警。
ListHistoricalAlarms(ctx context.Context, req *dto.ListHistoricalAlarmRequest) (*dto.ListHistoricalAlarmResponse, error)
// CreateDeviceThresholdAlarm 创建一个设备阈值告警。
CreateDeviceThresholdAlarm(ctx context.Context, req *dto.CreateDeviceThresholdAlarmDTO) error
// UpdateDeviceThresholdAlarm 更新一个设备阈值告警。
UpdateDeviceThresholdAlarm(ctx context.Context, taskID int, req *dto.UpdateDeviceThresholdAlarmDTO) error
// GetDeviceThresholdAlarm 根据ID获取一个设备阈值告警任务。
GetDeviceThresholdAlarm(ctx context.Context, taskID int) (*dto.DeviceThresholdAlarmDTO, error)
// DeleteDeviceThresholdAlarm 删除一个设备阈值告警。
DeleteDeviceThresholdAlarm(ctx context.Context, taskID int, req *dto.DeleteDeviceThresholdAlarmDTO) error
// DeleteDeviceThresholdAlarmByDeviceID 实现了根据设备ID删除一个设备下所有设备阈值告警的逻辑。
DeleteDeviceThresholdAlarmByDeviceID(ctx context.Context, deviceID uint32) error
// ListDeviceThresholdAlarms 批量查询设备阈值告警配置。
ListDeviceThresholdAlarms(ctx context.Context, req *dto.ListDeviceThresholdAlarmRequest) (*dto.ListDeviceThresholdAlarmResponse, error)
// CreateAreaThresholdAlarm 创建一个区域阈值告警。
CreateAreaThresholdAlarm(ctx context.Context, req *dto.CreateAreaThresholdAlarmDTO) error
// UpdateAreaThresholdAlarm 更新一个区域阈值告警。
UpdateAreaThresholdAlarm(ctx context.Context, taskID int, req *dto.UpdateAreaThresholdAlarmDTO) error
// GetAreaThresholdAlarm 根据ID获取一个区域阈值告警任务。
GetAreaThresholdAlarm(ctx context.Context, taskID int) (*dto.AreaThresholdAlarmDTO, error)
// DeleteAreaThresholdAlarm 实现了删除一个区域阈值告警的逻辑。
DeleteAreaThresholdAlarm(ctx context.Context, taskID int) error
// DeleteAreaThresholdAlarmByAreaControllerID 实现了根据区域ID删除一个区域下所有区域阈值告警的逻辑。
DeleteAreaThresholdAlarmByAreaControllerID(ctx context.Context, areaControllerID uint32) error
// ListAreaThresholdAlarms 批量查询区域阈值告警配置。
ListAreaThresholdAlarms(ctx context.Context, req *dto.ListAreaThresholdAlarmRequest) (*dto.ListAreaThresholdAlarmResponse, error)
}
// thresholdAlarmService 是 ThresholdAlarmService 接口的具体实现。
type thresholdAlarmService struct {
ctx context.Context
alarmService domainAlarm.AlarmService
planService plan.Service
alarmRepo repository.AlarmRepository
planRepo repository.PlanRepository
areaRepo repository.AreaControllerRepository
deviceRepo repository.DeviceRepository
}
// NewThresholdAlarmService 创建一个新的 ThresholdAlarmService 实例。
func NewThresholdAlarmService(ctx context.Context,
alarmService domainAlarm.AlarmService,
planService plan.Service,
alarmRepo repository.AlarmRepository,
planRepo repository.PlanRepository,
areaRepo repository.AreaControllerRepository,
deviceRepo repository.DeviceRepository,
) ThresholdAlarmService {
return &thresholdAlarmService{
ctx: ctx,
alarmService: alarmService,
planService: planService,
alarmRepo: alarmRepo,
planRepo: planRepo,
areaRepo: areaRepo,
deviceRepo: deviceRepo,
}
}
// SnoozeThresholdAlarm 实现了忽略阈值告警的逻辑。
func (s *thresholdAlarmService) SnoozeThresholdAlarm(ctx context.Context, alarmID uint32, durationMinutes uint32) error {
serviceCtx := logs.AddFuncName(ctx, s.ctx, "SnoozeThresholdAlarm")
return s.alarmService.SnoozeAlarm(serviceCtx, alarmID, time.Duration(durationMinutes)*time.Minute)
}
// CancelSnoozeThresholdAlarm 实现了取消忽略阈值告警的逻辑。
func (s *thresholdAlarmService) CancelSnoozeThresholdAlarm(ctx context.Context, alarmID uint32) error {
serviceCtx := logs.AddFuncName(ctx, s.ctx, "CancelSnoozeThresholdAlarm")
return s.alarmService.CancelAlarmSnooze(serviceCtx, alarmID)
}
// ListActiveAlarms 实现了批量查询活跃告警的逻辑。
func (s *thresholdAlarmService) ListActiveAlarms(ctx context.Context, req *dto.ListActiveAlarmRequest) (*dto.ListActiveAlarmResponse, error) {
serviceCtx := logs.AddFuncName(ctx, s.ctx, "ListActiveAlarms")
opts := repository.ActiveAlarmListOptions{
SourceType: req.SourceType,
SourceID: req.SourceID,
Level: req.Level,
IsIgnored: req.IsIgnored,
TriggerTime: req.TriggerTime,
EndTime: req.EndTime,
OrderBy: req.OrderBy,
}
alarms, total, err := s.alarmRepo.ListActiveAlarms(serviceCtx, opts, req.Page, req.PageSize)
if err != nil {
return nil, err
}
return dto.NewListActiveAlarmResponse(alarms, total, req.Page, req.PageSize), nil
}
// ListHistoricalAlarms 实现了批量查询历史告警的逻辑。
func (s *thresholdAlarmService) ListHistoricalAlarms(ctx context.Context, req *dto.ListHistoricalAlarmRequest) (*dto.ListHistoricalAlarmResponse, error) {
serviceCtx := logs.AddFuncName(ctx, s.ctx, "ListHistoricalAlarms")
opts := repository.HistoricalAlarmListOptions{
SourceType: req.SourceType,
SourceID: req.SourceID,
Level: req.Level,
TriggerTimeStart: req.TriggerTimeStart,
TriggerTimeEnd: req.TriggerTimeEnd,
ResolveTimeStart: req.ResolveTimeStart,
ResolveTimeEnd: req.ResolveTimeEnd,
OrderBy: req.OrderBy,
}
alarms, total, err := s.alarmRepo.ListHistoricalAlarms(serviceCtx, opts, req.Page, req.PageSize)
if err != nil {
return nil, err
}
return dto.NewListHistoricalAlarmResponse(alarms, total, req.Page, req.PageSize), nil
}
// CreateDeviceThresholdAlarm 实现了创建一个设备阈值告警的逻辑。
func (s *thresholdAlarmService) CreateDeviceThresholdAlarm(ctx context.Context, req *dto.CreateDeviceThresholdAlarmDTO) error {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "CreateThresholdAlarm")
device, err := s.deviceRepo.FindByID(serviceCtx, req.DeviceID)
if err != nil {
return fmt.Errorf("获取设备 %v 失败: %v", req.DeviceID, err)
}
plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck)
if err != nil {
return fmt.Errorf("获取系统计划 %v 失败: %v", models.PlanNamePeriodicSystemHealthCheck, err)
}
if plan == nil {
logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck)
}
// 系统计划肯定是子任务
for i, t := range plan.Tasks {
switch t.Type {
case models.TaskTypeDeviceThresholdCheck: // 检查任务是否存在
var params task.DeviceThresholdCheckParams
err = t.ParseParameters(&params)
if err != nil {
return fmt.Errorf("任务 %v: 解析设备阈值检查任务参数失败: %v", t.ID, err)
}
if params.DeviceID == req.DeviceID && params.SensorType == req.SensorType {
return fmt.Errorf("设备 %v: 该设备已存在阈值检查任务", req.DeviceID)
}
case models.TaskTypeAreaCollectorThresholdCheck: // 向区域阈值检查任务过滤列表中添加该设备
params := task.AreaThresholdCheckParams{
ExcludeDeviceIDs: []uint32{},
}
err = t.ParseParameters(&params)
if err != nil {
return fmt.Errorf("任务 %v: 解析区域阈值检查任务参数失败: %v", t.ID, err)
}
if params.AreaControllerID == device.AreaControllerID {
has := false
for _, d := range params.ExcludeDeviceIDs {
if d == req.DeviceID {
has = true
break
}
}
if !has {
params.ExcludeDeviceIDs = append(params.ExcludeDeviceIDs, req.DeviceID)
err = plan.Tasks[i].SaveParameters(params)
if err != nil {
return fmt.Errorf("任务 %v: 保存任务参数失败: %v", t.ID, err)
}
}
}
default:
continue
}
}
t := models.Task{
PlanID: plan.ID,
Name: fmt.Sprintf("设备 %v 的阈值检测任务", req.DeviceID),
Description: fmt.Sprintf("检测该设备 %v 是否 %v %v", req.SensorType, req.Operator, req.Thresholds),
ExecutionOrder: len(plan.Tasks) + 1,
Type: models.TaskTypeDeviceThresholdCheck,
}
err = t.SaveParameters(task.DeviceThresholdCheckParams{
DeviceID: req.DeviceID,
SensorType: req.SensorType,
Thresholds: req.Thresholds,
Level: req.Level,
Operator: req.Operator,
})
if err != nil {
return fmt.Errorf("保存任务参数失败: %v", err)
}
plan.Tasks = append(plan.Tasks, t)
plan.ReorderSteps()
_, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem)
if err != nil {
return fmt.Errorf("更新计划失败: %v", err)
}
return nil
}
// UpdateDeviceThresholdAlarm 实现了更新一个设备阈值告警的逻辑。
func (s *thresholdAlarmService) UpdateDeviceThresholdAlarm(ctx context.Context, taskID int, req *dto.UpdateDeviceThresholdAlarmDTO) error {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "UpdateDeviceThresholdAlarm")
// 1. 获取系统健康检查计划
plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck)
if err != nil {
return fmt.Errorf("获取系统健康检查计划失败: %w", err)
}
if plan == nil {
// 这个系统计划必须存在
logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck)
}
// 2. 遍历任务列表,查找并更新目标任务
taskFound := false
for i, t := range plan.Tasks {
if t.ID == taskID && t.Type == models.TaskTypeDeviceThresholdCheck {
taskFound = true
var params task.DeviceThresholdCheckParams
if err = t.ParseParameters(&params); err != nil {
return fmt.Errorf("任务 %d: 解析现有参数失败: %w", taskID, err)
}
params.Thresholds = req.Thresholds
params.Operator = req.Operator
params.Level = req.Level
// 刷新任务说明
plan.Tasks[i].Description = fmt.Sprintf("检测该设备 %v 是否 %v %v", params.SensorType, params.Operator, params.Thresholds)
err = plan.Tasks[i].SaveParameters(params)
if err != nil {
return fmt.Errorf("任务 %d: 保存参数失败: %w", taskID, err)
}
break
}
}
if !taskFound {
return fmt.Errorf("任务 %d: 不存在", taskID)
}
// 全量更新计划
_, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem)
return err
}
// GetDeviceThresholdAlarm 实现了根据ID获取一个设备阈值告警任务的逻辑。
func (s *thresholdAlarmService) GetDeviceThresholdAlarm(ctx context.Context, taskID int) (*dto.DeviceThresholdAlarmDTO, error) {
serviceCtx := logs.AddFuncName(ctx, s.ctx, "GetDeviceThresholdAlarm")
// 1. 使用 planRepo 查询任务
t, err := s.planRepo.FindTaskByID(serviceCtx, taskID)
if err != nil {
return nil, err // 如果未找到或发生其他错误,直接返回
}
// 2. 验证任务类型是否正确
if t.Type != models.TaskTypeDeviceThresholdCheck {
return nil, fmt.Errorf("任务 %d 不是一个设备阈值检查任务", taskID)
}
var params task.DeviceThresholdCheckParams
err = t.ParseParameters(&params)
if err != nil {
return nil, fmt.Errorf("任务 %d: 解析参数失败: %w", taskID, err)
}
resp := &dto.DeviceThresholdAlarmDTO{
ID: t.ID,
DeviceID: params.DeviceID,
SensorType: params.SensorType,
Thresholds: params.Thresholds,
Operator: params.Operator,
Level: params.Level,
}
return resp, nil
}
// DeleteDeviceThresholdAlarm 实现了删除一个设备阈值告警的逻辑。
func (s *thresholdAlarmService) DeleteDeviceThresholdAlarm(ctx context.Context, taskID int, req *dto.DeleteDeviceThresholdAlarmDTO) error {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "DeleteDeviceThresholdAlarm")
// 获取待删除任务并校验
deleteTask, err := s.planRepo.FindTaskByID(serviceCtx, taskID)
if err != nil {
return fmt.Errorf("获取任务失败: %w", err)
}
if deleteTask.Type != models.TaskTypeDeviceThresholdCheck {
return fmt.Errorf("任务 %d 不是一个设备阈值检查任务", taskID)
}
var deviceParams task.DeviceThresholdCheckParams
if err := deleteTask.ParseParameters(&deviceParams); err != nil {
return fmt.Errorf("任务 %d: 解析参数失败: %w", taskID, err)
}
// 获得任务对应设备对应区域主控
device, err := s.deviceRepo.FindByID(serviceCtx, deviceParams.DeviceID)
if err != nil {
return fmt.Errorf("获取设备 %d 失败: %w", deviceParams.DeviceID, err)
}
area, err := s.areaRepo.FindByID(serviceCtx, device.AreaControllerID)
if err != nil {
return fmt.Errorf("获取区域 %d 失败: %w", device.AreaControllerID, err)
}
// 获取健康检查计划任务列表
plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck)
if err != nil {
return fmt.Errorf("获取系统健康检查计划失败: %w", err)
}
if plan == nil {
logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck)
}
taskIndexToDelete := -1
for i, t := range plan.Tasks {
if t.ID == taskID {
taskIndexToDelete = i
}
if t.Type == models.TaskTypeAreaCollectorThresholdCheck {
var areaParams task.AreaThresholdCheckParams
if err := t.ParseParameters(&areaParams); err != nil {
return fmt.Errorf("任务 %d: 解析参数失败: %w", t.ID, err)
}
if areaParams.AreaControllerID == area.ID && areaParams.SensorType == deviceParams.SensorType {
for ia, e := range areaParams.ExcludeDeviceIDs {
if e == deviceParams.DeviceID {
areaParams.ExcludeDeviceIDs = append(areaParams.ExcludeDeviceIDs[:ia], areaParams.ExcludeDeviceIDs[ia+1:]...)
continue
}
}
err = plan.Tasks[i].SaveParameters(areaParams)
if err != nil {
return fmt.Errorf("任务 %d: 保存参数失败: %w", t.ID, err)
}
}
}
}
if taskIndexToDelete == -1 {
return fmt.Errorf("任务 %d 在系统计划中未找到", taskID)
}
plan.Tasks = append(plan.Tasks[:taskIndexToDelete], plan.Tasks[taskIndexToDelete+1:]...)
plan.ReorderSteps()
_, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem)
return err
}
// DeleteDeviceThresholdAlarmByDeviceID 实现了根据设备ID删除一个设备下所有设备阈值告警的逻辑。
func (s *thresholdAlarmService) DeleteDeviceThresholdAlarmByDeviceID(ctx context.Context, deviceID uint32) error {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "DeleteDeviceThresholdAlarmByDeviceID")
tasks, err := s.planRepo.ListTasksByDeviceID(serviceCtx, deviceID)
if err != nil {
return fmt.Errorf("获取任务列表失败: %w", err)
}
device, err := s.deviceRepo.FindByID(serviceCtx, deviceID)
if err != nil {
return fmt.Errorf("获取设备 %d 失败: %w", deviceID, err)
}
plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck)
if err != nil {
return fmt.Errorf("获取系统计划失败: %w", err)
}
if plan == nil {
logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck)
}
deleteNums := []int{}
for i, t := range plan.Tasks {
for _, dt := range tasks {
if t.ID == dt.ID && t.Type == models.TaskTypeDeviceThresholdCheck {
deleteNums = append(deleteNums, i)
break
}
}
if t.Type == models.TaskTypeAreaCollectorThresholdCheck {
var areaParams task.AreaThresholdCheckParams
if err := t.ParseParameters(&areaParams); err != nil {
return fmt.Errorf("任务 %d: 解析参数失败: %w", t.ID, err)
}
if areaParams.AreaControllerID == device.AreaControllerID {
for ai, ae := range areaParams.ExcludeDeviceIDs {
if ae == deviceID {
areaParams.ExcludeDeviceIDs = append(areaParams.ExcludeDeviceIDs[:ai], areaParams.ExcludeDeviceIDs[ai+1:]...)
break
}
}
err = plan.Tasks[i].SaveParameters(areaParams)
if err != nil {
return fmt.Errorf("任务 %d: 保存参数失败: %w", t.ID, err)
}
}
}
}
// 为了高效地判断一个索引是否需要被删除,先将 deleteNums 转换为一个 map。
deleteMap := make(map[int]struct{}, len(deleteNums))
for _, index := range deleteNums {
deleteMap[index] = struct{}{}
}
// 创建一个新的任务切片,只包含不需要删除的任务。
newTasks := make([]models.Task, 0, len(plan.Tasks)-len(deleteNums))
for i, t := range plan.Tasks {
if _, found := deleteMap[i]; !found {
newTasks = append(newTasks, t)
}
}
plan.Tasks = newTasks
// 重新排序任务并更新计划
plan.ReorderSteps()
_, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem)
return err
}
// ListDeviceThresholdAlarms 实现了批量查询设备阈值告警配置的逻辑。
func (s *thresholdAlarmService) ListDeviceThresholdAlarms(ctx context.Context, req *dto.ListDeviceThresholdAlarmRequest) (*dto.ListDeviceThresholdAlarmResponse, error) {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "ListDeviceThresholdAlarms")
// 1. 准备调用 planRepo.ListTasks 的选项
taskType := models.TaskTypeDeviceThresholdCheck
opts := repository.TaskListOptions{
Type: &taskType,
DeviceID: req.DeviceID,
SensorType: req.SensorType,
Level: req.Level,
OrderBy: req.OrderBy,
}
// 2. 调用底层的 ListTasks 方法
tasks, total, err := s.planRepo.ListTasks(serviceCtx, opts, req.Page, req.PageSize)
if err != nil {
return nil, fmt.Errorf("查询设备阈值告警任务失败: %w", err)
}
// 3. 将查询到的 models.Task 列表转换为 dto.DeviceThresholdAlarmDTO 列表
alarmDTOs := make([]dto.DeviceThresholdAlarmDTO, 0, len(tasks))
for _, t := range tasks {
var params task.DeviceThresholdCheckParams
if err := t.ParseParameters(&params); err != nil {
logger.Warnf("解析任务 %d 的参数失败: %v已在列表中跳过", t.ID, err)
continue
}
alarmDTOs = append(alarmDTOs, dto.DeviceThresholdAlarmDTO{
ID: t.ID,
DeviceID: params.DeviceID,
SensorType: params.SensorType,
Thresholds: params.Thresholds,
Operator: params.Operator,
Level: params.Level,
})
}
// 4. 构建并返回响应
response := &dto.ListDeviceThresholdAlarmResponse{
List: alarmDTOs,
Pagination: dto.PaginationDTO{
Total: total,
Page: req.Page,
PageSize: req.PageSize,
},
}
return response, nil
}
// ListAreaThresholdAlarms 实现了批量查询区域阈值告警配置的逻辑。
func (s *thresholdAlarmService) ListAreaThresholdAlarms(ctx context.Context, req *dto.ListAreaThresholdAlarmRequest) (*dto.ListAreaThresholdAlarmResponse, error) {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "ListAreaThresholdAlarms")
// 1. 准备调用 planRepo.ListTasks 的选项
taskType := models.TaskTypeAreaCollectorThresholdCheck
opts := repository.TaskListOptions{
Type: &taskType,
AreaControllerID: req.AreaControllerID,
SensorType: req.SensorType,
Level: req.Level,
OrderBy: req.OrderBy,
}
// 2. 调用底层的 ListTasks 方法
tasks, total, err := s.planRepo.ListTasks(serviceCtx, opts, req.Page, req.PageSize)
if err != nil {
return nil, fmt.Errorf("查询区域阈值告警任务失败: %w", err)
}
// 3. 将查询到的 models.Task 列表转换为 dto.AreaThresholdAlarmDTO 列表
alarmDTOs := make([]dto.AreaThresholdAlarmDTO, 0, len(tasks))
for _, t := range tasks {
var params task.AreaThresholdCheckParams
if err := t.ParseParameters(&params); err != nil {
logger.Warnf("解析区域任务 %d 的参数失败: %v已在列表中跳过", t.ID, err)
continue
}
alarmDTOs = append(alarmDTOs, dto.AreaThresholdAlarmDTO{
ID: t.ID,
AreaControllerID: params.AreaControllerID,
SensorType: params.SensorType,
Thresholds: params.Thresholds,
Operator: params.Operator,
Level: params.Level,
})
}
// 4. 构建并返回响应
response := &dto.ListAreaThresholdAlarmResponse{
List: alarmDTOs,
Pagination: dto.PaginationDTO{
Total: total,
Page: req.Page,
PageSize: req.PageSize,
},
}
return response, nil
}
// CreateAreaThresholdAlarm 实现了创建一个区域阈值告警的逻辑。
func (s *thresholdAlarmService) CreateAreaThresholdAlarm(ctx context.Context, req *dto.CreateAreaThresholdAlarmDTO) error {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "CreateAreaThresholdAlarm")
// 1. 获取系统健康检查计划
plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck)
if err != nil {
return fmt.Errorf("获取系统健康检查计划失败: %w", err)
}
if plan == nil {
logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck)
}
// 2. 获取目标区域下的所有设备,并建立快速查找表
devicesInArea, err := s.deviceRepo.ListByAreaControllerID(serviceCtx, req.AreaControllerID)
if err != nil {
return fmt.Errorf("获取区域 %d 下的设备列表失败: %w", req.AreaControllerID, err)
}
devicesInAreaMap := make(map[uint32]struct{}, len(devicesInArea))
for _, device := range devicesInArea {
devicesInAreaMap[device.ID] = struct{}{}
}
// 3. 遍历计划检查存在性并收集需要排除的设备ID
var excludeDeviceIDs []uint32
for _, t := range plan.Tasks {
switch t.Type {
case models.TaskTypeAreaCollectorThresholdCheck:
var params task.AreaThresholdCheckParams
if err := t.ParseParameters(&params); err != nil {
return fmt.Errorf("任务 %d: 解析区域阈值检查任务参数失败: %w", t.ID, err)
}
if params.AreaControllerID == req.AreaControllerID && params.SensorType == req.SensorType {
return fmt.Errorf("区域 %d: 该区域已存在针对 %s 的阈值检查任务", req.AreaControllerID, req.SensorType)
}
case models.TaskTypeDeviceThresholdCheck:
var params task.DeviceThresholdCheckParams
if err := t.ParseParameters(&params); err != nil {
return fmt.Errorf("任务 %d: 解析设备阈值检查任务参数失败: %w", t.ID, err)
}
// 检查该设备是否属于目标区域
if _, ok := devicesInAreaMap[params.DeviceID]; ok {
excludeDeviceIDs = append(excludeDeviceIDs, params.DeviceID)
}
}
}
// 4. 创建新任务
newTask := models.Task{
PlanID: plan.ID,
Name: fmt.Sprintf("区域 %d 的 %s 阈值检测任务", req.AreaControllerID, req.SensorType),
Description: fmt.Sprintf("检测区域 %d 的 %s 是否 %v %v", req.AreaControllerID, req.SensorType, req.Operator, req.Thresholds),
ExecutionOrder: len(plan.Tasks) + 1,
Type: models.TaskTypeAreaCollectorThresholdCheck,
}
err = newTask.SaveParameters(task.AreaThresholdCheckParams{
AreaControllerID: req.AreaControllerID,
SensorType: req.SensorType,
Thresholds: req.Thresholds,
Operator: req.Operator,
Level: req.Level,
ExcludeDeviceIDs: excludeDeviceIDs,
})
if err != nil {
return fmt.Errorf("保存新区域任务的参数失败: %w", err)
}
// 5. 更新计划
plan.Tasks = append(plan.Tasks, newTask)
plan.ReorderSteps()
_, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem)
return err
}
// UpdateAreaThresholdAlarm 实现了更新一个区域阈值告警的逻辑。
func (s *thresholdAlarmService) UpdateAreaThresholdAlarm(ctx context.Context, taskID int, req *dto.UpdateAreaThresholdAlarmDTO) error {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "UpdateAreaThresholdAlarm")
// 1. 获取系统健康检查计划
plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck)
if err != nil {
return fmt.Errorf("获取系统健康检查计划失败: %w", err)
}
if plan == nil {
logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck)
}
// 2. 遍历任务列表,查找并更新目标任务
taskFound := false
for i, t := range plan.Tasks {
if t.ID == taskID && t.Type == models.TaskTypeAreaCollectorThresholdCheck {
taskFound = true
var params task.AreaThresholdCheckParams
if err = t.ParseParameters(&params); err != nil {
return fmt.Errorf("任务 %d: 解析现有参数失败: %w", taskID, err)
}
params.Thresholds = req.Thresholds
params.Operator = req.Operator
params.Level = req.Level
// 刷新任务说明
plan.Tasks[i].Description = fmt.Sprintf("检测区域 %d 的 %s 是否 %v %v", params.AreaControllerID, params.SensorType, params.Operator, params.Thresholds)
err = plan.Tasks[i].SaveParameters(params)
if err != nil {
return fmt.Errorf("任务 %d: 保存参数失败: %w", taskID, err)
}
break
}
}
if !taskFound {
return fmt.Errorf("任务 %d: 不存在或类型不匹配", taskID)
}
// 全量更新计划
_, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem)
return err
}
// GetAreaThresholdAlarm 实现了根据ID获取一个区域阈值告警任务的逻辑。
func (s *thresholdAlarmService) GetAreaThresholdAlarm(ctx context.Context, taskID int) (*dto.AreaThresholdAlarmDTO, error) {
serviceCtx := logs.AddFuncName(ctx, s.ctx, "GetAreaThresholdAlarm")
// 1. 使用 planRepo 查询任务
t, err := s.planRepo.FindTaskByID(serviceCtx, taskID)
if err != nil {
return nil, err // 如果未找到或发生其他错误,直接返回
}
// 2. 验证任务类型是否正确
if t.Type != models.TaskTypeAreaCollectorThresholdCheck {
return nil, fmt.Errorf("任务 %d 不是一个区域阈值检查任务", taskID)
}
var params task.AreaThresholdCheckParams
err = t.ParseParameters(&params)
if err != nil {
return nil, fmt.Errorf("任务 %d: 解析参数失败: %w", taskID, err)
}
resp := &dto.AreaThresholdAlarmDTO{
ID: t.ID,
AreaControllerID: params.AreaControllerID,
SensorType: params.SensorType,
Thresholds: params.Thresholds,
Operator: params.Operator,
Level: params.Level,
}
return resp, nil
}
// DeleteAreaThresholdAlarm 实现了删除一个区域阈值告警的逻辑。
func (s *thresholdAlarmService) DeleteAreaThresholdAlarm(ctx context.Context, taskID int) error {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "DeleteAreaThresholdAlarm")
// 获取健康检查计划任务列表
plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck)
if err != nil {
return fmt.Errorf("获取系统健康检查计划失败: %w", err)
}
if plan == nil {
logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck)
}
// 找到这个任务并删掉
deleteTaskNum := -1
for i, t := range plan.Tasks {
if t.Type != models.TaskTypeAreaCollectorThresholdCheck {
continue
}
if t.ID != taskID {
continue
}
deleteTaskNum = i
}
if deleteTaskNum == -1 {
return fmt.Errorf("任务 %d: 不存在或类型不匹配", taskID)
}
plan.Tasks = append(plan.Tasks[:deleteTaskNum], plan.Tasks[deleteTaskNum+1:]...)
plan.ReorderSteps()
_, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem)
return err
}
// DeleteAreaThresholdAlarmByAreaControllerID 实现了根据区域ID删除一个区域下所有区域阈值告警的逻辑。
func (s *thresholdAlarmService) DeleteAreaThresholdAlarmByAreaControllerID(ctx context.Context, areaControllerID uint32) error {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "DeleteAreaThresholdAlarmByAreaControllerID")
// 1. 获取系统健康检查计划
plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck)
if err != nil {
return fmt.Errorf("获取系统健康检查计划失败: %w", err)
}
if plan == nil {
logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck)
}
// 2. 收集所有与指定 areaControllerID 相关的区域阈值告警任务的索引
var deleteIndices []int
for i, t := range plan.Tasks {
// 只关心区域阈值检查任务
if t.Type != models.TaskTypeAreaCollectorThresholdCheck {
continue
}
var params task.AreaThresholdCheckParams
if err := t.ParseParameters(&params); err != nil {
return fmt.Errorf("任务 %d: 解析参数失败: %w", t.ID, err)
}
// 如果 AreaControllerID 匹配,则记录其索引以待删除
if params.AreaControllerID == areaControllerID {
deleteIndices = append(deleteIndices, i)
}
}
// 如果没有找到要删除的任务,则直接返回
if len(deleteIndices) == 0 {
return nil
}
// 3. 使用 map 和新切片的方式安全地删除多个任务
deleteMap := make(map[int]struct{}, len(deleteIndices))
for _, index := range deleteIndices {
deleteMap[index] = struct{}{}
}
newTasks := make([]models.Task, 0, len(plan.Tasks)-len(deleteMap))
for i, t := range plan.Tasks {
if _, found := deleteMap[i]; !found {
newTasks = append(newTasks, t)
}
}
plan.Tasks = newTasks
// 4. 重新排序任务并更新计划
plan.ReorderSteps()
_, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem)
return err
}