import os
import sys
import json
import time
import glob
import importlib.util
import random
import asyncio
from typing import Optional, List
from openai import AsyncOpenAI
import uvicorn
from fastapi import FastAPI, Request, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse, RedirectResponse
from pydantic import BaseModel
from openenv.core.env_server import create_app
# ==============================================================================
# PHASE 1: NATIVE ENGINE BRIDGE & DYNAMIC LOADING
# ==============================================================================
# Resolve this file's directory and its parent so the project packages this
# file imports (server, models, tasks, graders) are importable regardless of
# the process working directory.
_CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
_ROOT_DIR = os.path.abspath(os.path.join(_CURRENT_DIR, ".."))
# Each missing entry is pushed to the front of sys.path; handling the root
# first means the current directory ends up ahead of it when both are added.
for _search_dir in (_ROOT_DIR, _CURRENT_DIR):
    if _search_dir not in sys.path:
        sys.path.insert(0, _search_dir)
del _search_dir
# Module-level default so /dashboard/state (and the other dashboard endpoints)
# can test it even when NATIVE_VERIFIED is False (C++ binary missing).
active_env_instance = None
try:
    from server.fin_auditor_environment import FinAuditorEnvironment, hft_auditor
    from models import AuditorAction, AuditorObservation
    from tasks import task1_easy, task2_medium, task3_hard

    # Graders back the /grader evaluation endpoint.
    from graders.grader_detection import EasyDetectionGrader
    from graders.grader_classification import MediumClassificationGrader
    from graders.grader_fix import HardFixGrader

    # One grader per task, created once at import time and reused by /grader.
    _grader_easy = EasyDetectionGrader()
    _grader_medium = MediumClassificationGrader()
    _grader_hard = HardFixGrader()

    HAS_ENV = True
    # The environment module exposes hft_auditor as None when the native
    # binding is unavailable; that is what gates the native code paths.
    NATIVE_VERIFIED = hft_auditor is not None
    hft_mod = hft_auditor
except ImportError as e:
    HAS_ENV = False
    NATIVE_VERIFIED = False
    hft_mod = None
    _grader_easy = _grader_medium = _grader_hard = None
    print("\n[CRITICAL WARNING] Could not import dependencies. Running in fallback UI mode.")
    print(f"Exact Error: {e}\n")
# ==============================================================================
# PHASE 2: SYSTEM STATE & AUTHORITY TRACKING
# ==============================================================================
if HAS_ENV and NATIVE_VERIFIED:
    # ── Dashboard singleton ────────────────────────────────────────────────────
    # Used ONLY by /dashboard/* endpoints and /ws/telemetry for live telemetry.
    # NEVER passed to create_app — OpenEnv gets a factory that produces isolated
    # instances per session so close() cannot corrupt the dashboard engine.
    active_env_instance = FinAuditorEnvironment()
    # Pre-load the first batch so the dashboard has real data immediately.
    # (reset() calls generate_batch internally, so just call reset here.)
    active_env_instance.reset()

    # ── OpenEnv factory ────────────────────────────────────────────────────────
    # CRITICAL: OpenEnv's WebSocket server creates ONE env per session via
    # env_factory() and then sends reset + step messages to the SAME instance.
    # This means reset() IS called before step(), so the C++ engine has data.
    # For HTTP mode (stateless), each request gets its own env — step() is called
    # on a cold engine, but our __init__ initialises counters to 0 so it won't
    # crash; it will just return the floor reward of 0.01 (acceptable for Phase 2).
    def env_factory() -> FinAuditorEnvironment:
        """Create a fresh, self-contained FinAuditorEnvironment per OpenEnv session."""
        return FinAuditorEnvironment()

    # NOTE: create_app() has no `tasks=` parameter in openenv-core >= 0.2.x.
    # Task routing (easy/medium/hard difficulty) is handled inside reset() via
    # the task_id kwarg that Phase 2 injects into the reset message body.
    app = create_app(
        env_factory,
        AuditorAction,
        AuditorObservation,
    )
