# NOTE(review): the three lines below were stray deployment metadata pasted in
# front of the module ("GitHub Action" / "deploy: worker release from GitHub" /
# commit 8ff1b66). They are commented out so the module parses; remove entirely
# once confirmed unneeded.
# GitHub Action
# deploy: worker release from GitHub
# 8ff1b66
"""
Moduł połączenia z bazą danych MongoDB.
Wykorzystuje Motor (async driver) do asynchronicznej komunikacji z MongoDB.
Implementuje cache wyników analizy z TTL 24h.
"""
import asyncio
import logging
import re
from datetime import datetime, timedelta, timezone
from typing import Any
from bson.codec_options import CodecOptions
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from pymongo import ASCENDING, DESCENDING, UpdateOne
from pymongo.errors import (
BulkWriteError,
ConnectionFailure,
OperationFailure,
PyMongoError,
)
from app.core.config import settings
logger = logging.getLogger(__name__)
class MongoDB:
    """
    Manages the connection to MongoDB via Motor (async driver).

    Used as a Singleton through the module-level ``mongodb`` instance.
    Caches analysis results with per-document TTL validation and stores the
    Steam games list used for autocomplete.

    Attributes:
        client: Motor MongoDB client; None until connect() succeeds.
        db: Database handle; None until connect() succeeds.
    """

    # Collection names used throughout this module.
    COLLECTION_ANALYSES = "analyses"
    COLLECTION_GAMES = "games"
    COLLECTION_STEAM_ERRORS = "steam_errors"
    COLLECTION_REFRESH_SCHEDULES = "refresh_schedules"
def __init__(self) -> None:
"""Inicjalizuje instancję bez aktywnego połączenia."""
self.client: AsyncIOMotorClient | None = None # type: ignore
self.db: AsyncIOMotorDatabase | None = None # type: ignore
async def connect(self, max_retries: int = 3) -> None:
"""
Nawiązuje połączenie z MongoDB z exponential backoff.
Tworzy indeksy dla optymalnej wydajności zapytań.
Args:
max_retries: Maksymalna liczba prób połączenia.
Raises:
ConnectionError: Gdy nie można połączyć się z bazą po wszystkich próbach.
"""
for attempt in range(1, max_retries + 1):
try:
self.client = AsyncIOMotorClient(settings.mongodb_url, tz_aware=True)
codec_options: CodecOptions = CodecOptions(tz_aware=True)
self.db = self.client.get_database(
settings.mongodb_db_name, codec_options=codec_options
)
# Weryfikacja połączenia
await self.client.admin.command("ping")
logger.info(f"Połączono z MongoDB: {settings.mongodb_db_name}")
# Utwórz indeksy
await self._create_indexes()
return
except (ConnectionFailure, PyMongoError) as e:
if attempt < max_retries:
delay = 2 ** (attempt - 1) # 1s, 2s, 4s
logger.warning(
f"MongoDB connection attempt {attempt}/{max_retries} failed: {e}. "
f"Retrying in {delay}s..."
)
await asyncio.sleep(delay)
else:
logger.error(f"MongoDB connection failed after {max_retries} attempts: {e}")
raise ConnectionError(
f"Nie można połączyć się z MongoDB po {max_retries} próbach: {e}"
)
    async def _create_indexes(self) -> None:
        """Create all collection indexes (idempotent; safe on every connect)."""
        if self.db is None:
            return
        # Indexes for cached analyses: one document per game.
        analyses = self.db[self.COLLECTION_ANALYSES]
        await analyses.create_index("game_id", unique=True)
        # Migrate from old global TTL index (cached_at) to per-document TTL (expires_at)
        try:
            existing_indexes = await analyses.index_information()
            for idx_name, idx_info in existing_indexes.items():
                # A TTL index keyed on cached_at is the legacy variant.
                if idx_info.get("expireAfterSeconds") is not None and "cached_at" in str(idx_info.get("key")):
                    await analyses.drop_index(idx_name)
                    logger.info(f"Dropped old TTL index: {idx_name}")
                    break
        except OperationFailure:
            pass  # Old index may not exist
        # expireAfterSeconds=0 makes Mongo honor each document's own expires_at.
        await analyses.create_index("expires_at", expireAfterSeconds=0)
        # Indexes for the games list (autocomplete).
        games = self.db[self.COLLECTION_GAMES]
        await games.create_index("appid", unique=True)
        # Index backing case-insensitive regex search.
        await games.create_index("name_lower")
        await games.create_index("name_cn")
        # Sparse index for the checked flag (saves space, speeds the query).
        await games.create_index("cn_name_checked", sparse=True)
        await games.create_index("parent_appid", sparse=True)
        # Compound index for sorting games by review count (worker game sync)
        await games.create_index(
            [("positive", DESCENDING), ("negative", DESCENDING)],
            sparse=True,
        )
        await games.create_index(
            [
                ("name_lower", ASCENDING),
                ("app_type", ASCENDING),
                ("positive", DESCENDING),
                ("negative", DESCENDING),
            ]
        )
        await games.create_index("is_priority", sparse=True)
        # Indexes for the Steam API error cache (TTL driven by expires_at).
        steam_errors = self.db[self.COLLECTION_STEAM_ERRORS]
        await steam_errors.create_index("app_id", unique=True)
        await steam_errors.create_index("expires_at", expireAfterSeconds=0)
        # Indexes for refresh schedules (worker pre-cache)
        schedules = self.db[self.COLLECTION_REFRESH_SCHEDULES]
        await schedules.create_index("app_id", unique=True)
        await schedules.create_index("status")
        logger.debug("Utworzono indeksy MongoDB")
