Spaces:
Running
Running
| """ | |
| Plotweaver AI — TTS MOS Evaluation Platform | |
| ============================================ | |
| A multi-user Gradio application for collecting Mean Opinion Score (MOS) ratings | |
| of synthesised speech across multiple languages. | |
| Roles | |
| ----- | |
| - Reviewer: signs up, selects the language(s) they are competent in, listens to | |
| audio samples and rates each on 7 criteria (1-5) plus free-text comments. | |
| - Admin: uploads audio per language, manages languages/users, and views the | |
| aggregated MOS results per language and per model, with CSV/XLSX export. | |
| Persistence | |
| ----------- | |
| Everything (users, languages, samples, ratings) lives in a single SQLite | |
| database. Audio files are stored on disk under the data directory. Point | |
| MOS_DATA_DIR at a persistent location (on Hugging Face Spaces enable persistent | |
| storage and set MOS_DATA_DIR=/data) so data survives restarts. | |
| Bootstrapping the first admin | |
| ----------------------------- | |
| On the signup form there is an optional "Admin code" field. If it matches the | |
| ADMIN_CODE environment variable, the new account is created as an admin. | |
| The effective admin code is printed to the logs on startup. | |
| """ | |
| import os | |
| import re | |
| import hmac | |
| import sqlite3 | |
| import hashlib | |
| import secrets | |
| import tempfile | |
| import datetime as dt | |
| import shutil | |
| import pandas as pd | |
| import gradio as gr | |
| # --------------------------------------------------------------------------- # | |
| # Configuration | |
| # --------------------------------------------------------------------------- # | |
# DATA_DIR is the *persistent* root (e.g. a Hugging Face Storage Bucket mounted
# at /data). Uploaded audio, exports and the database BACKUP are kept under it.
DATA_DIR = os.environ.get(
    "MOS_DATA_DIR",
    os.path.join(os.path.dirname(os.path.abspath(__file__)), "data"),
)
AUDIO_DIR = os.path.join(DATA_DIR, "audio")
EXPORT_DIR = os.path.join(DATA_DIR, "exports")
ADMIN_CODE = os.environ.get("ADMIN_CODE", "plotweaver-admin")

# The LIVE database runs on fast LOCAL disk. SQLite needs real POSIX file
# locking, which FUSE/object-store mounts (like a bucket) do not provide
# reliably — running it directly on the bucket causes writes to silently not
# appear on later reads. The working DB therefore stays local and is backed up
# to the bucket after every change (and restored from it on startup).
LOCAL_DIR = (
    os.environ.get("MOS_LOCAL_DIR")
    or os.path.join(tempfile.gettempdir(), "mos_live")
)
DB_PATH = os.path.join(LOCAL_DIR, "mos.db")    # live (local disk)
DB_BACKUP = os.path.join(DATA_DIR, "mos.db")   # persistent copy (bucket)

# Make sure every directory the app writes to exists before anything uses it.
for d in (DATA_DIR, AUDIO_DIR, EXPORT_DIR, LOCAL_DIR):
    os.makedirs(d, exist_ok=True)
# Restore the live DB from the persistent backup on startup (first boot / after
# the container is recycled), so prior data survives restarts.
#
# The copy goes to a staging file first and is then renamed into place: a copy
# that fails half-way must never leave a truncated file at DB_PATH, because its
# mere existence would suppress the restore on every later start.
if (os.path.abspath(DB_BACKUP) != os.path.abspath(DB_PATH)
        and os.path.exists(DB_BACKUP)
        and not os.path.exists(DB_PATH)):
    _restore_tmp = DB_PATH + ".restoring"
    try:
        shutil.copyfile(DB_BACKUP, _restore_tmp)
        os.replace(_restore_tmp, DB_PATH)
    except OSError as exc:
        # Still best-effort (the app starts with an empty DB), but say so.
        print(f"[mos] WARNING: could not restore database from {DB_BACKUP}: {exc}")
        try:
            os.remove(_restore_tmp)
        except OSError:
            pass
# Key used to sign browser session tokens (keeps a refreshed page logged in).
# MOS_SECRET is used when it is set and non-empty; otherwise the key is derived
# from ADMIN_CODE.
# NOTE(review): with the built-in default ADMIN_CODE the derived key is
# predictable — consider always setting MOS_SECRET in deployments.
SESSION_SECRET = os.environ.get("MOS_SECRET")
if not SESSION_SECRET:
    SESSION_SECRET = hashlib.sha256(
        f"mos-session::{ADMIN_CODE}".encode()
    ).hexdigest()
def backup_db():
    """Best-effort, atomic copy of the live DB to the persistent backup.

    A consistent snapshot of the live database is first taken on LOCAL disk
    with SQLite's online-backup API, then copied next to DB_BACKUP and renamed
    over it. SQLite itself therefore never opens a file on the persistent
    mount, and a reader of DB_BACKUP sees either the old or the new copy.

    Does nothing when the live DB and the backup are the same file. Failures
    are reported on stdout but never raised, so a failed backup cannot break
    the write that triggered it.
    """
    if os.path.abspath(DB_BACKUP) == os.path.abspath(DB_PATH):
        return
    snapshot = None
    staged = None
    try:
        fd, snapshot = tempfile.mkstemp(prefix="mos_snapshot_", suffix=".db", dir=LOCAL_DIR)
        os.close(fd)
        src = sqlite3.connect(DB_PATH)
        try:
            dst = sqlite3.connect(snapshot)
            try:
                src.backup(dst)
            finally:
                dst.close()
        finally:
            src.close()
        # Unique staging name so concurrent backups cannot clobber each other.
        staged = f"{DB_BACKUP}.{secrets.token_hex(4)}.tmp"
        shutil.copyfile(snapshot, staged)
        os.replace(staged, DB_BACKUP)
        staged = None  # renamed into place; nothing left to clean up
    except Exception as exc:  # noqa: BLE001 — best-effort backup; never break a write
        print(f"[mos] WARNING: database backup failed: {exc}")
    finally:
        for leftover in (snapshot, staged):
            if leftover:
                try:
                    os.remove(leftover)
                except OSError:
                    pass
def make_token(user_id):
    """Return a signed session token of the form ``"<user_id>.<signature>"``.

    The signature is the first 32 hex characters of HMAC-SHA256 over the
    decimal user id, keyed with SESSION_SECRET.
    """
    payload = str(user_id).encode()
    digest = hmac.new(SESSION_SECRET.encode(), payload, hashlib.sha256).hexdigest()
    signature = digest[:32]
    return f"{user_id}.{signature}"
def verify_token(token):
    """Return the user_id if the signed token is valid and the user is active.

    Returns None for an empty or malformed token, a signature mismatch, an
    unknown user, or a deactivated user.
    """
    if not token or "." not in str(token):
        return None
    uid_str, sig = str(token).rsplit(".", 1)
    try:
        uid = int(uid_str)
    except ValueError:
        return None
    good = hmac.new(SESSION_SECRET.encode(), str(uid).encode(), hashlib.sha256).hexdigest()[:32]
    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, and the token comes from the browser.
    # "replace" turns unencodable characters into "?", which can never match
    # a hex digest.
    if not hmac.compare_digest(sig.encode("utf-8", "replace"), good.encode("utf-8")):
        return None
    with get_conn() as conn:
        row = conn.execute("SELECT id, is_active FROM users WHERE id=?", (uid,)).fetchone()
    return uid if (row and row["is_active"]) else None
