diff --git a/internal/app/listener/lora_listener.go b/internal/app/listener/lora_listener.go index ceeb3bf..aee4f67 100644 --- a/internal/app/listener/lora_listener.go +++ b/internal/app/listener/lora_listener.go @@ -56,17 +56,9 @@ func (l *loraListener) HandleInstruction(upstreamCtx context.Context, sourceAddr case *proto.Instruction_CollectResult: return l.handleCollectResult(ctx, sourceAddr, p.CollectResult) - case *proto.Instruction_OtaUpgradeStatus: - return l.handleOtaStatus(ctx, sourceAddr, p.OtaUpgradeStatus) - case *proto.Instruction_Pong: return l.handlePong(ctx, sourceAddr, p.Pong) - case *proto.Instruction_LogUploadRequest: - logger.Infow("收到设备日志上传请求,暂未实现处理逻辑", "来源地址", sourceAddr, "日志条数", len(p.LogUploadRequest.Entries)) - // TODO: 在这里实现设备日志的处理逻辑 - return nil - default: logger.Warnw("收到一个当前未处理的上行指令类型", "来源地址", sourceAddr, "类型", fmt.Sprintf("%T", p)) return nil @@ -277,37 +269,6 @@ func (l *loraListener) recordSensorData(ctx context.Context, areaControllerID ui } } -// handleOtaStatus 处理设备上报的OTA升级状态。 -func (l *loraListener) handleOtaStatus(ctx context.Context, sourceAddr string, status *proto.OtaUpgradeStatus) error { - reqCtx, logger := logs.Trace(ctx, l.selfCtx, "handleOtaStatus") - logger.Infow("开始处理OTA升级状态", - "来源地址", sourceAddr, - "状态码", status.StatusCode, - "当前版本", status.CurrentFirmwareVersion, - ) - - // 1. 查找区域主控 - areaController, err := l.areaControllerRepo.FindByNetworkID(reqCtx, sourceAddr) - if err != nil { - return fmt.Errorf("处理OTA状态失败:无法找到区域主控: %w", err) - } - - // 2. 更新固件版本号 - // 我们信任设备上报的版本号,无论成功失败都进行更新 - if status.CurrentFirmwareVersion != "" { - err = l.areaControllerRepo.UpdateFirmwareVersion(reqCtx, areaController.ID, status.CurrentFirmwareVersion) - if err != nil { - logger.Errorw("更新区域主控固件版本号失败", "主控ID", areaController.ID, "error", err) - return fmt.Errorf("更新固件版本号失败: %w", err) - } - logger.Infow("成功更新区域主控固件版本号", "主控ID", areaController.ID, "新版本", status.CurrentFirmwareVersion) - } - - // TODO: 后续可以在这里增加逻辑,比如记录一条操作日志,或者发送一个通知 - - return nil -} - // handlePong 处理设备上报的Pong响应或主动心跳。 func (l *loraListener) handlePong(ctx context.Context, sourceAddr string, pong *proto.Pong) error { reqCtx, logger := logs.Trace(ctx, l.selfCtx, "handlePong") diff --git a/internal/app/service/device_service.go b/internal/app/service/device_service.go index 89cc49c..262c493 100644 --- a/internal/app/service/device_service.go +++ b/internal/app/service/device_service.go @@ -53,7 +53,7 @@ type deviceService struct { deviceRepo repository.DeviceRepository areaControllerRepo repository.AreaControllerRepository deviceTemplateRepo repository.DeviceTemplateRepository - deviceDomainSvc device.Service + deviceDomainSvc device.DeviceOperator thresholdAlarmService ThresholdAlarmService } @@ -63,7 +63,7 @@ func NewDeviceService( deviceRepo repository.DeviceRepository, areaControllerRepo repository.AreaControllerRepository, deviceTemplateRepo repository.DeviceTemplateRepository, - deviceDomainSvc device.Service, + deviceDomainSvc device.DeviceOperator, thresholdAlarmService ThresholdAlarmService, ) DeviceService { return &deviceService{ diff --git a/internal/core/component_initializers.go b/internal/core/component_initializers.go index a24d442..379102a 100644 --- a/internal/core/component_initializers.go +++ b/internal/core/component_initializers.go @@ -140,7 +140,8 @@ type DomainServices struct { pigTradeManager pig.PigTradeManager pigSickManager pig.SickPigManager pigBatchDomain pig.PigBatchService - generalDeviceService device.Service + deviceOperator device.DeviceOperator + deviceCommunicator device.DeviceCommunicator taskFactory plan.TaskFactory planExecutionManager plan.ExecutionManager analysisPlanTaskManager plan.AnalysisPlanTaskManager @@ -200,6 +201,7 @@ func initDomainServices(ctx context.Context, cfg *config.Config, infra *Infrastr infra.repos.alarmRepo, infra.repos.areaControllerRepo, generalDeviceService, + generalDeviceService, notifyService, alarmService, ) @@ -217,7 +219,6 @@ func initDomainServices(ctx context.Context, cfg *config.Config, infra *Infrastr infra.repos.planRepo, analysisPlanTaskManager, taskFactory, - generalDeviceService, time.Duration(cfg.Task.Interval)*time.Second, cfg.Task.NumWorkers, ) @@ -259,7 +260,8 @@ func initDomainServices(ctx context.Context, cfg *config.Config, infra *Infrastr pigTradeManager: pigTradeManager, pigSickManager: pigSickManager, pigBatchDomain: pigBatchDomain, - generalDeviceService: generalDeviceService, + deviceOperator: generalDeviceService, + deviceCommunicator: generalDeviceService, analysisPlanTaskManager: analysisPlanTaskManager, taskFactory: taskFactory, planExecutionManager: planExecutionManager, @@ -328,7 +330,7 @@ func initAppServices(ctx context.Context, infra *Infrastructure, domainServices infra.repos.deviceRepo, infra.repos.areaControllerRepo, infra.repos.deviceTemplateRepo, - domainServices.generalDeviceService, + domainServices.deviceOperator, thresholdAlarmService, ) diff --git a/internal/domain/device/device_service.go b/internal/domain/device/device_service.go index 7c451fd..43b87ca 100644 --- a/internal/domain/device/device_service.go +++ b/internal/domain/device/device_service.go @@ -42,17 +42,21 @@ func WithoutTracking() SendOption { } } -// Service 抽象了一组方法用于控制设备行为 -type Service interface { - - // Switch 用于切换指定设备的状态, 比如启动和停止 +// DeviceOperator 提供了对单个或多个设备进行具体操作的接口, +// 如开关、触发采集等。它通常用于响应用户的直接指令或执行具体的业务任务。 +type DeviceOperator interface { + // Switch 用于切换指定设备的状态, 比如启动和停止。 Switch(ctx context.Context, device *models.Device, action DeviceAction) error // Collect 用于发起对指定区域主控下的多个设备的批量采集请求。 Collect(ctx context.Context, areaControllerID uint32, devicesToCollect []*models.Device) error +} - // Send 是一个通用的发送方法,用于将一个标准的指令载荷发送到指定的区域主控。 - // 它负责将载荷包装成顶层指令、序列化、调用底层发送器,并默认记录下行命令日志。 +// DeviceCommunicator 抽象了与设备进行底层通信的能力。 +// 它负责将一个标准的指令载荷发送到指定的区域主控。 +type DeviceCommunicator interface { + // Send 是一个通用的发送方法,它负责将载荷包装、序列化、 + // 调用底层发送器,并默认记录下行命令日志。 Send(ctx context.Context, areaControllerID uint32, payload proto.InstructionPayload, opts ...SendOption) error } diff --git a/internal/domain/device/general_device_service.go b/internal/domain/device/general_device_service.go index 2bc5bcd..1edd732 100644 --- a/internal/domain/device/general_device_service.go +++ b/internal/domain/device/general_device_service.go @@ -34,7 +34,7 @@ func NewGeneralDeviceService( deviceCommandLogRepo repository.DeviceCommandLogRepository, pendingCollectionRepo repository.PendingCollectionRepository, comm transport.Communicator, -) Service { +) *GeneralDeviceService { return &GeneralDeviceService{ ctx: ctx, deviceRepo: deviceRepo, diff --git a/internal/domain/plan/plan_execution_manager.go b/internal/domain/plan/plan_execution_manager.go index 8bc9566..7c10acd 100644 --- a/internal/domain/plan/plan_execution_manager.go +++ b/internal/domain/plan/plan_execution_manager.go @@ -6,7 +6,6 @@ import ( "sync" "time" - "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" @@ -95,7 +94,6 @@ type planExecutionManagerImpl struct { taskFactory TaskFactory analysisPlanTaskManager AnalysisPlanTaskManager progressTracker *ProgressTracker - deviceService device.Service pool *ants.Pool // 使用 ants 协程池来管理并发 wg sync.WaitGroup @@ -112,7 +110,6 @@ func NewPlanExecutionManager( planRepo repository.PlanRepository, analysisPlanTaskManager AnalysisPlanTaskManager, taskFactory TaskFactory, - deviceService device.Service, interval time.Duration, numWorkers int, ) ExecutionManager { @@ -125,7 +122,6 @@ func NewPlanExecutionManager( planRepo: planRepo, analysisPlanTaskManager: analysisPlanTaskManager, taskFactory: taskFactory, - deviceService: deviceService, pollingInterval: interval, workers: numWorkers, progressTracker: NewProgressTracker(), diff --git a/internal/domain/task/full_collection_task.go b/internal/domain/task/full_collection_task.go index 9f94d7d..3585b8c 100644 --- a/internal/domain/task/full_collection_task.go +++ b/internal/domain/task/full_collection_task.go @@ -16,7 +16,7 @@ type FullCollectionTask struct { ctx context.Context log *models.TaskExecutionLog deviceRepo repository.DeviceRepository - deviceService device.Service + deviceService device.DeviceOperator } // NewFullCollectionTask 创建一个全量采集任务实例 @@ -24,7 +24,7 @@ func NewFullCollectionTask( ctx context.Context, log *models.TaskExecutionLog, deviceRepo repository.DeviceRepository, - deviceService device.Service, + deviceService device.DeviceOperator, ) plan.Task { return &FullCollectionTask{ ctx: ctx, diff --git a/internal/domain/task/heartbeat_task.go b/internal/domain/task/heartbeat_task.go index 55f67ce..1d75831 100644 --- a/internal/domain/task/heartbeat_task.go +++ b/internal/domain/task/heartbeat_task.go @@ -17,7 +17,7 @@ type HeartbeatTask struct { ctx context.Context log *models.TaskExecutionLog areaControllerRepo repository.AreaControllerRepository - deviceService device.Service + deviceService device.DeviceCommunicator } // NewHeartbeatTask 创建一个心跳检测任务实例 @@ -25,7 +25,7 @@ func NewHeartbeatTask( ctx context.Context, log *models.TaskExecutionLog, areaControllerRepo repository.AreaControllerRepository, - deviceService device.Service, + deviceService device.DeviceCommunicator, ) plan.Task { return &HeartbeatTask{ ctx: ctx, diff --git a/internal/domain/task/release_feed_weight_task.go b/internal/domain/task/release_feed_weight_task.go index ad58d2e..4cc42ef 100644 --- a/internal/domain/task/release_feed_weight_task.go +++ b/internal/domain/task/release_feed_weight_task.go @@ -32,7 +32,7 @@ type ReleaseFeedWeightTask struct { releaseWeight float32 mixingTankDeviceID uint32 - feedPort device.Service + feedPort device.DeviceOperator // onceParse 保证解析参数只执行一次 onceParse sync.Once @@ -44,7 +44,7 @@ func NewReleaseFeedWeightTask( claimedLog *models.TaskExecutionLog, sensorDataRepo repository.SensorDataRepository, deviceRepo repository.DeviceRepository, - deviceService device.Service, + deviceService device.DeviceOperator, ) plan.Task { return &ReleaseFeedWeightTask{ ctx: ctx, diff --git a/internal/domain/task/task.go b/internal/domain/task/task.go index 38db219..254e734 100644 --- a/internal/domain/task/task.go +++ b/internal/domain/task/task.go @@ -29,7 +29,8 @@ type taskFactory struct { alarmRepo repository.AlarmRepository areaControllerRepo repository.AreaControllerRepository - deviceService device.Service + deviceOperator device.DeviceOperator + deviceCommunicator device.DeviceCommunicator notificationService notify.Service alarmService alarm.AlarmService } @@ -40,7 +41,8 @@ func NewTaskFactory( deviceRepo repository.DeviceRepository, alarmRepo repository.AlarmRepository, areaControllerRepo repository.AreaControllerRepository, - deviceService device.Service, + deviceOperator device.DeviceOperator, + deviceCommunicator device.DeviceCommunicator, notifyService notify.Service, alarmService alarm.AlarmService, ) plan.TaskFactory { @@ -50,7 +52,8 @@ func NewTaskFactory( deviceRepo: deviceRepo, alarmRepo: alarmRepo, areaControllerRepo: areaControllerRepo, - deviceService: deviceService, + deviceOperator: deviceOperator, + deviceCommunicator: deviceCommunicator, notificationService: notifyService, alarmService: alarmService, } @@ -63,11 +66,11 @@ func (t *taskFactory) Production(ctx context.Context, claimedLog *models.TaskExe case models.TaskTypeWaiting: return NewDelayTask(logs.AddCompName(baseCtx, CompNameDelayTask), claimedLog) case models.TaskTypeReleaseFeedWeight: - return NewReleaseFeedWeightTask(logs.AddCompName(baseCtx, CompNameReleaseFeedWeight), claimedLog, t.sensorDataRepo, t.deviceRepo, t.deviceService) + return NewReleaseFeedWeightTask(logs.AddCompName(baseCtx, CompNameReleaseFeedWeight), claimedLog, t.sensorDataRepo, t.deviceRepo, t.deviceOperator) case models.TaskTypeFullCollection: - return NewFullCollectionTask(logs.AddCompName(baseCtx, CompNameFullCollectionTask), claimedLog, t.deviceRepo, t.deviceService) + return NewFullCollectionTask(logs.AddCompName(baseCtx, CompNameFullCollectionTask), claimedLog, t.deviceRepo, t.deviceOperator) case models.TaskTypeHeartbeat: - return NewHeartbeatTask(logs.AddCompName(baseCtx, CompNameHeartbeatTask), claimedLog, t.areaControllerRepo, t.deviceService) + return NewHeartbeatTask(logs.AddCompName(baseCtx, CompNameHeartbeatTask), claimedLog, t.areaControllerRepo, t.deviceCommunicator) case models.TaskTypeAlarmNotification: return NewAlarmNotificationTask(logs.AddCompName(baseCtx, CompNameAlarmNotification), claimedLog, t.notificationService, t.alarmRepo) case models.TaskTypeDeviceThresholdCheck: @@ -96,12 +99,12 @@ func (t *taskFactory) CreateTaskFromModel(ctx context.Context, taskModel *models tempLog, t.sensorDataRepo, t.deviceRepo, - t.deviceService, + t.deviceOperator, ), nil case models.TaskTypeFullCollection: - return NewFullCollectionTask(logs.AddCompName(baseCtx, CompNameFullCollectionTask), tempLog, t.deviceRepo, t.deviceService), nil + return NewFullCollectionTask(logs.AddCompName(baseCtx, CompNameFullCollectionTask), tempLog, t.deviceRepo, t.deviceOperator), nil case models.TaskTypeHeartbeat: - return NewHeartbeatTask(logs.AddCompName(baseCtx, CompNameHeartbeatTask), tempLog, t.areaControllerRepo, t.deviceService), nil + return NewHeartbeatTask(logs.AddCompName(baseCtx, CompNameHeartbeatTask), tempLog, t.areaControllerRepo, t.deviceCommunicator), nil case models.TaskTypeAlarmNotification: return NewAlarmNotificationTask(logs.AddCompName(baseCtx, CompNameAlarmNotification), tempLog, t.notificationService, t.alarmRepo), nil case models.TaskTypeDeviceThresholdCheck: diff --git a/project_structure.txt b/project_structure.txt index 6aaa887..f227530 100644 --- a/project_structure.txt +++ b/project_structure.txt @@ -40,6 +40,7 @@ design/archive/2025-11-10-exceeding-threshold-alarm/index.md design/archive/2025-11-29-recipe-management/index.md design/ota-upgrade-and-log-monitoring/index.md design/ota-upgrade-and-log-monitoring/lora_refactoring_plan.md +design/ota-upgrade-and-log-monitoring/ota_upgrade_solution.md docs/docs.go docs/swagger.json docs/swagger.yaml @@ -146,6 +147,9 @@ internal/domain/task/heartbeat_task.go internal/domain/task/refresh_notification_task.go internal/domain/task/release_feed_weight_task.go internal/domain/task/task.go +internal/infra/ai/ai.go +internal/infra/ai/gemini.go +internal/infra/ai/no_ai.go internal/infra/config/config.go internal/infra/database/postgres.go internal/infra/database/seeder.go