"""PayGorn server: OpenEnv FinAuditor environment, LLM router and dashboard.

When the environment modules and native engine import successfully, the
FastAPI app is built by OpenEnv's ``create_app``; otherwise a mock app with
stub ``/reset`` and ``/step`` endpoints is served instead.
"""
import os
import sys
import json
import time
import glob
import importlib.util
import random
import asyncio
from typing import Optional, List

from openai import AsyncOpenAI
import uvicorn
from fastapi import FastAPI, Request, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse, RedirectResponse
from pydantic import BaseModel
from openenv.core.env_server import create_app

# ==============================================================================
# PHASE 1: NATIVE ENGINE BRIDGE & DYNAMIC LOADING
# ==============================================================================

_CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
_ROOT_DIR = os.path.abspath(os.path.join(_CURRENT_DIR, ".."))

# Make this directory and its parent importable so the sibling modules below
# resolve regardless of the working directory the server is started from.
if _ROOT_DIR not in sys.path:
    sys.path.insert(0, _ROOT_DIR)
if _CURRENT_DIR not in sys.path:
    sys.path.insert(0, _CURRENT_DIR)

try:
    from fin_auditor_environment import FinAuditorEnvironment, hft_auditor
    from models import AuditorAction, AuditorObservation

    HAS_ENV = True
    # NOTE(review): hft_auditor is presumably None when the native extension
    # failed to load inside fin_auditor_environment — confirm there.
    NATIVE_VERIFIED = hft_auditor is not None
    hft_mod = hft_auditor
except ImportError as e:
    HAS_ENV = False
    NATIVE_VERIFIED = False
    hft_mod = None
    print("\n[CRITICAL WARNING] Could not import dependencies. Running in fallback UI mode.")
    print(f"Exact Error: {e}\n")

# ==============================================================================
# PHASE 2: SYSTEM STATE & AUTHORITY TRACKING
# ==============================================================================

# Number of trade decisions exchanged per step; shared by the dashboard action
# endpoint and the throughput estimate reported by /state.
BATCH_SIZE = 40

_HF_ROUTER_URL = "https://router.huggingface.co/v1"
_GEMINI_FALLBACK_MODELS = ("gemini-1.5-flash", "gemini-1.5-pro", "gemini-2.0-flash")

# Handle on the environment instance that OpenEnv constructs internally; set by
# TrackedFinAuditorEnvironment.__init__ and read by the dashboard endpoints.
active_env_instance = None

if HAS_ENV and NATIVE_VERIFIED:

    class TrackedFinAuditorEnvironment(FinAuditorEnvironment):
        """FinAuditorEnvironment that publishes itself as ``active_env_instance``.

        ``create_app`` receives the class rather than an instance, so this
        constructor hook is how the rest of the module reaches the live env.
        """

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            global active_env_instance
            active_env_instance = self

    # OpenEnv creates the FastAPI app and instantiates the environment itself.
    app = create_app(TrackedFinAuditorEnvironment, AuditorAction, AuditorObservation)
else:
    app = FastAPI(title="PayGorn (MOCK MODE)")

    @app.post("/reset")
    async def mock_reset():
        """Stub /reset served when the native engine is unavailable."""
        return {"reward": 0.0}

    @app.post("/step")
    async def mock_step(action: dict):
        """Stub /step that ignores ``action`` and returns a fixed payload."""
        return {"reward": 0.5, "done": False, "step_count": 0}


app_metrics = {"last_step_latency_us": 0.0}


@app.middleware("http")
async def capture_step_latency(request: Request, call_next):
    """Record how long the most recent /step request took, in microseconds."""
    if request.url.path != "/step":
        return await call_next(request)
    start_ns = time.perf_counter_ns()
    response = await call_next(request)
    app_metrics["last_step_latency_us"] = (time.perf_counter_ns() - start_ns) / 1000.0
    return response


@app.get("/", include_in_schema=False)
async def root_redirect():
    """Send bare-root visitors to the dashboard."""
    return RedirectResponse(url="/web")


system_health = {
    "so_found": NATIVE_VERIFIED,
    "key_validated": False,
    "model_detected": False,
    "connected": False,
}

llm_session = {
    "base_url": _HF_ROUTER_URL,
    "api_key": os.getenv("HF_TOKEN", ""),
    "model_name": "",
    "available_models": [],
}


class LLMConfig(BaseModel):
    api_key: str = ""
    model_name: Optional[str] = None
    # When set, overrides the provider inferred from the API key prefix.
    base_url: Optional[str] = None


