Spaces:
Running
Running
| """ | |
| Update Detection Service — checks Steam News API for game updates. | |
| Compares the latest news/patch date with the stored `last_game_update_at` | |
| to detect games that have been recently updated. | |
| """ | |
| import logging | |
| import re | |
| from datetime import datetime, timezone | |
| from typing import Any, NamedTuple, cast | |
| import httpx | |
| from app.core.config import settings | |
| from app.db.mongodb import mongodb | |
| logger = logging.getLogger(__name__) | |
| STEAM_NEWS_API_URL = "https://api.steampowered.com/ISteamNews/GetNewsForApp/v2/" | |
| # Matches two-segment versions: 1.2, v2.0, 0.6, 123.4 | |
| # Excludes three-segment (0.6.1) via negative lookahead, 4-digit years via \d{1,3}, | |
| # and sub-segments of longer versions (e.g. "6.1" within "0.6.1") via lookbehind. | |
| VERSION_RE = re.compile(r'(?<!\d\.)\bv?\d{1,3}\.\d+\b(?!\.\d)') | |
| # Phase 1 regex constants | |
| RELEASE_PHRASE_RE = re.compile( | |
| r'\b(out now|is out|is live|now live|now available|full release|' | |
| r'leaving early access|out of early access)\b', | |
| re.IGNORECASE | |
| ) | |
| CONTENT_UPDATE_RE = re.compile( | |
| r'\b(major update|content update|big update|biggest update)\b', | |
| re.IGNORECASE | |
| ) | |
| ACTION_WORD_RE = re.compile( | |
| r'\b(update|patch|release|available|launch|live|out)\b', | |
| re.IGNORECASE | |
| ) | |
| HOTFIX_RE = re.compile(r'\b(hotfix|hot.?fix)\b', re.IGNORECASE) | |
| BRANCH_RE = re.compile( | |
| r'\b(experimental branch|experimental.{0,10}patch|experimental.{0,10}build|' | |
| r'public.?test|pts build|beta branch|' | |
| r'on experimental|for experimental)\b', | |
| re.IGNORECASE | |
| ) | |
| MAJOR_RELEASE_RE = re.compile( | |
| r'\b(out now|is out|is live|now live|now available|full release|' | |
| r'leaving early access|out of early access)\b', | |
| re.IGNORECASE | |
| ) | |
| MAJOR_CONTENT_RE = re.compile( | |
| r'\b(major update|content update|big update|biggest update)\b', | |
| re.IGNORECASE | |
| ) | |
| ONE_ZERO_RE = re.compile(r'\b1\.0\b(?!\.\d)') | |
| # Phase 2 regex constants | |
| EVENT_FESTIVAL_RE = re.compile( | |
| r'\b(festival|anniversary\s+event|community\s+event|' | |
| r'in-game\s+event|roadmap|preview)\b', | |
| re.IGNORECASE | |
| ) | |
| UPDATE_OR_PATCH_RE = re.compile(r'\b(update|patch)\b', re.IGNORECASE) | |
| NAMED_VERSION_RE = re.compile(r'\bV\d+\b') # case-sensitive: uppercase V only | |
| UPDATE_WORD_RE = re.compile(r'\bupdate\b', re.IGNORECASE) | |
| PATCH_WORD_RE = re.compile(r'\bpatch\b', re.IGNORECASE) | |
| MAINT_LANGUAGE_RE = re.compile( | |
| r'\b(fix(?:es|ed)?|bug\s*fix|improv(?:es?|ed|ements?)|stability|performance|tweak)\b', | |
| re.IGNORECASE | |
| ) | |
| _NEWS_MAX_PAGES = 5 # Max pages in incremental mode (5 * 5 = 25 items) | |
class NewsCheckResult(NamedTuple):
    """Outcome of one Steam-news scan for a single app.

    The last two fields form a pagination cursor that callers persist so
    subsequent scans can stop at already-seen news items.
    """

    latest_update_date: datetime | None  # date of most recent update-related item
    is_major: bool  # whether any item qualifies as major
    major_date: datetime | None  # date of most recent major item; None if not major
    newest_seen_gid: str | None = None  # GID of newest news item (for cursor persistence)
    newest_seen_at: datetime | None = None  # timestamp of newest news item
