"""
PreferenceLab FastAPI Server.

Exposes the PreferenceLabEnvironment via the OpenEnv HTTP interface.
Supports concurrent sessions for parallel training.

Web interface (Gradio UI at /web) is enabled when ENABLE_WEB_INTERFACE=true.
"""

import html
import math
import os
from collections import defaultdict
from threading import Lock

from openenv.core.env_server import create_app
from pydantic import BaseModel, Field

from models import (
    ConsistencyAction,
    ConsistencyObservation,
    LikertAction,
    LikertObservation,
    PairwiseAction,
    PairwiseObservation,
)
from server.environment import PreferenceLabEnvironment

MAX_CONCURRENT_ENVS = int(os.environ.get("MAX_CONCURRENT_ENVS", "64"))
ENABLE_WEB_INTERFACE = os.environ.get("ENABLE_WEB_INTERFACE", "true").lower() == "true"

# Confidence shown on the dashboard when the last action carries none, or
# carries one that cannot be interpreted as a finite number.
_DEFAULT_CONFIDENCE = 0.8

# Maximum number of scores kept per model, and returned per model, by the
# in-memory leaderboard.
_MAX_STORED_SCORES = 1000
_MAX_RETURNED_SCORES = 50


def create_environment() -> PreferenceLabEnvironment:
    """Factory function — called once per session."""
    return PreferenceLabEnvironment()


def _collect_reward_points(logs) -> list:
    """Return one ``{"Step", "Reward"}`` dict per log entry that has a reward."""
    return [
        {"Step": getattr(log, "step_count", 0), "Reward": float(log.reward)}
        for log in logs
        if getattr(log, "reward", None) is not None
    ]


def _reward_feedback(reward: float) -> tuple:
    """Return ``(explanation, suggestion)`` text for a reward value."""
    if reward > 0.8:
        return (
            "High quality response, well aligned with user intent",
            "Try making the response more concise",
        )
    if reward > 0.5:
        return (
            "Decent response but can be improved in clarity",
            "Improve structure and clarity",
        )
    return (
        "Poor response, lacks relevance or correctness",
        "Focus on relevance and correctness",
    )


def _dataset_label(log) -> str:
    """Return the upper-cased, HTML-escaped dataset name found on *log*.

    The name is read from ``log.observation.info["dataset"]``, or from
    ``log.observation.model_extra["info"]["dataset"]`` when the observation
    has no ``info`` attribute.
    """
    observation = getattr(log, "observation", None)
    info = {}
    if observation is not None:
        if hasattr(observation, "info"):
            info = observation.info
        elif getattr(observation, "model_extra", None):
            info = observation.model_extra.get("info", {})

    if isinstance(info, dict):
        name = info.get("dataset", "Synthetic / Unknown")
    else:
        name = "Unknown"
    # Upper-case BEFORE escaping: doing it afterwards would also rewrite the
    # escape sequences themselves (e.g. "&amp;" -> "&AMP;").
    return html.escape(str(name).upper())


def _confidence_of(log) -> float:
    """Return the confidence attached to ``log.action`` as a finite float.

    Falls back to ``_DEFAULT_CONFIDENCE`` when the action is missing, has no
    confidence, or the value is not a finite number.
    """
    action = getattr(log, "action", None)
    raw = _DEFAULT_CONFIDENCE
    if action is not None:
        if hasattr(action, "confidence"):
            raw = action.confidence
        elif isinstance(action, dict) and "confidence" in action:
            raw = action["confidence"]
    try:
        value = float(raw)
    except (ValueError, TypeError):
        return _DEFAULT_CONFIDENCE
    # NaN / inf cannot be turned into a percentage (int()/round() would raise).
    return value if math.isfinite(value) else _DEFAULT_CONFIDENCE