# The 7 MOS criteria, in the order they appear on the evaluation form.
# Each entry is (db_column, display_label, short_definition); db_column is also
# the column name used in the `ratings` table.
CRITERIA = [
    ("naturalness", "Naturalness", "How human-like and natural the speech sounds."),
    ("intelligibility", "Intelligibility", "How easy it is to understand the spoken content."),
    ("pronunciation", "Pronunciation Accuracy", "Whether words, phonemes, tones and language-specific sounds are correct."),
    ("prosody", "Prosody & Expressiveness", "Rhythm, stress, pitch, intonation and speaking style."),
    ("fluency", "Fluency", "Smoothness without awkward pauses, repetitions or glitches."),
    ("audio_quality", "Audio Quality", "Technical quality: noise, distortion, clipping, artifacts."),
    ("overall", "Overall Quality", "Overall impression considering all aspects of synthesis quality."),
]
# Column names only, in form order.
CRITERIA_KEYS = [c[0] for c in CRITERIA]
# Legend for the 1-5 scale (separator repaired from a mis-decoded "·").
SCALE_HINT = "1 = Very Poor · 2 = Poor · 3 = Fair · 4 = Good · 5 = Excellent"
| # --------------------------------------------------------------------------- # | |
| # Database helpers | |
| # --------------------------------------------------------------------------- # | |
class _ClosingConnection(sqlite3.Connection):
    """sqlite3 connection whose ``with`` block also closes it.

    The stock connection context manager only commits or rolls back; it leaves
    the connection open. Every caller in this module uses the
    ``with get_conn() as conn:`` pattern, so closing on exit releases the file
    handle deterministically.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            # Commit on success / roll back on error, exactly as the base class.
            return super().__exit__(exc_type, exc_value, traceback)
        finally:
            self.close()


def get_conn():
    """Open a connection to the live database.

    Rows are returned as ``sqlite3.Row`` (access by column name) and foreign
    key enforcement is switched on, which SQLite requires per connection.
    Used as ``with get_conn() as conn:`` the transaction is committed (or
    rolled back on an exception) and the connection is then closed.
    """
    conn = sqlite3.connect(DB_PATH, timeout=30, factory=_ClosingConnection)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA foreign_keys = ON;")
    return conn
def init_db():
    """Apply the journal mode and create all tables if they do not exist yet.

    The journal mode comes from MOS_JOURNAL_MODE; anything outside the
    supported set falls back to DELETE. The `ratings` table gets one INTEGER
    column per entry in CRITERIA_KEYS.
    """
    # WAL gives better concurrency on a real local disk, but it relies on
    # shared-memory/mmap that does NOT work on FUSE/object-store mounts
    # (e.g. a Hugging Face Storage Bucket). Default to DELETE for safety;
    # set MOS_JOURNAL_MODE=WAL when running on a genuine local disk.
    supported_modes = ("DELETE", "WAL", "TRUNCATE", "PERSIST", "MEMORY")
    requested = os.environ.get("MOS_JOURNAL_MODE", "DELETE").upper()
    journal = requested if requested in supported_modes else "DELETE"

    cols = ",\n".join(f"{k} INTEGER" for k in CRITERIA_KEYS)
    schema = (
        """
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            email TEXT,
            password_hash TEXT NOT NULL,
            salt TEXT NOT NULL,
            role TEXT NOT NULL DEFAULT 'reviewer',
            native_langs TEXT DEFAULT '',
            is_active INTEGER NOT NULL DEFAULT 1,
            created_at TEXT NOT NULL
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS languages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            code TEXT UNIQUE NOT NULL,
            name TEXT NOT NULL,
            created_at TEXT NOT NULL
        );
        """,
        # user <-> language assignment (which languages a reviewer is eligible for)
        """
        CREATE TABLE IF NOT EXISTS user_languages (
            user_id INTEGER NOT NULL,
            language_id INTEGER NOT NULL,
            PRIMARY KEY (user_id, language_id),
            FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
            FOREIGN KEY (language_id) REFERENCES languages(id) ON DELETE CASCADE
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS samples (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            language_id INTEGER NOT NULL,
            sample_name TEXT NOT NULL,
            model_name TEXT NOT NULL DEFAULT 'unspecified',
            file_path TEXT NOT NULL,
            is_reference INTEGER NOT NULL DEFAULT 0,
            transcript TEXT DEFAULT '',
            created_at TEXT NOT NULL,
            FOREIGN KEY (language_id) REFERENCES languages(id) ON DELETE CASCADE
        );
        """,
        f"""
        CREATE TABLE IF NOT EXISTS ratings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            sample_id INTEGER NOT NULL,
            {cols},
            comments TEXT DEFAULT '',
            updated_at TEXT NOT NULL,
            UNIQUE (user_id, sample_id),
            FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
            FOREIGN KEY (sample_id) REFERENCES samples(id) ON DELETE CASCADE
        );
        """,
    )

    with get_conn() as conn:
        # `journal` is one of the whitelisted literals above, so formatting it
        # into the PRAGMA is safe.
        conn.execute(f"PRAGMA journal_mode = {journal};")
        for ddl in schema:
            conn.execute(ddl)
def now_iso():
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SS`` (no offset).

    Uses an aware "now" instead of the deprecated ``datetime.utcnow()`` and
    drops the tzinfo, so stored timestamps keep the existing format.
    """
    utc_now = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)
    return utc_now.isoformat(timespec="seconds")
| # --------------------------------------------------------------------------- # | |
| # Auth | |
| # --------------------------------------------------------------------------- # | |
def hash_password(password, salt=None):
    """Derive a password hash with PBKDF2-HMAC-SHA256 (200,000 iterations).

    A fresh 16-byte salt (as 32 hex characters) is generated when `salt` is
    None. Returns ``(hex_digest, salt)``.
    """
    salt = secrets.token_hex(16) if salt is None else salt
    derived = hashlib.pbkdf2_hmac(
        "sha256",
        password.encode("utf-8"),
        salt.encode("utf-8"),
        200_000,
    )
    return derived.hex(), salt
def create_user(username, email, password, role="reviewer", language_ids=None):
    """Create a user and assign their languages in one transaction.

    Args:
        username: 3-32 characters from letters, digits, ``_``, ``.``, ``-``.
        email: optional; stored stripped.
        password: at least 6 characters.
        role: stored as given (``"reviewer"`` by default).
        language_ids: ids of languages the user may evaluate.

    Returns:
        The new user's id.

    Raises:
        ValueError: invalid username/password, username already taken, or a
            language id that does not exist. Nothing is written in that case.
    """
    username = (username or "").strip()
    if not re.fullmatch(r"[A-Za-z0-9_.\-]{3,32}", username):
        raise ValueError("Username must be 3-32 chars: letters, numbers, _ . - only.")
    if not password or len(password) < 6:
        raise ValueError("Password must be at least 6 characters.")
    pw_hash, salt = hash_password(password)
    with get_conn() as conn:
        try:
            cur = conn.execute(
                "INSERT INTO users (username, email, password_hash, salt, role, created_at) "
                "VALUES (?,?,?,?,?,?)",
                (username, (email or "").strip(), pw_hash, salt, role, now_iso()),
            )
        except sqlite3.IntegrityError:
            raise ValueError(f"Username '{username}' is already taken.") from None
        uid = cur.lastrowid
        try:
            conn.executemany(
                "INSERT OR IGNORE INTO user_languages (user_id, language_id) VALUES (?,?)",
                [(uid, lid) for lid in (language_ids or [])],
            )
        except sqlite3.IntegrityError:
            # OR IGNORE does not cover foreign-key failures. An unknown
            # language id (the signup dropdown accepts free text) must surface
            # as a validation error; raising here rolls back the user row too.
            raise ValueError("Selected language does not exist.") from None
    # Back up only after the transaction above has been committed.
    backup_db()
    return uid
def delete_user(user_id):
    """Remove a user row, then refresh the persistent backup.

    The ``ON DELETE CASCADE`` foreign keys remove the user's language
    assignments and ratings along with it.
    """
    params = (user_id,)
    with get_conn() as db:
        db.execute("DELETE FROM users WHERE id = ?", params)
    backup_db()
def authenticate(username, password):
    """Check credentials and return the user row as a dict, or None.

    Returns None for an unknown user, a deactivated user, or a wrong password.
    A missing password (None) is treated as an empty one instead of raising.
    """
    password = password or ""
    with get_conn() as conn:
        row = conn.execute("SELECT * FROM users WHERE username = ?", ((username or "").strip(),)).fetchone()
    if not row or not row["is_active"]:
        # Spend the same PBKDF2 work as a real check so response time does not
        # reveal whether the username exists.
        hash_password(password, "0" * 32)
        return None
    pw_hash, _ = hash_password(password, row["salt"])
    if secrets.compare_digest(pw_hash, row["password_hash"]):
        return dict(row)
    return None
