Spaces:
Running
Running
Sibam
refactor: apply production readiness recommendations including dataset caching, XSS protection, pure schemas, and JSON decoding logic.
5ee1380 | """ | |
| PreferenceLab FastAPI Server. | |
| Exposes the PreferenceLabEnvironment via the OpenEnv HTTP interface. | |
| Supports concurrent sessions for parallel training. | |
Web interface (Gradio UI at /web) is enabled by default; set ENABLE_WEB_INTERFACE=false to serve the plain API only.
| """ | |
| import os | |
| from openenv.core.env_server import create_app | |
| from models import ( | |
| ConsistencyAction, | |
| ConsistencyObservation, | |
| LikertAction, | |
| LikertObservation, | |
| PairwiseAction, | |
| PairwiseObservation, | |
| ) | |
| from server.environment import PreferenceLabEnvironment | |
def _env_flag(name: str, default: bool) -> bool:
    """Read a boolean environment variable.

    Unset -> ``default``. Otherwise the value is stripped and compared
    case-insensitively against the usual truthy spellings
    ("1", "true", "yes", "on"); anything else is False.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


# Upper bound on concurrently live environment sessions, forwarded to OpenEnv.
MAX_CONCURRENT_ENVS = int(os.environ.get("MAX_CONCURRENT_ENVS", "64"))
# Gradio dashboard toggle; on unless explicitly set to a non-truthy value.
ENABLE_WEB_INTERFACE = _env_flag("ENABLE_WEB_INTERFACE", True)
def create_environment() -> PreferenceLabEnvironment:
    """Return a brand-new PreferenceLabEnvironment.

    Handed to OpenEnv as the environment factory — called once per session.
    """
    env = PreferenceLabEnvironment()
    return env
if ENABLE_WEB_INTERFACE:
    try:
        from openenv.core.env_server import create_web_interface_app

        def build_progress_dashboard(web_manager, action_fields, metadata, is_chat_env, title, quick_start_md):
            """Build the custom "Agent Learning Dashboard" Gradio UI.

            Passed to ``create_web_interface_app`` as ``gradio_builder``. Only
            ``web_manager`` is read here (its ``episode_state.action_logs``); the
            remaining parameters are unused and presumably exist to match the
            builder signature OpenEnv calls — confirm against OpenEnv.

            Returns:
                The ``gr.Blocks`` instance containing the dashboard.
            """
            import html
            import math

            import gradio as gr

            # Confidence shown when the logged action carries no usable value.
            default_confidence = 0.8

            def read_info(observation):
                """Return the observation's ``info`` payload, or ``{}`` if absent."""
                if observation is None:
                    return {}
                if hasattr(observation, "info"):
                    return observation.info
                extra = getattr(observation, "model_extra", None)
                if extra:
                    return extra.get("info", {})
                return {}

            def read_confidence(action):
                """Return the action's confidence as a float, or ``default_confidence``."""
                raw = default_confidence
                if action is not None:
                    if hasattr(action, "confidence"):
                        raw = action.confidence
                    elif isinstance(action, dict) and "confidence" in action:
                        raw = action["confidence"]
                try:
                    value = float(raw)
                except (TypeError, ValueError, OverflowError):
                    return default_confidence
                # float() happily returns nan/inf; int(value * 100) would then raise
                # and break the whole refresh callback.
                return value if math.isfinite(value * 100) else default_confidence

            def grade(reward):
                """Map a reward to (explanation, suggestion, verdict lines for the thinking panel)."""
                if reward > 0.8:
                    return (
                        "High quality response, well aligned with user intent",
                        "Try making the response more concise",
                        "- Decision matched gold labels almost perfectly\n- Issuing high positive reinforcement",
                    )
                if reward > 0.5:
                    return (
                        "Decent response but can be improved in clarity",
                        "Improve structure and clarity",
                        "- Decision showed partial alignment\n- Issuing moderate reinforcement",
                    )
                return (
                    "Poor response, lacks relevance or correctness",
                    "Focus on relevance and correctness",
                    "- Decision strongly contradicted gold labels\n- Issuing negative reinforcement penalty",
                )

            def task_line(task_type):
                """Return the bullet describing how this task type was evaluated."""
                # Plain == comparisons (not a dict lookup) so str-valued enums still match.
                if task_type == "pairwise":
                    return "- Compared Response A and B against Gold Standard\n"
                if task_type == "likert":
                    return "- Evaluated response on 4 heuristic axes (Helpfulness, Honesty, etc)\n"
                if task_type == "consistency":
                    return "- Checked consistency rankings for transitivity faults\n"
                return "- Parsing standard input features\n"

            with gr.Blocks() as blocks:
                gr.Markdown("## Agent Learning Dashboard")
                gr.Markdown("**This system simulates how RLHF agents learn from human feedback in real time.**")
                gr.Markdown(
                    "This dashboard transforms the basic interface into an intelligent view of the RLHF agent's decision-making process. "
                    "You can observe reward signals, evaluation rationale, and training progression."
                )
                with gr.Row():
                    best_reward_disp = gr.Markdown("### Best Reward: --")
                    reward_delta_disp = gr.Markdown("### Recent Delta: --")
                    confidence_disp = gr.Markdown("### Confidence: --")
                with gr.Row():
                    with gr.Column(scale=2):
                        reward_plot = gr.LinePlot(
                            x="Step",
                            y="Reward",
                            title="Learning Progress (Agent Improving Over Time)",
                            tooltip=["Step", "Reward"],
                            x_title="Episode Step",
                            y_title="Reward",
                            y_lim=[0.0, 1.0],
                        )
                        with gr.Row():
                            reward_explanation = gr.Textbox(label="Reward Explanation", lines=2)
                            improvement_tip = gr.Textbox(label="Agent Suggestion", lines=2)
                    with gr.Column(scale=1):
                        with gr.Row():
                            refresh_btn = gr.Button("Sync Agent State", variant="primary")
                            demo_btn = gr.Button("Run Guided Demo", variant="secondary")
                        agent_thinking = gr.Markdown(
                            "### Agent Process:\n"
                            "- Understanding input\n"
                            "- Comparing responses\n"
                            "- Evaluating alignment\n"
                            "- Assigning reward\n"
                        )
                        dataset_vis = gr.HTML("Dataset: <b>...</b>")
                        session_summary = gr.Markdown("### Session Summary\n_Episode ongoing..._")

                # Both callbacks below return values in exactly this order.
                dashboard_outputs = [
                    reward_plot,
                    reward_explanation,
                    improvement_tip,
                    agent_thinking,
                    dataset_vis,
                    session_summary,
                    best_reward_disp,
                    reward_delta_disp,
                    confidence_disp,
                ]

                def update_dashboard():
                    """Recompute every dashboard widget from the live episode's action logs.

                    Returns a tuple ordered like ``dashboard_outputs``.
                    """
                    import pandas as pd

                    logs = getattr(web_manager.episode_state, "action_logs", None) or []
                    rewarded = [log for log in logs if getattr(log, "reward", None) is not None]
                    if not rewarded:
                        # Always ensure the graph shows at least one point.
                        return (
                            pd.DataFrame({"Step": [0], "Reward": [0.0]}),
                            "Awaiting first agent action...",
                            "Waiting...",
                            "### Agent Process\n_Waiting for agent actions..._",
                            "Dataset: <b>Pending</b>",
                            "### Episode Summary\n_No steps yet._",
                            "### Best Reward: --",
                            "### Recent Delta: --",
                            "### Confidence: --",
                        )

                    rewards = [float(log.reward) for log in rewarded]
                    steps = [getattr(log, "step_count", 0) for log in rewarded]
                    df = pd.DataFrame({"Step": steps, "Reward": rewards})
                    latest_reward, latest_step = rewards[-1], steps[-1]

                    # Describe the same step the latest reward came from, not a
                    # later log that carries no reward.
                    last_log = rewarded[-1]
                    observation = getattr(last_log, "observation", None)

                    explanation, tip, verdict = grade(latest_reward)

                    info = read_info(observation)
                    dataset_name = info.get("dataset", "Synthetic / Unknown") if isinstance(info, dict) else "Unknown"
                    # Upper-case first, then escape, so entity names like &amp; stay intact.
                    dataset_html = html.escape(str(dataset_name).upper())

                    initial_reward = rewards[0]
                    if initial_reward > 0:
                        improvement = f"{(latest_reward - initial_reward) / initial_reward * 100:+.1f}%"
                    else:
                        # Relative change is undefined from a zero/negative baseline.
                        improvement = "n/a"
                    summary = (
                        "### Episode Summary\n"
                        f"- **Final Reward:** {latest_reward:.2f}\n"
                        f"- **Improvement:** {improvement}\n"
                        f"- **Steps:** {latest_step}"
                    )

                    task_type = getattr(observation, "task_type", "unknown")
                    thinking = (
                        f"### Agent Process (Step {latest_step}):\n"
                        f"- Received `{task_type}` observation\n"
                        f"{task_line(task_type)}"
                        f"{verdict}"
                    )

                    best_reward = max(rewards)
                    if len(rewards) > 1:
                        delta_str = f"{latest_reward - rewards[-2]:+.2f}"
                    else:
                        delta_str = "--"

                    conf = read_confidence(getattr(last_log, "action", None))
                    return (
                        df,
                        explanation,
                        tip,
                        thinking,
                        f"Dataset: <b>{dataset_html}</b>",
                        summary,
                        f"### Best Reward: {best_reward:.2f}",
                        f"### Recent Delta: {delta_str}",
                        f"### Confidence: {int(conf * 100)}%",
                    )

                refresh_btn.click(fn=update_dashboard, inputs=None, outputs=dashboard_outputs)

                def run_demo_mode():
                    """Stream three canned dashboard states, 2 s apart.

                    Purely illustrative: reads no environment state.
                    """
                    import time

                    import pandas as pd

                    demo_rows = [
                        {"Step": 1, "Reward": 0.2},
                        {"Step": 2, "Reward": 0.55},
                        {"Step": 3, "Reward": 0.99},
                    ]
                    demo_dataset = "Dataset: <b>ANTHROPIC/HH-RLHF</b>"
                    # (explanation, tip, thinking, summary, best, delta, confidence) per step.
                    frames = [
                        (
                            "Poor response, lacks relevance",
                            "Focus on correctness",
                            "### Agent Process (Demo):\n- Parsing standard input features\n- Decision strongly contradicted gold labels\n- Issuing negative reinforcement penalty",
                            "### Episode Summary\n- **Final Reward:** 0.20\n- **Improvement:** 0.0%\n- **Steps:** 1",
                            "### Best Reward: 0.20",
                            "### Recent Delta: --",
                            "### Confidence: 20%",
                        ),
                        (
                            "Decent response but can be improved in clarity",
                            "Improve structure and clarity",
                            "### Agent Process (Demo):\n- Compared Response A and B against Gold Standard\n- Decision showed partial alignment\n- Issuing moderate reinforcement",
                            "### Episode Summary\n- **Final Reward:** 0.55\n- **Improvement:** +175.0%\n- **Steps:** 2",
                            "### Best Reward: 0.55",
                            "### Recent Delta: +0.35",
                            "### Confidence: 60%",
                        ),
                        (
                            "High quality response, well aligned with user intent",
                            "Try making the response more concise",
                            "### Agent Process (Demo):\n- Evaluated response on 4 heuristic axes\n- Decision matched gold labels almost perfectly\n- Issuing high positive reinforcement",
                            "### Episode Summary\n- **Final Reward:** 0.99\n- **Improvement:** +395.0%\n- **Steps:** 3",
                            "### Best Reward: 0.99",
                            "### Recent Delta: +0.44",
                            "### Confidence: 95%",
                        ),
                    ]
                    for index, (explanation, tip, thinking, summary, best, delta, conf) in enumerate(frames):
                        if index:
                            time.sleep(2)
                        # Plot grows by one point per frame.
                        df = pd.DataFrame(demo_rows[: index + 1])
                        yield df, explanation, tip, thinking, demo_dataset, summary, best, delta, conf

                demo_btn.click(fn=run_demo_mode, inputs=None, outputs=dashboard_outputs)
            return blocks

        # Mounts the Gradio playground at /web and redirects / → /web/
        # NOTE(review): only the Pairwise action/observation models are registered,
        # although Likert/Consistency models are imported — confirm other task types are reachable.
        app = create_web_interface_app(
            create_environment,
            PairwiseAction,
            PairwiseObservation,
            env_name="preference_lab",
            max_concurrent_envs=MAX_CONCURRENT_ENVS,
            gradio_builder=build_progress_dashboard,
        )
    except ImportError as exc:  # also covers ModuleNotFoundError (a subclass)
        # gradio (or the OpenEnv web extras) not installed — fall back to the plain API,
        # but say so instead of silently dropping the UI.
        import logging

        logging.getLogger(__name__).warning(
            "Web interface unavailable (%s); serving the plain API only.", exc
        )
        ENABLE_WEB_INTERFACE = False
