from __future__ import annotations
import difflib
import base64
import functools
import hashlib
import html
import json
import math
import re
import subprocess
import urllib.error
import urllib.request
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable
import gradio as gr
import librosa
import numpy as np
import soundfile as sf
# `gpu_task` is used as a decorator factory (e.g. `@gpu_task(duration=120)`).
# When the `spaces` package imports cleanly its `GPU` attribute is used;
# on any failure a no-op factory with the same call shape is installed instead.
try:
    import spaces

    gpu_task: Callable[..., Callable[[Callable[..., Any]], Callable[..., Any]]] = spaces.GPU
except Exception:

    def gpu_task(*_args: Any, **_kwargs: Any) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        """Accept and ignore any arguments; return a decorator that changes nothing."""

        def passthrough(fn: Callable[..., Any]) -> Callable[..., Any]:
            # Hand back the very same function object, unwrapped.
            return fn

        return passthrough
# Directory that contains this file; the working folders below live next to it.
APP_DIR = Path(__file__).resolve().parent
# Reference audio is stored here: downloaded samples ("sample_<key>.wav") and
# synthesized voices ("reference_<key>.wav").
GENERATED_DIR = APP_DIR / "generated"
# NOTE(review): presumably holds learner recordings; nothing in this section
# writes to it — confirm against the rest of the file.
RECORDING_DIR = APP_DIR / "recordings"
# Both folders are created at import time; exist_ok=True makes re-imports harmless.
GENERATED_DIR.mkdir(exist_ok=True)
RECORDING_DIR.mkdir(exist_ok=True)
# Model identifiers passed to the respective `from_pretrained` loaders below.
TTS_MODEL_ID = "openbmb/VoxCPM2"  # loaded by get_tts_model()
JUDGE_MODEL_ID = "openbmb/MiniCPM5-1B"  # loaded by get_judge_model()
ENGLISH_ASR_MODEL_ID = "facebook/wav2vec2-base-960h"  # loaded by get_english_asr_model()
# Sample rate (Hz) that normalize_wav() resamples to and that reference files are written at.
TARGET_SAMPLE_RATE = 16000
# Character cap applied to practice text via clean_text().
MAX_TARGET_CHARS = 180
# NOTE(review): the two limits below are not referenced in this section;
# presumably they bound the learner's recording length and upload size — confirm.
MAX_ATTEMPT_SECONDS = 20.0
MAX_RECORDING_BYTES = 12 * 1024 * 1024
# Passed to the TTS model's generate() as `cfg_value` and `inference_timesteps`.
FAST_TTS_CFG_VALUE = 1.35
FAST_TTS_STEPS = 4
# Part of the audio cache key, so changing it yields new keys for every phrase.
TTS_CACHE_VERSION = "fast-v3"
# In-process map from audio cache key to the path of the audio file for that key.
AUDIO_CACHE: dict[str, str] = {}
# Location of the prebuilt sample audio; sample_url() appends "<language>/<voice>.wav".
SAMPLE_DATASET_ID = "loay/build-small-shadowing-mini-audio"
SAMPLE_DATASET_REVISION = "main"
SAMPLE_VERSION = "v1"
SAMPLE_BASE_URL = (
    f"https://huggingface.co/datasets/{SAMPLE_DATASET_ID}/resolve/"
    f"{SAMPLE_DATASET_REVISION}/reference/{SAMPLE_VERSION}"
)
# Language names offered by the app, in alphabetical order. FALLBACK_PHRASES
# has one entry for each name listed here.
SUPPORTED_LANGUAGES = [
    "Arabic",
    "Burmese",
    "Chinese",
    "Danish",
    "Dutch",
    "English",
    "Finnish",
    "French",
    "German",
    "Greek",
    "Hebrew",
    "Hindi",
    "Indonesian",
    "Italian",
    "Japanese",
    "Khmer",
    "Korean",
    "Lao",
    "Malay",
    "Norwegian",
    "Polish",
    "Portuguese",
    "Russian",
    "Spanish",
    "Swahili",
    "Swedish",
    "Tagalog",
    "Thai",
    "Turkish",
    "Vietnamese",
]
# Level labels; these are the keys used inside LEVEL_STARTER_PHRASES.
LEVELS = ["A1", "A2", "B1", "B2", "C1", "C2"]
# Voice style name -> description text. build_voice_text() puts the description
# in parentheses in front of the phrase, and falls back to "Careful" for
# unknown style names.
VOICE_STYLES = {
    "Careful": "clear friendly tutor voice, medium-slow pace",
    "Happy": "warm upbeat voice, natural pace",
    "Slow": "slow clear practice voice",
    "Story": "bright storyteller voice with gentle expression",
}
# One default practice sentence per language. language_fallback() returns the
# English entry for unknown languages, and has_prebuilt_sample() compares the
# chosen text against these sentences to decide whether a prebuilt sample applies.
FALLBACK_PHRASES = {
    "Arabic": "أود أن أتمرن على المحادثة كل يوم.",
    "Burmese": "နေ့တိုင်း စကားပြော လေ့ကျင့်ချင်ပါတယ်။",
    "Chinese": "我每天都想练习说话。",
    "Danish": "Jeg vil gerne øve mig i at tale hver dag.",
    "Dutch": "Ik wil elke dag oefenen met spreken.",
    "English": "I want to practice speaking clearly every day.",
    "Finnish": "Haluan harjoitella puhumista selkeästi joka päivä.",
    "French": "Je veux pratiquer la parole clairement chaque jour.",
    "German": "Ich möchte jeden Tag klar sprechen üben.",
    "Greek": "Θέλω να εξασκούμαι στην ομιλία κάθε μέρα.",
    "Hebrew": "אני רוצה לתרגל דיבור ברור בכל יום.",
    "Hindi": "मैं हर दिन साफ़ बोलने का अभ्यास करना चाहता हूँ।",
    "Indonesian": "Saya ingin berlatih berbicara dengan jelas setiap hari.",
    "Italian": "Voglio esercitarmi a parlare chiaramente ogni giorno.",
    "Japanese": "毎日、はっきり話す練習をしたいです。",
    "Khmer": "ខ្ញុំចង់ហាត់និយាយឱ្យច្បាស់រាល់ថ្ងៃ។",
    "Korean": "저는 매일 또렷하게 말하는 연습을 하고 싶어요.",
    "Lao": "ຂ້ອຍຢາກຝຶກເວົ້າໃຫ້ຊັດເຈນທຸກມື້.",
    "Malay": "Saya mahu berlatih bercakap dengan jelas setiap hari.",
    "Norwegian": "Jeg vil øve på å snakke tydelig hver dag.",
    "Polish": "Chcę codziennie ćwiczyć wyraźne mówienie.",
    "Portuguese": "Quero praticar falar com clareza todos os dias.",
    "Russian": "Я хочу каждый день тренироваться говорить ясно.",
    "Spanish": "Quiero practicar hablar con claridad todos los días.",
    "Swahili": "Nataka kufanya mazoezi ya kuzungumza wazi kila siku.",
    "Swedish": "Jag vill öva på att tala tydligt varje dag.",
    "Tagalog": "Gusto kong magsanay magsalita nang malinaw araw-araw.",
    "Thai": "ฉันอยากฝึกพูดให้ชัดเจนทุกวัน",
    "Turkish": "Her gün açık konuşma pratiği yapmak istiyorum.",
    "Vietnamese": "Tôi muốn luyện nói rõ ràng mỗi ngày.",
}
# Language -> ordered list of starter sentences. starter_phrases() uses this
# when no level-specific list exists for the language.
STARTER_PHRASES = {
    "English": [
        "I want to practice speaking clearly every day.",
        "Today I will speak slowly and clearly.",
        "Please help me say this sentence better.",
        "I can listen first and then repeat.",
        "My voice is getting clearer with practice.",
    ],
}
# Language -> level -> ordered list of starter sentences; checked first by
# starter_phrases().
LEVEL_STARTER_PHRASES = {
    "English": {
        "A1": [
            "I can say this slowly.",
            "My voice is clear.",
            "I listen and repeat.",
        ],
        # A2 shares the same list object as the general English starters above.
        "A2": STARTER_PHRASES["English"],
        "B1": [
            "I want to explain my idea clearly today.",
            "Please listen while I repeat the sentence.",
            "I can speak with better rhythm and timing.",
        ],
        "B2": [
            "I am practicing steady speech with natural rhythm.",
            "Clear pronunciation helps my ideas sound more confident.",
            "I can repeat the line while keeping the same pace.",
        ],
        "C1": [
            "I am refining my speech so each phrase sounds precise and natural.",
            "I want my pacing, stress, and intonation to match the speaker.",
            "Careful listening helps me improve the shape of every sentence.",
        ],
        "C2": [
            "I am polishing subtle rhythm, emphasis, and tone in connected speech.",
            "Shadowing helps me reproduce fluent speech patterns with greater control.",
            "I can adapt my delivery while preserving clarity, timing, and expression.",
        ],
    }
}
# Stylesheet text for the interface. Every app-specific class is prefixed
# "lpl-"; the remaining selectors target Gradio's own containers. The string is
# runtime data, so nothing inside it is annotated.
# NOTE(review): presumably passed to Gradio as custom CSS — the consumer is not
# visible in this section; confirm.
CSS = """
:root {
--lpl-navy: #0d2547;
--lpl-ink: #12233f;
--lpl-muted: #59708f;
--lpl-soft: #eef9fd;
--lpl-panel: #ffffff;
--lpl-line: #d6e7ef;
--lpl-teal: #11a99d;
--lpl-teal-dark: #05877f;
--lpl-coral: #ff6258;
--lpl-yellow: #ffc234;
--lpl-cream: #fff6df;
--lpl-shadow: 0 18px 44px rgba(13, 37, 71, 0.11);
color-scheme: light;
}
html,
body {
background: linear-gradient(180deg, #f6fcff 0%, #eef8fe 48%, #f9fdff 100%);
color: var(--lpl-ink);
color-scheme: light !important;
}
.gradio-container {
--body-background-fill: transparent;
--body-text-color: var(--lpl-ink);
--block-background-fill: transparent;
--block-border-color: transparent;
--block-info-text-color: var(--lpl-muted);
--input-background-fill: #edf8fb;
--input-border-color: transparent;
--input-placeholder-color: #59708f;
--input-text-color: var(--lpl-navy);
--button-primary-background-fill: var(--lpl-teal);
--button-primary-background-fill-hover: var(--lpl-teal-dark);
--button-primary-text-color: #ffffff;
max-width: 1480px !important;
margin: 0 auto !important;
padding: 18px 24px 22px !important;
font-family: Inter, ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif !important;
background: transparent !important;
color: var(--lpl-ink) !important;
color-scheme: light !important;
}
.gradio-container footer,
.gradio-container .prose h1,
.gradio-container .prose h2,
.gradio-container .prose h3 {
display: none !important;
}
.gradio-container .generating,
.gradio-container .pending,
.gradio-container .loading {
opacity: 1 !important;
filter: none !important;
color: var(--lpl-ink) !important;
}
.gradio-container .generating *,
.gradio-container .pending *,
.gradio-container .loading * {
opacity: 1 !important;
filter: none !important;
}
.gradio-container .generating::before,
.gradio-container .generating::after,
.gradio-container .pending::before,
.gradio-container .pending::after {
display: none !important;
}
.lpl-topbar {
min-height: 70px;
display: flex;
align-items: center;
justify-content: space-between;
gap: 20px;
padding: 6px 2px 18px;
}
.lpl-brand {
display: flex;
align-items: center;
flex-wrap: wrap;
gap: 12px;
color: var(--lpl-navy);
font-weight: 830;
font-size: clamp(1.32rem, 2vw, 2rem);
letter-spacing: 0;
}
.lpl-mark {
width: 54px;
height: 42px;
border-radius: 24px 24px 24px 8px;
background: linear-gradient(145deg, var(--lpl-teal), #19c6bd);
position: relative;
box-shadow: 0 10px 20px rgba(17, 169, 157, 0.22);
}
.lpl-mark::before {
content: "";
position: absolute;
width: 7px;
height: 7px;
left: 14px;
top: 16px;
border-radius: 99px;
background: white;
box-shadow: 13px 0 0 white, 26px 0 0 white;
}
.lpl-divider {
width: 1px;
height: 28px;
background: #c9dce7;
}
.lpl-product {
color: var(--lpl-teal-dark);
font-weight: 780;
font-size: clamp(1rem, 1.5vw, 1.45rem);
}
.lpl-tagline {
color: var(--lpl-muted);
font-weight: 740;
font-size: clamp(0.95rem, 1.35vw, 1.18rem);
}
.lpl-steps {
display: grid;
grid-template-columns: repeat(4, minmax(0, 1fr));
gap: 12px;
margin-bottom: 20px;
}
.lpl-step {
min-height: 88px;
border-radius: 28px;
border: 1px solid #e5f0f5;
background: rgba(255, 255, 255, 0.92);
color: var(--lpl-ink);
display: flex;
align-items: center;
gap: 16px;
padding: 16px 22px;
box-shadow: 0 12px 26px rgba(13, 37, 71, 0.08);
}
.lpl-step.is-active {
color: #ffffff;
background: linear-gradient(135deg, var(--lpl-teal), #13bbb2);
border-color: transparent;
}
.lpl-step-number {
width: 46px;
height: 46px;
border-radius: 999px;
display: grid;
place-items: center;
flex: 0 0 auto;
background: #ffffff;
color: var(--lpl-navy);
font-size: 1.18rem;
font-weight: 850;
box-shadow: inset 0 0 0 1px rgba(13, 37, 71, 0.08);
}
.lpl-step-label {
font-size: clamp(1.05rem, 1.7vw, 1.42rem);
font-weight: 820;
}
.lpl-layout {
align-items: stretch !important;
}
.lpl-choose,
.lpl-practice,
.lpl-feedback {
background: rgba(255, 255, 255, 0.94);
border: 1px solid #e2eef4;
border-radius: 24px;
box-shadow: var(--lpl-shadow);
padding: 22px !important;
color: var(--lpl-navy) !important;
color-scheme: light !important;
}
.lpl-choose {
min-height: 650px;
}
.lpl-choose *,
.lpl-practice *,
.lpl-feedback * {
color-scheme: light !important;
}
.lpl-card-title {
margin: 0 0 18px;
color: var(--lpl-navy);
font-weight: 860;
font-size: 1.32rem;
}
.lpl-field-label {
color: var(--lpl-navy);
font-weight: 760;
font-size: 0.95rem;
margin: 14px 0 8px;
}
.lpl-choose .wrap,
.lpl-choose label,
.lpl-choose .block-info,
.lpl-choose .label-wrap,
.lpl-choose .container,
.lpl-choose .form {
color: var(--lpl-navy) !important;
}
.lpl-choose .form,
.lpl-choose .block,
.lpl-choose .wrap,
.lpl-choose .container,
.lpl-choose [data-testid="block-label"] {
background: transparent !important;
border: 0 !important;
box-shadow: none !important;
}
.lpl-choose input,
.lpl-choose textarea,
.lpl-choose select,
.lpl-choose .wrap,
.lpl-choose [data-testid="dropdown"],
.lpl-choose [role="textbox"],
.lpl-choose [role="combobox"] {
border-radius: 999px !important;
background: #edf8fb !important;
border-color: transparent !important;
color: var(--lpl-navy) !important;
}
.lpl-choose input::placeholder,
.lpl-choose textarea::placeholder {
color: #6f8299 !important;
opacity: 1 !important;
}
.lpl-choose textarea {
border-radius: 22px !important;
min-height: 82px !important;
}
.lpl-choose .wrap:focus-within,
.lpl-choose [data-testid="dropdown"]:focus-within {
box-shadow: 0 0 0 3px rgba(17, 169, 157, 0.18) !important;
}
.gradio-container [role="listbox"],
.gradio-container [data-testid="dropdown-options"],
.gradio-container .options,
.gradio-container .dropdown-options,
.gradio-container .select-options,
body [role="listbox"],
body [data-testid="dropdown-options"],
body .options,
body .dropdown-options,
body .select-options {
background: #ffffff !important;
color: var(--lpl-navy) !important;
border: 1px solid #d7eaf1 !important;
border-radius: 18px !important;
box-shadow: 0 18px 38px rgba(13, 37, 71, 0.16) !important;
}
.gradio-container [role="option"],
.gradio-container .option,
body [role="option"],
body .option {
background: #ffffff !important;
color: var(--lpl-navy) !important;
}
.gradio-container [role="option"]:hover,
.gradio-container [role="option"][aria-selected="true"],
.gradio-container .option:hover,
.gradio-container .option.selected,
body [role="option"]:hover,
body [role="option"][aria-selected="true"],
body .option:hover,
body .option.selected {
background: #edf8fb !important;
color: var(--lpl-teal-dark) !important;
}
body .toast-wrap,
body .toast,
body [data-testid="toast"],
.gradio-container .toast-wrap,
.gradio-container .toast,
.gradio-container [data-testid="toast"] {
background: #ffffff !important;
color: var(--lpl-navy) !important;
border-color: #d7eaf1 !important;
box-shadow: 0 18px 38px rgba(13, 37, 71, 0.16) !important;
}
body .toast *,
body [data-testid="toast"] *,
.gradio-container .toast *,
.gradio-container [data-testid="toast"] * {
color: var(--lpl-navy) !important;
}
.lpl-level-radio .wrap,
.lpl-voice-radio .wrap {
background: transparent !important;
}
.lpl-level-radio,
.lpl-voice-radio,
.lpl-level-radio .wrap,
.lpl-voice-radio .wrap,
.lpl-level-radio .container,
.lpl-voice-radio .container {
background: transparent !important;
border: 0 !important;
box-shadow: none !important;
}
.lpl-level-radio label,
.lpl-voice-radio label {
min-height: 44px !important;
border-radius: 999px !important;
background: #edf8fb !important;
border: 1px solid transparent !important;
color: var(--lpl-navy) !important;
font-weight: 760 !important;
}
.lpl-level-radio label span,
.lpl-voice-radio label span {
color: var(--lpl-navy) !important;
}
.lpl-level-radio input:checked + span,
.lpl-voice-radio input:checked + span {
color: var(--lpl-teal-dark) !important;
}
.lpl-level-radio label:has(input:checked),
.lpl-voice-radio label:has(input:checked),
.lpl-level-radio label[aria-checked="true"],
.lpl-voice-radio label[aria-checked="true"],
.lpl-level-radio [role="radio"][aria-checked="true"],
.lpl-voice-radio [role="radio"][aria-checked="true"],
.lpl-level-radio label.selected,
.lpl-voice-radio label.selected {
background: linear-gradient(135deg, var(--lpl-teal), #18c7be) !important;
border-color: transparent !important;
color: #ffffff !important;
box-shadow: 0 12px 22px rgba(17, 169, 157, 0.24) !important;
}
.lpl-level-radio label:has(input:checked) *,
.lpl-voice-radio label:has(input:checked) *,
.lpl-level-radio label[aria-checked="true"] *,
.lpl-voice-radio label[aria-checked="true"] *,
.lpl-level-radio [role="radio"][aria-checked="true"] *,
.lpl-voice-radio [role="radio"][aria-checked="true"] *,
.lpl-level-radio label.selected *,
.lpl-voice-radio label.selected * {
color: #ffffff !important;
}
.lpl-level-radio label:hover,
.lpl-voice-radio label:hover {
border-color: #9fdedb !important;
transform: translateY(-1px);
}
.lpl-main-btn,
.lpl-score-btn {
width: 100%;
min-height: 64px !important;
border-radius: 999px !important;
border: 0 !important;
color: #ffffff !important;
font-size: 1.25rem !important;
font-weight: 850 !important;
box-shadow: 0 16px 32px rgba(17, 169, 157, 0.24) !important;
}
.lpl-main-btn {
background: linear-gradient(135deg, var(--lpl-teal), #08bcb3) !important;
margin-top: 16px !important;
}
.lpl-score-btn {
background: linear-gradient(135deg, var(--lpl-navy), #123866) !important;
box-shadow: 0 18px 36px rgba(13, 37, 71, 0.24) !important;
}
.lpl-main-btn:disabled,
.lpl-main-btn[disabled],
.lpl-score-btn:disabled,
.lpl-score-btn[disabled] {
opacity: 0.62 !important;
cursor: wait !important;
}
.lpl-score-status {
min-height: 32px;
margin: 10px 0 16px;
display: flex;
align-items: center;
gap: 10px;
color: var(--lpl-muted);
font-size: 0.96rem;
font-weight: 760;
}
.lpl-score-status.is-loading {
color: var(--lpl-navy);
}
.lpl-spinner {
width: 18px;
height: 18px;
border-radius: 999px;
border: 3px solid #dff0f4;
border-top-color: var(--lpl-teal);
animation: lpl-spin 0.8s linear infinite;
}
@keyframes lpl-spin {
to {
transform: rotate(360deg);
}
}
.lpl-practice {
min-height: 650px;
}
.lpl-phrase {
min-height: 225px;
border-radius: 24px;
background: #ffffff;
border: 1px solid #e5eef4;
box-shadow: 0 14px 34px rgba(13, 37, 71, 0.08);
display: flex;
flex-direction: column;
justify-content: center;
padding: 26px 30px;
text-align: center;
position: relative;
overflow: hidden;
}
.lpl-phrase::after {
content: "";
width: 52%;
height: 2px;
border-radius: 99px;
background: repeating-linear-gradient(90deg, #9ee4e0 0 12px, transparent 12px 24px);
margin: 22px auto 0;
}
.lpl-phrase.is-loading {
border-color: #b9e9e6;
background:
radial-gradient(circle at 50% 42%, rgba(17, 169, 157, 0.13), transparent 34%),
#ffffff;
}
.lpl-phrase.is-loading::before {
content: "";
position: absolute;
inset: -45% auto auto 50%;
width: 260px;
height: 260px;
border-radius: 999px;
border: 2px solid rgba(17, 169, 157, 0.22);
transform: translateX(-50%);
animation: lpl-breathe 1.5s ease-in-out infinite;
}
.lpl-phrase-meta {
color: var(--lpl-muted);
font-weight: 720;
margin-bottom: 12px;
position: relative;
z-index: 1;
}
.lpl-phrase-text {
color: var(--lpl-navy);
font-size: clamp(2rem, 4.5vw, 3.35rem);
line-height: 1.25;
font-weight: 900;
letter-spacing: 0;
position: relative;
z-index: 1;
}
.lpl-loading-line {
max-width: 780px;
margin: 16px auto 0;
color: var(--lpl-muted);
font-size: 1rem;
line-height: 1.45;
font-weight: 720;
position: relative;
z-index: 1;
}
.lpl-voice-loader {
min-height: 42px;
margin: 18px auto 2px;
display: flex;
align-items: center;
justify-content: center;
gap: 8px;
position: relative;
z-index: 1;
}
.lpl-voice-loader span {
width: 10px;
height: 18px;
border-radius: 999px;
background: linear-gradient(180deg, var(--lpl-teal), #2bd6cd);
animation: lpl-wave 0.82s ease-in-out infinite;
}
.lpl-voice-loader span:nth-child(2) {
animation-delay: 0.08s;
}
.lpl-voice-loader span:nth-child(3) {
animation-delay: 0.16s;
}
.lpl-voice-loader span:nth-child(4) {
animation-delay: 0.24s;
}
.lpl-voice-loader span:nth-child(5) {
animation-delay: 0.32s;
}
@keyframes lpl-wave {
0%,
100% {
transform: scaleY(0.58);
opacity: 0.55;
}
50% {
transform: scaleY(1.7);
opacity: 1;
}
}
@keyframes lpl-breathe {
0%,
100% {
opacity: 0.24;
transform: translateX(-50%) scale(0.82);
}
50% {
opacity: 0.48;
transform: translateX(-50%) scale(1);
}
}
.lpl-media-grid {
display: grid;
grid-template-columns: repeat(2, minmax(0, 1fr));
gap: 16px;
margin: 18px 0;
}
.lpl-audio-panel {
border-radius: 24px;
border: 1px solid #e4eef4;
background: #ffffff;
overflow: hidden;
box-shadow: 0 12px 26px rgba(13, 37, 71, 0.08);
}
.lpl-audio-panel.is-record {
background: linear-gradient(180deg, #fff8f6 0%, #ffffff 52%);
}
.lpl-audio-head {
display: flex;
align-items: center;
justify-content: space-between;
gap: 12px;
min-height: 64px;
padding: 16px 18px 8px;
color: var(--lpl-navy);
font-weight: 820;
}
.lpl-audio-head span {
color: var(--lpl-muted);
font-size: 0.9rem;
font-weight: 700;
}
.lpl-audio-panel .audio-container,
.lpl-audio-panel .block,
.lpl-audio-panel .wrap,
.lpl-audio-panel .form {
border: 0 !important;
box-shadow: none !important;
background: transparent !important;
color: var(--lpl-navy) !important;
}
.lpl-audio-panel audio {
width: 100% !important;
min-height: 46px !important;
padding: 0 18px 18px !important;
box-sizing: border-box !important;
color-scheme: light !important;
}
.lpl-audio-panel .label-wrap,
.lpl-audio-panel .download,
.lpl-audio-panel .share {
display: none !important;
}
.lpl-native-player,
.lpl-native-recorder {
padding: 0 18px 18px;
}
.lpl-native-player audio,
.lpl-native-recorder audio {
width: 100%;
min-height: 46px;
padding: 0 !important;
color-scheme: light !important;
}
.lpl-player-empty,
.lpl-recorder-status {
min-height: 46px;
display: flex;
align-items: center;
color: var(--lpl-muted);
font-weight: 720;
}
.lpl-player-empty.is-loading,
.lpl-status-card.is-loading {
color: var(--lpl-navy);
gap: 10px;
}
.lpl-recorder-actions {
display: flex;
align-items: center;
flex-wrap: wrap;
gap: 12px;
margin-bottom: 12px;
}
.lpl-recorder-actions button {
min-height: 48px;
border: 0;
border-radius: 999px;
padding: 0 20px;
font-weight: 840;
cursor: pointer;
color: #ffffff;
background: var(--lpl-coral);
}
.lpl-recorder-actions button[data-stop] {
color: var(--lpl-coral);
background: #fff2f1;
border: 1px solid #ffbcb7;
}
.lpl-recorder-actions button:disabled {
opacity: 0.5;
cursor: not-allowed;
}
.lpl-recorder-meter {
height: 9px;
border-radius: 999px;
overflow: hidden;
background: #edf3f6;
margin: 0 0 12px;
}
.lpl-recorder-fill {
height: 100%;
width: 0%;
border-radius: 999px;
background: linear-gradient(90deg, var(--lpl-coral), var(--lpl-yellow));
}
.lpl-status {
min-height: 34px;
color: var(--lpl-muted);
font-size: 0.98rem;
font-weight: 680;
}
.lpl-status-card {
color: var(--lpl-muted);
font-size: 0.98rem;
line-height: 1.45;
display: flex;
align-items: center;
gap: 10px;
}
.lpl-feedback {
min-height: 650px;
}
.lpl-score-empty,
.lpl-score-card {
border-radius: 24px;
background: #ffffff;
border: 1px solid #e4eef4;
box-shadow: 0 12px 26px rgba(13, 37, 71, 0.08);
padding: 22px;
color: var(--lpl-navy);
}
.lpl-score-empty {
min-height: 160px;
display: flex;
align-items: center;
color: var(--lpl-muted);
font-weight: 720;
line-height: 1.45;
}
.lpl-score-empty.is-error {
align-items: flex-start;
flex-direction: column;
gap: 8px;
border-color: #ffd0cc;
background: #fff8f7;
color: var(--lpl-navy);
}
.lpl-score-empty.is-error strong {
color: var(--lpl-coral);
font-size: 1.08rem;
}
.lpl-score-top {
display: flex;
align-items: center;
gap: 18px;
margin-bottom: 20px;
}
.lpl-ring {
--score: 0;
width: 132px;
height: 132px;
border-radius: 999px;
display: grid;
place-items: center;
flex: 0 0 auto;
background:
radial-gradient(circle at center, white 0 56%, transparent 58%),
conic-gradient(var(--lpl-teal) calc(var(--score) * 1%), #e6f0f5 0);
color: var(--lpl-navy);
font-size: 2.55rem;
font-weight: 900;
}
.lpl-score-copy strong {
display: block;
font-size: 1.42rem;
margin-bottom: 8px;
}
.lpl-score-copy span {
color: var(--lpl-muted);
line-height: 1.45;
font-weight: 650;
}
.lpl-meter {
margin: 14px 0;
}
.lpl-meter-row {
display: flex;
align-items: center;
justify-content: space-between;
gap: 16px;
color: var(--lpl-navy);
font-weight: 760;
margin-bottom: 7px;
}
.lpl-meter-row span:last-child {
color: var(--lpl-teal-dark);
}
.lpl-bar {
height: 9px;
background: #e8f1f5;
border-radius: 999px;
overflow: hidden;
}
.lpl-fill {
height: 100%;
width: 0;
border-radius: 999px;
background: linear-gradient(90deg, var(--lpl-teal), #1f78c8);
}
.lpl-feedback-card,
.lpl-next-card {
margin-top: 16px;
border-radius: 24px;
background: #ffffff;
border: 1px solid #e4eef4;
box-shadow: 0 12px 26px rgba(13, 37, 71, 0.08);
padding: 20px;
color: var(--lpl-navy);
}
.lpl-feedback-card h3,
.lpl-next-card h3 {
margin: 0 0 10px;
font-size: 1.14rem;
color: var(--lpl-navy);
}
.lpl-feedback-card p,
.lpl-next-card p {
margin: 0;
color: #37506f;
font-weight: 650;
line-height: 1.48;
}
.lpl-feedback-card ol {
margin: 14px 0 0 22px;
padding: 0;
color: #37506f;
font-weight: 650;
line-height: 1.52;
}
.lpl-feedback-card li {
margin: 7px 0;
}
.lpl-words {
margin-top: 14px;
padding-top: 14px;
border-top: 1px solid #e7eff4;
color: var(--lpl-muted);
font-weight: 650;
line-height: 1.5;
}
.lpl-words strong {
color: var(--lpl-navy);
}
.lpl-bottom-next {
margin-top: 18px;
border-radius: 24px;
background: linear-gradient(180deg, #fff8df 0%, #fff2c9 100%);
border: 1px solid #ffe5a1;
padding: 18px 22px;
display: flex;
gap: 18px;
align-items: center;
color: var(--lpl-navy);
box-shadow: 0 16px 28px rgba(255, 194, 52, 0.16);
}
.lpl-bottom-next strong {
font-size: 1.12rem;
}
.lpl-bottom-next span {
color: #37506f;
font-weight: 700;
}
.lpl-footer {
margin: 22px auto 0;
padding: 14px 18px;
border: 1px solid #dbeaf1;
border-radius: 999px;
background: rgba(255, 255, 255, 0.82);
color: #37506f;
font-size: 0.94rem;
font-weight: 700;
line-height: 1.45;
text-align: center;
box-shadow: 0 14px 28px rgba(13, 37, 71, 0.07);
}
.lpl-footer strong {
color: var(--lpl-navy);
font-weight: 860;
}
@media (max-width: 1080px) {
.lpl-layout {
flex-direction: column !important;
}
.lpl-layout > .column,
.lpl-layout > div {
width: 100% !important;
min-width: 0 !important;
}
.lpl-choose,
.lpl-practice,
.lpl-feedback {
min-height: auto;
}
.lpl-steps,
.lpl-media-grid {
grid-template-columns: 1fr 1fr;
}
}
@media (max-width: 760px) {
.gradio-container {
padding: 14px 12px 18px !important;
}
.lpl-topbar {
align-items: flex-start;
flex-direction: column;
}
.lpl-steps,
.lpl-media-grid {
grid-template-columns: 1fr;
}
.lpl-footer {
border-radius: 22px;
}
.lpl-step {
min-height: 66px;
border-radius: 20px;
}
.lpl-choose,
.lpl-practice,
.lpl-feedback {
min-height: auto;
padding: 16px !important;
}
.lpl-phrase {
min-height: 180px;
padding: 22px 18px;
}
.lpl-score-top {
align-items: flex-start;
flex-direction: column;
}
.lpl-ring {
width: 112px;
height: 112px;
}
}
"""
@dataclass
class ScoreResult:
    """Plain record of scores and measurements for one attempt.

    NOTE(review): the code that builds and reads this record is outside this
    section, so the field notes below are inferred from the field names and
    types only — confirm against the scoring code.
    """

    # Integer scores; presumably on the 0-100 scale produced by clamp_score().
    overall: int
    voice_shape: int
    timing: int
    rhythm: int
    melody: int
    # Durations; presumably in seconds — confirm.
    reference_duration: float
    attempt_duration: float
    # Presumably attempt duration relative to reference duration — confirm direction.
    duration_ratio: float
    # Free-form supporting data keyed by string.
    evidence: dict[str, Any]
