Files
pig-farm-controller/internal/domain/device/general_device_service.go

322 lines
11 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package device
import (
"context"
"errors"
"fmt"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/proto"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/utils/command_generater"
"github.com/google/uuid"
gproto "google.golang.org/protobuf/proto"
)
type GeneralDeviceService struct {
ctx context.Context
deviceRepo repository.DeviceRepository
areaControllerRepo repository.AreaControllerRepository
deviceCommandLogRepo repository.DeviceCommandLogRepository
pendingCollectionRepo repository.PendingCollectionRepository
comm transport.Communicator
}
// NewGeneralDeviceService 创建一个通用设备服务
func NewGeneralDeviceService(
ctx context.Context,
deviceRepo repository.DeviceRepository,
areaControllerRepo repository.AreaControllerRepository,
deviceCommandLogRepo repository.DeviceCommandLogRepository,
pendingCollectionRepo repository.PendingCollectionRepository,
comm transport.Communicator,
) Service {
return &GeneralDeviceService{
ctx: ctx,
deviceRepo: deviceRepo,
areaControllerRepo: areaControllerRepo,
deviceCommandLogRepo: deviceCommandLogRepo,
pendingCollectionRepo: pendingCollectionRepo,
comm: comm,
}
}
func (g *GeneralDeviceService) Switch(ctx context.Context, device *models.Device, action DeviceAction) error {
serviceCtx, logger := logs.Trace(ctx, g.ctx, "Switch")
// 1. 依赖模型自身的 SelfCheck 进行全面校验
if err := device.SelfCheck(); err != nil {
return fmt.Errorf("设备 %v(id=%v) 未通过自检: %w", device.Name, device.ID, err)
}
if err := device.DeviceTemplate.SelfCheck(); err != nil {
return fmt.Errorf("设备 %v(id=%v) 的模板未通过自检: %w", device.Name, device.ID, err)
}
// 2. 检查预加载的 AreaController 是否有效
areaController := &device.AreaController
if err := areaController.SelfCheck(); err != nil {
return fmt.Errorf("区域主控 %v(id=%v) 未通过自检: %w", areaController.Name, areaController.ID, err)
}
// 3. 使用模型层预定义的 Bus485Properties 结构体解析设备属性
var deviceProps models.Bus485Properties
if err := device.ParseProperties(&deviceProps); err != nil {
return fmt.Errorf("解析设备 %v(id=%v) 的属性失败: %w", device.Name, device.ID, err)
}
// 4. 解析设备模板中的开关指令参数
var switchCmd models.SwitchCommands
if err := device.DeviceTemplate.ParseCommands(&switchCmd); err != nil {
return fmt.Errorf("解析设备 %v(id=%v) 的开关指令失败: %w", device.Name, device.ID, err)
}
// 5. 根据 action 生成 Modbus RTU 写入指令
onOffState := true // 默认为开启
if action == DeviceActionStop { // 如果是停止动作,则设置为关闭
onOffState = false
}
modbusCommandBytes, err := command_generater.GenerateModbusRTUSwitchCommand(
deviceProps.BusAddress,
switchCmd.ModbusStartAddress,
onOffState,
)
if err != nil {
return fmt.Errorf("生成Modbus RTU写入指令失败: %w", err)
}
// 6. 构建 Protobuf Raw485Command包含总线号
raw485Cmd := &proto.Raw485Command{
BusNumber: int32(deviceProps.BusNumber), // 添加总线号
CommandBytes: modbusCommandBytes,
}
instruction := &proto.Instruction{
Payload: &proto.Instruction_Raw_485Command{
Raw_485Command: raw485Cmd,
},
}
message, err := gproto.Marshal(instruction)
if err != nil {
return fmt.Errorf("序列化指令失败: %w", err)
}
// 7. 发送指令
networkID := areaController.NetworkID
sendResult, err := g.comm.Send(serviceCtx, networkID, message)
if err != nil {
return fmt.Errorf("发送指令到 %s 失败: %w", networkID, err)
}
// 8. 创建并保存命令日志
logRecord := &models.DeviceCommandLog{
MessageID: sendResult.MessageID,
DeviceID: areaController.ID,
SentAt: time.Now(),
}
if sendResult.AcknowledgedAt != nil {
logRecord.AcknowledgedAt = sendResult.AcknowledgedAt
}
if sendResult.ReceivedSuccess != nil {
logRecord.ReceivedSuccess = *sendResult.ReceivedSuccess
}
if err := g.deviceCommandLogRepo.Create(serviceCtx, logRecord); err != nil {
// 记录日志失败是一个需要关注的问题,但可能不应该中断主流程。
// 我们记录一个错误日志,然后成功返回。
logger.Errorf("创建指令日志失败 (MessageID: %s): %v", sendResult.MessageID, err)
}
logger.Infof("成功发送指令到 %s 并创建日志 (MessageID: %s)", networkID, sendResult.MessageID)
return nil
}
// Collect 实现了 Service 接口,用于发起对指定区域主控下的多个设备的批量采集请求。
func (g *GeneralDeviceService) Collect(ctx context.Context, areaControllerID uint32, devicesToCollect []*models.Device) error {
serviceCtx, logger := logs.Trace(ctx, g.ctx, "Collect")
if len(devicesToCollect) == 0 {
logger.Info("待采集设备列表为空,无需执行采集任务。")
return nil
}
// 1. 从设备列表中获取预加载的区域主控,并进行校验
areaController := &devicesToCollect[0].AreaController
if areaController.ID != areaControllerID {
return fmt.Errorf("设备列表与指定的区域主控ID (%d) 不匹配", areaControllerID)
}
if err := areaController.SelfCheck(); err != nil {
return fmt.Errorf("区域主控 (ID: %d) 未通过自检: %w", areaControllerID, err)
}
// 2. 准备采集任务列表
var childDeviceIDs []uint32
var collectTasks []*proto.CollectTask
for _, dev := range devicesToCollect {
// 依赖模型自身的 SelfCheck 进行全面校验
if err := dev.SelfCheck(); err != nil {
logger.Warnf("跳过设备 %d因其未通过自检: %v", dev.ID, err)
continue
}
if err := dev.DeviceTemplate.SelfCheck(); err != nil {
logger.Warnf("跳过设备 %d因其设备模板未通过自检: %v", dev.ID, err)
continue
}
// 使用模板的 ParseCommands 方法获取传感器指令参数
var sensorCmd models.SensorCommands
if err := dev.DeviceTemplate.ParseCommands(&sensorCmd); err != nil {
logger.Warnf("跳过设备 %d因其模板指令无法解析为 SensorCommands: %v", dev.ID, err)
continue
}
// 使用模型层预定义的 Bus485Properties 结构体解析设备属性
var deviceProps models.Bus485Properties
if err := dev.ParseProperties(&deviceProps); err != nil {
logger.Warnf("跳过设备 %d因其属性解析失败: %v", dev.ID, err)
continue
}
// 生成 Modbus RTU 读取指令
modbusCommandBytes, err := command_generater.GenerateModbusRTUReadCommand(
deviceProps.BusAddress,
sensorCmd.ModbusFunctionCode,
sensorCmd.ModbusStartAddress,
sensorCmd.ModbusQuantity,
)
if err != nil {
logger.Warnf("跳过设备 %d因生成Modbus RTU读取指令失败: %v", dev.ID, err)
continue
}
logger.Debugf("生成485指令: %v", modbusCommandBytes)
// 构建 Raw485Command包含总线号
raw485Cmd := &proto.Raw485Command{
BusNumber: int32(deviceProps.BusNumber), // 添加总线号
CommandBytes: modbusCommandBytes,
}
collectTasks = append(collectTasks, &proto.CollectTask{
Command: raw485Cmd,
})
childDeviceIDs = append(childDeviceIDs, dev.ID)
}
if len(childDeviceIDs) == 0 {
return errors.New("经过滤后,没有可通过自检的有效设备")
}
// 3. 构建并发送指令
networkID := areaController.NetworkID
// 4. 创建待处理请求记录
correlationID := uuid.New().String()
pendingReq := &models.PendingCollection{
CorrelationID: correlationID,
DeviceID: areaController.ID,
CommandMetadata: childDeviceIDs,
Status: models.PendingStatusPending,
CreatedAt: time.Now(),
}
if err := g.pendingCollectionRepo.Create(serviceCtx, pendingReq); err != nil {
logger.Errorf("创建待采集请求失败 (CorrelationID: %s): %v", correlationID, err)
return err
}
logger.Debugf("成功创建待采集请求 (CorrelationID: %s, DeviceID: %d)", correlationID, areaController.ID)
// 5. 构建最终的空中载荷
batchCmd := &proto.BatchCollectCommand{
CorrelationId: correlationID,
Tasks: collectTasks,
}
instruction := &proto.Instruction{
Payload: &proto.Instruction_BatchCollectCommand{
BatchCollectCommand: batchCmd,
},
}
payload, err := gproto.Marshal(instruction)
if err != nil {
logger.Errorf("序列化采集指令失败 (CorrelationID: %s): %v", correlationID, err)
return err
}
logger.Debugf("构造空中载荷成功: networkID: %v, payload: %v", networkID, instruction)
if _, err := g.comm.Send(serviceCtx, networkID, payload); err != nil {
logger.DPanicf("待采集请求 (CorrelationID: %s) 已创建,但发送到设备失败: %v。数据可能不一致", correlationID, err)
return err
}
logger.Debugf("成功将采集请求 (CorrelationID: %s) 发送到设备 %s", correlationID, networkID)
return nil
}
// Send 实现了 Service 接口,用于发送一个通用的指令载荷。
// 它将载荷包装成顶层指令,然后执行查找网络地址、序列化、发送和记录日志的完整流程。
func (g *GeneralDeviceService) Send(ctx context.Context, areaControllerID uint32, payload proto.InstructionPayload, opts ...SendOption) error {
serviceCtx, logger := logs.Trace(ctx, g.ctx, "Send")
// 1. 应用选项
options := &SendOptions{}
for _, opt := range opts {
opt(options)
}
// 2. 查找区域主控以获取 NetworkID
areaController, err := g.areaControllerRepo.FindByID(serviceCtx, areaControllerID)
if err != nil {
return fmt.Errorf("发送通用指令失败无法找到ID为 %d 的区域主控: %w", areaControllerID, err)
}
// 3. 将载荷包装进顶层 Instruction 结构体
instruction := &proto.Instruction{
Payload: payload,
}
// 4. 序列化指令
message, err := gproto.Marshal(instruction)
if err != nil {
return fmt.Errorf("序列化通用指令失败: %w", err)
}
// 5. 发送指令
networkID := areaController.NetworkID
sendResult, err := g.comm.Send(serviceCtx, networkID, message)
if err != nil {
return fmt.Errorf("发送通用指令到 %s 失败: %w", networkID, err)
}
// 6. 始终创建 DeviceCommandLog 记录,但根据选项设置其初始状态
logRecord := &models.DeviceCommandLog{
MessageID: sendResult.MessageID,
DeviceID: areaController.ID, // 将日志与区域主控关联
SentAt: time.Now(),
}
if options.NotTrackable {
// 对于无需追踪的指令,直接标记为已完成
now := time.Now()
logRecord.AcknowledgedAt = &now
logRecord.ReceivedSuccess = true
logger.Infow("成功发送一个无需追踪的通用指令,并记录为已完成日志", "networkID", networkID, "MessageID", sendResult.MessageID)
} else {
// 对于需要追踪的指令,记录其发送结果,等待异步确认
if sendResult.AcknowledgedAt != nil {
logRecord.AcknowledgedAt = sendResult.AcknowledgedAt
}
if sendResult.ReceivedSuccess != nil {
logRecord.ReceivedSuccess = *sendResult.ReceivedSuccess
}
logger.Infow("成功发送通用指令,并创建追踪日志", "networkID", networkID, "MessageID", sendResult.MessageID)
}
if err := g.deviceCommandLogRepo.Create(serviceCtx, logRecord); err != nil {
// 记录日志失败是一个需要关注的问题,但可能不应该中断主流程。
logger.Errorw("创建通用指令的日志失败", "MessageID", sendResult.MessageID, "error", err)
}
return nil
}