修改domain包

This commit is contained in:
2025-11-05 21:40:19 +08:00
parent 203be4307d
commit 07d8c719ac
28 changed files with 943 additions and 793 deletions

View File

@@ -1,6 +1,7 @@
package plan
import (
"context"
"fmt"
"sync"
"time"
@@ -14,78 +15,80 @@ import (
// AnalysisPlanTaskManager 定义了分析计划任务管理器的接口。
type AnalysisPlanTaskManager interface {
// Refresh 同步数据库中的计划状态和待执行队列中的触发器任务。
Refresh() error
Refresh(ctx context.Context) error
// CreateOrUpdateTrigger 为给定的 planID 创建其关联的触发任务。
// 如果触发器已存在,会根据计划类型更新其执行时间。
CreateOrUpdateTrigger(planID uint) error
CreateOrUpdateTrigger(ctx context.Context, planID uint) error
// EnsureAnalysisTaskDefinition 确保计划的分析任务定义存在于 tasks 表中。
// 如果不存在,则会自动创建。此方法不涉及待执行队列。
EnsureAnalysisTaskDefinition(planID uint) error
EnsureAnalysisTaskDefinition(ctx context.Context, planID uint) error
}
// analysisPlanTaskManagerImpl 负责管理分析计划的触发器任务。
// 它确保数据库中可执行的计划在待执行队列中有对应的触发器,并移除无效的触发器。
// 这是一个有状态的组件,包含一个互斥锁以确保并发安全。
type analysisPlanTaskManagerImpl struct {
ctx context.Context
planRepo repository.PlanRepository
pendingTaskRepo repository.PendingTaskRepository
executionLogRepo repository.ExecutionLogRepository
logger *logs.Logger
mu sync.Mutex
}
// NewAnalysisPlanTaskManager 是 analysisPlanTaskManagerImpl 的构造函数。
func NewAnalysisPlanTaskManager(
ctx context.Context,
planRepo repository.PlanRepository,
pendingTaskRepo repository.PendingTaskRepository,
executionLogRepo repository.ExecutionLogRepository,
logger *logs.Logger,
) AnalysisPlanTaskManager {
return &analysisPlanTaskManagerImpl{
ctx: ctx,
planRepo: planRepo,
pendingTaskRepo: pendingTaskRepo,
executionLogRepo: executionLogRepo,
logger: logger,
}
}
// Refresh 同步数据库中的计划状态和待执行队列中的触发器任务。
// 这是一个编排方法,将复杂的逻辑分解到多个内部方法中。
func (m *analysisPlanTaskManagerImpl) Refresh() error {
func (m *analysisPlanTaskManagerImpl) Refresh(ctx context.Context) error {
managerCtx, logger := logs.Trace(ctx, m.ctx, "Refresh")
m.mu.Lock()
defer m.mu.Unlock()
m.logger.Info("开始同步计划任务管理器...")
logger.Info("开始同步计划任务管理器...")
// 1. 一次性获取所有需要的数据
runnablePlans, invalidPlanIDs, pendingTasks, err := m.getRefreshData()
runnablePlans, invalidPlanIDs, pendingTasks, err := m.getRefreshData(managerCtx)
if err != nil {
return fmt.Errorf("获取刷新数据失败: %w", err)
}
// 2. 清理所有与失效计划相关的待执行任务
if err := m.cleanupInvalidTasks(invalidPlanIDs, pendingTasks); err != nil {
if err := m.cleanupInvalidTasks(managerCtx, invalidPlanIDs, pendingTasks); err != nil {
// 仅记录错误,清理失败不应阻止新任务的添加
m.logger.Errorf("清理无效任务时出错: %v", err)
logger.Errorf("清理无效任务时出错: %v", err)
}
// 3. 添加或更新触发器
if err := m.addOrUpdateTriggers(runnablePlans, pendingTasks); err != nil {
if err := m.addOrUpdateTriggers(managerCtx, runnablePlans, pendingTasks); err != nil {
return fmt.Errorf("添加或更新触发器时出错: %w", err)
}
m.logger.Info("计划任务管理器同步完成.")
logger.Info("计划任务管理器同步完成.")
return nil
}
// CreateOrUpdateTrigger 为给定的 planID 创建其关联的触发任务。
// 如果触发器已存在,会根据计划类型更新其执行时间。
func (m *analysisPlanTaskManagerImpl) CreateOrUpdateTrigger(planID uint) error {
func (m *analysisPlanTaskManagerImpl) CreateOrUpdateTrigger(ctx context.Context, planID uint) error {
managerCtx, logger := logs.Trace(ctx, m.ctx, "CreateOrUpdateTrigger")
m.mu.Lock()
defer m.mu.Unlock()
// 检查计划是否可执行
plan, err := m.planRepo.GetBasicPlanByID(planID)
plan, err := m.planRepo.GetBasicPlanByID(managerCtx, planID)
if err != nil {
return fmt.Errorf("获取计划基本信息失败: %w", err)
}
@@ -94,7 +97,7 @@ func (m *analysisPlanTaskManagerImpl) CreateOrUpdateTrigger(planID uint) error {
}
// 查找现有触发器
existingTrigger, err := m.pendingTaskRepo.FindPendingTriggerByPlanID(planID)
existingTrigger, err := m.pendingTaskRepo.FindPendingTriggerByPlanID(managerCtx, planID)
if err != nil {
return fmt.Errorf("查找现有触发器失败: %w", err)
}
@@ -109,7 +112,7 @@ func (m *analysisPlanTaskManagerImpl) CreateOrUpdateTrigger(planID uint) error {
// 自动计划,根据 Cron 表达式计算下一次执行时间
next, err := utils.GetNextCronTime(plan.CronExpression)
if err != nil {
m.logger.Errorf("为计划 #%d 解析Cron表达式失败无法更新触发器: %v", plan.ID, err)
logger.Errorf("为计划 #%d 解析Cron表达式失败无法更新触发器: %v", plan.ID, err)
return fmt.Errorf("解析 Cron 表达式失败: %w", err)
}
expectedExecuteAt = next
@@ -117,47 +120,48 @@ func (m *analysisPlanTaskManagerImpl) CreateOrUpdateTrigger(planID uint) error {
// 如果计算出的执行时间与当前待执行任务的时间不一致,则更新
if !existingTrigger.ExecuteAt.Equal(expectedExecuteAt) {
m.logger.Infof("计划 #%d 的执行时间已变更,正在更新触发器 #%d 的执行时间从 %v 到 %v...", plan.ID, existingTrigger.ID, existingTrigger.ExecuteAt, expectedExecuteAt)
if err := m.pendingTaskRepo.UpdatePendingTaskExecuteAt(existingTrigger.ID, expectedExecuteAt); err != nil {
m.logger.Errorf("更新触发器 #%d 的执行时间失败: %v", existingTrigger.ID, err)
logger.Infof("计划 #%d 的执行时间已变更,正在更新触发器 #%d 的执行时间从 %v 到 %v...", plan.ID, existingTrigger.ID, existingTrigger.ExecuteAt, expectedExecuteAt)
if err := m.pendingTaskRepo.UpdatePendingTaskExecuteAt(managerCtx, existingTrigger.ID, expectedExecuteAt); err != nil {
logger.Errorf("更新触发器 #%d 的执行时间失败: %v", existingTrigger.ID, err)
return fmt.Errorf("更新触发器执行时间失败: %w", err)
}
} else {
m.logger.Infof("计划 #%d 的触发器已存在且执行时间无需更新。", plan.ID)
logger.Infof("计划 #%d 的触发器已存在且执行时间无需更新。", plan.ID)
}
return nil // 触发器已存在且已处理更新,直接返回
}
// 如果触发器不存在,则创建新的触发器
m.logger.Infof("为计划 #%d 创建新的触发器...", planID)
return m.createTriggerTask(plan)
logger.Infof("为计划 #%d 创建新的触发器...", planID)
return m.createTriggerTask(managerCtx, plan)
}
// EnsureAnalysisTaskDefinition 确保计划的分析任务定义存在于 tasks 表中。
// 如果不存在,则会自动创建。此方法不涉及待执行队列。
func (m *analysisPlanTaskManagerImpl) EnsureAnalysisTaskDefinition(planID uint) error {
func (m *analysisPlanTaskManagerImpl) EnsureAnalysisTaskDefinition(ctx context.Context, planID uint) error {
managerCtx, logger := logs.Trace(ctx, m.ctx, "EnsureAnalysisTaskDefinition")
m.mu.Lock()
defer m.mu.Unlock()
plan, err := m.planRepo.GetBasicPlanByID(planID)
plan, err := m.planRepo.GetBasicPlanByID(managerCtx, planID)
if err != nil {
return fmt.Errorf("确保分析任务定义失败:获取计划 #%d 基本信息时出错: %w", planID, err)
}
analysisTask, err := m.planRepo.FindPlanAnalysisTaskByPlanID(plan.ID)
analysisTask, err := m.planRepo.FindPlanAnalysisTaskByPlanID(managerCtx, plan.ID)
if err != nil {
return fmt.Errorf("确保分析任务定义失败:查找计划 #%d 的分析任务时出错: %w", plan.ID, err)
}
if analysisTask == nil {
m.logger.Infof("未找到计划 #%d 关联的 'plan_analysis' 任务定义,将自动创建...", plan.ID)
_, err := m.planRepo.CreatePlanAnalysisTask(plan) // CreatePlanAnalysisTask returns *models.Task, error
logger.Infof("未找到计划 #%d 关联的 'plan_analysis' 任务定义,将自动创建...", plan.ID)
_, err := m.planRepo.CreatePlanAnalysisTask(managerCtx, plan) // CreatePlanAnalysisTask returns *models.Task, error
if err != nil {
return fmt.Errorf("自动创建 'plan_analysis' 任务定义失败: %w", err)
}
m.logger.Infof("已成功为计划 #%d 创建 'plan_analysis' 任务定义。", plan.ID)
logger.Infof("已成功为计划 #%d 创建 'plan_analysis' 任务定义。", plan.ID)
} else {
m.logger.Infof("计划 #%d 的 'plan_analysis' 任务定义已存在。", plan.ID)
logger.Infof("计划 #%d 的 'plan_analysis' 任务定义已存在。", plan.ID)
}
return nil
@@ -166,16 +170,17 @@ func (m *analysisPlanTaskManagerImpl) EnsureAnalysisTaskDefinition(planID uint)
// --- 内部私有方法 ---
// getRefreshData 从数据库获取刷新所需的所有数据。
func (m *analysisPlanTaskManagerImpl) getRefreshData() (runnablePlans []*models.Plan, invalidPlanIDs []uint, pendingTasks []models.PendingTask, err error) {
runnablePlans, err = m.planRepo.FindRunnablePlans()
func (m *analysisPlanTaskManagerImpl) getRefreshData(ctx context.Context) (runnablePlans []*models.Plan, invalidPlanIDs []uint, pendingTasks []models.PendingTask, err error) {
managerCtx, logger := logs.Trace(ctx, m.ctx, "getRefreshData")
runnablePlans, err = m.planRepo.FindRunnablePlans(managerCtx)
if err != nil {
m.logger.Errorf("获取可执行计划列表失败: %v", err)
logger.Errorf("获取可执行计划列表失败: %v", err)
return
}
invalidPlans, err := m.planRepo.FindInactivePlans()
invalidPlans, err := m.planRepo.FindInactivePlans(managerCtx)
if err != nil {
m.logger.Errorf("获取失效计划列表失败: %v", err)
logger.Errorf("获取失效计划列表失败: %v", err)
return
}
invalidPlanIDs = make([]uint, len(invalidPlans))
@@ -183,16 +188,17 @@ func (m *analysisPlanTaskManagerImpl) getRefreshData() (runnablePlans []*models.
invalidPlanIDs[i] = p.ID
}
pendingTasks, err = m.pendingTaskRepo.FindAllPendingTasks()
pendingTasks, err = m.pendingTaskRepo.FindAllPendingTasks(managerCtx)
if err != nil {
m.logger.Errorf("获取所有待执行任务失败: %v", err)
logger.Errorf("获取所有待执行任务失败: %v", err)
return
}
return
}
// cleanupInvalidTasks 清理所有与失效计划相关的待执行任务。
func (m *analysisPlanTaskManagerImpl) cleanupInvalidTasks(invalidPlanIDs []uint, allPendingTasks []models.PendingTask) error {
func (m *analysisPlanTaskManagerImpl) cleanupInvalidTasks(ctx context.Context, invalidPlanIDs []uint, allPendingTasks []models.PendingTask) error {
managerCtx, logger := logs.Trace(ctx, m.ctx, "cleanupInvalidTasks")
if len(invalidPlanIDs) == 0 {
return nil // 没有需要清理的计划
}
@@ -219,24 +225,25 @@ func (m *analysisPlanTaskManagerImpl) cleanupInvalidTasks(invalidPlanIDs []uint,
return nil // 没有找到需要清理的任务
}
m.logger.Infof("准备从待执行队列中清理 %d 个与失效计划相关的任务...", len(tasksToDeleteIDs))
logger.Infof("准备从待执行队列中清理 %d 个与失效计划相关的任务...", len(tasksToDeleteIDs))
// 批量删除待执行任务
if err := m.pendingTaskRepo.DeletePendingTasksByIDs(tasksToDeleteIDs); err != nil {
if err := m.pendingTaskRepo.DeletePendingTasksByIDs(managerCtx, tasksToDeleteIDs); err != nil {
return fmt.Errorf("批量删除待执行任务失败: %w", err)
}
// 批量更新相关执行日志状态为“已取消”
if err := m.executionLogRepo.UpdateTaskExecutionLogStatusByIDs(logsToCancelIDs, models.ExecutionStatusCancelled); err != nil {
if err := m.executionLogRepo.UpdateTaskExecutionLogStatusByIDs(managerCtx, logsToCancelIDs, models.ExecutionStatusCancelled); err != nil {
// 这是一个非关键性错误,只记录日志
m.logger.Warnf("批量更新日志状态为 'Cancelled' 失败: %v", err)
logger.Warnf("批量更新日志状态为 'Cancelled' 失败: %v", err)
}
return nil
}
// addOrUpdateTriggers 检查、更新或创建触发器。
func (m *analysisPlanTaskManagerImpl) addOrUpdateTriggers(runnablePlans []*models.Plan, allPendingTasks []models.PendingTask) error {
func (m *analysisPlanTaskManagerImpl) addOrUpdateTriggers(ctx context.Context, runnablePlans []*models.Plan, allPendingTasks []models.PendingTask) error {
managerCtx, logger := logs.Trace(ctx, m.ctx, "addOrUpdateTriggers")
// 创建一个映射,存放所有已在队列中的计划触发器
pendingTriggersMap := make(map[uint]models.PendingTask)
for _, pt := range allPendingTasks {
@@ -254,22 +261,22 @@ func (m *analysisPlanTaskManagerImpl) addOrUpdateTriggers(runnablePlans []*model
if plan.ExecutionType == models.PlanExecutionTypeAutomatic {
next, err := utils.GetNextCronTime(plan.CronExpression)
if err != nil {
m.logger.Errorf("为计划 #%d 解析Cron表达式失败跳过更新: %v", plan.ID, err)
logger.Errorf("为计划 #%d 解析Cron表达式失败跳过更新: %v", plan.ID, err)
continue
}
// 如果数据库中记录的执行时间与根据当前Cron表达式计算出的下一次时间不一致则更新
if !existingTrigger.ExecuteAt.Equal(next) {
m.logger.Infof("计划 #%d 的执行时间已变更,正在更新触发器 #%d 的执行时间从 %v 到 %v...", plan.ID, existingTrigger.ID, existingTrigger.ExecuteAt, next)
if err := m.pendingTaskRepo.UpdatePendingTaskExecuteAt(existingTrigger.ID, next); err != nil {
m.logger.Errorf("更新触发器 #%d 的执行时间失败: %v", existingTrigger.ID, err)
logger.Infof("计划 #%d 的执行时间已变更,正在更新触发器 #%d 的执行时间从 %v 到 %v...", plan.ID, existingTrigger.ID, existingTrigger.ExecuteAt, next)
if err := m.pendingTaskRepo.UpdatePendingTaskExecuteAt(managerCtx, existingTrigger.ID, next); err != nil {
logger.Errorf("更新触发器 #%d 的执行时间失败: %v", existingTrigger.ID, err)
}
}
}
} else {
// --- 原有逻辑:为缺失的计划创建新触发器 ---
m.logger.Infof("发现应执行但队列中缺失的计划 #%d正在为其创建触发器...", plan.ID)
if err := m.createTriggerTask(plan); err != nil {
m.logger.Errorf("为计划 #%d 创建触发器失败: %v", plan.ID, err)
logger.Infof("发现应执行但队列中缺失的计划 #%d正在为其创建触发器...", plan.ID)
if err := m.createTriggerTask(managerCtx, plan); err != nil {
logger.Errorf("为计划 #%d 创建触发器失败: %v", plan.ID, err)
// 继续处理下一个,不因单点失败而中断
}
}
@@ -278,21 +285,22 @@ func (m *analysisPlanTaskManagerImpl) addOrUpdateTriggers(runnablePlans []*model
}
// createTriggerTask 是创建触发器任务的内部核心逻辑。
func (m *analysisPlanTaskManagerImpl) createTriggerTask(plan *models.Plan) error {
analysisTask, err := m.planRepo.FindPlanAnalysisTaskByPlanID(plan.ID)
func (m *analysisPlanTaskManagerImpl) createTriggerTask(ctx context.Context, plan *models.Plan) error {
managerCtx, logger := logs.Trace(ctx, m.ctx, "createTriggerTask")
analysisTask, err := m.planRepo.FindPlanAnalysisTaskByPlanID(managerCtx, plan.ID)
if err != nil {
return fmt.Errorf("查找计划分析任务失败: %w", err)
}
// --- 如果触发器任务定义不存在,则自动创建 ---
if analysisTask == nil {
m.logger.Warnf("未找到计划 #%d 关联的 'plan_analysis' 任务定义,将自动创建...", plan.ID)
newAnalysisTask, err := m.planRepo.CreatePlanAnalysisTask(plan)
logger.Warnf("未找到计划 #%d 关联的 'plan_analysis' 任务定义,将自动创建...", plan.ID)
newAnalysisTask, err := m.planRepo.CreatePlanAnalysisTask(managerCtx, plan)
if err != nil {
return fmt.Errorf("自动创建 'plan_analysis' 任务定义失败: %w", err)
}
analysisTask = newAnalysisTask
m.logger.Infof("已成功为计划 #%d 创建 'plan_analysis' 任务定义 (ID: %d)", plan.ID, analysisTask.ID)
logger.Infof("已成功为计划 #%d 创建 'plan_analysis' 任务定义 (ID: %d)", plan.ID, analysisTask.ID)
}
var executeAt time.Time
@@ -310,7 +318,7 @@ func (m *analysisPlanTaskManagerImpl) createTriggerTask(plan *models.Plan) error
TaskID: analysisTask.ID,
Status: models.ExecutionStatusWaiting,
}
if err := m.executionLogRepo.CreateTaskExecutionLog(taskLog); err != nil {
if err := m.executionLogRepo.CreateTaskExecutionLog(managerCtx, taskLog); err != nil {
return fmt.Errorf("创建任务执行日志失败: %w", err)
}
@@ -319,10 +327,10 @@ func (m *analysisPlanTaskManagerImpl) createTriggerTask(plan *models.Plan) error
ExecuteAt: executeAt,
TaskExecutionLogID: taskLog.ID,
}
if err := m.pendingTaskRepo.CreatePendingTask(pendingTask); err != nil {
if err := m.pendingTaskRepo.CreatePendingTask(managerCtx, pendingTask); err != nil {
return fmt.Errorf("创建待执行任务失败: %w", err)
}
m.logger.Infof("成功为计划 #%d 创建触发器 (任务ID: %d),执行时间: %v", plan.ID, analysisTask.ID, executeAt)
logger.Infof("成功为计划 #%d 创建触发器 (任务ID: %d),执行时间: %v", plan.ID, analysisTask.ID, executeAt)
return nil
}