else:
    # Fallback for local development without the C++ binary: minimal stand-ins
    # for the OpenEnv /reset and /step routes that return fixed payloads.
    app = FastAPI(title="PayGorn (MOCK MODE)")

    @app.post("/reset")
    async def mock_reset():
        """Mock reset: always reports the floor reward."""
        return {"reward": 0.01}

    @app.post("/step")
    async def mock_step(action: dict):
        """Mock step: ignores the action and returns a fixed, non-terminal result."""
        return {"reward": 0.5, "done": False, "step_count": 0}


# Registered after the branch so the liveness probe answers in BOTH native and
# mock mode (previously it existed only in native mode, which made its "MOCK"
# branch unreachable and left the mock app without a health route).
@app.get("/health")
async def health_check():
    """Liveness probe for HF Space / Docker; `engine` tells which backend is serving."""
    return {"status": "healthy", "engine": "NATIVE_ACTIVE" if NATIVE_VERIFIED else "MOCK"}
# Shared metrics: written by the HTTP latency middleware, read by the
# dashboard state and telemetry endpoints.
app_metrics = dict(last_step_latency_us=0.0)
# ── Auto-bootstrap on startup ────────────────────────────────────────────────
@app.on_event("startup")
async def auto_bootstrap():
    """
    Pre-configure the LLM session from ``HF_TOKEN`` when the server starts.

    If ``HF_TOKEN`` is set, the Hugging Face router is asked for its model list
    and ``llm_session`` / ``system_health`` are populated, so the dashboard's LLM
    action works without manual setup. If listing fails, a single default model
    is assumed. In that case ``key_validated`` is set to False, because the
    token was never actually confirmed. Errors are printed, never raised, so
    startup is not blocked.
    """
    token = os.getenv("HF_TOKEN", "")
    if not token:
        print("[BOOT] No HF_TOKEN found. Manual authentication required.")
        return

    base_url = "https://router.huggingface.co/v1"
    try:
        listed_ok = True
        # Context manager closes the underlying HTTP client once we are done.
        async with AsyncOpenAI(base_url=base_url, api_key=token, max_retries=2) as client:
            try:
                response = await client.models.list()
                model_list = [m.id for m in response.data]
            except Exception:
                listed_ok = False
                model_list = ["meta-llama/Meta-Llama-3-8B-Instruct"]

        llm_session["api_key"] = token
        llm_session["base_url"] = base_url
        llm_session["available_models"] = model_list
        if model_list:
            llm_session["model_name"] = model_list[0]
        # Only claim the key is validated when the listing call succeeded.
        system_health["key_validated"] = listed_ok
        system_health["model_detected"] = len(model_list) > 0
        system_health["connected"] = True
        print(f"[BOOT] Auto-authenticated with HF_TOKEN. {len(model_list)} models discovered.")
    except Exception as e:
        print(f"[BOOT] Auto-auth failed: {e}")
@app.middleware("http")
async def capture_step_latency(request: Request, call_next):
    """
    Time requests to /step and /dashboard/step.

    The elapsed wall time of such a request is stored in microseconds under
    app_metrics["last_step_latency_us"]; all other paths pass straight through.
    """
    if request.url.path not in ("/step", "/dashboard/step"):
        return await call_next(request)

    t0 = time.perf_counter_ns()
    response = await call_next(request)
    elapsed_ns = time.perf_counter_ns() - t0
    app_metrics["last_step_latency_us"] = elapsed_ns / 1000.0
    return response
@app.get("/", include_in_schema=False)
async def root_redirect():
    """Send visitors of the bare root URL to the dashboard page at /web."""
    target = "/web"
    return RedirectResponse(url=target)
# Status flags reported to the dashboard. The LLM-related flags are updated by
# the startup bootstrap and the /config/llm endpoint.
system_health = dict(
    so_found=NATIVE_VERIFIED,
    key_validated=False,
    model_detected=False,
    connected=False,
)
# Mutable LLM provider settings shared by the config endpoints and the
# dashboard's LLM action; starts on the Hugging Face router with HF_TOKEN (if any).
llm_session = dict(
    base_url="https://router.huggingface.co/v1",
    api_key=os.getenv("HF_TOKEN", ""),
    model_name="",
    available_models=[],
)
class LLMConfig(BaseModel):
    """Request body for POST /config/llm (also built internally by /config/default)."""

    # Provider key; an empty string means "reuse the key already in llm_session".
    api_key: str = ""
    # Model to activate; applied only when it appears in the discovered model list.
    model_name: Optional[str] = None
    # Optional OpenAI-compatible endpoint URL.
    base_url: Optional[str] = None