class UpdateDetectionService:
    """Detects game updates via Steam News API."""

    def __init__(self, client: httpx.AsyncClient | None = None) -> None:
        """Store an optional injected HTTP client.

        When *client* is None, a client is created lazily on first use and
        this service owns (and must close) it; an injected client is the
        caller's responsibility.
        """
        self._owns_client = client is None
        self._client = client
| async def _get_client(self) -> httpx.AsyncClient: | |
| if self._client is None: | |
| self._client = httpx.AsyncClient(timeout=15.0) | |
| return self._client | |
| async def close(self) -> None: | |
| if self._owns_client and self._client is not None: | |
| await self._client.aclose() | |
| self._client = None | |
| def _is_update_related(item: dict) -> bool: | |
| """Return True if news item is update-related. | |
| Conditions (any one is sufficient): | |
| A: 'patchnotes' in tags | |
| B: feedlabel == 'Product Update' | |
| C: title matches release-style phrases | |
| D: title matches large content update phrases | |
| E: title has a version number AND an action word | |
| """ | |
| tags = item.get("tags") | |
| if isinstance(tags, list): | |
| is_patch = "patchnotes" in tags | |
| else: | |
| is_patch = "patchnotes" in (tags or "") | |
| feedlabel = item.get("feedlabel") or "" | |
| if is_patch or feedlabel == "Product Update": | |
| return True | |
| # Conditions C/D/E: title-based signals — restricted to developer feed only. | |
| # Third-party news sites (GamingOnLinux etc.) can write about updates using | |
| # the same language, so we only trust these signals from the developer's own feed. | |
| if item.get("feedname") != "steam_community_announcements": | |
| return False | |
| title = item.get("title", "") | |
| if RELEASE_PHRASE_RE.search(title): | |
| return True | |
| if CONTENT_UPDATE_RE.search(title): | |
| return True | |
| if VERSION_RE.search(title) and ACTION_WORD_RE.search(title): | |
| return True | |
| # F: named version (V70) + "update" in title (developer feed only) | |
| if NAMED_VERSION_RE.search(title) and UPDATE_WORD_RE.search(title): | |
| return True | |
| return False | |
| def _is_major_update(item: dict) -> bool: | |
| """Return True if the news item represents a major update. | |
| Negative signals (blockers) are checked first: | |
| - hotfix keyword → not major | |
| - experimental branch / public test branch → not major | |
| Positive signals (any one is sufficient): | |
| - version number in title (VERSION_RE) | |
| - release language (MAJOR_RELEASE_RE) | |
| - standalone '1.0' (ONE_ZERO_RE) | |
| - large content phrases (MAJOR_CONTENT_RE) | |
| """ | |
| title = item.get("title", "") | |
| if HOTFIX_RE.search(title): | |
| return False | |
| if BRANCH_RE.search(title): | |
| return False | |
| if EVENT_FESTIVAL_RE.search(title) and not UPDATE_OR_PATCH_RE.search(title): | |
| return False | |
| if PATCH_WORD_RE.search(title) and MAINT_LANGUAGE_RE.search(title): | |
| return False | |
| if VERSION_RE.search(title): | |
| return True | |
| if MAJOR_RELEASE_RE.search(title): | |
| return True | |
| if ONE_ZERO_RE.search(title): | |
| return True | |
| if MAJOR_CONTENT_RE.search(title): | |
| return True | |
| if NAMED_VERSION_RE.search(title) and UPDATE_WORD_RE.search(title): | |
| return True | |
| return False | |
| def _collect_update_candidates( | |
| news_items: list[dict], | |
| ) -> tuple[datetime | None, datetime | None]: | |
| """Scan all items, return (latest_update_date, major_date). | |
| latest_update_date: max date of all update-related items (or None) | |
| major_date: max date of major items (or None if no major found) | |
| """ | |
| latest_update_ts: int | None = None | |
| major_ts: int | None = None | |
| for item in news_items: | |
| if not UpdateDetectionService._is_update_related(item): | |
| continue | |
| ts = item.get("date") or 0 | |
| if not ts: | |
| continue | |
| if latest_update_ts is None or ts > latest_update_ts: | |
| latest_update_ts = ts | |
| if UpdateDetectionService._is_major_update(item): | |
| if major_ts is None or ts > major_ts: | |
| major_ts = ts | |
| latest_update_date = ( | |
| datetime.fromtimestamp(latest_update_ts, tz=timezone.utc) | |
| if latest_update_ts is not None | |
| else None | |
| ) | |
| major_date = ( | |
| datetime.fromtimestamp(major_ts, tz=timezone.utc) | |
| if major_ts is not None | |
| else None | |
| ) | |
| return latest_update_date, major_date | |
| async def _fetch_news_page( | |
| client: httpx.AsyncClient, | |
| app_id: str, | |
| count: int, | |
| enddate: int | None = None, | |
| ) -> list[dict]: | |
| """Fetch a single page of news items from Steam API. | |
| Returns [] on HTTP error or request failure. | |
| """ | |
| params: dict[str, Any] = { | |
| "appid": app_id, | |
| "count": count, | |
| "maxlength": 0, | |
| } | |
| if enddate is not None: | |
| params["enddate"] = enddate | |
| try: | |
| resp = await client.get(STEAM_NEWS_API_URL, params=params) | |
| if resp.status_code != 200: | |
| return [] | |
| data = resp.json() | |
| return data.get("appnews", {}).get("newsitems", []) | |
| except (httpx.RequestError, ValueError, KeyError) as e: | |
| logger.debug(f"News page fetch failed for {app_id}: {e}") | |
| return [] | |
| def _scan_batch_with_stopping( | |
| items: list[dict], | |
| last_seen_gid: str | None, | |
| last_seen_at_ts: int | None, | |
| refresh_cutoff_ts: int | None, | |
| ) -> tuple[list[dict], bool]: | |
| """Scan items (newest→oldest), collecting until a stop condition is met. | |
| Stop conditions (item is NOT included): | |
| - gid matches last_seen_gid | |
| - item date <= last_seen_at_ts | |
| - item date < refresh_cutoff_ts | |
| Returns (accepted_items, hit_stop). | |
| """ | |
| accepted: list[dict] = [] | |
| for item in items: | |
| gid = str(item.get("gid", "")) | |
| ts = item.get("date") or 0 | |
| if last_seen_gid and gid and gid == last_seen_gid: | |
| return accepted, True | |
| if last_seen_at_ts is not None and ts and ts <= last_seen_at_ts: | |
| return accepted, True | |
| if refresh_cutoff_ts is not None and ts and ts < refresh_cutoff_ts: | |
| return accepted, True | |
| accepted.append(item) | |
| return accepted, False | |
    async def _get_latest_news_date(
        self,
        app_id: str,
        last_seen_gid: str | None = None,
        last_seen_at: datetime | None = None,
    ) -> NewsCheckResult:
        """Fetch and scan Steam news for update candidates.

        In initial mode (no cursor): fetches count=20, single page.
        In incremental mode (cursor present): fetches count=5 with pagination,
        stopping at the known cursor or the refresh window boundary.

        The returned cursor fields (newest_seen_gid/newest_seen_at) are only
        populated when the scan completed cleanly, so a failed/partial scan
        never advances the caller's persisted cursor.
        """
        client = await self._get_client()
        is_incremental = last_seen_gid is not None or last_seen_at is not None
        count = settings.news_incremental_count if is_incremental else settings.news_initial_count
        # Compute stop thresholds for incremental mode
        last_seen_at_ts: int | None = None
        refresh_cutoff_ts: int | None = None
        if is_incremental:
            last_seen_at_ts = int(last_seen_at.timestamp()) if last_seen_at else None
            now_ts = int(datetime.now(timezone.utc).timestamp())
            cutoff_ts = now_ts - (settings.news_refresh_window_hours * 3600)
            # If cursor is older than the refresh window (worker was down),
            # disable the time cutoff and scan to the cursor instead.
            # _NEWS_MAX_PAGES protects against unbounded pagination.
            if last_seen_at_ts is not None and last_seen_at_ts < cutoff_ts:
                refresh_cutoff_ts = None
            else:
                refresh_cutoff_ts = cutoff_ts
        all_accepted: list[dict] = []
        newest_gid: str | None = None
        newest_ts: int = 0
        scan_complete = False  # only a clean scan may publish a new cursor
        pages_fetched = 0
        enddate: int | None = None
        while True:
            items = await self._fetch_news_page(client, app_id, count, enddate)
            if not items:
                if pages_fetched == 0:
                    # First page empty (no news or HTTP error) — newest_gid stays None
                    pass
                # Pagination page empty → incomplete scan → don't update cursor
                break
            pages_fetched += 1
            # Track newest item (from first page only)
            if newest_gid is None:
                for item in items:
                    gid = str(item.get("gid", ""))
                    ts = item.get("date") or 0
                    if gid and ts:
                        newest_gid = gid
                        newest_ts = ts
                        break
            if is_incremental:
                accepted, hit_stop = self._scan_batch_with_stopping(
                    items, last_seen_gid, last_seen_at_ts, refresh_cutoff_ts
                )
                all_accepted.extend(accepted)
                if hit_stop:
                    scan_complete = True
                    break
                if len(items) < count:
                    scan_complete = True  # API has no more items
                    break
                if pages_fetched >= _NEWS_MAX_PAGES:
                    scan_complete = True  # page limit reached
                    break
                oldest_ts = items[-1].get("date") or 0
                if not oldest_ts:
                    break  # can't paginate → incomplete scan
                # Step one second past the oldest item so the next page
                # doesn't repeat it (assumes enddate is inclusive — TODO
                # confirm against Steam ISteamNews API docs).
                enddate = oldest_ts - 1
            else:
                # Initial mode: single fetch, always clean
                all_accepted.extend(items)
                scan_complete = True
                break
        latest_update_date, major_date = self._collect_update_candidates(all_accepted)
        cursor_gid: str | None = None
        cursor_at: datetime | None = None
        if scan_complete and newest_gid:
            cursor_gid = newest_gid
            cursor_at = datetime.fromtimestamp(newest_ts, tz=timezone.utc)
        if latest_update_date is None:
            # No update-related items found — still return the cursor so the
            # caller can skip these items next time.
            return NewsCheckResult(
                None, False, None,
                newest_seen_gid=cursor_gid,
                newest_seen_at=cursor_at,
            )
        return NewsCheckResult(
            latest_update_date=latest_update_date,
            is_major=major_date is not None,
            major_date=major_date,
            newest_seen_gid=cursor_gid,
            newest_seen_at=cursor_at,
        )
