"""
PreferenceLab FastAPI Server.
Exposes the PreferenceLabEnvironment via the OpenEnv HTTP interface.
Supports concurrent sessions for parallel training.
The web interface (Gradio UI at /web) is enabled by default; set ENABLE_WEB_INTERFACE to any value other than true to serve the plain API only.
"""
import os
from openenv.core.env_server import create_app
from models import (
ConsistencyAction,
ConsistencyObservation,
LikertAction,
LikertObservation,
PairwiseAction,
PairwiseObservation,
)
from server.environment import PreferenceLabEnvironment
# Cap on simultaneously live environment sessions; overridable through the
# MAX_CONCURRENT_ENVS environment variable (must parse as an integer).
MAX_CONCURRENT_ENVS = int(os.getenv("MAX_CONCURRENT_ENVS", "64"))
# The Gradio UI is requested unless ENABLE_WEB_INTERFACE is set to something
# other than "true" (compared case-insensitively).
ENABLE_WEB_INTERFACE = os.getenv("ENABLE_WEB_INTERFACE", "true").lower() == "true"
def create_environment() -> PreferenceLabEnvironment:
    """Environment factory handed to the app builders — called once per session."""
    environment = PreferenceLabEnvironment()
    return environment
if ENABLE_WEB_INTERFACE:
    try:
        from openenv.core.env_server import create_web_interface_app

        def build_progress_dashboard(web_manager, action_fields, metadata, is_chat_env, title, quick_start_md):
            """Build the Gradio "Agent Learning Dashboard" blocks.

            Passed to ``create_web_interface_app`` as ``gradio_builder``. Only
            ``web_manager`` is read here (its ``episode_state.action_logs``);
            the remaining parameters are accepted but unused — presumably they
            are required by the builder callback signature (TODO confirm
            against the OpenEnv documentation).

            Returns:
                The ``gr.Blocks`` instance containing the dashboard.
            """
            import gradio as gr

            with gr.Blocks() as blocks:
                gr.Markdown("## Agent Learning Dashboard")
                gr.Markdown("**This system simulates how RLHF agents learn from human feedback in real time.**")
                gr.Markdown(
                    "This dashboard transforms the basic interface into an intelligent view of the RLHF agent's decision-making process. "
                    "You can observe reward signals, evaluation rationale, and training progression."
                )

                # KPI strip.
                with gr.Row():
                    best_reward_disp = gr.Markdown("### Best Reward: --")
                    reward_delta_disp = gr.Markdown("### Recent Delta: --")
                    confidence_disp = gr.Markdown("### Confidence: --")

                # NOTE(review): the nesting below was reconstructed from a
                # flattened source (plot + text boxes in the wide column,
                # controls + status in the narrow one) — confirm the layout.
                with gr.Row():
                    with gr.Column(scale=2):
                        reward_plot = gr.LinePlot(
                            x="Step",
                            y="Reward",
                            title="Learning Progress (Agent Improving Over Time)",
                            tooltip=["Step", "Reward"],
                            x_title="Episode Step",
                            y_title="Reward",
                            y_lim=[0.0, 1.0],
                        )
                        with gr.Row():
                            reward_explanation = gr.Textbox(label="Reward Explanation", lines=2)
                            improvement_tip = gr.Textbox(label="Agent Suggestion", lines=2)
                    with gr.Column(scale=1):
                        with gr.Row():
                            refresh_btn = gr.Button("Sync Agent State", variant="primary")
                            demo_btn = gr.Button("Run Guided Demo", variant="secondary")
                        agent_thinking = gr.Markdown(
                            "### Agent Process:\n"
                            "- Understanding input\n"
                            "- Comparing responses\n"
                            "- Evaluating alignment\n"
                            "- Assigning reward\n"
                        )
                        dataset_vis = gr.HTML("Dataset: ...")
                        session_summary = gr.Markdown("### Session Summary\n_Episode ongoing..._")

                # Both buttons write the same nine components, in this order;
                # every tuple returned/yielded below must follow it.
                dashboard_outputs = [
                    reward_plot,
                    reward_explanation,
                    improvement_tip,
                    agent_thinking,
                    dataset_vis,
                    session_summary,
                    best_reward_disp,
                    reward_delta_disp,
                    confidence_disp,
                ]

                def update_dashboard():
                    """Recompute every dashboard widget from the current action logs.

                    Returns a 9-tuple matching ``dashboard_outputs``.
                    """
                    import html
                    import math

                    import pandas as pd

                    # `or []` also covers an attribute that exists but is None.
                    logs = getattr(web_manager.episode_state, "action_logs", []) or []
                    data = [
                        {"Step": getattr(log, "step_count", 0), "Reward": float(log.reward)}
                        for log in logs
                        if getattr(log, "reward", None) is not None
                    ]

                    # Always ensure graph shows at least one point
                    if not data:
                        df = pd.DataFrame({"Step": [0], "Reward": [0.0]})
                        return (
                            df,
                            "Awaiting first agent action...",
                            "Waiting...",
                            "### Agent Process\n_Waiting for agent actions..._",
                            "Dataset: Pending",
                            "### Episode Summary\n_No steps yet._",
                            "### Best Reward: --",
                            "### Recent Delta: --",
                            "### Confidence: --",
                        )

                    df = pd.DataFrame(data)
                    latest_reward = data[-1]["Reward"]
                    latest_step = data[-1]["Step"]

                    # One threshold ladder drives the explanation, the tip and
                    # the closing lines of the "agent process" narrative.
                    if latest_reward > 0.8:
                        exp = "High quality response, well aligned with user intent"
                        tip = "Try making the response more concise"
                        verdict_lines = [
                            "- Decision matched gold labels almost perfectly",
                            "- Issuing high positive reinforcement",
                        ]
                    elif latest_reward > 0.5:
                        exp = "Decent response but can be improved in clarity"
                        tip = "Improve structure and clarity"
                        verdict_lines = [
                            "- Decision showed partial alignment",
                            "- Issuing moderate reinforcement",
                        ]
                    else:
                        exp = "Poor response, lacks relevance or correctness"
                        tip = "Focus on relevance and correctness"
                        verdict_lines = [
                            "- Decision strongly contradicted gold labels",
                            "- Issuing negative reinforcement penalty",
                        ]

                    # Extract dataset name from the most recent log entry.
                    last_log = logs[-1]
                    observation = getattr(last_log, "observation", None)
                    info = {}
                    if observation is not None:
                        if hasattr(observation, "info"):
                            info = observation.info
                        elif getattr(observation, "model_extra", None):
                            info = observation.model_extra.get("info", {})
                    dataset_str = info.get("dataset", "Synthetic / Unknown") if isinstance(info, dict) else "Unknown"
                    # Rendered through gr.HTML, so the value must be escaped.
                    dataset_str = html.escape(str(dataset_str))

                    # Session summary metrics (relative change vs. first reward;
                    # left at 0 when the first reward is not positive).
                    initial_reward = data[0]["Reward"]
                    improvement = 0.0
                    if initial_reward > 0:
                        improvement = ((latest_reward - initial_reward) / initial_reward) * 100
                    summary = (
                        f"### Episode Summary\n"
                        f"- **Final Reward:** {latest_reward:.2f}\n"
                        f"- **Improvement:** {improvement:+.1f}%\n"
                        f"- **Steps:** {latest_step}"
                    )

                    # Dynamic Agent Thinking Engine
                    task_type = getattr(observation, "task_type", "unknown")
                    if task_type == "pairwise":
                        task_line = "- Compared Response A and B against Gold Standard"
                    elif task_type == "likert":
                        task_line = "- Evaluated response on 4 heuristic axes (Helpfulness, Honesty, etc)"
                    elif task_type == "consistency":
                        task_line = "- Checked consistency rankings for transitivity faults"
                    else:
                        task_line = "- Parsing standard input features"
                    thinking = "\n".join(
                        [
                            f"### Agent Process (Step {latest_step}):",
                            f"- Received `{task_type}` observation",
                            task_line,
                            *verdict_lines,
                        ]
                    )

                    # KPI Visualizations
                    best_reward = max(point["Reward"] for point in data)
                    if len(data) > 1:
                        delta = latest_reward - data[-2]["Reward"]
                        delta_str = f"{delta:+.2f}"
                    else:
                        delta_str = "--"

                    conf = 0.8
                    action = getattr(last_log, "action", None)
                    if action is not None:
                        if hasattr(action, "confidence"):
                            conf = action.confidence
                        elif isinstance(action, dict) and "confidence" in action:
                            conf = action["confidence"]
                    try:
                        conf = float(conf)
                    except (ValueError, TypeError):
                        conf = 0.8
                    # NaN/inf cannot be converted to an integer percentage.
                    if not math.isfinite(conf):
                        conf = 0.8
                    # round(), not int(): 0.29 * 100 is 28.999999999999996 in
                    # binary floating point and would otherwise display as 28%.
                    conf_str = f"{round(conf * 100)}%"

                    return (
                        df,
                        exp,
                        tip,
                        thinking,
                        f"Dataset: {dataset_str.upper()}",
                        summary,
                        f"### Best Reward: {best_reward:.2f}",
                        f"### Recent Delta: {delta_str}",
                        f"### Confidence: {conf_str}",
                    )

                # Manual safe refresh mapping
                refresh_btn.click(
                    fn=update_dashboard,
                    inputs=None,
                    outputs=list(dashboard_outputs),
                )

                def run_demo_mode():
                    """Yield three canned dashboard frames, pausing 2 s between them."""
                    import time

                    import pandas as pd

                    # Step 1
                    df1 = pd.DataFrame([{"Step": 1, "Reward": 0.2}])
                    yield (
                        df1,
                        "Poor response, lacks relevance",
                        "Focus on correctness",
                        "### Agent Process (Demo):\n- Parsing standard input features\n- Decision strongly contradicted gold labels\n- Issuing negative reinforcement penalty",
                        "Dataset: ANTHROPIC/HH-RLHF",
                        "### Episode Summary\n- **Final Reward:** 0.20\n- **Improvement:** 0.0%\n- **Steps:** 1",
                        "### Best Reward: 0.20",
                        "### Recent Delta: --",
                        "### Confidence: 20%",
                    )
                    time.sleep(2)

                    # Step 2
                    df2 = pd.DataFrame([{"Step": 1, "Reward": 0.2}, {"Step": 2, "Reward": 0.55}])
                    yield (
                        df2,
                        "Decent response but can be improved in clarity",
                        "Improve structure and clarity",
                        "### Agent Process (Demo):\n- Compared Response A and B against Gold Standard\n- Decision showed partial alignment\n- Issuing moderate reinforcement",
                        "Dataset: ANTHROPIC/HH-RLHF",
                        "### Episode Summary\n- **Final Reward:** 0.55\n- **Improvement:** +175.0%\n- **Steps:** 2",
                        "### Best Reward: 0.55",
                        "### Recent Delta: +0.35",
                        "### Confidence: 60%",
                    )
                    time.sleep(2)

                    # Step 3
                    df3 = pd.DataFrame([{"Step": 1, "Reward": 0.2}, {"Step": 2, "Reward": 0.55}, {"Step": 3, "Reward": 0.99}])
                    yield (
                        df3,
                        "High quality response, well aligned with user intent",
                        "Try making the response more concise",
                        "### Agent Process (Demo):\n- Evaluated response on 4 heuristic axes\n- Decision matched gold labels almost perfectly\n- Issuing high positive reinforcement",
                        "Dataset: ANTHROPIC/HH-RLHF",
                        "### Episode Summary\n- **Final Reward:** 0.99\n- **Improvement:** +395.0%\n- **Steps:** 3",
                        "### Best Reward: 0.99",
                        "### Recent Delta: +0.44",
                        "### Confidence: 95%",
                    )

                demo_btn.click(
                    fn=run_demo_mode,
                    inputs=None,
                    outputs=list(dashboard_outputs),
                )

            return blocks

        # Mounts the Gradio playground at /web and redirects / → /web/
        app = create_web_interface_app(
            create_environment,
            PairwiseAction,
            PairwiseObservation,
            env_name="preference_lab",
            max_concurrent_envs=MAX_CONCURRENT_ENVS,
            gradio_builder=build_progress_dashboard,
        )
    except ImportError:
        # gradio not installed — fall back to plain API.
        # (ModuleNotFoundError is a subclass of ImportError, so one clause
        # covers both.)
        ENABLE_WEB_INTERFACE = False
