import os
import sys
import json
import time
import glob
import importlib.util
import random
import asyncio
from typing import Optional, List
from openai import AsyncOpenAI
import uvicorn
from fastapi import FastAPI, Request, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse, RedirectResponse
from pydantic import BaseModel
from openenv.core.env_server import create_app
# ==============================================================================
# PHASE 1: NATIVE ENGINE BRIDGE & DYNAMIC LOADING
# ==============================================================================
# Make this file's directory and its parent importable, so the sibling
# modules imported below resolve regardless of the working directory.
_CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
_ROOT_DIR = os.path.normpath(os.path.join(_CURRENT_DIR, os.pardir))
# Insert the parent first and this directory second, so this directory ends
# up ahead of the parent when both were missing.
for _path_entry in (_ROOT_DIR, _CURRENT_DIR):
    if _path_entry not in sys.path:
        sys.path.insert(0, _path_entry)
del _path_entry
# Optional native environment. If either import fails, the server still
# starts, but later code builds a mock app instead.
try:
    from fin_auditor_environment import FinAuditorEnvironment, hft_auditor
    from models import AuditorAction, AuditorObservation
except ImportError as exc:
    HAS_ENV = False
    NATIVE_VERIFIED = False
    hft_mod = None
    print("\n[CRITICAL WARNING] Could not import dependencies. Running in fallback UI mode.")
    print(f"Exact Error: {exc}\n")
else:
    HAS_ENV = True
    # The environment module may expose hft_auditor as None; treat that as "no native engine".
    hft_mod = hft_auditor
    NATIVE_VERIFIED = hft_mod is not None
# ==============================================================================
# PHASE 2: SYSTEM STATE & AUTHORITY TRACKING
# ==============================================================================
# Module-level handle to the environment instance OpenEnv constructs.
# Stays None in mock mode, and until OpenEnv first instantiates the class.
active_env_instance = None

if not (HAS_ENV and NATIVE_VERIFIED):
    # Fallback: a bare FastAPI app with stub /reset and /step endpoints.
    app = FastAPI(title="PayGorn (MOCK MODE)")

    @app.post("/reset")
    async def mock_reset():
        return {"reward": 0.0}

    @app.post("/step")
    async def mock_step(action: dict):
        return {"reward": 0.5, "done": False, "step_count": 0}
else:
    class TrackedFinAuditorEnvironment(FinAuditorEnvironment):
        """FinAuditorEnvironment that publishes itself in ``active_env_instance``.

        Every construction overwrites the module-level handle, so the
        dashboard endpoints always see the most recently built instance.
        NOTE(review): if OpenEnv creates more than one instance (e.g. per
        session), only the latest is tracked -- confirm this is intended.
        """

        def __init__(self, *args, **kwargs):
            global active_env_instance
            super().__init__(*args, **kwargs)
            active_env_instance = self

    # create_app builds the FastAPI app and instantiates the environment class itself.
    app = create_app(TrackedFinAuditorEnvironment, AuditorAction, AuditorObservation)
# Latency of the most recent /step request, in microseconds (read by /state and /ws/telemetry).
app_metrics = {"last_step_latency_us": 0.0}


@app.middleware("http")
async def capture_step_latency(request: Request, call_next):
    """Time every request whose path is ``/step`` and record it in ``app_metrics``.

    Other paths are passed through untouched. The recorded value is the time
    spent inside ``call_next``, converted from nanoseconds to microseconds.
    """
    if request.url.path != "/step":
        return await call_next(request)

    started = time.perf_counter_ns()
    response = await call_next(request)
    elapsed_ns = time.perf_counter_ns() - started
    app_metrics["last_step_latency_us"] = elapsed_ns / 1000.0
    return response
@app.get("/", include_in_schema=False)
async def root_redirect():
    """Send visitors of the bare root URL to the ``/web`` dashboard."""
    target = "/web"
    return RedirectResponse(url=target)
