Daniel246 commited on
Commit
e01e5fd
·
1 Parent(s): 3cdc4d3

改用 WebSocket 驅動實時更新

Browse files

- 新增 websocket_api.py:連接 Universalis WebSocket API
- 首次查詢用 REST API,之後用 WebSocket 緩存
- 自動刷新改為 5 秒
- 物品資訊與市場數據並行請求(aiohttp)
- 新增更新紀錄頁籤

Files changed (5) hide show
  1. app.py +50 -3
  2. requirements.txt +3 -0
  3. src/api.py +215 -0
  4. src/display.py +38 -8
  5. src/websocket_api.py +314 -0
app.py CHANGED
@@ -20,6 +20,7 @@ from src.watchlist import (
20
  remove_item_from_list,
21
  )
22
  from src.ai_analysis import analyze_item_with_ai, get_market_summary
 
23
 
24
 
25
  def refresh_watchlist_with_notify(watchlist: list):
@@ -59,6 +60,8 @@ def create_app() -> gr.Blocks:
59
  **支援伺服器:** 伊弗利特、迦樓羅、利維坦、鳳凰、奧汀、巴哈姆特、拉姆、泰坦
60
 
61
  > **搜尋提示:** 可輸入繁體中文、英文名稱、物品 ID,或貼上 Universalis 網址
 
 
62
  """)
63
 
64
  with gr.Tabs():
@@ -68,6 +71,7 @@ def create_app() -> gr.Blocks:
68
  _build_watchlist_tab(watchlist_state)
69
  _build_tax_tab()
70
  _build_stats_tab()
 
71
 
72
  return app
73
 
@@ -117,7 +121,7 @@ def _build_market_tab() -> None:
117
  with gr.Row():
118
  search_btn = gr.Button("查詢市場", variant="primary")
119
  auto_refresh = gr.Checkbox(
120
- label="自動刷新 (60秒)",
121
  value=False,
122
  )
123
 
@@ -152,8 +156,8 @@ def _build_market_tab() -> None:
152
  comparison_table = gr.Dataframe(interactive=False)
153
  comparison_chart = gr.Plot()
154
 
155
- # 自動刷新用的隱藏計時器
156
- timer = gr.Timer(value=60, active=False)
157
 
158
  # 事件綁定
159
  search_input.change(
@@ -466,7 +470,50 @@ def _build_ai_tab() -> None:
466
  )
467
 
468
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
469
  if __name__ == "__main__":
 
 
 
 
 
 
 
 
 
 
 
470
  application = create_app()
471
  application.launch(
472
  server_name="0.0.0.0",
 
20
  remove_item_from_list,
21
  )
22
  from src.ai_analysis import analyze_item_with_ai, get_market_summary
23
+ from src.websocket_api import start_websocket
24
 
25
 
26
  def refresh_watchlist_with_notify(watchlist: list):
 
60
  **支援伺服器:** 伊弗利特、迦樓羅、利維坦、鳳凰、奧汀、巴哈姆特、拉姆、泰坦
61
 
62
  > **搜尋提示:** 可輸入繁體中文、英文名稱、物品 ID,或貼上 Universalis 網址
63
+
64
+ 🔗 **WebSocket 已連線** - 使用異步 API 加速查詢
65
  """)
66
 
67
  with gr.Tabs():
 
71
  _build_watchlist_tab(watchlist_state)
72
  _build_tax_tab()
73
  _build_stats_tab()
74
+ _build_changelog_tab()
75
 
76
  return app
77
 
 
121
  with gr.Row():
122
  search_btn = gr.Button("查詢市場", variant="primary")
123
  auto_refresh = gr.Checkbox(
124
+ label="自動刷新 (5秒)",
125
  value=False,
126
  )
127
 
 
156
  comparison_table = gr.Dataframe(interactive=False)
157
  comparison_chart = gr.Plot()
158
 
159
+ # 自動刷新用的計時器 (5秒,使用 WebSocket 緩存)
160
+ timer = gr.Timer(value=5, active=False)
161
 
162
  # 事件綁定
163
  search_input.change(
 
470
  )
471
 
472
 
