Files
pig-farm-controller/internal/app/service/threshold_alarm_service.go
huang cb075c907d DeleteDeviceThresholdAlarm
DeleteAreaThresholdAlarm
2025-11-10 18:22:00 +08:00

558 lines
21 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"context"
"fmt"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/app/dto"
domainAlarm "git.huangwc.com/pig/pig-farm-controller/internal/domain/alarm"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/plan"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
)
// ThresholdAlarmService 定义了阈值告警配置服务的接口。
// 该服务负责管理阈值告警任务的配置,并将其与计划进行联动。
type ThresholdAlarmService interface {
// SnoozeThresholdAlarm 忽略一个阈值告警,或更新其忽略时间。
SnoozeThresholdAlarm(ctx context.Context, alarmID uint, durationMinutes uint) error
// CancelSnoozeThresholdAlarm 取消对一个阈值告警的忽略状态。
CancelSnoozeThresholdAlarm(ctx context.Context, alarmID uint) error
// ListActiveAlarms 批量查询活跃告警。
ListActiveAlarms(ctx context.Context, req *dto.ListActiveAlarmRequest) (*dto.ListActiveAlarmResponse, error)
// ListHistoricalAlarms 批量查询历史告警。
ListHistoricalAlarms(ctx context.Context, req *dto.ListHistoricalAlarmRequest) (*dto.ListHistoricalAlarmResponse, error)
// CreateDeviceThresholdAlarm 创建一个设备阈值告警。
CreateDeviceThresholdAlarm(ctx context.Context, req *dto.CreateDeviceThresholdAlarmDTO) error
// UpdateDeviceThresholdAlarm 更新一个设备阈值告警。
UpdateDeviceThresholdAlarm(ctx context.Context, taskID int, req *dto.UpdateDeviceThresholdAlarmDTO) error
// GetDeviceThresholdAlarm 根据ID获取一个设备阈值告警任务。
GetDeviceThresholdAlarm(ctx context.Context, taskID int) (*dto.DeviceThresholdAlarmDTO, error)
// DeleteDeviceThresholdAlarm 删除一个设备阈值告警。
DeleteDeviceThresholdAlarm(ctx context.Context, taskID int, req *dto.DeleteDeviceThresholdAlarmDTO) error
// CreateAreaThresholdAlarm 创建一个区域阈值告警。
CreateAreaThresholdAlarm(ctx context.Context, req *dto.CreateAreaThresholdAlarmDTO) error
// UpdateAreaThresholdAlarm 更新一个区域阈值告警。
UpdateAreaThresholdAlarm(ctx context.Context, taskID int, req *dto.UpdateAreaThresholdAlarmDTO) error
// GetAreaThresholdAlarm 根据ID获取一个区域阈值告警任务。
GetAreaThresholdAlarm(ctx context.Context, taskID int) (*dto.AreaThresholdAlarmDTO, error)
// DeleteAreaThresholdAlarm 实现了删除一个区域阈值告警的逻辑。
DeleteAreaThresholdAlarm(ctx context.Context, taskID int) error
}
// thresholdAlarmService 是 ThresholdAlarmService 接口的具体实现。
type thresholdAlarmService struct {
ctx context.Context
alarmService domainAlarm.AlarmService
planService plan.Service
alarmRepo repository.AlarmRepository
planRepo repository.PlanRepository
areaRepo repository.AreaControllerRepository
deviceRepo repository.DeviceRepository
}
// NewThresholdAlarmService 创建一个新的 ThresholdAlarmService 实例。
func NewThresholdAlarmService(ctx context.Context,
alarmService domainAlarm.AlarmService,
planService plan.Service,
alarmRepo repository.AlarmRepository,
planRepo repository.PlanRepository,
areaRepo repository.AreaControllerRepository,
deviceRepo repository.DeviceRepository,
) ThresholdAlarmService {
return &thresholdAlarmService{
ctx: ctx,
alarmService: alarmService,
planService: planService,
alarmRepo: alarmRepo,
planRepo: planRepo,
areaRepo: areaRepo,
deviceRepo: deviceRepo,
}
}
// SnoozeThresholdAlarm 实现了忽略阈值告警的逻辑。
func (s *thresholdAlarmService) SnoozeThresholdAlarm(ctx context.Context, alarmID uint, durationMinutes uint) error {
serviceCtx := logs.AddFuncName(ctx, s.ctx, "SnoozeThresholdAlarm")
return s.alarmService.SnoozeAlarm(serviceCtx, alarmID, time.Duration(durationMinutes)*time.Minute)
}
// CancelSnoozeThresholdAlarm 实现了取消忽略阈值告警的逻辑。
func (s *thresholdAlarmService) CancelSnoozeThresholdAlarm(ctx context.Context, alarmID uint) error {
serviceCtx := logs.AddFuncName(ctx, s.ctx, "CancelSnoozeThresholdAlarm")
return s.alarmService.CancelAlarmSnooze(serviceCtx, alarmID)
}
// ListActiveAlarms 实现了批量查询活跃告警的逻辑。
func (s *thresholdAlarmService) ListActiveAlarms(ctx context.Context, req *dto.ListActiveAlarmRequest) (*dto.ListActiveAlarmResponse, error) {
serviceCtx := logs.AddFuncName(ctx, s.ctx, "ListActiveAlarms")
opts := repository.ActiveAlarmListOptions{
SourceType: req.SourceType,
SourceID: req.SourceID,
Level: req.Level,
IsIgnored: req.IsIgnored,
TriggerTime: req.TriggerTime,
EndTime: req.EndTime,
OrderBy: req.OrderBy,
}
alarms, total, err := s.alarmRepo.ListActiveAlarms(serviceCtx, opts, req.Page, req.PageSize)
if err != nil {
return nil, err
}
return dto.NewListActiveAlarmResponse(alarms, total, req.Page, req.PageSize), nil
}
// ListHistoricalAlarms 实现了批量查询历史告警的逻辑。
func (s *thresholdAlarmService) ListHistoricalAlarms(ctx context.Context, req *dto.ListHistoricalAlarmRequest) (*dto.ListHistoricalAlarmResponse, error) {
serviceCtx := logs.AddFuncName(ctx, s.ctx, "ListHistoricalAlarms")
opts := repository.HistoricalAlarmListOptions{
SourceType: req.SourceType,
SourceID: req.SourceID,
Level: req.Level,
TriggerTimeStart: req.TriggerTimeStart,
TriggerTimeEnd: req.TriggerTimeEnd,
ResolveTimeStart: req.ResolveTimeStart,
ResolveTimeEnd: req.ResolveTimeEnd,
OrderBy: req.OrderBy,
}
alarms, total, err := s.alarmRepo.ListHistoricalAlarms(serviceCtx, opts, req.Page, req.PageSize)
if err != nil {
return nil, err
}
return dto.NewListHistoricalAlarmResponse(alarms, total, req.Page, req.PageSize), nil
}
// CreateDeviceThresholdAlarm 实现了创建一个设备阈值告警的逻辑。
func (s *thresholdAlarmService) CreateDeviceThresholdAlarm(ctx context.Context, req *dto.CreateDeviceThresholdAlarmDTO) error {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "CreateThresholdAlarm")
device, err := s.deviceRepo.FindByID(serviceCtx, req.DeviceID)
if err != nil {
return fmt.Errorf("获取设备 %v 失败: %v", req.DeviceID, err)
}
plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck)
if err != nil {
return fmt.Errorf("获取系统计划 %v 失败: %v", models.PlanNamePeriodicSystemHealthCheck, err)
}
if plan == nil {
logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck)
}
// 系统计划肯定是子任务
for i, t := range plan.Tasks {
switch t.Type {
case models.TaskTypeDeviceThresholdCheck: // 检查任务是否存在
var params task.DeviceThresholdCheckParams
err = t.ParseParameters(&params)
if err != nil {
return fmt.Errorf("任务 %v: 解析设备阈值检查任务参数失败: %v", t.ID, err)
}
if params.DeviceID == req.DeviceID && params.SensorType == req.SensorType {
return fmt.Errorf("设备 %v: 该设备已存在阈值检查任务", req.DeviceID)
}
case models.TaskTypeAreaCollectorThresholdCheck: // 向区域阈值检查任务过滤列表中添加该设备
params := task.AreaThresholdCheckParams{
ExcludeDeviceIDs: []uint{},
}
err = t.ParseParameters(&params)
if err != nil {
return fmt.Errorf("任务 %v: 解析区域阈值检查任务参数失败: %v", t.ID, err)
}
if params.AreaControllerID == device.AreaControllerID {
has := false
for _, d := range params.ExcludeDeviceIDs {
if d == req.DeviceID {
has = true
break
}
}
if !has {
params.ExcludeDeviceIDs = append(params.ExcludeDeviceIDs, req.DeviceID)
err = plan.Tasks[i].SaveParameters(params)
if err != nil {
return fmt.Errorf("任务 %v: 保存任务参数失败: %v", t.ID, err)
}
}
}
default:
continue
}
}
t := models.Task{
PlanID: plan.ID,
Name: fmt.Sprintf("设备 %v 的阈值检测任务", req.DeviceID),
Description: fmt.Sprintf("检测该设备 %v 是否 %v %v", req.SensorType, req.Operator, req.Thresholds),
ExecutionOrder: len(plan.Tasks) + 1,
Type: models.TaskTypeDeviceThresholdCheck,
}
err = t.SaveParameters(task.DeviceThresholdCheckParams{
DeviceID: req.DeviceID,
SensorType: req.SensorType,
Thresholds: req.Thresholds,
Level: req.Level,
Operator: req.Operator,
})
if err != nil {
return fmt.Errorf("保存任务参数失败: %v", err)
}
plan.Tasks = append(plan.Tasks, t)
plan.ReorderSteps()
_, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem)
if err != nil {
return fmt.Errorf("更新计划失败: %v", err)
}
return nil
}
// UpdateDeviceThresholdAlarm 实现了更新一个设备阈值告警的逻辑。
func (s *thresholdAlarmService) UpdateDeviceThresholdAlarm(ctx context.Context, taskID int, req *dto.UpdateDeviceThresholdAlarmDTO) error {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "UpdateDeviceThresholdAlarm")
// 1. 获取系统健康检查计划
plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck)
if err != nil {
return fmt.Errorf("获取系统健康检查计划失败: %w", err)
}
if plan == nil {
// 这个系统计划必须存在
logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck)
}
// 2. 遍历任务列表,查找并更新目标任务
taskFound := false
for i, t := range plan.Tasks {
if t.ID == taskID && t.Type == models.TaskTypeDeviceThresholdCheck {
taskFound = true
var params task.DeviceThresholdCheckParams
if err = t.ParseParameters(&params); err != nil {
return fmt.Errorf("任务 %d: 解析现有参数失败: %w", taskID, err)
}
params.Thresholds = req.Thresholds
params.Operator = req.Operator
params.Level = req.Level
// 刷新任务说明
plan.Tasks[i].Description = fmt.Sprintf("检测该设备 %v 是否 %v %v", params.SensorType, params.Operator, params.Thresholds)
err = plan.Tasks[i].SaveParameters(params)
if err != nil {
return fmt.Errorf("任务 %d: 保存参数失败: %w", taskID, err)
}
break
}
}
if !taskFound {
return fmt.Errorf("任务 %d: 不存在", taskID)
}
// 全量更新计划
_, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem)
return err
}
// GetDeviceThresholdAlarm 实现了根据ID获取一个设备阈值告警任务的逻辑。
func (s *thresholdAlarmService) GetDeviceThresholdAlarm(ctx context.Context, taskID int) (*dto.DeviceThresholdAlarmDTO, error) {
serviceCtx := logs.AddFuncName(ctx, s.ctx, "GetDeviceThresholdAlarm")
// 1. 使用 planRepo 查询任务
t, err := s.planRepo.FindTaskByID(serviceCtx, taskID)
if err != nil {
return nil, err // 如果未找到或发生其他错误,直接返回
}
// 2. 验证任务类型是否正确
if t.Type != models.TaskTypeDeviceThresholdCheck {
return nil, fmt.Errorf("任务 %d 不是一个设备阈值检查任务", taskID)
}
var params task.DeviceThresholdCheckParams
err = t.ParseParameters(&params)
if err != nil {
return nil, fmt.Errorf("任务 %d: 解析参数失败: %w", taskID, err)
}
resp := &dto.DeviceThresholdAlarmDTO{
ID: t.ID,
DeviceID: params.DeviceID,
SensorType: params.SensorType,
Thresholds: params.Thresholds,
Operator: params.Operator,
Level: params.Level,
}
return resp, nil
}
// DeleteDeviceThresholdAlarm 实现了删除一个设备阈值告警的逻辑。
func (s *thresholdAlarmService) DeleteDeviceThresholdAlarm(ctx context.Context, taskID int, req *dto.DeleteDeviceThresholdAlarmDTO) error {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "DeleteDeviceThresholdAlarm")
// 获取待删除任务并校验
deleteTask, err := s.planRepo.FindTaskByID(serviceCtx, taskID)
if err != nil {
return fmt.Errorf("获取任务失败: %w", err)
}
if deleteTask.Type != models.TaskTypeDeviceThresholdCheck {
return fmt.Errorf("任务 %d 不是一个设备阈值检查任务", taskID)
}
var deviceParams task.DeviceThresholdCheckParams
if err := deleteTask.ParseParameters(&deviceParams); err != nil {
return fmt.Errorf("任务 %d: 解析参数失败: %w", taskID, err)
}
// 获得任务对应设备对应区域主控
device, err := s.deviceRepo.FindByID(serviceCtx, deviceParams.DeviceID)
if err != nil {
return fmt.Errorf("获取设备 %d 失败: %w", deviceParams.DeviceID, err)
}
area, err := s.areaRepo.FindByID(serviceCtx, device.AreaControllerID)
if err != nil {
return fmt.Errorf("获取区域 %d 失败: %w", device.AreaControllerID, err)
}
// 获取健康检查计划任务列表
plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck)
if err != nil {
return fmt.Errorf("获取系统健康检查计划失败: %w", err)
}
if plan == nil {
logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck)
}
taskIndexToDelete := -1
for i, t := range plan.Tasks {
if t.ID == taskID {
taskIndexToDelete = i
}
if t.Type == models.TaskTypeAreaCollectorThresholdCheck {
var areaParams task.AreaThresholdCheckParams
if err := t.ParseParameters(&areaParams); err != nil {
return fmt.Errorf("任务 %d: 解析参数失败: %w", t.ID, err)
}
if areaParams.AreaControllerID == area.ID && areaParams.SensorType == deviceParams.SensorType {
for ia, e := range areaParams.ExcludeDeviceIDs {
if e == deviceParams.DeviceID {
areaParams.ExcludeDeviceIDs = append(areaParams.ExcludeDeviceIDs[:ia], areaParams.ExcludeDeviceIDs[ia+1:]...)
continue
}
}
err = plan.Tasks[i].SaveParameters(areaParams)
if err != nil {
return fmt.Errorf("任务 %d: 保存参数失败: %w", t.ID, err)
}
}
}
}
if taskIndexToDelete == -1 {
return fmt.Errorf("任务 %d 在系统计划中未找到", taskID)
}
plan.Tasks = append(plan.Tasks[:taskIndexToDelete], plan.Tasks[taskIndexToDelete+1:]...)
plan.ReorderSteps()
_, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem)
return err
}
// CreateAreaThresholdAlarm 实现了创建一个区域阈值告警的逻辑。
func (s *thresholdAlarmService) CreateAreaThresholdAlarm(ctx context.Context, req *dto.CreateAreaThresholdAlarmDTO) error {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "CreateAreaThresholdAlarm")
// 1. 获取系统健康检查计划
plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck)
if err != nil {
return fmt.Errorf("获取系统健康检查计划失败: %w", err)
}
if plan == nil {
logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck)
}
// 2. 获取目标区域下的所有设备,并建立快速查找表
devicesInArea, err := s.deviceRepo.ListByAreaControllerID(serviceCtx, req.AreaControllerID)
if err != nil {
return fmt.Errorf("获取区域 %d 下的设备列表失败: %w", req.AreaControllerID, err)
}
devicesInAreaMap := make(map[uint]struct{}, len(devicesInArea))
for _, device := range devicesInArea {
devicesInAreaMap[device.ID] = struct{}{}
}
// 3. 遍历计划检查存在性并收集需要排除的设备ID
var excludeDeviceIDs []uint
for _, t := range plan.Tasks {
switch t.Type {
case models.TaskTypeAreaCollectorThresholdCheck:
var params task.AreaThresholdCheckParams
if err := t.ParseParameters(&params); err != nil {
return fmt.Errorf("任务 %d: 解析区域阈值检查任务参数失败: %w", t.ID, err)
}
if params.AreaControllerID == req.AreaControllerID && params.SensorType == req.SensorType {
return fmt.Errorf("区域 %d: 该区域已存在针对 %s 的阈值检查任务", req.AreaControllerID, req.SensorType)
}
case models.TaskTypeDeviceThresholdCheck:
var params task.DeviceThresholdCheckParams
if err := t.ParseParameters(&params); err != nil {
return fmt.Errorf("任务 %d: 解析设备阈值检查任务参数失败: %w", t.ID, err)
}
// 检查该设备是否属于目标区域
if _, ok := devicesInAreaMap[params.DeviceID]; ok {
excludeDeviceIDs = append(excludeDeviceIDs, params.DeviceID)
}
}
}
// 4. 创建新任务
newTask := models.Task{
PlanID: plan.ID,
Name: fmt.Sprintf("区域 %d 的 %s 阈值检测任务", req.AreaControllerID, req.SensorType),
Description: fmt.Sprintf("检测区域 %d 的 %s 是否 %v %v", req.AreaControllerID, req.SensorType, req.Operator, req.Thresholds),
ExecutionOrder: len(plan.Tasks) + 1,
Type: models.TaskTypeAreaCollectorThresholdCheck,
}
err = newTask.SaveParameters(task.AreaThresholdCheckParams{
AreaControllerID: req.AreaControllerID,
SensorType: req.SensorType,
Thresholds: req.Thresholds,
Operator: req.Operator,
Level: req.Level,
ExcludeDeviceIDs: excludeDeviceIDs,
})
if err != nil {
return fmt.Errorf("保存新区域任务的参数失败: %w", err)
}
// 5. 更新计划
plan.Tasks = append(plan.Tasks, newTask)
plan.ReorderSteps()
_, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem)
return err
}
// UpdateAreaThresholdAlarm 实现了更新一个区域阈值告警的逻辑。
func (s *thresholdAlarmService) UpdateAreaThresholdAlarm(ctx context.Context, taskID int, req *dto.UpdateAreaThresholdAlarmDTO) error {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "UpdateAreaThresholdAlarm")
// 1. 获取系统健康检查计划
plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck)
if err != nil {
return fmt.Errorf("获取系统健康检查计划失败: %w", err)
}
if plan == nil {
logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck)
}
// 2. 遍历任务列表,查找并更新目标任务
taskFound := false
for i, t := range plan.Tasks {
if t.ID == taskID && t.Type == models.TaskTypeAreaCollectorThresholdCheck {
taskFound = true
var params task.AreaThresholdCheckParams
if err = t.ParseParameters(&params); err != nil {
return fmt.Errorf("任务 %d: 解析现有参数失败: %w", taskID, err)
}
params.Thresholds = req.Thresholds
params.Operator = req.Operator
params.Level = req.Level
// 刷新任务说明
plan.Tasks[i].Description = fmt.Sprintf("检测区域 %d 的 %s 是否 %v %v", params.AreaControllerID, params.SensorType, params.Operator, params.Thresholds)
err = plan.Tasks[i].SaveParameters(params)
if err != nil {
return fmt.Errorf("任务 %d: 保存参数失败: %w", taskID, err)
}
break
}
}
if !taskFound {
return fmt.Errorf("任务 %d: 不存在或类型不匹配", taskID)
}
// 全量更新计划
_, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem)
return err
}
// GetAreaThresholdAlarm 实现了根据ID获取一个区域阈值告警任务的逻辑。
func (s *thresholdAlarmService) GetAreaThresholdAlarm(ctx context.Context, taskID int) (*dto.AreaThresholdAlarmDTO, error) {
serviceCtx := logs.AddFuncName(ctx, s.ctx, "GetAreaThresholdAlarm")
// 1. 使用 planRepo 查询任务
t, err := s.planRepo.FindTaskByID(serviceCtx, taskID)
if err != nil {
return nil, err // 如果未找到或发生其他错误,直接返回
}
// 2. 验证任务类型是否正确
if t.Type != models.TaskTypeAreaCollectorThresholdCheck {
return nil, fmt.Errorf("任务 %d 不是一个区域阈值检查任务", taskID)
}
var params task.AreaThresholdCheckParams
err = t.ParseParameters(&params)
if err != nil {
return nil, fmt.Errorf("任务 %d: 解析参数失败: %w", taskID, err)
}
resp := &dto.AreaThresholdAlarmDTO{
ID: t.ID,
AreaControllerID: params.AreaControllerID,
SensorType: params.SensorType,
Thresholds: params.Thresholds,
Operator: params.Operator,
Level: params.Level,
}
return resp, nil
}
// DeleteAreaThresholdAlarm 实现了删除一个区域阈值告警的逻辑。
func (s *thresholdAlarmService) DeleteAreaThresholdAlarm(ctx context.Context, taskID int) error {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "DeleteAreaThresholdAlarm")
// 获取健康检查计划任务列表
plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck)
if err != nil {
return fmt.Errorf("获取系统健康检查计划失败: %w", err)
}
if plan == nil {
logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck)
}
// 找到这个任务并删掉
deleteTaskNum := -1
for i, t := range plan.Tasks {
if t.Type != models.TaskTypeAreaCollectorThresholdCheck {
continue
}
if t.ID != taskID {
continue
}
deleteTaskNum = i
}
if deleteTaskNum == -1 {
return fmt.Errorf("任务 %d: 不存在或类型不匹配", taskID)
}
plan.Tasks = append(plan.Tasks[:deleteTaskNum], plan.Tasks[deleteTaskNum+1:]...)
plan.ReorderSteps()
_, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem)
return err
}