GitHub Action
deploy: worker release from GitHub
8ff1b66
"""
Smoke Test — local verification of worker cycle and analysis pipeline.
Usage:
cd backend
python scripts/smoke_test.py analyze <appid> # run full analysis for a game
python scripts/smoke_test.py cycle # mini worker cycle (1 game)
"""
import argparse
import asyncio
import logging
import sys
import time
from contextlib import AsyncExitStack
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlsplit
# Put backend/ (the parent of this scripts/ directory) at the front of
# sys.path so the ``app`` package imports below resolve when the script is
# run directly, e.g. ``python scripts/smoke_test.py`` from backend/.
_BACKEND_DIR = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_BACKEND_DIR))
from app.core.config import settings # noqa: E402
from app.db.mongodb import mongodb # noqa: E402
from app.services.nlp_service import NLPService # noqa: E402
from app.services.steam_service import SteamService # noqa: E402
from app.services.update_detection_service import UpdateDetectionService # noqa: E402
from app.services.precache_service import PreCacheService # noqa: E402
from app.services.analysis_runner import run_full_analysis # noqa: E402
# Root logging config: INFO and above, timestamp / level / logger name prefix.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Named logger for this script; records propagate to the root config above.
logger = logging.getLogger("smoke_test")
def _ts() -> str:
    """Return the current UTC wall-clock time formatted as ``HH:MM:SS``."""
    now_utc = datetime.now(tz=timezone.utc)
    return f"{now_utc:%H:%M:%S}"
# ANSI-coloured labels for the known statuses (green / red / yellow / cyan).
_STATUS_TAGS = {
    "OK": "\033[32mOK\033[0m",
    "FAIL": "\033[31mFAIL\033[0m",
    "SKIP": "\033[33mSKIP\033[0m",
    "INFO": "\033[36mINFO\033[0m",
}


def _print(status: str, msg: str) -> None:
    """Print *msg* prefixed with a UTC ``HH:MM:SS`` timestamp and a status tag.

    Statuses listed in ``_STATUS_TAGS`` are shown colour-coded; any other
    status string is printed verbatim inside the brackets.
    """
    stamp = _ts()
    label = _STATUS_TAGS[status] if status in _STATUS_TAGS else status
    print(f"[{stamp}] [{label}] {msg}")
# ── analyze subcommand ──────────────────────────────────────────────
async def cmd_analyze(app_id: str) -> bool:
    """Run the full analysis pipeline for one game and verify the cache write.

    Connects to MongoDB, runs ``run_full_analysis`` for *app_id*, prints a
    summary of the result, then checks that ``mongodb.get_cached_analysis``
    returns a document for the game.

    Args:
        app_id: Steam app ID to analyze (e.g. ``"730"``).

    Returns:
        ``True`` when the analysis produced a result and the cache document
        was found; ``False`` otherwise, so the caller can exit non-zero.
        Exceptions from the services propagate unchanged.
    """
    _print("INFO", f"Starting analysis for app_id={app_id}")
    # Never echo "user:password@" from the connection string into logs:
    # keep only the scheme and the host list after the last "@".
    mongo_url = urlsplit(settings.mongodb_url)
    mongo_hosts = mongo_url.netloc.rpartition("@")[2]
    _print("INFO", f"MongoDB: {mongo_url.scheme}://{mongo_hosts} / DB: {settings.mongodb_db_name}")

    # Cleanups are registered only once each resource exists and run in
    # reverse order (Steam service, then MongoDB) even if a later
    # constructor, the analysis, or another cleanup raises.
    async with AsyncExitStack() as cleanup:
        await mongodb.connect()
        cleanup.push_async_callback(mongodb.disconnect)
        steam_svc = SteamService()
        cleanup.push_async_callback(steam_svc.close)
        nlp_svc = NLPService()

        t0 = time.monotonic()
        result = await run_full_analysis(app_id, f"smoke-{app_id}", steam_svc, nlp_svc)
        elapsed = time.monotonic() - t0
        if result is None:
            _print("FAIL", "run_full_analysis returned None")
            return False

        # `or` fallbacks also cover keys that are present but set to None.
        game = result.get("game") or {}
        topics = result.get("topics") or []
        analyzed = result.get("analyzed_reviews", 0)
        highlights = result.get("general_highlights") or []
        _print("OK", f"Analysis complete in {elapsed:.1f}s")
        _print("OK", f" Game: {game.get('name', '?')} (appid {game.get('app_id', '?')})")
        _print("OK", f" Reviews analyzed: {analyzed}")
        _print("OK", f" Topics found: {len(topics)}")
        _print("OK", f" General highlights: {len(highlights)}")

        # Verify cache write
        cached = await mongodb.get_cached_analysis(app_id)
        if not cached:
            _print("FAIL", " Cache write verification FAILED — no document in MongoDB")
            return False
        _print("OK", " Cache write verified — document found in MongoDB")
        return True
