""" Smoke Test — local verification of worker cycle and analysis pipeline. Usage: cd backend python scripts/smoke_test.py analyze # run full analysis for a game python scripts/smoke_test.py cycle # mini worker cycle (1 game) """ import argparse import asyncio import logging import sys import time from datetime import datetime, timezone from pathlib import Path # Ensure backend/app is importable sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from app.core.config import settings # noqa: E402 from app.db.mongodb import mongodb # noqa: E402 from app.services.nlp_service import NLPService # noqa: E402 from app.services.steam_service import SteamService # noqa: E402 from app.services.update_detection_service import UpdateDetectionService # noqa: E402 from app.services.precache_service import PreCacheService # noqa: E402 from app.services.analysis_runner import run_full_analysis # noqa: E402 logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) logger = logging.getLogger("smoke_test") def _ts() -> str: return datetime.now(timezone.utc).strftime("%H:%M:%S") def _print(status: str, msg: str) -> None: tag = {"OK": "\033[32mOK\033[0m", "FAIL": "\033[31mFAIL\033[0m", "SKIP": "\033[33mSKIP\033[0m", "INFO": "\033[36mINFO\033[0m"} print(f"[{_ts()}] [{tag.get(status, status)}] {msg}") # ── analyze subcommand ────────────────────────────────────────────── async def cmd_analyze(app_id: str) -> None: _print("INFO", f"Starting analysis for app_id={app_id}") _print("INFO", f"MongoDB: {settings.mongodb_url[:30]}... / DB: {settings.mongodb_db_name}") await mongodb.connect() steam_svc = SteamService() nlp_svc = NLPService() try: t0 = time.monotonic() result = await run_full_analysis(app_id, f"smoke-{app_id}", steam_svc, nlp_svc) elapsed = time.monotonic() - t0 if result is None: _print("FAIL", "run_full_analysis returned None") return game = result.get("game", {}) topics = result.get("topics", []) analyzed = result.get("analyzed_reviews", 0) highlights = result.get("general_highlights", []) _print("OK", f"Analysis complete in {elapsed:.1f}s") _print("OK", f" Game: {game.get('name', '?')} (appid {game.get('app_id', '?')})") _print("OK", f" Reviews analyzed: {analyzed}") _print("OK", f" Topics found: {len(topics)}") _print("OK", f" General highlights: {len(highlights)}") # Verify cache write cached = await mongodb.get_cached_analysis(app_id) if cached: _print("OK", " Cache write verified — document found in MongoDB") else: _print("FAIL", " Cache write verification FAILED — no document in MongoDB") finally: await steam_svc.close() await mongodb.disconnect() # ── cycle subcommand ───────────────────────────────────────────────── async def cmd_cycle() -> None: _print("INFO", "Starting mini worker cycle") _print("INFO", f"MongoDB: {settings.mongodb_url[:30]}... / DB: {settings.mongodb_db_name}") await mongodb.connect() steam_svc = SteamService() nlp_svc = NLPService() update_svc = UpdateDetectionService() try: # Step 1: Get top 1 game _print("INFO", "Step 1: Fetching top game by reviews...") top_games = await mongodb.get_top_games_by_reviews(1) if not top_games: _print("SKIP", "No games in DB — run game sync first or use 'analyze' subcommand") return game = top_games[0] app_id = str(game.get("appid", "")) name = game.get("name", "?") _print("OK", f" Top game: {name} (appid {app_id})") # Step 2: Test datetime comparison (the bug this patch fixes) _print("INFO", "Step 2: Testing synced_at datetime comparison...") synced_at = game.get("synced_at") if synced_at: try: delta = datetime.now(timezone.utc) - synced_at hours = delta.total_seconds() / 3600 _print("OK", f" synced_at delta: {hours:.1f}h (tz={synced_at.tzinfo})") except TypeError as e: _print("FAIL", f" datetime subtraction failed: {e}") return else: _print("SKIP", " No synced_at field — game sync not run yet") # Step 3: Update detection (1 game) _print("INFO", "Step 3: Update detection...") t0 = time.monotonic() updated = await update_svc.check_for_updates([game]) elapsed = time.monotonic() - t0 _print("OK", f" Updates detected: {len(updated)} in {elapsed:.1f}s") # Step 4: Bootstrap missing analyses _print("INFO", "Step 4: Bootstrap missing analyses...") precache_svc = PreCacheService(steam_svc, nlp_svc) bootstrapped = await precache_svc.bootstrap_missing_analyses(top_games) _print("OK", f" Bootstrapped: {bootstrapped}") # Step 5: Process due analyses (max 1) _print("INFO", "Step 5: Processing due analyses (max 1)...") orig = settings.precache_max_analyses_per_cycle # Temporarily limit to 1 object.__setattr__(settings, "precache_max_analyses_per_cycle", 1) try: executed = await precache_svc.process_due_analyses() _print("OK", f" Executed: {executed}") finally: object.__setattr__(settings, "precache_max_analyses_per_cycle", orig) _print("OK", "Mini cycle complete") finally: await update_svc.close() await steam_svc.close() await mongodb.disconnect() # ── main ───────────────────────────────────────────────────────────── def main() -> None: parser = argparse.ArgumentParser(description="SentimentStream smoke test") sub = parser.add_subparsers(dest="command") p_analyze = sub.add_parser("analyze", help="Run full analysis for a game") p_analyze.add_argument("appid", help="Steam app ID (e.g. 730)") sub.add_parser("cycle", help="Run mini worker cycle (top 1 game)") args = parser.parse_args() if args.command == "analyze": asyncio.run(cmd_analyze(args.appid)) elif args.command == "cycle": asyncio.run(cmd_cycle()) else: parser.print_help() sys.exit(1) if __name__ == "__main__": main()