class DifficultyConfig(BaseModel):
    level: str


class ActionRequest(BaseModel):
    action_type: str


def _normalize_decision(value) -> int:
    """Coerce one LLM-emitted decision to 0/1; unrecognized values become a random guess.

    Random replacement matches the padding policy used for missing entries.
    """
    if isinstance(value, str):
        value = value.strip()
        if value in ("0", "1"):
            return int(value)
    elif value in (0, 1):  # also accepts True/False and 0.0/1.0
        return int(value)
    return random.choice([0, 1])


async def execute_llm_step(api_key: str, base_url: str, model_name: str, batch_size: int = BATCH_SIZE) -> list[int]:
    """Ask the configured LLM for ``batch_size`` binary trade decisions.

    Args:
        api_key: Provider API key.
        base_url: OpenAI-compatible endpoint.
        model_name: Model id to query.
        batch_size: Number of decisions to return.

    Returns:
        Exactly ``batch_size`` ints in {0, 1} (fewer only if ``batch_size`` < 0).
        Missing or malformed entries are filled randomly. Any API/parse failure
        is logged and answered with a fully random batch so callers never see
        an exception.
    """
    prompt = (
        "You are a High-Frequency Trading Risk Compliance Auditor.\n"
        f"You are processing a microsecond batch of {batch_size} trades.\n"
        "Rule 1: A False Negative (missing fraud) is catastrophic.\n"
        "Rule 2: A False Positive (flagging a valid trade) is a minor penalty.\n"
        f"Based on standard HFT anomaly distributions, output exactly {batch_size} binary decisions.\n"
        "0 = VALID, 1 = ANOMALY.\n"
        "Respond ONLY in valid JSON format using this exact schema: {\"decisions\": [int, int, ...]}"
    )
    try:
        client = AsyncOpenAI(api_key=api_key, base_url=base_url, max_retries=1)
    except Exception as e:
        print(f"[LLM ROUTER ERROR] API execution failed: {e}")
        return [random.choice([0, 1]) for _ in range(batch_size)]

    try:
        response = await client.chat.completions.create(
            model=model_name,
            messages=[
                {"role": "system", "content": prompt},
                {"role": "user", "content": "Analyze the telemetry stream and output the decision matrix."},
            ],
            response_format={"type": "json_object"},
            temperature=0.2,
        )
        result = json.loads(response.choices[0].message.content)
        decisions = result.get("decisions", []) if isinstance(result, dict) else []
        if not isinstance(decisions, list):
            decisions = []
        decisions = [_normalize_decision(d) for d in decisions[: max(batch_size, 0)]]
        decisions += [random.choice([0, 1]) for _ in range(batch_size - len(decisions))]
        return decisions
    except Exception as e:
        print(f"[LLM ROUTER ERROR] API execution failed: {e}")
        return [random.choice([0, 1]) for _ in range(batch_size)]
    finally:
        # Release the per-call HTTP connection pool.
        await client.close()


# ==============================================================================
# PHASE 3: API ENDPOINTS (The Data Pipeline)
# ==============================================================================


def _difficulty_label(difficulty):
    """Return a JSON-friendly label for an environment difficulty value.

    Strings pass through unchanged. Enum-like objects (e.g. ``hft_mod.Difficulty``
    members assigned by ``set_difficulty``) are reduced to their ``.name``.
    NOTE(review): assumes the native enum exposes ``.name`` — confirm.
    """
    if isinstance(difficulty, str):
        return difficulty
    return getattr(difficulty, "name", str(difficulty))