# ── cycle subcommand ─────────────────────────────────────────────────
async def cmd_cycle() -> bool:
    """Run a one-game version of the worker cycle end to end.

    Steps:
      1. Fetch the top game by review count.
      2. Check its ``synced_at`` can be subtracted from a timezone-aware "now".
      3. Run update detection on that game.
      4. Bootstrap missing analyses for it.
      5. Process due analyses with ``settings.precache_max_analyses_per_cycle``
         temporarily forced to 1 (restored afterwards).

    Returns:
        ``False`` if the ``synced_at`` datetime check fails; ``True``
        otherwise, including the SKIP case where the DB holds no games.
        Exceptions from the services propagate unchanged.
    """
    _print("INFO", "Starting mini worker cycle")
    # Never echo "user:password@" from the connection string into logs:
    # keep only the scheme and the host list after the last "@".
    mongo_url = urlsplit(settings.mongodb_url)
    mongo_hosts = mongo_url.netloc.rpartition("@")[2]
    _print("INFO", f"MongoDB: {mongo_url.scheme}://{mongo_hosts} / DB: {settings.mongodb_db_name}")

    # Cleanups run in reverse registration order (update service, Steam
    # service, MongoDB) even if a later constructor, a step, or another
    # cleanup raises.
    async with AsyncExitStack() as cleanup:
        await mongodb.connect()
        cleanup.push_async_callback(mongodb.disconnect)
        steam_svc = SteamService()
        cleanup.push_async_callback(steam_svc.close)
        nlp_svc = NLPService()
        update_svc = UpdateDetectionService()
        cleanup.push_async_callback(update_svc.close)

        # Step 1: Get top 1 game
        _print("INFO", "Step 1: Fetching top game by reviews...")
        top_games = await mongodb.get_top_games_by_reviews(1)
        if not top_games:
            _print("SKIP", "No games in DB — run game sync first or use 'analyze' subcommand")
            return True
        game = top_games[0]
        app_id = str(game.get("appid", ""))
        name = game.get("name", "?")
        _print("OK", f" Top game: {name} (appid {app_id})")

        # Step 2: subtracting a naive datetime from an aware one raises
        # TypeError, so this verifies synced_at comes back timezone-aware.
        _print("INFO", "Step 2: Testing synced_at datetime comparison...")
        synced_at = game.get("synced_at")
        if synced_at:
            try:
                delta = datetime.now(timezone.utc) - synced_at
            except TypeError as e:
                _print("FAIL", f" datetime subtraction failed: {e}")
                return False
            hours = delta.total_seconds() / 3600
            _print("OK", f" synced_at delta: {hours:.1f}h (tz={synced_at.tzinfo})")
        else:
            _print("SKIP", " No synced_at field — game sync not run yet")

        # Step 3: Update detection (1 game)
        _print("INFO", "Step 3: Update detection...")
        t0 = time.monotonic()
        updated = await update_svc.check_for_updates([game])
        elapsed = time.monotonic() - t0
        _print("OK", f" Updates detected: {len(updated)} in {elapsed:.1f}s")

        # Step 4: Bootstrap missing analyses
        _print("INFO", "Step 4: Bootstrap missing analyses...")
        precache_svc = PreCacheService(steam_svc, nlp_svc)
        bootstrapped = await precache_svc.bootstrap_missing_analyses(top_games)
        _print("OK", f" Bootstrapped: {bootstrapped}")

        # Step 5: Process due analyses (max 1)
        _print("INFO", "Step 5: Processing due analyses (max 1)...")
        orig = settings.precache_max_analyses_per_cycle
        # object.__setattr__ bypasses any __setattr__ override on the settings
        # object. NOTE(review): presumably settings is frozen/immutable; confirm.
        object.__setattr__(settings, "precache_max_analyses_per_cycle", 1)
        try:
            executed = await precache_svc.process_due_analyses()
            _print("OK", f" Executed: {executed}")
        finally:
            object.__setattr__(settings, "precache_max_analyses_per_cycle", orig)

        _print("OK", "Mini cycle complete")
        return True
# ── main ─────────────────────────────────────────────────────────────
def main() -> None:
    """Parse the command line, run the chosen smoke test, and set the exit code.

    Exits with status 1 when no subcommand is given, or when the subcommand
    returns an explicit ``False`` (a reported failure). A subcommand that
    returns anything else, including ``None``, exits with status 0.
    """
    parser = argparse.ArgumentParser(description="SentimentStream smoke test")
    sub = parser.add_subparsers(dest="command")
    p_analyze = sub.add_parser("analyze", help="Run full analysis for a game")
    p_analyze.add_argument("appid", help="Steam app ID (e.g. 730)")
    sub.add_parser("cycle", help="Run mini worker cycle (top 1 game)")
    args = parser.parse_args()

    if args.command == "analyze":
        ok = asyncio.run(cmd_analyze(args.appid))
    elif args.command == "cycle":
        ok = asyncio.run(cmd_cycle())
    else:
        parser.print_help()
        sys.exit(1)

    # Only an explicit False counts as failure, so that a FAIL printed by the
    # smoke test also fails the calling process (e.g. a CI job).
    if ok is False:
        sys.exit(1)


if __name__ == "__main__":
    main()