# Readiness flags reported by /state; the /config/llm endpoint updates the LLM-related ones.
system_health = dict(
    so_found=NATIVE_VERIFIED,
    key_validated=False,
    model_detected=False,
    connected=False,
)

# Mutable LLM connection settings shared by the /config and /dashboard endpoints.
# The API key defaults to the HF_TOKEN environment variable (empty string if unset).
llm_session = dict(
    base_url="https://router.huggingface.co/v1",
    api_key=os.getenv("HF_TOKEN", ""),
    model_name="",
    available_models=[],
)
class LLMConfig(BaseModel):
    # Request body for POST /config/llm.
    # An empty api_key means "reuse the key already held in the session".
    api_key: str = ""
    model_name: Optional[str] = None
    base_url: Optional[str] = None


class DifficultyConfig(BaseModel):
    # Request body for POST /config/difficulty; matched by substring ("easy"/"medium"/other).
    level: str


class ActionRequest(BaseModel):
    # Request body for POST /dashboard/get_action: "perfect", "llm", or anything else (random).
    action_type: str
def _coerce_decision(value) -> int:
    """Map one raw LLM decision onto 0 (VALID) or 1 (ANOMALY).

    Accepts bools, ints/floats equal to 0 or 1, and the strings "0"/"1".
    Anything else is replaced by a random guess, the same way missing
    entries are padded.
    """
    if isinstance(value, bool):
        return int(value)
    if isinstance(value, (int, float)) and value in (0, 1):
        return int(value)
    if isinstance(value, str) and value.strip() in ("0", "1"):
        return int(value.strip())
    return random.choice([0, 1])


async def execute_llm_step(api_key: str, base_url: str, model_name: str, batch_size: int = 40) -> list[int]:
    """Ask an OpenAI-compatible chat model for ``batch_size`` binary audit decisions.

    The model only receives a fixed system/user prompt (no trade data) and is
    asked to answer with JSON of the form ``{"decisions": [0|1, ...]}``.

    Args:
        api_key: Credential for the provider at ``base_url``.
        base_url: OpenAI-compatible API root.
        model_name: Model identifier passed to ``chat.completions.create``.
        batch_size: Number of decisions to return; values <= 0 return ``[]``.

    Returns:
        Exactly ``batch_size`` ints, each 0 or 1. Missing or unusable entries
        are filled with random 0/1 guesses. If the API call or JSON parsing
        fails, the error is printed and the whole batch is random.
    """
    if batch_size <= 0:
        return []

    prompt = (
        "You are a High-Frequency Trading Risk Compliance Auditor.\n"
        f"You are processing a microsecond batch of {batch_size} trades.\n"
        "Rule 1: A False Negative (missing fraud) is catastrophic.\n"
        "Rule 2: A False Positive (flagging a valid trade) is a minor penalty.\n"
        f"Based on standard HFT anomaly distributions, output exactly {batch_size} binary decisions.\n"
        "0 = VALID, 1 = ANOMALY.\n"
        "Respond ONLY in valid JSON format using this exact schema: {\"decisions\": [int, int, ...]}"
    )
    try:
        # A fresh client is built per call; the context manager closes its
        # HTTP connection pool instead of leaking one per request.
        async with AsyncOpenAI(api_key=api_key, base_url=base_url, max_retries=1) as client:
            response = await client.chat.completions.create(
                model=model_name,
                messages=[
                    {"role": "system", "content": prompt},
                    {"role": "user", "content": "Analyze the telemetry stream and output the decision matrix."}
                ],
                response_format={"type": "json_object"},
                temperature=0.2
            )
        result = json.loads(response.choices[0].message.content)
        raw = result.get("decisions", []) if isinstance(result, dict) else []
        if not isinstance(raw, list):
            raw = []
        # Normalise every entry to 0/1 so downstream code never sees strings/bools/None.
        decisions = [_coerce_decision(d) for d in raw[:batch_size]]
        decisions += [random.choice([0, 1]) for _ in range(batch_size - len(decisions))]
        return decisions
    except Exception as e:
        print(f"[LLM ROUTER ERROR] API execution failed: {e}")
        return [random.choice([0, 1]) for _ in range(batch_size)]