def user_session(uid):
    """Build the lightweight session dict stored in ``gr.State``.

    Contains the user's id, username and role plus the languages assigned to
    them (each as ``{"id", "code", "name"}``), ordered by language name.
    """
    with get_conn() as conn:
        user = conn.execute("SELECT * FROM users WHERE id = ?", (uid,)).fetchone()
        lang_rows = conn.execute(
            "SELECT l.id, l.code, l.name FROM user_languages ul "
            "JOIN languages l ON l.id = ul.language_id WHERE ul.user_id = ? ORDER BY l.name", (uid,)
        ).fetchall()
    languages = [
        {field: lang[field] for field in ("id", "code", "name")}
        for lang in lang_rows
    ]
    return {
        "id": user["id"],
        "username": user["username"],
        "role": user["role"],
        "languages": languages,
    }
| # --------------------------------------------------------------------------- # | |
| # Languages | |
| # --------------------------------------------------------------------------- # | |
def add_language(code, name):
    """Register a language; the code is stored lower-cased and stripped.

    Raises:
        ValueError: if code or name is empty, or the code already exists.
    """
    clean_code = (code or "").strip().lower()
    clean_name = (name or "").strip()
    if not (clean_code and clean_name):
        raise ValueError("Both language code and name are required.")
    record = (clean_code, clean_name, now_iso())
    with get_conn() as conn:
        try:
            conn.execute("INSERT INTO languages (code, name, created_at) VALUES (?,?,?)", record)
        except sqlite3.IntegrityError:
            raise ValueError(f"Language code '{clean_code}' already exists.")
    backup_db()
def list_languages():
    """Return every language as a plain dict, ordered by name."""
    with get_conn() as conn:
        rows = conn.execute("SELECT * FROM languages ORDER BY name").fetchall()
        return list(map(dict, rows))
def set_user_languages(uid, language_ids):
    """Replace a user's language assignments with `language_ids`.

    The old assignments are deleted and the new ones inserted in the same
    transaction; the persistent backup is refreshed afterwards.
    """
    with get_conn() as conn:
        conn.execute("DELETE FROM user_languages WHERE user_id = ?", (uid,))
        conn.executemany(
            "INSERT OR IGNORE INTO user_languages (user_id, language_id) VALUES (?,?)",
            ((uid, lang_id) for lang_id in language_ids),
        )
    backup_db()
| # --------------------------------------------------------------------------- # | |
| # Samples | |
| # --------------------------------------------------------------------------- # | |
def add_sample(language_id, src_path, sample_name=None, model_name="unspecified",
               is_reference=False, transcript=""):
    """Copy an audio file into the language's audio folder and register it.

    Args:
        language_id: id of an existing language (must be integer-like).
        src_path: path of the uploaded audio file.
        sample_name: display name; defaults to the source file's base name.
        model_name: system that produced the audio; blank becomes "unspecified".
        is_reference: True for a reference / human anchor sample.
        transcript: optional reference text.

    Raises:
        ValueError: if the source file is missing or `language_id` is not an
            integer. Database errors (e.g. unknown language) propagate after
            the copied file has been removed again.
    """
    if not src_path or not os.path.exists(src_path):
        raise ValueError("Audio file not found.")
    # The id becomes a directory name below; never let free-form text (the
    # upload dropdown allows custom values) end up in a filesystem path.
    try:
        language_id = int(language_id)
    except (TypeError, ValueError):
        raise ValueError("Invalid language.") from None
    ext = os.path.splitext(src_path)[1].lower() or ".wav"
    lang_dir = os.path.join(AUDIO_DIR, str(language_id))
    os.makedirs(lang_dir, exist_ok=True)
    fname = f"{secrets.token_hex(8)}{ext}"
    dst = os.path.join(lang_dir, fname)
    shutil.copyfile(src_path, dst)
    sample_name = (sample_name or "").strip() or os.path.splitext(os.path.basename(src_path))[0]
    try:
        with get_conn() as conn:
            conn.execute(
                "INSERT INTO samples (language_id, sample_name, model_name, file_path, is_reference, transcript, created_at) "
                "VALUES (?,?,?,?,?,?,?)",
                (language_id, sample_name, (model_name or "unspecified").strip(), dst,
                 1 if is_reference else 0, (transcript or "").strip(), now_iso()),
            )
    except Exception:
        # No row references the copy, so do not leave it orphaned on disk.
        try:
            os.remove(dst)
        except OSError:
            pass
        raise
    backup_db()
def list_samples(language_id=None):
    """Return samples (with language name and code) as dicts.

    All samples are returned when `language_id` is falsy; otherwise only those
    of that language. Ordered by language id, then sample id.
    """
    parts = [
        "SELECT s.*, l.name AS language_name, l.code AS language_code "
        "FROM samples s JOIN languages l ON l.id = s.language_id"
    ]
    params = ()
    if language_id:
        parts.append(" WHERE s.language_id = ?")
        params = (language_id,)
    parts.append(" ORDER BY s.language_id, s.id")
    sql = "".join(parts)
    with get_conn() as conn:
        found = conn.execute(sql, params).fetchall()
        return [dict(sample) for sample in found]
def delete_sample(sample_id):
    """Delete a sample row (ratings cascade) and then its audio file.

    The row is deleted and committed first. Removing the file first could
    leave a sample that still appears to reviewers but has no audio, should
    the database delete fail. A file that is already gone or cannot be
    removed is ignored.
    """
    with get_conn() as conn:
        row = conn.execute("SELECT file_path FROM samples WHERE id = ?", (sample_id,)).fetchone()
        conn.execute("DELETE FROM samples WHERE id = ?", (sample_id,))
    if row and row["file_path"]:
        try:
            os.remove(row["file_path"])
        except OSError:
            pass
    backup_db()
def set_sample_transcript(sample_id, transcript):
    """Store the (stripped) reference text for a sample and refresh the backup."""
    cleaned = (transcript or "").strip()
    with get_conn() as db:
        db.execute("UPDATE samples SET transcript = ? WHERE id = ?", (cleaned, sample_id))
    backup_db()
def get_sample_transcript(sample_id):
    """Return a sample's reference text, or "" if the sample or text is missing."""
    with get_conn() as db:
        found = db.execute("SELECT transcript FROM samples WHERE id = ?", (sample_id,)).fetchone()
    if not found:
        return ""
    return found["transcript"] or ""
def get_sample_meta(sample_id):
    """Return ``(transcript, is_reference)`` for a sample.

    An unknown sample yields ``("", False)``; a NULL transcript yields "".
    """
    with get_conn() as db:
        found = db.execute("SELECT transcript, is_reference FROM samples WHERE id = ?",
                           (sample_id,)).fetchone()
    if found:
        return (found["transcript"] or ""), bool(found["is_reference"])
    return "", False
def set_sample_reference(sample_id, is_ref):
    """Mark or unmark a sample as a reference / human anchor, then back up."""
    flag = 1 if is_ref else 0
    with get_conn() as db:
        db.execute("UPDATE samples SET is_reference = ? WHERE id = ?", (flag, sample_id))
    backup_db()
def unmark_all_references():
    """Clear the reference flag on every sample; return how many rows changed."""
    with get_conn() as db:
        cursor = db.execute("UPDATE samples SET is_reference = 0 WHERE is_reference = 1")
        changed = cursor.rowcount
    backup_db()
    return changed
| # --------------------------------------------------------------------------- # | |
| # Ratings | |
| # --------------------------------------------------------------------------- # | |
def upsert_rating(user_id, sample_id, scores, comments=""):
    """Insert or update one reviewer's rating of one sample.

    Args:
        user_id: the reviewer.
        sample_id: the sample being rated.
        scores: mapping from every key in CRITERIA_KEYS to a score 1-5.
        comments: optional free text; stored stripped.

    Raises:
        ValueError: if a criterion is missing, not a number, or outside 1-5.
            Nothing is written in that case, so one bad submission cannot
            skew the aggregated MOS.
    """
    vals = []
    for key in CRITERIA_KEYS:
        try:
            score = int(scores[key])
        except (KeyError, TypeError, ValueError):
            raise ValueError(f"A score from 1 to 5 is required for '{key}'.") from None
        if not 1 <= score <= 5:
            raise ValueError(f"Score for '{key}' must be between 1 and 5.")
        vals.append(score)

    cols = ", ".join(CRITERIA_KEYS)
    placeholders = ", ".join("?" for _ in CRITERIA_KEYS)
    updates = ", ".join(f"{k}=excluded.{k}" for k in CRITERIA_KEYS)
    with get_conn() as conn:
        # UNIQUE(user_id, sample_id) turns a second submission into an update.
        conn.execute(
            f"INSERT INTO ratings (user_id, sample_id, {cols}, comments, updated_at) "
            f"VALUES (?,?,{placeholders},?,?) "
            f"ON CONFLICT(user_id, sample_id) DO UPDATE SET {updates}, "
            f"comments=excluded.comments, updated_at=excluded.updated_at",
            [user_id, sample_id, *vals, (comments or "").strip(), now_iso()],
        )
    backup_db()
