Spaces:
Running
Running
File size: 4,459 Bytes
8ff1b66 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 | #!/usr/bin/env python3
"""
Skrypt do pobierania listy gier ze SteamSpy i zapisywania do MongoDB.
Pobiera wszystkie gry dostępne w SteamSpy API (~85,000 gier)
i zapisuje je do kolekcji 'games' w MongoDB.
Uruchom z głównego katalogu projektu:
python scripts/fetch_games_to_mongodb.py
Wymaga uruchomionego MongoDB i ustawionych zmiennych środowiskowych.
"""
import asyncio
import sys
import time
from pathlib import Path
import httpx
# Dodaj backend do path
sys.path.insert(0, str(Path(__file__).parent.parent / "backend"))
from app.db.mongodb import mongodb
STEAMSPY_API = "https://steamspy.com/api.php"
MAX_PAGES = 90 # ~90,000 gier
DELAY_SECONDS = 1.0 # Opóźnienie między zapytaniami
BATCH_SIZE = 1000 # Zapisuj do MongoDB co tyle gier
def fetch_page_sync(client: httpx.Client, page: int) -> list[dict]:
"""Pobiera jedną stronę gier ze SteamSpy (synchronicznie)."""
params = {"request": "all", "page": page}
try:
response = client.get(STEAMSPY_API, params=params, timeout=30)
response.raise_for_status()
data = response.json()
if not data or not isinstance(data, dict):
return []
# Wyciągnij appid, name, developer, publisher
games = []
for info in data.values():
if not isinstance(info, dict) or not info.get("name"):
continue
game = {
"appid": str(info["appid"]),
"name": info["name"],
}
# Dodaj developer jeśli istnieje
if info.get("developer"):
game["developer"] = info["developer"]
# Dodaj publisher jeśli istnieje i różny od developer
if info.get("publisher") and info.get("publisher") != info.get("developer"):
game["publisher"] = info["publisher"]
games.append(game)
return games
except Exception as e:
print(f" Błąd na stronie {page}: {e}")
return []
async def main():
"""Główna funkcja - pobiera gry i zapisuje do MongoDB."""
print("=" * 60)
print("Pobieranie listy gier ze SteamSpy do MongoDB")
print("=" * 60)
print()
# Połącz z MongoDB
print("Łączenie z MongoDB...")
try:
await mongodb.connect()
except Exception as e:
print(f"Błąd połączenia z MongoDB: {e}")
print("Upewnij się, że MongoDB jest uruchomione i zmienne środowiskowe są ustawione.")
return
# Sprawdź czy już są gry w bazie
existing_count = await mongodb.get_games_count()
if existing_count > 0:
print(f"W bazie jest już {existing_count} gier.")
response = input("Czy usunąć istniejące i pobrać nowe? (t/n): ").strip().lower()
if response != "t":
print("Anulowano.")
await mongodb.disconnect()
return
await mongodb.clear_games()
print("Usunięto istniejące gry.")
print()
print(f"Pobieranie maksymalnie {MAX_PAGES} stron po ~1000 gier każda...")
print(f"Opóźnienie między zapytaniami: {DELAY_SECONDS}s")
print()
all_games: list[dict] = []
total_saved = 0
with httpx.Client() as client:
for page in range(MAX_PAGES):
print(f"Strona {page + 1}/{MAX_PAGES}...", end=" ", flush=True)
games = fetch_page_sync(client, page)
if not games:
print("Pusta - koniec danych")
break
all_games.extend(games)
print(f"OK ({len(games)} gier, łącznie: {len(all_games)})")
# Zapisuj do MongoDB co BATCH_SIZE gier
if len(all_games) >= BATCH_SIZE:
saved = await mongodb.save_games_batch(all_games)
total_saved += saved
all_games = []
print(f" -> Zapisano do MongoDB (łącznie: {total_saved})")
# Opóźnienie
if page < MAX_PAGES - 1:
time.sleep(DELAY_SECONDS)
# Zapisz pozostałe gry
if all_games:
saved = await mongodb.save_games_batch(all_games)
total_saved += saved
print(f" -> Zapisano pozostałe do MongoDB")
# Podsumowanie
final_count = await mongodb.get_games_count()
print()
print("=" * 60)
print(f"Zakończono!")
print(f"Gier w bazie: {final_count}")
print("=" * 60)
await mongodb.disconnect()
if __name__ == "__main__":
asyncio.run(main())
|