Spaces:
Running
Running
| """ | |
| Worker Main App — lightweight FastAPI for background game sync and pre-cache. | |
| Endpoints: | |
| GET /health — MongoDB ping, last cycle summary, cycle_running flag | |
| POST /trigger — token-protected, starts a worker cycle as background task | |
| GET /logs — token-protected, read structured log tail | |
| """ | |
import asyncio
import hmac
import logging
import os
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Any

from fastapi import FastAPI, Query, Request
from fastapi.responses import JSONResponse

from app.core.config import settings
from app.core.worker_logging import (
    AsyncTimingContext,
    WORKER_LOG_WHITELIST,
    log_structured,
    read_log_tail,
    resolve_log_path,
    set_cycle_id,
    setup_app_logging,
    setup_structured_logger,
)
from app.db.mongodb import mongodb
from app.services.game_sync_service import GameSyncService
from app.services.nlp_service import NLPService
from app.services.precache_service import PreCacheService
from app.services.priority_refresh_service import PriorityRefreshService
from app.services.steam_service import SteamService
from app.services.update_detection_service import UpdateDetectionService
# Root logging: INFO and above, timestamped "time [LEVEL] logger: message" lines.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Cycle state shared by the endpoints and the background cycle:
# whether a cycle is currently executing, and the summary of the last one.
_cycle_running = False
_last_cycle_summary: dict[str, Any] = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage resources for the lifetime of the FastAPI application.

    On startup: connects MongoDB, then initialises the structured "worker"
    logger and the app logging. On shutdown: disconnects MongoDB.

    Args:
        app: The FastAPI application being started (unused here, required by
            the lifespan protocol).

    Yields:
        None, once startup has completed; control returns here at shutdown.
    """
    await mongodb.connect()
    try:
        setup_structured_logger("worker")
        setup_app_logging()
        logger.info("Worker started — MongoDB connected, structured logging initialized")
        yield
    finally:
        # Also runs when logging setup or the application itself fails, so the
        # connection opened above is never left dangling.
        await mongodb.disconnect()
        logger.info("Worker shutting down")


app = FastAPI(title="SentimentStream Worker", lifespan=lifespan)
@app.get("/health")
async def health():
    """Report worker health: MongoDB reachability and cycle state.

    Returns:
        A dict with:
        - ``status``: "ok" when the MongoDB ping succeeded, else "degraded".
        - ``mongodb``: "connected" or "disconnected".
        - ``cycle_running``: whether a worker cycle is currently executing.
        - ``last_cycle``: summary dict of the most recent cycle (empty until
          the first cycle finishes).
    """
    mongo_ok = False
    try:
        # Compare against None explicitly instead of relying on the client
        # object's truth value.
        if mongodb.client is not None:
            await mongodb.client.admin.command("ping")
            mongo_ok = True
    except Exception:
        # Best-effort probe: a failed ping is reported as "degraded" instead
        # of failing the endpoint, but the reason is kept in the logs.
        logger.warning("MongoDB health ping failed", exc_info=True)

    return {
        "status": "ok" if mongo_ok else "degraded",
        "mongodb": "connected" if mongo_ok else "disconnected",
        "cycle_running": _cycle_running,
        "last_cycle": _last_cycle_summary,
    }
def _check_bearer_token(request: Request) -> bool:
    """Validate the Bearer token carried in the ``Authorization`` header.

    Args:
        request: Incoming request whose headers are inspected.

    Returns:
        True only when ``settings.worker_trigger_token`` is set (non-empty),
        the header starts with ``"Bearer "`` and the remainder equals the
        configured token. False in every other case.
    """
    expected = settings.worker_trigger_token
    auth = request.headers.get("Authorization", "")
    if not expected or not auth.startswith("Bearer "):
        return False

    presented = auth.removeprefix("Bearer ")
    # Constant-time comparison so response timing does not leak how much of
    # the token matched. Compare bytes: compare_digest rejects non-ASCII str.
    return hmac.compare_digest(presented.encode("utf-8"), expected.encode("utf-8"))
# Strong references to in-flight cycle tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_background_tasks: set[asyncio.Task] = set()


def _on_cycle_done(task: asyncio.Task) -> None:
    """Release the task reference and the cycle slot when a cycle task ends."""
    global _cycle_running
    _background_tasks.discard(task)
    # Covers a task cancelled before its first step, where the cycle's own
    # cleanup never gets a chance to run.
    _cycle_running = False
    if not task.cancelled() and task.exception() is not None:
        logger.error("Worker cycle task failed", exc_info=task.exception())


@app.post("/trigger")
async def trigger(request: Request):
    """Start a worker cycle as a background task (token-protected).

    Args:
        request: Incoming request; must carry a valid Bearer token.

    Returns:
        ``{"status": "started"}`` when a cycle was scheduled, a 401 JSON
        response when the token is missing or invalid, or a 503 JSON response
        when a cycle is already running.
    """
    global _cycle_running

    if not _check_bearer_token(request):
        return JSONResponse(status_code=401, content={"detail": "Unauthorized"})
    if _cycle_running:
        return JSONResponse(status_code=503, content={"detail": "Cycle already running"})

    # Claim the slot now rather than when the task first runs; otherwise a
    # second request handled in between would start a concurrent cycle.
    _cycle_running = True
    task = asyncio.create_task(_run_cycle())
    _background_tasks.add(task)
    task.add_done_callback(_on_cycle_done)
    return {"status": "started"}
@app.get("/logs")
async def get_logs(
    request: Request,
    lines: int = Query(default=100, ge=1, le=1000),
    level: str | None = Query(default=None),
    event: str | None = Query(default=None),
    file: str = Query(default="worker"),
):
    """Return the tail of a structured log file (token-protected).

    Args:
        request: Incoming request; must carry a valid Bearer token.
        lines: Number of lines to read, between 1 and 1000 (default 100).
        level: Optional level filter passed through to ``read_log_tail``.
        event: Optional event filter passed through to ``read_log_tail``.
        file: Key of the log file to read; resolved against
            ``WORKER_LOG_WHITELIST`` (default "worker").

    Returns:
        ``{"entries": [...], "count": n}`` on success, a 401 JSON response
        when the token is missing or invalid, or a 400 JSON response listing
        the valid keys when ``file`` cannot be resolved.
    """
    if not _check_bearer_token(request):
        return JSONResponse(status_code=401, content={"detail": "Unauthorized"})

    log_path = resolve_log_path(file, WORKER_LOG_WHITELIST)
    if log_path is None:
        valid_keys = list(WORKER_LOG_WHITELIST)
        return JSONResponse(
            status_code=400,
            content={"detail": f"Unknown log file: '{file}'. Valid: {valid_keys}"},
        )

    entries = read_log_tail(log_path, lines=lines, level=level, event=event)
    return {"entries": entries, "count": len(entries)}
# A full game sync runs only when the last one is older than this many hours.
_GAME_SYNC_INTERVAL_HOURS = 20


def _hours_since(moment: datetime | None) -> float | None:
    """Return the hours elapsed since *moment*, or None when it is unset."""
    if not moment:
        return None
    if moment.tzinfo is None:
        # NOTE(review): naive values are assumed to be UTC (the default for
        # datetimes read back through PyMongo) — confirm against the client
        # configuration. Without this, aware - naive raises TypeError.
        moment = moment.replace(tzinfo=timezone.utc)
    return (datetime.now(timezone.utc) - moment).total_seconds() / 3600


async def _close_services(*services: Any) -> None:
    """Close every non-None service; log, rather than raise, each failure."""
    for svc in services:
        if svc is None:
            continue
        try:
            await svc.close()
        except Exception:
            logger.exception("Failed to close %s", type(svc).__name__)


async def _run_game_sync(game_sync_svc: GameSyncService, summary: dict[str, Any]) -> None:
    """Run the game-sync step and record its results in *summary*.

    The full sync (all games plus top-game details) runs only when the
    ``synced_at`` value of the first game returned by
    ``mongodb.get_top_games_by_reviews(1)`` is missing or older than
    ``_GAME_SYNC_INTERVAL_HOURS``. CN-name and app-type enrichment always run.
    """
    latest = await mongodb.get_top_games_by_reviews(1)
    last_synced = latest[0].get("synced_at") if latest else None
    hours_since_sync = _hours_since(last_synced)

    if hours_since_sync is None or hours_since_sync > _GAME_SYNC_INTERVAL_HOURS:
        async with AsyncTimingContext() as t_sync:
            logger.info("Starting game sync...")
            upserted, modified = await game_sync_svc.sync_all_games()
        summary["game_sync"] = {"upserted": upserted, "modified": modified}
        log_structured("game_sync", elapsed_s=t_sync.elapsed_s,
                       detail=summary["game_sync"])

        async with AsyncTimingContext() as t_details:
            enriched = await game_sync_svc.sync_top_game_details()
        summary["game_details"] = {"enriched": enriched}
        log_structured("game_details", elapsed_s=t_details.elapsed_s,
                       detail=summary["game_details"])
    else:
        summary["game_sync"] = "skipped (recent)"
        log_structured("game_sync", detail="skipped (recent)")

    # ALWAYS enrich CN names if sync is enabled, even if main sync skipped
    async with AsyncTimingContext() as t_cn:
        cn_processed = await game_sync_svc.enrich_cn_names()
    summary["cn_enrichment"] = {"processed": cn_processed}
    log_structured("cn_enrichment", elapsed_s=t_cn.elapsed_s,
                   detail=summary["cn_enrichment"])

    async with AsyncTimingContext() as t_app_types:
        app_types_processed = await game_sync_svc.enrich_app_types()
    summary["app_type_enrichment"] = {"processed": app_types_processed}
    log_structured("app_type_enrichment", elapsed_s=t_app_types.elapsed_s,
                   detail=summary["app_type_enrichment"])


async def _run_cycle() -> None:
    """Execute a full worker cycle and publish its summary.

    Steps, in order: game sync (when ``settings.game_sync_enabled``),
    priority refresh, update detection, schedule creation for updated games,
    bootstrap of missing analyses and, when ``settings.precache_enabled``,
    processing of due analyses.

    Any ``Exception`` raised by a step is logged and stored under
    ``summary["error"]`` instead of propagating. Whatever happens, the
    services that were created are closed, ``_last_cycle_summary`` is
    updated and ``_cycle_running`` is reset to False.
    """
    global _cycle_running, _last_cycle_summary

    _cycle_running = True
    started = datetime.now(timezone.utc)
    summary: dict[str, Any] = {"started_at": started.isoformat()}

    # Pre-bind so the cleanup below only closes what was actually created.
    steam_svc = game_sync_svc = priority_svc = update_svc = None

    try:
        cycle_id = uuid.uuid4().hex[:8]
        set_cycle_id(cycle_id)
        log_structured("cycle_start", cycle_id=cycle_id)

        # Built inside the try block: a failing constructor must not leave
        # _cycle_running stuck at True.
        steam_svc = SteamService()
        # NOTE(review): nlp_svc is never closed, as in the original — confirm
        # NLPService needs no cleanup.
        nlp_svc = NLPService()
        game_sync_svc = GameSyncService()
        priority_svc = PriorityRefreshService()
        update_svc = UpdateDetectionService()

        # 1. Game sync (if enabled and not synced recently)
        if settings.game_sync_enabled:
            await _run_game_sync(game_sync_svc, summary)

        # 1b. Priority refresh
        async with AsyncTimingContext() as t_priority:
            priority_result = await priority_svc.refresh_priorities()
        summary["priority_refresh"] = priority_result
        log_structured("priority_refresh", elapsed_s=t_priority.elapsed_s,
                       detail=priority_result)

        # 2. Update detection
        async with AsyncTimingContext() as t_update:
            priority_games = await mongodb.get_priority_games_for_analysis()
            updated_games = await update_svc.check_for_updates(priority_games)
        summary["updates_detected"] = len(updated_games)
        log_structured("update_detection", elapsed_s=t_update.elapsed_s,
                       detail={"updates_detected": len(updated_games)})

        # 3. Create schedules for updated games
        precache_svc = PreCacheService(steam_svc, nlp_svc)
        async with AsyncTimingContext() as t_sched:
            if updated_games:
                await precache_svc.create_schedules_for_updates(updated_games)
        log_structured("create_schedules", elapsed_s=t_sched.elapsed_s,
                       detail={"updated_games": len(updated_games)})

        # 4. Bootstrap missing analyses
        async with AsyncTimingContext() as t_boot:
            bootstrapped = await precache_svc.bootstrap_missing_analyses(priority_games)
        summary["bootstrapped"] = bootstrapped
        log_structured("bootstrap_missing", elapsed_s=t_boot.elapsed_s,
                       detail={"bootstrapped": bootstrapped})

        # 5. Process due analyses
        if settings.precache_enabled:
            async with AsyncTimingContext() as t_analyses:
                executed = await precache_svc.process_due_analyses()
            summary["analyses_executed"] = executed
            log_structured("process_due_analyses", elapsed_s=t_analyses.elapsed_s,
                           detail={"executed": executed})
        else:
            summary["precache"] = "disabled"
    except Exception as e:
        logger.exception("Cycle error: %s", e)
        summary["error"] = str(e)
        log_structured("cycle_error", level=logging.ERROR, error=str(e))
    finally:
        try:
            await _close_services(game_sync_svc, priority_svc, update_svc, steam_svc)
        finally:
            # Publish state first so a logging failure below cannot leave the
            # worker reporting a cycle that is no longer running.
            elapsed = (datetime.now(timezone.utc) - started).total_seconds()
            summary["elapsed_seconds"] = round(elapsed, 1)
            _last_cycle_summary = summary
            _cycle_running = False

            log_structured("cycle_end", elapsed_s=round(elapsed, 1),
                           detail=summary)
            set_cycle_id(None)
            logger.info("Cycle complete in %.1fs: %s", elapsed, summary)