def clamp_score(value: float) -> int:
    """Turn a raw number into an integer score within 0..100.

    NaN and infinite inputs yield 0. Finite inputs are limited to the
    0..100 range and then rounded with Python's built-in ``round``.
    """
    if math.isfinite(value):
        capped = min(100.0, value)
        floored = max(0.0, capped)
        return int(round(floored))
    # Non-finite input carries no usable score.
    return 0
def clean_text(value: Any, limit: int = 500) -> str:
    """Return *value* as single-spaced text, cut to at most *limit* characters.

    Falsy values (``None``, ``""``, ``0`` ...) become the empty string.
    Leading/trailing whitespace is removed and every internal whitespace run
    is replaced by one space before the cut is applied.
    """
    raw = str(value or "")
    trimmed = raw.strip()
    single_spaced = re.sub(r"\s+", " ", trimmed)
    return single_spaced[:limit]
def gpu_available() -> bool:
    """Report whether torch sees a CUDA device.

    Any failure (torch missing, CUDA probing raising) counts as "no GPU".
    """
    try:
        import torch

        has_cuda = torch.cuda.is_available()
    except Exception:
        has_cuda = False
    return bool(has_cuda)
def require_gpu(action: str) -> None:
    """Raise a user-facing ``gr.Error`` naming *action* when no GPU is available.

    Returns silently when ``gpu_available()`` is true.
    """
    if gpu_available():
        return
    message = (
        f"{action} needs GPU hardware. This Space is currently on CPU hardware; "
        "switch it to GPU or ZeroGPU, then try again."
    )
    raise gr.Error(message)