async def disconnect(self) -> None:
"""Zamyka połączenie z MongoDB."""
if self.client:
self.client.close()
logger.info("Rozłączono z MongoDB")
def _is_document_expired(self, document: dict[str, Any]) -> bool:
"""Check if a cache document is expired using expires_at or cached_at fallback.
With tz_aware=True on the Motor client, all datetimes from MongoDB are
already timezone-aware, so no manual .replace(tzinfo=...) is needed.
"""
now = datetime.now(timezone.utc)
# New-format: per-document expires_at
expires_at = document.get("expires_at")
if expires_at:
if isinstance(expires_at, str):
expires_at = datetime.fromisoformat(expires_at)
return now >= expires_at
# Old-format fallback: cached_at + default TTL
cached_at = document.get("cached_at")
if cached_at:
if isinstance(cached_at, str):
cached_at = datetime.fromisoformat(cached_at)
ttl_hours = document.get("ttl_hours", settings.cache_ttl_hours)
return now - cached_at > timedelta(hours=ttl_hours)
return True # No timestamp info = treat as expired
async def get_cached_analysis_full(self, game_id: str) -> dict[str, Any] | None:
"""
Returns full cache document (with review IDs, TTL info) or None if expired/missing.
"""
if self.db is None:
return None
collection = self.db[self.COLLECTION_ANALYSES]
try:
document = await collection.find_one({"game_id": game_id})
if not document:
return None
if self._is_document_expired(document):
logger.info(f"Cache expired for game {game_id}")
return None
document.pop("_id", None)
return document
except PyMongoError as e:
logger.error(f"Error reading cache: {e}")
return None
async def get_stale_analysis(self, game_id: str) -> dict[str, Any] | None:
"""
Returns cache document even if expired. Used by incremental path
to retrieve old review IDs. Returns None only if no document exists.
"""
return await self.get_analysis(game_id)
async def get_analysis(self, game_id: str) -> dict[str, Any] | None:
"""
Returns an analysis document regardless of TTL.
Product freshness is evaluated outside MongoDB, so this method is the
canonical read path for "show stale result + refresh" behavior.
"""
if self.db is None:
return None
collection = self.db[self.COLLECTION_ANALYSES]
try:
document = await collection.find_one({"game_id": game_id})
if not document:
return None
document.pop("_id", None)
return document
except PyMongoError as e:
logger.error(f"Error reading stale cache: {e}")
return None
async def get_cached_analysis(self, game_id: str) -> dict[str, Any] | None:
"""
Returns cached analysis results or None if expired/missing.
Backward-compatible wrapper around get_cached_analysis_full.
"""
doc = await self.get_cached_analysis_full(game_id)
if doc is None:
return None
results = doc.get("results")
if isinstance(results, dict) and results.get("cached_at") is None and doc.get("cached_at") is not None:
results = {**results, "cached_at": doc["cached_at"]}
return results
    async def save_analysis(
        self,
        game_id: str,
        results: dict[str, Any],
        analyzed_review_ids: list[str] | None = None,
        latest_review_timestamp: int = 0,
        ttl_hours: int | None = None,
        analyzed_at: datetime | None = None,
    ) -> None:
        """
        Saves analysis results to cache with per-document TTL.
        Purges review IDs to keep only the most recent ones (space efficiency).

        Args:
            game_id: Steam game identifier (unique cache key).
            results: Analysis payload stored under the "results" field.
            analyzed_review_ids: IDs of reviews already analyzed; trimmed to
                the most recent ``settings.incremental_max_stored_ids``.
            latest_review_timestamp: Unix timestamp of the newest review seen.
            ttl_hours: Per-document TTL override; falls back to the global
                ``settings.cache_ttl_hours`` when falsy.
            analyzed_at: Explicit analysis time; when omitted it is recovered
                from the results payload, or defaults to "now".
        """
        if self.db is None:
            logger.warning("Brak połączenia z MongoDB - nie zapisano cache")
            return
        collection = self.db[self.COLLECTION_ANALYSES]
        # NOTE(review): a falsy ttl_hours (0) also falls back to the default —
        # presumably intentional, since a zero-hour TTL would be useless.
        effective_ttl = ttl_hours or settings.cache_ttl_hours
        now = datetime.now(timezone.utc)
        analysis_date = analyzed_at
        if analysis_date is None:
            # Recover the analysis time from the payload (str or datetime).
            raw_value = results.get("analysis_date") or results.get("cached_at")
            if isinstance(raw_value, str):
                analysis_date = datetime.fromisoformat(raw_value)
            elif isinstance(raw_value, datetime):
                analysis_date = raw_value
        if analysis_date is None:
            analysis_date = now
        if results.get("analysis_date") is None:
            # Copy-on-write so the caller's dict is not mutated.
            results = {**results, "analysis_date": analysis_date}
        # Purge old IDs — keep only the most recent N
        if analyzed_review_ids:
            analyzed_review_ids = analyzed_review_ids[-settings.incremental_max_stored_ids:]
        document: dict[str, Any] = {
            "game_id": game_id,
            "results": results,
            "analyzed_review_ids": analyzed_review_ids or [],
            "latest_review_timestamp": latest_review_timestamp,
            "cached_at": now,
            "analyzed_at": analysis_date,
            "ttl_hours": effective_ttl,
            # expires_at drives the Mongo TTL index (expireAfterSeconds=0).
            "expires_at": now + timedelta(hours=effective_ttl),
        }
        try:
            await collection.update_one(
                {"game_id": game_id},
                {"$set": document},
                upsert=True,
            )
            logger.info(f"Saved cache for game {game_id} (TTL: {effective_ttl}h)")
        except PyMongoError as e:
            logger.error(f"Error saving cache: {e}")
