from __future__ import annotations import difflib import base64 import functools import hashlib import html import json import math import re import subprocess import urllib.error import urllib.request import uuid from dataclasses import dataclass from pathlib import Path from typing import Any, Callable import gradio as gr import librosa import numpy as np import soundfile as sf try: import spaces gpu_task: Callable[..., Callable[[Callable[..., Any]], Callable[..., Any]]] = spaces.GPU except Exception: def gpu_task(*_args: Any, **_kwargs: Any) -> Callable[[Callable[..., Any]], Callable[..., Any]]: def decorator(fn: Callable[..., Any]) -> Callable[..., Any]: return fn return decorator APP_DIR = Path(__file__).resolve().parent GENERATED_DIR = APP_DIR / "generated" RECORDING_DIR = APP_DIR / "recordings" GENERATED_DIR.mkdir(exist_ok=True) RECORDING_DIR.mkdir(exist_ok=True) TTS_MODEL_ID = "openbmb/VoxCPM2" JUDGE_MODEL_ID = "openbmb/MiniCPM5-1B" ENGLISH_ASR_MODEL_ID = "facebook/wav2vec2-base-960h" TARGET_SAMPLE_RATE = 16000 MAX_TARGET_CHARS = 180 MAX_ATTEMPT_SECONDS = 20.0 MAX_RECORDING_BYTES = 12 * 1024 * 1024 FAST_TTS_CFG_VALUE = 1.35 FAST_TTS_STEPS = 4 TTS_CACHE_VERSION = "fast-v3" AUDIO_CACHE: dict[str, str] = {} SAMPLE_DATASET_ID = "loay/build-small-shadowing-mini-audio" SAMPLE_DATASET_REVISION = "main" SAMPLE_VERSION = "v1" SAMPLE_BASE_URL = ( f"https://huggingface.co/datasets/{SAMPLE_DATASET_ID}/resolve/" f"{SAMPLE_DATASET_REVISION}/reference/{SAMPLE_VERSION}" ) SUPPORTED_LANGUAGES = [ "Arabic", "Burmese", "Chinese", "Danish", "Dutch", "English", "Finnish", "French", "German", "Greek", "Hebrew", "Hindi", "Indonesian", "Italian", "Japanese", "Khmer", "Korean", "Lao", "Malay", "Norwegian", "Polish", "Portuguese", "Russian", "Spanish", "Swahili", "Swedish", "Tagalog", "Thai", "Turkish", "Vietnamese", ] LEVELS = ["A1", "A2", "B1", "B2", "C1", "C2"] VOICE_STYLES = { "Careful": "clear friendly tutor voice, medium-slow pace", "Happy": "warm upbeat voice, natural pace", "Slow": "slow clear practice voice", "Story": "bright storyteller voice with gentle expression", } FALLBACK_PHRASES = { "Arabic": "أود أن أتمرن على المحادثة كل يوم.", "Burmese": "နေ့တိုင်း စကားပြော လေ့ကျင့်ချင်ပါတယ်။", "Chinese": "我每天都想练习说话。", "Danish": "Jeg vil gerne øve mig i at tale hver dag.", "Dutch": "Ik wil elke dag oefenen met spreken.", "English": "I want to practice speaking clearly every day.", "Finnish": "Haluan harjoitella puhumista selkeästi joka päivä.", "French": "Je veux pratiquer la parole clairement chaque jour.", "German": "Ich möchte jeden Tag klar sprechen üben.", "Greek": "Θέλω να εξασκούμαι στην ομιλία κάθε μέρα.", "Hebrew": "אני רוצה לתרגל דיבור ברור בכל יום.", "Hindi": "मैं हर दिन साफ़ बोलने का अभ्यास करना चाहता हूँ।", "Indonesian": "Saya ingin berlatih berbicara dengan jelas setiap hari.", "Italian": "Voglio esercitarmi a parlare chiaramente ogni giorno.", "Japanese": "毎日、はっきり話す練習をしたいです。", "Khmer": "ខ្ញុំចង់ហាត់និយាយឱ្យច្បាស់រាល់ថ្ងៃ។", "Korean": "저는 매일 또렷하게 말하는 연습을 하고 싶어요.", "Lao": "ຂ້ອຍຢາກຝຶກເວົ້າໃຫ້ຊັດເຈນທຸກມື້.", "Malay": "Saya mahu berlatih bercakap dengan jelas setiap hari.", "Norwegian": "Jeg vil øve på å snakke tydelig hver dag.", "Polish": "Chcę codziennie ćwiczyć wyraźne mówienie.", "Portuguese": "Quero praticar falar com clareza todos os dias.", "Russian": "Я хочу каждый день тренироваться говорить ясно.", "Spanish": "Quiero practicar hablar con claridad todos los días.", "Swahili": "Nataka kufanya mazoezi ya kuzungumza wazi kila siku.", "Swedish": "Jag vill öva på att tala tydligt varje dag.", "Tagalog": "Gusto kong magsanay magsalita nang malinaw araw-araw.", "Thai": "ฉันอยากฝึกพูดให้ชัดเจนทุกวัน", "Turkish": "Her gün açık konuşma pratiği yapmak istiyorum.", "Vietnamese": "Tôi muốn luyện nói rõ ràng mỗi ngày.", } STARTER_PHRASES = { "English": [ "I want to practice speaking clearly every day.", "Today I will speak slowly and clearly.", "Please help me say this sentence better.", "I can listen first and then repeat.", "My voice is getting clearer with practice.", ], } LEVEL_STARTER_PHRASES = { "English": { "A1": [ "I can say this slowly.", "My voice is clear.", "I listen and repeat.", ], "A2": STARTER_PHRASES["English"], "B1": [ "I want to explain my idea clearly today.", "Please listen while I repeat the sentence.", "I can speak with better rhythm and timing.", ], "B2": [ "I am practicing steady speech with natural rhythm.", "Clear pronunciation helps my ideas sound more confident.", "I can repeat the line while keeping the same pace.", ], "C1": [ "I am refining my speech so each phrase sounds precise and natural.", "I want my pacing, stress, and intonation to match the speaker.", "Careful listening helps me improve the shape of every sentence.", ], "C2": [ "I am polishing subtle rhythm, emphasis, and tone in connected speech.", "Shadowing helps me reproduce fluent speech patterns with greater control.", "I can adapt my delivery while preserving clarity, timing, and expression.", ], } } CSS = """ :root { --lpl-navy: #0d2547; --lpl-ink: #12233f; --lpl-muted: #59708f; --lpl-soft: #eef9fd; --lpl-panel: #ffffff; --lpl-line: #d6e7ef; --lpl-teal: #11a99d; --lpl-teal-dark: #05877f; --lpl-coral: #ff6258; --lpl-yellow: #ffc234; --lpl-cream: #fff6df; --lpl-shadow: 0 18px 44px rgba(13, 37, 71, 0.11); color-scheme: light; } html, body { background: linear-gradient(180deg, #f6fcff 0%, #eef8fe 48%, #f9fdff 100%); color: var(--lpl-ink); color-scheme: light !important; } .gradio-container { --body-background-fill: transparent; --body-text-color: var(--lpl-ink); --block-background-fill: transparent; --block-border-color: transparent; --block-info-text-color: var(--lpl-muted); --input-background-fill: #edf8fb; --input-border-color: transparent; --input-placeholder-color: #59708f; --input-text-color: var(--lpl-navy); --button-primary-background-fill: var(--lpl-teal); --button-primary-background-fill-hover: var(--lpl-teal-dark); --button-primary-text-color: #ffffff; max-width: 1480px !important; margin: 0 auto !important; padding: 18px 24px 22px !important; font-family: Inter, ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif !important; background: transparent !important; color: var(--lpl-ink) !important; color-scheme: light !important; } .gradio-container footer, .gradio-container .prose h1, .gradio-container .prose h2, .gradio-container .prose h3 { display: none !important; } .gradio-container .generating, .gradio-container .pending, .gradio-container .loading { opacity: 1 !important; filter: none !important; color: var(--lpl-ink) !important; } .gradio-container .generating *, .gradio-container .pending *, .gradio-container .loading * { opacity: 1 !important; filter: none !important; } .gradio-container .generating::before, .gradio-container .generating::after, .gradio-container .pending::before, .gradio-container .pending::after { display: none !important; } .lpl-topbar { min-height: 70px; display: flex; align-items: center; justify-content: space-between; gap: 20px; padding: 6px 2px 18px; } .lpl-brand { display: flex; align-items: center; flex-wrap: wrap; gap: 12px; color: var(--lpl-navy); font-weight: 830; font-size: clamp(1.32rem, 2vw, 2rem); letter-spacing: 0; } .lpl-mark { width: 54px; height: 42px; border-radius: 24px 24px 24px 8px; background: linear-gradient(145deg, var(--lpl-teal), #19c6bd); position: relative; box-shadow: 0 10px 20px rgba(17, 169, 157, 0.22); } .lpl-mark::before { content: ""; position: absolute; width: 7px; height: 7px; left: 14px; top: 16px; border-radius: 99px; background: white; box-shadow: 13px 0 0 white, 26px 0 0 white; } .lpl-divider { width: 1px; height: 28px; background: #c9dce7; } .lpl-product { color: var(--lpl-teal-dark); font-weight: 780; font-size: clamp(1rem, 1.5vw, 1.45rem); } .lpl-tagline { color: var(--lpl-muted); font-weight: 740; font-size: clamp(0.95rem, 1.35vw, 1.18rem); } .lpl-steps { display: grid; grid-template-columns: repeat(4, minmax(0, 1fr)); gap: 12px; margin-bottom: 20px; } .lpl-step { min-height: 88px; border-radius: 28px; border: 1px solid #e5f0f5; background: rgba(255, 255, 255, 0.92); color: var(--lpl-ink); display: flex; align-items: center; gap: 16px; padding: 16px 22px; box-shadow: 0 12px 26px rgba(13, 37, 71, 0.08); } .lpl-step.is-active { color: #ffffff; background: linear-gradient(135deg, var(--lpl-teal), #13bbb2); border-color: transparent; } .lpl-step-number { width: 46px; height: 46px; border-radius: 999px; display: grid; place-items: center; flex: 0 0 auto; background: #ffffff; color: var(--lpl-navy); font-size: 1.18rem; font-weight: 850; box-shadow: inset 0 0 0 1px rgba(13, 37, 71, 0.08); } .lpl-step-label { font-size: clamp(1.05rem, 1.7vw, 1.42rem); font-weight: 820; } .lpl-layout { align-items: stretch !important; } .lpl-choose, .lpl-practice, .lpl-feedback { background: rgba(255, 255, 255, 0.94); border: 1px solid #e2eef4; border-radius: 24px; box-shadow: var(--lpl-shadow); padding: 22px !important; color: var(--lpl-navy) !important; color-scheme: light !important; } .lpl-choose { min-height: 650px; } .lpl-choose *, .lpl-practice *, .lpl-feedback * { color-scheme: light !important; } .lpl-card-title { margin: 0 0 18px; color: var(--lpl-navy); font-weight: 860; font-size: 1.32rem; } .lpl-field-label { color: var(--lpl-navy); font-weight: 760; font-size: 0.95rem; margin: 14px 0 8px; } .lpl-choose .wrap, .lpl-choose label, .lpl-choose .block-info, .lpl-choose .label-wrap, .lpl-choose .container, .lpl-choose .form { color: var(--lpl-navy) !important; } .lpl-choose .form, .lpl-choose .block, .lpl-choose .wrap, .lpl-choose .container, .lpl-choose [data-testid="block-label"] { background: transparent !important; border: 0 !important; box-shadow: none !important; } .lpl-choose input, .lpl-choose textarea, .lpl-choose select, .lpl-choose .wrap, .lpl-choose [data-testid="dropdown"], .lpl-choose [role="textbox"], .lpl-choose [role="combobox"] { border-radius: 999px !important; background: #edf8fb !important; border-color: transparent !important; color: var(--lpl-navy) !important; } .lpl-choose input::placeholder, .lpl-choose textarea::placeholder { color: #6f8299 !important; opacity: 1 !important; } .lpl-choose textarea { border-radius: 22px !important; min-height: 82px !important; } .lpl-choose .wrap:focus-within, .lpl-choose [data-testid="dropdown"]:focus-within { box-shadow: 0 0 0 3px rgba(17, 169, 157, 0.18) !important; } .gradio-container [role="listbox"], .gradio-container [data-testid="dropdown-options"], .gradio-container .options, .gradio-container .dropdown-options, .gradio-container .select-options, body [role="listbox"], body [data-testid="dropdown-options"], body .options, body .dropdown-options, body .select-options { background: #ffffff !important; color: var(--lpl-navy) !important; border: 1px solid #d7eaf1 !important; border-radius: 18px !important; box-shadow: 0 18px 38px rgba(13, 37, 71, 0.16) !important; } .gradio-container [role="option"], .gradio-container .option, body [role="option"], body .option { background: #ffffff !important; color: var(--lpl-navy) !important; } .gradio-container [role="option"]:hover, .gradio-container [role="option"][aria-selected="true"], .gradio-container .option:hover, .gradio-container .option.selected, body [role="option"]:hover, body [role="option"][aria-selected="true"], body .option:hover, body .option.selected { background: #edf8fb !important; color: var(--lpl-teal-dark) !important; } body .toast-wrap, body .toast, body [data-testid="toast"], .gradio-container .toast-wrap, .gradio-container .toast, .gradio-container [data-testid="toast"] { background: #ffffff !important; color: var(--lpl-navy) !important; border-color: #d7eaf1 !important; box-shadow: 0 18px 38px rgba(13, 37, 71, 0.16) !important; } body .toast *, body [data-testid="toast"] *, .gradio-container .toast *, .gradio-container [data-testid="toast"] * { color: var(--lpl-navy) !important; } .lpl-level-radio .wrap, .lpl-voice-radio .wrap { background: transparent !important; } .lpl-level-radio, .lpl-voice-radio, .lpl-level-radio .wrap, .lpl-voice-radio .wrap, .lpl-level-radio .container, .lpl-voice-radio .container { background: transparent !important; border: 0 !important; box-shadow: none !important; } .lpl-level-radio label, .lpl-voice-radio label { min-height: 44px !important; border-radius: 999px !important; background: #edf8fb !important; border: 1px solid transparent !important; color: var(--lpl-navy) !important; font-weight: 760 !important; } .lpl-level-radio label span, .lpl-voice-radio label span { color: var(--lpl-navy) !important; } .lpl-level-radio input:checked + span, .lpl-voice-radio input:checked + span { color: var(--lpl-teal-dark) !important; } .lpl-level-radio label:has(input:checked), .lpl-voice-radio label:has(input:checked), .lpl-level-radio label[aria-checked="true"], .lpl-voice-radio label[aria-checked="true"], .lpl-level-radio [role="radio"][aria-checked="true"], .lpl-voice-radio [role="radio"][aria-checked="true"], .lpl-level-radio label.selected, .lpl-voice-radio label.selected { background: linear-gradient(135deg, var(--lpl-teal), #18c7be) !important; border-color: transparent !important; color: #ffffff !important; box-shadow: 0 12px 22px rgba(17, 169, 157, 0.24) !important; } .lpl-level-radio label:has(input:checked) *, .lpl-voice-radio label:has(input:checked) *, .lpl-level-radio label[aria-checked="true"] *, .lpl-voice-radio label[aria-checked="true"] *, .lpl-level-radio [role="radio"][aria-checked="true"] *, .lpl-voice-radio [role="radio"][aria-checked="true"] *, .lpl-level-radio label.selected *, .lpl-voice-radio label.selected * { color: #ffffff !important; } .lpl-level-radio label:hover, .lpl-voice-radio label:hover { border-color: #9fdedb !important; transform: translateY(-1px); } .lpl-main-btn, .lpl-score-btn { width: 100%; min-height: 64px !important; border-radius: 999px !important; border: 0 !important; color: #ffffff !important; font-size: 1.25rem !important; font-weight: 850 !important; box-shadow: 0 16px 32px rgba(17, 169, 157, 0.24) !important; } .lpl-main-btn { background: linear-gradient(135deg, var(--lpl-teal), #08bcb3) !important; margin-top: 16px !important; } .lpl-score-btn { background: linear-gradient(135deg, var(--lpl-navy), #123866) !important; box-shadow: 0 18px 36px rgba(13, 37, 71, 0.24) !important; } .lpl-main-btn:disabled, .lpl-main-btn[disabled], .lpl-score-btn:disabled, .lpl-score-btn[disabled] { opacity: 0.62 !important; cursor: wait !important; } .lpl-score-status { min-height: 32px; margin: 10px 0 16px; display: flex; align-items: center; gap: 10px; color: var(--lpl-muted); font-size: 0.96rem; font-weight: 760; } .lpl-score-status.is-loading { color: var(--lpl-navy); } .lpl-spinner { width: 18px; height: 18px; border-radius: 999px; border: 3px solid #dff0f4; border-top-color: var(--lpl-teal); animation: lpl-spin 0.8s linear infinite; } @keyframes lpl-spin { to { transform: rotate(360deg); } } .lpl-practice { min-height: 650px; } .lpl-phrase { min-height: 225px; border-radius: 24px; background: #ffffff; border: 1px solid #e5eef4; box-shadow: 0 14px 34px rgba(13, 37, 71, 0.08); display: flex; flex-direction: column; justify-content: center; padding: 26px 30px; text-align: center; position: relative; overflow: hidden; } .lpl-phrase::after { content: ""; width: 52%; height: 2px; border-radius: 99px; background: repeating-linear-gradient(90deg, #9ee4e0 0 12px, transparent 12px 24px); margin: 22px auto 0; } .lpl-phrase.is-loading { border-color: #b9e9e6; background: radial-gradient(circle at 50% 42%, rgba(17, 169, 157, 0.13), transparent 34%), #ffffff; } .lpl-phrase.is-loading::before { content: ""; position: absolute; inset: -45% auto auto 50%; width: 260px; height: 260px; border-radius: 999px; border: 2px solid rgba(17, 169, 157, 0.22); transform: translateX(-50%); animation: lpl-breathe 1.5s ease-in-out infinite; } .lpl-phrase-meta { color: var(--lpl-muted); font-weight: 720; margin-bottom: 12px; position: relative; z-index: 1; } .lpl-phrase-text { color: var(--lpl-navy); font-size: clamp(2rem, 4.5vw, 3.35rem); line-height: 1.25; font-weight: 900; letter-spacing: 0; position: relative; z-index: 1; } .lpl-loading-line { max-width: 780px; margin: 16px auto 0; color: var(--lpl-muted); font-size: 1rem; line-height: 1.45; font-weight: 720; position: relative; z-index: 1; } .lpl-voice-loader { min-height: 42px; margin: 18px auto 2px; display: flex; align-items: center; justify-content: center; gap: 8px; position: relative; z-index: 1; } .lpl-voice-loader span { width: 10px; height: 18px; border-radius: 999px; background: linear-gradient(180deg, var(--lpl-teal), #2bd6cd); animation: lpl-wave 0.82s ease-in-out infinite; } .lpl-voice-loader span:nth-child(2) { animation-delay: 0.08s; } .lpl-voice-loader span:nth-child(3) { animation-delay: 0.16s; } .lpl-voice-loader span:nth-child(4) { animation-delay: 0.24s; } .lpl-voice-loader span:nth-child(5) { animation-delay: 0.32s; } @keyframes lpl-wave { 0%, 100% { transform: scaleY(0.58); opacity: 0.55; } 50% { transform: scaleY(1.7); opacity: 1; } } @keyframes lpl-breathe { 0%, 100% { opacity: 0.24; transform: translateX(-50%) scale(0.82); } 50% { opacity: 0.48; transform: translateX(-50%) scale(1); } } .lpl-media-grid { display: grid; grid-template-columns: repeat(2, minmax(0, 1fr)); gap: 16px; margin: 18px 0; } .lpl-audio-panel { border-radius: 24px; border: 1px solid #e4eef4; background: #ffffff; overflow: hidden; box-shadow: 0 12px 26px rgba(13, 37, 71, 0.08); } .lpl-audio-panel.is-record { background: linear-gradient(180deg, #fff8f6 0%, #ffffff 52%); } .lpl-audio-head { display: flex; align-items: center; justify-content: space-between; gap: 12px; min-height: 64px; padding: 16px 18px 8px; color: var(--lpl-navy); font-weight: 820; } .lpl-audio-head span { color: var(--lpl-muted); font-size: 0.9rem; font-weight: 700; } .lpl-audio-panel .audio-container, .lpl-audio-panel .block, .lpl-audio-panel .wrap, .lpl-audio-panel .form { border: 0 !important; box-shadow: none !important; background: transparent !important; color: var(--lpl-navy) !important; } .lpl-audio-panel audio { width: 100% !important; min-height: 46px !important; padding: 0 18px 18px !important; box-sizing: border-box !important; color-scheme: light !important; } .lpl-audio-panel .label-wrap, .lpl-audio-panel .download, .lpl-audio-panel .share { display: none !important; } .lpl-native-player, .lpl-native-recorder { padding: 0 18px 18px; } .lpl-native-player audio, .lpl-native-recorder audio { width: 100%; min-height: 46px; padding: 0 !important; color-scheme: light !important; } .lpl-player-empty, .lpl-recorder-status { min-height: 46px; display: flex; align-items: center; color: var(--lpl-muted); font-weight: 720; } .lpl-player-empty.is-loading, .lpl-status-card.is-loading { color: var(--lpl-navy); gap: 10px; } .lpl-recorder-actions { display: flex; align-items: center; flex-wrap: wrap; gap: 12px; margin-bottom: 12px; } .lpl-recorder-actions button { min-height: 48px; border: 0; border-radius: 999px; padding: 0 20px; font-weight: 840; cursor: pointer; color: #ffffff; background: var(--lpl-coral); } .lpl-recorder-actions button[data-stop] { color: var(--lpl-coral); background: #fff2f1; border: 1px solid #ffbcb7; } .lpl-recorder-actions button:disabled { opacity: 0.5; cursor: not-allowed; } .lpl-recorder-meter { height: 9px; border-radius: 999px; overflow: hidden; background: #edf3f6; margin: 0 0 12px; } .lpl-recorder-fill { height: 100%; width: 0%; border-radius: 999px; background: linear-gradient(90deg, var(--lpl-coral), var(--lpl-yellow)); } .lpl-status { min-height: 34px; color: var(--lpl-muted); font-size: 0.98rem; font-weight: 680; } .lpl-status-card { color: var(--lpl-muted); font-size: 0.98rem; line-height: 1.45; display: flex; align-items: center; gap: 10px; } .lpl-feedback { min-height: 650px; } .lpl-score-empty, .lpl-score-card { border-radius: 24px; background: #ffffff; border: 1px solid #e4eef4; box-shadow: 0 12px 26px rgba(13, 37, 71, 0.08); padding: 22px; color: var(--lpl-navy); } .lpl-score-empty { min-height: 160px; display: flex; align-items: center; color: var(--lpl-muted); font-weight: 720; line-height: 1.45; } .lpl-score-empty.is-error { align-items: flex-start; flex-direction: column; gap: 8px; border-color: #ffd0cc; background: #fff8f7; color: var(--lpl-navy); } .lpl-score-empty.is-error strong { color: var(--lpl-coral); font-size: 1.08rem; } .lpl-score-top { display: flex; align-items: center; gap: 18px; margin-bottom: 20px; } .lpl-ring { --score: 0; width: 132px; height: 132px; border-radius: 999px; display: grid; place-items: center; flex: 0 0 auto; background: radial-gradient(circle at center, white 0 56%, transparent 58%), conic-gradient(var(--lpl-teal) calc(var(--score) * 1%), #e6f0f5 0); color: var(--lpl-navy); font-size: 2.55rem; font-weight: 900; } .lpl-score-copy strong { display: block; font-size: 1.42rem; margin-bottom: 8px; } .lpl-score-copy span { color: var(--lpl-muted); line-height: 1.45; font-weight: 650; } .lpl-meter { margin: 14px 0; } .lpl-meter-row { display: flex; align-items: center; justify-content: space-between; gap: 16px; color: var(--lpl-navy); font-weight: 760; margin-bottom: 7px; } .lpl-meter-row span:last-child { color: var(--lpl-teal-dark); } .lpl-bar { height: 9px; background: #e8f1f5; border-radius: 999px; overflow: hidden; } .lpl-fill { height: 100%; width: 0; border-radius: 999px; background: linear-gradient(90deg, var(--lpl-teal), #1f78c8); } .lpl-feedback-card, .lpl-next-card { margin-top: 16px; border-radius: 24px; background: #ffffff; border: 1px solid #e4eef4; box-shadow: 0 12px 26px rgba(13, 37, 71, 0.08); padding: 20px; color: var(--lpl-navy); } .lpl-feedback-card h3, .lpl-next-card h3 { margin: 0 0 10px; font-size: 1.14rem; color: var(--lpl-navy); } .lpl-feedback-card p, .lpl-next-card p { margin: 0; color: #37506f; font-weight: 650; line-height: 1.48; } .lpl-feedback-card ol { margin: 14px 0 0 22px; padding: 0; color: #37506f; font-weight: 650; line-height: 1.52; } .lpl-feedback-card li { margin: 7px 0; } .lpl-words { margin-top: 14px; padding-top: 14px; border-top: 1px solid #e7eff4; color: var(--lpl-muted); font-weight: 650; line-height: 1.5; } .lpl-words strong { color: var(--lpl-navy); } .lpl-bottom-next { margin-top: 18px; border-radius: 24px; background: linear-gradient(180deg, #fff8df 0%, #fff2c9 100%); border: 1px solid #ffe5a1; padding: 18px 22px; display: flex; gap: 18px; align-items: center; color: var(--lpl-navy); box-shadow: 0 16px 28px rgba(255, 194, 52, 0.16); } .lpl-bottom-next strong { font-size: 1.12rem; } .lpl-bottom-next span { color: #37506f; font-weight: 700; } .lpl-footer { margin: 22px auto 0; padding: 14px 18px; border: 1px solid #dbeaf1; border-radius: 999px; background: rgba(255, 255, 255, 0.82); color: #37506f; font-size: 0.94rem; font-weight: 700; line-height: 1.45; text-align: center; box-shadow: 0 14px 28px rgba(13, 37, 71, 0.07); } .lpl-footer strong { color: var(--lpl-navy); font-weight: 860; } @media (max-width: 1080px) { .lpl-layout { flex-direction: column !important; } .lpl-layout > .column, .lpl-layout > div { width: 100% !important; min-width: 0 !important; } .lpl-choose, .lpl-practice, .lpl-feedback { min-height: auto; } .lpl-steps, .lpl-media-grid { grid-template-columns: 1fr 1fr; } } @media (max-width: 760px) { .gradio-container { padding: 14px 12px 18px !important; } .lpl-topbar { align-items: flex-start; flex-direction: column; } .lpl-steps, .lpl-media-grid { grid-template-columns: 1fr; } .lpl-footer { border-radius: 22px; } .lpl-step { min-height: 66px; border-radius: 20px; } .lpl-choose, .lpl-practice, .lpl-feedback { min-height: auto; padding: 16px !important; } .lpl-phrase { min-height: 180px; padding: 22px 18px; } .lpl-score-top { align-items: flex-start; flex-direction: column; } .lpl-ring { width: 112px; height: 112px; } } """ @dataclass class ScoreResult: overall: int voice_shape: int timing: int rhythm: int melody: int reference_duration: float attempt_duration: float duration_ratio: float evidence: dict[str, Any] def clamp_score(value: float) -> int: if not math.isfinite(value): return 0 return int(round(max(0.0, min(100.0, value)))) def clean_text(value: Any, limit: int = 500) -> str: value = re.sub(r"\s+", " ", str(value or "").strip()) return value[:limit] def gpu_available() -> bool: try: import torch return bool(torch.cuda.is_available()) except Exception: return False def require_gpu(action: str) -> None: if not gpu_available(): raise gr.Error( f"{action} needs GPU hardware. This Space is currently on CPU hardware; " "switch it to GPU or ZeroGPU, then try again." ) def language_fallback(language: str) -> str: return FALLBACK_PHRASES.get(language, FALLBACK_PHRASES["English"]) def starter_phrases(language: str, level: str = "A2") -> list[str]: level_phrases = LEVEL_STARTER_PHRASES.get(language, {}).get(level) if level_phrases: return level_phrases phrases = STARTER_PHRASES.get(language) if phrases: return phrases return [language_fallback(language)] def has_prebuilt_sample(language: str, target_text: str) -> bool: return clean_text(target_text, MAX_TARGET_CHARS) == clean_text(language_fallback(language), MAX_TARGET_CHARS) @functools.lru_cache(maxsize=1) def get_tts_model() -> Any: from voxcpm import VoxCPM return VoxCPM.from_pretrained(TTS_MODEL_ID, load_denoiser=False) @functools.lru_cache(maxsize=1) def get_judge_model() -> tuple[Any, Any]: import torch from transformers import AutoTokenizer model_errors: list[str] = [] model_classes: list[Any] = [] try: from transformers import AutoModelForMultimodalLM model_classes.append(AutoModelForMultimodalLM) except Exception as exc: model_errors.append(str(exc)) try: from transformers import AutoModelForCausalLM model_classes.append(AutoModelForCausalLM) except Exception as exc: model_errors.append(str(exc)) tokenizer = AutoTokenizer.from_pretrained(JUDGE_MODEL_ID, trust_remote_code=True) last_error: Exception | None = None for model_class in model_classes: try: model = model_class.from_pretrained( JUDGE_MODEL_ID, torch_dtype="auto", device_map="auto", trust_remote_code=True, ) model.eval() return tokenizer, model except Exception as exc: last_error = exc model_errors.append(str(exc)) raise RuntimeError("Could not load the judging model: " + "; ".join(model_errors)) from last_error @functools.lru_cache(maxsize=1) def get_english_asr_model() -> tuple[Any, Any, Any]: import torch from transformers import AutoModelForCTC, AutoProcessor processor = AutoProcessor.from_pretrained(ENGLISH_ASR_MODEL_ID) model = AutoModelForCTC.from_pretrained(ENGLISH_ASR_MODEL_ID) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model.to(device) model.eval() return processor, model, device def run_judge(messages: list[dict[str, str]], max_new_tokens: int = 360) -> str: import torch tokenizer, model = get_judge_model() template_kwargs = { "tokenize": True, "add_generation_prompt": True, "return_dict": True, "return_tensors": "pt", } try: inputs = tokenizer.apply_chat_template(messages, enable_thinking=False, **template_kwargs) except TypeError: inputs = tokenizer.apply_chat_template(messages, **template_kwargs) device = next(model.parameters()).device inputs = {key: value.to(device) for key, value in inputs.items()} with torch.inference_mode(): output_ids = model.generate( **inputs, max_new_tokens=max_new_tokens, do_sample=False, repetition_penalty=1.02, ) generated = output_ids[0][inputs["input_ids"].shape[-1] :] return tokenizer.decode(generated, skip_special_tokens=True).strip() def selected_text(language: str, level: str, custom_text: str, starter_counter: int = 0) -> tuple[str, str, int]: custom_text = clean_text(custom_text, MAX_TARGET_CHARS) if custom_text: return custom_text, "custom", -1 phrases = starter_phrases(language, level) line_index = int(starter_counter or 0) % max(1, len(phrases)) return phrases[line_index], "starter", line_index def build_voice_text(target_text: str, voice_style: str) -> str: description = VOICE_STYLES.get(voice_style, VOICE_STYLES["Careful"]) return f"({description}){target_text}" def slugify(value: str) -> str: slug = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-") return slug or "item" def audio_cache_key(language: str, level: str, voice_style: str, target_text: str) -> str: payload = json.dumps( { "version": TTS_CACHE_VERSION, "language": language, "level": level, "voice_style": voice_style, "target_text": target_text, "cfg_value": FAST_TTS_CFG_VALUE, "steps": FAST_TTS_STEPS, }, ensure_ascii=False, sort_keys=True, ) return hashlib.sha1(payload.encode("utf-8")).hexdigest()[:18] def sample_relpath(language: str, voice_style: str) -> str: return f"{slugify(language)}/{slugify(voice_style)}.wav" def sample_url(language: str, voice_style: str) -> str: return f"{SAMPLE_BASE_URL}/{sample_relpath(language, voice_style)}" def audio_data_uri(path: str) -> str: data = Path(path).read_bytes() return "data:audio/wav;base64," + base64.b64encode(data).decode("ascii") def render_reference_player( path: str | None = None, note: str = "Voice appears here after Make voice.", loading: bool = False, ) -> str: if not path or not Path(path).exists(): spinner = '' if loading else "" loading_class = " is-loading" if loading else "" return ( '
' f'
{spinner}{html.escape(note)}
' "
" ) return ( '
' f'' "
" ) def ensure_sample_audio(language: str, voice_style: str, cache_key: str) -> str | None: path = GENERATED_DIR / f"sample_{cache_key}.wav" if path.exists(): return str(path) url = sample_url(language, voice_style) try: with urllib.request.urlopen(url, timeout=25) as response: raw = response.read() except (urllib.error.URLError, TimeoutError, OSError) as exc: print(f"Sample audio unavailable for {language}/{voice_style}: {exc}") return None if len(raw) < 1000: print(f"Sample audio too small for {language}/{voice_style}: {len(raw)} bytes") return None path.write_bytes(raw) try: audio, sr = load_audio(str(path)) if len(audio) / sr < 0.5: path.unlink(missing_ok=True) return None except Exception as exc: print(f"Sample audio invalid for {language}/{voice_style}: {exc}") path.unlink(missing_ok=True) return None return str(path) def normalize_wav(wav: np.ndarray, sample_rate: int) -> np.ndarray: wav = np.asarray(wav, dtype=np.float32).squeeze() if wav.ndim > 1: wav = np.mean(wav, axis=-1) if sample_rate != TARGET_SAMPLE_RATE: wav = librosa.resample(wav, orig_sr=sample_rate, target_sr=TARGET_SAMPLE_RATE) peak = float(np.max(np.abs(wav))) if wav.size else 0.0 if peak > 0: wav = wav / peak * 0.94 return wav @gpu_task(duration=120) def synthesize_reference_file(prompt_text: str, output_path: str) -> None: require_gpu("Making the voice") model = get_tts_model() try: wav = model.generate( text=prompt_text, cfg_value=FAST_TTS_CFG_VALUE, inference_timesteps=FAST_TTS_STEPS, retry_badcase=False, denoise=False, ) except TypeError: wav = model.generate( text=prompt_text, cfg_value=FAST_TTS_CFG_VALUE, inference_timesteps=FAST_TTS_STEPS, ) sample_rate = int(getattr(getattr(model, "tts_model", None), "sample_rate", 48000)) wav = normalize_wav(wav, sample_rate) sf.write(output_path, wav, TARGET_SAMPLE_RATE, subtype="PCM_16") def create_practice_audio( language: str, level: str, voice_style: str, custom_text: str, starter_counter: int, ) -> tuple[str, str, str, dict[str, Any], str, str, str, str, str, int, Any]: target_text, source, line_index = selected_text(language, level, custom_text, starter_counter) cache_key = audio_cache_key(language, level, voice_style, target_text) output_path = GENERATED_DIR / f"reference_{cache_key}.wav" cached_path = AUDIO_CACHE.get(cache_key) next_counter = int(starter_counter or 0) + (1 if source == "starter" else 0) if source == "starter" and line_index == 0 and has_prebuilt_sample(language, target_text): sample_path = ensure_sample_audio(language, voice_style, cache_key) if sample_path: AUDIO_CACHE[cache_key] = sample_path state = build_state(target_text, language, level, voice_style, "sample", sample_path) return ( render_reference_player(sample_path), render_phrase_card(target_text, language, level, voice_style), render_status("Voice ready from the example library."), state, render_steps("say"), "", render_empty_score(), render_empty_feedback(), render_next_card("Try this next", "Record yourself saying the line above."), next_counter, gr.update(interactive=True, value="Make voice"), ) if (cached_path and Path(cached_path).exists()) or output_path.exists(): path = str(Path(cached_path) if cached_path else output_path) AUDIO_CACHE[cache_key] = path state = build_state(target_text, language, level, voice_style, source, path) return ( render_reference_player(path), render_phrase_card(target_text, language, level, voice_style), render_status("Voice ready. Same choices play instantly next time."), state, render_steps("say"), "", render_empty_score(), render_empty_feedback(), render_next_card("Try this next", "Record yourself saying the line above."), next_counter, gr.update(interactive=True, value="Make voice"), ) prompt_text = build_voice_text(target_text, voice_style) try: synthesize_reference_file(prompt_text, str(output_path)) except Exception as exc: print(f"Voice generation failed: {exc}") return ( render_reference_player(None, "Could not make the voice. Try again."), render_phrase_card(target_text, language, level, voice_style), render_status("Could not make the voice. Try a shorter line or press Make voice again."), {}, render_steps("pick"), "", render_empty_score(), """