def language_fallback(language: str) -> str:
    """Return the default practice sentence for *language*.

    Languages without an entry in ``FALLBACK_PHRASES`` get the English sentence.
    """
    english_default = FALLBACK_PHRASES["English"]
    return FALLBACK_PHRASES.get(language, english_default)
def starter_phrases(language: str, level: str = "A2") -> list[str]:
    """Return the starter sentences for *language* at *level*.

    Lookup order: the level-specific list, then the language-wide list, then
    a one-item list holding the language's fallback sentence. Empty lists are
    treated as missing.
    """
    by_level = LEVEL_STARTER_PHRASES.get(language, {})
    return (
        by_level.get(level)
        or STARTER_PHRASES.get(language)
        or [language_fallback(language)]
    )
def has_prebuilt_sample(language: str, target_text: str) -> bool:
    """Tell whether *target_text* is the fallback sentence for *language*.

    Both sides are passed through ``clean_text`` with the ``MAX_TARGET_CHARS``
    cap before being compared.
    """
    requested = clean_text(target_text, MAX_TARGET_CHARS)
    prebuilt = clean_text(language_fallback(language), MAX_TARGET_CHARS)
    return requested == prebuilt
@functools.lru_cache(maxsize=1)
def get_tts_model() -> Any:
    """Load the ``TTS_MODEL_ID`` VoxCPM model once and reuse it afterwards.

    ``voxcpm`` is imported on first call rather than at module import time.
    """
    from voxcpm import VoxCPM

    tts = VoxCPM.from_pretrained(TTS_MODEL_ID, load_denoiser=False)
    return tts
@functools.lru_cache(maxsize=1)
def get_judge_model() -> tuple[Any, Any]:
    """Load the judging tokenizer and model once and reuse them afterwards.

    Two transformers auto-classes are tried in order: ``AutoModelForMultimodalLM``
    first, then ``AutoModelForCausalLM``. A class that cannot be imported is
    skipped and its error text recorded.

    Returns:
        ``(tokenizer, model)`` with the model switched to eval mode.

    Raises:
        RuntimeError: if no candidate class loads the model; the message joins
            every recorded error and the last load failure is chained as cause.
    """
    import torch  # not referenced below; kept exactly as in the original import list
    from transformers import AutoTokenizer

    failures: list[str] = []
    candidates: list[Any] = []
    try:
        from transformers import AutoModelForMultimodalLM

        candidates.append(AutoModelForMultimodalLM)
    except Exception as exc:
        failures.append(str(exc))
    try:
        from transformers import AutoModelForCausalLM

        candidates.append(AutoModelForCausalLM)
    except Exception as exc:
        failures.append(str(exc))

    tokenizer = AutoTokenizer.from_pretrained(JUDGE_MODEL_ID, trust_remote_code=True)
    load_options = {
        "torch_dtype": "auto",
        "device_map": "auto",
        "trust_remote_code": True,
    }
    most_recent: Exception | None = None
    for loader in candidates:
        try:
            model = loader.from_pretrained(JUDGE_MODEL_ID, **load_options)
            model.eval()
        except Exception as exc:
            # Remember the failure and move on to the next candidate class.
            most_recent = exc
            failures.append(str(exc))
        else:
            return tokenizer, model
    raise RuntimeError("Could not load the judging model: " + "; ".join(failures)) from most_recent
@functools.lru_cache(maxsize=1)
def get_english_asr_model() -> tuple[Any, Any, Any]:
    """Load the English CTC speech model once and reuse it afterwards.

    Returns:
        ``(processor, model, device)``; the model is moved to CUDA when torch
        reports it available, otherwise to CPU, and switched to eval mode.
    """
    import torch
    from transformers import AutoModelForCTC, AutoProcessor

    processor = AutoProcessor.from_pretrained(ENGLISH_ASR_MODEL_ID)
    model = AutoModelForCTC.from_pretrained(ENGLISH_ASR_MODEL_ID)
    if torch.cuda.is_available():
        device_name = "cuda"
    else:
        device_name = "cpu"
    device = torch.device(device_name)
    model.to(device)
    model.eval()
    return processor, model, device
def run_judge(messages: list[dict[str, str]], max_new_tokens: int = 360) -> str:
    """Run the judging model on a chat transcript and return its reply text.

    Args:
        messages: chat messages handed to the tokenizer's chat template.
        max_new_tokens: upper bound on generated tokens.

    Returns:
        The newly generated text only (prompt tokens are cut off), decoded
        without special tokens and stripped of surrounding whitespace.
    """
    import torch

    tokenizer, model = get_judge_model()
    template_kwargs = dict(
        tokenize=True,
        add_generation_prompt=True,
        return_dict=True,
        return_tensors="pt",
    )
    try:
        encoded = tokenizer.apply_chat_template(messages, enable_thinking=False, **template_kwargs)
    except TypeError:
        # Retry without `enable_thinking` when the first call raised TypeError.
        encoded = tokenizer.apply_chat_template(messages, **template_kwargs)

    # Move every input tensor to the device holding the model's parameters.
    target_device = next(model.parameters()).device
    model_inputs = {}
    for name, tensor in encoded.items():
        model_inputs[name] = tensor.to(target_device)

    with torch.inference_mode():
        output_ids = model.generate(
            **model_inputs,
            max_new_tokens=max_new_tokens,
            do_sample=False,
            repetition_penalty=1.02,
        )
    prompt_length = model_inputs["input_ids"].shape[-1]
    completion = output_ids[0][prompt_length:]
    return tokenizer.decode(completion, skip_special_tokens=True).strip()