def _agent_thinking(task_type, step, reward: float) -> str:
    """Build the Markdown "Agent Process" panel for the latest step."""
    text = f"### Agent Process (Step {step}):\n"
    text += f"- Received `{task_type}` observation\n"

    if task_type == "pairwise":
        text += "- Compared Response A and B against Gold Standard\n"
    elif task_type == "likert":
        text += "- Evaluated response on 4 heuristic axes (Helpfulness, Honesty, etc)\n"
    elif task_type == "consistency":
        text += "- Checked consistency rankings for transitivity faults\n"
    else:
        text += "- Parsing standard input features\n"

    if reward > 0.8:
        text += "- Decision matched gold labels almost perfectly\n"
        text += "- Issuing high positive reinforcement"
    elif reward > 0.5:
        text += "- Decision showed partial alignment\n"
        text += "- Issuing moderate reinforcement"
    else:
        text += "- Decision strongly contradicted gold labels\n"
        text += "- Issuing negative reinforcement penalty"
    return text


def build_progress_dashboard(web_manager, action_fields, metadata, is_chat_env, title, quick_start_md):
    """Build the Gradio "Agent Learning Dashboard" for the web interface.

    Passed to ``create_web_interface_app`` as ``gradio_builder``. Only
    ``web_manager`` is used: the dashboard reads
    ``web_manager.episode_state.action_logs`` each time it is refreshed. The
    remaining parameters are accepted to match the builder call signature and
    are ignored.

    Returns:
        The ``gr.Blocks`` object containing the dashboard.
    """
    # Imported lazily so the module still loads when gradio is not installed.
    import gradio as gr

    with gr.Blocks() as blocks:
        gr.Markdown("## Agent Learning Dashboard")
        gr.Markdown("**This system simulates how RLHF agents learn from human feedback in real time.**")
        gr.Markdown(
            "This dashboard transforms the basic interface into an intelligent view of the RLHF agent's decision-making process. "
            "You can observe reward signals, evaluation rationale, and training progression."
        )

        with gr.Row():
            best_reward_disp = gr.Markdown("### Best Reward: --")
            reward_delta_disp = gr.Markdown("### Recent Delta: --")
            confidence_disp = gr.Markdown("### Confidence: --")

        with gr.Row():
            with gr.Column(scale=2):
                reward_plot = gr.LinePlot(
                    x="Step",
                    y="Reward",
                    title="Learning Progress (Agent Improving Over Time)",
                    tooltip=["Step", "Reward"],
                    x_title="Episode Step",
                    y_title="Reward",
                    y_lim=[0.0, 1.0],
                )
                with gr.Row():
                    reward_explanation = gr.Textbox(label="Reward Explanation", lines=2)
                    improvement_tip = gr.Textbox(label="Agent Suggestion", lines=2)

            with gr.Column(scale=1):
                with gr.Row():
                    refresh_btn = gr.Button("Sync Agent State", variant="primary")
                    demo_btn = gr.Button("Run Guided Demo", variant="secondary")
                agent_thinking = gr.Markdown(
                    "### Agent Process:\n"
                    "- Understanding input\n"
                    "- Comparing responses\n"
                    "- Evaluating alignment\n"
                    "- Assigning reward\n"
                )
                dataset_vis = gr.HTML("Dataset: ...")
                session_summary = gr.Markdown("### Session Summary\n_Episode ongoing..._")

        # Both buttons update the same nine components, in this order; every
        # tuple returned/yielded by the handlers below follows it.
        dashboard_outputs = [
            reward_plot,
            reward_explanation,
            improvement_tip,
            agent_thinking,
            dataset_vis,
            session_summary,
            best_reward_disp,
            reward_delta_disp,
            confidence_disp,
        ]

        def update_dashboard():
            """Recompute every dashboard component from the current action logs."""
            import pandas as pd

            # ``or []`` also covers an ``action_logs`` attribute that is None.
            logs = getattr(web_manager.episode_state, "action_logs", []) or []
            points = _collect_reward_points(logs)

            # Always ensure graph shows at least one point
            if not points:
                return (
                    pd.DataFrame({"Step": [0], "Reward": [0.0]}),
                    "Awaiting first agent action...",
                    "Waiting...",
                    "### Agent Process\n_Waiting for agent actions..._",
                    "Dataset: Pending",
                    "### Episode Summary\n_No steps yet._",
                    "### Best Reward: --",
                    "### Recent Delta: --",
                    "### Confidence: --",
                )

            frame = pd.DataFrame(points)
            latest_reward = points[-1]["Reward"]
            latest_step = points[-1]["Step"]
            explanation, tip = _reward_feedback(latest_reward)

            # Observation/action details come from the most recent log entry.
            last_log = logs[-1]
            dataset_str = _dataset_label(last_log)

            # Session summary metrics; relative improvement is undefined for a
            # non-positive starting reward, so it is reported as 0 in that case.
            initial_reward = points[0]["Reward"]
            improvement = 0.0
            if initial_reward > 0:
                improvement = ((latest_reward - initial_reward) / initial_reward) * 100
            summary = (
                f"### Episode Summary\n"
                f"- **Final Reward:** {latest_reward:.2f}\n"
                f"- **Improvement:** {improvement:+.1f}%\n"
                f"- **Steps:** {latest_step}"
            )

            task_type = getattr(getattr(last_log, "observation", None), "task_type", "unknown")
            thinking = _agent_thinking(task_type, latest_step, latest_reward)

            # KPI visualizations
            best_reward = max(point["Reward"] for point in points)
            if len(points) > 1:
                delta_str = f"{latest_reward - points[-2]['Reward']:+.2f}"
            else:
                delta_str = "--"
            # round() rather than int(): int() truncates, so 0.29 showed as 28%.
            conf_str = f"{round(_confidence_of(last_log) * 100)}%"

            return (
                frame,
                explanation,
                tip,
                thinking,
                f"Dataset: {dataset_str}",
                summary,
                f"### Best Reward: {best_reward:.2f}",
                f"### Recent Delta: {delta_str}",
                f"### Confidence: {conf_str}",
            )

        # Manual safe refresh mapping
        refresh_btn.click(fn=update_dashboard, inputs=None, outputs=dashboard_outputs)

        def run_demo_mode():
            """Yield three canned dashboard states, pausing 2 s between them."""
            import time

            import pandas as pd

            # Step 1
            yield (
                pd.DataFrame([{"Step": 1, "Reward": 0.2}]),
                "Poor response, lacks relevance",
                "Focus on correctness",
                "### Agent Process (Demo):\n- Parsing standard input features\n- Decision strongly contradicted gold labels\n- Issuing negative reinforcement penalty",
                "Dataset: ANTHROPIC/HH-RLHF",
                "### Episode Summary\n- **Final Reward:** 0.20\n- **Improvement:** 0.0%\n- **Steps:** 1",
                "### Best Reward: 0.20",
                "### Recent Delta: --",
                "### Confidence: 20%",
            )
            time.sleep(2)

            # Step 2
            yield (
                pd.DataFrame([{"Step": 1, "Reward": 0.2}, {"Step": 2, "Reward": 0.55}]),
                "Decent response but can be improved in clarity",
                "Improve structure and clarity",
                "### Agent Process (Demo):\n- Compared Response A and B against Gold Standard\n- Decision showed partial alignment\n- Issuing moderate reinforcement",
                "Dataset: ANTHROPIC/HH-RLHF",
                "### Episode Summary\n- **Final Reward:** 0.55\n- **Improvement:** +175.0%\n- **Steps:** 2",
                "### Best Reward: 0.55",
                "### Recent Delta: +0.35",
                "### Confidence: 60%",
            )
            time.sleep(2)

            # Step 3
            yield (
                pd.DataFrame(
                    [
                        {"Step": 1, "Reward": 0.2},
                        {"Step": 2, "Reward": 0.55},
                        {"Step": 3, "Reward": 0.99},
                    ]
                ),
                "High quality response, well aligned with user intent",
                "Try making the response more concise",
                "### Agent Process (Demo):\n- Evaluated response on 4 heuristic axes\n- Decision matched gold labels almost perfectly\n- Issuing high positive reinforcement",
                "Dataset: ANTHROPIC/HH-RLHF",
                "### Episode Summary\n- **Final Reward:** 0.99\n- **Improvement:** +395.0%\n- **Steps:** 3",
                "### Best Reward: 0.99",
                "### Recent Delta: +0.44",
                "### Confidence: 95%",
            )

        demo_btn.click(fn=run_demo_mode, inputs=None, outputs=dashboard_outputs)

    return blocks


