115 lines
4.0 KiB
Python
115 lines
4.0 KiB
Python
import asyncio
|
|
import json
|
|
import time
|
|
import websockets
|
|
from PyQt6.QtCore import QThread, pyqtSignal
|
|
|
|
class SyncClientThread(QThread):
    """Background WebSocket client that forwards server events to the PyQt UI.

    The thread creates its own asyncio event loop in ``run()``, connects to
    ``url`` and keeps reconnecting for as long as ``running`` is true.  Every
    decoded server message is translated into one of the Qt signals below.
    Other threads interact with the connection only through ``send_message()``
    and ``stop()``, which schedule work onto the thread's loop.
    """

    # Seconds between latency probes, and how long to wait for each pong.
    PING_INTERVAL_S = 5
    PING_TIMEOUT_S = 5
    # Seconds to wait before trying to connect again after a disconnect.
    RECONNECT_DELAY_S = 2

    # Signals from WebSocket -> PyQt UI
    connected = pyqtSignal()
    disconnected = pyqtSignal()
    room_joined = pyqtSignal(dict)
    room_rejoined = pyqtSignal(dict)
    room_error = pyqtSignal(str)
    file_check_needed = pyqtSignal(dict)  # msg
    users_updated = pyqtSignal(list)
    # NOTE(review): a Qt ``int`` argument is presumably a 32-bit C++ int; if
    # the server sends millisecond epoch timestamps they may not fit --
    # confirm the unit used by the server.
    chat_message = pyqtSignal(str, str, int)  # author, text, timestamp
    system_message = pyqtSignal(str)
    sync_event = pyqtSignal(dict)
    latency_updated = pyqtSignal(int)  # latency in ms

    def __init__(self, url="ws://localhost:3000/ws"):
        """Store the server URL; no connection is made until the thread runs.

        Args:
            url: WebSocket endpoint to connect to.
        """
        super().__init__()
        self.url = url
        self.ws = None      # live connection, or None while disconnected
        self.loop = None    # asyncio loop owned by run(), or None when idle
        self.running = False
        # Not read anywhere in this class; kept so existing users of the
        # attribute keep working.
        self._ping_sent_at = 0

    def run(self):
        """Runs strictly within the newly created QThread."""
        self.running = True
        loop = asyncio.new_event_loop()
        self.loop = loop
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(self._connect_and_listen())
        finally:
            # Drop the references before closing so that send_message() and
            # stop() called afterwards become no-ops instead of scheduling
            # onto a closed loop.
            self.ws = None
            self.loop = None
            loop.close()

    def stop(self):
        """Ask the client to shut down and block until the thread has exited.

        Closing the socket ends the receive loop.  If the thread is currently
        connecting or waiting to reconnect, this call blocks until that step
        finishes and the ``running`` flag is re-checked.
        """
        self.running = False
        ws, loop = self.ws, self.loop
        if ws is not None and loop is not None:
            close_coro = ws.close()
            try:
                asyncio.run_coroutine_threadsafe(close_coro, loop)
            except RuntimeError:
                # The loop was closed between the check and the call.
                close_coro.close()
        self.quit()
        self.wait()

    def send_message(self, message: dict):
        """Called safely from the main PyQt thread to send data out.

        The message is serialised to JSON and scheduled on the client's
        event loop.  It is silently dropped when there is no live connection.

        Args:
            message: JSON-serialisable payload to send to the server.
        """
        ws, loop = self.ws, self.loop
        if ws is None or loop is None:
            return
        send_coro = ws.send(json.dumps(message))
        try:
            future = asyncio.run_coroutine_threadsafe(send_coro, loop)
        except RuntimeError:
            # The loop was closed between the check and the call.
            send_coro.close()
            return
        future.add_done_callback(self._report_send_result)

    @staticmethod
    def _report_send_result(future):
        """Print the error of a scheduled send that failed, if any."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            print(f"WebSocket Error: send failed: {error}")

    async def _ping_loop(self, ws):
        """Sends WebSocket protocol-level pings every 5s to measure latency.

        A pong that does not arrive within the timeout only skips that
        sample; any other failure (for example a closed connection) ends the
        loop.
        """
        while self.running:
            try:
                # Monotonic clock: immune to wall-clock adjustments.
                started = time.monotonic()
                pong = await ws.ping()
                await asyncio.wait_for(pong, timeout=self.PING_TIMEOUT_S)
                elapsed_ms = int((time.monotonic() - started) * 1000)
                self.latency_updated.emit(elapsed_ms)
            except asyncio.TimeoutError:
                pass  # no sample this round; try again after the interval
            except Exception:
                break
            await asyncio.sleep(self.PING_INTERVAL_S)

    async def _connect_and_listen(self):
        """Connect, dispatch incoming messages, and reconnect until stopped.

        ``connected`` is emitted after each successful connect and
        ``disconnected`` after every attempt ends, whether it failed to
        connect or the connection was closed.
        """
        while self.running:
            try:
                async with websockets.connect(self.url) as ws:
                    self.ws = ws
                    self.connected.emit()

                    ping_task = asyncio.create_task(self._ping_loop(ws))
                    try:
                        async for raw in ws:
                            if not self.running:
                                break
                            self._process_raw(raw)
                    except websockets.ConnectionClosed:
                        pass
                    finally:
                        ping_task.cancel()

            except Exception as e:
                print(f"WebSocket Error: {e}")
            finally:
                # The socket is unusable from here on; do not let
                # send_message() schedule writes on it.
                self.ws = None

            self.disconnected.emit()
            if self.running:
                # Reconnect backoff
                await asyncio.sleep(self.RECONNECT_DELAY_S)

    def _process_raw(self, raw):
        """Decode one received frame and dispatch it.

        Frames that are not JSON objects, or whose fields cannot be emitted,
        are reported and dropped so that one bad message does not tear down
        the connection.
        """
        try:
            msg = json.loads(raw)
        except (TypeError, ValueError) as error:
            print(f"WebSocket Error: dropping malformed message: {error}")
            return
        if not isinstance(msg, dict):
            print("WebSocket Error: dropping non-object message")
            return
        try:
            self._handle_message(msg)
        except (TypeError, ValueError, OverflowError) as error:
            # Presumably raised by emit() when a field's type or range does
            # not match the signal signature.
            print(f"WebSocket Error: dropping unusable message: {error}")

    @staticmethod
    def _coerce_timestamp(value):
        """Return ``value`` as an int, or 0 when it cannot be converted."""
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return 0

    def _handle_message(self, msg: dict):
        """Emit the Qt signal that corresponds to ``msg["type"]``.

        Messages with an unrecognised or missing type are ignored.

        Args:
            msg: Decoded server message.
        """
        t = msg.get("type")

        if t in ("room_created", "room_joined"):
            self.room_joined.emit(msg)

        elif t == "room_rejoined":
            self.room_rejoined.emit(msg)

        elif t == "error":
            self.room_error.emit(msg.get("message", "Unknown error"))

        elif t == "room_file_check":
            self.file_check_needed.emit(msg)

        elif t in ("user_joined", "user_left"):
            self.users_updated.emit(msg.get("users", []))

        elif t == "chat":
            self.chat_message.emit(
                msg.get("username", "Unknown"),
                msg.get("message", ""),
                # The signal requires an int; tolerate float or missing values.
                self._coerce_timestamp(msg.get("timestamp", 0)),
            )

        elif t == "sync":
            self.sync_event.emit(msg)
|