async def delete_cached_analysis(self, game_id: str) -> bool:
"""
Usuwa cache dla danej gry.
Args:
game_id: Identyfikator gry Steam.
Returns:
True jeśli usunięto, False w przeciwnym razie.
"""
if self.db is None:
return False
collection = self.db[self.COLLECTION_ANALYSES]
try:
result = await collection.delete_one({"game_id": game_id})
return result.deleted_count > 0
except PyMongoError as e:
logger.error(f"Błąd usuwania cache: {e}")
return False
# ========== Steam API Error Cache ==========
async def get_steam_error(self, app_id: str) -> dict[str, Any] | None:
"""
Sprawdza czy app_id ma cached error.
Returns:
Dict z polami app_id, status_code, expires_at lub None.
"""
if self.db is None:
return None
collection = self.db[self.COLLECTION_STEAM_ERRORS]
try:
document = await collection.find_one({"app_id": app_id})
if not document:
return None
document.pop("_id", None)
return document
except PyMongoError as e:
logger.error(f"Błąd odczytu steam error cache: {e}")
return None
async def cache_steam_error(
self, app_id: str, status_code: int, ttl_seconds: int
) -> None:
"""
Cachuje błąd Steam API z automatycznym TTL.
MongoDB TTL index automatycznie usunie dokument po expires_at.
"""
if self.db is None:
return
collection = self.db[self.COLLECTION_STEAM_ERRORS]
document = {
"app_id": app_id,
"status_code": status_code,
"cached_at": datetime.now(timezone.utc),
"expires_at": datetime.now(timezone.utc) + timedelta(seconds=ttl_seconds),
}
try:
await collection.update_one(
{"app_id": app_id},
{"$set": document},
upsert=True,
)
logger.info(
f"Cached Steam error {status_code} for app {app_id} (TTL: {ttl_seconds}s)"
)
except PyMongoError as e:
logger.error(f"Błąd zapisu steam error cache: {e}")
# ========== Metody dla listy gier (autouzupełnianie) ==========
async def get_games_count(self) -> int:
"""Zwraca liczbę gier w bazie."""
if self.db is None:
return 0
collection = self.db[self.COLLECTION_GAMES]
return await collection.count_documents({})
async def save_games_batch(self, games: list[dict[str, str]]) -> int:
"""
Zapisuje partię gier do bazy (bulk insert).
Args:
games: Lista słowników z kluczami 'appid', 'name', opcjonalnie 'developer', 'publisher'.
Returns:
Liczba zapisanych gier.
"""
if self.db is None or not games:
return 0
collection = self.db[self.COLLECTION_GAMES]
# Dodaj pole name_lower dla wyszukiwania case-insensitive
documents = []
for game in games:
if not game.get("name"):
continue
doc = {
"appid": game["appid"],
"name": game["name"],
"name_lower": game["name"].lower(),
}
# Dodaj opcjonalne pola
if game.get("developer"):
doc["developer"] = game["developer"]
if game.get("publisher"):
doc["publisher"] = game["publisher"]
documents.append(doc)
try:
# Użyj ordered=False żeby kontynuować mimo duplikatów
result = await collection.insert_many(documents, ordered=False)
return len(result.inserted_ids)
except BulkWriteError as e:
# Duplicates are expected with ordered=False — count successful inserts
inserted = e.details.get("nInserted", 0)
logger.debug(f"Pominięto duplikaty podczas zapisu gier ({inserted} inserted)")
return inserted
except PyMongoError as e:
logger.error(f"Błąd zapisu gier: {e}")
return 0
async def clear_games(self) -> None:
"""Usuwa wszystkie gry z bazy."""
if self.db is None:
return
collection = self.db[self.COLLECTION_GAMES]
await collection.delete_many({})
logger.info("Usunięto wszystkie gry z bazy")
async def upsert_game(self, game_data: dict[str, Any]) -> None:
"""
Dodaje lub aktualizuje pojedynczą grę w bazie danych.
Używane głównie przez mechanizm Fallback Search.
"""
if self.db is None:
return
collection = self.db[self.COLLECTION_GAMES]
appid = str(game_data["appid"])
# Przygotuj dokument
update_doc = {
"appid": appid,
"name": game_data["name"],
"name_lower": game_data["name"].lower(),
}
if game_data.get("name_cn"):
update_doc["name_cn"] = game_data["name_cn"]
update_doc["cn_name_checked"] = True
elif game_data.get("cn_name_checked"):
update_doc["cn_name_checked"] = True
if game_data.get("header_image") is not None:
update_doc["header_image"] = game_data["header_image"]
if game_data.get("total_reviews") is not None:
update_doc["total_reviews"] = game_data["total_reviews"]
# Worker-supplied fields
for field in (
"positive", "negative", "tags", "genre", "ccu",
"last_game_update_at", "synced_at", "developer", "publisher",
"app_type", "parent_appid", "dlc_checked_at",
):
if game_data.get(field) is not None:
update_doc[field] = game_data[field]
try:
await collection.update_one(
{"appid": appid},
{"$set": update_doc},
upsert=True
)
logger.debug(f"Zsynchronizowano grę {appid} w MongoDB")
except PyMongoError as e:
logger.error(f"Błąd upsert gry {appid}: {e}")
    async def search_games(self, query: str, limit: int = 10) -> list[dict[str, Any]]:
        """
        Search games by name (English or Chinese), case-insensitive.

        Results are ranked: exact match first, then prefix match, then any
        substring match; within a rank, base games beat DLC/demos and higher
        review counts come first.

        Args:
            query: Text to search for (minimum 2 characters after trimming).
            limit: Maximum number of results.

        Returns:
            Matching games (appid, name, name_cn, developer, publisher,
            app_type, parent_appid).
        """
        normalized_query = query.strip()
        if self.db is None or not normalized_query or len(normalized_query) < 2:
            return []
        collection = self.db[self.COLLECTION_GAMES]
        try:
            query_lower = normalized_query.lower()
            # Escape user input so regex metacharacters match literally.
            name_pattern = re.escape(query_lower)
            name_prefix_pattern = f"^{name_pattern}"
            name_exact_pattern = f"^{name_pattern}$"
            cn_pattern = re.escape(normalized_query)
            cn_prefix_pattern = f"^{cn_pattern}"
            cn_exact_pattern = f"^{cn_pattern}$"
            match_filter: dict[str, Any] = {
                "$or": [
                    {"name_lower": {"$regex": name_pattern}},
                    {"name_cn": {"$regex": cn_pattern, "$options": "i"}},
                ]
            }
            # Optionally hide DLC from search results (configuration flag).
            if not settings.dlc_visible_in_search:
                match_filter["app_type"] = {"$ne": "dlc"}
            pipeline = [
                {"$match": match_filter},
                {
                    "$addFields": {
                        # match_rank: 0 = exact, 1 = prefix, 2 = substring.
                        "match_rank": {
                            "$switch": {
                                "branches": [
                                    {
                                        "case": {
                                            "$or": [
                                                {
                                                    "$regexMatch": {
                                                        "input": {"$ifNull": ["$name_lower", ""]},
                                                        "regex": name_exact_pattern,
                                                    }
                                                },
                                                {
                                                    "$regexMatch": {
                                                        "input": {"$ifNull": ["$name_cn", ""]},
                                                        "regex": cn_exact_pattern,
                                                        "options": "i",
                                                    }
                                                },
                                            ]
                                        },
                                        "then": 0,
                                    },
                                    {
                                        "case": {
                                            "$or": [
                                                {
                                                    "$regexMatch": {
                                                        "input": {"$ifNull": ["$name_lower", ""]},
                                                        "regex": name_prefix_pattern,
                                                    }
                                                },
                                                {
                                                    "$regexMatch": {
                                                        "input": {"$ifNull": ["$name_cn", ""]},
                                                        "regex": cn_prefix_pattern,
                                                        "options": "i",
                                                    }
                                                },
                                            ]
                                        },
                                        "then": 1,
                                    },
                                ],
                                "default": 2,
                            }
                        },
                        # type_rank: base games (or unknown) first, then DLC, then demos.
                        "type_rank": {
                            "$switch": {
                                "branches": [
                                    {
                                        "case": {
                                            "$in": [
                                                {"$ifNull": ["$app_type", "unknown"]},
                                                ["game", "unknown"],
                                            ]
                                        },
                                        "then": 0,
                                    },
                                    {"case": {"$eq": ["$app_type", "dlc"]}, "then": 1},
                                    {"case": {"$eq": ["$app_type", "demo"]}, "then": 2},
                                ],
                                "default": 1,
                            }
                        },
                        # Popularity tiebreaker: total review count.
                        "review_count": {
                            "$add": [
                                {"$ifNull": ["$positive", 0]},
                                {"$ifNull": ["$negative", 0]},
                            ]
                        },
                    }
                },
                {
                    "$sort": {
                        "match_rank": 1,
                        "type_rank": 1,
                        "review_count": -1,
                        "name": 1,
                    }
                },
                {"$limit": limit},
                {
                    "$project": {
                        "_id": 0,
                        "appid": 1,
                        "name": 1,
                        "name_cn": 1,
                        "developer": 1,
                        "publisher": 1,
                        "app_type": 1,
                        "parent_appid": 1,
                    }
                },
            ]
            cursor = collection.aggregate(pipeline)
            results = await cursor.to_list(length=limit)
            return results
        except PyMongoError as e:
            logger.error(f"Błąd wyszukiwania gier: {e}")
            return []