def get_rating(user_id, sample_id):
    """Return this reviewer's rating of the sample as a dict, or None."""
    with get_conn() as db:
        found = db.execute("SELECT * FROM ratings WHERE user_id=? AND sample_id=?",
                           (user_id, sample_id)).fetchone()
    if not found:
        return None
    return dict(found)
def samples_for_reviewer(user_id, language_id):
    """List a language's samples with a per-reviewer ``rated`` flag.

    Each dict has ``id``, ``sample_name``, ``is_reference`` and ``rated``
    (1 if this reviewer already rated the sample, else 0), ordered by id.
    """
    sql = (
        "SELECT s.id, s.sample_name, s.is_reference, "
        " CASE WHEN r.id IS NULL THEN 0 ELSE 1 END AS rated "
        "FROM samples s "
        "LEFT JOIN ratings r ON r.sample_id = s.id AND r.user_id = ? "
        "WHERE s.language_id = ? ORDER BY s.id"
    )
    with get_conn() as db:
        found = db.execute(sql, (user_id, language_id)).fetchall()
    return list(map(dict, found))
| # --------------------------------------------------------------------------- # | |
| # Aggregation | |
| # --------------------------------------------------------------------------- # | |
def ratings_dataframe(language_id=None, include_reference=False):
    """Load ratings joined with their sample and language into a DataFrame.

    Args:
        language_id: restrict to one language; falsy means all languages.
        include_reference: keep ratings of reference / anchor samples.
    """
    sql = (
        "SELECT r.*, s.language_id, s.sample_name, s.model_name, s.is_reference, "
        " l.name AS language_name, l.code AS language_code "
        "FROM ratings r "
        "JOIN samples s ON s.id = r.sample_id "
        "JOIN languages l ON l.id = s.language_id"
    )
    params = ()
    if language_id:
        sql += " WHERE s.language_id = ?"
        params = (language_id,)
    with get_conn() as conn:
        frame = pd.read_sql_query(sql, conn, params=params)
    if include_reference or frame.empty:
        return frame
    # Drop anchors: keep only ratings of system (non-reference) samples.
    return frame[frame["is_reference"] == 0]
def compute_results(language_id, by_model=True):
    """Return (per_model_df, per_sample_df, summary_text) for a language.

    Reference / anchor samples are excluded from both tables and from the
    system MOS; they are reported separately in the summary as a calibration
    check. `by_model` is accepted for compatibility and currently unused.
    With no ratings at all, two empty frames and a notice are returned.
    """
    df = ratings_dataframe(language_id=language_id, include_reference=True)
    if df.empty:
        return pd.DataFrame(), pd.DataFrame(), "No ratings collected yet for this language."

    # Split system vs reference (human anchor) samples.
    sys_df = df[df["is_reference"] == 0]
    ref_df = df[df["is_reference"] == 1]

    def agg_block(frame, group_cols):
        """Aggregate per-criterion means and counts for one grouping."""
        if frame.empty:
            return pd.DataFrame()
        g = frame.groupby(group_cols, dropna=False)
        means = g[CRITERIA_KEYS].mean()
        out = means.round(3)
        # Average the unrounded criterion means, then round once; averaging
        # already-rounded values can shift the last digit.
        out["MOS (mean of criteria)"] = means.mean(axis=1).round(3)
        out["overall_std"] = g["overall"].std().round(3)
        out["n_ratings"] = g.size()
        out["n_reviewers"] = g["user_id"].nunique()
        out["n_samples"] = g["sample_id"].nunique()
        return out.reset_index()

    per_model = agg_block(sys_df, ["model_name"])
    per_sample = agg_block(sys_df, ["model_name", "sample_id", "sample_name"])

    # Friendly column labels.
    label_map = {k: lbl for k, lbl, _ in CRITERIA}
    per_model = per_model.rename(columns=label_map)
    per_sample = per_sample.rename(columns=label_map)

    lang_name = df["language_name"].iloc[0]
    lines = [f"Language: {lang_name}",
             f"Total ratings: {len(sys_df)} | Reviewers: {sys_df['user_id'].nunique()} | "
             f"Samples: {sys_df['sample_id'].nunique()} | Models: {sys_df['model_name'].nunique()}"]
    if not sys_df.empty:
        overall_mos = sys_df["overall"].mean()
        mean_mos = sys_df[CRITERIA_KEYS].mean(axis=1).mean()
        lines.append(f"System MOS (Overall criterion): {overall_mos:.3f} | "
                     f"System MOS (mean of all criteria): {mean_mos:.3f}")
    if not ref_df.empty:
        lines.append(f"Reference/anchor MOS (Overall): {ref_df['overall'].mean():.3f} "
                     f"({ref_df['sample_id'].nunique()} anchor samples) — sanity check on reviewer calibration.")
    return per_model, per_sample, "\n".join(lines)
def export_results(language_id):
    """Write a language's results to an XLSX workbook and return its path.

    The workbook has three sheets — "Per Model", "Per Sample" and
    "Raw Ratings" (raw ratings include reference samples). An empty table is
    replaced by a single "no data" row. The file is created in EXPORT_DIR and
    named after the language code and the current UTC time.
    """
    per_model, per_sample, _ = compute_results(language_id)
    raw = ratings_dataframe(language_id=language_id, include_reference=True)
    langs = {l["id"]: l for l in list_languages()}
    code = langs.get(language_id, {}).get("code", str(language_id))
    # Language codes are free text entered by an admin; keep path separators
    # and other unsafe characters out of the file name.
    safe_code = re.sub(r"[^A-Za-z0-9_.\-]+", "_", str(code)) or str(language_id)
    stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%d_%H%M%S")
    path = os.path.join(EXPORT_DIR, f"mos_results_{safe_code}_{stamp}.xlsx")

    def or_placeholder(frame):
        """Return `frame`, or a one-row notice when it is empty."""
        return frame if not frame.empty else pd.DataFrame({"info": ["no data"]})

    with pd.ExcelWriter(path, engine="openpyxl") as xw:
        or_placeholder(per_model).to_excel(xw, sheet_name="Per Model", index=False)
        or_placeholder(per_sample).to_excel(xw, sheet_name="Per Sample", index=False)
        or_placeholder(raw).to_excel(xw, sheet_name="Raw Ratings", index=False)
    return path
| # --------------------------------------------------------------------------- # | |
| # Startup | |
| # --------------------------------------------------------------------------- # | |
# Create the schema if needed, then print the effective configuration.
init_db()
print("=" * 64)
print("Plotweaver AI — TTS MOS Evaluation Platform")
print(f"Persistent dir : {DATA_DIR}")
print(f"Live database : {DB_PATH}")
print(f"DB backup : {DB_BACKUP}")
# NOTE(review): this writes the admin code to the logs in clear text; that is
# the documented bootstrap mechanism, so treat the logs as sensitive.
print(f"Admin code : {ADMIN_CODE} (use on signup to create an admin)")
print("=" * 64)
| # =========================================================================== | |
| # GRADIO UI | |
| # =========================================================================== | |
def lang_choices():
    """Return dropdown choices ``("Name (code)", id)`` for every language."""
    choices = []
    for lang in list_languages():
        label = f"{lang['name']} ({lang['code']})"
        choices.append((label, lang["id"]))
    return choices
def reviewer_lang_choices(session):
    """Return dropdown choices ``(name, id)`` for the session's languages.

    An empty or missing session yields an empty list.
    """
    languages = session["languages"] if session else []
    return [(lang["name"], lang["id"]) for lang in languages]