Your feedback

Make the voice first. Then listen, record, and score.

""", render_next_card("Try this next", "Try a shorter line, then press Make voice."), next_counter, gr.update(interactive=True, value="Make voice"), ) AUDIO_CACHE[cache_key] = str(output_path) state = build_state(target_text, language, level, voice_style, source, str(output_path)) return ( render_reference_player(str(output_path)), render_phrase_card(target_text, language, level, voice_style), render_status("Voice ready. Listen once, then say it."), state, render_steps("say"), "", render_empty_score(), render_empty_feedback(), render_next_card("Try this next", "Record yourself saying the line above."), next_counter, gr.update(interactive=True, value="Make voice"), ) def build_state( target_text: str, language: str, level: str, voice_style: str, source: str, reference_audio: str, ) -> dict[str, Any]: return { "target_text": target_text, "language": language, "level": level, "voice_style": voice_style, "source": source, "reference_audio": reference_audio, "sample_rate": TARGET_SAMPLE_RATE, } def audio_path_from_gradio(value: Any) -> str: if isinstance(value, str): path = value elif isinstance(value, dict): path = str(value.get("path") or value.get("name") or "") else: path = "" if not path: raise ValueError("Record your voice first, then press Score.") if not Path(path).exists(): raise ValueError("I could not read that recording. Record once more, then press Score.") return path def suffix_for_mime(mime_type: str) -> str: mime_type = (mime_type or "").split(";", 1)[0].strip().lower() return { "audio/webm": ".webm", "audio/ogg": ".ogg", "audio/oga": ".ogg", "audio/mp4": ".m4a", "audio/mpeg": ".mp3", "audio/wav": ".wav", "audio/x-wav": ".wav", }.get(mime_type, ".webm") def decode_recording_payload(payload: str | None) -> str: payload = clean_text(payload, MAX_RECORDING_BYTES * 2) if not payload: raise ValueError("Record your voice first, then press Score.") try: data = json.loads(payload) except json.JSONDecodeError as exc: raise ValueError("The recording data was not readable. Record once more.") from exc data_url = str(data.get("dataUrl") or "") match = re.match(r"^data:([^;,]+)(?:;[^,]*)?;base64,(.+)$", data_url, flags=re.DOTALL) if not match: raise ValueError("The recording was incomplete. Record once more.") mime_type = str(data.get("mimeType") or match.group(1)) try: raw = base64.b64decode(match.group(2), validate=True) except Exception as exc: raise ValueError("The recording could not be decoded. Record once more.") from exc if len(raw) < 1200: raise ValueError("That recording is too small. Record the full line.") if len(raw) > MAX_RECORDING_BYTES: raise ValueError("That recording is too large. Keep it under 20 seconds.") token = uuid.uuid4().hex raw_path = RECORDING_DIR / f"attempt_{token}{suffix_for_mime(mime_type)}" wav_path = RECORDING_DIR / f"attempt_{token}.wav" raw_path.write_bytes(raw) command = [ "ffmpeg", "-y", "-hide_banner", "-loglevel", "error", "-i", str(raw_path), "-ac", "1", "-ar", str(TARGET_SAMPLE_RATE), str(wav_path), ] try: subprocess.run(command, check=True, capture_output=True, text=True) except Exception as exc: raise ValueError("The recording could not be prepared for scoring. Record once more.") from exc return str(wav_path) def load_audio(path: str, sr: int = TARGET_SAMPLE_RATE) -> tuple[np.ndarray, int]: audio, _ = librosa.load(path, sr=sr, mono=True) audio = np.asarray(audio, dtype=np.float32) if audio.size == 0: raise ValueError("The audio is empty. Record once more.") peak = float(np.max(np.abs(audio))) if peak < 1e-5: raise ValueError("The recording sounds silent. Check the microphone and record again.") audio = audio / peak return audio, sr def active_audio_seconds(audio: np.ndarray, sr: int) -> float: if audio.size == 0: return 0.0 rms = librosa.feature.rms(y=audio, frame_length=1024, hop_length=256)[0] if rms.size == 0: return 0.0 threshold = max(0.015, float(np.percentile(rms, 90)) * 0.35) active_frames = int(np.sum(rms > threshold)) return active_frames * 256 / sr def resample_vector(values: np.ndarray, length: int) -> np.ndarray: values = np.asarray(values, dtype=np.float32) if values.size == 0: return np.zeros(length, dtype=np.float32) if values.size == length: return values x_old = np.linspace(0.0, 1.0, values.size) x_new = np.linspace(0.0, 1.0, length) return np.interp(x_new, x_old, values).astype(np.float32) def safe_correlation(a: np.ndarray, b: np.ndarray) -> float: if a.size < 3 or b.size < 3: return 0.0 if float(np.std(a)) < 1e-6 or float(np.std(b)) < 1e-6: return 0.0 return float(np.corrcoef(a, b)[0, 1]) def feature_score(reference: np.ndarray, attempt: np.ndarray, sr: int) -> tuple[int, float]: ref_mfcc = librosa.feature.mfcc(y=reference, sr=sr, n_mfcc=13) user_mfcc = librosa.feature.mfcc(y=attempt, sr=sr, n_mfcc=13) ref_mfcc = librosa.util.normalize(ref_mfcc, axis=1) user_mfcc = librosa.util.normalize(user_mfcc, axis=1) cost_matrix, _ = librosa.sequence.dtw(X=ref_mfcc, Y=user_mfcc, metric="cosine") mean_cost = float(cost_matrix[-1, -1] / max(cost_matrix.shape)) score = clamp_score(100.0 * (1.0 - min(mean_cost, 0.8) / 0.8)) return score, mean_cost def rhythm_score(reference: np.ndarray, attempt: np.ndarray, sr: int) -> tuple[int, float]: ref_rms = librosa.feature.rms(y=reference, frame_length=1024, hop_length=256)[0] user_rms = librosa.feature.rms(y=attempt, frame_length=1024, hop_length=256)[0] target_len = max(16, min(240, max(ref_rms.size, user_rms.size))) ref_curve = resample_vector(ref_rms / (np.max(ref_rms) + 1e-6), target_len) user_curve = resample_vector(user_rms / (np.max(user_rms) + 1e-6), target_len) corr = safe_correlation(ref_curve, user_curve) score = clamp_score(55.0 + 45.0 * corr) return score, corr def melody_score(reference: np.ndarray, attempt: np.ndarray, sr: int) -> tuple[int, float]: try: ref_pitch = librosa.yin(reference, fmin=55, fmax=500, sr=sr, frame_length=1024, hop_length=256) user_pitch = librosa.yin(attempt, fmin=55, fmax=500, sr=sr, frame_length=1024, hop_length=256) ref_pitch = np.log(np.maximum(ref_pitch, 1.0)) user_pitch = np.log(np.maximum(user_pitch, 1.0)) target_len = max(16, min(240, max(ref_pitch.size, user_pitch.size))) ref_curve = resample_vector(ref_pitch - np.median(ref_pitch), target_len) user_curve = resample_vector(user_pitch - np.median(user_pitch), target_len) corr = safe_correlation(ref_curve, user_curve) return clamp_score(55.0 + 45.0 * corr), corr except Exception as exc: print(f"Pitch scoring unavailable: {exc}") return 50, 0.0 def compare_audio(reference_path: str, attempt_path: str) -> ScoreResult: reference, sr = load_audio(reference_path) attempt, _ = load_audio(attempt_path, sr=sr) ref_duration = len(reference) / sr attempt_duration = len(attempt) / sr if attempt_duration > MAX_ATTEMPT_SECONDS: raise ValueError("Keep your recording under 20 seconds, then score again.") min_attempt_duration = max(0.75, min(1.25, ref_duration * 0.35)) if attempt_duration < min_attempt_duration: raise ValueError(f"The recording is too short ({attempt_duration:.1f}s). Say the whole line, then score again.") active_seconds = active_audio_seconds(attempt, sr) min_active_seconds = max(0.45, min(1.0, ref_duration * 0.22)) if active_seconds < min_active_seconds: raise ValueError("The recording is mostly quiet. Check the microphone, speak the line, then score again.") duration_ratio = attempt_duration / max(ref_duration, 0.1) timing = clamp_score(100.0 * (1.0 - min(abs(duration_ratio - 1.0), 0.75) / 0.75)) voice_shape, mfcc_cost = feature_score(reference, attempt, sr) rhythm, rhythm_corr = rhythm_score(reference, attempt, sr) melody, melody_corr = melody_score(reference, attempt, sr) overall = clamp_score(0.40 * voice_shape + 0.25 * timing + 0.22 * rhythm + 0.13 * melody) evidence = { "baseline_score": overall, "voice_shape": voice_shape, "timing": timing, "rhythm": rhythm, "melody": melody, "reference_duration_seconds": round(ref_duration, 2), "attempt_duration_seconds": round(attempt_duration, 2), "active_speech_seconds": round(active_seconds, 2), "duration_ratio": round(duration_ratio, 3), "mfcc_dtw_cost": round(mfcc_cost, 4), "rhythm_correlation": round(rhythm_corr, 3), "melody_correlation": round(melody_corr, 3), } return ScoreResult( overall=overall, voice_shape=voice_shape, timing=timing, rhythm=rhythm, melody=melody, reference_duration=ref_duration, attempt_duration=attempt_duration, duration_ratio=duration_ratio, evidence=evidence, ) def normalize_word(word: str) -> str: return re.sub(r"[^a-z0-9']", "", word.lower()) def word_similarity(a: str, b: str) -> float: return difflib.SequenceMatcher(None, a, b).ratio() def align_words(ref_text: str, user_text: str) -> tuple[list[dict[str, Any]], int]: ref_tokens = [token for token in ref_text.split() if normalize_word(token)] user_tokens = [token for token in user_text.split() if normalize_word(token)] ref_norm = [normalize_word(token) for token in ref_tokens] user_norm = [normalize_word(token) for token in user_tokens] matcher = difflib.SequenceMatcher(None, ref_norm, user_norm) feedback: list[dict[str, Any]] = [] for tag, i1, i2, j1, j2 in matcher.get_opcodes(): if tag == "equal": for offset, idx in enumerate(range(i1, i2)): feedback.append( { "word": ref_tokens[idx], "spoken": user_tokens[j1 + offset], "status": "matched", } ) elif tag == "replace": ref_block = ref_tokens[i1:i2] user_block = user_tokens[j1:j2] for idx, ref_word in enumerate(ref_block): spoken = user_block[idx] if idx < len(user_block) else "" similarity = word_similarity(normalize_word(ref_word), normalize_word(spoken)) if spoken else 0.0 feedback.append( { "word": ref_word, "spoken": spoken, "status": "close" if similarity >= 0.68 else "missed", } ) if len(user_block) > len(ref_block): for extra in user_block[len(ref_block) :]: feedback.append({"word": "", "spoken": extra, "status": "extra"}) elif tag == "delete": for idx in range(i1, i2): feedback.append({"word": ref_tokens[idx], "spoken": "", "status": "missed"}) elif tag == "insert": for idx in range(j1, j2): feedback.append({"word": "", "spoken": user_tokens[idx], "status": "extra"}) target_count = max(1, len(ref_tokens)) matched = sum(1 for item in feedback if item["status"] == "matched") close = sum(1 for item in feedback if item["status"] == "close") extra = sum(1 for item in feedback if item["status"] == "extra") raw = (matched + close * 0.45) / target_count - extra * 0.12 / target_count return feedback, clamp_score(raw * 100) def transcribe_english(path: str) -> str: import torch processor, model, device = get_english_asr_model() audio, _ = load_audio(path) inputs = processor(audio, sampling_rate=TARGET_SAMPLE_RATE, return_tensors="pt", padding=True) inputs = {key: value.to(device) for key, value in inputs.items()} with torch.inference_mode(): logits = model(**inputs).logits predicted_ids = torch.argmax(logits, dim=-1) transcription = processor.batch_decode(predicted_ids)[0] return clean_text(transcription.lower(), 400) def english_word_evidence(language: str, target_text: str, attempt_path: str) -> dict[str, Any]: if language.strip().lower() != "english": return { "enabled": False, "status": "skipped", } transcript = transcribe_english(attempt_path) word_feedback, word_score = align_words(target_text, transcript) return { "enabled": True, "status": "ready", "target_text": target_text, "user_transcript": transcript, "word_match_score": word_score, "word_feedback": word_feedback[:24], } def judge_prompt(state: dict[str, Any], score: ScoreResult, word_evidence: dict[str, Any]) -> list[dict[str, str]]: payload = { "language": state.get("language"), "level": state.get("level"), "target_text": state.get("target_text"), "voice_style": state.get("voice_style"), "acoustic_evidence": score.evidence, "word_evidence": word_evidence, } return [ { "role": "system", "content": ( "You are the visible judge for a short language shadowing practice app. " "Use only the supplied evidence. Do not claim this is a validated pronunciation test, " "accent detector, fluency exam, or clinical tool. Do not mention model names, providers, " "internal feature names, JSON, or hidden implementation details. Return only one JSON object." ), }, { "role": "user", "content": ( "Judge this attempt and return only strict JSON. Use these keys and value types:\n" "{\n" ' "score": ,\n' ' "sub_scores": {"words": , "timing": , "rhythm": , "voice_shape": },\n' ' "short_feedback": ,\n' ' "try_next": [, , ],\n' ' "next_line": \n' "}\n" "Rules: scores must be integers from 0 to 100. Do not copy the schema text. " "Do not use placeholders like short action or one friendly sentence. " "Keep every sentence short and useful for a child. " "Do not mention skipped, missing, unavailable, or language-specific word tips to the learner. " "If English word evidence is ready, use it heavily for the words score. " "Audio duration and speech activity already passed validation, so do not set every score to 0. " "If word evidence is skipped, still judge timing, rhythm, and voice shape from acoustic evidence.\n\n" f"Evidence:\n{json.dumps(payload, ensure_ascii=False, indent=2)}" ), }, ] def repair_judge_prompt( state: dict[str, Any], score: ScoreResult, word_evidence: dict[str, Any], bad_response: str, error: str, ) -> list[dict[str, str]]: payload = { "language": state.get("language"), "level": state.get("level"), "target_text": state.get("target_text"), "voice_style": state.get("voice_style"), "acoustic_evidence": score.evidence, "word_evidence": word_evidence, "previous_response_problem": error, "previous_response": clean_text(bad_response, 900), } return [ { "role": "system", "content": ( "You fix one bad judging response for a short language shadowing app. " "Use only the supplied evidence. Return only one valid JSON object. " "Do not mention model names, providers, JSON, or hidden implementation details in user-facing text." ), }, { "role": "user", "content": ( "The previous response was rejected. Return a real judgement now.\n" "Required JSON keys: score, sub_scores, short_feedback, try_next, next_line.\n" "sub_scores must include words, timing, rhythm, voice_shape.\n" "All scores must be integers 0 to 100.\n" "short_feedback and try_next must be specific to the evidence, not placeholders.\n" "Do not mention skipped, missing, unavailable, or language-specific word tips to the learner.\n" "If English word evidence is ready, use the word_match_score heavily for words.\n" "Because the attempt passed duration and speech checks, do not return all-zero scores.\n\n" f"Evidence:\n{json.dumps(payload, ensure_ascii=False, indent=2)}" ), }, ] def extract_json_object(text: str) -> dict[str, Any]: text = text.strip() text = re.sub(r"^```(?:json)?\s*|\s*```$", "", text, flags=re.IGNORECASE | re.DOTALL).strip() start = text.find("{") if start < 0: raise ValueError("No JSON object found") depth = 0 in_string = False escaped = False for index in range(start, len(text)): char = text[index] if in_string: if escaped: escaped = False elif char == "\\": escaped = True elif char == '"': in_string = False continue if char == '"': in_string = True elif char == "{": depth += 1 elif char == "}": depth -= 1 if depth == 0: return json.loads(text[start : index + 1]) raise ValueError("JSON object was incomplete") PLACEHOLDER_SNIPPETS = ( "one friendly sentence", "short action", "specific short action", "a short next practice line", "next practice line in the same language", " bool: text = re.sub(r"\s+", " ", str(value or "").strip().lower()) if not text: return True return any(snippet in text for snippet in PLACEHOLDER_SNIPPETS) def evidence_quality_error( judgement: dict[str, Any], score: ScoreResult | None, word_evidence: dict[str, Any] | None, ) -> str | None: if score is None: return None sub_scores = judgement["sub_scores"] if judgement["score"] == 0 and all(sub_scores[key] == 0 for key in sub_scores): if score.overall > 0 or int((word_evidence or {}).get("word_match_score") or 0) > 0: return "The judging response returned all-zero scores despite usable evidence." word_match = int((word_evidence or {}).get("word_match_score") or 0) if (word_evidence or {}).get("enabled") and word_match >= 65 and sub_scores["words"] < 35: return "The judging response ignored strong English word evidence." timing_evidence = int(score.evidence.get("timing") or 0) if timing_evidence >= 55 and sub_scores["timing"] < 20: return "The judging response ignored usable timing evidence." return None def normalize_judgement( raw_text: str, fallback_next_line: str, score: ScoreResult | None = None, word_evidence: dict[str, Any] | None = None, ) -> dict[str, Any]: try: data = extract_json_object(raw_text) except Exception as exc: raise ValueError("The judging response was not readable. Try Score again.") from exc if not isinstance(data, dict): raise ValueError("The judging response had the wrong shape. Try Score again.") sub_scores = data.get("sub_scores") if not isinstance(sub_scores, dict): raise ValueError("The judging response missed sub-scores. Try Score again.") required = ["words", "timing", "rhythm", "voice_shape"] if any(key not in sub_scores for key in required): raise ValueError("The judging response missed a score field. Try Score again.") steps = data.get("try_next") if not isinstance(steps, list) or not steps: raise ValueError("The judging response missed next steps. Try Score again.") try: normalized = { "score": clamp_score(float(data.get("score"))), "sub_scores": {key: clamp_score(float(sub_scores[key])) for key in required}, "short_feedback": clean_text(data.get("short_feedback"), 220), "try_next": [clean_text(step, 120) for step in steps[:3] if clean_text(step, 120)], "next_line": clean_text(data.get("next_line"), MAX_TARGET_CHARS) or fallback_next_line, } except Exception as exc: raise ValueError("The judging response used invalid score values. Try Score again.") from exc if looks_like_placeholder(normalized["short_feedback"]): raise ValueError("The judging response copied placeholder feedback.") if not normalized["try_next"] or any(looks_like_placeholder(step) for step in normalized["try_next"]): raise ValueError("The judging response copied placeholder practice steps.") if looks_like_placeholder(normalized["next_line"]): normalized["next_line"] = fallback_next_line quality_error = evidence_quality_error(normalized, score, word_evidence) if quality_error: raise ValueError(quality_error) return normalized def judge_with_retry( state: dict[str, Any], score: ScoreResult, word_evidence: dict[str, Any], fallback_next_line: str, ) -> tuple[dict[str, Any], str]: raw_judgement = run_judge(judge_prompt(state, score, word_evidence)) try: return normalize_judgement(raw_judgement, fallback_next_line, score, word_evidence), raw_judgement except ValueError as first_error: repaired = run_judge(repair_judge_prompt(state, score, word_evidence, raw_judgement, str(first_error))) try: return normalize_judgement(repaired, fallback_next_line, score, word_evidence), repaired except ValueError as second_error: raise ValueError("The judge returned unusable feedback. Press Score again.") from second_error def score_headline(value: int) -> str: value = clamp_score(value) if value >= 78: return "Great work" if value >= 52: return "Good work" if value >= 25: return "Keep going" return "Try again" @gpu_task(duration=120) def score_attempt( recording_payload: Any, state: dict[str, Any] | None, ) -> tuple[str, str, str, dict[str, Any], str, Any, str]: def score_error(message: str) -> tuple[str, str, str, dict[str, Any], str, Any, str]: return ( render_error_score(message), f""" """, render_next_card("Try this next", "Fix that, then press Score again."), {}, render_steps("say"), gr.update(interactive=True, value="Score"), render_score_status(message), ) if not gpu_available(): return score_error("Scoring needs GPU hardware. Try again when the Space is on GPU.") if not state or not state.get("reference_audio"): return score_error("Press Make voice first.") try: attempt_path = decode_recording_payload(str(recording_payload or "")) score = compare_audio(state["reference_audio"], attempt_path) word_evidence = english_word_evidence(state.get("language", ""), state.get("target_text", ""), attempt_path) judgement, raw_judgement = judge_with_retry( state, score, word_evidence, language_fallback(state.get("language", "English")), ) except ValueError as exc: return score_error(str(exc)) except Exception as exc: return score_error(f"Scoring could not finish: {exc}") evidence = { "state": state, "acoustic": score.evidence, "words": word_evidence, "judge": judgement, } return ( render_score_card(judgement, score, word_evidence), render_feedback_card(judgement, word_evidence), render_next_card("Try this next", judgement["next_line"]), evidence, render_steps("tips"), gr.update(interactive=True, value="Score"), render_score_status("Score ready."), ) def render_steps(active: str = "pick") -> str: steps = [ ("pick", "1", "Pick"), ("listen", "2", "Listen"), ("say", "3", "Say it"), ("tips", "4", "Score it"), ] chunks = ['
'] for key, number, label in steps: active_class = " is-active" if key == active else "" chunks.append( f'
' f'
{number}
' f'
{html.escape(label)}
' "
" ) chunks.append("
") return "".join(chunks) def render_phrase_card(target_text: str, language: str, level: str, voice_style: str) -> str: return f"""
{html.escape(language)} · {html.escape(level)} · {html.escape(voice_style)}
{html.escape(target_text)}
""" def render_initial_phrase() -> str: return render_phrase_card("Pick your practice, then press Make voice.", "Ready", "Step 1", "Simple") def render_status(message: str, loading: bool = False) -> str: spinner = '' if loading else "" loading_class = " is-loading" if loading else "" return f'
{spinner}{html.escape(message)}
' def render_loading_phrase(target_text: str, language: str, level: str, voice_style: str) -> str: return f"""
{html.escape(language)} · {html.escape(level)} · {html.escape(voice_style)}
Making your voice
{html.escape(target_text)}
""" def begin_make_voice( language: str, level: str, voice_style: str, custom_text: str, starter_counter: int, ) -> tuple[str, str, str, str, str, str, Any]: target_text, _, _ = selected_text(language, level, custom_text, starter_counter) return ( render_reference_player(None, "Making voice", loading=True), render_loading_phrase(target_text, language, level, voice_style), render_status("Making voice. This can take a little while the first time.", loading=True), render_steps("listen"), """ """, render_next_card("Try this next", "Wait for the voice, then press play."), gr.update(interactive=False, value="Making..."), ) def preview_selection( language: str, level: str, voice_style: str, custom_text: str, starter_counter: int, ) -> tuple[str, str, str, dict[str, Any], str, str, str, str, str, str]: target_text, _, _ = selected_text(language, level, custom_text, starter_counter) return ( render_phrase_card(target_text, language, level, voice_style), render_reference_player(None, "Press Make voice to use these choices."), render_status("Choices ready. Press Make voice."), {}, "", render_empty_score(), render_empty_feedback(), render_next_card("Try this next", "Press Make voice to hear the new choice."), render_steps("pick"), render_score_status(), ) def begin_scoring() -> tuple[str, str, str, Any]: return ( """ """, render_steps("tips"), render_score_status("Scoring your voice...", loading=True), gr.update(interactive=False, value="Scoring..."), ) def render_empty_score() -> str: return '
Your score appears here after you record and press Score.
' def render_score_status(message: str = "", loading: bool = False) -> str: if not message: return '
' spinner = '' if loading else "" loading_class = " is-loading" if loading else "" return f'
{spinner}{html.escape(message)}
' def render_error_score(message: str) -> str: return f"""
Try again {html.escape(message)}
""" def render_empty_feedback() -> str: return """ """ def render_meter(label: str, value: int) -> str: value = clamp_score(value) return f"""
{html.escape(label)}{value}
""" def render_score_card(judgement: dict[str, Any], score: ScoreResult, word_evidence: dict[str, Any]) -> str: sub_scores = judgement["sub_scores"] note = f"Reference {score.reference_duration:.1f}s. Your voice {score.attempt_duration:.1f}s." return f"""
{judgement['score']}
{html.escape(score_headline(judgement["score"]))} {html.escape(note)}
{render_meter("Words", sub_scores["words"])} {render_meter("Timing", sub_scores["timing"])} {render_meter("Rhythm", sub_scores["rhythm"])} {render_meter("Voice shape", sub_scores["voice_shape"])}
""" def render_word_note(word_evidence: dict[str, Any]) -> str: if not word_evidence.get("enabled"): return "" transcript = clean_text(word_evidence.get("user_transcript"), 160) if not transcript: transcript = "I could not hear clear words." return f'
I heard: {html.escape(transcript)}
' def render_feedback_card(judgement: dict[str, Any], word_evidence: dict[str, Any]) -> str: steps = "".join(f"
  • {html.escape(step)}
  • " for step in judgement["try_next"]) return f""" """ def render_next_card(title: str, line: str) -> str: return f"""
    {html.escape(title)} {html.escape(line)}
    """ RECORDER_HTML = """
    Press Record, say the line, then press Stop.
    """ RECORDER_JS = """ const startButton = element.querySelector("[data-start]"); const stopButton = element.querySelector("[data-stop]"); const status = element.querySelector("[data-status]"); const level = element.querySelector("[data-level]"); const preview = element.querySelector("[data-preview]"); let stream = null; let recorder = null; let chunks = []; let startedAt = 0; let audioContext = null; let analyser = null; let meterFrame = null; function setValue(value) { props.value = value; trigger("change"); } function preferredMimeType() { const candidates = [ "audio/webm;codecs=opus", "audio/webm", "audio/ogg;codecs=opus", "audio/ogg", "audio/mp4" ]; if (!window.MediaRecorder) return ""; for (const candidate of candidates) { if (MediaRecorder.isTypeSupported(candidate)) return candidate; } return ""; } function updateMeter() { if (!analyser) return; const data = new Uint8Array(analyser.fftSize); analyser.getByteTimeDomainData(data); let sum = 0; for (const sample of data) { const centered = (sample - 128) / 128; sum += centered * centered; } const rms = Math.sqrt(sum / data.length); level.style.width = Math.min(100, Math.round(rms * 380)) + "%"; meterFrame = requestAnimationFrame(updateMeter); } function cleanup() { if (meterFrame) cancelAnimationFrame(meterFrame); meterFrame = null; if (stream) stream.getTracks().forEach((track) => track.stop()); stream = null; if (audioContext) audioContext.close().catch(() => {}); audioContext = null; analyser = null; level.style.width = "0%"; } function blobToDataUrl(blob) { return new Promise((resolve, reject) => { const reader = new FileReader(); reader.onload = () => resolve(reader.result); reader.onerror = reject; reader.readAsDataURL(blob); }); } startButton.addEventListener("click", async () => { try { setValue(""); chunks = []; preview.removeAttribute("src"); preview.style.display = "none"; stream = await navigator.mediaDevices.getUserMedia({ audio: { echoCancellation: true, noiseSuppression: true, autoGainControl: true } }); audioContext = new (window.AudioContext || window.webkitAudioContext)(); const source = audioContext.createMediaStreamSource(stream); analyser = audioContext.createAnalyser(); analyser.fftSize = 256; source.connect(analyser); updateMeter(); const mimeType = preferredMimeType(); recorder = new MediaRecorder(stream, mimeType ? { mimeType } : undefined); recorder.ondataavailable = (event) => { if (event.data && event.data.size > 0) chunks.push(event.data); }; recorder.onstop = async () => { const durationSeconds = (performance.now() - startedAt) / 1000; const blob = new Blob(chunks, { type: recorder.mimeType || mimeType || "audio/webm" }); cleanup(); startButton.disabled = false; stopButton.disabled = true; if (durationSeconds < 0.7 || blob.size < 1200) { status.textContent = "Too short. Press Record and say the whole line."; return; } preview.src = URL.createObjectURL(blob); preview.style.display = "block"; const dataUrl = await blobToDataUrl(blob); setValue(JSON.stringify({ dataUrl, mimeType: blob.type || "audio/webm", durationSeconds, sizeBytes: blob.size, createdAt: Date.now() })); status.textContent = `Recorded ${durationSeconds.toFixed(1)}s. Play it back, then Score.`; }; recorder.start(); startedAt = performance.now(); startButton.disabled = true; stopButton.disabled = false; status.textContent = "Recording... speak now."; } catch (error) { cleanup(); startButton.disabled = false; stopButton.disabled = true; status.textContent = `Microphone error: ${error.message || error}`; } }); stopButton.addEventListener("click", () => { if (recorder && recorder.state !== "inactive") { status.textContent = "Preparing recording..."; recorder.stop(); } }); """ class NativeRecorder(gr.HTML): def __init__(self, value: str = "", **kwargs: Any) -> None: super().__init__( value=value, html_template=RECORDER_HTML, js_on_load=RECORDER_JS, container=False, **kwargs, ) def api_info(self) -> dict[str, str]: return {"type": "string"} with gr.Blocks( title="EchoYard", theme=gr.themes.Base(primary_hue="teal", neutral_hue="slate"), css=CSS, ) as demo: app_state = gr.State({}) score_state = gr.State({}) starter_counter = gr.State(0) gr.HTML( """
    EchoYard Speak. Echo. Grow.
    Tiny listen-and-repeat speaking practice
    """ ) stepper = gr.HTML(render_steps("pick")) with gr.Row(elem_classes=["lpl-layout"]): with gr.Column(scale=3, min_width=280, elem_classes=["lpl-choose"]): gr.HTML('

    Choose your practice

    ') language = gr.Dropdown(SUPPORTED_LANGUAGES, value="English", label="Language", filterable=True) level = gr.Radio(LEVELS, value="A2", label="Level", elem_classes=["lpl-level-radio"]) voice_style = gr.Radio( list(VOICE_STYLES), value="Careful", label="Voice style", elem_classes=["lpl-voice-radio"], ) custom_text = gr.Textbox( label="Words to say", value="", placeholder="Leave blank for a short practice line.", lines=3, max_lines=3, max_length=MAX_TARGET_CHARS, ) generate_btn = gr.Button("Make voice", variant="primary", elem_classes=["lpl-main-btn"]) with gr.Column(scale=6, min_width=420, elem_classes=["lpl-practice"]): phrase_card = gr.HTML(render_initial_phrase()) with gr.Row(elem_classes=["lpl-media-grid"]): with gr.Column(elem_classes=["lpl-audio-panel"]): gr.HTML('
    Reference listen first
    ') reference_player = gr.HTML(render_reference_player(), container=False) with gr.Column(elem_classes=["lpl-audio-panel", "is-record"]): gr.HTML('
    Your turn speak now
    ') attempt_recorder = NativeRecorder(value="", elem_id="native-recorder") generation_status = gr.HTML(render_status("Press Make voice. Then listen, record, and score.")) next_panel = gr.HTML(render_next_card("Try this next", "Make a voice to begin.")) with gr.Column(scale=3, min_width=300, elem_classes=["lpl-feedback"]): gr.HTML('

    Your feedback

    ') score_btn = gr.Button("Score", variant="primary", elem_classes=["lpl-score-btn"]) score_status = gr.HTML(render_score_status(), container=False) score_panel = gr.HTML(render_empty_score()) feedback_panel = gr.HTML(render_empty_feedback()) gr.HTML( """ """ ) for picker in (language, level, voice_style, custom_text): picker.change( fn=preview_selection, inputs=[language, level, voice_style, custom_text, starter_counter], outputs=[ phrase_card, reference_player, generation_status, app_state, attempt_recorder, score_panel, feedback_panel, next_panel, stepper, score_status, ], show_progress="hidden", api_visibility="private", ) make_voice_start = generate_btn.click( fn=begin_make_voice, inputs=[language, level, voice_style, custom_text, starter_counter], outputs=[reference_player, phrase_card, generation_status, stepper, feedback_panel, next_panel, generate_btn], show_progress="hidden", api_visibility="private", ) make_voice_start.then( fn=create_practice_audio, inputs=[language, level, voice_style, custom_text, starter_counter], outputs=[ reference_player, phrase_card, generation_status, app_state, stepper, attempt_recorder, score_panel, feedback_panel, next_panel, starter_counter, generate_btn, ], show_progress="minimal", show_progress_on=generate_btn, api_visibility="private", concurrency_id="voice", concurrency_limit=1, ) score_start = score_btn.click( fn=begin_scoring, inputs=None, outputs=[feedback_panel, stepper, score_status, score_btn], show_progress="hidden", api_visibility="private", ) score_start.then( fn=score_attempt, inputs=[attempt_recorder, app_state], outputs=[score_panel, feedback_panel, next_panel, score_state, stepper, score_btn, score_status], show_progress="minimal", show_progress_on=score_btn, api_visibility="private", concurrency_id="score", concurrency_limit=1, ) if __name__ == "__main__": demo.queue(default_concurrency_limit=1).launch()