473
+ def _build_changelog_tab() -> None:
474
+ """建立更新紀錄頁籤."""
475
+ with gr.TabItem("更新紀錄"):
476
+ gr.Markdown("""
477
+ ### v1.3.0 (2024-12)
478
+ - 改用 WebSocket 驅動實時更新
479
+ - 首次查詢用 REST API,之後用 WebSocket 緩存
480
+ - 自動刷新改為 5 秒(使用緩存時幾乎無延遲)
481
+ - 物品資訊與市場數據並行請求
482
+
483
+ ### v1.2.0 (2024-12)
484
+ - 新增 AI 分析功能
485
+ - 支援跨服套利判斷
486
+ - 手機版面優化
487
+
488
+ ### v1.1.0 (2024-12)
489
+ - 新增監看清單功能
490
+ - 支援設定目標價格提醒
491
+ - 資料儲存於瀏覽器 LocalStorage
492
+
493
+ ### v1.0.0 (2024-12)
494
+ - 首次發布
495
+ - 支援繁體中文搜尋物品
496
+ - 市場價格查詢、交易紀錄
497
+ - 跨伺服器比價
498
+ - 稅率資訊、上傳統計
499
+
500
+ ---
501
+ 資料來源: [Universalis API](https://universalis.app/)
502
+ """)
503
+
504
+
505
  if __name__ == "__main__":
506
+ # 啟動 WebSocket 連線(背景執行)
507
+ print("啟動 WebSocket 連線...")
508
+ ws_client = start_websocket()
509
+
510
+ # 訂閱陸行鳥資料中心的所有伺服器更新
511
+ from src.config import WORLDS
512
+ for world_id in WORLDS.keys():
513
+ ws_client.subscribe("listings/add", world_id)
514
+ ws_client.subscribe("sales/add", world_id)
515
+ print("已訂閱陸行鳥資料中心的市場更新")
516
+
517
  application = create_app()
