# GitHub Action
# deploy: worker release from GitHub
# 065cebf
"""
Serwis do komunikacji ze Steam API.
Odpowiada za pobieranie informacji o grach oraz recenzji.
Wykorzystuje publiczne API Steam (nie wymaga klucza API).
Implementuje statystyczne próbkowanie recenzji (stratified sampling).
Retry z exponential backoff dla 429/5xx/timeout.
"""
import asyncio
import logging
from dataclasses import dataclass
from typing import Any, AsyncGenerator
import httpx
from app.core.config import settings
from app.core.sampling import SamplePlan, create_sample_plan
from app.db.mongodb import mongodb
from app.models.schemas import GameInfo, ReviewBatch, ReviewItem
from app.services.steam_errors import SteamAPIError, SteamRateLimitError
logger = logging.getLogger(__name__)
# HTTP status codes worth retrying: rate limiting (429) and transient 5xx.
_RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}
@dataclass
class ReviewStats:
    """Aggregate review statistics for a game, as reported by Steam's query_summary."""
    total: int  # total number of reviews
    positive: int  # number of positive reviews
    negative: int  # number of negative reviews
class SteamService:
    """
    Service for fetching data from the Steam API.

    Uses only public store endpoints (search, appdetails, appreviews);
    no API key is required.
    """
    # Base URLs for the public Steam store endpoints.
    STORE_API_URL = "https://store.steampowered.com/api"
    REVIEW_API_URL = "https://store.steampowered.com/appreviews"
    SEARCH_API_URL = "https://store.steampowered.com/api/storesearch"
def __init__(self, timeout: float = 30.0) -> None:
self.timeout = timeout
self.client = httpx.AsyncClient(timeout=self.timeout)
    async def close(self) -> None:
        """Close the shared HTTP client."""
        await self.client.aclose()
@staticmethod
def _review_dedup_key(review_item: ReviewItem) -> str:
"""Prefer Steam's stable review id; fall back to timestamp+text for malformed rows."""
if review_item.recommendation_id:
return f"id:{review_item.recommendation_id}"
return f"fallback:{review_item.timestamp_created}:{review_item.text}"
def _dedupe_review_batch(
self,
review_items: list[ReviewItem],
seen_review_keys: set[str],
) -> tuple[list[str], list[ReviewItem]]:
deduped_items: list[ReviewItem] = []
deduped_reviews: list[str] = []
for review_item in review_items:
dedup_key = self._review_dedup_key(review_item)
if dedup_key in seen_review_keys:
continue
seen_review_keys.add(dedup_key)
deduped_items.append(review_item)
deduped_reviews.append(review_item.text)
return deduped_reviews, deduped_items
    async def _request_with_retry(
        self,
        client: httpx.AsyncClient,
        url: str,
        params: dict[str, Any],
        context: str = "",
    ) -> httpx.Response:
        """
        Perform a GET request with retries and exponential backoff.

        Retries 429/5xx responses and timeouts/connection errors up to
        ``settings.steam_retry_max_attempts`` attempts, honoring the
        Retry-After header on 429. Raises SteamAPIError for non-retryable
        statuses (or once retries are exhausted) and SteamRateLimitError
        for a persistent 429. ``context`` labels log/error messages.
        """
        max_attempts = settings.steam_retry_max_attempts
        base_delay = settings.steam_retry_base_delay
        max_delay = settings.steam_retry_max_delay
        last_exception: Exception | None = None
        for attempt in range(max_attempts):
            try:
                response = await client.get(url, params=params)
                status = response.status_code
                if status == 200:
                    return response
                # Non-retryable client errors
                if status == 404:
                    raise SteamAPIError(404, context, f"Not found: {url}")
                if status == 403:
                    raise SteamAPIError(403, context, f"Forbidden: {url}")
                if 400 <= status < 500 and status not in _RETRYABLE_STATUS_CODES:
                    raise SteamAPIError(status, context, f"Client error {status}: {url}")
                # Retryable errors (429, 5xx)
                if attempt < max_attempts - 1:
                    # Exponential backoff capped at max_delay.
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    # Respect Retry-After header for 429
                    if status == 429:
                        retry_after = response.headers.get("Retry-After")
                        if retry_after:
                            try:
                                delay = min(float(retry_after), max_delay)
                            except ValueError:
                                pass  # malformed header: keep the computed backoff
                    logger.warning(
                        f"Steam API {status} for {context}, "
                        f"retry {attempt + 1}/{max_attempts - 1} after {delay:.1f}s"
                    )
                    await asyncio.sleep(delay)
                else:
                    # Exhausted retries
                    if status == 429:
                        raise SteamRateLimitError(context)
                    raise SteamAPIError(status, context, f"Server error {status} after {max_attempts} attempts: {url}")
            except (httpx.TimeoutException, httpx.ConnectError) as e:
                # Network-level failures use the same backoff schedule.
                last_exception = e
                if attempt < max_attempts - 1:
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    logger.warning(
                        f"Steam API {type(e).__name__} for {context}, "
                        f"retry {attempt + 1}/{max_attempts - 1} after {delay:.1f}s"
                    )
                    await asyncio.sleep(delay)
                else:
                    raise SteamAPIError(
                        0, context,
                        f"Connection failed after {max_attempts} attempts: {e}"
                    ) from e
        # Should not reach here, but just in case
        raise SteamAPIError(0, context, "Unexpected retry exhaustion") from last_exception
