diff --git a/internal/app/dto/alarm_dto.go b/internal/app/dto/alarm_dto.go index 7cb2b09..6538184 100644 --- a/internal/app/dto/alarm_dto.go +++ b/internal/app/dto/alarm_dto.go @@ -81,3 +81,35 @@ type ListHistoricalAlarmResponse struct { List []HistoricalAlarmDTO `json:"list"` Pagination PaginationDTO `json:"pagination"` } + +// CreateDeviceThresholdAlarmDTO 创建设备阈值告警的请求DTO +type CreateDeviceThresholdAlarmDTO struct { + DeviceID uint `json:"device_id" binding:"required"` // 设备ID + SensorType models.SensorType `json:"sensor_type" binding:"required"` // 传感器类型 + Thresholds float64 `json:"thresholds" binding:"required"` // 阈值 + Operator models.Operator `json:"operator" binding:"required"` // 操作符 (使用string类型,与前端交互更通用) + Level models.SeverityLevel `json:"level,omitempty"` // 告警等级,可选,如果未提供则使用默认值 +} + +// UpdateDeviceThresholdAlarmDTO 更新设备阈值告警的请求DTO +type UpdateDeviceThresholdAlarmDTO struct { + Thresholds float64 `json:"thresholds" binding:"required"` // 新的阈值 + Operator models.Operator `json:"operator" binding:"required"` // 新的操作符 + Level models.SeverityLevel `json:"level,omitempty"` // 新的告警等级,可选 +} + +// CreateAreaThresholdAlarmDTO 创建区域阈值告警的请求DTO +type CreateAreaThresholdAlarmDTO struct { + AreaControllerID uint `json:"area_controller_id" binding:"required"` // 区域主控ID + SensorType models.SensorType `json:"sensor_type" binding:"required"` // 传感器类型 + Thresholds float64 `json:"thresholds" binding:"required"` // 阈值 + Operator models.Operator `json:"operator" binding:"required"` // 操作符 + Level models.SeverityLevel `json:"level,omitempty"` // 告警等级,可选 +} + +// UpdateAreaThresholdAlarmDTO 更新区域阈值告警的请求DTO +type UpdateAreaThresholdAlarmDTO struct { + Thresholds float64 `json:"thresholds" binding:"required"` // 新的阈值 + Operator models.Operator `json:"operator" binding:"required"` // 新的操作符 + Level models.SeverityLevel `json:"level,omitempty"` // 新的告警等级,可选 +} diff --git a/internal/app/service/plan_service.go b/internal/app/service/plan_service.go index ba8b416..8601487 100644 --- a/internal/app/service/plan_service.go +++ b/internal/app/service/plan_service.go @@ -7,6 +7,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/app/dto" "git.huangwc.com/pig/pig-farm-controller/internal/domain/plan" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" ) @@ -147,7 +148,7 @@ func (s *planService) UpdatePlan(ctx context.Context, id uint, req *dto.UpdatePl planToUpdate.ID = id // 确保ID被设置 // 调用领域服务更新计划 - updatedPlan, err := s.domainPlanService.UpdatePlan(serviceCtx, planToUpdate) + updatedPlan, err := s.domainPlanService.UpdatePlan(serviceCtx, planToUpdate, models.PlanTypeCustom) if err != nil { logger.Errorf("%s: 领域服务更新计划失败: %v, ID: %d", actionType, err, id) return nil, err // 直接返回领域层错误 diff --git a/internal/app/service/threshold_alarm_service.go b/internal/app/service/threshold_alarm_service.go index f841f6e..7f44e1b 100644 --- a/internal/app/service/threshold_alarm_service.go +++ b/internal/app/service/threshold_alarm_service.go @@ -2,11 +2,15 @@ package service import ( "context" + "fmt" "time" "git.huangwc.com/pig/pig-farm-controller/internal/app/dto" domainAlarm "git.huangwc.com/pig/pig-farm-controller/internal/domain/alarm" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/plan" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/task" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" ) @@ -21,21 +25,42 @@ type ThresholdAlarmService interface { ListActiveAlarms(ctx context.Context, req *dto.ListActiveAlarmRequest) (*dto.ListActiveAlarmResponse, error) // ListHistoricalAlarms 批量查询历史告警。 ListHistoricalAlarms(ctx context.Context, req *dto.ListHistoricalAlarmRequest) (*dto.ListHistoricalAlarmResponse, error) + // CreateDeviceThresholdAlarm 创建一个设备阈值告警。 + CreateDeviceThresholdAlarm(ctx context.Context, req *dto.CreateDeviceThresholdAlarmDTO) error + // UpdateDeviceThresholdAlarm 更新一个设备阈值告警。 + UpdateDeviceThresholdAlarm(ctx context.Context, taskID int, req *dto.UpdateDeviceThresholdAlarmDTO) error + // CreateAreaThresholdAlarm 创建一个区域阈值告警。 + CreateAreaThresholdAlarm(ctx context.Context, req *dto.CreateAreaThresholdAlarmDTO) error + // UpdateAreaThresholdAlarm 更新一个区域阈值告警。 + UpdateAreaThresholdAlarm(ctx context.Context, taskID int, req *dto.UpdateAreaThresholdAlarmDTO) error } // thresholdAlarmService 是 ThresholdAlarmService 接口的具体实现。 type thresholdAlarmService struct { ctx context.Context alarmService domainAlarm.AlarmService - alarmRepo repository.AlarmRepository + planService plan.Service + + alarmRepo repository.AlarmRepository + planRepo repository.PlanRepository + deviceRepo repository.DeviceRepository } // NewThresholdAlarmService 创建一个新的 ThresholdAlarmService 实例。 -func NewThresholdAlarmService(ctx context.Context, alarmService domainAlarm.AlarmService, alarmRepo repository.AlarmRepository) ThresholdAlarmService { +func NewThresholdAlarmService(ctx context.Context, + alarmService domainAlarm.AlarmService, + planService plan.Service, + alarmRepo repository.AlarmRepository, + planRepo repository.PlanRepository, + deviceRepo repository.DeviceRepository, +) ThresholdAlarmService { return &thresholdAlarmService{ ctx: ctx, alarmService: alarmService, + planService: planService, alarmRepo: alarmRepo, + planRepo: planRepo, + deviceRepo: deviceRepo, } } @@ -95,3 +120,257 @@ func (s *thresholdAlarmService) ListHistoricalAlarms(ctx context.Context, req *d return dto.NewListHistoricalAlarmResponse(alarms, total, req.Page, req.PageSize), nil } + +// CreateDeviceThresholdAlarm 实现了创建一个设备阈值告警的逻辑。 +func (s *thresholdAlarmService) CreateDeviceThresholdAlarm(ctx context.Context, req *dto.CreateDeviceThresholdAlarmDTO) error { + serviceCtx, logger := logs.Trace(ctx, s.ctx, "CreateThresholdAlarm") + + device, err := s.deviceRepo.FindByID(serviceCtx, req.DeviceID) + if err != nil { + return fmt.Errorf("获取设备 %v 失败: %v", req.DeviceID, err) + } + + plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck) + if err != nil { + return fmt.Errorf("获取系统计划 %v 失败: %v", models.PlanNamePeriodicSystemHealthCheck, err) + } + if plan == nil { + logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck) + } + + // 系统计划肯定是子任务 + for i, t := range plan.Tasks { + switch t.Type { + case models.TaskTypeDeviceThresholdCheck: // 检查任务是否存在 + var params task.DeviceThresholdCheckParams + err = t.ParseParameters(¶ms) + if err != nil { + return fmt.Errorf("任务 %v: 解析设备阈值检查任务参数失败: %v", t.ID, err) + } + if params.DeviceID == req.DeviceID && params.SensorType == req.SensorType { + return fmt.Errorf("设备 %v: 该设备已存在阈值检查任务", req.DeviceID) + } + case models.TaskTypeAreaCollectorThresholdCheck: // 向区域阈值检查任务过滤列表中添加该设备 + params := task.AreaThresholdCheckParams{ + ExcludeDeviceIDs: []uint{}, + } + err = t.ParseParameters(¶ms) + if err != nil { + return fmt.Errorf("任务 %v: 解析区域阈值检查任务参数失败: %v", t.ID, err) + } + if params.AreaControllerID == device.AreaControllerID { + has := false + for _, d := range params.ExcludeDeviceIDs { + if d == req.DeviceID { + has = true + break + } + } + if !has { + params.ExcludeDeviceIDs = append(params.ExcludeDeviceIDs, req.DeviceID) + err = plan.Tasks[i].SaveParameters(params) + if err != nil { + return fmt.Errorf("任务 %v: 保存任务参数失败: %v", t.ID, err) + } + } + } + default: + continue + } + } + + t := models.Task{ + PlanID: plan.ID, + Name: fmt.Sprintf("设备 %v 的阈值检测任务", req.DeviceID), + Description: fmt.Sprintf("检测该设备 %v 是否 %v %v", req.SensorType, req.Operator, req.Thresholds), + ExecutionOrder: len(plan.Tasks) + 1, + Type: models.TaskTypeDeviceThresholdCheck, + } + err = t.SaveParameters(task.DeviceThresholdCheckParams{ + DeviceID: req.DeviceID, + SensorType: req.SensorType, + Thresholds: req.Thresholds, + Level: req.Level, + Operator: req.Operator, + }) + if err != nil { + return fmt.Errorf("保存任务参数失败: %v", err) + } + + plan.Tasks = append(plan.Tasks, t) + plan.ReorderSteps() + _, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem) + if err != nil { + return fmt.Errorf("更新计划失败: %v", err) + } + return nil +} + +// UpdateDeviceThresholdAlarm 实现了更新一个设备阈值告警的逻辑。 +func (s *thresholdAlarmService) UpdateDeviceThresholdAlarm(ctx context.Context, taskID int, req *dto.UpdateDeviceThresholdAlarmDTO) error { + serviceCtx, logger := logs.Trace(ctx, s.ctx, "UpdateDeviceThresholdAlarm") + + // 1. 获取系统健康检查计划 + plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck) + if err != nil { + return fmt.Errorf("获取系统健康检查计划失败: %w", err) + } + if plan == nil { + // 这个系统计划必须存在 + logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck) + } + + // 2. 遍历任务列表,查找并更新目标任务 + taskFound := false + for i, t := range plan.Tasks { + if t.ID == taskID && t.Type == models.TaskTypeDeviceThresholdCheck { + taskFound = true + + var params task.DeviceThresholdCheckParams + if err = t.ParseParameters(¶ms); err != nil { + return fmt.Errorf("任务 %d: 解析现有参数失败: %w", taskID, err) + } + params.Thresholds = req.Thresholds + params.Operator = req.Operator + params.Level = req.Level + // 刷新任务说明 + plan.Tasks[i].Description = fmt.Sprintf("检测该设备 %v 是否 %v %v", params.SensorType, params.Operator, params.Thresholds) + + err = plan.Tasks[i].SaveParameters(params) + if err != nil { + return fmt.Errorf("任务 %d: 保存参数失败: %w", taskID, err) + } + + break + } + } + + if !taskFound { + return fmt.Errorf("任务 %d: 不存在", taskID) + } + + // 全量更新计划 + _, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem) + return err +} + +// CreateAreaThresholdAlarm 实现了创建一个区域阈值告警的逻辑。 +func (s *thresholdAlarmService) CreateAreaThresholdAlarm(ctx context.Context, req *dto.CreateAreaThresholdAlarmDTO) error { + serviceCtx, logger := logs.Trace(ctx, s.ctx, "CreateAreaThresholdAlarm") + + // 1. 获取系统健康检查计划 + plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck) + if err != nil { + return fmt.Errorf("获取系统健康检查计划失败: %w", err) + } + if plan == nil { + logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck) + } + + // 2. 获取目标区域下的所有设备,并建立快速查找表 + devicesInArea, err := s.deviceRepo.ListByAreaControllerID(serviceCtx, req.AreaControllerID) + if err != nil { + return fmt.Errorf("获取区域 %d 下的设备列表失败: %w", req.AreaControllerID, err) + } + devicesInAreaMap := make(map[uint]struct{}, len(devicesInArea)) + for _, device := range devicesInArea { + devicesInAreaMap[device.ID] = struct{}{} + } + + // 3. 遍历计划,检查存在性并收集需要排除的设备ID + var excludeDeviceIDs []uint + for _, t := range plan.Tasks { + switch t.Type { + case models.TaskTypeAreaCollectorThresholdCheck: + var params task.AreaThresholdCheckParams + if err := t.ParseParameters(¶ms); err != nil { + return fmt.Errorf("任务 %d: 解析区域阈值检查任务参数失败: %w", t.ID, err) + } + if params.AreaControllerID == req.AreaControllerID && params.SensorType == req.SensorType { + return fmt.Errorf("区域 %d: 该区域已存在针对 %s 的阈值检查任务", req.AreaControllerID, req.SensorType) + } + case models.TaskTypeDeviceThresholdCheck: + var params task.DeviceThresholdCheckParams + if err := t.ParseParameters(¶ms); err != nil { + return fmt.Errorf("任务 %d: 解析设备阈值检查任务参数失败: %w", t.ID, err) + } + // 检查该设备是否属于目标区域 + if _, ok := devicesInAreaMap[params.DeviceID]; ok { + excludeDeviceIDs = append(excludeDeviceIDs, params.DeviceID) + } + } + } + + // 4. 创建新任务 + newTask := models.Task{ + PlanID: plan.ID, + Name: fmt.Sprintf("区域 %d 的 %s 阈值检测任务", req.AreaControllerID, req.SensorType), + Description: fmt.Sprintf("检测区域 %d 的 %s 是否 %v %v", req.AreaControllerID, req.SensorType, req.Operator, req.Thresholds), + ExecutionOrder: len(plan.Tasks) + 1, + Type: models.TaskTypeAreaCollectorThresholdCheck, + } + err = newTask.SaveParameters(task.AreaThresholdCheckParams{ + AreaControllerID: req.AreaControllerID, + SensorType: req.SensorType, + Thresholds: req.Thresholds, + Operator: req.Operator, + Level: req.Level, + ExcludeDeviceIDs: excludeDeviceIDs, + }) + if err != nil { + return fmt.Errorf("保存新区域任务的参数失败: %w", err) + } + + // 5. 更新计划 + plan.Tasks = append(plan.Tasks, newTask) + plan.ReorderSteps() + _, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem) + return err +} + +// UpdateAreaThresholdAlarm 实现了更新一个区域阈值告警的逻辑。 +func (s *thresholdAlarmService) UpdateAreaThresholdAlarm(ctx context.Context, taskID int, req *dto.UpdateAreaThresholdAlarmDTO) error { + serviceCtx, logger := logs.Trace(ctx, s.ctx, "UpdateAreaThresholdAlarm") + + // 1. 获取系统健康检查计划 + plan, err := s.planRepo.GetSystemPlanByName(serviceCtx, models.PlanNamePeriodicSystemHealthCheck) + if err != nil { + return fmt.Errorf("获取系统健康检查计划失败: %w", err) + } + if plan == nil { + logger.Panicf("系统计划 %v 不存在", models.PlanNamePeriodicSystemHealthCheck) + } + + // 2. 遍历任务列表,查找并更新目标任务 + taskFound := false + for i, t := range plan.Tasks { + if t.ID == taskID && t.Type == models.TaskTypeAreaCollectorThresholdCheck { + taskFound = true + + var params task.AreaThresholdCheckParams + if err = t.ParseParameters(¶ms); err != nil { + return fmt.Errorf("任务 %d: 解析现有参数失败: %w", taskID, err) + } + params.Thresholds = req.Thresholds + params.Operator = req.Operator + params.Level = req.Level + // 刷新任务说明 + plan.Tasks[i].Description = fmt.Sprintf("检测区域 %d 的 %s 是否 %v %v", params.AreaControllerID, params.SensorType, params.Operator, params.Thresholds) + + err = plan.Tasks[i].SaveParameters(params) + if err != nil { + return fmt.Errorf("任务 %d: 保存参数失败: %w", taskID, err) + } + + break + } + } + + if !taskFound { + return fmt.Errorf("任务 %d: 不存在或类型不匹配", taskID) + } + + // 全量更新计划 + _, err = s.planService.UpdatePlan(serviceCtx, plan, models.PlanTypeSystem) + return err +} diff --git a/internal/core/data_initializer.go b/internal/core/data_initializer.go index 32edd75..c66bddf 100644 --- a/internal/core/data_initializer.go +++ b/internal/core/data_initializer.go @@ -10,13 +10,6 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" ) -const ( - // PlanNamePeriodicSystemHealthCheck 是周期性系统健康检查计划的名称 - PlanNamePeriodicSystemHealthCheck = "周期性系统健康检查" - // PlanNameAlarmNotification 是告警通知发送计划的名称 - PlanNameAlarmNotification = "告警通知发送" -) - // initializeState 在应用启动时准备其初始数据状态。 // 它遵循一个严格的顺序:清理 -> 更新 -> 刷新,以确保数据的一致性和正确性。 func (app *Application) initializeState(ctx context.Context) error { @@ -121,7 +114,7 @@ func (app *Application) initializePeriodicSystemHealthCheckPlan(ctx context.Cont newTasks = append(newTasks, fullCollectionTask, delayTask) // 如果计划已存在,则获取其现有任务并追加到新任务列表后(排除预设任务) - if foundExistingPlan, ok := existingPlanMap[PlanNamePeriodicSystemHealthCheck]; ok { + if foundExistingPlan, ok := existingPlanMap[models.PlanNamePeriodicSystemHealthCheck]; ok { for _, existingTask := range foundExistingPlan.Tasks { // 排除已预设的全量采集和延时任务 if existingTask.Type != models.TaskTypeFullCollection && existingTask.Type != models.TaskTypeWaiting { @@ -136,7 +129,7 @@ func (app *Application) initializePeriodicSystemHealthCheckPlan(ctx context.Cont } predefinedPlan := &models.Plan{ - Name: PlanNamePeriodicSystemHealthCheck, + Name: models.PlanNamePeriodicSystemHealthCheck, Description: fmt.Sprintf("这是一个系统预定义的计划, 每 %d 分钟自动触发一次全量数据采集, 并进行阈值校验告警。", app.Config.Collection.Interval), PlanType: models.PlanTypeSystem, ExecutionType: models.PlanExecutionTypeAutomatic, @@ -186,7 +179,7 @@ func (app *Application) initializeAlarmNotificationPlan(ctx context.Context, exi appCtx, logger := logs.Trace(ctx, app.Ctx, "initializeAlarmNotificationPlan") predefinedPlan := &models.Plan{ - Name: PlanNameAlarmNotification, + Name: models.PlanNameAlarmNotification, Description: "这是一个系统预定义的计划, 每分钟自动触发一次告警通知发送。", PlanType: models.PlanTypeSystem, ExecutionType: models.PlanExecutionTypeAutomatic, diff --git a/internal/domain/plan/plan_service.go b/internal/domain/plan/plan_service.go index f83c655..ea61a9f 100644 --- a/internal/domain/plan/plan_service.go +++ b/internal/domain/plan/plan_service.go @@ -44,8 +44,8 @@ type Service interface { GetPlanByID(ctx context.Context, id uint) (*models.Plan, error) // ListPlans 获取计划列表,支持过滤和分页 ListPlans(ctx context.Context, opts repository.ListPlansOptions, page, pageSize int) ([]models.Plan, int64, error) - // UpdatePlan 更新计划 - UpdatePlan(ctx context.Context, plan *models.Plan) (*models.Plan, error) + // UpdatePlan 更新计划, wantPlanType 表示期望被修改的计划是什么类型 + UpdatePlan(ctx context.Context, plan *models.Plan, wantPlanType models.PlanType) (*models.Plan, error) // DeletePlan 删除计划(软删除) DeletePlan(ctx context.Context, id uint) error // StartPlan 启动计划 @@ -207,8 +207,8 @@ func (s *planServiceImpl) ListPlans(ctx context.Context, opts repository.ListPla return plans, total, nil } -// UpdatePlan 更新计划 -func (s *planServiceImpl) UpdatePlan(ctx context.Context, planToUpdate *models.Plan) (*models.Plan, error) { +// UpdatePlan 更新计划, wantPlanType 表示期望被修改的计划是什么类型 +func (s *planServiceImpl) UpdatePlan(ctx context.Context, planToUpdate *models.Plan, wantPlanType models.PlanType) (*models.Plan, error) { planCtx, logger := logs.Trace(ctx, s.ctx, "UpdatePlan") const actionType = "领域层:更新计划" @@ -222,9 +222,8 @@ func (s *planServiceImpl) UpdatePlan(ctx context.Context, planToUpdate *models.P return nil, err } - // 系统计划不允许修改 - if existingPlan.PlanType == models.PlanTypeSystem { - logger.Warnf("%s: 尝试修改系统计划, ID: %d", actionType, planToUpdate.ID) + if existingPlan.PlanType != wantPlanType { + logger.Warnf("%s: 禁止修改 %v 类型计划, ID: %d", actionType, wantPlanType, planToUpdate.ID) return nil, ErrPlanCannotBeModified } diff --git a/internal/domain/task/area_threshold_check_task.go b/internal/domain/task/area_threshold_check_task.go index 0b94b85..f7ba56e 100644 --- a/internal/domain/task/area_threshold_check_task.go +++ b/internal/domain/task/area_threshold_check_task.go @@ -14,11 +14,12 @@ import ( // AreaThresholdCheckParams 定义了区域阈值检查任务的参数 type AreaThresholdCheckParams struct { - AreaControllerID uint `json:"area_controller_id"` // 区域主控ID - SensorType models.SensorType `json:"sensor_type"` // 传感器类型 - Thresholds float64 `json:"thresholds"` // 阈值 - Operator models.Operator `json:"operator"` // 操作符 - ExcludeDeviceIDs []uint `json:"exclude_device_ids"` // 排除的传感器ID + AreaControllerID uint `json:"area_controller_id"` // 区域主控ID + SensorType models.SensorType `json:"sensor_type"` // 传感器类型 + Thresholds float64 `json:"thresholds"` // 阈值 + Operator models.Operator `json:"operator"` // 操作符 + Level models.SeverityLevel `json:"level"` // 告警级别 + ExcludeDeviceIDs []uint `json:"exclude_device_ids"` // 排除的传感器ID } // AreaThresholdCheckTask 是一个任务,用于检查区域阈值并触发告警, 区域主控下的所有没有独立校验任务的设备都会受到此任务的检查 @@ -78,6 +79,7 @@ func (a *AreaThresholdCheckTask) Execute(ctx context.Context) error { DeviceID: device.ID, SensorType: a.params.SensorType, Thresholds: a.params.Thresholds, + Level: a.params.Level, Operator: a.params.Operator, }) if err != nil { @@ -150,6 +152,9 @@ func (a *AreaThresholdCheckTask) parseParameters(ctx context.Context) error { if params.AreaControllerID == 0 { err = fmt.Errorf("任务 %v: 未配置区域主控ID", a.taskLog.TaskID) } + if params.Level == "" { + params.Level = models.WarnLevel + } if params.ExcludeDeviceIDs == nil { params.ExcludeDeviceIDs = []uint{} } diff --git a/internal/domain/task/device_threshold_check_task.go b/internal/domain/task/device_threshold_check_task.go index d8c8a9b..3fbace1 100644 --- a/internal/domain/task/device_threshold_check_task.go +++ b/internal/domain/task/device_threshold_check_task.go @@ -14,10 +14,11 @@ import ( ) type DeviceThresholdCheckParams struct { - DeviceID uint `json:"device_id"` // 设备ID - SensorType models.SensorType `json:"sensor_type"` // 传感器类型 - Thresholds float64 `json:"thresholds"` // 阈值 - Operator models.Operator `json:"operator"` // 操作符 + DeviceID uint `json:"device_id"` // 设备ID + SensorType models.SensorType `json:"sensor_type"` // 传感器类型 + Thresholds float64 `json:"thresholds"` // 阈值 + Operator models.Operator `json:"operator"` // 操作符 + Level models.SeverityLevel `json:"level"` // 告警等级 } // DeviceThresholdCheckTask 是一个任务,用于检查设备传感器数据是否满足阈值条件。 @@ -99,7 +100,7 @@ func (d *DeviceThresholdCheckTask) Execute(ctx context.Context) error { AlarmCode: alarmCode, AlarmSummary: summary, AlarmDetails: details, - Level: models.WarnLevel, // 默认告警等级,可后续根据需求调整 + Level: d.params.Level, TriggerTime: time.Now(), } @@ -172,6 +173,9 @@ func (d *DeviceThresholdCheckTask) parseParameters(ctx context.Context) error { if params.DeviceID == 0 { err = fmt.Errorf("任务 %v: 未配置设备ID", d.taskLog.TaskID) } + if params.Level == "" { + params.Level = models.WarnLevel + } d.params = params diff --git a/internal/infra/models/plan.go b/internal/infra/models/plan.go index 7efd507..86efc6d 100644 --- a/internal/infra/models/plan.go +++ b/internal/infra/models/plan.go @@ -11,11 +11,13 @@ import ( "gorm.io/gorm" ) +type PlanName string + const ( // PlanNamePeriodicSystemHealthCheck 是周期性系统健康检查计划的名称 - PlanNamePeriodicSystemHealthCheck = "周期性系统健康检查" + PlanNamePeriodicSystemHealthCheck PlanName = "周期性系统健康检查" // PlanNameAlarmNotification 是告警通知发送计划的名称 - PlanNameAlarmNotification = "告警通知发送" + PlanNameAlarmNotification PlanName = "告警通知发送" ) // PlanExecutionType 定义了计划的执行类型 @@ -38,11 +40,13 @@ const ( type TaskType string const ( - TaskPlanAnalysis TaskType = "计划分析" // 解析Plan的Task列表并添加到待执行队列的特殊任务 - TaskTypeWaiting TaskType = "等待" // 等待任务 - TaskTypeReleaseFeedWeight TaskType = "下料" // 下料口释放指定重量任务 - TaskTypeFullCollection TaskType = "全量采集" // 新增的全量采集任务 - TaskTypeAlarmNotification TaskType = "告警通知" // 告警通知任务 + TaskPlanAnalysis TaskType = "计划分析" // 解析Plan的Task列表并添加到待执行队列的特殊任务 + TaskTypeWaiting TaskType = "等待" // 等待任务 + TaskTypeReleaseFeedWeight TaskType = "下料" // 下料口释放指定重量任务 + TaskTypeFullCollection TaskType = "全量采集" // 新增的全量采集任务 + TaskTypeAlarmNotification TaskType = "告警通知" // 告警通知任务 + TaskTypeDeviceThresholdCheck TaskType = "设备阈值检查" // 设备阈值检查任务 + TaskTypeAreaCollectorThresholdCheck TaskType = "区域阈值检查" // 区域阈值检查任务 ) // -- Task Parameters -- @@ -72,7 +76,7 @@ const ( type Plan struct { gorm.Model - Name string `gorm:"not null" json:"name"` + Name PlanName `gorm:"not null" json:"name"` Description string `json:"description"` PlanType PlanType `gorm:"not null;index" json:"plan_type"` // 任务类型, 包括系统任务和用户自定义任务 ExecutionType PlanExecutionType `gorm:"not null;index" json:"execution_type"` diff --git a/internal/infra/repository/plan_repository.go b/internal/infra/repository/plan_repository.go index 6ce886b..9418db1 100644 --- a/internal/infra/repository/plan_repository.go +++ b/internal/infra/repository/plan_repository.go @@ -49,6 +49,8 @@ type PlanRepository interface { GetPlanByID(ctx context.Context, id uint) (*models.Plan, error) // GetPlansByIDs 根据ID列表获取计划,不包含子计划和任务详情 GetPlansByIDs(ctx context.Context, ids []uint) ([]models.Plan, error) + // GetSystemPlanByName 根据计划名称获取系统计划,包含子计划和任务详情 + GetSystemPlanByName(ctx context.Context, planName models.PlanName) (*models.Plan, error) // CreatePlan 创建一个新的计划 CreatePlan(ctx context.Context, plan *models.Plan) error // CreatePlanTx 在指定事务中创建一个新的计划 @@ -164,6 +166,23 @@ func (r *gormPlanRepository) GetPlansByIDs(ctx context.Context, ids []uint) ([]m return plans, nil } +// GetSystemPlanByName 根据计划名称获取系统计划,包含子计划和任务详情, 系统任务不该有重名情况, 所以可以这么查询 +func (r *gormPlanRepository) GetSystemPlanByName(ctx context.Context, planName models.PlanName) (*models.Plan, error) { + repoCtx := logs.AddFuncName(ctx, r.ctx, "GetSystemPlanByName") + var plan models.Plan + // 首先只查询计划的基本信息,获取其ID + err := r.db.WithContext(repoCtx).Select("id").Where("name = ? AND plan_type = ?", planName, models.PlanTypeSystem).First(&plan).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil // 未找到系统计划不是错误 + } + if err != nil { + return nil, fmt.Errorf("查询系统计划 '%s' 失败: %w", planName, err) + } + + // 如果找到了计划ID,则复用 GetPlanByID 来获取完整的计划详情 + return r.GetPlanByID(repoCtx, plan.ID) +} + // GetPlanByID 根据ID获取计划,包含子计划和任务详情 func (r *gormPlanRepository) GetPlanByID(ctx context.Context, id uint) (*models.Plan, error) { repoCtx := logs.AddFuncName(ctx, r.ctx, "GetPlanByID") diff --git a/project_structure.txt b/project_structure.txt index 5f20ab7..a01e2b7 100644 --- a/project_structure.txt +++ b/project_structure.txt @@ -103,6 +103,7 @@ internal/domain/plan/plan_execution_manager.go internal/domain/plan/plan_service.go internal/domain/plan/task.go internal/domain/task/alarm_notification_task.go +internal/domain/task/area_threshold_check_task.go internal/domain/task/delay_task.go internal/domain/task/device_threshold_check_task.go internal/domain/task/full_collection_task.go