Files
pig-farm-controller/internal/app/service/area_controller_service.go

234 lines
7.7 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"context"
"encoding/json"
"fmt"
"io"
"path/filepath"
"git.huangwc.com/pig/pig-farm-controller/internal/app/dto"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils/file"
"github.com/google/uuid"
)
// AreaControllerService 定义了应用层的区域主控服务接口。
type AreaControllerService interface {
CreateAreaController(ctx context.Context, req *dto.CreateAreaControllerRequest) (*dto.AreaControllerResponse, error)
GetAreaController(ctx context.Context, id uint32) (*dto.AreaControllerResponse, error)
ListAreaControllers(ctx context.Context) ([]*dto.AreaControllerResponse, error)
UpdateAreaController(ctx context.Context, id uint32, req *dto.UpdateAreaControllerRequest) (*dto.AreaControllerResponse, error)
DeleteAreaController(ctx context.Context, id uint32) error
// StartUpgrade 用于启动一个 OTA 升级任务。
StartUpgrade(ctx context.Context, areaControllerID uint32, firmware *dto.OtaUpgradeRequest) (*dto.OtaUpgradeResponse, error)
// GetUpgradeProgress 用于查询指定 OTA 任务的进度。
GetUpgradeProgress(ctx context.Context, taskID uint32) (*dto.OtaUpgradeProgressResponse, error)
// StopUpgrade 用于请求停止一个正在进行的 OTA 升级任务。
StopUpgrade(ctx context.Context, taskID uint32) error
}
// areaControllerService 是 AreaControllerService 接口的具体实现。
type areaControllerService struct {
ctx context.Context
areaControllerRepo repository.AreaControllerRepository
otaRepo repository.OtaRepository
thresholdAlarmService ThresholdAlarmService
otaService device.OtaService
}
// NewAreaControllerService 创建一个新的 AreaControllerService 实例。
func NewAreaControllerService(
ctx context.Context,
areaControllerRepo repository.AreaControllerRepository,
otaRepo repository.OtaRepository,
thresholdAlarmService ThresholdAlarmService,
otaService device.OtaService,
) AreaControllerService {
return &areaControllerService{
ctx: ctx,
areaControllerRepo: areaControllerRepo,
otaRepo: otaRepo,
thresholdAlarmService: thresholdAlarmService,
otaService: otaService,
}
}
func (s *areaControllerService) CreateAreaController(ctx context.Context, req *dto.CreateAreaControllerRequest) (*dto.AreaControllerResponse, error) {
serviceCtx := logs.AddFuncName(ctx, s.ctx, "CreateAreaController")
propertiesJSON, err := json.Marshal(req.Properties)
if err != nil {
return nil, err
}
ac := &models.AreaController{
Name: req.Name,
NetworkID: req.NetworkID,
Location: req.Location,
Properties: propertiesJSON,
}
if err := ac.SelfCheck(); err != nil {
return nil, err
}
if err := s.areaControllerRepo.Create(serviceCtx, ac); err != nil {
return nil, err
}
return dto.NewAreaControllerResponse(ac)
}
func (s *areaControllerService) GetAreaController(ctx context.Context, id uint32) (*dto.AreaControllerResponse, error) {
serviceCtx := logs.AddFuncName(ctx, s.ctx, "GetAreaController")
ac, err := s.areaControllerRepo.FindByID(serviceCtx, id)
if err != nil {
return nil, err
}
return dto.NewAreaControllerResponse(ac)
}
func (s *areaControllerService) ListAreaControllers(ctx context.Context) ([]*dto.AreaControllerResponse, error) {
serviceCtx := logs.AddFuncName(ctx, s.ctx, "ListAreaControllers")
acs, err := s.areaControllerRepo.ListAll(serviceCtx)
if err != nil {
return nil, err
}
return dto.NewListAreaControllerResponse(acs)
}
func (s *areaControllerService) UpdateAreaController(ctx context.Context, id uint32, req *dto.UpdateAreaControllerRequest) (*dto.AreaControllerResponse, error) {
serviceCtx := logs.AddFuncName(ctx, s.ctx, "UpdateAreaController")
existingAC, err := s.areaControllerRepo.FindByID(serviceCtx, id)
if err != nil {
return nil, err
}
propertiesJSON, err := json.Marshal(req.Properties)
if err != nil {
return nil, err
}
existingAC.Name = req.Name
existingAC.NetworkID = req.NetworkID
existingAC.Location = req.Location
existingAC.Properties = propertiesJSON
if err := existingAC.SelfCheck(); err != nil {
return nil, err
}
if err := s.areaControllerRepo.Update(serviceCtx, existingAC); err != nil {
return nil, err
}
return dto.NewAreaControllerResponse(existingAC)
}
func (s *areaControllerService) DeleteAreaController(ctx context.Context, id uint32) error {
serviceCtx := logs.AddFuncName(ctx, s.ctx, "DeleteAreaController")
// 1. 检查是否存在
_, err := s.areaControllerRepo.FindByID(serviceCtx, id)
if err != nil {
return err // 如果未找到gorm会返回 ErrRecordNotFound
}
// 2. 检查是否被使用(业务逻辑)
inUse, err := s.areaControllerRepo.IsAreaControllerUsedByTasks(serviceCtx, id, []models.TaskType{models.TaskTypeAreaCollectorThresholdCheck})
if err != nil {
return err // 返回数据库检查错误
}
if inUse {
return ErrAreaControllerInUse // 返回业务错误
}
// TODO 这个应该用事务处理
err = s.thresholdAlarmService.DeleteAreaThresholdAlarmByAreaControllerID(serviceCtx, id)
if err != nil {
return fmt.Errorf("删除区域阈值告警失败: %w", err)
}
// 3. 执行删除
return s.areaControllerRepo.Delete(serviceCtx, id)
}
// StartUpgrade 用于启动一个 OTA 升级任务。
func (s *areaControllerService) StartUpgrade(ctx context.Context, areaControllerID uint32, firmware *dto.OtaUpgradeRequest) (*dto.OtaUpgradeResponse, error) {
serviceCtx, logger := logs.Trace(ctx, s.ctx, "StartUpgrade")
src, err := firmware.FirmwareFile.Open()
if err != nil {
return nil, fmt.Errorf("打开固件文件失败: %w", err)
}
defer src.Close()
data, err := io.ReadAll(src)
if err != nil {
return nil, fmt.Errorf("读取固件文件内容失败: %w", err)
}
subDir := filepath.Join(models.OTADir, uuid.New().String())
var filePath string
var actionErr error
err = file.ExecuteWithLock(func() error {
filePath, actionErr = file.WriteTempFile(subDir, firmware.FirmwareFile.Filename, data)
return actionErr
}, func(err error) {
removeErr := file.RemoveTempDir(subDir)
if removeErr != nil {
logger.Errorf("回滚失败, 删除临时目录失败: %v, 目标地址: %v", removeErr, filePath)
}
})
if err != nil {
return nil, fmt.Errorf("保存固件文件失败: %w", err)
}
taskID, err := s.otaService.StartUpgrade(serviceCtx, areaControllerID, filePath)
if err != nil {
return nil, fmt.Errorf("启动升级任务失败: %w", err)
}
return &dto.OtaUpgradeResponse{TaskID: taskID}, nil
}
// GetUpgradeProgress 用于查询指定 OTA 任务的进度。
func (s *areaControllerService) GetUpgradeProgress(ctx context.Context, taskID uint32) (*dto.OtaUpgradeProgressResponse, error) {
serviceCtx := logs.AddFuncName(ctx, s.ctx, "GetUpgradeProgress")
// 直接调用 otaRepo 查询任务状态
task, err := s.otaRepo.FindByID(serviceCtx, taskID)
if err != nil {
return nil, fmt.Errorf("查找 OTA 任务失败: %w", err)
}
// 构造响应 DTO
response := &dto.OtaUpgradeProgressResponse{
TaskID: task.ID,
CurrentStage: task.Status,
Message: string(task.Status), // 默认使用状态作为消息
}
// 如果任务失败,使用更详细的错误信息
if task.ErrorMessage != "" {
response.Message = task.ErrorMessage
}
return response, nil
}
// StopUpgrade 用于请求停止一个正在进行的 OTA 升级任务。
func (s *areaControllerService) StopUpgrade(ctx context.Context, taskID uint32) error {
err := s.otaService.StopUpgrade(ctx, taskID)
if err != nil {
return fmt.Errorf("停止升级任务失败: %w", err)
}
return nil
}