def selected_text(language: str, level: str, custom_text: str, starter_counter: int = 0) -> tuple[str, str, int]:
    """Pick the sentence to practise.

    Returns:
        ``(text, source, line_index)``. Non-empty custom text (after cleaning
        and capping at ``MAX_TARGET_CHARS``) wins and yields
        ``(text, "custom", -1)``. Otherwise the starter list for the
        language/level is indexed by ``starter_counter`` modulo its length,
        yielding ``(phrase, "starter", index)``.
    """
    typed = clean_text(custom_text, MAX_TARGET_CHARS)
    if typed:
        return typed, "custom", -1

    phrases = starter_phrases(language, level)
    counter = int(starter_counter or 0)
    # max(1, ...) keeps the modulo defined even for an empty list.
    line_index = counter % max(1, len(phrases))
    return phrases[line_index], "starter", line_index
def build_voice_text(target_text: str, voice_style: str) -> str:
    """Prefix *target_text* with the voice description in parentheses.

    Unknown style names use the "Careful" description.
    """
    careful = VOICE_STYLES["Careful"]
    description = VOICE_STYLES.get(voice_style, careful)
    return f"({description}){target_text}"
def slugify(value: str) -> str:
    """Lower-case *value* and join its ASCII alphanumeric runs with hyphens.

    Everything outside ``a-z0-9`` is a separator; leading and trailing hyphens
    are removed. An empty result is replaced by ``"item"``.
    """
    lowered = value.lower()
    hyphenated = re.sub(r"[^a-z0-9]+", "-", lowered)
    slug = hyphenated.strip("-")
    if not slug:
        return "item"
    return slug
def audio_cache_key(language: str, level: str, voice_style: str, target_text: str) -> str:
    """Derive an 18-hex-character key identifying one reference audio clip.

    The key is the start of the SHA-1 of a sorted-key JSON document holding
    the four arguments plus ``TTS_CACHE_VERSION``, ``FAST_TTS_CFG_VALUE`` and
    ``FAST_TTS_STEPS``, so changing any of them yields a different key.
    """
    fields = {
        "version": TTS_CACHE_VERSION,
        "language": language,
        "level": level,
        "voice_style": voice_style,
        "target_text": target_text,
        "cfg_value": FAST_TTS_CFG_VALUE,
        "steps": FAST_TTS_STEPS,
    }
    payload = json.dumps(fields, ensure_ascii=False, sort_keys=True)
    digest = hashlib.sha1(payload.encode("utf-8")).hexdigest()
    return digest[:18]
def sample_relpath(language: str, voice_style: str) -> str:
    """Return ``"<language-slug>/<voice-slug>.wav"`` for a prebuilt sample."""
    folder = slugify(language)
    stem = slugify(voice_style)
    return f"{folder}/{stem}.wav"
def sample_url(language: str, voice_style: str) -> str:
    """Return the download URL of the prebuilt sample under ``SAMPLE_BASE_URL``."""
    relative = sample_relpath(language, voice_style)
    return f"{SAMPLE_BASE_URL}/{relative}"
def audio_data_uri(path: str) -> str:
    """Read the file at *path* and return it as a base64 ``audio/wav`` data URI."""
    encoded = base64.b64encode(Path(path).read_bytes())
    return f"data:audio/wav;base64,{encoded.decode('ascii')}"
def render_reference_player(
path: str | None = None,
note: str = "Voice appears here after Make voice.",
loading: bool = False,
) -> str:
if not path or not Path(path).exists():
spinner = ' ' if loading else ""
loading_class = " is-loading" if loading else ""
return (
'
'
f'
{spinner}{html.escape(note)}
'
"
"
)
return (
'"
)
def ensure_sample_audio(language: str, voice_style: str, cache_key: str) -> str | None:
    """Return the local path of the prebuilt sample, downloading it if needed.

    The sample is cached as ``GENERATED_DIR / "sample_<cache_key>.wav"``. A
    fresh download is first written under a unique temporary name, validated,
    and only then renamed to the cache path, so another request can never
    find a partial or invalid file at the cache path.

    Args:
        language: language name used to build the sample URL.
        voice_style: voice style name used to build the sample URL.
        cache_key: key that names the cached file.

    Returns:
        The cached file path, or ``None`` when the sample cannot be fetched,
        is smaller than 1000 bytes, cannot be stored or decoded, or is shorter
        than half a second.
    """
    path = GENERATED_DIR / f"sample_{cache_key}.wav"
    if path.exists():
        return str(path)

    url = sample_url(language, voice_style)
    try:
        with urllib.request.urlopen(url, timeout=25) as response:
            raw = response.read()
    except (urllib.error.URLError, TimeoutError, OSError) as exc:
        print(f"Sample audio unavailable for {language}/{voice_style}: {exc}")
        return None
    if len(raw) < 1000:
        print(f"Sample audio too small for {language}/{voice_style}: {len(raw)} bytes")
        return None

    # Unique staging name: concurrent downloads of the same sample do not
    # overwrite each other, and `path` only ever holds validated audio.
    staging = path.with_name(f"{path.stem}.{uuid.uuid4().hex}.part.wav")
    try:
        staging.write_bytes(raw)
        audio, sr = load_audio(str(staging))
        if len(audio) / sr < 0.5:
            return None
        # Rename within the same directory to publish the validated file.
        staging.replace(path)
    except Exception as exc:
        print(f"Sample audio invalid for {language}/{voice_style}: {exc}")
        return None
    finally:
        # No-op after a successful rename; removes leftovers on every other path.
        staging.unlink(missing_ok=True)
    return str(path)
def normalize_wav(wav: np.ndarray, sample_rate: int) -> np.ndarray:
    """Convert audio to mono float32 at ``TARGET_SAMPLE_RATE``, peak-scaled to 0.94.

    Args:
        wav: audio samples; 1-D, or 2-D in either (channels, frames) or
            (frames, channels) layout. Size-1 axes are dropped first.
        sample_rate: sample rate of *wav* in Hz.

    Returns:
        The processed samples. All-zero or empty input is returned unscaled.
    """
    # atleast_1d keeps a single-sample input from collapsing to a 0-d array.
    wav = np.atleast_1d(np.asarray(wav, dtype=np.float32).squeeze())
    if wav.ndim == 2:
        # Down-mix across the channel axis, taken to be the shorter one. This
        # handles both layouts; averaging the last axis unconditionally would
        # collapse the time axis of (channels, frames) input. Ties keep the
        # previous last-axis behaviour.
        channel_axis = 0 if wav.shape[0] < wav.shape[1] else 1
        wav = np.mean(wav, axis=channel_axis)
    elif wav.ndim > 2:
        wav = np.mean(wav, axis=-1)
    if sample_rate != TARGET_SAMPLE_RATE:
        wav = librosa.resample(wav, orig_sr=sample_rate, target_sr=TARGET_SAMPLE_RATE)
    peak = float(np.max(np.abs(wav))) if wav.size else 0.0
    if peak > 0:
        # 0.94 leaves a little headroom below full scale.
        wav = wav / peak * 0.94
    return wav
@gpu_task(duration=120)
def synthesize_reference_file(prompt_text: str, output_path: str) -> None:
    """Synthesize *prompt_text* with the TTS model and write it to *output_path*.

    The result is normalized via ``normalize_wav`` and written as 16-bit PCM
    at ``TARGET_SAMPLE_RATE``. Raises the ``require_gpu`` error when no GPU is
    available.
    """
    require_gpu("Making the voice")
    tts = get_tts_model()
    base_kwargs = {
        "text": prompt_text,
        "cfg_value": FAST_TTS_CFG_VALUE,
        "inference_timesteps": FAST_TTS_STEPS,
    }
    try:
        samples = tts.generate(**base_kwargs, retry_badcase=False, denoise=False)
    except TypeError:
        # Retry with only the base arguments when the first call raised TypeError.
        samples = tts.generate(**base_kwargs)

    # Read the rate from `tts.tts_model.sample_rate`, defaulting to 48000 when absent.
    inner_model = getattr(tts, "tts_model", None)
    native_rate = int(getattr(inner_model, "sample_rate", 48000))
    normalized = normalize_wav(samples, native_rate)
    sf.write(output_path, normalized, TARGET_SAMPLE_RATE, subtype="PCM_16")
def _voice_ready_outputs(
    audio_path: str,
    target_text: str,
    language: str,
    level: str,
    voice_style: str,
    source: str,
    status_message: str,
    next_counter: int,
) -> tuple[str, str, str, dict[str, Any], str, str, str, str, str, int, Any]:
    """Build the UI outputs shown once a reference clip is ready to play."""
    return (
        render_reference_player(audio_path),
        render_phrase_card(target_text, language, level, voice_style),
        render_status(status_message),
        build_state(target_text, language, level, voice_style, source, audio_path),
        render_steps("say"),
        "",
        render_empty_score(),
        render_empty_feedback(),
        render_next_card("Try this next", "Record yourself saying the line above."),
        next_counter,
        gr.update(interactive=True, value="Make voice"),
    )


def create_practice_audio(
    language: str,
    level: str,
    voice_style: str,
    custom_text: str,
    starter_counter: int,
) -> tuple[str, str, str, dict[str, Any], str, str, str, str, str, int, Any]:
    """Produce the reference voice for the current choices and the UI updates.

    Sources are tried in order: the prebuilt sample (only for the first starter
    line when ``has_prebuilt_sample`` agrees), a previously rendered clip that
    still exists on disk, and finally a fresh synthesis.

    Args:
        language: Selected language.
        level: Selected level.
        voice_style: Selected voice style.
        custom_text: Learner-supplied text, passed to ``selected_text``.
        starter_counter: Current starter-line counter; the returned counter is
            one higher when the chosen text is a starter line.

    Returns:
        An 11-item tuple: reference player HTML, phrase card HTML, status HTML,
        practice state dict (empty when synthesis failed), stepper HTML, an
        empty string, score panel HTML, feedback HTML, next-step card HTML,
        the next starter counter, and an update for the Make voice button.
    """
    target_text, source, line_index = selected_text(language, level, custom_text, starter_counter)
    cache_key = audio_cache_key(language, level, voice_style, target_text)
    output_path = GENERATED_DIR / f"reference_{cache_key}.wav"
    cached_path = AUDIO_CACHE.get(cache_key)
    next_counter = int(starter_counter or 0) + (1 if source == "starter" else 0)

    if source == "starter" and line_index == 0 and has_prebuilt_sample(language, target_text):
        sample_path = ensure_sample_audio(language, voice_style, cache_key)
        if sample_path:
            AUDIO_CACHE[cache_key] = sample_path
            return _voice_ready_outputs(
                sample_path,
                target_text,
                language,
                level,
                voice_style,
                "sample",
                "Voice ready from the example library.",
                next_counter,
            )

    # Pick the first candidate that actually exists. The cached entry can point
    # at a file that has since been removed while the rendered file is still
    # present; in that case the rendered file must win, not the stale entry.
    reusable = next(
        (
            str(Path(candidate))
            for candidate in (cached_path, str(output_path))
            if candidate and Path(candidate).exists()
        ),
        None,
    )
    if reusable:
        AUDIO_CACHE[cache_key] = reusable
        return _voice_ready_outputs(
            reusable,
            target_text,
            language,
            level,
            voice_style,
            source,
            "Voice ready. Same choices play instantly next time.",
            next_counter,
        )

    prompt_text = build_voice_text(target_text, voice_style)
    try:
        synthesize_reference_file(prompt_text, str(output_path))
    except Exception as exc:
        print(f"Voice generation failed: {exc}")
        # A failed run may leave a partial file; remove it so the next request
        # does not treat it as a finished render.
        try:
            output_path.unlink(missing_ok=True)
        except OSError as cleanup_error:
            print(f"Could not remove partial voice file: {cleanup_error}")
        return (
            render_reference_player(None, "Could not make the voice. Try again."),
            render_phrase_card(target_text, language, level, voice_style),
            render_status("Could not make the voice. Try a shorter line or press Make voice again."),
            {},
            render_steps("pick"),
            "",
            render_empty_score(),
            """
Your feedback
Make the voice first. Then listen, record, and score.
""",
            render_next_card("Try this next", "Try a shorter line, then press Make voice."),
            next_counter,
            gr.update(interactive=True, value="Make voice"),
        )

    AUDIO_CACHE[cache_key] = str(output_path)
    return _voice_ready_outputs(
        str(output_path),
        target_text,
        language,
        level,
        voice_style,
        source,
        "Voice ready. Listen once, then say it.",
        next_counter,
    )
def build_state(
    target_text: str,
    language: str,
    level: str,
    voice_style: str,
    source: str,
    reference_audio: str,
) -> dict[str, Any]:
    """Bundle the current practice choices and reference clip into a state dict.

    Returns:
        A dict holding each argument under its own name, plus ``sample_rate``
        set to ``TARGET_SAMPLE_RATE``.
    """
    state: dict[str, Any] = dict(
        target_text=target_text,
        language=language,
        level=level,
        voice_style=voice_style,
        source=source,
        reference_audio=reference_audio,
        sample_rate=TARGET_SAMPLE_RATE,
    )
    return state
def audio_path_from_gradio(value: Any) -> str:
    """Extract an existing file path from an audio component value.

    Args:
        value: Either a path string, or a dict carrying the path under
            ``"path"`` or ``"name"``. Anything else counts as no recording.

    Returns:
        The path string.

    Raises:
        ValueError: If no path is present or the file does not exist.
    """
    candidate = ""
    if isinstance(value, dict):
        candidate = str(value.get("path") or value.get("name") or "")
    elif isinstance(value, str):
        candidate = value
    if not candidate:
        raise ValueError("Record your voice first, then press Score.")
    if Path(candidate).exists():
        return candidate
    raise ValueError("I could not read that recording. Record once more, then press Score.")
