Spaces:
Running
Running
| """ | |
| Moduł połączenia z bazą danych MongoDB. | |
| Wykorzystuje Motor (async driver) do asynchronicznej komunikacji z MongoDB. | |
| Implementuje cache wyników analizy z TTL 24h. | |
| """ | |
| import asyncio | |
| import logging | |
| import re | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Any | |
| from bson.codec_options import CodecOptions | |
| from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase | |
| from pymongo import ASCENDING, DESCENDING, UpdateOne | |
| from pymongo.errors import ( | |
| BulkWriteError, | |
| ConnectionFailure, | |
| OperationFailure, | |
| PyMongoError, | |
| ) | |
| from app.core.config import settings | |
| logger = logging.getLogger(__name__) | |
class MongoDB:
    """
    Manages the connection to MongoDB.

    Implements the Singleton pattern via a module-level global instance.
    Handles the analysis-result cache with automatic TTL validation.
    Stores the Steam games list used for autocompletion.

    Attributes:
        client: MongoDB client (Motor).
        db: Reference to the database.
    """

    # Collection names used throughout this module.
    COLLECTION_ANALYSES = "analyses"
    COLLECTION_GAMES = "games"
    COLLECTION_STEAM_ERRORS = "steam_errors"
    COLLECTION_REFRESH_SCHEDULES = "refresh_schedules"

    def __init__(self) -> None:
        """Initialize the instance without an active connection."""
        self.client: AsyncIOMotorClient | None = None  # type: ignore
        self.db: AsyncIOMotorDatabase | None = None  # type: ignore
| async def connect(self, max_retries: int = 3) -> None: | |
| """ | |
| Nawiązuje połączenie z MongoDB z exponential backoff. | |
| Tworzy indeksy dla optymalnej wydajności zapytań. | |
| Args: | |
| max_retries: Maksymalna liczba prób połączenia. | |
| Raises: | |
| ConnectionError: Gdy nie można połączyć się z bazą po wszystkich próbach. | |
| """ | |
| for attempt in range(1, max_retries + 1): | |
| try: | |
| self.client = AsyncIOMotorClient(settings.mongodb_url, tz_aware=True) | |
| codec_options: CodecOptions = CodecOptions(tz_aware=True) | |
| self.db = self.client.get_database( | |
| settings.mongodb_db_name, codec_options=codec_options | |
| ) | |
| # Weryfikacja połączenia | |
| await self.client.admin.command("ping") | |
| logger.info(f"Połączono z MongoDB: {settings.mongodb_db_name}") | |
| # Utwórz indeksy | |
| await self._create_indexes() | |
| return | |
| except (ConnectionFailure, PyMongoError) as e: | |
| if attempt < max_retries: | |
| delay = 2 ** (attempt - 1) # 1s, 2s, 4s | |
| logger.warning( | |
| f"MongoDB connection attempt {attempt}/{max_retries} failed: {e}. " | |
| f"Retrying in {delay}s..." | |
| ) | |
| await asyncio.sleep(delay) | |
| else: | |
| logger.error(f"MongoDB connection failed after {max_retries} attempts: {e}") | |
| raise ConnectionError( | |
| f"Nie można połączyć się z MongoDB po {max_retries} próbach: {e}" | |
| ) | |
    async def _create_indexes(self) -> None:
        """Create collection indexes (idempotent on restart).

        Also migrates the analyses collection from the legacy global TTL
        index on ``cached_at`` to the per-document TTL on ``expires_at``.
        """
        if self.db is None:
            return
        # Indexes for analyses
        analyses = self.db[self.COLLECTION_ANALYSES]
        await analyses.create_index("game_id", unique=True)
        # Migrate from old global TTL index (cached_at) to per-document TTL (expires_at)
        try:
            existing_indexes = await analyses.index_information()
            for idx_name, idx_info in existing_indexes.items():
                # A TTL index is identified by expireAfterSeconds; drop only
                # the one keyed on cached_at (the legacy layout).
                if idx_info.get("expireAfterSeconds") is not None and "cached_at" in str(idx_info.get("key")):
                    await analyses.drop_index(idx_name)
                    logger.info(f"Dropped old TTL index: {idx_name}")
                    break
        except OperationFailure:
            pass  # Old index may not exist
        # expireAfterSeconds=0 means each document expires at its own expires_at.
        await analyses.create_index("expires_at", expireAfterSeconds=0)
        # Indexes for the games list
        games = self.db[self.COLLECTION_GAMES]
        await games.create_index("appid", unique=True)
        # Index for regex search (case-insensitive)
        await games.create_index("name_lower")
        await games.create_index("name_cn")
        # Sparse index for the checked flag (saves space, speeds up the query)
        await games.create_index("cn_name_checked", sparse=True)
        await games.create_index("parent_appid", sparse=True)
        # Compound index for sorting games by review count (worker game sync)
        await games.create_index(
            [("positive", DESCENDING), ("negative", DESCENDING)],
            sparse=True,
        )
        await games.create_index(
            [
                ("name_lower", ASCENDING),
                ("app_type", ASCENDING),
                ("positive", DESCENDING),
                ("negative", DESCENDING),
            ]
        )
        await games.create_index("is_priority", sparse=True)
        # Indexes for the Steam API error cache
        steam_errors = self.db[self.COLLECTION_STEAM_ERRORS]
        await steam_errors.create_index("app_id", unique=True)
        await steam_errors.create_index("expires_at", expireAfterSeconds=0)
        # Indexes for refresh schedules (worker pre-cache)
        schedules = self.db[self.COLLECTION_REFRESH_SCHEDULES]
        await schedules.create_index("app_id", unique=True)
        await schedules.create_index("status")
        logger.debug("Utworzono indeksy MongoDB")