# Custom stylesheet passed to gr.Blocks(css=...). The selectors target the
# elem_id / elem_classes values assigned to components in the layout.
CSS = """
.gradio-container { max-width: 1080px !important; margin: auto !important; }
#app-title { text-align: center; margin-bottom: 0; }
#app-title h1 { font-size: 1.7rem; margin: 6px 0 0; }
#app-sub { text-align: center; color: var(--body-text-color-subdued); margin-top: 4px; font-size: 0.95rem; }
#auth-wrap { max-width: 460px; margin: 18px auto; }
.soft-card { border: 1px solid var(--block-border-color); border-radius: 16px;
padding: 18px; background: var(--block-background-fill); }
#userbar { display: flex; align-items: center; justify-content: space-between;
padding: 4px 2px 10px; }
#scale-hint { text-align: center; font-size: 0.9rem; color: var(--body-text-color-subdued);
margin: 4px 0 12px; }
#submit-row button { font-size: 1.05rem; padding: 12px; border-radius: 12px; }
.rating-grid .gr-radio { margin-bottom: 6px; }
footer { display: none !important; }
"""
| with gr.Blocks(title="Plotweaver AI β TTS MOS Evaluation", | |
| theme=gr.themes.Soft(primary_hue="indigo", secondary_hue="indigo"), | |
| css=CSS) as demo: | |
| session = gr.State(None) # logged-in user session dict | |
| current_sample = gr.State(None) # sample id currently being rated | |
| auth_token = gr.BrowserState("") # signed token persisted in the browser (survives refresh) | |
| gr.Markdown("# π§ Plotweaver AI β TTS MOS Evaluation", elem_id="app-title") | |
| gr.Markdown("Rate synthesised speech on 7 quality criteria, by language.", elem_id="app-sub") | |
| # ---------- Loading (shown first, until the browser session is checked) ---------- | |
| with gr.Column(visible=True, elem_id="auth-wrap") as loading_col: | |
| gr.Markdown("### Loadingβ¦") | |
| # ----------------------------- AUTH ------------------------------------ # | |
| with gr.Column(visible=False, elem_id="auth-wrap") as auth_col: | |
| with gr.Column(elem_classes="soft-card"): | |
| with gr.Tabs(): | |
| with gr.Tab("Sign in"): | |
| li_user = gr.Textbox(label="Username", autofocus=True) | |
| li_pw = gr.Textbox(label="Password", type="password") | |
| li_btn = gr.Button("Sign in", variant="primary") | |
| li_msg = gr.Markdown() | |
| with gr.Tab("Create account"): | |
| su_user = gr.Textbox(label="Username", info="3β32 chars: letters, numbers, _ . -") | |
| su_email = gr.Textbox(label="Email (optional)") | |
| su_pw = gr.Textbox(label="Password", type="password", info="At least 6 characters") | |
| su_lang = gr.Dropdown(label="Language", choices=lang_choices(), | |
| allow_custom_value=True, | |
| info="The language you will evaluate.") | |
| su_code = gr.Textbox(label="Admin code (optional)", type="password", | |
| info="Leave blank for a normal reviewer account.") | |
| su_btn = gr.Button("Create account", variant="primary") | |
| su_msg = gr.Markdown() | |
| # ----------------------------- APP ------------------------------------- # | |
| with gr.Column(visible=False) as app_col: | |
| with gr.Row(elem_id="userbar"): | |
| greeting = gr.Markdown() | |
| logout_btn = gr.Button("Log out", scale=0, size="sm") | |
| with gr.Tabs(visible=False) as reviewer_tabs: | |
| # ---------- Rate tab ---------- | |
| with gr.Tab("Rate samples"): | |
| with gr.Row(): | |
| rate_lang = gr.Dropdown(label="Language", choices=[], interactive=True, | |
| allow_custom_value=True, scale=2) | |
| rate_sample = gr.Dropdown(label="Sample (β = already rated)", choices=[], | |
| interactive=True, allow_custom_value=True, scale=3) | |
| next_btn = gr.Button("Next unrated βΆ", scale=1, size="sm") | |
| rate_audio = gr.Audio(label="βΆ Listen to the full sample", | |
| type="filepath", interactive=False) | |
| rate_transcript = gr.Textbox( | |
| label="π Reference text (what the speaker should be saying)", | |
| interactive=False, lines=2, elem_id="ref-text") | |
| gr.Markdown( | |
| "Use headphones in a quiet room. " | |
| "**1** Very Poor Β· **2** Poor Β· **3** Fair Β· **4** Good Β· **5** Excellent", | |
| elem_id="scale-hint", | |
| ) | |
| criterion_inputs = {} | |
| with gr.Column(elem_classes="rating-grid"): | |
| with gr.Row(): | |
| for key, label, definition in CRITERIA[:4]: | |
| criterion_inputs[key] = gr.Radio( | |
| choices=[1, 2, 3, 4, 5], label=label, info=definition, value=None) | |
| with gr.Row(): | |
| for key, label, definition in CRITERIA[4:]: | |
| criterion_inputs[key] = gr.Radio( | |
| choices=[1, 2, 3, 4, 5], label=label, info=definition, value=None) | |
| rate_comments = gr.Textbox( | |
| label="Comments (optional)", | |
| placeholder="Pronunciation errors, unnatural prosody, noise, artifactsβ¦", lines=2) | |
| with gr.Row(elem_id="submit-row"): | |
| submit_btn = gr.Button("β Submit / update rating", variant="primary") | |
| rate_msg = gr.Markdown() | |
| # ---------- Progress tab ---------- | |
| with gr.Tab("My progress"): | |
| refresh_prog_btn = gr.Button("Refresh", size="sm") | |
| progress_md = gr.Markdown() | |
| progress_tbl = gr.Dataframe( | |
| headers=["Language", "Rated", "Total", "Remaining"], interactive=False) | |
| # ---------- Admin view (entirely separate from reviewer tabs) ---------- | |
| with gr.Column(visible=False) as admin_panel: | |
| gr.Markdown("### Languages") | |
| with gr.Row(): | |
| al_code = gr.Textbox(label="Code", info="e.g. yo, ha, ig, pcm, en-NG", scale=1) | |
| al_name = gr.Textbox(label="Name", info="e.g. Yoruba", scale=2) | |
| al_btn = gr.Button("Add language", scale=0, size="sm") | |
| al_msg = gr.Markdown() | |
| langs_tbl = gr.Dataframe(headers=["id", "code", "name"], interactive=False, | |
| label="Existing languages") | |
| gr.Markdown("### Upload audio samples") | |
| with gr.Row(): | |
| up_lang = gr.Dropdown(label="Language", choices=lang_choices(), | |
| allow_custom_value=True, scale=1) | |
| up_model = gr.Textbox(label="Model / system name", value="", | |
| info="e.g. F5-TTS, XTTS-v2, MMS-TTS, human. Hidden from reviewers.", | |
| scale=2) | |
| up_files = gr.File(label="Audio files (wav/mp3/flac/ogg)", file_count="multiple", | |
| file_types=["audio"]) | |
| with gr.Row(): | |
| up_isref = gr.Checkbox(label="These are reference / human anchor samples", value=False) | |
| up_transcript = gr.Textbox(label="Transcript (optional, applies to all uploaded)", scale=2) | |
| up_btn = gr.Button("Upload", variant="primary", size="sm") | |
| up_msg = gr.Markdown() | |
| samples_tbl = gr.Dataframe( | |
| headers=["id", "language", "sample_name", "model", "reference"], | |
| interactive=False, label="Samples") | |
| with gr.Row(): | |
| del_sample = gr.Dropdown(label="Select a sample to delete", choices=[], | |
| allow_custom_value=True, scale=3) | |
| del_btn = gr.Button("Delete selected", variant="stop", scale=0, size="sm") | |
| gr.Markdown("#### Edit a sample") | |
| gr.Markdown("Pick a sample to edit its reference text (shown to reviewers) and whether it " | |
| "is a reference / human anchor. **Anchors are excluded from the system MOS** β " | |
| "if your MOS table is empty but ratings exist, the samples are probably marked " | |
| "as anchors here; untick the box and Save.") | |
| with gr.Row(): | |
| tr_sample = gr.Dropdown(label="Sample", choices=[], | |
| allow_custom_value=True, scale=2) | |
| tr_isref = gr.Checkbox(label="Reference / human anchor", value=False, scale=1) | |
| with gr.Row(): | |
| tr_text = gr.Textbox(label="Reference text", lines=2, scale=3) | |
| tr_btn = gr.Button("Save", scale=0, size="sm") | |
| tr_msg = gr.Markdown() | |
| unmark_all_btn = gr.Button("β² Mark ALL samples as system (un-mark every anchor)", size="sm") | |
| unmark_msg = gr.Markdown() | |
| gr.Markdown("### Users") | |
| refresh_users_btn = gr.Button("Refresh users", size="sm") | |
| users_tbl = gr.Dataframe( | |
| headers=["id", "username", "role", "active", "languages", "ratings"], | |
| interactive=False) | |
| with gr.Row(): | |
| promote_id = gr.Number(label="User id", precision=0) | |
| role_choice = gr.Dropdown(label="Set role", choices=["reviewer", "admin"], value="reviewer") | |
| active_choice = gr.Dropdown(label="Set active", choices=["yes", "no"], value="yes") | |
| update_user_btn = gr.Button("Apply", scale=0, size="sm") | |
| user_admin_msg = gr.Markdown() | |
| with gr.Row(): | |
| del_user_id = gr.Number(label="Delete user by id", precision=0) | |
| del_user_btn = gr.Button("Delete user", variant="stop", scale=0, size="sm") | |
| gr.Markdown("Deleting a user also removes all of their ratings. To keep their ratings, " | |
| "set **active = no** instead.") | |
| del_user_msg = gr.Markdown() | |
| gr.Markdown("### Results") | |
| with gr.Row(): | |
| res_lang = gr.Dropdown(label="Language", choices=lang_choices(), | |
| allow_custom_value=True, scale=2) | |
| res_btn = gr.Button("Compute MOS", variant="primary", scale=0, size="sm") | |
| export_btn = gr.Button("Export XLSX", scale=0, size="sm") | |
| res_summary = gr.Markdown() | |
| res_model_tbl = gr.Dataframe(label="MOS by model", interactive=False) | |
| res_sample_tbl = gr.Dataframe(label="MOS by sample", interactive=False) | |
| res_file = gr.File(label="Exported file") | |
| # ===================================================================== # | |
| # Handlers | |
| # ===================================================================== # | |
| rate_outputs = [criterion_inputs[k] for k in CRITERIA_KEYS] | |
def do_login(username, password):
    """Handle a sign-in attempt; returns one value per LOGIN_OUTPUTS entry.

    On failure only the session (cleared) and the message slot change, so the
    typed credentials stay in place. On success the auth view is hidden, the
    app view is shown, the reviewer tabs or admin panel are revealed by role,
    the login fields are cleared and a signed token is stored in the browser.
    Admin tables and pickers are filled only for admins.
    """
    user = authenticate(username, password)
    if not user:
        # 16 outputs in total; index 2 is `session`, index 3 is `li_msg`.
        outputs = [gr.update() for _ in range(16)]
        outputs[2] = None
        outputs[3] = "β Invalid username or password."
        return tuple(outputs)

    sess = user_session(user["id"])
    is_admin = sess["role"] == "admin"
    reviewer_langs = reviewer_lang_choices(sess)

    if not is_admin:
        langs_rows, samples_rows, users_rows = [], [], []
        delete_picker, edit_picker = gr.update(), gr.update()
    else:
        langs_rows, samples_rows, users_rows = _languages_table(), _samples_table(), _users_table()
        picker_choices = _sample_delete_choices()
        delete_picker = gr.update(choices=picker_choices)
        edit_picker = gr.update(choices=picker_choices)

    greeting_text = f"Signed in as **{sess['username']}** Β· {sess['role']}"
    return (
        gr.update(visible=False),                        # auth_col
        gr.update(visible=True),                         # app_col
        sess,                                            # session
        "",                                              # li_msg
        gr.update(value=greeting_text),                  # greeting
        gr.update(visible=not is_admin),                 # reviewer_tabs
        gr.update(visible=is_admin),                     # admin_panel
        gr.update(choices=reviewer_langs, value=None),   # rate_lang
        gr.update(value=""),                             # li_user (clear)
        gr.update(value=""),                             # li_pw (clear)
        make_token(user["id"]),                          # auth_token (persist login)
        langs_rows, samples_rows, users_rows,            # admin tables
        delete_picker, edit_picker,                      # admin pickers
    )