if not ENABLE_WEB_INTERFACE:
    # Web UI disabled (or its imports failed above): expose only OpenEnv's
    # REST + WebSocket API, without Gradio.
    app = create_app(
        create_environment, PairwiseAction, PairwiseObservation,
        max_concurrent_envs=MAX_CONCURRENT_ENVS,
    )
| from collections import defaultdict | |
| from threading import Lock | |
| from pydantic import BaseModel, Field | |
# Guards every read and write of `leaderboard` below.
leaderboard_lock = Lock()
# model name -> submitted scores in arrival order; unseen models start with [].
leaderboard = defaultdict(list)
class LeaderboardEntry(BaseModel):
    # Payload accepted by submit_score(). Documented with comments rather than a
    # docstring because pydantic would copy a docstring into the JSON schema.
    # Model identifier used as the leaderboard key; required, 1-255 characters.
    model: str = Field(..., min_length=1, max_length=255)
    # Score for one run; required, must lie in the closed interval [0.0, 1.0].
    score: float = Field(..., ge=0.0, le=1.0)
def get_leaderboard():
    """Summarise every model on the in-memory leaderboard.

    Returns:
        ``{model: {"avg_score": mean score, "runs": number of stored scores,
        "scores": the most recent 50 scores}}``.
    """
    # Copy under the lock, then do the arithmetic without holding it.
    with leaderboard_lock:
        snapshot = {name: list(history) for name, history in leaderboard.items()}

    summary = {}
    for name, history in snapshot.items():
        summary[name] = {
            "avg_score": sum(history) / len(history) if history else 0,
            "runs": len(history),
            "scores": history[-50:],  # cap the payload at the 50 newest runs
        }
    return summary
