Files
pig-house-controller/app/bus/rs485_manager.py

301 lines
12 KiB
Python
Raw Normal View History

2025-10-08 19:36:48 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
RS485 总线管理器实现
此模块实现了 IBusManager 接口用于管理 RS485 总线通信
"""
2025-10-13 14:35:20 +08:00
from ..logs.logger import log
2025-10-08 19:36:48 +08:00
# 导入 MicroPython 的 UART 和 Pin 库
from machine import UART, Pin
import time # 用于添加延时确保RS485方向切换
import _thread # 用于线程同步
2025-10-13 14:35:20 +08:00
import struct # 用于浮点数转换
2025-10-08 19:36:48 +08:00
2025-10-13 14:35:20 +08:00
class RS485Manager:
2025-10-08 19:36:48 +08:00
"""
RS485 总线管理器
负责 RS485 设备的指令发送响应接收和数据解析
"""
2025-10-13 14:35:20 +08:00
def __init__(self, bus_config, default_timeouts):
2025-10-08 19:36:48 +08:00
"""
构造函数注入配置
根据传入的配置初始化 RS485 总线对应的 UART 管理器
Args:
2025-10-13 14:35:20 +08:00
bus_config: 包含所有总线配置的字典
2025-10-08 19:36:48 +08:00
键是总线ID值是该总线的详细配置
2025-10-13 14:35:20 +08:00
default_timeouts: 包含各种默认超时设置的字典
2025-10-08 19:36:48 +08:00
"""
self.bus_config = bus_config
self.default_timeouts = default_timeouts
# 存储以总线号为key的UART管理器实例、RTS引脚和锁
2025-10-13 14:35:20 +08:00
self.bus_ports = {}
2025-10-08 19:36:48 +08:00
log("RS485Manager 已使用配置初始化。")
log(f"总线配置: {self.bus_config}")
log(f"默认超时设置: {self.default_timeouts}")
# 遍历 bus_config初始化 RS485 端口
for bus_id, config in bus_config.items():
if config.get('protocol') == 'RS485':
try:
uart_id = config['uart_id']
baudrate = config['baudrate']
pins = config['pins']
tx_pin_num = pins['tx']
rx_pin_num = pins['rx']
rts_pin_num = pins['rts'] # RS485 的 DE/RE 方向控制引脚
# 初始化 Pin 对象
rts_pin = Pin(rts_pin_num, Pin.OUT) # RTS 引脚设置为输出模式
rts_pin.value(0) # 默认设置为接收模式
# 初始化 UART 对象
# 注意MicroPython 的 UART 构造函数可能不支持直接传入 Pin 对象,而是 Pin 编号
# 并且 rts 参数通常用于硬件流控制RS485 的 DE/RE 需要手动控制
uart = UART(uart_id, baudrate=baudrate, tx=tx_pin_num, rx=rx_pin_num,
timeout=self.default_timeouts.get('rs485_response', 500))
self.bus_ports[bus_id] = {
'uart': uart,
'rts_pin': rts_pin,
'lock': _thread.allocate_lock()
}
log(f"总线 {bus_id} (RS485) 的 UART 管理器初始化成功。UART ID: {uart_id}, 波特率: {baudrate}, TX: {tx_pin_num}, RX: {rx_pin_num}, RTS(DE/RE): {rts_pin_num}")
except KeyError as e:
log(f"错误: 总线 {bus_id} 的 RS485 配置缺少关键参数: {e}")
except Exception as e:
log(f"错误: 初始化总线 {bus_id} 的 RS485 管理器失败: {e}")
else:
log(f"总线 {bus_id} 的协议不是 RS485跳过初始化。")
2025-10-13 14:35:20 +08:00
@staticmethod
def _calculate_crc16_modbus(data):
2025-10-08 19:36:48 +08:00
"""
计算 Modbus RTU CRC16 校验码
"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
2025-10-13 14:35:20 +08:00
if crc & 0x0001:
2025-10-08 19:36:48 +08:00
crc >>= 1
crc ^= 0xA001
else:
crc >>= 1
return crc
2025-10-13 14:35:20 +08:00
def execute_raw_command(self, bus_id, command):
2025-10-08 19:36:48 +08:00
"""
契约执行一个发后不理的原始指令
Args:
bus_id (int): 目标总线的编号
command (bytes): 要发送的原始命令字节
"""
if bus_id not in self.bus_ports:
log(f"错误: 未找到总线 {bus_id} 的 RS485 配置。")
return
port_info = self.bus_ports[bus_id]
uart = port_info['uart']
rts_pin = port_info['rts_pin']
lock = port_info['lock']
with lock:
try:
rts_pin.value(1) # 设置为发送模式 (DE/RE = HIGH)
time.sleep_us(100) # 短暂延时,确保方向切换完成
uart.write(command)
# 等待所有数据发送完毕
uart.flush()
time.sleep_us(100) # 短暂延时,确保数据完全发出
rts_pin.value(0) # 切换回接收模式 (DE/RE = LOW)
log(f"总线 {bus_id} 原始命令发送成功: {command.hex()}")
except Exception as e:
log(f"错误: 在总线 {bus_id} 上执行原始命令失败: {e}")
2025-10-13 14:35:20 +08:00
def execute_collect_task(self, task):
2025-10-08 19:36:48 +08:00
"""
契约执行一个完整的采集任务并直接返回最终的数值
一个符合本接口的实现必须自己处理所有细节
- 从task字典中解析出 bus_id, command, parser_type
- 发送指令
- 接收响应
- 根据parser_type选择正确的内部解析器进行解析
- 返回最终的float数值或在任何失败情况下返回None
Args:
2025-10-13 14:35:20 +08:00
task: 从Protobuf解码出的单个CollectTask消息字典
2025-10-08 19:36:48 +08:00
期望结构: {"command": {"bus_number": int, "command_bytes": bytes}}
Returns:
2025-10-13 14:35:20 +08:00
成功解析则返回数值否则返回None
2025-10-08 19:36:48 +08:00
"""
# I. 任务参数解析与初步验证
try:
command_info = task.get("command")
if not command_info:
log("错误: CollectTask 缺少 'command' 字段。")
return None
bus_id = command_info.get("bus_number")
command_bytes = command_info.get("command_bytes")
2025-10-17 15:36:54 +08:00
# 增加对命令有效性的检查
if bus_id is None or not command_bytes or len(command_bytes) < 2:
log(f"错误: CollectTask 的 'command' 字段无效。bus_id: {bus_id}, command_bytes: {command_bytes}")
2025-10-08 19:36:48 +08:00
return None
except Exception as e:
log(f"错误: 解析CollectTask失败: {e}. 任务: {task}")
return None
if bus_id not in self.bus_ports:
log(f"错误: 未找到总线 {bus_id} 的 RS485 配置。")
return None
port_info = self.bus_ports[bus_id]
uart = port_info['uart']
rts_pin = port_info['rts_pin']
lock = port_info['lock']
2025-10-17 15:36:54 +08:00
response_bytes = None # 在锁外部初始化,确保其作用域
response_buffer = bytearray()
2025-10-08 19:36:48 +08:00
with lock:
try:
# II. 线程安全与指令发送
2025-10-17 15:36:54 +08:00
rts_pin.value(1)
time.sleep_us(100)
2025-10-08 19:36:48 +08:00
uart.write(command_bytes)
uart.flush()
2025-10-17 15:36:54 +08:00
time.sleep_us(100)
rts_pin.value(0)
2025-10-08 19:36:48 +08:00
log(f"总线 {bus_id} 原始命令发送成功: {command_bytes.hex()}")
2025-10-17 15:36:54 +08:00
# III. 接收响应
2025-10-17 10:32:52 +08:00
start_time = time.ticks_ms()
response_timeout = self.default_timeouts.get('rs485_response', 500)
while time.ticks_diff(time.ticks_ms(), start_time) < response_timeout:
if uart.any():
2025-10-17 15:36:54 +08:00
chunk = uart.read(32)
2025-10-17 10:32:52 +08:00
if chunk:
response_buffer.extend(chunk)
2025-10-17 15:36:54 +08:00
start_time = time.ticks_ms() # 收到数据就重置超时
2025-10-17 10:32:52 +08:00
time.sleep_ms(5)
if response_buffer:
2025-10-17 15:36:54 +08:00
# 动态地从请求命令中获取预期的从站ID和功能码
expected_slave_id = command_bytes[0]
expected_func_code = command_bytes[1]
found_frame = self._find_modbus_frame(response_buffer, expected_slave_id, expected_func_code)
if found_frame:
log(f"总线 {bus_id} 收到有效响应: {found_frame.hex()}")
response_bytes = found_frame # 将找到的帧赋值给外部变量
2025-10-17 10:32:52 +08:00
else:
log(f"警告: 总线 {bus_id} 响应中无有效帧。收到响应: {response_buffer.hex()}")
2025-10-08 19:36:48 +08:00
else:
2025-10-17 10:32:52 +08:00
log(f"警告: 总线 {bus_id} 未收到响应。")
2025-10-08 19:36:48 +08:00
except Exception as e:
log(f"错误: 在总线 {bus_id} 上执行采集命令失败: {e}")
2025-10-17 15:36:54 +08:00
# IV. 统一处理和解析
# 无论是因为超时、未找到有效帧还是发生异常,只要 response_bytes 仍为 None就任务失败
if response_bytes is None:
return None
2025-10-08 19:36:48 +08:00
2025-10-17 15:36:54 +08:00
# 使用找到的有效帧进行解析
parsed_value = RS485Manager._parse_modbus_rtu_default(response_bytes)
2025-10-13 14:35:20 +08:00
2025-10-08 19:36:48 +08:00
return parsed_value
2025-10-17 10:32:52 +08:00
def _find_modbus_frame(self, buffer: bytearray, expected_slave: int, func_code: int) -> bytes | None:
"""
修复版加调试优先头检查CRC 字节序标准 Modbus (低字节在前)
"""
log(f"搜索帧: buffer 长度 {len(buffer)}, hex {buffer.hex()}")
i = 0
while i < len(buffer) - 6: # 最小 7 字节,-6 安全
if buffer[i] == expected_slave and buffer[i + 1] == func_code:
byte_count = buffer[i + 2]
frame_len = 3 + byte_count + 2
if len(buffer) - i >= frame_len:
frame = bytes(buffer[i:i + frame_len])
# CRC 预校验(标准 ModbusCRC 低字节在前)
core = frame[:-2]
calc_crc = self._calculate_crc16_modbus(core)
low_crc = frame[-2]
high_crc = frame[-1]
recv_crc = (high_crc << 8) | low_crc # 高<<8 | 低
log(f"候选帧 at {i}: {frame.hex()}, calc CRC {calc_crc:04X}, recv {recv_crc:04X}")
if calc_crc == recv_crc:
log(f"找到有效帧: {frame.hex()}")
return frame
else:
log(f"CRC 不匹配,跳过 (calc {calc_crc:04X} != recv {recv_crc:04X})")
i += 1
log("无有效帧")
return None
@staticmethod
def _parse_modbus_rtu_default(response_bytes): # 改名,支持整数/浮点
"""
修复版动态数据长CRC 只用核心
"""
if not response_bytes or len(response_bytes) < 7:
log(f"警告: 响应过短。响应: {response_bytes.hex() if response_bytes else 'None'}")
return None
# CRC 校验(只核心)
data_for_crc = response_bytes[:-2]
received_crc = (response_bytes[-1] << 8) | response_bytes[-2]
calculated_crc = RS485Manager._calculate_crc16_modbus(data_for_crc)
if calculated_crc != received_crc:
log(f"错误: CRC失败。接收: {received_crc:04X}, 计算: {calculated_crc:04X}. 响应: {response_bytes.hex()}")
return None
function_code = response_bytes[1]
byte_count = response_bytes[2]
data_bytes = response_bytes[3:3 + byte_count]
if function_code not in [0x03, 0x04]:
log(f"警告: 功能码 {function_code:02X} 不符。")
return None
if len(data_bytes) != byte_count:
log(f"错误: 数据长 {len(data_bytes)} != {byte_count}")
return None
# 动态解析
if byte_count == 2:
# 整数 (e.g., 温度)
try:
value = int.from_bytes(data_bytes, 'big') # 或 signed '>h'
parsed_value = value
log(f"成功解析整数: {parsed_value}")
return parsed_value
except Exception as e:
log(f"整数解析失败: {e}")
return None
elif byte_count == 4:
# 浮点
try:
parsed_value = struct.unpack('>f', data_bytes)[0]
log(f"成功解析浮点: {parsed_value}")
return parsed_value
except Exception as e:
log(f"浮点失败: {e}")
return None
else:
log(f"警告: 未知字节数 {byte_count}")
return None