增加ping指令并获取带版本号的响应

This commit is contained in:
2025-12-01 20:42:21 +08:00
parent d5056af676
commit 2e6a0abac3
17 changed files with 557 additions and 44 deletions

View File

@@ -11,7 +11,8 @@ http://git.huangwc.com/pig/pig-farm-controller/issues/71
## OTA 升级 ## OTA 升级
- [x] 增加一个proto对象, 用于封装ota升级包 - [x] 增加一个proto对象, 用于封装ota升级包
- [ ] 区域主控增加版本号 - [x] 区域主控增加版本号
- [x] 增加ping指令并获取带版本号的响应
- [ ] 实现ota升级逻辑 - [ ] 实现ota升级逻辑
## Lora 监听逻辑重构 ## Lora 监听逻辑重构

View File

@@ -216,12 +216,14 @@ const docTemplate = `{
"电池电量", "电池电量",
"温度", "温度",
"湿度", "湿度",
"重量" "重量",
"在线状态"
], ],
"type": "string", "type": "string",
"x-enum-comments": { "x-enum-comments": {
"SensorTypeBatteryLevel": "电池电量", "SensorTypeBatteryLevel": "电池电量",
"SensorTypeHumidity": "湿度", "SensorTypeHumidity": "湿度",
"SensorTypeOnlineStatus": "在线状态",
"SensorTypeSignalMetrics": "信号强度", "SensorTypeSignalMetrics": "信号强度",
"SensorTypeTemperature": "温度", "SensorTypeTemperature": "温度",
"SensorTypeWeight": "重量" "SensorTypeWeight": "重量"
@@ -231,14 +233,16 @@ const docTemplate = `{
"电池电量", "电池电量",
"温度", "温度",
"湿度", "湿度",
"重量" "重量",
"在线状态"
], ],
"x-enum-varnames": [ "x-enum-varnames": [
"SensorTypeSignalMetrics", "SensorTypeSignalMetrics",
"SensorTypeBatteryLevel", "SensorTypeBatteryLevel",
"SensorTypeTemperature", "SensorTypeTemperature",
"SensorTypeHumidity", "SensorTypeHumidity",
"SensorTypeWeight" "SensorTypeWeight",
"SensorTypeOnlineStatus"
], ],
"description": "按传感器类型过滤", "description": "按传感器类型过滤",
"name": "sensor_type", "name": "sensor_type",
@@ -497,12 +501,14 @@ const docTemplate = `{
"电池电量", "电池电量",
"温度", "温度",
"湿度", "湿度",
"重量" "重量",
"在线状态"
], ],
"type": "string", "type": "string",
"x-enum-comments": { "x-enum-comments": {
"SensorTypeBatteryLevel": "电池电量", "SensorTypeBatteryLevel": "电池电量",
"SensorTypeHumidity": "湿度", "SensorTypeHumidity": "湿度",
"SensorTypeOnlineStatus": "在线状态",
"SensorTypeSignalMetrics": "信号强度", "SensorTypeSignalMetrics": "信号强度",
"SensorTypeTemperature": "温度", "SensorTypeTemperature": "温度",
"SensorTypeWeight": "重量" "SensorTypeWeight": "重量"
@@ -512,14 +518,16 @@ const docTemplate = `{
"电池电量", "电池电量",
"温度", "温度",
"湿度", "湿度",
"重量" "重量",
"在线状态"
], ],
"x-enum-varnames": [ "x-enum-varnames": [
"SensorTypeSignalMetrics", "SensorTypeSignalMetrics",
"SensorTypeBatteryLevel", "SensorTypeBatteryLevel",
"SensorTypeTemperature", "SensorTypeTemperature",
"SensorTypeHumidity", "SensorTypeHumidity",
"SensorTypeWeight" "SensorTypeWeight",
"SensorTypeOnlineStatus"
], ],
"description": "按传感器类型过滤", "description": "按传感器类型过滤",
"name": "sensor_type", "name": "sensor_type",
@@ -6889,6 +6897,9 @@ const docTemplate = `{
"created_at": { "created_at": {
"type": "string" "type": "string"
}, },
"firmware_version": {
"type": "string"
},
"id": { "id": {
"type": "integer" "type": "integer"
}, },
@@ -10592,11 +10603,13 @@ const docTemplate = `{
"电池电量", "电池电量",
"温度", "温度",
"湿度", "湿度",
"重量" "重量",
"在线状态"
], ],
"x-enum-comments": { "x-enum-comments": {
"SensorTypeBatteryLevel": "电池电量", "SensorTypeBatteryLevel": "电池电量",
"SensorTypeHumidity": "湿度", "SensorTypeHumidity": "湿度",
"SensorTypeOnlineStatus": "在线状态",
"SensorTypeSignalMetrics": "信号强度", "SensorTypeSignalMetrics": "信号强度",
"SensorTypeTemperature": "温度", "SensorTypeTemperature": "温度",
"SensorTypeWeight": "重量" "SensorTypeWeight": "重量"
@@ -10606,14 +10619,16 @@ const docTemplate = `{
"电池电量", "电池电量",
"温度", "温度",
"湿度", "湿度",
"重量" "重量",
"在线状态"
], ],
"x-enum-varnames": [ "x-enum-varnames": [
"SensorTypeSignalMetrics", "SensorTypeSignalMetrics",
"SensorTypeBatteryLevel", "SensorTypeBatteryLevel",
"SensorTypeTemperature", "SensorTypeTemperature",
"SensorTypeHumidity", "SensorTypeHumidity",
"SensorTypeWeight" "SensorTypeWeight",
"SensorTypeOnlineStatus"
] ]
}, },
"models.SeverityLevel": { "models.SeverityLevel": {
@@ -10681,6 +10696,7 @@ const docTemplate = `{
"等待", "等待",
"下料", "下料",
"全量采集", "全量采集",
"心跳检测",
"告警通知", "告警通知",
"通知刷新", "通知刷新",
"设备阈值检查", "设备阈值检查",
@@ -10692,6 +10708,7 @@ const docTemplate = `{
"TaskTypeAreaCollectorThresholdCheck": "区域阈值检查任务", "TaskTypeAreaCollectorThresholdCheck": "区域阈值检查任务",
"TaskTypeDeviceThresholdCheck": "设备阈值检查任务", "TaskTypeDeviceThresholdCheck": "设备阈值检查任务",
"TaskTypeFullCollection": "新增的全量采集任务", "TaskTypeFullCollection": "新增的全量采集任务",
"TaskTypeHeartbeat": "区域主控心跳检测任务",
"TaskTypeNotificationRefresh": "通知刷新任务", "TaskTypeNotificationRefresh": "通知刷新任务",
"TaskTypeReleaseFeedWeight": "下料口释放指定重量任务", "TaskTypeReleaseFeedWeight": "下料口释放指定重量任务",
"TaskTypeWaiting": "等待任务" "TaskTypeWaiting": "等待任务"
@@ -10701,6 +10718,7 @@ const docTemplate = `{
"等待任务", "等待任务",
"下料口释放指定重量任务", "下料口释放指定重量任务",
"新增的全量采集任务", "新增的全量采集任务",
"区域主控心跳检测任务",
"告警通知任务", "告警通知任务",
"通知刷新任务", "通知刷新任务",
"设备阈值检查任务", "设备阈值检查任务",
@@ -10711,6 +10729,7 @@ const docTemplate = `{
"TaskTypeWaiting", "TaskTypeWaiting",
"TaskTypeReleaseFeedWeight", "TaskTypeReleaseFeedWeight",
"TaskTypeFullCollection", "TaskTypeFullCollection",
"TaskTypeHeartbeat",
"TaskTypeAlarmNotification", "TaskTypeAlarmNotification",
"TaskTypeNotificationRefresh", "TaskTypeNotificationRefresh",
"TaskTypeDeviceThresholdCheck", "TaskTypeDeviceThresholdCheck",

View File

@@ -208,12 +208,14 @@
"电池电量", "电池电量",
"温度", "温度",
"湿度", "湿度",
"重量" "重量",
"在线状态"
], ],
"type": "string", "type": "string",
"x-enum-comments": { "x-enum-comments": {
"SensorTypeBatteryLevel": "电池电量", "SensorTypeBatteryLevel": "电池电量",
"SensorTypeHumidity": "湿度", "SensorTypeHumidity": "湿度",
"SensorTypeOnlineStatus": "在线状态",
"SensorTypeSignalMetrics": "信号强度", "SensorTypeSignalMetrics": "信号强度",
"SensorTypeTemperature": "温度", "SensorTypeTemperature": "温度",
"SensorTypeWeight": "重量" "SensorTypeWeight": "重量"
@@ -223,14 +225,16 @@
"电池电量", "电池电量",
"温度", "温度",
"湿度", "湿度",
"重量" "重量",
"在线状态"
], ],
"x-enum-varnames": [ "x-enum-varnames": [
"SensorTypeSignalMetrics", "SensorTypeSignalMetrics",
"SensorTypeBatteryLevel", "SensorTypeBatteryLevel",
"SensorTypeTemperature", "SensorTypeTemperature",
"SensorTypeHumidity", "SensorTypeHumidity",
"SensorTypeWeight" "SensorTypeWeight",
"SensorTypeOnlineStatus"
], ],
"description": "按传感器类型过滤", "description": "按传感器类型过滤",
"name": "sensor_type", "name": "sensor_type",
@@ -489,12 +493,14 @@
"电池电量", "电池电量",
"温度", "温度",
"湿度", "湿度",
"重量" "重量",
"在线状态"
], ],
"type": "string", "type": "string",
"x-enum-comments": { "x-enum-comments": {
"SensorTypeBatteryLevel": "电池电量", "SensorTypeBatteryLevel": "电池电量",
"SensorTypeHumidity": "湿度", "SensorTypeHumidity": "湿度",
"SensorTypeOnlineStatus": "在线状态",
"SensorTypeSignalMetrics": "信号强度", "SensorTypeSignalMetrics": "信号强度",
"SensorTypeTemperature": "温度", "SensorTypeTemperature": "温度",
"SensorTypeWeight": "重量" "SensorTypeWeight": "重量"
@@ -504,14 +510,16 @@
"电池电量", "电池电量",
"温度", "温度",
"湿度", "湿度",
"重量" "重量",
"在线状态"
], ],
"x-enum-varnames": [ "x-enum-varnames": [
"SensorTypeSignalMetrics", "SensorTypeSignalMetrics",
"SensorTypeBatteryLevel", "SensorTypeBatteryLevel",
"SensorTypeTemperature", "SensorTypeTemperature",
"SensorTypeHumidity", "SensorTypeHumidity",
"SensorTypeWeight" "SensorTypeWeight",
"SensorTypeOnlineStatus"
], ],
"description": "按传感器类型过滤", "description": "按传感器类型过滤",
"name": "sensor_type", "name": "sensor_type",
@@ -6881,6 +6889,9 @@
"created_at": { "created_at": {
"type": "string" "type": "string"
}, },
"firmware_version": {
"type": "string"
},
"id": { "id": {
"type": "integer" "type": "integer"
}, },
@@ -10584,11 +10595,13 @@
"电池电量", "电池电量",
"温度", "温度",
"湿度", "湿度",
"重量" "重量",
"在线状态"
], ],
"x-enum-comments": { "x-enum-comments": {
"SensorTypeBatteryLevel": "电池电量", "SensorTypeBatteryLevel": "电池电量",
"SensorTypeHumidity": "湿度", "SensorTypeHumidity": "湿度",
"SensorTypeOnlineStatus": "在线状态",
"SensorTypeSignalMetrics": "信号强度", "SensorTypeSignalMetrics": "信号强度",
"SensorTypeTemperature": "温度", "SensorTypeTemperature": "温度",
"SensorTypeWeight": "重量" "SensorTypeWeight": "重量"
@@ -10598,14 +10611,16 @@
"电池电量", "电池电量",
"温度", "温度",
"湿度", "湿度",
"重量" "重量",
"在线状态"
], ],
"x-enum-varnames": [ "x-enum-varnames": [
"SensorTypeSignalMetrics", "SensorTypeSignalMetrics",
"SensorTypeBatteryLevel", "SensorTypeBatteryLevel",
"SensorTypeTemperature", "SensorTypeTemperature",
"SensorTypeHumidity", "SensorTypeHumidity",
"SensorTypeWeight" "SensorTypeWeight",
"SensorTypeOnlineStatus"
] ]
}, },
"models.SeverityLevel": { "models.SeverityLevel": {
@@ -10673,6 +10688,7 @@
"等待", "等待",
"下料", "下料",
"全量采集", "全量采集",
"心跳检测",
"告警通知", "告警通知",
"通知刷新", "通知刷新",
"设备阈值检查", "设备阈值检查",
@@ -10684,6 +10700,7 @@
"TaskTypeAreaCollectorThresholdCheck": "区域阈值检查任务", "TaskTypeAreaCollectorThresholdCheck": "区域阈值检查任务",
"TaskTypeDeviceThresholdCheck": "设备阈值检查任务", "TaskTypeDeviceThresholdCheck": "设备阈值检查任务",
"TaskTypeFullCollection": "新增的全量采集任务", "TaskTypeFullCollection": "新增的全量采集任务",
"TaskTypeHeartbeat": "区域主控心跳检测任务",
"TaskTypeNotificationRefresh": "通知刷新任务", "TaskTypeNotificationRefresh": "通知刷新任务",
"TaskTypeReleaseFeedWeight": "下料口释放指定重量任务", "TaskTypeReleaseFeedWeight": "下料口释放指定重量任务",
"TaskTypeWaiting": "等待任务" "TaskTypeWaiting": "等待任务"
@@ -10693,6 +10710,7 @@
"等待任务", "等待任务",
"下料口释放指定重量任务", "下料口释放指定重量任务",
"新增的全量采集任务", "新增的全量采集任务",
"区域主控心跳检测任务",
"告警通知任务", "告警通知任务",
"通知刷新任务", "通知刷新任务",
"设备阈值检查任务", "设备阈值检查任务",
@@ -10703,6 +10721,7 @@
"TaskTypeWaiting", "TaskTypeWaiting",
"TaskTypeReleaseFeedWeight", "TaskTypeReleaseFeedWeight",
"TaskTypeFullCollection", "TaskTypeFullCollection",
"TaskTypeHeartbeat",
"TaskTypeAlarmNotification", "TaskTypeAlarmNotification",
"TaskTypeNotificationRefresh", "TaskTypeNotificationRefresh",
"TaskTypeDeviceThresholdCheck", "TaskTypeDeviceThresholdCheck",

View File

@@ -86,6 +86,8 @@ definitions:
properties: properties:
created_at: created_at:
type: string type: string
firmware_version:
type: string
id: id:
type: integer type: integer
location: location:
@@ -2638,10 +2640,12 @@ definitions:
- 温度 - 温度
- 湿度 - 湿度
- 重量 - 重量
- 在线状态
type: string type: string
x-enum-comments: x-enum-comments:
SensorTypeBatteryLevel: 电池电量 SensorTypeBatteryLevel: 电池电量
SensorTypeHumidity: 湿度 SensorTypeHumidity: 湿度
SensorTypeOnlineStatus: 在线状态
SensorTypeSignalMetrics: 信号强度 SensorTypeSignalMetrics: 信号强度
SensorTypeTemperature: 温度 SensorTypeTemperature: 温度
SensorTypeWeight: 重量 SensorTypeWeight: 重量
@@ -2651,12 +2655,14 @@ definitions:
- 温度 - 温度
- 湿度 - 湿度
- 重量 - 重量
- 在线状态
x-enum-varnames: x-enum-varnames:
- SensorTypeSignalMetrics - SensorTypeSignalMetrics
- SensorTypeBatteryLevel - SensorTypeBatteryLevel
- SensorTypeTemperature - SensorTypeTemperature
- SensorTypeHumidity - SensorTypeHumidity
- SensorTypeWeight - SensorTypeWeight
- SensorTypeOnlineStatus
models.SeverityLevel: models.SeverityLevel:
enum: enum:
- debug - debug
@@ -2713,6 +2719,7 @@ definitions:
- 等待 - 等待
- 下料 - 下料
- 全量采集 - 全量采集
- 心跳检测
- 告警通知 - 告警通知
- 通知刷新 - 通知刷新
- 设备阈值检查 - 设备阈值检查
@@ -2724,6 +2731,7 @@ definitions:
TaskTypeAreaCollectorThresholdCheck: 区域阈值检查任务 TaskTypeAreaCollectorThresholdCheck: 区域阈值检查任务
TaskTypeDeviceThresholdCheck: 设备阈值检查任务 TaskTypeDeviceThresholdCheck: 设备阈值检查任务
TaskTypeFullCollection: 新增的全量采集任务 TaskTypeFullCollection: 新增的全量采集任务
TaskTypeHeartbeat: 区域主控心跳检测任务
TaskTypeNotificationRefresh: 通知刷新任务 TaskTypeNotificationRefresh: 通知刷新任务
TaskTypeReleaseFeedWeight: 下料口释放指定重量任务 TaskTypeReleaseFeedWeight: 下料口释放指定重量任务
TaskTypeWaiting: 等待任务 TaskTypeWaiting: 等待任务
@@ -2732,6 +2740,7 @@ definitions:
- 等待任务 - 等待任务
- 下料口释放指定重量任务 - 下料口释放指定重量任务
- 新增的全量采集任务 - 新增的全量采集任务
- 区域主控心跳检测任务
- 告警通知任务 - 告警通知任务
- 通知刷新任务 - 通知刷新任务
- 设备阈值检查任务 - 设备阈值检查任务
@@ -2741,6 +2750,7 @@ definitions:
- TaskTypeWaiting - TaskTypeWaiting
- TaskTypeReleaseFeedWeight - TaskTypeReleaseFeedWeight
- TaskTypeFullCollection - TaskTypeFullCollection
- TaskTypeHeartbeat
- TaskTypeAlarmNotification - TaskTypeAlarmNotification
- TaskTypeNotificationRefresh - TaskTypeNotificationRefresh
- TaskTypeDeviceThresholdCheck - TaskTypeDeviceThresholdCheck
@@ -2985,12 +2995,14 @@ paths:
- 温度 - 温度
- 湿度 - 湿度
- 重量 - 重量
- 在线状态
in: query in: query
name: sensor_type name: sensor_type
type: string type: string
x-enum-comments: x-enum-comments:
SensorTypeBatteryLevel: 电池电量 SensorTypeBatteryLevel: 电池电量
SensorTypeHumidity: 湿度 SensorTypeHumidity: 湿度
SensorTypeOnlineStatus: 在线状态
SensorTypeSignalMetrics: 信号强度 SensorTypeSignalMetrics: 信号强度
SensorTypeTemperature: 温度 SensorTypeTemperature: 温度
SensorTypeWeight: 重量 SensorTypeWeight: 重量
@@ -3000,12 +3012,14 @@ paths:
- 温度 - 温度
- 湿度 - 湿度
- 重量 - 重量
- 在线状态
x-enum-varnames: x-enum-varnames:
- SensorTypeSignalMetrics - SensorTypeSignalMetrics
- SensorTypeBatteryLevel - SensorTypeBatteryLevel
- SensorTypeTemperature - SensorTypeTemperature
- SensorTypeHumidity - SensorTypeHumidity
- SensorTypeWeight - SensorTypeWeight
- SensorTypeOnlineStatus
produces: produces:
- application/json - application/json
responses: responses:
@@ -3167,12 +3181,14 @@ paths:
- 温度 - 温度
- 湿度 - 湿度
- 重量 - 重量
- 在线状态
in: query in: query
name: sensor_type name: sensor_type
type: string type: string
x-enum-comments: x-enum-comments:
SensorTypeBatteryLevel: 电池电量 SensorTypeBatteryLevel: 电池电量
SensorTypeHumidity: 湿度 SensorTypeHumidity: 湿度
SensorTypeOnlineStatus: 在线状态
SensorTypeSignalMetrics: 信号强度 SensorTypeSignalMetrics: 信号强度
SensorTypeTemperature: 温度 SensorTypeTemperature: 温度
SensorTypeWeight: 重量 SensorTypeWeight: 重量
@@ -3182,12 +3198,14 @@ paths:
- 温度 - 温度
- 湿度 - 湿度
- 重量 - 重量
- 在线状态
x-enum-varnames: x-enum-varnames:
- SensorTypeSignalMetrics - SensorTypeSignalMetrics
- SensorTypeBatteryLevel - SensorTypeBatteryLevel
- SensorTypeTemperature - SensorTypeTemperature
- SensorTypeHumidity - SensorTypeHumidity
- SensorTypeWeight - SensorTypeWeight
- SensorTypeOnlineStatus
produces: produces:
- application/json - application/json
responses: responses:

View File

@@ -59,6 +59,9 @@ func (l *loraListener) HandleInstruction(upstreamCtx context.Context, sourceAddr
case *proto.Instruction_OtaUpgradeStatus: case *proto.Instruction_OtaUpgradeStatus:
return l.handleOtaStatus(ctx, sourceAddr, p.OtaUpgradeStatus) return l.handleOtaStatus(ctx, sourceAddr, p.OtaUpgradeStatus)
case *proto.Instruction_Pong:
return l.handlePong(ctx, sourceAddr, p.Pong)
case *proto.Instruction_LogUploadRequest: case *proto.Instruction_LogUploadRequest:
logger.Infow("收到设备日志上传请求,暂未实现处理逻辑", "来源地址", sourceAddr, "日志条数", len(p.LogUploadRequest.Entries)) logger.Infow("收到设备日志上传请求,暂未实现处理逻辑", "来源地址", sourceAddr, "日志条数", len(p.LogUploadRequest.Entries))
// TODO: 在这里实现设备日志的处理逻辑 // TODO: 在这里实现设备日志的处理逻辑
@@ -280,7 +283,6 @@ func (l *loraListener) handleOtaStatus(ctx context.Context, sourceAddr string, s
logger.Infow("开始处理OTA升级状态", logger.Infow("开始处理OTA升级状态",
"来源地址", sourceAddr, "来源地址", sourceAddr,
"状态码", status.StatusCode, "状态码", status.StatusCode,
"处理结果", status.StatusCode == 0,
"当前版本", status.CurrentFirmwareVersion, "当前版本", status.CurrentFirmwareVersion,
) )
@@ -305,3 +307,33 @@ func (l *loraListener) handleOtaStatus(ctx context.Context, sourceAddr string, s
return nil return nil
} }
// handlePong 处理设备上报的Pong响应或主动心跳。
func (l *loraListener) handlePong(ctx context.Context, sourceAddr string, pong *proto.Pong) error {
reqCtx, logger := logs.Trace(ctx, l.selfCtx, "handlePong")
logger.Infow("开始处理Pong", "来源地址", sourceAddr, "携带版本", pong.FirmwareVersion)
// 1. 查找区域主控
areaController, err := l.areaControllerRepo.FindByNetworkID(reqCtx, sourceAddr)
if err != nil {
return fmt.Errorf("处理Pong失败无法找到区域主控: %w", err)
}
// 2. 如果 Pong 中包含版本号,则更新
if pong.FirmwareVersion != "" {
err := l.areaControllerRepo.UpdateFirmwareVersion(reqCtx, areaController.ID, pong.FirmwareVersion)
if err != nil {
// 只记录错误,不中断流程,因为还要记录在线状态
logger.Errorw("处理Pong时更新固件版本失败", "主控ID", areaController.ID, "error", err)
} else {
logger.Infow("处理Pong时成功更新固件版本", "主控ID", areaController.ID, "新版本", pong.FirmwareVersion)
}
}
// 3. 记录在线状态
onlineStatus := models.OnlineStatusData{State: models.StateOnline}
l.recordSensorData(reqCtx, areaController.ID, areaController.ID, time.Now(), models.SensorTypeOnlineStatus, onlineStatus)
logger.Infow("已记录区域主控为在线状态", "主控ID", areaController.ID)
return nil
}

View File

@@ -180,6 +180,7 @@ func initDomainServices(ctx context.Context, cfg *config.Config, infra *Infrastr
generalDeviceService := device.NewGeneralDeviceService( generalDeviceService := device.NewGeneralDeviceService(
logs.AddCompName(baseCtx, "GeneralDeviceService"), logs.AddCompName(baseCtx, "GeneralDeviceService"),
infra.repos.deviceRepo, infra.repos.deviceRepo,
infra.repos.areaControllerRepo,
infra.repos.deviceCommandLogRepo, infra.repos.deviceCommandLogRepo,
infra.repos.pendingCollectionRepo, infra.repos.pendingCollectionRepo,
infra.lora.comm, infra.lora.comm,
@@ -197,6 +198,7 @@ func initDomainServices(ctx context.Context, cfg *config.Config, infra *Infrastr
infra.repos.sensorDataRepo, infra.repos.sensorDataRepo,
infra.repos.deviceRepo, infra.repos.deviceRepo,
infra.repos.alarmRepo, infra.repos.alarmRepo,
infra.repos.areaControllerRepo,
generalDeviceService, generalDeviceService,
notifyService, notifyService,
alarmService, alarmService,

View File

@@ -83,6 +83,10 @@ func (app *Application) initializeSystemPlans(ctx context.Context) error {
return err return err
} }
if err := app.initializeHeartbeatCheckPlan(appCtx, existingPlanMap); err != nil {
return err
}
logger.Info("预定义系统计划检查完成。") logger.Info("预定义系统计划检查完成。")
return nil return nil
} }
@@ -244,6 +248,56 @@ func (app *Application) initializeAlarmNotificationPlan(ctx context.Context, exi
return nil return nil
} }
// initializeHeartbeatCheckPlan 负责初始化 "周期性心跳检测" 计划。
func (app *Application) initializeHeartbeatCheckPlan(ctx context.Context, existingPlanMap map[models.PlanName]*models.Plan) error {
appCtx, logger := logs.Trace(ctx, app.Ctx, "initializeHeartbeatCheckPlan")
predefinedPlan := &models.Plan{
Name: models.PlanNamePeriodicHeartbeatCheck,
Description: "这是一个系统预定义的计划, 每5分钟自动触发一次区域主控心跳检测。",
PlanType: models.PlanTypeSystem,
ExecutionType: models.PlanExecutionTypeAutomatic,
CronExpression: "*/5 * * * *", // 每5分钟执行一次
Status: models.PlanStatusEnabled,
ContentType: models.PlanContentTypeTasks,
Tasks: []models.Task{
{
Name: "心跳检测",
Description: "向所有区域主控发送Ping指令",
ExecutionOrder: 1,
Type: models.TaskTypeHeartbeat,
},
},
}
if foundExistingPlan, ok := existingPlanMap[predefinedPlan.Name]; ok {
// 如果计划存在,则进行无差别更新
logger.Infof("预定义计划 '%s' 已存在,正在进行无差别更新...", predefinedPlan.Name)
predefinedPlan.ID = foundExistingPlan.ID
predefinedPlan.ExecuteCount = foundExistingPlan.ExecuteCount
if err := app.Infra.repos.planRepo.UpdatePlanMetadataAndStructure(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("更新预定义计划 '%s' 的元数据和结构失败: %w", predefinedPlan.Name, err)
}
if err := app.Infra.repos.planRepo.UpdatePlan(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("更新预定义计划 '%s' 的所有顶层字段失败: %w", predefinedPlan.Name, err)
}
logger.Infof("成功更新预定义计划 '%s'。", predefinedPlan.Name)
} else {
// 如果计划不存在, 则创建
logger.Infof("预定义计划 '%s' 不存在,正在创建...", predefinedPlan.Name)
if err := app.Infra.repos.planRepo.CreatePlan(appCtx, predefinedPlan); err != nil {
return fmt.Errorf("创建预定义计划 '%s' 失败: %w", predefinedPlan.Name, err)
} else {
logger.Infof("成功创建预定义计划 '%s'。", predefinedPlan.Name)
}
}
return nil
}
// initializePendingCollections 在应用启动时处理所有未完成的采集请求。 // initializePendingCollections 在应用启动时处理所有未完成的采集请求。
// 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。 // 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。
// 这保证了系统在每次启动时都处于一个干净、确定的状态。 // 这保证了系统在每次启动时都处于一个干净、确定的状态。

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto"
) )
// 设备行为 // 设备行为
@@ -21,6 +22,26 @@ var (
MethodSwitch Method = "switch" // 启停指令 MethodSwitch Method = "switch" // 启停指令
) )
// SendOptions 包含了发送通用指令时的可选参数。
type SendOptions struct {
// NotTrackable 如果为 true则指示本次发送无需被追踪。
// 这将阻止系统为本次发送创建 device_command_logs 记录。
// 默认为 false即需要追踪。
NotTrackable bool
}
// SendOption 是一个函数类型,用于修改 SendOptions。
// 这是实现 "Functional Options Pattern" 的核心。
type SendOption func(*SendOptions)
// WithoutTracking 是一个公开的选项函数,用于明确指示本次发送无需追踪。
// 调用方在发送 Ping 等无需响应确认的指令时,应使用此选项。
func WithoutTracking() SendOption {
return func(opts *SendOptions) {
opts.NotTrackable = true
}
}
// Service 抽象了一组方法用于控制设备行为 // Service 抽象了一组方法用于控制设备行为
type Service interface { type Service interface {
@@ -29,6 +50,10 @@ type Service interface {
// Collect 用于发起对指定区域主控下的多个设备的批量采集请求。 // Collect 用于发起对指定区域主控下的多个设备的批量采集请求。
Collect(ctx context.Context, areaControllerID uint32, devicesToCollect []*models.Device) error Collect(ctx context.Context, areaControllerID uint32, devicesToCollect []*models.Device) error
// Send 是一个通用的发送方法,用于将一个标准的指令载荷发送到指定的区域主控。
// 它负责将载荷包装成顶层指令、序列化、调用底层发送器,并默认记录下行命令日志。
Send(ctx context.Context, areaControllerID uint32, payload proto.InstructionPayload, opts ...SendOption) error
} }
// 设备操作指令通用结构(最外层) // 设备操作指令通用结构(最外层)

View File

@@ -20,6 +20,7 @@ import (
type GeneralDeviceService struct { type GeneralDeviceService struct {
ctx context.Context ctx context.Context
deviceRepo repository.DeviceRepository deviceRepo repository.DeviceRepository
areaControllerRepo repository.AreaControllerRepository
deviceCommandLogRepo repository.DeviceCommandLogRepository deviceCommandLogRepo repository.DeviceCommandLogRepository
pendingCollectionRepo repository.PendingCollectionRepository pendingCollectionRepo repository.PendingCollectionRepository
comm transport.Communicator comm transport.Communicator
@@ -29,6 +30,7 @@ type GeneralDeviceService struct {
func NewGeneralDeviceService( func NewGeneralDeviceService(
ctx context.Context, ctx context.Context,
deviceRepo repository.DeviceRepository, deviceRepo repository.DeviceRepository,
areaControllerRepo repository.AreaControllerRepository,
deviceCommandLogRepo repository.DeviceCommandLogRepository, deviceCommandLogRepo repository.DeviceCommandLogRepository,
pendingCollectionRepo repository.PendingCollectionRepository, pendingCollectionRepo repository.PendingCollectionRepository,
comm transport.Communicator, comm transport.Communicator,
@@ -36,6 +38,7 @@ func NewGeneralDeviceService(
return &GeneralDeviceService{ return &GeneralDeviceService{
ctx: ctx, ctx: ctx,
deviceRepo: deviceRepo, deviceRepo: deviceRepo,
areaControllerRepo: areaControllerRepo,
deviceCommandLogRepo: deviceCommandLogRepo, deviceCommandLogRepo: deviceCommandLogRepo,
pendingCollectionRepo: pendingCollectionRepo, pendingCollectionRepo: pendingCollectionRepo,
comm: comm, comm: comm,
@@ -249,3 +252,70 @@ func (g *GeneralDeviceService) Collect(ctx context.Context, areaControllerID uin
logger.Debugf("成功将采集请求 (CorrelationID: %s) 发送到设备 %s", correlationID, networkID) logger.Debugf("成功将采集请求 (CorrelationID: %s) 发送到设备 %s", correlationID, networkID)
return nil return nil
} }
// Send 实现了 Service 接口,用于发送一个通用的指令载荷。
// 它将载荷包装成顶层指令,然后执行查找网络地址、序列化、发送和记录日志的完整流程。
func (g *GeneralDeviceService) Send(ctx context.Context, areaControllerID uint32, payload proto.InstructionPayload, opts ...SendOption) error {
serviceCtx, logger := logs.Trace(ctx, g.ctx, "Send")
// 1. 应用选项
options := &SendOptions{}
for _, opt := range opts {
opt(options)
}
// 2. 查找区域主控以获取 NetworkID
areaController, err := g.areaControllerRepo.FindByID(serviceCtx, areaControllerID)
if err != nil {
return fmt.Errorf("发送通用指令失败无法找到ID为 %d 的区域主控: %w", areaControllerID, err)
}
// 3. 将载荷包装进顶层 Instruction 结构体
instruction := &proto.Instruction{
Payload: payload,
}
// 4. 序列化指令
message, err := gproto.Marshal(instruction)
if err != nil {
return fmt.Errorf("序列化通用指令失败: %w", err)
}
// 5. 发送指令
networkID := areaController.NetworkID
sendResult, err := g.comm.Send(serviceCtx, networkID, message)
if err != nil {
return fmt.Errorf("发送通用指令到 %s 失败: %w", networkID, err)
}
// 6. 始终创建 DeviceCommandLog 记录,但根据选项设置其初始状态
logRecord := &models.DeviceCommandLog{
MessageID: sendResult.MessageID,
DeviceID: areaController.ID, // 将日志与区域主控关联
SentAt: time.Now(),
}
if options.NotTrackable {
// 对于无需追踪的指令,直接标记为已完成
now := time.Now()
logRecord.AcknowledgedAt = &now
logRecord.ReceivedSuccess = true
logger.Infow("成功发送一个无需追踪的通用指令,并记录为已完成日志", "networkID", networkID, "MessageID", sendResult.MessageID)
} else {
// 对于需要追踪的指令,记录其发送结果,等待异步确认
if sendResult.AcknowledgedAt != nil {
logRecord.AcknowledgedAt = sendResult.AcknowledgedAt
}
if sendResult.ReceivedSuccess != nil {
logRecord.ReceivedSuccess = *sendResult.ReceivedSuccess
}
logger.Infow("成功发送通用指令,并创建追踪日志", "networkID", networkID, "MessageID", sendResult.MessageID)
}
if err := g.deviceCommandLogRepo.Create(serviceCtx, logRecord); err != nil {
// 记录日志失败是一个需要关注的问题,但可能不应该中断主流程。
logger.Errorw("创建通用指令的日志失败", "MessageID", sendResult.MessageID, "error", err)
}
return nil
}

View File

@@ -0,0 +1,93 @@
package task
import (
"context"
"fmt"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/plan"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto"
)
// HeartbeatTask 实现了 plan.Task 接口用于执行一次区域主控心跳检测发送Ping
type HeartbeatTask struct {
ctx context.Context
log *models.TaskExecutionLog
areaControllerRepo repository.AreaControllerRepository
deviceService device.Service
}
// NewHeartbeatTask 创建一个心跳检测任务实例
func NewHeartbeatTask(
ctx context.Context,
log *models.TaskExecutionLog,
areaControllerRepo repository.AreaControllerRepository,
deviceService device.Service,
) plan.Task {
return &HeartbeatTask{
ctx: ctx,
log: log,
areaControllerRepo: areaControllerRepo,
deviceService: deviceService,
}
}
// Execute 是任务的核心执行逻辑
func (t *HeartbeatTask) Execute(ctx context.Context) error {
taskCtx, logger := logs.Trace(ctx, t.ctx, "Execute")
logger.Infow("开始执行区域主控心跳检测任务", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID)
controllers, err := t.areaControllerRepo.ListAll(taskCtx)
if err != nil {
return fmt.Errorf("心跳检测任务:获取所有区域主控失败: %w", err)
}
if len(controllers) == 0 {
logger.Infow("心跳检测任务:未发现任何区域主控,跳过本次检测")
return nil
}
// 构建 Ping 指令
pingInstruction := &proto.Instruction_Ping{
Ping: &proto.Ping{},
}
var firstError error
for _, controller := range controllers {
logger.Infow("向区域主控发送Ping指令", "controller_id", controller.ID)
err := t.deviceService.Send(taskCtx, controller.ID, pingInstruction, device.WithoutTracking())
if err != nil {
logger.Errorw("向区域主控发送Ping指令失败", "controller_id", controller.ID, "error", err)
if firstError == nil {
firstError = err // 保存第一个发生的错误
}
}
}
if firstError != nil {
return fmt.Errorf("心跳检测任务执行期间发生错误: %w", firstError)
}
logger.Infow("区域主控心跳检测任务执行完成", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID)
return nil
}
// OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑
func (t *HeartbeatTask) OnFailure(ctx context.Context, executeErr error) {
logger := logs.TraceLogger(ctx, t.ctx, "OnFailure")
logger.Errorw("区域主控心跳检测任务执行失败",
"task_id", t.log.TaskID,
"task_type", t.log.Task.Type,
"log_id", t.log.ID,
"error", executeErr,
)
}
// ResolveDeviceIDs 获取当前任务需要使用的设备ID列表
func (t *HeartbeatTask) ResolveDeviceIDs(ctx context.Context) ([]uint32, error) {
// 心跳检测任务不和任何特定设备绑定
return []uint32{}, nil
}

View File

@@ -18,14 +18,16 @@ const (
CompNameReleaseFeedWeight = "ReleaseFeedWeightTask" CompNameReleaseFeedWeight = "ReleaseFeedWeightTask"
CompNameFullCollectionTask = "FullCollectionTask" CompNameFullCollectionTask = "FullCollectionTask"
CompNameAlarmNotification = "AlarmNotificationTask" CompNameAlarmNotification = "AlarmNotificationTask"
CompNameHeartbeatTask = "HeartbeatTask"
) )
type taskFactory struct { type taskFactory struct {
ctx context.Context ctx context.Context
sensorDataRepo repository.SensorDataRepository sensorDataRepo repository.SensorDataRepository
deviceRepo repository.DeviceRepository deviceRepo repository.DeviceRepository
alarmRepo repository.AlarmRepository alarmRepo repository.AlarmRepository
areaControllerRepo repository.AreaControllerRepository
deviceService device.Service deviceService device.Service
notificationService notify.Service notificationService notify.Service
@@ -37,6 +39,7 @@ func NewTaskFactory(
sensorDataRepo repository.SensorDataRepository, sensorDataRepo repository.SensorDataRepository,
deviceRepo repository.DeviceRepository, deviceRepo repository.DeviceRepository,
alarmRepo repository.AlarmRepository, alarmRepo repository.AlarmRepository,
areaControllerRepo repository.AreaControllerRepository,
deviceService device.Service, deviceService device.Service,
notifyService notify.Service, notifyService notify.Service,
alarmService alarm.AlarmService, alarmService alarm.AlarmService,
@@ -46,6 +49,7 @@ func NewTaskFactory(
sensorDataRepo: sensorDataRepo, sensorDataRepo: sensorDataRepo,
deviceRepo: deviceRepo, deviceRepo: deviceRepo,
alarmRepo: alarmRepo, alarmRepo: alarmRepo,
areaControllerRepo: areaControllerRepo,
deviceService: deviceService, deviceService: deviceService,
notificationService: notifyService, notificationService: notifyService,
alarmService: alarmService, alarmService: alarmService,
@@ -62,6 +66,8 @@ func (t *taskFactory) Production(ctx context.Context, claimedLog *models.TaskExe
return NewReleaseFeedWeightTask(logs.AddCompName(baseCtx, CompNameReleaseFeedWeight), claimedLog, t.sensorDataRepo, t.deviceRepo, t.deviceService) return NewReleaseFeedWeightTask(logs.AddCompName(baseCtx, CompNameReleaseFeedWeight), claimedLog, t.sensorDataRepo, t.deviceRepo, t.deviceService)
case models.TaskTypeFullCollection: case models.TaskTypeFullCollection:
return NewFullCollectionTask(logs.AddCompName(baseCtx, CompNameFullCollectionTask), claimedLog, t.deviceRepo, t.deviceService) return NewFullCollectionTask(logs.AddCompName(baseCtx, CompNameFullCollectionTask), claimedLog, t.deviceRepo, t.deviceService)
case models.TaskTypeHeartbeat:
return NewHeartbeatTask(logs.AddCompName(baseCtx, CompNameHeartbeatTask), claimedLog, t.areaControllerRepo, t.deviceService)
case models.TaskTypeAlarmNotification: case models.TaskTypeAlarmNotification:
return NewAlarmNotificationTask(logs.AddCompName(baseCtx, CompNameAlarmNotification), claimedLog, t.notificationService, t.alarmRepo) return NewAlarmNotificationTask(logs.AddCompName(baseCtx, CompNameAlarmNotification), claimedLog, t.notificationService, t.alarmRepo)
case models.TaskTypeDeviceThresholdCheck: case models.TaskTypeDeviceThresholdCheck:
@@ -71,7 +77,6 @@ func (t *taskFactory) Production(ctx context.Context, claimedLog *models.TaskExe
case models.TaskTypeNotificationRefresh: case models.TaskTypeNotificationRefresh:
return NewRefreshNotificationTask(logs.AddCompName(baseCtx, "NotificationRefreshTask"), claimedLog, t.alarmService) return NewRefreshNotificationTask(logs.AddCompName(baseCtx, "NotificationRefreshTask"), claimedLog, t.alarmService)
default: default:
// TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型
logger.Panicf("不支持的任务类型: %s", claimedLog.Task.Type) logger.Panicf("不支持的任务类型: %s", claimedLog.Task.Type)
panic("不支持的任务类型") // 显式panic防编译器报错 panic("不支持的任务类型") // 显式panic防编译器报错
} }
@@ -79,8 +84,6 @@ func (t *taskFactory) Production(ctx context.Context, claimedLog *models.TaskExe
// CreateTaskFromModel 实现了 TaskFactory 接口,用于从模型创建任务实例。 // CreateTaskFromModel 实现了 TaskFactory 接口,用于从模型创建任务实例。
func (t *taskFactory) CreateTaskFromModel(ctx context.Context, taskModel *models.Task) (plan.TaskDeviceIDResolver, error) { func (t *taskFactory) CreateTaskFromModel(ctx context.Context, taskModel *models.Task) (plan.TaskDeviceIDResolver, error) {
// 这个方法不关心 claimedLog 的其他字段,所以可以构造一个临时的
// 它只用于访问那些不依赖于执行日志的方法,比如 ResolveDeviceIDs
tempLog := &models.TaskExecutionLog{Task: *taskModel} tempLog := &models.TaskExecutionLog{Task: *taskModel}
baseCtx := context.Background() baseCtx := context.Background()
@@ -97,6 +100,8 @@ func (t *taskFactory) CreateTaskFromModel(ctx context.Context, taskModel *models
), nil ), nil
case models.TaskTypeFullCollection: case models.TaskTypeFullCollection:
return NewFullCollectionTask(logs.AddCompName(baseCtx, CompNameFullCollectionTask), tempLog, t.deviceRepo, t.deviceService), nil return NewFullCollectionTask(logs.AddCompName(baseCtx, CompNameFullCollectionTask), tempLog, t.deviceRepo, t.deviceService), nil
case models.TaskTypeHeartbeat:
return NewHeartbeatTask(logs.AddCompName(baseCtx, CompNameHeartbeatTask), tempLog, t.areaControllerRepo, t.deviceService), nil
case models.TaskTypeAlarmNotification: case models.TaskTypeAlarmNotification:
return NewAlarmNotificationTask(logs.AddCompName(baseCtx, CompNameAlarmNotification), tempLog, t.notificationService, t.alarmRepo), nil return NewAlarmNotificationTask(logs.AddCompName(baseCtx, CompNameAlarmNotification), tempLog, t.notificationService, t.alarmRepo), nil
case models.TaskTypeDeviceThresholdCheck: case models.TaskTypeDeviceThresholdCheck:

View File

@@ -16,6 +16,8 @@ type PlanName string
const ( const (
// PlanNamePeriodicSystemHealthCheck 是周期性系统健康检查计划的名称 // PlanNamePeriodicSystemHealthCheck 是周期性系统健康检查计划的名称
PlanNamePeriodicSystemHealthCheck PlanName = "周期性系统健康检查" PlanNamePeriodicSystemHealthCheck PlanName = "周期性系统健康检查"
// PlanNamePeriodicHeartbeatCheck 是周期性心跳检测计划的名称
PlanNamePeriodicHeartbeatCheck PlanName = "周期性心跳检测"
// PlanNameAlarmNotification 是告警通知发送计划的名称 // PlanNameAlarmNotification 是告警通知发送计划的名称
PlanNameAlarmNotification PlanName = "告警通知发送" PlanNameAlarmNotification PlanName = "告警通知发送"
) )
@@ -44,6 +46,7 @@ const (
TaskTypeWaiting TaskType = "等待" // 等待任务 TaskTypeWaiting TaskType = "等待" // 等待任务
TaskTypeReleaseFeedWeight TaskType = "下料" // 下料口释放指定重量任务 TaskTypeReleaseFeedWeight TaskType = "下料" // 下料口释放指定重量任务
TaskTypeFullCollection TaskType = "全量采集" // 新增的全量采集任务 TaskTypeFullCollection TaskType = "全量采集" // 新增的全量采集任务
TaskTypeHeartbeat TaskType = "心跳检测" // 区域主控心跳检测任务
TaskTypeAlarmNotification TaskType = "告警通知" // 告警通知任务 TaskTypeAlarmNotification TaskType = "告警通知" // 告警通知任务
TaskTypeNotificationRefresh TaskType = "通知刷新" // 通知刷新任务 TaskTypeNotificationRefresh TaskType = "通知刷新" // 通知刷新任务
TaskTypeDeviceThresholdCheck TaskType = "设备阈值检查" // 设备阈值检查任务 TaskTypeDeviceThresholdCheck TaskType = "设备阈值检查" // 设备阈值检查任务

View File

@@ -17,6 +17,16 @@ const (
SensorTypeTemperature SensorType = "温度" // 温度 SensorTypeTemperature SensorType = "温度" // 温度
SensorTypeHumidity SensorType = "湿度" // 湿度 SensorTypeHumidity SensorType = "湿度" // 湿度
SensorTypeWeight SensorType = "重量" // 重量 SensorTypeWeight SensorType = "重量" // 重量
SensorTypeOnlineStatus SensorType = "在线状态" // 在线状态
)
// OnlineState 定义了设备的在线状态枚举
type OnlineState string
const (
StateOnline OnlineState = "在线" // 设备在线
StateOffline OnlineState = "离线" // 设备离线
StateAbnormal OnlineState = "异常" // 设备状态异常
) )
// SignalMetrics 存储信号强度数据 // SignalMetrics 存储信号强度数据
@@ -49,6 +59,11 @@ type WeightData struct {
WeightKilograms float32 `json:"weight_kilograms"` // 重量值 (公斤) WeightKilograms float32 `json:"weight_kilograms"` // 重量值 (公斤)
} }
// OnlineStatusData 记录了设备的在线状态
type OnlineStatusData struct {
State OnlineState `json:"state"` // 在线状态
}
// SensorData 存储所有类型的传感器数据,对应数据库中的 'sensor_data' 表。 // SensorData 存储所有类型的传感器数据,对应数据库中的 'sensor_data' 表。
type SensorData struct { type SensorData struct {
// Time 是数据记录的时间戳,作为复合主键的一部分。 // Time 是数据记录的时间戳,作为复合主键的一部分。

View File

@@ -556,6 +556,89 @@ func (x *LogUploadRequest) GetEntries() []*LogEntry {
return nil return nil
} }
// 平台向设备发送的Ping指令用于检查存活性。
type Ping struct {
state protoimpl.MessageState `protogen:"open.v1"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *Ping) Reset() {
*x = Ping{}
mi := &file_device_proto_msgTypes[9]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *Ping) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Ping) ProtoMessage() {}
func (x *Ping) ProtoReflect() protoreflect.Message {
mi := &file_device_proto_msgTypes[9]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Ping.ProtoReflect.Descriptor instead.
func (*Ping) Descriptor() ([]byte, []int) {
return file_device_proto_rawDescGZIP(), []int{9}
}
// 设备对Ping的响应或设备主动上报的心跳。
// 它包含了设备的关键状态信息。
type Pong struct {
state protoimpl.MessageState `protogen:"open.v1"`
FirmwareVersion string `protobuf:"bytes,1,opt,name=firmware_version,json=firmwareVersion,proto3" json:"firmware_version,omitempty"` // 当前固件版本
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *Pong) Reset() {
*x = Pong{}
mi := &file_device_proto_msgTypes[10]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *Pong) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Pong) ProtoMessage() {}
func (x *Pong) ProtoReflect() protoreflect.Message {
mi := &file_device_proto_msgTypes[10]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Pong.ProtoReflect.Descriptor instead.
func (*Pong) Descriptor() ([]byte, []int) {
return file_device_proto_rawDescGZIP(), []int{10}
}
func (x *Pong) GetFirmwareVersion() string {
if x != nil {
return x.FirmwareVersion
}
return ""
}
// Instruction 封装了所有与设备间的通信。 // Instruction 封装了所有与设备间的通信。
// 使用 oneof 来确保每个消息只有一个负载类型,这在嵌入式系统中是高效且类型安全的。 // 使用 oneof 来确保每个消息只有一个负载类型,这在嵌入式系统中是高效且类型安全的。
type Instruction struct { type Instruction struct {
@@ -566,9 +649,11 @@ type Instruction struct {
// *Instruction_BatchCollectCommand // *Instruction_BatchCollectCommand
// *Instruction_OtaUpgradeCommand // *Instruction_OtaUpgradeCommand
// *Instruction_ControlLogUploadCommand // *Instruction_ControlLogUploadCommand
// *Instruction_Ping
// *Instruction_CollectResult // *Instruction_CollectResult
// *Instruction_OtaUpgradeStatus // *Instruction_OtaUpgradeStatus
// *Instruction_LogUploadRequest // *Instruction_LogUploadRequest
// *Instruction_Pong
Payload isInstruction_Payload `protobuf_oneof:"payload"` Payload isInstruction_Payload `protobuf_oneof:"payload"`
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
@@ -576,7 +661,7 @@ type Instruction struct {
func (x *Instruction) Reset() { func (x *Instruction) Reset() {
*x = Instruction{} *x = Instruction{}
mi := &file_device_proto_msgTypes[9] mi := &file_device_proto_msgTypes[11]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@@ -588,7 +673,7 @@ func (x *Instruction) String() string {
func (*Instruction) ProtoMessage() {} func (*Instruction) ProtoMessage() {}
func (x *Instruction) ProtoReflect() protoreflect.Message { func (x *Instruction) ProtoReflect() protoreflect.Message {
mi := &file_device_proto_msgTypes[9] mi := &file_device_proto_msgTypes[11]
if x != nil { if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@@ -601,7 +686,7 @@ func (x *Instruction) ProtoReflect() protoreflect.Message {
// Deprecated: Use Instruction.ProtoReflect.Descriptor instead. // Deprecated: Use Instruction.ProtoReflect.Descriptor instead.
func (*Instruction) Descriptor() ([]byte, []int) { func (*Instruction) Descriptor() ([]byte, []int) {
return file_device_proto_rawDescGZIP(), []int{9} return file_device_proto_rawDescGZIP(), []int{11}
} }
func (x *Instruction) GetPayload() isInstruction_Payload { func (x *Instruction) GetPayload() isInstruction_Payload {
@@ -647,6 +732,15 @@ func (x *Instruction) GetControlLogUploadCommand() *ControlLogUploadCommand {
return nil return nil
} }
func (x *Instruction) GetPing() *Ping {
if x != nil {
if x, ok := x.Payload.(*Instruction_Ping); ok {
return x.Ping
}
}
return nil
}
func (x *Instruction) GetCollectResult() *CollectResult { func (x *Instruction) GetCollectResult() *CollectResult {
if x != nil { if x != nil {
if x, ok := x.Payload.(*Instruction_CollectResult); ok { if x, ok := x.Payload.(*Instruction_CollectResult); ok {
@@ -674,6 +768,15 @@ func (x *Instruction) GetLogUploadRequest() *LogUploadRequest {
return nil return nil
} }
func (x *Instruction) GetPong() *Pong {
if x != nil {
if x, ok := x.Payload.(*Instruction_Pong); ok {
return x.Pong
}
}
return nil
}
type isInstruction_Payload interface { type isInstruction_Payload interface {
isInstruction_Payload() isInstruction_Payload()
} }
@@ -695,6 +798,10 @@ type Instruction_ControlLogUploadCommand struct {
ControlLogUploadCommand *ControlLogUploadCommand `protobuf:"bytes,4,opt,name=control_log_upload_command,json=controlLogUploadCommand,proto3,oneof"` ControlLogUploadCommand *ControlLogUploadCommand `protobuf:"bytes,4,opt,name=control_log_upload_command,json=controlLogUploadCommand,proto3,oneof"`
} }
type Instruction_Ping struct {
Ping *Ping `protobuf:"bytes,6,opt,name=ping,proto3,oneof"`
}
type Instruction_CollectResult struct { type Instruction_CollectResult struct {
// --- 上行数据 (设备 -> 平台) --- // --- 上行数据 (设备 -> 平台) ---
CollectResult *CollectResult `protobuf:"bytes,101,opt,name=collect_result,json=collectResult,proto3,oneof"` CollectResult *CollectResult `protobuf:"bytes,101,opt,name=collect_result,json=collectResult,proto3,oneof"`
@@ -708,6 +815,10 @@ type Instruction_LogUploadRequest struct {
LogUploadRequest *LogUploadRequest `protobuf:"bytes,103,opt,name=log_upload_request,json=logUploadRequest,proto3,oneof"` LogUploadRequest *LogUploadRequest `protobuf:"bytes,103,opt,name=log_upload_request,json=logUploadRequest,proto3,oneof"`
} }
type Instruction_Pong struct {
Pong *Pong `protobuf:"bytes,104,opt,name=pong,proto3,oneof"`
}
func (*Instruction_Raw_485Command) isInstruction_Payload() {} func (*Instruction_Raw_485Command) isInstruction_Payload() {}
func (*Instruction_BatchCollectCommand) isInstruction_Payload() {} func (*Instruction_BatchCollectCommand) isInstruction_Payload() {}
@@ -716,12 +827,16 @@ func (*Instruction_OtaUpgradeCommand) isInstruction_Payload() {}
func (*Instruction_ControlLogUploadCommand) isInstruction_Payload() {} func (*Instruction_ControlLogUploadCommand) isInstruction_Payload() {}
func (*Instruction_Ping) isInstruction_Payload() {}
func (*Instruction_CollectResult) isInstruction_Payload() {} func (*Instruction_CollectResult) isInstruction_Payload() {}
func (*Instruction_OtaUpgradeStatus) isInstruction_Payload() {} func (*Instruction_OtaUpgradeStatus) isInstruction_Payload() {}
func (*Instruction_LogUploadRequest) isInstruction_Payload() {} func (*Instruction_LogUploadRequest) isInstruction_Payload() {}
func (*Instruction_Pong) isInstruction_Payload() {}
var File_device_proto protoreflect.FileDescriptor var File_device_proto protoreflect.FileDescriptor
const file_device_proto_rawDesc = "" + const file_device_proto_rawDesc = "" +
@@ -755,15 +870,20 @@ const file_device_proto_rawDesc = "" +
"\x06enable\x18\x01 \x01(\bR\x06enable\x12)\n" + "\x06enable\x18\x01 \x01(\bR\x06enable\x12)\n" +
"\x10duration_seconds\x18\x02 \x01(\rR\x0fdurationSeconds\">\n" + "\x10duration_seconds\x18\x02 \x01(\rR\x0fdurationSeconds\">\n" +
"\x10LogUploadRequest\x12*\n" + "\x10LogUploadRequest\x12*\n" +
"\aentries\x18\x01 \x03(\v2\x10.device.LogEntryR\aentries\"\xad\x04\n" + "\aentries\x18\x01 \x03(\v2\x10.device.LogEntryR\aentries\"\x06\n" +
"\x04Ping\"1\n" +
"\x04Pong\x12)\n" +
"\x10firmware_version\x18\x01 \x01(\tR\x0ffirmwareVersion\"\xf5\x04\n" +
"\vInstruction\x12?\n" + "\vInstruction\x12?\n" +
"\x0fraw_485_command\x18\x01 \x01(\v2\x15.device.Raw485CommandH\x00R\rraw485Command\x12Q\n" + "\x0fraw_485_command\x18\x01 \x01(\v2\x15.device.Raw485CommandH\x00R\rraw485Command\x12Q\n" +
"\x15batch_collect_command\x18\x02 \x01(\v2\x1b.device.BatchCollectCommandH\x00R\x13batchCollectCommand\x12K\n" + "\x15batch_collect_command\x18\x02 \x01(\v2\x1b.device.BatchCollectCommandH\x00R\x13batchCollectCommand\x12K\n" +
"\x13ota_upgrade_command\x18\x03 \x01(\v2\x19.device.OtaUpgradeCommandH\x00R\x11otaUpgradeCommand\x12^\n" + "\x13ota_upgrade_command\x18\x03 \x01(\v2\x19.device.OtaUpgradeCommandH\x00R\x11otaUpgradeCommand\x12^\n" +
"\x1acontrol_log_upload_command\x18\x04 \x01(\v2\x1f.device.ControlLogUploadCommandH\x00R\x17controlLogUploadCommand\x12>\n" + "\x1acontrol_log_upload_command\x18\x04 \x01(\v2\x1f.device.ControlLogUploadCommandH\x00R\x17controlLogUploadCommand\x12\"\n" +
"\x04ping\x18\x06 \x01(\v2\f.device.PingH\x00R\x04ping\x12>\n" +
"\x0ecollect_result\x18e \x01(\v2\x15.device.CollectResultH\x00R\rcollectResult\x12H\n" + "\x0ecollect_result\x18e \x01(\v2\x15.device.CollectResultH\x00R\rcollectResult\x12H\n" +
"\x12ota_upgrade_status\x18f \x01(\v2\x18.device.OtaUpgradeStatusH\x00R\x10otaUpgradeStatus\x12H\n" + "\x12ota_upgrade_status\x18f \x01(\v2\x18.device.OtaUpgradeStatusH\x00R\x10otaUpgradeStatus\x12H\n" +
"\x12log_upload_request\x18g \x01(\v2\x18.device.LogUploadRequestH\x00R\x10logUploadRequestB\t\n" + "\x12log_upload_request\x18g \x01(\v2\x18.device.LogUploadRequestH\x00R\x10logUploadRequest\x12\"\n" +
"\x04pong\x18h \x01(\v2\f.device.PongH\x00R\x04pongB\t\n" +
"\apayload*O\n" + "\apayload*O\n" +
"\bLogLevel\x12\x19\n" + "\bLogLevel\x12\x19\n" +
"\x15LOG_LEVEL_UNSPECIFIED\x10\x00\x12\t\n" + "\x15LOG_LEVEL_UNSPECIFIED\x10\x00\x12\t\n" +
@@ -785,7 +905,7 @@ func file_device_proto_rawDescGZIP() []byte {
} }
var file_device_proto_enumTypes = make([]protoimpl.EnumInfo, 1) var file_device_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_device_proto_msgTypes = make([]protoimpl.MessageInfo, 10) var file_device_proto_msgTypes = make([]protoimpl.MessageInfo, 12)
var file_device_proto_goTypes = []any{ var file_device_proto_goTypes = []any{
(LogLevel)(0), // 0: device.LogLevel (LogLevel)(0), // 0: device.LogLevel
(*LogEntry)(nil), // 1: device.LogEntry (*LogEntry)(nil), // 1: device.LogEntry
@@ -797,7 +917,9 @@ var file_device_proto_goTypes = []any{
(*OtaUpgradeStatus)(nil), // 7: device.OtaUpgradeStatus (*OtaUpgradeStatus)(nil), // 7: device.OtaUpgradeStatus
(*ControlLogUploadCommand)(nil), // 8: device.ControlLogUploadCommand (*ControlLogUploadCommand)(nil), // 8: device.ControlLogUploadCommand
(*LogUploadRequest)(nil), // 9: device.LogUploadRequest (*LogUploadRequest)(nil), // 9: device.LogUploadRequest
(*Instruction)(nil), // 10: device.Instruction (*Ping)(nil), // 10: device.Ping
(*Pong)(nil), // 11: device.Pong
(*Instruction)(nil), // 12: device.Instruction
} }
var file_device_proto_depIdxs = []int32{ var file_device_proto_depIdxs = []int32{
0, // 0: device.LogEntry.level:type_name -> device.LogLevel 0, // 0: device.LogEntry.level:type_name -> device.LogLevel
@@ -808,14 +930,16 @@ var file_device_proto_depIdxs = []int32{
3, // 5: device.Instruction.batch_collect_command:type_name -> device.BatchCollectCommand 3, // 5: device.Instruction.batch_collect_command:type_name -> device.BatchCollectCommand
6, // 6: device.Instruction.ota_upgrade_command:type_name -> device.OtaUpgradeCommand 6, // 6: device.Instruction.ota_upgrade_command:type_name -> device.OtaUpgradeCommand
8, // 7: device.Instruction.control_log_upload_command:type_name -> device.ControlLogUploadCommand 8, // 7: device.Instruction.control_log_upload_command:type_name -> device.ControlLogUploadCommand
5, // 8: device.Instruction.collect_result:type_name -> device.CollectResult 10, // 8: device.Instruction.ping:type_name -> device.Ping
7, // 9: device.Instruction.ota_upgrade_status:type_name -> device.OtaUpgradeStatus 5, // 9: device.Instruction.collect_result:type_name -> device.CollectResult
9, // 10: device.Instruction.log_upload_request:type_name -> device.LogUploadRequest 7, // 10: device.Instruction.ota_upgrade_status:type_name -> device.OtaUpgradeStatus
11, // [11:11] is the sub-list for method output_type 9, // 11: device.Instruction.log_upload_request:type_name -> device.LogUploadRequest
11, // [11:11] is the sub-list for method input_type 11, // 12: device.Instruction.pong:type_name -> device.Pong
11, // [11:11] is the sub-list for extension type_name 13, // [13:13] is the sub-list for method output_type
11, // [11:11] is the sub-list for extension extendee 13, // [13:13] is the sub-list for method input_type
0, // [0:11] is the sub-list for field type_name 13, // [13:13] is the sub-list for extension type_name
13, // [13:13] is the sub-list for extension extendee
0, // [0:13] is the sub-list for field type_name
} }
func init() { file_device_proto_init() } func init() { file_device_proto_init() }
@@ -823,14 +947,16 @@ func file_device_proto_init() {
if File_device_proto != nil { if File_device_proto != nil {
return return
} }
file_device_proto_msgTypes[9].OneofWrappers = []any{ file_device_proto_msgTypes[11].OneofWrappers = []any{
(*Instruction_Raw_485Command)(nil), (*Instruction_Raw_485Command)(nil),
(*Instruction_BatchCollectCommand)(nil), (*Instruction_BatchCollectCommand)(nil),
(*Instruction_OtaUpgradeCommand)(nil), (*Instruction_OtaUpgradeCommand)(nil),
(*Instruction_ControlLogUploadCommand)(nil), (*Instruction_ControlLogUploadCommand)(nil),
(*Instruction_Ping)(nil),
(*Instruction_CollectResult)(nil), (*Instruction_CollectResult)(nil),
(*Instruction_OtaUpgradeStatus)(nil), (*Instruction_OtaUpgradeStatus)(nil),
(*Instruction_LogUploadRequest)(nil), (*Instruction_LogUploadRequest)(nil),
(*Instruction_Pong)(nil),
} }
type x struct{} type x struct{}
out := protoimpl.TypeBuilder{ out := protoimpl.TypeBuilder{
@@ -838,7 +964,7 @@ func file_device_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_device_proto_rawDesc), len(file_device_proto_rawDesc)), RawDescriptor: unsafe.Slice(unsafe.StringData(file_device_proto_rawDesc), len(file_device_proto_rawDesc)),
NumEnums: 1, NumEnums: 1,
NumMessages: 10, NumMessages: 12,
NumExtensions: 0, NumExtensions: 0,
NumServices: 0, NumServices: 0,
}, },

View File

@@ -73,6 +73,18 @@ message LogUploadRequest {
repeated LogEntry entries = 1; // 一批日志条目 repeated LogEntry entries = 1; // 一批日志条目
} }
// 平台向设备发送的Ping指令用于检查存活性。
message Ping {
// 可以留空,指令本身即代表意图
}
// 设备对Ping的响应或设备主动上报的心跳。
// 它包含了设备的关键状态信息。
message Pong {
string firmware_version = 1; // 当前固件版本
// 可以扩展更多状态, e.g., int32 uptime_seconds = 2;
}
// --- 顶层指令包装器 --- // --- 顶层指令包装器 ---
@@ -85,10 +97,12 @@ message Instruction {
BatchCollectCommand batch_collect_command = 2; BatchCollectCommand batch_collect_command = 2;
OtaUpgradeCommand ota_upgrade_command = 3; OtaUpgradeCommand ota_upgrade_command = 3;
ControlLogUploadCommand control_log_upload_command = 4; ControlLogUploadCommand control_log_upload_command = 4;
Ping ping = 6;
// --- 上行数据 (设备 -> 平台) --- // --- 上行数据 (设备 -> 平台) ---
CollectResult collect_result = 101; CollectResult collect_result = 101;
OtaUpgradeStatus ota_upgrade_status = 102; OtaUpgradeStatus ota_upgrade_status = 102;
LogUploadRequest log_upload_request = 103; LogUploadRequest log_upload_request = 103;
Pong pong = 104;
} }
} }

View File

@@ -0,0 +1,13 @@
package proto
// InstructionPayload 是 protoc 为 oneof 生成的未导出接口 isInstruction_Payload 的一个公开别名。
// 通过接口嵌入,我们创建了一个新的、可导出的接口,它拥有与 isInstruction_Payload 完全相同的方法集。
//
// 根据 Go 的接口规则,任何实现了 isInstruction_Payload 接口的类型 (例如 *Instruction_Ping)
// 都会自动、隐式地满足此接口。
//
// 这使得我们可以在项目的其他包(如 domain 层)的公开 API 中使用这个接口,
// 从而在保持类型安全的同时,避免了对 protoc 生成的未导出类型的直接依赖。
type InstructionPayload interface {
isInstruction_Payload
}

View File

@@ -39,6 +39,7 @@ design/archive/2025-11-06-system-plan-continuously-triggered/index.md
design/archive/2025-11-10-exceeding-threshold-alarm/index.md design/archive/2025-11-10-exceeding-threshold-alarm/index.md
design/archive/2025-11-29-recipe-management/index.md design/archive/2025-11-29-recipe-management/index.md
design/ota-upgrade-and-log-monitoring/index.md design/ota-upgrade-and-log-monitoring/index.md
design/ota-upgrade-and-log-monitoring/lora_refactoring_plan.md
docs/docs.go docs/docs.go
docs/swagger.json docs/swagger.json
docs/swagger.yaml docs/swagger.yaml
@@ -88,6 +89,7 @@ internal/app/dto/user_dto.go
internal/app/listener/chirp_stack/chirp_stack.go internal/app/listener/chirp_stack/chirp_stack.go
internal/app/listener/chirp_stack/chirp_stack_types.go internal/app/listener/chirp_stack/chirp_stack_types.go
internal/app/listener/chirp_stack/placeholder_listener.go internal/app/listener/chirp_stack/placeholder_listener.go
internal/app/listener/lora_listener.go
internal/app/listener/transport.go internal/app/listener/transport.go
internal/app/middleware/audit.go internal/app/middleware/audit.go
internal/app/middleware/auth.go internal/app/middleware/auth.go
@@ -140,6 +142,7 @@ internal/domain/task/area_threshold_check_task.go
internal/domain/task/delay_task.go internal/domain/task/delay_task.go
internal/domain/task/device_threshold_check_task.go internal/domain/task/device_threshold_check_task.go
internal/domain/task/full_collection_task.go internal/domain/task/full_collection_task.go
internal/domain/task/heartbeat_task.go
internal/domain/task/refresh_notification_task.go internal/domain/task/refresh_notification_task.go
internal/domain/task/release_feed_weight_task.go internal/domain/task/release_feed_weight_task.go
internal/domain/task/task.go internal/domain/task/task.go
@@ -211,6 +214,7 @@ internal/infra/transport/lora/lora_mesh_uart_passthrough_transport.go
internal/infra/transport/lora/placeholder_transport.go internal/infra/transport/lora/placeholder_transport.go
internal/infra/transport/proto/device.pb.go internal/infra/transport/proto/device.pb.go
internal/infra/transport/proto/device.proto internal/infra/transport/proto/device.proto
internal/infra/transport/proto/exported.go
internal/infra/transport/transport.go internal/infra/transport/transport.go
internal/infra/utils/command_generater/modbus_rtu.go internal/infra/utils/command_generater/modbus_rtu.go
internal/infra/utils/time.go internal/infra/utils/time.go