package task import ( "context" "fmt" "sync" "time" "git.huangwc.com/pig/pig-farm-controller/internal/domain/plan" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" ) // OtaCheckTaskParams 定义了 OTA 检查任务所需的参数。 // 这些参数从任务的 Parameters JSON 字段中解析而来。 type OtaCheckTaskParams struct { // TimeoutSeconds 定义了任务的全局超时时间(秒)。 // 如果一个升级任务在此时间内没有完成,将被标记为超时。 TimeoutSeconds int `json:"timeout_seconds"` } // otaCheckTask 实现了扫描和处理超时 OTA 升级任务的逻辑。 type otaCheckTask struct { ctx context.Context onceParse sync.Once taskLog *models.TaskExecutionLog params OtaCheckTaskParams otaRepo repository.OtaRepository } // NewOtaCheckTask 创建一个新的 otaCheckTask 实例。 func NewOtaCheckTask( ctx context.Context, taskLog *models.TaskExecutionLog, otaRepo repository.OtaRepository, ) plan.Task { return &otaCheckTask{ ctx: ctx, taskLog: taskLog, otaRepo: otaRepo, } } // Execute 是任务的核心执行逻辑。 func (t *otaCheckTask) Execute(ctx context.Context) error { taskCtx, logger := logs.Trace(ctx, t.ctx, "Execute") // 1. 解析并验证任务参数 if err := t.parseParameters(taskCtx); err != nil { return err } logger.Infof("开始执行OTA升级超时检查任务,超时设置为 %d 秒...", t.params.TimeoutSeconds) timeoutDuration := time.Duration(t.params.TimeoutSeconds) * time.Second timeoutBefore := time.Now().Add(-timeoutDuration) // 2. 定义需要检查的状态 inProgressStatuses := []models.OTATaskStatus{ models.OTATaskStatusInProgress, } // 3. 查找所有超时的、仍在进行中的任务 tasks, err := t.otaRepo.FindTasksByStatusesAndCreationTime(taskCtx, inProgressStatuses, timeoutBefore) if err != nil { logger.Errorf("查找超时的OTA升级任务失败: %v", err) return fmt.Errorf("查找超时的OTA升级任务失败: %w", err) } if len(tasks) == 0 { logger.Info("没有发现超时的OTA升级任务。") return nil } logger.Infof("发现 %d 个超时的OTA升级任务,正在逐一处理...", len(tasks)) message := fmt.Sprintf("任务因超过全局超时时间(%d秒)未完成而被系统自动标记为超时。", t.params.TimeoutSeconds) // 4. 逐一更新任务状态 for _, task := range tasks { logger.Warnf("正在处理超时的OTA升级任务: ID=%d, 区域主控ID=%d, 目标版本=%s, 创建于=%v", task.ID, task.AreaControllerID, task.TargetVersion, task.CreatedAt) task.Status = models.OTATaskStatusTimedOut task.ErrorMessage = message completedTime := time.Now() task.CompletedAt = &completedTime if err := t.otaRepo.Update(taskCtx, task); err != nil { // 仅记录错误,不中断整个检查任务,以确保其他超时任务能被处理 logger.Errorf("更新超时的OTA任务 #%d 状态失败: %v", task.ID, err) } } logger.Infof("成功处理了 %d 个超时的OTA升级任务。", len(tasks)) return nil } // parseParameters 使用 sync.Once 确保任务参数只被解析一次。 func (t *otaCheckTask) parseParameters(ctx context.Context) error { logger := logs.TraceLogger(ctx, t.ctx, "parseParameters") var err error t.onceParse.Do(func() { if t.taskLog.Task.Parameters == nil { err = fmt.Errorf("任务 %d: 缺少参数", t.taskLog.TaskID) logger.Error(err.Error()) return } var params OtaCheckTaskParams if pErr := t.taskLog.Task.ParseParameters(¶ms); pErr != nil { err = fmt.Errorf("任务 %d: 解析参数失败: %w", t.taskLog.TaskID, pErr) logger.Error(err.Error()) return } // 验证参数 if params.TimeoutSeconds <= 0 { err = fmt.Errorf("任务 %d: 参数 'timeout_seconds' 必须是一个正整数", t.taskLog.TaskID) logger.Error(err.Error()) return } t.params = params }) return err } // OnFailure 定义了当 Execute 方法返回错误时的回滚或清理逻辑。 func (t *otaCheckTask) OnFailure(ctx context.Context, executeErr error) { logger := logs.TraceLogger(ctx, t.ctx, "OnFailure") logger.Errorf("OTA升级超时检查任务执行失败, 任务ID: %d: %v", t.taskLog.TaskID, executeErr) } // ResolveDeviceIDs 从任务配置中解析并返回所有关联的设备ID列表。 func (t *otaCheckTask) ResolveDeviceIDs(ctx context.Context) ([]uint32, error) { // 这是一个系统级的任务,不与任何特定设备直接关联。 return []uint32{}, nil }