Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Skrypt do "rozgrzewania" cache (Pre-warming). | |
| Wykonuje pełną analizę dla zdefiniowanej listy popularnych gier, | |
| aby podczas prezentacji wyniki były dostępne natychmiast (z cache MongoDB). | |
| Uruchomienie: | |
| python scripts/prewarm_cache.py | |
| """ | |
| import asyncio | |
| import sys | |
| from pathlib import Path | |
| import json | |
| # Dodaj backend do path | |
| sys.path.insert(0, str(Path(__file__).parent.parent / "backend")) | |
| from app.services.steam_service import steam_service | |
| from app.services.nlp_service import nlp_service | |
| from app.db.mongodb import mongodb | |
| from app.core.sampling import create_sample_plan | |
| from app.routers.analyze import aggregate_topics | |
| from app.models.schemas import AnalysisResult, TopicSentiment | |
| # Lista gier do pre-analizy (Hity Steam) | |
| TARGET_GAMES = [ | |
| "Cyberpunk 2077", | |
| "Baldur's Gate 3", | |
| "Elden Ring", | |
| "Starfield", | |
| "Hades", | |
| "Stardew Valley" | |
| ] | |
| async def analyze_game_headless(game_name: str): | |
| """Wykonuje analizę gry bez streamowania (headless).""" | |
| print(f"\n[START] Analiza: {game_name}") | |
| # 1. Szukaj gry | |
| game = await steam_service.search_game(game_name) | |
| if not game: | |
| print(f"[ERROR] Nie znaleziono gry: {game_name}") | |
| return | |
| print(f" -> Znaleziono: {game.name} (ID: {game.app_id})") | |
| # 2. Sprawdź cache | |
| cached = await mongodb.get_cached_analysis(game.app_id) | |
| if cached: | |
| print(f" -> [SKIP] Wyniki już są w cache (zaktualizowane: {cached.get('timestamp', 'unknown')})") | |
| return | |
| # 3. Analiza | |
| stats = await steam_service.get_review_stats(game.app_id) | |
| sample_plan = create_sample_plan(stats.total, stats.positive, stats.negative) | |
| print(f" -> Plan: {sample_plan.total} recenzji (Statystyczna: {sample_plan.statistical_sample})") | |
| # Update game object | |
| game = game.model_copy(update={"target_count": sample_plan.total}) | |
| processed = 0 | |
| total_skipped = 0 | |
| aggregated_topics: list[TopicSentiment] = [] | |
| print(" -> Pobieranie i analiza w toku...", end="", flush=True) | |
| async for batch in steam_service.fetch_reviews_stratified(game.app_id, sample_plan): | |
| if not batch.reviews: | |
| continue | |
| print(".", end="", flush=True) | |
| batch_results, batch_skipped = await nlp_service.analyze_batch(batch.reviews) | |
| if batch_results: | |
| aggregated_topics = aggregate_topics(aggregated_topics, batch_results) | |
| total_skipped += batch_skipped | |
| processed += len(batch.reviews) | |
| # Małe opóźnienie żeby nie zabić API HF | |
| await asyncio.sleep(0.5) | |
| print(" OK") | |
| # 4. Zapisz | |
| result = AnalysisResult( | |
| game=game, | |
| topics=aggregated_topics, | |
| analyzed_reviews=processed, | |
| skipped_count=total_skipped | |
| ) | |
| await mongodb.save_analysis(game.app_id, result.model_dump()) | |
| print(f"[DONE] Zapisano wyniki dla {game.name}!") | |
| async def main(): | |
| print("=" * 60) | |
| print("CACHE PRE-WARMER - Przygotowanie do prezentacji") | |
| print("=" * 60) | |
| try: | |
| await mongodb.connect() | |
| for game_name in TARGET_GAMES: | |
| await analyze_game_headless(game_name) | |
| except Exception as e: | |
| print(f"\n[FATAL ERROR] {e}") | |
| finally: | |
| await mongodb.disconnect() | |
| print("\nZakończono.") | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |