From 9d9b5f801ff71c20cf38459349333fa52572a9d2 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Wed, 3 Dec 2025 17:34:38 +0800 Subject: [PATCH] =?UTF-8?q?ota=E5=8D=87=E7=BA=A7=E8=B6=85=E6=97=B6?= =?UTF-8?q?=E6=A3=80=E6=9F=A5=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.example.yml | 12 +- config/config.yml | 9 ++ internal/domain/task/ota_check_task.go | 141 ++++++++++++++++++++ internal/infra/config/config.go | 13 ++ internal/infra/models/plan.go | 19 +-- internal/infra/repository/ota_repository.go | 52 ++++++++ internal/infra/transport/proto/device.pb.go | 35 +++-- internal/infra/transport/proto/device.proto | 2 + project_structure.txt | 2 + 9 files changed, 267 insertions(+), 18 deletions(-) create mode 100644 internal/domain/task/ota_check_task.go create mode 100644 internal/infra/repository/ota_repository.go diff --git a/config/config.example.yml b/config/config.example.yml index a640949..f714edc 100644 --- a/config/config.example.yml +++ b/config/config.example.yml @@ -132,4 +132,14 @@ ai: gemini: api_key: "YOUR_GEMINI_API_KEY" # 替换为你的 Gemini API Key model_name: "gemini-2.5-flash" # Gemini 模型名称,例如 "gemini-pro" - timeout: 30 # AI 请求超时时间 (秒) \ No newline at end of file + timeout: 30 # AI 请求超时时间 (秒) + +# OTA 升级配置 +ota: + # 升级任务的全局超时时间(秒)。如果一个升级任务在此时间内没有完成,将被标记为超时。 + default_timeout_seconds: 300 + # 等待设备响应的单次请求超时时间(秒)。例如,下发 PrepareUpdateReq 后等待设备请求文件的超时。 + default_request_timeout_seconds: 60 + # 默认的固件块请求重试次数。 + default_retry_count: 3 + diff --git a/config/config.yml b/config/config.yml index ca5fa1b..6e36057 100644 --- a/config/config.yml +++ b/config/config.yml @@ -111,3 +111,12 @@ ai: api_key: "AIzaSyAJdXUmoN07LIswDac6YxPeRnvXlR73OO8" # 替换为你的 Gemini API Key model_name: "gemini-2.0-flash" # Gemini 模型名称,例如 "gemini-pro" timeout: 30 # AI 请求超时时间 (秒) + +# OTA 升级配置 +ota: + # 升级任务的全局超时时间(秒)。如果一个升级任务在此时间内没有完成,将被标记为超时。 + default_timeout_seconds: 300 + # 等待设备响应的单次请求超时时间(秒)。例如,下发 PrepareUpdateReq 后等待设备请求文件的超时。 + default_request_timeout_seconds: 60 + # 默认的固件块请求重试次数。 + default_retry_count: 3 \ No newline at end of file diff --git a/internal/domain/task/ota_check_task.go b/internal/domain/task/ota_check_task.go new file mode 100644 index 0000000..72f3914 --- /dev/null +++ b/internal/domain/task/ota_check_task.go @@ -0,0 +1,141 @@ +package task + +import ( + "context" + "fmt" + "sync" + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/domain/plan" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" +) + +// OtaCheckTaskParams 定义了 OTA 检查任务所需的参数。 +// 这些参数从任务的 Parameters JSON 字段中解析而来。 +type OtaCheckTaskParams struct { + // TimeoutSeconds 定义了任务的全局超时时间(秒)。 + // 如果一个升级任务在此时间内没有完成,将被标记为超时。 + TimeoutSeconds int `json:"timeout_seconds"` +} + +// otaCheckTask 实现了扫描和处理超时 OTA 升级任务的逻辑。 +type otaCheckTask struct { + ctx context.Context + onceParse sync.Once + + taskLog *models.TaskExecutionLog + params OtaCheckTaskParams + + otaRepo repository.OtaRepository +} + +// NewOtaCheckTask 创建一个新的 otaCheckTask 实例。 +func NewOtaCheckTask( + ctx context.Context, + taskLog *models.TaskExecutionLog, + otaRepo repository.OtaRepository, +) plan.Task { + return &otaCheckTask{ + ctx: ctx, + taskLog: taskLog, + otaRepo: otaRepo, + } +} + +// Execute 是任务的核心执行逻辑。 +func (t *otaCheckTask) Execute(ctx context.Context) error { + taskCtx, logger := logs.Trace(ctx, t.ctx, "Execute") + + // 1. 解析并验证任务参数 + if err := t.parseParameters(taskCtx); err != nil { + return err + } + + logger.Infof("开始执行OTA升级超时检查任务,超时设置为 %d 秒...", t.params.TimeoutSeconds) + + timeoutDuration := time.Duration(t.params.TimeoutSeconds) * time.Second + timeoutBefore := time.Now().Add(-timeoutDuration) + + // 2. 定义需要检查的状态 + inProgressStatuses := []models.OTATaskStatus{ + models.OTATaskStatusInProgress, + } + + // 3. 查找所有超时的、仍在进行中的任务 + tasks, err := t.otaRepo.FindTasksByStatusesAndCreationTime(taskCtx, inProgressStatuses, timeoutBefore) + if err != nil { + logger.Errorf("查找超时的OTA升级任务失败: %v", err) + return fmt.Errorf("查找超时的OTA升级任务失败: %w", err) + } + + if len(tasks) == 0 { + logger.Info("没有发现超时的OTA升级任务。") + return nil + } + + logger.Infof("发现 %d 个超时的OTA升级任务,正在逐一处理...", len(tasks)) + message := fmt.Sprintf("任务因超过全局超时时间(%d秒)未完成而被系统自动标记为超时。", t.params.TimeoutSeconds) + + // 4. 逐一更新任务状态 + for _, task := range tasks { + logger.Warnf("正在处理超时的OTA升级任务: ID=%d, 区域主控ID=%d, 目标版本=%s, 创建于=%v", + task.ID, task.AreaControllerID, task.TargetVersion, task.CreatedAt) + + task.Status = models.OTATaskStatusTimedOut + task.ErrorMessage = message + completedTime := time.Now() + task.CompletedAt = &completedTime + + if err := t.otaRepo.Update(taskCtx, task); err != nil { + // 仅记录错误,不中断整个检查任务,以确保其他超时任务能被处理 + logger.Errorf("更新超时的OTA任务 #%d 状态失败: %v", task.ID, err) + } + } + + logger.Infof("成功处理了 %d 个超时的OTA升级任务。", len(tasks)) + return nil +} + +// parseParameters 使用 sync.Once 确保任务参数只被解析一次。 +func (t *otaCheckTask) parseParameters(ctx context.Context) error { + logger := logs.TraceLogger(ctx, t.ctx, "parseParameters") + var err error + t.onceParse.Do(func() { + if t.taskLog.Task.Parameters == nil { + err = fmt.Errorf("任务 %d: 缺少参数", t.taskLog.TaskID) + logger.Error(err.Error()) + return + } + + var params OtaCheckTaskParams + if pErr := t.taskLog.Task.ParseParameters(¶ms); pErr != nil { + err = fmt.Errorf("任务 %d: 解析参数失败: %w", t.taskLog.TaskID, pErr) + logger.Error(err.Error()) + return + } + + // 验证参数 + if params.TimeoutSeconds <= 0 { + err = fmt.Errorf("任务 %d: 参数 'timeout_seconds' 必须是一个正整数", t.taskLog.TaskID) + logger.Error(err.Error()) + return + } + + t.params = params + }) + return err +} + +// OnFailure 定义了当 Execute 方法返回错误时的回滚或清理逻辑。 +func (t *otaCheckTask) OnFailure(ctx context.Context, executeErr error) { + logger := logs.TraceLogger(ctx, t.ctx, "OnFailure") + logger.Errorf("OTA升级超时检查任务执行失败, 任务ID: %d: %v", t.taskLog.TaskID, executeErr) +} + +// ResolveDeviceIDs 从任务配置中解析并返回所有关联的设备ID列表。 +func (t *otaCheckTask) ResolveDeviceIDs(ctx context.Context) ([]uint32, error) { + // 这是一个系统级的任务,不与任何特定设备直接关联。 + return []uint32{}, nil +} diff --git a/internal/infra/config/config.go b/internal/infra/config/config.go index e97c965..62cd447 100644 --- a/internal/infra/config/config.go +++ b/internal/infra/config/config.go @@ -54,6 +54,9 @@ type Config struct { // AI AI服务配置 AI AIConfig `yaml:"ai"` + + // OTA OTA升级配置 + OTA OTAConfig `yaml:"ota"` } // AppConfig 代表应用基础配置 @@ -248,6 +251,16 @@ type Gemini struct { Timeout int `yaml:"timeout"` // AI 请求超时时间 (秒) } +// OTAConfig 代表 OTA 升级配置 +type OTAConfig struct { + // DefaultTimeoutSeconds 升级任务的全局超时时间(秒) + DefaultTimeoutSeconds int `yaml:"default_timeout_seconds"` + // DefaultRequestTimeoutSeconds 等待设备响应的单次请求超时时间(秒) + DefaultRequestTimeoutSeconds int `yaml:"default_request_timeout_seconds"` + // DefaultRetryCount 默认的固件块请求重试次数 + DefaultRetryCount int `yaml:"default_retry_count"` +} + // NewConfig 创建并返回一个新的配置实例 func NewConfig() *Config { // 默认值可以在这里设置,但我们优先使用配置文件中的值 diff --git a/internal/infra/models/plan.go b/internal/infra/models/plan.go index db24c03..c68265b 100644 --- a/internal/infra/models/plan.go +++ b/internal/infra/models/plan.go @@ -42,15 +42,16 @@ const ( type TaskType string const ( - TaskPlanAnalysis TaskType = "计划分析" // 解析Plan的Task列表并添加到待执行队列的特殊任务 - TaskTypeWaiting TaskType = "等待" // 等待任务 - TaskTypeReleaseFeedWeight TaskType = "下料" // 下料口释放指定重量任务 - TaskTypeFullCollection TaskType = "全量采集" // 新增的全量采集任务 - TaskTypeHeartbeat TaskType = "心跳检测" // 区域主控心跳检测任务 - TaskTypeAlarmNotification TaskType = "告警通知" // 告警通知任务 - TaskTypeNotificationRefresh TaskType = "通知刷新" // 通知刷新任务 - TaskTypeDeviceThresholdCheck TaskType = "设备阈值检查" // 设备阈值检查任务 - TaskTypeAreaCollectorThresholdCheck TaskType = "区域阈值检查" // 区域阈值检查任务 + TaskPlanAnalysis TaskType = "计划分析" // 解析Plan的Task列表并添加到待执行队列的特殊任务 + TaskTypeWaiting TaskType = "等待" // 等待任务 + TaskTypeReleaseFeedWeight TaskType = "下料" // 下料口释放指定重量任务 + TaskTypeFullCollection TaskType = "全量采集" // 新增的全量采集任务 + TaskTypeHeartbeat TaskType = "心跳检测" // 区域主控心跳检测任务 + TaskTypeAlarmNotification TaskType = "告警通知" // 告警通知任务 + TaskTypeNotificationRefresh TaskType = "通知刷新" // 通知刷新任务 + TaskTypeDeviceThresholdCheck TaskType = "设备阈值检查" // 设备阈值检查任务 + TaskTypeAreaCollectorThresholdCheck TaskType = "区域阈值检查" // 区域阈值检查任务 + TaskTypeOTACheck TaskType = "OTA升级检查任务" // OTA升级超时检查任务 ) // -- Task Parameters -- diff --git a/internal/infra/repository/ota_repository.go b/internal/infra/repository/ota_repository.go new file mode 100644 index 0000000..3e712dc --- /dev/null +++ b/internal/infra/repository/ota_repository.go @@ -0,0 +1,52 @@ +package repository + +import ( + "context" + "time" + + "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" + "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" + + "gorm.io/gorm" +) + +// OtaRepository 定义了与 OTA 升级任务相关的数据库操作接口。 +type OtaRepository interface { + // FindTasksByStatusesAndCreationTime 根据状态列表和创建时间查找任务。 + FindTasksByStatusesAndCreationTime(ctx context.Context, statuses []models.OTATaskStatus, createdBefore time.Time) ([]*models.OTATask, error) + // Update 更新单个 OTA 任务。 + Update(ctx context.Context, task *models.OTATask) error +} + +// gormOtaRepository 是 OtaRepository 的 GORM 实现 +type gormOtaRepository struct { + ctx context.Context + db *gorm.DB +} + +// NewGormOtaRepository 创建一个新的 OtaRepository GORM 实现实例 +func NewGormOtaRepository(ctx context.Context, db *gorm.DB) OtaRepository { + return &gormOtaRepository{ + ctx: ctx, + db: db, + } +} + +// FindTasksByStatusesAndCreationTime 实现了根据状态和创建时间查找任务的逻辑。 +func (r *gormOtaRepository) FindTasksByStatusesAndCreationTime(ctx context.Context, + statuses []models.OTATaskStatus, + createdBefore time.Time, +) ([]*models.OTATask, error) { + repoCtx := logs.AddFuncName(ctx, r.ctx, "FindTasksByStatusesAndCreationTime") + var tasks []*models.OTATask + err := r.db.WithContext(repoCtx). + Where("status IN ? AND created_at < ?", statuses, createdBefore). + Find(&tasks).Error + return tasks, err +} + +// Update 实现了更新单个 OTA 任务的逻辑。 +func (r *gormOtaRepository) Update(ctx context.Context, task *models.OTATask) error { + repoCtx := logs.AddFuncName(ctx, r.ctx, "Update") + return r.db.WithContext(repoCtx).Save(task).Error +} diff --git a/internal/infra/transport/proto/device.pb.go b/internal/infra/transport/proto/device.pb.go index 8566649..be07df0 100644 --- a/internal/infra/transport/proto/device.pb.go +++ b/internal/infra/transport/proto/device.pb.go @@ -374,12 +374,14 @@ func (x *Pong) GetFirmwareVersion() string { // PrepareUpdateReq: 平台发送给设备,通知设备准备开始 OTA 升级 (下行) type PrepareUpdateReq struct { - state protoimpl.MessageState `protogen:"open.v1"` - Version string `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"` // 新固件版本号 - TaskId uint32 `protobuf:"varint,2,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"` // 升级任务唯一ID - ManifestMd5 string `protobuf:"bytes,3,opt,name=manifest_md5,json=manifestMd5,proto3" json:"manifest_md5,omitempty"` // 清单文件的 MD5 校验和,用于设备初步校验清单文件完整性 - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Version string `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"` // 新固件版本号 + TaskId uint32 `protobuf:"varint,2,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"` // 升级任务唯一ID + ManifestMd5 string `protobuf:"bytes,3,opt,name=manifest_md5,json=manifestMd5,proto3" json:"manifest_md5,omitempty"` // 清单文件的 MD5 校验和,用于设备初步校验清单文件完整性 + RetryCount uint32 `protobuf:"varint,4,opt,name=retry_count,json=retryCount,proto3" json:"retry_count,omitempty"` // 建议的重试次数 + RequestTimeoutSeconds uint32 `protobuf:"varint,5,opt,name=request_timeout_seconds,json=requestTimeoutSeconds,proto3" json:"request_timeout_seconds,omitempty"` // 建议的单次请求超时时间 + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *PrepareUpdateReq) Reset() { @@ -433,6 +435,20 @@ func (x *PrepareUpdateReq) GetManifestMd5() string { return "" } +func (x *PrepareUpdateReq) GetRetryCount() uint32 { + if x != nil { + return x.RetryCount + } + return 0 +} + +func (x *PrepareUpdateReq) GetRequestTimeoutSeconds() uint32 { + if x != nil { + return x.RequestTimeoutSeconds + } + return 0 +} + // RequestFile: 设备向平台请求特定文件 (包括清单文件和固件文件) (上行) type RequestFile struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -842,11 +858,14 @@ const file_device_proto_rawDesc = "" + "\x06values\x18\x02 \x03(\x02R\x06values\"\x06\n" + "\x04Ping\"1\n" + "\x04Pong\x12)\n" + - "\x10firmware_version\x18\x01 \x01(\tR\x0ffirmwareVersion\"h\n" + + "\x10firmware_version\x18\x01 \x01(\tR\x0ffirmwareVersion\"\xc1\x01\n" + "\x10PrepareUpdateReq\x12\x18\n" + "\aversion\x18\x01 \x01(\tR\aversion\x12\x17\n" + "\atask_id\x18\x02 \x01(\rR\x06taskId\x12!\n" + - "\fmanifest_md5\x18\x03 \x01(\tR\vmanifestMd5\"B\n" + + "\fmanifest_md5\x18\x03 \x01(\tR\vmanifestMd5\x12\x1f\n" + + "\vretry_count\x18\x04 \x01(\rR\n" + + "retryCount\x126\n" + + "\x17request_timeout_seconds\x18\x05 \x01(\rR\x15requestTimeoutSeconds\"B\n" + "\vRequestFile\x12\x17\n" + "\atask_id\x18\x01 \x01(\rR\x06taskId\x12\x1a\n" + "\bfilepath\x18\x02 \x01(\tR\bfilepath\"]\n" + diff --git a/internal/infra/transport/proto/device.proto b/internal/infra/transport/proto/device.proto index b2afbab..72db4f5 100644 --- a/internal/infra/transport/proto/device.proto +++ b/internal/infra/transport/proto/device.proto @@ -48,6 +48,8 @@ message PrepareUpdateReq { string version = 1; // 新固件版本号 uint32 task_id = 2; // 升级任务唯一ID string manifest_md5 = 3; // 清单文件的 MD5 校验和,用于设备初步校验清单文件完整性 + uint32 retry_count = 4; // 建议的重试次数 + uint32 request_timeout_seconds = 5; // 建议的单次请求超时时间 } // RequestFile: 设备向平台请求特定文件 (包括清单文件和固件文件) (上行) diff --git a/project_structure.txt b/project_structure.txt index f227530..9f1b17b 100644 --- a/project_structure.txt +++ b/project_structure.txt @@ -144,6 +144,7 @@ internal/domain/task/delay_task.go internal/domain/task/device_threshold_check_task.go internal/domain/task/full_collection_task.go internal/domain/task/heartbeat_task.go +internal/domain/task/ota_check_task.go internal/domain/task/refresh_notification_task.go internal/domain/task/release_feed_weight_task.go internal/domain/task/task.go @@ -195,6 +196,7 @@ internal/infra/repository/execution_log_repository.go internal/infra/repository/medication_log_repository.go internal/infra/repository/notification_repository.go internal/infra/repository/nutrient_repository.go +internal/infra/repository/ota_repository.go internal/infra/repository/pending_collection_repository.go internal/infra/repository/pending_task_repository.go internal/infra/repository/pig_batch_log_repository.go