if not ENABLE_WEB_INTERFACE:
    # Reached when the web UI was disabled through the environment, or when
    # the import fallback above reset the flag.
    # Plain REST + WebSocket API only (no Gradio)
    app = create_app(
        create_environment,
        PairwiseAction,
        PairwiseObservation,
        max_concurrent_envs=MAX_CONCURRENT_ENVS,
    )
from collections import defaultdict
from threading import Lock
from pydantic import BaseModel, Field
# In-memory leaderboard: maps a model name to the list of scores submitted for
# it. Nothing here persists it, so contents last only as long as the process.
leaderboard = defaultdict(list)
# Held by the leaderboard route handlers around every read and write of
# `leaderboard`.
leaderboard_lock = Lock()
# Request body for POST /leaderboard/submit.
# (Documented with comments rather than a class docstring so the generated
# schema description stays exactly as it was.)
class LeaderboardEntry(BaseModel):
    # Model identifier: required, 1 to 255 characters.
    model: str = Field(..., min_length=1, max_length=255)
    # Score for one run: required, within the closed interval [0.0, 1.0].
    score: float = Field(..., ge=0.0, le=1.0)
@app.get("/leaderboard")
def get_leaderboard():
    """Return, per model, the mean score, the run count and the latest 50 scores."""
    board = {}
    with leaderboard_lock:
        for name, history in leaderboard.items():
            run_count = len(history)
            mean_score = sum(history) / run_count if history else 0
            board[name] = {
                "avg_score": mean_score,
                "runs": run_count,
                # Slice copy: limits the payload to the last 50 entries.
                "scores": history[-50:],
            }
    return board