@app.get("/state")
async def get_state():
    """Return a dashboard snapshot of the live environment (or a mock snapshot).

    ``accuracy`` is (TP + TN) / all classified trades and ``precision`` is
    TP / (TP + FP), both from the ``last_*`` counters on the env state.
    ``throughput_m`` is in millions of trades per second, assuming one
    BATCH_SIZE batch per measured /step latency.
    """
    env = active_env_instance
    if env is None:
        return {"status": "FALLBACK_MOCK_MODE", "health": system_health, "accuracy": 0.0,
                "precision": 0.0, "latency_us": 0.0, "throughput_m": 0.0,
                "buffer_saturation": 0.0, "active_count": 0, "total_ingested": 0,
                "step_count": 0, "difficulty": "EASY",
                "metrics": {"tp": 0, "tn": 0, "fp": 0, "fn": 0}}

    state = env.state
    engine = env.engine

    tp = getattr(state, "last_tp", 0)
    fp = getattr(state, "last_fp", 0)
    tn = getattr(state, "last_tn", 0)
    fn = getattr(state, "last_fn", 0)

    total = tp + tn + fp + fn
    accuracy = (tp + tn) / total if total > 0 else 0.0
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0

    latency_us = app_metrics["last_step_latency_us"]
    # BATCH_SIZE trades in latency_us microseconds
    #   == BATCH_SIZE / latency_us million trades per second.
    throughput_m = BATCH_SIZE / latency_us if latency_us > 0 else 0.0

    capacity = engine.pool_capacity
    buffer_saturation = (engine.ring_buffer_size / capacity) * 100 if capacity else 0.0

    return {
        "status": "NATIVE_RECON_ONLINE",
        "health": system_health,
        "accuracy": round(accuracy, 4),
        "precision": round(precision, 4),
        "latency_us": round(latency_us, 3),
        "latency_source": "grounded_app_middleware",
        "throughput_m": round(throughput_m, 2),
        "active_count": engine.active_count,
        "total_ingested": engine.total_ingested,
        "ring_buffer_size": engine.ring_buffer_size,
        "buffer_saturation": buffer_saturation,
        "step_count": state.step_count,
        "metrics": {"tp": tp, "tn": tn, "fp": fp, "fn": fn},
        "difficulty": _difficulty_label(getattr(env, "difficulty", "EASY")),
    }


@app.post("/dashboard/get_action")
async def get_dashboard_action(req: ActionRequest):
    """Produce one batch of decisions for the dashboard's chosen agent.

    ``"perfect"`` flags every trade, ``"llm"`` queries the configured model
    (HTTP 400 if no key/model is configured), anything else is random.
    """
    batch_size = BATCH_SIZE
    if req.action_type == "perfect":
        decisions = [1] * batch_size
    elif req.action_type == "llm":
        api_key = llm_session.get("api_key")
        base_url = llm_session.get("base_url")
        model_name = llm_session.get("model_name")
        if not model_name and llm_session.get("available_models"):
            # No explicit selection: default to the provider's first listed model.
            model_name = llm_session["available_models"][0]
        if not api_key or not model_name:
            raise HTTPException(status_code=400, detail="LLM Provider not authenticated. Please inject an API key.")
        decisions = await execute_llm_step(api_key, base_url, model_name, batch_size)
    else:
        decisions = [random.choice([0, 1]) for _ in range(batch_size)]
    return {"decisions": decisions}


def _infer_base_url(api_key: str) -> str:
    """Pick an OpenAI-compatible endpoint from the API key's prefix (HF router by default)."""
    if api_key.startswith("AIza"):
        return "https://generativelanguage.googleapis.com/v1beta/openai/"
    if api_key.startswith("sk-ant"):
        return "https://api.anthropic.com/v1"
    return _HF_ROUTER_URL


async def _discover_models(api_key: str, base_url: str) -> list:
    """List model ids at ``base_url``; falls back to a static list for Gemini.

    Raises whatever the client raises for non-Gemini endpoints.
    """
    client = AsyncOpenAI(base_url=base_url, api_key=api_key, max_retries=2)
    try:
        response = await client.models.list()
        return [m.id for m in response.data]
    except Exception:
        if "googleapis" in base_url:
            return list(_GEMINI_FALLBACK_MODELS)
        raise
    finally:
        await client.close()


@app.post("/config/llm")
async def config_llm(cfg: LLMConfig):
    """Validate an API key by listing models and store it as the active LLM session.

    A blank ``cfg.api_key`` reuses the stored key; HTTP 400 if neither exists.
    ``cfg.base_url`` overrides the provider inferred from the key prefix.
    When ``cfg.model_name`` is among the discovered models it is selected and
    exported via MODEL_NAME / LLM_API_KEY / API_BASE_URL environment variables.
    Validation failures are reported as ``{"status": "error", ...}``.
    """
    api_key = cfg.api_key or llm_session.get("api_key")
    if not api_key:
        raise HTTPException(status_code=400, detail="API Key cannot be blank")

    base_url = cfg.base_url or _infer_base_url(api_key)

    try:
        model_list = await _discover_models(api_key, base_url)
    except Exception as e:
        system_health["key_validated"] = False
        system_health["model_detected"] = False
        system_health["connected"] = False
        return {"status": "error", "message": f"Validation failed: {str(e)}"}

    llm_session["api_key"] = api_key
    llm_session["base_url"] = base_url
    llm_session["available_models"] = model_list
    system_health["key_validated"] = True
    system_health["model_detected"] = len(model_list) > 0

    if cfg.model_name and cfg.model_name in model_list:
        llm_session["model_name"] = cfg.model_name
        os.environ["MODEL_NAME"] = cfg.model_name
        os.environ["LLM_API_KEY"] = api_key
        os.environ["API_BASE_URL"] = base_url
        system_health["connected"] = True
        msg = f"Injected {cfg.model_name} credentials."
    else:
        # A model picked under a previous key/provider may not exist here; drop
        # it so get_dashboard_action falls back to this provider's first model.
        if llm_session.get("model_name") not in model_list:
            llm_session["model_name"] = ""
            system_health["connected"] = False
        msg = "Key validated. Models discovered via OpenAI spec."
    return {"status": "success", "models": model_list, "message": msg}


