Spaces:
Running
Running
| """ | |
| Serwis do komunikacji ze Steam API. | |
| Odpowiada za pobieranie informacji o grach oraz recenzji. | |
| Wykorzystuje publiczne API Steam (nie wymaga klucza API). | |
| Implementuje statystyczne próbkowanie recenzji (stratified sampling). | |
| Retry z exponential backoff dla 429/5xx/timeout. | |
| """ | |
| import asyncio | |
| import logging | |
| from dataclasses import dataclass | |
| from typing import Any, AsyncGenerator | |
| import httpx | |
| from app.core.config import settings | |
| from app.core.sampling import SamplePlan, create_sample_plan | |
| from app.db.mongodb import mongodb | |
| from app.models.schemas import GameInfo, ReviewBatch, ReviewItem | |
| from app.services.steam_errors import SteamAPIError, SteamRateLimitError | |
| logger = logging.getLogger(__name__) | |
| # Status codes that should be retried | |
| _RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504} | |
@dataclass
class ReviewStats:
    """Aggregate review statistics for a game.

    Populated from the ``query_summary`` block of Steam's appreviews
    endpoint and consumed by the sample planner.

    Fix: the original class had bare annotations without ``@dataclass``,
    yet the service instantiates it with keyword arguments
    (``ReviewStats(total=..., positive=..., negative=...)``), which raised
    ``TypeError: ReviewStats() takes no arguments``.
    """

    # Total number of reviews reported by Steam.
    total: int
    # Number of positive ("voted up") reviews.
    positive: int
    # Number of negative reviews.
    negative: int
class SteamService:
    """
    Service for fetching data from the Steam API.

    Holds one shared ``httpx.AsyncClient`` for the lifetime of the service;
    call :meth:`close` on application shutdown to release connections.
    """

    # Public (keyless) Steam Store endpoints.
    STORE_API_URL = "https://store.steampowered.com/api"
    REVIEW_API_URL = "https://store.steampowered.com/appreviews"
    SEARCH_API_URL = "https://store.steampowered.com/api/storesearch"

    def __init__(self, timeout: float = 30.0) -> None:
        # Per-request timeout in seconds, applied to the shared client.
        self.timeout = timeout
        self.client = httpx.AsyncClient(timeout=self.timeout)

    async def close(self) -> None:
        """Close the shared HTTP client."""
        await self.client.aclose()
