Spaces:
Running
Running
| """ | |
| Smoke Test — local verification of worker cycle and analysis pipeline. | |
| Usage: | |
| cd backend | |
| python scripts/smoke_test.py analyze <appid> # run full analysis for a game | |
| python scripts/smoke_test.py cycle # mini worker cycle (1 game) | |
| """ | |
| import argparse | |
| import asyncio | |
| import logging | |
| import sys | |
| import time | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| # Ensure backend/app is importable | |
| sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) | |
| from app.core.config import settings # noqa: E402 | |
| from app.db.mongodb import mongodb # noqa: E402 | |
| from app.services.nlp_service import NLPService # noqa: E402 | |
| from app.services.steam_service import SteamService # noqa: E402 | |
| from app.services.update_detection_service import UpdateDetectionService # noqa: E402 | |
| from app.services.precache_service import PreCacheService # noqa: E402 | |
| from app.services.analysis_runner import run_full_analysis # noqa: E402 | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", | |
| ) | |
| logger = logging.getLogger("smoke_test") | |
| def _ts() -> str: | |
| return datetime.now(timezone.utc).strftime("%H:%M:%S") | |
| def _print(status: str, msg: str) -> None: | |
| tag = {"OK": "\033[32mOK\033[0m", "FAIL": "\033[31mFAIL\033[0m", "SKIP": "\033[33mSKIP\033[0m", "INFO": "\033[36mINFO\033[0m"} | |
| print(f"[{_ts()}] [{tag.get(status, status)}] {msg}") | |
| # ── analyze subcommand ────────────────────────────────────────────── | |
| async def cmd_analyze(app_id: str) -> None: | |
| _print("INFO", f"Starting analysis for app_id={app_id}") | |
| _print("INFO", f"MongoDB: {settings.mongodb_url[:30]}... / DB: {settings.mongodb_db_name}") | |
| await mongodb.connect() | |
| steam_svc = SteamService() | |
| nlp_svc = NLPService() | |
| try: | |
| t0 = time.monotonic() | |
| result = await run_full_analysis(app_id, f"smoke-{app_id}", steam_svc, nlp_svc) | |
| elapsed = time.monotonic() - t0 | |
| if result is None: | |
| _print("FAIL", "run_full_analysis returned None") | |
| return | |
| game = result.get("game", {}) | |
| topics = result.get("topics", []) | |
| analyzed = result.get("analyzed_reviews", 0) | |
| highlights = result.get("general_highlights", []) | |
| _print("OK", f"Analysis complete in {elapsed:.1f}s") | |
| _print("OK", f" Game: {game.get('name', '?')} (appid {game.get('app_id', '?')})") | |
| _print("OK", f" Reviews analyzed: {analyzed}") | |
| _print("OK", f" Topics found: {len(topics)}") | |
| _print("OK", f" General highlights: {len(highlights)}") | |
| # Verify cache write | |
| cached = await mongodb.get_cached_analysis(app_id) | |
| if cached: | |
| _print("OK", " Cache write verified — document found in MongoDB") | |
| else: | |
| _print("FAIL", " Cache write verification FAILED — no document in MongoDB") | |
| finally: | |
| await steam_svc.close() | |
| await mongodb.disconnect() | |
| # ── cycle subcommand ───────────────────────────────────────────────── | |
| async def cmd_cycle() -> None: | |
| _print("INFO", "Starting mini worker cycle") | |
| _print("INFO", f"MongoDB: {settings.mongodb_url[:30]}... / DB: {settings.mongodb_db_name}") | |
| await mongodb.connect() | |
| steam_svc = SteamService() | |
| nlp_svc = NLPService() | |
| update_svc = UpdateDetectionService() | |
| try: | |
| # Step 1: Get top 1 game | |
| _print("INFO", "Step 1: Fetching top game by reviews...") | |
| top_games = await mongodb.get_top_games_by_reviews(1) | |
| if not top_games: | |
| _print("SKIP", "No games in DB — run game sync first or use 'analyze' subcommand") | |
| return | |
| game = top_games[0] | |
| app_id = str(game.get("appid", "")) | |
| name = game.get("name", "?") | |
| _print("OK", f" Top game: {name} (appid {app_id})") | |
| # Step 2: Test datetime comparison (the bug this patch fixes) | |
| _print("INFO", "Step 2: Testing synced_at datetime comparison...") | |
| synced_at = game.get("synced_at") | |
| if synced_at: | |
| try: | |
| delta = datetime.now(timezone.utc) - synced_at | |
| hours = delta.total_seconds() / 3600 | |
| _print("OK", f" synced_at delta: {hours:.1f}h (tz={synced_at.tzinfo})") | |
| except TypeError as e: | |
| _print("FAIL", f" datetime subtraction failed: {e}") | |
| return | |
| else: | |
| _print("SKIP", " No synced_at field — game sync not run yet") | |
| # Step 3: Update detection (1 game) | |
| _print("INFO", "Step 3: Update detection...") | |
| t0 = time.monotonic() | |
| updated = await update_svc.check_for_updates([game]) | |
| elapsed = time.monotonic() - t0 | |
| _print("OK", f" Updates detected: {len(updated)} in {elapsed:.1f}s") | |
| # Step 4: Bootstrap missing analyses | |
| _print("INFO", "Step 4: Bootstrap missing analyses...") | |
| precache_svc = PreCacheService(steam_svc, nlp_svc) | |
| bootstrapped = await precache_svc.bootstrap_missing_analyses(top_games) | |
| _print("OK", f" Bootstrapped: {bootstrapped}") | |
| # Step 5: Process due analyses (max 1) | |
| _print("INFO", "Step 5: Processing due analyses (max 1)...") | |
| orig = settings.precache_max_analyses_per_cycle | |
| # Temporarily limit to 1 | |
| object.__setattr__(settings, "precache_max_analyses_per_cycle", 1) | |
| try: | |
| executed = await precache_svc.process_due_analyses() | |
| _print("OK", f" Executed: {executed}") | |
| finally: | |
| object.__setattr__(settings, "precache_max_analyses_per_cycle", orig) | |
| _print("OK", "Mini cycle complete") | |
| finally: | |
| await update_svc.close() | |
| await steam_svc.close() | |
| await mongodb.disconnect() | |
| # ── main ───────────────────────────────────────────────────────────── | |
| def main() -> None: | |
| parser = argparse.ArgumentParser(description="SentimentStream smoke test") | |
| sub = parser.add_subparsers(dest="command") | |
| p_analyze = sub.add_parser("analyze", help="Run full analysis for a game") | |
| p_analyze.add_argument("appid", help="Steam app ID (e.g. 730)") | |
| sub.add_parser("cycle", help="Run mini worker cycle (top 1 game)") | |
| args = parser.parse_args() | |
| if args.command == "analyze": | |
| asyncio.run(cmd_analyze(args.appid)) | |
| elif args.command == "cycle": | |
| asyncio.run(cmd_cycle()) | |
| else: | |
| parser.print_help() | |
| sys.exit(1) | |
| if __name__ == "__main__": | |
| main() | |