Files
pig-farm-controller/internal/core/data_initializer.go

367 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package core
import (
"context"
"fmt"
"path/filepath"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/database"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
)
// initializeState 在应用启动时准备其初始数据状态。
// 它遵循一个严格的顺序:清理 -> 更新 -> 刷新,以确保数据的一致性和正确性。
func (app *Application) initializeState(ctx context.Context, cfgPath string) error {
appCtx, logger := logs.Trace(ctx, app.Ctx, "InitializeState")
// 1. 播种预设数据
logger.Info("开始播种预设数据...")
presetDir := filepath.Join(filepath.Dir(cfgPath), "presets-data")
if err := database.SeedFromPreset(appCtx, app.Infra.storage.GetDB(appCtx), presetDir); err != nil {
return fmt.Errorf("预设数据播种失败: %w", err)
}
logger.Info("预设数据播种成功。")
// 2. 清理所有上次运行时遗留的待执行任务和相关日志。
// 这一步必须在任何可能修改计划结构的操作之前执行,以避免外键约束冲突。
if err := app.cleanupStaleTasksAndLogs(appCtx); err != nil {
return fmt.Errorf("清理过期的任务及日志失败: %w", err)
}
// 3. 清理待采集任务 (非致命错误)。
if err := app.initializePendingCollections(appCtx); err != nil {
logger.Errorw("清理待采集任务时发生非致命错误", "error", err)
}
// 4. 初始化并更新系统计划。
// 此时,所有旧的待执行任务已被清除,可以安全地更新计划结构。
if err := app.initializeSystemPlans(ctx); err != nil {
return fmt.Errorf("初始化预定义系统计划失败: %w", err)
}
// 5. 最后,根据最新的计划状态,统一刷新所有计划的触发器。
// 这一步确保了新创建或更新的系统计划能够被正确地调度。
logger.Info("正在刷新所有计划的触发器...")
if err := app.Domain.planService.RefreshPlanTriggers(appCtx); err != nil {
return fmt.Errorf("刷新计划触发器失败: %w", err)
}
logger.Info("计划触发器刷新完成。")
return nil
}
// initializeSystemPlans 确保预定义的系统计划在数据库中存在并保持最新。
// 它通过调用各个独立的计划初始化方法来完成此操作。
func (app *Application) initializeSystemPlans(ctx context.Context) error {
appCtx, logger := logs.Trace(ctx, app.Ctx, "InitializeSystemPlans")
logger.Info("开始检查并更新预定义的系统计划...")
// 1. 获取所有已存在的系统计划
existingPlans, _, err := app.Infra.repos.planRepo.ListPlans(appCtx, repository.ListPlansOptions{
PlanType: repository.PlanTypeFilterSystem,
IncludeAssociations: true,
}, 1, 99999) // 使用一个较大的 pageSize 来获取所有系统计划
if err != nil {
return fmt.Errorf("获取现有系统计划失败: %w", err)
}
// 2. 为了方便查找, 将现有计划名放入一个 map
existingPlanMap := make(map[models.PlanName]*models.Plan)
for i := range existingPlans {
existingPlanMap[existingPlans[i].Name] = &existingPlans[i]
}
// 3. 调用独立的初始化方法来处理每个系统计划
if err := app.initializePeriodicSystemHealthCheckPlan(appCtx, existingPlanMap); err != nil {
return err // 如果任何一个计划初始化失败,则立即返回错误
}
if err := app.initializeAlarmNotificationPlan(appCtx, existingPlanMap); err != nil {
return err
}
logger.Info("预定义系统计划检查完成。")
return nil
}
// initializePeriodicSystemHealthCheckPlan 负责初始化 "周期性系统健康检查" 计划。
// 它会根据当前配置动态构建计划,并决定是创建新计划还是更新现有计划。
func (app *Application) initializePeriodicSystemHealthCheckPlan(ctx context.Context, existingPlanMap map[models.PlanName]*models.Plan) error {
appCtx, logger := logs.Trace(ctx, app.Ctx, "initializePeriodicSystemHealthCheckPlan")
// 根据配置创建定时全量采集计划
interval := app.Config.Collection.Interval
if interval <= 0 {
interval = 1 // 确保间隔至少为1分钟
}
cronExpression := fmt.Sprintf("*/%d * * * *", interval)
// 定义预设的全量采集任务
fullCollectionTask := models.Task{
Name: "全量采集",
Description: "触发一次全量数据采集",
ExecutionOrder: 1,
Type: models.TaskTypeFullCollection,
}
// 定义预设的延时任务
delayParams := task.DelayTaskParams{DelayDuration: 10} // 延时10秒
delayTask := models.Task{
Name: "延时任务",
Description: "系统预设延时任务,用于等待设备上传数据",
ExecutionOrder: 2,
Type: models.TaskTypeWaiting,
}
err := delayTask.SaveParameters(delayParams)
if err != nil {
return fmt.Errorf("序列化延时任务参数失败: %w", err)
}
// 构建新的任务列表
var newTasks []models.Task
newTasks = append(newTasks, fullCollectionTask, delayTask)
// 如果计划已存在,则获取其现有任务并追加到新任务列表后(排除预设任务)
if foundExistingPlan, ok := existingPlanMap[models.PlanNamePeriodicSystemHealthCheck]; ok {
for _, existingTask := range foundExistingPlan.Tasks {
// 排除已预设的全量采集和延时任务
if existingTask.Type != models.TaskTypeFullCollection && existingTask.Type != models.TaskTypeWaiting {
newTasks = append(newTasks, existingTask)
}
}
}
predefinedPlan := &models.Plan{
Name: models.PlanNamePeriodicSystemHealthCheck,
Description: fmt.Sprintf("这是一个系统预定义的计划, 每 %d 分钟自动触发一次全量数据采集, 并进行阈值校验告警。", app.Config.Collection.Interval),
PlanType: models.PlanTypeSystem,
ExecutionType: models.PlanExecutionTypeAutomatic,
CronExpression: cronExpression,
Status: models.PlanStatusEnabled,
ContentType: models.PlanContentTypeTasks,
Tasks: newTasks,
}
// 刷新所有任务的 ExecutionOrder
predefinedPlan.ReorderSteps()
if foundExistingPlan, ok := existingPlanMap[predefinedPlan.Name]; ok {
// 如果计划存在,则进行无差别更新
logger.Infof("预定义计划 '%s' 已存在,正在进行无差别更新...", predefinedPlan.Name)
// 将数据库中已存在的计划的ID和运行时状态字段赋值给预定义计划
predefinedPlan.ID = foundExistingPlan.ID
predefinedPlan.ExecuteCount = foundExistingPlan.ExecuteCount
// 1. 使用 UpdatePlanMetadataAndStructure 来更新计划的元数据和关联任务
// 这会处理 Name, Description, ExecutionType, ExecuteNum, CronExpression, ContentType
// 并且最重要的是,它会正确处理 Tasks 的增删改,确保任务列表与 predefinedPlan.Tasks 完全同步
if err := app.Infra.repos.planRepo.UpdatePlanMetadataAndStructure(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("更新预定义计划 '%s' 的元数据和结构失败: %w", predefinedPlan.Name, err)
}
// 2. 接着使用 UpdatePlan 来更新所有顶层字段,包括 PlanType 和 Status
// 由于任务已经在上一步正确同步,此步不会导致任务冗余
if err := app.Infra.repos.planRepo.UpdatePlan(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("更新预定义计划 '%s' 的所有顶层字段失败: %w", predefinedPlan.Name, err)
}
logger.Infof("成功更新预定义计划 '%s'。", predefinedPlan.Name)
} else {
// 如果计划不存在, 则创建
logger.Infof("预定义计划 '%s' 不存在,正在创建...", predefinedPlan.Name)
if err := app.Infra.repos.planRepo.CreatePlan(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("创建预定义计划 '%s' 失败: %w", predefinedPlan.Name, err)
} else {
logger.Infof("成功创建预定义计划 '%s'。", predefinedPlan.Name)
}
}
return nil
}
// initializeAlarmNotificationPlan 负责初始化 "告警通知发送" 计划。
// 它确保系统中存在一个每分钟执行的、用于发送告警通知的预定义计划。
func (app *Application) initializeAlarmNotificationPlan(ctx context.Context, existingPlanMap map[models.PlanName]*models.Plan) error {
appCtx, logger := logs.Trace(ctx, app.Ctx, "initializeAlarmNotificationPlan")
predefinedPlan := &models.Plan{
Name: models.PlanNameAlarmNotification,
Description: "这是一个系统预定义的计划, 每分钟自动触发一次告警通知发送。",
PlanType: models.PlanTypeSystem,
ExecutionType: models.PlanExecutionTypeAutomatic,
CronExpression: "*/1 * * * *", // 每分钟执行一次
Status: models.PlanStatusEnabled,
ContentType: models.PlanContentTypeTasks,
Tasks: []models.Task{
{
Name: "刷新通知状态",
Description: "刷新所有待处理的告警通知状态, 如将忽略到期的通知改回待发送状态",
ExecutionOrder: 1,
Type: models.TaskTypeNotificationRefresh,
},
{
Name: "告警通知发送",
Description: "发送所有待处理的告警通知",
ExecutionOrder: 2,
Type: models.TaskTypeAlarmNotification,
},
},
}
err := predefinedPlan.Tasks[1].SaveParameters(app.Config.AlarmNotification.NotificationIntervals)
if err != nil {
return fmt.Errorf("序列化告警通知任务参数失败: %w", err)
}
if foundExistingPlan, ok := existingPlanMap[predefinedPlan.Name]; ok {
// 如果计划存在,则进行无差别更新
logger.Infof("预定义计划 '%s' 已存在,正在进行无差别更新...", predefinedPlan.Name)
predefinedPlan.ID = foundExistingPlan.ID
predefinedPlan.ExecuteCount = foundExistingPlan.ExecuteCount
if err := app.Infra.repos.planRepo.UpdatePlanMetadataAndStructure(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("更新预定义计划 '%s' 的元数据和结构失败: %w", predefinedPlan.Name, err)
}
if err := app.Infra.repos.planRepo.UpdatePlan(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("更新预定义计划 '%s' 的所有顶层字段失败: %w", predefinedPlan.Name, err)
}
logger.Infof("成功更新预定义计划 '%s'。", predefinedPlan.Name)
} else {
// 如果计划不存在, 则创建
logger.Infof("预定义计划 '%s' 不存在,正在创建...", predefinedPlan.Name)
if err := app.Infra.repos.planRepo.CreatePlan(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("创建预定义计划 '%s' 失败: %w", predefinedPlan.Name, err)
} else {
logger.Infof("成功创建预定义计划 '%s'。", predefinedPlan.Name)
}
}
return nil
}
// initializePendingCollections 在应用启动时处理所有未完成的采集请求。
// 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。
// 这保证了系统在每次启动时都处于一个干净、确定的状态。
func (app *Application) initializePendingCollections(ctx context.Context) error {
appCtx, logger := logs.Trace(ctx, app.Ctx, "InitializePendingCollections")
logger.Info("开始清理所有未完成的采集请求...")
// 直接将所有 'pending' 状态的请求更新为 'timed_out'。
count, err := app.Infra.repos.pendingCollectionRepo.MarkAllPendingAsTimedOut(appCtx)
if err != nil {
return fmt.Errorf("清理未完成的采集请求失败: %v", err)
} else if count > 0 {
logger.Infof("成功将 %d 个未完成的采集请求标记为超时。", count)
} else {
logger.Info("没有需要清理的采集请求。")
}
return nil
}
// cleanupStaleTasksAndLogs 在应用启动时清理所有因上次异常关闭而遗留的待执行任务和相关日志。
func (app *Application) cleanupStaleTasksAndLogs(ctx context.Context) error {
appCtx, logger := logs.Trace(ctx, app.Ctx, "CleanupStaleTasksAndLogs")
planRepo := app.Infra.repos.planRepo
pendingTaskRepo := app.Infra.repos.pendingTaskRepo
executionLogRepo := app.Infra.repos.executionLogRepo
logger.Info("开始清理过期的任务及日志...")
// 阶段一:修正因崩溃导致状态不一致的固定次数计划
logger.Info("阶段一:开始修正因崩溃导致状态不一致的固定次数计划...")
plansToCorrect, err := planRepo.FindPlansWithPendingTasks(appCtx)
if err != nil {
return fmt.Errorf("查找需要修正的计划失败: %w", err)
}
for _, plan := range plansToCorrect {
logger.Infof("发现需要修正的计划 #%d (名称: %s)。", plan.ID, plan.Name)
// 更新计划的执行计数
plan.ExecuteCount++
logger.Infof("计划 #%d 执行计数已从 %d 更新为 %d。", plan.ID, plan.ExecuteCount-1, plan.ExecuteCount)
if plan.ExecutionType == models.PlanExecutionTypeManual ||
(plan.ExecutionType == models.PlanExecutionTypeAutomatic && plan.ExecuteCount >= plan.ExecuteNum) {
// 更新计划状态为已停止
plan.Status = models.PlanStatusStopped
logger.Infof("计划 #%d 状态已更新为 '执行完毕'。", plan.ID)
}
// 保存更新后的计划
if err := planRepo.UpdatePlan(appCtx, plan); err != nil {
logger.Errorf("修正计划 #%d 状态失败: %v", plan.ID, err)
// 这是一个非阻塞性错误,继续处理其他计划
}
}
logger.Info("阶段一:固定次数计划修正完成。")
// 阶段二:清理所有待执行任务和相关日志
logger.Info("阶段二:开始清理所有待执行任务和相关日志...")
// --- 新增逻辑:处理因崩溃导致状态不一致的计划主表状态 ---
// 1. 查找所有未完成的计划执行日志 (状态为 Started 或 Waiting)
incompletePlanLogs, err := executionLogRepo.FindIncompletePlanExecutionLogs(appCtx)
if err != nil {
return fmt.Errorf("查找未完成的计划执行日志失败: %w", err)
}
// 2. 收集所有受影响的唯一 PlanID
affectedPlanIDs := make(map[uint32]struct{})
for _, log := range incompletePlanLogs {
affectedPlanIDs[log.PlanID] = struct{}{}
}
// 3. 对于每个受影响的 PlanID重置其 execute_count 并将其状态设置为 Failed, 系统计划不受此影响
for planID := range affectedPlanIDs {
// 首先,获取计划的详细信息以判断其类型
plan, err := planRepo.GetBasicPlanByID(appCtx, planID)
if err != nil {
logger.Errorf("在尝试修正计划状态时,获取计划 #%d 的基本信息失败: %v", planID, err)
continue // 获取失败,跳过此计划
}
// 如果是系统计划,则不应标记为失败,仅记录日志
if plan.PlanType == models.PlanTypeSystem {
logger.Warnf("检测到系统计划 #%d 在应用崩溃前处于未完成状态,但根据策略,将保持其原有状态不标记为失败。", planID)
continue // 跳过,不处理
}
// 对于非系统计划,执行原有的失败标记逻辑
logger.Warnf("检测到计划 #%d 在应用崩溃前处于未完成状态,将重置其计数并标记为失败。", planID)
// 使用 UpdatePlanStateAfterExecution 来更新主表状态,避免影响关联数据
if err := planRepo.UpdatePlanStateAfterExecution(appCtx, planID, 0, models.PlanStatusFailed); err != nil {
logger.Errorf("重置计划 #%d 计数并标记为失败时出错: %v", planID, err)
// 这是一个非阻塞性错误,继续处理其他计划
}
}
logger.Info("阶段二:计划主表状态修正完成。")
// 直接调用新的方法来更新计划执行日志状态为失败
if err := executionLogRepo.FailAllIncompletePlanExecutionLogs(appCtx); err != nil {
logger.Errorf("更新所有未完成计划执行日志状态为失败失败: %v", err)
// 这是一个非阻塞性错误,继续执行
}
// 直接调用新的方法来更新任务执行日志状态为取消
if err := executionLogRepo.CancelAllIncompleteTaskExecutionLogs(appCtx); err != nil {
logger.Errorf("更新所有未完成任务执行日志状态为取消失败: %v", err)
// 这是一个非阻塞性错误,继续执行
}
// 清空待执行列表
if err := pendingTaskRepo.ClearAllPendingTasks(appCtx); err != nil {
return fmt.Errorf("清空待执行任务列表失败: %w", err)
}
logger.Info("阶段二:待执行任务和相关日志清理完成。")
logger.Info("过期的任务及日志清理完成。")
return nil
}