| async def disconnect(self) -> None: | |
| """Zamyka połączenie z MongoDB.""" | |
| if self.client: | |
| self.client.close() | |
| logger.info("Rozłączono z MongoDB") | |
| def _is_document_expired(self, document: dict[str, Any]) -> bool: | |
| """Check if a cache document is expired using expires_at or cached_at fallback. | |
| With tz_aware=True on the Motor client, all datetimes from MongoDB are | |
| already timezone-aware, so no manual .replace(tzinfo=...) is needed. | |
| """ | |
| now = datetime.now(timezone.utc) | |
| # New-format: per-document expires_at | |
| expires_at = document.get("expires_at") | |
| if expires_at: | |
| if isinstance(expires_at, str): | |
| expires_at = datetime.fromisoformat(expires_at) | |
| return now >= expires_at | |
| # Old-format fallback: cached_at + default TTL | |
| cached_at = document.get("cached_at") | |
| if cached_at: | |
| if isinstance(cached_at, str): | |
| cached_at = datetime.fromisoformat(cached_at) | |
| ttl_hours = document.get("ttl_hours", settings.cache_ttl_hours) | |
| return now - cached_at > timedelta(hours=ttl_hours) | |
| return True # No timestamp info = treat as expired | |
| async def get_cached_analysis_full(self, game_id: str) -> dict[str, Any] | None: | |
| """ | |
| Returns full cache document (with review IDs, TTL info) or None if expired/missing. | |
| """ | |
| if self.db is None: | |
| return None | |
| collection = self.db[self.COLLECTION_ANALYSES] | |
| try: | |
| document = await collection.find_one({"game_id": game_id}) | |
| if not document: | |
| return None | |
| if self._is_document_expired(document): | |
| logger.info(f"Cache expired for game {game_id}") | |
| return None | |
| document.pop("_id", None) | |
| return document | |
| except PyMongoError as e: | |
| logger.error(f"Error reading cache: {e}") | |
| return None | |
    async def get_stale_analysis(self, game_id: str) -> dict[str, Any] | None:
        """
        Returns cache document even if expired. Used by incremental path
        to retrieve old review IDs. Returns None only if no document exists.
        """
        # Thin alias over get_analysis, which ignores TTL entirely.
        return await self.get_analysis(game_id)