if ENABLE_WEB_INTERFACE:
    try:
        from openenv.core.env_server import create_web_interface_app

        # Mounts the Gradio playground at /web and redirects / → /web/
        app = create_web_interface_app(
            create_environment,
            PairwiseAction,
            PairwiseObservation,
            env_name="preference_lab",
            max_concurrent_envs=MAX_CONCURRENT_ENVS,
            gradio_builder=build_progress_dashboard,
        )
    except ImportError:
        # gradio not installed — fall back to plain API
        # (ModuleNotFoundError is a subclass of ImportError.)
        ENABLE_WEB_INTERFACE = False

if not ENABLE_WEB_INTERFACE:
    # Plain REST + WebSocket API only (no Gradio)
    app = create_app(
        create_environment,
        PairwiseAction,
        PairwiseObservation,
        max_concurrent_envs=MAX_CONCURRENT_ENVS,
    )


# ── Leaderboard ────────────────────────────────────────────────
# In-memory only: model name -> list of submitted scores. Every access goes
# through ``leaderboard_lock``.
leaderboard = defaultdict(list)
leaderboard_lock = Lock()


class LeaderboardEntry(BaseModel):
    """Request body for ``POST /leaderboard/submit``."""

    model: str = Field(..., min_length=1, max_length=255)
    score: float = Field(..., ge=0.0, le=1.0)


