MOS_Evaluation / app.py
PlotweaverModel's picture
Upload app.py
62ed762 verified
Raw
History Blame Contribute Delete
51.2 kB
"""
Plotweaver AI β€” TTS MOS Evaluation Platform
============================================
A multi-user Gradio application for collecting Mean Opinion Score (MOS) ratings
of synthesised speech across multiple languages.
Roles
-----
- Reviewer: signs up, selects the language(s) they are competent in, listens to
audio samples and rates each on 7 criteria (1-5) plus free-text comments.
- Admin: uploads audio per language, manages languages/users, and views the
aggregated MOS results per language and per model, with CSV/XLSX export.
Persistence
-----------
Everything (users, languages, samples, ratings) lives in a single SQLite
database. Audio files are stored on disk under the data directory. Point
MOS_DATA_DIR at a persistent location (on Hugging Face Spaces enable persistent
storage and set MOS_DATA_DIR=/data) so data survives restarts.
Bootstrapping the first admin
-----------------------------
On the signup form there is an optional "Admin code" field. If it matches the
ADMIN_CODE environment variable, the new account is created as an admin.
The effective admin code is printed to the logs on startup.
"""
import os
import re
import hmac
import sqlite3
import hashlib
import secrets
import tempfile
import datetime as dt
import shutil
import pandas as pd
import gradio as gr
# --------------------------------------------------------------------------- #
# Configuration
# --------------------------------------------------------------------------- #
# DATA_DIR is the *persistent* location (e.g. a Hugging Face Storage Bucket at
# /data). Audio files and the database BACKUP live here.
DATA_DIR = os.environ.get("MOS_DATA_DIR", os.path.join(os.path.dirname(os.path.abspath(__file__)), "data"))
AUDIO_DIR = os.path.join(DATA_DIR, "audio")
EXPORT_DIR = os.path.join(DATA_DIR, "exports")
ADMIN_CODE = os.environ.get("ADMIN_CODE", "plotweaver-admin")
# The LIVE database runs on fast LOCAL disk. SQLite needs real POSIX file
# locking, which FUSE/object-store mounts (like a bucket) do not provide
# reliably β€” running it directly on the bucket causes writes to silently not
# appear on later reads. So we keep the working DB local and atomically back it
# up to the bucket after every change (and restore it on startup).
LOCAL_DIR = os.environ.get("MOS_LOCAL_DIR") or os.path.join(tempfile.gettempdir(), "mos_live")
DB_PATH = os.path.join(LOCAL_DIR, "mos.db") # live (local disk)
DB_BACKUP = os.path.join(DATA_DIR, "mos.db") # persistent copy (bucket)
for d in (DATA_DIR, AUDIO_DIR, EXPORT_DIR, LOCAL_DIR):
os.makedirs(d, exist_ok=True)
# Restore the live DB from the persistent backup on startup (first boot / after
# the container is recycled), so prior data survives restarts.
if os.path.abspath(DB_BACKUP) != os.path.abspath(DB_PATH) \
and os.path.exists(DB_BACKUP) and not os.path.exists(DB_PATH):
try:
shutil.copyfile(DB_BACKUP, DB_PATH)
except OSError:
pass
# Secret used to sign browser session tokens (keeps a refreshed page logged in).
SESSION_SECRET = os.environ.get("MOS_SECRET") or hashlib.sha256(
("mos-session::" + ADMIN_CODE).encode()).hexdigest()
def backup_db():
"""Atomically copy the live DB to the persistent backup on the bucket."""
if os.path.abspath(DB_BACKUP) == os.path.abspath(DB_PATH):
return
try:
src = sqlite3.connect(DB_PATH)
dst = sqlite3.connect(DB_BACKUP)
with dst:
src.backup(dst)
src.close()
dst.close()
except Exception: # noqa β€” best-effort backup; never break a write
pass
def make_token(user_id):
sig = hmac.new(SESSION_SECRET.encode(), str(user_id).encode(), hashlib.sha256).hexdigest()[:32]
return f"{user_id}.{sig}"
def verify_token(token):
"""Return the user_id if the signed token is valid and the user is active."""
if not token or "." not in str(token):
return None
uid_str, sig = str(token).rsplit(".", 1)
try:
uid = int(uid_str)
except ValueError:
return None
good = hmac.new(SESSION_SECRET.encode(), str(uid).encode(), hashlib.sha256).hexdigest()[:32]
if not hmac.compare_digest(sig, good):
return None
with get_conn() as conn:
row = conn.execute("SELECT id, is_active FROM users WHERE id=?", (uid,)).fetchone()
return uid if (row and row["is_active"]) else None
# The 7 MOS criteria, in the order they appear on the evaluation form.
# (db_column, display_label, short_definition)
CRITERIA = [
("naturalness", "Naturalness", "How human-like and natural the speech sounds."),
("intelligibility", "Intelligibility", "How easy it is to understand the spoken content."),
("pronunciation", "Pronunciation Accuracy", "Whether words, phonemes, tones and language-specific sounds are correct."),
("prosody", "Prosody & Expressiveness", "Rhythm, stress, pitch, intonation and speaking style."),
("fluency", "Fluency", "Smoothness without awkward pauses, repetitions or glitches."),
("audio_quality", "Audio Quality", "Technical quality: noise, distortion, clipping, artifacts."),
("overall", "Overall Quality", "Overall impression considering all aspects of synthesis quality."),
]
CRITERIA_KEYS = [c[0] for c in CRITERIA]
SCALE_HINT = "1 = Very Poor Β· 2 = Poor Β· 3 = Fair Β· 4 = Good Β· 5 = Excellent"
# --------------------------------------------------------------------------- #
# Database helpers
# --------------------------------------------------------------------------- #
def get_conn():
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON;")
return conn
def init_db():
with get_conn() as conn:
# WAL gives better concurrency on a real local disk, but it relies on
# shared-memory/mmap that does NOT work on FUSE/object-store mounts
# (e.g. a Hugging Face Storage Bucket). Default to DELETE for safety;
# set MOS_JOURNAL_MODE=WAL when running on a genuine local disk.
journal = os.environ.get("MOS_JOURNAL_MODE", "DELETE").upper()
if journal not in ("DELETE", "WAL", "TRUNCATE", "PERSIST", "MEMORY"):
journal = "DELETE"
conn.execute(f"PRAGMA journal_mode = {journal};")
conn.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT,
password_hash TEXT NOT NULL,
salt TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'reviewer',
native_langs TEXT DEFAULT '',
is_active INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL
);
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS languages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code TEXT UNIQUE NOT NULL,
name TEXT NOT NULL,
created_at TEXT NOT NULL
);
""")
# user <-> language assignment (which languages a reviewer is eligible for)
conn.execute("""
CREATE TABLE IF NOT EXISTS user_languages (
user_id INTEGER NOT NULL,
language_id INTEGER NOT NULL,
PRIMARY KEY (user_id, language_id),
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
FOREIGN KEY (language_id) REFERENCES languages(id) ON DELETE CASCADE
);
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS samples (
id INTEGER PRIMARY KEY AUTOINCREMENT,
language_id INTEGER NOT NULL,
sample_name TEXT NOT NULL,
model_name TEXT NOT NULL DEFAULT 'unspecified',
file_path TEXT NOT NULL,
is_reference INTEGER NOT NULL DEFAULT 0,
transcript TEXT DEFAULT '',
created_at TEXT NOT NULL,
FOREIGN KEY (language_id) REFERENCES languages(id) ON DELETE CASCADE
);
""")
cols = ",\n".join(f"{k} INTEGER" for k in CRITERIA_KEYS)
conn.execute(f"""
CREATE TABLE IF NOT EXISTS ratings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
sample_id INTEGER NOT NULL,
{cols},
comments TEXT DEFAULT '',
updated_at TEXT NOT NULL,
UNIQUE (user_id, sample_id),
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
FOREIGN KEY (sample_id) REFERENCES samples(id) ON DELETE CASCADE
);
""")
def now_iso():
return dt.datetime.utcnow().isoformat(timespec="seconds")
# --------------------------------------------------------------------------- #
# Auth
# --------------------------------------------------------------------------- #
def hash_password(password, salt=None):
if salt is None:
salt = secrets.token_hex(16)
h = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt.encode("utf-8"), 200_000)
return h.hex(), salt
def create_user(username, email, password, role="reviewer", language_ids=None):
username = (username or "").strip()
if not re.fullmatch(r"[A-Za-z0-9_.\-]{3,32}", username):
raise ValueError("Username must be 3-32 chars: letters, numbers, _ . - only.")
if not password or len(password) < 6:
raise ValueError("Password must be at least 6 characters.")
pw_hash, salt = hash_password(password)
with get_conn() as conn:
try:
cur = conn.execute(
"INSERT INTO users (username, email, password_hash, salt, role, created_at) "
"VALUES (?,?,?,?,?,?)",
(username, (email or "").strip(), pw_hash, salt, role, now_iso()),
)
except sqlite3.IntegrityError:
raise ValueError(f"Username '{username}' is already taken.")
uid = cur.lastrowid
for lid in (language_ids or []):
conn.execute("INSERT OR IGNORE INTO user_languages (user_id, language_id) VALUES (?,?)", (uid, lid))
backup_db()
return uid
def delete_user(user_id):
"""Delete a user; cascades to their user_languages and ratings."""
with get_conn() as conn:
conn.execute("DELETE FROM users WHERE id = ?", (user_id,))
backup_db()
def authenticate(username, password):
with get_conn() as conn:
row = conn.execute("SELECT * FROM users WHERE username = ?", ((username or "").strip(),)).fetchone()
if not row or not row["is_active"]:
return None
pw_hash, _ = hash_password(password, row["salt"])
if secrets.compare_digest(pw_hash, row["password_hash"]):
return dict(row)
return None
def user_session(uid):
"""Return a lightweight session dict used in gr.State."""
with get_conn() as conn:
row = conn.execute("SELECT * FROM users WHERE id = ?", (uid,)).fetchone()
langs = conn.execute(
"SELECT l.id, l.code, l.name FROM user_languages ul "
"JOIN languages l ON l.id = ul.language_id WHERE ul.user_id = ? ORDER BY l.name", (uid,)
).fetchall()
return {
"id": row["id"],
"username": row["username"],
"role": row["role"],
"languages": [{"id": l["id"], "code": l["code"], "name": l["name"]} for l in langs],
}
# --------------------------------------------------------------------------- #
# Languages
# --------------------------------------------------------------------------- #
def add_language(code, name):
code = (code or "").strip().lower()
name = (name or "").strip()
if not code or not name:
raise ValueError("Both language code and name are required.")
with get_conn() as conn:
try:
conn.execute("INSERT INTO languages (code, name, created_at) VALUES (?,?,?)", (code, name, now_iso()))
except sqlite3.IntegrityError:
raise ValueError(f"Language code '{code}' already exists.")
backup_db()
def list_languages():
with get_conn() as conn:
return [dict(r) for r in conn.execute("SELECT * FROM languages ORDER BY name").fetchall()]
def set_user_languages(uid, language_ids):
with get_conn() as conn:
conn.execute("DELETE FROM user_languages WHERE user_id = ?", (uid,))
for lid in language_ids:
conn.execute("INSERT OR IGNORE INTO user_languages (user_id, language_id) VALUES (?,?)", (uid, lid))
backup_db()
# --------------------------------------------------------------------------- #
# Samples
# --------------------------------------------------------------------------- #
def add_sample(language_id, src_path, sample_name=None, model_name="unspecified",
is_reference=False, transcript=""):
if not src_path or not os.path.exists(src_path):
raise ValueError("Audio file not found.")
ext = os.path.splitext(src_path)[1].lower() or ".wav"
lang_dir = os.path.join(AUDIO_DIR, str(language_id))
os.makedirs(lang_dir, exist_ok=True)
fname = f"{secrets.token_hex(8)}{ext}"
dst = os.path.join(lang_dir, fname)
shutil.copyfile(src_path, dst)
sample_name = (sample_name or "").strip() or os.path.splitext(os.path.basename(src_path))[0]
with get_conn() as conn:
conn.execute(
"INSERT INTO samples (language_id, sample_name, model_name, file_path, is_reference, transcript, created_at) "
"VALUES (?,?,?,?,?,?,?)",
(language_id, sample_name, (model_name or "unspecified").strip(), dst,
1 if is_reference else 0, (transcript or "").strip(), now_iso()),
)
backup_db()
def list_samples(language_id=None):
q = ("SELECT s.*, l.name AS language_name, l.code AS language_code "
"FROM samples s JOIN languages l ON l.id = s.language_id")
args = ()
if language_id:
q += " WHERE s.language_id = ?"
args = (language_id,)
q += " ORDER BY s.language_id, s.id"
with get_conn() as conn:
return [dict(r) for r in conn.execute(q, args).fetchall()]
def delete_sample(sample_id):
with get_conn() as conn:
row = conn.execute("SELECT file_path FROM samples WHERE id = ?", (sample_id,)).fetchone()
if row and row["file_path"] and os.path.exists(row["file_path"]):
try:
os.remove(row["file_path"])
except OSError:
pass
conn.execute("DELETE FROM samples WHERE id = ?", (sample_id,))
backup_db()
def set_sample_transcript(sample_id, transcript):
with get_conn() as conn:
conn.execute("UPDATE samples SET transcript = ? WHERE id = ?",
((transcript or "").strip(), sample_id))
backup_db()
def get_sample_transcript(sample_id):
with get_conn() as conn:
row = conn.execute("SELECT transcript FROM samples WHERE id = ?", (sample_id,)).fetchone()
return (row["transcript"] if row else "") or ""
def get_sample_meta(sample_id):
with get_conn() as conn:
row = conn.execute("SELECT transcript, is_reference FROM samples WHERE id = ?",
(sample_id,)).fetchone()
if not row:
return "", False
return (row["transcript"] or ""), bool(row["is_reference"])
def set_sample_reference(sample_id, is_ref):
with get_conn() as conn:
conn.execute("UPDATE samples SET is_reference = ? WHERE id = ?",
(1 if is_ref else 0, sample_id))
backup_db()
def unmark_all_references():
with get_conn() as conn:
n = conn.execute("UPDATE samples SET is_reference = 0 WHERE is_reference = 1").rowcount
backup_db()
return n
# --------------------------------------------------------------------------- #
# Ratings
# --------------------------------------------------------------------------- #
def upsert_rating(user_id, sample_id, scores, comments=""):
cols = ", ".join(CRITERIA_KEYS)
placeholders = ", ".join("?" for _ in CRITERIA_KEYS)
updates = ", ".join(f"{k}=excluded.{k}" for k in CRITERIA_KEYS)
vals = [int(scores[k]) for k in CRITERIA_KEYS]
with get_conn() as conn:
conn.execute(
f"INSERT INTO ratings (user_id, sample_id, {cols}, comments, updated_at) "
f"VALUES (?,?,{placeholders},?,?) "
f"ON CONFLICT(user_id, sample_id) DO UPDATE SET {updates}, "
f"comments=excluded.comments, updated_at=excluded.updated_at",
[user_id, sample_id, *vals, (comments or "").strip(), now_iso()],
)
backup_db()
def get_rating(user_id, sample_id):
with get_conn() as conn:
row = conn.execute("SELECT * FROM ratings WHERE user_id=? AND sample_id=?",
(user_id, sample_id)).fetchone()
return dict(row) if row else None
def samples_for_reviewer(user_id, language_id):
"""Samples in a language with a 'rated' flag for this reviewer."""
with get_conn() as conn:
rows = conn.execute(
"SELECT s.id, s.sample_name, s.is_reference, "
" CASE WHEN r.id IS NULL THEN 0 ELSE 1 END AS rated "
"FROM samples s "
"LEFT JOIN ratings r ON r.sample_id = s.id AND r.user_id = ? "
"WHERE s.language_id = ? ORDER BY s.id",
(user_id, language_id),
).fetchall()
return [dict(r) for r in rows]
# --------------------------------------------------------------------------- #
# Aggregation
# --------------------------------------------------------------------------- #
def ratings_dataframe(language_id=None, include_reference=False):
q = (
"SELECT r.*, s.language_id, s.sample_name, s.model_name, s.is_reference, "
" l.name AS language_name, l.code AS language_code "
"FROM ratings r "
"JOIN samples s ON s.id = r.sample_id "
"JOIN languages l ON l.id = s.language_id"
)
args = ()
if language_id:
q += " WHERE s.language_id = ?"
args = (language_id,)
with get_conn() as conn:
df = pd.read_sql_query(q, conn, params=args)
if not include_reference and not df.empty:
df = df[df["is_reference"] == 0]
return df
def compute_results(language_id, by_model=True):
"""Return (per_model_df, per_sample_df, summary_text) for a language."""
df = ratings_dataframe(language_id=language_id, include_reference=True)
if df.empty:
return pd.DataFrame(), pd.DataFrame(), "No ratings collected yet for this language."
# Split system vs reference (human anchor) samples.
sys_df = df[df["is_reference"] == 0]
ref_df = df[df["is_reference"] == 1]
def agg_block(frame, group_cols):
if frame.empty:
return pd.DataFrame()
g = frame.groupby(group_cols, dropna=False)
out = g[CRITERIA_KEYS].mean().round(3)
out["MOS (mean of criteria)"] = out[CRITERIA_KEYS].mean(axis=1).round(3)
out["overall_std"] = g["overall"].std().round(3)
out["n_ratings"] = g.size()
out["n_reviewers"] = g["user_id"].nunique()
out["n_samples"] = g["sample_id"].nunique()
return out.reset_index()
per_model = agg_block(sys_df, ["model_name"])
per_sample = agg_block(sys_df, ["model_name", "sample_id", "sample_name"])
# Friendly column labels.
label_map = {k: lbl for k, lbl, _ in CRITERIA}
per_model = per_model.rename(columns=label_map)
per_sample = per_sample.rename(columns=label_map)
lang_name = df["language_name"].iloc[0]
lines = [f"Language: {lang_name}",
f"Total ratings: {len(sys_df)} | Reviewers: {sys_df['user_id'].nunique()} | "
f"Samples: {sys_df['sample_id'].nunique()} | Models: {sys_df['model_name'].nunique()}"]
if not sys_df.empty:
overall_mos = sys_df["overall"].mean()
mean_mos = sys_df[CRITERIA_KEYS].mean(axis=1).mean()
lines.append(f"System MOS (Overall criterion): {overall_mos:.3f} | "
f"System MOS (mean of all criteria): {mean_mos:.3f}")
if not ref_df.empty:
lines.append(f"Reference/anchor MOS (Overall): {ref_df['overall'].mean():.3f} "
f"({ref_df['sample_id'].nunique()} anchor samples) β€” sanity check on reviewer calibration.")
return per_model, per_sample, "\n".join(lines)
def export_results(language_id):
per_model, per_sample, _ = compute_results(language_id)
raw = ratings_dataframe(language_id=language_id, include_reference=True)
langs = {l["id"]: l for l in list_languages()}
code = langs.get(language_id, {}).get("code", str(language_id))
path = os.path.join(EXPORT_DIR, f"mos_results_{code}_{dt.datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.xlsx")
with pd.ExcelWriter(path, engine="openpyxl") as xw:
(per_model if not per_model.empty else pd.DataFrame({"info": ["no data"]})).to_excel(xw, sheet_name="Per Model", index=False)
(per_sample if not per_sample.empty else pd.DataFrame({"info": ["no data"]})).to_excel(xw, sheet_name="Per Sample", index=False)
(raw if not raw.empty else pd.DataFrame({"info": ["no data"]})).to_excel(xw, sheet_name="Raw Ratings", index=False)
return path
# --------------------------------------------------------------------------- #
# Startup
# --------------------------------------------------------------------------- #
init_db()
print("=" * 64)
print("Plotweaver AI β€” TTS MOS Evaluation Platform")
print(f"Persistent dir : {DATA_DIR}")
print(f"Live database : {DB_PATH}")
print(f"DB backup : {DB_BACKUP}")
print(f"Admin code : {ADMIN_CODE} (use on signup to create an admin)")
print("=" * 64)
# ===========================================================================
# GRADIO UI
# ===========================================================================
def lang_choices():
return [(f"{l['name']} ({l['code']})", l["id"]) for l in list_languages()]
def reviewer_lang_choices(session):
if not session:
return []
return [(l["name"], l["id"]) for l in session["languages"]]
CSS = """
.gradio-container { max-width: 1080px !important; margin: auto !important; }
#app-title { text-align: center; margin-bottom: 0; }
#app-title h1 { font-size: 1.7rem; margin: 6px 0 0; }
#app-sub { text-align: center; color: var(--body-text-color-subdued); margin-top: 4px; font-size: 0.95rem; }
#auth-wrap { max-width: 460px; margin: 18px auto; }
.soft-card { border: 1px solid var(--block-border-color); border-radius: 16px;
padding: 18px; background: var(--block-background-fill); }
#userbar { display: flex; align-items: center; justify-content: space-between;
padding: 4px 2px 10px; }
#scale-hint { text-align: center; font-size: 0.9rem; color: var(--body-text-color-subdued);
margin: 4px 0 12px; }
#submit-row button { font-size: 1.05rem; padding: 12px; border-radius: 12px; }
.rating-grid .gr-radio { margin-bottom: 6px; }
footer { display: none !important; }
"""
with gr.Blocks(title="Plotweaver AI β€” TTS MOS Evaluation",
theme=gr.themes.Soft(primary_hue="indigo", secondary_hue="indigo"),
css=CSS) as demo:
session = gr.State(None) # logged-in user session dict
current_sample = gr.State(None) # sample id currently being rated
auth_token = gr.BrowserState("") # signed token persisted in the browser (survives refresh)
gr.Markdown("# 🎧 Plotweaver AI β€” TTS MOS Evaluation", elem_id="app-title")
gr.Markdown("Rate synthesised speech on 7 quality criteria, by language.", elem_id="app-sub")
# ---------- Loading (shown first, until the browser session is checked) ----------
with gr.Column(visible=True, elem_id="auth-wrap") as loading_col:
gr.Markdown("### Loading…")
# ----------------------------- AUTH ------------------------------------ #
with gr.Column(visible=False, elem_id="auth-wrap") as auth_col:
with gr.Column(elem_classes="soft-card"):
with gr.Tabs():
with gr.Tab("Sign in"):
li_user = gr.Textbox(label="Username", autofocus=True)
li_pw = gr.Textbox(label="Password", type="password")
li_btn = gr.Button("Sign in", variant="primary")
li_msg = gr.Markdown()
with gr.Tab("Create account"):
su_user = gr.Textbox(label="Username", info="3–32 chars: letters, numbers, _ . -")
su_email = gr.Textbox(label="Email (optional)")
su_pw = gr.Textbox(label="Password", type="password", info="At least 6 characters")
su_lang = gr.Dropdown(label="Language", choices=lang_choices(),
allow_custom_value=True,
info="The language you will evaluate.")
su_code = gr.Textbox(label="Admin code (optional)", type="password",
info="Leave blank for a normal reviewer account.")
su_btn = gr.Button("Create account", variant="primary")
su_msg = gr.Markdown()
# ----------------------------- APP ------------------------------------- #
with gr.Column(visible=False) as app_col:
with gr.Row(elem_id="userbar"):
greeting = gr.Markdown()
logout_btn = gr.Button("Log out", scale=0, size="sm")
with gr.Tabs(visible=False) as reviewer_tabs:
# ---------- Rate tab ----------
with gr.Tab("Rate samples"):
with gr.Row():
rate_lang = gr.Dropdown(label="Language", choices=[], interactive=True,
allow_custom_value=True, scale=2)
rate_sample = gr.Dropdown(label="Sample (βœ“ = already rated)", choices=[],
interactive=True, allow_custom_value=True, scale=3)
next_btn = gr.Button("Next unrated β–Ά", scale=1, size="sm")
rate_audio = gr.Audio(label="β–Ά Listen to the full sample",
type="filepath", interactive=False)
rate_transcript = gr.Textbox(
label="πŸ“„ Reference text (what the speaker should be saying)",
interactive=False, lines=2, elem_id="ref-text")
gr.Markdown(
"Use headphones in a quiet room. &nbsp;&nbsp; "
"**1** Very Poor Β· **2** Poor Β· **3** Fair Β· **4** Good Β· **5** Excellent",
elem_id="scale-hint",
)
criterion_inputs = {}
with gr.Column(elem_classes="rating-grid"):
with gr.Row():
for key, label, definition in CRITERIA[:4]:
criterion_inputs[key] = gr.Radio(
choices=[1, 2, 3, 4, 5], label=label, info=definition, value=None)
with gr.Row():
for key, label, definition in CRITERIA[4:]:
criterion_inputs[key] = gr.Radio(
choices=[1, 2, 3, 4, 5], label=label, info=definition, value=None)
rate_comments = gr.Textbox(
label="Comments (optional)",
placeholder="Pronunciation errors, unnatural prosody, noise, artifacts…", lines=2)
with gr.Row(elem_id="submit-row"):
submit_btn = gr.Button("βœ“ Submit / update rating", variant="primary")
rate_msg = gr.Markdown()
# ---------- Progress tab ----------
with gr.Tab("My progress"):
refresh_prog_btn = gr.Button("Refresh", size="sm")
progress_md = gr.Markdown()
progress_tbl = gr.Dataframe(
headers=["Language", "Rated", "Total", "Remaining"], interactive=False)
# ---------- Admin view (entirely separate from reviewer tabs) ----------
with gr.Column(visible=False) as admin_panel:
gr.Markdown("### Languages")
with gr.Row():
al_code = gr.Textbox(label="Code", info="e.g. yo, ha, ig, pcm, en-NG", scale=1)
al_name = gr.Textbox(label="Name", info="e.g. Yoruba", scale=2)
al_btn = gr.Button("Add language", scale=0, size="sm")
al_msg = gr.Markdown()
langs_tbl = gr.Dataframe(headers=["id", "code", "name"], interactive=False,
label="Existing languages")
gr.Markdown("### Upload audio samples")
with gr.Row():
up_lang = gr.Dropdown(label="Language", choices=lang_choices(),
allow_custom_value=True, scale=1)
up_model = gr.Textbox(label="Model / system name", value="",
info="e.g. F5-TTS, XTTS-v2, MMS-TTS, human. Hidden from reviewers.",
scale=2)
up_files = gr.File(label="Audio files (wav/mp3/flac/ogg)", file_count="multiple",
file_types=["audio"])
with gr.Row():
up_isref = gr.Checkbox(label="These are reference / human anchor samples", value=False)
up_transcript = gr.Textbox(label="Transcript (optional, applies to all uploaded)", scale=2)
up_btn = gr.Button("Upload", variant="primary", size="sm")
up_msg = gr.Markdown()
samples_tbl = gr.Dataframe(
headers=["id", "language", "sample_name", "model", "reference"],
interactive=False, label="Samples")
with gr.Row():
del_sample = gr.Dropdown(label="Select a sample to delete", choices=[],
allow_custom_value=True, scale=3)
del_btn = gr.Button("Delete selected", variant="stop", scale=0, size="sm")
gr.Markdown("#### Edit a sample")
gr.Markdown("Pick a sample to edit its reference text (shown to reviewers) and whether it "
"is a reference / human anchor. **Anchors are excluded from the system MOS** β€” "
"if your MOS table is empty but ratings exist, the samples are probably marked "
"as anchors here; untick the box and Save.")
with gr.Row():
tr_sample = gr.Dropdown(label="Sample", choices=[],
allow_custom_value=True, scale=2)
tr_isref = gr.Checkbox(label="Reference / human anchor", value=False, scale=1)
with gr.Row():
tr_text = gr.Textbox(label="Reference text", lines=2, scale=3)
tr_btn = gr.Button("Save", scale=0, size="sm")
tr_msg = gr.Markdown()
unmark_all_btn = gr.Button("⟲ Mark ALL samples as system (un-mark every anchor)", size="sm")
unmark_msg = gr.Markdown()
gr.Markdown("### Users")
refresh_users_btn = gr.Button("Refresh users", size="sm")
users_tbl = gr.Dataframe(
headers=["id", "username", "role", "active", "languages", "ratings"],
interactive=False)
with gr.Row():
promote_id = gr.Number(label="User id", precision=0)
role_choice = gr.Dropdown(label="Set role", choices=["reviewer", "admin"], value="reviewer")
active_choice = gr.Dropdown(label="Set active", choices=["yes", "no"], value="yes")
update_user_btn = gr.Button("Apply", scale=0, size="sm")
user_admin_msg = gr.Markdown()
with gr.Row():
del_user_id = gr.Number(label="Delete user by id", precision=0)
del_user_btn = gr.Button("Delete user", variant="stop", scale=0, size="sm")
gr.Markdown("Deleting a user also removes all of their ratings. To keep their ratings, "
"set **active = no** instead.")
del_user_msg = gr.Markdown()
gr.Markdown("### Results")
with gr.Row():
res_lang = gr.Dropdown(label="Language", choices=lang_choices(),
allow_custom_value=True, scale=2)
res_btn = gr.Button("Compute MOS", variant="primary", scale=0, size="sm")
export_btn = gr.Button("Export XLSX", scale=0, size="sm")
res_summary = gr.Markdown()
res_model_tbl = gr.Dataframe(label="MOS by model", interactive=False)
res_sample_tbl = gr.Dataframe(label="MOS by sample", interactive=False)
res_file = gr.File(label="Exported file")
# ===================================================================== #
# Handlers
# ===================================================================== #
rate_outputs = [criterion_inputs[k] for k in CRITERIA_KEYS]
def do_login(username, password):
user = authenticate(username, password)
if not user:
# keep typed credentials so the user can correct them
return (gr.update(), gr.update(), None, "❌ Invalid username or password.",
gr.update(), gr.update(), gr.update(), gr.update(),
gr.update(), gr.update(), gr.update(),
gr.update(), gr.update(), gr.update(), gr.update(), gr.update())
sess = user_session(user["id"])
is_admin = sess["role"] == "admin"
rl = reviewer_lang_choices(sess)
if is_admin:
lt, st, ut = _languages_table(), _samples_table(), _users_table()
dch = _sample_delete_choices()
du, tu = gr.update(choices=dch), gr.update(choices=dch)
else:
lt, st, ut = [], [], []
du, tu = gr.update(), gr.update()
return (
gr.update(visible=False), # auth_col
gr.update(visible=True), # app_col
sess, # session
"", # li_msg
gr.update(value=f"Signed in as **{sess['username']}** Β· {sess['role']}"), # greeting
gr.update(visible=not is_admin), # reviewer_tabs
gr.update(visible=is_admin), # admin_panel
gr.update(choices=rl, value=None), # rate_lang
gr.update(value=""), # li_user (clear)
gr.update(value=""), # li_pw (clear)
make_token(user["id"]), # auth_token (persist login)
lt, st, ut, du, tu, # admin tables + pickers
)
LOGIN_OUTPUTS = [auth_col, app_col, session, li_msg, greeting, reviewer_tabs, admin_panel,
rate_lang, li_user, li_pw, auth_token,
langs_tbl, samples_tbl, users_tbl, del_sample, tr_sample]
li_btn.click(do_login, [li_user, li_pw], LOGIN_OUTPUTS)
li_pw.submit(do_login, [li_user, li_pw], LOGIN_OUTPUTS) # Enter-to-login
def do_signup(username, email, password, lang_id, code):
role = "admin" if (code and code == ADMIN_CODE) else "reviewer"
lang_ids = [lang_id] if lang_id else []
try:
create_user(username, email, password, role=role, language_ids=lang_ids)
except ValueError as e:
# keep fields so the user can fix the problem
return (f"❌ {e}", gr.update(), gr.update(), gr.update(), gr.update(), gr.update())
note = " (admin account)" if role == "admin" else ""
return ("βœ… Account created" + note + ". Switch to the **Sign in** tab to continue.",
gr.update(value=""), gr.update(value=""), gr.update(value=""),
gr.update(value=None), gr.update(value=""))
su_btn.click(do_signup, [su_user, su_email, su_pw, su_lang, su_code],
[su_msg, su_user, su_email, su_pw, su_lang, su_code])
def do_logout():
return (gr.update(visible=True), gr.update(visible=False), None, "", "")
logout_btn.click(do_logout, None, [auth_col, app_col, session, greeting, auth_token])
# ---- Rating flow ----
def load_samples_for_lang(sess, language_id):
if not sess or not language_id:
return gr.update(choices=[], value=None)
items = samples_for_reviewer(sess["id"], language_id)
choices = [(("βœ“ " if s["rated"] else "β€’ ") + s["sample_name"], s["id"]) for s in items]
return gr.update(choices=choices, value=(choices[0][1] if choices else None))
rate_lang.change(load_samples_for_lang, [session, rate_lang], [rate_sample])
def load_sample(sess, sample_id):
if not sample_id:
return (None, "", None, *[None] * len(CRITERIA_KEYS), "")
with get_conn() as conn:
row = conn.execute("SELECT * FROM samples WHERE id=?", (sample_id,)).fetchone()
if not row:
return (None, "", None, *[None] * len(CRITERIA_KEYS), "")
transcript = row["transcript"] or "β€” No reference text provided for this sample β€”"
existing = get_rating(sess["id"], sample_id) if sess else None
scores = [existing[k] if existing else None for k in CRITERIA_KEYS]
comments = existing["comments"] if existing else ""
return (row["file_path"], transcript, sample_id, *scores, comments)
rate_sample.change(
load_sample, [session, rate_sample],
[rate_audio, rate_transcript, current_sample, *rate_outputs, rate_comments],
)
def go_next_unrated(sess, language_id):
if not sess or not language_id:
return gr.update()
items = samples_for_reviewer(sess["id"], language_id)
nxt = next((s["id"] for s in items if not s["rated"]), None)
if nxt is None:
return gr.update()
return gr.update(value=nxt)
next_btn.click(go_next_unrated, [session, rate_lang], [rate_sample])
def score_lang(sess, sample_id):
with get_conn() as conn:
row = conn.execute("SELECT language_id FROM samples WHERE id=?", (sample_id,)).fetchone()
return row["language_id"] if row else None
def submit_rating(sess, sample_id, comments, *scores):
if not sess:
return "❌ Not signed in."
if not sample_id:
return "❌ No sample selected."
score_map = dict(zip(CRITERIA_KEYS, scores))
missing = [lbl for (k, lbl, _) in CRITERIA if score_map.get(k) in (None, "")]
if missing:
return "❌ Please rate every criterion. Missing: " + ", ".join(missing)
upsert_rating(sess["id"], sample_id, score_map, comments)
lid = score_lang(sess, sample_id)
items = samples_for_reviewer(sess["id"], lid)
rated = sum(1 for s in items if s["rated"])
return f"βœ… Saved β€” {rated}/{len(items)} samples rated in this language. Click **Next unrated β–Ά** to continue."
submit_btn.click(
submit_rating, [session, current_sample, rate_comments, *rate_outputs], [rate_msg],
).then(
load_samples_for_lang, [session, rate_lang], [rate_sample]
)
# ---- Progress ----
def load_progress(sess):
if not sess:
return "Not signed in.", []
rows = []
for l in sess["languages"]:
items = samples_for_reviewer(sess["id"], l["id"])
rated = sum(1 for s in items if s["rated"])
rows.append([l["name"], rated, len(items), len(items) - rated])
if not rows:
return "You have no languages assigned yet. Add some under **My languages**.", []
return f"Progress for **{sess['username']}**:", rows
refresh_prog_btn.click(load_progress, [session], [progress_md, progress_tbl])
# ---- Admin: languages ----
def _languages_table():
return [[l["id"], l["code"], l["name"]] for l in list_languages()]
def _lang_dropdown_updates():
ch = lang_choices()
return (gr.update(choices=ch), gr.update(choices=ch), gr.update(choices=ch))
def admin_add_language(sess, code, name):
if not sess or sess["role"] != "admin":
return ("❌ Admin only.", _languages_table(), *_lang_dropdown_updates())
try:
add_language(code, name)
except ValueError as e:
return (f"❌ {e}", _languages_table(), *_lang_dropdown_updates())
return (f"βœ… Added {name} ({code}).", _languages_table(), *_lang_dropdown_updates())
al_btn.click(
admin_add_language, [session, al_code, al_name],
[al_msg, langs_tbl, up_lang, res_lang, su_lang],
)
# ---- Admin: samples ----
def _samples_table():
return [[s["id"], f"{s['language_name']} ({s['language_code']})", s["sample_name"],
s["model_name"], "yes" if s["is_reference"] else "no"] for s in list_samples()]
def _sample_delete_choices():
return [(f"#{s['id']} β€” {s['sample_name']} ({s['model_name']}, {s['language_code']})", s["id"])
for s in list_samples()]
def admin_upload(sess, language_id, files, model, is_ref, transcript):
if not sess or sess["role"] != "admin":
return "❌ Admin only.", _samples_table(), gr.update(), gr.update()
if not language_id:
return "❌ Choose a language first.", _samples_table(), gr.update(), gr.update()
if not files:
return "❌ No files selected.", _samples_table(), gr.update(), gr.update()
count = 0
for f in files:
path = f if isinstance(f, str) else getattr(f, "name", None)
try:
add_sample(language_id, path, model_name=model, is_reference=is_ref, transcript=transcript)
count += 1
except Exception as e: # noqa
ch = _sample_delete_choices()
return (f"❌ Error on a file: {e}", _samples_table(),
gr.update(choices=ch), gr.update(choices=ch))
ch = _sample_delete_choices()
return (f"βœ… Uploaded {count} sample(s).", _samples_table(),
gr.update(choices=ch, value=None), gr.update(choices=ch, value=None))
up_btn.click(admin_upload, [session, up_lang, up_files, up_model, up_isref, up_transcript],
[up_msg, samples_tbl, del_sample, tr_sample])
def admin_delete_sample(sess, sid):
if not sess or sess["role"] != "admin":
return "❌ Admin only.", _samples_table(), gr.update(), gr.update()
if not sid:
ch = _sample_delete_choices()
return ("❌ Select a sample to delete.", _samples_table(),
gr.update(choices=ch), gr.update(choices=ch))
delete_sample(int(sid))
ch = _sample_delete_choices()
return (f"βœ… Deleted sample #{int(sid)} (and any ratings it had).", _samples_table(),
gr.update(choices=ch, value=None), gr.update(choices=ch, value=None))
del_btn.click(admin_delete_sample, [session, del_sample],
[up_msg, samples_tbl, del_sample, tr_sample])
# ---- Admin: per-sample reference text & anchor flag ----
def load_sample_meta(sess, sid):
if not sess or sess["role"] != "admin" or not sid:
return "", False
tr, isref = get_sample_meta(int(sid))
return tr, isref
tr_sample.change(load_sample_meta, [session, tr_sample], [tr_text, tr_isref])
def save_sample_meta(sess, sid, text, is_ref):
if not sess or sess["role"] != "admin":
return "❌ Admin only.", _samples_table()
if not sid:
return "❌ Select a sample first.", _samples_table()
set_sample_transcript(int(sid), text)
set_sample_reference(int(sid), is_ref)
flag = "reference / anchor (excluded from MOS)" if is_ref else "system sample (counts toward MOS)"
return (f"βœ… Saved sample #{int(sid)} β€” now a {flag}.", _samples_table())
tr_btn.click(save_sample_meta, [session, tr_sample, tr_text, tr_isref], [tr_msg, samples_tbl])
def do_unmark_all(sess):
if not sess or sess["role"] != "admin":
return "❌ Admin only.", _samples_table(), gr.update()
n = unmark_all_references()
return (f"βœ… Un-marked {n} sample(s) β€” they now count toward the MOS. "
f"Go to Results and click **Compute MOS**.", _samples_table(), gr.update(value=False))
unmark_all_btn.click(do_unmark_all, [session], [unmark_msg, samples_tbl, tr_isref])
# ---- Admin: users ----
def _users_table():
rows = []
with get_conn() as conn:
for u in conn.execute("SELECT * FROM users ORDER BY id").fetchall():
langs = conn.execute(
"SELECT l.name FROM user_languages ul JOIN languages l ON l.id=ul.language_id "
"WHERE ul.user_id=?", (u["id"],)).fetchall()
nratings = conn.execute("SELECT COUNT(*) c FROM ratings WHERE user_id=?",
(u["id"],)).fetchone()["c"]
rows.append([u["id"], u["username"], u["role"], "yes" if u["is_active"] else "no",
", ".join(l["name"] for l in langs), nratings])
return rows
refresh_users_btn.click(lambda s: _users_table() if s and s["role"] == "admin" else [],
[session], [users_tbl])
def admin_update_user(sess, uid, role, active):
if not sess or sess["role"] != "admin":
return "❌ Admin only.", _users_table()
if not uid:
return "❌ Enter a user id.", _users_table()
with get_conn() as conn:
conn.execute("UPDATE users SET role=?, is_active=? WHERE id=?",
(role, 1 if active == "yes" else 0, int(uid)))
backup_db()
return f"βœ… Updated user {int(uid)}.", _users_table()
update_user_btn.click(admin_update_user, [session, promote_id, role_choice, active_choice],
[user_admin_msg, users_tbl])
def admin_delete_user(sess, uid):
if not sess or sess["role"] != "admin":
return "❌ Admin only.", _users_table()
if not uid:
return "❌ Enter a user id.", _users_table()
uid = int(uid)
if uid == sess["id"]:
return "❌ You can't delete your own account while signed in.", _users_table()
with get_conn() as conn:
row = conn.execute("SELECT username FROM users WHERE id=?", (uid,)).fetchone()
if not row:
return f"❌ No user with id {uid}.", _users_table()
delete_user(uid)
return (f"βœ… Deleted user #{uid} ({row['username']}) and all of their ratings.",
_users_table())
del_user_btn.click(admin_delete_user, [session, del_user_id], [del_user_msg, users_tbl])
# ---- Admin: results ----
def admin_results(sess, language_id):
if not sess or sess["role"] != "admin":
return "❌ Admin only.", None, None
if not language_id:
return "Choose a language.", None, None
per_model, per_sample, summary = compute_results(language_id)
if per_model is None or per_model.empty:
per_model = pd.DataFrame({"info": ["No system (non-anchor) ratings yet. "
"If ratings exist, the rated samples may be marked as "
"reference/anchor β€” untick that in 'Edit a sample'."]})
if per_sample is None or per_sample.empty:
per_sample = pd.DataFrame({"info": ["No system (non-anchor) per-sample ratings yet."]})
return summary, per_model, per_sample
res_btn.click(admin_results, [session, res_lang], [res_summary, res_model_tbl, res_sample_tbl])
def admin_export(sess, language_id):
if not sess or sess["role"] != "admin" or not language_id:
return None
return export_results(language_id)
export_btn.click(admin_export, [session, res_lang], [res_file])
# Restore the session on page load (so a refresh keeps you signed in).
def restore_session(token):
uid = verify_token(token)
if not uid:
# not logged in: hide loading, show the sign-in screen
return (gr.update(visible=False), # loading_col
gr.update(visible=True), # auth_col
gr.update(visible=False), # app_col
None, gr.update(), gr.update(), gr.update(), gr.update(),
[], [], [], gr.update(), gr.update())
sess = user_session(uid)
is_admin = sess["role"] == "admin"
rl = reviewer_lang_choices(sess)
if is_admin:
lt, st, ut = _languages_table(), _samples_table(), _users_table()
dch = _sample_delete_choices()
du, tu = gr.update(choices=dch), gr.update(choices=dch)
else:
lt, st, ut = [], [], []
du, tu = gr.update(), gr.update()
return (
gr.update(visible=False), # loading_col
gr.update(visible=False), # auth_col
gr.update(visible=True), # app_col
sess, # session
gr.update(value=f"Signed in as **{sess['username']}** Β· {sess['role']}"), # greeting
gr.update(visible=not is_admin), # reviewer_tabs
gr.update(visible=is_admin), # admin_panel
gr.update(choices=rl, value=None), # rate_lang
lt, st, ut, du, tu, # admin tables + pickers
)
demo.load(restore_session, [auth_token],
[loading_col, auth_col, app_col, session, greeting, reviewer_tabs, admin_panel,
rate_lang, langs_tbl, samples_tbl, users_tbl, del_sample, tr_sample])
# Refresh language-dependent choices on every page load, so newly added
# languages appear in the signup form (and admin dropdowns) without a restart.
def refresh_lang_choices():
ch = lang_choices()
return (gr.update(choices=ch), gr.update(choices=ch), gr.update(choices=ch))
demo.load(refresh_lang_choices, None, [su_lang, up_lang, res_lang])
if __name__ == "__main__":
demo.queue().launch(
server_name="0.0.0.0",
server_port=int(os.environ.get("PORT", 7860)),
allowed_paths=[DATA_DIR], # let Gradio serve audio stored under MOS_DATA_DIR (e.g. /data)
)