class DifficultyConfig(BaseModel):
    """Request body for POST /config/difficulty (matched case-insensitively)."""

    level: str


class ActionRequest(BaseModel):
    """Request body for POST /dashboard/get_action ("perfect", "llm", or anything else for random)."""

    action_type: str
async def execute_llm_step(api_key: str, base_url: str, model_name: str, batch_size: int = 40) -> list[int]:
    """
    Ask an OpenAI-compatible chat model for ``batch_size`` binary trade decisions.

    NOTE(review): the request carries only the system prompt and a fixed user
    message. No trade features are sent, so the model is answering from the
    prompt alone.

    Args:
        api_key: Provider API key.
        base_url: OpenAI-compatible endpoint URL.
        model_name: Model id to call.
        batch_size: Number of decisions to return.

    Returns:
        Exactly ``batch_size`` ints, each 0 (VALID) or 1 (ANOMALY).
        - Missing entries are filled with random 0/1 choices.
        - Entries that are not 0/1 (accepted forms: ints, bools, floats, or the
          strings "0"/"1") are replaced in place with random 0/1 choices, so
          positions stay aligned with the batch.
        - Any client, API, or JSON error yields a fully random batch instead of
          raising.
    """
    prompt = (
        "You are a High-Frequency Trading Risk Compliance Auditor.\n"
        f"You are processing a microsecond batch of {batch_size} trades.\n"
        "Rule 1: A False Negative (missing fraud) is catastrophic.\n"
        "Rule 2: A False Positive (flagging a valid trade) is a minor penalty.\n"
        f"Based on standard HFT anomaly distributions, output exactly {batch_size} binary decisions.\n"
        "0 = VALID, 1 = ANOMALY.\n"
        "Respond ONLY in valid JSON format using this exact schema: {\"decisions\": [int, int, ...]}"
    )
    try:
        # Client creation is inside the try so a misconfigured client also falls
        # back to random decisions; the context manager closes its HTTP pool.
        async with AsyncOpenAI(api_key=api_key, base_url=base_url, max_retries=1) as client:
            response = await client.chat.completions.create(
                model=model_name,
                messages=[
                    {"role": "system", "content": prompt},
                    {"role": "user", "content": "Analyze the telemetry stream and output the decision matrix."}
                ],
                response_format={"type": "json_object"},
                temperature=0.2,
                max_tokens=2000
            )
        result = json.loads(response.choices[0].message.content)
        raw = result.get("decisions", []) if isinstance(result, dict) else []
        if not isinstance(raw, list):
            raw = []

        decisions = []
        for value in raw[:batch_size]:
            # Models sometimes quote numbers; accept "0"/"1" as well.
            if isinstance(value, str) and value.strip() in ("0", "1"):
                value = int(value.strip())
            if isinstance(value, (int, float)) and value in (0, 1):
                decisions.append(int(value))
            else:
                decisions.append(random.choice([0, 1]))
        # Pad a short answer so the batch always has the requested length.
        decisions += [random.choice([0, 1]) for _ in range(batch_size - len(decisions))]
        return decisions
    except Exception as e:
        print(f"[LLM ROUTER ERROR] API execution failed: {e}")
        return [random.choice([0, 1]) for _ in range(batch_size)]
