重构webhook包

This commit is contained in:
2025-11-29 17:17:36 +08:00
parent 0eb7c6f371
commit 80100658a2
9 changed files with 22 additions and 18 deletions

View File

@@ -0,0 +1,456 @@
package chirp_stack
import (
"context"
"encoding/base64"
"encoding/json"
"io"
"math"
"net/http"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/app/listener"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto"
gproto "google.golang.org/protobuf/proto"
"gorm.io/datatypes"
)
// ChirpStackListener 主动发送的请求的event字段, 这个字段代表事件类型
const (
eventTypeUp = "up" // 上行数据事件:当接收到设备发送的数据时触发,这是最核心的事件。
eventTypeStatus = "status" // 设备状态事件:当设备报告其状态时触发(例如电池电量、信号强度)。
eventTypeJoin = "join" // 入网事件:当设备成功加入网络时触发。
eventTypeAck = "ack" // 下行确认事件:当设备确认收到下行消息时触发。
eventTypeTxAck = "txack" // 网关发送确认事件:当网关确认已发送下行消息时触发(不代表设备已收到)。
eventTypeLog = "log" // 日志事件:当设备或 ChirpStack 产生日志信息时触发。
eventTypeLocation = "location" // 位置事件:当设备的位置被解析或更新时触发。
eventTypeIntegration = "integration" // 集成事件:当其他集成(如第三方服务)处理数据后触发。
)
// ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件
type ChirpStackListener struct {
ctx context.Context
sensorDataRepo repository.SensorDataRepository
deviceRepo repository.DeviceRepository
areaControllerRepo repository.AreaControllerRepository
deviceCommandLogRepo repository.DeviceCommandLogRepository
pendingCollectionRepo repository.PendingCollectionRepository
}
// NewChirpStackListener 创建一个新的 ChirpStackListener 实例
func NewChirpStackListener(
ctx context.Context,
sensorDataRepo repository.SensorDataRepository,
deviceRepo repository.DeviceRepository,
areaControllerRepo repository.AreaControllerRepository,
deviceCommandLogRepo repository.DeviceCommandLogRepository,
pendingCollectionRepo repository.PendingCollectionRepository,
) listener.ListenHandler {
return &ChirpStackListener{
ctx: ctx,
sensorDataRepo: sensorDataRepo,
deviceRepo: deviceRepo,
areaControllerRepo: areaControllerRepo,
deviceCommandLogRepo: deviceCommandLogRepo,
pendingCollectionRepo: pendingCollectionRepo,
}
}
// Handler 监听ChirpStack反馈的事件, 因为这是个Webhook, 所以直接回复掉再慢慢处理信息
func (c *ChirpStackListener) Handler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx, logger := logs.Trace(r.Context(), c.ctx, "ChirpStackListener")
defer r.Body.Close()
b, err := io.ReadAll(r.Body)
if err != nil {
logger.Errorf("读取请求体失败: %v", err)
http.Error(w, "failed to read body", http.StatusBadRequest)
return
}
event := r.URL.Query().Get("event")
w.WriteHeader(http.StatusOK)
// 将异步处理逻辑委托给 handler 方法
go c.handler(ctx, b, event)
}
}
// handler 用于处理 ChirpStack 发送的事件
func (c *ChirpStackListener) handler(ctx context.Context, data []byte, eventType string) {
reqCtx, logger := logs.Trace(ctx, c.ctx, "ChirpStackListener.handler")
switch eventType {
case eventTypeUp:
var msg UpEvent
if err := json.Unmarshal(data, &msg); err != nil {
logger.Errorf("解析 'up' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleUpEvent(reqCtx, &msg)
case eventTypeJoin:
var msg JoinEvent
if err := json.Unmarshal(data, &msg); err != nil {
logger.Errorf("解析 'join' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleJoinEvent(reqCtx, &msg)
case eventTypeAck:
var msg AckEvent
if err := json.Unmarshal(data, &msg); err != nil {
logger.Errorf("解析 'ack' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleAckEvent(reqCtx, &msg)
case eventTypeTxAck:
var msg TxAckEvent
if err := json.Unmarshal(data, &msg); err != nil {
logger.Errorf("解析 'txack' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleTxAckEvent(reqCtx, &msg)
case eventTypeStatus:
var msg StatusEvent
if err := json.Unmarshal(data, &msg); err != nil {
logger.Errorf("解析 'status' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleStatusEvent(reqCtx, &msg)
case eventTypeLog:
var msg LogEvent
if err := json.Unmarshal(data, &msg); err != nil {
logger.Errorf("解析 'log' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleLogEvent(reqCtx, &msg)
case eventTypeLocation:
var msg LocationEvent
if err := json.Unmarshal(data, &msg); err != nil {
logger.Errorf("解析 'location' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleLocationEvent(reqCtx, &msg)
case eventTypeIntegration:
var msg IntegrationEvent
if err := json.Unmarshal(data, &msg); err != nil {
logger.Errorf("解析 'integration' 事件失败: %v, data: %s", err, string(data))
return
}
c.handleIntegrationEvent(reqCtx, &msg)
default:
logger.Errorf("未知的ChirpStack事件: %s, data: %s", eventType, string(data))
}
}
// --- 业务处理函数 ---
// handleUpEvent 处理上行数据事件
func (c *ChirpStackListener) handleUpEvent(ctx context.Context, event *UpEvent) {
reqCtx, logger := logs.Trace(ctx, c.ctx, "ChirpStackListener.handleUpEvent")
logger.Infof("开始处理 'up' 事件, DevEui: %s", event.DeviceInfo.DevEui)
// 1. 查找区域主控设备
areaController, err := c.areaControllerRepo.FindByNetworkID(reqCtx, event.DeviceInfo.DevEui)
if err != nil {
logger.Errorf("处理 'up' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err)
return
}
// 依赖 SelfCheck 确保区域主控有效
if err := areaController.SelfCheck(); err != nil {
logger.Errorf("处理 'up' 事件失败:区域主控 %v(ID: %d) 未通过自检: %v", areaController.Name, areaController.ID, err)
return
}
logger.Infof("找到区域主控: %s (ID: %d)", areaController.Name, areaController.ID)
// 2. 记录区域主控的信号强度 (如果存在)
if len(event.RxInfo) > 0 {
// 根据业务逻辑,一个猪场只有一个网关,所以 RxInfo 中通常只有一个元素,或者 gateway_id 都是相同的。
// 因此,我们只取第一个 RxInfo 中的信号数据即可。
rx := event.RxInfo[0] // 取第一个接收到的网关信息
// 构建 SignalMetrics 结构体
signalMetrics := models.SignalMetrics{
RssiDbm: rx.Rssi,
SnrDb: rx.Snr,
}
// 记录信号强度
c.recordSensorData(reqCtx, areaController.ID, areaController.ID, event.Time, models.SensorTypeSignalMetrics, signalMetrics)
logger.Infof("已记录区域主控 (ID: %d) 的信号强度: RSSI=%d, SNR=%.2f", areaController.ID, rx.Rssi, rx.Snr)
} else {
logger.Warnf("处理 'up' 事件时未找到 RxInfo无法记录信号数据。DevEui: %s", event.DeviceInfo.DevEui)
}
// 3. 处理上报的传感器数据
if event.Data == "" {
logger.Warnf("处理 'up' 事件时 Data 字段为空无需记录上行数据。DevEui: %s", event.DeviceInfo.DevEui)
return
}
// 3.1 Base64 解码
decodedData, err := base64.StdEncoding.DecodeString(event.Data)
if err != nil {
logger.Errorf("Base64 解码 'up' 事件的 Data 失败: %v, Data: %s", err, event.Data)
return
}
// 3.2 解析外层 "信封"
var instruction proto.Instruction
if err := gproto.Unmarshal(decodedData, &instruction); err != nil {
logger.Errorf("解析上行 Instruction Protobuf 失败: %v, Decoded Data: %x", err, decodedData)
return
}
// 3.3 使用 type switch 从 oneof payload 中提取 CollectResult
var collectResp *proto.CollectResult
switch p := instruction.GetPayload().(type) {
case *proto.Instruction_CollectResult:
collectResp = p.CollectResult
default:
// 如果上行的数据不是采集结果,记录日志并忽略
logger.Infof("收到一个非采集响应的上行指令 (Type: %T),无需处理。", p)
return
}
// 检查 collectResp 是否为 nil虽然在 type switch 成功的情况下不太可能
if collectResp == nil {
logger.Errorf("从 Instruction 中提取的 CollectResult 为 nil")
return
}
correlationID := collectResp.CorrelationId
logger.Infof("成功解析采集响应 (CorrelationID: %s),包含 %d 个值。", correlationID, len(collectResp.Values))
// 4. 根据 CorrelationID 查找待处理请求
pendingReq, err := c.pendingCollectionRepo.FindByCorrelationID(reqCtx, correlationID)
if err != nil {
logger.Errorf("处理采集响应失败:无法找到待处理请求 (CorrelationID: %s): %v", correlationID, err)
return
}
// 检查状态,防止重复处理
if pendingReq.Status != models.PendingStatusPending && pendingReq.Status != models.PendingStatusTimedOut {
logger.Warnf("收到一个已处理过的采集响应 (CorrelationID: %s, Status: %s),将忽略。", correlationID, pendingReq.Status)
return
}
// 5. 匹配数据并存入数据库
deviceIDs := pendingReq.CommandMetadata
values := collectResp.Values
if len(deviceIDs) != len(values) {
logger.Errorf("数据不匹配:下行指令要求采集 %d 个设备,但上行响应包含 %d 个值 (CorrelationID: %s)", len(deviceIDs), len(values), correlationID)
// 即使数量不匹配,也更新状态为完成,以防止请求永远 pending
err = c.pendingCollectionRepo.UpdateStatusToFulfilled(reqCtx, correlationID, event.Time)
if err != nil {
logger.Errorf("处理采集响应失败:无法更新待处理请求 (CorrelationID: %s) 的状态为完成: %v", correlationID, err)
}
return
}
for i, deviceID := range deviceIDs {
rawSensorValue := values[i] // 这是设备上报的原始值
// 检查设备上报的值是否为 NaN (Not a Number),如果是则跳过
if math.IsNaN(float64(rawSensorValue)) {
logger.Warnf("设备 (ID: %d) 上报了一个无效的 NaN 值,已跳过当前值的记录。", deviceID)
continue
}
// 5.1 获取设备及其模板
dev, err := c.deviceRepo.FindByID(reqCtx, deviceID)
if err != nil {
logger.Errorf("处理采集数据失败:无法找到设备 (ID: %d): %v", deviceID, err)
continue
}
// 依赖 SelfCheck 确保设备和模板有效
if err := dev.SelfCheck(); err != nil {
logger.Warnf("跳过设备 %d因其未通过自检: %v", dev.ID, err)
continue
}
if err := dev.DeviceTemplate.SelfCheck(); err != nil {
logger.Warnf("跳过设备 %d因其设备模板未通过自检: %v", dev.ID, err)
continue
}
// 5.2 从设备模板中解析 ValueDescriptor
var valueDescriptors []*models.ValueDescriptor
if err := dev.DeviceTemplate.ParseValues(&valueDescriptors); err != nil {
logger.Warnf("跳过设备 %d因其设备模板的 Values 属性解析失败: %v", dev.ID, err)
continue
}
// 根据 DeviceTemplate.SelfCheck这里应该只有一个 ValueDescriptor
if len(valueDescriptors) == 0 {
logger.Warnf("跳过设备 %d因其设备模板缺少 ValueDescriptor 定义", dev.ID)
continue
}
valueDescriptor := valueDescriptors[0]
// 5.3 应用乘数和偏移量计算最终值
parsedValue := rawSensorValue*valueDescriptor.Multiplier + valueDescriptor.Offset
// 5.4 根据传感器类型构建具体的数据结构
var dataToRecord interface{}
switch valueDescriptor.Type {
case models.SensorTypeTemperature:
dataToRecord = models.TemperatureData{TemperatureCelsius: parsedValue}
case models.SensorTypeHumidity:
dataToRecord = models.HumidityData{HumidityPercent: parsedValue}
case models.SensorTypeWeight:
dataToRecord = models.WeightData{WeightKilograms: parsedValue}
default:
// TODO 未知传感器的数据需要记录吗
logger.Warnf("未知的传感器类型 '%s',将使用通用格式记录", valueDescriptor.Type)
dataToRecord = map[string]float32{"value": parsedValue}
}
// 5.5 记录传感器数据
c.recordSensorData(reqCtx, areaController.ID, dev.ID, event.Time, valueDescriptor.Type, dataToRecord)
logger.Infof("成功记录传感器数据: 设备ID=%d, 类型=%s, 原始值=%f, 解析值=%.2f", dev.ID, valueDescriptor.Type, rawSensorValue, parsedValue)
}
// 6. 更新请求状态为“已完成”
if err := c.pendingCollectionRepo.UpdateStatusToFulfilled(reqCtx, correlationID, event.Time); err != nil {
logger.Errorf("更新待采集请求状态为 'fulfilled' 失败 (CorrelationID: %s): %v", correlationID, err)
} else {
logger.Infof("成功完成并关闭采集请求 (CorrelationID: %s)", correlationID)
}
}
// handleStatusEvent 处理设备状态事件
func (c *ChirpStackListener) handleStatusEvent(ctx context.Context, event *StatusEvent) {
reqCtx, logger := logs.Trace(ctx, c.ctx, "handleStatusEvent")
logger.Infof("处接收到理 'status' 事件: %+v", event)
// 查找区域主控设备
areaController, err := c.areaControllerRepo.FindByNetworkID(reqCtx, event.DeviceInfo.DevEui)
if err != nil {
logger.Errorf("处理 'status' 事件失败:无法通过 DevEui '%s' 找到区域主控设备: %v", event.DeviceInfo.DevEui, err)
return
}
// 记录信号强度
signalMetrics := models.SignalMetrics{
MarginDb: event.Margin,
}
c.recordSensorData(reqCtx, areaController.ID, areaController.ID, event.Time, models.SensorTypeSignalMetrics, signalMetrics)
logger.Infof("已记录区域主控 (ID: %d) 的信号状态: %+v", areaController.ID, signalMetrics)
// 记录电量
batteryLevel := models.BatteryLevel{
BatteryLevelRatio: event.BatteryLevel,
BatteryLevelUnavailable: event.BatteryLevelUnavailable,
ExternalPower: event.ExternalPower,
}
c.recordSensorData(reqCtx, areaController.ID, areaController.ID, event.Time, models.SensorTypeBatteryLevel, batteryLevel)
logger.Infof("已记录区域主控 (ID: %d) 的电池状态: %+v", areaController.ID, batteryLevel)
}
// handleAckEvent 处理下行确认事件
func (c *ChirpStackListener) handleAckEvent(ctx context.Context, event *AckEvent) {
reqCtx, logger := logs.Trace(ctx, c.ctx, "handleAckEvent")
logger.Infof("接收到 'ack' 事件: %+v", event)
// 更新下行任务记录的确认时间及接收成功状态
err := c.deviceCommandLogRepo.UpdateAcknowledgedAt(reqCtx, event.DeduplicationID, event.Time, event.Acknowledged)
if err != nil {
logger.Errorf("更新下行任务记录的确认时间及接收成功状态失败 (MessageID: %s, DevEui: %s, Acknowledged: %t): %v",
event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, err)
return
}
logger.Infof("成功更新下行任务记录确认时间及接收成功状态 (MessageID: %s, DevEui: %s, Acknowledged: %t, AcknowledgedAt: %s)",
event.DeduplicationID, event.DeviceInfo.DevEui, event.Acknowledged, event.Time.Format(time.RFC3339))
}
// handleLogEvent 处理日志事件
func (c *ChirpStackListener) handleLogEvent(ctx context.Context, event *LogEvent) {
logger := logs.TraceLogger(ctx, c.ctx, "handleLogEvent")
// 首先,打印完整的事件结构体,用于详细排查
logger.Infof("接收到 'log' 事件的完整内容: %+v", event)
// 接着,根据 ChirpStack 日志的级别,使用我们自己的 logger 对应级别来打印核心信息
logMessage := "ChirpStack 日志: [%s] %s (DevEui: %s)"
switch event.Level {
case "INFO":
logger.Infof(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
case "WARNING":
logger.Warnf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
case "ERROR":
logger.Errorf(logMessage, event.Code, event.Description, event.DeviceInfo.DevEui)
default:
// 对于未知级别,使用 Warn 级别打印,并明确指出级别未知
logger.Warnf("ChirpStack 日志: [未知级别: %s] %s %s (DevEui: %s)",
event.Level, event.Code, event.Description, event.DeviceInfo.DevEui)
}
}
// handleJoinEvent 处理入网事件
func (c *ChirpStackListener) handleJoinEvent(ctx context.Context, event *JoinEvent) {
logger := logs.TraceLogger(ctx, c.ctx, "handleJoinEvent")
logger.Infof("接收到 'join' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// handleTxAckEvent 处理网关发送确认事件
func (c *ChirpStackListener) handleTxAckEvent(ctx context.Context, event *TxAckEvent) {
logger := logs.TraceLogger(ctx, c.ctx, "handleTxAckEvent")
logger.Infof("接收到 'txack' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// handleLocationEvent 处理位置事件
func (c *ChirpStackListener) handleLocationEvent(ctx context.Context, event *LocationEvent) {
logger := logs.TraceLogger(ctx, c.ctx, "handleLocationEvent")
logger.Infof("接收到 'location' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// handleIntegrationEvent 处理集成事件
func (c *ChirpStackListener) handleIntegrationEvent(ctx context.Context, event *IntegrationEvent) {
logger := logs.TraceLogger(ctx, c.ctx, "handleIntegrationEvent")
logger.Infof("接收到 'integration' 事件: %+v", event)
// 在这里添加您的业务逻辑
}
// recordSensorData 是一个通用方法,用于将传感器数据存入数据库。
// areaControllerID: 区域主控设备的ID
// sensorDeviceID: 实际产生传感器数据的普通设备的ID
// sensorType: 传感器值的类型 (例如 models.SensorTypeTemperature)
// data: 具体的传感器数据结构体实例 (例如 models.TemperatureData)
func (c *ChirpStackListener) recordSensorData(ctx context.Context, areaControllerID uint32, sensorDeviceID uint32, eventTime time.Time, sensorType models.SensorType, data interface{}) {
reqCtx, logger := logs.Trace(ctx, c.ctx, "recordSensorData")
// 1. 将传入的结构体序列化为 JSON
jsonData, err := json.Marshal(data)
if err != nil {
logger.Errorf("记录传感器数据失败:序列化数据为 JSON 时出错: %v", err)
return
}
// 2. 构建 SensorData 模型
sensorData := &models.SensorData{
Time: eventTime,
DeviceID: sensorDeviceID,
AreaControllerID: areaControllerID,
SensorType: sensorType,
Data: datatypes.JSON(jsonData),
}
// 3. 调用仓库创建记录
if err := c.sensorDataRepo.Create(reqCtx, sensorData); err != nil {
logger.Errorf("记录传感器数据失败:存入数据库时出错: %v", err)
}
}

View File

@@ -0,0 +1,198 @@
package chirp_stack
import (
"encoding/json"
"time"
)
// --- 通用结构体 ---
// DeviceInfo 包含了所有事件中通用的设备信息。
// 基于 aiserver.proto v4 (integration)
type DeviceInfo struct {
TenantID string `json:"tenant_id"` // 租户ID
TenantName string `json:"tenant_name"` // 租户名称
ApplicationID string `json:"application_id"` // 应用ID
ApplicationName string `json:"application_name"` // 应用名称
DeviceProfileID string `json:"device_profile_id"` // 设备配置文件ID
DeviceProfileName string `json:"device_profile_name"` // 设备配置文件名称
DeviceName string `json:"device_name"` // 设备名称
DevEui string `json:"dev_eui"` // 设备EUI (十六进制编码)
DeviceClassEnabled string `json:"device_class_enabled,omitempty"` // 设备启用的LoRaWAN类别 (A, B, 或 C)
Tags map[string]string `json:"tags"` // 用户定义的标签
}
// Location 包含了地理位置信息。
type Location struct {
Latitude float32 `json:"latitude"` // 纬度
Longitude float32 `json:"longitude"` // 经度
Altitude float32 `json:"altitude"` // 海拔
}
// --- 可复用的子结构体 ---
// UplinkRelayRxInfo 包含了上行中继接收信息。
type UplinkRelayRxInfo struct {
DevEui string `json:"dev_eui"` // 中继设备的DevEUI
Frequency uint32 `json:"frequency"` // 接收频率
Dr uint32 `json:"dr"` // 数据速率
Snr int32 `json:"snr"` // 信噪比
Rssi int32 `json:"rssi"` // 接收信号强度指示
WorChannel uint32 `json:"wor_channel"` // Work-on-Relay 通道
}
// KeyEnvelope 包装了一个加密的密钥。
// 基于 common.proto
type KeyEnvelope struct {
KEKLabel string `json:"kek_label,omitempty"` // 密钥加密密钥 (KEK) 标签
AESKey string `json:"aes_key,omitempty"` // Base64 编码的加密密钥
}
// JoinServerContext 包含了 Join-Server 上下文。
// 基于 common.proto
type JoinServerContext struct {
SessionKeyID string `json:"session_key_id"` // 会话密钥ID
AppSKey *KeyEnvelope `json:"app_s_key,omitempty"` // 应用会话密钥
}
// UplinkRxInfo 包含了上行接收信息。
type UplinkRxInfo struct {
GatewayID string `json:"gateway_id"` // 接收到上行数据的网关ID
UplinkID uint32 `json:"uplink_id"` // 上行ID
Time time.Time `json:"time"` // 接收时间
Rssi int `json:"rssi"` // 接收信号强度指示
Snr float32 `json:"snr"` // 信噪比
Channel int `json:"channel"` // 接收通道
Location *Location `json:"location"` // 网关位置
Context string `json:"context"` // 上下文信息
Metadata map[string]string `json:"metadata"` // 元数据
}
// LoraModulationInfo 包含了 LoRa 调制的具体参数。
type LoraModulationInfo struct {
Bandwidth int `json:"bandwidth"` // 带宽
SpreadingFactor int `json:"spreading_factor"` // 扩频因子
CodeRate string `json:"code_rate"` // 编码率
Polarization bool `json:"polarization_invert,omitempty"` // 极化反转
}
// Modulation 包含了具体的调制信息。
type Modulation struct {
Lora LoraModulationInfo `json:"lora"` // LoRa 调制信息
}
// UplinkTxInfo 包含了上行发送信息。
type UplinkTxInfo struct {
Frequency int `json:"frequency"` // 发送频率
Modulation Modulation `json:"modulation"` // 调制信息
}
// DownlinkTxInfo 包含了下行发送信息。
type DownlinkTxInfo struct {
Frequency int `json:"frequency"` // 发送频率
Power int `json:"power"` // 发送功率
Modulation Modulation `json:"modulation"` // 调制信息
}
// ResolvedLocation 包含了地理位置解析结果。
type ResolvedLocation struct {
Latitude float32 `json:"latitude"` // 纬度
Longitude float32 `json:"longitude"` // 经度
Altitude float32 `json:"altitude"` // 海拔
Source string `json:"source"` // 位置来源
Accuracy int `json:"accuracy"` // 精度
}
// --- 事件专属结构体 ---
// UpEvent 对应 ChirpStack 的 "up" 事件。
type UpEvent struct {
DeduplicationID string `json:"deduplication_id"` // 去重ID
Time time.Time `json:"time"` // 事件时间
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
DevAddr string `json:"dev_addr"` // 设备地址
ADR bool `json:"adr"` // 自适应数据速率 (ADR) 是否启用
DR int `json:"dr"` // 数据速率
FCnt uint32 `json:"f_cnt"` // 帧计数器
FPort uint8 `json:"f_port"` // 端口
Confirmed bool `json:"confirmed"` // 是否是确认帧
Data string `json:"data"` // Base64 编码的原始负载数据
Object json.RawMessage `json:"object"` // 解码后的JSON对象负载
RxInfo []UplinkRxInfo `json:"rx_info"` // 接收信息列表
TxInfo UplinkTxInfo `json:"tx_info"` // 发送信息
RelayRxInfo *UplinkRelayRxInfo `json:"relay_rx_info,omitempty"` // 中继接收信息
JoinServerContext *JoinServerContext `json:"join_server_context,omitempty"` // Join-Server 上下文
RegionConfigID string `json:"region_config_id,omitempty"` // 区域配置ID
}
// JoinEvent 对应 ChirpStack 的 "join" 事件。
type JoinEvent struct {
DeduplicationID string `json:"deduplication_id"` // 去重ID
Time time.Time `json:"time"` // 事件时间
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
DevAddr string `json:"dev_addr"` // 设备地址
RelayRxInfo *UplinkRelayRxInfo `json:"relay_rx_info,omitempty"` // 中继接收信息
JoinServerContext *JoinServerContext `json:"join_server_context,omitempty"` // Join-Server 上下文
RegionConfigID string `json:"region_config_id,omitempty"` // 区域配置ID
}
// AckEvent 对应 ChirpStack 的 "ack" 事件。
type AckEvent struct {
DeduplicationID string `json:"deduplication_id"` // 去重ID
Time time.Time `json:"time"` // 事件时间
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
Acknowledged bool `json:"acknowledged"` // 是否已确认
FCntDown uint32 `json:"f_cnt_down"` // 下行帧计数器
QueueItemID string `json:"queue_item_id"` // 队列项ID
}
// TxAckEvent 对应 ChirpStack 的 "txack" 事件。
type TxAckEvent struct {
DownlinkID uint32 `json:"downlink_id"` // 下行ID
Time time.Time `json:"time"` // 事件时间
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
FCntDown uint32 `json:"f_cnt_down"` // 下行帧计数器
GatewayID string `json:"gateway_id"` // 网关ID
QueueItemID string `json:"queue_item_id"` // 队列项ID
TxInfo DownlinkTxInfo `json:"tx_info"` // 下行发送信息
}
// StatusEvent 对应 ChirpStack 的 "status" 事件。
type StatusEvent struct {
DeduplicationID string `json:"deduplication_id"` // 去重ID
Time time.Time `json:"time"` // 事件时间
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
Margin int `json:"margin"` // 链路预算余量 (dB)
ExternalPower bool `json:"external_power_source"` // 设备是否连接外部电源
BatteryLevel float32 `json:"battery_level"` // 电池剩余电量
BatteryLevelUnavailable bool `json:"battery_level_unavailable"` // 电池电量是否不可用
}
// LogEvent 对应 ChirpStack 的 "log" 事件。
type LogEvent struct {
DeduplicationID string `json:"deduplication_id"` // 去重ID
Time time.Time `json:"time"` // 事件时间
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
Level string `json:"level"` // 日志级别 (e.g., INFO, WARNING, ERROR)
Code string `json:"code"` // 日志代码
Description string `json:"description"` // 日志描述
Context map[string]string `json:"context"` // 上下文信息
}
// LocationEvent 对应 ChirpStack 的 "location" 事件。
type LocationEvent struct {
DeduplicationID string `json:"deduplication_id"` // 去重ID
Time time.Time `json:"time"` // 事件时间
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
Location ResolvedLocation `json:"location"` // 解析后的位置信息
}
// IntegrationEvent 对应 ChirpStack 的 "integration" 事件。
type IntegrationEvent struct {
DeduplicationID string `json:"deduplication_id"` // 去重ID
Time time.Time `json:"time"` // 事件时间
DeviceInfo DeviceInfo `json:"device_info"` // 设备信息
IntegrationName string `json:"integration_name"` // 集成名称
EventType string `json:"event_type,omitempty"` // 事件类型
Object json.RawMessage `json:"object"` // 集成事件的原始JSON负载
}

View File

@@ -0,0 +1,32 @@
package chirp_stack
import (
"context"
"net/http"
"git.huangwc.com/pig/pig-farm-controller/internal/app/listener"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
)
// PlaceholderListener 是一个占位符, 用于在非 LoRaWAN 配置下满足 ListenHandler 接口
type PlaceholderListener struct {
ctx context.Context
}
// NewPlaceholderListener 创建一个新的 PlaceholderListener 实例
// 它只打印一条日志, 表明 ChirpStack listener 未被激活
func NewPlaceholderListener(ctx context.Context) listener.ListenHandler {
return &PlaceholderListener{
ctx: ctx,
}
}
// Handler 返回一个不执行任何操作的 http.HandlerFunc
// 理论上, 在占位符生效的模式下, 这个 Handler 不应该被调用
func (p *PlaceholderListener) Handler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
logger := logs.TraceLogger(r.Context(), p.ctx, "PlaceholderListener")
logger.Warn("PlaceholderListener 的 Handler 被调用, 这通常是意料之外的。")
w.WriteHeader(http.StatusNotImplemented)
}
}

View File

@@ -0,0 +1,8 @@
package listener
import "net/http"
// ListenHandler 是一个监听器, 用于监听设备上行事件
type ListenHandler interface {
Handler() http.HandlerFunc
}