async def get_game_update_date(self, app_id: str) -> datetime | None:
"""Get the last game update timestamp for a game."""
if self.db is None:
return None
collection = self.db[self.COLLECTION_GAMES]
try:
doc = await collection.find_one(
{"appid": str(app_id)},
{"_id": 0, "last_game_update_at": 1},
)
if doc and doc.get("last_game_update_at"):
val = doc["last_game_update_at"]
if isinstance(val, datetime):
return val
return None
return None
except PyMongoError as e:
logger.error(f"Error getting game update date for {app_id}: {e}")
return None
async def get_games_without_cn_name(self, limit: int = 200) -> list[dict[str, Any]]:
"""
Pobiera gry, które nie mają jeszcze nazwy chińskiej i nie były sprawdzane.
Sortuje po liczbie pozytywnych recenzji (jeśli dostępne, dla priorytetyzacji).
"""
if self.db is None:
return []
collection = self.db[self.COLLECTION_GAMES]
try:
pipeline = [
{"$match": {
"name_cn": {"$exists": False},
"cn_name_checked": {"$ne": True}, # Pomiń już sprawdzone
}},
# Sortowanie po positive (DESC), ale gry bez tego pola trafią na koniec (sparse index handling)
{"$sort": {"positive": -1}},
{"$limit": limit},
{"$project": {"_id": 0, "appid": 1, "name": 1}},
]
cursor = collection.aggregate(pipeline)
return await cursor.to_list(length=limit)
except PyMongoError as e:
logger.error(f"Error getting games without CN name: {e}")
return []
async def mark_cn_name_checked(self, app_id: str, name_cn: str | None = None) -> None:
"""
Oznacza grę jako sprawdzoną pod kątem chińskiej nazwy.
Opcjonalnie zapisuje znalezioną nazwę.
"""
if self.db is None:
return
collection = self.db[self.COLLECTION_GAMES]
update_doc: dict[str, Any] = {"cn_name_checked": True}
if name_cn:
update_doc["name_cn"] = name_cn
try:
await collection.update_one(
{"appid": str(app_id)},
{"$set": update_doc}
)
except PyMongoError as e:
logger.error(f"Error marking CN name checked for {app_id}: {e}")
    async def get_games_missing_app_type(self, limit: int = 200) -> list[dict[str, Any]]:
        """
        Return high-signal games that still need Steam Store type enrichment.

        We prioritize already-priority games first, then any app with enough
        reviews to qualify a DLC for worker-managed analysis. A game counts as
        "unchecked" while it has no ``dlc_checked_at`` timestamp.
        """
        if self.db is None:
            return []
        collection = self.db[self.COLLECTION_GAMES]
        try:
            pipeline = [
                {
                    # Total review count, treating missing counters as zero.
                    "$addFields": {
                        "total_reviews_sum": {
                            "$add": [
                                {"$ifNull": ["$positive", 0]},
                                {"$ifNull": ["$negative", 0]},
                            ]
                        }
                    }
                },
                {
                    # Unchecked games that are either priority or popular enough.
                    "$match": {
                        "dlc_checked_at": {"$exists": False},
                        "$or": [
                            {"is_priority": True},
                            {
                                "total_reviews_sum": {
                                    "$gte": settings.dlc_min_reviews_for_analysis
                                }
                            },
                        ],
                    }
                },
                {"$sort": {"is_priority": -1, "total_reviews_sum": -1}},
                {"$limit": limit},
                {"$project": {"_id": 0, "appid": 1, "name": 1}},
            ]
            cursor = collection.aggregate(pipeline)
            return await cursor.to_list(length=limit)
        except PyMongoError as e:
            logger.error(f"Error getting games missing app type: {e}")
            return []
