Files
pig-farm-controller/internal/infra/repository/plan_repository.go

580 lines
18 KiB
Go
Raw Normal View History

2025-09-13 13:04:19 +08:00
package repository
import (
"encoding/json"
2025-09-13 17:38:02 +08:00
"errors"
2025-09-13 14:53:31 +08:00
"fmt"
2025-09-13 13:04:19 +08:00
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"gorm.io/datatypes"
2025-09-13 13:04:19 +08:00
"gorm.io/gorm"
)
2025-09-13 17:38:02 +08:00
// 定义仓库层可导出的公共错误
var (
2025-09-13 22:21:23 +08:00
ErrUpdateWithInvalidRoot = errors.New("更新操作的目标根计划无效或ID为0")
ErrNewSubPlanInUpdate = errors.New("计划树中包含一个ID为0的新子计划更新操作只允许关联已存在的计划")
ErrNodeDoesNotExist = errors.New("计划树中包含一个或多个在数据库中不存在的计划")
ErrCreateWithNonZeroID = errors.New("创建操作的计划ID必须为0")
ErrSubPlanIDIsZeroOnCreate = errors.New("子计划ID为0创建操作只允许关联已存在的计划")
ErrMixedContent = errors.New("计划不能同时包含任务和子计划")
ErrDeleteWithReferencedPlan = errors.New("禁止删除正在被引用的计划")
2025-09-13 17:38:02 +08:00
)
2025-09-13 13:04:19 +08:00
// PlanRepository 定义了与计划模型相关的数据库操作接口
// 这是为了让业务逻辑层依赖于抽象,而不是具体的数据库实现
type PlanRepository interface {
// ListBasicPlans 获取所有计划的基本信息,不包含子计划和任务详情
ListBasicPlans() ([]models.Plan, error)
// GetBasicPlanByID 根据ID获取计划的基本信息不包含子计划和任务详情
GetBasicPlanByID(id uint) (*models.Plan, error)
2025-09-13 15:00:31 +08:00
// GetPlanByID 根据ID获取计划包含子计划和任务详情
GetPlanByID(id uint) (*models.Plan, error)
2025-09-13 22:21:23 +08:00
// CreatePlan 创建一个新的计划
CreatePlan(plan *models.Plan) error
2025-09-13 17:03:46 +08:00
// UpdatePlan 更新计划,包括子计划和任务
UpdatePlan(plan *models.Plan) error
2025-09-13 22:21:23 +08:00
// DeletePlan 根据ID删除计划同时删除其关联的任务非子任务或子计划关联
DeletePlan(id uint) error
// FlattenPlanTasks 递归展开计划,返回按执行顺序排列的所有任务列表
FlattenPlanTasks(planID uint) ([]models.Task, error)
// DeleteTask 根据ID删除任务
DeleteTask(id int) error
// FindPlanAnalysisTaskByParamsPlanID 根据Parameters中的ParamsPlanID字段值查找TaskPlanAnalysis类型的Task
FindPlanAnalysisTaskByParamsPlanID(paramsPlanID uint) (*models.Task, error)
2025-09-13 13:04:19 +08:00
}
// gormPlanRepository 是 PlanRepository 的 GORM 实现
type gormPlanRepository struct {
db *gorm.DB
}
// NewGormPlanRepository 创建一个新的 PlanRepository GORM 实现实例
func NewGormPlanRepository(db *gorm.DB) PlanRepository {
return &gormPlanRepository{
db: db,
}
}
// ListBasicPlans 获取所有计划的基本信息,不包含子计划和任务详情
func (r *gormPlanRepository) ListBasicPlans() ([]models.Plan, error) {
var plans []models.Plan
// GORM 默认不会加载关联,除非使用 Preload所以直接 Find 即可满足要求
result := r.db.Find(&plans)
if result.Error != nil {
return nil, result.Error
}
return plans, nil
}
// GetBasicPlanByID 根据ID获取计划的基本信息不包含子计划和任务详情
func (r *gormPlanRepository) GetBasicPlanByID(id uint) (*models.Plan, error) {
var plan models.Plan
// GORM 默认不会加载关联,除非使用 Preload所以直接 First 即可满足要求
result := r.db.First(&plan, id)
if result.Error != nil {
return nil, result.Error
}
return &plan, nil
}
2025-09-13 14:53:31 +08:00
// GetPlanByID 根据ID获取计划包含子计划和任务详情
func (r *gormPlanRepository) GetPlanByID(id uint) (*models.Plan, error) {
var plan models.Plan
// 先获取基本计划信息
result := r.db.First(&plan, id)
if result.Error != nil {
return nil, result.Error
}
// 根据内容类型加载关联数据
switch plan.ContentType {
case models.PlanContentTypeSubPlans:
// 加载子计划引用
var subPlans []models.SubPlan
2025-09-13 15:30:10 +08:00
result = r.db.Where("parent_plan_id = ?", plan.ID).Order("execution_order").Find(&subPlans)
2025-09-13 14:53:31 +08:00
if result.Error != nil {
return nil, result.Error
}
// 递归加载每个子计划的完整信息
for i := range subPlans {
childPlan, err := r.GetPlanByID(subPlans[i].ChildPlanID)
if err != nil {
return nil, err
}
subPlans[i].ChildPlan = childPlan
}
plan.SubPlans = subPlans
case models.PlanContentTypeTasks:
// 加载任务
result = r.db.Preload("Tasks", func(taskDB *gorm.DB) *gorm.DB {
2025-09-13 15:30:10 +08:00
return taskDB.Order("execution_order")
}).First(&plan, id)
if result.Error != nil {
return nil, result.Error
}
2025-09-13 14:53:31 +08:00
default:
return nil, fmt.Errorf("未知的计划内容类型: %v; 计划ID: %v", plan.ContentType, plan.ID)
}
return &plan, nil
}
2025-09-13 17:03:46 +08:00
2025-09-13 22:21:23 +08:00
// CreatePlan 创建一个新的计划
func (r *gormPlanRepository) CreatePlan(plan *models.Plan) error {
2025-09-13 20:13:53 +08:00
return r.db.Transaction(func(tx *gorm.DB) error {
// 1. 前置校验
if plan.ID != 0 {
return ErrCreateWithNonZeroID
}
// 检查是否同时包含任务和子计划
if len(plan.Tasks) > 0 && len(plan.SubPlans) > 0 {
return ErrMixedContent
}
// 检查是否有重复的执行顺序
if err := plan.ValidateExecutionOrder(); err != nil {
return fmt.Errorf("计划 (ID: %d) 的执行顺序无效: %w", plan.ID, err)
}
2025-09-13 20:13:53 +08:00
// 如果是子计划类型验证所有子计划是否存在且ID不为0
if plan.ContentType == models.PlanContentTypeSubPlans {
childIDsToValidate := make(map[uint]bool)
for _, subPlanLink := range plan.SubPlans {
2025-09-14 14:45:56 +08:00
if subPlanLink.ChildPlanID == 0 {
2025-09-13 20:13:53 +08:00
return ErrSubPlanIDIsZeroOnCreate
}
2025-09-14 14:45:56 +08:00
childIDsToValidate[subPlanLink.ChildPlanID] = true
2025-09-13 20:13:53 +08:00
}
var ids []uint
for id := range childIDsToValidate {
ids = append(ids, id)
}
if len(ids) > 0 {
var count int64
if err := tx.Model(&models.Plan{}).Where("id IN ?", ids).Count(&count).Error; err != nil {
return fmt.Errorf("验证子计划存在性失败: %w", err)
}
if int(count) != len(ids) {
return ErrNodeDoesNotExist
}
}
}
// 2. 创建根计划
// GORM 会自动处理关联的 Tasks (如果 ContentType 是 tasks 且 Task.ID 为 0)
if err := tx.Create(plan).Error; err != nil {
return err
}
// 3. 创建触发器Task
if err := r.createPlanAnalysisTask(tx, plan); err != nil {
return err
}
2025-09-13 20:13:53 +08:00
return nil
})
}
2025-09-13 17:03:46 +08:00
// UpdatePlan 是更新计划的公共入口点
func (r *gormPlanRepository) UpdatePlan(plan *models.Plan) error {
return r.db.Transaction(func(tx *gorm.DB) error {
return r.updatePlanTx(tx, plan)
})
}
// updatePlanTx 在事务中协调整个更新过程
func (r *gormPlanRepository) updatePlanTx(tx *gorm.DB, plan *models.Plan) error {
if err := r.validatePlanTree(tx, plan); err != nil {
return err
}
if err := r.reconcilePlanNode(tx, plan); err != nil {
return err
}
// 更新Plan触发器
return r.updatePlanAnalysisTask(tx, plan)
2025-09-13 17:03:46 +08:00
}
// validatePlanTree 对整个计划树进行全面的只读健康检查
func (r *gormPlanRepository) validatePlanTree(tx *gorm.DB, plan *models.Plan) error {
// 1. 检查根节点
if plan == nil || plan.ID == 0 {
2025-09-13 17:38:02 +08:00
return ErrUpdateWithInvalidRoot
2025-09-13 17:03:46 +08:00
}
if len(plan.Tasks) > 0 && len(plan.SubPlans) > 0 {
return fmt.Errorf("计划 (ID: %d) 不能同时包含任务和子计划", plan.ID)
}
// 2. 递归验证所有子节点,并检测循环引用
allIDs := make(map[uint]bool)
recursionStack := make(map[uint]bool)
if err := validateNodeAndDetectCycles(plan, allIDs, recursionStack); err != nil {
return err
}
// 3. 检查是否有重复的执行顺序
if err := plan.ValidateExecutionOrder(); err != nil {
return fmt.Errorf("计划 (ID: %d) 的执行顺序无效: %w", plan.ID, err)
}
// 4. 一次性数据库存在性校验
2025-09-13 17:03:46 +08:00
var idsToCheck []uint
for id := range allIDs {
idsToCheck = append(idsToCheck, id)
}
if len(idsToCheck) > 0 {
var count int64
if err := tx.Model(&models.Plan{}).Where("id IN ?", idsToCheck).Count(&count).Error; err != nil {
return fmt.Errorf("检查计划存在性时出错: %w", err)
}
if int(count) != len(idsToCheck) {
2025-09-13 17:38:02 +08:00
return ErrNodeDoesNotExist
2025-09-13 17:03:46 +08:00
}
}
return nil
}
// validateNodeAndDetectCycles 递归地验证节点有效性并检测循环引用
func validateNodeAndDetectCycles(plan *models.Plan, allIDs, recursionStack map[uint]bool) error {
if plan == nil {
return nil
}
if plan.ID == 0 {
2025-09-13 17:38:02 +08:00
return ErrNewSubPlanInUpdate
2025-09-13 17:03:46 +08:00
}
if recursionStack[plan.ID] {
return fmt.Errorf("检测到循环引用:计划 (ID: %d) 是其自身的祖先", plan.ID)
}
if allIDs[plan.ID] {
return nil // 已经验证过这个节点及其子树,无需重复
}
recursionStack[plan.ID] = true
allIDs[plan.ID] = true
for _, subPlanLink := range plan.SubPlans {
if err := validateNodeAndDetectCycles(subPlanLink.ChildPlan, allIDs, recursionStack); err != nil {
return err
}
}
delete(recursionStack, plan.ID) // 回溯,将节点移出当前递归路径
return nil
}
// reconcilePlanNode 递归地同步数据库状态以匹配给定的计划节点
func (r *gormPlanRepository) reconcilePlanNode(tx *gorm.DB, plan *models.Plan) error {
if plan == nil {
return nil
}
// 1. 更新节点本身的基础字段
if err := tx.Model(plan).Select("Name", "Description", "ExecutionType", "ExecuteNum", "CronExpression", "ContentType").Updates(plan).Error; err != nil {
2025-09-13 17:03:46 +08:00
return err
}
// 2. 根据内容类型协调子项
switch plan.ContentType {
case models.PlanContentTypeTasks:
// 清理旧的子计划关联
if err := tx.Where("parent_plan_id = ?", plan.ID).Delete(&models.SubPlan{}).Error; err != nil {
return fmt.Errorf("更新时清理旧的子计划关联失败: %w", err)
}
// 协调任务列表
return r.reconcileTasks(tx, plan)
case models.PlanContentTypeSubPlans:
// 清理旧的任务
if err := tx.Where("plan_id = ?", plan.ID).Delete(&models.Task{}).Error; err != nil {
return fmt.Errorf("更新时清理旧的任务失败: %w", err)
}
// 协调子计划关联
return r.reconcileSubPlans(tx, plan)
}
return nil
}
// reconcileTasks 精确同步任务列表
func (r *gormPlanRepository) reconcileTasks(tx *gorm.DB, plan *models.Plan) error {
var existingTasks []models.Task
if err := tx.Where("plan_id = ?", plan.ID).Find(&existingTasks).Error; err != nil {
return err
}
existingTaskMap := make(map[int]bool)
2025-09-13 17:03:46 +08:00
for _, task := range existingTasks {
existingTaskMap[task.ID] = true
}
for i := range plan.Tasks {
task := &plan.Tasks[i]
if task.ID == 0 {
task.PlanID = plan.ID
if err := tx.Create(task).Error; err != nil {
return err
}
} else {
delete(existingTaskMap, task.ID) // 从待删除map中移除
if err := tx.Model(task).Updates(task).Error; err != nil {
return err
}
}
}
var tasksToDelete []int
2025-09-13 17:03:46 +08:00
for id := range existingTaskMap {
tasksToDelete = append(tasksToDelete, id)
}
if len(tasksToDelete) > 0 {
if err := tx.Delete(&models.Task{}, tasksToDelete).Error; err != nil {
return err
}
}
return nil
}
// reconcileSubPlans 精确同步子计划关联并递归
func (r *gormPlanRepository) reconcileSubPlans(tx *gorm.DB, plan *models.Plan) error {
var existingLinks []models.SubPlan
if err := tx.Where("parent_plan_id = ?", plan.ID).Find(&existingLinks).Error; err != nil {
return err
}
existingLinkMap := make(map[uint]bool)
for _, link := range existingLinks {
existingLinkMap[link.ID] = true
}
for i := range plan.SubPlans {
link := &plan.SubPlans[i]
link.ParentPlanID = plan.ID
if link.ID == 0 {
if err := tx.Create(link).Error; err != nil {
return err
}
} else {
delete(existingLinkMap, link.ID) // 从待删除map中移除
if err := tx.Model(link).Updates(link).Error; err != nil {
return err
}
}
}
var linksToDelete []uint
for id := range existingLinkMap {
linksToDelete = append(linksToDelete, id)
}
if len(linksToDelete) > 0 {
if err := tx.Delete(&models.SubPlan{}, linksToDelete).Error; err != nil {
return err
}
}
return nil
}
2025-09-13 20:13:53 +08:00
2025-09-13 22:21:23 +08:00
// DeletePlan 根据ID删除计划同时删除其关联的任务非子任务或子计划关联
func (r *gormPlanRepository) DeletePlan(id uint) error {
return r.db.Transaction(func(tx *gorm.DB) error {
// 1. 检查该计划是否是其他计划的子计划
var count int64
if err := tx.Model(&models.SubPlan{}).Where("child_plan_id = ?", id).Count(&count).Error; err != nil {
return fmt.Errorf("检查计划是否为子计划失败: %w", err)
}
if count > 0 {
return ErrDeleteWithReferencedPlan
}
var plan models.Plan
// 2. 获取计划以确定其内容类型
if err := tx.First(&plan, id).Error; err != nil {
return err
}
// 3. 根据内容类型删除关联数据
switch plan.ContentType {
case models.PlanContentTypeTasks:
// 删除与此计划关联的所有非子任务
if err := tx.Where("plan_id = ?", id).Delete(&models.Task{}).Error; err != nil {
2025-09-13 22:21:23 +08:00
return fmt.Errorf("删除计划ID %d 的任务失败: %w", id, err)
}
case models.PlanContentTypeSubPlans:
// 删除与此计划关联的所有子计划链接
if err := tx.Where("parent_plan_id = ?", id).Delete(&models.SubPlan{}).Error; err != nil {
return fmt.Errorf("删除计划ID %d 的子计划关联失败: %w", id, err)
}
}
// 4. 删除计划本身
if err := tx.Delete(&models.Plan{}, id).Error; err != nil {
return fmt.Errorf("删除计划ID %d 失败: %w", id, err)
}
return nil
})
}
// FlattenPlanTasks 递归展开计划,返回按执行顺序排列的所有任务列表
func (r *gormPlanRepository) FlattenPlanTasks(planID uint) ([]models.Task, error) {
plan, err := r.GetPlanByID(planID)
if err != nil {
return nil, fmt.Errorf("获取计划(ID: %d)失败: %w", planID, err)
}
return r.flattenPlanTasksRecursive(plan)
}
// flattenPlanTasksRecursive 递归展开计划的内部实现
func (r *gormPlanRepository) flattenPlanTasksRecursive(plan *models.Plan) ([]models.Task, error) {
var tasks []models.Task
switch plan.ContentType {
case models.PlanContentTypeTasks:
// 如果计划直接包含任务,直接返回这些任务
// 由于GetPlanByID已经预加载并排序了任务这里直接使用即可
tasks = append(tasks, plan.Tasks...)
case models.PlanContentTypeSubPlans:
// 如果计划包含子计划,则递归处理每个子计划
for _, subPlan := range plan.SubPlans {
// 获取子计划的任务列表
var subTasks []models.Task
var err error
// 确保子计划已经被加载
if subPlan.ChildPlan != nil {
subTasks, err = r.flattenPlanTasksRecursive(subPlan.ChildPlan)
} else {
// 如果子计划未加载,则从数据库获取并递归展开
subTasks, err = r.FlattenPlanTasks(subPlan.ChildPlanID)
}
if err != nil {
return nil, fmt.Errorf("展开子计划(ID: %d)失败: %w", subPlan.ChildPlanID, err)
}
// 将子计划的任务添加到结果中
tasks = append(tasks, subTasks...)
}
default:
return nil, fmt.Errorf("未知的计划内容类型: %v", plan.ContentType)
}
return tasks, nil
}
// DeleteTask 根据ID删除任务
func (r *gormPlanRepository) DeleteTask(id int) error {
// 使用事务确保操作的原子性
return r.db.Transaction(func(tx *gorm.DB) error {
return r.deleteTask(tx, id)
})
}
// deleteTask 根据ID删除任务
func (r *gormPlanRepository) deleteTask(tx *gorm.DB, id int) error {
// 1. 检查是否有待执行任务引用了这个任务
var pendingTaskCount int64
if err := tx.Model(&models.PendingTask{}).Where("task_id = ?", id).Count(&pendingTaskCount).Error; err != nil {
return fmt.Errorf("检查待执行任务时出错: %w", err)
}
// 如果有待执行任务引用该任务,不能删除
if pendingTaskCount > 0 {
return fmt.Errorf("无法删除任务(ID: %d),因为存在 %d 条待执行任务引用该任务", id, pendingTaskCount)
}
// 2. 检查是否有计划仍在使用这个任务
var planCount int64
if err := tx.Model(&models.Plan{}).Joins("JOIN tasks ON plans.id = tasks.plan_id").Where("tasks.id = ?", id).Count(&planCount).Error; err != nil {
return fmt.Errorf("检查计划引用任务时出错: %w", err)
}
// 如果有计划在使用该任务,不能删除
if planCount > 0 {
return fmt.Errorf("无法删除任务(ID: %d),因为存在 %d 个计划仍在使用该任务", id, planCount)
}
// 3. 执行删除操作
result := tx.Delete(&models.Task{}, id)
if result.Error != nil {
return fmt.Errorf("删除任务失败: %w", result.Error)
}
// 检查是否实际删除了记录
if result.RowsAffected == 0 {
return gorm.ErrRecordNotFound
}
return nil
}
// FindPlanAnalysisTaskByParamsPlanID 根据Parameters中的ParamsPlanID字段值查找TaskPlanAnalysis类型的Task
func (r *gormPlanRepository) FindPlanAnalysisTaskByParamsPlanID(paramsPlanID uint) (*models.Task, error) {
return r.findPlanAnalysisTaskByParamsPlanID(r.db, paramsPlanID)
}
// findPlanAnalysisTaskByParamsPlanID 使用指定db根据Parameters中的ParamsPlanID字段值查找TaskPlanAnalysis类型的Task
func (r *gormPlanRepository) findPlanAnalysisTaskByParamsPlanID(tx *gorm.DB, paramsPlanID uint) (*models.Task, error) {
var task models.Task
// 构造JSON查询条件查找Parameters中包含指定ParamsPlanID且Type为TaskPlanAnalysis的任务
// TODO 在JSON字段中查找特定键值的语法取决于数据库类型这里使用PostgreSQL的语法
// TODO 如果使用的是MySQL则需要相应调整查询条件
result := tx.Where(
"type = ? AND parameters->>'plan_id' = ?",
models.TaskPlanAnalysis,
fmt.Sprintf("%d", paramsPlanID),
).First(&task)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, fmt.Errorf("未找到Parameters.PlanID为%d的TaskPlanAnalysis类型任务", paramsPlanID)
}
return nil, fmt.Errorf("查找任务时出错: %w", result.Error)
}
return &task, nil
}
// createPlanAnalysisTask 用于创建一个TaskPlanAnalysis类型的Task
func (r *gormPlanRepository) createPlanAnalysisTask(tx *gorm.DB, plan *models.Plan) error {
m := map[string]interface{}{
models.ParamsPlanID: plan.ID,
}
parameters, err := json.Marshal(m)
if err != nil {
return err
}
task := &models.Task{
PlanID: plan.ID,
Name: fmt.Sprintf("'%v'计划触发器", plan.Name),
Description: fmt.Sprintf("计划名: %v, 计划ID: %v", plan.Name, plan.ID),
ExecutionOrder: 0,
Type: models.TaskPlanAnalysis,
Parameters: datatypes.JSON(parameters),
}
return tx.Create(task).Error
}
// updatePlanAnalysisTask 使用简单粗暴的删除再创建方式实现更新, 以控制AnalysisPlanTask的定义全部在createPlanAnalysisTask方法中
func (r *gormPlanRepository) updatePlanAnalysisTask(tx *gorm.DB, plan *models.Plan) error {
task, err := r.findPlanAnalysisTaskByParamsPlanID(tx, plan.ID)
if err != nil {
return err
}
err = r.deleteTask(tx, task.ID)
if err != nil {
return err
}
return r.createPlanAnalysisTask(tx, plan)
}