# sentimentstream-worker / backend/app/services/update_detection_service.py
# Deployed via GitHub Action "deploy: worker release from GitHub" (commit 8ff1b66).
"""
Update Detection Service — checks Steam News API for game updates.
Compares the latest news/patch date with the stored `last_game_update_at`
to detect games that have been recently updated.
"""
import logging
import re
from datetime import datetime, timezone
from typing import Any, NamedTuple, cast
import httpx
from app.core.config import settings
from app.db.mongodb import mongodb
logger = logging.getLogger(__name__)
STEAM_NEWS_API_URL = "https://api.steampowered.com/ISteamNews/GetNewsForApp/v2/"

# Matches two-segment versions: 1.2, v2.0, 0.6, 123.4
# Excludes three-segment (0.6.1) via negative lookahead, 4-digit years via \d{1,3},
# and sub-segments of longer versions (e.g. "6.1" within "0.6.1") via lookbehind.
VERSION_RE = re.compile(r'(?<!\d\.)\bv?\d{1,3}\.\d+\b(?!\.\d)')

# Phase 1 regex constants
# Release-style phrases: "out now", "full release", "leaving early access", ...
RELEASE_PHRASE_RE = re.compile(
    r'\b(out now|is out|is live|now live|now available|full release|'
    r'leaving early access|out of early access)\b',
    re.IGNORECASE
)
# Large content-update phrases: "major update", "content update", ...
CONTENT_UPDATE_RE = re.compile(
    r'\b(major update|content update|big update|biggest update)\b',
    re.IGNORECASE
)
# Generic action words; only meaningful when paired with VERSION_RE in a title.
ACTION_WORD_RE = re.compile(
    r'\b(update|patch|release|available|launch|live|out)\b',
    re.IGNORECASE
)
HOTFIX_RE = re.compile(r'\b(hotfix|hot.?fix)\b', re.IGNORECASE)
# Experimental / public-test branch announcements — not stable releases.
BRANCH_RE = re.compile(
    r'\b(experimental branch|experimental.{0,10}patch|experimental.{0,10}build|'
    r'public.?test|pts build|beta branch|'
    r'on experimental|for experimental)\b',
    re.IGNORECASE
)
# Major-update detection uses the exact same phrase sets as the Phase 1
# constants above. Alias instead of duplicating the pattern text (previously
# these were copy-pasted literals) so the two names cannot drift out of sync.
MAJOR_RELEASE_RE = RELEASE_PHRASE_RE
MAJOR_CONTENT_RE = CONTENT_UPDATE_RE
# Standalone "1.0" (but not "1.0.x") — full-release marker.
ONE_ZERO_RE = re.compile(r'\b1\.0\b(?!\.\d)')

# Phase 2 regex constants
# Event/announcement titles that look like updates but aren't patches.
EVENT_FESTIVAL_RE = re.compile(
    r'\b(festival|anniversary\s+event|community\s+event|'
    r'in-game\s+event|roadmap|preview)\b',
    re.IGNORECASE
)
UPDATE_OR_PATCH_RE = re.compile(r'\b(update|patch)\b', re.IGNORECASE)
NAMED_VERSION_RE = re.compile(r'\bV\d+\b')  # case-sensitive: uppercase V only
UPDATE_WORD_RE = re.compile(r'\bupdate\b', re.IGNORECASE)
PATCH_WORD_RE = re.compile(r'\bpatch\b', re.IGNORECASE)
# Maintenance-only language (bug fixes, stability, perf) — demotes a "patch"
# title from "major" in _is_major_update.
MAINT_LANGUAGE_RE = re.compile(
    r'\b(fix(?:es|ed)?|bug\s*fix|improv(?:es?|ed|ements?)|stability|performance|tweak)\b',
    re.IGNORECASE
)

