"""
Smoke Test — local verification of worker cycle and analysis pipeline.

Usage:
    cd backend
    python scripts/smoke_test.py analyze <appid>   # run full analysis for a game
    python scripts/smoke_test.py cycle             # mini worker cycle (1 game)
"""
import argparse
import asyncio
import logging
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
# Ensure backend/app is importable
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from app.core.config import settings # noqa: E402
from app.db.mongodb import mongodb # noqa: E402
from app.services.nlp_service import NLPService # noqa: E402
from app.services.steam_service import SteamService # noqa: E402
from app.services.update_detection_service import UpdateDetectionService # noqa: E402
from app.services.precache_service import PreCacheService # noqa: E402
from app.services.analysis_runner import run_full_analysis # noqa: E402
# Configure the root logger at INFO level with a timestamped record layout,
# then create the logger used by this script.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger("smoke_test")
def _ts() -> str:
    """Return the current UTC time of day formatted as ``HH:MM:SS``."""
    now_utc = datetime.now(tz=timezone.utc)
    return f"{now_utc:%H:%M:%S}"
def _print(status: str, msg: str) -> None:
    """Print one timestamped status line to stdout.

    Args:
        status: Status label. ``OK``, ``FAIL``, ``SKIP`` and ``INFO`` are
            wrapped in ANSI color escapes; any other value is printed as-is.
        msg: Text printed after the status label.
    """
    palette = {
        "OK": "\033[32mOK\033[0m",
        "FAIL": "\033[31mFAIL\033[0m",
        "SKIP": "\033[33mSKIP\033[0m",
        "INFO": "\033[36mINFO\033[0m",
    }
    # Unknown statuses fall back to the raw label, uncolored.
    label = palette[status] if status in palette else status
    stamp = _ts()
    print(f"[{stamp}] [{label}] {msg}")
# ── analyze subcommand ──────────────────────────────────────────────
async def cmd_analyze(app_id: str) -> None:
    """Run the full analysis pipeline for one game and report the outcome.

    Connects to MongoDB, calls ``run_full_analysis`` for ``app_id``, prints a
    summary of the returned result, and then checks that a cached analysis
    can be read back through ``mongodb.get_cached_analysis``. The Steam
    service is closed and MongoDB is disconnected on every exit path.

    Args:
        app_id: Steam app ID of the game to analyze (e.g. ``"730"``).
    """
    # Imported locally so the function does not depend on a file-level import.
    from contextlib import AsyncExitStack

    _print("INFO", f"Starting analysis for app_id={app_id}")
    # Keep only what follows the last "@" so a "user:password@" prefix in the
    # connection string is never printed.
    mongo_target = settings.mongodb_url.rsplit("@", 1)[-1]
    _print("INFO", f"MongoDB: {mongo_target[:30]}... / DB: {settings.mongodb_db_name}")

    await mongodb.connect()
    async with AsyncExitStack() as cleanup:
        # Callbacks run in reverse registration order, and each one still runs
        # if an earlier one raises: the Steam service is closed first and the
        # MongoDB disconnect always happens, even if a constructor below fails.
        cleanup.push_async_callback(mongodb.disconnect)
        steam_svc = SteamService()
        cleanup.push_async_callback(steam_svc.close)
        nlp_svc = NLPService()

        t0 = time.monotonic()
        result = await run_full_analysis(app_id, f"smoke-{app_id}", steam_svc, nlp_svc)
        elapsed = time.monotonic() - t0

        if result is None:
            _print("FAIL", "run_full_analysis returned None")
            return

        # `or` also covers keys that are present but explicitly set to None.
        game = result.get("game") or {}
        topics = result.get("topics") or []
        analyzed = result.get("analyzed_reviews", 0)
        highlights = result.get("general_highlights") or []

        _print("OK", f"Analysis complete in {elapsed:.1f}s")
        _print("OK", f" Game: {game.get('name', '?')} (appid {game.get('app_id', '?')})")
        _print("OK", f" Reviews analyzed: {analyzed}")
        _print("OK", f" Topics found: {len(topics)}")
        _print("OK", f" General highlights: {len(highlights)}")

        # Verify cache write
        cached = await mongodb.get_cached_analysis(app_id)
        if cached:
            _print("OK", " Cache write verified — document found in MongoDB")
        else:
            _print("FAIL", " Cache write verification FAILED — no document in MongoDB")
