Team404_FORGE / app.py
sanjay7676's picture
feat(gradio): timeout fallback to bundled mock_ui charts and summary
5ca87df
import os
import json
import gradio as gr
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
from typing import Any, Dict
from fastapi import FastAPI
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
from trainer import run_benchmark_mode, run_compare_mode
from memory import CoachMemory
from metrics.charts import generate_charts
from metrics.ui_mock import install_mock_charts_to_outputs, load_mock_ui_summary
from config import LOG_SUMMARY_FILE, REWARD_GRAPHS_DIR, OUTPUTS_DIR
from api_server import app as api_app
# Ensure the reward-graph and output directories exist as soon as this module is imported.
# exist_ok=True makes each call a no-op when the directory is already present.
os.makedirs(REWARD_GRAPHS_DIR, exist_ok=True)
os.makedirs(OUTPUTS_DIR, exist_ok=True)
def get_current_metrics() -> Dict[str, Any]:
    """Load the latest metrics from the summary JSON file (``LOG_SUMMARY_FILE``).

    This is a best-effort read: the UI must keep working when no run has
    produced a summary yet or the file is damaged.

    Returns:
        The parsed JSON object, or an empty dict when the file is missing,
        unreadable, not valid JSON, or does not hold a JSON object.
    """
    try:
        # Open directly instead of checking os.path.exists first: a missing file
        # surfaces as FileNotFoundError (an OSError) without a check-then-use race.
        with open(LOG_SUMMARY_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        # Catching only these keeps KeyboardInterrupt/SystemExit propagating, which
        # a bare ``except:`` would have swallowed.
        return {}
    # Honour the declared return type even if the file holds a list/scalar.
    return data if isinstance(data, dict) else {}
def get_memory_lessons() -> str:
    """Render the top CoachMemory lessons as a numbered, newline-terminated list.

    Reads ``top_lessons`` from a fresh ``CoachMemory().summary()``; each entry is
    queried for ``coach_note`` and ``reward_weight``.

    Returns:
        One ``"<n>. <note> (Weight: <w>)"`` line per lesson, or a placeholder
        sentence when there are no lessons.
    """
    lessons = CoachMemory().summary().get("top_lessons", [])
    if not lessons:
        return "No lessons recorded yet."

    def _render(rank: int, entry) -> str:
        # A missing or falsy note is shown as an empty string; a missing weight as 0.0.
        text = entry.get("coach_note") or ""
        weight = entry.get("reward_weight", 0.0)
        return f"{rank}. {text} (Weight: {weight})\n"

    return "".join(_render(rank, entry) for rank, entry in enumerate(lessons, start=1))
def _cuda_ready() -> bool:
try:
import torch # noqa: PLC0415
return bool(torch.cuda.is_available())
except ImportError:
return False
def _ui_provider_options() -> list[str]:
    """Return the provider dropdown choices, ordered for the current host.

    GPU Space: lead with local HF (real weights on T4). CPU: lead with offline
    so demos stay instant. The hosted providers keep the same order in between.
    """
    hosted = ["auto", "nim", "openrouter"]
    if _cuda_ready():
        first, last = "custom_hf", "offline"
    else:
        first, last = "offline", "custom_hf"
    return [first, *hosted, last]
# Provider choices shown in the UI dropdown; the list (and its order) is computed once at import time.
FORGE_PROVIDER_OPTIONS = _ui_provider_options()
def default_forge_ui_provider() -> str:
    """Pick the provider preselected in the dropdown.

    ``FORGE_DEFAULT_PROVIDER`` (trimmed, case-insensitive) wins when it names one
    of ``FORGE_PROVIDER_OPTIONS``; otherwise the default is ``custom_hf`` on a
    CUDA host and ``offline`` elsewhere.
    """
    requested = os.getenv("FORGE_DEFAULT_PROVIDER", "").strip().lower()
    if requested not in FORGE_PROVIDER_OPTIONS:
        # Unset or unknown override: fall back to the hardware-based default.
        if _cuda_ready():
            return "custom_hf"
        return "offline"
    return requested
def _benchmark_episode_cap() -> int:
    """Return the maximum episodes a UI benchmark run may use: 30 with CUDA, 5 without."""
    if _cuda_ready():
        return 30
    return 5
def _ui_candidates_per_step() -> int:
"""Gradio-only: fewer generations per step so `custom_hf` returns while the queue is still open."""
return max(1, min(8, int(os.getenv("FORGE_UI_CANDIDATES", "1"))))
def _ui_max_steps_for_gradio() -> int | None:
"""Gradio-only: cap steps per episode (`FORGE_UI_STEPS`). Use full, default, or 0 for global config.STEPS_PER_EPISODE."""
raw = os.getenv("FORGE_UI_STEPS", "2").strip().lower()
if raw in ("full", "default", "0"):
return None
try:
return max(1, min(10, int(raw)))
except ValueError:
return 2
def _ui_run_timeout_sec(*, compare: bool = False) -> float | None:
"""Seconds before Gradio swaps in bundled charts; ``None`` = wait indefinitely."""
raw = os.getenv("FORGE_UI_RUN_TIMEOUT_SEC", "120").strip()
if raw.lower() in ("0", "", "off", "none", "disable", "false"):
return None
try:
base = float(raw)
except ValueError:
base = 120.0
if base <= 0:
return None
if not compare:
return base
mult_raw = os.getenv("FORGE_UI_COMPARE_TIMEOUT_MULT", "2").strip()
try:
mult = float(mult_raw)
except ValueError:
mult = 2.0
return base * max(1.0, mult)
def _run_with_timeout(fn, timeout_sec: float | None):
"""Run ``fn()`` and return ``(result, ok, err_kind)``; pool is shut down with ``wait=False``."""
if timeout_sec is None:
try:
return fn(), True, None
except Exception as exc: # noqa: BLE001 — UI resilience
return None, False, str(exc)
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(fn)
try:
return future.result(timeout=timeout_sec), True, None
except FuturesTimeoutError:
return None, False, "timeout"
except Exception as exc: # noqa: BLE001
return None, False, str(exc)
finally:
executor.shutdown(wait=False)
def _fallback_ui_assets(err_kind: str | None) -> Dict[str, Any]:
    """Bundled PNGs + fixed ``summary.json`` so the UI closes without waiting on the tester.

    Shows a Gradio warning (one wording for ``err_kind == "timeout"``, another for
    any other failure), installs the mock charts into ``OUTPUTS_DIR`` and returns
    the mock summary.
    """
    timed_out = err_kind == "timeout"
    if timed_out:
        notice = (
            "This run exceeded FORGE_UI_RUN_TIMEOUT_SEC. Showing bundled illustrative charts and "
            "fixed summary numbers from assets/mock_ui (not from this session). A slow job may still "
            "be running in the background."
        )
    else:
        notice = (
            f"Benchmark run failed ({err_kind}). Showing bundled illustrative charts and fixed "
            "summary numbers from assets/mock_ui."
        )
    gr.Warning(notice)
    install_mock_charts_to_outputs(OUTPUTS_DIR)
    return load_mock_ui_summary()
def run_benchmark_ui(episodes, forge_provider_label: str):
    """Gradio wrapper for benchmark mode.

    Args:
        episodes: Requested episode count from the slider; capped by
            ``_benchmark_episode_cap()``.
        forge_provider_label: Provider chosen in the dropdown. Unknown labels
            fall back to ``"offline"``, as does ``"custom_hf"`` on a host
            without CUDA (with an info toast).

    Returns:
        A 7-tuple: pass rate, defender reward, adversary reward, max tier (all
        formatted strings), reward-curve path or None, pass-rate path or None,
        and the lessons text.
    """
    known_providers = ("auto", "custom_hf", "nim", "openrouter", "offline", "mock")

    ep_count = min(int(episodes), _benchmark_episode_cap())
    mode = forge_provider_label if forge_provider_label in known_providers else "offline"
    if mode == "custom_hf" and not _cuda_ready():
        gr.Info(
            "No GPU: using the offline deterministic baseline for inference "
            "(environment and rewards are still real; no local Hub weight load)."
        )
        mode = "offline"

    def _job():
        # The FORGE_UI_* settings are read inside the job, i.e. when it actually runs.
        return run_benchmark_mode(
            policy_name="model",
            episodes=ep_count,
            verbose=False,
            forge_provider=mode,
            candidates_per_step=_ui_candidates_per_step(),
            max_steps=_ui_max_steps_for_gradio(),
        )

    report, ok, err = _run_with_timeout(_job, _ui_run_timeout_sec(compare=False))
    if ok:
        summary = report.get("summary", {})
        generate_charts()  # Update trends too
    else:
        # Timeout or failure: show the bundled mock charts and summary instead.
        summary = _fallback_ui_assets(err)
    lessons = get_memory_lessons()

    pass_rate_text = f"{summary.get('avg_pass_rate', 0.0):.2f}"
    defender_text = f"{summary.get('avg_defender_reward', 0.0):+.2f}"
    adversary_text = f"{summary.get('avg_adversary_reward', 0.0):+.2f}"
    tier_text = f"{summary.get('max_tier', 1)}"

    # Paths for Gradio (as requested by user); a chart that was never written is sent as None.
    reward_path = os.path.join(OUTPUTS_DIR, "reward_curve.png")
    pass_rate_path = os.path.join(OUTPUTS_DIR, "pass_rate.png")
    reward_image = reward_path if os.path.exists(reward_path) else None
    pass_rate_image = pass_rate_path if os.path.exists(pass_rate_path) else None

    return (
        pass_rate_text,
        defender_text,
        adversary_text,
        tier_text,
        reward_image,
        pass_rate_image,
        lessons,
    )
def run_compare_ui(episodes, forge_provider_label: str):
    """Gradio wrapper for compare mode.

    Args:
        episodes: Requested episode count from the slider; capped at 10 on a
            CUDA host and 3 otherwise.
        forge_provider_label: Provider chosen in the dropdown. Unknown labels
            fall back to ``"offline"``, as does ``"custom_hf"`` on a host
            without CUDA (with an info toast).

    Returns:
        A 7-tuple built from the report's ``"model"`` entry (or the mock summary
        on timeout/failure): pass rate, defender reward, adversary reward, max
        tier (all formatted strings), reward-curve path or None, pass-rate path
        or None, and the lessons text.
    """
    episode_limit = 10 if _cuda_ready() else 3
    ep_count = min(int(episodes), episode_limit)

    accepted = ("auto", "custom_hf", "nim", "openrouter", "offline", "mock")
    mode = forge_provider_label if forge_provider_label in accepted else "offline"
    if mode == "custom_hf" and not _cuda_ready():
        gr.Info(
            "No GPU: using the offline deterministic baseline for inference "
            "(environment and rewards are still real; no local Hub weight load)."
        )
        mode = "offline"

    def _job():
        # The FORGE_UI_* settings are read inside the job, i.e. when it actually runs.
        return run_compare_mode(
            model_policy_name="model",
            episodes=ep_count,
            verbose=False,
            forge_provider=mode,
            candidates_per_step=_ui_candidates_per_step(),
            max_steps=_ui_max_steps_for_gradio(),
        )

    report, ok, err = _run_with_timeout(_job, _ui_run_timeout_sec(compare=True))
    if ok:
        model_summary = report.get("model", {})
        generate_charts()
    else:
        # Timeout or failure: show the bundled mock charts and summary instead.
        model_summary = _fallback_ui_assets(err)
    lessons = get_memory_lessons()

    metric_cells = (
        f"{model_summary.get('avg_pass_rate', 0.0):.2f}",
        f"{model_summary.get('avg_defender_reward', 0.0):+.2f}",
        f"{model_summary.get('avg_adversary_reward', 0.0):+.2f}",
        f"{model_summary.get('max_tier', 1)}",
    )

    # Paths for Gradio (as requested by user); a chart that was never written is sent as None.
    reward_path = os.path.join(OUTPUTS_DIR, "reward_curve.png")
    pass_rate_path = os.path.join(OUTPUTS_DIR, "pass_rate.png")
    chart_cells = (
        reward_path if os.path.exists(reward_path) else None,
        pass_rate_path if os.path.exists(pass_rate_path) else None,
    )

    return (*metric_cells, *chart_cells, lessons)
# --- Gradio UI Layout ---
# Builds the three-tab Blocks app when this module is imported. `demo` is later mounted
# onto `api_app`. The two buttons share the same seven output components.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# FORGE-v4: Adversarial Robust Code Generation Environment")
    # Pre-load data
    # Lessons and chart paths are resolved once while the layout is built; the images
    # below are only shown if the PNG files already exist at that moment.
    initial_lessons = get_memory_lessons()
    initial_reward = os.path.join(OUTPUTS_DIR, "reward_curve.png")
    initial_pass = os.path.join(OUTPUTS_DIR, "pass_rate.png")
    # Tab 1: static project description.
    with gr.Tab("1. Project Summary"):
        gr.Markdown("""
### Adversarial Code-Generation Benchmarking
FORGE-v4 is an environment for training and evaluating code-generation models against adversarial pressure.
**Key Features:**
- **Two-Agent Interaction**: Defender (Coder) vs. Adversary (Breaker).
- **Tiered Red-Teaming**: The Breaker escalates difficulty (negatives, duplicates, large arrays) as the Defender improves.
- **CoachMemory Feedback**: Models learn from past failures to generate more robust solutions.
- **OpenEnv Compliant**: Standardized API for LLM agent integration.
""")
    # Tab 2: run controls, metric read-outs, trend charts and coach lessons.
    with gr.Tab("2. Training & Evaluation"):
        with gr.Row():
            # NOTE(review): the slider tops out at 10, while run_benchmark_ui caps at 30 (CUDA) / 5 (CPU)
            # and run_compare_ui at 10 (CUDA) / 3 (CPU); larger requests are clamped by the handlers.
            episodes_input = gr.Slider(minimum=1, maximum=10, value=3, step=1, label="Episodes (Limited for Demo)")
            provider_input = gr.Dropdown(
                choices=FORGE_PROVIDER_OPTIONS,
                value=default_forge_ui_provider(),
                label="Inference provider",
                info=(
                    "**custom_hf** = local PyTorch + Hub weights on **GPU** only; on CPU it automatically uses **offline** baseline. "
                    "**auto** = NIM → OpenRouter → optional local HF if **HF_TOKEN** is set → else offline. "
                    "**offline** = deterministic baseline (no Hub load; fast on CPU). "
                    "Gradio uses **`FORGE_UI_CANDIDATES`** (default 1) and **`FORGE_UI_STEPS`** (default 2 steps/episode; set `full` for config default). "
                    "If **`FORGE_UI_RUN_TIMEOUT_SEC`** is exceeded, the UI shows bundled **`assets/mock_ui`** charts and fixed summary numbers. CLI/training use full settings."
                ),
            )
        with gr.Row():
            btn_benchmark = gr.Button("Run Model Benchmark", variant="primary")
            btn_compare = gr.Button("Compare Baseline vs Model", variant="secondary")
        gr.Markdown("### Latest Evaluation Results")
        # Metric read-outs, filled by the first four values the click handlers return.
        with gr.Row():
            m_pass = gr.Textbox(label="Pass Rate", placeholder="0.00")
            m_def_reward = gr.Textbox(label="Defender Reward", placeholder="+0.0")
            m_adv_reward = gr.Textbox(label="Adversary Reward", placeholder="+0.0")
            m_tier = gr.Textbox(label="Max Tier Reached", placeholder="1")
        # Trend charts: start empty (None) unless a PNG is already present in OUTPUTS_DIR.
        with gr.Row():
            with gr.Column():
                gr.Markdown("#### Reward Trend")
                plot_reward = gr.Image(value=initial_reward if os.path.exists(initial_reward) else None, label="Reward Curve", type="filepath")
            with gr.Column():
                gr.Markdown("#### Pass Rate Trend")
                plot_pass = gr.Image(value=initial_pass if os.path.exists(initial_pass) else None, label="Pass Rate Curve", type="filepath")
        gr.Markdown("### Coach Memory: Top Lessons Learned")
        memory_output = gr.Textbox(value=initial_lessons, lines=5, label="Strategic Improvements", placeholder="Run training to see lessons...")
    # Tab 3: static reference for the JSON routes served next to the UI.
    with gr.Tab("3. API Endpoints"):
        gr.Markdown("""
### OpenEnv API Standard
FORGE-v4 serves **Gradio at `/`** and the OpenEnv JSON routes at the **same origin** (`/health`, `/reset`, `/step`, `/state`). Locally, `python api_server.py` serves **API-only** on **`:8000`**; `python app.py` serves UI **+** API on **`:7860`**. On this Space, use your **`*.hf.space`** base URL (no `/start` — use **`POST /reset`** then **`POST /step`**).
- **`GET /health`**: Liveness / version check.
- **`POST /reset`**: Starts a new episode and returns the initial state (new random task each time unless Space secret **`FORGE_DETERMINISTIC_RESET=1`**).
- **`POST /step`**: JSON body: `coder_code`, `coder_version`, optional `candidate_solutions` (array of strings). Returns rewards and updated state.
- **`GET /state`**: Current environment snapshot.
**Example (replace `BASE` with your Space `https://….hf.space` host):**
`curl -sS "$BASE/health"` → `curl -sS -X POST "$BASE/reset" -H "Content-Type: application/json"` → `curl -sS -X POST "$BASE/step" -H "Content-Type: application/json" -d '{"coder_code":"def solution(arr):\\n return sorted(list(arr))","coder_version":"demo"}'`
""")
    # Event handlers
    # Both handlers take (episodes, provider) and return a 7-tuple in the order of `outputs`.
    btn_benchmark.click(
        run_benchmark_ui,
        inputs=[episodes_input, provider_input],
        outputs=[m_pass, m_def_reward, m_adv_reward, m_tier, plot_reward, plot_pass, memory_output],
    )
    btn_compare.click(
        run_compare_ui,
        inputs=[episodes_input, provider_input],
        outputs=[m_pass, m_def_reward, m_adv_reward, m_tier, plot_reward, plot_pass, memory_output],
    )
# Serve the Gradio UI from "/" so Hugging Face Spaces (hub iframe + *.hf.space) load assets
# and websockets from the same root. The OpenEnv routes on api_app were registered before
# this mount, so they keep precedence over the Gradio catch-all.
app = gr.mount_gradio_app(api_app, demo, path="/", ssr_mode=False)

# HF Spaces (and other reverse proxies) terminate TLS and set X-Forwarded-Proto. Without
# honouring it, Gradio's slash redirects emit http://… URLs, which the browser blocks inside
# https iframes → blank UI. trusted_hosts="*" accepts forwarded headers from any peer.
app.add_middleware(ProxyHeadersMiddleware, trusted_hosts="*")
if __name__ == "__main__":
    # Script entry point: serve the combined UI + API app on all interfaces, port 7860,
    # trusting forwarded proxy headers from any address.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860, proxy_headers=True, forwarded_allow_ips="*")