From 9a1b148a7a0b231cb648f76fe800f664fe4b3651 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Mon, 8 Dec 2025 19:00:09 +0800 Subject: [PATCH] handleRequestFile --- internal/app/listener/lora_listener.go | 79 +++++++++++++++++++++++--- 1 file changed, 72 insertions(+), 7 deletions(-) diff --git a/internal/app/listener/lora_listener.go b/internal/app/listener/lora_listener.go index 7228cac..8f4ba93 100644 --- a/internal/app/listener/lora_listener.go +++ b/internal/app/listener/lora_listener.go @@ -14,6 +14,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport" "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto" "git.huangwc.com/pig/pig-farm-controller/internal/infra/utils/file" + gproto "google.golang.org/protobuf/proto" "gorm.io/datatypes" ) @@ -28,6 +29,7 @@ type loraListener struct { sensorDataRepo repository.SensorDataRepository deviceCommandLogRepo repository.DeviceCommandLogRepository otaRepo repository.OtaRepository + comm transport.Communicator } // NewLoRaListener 创建一个新的 loraListener 实例。 @@ -40,6 +42,7 @@ func NewLoRaListener( sensorDataRepo repository.SensorDataRepository, deviceCommandLogRepo repository.DeviceCommandLogRepository, otaRepo repository.OtaRepository, + comm transport.Communicator, ) transport.UpstreamHandler { return &loraListener{ selfCtx: logs.AddCompName(ctx, "LoRaListener"), @@ -49,6 +52,7 @@ func NewLoRaListener( sensorDataRepo: sensorDataRepo, deviceCommandLogRepo: deviceCommandLogRepo, otaRepo: otaRepo, + comm: comm, } } @@ -60,13 +64,12 @@ func (l *loraListener) HandleInstruction(upstreamCtx context.Context, sourceAddr switch p := instruction.Payload.(type) { case *proto.Instruction_CollectResult: return l.handleCollectResult(ctx, sourceAddr, p.CollectResult) - case *proto.Instruction_Pong: return l.handlePong(ctx, sourceAddr, p.Pong) - case *proto.Instruction_UpdateStatusReport: return l.handleUpdateStatusReport(ctx, sourceAddr, p.UpdateStatusReport) - + case *proto.Instruction_RequestFile: + return l.handleRequestFile(ctx, sourceAddr, p.RequestFile) default: logger.Warnw("收到一个当前未处理的上行指令类型", "来源地址", sourceAddr, "类型", fmt.Sprintf("%T", p)) return nil @@ -279,7 +282,6 @@ func (l *loraListener) recordSensorData(ctx context.Context, areaControllerID ui // handlePong 处理设备上报的Pong响应或主动心跳。 func (l *loraListener) handlePong(ctx context.Context, sourceAddr string, pong *proto.Pong) error { - reqCtx, logger := logs.Trace(ctx, l.selfCtx, "handlePong") logger.Infow("开始处理Pong", "来源地址", sourceAddr, "携带版本", pong.GetFirmwareVersion()) @@ -310,15 +312,16 @@ func (l *loraListener) handlePong(ctx context.Context, sourceAddr string, pong * // handleUpdateStatusReport 处理设备上报的 OTA 升级状态报告。 func (l *loraListener) handleUpdateStatusReport(ctx context.Context, sourceAddr string, report *proto.UpdateStatusReport) error { + reqCtx, logger := logs.Trace(ctx, l.selfCtx, "handleUpdateStatusReport") if report == nil { return fmt.Errorf("传入的UpdateStatusReport为nil") } - logger := logs.GetLogger(ctx).With("任务ID", report.GetTaskId(), "来源地址", sourceAddr) + logger.With("任务ID", report.GetTaskId(), "来源地址", sourceAddr) logger.Infow("开始处理 OTA 状态报告", "当前版本", report.GetCurrentVersion(), "状态", report.GetStatus().String()) // 1. 根据 task_id 查找对应的 OTA 任务 - task, err := l.otaRepo.FindByID(ctx, report.GetTaskId()) + task, err := l.otaRepo.FindByID(reqCtx, report.GetTaskId()) if err != nil { logger.Errorw("处理 OTA 状态报告失败:未找到对应的 OTA 任务", "error", err) return fmt.Errorf("处理 OTA 状态报告失败:未找到任务 %d: %w", report.GetTaskId(), err) @@ -374,7 +377,7 @@ func (l *loraListener) handleUpdateStatusReport(ctx context.Context, sourceAddr } task.ErrorMessage = errorMessage - if err := l.otaRepo.Update(ctx, task); err != nil { + if err := l.otaRepo.Update(reqCtx, task); err != nil { logger.Errorw("更新 OTA 任务状态失败", "error", err) return fmt.Errorf("更新 OTA 任务 %d 状态失败: %w", report.GetTaskId(), err) } @@ -392,3 +395,65 @@ func (l *loraListener) handleUpdateStatusReport(ctx context.Context, sourceAddr return nil } + +// handleRequestFile 处理设备的文件请求。 +func (l *loraListener) handleRequestFile(ctx context.Context, sourceAddr string, req *proto.RequestFile) error { + reqCtx, logger := logs.Trace(ctx, l.selfCtx, "handleRequestFile") + if req == nil { + return fmt.Errorf("传入的RequestFile为nil") + } + + logger.With("任务ID", req.GetTaskId(), "文件路径", req.GetFilepath(), "来源地址", sourceAddr) + logger.Infow("开始处理文件请求") + + // 1. 根据 task_id 查找对应的 OTA 任务 + task, err := l.otaRepo.FindByID(reqCtx, req.GetTaskId()) + if err != nil { + logger.Errorw("处理文件请求失败:未找到对应的 OTA 任务", "error", err) + return fmt.Errorf("处理文件请求失败:未找到任务 %d: %w", req.GetTaskId(), err) + } + + // 2. 根据 AreaControllerID 查找 AreaController,获取 NetworkID + areaController, err := l.areaControllerRepo.FindByID(reqCtx, task.AreaControllerID) + if err != nil { + logger.Errorw("处理文件请求失败:未找到对应的区域主控", "error", err) + return fmt.Errorf("处理文件请求失败:未找到区域主控 %d: %w", task.AreaControllerID, err) + } + + // 3. 构造文件路径并读取文件内容 + subDir := filepath.Join(models.OTADir, fmt.Sprintf("%d", req.GetTaskId())) + content, err := file.ReadTempFile(subDir, req.GetFilepath()) + if err != nil { + logger.Errorw("处理文件请求失败:读取文件失败", "error", err) + return fmt.Errorf("处理文件请求失败:读取文件 %s 失败: %w", req.GetFilepath(), err) + } + + // 4. 构造 FileResponse + fileResp := &proto.FileResponse{ + TaskId: req.GetTaskId(), + Filepath: req.GetFilepath(), + Content: content, + } + + // 5. 构造并序列化 Instruction + instruction := &proto.Instruction{ + Payload: &proto.Instruction_FileResponse{ + FileResponse: fileResp, + }, + } + message, err := gproto.Marshal(instruction) + if err != nil { + logger.Errorw("处理文件请求失败:序列化指令失败", "error", err) + return fmt.Errorf("处理文件请求失败:序列化指令失败: %w", err) + } + + // 6. 发送指令 + _, err = l.comm.Send(reqCtx, areaController.NetworkID, message) + if err != nil { + logger.Errorw("处理文件请求失败:发送指令失败", "error", err) + return fmt.Errorf("处理文件请求失败:发送指令到 %s 失败: %w", areaController.NetworkID, err) + } + + logger.Infow("成功处理文件请求") + return nil +}