Files

153 lines
5.2 KiB
Python
Raw Permalink Normal View History

2025-09-08 22:12:42 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
WebSocket客户端模块
用于与平台建立WebSocket连接并处理通信
"""
import asyncio
import json
import logging
import websockets
from datetime import datetime
from urllib.parse import urlencode
from config import config
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class WebSocketClient:
"""WebSocket客户端类"""
def __init__(self, relay_service):
"""
初始化WebSocket客户端
Args:
relay_service: 中继器服务实例
"""
self.relay_service = relay_service
self.device_id = config._config.get('relay', {}).get('device_id', '1')
self.host = config._config.get('websocket', {}).get('host', 'localhost')
self.port = config._config.get('websocket', {}).get('port', 8086)
self.timeout = config._config.get('websocket', {}).get('timeout', 5)
self.websocket = None
self.connected = False
self.running = False
logger.info(f"初始化WebSocket客户端: device_id={self.device_id}, host={self.host}, port={self.port}")
async def connect(self):
"""建立WebSocket连接"""
try:
# 构建连接URL
params = urlencode({'device_id': self.device_id})
uri = f"ws://{self.host}:{self.port}/ws/device?{params}"
logger.info(f"正在连接到平台: {uri}")
# 建立WebSocket连接移除了timeout参数以兼容新版本websockets库
self.websocket = await websockets.connect(uri)
self.connected = True
self.running = True
logger.info("成功连接到平台")
return True
except Exception as e:
logger.error(f"连接平台失败: {e}")
self.connected = False
return False
async def disconnect(self):
"""断开WebSocket连接"""
logger.info("断开WebSocket连接")
self.running = False
if self.websocket:
await self.websocket.close()
self.websocket = None
self.connected = False
logger.info("WebSocket连接已断开")
async def send_message(self, message):
"""
发送消息到平台
Args:
message (dict): 要发送的消息
"""
if not self.connected or not self.websocket:
logger.warning("WebSocket未连接无法发送消息")
return False
try:
message_str = json.dumps(message, ensure_ascii=False)
await self.websocket.send(message_str)
logger.debug(f"发送消息到平台: {message_str}")
return True
except Exception as e:
logger.error(f"发送消息失败: {e}")
# 连接可能已断开,更新状态
self.connected = False
2025-09-08 22:12:42 +08:00
return False
async def listen(self):
"""监听平台消息"""
if not self.connected or not self.websocket:
logger.error("WebSocket未连接无法监听消息")
return
logger.info("开始监听平台消息")
try:
async for message in self.websocket:
try:
# 解析收到的消息
message_data = json.loads(message)
logger.info(f"收到平台消息: {message_data}")
# 将消息传递给中继器服务处理
self.relay_service.handle_platform_command(message)
except json.JSONDecodeError as e:
logger.error(f"解析平台消息失败: {e}")
except Exception as e:
logger.error(f"处理平台消息时发生错误: {e}")
except websockets.exceptions.ConnectionClosed:
logger.warning("WebSocket连接已关闭")
self.connected = False
except Exception as e:
logger.error(f"监听平台消息时发生错误: {e}")
self.connected = False
async def reconnect(self):
"""尝试重新连接到平台"""
logger.info("尝试重新连接到平台...")
reconnect_delay = 5 # 初始重连延迟(秒)
max_delay = 60 # 最大重连延迟(秒)
2025-09-08 22:12:42 +08:00
while self.running and not self.connected:
try:
await self.connect()
if self.connected:
logger.info("重新连接成功")
return True
else:
logger.warning(f"重新连接失败,{reconnect_delay}秒后重试...")
await asyncio.sleep(reconnect_delay)
# 指数退避,但不超过最大延迟
reconnect_delay = min(reconnect_delay * 2, max_delay)
except Exception as e:
logger.error(f"重新连接时发生错误: {e}")
await asyncio.sleep(reconnect_delay)
2025-09-08 22:12:42 +08:00
def is_connected(self):
"""检查是否已连接"""
return self.connected