| async def get_analysis(self, game_id: str) -> dict[str, Any] | None: | |
| """ | |
| Returns an analysis document regardless of TTL. | |
| Product freshness is evaluated outside MongoDB, so this method is the | |
| canonical read path for "show stale result + refresh" behavior. | |
| """ | |
| if self.db is None: | |
| return None | |
| collection = self.db[self.COLLECTION_ANALYSES] | |
| try: | |
| document = await collection.find_one({"game_id": game_id}) | |
| if not document: | |
| return None | |
| document.pop("_id", None) | |
| return document | |
| except PyMongoError as e: | |
| logger.error(f"Error reading stale cache: {e}") | |
| return None | |
| async def get_cached_analysis(self, game_id: str) -> dict[str, Any] | None: | |
| """ | |
| Returns cached analysis results or None if expired/missing. | |
| Backward-compatible wrapper around get_cached_analysis_full. | |
| """ | |
| doc = await self.get_cached_analysis_full(game_id) | |
| if doc is None: | |
| return None | |
| results = doc.get("results") | |
| if isinstance(results, dict) and results.get("cached_at") is None and doc.get("cached_at") is not None: | |
| results = {**results, "cached_at": doc["cached_at"]} | |
| return results | |
| async def save_analysis( | |
| self, | |
| game_id: str, | |
| results: dict[str, Any], | |
| analyzed_review_ids: list[str] | None = None, | |
| latest_review_timestamp: int = 0, | |
| ttl_hours: int | None = None, | |
| analyzed_at: datetime | None = None, | |
| ) -> None: | |
| """ | |
| Saves analysis results to cache with per-document TTL. | |
| Purges review IDs to keep only the most recent ones (space efficiency). | |
| """ | |
| if self.db is None: | |
| logger.warning("Brak połączenia z MongoDB - nie zapisano cache") | |
| return | |
| collection = self.db[self.COLLECTION_ANALYSES] | |
| effective_ttl = ttl_hours or settings.cache_ttl_hours | |
| now = datetime.now(timezone.utc) | |
| analysis_date = analyzed_at | |
| if analysis_date is None: | |
| raw_value = results.get("analysis_date") or results.get("cached_at") | |
| if isinstance(raw_value, str): | |
| analysis_date = datetime.fromisoformat(raw_value) | |
| elif isinstance(raw_value, datetime): | |
| analysis_date = raw_value | |
| if analysis_date is None: | |
| analysis_date = now | |
| if results.get("analysis_date") is None: | |
| results = {**results, "analysis_date": analysis_date} | |
| # Purge old IDs — keep only the most recent N | |
| if analyzed_review_ids: | |
| analyzed_review_ids = analyzed_review_ids[-settings.incremental_max_stored_ids:] | |
| document: dict[str, Any] = { | |
| "game_id": game_id, | |
| "results": results, | |
| "analyzed_review_ids": analyzed_review_ids or [], | |
| "latest_review_timestamp": latest_review_timestamp, | |
| "cached_at": now, | |
| "analyzed_at": analysis_date, | |
| "ttl_hours": effective_ttl, | |
| "expires_at": now + timedelta(hours=effective_ttl), | |
| } | |
| try: | |
| await collection.update_one( | |
| {"game_id": game_id}, | |
| {"$set": document}, | |
| upsert=True, | |
| ) | |
| logger.info(f"Saved cache for game {game_id} (TTL: {effective_ttl}h)") | |
| except PyMongoError as e: | |
| logger.error(f"Error saving cache: {e}") | |
| async def delete_cached_analysis(self, game_id: str) -> bool: | |
| """ | |
| Usuwa cache dla danej gry. | |
| Args: | |
| game_id: Identyfikator gry Steam. | |
| Returns: | |
| True jeśli usunięto, False w przeciwnym razie. | |
| """ | |
| if self.db is None: | |
| return False | |
| collection = self.db[self.COLLECTION_ANALYSES] | |
| try: | |
| result = await collection.delete_one({"game_id": game_id}) | |
| return result.deleted_count > 0 | |
| except PyMongoError as e: | |
| logger.error(f"Błąd usuwania cache: {e}") | |
| return False | |
| # ========== Steam API Error Cache ========== | |
| async def get_steam_error(self, app_id: str) -> dict[str, Any] | None: | |
| """ | |
| Sprawdza czy app_id ma cached error. | |
| Returns: | |
| Dict z polami app_id, status_code, expires_at lub None. | |
| """ | |
| if self.db is None: | |
| return None | |
| collection = self.db[self.COLLECTION_STEAM_ERRORS] | |
| try: | |
| document = await collection.find_one({"app_id": app_id}) | |
| if not document: | |
| return None | |
| document.pop("_id", None) | |
| return document | |
| except PyMongoError as e: | |
| logger.error(f"Błąd odczytu steam error cache: {e}") | |
| return None | |
| async def cache_steam_error( | |
| self, app_id: str, status_code: int, ttl_seconds: int | |
| ) -> None: | |
| """ | |
| Cachuje błąd Steam API z automatycznym TTL. | |
| MongoDB TTL index automatycznie usunie dokument po expires_at. | |
| """ | |
| if self.db is None: | |
| return | |
| collection = self.db[self.COLLECTION_STEAM_ERRORS] | |
| document = { | |
| "app_id": app_id, | |
| "status_code": status_code, | |
| "cached_at": datetime.now(timezone.utc), | |
| "expires_at": datetime.now(timezone.utc) + timedelta(seconds=ttl_seconds), | |
| } | |
| try: | |
| await collection.update_one( | |
| {"app_id": app_id}, | |
| {"$set": document}, | |
| upsert=True, | |
| ) | |
| logger.info( | |
| f"Cached Steam error {status_code} for app {app_id} (TTL: {ttl_seconds}s)" | |
| ) | |
| except PyMongoError as e: | |
| logger.error(f"Błąd zapisu steam error cache: {e}") | |
| # ========== Metody dla listy gier (autouzupełnianie) ========== | |
| async def get_games_count(self) -> int: | |
| """Zwraca liczbę gier w bazie.""" | |
| if self.db is None: | |
| return 0 | |
| collection = self.db[self.COLLECTION_GAMES] | |
| return await collection.count_documents({}) | |
| async def save_games_batch(self, games: list[dict[str, str]]) -> int: | |
| """ | |
| Zapisuje partię gier do bazy (bulk insert). | |
| Args: | |
| games: Lista słowników z kluczami 'appid', 'name', opcjonalnie 'developer', 'publisher'. | |
| Returns: | |
| Liczba zapisanych gier. | |
| """ | |
| if self.db is None or not games: | |
| return 0 | |
| collection = self.db[self.COLLECTION_GAMES] | |
| # Dodaj pole name_lower dla wyszukiwania case-insensitive | |
| documents = [] | |
| for game in games: | |
| if not game.get("name"): | |
| continue | |
| doc = { | |
| "appid": game["appid"], | |
| "name": game["name"], | |
| "name_lower": game["name"].lower(), | |
| } | |
| # Dodaj opcjonalne pola | |
| if game.get("developer"): | |
| doc["developer"] = game["developer"] | |
| if game.get("publisher"): | |
| doc["publisher"] = game["publisher"] | |
| documents.append(doc) | |
| try: | |
| # Użyj ordered=False żeby kontynuować mimo duplikatów | |
| result = await collection.insert_many(documents, ordered=False) | |
| return len(result.inserted_ids) | |
| except BulkWriteError as e: | |
| # Duplicates are expected with ordered=False — count successful inserts | |
| inserted = e.details.get("nInserted", 0) | |
| logger.debug(f"Pominięto duplikaty podczas zapisu gier ({inserted} inserted)") | |
| return inserted | |
| except PyMongoError as e: | |
| logger.error(f"Błąd zapisu gier: {e}") | |
| return 0 | |
| async def clear_games(self) -> None: | |
| """Usuwa wszystkie gry z bazy.""" | |
| if self.db is None: | |
| return | |
| collection = self.db[self.COLLECTION_GAMES] | |
| await collection.delete_many({}) | |
| logger.info("Usunięto wszystkie gry z bazy") | |
| async def upsert_game(self, game_data: dict[str, Any]) -> None: | |
| """ | |
| Dodaje lub aktualizuje pojedynczą grę w bazie danych. | |
| Używane głównie przez mechanizm Fallback Search. | |
| """ | |
| if self.db is None: | |
| return | |
| collection = self.db[self.COLLECTION_GAMES] | |
| appid = str(game_data["appid"]) | |
| # Przygotuj dokument | |
| update_doc = { | |
| "appid": appid, | |
| "name": game_data["name"], | |
| "name_lower": game_data["name"].lower(), | |
| } | |
| if game_data.get("name_cn"): | |
| update_doc["name_cn"] = game_data["name_cn"] | |
| update_doc["cn_name_checked"] = True | |
| elif game_data.get("cn_name_checked"): | |
| update_doc["cn_name_checked"] = True | |
| if game_data.get("header_image") is not None: | |
| update_doc["header_image"] = game_data["header_image"] | |
| if game_data.get("total_reviews") is not None: | |
| update_doc["total_reviews"] = game_data["total_reviews"] | |
| # Worker-supplied fields | |
| for field in ( | |
| "positive", "negative", "tags", "genre", "ccu", | |
| "last_game_update_at", "synced_at", "developer", "publisher", | |
| "app_type", "parent_appid", "dlc_checked_at", | |
| ): | |
| if game_data.get(field) is not None: | |
| update_doc[field] = game_data[field] | |
| try: | |
| await collection.update_one( | |
| {"appid": appid}, | |
| {"$set": update_doc}, | |
| upsert=True | |
| ) | |
| logger.debug(f"Zsynchronizowano grę {appid} w MongoDB") | |
| except PyMongoError as e: | |
| logger.error(f"Błąd upsert gry {appid}: {e}") | |
    async def search_games(self, query: str, limit: int = 10) -> list[dict[str, Any]]:
        """
        Search games by name (EN or CN), case-insensitive substring match.

        Results are ranked by match quality (exact, then prefix, then
        substring), then app type (base games before DLC/demos), then total
        review count descending, then name.

        Args:
            query: Text to search for (minimum 2 characters after trimming).
            limit: Maximum number of results.

        Returns:
            Games matching the query (appid, name, name_cn, developer,
            publisher, app_type, parent_appid).
        """
        normalized_query = query.strip()
        if self.db is None or not normalized_query or len(normalized_query) < 2:
            return []
        collection = self.db[self.COLLECTION_GAMES]
        try:
            query_lower = normalized_query.lower()
            # Escape regex metacharacters so user input matches literally.
            name_pattern = re.escape(query_lower)
            name_prefix_pattern = f"^{name_pattern}"
            name_exact_pattern = f"^{name_pattern}$"
            cn_pattern = re.escape(normalized_query)
            cn_prefix_pattern = f"^{cn_pattern}"
            cn_exact_pattern = f"^{cn_pattern}$"
            # Candidate set: substring match on either the lowercased EN name
            # or the CN name (case-insensitive via $options).
            match_filter: dict[str, Any] = {
                "$or": [
                    {"name_lower": {"$regex": name_pattern}},
                    {"name_cn": {"$regex": cn_pattern, "$options": "i"}},
                ]
            }
            if not settings.dlc_visible_in_search:
                match_filter["app_type"] = {"$ne": "dlc"}
            pipeline = [
                {"$match": match_filter},
                {
                    # Derived ranking fields; $ifNull guards missing fields.
                    "$addFields": {
                        # 0 = exact match, 1 = prefix match, 2 = substring.
                        "match_rank": {
                            "$switch": {
                                "branches": [
                                    {
                                        "case": {
                                            "$or": [
                                                {
                                                    "$regexMatch": {
                                                        "input": {"$ifNull": ["$name_lower", ""]},
                                                        "regex": name_exact_pattern,
                                                    }
                                                },
                                                {
                                                    "$regexMatch": {
                                                        "input": {"$ifNull": ["$name_cn", ""]},
                                                        "regex": cn_exact_pattern,
                                                        "options": "i",
                                                    }
                                                },
                                            ]
                                        },
                                        "then": 0,
                                    },
                                    {
                                        "case": {
                                            "$or": [
                                                {
                                                    "$regexMatch": {
                                                        "input": {"$ifNull": ["$name_lower", ""]},
                                                        "regex": name_prefix_pattern,
                                                    }
                                                },
                                                {
                                                    "$regexMatch": {
                                                        "input": {"$ifNull": ["$name_cn", ""]},
                                                        "regex": cn_prefix_pattern,
                                                        "options": "i",
                                                    }
                                                },
                                            ]
                                        },
                                        "then": 1,
                                    },
                                ],
                                "default": 2,
                            }
                        },
                        # 0 = game/unknown, 1 = dlc, 2 = demo.
                        "type_rank": {
                            "$switch": {
                                "branches": [
                                    {
                                        "case": {
                                            "$in": [
                                                {"$ifNull": ["$app_type", "unknown"]},
                                                ["game", "unknown"],
                                            ]
                                        },
                                        "then": 0,
                                    },
                                    {"case": {"$eq": ["$app_type", "dlc"]}, "then": 1},
                                    {"case": {"$eq": ["$app_type", "demo"]}, "then": 2},
                                ],
                                "default": 1,
                            }
                        },
                        "review_count": {
                            "$add": [
                                {"$ifNull": ["$positive", 0]},
                                {"$ifNull": ["$negative", 0]},
                            ]
                        },
                    }
                },
                {
                    "$sort": {
                        "match_rank": 1,
                        "type_rank": 1,
                        "review_count": -1,
                        "name": 1,
                    }
                },
                {"$limit": limit},
                {
                    "$project": {
                        "_id": 0,
                        "appid": 1,
                        "name": 1,
                        "name_cn": 1,
                        "developer": 1,
                        "publisher": 1,
                        "app_type": 1,
                        "parent_appid": 1,
                    }
                },
            ]
            cursor = collection.aggregate(pipeline)
            results = await cursor.to_list(length=limit)
            return results
        except PyMongoError as e:
            logger.error(f"Błąd wyszukiwania gier: {e}")
            return []