async def search_game(self, query: str) -> GameInfo | None:
"""Wyszukuje grę po nazwie używając publicznego API wyszukiwarki Steam."""
client = self.client
search_locales = [
(settings.steam_region, settings.steam_review_language),
("US", "english"),
]
items: list[dict[str, Any]] = []
seen_locales: set[tuple[str, str]] = set()
for region, language in search_locales:
locale = (region, language)
if locale in seen_locales:
continue
seen_locales.add(locale)
params = {
"term": query,
"l": language,
"cc": region,
}
try:
response = await self._request_with_retry(
client, self.SEARCH_API_URL, params, context=f"search:{query}"
)
data = response.json()
except (SteamAPIError, SteamRateLimitError) as e:
logger.error(f"Błąd wyszukiwania gry '{query}': {e}")
return None
items = data.get("items", [])
if items:
break
if not items:
logger.warning(f"Nie znaleziono gry: {query}")
return None
first_result = items[0]
app_id = str(first_result.get("id"))
game_info = await self.get_game_info(app_id)
if game_info:
await mongodb.upsert_game({
"appid": game_info.app_id,
"name": game_info.name,
"name_cn": game_info.name_cn,
"cn_name_checked": True,
"header_image": game_info.header_image,
"total_reviews": game_info.total_reviews
})
return game_info
async def get_game_info(self, app_id: str) -> GameInfo | None:
"""Pobiera szczegółowe metadane gry (obrazek, nazwę) z appdetails."""
cached_error = await mongodb.get_steam_error(app_id)
if cached_error:
logger.info(
f"Skipping Steam API for app {app_id} — "
f"cached error {cached_error.get('status_code')}"
)
return None
client = self.client
details_url = f"{self.STORE_API_URL}/appdetails"
async def fetch_localized(lang: str):
try:
params = {"appids": app_id, "l": lang, "cc": settings.steam_region}
resp = await self._request_with_retry(
client, details_url, params, context=app_id
)
return resp.json().get(app_id, {})
except SteamAPIError as e:
if e.status_code == 404:
await mongodb.cache_steam_error(
app_id, 404, settings.steam_error_cache_ttl_404
)
return {}
data_zh, data_en = await asyncio.gather(
fetch_localized("schinese"),
fetch_localized("english")
)
if not data_en.get("success") and not data_zh.get("success"):
logger.warning(f"Nie znaleziono szczegółów gry: {app_id}")
return None
base_data = data_en.get("data") or data_zh.get("data")
name_en = data_en.get("data", {}).get("name") or base_data.get("name")
name_zh = data_zh.get("data", {}).get("name")
stats = await self.get_review_stats(app_id)
return GameInfo(
app_id=app_id,
name=name_en,
name_cn=name_zh if name_zh != name_en else None,
header_image=base_data.get("header_image"),
total_reviews=stats.total,
)
async def get_review_stats(self, app_id: str) -> ReviewStats:
"""Pobiera sumaryczne statystyki recenzji potrzebne do planowania próbki."""
cached_error = await mongodb.get_steam_error(app_id)
if cached_error:
logger.info(
f"Skipping review stats for app {app_id} — "
f"cached error {cached_error.get('status_code')}"
)
return ReviewStats(total=0, positive=0, negative=0)
client = self.client
url = f"{self.REVIEW_API_URL}/{app_id}"
params = {
"json": "1",
"filter": "all",
"num_per_page": "0",
}
try:
response = await self._request_with_retry(
client, url, params, context=app_id
)
data = response.json()
summary = data.get("query_summary", {})
return ReviewStats(
total=summary.get("total_reviews", 0),
positive=summary.get("total_positive", 0),
negative=summary.get("total_negative", 0),
)
except SteamAPIError as e:
if e.status_code in (404, 429):
ttl = (
settings.steam_error_cache_ttl_429
if e.status_code == 429
else settings.steam_error_cache_ttl_404
)
await mongodb.cache_steam_error(app_id, e.status_code, ttl)
logger.error(f"Błąd pobierania statystyk recenzji: {e}")
return ReviewStats(total=0, positive=0, negative=0)
    async def _fetch_reviews_batch(
        self,
        client: httpx.AsyncClient,
        app_id: str,
        review_type: str,
        filter_type: str,
        num_per_page: int,
        cursor: str | None,
    ) -> tuple[list[str], list[ReviewItem], str | None]:
        """Fetch a single batch of reviews (up to 100 items).

        Returns ``(review_texts, review_items, next_cursor)``. Any API
        failure is logged and reported as an empty batch with a None
        cursor so callers simply stop paging; a rate limit additionally
        caches the 429 for this app.
        """
        url = f"{self.REVIEW_API_URL}/{app_id}"
        params: dict[str, Any] = {
            "json": "1",
            "filter": filter_type,
            "review_type": review_type,
            "language": settings.steam_review_language,
            "num_per_page": str(num_per_page),
            # "*" is Steam's initial paging cursor.
            "cursor": cursor or "*",
            "purchase_type": "all",
        }
        try:
            response = await self._request_with_retry(
                client, url, params, context=app_id
            )
            data = response.json()
        except SteamRateLimitError:
            # Remember the 429 so subsequent calls skip this app for a while.
            await mongodb.cache_steam_error(
                app_id, 429, settings.steam_error_cache_ttl_429
            )
            logger.error(f"Rate limited fetching reviews for {app_id}")
            return [], [], None
        except SteamAPIError as e:
            logger.error(f"Błąd pobierania recenzji: {e}")
            return [], [], None
        if not data.get("success"):
            return [], [], None
        reviews_data = data.get("reviews", [])
        review_texts: list[str] = []
        review_items: list[ReviewItem] = []
        for review in reviews_data:
            text = review.get("review")
            # Skip reviews without text — there is nothing to analyze.
            if not text:
                continue
            # Playtime fields live on the author sub-object.
            author = review.get("author") or {}
            review_texts.append(text)
            review_items.append(ReviewItem(
                text=text,
                recommendation_id=str(review.get("recommendationid", "")),
                timestamp_created=review.get("timestamp_created", 0),
                voted_up=review.get("voted_up"),
                playtime_at_review_minutes=author.get("playtime_at_review"),
                primarily_steam_deck=review.get("primarily_steam_deck"),
                deck_playtime_at_review_minutes=author.get("deck_playtime_at_review"),
                author_steamid=author.get("steamid"),
                language=review.get("language"),
                steam_purchase=review.get("steam_purchase"),
                received_for_free=review.get("received_for_free"),
                written_during_early_access=review.get("written_during_early_access"),
            ))
        new_cursor = data.get("cursor")
        return review_texts, review_items, new_cursor