async def mark_app_type_checked(
self,
app_id: str,
*,
app_type: str,
parent_appid: str | None = None,
) -> None:
"""Persist Steam Store app type metadata."""
if self.db is None:
return
collection = self.db[self.COLLECTION_GAMES]
update_doc: dict[str, Any] = {
"app_type": app_type,
"parent_appid": str(parent_appid) if parent_appid else None,
"dlc_checked_at": datetime.now(timezone.utc),
}
try:
await collection.update_one(
{"appid": str(app_id)},
{"$set": update_doc},
)
except PyMongoError as e:
logger.error(f"Error marking app type checked for {app_id}: {e}")
# ========== Worker Methods ==========
async def upsert_games_batch(self, games: list[dict[str, Any]]) -> tuple[int, int]:
"""
Bulk upsert games via UpdateOne operations.
Returns:
(upserted_count, modified_count)
"""
if self.db is None or not games:
return (0, 0)
collection = self.db[self.COLLECTION_GAMES]
operations = []
for game in games:
appid = str(game.get("appid", ""))
name = game.get("name", "")
if not appid or not name:
continue
update_doc: dict[str, Any] = {
"appid": appid,
"name": name,
"name_lower": name.lower(),
}
for field in (
"developer", "publisher", "positive", "negative",
"tags", "genre", "ccu", "synced_at",
"app_type", "parent_appid", "dlc_checked_at",
):
if game.get(field) is not None:
update_doc[field] = game[field]
operations.append(
UpdateOne({"appid": appid}, {"$set": update_doc}, upsert=True)
)
if not operations:
return (0, 0)
try:
result = await collection.bulk_write(operations, ordered=False)
return (result.upserted_count, result.modified_count)
except BulkWriteError as e:
details = e.details or {}
return (details.get("nUpserted", 0), details.get("nModified", 0))
except PyMongoError as e:
logger.error(f"Error in upsert_games_batch: {e}")
return (0, 0)
async def get_top_games_by_reviews(self, limit: int = 500) -> list[dict[str, Any]]:
"""Top N games sorted by total review count (positive + negative) DESC."""
if self.db is None:
return []
collection = self.db[self.COLLECTION_GAMES]
try:
pipeline = [
{"$match": {"positive": {"$exists": True}, "negative": {"$exists": True}}},
{"$addFields": {"total_reviews_sum": {"$add": ["$positive", "$negative"]}}},
{"$sort": {"total_reviews_sum": -1}},
{"$limit": limit},
{"$project": {"_id": 0}},
]
cursor = collection.aggregate(pipeline)
return await cursor.to_list(length=limit)
except PyMongoError as e:
logger.error(f"Error getting top games: {e}")
return []
async def update_game_update_date(self, app_id: str, update_at: datetime) -> None:
"""Store the latest game update timestamp."""
if self.db is None:
return
collection = self.db[self.COLLECTION_GAMES]
try:
await collection.update_one(
{"appid": str(app_id)},
{"$set": {"last_game_update_at": update_at}},
)
except PyMongoError as e:
logger.error(f"Error updating game update date for {app_id}: {e}")
async def update_game_patch_date(self, app_id: str, patch_date: datetime) -> None:
"""Store the latest confirmed major-update timestamp."""
if self.db is None:
return
collection = self.db[self.COLLECTION_GAMES]
try:
await collection.update_one(
{"appid": str(app_id)},
{"$set": {"current_patch_at": patch_date}},
)
except PyMongoError as e:
logger.error(f"Error updating game patch date for {app_id}: {e}")
async def update_news_cursor(self, app_id: str, gid: str, date: datetime) -> None:
"""Store the latest seen news GID and its date as an incremental scan cursor."""
if self.db is None:
return
collection = self.db[self.COLLECTION_GAMES]
try:
await collection.update_one(
{"appid": str(app_id)},
{"$set": {"last_seen_news_gid": gid, "last_seen_news_at": date}},
)
except PyMongoError as e:
logger.error(f"Error updating news cursor for {app_id}: {e}")
async def get_game_patch_date(self, app_id: str) -> datetime | None:
"""Get the latest confirmed major-update timestamp for a game."""
if self.db is None:
return None
collection = self.db[self.COLLECTION_GAMES]
try:
doc = await collection.find_one(
{"appid": str(app_id)},
{"_id": 0, "current_patch_at": 1},
)
if doc and doc.get("current_patch_at"):
val = doc["current_patch_at"]
if isinstance(val, datetime):
return val
return None
return None
except PyMongoError as e:
logger.error(f"Error getting game patch date for {app_id}: {e}")
return None
async def upsert_refresh_schedule(self, schedule: dict[str, Any]) -> None:
"""Create or replace a refresh schedule document."""
if self.db is None:
return
collection = self.db[self.COLLECTION_REFRESH_SCHEDULES]
try:
await collection.update_one(
{"app_id": schedule["app_id"]},
{"$set": schedule},
upsert=True,
)
except PyMongoError as e:
logger.error(f"Error upserting refresh schedule for {schedule.get('app_id')}: {e}")
async def get_active_schedules(self) -> list[dict[str, Any]]:
"""All schedules with status: 'active'."""
if self.db is None:
return []
collection = self.db[self.COLLECTION_REFRESH_SCHEDULES]
try:
cursor = collection.find({"status": "active"}, {"_id": 0})
return await cursor.to_list(length=10000)
except PyMongoError as e:
logger.error(f"Error getting active schedules: {e}")
return []
async def has_due_refresh_schedule(self, app_id: str) -> bool:
"""True when an active schedule has at least one due, incomplete checkpoint."""
if self.db is None:
return False
collection = self.db[self.COLLECTION_REFRESH_SCHEDULES]
now = datetime.now(timezone.utc)
try:
document = await collection.find_one(
{
"app_id": str(app_id),
"status": "active",
"checkpoints": {
"$elemMatch": {
"completed": False,
"due_at": {"$lte": now},
}
},
},
{"_id": 0, "app_id": 1},
)
return document is not None
except PyMongoError as e:
logger.error(f"Error checking due refresh schedule for {app_id}: {e}")
return False
async def mark_checkpoint_completed(self, app_id: str, offset_hours: int) -> None:
"""Mark a specific checkpoint as completed using positional $ update."""
if self.db is None:
return
collection = self.db[self.COLLECTION_REFRESH_SCHEDULES]
try:
await collection.update_one(
{"app_id": str(app_id), "checkpoints.offset_hours": offset_hours},
{"$set": {"checkpoints.$.completed": True}},
)
except PyMongoError as e:
logger.error(f"Error marking checkpoint for {app_id}/{offset_hours}h: {e}")
async def complete_schedule(self, app_id: str) -> None:
"""Set schedule status to 'completed'."""
if self.db is None:
return
collection = self.db[self.COLLECTION_REFRESH_SCHEDULES]
try:
await collection.update_one(
{"app_id": str(app_id)},
{"$set": {"status": "completed"}},
)
except PyMongoError as e:
logger.error(f"Error completing schedule for {app_id}: {e}")
# ========== Priority Games Methods ==========
async def get_priority_games(self) -> list[dict[str, Any]]:
"""All games with is_priority == True, all fields except _id."""
if self.db is None:
return []
collection = self.db[self.COLLECTION_GAMES]
try:
cursor = collection.find({"is_priority": True}, {"_id": 0})
return await cursor.to_list(length=10000)
except PyMongoError as e:
logger.error(f"Error getting priority games: {e}")
return []
    async def get_priority_games_for_analysis(self) -> list[dict[str, Any]]:
        """
        Priority games eligible for worker-managed analysis.

        DLC stays linked to the priority universe via is_priority, but
        low-review DLC falls back to on-demand mode instead of occupying
        worker capacity. When DLC worker analysis is disabled entirely,
        DLC is excluded outright.
        """
        if self.db is None:
            return []
        collection = self.db[self.COLLECTION_GAMES]
        if settings.dlc_worker_analysis_enabled:
            # Include non-DLC, plus DLC whose total review count clears the
            # configured threshold (missing counters treated as zero).
            query: dict[str, Any] = {
                "is_priority": True,
                "$or": [
                    {"app_type": {"$ne": "dlc"}},
                    {
                        "$expr": {
                            "$gte": [
                                {
                                    "$add": [
                                        {"$ifNull": ["$positive", 0]},
                                        {"$ifNull": ["$negative", 0]},
                                    ]
                                },
                                settings.dlc_min_reviews_for_analysis,
                            ]
                        }
                    },
                ],
            }
        else:
            # DLC worker analysis disabled: only non-DLC priority games.
            query = {
                "is_priority": True,
                "app_type": {"$ne": "dlc"},
            }
        try:
            cursor = collection.find(query, {"_id": 0})
            return await cursor.to_list(length=10000)
        except PyMongoError as e:
            logger.error(f"Error getting priority games for analysis: {e}")
            return []