@app.post("/leaderboard/submit")
def submit_score(entry: LeaderboardEntry):
    """Record one score for ``entry.model``, keeping at most the newest 1000 per model."""
    with leaderboard_lock:
        history = leaderboard[entry.model]
        history.append(entry.score)
        # Drop the oldest entries so per-model storage stays bounded.
        surplus = len(history) - 1000
        if surplus > 0:
            del history[:surplus]
    return {"status": "recorded"}
# ── Browser housekeeping routes ────────────────────────────────
# Browsers auto-request these; returning proper responses prevents
# console 404 noise and enables basic PWA support.
@app.get("/manifest.json", include_in_schema=False)
async def web_manifest():
    """Basic PWA web app manifest — silences browser manifest fetch errors.

    Returns:
        The manifest as a plain dict, which the framework serialises to a
        JSON response.
    """
    # The handler previously wrapped this in `JSONResponse`, a name this
    # module never imports, so every request raised NameError. Returning the
    # dict directly yields the same JSON body without the missing import.
    return {
        "name": "PreferenceLab",
        "short_name": "PrefLab",
        "description": "OpenEnv RLHF preference data collection environment",
        "start_url": "/web/",
        "display": "standalone",
        "background_color": "#0f172a",
        "theme_color": "#6366f1",
        "icons": [
            {
                "src": "https://huggingface.co/front/assets/huggingface_logo-noborder.svg",
                "sizes": "any",
                "type": "image/svg+xml",
            }
        ],
    }
@app.get("/.well-known/appspecific/com.chrome.devtools.json", include_in_schema=False)
async def chrome_devtools():
    """Suppress Chrome DevTools discovery 404 by answering with an empty JSON object."""
    # Returned as a plain dict: `JSONResponse` is not imported in this module,
    # so the previous `JSONResponse({})` raised NameError on every request.
    return {}
def main(host: str = "0.0.0.0", port: int = 7860) -> None:
    """Serve ``app`` with uvicorn.

    Args:
        host: Interface to bind; the default listens on all interfaces.
        port: TCP port to listen on.

    Calling ``main()`` with no arguments behaves exactly as before.
    """
    # Imported lazily so importing this module does not require uvicorn.
    import uvicorn

    uvicorn.run(app, host=host, port=port)


if __name__ == "__main__":
    main()