def suffix_for_mime(mime_type: str) -> str:
    """Map an audio MIME type to a file suffix, defaulting to ``.webm``.

    Parameters after the first ``;`` (such as ``codecs=opus``), surrounding
    whitespace, and letter case are ignored. Empty or unknown types yield
    ``.webm``.
    """
    base_type, _, _ = (mime_type or "").partition(";")
    lookup_key = base_type.strip().lower()
    known_suffixes = {
        "audio/webm": ".webm",
        "audio/ogg": ".ogg",
        "audio/oga": ".ogg",
        "audio/mp4": ".m4a",
        "audio/mpeg": ".mp3",
        "audio/wav": ".wav",
        "audio/x-wav": ".wav",
    }
    if lookup_key in known_suffixes:
        return known_suffixes[lookup_key]
    return ".webm"
def decode_recording_payload(payload: str | None) -> str:
    """Decode a browser recording payload and convert it to a mono WAV file.

    The payload is a JSON object whose ``dataUrl`` is a base64 ``data:`` URL
    and whose optional ``mimeType`` overrides the type named in that URL. The
    decoded bytes are written under ``RECORDING_DIR`` and converted with
    ffmpeg to one channel at ``TARGET_SAMPLE_RATE``.

    Args:
        payload: JSON text produced by the recorder, or ``None``.

    Returns:
        Path of the converted ``.wav`` file.

    Raises:
        ValueError: With a learner-facing message when the payload is missing,
            malformed, too small, too large, or cannot be converted.
    """
    payload = clean_text(payload, MAX_RECORDING_BYTES * 2)
    if not payload:
        raise ValueError("Record your voice first, then press Score.")
    try:
        data = json.loads(payload)
    except json.JSONDecodeError as exc:
        raise ValueError("The recording data was not readable. Record once more.") from exc
    if not isinstance(data, dict):
        # Valid JSON that is not an object (list, number, ...) has no .get();
        # report it like any other unreadable payload.
        raise ValueError("The recording data was not readable. Record once more.")
    data_url = str(data.get("dataUrl") or "")
    match = re.match(r"^data:([^;,]+)(?:;[^,]*)?;base64,(.+)$", data_url, flags=re.DOTALL)
    if not match:
        raise ValueError("The recording was incomplete. Record once more.")
    mime_type = str(data.get("mimeType") or match.group(1))
    try:
        raw = base64.b64decode(match.group(2), validate=True)
    except Exception as exc:
        raise ValueError("The recording could not be decoded. Record once more.") from exc
    if len(raw) < 1200:
        raise ValueError("That recording is too small. Record the full line.")
    if len(raw) > MAX_RECORDING_BYTES:
        raise ValueError("That recording is too large. Keep it under 20 seconds.")
    token = uuid.uuid4().hex
    # suffix_for_mime maps to a fixed set of suffixes, so the client-supplied
    # MIME type never reaches the file name directly.
    raw_path = RECORDING_DIR / f"attempt_{token}{suffix_for_mime(mime_type)}"
    wav_path = RECORDING_DIR / f"attempt_{token}.wav"
    command = [
        "ffmpeg",
        "-y",
        "-hide_banner",
        "-loglevel",
        "error",
        "-i",
        str(raw_path),
        "-ac",
        "1",
        "-ar",
        str(TARGET_SAMPLE_RATE),
        str(wav_path),
    ]
    ffmpeg_timeout_seconds = 60
    try:
        raw_path.write_bytes(raw)
        subprocess.run(
            command,
            check=True,
            capture_output=True,
            text=True,
            stdin=subprocess.DEVNULL,  # ffmpeg must not wait on the server's stdin
            timeout=ffmpeg_timeout_seconds,  # a stuck conversion must not hang the request
        )
    except Exception as exc:
        # Do not leave a truncated conversion behind.
        wav_path.unlink(missing_ok=True)
        raise ValueError("The recording could not be prepared for scoring. Record once more.") from exc
    finally:
        # Only the converted file is returned; the raw upload is not needed
        # again and would otherwise accumulate on disk.
        raw_path.unlink(missing_ok=True)
    return str(wav_path)
def load_audio(path: str, sr: int = TARGET_SAMPLE_RATE) -> tuple[np.ndarray, int]:
    """Load a mono clip at ``sr`` and scale it so its peak magnitude is 1.

    Args:
        path: Audio file to read.
        sr: Sample rate requested from the loader.

    Returns:
        ``(samples, sr)`` with float32 samples.

    Raises:
        ValueError: If the clip has no samples or its peak is below 1e-5.
    """
    loaded, _ = librosa.load(path, sr=sr, mono=True)
    samples = np.asarray(loaded, dtype=np.float32)
    if not samples.size:
        raise ValueError("The audio is empty. Record once more.")
    loudest = float(np.max(np.abs(samples)))
    if loudest < 1e-5:
        raise ValueError("The recording sounds silent. Check the microphone and record again.")
    return samples / loudest, sr
def active_audio_seconds(audio: np.ndarray, sr: int) -> float:
    """Estimate how many seconds of ``audio`` are louder than a relative floor.

    RMS energy is measured per frame. A frame counts as active when its energy
    exceeds the larger of 0.015 and 35% of the 90th-percentile energy.

    Returns:
        Active frames times the hop length, divided by ``sr``; 0.0 for empty
        input.
    """
    hop = 256
    if audio.size == 0:
        return 0.0
    energy = librosa.feature.rms(y=audio, frame_length=1024, hop_length=hop)[0]
    if energy.size == 0:
        return 0.0
    floor = max(0.015, float(np.percentile(energy, 90)) * 0.35)
    loud_frames = int(np.sum(energy > floor))
    return loud_frames * hop / sr
def resample_vector(values: np.ndarray, length: int) -> np.ndarray:
    """Stretch or shrink a 1-D curve to ``length`` points by linear interpolation.

    Returns:
        A float32 array: zeros when ``values`` is empty, the converted input
        when it already has ``length`` points, otherwise the interpolated curve.
    """
    source = np.asarray(values, dtype=np.float32)
    point_count = source.size
    if point_count == 0:
        return np.zeros(length, dtype=np.float32)
    if point_count == length:
        return source
    # Place both curves on the same 0..1 axis and sample the new positions.
    old_axis = np.linspace(0.0, 1.0, point_count)
    new_axis = np.linspace(0.0, 1.0, length)
    stretched = np.interp(new_axis, old_axis, source)
    return stretched.astype(np.float32)
def safe_correlation(a: np.ndarray, b: np.ndarray) -> float:
    """Return the Pearson correlation of two curves, or 0.0 when it is undefined.

    The result is 0.0 when either curve has fewer than three points or a
    standard deviation below 1e-6.
    """
    curves = (a, b)
    if any(curve.size < 3 for curve in curves):
        return 0.0
    if any(float(np.std(curve)) < 1e-6 for curve in curves):
        return 0.0
    matrix = np.corrcoef(a, b)
    return float(matrix[0, 1])
def feature_score(reference: np.ndarray, attempt: np.ndarray, sr: int) -> tuple[int, float]:
    """Score how closely the attempt's MFCCs follow the reference's.

    Both clips are turned into 13 MFCCs, normalized per coefficient, and
    aligned with DTW using cosine distance.

    Returns:
        ``(score, mean_cost)``. ``mean_cost`` is the final DTW cost divided by
        the longer side of the cost matrix. ``score`` falls linearly from 100
        at cost 0 to 0 at cost 0.8 and above.
    """

    def normalized_mfcc(signal: np.ndarray) -> np.ndarray:
        coefficients = librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=13)
        return librosa.util.normalize(coefficients, axis=1)

    costs, _ = librosa.sequence.dtw(
        X=normalized_mfcc(reference),
        Y=normalized_mfcc(attempt),
        metric="cosine",
    )
    mean_cost = float(costs[-1, -1] / max(costs.shape))
    capped_cost = min(mean_cost, 0.8)
    return clamp_score(100.0 * (1.0 - capped_cost / 0.8)), mean_cost
def rhythm_score(reference: np.ndarray, attempt: np.ndarray, sr: int) -> tuple[int, float]:
    """Score how well the attempt's loudness contour tracks the reference's.

    Each clip's RMS envelope is scaled by its own maximum, resampled to a
    shared length between 16 and 240 points, and the two are correlated.

    Returns:
        ``(score, correlation)`` where ``score`` is ``55 + 45 * correlation``
        passed through ``clamp_score``.
    """

    def envelope(signal: np.ndarray) -> np.ndarray:
        return librosa.feature.rms(y=signal, frame_length=1024, hop_length=256)[0]

    ref_envelope = envelope(reference)
    user_envelope = envelope(attempt)
    points = max(16, min(240, max(ref_envelope.size, user_envelope.size)))
    # The small epsilon keeps the division defined for an all-zero envelope.
    ref_curve = resample_vector(ref_envelope / (np.max(ref_envelope) + 1e-6), points)
    user_curve = resample_vector(user_envelope / (np.max(user_envelope) + 1e-6), points)
    corr = safe_correlation(ref_curve, user_curve)
    return clamp_score(55.0 + 45.0 * corr), corr
def melody_score(reference: np.ndarray, attempt: np.ndarray, sr: int) -> tuple[int, float]:
    """Score how well the attempt's pitch contour tracks the reference's.

    Pitch is estimated with YIN between 55 and 500 Hz, converted to log scale,
    centred on its median, resampled to a shared length between 16 and 240
    points, and correlated.

    Returns:
        ``(score, correlation)`` with ``score = 55 + 45 * correlation`` passed
        through ``clamp_score``; ``(50, 0.0)`` when any step raises.
    """

    def log_pitch(signal: np.ndarray) -> np.ndarray:
        track = librosa.yin(signal, fmin=55, fmax=500, sr=sr, frame_length=1024, hop_length=256)
        # Floor at 1.0 so the logarithm is never taken of zero or less.
        return np.log(np.maximum(track, 1.0))

    try:
        ref_track = log_pitch(reference)
        user_track = log_pitch(attempt)
        points = max(16, min(240, max(ref_track.size, user_track.size)))
        ref_curve = resample_vector(ref_track - np.median(ref_track), points)
        user_curve = resample_vector(user_track - np.median(user_track), points)
        corr = safe_correlation(ref_curve, user_curve)
        return clamp_score(55.0 + 45.0 * corr), corr
    except Exception as exc:
        print(f"Pitch scoring unavailable: {exc}")
        return 50, 0.0
def compare_audio(reference_path: str, attempt_path: str) -> ScoreResult:
    """Compare a learner recording with the reference clip.

    Args:
        reference_path: Path of the reference clip.
        attempt_path: Path of the learner's recording.

    Returns:
        A ``ScoreResult`` with the overall score, the four component scores,
        both durations, their ratio, and an ``evidence`` dict of rounded
        measurements.

    Raises:
        ValueError: If the attempt is longer than ``MAX_ATTEMPT_SECONDS``, too
            short relative to the reference, or mostly quiet.
    """
    reference, sr = load_audio(reference_path)
    attempt, _ = load_audio(attempt_path, sr=sr)
    ref_seconds = len(reference) / sr
    attempt_seconds = len(attempt) / sr

    if attempt_seconds > MAX_ATTEMPT_SECONDS:
        raise ValueError("Keep your recording under 20 seconds, then score again.")
    # Minimum length is 35% of the reference, kept within 0.75s..1.25s.
    shortest_allowed = max(0.75, min(1.25, ref_seconds * 0.35))
    if attempt_seconds < shortest_allowed:
        raise ValueError(
            f"The recording is too short ({attempt_seconds:.1f}s). Say the whole line, then score again."
        )
    voiced_seconds = active_audio_seconds(attempt, sr)
    # Minimum voiced time is 22% of the reference, kept within 0.45s..1.0s.
    least_voiced = max(0.45, min(1.0, ref_seconds * 0.22))
    if voiced_seconds < least_voiced:
        raise ValueError("The recording is mostly quiet. Check the microphone, speak the line, then score again.")

    ratio = attempt_seconds / max(ref_seconds, 0.1)
    drift = min(abs(ratio - 1.0), 0.75)
    timing = clamp_score(100.0 * (1.0 - drift / 0.75))
    voice_shape, mfcc_cost = feature_score(reference, attempt, sr)
    rhythm, rhythm_corr = rhythm_score(reference, attempt, sr)
    melody, melody_corr = melody_score(reference, attempt, sr)
    overall = clamp_score(0.40 * voice_shape + 0.25 * timing + 0.22 * rhythm + 0.13 * melody)

    components = {
        "voice_shape": voice_shape,
        "timing": timing,
        "rhythm": rhythm,
        "melody": melody,
    }
    evidence = {
        "baseline_score": overall,
        **components,
        "reference_duration_seconds": round(ref_seconds, 2),
        "attempt_duration_seconds": round(attempt_seconds, 2),
        "active_speech_seconds": round(voiced_seconds, 2),
        "duration_ratio": round(ratio, 3),
        "mfcc_dtw_cost": round(mfcc_cost, 4),
        "rhythm_correlation": round(rhythm_corr, 3),
        "melody_correlation": round(melody_corr, 3),
    }
    return ScoreResult(
        overall=overall,
        **components,
        reference_duration=ref_seconds,
        attempt_duration=attempt_seconds,
        duration_ratio=ratio,
        evidence=evidence,
    )
