Daniel246's picture
v1.8.0 新增物品分類篩選、物品資訊卡、分頁瀏覽、異常值標註
031cae9
"""Universalis 和 XIVAPI 的 API 呼叫函數."""
import asyncio
import re
from typing import Optional
import aiohttp
import requests
from opencc import OpenCC
# 繁體轉簡體轉換器(用於搜尋)
_t2s_converter = OpenCC('t2s')
# 簡體轉繁體轉換器(用於顯示結果)
_s2t_converter = OpenCC('s2t')
from .config import (
API_TIMEOUT,
CAFEMAKER_BASE,
DATA_CENTER,
MARKET_API_TIMEOUT,
POPULAR_ITEMS,
UNIVERSALIS_BASE,
WORLD_IDS,
XIVAPI_BASE,
)
def _extract_item_id(query: str) -> int:
"""從查詢字串中提取物品 ID.
支援格式:
- 純數字: "5506"
- Universalis 網址: "https://universalis.app/market/5506"
"""
# 檢查是否為純數字
if query.isdigit():
return int(query)
# 檢查是否為 Universalis 網址
match = re.search(r"universalis\.app/market/(\d+)", query)
if match:
return int(match.group(1))
return 0
def search_items(query: str, limit: int = 20, category: int = 0, page: int = 1) -> dict:
"""搜尋物品 - 支援繁體中文、簡體中文、英文.
Args:
query: 搜尋關鍵字、物品 ID 或 Universalis 網址
limit: 每頁結果數量限制
category: 物品分類 ID (ItemSearchCategory),0 表示全部
page: 頁碼(從 1 開始)
Returns:
包含 items, pagination 的字典:
- items: 物品列表,每個物品包含 id, name, icon, level
- pagination: 分頁資訊 (page, page_total, results_total)
"""
empty_result = {"items": [], "pagination": {"page": 1, "page_total": 1, "results_total": 0}}
if not query:
return empty_result
query = query.strip()
results = []
# 1. 檢查是否為物品 ID 或 Universalis 網址
item_id = _extract_item_id(query)
if item_id > 0:
item_info = get_item_info(item_id)
if item_info and item_info.get("Name"):
results.append({
"id": item_id,
"name": item_info.get("Name", f"物品 {item_id}"),
"icon": "",
"level": item_info.get("LevelItem", 0),
})
return {"items": results, "pagination": {"page": 1, "page_total": 1, "results_total": 1}}
# 2. 將繁體中文轉換為簡體中文(Cafemaker 使用簡體)
query_simplified = _t2s_converter.convert(query)
# 3. 使用 Cafemaker 搜尋(支援中文)
pagination = {"page": page, "page_total": 1, "results_total": 0}
try:
url = f"{CAFEMAKER_BASE}/search"
params = {
"string": query_simplified,
"indexes": "Item",
"limit": limit,
"page": page,
}
# 如果有指定分類,加入篩選條件
if category > 0:
params["filters"] = f"ItemSearchCategory.ID={category}"
response = requests.get(url, params=params, timeout=API_TIMEOUT)
if response.status_code == 200:
data = response.json()
# 取得分頁資訊
pag_info = data.get("Pagination", {})
pagination = {
"page": pag_info.get("Page", 1),
"page_total": pag_info.get("PageTotal", 1),
"results_total": pag_info.get("ResultsTotal", 0),
}
for item in data.get("Results", []):
if not any(r["id"] == item.get("ID") for r in results):
# 將簡體中文名稱轉換為繁體中文
name_traditional = _s2t_converter.convert(item.get("Name", ""))
results.append({
"id": item.get("ID"),
"name": name_traditional,
"icon": "",
"level": item.get("LevelItem", 0),
})
except requests.RequestException as e:
print(f"Cafemaker 搜尋錯誤: {e}")
return {"items": results, "pagination": pagination}
def get_item_info(item_id: int) -> dict:
"""取得物品詳細資訊 - 優先使用 Cafemaker(支援中文).
Args:
item_id: 物品 ID
Returns:
物品資訊字典(名稱已轉換為繁體中文)
"""
# 優先使用 Cafemaker(支援中文)
try:
url = f"{CAFEMAKER_BASE}/item/{item_id}"
response = requests.get(url, timeout=API_TIMEOUT)
if response.status_code == 200:
data = response.json()
# 將簡體名稱轉換為繁體
if data.get("Name"):
data["Name"] = _s2t_converter.convert(data["Name"])
return data
except requests.RequestException as e:
print(f"Cafemaker 取得物品資訊錯誤: {e}")
# 備用:嘗試 XIVAPI
try:
url = f"{XIVAPI_BASE}/item/{item_id}"
params = {"language": "en"}
response = requests.get(url, params=params, timeout=API_TIMEOUT)
if response.status_code == 200:
return response.json()
except requests.RequestException as e:
print(f"XIVAPI 取得物品資訊錯誤: {e}")
# 如果都失敗,返回基本資訊
return {"ID": item_id, "Name": f"物品 {item_id}", "LevelItem": 0}
def get_market_data(item_id: int, world_or_dc: str = None) -> dict:
"""取得市場板數據.
Args:
item_id: 物品 ID
world_or_dc: 伺服器或資料中心名稱
Returns:
市場數據字典
"""
if world_or_dc is None:
world_or_dc = DATA_CENTER
try:
url = f"{UNIVERSALIS_BASE}/{world_or_dc}/{item_id}"
params = {
"listings": 50,
"entries": 50,
}
response = requests.get(url, params=params, timeout=MARKET_API_TIMEOUT)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
print(f"取得市場數據錯誤: {e}")
return {}
def get_tax_rates(world: str) -> dict:
"""取得稅率資訊.
Args:
world: 伺服器名稱
Returns:
稅率字典
"""
try:
world_id = WORLD_IDS.get(world)
if not world_id:
return {}
url = f"{UNIVERSALIS_BASE}/tax-rates"
params = {"world": world_id}
response = requests.get(url, params=params, timeout=API_TIMEOUT)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
print(f"取得稅率錯誤: {e}")
return {}
def get_upload_stats() -> dict:
"""取得上傳統計.
Returns:
上傳統計字典
"""
try:
url = f"{UNIVERSALIS_BASE}/extra/stats/world-upload-counts"
response = requests.get(url, timeout=API_TIMEOUT)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
print(f"取得上傳統計錯誤: {e}")
return {}
def get_recently_updated(world_or_dc: str = None, limit: int = 20) -> list:
"""取得最近更新的物品列表.
Args:
world_or_dc: 伺服器或資料中心名稱
limit: 結果數量限制
Returns:
最近更新的物品 ID 列表
"""
if world_or_dc is None:
world_or_dc = DATA_CENTER
# 如果是伺服器名稱,轉換為 world ID
world_id = WORLD_IDS.get(world_or_dc, world_or_dc)
try:
url = f"{UNIVERSALIS_BASE}/extra/stats/recently-updated"
params = {"world": world_id} if isinstance(world_id, int) else {"dcName": world_or_dc}
response = requests.get(url, params=params, timeout=API_TIMEOUT)
response.raise_for_status()
data = response.json()
return data.get("items", [])[:limit]
except requests.RequestException as e:
print(f"取得最近更新錯誤: {e}")
return []
def get_recent_activity(world_or_dc: str = None, limit: int = 15) -> list:
"""取得市場動態(最近更新的物品及其價格資訊).
Args:
world_or_dc: 伺服器或資料中心名稱
limit: 結果數量限制
Returns:
市場動態列表,包含物品名稱、最低價等資訊
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
item_ids = get_recently_updated(world_or_dc, limit)
if not item_ids:
return []
activity_list = []
def fetch_item_data(item_id: int) -> dict:
"""取得單一物品的市場資料."""
item_info = get_item_info(item_id)
market_data = get_market_data(item_id, world_or_dc)
if not market_data:
return None
listings = market_data.get("listings", [])
nq_prices = [l["pricePerUnit"] for l in listings if not l.get("hq")]
hq_prices = [l["pricePerUnit"] for l in listings if l.get("hq")]
# 取得物品名稱並轉換為繁體
name = item_info.get("Name", f"物品 {item_id}")
name_traditional = _s2t_converter.convert(name)
return {
"id": item_id,
"name": name_traditional,
"nq_min": min(nq_prices) if nq_prices else None,
"hq_min": min(hq_prices) if hq_prices else None,
"listing_count": len(listings),
"last_update": market_data.get("lastUploadTime", 0),
}
# 並行請求物品資料
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(fetch_item_data, item_id): item_id for item_id in item_ids}
for future in as_completed(futures):
result = future.result()
if result:
activity_list.append(result)
# 按更新時間排序(最新的在前)
activity_list.sort(key=lambda x: x["last_update"], reverse=True)
return activity_list
# ============================================================
# 異步 API 函數 (使用 aiohttp,速度更快)
# ============================================================
async def get_market_data_async(
item_id: int,
world_or_dc: str = None,
session: aiohttp.ClientSession = None,
) -> dict:
"""異步取得市場板數據.
Args:
item_id: 物品 ID
world_or_dc: 伺服器或資料中心名稱
session: 可選的 aiohttp session
Returns:
市場數據字典
"""
if world_or_dc is None:
world_or_dc = DATA_CENTER
url = f"{UNIVERSALIS_BASE}/{world_or_dc}/{item_id}"
params = {"listings": 50, "entries": 50}
close_session = False
if session is None:
session = aiohttp.ClientSession()
close_session = True
try:
async with session.get(
url, params=params, timeout=aiohttp.ClientTimeout(total=MARKET_API_TIMEOUT)
) as response:
if response.status == 200:
return await response.json()
return {}
except Exception as e:
print(f"異步取得市場數據錯誤: {e}")
return {}
finally:
if close_session:
await session.close()
async def get_item_info_async(
item_id: int,
session: aiohttp.ClientSession = None,
) -> dict:
"""異步取得物品詳細資訊.
Args:
item_id: 物品 ID
session: 可選的 aiohttp session
Returns:
物品資訊字典
"""
close_session = False
if session is None:
session = aiohttp.ClientSession()
close_session = True
try:
url = f"{CAFEMAKER_BASE}/item/{item_id}"
async with session.get(
url, timeout=aiohttp.ClientTimeout(total=API_TIMEOUT)
) as response:
if response.status == 200:
data = await response.json()
if data.get("Name"):
data["Name"] = _s2t_converter.convert(data["Name"])
return data
except Exception as e:
print(f"異步取得物品資訊錯誤: {e}")
# 備用:嘗試 XIVAPI
try:
url = f"{XIVAPI_BASE}/item/{item_id}"
params = {"language": "en"}
async with session.get(
url, params=params, timeout=aiohttp.ClientTimeout(total=API_TIMEOUT)
) as response:
if response.status == 200:
return await response.json()
except Exception as e:
print(f"XIVAPI 異步取得物品資訊錯誤: {e}")
finally:
if close_session:
await session.close()
return {"ID": item_id, "Name": f"物品 {item_id}", "LevelItem": 0}
async def get_multi_item_market_data_async(
item_ids: list[int],
world_or_dc: str = None,
) -> dict:
"""異步批量取得多個物品的市場數據.
Args:
item_ids: 物品 ID 列表
world_or_dc: 伺服器或資料中心名稱
Returns:
以物品 ID 為 key 的市場數據字典
"""
if world_or_dc is None:
world_or_dc = DATA_CENTER
if not item_ids:
return {}
# Universalis 支援批量查詢,最多 100 個物品
ids_str = ",".join(str(i) for i in item_ids[:100])
url = f"{UNIVERSALIS_BASE}/{world_or_dc}/{ids_str}"
params = {"listings": 20, "entries": 20}
try:
async with aiohttp.ClientSession() as session:
async with session.get(
url, params=params, timeout=aiohttp.ClientTimeout(total=MARKET_API_TIMEOUT)
) as response:
if response.status == 200:
data = await response.json()
# 如果是單個物品,包裝成 items 格式
if "items" not in data and "itemID" in data:
return {data["itemID"]: data}
return data.get("items", {})
except Exception as e:
print(f"批量取得市場數據錯誤: {e}")
return {}
async def get_full_item_data_async(
item_id: int,
world_or_dc: str = None,
) -> dict:
"""異步取得物品的完整資料(物品資訊 + 市場數據).
Args:
item_id: 物品 ID
world_or_dc: 伺服器或資料中心名稱
Returns:
包含 item_info 和 market_data 的字典
"""
async with aiohttp.ClientSession() as session:
# 並行請求物品資訊和市場數據
item_info_task = get_item_info_async(item_id, session)
market_data_task = get_market_data_async(item_id, world_or_dc, session)
item_info, market_data = await asyncio.gather(
item_info_task, market_data_task
)
return {
"item_info": item_info,
"market_data": market_data,
}
def get_market_data_fast(item_id: int, world_or_dc: str = None) -> dict:
"""快速取得市場板數據(使用異步).
這是 get_market_data 的快速版本,適合在 Gradio 中使用。
Args:
item_id: 物品 ID
world_or_dc: 伺服器或資料中心名稱
Returns:
市場數據字典
"""
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(get_market_data_async(item_id, world_or_dc))
loop.close()
return result
except Exception as e:
print(f"快速取得市場數據錯誤: {e}")
# 回退到同步版本
return get_market_data(item_id, world_or_dc)
def get_full_item_data_fast(item_id: int, world_or_dc: str = None) -> dict:
"""快速取得物品完整資料(使用異步並行請求).
Args:
item_id: 物品 ID
world_or_dc: 伺服器或資料中心名稱
Returns:
包含 item_info 和 market_data 的字典
"""
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(get_full_item_data_async(item_id, world_or_dc))
loop.close()
return result
except Exception as e:
print(f"快速取得完整資料錯誤: {e}")
# 回退到同步版本
return {
"item_info": get_item_info(item_id),
"market_data": get_market_data(item_id, world_or_dc),
}
# ============================================================
# 配方 API 函數 (Recipe)
# ============================================================
def get_recipe(recipe_id: int) -> dict:
"""取得配方詳細資訊.
Args:
recipe_id: 配方 ID
Returns:
配方資訊字典
"""
try:
url = f"{CAFEMAKER_BASE}/Recipe/{recipe_id}"
response = requests.get(url, timeout=API_TIMEOUT)
if response.status_code == 200:
data = response.json()
# 轉換名稱為繁體
if data.get("Name"):
data["Name"] = _s2t_converter.convert(data["Name"])
# 轉換產出物名稱
if data.get("ItemResult") and data["ItemResult"].get("Name"):
data["ItemResult"]["Name"] = _s2t_converter.convert(
data["ItemResult"]["Name"]
)
# 轉換材料名稱
for i in range(10):
ingredient_key = f"ItemIngredient{i}"
if data.get(ingredient_key) and data[ingredient_key].get("Name"):
data[ingredient_key]["Name"] = _s2t_converter.convert(
data[ingredient_key]["Name"]
)
return data
except requests.RequestException as e:
print(f"取得配方錯誤: {e}")
return {}
def search_recipes(query: str, limit: int = 10) -> list:
"""搜尋配方.
Args:
query: 搜尋關鍵字
limit: 結果數量限制
Returns:
配方列表
"""
if not query:
return []
# 將繁體轉換為簡體(Cafemaker 使用簡體)
query_simplified = _t2s_converter.convert(query.strip())
try:
url = f"{CAFEMAKER_BASE}/search"
params = {
"string": query_simplified,
"indexes": "Recipe",
"limit": limit,
}
response = requests.get(url, params=params, timeout=API_TIMEOUT)
if response.status_code == 200:
data = response.json()
results = []
for item in data.get("Results", []):
results.append({
"id": item.get("ID"),
"name": _s2t_converter.convert(item.get("Name", "")),
"icon": item.get("Icon", ""),
})
return results
except requests.RequestException as e:
print(f"搜尋配方錯誤: {e}")
return []
def get_recipe_by_item_id(item_id: int) -> dict:
"""根據產出物品 ID 查找配方.
Args:
item_id: 產出物品的 ID
Returns:
配方資訊字典,如果沒有配方則返回空字典
"""
# 先取得物品資訊,檢查是否有 Recipes 欄位
item_info = get_item_info(item_id)
# 方法1: 從物品資訊的 Recipes 欄位直接取得配方 ID
recipes_list = item_info.get("Recipes", [])
if recipes_list:
# 取第一個配方(通常物品只有一個配方)
recipe_id = recipes_list[0].get("ID")
if recipe_id:
return get_recipe(recipe_id)
# 方法2: 搜尋配方(備用)
item_name = item_info.get("Name", "")
if not item_name:
return {}
recipes = search_recipes(item_name, limit=20)
# 找到產出物 ID 匹配的配方
for recipe in recipes:
recipe_data = get_recipe(recipe["id"])
if recipe_data.get("ItemResultTargetID") == item_id:
return recipe_data
return {}
async def get_recipe_async(
recipe_id: int,
session: aiohttp.ClientSession = None,
) -> dict:
"""異步取得配方詳細資訊.
Args:
recipe_id: 配方 ID
session: 可選的 aiohttp session
Returns:
配方資訊字典
"""
close_session = False
if session is None:
session = aiohttp.ClientSession()
close_session = True
try:
url = f"{CAFEMAKER_BASE}/Recipe/{recipe_id}"
async with session.get(
url, timeout=aiohttp.ClientTimeout(total=API_TIMEOUT)
) as response:
if response.status == 200:
data = await response.json()
# 轉換名稱為繁體
if data.get("Name"):
data["Name"] = _s2t_converter.convert(data["Name"])
if data.get("ItemResult") and data["ItemResult"].get("Name"):
data["ItemResult"]["Name"] = _s2t_converter.convert(
data["ItemResult"]["Name"]
)
for i in range(10):
ingredient_key = f"ItemIngredient{i}"
if data.get(ingredient_key) and data[ingredient_key].get("Name"):
data[ingredient_key]["Name"] = _s2t_converter.convert(
data[ingredient_key]["Name"]
)
return data
except Exception as e:
print(f"異步取得配方錯誤: {e}")
finally:
if close_session:
await session.close()
return {}