Spaces:
Running
Running
| """ | |
| Game Sync Service — fetches game data from SteamSpy and upserts to MongoDB. | |
| Replaces the manual scripts/fetch_games_to_mongodb.py with an automated, | |
| rate-limited sync that runs as part of the Worker cycle. | |
| """ | |
| import asyncio | |
| import logging | |
| from datetime import datetime, timezone | |
| from typing import Any | |
| import httpx | |
| from app.core.config import settings | |
| from app.db.mongodb import mongodb | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# SteamSpy endpoint; queried below with `request=all` (paginated catalogue)
# and `request=appdetails` (per-app tags / genre / ccu).
STEAMSPY_API_URL = "https://steamspy.com/api.php"

# Base URL of the Steam Store API; `/appdetails` is appended when fetching
# localized names and app types.
STEAM_STORE_API_URL = "https://store.steampowered.com/api"
| class GameSyncService: | |
| """Syncs game data from SteamSpy into MongoDB.""" | |
| def __init__(self, client: httpx.AsyncClient | None = None) -> None: | |
| self._client = client | |
| self._owns_client = client is None | |
| async def _get_client(self) -> httpx.AsyncClient: | |
| if self._client is None: | |
| self._client = httpx.AsyncClient(timeout=30.0) | |
| return self._client | |
| async def close(self) -> None: | |
| if self._owns_client and self._client is not None: | |
| await self._client.aclose() | |
| self._client = None | |
| async def sync_all_games(self) -> tuple[int, int]: | |
| """ | |
| Fetch all games from SteamSpy (paginated, up to 90 pages). | |
| Returns: | |
| (total_upserted, total_modified) | |
| """ | |
| client = await self._get_client() | |
| total_upserted = 0 | |
| total_modified = 0 | |
| now = datetime.now(timezone.utc) | |
| for page in range(90): | |
| try: | |
| resp = await client.get( | |
| STEAMSPY_API_URL, | |
| params={"request": "all", "page": page}, | |
| ) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| if not data: | |
| logger.info(f"SteamSpy page {page} empty — sync complete") | |
| break | |
| games = self._parse_all_response(data, now) | |
| if games: | |
| upserted, modified = await mongodb.upsert_games_batch(games) | |
| total_upserted += upserted | |
| total_modified += modified | |
| logger.info( | |
| f"SteamSpy page {page}: {len(games)} games " | |
| f"(upserted={total_upserted}, modified={total_modified})" | |
| ) | |
| except httpx.HTTPStatusError as e: | |
| logger.error(f"SteamSpy HTTP error on page {page}: {e}") | |
| break | |
| except httpx.RequestError as e: | |
| logger.error(f"SteamSpy request error on page {page}: {e}") | |
| break | |
| # Rate limit: SteamSpy allows ~1 request per minute | |
| if page < 89: | |
| await asyncio.sleep(settings.game_sync_steamspy_delay) | |
| logger.info( | |
| f"Game sync complete: upserted={total_upserted}, modified={total_modified}" | |
| ) | |
| return (total_upserted, total_modified) | |
| async def sync_top_game_details(self, limit: int | None = None) -> int: | |
| """ | |
| Enrich top N games with detailed info (tags, genre, ccu) from SteamSpy. | |
| Returns: | |
| Number of games enriched. | |
| """ | |
| limit = limit or settings.game_sync_top_n_details | |
| client = await self._get_client() | |
| top_games = await mongodb.get_top_games_by_reviews(limit) | |
| enriched = 0 | |
| for game in top_games: | |
| appid = game.get("appid", "") | |
| if not appid: | |
| continue | |
| try: | |
| resp = await client.get( | |
| STEAMSPY_API_URL, | |
| params={"request": "appdetails", "appid": appid}, | |
| ) | |
| resp.raise_for_status() | |
| detail = resp.json() | |
| update = self._parse_detail_response(detail) | |
| if update: | |
| await mongodb.upsert_game({"appid": appid, "name": game["name"], **update}) | |
| enriched += 1 | |
| except httpx.HTTPStatusError as e: | |
| logger.warning(f"SteamSpy detail error for {appid}: {e}") | |
| except httpx.RequestError as e: | |
| logger.warning(f"SteamSpy detail request error for {appid}: {e}") | |
| await asyncio.sleep(settings.game_sync_details_delay) | |
| logger.info(f"Enriched {enriched}/{len(top_games)} games with details") | |
| return enriched | |
| async def enrich_cn_names(self, limit: int | None = None) -> int: | |
| """ | |
| Enrich games with Chinese names from Steam Store API. | |
| Returns: | |
| Number of games processed. | |
| """ | |
| limit = limit or settings.game_sync_cn_enrichment_limit | |
| client = await self._get_client() | |
| games_to_check = await mongodb.get_games_without_cn_name(limit) | |
| processed = 0 | |
| for game in games_to_check: | |
| appid = game.get("appid") | |
| name_en = game.get("name") | |
| if not appid: | |
| continue | |
| try: | |
| app_data = await self._fetch_store_app_data(client, appid) | |
| if app_data and app_data.get("success"): | |
| info = app_data.get("data", {}) | |
| name_cn = info.get("name") | |
| # If names are different, we found a translation | |
| if name_cn and name_cn != name_en: | |
| await mongodb.mark_cn_name_checked(appid, name_cn) | |
| else: | |
| await mongodb.mark_cn_name_checked(appid) | |
| else: | |
| # Not found or error in API - still mark as checked | |
| await mongodb.mark_cn_name_checked(appid) | |
| processed += 1 | |
| except httpx.HTTPError as e: | |
| logger.warning(f"Error fetching CN name for {appid}: {e}") | |
| # Don't mark as checked on network error, try again next cycle | |
| # Respect rate limits | |
| await asyncio.sleep(settings.game_sync_cn_enrichment_delay) | |
| logger.info(f"Enriched CN names for {processed}/{len(games_to_check)} games") | |
| return processed | |
| async def enrich_app_types(self, limit: int | None = None) -> int: | |
| """ | |
| Enrich app_type/parent_appid using Steam Store appdetails. | |
| Returns: | |
| Number of games processed. | |
| """ | |
| limit = limit or settings.game_sync_app_type_enrichment_limit | |
| client = await self._get_client() | |
| games_to_check = await mongodb.get_games_missing_app_type(limit) | |
| processed = 0 | |
| for game in games_to_check: | |
| appid = game.get("appid") | |
| if not appid: | |
| continue | |
| try: | |
| app_data = await self._fetch_store_app_data(client, appid) | |
| info = app_data.get("data", {}) if app_data and app_data.get("success") else {} | |
| parsed = self._parse_store_type_response(info) | |
| await mongodb.mark_app_type_checked( | |
| appid, | |
| app_type=parsed["app_type"], | |
| parent_appid=parsed["parent_appid"], | |
| ) | |
| processed += 1 | |
| except httpx.HTTPError as e: | |
| logger.warning(f"Error fetching app type for {appid}: {e}") | |
| await asyncio.sleep(settings.game_sync_app_type_enrichment_delay) | |
| logger.info(f"Enriched app types for {processed}/{len(games_to_check)} games") | |
| return processed | |
| def _parse_all_response( | |
| data: dict[str, Any], synced_at: datetime | |
| ) -> list[dict[str, Any]]: | |
| """Parse SteamSpy 'all' response into list of game dicts.""" | |
| games: list[dict[str, Any]] = [] | |
| for appid_str, info in data.items(): | |
| name = info.get("name", "") | |
| if not name: | |
| continue | |
| games.append({ | |
| "appid": str(appid_str), | |
| "name": name, | |
| "developer": info.get("developer", ""), | |
| "publisher": info.get("publisher", ""), | |
| "positive": info.get("positive", 0), | |
| "negative": info.get("negative", 0), | |
| "synced_at": synced_at, | |
| }) | |
| return games | |
| def _parse_detail_response(detail: dict[str, Any]) -> dict[str, Any]: | |
| """Parse SteamSpy 'appdetails' response into enrichment fields.""" | |
| update: dict[str, Any] = {} | |
| tags = detail.get("tags") | |
| if isinstance(tags, dict) and tags: | |
| # Sort by vote count descending, keep top 20 tag names | |
| sorted_tags = sorted(tags.items(), key=lambda x: x[1], reverse=True)[:20] | |
| update["tags"] = [tag_name for tag_name, _ in sorted_tags] | |
| genre = detail.get("genre") | |
| if genre: | |
| update["genre"] = genre | |
| ccu = detail.get("ccu") | |
| if ccu is not None: | |
| update["ccu"] = ccu | |
| return update | |
| def _parse_store_type_response(info: dict[str, Any]) -> dict[str, Any]: | |
| app_type = info.get("type") or "unknown" | |
| fullgame = info.get("fullgame") | |
| parent_appid = None | |
| if app_type == "dlc" and isinstance(fullgame, dict) and fullgame.get("appid") is not None: | |
| parent_appid = str(fullgame["appid"]) | |
| return { | |
| "app_type": str(app_type), | |
| "parent_appid": parent_appid, | |
| } | |
| async def _fetch_store_app_data( | |
| client: httpx.AsyncClient, appid: str | |
| ) -> dict[str, Any] | None: | |
| """Fetch one appdetails payload from Steam Store.""" | |
| resp = await client.get( | |
| f"{STEAM_STORE_API_URL}/appdetails", | |
| params={ | |
| "appids": appid, | |
| "l": "schinese", | |
| "cc": "CN", | |
| }, | |
| ) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| return data.get(str(appid)) | |