From b2d1b10962d57efafdf68c0099a1a2b0f5c65d12 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Sun, 16 Nov 2025 23:03:05 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E9=80=9A=E7=9F=A5=E7=8A=B6?= =?UTF-8?q?=E6=80=81=E5=88=B7=E6=96=B0=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/core/data_initializer.go | 8 +++- internal/domain/alarm/alarm_service.go | 24 ++++++++++ .../domain/task/refresh_notification_task.go | 48 +++++++++++++++++++ internal/domain/task/task.go | 4 ++ internal/infra/models/plan.go | 1 + internal/infra/repository/alarm_repository.go | 39 ++++++++++++--- 6 files changed, 117 insertions(+), 7 deletions(-) create mode 100644 internal/domain/task/refresh_notification_task.go diff --git a/internal/core/data_initializer.go b/internal/core/data_initializer.go index 26180f3..fcd4d87 100644 --- a/internal/core/data_initializer.go +++ b/internal/core/data_initializer.go @@ -186,10 +186,16 @@ func (app *Application) initializeAlarmNotificationPlan(ctx context.Context, exi Status: models.PlanStatusEnabled, ContentType: models.PlanContentTypeTasks, Tasks: []models.Task{ + { + Name: "刷新通知状态", + Description: "刷新所有待处理的告警通知状态, 如将忽略到期的通知改回待发送状态", + ExecutionOrder: 1, + Type: models.TaskTypeNotificationRefresh, + }, { Name: "告警通知发送", Description: "发送所有待处理的告警通知", - ExecutionOrder: 1, + ExecutionOrder: 2, Type: models.TaskTypeAlarmNotification, }, }, diff --git a/internal/domain/alarm/alarm_service.go b/internal/domain/alarm/alarm_service.go index a542935..2103639 100644 --- a/internal/domain/alarm/alarm_service.go +++ b/internal/domain/alarm/alarm_service.go @@ -30,6 +30,11 @@ type AlarmService interface { // CancelAlarmSnooze 取消对一个告警的忽略状态。 // 如果告警不存在,或本就未被忽略,不执行任何操作并返回 nil。 CancelAlarmSnooze(ctx context.Context, alarmID uint32) error + + // ClearExpiredIgnoredAlarms 清除所有已过期的告警忽略状态。 + // 将 is_ignored 设置为 false,ignored_until 设置为 NULL。 + // 返回受影响的行数和错误。 + ClearExpiredIgnoredAlarms(ctx context.Context) (int64, error) } // alarmService 是 AlarmService 接口的具体实现。 @@ -173,3 +178,22 @@ func (s *alarmService) CancelAlarmSnooze(ctx context.Context, alarmID uint32) er logger.Infof("告警 %d 的忽略状态已被成功取消。", alarmID) return nil } + +// ClearExpiredIgnoredAlarms 清除所有已过期的告警忽略状态。 +func (s *alarmService) ClearExpiredIgnoredAlarms(ctx context.Context) (int64, error) { + serviceCtx, logger := logs.Trace(ctx, s.ctx, "ClearExpiredIgnoredAlarms") + + rowsAffected, err := s.alarmRepo.ClearExpiredIgnoredAlarms(serviceCtx) + if err != nil { + logger.Errorf("清除过期忽略告警状态失败: %v", err) + return 0, err + } + + if rowsAffected > 0 { + logger.Infof("成功清除了 %d 条过期忽略告警的状态。", rowsAffected) + } else { + logger.Debugf("没有发现需要清除的过期忽略告警。") + } + + return rowsAffected, nil +} diff --git a/internal/domain/task/refresh_notification_task.go b/internal/domain/task/refresh_notification_task.go new file mode 100644 index 0000000..6648c0b --- /dev/null +++ b/internal/domain/task/refresh_notification_task.go @@ -0,0 +1,48 @@ +package task + +import ( + "context" + + "git.huangwc.com/pig/pig-farm-controller/internal/domain/alarm" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/plan" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" +) + +type RefreshNotificationTask struct { + ctx context.Context + taskLog *models.TaskExecutionLog + + alarmService alarm.AlarmService +} + +func NewRefreshNotificationTask(ctx context.Context, taskLog *models.TaskExecutionLog, alarmService alarm.AlarmService) plan.Task { + return &RefreshNotificationTask{ + ctx: ctx, + taskLog: taskLog, + alarmService: alarmService, + } +} + +func (r *RefreshNotificationTask) Execute(ctx context.Context) error { + taskCtx, logger := logs.Trace(ctx, r.ctx, "Execute") + logger.Infof("开始执行刷新通知任务, 任务ID: %d", r.taskLog.TaskID) + + _, err := r.alarmService.ClearExpiredIgnoredAlarms(taskCtx) + if err != nil { + return err + } + + logger.Infof("刷新通知任务执行完成, 任务ID: %d", r.taskLog.TaskID) + return nil +} + +func (r *RefreshNotificationTask) OnFailure(ctx context.Context, executeErr error) { + logger := logs.TraceLogger(ctx, r.ctx, "OnFailure") + logger.Errorf("刷新通知任务执行失败, 任务ID: %d, 错误: %v", r.taskLog.TaskID, executeErr) +} + +func (r *RefreshNotificationTask) ResolveDeviceIDs(ctx context.Context) ([]uint32, error) { + // 刷新通知任务不与具体设备绑定 + return []uint32{}, nil +} diff --git a/internal/domain/task/task.go b/internal/domain/task/task.go index e8c51cf..049dca0 100644 --- a/internal/domain/task/task.go +++ b/internal/domain/task/task.go @@ -68,6 +68,8 @@ func (t *taskFactory) Production(ctx context.Context, claimedLog *models.TaskExe return NewDeviceThresholdCheckTask(logs.AddCompName(baseCtx, "DeviceThresholdCheckTask"), claimedLog, t.sensorDataRepo, t.alarmService) case models.TaskTypeAreaCollectorThresholdCheck: return NewAreaThresholdCheckTask(logs.AddCompName(baseCtx, "AreaCollectorThresholdCheckTask"), claimedLog, t.sensorDataRepo, t.deviceRepo, t.alarmService) + case models.TaskTypeNotificationRefresh: + return NewRefreshNotificationTask(logs.AddCompName(baseCtx, "NotificationRefreshTask"), claimedLog, t.alarmService) default: // TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 logger.Panicf("不支持的任务类型: %s", claimedLog.Task.Type) @@ -101,6 +103,8 @@ func (t *taskFactory) CreateTaskFromModel(ctx context.Context, taskModel *models return NewDeviceThresholdCheckTask(logs.AddCompName(baseCtx, "DeviceThresholdCheckTask"), tempLog, t.sensorDataRepo, t.alarmService), nil case models.TaskTypeAreaCollectorThresholdCheck: return NewAreaThresholdCheckTask(logs.AddCompName(baseCtx, "AreaCollectorThresholdCheckTask"), tempLog, t.sensorDataRepo, t.deviceRepo, t.alarmService), nil + case models.TaskTypeNotificationRefresh: + return NewRefreshNotificationTask(logs.AddCompName(baseCtx, "NotificationRefreshTask"), tempLog, t.alarmService), nil default: return nil, fmt.Errorf("不支持为类型 '%s' 的任务创建模型实例", taskModel.Type) } diff --git a/internal/infra/models/plan.go b/internal/infra/models/plan.go index 7827980..22e7c88 100644 --- a/internal/infra/models/plan.go +++ b/internal/infra/models/plan.go @@ -45,6 +45,7 @@ const ( TaskTypeReleaseFeedWeight TaskType = "下料" // 下料口释放指定重量任务 TaskTypeFullCollection TaskType = "全量采集" // 新增的全量采集任务 TaskTypeAlarmNotification TaskType = "告警通知" // 告警通知任务 + TaskTypeNotificationRefresh TaskType = "通知刷新" // 通知刷新任务 TaskTypeDeviceThresholdCheck TaskType = "设备阈值检查" // 设备阈值检查任务 TaskTypeAreaCollectorThresholdCheck TaskType = "区域阈值检查" // 区域阈值检查任务 ) diff --git a/internal/infra/repository/alarm_repository.go b/internal/infra/repository/alarm_repository.go index 7eea0be..6725308 100644 --- a/internal/infra/repository/alarm_repository.go +++ b/internal/infra/repository/alarm_repository.go @@ -69,6 +69,11 @@ type AlarmRepository interface { // ignoredUntil: 告警新的忽略截止时间 (nil 表示没有忽略截止时间/已取消忽略)。 UpdateAlarmNotificationStatus(ctx context.Context, alarmID uint32, lastNotifiedAt time.Time, isIgnored bool, ignoredUntil *time.Time) error + // ClearExpiredIgnoredAlarms 清除所有已过期的告警忽略状态。 + // 将 is_ignored 设置为 false,ignored_until 设置为 NULL。 + // 返回受影响的行数和错误。 + ClearExpiredIgnoredAlarms(ctx context.Context) (int64, error) + // <-- 下列两个方法是为了性能做出的架构妥协, 业务逻辑入侵仓库层带来的收益远大于通过业务层进行数据筛选 --> // ListAlarmsForNotification 查询满足发送告警消息条件的活跃告警列表。 @@ -159,6 +164,30 @@ func (r *gormAlarmRepository) UpdateIgnoreStatus(ctx context.Context, id uint32, return nil } +// ClearExpiredIgnoredAlarms 清除所有已过期的告警忽略状态。 +// 将 is_ignored 设置为 false,ignored_until 设置为 NULL。 +// 返回受影响的行数和错误。 +func (r *gormAlarmRepository) ClearExpiredIgnoredAlarms(ctx context.Context) (int64, error) { + repoCtx := logs.AddFuncName(ctx, r.ctx, "ClearExpiredIgnoredAlarms") + now := time.Now() + + updates := map[string]interface{}{ + "is_ignored": false, + "ignored_until": nil, // 将 ignored_until 设置为 NULL + } + + result := r.db.WithContext(repoCtx). + Model(&models.ActiveAlarm{}). + Where("is_ignored = ? AND ignored_until <= ?", true, now). + Updates(updates) + + if result.Error != nil { + return 0, result.Error + } + + return result.RowsAffected, nil +} + // ListActiveAlarms 实现了分页和过滤查询活跃告警记录的功能 func (r *gormAlarmRepository) ListActiveAlarms(ctx context.Context, opts ActiveAlarmListOptions, page, pageSize int) ([]models.ActiveAlarm, int64, error) { repoCtx := logs.AddFuncName(ctx, r.ctx, "ListActiveAlarms") @@ -368,8 +397,6 @@ func (r *gormAlarmRepository) buildNotificationBaseQuery(tx *gorm.DB, intervalBy // 针对 Level 缺失配置(或所有 Level)的告警,使用“只发送一次”逻辑:LastNotifiedAt IS NULL // 参数 configuredLevels: 用于构建 Level NOT IN (?) 子句。 func (r *gormAlarmRepository) buildGroupAClause(tx *gorm.DB, configuredLevels []models.SeverityLevel) *gorm.DB { - now := time.Now() - // A.1. 构造 Level 范围检查子句 (Level NOT IN 或 1=1) if len(configuredLevels) > 0 { tx = tx.Where("level NOT IN (?)", configuredLevels) @@ -380,8 +407,8 @@ func (r *gormAlarmRepository) buildGroupAClause(tx *gorm.DB, configuredLevels [] // A.2. 构造 Group A 核心逻辑 (LastNotifiedAt IS NULL 且满足忽略条件) - // C_A_Ignored: 被忽略但忽略期结束 且 仅发送一次 - ignoredQuery := tx.Where("is_ignored = ? AND ignored_until <= ? AND last_notified_at IS NULL", true, now) + // C_A_Ignored: 被忽略 且 仅发送一次 (忽略期已在前置任务中解除) + ignoredQuery := tx.Where("is_ignored = ? AND last_notified_at IS NULL", true) // C_A_NotIgnored: 未被忽略 且 仅发送一次 notIgnoredQuery := tx.Where("is_ignored = ? AND last_notified_at IS NULL", false) @@ -420,8 +447,8 @@ func (r *gormAlarmRepository) buildGroupBClause(tx *gorm.DB, intervalByLevel map tx.Where("last_notified_at IS NULL").Or(iccQuery), // LastNotifiedAt IS NULL OR ICC ) - // C_B_Ignored: 被忽略但忽略期结束 - ignoredQuery := tx.Where("is_ignored = ? AND ignored_until <= ?", true, now).Where( + // C_B_Ignored: 被忽略 (忽略期已在前置任务中解除) + ignoredQuery := tx.Where("is_ignored = ?", true).Where( tx.Where("last_notified_at IS NULL").Or(iccQuery), // LastNotifiedAt IS NULL OR ICC )