import asyncio
import json
import time

import websockets
from PyQt6.QtCore import QThread, pyqtSignal


class SyncClientThread(QThread):
    """Background thread that owns a WebSocket connection and an asyncio loop.

    ``run()`` creates a private asyncio event loop inside the QThread, connects
    to ``url`` and keeps reconnecting until ``stop()`` is called. Incoming JSON
    messages are translated into the Qt signals declared below; outgoing
    messages are handed to the loop with ``send_message()``.
    """

    # Signals from WebSocket -> PyQt UI
    connected = pyqtSignal()
    disconnected = pyqtSignal()
    room_joined = pyqtSignal(dict)
    room_rejoined = pyqtSignal(dict)
    room_error = pyqtSignal(str)
    file_check_needed = pyqtSignal(dict)  # msg
    users_updated = pyqtSignal(list)
    # NOTE(review): the third argument is declared as ``int``; PyQt may reject
    # values that do not fit a 32-bit C++ int (e.g. millisecond epoch
    # timestamps). Confirm the unit the server sends.
    chat_message = pyqtSignal(str, str, int)  # author, text, timestamp
    # NOTE(review): never emitted by the code in this class.
    system_message = pyqtSignal(str)
    sync_event = pyqtSignal(dict)
    latency_updated = pyqtSignal(int)  # latency in ms

    PING_INTERVAL_S = 5    # pause between two latency probes
    PING_TIMEOUT_S = 5     # how long to wait for a pong before skipping a sample
    RECONNECT_DELAY_S = 2  # back-off before the next connection attempt

    def __init__(self, url="ws://localhost:3000/ws"):
        super().__init__()
        self.url = url
        self.ws = None      # live connection, or None while disconnected
        self.loop = None    # asyncio loop owned by run(), or None when not running
        self.running = False
        self._ping_sent_at = 0  # kept for compatibility; not read in this class
        self._main_task = None  # task running _connect_and_listen()
        # Set by stop() and never cleared by run(), so a stop() that races
        # with thread start-up cannot be lost.
        self._stop_requested = False

    def _should_run(self) -> bool:
        """Return True while the thread is active and no stop was requested."""
        return self.running and not self._stop_requested

    def run(self):
        """Runs strictly within the newly created QThread."""
        self.running = True
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self._main_task = loop.create_task(self._connect_and_listen())
        self.loop = loop
        try:
            loop.run_until_complete(self._main_task)
        except asyncio.CancelledError:
            # stop() cancelled the main task; this is the normal shutdown path.
            pass
        finally:
            # Drop every handle before closing so other threads can no longer
            # schedule work onto a closed loop.
            self.running = False
            self.ws = None
            self.loop = None
            self._main_task = None
            loop.close()

    def stop(self):
        """Request shutdown from another thread and block until run() returns."""
        self._stop_requested = True
        self.running = False
        loop, task = self.loop, self._main_task
        if loop is not None and task is not None:
            try:
                # Cancelling the task interrupts connect, receive and the
                # reconnect sleep alike; leaving the ``async with`` block
                # closes the socket.
                loop.call_soon_threadsafe(task.cancel)
            except RuntimeError:
                # The loop was closed in the meantime: the thread is already
                # on its way out.
                pass
        self.quit()
        self.wait()
        # Allow the thread object to be started again later.
        self._stop_requested = False

    def send_message(self, message: dict):
        """Called safely from the main PyQt thread to send data out.

        The message is serialised with ``json.dumps`` and silently dropped
        when there is no live connection.
        """
        ws, loop = self.ws, self.loop
        if ws is None or loop is None:
            return
        send_coro = ws.send(json.dumps(message))
        try:
            future = asyncio.run_coroutine_threadsafe(send_coro, loop)
        except RuntimeError:
            # Loop closed between the check above and the call; close the
            # coroutine so it does not trigger a "never awaited" warning.
            send_coro.close()
            return
        future.add_done_callback(self._report_send_failure)

    @staticmethod
    def _report_send_failure(future):
        """Log the exception of a finished send future, if it has one."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            print(f"WebSocket Send Error: {error}")

    async def _ping_loop(self, ws):
        """Sends WebSocket protocol-level pings every 5s to measure latency."""
        while self._should_run():
            try:
                # perf_counter is monotonic, so wall-clock adjustments cannot
                # distort the measurement.
                sent_at = time.perf_counter()
                pong = await ws.ping()
                await asyncio.wait_for(pong, timeout=self.PING_TIMEOUT_S)
                latency_ms = int((time.perf_counter() - sent_at) * 1000)
                self.latency_updated.emit(latency_ms)
            except asyncio.TimeoutError:
                # One missed pong only skips this sample; keep probing.
                pass
            except Exception:
                # Connection closed or otherwise unusable: the listener loop
                # handles reconnecting, so just end this probe loop.
                break
            await asyncio.sleep(self.PING_INTERVAL_S)

    def _dispatch_raw(self, raw):
        """Decode one incoming frame and route it to ``_handle_message``.

        Malformed frames are logged and dropped instead of tearing down the
        connection.
        """
        try:
            msg = json.loads(raw)
        except (TypeError, ValueError) as e:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            print(f"WebSocket Error: dropped undecodable message: {e}")
            return
        if not isinstance(msg, dict):
            print("WebSocket Error: dropped non-object JSON message")
            return
        try:
            self._handle_message(msg)
        except Exception as e:
            # e.g. a payload field whose type does not match the signal
            # signature; one bad message must not cost the connection.
            print(f"WebSocket Error: failed to handle message: {e}")

    async def _connect_and_listen(self):
        """Connect, pump incoming messages, and reconnect until stopped.

        Emits ``connected`` after each successful connect and ``disconnected``
        after every attempt ends, whether it failed, closed or was cancelled.
        """
        while self._should_run():
            try:
                async with websockets.connect(self.url) as ws:
                    self.ws = ws
                    self.connected.emit()
                    ping_task = asyncio.create_task(self._ping_loop(ws))
                    try:
                        async for message in ws:
                            if not self._should_run():
                                break
                            self._dispatch_raw(message)
                    except websockets.ConnectionClosed:
                        pass
                    finally:
                        ping_task.cancel()
                        # Wait for the cancellation to land so no pending
                        # task is left behind when the loop closes.
                        await asyncio.gather(ping_task, return_exceptions=True)
            except Exception as e:
                print(f"WebSocket Error: {e}")
            finally:
                # The socket is gone; make send_message() a no-op until the
                # next successful connect.
                self.ws = None
                self.disconnected.emit()

            if self._should_run():
                # Reconnect backoff
                await asyncio.sleep(self.RECONNECT_DELAY_S)

    def _handle_message(self, msg: dict):
        """Emit the Qt signal matching ``msg["type"]``.

        Messages with an unrecognised or missing type are ignored.
        """
        kind = msg.get("type")

        if kind in ("room_created", "room_joined"):
            self.room_joined.emit(msg)
        elif kind == "room_rejoined":
            self.room_rejoined.emit(msg)
        elif kind == "error":
            self.room_error.emit(msg.get("message", "Unknown error"))
        elif kind == "room_file_check":
            self.file_check_needed.emit(msg)
        elif kind in ("user_joined", "user_left"):
            self.users_updated.emit(msg.get("users", []))
        elif kind == "chat":
            self.chat_message.emit(
                msg.get("username", "Unknown"),
                msg.get("message", ""),
                msg.get("timestamp", 0),
            )
        elif kind == "sync":
            self.sync_event.emit(msg)