Spaces:
Sleeping
Sleeping
| """ | |
| FastAPI application for the Ad Fraud Investigation Environment. | |
| Creates the OpenEnv server via create_app() and registers custom HTTP | |
| endpoints required by the hackathon: /tasks, /baseline, /grader. | |
| Gradio is disabled by default (ENABLE_WEB_INTERFACE defaults to "false" unless already set); the HTML UI lives at /investigate. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import os | |
| from pathlib import Path | |
| from typing import Any, Dict | |
| from openenv.core.env_server import create_app | |
| try: | |
| from ..models import AdReviewAction, AdReviewObservation | |
| except ImportError: | |
| from models import AdReviewAction, AdReviewObservation | |
| from .environment import AdFraudEnvironment, get_last_grader_result | |
| from .investigate_ui import register_investigate_ui | |
logger = logging.getLogger(__name__)

# Keep OpenEnv's Gradio stack unmounted (single FastAPI process on port 8000).
# Only a default is supplied: a value already present in the environment wins.
if "ENABLE_WEB_INTERFACE" not in os.environ:
    os.environ["ENABLE_WEB_INTERFACE"] = "false"

# The environment default above must be in place before the app is built.
app = create_app(AdFraudEnvironment, AdReviewAction, AdReviewObservation, env_name="ad_fraud_env")

# Attach the HTML investigation UI to the freshly created app.
register_investigate_ui(app)
| # ------------------------------------------------------------------ | |
| # Custom endpoints required by the competition | |
| # ------------------------------------------------------------------ | |
async def tasks() -> Dict[str, Any]:
    """Return the list of tasks and the action schema.

    Returns:
        A dict with two keys: ``"tasks"``, a list holding one summary dict
        per entry of ``TASK_CONFIGS`` (id, name, difficulty, queue size,
        action budget and description), and ``"action_schema"``, the result
        of ``AdReviewAction.model_json_schema()``.
    """
    # NOTE(review): no route decorator or registration call for this handler
    # is visible in this file -- confirm it is actually attached to ``app``.

    # Imported lazily: package-relative first, top-level module as fallback,
    # mirroring the dual import used for the models at the top of the file.
    try:
        from ..data.ad_generator import TASK_CONFIGS
    except ImportError:
        from data.ad_generator import TASK_CONFIGS

    # Only the config objects are needed; the mapping keys are never used.
    task_list = [
        {
            "id": cfg.task_id,
            "name": cfg.name,
            "difficulty": cfg.difficulty,
            "queue_size": cfg.queue_size,
            "action_budget": cfg.action_budget,
            "description": cfg.description,
        }
        for cfg in TASK_CONFIGS.values()
    ]
    return {
        "tasks": task_list,
        "action_schema": AdReviewAction.model_json_schema(),
    }
async def baseline() -> Dict[str, Any]:
    """Return baseline scores, running live inference if credentials are available.

    When ``API_BASE_URL``, ``MODEL_NAME`` and ``HF_TOKEN`` are all set to
    non-empty values, ``run_baseline()`` is called and its result is returned
    and cached in ``baseline_scores.json`` (located two directory levels above
    this file's resolved path). If the credentials are missing or the live run
    fails, the cached file is served instead.

    Returns:
        The live scores, the cached scores, or -- when neither is available --
        a dict with an ``"error"`` message and an empty ``"tasks"`` mapping.
    """
    # NOTE(review): no route decorator or registration call for this handler
    # is visible in this file -- confirm it is actually attached to ``app``.
    baseline_path = Path(__file__).resolve().parent.parent / "baseline_scores.json"
    has_creds = all(os.getenv(v) for v in ("API_BASE_URL", "MODEL_NAME", "HF_TOKEN"))

    if has_creds:
        try:
            try:
                from ..inference import run_baseline
            except ImportError:
                from inference import run_baseline
            # NOTE(review): this is a synchronous call inside an async
            # handler; if it performs network I/O it will hold the event loop
            # for its whole duration -- confirm that is acceptable.
            scores = run_baseline()
            # Serialise BEFORE opening the file: scores that cannot be encoded
            # must not truncate or corrupt an existing cache.
            payload = json.dumps(scores, indent=2)
        except Exception as e:
            # Best-effort by design: any live failure falls back to the cache.
            logger.warning("Live baseline failed, falling back to cached: %s", e)
        else:
            # A failed cache write (e.g. read-only filesystem) must not throw
            # away scores that were just computed successfully.
            try:
                baseline_path.write_text(payload, encoding="utf-8")
            except OSError as e:
                logger.warning("Could not cache baseline scores at %s: %s", baseline_path, e)
            return scores

    # EAFP: open directly instead of exists()-then-open, which is race-prone.
    try:
        with open(baseline_path, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        pass
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError: an
        # unreadable cache is treated the same as a missing one.
        logger.warning("Cached baseline at %s is unreadable: %s", baseline_path, e)

    return {
        "error": "No baseline scores available. Set API_BASE_URL, MODEL_NAME, and HF_TOKEN to run live inference.",
        "tasks": {},
    }
async def grader() -> Dict[str, Any]:
    """Report the grader outcome of the most recently completed episode.

    Returns:
        Whatever ``get_last_grader_result()`` yields when it is truthy;
        otherwise a dict carrying an ``"error"`` message and a ``None``
        ``"grader_score"``.
    """
    # NOTE(review): no route decorator or registration call for this handler
    # is visible in this file -- confirm it is actually attached to ``app``.
    latest = get_last_grader_result()
    if latest:
        return latest

    # Nothing recorded yet (or an empty result): tell the caller how to get one.
    return {
        "error": "No completed episode. Run an episode via WebSocket or the /investigate UI.",
        "grader_score": None,
    }
def main(host: str = "0.0.0.0", port: int = 8000) -> None:
    """Entry point for direct execution.

    Serves the module-level ``app`` with uvicorn. Calling ``main()`` with no
    arguments behaves exactly as before (all interfaces, port 8000).

    Args:
        host: Interface address passed to ``uvicorn.run``.
        port: TCP port passed to ``uvicorn.run``.
    """
    # Imported here so that importing this module does not require uvicorn
    # to be loaded unless the server is actually started this way.
    import uvicorn

    uvicorn.run(app, host=host, port=port)


if __name__ == "__main__":
    main()