| def _review_dedup_key(review_item: ReviewItem) -> str: | |
| """Prefer Steam's stable review id; fall back to timestamp+text for malformed rows.""" | |
| if review_item.recommendation_id: | |
| return f"id:{review_item.recommendation_id}" | |
| return f"fallback:{review_item.timestamp_created}:{review_item.text}" | |
| def _dedupe_review_batch( | |
| self, | |
| review_items: list[ReviewItem], | |
| seen_review_keys: set[str], | |
| ) -> tuple[list[str], list[ReviewItem]]: | |
| deduped_items: list[ReviewItem] = [] | |
| deduped_reviews: list[str] = [] | |
| for review_item in review_items: | |
| dedup_key = self._review_dedup_key(review_item) | |
| if dedup_key in seen_review_keys: | |
| continue | |
| seen_review_keys.add(dedup_key) | |
| deduped_items.append(review_item) | |
| deduped_reviews.append(review_item.text) | |
| return deduped_reviews, deduped_items | |
    async def _request_with_retry(
        self,
        client: httpx.AsyncClient,
        url: str,
        params: dict[str, Any],
        context: str = "",
    ) -> httpx.Response:
        """
        Perform a GET request with retries and exponential backoff.

        Retries 429/5xx responses and timeouts/connection errors up to
        ``settings.steam_retry_max_attempts`` attempts, doubling the delay
        each attempt (capped at ``settings.steam_retry_max_delay``). A
        ``Retry-After`` header on 429 overrides the computed delay.

        Args:
            client: shared async HTTP client.
            url: absolute endpoint URL.
            params: query parameters.
            context: short label (app id / query) used in errors and logs.

        Returns:
            The successful (HTTP 200) response.

        Raises:
            SteamAPIError: on non-retryable 4xx, on exhausted 5xx retries,
                or on exhausted connection failures (status_code 0).
            SteamRateLimitError: when retries are exhausted on 429.
        """
        max_attempts = settings.steam_retry_max_attempts
        base_delay = settings.steam_retry_base_delay
        max_delay = settings.steam_retry_max_delay
        last_exception: Exception | None = None
        for attempt in range(max_attempts):
            try:
                response = await client.get(url, params=params)
                status = response.status_code
                if status == 200:
                    return response
                # Non-retryable client errors
                if status == 404:
                    raise SteamAPIError(404, context, f"Not found: {url}")
                if status == 403:
                    raise SteamAPIError(403, context, f"Forbidden: {url}")
                if 400 <= status < 500 and status not in _RETRYABLE_STATUS_CODES:
                    raise SteamAPIError(status, context, f"Client error {status}: {url}")
                # Retryable errors (429, 5xx)
                if attempt < max_attempts - 1:
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    # Respect Retry-After header for 429
                    if status == 429:
                        retry_after = response.headers.get("Retry-After")
                        if retry_after:
                            try:
                                delay = min(float(retry_after), max_delay)
                            except ValueError:
                                # Non-numeric Retry-After (e.g. HTTP-date):
                                # keep the exponential-backoff delay instead.
                                pass
                    logger.warning(
                        f"Steam API {status} for {context}, "
                        f"retry {attempt + 1}/{max_attempts - 1} after {delay:.1f}s"
                    )
                    await asyncio.sleep(delay)
                else:
                    # Exhausted retries
                    if status == 429:
                        raise SteamRateLimitError(context)
                    raise SteamAPIError(status, context, f"Server error {status} after {max_attempts} attempts: {url}")
            except (httpx.TimeoutException, httpx.ConnectError) as e:
                # Transport-level failures share the same backoff schedule.
                last_exception = e
                if attempt < max_attempts - 1:
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    logger.warning(
                        f"Steam API {type(e).__name__} for {context}, "
                        f"retry {attempt + 1}/{max_attempts - 1} after {delay:.1f}s"
                    )
                    await asyncio.sleep(delay)
                else:
                    raise SteamAPIError(
                        0, context,
                        f"Connection failed after {max_attempts} attempts: {e}"
                    ) from e
        # Should not reach here, but just in case
        raise SteamAPIError(0, context, "Unexpected retry exhaustion") from last_exception
    async def search_game(self, query: str) -> GameInfo | None:
        """Search for a game by name via Steam's public storesearch API.

        Tries the configured region/language first, then US/english as a
        fallback when the first locale returned no items. Note that on the
        first API error the search returns ``None`` immediately — the
        fallback locale is not attempted after an error. On success, the
        resolved game's metadata is upserted into MongoDB.

        Returns:
            GameInfo for the first search hit, or None when nothing matched
            or the lookup failed.
        """
        client = self.client
        search_locales = [
            (settings.steam_region, settings.steam_review_language),
            ("US", "english"),
        ]
        items: list[dict[str, Any]] = []
        seen_locales: set[tuple[str, str]] = set()
        for region, language in search_locales:
            locale = (region, language)
            # Skip the fallback when it is identical to the configured locale.
            if locale in seen_locales:
                continue
            seen_locales.add(locale)
            params = {
                "term": query,
                "l": language,
                "cc": region,
            }
            try:
                response = await self._request_with_retry(
                    client, self.SEARCH_API_URL, params, context=f"search:{query}"
                )
                data = response.json()
            except (SteamAPIError, SteamRateLimitError) as e:
                logger.error(f"Błąd wyszukiwania gry '{query}': {e}")
                return None
            items = data.get("items", [])
            if items:
                break
        if not items:
            logger.warning(f"Nie znaleziono gry: {query}")
            return None
        # NOTE(review): assumes the first hit is the intended game and always
        # carries an "id" field (otherwise app_id becomes "None") — confirm
        # against the storesearch payload.
        first_result = items[0]
        app_id = str(first_result.get("id"))
        game_info = await self.get_game_info(app_id)
        if game_info:
            await mongodb.upsert_game({
                "appid": game_info.app_id,
                "name": game_info.name,
                "name_cn": game_info.name_cn,
                "cn_name_checked": True,
                "header_image": game_info.header_image,
                "total_reviews": game_info.total_reviews
            })
        return game_info