# ==============================================================================
# PHASE 3: API ENDPOINTS (The Data Pipeline)
# ==============================================================================
@app.get("/dashboard/state")
async def get_state():
    """
    Return a JSON snapshot of the dashboard singleton for the polling UI.

    When no engine instance exists, a fixed placeholder payload with
    ``status == "FALLBACK_MOCK_MODE"`` is returned.

    Metrics:
        accuracy      (tp + tn) / (tp + tn + fp + fn) from the last step's
                      confusion counts; 0.0 when there are none.
        precision     tp / (tp + fp); 0.0 when nothing was flagged.
        latency_us    Wall time of the last /step or /dashboard/step request,
                      as measured by the HTTP middleware.
        throughput_m  Millions of trades per second, assuming 40-trade batches.
        difficulty    Name of the environment's difficulty (enum name or string).
    """
    if not active_env_instance:
        return {
            "status": "FALLBACK_MOCK_MODE",
            "health": system_health,
            "accuracy": 0.0,
            "precision": 0.0,
            "latency_us": 0.0,
            "throughput_m": 0.0,
            "buffer_saturation": 0.0,
            "active_count": 0,
            "total_ingested": 0,
            "step_count": 0,
            "difficulty": "EASY",
            "metrics": {"tp": 0, "tn": 0, "fp": 0, "fn": 0},
        }

    env_state = active_env_instance.state
    engine = active_env_instance.engine
    # Confusion counts of the most recent step; 0 if the state does not expose them.
    tp = getattr(env_state, "last_tp", 0)
    fp = getattr(env_state, "last_fp", 0)
    tn = getattr(env_state, "last_tn", 0)
    fn = getattr(env_state, "last_fn", 0)

    # Previously "accuracy" was tp / (tp + fp), which is precision.
    total = tp + tn + fp + fn
    accuracy = (tp + tn) / total if total > 0 else 0.0
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0

    latency_us = app_metrics["last_step_latency_us"]
    batch_size = 40  # trades per dashboard step, matching /dashboard/get_action
    # trades/s = batch / (latency_us * 1e-6); in millions that is batch / latency_us.
    throughput_m = batch_size / latency_us if latency_us > 0 else 0.0

    difficulty = getattr(active_env_instance, "difficulty", "EASY")
    # The native Difficulty enum may not be JSON-serialisable; report its name.
    # Plain strings have no .name and pass through unchanged.
    difficulty = getattr(difficulty, "name", difficulty)

    return {
        "status": "NATIVE_RECON_ONLINE",
        "health": system_health,
        "accuracy": round(accuracy, 4),
        "precision": round(precision, 4),
        "latency_us": round(latency_us, 3),
        "latency_source": "grounded_app_middleware",
        "throughput_m": round(throughput_m, 2),
        # NOTE(review): /ws/telemetry reports engine.active_count under the same
        # key; confirm which counter is intended here.
        "active_count": engine.last_expired_count,
        "total_ingested": engine.total_ingested,
        "ring_buffer_size": engine.ring_buffer_size,
        # NOTE(review): this reduces to min(100, last_expired_count), i.e. it treats
        # 100 as capacity rather than ring_buffer_size — confirm intent.
        "buffer_saturation": min(100.0, (engine.last_expired_count / 100.0) * 100),
        "step_count": env_state.step_count,
        "metrics": {"tp": tp, "tn": tn, "fp": fp, "fn": fn},
        "difficulty": difficulty,
    }
@app.post("/dashboard/get_action")
async def get_dashboard_action(req: ActionRequest):
    """
    Produce a 40-element list of 0/1 decisions for the dashboard buttons.

    action_type:
        "perfect" -> every trade flagged (all 1s)
        "llm"     -> decisions from the configured LLM via execute_llm_step
        other     -> uniformly random 0/1 decisions

    Raises:
        HTTPException(400): for "llm" when no API key or model is configured.
    """
    batch_size = 40
    mode = req.action_type

    if mode == "perfect":
        return {"decisions": [1] * batch_size}

    if mode != "llm":
        return {"decisions": [random.choice([0, 1]) for _ in range(batch_size)]}

    # Prefer the explicitly chosen model; otherwise the first discovered one.
    model_name = llm_session.get("model_name")
    if not model_name and llm_session.get("available_models"):
        model_name = llm_session["available_models"][0]
    api_key = llm_session.get("api_key")

    if not (api_key and model_name):
        raise HTTPException(status_code=400, detail="LLM Provider not authenticated. Please inject an API key.")

    llm_decisions = await execute_llm_step(api_key, llm_session.get("base_url"), model_name, batch_size)
    return {"decisions": llm_decisions}
class DashboardStepRequest(BaseModel):
    """Action payload for the dashboard-native step endpoint.

    ``decisions`` is expected to hold one 0 (valid) / 1 (anomaly) value per
    trade, as produced by /dashboard/get_action; the range is not checked here.
    """

    decisions: List[int]


