# sentimentstream-worker / backend/app/services/game_sync_service.py
# GitHub Action
# deploy: worker release from GitHub
# 8ff1b66
"""
Game Sync Service — fetches game data from SteamSpy and upserts to MongoDB.
Replaces the manual scripts/fetch_games_to_mongodb.py with an automated,
rate-limited sync that runs as part of the Worker cycle.
"""
import asyncio
import logging
from datetime import datetime, timezone
from typing import Any
import httpx
from app.core.config import settings
from app.db.mongodb import mongodb
# Module-level logger for this service.
logger = logging.getLogger(__name__)
# SteamSpy API endpoint; the operation is selected via the "request" query parameter.
STEAMSPY_API_URL = "https://steamspy.com/api.php"
# Steam Store API base URL; "/appdetails" is appended when fetching per-app data.
STEAM_STORE_API_URL = "https://store.steampowered.com/api"
class GameSyncService:
    """Syncs game data from SteamSpy into MongoDB and enriches it from the Steam Store.

    The service either borrows an externally managed ``httpx.AsyncClient`` or
    lazily creates its own on first use. Only a client it created itself is
    closed by :meth:`close`. The service can also be used as an async context
    manager (``async with GameSyncService() as svc: ...``), which calls
    :meth:`close` on exit.
    """

    def __init__(self, client: httpx.AsyncClient | None = None) -> None:
        """Create the service.

        Args:
            client: Optional shared HTTP client. When omitted, a client with a
                30 s timeout is created on first use and owned by this service.
        """
        self._client = client
        # Only clients we create ourselves are ours to close.
        self._owns_client = client is None

    async def __aenter__(self) -> "GameSyncService":
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        await self.close()

    async def _get_client(self) -> httpx.AsyncClient:
        """Return the HTTP client, lazily creating an owned one if none is set."""
        if self._client is None:
            self._client = httpx.AsyncClient(timeout=30.0)
        return self._client

    async def close(self) -> None:
        """Close the HTTP client if this service created it; otherwise do nothing."""
        if self._owns_client and self._client is not None:
            await self._client.aclose()
            self._client = None

    async def sync_all_games(self, max_pages: int = 90) -> tuple[int, int]:
        """Fetch all games from SteamSpy page by page and upsert them into MongoDB.

        Pagination stops at the first empty page, after ``max_pages`` pages, or
        at the first HTTP/transport error or undecodable response (all logged).
        Every game written during one call shares the same ``synced_at`` value.

        Args:
            max_pages: Maximum number of SteamSpy "all" pages to request.
                Defaults to 90, the previously hard-coded limit.

        Returns:
            (total_upserted, total_modified), summed over all pages from the
            values returned by ``mongodb.upsert_games_batch``.
        """
        client = await self._get_client()
        total_upserted = 0
        total_modified = 0
        now = datetime.now(timezone.utc)
        for page in range(max_pages):
            try:
                resp = await client.get(
                    STEAMSPY_API_URL,
                    params={"request": "all", "page": page},
                )
                resp.raise_for_status()
                data = resp.json()
            except httpx.HTTPStatusError as e:
                logger.error("SteamSpy HTTP error on page %s: %s", page, e)
                break
            except httpx.RequestError as e:
                logger.error("SteamSpy request error on page %s: %s", page, e)
                break
            except ValueError as e:
                # Body was not valid JSON (e.g. an HTML error page): stop like an HTTP failure
                # instead of letting the exception abort the whole worker cycle.
                logger.error("SteamSpy returned invalid JSON on page %s: %s", page, e)
                break

            if not data:
                logger.info("SteamSpy page %s empty — sync complete", page)
                break
            if not isinstance(data, dict):
                logger.error(
                    "SteamSpy page %s: unexpected payload type %s",
                    page,
                    type(data).__name__,
                )
                break

            games = self._parse_all_response(data, now)
            if games:
                upserted, modified = await mongodb.upsert_games_batch(games)
                total_upserted += upserted
                total_modified += modified
            logger.info(
                "SteamSpy page %s: %s games (upserted=%s, modified=%s)",
                page,
                len(games),
                total_upserted,
                total_modified,
            )

            # Rate limit: SteamSpy allows ~1 request per minute (delay is configurable).
            # No need to wait after the last page.
            if page < max_pages - 1:
                await asyncio.sleep(settings.game_sync_steamspy_delay)

        logger.info(
            "Game sync complete: upserted=%s, modified=%s",
            total_upserted,
            total_modified,
        )
        return (total_upserted, total_modified)

    async def sync_top_game_details(self, limit: int | None = None) -> int:
        """Enrich the top N games with detailed info (tags, genre, ccu) from SteamSpy.

        Per-game HTTP/transport errors and undecodable responses are logged and
        skipped; the loop continues with the next game.

        Args:
            limit: Number of top games to enrich. A falsy value (None or 0)
                falls back to ``settings.game_sync_top_n_details``.

        Returns:
            Number of games for which at least one enrichment field was written.
        """
        limit = limit or settings.game_sync_top_n_details
        client = await self._get_client()
        top_games = await mongodb.get_top_games_by_reviews(limit)
        enriched = 0
        for game in top_games:
            appid = game.get("appid")
            if not appid:
                continue
            try:
                resp = await client.get(
                    STEAMSPY_API_URL,
                    params={"request": "appdetails", "appid": appid},
                )
                resp.raise_for_status()
                detail = resp.json()
            except httpx.HTTPStatusError as e:
                logger.warning("SteamSpy detail error for %s: %s", appid, e)
            except httpx.RequestError as e:
                logger.warning("SteamSpy detail request error for %s: %s", appid, e)
            except ValueError as e:
                logger.warning("SteamSpy detail returned invalid JSON for %s: %s", appid, e)
            else:
                update = self._parse_detail_response(detail)
                if update:
                    await mongodb.upsert_game({"appid": appid, "name": game["name"], **update})
                    enriched += 1
            await asyncio.sleep(settings.game_sync_details_delay)
        logger.info("Enriched %s/%s games with details", enriched, len(top_games))
        return enriched

    async def enrich_cn_names(self, limit: int | None = None) -> int:
        """Enrich games with Chinese names from the Steam Store API.

        A game is marked as checked with its Chinese name when the Store returns
        a name that differs from the stored English one, and marked as checked
        without a name otherwise (including "not found"). Network errors and
        malformed responses leave the game unchecked so the next cycle retries it.

        Args:
            limit: Maximum number of games to process. A falsy value (None or 0)
                falls back to ``settings.game_sync_cn_enrichment_limit``.

        Returns:
            Number of games marked as checked.
        """
        limit = limit or settings.game_sync_cn_enrichment_limit
        client = await self._get_client()
        games_to_check = await mongodb.get_games_without_cn_name(limit)
        processed = 0
        for game in games_to_check:
            appid = game.get("appid")
            name_en = game.get("name")
            if not appid:
                continue
            try:
                app_data = await self._fetch_store_app_data(client, appid)
            except (httpx.HTTPError, ValueError) as e:
                logger.warning("Error fetching CN name for %s: %s", appid, e)
                # Don't mark as checked on a transient failure; try again next cycle.
            else:
                name_cn = self._store_info(app_data).get("name")
                # A different name means the Store returned a translation.
                if name_cn and name_cn != name_en:
                    await mongodb.mark_cn_name_checked(appid, name_cn)
                else:
                    # Not found, unsuccessful, or same name: still mark as checked.
                    await mongodb.mark_cn_name_checked(appid)
                processed += 1
            # Respect rate limits
            await asyncio.sleep(settings.game_sync_cn_enrichment_delay)
        logger.info("Enriched CN names for %s/%s games", processed, len(games_to_check))
        return processed

    async def enrich_app_types(self, limit: int | None = None) -> int:
        """Enrich app_type/parent_appid using Steam Store appdetails.

        Unsuccessful Store lookups are recorded as app_type "unknown". Network
        errors and malformed responses leave the game unchecked for a later retry.

        Args:
            limit: Maximum number of games to process. A falsy value (None or 0)
                falls back to ``settings.game_sync_app_type_enrichment_limit``.

        Returns:
            Number of games marked as checked.
        """
        limit = limit or settings.game_sync_app_type_enrichment_limit
        client = await self._get_client()
        games_to_check = await mongodb.get_games_missing_app_type(limit)
        processed = 0
        for game in games_to_check:
            appid = game.get("appid")
            if not appid:
                continue
            try:
                app_data = await self._fetch_store_app_data(client, appid)
            except (httpx.HTTPError, ValueError) as e:
                logger.warning("Error fetching app type for %s: %s", appid, e)
            else:
                parsed = self._parse_store_type_response(self._store_info(app_data))
                await mongodb.mark_app_type_checked(
                    appid,
                    app_type=parsed["app_type"],
                    parent_appid=parsed["parent_appid"],
                )
                processed += 1
            await asyncio.sleep(settings.game_sync_app_type_enrichment_delay)
        logger.info("Enriched app types for %s/%s games", processed, len(games_to_check))
        return processed

    @staticmethod
    def _parse_all_response(
        data: dict[str, Any], synced_at: datetime
    ) -> list[dict[str, Any]]:
        """Parse a SteamSpy 'all' response into a list of game dicts.

        Entries that are not dicts or that have an empty/missing name are skipped.
        ``appid`` is always stored as a string.
        """
        games: list[dict[str, Any]] = []
        for appid_str, info in data.items():
            if not isinstance(info, dict):
                continue
            name = info.get("name", "")
            if not name:
                continue
            games.append({
                "appid": str(appid_str),
                "name": name,
                "developer": info.get("developer", ""),
                "publisher": info.get("publisher", ""),
                "positive": info.get("positive", 0),
                "negative": info.get("negative", 0),
                "synced_at": synced_at,
            })
        return games

    @staticmethod
    def _parse_detail_response(detail: Any) -> dict[str, Any]:
        """Parse a SteamSpy 'appdetails' response into enrichment fields.

        Returns a dict containing only the fields present in ``detail``:
        ``tags`` (top 20 tag names by vote count), ``genre`` and ``ccu``.
        A non-dict payload yields an empty dict.
        """
        if not isinstance(detail, dict):
            return {}
        update: dict[str, Any] = {}
        tags = detail.get("tags")
        # Only a non-empty dict of {tag_name: votes} is usable; other shapes are ignored.
        if isinstance(tags, dict) and tags:
            # Sort by vote count descending, keep top 20 tag names
            sorted_tags = sorted(tags.items(), key=lambda x: x[1], reverse=True)[:20]
            update["tags"] = [tag_name for tag_name, _ in sorted_tags]
        genre = detail.get("genre")
        if genre:
            update["genre"] = genre
        ccu = detail.get("ccu")
        # 0 is a valid player count, so test for None rather than truthiness.
        if ccu is not None:
            update["ccu"] = ccu
        return update

    @staticmethod
    def _parse_store_type_response(info: dict[str, Any]) -> dict[str, Any]:
        """Extract ``app_type`` and ``parent_appid`` from a Store appdetails ``data`` dict.

        ``app_type`` defaults to "unknown". ``parent_appid`` is set (as a string)
        only for DLC entries that include a ``fullgame.appid``.
        """
        app_type = info.get("type") or "unknown"
        fullgame = info.get("fullgame")
        parent_appid = None
        if app_type == "dlc" and isinstance(fullgame, dict) and fullgame.get("appid") is not None:
            parent_appid = str(fullgame["appid"])
        return {
            "app_type": str(app_type),
            "parent_appid": parent_appid,
        }

    @staticmethod
    def _store_info(app_data: Any) -> dict[str, Any]:
        """Return the ``data`` dict of a successful appdetails entry, else ``{}``."""
        if not isinstance(app_data, dict) or not app_data.get("success"):
            return {}
        info = app_data.get("data")
        return info if isinstance(info, dict) else {}

    @staticmethod
    async def _fetch_store_app_data(
        client: httpx.AsyncClient, appid: str
    ) -> dict[str, Any] | None:
        """Fetch one appdetails entry from the Steam Store (Simplified Chinese, CN region).

        Returns:
            The per-app entry keyed by ``str(appid)``, or None if it is absent.

        Raises:
            httpx.HTTPStatusError / httpx.RequestError: On HTTP or transport failure.
            ValueError: If the body is not JSON or is not a JSON object (e.g. ``null``).
        """
        resp = await client.get(
            f"{STEAM_STORE_API_URL}/appdetails",
            params={
                "appids": appid,
                "l": "schinese",
                "cc": "CN",
            },
        )
        resp.raise_for_status()
        payload = resp.json()
        if not isinstance(payload, dict):
            # A non-object body has no per-app entry. Raise so callers treat it as
            # transient instead of permanently marking the app as checked.
            raise ValueError(
                f"unexpected appdetails payload for {appid}: {type(payload).__name__}"
            )
        return payload.get(str(appid))