Files
video-sync/desktop-client/sync_client.py
2026-03-05 14:09:58 +11:00

115 lines
4.0 KiB
Python

import asyncio
import json
import time
import websockets
from PyQt6.QtCore import QThread, pyqtSignal
class SyncClientThread(QThread):
    """QThread that owns a WebSocket connection and relays messages as Qt signals.

    ``run()`` creates a private asyncio event loop inside the thread, connects
    to ``url`` and keeps reconnecting (2 s pause between attempts) until
    ``stop()`` is called. Each incoming JSON object is mapped to one of the
    signals below according to its ``"type"`` field.
    """

    # Signals from WebSocket -> PyQt UI
    connected = pyqtSignal()
    disconnected = pyqtSignal()
    room_joined = pyqtSignal(dict)
    room_rejoined = pyqtSignal(dict)
    room_error = pyqtSignal(str)
    file_check_needed = pyqtSignal(dict)  # msg
    users_updated = pyqtSignal(list)
    chat_message = pyqtSignal(str, str, int)  # author, text, timestamp
    system_message = pyqtSignal(str)  # declared for the UI; not emitted by this class
    sync_event = pyqtSignal(dict)
    latency_updated = pyqtSignal(int)  # latency in ms

    def __init__(self, url="ws://localhost:3000/ws"):
        """Store the server ``url``; nothing is connected until the thread runs."""
        super().__init__()
        self.url = url
        self.ws = None  # live connection, or None while disconnected
        self.loop = None  # asyncio loop owned by run(), or None when not running
        self.running = False
        self._ping_sent_at = 0  # not read anywhere in this class; kept for compatibility

    def run(self):
        """Runs strictly within the newly created QThread"""
        self.running = True
        loop = asyncio.new_event_loop()
        self.loop = loop
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(self._connect_and_listen())
        finally:
            # Drop the references before closing so other threads stop
            # scheduling work on a loop that is about to be closed.
            self.ws = None
            self.loop = None
            loop.close()

    def stop(self):
        """Request shutdown, close the socket if one is open, and wait for the thread.

        Safe to call more than once and after the thread has already finished.
        """
        self.running = False
        self._submit(lambda ws: ws.close())
        self.quit()
        self.wait()

    def send_message(self, message: dict):
        """Called safely from the main PyQt thread to send data out.

        ``message`` is serialized with ``json.dumps`` and sent on the current
        connection. The call is a silent no-op when there is no live
        connection or the event loop is no longer available.
        """
        self._submit(lambda ws: ws.send(json.dumps(message)))

    def _submit(self, make_coro):
        """Schedule ``make_coro(ws)`` on the worker loop from any thread.

        ``make_coro`` is only invoked when both a connection and an open loop
        exist, so no coroutine object is created needlessly.
        """
        # Snapshot both attributes: the worker thread may reset them at any time.
        ws, loop = self.ws, self.loop
        if ws is None or loop is None or loop.is_closed():
            return
        coro = make_coro(ws)
        try:
            asyncio.run_coroutine_threadsafe(coro, loop)
        except RuntimeError:
            # The loop was closed between the check above and the call.
            # Close the coroutine to avoid a "never awaited" warning.
            coro.close()

    async def _ping_loop(self, ws):
        """Sends WebSocket protocol-level pings every 5s to measure latency.

        Emits ``latency_updated`` with the round-trip time in milliseconds.
        The loop ends on the first failure (including a pong that does not
        arrive within 5 s) or when ``running`` becomes false.
        """
        while self.running:
            try:
                pong = await ws.ping()
                # Monotonic clock: immune to wall-clock adjustments.
                sent_at = time.monotonic()
                await asyncio.wait_for(pong, timeout=5)
                latency_ms = int((time.monotonic() - sent_at) * 1000)
                self.latency_updated.emit(latency_ms)
            except Exception:
                break
            await asyncio.sleep(5)

    async def _connect_and_listen(self):
        """Connect, dispatch incoming messages, and reconnect until stopped.

        Emits ``connected`` after each successful connection and
        ``disconnected`` after every attempt ends, whether it failed to
        connect or the connection was lost.
        """
        while self.running:
            try:
                async with websockets.connect(self.url) as ws:
                    self.ws = ws
                    self.connected.emit()
                    ping_task = asyncio.create_task(self._ping_loop(ws))
                    try:
                        async for raw in ws:
                            if not self.running:
                                break
                            self._decode_and_handle(raw)
                    except websockets.ConnectionClosed:
                        pass
                    finally:
                        ping_task.cancel()
                        # Prevent send_message() from using a dead connection.
                        self.ws = None
            except Exception as e:
                print(f"WebSocket Error: {e}")
            self.disconnected.emit()
            if self.running:
                # Reconnect backoff
                await asyncio.sleep(2)

    def _decode_and_handle(self, raw):
        """Parse one raw frame as JSON and dispatch it.

        Frames that are not valid JSON, or whose top-level value is not an
        object, are reported and skipped instead of tearing down the
        connection.
        """
        try:
            msg = json.loads(raw)
        except (TypeError, ValueError) as e:  # JSONDecodeError is a ValueError
            print(f"WebSocket Error: ignoring malformed message: {e}")
            return
        if not isinstance(msg, dict):
            print("WebSocket Error: ignoring non-object message")
            return
        self._handle_message(msg)

    def _handle_message(self, msg: dict):
        """Emit the signal that corresponds to ``msg["type"]``.

        Messages with an unrecognized or missing type are ignored.
        """
        t = msg.get("type")
        if t in ("room_created", "room_joined"):
            self.room_joined.emit(msg)
        elif t == "room_rejoined":
            self.room_rejoined.emit(msg)
        elif t == "error":
            self.room_error.emit(msg.get("message", "Unknown error"))
        elif t == "room_file_check":
            self.file_check_needed.emit(msg)
        elif t in ("user_joined", "user_left"):
            self.users_updated.emit(msg.get("users", []))
        elif t == "chat":
            # NOTE(review): the signal declares the timestamp as ``int``;
            # confirm the server's timestamp values fit that type.
            self.chat_message.emit(
                msg.get("username", "Unknown"),
                msg.get("message", ""),
                msg.get("timestamp", 0),
            )
        elif t == "sync":
            self.sync_event.emit(msg)