diff --git a/design/exceeding-threshold-alarm/index.md b/design/exceeding-threshold-alarm/index.md index 232453d..58de104 100644 --- a/design/exceeding-threshold-alarm/index.md +++ b/design/exceeding-threshold-alarm/index.md @@ -125,6 +125,8 @@ ### TODO 1. 是否要加一个延时操作, 因为采集是异步的, 采集任务结束时不一定能拿到最新数据, 所以需要一个延时操作等待区域主控上传 +2. 统一一下区域主控的命名, 目前有AreaController和RegionalController, 不排除还有别的 +3. 将数据类型转为float32, 节约空间, float64精度有些浪费, float32小数点后6-7位足够了 # 实现记录 @@ -132,4 +134,5 @@ 2. 重构部分枚举, 让models包不依赖其他项目中的包 3. 创建仓库层对象(不包含方法) 4. 实现告警发送任务 -5. 实现告警通知发送计划/全量采集计划改名 \ No newline at end of file +5. 实现告警通知发送计划/全量采集计划改名 +6. 实现设备阈值检查任务 \ No newline at end of file diff --git a/internal/domain/alarm/alarm_service.go b/internal/domain/alarm/alarm_service.go index 06e6bee..8ddf3df 100644 --- a/internal/domain/alarm/alarm_service.go +++ b/internal/domain/alarm/alarm_service.go @@ -1,18 +1,124 @@ package alarm -import "context" +import ( + "context" + "errors" + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + + "gorm.io/gorm" +) // AlarmService 定义了告警领域服务接口。 type AlarmService interface { + // CreateAlarmIfNotExists 检查是否存在相同的活跃告警,如果不存在,则创建一条新的告警记录。 + // "相同"的定义是:SourceType, SourceID, 和 AlarmCode 都相同。 + CreateAlarmIfNotExists(ctx context.Context, newAlarm *models.ActiveAlarm) error + + // CloseAlarm 关闭一个活跃告警,将其归档到历史记录。 + // 如果指定的告警当前不活跃,则不执行任何操作并返回 nil。 + CloseAlarm(ctx context.Context, sourceType models.AlarmSourceType, sourceID uint, alarmCode models.AlarmCode, resolveMethod string, resolvedBy *uint) error } // alarmService 是 AlarmService 接口的具体实现。 type alarmService struct { - ctx context.Context + ctx context.Context + alarmRepo repository.AlarmRepository + uow repository.UnitOfWork } -func NewAlarmService(ctx context.Context) AlarmService { +// NewAlarmService 创建一个新的 AlarmService 实例。 +func NewAlarmService(ctx context.Context, alarmRepo repository.AlarmRepository, uow repository.UnitOfWork) AlarmService { return &alarmService{ - ctx: ctx, + ctx: ctx, + alarmRepo: alarmRepo, + uow: uow, } } + +// CreateAlarmIfNotExists 实现了创建告警(如果不存在)的逻辑。 +func (s *alarmService) CreateAlarmIfNotExists(ctx context.Context, newAlarm *models.ActiveAlarm) error { + serviceCtx, logger := logs.Trace(ctx, s.ctx, "CreateAlarmIfNotExists") + + // 1. 检查告警是否已处于活跃状态 + isActive, err := s.alarmRepo.IsAlarmActiveInUse(serviceCtx, newAlarm.SourceType, newAlarm.SourceID, newAlarm.AlarmCode) + if err != nil { + logger.Errorf("检查告警活跃状态时发生数据库错误: %v", err) + return err // 直接返回数据库错误 + } + + if isActive { + // 2. 如果已活跃,则记录日志并忽略 + logger.Infof("相同的告警已处于活跃状态,已忽略。来源: %s, ID: %d, 告警代码: %s", newAlarm.SourceType, newAlarm.SourceID, newAlarm.AlarmCode) + return nil + } + + // 3. 如果不活跃,则创建新告警 + logger.Infof("告警尚不活跃,正在创建新告警。来源: %s, ID: %d, 告警代码: %s", newAlarm.SourceType, newAlarm.SourceID, newAlarm.AlarmCode) + return s.alarmRepo.CreateActiveAlarm(serviceCtx, newAlarm) +} + +// CloseAlarm 实现了关闭告警并将其归档的逻辑。 +func (s *alarmService) CloseAlarm(ctx context.Context, sourceType models.AlarmSourceType, sourceID uint, alarmCode models.AlarmCode, resolveMethod string, resolvedBy *uint) error { + serviceCtx, logger := logs.Trace(ctx, s.ctx, "CloseAlarm") + + // 1. 在事务外进行快速只读检查,避免不必要的事务开销 + isActive, err := s.alarmRepo.IsAlarmActiveInUse(serviceCtx, sourceType, sourceID, alarmCode) + if err != nil { + logger.Errorf("关闭告警失败:预检查告警活跃状态失败: %v", err) + return err + } + + // 如果告警本就不活跃,则无需任何操作 + if !isActive { + return nil + } + + // 2. 确认告警存在后,再进入事务执行“移动”操作 + logger.Infof("检测到活跃告警,正在执行关闭和归档操作。来源: %s, ID: %d, 告警代码: %s", sourceType, sourceID, alarmCode) + return s.uow.ExecuteInTransaction(serviceCtx, func(tx *gorm.DB) error { + // 在事务中再次查找,确保数据一致性并获取完整对象 + activeAlarm, err := s.alarmRepo.GetActiveAlarmByUniqueFieldsTx(serviceCtx, tx, sourceType, sourceID, alarmCode) + if err != nil { + // 此时如果没找到,可能在预检查和本事务之间已被其他进程关闭,同样视为正常 + if errors.Is(err, gorm.ErrRecordNotFound) { + logger.Infof("告警在事务开始前已被关闭,无需操作。") + return nil + } + logger.Errorf("关闭告警失败:在事务中查找活跃告警失败: %v", err) + return err + } + + // 创建历史告警记录 + historicalAlarm := &models.HistoricalAlarm{ + SourceType: activeAlarm.SourceType, + SourceID: activeAlarm.SourceID, + AlarmCode: activeAlarm.AlarmCode, + AlarmSummary: activeAlarm.AlarmSummary, + Level: activeAlarm.Level, + AlarmDetails: activeAlarm.AlarmDetails, + TriggerTime: activeAlarm.TriggerTime, + ResolveTime: time.Now(), + ResolveMethod: resolveMethod, + ResolvedBy: resolvedBy, + } + + // 在事务中插入历史告警 + if err := s.alarmRepo.CreateHistoricalAlarmTx(serviceCtx, tx, historicalAlarm); err != nil { + logger.Errorf("关闭告警失败:归档告警 %d 到历史表失败: %v", activeAlarm.ID, err) + return err + } + + // 在事务中删除活跃告警 + if err := s.alarmRepo.DeleteActiveAlarmTx(serviceCtx, tx, activeAlarm.ID); err != nil { + logger.Errorf("关闭告警失败:从活跃表删除告警 %d 失败: %v", activeAlarm.ID, err) + return err + } + + logger.Infof("告警 %d 已成功关闭并归档。", activeAlarm.ID) + return nil + }) +} diff --git a/internal/domain/task/device_threshold_check_task.go b/internal/domain/task/device_threshold_check_task.go index 53b4f46..365554b 100644 --- a/internal/domain/task/device_threshold_check_task.go +++ b/internal/domain/task/device_threshold_check_task.go @@ -4,10 +4,13 @@ import ( "context" "fmt" "sync" + "time" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/alarm" "git.huangwc.com/pig/pig-farm-controller/internal/domain/plan" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" ) type Operator string @@ -22,9 +25,10 @@ const ( ) type DeviceThresholdCheckParams struct { - DeviceID uint `json:"device_id"` // 设备ID - Thresholds float64 `json:"thresholds"` // 阈值 - Operator Operator `json:"operator"` // 操作符 + DeviceID uint `json:"device_id"` // 设备ID + SensorType models.SensorType `json:"sensor_type"` // 传感器类型 + Thresholds float64 `json:"thresholds"` // 阈值 + Operator Operator `json:"operator"` // 操作符 } type DeviceThresholdCheckTask struct { @@ -33,18 +37,118 @@ type DeviceThresholdCheckTask struct { taskLog *models.TaskExecutionLog params DeviceThresholdCheckParams + + sensorDataRepo repository.SensorDataRepository + alarmService alarm.AlarmService } -func NewDeviceThresholdCheckTask(ctx context.Context, taskLog *models.TaskExecutionLog) plan.Task { +func NewDeviceThresholdCheckTask(ctx context.Context, taskLog *models.TaskExecutionLog, sensorDataRepo repository.SensorDataRepository, alarmService alarm.AlarmService) plan.Task { return &DeviceThresholdCheckTask{ - ctx: ctx, - taskLog: taskLog, + ctx: ctx, + taskLog: taskLog, + sensorDataRepo: sensorDataRepo, + alarmService: alarmService, } } func (d *DeviceThresholdCheckTask) Execute(ctx context.Context) error { - //TODO implement me - panic("implement me") + taskCtx, logger := logs.Trace(ctx, d.ctx, "Execute") + err := d.parseParameters(taskCtx) + if err != nil { + return err + } + + sensorData, err := d.sensorDataRepo.GetLatestSensorDataByDeviceIDAndSensorType(taskCtx, d.params.DeviceID, d.params.SensorType) + if err != nil { + logger.Errorf("任务 %v: 获取最新传感器数据失败: %v", d.taskLog.TaskID, err) + return fmt.Errorf("任务 %v: 获取最新传感器数据失败: %v", d.taskLog.TaskID, err) + } + + var currentValue float64 + var alarmCode models.AlarmCode + + switch d.params.SensorType { + case models.SensorTypeTemperature: + var data models.TemperatureData + if err := sensorData.ParseData(&data); err != nil { + return fmt.Errorf("任务 %v: 解析温度数据失败: %v", d.taskLog.TaskID, err) + } + currentValue = data.TemperatureCelsius + alarmCode = models.AlarmCodeTemperature + case models.SensorTypeHumidity: + var data models.HumidityData + if err := sensorData.ParseData(&data); err != nil { + return fmt.Errorf("任务 %v: 解析湿度数据失败: %v", d.taskLog.TaskID, err) + } + currentValue = data.HumidityPercent + alarmCode = models.AlarmCodeHumidity + case models.SensorTypeWeight: + var data models.WeightData + if err := sensorData.ParseData(&data); err != nil { + return fmt.Errorf("任务 %v: 解析重量数据失败: %v", d.taskLog.TaskID, err) + } + currentValue = data.WeightKilograms + alarmCode = models.AlarmCodeWeight + + default: + return fmt.Errorf("任务 %v: 不支持的传感器类型: %v", d.taskLog.TaskID, d.params.SensorType) + } + + // 阈值检查未通过 + isExceeded := !d.checkThreshold(currentValue, d.params.Operator, d.params.Thresholds) + + if isExceeded { + // 状态一:检查未通过,确保告警开启 + summary := fmt.Sprintf("设备 %d(%s) 不满足阈值条件 (%s %.2f)", d.params.DeviceID, d.params.SensorType, d.params.Operator, d.params.Thresholds) + details := fmt.Sprintf("当前检测值: %.2f", currentValue) + logger.Infof("任务 %v: %s。%s", d.taskLog.TaskID, summary, details) + + newAlarm := &models.ActiveAlarm{ + SourceType: models.AlarmSourceTypeDevice, + SourceID: d.params.DeviceID, + AlarmCode: alarmCode, + AlarmSummary: summary, + AlarmDetails: details, + Level: models.WarnLevel, // 默认告警等级,可后续根据需求调整 + TriggerTime: time.Now(), + } + + if err := d.alarmService.CreateAlarmIfNotExists(taskCtx, newAlarm); err != nil { + logger.Errorf("任务 %v: 创建告警失败: %v", d.taskLog.TaskID, err) + // 根据策略决定是否需要返回错误,这里选择不中断任务执行 + } + } else { + // 状态二:检查已通过,确保告警关闭 + resolveMethod := "系统自动解决:阈值恢复正常" + logger.Infof("任务 %v: 设备 %d 的 %s 阈值已恢复正常,正在尝试关闭告警。", d.taskLog.TaskID, d.params.DeviceID, d.params.SensorType) + + if err := d.alarmService.CloseAlarm(taskCtx, models.AlarmSourceTypeDevice, d.params.DeviceID, alarmCode, resolveMethod, nil); err != nil { + logger.Errorf("任务 %v: 关闭告警失败: %v", d.taskLog.TaskID, err) + // 根据策略决定是否需要返回错误,这里选择不中断任务执行 + } + } + + return nil +} + +// checkThreshold 校验当前值是否满足阈值条件 +func (d *DeviceThresholdCheckTask) checkThreshold(currentValue float64, operator Operator, threshold float64) bool { + switch operator { + case OperatorLessThan: + return currentValue < threshold + case OperatorLessThanOrEqualTo: + return currentValue <= threshold + case OperatorGreaterThan: + return currentValue > threshold + case OperatorGreaterThanOrEqualTo: + return currentValue >= threshold + case OperatorEqualTo: + return currentValue == threshold + case OperatorNotEqualTo: + return currentValue != threshold + default: + return false + } } // parseParameters 解析任务参数 @@ -66,6 +170,19 @@ func (d *DeviceThresholdCheckTask) parseParameters(ctx context.Context) error { return } + if params.SensorType == "" { + err = fmt.Errorf("任务 %v: 未配置传感器类型", d.taskLog.TaskID) + } + if params.Operator == "" { + err = fmt.Errorf("任务 %v: 缺少操作符", d.taskLog.TaskID) + } + if params.Thresholds == 0 { + err = fmt.Errorf("任务 %v: 未配置阈值", d.taskLog.TaskID) + } + if params.DeviceID == 0 { + err = fmt.Errorf("任务 %v: 未配置设备ID", d.taskLog.TaskID) + } + d.params = params }) diff --git a/internal/infra/models/alarm.go b/internal/infra/models/alarm.go index 53a40be..6ce054f 100644 --- a/internal/infra/models/alarm.go +++ b/internal/infra/models/alarm.go @@ -15,28 +15,51 @@ const ( AlarmSourceTypeSystem AlarmSourceType = "系统" ) +// AlarmCode 定义了标准化的告警类型标识 +type AlarmCode string + +const ( + // --- 设备相关告警 --- + AlarmCodeTemperature AlarmCode = "温度阈值" + AlarmCodeHumidity AlarmCode = "湿度阈值" + AlarmCodeWeight AlarmCode = "重量阈值" + AlarmCodeBatteryLevel AlarmCode = "电池电量阈值" + AlarmCodeSignalMetrics AlarmCode = "信号强度阈值" + AlarmCodeDeviceOffline AlarmCode = "设备离线" + + // --- 区域主控相关告警 --- + AlarmCodeAreaControllerOffline AlarmCode = "区域主控离线" + + // --- 系统相关告警 --- + // (可在此处预留或添加) +) + // ActiveAlarm 活跃告警 // 活跃告警会被更新(如忽略状态),因此保留 gorm.Model 以包含所有标准字段。 type ActiveAlarm struct { gorm.Model - SourceType AlarmSourceType `gorm:"type:varchar(50);not null;index;comment:告警来源类型" json:"source_type"` + SourceType AlarmSourceType `gorm:"type:varchar(50);not null;index:idx_alarm_uniqueness;comment:告警来源类型" json:"source_type"` // SourceID 告警来源ID,其具体含义取决于 SourceType 字段 (例如:设备ID, 区域主控ID, 猪栏ID)。 - SourceID uint `gorm:"not null;index;comment:告警来源ID" json:"source_id"` + SourceID uint `gorm:"not null;index:idx_alarm_uniqueness;comment:告警来源ID" json:"source_id"` + + // AlarmCode 是一个机器可读的、标准化的告警类型标识。 + // 它与 SourceType 和 SourceID 共同构成一个活跃告警的唯一标识。 + AlarmCode AlarmCode `gorm:"type:varchar(100);not null;index:idx_alarm_uniqueness;comment:告警代码" json:"alarm_code"` AlarmSummary string `gorm:"comment:告警简述" json:"alarm_summary"` - Level SeverityLevel `gorm:"type:varchar(10);not null;comment:严重性等级" json:"level"` + Level SeverityLevel `gorm:"type:varchar(10);not null;index:idx_notification_query;comment:严重性等级" json:"level"` AlarmDetails string `gorm:"comment:告警详细内容" json:"alarm_details"` TriggerTime time.Time `gorm:"not null;comment:告警触发时间" json:"trigger_time"` - // IsIgnored 是否被手动忽略 (Snooze) - IsIgnored bool `gorm:"default:false;comment:是否被手动忽略" json:"is_ignored"` + // IsIgnored 字段加入到专为通知查询优化的复合索引中 + IsIgnored bool `gorm:"default:false;index:idx_notification_query;comment:是否被手动忽略" json:"is_ignored"` // IgnoredUntil 忽略截止时间。在此时间之前,即使告警持续,也不会发送通知。 // 使用指针类型 *time.Time 来表示可为空的时间。 IgnoredUntil *time.Time `gorm:"comment:忽略截止时间" json:"ignored_until"` - // LastNotifiedAt 上次发送通知的时间。用于控制重复通知的频率。 - LastNotifiedAt *time.Time `gorm:"comment:上次发送通知时间" json:"last_notified_at"` + // LastNotifiedAt 字段加入到专为通知查询优化的复合索引中 + LastNotifiedAt *time.Time `gorm:"index:idx_notification_query;comment:上次发送通知时间" json:"last_notified_at"` } // TableName 指定 ActiveAlarm 结构体对应的数据库表名 @@ -55,6 +78,9 @@ type HistoricalAlarm struct { // SourceID 告警来源ID,其具体含义取决于 SourceType 字段 (例如:设备ID, 区域主控ID, 猪栏ID)。 SourceID uint `gorm:"not null;index;comment:告警来源ID" json:"source_id"` + // AlarmCode 是一个机器可读的、标准化的告警类型标识。 + AlarmCode AlarmCode `gorm:"type:varchar(100);not null;index;comment:告警代码" json:"alarm_code"` + AlarmSummary string `gorm:"comment:告警简述" json:"alarm_summary"` Level SeverityLevel `gorm:"type:varchar(10);not null;comment:严重性等级" json:"level"` AlarmDetails string `gorm:"comment:告警详细内容" json:"alarm_details"` diff --git a/internal/infra/models/sensor_data.go b/internal/infra/models/sensor_data.go index 4c50221..fe683a9 100644 --- a/internal/infra/models/sensor_data.go +++ b/internal/infra/models/sensor_data.go @@ -1,6 +1,8 @@ package models import ( + "encoding/json" + "errors" "time" "gorm.io/datatypes" @@ -68,3 +70,12 @@ type SensorData struct { func (SensorData) TableName() string { return "sensor_data" } + +// ParseData 解析 JSON 数据到一个具体的结构体中。 +// 调用方需要传入一个指向目标结构体实例的指针。 +func (s *SensorData) ParseData(v interface{}) error { + if s.Data == nil { + return errors.New("传感器数据为空,无法解析") + } + return json.Unmarshal(s.Data, v) +} diff --git a/internal/infra/repository/alarm_repository.go b/internal/infra/repository/alarm_repository.go index 8408ab5..e5dfba8 100644 --- a/internal/infra/repository/alarm_repository.go +++ b/internal/infra/repository/alarm_repository.go @@ -25,6 +25,21 @@ type ActiveAlarmListOptions struct { // AlarmRepository 定义了对告警模型的数据库操作接口 type AlarmRepository interface { + // CreateActiveAlarm 创建一条新的活跃告警记录 + CreateActiveAlarm(ctx context.Context, alarm *models.ActiveAlarm) error + + // IsAlarmActiveInUse 检查具有相同来源和告警代码的告警当前是否处于活跃表中 + IsAlarmActiveInUse(ctx context.Context, sourceType models.AlarmSourceType, sourceID uint, alarmCode models.AlarmCode) (bool, error) + + // GetActiveAlarmByUniqueFieldsTx 在指定事务中根据唯一业务键获取一个活跃告警 + GetActiveAlarmByUniqueFieldsTx(ctx context.Context, tx *gorm.DB, sourceType models.AlarmSourceType, sourceID uint, alarmCode models.AlarmCode) (*models.ActiveAlarm, error) + + // CreateHistoricalAlarmTx 在指定事务中创建一条历史告警记录 + CreateHistoricalAlarmTx(ctx context.Context, tx *gorm.DB, alarm *models.HistoricalAlarm) error + + // DeleteActiveAlarmTx 在指定事务中根据主键 ID 删除一个活跃告警 + DeleteActiveAlarmTx(ctx context.Context, tx *gorm.DB, id uint) error + // ListActiveAlarms 支持分页和过滤的活跃告警列表查询。 // 返回活跃告警列表、总记录数和错误。 ListActiveAlarms(ctx context.Context, opts ActiveAlarmListOptions, page, pageSize int) ([]models.ActiveAlarm, int64, error) @@ -59,6 +74,48 @@ func NewGormAlarmRepository(ctx context.Context, db *gorm.DB) AlarmRepository { } } +// CreateActiveAlarm 创建一条新的活跃告警记录 +func (r *gormAlarmRepository) CreateActiveAlarm(ctx context.Context, alarm *models.ActiveAlarm) error { + repoCtx := logs.AddFuncName(ctx, r.ctx, "CreateActiveAlarm") + return r.db.WithContext(repoCtx).Create(alarm).Error +} + +// IsAlarmActive 检查具有相同来源和告警代码的告警当前是否处于活跃状态 +func (r *gormAlarmRepository) IsAlarmActiveInUse(ctx context.Context, sourceType models.AlarmSourceType, sourceID uint, alarmCode models.AlarmCode) (bool, error) { + repoCtx := logs.AddFuncName(ctx, r.ctx, "IsAlarmActiveInUse") + var count int64 + err := r.db.WithContext(repoCtx).Model(&models.ActiveAlarm{}). + Where("source_type = ? AND source_id = ? AND alarm_code = ?", sourceType, sourceID, alarmCode). + Count(&count).Error + if err != nil { + return false, err + } + return count > 0, nil +} + +// GetActiveAlarmByUniqueFieldsTx 在指定事务中根据唯一业务键获取一个活跃告警 +func (r *gormAlarmRepository) GetActiveAlarmByUniqueFieldsTx(ctx context.Context, tx *gorm.DB, sourceType models.AlarmSourceType, sourceID uint, alarmCode models.AlarmCode) (*models.ActiveAlarm, error) { + repoCtx := logs.AddFuncName(ctx, r.ctx, "GetActiveAlarmByUniqueFieldsTx") + var alarm models.ActiveAlarm + err := tx.WithContext(repoCtx). + Where("source_type = ? AND source_id = ? AND alarm_code = ?", sourceType, sourceID, alarmCode). + First(&alarm).Error + return &alarm, err +} + +// CreateHistoricalAlarmTx 在指定事务中创建一条历史告警记录 +func (r *gormAlarmRepository) CreateHistoricalAlarmTx(ctx context.Context, tx *gorm.DB, alarm *models.HistoricalAlarm) error { + repoCtx := logs.AddFuncName(ctx, r.ctx, "CreateHistoricalAlarmTx") + return tx.WithContext(repoCtx).Create(alarm).Error +} + +// DeleteActiveAlarmTx 在指定事务中根据主键 ID 删除一个活跃告警 +func (r *gormAlarmRepository) DeleteActiveAlarmTx(ctx context.Context, tx *gorm.DB, id uint) error { + repoCtx := logs.AddFuncName(ctx, r.ctx, "DeleteActiveAlarmTx") + // 使用 Unscoped() 确保执行物理删除,而不是软删除 + return tx.WithContext(repoCtx).Unscoped().Delete(&models.ActiveAlarm{}, id).Error +} + // ListActiveAlarms 实现了分页和过滤查询活跃告警记录的功能 func (r *gormAlarmRepository) ListActiveAlarms(ctx context.Context, opts ActiveAlarmListOptions, page, pageSize int) ([]models.ActiveAlarm, int64, error) { repoCtx := logs.AddFuncName(ctx, r.ctx, "ListActiveAlarms")