拆分device.Service接口

This commit is contained in:
2025-12-03 15:12:43 +08:00
parent 7974955335
commit 4a3c82fc25
11 changed files with 41 additions and 71 deletions

View File

@@ -56,17 +56,9 @@ func (l *loraListener) HandleInstruction(upstreamCtx context.Context, sourceAddr
case *proto.Instruction_CollectResult:
return l.handleCollectResult(ctx, sourceAddr, p.CollectResult)
case *proto.Instruction_OtaUpgradeStatus:
return l.handleOtaStatus(ctx, sourceAddr, p.OtaUpgradeStatus)
case *proto.Instruction_Pong:
return l.handlePong(ctx, sourceAddr, p.Pong)
case *proto.Instruction_LogUploadRequest:
logger.Infow("收到设备日志上传请求,暂未实现处理逻辑", "来源地址", sourceAddr, "日志条数", len(p.LogUploadRequest.Entries))
// TODO: 在这里实现设备日志的处理逻辑
return nil
default:
logger.Warnw("收到一个当前未处理的上行指令类型", "来源地址", sourceAddr, "类型", fmt.Sprintf("%T", p))
return nil
@@ -277,37 +269,6 @@ func (l *loraListener) recordSensorData(ctx context.Context, areaControllerID ui
}
}
// handleOtaStatus 处理设备上报的OTA升级状态。
func (l *loraListener) handleOtaStatus(ctx context.Context, sourceAddr string, status *proto.OtaUpgradeStatus) error {
reqCtx, logger := logs.Trace(ctx, l.selfCtx, "handleOtaStatus")
logger.Infow("开始处理OTA升级状态",
"来源地址", sourceAddr,
"状态码", status.StatusCode,
"当前版本", status.CurrentFirmwareVersion,
)
// 1. 查找区域主控
areaController, err := l.areaControllerRepo.FindByNetworkID(reqCtx, sourceAddr)
if err != nil {
return fmt.Errorf("处理OTA状态失败无法找到区域主控: %w", err)
}
// 2. 更新固件版本号
// 我们信任设备上报的版本号,无论成功失败都进行更新
if status.CurrentFirmwareVersion != "" {
err = l.areaControllerRepo.UpdateFirmwareVersion(reqCtx, areaController.ID, status.CurrentFirmwareVersion)
if err != nil {
logger.Errorw("更新区域主控固件版本号失败", "主控ID", areaController.ID, "error", err)
return fmt.Errorf("更新固件版本号失败: %w", err)
}
logger.Infow("成功更新区域主控固件版本号", "主控ID", areaController.ID, "新版本", status.CurrentFirmwareVersion)
}
// TODO: 后续可以在这里增加逻辑,比如记录一条操作日志,或者发送一个通知
return nil
}
// handlePong 处理设备上报的Pong响应或主动心跳。
func (l *loraListener) handlePong(ctx context.Context, sourceAddr string, pong *proto.Pong) error {
reqCtx, logger := logs.Trace(ctx, l.selfCtx, "handlePong")

View File

@@ -53,7 +53,7 @@ type deviceService struct {
deviceRepo repository.DeviceRepository
areaControllerRepo repository.AreaControllerRepository
deviceTemplateRepo repository.DeviceTemplateRepository
deviceDomainSvc device.Service
deviceDomainSvc device.DeviceOperator
thresholdAlarmService ThresholdAlarmService
}
@@ -63,7 +63,7 @@ func NewDeviceService(
deviceRepo repository.DeviceRepository,
areaControllerRepo repository.AreaControllerRepository,
deviceTemplateRepo repository.DeviceTemplateRepository,
deviceDomainSvc device.Service,
deviceDomainSvc device.DeviceOperator,
thresholdAlarmService ThresholdAlarmService,
) DeviceService {
return &deviceService{

View File

@@ -140,7 +140,8 @@ type DomainServices struct {
pigTradeManager pig.PigTradeManager
pigSickManager pig.SickPigManager
pigBatchDomain pig.PigBatchService
generalDeviceService device.Service
deviceOperator device.DeviceOperator
deviceCommunicator device.DeviceCommunicator
taskFactory plan.TaskFactory
planExecutionManager plan.ExecutionManager
analysisPlanTaskManager plan.AnalysisPlanTaskManager
@@ -200,6 +201,7 @@ func initDomainServices(ctx context.Context, cfg *config.Config, infra *Infrastr
infra.repos.alarmRepo,
infra.repos.areaControllerRepo,
generalDeviceService,
generalDeviceService,
notifyService,
alarmService,
)
@@ -217,7 +219,6 @@ func initDomainServices(ctx context.Context, cfg *config.Config, infra *Infrastr
infra.repos.planRepo,
analysisPlanTaskManager,
taskFactory,
generalDeviceService,
time.Duration(cfg.Task.Interval)*time.Second,
cfg.Task.NumWorkers,
)
@@ -259,7 +260,8 @@ func initDomainServices(ctx context.Context, cfg *config.Config, infra *Infrastr
pigTradeManager: pigTradeManager,
pigSickManager: pigSickManager,
pigBatchDomain: pigBatchDomain,
generalDeviceService: generalDeviceService,
deviceOperator: generalDeviceService,
deviceCommunicator: generalDeviceService,
analysisPlanTaskManager: analysisPlanTaskManager,
taskFactory: taskFactory,
planExecutionManager: planExecutionManager,
@@ -328,7 +330,7 @@ func initAppServices(ctx context.Context, infra *Infrastructure, domainServices
infra.repos.deviceRepo,
infra.repos.areaControllerRepo,
infra.repos.deviceTemplateRepo,
domainServices.generalDeviceService,
domainServices.deviceOperator,
thresholdAlarmService,
)

View File

@@ -42,17 +42,21 @@ func WithoutTracking() SendOption {
}
}
// Service 抽象了一组方法用于控制设备行为
type Service interface {
// Switch 用于切换指定设备的状态, 比如启动和停止
// DeviceOperator 提供了对单个或多个设备进行具体操作的接口,
// 如开关、触发采集等。它通常用于响应用户的直接指令或执行具体的业务任务。
type DeviceOperator interface {
// Switch 用于切换指定设备的状态, 比如启动和停止
Switch(ctx context.Context, device *models.Device, action DeviceAction) error
// Collect 用于发起对指定区域主控下的多个设备的批量采集请求。
Collect(ctx context.Context, areaControllerID uint32, devicesToCollect []*models.Device) error
}
// Send 是一个通用的发送方法,用于将一个标准的指令载荷发送到指定的区域主控
// 它负责将载荷包装成顶层指令、序列化、调用底层发送器,并默认记录下行命令日志
// DeviceCommunicator 抽象了与设备进行底层通信的能力
// 它负责将一个标准的指令载荷发送到指定的区域主控
type DeviceCommunicator interface {
// Send 是一个通用的发送方法,它负责将载荷包装、序列化、
// 调用底层发送器,并默认记录下行命令日志。
Send(ctx context.Context, areaControllerID uint32, payload proto.InstructionPayload, opts ...SendOption) error
}

View File

@@ -34,7 +34,7 @@ func NewGeneralDeviceService(
deviceCommandLogRepo repository.DeviceCommandLogRepository,
pendingCollectionRepo repository.PendingCollectionRepository,
comm transport.Communicator,
) Service {
) *GeneralDeviceService {
return &GeneralDeviceService{
ctx: ctx,
deviceRepo: deviceRepo,

View File

@@ -6,7 +6,6 @@ import (
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/domain/device"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
@@ -95,7 +94,6 @@ type planExecutionManagerImpl struct {
taskFactory TaskFactory
analysisPlanTaskManager AnalysisPlanTaskManager
progressTracker *ProgressTracker
deviceService device.Service
pool *ants.Pool // 使用 ants 协程池来管理并发
wg sync.WaitGroup
@@ -112,7 +110,6 @@ func NewPlanExecutionManager(
planRepo repository.PlanRepository,
analysisPlanTaskManager AnalysisPlanTaskManager,
taskFactory TaskFactory,
deviceService device.Service,
interval time.Duration,
numWorkers int,
) ExecutionManager {
@@ -125,7 +122,6 @@ func NewPlanExecutionManager(
planRepo: planRepo,
analysisPlanTaskManager: analysisPlanTaskManager,
taskFactory: taskFactory,
deviceService: deviceService,
pollingInterval: interval,
workers: numWorkers,
progressTracker: NewProgressTracker(),

View File

@@ -16,7 +16,7 @@ type FullCollectionTask struct {
ctx context.Context
log *models.TaskExecutionLog
deviceRepo repository.DeviceRepository
deviceService device.Service
deviceService device.DeviceOperator
}
// NewFullCollectionTask 创建一个全量采集任务实例
@@ -24,7 +24,7 @@ func NewFullCollectionTask(
ctx context.Context,
log *models.TaskExecutionLog,
deviceRepo repository.DeviceRepository,
deviceService device.Service,
deviceService device.DeviceOperator,
) plan.Task {
return &FullCollectionTask{
ctx: ctx,

View File

@@ -17,7 +17,7 @@ type HeartbeatTask struct {
ctx context.Context
log *models.TaskExecutionLog
areaControllerRepo repository.AreaControllerRepository
deviceService device.Service
deviceService device.DeviceCommunicator
}
// NewHeartbeatTask 创建一个心跳检测任务实例
@@ -25,7 +25,7 @@ func NewHeartbeatTask(
ctx context.Context,
log *models.TaskExecutionLog,
areaControllerRepo repository.AreaControllerRepository,
deviceService device.Service,
deviceService device.DeviceCommunicator,
) plan.Task {
return &HeartbeatTask{
ctx: ctx,

View File

@@ -32,7 +32,7 @@ type ReleaseFeedWeightTask struct {
releaseWeight float32
mixingTankDeviceID uint32
feedPort device.Service
feedPort device.DeviceOperator
// onceParse 保证解析参数只执行一次
onceParse sync.Once
@@ -44,7 +44,7 @@ func NewReleaseFeedWeightTask(
claimedLog *models.TaskExecutionLog,
sensorDataRepo repository.SensorDataRepository,
deviceRepo repository.DeviceRepository,
deviceService device.Service,
deviceService device.DeviceOperator,
) plan.Task {
return &ReleaseFeedWeightTask{
ctx: ctx,

View File

@@ -29,7 +29,8 @@ type taskFactory struct {
alarmRepo repository.AlarmRepository
areaControllerRepo repository.AreaControllerRepository
deviceService device.Service
deviceOperator device.DeviceOperator
deviceCommunicator device.DeviceCommunicator
notificationService notify.Service
alarmService alarm.AlarmService
}
@@ -40,7 +41,8 @@ func NewTaskFactory(
deviceRepo repository.DeviceRepository,
alarmRepo repository.AlarmRepository,
areaControllerRepo repository.AreaControllerRepository,
deviceService device.Service,
deviceOperator device.DeviceOperator,
deviceCommunicator device.DeviceCommunicator,
notifyService notify.Service,
alarmService alarm.AlarmService,
) plan.TaskFactory {
@@ -50,7 +52,8 @@ func NewTaskFactory(
deviceRepo: deviceRepo,
alarmRepo: alarmRepo,
areaControllerRepo: areaControllerRepo,
deviceService: deviceService,
deviceOperator: deviceOperator,
deviceCommunicator: deviceCommunicator,
notificationService: notifyService,
alarmService: alarmService,
}
@@ -63,11 +66,11 @@ func (t *taskFactory) Production(ctx context.Context, claimedLog *models.TaskExe
case models.TaskTypeWaiting:
return NewDelayTask(logs.AddCompName(baseCtx, CompNameDelayTask), claimedLog)
case models.TaskTypeReleaseFeedWeight:
return NewReleaseFeedWeightTask(logs.AddCompName(baseCtx, CompNameReleaseFeedWeight), claimedLog, t.sensorDataRepo, t.deviceRepo, t.deviceService)
return NewReleaseFeedWeightTask(logs.AddCompName(baseCtx, CompNameReleaseFeedWeight), claimedLog, t.sensorDataRepo, t.deviceRepo, t.deviceOperator)
case models.TaskTypeFullCollection:
return NewFullCollectionTask(logs.AddCompName(baseCtx, CompNameFullCollectionTask), claimedLog, t.deviceRepo, t.deviceService)
return NewFullCollectionTask(logs.AddCompName(baseCtx, CompNameFullCollectionTask), claimedLog, t.deviceRepo, t.deviceOperator)
case models.TaskTypeHeartbeat:
return NewHeartbeatTask(logs.AddCompName(baseCtx, CompNameHeartbeatTask), claimedLog, t.areaControllerRepo, t.deviceService)
return NewHeartbeatTask(logs.AddCompName(baseCtx, CompNameHeartbeatTask), claimedLog, t.areaControllerRepo, t.deviceCommunicator)
case models.TaskTypeAlarmNotification:
return NewAlarmNotificationTask(logs.AddCompName(baseCtx, CompNameAlarmNotification), claimedLog, t.notificationService, t.alarmRepo)
case models.TaskTypeDeviceThresholdCheck:
@@ -96,12 +99,12 @@ func (t *taskFactory) CreateTaskFromModel(ctx context.Context, taskModel *models
tempLog,
t.sensorDataRepo,
t.deviceRepo,
t.deviceService,
t.deviceOperator,
), nil
case models.TaskTypeFullCollection:
return NewFullCollectionTask(logs.AddCompName(baseCtx, CompNameFullCollectionTask), tempLog, t.deviceRepo, t.deviceService), nil
return NewFullCollectionTask(logs.AddCompName(baseCtx, CompNameFullCollectionTask), tempLog, t.deviceRepo, t.deviceOperator), nil
case models.TaskTypeHeartbeat:
return NewHeartbeatTask(logs.AddCompName(baseCtx, CompNameHeartbeatTask), tempLog, t.areaControllerRepo, t.deviceService), nil
return NewHeartbeatTask(logs.AddCompName(baseCtx, CompNameHeartbeatTask), tempLog, t.areaControllerRepo, t.deviceCommunicator), nil
case models.TaskTypeAlarmNotification:
return NewAlarmNotificationTask(logs.AddCompName(baseCtx, CompNameAlarmNotification), tempLog, t.notificationService, t.alarmRepo), nil
case models.TaskTypeDeviceThresholdCheck:

View File

@@ -40,6 +40,7 @@ design/archive/2025-11-10-exceeding-threshold-alarm/index.md
design/archive/2025-11-29-recipe-management/index.md
design/ota-upgrade-and-log-monitoring/index.md
design/ota-upgrade-and-log-monitoring/lora_refactoring_plan.md
design/ota-upgrade-and-log-monitoring/ota_upgrade_solution.md
docs/docs.go
docs/swagger.json
docs/swagger.yaml
@@ -146,6 +147,9 @@ internal/domain/task/heartbeat_task.go
internal/domain/task/refresh_notification_task.go
internal/domain/task/release_feed_weight_task.go
internal/domain/task/task.go
internal/infra/ai/ai.go
internal/infra/ai/gemini.go
internal/infra/ai/no_ai.go
internal/infra/config/config.go
internal/infra/database/postgres.go
internal/infra/database/seeder.go