| async def get_game_info(self, app_id: str) -> GameInfo | None: | |
| """Pobiera szczegółowe metadane gry (obrazek, nazwę) z appdetails.""" | |
| cached_error = await mongodb.get_steam_error(app_id) | |
| if cached_error: | |
| logger.info( | |
| f"Skipping Steam API for app {app_id} — " | |
| f"cached error {cached_error.get('status_code')}" | |
| ) | |
| return None | |
| client = self.client | |
| details_url = f"{self.STORE_API_URL}/appdetails" | |
| async def fetch_localized(lang: str): | |
| try: | |
| params = {"appids": app_id, "l": lang, "cc": settings.steam_region} | |
| resp = await self._request_with_retry( | |
| client, details_url, params, context=app_id | |
| ) | |
| return resp.json().get(app_id, {}) | |
| except SteamAPIError as e: | |
| if e.status_code == 404: | |
| await mongodb.cache_steam_error( | |
| app_id, 404, settings.steam_error_cache_ttl_404 | |
| ) | |
| return {} | |
| data_zh, data_en = await asyncio.gather( | |
| fetch_localized("schinese"), | |
| fetch_localized("english") | |
| ) | |
| if not data_en.get("success") and not data_zh.get("success"): | |
| logger.warning(f"Nie znaleziono szczegółów gry: {app_id}") | |
| return None | |
| base_data = data_en.get("data") or data_zh.get("data") | |
| name_en = data_en.get("data", {}).get("name") or base_data.get("name") | |
| name_zh = data_zh.get("data", {}).get("name") | |
| stats = await self.get_review_stats(app_id) | |
| return GameInfo( | |
| app_id=app_id, | |
| name=name_en, | |
| name_cn=name_zh if name_zh != name_en else None, | |
| header_image=base_data.get("header_image"), | |
| total_reviews=stats.total, | |
| ) | |
| async def get_review_stats(self, app_id: str) -> ReviewStats: | |
| """Pobiera sumaryczne statystyki recenzji potrzebne do planowania próbki.""" | |
| cached_error = await mongodb.get_steam_error(app_id) | |
| if cached_error: | |
| logger.info( | |
| f"Skipping review stats for app {app_id} — " | |
| f"cached error {cached_error.get('status_code')}" | |
| ) | |
| return ReviewStats(total=0, positive=0, negative=0) | |
| client = self.client | |
| url = f"{self.REVIEW_API_URL}/{app_id}" | |
| params = { | |
| "json": "1", | |
| "filter": "all", | |
| "num_per_page": "0", | |
| } | |
| try: | |
| response = await self._request_with_retry( | |
| client, url, params, context=app_id | |
| ) | |
| data = response.json() | |
| summary = data.get("query_summary", {}) | |
| return ReviewStats( | |
| total=summary.get("total_reviews", 0), | |
| positive=summary.get("total_positive", 0), | |
| negative=summary.get("total_negative", 0), | |
| ) | |
| except SteamAPIError as e: | |
| if e.status_code in (404, 429): | |
| ttl = ( | |
| settings.steam_error_cache_ttl_429 | |
| if e.status_code == 429 | |
| else settings.steam_error_cache_ttl_404 | |
| ) | |
| await mongodb.cache_steam_error(app_id, e.status_code, ttl) | |
| logger.error(f"Błąd pobierania statystyk recenzji: {e}") | |
| return ReviewStats(total=0, positive=0, negative=0) | |
| async def _fetch_reviews_batch( | |
| self, | |
| client: httpx.AsyncClient, | |
| app_id: str, | |
| review_type: str, | |
| filter_type: str, | |
| num_per_page: int, | |
| cursor: str | None, | |
| ) -> tuple[list[str], list[ReviewItem], str | None]: | |
| """Pobiera pojedynczą paczkę recenzji (do 100 sztuk).""" | |
| url = f"{self.REVIEW_API_URL}/{app_id}" | |
| params: dict[str, Any] = { | |
| "json": "1", | |
| "filter": filter_type, | |
| "review_type": review_type, | |
| "language": settings.steam_review_language, | |
| "num_per_page": str(num_per_page), | |
| "cursor": cursor or "*", | |
| "purchase_type": "all", | |
| } | |
| try: | |
| response = await self._request_with_retry( | |
| client, url, params, context=app_id | |
| ) | |
| data = response.json() | |
| except SteamRateLimitError: | |
| await mongodb.cache_steam_error( | |
| app_id, 429, settings.steam_error_cache_ttl_429 | |
| ) | |
| logger.error(f"Rate limited fetching reviews for {app_id}") | |
| return [], [], None | |
| except SteamAPIError as e: | |
| logger.error(f"Błąd pobierania recenzji: {e}") | |
| return [], [], None | |
| if not data.get("success"): | |
| return [], [], None | |
| reviews_data = data.get("reviews", []) | |
| review_texts: list[str] = [] | |
| review_items: list[ReviewItem] = [] | |
| for review in reviews_data: | |
| text = review.get("review") | |
| if not text: | |
| continue | |
| author = review.get("author") or {} | |
| review_texts.append(text) | |
| review_items.append(ReviewItem( | |
| text=text, | |
| recommendation_id=str(review.get("recommendationid", "")), | |
| timestamp_created=review.get("timestamp_created", 0), | |
| voted_up=review.get("voted_up"), | |
| playtime_at_review_minutes=author.get("playtime_at_review"), | |
| primarily_steam_deck=review.get("primarily_steam_deck"), | |
| deck_playtime_at_review_minutes=author.get("deck_playtime_at_review"), | |
| author_steamid=author.get("steamid"), | |
| language=review.get("language"), | |
| steam_purchase=review.get("steam_purchase"), | |
| received_for_free=review.get("received_for_free"), | |
| written_during_early_access=review.get("written_during_early_access"), | |
| )) | |
| new_cursor = data.get("cursor") | |
| return review_texts, review_items, new_cursor | |
    async def fetch_reviews_stratified(
        self,
        app_id: str,
        sample_plan: SamplePlan,
    ) -> AsyncGenerator[ReviewBatch, None]:
        """
        Main review-fetching logic; runs in two phases.

        Phase 1 pages the default ("helpful") ordering until
        ``sample_plan.top_helpful`` unique reviews have been yielded.
        Phases 2a/2b page recent positive / recent negative reviews up to
        the planned counts. A dedup-key set shared across all phases
        guarantees each review is yielded at most once, and per-phase cursor
        sets detect Steam returning the same page twice.

        Yields:
            ReviewBatch: the new (deduplicated) reviews from each page.
        """
        batch_size = settings.review_batch_size
        # Shared across phases so phase 2 never re-yields phase-1 reviews.
        seen_review_keys: set[str] = set()
        seen_cursors: set[str] = set()
        client = self.client
        # --- PHASE 1: TOP HELPFUL ---
        cursor: str | None = "*"
        fetched = 0
        while fetched < sample_plan.top_helpful:
            to_fetch = min(batch_size, sample_plan.top_helpful - fetched)
            reviews, review_items, cursor = await self._fetch_reviews_batch(
                client, app_id, "all", "all", to_fetch, cursor
            )
            if not reviews:
                break
            # Repeated cursor => Steam is looping; accept the shortfall.
            if cursor and cursor in seen_cursors:
                logger.warning(f"Repeated cursor {cursor} for {app_id} (top_helpful). Shortfall: {sample_plan.top_helpful - fetched}")
                break
            if cursor:
                seen_cursors.add(cursor)
            new_reviews, new_items = self._dedupe_review_batch(review_items, seen_review_keys)
            fetched += len(new_reviews)
            if new_reviews:
                yield ReviewBatch(reviews=new_reviews, review_items=new_items, cursor=cursor)
            # A missing or "*" cursor marks the end of pagination.
            if not cursor or cursor == "*":
                break
        # --- PHASE 2a: RECENT POSITIVE ---
        positive_target = sample_plan.positive_count
        if positive_target > 0:
            cursor = "*"
            fetched = 0
            seen_cursors_pos: set[str] = set()
            while fetched < positive_target:
                to_fetch = min(batch_size, positive_target - fetched)
                # With many cross-phase duplicates, request more than the
                # remaining target (at most batch_size) to keep making progress.
                if fetched > 0:
                    to_fetch = batch_size
                reviews, review_items, cursor = await self._fetch_reviews_batch(
                    client, app_id, "positive", "recent", to_fetch, cursor or "*"
                )
                if not reviews:
                    break
                if cursor and cursor in seen_cursors_pos:
                    logger.warning(f"Repeated cursor {cursor} for {app_id} (positive). Shortfall: {positive_target - fetched}")
                    break
                if cursor:
                    seen_cursors_pos.add(cursor)
                new_reviews, new_items = self._dedupe_review_batch(review_items, seen_review_keys)
                fetched += len(new_reviews)
                if new_reviews:
                    yield ReviewBatch(reviews=new_reviews, review_items=new_items, cursor=cursor)
                if not cursor or cursor == "*":
                    break
        # --- PHASE 2b: RECENT NEGATIVE --- (mirrors phase 2a)
        negative_target = sample_plan.negative_count
        if negative_target > 0:
            cursor = "*"
            fetched = 0
            seen_cursors_neg: set[str] = set()
            while fetched < negative_target:
                to_fetch = min(batch_size, negative_target - fetched)
                if fetched > 0:
                    to_fetch = batch_size
                reviews, review_items, cursor = await self._fetch_reviews_batch(
                    client, app_id, "negative", "recent", to_fetch, cursor or "*"
                )
                if not reviews:
                    break
                if cursor and cursor in seen_cursors_neg:
                    logger.warning(f"Repeated cursor {cursor} for {app_id} (negative). Shortfall: {negative_target - fetched}")
                    break
                if cursor:
                    seen_cursors_neg.add(cursor)
                new_reviews, new_items = self._dedupe_review_batch(review_items, seen_review_keys)
                fetched += len(new_reviews)
                if new_reviews:
                    yield ReviewBatch(reviews=new_reviews, review_items=new_items, cursor=cursor)
                if not cursor or cursor == "*":
                    break
        logger.info(f"Pobrano łącznie {len(seen_review_keys)} unikalnych recenzji")
    async def fetch_recent_reviews(
        self,
        app_id: str,
        exclude_ids: set[str] | None = None,
        *,
        boundary_ts: int | None = None,
        boundary_ids_at_ts: set[str] | None = None,
    ) -> list[ReviewItem]:
        """
        Fetch recent reviews for incremental analysis.

        Pages the ``filter=recent`` feed and collects reviews not seen in a
        previous run, with two stop strategies:

        * ``boundary_ts`` set: walk until a review strictly older than the
          boundary timestamp appears. Reviews exactly AT the boundary are
          included only if their id is in neither ``exclude_ids`` nor
          ``boundary_ids_at_ts``.
        * ``boundary_ts`` unset: filter by ``exclude_ids`` only, stopping
          early once >80% of a page is already known.

        Args:
            app_id: Steam app id.
            exclude_ids: recommendation ids already analyzed.
            boundary_ts: timestamp of the newest previously-seen review.
            boundary_ids_at_ts: ids already seen at exactly ``boundary_ts``.

        Returns:
            Newly discovered ReviewItem objects, in feed order.
        """
        exclude_ids = exclude_ids or set()
        boundary_ids_at_ts = boundary_ids_at_ts or set()
        batch_size = settings.review_batch_size
        client = self.client
        cursor: str | None = "*"
        seen_cursors: set[str] = set()
        seen_review_ids: set[str] = set()
        new_items: list[ReviewItem] = []
        while True:
            _, review_items, cursor = await self._fetch_reviews_batch(
                client, app_id, "all", "recent", batch_size, cursor
            )
            if not review_items:
                break
            if cursor and cursor in seen_cursors:
                logger.warning(f"Repeated cursor {cursor} for {app_id} (recent incremental fetch)")
                break
            if cursor:
                seen_cursors.add(cursor)
            if boundary_ts is not None and boundary_ts > 0:
                crossed_boundary = False
                batch_new: list[ReviewItem] = []
                for review_item in review_items:
                    review_id = review_item.recommendation_id
                    # Skip ids already seen during this fetch run.
                    if review_id and review_id in seen_review_ids:
                        continue
                    if review_id:
                        seen_review_ids.add(review_id)
                    # Strictly newer than the boundary: candidate unless excluded.
                    if review_item.timestamp_created > boundary_ts:
                        if review_id not in exclude_ids:
                            batch_new.append(review_item)
                        continue
                    # Exactly at the boundary: new only if unseen at that
                    # timestamp in the previous run.
                    if review_item.timestamp_created == boundary_ts:
                        if (
                            review_id not in exclude_ids
                            and review_id not in boundary_ids_at_ts
                        ):
                            batch_new.append(review_item)
                        continue
                    # Older than the boundary — the feed is newest-first
                    # (per Steam's "recent" filter), so stop paging.
                    crossed_boundary = True
                    break
                new_items.extend(batch_new)
                if crossed_boundary or not cursor or cursor == "*":
                    break
                continue
            # Filter out already-known reviews
            # NOTE(review): this path does not consult seen_review_ids, so an
            # id repeated across pages could be collected twice — confirm
            # downstream dedup handles it.
            batch_new = [ri for ri in review_items if ri.recommendation_id not in exclude_ids]
            # Early exit: if >80% of batch is known, we've passed the boundary
            known_ratio = 1 - (len(batch_new) / len(review_items)) if review_items else 0
            new_items.extend(batch_new)
            if known_ratio > 0.8:
                logger.info(
                    f"Early exit for {app_id}: {known_ratio:.0%} of batch already known"
                )
                break
            if not cursor or cursor == "*":
                break
        logger.info(f"Incremental fetch for {app_id}: {len(new_items)} new reviews")
        return new_items