@app.post("/config/default")
async def config_default():
    """Configure the LLM session from the HF_TOKEN environment variable."""
    token = os.getenv("HF_TOKEN")
    if not token:
        return {"status": "error", "message": "No HF_TOKEN found in system environment"}
    return await config_llm(LLMConfig(api_key=token, base_url=_HF_ROUTER_URL))


@app.post("/config/difficulty")
async def set_difficulty(cfg: DifficultyConfig):
    """Switch the live environment's difficulty and export it as TASK_ID.

    Levels containing "easy" or "medium" select those tiers; any other value
    selects HARD.
    """
    if not (active_env_instance and NATIVE_VERIFIED):
        return {"status": "error", "message": "Engine not loaded"}

    level = cfg.level.lower()
    os.environ["TASK_ID"] = level
    if "easy" in level:
        active_env_instance.difficulty = hft_mod.Difficulty.EASY
    elif "medium" in level:
        active_env_instance.difficulty = hft_mod.Difficulty.MEDIUM
    else:
        active_env_instance.difficulty = hft_mod.Difficulty.HARD
    return {"status": "success", "difficulty": cfg.level}


@app.websocket("/ws/telemetry")
async def websocket_telemetry(websocket: WebSocket):
    """Push engine counters and last /step latency to the client every 0.5 s."""
    await websocket.accept()
    try:
        while True:
            if active_env_instance and NATIVE_VERIFIED:
                engine = active_env_instance.engine
                data = {
                    "active_count": engine.active_count,
                    "total_ingested": engine.total_ingested,
                    "ring_buffer_size": engine.ring_buffer_size,
                    "pool_capacity": engine.pool_capacity,
                    "latency_us": round(app_metrics["last_step_latency_us"], 3),
                    "status": "NATIVE_ACTIVE",
                }
            else:
                data = {"status": "MOCK_FALLBACK", "latency_us": 0.0}
            await websocket.send_json(data)
            await asyncio.sleep(0.5)
    except WebSocketDisconnect:
        pass


# ==============================================================================
# PHASE 4: THE JUDGE-AUTHORITY DASHBOARD
# ==============================================================================


@app.get("/web", response_class=HTMLResponse)
async def root_dashboard():
    """Serve the dashboard page.

    NOTE(review): as written here the literal carries no HTML markup or script,
    only the page's text; confirm this is the intended page content.
    """
    html_content = """ PayGorn | HFT Command Center
> SPSC_RING_BUFFER: ACTIVE
SYSTEM: INITIALIZING...

HFT Audit Command Center

An OpenEnv-compliant RL environment simulating high-frequency adversarial trading. Built on a native C++20 lock-free ring buffer, it maps microsecond telemetry directly to the agent's matrix for deterministic, zero-copy evaluation.

ENGINE_LATENCY
0.000μs
RECON_INTEGRITY (ACC)
0.00ratio
THROUGHPUT
0.00M/s
BUFFER_SAT
0.0%
ACTIVE_TRADES
0
TOTAL_LOGS
0
Execution Ledger
STEP OBS_STATE ACTION REWARD DONE
CLASSIFICATION_PULSE
TPTNFPFN
Raw Telemetry Stream
[SYS] Boot sequence initialized...
[SYS] Bridging C++ Memory Pool...
"""
    return HTMLResponse(content=html_content)


def main():
    """Run the app under uvicorn on $PORT (default 7860), all interfaces."""
    port = int(os.getenv("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=port)


if __name__ == "__main__":
    main()