async def get_priority_game_ids(self) -> set[str]:
"""Lightweight set of appids for is_priority == True games."""
if self.db is None:
return set()
collection = self.db[self.COLLECTION_GAMES]
try:
cursor = collection.find({"is_priority": True}, {"_id": 0, "appid": 1})
docs = await cursor.to_list(length=10000)
return {str(d["appid"]) for d in docs if d.get("appid")}
except PyMongoError as e:
logger.error(f"Error getting priority game ids: {e}")
return set()
async def get_priority_game_ids_for_analysis(self) -> set[str]:
"""App IDs that should behave as worker-managed in runtime decisions."""
docs = await self.get_priority_games_for_analysis()
return {str(d["appid"]) for d in docs if d.get("appid")}
async def get_dlcs_by_parent_appid(self, parent_appid: str) -> list[dict[str, Any]]:
"""Return DLC documents linked to a given base game."""
if self.db is None:
return []
collection = self.db[self.COLLECTION_GAMES]
try:
cursor = collection.find(
{"app_type": "dlc", "parent_appid": str(parent_appid)},
{"_id": 0},
)
return await cursor.to_list(length=1000)
except PyMongoError as e:
logger.error(f"Error getting DLCs for parent {parent_appid}: {e}")
return []
async def get_existing_appids(self, appids: set[str]) -> set[str]:
"""Return the subset of the given appids that have a document in games."""
if self.db is None or not appids:
return set()
collection = self.db[self.COLLECTION_GAMES]
try:
cursor = collection.find(
{"appid": {"$in": list(appids)}},
{"_id": 0, "appid": 1},
)
docs = await cursor.to_list(length=len(appids) + 1)
return {str(d["appid"]) for d in docs if d.get("appid")}
except PyMongoError as e:
logger.error(f"Error in get_existing_appids: {e}")
return set()
async def bulk_update_priority_fields(self, updates: list[tuple[str, dict]]) -> int:
"""
Batch UpdateOne operations for priority fields.
Args:
updates: List of (appid, fields_dict) tuples.
Returns:
modified_count
"""
if self.db is None or not updates:
return 0
collection = self.db[self.COLLECTION_GAMES]
operations = [
UpdateOne({"appid": appid}, {"$set": fields})
for appid, fields in updates
]
try:
result = await collection.bulk_write(operations, ordered=False)
return result.modified_count
except BulkWriteError as e:
details = e.details or {}
return details.get("nModified", 0)
except PyMongoError as e:
logger.error(f"Error in bulk_update_priority_fields: {e}")
return 0
# Global module-level instance (Singleton pattern) — import and reuse everywhere.
mongodb = MongoDB()