async def fetch_reviews_stratified(
self,
app_id: str,
sample_plan: SamplePlan,
) -> AsyncGenerator[ReviewBatch, None]:
"""
Główna logika pobierania danych. Działa w dwóch fazach.
"""
batch_size = settings.review_batch_size
seen_review_keys: set[str] = set()
seen_cursors: set[str] = set()
client = self.client
# --- FAZA 1: TOP HELPFUL ---
cursor: str | None = "*"
fetched = 0
while fetched < sample_plan.top_helpful:
to_fetch = min(batch_size, sample_plan.top_helpful - fetched)
reviews, review_items, cursor = await self._fetch_reviews_batch(
client, app_id, "all", "all", to_fetch, cursor
)
if not reviews:
break
if cursor and cursor in seen_cursors:
logger.warning(f"Repeated cursor {cursor} for {app_id} (top_helpful). Shortfall: {sample_plan.top_helpful - fetched}")
break
if cursor:
seen_cursors.add(cursor)
new_reviews, new_items = self._dedupe_review_batch(review_items, seen_review_keys)
fetched += len(new_reviews)
if new_reviews:
yield ReviewBatch(reviews=new_reviews, review_items=new_items, cursor=cursor)
if not cursor or cursor == "*":
break
# --- FAZA 2a: RECENT POSITIVE ---
positive_target = sample_plan.positive_count
if positive_target > 0:
cursor = "*"
fetched = 0
seen_cursors_pos: set[str] = set()
while fetched < positive_target:
to_fetch = min(batch_size, positive_target - fetched)
# Jeśli mamy dużo duplikatów, prosimy o więcej niż pozostało do targetu (ale max batch_size)
if fetched > 0:
to_fetch = batch_size
reviews, review_items, cursor = await self._fetch_reviews_batch(
client, app_id, "positive", "recent", to_fetch, cursor or "*"
)
if not reviews:
break
if cursor and cursor in seen_cursors_pos:
logger.warning(f"Repeated cursor {cursor} for {app_id} (positive). Shortfall: {positive_target - fetched}")
break
if cursor:
seen_cursors_pos.add(cursor)
new_reviews, new_items = self._dedupe_review_batch(review_items, seen_review_keys)
fetched += len(new_reviews)
if new_reviews:
yield ReviewBatch(reviews=new_reviews, review_items=new_items, cursor=cursor)
if not cursor or cursor == "*":
break
# --- FAZA 2b: RECENT NEGATIVE ---
negative_target = sample_plan.negative_count
if negative_target > 0:
cursor = "*"
fetched = 0
seen_cursors_neg: set[str] = set()
while fetched < negative_target:
to_fetch = min(batch_size, negative_target - fetched)
if fetched > 0:
to_fetch = batch_size
reviews, review_items, cursor = await self._fetch_reviews_batch(
client, app_id, "negative", "recent", to_fetch, cursor or "*"
)
if not reviews:
break
if cursor and cursor in seen_cursors_neg:
logger.warning(f"Repeated cursor {cursor} for {app_id} (negative). Shortfall: {negative_target - fetched}")
break
if cursor:
seen_cursors_neg.add(cursor)
new_reviews, new_items = self._dedupe_review_batch(review_items, seen_review_keys)
fetched += len(new_reviews)
if new_reviews:
yield ReviewBatch(reviews=new_reviews, review_items=new_items, cursor=cursor)
if not cursor or cursor == "*":
break
logger.info(f"Pobrano łącznie {len(seen_review_keys)} unikalnych recenzji")
    async def fetch_recent_reviews(
        self,
        app_id: str,
        exclude_ids: set[str] | None = None,
        *,
        boundary_ts: int | None = None,
        boundary_ids_at_ts: set[str] | None = None,
    ) -> list[ReviewItem]:
        """
        Fetch recent reviews for incremental analysis.

        Two stop strategies:
        - With ``boundary_ts``: collect reviews newer than the boundary and
          stop once an older one is reached; ``boundary_ids_at_ts``
          disambiguates reviews created exactly at the boundary timestamp.
        - Without it: drop reviews whose id is in ``exclude_ids`` and stop
          early once more than 80% of a page is already known.
        """
        exclude_ids = exclude_ids or set()
        boundary_ids_at_ts = boundary_ids_at_ts or set()
        batch_size = settings.review_batch_size
        client = self.client
        cursor: str | None = "*"
        seen_cursors: set[str] = set()
        seen_review_ids: set[str] = set()
        new_items: list[ReviewItem] = []
        while True:
            _, review_items, cursor = await self._fetch_reviews_batch(
                client, app_id, "all", "recent", batch_size, cursor
            )
            if not review_items:
                break
            # Bail out if Steam hands back a cursor we already visited.
            if cursor and cursor in seen_cursors:
                logger.warning(f"Repeated cursor {cursor} for {app_id} (recent incremental fetch)")
                break
            if cursor:
                seen_cursors.add(cursor)
            if boundary_ts is not None and boundary_ts > 0:
                crossed_boundary = False
                batch_new: list[ReviewItem] = []
                for review_item in review_items:
                    review_id = review_item.recommendation_id
                    # Skip ids already seen earlier in this fetch.
                    if review_id and review_id in seen_review_ids:
                        continue
                    if review_id:
                        seen_review_ids.add(review_id)
                    if review_item.timestamp_created > boundary_ts:
                        if review_id not in exclude_ids:
                            batch_new.append(review_item)
                        continue
                    if review_item.timestamp_created == boundary_ts:
                        # Exactly at the boundary: keep only ids not already
                        # analyzed at that timestamp.
                        if (
                            review_id not in exclude_ids
                            and review_id not in boundary_ids_at_ts
                        ):
                            batch_new.append(review_item)
                        continue
                    # Older than the boundary — relies on the "recent" feed
                    # being newest-first, so the rest of the page is older too.
                    crossed_boundary = True
                    break
                new_items.extend(batch_new)
                if crossed_boundary or not cursor or cursor == "*":
                    break
                continue
            # Filter out already-known reviews
            # NOTE(review): unlike the boundary path above, this path does not
            # dedupe via seen_review_ids — presumably exclude_ids suffices
            # for callers; confirm against call sites.
            batch_new = [ri for ri in review_items if ri.recommendation_id not in exclude_ids]
            # Early exit: if >80% of batch is known, we've passed the boundary
            known_ratio = 1 - (len(batch_new) / len(review_items)) if review_items else 0
            new_items.extend(batch_new)
            if known_ratio > 0.8:
                logger.info(
                    f"Early exit for {app_id}: {known_ratio:.0%} of batch already known"
                )
                break
            if not cursor or cursor == "*":
                break
        logger.info(f"Incremental fetch for {app_id}: {len(new_items)} new reviews")
        return new_items
    async def fetch_recent_reviews_since(
        self,
        app_id: str,
        since_timestamp: int,
    ) -> list[ReviewItem]:
        """
        Fetch all recent reviews whose timestamp falls inside the requested time window.

        Steam's `filter=recent` endpoint is ordered newest-first, so once a batch crosses
        the cutoff we can stop paging. Items within a page are still scanned to
        the end (not broken out of) so same-page stragglers inside the window
        are kept.
        """
        batch_size = settings.review_batch_size
        client = self.client
        cursor: str | None = "*"
        seen_cursors: set[str] = set()
        seen_review_ids: set[str] = set()
        window_items: list[ReviewItem] = []
        while True:
            _, review_items, cursor = await self._fetch_reviews_batch(
                client, app_id, "all", "recent", batch_size, cursor
            )
            if not review_items:
                break
            # Guard against cursor loops from the API.
            if cursor and cursor in seen_cursors:
                logger.warning(f"Repeated cursor {cursor} for {app_id} (recent window fetch)")
                break
            if cursor:
                seen_cursors.add(cursor)
            crossed_window_boundary = False
            for item in review_items:
                review_id = item.recommendation_id
                # Dedupe by review id within this fetch.
                if review_id and review_id in seen_review_ids:
                    continue
                if review_id:
                    seen_review_ids.add(review_id)
                if item.timestamp_created >= since_timestamp:
                    window_items.append(item)
                else:
                    # Older than the window — finish this page, then stop paging.
                    crossed_window_boundary = True
            if crossed_window_boundary or not cursor or cursor == "*":
                break
        logger.info(f"Recent-window fetch for {app_id}: {len(window_items)} reviews since {since_timestamp}")
        return window_items
async def fetch_reviews(
self,
app_id: str,
batch_size: int | None = None,
max_reviews: int | None = None,
) -> AsyncGenerator[ReviewBatch, None]:
"""Wrapper dla zachowania kompatybilności."""
stats = await self.get_review_stats(app_id)
if stats.total == 0:
return
sample_plan = create_sample_plan(stats.total, stats.positive, stats.negative)
async for batch in self.fetch_reviews_stratified(app_id, sample_plan):
yield batch
# Global service instance (singleton) shared across the application.
steam_service = SteamService()