| LOGIN_OUTPUTS = [auth_col, app_col, session, li_msg, greeting, reviewer_tabs, admin_panel, | |
| rate_lang, li_user, li_pw, auth_token, | |
| langs_tbl, samples_tbl, users_tbl, del_sample, tr_sample] | |
| li_btn.click(do_login, [li_user, li_pw], LOGIN_OUTPUTS) | |
| li_pw.submit(do_login, [li_user, li_pw], LOGIN_OUTPUTS) # Enter-to-login | |
def do_signup(username, email, password, lang_id, code):
    """Create an account from the signup form.

    The account becomes an admin only when a non-empty ``code`` matches
    ``ADMIN_CODE``; otherwise it is a reviewer. Returns the status message
    followed by updates for the five signup fields (kept on failure, cleared
    on success).
    """
    # Compare as bytes in constant time: the admin code is a shared secret, and
    # compare_digest on str would reject non-ASCII input with a TypeError.
    supplied = (code or "").encode("utf-8")
    expected = str(ADMIN_CODE or "").encode("utf-8")
    is_admin_code = bool(supplied) and bool(expected) and hmac.compare_digest(supplied, expected)
    role = "admin" if is_admin_code else "reviewer"
    lang_ids = [lang_id] if lang_id else []
    try:
        create_user(username, email, password, role=role, language_ids=lang_ids)
    except ValueError as e:
        # keep fields so the user can fix the problem
        return (f"β {e}", gr.update(), gr.update(), gr.update(), gr.update(), gr.update())
    note = " (admin account)" if role == "admin" else ""
    return ("β Account created" + note + ". Switch to the **Sign in** tab to continue.",
            gr.update(value=""), gr.update(value=""), gr.update(value=""),
            gr.update(value=None), gr.update(value=""))


su_btn.click(do_signup, [su_user, su_email, su_pw, su_lang, su_code],
             [su_msg, su_user, su_email, su_pw, su_lang, su_code])
def do_logout():
    """Return to the sign-in screen and clear session, greeting and auth token."""
    show_auth = gr.update(visible=True)
    hide_app = gr.update(visible=False)
    return (show_auth, hide_app, None, "", "")


logout_btn.click(do_logout, None, [auth_col, app_col, session, greeting, auth_token])
| # ---- Rating flow ---- | |
def load_samples_for_lang(sess, language_id):
    """Fill the sample picker for one language, marking already-rated samples.

    Selects the first sample when there is one; empties the picker when there
    is no session or no language.
    """
    if not (sess and language_id):
        return gr.update(choices=[], value=None)
    choices = []
    for item in samples_for_reviewer(sess["id"], language_id):
        marker = "β " if item["rated"] else "β’ "
        choices.append((marker + item["sample_name"], item["id"]))
    first_id = choices[0][1] if choices else None
    return gr.update(choices=choices, value=first_id)


rate_lang.change(load_samples_for_lang, [session, rate_lang], [rate_sample])
def load_sample(sess, sample_id):
    """Load one sample into the rating form.

    Returns (audio path, transcript, sample id, one score per criterion,
    comments). Scores and comments come from the reviewer's existing rating
    when there is one; an unknown or empty sample id yields a blank form.
    """
    blank_form = (None, "", None, *[None] * len(CRITERIA_KEYS), "")
    row = None
    if sample_id:
        with get_conn() as conn:
            row = conn.execute("SELECT * FROM samples WHERE id=?", (sample_id,)).fetchone()
    if not row:
        return blank_form

    transcript = row["transcript"] or "β No reference text provided for this sample β"
    previous = get_rating(sess["id"], sample_id) if sess else None
    if previous:
        scores = [previous[key] for key in CRITERIA_KEYS]
        comments = previous["comments"]
    else:
        scores = [None for _ in CRITERIA_KEYS]
        comments = ""
    return (row["file_path"], transcript, sample_id, *scores, comments)