# ==============================================================================
# PHASE 3: API ENDPOINTS (The Data Pipeline)
# ==============================================================================
@app.get("/state")
async def get_state():
    """Report engine counters, confusion-matrix metrics and step latency.

    Until an environment instance has been captured, a fixed placeholder
    payload with ``status`` = ``FALLBACK_MOCK_MODE`` is returned. Otherwise,
    the last-step confusion counts are read from ``active_env_instance.state``
    (defaulting to 0 when absent) and the counters from
    ``active_env_instance.engine``.

    Derived fields:
        accuracy: (tp + tn) / (tp + tn + fp + fn), 0.0 when no counts.
        precision: tp / (tp + fp), 0.0 when nothing was flagged.
        throughput_m: million trades per second, assuming a 40-trade batch
            per step (the batch size used by /dashboard/get_action).
        buffer_saturation: ring_buffer_size / pool_capacity * 100, 0.0 when
            the capacity is 0.
    """
    if active_env_instance is None:
        return {
            "status": "FALLBACK_MOCK_MODE",
            "health": system_health,
            "accuracy": 0.0,
            "precision": 0.0,
            "latency_us": 0.0,
            "throughput_m": 0.0,
            "buffer_saturation": 0.0,
            "active_count": 0,
            "total_ingested": 0,
            "step_count": 0,
            "difficulty": "EASY",
            "metrics": {"tp": 0, "tn": 0, "fp": 0, "fn": 0},
        }

    env = active_env_instance
    state = env.state
    engine = env.engine

    tp = getattr(state, "last_tp", 0)
    fp = getattr(state, "last_fp", 0)
    tn = getattr(state, "last_tn", 0)
    fn = getattr(state, "last_fn", 0)

    # True accuracy over the whole confusion matrix; tp / (tp + fp) is
    # precision, which is reported separately under its own key.
    total = tp + tn + fp + fn
    accuracy = (tp + tn) / total if total > 0 else 0.0
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0

    latency_us = app_metrics["last_step_latency_us"]
    batch_size = 40
    # batch_size trades per latency_us microseconds == batch_size / latency_us
    # trades per microsecond == batch_size / latency_us million trades/s.
    throughput_m = round(batch_size / latency_us, 2) if latency_us > 0 else 0.0

    capacity = engine.pool_capacity
    saturation = (engine.ring_buffer_size / capacity) * 100 if capacity else 0.0

    return {
        "status": "NATIVE_RECON_ONLINE",
        "health": system_health,
        "accuracy": round(accuracy, 4),
        "precision": round(precision, 4),
        "latency_us": round(latency_us, 3),
        "latency_source": "grounded_app_middleware",
        "throughput_m": throughput_m,
        "active_count": engine.active_count,
        "total_ingested": engine.total_ingested,
        "ring_buffer_size": engine.ring_buffer_size,
        "buffer_saturation": saturation,
        "step_count": state.step_count,
        "metrics": {"tp": tp, "tn": tn, "fp": fp, "fn": fn},
        # NOTE(review): set_difficulty stores an hft_mod.Difficulty member here;
        # confirm FastAPI can JSON-encode that type.
        "difficulty": getattr(env, "difficulty", "EASY"),
    }
@app.post("/dashboard/get_action")
async def get_dashboard_action(req: ActionRequest):
    """Produce a batch of 40 binary decisions for the policy the dashboard selected.

    ``action_type`` selects the policy:
      * ``"perfect"`` flags every trade (all 1s);
      * ``"llm"`` queries the session's model through ``execute_llm_step``;
      * anything else yields uniformly random 0/1 decisions.

    Raises:
        HTTPException: 400 when ``"llm"`` is requested but the session has no
            API key or no model name (explicit or first discovered).
    """
    size = 40
    mode = req.action_type

    if mode == "perfect":
        return {"decisions": [1] * size}

    if mode != "llm":
        return {"decisions": [random.choice([0, 1]) for _ in range(size)]}

    key = llm_session.get("api_key")
    endpoint = llm_session.get("base_url")
    model = llm_session.get("model_name")
    discovered = llm_session.get("available_models")
    if not model and discovered:
        # No explicit selection yet: default to the first model the provider listed.
        model = discovered[0]
    if not (key and model):
        raise HTTPException(status_code=400, detail="LLM Provider not authenticated. Please inject an API key.")

    return {"decisions": await execute_llm_step(key, endpoint, model, size)}