@app.get("/leaderboard")
def get_leaderboard():
    """Return average score, run count and the most recent scores per model."""
    with leaderboard_lock:
        return {
            model: {
                "avg_score": sum(scores) / len(scores) if scores else 0,
                "runs": len(scores),
                # Limit returned scores to the most recent ones
                "scores": scores[-_MAX_RETURNED_SCORES:],
            }
            for model, scores in leaderboard.items()
        }


@app.post("/leaderboard/submit")
def submit_score(entry: LeaderboardEntry):
    """Record one score for ``entry.model``, keeping only the newest scores."""
    with leaderboard_lock:
        scores = leaderboard[entry.model]
        scores.append(entry.score)
        # Limit stored scores to prevent memory issues
        if len(scores) > _MAX_STORED_SCORES:
            del scores[:-_MAX_STORED_SCORES]
    return {"status": "recorded"}


# ── Browser housekeeping routes ────────────────────────────────
# Browsers auto-request these; returning proper responses prevents
# console 404 noise and enables basic PWA support.
#
# Both handlers return plain dicts, which FastAPI serialises to JSON. They
# previously called ``JSONResponse``, a name this module never imports, so
# every request to them failed with NameError.


@app.get("/manifest.json", include_in_schema=False)
async def web_manifest():
    """Basic PWA web app manifest — silences browser manifest fetch errors."""
    return {
        "name": "PreferenceLab",
        "short_name": "PrefLab",
        "description": "OpenEnv RLHF preference data collection environment",
        "start_url": "/web/",
        "display": "standalone",
        "background_color": "#0f172a",
        "theme_color": "#6366f1",
        "icons": [
            {
                "src": "https://huggingface.co/front/assets/huggingface_logo-noborder.svg",
                "sizes": "any",
                "type": "image/svg+xml",
            }
        ],
    }


@app.get("/.well-known/appspecific/com.chrome.devtools.json", include_in_schema=False)
async def chrome_devtools():
    """Suppress Chrome DevTools discovery 404."""
    return {}


def main():
    """Serve ``app`` with uvicorn on 0.0.0.0:7860."""
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)


if __name__ == "__main__":
    main()