rate_sample.change(
    load_sample,
    [session, rate_sample],
    [rate_audio, rate_transcript, current_sample, *rate_outputs, rate_comments],
)
def go_next_unrated(sess, language_id):
    """Move the sample picker to the first sample the reviewer has not rated.

    Leaves the picker unchanged when there is no session, no language, or
    nothing left to rate.
    """
    if sess and language_id:
        for item in samples_for_reviewer(sess["id"], language_id):
            if item["rated"]:
                continue
            target = item["id"]
            return gr.update() if target is None else gr.update(value=target)
    return gr.update()


next_btn.click(go_next_unrated, [session, rate_lang], [rate_sample])
def score_lang(sess, sample_id):
    """Return the language id of a sample, or None when the sample is not found.

    ``sess`` is accepted but not used.
    """
    with get_conn() as conn:
        cursor = conn.execute("SELECT language_id FROM samples WHERE id=?", (sample_id,))
        found = cursor.fetchone()
    if not found:
        return None
    return found["language_id"]
def submit_rating(sess, sample_id, comments, *scores):
    """Save the reviewer's rating for the current sample and report progress.

    ``scores`` arrive in CRITERIA_KEYS order. Every criterion must be filled
    in; otherwise nothing is saved and the missing labels are listed.
    """
    if not sess:
        return "β Not signed in."
    if not sample_id:
        return "β No sample selected."

    score_map = dict(zip(CRITERIA_KEYS, scores))
    unanswered = []
    for key, label, _ in CRITERIA:
        if score_map.get(key) in (None, ""):
            unanswered.append(label)
    if unanswered:
        return "β Please rate every criterion. Missing: " + ", ".join(unanswered)

    upsert_rating(sess["id"], sample_id, score_map, comments)

    # Progress is counted within the language of the sample just rated.
    language_id = score_lang(sess, sample_id)
    items = samples_for_reviewer(sess["id"], language_id)
    rated = len([item for item in items if item["rated"]])
    return f"β Saved β {rated}/{len(items)} samples rated in this language. Click **Next unrated βΆ** to continue."


# After saving, rebuild the sample picker so the rated marker is refreshed.
submit_btn.click(
    submit_rating,
    [session, current_sample, rate_comments, *rate_outputs],
    [rate_msg],
).then(
    load_samples_for_lang, [session, rate_lang], [rate_sample]
)
| # ---- Progress ---- | |
def load_progress(sess):
    """Build the per-language progress table for the signed-in reviewer.

    Each row is [language name, rated, total, remaining]. Returns a heading
    message and the rows.
    """
    if not sess:
        return "Not signed in.", []
    table = []
    for lang in sess["languages"]:
        items = samples_for_reviewer(sess["id"], lang["id"])
        total = len(items)
        done = sum(bool(item["rated"]) for item in items)
        table.append([lang["name"], done, total, total - done])
    if table:
        return f"Progress for **{sess['username']}**:", table
    return "You have no languages assigned yet. Add some under **My languages**.", []


refresh_prog_btn.click(load_progress, [session], [progress_md, progress_tbl])
| # ---- Admin: languages ---- | |
def _languages_table():
    """Return the languages as [id, code, name] rows for the admin table."""
    table = []
    for lang in list_languages():
        table.append([lang["id"], lang["code"], lang["name"]])
    return table
def _lang_dropdown_updates():
    """Return three dropdown updates carrying the current language choices."""
    current = lang_choices()
    return tuple(gr.update(choices=current) for _ in range(3))
def admin_add_language(sess, code, name):
    """Add a language (admins only) and refresh the table and language dropdowns.

    Returns the status message, the languages table, and three dropdown updates.
    """
    if not sess or sess["role"] != "admin":
        message = "β Admin only."
    else:
        try:
            add_language(code, name)
        except ValueError as e:
            message = f"β {e}"
        else:
            message = f"β Added {name} ({code})."
    return (message, _languages_table(), *_lang_dropdown_updates())


al_btn.click(
    admin_add_language,
    [session, al_code, al_name],
    [al_msg, langs_tbl, up_lang, res_lang, su_lang],
)
| # ---- Admin: samples ---- | |
def _samples_table():
    """Return the samples as rows: [id, "Language (code)", sample, model, yes/no reference]."""
    table = []
    for sample in list_samples():
        language = f"{sample['language_name']} ({sample['language_code']})"
        reference = "yes" if sample["is_reference"] else "no"
        table.append([sample["id"], language, sample["sample_name"],
                      sample["model_name"], reference])
    return table
def _sample_delete_choices():
    """Return (label, id) pairs for the admin sample pickers."""
    choices = []
    for s in list_samples():
        label = f"#{s['id']} β {s['sample_name']} ({s['model_name']}, {s['language_code']})"
        choices.append((label, s["id"]))
    return choices
def admin_upload(sess, language_id, files, model, is_ref, transcript):
    """Store uploaded audio files as samples of one language (admins only).

    Files are added one by one and the batch stops at the first failure.
    The error message then names the failing file and says how many files
    were already stored, so the admin does not re-upload those.

    Returns the status message, the samples table, and updates for the two
    sample pickers.
    """
    if not sess or sess["role"] != "admin":
        return "β Admin only.", _samples_table(), gr.update(), gr.update()
    if not language_id:
        return "β Choose a language first.", _samples_table(), gr.update(), gr.update()
    if not files:
        return "β No files selected.", _samples_table(), gr.update(), gr.update()
    count = 0
    for f in files:
        # An entry is either a plain path string or an object exposing the path as .name.
        path = f if isinstance(f, str) else getattr(f, "name", None)
        try:
            add_sample(language_id, path, model_name=model, is_reference=is_ref, transcript=transcript)
        except Exception as e:  # noqa: BLE001 - any failure is reported to the admin, not raised
            ch = _sample_delete_choices()
            label = os.path.basename(path) if path else "a file"
            return (f"β Error on {label}: {e} ({count} file(s) were uploaded before the error.)",
                    _samples_table(),
                    gr.update(choices=ch), gr.update(choices=ch))
        count += 1
    ch = _sample_delete_choices()
    return (f"β Uploaded {count} sample(s).", _samples_table(),
            gr.update(choices=ch, value=None), gr.update(choices=ch, value=None))


up_btn.click(admin_upload, [session, up_lang, up_files, up_model, up_isref, up_transcript],
             [up_msg, samples_tbl, del_sample, tr_sample])
def admin_delete_sample(sess, sid):
    """Delete one sample (admins only) and refresh the table and both pickers.

    The pickers are reset to no selection only after a successful delete.
    """
    if not sess or sess["role"] != "admin":
        return "β Admin only.", _samples_table(), gr.update(), gr.update()

    if sid:
        sample_id = int(sid)
        delete_sample(sample_id)
        message = f"β Deleted sample #{sample_id} (and any ratings it had)."
        reset = {"value": None}
    else:
        message = "β Select a sample to delete."
        reset = {}

    ch = _sample_delete_choices()
    return (message, _samples_table(),
            gr.update(choices=ch, **reset), gr.update(choices=ch, **reset))


del_btn.click(admin_delete_sample, [session, del_sample],
              [up_msg, samples_tbl, del_sample, tr_sample])
| # ---- Admin: per-sample reference text & anchor flag ---- | |
def load_sample_meta(sess, sid):
    """Return (transcript, is-reference flag) for the sample chosen in the editor.

    Non-admins and an empty selection get an empty transcript and False.
    """
    is_admin = bool(sess) and sess["role"] == "admin"
    if not (is_admin and sid):
        return "", False
    transcript, is_reference = get_sample_meta(int(sid))
    return transcript, is_reference


tr_sample.change(load_sample_meta, [session, tr_sample], [tr_text, tr_isref])
def save_sample_meta(sess, sid, text, is_ref):
    """Save a sample's reference text and anchor flag (admins only).

    Returns the status message and the refreshed samples table.
    """
    if not sess or sess["role"] != "admin":
        return "β Admin only.", _samples_table()
    if not sid:
        return "β Select a sample first.", _samples_table()

    sample_id = int(sid)
    set_sample_transcript(sample_id, text)
    set_sample_reference(sample_id, is_ref)
    if is_ref:
        flag = "reference / anchor (excluded from MOS)"
    else:
        flag = "system sample (counts toward MOS)"
    return (f"β Saved sample #{sample_id} β now a {flag}.", _samples_table())