def _infer_base_url(api_key: str) -> str:
    """Pick an OpenAI-compatible endpoint from the API key's prefix.

    ``AIza`` keys map to Google's Gemini OpenAI-compatible endpoint, ``sk-ant``
    keys to Anthropic, and everything else to the Hugging Face router.
    """
    if api_key.startswith("AIza"):
        return "https://generativelanguage.googleapis.com/v1beta/openai/"
    if api_key.startswith("sk-ant"):
        return "https://api.anthropic.com/v1"
    return "https://router.huggingface.co/v1"


@app.post("/config/llm")
async def config_llm(cfg: LLMConfig):
    """Validate an LLM credential by listing models and store it in the session.

    The key comes from ``cfg.api_key``, falling back to the key already in
    ``llm_session``. The endpoint is ``cfg.base_url`` when given, otherwise it
    is inferred from the key prefix. On success, the session's key, endpoint
    and model list are replaced, and the health flags are updated. If
    ``cfg.model_name`` is among the listed models, it is selected and exported
    to the MODEL_NAME / LLM_API_KEY / API_BASE_URL environment variables.

    Returns:
        ``{"status": "success", "models": [...], "message": ...}`` or, on any
        validation failure, ``{"status": "error", "message": ...}``.

    Raises:
        HTTPException: 400 when no API key is available at all.
    """
    api_key = cfg.api_key.strip() or llm_session.get("api_key")
    if not api_key:
        raise HTTPException(status_code=400, detail="API Key cannot be blank")

    # An explicit base_url in the request wins; previously it was silently ignored.
    base_url = (cfg.base_url or "").strip() or _infer_base_url(api_key)

    try:
        async with AsyncOpenAI(base_url=base_url, api_key=api_key, max_retries=2) as client:
            try:
                response = await client.models.list()
                model_list = [m.id for m in response.data]
            except Exception:
                # If listing fails against the Gemini endpoint, fall back to a
                # hard-coded set of model names; any other provider is an error.
                if "googleapis" not in base_url:
                    raise
                model_list = ["gemini-1.5-flash", "gemini-1.5-pro", "gemini-2.0-flash"]

        llm_session["api_key"] = api_key
        llm_session["base_url"] = base_url
        llm_session["available_models"] = model_list
        system_health["key_validated"] = True
        system_health["model_detected"] = len(model_list) > 0

        if cfg.model_name and cfg.model_name in model_list:
            llm_session["model_name"] = cfg.model_name
            os.environ["MODEL_NAME"] = cfg.model_name
            os.environ["LLM_API_KEY"] = api_key
            os.environ["API_BASE_URL"] = base_url
            system_health["connected"] = True
            msg = f"Injected {cfg.model_name} credentials."
        else:
            # A model picked under a previous key/provider must not be reused
            # against the new endpoint unless that endpoint still offers it.
            if llm_session.get("model_name") not in model_list:
                llm_session["model_name"] = ""
                system_health["connected"] = False
            msg = "Key validated. Models discovered via OpenAI spec."
        return {"status": "success", "models": model_list, "message": msg}
    except Exception as e:
        system_health["key_validated"] = False
        system_health["model_detected"] = False
        system_health["connected"] = False
        return {"status": "error", "message": f"Validation failed: {str(e)}"}
