Files
pig-farm-controller/internal/domain/plan/plan_service.go

421 lines
15 KiB
Go
Raw Normal View History

2025-11-02 19:33:05 +08:00
package plan
import (
2025-11-05 21:40:19 +08:00
"context"
2025-11-02 23:51:45 +08:00
"errors"
"fmt"
2025-11-02 23:51:45 +08:00
2025-11-02 19:33:05 +08:00
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
2025-11-02 23:51:45 +08:00
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
2025-11-05 21:40:19 +08:00
2025-11-02 23:51:45 +08:00
"gorm.io/gorm"
)
var (
// ErrPlanNotFound 表示未找到计划
ErrPlanNotFound = errors.New("计划不存在")
// ErrPlanCannotBeModified 表示计划不允许修改
ErrPlanCannotBeModified = errors.New("系统计划不允许修改")
// ErrPlanCannotBeDeleted 表示计划不允许删除
ErrPlanCannotBeDeleted = errors.New("系统计划不允许删除")
// ErrPlanCannotBeStarted 表示计划不允许手动启动
ErrPlanCannotBeStarted = errors.New("系统计划不允许手动启动")
// ErrPlanAlreadyEnabled 表示计划已处于启动状态
ErrPlanAlreadyEnabled = errors.New("计划已处于启动状态,无需重复操作")
// ErrPlanNotEnabled 表示计划未处于启动状态
ErrPlanNotEnabled = errors.New("计划当前不是启用状态")
// ErrPlanCannotBeStopped 表示计划不允许停止
ErrPlanCannotBeStopped = errors.New("系统计划不允许停止")
2025-11-02 19:33:05 +08:00
)
// Service 定义了计划领域服务的接口。
type Service interface {
// Start 启动计划相关的后台服务,例如计划执行管理器。
2025-11-05 21:40:19 +08:00
Start(ctx context.Context)
2025-11-02 19:33:05 +08:00
// Stop 停止计划相关的后台服务,例如计划执行管理器。
2025-11-05 21:40:19 +08:00
Stop(ctx context.Context)
2025-11-02 19:33:05 +08:00
// RefreshPlanTriggers 刷新计划触发器,同步数据库中的计划状态和待执行队列中的触发器任务。
2025-11-05 21:40:19 +08:00
RefreshPlanTriggers(ctx context.Context) error
2025-11-02 23:51:45 +08:00
// CreatePlan 创建一个新的计划
2025-11-05 21:40:19 +08:00
CreatePlan(ctx context.Context, plan *models.Plan) (*models.Plan, error)
2025-11-02 23:51:45 +08:00
// GetPlanByID 根据ID获取计划详情
2025-11-10 22:23:31 +08:00
GetPlanByID(ctx context.Context, id uint32) (*models.Plan, error)
2025-11-02 23:51:45 +08:00
// ListPlans 获取计划列表,支持过滤和分页
2025-11-05 21:40:19 +08:00
ListPlans(ctx context.Context, opts repository.ListPlansOptions, page, pageSize int) ([]models.Plan, int64, error)
// UpdatePlan 更新计划, wantPlanType 表示期望被修改的计划是什么类型
UpdatePlan(ctx context.Context, plan *models.Plan, wantPlanType models.PlanType) (*models.Plan, error)
2025-11-02 23:51:45 +08:00
// DeletePlan 删除计划(软删除)
2025-11-10 22:23:31 +08:00
DeletePlan(ctx context.Context, id uint32) error
2025-11-02 23:51:45 +08:00
// StartPlan 启动计划
2025-11-10 22:23:31 +08:00
StartPlan(ctx context.Context, id uint32) error
2025-11-02 23:51:45 +08:00
// StopPlan 停止计划
2025-11-10 22:23:31 +08:00
StopPlan(ctx context.Context, id uint32) error
2025-11-02 19:33:05 +08:00
}
// planServiceImpl 是 Service 接口的具体实现。
type planServiceImpl struct {
2025-11-05 21:40:19 +08:00
ctx context.Context
2025-11-02 19:33:05 +08:00
executionManager ExecutionManager
taskManager AnalysisPlanTaskManager
planRepo repository.PlanRepository
deviceRepo repository.DeviceRepository
unitOfWork repository.UnitOfWork
taskFactory TaskFactory
2025-11-02 19:33:05 +08:00
}
// NewPlanService 创建一个新的 Service 实例。
func NewPlanService(
2025-11-05 21:40:19 +08:00
ctx context.Context,
2025-11-02 19:33:05 +08:00
executionManager ExecutionManager,
taskManager AnalysisPlanTaskManager,
planRepo repository.PlanRepository,
deviceRepo repository.DeviceRepository,
unitOfWork repository.UnitOfWork,
taskFactory TaskFactory,
2025-11-02 19:33:05 +08:00
) Service {
return &planServiceImpl{
2025-11-05 21:40:19 +08:00
ctx: ctx,
2025-11-02 19:33:05 +08:00
executionManager: executionManager,
taskManager: taskManager,
planRepo: planRepo,
deviceRepo: deviceRepo,
unitOfWork: unitOfWork,
taskFactory: taskFactory,
2025-11-02 19:33:05 +08:00
}
}
// Start 启动计划相关的后台服务。
2025-11-05 21:40:19 +08:00
func (s *planServiceImpl) Start(ctx context.Context) {
planCtx, logger := logs.Trace(ctx, s.ctx, "Start")
logger.Infof("PlanService 正在启动...")
s.executionManager.Start(planCtx)
2025-11-02 19:33:05 +08:00
}
// Stop 停止计划相关的后台服务。
2025-11-05 21:40:19 +08:00
func (s *planServiceImpl) Stop(ctx context.Context) {
planCtx, logger := logs.Trace(ctx, s.ctx, "Stop")
logger.Infof("PlanService 正在停止...")
s.executionManager.Stop(planCtx)
2025-11-02 19:33:05 +08:00
}
// RefreshPlanTriggers 刷新计划触发器。
2025-11-05 21:40:19 +08:00
func (s *planServiceImpl) RefreshPlanTriggers(ctx context.Context) error {
planCtx, logger := logs.Trace(ctx, s.ctx, "RefreshPlanTriggers")
logger.Infof("PlanService 正在刷新计划触发器...")
return s.taskManager.Refresh(planCtx)
2025-11-02 19:33:05 +08:00
}
2025-11-02 23:51:45 +08:00
// CreatePlan 创建一个新的计划
2025-11-05 21:40:19 +08:00
func (s *planServiceImpl) CreatePlan(ctx context.Context, planToCreate *models.Plan) (*models.Plan, error) {
planCtx, logger := logs.Trace(ctx, s.ctx, "CreatePlan")
2025-11-02 23:51:45 +08:00
const actionType = "领域层:创建计划"
// 1. 业务规则处理
// 用户创建的计划永远是自定义计划
planToCreate.PlanType = models.PlanTypeCustom
// 自动判断 ContentType
if len(planToCreate.SubPlans) > 0 {
planToCreate.ContentType = models.PlanContentTypeSubPlans
} else {
planToCreate.ContentType = models.PlanContentTypeTasks
}
// 2. 验证和重排顺序 (领域逻辑)
if err := planToCreate.ValidateExecutionOrder(); err != nil {
2025-11-05 21:40:19 +08:00
logger.Errorf("%s: 计划 (ID: %d) 的执行顺序无效: %v", actionType, planToCreate.ID, err)
2025-11-02 23:51:45 +08:00
return nil, err
}
planToCreate.ReorderSteps()
// 3. 在调用仓库前,准备好所有数据,包括设备关联
for i := range planToCreate.Tasks {
taskModel := &planToCreate.Tasks[i]
// 使用工厂创建临时领域对象
2025-11-05 21:40:19 +08:00
taskResolver, err := s.taskFactory.CreateTaskFromModel(planCtx, taskModel)
if err != nil {
// 如果一个任务类型不支持,我们可以选择跳过或报错
2025-11-05 21:40:19 +08:00
logger.Warnf("跳过为任务类型 '%s' 解析设备ID: %v", taskModel.Type, err)
continue
}
2025-11-05 21:40:19 +08:00
deviceIDs, err := taskResolver.ResolveDeviceIDs(planCtx)
if err != nil {
// 在事务外解析失败,直接返回错误
return nil, fmt.Errorf("为任务 '%s' 提取设备ID失败: %w", taskModel.Name, err)
}
if len(deviceIDs) > 0 {
// 优化无需查询完整的设备对象只需构建包含ID的结构体即可建立关联
devices := make([]models.Device, len(deviceIDs))
for i, id := range deviceIDs {
2025-11-10 22:23:31 +08:00
devices[i] = models.Device{Model: models.Model{ID: id}}
}
taskModel.Devices = devices
}
}
// 4. 调用仓库方法创建计划,该方法内部会处理事务
2025-11-05 21:40:19 +08:00
err := s.planRepo.CreatePlan(planCtx, planToCreate)
if err != nil {
2025-11-05 21:40:19 +08:00
logger.Errorf("%s: 数据库创建计划失败: %v", actionType, err)
2025-11-02 23:51:45 +08:00
return nil, err
}
// 5. 创建成功后,调用 manager 确保触发器任务定义存在,但不立即加入待执行队列
2025-11-05 21:40:19 +08:00
if err := s.taskManager.EnsureAnalysisTaskDefinition(planCtx, planToCreate.ID); err != nil {
2025-11-02 23:51:45 +08:00
// 这是一个非阻塞性错误,我们只记录日志,因为主流程(创建计划)已经成功
2025-11-05 21:40:19 +08:00
logger.Errorf("为新创建的计划 %d 确保触发器任务定义失败: %v", planToCreate.ID, err)
2025-11-02 23:51:45 +08:00
}
2025-11-05 21:40:19 +08:00
logger.Infof("%s: 计划创建成功, ID: %d", actionType, planToCreate.ID)
2025-11-02 23:51:45 +08:00
return planToCreate, nil
}
// GetPlanByID 根据ID获取计划详情
2025-11-10 22:23:31 +08:00
func (s *planServiceImpl) GetPlanByID(ctx context.Context, id uint32) (*models.Plan, error) {
2025-11-05 21:40:19 +08:00
planCtx, logger := logs.Trace(ctx, s.ctx, "GetPlanByID")
2025-11-02 23:51:45 +08:00
const actionType = "领域层:获取计划详情"
2025-11-05 21:40:19 +08:00
plan, err := s.planRepo.GetPlanByID(planCtx, id)
2025-11-02 23:51:45 +08:00
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
2025-11-05 21:40:19 +08:00
logger.Warnf("%s: 计划不存在, ID: %d", actionType, id)
2025-11-02 23:51:45 +08:00
return nil, ErrPlanNotFound
}
2025-11-05 21:40:19 +08:00
logger.Errorf("%s: 数据库查询失败: %v, ID: %d", actionType, err, id)
2025-11-02 23:51:45 +08:00
return nil, err
}
2025-11-05 21:40:19 +08:00
logger.Infof("%s: 获取计划详情成功, ID: %d", actionType, id)
2025-11-02 23:51:45 +08:00
return plan, nil
}
// ListPlans 获取计划列表,支持过滤和分页
2025-11-05 21:40:19 +08:00
func (s *planServiceImpl) ListPlans(ctx context.Context, opts repository.ListPlansOptions, page, pageSize int) ([]models.Plan, int64, error) {
planCtx, logger := logs.Trace(ctx, s.ctx, "ListPlans")
2025-11-02 23:51:45 +08:00
const actionType = "领域层:获取计划列表"
2025-11-05 21:40:19 +08:00
plans, total, err := s.planRepo.ListPlans(planCtx, opts, page, pageSize)
2025-11-02 23:51:45 +08:00
if err != nil {
2025-11-05 21:40:19 +08:00
logger.Errorf("%s: 数据库查询失败: %v", actionType, err)
2025-11-02 23:51:45 +08:00
return nil, 0, err
}
2025-11-05 21:40:19 +08:00
logger.Infof("%s: 获取计划列表成功, 数量: %d", actionType, len(plans))
2025-11-02 23:51:45 +08:00
return plans, total, nil
}
// UpdatePlan 更新计划, wantPlanType 表示期望被修改的计划是什么类型
func (s *planServiceImpl) UpdatePlan(ctx context.Context, planToUpdate *models.Plan, wantPlanType models.PlanType) (*models.Plan, error) {
2025-11-05 21:40:19 +08:00
planCtx, logger := logs.Trace(ctx, s.ctx, "UpdatePlan")
2025-11-02 23:51:45 +08:00
const actionType = "领域层:更新计划"
2025-11-05 21:40:19 +08:00
existingPlan, err := s.planRepo.GetBasicPlanByID(planCtx, planToUpdate.ID)
2025-11-02 23:51:45 +08:00
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
2025-11-05 21:40:19 +08:00
logger.Warnf("%s: 计划不存在, ID: %d", actionType, planToUpdate.ID)
2025-11-02 23:51:45 +08:00
return nil, ErrPlanNotFound
}
2025-11-05 21:40:19 +08:00
logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, planToUpdate.ID)
2025-11-02 23:51:45 +08:00
return nil, err
}
if existingPlan.PlanType != wantPlanType {
logger.Warnf("%s: 禁止修改 %v 类型计划, ID: %d", actionType, wantPlanType, planToUpdate.ID)
2025-11-02 23:51:45 +08:00
return nil, ErrPlanCannotBeModified
}
// 自动判断 ContentType
if len(planToUpdate.SubPlans) > 0 {
planToUpdate.ContentType = models.PlanContentTypeSubPlans
} else {
planToUpdate.ContentType = models.PlanContentTypeTasks
}
// 验证和重排顺序 (领域逻辑)
if err := planToUpdate.ValidateExecutionOrder(); err != nil {
2025-11-05 21:40:19 +08:00
logger.Errorf("%s: 计划 (ID: %d) 的执行顺序无效: %v", actionType, planToUpdate.ID, err)
2025-11-02 23:51:45 +08:00
return nil, err
}
planToUpdate.ReorderSteps()
// 只要是更新任务,就重置执行计数器
planToUpdate.ExecuteCount = 0
2025-11-05 21:40:19 +08:00
logger.Infof("计划 #%d 被更新,执行计数器已重置为 0。", planToUpdate.ID)
2025-11-02 23:51:45 +08:00
// 在调用仓库前,准备好所有数据,包括设备关联
for i := range planToUpdate.Tasks {
taskModel := &planToUpdate.Tasks[i]
2025-11-05 21:40:19 +08:00
taskResolver, err := s.taskFactory.CreateTaskFromModel(planCtx, taskModel)
if err != nil {
2025-11-05 21:40:19 +08:00
logger.Warnf("跳过为任务类型 '%s' 解析设备ID: %v", taskModel.Type, err)
continue
}
2025-11-05 21:40:19 +08:00
deviceIDs, err := taskResolver.ResolveDeviceIDs(planCtx)
if err != nil {
return nil, fmt.Errorf("为任务 '%s' 提取设备ID失败: %w", taskModel.Name, err)
}
if len(deviceIDs) > 0 {
// 优化无需查询完整的设备对象只需构建包含ID的结构体即可建立关联
devices := make([]models.Device, len(deviceIDs))
for i, id := range deviceIDs {
2025-11-10 22:23:31 +08:00
devices[i] = models.Device{Model: models.Model{ID: id}}
}
taskModel.Devices = devices
}
}
// 调用仓库方法更新计划,该方法内部会处理事务
2025-11-05 21:40:19 +08:00
err = s.planRepo.UpdatePlanMetadataAndStructure(planCtx, planToUpdate)
if err != nil {
2025-11-05 21:40:19 +08:00
logger.Errorf("%s: 数据库更新计划失败: %v, Plan: %+v", actionType, err, planToUpdate)
2025-11-02 23:51:45 +08:00
return nil, err
}
2025-11-05 21:40:19 +08:00
if err := s.taskManager.EnsureAnalysisTaskDefinition(planCtx, planToUpdate.ID); err != nil {
logger.Errorf("为更新后的计划 %d 确保触发器任务定义失败: %v", planToUpdate.ID, err)
2025-11-02 23:51:45 +08:00
}
2025-11-05 21:40:19 +08:00
updatedPlan, err := s.planRepo.GetPlanByID(planCtx, planToUpdate.ID)
2025-11-02 23:51:45 +08:00
if err != nil {
2025-11-05 21:40:19 +08:00
logger.Errorf("%s: 获取更新后计划详情失败: %v, ID: %d", actionType, err, planToUpdate.ID)
2025-11-02 23:51:45 +08:00
return nil, errors.New("获取更新后计划详情时发生内部错误")
}
2025-11-05 21:40:19 +08:00
logger.Infof("%s: 计划更新成功, ID: %d", actionType, updatedPlan.ID)
2025-11-02 23:51:45 +08:00
return updatedPlan, nil
}
// DeletePlan 删除计划(软删除)
2025-11-10 22:23:31 +08:00
func (s *planServiceImpl) DeletePlan(ctx context.Context, id uint32) error {
2025-11-05 21:40:19 +08:00
planCtx, logger := logs.Trace(ctx, s.ctx, "DeletePlan")
2025-11-02 23:51:45 +08:00
const actionType = "领域层:删除计划"
2025-11-05 21:40:19 +08:00
plan, err := s.planRepo.GetBasicPlanByID(planCtx, id)
2025-11-02 23:51:45 +08:00
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
2025-11-05 21:40:19 +08:00
logger.Warnf("%s: 计划不存在, ID: %d", actionType, id)
2025-11-02 23:51:45 +08:00
return ErrPlanNotFound
}
2025-11-05 21:40:19 +08:00
logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, id)
2025-11-02 23:51:45 +08:00
return err
}
// 系统计划不允许删除
if plan.PlanType == models.PlanTypeSystem {
2025-11-05 21:40:19 +08:00
logger.Warnf("%s: 尝试删除系统计划, ID: %d", actionType, id)
2025-11-02 23:51:45 +08:00
return ErrPlanCannotBeDeleted
}
// 如果计划处于启用状态,先停止它
if plan.Status == models.PlanStatusEnabled {
2025-11-05 21:40:19 +08:00
if err := s.planRepo.StopPlanTransactionally(planCtx, id); err != nil {
logger.Errorf("%s: 停止计划失败: %v, ID: %d", actionType, err, id)
2025-11-02 23:51:45 +08:00
return err
}
}
2025-11-05 21:40:19 +08:00
if err := s.planRepo.DeletePlan(planCtx, id); err != nil {
logger.Errorf("%s: 数据库删除失败: %v, ID: %d", actionType, err, id)
2025-11-02 23:51:45 +08:00
return err
}
2025-11-05 21:40:19 +08:00
logger.Infof("%s: 计划删除成功, ID: %d", actionType, id)
2025-11-02 23:51:45 +08:00
return nil
}
// StartPlan 启动计划
2025-11-10 22:23:31 +08:00
func (s *planServiceImpl) StartPlan(ctx context.Context, id uint32) error {
2025-11-05 21:40:19 +08:00
planCtx, logger := logs.Trace(ctx, s.ctx, "StartPlan")
2025-11-02 23:51:45 +08:00
const actionType = "领域层:启动计划"
2025-11-05 21:40:19 +08:00
plan, err := s.planRepo.GetBasicPlanByID(planCtx, id)
2025-11-02 23:51:45 +08:00
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
2025-11-05 21:40:19 +08:00
logger.Warnf("%s: 计划不存在, ID: %d", actionType, id)
2025-11-02 23:51:45 +08:00
return ErrPlanNotFound
}
2025-11-05 21:40:19 +08:00
logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, id)
2025-11-02 23:51:45 +08:00
return err
}
// 系统计划不允许手动启动
if plan.PlanType == models.PlanTypeSystem {
2025-11-05 21:40:19 +08:00
logger.Warnf("%s: 尝试手动启动系统计划, ID: %d", actionType, id)
2025-11-02 23:51:45 +08:00
return ErrPlanCannotBeStarted
}
// 计划已处于启动状态,无需重复操作
if plan.Status == models.PlanStatusEnabled {
2025-11-05 21:40:19 +08:00
logger.Warnf("%s: 计划已处于启动状态,无需重复操作, ID: %d", actionType, id)
2025-11-02 23:51:45 +08:00
return ErrPlanAlreadyEnabled
}
// 如果计划未处于启用状态
if plan.Status != models.PlanStatusEnabled {
// 如果执行计数器大于0重置为0
if plan.ExecuteCount > 0 {
2025-11-05 21:40:19 +08:00
if err := s.planRepo.UpdateExecuteCount(planCtx, plan.ID, 0); err != nil {
logger.Errorf("%s: 重置计划执行计数失败: %v, ID: %d", actionType, err, plan.ID)
2025-11-02 23:51:45 +08:00
return err
}
2025-11-05 21:40:19 +08:00
logger.Infof("计划 #%d 的执行计数器已重置为 0。", plan.ID)
2025-11-02 23:51:45 +08:00
}
// 更新计划状态为启用
2025-11-05 21:40:19 +08:00
if err := s.planRepo.UpdatePlanStatus(planCtx, plan.ID, models.PlanStatusEnabled); err != nil {
logger.Errorf("%s: 更新计划状态失败: %v, ID: %d", actionType, err, plan.ID)
2025-11-02 23:51:45 +08:00
return err
}
2025-11-05 21:40:19 +08:00
logger.Infof("已成功更新计划 #%d 的状态为 '已启动'。", plan.ID)
2025-11-02 23:51:45 +08:00
}
// 创建或更新触发器
2025-11-05 21:40:19 +08:00
if err := s.taskManager.CreateOrUpdateTrigger(planCtx, plan.ID); err != nil {
logger.Errorf("%s: 创建或更新触发器失败: %v, ID: %d", actionType, err, plan.ID)
2025-11-02 23:51:45 +08:00
return err
}
2025-11-05 21:40:19 +08:00
logger.Infof("%s: 计划已成功启动, ID: %d", actionType, id)
2025-11-02 23:51:45 +08:00
return nil
}
// StopPlan 停止计划
2025-11-10 22:23:31 +08:00
func (s *planServiceImpl) StopPlan(ctx context.Context, id uint32) error {
2025-11-05 21:40:19 +08:00
planCtx, logger := logs.Trace(ctx, s.ctx, "StopPlan")
2025-11-02 23:51:45 +08:00
const actionType = "领域层:停止计划"
2025-11-05 21:40:19 +08:00
plan, err := s.planRepo.GetBasicPlanByID(planCtx, id)
2025-11-02 23:51:45 +08:00
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
2025-11-05 21:40:19 +08:00
logger.Warnf("%s: 计划不存在, ID: %d", actionType, id)
2025-11-02 23:51:45 +08:00
return ErrPlanNotFound
}
2025-11-05 21:40:19 +08:00
logger.Errorf("%s: 获取计划信息失败: %v, ID: %d", actionType, err, id)
2025-11-02 23:51:45 +08:00
return err
}
// 系统计划不允许停止
if plan.PlanType == models.PlanTypeSystem {
2025-11-05 21:40:19 +08:00
logger.Warnf("%s: 尝试停止系统计划, ID: %d", actionType, id)
2025-11-02 23:51:45 +08:00
return ErrPlanCannotBeStopped
}
// 计划当前不是启用状态
if plan.Status != models.PlanStatusEnabled {
2025-11-05 21:40:19 +08:00
logger.Warnf("%s: 计划当前不是启用状态, ID: %d, Status: %s", actionType, id, plan.Status)
2025-11-02 23:51:45 +08:00
return ErrPlanNotEnabled
}
// 停止计划事务性操作
2025-11-05 21:40:19 +08:00
if err := s.planRepo.StopPlanTransactionally(planCtx, id); err != nil {
logger.Errorf("%s: 停止计划失败: %v, ID: %d", actionType, err, id)
2025-11-02 23:51:45 +08:00
return err
}
2025-11-05 21:40:19 +08:00
logger.Infof("%s: 计划已成功停止, ID: %d", actionType, id)
2025-11-02 23:51:45 +08:00
return nil
}