@app.post("/dashboard/step")
async def dashboard_step(req: DashboardStepRequest):
    """
    Dashboard-native step: runs on the SINGLETON engine (warm, with real trade data),
    not on the OpenEnv /step route which creates a cold engine per request.
    This is what the three dashboard buttons (OPTIMAL / STRESS / LLM) call so that
    rewards reflect actual confusion-matrix scoring rather than the 0.01 floor.

    Returns:
        reward, done, step_count, and ``features_shape`` as [rows, cols] of the
        observation features ([0, 0] when there are none).

    Raises:
        HTTPException(503): when the native engine is not loaded.
    """
    if not active_env_instance or not NATIVE_VERIFIED:
        raise HTTPException(status_code=503, detail="Native engine not available")
    # AuditorAction comes from the guarded module-level import: the singleton
    # only exists when that import succeeded, so no local re-import is needed.
    obs = active_env_instance.step(AuditorAction(decisions=req.decisions))

    # Avoid `if obs.features`: it fails for None and is ambiguous for array-likes.
    features = obs.features if obs.features is not None else []
    n_rows = len(features)
    n_cols = len(features[0]) if n_rows else 0
    return {
        "reward": obs.reward,
        "done": obs.done,
        "step_count": active_env_instance.state.step_count,
        "features_shape": [n_rows, n_cols],
    }
@app.post("/dashboard/reset")
async def dashboard_reset():
    """
    Reset the dashboard singleton, re-seeding the ring buffer with fresh trade data.

    Called by the [FLUSH_SPSC_BUFFER] button in the dashboard JS.

    Raises:
        HTTPException(503): when the native engine is not loaded.
    """
    if not (active_env_instance and NATIVE_VERIFIED):
        raise HTTPException(status_code=503, detail="Native engine not available")
    env = active_env_instance
    env.reset()
    return {"status": "ok", "step_count": env.state.step_count}
@app.post("/config/llm")
async def config_llm(cfg: LLMConfig):
    """
    Validate an LLM provider key and store it in the shared ``llm_session``.

    Key:
        ``cfg.api_key`` (whitespace-trimmed), falling back to the key already
        stored in ``llm_session``.

    Endpoint:
        ``cfg.base_url`` when given. Otherwise it is inferred from the key
        prefix: ``AIza`` -> Google's OpenAI-compatible endpoint, ``sk-ant`` ->
        Anthropic, anything else -> the Hugging Face router.

    Validation:
        The key is checked by listing models. For Google endpoints a fixed
        Gemini list is used if listing fails.

    On success:
        - ``llm_session`` and ``system_health`` are updated.
        - A previously selected model that is not offered by the new provider
          is cleared, so callers fall back to the first discovered model.
        - If ``cfg.model_name`` is among the discovered models, it becomes
          active and is exported to MODEL_NAME / LLM_API_KEY / API_BASE_URL.

    Returns:
        ``{"status": "success", "models": [...], "message": ...}`` or
        ``{"status": "error", "message": ...}`` (both with HTTP 200).

    Raises:
        HTTPException(400): no key supplied and none stored.
    """
    api_key = (cfg.api_key or "").strip() or llm_session.get("api_key")
    if not api_key:
        raise HTTPException(status_code=400, detail="API Key cannot be blank")

    # An explicit base_url wins; otherwise infer the provider from the key prefix.
    if cfg.base_url:
        base_url = cfg.base_url
    elif api_key.startswith("AIza"):
        base_url = "https://generativelanguage.googleapis.com/v1beta/openai/"
    elif api_key.startswith("sk-ant"):
        base_url = "https://api.anthropic.com/v1"
    else:
        base_url = "https://router.huggingface.co/v1"

    try:
        # Context manager closes the temporary client's HTTP connections.
        async with AsyncOpenAI(base_url=base_url, api_key=api_key, max_retries=2) as client:
            try:
                response = await client.models.list()
                model_list = [m.id for m in response.data]
            except Exception:
                if "googleapis" not in base_url:
                    raise
                model_list = ["gemini-1.5-flash", "gemini-1.5-pro", "gemini-2.0-flash"]

        llm_session["api_key"] = api_key
        llm_session["base_url"] = base_url
        llm_session["available_models"] = model_list
        # A model picked for a previous provider would be sent to the new
        # endpoint; drop it so get_action falls back to available_models[0].
        if llm_session.get("model_name") not in model_list:
            llm_session["model_name"] = ""
        system_health["key_validated"] = True
        system_health["model_detected"] = len(model_list) > 0

        if cfg.model_name and cfg.model_name in model_list:
            llm_session["model_name"] = cfg.model_name
            os.environ["MODEL_NAME"] = cfg.model_name
            os.environ["LLM_API_KEY"] = api_key
            os.environ["API_BASE_URL"] = base_url
            system_health["connected"] = True
            msg = f"Injected {cfg.model_name} credentials."
        else:
            msg = "Key validated. Models discovered via OpenAI spec."
        return {"status": "success", "models": model_list, "message": msg}
    except Exception as e:
        system_health["key_validated"] = False
        system_health["model_detected"] = False
        system_health["connected"] = False
        return {"status": "error", "message": f"Validation failed: {str(e)}"}