def submit_score(entry: LeaderboardEntry):
    """Append ``entry.score`` to the history of ``entry.model``.

    Only the newest 1000 scores per model are retained.
    NOTE(review): the number of distinct model names is not bounded.

    Returns:
        ``{"status": "recorded"}``.
    """
    keep_last = 1000  # per-model history cap to prevent memory issues
    with leaderboard_lock:
        history = leaderboard[entry.model]
        history.append(entry.score)
        overflow = len(history) - keep_last
        if overflow > 0:
            # Drop the oldest entries in place.
            del history[:overflow]
    return {"status": "recorded"}
# ── Browser housekeeping routes ──────────────────────────────────
| # Browsers auto-request these; returning proper responses prevents | |
| # console 404 noise and enables basic PWA support. | |
async def web_manifest():
    """Return a basic PWA web app manifest so browser manifest fetches succeed.

    The manifest is returned as a plain dict; FastAPI serialises dict return
    values from route handlers to JSON, so no response class is needed (this
    module does not import one).

    NOTE(review): no route registration for this handler is visible in this
    module — confirm it is mounted on ``app``, otherwise it is never served.
    """
    return {
        "name": "PreferenceLab",
        "short_name": "PrefLab",
        "description": "OpenEnv RLHF preference data collection environment",
        "start_url": "/web/",
        "display": "standalone",
        "background_color": "#0f172a",
        "theme_color": "#6366f1",
        "icons": [
            {
                "src": "https://huggingface.co/front/assets/huggingface_logo-noborder.svg",
                "sizes": "any",
                "type": "image/svg+xml",
            }
        ],
    }
async def chrome_devtools():
    """Return an empty JSON object to suppress Chrome DevTools discovery 404s.

    A plain dict is returned (FastAPI serialises it to ``{}``); this module does
    not import a response class.

    NOTE(review): no route registration for this handler is visible in this
    module — confirm it is mounted on ``app``.
    """
    return {}
def main(host: str = "0.0.0.0", port: int = 7860) -> None:
    """Serve ``app`` with uvicorn.

    Args:
        host: Interface to bind; the default binds all interfaces.
        port: TCP port to listen on; defaults to 7860.
    """
    import uvicorn

    uvicorn.run(app, host=host, port=port)
if __name__ == "__main__":
    # Executing this file directly starts the HTTP server.
    main()