def normalize_word(word: str) -> str:
    """Lower-case ``word`` and keep only ``a-z``, ``0-9`` and apostrophes."""
    allowed = frozenset("abcdefghijklmnopqrstuvwxyz0123456789'")
    return "".join(symbol for symbol in word.lower() if symbol in allowed)
def word_similarity(a: str, b: str) -> float:
    """Return the ``difflib`` similarity ratio of two strings (0.0 to 1.0)."""
    comparison = difflib.SequenceMatcher(isjunk=None, a=a, b=b)
    return comparison.ratio()
def align_words(ref_text: str, user_text: str) -> tuple[list[dict[str, Any]], int]:
    """Align the spoken words against the target words and score the match.

    Tokens that normalize to an empty string are ignored. Each feedback entry
    has ``word`` (target token), ``spoken`` (heard token) and ``status``, one
    of ``matched``, ``close``, ``missed`` or ``extra``.

    Returns:
        ``(feedback, score)``. Matched words count fully, close words count
        0.45, and each extra word costs 0.12, all relative to the number of
        target words; the result is scaled to 100 and passed to ``clamp_score``.
    """

    def usable_tokens(text: str) -> list[str]:
        return [piece for piece in text.split() if normalize_word(piece)]

    def entry(word: str, spoken: str, status: str) -> dict[str, Any]:
        return {"word": word, "spoken": spoken, "status": status}

    ref_tokens = usable_tokens(ref_text)
    user_tokens = usable_tokens(user_text)
    matcher = difflib.SequenceMatcher(
        None,
        [normalize_word(token) for token in ref_tokens],
        [normalize_word(token) for token in user_tokens],
    )

    feedback: list[dict[str, Any]] = []
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        expected = ref_tokens[i1:i2]
        heard = user_tokens[j1:j2]
        if tag == "equal":
            feedback.extend(entry(word, spoken, "matched") for word, spoken in zip(expected, heard))
        elif tag == "replace":
            # Pair the words position by position inside the replaced run.
            for position, ref_word in enumerate(expected):
                if position < len(heard):
                    spoken = heard[position]
                    similarity = word_similarity(normalize_word(ref_word), normalize_word(spoken))
                else:
                    spoken, similarity = "", 0.0
                feedback.append(entry(ref_word, spoken, "close" if similarity >= 0.68 else "missed"))
            # Anything heard beyond the length of the target run is extra.
            feedback.extend(entry("", leftover, "extra") for leftover in heard[len(expected) :])
        elif tag == "delete":
            feedback.extend(entry(word, "", "missed") for word in expected)
        elif tag == "insert":
            feedback.extend(entry("", spoken, "extra") for spoken in heard)

    tally = {"matched": 0, "close": 0, "missed": 0, "extra": 0}
    for item in feedback:
        tally[item["status"]] += 1
    target_count = max(1, len(ref_tokens))
    raw = (tally["matched"] + tally["close"] * 0.45) / target_count - tally["extra"] * 0.12 / target_count
    return feedback, clamp_score(raw * 100)
def transcribe_english(path: str) -> str:
    """Transcribe the recording at ``path`` with the English ASR model.

    Returns:
        The lower-cased transcript of the first decoded sequence, passed
        through ``clean_text`` with a limit of 400.
    """
    import torch

    processor, model, device = get_english_asr_model()
    waveform, _ = load_audio(path)
    encoded = processor(waveform, sampling_rate=TARGET_SAMPLE_RATE, return_tensors="pt", padding=True)
    on_device = {name: tensor.to(device) for name, tensor in encoded.items()}
    with torch.inference_mode():
        logits = model(**on_device).logits
    # Take the highest-scoring token per frame before decoding.
    token_ids = torch.argmax(logits, dim=-1)
    decoded = processor.batch_decode(token_ids)
    return clean_text(decoded[0].lower(), 400)
def english_word_evidence(language: str, target_text: str, attempt_path: str) -> dict[str, Any]:
    """Collect word-level evidence for English attempts.

    Args:
        language: Practice language; only "english" (any case, surrounding
            spaces ignored) enables transcription.
        target_text: The line the learner was asked to say.
        attempt_path: Path of the learner's recording.

    Returns:
        ``{"enabled": False, "status": "skipped"}`` for other languages.
        Otherwise a dict with the target text, the transcript, the word match
        score, and at most 24 word feedback entries.
    """
    is_english = language.strip().lower() == "english"
    if not is_english:
        return dict(enabled=False, status="skipped")
    heard = transcribe_english(attempt_path)
    per_word, match_score = align_words(target_text, heard)
    return dict(
        enabled=True,
        status="ready",
        target_text=target_text,
        user_transcript=heard,
        word_match_score=match_score,
        word_feedback=per_word[:24],
    )
def judge_prompt(state: dict[str, Any], score: ScoreResult, word_evidence: dict[str, Any]) -> list[dict[str, str]]:
    """Build the chat messages that ask the judge model to grade an attempt.

    Args:
        state: Practice state; ``language``, ``level``, ``target_text`` and
            ``voice_style`` are read from it.
        score: Acoustic comparison result; its ``evidence`` dict is included.
        word_evidence: Word-level evidence dict, included as-is.

    Returns:
        A system message and a user message. The user message ends with the
        evidence serialized as indented JSON.
    """
    evidence_json = json.dumps(
        {
            "language": state.get("language"),
            "level": state.get("level"),
            "target_text": state.get("target_text"),
            "voice_style": state.get("voice_style"),
            "acoustic_evidence": score.evidence,
            "word_evidence": word_evidence,
        },
        ensure_ascii=False,
        indent=2,
    )
    system_text = " ".join(
        [
            "You are the visible judge for a short language shadowing practice app.",
            "Use only the supplied evidence.",
            "Do not claim this is a validated pronunciation test, accent detector, fluency exam, or clinical tool.",
            "Do not mention model names, providers, internal feature names, JSON, or hidden implementation details.",
            "Return only one JSON object.",
        ]
    )
    intro_text = "Judge this attempt and return only strict JSON. Use these keys and value types:\n"
    schema_text = (
        "\n".join(
            [
                "{",
                ' "score": ,',
                ' "sub_scores": {"words": , "timing": , "rhythm": , "voice_shape": },',
                ' "short_feedback": ,',
                ' "try_next": [, , ],',
                ' "next_line": ',
                "}",
            ]
        )
        + "\n"
    )
    rules_text = (
        " ".join(
            [
                "Rules: scores must be integers from 0 to 100. Do not copy the schema text.",
                "Do not use placeholders like short action or one friendly sentence.",
                "Keep every sentence short and useful for a child.",
                "Do not mention skipped, missing, unavailable, or language-specific word tips to the learner.",
                "If English word evidence is ready, use it heavily for the words score.",
                "Audio duration and speech activity already passed validation, so do not set every score to 0.",
                "If word evidence is skipped, still judge timing, rhythm, and voice shape from acoustic evidence.",
            ]
        )
        + "\n\n"
    )
    user_text = intro_text + schema_text + rules_text + "Evidence:\n" + evidence_json
    return [
        {"role": "system", "content": system_text},
        {"role": "user", "content": user_text},
    ]
def repair_judge_prompt(
    state: dict[str, Any],
    score: ScoreResult,
    word_evidence: dict[str, Any],
    bad_response: str,
    error: str,
) -> list[dict[str, str]]:
    """Build the chat messages that ask the judge to redo a rejected response.

    Args:
        state: Practice state; ``language``, ``level``, ``target_text`` and
            ``voice_style`` are read from it.
        score: Acoustic comparison result; its ``evidence`` dict is included.
        word_evidence: Word-level evidence dict, included as-is.
        bad_response: The rejected reply; passed through ``clean_text`` with a
            limit of 900 before being included.
        error: Why the reply was rejected.

    Returns:
        A system message and a user message. The user message ends with the
        evidence, including the rejected reply and its problem, as indented
        JSON.
    """
    evidence_json = json.dumps(
        {
            "language": state.get("language"),
            "level": state.get("level"),
            "target_text": state.get("target_text"),
            "voice_style": state.get("voice_style"),
            "acoustic_evidence": score.evidence,
            "word_evidence": word_evidence,
            "previous_response_problem": error,
            "previous_response": clean_text(bad_response, 900),
        },
        ensure_ascii=False,
        indent=2,
    )
    system_text = " ".join(
        [
            "You fix one bad judging response for a short language shadowing app.",
            "Use only the supplied evidence.",
            "Return only one valid JSON object.",
            "Do not mention model names, providers, JSON, or hidden implementation details in user-facing text.",
        ]
    )
    requirements = [
        "The previous response was rejected. Return a real judgement now.",
        "Required JSON keys: score, sub_scores, short_feedback, try_next, next_line.",
        "sub_scores must include words, timing, rhythm, voice_shape.",
        "All scores must be integers 0 to 100.",
        "short_feedback and try_next must be specific to the evidence, not placeholders.",
        "Do not mention skipped, missing, unavailable, or language-specific word tips to the learner.",
        "If English word evidence is ready, use the word_match_score heavily for words.",
        "Because the attempt passed duration and speech checks, do not return all-zero scores.",
    ]
    user_text = "\n".join(requirements) + "\n\n" + "Evidence:\n" + evidence_json
    return [
        {"role": "system", "content": system_text},
        {"role": "user", "content": user_text},
    ]
def extract_json_object(text: str) -> dict[str, Any]:
    """Parse the first brace-balanced JSON object found in ``text``.

    A leading or trailing Markdown code fence is removed first. Scanning
    starts at the first ``{`` and stops at its matching ``}``; braces inside
    double-quoted strings, including after escaped quotes, are ignored.

    Raises:
        ValueError: If there is no ``{``, if the object never closes, or
            (as ``json.JSONDecodeError``) if the balanced span is not valid
            JSON.
    """
    body = re.sub(r"^```(?:json)?\s*|\s*```$", "", text.strip(), flags=re.IGNORECASE | re.DOTALL).strip()
    opening = body.find("{")
    if opening < 0:
        raise ValueError("No JSON object found")
    nesting = 0
    inside_string = False
    skip_next = False
    for position, symbol in enumerate(body[opening:], start=opening):
        if inside_string:
            if skip_next:
                # The character after a backslash never ends the string.
                skip_next = False
            elif symbol == "\\":
                skip_next = True
            elif symbol == '"':
                inside_string = False
        elif symbol == '"':
            inside_string = True
        elif symbol == "{":
            nesting += 1
        elif symbol == "}":
            nesting -= 1
            if nesting == 0:
                return json.loads(body[opening : position + 1])
    raise ValueError("JSON object was incomplete")