| async def check_for_updates( | |
| self, games: list[dict[str, Any]] | |
| ) -> list[dict[str, Any]]: | |
| """ | |
| Check Steam News API for each game. Return games with confirmed major updates. | |
| Non-major patchnotes update last_game_update_at but do not trigger a schedule. | |
| """ | |
| updated_games: list[dict[str, Any]] = [] | |
| dlcs_by_parent: dict[str, list[dict[str, Any]]] = {} | |
| for game in games: | |
| if game.get("app_type") == "dlc" and game.get("parent_appid"): | |
| dlcs_by_parent.setdefault(str(game["parent_appid"]), []).append(game) | |
| for game in games: | |
| app_id = str(game.get("appid", "")) | |
| if not app_id: | |
| continue | |
| if game.get("app_type") == "dlc": | |
| continue | |
| last_known = game.get("last_game_update_at") | |
| # Normalize last_known to datetime if it's a timestamp | |
| if last_known is not None and not isinstance(last_known, datetime): | |
| try: | |
| last_known = datetime.fromtimestamp(float(last_known), tz=timezone.utc) | |
| except (ValueError, TypeError): | |
| last_known = None | |
| result = await self._get_latest_news_date( | |
| app_id, | |
| last_seen_gid=game.get("last_seen_news_gid"), | |
| last_seen_at=game.get("last_seen_news_at"), | |
| ) | |
| # Persist cursor before any early-continue — even if no updates found | |
| if result.newest_seen_gid: | |
| await mongodb.update_news_cursor( | |
| app_id, result.newest_seen_gid, cast(datetime, result.newest_seen_at) | |
| ) | |
| if result.latest_update_date is None: | |
| continue | |
| if last_known is None or result.latest_update_date > last_known: | |
| await mongodb.update_game_update_date(app_id, result.latest_update_date) | |
| if result.is_major: | |
| current_patch_at = game.get("current_patch_at") | |
| patch_date = cast(datetime, result.major_date) # always not None when is_major=True | |
| if current_patch_at is None or patch_date > current_patch_at: | |
| await mongodb.update_game_patch_date(app_id, patch_date) | |
| updated_games.append({**game, "update_at": patch_date}) | |
| for dlc in dlcs_by_parent.get(app_id, []): | |
| dlc_appid = str(dlc.get("appid", "")) | |
| if not dlc_appid: | |
| continue | |
| await mongodb.update_game_patch_date(dlc_appid, patch_date) | |
| updated_games.append({**dlc, "update_at": patch_date}) | |
| logger.info( | |
| f"Update detection: {len(updated_games)}/{len(games)} games have new updates" | |
| ) | |
| return updated_games | |