handleUpdateStatusReport

This commit is contained in:
2025-12-08 15:17:57 +08:00
parent 5c6290432a
commit aac9ecf371

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"math"
"path/filepath"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
@@ -12,6 +13,7 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils/file"
"gorm.io/datatypes"
)
@@ -25,6 +27,7 @@ type loraListener struct {
deviceRepo repository.DeviceRepository
sensorDataRepo repository.SensorDataRepository
deviceCommandLogRepo repository.DeviceCommandLogRepository
otaRepo repository.OtaRepository
}
// NewLoRaListener 创建一个新的 loraListener 实例。
@@ -36,6 +39,7 @@ func NewLoRaListener(
deviceRepo repository.DeviceRepository,
sensorDataRepo repository.SensorDataRepository,
deviceCommandLogRepo repository.DeviceCommandLogRepository,
otaRepo repository.OtaRepository,
) transport.UpstreamHandler {
return &loraListener{
selfCtx: logs.AddCompName(ctx, "LoRaListener"),
@@ -44,6 +48,7 @@ func NewLoRaListener(
deviceRepo: deviceRepo,
sensorDataRepo: sensorDataRepo,
deviceCommandLogRepo: deviceCommandLogRepo,
otaRepo: otaRepo,
}
}
@@ -59,6 +64,9 @@ func (l *loraListener) HandleInstruction(upstreamCtx context.Context, sourceAddr
case *proto.Instruction_Pong:
return l.handlePong(ctx, sourceAddr, p.Pong)
case *proto.Instruction_UpdateStatusReport:
return l.handleUpdateStatusReport(ctx, sourceAddr, p.UpdateStatusReport)
default:
logger.Warnw("收到一个当前未处理的上行指令类型", "来源地址", sourceAddr, "类型", fmt.Sprintf("%T", p))
return nil
@@ -140,9 +148,9 @@ func (l *loraListener) handleCollectResult(ctx context.Context, sourceAddr strin
return fmt.Errorf("传入的CollectResult为nil")
}
correlationID := collectResp.CorrelationId
correlationID := collectResp.GetCorrelationId()
logger := logs.GetLogger(ctx).With("correlationID", correlationID, "来源地址", sourceAddr)
logger.Infow("开始处理采集响应", "数据点数量", len(collectResp.Values))
logger.Infow("开始处理采集响应", "数据点数量", len(collectResp.GetValues()))
// 1. 查找区域主控
areaController, err := l.areaControllerRepo.FindByNetworkID(ctx, sourceAddr)
@@ -167,7 +175,7 @@ func (l *loraListener) handleCollectResult(ctx context.Context, sourceAddr strin
// 4. 匹配数据并存入数据库
deviceIDs := pendingReq.CommandMetadata
values := collectResp.Values
values := collectResp.GetValues()
if len(deviceIDs) != len(values) {
err := fmt.Errorf("数据不匹配:下行指令要求采集 %d 个设备,但上行响应包含 %d 个值", len(deviceIDs), len(values))
// 即使数量不匹配,也尝试更新状态为完成,以防止请求永远 pending
@@ -271,8 +279,9 @@ func (l *loraListener) recordSensorData(ctx context.Context, areaControllerID ui
// handlePong 处理设备上报的Pong响应或主动心跳。
func (l *loraListener) handlePong(ctx context.Context, sourceAddr string, pong *proto.Pong) error {
reqCtx, logger := logs.Trace(ctx, l.selfCtx, "handlePong")
logger.Infow("开始处理Pong", "来源地址", sourceAddr, "携带版本", pong.FirmwareVersion)
logger.Infow("开始处理Pong", "来源地址", sourceAddr, "携带版本", pong.GetFirmwareVersion())
// 1. 查找区域主控
areaController, err := l.areaControllerRepo.FindByNetworkID(reqCtx, sourceAddr)
@@ -281,13 +290,13 @@ func (l *loraListener) handlePong(ctx context.Context, sourceAddr string, pong *
}
// 2. 如果 Pong 中包含版本号,则更新
if pong.FirmwareVersion != "" {
err := l.areaControllerRepo.UpdateFirmwareVersion(reqCtx, areaController.ID, pong.FirmwareVersion)
if firmwareVersion := pong.GetFirmwareVersion(); firmwareVersion != "" {
err := l.areaControllerRepo.UpdateFirmwareVersion(reqCtx, areaController.ID, firmwareVersion)
if err != nil {
// 只记录错误,不中断流程,因为还要记录在线状态
logger.Errorw("处理Pong时更新固件版本失败", "主控ID", areaController.ID, "error", err)
} else {
logger.Infow("处理Pong时成功更新固件版本", "主控ID", areaController.ID, "新版本", pong.FirmwareVersion)
logger.Infow("处理Pong时成功更新固件版本", "主控ID", areaController.ID, "新版本", firmwareVersion)
}
}
@@ -298,3 +307,88 @@ func (l *loraListener) handlePong(ctx context.Context, sourceAddr string, pong *
return nil
}
// handleUpdateStatusReport 处理设备上报的 OTA 升级状态报告。
func (l *loraListener) handleUpdateStatusReport(ctx context.Context, sourceAddr string, report *proto.UpdateStatusReport) error {
if report == nil {
return fmt.Errorf("传入的UpdateStatusReport为nil")
}
logger := logs.GetLogger(ctx).With("任务ID", report.GetTaskId(), "来源地址", sourceAddr)
logger.Infow("开始处理 OTA 状态报告", "当前版本", report.GetCurrentVersion(), "状态", report.GetStatus().String())
// 1. 根据 task_id 查找对应的 OTA 任务
task, err := l.otaRepo.FindByID(ctx, report.GetTaskId())
if err != nil {
logger.Errorw("处理 OTA 状态报告失败:未找到对应的 OTA 任务", "error", err)
return fmt.Errorf("处理 OTA 状态报告失败:未找到任务 %d: %w", report.GetTaskId(), err)
}
// 2. 检查任务是否已处于终态,防止重复处理
if task.IsOver() {
logger.Warnw("OTA 任务已处于终态,忽略本次状态报告", "当前状态", task.Status)
return nil
}
// 3. 根据报告状态更新 OTA 任务
now := time.Now()
task.CompletedAt = &now
task.FinalReportedVersion = report.GetCurrentVersion()
task.FailedFilePath = report.GetFailedFile()
// 初始化错误信息,优先使用设备上报的原始信息
errorMessage := report.GetErrorMessage()
switch report.GetStatus() {
case proto.UpdateStatusReport_SUCCESS:
task.Status = models.OTATaskStatusSuccess
case proto.UpdateStatusReport_SUCCESS_ALREADY_UP_TO_DATE:
task.Status = models.OTATaskStatusAlreadyUpToDate
case proto.UpdateStatusReport_FAILED_PRE_CHECK:
task.Status = models.OTATaskStatusFailedPreCheck
case proto.UpdateStatusReport_FAILED_MANIFEST_VERIFY, proto.UpdateStatusReport_FAILED_DOWNLOAD:
task.Status = models.OTATaskStatusFailedDownload
case proto.UpdateStatusReport_FAILED_ROLLED_BACK:
task.Status = models.OTATaskStatusFailedRollback
case proto.UpdateStatusReport_FAILED_TIMEOUT:
task.Status = models.OTATaskStatusPlatformError
if errorMessage != "" {
errorMessage = fmt.Sprintf("平台诊断信息: 设备上报了平台推断的超时状态,可能存在逻辑错误。(设备原始报错: %s)", errorMessage)
} else {
errorMessage = "平台诊断信息: 设备上报了平台推断的超时状态,可能存在逻辑错误。"
}
case proto.UpdateStatusReport_STATUS_UNSPECIFIED:
task.Status = models.OTATaskStatusPlatformError
if errorMessage != "" {
errorMessage = fmt.Sprintf("平台诊断信息: 设备上报了未指定状态。(设备原始报错: %s)", errorMessage)
} else {
errorMessage = "平台诊断信息: 设备上报了未指定状态。"
}
default:
task.Status = models.OTATaskStatusPlatformError
if errorMessage != "" {
errorMessage = fmt.Sprintf("平台诊断信息: 设备上报了未知状态 '%s'。(设备原始报错: %s)", report.GetStatus().String(), errorMessage)
} else {
errorMessage = fmt.Sprintf("平台诊断信息: 设备上报了未知状态 '%s'。", report.GetStatus().String())
}
}
task.ErrorMessage = errorMessage
if err := l.otaRepo.Update(ctx, task); err != nil {
logger.Errorw("更新 OTA 任务状态失败", "error", err)
return fmt.Errorf("更新 OTA 任务 %d 状态失败: %w", report.GetTaskId(), err)
}
logger.Infow("成功更新 OTA 任务状态", "新状态", task.Status)
// 4. 如果任务已完成(成功或失败),则清理临时文件目录
if task.IsOver() {
dirToRemove := filepath.Join(models.OTADir, fmt.Sprintf("%d", report.GetTaskId()))
if err := file.RemoveDir(dirToRemove); err != nil {
logger.Warnw("清理 OTA 任务临时文件目录失败", "目录", dirToRemove, "error", err)
} else {
logger.Infow("成功清理 OTA 任务临时文件目录", "目录", dirToRemove)
}
}
return nil
}