import os import sys import json import time import glob import importlib.util import random import asyncio from typing import Optional, List from openai import AsyncOpenAI import uvicorn from fastapi import FastAPI, Request, HTTPException, WebSocket, WebSocketDisconnect from fastapi.responses import HTMLResponse, RedirectResponse from pydantic import BaseModel from openenv.core.env_server import create_app # ============================================================================== # PHASE 1: NATIVE ENGINE BRIDGE & DYNAMIC LOADING # ============================================================================== _CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) _ROOT_DIR = os.path.abspath(os.path.join(_CURRENT_DIR, "..")) if _ROOT_DIR not in sys.path: sys.path.insert(0, _ROOT_DIR) if _CURRENT_DIR not in sys.path: sys.path.insert(0, _CURRENT_DIR) # Always define this as a safe global so /dashboard/state never throws NameError # even when NATIVE_VERIFIED is False (C++ binary missing). active_env_instance = None try: from server.fin_auditor_environment import FinAuditorEnvironment, hft_auditor from models import AuditorAction, AuditorObservation from tasks import task1_easy, task2_medium, task3_hard # Graders — used by the /grader evaluation endpoint from graders.grader_detection import EasyDetectionGrader from graders.grader_classification import MediumClassificationGrader from graders.grader_fix import HardFixGrader # Instantiate graders once at module level so they accumulate state _grader_easy = EasyDetectionGrader() _grader_medium = MediumClassificationGrader() _grader_hard = HardFixGrader() HAS_ENV = True NATIVE_VERIFIED = hft_auditor is not None hft_mod = hft_auditor except ImportError as e: HAS_ENV = False NATIVE_VERIFIED = False hft_mod = None _grader_easy = None _grader_medium = None _grader_hard = None print(f"\n[CRITICAL WARNING] Could not import dependencies. Running in fallback UI mode.") print(f"Exact Error: {e}\n") # ============================================================================== # PHASE 2: SYSTEM STATE & AUTHORITY TRACKING # ============================================================================== if HAS_ENV and NATIVE_VERIFIED: # ── Dashboard singleton ──────────────────────────────────────────────────── # Used ONLY by /dashboard/* endpoints and /ws/telemetry for live telemetry. # NEVER passed to create_app — OpenEnv gets a factory that produces isolated # instances per session so close() cannot corrupt the dashboard engine. active_env_instance = FinAuditorEnvironment() # Pre-load the first batch so the dashboard has real data immediately. # (reset() calls generate_batch internally, so just call reset here.) active_env_instance.reset() # ── OpenEnv factory ──────────────────────────────────────────────────────── # CRITICAL: OpenEnv's WebSocket server creates ONE env per session via # env_factory() and then sends reset + step messages to the SAME instance. # This means reset() IS called before step(), so the C++ engine has data. # For HTTP mode (stateless), each request gets its own env — step() is called # on a cold engine, but our __init__ initialises counters to 0 so it won't # crash; it will just return the floor reward of 0.01 (acceptable for Phase 2). def env_factory() -> FinAuditorEnvironment: """Create a fresh, self-contained FinAuditorEnvironment per OpenEnv session.""" return FinAuditorEnvironment() # NOTE: create_app() has no `tasks=` parameter in openenv-core >= 0.2.x. # Task routing (easy/medium/hard difficulty) is handled inside reset() via # the task_id kwarg that Phase 2 injects into the reset message body. app = create_app( env_factory, AuditorAction, AuditorObservation, ) @app.get("/health") async def health_check(): """Liveness probe for HF Space / Docker.""" return {"status": "healthy", "engine": "NATIVE_ACTIVE" if NATIVE_VERIFIED else "MOCK"} else: # Fallback for local development without the C++ binary app = FastAPI(title="PayGorn (MOCK MODE)") @app.post("/reset") async def mock_reset(): return {"reward": 0.01} @app.post("/step") async def mock_step(action: dict): return {"reward": 0.5, "done": False, "step_count": 0} # Initialize metrics for the dashboard latency middleware app_metrics = {"last_step_latency_us": 0.0} # ── Auto-bootstrap on startup ──────────────────────────────────────────────── @app.on_event("startup") async def auto_bootstrap(): """Auto-authenticate with HF_TOKEN and initialize engine on boot.""" token = os.getenv("HF_TOKEN", "") if token: try: from openai import AsyncOpenAI client = AsyncOpenAI(base_url="https://router.huggingface.co/v1", api_key=token, max_retries=2) try: response = await client.models.list() model_list = [m.id for m in response.data] except Exception: model_list = ["meta-llama/Meta-Llama-3-8B-Instruct"] llm_session["api_key"] = token llm_session["base_url"] = "https://router.huggingface.co/v1" llm_session["available_models"] = model_list if model_list: llm_session["model_name"] = model_list[0] system_health["key_validated"] = True system_health["model_detected"] = len(model_list) > 0 system_health["connected"] = True print(f"[BOOT] Auto-authenticated with HF_TOKEN. {len(model_list)} models discovered.") except Exception as e: print(f"[BOOT] Auto-auth failed: {e}") else: print("[BOOT] No HF_TOKEN found. Manual authentication required.") @app.middleware("http") async def capture_step_latency(request: Request, call_next): if request.url.path in ("/step", "/dashboard/step"): start_ns = time.perf_counter_ns() response = await call_next(request) app_metrics["last_step_latency_us"] = (time.perf_counter_ns() - start_ns) / 1000.0 return response return await call_next(request) @app.get("/", include_in_schema=False) async def root_redirect(): return RedirectResponse(url="/web") system_health = { "so_found": NATIVE_VERIFIED, "key_validated": False, "model_detected": False, "connected": False, } llm_session = { "base_url": "https://router.huggingface.co/v1", "api_key": os.getenv("HF_TOKEN", ""), "model_name": "", "available_models": [] } class LLMConfig(BaseModel): api_key: str = "" model_name: Optional[str] = None base_url: Optional[str] = None class DifficultyConfig(BaseModel): level: str class ActionRequest(BaseModel): action_type: str async def execute_llm_step(api_key: str, base_url: str, model_name: str, batch_size: int = 40) -> list[int]: client = AsyncOpenAI(api_key=api_key, base_url=base_url, max_retries=1) prompt = ( "You are a High-Frequency Trading Risk Compliance Auditor.\n" f"You are processing a microsecond batch of {batch_size} trades.\n" "Rule 1: A False Negative (missing fraud) is catastrophic.\n" "Rule 2: A False Positive (flagging a valid trade) is a minor penalty.\n" f"Based on standard HFT anomaly distributions, output exactly {batch_size} binary decisions.\n" "0 = VALID, 1 = ANOMALY.\n" "Respond ONLY in valid JSON format using this exact schema: {\"decisions\": [int, int, ...]}" ) try: response = await client.chat.completions.create( model=model_name, messages=[ {"role": "system", "content": prompt}, {"role": "user", "content": "Analyze the telemetry stream and output the decision matrix."} ], response_format={"type": "json_object"}, temperature=0.2, max_tokens=2000 ) result = json.loads(response.choices[0].message.content) decisions = result.get("decisions", []) if not isinstance(decisions, list): decisions = [] if len(decisions) < batch_size: decisions += [random.choice([0, 1]) for _ in range(batch_size - len(decisions))] return decisions[:batch_size] except Exception as e: print(f"[LLM ROUTER ERROR] API execution failed: {e}") return [random.choice([0, 1]) for _ in range(batch_size)] # ============================================================================== # PHASE 3: API ENDPOINTS (The Data Pipeline) # ============================================================================== @app.get("/dashboard/state") async def get_state(): # FIX: Check the dynamically tracked instance instead of the old static one if not active_env_instance: return {"status": "FALLBACK_MOCK_MODE", "health": system_health, "accuracy": 0.0, "latency_us": 0.0, "throughput_m": 0.0, "buffer_saturation": 0.0, "active_count": 0, "total_ingested": 0, "step_count": 0, "difficulty": "EASY", "metrics": {"tp": 0, "tn": 0, "fp": 0, "fn": 0}} tp = getattr(active_env_instance.state, 'last_tp', 0) fp = getattr(active_env_instance.state, 'last_fp', 0) tn = getattr(active_env_instance.state, 'last_tn', 0) fn = getattr(active_env_instance.state, 'last_fn', 0) accuracy = tp / (tp + fp) if (tp + fp) > 0 else 0.0 latency_us = app_metrics["last_step_latency_us"] return { "status": "NATIVE_RECON_ONLINE", "health": system_health, "accuracy": round(accuracy, 4), "latency_us": round(latency_us, 3), "latency_source": "grounded_app_middleware", "throughput_m": round((40 * 1e6) / (latency_us * 1000), 2) if latency_us > 0 else 0.0, "active_count": active_env_instance.engine.last_expired_count, "total_ingested": active_env_instance.engine.total_ingested, "ring_buffer_size": active_env_instance.engine.ring_buffer_size, "buffer_saturation": min(100.0, (active_env_instance.engine.last_expired_count / 100.0) * 100), "step_count": active_env_instance.state.step_count, "metrics": {"tp": tp, "tn": tn, "fp": fp, "fn": fn}, "difficulty": getattr(active_env_instance, 'difficulty', "EASY") } @app.post("/dashboard/get_action") async def get_dashboard_action(req: ActionRequest): batch_size = 40 if req.action_type == "perfect": decisions = [1] * batch_size elif req.action_type == "llm": api_key = llm_session.get("api_key") base_url = llm_session.get("base_url") model_name = llm_session.get("model_name") if not model_name and llm_session.get("available_models"): model_name = llm_session["available_models"][0] if not api_key or not model_name: raise HTTPException(status_code=400, detail="LLM Provider not authenticated. Please inject an API key.") decisions = await execute_llm_step(api_key, base_url, model_name, batch_size) else: decisions = [random.choice([0, 1]) for _ in range(batch_size)] return {"decisions": decisions} class DashboardStepRequest(BaseModel): """Action payload for the dashboard-native step endpoint.""" decisions: List[int] @app.post("/dashboard/step") async def dashboard_step(req: DashboardStepRequest): """ Dashboard-native step: runs on the SINGLETON engine (warm, with real trade data), not on the OpenEnv /step route which creates a cold engine per request. This is what the three dashboard buttons (OPTIMAL / STRESS / LLM) call so that rewards reflect actual confusion-matrix scoring rather than the 0.01 floor. """ if not active_env_instance or not NATIVE_VERIFIED: raise HTTPException(status_code=503, detail="Native engine not available") from models import AuditorAction action = AuditorAction(decisions=req.decisions) obs = active_env_instance.step(action) return { "reward": obs.reward, "done": obs.done, "step_count": active_env_instance.state.step_count, "features_shape": [len(obs.features), len(obs.features[0]) if obs.features else 0], } @app.post("/dashboard/reset") async def dashboard_reset(): """ Reset the dashboard singleton: re-seeds the ring buffer with fresh trade data. Called by the [FLUSH_SPSC_BUFFER] button in the dashboard JS. """ if not active_env_instance or not NATIVE_VERIFIED: raise HTTPException(status_code=503, detail="Native engine not available") active_env_instance.reset() return {"status": "ok", "step_count": active_env_instance.state.step_count} @app.post("/config/llm") async def config_llm(cfg: LLMConfig): api_key = cfg.api_key if not api_key: api_key = llm_session.get("api_key") if not api_key: raise HTTPException(status_code=400, detail="API Key cannot be blank") base_url = "https://router.huggingface.co/v1" if api_key.startswith("AIza"): base_url = "https://generativelanguage.googleapis.com/v1beta/openai/" elif api_key.startswith("sk-ant"): base_url = "https://api.anthropic.com/v1" try: client = AsyncOpenAI(base_url=base_url, api_key=api_key, max_retries=2) try: response = await client.models.list() model_list = [m.id for m in response.data] except Exception as e: if "googleapis" in base_url: model_list = ["gemini-1.5-flash", "gemini-1.5-pro", "gemini-2.0-flash"] else: raise e llm_session["api_key"] = api_key llm_session["base_url"] = base_url llm_session["available_models"] = model_list system_health["key_validated"] = True system_health["model_detected"] = len(model_list) > 0 if cfg.model_name and cfg.model_name in model_list: llm_session["model_name"] = cfg.model_name os.environ["MODEL_NAME"] = cfg.model_name os.environ["LLM_API_KEY"] = api_key os.environ["API_BASE_URL"] = base_url system_health["connected"] = True msg = f"Injected {cfg.model_name} credentials." else: msg = "Key validated. Models discovered via OpenAI spec." return {"status": "success", "models": model_list, "message": msg} except Exception as e: system_health["key_validated"] = False system_health["model_detected"] = False system_health["connected"] = False return {"status": "error", "message": f"Validation failed: {str(e)}"} @app.post("/config/default") async def config_default(): token = os.getenv("HF_TOKEN") if not token: return {"status": "error", "message": "No HF_TOKEN found in system environment"} return await config_llm(LLMConfig(api_key=token, base_url="https://router.huggingface.co/v1")) @app.post("/config/difficulty") async def set_difficulty(cfg: DifficultyConfig): if active_env_instance and NATIVE_VERIFIED: os.environ["TASK_ID"] = cfg.level.lower() if "easy" in cfg.level.lower(): active_env_instance.difficulty = hft_mod.Difficulty.EASY elif "medium" in cfg.level.lower(): active_env_instance.difficulty = hft_mod.Difficulty.MEDIUM else: active_env_instance.difficulty = hft_mod.Difficulty.HARD return {"status": "success", "difficulty": cfg.level} return {"status": "error", "message": "Engine not loaded"} # ============================================================================== # GRADER ENDPOINT — Required by the competition evaluator # The evaluator calls GET /grader to discover tasks and retrieve their scores. # Each score MUST be strictly between 0.0 and 1.0 (exclusive). # ============================================================================== _GRADER_SCORE_FLOOR = 0.01 # returned when no episode data is available _GRADER_SCORE_CAP = 0.99 def _safe_grade(grader, state) -> float: """Run grader.grade() and enforce the (0, 1) exclusive boundary contract.""" try: raw = grader.grade(state=state) except Exception: raw = _GRADER_SCORE_FLOOR return float(max(_GRADER_SCORE_FLOOR, min(_GRADER_SCORE_CAP, raw))) @app.get("/grader") async def get_grader(): """ Competition evaluation endpoint. Returns the per-task grader names and their current scores so the hackathon evaluator can verify: 1. At least 3 tasks have custom graders (not 'reward'). 2. Every score is strictly in the open interval (0, 1). """ # Use the live singleton state when available; fall back to floor score. state = active_env_instance.state if active_env_instance else None easy_score = _safe_grade(_grader_easy, state) if _grader_easy else _GRADER_SCORE_FLOOR medium_score = _safe_grade(_grader_medium, state) if _grader_medium else _GRADER_SCORE_FLOOR hard_score = _safe_grade(_grader_hard, state) if _grader_hard else _GRADER_SCORE_FLOOR return { "tasks": [ { "task_id": "anomaly_detection_easy", "grader": "graders.grader_detection:EasyDetectionGrader", "score": round(easy_score, 4), "max_steps": 5, "difficulty": "easy", }, { "task_id": "anomaly_detection_medium", "grader": "graders.grader_classification:MediumClassificationGrader", "score": round(medium_score, 4), "max_steps": 10, "difficulty": "medium", }, { "task_id": "anomaly_detection_hard", "grader": "graders.grader_fix:HardFixGrader", "score": round(hard_score, 4), "max_steps": 20, "difficulty": "hard", }, ], "grader_count": 3, "all_scores_valid": all( 0.0 < s < 1.0 for s in [easy_score, medium_score, hard_score] ), } @app.websocket("/ws/telemetry") async def websocket_telemetry(websocket: WebSocket): await websocket.accept() try: while True: if active_env_instance and NATIVE_VERIFIED: data = { "active_count": active_env_instance.engine.active_count, "total_ingested": active_env_instance.engine.total_ingested, "ring_buffer_size": active_env_instance.engine.ring_buffer_size, "pool_capacity": active_env_instance.engine.pool_capacity, "latency_us": round(app_metrics["last_step_latency_us"], 3), "status": "NATIVE_ACTIVE" } else: data = {"status": "MOCK_FALLBACK", "latency_us": 0.0} await websocket.send_json(data) await asyncio.sleep(0.5) except WebSocketDisconnect: pass # ============================================================================== # PHASE 4: THE JUDGE-AUTHORITY DASHBOARD # ============================================================================== @app.get("/web", response_class=HTMLResponse) async def root_dashboard(): html_content = """ PayGorn | HFT Command Center
> SPSC_RING_BUFFER: ACTIVE
SYSTEM: INITIALIZING...

HFT Audit Command Center

An OpenEnv-compliant RL environment simulating high-frequency adversarial trading. Built on a native C++20 lock-free ring buffer, it maps microsecond telemetry directly to the agent's matrix for deterministic, zero-copy evaluation.

ENGINE_LATENCY
0.000μs
RECON_INTEGRITY (ACC)
0.00ratio
THROUGHPUT
0.00M/s
BUFFER_SAT
0.0%
ACTIVE_TRADES
0
TOTAL_LOGS
0
Execution Ledger
STEP OBS_STATE ACTION REWARD DONE
CLASSIFICATION_PULSE
TPTNFPFN
Raw Telemetry Stream
[SYS] Boot sequence initialized...
[SYS] Bridging C++ Memory Pool...
""" return HTMLResponse(content=html_content) def main(): port = int(os.getenv("PORT", 7860)) uvicorn.run(app, host="0.0.0.0", port=port) if __name__ == "__main__": main()