提供lora公共逻辑
This commit is contained in:
@@ -5,8 +5,8 @@
|
||||
统一项目当前并存的两种 LoRa 通信模式(基于 ChirpStack API 和基于串口透传),使其在架构层面遵循相同的接口和设计模式。最终实现:
|
||||
|
||||
- **业务逻辑统一**:所有上行业务处理逻辑集中在一个地方,与具体的通信方式无关。
|
||||
- **发送接口统一**:上层服务(如 `DeviceService`)使用同一个接口发送下行指令,无需关心底层实现。
|
||||
- **架构清晰**:明确划分基础设施层(负责传输)和应用层(负责业务)的职责。
|
||||
- **发送接口统一**:上层服务使用同一个接口发送下行指令,无需关心底层实现。
|
||||
- **架构清晰**:明确划分基础设施层(负责传输)和应用层(负责业务)的职责,并确保正确的依赖方向 (`app` -> `infra`)。
|
||||
- **高扩展性**:未来支持新的通信方式时,只需添加新的“适配器”,而无需改动核心业务代码。
|
||||
|
||||
## 2. 背景与问题分析
|
||||
@@ -29,25 +29,23 @@
|
||||
### 2.3. 问题
|
||||
|
||||
- **业务逻辑分散**:处理 `CollectResult` 的业务逻辑在 `chirp_stack.go` 和 `lora_mesh_uart_passthrough_transport.go` 中都存在,造成代码重复和维护困难。
|
||||
- **职责不清**:`lora_mesh_uart_passthrough_transport.go` 同时承担了基础设施(串口读写、分片重组)和应用(处理采集结果、写数据库)两种职责,违反了分层架构原则。
|
||||
- **抽象缺失**:两种模式没有统一的接口,导致上层代码如果需要切换模式,将产生大量修改。
|
||||
- **职责不清**:`lora_mesh_uart_passthrough_transport.go` 同时承担了基础设施(串口读写)和应用(处理业务)两种职责。
|
||||
- **依赖关系混乱**:为了让 `infra` 层的串口模块能调用业务逻辑,可能会导致 `infra` 层反向依赖 `app` 层,破坏了项目的核心架构原则。
|
||||
|
||||
## 3. 统一架构设计方案
|
||||
|
||||
### 3.1. 核心思想
|
||||
|
||||
采用 **适配器模式 (Adapter Pattern)** 和 **依赖倒置原则 (Dependency Inversion Principle)**。
|
||||
采用 **端口与适配器模式 (Ports and Adapters Pattern)**,严格遵守 **依赖倒置原则**。
|
||||
|
||||
- 将所有上行业务逻辑抽离到**统一的业务处理器**中。
|
||||
- 将 `chirp_stack.go` 和 `lora_mesh_uart_passthrough_transport.go` 的接收部分重构为**适配器**。它们的唯一职责是将各自协议的数据,适配并转发给统一的业务处理器。
|
||||
- 上层依赖抽象接口,而不是具体实现。
|
||||
- **端口 (Port)**:在 `infra` 层定义一个 `UpstreamHandler` 接口。这个接口是 `infra` 层向上层暴露的“端口”,它规定了上行业务处理器必须满足的协约。
|
||||
- **适配器 (Adapter)**:在 `app` 层创建一个 `LoRaListener` 作为“适配器”,它实现 `infra` 层定义的 `UpstreamHandler` 接口,并封装所有核心业务处理逻辑。
|
||||
- **依赖注入**:在系统启动时,将 `app` 层的 `LoRaListener` 实例注入到需要它的 `infra` 层组件中。
|
||||
|
||||
### 3.2. 统一接口定义
|
||||
|
||||
#### 3.2.1. 发送接口 (已存在,无需修改)
|
||||
|
||||
此接口设计良好,上层业务通过它下发指令,无需关心底层实现。
|
||||
|
||||
```go
|
||||
// file: internal/infra/transport/transport.go
|
||||
package transport
|
||||
@@ -57,13 +55,13 @@ type Communicator interface {
|
||||
}
|
||||
```
|
||||
|
||||
#### 3.2.2. 接收处理接口 (新定义)
|
||||
#### 3.2.2. 接收处理接口 (端口定义)
|
||||
|
||||
此接口是应用层对所有上行数据处理能力的抽象。
|
||||
此接口定义了 `infra` 层对上行业务处理器的期望,是 `infra` 层向上层暴露的“端口”。
|
||||
|
||||
```go
|
||||
// file: internal/app/listener/handler.go
|
||||
package listener
|
||||
// file: internal/infra/transport/transport.go
|
||||
package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -71,48 +69,43 @@ import (
|
||||
)
|
||||
|
||||
// UpstreamHandler 定义了处理所有来源的上行数据的统一协约。
|
||||
// 任何实现了上行消息监听的基础设施(如串口、MQTT客户端),都应该在收到消息后调用此接口的实现者。
|
||||
// 这样,基础设施层只负责“接收和解析”,而将“业务处理”的控制权交给了上层。
|
||||
type UpstreamHandler interface {
|
||||
// HandleInstruction 处理来自设备的、已解析为Instruction的业务指令。
|
||||
HandleInstruction(ctx context.Context, sourceAddr string, instruction *proto.Instruction)
|
||||
HandleInstruction(ctx context.Context, sourceAddr string, instruction *proto.Instruction) error
|
||||
|
||||
// HandleStatus 处理非业务指令的设备状态更新,例如信号强度、电量等。
|
||||
HandleStatus(ctx context.Context, sourceAddr string, status map[string]interface{})
|
||||
HandleStatus(ctx context.Context, sourceAddr string, status map[string]interface{}) error
|
||||
}
|
||||
```
|
||||
|
||||
### 3.3. 组件职责划分 (重构后)
|
||||
|
||||
#### 3.3.1. 统一业务处理器 (应用层核心)
|
||||
#### 3.3.1. 统一业务处理器 (应用层适配器)
|
||||
|
||||
- **文件**: `internal/app/listener/lora_listener.go` (新)
|
||||
- **职责**:
|
||||
- 实现 `listener.UpstreamHandler` 接口。
|
||||
- 包含所有处理业务所需的数据库仓库依赖。
|
||||
- 实现 `HandleInstruction` 方法,通过 `switch-case` 处理 `CollectResult`, `OtaUpgradeStatus`, `LogUploadRequest` 等所有核心业务。
|
||||
- 实现 `HandleStatus` 方法,处理 ChirpStack 上报的电量、信号等旁路信息。
|
||||
- 实现 `transport.UpstreamHandler` 接口。
|
||||
- 包含所有处理业务所需的依赖(如领域服务、仓储等)。
|
||||
- 实现 `HandleInstruction` 方法,通过 `switch-case` 编排所有核心业务。
|
||||
- 实现 `HandleStatus` 方法,处理设备状态更新。
|
||||
- **这是项目中唯一处理 LoRa 上行业务的地方。**
|
||||
|
||||
#### 3.3.2. 适配器层 (连接 Infra 与 App)
|
||||
#### 3.3.2. 基础设施层 (Infra Layer)
|
||||
|
||||
- **文件 1**: `internal/app/listener/chirp_stack/chirp_stack.go` (重构)
|
||||
- **职责**: 纯粹的 Webhook 适配器。
|
||||
- 移除所有业务逻辑和数据库依赖。
|
||||
- 依赖 `listener.UpstreamHandler` 接口。
|
||||
- 依赖 `transport.UpstreamHandler` 接口。
|
||||
- 功能:接收 Webhook -> 解析 JSON -> 调用 `handler.HandleInstruction` 或 `handler.HandleStatus`。
|
||||
|
||||
- **文件 2**: `internal/infra/transport/lora/lora_mesh_uart_passthrough_transport.go` (重构)
|
||||
- **职责**: 纯粹的串口适配器。
|
||||
- **职责**: 纯粹的串口传输工具。
|
||||
- 移除所有业务逻辑和数据库依赖。
|
||||
- 依赖 `listener.UpstreamHandler` 接口。
|
||||
- 依赖 `transport.UpstreamHandler` 接口。
|
||||
- 功能:管理串口 -> 读字节流 -> 重组分片 -> 解析 `proto.Instruction` -> 调用 `handler.HandleInstruction`。
|
||||
|
||||
#### 3.3.3. 发送器 (基础设施层)
|
||||
|
||||
- **文件 1**: `internal/infra/transport/lora/chirp_stack.go` (或拆分出的新文件)
|
||||
- 实现 `transport.Communicator` 接口,通过调用 ChirpStack API 完成发送。
|
||||
- **文件 2**: `internal/infra/transport/lora/lora_mesh_uart_passthrough_transport.go`
|
||||
- 实现 `transport.Communicator` 接口,通过向串口写入数据完成发送。
|
||||
|
||||
### 3.4. 架构图 (重构后)
|
||||
|
||||
```
|
||||
@@ -123,28 +116,33 @@ type UpstreamHandler interface {
|
||||
|
|
||||
v (uses)
|
||||
+--------------------------------+
|
||||
| transport.Communicator (I) |
|
||||
| transport.Communicator (I) | <-- Infra Layer (Send Port)
|
||||
+--------------------------------+
|
||||
^ ^
|
||||
| | (implements)
|
||||
+------------------+------------------+
|
||||
| ChirpStackTransport | UartPassthroughTransport | (Infra Layer - Senders)
|
||||
| ChirpStackSender | UartSender | <-- Infra Layer (Senders)
|
||||
+------------------+------------------+
|
||||
|
||||
|
||||
+--------------------------------+
|
||||
| listener.LoRaListener |
|
||||
| (Unified Business Logic) |
|
||||
| listener.LoRaListener | <-- App Layer (Adapter)
|
||||
| (Implements UpstreamHandler) |
|
||||
+--------------------------------+
|
||||
^
|
||||
| (dependency, via interface)
|
||||
+--------------------------------+
|
||||
| transport.UpstreamHandler (I) | <-- Infra Layer (Receive Port)
|
||||
+--------------------------------+
|
||||
^ ^
|
||||
| | (calls)
|
||||
+------------------+------------------+
|
||||
| ChirpStackAdapter| UartPassthroughAdapter | (Adapter Layer)
|
||||
| ChirpStackWebhook| UartPassthrough | <-- Infra Layer (Receivers)
|
||||
+------------------+------------------+
|
||||
^ ^
|
||||
| | (receives from)
|
||||
+------------------+------------------+
|
||||
| HTTP Webhook | Serial Port | (Physical/Network Layer)
|
||||
| HTTP Webhook | Serial Port |
|
||||
+------------------+------------------+
|
||||
```
|
||||
|
||||
@@ -153,19 +151,20 @@ type UpstreamHandler interface {
|
||||
```go
|
||||
// file: internal/core/component_initializers.go
|
||||
|
||||
// 1. 创建统一的业务处理器 (单例)
|
||||
loraListener := listener.NewLoRaListener(dbRepo1, dbRepo2, ...)
|
||||
// 1. 创建统一的业务处理器 (App层适配器)
|
||||
// 它实现了 infra 层的 transport.UpstreamHandler 接口
|
||||
loraListener := listener.NewLoRaListener(logger, dbRepo1, dbRepo2)
|
||||
|
||||
// 2. 初始化 ChirpStack 模式
|
||||
// 2a. 创建 ChirpStack 的发送器 (infra)
|
||||
chirpStackCommunicator := chirp_stack.NewChirpStackTransport(...)
|
||||
// 2b. 创建 ChirpStack 的监听适配器 (app),并注入统一的业务处理器
|
||||
chirpStackListenerAdapter := chirp_stack.NewChirpStackListener(loraListener)
|
||||
// 2b. 创建 ChirpStack 的监听器 (infra),并注入 App 层的业务处理器
|
||||
chirpStackListener := chirp_stack.NewChirpStackListener(loraListener)
|
||||
// 2c. 注册 Webhook 路由
|
||||
api.RegisterWebhook("/chirpstack", chirpStackListenerAdapter.Handler())
|
||||
api.RegisterWebhook("/chirpstack", chirpStackListener.Handler())
|
||||
|
||||
// 3. 初始化串口透传模式
|
||||
// 3a. 创建串口的发送器/监听器 (infra),并注入统一的业务处理器
|
||||
// 3a. 创建串口的传输工具 (infra),并注入 App 层的业务处理器
|
||||
uartTransport := lora.NewLoRaMeshUartPassthroughTransport(port, loraListener)
|
||||
// 3b. 启动串口监听
|
||||
uartTransport.Listen()
|
||||
@@ -182,11 +181,11 @@ if config.UseChirpStack {
|
||||
|
||||
## 4. 实施步骤
|
||||
|
||||
1. **定义接口**: 在 `internal/app/listener/` 下创建 `handler.go` 并定义 `UpstreamHandler` 接口。
|
||||
2. **创建统一处理器**: 创建 `internal/app/listener/lora_listener.go`,定义 `LoRaListener` 结构体,并实现 `UpstreamHandler` 接口的空方法。
|
||||
1. **定义端口**: 在 `internal/infra/transport/transport.go` 中定义 `UpstreamHandler` 接口。
|
||||
2. **创建适配器**: 创建 `internal/app/listener/lora_listener.go`,定义 `LoRaListener` 结构体,并实现 `transport.UpstreamHandler` 接口。
|
||||
3. **迁移业务逻辑**: 将 `chirp_stack.go` 和 `lora_mesh_uart_passthrough_transport.go` 中的业务逻辑(查库、存数据等)逐步迁移到 `lora_listener.go` 的对应方法中。
|
||||
4. **重构适配器**:
|
||||
- 清理 `chirp_stack.go`,移除 Repo 依赖,改为依赖 `UpstreamHandler` 接口,并调用其方法。
|
||||
4. **重构基础设施**:
|
||||
- 清理 `chirp_stack.go`,移除 Repo 依赖,改为依赖 `transport.UpstreamHandler` 接口,并调用其方法。
|
||||
- 清理 `lora_mesh_uart_passthrough_transport.go`,做同样的操作。
|
||||
5. **更新依赖注入**: 修改 `component_initializers.go`,按照 `3.5` 中的示例完成组件的创建和注入。
|
||||
6. **测试与验证**: 对两种模式分别进行完整的上下行通信测试。
|
||||
@@ -195,5 +194,6 @@ if config.UseChirpStack {
|
||||
|
||||
- **消除代码重复**:业务逻辑仅存在于一处。
|
||||
- **职责清晰**:基础设施层只管传输,应用层只管业务。
|
||||
- **正确的依赖关系**:确保了 `app` -> `infra` 的单向依赖,核心架构更加稳固。
|
||||
- **可维护性**:修改业务逻辑只需改一个文件,修改传输细节不影响业务。
|
||||
- **可测试性**:可以轻松地对 `LoRaListener` 进行单元测试,无需真实的硬件或网络。
|
||||
|
||||
@@ -4,19 +4,16 @@ import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/listener"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto"
|
||||
|
||||
gproto "google.golang.org/protobuf/proto"
|
||||
"gorm.io/datatypes"
|
||||
)
|
||||
|
||||
// ChirpStackListener 主动发送的请求的event字段, 这个字段代表事件类型
|
||||
@@ -31,45 +28,34 @@ const (
|
||||
eventTypeIntegration = "integration" // 集成事件:当其他集成(如第三方服务)处理数据后触发。
|
||||
)
|
||||
|
||||
// ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件
|
||||
// ChirpStackListener 是一个监听器, 用于将ChirpStack的Webhook事件适配到统一的UpstreamHandler。
|
||||
type ChirpStackListener struct {
|
||||
ctx context.Context
|
||||
sensorDataRepo repository.SensorDataRepository
|
||||
deviceRepo repository.DeviceRepository
|
||||
areaControllerRepo repository.AreaControllerRepository
|
||||
deviceCommandLogRepo repository.DeviceCommandLogRepository
|
||||
pendingCollectionRepo repository.PendingCollectionRepository
|
||||
selfCtx context.Context
|
||||
handler transport.UpstreamHandler // 依赖注入的统一业务处理器
|
||||
}
|
||||
|
||||
// NewChirpStackListener 创建一个新的 ChirpStackListener 实例
|
||||
// NewChirpStackListener 创建一个新的 ChirpStackListener 实例。
|
||||
func NewChirpStackListener(
|
||||
ctx context.Context,
|
||||
sensorDataRepo repository.SensorDataRepository,
|
||||
deviceRepo repository.DeviceRepository,
|
||||
areaControllerRepo repository.AreaControllerRepository,
|
||||
deviceCommandLogRepo repository.DeviceCommandLogRepository,
|
||||
pendingCollectionRepo repository.PendingCollectionRepository,
|
||||
handler transport.UpstreamHandler,
|
||||
) listener.ListenHandler {
|
||||
return &ChirpStackListener{
|
||||
ctx: ctx,
|
||||
sensorDataRepo: sensorDataRepo,
|
||||
deviceRepo: deviceRepo,
|
||||
areaControllerRepo: areaControllerRepo,
|
||||
deviceCommandLogRepo: deviceCommandLogRepo,
|
||||
pendingCollectionRepo: pendingCollectionRepo,
|
||||
selfCtx: logs.AddCompName(ctx, "ChirpStackListener"),
|
||||
handler: handler,
|
||||
}
|
||||
}
|
||||
|
||||
// Handler 监听ChirpStack反馈的事件, 因为这是个Webhook, 所以直接回复掉再慢慢处理信息
|
||||
func (c *ChirpStackListener) Handler() http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, logger := logs.Trace(r.Context(), c.ctx, "ChirpStackListener")
|
||||
// 注意:这里的 selfCtx 是 r.Context(),因为它包含了HTTP请求的追踪信息
|
||||
ctx, logger := logs.Trace(r.Context(), c.selfCtx, "Handler")
|
||||
|
||||
defer r.Body.Close()
|
||||
|
||||
b, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
logger.Errorf("读取请求体失败: %v", err)
|
||||
logger.Errorw("读取请求体失败", "error", err)
|
||||
http.Error(w, "failed to read body", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
@@ -78,379 +64,112 @@ func (c *ChirpStackListener) Handler() http.HandlerFunc {
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
||||
// 将异步处理逻辑委托给 handler 方法
|
||||
go c.handler(ctx, b, event)
|
||||
// 使用分离的上下文进行异步处理,防止原始请求取消导致处理中断
|
||||
detachedCtx := logs.DetachContext(ctx)
|
||||
go c.dispatch(detachedCtx, b, event)
|
||||
}
|
||||
}
|
||||
|
||||
// handler 用于处理 ChirpStack 发送的事件
|
||||
func (c *ChirpStackListener) handler(ctx context.Context, data []byte, eventType string) {
|
||||
reqCtx, logger := logs.Trace(ctx, c.ctx, "ChirpStackListener.handler")
|
||||
// dispatch 用于解析并分发 ChirpStack 发送的事件
|
||||
func (c *ChirpStackListener) dispatch(ctx context.Context, data []byte, eventType string) {
|
||||
reqCtx, logger := logs.Trace(ctx, c.selfCtx, "dispatch")
|
||||
|
||||
var err error
|
||||
switch eventType {
|
||||
case eventTypeUp:
|
||||
var msg UpEvent
|
||||
if err := json.Unmarshal(data, &msg); err != nil {
|
||||
logger.Errorf("解析 'up' 事件失败: %v, data: %s", err, string(data))
|
||||
return
|
||||
if err = json.Unmarshal(data, &msg); err == nil {
|
||||
c.adaptUpEvent(reqCtx, &msg)
|
||||
}
|
||||
c.handleUpEvent(reqCtx, &msg)
|
||||
|
||||
case eventTypeJoin:
|
||||
var msg JoinEvent
|
||||
if err := json.Unmarshal(data, &msg); err != nil {
|
||||
logger.Errorf("解析 'join' 事件失败: %v, data: %s", err, string(data))
|
||||
return
|
||||
}
|
||||
c.handleJoinEvent(reqCtx, &msg)
|
||||
|
||||
case eventTypeAck:
|
||||
var msg AckEvent
|
||||
if err := json.Unmarshal(data, &msg); err != nil {
|
||||
logger.Errorf("解析 'ack' 事件失败: %v, data: %s", err, string(data))
|
||||
return
|
||||
}
|
||||
c.handleAckEvent(reqCtx, &msg)
|
||||
|
||||
case eventTypeTxAck:
|
||||
var msg TxAckEvent
|
||||
if err := json.Unmarshal(data, &msg); err != nil {
|
||||
logger.Errorf("解析 'txack' 事件失败: %v, data: %s", err, string(data))
|
||||
return
|
||||
}
|
||||
c.handleTxAckEvent(reqCtx, &msg)
|
||||
|
||||
case eventTypeStatus:
|
||||
var msg StatusEvent
|
||||
if err := json.Unmarshal(data, &msg); err != nil {
|
||||
logger.Errorf("解析 'status' 事件失败: %v, data: %s", err, string(data))
|
||||
return
|
||||
if err = json.Unmarshal(data, &msg); err == nil {
|
||||
c.adaptStatusEvent(reqCtx, &msg)
|
||||
}
|
||||
c.handleStatusEvent(reqCtx, &msg)
|
||||
|
||||
case eventTypeLog:
|
||||
var msg LogEvent
|
||||
if err := json.Unmarshal(data, &msg); err != nil {
|
||||
logger.Errorf("解析 'log' 事件失败: %v, data: %s", err, string(data))
|
||||
return
|
||||
case eventTypeAck:
|
||||
var msg AckEvent
|
||||
if err = json.Unmarshal(data, &msg); err == nil {
|
||||
c.adaptAckEvent(reqCtx, &msg)
|
||||
}
|
||||
c.handleLogEvent(reqCtx, &msg)
|
||||
|
||||
case eventTypeLocation:
|
||||
var msg LocationEvent
|
||||
if err := json.Unmarshal(data, &msg); err != nil {
|
||||
logger.Errorf("解析 'location' 事件失败: %v, data: %s", err, string(data))
|
||||
return
|
||||
}
|
||||
c.handleLocationEvent(reqCtx, &msg)
|
||||
|
||||
case eventTypeIntegration:
|
||||
var msg IntegrationEvent
|
||||
if err := json.Unmarshal(data, &msg); err != nil {
|
||||
logger.Errorf("解析 'integration' 事件失败: %v, data: %s", err, string(data))
|
||||
return
|
||||
}
|
||||
c.handleIntegrationEvent(reqCtx, &msg)
|
||||
// --- 其他事件只记录日志,不进行业务处理 ---
|
||||
case eventTypeJoin, eventTypeTxAck, eventTypeLog, eventTypeLocation, eventTypeIntegration:
|
||||
logger.Infow("收到一个非业务处理的ChirpStack事件", "type", eventType)
|
||||
|
||||
default:
|
||||
logger.Errorf("未知的ChirpStack事件: %s, data: %s", eventType, string(data))
|
||||
logger.Warnw("未知的ChirpStack事件类型", "type", eventType)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger.Errorw("解析ChirpStack事件失败", "type", eventType, "error", err, "data", string(data))
|
||||
}
|
||||
}
|
||||
|
||||
// --- 业务处理函数 ---
|
||||
// --- 适配器函数 ---
|
||||
|
||||
// handleUpEvent 处理上行数据事件
|
||||
func (c *ChirpStackListener) handleUpEvent(ctx context.Context, event *UpEvent) {
|
||||
reqCtx, logger := logs.Trace(ctx, c.ctx, "ChirpStackListener.handleUpEvent")
|
||||
logger.Infof("开始处理 'up' 事件, DevEui: %s", event.DeviceInfo.DevEui)
|
||||
// adaptUpEvent 将 'up' 事件适配并委托给 UpstreamHandler
|
||||
func (c *ChirpStackListener) adaptUpEvent(ctx context.Context, event *UpEvent) {
|
||||
reqCtx, logger := logs.Trace(ctx, c.selfCtx, "adaptUpEvent")
|
||||
|
||||
// 1. 查找区域主控设备
|
||||
areaController, err := c.areaControllerRepo.FindByNetworkID(reqCtx, event.DeviceInfo.DevEui)
|
||||
if err != nil {
|
||||
logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err)
|
||||
return
|
||||
}
|
||||
// 依赖 SelfCheck 确保区域主控有效
|
||||
if err := areaController.SelfCheck(); err != nil {
|
||||
logger.Errorf("处理 'up' 事件失败:区域主控 %v(ID: %d) 未通过自检: %v", areaController.Name, areaController.ID, err)
|
||||
return
|
||||
}
|
||||
logger.Infof("找到区域主控: %s (ID: %d)", areaController.Name, areaController.ID)
|
||||
|
||||
// 2. 记录区域主控的信号强度 (如果存在)
|
||||
// 1. 优先处理并委托旁路状态信息(如信号强度)
|
||||
if len(event.RxInfo) > 0 {
|
||||
// 根据业务逻辑,一个猪场只有一个网关,所以 RxInfo 中通常只有一个元素,或者 gateway_id 都是相同的。
|
||||
// 因此,我们只取第一个 RxInfo 中的信号数据即可。
|
||||
rx := event.RxInfo[0] // 取第一个接收到的网关信息
|
||||
|
||||
// 构建 SignalMetrics 结构体
|
||||
signalMetrics := models.SignalMetrics{
|
||||
RssiDbm: rx.Rssi,
|
||||
SnrDb: rx.Snr,
|
||||
rx := event.RxInfo[0]
|
||||
status := map[string]interface{}{
|
||||
"rssi": float64(rx.Rssi),
|
||||
"snr": float64(rx.Snr),
|
||||
}
|
||||
if err := c.handler.HandleStatus(reqCtx, event.DeviceInfo.DevEui, status); err != nil {
|
||||
logger.Errorw("委托 'up' 事件中的状态信息失败", "error", err)
|
||||
}
|
||||
|
||||
// 记录信号强度
|
||||
c.recordSensorData(reqCtx, areaController.ID, areaController.ID, event.Time, models.SensorTypeSignalMetrics, signalMetrics)
|
||||
logger.Infof("已记录区域主控 (ID: %d) 的信号强度: RSSI=%d, SNR=%.2f", areaController.ID, rx.Rssi, rx.Snr)
|
||||
} else {
|
||||
logger.Warnf("处理 'up' 事件时未找到 RxInfo,无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui)
|
||||
}
|
||||
|
||||
// 3. 处理上报的传感器数据
|
||||
// 2. 如果没有业务数据,则直接返回
|
||||
if event.Data == "" {
|
||||
logger.Warnf("处理 'up' 事件时 Data 字段为空,无需记录上行数据。DevEui: %s", event.DeviceInfo.DevEui)
|
||||
return
|
||||
}
|
||||
|
||||
// 3.1 Base64 解码
|
||||
// 3. 解码并解析业务指令
|
||||
decodedData, err := base64.StdEncoding.DecodeString(event.Data)
|
||||
if err != nil {
|
||||
logger.Errorf("Base64 解码 'up' 事件的 Data 失败: %v, Data: %s", err, event.Data)
|
||||
logger.Errorw("Base64解码 'up' 事件的Data失败", "error", err, "data", event.Data)
|
||||
return
|
||||
}
|
||||
|
||||
// 3.2 解析外层 "信封"
|
||||
var instruction proto.Instruction
|
||||
if err := gproto.Unmarshal(decodedData, &instruction); err != nil {
|
||||
logger.Errorf("解析上行 Instruction Protobuf 失败: %v, Decoded Data: %x", err, decodedData)
|
||||
logger.Errorw("解析上行Instruction Protobuf失败", "error", err, "decodedData", fmt.Sprintf("%x", decodedData))
|
||||
return
|
||||
}
|
||||
|
||||
// 3.3 使用 type switch 从 oneof payload 中提取 CollectResult
|
||||
var collectResp *proto.CollectResult
|
||||
switch p := instruction.GetPayload().(type) {
|
||||
case *proto.Instruction_CollectResult:
|
||||
collectResp = p.CollectResult
|
||||
default:
|
||||
// 如果上行的数据不是采集结果,记录日志并忽略
|
||||
logger.Infof("收到一个非采集响应的上行指令 (Type: %T),无需处理。", p)
|
||||
return
|
||||
}
|
||||
|
||||
// 检查 collectResp 是否为 nil,虽然在 type switch 成功的情况下不太可能
|
||||
if collectResp == nil {
|
||||
logger.Errorf("从 Instruction 中提取的 CollectResult 为 nil")
|
||||
return
|
||||
}
|
||||
|
||||
correlationID := collectResp.CorrelationId
|
||||
logger.Infof("成功解析采集响应 (CorrelationID: %s),包含 %d 个值。", correlationID, len(collectResp.Values))
|
||||
|
||||
// 4. 根据 CorrelationID 查找待处理请求
|
||||
pendingReq, err := c.pendingCollectionRepo.FindByCorrelationID(reqCtx, correlationID)
|
||||
if err != nil {
|
||||
logger.Errorf("处理采集响应失败:无法找到待处理请求 (CorrelationID: %s): %v", correlationID, err)
|
||||
return
|
||||
}
|
||||
|
||||
// 检查状态,防止重复处理
|
||||
if pendingReq.Status != models.PendingStatusPending && pendingReq.Status != models.PendingStatusTimedOut {
|
||||
logger.Warnf("收到一个已处理过的采集响应 (CorrelationID: %s, Status: %s),将忽略。", correlationID, pendingReq.Status)
|
||||
return
|
||||
}
|
||||
|
||||
// 5. 匹配数据并存入数据库
|
||||
deviceIDs := pendingReq.CommandMetadata
|
||||
values := collectResp.Values
|
||||
if len(deviceIDs) != len(values) {
|
||||
logger.Errorf("数据不匹配:下行指令要求采集 %d 个设备,但上行响应包含 %d 个值 (CorrelationID: %s)", len(deviceIDs), len(values), correlationID)
|
||||
// 即使数量不匹配,也更新状态为完成,以防止请求永远 pending
|
||||
err = c.pendingCollectionRepo.UpdateStatusToFulfilled(reqCtx, correlationID, event.Time)
|
||||
if err != nil {
|
||||
logger.Errorf("处理采集响应失败:无法更新待处理请求 (CorrelationID: %s) 的状态为完成: %v", correlationID, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
for i, deviceID := range deviceIDs {
|
||||
rawSensorValue := values[i] // 这是设备上报的原始值
|
||||
|
||||
// 检查设备上报的值是否为 NaN (Not a Number),如果是则跳过
|
||||
if math.IsNaN(float64(rawSensorValue)) {
|
||||
logger.Warnf("设备 (ID: %d) 上报了一个无效的 NaN 值,已跳过当前值的记录。", deviceID)
|
||||
continue
|
||||
}
|
||||
|
||||
// 5.1 获取设备及其模板
|
||||
dev, err := c.deviceRepo.FindByID(reqCtx, deviceID)
|
||||
if err != nil {
|
||||
logger.Errorf("处理采集数据失败:无法找到设备 (ID: %d): %v", deviceID, err)
|
||||
continue
|
||||
}
|
||||
// 依赖 SelfCheck 确保设备和模板有效
|
||||
if err := dev.SelfCheck(); err != nil {
|
||||
logger.Warnf("跳过设备 %d,因其未通过自检: %v", dev.ID, err)
|
||||
continue
|
||||
}
|
||||
if err := dev.DeviceTemplate.SelfCheck(); err != nil {
|
||||
logger.Warnf("跳过设备 %d,因其设备模板未通过自检: %v", dev.ID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// 5.2 从设备模板中解析 ValueDescriptor
|
||||
var valueDescriptors []*models.ValueDescriptor
|
||||
if err := dev.DeviceTemplate.ParseValues(&valueDescriptors); err != nil {
|
||||
logger.Warnf("跳过设备 %d,因其设备模板的 Values 属性解析失败: %v", dev.ID, err)
|
||||
continue
|
||||
}
|
||||
// 根据 DeviceTemplate.SelfCheck,这里应该只有一个 ValueDescriptor
|
||||
if len(valueDescriptors) == 0 {
|
||||
logger.Warnf("跳过设备 %d,因其设备模板缺少 ValueDescriptor 定义", dev.ID)
|
||||
continue
|
||||
}
|
||||
valueDescriptor := valueDescriptors[0]
|
||||
|
||||
// 5.3 应用乘数和偏移量计算最终值
|
||||
parsedValue := rawSensorValue*valueDescriptor.Multiplier + valueDescriptor.Offset
|
||||
|
||||
// 5.4 根据传感器类型构建具体的数据结构
|
||||
var dataToRecord interface{}
|
||||
switch valueDescriptor.Type {
|
||||
case models.SensorTypeTemperature:
|
||||
dataToRecord = models.TemperatureData{TemperatureCelsius: parsedValue}
|
||||
case models.SensorTypeHumidity:
|
||||
dataToRecord = models.HumidityData{HumidityPercent: parsedValue}
|
||||
case models.SensorTypeWeight:
|
||||
dataToRecord = models.WeightData{WeightKilograms: parsedValue}
|
||||
default:
|
||||
// TODO 未知传感器的数据需要记录吗
|
||||
logger.Warnf("未知的传感器类型 '%s',将使用通用格式记录", valueDescriptor.Type)
|
||||
dataToRecord = map[string]float32{"value": parsedValue}
|
||||
}
|
||||
|
||||
// 5.5 记录传感器数据
|
||||
c.recordSensorData(reqCtx, areaController.ID, dev.ID, event.Time, valueDescriptor.Type, dataToRecord)
|
||||
logger.Infof("成功记录传感器数据: 设备ID=%d, 类型=%s, 原始值=%f, 解析值=%.2f", dev.ID, valueDescriptor.Type, rawSensorValue, parsedValue)
|
||||
}
|
||||
|
||||
// 6. 更新请求状态为“已完成”
|
||||
if err := c.pendingCollectionRepo.UpdateStatusToFulfilled(reqCtx, correlationID, event.Time); err != nil {
|
||||
logger.Errorf("更新待采集请求状态为 'fulfilled' 失败 (CorrelationID: %s): %v", correlationID, err)
|
||||
} else {
|
||||
logger.Infof("成功完成并关闭采集请求 (CorrelationID: %s)", correlationID)
|
||||
// 4. 委托给统一处理器
|
||||
if err := c.handler.HandleInstruction(reqCtx, event.DeviceInfo.DevEui, &instruction); err != nil {
|
||||
logger.Errorw("委托 'up' 事件中的业务指令失败", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// handleStatusEvent 处理设备状态事件
|
||||
func (c *ChirpStackListener) handleStatusEvent(ctx context.Context, event *StatusEvent) {
|
||||
reqCtx, logger := logs.Trace(ctx, c.ctx, "handleStatusEvent")
|
||||
logger.Infof("处接收到理 'status' 事件: %+v", event)
|
||||
// adaptStatusEvent 将 'status' 事件适配并委托给 UpstreamHandler
|
||||
func (c *ChirpStackListener) adaptStatusEvent(ctx context.Context, event *StatusEvent) {
|
||||
reqCtx, logger := logs.Trace(ctx, c.selfCtx, "adaptStatusEvent")
|
||||
|
||||
// 查找区域主控设备
|
||||
areaController, err := c.areaControllerRepo.FindByNetworkID(reqCtx, event.DeviceInfo.DevEui)
|
||||
if err != nil {
|
||||
logger.Errorf("处理 'status' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err)
|
||||
return
|
||||
status := map[string]interface{}{
|
||||
"margin": float64(event.Margin),
|
||||
"batteryLevel": float64(event.BatteryLevel),
|
||||
"batteryLevelUnavailable": event.BatteryLevelUnavailable,
|
||||
"externalPower": event.ExternalPower,
|
||||
}
|
||||
|
||||
// 记录信号强度
|
||||
signalMetrics := models.SignalMetrics{
|
||||
MarginDb: event.Margin,
|
||||
}
|
||||
c.recordSensorData(reqCtx, areaController.ID, areaController.ID, event.Time, models.SensorTypeSignalMetrics, signalMetrics)
|
||||
logger.Infof("已记录区域主控 (ID: %d) 的信号状态: %+v", areaController.ID, signalMetrics)
|
||||
|
||||
// 记录电量
|
||||
batteryLevel := models.BatteryLevel{
|
||||
BatteryLevelRatio: event.BatteryLevel,
|
||||
BatteryLevelUnavailable: event.BatteryLevelUnavailable,
|
||||
ExternalPower: event.ExternalPower,
|
||||
}
|
||||
c.recordSensorData(reqCtx, areaController.ID, areaController.ID, event.Time, models.SensorTypeBatteryLevel, batteryLevel)
|
||||
logger.Infof("已记录区域主控 (ID: %d) 的电池状态: %+v", areaController.ID, batteryLevel)
|
||||
}
|
||||
|
||||
// handleAckEvent 处理下行确认事件
|
||||
func (c *ChirpStackListener) handleAckEvent(ctx context.Context, event *AckEvent) {
|
||||
reqCtx, logger := logs.Trace(ctx, c.ctx, "handleAckEvent")
|
||||
logger.Infof("接收到 'ack' 事件: %+v", event)
|
||||
|
||||
// 更新下行任务记录的确认时间及接收成功状态
|
||||
err := c.deviceCommandLogRepo.UpdateAcknowledgedAt(reqCtx, event.DeduplicationID, event.Time, event.Acknowledged)
|
||||
if err != nil {
|
||||
logger.Errorf("更新下行任务记录的确认时间及接收成功状态失败 (MessageID: %s, DevEui: %s, Acknowledged: %t): %v",
|
||||
event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, err)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Infof("成功更新下行任务记录确认时间及接收成功状态 (MessageID: %s, DevEui: %s, Acknowledged: %t, AcknowledgedAt: %s)",
|
||||
event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, event.Time.Format(time.RFC3339))
|
||||
}
|
||||
|
||||
// handleLogEvent 处理日志事件
|
||||
func (c *ChirpStackListener) handleLogEvent(ctx context.Context, event *LogEvent) {
|
||||
logger := logs.TraceLogger(ctx, c.ctx, "handleLogEvent")
|
||||
// 首先,打印完整的事件结构体,用于详细排查
|
||||
logger.Infof("接收到 'log' 事件的完整内容: %+v", event)
|
||||
|
||||
// 接着,根据 ChirpStack 日志的级别,使用我们自己的 logger 对应级别来打印核心信息
|
||||
logMessage := "ChirpStack 日志: [%s] %s (DevEui: %s)"
|
||||
switch event.Level {
|
||||
case "INFO":
|
||||
logger.Infof(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
|
||||
case "WARNING":
|
||||
logger.Warnf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
|
||||
case "ERROR":
|
||||
logger.Errorf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
|
||||
default:
|
||||
// 对于未知级别,使用 Warn 级别打印,并明确指出级别未知
|
||||
logger.Warnf("ChirpStack 日志: [未知级别: %s] %s %s (DevEui: %s)",
|
||||
event.Level, event.Code, event.Description, event.DeviceInfo.DevEui)
|
||||
if err := c.handler.HandleStatus(reqCtx, event.DeviceInfo.DevEui, status); err != nil {
|
||||
logger.Errorw("委托 'status' 事件失败", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// handleJoinEvent 处理入网事件
|
||||
func (c *ChirpStackListener) handleJoinEvent(ctx context.Context, event *JoinEvent) {
|
||||
logger := logs.TraceLogger(ctx, c.ctx, "handleJoinEvent")
|
||||
logger.Infof("接收到 'join' 事件: %+v", event)
|
||||
// 在这里添加您的业务逻辑
|
||||
}
|
||||
// adaptAckEvent 将 'ack' 事件适配并委托给 UpstreamHandler
|
||||
func (c *ChirpStackListener) adaptAckEvent(ctx context.Context, event *AckEvent) {
|
||||
reqCtx, logger := logs.Trace(ctx, c.selfCtx, "adaptAckEvent")
|
||||
|
||||
// handleTxAckEvent 处理网关发送确认事件
|
||||
func (c *ChirpStackListener) handleTxAckEvent(ctx context.Context, event *TxAckEvent) {
|
||||
logger := logs.TraceLogger(ctx, c.ctx, "handleTxAckEvent")
|
||||
logger.Infof("接收到 'txack' 事件: %+v", event)
|
||||
// 在这里添加您的业务逻辑
|
||||
}
|
||||
|
||||
// handleLocationEvent 处理位置事件
|
||||
func (c *ChirpStackListener) handleLocationEvent(ctx context.Context, event *LocationEvent) {
|
||||
logger := logs.TraceLogger(ctx, c.ctx, "handleLocationEvent")
|
||||
logger.Infof("接收到 'location' 事件: %+v", event)
|
||||
// 在这里添加您的业务逻辑
|
||||
}
|
||||
|
||||
// handleIntegrationEvent 处理集成事件
|
||||
func (c *ChirpStackListener) handleIntegrationEvent(ctx context.Context, event *IntegrationEvent) {
|
||||
logger := logs.TraceLogger(ctx, c.ctx, "handleIntegrationEvent")
|
||||
logger.Infof("接收到 'integration' 事件: %+v", event)
|
||||
// 在这里添加您的业务逻辑
|
||||
}
|
||||
|
||||
// recordSensorData 是一个通用方法,用于将传感器数据存入数据库。
|
||||
// areaControllerID: 区域主控设备的ID
|
||||
// sensorDeviceID: 实际产生传感器数据的普通设备的ID
|
||||
// sensorType: 传感器值的类型 (例如 models.SensorTypeTemperature)
|
||||
// data: 具体的传感器数据结构体实例 (例如 models.TemperatureData)
|
||||
func (c *ChirpStackListener) recordSensorData(ctx context.Context, areaControllerID uint32, sensorDeviceID uint32, eventTime time.Time, sensorType models.SensorType, data interface{}) {
|
||||
reqCtx, logger := logs.Trace(ctx, c.ctx, "recordSensorData")
|
||||
// 1. 将传入的结构体序列化为 JSON
|
||||
jsonData, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
logger.Errorf("记录传感器数据失败:序列化数据为 JSON 时出错: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 2. 构建 SensorData 模型
|
||||
sensorData := &models.SensorData{
|
||||
Time: eventTime,
|
||||
DeviceID: sensorDeviceID,
|
||||
AreaControllerID: areaControllerID,
|
||||
SensorType: sensorType,
|
||||
Data: datatypes.JSON(jsonData),
|
||||
}
|
||||
|
||||
// 3. 调用仓库创建记录
|
||||
if err := c.sensorDataRepo.Create(reqCtx, sensorData); err != nil {
|
||||
logger.Errorf("记录传感器数据失败:存入数据库时出错: %v", err)
|
||||
if err := c.handler.HandleAck(reqCtx, event.DeviceInfo.DevEui, event.DeduplicationID, event.Acknowledged, event.Time); err != nil {
|
||||
logger.Errorw("委托 'ack' 事件失败", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
277
internal/app/listener/lora_listener.go
Normal file
277
internal/app/listener/lora_listener.go
Normal file
@@ -0,0 +1,277 @@
|
||||
package listener
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto"
|
||||
|
||||
"gorm.io/datatypes"
|
||||
)
|
||||
|
||||
// loraListener 是一个统一的LoRa上行业务处理器,实现了 transport.UpstreamHandler 接口。
|
||||
// 它包含了处理业务所需的所有依赖,是项目中唯一处理LoRa上行业务的地方。
|
||||
type loraListener struct {
|
||||
selfCtx context.Context
|
||||
areaControllerRepo repository.AreaControllerRepository
|
||||
pendingCollectionRepo repository.PendingCollectionRepository
|
||||
deviceRepo repository.DeviceRepository
|
||||
sensorDataRepo repository.SensorDataRepository
|
||||
deviceCommandLogRepo repository.DeviceCommandLogRepository
|
||||
}
|
||||
|
||||
// NewLoRaListener 创建一个新的 loraListener 实例。
|
||||
// 注意:返回的是 transport.UpstreamHandler 接口,向上层隐藏具体实现。
|
||||
func NewLoRaListener(
|
||||
ctx context.Context,
|
||||
areaControllerRepo repository.AreaControllerRepository,
|
||||
pendingCollectionRepo repository.PendingCollectionRepository,
|
||||
deviceRepo repository.DeviceRepository,
|
||||
sensorDataRepo repository.SensorDataRepository,
|
||||
deviceCommandLogRepo repository.DeviceCommandLogRepository,
|
||||
) transport.UpstreamHandler {
|
||||
return &loraListener{
|
||||
selfCtx: logs.AddCompName(ctx, "LoRaListener"),
|
||||
areaControllerRepo: areaControllerRepo,
|
||||
pendingCollectionRepo: pendingCollectionRepo,
|
||||
deviceRepo: deviceRepo,
|
||||
sensorDataRepo: sensorDataRepo,
|
||||
deviceCommandLogRepo: deviceCommandLogRepo,
|
||||
}
|
||||
}
|
||||
|
||||
// HandleInstruction 处理来自设备的、已解析为Instruction的业务指令。
|
||||
func (l *loraListener) HandleInstruction(upstreamCtx context.Context, sourceAddr string, instruction *proto.Instruction) error {
|
||||
ctx, logger := logs.Trace(upstreamCtx, l.selfCtx, "HandleInstruction")
|
||||
logger.Infow("接收到设备指令", "来源地址", sourceAddr)
|
||||
|
||||
switch p := instruction.Payload.(type) {
|
||||
case *proto.Instruction_CollectResult:
|
||||
return l.handleCollectResult(ctx, sourceAddr, p.CollectResult)
|
||||
|
||||
case *proto.Instruction_OtaUpgradeStatus:
|
||||
logger.Infow("收到OTA升级状态,暂未实现处理逻辑", "来源地址", sourceAddr, "状态", p.OtaUpgradeStatus)
|
||||
// TODO: 在这里实现OTA升级状态的处理逻辑
|
||||
return nil
|
||||
|
||||
case *proto.Instruction_LogUploadRequest:
|
||||
logger.Infow("收到设备日志上传请求,暂未实现处理逻辑", "来源地址", sourceAddr, "日志条数", len(p.LogUploadRequest.Entries))
|
||||
// TODO: 在这里实现设备日志的处理逻辑
|
||||
return nil
|
||||
|
||||
default:
|
||||
logger.Warnw("收到一个当前未处理的上行指令类型", "来源地址", sourceAddr, "类型", fmt.Sprintf("%T", p))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// HandleStatus 处理非业务指令的设备状态更新,例如信号强度、电量等。
|
||||
func (l *loraListener) HandleStatus(upstreamCtx context.Context, sourceAddr string, status map[string]interface{}) error {
|
||||
ctx, logger := logs.Trace(upstreamCtx, l.selfCtx, "HandleStatus")
|
||||
logger.Infow("接收到设备状态更新", "来源地址", sourceAddr, "状态", status)
|
||||
|
||||
areaController, err := l.areaControllerRepo.FindByNetworkID(ctx, sourceAddr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("处理 'status' 事件失败:无法通过源地址 '%s' 找到区域主控设备: %w", sourceAddr, err)
|
||||
}
|
||||
|
||||
eventTime := time.Now() // 状态事件通常是实时的,使用当前时间
|
||||
|
||||
// 尝试记录信号强度
|
||||
if rssi, ok := status["rssi"].(float64); ok {
|
||||
if snr, ok := status["snr"].(float64); ok {
|
||||
signalMetrics := models.SignalMetrics{
|
||||
RssiDbm: int(rssi),
|
||||
SnrDb: float32(snr),
|
||||
}
|
||||
if margin, ok := status["margin"].(float64); ok {
|
||||
signalMetrics.MarginDb = int(margin)
|
||||
}
|
||||
l.recordSensorData(ctx, areaController.ID, areaController.ID, eventTime, models.SensorTypeSignalMetrics, signalMetrics)
|
||||
logger.Infow("已记录区域主控的信号强度", "主控ID", areaController.ID, "指标", signalMetrics)
|
||||
}
|
||||
}
|
||||
|
||||
// 尝试记录电池电量
|
||||
if batteryLevel, ok := status["batteryLevel"].(float64); ok {
|
||||
batteryData := models.BatteryLevel{
|
||||
BatteryLevelRatio: float32(batteryLevel),
|
||||
}
|
||||
if unavailable, ok := status["batteryLevelUnavailable"].(bool); ok {
|
||||
batteryData.BatteryLevelUnavailable = unavailable
|
||||
}
|
||||
if externalPower, ok := status["externalPower"].(bool); ok {
|
||||
batteryData.ExternalPower = externalPower
|
||||
}
|
||||
l.recordSensorData(ctx, areaController.ID, areaController.ID, eventTime, models.SensorTypeBatteryLevel, batteryData)
|
||||
logger.Infow("已记录区域主控的电池状态", "主控ID", areaController.ID, "状态", batteryData)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// HandleAck 处理对下行指令的确认(ACK)事件。
|
||||
func (l *loraListener) HandleAck(upstreamCtx context.Context, sourceAddr string, deduplicationID string, acknowledged bool, eventTime time.Time) error {
|
||||
ctx, logger := logs.Trace(upstreamCtx, l.selfCtx, "HandleAck")
|
||||
|
||||
err := l.deviceCommandLogRepo.UpdateAcknowledgedAt(ctx, deduplicationID, eventTime, acknowledged)
|
||||
if err != nil {
|
||||
logger.Errorw("更新下行任务记录的确认状态失败",
|
||||
"MessageID", deduplicationID,
|
||||
"DevEui", sourceAddr,
|
||||
"Acknowledged", acknowledged,
|
||||
"error", err,
|
||||
)
|
||||
return fmt.Errorf("更新下行任务记录失败: %w", err)
|
||||
}
|
||||
|
||||
logger.Infow("成功更新下行任务记录确认状态",
|
||||
"MessageID", deduplicationID,
|
||||
"DevEui", sourceAddr,
|
||||
"Acknowledged", acknowledged,
|
||||
"AcknowledgedAt", eventTime.Format(time.RFC3339),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleCollectResult 是处理采集结果的核心业务逻辑
|
||||
func (l *loraListener) handleCollectResult(ctx context.Context, sourceAddr string, collectResp *proto.CollectResult) error {
|
||||
if collectResp == nil {
|
||||
return fmt.Errorf("传入的CollectResult为nil")
|
||||
}
|
||||
|
||||
correlationID := collectResp.CorrelationId
|
||||
logger := logs.GetLogger(ctx).With("correlationID", correlationID, "来源地址", sourceAddr)
|
||||
logger.Infow("开始处理采集响应", "数据点数量", len(collectResp.Values))
|
||||
|
||||
// 1. 查找区域主控
|
||||
areaController, err := l.areaControllerRepo.FindByNetworkID(ctx, sourceAddr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("处理采集响应失败:无法通过源地址 '%s' 找到区域主控设备: %w", sourceAddr, err)
|
||||
}
|
||||
if err := areaController.SelfCheck(); err != nil {
|
||||
return fmt.Errorf("处理采集响应失败:区域主控 %v(ID: %d) 未通过自检: %w", areaController.Name, areaController.ID, err)
|
||||
}
|
||||
|
||||
// 2. 根据 CorrelationID 查找待处理请求
|
||||
pendingReq, err := l.pendingCollectionRepo.FindByCorrelationID(ctx, correlationID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("处理采集响应失败:无法找到待处理请求: %w", err)
|
||||
}
|
||||
|
||||
// 3. 检查状态,防止重复处理
|
||||
if pendingReq.Status != models.PendingStatusPending && pendingReq.Status != models.PendingStatusTimedOut {
|
||||
logger.Warnw("收到一个已处理过的采集响应,将忽略。", "状态", string(pendingReq.Status))
|
||||
return nil // 返回 nil,因为这不是一个错误,只是一个重复的请求
|
||||
}
|
||||
|
||||
// 4. 匹配数据并存入数据库
|
||||
deviceIDs := pendingReq.CommandMetadata
|
||||
values := collectResp.Values
|
||||
if len(deviceIDs) != len(values) {
|
||||
err := fmt.Errorf("数据不匹配:下行指令要求采集 %d 个设备,但上行响应包含 %d 个值", len(deviceIDs), len(values))
|
||||
// 即使数量不匹配,也尝试更新状态为完成,以防止请求永远 pending
|
||||
if updateErr := l.pendingCollectionRepo.UpdateStatusToFulfilled(ctx, correlationID, time.Now()); updateErr != nil {
|
||||
logger.Errorw("更新待采集请求状态为 'fulfilled' 失败", "error", updateErr)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
eventTime := time.Now() // 对整个采集批次使用统一的时间戳
|
||||
|
||||
for i, deviceID := range deviceIDs {
|
||||
rawSensorValue := values[i]
|
||||
devLogger := logger.With("设备ID", deviceID)
|
||||
|
||||
if math.IsNaN(float64(rawSensorValue)) {
|
||||
devLogger.Warnw("设备上报了一个无效的 NaN 值,已跳过当前值的记录。")
|
||||
continue
|
||||
}
|
||||
|
||||
dev, err := l.deviceRepo.FindByID(ctx, deviceID)
|
||||
if err != nil {
|
||||
devLogger.Errorw("处理采集数据失败:无法找到设备", "error", err)
|
||||
continue
|
||||
}
|
||||
if err := dev.SelfCheck(); err != nil {
|
||||
devLogger.Warnw("跳过设备,因其未通过自检或设备模板无效", "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
var valueDescriptors []*models.ValueDescriptor
|
||||
if err := dev.DeviceTemplate.ParseValues(&valueDescriptors); err != nil {
|
||||
devLogger.Warnw("跳过设备,因其设备模板的 Values 属性解析失败", "error", err)
|
||||
continue
|
||||
}
|
||||
if len(valueDescriptors) == 0 {
|
||||
devLogger.Warnw("跳过设备,因其设备模板缺少 ValueDescriptor 定义")
|
||||
continue
|
||||
}
|
||||
valueDescriptor := valueDescriptors[0]
|
||||
|
||||
parsedValue := rawSensorValue*valueDescriptor.Multiplier + valueDescriptor.Offset
|
||||
|
||||
var dataToRecord interface{}
|
||||
switch valueDescriptor.Type {
|
||||
case models.SensorTypeTemperature:
|
||||
dataToRecord = models.TemperatureData{TemperatureCelsius: parsedValue}
|
||||
case models.SensorTypeHumidity:
|
||||
dataToRecord = models.HumidityData{HumidityPercent: parsedValue}
|
||||
case models.SensorTypeWeight:
|
||||
dataToRecord = models.WeightData{WeightKilograms: parsedValue}
|
||||
default:
|
||||
devLogger.Warnw("未知的传感器类型,将使用通用格式记录", "传感器类型", string(valueDescriptor.Type))
|
||||
dataToRecord = map[string]float32{"value": parsedValue}
|
||||
}
|
||||
|
||||
l.recordSensorData(ctx, areaController.ID, dev.ID, eventTime, valueDescriptor.Type, dataToRecord)
|
||||
devLogger.Infow("成功记录传感器数据",
|
||||
"类型", string(valueDescriptor.Type),
|
||||
"原始值", rawSensorValue,
|
||||
"解析值", parsedValue,
|
||||
)
|
||||
}
|
||||
|
||||
// 5. 更新请求状态为“已完成”
|
||||
if err := l.pendingCollectionRepo.UpdateStatusToFulfilled(ctx, correlationID, eventTime); err != nil {
|
||||
logger.Errorw("更新待采集请求状态为 'fulfilled' 失败", "error", err)
|
||||
return fmt.Errorf("更新待采集请求状态失败: %w", err)
|
||||
}
|
||||
|
||||
logger.Infow("成功完成并关闭采集请求")
|
||||
return nil
|
||||
}
|
||||
|
||||
// recordSensorData 是一个通用方法,用于将传感器数据存入数据库。
|
||||
func (l *loraListener) recordSensorData(ctx context.Context, areaControllerID uint32, sensorDeviceID uint32, eventTime time.Time, sensorType models.SensorType, data interface{}) {
|
||||
logger := logs.GetLogger(ctx).With("方法", "recordSensorData")
|
||||
|
||||
jsonData, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
logger.Errorw("记录传感器数据失败:序列化数据为 JSON 时出错", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
sensorData := &models.SensorData{
|
||||
Time: eventTime,
|
||||
DeviceID: sensorDeviceID,
|
||||
AreaControllerID: areaControllerID,
|
||||
SensorType: sensorType,
|
||||
Data: datatypes.JSON(jsonData),
|
||||
}
|
||||
|
||||
if err := l.sensorDataRepo.Create(ctx, sensorData); err != nil {
|
||||
logger.Errorw("记录传感器数据失败:存入数据库时出错",
|
||||
"设备ID", sensorDeviceID,
|
||||
"传感器类型", string(sensorType),
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -2,7 +2,7 @@ package listener
|
||||
|
||||
import "net/http"
|
||||
|
||||
// ListenHandler 是一个监听器, 用于监听设备上行事件
|
||||
// ListenHandler 是一个监听器, 用于监听设备上行事件, 通常用于适配http webhook。
|
||||
type ListenHandler interface {
|
||||
Handler() http.HandlerFunc
|
||||
}
|
||||
|
||||
@@ -377,17 +377,40 @@ func initLora(
|
||||
var comm transport.Communicator
|
||||
var loraListener transport.Listener
|
||||
baseCtx := context.Background()
|
||||
|
||||
logger := logs.GetLogger(ctx)
|
||||
|
||||
// 1. 创建统一的业务处理器 (App层适配器)
|
||||
// 它实现了 infra 层的 transport.UpstreamHandler 接口
|
||||
upstreamHandler := listener.NewLoRaListener(
|
||||
baseCtx,
|
||||
repos.areaControllerRepo,
|
||||
repos.pendingCollectionRepo,
|
||||
repos.deviceRepo,
|
||||
repos.sensorDataRepo,
|
||||
repos.deviceCommandLogRepo,
|
||||
)
|
||||
|
||||
// 2. 根据配置初始化具体的传输层和监听器
|
||||
if cfg.Lora.Mode == config.LoraMode_LoRaWAN {
|
||||
logger.Info("当前运行模式: lora_wan。初始化 ChirpStack 监听器和传输层。")
|
||||
listenHandler = chirp_stack.NewChirpStackListener(logs.AddCompName(baseCtx, "ChirpStackListener"), repos.sensorDataRepo, repos.deviceRepo, repos.areaControllerRepo, repos.deviceCommandLogRepo, repos.pendingCollectionRepo)
|
||||
|
||||
// 2a. 创建 ChirpStack 的 Webhook 监听器 (infra),并注入 App 层的业务处理器
|
||||
listenHandler = chirp_stack.NewChirpStackListener(baseCtx, upstreamHandler)
|
||||
|
||||
// 2b. 创建 ChirpStack 的发送器 (infra)
|
||||
comm = lora.NewChirpStackTransport(logs.AddCompName(baseCtx, "ChirpStackTransport"), cfg.ChirpStack)
|
||||
|
||||
// 2c. LoRaWAN 模式下没有主动监听的 Listener,使用占位符
|
||||
loraListener = lora.NewPlaceholderTransport(logs.AddCompName(baseCtx, "PlaceholderTransport"))
|
||||
|
||||
} else {
|
||||
logger.Info("当前运行模式: lora_mesh。初始化 LoRa Mesh 传输层和占位符监听器。")
|
||||
|
||||
// 2a. LoRa Mesh 模式下没有 Webhook 监听器,使用占位符
|
||||
listenHandler = chirp_stack.NewPlaceholderListener(logs.AddCompName(baseCtx, "PlaceholderListener"))
|
||||
tp, err := lora.NewLoRaMeshUartPassthroughTransport(logs.AddCompName(baseCtx, "LoRaMeshTransport"), cfg.LoraMesh, repos.areaControllerRepo, repos.pendingCollectionRepo, repos.deviceRepo, repos.sensorDataRepo)
|
||||
|
||||
// 2b. 创建串口的传输工具 (infra),它同时实现了发送和监听,并注入 App 层的业务处理器
|
||||
tp, err := lora.NewLoRaMeshUartPassthroughTransport(baseCtx, cfg.LoraMesh, upstreamHandler)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("无法初始化 LoRa Mesh 模块: %w", err)
|
||||
}
|
||||
|
||||
@@ -4,25 +4,20 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/tarm/serial"
|
||||
gproto "google.golang.org/protobuf/proto"
|
||||
"gorm.io/datatypes"
|
||||
)
|
||||
|
||||
// transportState 定义了传输层的内部状态
|
||||
@@ -43,9 +38,10 @@ type message struct {
|
||||
|
||||
// LoRaMeshUartPassthroughTransport 实现了 transport.Communicator 和 transport.Listener 接口
|
||||
type LoRaMeshUartPassthroughTransport struct {
|
||||
ctx context.Context
|
||||
config config.LoraMeshConfig
|
||||
port *serial.Port
|
||||
selfCtx context.Context
|
||||
config config.LoraMeshConfig
|
||||
port *serial.Port
|
||||
handler transport.UpstreamHandler // 依赖注入的统一业务处理器
|
||||
|
||||
mu sync.Mutex // 用于保护对外的公共方法(如Send)的并发调用
|
||||
state transportState
|
||||
@@ -59,12 +55,6 @@ type LoRaMeshUartPassthroughTransport struct {
|
||||
currentRecvSource uint16 // 当前正在接收的源地址
|
||||
reassemblyTimeout *time.Timer // 分片重组的超时定时器
|
||||
reassemblyTimeoutCh chan uint16 // 当超时触发时,用于传递源地址
|
||||
|
||||
// --- 依赖注入的仓库 ---
|
||||
areaControllerRepo repository.AreaControllerRepository
|
||||
pendingCollectionRepo repository.PendingCollectionRepository
|
||||
deviceRepo repository.DeviceRepository
|
||||
sensorDataRepo repository.SensorDataRepository
|
||||
}
|
||||
|
||||
// sendRequest 封装了一次发送请求
|
||||
@@ -91,10 +81,7 @@ type reassemblyBuffer struct {
|
||||
func NewLoRaMeshUartPassthroughTransport(
|
||||
ctx context.Context,
|
||||
config config.LoraMeshConfig,
|
||||
areaControllerRepo repository.AreaControllerRepository,
|
||||
pendingCollectionRepo repository.PendingCollectionRepository,
|
||||
deviceRepo repository.DeviceRepository,
|
||||
sensorDataRepo repository.SensorDataRepository,
|
||||
handler transport.UpstreamHandler,
|
||||
) (*LoRaMeshUartPassthroughTransport, error) {
|
||||
c := &serial.Config{
|
||||
Name: config.UARTPort,
|
||||
@@ -108,20 +95,15 @@ func NewLoRaMeshUartPassthroughTransport(
|
||||
}
|
||||
|
||||
t := &LoRaMeshUartPassthroughTransport{
|
||||
ctx: ctx,
|
||||
selfCtx: logs.AddCompName(ctx, "LoRaMeshUartPassthroughTransport"),
|
||||
config: config,
|
||||
port: port,
|
||||
handler: handler,
|
||||
state: stateIdle,
|
||||
stopChan: make(chan struct{}),
|
||||
sendChan: make(chan *sendRequest),
|
||||
reassemblyBuffers: make(map[uint16]*reassemblyBuffer),
|
||||
reassemblyTimeoutCh: make(chan uint16, 1),
|
||||
|
||||
// 注入依赖
|
||||
areaControllerRepo: areaControllerRepo,
|
||||
pendingCollectionRepo: pendingCollectionRepo,
|
||||
deviceRepo: deviceRepo,
|
||||
sensorDataRepo: sensorDataRepo,
|
||||
}
|
||||
|
||||
return t, nil
|
||||
@@ -129,10 +111,11 @@ func NewLoRaMeshUartPassthroughTransport(
|
||||
|
||||
// Listen 启动后台监听协程(非阻塞)
|
||||
func (t *LoRaMeshUartPassthroughTransport) Listen(ctx context.Context) error {
|
||||
loraCtx, logger := logs.Trace(ctx, t.ctx, "Listen")
|
||||
// 注意:这里的 loraCtx 是从 selfCtx 派生的,因为它代表了这个组件自身的生命周期
|
||||
loraCtx, logger := logs.Trace(ctx, t.selfCtx, "Listen")
|
||||
t.wg.Add(1)
|
||||
go t.workerLoop(loraCtx)
|
||||
logger.Info("LoRa传输层工作协程已启动")
|
||||
logger.Info("LoRa Mesh 传输层工作协程已启动")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -167,7 +150,7 @@ func (t *LoRaMeshUartPassthroughTransport) Stop(ctx context.Context) error {
|
||||
|
||||
// workerLoop 是核心的状态机和调度器
|
||||
func (t *LoRaMeshUartPassthroughTransport) workerLoop(ctx context.Context) {
|
||||
loraCtx, logger := logs.Trace(ctx, t.ctx, "workerLoop")
|
||||
loraCtx, logger := logs.Trace(ctx, t.selfCtx, "workerLoop")
|
||||
|
||||
defer t.wg.Done()
|
||||
|
||||
@@ -218,7 +201,7 @@ func (t *LoRaMeshUartPassthroughTransport) workerLoop(ctx context.Context) {
|
||||
|
||||
// runIdleState 处理空闲状态下的逻辑,主要是检查并启动发送任务
|
||||
func (t *LoRaMeshUartPassthroughTransport) runIdleState(ctx context.Context) {
|
||||
loraCtx := logs.AddFuncName(ctx, t.ctx, "Listen")
|
||||
loraCtx, _ := logs.Trace(ctx, t.selfCtx, "runIdleState")
|
||||
|
||||
select {
|
||||
case req := <-t.sendChan:
|
||||
@@ -234,10 +217,10 @@ func (t *LoRaMeshUartPassthroughTransport) runIdleState(ctx context.Context) {
|
||||
|
||||
// runReceivingState 处理接收状态下的逻辑,主要是检查超时
|
||||
func (t *LoRaMeshUartPassthroughTransport) runReceivingState(ctx context.Context) {
|
||||
logger := logs.TraceLogger(ctx, t.ctx, "runReceivingState")
|
||||
_, logger := logs.Trace(ctx, t.selfCtx, "runReceivingState")
|
||||
select {
|
||||
case sourceAddr := <-t.reassemblyTimeoutCh:
|
||||
logger.Warnf("接收来自 0x%04X 的消息超时", sourceAddr)
|
||||
logger.Warnw("接收消息超时", "sourceAddr", fmt.Sprintf("0x%04X", sourceAddr))
|
||||
delete(t.reassemblyBuffers, sourceAddr)
|
||||
t.state = stateIdle
|
||||
default:
|
||||
@@ -247,7 +230,7 @@ func (t *LoRaMeshUartPassthroughTransport) runReceivingState(ctx context.Context
|
||||
|
||||
// executeSend 执行完整的发送流程(分片、构建、写入)
|
||||
func (t *LoRaMeshUartPassthroughTransport) executeSend(ctx context.Context, req *sendRequest) (*transport.SendResult, error) {
|
||||
logger := logs.TraceLogger(ctx, t.ctx, "executeSend")
|
||||
_, logger := logs.Trace(ctx, t.selfCtx, "executeSend")
|
||||
chunks := splitPayload(req.payload, t.config.MaxChunkSize)
|
||||
totalChunks := uint8(len(chunks))
|
||||
|
||||
@@ -266,7 +249,7 @@ func (t *LoRaMeshUartPassthroughTransport) executeSend(ctx context.Context, req
|
||||
frame.WriteByte(currentChunk) // 当前包序号
|
||||
frame.Write(chunk) // 数据块
|
||||
|
||||
logger.Debugf("构建LoRa数据包: %v", frame.Bytes())
|
||||
logger.Debugw("构建LoRa数据包", "bytes", frame.Bytes())
|
||||
_, err := t.port.Write(frame.Bytes())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("写入串口失败: %w", err)
|
||||
@@ -282,9 +265,9 @@ func (t *LoRaMeshUartPassthroughTransport) executeSend(ctx context.Context, req
|
||||
|
||||
// handleFrame 处理一个从串口解析出的完整物理帧
|
||||
func (t *LoRaMeshUartPassthroughTransport) handleFrame(ctx context.Context, frame []byte) {
|
||||
loraCtx, logger := logs.Trace(ctx, t.ctx, "handleFrame")
|
||||
reqCtx, logger := logs.Trace(ctx, t.selfCtx, "handleFrame")
|
||||
if len(frame) < 8 {
|
||||
logger.Warnf("收到了一个无效长度的帧: %d", len(frame))
|
||||
logger.Warnw("收到了一个无效长度的帧", "length", len(frame))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -301,7 +284,9 @@ func (t *LoRaMeshUartPassthroughTransport) handleFrame(ctx context.Context, fram
|
||||
DestAddr: fmt.Sprintf("%04X", destAddr),
|
||||
Payload: chunkData,
|
||||
}
|
||||
go t.handleUpstreamMessage(loraCtx, msg)
|
||||
// 使用分离的上下文进行异步处理
|
||||
detachedCtx := logs.DetachContext(reqCtx)
|
||||
go t.handleUpstreamMessage(detachedCtx, msg)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -326,18 +311,21 @@ func (t *LoRaMeshUartPassthroughTransport) handleFrame(ctx context.Context, fram
|
||||
t.reassemblyTimeoutCh <- sourceAddr
|
||||
})
|
||||
} else {
|
||||
logger.Warnf("在空闲状态下收到了一个来自 0x%04X 的非首包分片,已忽略。", sourceAddr)
|
||||
logger.Warnw("在空闲状态下收到了一个非首包分片,已忽略", "sourceAddr", fmt.Sprintf("0x%04X", sourceAddr))
|
||||
}
|
||||
|
||||
case stateReceiving:
|
||||
if sourceAddr != t.currentRecvSource {
|
||||
logger.Warnf("正在接收来自 0x%04X 的数据时,收到了另一个源 0x%04X 的分片,已忽略。", t.currentRecvSource, sourceAddr)
|
||||
logger.Warnw("正在接收数据时,收到了另一个源的分片,已忽略",
|
||||
"currentSource", fmt.Sprintf("0x%04X", t.currentRecvSource),
|
||||
"newSource", fmt.Sprintf("0x%04X", sourceAddr),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
buffer, ok := t.reassemblyBuffers[sourceAddr]
|
||||
if !ok {
|
||||
logger.Errorf("内部错误: 处于接收状态,但没有为 0x%04X 找到缓冲区", sourceAddr)
|
||||
logger.Errorw("内部错误: 处于接收状态,但没有找到缓冲区", "sourceAddr", fmt.Sprintf("0x%04X", sourceAddr))
|
||||
t.state = stateIdle // 重置状态
|
||||
return
|
||||
}
|
||||
@@ -362,165 +350,43 @@ func (t *LoRaMeshUartPassthroughTransport) handleFrame(ctx context.Context, fram
|
||||
DestAddr: fmt.Sprintf("%04X", destAddr),
|
||||
Payload: fullPayload.Bytes(),
|
||||
}
|
||||
go t.handleUpstreamMessage(loraCtx, msg)
|
||||
// 使用分离的上下文进行异步处理
|
||||
detachedCtx := logs.DetachContext(reqCtx)
|
||||
go t.handleUpstreamMessage(detachedCtx, msg)
|
||||
|
||||
// 清理并返回空闲状态
|
||||
delete(t.reassemblyBuffers, sourceAddr)
|
||||
t.state = stateIdle
|
||||
}
|
||||
default:
|
||||
logger.Errorf("内部错误: 状态机处于未知状态 %d", t.state)
|
||||
logger.Errorw("内部错误: 状态机处于未知状态", "state", t.state)
|
||||
}
|
||||
}
|
||||
|
||||
// handleUpstreamMessage 在独立的协程中处理单个上行的、完整的消息。
|
||||
// 【已重构】此方法现在只负责解析和委托,不包含任何业务逻辑。
|
||||
func (t *LoRaMeshUartPassthroughTransport) handleUpstreamMessage(ctx context.Context, msg *message) {
|
||||
loraCtx, logger := logs.Trace(ctx, t.ctx, "handleUpstreamMessage")
|
||||
|
||||
logger.Infof("开始处理来自 %s 的上行消息", msg.SourceAddr)
|
||||
reqCtx, logger := logs.Trace(ctx, t.selfCtx, "handleUpstreamMessage")
|
||||
logger.Infow("开始适配上行消息并委托", "sourceAddr", msg.SourceAddr)
|
||||
|
||||
// 1. 解析外层 "信封"
|
||||
var instruction proto.Instruction
|
||||
if err := gproto.Unmarshal(msg.Payload, &instruction); err != nil {
|
||||
logger.Errorf("解析上行 Instruction Protobuf 失败: %v, 源地址: %s, 原始数据: %x", err, msg.SourceAddr, msg.Payload)
|
||||
logger.Errorw("解析上行 Instruction Protobuf 失败",
|
||||
"sourceAddr", msg.SourceAddr,
|
||||
"error", err,
|
||||
"rawData", fmt.Sprintf("%x", msg.Payload),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
// 2. 使用 type switch 从 oneof payload 中提取 CollectResult
|
||||
var collectResp *proto.CollectResult
|
||||
switch p := instruction.GetPayload().(type) {
|
||||
case *proto.Instruction_CollectResult:
|
||||
collectResp = p.CollectResult
|
||||
default:
|
||||
// 如果上行的数据不是采集结果,记录日志并忽略
|
||||
logger.Infof("收到一个非采集响应的上行指令 (类型: %T),无需处理。源地址: %s", p, msg.SourceAddr)
|
||||
return
|
||||
}
|
||||
|
||||
if collectResp == nil {
|
||||
logger.Errorf("从 Instruction 中提取的 CollectResult 为 nil。源地址: %s", msg.SourceAddr)
|
||||
return
|
||||
}
|
||||
|
||||
correlationID := collectResp.CorrelationId
|
||||
logger.Infof("成功解析采集响应 (CorrelationID: %s),包含 %d 个值。", correlationID, len(collectResp.Values))
|
||||
|
||||
// 3. 查找区域主控 (注意:LoRa Mesh 的 SourceAddr 对应于区域主控的 NetworkID)
|
||||
areaController, err := t.areaControllerRepo.FindByNetworkID(loraCtx, msg.SourceAddr)
|
||||
if err != nil {
|
||||
logger.Errorf("处理上行消息失败:无法通过源地址 '%s' 找到区域主控设备: %v", msg.SourceAddr, err)
|
||||
return
|
||||
}
|
||||
if err := areaController.SelfCheck(); err != nil {
|
||||
logger.Errorf("处理上行消息失败:区域主控 %v(ID: %d) 未通过自检: %v", areaController.Name, areaController.ID, err)
|
||||
return
|
||||
}
|
||||
|
||||
// 4. 根据 CorrelationID 查找待处理请求
|
||||
pendingReq, err := t.pendingCollectionRepo.FindByCorrelationID(loraCtx, correlationID)
|
||||
if err != nil {
|
||||
logger.Errorf("处理采集响应失败:无法找到待处理请求 (CorrelationID: %s): %v", correlationID, err)
|
||||
return
|
||||
}
|
||||
|
||||
// 检查状态,防止重复处理
|
||||
if pendingReq.Status != models.PendingStatusPending && pendingReq.Status != models.PendingStatusTimedOut {
|
||||
logger.Warnf("收到一个已处理过的采集响应 (CorrelationID: %s, Status: %s),将忽略。", correlationID, pendingReq.Status)
|
||||
return
|
||||
}
|
||||
|
||||
// 5. 匹配数据并存入数据库
|
||||
deviceIDs := pendingReq.CommandMetadata
|
||||
values := collectResp.Values
|
||||
if len(deviceIDs) != len(values) {
|
||||
logger.Errorf("数据不匹配:下行指令要求采集 %d 个设备,但上行响应包含 %d 个值 (CorrelationID: %s)", len(deviceIDs), len(values), correlationID)
|
||||
err = t.pendingCollectionRepo.UpdateStatusToFulfilled(loraCtx, correlationID, time.Now())
|
||||
if err != nil {
|
||||
logger.Errorf("处理采集响应失败:无法更新待处理请求 (CorrelationID: %s) 的状态为完成: %v", correlationID, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
for i, deviceID := range deviceIDs {
|
||||
rawSensorValue := values[i]
|
||||
|
||||
if math.IsNaN(float64(rawSensorValue)) {
|
||||
logger.Warnf("设备 (ID: %d) 上报了一个无效的 NaN 值,已跳过当前值的记录。", deviceID)
|
||||
continue
|
||||
}
|
||||
|
||||
dev, err := t.deviceRepo.FindByID(loraCtx, deviceID)
|
||||
if err != nil {
|
||||
logger.Errorf("处理采集数据失败:无法找到设备 (ID: %d): %v", deviceID, err)
|
||||
continue
|
||||
}
|
||||
if err := dev.SelfCheck(); err != nil {
|
||||
logger.Warnf("跳过设备 %d,因其未通过自检: %v", dev.ID, err)
|
||||
continue
|
||||
}
|
||||
if err := dev.DeviceTemplate.SelfCheck(); err != nil {
|
||||
logger.Warnf("跳过设备 %d,因其设备模板未通过自检: %v", dev.ID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
var valueDescriptors []*models.ValueDescriptor
|
||||
if err := dev.DeviceTemplate.ParseValues(&valueDescriptors); err != nil {
|
||||
logger.Warnf("跳过设备 %d,因其设备模板的 Values 属性解析失败: %v", dev.ID, err)
|
||||
continue
|
||||
}
|
||||
if len(valueDescriptors) == 0 {
|
||||
logger.Warnf("跳过设备 %d,因其设备模板缺少 ValueDescriptor 定义", dev.ID)
|
||||
continue
|
||||
}
|
||||
valueDescriptor := valueDescriptors[0]
|
||||
|
||||
parsedValue := rawSensorValue*valueDescriptor.Multiplier + valueDescriptor.Offset
|
||||
|
||||
var dataToRecord interface{}
|
||||
switch valueDescriptor.Type {
|
||||
case models.SensorTypeTemperature:
|
||||
dataToRecord = models.TemperatureData{TemperatureCelsius: parsedValue}
|
||||
case models.SensorTypeHumidity:
|
||||
dataToRecord = models.HumidityData{HumidityPercent: parsedValue}
|
||||
case models.SensorTypeWeight:
|
||||
dataToRecord = models.WeightData{WeightKilograms: parsedValue}
|
||||
default:
|
||||
logger.Warnf("未知的传感器类型 '%s',将使用通用格式记录", valueDescriptor.Type)
|
||||
dataToRecord = map[string]float32{"value": parsedValue}
|
||||
}
|
||||
|
||||
t.recordSensorData(loraCtx, areaController.ID, dev.ID, time.Now(), valueDescriptor.Type, dataToRecord)
|
||||
logger.Infof("成功记录传感器数据: 设备ID=%d, 类型=%s, 原始值=%f, 解析值=%.2f", dev.ID, valueDescriptor.Type, rawSensorValue, parsedValue)
|
||||
}
|
||||
|
||||
// 6. 更新请求状态为“已完成”
|
||||
if err := t.pendingCollectionRepo.UpdateStatusToFulfilled(loraCtx, correlationID, time.Now()); err != nil {
|
||||
logger.Errorf("更新待采集请求状态为 'fulfilled' 失败 (CorrelationID: %s): %v", correlationID, err)
|
||||
} else {
|
||||
logger.Infof("成功完成并关闭采集请求 (CorrelationID: %s)", correlationID)
|
||||
}
|
||||
}
|
||||
|
||||
// recordSensorData 是一个通用方法,用于将传感器数据存入数据库。
|
||||
func (t *LoRaMeshUartPassthroughTransport) recordSensorData(ctx context.Context, areaControllerID uint32, sensorDeviceID uint32, eventTime time.Time, sensorType models.SensorType, data interface{}) {
|
||||
loraCtx, logger := logs.Trace(ctx, t.ctx, "recordSensorData")
|
||||
|
||||
jsonData, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
logger.Errorf("记录传感器数据失败:序列化数据为 JSON 时出错: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
sensorData := &models.SensorData{
|
||||
Time: eventTime,
|
||||
DeviceID: sensorDeviceID,
|
||||
AreaControllerID: areaControllerID,
|
||||
SensorType: sensorType,
|
||||
Data: datatypes.JSON(jsonData),
|
||||
}
|
||||
|
||||
if err := t.sensorDataRepo.Create(loraCtx, sensorData); err != nil {
|
||||
logger.Errorf("记录传感器数据失败:存入数据库时出错: %v", err)
|
||||
// 2. 委托给统一处理器
|
||||
// 注意:对于 LoRa Mesh,目前只处理业务指令,没有单独的状态或ACK事件。
|
||||
if err := t.handler.HandleInstruction(reqCtx, msg.SourceAddr, &instruction); err != nil {
|
||||
logger.Errorw("委托上行指令给统一处理器失败",
|
||||
"sourceAddr", msg.SourceAddr,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,8 @@ package transport
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto"
|
||||
)
|
||||
|
||||
// Communicator 用于其他设备通信
|
||||
@@ -35,3 +37,17 @@ type Listener interface {
|
||||
// Stop 用于停止监听
|
||||
Stop(ctx context.Context) error
|
||||
}
|
||||
|
||||
// UpstreamHandler 定义了处理所有来源的上行数据的统一协约。
|
||||
// 任何实现了上行消息监听的基础设施(如串口、MQTT客户端),都应该在收到消息后调用此接口的实现者。
|
||||
// 这样,基础设施层只负责“接收和解析”,而将“业务处理”的控制权交给了上层。
|
||||
type UpstreamHandler interface {
|
||||
// HandleInstruction 处理来自设备的、已解析为Instruction的业务指令。
|
||||
HandleInstruction(ctx context.Context, sourceAddr string, instruction *proto.Instruction) error
|
||||
|
||||
// HandleStatus 处理非业务指令的设备状态更新,例如信号强度、电量等。
|
||||
HandleStatus(ctx context.Context, sourceAddr string, status map[string]interface{}) error
|
||||
|
||||
// HandleAck 处理对下行指令的确认(ACK)事件。
|
||||
HandleAck(ctx context.Context, sourceAddr string, deduplicationID string, acknowledged bool, eventTime time.Time) error
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user