@app.post("/config/default")
async def config_default():
    """Run /config/llm with the HF_TOKEN from the process environment, if present."""
    hf_token = os.getenv("HF_TOKEN")
    if hf_token:
        default_cfg = LLMConfig(api_key=hf_token, base_url="https://router.huggingface.co/v1")
        return await config_llm(default_cfg)
    return {"status": "error", "message": "No HF_TOKEN found in system environment"}
@app.post("/config/difficulty")
async def set_difficulty(cfg: DifficultyConfig):
    """
    Set the dashboard singleton's difficulty and export it as TASK_ID.

    The level is matched case-insensitively by substring. A level containing
    "easy" maps to EASY, one containing "medium" maps to MEDIUM, and anything
    else maps to HARD. The response echoes the level exactly as received.
    """
    if not (active_env_instance and NATIVE_VERIFIED):
        return {"status": "error", "message": "Engine not loaded"}

    level = cfg.level.lower()
    os.environ["TASK_ID"] = level
    if "easy" in level:
        chosen = hft_mod.Difficulty.EASY
    elif "medium" in level:
        chosen = hft_mod.Difficulty.MEDIUM
    else:
        chosen = hft_mod.Difficulty.HARD
    active_env_instance.difficulty = chosen
    return {"status": "success", "difficulty": cfg.level}
# ==============================================================================
# GRADER ENDPOINT — Required by the competition evaluator
# The evaluator calls GET /grader to discover tasks and retrieve their scores.
# Each score MUST be strictly between 0.0 and 1.0 (exclusive).
# ==============================================================================
_GRADER_SCORE_FLOOR = 0.01  # returned when no episode data is available or grading fails
_GRADER_SCORE_CAP = 0.99  # highest reported score, keeping results strictly below 1.0


def _safe_grade(grader, state) -> float:
    """
    Run ``grader.grade(state=state)`` and force the result into [floor, cap].

    The evaluator requires every score to lie strictly inside (0, 1), so the
    raw grade is clamped to [_GRADER_SCORE_FLOOR, _GRADER_SCORE_CAP]. The
    floor score is returned when the grader:
        - raises;
        - returns something ``float()`` cannot convert (e.g. ``None``);
        - returns NaN.

    Args:
        grader: Any object exposing ``grade(state=...)``.
        state: Passed through unchanged; may be ``None``.

    Returns:
        A float in [_GRADER_SCORE_FLOOR, _GRADER_SCORE_CAP].
    """
    import math

    try:
        # Conversion is inside the try so a None/non-numeric grade cannot crash /grader.
        raw = float(grader.grade(state=state))
    except Exception:
        return _GRADER_SCORE_FLOOR
    if math.isnan(raw):
        # min()/max() do not order NaN; unchecked, it would come out as the cap.
        return _GRADER_SCORE_FLOOR
    return max(_GRADER_SCORE_FLOOR, min(_GRADER_SCORE_CAP, raw))