_NEWS_MAX_PAGES = 5  # Max pages in incremental mode (5 * 5 = 25 items)
class NewsCheckResult(NamedTuple):
latest_update_date: datetime | None # date of most recent update-related item
is_major: bool # whether any item qualifies as major
major_date: datetime | None # date of most recent major item; None if not major
newest_seen_gid: str | None = None # GID of newest news item (for cursor persistence)
newest_seen_at: datetime | None = None # timestamp of newest news item
class UpdateDetectionService:
    """Detects game updates via Steam News API.

    Fetches news items per game, classifies them as update-related and/or
    "major", persists a news cursor plus update/patch dates via mongodb
    helpers, and returns games whose major update should trigger work.
    """

    def __init__(self, client: httpx.AsyncClient | None = None) -> None:
        # An injected client is reused and never closed by this service;
        # a lazily created one (_get_client) is owned and closed in close().
        self._client = client
        self._owns_client = client is None

    async def _get_client(self) -> httpx.AsyncClient:
        # Lazily create the HTTP client on first use.
        if self._client is None:
            self._client = httpx.AsyncClient(timeout=15.0)
        return self._client

    async def close(self) -> None:
        # Close only a client this service created itself.
        if self._owns_client and self._client is not None:
            await self._client.aclose()
            self._client = None

    @staticmethod
    def _is_update_related(item: dict) -> bool:
        """Return True if news item is update-related.

        Conditions (any one is sufficient):
          A: 'patchnotes' in tags
          B: feedlabel == 'Product Update'
          C: title matches release-style phrases
          D: title matches large content update phrases
          E: title has a version number AND an action word
          F: title has a named version (e.g. "V70") AND the word "update"

        Conditions C–F are trusted only for the developer's own feed
        (feedname == 'steam_community_announcements').
        """
        tags = item.get("tags")
        if isinstance(tags, list):
            is_patch = "patchnotes" in tags
        else:
            # Fallback: tags may arrive as a string (or None) — substring check.
            is_patch = "patchnotes" in (tags or "")
        feedlabel = item.get("feedlabel") or ""
        if is_patch or feedlabel == "Product Update":
            return True
        # Conditions C/D/E: title-based signals — restricted to developer feed only.
        # Third-party news sites (GamingOnLinux etc.) can write about updates using
        # the same language, so we only trust these signals from the developer's own feed.
        if item.get("feedname") != "steam_community_announcements":
            return False
        title = item.get("title", "")
        if RELEASE_PHRASE_RE.search(title):
            return True
        if CONTENT_UPDATE_RE.search(title):
            return True
        if VERSION_RE.search(title) and ACTION_WORD_RE.search(title):
            return True
        # F: named version (V70) + "update" in title (developer feed only)
        if NAMED_VERSION_RE.search(title) and UPDATE_WORD_RE.search(title):
            return True
        return False

    @staticmethod
    def _is_major_update(item: dict) -> bool:
        """Return True if the news item represents a major update.

        Negative signals (blockers) are checked first:
          - hotfix keyword → not major
          - experimental branch / public test branch → not major
          - event/festival/roadmap title without "update"/"patch" → not major
          - "patch" combined with maintenance-only language
            (fixes/stability/performance/tweak) → not major

        Positive signals (any one is sufficient):
          - version number in title (VERSION_RE)
          - release language (MAJOR_RELEASE_RE)
          - standalone '1.0' (ONE_ZERO_RE)
          - large content phrases (MAJOR_CONTENT_RE)
          - named version (e.g. "V70") plus the word "update"
        """
        title = item.get("title", "")
        if HOTFIX_RE.search(title):
            return False
        if BRANCH_RE.search(title):
            return False
        if EVENT_FESTIVAL_RE.search(title) and not UPDATE_OR_PATCH_RE.search(title):
            return False
        if PATCH_WORD_RE.search(title) and MAINT_LANGUAGE_RE.search(title):
            return False
        if VERSION_RE.search(title):
            return True
        if MAJOR_RELEASE_RE.search(title):
            return True
        if ONE_ZERO_RE.search(title):
            return True
        if MAJOR_CONTENT_RE.search(title):
            return True
        if NAMED_VERSION_RE.search(title) and UPDATE_WORD_RE.search(title):
            return True
        return False

    @staticmethod
    def _collect_update_candidates(
        news_items: list[dict],
    ) -> tuple[datetime | None, datetime | None]:
        """Scan all items, return (latest_update_date, major_date).

        latest_update_date: max date of all update-related items (or None)
        major_date: max date of major items (or None if no major found)

        Items without a usable 'date' field are skipped entirely.
        """
        latest_update_ts: int | None = None
        major_ts: int | None = None
        for item in news_items:
            if not UpdateDetectionService._is_update_related(item):
                continue
            ts = item.get("date") or 0
            if not ts:
                continue
            if latest_update_ts is None or ts > latest_update_ts:
                latest_update_ts = ts
            # Majority is tracked separately — a major item may be older
            # than the newest update-related item.
            if UpdateDetectionService._is_major_update(item):
                if major_ts is None or ts > major_ts:
                    major_ts = ts
        latest_update_date = (
            datetime.fromtimestamp(latest_update_ts, tz=timezone.utc)
            if latest_update_ts is not None
            else None
        )
        major_date = (
            datetime.fromtimestamp(major_ts, tz=timezone.utc)
            if major_ts is not None
            else None
        )
        return latest_update_date, major_date

    @staticmethod
    async def _fetch_news_page(
        client: httpx.AsyncClient,
        app_id: str,
        count: int,
        enddate: int | None = None,
    ) -> list[dict]:
        """Fetch a single page of news items from Steam API.

        `enddate` (unix seconds), when given, asks the API for items at or
        before that time — used for backwards pagination.
        Returns [] on HTTP error or request failure.
        """
        params: dict[str, Any] = {
            "appid": app_id,
            "count": count,
            # NOTE(review): maxlength=0 presumably means "untruncated body"
            # — confirm against the Steam Web API docs.
            "maxlength": 0,
        }
        if enddate is not None:
            params["enddate"] = enddate
        try:
            resp = await client.get(STEAM_NEWS_API_URL, params=params)
            if resp.status_code != 200:
                return []
            data = resp.json()
            return data.get("appnews", {}).get("newsitems", [])
        except (httpx.RequestError, ValueError, KeyError) as e:
            logger.debug(f"News page fetch failed for {app_id}: {e}")
            return []

    @staticmethod
    def _scan_batch_with_stopping(
        items: list[dict],
        last_seen_gid: str | None,
        last_seen_at_ts: int | None,
        refresh_cutoff_ts: int | None,
    ) -> tuple[list[dict], bool]:
        """Scan items (newest→oldest), collecting until a stop condition is met.

        Stop conditions (item is NOT included):
          - gid matches last_seen_gid
          - item date <= last_seen_at_ts
          - item date < refresh_cutoff_ts

        Returns (accepted_items, hit_stop). hit_stop=False means the whole
        batch was consumed without reaching a known boundary.
        """
        accepted: list[dict] = []
        for item in items:
            gid = str(item.get("gid", ""))
            ts = item.get("date") or 0
            if last_seen_gid and gid and gid == last_seen_gid:
                return accepted, True
            if last_seen_at_ts is not None and ts and ts <= last_seen_at_ts:
                return accepted, True
            if refresh_cutoff_ts is not None and ts and ts < refresh_cutoff_ts:
                return accepted, True
            accepted.append(item)
        return accepted, False

    async def _get_latest_news_date(
        self,
        app_id: str,
        last_seen_gid: str | None = None,
        last_seen_at: datetime | None = None,
    ) -> NewsCheckResult:
        """Fetch and scan Steam news for update candidates.

        In initial mode (no cursor): single page of
        `settings.news_initial_count` items.
        In incremental mode (cursor present): pages of
        `settings.news_incremental_count` items, paginating backwards and
        stopping at the known cursor or the refresh window boundary.

        The returned cursor fields are only populated when the scan
        completed cleanly (so an aborted scan never advances the cursor).
        """
        client = await self._get_client()
        is_incremental = last_seen_gid is not None or last_seen_at is not None
        count = settings.news_incremental_count if is_incremental else settings.news_initial_count
        # Compute stop thresholds for incremental mode
        last_seen_at_ts: int | None = None
        refresh_cutoff_ts: int | None = None
        if is_incremental:
            last_seen_at_ts = int(last_seen_at.timestamp()) if last_seen_at else None
            now_ts = int(datetime.now(timezone.utc).timestamp())
            cutoff_ts = now_ts - (settings.news_refresh_window_hours * 3600)
            # If cursor is older than the refresh window (worker was down),
            # disable the time cutoff and scan to the cursor instead.
            # _NEWS_MAX_PAGES protects against unbounded pagination.
            if last_seen_at_ts is not None and last_seen_at_ts < cutoff_ts:
                refresh_cutoff_ts = None
            else:
                refresh_cutoff_ts = cutoff_ts
        all_accepted: list[dict] = []
        newest_gid: str | None = None
        newest_ts: int = 0
        scan_complete = False
        pages_fetched = 0
        enddate: int | None = None
        while True:
            items = await self._fetch_news_page(client, app_id, count, enddate)
            if not items:
                if pages_fetched == 0:
                    # First page empty (no news or HTTP error) — newest_gid stays None
                    pass
                # Pagination page empty → incomplete scan → don't update cursor
                break
            pages_fetched += 1
            # Track newest item (from first page only)
            if newest_gid is None:
                # Take the first item that has both gid and date; items are
                # assumed newest-first (Steam News API ordering).
                for item in items:
                    gid = str(item.get("gid", ""))
                    ts = item.get("date") or 0
                    if gid and ts:
                        newest_gid = gid
                        newest_ts = ts
                        break
            if is_incremental:
                accepted, hit_stop = self._scan_batch_with_stopping(
                    items, last_seen_gid, last_seen_at_ts, refresh_cutoff_ts
                )
                all_accepted.extend(accepted)
                if hit_stop:
                    scan_complete = True
                    break
                if len(items) < count:
                    scan_complete = True  # API has no more items
                    break
                if pages_fetched >= _NEWS_MAX_PAGES:
                    scan_complete = True  # page limit reached
                    break
                # Paginate backwards: next page ends just before the oldest
                # item we have seen so far.
                oldest_ts = items[-1].get("date") or 0
                if not oldest_ts:
                    break  # can't paginate → incomplete scan
                enddate = oldest_ts - 1
            else:
                # Initial mode: single fetch, always clean
                all_accepted.extend(items)
                scan_complete = True
                break
        latest_update_date, major_date = self._collect_update_candidates(all_accepted)
        cursor_gid: str | None = None
        cursor_at: datetime | None = None
        if scan_complete and newest_gid:
            cursor_gid = newest_gid
            cursor_at = datetime.fromtimestamp(newest_ts, tz=timezone.utc)
        if latest_update_date is None:
            return NewsCheckResult(
                None, False, None,
                newest_seen_gid=cursor_gid,
                newest_seen_at=cursor_at,
            )
        return NewsCheckResult(
            latest_update_date=latest_update_date,
            is_major=major_date is not None,
            major_date=major_date,
            newest_seen_gid=cursor_gid,
            newest_seen_at=cursor_at,
        )

    async def check_for_updates(
        self, games: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """
        Check Steam News API for each game. Return games with confirmed major updates.
        Non-major patchnotes update last_game_update_at but do not trigger a schedule.

        Side effects per game (via mongodb helpers): news cursor is persisted
        whenever a scan completes; last_game_update_at advances on any newer
        update; current_patch_at advances on a newer major update, and that
        patch date is propagated to the game's DLC rows.
        """
        updated_games: list[dict[str, Any]] = []
        # Group DLC rows under their parent appid so a parent's major update
        # can be fanned out to them below.
        dlcs_by_parent: dict[str, list[dict[str, Any]]] = {}
        for game in games:
            if game.get("app_type") == "dlc" and game.get("parent_appid"):
                dlcs_by_parent.setdefault(str(game["parent_appid"]), []).append(game)
        for game in games:
            app_id = str(game.get("appid", ""))
            if not app_id:
                continue
            # DLC rows are not checked directly; they inherit the parent's
            # patch date in the propagation step below.
            if game.get("app_type") == "dlc":
                continue
            last_known = game.get("last_game_update_at")
            # Normalize last_known to datetime if it's a timestamp
            if last_known is not None and not isinstance(last_known, datetime):
                try:
                    last_known = datetime.fromtimestamp(float(last_known), tz=timezone.utc)
                except (ValueError, TypeError):
                    last_known = None
            result = await self._get_latest_news_date(
                app_id,
                last_seen_gid=game.get("last_seen_news_gid"),
                last_seen_at=game.get("last_seen_news_at"),
            )
            # Persist cursor before any early-continue — even if no updates found
            if result.newest_seen_gid:
                # newest_seen_at is always set alongside newest_seen_gid,
                # so the cast is safe.
                await mongodb.update_news_cursor(
                    app_id, result.newest_seen_gid, cast(datetime, result.newest_seen_at)
                )
            if result.latest_update_date is None:
                continue
            if last_known is None or result.latest_update_date > last_known:
                await mongodb.update_game_update_date(app_id, result.latest_update_date)
            if result.is_major:
                current_patch_at = game.get("current_patch_at")
                patch_date = cast(datetime, result.major_date)  # always not None when is_major=True
                if current_patch_at is None or patch_date > current_patch_at:
                    await mongodb.update_game_patch_date(app_id, patch_date)
                    updated_games.append({**game, "update_at": patch_date})
                    # Fan the new patch date out to this game's DLCs.
                    for dlc in dlcs_by_parent.get(app_id, []):
                        dlc_appid = str(dlc.get("appid", ""))
                        if not dlc_appid:
                            continue
                        await mongodb.update_game_patch_date(dlc_appid, patch_date)
                        updated_games.append({**dlc, "update_at": patch_date})
        logger.info(
            f"Update detection: {len(updated_games)}/{len(games)} games have new updates"
        )
        return updated_games