diff --git a/config.example.yml b/config.example.yml index ae486b5..1a74a77 100644 --- a/config.example.yml +++ b/config.example.yml @@ -112,4 +112,4 @@ notify: # 定时采集配置 collection: - interval: 300 # 采集间隔 (秒) + interval: 1 # 采集间隔 (分钟) diff --git a/config.yml b/config.yml index ace399d..d3eaee2 100644 --- a/config.yml +++ b/config.yml @@ -90,4 +90,4 @@ lora_mesh: # 定时采集配置 collection: - interval: 300 # 采集间隔 (秒) \ No newline at end of file + interval: 1 # 采集间隔 (分钟) \ No newline at end of file diff --git a/internal/core/application.go b/internal/core/application.go index fcc0904..74ee2da 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -9,7 +9,6 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/app/api" "git.huangwc.com/pig/pig-farm-controller/internal/infra/config" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" - "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" ) // Application 是整个应用的核心,封装了所有组件和生命周期。 @@ -133,133 +132,3 @@ func (app *Application) Stop() error { app.Logger.Info("应用已成功关闭") return nil } - -// initializeState 在应用启动时准备其初始数据状态。 -// 这包括清理任何因上次异常关闭而留下的悬空任务或请求。 -func (app *Application) initializeState() error { - // 清理待采集任务 (非致命错误) - if err := app.initializePendingCollections(); err != nil { - app.Logger.Errorw("清理待采集任务时发生非致命错误", "error", err) - } - - // 初始化待执行任务列表 (致命错误) - if err := app.initializePendingTasks(); err != nil { - return fmt.Errorf("初始化待执行任务列表失败: %w", err) - } - - return nil -} - -// initializePendingCollections 在应用启动时处理所有未完成的采集请求。 -// 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。 -// 这保证了系统在每次启动时都处于一个干净、确定的状态。 -func (app *Application) initializePendingCollections() error { - app.Logger.Info("开始清理所有未完成的采集请求...") - - // 直接将所有 'pending' 状态的请求更新为 'timed_out'。 - count, err := app.Infra.Repos.PendingCollectionRepo.MarkAllPendingAsTimedOut() - if err != nil { - return fmt.Errorf("清理未完成的采集请求失败: %v", err) - } else if count > 0 { - app.Logger.Infof("成功将 %d 个未完成的采集请求标记为超时。", count) - } else { - app.Logger.Info("没有需要清理的采集请求。") - } - - return nil -} - -// initializePendingTasks 在应用启动时清理并刷新待执行任务列表。 -func (app *Application) initializePendingTasks() error { - logger := app.Logger - planRepo := app.Infra.Repos.PlanRepo - pendingTaskRepo := app.Infra.Repos.PendingTaskRepo - executionLogRepo := app.Infra.Repos.ExecutionLogRepo - analysisPlanTaskManager := app.Domain.AnalysisPlanTaskManager - - logger.Info("开始初始化待执行任务列表...") - - // 阶段一:修正因崩溃导致状态不一致的固定次数计划 - logger.Info("阶段一:开始修正因崩溃导致状态不一致的固定次数计划...") - plansToCorrect, err := planRepo.FindPlansWithPendingTasks() - if err != nil { - return fmt.Errorf("查找需要修正的计划失败: %w", err) - } - - for _, plan := range plansToCorrect { - logger.Infof("发现需要修正的计划 #%d (名称: %s)。", plan.ID, plan.Name) - - // 更新计划的执行计数 - plan.ExecuteCount++ - logger.Infof("计划 #%d 执行计数已从 %d 更新为 %d。", plan.ID, plan.ExecuteCount-1, plan.ExecuteCount) - - if plan.ExecutionType == models.PlanExecutionTypeManual || - (plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteCount >= plan.ExecuteNum) { - // 更新计划状态为已停止 - plan.Status = models.PlanStatusStopped - logger.Infof("计划 #%d 状态已更新为 '执行完毕'。", plan.ID) - - } - // 保存更新后的计划 - if err := planRepo.UpdatePlan(plan); err != nil { - logger.Errorf("修正计划 #%d 状态失败: %v", plan.ID, err) - // 这是一个非阻塞性错误,继续处理其他计划 - } - } - logger.Info("阶段一:固定次数计划修正完成。") - - // 阶段二:清理所有待执行任务和相关日志 - logger.Info("阶段二:开始清理所有待执行任务和相关日志...") - - // --- 新增逻辑:处理因崩溃导致状态不一致的计划主表状态 --- - // 1. 查找所有未完成的计划执行日志 (状态为 Started 或 Waiting) - incompletePlanLogs, err := executionLogRepo.FindIncompletePlanExecutionLogs() - if err != nil { - return fmt.Errorf("查找未完成的计划执行日志失败: %w", err) - } - - // 2. 收集所有受影响的唯一 PlanID - affectedPlanIDs := make(map[uint]struct{}) - for _, log := range incompletePlanLogs { - affectedPlanIDs[log.PlanID] = struct{}{} - } - - // 3. 对于每个受影响的 PlanID,重置其 execute_count 并将其状态设置为 Failed - for planID := range affectedPlanIDs { - logger.Warnf("检测到计划 #%d 在应用崩溃前处于未完成状态,将重置其计数并标记为失败。", planID) - // 使用 UpdatePlanStateAfterExecution 来更新主表状态,避免影响关联数据 - if err := planRepo.UpdatePlanStateAfterExecution(planID, 0, models.PlanStatusFailed); err != nil { - logger.Errorf("重置计划 #%d 计数并标记为失败时出错: %v", planID, err) - // 这是一个非阻塞性错误,继续处理其他计划 - } - } - logger.Info("阶段二:计划主表状态修正完成。") - - // 直接调用新的方法来更新计划执行日志状态为失败 - if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(); err != nil { - logger.Errorf("更新所有未完成计划执行日志状态为失败失败: %v", err) - // 这是一个非阻塞性错误,继续执行 - } - - // 直接调用新的方法来更新任务执行日志状态为取消 - if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(); err != nil { - logger.Errorf("更新所有未完成任务执行日志状态为取消失败: %v", err) - // 这是一个非阻塞性错误,继续执行 - } - - // 清空待执行列表 - if err := pendingTaskRepo.ClearAllPendingTasks(); err != nil { - return fmt.Errorf("清空待执行任务列表失败: %w", err) - } - logger.Info("阶段二:待执行任务和相关日志清理完成。") - - // 阶段三:初始刷新 - logger.Info("阶段三:开始刷新待执行列表...") - if err := analysisPlanTaskManager.Refresh(); err != nil { - return fmt.Errorf("刷新待执行任务列表失败: %w", err) - } - logger.Info("阶段三:待执行任务列表初始化完成。") - - logger.Info("待执行任务列表初始化完成。") - return nil -} diff --git a/internal/core/initializers.go b/internal/core/component_initializers.go similarity index 100% rename from internal/core/initializers.go rename to internal/core/component_initializers.go diff --git a/internal/core/data_initializer.go b/internal/core/data_initializer.go new file mode 100644 index 0000000..6be1ae0 --- /dev/null +++ b/internal/core/data_initializer.go @@ -0,0 +1,217 @@ +package core + +import ( + "fmt" + + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" +) + +const ( + // PlanNameTimedFullDataCollection 是定时全量数据采集计划的名称 + PlanNameTimedFullDataCollection = "定时全量数据采集" +) + +// initializeState 在应用启动时准备其初始数据状态。 +// 这包括清理任何因上次异常关闭而留下的悬空任务或请求。 +func (app *Application) initializeState() error { + // 初始化预定义系统计划 (致命错误) + if err := app.initializeSystemPlans(); err != nil { + return fmt.Errorf("初始化预定义系统计划失败: %w", err) + } + + // 清理待采集任务 (非致命错误) + if err := app.initializePendingCollections(); err != nil { + app.Logger.Errorw("清理待采集任务时发生非致命错误", "error", err) + } + + // 初始化待执行任务列表 (致命错误) + if err := app.initializePendingTasks(); err != nil { + return fmt.Errorf("初始化待执行任务列表失败: %w", err) + } + + return nil +} + +// initializeSystemPlans 确保预定义的系统计划在数据库中存在。 +func (app *Application) initializeSystemPlans() error { + app.Logger.Info("开始检查并创建预定义的系统计划...") + + // 动态构建预定义计划列表 + predefinedSystemPlans := app.getPredefinedSystemPlans() + + // 1. 获取所有已存在的系统计划 + existingPlans, _, err := app.Infra.Repos.PlanRepo.ListPlans(repository.ListPlansOptions{ + PlanType: repository.PlanTypeFilterSystem, + }, 1, 999) // 使用一个较大的 pageSize 来获取所有系统计划 + if err != nil { + return fmt.Errorf("获取现有系统计划失败: %w", err) + } + + // 2. 为了方便查找, 将现有计划名放入一个 map + existingPlanNames := make(map[string]bool) + for _, p := range existingPlans { + existingPlanNames[p.Name] = true + } + + // 3. 遍历预定义的计划列表 + for _, predefinedPlan := range predefinedSystemPlans { + // 4. 如果计划不存在, 则创建 + if !existingPlanNames[predefinedPlan.Name] { + app.Logger.Infof("预定义计划 '%s' 不存在,正在创建...", predefinedPlan.Name) + if err := app.Infra.Repos.PlanRepo.CreatePlan(&predefinedPlan); err != nil { + // 错误现在是致命的 + return fmt.Errorf("创建预定义计划 '%s' 失败: %w", predefinedPlan.Name, err) + } else { + app.Logger.Infof("成功创建预定义计划 '%s'。", predefinedPlan.Name) + } + } + } + + app.Logger.Info("预定义系统计划检查完成。") + return nil +} + +// getPredefinedSystemPlans 返回一个基于当前配置的预定义系统计划列表。 +func (app *Application) getPredefinedSystemPlans() []models.Plan { + + // 根据配置创建定时全量采集计划 + interval := app.Config.Collection.Interval + if interval <= 0 { + interval = 1 // 确保间隔至少为1分钟 + } + cronExpression := fmt.Sprintf("*/%d * * * *", interval) + timedCollectionPlan := models.Plan{ + Name: PlanNameTimedFullDataCollection, + Description: fmt.Sprintf("这是一个系统预定义的计划, 每 %d 秒自动触发一次全量数据采集。", app.Config.Collection.Interval), + PlanType: models.PlanTypeSystem, + ExecutionType: models.PlanExecutionTypeAutomatic, + CronExpression: cronExpression, + Status: models.PlanStatusEnabled, + ContentType: models.PlanContentTypeTasks, + Tasks: []models.Task{ + { + Name: "全量采集", + Description: "触发一次全量数据采集", + ExecutionOrder: 1, + Type: models.TaskTypeFullCollection, + }, + }, + } + + return []models.Plan{timedCollectionPlan} +} + +// initializePendingCollections 在应用启动时处理所有未完成的采集请求。 +// 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。 +// 这保证了系统在每次启动时都处于一个干净、确定的状态。 +func (app *Application) initializePendingCollections() error { + app.Logger.Info("开始清理所有未完成的采集请求...") + + // 直接将所有 'pending' 状态的请求更新为 'timed_out'。 + count, err := app.Infra.Repos.PendingCollectionRepo.MarkAllPendingAsTimedOut() + if err != nil { + return fmt.Errorf("清理未完成的采集请求失败: %v", err) + } else if count > 0 { + app.Logger.Infof("成功将 %d 个未完成的采集请求标记为超时。", count) + } else { + app.Logger.Info("没有需要清理的采集请求。") + } + + return nil +} + +// initializePendingTasks 在应用启动时清理并刷新待执行任务列表。 +func (app *Application) initializePendingTasks() error { + logger := app.Logger + planRepo := app.Infra.Repos.PlanRepo + pendingTaskRepo := app.Infra.Repos.PendingTaskRepo + executionLogRepo := app.Infra.Repos.ExecutionLogRepo + analysisPlanTaskManager := app.Domain.AnalysisPlanTaskManager + + logger.Info("开始初始化待执行任务列表...") + + // 阶段一:修正因崩溃导致状态不一致的固定次数计划 + logger.Info("阶段一:开始修正因崩溃导致状态不一致的固定次数计划...") + plansToCorrect, err := planRepo.FindPlansWithPendingTasks() + if err != nil { + return fmt.Errorf("查找需要修正的计划失败: %w", err) + } + + for _, plan := range plansToCorrect { + logger.Infof("发现需要修正的计划 #%d (名称: %s)。", plan.ID, plan.Name) + + // 更新计划的执行计数 + plan.ExecuteCount++ + logger.Infof("计划 #%d 执行计数已从 %d 更新为 %d。", plan.ID, plan.ExecuteCount-1, plan.ExecuteCount) + + if plan.ExecutionType == models.PlanExecutionTypeManual || + (plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteCount >= plan.ExecuteNum) { + // 更新计划状态为已停止 + plan.Status = models.PlanStatusStopped + logger.Infof("计划 #%d 状态已更新为 '执行完毕'。", plan.ID) + + } + // 保存更新后的计划 + if err := planRepo.UpdatePlan(plan); err != nil { + logger.Errorf("修正计划 #%d 状态失败: %v", plan.ID, err) + // 这是一个非阻塞性错误,继续处理其他计划 + } + } + logger.Info("阶段一:固定次数计划修正完成。") + + // 阶段二:清理所有待执行任务和相关日志 + logger.Info("阶段二:开始清理所有待执行任务和相关日志...") + + // --- 新增逻辑:处理因崩溃导致状态不一致的计划主表状态 --- + // 1. 查找所有未完成的计划执行日志 (状态为 Started 或 Waiting) + incompletePlanLogs, err := executionLogRepo.FindIncompletePlanExecutionLogs() + if err != nil { + return fmt.Errorf("查找未完成的计划执行日志失败: %w", err) + } + + // 2. 收集所有受影响的唯一 PlanID + affectedPlanIDs := make(map[uint]struct{}) + for _, log := range incompletePlanLogs { + affectedPlanIDs[log.PlanID] = struct{}{} + } + + // 3. 对于每个受影响的 PlanID,重置其 execute_count 并将其状态设置为 Failed + for planID := range affectedPlanIDs { + logger.Warnf("检测到计划 #%d 在应用崩溃前处于未完成状态,将重置其计数并标记为失败。", planID) + // 使用 UpdatePlanStateAfterExecution 来更新主表状态,避免影响关联数据 + if err := planRepo.UpdatePlanStateAfterExecution(planID, 0, models.PlanStatusFailed); err != nil { + logger.Errorf("重置计划 #%d 计数并标记为失败时出错: %v", planID, err) + // 这是一个非阻塞性错误,继续处理其他计划 + } + } + logger.Info("阶段二:计划主表状态修正完成。") + + // 直接调用新的方法来更新计划执行日志状态为失败 + if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(); err != nil { + logger.Errorf("更新所有未完成计划执行日志状态为失败失败: %v", err) + // 这是一个非阻塞性错误,继续执行 + } + + // 直接调用新的方法来更新任务执行日志状态为取消 + if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(); err != nil { + logger.Errorf("更新所有未完成任务执行日志状态为取消失败: %v", err) + // 这是一个非阻塞性错误,继续执行 + } + + // 清空待执行列表 + if err := pendingTaskRepo.ClearAllPendingTasks(); err != nil { + return fmt.Errorf("清空待执行任务列表失败: %w", err) + } + logger.Info("阶段二:待执行任务和相关日志清理完成。") + + // 阶段三:初始刷新 + logger.Info("阶段三:开始刷新待执行列表...") + if err := analysisPlanTaskManager.Refresh(); err != nil { + return fmt.Errorf("刷新待执行任务列表失败: %w", err) + } + logger.Info("阶段三:待执行任务列表初始化完成。") + + logger.Info("待执行任务列表初始化完成。") + return nil +} diff --git a/internal/infra/config/config.go b/internal/infra/config/config.go index b22f97b..81256b5 100644 --- a/internal/infra/config/config.go +++ b/internal/infra/config/config.go @@ -200,13 +200,18 @@ type LarkConfig struct { // CollectionConfig 代表定时采集配置 type CollectionConfig struct { + // Interval 采集间隔(分钟), 默认 1 Interval int `yaml:"interval"` } // NewConfig 创建并返回一个新的配置实例 func NewConfig() *Config { // 默认值可以在这里设置,但我们优先使用配置文件中的值 - return &Config{} + return &Config{ + Collection: CollectionConfig{ + Interval: 1, // 默认为1分钟 + }, + } } // Load 从指定路径加载配置文件