From 19d55eb09b534fa6f609cb5082c4b25836fd4545 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Mon, 10 Nov 2025 14:11:39 +0800 Subject: [PATCH] =?UTF-8?q?=E7=B3=BB=E7=BB=9F=E5=88=9D=E5=A7=8B=E5=8C=96?= =?UTF-8?q?=E6=97=B6=E5=81=A5=E5=BA=B7=E8=AE=A1=E5=88=92=E8=B0=83=E6=95=B4?= =?UTF-8?q?(=E5=8C=85=E6=8B=AC=E5=A2=9E=E5=8A=A0=E5=BB=B6=E6=97=B6?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- design/exceeding-threshold-alarm/index.md | 3 +- internal/core/data_initializer.go | 51 ++++++++++++++++--- .../domain/task/release_feed_weight_task.go | 3 +- internal/infra/models/plan.go | 14 +++++ 4 files changed, 60 insertions(+), 11 deletions(-) diff --git a/design/exceeding-threshold-alarm/index.md b/design/exceeding-threshold-alarm/index.md index b0eae8e..621db95 100644 --- a/design/exceeding-threshold-alarm/index.md +++ b/design/exceeding-threshold-alarm/index.md @@ -137,4 +137,5 @@ 5. 实现告警通知发送计划/全量采集计划改名 6. 实现设备阈值检查任务 7. 实现忽略告警和取消忽略告警接口及功能 -8. 实现列表查询活跃告警和历史告警 \ No newline at end of file +8. 实现列表查询活跃告警和历史告警 +9. 系统初始化时健康计划调整(包括增加延时任务) \ No newline at end of file diff --git a/internal/core/data_initializer.go b/internal/core/data_initializer.go index c5ffe06..32edd75 100644 --- a/internal/core/data_initializer.go +++ b/internal/core/data_initializer.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "git.huangwc.com/pig/pig-farm-controller/internal/domain/task" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" @@ -93,6 +94,47 @@ func (app *Application) initializePeriodicSystemHealthCheckPlan(ctx context.Cont interval = 1 // 确保间隔至少为1分钟 } cronExpression := fmt.Sprintf("*/%d * * * *", interval) + + // 定义预设的全量采集任务 + fullCollectionTask := models.Task{ + Name: "全量采集", + Description: "触发一次全量数据采集", + ExecutionOrder: 1, + Type: models.TaskTypeFullCollection, + } + + // 定义预设的延时任务 + delayParams := task.DelayTaskParams{DelayDuration: 10} // 延时10秒 + delayTask := models.Task{ + Name: "延时任务", + Description: "系统预设延时任务,用于错峰处理", + ExecutionOrder: 2, + Type: models.TaskTypeWaiting, + } + err := delayTask.SaveParameters(delayParams) + if err != nil { + return fmt.Errorf("序列化延时任务参数失败: %w", err) + } + + // 构建新的任务列表 + var newTasks []models.Task + newTasks = append(newTasks, fullCollectionTask, delayTask) + + // 如果计划已存在,则获取其现有任务并追加到新任务列表后(排除预设任务) + if foundExistingPlan, ok := existingPlanMap[PlanNamePeriodicSystemHealthCheck]; ok { + for _, existingTask := range foundExistingPlan.Tasks { + // 排除已预设的全量采集和延时任务 + if existingTask.Type != models.TaskTypeFullCollection && existingTask.Type != models.TaskTypeWaiting { + newTasks = append(newTasks, existingTask) + } + } + } + + // 重新设置所有任务的 ExecutionOrder + for i := range newTasks { + newTasks[i].ExecutionOrder = i + 1 + } + predefinedPlan := &models.Plan{ Name: PlanNamePeriodicSystemHealthCheck, Description: fmt.Sprintf("这是一个系统预定义的计划, 每 %d 分钟自动触发一次全量数据采集, 并进行阈值校验告警。", app.Config.Collection.Interval), @@ -101,14 +143,7 @@ func (app *Application) initializePeriodicSystemHealthCheckPlan(ctx context.Cont CronExpression: cronExpression, Status: models.PlanStatusEnabled, ContentType: models.PlanContentTypeTasks, - Tasks: []models.Task{ - { - Name: "全量采集", - Description: "触发一次全量数据采集", - ExecutionOrder: 1, - Type: models.TaskTypeFullCollection, - }, - }, + Tasks: newTasks, } if foundExistingPlan, ok := existingPlanMap[predefinedPlan.Name]; ok { diff --git a/internal/domain/task/release_feed_weight_task.go b/internal/domain/task/release_feed_weight_task.go index 530e929..5e2b30c 100644 --- a/internal/domain/task/release_feed_weight_task.go +++ b/internal/domain/task/release_feed_weight_task.go @@ -2,7 +2,6 @@ package task import ( "context" - "encoding/json" "fmt" "sync" "time" @@ -114,7 +113,7 @@ func (r *ReleaseFeedWeightTask) getNowWeight(ctx context.Context) (float64, erro } wg := &models.WeightData{} - err = json.Unmarshal(sensorData.Data, wg) + err = sensorData.ParseData(wg) if err != nil { logger.Errorf("反序列化设备 %v 最新传感器数据失败: %v , 日志ID: %v", r.mixingTankDeviceID, err, r.claimedLog.ID) return 0, err diff --git a/internal/infra/models/plan.go b/internal/infra/models/plan.go index 3c7e5f7..0e74684 100644 --- a/internal/infra/models/plan.go +++ b/internal/infra/models/plan.go @@ -202,6 +202,20 @@ func (t Task) ParseParameters(v interface{}) error { return json.Unmarshal(t.Parameters, v) } +// SaveParameters 将一个结构体序列化为 JSON 并保存到 Task 的 Parameters 字段。 +// 示例: +// +// params := LoraParameters{...} +// if err := task.SaveParameters(params); err != nil { ... } +func (t *Task) SaveParameters(v interface{}) error { + data, err := json.Marshal(v) + if err != nil { + return fmt.Errorf("序列化任务参数失败: %w", err) + } + t.Parameters = data + return nil +} + // DeviceTask 是设备和任务之间的关联模型,表示一个设备可以执行多个任务,一个任务可以被多个设备执行。 type DeviceTask struct { gorm.Model