Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import gradio as gr | |
| import pandas as pd | |
| from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError | |
| from typing import Any, Dict | |
| from fastapi import FastAPI | |
| from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware | |
| from trainer import run_benchmark_mode, run_compare_mode | |
| from memory import CoachMemory | |
| from metrics.charts import generate_charts | |
| from metrics.ui_mock import install_mock_charts_to_outputs, load_mock_ui_summary | |
| from config import LOG_SUMMARY_FILE, REWARD_GRAPHS_DIR, OUTPUTS_DIR | |
| from api_server import app as api_app | |
# Ensure both output directories exist as soon as this module is imported;
# exist_ok=True makes each call a no-op when the directory is already present.
os.makedirs(REWARD_GRAPHS_DIR, exist_ok=True)
os.makedirs(OUTPUTS_DIR, exist_ok=True)
def get_current_metrics() -> Dict[str, Any]:
    """Load the latest metrics from ``LOG_SUMMARY_FILE``.

    Returns:
        The parsed JSON object, or an empty dict when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object.
    """
    try:
        with open(LOG_SUMMARY_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers
        # json.JSONDecodeError and UnicodeDecodeError. Anything else
        # (KeyboardInterrupt, SystemExit, programming errors) propagates.
        return {}
    # Callers expect a mapping; a top-level list/number is treated as "no metrics".
    return data if isinstance(data, dict) else {}
def get_memory_lessons() -> str:
    """Render CoachMemory's ``top_lessons`` as a numbered list, one per line.

    Returns the placeholder ``"No lessons recorded yet."`` when the memory
    summary has no ``top_lessons`` entries.
    """
    lessons = CoachMemory().summary().get("top_lessons", [])
    if not lessons:
        return "No lessons recorded yet."

    rendered = []
    for position, entry in enumerate(lessons, start=1):
        # A missing or None note is shown as an empty string.
        text = entry.get("coach_note") or ""
        weight = entry.get("reward_weight", 0.0)
        rendered.append(f"{position}. {text} (Weight: {weight})\n")
    return "".join(rendered)
def _cuda_ready() -> bool:
    """Return True only when PyTorch imports and reports a usable CUDA device."""
    available = False
    try:
        # Imported lazily so the module still loads when torch is not installed.
        import torch  # noqa: PLC0415

        available = bool(torch.cuda.is_available())
    except ImportError:
        pass
    return available
def _ui_provider_options() -> list[str]:
    """Return the provider dropdown choices, ordered for the current hardware.

    GPU Space: lead with local HF (real weights on T4).
    CPU: lead with offline so demos stay instant.
    """
    hosted = ["auto", "nim", "openrouter"]
    if _cuda_ready():
        return ["custom_hf", *hosted, "offline"]
    return ["offline", *hosted, "custom_hf"]
# Provider choices shown in the UI dropdown; computed once at import time, so
# the ordering reflects CUDA availability at that moment.
FORGE_PROVIDER_OPTIONS: list[str] = _ui_provider_options()
def default_forge_ui_provider() -> str:
    """Pick the provider pre-selected in the dropdown.

    ``FORGE_DEFAULT_PROVIDER`` wins when it names one of
    ``FORGE_PROVIDER_OPTIONS`` (compared after trimming and lower-casing);
    otherwise the default is ``custom_hf`` with CUDA and ``offline`` without.
    """
    requested = os.getenv("FORGE_DEFAULT_PROVIDER", "").strip().lower()
    if requested in FORGE_PROVIDER_OPTIONS:
        return requested
    if _cuda_ready():
        return "custom_hf"
    return "offline"
def _benchmark_episode_cap() -> int:
    """Upper bound on benchmark episodes per UI run: 30 with CUDA, otherwise 5."""
    if _cuda_ready():
        return 30
    return 5
def _ui_candidates_per_step() -> int:
    """Gradio-only: fewer generations per step so `custom_hf` returns while the queue is still open.

    Reads ``FORGE_UI_CANDIDATES`` (default ``1``) and clamps it to ``1..8``.
    A value that is not an integer falls back to ``1`` instead of raising
    inside the UI handler, matching how ``FORGE_UI_STEPS`` is parsed.
    """
    raw = os.getenv("FORGE_UI_CANDIDATES", "1").strip()
    try:
        requested = int(raw)
    except ValueError:
        requested = 1
    return max(1, min(8, requested))
| def _ui_max_steps_for_gradio() -> int | None: | |
| """Gradio-only: cap steps per episode (`FORGE_UI_STEPS`). Use full, default, or 0 for global config.STEPS_PER_EPISODE.""" | |
| raw = os.getenv("FORGE_UI_STEPS", "2").strip().lower() | |
| if raw in ("full", "default", "0"): | |
| return None | |
| try: | |
| return max(1, min(10, int(raw))) | |
| except ValueError: | |
| return 2 | |
| def _ui_run_timeout_sec(*, compare: bool = False) -> float | None: | |
| """Seconds before Gradio swaps in bundled charts; ``None`` = wait indefinitely.""" | |
| raw = os.getenv("FORGE_UI_RUN_TIMEOUT_SEC", "120").strip() | |
| if raw.lower() in ("0", "", "off", "none", "disable", "false"): | |
| return None | |
| try: | |
| base = float(raw) | |
| except ValueError: | |
| base = 120.0 | |
| if base <= 0: | |
| return None | |
| if not compare: | |
| return base | |
| mult_raw = os.getenv("FORGE_UI_COMPARE_TIMEOUT_MULT", "2").strip() | |
| try: | |
| mult = float(mult_raw) | |
| except ValueError: | |
| mult = 2.0 | |
| return base * max(1.0, mult) | |
| def _run_with_timeout(fn, timeout_sec: float | None): | |
| """Run ``fn()`` and return ``(result, ok, err_kind)``; pool is shut down with ``wait=False``.""" | |
| if timeout_sec is None: | |
| try: | |
| return fn(), True, None | |
| except Exception as exc: # noqa: BLE001 — UI resilience | |
| return None, False, str(exc) | |
| executor = ThreadPoolExecutor(max_workers=1) | |
| future = executor.submit(fn) | |
| try: | |
| return future.result(timeout=timeout_sec), True, None | |
| except FuturesTimeoutError: | |
| return None, False, "timeout" | |
| except Exception as exc: # noqa: BLE001 | |
| return None, False, str(exc) | |
| finally: | |
| executor.shutdown(wait=False) | |
def _fallback_ui_assets(err_kind: str | None) -> Dict[str, Any]:
    """Warn the user, install the bundled mock charts, and return the mock summary.

    Args:
        err_kind: ``"timeout"`` selects the timeout wording; any other value
            is interpolated into the generic failure warning.

    Returns:
        Whatever ``load_mock_ui_summary()`` returns, after
        ``install_mock_charts_to_outputs(OUTPUTS_DIR)`` has been called.
    """
    timed_out = err_kind == "timeout"
    if timed_out:
        message = (
            "This run exceeded FORGE_UI_RUN_TIMEOUT_SEC. Showing bundled illustrative charts and "
            "fixed summary numbers from assets/mock_ui (not from this session). A slow job may still "
            "be running in the background."
        )
    else:
        message = (
            f"Benchmark run failed ({err_kind}). Showing bundled illustrative charts and fixed "
            "summary numbers from assets/mock_ui."
        )
    gr.Warning(message)

    install_mock_charts_to_outputs(OUTPUTS_DIR)
    return load_mock_ui_summary()
# Provider labels the UI handlers pass through unchanged; anything else
# is replaced by "offline".
_UI_PROVIDER_MODES = ("auto", "custom_hf", "nim", "openrouter", "offline", "mock")


def _resolve_ui_provider(forge_provider_label: str) -> str:
    """Turn the dropdown label into the provider mode actually used for the run."""
    mode = forge_provider_label if forge_provider_label in _UI_PROVIDER_MODES else "offline"
    if mode == "custom_hf" and not _cuda_ready():
        gr.Info(
            "No GPU: using the offline deterministic baseline for inference "
            "(environment and rewards are still real; no local Hub weight load)."
        )
        mode = "offline"
    return mode


def _clamp_ui_episodes(episodes, cap: int) -> int:
    """Coerce the slider value to an int within ``1..cap``."""
    return max(1, min(int(episodes), cap))


def _run_ui_job(job, *, compare: bool, summary_key: str):
    """Run ``job`` under the UI timeout and build the 7-value Gradio output tuple."""
    report, ok, err = _run_with_timeout(job, _ui_run_timeout_sec(compare=compare))
    if ok:
        summary = report.get(summary_key, {})
        generate_charts()  # Update trends too
    else:
        # Timeout or failure: show the bundled charts and fixed numbers instead.
        summary = _fallback_ui_assets(err)
    lessons = get_memory_lessons()

    # Chart files are only handed to Gradio when they actually exist on disk.
    reward_path = os.path.join(OUTPUTS_DIR, "reward_curve.png")
    pass_rate_path = os.path.join(OUTPUTS_DIR, "pass_rate.png")
    return (
        f"{summary.get('avg_pass_rate', 0.0):.2f}",
        f"{summary.get('avg_defender_reward', 0.0):+.2f}",
        f"{summary.get('avg_adversary_reward', 0.0):+.2f}",
        f"{summary.get('max_tier', 1)}",
        reward_path if os.path.exists(reward_path) else None,
        pass_rate_path if os.path.exists(pass_rate_path) else None,
        lessons,
    )


def run_benchmark_ui(episodes, forge_provider_label: str):
    """Gradio wrapper for benchmark mode.

    Args:
        episodes: Requested episode count; clamped to ``1`` through
            ``_benchmark_episode_cap()``.
        forge_provider_label: Provider chosen in the dropdown; unknown labels
            and ``custom_hf`` without CUDA fall back to ``offline``.

    Returns:
        A 7-tuple: pass rate, defender reward, adversary reward, max tier
        (all formatted strings), reward-chart path or ``None``, pass-rate-chart
        path or ``None``, and the coach-memory lessons text.
    """
    ep_count = _clamp_ui_episodes(episodes, _benchmark_episode_cap())
    mode = _resolve_ui_provider(forge_provider_label)

    def _benchmark_job():
        return run_benchmark_mode(
            policy_name="model",
            episodes=ep_count,
            verbose=False,
            forge_provider=mode,
            candidates_per_step=_ui_candidates_per_step(),
            max_steps=_ui_max_steps_for_gradio(),
        )

    return _run_ui_job(_benchmark_job, compare=False, summary_key="summary")


def run_compare_ui(episodes, forge_provider_label: str):
    """Gradio wrapper for compare mode.

    Args:
        episodes: Requested episode count; clamped to ``1`` through ``10``
            with CUDA or ``3`` without.
        forge_provider_label: Provider chosen in the dropdown; unknown labels
            and ``custom_hf`` without CUDA fall back to ``offline``.

    Returns:
        The same 7-tuple shape as ``run_benchmark_ui``, built from the
        ``"model"`` section of the compare report.
    """
    ep_count = _clamp_ui_episodes(episodes, 10 if _cuda_ready() else 3)
    mode = _resolve_ui_provider(forge_provider_label)

    def _compare_job():
        return run_compare_mode(
            model_policy_name="model",
            episodes=ep_count,
            verbose=False,
            forge_provider=mode,
            candidates_per_step=_ui_candidates_per_step(),
            max_steps=_ui_max_steps_for_gradio(),
        )

    return _run_ui_job(_compare_job, compare=True, summary_key="model")
# --- Gradio UI Layout ---
# Three tabs: a static project summary, the interactive run/evaluation panel,
# and static API documentation. Component variables created here are wired to
# the run_*_ui handlers at the bottom of the Blocks context.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# FORGE-v4: Adversarial Robust Code Generation Environment")
    # Pre-load data
    # Computed once when the layout is built, so the page opens with the
    # lessons text and (if the files exist) the most recent chart images.
    initial_lessons = get_memory_lessons()
    initial_reward = os.path.join(OUTPUTS_DIR, "reward_curve.png")
    initial_pass = os.path.join(OUTPUTS_DIR, "pass_rate.png")
    with gr.Tab("1. Project Summary"):
        gr.Markdown("""
        ### Adversarial Code-Generation Benchmarking
        FORGE-v4 is an environment for training and evaluating code-generation models against adversarial pressure.
        **Key Features:**
        - **Two-Agent Interaction**: Defender (Coder) vs. Adversary (Breaker).
        - **Tiered Red-Teaming**: The Breaker escalates difficulty (negatives, duplicates, large arrays) as the Defender improves.
        - **CoachMemory Feedback**: Models learn from past failures to generate more robust solutions.
        - **OpenEnv Compliant**: Standardized API for LLM agent integration.
        """)
    with gr.Tab("2. Training & Evaluation"):
        with gr.Row():
            # The slider allows up to 10, but each handler applies its own
            # (hardware-dependent) cap to the requested episode count.
            episodes_input = gr.Slider(minimum=1, maximum=10, value=3, step=1, label="Episodes (Limited for Demo)")
            provider_input = gr.Dropdown(
                choices=FORGE_PROVIDER_OPTIONS,
                value=default_forge_ui_provider(),
                label="Inference provider",
                info=(
                    "**custom_hf** = local PyTorch + Hub weights on **GPU** only; on CPU it automatically uses **offline** baseline. "
                    "**auto** = NIM → OpenRouter → optional local HF if **HF_TOKEN** is set → else offline. "
                    "**offline** = deterministic baseline (no Hub load; fast on CPU). "
                    "Gradio uses **`FORGE_UI_CANDIDATES`** (default 1) and **`FORGE_UI_STEPS`** (default 2 steps/episode; set `full` for config default). "
                    "If **`FORGE_UI_RUN_TIMEOUT_SEC`** is exceeded, the UI shows bundled **`assets/mock_ui`** charts and fixed summary numbers. CLI/training use full settings."
                ),
            )
        with gr.Row():
            btn_benchmark = gr.Button("Run Model Benchmark", variant="primary")
            btn_compare = gr.Button("Compare Baseline vs Model", variant="secondary")
        gr.Markdown("### Latest Evaluation Results")
        # The four metric boxes receive the first four values returned by the handlers.
        with gr.Row():
            m_pass = gr.Textbox(label="Pass Rate", placeholder="0.00")
            m_def_reward = gr.Textbox(label="Defender Reward", placeholder="+0.0")
            m_adv_reward = gr.Textbox(label="Adversary Reward", placeholder="+0.0")
            m_tier = gr.Textbox(label="Max Tier Reached", placeholder="1")
        with gr.Row():
            with gr.Column():
                gr.Markdown("#### Reward Trend")
                # Start empty (None) when the chart file has not been generated yet.
                plot_reward = gr.Image(value=initial_reward if os.path.exists(initial_reward) else None, label="Reward Curve", type="filepath")
            with gr.Column():
                gr.Markdown("#### Pass Rate Trend")
                plot_pass = gr.Image(value=initial_pass if os.path.exists(initial_pass) else None, label="Pass Rate Curve", type="filepath")
        gr.Markdown("### Coach Memory: Top Lessons Learned")
        memory_output = gr.Textbox(value=initial_lessons, lines=5, label="Strategic Improvements", placeholder="Run training to see lessons...")
    with gr.Tab("3. API Endpoints"):
        gr.Markdown("""
        ### OpenEnv API Standard
        FORGE-v4 serves **Gradio at `/`** and the OpenEnv JSON routes at the **same origin** (`/health`, `/reset`, `/step`, `/state`). Locally, `python api_server.py` serves **API-only** on **`:8000`**; `python app.py` serves UI **+** API on **`:7860`**. On this Space, use your **`*.hf.space`** base URL (no `/start` — use **`POST /reset`** then **`POST /step`**).
        - **`GET /health`**: Liveness / version check.
        - **`POST /reset`**: Starts a new episode and returns the initial state (new random task each time unless Space secret **`FORGE_DETERMINISTIC_RESET=1`**).
        - **`POST /step`**: JSON body: `coder_code`, `coder_version`, optional `candidate_solutions` (array of strings). Returns rewards and updated state.
        - **`GET /state`**: Current environment snapshot.
        **Example (replace `BASE` with your Space `https://….hf.space` host):**
        `curl -sS "$BASE/health"` → `curl -sS -X POST "$BASE/reset" -H "Content-Type: application/json"` → `curl -sS -X POST "$BASE/step" -H "Content-Type: application/json" -d '{"coder_code":"def solution(arr):\\n return sorted(list(arr))","coder_version":"demo"}'`
        """)
    # Event handlers
    # Both buttons take the same two inputs and fill the same seven outputs,
    # in the order the handlers return them.
    btn_benchmark.click(
        run_benchmark_ui,
        inputs=[episodes_input, provider_input],
        outputs=[m_pass, m_def_reward, m_adv_reward, m_tier, plot_reward, plot_pass, memory_output],
    )
    btn_compare.click(
        run_compare_ui,
        inputs=[episodes_input, provider_input],
        outputs=[m_pass, m_def_reward, m_adv_reward, m_tier, plot_reward, plot_pass, memory_output],
    )
# Mount Gradio at "/" so Hugging Face Spaces (hub iframe + *.hf.space) load assets and
# websockets from the same root. OpenEnv routes on api_app are registered before this mount
# and keep precedence over the Gradio catch-all.
# `app` is the combined ASGI application (API routes + Gradio UI) served below.
app = gr.mount_gradio_app(
    api_app,
    demo,
    path="/",
    ssr_mode=False,
)
# HF Spaces (and other reverse proxies) terminate TLS and set X-Forwarded-Proto. Without this,
# Gradio's slash redirects emit http://… which the browser blocks inside https iframes → blank UI.
# trusted_hosts="*" accepts forwarded headers from any upstream address.
app.add_middleware(ProxyHeadersMiddleware, trusted_hosts="*")
# Script entry point: serve the combined UI + API app on all interfaces,
# port 7860, honouring proxy headers from any forwarding address.
if __name__ == "__main__":
    import uvicorn  # imported here so it is only needed when run as a script

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        proxy_headers=True,
        forwarded_allow_ips="*",
    )