# Lower-case fragments that mark a judge reply as copied template wording
# rather than real feedback. The check below stringifies the value, trims it,
# lower-cases it and collapses runs of whitespace to single spaces. Empty text
# counts as a placeholder; otherwise the text is a placeholder when any
# snippet occurs in it as a substring.
# NOTE(review): in this copy the end of the tuple and the header of the
# function that uses it appear fused on one line - confirm against the
# original file before editing.
PLACEHOLDER_SNIPPETS = (
"one friendly sentence",
"short action",
"specific short action",
"a short next practice line",
"next practice line in the same language",
" bool:
text = re.sub(r"\s+", " ", str(value or "").strip().lower())
if not text:
return True
return any(snippet in text for snippet in PLACEHOLDER_SNIPPETS)
def evidence_quality_error(
    judgement: dict[str, Any],
    score: ScoreResult | None,
    word_evidence: dict[str, Any] | None,
) -> str | None:
    """Check a normalized judgement against the measured evidence.

    Args:
        judgement: Normalized judgement with ``score`` and ``sub_scores``.
        score: Acoustic result; when ``None`` no checks are made.
        word_evidence: Word-level evidence, or ``None``.

    Returns:
        A description of the first contradiction found, or ``None``. The
        contradictions are: every score is zero although the evidence is not;
        the words score is under 35 although enabled word evidence scored at
        least 65; the timing score is under 20 although the measured timing
        was at least 55.
    """
    if score is None:
        return None
    words_info = word_evidence or {}
    sub_scores = judgement["sub_scores"]

    def measured_word_match() -> int:
        return int(words_info.get("word_match_score") or 0)

    everything_zero = judgement["score"] == 0 and not any(sub_scores[key] != 0 for key in sub_scores)
    if everything_zero and (score.overall > 0 or measured_word_match() > 0):
        return "The judging response returned all-zero scores despite usable evidence."

    word_match = measured_word_match()
    if words_info.get("enabled") and word_match >= 65 and sub_scores["words"] < 35:
        return "The judging response ignored strong English word evidence."

    measured_timing = int(score.evidence.get("timing") or 0)
    if measured_timing >= 55 and sub_scores["timing"] < 20:
        return "The judging response ignored usable timing evidence."
    return None
def normalize_judgement(
    raw_text: str,
    fallback_next_line: str,
    score: ScoreResult | None = None,
    word_evidence: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Parse and validate the judge's reply into a normalized judgement dict.

    Args:
        raw_text: The judge model's raw reply.
        fallback_next_line: Used when the reply's ``next_line`` is empty or
            looks like a placeholder.
        score: Optional acoustic result for the evidence consistency check.
        word_evidence: Optional word-level evidence for the same check.

    Returns:
        A dict with ``score``, ``sub_scores`` (words, timing, rhythm,
        voice_shape), ``short_feedback``, ``try_next`` (at most three
        non-empty steps) and ``next_line``.

    Raises:
        ValueError: If the reply cannot be parsed, lacks required fields, has
            invalid score values, contains placeholder text, or contradicts
            the evidence.
    """
    try:
        data = extract_json_object(raw_text)
    except Exception as exc:
        raise ValueError("The judging response was not readable. Try Score again.") from exc
    if not isinstance(data, dict):
        raise ValueError("The judging response had the wrong shape. Try Score again.")

    sub_scores = data.get("sub_scores")
    if not isinstance(sub_scores, dict):
        raise ValueError("The judging response missed sub-scores. Try Score again.")
    required = ["words", "timing", "rhythm", "voice_shape"]
    if not all(key in sub_scores for key in required):
        raise ValueError("The judging response missed a score field. Try Score again.")

    steps = data.get("try_next")
    if not (isinstance(steps, list) and steps):
        raise ValueError("The judging response missed next steps. Try Score again.")

    try:
        overall = clamp_score(float(data.get("score")))
        parts = {key: clamp_score(float(sub_scores[key])) for key in required}
        feedback = clean_text(data.get("short_feedback"), 220)
        cleaned_steps = [clean_text(step, 120) for step in steps[:3]]
        next_line = clean_text(data.get("next_line"), MAX_TARGET_CHARS) or fallback_next_line
    except Exception as exc:
        raise ValueError("The judging response used invalid score values. Try Score again.") from exc

    normalized = {
        "score": overall,
        "sub_scores": parts,
        "short_feedback": feedback,
        # Steps that clean down to nothing are dropped.
        "try_next": [step for step in cleaned_steps if step],
        "next_line": next_line,
    }

    if looks_like_placeholder(normalized["short_feedback"]):
        raise ValueError("The judging response copied placeholder feedback.")
    kept_steps = normalized["try_next"]
    if not kept_steps or any(looks_like_placeholder(step) for step in kept_steps):
        raise ValueError("The judging response copied placeholder practice steps.")
    if looks_like_placeholder(normalized["next_line"]):
        normalized["next_line"] = fallback_next_line

    problem = evidence_quality_error(normalized, score, word_evidence)
    if problem:
        raise ValueError(problem)
    return normalized
def judge_with_retry(
    state: dict[str, Any],
    score: ScoreResult,
    word_evidence: dict[str, Any],
    fallback_next_line: str,
) -> tuple[dict[str, Any], str]:
    """Ask the judge for a verdict, with one repair attempt if it is rejected.

    Returns:
        ``(judgement, raw_reply)`` for whichever reply passed
        ``normalize_judgement``.

    Raises:
        ValueError: If the repaired reply is rejected as well.
    """
    first_reply = run_judge(judge_prompt(state, score, word_evidence))
    try:
        verdict = normalize_judgement(first_reply, fallback_next_line, score, word_evidence)
    except ValueError as first_error:
        rejection = str(first_error)
    else:
        return verdict, first_reply

    # Second and last try: show the judge its rejected reply and the reason.
    second_reply = run_judge(repair_judge_prompt(state, score, word_evidence, first_reply, rejection))
    try:
        return normalize_judgement(second_reply, fallback_next_line, score, word_evidence), second_reply
    except ValueError as second_error:
        raise ValueError("The judge returned unusable feedback. Press Score again.") from second_error
def score_headline(value: int) -> str:
    """Pick the headline shown for a score after ``clamp_score``.

    78 and up is "Great work", 52 and up "Good work", 25 and up "Keep going",
    anything lower "Try again".
    """
    points = clamp_score(value)
    bands = ((78, "Great work"), (52, "Good work"), (25, "Keep going"))
    for floor, headline in bands:
        if points >= floor:
            return headline
    return "Try again"
@gpu_task(duration=120)
def score_attempt(
    recording_payload: Any,
    state: dict[str, Any] | None,
) -> tuple[str, str, str, dict[str, Any], str, Any, str]:
    """Score the learner's recording against the current reference voice.

    Args:
        recording_payload: Recorder value; stringified and handed to
            ``decode_recording_payload``.
        state: Practice state from ``create_practice_audio``; must carry
            ``reference_audio``.

    Returns:
        A 7-item tuple: score card HTML, feedback HTML, next-step card HTML,
        evidence dict (empty on failure), stepper HTML, an update for the
        Score button, and the score status HTML. Failures are reported through
        the same tuple rather than raised.
    """

    def failure(message: str) -> tuple[str, str, str, dict[str, Any], str, Any, str]:
        feedback_html = f"""
Your feedback
{html.escape(message)}
"""
        return (
            render_error_score(message),
            feedback_html,
            render_next_card("Try this next", "Fix that, then press Score again."),
            {},
            render_steps("say"),
            gr.update(interactive=True, value="Score"),
            render_score_status(message),
        )

    if not gpu_available():
        return failure("Scoring needs GPU hardware. Try again when the Space is on GPU.")
    reference_audio = state.get("reference_audio") if state else None
    if not reference_audio:
        return failure("Press Make voice first.")

    try:
        attempt_path = decode_recording_payload(str(recording_payload or ""))
        score = compare_audio(reference_audio, attempt_path)
        word_evidence = english_word_evidence(
            state.get("language", ""),
            state.get("target_text", ""),
            attempt_path,
        )
        fallback_line = language_fallback(state.get("language", "English"))
        judgement, _ = judge_with_retry(state, score, word_evidence, fallback_line)
    except Exception as exc:
        # ValueError messages are already learner-facing; anything else gets
        # a generic prefix.
        if isinstance(exc, ValueError):
            return failure(str(exc))
        return failure(f"Scoring could not finish: {exc}")

    evidence = {
        "state": state,
        "acoustic": score.evidence,
        "words": word_evidence,
        "judge": judgement,
    }
    return (
        render_score_card(judgement, score, word_evidence),
        render_feedback_card(judgement, word_evidence),
        render_next_card("Try this next", judgement["next_line"]),
        evidence,
        render_steps("tips"),
        gr.update(interactive=True, value="Score"),
        render_score_status("Score ready."),
    )
# Render the four-step progress strip: Pick, Listen, Say it, Score it.
# `active` is the key of the current step ("pick", "listen", "say" or "tips");
# that step gets the " is-active" class suffix, every other step gets none.
# Step labels are HTML-escaped before being inserted.
# NOTE(review): the string literals that carry the markup look truncated and
# split across lines in this copy - confirm against the original file before
# editing the template.
def render_steps(active: str = "pick") -> str:
steps = [
("pick", "1", "Pick"),
("listen", "2", "Listen"),
("say", "3", "Say it"),
("tips", "4", "Score it"),
]
chunks = ['']
for key, number, label in steps:
active_class = " is-active" if key == active else ""
chunks.append(
f'
'
f'
{number}
'
f'
{html.escape(label)}
'
"
"
)
chunks.append("
")
return "".join(chunks)
def render_phrase_card(target_text: str, language: str, level: str, voice_style: str) -> str:
    """Render the card showing the practice choices and the line to say.

    All four values are HTML-escaped. The first row joins language, level and
    voice style with a middle dot; the second row is the target text.
    """
    heading = " · ".join(html.escape(part) for part in (language, level, voice_style))
    rows = ["", heading, html.escape(target_text), ""]
    return "\n".join(rows)
def render_initial_phrase() -> str:
    """Render the phrase card shown before any practice line has been chosen."""
    return render_phrase_card(
        target_text="Pick your practice, then press Make voice.",
        language="Ready",
        level="Step 1",
        voice_style="Simple",
    )
# Render the generation status line.
# `message` is HTML-escaped. When `loading` is true a spinner fragment is
# placed before the message and the " is-loading" class suffix is computed.
# NOTE(review): the returned literal looks truncated and split across lines in
# this copy (loading_class is computed but not visibly used) - confirm against
# the original file before editing.
def render_status(message: str, loading: bool = False) -> str:
spinner = ' ' if loading else ""
loading_class = " is-loading" if loading else ""
return f'{spinner}{html.escape(message)}
'
def render_loading_phrase(target_text: str, language: str, level: str, voice_style: str) -> str:
    """Render the phrase card shown while the reference voice is being made.

    Same layout as the regular phrase card, with a "Making your voice" row
    between the heading and the HTML-escaped target text.
    """
    heading = " · ".join(html.escape(part) for part in (language, level, voice_style))
    rows = ["", heading, "Making your voice", html.escape(target_text), ""]
    return "\n".join(rows)
def begin_make_voice(
    language: str,
    level: str,
    voice_style: str,
    custom_text: str,
    starter_counter: int,
) -> tuple[str, str, str, str, str, str, Any]:
    """Return the "working" UI state shown as soon as Make voice is pressed.

    Returns:
        A 7-item tuple: loading reference player, loading phrase card, loading
        status line, stepper on "listen", feedback placeholder, next-step
        card, and an update that disables the button and relabels it
        "Making...".
    """
    target_text, _, _ = selected_text(language, level, custom_text, starter_counter)
    player_html = render_reference_player(None, "Making voice", loading=True)
    phrase_html = render_loading_phrase(target_text, language, level, voice_style)
    status_html = render_status("Making voice. This can take a little while the first time.", loading=True)
    stepper_html = render_steps("listen")
    feedback_html = """
Your feedback
First we make the reference voice. Then you can listen and record.
"""
    next_html = render_next_card("Try this next", "Wait for the voice, then press play.")
    button_update = gr.update(interactive=False, value="Making...")
    return (
        player_html,
        phrase_html,
        status_html,
        stepper_html,
        feedback_html,
        next_html,
        button_update,
    )
def preview_selection(
    language: str,
    level: str,
    voice_style: str,
    custom_text: str,
    starter_counter: int,
) -> tuple[str, str, str, dict[str, Any], str, str, str, str, str, str]:
    """Return the UI state for freshly changed choices, before a voice is made.

    Returns:
        A 10-item tuple: phrase card, empty reference player with a hint,
        status line, empty practice state, an empty string, empty score
        panel, empty feedback panel, next-step card, stepper on "pick", and a
        blank score status.
    """
    target_text, _, _ = selected_text(language, level, custom_text, starter_counter)
    phrase_html = render_phrase_card(target_text, language, level, voice_style)
    player_html = render_reference_player(None, "Press Make voice to use these choices.")
    status_html = render_status("Choices ready. Press Make voice.")
    cleared_state: dict[str, Any] = {}
    score_html = render_empty_score()
    feedback_html = render_empty_feedback()
    next_html = render_next_card("Try this next", "Press Make voice to hear the new choice.")
    stepper_html = render_steps("pick")
    score_status_html = render_score_status()
    return (
        phrase_html,
        player_html,
        status_html,
        cleared_state,
        "",
        score_html,
        feedback_html,
        next_html,
        stepper_html,
        score_status_html,
    )
def begin_scoring() -> tuple[str, str, str, Any]:
    """Return the "working" UI state shown as soon as Score is pressed.

    Returns:
        A 4-item tuple: feedback placeholder, stepper on "tips", loading score
        status, and an update that disables the button and relabels it
        "Scoring...".
    """
    feedback_html = """
Your feedback
Checking your voice now...
"""
    stepper_html = render_steps("tips")
    status_html = render_score_status("Scoring your voice...", loading=True)
    button_update = gr.update(interactive=False, value="Scoring...")
    return feedback_html, stepper_html, status_html, button_update
# Render the score panel placeholder shown before anything has been scored.
# NOTE(review): the returned literal looks truncated and split across lines in
# this copy - confirm against the original file before editing.
def render_empty_score() -> str:
return 'Your score appears here after you record and press Score.
'
# Render the status line next to the Score button.
# With an empty `message` a fixed blank fragment is returned. Otherwise the
# message is HTML-escaped, and when `loading` is true a spinner fragment is
# placed before it and the " is-loading" class suffix is computed.
# NOTE(review): both returned literals look truncated and split across lines
# in this copy (loading_class is computed but not visibly used) - confirm
# against the original file before editing.
def render_score_status(message: str = "", loading: bool = False) -> str:
if not message:
return '
'
spinner = ' ' if loading else ""
loading_class = " is-loading" if loading else ""
return f'{spinner}{html.escape(message)}
'
def render_error_score(message: str) -> str:
    """Render the score panel for a failed scoring attempt.

    The panel shows a "Try again" row followed by the HTML-escaped message.
    """
    rows = ["", "Try again", html.escape(message), ""]
    return "\n".join(rows)
def render_empty_feedback() -> str:
    """Render the feedback panel placeholder shown before any attempt is scored."""
    rows = [
        "",
        "Your feedback",
        "Listen first. Then record your voice. Then press Score.",
        "",
    ]
    return "\n".join(rows)
def render_meter(label: str, value: int) -> str:
    """Render one labelled sub-score row.

    ``value`` is passed through ``clamp_score`` and ``label`` is HTML-escaped.
    """
    shown_value = clamp_score(value)
    row = f"{html.escape(label)} {shown_value}"
    return "\n".join(["", row, ""])
def render_score_card(judgement: dict[str, Any], score: ScoreResult, word_evidence: dict[str, Any]) -> str:
    """Render the score panel for a judged attempt.

    Args:
        judgement: Normalized judgement providing ``score`` and ``sub_scores``.
        score: Acoustic result providing the two durations for the note row.
        word_evidence: Accepted for signature parity; not read here.

    Returns:
        The overall score, its headline, a duration note, and one meter row
        each for words, timing, rhythm and voice shape.
    """
    sub_scores = judgement["sub_scores"]
    total = judgement["score"]
    note = f"Reference {score.reference_duration:.1f}s. Your voice {score.attempt_duration:.1f}s."
    rows = [
        f"{total}",
        html.escape(score_headline(total)),
        html.escape(note),
    ]
    meters = (
        ("Words", "words"),
        ("Timing", "timing"),
        ("Rhythm", "rhythm"),
        ("Voice shape", "voice_shape"),
    )
    rows.extend(render_meter(label, sub_scores[key]) for label, key in meters)
    return "\n" + "\n".join(rows) + "\n"
# Render the "I heard: ..." note for attempts that have word evidence.
# Returns an empty string when word_evidence["enabled"] is falsy. Otherwise
# the transcript is passed through clean_text with a limit of 160, replaced by
# a fixed sentence when empty, and HTML-escaped.
# NOTE(review): the returned literal looks truncated and split across lines in
# this copy - confirm against the original file before editing.
def render_word_note(word_evidence: dict[str, Any]) -> str:
if not word_evidence.get("enabled"):
return ""
transcript = clean_text(word_evidence.get("user_transcript"), 160)
if not transcript:
transcript = "I could not hear clear words."
return f'I heard: {html.escape(transcript)}
'
def render_feedback_card(judgement: dict[str, Any], word_evidence: dict[str, Any]) -> str:
    """Render the feedback panel for a judged attempt.

    Args:
        judgement: Normalized judgement providing ``short_feedback`` and
            ``try_next``; both are HTML-escaped.
        word_evidence: Word-level evidence handed to ``render_word_note``.

    Returns:
        A heading, the short feedback, the practice steps on one row, and the
        word note.
    """
    escaped_steps = [html.escape(step) + " " for step in judgement["try_next"]]
    rows = [
        "Your feedback",
        html.escape(judgement["short_feedback"]),
        "".join(escaped_steps),
        render_word_note(word_evidence),
    ]
    return "\n" + "\n".join(rows) + "\n"
def render_next_card(title: str, line: str) -> str:
    """Render the "what to do next" card from an HTML-escaped title and line."""
    rows = ["", html.escape(title), html.escape(line), ""]
    return "\n".join(rows)
# Template for the in-page recorder; NativeRecorder passes it to gr.HTML as
# html_template.
# NOTE(review): RECORDER_JS looks up elements by [data-start], [data-stop],
# [data-status], [data-level] and [data-preview], none of which are visible in
# this copy - confirm the markup is intact in the original file.
RECORDER_HTML = """
Record
Stop
Press Record, say the line, then press Stop.
"""
# Browser script for the recorder; NativeRecorder passes it to gr.HTML as
# js_on_load. It refers to `element`, `props` and `trigger`, which are not
# defined in the script itself (presumably supplied by the component runtime -
# confirm against the Gradio version in use).
# What the script does:
#   * Record: clears the component value, opens the microphone with echo
#     cancellation, noise suppression and auto gain enabled, drives the level
#     bar from the RMS of the live signal, and starts a MediaRecorder using
#     the first supported type from a webm/ogg/mp4 preference list.
#   * Stop: stops the recorder. On stop the chunks become a Blob; takes under
#     0.7 s or under 1200 bytes are rejected with a status message. Otherwise
#     the preview player is shown and the component value is set to a JSON
#     string with dataUrl, mimeType, durationSeconds, sizeBytes and createdAt.
#   * Errors while starting are shown in the status line as
#     "Microphone error: ...".
# The whole literal is runtime text, so it carries no inline comments.
RECORDER_JS = """
const startButton = element.querySelector("[data-start]");
const stopButton = element.querySelector("[data-stop]");
const status = element.querySelector("[data-status]");
const level = element.querySelector("[data-level]");
const preview = element.querySelector("[data-preview]");
let stream = null;
let recorder = null;
let chunks = [];
let startedAt = 0;
let audioContext = null;
let analyser = null;
let meterFrame = null;
function setValue(value) {
props.value = value;
trigger("change");
}
function preferredMimeType() {
const candidates = [
"audio/webm;codecs=opus",
"audio/webm",
"audio/ogg;codecs=opus",
"audio/ogg",
"audio/mp4"
];
if (!window.MediaRecorder) return "";
for (const candidate of candidates) {
if (MediaRecorder.isTypeSupported(candidate)) return candidate;
}
return "";
}
function updateMeter() {
if (!analyser) return;
const data = new Uint8Array(analyser.fftSize);
analyser.getByteTimeDomainData(data);
let sum = 0;
for (const sample of data) {
const centered = (sample - 128) / 128;
sum += centered * centered;
}
const rms = Math.sqrt(sum / data.length);
level.style.width = Math.min(100, Math.round(rms * 380)) + "%";
meterFrame = requestAnimationFrame(updateMeter);
}
function cleanup() {
if (meterFrame) cancelAnimationFrame(meterFrame);
meterFrame = null;
if (stream) stream.getTracks().forEach((track) => track.stop());
stream = null;
if (audioContext) audioContext.close().catch(() => {});
audioContext = null;
analyser = null;
level.style.width = "0%";
}
function blobToDataUrl(blob) {
return new Promise((resolve, reject) => {
const reader = new FileReader();
reader.onload = () => resolve(reader.result);
reader.onerror = reject;
reader.readAsDataURL(blob);
});
}
startButton.addEventListener("click", async () => {
try {
setValue("");
chunks = [];
preview.removeAttribute("src");
preview.style.display = "none";
stream = await navigator.mediaDevices.getUserMedia({
audio: {
echoCancellation: true,
noiseSuppression: true,
autoGainControl: true
}
});
audioContext = new (window.AudioContext || window.webkitAudioContext)();
const source = audioContext.createMediaStreamSource(stream);
analyser = audioContext.createAnalyser();
analyser.fftSize = 256;
source.connect(analyser);
updateMeter();
const mimeType = preferredMimeType();
recorder = new MediaRecorder(stream, mimeType ? { mimeType } : undefined);
recorder.ondataavailable = (event) => {
if (event.data && event.data.size > 0) chunks.push(event.data);
};
recorder.onstop = async () => {
const durationSeconds = (performance.now() - startedAt) / 1000;
const blob = new Blob(chunks, { type: recorder.mimeType || mimeType || "audio/webm" });
cleanup();
startButton.disabled = false;
stopButton.disabled = true;
if (durationSeconds < 0.7 || blob.size < 1200) {
status.textContent = "Too short. Press Record and say the whole line.";
return;
}
preview.src = URL.createObjectURL(blob);
preview.style.display = "block";
const dataUrl = await blobToDataUrl(blob);
setValue(JSON.stringify({
dataUrl,
mimeType: blob.type || "audio/webm",
durationSeconds,
sizeBytes: blob.size,
createdAt: Date.now()
}));
status.textContent = `Recorded ${durationSeconds.toFixed(1)}s. Play it back, then Score.`;
};
recorder.start();
startedAt = performance.now();
startButton.disabled = true;
stopButton.disabled = false;
status.textContent = "Recording... speak now.";
} catch (error) {
cleanup();
startButton.disabled = false;
stopButton.disabled = true;
status.textContent = `Microphone error: ${error.message || error}`;
}
});
stopButton.addEventListener("click", () => {
if (recorder && recorder.state !== "inactive") {
status.textContent = "Preparing recording...";
recorder.stop();
}
});
"""
class NativeRecorder(gr.HTML):
    """HTML component that hosts the in-page recorder.

    It wires ``RECORDER_HTML`` and ``RECORDER_JS`` into ``gr.HTML`` and
    reports its API value as a plain string.
    """

    def __init__(self, value: str = "", **kwargs: Any) -> None:
        """Create the recorder.

        Args:
            value: Initial component value.
            **kwargs: Forwarded unchanged to ``gr.HTML``.
        """
        recorder_options = {
            "html_template": RECORDER_HTML,
            "js_on_load": RECORDER_JS,
            "container": False,
        }
        super().__init__(value=value, **recorder_options, **kwargs)

    def api_info(self) -> dict[str, str]:
        """Describe the component's API value as a string."""
        return dict(type="string")
# Build the EchoYard Blocks app: shared state, the page layout (settings,
# practice and feedback areas) and the event wiring between them.
# NOTE(review): newer Gradio releases document `theme`/`css` as launch()-level
# options; confirm Blocks() still honors them for the pinned Gradio version.
with gr.Blocks(
title="EchoYard",
theme=gr.themes.Base(primary_hue="teal", neutral_hue="slate"),
css=CSS,
) as demo:
# Per-session state holders:
#   app_state       - written by preview_selection / create_practice_audio,
#                     read by score_attempt.
#   score_state     - written by score_attempt.
#   starter_counter - passed to the preview / make-voice handlers and written
#                     back by create_practice_audio.
app_state = gr.State({})
score_state = gr.State({})
starter_counter = gr.State(0)
gr.HTML(
"""
"""
)
# Step indicator, initially rendered for the "pick" step.
stepper = gr.HTML(render_steps("pick"))
with gr.Row(elem_classes=["lpl-layout"]):
# Settings area: language, level, voice style, optional custom text and the
# button that starts voice generation.
with gr.Column(scale=3, min_width=280, elem_classes=["lpl-choose"]):
gr.HTML('Choose your practice ')
language = gr.Dropdown(SUPPORTED_LANGUAGES, value="English", label="Language", filterable=True)
level = gr.Radio(LEVELS, value="A2", label="Level", elem_classes=["lpl-level-radio"])
voice_style = gr.Radio(
list(VOICE_STYLES),
value="Careful",
label="Voice style",
elem_classes=["lpl-voice-radio"],
)
# Custom phrase input; length is capped at MAX_TARGET_CHARS by the widget.
custom_text = gr.Textbox(
label="Words to say",
value="",
placeholder="Leave blank for a short practice line.",
lines=3,
max_lines=3,
max_length=MAX_TARGET_CHARS,
)
generate_btn = gr.Button("Make voice", variant="primary", elem_classes=["lpl-main-btn"])
# Practice area: phrase card, reference audio player and the recorder.
with gr.Column(scale=6, min_width=420, elem_classes=["lpl-practice"]):
phrase_card = gr.HTML(render_initial_phrase())
with gr.Row(elem_classes=["lpl-media-grid"]):
with gr.Column(elem_classes=["lpl-audio-panel"]):
gr.HTML('Reference listen first
')
reference_player = gr.HTML(render_reference_player(), container=False)
with gr.Column(elem_classes=["lpl-audio-panel", "is-record"]):
gr.HTML('Your turn speak now
')
# Custom recorder component; its value is passed to score_attempt below.
attempt_recorder = NativeRecorder(value="", elem_id="native-recorder")
generation_status = gr.HTML(render_status("Press Make voice. Then listen, record, and score."))
next_panel = gr.HTML(render_next_card("Try this next", "Make a voice to begin."))
# Feedback area: score button plus score/feedback panels.
with gr.Column(scale=3, min_width=300, elem_classes=["lpl-feedback"]):
gr.HTML('Your feedback ')
score_btn = gr.Button("Score", variant="primary", elem_classes=["lpl-score-btn"])
score_status = gr.HTML(render_score_status(), container=False)
score_panel = gr.HTML(render_empty_score())
feedback_panel = gr.HTML(render_empty_feedback())
gr.HTML(
"""
"""
)
# Changing any picker re-runs preview_selection, which rewrites the phrase
# card, player, status/score/feedback/next panels, the recorder value,
# app_state and the stepper.
for picker in (language, level, voice_style, custom_text):
picker.change(
fn=preview_selection,
inputs=[language, level, voice_style, custom_text, starter_counter],
outputs=[
phrase_card,
reference_player,
generation_status,
app_state,
attempt_recorder,
score_panel,
feedback_panel,
next_panel,
stepper,
score_status,
],
show_progress="hidden",
api_visibility="private",
)
# "Make voice" is a two-step chain: begin_make_voice updates the UI first
# (presumably a busy/placeholder state; confirm against that handler), then
# create_practice_audio fills in the final outputs.
make_voice_start = generate_btn.click(
fn=begin_make_voice,
inputs=[language, level, voice_style, custom_text, starter_counter],
outputs=[reference_player, phrase_card, generation_status, stepper, feedback_panel, next_panel, generate_btn],
show_progress="hidden",
api_visibility="private",
)
# Second step of the chain; limited to one concurrent run under the "voice"
# concurrency id, with progress shown on the button itself.
make_voice_start.then(
fn=create_practice_audio,
inputs=[language, level, voice_style, custom_text, starter_counter],
outputs=[
reference_player,
phrase_card,
generation_status,
app_state,
stepper,
attempt_recorder,
score_panel,
feedback_panel,
next_panel,
starter_counter,
generate_btn,
],
show_progress="minimal",
show_progress_on=generate_btn,
api_visibility="private",
concurrency_id="voice",
concurrency_limit=1,
)
# "Score" follows the same two-step pattern: begin_scoring takes no inputs and
# updates the feedback panel, stepper, score status and button first.
score_start = score_btn.click(
fn=begin_scoring,
inputs=None,
outputs=[feedback_panel, stepper, score_status, score_btn],
show_progress="hidden",
api_visibility="private",
)
# score_attempt reads the recorder value and app_state; limited to one
# concurrent run under the "score" concurrency id.
score_start.then(
fn=score_attempt,
inputs=[attempt_recorder, app_state],
outputs=[score_panel, feedback_panel, next_panel, score_state, stepper, score_btn, score_status],
show_progress="minimal",
show_progress_on=score_btn,
api_visibility="private",
concurrency_id="score",
concurrency_limit=1,
)
if __name__ == "__main__":
    # Enable the event queue with a default of one concurrent run per event,
    # then start the app server.
    queued_demo = demo.queue(default_concurrency_limit=1)
    queued_demo.launch()