sentimentstream-worker / scripts /fetch_games_to_mongodb.py
GitHub Action
deploy: worker release from GitHub
8ff1b66
#!/usr/bin/env python3
"""
Skrypt do pobierania listy gier ze SteamSpy i zapisywania do MongoDB.
Pobiera wszystkie gry dostępne w SteamSpy API (~85,000 gier)
i zapisuje je do kolekcji 'games' w MongoDB.
Uruchom z głównego katalogu projektu:
python scripts/fetch_games_to_mongodb.py
Wymaga uruchomionego MongoDB i ustawionych zmiennych środowiskowych.
"""
import asyncio
import sys
import time
from pathlib import Path
import httpx
# Dodaj backend do path
sys.path.insert(0, str(Path(__file__).parent.parent / "backend"))
from app.db.mongodb import mongodb
STEAMSPY_API = "https://steamspy.com/api.php"
MAX_PAGES = 90 # ~90,000 gier
DELAY_SECONDS = 1.0 # Opóźnienie między zapytaniami
BATCH_SIZE = 1000 # Zapisuj do MongoDB co tyle gier
def fetch_page_sync(client: httpx.Client, page: int) -> list[dict]:
"""Pobiera jedną stronę gier ze SteamSpy (synchronicznie)."""
params = {"request": "all", "page": page}
try:
response = client.get(STEAMSPY_API, params=params, timeout=30)
response.raise_for_status()
data = response.json()
if not data or not isinstance(data, dict):
return []
# Wyciągnij appid, name, developer, publisher
games = []
for info in data.values():
if not isinstance(info, dict) or not info.get("name"):
continue
game = {
"appid": str(info["appid"]),
"name": info["name"],
}
# Dodaj developer jeśli istnieje
if info.get("developer"):
game["developer"] = info["developer"]
# Dodaj publisher jeśli istnieje i różny od developer
if info.get("publisher") and info.get("publisher") != info.get("developer"):
game["publisher"] = info["publisher"]
games.append(game)
return games
except Exception as e:
print(f" Błąd na stronie {page}: {e}")
return []
async def main():
"""Główna funkcja - pobiera gry i zapisuje do MongoDB."""
print("=" * 60)
print("Pobieranie listy gier ze SteamSpy do MongoDB")
print("=" * 60)
print()
# Połącz z MongoDB
print("Łączenie z MongoDB...")
try:
await mongodb.connect()
except Exception as e:
print(f"Błąd połączenia z MongoDB: {e}")
print("Upewnij się, że MongoDB jest uruchomione i zmienne środowiskowe są ustawione.")
return
# Sprawdź czy już są gry w bazie
existing_count = await mongodb.get_games_count()
if existing_count > 0:
print(f"W bazie jest już {existing_count} gier.")
response = input("Czy usunąć istniejące i pobrać nowe? (t/n): ").strip().lower()
if response != "t":
print("Anulowano.")
await mongodb.disconnect()
return
await mongodb.clear_games()
print("Usunięto istniejące gry.")
print()
print(f"Pobieranie maksymalnie {MAX_PAGES} stron po ~1000 gier każda...")
print(f"Opóźnienie między zapytaniami: {DELAY_SECONDS}s")
print()
all_games: list[dict] = []
total_saved = 0
with httpx.Client() as client:
for page in range(MAX_PAGES):
print(f"Strona {page + 1}/{MAX_PAGES}...", end=" ", flush=True)
games = fetch_page_sync(client, page)
if not games:
print("Pusta - koniec danych")
break
all_games.extend(games)
print(f"OK ({len(games)} gier, łącznie: {len(all_games)})")
# Zapisuj do MongoDB co BATCH_SIZE gier
if len(all_games) >= BATCH_SIZE:
saved = await mongodb.save_games_batch(all_games)
total_saved += saved
all_games = []
print(f" -> Zapisano do MongoDB (łącznie: {total_saved})")
# Opóźnienie
if page < MAX_PAGES - 1:
time.sleep(DELAY_SECONDS)
# Zapisz pozostałe gry
if all_games:
saved = await mongodb.save_games_batch(all_games)
total_saved += saved
print(f" -> Zapisano pozostałe do MongoDB")
# Podsumowanie
final_count = await mongodb.get_games_count()
print()
print("=" * 60)
print(f"Zakończono!")
print(f"Gier w bazie: {final_count}")
print("=" * 60)
await mongodb.disconnect()
if __name__ == "__main__":
asyncio.run(main())