package service import ( "context" "encoding/json" "fmt" "io" "path/filepath" "git.huangwc.com/pig/pig-farm-controller/internal/app/dto" "git.huangwc.com/pig/pig-farm-controller/internal/domain/device" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "git.huangwc.com/pig/pig-farm-controller/internal/infra/repository" "git.huangwc.com/pig/pig-farm-controller/internal/infra/utils/file" "github.com/google/uuid" ) // AreaControllerService 定义了应用层的区域主控服务接口。 type AreaControllerService interface { CreateAreaController(ctx context.Context, req *dto.CreateAreaControllerRequest) (*dto.AreaControllerResponse, error) GetAreaController(ctx context.Context, id uint32) (*dto.AreaControllerResponse, error) ListAreaControllers(ctx context.Context) ([]*dto.AreaControllerResponse, error) UpdateAreaController(ctx context.Context, id uint32, req *dto.UpdateAreaControllerRequest) (*dto.AreaControllerResponse, error) DeleteAreaController(ctx context.Context, id uint32) error // StartUpgrade 用于启动一个 OTA 升级任务。 StartUpgrade(ctx context.Context, areaControllerID uint32, firmware *dto.OtaUpgradeRequest) (*dto.OtaUpgradeResponse, error) // GetUpgradeProgress 用于查询指定 OTA 任务的进度。 GetUpgradeProgress(ctx context.Context, taskID uint32) (*dto.OtaUpgradeProgressResponse, error) // StopUpgrade 用于请求停止一个正在进行的 OTA 升级任务。 StopUpgrade(ctx context.Context, taskID uint32) error } // areaControllerService 是 AreaControllerService 接口的具体实现。 type areaControllerService struct { ctx context.Context areaControllerRepo repository.AreaControllerRepository otaRepo repository.OtaRepository thresholdAlarmService ThresholdAlarmService otaService device.OtaService } // NewAreaControllerService 创建一个新的 AreaControllerService 实例。 func NewAreaControllerService( ctx context.Context, areaControllerRepo repository.AreaControllerRepository, otaRepo repository.OtaRepository, thresholdAlarmService ThresholdAlarmService, otaService device.OtaService, ) AreaControllerService { return &areaControllerService{ ctx: ctx, areaControllerRepo: areaControllerRepo, otaRepo: otaRepo, thresholdAlarmService: thresholdAlarmService, otaService: otaService, } } func (s *areaControllerService) CreateAreaController(ctx context.Context, req *dto.CreateAreaControllerRequest) (*dto.AreaControllerResponse, error) { serviceCtx := logs.AddFuncName(ctx, s.ctx, "CreateAreaController") propertiesJSON, err := json.Marshal(req.Properties) if err != nil { return nil, err } ac := &models.AreaController{ Name: req.Name, NetworkID: req.NetworkID, Location: req.Location, Properties: propertiesJSON, } if err := ac.SelfCheck(); err != nil { return nil, err } if err := s.areaControllerRepo.Create(serviceCtx, ac); err != nil { return nil, err } return dto.NewAreaControllerResponse(ac) } func (s *areaControllerService) GetAreaController(ctx context.Context, id uint32) (*dto.AreaControllerResponse, error) { serviceCtx := logs.AddFuncName(ctx, s.ctx, "GetAreaController") ac, err := s.areaControllerRepo.FindByID(serviceCtx, id) if err != nil { return nil, err } return dto.NewAreaControllerResponse(ac) } func (s *areaControllerService) ListAreaControllers(ctx context.Context) ([]*dto.AreaControllerResponse, error) { serviceCtx := logs.AddFuncName(ctx, s.ctx, "ListAreaControllers") acs, err := s.areaControllerRepo.ListAll(serviceCtx) if err != nil { return nil, err } return dto.NewListAreaControllerResponse(acs) } func (s *areaControllerService) UpdateAreaController(ctx context.Context, id uint32, req *dto.UpdateAreaControllerRequest) (*dto.AreaControllerResponse, error) { serviceCtx := logs.AddFuncName(ctx, s.ctx, "UpdateAreaController") existingAC, err := s.areaControllerRepo.FindByID(serviceCtx, id) if err != nil { return nil, err } propertiesJSON, err := json.Marshal(req.Properties) if err != nil { return nil, err } existingAC.Name = req.Name existingAC.NetworkID = req.NetworkID existingAC.Location = req.Location existingAC.Properties = propertiesJSON if err := existingAC.SelfCheck(); err != nil { return nil, err } if err := s.areaControllerRepo.Update(serviceCtx, existingAC); err != nil { return nil, err } return dto.NewAreaControllerResponse(existingAC) } func (s *areaControllerService) DeleteAreaController(ctx context.Context, id uint32) error { serviceCtx := logs.AddFuncName(ctx, s.ctx, "DeleteAreaController") // 1. 检查是否存在 _, err := s.areaControllerRepo.FindByID(serviceCtx, id) if err != nil { return err // 如果未找到,gorm会返回 ErrRecordNotFound } // 2. 检查是否被使用(业务逻辑) inUse, err := s.areaControllerRepo.IsAreaControllerUsedByTasks(serviceCtx, id, []models.TaskType{models.TaskTypeAreaCollectorThresholdCheck}) if err != nil { return err // 返回数据库检查错误 } if inUse { return ErrAreaControllerInUse // 返回业务错误 } // TODO 这个应该用事务处理 err = s.thresholdAlarmService.DeleteAreaThresholdAlarmByAreaControllerID(serviceCtx, id) if err != nil { return fmt.Errorf("删除区域阈值告警失败: %w", err) } // 3. 执行删除 return s.areaControllerRepo.Delete(serviceCtx, id) } // StartUpgrade 用于启动一个 OTA 升级任务。 func (s *areaControllerService) StartUpgrade(ctx context.Context, areaControllerID uint32, firmware *dto.OtaUpgradeRequest) (*dto.OtaUpgradeResponse, error) { serviceCtx, logger := logs.Trace(ctx, s.ctx, "StartUpgrade") src, err := firmware.FirmwareFile.Open() if err != nil { return nil, fmt.Errorf("打开固件文件失败: %w", err) } defer src.Close() data, err := io.ReadAll(src) if err != nil { return nil, fmt.Errorf("读取固件文件内容失败: %w", err) } subDir := filepath.Join(models.OTADir, uuid.New().String()) var filePath string var actionErr error err = file.ExecuteWithLock(func() error { filePath, actionErr = file.WriteTempFile(subDir, firmware.FirmwareFile.Filename, data) return actionErr }, func(err error) { removeErr := file.RemoveTempDir(subDir) if removeErr != nil { logger.Errorf("回滚失败, 删除临时目录失败: %v, 目标地址: %v", removeErr, filePath) } }) if err != nil { return nil, fmt.Errorf("保存固件文件失败: %w", err) } taskID, err := s.otaService.StartUpgrade(serviceCtx, areaControllerID, filePath) if err != nil { return nil, fmt.Errorf("启动升级任务失败: %w", err) } return &dto.OtaUpgradeResponse{TaskID: taskID}, nil } // GetUpgradeProgress 用于查询指定 OTA 任务的进度。 func (s *areaControllerService) GetUpgradeProgress(ctx context.Context, taskID uint32) (*dto.OtaUpgradeProgressResponse, error) { serviceCtx := logs.AddFuncName(ctx, s.ctx, "GetUpgradeProgress") // 直接调用 otaRepo 查询任务状态 task, err := s.otaRepo.FindByID(serviceCtx, taskID) if err != nil { return nil, fmt.Errorf("查找 OTA 任务失败: %w", err) } // 构造响应 DTO response := &dto.OtaUpgradeProgressResponse{ TaskID: task.ID, CurrentStage: task.Status, Message: string(task.Status), // 默认使用状态作为消息 } // 如果任务失败,使用更详细的错误信息 if task.ErrorMessage != "" { response.Message = task.ErrorMessage } return response, nil } // StopUpgrade 用于请求停止一个正在进行的 OTA 升级任务。 func (s *areaControllerService) StopUpgrade(ctx context.Context, taskID uint32) error { err := s.otaService.StopUpgrade(ctx, taskID) if err != nil { return fmt.Errorf("停止升级任务失败: %w", err) } return nil }