Files
pig-farm-controller/internal/domain/task/ota_check_task.go
2025-12-03 17:34:38 +08:00

142 lines
4.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package task
import (
"context"
"fmt"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/plan"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
)
// OtaCheckTaskParams 定义了 OTA 检查任务所需的参数。
// 这些参数从任务的 Parameters JSON 字段中解析而来。
type OtaCheckTaskParams struct {
// TimeoutSeconds 定义了任务的全局超时时间(秒)。
// 如果一个升级任务在此时间内没有完成,将被标记为超时。
TimeoutSeconds int `json:"timeout_seconds"`
}
// otaCheckTask 实现了扫描和处理超时 OTA 升级任务的逻辑。
type otaCheckTask struct {
ctx context.Context
onceParse sync.Once
taskLog *models.TaskExecutionLog
params OtaCheckTaskParams
otaRepo repository.OtaRepository
}
// NewOtaCheckTask 创建一个新的 otaCheckTask 实例。
func NewOtaCheckTask(
ctx context.Context,
taskLog *models.TaskExecutionLog,
otaRepo repository.OtaRepository,
) plan.Task {
return &otaCheckTask{
ctx: ctx,
taskLog: taskLog,
otaRepo: otaRepo,
}
}
// Execute 是任务的核心执行逻辑。
func (t *otaCheckTask) Execute(ctx context.Context) error {
taskCtx, logger := logs.Trace(ctx, t.ctx, "Execute")
// 1. 解析并验证任务参数
if err := t.parseParameters(taskCtx); err != nil {
return err
}
logger.Infof("开始执行OTA升级超时检查任务超时设置为 %d 秒...", t.params.TimeoutSeconds)
timeoutDuration := time.Duration(t.params.TimeoutSeconds) * time.Second
timeoutBefore := time.Now().Add(-timeoutDuration)
// 2. 定义需要检查的状态
inProgressStatuses := []models.OTATaskStatus{
models.OTATaskStatusInProgress,
}
// 3. 查找所有超时的、仍在进行中的任务
tasks, err := t.otaRepo.FindTasksByStatusesAndCreationTime(taskCtx, inProgressStatuses, timeoutBefore)
if err != nil {
logger.Errorf("查找超时的OTA升级任务失败: %v", err)
return fmt.Errorf("查找超时的OTA升级任务失败: %w", err)
}
if len(tasks) == 0 {
logger.Info("没有发现超时的OTA升级任务。")
return nil
}
logger.Infof("发现 %d 个超时的OTA升级任务正在逐一处理...", len(tasks))
message := fmt.Sprintf("任务因超过全局超时时间(%d秒)未完成而被系统自动标记为超时。", t.params.TimeoutSeconds)
// 4. 逐一更新任务状态
for _, task := range tasks {
logger.Warnf("正在处理超时的OTA升级任务: ID=%d, 区域主控ID=%d, 目标版本=%s, 创建于=%v",
task.ID, task.AreaControllerID, task.TargetVersion, task.CreatedAt)
task.Status = models.OTATaskStatusTimedOut
task.ErrorMessage = message
completedTime := time.Now()
task.CompletedAt = &completedTime
if err := t.otaRepo.Update(taskCtx, task); err != nil {
// 仅记录错误,不中断整个检查任务,以确保其他超时任务能被处理
logger.Errorf("更新超时的OTA任务 #%d 状态失败: %v", task.ID, err)
}
}
logger.Infof("成功处理了 %d 个超时的OTA升级任务。", len(tasks))
return nil
}
// parseParameters 使用 sync.Once 确保任务参数只被解析一次。
func (t *otaCheckTask) parseParameters(ctx context.Context) error {
logger := logs.TraceLogger(ctx, t.ctx, "parseParameters")
var err error
t.onceParse.Do(func() {
if t.taskLog.Task.Parameters == nil {
err = fmt.Errorf("任务 %d: 缺少参数", t.taskLog.TaskID)
logger.Error(err.Error())
return
}
var params OtaCheckTaskParams
if pErr := t.taskLog.Task.ParseParameters(&params); pErr != nil {
err = fmt.Errorf("任务 %d: 解析参数失败: %w", t.taskLog.TaskID, pErr)
logger.Error(err.Error())
return
}
// 验证参数
if params.TimeoutSeconds <= 0 {
err = fmt.Errorf("任务 %d: 参数 'timeout_seconds' 必须是一个正整数", t.taskLog.TaskID)
logger.Error(err.Error())
return
}
t.params = params
})
return err
}
// OnFailure 定义了当 Execute 方法返回错误时的回滚或清理逻辑。
func (t *otaCheckTask) OnFailure(ctx context.Context, executeErr error) {
logger := logs.TraceLogger(ctx, t.ctx, "OnFailure")
logger.Errorf("OTA升级超时检查任务执行失败, 任务ID: %d: %v", t.taskLog.TaskID, executeErr)
}
// ResolveDeviceIDs 从任务配置中解析并返回所有关联的设备ID列表。
func (t *otaCheckTask) ResolveDeviceIDs(ctx context.Context) ([]uint32, error) {
// 这是一个系统级的任务,不与任何特定设备直接关联。
return []uint32{}, nil
}