@app.post("/config/default")
async def config_default():
    """Validate the server's own HF_TOKEN against the Hugging Face router.

    Delegates to ``config_llm``. Returns an error payload (without raising)
    when HF_TOKEN is unset or empty.
    """
    hf_token = os.getenv("HF_TOKEN")
    if hf_token:
        default_cfg = LLMConfig(api_key=hf_token, base_url="https://router.huggingface.co/v1")
        return await config_llm(default_cfg)
    return {"status": "error", "message": "No HF_TOKEN found in system environment"}
@app.post("/config/difficulty")
async def set_difficulty(cfg: DifficultyConfig):
    """Set the tracked environment's difficulty from a free-form level string.

    The lower-cased level is also exported as TASK_ID. Matching is by
    substring: "easy" -> EASY, else "medium" -> MEDIUM, otherwise HARD.
    The response echoes the level exactly as it was sent.
    """
    if not (active_env_instance and NATIVE_VERIFIED):
        return {"status": "error", "message": "Engine not loaded"}

    level = cfg.level.lower()
    os.environ["TASK_ID"] = level
    if "easy" in level:
        chosen = hft_mod.Difficulty.EASY
    elif "medium" in level:
        chosen = hft_mod.Difficulty.MEDIUM
    else:
        chosen = hft_mod.Difficulty.HARD
    active_env_instance.difficulty = chosen
    return {"status": "success", "difficulty": cfg.level}
@app.websocket("/ws/telemetry")
async def websocket_telemetry(websocket: WebSocket):
    """Stream a telemetry snapshot to the client every 0.5 s until it disconnects.

    With a tracked native environment, each snapshot carries the engine
    counters plus the last /step latency. Otherwise, a fixed MOCK_FALLBACK
    payload is sent.
    """
    await websocket.accept()
    try:
        while True:
            if not (active_env_instance and NATIVE_VERIFIED):
                payload = {"status": "MOCK_FALLBACK", "latency_us": 0.0}
            else:
                engine = active_env_instance.engine
                payload = {
                    "active_count": engine.active_count,
                    "total_ingested": engine.total_ingested,
                    "ring_buffer_size": engine.ring_buffer_size,
                    "pool_capacity": engine.pool_capacity,
                    "latency_us": round(app_metrics["last_step_latency_us"], 3),
                    "status": "NATIVE_ACTIVE",
                }
            await websocket.send_json(payload)
            await asyncio.sleep(0.5)
    except WebSocketDisconnect:
        # Client went away: end the handler quietly.
        return
# ==============================================================================
# PHASE 4: THE JUDGE-AUTHORITY DASHBOARD
# ==============================================================================
@app.get("/web", response_class=HTMLResponse)
async def root_dashboard():
    """Serve the dashboard page at ``/web`` (the target of the ``/`` redirect).

    The page is a fixed string literal; no server state is interpolated
    into it here.

    NOTE(review): the literal below contains only text fragments and no HTML
    tags, so a browser renders it as plain run-on text. It looks like the
    original markup was stripped -- confirm against the intended dashboard
    source before relying on this page.
    """
    html_content = """
PayGorn | HFT Command Center
> SPSC_RING_BUFFER: ACTIVE
SYSTEM: INITIALIZING...
HFT Audit Command Center
An OpenEnv-compliant RL environment simulating high-frequency adversarial trading. Built on a native C++20 lock-free ring buffer, it maps microsecond telemetry directly to the agent's matrix for deterministic, zero-copy evaluation.
ENGINE_LATENCY
0.000μs
RECON_INTEGRITY (ACC)
0.00ratio
THROUGHPUT
0.00M/s
BUFFER_SAT
0.0%
ACTIVE_TRADES
0
TOTAL_LOGS
0
Execution Ledger
STEP
OBS_STATE
ACTION
REWARD
DONE
CLASSIFICATION_PULSE
TPTNFPFN
Raw Telemetry Stream
[SYS] Boot sequence initialized...
[SYS] Bridging C++ Memory Pool...
"""
    return HTMLResponse(content=html_content)
def main():
    """Serve ``app`` with uvicorn on all interfaces, port $PORT (default 7860)."""
    uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", 7860)))


if __name__ == "__main__":
    main()