diff --git a/internal/app/listener/lora_listener.go b/internal/app/listener/lora_listener.go index aee4f67..7228cac 100644 --- a/internal/app/listener/lora_listener.go +++ b/internal/app/listener/lora_listener.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "math" + "path/filepath" "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" @@ -12,6 +13,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/utils/file" "gorm.io/datatypes" ) @@ -25,6 +27,7 @@ type loraListener struct { deviceRepo repository.DeviceRepository sensorDataRepo repository.SensorDataRepository deviceCommandLogRepo repository.DeviceCommandLogRepository + otaRepo repository.OtaRepository } // NewLoRaListener 创建一个新的 loraListener 实例。 @@ -36,6 +39,7 @@ func NewLoRaListener( deviceRepo repository.DeviceRepository, sensorDataRepo repository.SensorDataRepository, deviceCommandLogRepo repository.DeviceCommandLogRepository, + otaRepo repository.OtaRepository, ) transport.UpstreamHandler { return &loraListener{ selfCtx: logs.AddCompName(ctx, "LoRaListener"), @@ -44,6 +48,7 @@ func NewLoRaListener( deviceRepo: deviceRepo, sensorDataRepo: sensorDataRepo, deviceCommandLogRepo: deviceCommandLogRepo, + otaRepo: otaRepo, } } @@ -59,6 +64,9 @@ func (l *loraListener) HandleInstruction(upstreamCtx context.Context, sourceAddr case *proto.Instruction_Pong: return l.handlePong(ctx, sourceAddr, p.Pong) + case *proto.Instruction_UpdateStatusReport: + return l.handleUpdateStatusReport(ctx, sourceAddr, p.UpdateStatusReport) + default: logger.Warnw("收到一个当前未处理的上行指令类型", "来源地址", sourceAddr, "类型", fmt.Sprintf("%T", p)) return nil @@ -140,9 +148,9 @@ func (l *loraListener) handleCollectResult(ctx context.Context, sourceAddr strin return fmt.Errorf("传入的CollectResult为nil") } - correlationID := collectResp.CorrelationId + correlationID := collectResp.GetCorrelationId() logger := logs.GetLogger(ctx).With("correlationID", correlationID, "来源地址", sourceAddr) - logger.Infow("开始处理采集响应", "数据点数量", len(collectResp.Values)) + logger.Infow("开始处理采集响应", "数据点数量", len(collectResp.GetValues())) // 1. 查找区域主控 areaController, err := l.areaControllerRepo.FindByNetworkID(ctx, sourceAddr) @@ -167,7 +175,7 @@ func (l *loraListener) handleCollectResult(ctx context.Context, sourceAddr strin // 4. 匹配数据并存入数据库 deviceIDs := pendingReq.CommandMetadata - values := collectResp.Values + values := collectResp.GetValues() if len(deviceIDs) != len(values) { err := fmt.Errorf("数据不匹配:下行指令要求采集 %d 个设备,但上行响应包含 %d 个值", len(deviceIDs), len(values)) // 即使数量不匹配,也尝试更新状态为完成,以防止请求永远 pending @@ -271,8 +279,9 @@ func (l *loraListener) recordSensorData(ctx context.Context, areaControllerID ui // handlePong 处理设备上报的Pong响应或主动心跳。 func (l *loraListener) handlePong(ctx context.Context, sourceAddr string, pong *proto.Pong) error { + reqCtx, logger := logs.Trace(ctx, l.selfCtx, "handlePong") - logger.Infow("开始处理Pong", "来源地址", sourceAddr, "携带版本", pong.FirmwareVersion) + logger.Infow("开始处理Pong", "来源地址", sourceAddr, "携带版本", pong.GetFirmwareVersion()) // 1. 查找区域主控 areaController, err := l.areaControllerRepo.FindByNetworkID(reqCtx, sourceAddr) @@ -281,13 +290,13 @@ func (l *loraListener) handlePong(ctx context.Context, sourceAddr string, pong * } // 2. 如果 Pong 中包含版本号,则更新 - if pong.FirmwareVersion != "" { - err := l.areaControllerRepo.UpdateFirmwareVersion(reqCtx, areaController.ID, pong.FirmwareVersion) + if firmwareVersion := pong.GetFirmwareVersion(); firmwareVersion != "" { + err := l.areaControllerRepo.UpdateFirmwareVersion(reqCtx, areaController.ID, firmwareVersion) if err != nil { // 只记录错误,不中断流程,因为还要记录在线状态 logger.Errorw("处理Pong时更新固件版本失败", "主控ID", areaController.ID, "error", err) } else { - logger.Infow("处理Pong时成功更新固件版本", "主控ID", areaController.ID, "新版本", pong.FirmwareVersion) + logger.Infow("处理Pong时成功更新固件版本", "主控ID", areaController.ID, "新版本", firmwareVersion) } } @@ -298,3 +307,88 @@ func (l *loraListener) handlePong(ctx context.Context, sourceAddr string, pong * return nil } + +// handleUpdateStatusReport 处理设备上报的 OTA 升级状态报告。 +func (l *loraListener) handleUpdateStatusReport(ctx context.Context, sourceAddr string, report *proto.UpdateStatusReport) error { + if report == nil { + return fmt.Errorf("传入的UpdateStatusReport为nil") + } + + logger := logs.GetLogger(ctx).With("任务ID", report.GetTaskId(), "来源地址", sourceAddr) + logger.Infow("开始处理 OTA 状态报告", "当前版本", report.GetCurrentVersion(), "状态", report.GetStatus().String()) + + // 1. 根据 task_id 查找对应的 OTA 任务 + task, err := l.otaRepo.FindByID(ctx, report.GetTaskId()) + if err != nil { + logger.Errorw("处理 OTA 状态报告失败:未找到对应的 OTA 任务", "error", err) + return fmt.Errorf("处理 OTA 状态报告失败:未找到任务 %d: %w", report.GetTaskId(), err) + } + + // 2. 检查任务是否已处于终态,防止重复处理 + if task.IsOver() { + logger.Warnw("OTA 任务已处于终态,忽略本次状态报告", "当前状态", task.Status) + return nil + } + + // 3. 根据报告状态更新 OTA 任务 + now := time.Now() + task.CompletedAt = &now + task.FinalReportedVersion = report.GetCurrentVersion() + task.FailedFilePath = report.GetFailedFile() + + // 初始化错误信息,优先使用设备上报的原始信息 + errorMessage := report.GetErrorMessage() + + switch report.GetStatus() { + case proto.UpdateStatusReport_SUCCESS: + task.Status = models.OTATaskStatusSuccess + case proto.UpdateStatusReport_SUCCESS_ALREADY_UP_TO_DATE: + task.Status = models.OTATaskStatusAlreadyUpToDate + case proto.UpdateStatusReport_FAILED_PRE_CHECK: + task.Status = models.OTATaskStatusFailedPreCheck + case proto.UpdateStatusReport_FAILED_MANIFEST_VERIFY, proto.UpdateStatusReport_FAILED_DOWNLOAD: + task.Status = models.OTATaskStatusFailedDownload + case proto.UpdateStatusReport_FAILED_ROLLED_BACK: + task.Status = models.OTATaskStatusFailedRollback + case proto.UpdateStatusReport_FAILED_TIMEOUT: + task.Status = models.OTATaskStatusPlatformError + if errorMessage != "" { + errorMessage = fmt.Sprintf("平台诊断信息: 设备上报了平台推断的超时状态,可能存在逻辑错误。(设备原始报错: %s)", errorMessage) + } else { + errorMessage = "平台诊断信息: 设备上报了平台推断的超时状态,可能存在逻辑错误。" + } + case proto.UpdateStatusReport_STATUS_UNSPECIFIED: + task.Status = models.OTATaskStatusPlatformError + if errorMessage != "" { + errorMessage = fmt.Sprintf("平台诊断信息: 设备上报了未指定状态。(设备原始报错: %s)", errorMessage) + } else { + errorMessage = "平台诊断信息: 设备上报了未指定状态。" + } + default: + task.Status = models.OTATaskStatusPlatformError + if errorMessage != "" { + errorMessage = fmt.Sprintf("平台诊断信息: 设备上报了未知状态 '%s'。(设备原始报错: %s)", report.GetStatus().String(), errorMessage) + } else { + errorMessage = fmt.Sprintf("平台诊断信息: 设备上报了未知状态 '%s'。", report.GetStatus().String()) + } + } + task.ErrorMessage = errorMessage + + if err := l.otaRepo.Update(ctx, task); err != nil { + logger.Errorw("更新 OTA 任务状态失败", "error", err) + return fmt.Errorf("更新 OTA 任务 %d 状态失败: %w", report.GetTaskId(), err) + } + logger.Infow("成功更新 OTA 任务状态", "新状态", task.Status) + + // 4. 如果任务已完成(成功或失败),则清理临时文件目录 + if task.IsOver() { + dirToRemove := filepath.Join(models.OTADir, fmt.Sprintf("%d", report.GetTaskId())) + if err := file.RemoveDir(dirToRemove); err != nil { + logger.Warnw("清理 OTA 任务临时文件目录失败", "目录", dirToRemove, "error", err) + } else { + logger.Infow("成功清理 OTA 任务临时文件目录", "目录", dirToRemove) + } + } + + return nil +}