tr_btn.click(save_sample_meta, [session, tr_sample, tr_text, tr_isref], [tr_msg, samples_tbl])
def do_unmark_all(sess):
    """Clear the reference/anchor flag on all samples (admins only).

    Returns the status message, the refreshed samples table, and an update
    that unticks the editor's reference checkbox on success.
    """
    if not sess or sess["role"] != "admin":
        return "β Admin only.", _samples_table(), gr.update()
    n = unmark_all_references()
    message = (f"β Un-marked {n} sample(s) β they now count toward the MOS. "
               f"Go to Results and click **Compute MOS**.")
    return (message, _samples_table(), gr.update(value=False))


unmark_all_btn.click(do_unmark_all, [session], [unmark_msg, samples_tbl, tr_isref])
| # ---- Admin: users ---- | |
def _users_table():
    """Return one row per user: [id, username, role, active yes/no, languages, rating count]."""
    languages_sql = (
        "SELECT l.name FROM user_languages ul JOIN languages l ON l.id=ul.language_id "
        "WHERE ul.user_id=?"
    )
    ratings_sql = "SELECT COUNT(*) c FROM ratings WHERE user_id=?"
    table = []
    with get_conn() as conn:
        users = conn.execute("SELECT * FROM users ORDER BY id").fetchall()
        for user in users:
            key = (user["id"],)
            lang_names = [r["name"] for r in conn.execute(languages_sql, key).fetchall()]
            rating_count = conn.execute(ratings_sql, key).fetchone()["c"]
            active = "yes" if user["is_active"] else "no"
            table.append([user["id"], user["username"], user["role"], active,
                          ", ".join(lang_names), rating_count])
    return table


def _refresh_users(s):
    """Return the users table for admins and an empty table for anyone else."""
    if s and s["role"] == "admin":
        return _users_table()
    return []


refresh_users_btn.click(_refresh_users, [session], [users_tbl])
def admin_update_user(sess, uid, role, active):
    """Set a user's role and active flag by id (admins only).

    Reports an error instead of a false success when no user has that id,
    and only runs the database backup when a row was actually updated.

    Returns the status message and the refreshed users table.
    """
    if not sess or sess["role"] != "admin":
        return "β Admin only.", _users_table()
    if not uid:
        return "β Enter a user id.", _users_table()
    user_id = int(uid)
    with get_conn() as conn:
        cursor = conn.execute("UPDATE users SET role=?, is_active=? WHERE id=?",
                              (role, 1 if active == "yes" else 0, user_id))
        # rowcount is the number of rows the UPDATE matched; 0 means unknown id.
        matched = cursor.rowcount
    if not matched:
        return f"β No user with id {user_id}.", _users_table()
    backup_db()
    return f"β Updated user {user_id}.", _users_table()


update_user_btn.click(admin_update_user, [session, promote_id, role_choice, active_choice],
                      [user_admin_msg, users_tbl])
def admin_delete_user(sess, uid):
    """Delete a user by id (admins only); an admin cannot delete their own account.

    Returns the status message and the refreshed users table.
    """
    if not sess or sess["role"] != "admin":
        return "β Admin only.", _users_table()
    if not uid:
        return "β Enter a user id.", _users_table()

    uid = int(uid)
    if uid == sess["id"]:
        return "β You can't delete your own account while signed in.", _users_table()

    # Look the user up first so the confirmation can name them.
    with get_conn() as conn:
        target = conn.execute("SELECT username FROM users WHERE id=?", (uid,)).fetchone()
    if not target:
        return f"β No user with id {uid}.", _users_table()

    delete_user(uid)
    message = f"β Deleted user #{uid} ({target['username']}) and all of their ratings."
    return (message, _users_table())


del_user_btn.click(admin_delete_user, [session, del_user_id], [del_user_msg, users_tbl])
| # ---- Admin: results ---- | |
def admin_results(sess, language_id):
    """Compute the MOS summary and per-model / per-sample tables for a language.

    Admins only. A missing or empty table is replaced by a one-cell "info"
    table explaining why there is nothing to show.
    """
    if not sess or sess["role"] != "admin":
        return "β Admin only.", None, None
    if not language_id:
        return "Choose a language.", None, None

    per_model, per_sample, summary = compute_results(language_id)

    def _is_blank(frame):
        return frame is None or frame.empty

    if _is_blank(per_model):
        model_note = ("No system (non-anchor) ratings yet. "
                      "If ratings exist, the rated samples may be marked as "
                      "reference/anchor β untick that in 'Edit a sample'.")
        per_model = pd.DataFrame({"info": [model_note]})
    if _is_blank(per_sample):
        per_sample = pd.DataFrame({"info": ["No system (non-anchor) per-sample ratings yet."]})
    return summary, per_model, per_sample


res_btn.click(admin_results, [session, res_lang], [res_summary, res_model_tbl, res_sample_tbl])
def admin_export(sess, language_id):
    """Return the export for a language, or None for non-admins or no language."""
    is_admin = bool(sess) and sess["role"] == "admin"
    if is_admin and language_id:
        return export_results(language_id)
    return None


export_btn.click(admin_export, [session, res_lang], [res_file])
| # Restore the session on page load (so a refresh keeps you signed in). | |
def restore_session(token):
    """Rebuild the signed-in UI from the stored auth token on page load.

    Falls back to the sign-in screen when the token does not verify or when
    no session can be built for its user id, so the loading placeholder is
    never left on screen.

    Returns one value per output wired to ``demo.load`` below (13 values).
    """
    uid = verify_token(token)
    # NOTE(review): assumes user_session returns a falsy value for an id that no
    # longer resolves (e.g. a deleted account) - confirm against its definition.
    sess = user_session(uid) if uid else None
    if not sess:
        # not logged in: hide loading, show the sign-in screen
        return (gr.update(visible=False),   # loading_col
                gr.update(visible=True),    # auth_col
                gr.update(visible=False),   # app_col
                None, gr.update(), gr.update(), gr.update(), gr.update(),
                [], [], [], gr.update(), gr.update())
    is_admin = sess["role"] == "admin"
    rl = reviewer_lang_choices(sess)
    if is_admin:
        lt, st, ut = _languages_table(), _samples_table(), _users_table()
        dch = _sample_delete_choices()
        du, tu = gr.update(choices=dch), gr.update(choices=dch)
    else:
        lt, st, ut = [], [], []
        du, tu = gr.update(), gr.update()
    return (
        gr.update(visible=False),   # loading_col
        gr.update(visible=False),   # auth_col
        gr.update(visible=True),    # app_col
        sess,                       # session
        gr.update(value=f"Signed in as **{sess['username']}** Β· {sess['role']}"),  # greeting
        gr.update(visible=not is_admin),    # reviewer_tabs
        gr.update(visible=is_admin),        # admin_panel
        gr.update(choices=rl, value=None),  # rate_lang
        lt, st, ut, du, tu,                 # admin tables + pickers
    )


demo.load(restore_session, [auth_token],
          [loading_col, auth_col, app_col, session, greeting, reviewer_tabs, admin_panel,
           rate_lang, langs_tbl, samples_tbl, users_tbl, del_sample, tr_sample])
| # Refresh language-dependent choices on every page load, so newly added | |
| # languages appear in the signup form (and admin dropdowns) without a restart. | |
def refresh_lang_choices():
    """Return fresh language choices for the signup, upload and results dropdowns."""
    current = lang_choices()
    return tuple(gr.update(choices=current) for _ in range(3))


demo.load(refresh_lang_choices, None, [su_lang, up_lang, res_lang])
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from PORT, defaulting to 7860.
    listen_port = int(os.environ.get("PORT", 7860))
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": listen_port,
        # let Gradio serve audio stored under MOS_DATA_DIR (e.g. /data)
        "allowed_paths": [DATA_DIR],
    }
    demo.queue().launch(**launch_options)