ota升级超时检查任务

This commit is contained in:
2025-12-03 17:34:38 +08:00
parent a1deb0011b
commit 9d9b5f801f
9 changed files with 267 additions and 18 deletions

View File

@@ -0,0 +1,141 @@
package task
import (
"context"
"fmt"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/plan"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
)
// OtaCheckTaskParams 定义了 OTA 检查任务所需的参数。
// 这些参数从任务的 Parameters JSON 字段中解析而来。
type OtaCheckTaskParams struct {
// TimeoutSeconds 定义了任务的全局超时时间(秒)。
// 如果一个升级任务在此时间内没有完成,将被标记为超时。
TimeoutSeconds int `json:"timeout_seconds"`
}
// otaCheckTask 实现了扫描和处理超时 OTA 升级任务的逻辑。
type otaCheckTask struct {
ctx context.Context
onceParse sync.Once
taskLog *models.TaskExecutionLog
params OtaCheckTaskParams
otaRepo repository.OtaRepository
}
// NewOtaCheckTask 创建一个新的 otaCheckTask 实例。
func NewOtaCheckTask(
ctx context.Context,
taskLog *models.TaskExecutionLog,
otaRepo repository.OtaRepository,
) plan.Task {
return &otaCheckTask{
ctx: ctx,
taskLog: taskLog,
otaRepo: otaRepo,
}
}
// Execute 是任务的核心执行逻辑。
func (t *otaCheckTask) Execute(ctx context.Context) error {
taskCtx, logger := logs.Trace(ctx, t.ctx, "Execute")
// 1. 解析并验证任务参数
if err := t.parseParameters(taskCtx); err != nil {
return err
}
logger.Infof("开始执行OTA升级超时检查任务超时设置为 %d 秒...", t.params.TimeoutSeconds)
timeoutDuration := time.Duration(t.params.TimeoutSeconds) * time.Second
timeoutBefore := time.Now().Add(-timeoutDuration)
// 2. 定义需要检查的状态
inProgressStatuses := []models.OTATaskStatus{
models.OTATaskStatusInProgress,
}
// 3. 查找所有超时的、仍在进行中的任务
tasks, err := t.otaRepo.FindTasksByStatusesAndCreationTime(taskCtx, inProgressStatuses, timeoutBefore)
if err != nil {
logger.Errorf("查找超时的OTA升级任务失败: %v", err)
return fmt.Errorf("查找超时的OTA升级任务失败: %w", err)
}
if len(tasks) == 0 {
logger.Info("没有发现超时的OTA升级任务。")
return nil
}
logger.Infof("发现 %d 个超时的OTA升级任务正在逐一处理...", len(tasks))
message := fmt.Sprintf("任务因超过全局超时时间(%d秒)未完成而被系统自动标记为超时。", t.params.TimeoutSeconds)
// 4. 逐一更新任务状态
for _, task := range tasks {
logger.Warnf("正在处理超时的OTA升级任务: ID=%d, 区域主控ID=%d, 目标版本=%s, 创建于=%v",
task.ID, task.AreaControllerID, task.TargetVersion, task.CreatedAt)
task.Status = models.OTATaskStatusTimedOut
task.ErrorMessage = message
completedTime := time.Now()
task.CompletedAt = &completedTime
if err := t.otaRepo.Update(taskCtx, task); err != nil {
// 仅记录错误,不中断整个检查任务,以确保其他超时任务能被处理
logger.Errorf("更新超时的OTA任务 #%d 状态失败: %v", task.ID, err)
}
}
logger.Infof("成功处理了 %d 个超时的OTA升级任务。", len(tasks))
return nil
}
// parseParameters 使用 sync.Once 确保任务参数只被解析一次。
func (t *otaCheckTask) parseParameters(ctx context.Context) error {
logger := logs.TraceLogger(ctx, t.ctx, "parseParameters")
var err error
t.onceParse.Do(func() {
if t.taskLog.Task.Parameters == nil {
err = fmt.Errorf("任务 %d: 缺少参数", t.taskLog.TaskID)
logger.Error(err.Error())
return
}
var params OtaCheckTaskParams
if pErr := t.taskLog.Task.ParseParameters(&params); pErr != nil {
err = fmt.Errorf("任务 %d: 解析参数失败: %w", t.taskLog.TaskID, pErr)
logger.Error(err.Error())
return
}
// 验证参数
if params.TimeoutSeconds <= 0 {
err = fmt.Errorf("任务 %d: 参数 'timeout_seconds' 必须是一个正整数", t.taskLog.TaskID)
logger.Error(err.Error())
return
}
t.params = params
})
return err
}
// OnFailure 定义了当 Execute 方法返回错误时的回滚或清理逻辑。
func (t *otaCheckTask) OnFailure(ctx context.Context, executeErr error) {
logger := logs.TraceLogger(ctx, t.ctx, "OnFailure")
logger.Errorf("OTA升级超时检查任务执行失败, 任务ID: %d: %v", t.taskLog.TaskID, executeErr)
}
// ResolveDeviceIDs 从任务配置中解析并返回所有关联的设备ID列表。
func (t *otaCheckTask) ResolveDeviceIDs(ctx context.Context) ([]uint32, error) {
// 这是一个系统级的任务,不与任何特定设备直接关联。
return []uint32{}, nil
}