handleRequestFile

This commit is contained in:
2025-12-08 19:00:09 +08:00
parent aac9ecf371
commit 9a1b148a7a

View File

@@ -14,6 +14,7 @@ import (
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils/file"
gproto "google.golang.org/protobuf/proto"
"gorm.io/datatypes"
)
@@ -28,6 +29,7 @@ type loraListener struct {
sensorDataRepo repository.SensorDataRepository
deviceCommandLogRepo repository.DeviceCommandLogRepository
otaRepo repository.OtaRepository
comm transport.Communicator
}
// NewLoRaListener 创建一个新的 loraListener 实例。
@@ -40,6 +42,7 @@ func NewLoRaListener(
sensorDataRepo repository.SensorDataRepository,
deviceCommandLogRepo repository.DeviceCommandLogRepository,
otaRepo repository.OtaRepository,
comm transport.Communicator,
) transport.UpstreamHandler {
return &loraListener{
selfCtx: logs.AddCompName(ctx, "LoRaListener"),
@@ -49,6 +52,7 @@ func NewLoRaListener(
sensorDataRepo: sensorDataRepo,
deviceCommandLogRepo: deviceCommandLogRepo,
otaRepo: otaRepo,
comm: comm,
}
}
@@ -60,13 +64,12 @@ func (l *loraListener) HandleInstruction(upstreamCtx context.Context, sourceAddr
switch p := instruction.Payload.(type) {
case *proto.Instruction_CollectResult:
return l.handleCollectResult(ctx, sourceAddr, p.CollectResult)
case *proto.Instruction_Pong:
return l.handlePong(ctx, sourceAddr, p.Pong)
case *proto.Instruction_UpdateStatusReport:
return l.handleUpdateStatusReport(ctx, sourceAddr, p.UpdateStatusReport)
case *proto.Instruction_RequestFile:
return l.handleRequestFile(ctx, sourceAddr, p.RequestFile)
default:
logger.Warnw("收到一个当前未处理的上行指令类型", "来源地址", sourceAddr, "类型", fmt.Sprintf("%T", p))
return nil
@@ -279,7 +282,6 @@ func (l *loraListener) recordSensorData(ctx context.Context, areaControllerID ui
// handlePong 处理设备上报的Pong响应或主动心跳。
func (l *loraListener) handlePong(ctx context.Context, sourceAddr string, pong *proto.Pong) error {
reqCtx, logger := logs.Trace(ctx, l.selfCtx, "handlePong")
logger.Infow("开始处理Pong", "来源地址", sourceAddr, "携带版本", pong.GetFirmwareVersion())
@@ -310,15 +312,16 @@ func (l *loraListener) handlePong(ctx context.Context, sourceAddr string, pong *
// handleUpdateStatusReport 处理设备上报的 OTA 升级状态报告。
func (l *loraListener) handleUpdateStatusReport(ctx context.Context, sourceAddr string, report *proto.UpdateStatusReport) error {
reqCtx, logger := logs.Trace(ctx, l.selfCtx, "handleUpdateStatusReport")
if report == nil {
return fmt.Errorf("传入的UpdateStatusReport为nil")
}
logger := logs.GetLogger(ctx).With("任务ID", report.GetTaskId(), "来源地址", sourceAddr)
logger.With("任务ID", report.GetTaskId(), "来源地址", sourceAddr)
logger.Infow("开始处理 OTA 状态报告", "当前版本", report.GetCurrentVersion(), "状态", report.GetStatus().String())
// 1. 根据 task_id 查找对应的 OTA 任务
task, err := l.otaRepo.FindByID(ctx, report.GetTaskId())
task, err := l.otaRepo.FindByID(reqCtx, report.GetTaskId())
if err != nil {
logger.Errorw("处理 OTA 状态报告失败:未找到对应的 OTA 任务", "error", err)
return fmt.Errorf("处理 OTA 状态报告失败:未找到任务 %d: %w", report.GetTaskId(), err)
@@ -374,7 +377,7 @@ func (l *loraListener) handleUpdateStatusReport(ctx context.Context, sourceAddr
}
task.ErrorMessage = errorMessage
if err := l.otaRepo.Update(ctx, task); err != nil {
if err := l.otaRepo.Update(reqCtx, task); err != nil {
logger.Errorw("更新 OTA 任务状态失败", "error", err)
return fmt.Errorf("更新 OTA 任务 %d 状态失败: %w", report.GetTaskId(), err)
}
@@ -392,3 +395,65 @@ func (l *loraListener) handleUpdateStatusReport(ctx context.Context, sourceAddr
return nil
}
// handleRequestFile 处理设备的文件请求。
func (l *loraListener) handleRequestFile(ctx context.Context, sourceAddr string, req *proto.RequestFile) error {
reqCtx, logger := logs.Trace(ctx, l.selfCtx, "handleRequestFile")
if req == nil {
return fmt.Errorf("传入的RequestFile为nil")
}
logger.With("任务ID", req.GetTaskId(), "文件路径", req.GetFilepath(), "来源地址", sourceAddr)
logger.Infow("开始处理文件请求")
// 1. 根据 task_id 查找对应的 OTA 任务
task, err := l.otaRepo.FindByID(reqCtx, req.GetTaskId())
if err != nil {
logger.Errorw("处理文件请求失败:未找到对应的 OTA 任务", "error", err)
return fmt.Errorf("处理文件请求失败:未找到任务 %d: %w", req.GetTaskId(), err)
}
// 2. 根据 AreaControllerID 查找 AreaController获取 NetworkID
areaController, err := l.areaControllerRepo.FindByID(reqCtx, task.AreaControllerID)
if err != nil {
logger.Errorw("处理文件请求失败:未找到对应的区域主控", "error", err)
return fmt.Errorf("处理文件请求失败:未找到区域主控 %d: %w", task.AreaControllerID, err)
}
// 3. 构造文件路径并读取文件内容
subDir := filepath.Join(models.OTADir, fmt.Sprintf("%d", req.GetTaskId()))
content, err := file.ReadTempFile(subDir, req.GetFilepath())
if err != nil {
logger.Errorw("处理文件请求失败:读取文件失败", "error", err)
return fmt.Errorf("处理文件请求失败:读取文件 %s 失败: %w", req.GetFilepath(), err)
}
// 4. 构造 FileResponse
fileResp := &proto.FileResponse{
TaskId: req.GetTaskId(),
Filepath: req.GetFilepath(),
Content: content,
}
// 5. 构造并序列化 Instruction
instruction := &proto.Instruction{
Payload: &proto.Instruction_FileResponse{
FileResponse: fileResp,
},
}
message, err := gproto.Marshal(instruction)
if err != nil {
logger.Errorw("处理文件请求失败:序列化指令失败", "error", err)
return fmt.Errorf("处理文件请求失败:序列化指令失败: %w", err)
}
// 6. 发送指令
_, err = l.comm.Send(reqCtx, areaController.NetworkID, message)
if err != nil {
logger.Errorw("处理文件请求失败:发送指令失败", "error", err)
return fmt.Errorf("处理文件请求失败:发送指令到 %s 失败: %w", areaController.NetworkID, err)
}
logger.Infow("成功处理文件请求")
return nil
}