"""Universalis WebSocket API 實現.""" import asyncio import threading import time from typing import Callable, Optional from queue import Queue import bson import websockets from .config import DATA_CENTER, WORLD_IDS, WORLDS # WebSocket 設定 UNIVERSALIS_WS_URL = "wss://universalis.app/api/ws" # 陸行鳥資料中心的所有伺服器 ID CHOCOBO_WORLD_IDS = list(WORLDS.keys()) class UniversalisWebSocket: """Universalis WebSocket 客戶端.""" # 最大保留的即時事件數量 MAX_LIVE_EVENTS = 100 def __init__(self): self._ws: Optional[websockets.WebSocketClientProtocol] = None self._loop: Optional[asyncio.AbstractEventLoop] = None self._thread: Optional[threading.Thread] = None self._running = False self._subscriptions: set = set() self._callbacks: dict[str, list[Callable]] = {} self._message_queue: Queue = Queue() self._connected = False # 物品數據緩存: {item_id: {"data": {...}, "timestamp": time}} self._item_cache: dict[int, dict] = {} # 當前訂閱的物品 ID self._watched_items: set[int] = set() # 即時交易事件列表 (最新的在前面) self._live_events: list[dict] = [] self._live_events_lock = threading.Lock() # 各伺服器最後收到數據的時間: {world_id: timestamp} self._world_last_update: dict[int, float] = {} self._world_event_count: dict[int, int] = {} # 事件計數 def start(self): """啟動 WebSocket 連線(在背景執行緒中運行).""" if self._running: return self._running = True self._thread = threading.Thread(target=self._run_event_loop, daemon=True) self._thread.start() def stop(self): """停止 WebSocket 連線.""" self._running = False self._connected = False if self._ws and self._loop: try: asyncio.run_coroutine_threadsafe( self._ws.close(), self._loop ).result(timeout=2) except Exception: pass if self._loop and self._loop.is_running(): self._loop.call_soon_threadsafe(self._loop.stop) if self._thread: self._thread.join(timeout=3) def _run_event_loop(self): """在背景執行緒中運行事件循環.""" self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) try: self._loop.run_until_complete(self._connect_and_listen()) except Exception as e: print(f"WebSocket 事件循環錯誤: {e}") finally: self._loop.close() async def _connect_and_listen(self): """連接並監聽 WebSocket.""" while self._running: try: async with websockets.connect( UNIVERSALIS_WS_URL, ping_interval=30, ping_timeout=10, ) as ws: self._ws = ws self._connected = True print("WebSocket 已連接到 Universalis") # 重新訂閱之前的頻道 for channel in self._subscriptions: await self._send_subscribe(channel) # 監聽消息 async for message in ws: if not self._running: break await self._handle_message(message) except websockets.exceptions.ConnectionClosed: print("WebSocket 連線已關閉,嘗試重新連接...") except Exception as e: print(f"WebSocket 錯誤: {e}") self._connected = False if self._running: await asyncio.sleep(5) # 等待後重新連接 async def _send_subscribe(self, channel: str): """發送訂閱消息.""" if self._ws and self._connected: msg = bson.encode({"event": "subscribe", "channel": channel}) await self._ws.send(msg) print(f"已訂閱頻道: {channel}") async def _send_unsubscribe(self, channel: str): """發送取消訂閱消息.""" if self._ws and self._connected: msg = bson.encode({"event": "unsubscribe", "channel": channel}) await self._ws.send(msg) print(f"已取消訂閱頻道: {channel}") async def _handle_message(self, message: bytes): """處理收到的消息.""" try: data = bson.decode(message) event = data.get("event", "") item_id = data.get("item") world_id = data.get("world") # 檢查是否為陸行鳥資料中心的伺服器 if world_id and world_id not in CHOCOBO_WORLD_IDS: return # 忽略其他資料中心的消息 # 如果是我們關注的物品,更新緩存 if item_id and item_id in self._watched_items: self._item_cache[item_id] = { "data": data, "timestamp": time.time(), "event": event, "world": world_id, } # 儲存即時事件 (listings/add, sales/add, sales/remove) if event in ("listings/add", "sales/add", "listings/remove"): current_time = time.time() live_event = { "event": event, "item_id": item_id, "world_id": world_id, "world_name": WORLDS.get(world_id, "未知"), "timestamp": current_time, "data": data, } with self._live_events_lock: self._live_events.insert(0, live_event) # 限制列表大小 if len(self._live_events) > self.MAX_LIVE_EVENTS: self._live_events = self._live_events[:self.MAX_LIVE_EVENTS] # 更新伺服器最後更新時間 if world_id: self._world_last_update[world_id] = current_time self._world_event_count[world_id] = self._world_event_count.get(world_id, 0) + 1 # 放入消息佇列 self._message_queue.put(data) # 呼叫回調 if event in self._callbacks: for callback in self._callbacks[event]: try: callback(data) except Exception as e: print(f"回調錯誤: {e}") except Exception as e: print(f"處理消息錯誤: {e}") def subscribe(self, channel: str, world_id: int = None): """訂閱頻道. Args: channel: 頻道名稱 (listings/add, listings/remove, sales/add) world_id: 可選的伺服器 ID,用於過濾 """ if world_id: full_channel = f"{channel}{{world={world_id}}}" else: full_channel = channel self._subscriptions.add(full_channel) if self._loop and self._connected: asyncio.run_coroutine_threadsafe( self._send_subscribe(full_channel), self._loop ) def subscribe_item(self, item_id: int, world_or_dc: str = None): """訂閱特定物品的更新. Args: item_id: 物品 ID world_or_dc: 伺服器或資料中心名稱 """ # 訂閱該物品的上架和銷售更新 if world_or_dc and world_or_dc != "全部伺服器": world_id = WORLD_IDS.get(world_or_dc) if world_id: self.subscribe("listings/add", world_id) self.subscribe("sales/add", world_id) else: # 訂閱陸行鳥資料中心所有伺服器 for world_id in CHOCOBO_WORLD_IDS: self.subscribe("listings/add", world_id) self.subscribe("sales/add", world_id) def unsubscribe(self, channel: str, world_id: int = None): """取消訂閱頻道.""" if world_id: full_channel = f"{channel}{{world={world_id}}}" else: full_channel = channel self._subscriptions.discard(full_channel) if self._loop and self._connected: asyncio.run_coroutine_threadsafe( self._send_unsubscribe(full_channel), self._loop ) def on_event(self, event: str, callback: Callable): """註冊事件回調. Args: event: 事件名稱 (listings/add, listings/remove, sales/add) callback: 回調函數,接收事件數據 """ if event not in self._callbacks: self._callbacks[event] = [] self._callbacks[event].append(callback) def get_latest_messages(self, limit: int = 10) -> list: """取得最新的消息. Args: limit: 最大數量 Returns: 消息列表 """ messages = [] while not self._message_queue.empty() and len(messages) < limit: try: messages.append(self._message_queue.get_nowait()) except: break return messages def is_connected(self) -> bool: """檢查是否已連接.""" return self._connected def watch_item(self, item_id: int): """開始關注某個物品的更新. Args: item_id: 物品 ID """ self._watched_items.add(item_id) def unwatch_item(self, item_id: int): """停止關注某個物品.""" self._watched_items.discard(item_id) self._item_cache.pop(item_id, None) def get_cached_data(self, item_id: int) -> Optional[dict]: """取得物品的緩存數據. Args: item_id: 物品 ID Returns: 緩存數據,如果沒有則返回 None """ return self._item_cache.get(item_id) def has_update(self, item_id: int, since: float = 0) -> bool: """檢查物品是否有新更新. Args: item_id: 物品 ID since: 時間戳,檢查此時間之後是否有更新 Returns: 是否有新更新 """ cache = self._item_cache.get(item_id) if cache and cache["timestamp"] > since: return True return False def clear_cache(self, item_id: int = None): """清除緩存. Args: item_id: 物品 ID,如果為 None 則清除所有 """ if item_id: self._item_cache.pop(item_id, None) else: self._item_cache.clear() def get_live_events(self, limit: int = 50) -> list[dict]: """取得即時交易事件. Args: limit: 最大數量 Returns: 即時事件列表(最新的在前面) """ with self._live_events_lock: return self._live_events[:limit].copy() def get_live_events_count(self) -> int: """取得即時事件數量.""" with self._live_events_lock: return len(self._live_events) def clear_live_events(self): """清除即時事件.""" with self._live_events_lock: self._live_events.clear() def get_world_data_status(self) -> list[dict]: """取得各伺服器的資料流狀態. Returns: 各伺服器狀態列表,包含 world_id, world_name, last_update, event_count """ current_time = time.time() status_list = [] for world_id, world_name in WORLDS.items(): last_update = self._world_last_update.get(world_id, 0) event_count = self._world_event_count.get(world_id, 0) if last_update > 0: elapsed = current_time - last_update else: elapsed = -1 # 表示從未收到數據 status_list.append({ "world_id": world_id, "world_name": world_name, "last_update": last_update, "elapsed_seconds": elapsed, "event_count": event_count, }) return status_list def reset_stats(self): """重置統計數據.""" self._world_last_update.clear() self._world_event_count.clear() with self._live_events_lock: self._live_events.clear() # 全域 WebSocket 客戶端實例 _ws_client: Optional[UniversalisWebSocket] = None def get_ws_client() -> UniversalisWebSocket: """取得全域 WebSocket 客戶端.""" global _ws_client if _ws_client is None: _ws_client = UniversalisWebSocket() return _ws_client def start_websocket(): """啟動全域 WebSocket 連線.""" client = get_ws_client() client.start() return client def stop_websocket(): """停止全域 WebSocket 連線.""" global _ws_client if _ws_client: _ws_client.stop() _ws_client = None