diff --git a/design/ota-upgrade-and-log-monitoring/index.md b/design/ota-upgrade-and-log-monitoring/index.md index 5f07f69..e0968b8 100644 --- a/design/ota-upgrade-and-log-monitoring/index.md +++ b/design/ota-upgrade-and-log-monitoring/index.md @@ -11,7 +11,8 @@ http://git.huangwc.com/pig/pig-farm-controller/issues/71 ## OTA 升级 - [x] 增加一个proto对象, 用于封装ota升级包 -- [ ] 区域主控增加版本号 +- [x] 区域主控增加版本号 +- [x] 增加ping指令并获取带版本号的响应 - [ ] 实现ota升级逻辑 ## Lora 监听逻辑重构 diff --git a/docs/docs.go b/docs/docs.go index 5794ac5..4f61ff5 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -216,12 +216,14 @@ const docTemplate = `{ "电池电量", "温度", "湿度", - "重量" + "重量", + "在线状态" ], "type": "string", "x-enum-comments": { "SensorTypeBatteryLevel": "电池电量", "SensorTypeHumidity": "湿度", + "SensorTypeOnlineStatus": "在线状态", "SensorTypeSignalMetrics": "信号强度", "SensorTypeTemperature": "温度", "SensorTypeWeight": "重量" @@ -231,14 +233,16 @@ const docTemplate = `{ "电池电量", "温度", "湿度", - "重量" + "重量", + "在线状态" ], "x-enum-varnames": [ "SensorTypeSignalMetrics", "SensorTypeBatteryLevel", "SensorTypeTemperature", "SensorTypeHumidity", - "SensorTypeWeight" + "SensorTypeWeight", + "SensorTypeOnlineStatus" ], "description": "按传感器类型过滤", "name": "sensor_type", @@ -497,12 +501,14 @@ const docTemplate = `{ "电池电量", "温度", "湿度", - "重量" + "重量", + "在线状态" ], "type": "string", "x-enum-comments": { "SensorTypeBatteryLevel": "电池电量", "SensorTypeHumidity": "湿度", + "SensorTypeOnlineStatus": "在线状态", "SensorTypeSignalMetrics": "信号强度", "SensorTypeTemperature": "温度", "SensorTypeWeight": "重量" @@ -512,14 +518,16 @@ const docTemplate = `{ "电池电量", "温度", "湿度", - "重量" + "重量", + "在线状态" ], "x-enum-varnames": [ "SensorTypeSignalMetrics", "SensorTypeBatteryLevel", "SensorTypeTemperature", "SensorTypeHumidity", - "SensorTypeWeight" + "SensorTypeWeight", + "SensorTypeOnlineStatus" ], "description": "按传感器类型过滤", "name": "sensor_type", @@ -6889,6 +6897,9 @@ const docTemplate = `{ "created_at": { "type": "string" }, + "firmware_version": { + "type": "string" + }, "id": { "type": "integer" }, @@ -10592,11 +10603,13 @@ const docTemplate = `{ "电池电量", "温度", "湿度", - "重量" + "重量", + "在线状态" ], "x-enum-comments": { "SensorTypeBatteryLevel": "电池电量", "SensorTypeHumidity": "湿度", + "SensorTypeOnlineStatus": "在线状态", "SensorTypeSignalMetrics": "信号强度", "SensorTypeTemperature": "温度", "SensorTypeWeight": "重量" @@ -10606,14 +10619,16 @@ const docTemplate = `{ "电池电量", "温度", "湿度", - "重量" + "重量", + "在线状态" ], "x-enum-varnames": [ "SensorTypeSignalMetrics", "SensorTypeBatteryLevel", "SensorTypeTemperature", "SensorTypeHumidity", - "SensorTypeWeight" + "SensorTypeWeight", + "SensorTypeOnlineStatus" ] }, "models.SeverityLevel": { @@ -10681,6 +10696,7 @@ const docTemplate = `{ "等待", "下料", "全量采集", + "心跳检测", "告警通知", "通知刷新", "设备阈值检查", @@ -10692,6 +10708,7 @@ const docTemplate = `{ "TaskTypeAreaCollectorThresholdCheck": "区域阈值检查任务", "TaskTypeDeviceThresholdCheck": "设备阈值检查任务", "TaskTypeFullCollection": "新增的全量采集任务", + "TaskTypeHeartbeat": "区域主控心跳检测任务", "TaskTypeNotificationRefresh": "通知刷新任务", "TaskTypeReleaseFeedWeight": "下料口释放指定重量任务", "TaskTypeWaiting": "等待任务" @@ -10701,6 +10718,7 @@ const docTemplate = `{ "等待任务", "下料口释放指定重量任务", "新增的全量采集任务", + "区域主控心跳检测任务", "告警通知任务", "通知刷新任务", "设备阈值检查任务", @@ -10711,6 +10729,7 @@ const docTemplate = `{ "TaskTypeWaiting", "TaskTypeReleaseFeedWeight", "TaskTypeFullCollection", + "TaskTypeHeartbeat", "TaskTypeAlarmNotification", "TaskTypeNotificationRefresh", "TaskTypeDeviceThresholdCheck", diff --git a/docs/swagger.json b/docs/swagger.json index e97232d..db22f17 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -208,12 +208,14 @@ "电池电量", "温度", "湿度", - "重量" + "重量", + "在线状态" ], "type": "string", "x-enum-comments": { "SensorTypeBatteryLevel": "电池电量", "SensorTypeHumidity": "湿度", + "SensorTypeOnlineStatus": "在线状态", "SensorTypeSignalMetrics": "信号强度", "SensorTypeTemperature": "温度", "SensorTypeWeight": "重量" @@ -223,14 +225,16 @@ "电池电量", "温度", "湿度", - "重量" + "重量", + "在线状态" ], "x-enum-varnames": [ "SensorTypeSignalMetrics", "SensorTypeBatteryLevel", "SensorTypeTemperature", "SensorTypeHumidity", - "SensorTypeWeight" + "SensorTypeWeight", + "SensorTypeOnlineStatus" ], "description": "按传感器类型过滤", "name": "sensor_type", @@ -489,12 +493,14 @@ "电池电量", "温度", "湿度", - "重量" + "重量", + "在线状态" ], "type": "string", "x-enum-comments": { "SensorTypeBatteryLevel": "电池电量", "SensorTypeHumidity": "湿度", + "SensorTypeOnlineStatus": "在线状态", "SensorTypeSignalMetrics": "信号强度", "SensorTypeTemperature": "温度", "SensorTypeWeight": "重量" @@ -504,14 +510,16 @@ "电池电量", "温度", "湿度", - "重量" + "重量", + "在线状态" ], "x-enum-varnames": [ "SensorTypeSignalMetrics", "SensorTypeBatteryLevel", "SensorTypeTemperature", "SensorTypeHumidity", - "SensorTypeWeight" + "SensorTypeWeight", + "SensorTypeOnlineStatus" ], "description": "按传感器类型过滤", "name": "sensor_type", @@ -6881,6 +6889,9 @@ "created_at": { "type": "string" }, + "firmware_version": { + "type": "string" + }, "id": { "type": "integer" }, @@ -10584,11 +10595,13 @@ "电池电量", "温度", "湿度", - "重量" + "重量", + "在线状态" ], "x-enum-comments": { "SensorTypeBatteryLevel": "电池电量", "SensorTypeHumidity": "湿度", + "SensorTypeOnlineStatus": "在线状态", "SensorTypeSignalMetrics": "信号强度", "SensorTypeTemperature": "温度", "SensorTypeWeight": "重量" @@ -10598,14 +10611,16 @@ "电池电量", "温度", "湿度", - "重量" + "重量", + "在线状态" ], "x-enum-varnames": [ "SensorTypeSignalMetrics", "SensorTypeBatteryLevel", "SensorTypeTemperature", "SensorTypeHumidity", - "SensorTypeWeight" + "SensorTypeWeight", + "SensorTypeOnlineStatus" ] }, "models.SeverityLevel": { @@ -10673,6 +10688,7 @@ "等待", "下料", "全量采集", + "心跳检测", "告警通知", "通知刷新", "设备阈值检查", @@ -10684,6 +10700,7 @@ "TaskTypeAreaCollectorThresholdCheck": "区域阈值检查任务", "TaskTypeDeviceThresholdCheck": "设备阈值检查任务", "TaskTypeFullCollection": "新增的全量采集任务", + "TaskTypeHeartbeat": "区域主控心跳检测任务", "TaskTypeNotificationRefresh": "通知刷新任务", "TaskTypeReleaseFeedWeight": "下料口释放指定重量任务", "TaskTypeWaiting": "等待任务" @@ -10693,6 +10710,7 @@ "等待任务", "下料口释放指定重量任务", "新增的全量采集任务", + "区域主控心跳检测任务", "告警通知任务", "通知刷新任务", "设备阈值检查任务", @@ -10703,6 +10721,7 @@ "TaskTypeWaiting", "TaskTypeReleaseFeedWeight", "TaskTypeFullCollection", + "TaskTypeHeartbeat", "TaskTypeAlarmNotification", "TaskTypeNotificationRefresh", "TaskTypeDeviceThresholdCheck", diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 69bde1e..dfc6ebb 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -86,6 +86,8 @@ definitions: properties: created_at: type: string + firmware_version: + type: string id: type: integer location: @@ -2638,10 +2640,12 @@ definitions: - 温度 - 湿度 - 重量 + - 在线状态 type: string x-enum-comments: SensorTypeBatteryLevel: 电池电量 SensorTypeHumidity: 湿度 + SensorTypeOnlineStatus: 在线状态 SensorTypeSignalMetrics: 信号强度 SensorTypeTemperature: 温度 SensorTypeWeight: 重量 @@ -2651,12 +2655,14 @@ definitions: - 温度 - 湿度 - 重量 + - 在线状态 x-enum-varnames: - SensorTypeSignalMetrics - SensorTypeBatteryLevel - SensorTypeTemperature - SensorTypeHumidity - SensorTypeWeight + - SensorTypeOnlineStatus models.SeverityLevel: enum: - debug @@ -2713,6 +2719,7 @@ definitions: - 等待 - 下料 - 全量采集 + - 心跳检测 - 告警通知 - 通知刷新 - 设备阈值检查 @@ -2724,6 +2731,7 @@ definitions: TaskTypeAreaCollectorThresholdCheck: 区域阈值检查任务 TaskTypeDeviceThresholdCheck: 设备阈值检查任务 TaskTypeFullCollection: 新增的全量采集任务 + TaskTypeHeartbeat: 区域主控心跳检测任务 TaskTypeNotificationRefresh: 通知刷新任务 TaskTypeReleaseFeedWeight: 下料口释放指定重量任务 TaskTypeWaiting: 等待任务 @@ -2732,6 +2740,7 @@ definitions: - 等待任务 - 下料口释放指定重量任务 - 新增的全量采集任务 + - 区域主控心跳检测任务 - 告警通知任务 - 通知刷新任务 - 设备阈值检查任务 @@ -2741,6 +2750,7 @@ definitions: - TaskTypeWaiting - TaskTypeReleaseFeedWeight - TaskTypeFullCollection + - TaskTypeHeartbeat - TaskTypeAlarmNotification - TaskTypeNotificationRefresh - TaskTypeDeviceThresholdCheck @@ -2985,12 +2995,14 @@ paths: - 温度 - 湿度 - 重量 + - 在线状态 in: query name: sensor_type type: string x-enum-comments: SensorTypeBatteryLevel: 电池电量 SensorTypeHumidity: 湿度 + SensorTypeOnlineStatus: 在线状态 SensorTypeSignalMetrics: 信号强度 SensorTypeTemperature: 温度 SensorTypeWeight: 重量 @@ -3000,12 +3012,14 @@ paths: - 温度 - 湿度 - 重量 + - 在线状态 x-enum-varnames: - SensorTypeSignalMetrics - SensorTypeBatteryLevel - SensorTypeTemperature - SensorTypeHumidity - SensorTypeWeight + - SensorTypeOnlineStatus produces: - application/json responses: @@ -3167,12 +3181,14 @@ paths: - 温度 - 湿度 - 重量 + - 在线状态 in: query name: sensor_type type: string x-enum-comments: SensorTypeBatteryLevel: 电池电量 SensorTypeHumidity: 湿度 + SensorTypeOnlineStatus: 在线状态 SensorTypeSignalMetrics: 信号强度 SensorTypeTemperature: 温度 SensorTypeWeight: 重量 @@ -3182,12 +3198,14 @@ paths: - 温度 - 湿度 - 重量 + - 在线状态 x-enum-varnames: - SensorTypeSignalMetrics - SensorTypeBatteryLevel - SensorTypeTemperature - SensorTypeHumidity - SensorTypeWeight + - SensorTypeOnlineStatus produces: - application/json responses: diff --git a/internal/app/listener/lora_listener.go b/internal/app/listener/lora_listener.go index 07d51ef..ceeb3bf 100644 --- a/internal/app/listener/lora_listener.go +++ b/internal/app/listener/lora_listener.go @@ -59,6 +59,9 @@ func (l *loraListener) HandleInstruction(upstreamCtx context.Context, sourceAddr case *proto.Instruction_OtaUpgradeStatus: return l.handleOtaStatus(ctx, sourceAddr, p.OtaUpgradeStatus) + case *proto.Instruction_Pong: + return l.handlePong(ctx, sourceAddr, p.Pong) + case *proto.Instruction_LogUploadRequest: logger.Infow("收到设备日志上传请求,暂未实现处理逻辑", "来源地址", sourceAddr, "日志条数", len(p.LogUploadRequest.Entries)) // TODO: 在这里实现设备日志的处理逻辑 @@ -280,7 +283,6 @@ func (l *loraListener) handleOtaStatus(ctx context.Context, sourceAddr string, s logger.Infow("开始处理OTA升级状态", "来源地址", sourceAddr, "状态码", status.StatusCode, - "处理结果", status.StatusCode == 0, "当前版本", status.CurrentFirmwareVersion, ) @@ -305,3 +307,33 @@ func (l *loraListener) handleOtaStatus(ctx context.Context, sourceAddr string, s return nil } + +// handlePong 处理设备上报的Pong响应或主动心跳。 +func (l *loraListener) handlePong(ctx context.Context, sourceAddr string, pong *proto.Pong) error { + reqCtx, logger := logs.Trace(ctx, l.selfCtx, "handlePong") + logger.Infow("开始处理Pong", "来源地址", sourceAddr, "携带版本", pong.FirmwareVersion) + + // 1. 查找区域主控 + areaController, err := l.areaControllerRepo.FindByNetworkID(reqCtx, sourceAddr) + if err != nil { + return fmt.Errorf("处理Pong失败:无法找到区域主控: %w", err) + } + + // 2. 如果 Pong 中包含版本号,则更新 + if pong.FirmwareVersion != "" { + err := l.areaControllerRepo.UpdateFirmwareVersion(reqCtx, areaController.ID, pong.FirmwareVersion) + if err != nil { + // 只记录错误,不中断流程,因为还要记录在线状态 + logger.Errorw("处理Pong时更新固件版本失败", "主控ID", areaController.ID, "error", err) + } else { + logger.Infow("处理Pong时成功更新固件版本", "主控ID", areaController.ID, "新版本", pong.FirmwareVersion) + } + } + + // 3. 记录在线状态 + onlineStatus := models.OnlineStatusData{State: models.StateOnline} + l.recordSensorData(reqCtx, areaController.ID, areaController.ID, time.Now(), models.SensorTypeOnlineStatus, onlineStatus) + logger.Infow("已记录区域主控为在线状态", "主控ID", areaController.ID) + + return nil +} diff --git a/internal/core/component_initializers.go b/internal/core/component_initializers.go index 3504b54..a24d442 100644 --- a/internal/core/component_initializers.go +++ b/internal/core/component_initializers.go @@ -180,6 +180,7 @@ func initDomainServices(ctx context.Context, cfg *config.Config, infra *Infrastr generalDeviceService := device.NewGeneralDeviceService( logs.AddCompName(baseCtx, "GeneralDeviceService"), infra.repos.deviceRepo, + infra.repos.areaControllerRepo, infra.repos.deviceCommandLogRepo, infra.repos.pendingCollectionRepo, infra.lora.comm, @@ -197,6 +198,7 @@ func initDomainServices(ctx context.Context, cfg *config.Config, infra *Infrastr infra.repos.sensorDataRepo, infra.repos.deviceRepo, infra.repos.alarmRepo, + infra.repos.areaControllerRepo, generalDeviceService, notifyService, alarmService, diff --git a/internal/core/data_initializer.go b/internal/core/data_initializer.go index 644d411..2086ebe 100644 --- a/internal/core/data_initializer.go +++ b/internal/core/data_initializer.go @@ -83,6 +83,10 @@ func (app *Application) initializeSystemPlans(ctx context.Context) error { return err } + if err := app.initializeHeartbeatCheckPlan(appCtx, existingPlanMap); err != nil { + return err + } + logger.Info("预定义系统计划检查完成。") return nil } @@ -244,6 +248,56 @@ func (app *Application) initializeAlarmNotificationPlan(ctx context.Context, exi return nil } +// initializeHeartbeatCheckPlan 负责初始化 "周期性心跳检测" 计划。 +func (app *Application) initializeHeartbeatCheckPlan(ctx context.Context, existingPlanMap map[models.PlanName]*models.Plan) error { + appCtx, logger := logs.Trace(ctx, app.Ctx, "initializeHeartbeatCheckPlan") + + predefinedPlan := &models.Plan{ + Name: models.PlanNamePeriodicHeartbeatCheck, + Description: "这是一个系统预定义的计划, 每5分钟自动触发一次区域主控心跳检测。", + PlanType: models.PlanTypeSystem, + ExecutionType: models.PlanExecutionTypeAutomatic, + CronExpression: "*/5 * * * *", // 每5分钟执行一次 + Status: models.PlanStatusEnabled, + ContentType: models.PlanContentTypeTasks, + Tasks: []models.Task{ + { + Name: "心跳检测", + Description: "向所有区域主控发送Ping指令", + ExecutionOrder: 1, + Type: models.TaskTypeHeartbeat, + }, + }, + } + + if foundExistingPlan, ok := existingPlanMap[predefinedPlan.Name]; ok { + // 如果计划存在,则进行无差别更新 + logger.Infof("预定义计划 '%s' 已存在,正在进行无差别更新...", predefinedPlan.Name) + + predefinedPlan.ID = foundExistingPlan.ID + predefinedPlan.ExecuteCount = foundExistingPlan.ExecuteCount + + if err := app.Infra.repos.planRepo.UpdatePlanMetadataAndStructure(appCtx, predefinedPlan); err != nil { + return fmt.Errorf("更新预定义计划 '%s' 的元数据和结构失败: %w", predefinedPlan.Name, err) + } + + if err := app.Infra.repos.planRepo.UpdatePlan(appCtx, predefinedPlan); err != nil { + return fmt.Errorf("更新预定义计划 '%s' 的所有顶层字段失败: %w", predefinedPlan.Name, err) + } + + logger.Infof("成功更新预定义计划 '%s'。", predefinedPlan.Name) + } else { + // 如果计划不存在, 则创建 + logger.Infof("预定义计划 '%s' 不存在,正在创建...", predefinedPlan.Name) + if err := app.Infra.repos.planRepo.CreatePlan(appCtx, predefinedPlan); err != nil { + return fmt.Errorf("创建预定义计划 '%s' 失败: %w", predefinedPlan.Name, err) + } else { + logger.Infof("成功创建预定义计划 '%s'。", predefinedPlan.Name) + } + } + return nil +} + // initializePendingCollections 在应用启动时处理所有未完成的采集请求。 // 我们的策略是:任何在程序重启前仍处于“待处理”状态的请求,都应被视为已失败。 // 这保证了系统在每次启动时都处于一个干净、确定的状态。 diff --git a/internal/domain/device/device_service.go b/internal/domain/device/device_service.go index db445c0..7c451fd 100644 --- a/internal/domain/device/device_service.go +++ b/internal/domain/device/device_service.go @@ -4,6 +4,7 @@ import ( "context" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto" ) // 设备行为 @@ -21,6 +22,26 @@ var ( MethodSwitch Method = "switch" // 启停指令 ) +// SendOptions 包含了发送通用指令时的可选参数。 +type SendOptions struct { + // NotTrackable 如果为 true,则指示本次发送无需被追踪。 + // 这将阻止系统为本次发送创建 device_command_logs 记录。 + // 默认为 false,即需要追踪。 + NotTrackable bool +} + +// SendOption 是一个函数类型,用于修改 SendOptions。 +// 这是实现 "Functional Options Pattern" 的核心。 +type SendOption func(*SendOptions) + +// WithoutTracking 是一个公开的选项函数,用于明确指示本次发送无需追踪。 +// 调用方在发送 Ping 等无需响应确认的指令时,应使用此选项。 +func WithoutTracking() SendOption { + return func(opts *SendOptions) { + opts.NotTrackable = true + } +} + // Service 抽象了一组方法用于控制设备行为 type Service interface { @@ -29,6 +50,10 @@ type Service interface { // Collect 用于发起对指定区域主控下的多个设备的批量采集请求。 Collect(ctx context.Context, areaControllerID uint32, devicesToCollect []*models.Device) error + + // Send 是一个通用的发送方法,用于将一个标准的指令载荷发送到指定的区域主控。 + // 它负责将载荷包装成顶层指令、序列化、调用底层发送器,并默认记录下行命令日志。 + Send(ctx context.Context, areaControllerID uint32, payload proto.InstructionPayload, opts ...SendOption) error } // 设备操作指令通用结构(最外层) diff --git a/internal/domain/device/general_device_service.go b/internal/domain/device/general_device_service.go index 78bd2b8..2bc5bcd 100644 --- a/internal/domain/device/general_device_service.go +++ b/internal/domain/device/general_device_service.go @@ -20,6 +20,7 @@ import ( type GeneralDeviceService struct { ctx context.Context deviceRepo repository.DeviceRepository + areaControllerRepo repository.AreaControllerRepository deviceCommandLogRepo repository.DeviceCommandLogRepository pendingCollectionRepo repository.PendingCollectionRepository comm transport.Communicator @@ -29,6 +30,7 @@ type GeneralDeviceService struct { func NewGeneralDeviceService( ctx context.Context, deviceRepo repository.DeviceRepository, + areaControllerRepo repository.AreaControllerRepository, deviceCommandLogRepo repository.DeviceCommandLogRepository, pendingCollectionRepo repository.PendingCollectionRepository, comm transport.Communicator, @@ -36,6 +38,7 @@ func NewGeneralDeviceService( return &GeneralDeviceService{ ctx: ctx, deviceRepo: deviceRepo, + areaControllerRepo: areaControllerRepo, deviceCommandLogRepo: deviceCommandLogRepo, pendingCollectionRepo: pendingCollectionRepo, comm: comm, @@ -249,3 +252,70 @@ func (g *GeneralDeviceService) Collect(ctx context.Context, areaControllerID uin logger.Debugf("成功将采集请求 (CorrelationID: %s) 发送到设备 %s", correlationID, networkID) return nil } + +// Send 实现了 Service 接口,用于发送一个通用的指令载荷。 +// 它将载荷包装成顶层指令,然后执行查找网络地址、序列化、发送和记录日志的完整流程。 +func (g *GeneralDeviceService) Send(ctx context.Context, areaControllerID uint32, payload proto.InstructionPayload, opts ...SendOption) error { + serviceCtx, logger := logs.Trace(ctx, g.ctx, "Send") + + // 1. 应用选项 + options := &SendOptions{} + for _, opt := range opts { + opt(options) + } + + // 2. 查找区域主控以获取 NetworkID + areaController, err := g.areaControllerRepo.FindByID(serviceCtx, areaControllerID) + if err != nil { + return fmt.Errorf("发送通用指令失败:无法找到ID为 %d 的区域主控: %w", areaControllerID, err) + } + + // 3. 将载荷包装进顶层 Instruction 结构体 + instruction := &proto.Instruction{ + Payload: payload, + } + + // 4. 序列化指令 + message, err := gproto.Marshal(instruction) + if err != nil { + return fmt.Errorf("序列化通用指令失败: %w", err) + } + + // 5. 发送指令 + networkID := areaController.NetworkID + sendResult, err := g.comm.Send(serviceCtx, networkID, message) + if err != nil { + return fmt.Errorf("发送通用指令到 %s 失败: %w", networkID, err) + } + + // 6. 始终创建 DeviceCommandLog 记录,但根据选项设置其初始状态 + logRecord := &models.DeviceCommandLog{ + MessageID: sendResult.MessageID, + DeviceID: areaController.ID, // 将日志与区域主控关联 + SentAt: time.Now(), + } + + if options.NotTrackable { + // 对于无需追踪的指令,直接标记为已完成 + now := time.Now() + logRecord.AcknowledgedAt = &now + logRecord.ReceivedSuccess = true + logger.Infow("成功发送一个无需追踪的通用指令,并记录为已完成日志", "networkID", networkID, "MessageID", sendResult.MessageID) + } else { + // 对于需要追踪的指令,记录其发送结果,等待异步确认 + if sendResult.AcknowledgedAt != nil { + logRecord.AcknowledgedAt = sendResult.AcknowledgedAt + } + if sendResult.ReceivedSuccess != nil { + logRecord.ReceivedSuccess = *sendResult.ReceivedSuccess + } + logger.Infow("成功发送通用指令,并创建追踪日志", "networkID", networkID, "MessageID", sendResult.MessageID) + } + + if err := g.deviceCommandLogRepo.Create(serviceCtx, logRecord); err != nil { + // 记录日志失败是一个需要关注的问题,但可能不应该中断主流程。 + logger.Errorw("创建通用指令的日志失败", "MessageID", sendResult.MessageID, "error", err) + } + + return nil +} diff --git a/internal/domain/task/heartbeat_task.go b/internal/domain/task/heartbeat_task.go new file mode 100644 index 0000000..55f67ce --- /dev/null +++ b/internal/domain/task/heartbeat_task.go @@ -0,0 +1,93 @@ +package task + +import ( + "context" + "fmt" + + "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/plan" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto" +) + +// HeartbeatTask 实现了 plan.Task 接口,用于执行一次区域主控心跳检测(发送Ping) +type HeartbeatTask struct { + ctx context.Context + log *models.TaskExecutionLog + areaControllerRepo repository.AreaControllerRepository + deviceService device.Service +} + +// NewHeartbeatTask 创建一个心跳检测任务实例 +func NewHeartbeatTask( + ctx context.Context, + log *models.TaskExecutionLog, + areaControllerRepo repository.AreaControllerRepository, + deviceService device.Service, +) plan.Task { + return &HeartbeatTask{ + ctx: ctx, + log: log, + areaControllerRepo: areaControllerRepo, + deviceService: deviceService, + } +} + +// Execute 是任务的核心执行逻辑 +func (t *HeartbeatTask) Execute(ctx context.Context) error { + taskCtx, logger := logs.Trace(ctx, t.ctx, "Execute") + logger.Infow("开始执行区域主控心跳检测任务", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID) + + controllers, err := t.areaControllerRepo.ListAll(taskCtx) + if err != nil { + return fmt.Errorf("心跳检测任务:获取所有区域主控失败: %w", err) + } + + if len(controllers) == 0 { + logger.Infow("心跳检测任务:未发现任何区域主控,跳过本次检测") + return nil + } + + // 构建 Ping 指令 + pingInstruction := &proto.Instruction_Ping{ + Ping: &proto.Ping{}, + } + + var firstError error + for _, controller := range controllers { + logger.Infow("向区域主控发送Ping指令", "controller_id", controller.ID) + err := t.deviceService.Send(taskCtx, controller.ID, pingInstruction, device.WithoutTracking()) + if err != nil { + logger.Errorw("向区域主控发送Ping指令失败", "controller_id", controller.ID, "error", err) + if firstError == nil { + firstError = err // 保存第一个发生的错误 + } + } + } + + if firstError != nil { + return fmt.Errorf("心跳检测任务执行期间发生错误: %w", firstError) + } + + logger.Infow("区域主控心跳检测任务执行完成", "task_id", t.log.TaskID, "task_type", t.log.Task.Type, "log_id", t.log.ID) + return nil +} + +// OnFailure 定义了当 Execute 方法返回错误时,需要执行的回滚或清理逻辑 +func (t *HeartbeatTask) OnFailure(ctx context.Context, executeErr error) { + logger := logs.TraceLogger(ctx, t.ctx, "OnFailure") + logger.Errorw("区域主控心跳检测任务执行失败", + "task_id", t.log.TaskID, + "task_type", t.log.Task.Type, + "log_id", t.log.ID, + "error", executeErr, + ) +} + +// ResolveDeviceIDs 获取当前任务需要使用的设备ID列表 +func (t *HeartbeatTask) ResolveDeviceIDs(ctx context.Context) ([]uint32, error) { + // 心跳检测任务不和任何特定设备绑定 + return []uint32{}, nil +} diff --git a/internal/domain/task/task.go b/internal/domain/task/task.go index 049dca0..38db219 100644 --- a/internal/domain/task/task.go +++ b/internal/domain/task/task.go @@ -18,14 +18,16 @@ const ( CompNameReleaseFeedWeight = "ReleaseFeedWeightTask" CompNameFullCollectionTask = "FullCollectionTask" CompNameAlarmNotification = "AlarmNotificationTask" + CompNameHeartbeatTask = "HeartbeatTask" ) type taskFactory struct { ctx context.Context - sensorDataRepo repository.SensorDataRepository - deviceRepo repository.DeviceRepository - alarmRepo repository.AlarmRepository + sensorDataRepo repository.SensorDataRepository + deviceRepo repository.DeviceRepository + alarmRepo repository.AlarmRepository + areaControllerRepo repository.AreaControllerRepository deviceService device.Service notificationService notify.Service @@ -37,6 +39,7 @@ func NewTaskFactory( sensorDataRepo repository.SensorDataRepository, deviceRepo repository.DeviceRepository, alarmRepo repository.AlarmRepository, + areaControllerRepo repository.AreaControllerRepository, deviceService device.Service, notifyService notify.Service, alarmService alarm.AlarmService, @@ -46,6 +49,7 @@ func NewTaskFactory( sensorDataRepo: sensorDataRepo, deviceRepo: deviceRepo, alarmRepo: alarmRepo, + areaControllerRepo: areaControllerRepo, deviceService: deviceService, notificationService: notifyService, alarmService: alarmService, @@ -62,6 +66,8 @@ func (t *taskFactory) Production(ctx context.Context, claimedLog *models.TaskExe return NewReleaseFeedWeightTask(logs.AddCompName(baseCtx, CompNameReleaseFeedWeight), claimedLog, t.sensorDataRepo, t.deviceRepo, t.deviceService) case models.TaskTypeFullCollection: return NewFullCollectionTask(logs.AddCompName(baseCtx, CompNameFullCollectionTask), claimedLog, t.deviceRepo, t.deviceService) + case models.TaskTypeHeartbeat: + return NewHeartbeatTask(logs.AddCompName(baseCtx, CompNameHeartbeatTask), claimedLog, t.areaControllerRepo, t.deviceService) case models.TaskTypeAlarmNotification: return NewAlarmNotificationTask(logs.AddCompName(baseCtx, CompNameAlarmNotification), claimedLog, t.notificationService, t.alarmRepo) case models.TaskTypeDeviceThresholdCheck: @@ -71,7 +77,6 @@ func (t *taskFactory) Production(ctx context.Context, claimedLog *models.TaskExe case models.TaskTypeNotificationRefresh: return NewRefreshNotificationTask(logs.AddCompName(baseCtx, "NotificationRefreshTask"), claimedLog, t.alarmService) default: - // TODO 这里直接panic合适吗? 不过这个场景确实不该出现任何异常的任务类型 logger.Panicf("不支持的任务类型: %s", claimedLog.Task.Type) panic("不支持的任务类型") // 显式panic防编译器报错 } @@ -79,8 +84,6 @@ func (t *taskFactory) Production(ctx context.Context, claimedLog *models.TaskExe // CreateTaskFromModel 实现了 TaskFactory 接口,用于从模型创建任务实例。 func (t *taskFactory) CreateTaskFromModel(ctx context.Context, taskModel *models.Task) (plan.TaskDeviceIDResolver, error) { - // 这个方法不关心 claimedLog 的其他字段,所以可以构造一个临时的 - // 它只用于访问那些不依赖于执行日志的方法,比如 ResolveDeviceIDs tempLog := &models.TaskExecutionLog{Task: *taskModel} baseCtx := context.Background() @@ -97,6 +100,8 @@ func (t *taskFactory) CreateTaskFromModel(ctx context.Context, taskModel *models ), nil case models.TaskTypeFullCollection: return NewFullCollectionTask(logs.AddCompName(baseCtx, CompNameFullCollectionTask), tempLog, t.deviceRepo, t.deviceService), nil + case models.TaskTypeHeartbeat: + return NewHeartbeatTask(logs.AddCompName(baseCtx, CompNameHeartbeatTask), tempLog, t.areaControllerRepo, t.deviceService), nil case models.TaskTypeAlarmNotification: return NewAlarmNotificationTask(logs.AddCompName(baseCtx, CompNameAlarmNotification), tempLog, t.notificationService, t.alarmRepo), nil case models.TaskTypeDeviceThresholdCheck: diff --git a/internal/infra/models/plan.go b/internal/infra/models/plan.go index 22e7c88..db24c03 100644 --- a/internal/infra/models/plan.go +++ b/internal/infra/models/plan.go @@ -16,6 +16,8 @@ type PlanName string const ( // PlanNamePeriodicSystemHealthCheck 是周期性系统健康检查计划的名称 PlanNamePeriodicSystemHealthCheck PlanName = "周期性系统健康检查" + // PlanNamePeriodicHeartbeatCheck 是周期性心跳检测计划的名称 + PlanNamePeriodicHeartbeatCheck PlanName = "周期性心跳检测" // PlanNameAlarmNotification 是告警通知发送计划的名称 PlanNameAlarmNotification PlanName = "告警通知发送" ) @@ -44,6 +46,7 @@ const ( TaskTypeWaiting TaskType = "等待" // 等待任务 TaskTypeReleaseFeedWeight TaskType = "下料" // 下料口释放指定重量任务 TaskTypeFullCollection TaskType = "全量采集" // 新增的全量采集任务 + TaskTypeHeartbeat TaskType = "心跳检测" // 区域主控心跳检测任务 TaskTypeAlarmNotification TaskType = "告警通知" // 告警通知任务 TaskTypeNotificationRefresh TaskType = "通知刷新" // 通知刷新任务 TaskTypeDeviceThresholdCheck TaskType = "设备阈值检查" // 设备阈值检查任务 diff --git a/internal/infra/models/sensor_data.go b/internal/infra/models/sensor_data.go index 8165bcc..863a434 100644 --- a/internal/infra/models/sensor_data.go +++ b/internal/infra/models/sensor_data.go @@ -17,6 +17,16 @@ const ( SensorTypeTemperature SensorType = "温度" // 温度 SensorTypeHumidity SensorType = "湿度" // 湿度 SensorTypeWeight SensorType = "重量" // 重量 + SensorTypeOnlineStatus SensorType = "在线状态" // 在线状态 +) + +// OnlineState 定义了设备的在线状态枚举 +type OnlineState string + +const ( + StateOnline OnlineState = "在线" // 设备在线 + StateOffline OnlineState = "离线" // 设备离线 + StateAbnormal OnlineState = "异常" // 设备状态异常 ) // SignalMetrics 存储信号强度数据 @@ -49,6 +59,11 @@ type WeightData struct { WeightKilograms float32 `json:"weight_kilograms"` // 重量值 (公斤) } +// OnlineStatusData 记录了设备的在线状态 +type OnlineStatusData struct { + State OnlineState `json:"state"` // 在线状态 +} + // SensorData 存储所有类型的传感器数据,对应数据库中的 'sensor_data' 表。 type SensorData struct { // Time 是数据记录的时间戳,作为复合主键的一部分。 diff --git a/internal/infra/transport/proto/device.pb.go b/internal/infra/transport/proto/device.pb.go index 46b8ca3..e39bf9f 100644 --- a/internal/infra/transport/proto/device.pb.go +++ b/internal/infra/transport/proto/device.pb.go @@ -556,6 +556,89 @@ func (x *LogUploadRequest) GetEntries() []*LogEntry { return nil } +// 平台向设备发送的Ping指令,用于检查存活性。 +type Ping struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Ping) Reset() { + *x = Ping{} + mi := &file_device_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Ping) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Ping) ProtoMessage() {} + +func (x *Ping) ProtoReflect() protoreflect.Message { + mi := &file_device_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Ping.ProtoReflect.Descriptor instead. +func (*Ping) Descriptor() ([]byte, []int) { + return file_device_proto_rawDescGZIP(), []int{9} +} + +// 设备对Ping的响应,或设备主动上报的心跳。 +// 它包含了设备的关键状态信息。 +type Pong struct { + state protoimpl.MessageState `protogen:"open.v1"` + FirmwareVersion string `protobuf:"bytes,1,opt,name=firmware_version,json=firmwareVersion,proto3" json:"firmware_version,omitempty"` // 当前固件版本 + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Pong) Reset() { + *x = Pong{} + mi := &file_device_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Pong) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Pong) ProtoMessage() {} + +func (x *Pong) ProtoReflect() protoreflect.Message { + mi := &file_device_proto_msgTypes[10] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Pong.ProtoReflect.Descriptor instead. +func (*Pong) Descriptor() ([]byte, []int) { + return file_device_proto_rawDescGZIP(), []int{10} +} + +func (x *Pong) GetFirmwareVersion() string { + if x != nil { + return x.FirmwareVersion + } + return "" +} + // Instruction 封装了所有与设备间的通信。 // 使用 oneof 来确保每个消息只有一个负载类型,这在嵌入式系统中是高效且类型安全的。 type Instruction struct { @@ -566,9 +649,11 @@ type Instruction struct { // *Instruction_BatchCollectCommand // *Instruction_OtaUpgradeCommand // *Instruction_ControlLogUploadCommand + // *Instruction_Ping // *Instruction_CollectResult // *Instruction_OtaUpgradeStatus // *Instruction_LogUploadRequest + // *Instruction_Pong Payload isInstruction_Payload `protobuf_oneof:"payload"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -576,7 +661,7 @@ type Instruction struct { func (x *Instruction) Reset() { *x = Instruction{} - mi := &file_device_proto_msgTypes[9] + mi := &file_device_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -588,7 +673,7 @@ func (x *Instruction) String() string { func (*Instruction) ProtoMessage() {} func (x *Instruction) ProtoReflect() protoreflect.Message { - mi := &file_device_proto_msgTypes[9] + mi := &file_device_proto_msgTypes[11] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -601,7 +686,7 @@ func (x *Instruction) ProtoReflect() protoreflect.Message { // Deprecated: Use Instruction.ProtoReflect.Descriptor instead. func (*Instruction) Descriptor() ([]byte, []int) { - return file_device_proto_rawDescGZIP(), []int{9} + return file_device_proto_rawDescGZIP(), []int{11} } func (x *Instruction) GetPayload() isInstruction_Payload { @@ -647,6 +732,15 @@ func (x *Instruction) GetControlLogUploadCommand() *ControlLogUploadCommand { return nil } +func (x *Instruction) GetPing() *Ping { + if x != nil { + if x, ok := x.Payload.(*Instruction_Ping); ok { + return x.Ping + } + } + return nil +} + func (x *Instruction) GetCollectResult() *CollectResult { if x != nil { if x, ok := x.Payload.(*Instruction_CollectResult); ok { @@ -674,6 +768,15 @@ func (x *Instruction) GetLogUploadRequest() *LogUploadRequest { return nil } +func (x *Instruction) GetPong() *Pong { + if x != nil { + if x, ok := x.Payload.(*Instruction_Pong); ok { + return x.Pong + } + } + return nil +} + type isInstruction_Payload interface { isInstruction_Payload() } @@ -695,6 +798,10 @@ type Instruction_ControlLogUploadCommand struct { ControlLogUploadCommand *ControlLogUploadCommand `protobuf:"bytes,4,opt,name=control_log_upload_command,json=controlLogUploadCommand,proto3,oneof"` } +type Instruction_Ping struct { + Ping *Ping `protobuf:"bytes,6,opt,name=ping,proto3,oneof"` +} + type Instruction_CollectResult struct { // --- 上行数据 (设备 -> 平台) --- CollectResult *CollectResult `protobuf:"bytes,101,opt,name=collect_result,json=collectResult,proto3,oneof"` @@ -708,6 +815,10 @@ type Instruction_LogUploadRequest struct { LogUploadRequest *LogUploadRequest `protobuf:"bytes,103,opt,name=log_upload_request,json=logUploadRequest,proto3,oneof"` } +type Instruction_Pong struct { + Pong *Pong `protobuf:"bytes,104,opt,name=pong,proto3,oneof"` +} + func (*Instruction_Raw_485Command) isInstruction_Payload() {} func (*Instruction_BatchCollectCommand) isInstruction_Payload() {} @@ -716,12 +827,16 @@ func (*Instruction_OtaUpgradeCommand) isInstruction_Payload() {} func (*Instruction_ControlLogUploadCommand) isInstruction_Payload() {} +func (*Instruction_Ping) isInstruction_Payload() {} + func (*Instruction_CollectResult) isInstruction_Payload() {} func (*Instruction_OtaUpgradeStatus) isInstruction_Payload() {} func (*Instruction_LogUploadRequest) isInstruction_Payload() {} +func (*Instruction_Pong) isInstruction_Payload() {} + var File_device_proto protoreflect.FileDescriptor const file_device_proto_rawDesc = "" + @@ -755,15 +870,20 @@ const file_device_proto_rawDesc = "" + "\x06enable\x18\x01 \x01(\bR\x06enable\x12)\n" + "\x10duration_seconds\x18\x02 \x01(\rR\x0fdurationSeconds\">\n" + "\x10LogUploadRequest\x12*\n" + - "\aentries\x18\x01 \x03(\v2\x10.device.LogEntryR\aentries\"\xad\x04\n" + + "\aentries\x18\x01 \x03(\v2\x10.device.LogEntryR\aentries\"\x06\n" + + "\x04Ping\"1\n" + + "\x04Pong\x12)\n" + + "\x10firmware_version\x18\x01 \x01(\tR\x0ffirmwareVersion\"\xf5\x04\n" + "\vInstruction\x12?\n" + "\x0fraw_485_command\x18\x01 \x01(\v2\x15.device.Raw485CommandH\x00R\rraw485Command\x12Q\n" + "\x15batch_collect_command\x18\x02 \x01(\v2\x1b.device.BatchCollectCommandH\x00R\x13batchCollectCommand\x12K\n" + "\x13ota_upgrade_command\x18\x03 \x01(\v2\x19.device.OtaUpgradeCommandH\x00R\x11otaUpgradeCommand\x12^\n" + - "\x1acontrol_log_upload_command\x18\x04 \x01(\v2\x1f.device.ControlLogUploadCommandH\x00R\x17controlLogUploadCommand\x12>\n" + + "\x1acontrol_log_upload_command\x18\x04 \x01(\v2\x1f.device.ControlLogUploadCommandH\x00R\x17controlLogUploadCommand\x12\"\n" + + "\x04ping\x18\x06 \x01(\v2\f.device.PingH\x00R\x04ping\x12>\n" + "\x0ecollect_result\x18e \x01(\v2\x15.device.CollectResultH\x00R\rcollectResult\x12H\n" + "\x12ota_upgrade_status\x18f \x01(\v2\x18.device.OtaUpgradeStatusH\x00R\x10otaUpgradeStatus\x12H\n" + - "\x12log_upload_request\x18g \x01(\v2\x18.device.LogUploadRequestH\x00R\x10logUploadRequestB\t\n" + + "\x12log_upload_request\x18g \x01(\v2\x18.device.LogUploadRequestH\x00R\x10logUploadRequest\x12\"\n" + + "\x04pong\x18h \x01(\v2\f.device.PongH\x00R\x04pongB\t\n" + "\apayload*O\n" + "\bLogLevel\x12\x19\n" + "\x15LOG_LEVEL_UNSPECIFIED\x10\x00\x12\t\n" + @@ -785,7 +905,7 @@ func file_device_proto_rawDescGZIP() []byte { } var file_device_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_device_proto_msgTypes = make([]protoimpl.MessageInfo, 10) +var file_device_proto_msgTypes = make([]protoimpl.MessageInfo, 12) var file_device_proto_goTypes = []any{ (LogLevel)(0), // 0: device.LogLevel (*LogEntry)(nil), // 1: device.LogEntry @@ -797,7 +917,9 @@ var file_device_proto_goTypes = []any{ (*OtaUpgradeStatus)(nil), // 7: device.OtaUpgradeStatus (*ControlLogUploadCommand)(nil), // 8: device.ControlLogUploadCommand (*LogUploadRequest)(nil), // 9: device.LogUploadRequest - (*Instruction)(nil), // 10: device.Instruction + (*Ping)(nil), // 10: device.Ping + (*Pong)(nil), // 11: device.Pong + (*Instruction)(nil), // 12: device.Instruction } var file_device_proto_depIdxs = []int32{ 0, // 0: device.LogEntry.level:type_name -> device.LogLevel @@ -808,14 +930,16 @@ var file_device_proto_depIdxs = []int32{ 3, // 5: device.Instruction.batch_collect_command:type_name -> device.BatchCollectCommand 6, // 6: device.Instruction.ota_upgrade_command:type_name -> device.OtaUpgradeCommand 8, // 7: device.Instruction.control_log_upload_command:type_name -> device.ControlLogUploadCommand - 5, // 8: device.Instruction.collect_result:type_name -> device.CollectResult - 7, // 9: device.Instruction.ota_upgrade_status:type_name -> device.OtaUpgradeStatus - 9, // 10: device.Instruction.log_upload_request:type_name -> device.LogUploadRequest - 11, // [11:11] is the sub-list for method output_type - 11, // [11:11] is the sub-list for method input_type - 11, // [11:11] is the sub-list for extension type_name - 11, // [11:11] is the sub-list for extension extendee - 0, // [0:11] is the sub-list for field type_name + 10, // 8: device.Instruction.ping:type_name -> device.Ping + 5, // 9: device.Instruction.collect_result:type_name -> device.CollectResult + 7, // 10: device.Instruction.ota_upgrade_status:type_name -> device.OtaUpgradeStatus + 9, // 11: device.Instruction.log_upload_request:type_name -> device.LogUploadRequest + 11, // 12: device.Instruction.pong:type_name -> device.Pong + 13, // [13:13] is the sub-list for method output_type + 13, // [13:13] is the sub-list for method input_type + 13, // [13:13] is the sub-list for extension type_name + 13, // [13:13] is the sub-list for extension extendee + 0, // [0:13] is the sub-list for field type_name } func init() { file_device_proto_init() } @@ -823,14 +947,16 @@ func file_device_proto_init() { if File_device_proto != nil { return } - file_device_proto_msgTypes[9].OneofWrappers = []any{ + file_device_proto_msgTypes[11].OneofWrappers = []any{ (*Instruction_Raw_485Command)(nil), (*Instruction_BatchCollectCommand)(nil), (*Instruction_OtaUpgradeCommand)(nil), (*Instruction_ControlLogUploadCommand)(nil), + (*Instruction_Ping)(nil), (*Instruction_CollectResult)(nil), (*Instruction_OtaUpgradeStatus)(nil), (*Instruction_LogUploadRequest)(nil), + (*Instruction_Pong)(nil), } type x struct{} out := protoimpl.TypeBuilder{ @@ -838,7 +964,7 @@ func file_device_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_device_proto_rawDesc), len(file_device_proto_rawDesc)), NumEnums: 1, - NumMessages: 10, + NumMessages: 12, NumExtensions: 0, NumServices: 0, }, diff --git a/internal/infra/transport/proto/device.proto b/internal/infra/transport/proto/device.proto index c403379..9e83e1d 100644 --- a/internal/infra/transport/proto/device.proto +++ b/internal/infra/transport/proto/device.proto @@ -73,6 +73,18 @@ message LogUploadRequest { repeated LogEntry entries = 1; // 一批日志条目 } +// 平台向设备发送的Ping指令,用于检查存活性。 +message Ping { + // 可以留空,指令本身即代表意图 +} + +// 设备对Ping的响应,或设备主动上报的心跳。 +// 它包含了设备的关键状态信息。 +message Pong { + string firmware_version = 1; // 当前固件版本 + // 可以扩展更多状态, e.g., int32 uptime_seconds = 2; +} + // --- 顶层指令包装器 --- @@ -85,10 +97,12 @@ message Instruction { BatchCollectCommand batch_collect_command = 2; OtaUpgradeCommand ota_upgrade_command = 3; ControlLogUploadCommand control_log_upload_command = 4; + Ping ping = 6; // --- 上行数据 (设备 -> 平台) --- CollectResult collect_result = 101; OtaUpgradeStatus ota_upgrade_status = 102; LogUploadRequest log_upload_request = 103; + Pong pong = 104; } } diff --git a/internal/infra/transport/proto/exported.go b/internal/infra/transport/proto/exported.go new file mode 100644 index 0000000..1ffa12e --- /dev/null +++ b/internal/infra/transport/proto/exported.go @@ -0,0 +1,13 @@ +package proto + +// InstructionPayload 是 protoc 为 oneof 生成的未导出接口 isInstruction_Payload 的一个公开别名。 +// 通过接口嵌入,我们创建了一个新的、可导出的接口,它拥有与 isInstruction_Payload 完全相同的方法集。 +// +// 根据 Go 的接口规则,任何实现了 isInstruction_Payload 接口的类型 (例如 *Instruction_Ping) +// 都会自动、隐式地满足此接口。 +// +// 这使得我们可以在项目的其他包(如 domain 层)的公开 API 中使用这个接口, +// 从而在保持类型安全的同时,避免了对 protoc 生成的未导出类型的直接依赖。 +type InstructionPayload interface { + isInstruction_Payload +} diff --git a/project_structure.txt b/project_structure.txt index 8eb8a4a..6aaa887 100644 --- a/project_structure.txt +++ b/project_structure.txt @@ -39,6 +39,7 @@ design/archive/2025-11-06-system-plan-continuously-triggered/index.md design/archive/2025-11-10-exceeding-threshold-alarm/index.md design/archive/2025-11-29-recipe-management/index.md design/ota-upgrade-and-log-monitoring/index.md +design/ota-upgrade-and-log-monitoring/lora_refactoring_plan.md docs/docs.go docs/swagger.json docs/swagger.yaml @@ -88,6 +89,7 @@ internal/app/dto/user_dto.go internal/app/listener/chirp_stack/chirp_stack.go internal/app/listener/chirp_stack/chirp_stack_types.go internal/app/listener/chirp_stack/placeholder_listener.go +internal/app/listener/lora_listener.go internal/app/listener/transport.go internal/app/middleware/audit.go internal/app/middleware/auth.go @@ -140,6 +142,7 @@ internal/domain/task/area_threshold_check_task.go internal/domain/task/delay_task.go internal/domain/task/device_threshold_check_task.go internal/domain/task/full_collection_task.go +internal/domain/task/heartbeat_task.go internal/domain/task/refresh_notification_task.go internal/domain/task/release_feed_weight_task.go internal/domain/task/task.go @@ -211,6 +214,7 @@ internal/infra/transport/lora/lora_mesh_uart_passthrough_transport.go internal/infra/transport/lora/placeholder_transport.go internal/infra/transport/proto/device.pb.go internal/infra/transport/proto/device.proto +internal/infra/transport/proto/exported.go internal/infra/transport/transport.go internal/infra/utils/command_generater/modbus_rtu.go internal/infra/utils/time.go