# ── cycle subcommand ─────────────────────────────────────────────────
async def cmd_cycle() -> None:
    """Run a reduced worker cycle against the top game by reviews.

    Steps, each reported through ``_print``:
      1. Fetch one game via ``mongodb.get_top_games_by_reviews(1)``.
      2. Subtract the game's ``synced_at`` from the current UTC time.
      3. Run update detection for that game.
      4. Bootstrap missing analyses.
      5. Process due analyses with the per-cycle limit temporarily set to 1.

    Returns early when no game is available or the datetime subtraction
    fails. All services are closed and MongoDB is disconnected on every exit
    path.
    """
    # Imported locally so the function does not depend on a file-level import.
    from contextlib import AsyncExitStack

    _print("INFO", "Starting mini worker cycle")
    # Keep only what follows the last "@" so a "user:password@" prefix in the
    # connection string is never printed.
    mongo_target = settings.mongodb_url.rsplit("@", 1)[-1]
    _print("INFO", f"MongoDB: {mongo_target[:30]}... / DB: {settings.mongodb_db_name}")

    await mongodb.connect()
    async with AsyncExitStack() as cleanup:
        # Callbacks run in reverse registration order (update service, Steam
        # service, MongoDB), and each one still runs if an earlier one raises.
        cleanup.push_async_callback(mongodb.disconnect)
        steam_svc = SteamService()
        cleanup.push_async_callback(steam_svc.close)
        nlp_svc = NLPService()
        update_svc = UpdateDetectionService()
        cleanup.push_async_callback(update_svc.close)

        # Step 1: Get top 1 game
        _print("INFO", "Step 1: Fetching top game by reviews...")
        top_games = await mongodb.get_top_games_by_reviews(1)
        if not top_games:
            _print("SKIP", "No games in DB — run game sync first or use 'analyze' subcommand")
            return

        game = top_games[0]
        app_id = str(game.get("appid", ""))
        name = game.get("name", "?")
        _print("OK", f" Top game: {name} (appid {app_id})")

        # Step 2: Test datetime comparison (the bug this patch fixes)
        _print("INFO", "Step 2: Testing synced_at datetime comparison...")
        synced_at = game.get("synced_at")
        if synced_at:
            try:
                # Subtracting a naive datetime from an aware one raises
                # TypeError, which is the failure this step reports.
                delta = datetime.now(timezone.utc) - synced_at
                hours = delta.total_seconds() / 3600
                _print("OK", f" synced_at delta: {hours:.1f}h (tz={synced_at.tzinfo})")
            except TypeError as e:
                _print("FAIL", f" datetime subtraction failed: {e}")
                return
        else:
            _print("SKIP", " No synced_at field — game sync not run yet")

        # Step 3: Update detection (1 game)
        _print("INFO", "Step 3: Update detection...")
        t0 = time.monotonic()
        updated = await update_svc.check_for_updates([game])
        elapsed = time.monotonic() - t0
        _print("OK", f" Updates detected: {len(updated)} in {elapsed:.1f}s")

        # Step 4: Bootstrap missing analyses
        _print("INFO", "Step 4: Bootstrap missing analyses...")
        precache_svc = PreCacheService(steam_svc, nlp_svc)
        bootstrapped = await precache_svc.bootstrap_missing_analyses(top_games)
        _print("OK", f" Bootstrapped: {bootstrapped}")

        # Step 5: Process due analyses (max 1)
        _print("INFO", "Step 5: Processing due analyses (max 1)...")
        orig = settings.precache_max_analyses_per_cycle
        # Temporarily limit to 1. object.__setattr__ bypasses any __setattr__
        # defined on the settings class.
        # NOTE(review): presumably settings rejects normal assignment — confirm.
        object.__setattr__(settings, "precache_max_analyses_per_cycle", 1)
        try:
            executed = await precache_svc.process_due_analyses()
            _print("OK", f" Executed: {executed}")
        finally:
            # Always restore the configured limit, even if processing fails.
            object.__setattr__(settings, "precache_max_analyses_per_cycle", orig)

        _print("OK", "Mini cycle complete")
# ── main ─────────────────────────────────────────────────────────────
def main() -> None:
    """Parse the command line and run the requested smoke-test subcommand.

    ``analyze <appid>`` runs ``cmd_analyze`` and ``cycle`` runs ``cmd_cycle``.
    With no subcommand, the help text is printed and the process exits with
    status 1.
    """
    cli = argparse.ArgumentParser(description="SentimentStream smoke test")
    commands = cli.add_subparsers(dest="command")

    analyze_cli = commands.add_parser("analyze", help="Run full analysis for a game")
    analyze_cli.add_argument("appid", help="Steam app ID (e.g. 730)")
    commands.add_parser("cycle", help="Run mini worker cycle (top 1 game)")

    opts = cli.parse_args()

    # Guard clause: anything other than a known subcommand shows usage.
    if opts.command not in ("analyze", "cycle"):
        cli.print_help()
        sys.exit(1)

    if opts.command == "analyze":
        job = cmd_analyze(opts.appid)
    else:
        job = cmd_cycle()
    asyncio.run(job)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()