# sentimentstream-worker/scripts/prewarm_cache.py
# Deployed via GitHub Action "deploy: worker release from GitHub" (commit 8ff1b66).
#!/usr/bin/env python3
"""
Cache pre-warming script.

Runs the full analysis for a predefined list of popular games so that,
during a live demo, results are available instantly (from the MongoDB cache).

Usage:
    python scripts/prewarm_cache.py
"""
import asyncio
import sys
from pathlib import Path
import json

# Make the backend package importable when the script is run from the repo root.
sys.path.insert(0, str(Path(__file__).parent.parent / "backend"))

from app.services.steam_service import steam_service
from app.services.nlp_service import nlp_service
from app.db.mongodb import mongodb
from app.core.sampling import create_sample_plan
from app.routers.analyze import aggregate_topics
from app.models.schemas import AnalysisResult, TopicSentiment
# Games to pre-analyse (Steam hits) — each gets a full cached analysis.
TARGET_GAMES: list[str] = [
    "Cyberpunk 2077",
    "Baldur's Gate 3",
    "Elden Ring",
    "Starfield",
    "Hades",
    "Stardew Valley"
]
async def analyze_game_headless(game_name: str):
    """Run the full analysis pipeline for one game without streaming (headless).

    Resolves *game_name* via the Steam search service; if a cached analysis
    already exists in MongoDB the game is skipped. Otherwise builds a
    stratified sampling plan, analyses the fetched review batches with the
    NLP service and persists the aggregated result to the cache.
    """
    print(f"\n[START] Analiza: {game_name}")

    # Resolve the human-readable name to a concrete Steam app.
    game = await steam_service.search_game(game_name)
    if not game:
        print(f"[ERROR] Nie znaleziono gry: {game_name}")
        return
    print(f" -> Znaleziono: {game.name} (ID: {game.app_id})")

    # An existing cache entry means there is nothing to pre-warm.
    cached = await mongodb.get_cached_analysis(game.app_id)
    if cached:
        print(f" -> [SKIP] Wyniki już są w cache (zaktualizowane: {cached.get('timestamp', 'unknown')})")
        return

    # Derive the stratified sampling plan from the review statistics.
    stats = await steam_service.get_review_stats(game.app_id)
    sample_plan = create_sample_plan(stats.total, stats.positive, stats.negative)
    print(f" -> Plan: {sample_plan.total} recenzji (Statystyczna: {sample_plan.statistical_sample})")

    # Record the planned sample size on the (immutable) game model.
    game = game.model_copy(update={"target_count": sample_plan.total})

    reviewed = 0
    skipped = 0
    topics: list[TopicSentiment] = []

    print(" -> Pobieranie i analiza w toku...", end="", flush=True)
    async for batch in steam_service.fetch_reviews_stratified(game.app_id, sample_plan):
        if not batch.reviews:
            continue
        print(".", end="", flush=True)
        batch_results, batch_skipped = await nlp_service.analyze_batch(batch.reviews)
        if batch_results:
            topics = aggregate_topics(topics, batch_results)
        skipped += batch_skipped
        reviewed += len(batch.reviews)
        # Small delay between batches so we do not overload the HF API.
        await asyncio.sleep(0.5)
    print(" OK")

    # Persist the aggregate so the API can serve this game from cache.
    result = AnalysisResult(
        game=game,
        topics=topics,
        analyzed_reviews=reviewed,
        skipped_count=skipped
    )
    await mongodb.save_analysis(game.app_id, result.model_dump())
    print(f"[DONE] Zapisano wyniki dla {game.name}!")
async def main():
    """Pre-warm the MongoDB analysis cache for every game in TARGET_GAMES.

    Connects to MongoDB once, analyses each target game, and always
    disconnects in the ``finally`` block. A failure on one game is logged
    and the loop continues — previously a single raising game aborted the
    whole pre-warm run, leaving the remaining games un-cached.
    """
    print("=" * 60)
    print("CACHE PRE-WARMER - Przygotowanie do prezentacji")
    print("=" * 60)
    try:
        await mongodb.connect()
        for game_name in TARGET_GAMES:
            try:
                await analyze_game_headless(game_name)
            except Exception as e:
                # Isolate per-game failures so the rest still get warmed.
                print(f"\n[ERROR] {game_name}: {e}")
    except Exception as e:
        # Infrastructure-level failure (e.g. MongoDB connection refused).
        print(f"\n[FATAL ERROR] {e}")
    finally:
        await mongodb.disconnect()
        print("\nZakończono.")


if __name__ == "__main__":
    asyncio.run(main())