系统初始化时健康计划调整(包括增加延时任务)
This commit is contained in:
@@ -137,4 +137,5 @@
|
|||||||
5. 实现告警通知发送计划/全量采集计划改名
|
5. 实现告警通知发送计划/全量采集计划改名
|
||||||
6. 实现设备阈值检查任务
|
6. 实现设备阈值检查任务
|
||||||
7. 实现忽略告警和取消忽略告警接口及功能
|
7. 实现忽略告警和取消忽略告警接口及功能
|
||||||
8. 实现列表查询活跃告警和历史告警
|
8. 实现列表查询活跃告警和历史告警
|
||||||
|
9. 系统初始化时健康计划调整(包括增加延时任务)
|
||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"git.huangwc.com/pig/pig-farm-controller/internal/domain/task"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
||||||
@@ -93,6 +94,47 @@ func (app *Application) initializePeriodicSystemHealthCheckPlan(ctx context.Cont
|
|||||||
interval = 1 // 确保间隔至少为1分钟
|
interval = 1 // 确保间隔至少为1分钟
|
||||||
}
|
}
|
||||||
cronExpression := fmt.Sprintf("*/%d * * * *", interval)
|
cronExpression := fmt.Sprintf("*/%d * * * *", interval)
|
||||||
|
|
||||||
|
// 定义预设的全量采集任务
|
||||||
|
fullCollectionTask := models.Task{
|
||||||
|
Name: "全量采集",
|
||||||
|
Description: "触发一次全量数据采集",
|
||||||
|
ExecutionOrder: 1,
|
||||||
|
Type: models.TaskTypeFullCollection,
|
||||||
|
}
|
||||||
|
|
||||||
|
// 定义预设的延时任务
|
||||||
|
delayParams := task.DelayTaskParams{DelayDuration: 10} // 延时10秒
|
||||||
|
delayTask := models.Task{
|
||||||
|
Name: "延时任务",
|
||||||
|
Description: "系统预设延时任务,用于错峰处理",
|
||||||
|
ExecutionOrder: 2,
|
||||||
|
Type: models.TaskTypeWaiting,
|
||||||
|
}
|
||||||
|
err := delayTask.SaveParameters(delayParams)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("序列化延时任务参数失败: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 构建新的任务列表
|
||||||
|
var newTasks []models.Task
|
||||||
|
newTasks = append(newTasks, fullCollectionTask, delayTask)
|
||||||
|
|
||||||
|
// 如果计划已存在,则获取其现有任务并追加到新任务列表后(排除预设任务)
|
||||||
|
if foundExistingPlan, ok := existingPlanMap[PlanNamePeriodicSystemHealthCheck]; ok {
|
||||||
|
for _, existingTask := range foundExistingPlan.Tasks {
|
||||||
|
// 排除已预设的全量采集和延时任务
|
||||||
|
if existingTask.Type != models.TaskTypeFullCollection && existingTask.Type != models.TaskTypeWaiting {
|
||||||
|
newTasks = append(newTasks, existingTask)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 重新设置所有任务的 ExecutionOrder
|
||||||
|
for i := range newTasks {
|
||||||
|
newTasks[i].ExecutionOrder = i + 1
|
||||||
|
}
|
||||||
|
|
||||||
predefinedPlan := &models.Plan{
|
predefinedPlan := &models.Plan{
|
||||||
Name: PlanNamePeriodicSystemHealthCheck,
|
Name: PlanNamePeriodicSystemHealthCheck,
|
||||||
Description: fmt.Sprintf("这是一个系统预定义的计划, 每 %d 分钟自动触发一次全量数据采集, 并进行阈值校验告警。", app.Config.Collection.Interval),
|
Description: fmt.Sprintf("这是一个系统预定义的计划, 每 %d 分钟自动触发一次全量数据采集, 并进行阈值校验告警。", app.Config.Collection.Interval),
|
||||||
@@ -101,14 +143,7 @@ func (app *Application) initializePeriodicSystemHealthCheckPlan(ctx context.Cont
|
|||||||
CronExpression: cronExpression,
|
CronExpression: cronExpression,
|
||||||
Status: models.PlanStatusEnabled,
|
Status: models.PlanStatusEnabled,
|
||||||
ContentType: models.PlanContentTypeTasks,
|
ContentType: models.PlanContentTypeTasks,
|
||||||
Tasks: []models.Task{
|
Tasks: newTasks,
|
||||||
{
|
|
||||||
Name: "全量采集",
|
|
||||||
Description: "触发一次全量数据采集",
|
|
||||||
ExecutionOrder: 1,
|
|
||||||
Type: models.TaskTypeFullCollection,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if foundExistingPlan, ok := existingPlanMap[predefinedPlan.Name]; ok {
|
if foundExistingPlan, ok := existingPlanMap[predefinedPlan.Name]; ok {
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package task
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -114,7 +113,7 @@ func (r *ReleaseFeedWeightTask) getNowWeight(ctx context.Context) (float64, erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
wg := &models.WeightData{}
|
wg := &models.WeightData{}
|
||||||
err = json.Unmarshal(sensorData.Data, wg)
|
err = sensorData.ParseData(wg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Errorf("反序列化设备 %v 最新传感器数据失败: %v , 日志ID: %v", r.mixingTankDeviceID, err, r.claimedLog.ID)
|
logger.Errorf("反序列化设备 %v 最新传感器数据失败: %v , 日志ID: %v", r.mixingTankDeviceID, err, r.claimedLog.ID)
|
||||||
return 0, err
|
return 0, err
|
||||||
|
|||||||
@@ -202,6 +202,20 @@ func (t Task) ParseParameters(v interface{}) error {
|
|||||||
return json.Unmarshal(t.Parameters, v)
|
return json.Unmarshal(t.Parameters, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SaveParameters 将一个结构体序列化为 JSON 并保存到 Task 的 Parameters 字段。
|
||||||
|
// 示例:
|
||||||
|
//
|
||||||
|
// params := LoraParameters{...}
|
||||||
|
// if err := task.SaveParameters(params); err != nil { ... }
|
||||||
|
func (t *Task) SaveParameters(v interface{}) error {
|
||||||
|
data, err := json.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("序列化任务参数失败: %w", err)
|
||||||
|
}
|
||||||
|
t.Parameters = data
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// DeviceTask 是设备和任务之间的关联模型,表示一个设备可以执行多个任务,一个任务可以被多个设备执行。
|
// DeviceTask 是设备和任务之间的关联模型,表示一个设备可以执行多个任务,一个任务可以被多个设备执行。
|
||||||
type DeviceTask struct {
|
type DeviceTask struct {
|
||||||
gorm.Model
|
gorm.Model
|
||||||
|
|||||||
Reference in New Issue
Block a user