| async def get_game_update_date(self, app_id: str) -> datetime | None: | |
| """Get the last game update timestamp for a game.""" | |
| if self.db is None: | |
| return None | |
| collection = self.db[self.COLLECTION_GAMES] | |
| try: | |
| doc = await collection.find_one( | |
| {"appid": str(app_id)}, | |
| {"_id": 0, "last_game_update_at": 1}, | |
| ) | |
| if doc and doc.get("last_game_update_at"): | |
| val = doc["last_game_update_at"] | |
| if isinstance(val, datetime): | |
| return val | |
| return None | |
| return None | |
| except PyMongoError as e: | |
| logger.error(f"Error getting game update date for {app_id}: {e}") | |
| return None | |
| async def get_games_without_cn_name(self, limit: int = 200) -> list[dict[str, Any]]: | |
| """ | |
| Pobiera gry, które nie mają jeszcze nazwy chińskiej i nie były sprawdzane. | |
| Sortuje po liczbie pozytywnych recenzji (jeśli dostępne, dla priorytetyzacji). | |
| """ | |
| if self.db is None: | |
| return [] | |
| collection = self.db[self.COLLECTION_GAMES] | |
| try: | |
| pipeline = [ | |
| {"$match": { | |
| "name_cn": {"$exists": False}, | |
| "cn_name_checked": {"$ne": True}, # Pomiń już sprawdzone | |
| }}, | |
| # Sortowanie po positive (DESC), ale gry bez tego pola trafią na koniec (sparse index handling) | |
| {"$sort": {"positive": -1}}, | |
| {"$limit": limit}, | |
| {"$project": {"_id": 0, "appid": 1, "name": 1}}, | |
| ] | |
| cursor = collection.aggregate(pipeline) | |
| return await cursor.to_list(length=limit) | |
| except PyMongoError as e: | |
| logger.error(f"Error getting games without CN name: {e}") | |
| return [] | |
| async def mark_cn_name_checked(self, app_id: str, name_cn: str | None = None) -> None: | |
| """ | |
| Oznacza grę jako sprawdzoną pod kątem chińskiej nazwy. | |
| Opcjonalnie zapisuje znalezioną nazwę. | |
| """ | |
| if self.db is None: | |
| return | |
| collection = self.db[self.COLLECTION_GAMES] | |
| update_doc: dict[str, Any] = {"cn_name_checked": True} | |
| if name_cn: | |
| update_doc["name_cn"] = name_cn | |
| try: | |
| await collection.update_one( | |
| {"appid": str(app_id)}, | |
| {"$set": update_doc} | |
| ) | |
| except PyMongoError as e: | |
| logger.error(f"Error marking CN name checked for {app_id}: {e}") | |
| async def get_games_missing_app_type(self, limit: int = 200) -> list[dict[str, Any]]: | |
| """ | |
| Return high-signal games that still need Steam Store type enrichment. | |
| We prioritize already-priority games first, then any app with enough reviews | |
| to qualify a DLC for worker-managed analysis. | |
| """ | |
| if self.db is None: | |
| return [] | |
| collection = self.db[self.COLLECTION_GAMES] | |
| try: | |
| pipeline = [ | |
| { | |
| "$addFields": { | |
| "total_reviews_sum": { | |
| "$add": [ | |
| {"$ifNull": ["$positive", 0]}, | |
| {"$ifNull": ["$negative", 0]}, | |
| ] | |
| } | |
| } | |
| }, | |
| { | |
| "$match": { | |
| "dlc_checked_at": {"$exists": False}, | |
| "$or": [ | |
| {"is_priority": True}, | |
| { | |
| "total_reviews_sum": { | |
| "$gte": settings.dlc_min_reviews_for_analysis | |
| } | |
| }, | |
| ], | |
| } | |
| }, | |
| {"$sort": {"is_priority": -1, "total_reviews_sum": -1}}, | |
| {"$limit": limit}, | |
| {"$project": {"_id": 0, "appid": 1, "name": 1}}, | |
| ] | |
| cursor = collection.aggregate(pipeline) | |
| return await cursor.to_list(length=limit) | |
| except PyMongoError as e: | |
| logger.error(f"Error getting games missing app type: {e}") | |
| return [] | |
| async def mark_app_type_checked( | |
| self, | |
| app_id: str, | |
| *, | |
| app_type: str, | |
| parent_appid: str | None = None, | |
| ) -> None: | |
| """Persist Steam Store app type metadata.""" | |
| if self.db is None: | |
| return | |
| collection = self.db[self.COLLECTION_GAMES] | |
| update_doc: dict[str, Any] = { | |
| "app_type": app_type, | |
| "parent_appid": str(parent_appid) if parent_appid else None, | |
| "dlc_checked_at": datetime.now(timezone.utc), | |
| } | |
| try: | |
| await collection.update_one( | |
| {"appid": str(app_id)}, | |
| {"$set": update_doc}, | |
| ) | |
| except PyMongoError as e: | |
| logger.error(f"Error marking app type checked for {app_id}: {e}") | |
| # ========== Worker Methods ========== | |
| async def upsert_games_batch(self, games: list[dict[str, Any]]) -> tuple[int, int]: | |
| """ | |
| Bulk upsert games via UpdateOne operations. | |
| Returns: | |
| (upserted_count, modified_count) | |
| """ | |
| if self.db is None or not games: | |
| return (0, 0) | |
| collection = self.db[self.COLLECTION_GAMES] | |
| operations = [] | |
| for game in games: | |
| appid = str(game.get("appid", "")) | |
| name = game.get("name", "") | |
| if not appid or not name: | |
| continue | |
| update_doc: dict[str, Any] = { | |
| "appid": appid, | |
| "name": name, | |
| "name_lower": name.lower(), | |
| } | |
| for field in ( | |
| "developer", "publisher", "positive", "negative", | |
| "tags", "genre", "ccu", "synced_at", | |
| "app_type", "parent_appid", "dlc_checked_at", | |
| ): | |
| if game.get(field) is not None: | |
| update_doc[field] = game[field] | |
| operations.append( | |
| UpdateOne({"appid": appid}, {"$set": update_doc}, upsert=True) | |
| ) | |
| if not operations: | |
| return (0, 0) | |
| try: | |
| result = await collection.bulk_write(operations, ordered=False) | |
| return (result.upserted_count, result.modified_count) | |
| except BulkWriteError as e: | |
| details = e.details or {} | |
| return (details.get("nUpserted", 0), details.get("nModified", 0)) | |
| except PyMongoError as e: | |
| logger.error(f"Error in upsert_games_batch: {e}") | |
| return (0, 0) | |
| async def get_top_games_by_reviews(self, limit: int = 500) -> list[dict[str, Any]]: | |
| """Top N games sorted by total review count (positive + negative) DESC.""" | |
| if self.db is None: | |
| return [] | |
| collection = self.db[self.COLLECTION_GAMES] | |
| try: | |
| pipeline = [ | |
| {"$match": {"positive": {"$exists": True}, "negative": {"$exists": True}}}, | |
| {"$addFields": {"total_reviews_sum": {"$add": ["$positive", "$negative"]}}}, | |
| {"$sort": {"total_reviews_sum": -1}}, | |
| {"$limit": limit}, | |
| {"$project": {"_id": 0}}, | |
| ] | |
| cursor = collection.aggregate(pipeline) | |
| return await cursor.to_list(length=limit) | |
| except PyMongoError as e: | |
| logger.error(f"Error getting top games: {e}") | |
| return [] | |
| async def update_game_update_date(self, app_id: str, update_at: datetime) -> None: | |
| """Store the latest game update timestamp.""" | |
| if self.db is None: | |
| return | |
| collection = self.db[self.COLLECTION_GAMES] | |
| try: | |
| await collection.update_one( | |
| {"appid": str(app_id)}, | |
| {"$set": {"last_game_update_at": update_at}}, | |
| ) | |
| except PyMongoError as e: | |
| logger.error(f"Error updating game update date for {app_id}: {e}") | |
| async def update_game_patch_date(self, app_id: str, patch_date: datetime) -> None: | |
| """Store the latest confirmed major-update timestamp.""" | |
| if self.db is None: | |
| return | |
| collection = self.db[self.COLLECTION_GAMES] | |
| try: | |
| await collection.update_one( | |
| {"appid": str(app_id)}, | |
| {"$set": {"current_patch_at": patch_date}}, | |
| ) | |
| except PyMongoError as e: | |
| logger.error(f"Error updating game patch date for {app_id}: {e}") | |
| async def update_news_cursor(self, app_id: str, gid: str, date: datetime) -> None: | |
| """Store the latest seen news GID and its date as an incremental scan cursor.""" | |
| if self.db is None: | |
| return | |
| collection = self.db[self.COLLECTION_GAMES] | |
| try: | |
| await collection.update_one( | |
| {"appid": str(app_id)}, | |
| {"$set": {"last_seen_news_gid": gid, "last_seen_news_at": date}}, | |
| ) | |
| except PyMongoError as e: | |
| logger.error(f"Error updating news cursor for {app_id}: {e}") | |
| async def get_game_patch_date(self, app_id: str) -> datetime | None: | |
| """Get the latest confirmed major-update timestamp for a game.""" | |
| if self.db is None: | |
| return None | |
| collection = self.db[self.COLLECTION_GAMES] | |
| try: | |
| doc = await collection.find_one( | |
| {"appid": str(app_id)}, | |
| {"_id": 0, "current_patch_at": 1}, | |
| ) | |
| if doc and doc.get("current_patch_at"): | |
| val = doc["current_patch_at"] | |
| if isinstance(val, datetime): | |
| return val | |
| return None | |
| return None | |
| except PyMongoError as e: | |
| logger.error(f"Error getting game patch date for {app_id}: {e}") | |
| return None | |
| async def upsert_refresh_schedule(self, schedule: dict[str, Any]) -> None: | |
| """Create or replace a refresh schedule document.""" | |
| if self.db is None: | |
| return | |
| collection = self.db[self.COLLECTION_REFRESH_SCHEDULES] | |
| try: | |
| await collection.update_one( | |
| {"app_id": schedule["app_id"]}, | |
| {"$set": schedule}, | |
| upsert=True, | |
| ) | |
| except PyMongoError as e: | |
| logger.error(f"Error upserting refresh schedule for {schedule.get('app_id')}: {e}") | |
| async def get_active_schedules(self) -> list[dict[str, Any]]: | |
| """All schedules with status: 'active'.""" | |
| if self.db is None: | |
| return [] | |
| collection = self.db[self.COLLECTION_REFRESH_SCHEDULES] | |
| try: | |
| cursor = collection.find({"status": "active"}, {"_id": 0}) | |
| return await cursor.to_list(length=10000) | |
| except PyMongoError as e: | |
| logger.error(f"Error getting active schedules: {e}") | |
| return [] | |
| async def has_due_refresh_schedule(self, app_id: str) -> bool: | |
| """True when an active schedule has at least one due, incomplete checkpoint.""" | |
| if self.db is None: | |
| return False | |
| collection = self.db[self.COLLECTION_REFRESH_SCHEDULES] | |
| now = datetime.now(timezone.utc) | |
| try: | |
| document = await collection.find_one( | |
| { | |
| "app_id": str(app_id), | |
| "status": "active", | |
| "checkpoints": { | |
| "$elemMatch": { | |
| "completed": False, | |
| "due_at": {"$lte": now}, | |
| } | |
| }, | |
| }, | |
| {"_id": 0, "app_id": 1}, | |
| ) | |
| return document is not None | |
| except PyMongoError as e: | |
| logger.error(f"Error checking due refresh schedule for {app_id}: {e}") | |
| return False | |
| async def mark_checkpoint_completed(self, app_id: str, offset_hours: int) -> None: | |
| """Mark a specific checkpoint as completed using positional $ update.""" | |
| if self.db is None: | |
| return | |
| collection = self.db[self.COLLECTION_REFRESH_SCHEDULES] | |
| try: | |
| await collection.update_one( | |
| {"app_id": str(app_id), "checkpoints.offset_hours": offset_hours}, | |
| {"$set": {"checkpoints.$.completed": True}}, | |
| ) | |
| except PyMongoError as e: | |
| logger.error(f"Error marking checkpoint for {app_id}/{offset_hours}h: {e}") | |
| async def complete_schedule(self, app_id: str) -> None: | |
| """Set schedule status to 'completed'.""" | |
| if self.db is None: | |
| return | |
| collection = self.db[self.COLLECTION_REFRESH_SCHEDULES] | |
| try: | |
| await collection.update_one( | |
| {"app_id": str(app_id)}, | |
| {"$set": {"status": "completed"}}, | |
| ) | |
| except PyMongoError as e: | |
| logger.error(f"Error completing schedule for {app_id}: {e}") | |
| # ========== Priority Games Methods ========== | |
| async def get_priority_games(self) -> list[dict[str, Any]]: | |
| """All games with is_priority == True, all fields except _id.""" | |
| if self.db is None: | |
| return [] | |
| collection = self.db[self.COLLECTION_GAMES] | |
| try: | |
| cursor = collection.find({"is_priority": True}, {"_id": 0}) | |
| return await cursor.to_list(length=10000) | |
| except PyMongoError as e: | |
| logger.error(f"Error getting priority games: {e}") | |
| return [] | |
| async def get_priority_games_for_analysis(self) -> list[dict[str, Any]]: | |
| """ | |
| Priority games eligible for worker-managed analysis. | |
| DLC stays linked to the priority universe via is_priority, but low-review DLC | |
| falls back to on-demand mode instead of occupying worker capacity. | |
| """ | |
| if self.db is None: | |
| return [] | |
| collection = self.db[self.COLLECTION_GAMES] | |
| if settings.dlc_worker_analysis_enabled: | |
| query: dict[str, Any] = { | |
| "is_priority": True, | |
| "$or": [ | |
| {"app_type": {"$ne": "dlc"}}, | |
| { | |
| "$expr": { | |
| "$gte": [ | |
| { | |
| "$add": [ | |
| {"$ifNull": ["$positive", 0]}, | |
| {"$ifNull": ["$negative", 0]}, | |
| ] | |
| }, | |
| settings.dlc_min_reviews_for_analysis, | |
| ] | |
| } | |
| }, | |
| ], | |
| } | |
| else: | |
| query = { | |
| "is_priority": True, | |
| "app_type": {"$ne": "dlc"}, | |
| } | |
| try: | |
| cursor = collection.find(query, {"_id": 0}) | |
| return await cursor.to_list(length=10000) | |
| except PyMongoError as e: | |
| logger.error(f"Error getting priority games for analysis: {e}") | |
| return [] | |
| async def get_priority_game_ids(self) -> set[str]: | |
| """Lightweight set of appids for is_priority == True games.""" | |
| if self.db is None: | |
| return set() | |
| collection = self.db[self.COLLECTION_GAMES] | |
| try: | |
| cursor = collection.find({"is_priority": True}, {"_id": 0, "appid": 1}) | |
| docs = await cursor.to_list(length=10000) | |
| return {str(d["appid"]) for d in docs if d.get("appid")} | |
| except PyMongoError as e: | |
| logger.error(f"Error getting priority game ids: {e}") | |
| return set() | |
| async def get_priority_game_ids_for_analysis(self) -> set[str]: | |
| """App IDs that should behave as worker-managed in runtime decisions.""" | |
| docs = await self.get_priority_games_for_analysis() | |
| return {str(d["appid"]) for d in docs if d.get("appid")} | |
| async def get_dlcs_by_parent_appid(self, parent_appid: str) -> list[dict[str, Any]]: | |
| """Return DLC documents linked to a given base game.""" | |
| if self.db is None: | |
| return [] | |
| collection = self.db[self.COLLECTION_GAMES] | |
| try: | |
| cursor = collection.find( | |
| {"app_type": "dlc", "parent_appid": str(parent_appid)}, | |
| {"_id": 0}, | |
| ) | |
| return await cursor.to_list(length=1000) | |
| except PyMongoError as e: | |
| logger.error(f"Error getting DLCs for parent {parent_appid}: {e}") | |
| return [] | |
| async def get_existing_appids(self, appids: set[str]) -> set[str]: | |
| """Return the subset of the given appids that have a document in games.""" | |
| if self.db is None or not appids: | |
| return set() | |
| collection = self.db[self.COLLECTION_GAMES] | |
| try: | |
| cursor = collection.find( | |
| {"appid": {"$in": list(appids)}}, | |
| {"_id": 0, "appid": 1}, | |
| ) | |
| docs = await cursor.to_list(length=len(appids) + 1) | |
| return {str(d["appid"]) for d in docs if d.get("appid")} | |
| except PyMongoError as e: | |
| logger.error(f"Error in get_existing_appids: {e}") | |
| return set() | |
| async def bulk_update_priority_fields(self, updates: list[tuple[str, dict]]) -> int: | |
| """ | |
| Batch UpdateOne operations for priority fields. | |
| Args: | |
| updates: List of (appid, fields_dict) tuples. | |
| Returns: | |
| modified_count | |
| """ | |
| if self.db is None or not updates: | |
| return 0 | |
| collection = self.db[self.COLLECTION_GAMES] | |
| operations = [ | |
| UpdateOne({"appid": appid}, {"$set": fields}) | |
| for appid, fields in updates | |
| ] | |
| try: | |
| result = await collection.bulk_write(operations, ordered=False) | |
| return result.modified_count | |
| except BulkWriteError as e: | |
| details = e.details or {} | |
| return details.get("nModified", 0) | |
| except PyMongoError as e: | |
| logger.error(f"Error in bulk_update_priority_fields: {e}") | |
| return 0 | |
# Global module-level instance (Singleton)
mongodb = MongoDB()