实现 StartUpgrade

This commit is contained in:
2025-12-07 17:43:18 +08:00
parent a7022c4c3f
commit 35c19d0495
5 changed files with 192 additions and 25 deletions

View File

@@ -8,6 +8,7 @@ import (
"crypto/sha256"
"crypto/x509"
"encoding/base64"
"encoding/json"
"encoding/pem"
"fmt"
"io/fs"
@@ -19,8 +20,8 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils/file"
"github.com/gibson042/canonicaljson-go"
)
@@ -56,25 +57,157 @@ type ManifestFile struct {
Size int64 `json:"size"` // 文件的大小(字节)
}
// OtaConfig 封装了 OTA 服务所需的可配置参数。
type OtaConfig struct {
DefaultRetryCount uint32 // 默认的设备端文件下载重试次数
DefaultRequestTimeoutS uint32 // 默认的设备端文件下载请求超时时间(秒)
}
// otaServiceImpl 是 OtaService 接口的实现。
type otaServiceImpl struct {
ctx context.Context
otaRepo repository.OtaRepository
deviceRepo repository.DeviceRepository
ctx context.Context
config OtaConfig
otaRepo repository.OtaRepository
generalDeviceService *GeneralDeviceService
}
// NewOtaService 创建一个新的 OtaService 实例。
func NewOtaService(ctx context.Context, otaRepo repository.OtaRepository, deviceRepo repository.DeviceRepository) OtaService {
func NewOtaService(
ctx context.Context,
config OtaConfig,
otaRepo repository.OtaRepository,
generalDeviceService *GeneralDeviceService,
) OtaService {
return &otaServiceImpl{
ctx: ctx,
otaRepo: otaRepo,
deviceRepo: deviceRepo,
ctx: ctx,
config: config,
otaRepo: otaRepo,
generalDeviceService: generalDeviceService,
}
}
// upgradeTask 封装了单次升级任务的所有上下文和操作,以提高代码的可读性和模块化。
type upgradeTask struct {
service *otaServiceImpl
ctx context.Context
logger *logs.Logger
task *models.OTATask
firmwarePath string
tempSubDir string
}
// run 执行核心的升级准备流程。
// 此方法内的所有操作都处于一个文件锁的保护下。
func (t *upgradeTask) run() error {
// 步骤 1: 解压固件
tempDestPath, err := file.CreateTempDir(t.tempSubDir)
if err != nil {
return fmt.Errorf("创建临时目录失败: %w", err)
}
if err := file.Decompress(t.firmwarePath, tempDestPath); err != nil {
return fmt.Errorf("解压固件失败: %w", err)
}
t.logger.Infof("为任务 %d 成功解压固件到 %s", t.task.ID, tempDestPath)
// 步骤 2: 生成、签名并写入 manifest 文件
manifest, err := t.service.generateManifest(t.tempSubDir)
if err != nil {
return fmt.Errorf("生成 manifest 失败: %w", err)
}
if err := t.service.signManifest(manifest); err != nil {
return fmt.Errorf("签名 manifest 失败: %w", err)
}
manifestBytes, err := json.Marshal(manifest)
if err != nil {
return fmt.Errorf("序列化 manifest.json 失败: %w", err)
}
if _, err := file.WriteTempFile(t.tempSubDir, "manifest.json", manifestBytes); err != nil {
return fmt.Errorf("写入 manifest.json 失败: %w", err)
}
t.logger.Infof("为任务 %d 成功生成并签名 manifest.json", t.task.ID)
// 步骤 3: 发送升级指令
manifestMD5 := fmt.Sprintf("%x", md5.Sum(manifestBytes))
prepareReq := &proto.PrepareUpdateReq{
Version: manifest.Version,
TaskId: t.task.ID,
ManifestMd5: manifestMD5,
RetryCount: t.service.config.DefaultRetryCount,
RequestTimeoutSeconds: t.service.config.DefaultRequestTimeoutS,
}
instructionPayload := &proto.Instruction_PrepareUpdateReq{PrepareUpdateReq: prepareReq}
if err := t.service.generalDeviceService.Send(t.ctx, t.task.AreaControllerID, instructionPayload); err != nil {
return fmt.Errorf("发送升级指令失败: %w", err)
}
t.logger.Infof("为任务 %d 成功发送升级指令", t.task.ID)
// 步骤 4: 更新任务状态为“进行中”
t.task.Status = models.OTATaskStatusInProgress
t.task.TargetVersion = manifest.Version // 回填目标版本号
if err := t.service.otaRepo.Update(t.ctx, t.task); err != nil {
return fmt.Errorf("更新任务状态为 '进行中' 失败: %w", err)
}
return nil
}
// rollback 在 run 方法失败时执行清理和状态更新操作。
func (t *upgradeTask) rollback(originalErr error) {
t.logger.Errorf("任务 %d 文件准备阶段失败,执行回滚: %v", t.task.ID, originalErr)
// 更新数据库状态为“准备文件失败”
t.task.Status = models.OTATaskStatusFailedPreparation
t.task.ErrorMessage = fmt.Sprintf("文件准备阶段失败: %v", originalErr)
now := time.Now()
t.task.CompletedAt = &now
if updateErr := t.service.otaRepo.Update(t.ctx, t.task); updateErr != nil {
t.logger.DPanicf("CRITICAL: 任务 %d 回滚失败后,更新其状态也失败了: %v", t.task.ID, updateErr)
}
// 清理临时解压目录
if removeDirErr := file.RemoveTempDir(t.tempSubDir); removeDirErr != nil {
t.logger.Warnf("回滚操作:清理任务 %d 的临时目录 %s 失败: %v", t.task.ID, t.tempSubDir, removeDirErr)
}
// 清理原始固件压缩包
if removeSrcErr := os.Remove(t.firmwarePath); removeSrcErr != nil {
t.logger.Warnf("回滚操作:清理任务 %d 的源固件 %s 失败: %v", t.task.ID, t.firmwarePath, removeSrcErr)
}
}
func (o *otaServiceImpl) StartUpgrade(ctx context.Context, areaControllerID uint32, firmwarePath string) (uint32, error) {
//TODO implement me
panic("implement me")
serviceCtx, logger := logs.Trace(ctx, o.ctx, "StartUpgrade")
// 步骤 1: 预创建数据库记录
task := &models.OTATask{
AreaControllerID: areaControllerID,
Status: models.OTATaskStatusPending,
CreatedAt: time.Now(),
}
if err := o.otaRepo.Create(serviceCtx, task); err != nil {
logger.Errorf("预创建 OTA 任务记录失败: %v", err)
return 0, fmt.Errorf("预创建 OTA 任务记录失败: %w", err)
}
logger.Infof("成功预创建 OTA 任务记录, ID: %d", task.ID)
// 步骤 2: 初始化升级任务执行器
upgrade := &upgradeTask{
service: o,
ctx: serviceCtx,
logger: logger,
task: task,
firmwarePath: firmwarePath,
tempSubDir: filepath.Join(models.OTADir, fmt.Sprintf("%d", task.ID)),
}
// 步骤 3: 在文件锁的保护下,原子化地执行升级准备流程
if err := file.ExecuteWithLock(upgrade.run, upgrade.rollback); err != nil {
// 此处的错误已在 rollback 中处理和记录,这里只向调用方返回失败信号
logger.Errorf("OTA 任务 %d 未能成功启动: %v", task.ID, err)
return 0, err
}
logger.Infof("OTA 升级任务 %d 已成功启动", task.ID)
return task.ID, nil
}
func (o *otaServiceImpl) GetUpgradeProgress(ctx context.Context, taskID uint32) (executed, total uint32, CurrentStage models.OTATaskStatus, err error) {
@@ -134,7 +267,7 @@ func (o *otaServiceImpl) generateManifest(packageSubDir string) (*Manifest, erro
if err != nil {
return err
}
if d.Name() == "version" {
if d.Name() == "version" || d.Name() == "manifest.json" {
return nil
}