| async def fetch_recent_reviews_since( | |
| self, | |
| app_id: str, | |
| since_timestamp: int, | |
| ) -> list[ReviewItem]: | |
| """ | |
| Fetch all recent reviews whose timestamp falls inside the requested time window. | |
| Steam's `filter=recent` endpoint is ordered newest-first, so once a batch crosses | |
| the cutoff we can stop paging. | |
| """ | |
| batch_size = settings.review_batch_size | |
| client = self.client | |
| cursor: str | None = "*" | |
| seen_cursors: set[str] = set() | |
| seen_review_ids: set[str] = set() | |
| window_items: list[ReviewItem] = [] | |
| while True: | |
| _, review_items, cursor = await self._fetch_reviews_batch( | |
| client, app_id, "all", "recent", batch_size, cursor | |
| ) | |
| if not review_items: | |
| break | |
| if cursor and cursor in seen_cursors: | |
| logger.warning(f"Repeated cursor {cursor} for {app_id} (recent window fetch)") | |
| break | |
| if cursor: | |
| seen_cursors.add(cursor) | |
| crossed_window_boundary = False | |
| for item in review_items: | |
| review_id = item.recommendation_id | |
| if review_id and review_id in seen_review_ids: | |
| continue | |
| if review_id: | |
| seen_review_ids.add(review_id) | |
| if item.timestamp_created >= since_timestamp: | |
| window_items.append(item) | |
| else: | |
| crossed_window_boundary = True | |
| if crossed_window_boundary or not cursor or cursor == "*": | |
| break | |
| logger.info(f"Recent-window fetch for {app_id}: {len(window_items)} reviews since {since_timestamp}") | |
| return window_items | |
| async def fetch_reviews( | |
| self, | |
| app_id: str, | |
| batch_size: int | None = None, | |
| max_reviews: int | None = None, | |
| ) -> AsyncGenerator[ReviewBatch, None]: | |
| """Wrapper dla zachowania kompatybilności.""" | |
| stats = await self.get_review_stats(app_id) | |
| if stats.total == 0: | |
| return | |
| sample_plan = create_sample_plan(stats.total, stats.positive, stats.negative) | |
| async for batch in self.fetch_reviews_stratified(app_id, sample_plan): | |
| yield batch | |
# Global service instance (singleton) shared across the application.
steam_service = SteamService()