518
  application.launch(
519
  server_name="0.0.0.0",
requirements.txt CHANGED
@@ -5,3 +5,6 @@ plotly>=5.15.0
5
  opencc-python-reimplemented>=0.1.7
6
  huggingface_hub>=0.20.0
7
  python-dotenv>=1.0.0
 
 
 
 
5
  opencc-python-reimplemented>=0.1.7
6
  huggingface_hub>=0.20.0
7
  python-dotenv>=1.0.0
8
+ aiohttp>=3.9.0
9
+ websockets>=12.0
10
+ pymongo>=4.6.0
src/api.py CHANGED
@@ -1,7 +1,10 @@
1
  """Universalis 和 XIVAPI 的 API 呼叫函數."""
2
 
 
3
  import re
 
4
 
 
5
  import requests
6
  from opencc import OpenCC
7
 
@@ -285,3 +288,215 @@ def get_recent_activity(world_or_dc: str = None, limit: int = 15) -> list:
285
  activity_list.sort(key=lambda x: x["last_update"], reverse=True)
286
 
287
  return activity_list
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """Universalis 和 XIVAPI 的 API 呼叫函數."""
2
 
3
+ import asyncio
4
  import re
5
+ from typing import Optional
6
 
7
+ import aiohttp
8
  import requests
9
  from opencc import OpenCC
10
 
 
288
  activity_list.sort(key=lambda x: x["last_update"], reverse=True)
289
 
290
  return activity_list
291
+
292
+
293
+ # ============================================================
294
+ # 異步 API 函數 (使用 aiohttp,速度更快)
295
+ # ============================================================
296
+
297
+ async def get_market_data_async(
298
+ item_id: int,
299
+ world_or_dc: str = None,
300
+ session: aiohttp.ClientSession = None,
301
+ ) -> dict:
302
+ """異步取得市場板數據.
303
+
304
+ Args:
305
+ item_id: 物品 ID
306
+ world_or_dc: 伺服器或資料中心名稱
307
+ session: 可選的 aiohttp session
308
+
309
+ Returns:
310
+ 市場數據字典
311
+ """
312
+ if world_or_dc is None:
313
+ world_or_dc = DATA_CENTER
314
+
315
+ url = f"{UNIVERSALIS_BASE}/{world_or_dc}/{item_id}"
316
+ params = {"listings": 50, "entries": 50}
317
+
318
+ close_session = False
319
+ if session is None:
320
+ session = aiohttp.ClientSession()
321
+ close_session = True
322
+
323
+ try:
324
+ async with session.get(
325
+ url, params=params, timeout=aiohttp.ClientTimeout(total=MARKET_API_TIMEOUT)
326
+ ) as response:
327
+ if response.status == 200:
328
+ return await response.json()
329
+ return {}
330
+ except Exception as e:
331
+ print(f"異步取得市場數據錯誤: {e}")
332
+ return {}
333
+ finally:
334
+ if close_session:
335
+ await session.close()
336
+
337
+
338
+ async def get_item_info_async(
339
+ item_id: int,
340
+ session: aiohttp.ClientSession = None,
341
+ ) -> dict:
342
+ """異步取得物品詳細資訊.
343
+
344
+ Args:
345
+ item_id: 物品 ID
346
+ session: 可選的 aiohttp session
347
+
348
+ Returns:
349
+ 物品資訊字典
350
+ """
351
+ close_session = False
352
+ if session is None:
353
+ session = aiohttp.ClientSession()
354
+ close_session = True
355
+
356
+ try:
357
+ url = f"{CAFEMAKER_BASE}/item/{item_id}"
358
+ async with session.get(
359
+ url, timeout=aiohttp.ClientTimeout(total=API_TIMEOUT)
360
+ ) as response:
361
+ if response.status == 200:
362
+ data = await response.json()
363
+ if data.get("Name"):
364
+ data["Name"] = _s2t_converter.convert(data["Name"])
365
+ return data
366
+ except Exception as e:
367
+ print(f"異步取得物品資訊錯誤: {e}")
368
+
369
+ # 備用:嘗試 XIVAPI
370
+ try:
371
+ url = f"{XIVAPI_BASE}/item/{item_id}"
372
+ params = {"language": "en"}
373
+ async with session.get(
374
+ url, params=params, timeout=aiohttp.ClientTimeout(total=API_TIMEOUT)
375
+ ) as response:
376
+ if response.status == 200:
377
+ return await response.json()
378
+ except Exception as e:
379
+ print(f"XIVAPI 異步取得物品資訊錯誤: {e}")
380
+ finally:
381
+ if close_session:
382
+ await session.close()
383
+
384
+ return {"ID": item_id, "Name": f"物品 {item_id}", "LevelItem": 0}
385
+
386
+
387
+ async def get_multi_item_market_data_async(
388
+ item_ids: list[int],
389
+ world_or_dc: str = None,
390
+ ) -> dict:
391
+ """異步批量取得多個物品的市場數據.
392
+
393
+ Args:
394
+ item_ids: 物品 ID 列表
395
+ world_or_dc: 伺服器或資料中心名稱
396
+
397
+ Returns:
398
+ 以物品 ID 為 key 的市場數據字典
399
+ """
400
+ if world_or_dc is None:
401
+ world_or_dc = DATA_CENTER
402
+
403
+ if not item_ids:
404
+ return {}
405
+
406
+ # Universalis 支援批量查詢,最多 100 個物品
407
+ ids_str = ",".join(str(i) for i in item_ids[:100])
408
+ url = f"{UNIVERSALIS_BASE}/{world_or_dc}/{ids_str}"
409
+ params = {"listings": 20, "entries": 20}
410
+
411
+ try:
412
+ async with aiohttp.ClientSession() as session:
413
+ async with session.get(
414
+ url, params=params, timeout=aiohttp.ClientTimeout(total=MARKET_API_TIMEOUT)
415
+ ) as response:
416
+ if response.status == 200:
417
+ data = await response.json()
418
+ # 如果是單個物品,包裝成 items 格式
419
+ if "items" not in data and "itemID" in data:
420
+ return {data["itemID"]: data}
421
+ return data.get("items", {})
422
+ except Exception as e:
423
+ print(f"批量取得市場數據錯誤: {e}")
424
+
425
+ return {}
426
+
427
+
428
+ async def get_full_item_data_async(
429
+ item_id: int,
430
+ world_or_dc: str = None,
431
+ ) -> dict:
432
+ """異步取得物品的完整資料(物品資訊 + 市場數據).
433
+
434
+ Args:
435
+ item_id: 物品 ID
436
+ world_or_dc: 伺服器或資料中心名稱
437
+
438
+ Returns:
439
+ 包含 item_info 和 market_data 的字典
440
+ """
441
+ async with aiohttp.ClientSession() as session:
442
+ # 並行請求物品資訊和市場數據
443
+ item_info_task = get_item_info_async(item_id, session)
444
+ market_data_task = get_market_data_async(item_id, world_or_dc, session)
445
+
446
+ item_info, market_data = await asyncio.gather(
447
+ item_info_task, market_data_task
448
+ )
449
+
450
+ return {
451
+ "item_info": item_info,
452
+ "market_data": market_data,
453
+ }
454
+
455
+
456
+ def get_market_data_fast(item_id: int, world_or_dc: str = None) -> dict:
457
+ """快速取得市場板數據(使用異步).
458
+
459
+ 這是 get_market_data 的快速版本,適合在 Gradio 中使用。
460
+
461
+ Args:
462
+ item_id: 物品 ID
463
+ world_or_dc: 伺服器或資料中心名稱
464
+
465
+ Returns:
466
+ 市場數據字典
467
+ """
468
+ try:
469
+ loop = asyncio.new_event_loop()
470
+ asyncio.set_event_loop(loop)
471
+ result = loop.run_until_complete(get_market_data_async(item_id, world_or_dc))
472
+ loop.close()
473
+ return result
474
+ except Exception as e:
475
+ print(f"快速取得市場數據錯誤: {e}")
476
+ # 回退到同步版本
477
+ return get_market_data(item_id, world_or_dc)
478
+
479
+
480
+ def get_full_item_data_fast(item_id: int, world_or_dc: str = None) -> dict:
481
+ """快速取得物品完整資料(使用異步並行請求).
482
+
483
+ Args:
484
+ item_id: 物品 ID
485
+ world_or_dc: 伺服器或資料中心名稱
486
+
487
+ Returns:
488
+ 包含 item_info 和 market_data 的字典
489
+ """
490
+ try:
491
+ loop = asyncio.new_event_loop()
492
+ asyncio.set_event_loop(loop)
493
+ result = loop.run_until_complete(get_full_item_data_async(item_id, world_or_dc))
494
+ loop.close()
495
+ return result
496
+ except Exception as e:
497
+ print(f"快速取得完整資料錯誤: {e}")
498
+ # 回退到同步版本
499
+ return {
500
+ "item_info": get_item_info(item_id),
501
+ "market_data": get_market_data(item_id, world_or_dc),
502
+ }
src/display.py CHANGED
@@ -7,7 +7,10 @@ import gradio as gr
7
  import pandas as pd
8
  import plotly.graph_objects as go
9
 
 
 
10
  from .api import (
 
11
  get_item_info,
12
  get_market_data,
13
  get_recent_activity,
@@ -15,6 +18,7 @@ from .api import (
15
  get_upload_stats,
16
  search_items,
17
  )
 
18
  from .charts import (
19
  create_cross_world_comparison,
20
  create_price_chart,
@@ -91,17 +95,43 @@ def display_item_market(
91
  return "", empty_df, empty_df, empty_fig, empty_df, empty_fig
92
 
93
  item_id = item_selection
94
-
95
- # 取得物品資訊
96
- item_info = get_item_info(item_id)
97
- item_name = item_info.get("Name", f"物品 {item_id}")
98
- item_level = item_info.get("LevelItem", 0)
99
-
100
- # 取得市場數據
101
  world_query = (
102
  selected_world if selected_world != "全部伺服器" else DATA_CENTER
103
  )
104
- market_data = get_market_data(item_id, world_query)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
105
 
106
  if not market_data:
107
  return (
 
7
  import pandas as pd
8
  import plotly.graph_objects as go
9
 
10
+ import time
11
+
12
  from .api import (
13
+ get_full_item_data_fast,
14
  get_item_info,
15
  get_market_data,
16
  get_recent_activity,
 
18
  get_upload_stats,
19
  search_items,
20
  )
21
+ from .websocket_api import get_ws_client
22
  from .charts import (
23
  create_cross_world_comparison,
24
  create_price_chart,
 
95
  return "", empty_df, empty_df, empty_fig, empty_df, empty_fig
96
 
97
  item_id = item_selection
 
 
 
 
 
 
 
98
  world_query = (
99
  selected_world if selected_world != "全部伺服器" else DATA_CENTER
100
  )
101
+
102
+ # 開始關注此物品的 WebSocket 更新
103
+ ws_client = get_ws_client()
104
+ if ws_client:
105
+ ws_client.watch_item(item_id)
106
+
107
+ # 檢查 WebSocket 是否有此物品的緩存數據
108
+ ws_data = None
109
+ if ws_client:
110
+ ws_data = ws_client.get_cached_data(item_id)
111
+
112
+ if ws_data and ws_data.get("data", {}).get("listings"):
113
+ # 使用 WebSocket 緩存的數據(更快)
114
+ cached = ws_data["data"]
115
+ item_info = get_item_info(item_id) # 物品資訊還是用 API
116
+ market_data = {
117
+ "listings": cached.get("listings", []),
118
+ "recentHistory": cached.get("recentHistory", []),
119
+ "currentAveragePrice": cached.get("currentAveragePrice", 0),
120
+ "averagePrice": cached.get("averagePrice", 0),
121
+ "minPrice": cached.get("minPrice", 0),
122
+ "maxPrice": cached.get("maxPrice", 0),
123
+ "listingsCount": len(cached.get("listings", [])),
124
+ "regularSaleVelocity": cached.get("regularSaleVelocity", 0),
125
+ "lastUploadTime": int(ws_data["timestamp"] * 1000),
126
+ }
127
+ else:
128
+ # 首次查詢,使用 REST API
129
+ full_data = get_full_item_data_fast(item_id, world_query)
130
+ item_info = full_data.get("item_info", {})
131
+ market_data = full_data.get("market_data", {})
132
+
133
+ item_name = item_info.get("Name", f"物品 {item_id}")
134
+ item_level = item_info.get("LevelItem", 0)
135
 
136
  if not market_data:
137
  return (
src/websocket_api.py ADDED
@@ -0,0 +1,314 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Universalis WebSocket API 實現."""
2
+
3
+ import asyncio
4
+ import threading
5
+ import time
6
+ from typing import Callable, Optional
7
+ from queue import Queue
8
+
9
+ import bson
10
+ import websockets
11
+
12
+ from .config import DATA_CENTER, WORLD_IDS, WORLDS
13
+
14
+ # WebSocket 設定
15
+ UNIVERSALIS_WS_URL = "wss://universalis.app/api/ws"
16
+
17
+ # 陸行鳥資料中心的所有伺服器 ID
18
+ CHOCOBO_WORLD_IDS = list(WORLDS.keys())
19
+
20
+
21
+ class UniversalisWebSocket:
22
+ """Universalis WebSocket 客戶端."""
23
+
24
+ def __init__(self):
25
+ self._ws: Optional[websockets.WebSocketClientProtocol] = None
26
+ self._loop: Optional[asyncio.AbstractEventLoop] = None
27
+ self._thread: Optional[threading.Thread] = None
28
+ self._running = False
29
+ self._subscriptions: set = set()
30
+ self._callbacks: dict[str, list[Callable]] = {}
31
+ self._message_queue: Queue = Queue()
32
+ self._connected = False
33
+ # 物品數據緩存: {item_id: {"data": {...}, "timestamp": time}}
34
+ self._item_cache: dict[int, dict] = {}
35
+ # 當前訂閱的物品 ID
36
+ self._watched_items: set[int] = set()
37
+
38
+ def start(self):
39
+ """啟動 WebSocket 連線(在背景執行緒中運行)."""
40
+ if self._running:
41
+ return
42
+
43
+ self._running = True
44
+ self._thread = threading.Thread(target=self._run_event_loop, daemon=True)
45
+ self._thread.start()
46
+
47
+ def stop(self):
48
+ """停止 WebSocket 連線."""
49
+ self._running = False
50
+ self._connected = False
51
+ if self._ws and self._loop:
52
+ try:
53
+ asyncio.run_coroutine_threadsafe(
54
+ self._ws.close(), self._loop
55
+ ).result(timeout=2)
56
+ except Exception:
57
+ pass
58
+ if self._loop and self._loop.is_running():
59
+ self._loop.call_soon_threadsafe(self._loop.stop)
60
+ if self._thread:
61
+ self._thread.join(timeout=3)
62
+
63
+ def _run_event_loop(self):
64
+ """在背景執行緒中運行事件循環."""
65
+ self._loop = asyncio.new_event_loop()
66
+ asyncio.set_event_loop(self._loop)
67
+ try:
68
+ self._loop.run_until_complete(self._connect_and_listen())
69
+ except Exception as e:
70
+ print(f"WebSocket 事件循環錯誤: {e}")
71
+ finally:
72
+ self._loop.close()
73
+
74
+ async def _connect_and_listen(self):
75
+ """連接並監聽 WebSocket."""
76
+ while self._running:
77
+ try:
78
+ async with websockets.connect(
79
+ UNIVERSALIS_WS_URL,
80
+ ping_interval=30,
81
+ ping_timeout=10,
82
+ ) as ws:
83
+ self._ws = ws
84
+ self._connected = True
85
+ print("WebSocket 已連接到 Universalis")
86
+
87
+ # 重新訂閱之前的頻道
88
+ for channel in self._subscriptions:
89
+ await self._send_subscribe(channel)
90
+
91
+ # 監聽消息
92
+ async for message in ws:
93
+ if not self._running:
94
+ break
95
+ await self._handle_message(message)
96
+
97
+ except websockets.exceptions.ConnectionClosed:
98
+ print("WebSocket 連線已關閉,嘗試重新連接...")
99
+ except Exception as e:
100
+ print(f"WebSocket 錯誤: {e}")
101
+
102
+ self._connected = False
103
+ if self._running:
104
+ await asyncio.sleep(5) # 等待後重新連接
105
+
106
+ async def _send_subscribe(self, channel: str):
107
+ """發送訂閱消息."""
108
+ if self._ws and self._connected:
109
+ msg = bson.encode({"event": "subscribe", "channel": channel})
110
+ await self._ws.send(msg)
111
+ print(f"已訂閱頻道: {channel}")
112
+
113
+ async def _send_unsubscribe(self, channel: str):
114
+ """發送取消訂閱消息."""
115
+ if self._ws and self._connected:
116
+ msg = bson.encode({"event": "unsubscribe", "channel": channel})
117
+ await self._ws.send(msg)
118
+ print(f"已取消訂閱頻道: {channel}")
119
+
120
+ async def _handle_message(self, message: bytes):
121
+ """處理收到的消息."""
122
+ try:
123
+ data = bson.decode(message)
124
+ event = data.get("event", "")
125
+ item_id = data.get("item")
126
+ world_id = data.get("world")
127
+
128
+ # 檢查是否為陸行鳥資料中心的伺服器
129
+ if world_id and world_id not in CHOCOBO_WORLD_IDS:
130
+ return # 忽略其他資料中心的消息
131
+
132
+ # 如果是我們關注的物品,更新緩存
133
+ if item_id and item_id in self._watched_items:
134
+ self._item_cache[item_id] = {
135
+ "data": data,
136
+ "timestamp": time.time(),
137
+ "event": event,
138
+ "world": world_id,
139
+ }
140
+
141
+ # 放入消息佇列
142
+ self._message_queue.put(data)
143
+
144
+ # 呼叫回調
145
+ if event in self._callbacks:
146
+ for callback in self._callbacks[event]:
147
+ try:
148
+ callback(data)
149
+ except Exception as e:
150
+ print(f"回調錯誤: {e}")
151
+
152
+ except Exception as e:
153
+ print(f"處理消息錯誤: {e}")
154
+
155
+ def subscribe(self, channel: str, world_id: int = None):
156
+ """訂閱頻道.
157
+
158
+ Args:
159
+ channel: 頻道名稱 (listings/add, listings/remove, sales/add)
160
+ world_id: 可選的伺服器 ID,用於過濾
161
+ """
162
+ if world_id:
163
+ full_channel = f"{channel}{{world={world_id}}}"
164
+ else:
165
+ full_channel = channel
166
+
167
+ self._subscriptions.add(full_channel)
168
+
169
+ if self._loop and self._connected:
170
+ asyncio.run_coroutine_threadsafe(
171
+ self._send_subscribe(full_channel), self._loop
172
+ )
173
+
174
+ def subscribe_item(self, item_id: int, world_or_dc: str = None):
175
+ """訂閱特定物品的更新.
176
+
177
+ Args:
178
+ item_id: 物品 ID
179
+ world_or_dc: 伺服器或資料中心名稱
180
+ """
181
+ # 訂閱該物品的上架和銷售更新
182
+ if world_or_dc and world_or_dc != "全部伺服器":
183
+ world_id = WORLD_IDS.get(world_or_dc)
184
+ if world_id:
185
+ self.subscribe("listings/add", world_id)
186
+ self.subscribe("sales/add", world_id)
187
+ else:
188
+ # 訂閱陸行鳥資料中心所有伺服器
189
+ for world_id in CHOCOBO_WORLD_IDS:
190
+ self.subscribe("listings/add", world_id)
191
+ self.subscribe("sales/add", world_id)
192
+
193
+ def unsubscribe(self, channel: str, world_id: int = None):
194
+ """取消訂閱頻道."""
195
+ if world_id:
196
+ full_channel = f"{channel}{{world={world_id}}}"
197
+ else:
198
+ full_channel = channel
199
+
200
+ self._subscriptions.discard(full_channel)
201
+
202
+ if self._loop and self._connected:
203
+ asyncio.run_coroutine_threadsafe(
204
+ self._send_unsubscribe(full_channel), self._loop
205
+ )
206
+
207
+ def on_event(self, event: str, callback: Callable):
208
+ """註冊事件回調.
209
+
210
+ Args:
211
+ event: 事件名稱 (listings/add, listings/remove, sales/add)
212
+ callback: 回調函數,接收事件數據
213
+ """
214
+ if event not in self._callbacks:
215
+ self._callbacks[event] = []
216
+ self._callbacks[event].append(callback)
217
+
218
+ def get_latest_messages(self, limit: int = 10) -> list:
219
+ """取得最新的消息.
220
+
221
+ Args:
222
+ limit: 最大數量
223
+
224
+ Returns:
225
+ 消息列表
226
+ """
227
+ messages = []
228
+ while not self._message_queue.empty() and len(messages) < limit:
229
+ try:
230
+ messages.append(self._message_queue.get_nowait())
231
+ except:
232
+ break
233
+ return messages
234
+
235
+ def is_connected(self) -> bool:
236
+ """檢查是否已連接."""
237
+ return self._connected
238
+
239
+ def watch_item(self, item_id: int):
240
+ """開始關注某個物品的更新.
241
+
242
+ Args:
243
+ item_id: 物品 ID
244
+ """
245
+ self._watched_items.add(item_id)
246
+
247
+ def unwatch_item(self, item_id: int):
248
+ """停止關注某個物品."""
249
+ self._watched_items.discard(item_id)
250
+ self._item_cache.pop(item_id, None)
251
+
252
+ def get_cached_data(self, item_id: int) -> Optional[dict]:
253
+ """取得物品的緩存數據.
254
+
255
+ Args:
256
+ item_id: 物品 ID
257
+
258
+ Returns:
259
+ 緩存數據,如果沒有則返回 None
260
+ """
261
+ return self._item_cache.get(item_id)
262
+
263
+ def has_update(self, item_id: int, since: float = 0) -> bool:
264
+ """檢查物品是否有新更新.
265
+
266
+ Args:
267
+ item_id: 物品 ID
268
+ since: 時間戳,檢查此時間之後是否有更新
269
+
270
+ Returns:
271
+ 是否有新更新
272
+ """
273
+ cache = self._item_cache.get(item_id)
274
+ if cache and cache["timestamp"] > since:
275
+ return True
276
+ return False
277
+
278
+ def clear_cache(self, item_id: int = None):
279
+ """清除緩存.
280
+
281
+ Args:
282
+ item_id: 物品 ID,如果為 None 則清除所有
283
+ """
284
+ if item_id:
285
+ self._item_cache.pop(item_id, None)
286
+ else:
287
+ self._item_cache.clear()
288
+
289
+
290
+ # 全域 WebSocket 客戶端實例
291
+ _ws_client: Optional[UniversalisWebSocket] = None
292
+
293
+
294
+ def get_ws_client() -> UniversalisWebSocket:
295
+ """取得全域 WebSocket 客戶端."""
296
+ global _ws_client
297
+ if _ws_client is None:
298
+ _ws_client = UniversalisWebSocket()
299
+ return _ws_client
300
+
301
+
302
+ def start_websocket():
303
+ """啟動全域 WebSocket 連線."""
304
+ client = get_ws_client()
305
+ client.start()
306
+ return client
307
+
308
+
309
+ def stop_websocket():
310
+ """停止全域 WebSocket 連線."""
311
+ global _ws_client
312
+ if _ws_client:
313
+ _ws_client.stop()
314
+ _ws_client = None