Files
pig-farm-controller/internal/domain/notify/notify.go

299 lines
11 KiB
Go
Raw Normal View History

2025-10-24 20:33:15 +08:00
package notify
import (
2025-11-05 21:40:19 +08:00
"context"
2025-10-24 20:33:15 +08:00
"fmt"
"strings"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/notify"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
)
// Service 定义了通知领域的核心业务逻辑接口
type Service interface {
// SendBatchAlarm 向一批用户发送告警通知。它会并发地为每个用户执行带故障转移的发送逻辑。
2025-11-05 21:40:19 +08:00
SendBatchAlarm(ctx context.Context, userIDs []uint, content notify.AlarmContent) error
2025-10-24 20:33:15 +08:00
// BroadcastAlarm 向所有用户发送告警通知。它会并发地为每个用户执行带故障转移的发送逻辑。
2025-11-05 21:40:19 +08:00
BroadcastAlarm(ctx context.Context, content notify.AlarmContent) error
2025-10-24 20:33:15 +08:00
// SendTestMessage 向指定用户发送一条测试消息,用于手动验证特定通知渠道的配置。
SendTestMessage(ctx context.Context, userID uint, notifierType models.NotifierType) error
2025-10-24 20:33:15 +08:00
}
// failoverService 是 Service 接口的实现,提供了故障转移功能
type failoverService struct {
2025-11-05 21:40:19 +08:00
ctx context.Context
2025-10-24 20:33:15 +08:00
userRepo repository.UserRepository
notifiers map[models.NotifierType]notify.Notifier
2025-10-24 20:33:15 +08:00
primaryNotifier notify.Notifier
failureThreshold int
failureCounters *sync.Map // 使用 sync.Map 来安全地并发读写失败计数, key: userID (uint), value: counter (int)
2025-10-25 14:15:17 +08:00
notificationRepo repository.NotificationRepository
2025-10-24 20:33:15 +08:00
}
// NewFailoverService 创建一个新的故障转移通知服务
func NewFailoverService(
2025-11-05 21:40:19 +08:00
ctx context.Context,
2025-10-24 20:33:15 +08:00
userRepo repository.UserRepository,
notifiers []notify.Notifier,
primaryNotifierType models.NotifierType,
2025-10-24 20:33:15 +08:00
failureThreshold int,
2025-10-25 14:15:17 +08:00
notificationRepo repository.NotificationRepository,
2025-10-24 20:33:15 +08:00
) (Service, error) {
notifierMap := make(map[models.NotifierType]notify.Notifier)
2025-10-24 20:33:15 +08:00
for _, n := range notifiers {
notifierMap[n.Type()] = n
}
primaryNotifier, ok := notifierMap[primaryNotifierType]
if !ok {
return nil, fmt.Errorf("首选通知器类型 '%s' 在提供的通知器列表中不存在", primaryNotifierType)
}
return &failoverService{
2025-11-05 21:40:19 +08:00
ctx: ctx,
2025-10-24 20:33:15 +08:00
userRepo: userRepo,
notifiers: notifierMap,
primaryNotifier: primaryNotifier,
failureThreshold: failureThreshold,
failureCounters: &sync.Map{},
2025-10-25 14:15:17 +08:00
notificationRepo: notificationRepo,
2025-10-24 20:33:15 +08:00
}, nil
}
// SendBatchAlarm 实现了向多个用户并发发送告警的功能
2025-11-05 21:40:19 +08:00
func (s *failoverService) SendBatchAlarm(ctx context.Context, userIDs []uint, content notify.AlarmContent) error {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "SendBatchAlarm")
2025-10-24 20:33:15 +08:00
var wg sync.WaitGroup
var mu sync.Mutex
var allErrors []string
2025-11-05 21:40:19 +08:00
logger.Infow("开始批量发送告警...", "userCount", len(userIDs))
2025-10-24 20:33:15 +08:00
for _, userID := range userIDs {
wg.Add(1)
go func(id uint) {
defer wg.Done()
2025-11-05 21:40:19 +08:00
if err := s.sendAlarmToUser(serviceCtx, id, content); err != nil {
2025-10-24 20:33:15 +08:00
mu.Lock()
allErrors = append(allErrors, fmt.Sprintf("发送失败 (用户ID: %d): %v", id, err))
mu.Unlock()
}
}(userID)
}
wg.Wait()
if len(allErrors) > 0 {
finalError := fmt.Errorf("批量告警发送完成,但有 %d 个用户发送失败:\n%s", len(allErrors), strings.Join(allErrors, "\n"))
2025-11-05 21:40:19 +08:00
logger.Error(finalError.Error())
2025-10-24 20:33:15 +08:00
return finalError
}
2025-11-05 21:40:19 +08:00
logger.Info("批量发送告警成功完成,所有用户均已通知。")
2025-10-24 20:33:15 +08:00
return nil
}
// BroadcastAlarm 实现了向所有用户发送告警的功能
2025-11-05 21:40:19 +08:00
func (s *failoverService) BroadcastAlarm(ctx context.Context, content notify.AlarmContent) error {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "BroadcastAlarm")
users, err := s.userRepo.FindAll(serviceCtx)
2025-10-24 20:33:15 +08:00
if err != nil {
2025-11-05 21:40:19 +08:00
logger.Errorw("广播告警失败:查找所有用户时出错", "error", err)
2025-10-24 20:33:15 +08:00
return fmt.Errorf("广播告警失败:查找所有用户时出错: %w", err)
}
var userIDs []uint
for _, user := range users {
userIDs = append(userIDs, user.ID)
}
2025-11-05 21:40:19 +08:00
logger.Infow("开始广播告警给所有用户", "totalUsers", len(userIDs))
2025-10-24 20:33:15 +08:00
// 复用 SendBatchAlarm 的逻辑进行并发发送和错误处理
2025-11-05 21:40:19 +08:00
return s.SendBatchAlarm(serviceCtx, userIDs, content)
2025-10-24 20:33:15 +08:00
}
// sendAlarmToUser 是为单个用户发送告警的内部方法,包含了完整的故障转移逻辑
2025-11-05 21:40:19 +08:00
func (s *failoverService) sendAlarmToUser(ctx context.Context, userID uint, content notify.AlarmContent) error {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "sendAlarmToUser")
user, err := s.userRepo.FindByID(serviceCtx, userID)
2025-10-24 20:33:15 +08:00
if err != nil {
2025-11-05 21:40:19 +08:00
logger.Errorw("发送告警失败:查找用户时出错", "userID", userID, "error", err)
2025-10-24 20:33:15 +08:00
return fmt.Errorf("查找用户失败: %w", err)
}
counter, _ := s.failureCounters.LoadOrStore(userID, 0)
failureCount := counter.(int)
if failureCount < s.failureThreshold {
primaryType := s.primaryNotifier.Type()
addr := getAddressForNotifier(primaryType, user.Contact)
if addr == "" {
2025-10-25 14:15:17 +08:00
// 记录跳过通知
2025-11-05 21:40:19 +08:00
s.recordNotificationAttempt(serviceCtx, userID, primaryType, content, "", models.NotificationStatusSkipped, fmt.Errorf("用户未配置首选通知方式 '%s' 的地址", primaryType))
2025-10-24 20:33:15 +08:00
return fmt.Errorf("用户未配置首选通知方式 '%s' 的地址", primaryType)
}
2025-11-05 21:40:19 +08:00
err = s.primaryNotifier.Send(serviceCtx, content, addr)
2025-10-24 20:33:15 +08:00
if err == nil {
2025-10-25 14:15:17 +08:00
// 记录成功通知
2025-11-05 21:40:19 +08:00
s.recordNotificationAttempt(serviceCtx, userID, primaryType, content, addr, models.NotificationStatusSuccess, nil)
2025-10-24 20:33:15 +08:00
if failureCount > 0 {
2025-11-05 21:40:19 +08:00
logger.Infow("首选渠道发送恢复正常", "userID", userID, "notifierType", primaryType)
2025-10-24 20:33:15 +08:00
s.failureCounters.Store(userID, 0)
}
return nil
}
2025-10-25 14:15:17 +08:00
// 记录失败通知
2025-11-05 21:40:19 +08:00
s.recordNotificationAttempt(serviceCtx, userID, primaryType, content, addr, models.NotificationStatusFailed, err)
2025-10-24 20:33:15 +08:00
newFailureCount := failureCount + 1
s.failureCounters.Store(userID, newFailureCount)
2025-11-05 21:40:19 +08:00
logger.Warnw("首选渠道发送失败", "userID", userID, "notifierType", primaryType, "error", err, "failureCount", newFailureCount)
2025-10-24 20:33:15 +08:00
failureCount = newFailureCount
}
if failureCount >= s.failureThreshold {
2025-11-05 21:40:19 +08:00
logger.Warnw("故障转移阈值已达到,开始广播通知", "userID", userID, "threshold", s.failureThreshold)
2025-10-24 20:33:15 +08:00
var lastErr error
for _, notifier := range s.notifiers {
addr := getAddressForNotifier(notifier.Type(), user.Contact)
if addr == "" {
2025-10-25 14:15:17 +08:00
// 记录跳过通知
2025-11-05 21:40:19 +08:00
s.recordNotificationAttempt(serviceCtx, userID, notifier.Type(), content, "", models.NotificationStatusSkipped, fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifier.Type()))
2025-10-24 20:33:15 +08:00
continue
}
2025-11-05 21:40:19 +08:00
if err := notifier.Send(serviceCtx, content, addr); err == nil {
2025-10-25 14:15:17 +08:00
// 记录成功通知
2025-11-05 21:40:19 +08:00
s.recordNotificationAttempt(serviceCtx, userID, notifier.Type(), content, addr, models.NotificationStatusSuccess, nil)
logger.Infow("广播通知成功", "userID", userID, "notifierType", notifier.Type())
2025-10-24 20:33:15 +08:00
s.failureCounters.Store(userID, 0)
return nil
}
2025-10-25 14:15:17 +08:00
// 记录失败通知
2025-11-05 21:40:19 +08:00
s.recordNotificationAttempt(serviceCtx, userID, notifier.Type(), content, addr, models.NotificationStatusFailed, err)
2025-10-24 20:33:15 +08:00
lastErr = err
2025-11-05 21:40:19 +08:00
logger.Warnw("广播通知:渠道发送失败", "userID", userID, "notifierType", notifier.Type(), "error", err)
2025-10-24 20:33:15 +08:00
}
return fmt.Errorf("所有渠道均发送失败,最后一个错误: %w", lastErr)
}
return nil
}
// SendTestMessage 实现了手动发送测试消息的功能
func (s *failoverService) SendTestMessage(ctx context.Context, userID uint, notifierType models.NotifierType) error {
2025-11-05 21:40:19 +08:00
serviceCtx, logger := logs.Trace(ctx, s.ctx, "SendTestMessage")
user, err := s.userRepo.FindByID(serviceCtx, userID)
2025-10-24 20:33:15 +08:00
if err != nil {
2025-11-05 21:40:19 +08:00
logger.Errorw("发送测试消息失败:查找用户时出错", "userID", userID, "error", err)
2025-10-24 20:33:15 +08:00
return fmt.Errorf("查找用户失败: %w", err)
}
notifier, ok := s.notifiers[notifierType]
if !ok {
2025-11-05 21:40:19 +08:00
logger.Errorw("发送测试消息失败:通知器类型不存在", "userID", userID, "notifierType", notifierType)
2025-10-24 20:33:15 +08:00
return fmt.Errorf("指定的通知器类型 '%s' 不存在", notifierType)
}
addr := getAddressForNotifier(notifierType, user.Contact)
if addr == "" {
2025-11-05 21:40:19 +08:00
logger.Warnw("发送测试消息失败:缺少地址", "userID", userID, "notifierType", notifierType)
2025-10-25 14:15:17 +08:00
// 记录跳过通知
2025-11-05 21:40:19 +08:00
s.recordNotificationAttempt(serviceCtx, userID, notifierType, notify.AlarmContent{
2025-10-25 14:15:17 +08:00
Title: "通知服务测试",
Message: fmt.Sprintf("这是一条来自【%s】渠道的测试消息。如果您收到此消息说明您的配置正确。", notifierType),
Level: models.InfoLevel,
2025-10-25 14:15:17 +08:00
Timestamp: time.Now(),
}, "", models.NotificationStatusFailed, fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifierType))
2025-10-24 20:33:15 +08:00
return fmt.Errorf("用户未配置通知方式 '%s' 的地址", notifierType)
}
testContent := notify.AlarmContent{
Title: "通知服务测试",
Message: fmt.Sprintf("这是一条来自【%s】渠道的测试消息。如果您收到此消息说明您的配置正确。", notifierType),
Level: models.InfoLevel,
2025-10-24 20:33:15 +08:00
Timestamp: time.Now(),
}
2025-11-05 21:40:19 +08:00
logger.Infow("正在发送测试消息...", "userID", userID, "notifierType", notifierType, "address", addr)
err = notifier.Send(serviceCtx, testContent, addr)
2025-10-24 20:33:15 +08:00
if err != nil {
2025-11-05 21:40:19 +08:00
logger.Errorw("发送测试消息失败", "userID", userID, "notifierType", notifierType, "error", err)
2025-10-25 14:15:17 +08:00
// 记录失败通知
2025-11-05 21:40:19 +08:00
s.recordNotificationAttempt(serviceCtx, userID, notifierType, testContent, addr, models.NotificationStatusFailed, err)
2025-10-24 20:33:15 +08:00
return err
}
2025-11-05 21:40:19 +08:00
logger.Infow("发送测试消息成功", "userID", userID, "notifierType", notifierType)
2025-10-25 14:15:17 +08:00
// 记录成功通知
2025-11-05 21:40:19 +08:00
s.recordNotificationAttempt(serviceCtx, userID, notifierType, testContent, addr, models.NotificationStatusSuccess, nil)
2025-10-24 20:33:15 +08:00
return nil
}
// getAddressForNotifier 是一个辅助函数,根据通知器类型从 ContactInfo 中获取对应的地址
func getAddressForNotifier(notifierType models.NotifierType, contact models.ContactInfo) string {
2025-10-24 20:33:15 +08:00
switch notifierType {
case models.NotifierTypeSMTP:
2025-10-24 20:33:15 +08:00
return contact.Email
case models.NotifierTypeWeChat:
2025-10-24 20:33:15 +08:00
return contact.WeChat
case models.NotifierTypeLark:
2025-10-24 20:33:15 +08:00
return contact.Feishu
case models.NotifierTypeLog:
return "log" // LogNotifier不需要具体的地址但为了函数签名一致性返回一个无意义的非空字符串以绕过配置存在检查
2025-10-24 20:33:15 +08:00
default:
return ""
}
}
2025-10-25 14:15:17 +08:00
// recordNotificationAttempt 记录一次通知发送尝试的结果
// userID: 接收通知的用户ID
// notifierType: 使用的通知器类型
// content: 通知内容
// toAddress: 实际发送到的地址
// status: 发送尝试的状态 (成功、失败、跳过)
// err: 如果发送失败,记录的错误信息
func (s *failoverService) recordNotificationAttempt(
2025-11-05 21:40:19 +08:00
ctx context.Context,
2025-10-25 14:15:17 +08:00
userID uint,
notifierType models.NotifierType,
2025-10-25 14:15:17 +08:00
content notify.AlarmContent,
toAddress string,
status models.NotificationStatus,
err error,
) {
2025-11-05 21:40:19 +08:00
serviceCtx, logger := logs.Trace(ctx, s.ctx, "recordNotificationAttempt")
2025-10-25 14:15:17 +08:00
errorMessage := ""
if err != nil {
errorMessage = err.Error()
}
notification := &models.Notification{
NotifierType: notifierType,
UserID: userID,
Title: content.Title,
Message: content.Message,
Level: content.Level,
2025-10-25 14:15:17 +08:00
AlarmTimestamp: content.Timestamp,
ToAddress: toAddress,
Status: status,
ErrorMessage: errorMessage,
}
2025-11-05 21:40:19 +08:00
if saveErr := s.notificationRepo.Create(serviceCtx, notification); saveErr != nil {
logger.Errorw("无法保存通知发送记录到数据库",
2025-10-25 14:15:17 +08:00
"userID", userID,
"notifierType", notifierType,
"status", status,
"originalError", errorMessage,
"saveError", saveErr,
)
}
}