@app.get("/grader")
async def get_grader():
    """
    Competition evaluation endpoint.

    For each of the three tasks, reports the grader's import path and its
    current score. Scores come from the dashboard singleton's state, or are the
    floor score when a grader is unavailable. The response lets the evaluator
    verify:
        1. At least 3 tasks have custom graders (not 'reward').
        2. Every score is strictly in the open interval (0, 1).
    """
    state = None
    if active_env_instance:
        state = active_env_instance.state

    # (task_id, grader import path, grader instance, max_steps, difficulty)
    task_table = (
        ("anomaly_detection_easy", "graders.grader_detection:EasyDetectionGrader", _grader_easy, 5, "easy"),
        ("anomaly_detection_medium", "graders.grader_classification:MediumClassificationGrader", _grader_medium, 10, "medium"),
        ("anomaly_detection_hard", "graders.grader_fix:HardFixGrader", _grader_hard, 20, "hard"),
    )

    # Graded in table order (easy, medium, hard).
    scores = [
        _safe_grade(grader, state) if grader else _GRADER_SCORE_FLOOR
        for _, _, grader, _, _ in task_table
    ]
    tasks = [
        {
            "task_id": task_id,
            "grader": grader_path,
            "score": round(score, 4),
            "max_steps": max_steps,
            "difficulty": difficulty,
        }
        for (task_id, grader_path, _, max_steps, difficulty), score in zip(task_table, scores)
    ]
    return {
        "tasks": tasks,
        "grader_count": len(tasks),
        "all_scores_valid": all(0.0 < score < 1.0 for score in scores),
    }
@app.websocket("/ws/telemetry")
async def websocket_telemetry(websocket: WebSocket):
    """
    Push a telemetry frame to the client every 0.5 s until it disconnects.

    With the native engine loaded, a frame carries the engine counters and the
    last measured step latency; otherwise a fixed MOCK_FALLBACK frame is sent.
    The loop ends quietly on WebSocketDisconnect.
    """

    def current_frame() -> dict:
        if not (active_env_instance and NATIVE_VERIFIED):
            return {"status": "MOCK_FALLBACK", "latency_us": 0.0}
        engine = active_env_instance.engine
        return {
            "active_count": engine.active_count,
            "total_ingested": engine.total_ingested,
            "ring_buffer_size": engine.ring_buffer_size,
            "pool_capacity": engine.pool_capacity,
            "latency_us": round(app_metrics["last_step_latency_us"], 3),
            "status": "NATIVE_ACTIVE",
        }

    await websocket.accept()
    try:
        while True:
            await websocket.send_json(current_frame())
            await asyncio.sleep(0.5)
    except WebSocketDisconnect:
        pass
# ==============================================================================
# PHASE 4: THE JUDGE-AUTHORITY DASHBOARD
# ==============================================================================
@app.get("/web", response_class=HTMLResponse)
async def root_dashboard():
    """
    Serve the dashboard page at /web (the target of the "/" redirect).

    Returns:
        HTMLResponse whose body is the literal below.
    """
    # NOTE(review): as written, this literal is plain text with no HTML tags or
    # script. Browsers will render it unstyled, and nothing here polls
    # /dashboard/state, opens /ws/telemetry, or renders the FLUSH_SPSC_BUFFER
    # button that /dashboard/reset's docstring mentions. Confirm the full
    # markup was not lost.
    html_content = """
PayGorn | HFT Command Center
> SPSC_RING_BUFFER: ACTIVE
SYSTEM: INITIALIZING...
HFT Audit Command Center
An OpenEnv-compliant RL environment simulating high-frequency adversarial trading. Built on a native C++20 lock-free ring buffer, it maps microsecond telemetry directly to the agent's matrix for deterministic, zero-copy evaluation.
ENGINE_LATENCY
0.000μs
RECON_INTEGRITY (ACC)
0.00ratio
THROUGHPUT
0.00M/s
BUFFER_SAT
0.0%
ACTIVE_TRADES
0
TOTAL_LOGS
0
Execution Ledger
STEP
OBS_STATE
ACTION
REWARD
DONE
CLASSIFICATION_PULSE
TPTNFPFN
Raw Telemetry Stream
[SYS] Boot sequence initialized...
[SYS] Bridging C++ Memory Pool...
"""
    return HTMLResponse(content=html_content)
def main():
    """Serve the app with uvicorn on 0.0.0.0, listening on $PORT (default 7860)."""
    listen_port = int(os.getenv("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)


if __name__ == "__main__":
    main()