Spaces:
Running on Zero
Running on Zero
| from __future__ import annotations | |
| import difflib | |
| import base64 | |
| import functools | |
| import hashlib | |
| import html | |
| import json | |
| import math | |
| import re | |
| import subprocess | |
| import urllib.error | |
| import urllib.request | |
| import uuid | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Any, Callable | |
| import gradio as gr | |
| import librosa | |
| import numpy as np | |
| import soundfile as sf | |
# Use the decorator exported by the optional `spaces` package when it can be
# imported; otherwise provide a stand-in with the same call shape so that
# `gpu_task(...)` is always usable as a decorator factory.
try:
    import spaces

    gpu_task: Callable[..., Callable[[Callable[..., Any]], Callable[..., Any]]] = spaces.GPU
except Exception:

    def gpu_task(*_args: Any, **_kwargs: Any) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        """Return a decorator that hands the decorated callable back unchanged.

        Every positional and keyword argument is accepted and ignored.
        """
        return lambda fn: fn
# Directory that contains this file. The two working directories below sit
# next to it and are created at import time when they do not exist yet.
APP_DIR = Path(__file__).resolve().parent
GENERATED_DIR = APP_DIR / "generated"
RECORDING_DIR = APP_DIR / "recordings"
GENERATED_DIR.mkdir(exist_ok=True)
RECORDING_DIR.mkdir(exist_ok=True)

# Hub identifiers handed to the `from_pretrained` loaders further down.
TTS_MODEL_ID = "openbmb/VoxCPM2"
JUDGE_MODEL_ID = "openbmb/MiniCPM5-1B"
ENGLISH_ASR_MODEL_ID = "facebook/wav2vec2-base-960h"

# Processing limits.
TARGET_SAMPLE_RATE = 16000  # presumably Hz -- confirm against the audio-loading code
MAX_TARGET_CHARS = 180  # used as the `clean_text` length limit in `has_prebuilt_sample`
MAX_ATTEMPT_SECONDS = 20.0
MAX_RECORDING_BYTES = 12 * 1024 * 1024  # 12 MiB

# Speech-synthesis settings and cache bookkeeping.
# NOTE(review): how these are consumed is not visible in this part of the file.
FAST_TTS_CFG_VALUE = 1.35
FAST_TTS_STEPS = 4
TTS_CACHE_VERSION = "fast-v3"
AUDIO_CACHE: dict[str, str] = {}

# Location of the sample audio dataset on the Hugging Face Hub; the base URL
# is assembled from the dataset id, revision and sample version.
SAMPLE_DATASET_ID = "loay/build-small-shadowing-mini-audio"
SAMPLE_DATASET_REVISION = "main"
SAMPLE_VERSION = "v1"
SAMPLE_BASE_URL = (
    f"https://huggingface.co/datasets/{SAMPLE_DATASET_ID}/resolve/"
    f"{SAMPLE_DATASET_REVISION}/reference/{SAMPLE_VERSION}"
)
# Language names offered by the app, in alphabetical order. Each name also
# appears as a key of FALLBACK_PHRASES.
SUPPORTED_LANGUAGES = [
    "Arabic",
    "Burmese",
    "Chinese",
    "Danish",
    "Dutch",
    "English",
    "Finnish",
    "French",
    "German",
    "Greek",
    "Hebrew",
    "Hindi",
    "Indonesian",
    "Italian",
    "Japanese",
    "Khmer",
    "Korean",
    "Lao",
    "Malay",
    "Norwegian",
    "Polish",
    "Portuguese",
    "Russian",
    "Spanish",
    "Swahili",
    "Swedish",
    "Tagalog",
    "Thai",
    "Turkish",
    "Vietnamese",
]

# Proficiency level labels; the same labels key LEVEL_STARTER_PHRASES.
# NOTE(review): these look like CEFR levels -- confirm.
LEVELS = ["A1", "A2", "B1", "B2", "C1", "C2"]

# Maps a voice-style label to a free-text description of that voice.
# NOTE(review): presumably the description is passed to the TTS model as a
# style prompt -- verify against the synthesis code.
VOICE_STYLES = {
    "Careful": "clear friendly tutor voice, medium-slow pace",
    "Happy": "warm upbeat voice, natural pace",
    "Slow": "slow clear practice voice",
    "Story": "bright storyteller voice with gentle expression",
}
# One default practice sentence per supported language, keyed by language
# name. `language_fallback` reads this mapping and falls back to the
# "English" entry for unknown languages, so that key must always exist.
FALLBACK_PHRASES = {
    "Arabic": "أود أن أتمرن على المحادثة كل يوم.",
    "Burmese": "နေ့တိုင်း စကားပြော လေ့ကျင့်ချင်ပါတယ်။",
    "Chinese": "我每天都想练习说话。",
    "Danish": "Jeg vil gerne øve mig i at tale hver dag.",
    "Dutch": "Ik wil elke dag oefenen met spreken.",
    "English": "I want to practice speaking clearly every day.",
    "Finnish": "Haluan harjoitella puhumista selkeästi joka päivä.",
    "French": "Je veux pratiquer la parole clairement chaque jour.",
    "German": "Ich möchte jeden Tag klar sprechen üben.",
    "Greek": "Θέλω να εξασκούμαι στην ομιλία κάθε μέρα.",
    "Hebrew": "אני רוצה לתרגל דיבור ברור בכל יום.",
    "Hindi": "मैं हर दिन साफ़ बोलने का अभ्यास करना चाहता हूँ।",
    "Indonesian": "Saya ingin berlatih berbicara dengan jelas setiap hari.",
    "Italian": "Voglio esercitarmi a parlare chiaramente ogni giorno.",
    "Japanese": "毎日、はっきり話す練習をしたいです。",
    "Khmer": "ខ្ញុំចង់ហាត់និយាយឱ្យច្បាស់រាល់ថ្ងៃ។",
    "Korean": "저는 매일 또렷하게 말하는 연습을 하고 싶어요.",
    "Lao": "ຂ້ອຍຢາກຝຶກເວົ້າໃຫ້ຊັດເຈນທຸກມື້.",
    "Malay": "Saya mahu berlatih bercakap dengan jelas setiap hari.",
    "Norwegian": "Jeg vil øve på å snakke tydelig hver dag.",
    "Polish": "Chcę codziennie ćwiczyć wyraźne mówienie.",
    "Portuguese": "Quero praticar falar com clareza todos os dias.",
    "Russian": "Я хочу каждый день тренироваться говорить ясно.",
    "Spanish": "Quiero practicar hablar con claridad todos los días.",
    "Swahili": "Nataka kufanya mazoezi ya kuzungumza wazi kila siku.",
    "Swedish": "Jag vill öva på att tala tydligt varje dag.",
    "Tagalog": "Gusto kong magsanay magsalita nang malinaw araw-araw.",
    "Thai": "ฉันอยากฝึกพูดให้ชัดเจนทุกวัน",
    "Turkish": "Her gün açık konuşma pratiği yapmak istiyorum.",
    "Vietnamese": "Tôi muốn luyện nói rõ ràng mỗi ngày.",
}
# Level-independent starter sentences per language. Only "English" has an
# entry; `starter_phrases` consults this when no level-specific list exists.
STARTER_PHRASES = {
    "English": [
        "I want to practice speaking clearly every day.",
        "Today I will speak slowly and clearly.",
        "Please help me say this sentence better.",
        "I can listen first and then repeat.",
        "My voice is getting clearer with practice.",
    ],
}

# Starter sentences per language and level label (see LEVELS).
# `starter_phrases` looks here first.
LEVEL_STARTER_PHRASES = {
    "English": {
        "A1": [
            "I can say this slowly.",
            "My voice is clear.",
            "I listen and repeat.",
        ],
        # Same list object as STARTER_PHRASES["English"], not a copy:
        # mutating one mutates the other.
        "A2": STARTER_PHRASES["English"],
        "B1": [
            "I want to explain my idea clearly today.",
            "Please listen while I repeat the sentence.",
            "I can speak with better rhythm and timing.",
        ],
        "B2": [
            "I am practicing steady speech with natural rhythm.",
            "Clear pronunciation helps my ideas sound more confident.",
            "I can repeat the line while keeping the same pace.",
        ],
        "C1": [
            "I am refining my speech so each phrase sounds precise and natural.",
            "I want my pacing, stress, and intonation to match the speaker.",
            "Careful listening helps me improve the shape of every sentence.",
        ],
        "C2": [
            "I am polishing subtle rhythm, emphasis, and tone in connected speech.",
            "Shadowing helps me reproduce fluent speech patterns with greater control.",
            "I can adapt my delivery while preserving clarity, timing, and expression.",
        ],
    }
}
| CSS = """ | |
| :root { | |
| --lpl-navy: #0d2547; | |
| --lpl-ink: #12233f; | |
| --lpl-muted: #59708f; | |
| --lpl-soft: #eef9fd; | |
| --lpl-panel: #ffffff; | |
| --lpl-line: #d6e7ef; | |
| --lpl-teal: #11a99d; | |
| --lpl-teal-dark: #05877f; | |
| --lpl-coral: #ff6258; | |
| --lpl-yellow: #ffc234; | |
| --lpl-cream: #fff6df; | |
| --lpl-shadow: 0 18px 44px rgba(13, 37, 71, 0.11); | |
| color-scheme: light; | |
| } | |
| html, | |
| body { | |
| background: linear-gradient(180deg, #f6fcff 0%, #eef8fe 48%, #f9fdff 100%); | |
| color: var(--lpl-ink); | |
| color-scheme: light !important; | |
| } | |
| .gradio-container { | |
| --body-background-fill: transparent; | |
| --body-text-color: var(--lpl-ink); | |
| --block-background-fill: transparent; | |
| --block-border-color: transparent; | |
| --block-info-text-color: var(--lpl-muted); | |
| --input-background-fill: #edf8fb; | |
| --input-border-color: transparent; | |
| --input-placeholder-color: #59708f; | |
| --input-text-color: var(--lpl-navy); | |
| --button-primary-background-fill: var(--lpl-teal); | |
| --button-primary-background-fill-hover: var(--lpl-teal-dark); | |
| --button-primary-text-color: #ffffff; | |
| max-width: 1480px !important; | |
| margin: 0 auto !important; | |
| padding: 18px 24px 22px !important; | |
| font-family: Inter, ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif !important; | |
| background: transparent !important; | |
| color: var(--lpl-ink) !important; | |
| color-scheme: light !important; | |
| } | |
| .gradio-container footer, | |
| .gradio-container .prose h1, | |
| .gradio-container .prose h2, | |
| .gradio-container .prose h3 { | |
| display: none !important; | |
| } | |
| .gradio-container .generating, | |
| .gradio-container .pending, | |
| .gradio-container .loading { | |
| opacity: 1 !important; | |
| filter: none !important; | |
| color: var(--lpl-ink) !important; | |
| } | |
| .gradio-container .generating *, | |
| .gradio-container .pending *, | |
| .gradio-container .loading * { | |
| opacity: 1 !important; | |
| filter: none !important; | |
| } | |
| .gradio-container .generating::before, | |
| .gradio-container .generating::after, | |
| .gradio-container .pending::before, | |
| .gradio-container .pending::after { | |
| display: none !important; | |
| } | |
| .lpl-topbar { | |
| min-height: 70px; | |
| display: flex; | |
| align-items: center; | |
| justify-content: space-between; | |
| gap: 20px; | |
| padding: 6px 2px 18px; | |
| } | |
| .lpl-brand { | |
| display: flex; | |
| align-items: center; | |
| flex-wrap: wrap; | |
| gap: 12px; | |
| color: var(--lpl-navy); | |
| font-weight: 830; | |
| font-size: clamp(1.32rem, 2vw, 2rem); | |
| letter-spacing: 0; | |
| } | |
| .lpl-mark { | |
| width: 54px; | |
| height: 42px; | |
| border-radius: 24px 24px 24px 8px; | |
| background: linear-gradient(145deg, var(--lpl-teal), #19c6bd); | |
| position: relative; | |
| box-shadow: 0 10px 20px rgba(17, 169, 157, 0.22); | |
| } | |
| .lpl-mark::before { | |
| content: ""; | |
| position: absolute; | |
| width: 7px; | |
| height: 7px; | |
| left: 14px; | |
| top: 16px; | |
| border-radius: 99px; | |
| background: white; | |
| box-shadow: 13px 0 0 white, 26px 0 0 white; | |
| } | |
| .lpl-divider { | |
| width: 1px; | |
| height: 28px; | |
| background: #c9dce7; | |
| } | |
| .lpl-product { | |
| color: var(--lpl-teal-dark); | |
| font-weight: 780; | |
| font-size: clamp(1rem, 1.5vw, 1.45rem); | |
| } | |
| .lpl-tagline { | |
| color: var(--lpl-muted); | |
| font-weight: 740; | |
| font-size: clamp(0.95rem, 1.35vw, 1.18rem); | |
| } | |
| .lpl-steps { | |
| display: grid; | |
| grid-template-columns: repeat(4, minmax(0, 1fr)); | |
| gap: 12px; | |
| margin-bottom: 20px; | |
| } | |
| .lpl-step { | |
| min-height: 88px; | |
| border-radius: 28px; | |
| border: 1px solid #e5f0f5; | |
| background: rgba(255, 255, 255, 0.92); | |
| color: var(--lpl-ink); | |
| display: flex; | |
| align-items: center; | |
| gap: 16px; | |
| padding: 16px 22px; | |
| box-shadow: 0 12px 26px rgba(13, 37, 71, 0.08); | |
| } | |
| .lpl-step.is-active { | |
| color: #ffffff; | |
| background: linear-gradient(135deg, var(--lpl-teal), #13bbb2); | |
| border-color: transparent; | |
| } | |
| .lpl-step-number { | |
| width: 46px; | |
| height: 46px; | |
| border-radius: 999px; | |
| display: grid; | |
| place-items: center; | |
| flex: 0 0 auto; | |
| background: #ffffff; | |
| color: var(--lpl-navy); | |
| font-size: 1.18rem; | |
| font-weight: 850; | |
| box-shadow: inset 0 0 0 1px rgba(13, 37, 71, 0.08); | |
| } | |
| .lpl-step-label { | |
| font-size: clamp(1.05rem, 1.7vw, 1.42rem); | |
| font-weight: 820; | |
| } | |
| .lpl-layout { | |
| align-items: stretch !important; | |
| } | |
| .lpl-choose, | |
| .lpl-practice, | |
| .lpl-feedback { | |
| background: rgba(255, 255, 255, 0.94); | |
| border: 1px solid #e2eef4; | |
| border-radius: 24px; | |
| box-shadow: var(--lpl-shadow); | |
| padding: 22px !important; | |
| color: var(--lpl-navy) !important; | |
| color-scheme: light !important; | |
| } | |
| .lpl-choose { | |
| min-height: 650px; | |
| } | |
| .lpl-choose *, | |
| .lpl-practice *, | |
| .lpl-feedback * { | |
| color-scheme: light !important; | |
| } | |
| .lpl-card-title { | |
| margin: 0 0 18px; | |
| color: var(--lpl-navy); | |
| font-weight: 860; | |
| font-size: 1.32rem; | |
| } | |
| .lpl-field-label { | |
| color: var(--lpl-navy); | |
| font-weight: 760; | |
| font-size: 0.95rem; | |
| margin: 14px 0 8px; | |
| } | |
| .lpl-choose .wrap, | |
| .lpl-choose label, | |
| .lpl-choose .block-info, | |
| .lpl-choose .label-wrap, | |
| .lpl-choose .container, | |
| .lpl-choose .form { | |
| color: var(--lpl-navy) !important; | |
| } | |
| .lpl-choose .form, | |
| .lpl-choose .block, | |
| .lpl-choose .wrap, | |
| .lpl-choose .container, | |
| .lpl-choose [data-testid="block-label"] { | |
| background: transparent !important; | |
| border: 0 !important; | |
| box-shadow: none !important; | |
| } | |
| .lpl-choose input, | |
| .lpl-choose textarea, | |
| .lpl-choose select, | |
| .lpl-choose .wrap, | |
| .lpl-choose [data-testid="dropdown"], | |
| .lpl-choose [role="textbox"], | |
| .lpl-choose [role="combobox"] { | |
| border-radius: 999px !important; | |
| background: #edf8fb !important; | |
| border-color: transparent !important; | |
| color: var(--lpl-navy) !important; | |
| } | |
| .lpl-choose input::placeholder, | |
| .lpl-choose textarea::placeholder { | |
| color: #6f8299 !important; | |
| opacity: 1 !important; | |
| } | |
| .lpl-choose textarea { | |
| border-radius: 22px !important; | |
| min-height: 82px !important; | |
| } | |
| .lpl-choose .wrap:focus-within, | |
| .lpl-choose [data-testid="dropdown"]:focus-within { | |
| box-shadow: 0 0 0 3px rgba(17, 169, 157, 0.18) !important; | |
| } | |
| .gradio-container [role="listbox"], | |
| .gradio-container [data-testid="dropdown-options"], | |
| .gradio-container .options, | |
| .gradio-container .dropdown-options, | |
| .gradio-container .select-options, | |
| body [role="listbox"], | |
| body [data-testid="dropdown-options"], | |
| body .options, | |
| body .dropdown-options, | |
| body .select-options { | |
| background: #ffffff !important; | |
| color: var(--lpl-navy) !important; | |
| border: 1px solid #d7eaf1 !important; | |
| border-radius: 18px !important; | |
| box-shadow: 0 18px 38px rgba(13, 37, 71, 0.16) !important; | |
| } | |
| .gradio-container [role="option"], | |
| .gradio-container .option, | |
| body [role="option"], | |
| body .option { | |
| background: #ffffff !important; | |
| color: var(--lpl-navy) !important; | |
| } | |
| .gradio-container [role="option"]:hover, | |
| .gradio-container [role="option"][aria-selected="true"], | |
| .gradio-container .option:hover, | |
| .gradio-container .option.selected, | |
| body [role="option"]:hover, | |
| body [role="option"][aria-selected="true"], | |
| body .option:hover, | |
| body .option.selected { | |
| background: #edf8fb !important; | |
| color: var(--lpl-teal-dark) !important; | |
| } | |
| body .toast-wrap, | |
| body .toast, | |
| body [data-testid="toast"], | |
| .gradio-container .toast-wrap, | |
| .gradio-container .toast, | |
| .gradio-container [data-testid="toast"] { | |
| background: #ffffff !important; | |
| color: var(--lpl-navy) !important; | |
| border-color: #d7eaf1 !important; | |
| box-shadow: 0 18px 38px rgba(13, 37, 71, 0.16) !important; | |
| } | |
| body .toast *, | |
| body [data-testid="toast"] *, | |
| .gradio-container .toast *, | |
| .gradio-container [data-testid="toast"] * { | |
| color: var(--lpl-navy) !important; | |
| } | |
| .lpl-level-radio .wrap, | |
| .lpl-voice-radio .wrap { | |
| background: transparent !important; | |
| } | |
| .lpl-level-radio, | |
| .lpl-voice-radio, | |
| .lpl-level-radio .wrap, | |
| .lpl-voice-radio .wrap, | |
| .lpl-level-radio .container, | |
| .lpl-voice-radio .container { | |
| background: transparent !important; | |
| border: 0 !important; | |
| box-shadow: none !important; | |
| } | |
| .lpl-level-radio label, | |
| .lpl-voice-radio label { | |
| min-height: 44px !important; | |
| border-radius: 999px !important; | |
| background: #edf8fb !important; | |
| border: 1px solid transparent !important; | |
| color: var(--lpl-navy) !important; | |
| font-weight: 760 !important; | |
| } | |
| .lpl-level-radio label span, | |
| .lpl-voice-radio label span { | |
| color: var(--lpl-navy) !important; | |
| } | |
| .lpl-level-radio input:checked + span, | |
| .lpl-voice-radio input:checked + span { | |
| color: var(--lpl-teal-dark) !important; | |
| } | |
| .lpl-level-radio label:has(input:checked), | |
| .lpl-voice-radio label:has(input:checked), | |
| .lpl-level-radio label[aria-checked="true"], | |
| .lpl-voice-radio label[aria-checked="true"], | |
| .lpl-level-radio [role="radio"][aria-checked="true"], | |
| .lpl-voice-radio [role="radio"][aria-checked="true"], | |
| .lpl-level-radio label.selected, | |
| .lpl-voice-radio label.selected { | |
| background: linear-gradient(135deg, var(--lpl-teal), #18c7be) !important; | |
| border-color: transparent !important; | |
| color: #ffffff !important; | |
| box-shadow: 0 12px 22px rgba(17, 169, 157, 0.24) !important; | |
| } | |
| .lpl-level-radio label:has(input:checked) *, | |
| .lpl-voice-radio label:has(input:checked) *, | |
| .lpl-level-radio label[aria-checked="true"] *, | |
| .lpl-voice-radio label[aria-checked="true"] *, | |
| .lpl-level-radio [role="radio"][aria-checked="true"] *, | |
| .lpl-voice-radio [role="radio"][aria-checked="true"] *, | |
| .lpl-level-radio label.selected *, | |
| .lpl-voice-radio label.selected * { | |
| color: #ffffff !important; | |
| } | |
| .lpl-level-radio label:hover, | |
| .lpl-voice-radio label:hover { | |
| border-color: #9fdedb !important; | |
| transform: translateY(-1px); | |
| } | |
| .lpl-main-btn, | |
| .lpl-score-btn { | |
| width: 100%; | |
| min-height: 64px !important; | |
| border-radius: 999px !important; | |
| border: 0 !important; | |
| color: #ffffff !important; | |
| font-size: 1.25rem !important; | |
| font-weight: 850 !important; | |
| box-shadow: 0 16px 32px rgba(17, 169, 157, 0.24) !important; | |
| } | |
| .lpl-main-btn { | |
| background: linear-gradient(135deg, var(--lpl-teal), #08bcb3) !important; | |
| margin-top: 16px !important; | |
| } | |
| .lpl-score-btn { | |
| background: linear-gradient(135deg, var(--lpl-navy), #123866) !important; | |
| box-shadow: 0 18px 36px rgba(13, 37, 71, 0.24) !important; | |
| } | |
| .lpl-main-btn:disabled, | |
| .lpl-main-btn[disabled], | |
| .lpl-score-btn:disabled, | |
| .lpl-score-btn[disabled] { | |
| opacity: 0.62 !important; | |
| cursor: wait !important; | |
| } | |
| .lpl-score-status { | |
| min-height: 32px; | |
| margin: 10px 0 16px; | |
| display: flex; | |
| align-items: center; | |
| gap: 10px; | |
| color: var(--lpl-muted); | |
| font-size: 0.96rem; | |
| font-weight: 760; | |
| } | |
| .lpl-score-status.is-loading { | |
| color: var(--lpl-navy); | |
| } | |
| .lpl-spinner { | |
| width: 18px; | |
| height: 18px; | |
| border-radius: 999px; | |
| border: 3px solid #dff0f4; | |
| border-top-color: var(--lpl-teal); | |
| animation: lpl-spin 0.8s linear infinite; | |
| } | |
| @keyframes lpl-spin { | |
| to { | |
| transform: rotate(360deg); | |
| } | |
| } | |
| .lpl-practice { | |
| min-height: 650px; | |
| } | |
| .lpl-phrase { | |
| min-height: 225px; | |
| border-radius: 24px; | |
| background: #ffffff; | |
| border: 1px solid #e5eef4; | |
| box-shadow: 0 14px 34px rgba(13, 37, 71, 0.08); | |
| display: flex; | |
| flex-direction: column; | |
| justify-content: center; | |
| padding: 26px 30px; | |
| text-align: center; | |
| position: relative; | |
| overflow: hidden; | |
| } | |
| .lpl-phrase::after { | |
| content: ""; | |
| width: 52%; | |
| height: 2px; | |
| border-radius: 99px; | |
| background: repeating-linear-gradient(90deg, #9ee4e0 0 12px, transparent 12px 24px); | |
| margin: 22px auto 0; | |
| } | |
| .lpl-phrase.is-loading { | |
| border-color: #b9e9e6; | |
| background: | |
| radial-gradient(circle at 50% 42%, rgba(17, 169, 157, 0.13), transparent 34%), | |
| #ffffff; | |
| } | |
| .lpl-phrase.is-loading::before { | |
| content: ""; | |
| position: absolute; | |
| inset: -45% auto auto 50%; | |
| width: 260px; | |
| height: 260px; | |
| border-radius: 999px; | |
| border: 2px solid rgba(17, 169, 157, 0.22); | |
| transform: translateX(-50%); | |
| animation: lpl-breathe 1.5s ease-in-out infinite; | |
| } | |
| .lpl-phrase-meta { | |
| color: var(--lpl-muted); | |
| font-weight: 720; | |
| margin-bottom: 12px; | |
| position: relative; | |
| z-index: 1; | |
| } | |
| .lpl-phrase-text { | |
| color: var(--lpl-navy); | |
| font-size: clamp(2rem, 4.5vw, 3.35rem); | |
| line-height: 1.25; | |
| font-weight: 900; | |
| letter-spacing: 0; | |
| position: relative; | |
| z-index: 1; | |
| } | |
| .lpl-loading-line { | |
| max-width: 780px; | |
| margin: 16px auto 0; | |
| color: var(--lpl-muted); | |
| font-size: 1rem; | |
| line-height: 1.45; | |
| font-weight: 720; | |
| position: relative; | |
| z-index: 1; | |
| } | |
| .lpl-voice-loader { | |
| min-height: 42px; | |
| margin: 18px auto 2px; | |
| display: flex; | |
| align-items: center; | |
| justify-content: center; | |
| gap: 8px; | |
| position: relative; | |
| z-index: 1; | |
| } | |
| .lpl-voice-loader span { | |
| width: 10px; | |
| height: 18px; | |
| border-radius: 999px; | |
| background: linear-gradient(180deg, var(--lpl-teal), #2bd6cd); | |
| animation: lpl-wave 0.82s ease-in-out infinite; | |
| } | |
| .lpl-voice-loader span:nth-child(2) { | |
| animation-delay: 0.08s; | |
| } | |
| .lpl-voice-loader span:nth-child(3) { | |
| animation-delay: 0.16s; | |
| } | |
| .lpl-voice-loader span:nth-child(4) { | |
| animation-delay: 0.24s; | |
| } | |
| .lpl-voice-loader span:nth-child(5) { | |
| animation-delay: 0.32s; | |
| } | |
| @keyframes lpl-wave { | |
| 0%, | |
| 100% { | |
| transform: scaleY(0.58); | |
| opacity: 0.55; | |
| } | |
| 50% { | |
| transform: scaleY(1.7); | |
| opacity: 1; | |
| } | |
| } | |
| @keyframes lpl-breathe { | |
| 0%, | |
| 100% { | |
| opacity: 0.24; | |
| transform: translateX(-50%) scale(0.82); | |
| } | |
| 50% { | |
| opacity: 0.48; | |
| transform: translateX(-50%) scale(1); | |
| } | |
| } | |
| .lpl-media-grid { | |
| display: grid; | |
| grid-template-columns: repeat(2, minmax(0, 1fr)); | |
| gap: 16px; | |
| margin: 18px 0; | |
| } | |
| .lpl-audio-panel { | |
| border-radius: 24px; | |
| border: 1px solid #e4eef4; | |
| background: #ffffff; | |
| overflow: hidden; | |
| box-shadow: 0 12px 26px rgba(13, 37, 71, 0.08); | |
| } | |
| .lpl-audio-panel.is-record { | |
| background: linear-gradient(180deg, #fff8f6 0%, #ffffff 52%); | |
| } | |
| .lpl-audio-head { | |
| display: flex; | |
| align-items: center; | |
| justify-content: space-between; | |
| gap: 12px; | |
| min-height: 64px; | |
| padding: 16px 18px 8px; | |
| color: var(--lpl-navy); | |
| font-weight: 820; | |
| } | |
| .lpl-audio-head span { | |
| color: var(--lpl-muted); | |
| font-size: 0.9rem; | |
| font-weight: 700; | |
| } | |
| .lpl-audio-panel .audio-container, | |
| .lpl-audio-panel .block, | |
| .lpl-audio-panel .wrap, | |
| .lpl-audio-panel .form { | |
| border: 0 !important; | |
| box-shadow: none !important; | |
| background: transparent !important; | |
| color: var(--lpl-navy) !important; | |
| } | |
| .lpl-audio-panel audio { | |
| width: 100% !important; | |
| min-height: 46px !important; | |
| padding: 0 18px 18px !important; | |
| box-sizing: border-box !important; | |
| color-scheme: light !important; | |
| } | |
| .lpl-audio-panel .label-wrap, | |
| .lpl-audio-panel .download, | |
| .lpl-audio-panel .share { | |
| display: none !important; | |
| } | |
| .lpl-native-player, | |
| .lpl-native-recorder { | |
| padding: 0 18px 18px; | |
| } | |
| .lpl-native-player audio, | |
| .lpl-native-recorder audio { | |
| width: 100%; | |
| min-height: 46px; | |
| padding: 0 !important; | |
| color-scheme: light !important; | |
| } | |
| .lpl-player-empty, | |
| .lpl-recorder-status { | |
| min-height: 46px; | |
| display: flex; | |
| align-items: center; | |
| color: var(--lpl-muted); | |
| font-weight: 720; | |
| } | |
| .lpl-player-empty.is-loading, | |
| .lpl-status-card.is-loading { | |
| color: var(--lpl-navy); | |
| gap: 10px; | |
| } | |
| .lpl-recorder-actions { | |
| display: flex; | |
| align-items: center; | |
| flex-wrap: wrap; | |
| gap: 12px; | |
| margin-bottom: 12px; | |
| } | |
| .lpl-recorder-actions button { | |
| min-height: 48px; | |
| border: 0; | |
| border-radius: 999px; | |
| padding: 0 20px; | |
| font-weight: 840; | |
| cursor: pointer; | |
| color: #ffffff; | |
| background: var(--lpl-coral); | |
| } | |
| .lpl-recorder-actions button[data-stop] { | |
| color: var(--lpl-coral); | |
| background: #fff2f1; | |
| border: 1px solid #ffbcb7; | |
| } | |
| .lpl-recorder-actions button:disabled { | |
| opacity: 0.5; | |
| cursor: not-allowed; | |
| } | |
| .lpl-recorder-meter { | |
| height: 9px; | |
| border-radius: 999px; | |
| overflow: hidden; | |
| background: #edf3f6; | |
| margin: 0 0 12px; | |
| } | |
| .lpl-recorder-fill { | |
| height: 100%; | |
| width: 0%; | |
| border-radius: 999px; | |
| background: linear-gradient(90deg, var(--lpl-coral), var(--lpl-yellow)); | |
| } | |
| .lpl-status { | |
| min-height: 34px; | |
| color: var(--lpl-muted); | |
| font-size: 0.98rem; | |
| font-weight: 680; | |
| } | |
| .lpl-status-card { | |
| color: var(--lpl-muted); | |
| font-size: 0.98rem; | |
| line-height: 1.45; | |
| display: flex; | |
| align-items: center; | |
| gap: 10px; | |
| } | |
| .lpl-feedback { | |
| min-height: 650px; | |
| } | |
| .lpl-score-empty, | |
| .lpl-score-card { | |
| border-radius: 24px; | |
| background: #ffffff; | |
| border: 1px solid #e4eef4; | |
| box-shadow: 0 12px 26px rgba(13, 37, 71, 0.08); | |
| padding: 22px; | |
| color: var(--lpl-navy); | |
| } | |
| .lpl-score-empty { | |
| min-height: 160px; | |
| display: flex; | |
| align-items: center; | |
| color: var(--lpl-muted); | |
| font-weight: 720; | |
| line-height: 1.45; | |
| } | |
| .lpl-score-empty.is-error { | |
| align-items: flex-start; | |
| flex-direction: column; | |
| gap: 8px; | |
| border-color: #ffd0cc; | |
| background: #fff8f7; | |
| color: var(--lpl-navy); | |
| } | |
| .lpl-score-empty.is-error strong { | |
| color: var(--lpl-coral); | |
| font-size: 1.08rem; | |
| } | |
| .lpl-score-top { | |
| display: flex; | |
| align-items: center; | |
| gap: 18px; | |
| margin-bottom: 20px; | |
| } | |
| .lpl-ring { | |
| --score: 0; | |
| width: 132px; | |
| height: 132px; | |
| border-radius: 999px; | |
| display: grid; | |
| place-items: center; | |
| flex: 0 0 auto; | |
| background: | |
| radial-gradient(circle at center, white 0 56%, transparent 58%), | |
| conic-gradient(var(--lpl-teal) calc(var(--score) * 1%), #e6f0f5 0); | |
| color: var(--lpl-navy); | |
| font-size: 2.55rem; | |
| font-weight: 900; | |
| } | |
| .lpl-score-copy strong { | |
| display: block; | |
| font-size: 1.42rem; | |
| margin-bottom: 8px; | |
| } | |
| .lpl-score-copy span { | |
| color: var(--lpl-muted); | |
| line-height: 1.45; | |
| font-weight: 650; | |
| } | |
| .lpl-meter { | |
| margin: 14px 0; | |
| } | |
| .lpl-meter-row { | |
| display: flex; | |
| align-items: center; | |
| justify-content: space-between; | |
| gap: 16px; | |
| color: var(--lpl-navy); | |
| font-weight: 760; | |
| margin-bottom: 7px; | |
| } | |
| .lpl-meter-row span:last-child { | |
| color: var(--lpl-teal-dark); | |
| } | |
| .lpl-bar { | |
| height: 9px; | |
| background: #e8f1f5; | |
| border-radius: 999px; | |
| overflow: hidden; | |
| } | |
| .lpl-fill { | |
| height: 100%; | |
| width: 0; | |
| border-radius: 999px; | |
| background: linear-gradient(90deg, var(--lpl-teal), #1f78c8); | |
| } | |
| .lpl-feedback-card, | |
| .lpl-next-card { | |
| margin-top: 16px; | |
| border-radius: 24px; | |
| background: #ffffff; | |
| border: 1px solid #e4eef4; | |
| box-shadow: 0 12px 26px rgba(13, 37, 71, 0.08); | |
| padding: 20px; | |
| color: var(--lpl-navy); | |
| } | |
| .lpl-feedback-card h3, | |
| .lpl-next-card h3 { | |
| margin: 0 0 10px; | |
| font-size: 1.14rem; | |
| color: var(--lpl-navy); | |
| } | |
| .lpl-feedback-card p, | |
| .lpl-next-card p { | |
| margin: 0; | |
| color: #37506f; | |
| font-weight: 650; | |
| line-height: 1.48; | |
| } | |
| .lpl-feedback-card ol { | |
| margin: 14px 0 0 22px; | |
| padding: 0; | |
| color: #37506f; | |
| font-weight: 650; | |
| line-height: 1.52; | |
| } | |
| .lpl-feedback-card li { | |
| margin: 7px 0; | |
| } | |
| .lpl-words { | |
| margin-top: 14px; | |
| padding-top: 14px; | |
| border-top: 1px solid #e7eff4; | |
| color: var(--lpl-muted); | |
| font-weight: 650; | |
| line-height: 1.5; | |
| } | |
| .lpl-words strong { | |
| color: var(--lpl-navy); | |
| } | |
| .lpl-bottom-next { | |
| margin-top: 18px; | |
| border-radius: 24px; | |
| background: linear-gradient(180deg, #fff8df 0%, #fff2c9 100%); | |
| border: 1px solid #ffe5a1; | |
| padding: 18px 22px; | |
| display: flex; | |
| gap: 18px; | |
| align-items: center; | |
| color: var(--lpl-navy); | |
| box-shadow: 0 16px 28px rgba(255, 194, 52, 0.16); | |
| } | |
| .lpl-bottom-next strong { | |
| font-size: 1.12rem; | |
| } | |
| .lpl-bottom-next span { | |
| color: #37506f; | |
| font-weight: 700; | |
| } | |
| .lpl-footer { | |
| margin: 22px auto 0; | |
| padding: 14px 18px; | |
| border: 1px solid #dbeaf1; | |
| border-radius: 999px; | |
| background: rgba(255, 255, 255, 0.82); | |
| color: #37506f; | |
| font-size: 0.94rem; | |
| font-weight: 700; | |
| line-height: 1.45; | |
| text-align: center; | |
| box-shadow: 0 14px 28px rgba(13, 37, 71, 0.07); | |
| } | |
| .lpl-footer strong { | |
| color: var(--lpl-navy); | |
| font-weight: 860; | |
| } | |
| @media (max-width: 1080px) { | |
| .lpl-layout { | |
| flex-direction: column !important; | |
| } | |
| .lpl-layout > .column, | |
| .lpl-layout > div { | |
| width: 100% !important; | |
| min-width: 0 !important; | |
| } | |
| .lpl-choose, | |
| .lpl-practice, | |
| .lpl-feedback { | |
| min-height: auto; | |
| } | |
| .lpl-steps, | |
| .lpl-media-grid { | |
| grid-template-columns: 1fr 1fr; | |
| } | |
| } | |
| @media (max-width: 760px) { | |
| .gradio-container { | |
| padding: 14px 12px 18px !important; | |
| } | |
| .lpl-topbar { | |
| align-items: flex-start; | |
| flex-direction: column; | |
| } | |
| .lpl-steps, | |
| .lpl-media-grid { | |
| grid-template-columns: 1fr; | |
| } | |
| .lpl-footer { | |
| border-radius: 22px; | |
| } | |
| .lpl-step { | |
| min-height: 66px; | |
| border-radius: 20px; | |
| } | |
| .lpl-choose, | |
| .lpl-practice, | |
| .lpl-feedback { | |
| min-height: auto; | |
| padding: 16px !important; | |
| } | |
| .lpl-phrase { | |
| min-height: 180px; | |
| padding: 22px 18px; | |
| } | |
| .lpl-score-top { | |
| align-items: flex-start; | |
| flex-direction: column; | |
| } | |
| .lpl-ring { | |
| width: 112px; | |
| height: 112px; | |
| } | |
| } | |
| """ | |
@dataclass
class ScoreResult:
    """Scores and measurements produced for one scored attempt.

    Declared as a dataclass so instances can be built from field values
    (positionally or by keyword) and compared by value. Without the
    decorator the class body only carries annotations and has no
    ``__init__`` accepting them.
    """

    # Score components.
    # NOTE(review): presumably each is 0-100 as produced by `clamp_score`;
    # nothing in this class enforces a range.
    overall: int
    voice_shape: int
    timing: int
    rhythm: int
    melody: int
    # Durations of the reference and of the attempt.
    # NOTE(review): presumably seconds -- confirm against the scoring code.
    reference_duration: float
    attempt_duration: float
    # NOTE(review): which duration is the numerator is not visible here.
    duration_ratio: float
    # Free-form supporting data for the scores.
    evidence: dict[str, Any]
def clamp_score(value: float) -> int:
    """Convert *value* to an integer score between 0 and 100 inclusive.

    Finite values are limited to the range [0, 100] and then rounded with
    the built-in ``round``. NaN and both infinities yield 0.
    """
    if math.isfinite(value):
        capped = min(100.0, value)
        bounded = max(0.0, capped)
        return int(round(bounded))
    return 0
def clean_text(value: Any, limit: int = 500) -> str:
    """Return *value* as single-spaced text cut to at most *limit* characters.

    Falsy values (``None``, ``""``, ``0`` ...) become the empty string.
    Otherwise the value is stringified, stripped of surrounding whitespace,
    and every run of whitespace is collapsed to one space. Truncation happens
    last, so the result may end in a space when the cut lands after one.
    """
    raw = str(value or "")
    collapsed = re.sub(r"\s+", " ", raw.strip())
    return collapsed[:limit]
def gpu_available() -> bool:
    """Report whether PyTorch says a CUDA device is available.

    Returns ``False`` when importing ``torch`` or querying CUDA raises any
    ``Exception``.
    """
    try:
        import torch

        cuda_ready = torch.cuda.is_available()
    except Exception:
        return False
    return bool(cuda_ready)
def require_gpu(action: str) -> None:
    """Raise ``gr.Error`` when no GPU is available for *action*.

    Args:
        action: Text placed at the start of the error message to name the
            operation that needs a GPU.

    Raises:
        gr.Error: If ``gpu_available()`` returns ``False``.
    """
    if gpu_available():
        return
    message = (
        f"{action} needs GPU hardware. This Space is currently on CPU hardware; "
        "switch it to GPU or ZeroGPU, then try again."
    )
    raise gr.Error(message)
def language_fallback(language: str) -> str:
    """Return the default practice phrase for *language*.

    Languages missing from ``FALLBACK_PHRASES`` get the English phrase.
    """
    english_phrase = FALLBACK_PHRASES["English"]
    return FALLBACK_PHRASES.get(language, english_phrase)
def starter_phrases(language: str, level: str = "A2") -> list[str]:
    """Return the starter phrases for *language* at *level*.

    Lookup order: the level-specific list in ``LEVEL_STARTER_PHRASES``, then
    the language-wide list in ``STARTER_PHRASES``, then a one-element list
    holding the language's fallback phrase. Missing or empty lists are
    skipped. Lists found in the module-level tables are returned as-is, not
    copied.
    """
    per_level = LEVEL_STARTER_PHRASES.get(language, {})
    candidates = (per_level.get(level), STARTER_PHRASES.get(language))
    for found in candidates:
        if found:
            return found
    return [language_fallback(language)]
def has_prebuilt_sample(language: str, target_text: str) -> bool:
    """Tell whether *target_text* equals the fallback phrase of *language*.

    Both texts are normalised with ``clean_text`` using ``MAX_TARGET_CHARS``
    as the length limit before they are compared.
    NOTE(review): presumably prebuilt audio exists only for the fallback
    phrases -- confirm against the sample-download code.
    """
    requested = clean_text(target_text, MAX_TARGET_CHARS)
    fallback = clean_text(language_fallback(language), MAX_TARGET_CHARS)
    return requested == fallback
@functools.lru_cache(maxsize=1)
def get_tts_model() -> Any:
    """Load the VoxCPM model named by ``TTS_MODEL_ID`` and return it.

    The result is memoised: the first successful call loads the model and
    later calls return the same instance instead of loading it again. A call
    that raises is not cached, so a failed load is retried on the next call.

    Returns:
        The object returned by ``VoxCPM.from_pretrained`` (loaded with
        ``load_denoiser=False``).
    """
    # Imported inside the function, so `voxcpm` is only needed once a model
    # is actually requested.
    from voxcpm import VoxCPM

    return VoxCPM.from_pretrained(TTS_MODEL_ID, load_denoiser=False)
@functools.lru_cache(maxsize=1)
def get_judge_model() -> tuple[Any, Any]:
    """Load the tokenizer and model for ``JUDGE_MODEL_ID``.

    The result is memoised: the first successful call loads both objects and
    later calls return the same pair, so callers such as ``run_judge`` do not
    reload the model on every request. A call that raises is not cached.

    Model classes are tried in order: ``AutoModelForMultimodalLM`` and then
    ``AutoModelForCausalLM``, skipping any the installed ``transformers``
    cannot import. The first class that loads successfully is used.

    Returns:
        ``(tokenizer, model)`` with the model switched to eval mode.

    Raises:
        RuntimeError: If no model class could load the model. The message
            joins every collected import and load error, and the last load
            error is chained as the cause.
    """
    # Not referenced below; kept so a missing PyTorch install still raises
    # its ImportError here, as before.
    import torch  # noqa: F401
    from transformers import AutoTokenizer

    model_errors: list[str] = []
    model_classes: list[Any] = []
    try:
        from transformers import AutoModelForMultimodalLM

        model_classes.append(AutoModelForMultimodalLM)
    except Exception as exc:
        model_errors.append(str(exc))
    try:
        from transformers import AutoModelForCausalLM

        model_classes.append(AutoModelForCausalLM)
    except Exception as exc:
        model_errors.append(str(exc))

    # NOTE(review): trust_remote_code=True executes Python shipped in the
    # model repository; keep JUDGE_MODEL_ID pointing at a trusted repository.
    tokenizer = AutoTokenizer.from_pretrained(JUDGE_MODEL_ID, trust_remote_code=True)
    last_error: Exception | None = None
    for model_class in model_classes:
        try:
            model = model_class.from_pretrained(
                JUDGE_MODEL_ID,
                torch_dtype="auto",
                device_map="auto",
                trust_remote_code=True,
            )
            model.eval()
            return tokenizer, model
        except Exception as exc:
            last_error = exc
            model_errors.append(str(exc))
    raise RuntimeError("Could not load the judging model: " + "; ".join(model_errors)) from last_error
@functools.lru_cache(maxsize=1)
def get_english_asr_model() -> tuple[Any, Any, Any]:
    """Load the English CTC speech recognizer once and reuse it afterwards.

    The processor, model and device are memoized so each transcription does
    not reload the checkpoint. The device is chosen on the first successful
    call (CUDA when available, otherwise CPU) and kept for later calls.

    Returns:
        ``(processor, model, device)`` with the model on *device* in eval mode.
    """
    import torch
    from transformers import AutoModelForCTC, AutoProcessor

    processor = AutoProcessor.from_pretrained(ENGLISH_ASR_MODEL_ID)
    model = AutoModelForCTC.from_pretrained(ENGLISH_ASR_MODEL_ID)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()
    return processor, model, device
def run_judge(messages: list[dict[str, str]], max_new_tokens: int = 360) -> str:
    """Generate the judge's reply to *messages* and return it as stripped text.

    The chat template is first applied with ``enable_thinking=False``; if the
    tokenizer rejects that keyword with ``TypeError`` it is applied without it.
    Decoding is greedy, and only the newly generated tokens are returned.
    """
    import torch

    tokenizer, model = get_judge_model()
    template_kwargs = dict(
        tokenize=True,
        add_generation_prompt=True,
        return_dict=True,
        return_tensors="pt",
    )
    try:
        encoded = tokenizer.apply_chat_template(messages, enable_thinking=False, **template_kwargs)
    except TypeError:
        encoded = tokenizer.apply_chat_template(messages, **template_kwargs)

    # Put every input tensor on the same device as the model weights.
    target_device = next(model.parameters()).device
    encoded = {name: tensor.to(target_device) for name, tensor in encoded.items()}
    prompt_length = encoded["input_ids"].shape[-1]

    with torch.inference_mode():
        output_ids = model.generate(
            **encoded,
            max_new_tokens=max_new_tokens,
            do_sample=False,
            repetition_penalty=1.02,
        )

    # Drop the echoed prompt so only the completion is decoded.
    completion = output_ids[0][prompt_length:]
    return tokenizer.decode(completion, skip_special_tokens=True).strip()
def selected_text(language: str, level: str, custom_text: str, starter_counter: int = 0) -> tuple[str, str, int]:
    """Choose the line to practise.

    Returns ``(text, source, line_index)``. Non-empty custom text wins and is
    reported as ``"custom"`` with index ``-1``; otherwise the starter phrase
    at ``starter_counter`` (wrapping around the list) is returned as
    ``"starter"`` together with its index.
    """
    typed_line = clean_text(custom_text, MAX_TARGET_CHARS)
    if typed_line:
        return typed_line, "custom", -1

    candidates = starter_phrases(language, level)
    position = int(starter_counter or 0) % max(1, len(candidates))
    return candidates[position], "starter", position
def build_voice_text(target_text: str, voice_style: str) -> str:
    """Prefix *target_text* with the parenthesised description of *voice_style*.

    Unknown styles use the description registered for ``"Careful"``.
    """
    careful_description = VOICE_STYLES["Careful"]
    style_hint = VOICE_STYLES.get(voice_style, careful_description)
    return f"({style_hint}){target_text}"
def slugify(value: str) -> str:
    """Turn *value* into a lowercase, dash-separated ASCII slug.

    Every run of characters outside ``a-z0-9`` becomes one dash, and leading
    and trailing dashes are removed. If nothing is left, ``"item"`` is returned.
    """
    dashed = re.sub(r"[^a-z0-9]+", "-", value.lower())
    trimmed = dashed.strip("-")
    return trimmed if trimmed else "item"
def audio_cache_key(language: str, level: str, voice_style: str, target_text: str) -> str:
    """Return an 18-hex-character key identifying one synthesized voice clip.

    The key is the truncated SHA-1 of a sorted-key JSON document holding the
    user's choices plus the cache version and the fast-TTS settings, so a
    change to any of them yields a different key.
    """
    fingerprint = dict(
        version=TTS_CACHE_VERSION,
        language=language,
        level=level,
        voice_style=voice_style,
        target_text=target_text,
        cfg_value=FAST_TTS_CFG_VALUE,
        steps=FAST_TTS_STEPS,
    )
    serialized = json.dumps(fingerprint, ensure_ascii=False, sort_keys=True)
    digest = hashlib.sha1(serialized.encode("utf-8")).hexdigest()
    return digest[:18]
def sample_relpath(language: str, voice_style: str) -> str:
    """Return the ``<language-slug>/<style-slug>.wav`` path of a prebuilt sample."""
    folder = slugify(language)
    stem = slugify(voice_style)
    return f"{folder}/{stem}.wav"
def sample_url(language: str, voice_style: str) -> str:
    """Return the download URL of the prebuilt sample for the given choices."""
    relative = sample_relpath(language, voice_style)
    return f"{SAMPLE_BASE_URL}/{relative}"
def audio_data_uri(path: str) -> str:
    """Read the file at *path* and return it as a base64 ``data:audio/wav`` URI."""
    encoded = base64.b64encode(Path(path).read_bytes())
    return "data:audio/wav;base64," + encoded.decode("ascii")
def render_reference_player(
    path: str | None = None,
    note: str = "Voice appears here after Make voice.",
    loading: bool = False,
) -> str:
    """Return the HTML for the reference-voice player.

    When *path* points to an existing file the audio is embedded inline as a
    data URI. Otherwise a placeholder with the HTML-escaped *note* is shown,
    with a spinner and the ``is-loading`` class when *loading* is true.
    """
    has_audio = bool(path) and Path(path).exists()
    if has_audio:
        inner = f'<audio controls preload="auto" src="{audio_data_uri(path)}"></audio>'
    else:
        if loading:
            spinner = '<span class="lpl-spinner" aria-hidden="true"></span>'
            loading_class = " is-loading"
        else:
            spinner = ""
            loading_class = ""
        inner = f'<div class="lpl-player-empty{loading_class}">{spinner}<span>{html.escape(note)}</span></div>'
    return '<div class="lpl-native-player">' + inner + "</div>"
def ensure_sample_audio(language: str, voice_style: str, cache_key: str) -> str | None:
    """Return a local path to the prebuilt sample, downloading it if needed.

    A previously saved copy is reused as-is. Otherwise the sample is fetched,
    stored under the generated directory and validated by loading it.
    ``None`` is returned when the download fails, the payload is under 1000
    bytes, or the audio cannot be loaded or lasts under half a second; a
    copy that fails validation is deleted again.
    """
    local_copy = GENERATED_DIR / f"sample_{cache_key}.wav"
    if local_copy.exists():
        return str(local_copy)

    url = sample_url(language, voice_style)
    try:
        with urllib.request.urlopen(url, timeout=25) as response:
            payload = response.read()
    except (urllib.error.URLError, TimeoutError, OSError) as exc:
        print(f"Sample audio unavailable for {language}/{voice_style}: {exc}")
        return None

    payload_size = len(payload)
    if payload_size < 1000:
        print(f"Sample audio too small for {language}/{voice_style}: {payload_size} bytes")
        return None

    local_copy.write_bytes(payload)
    try:
        samples, rate = load_audio(str(local_copy))
        too_short = len(samples) / rate < 0.5
        if too_short:
            local_copy.unlink(missing_ok=True)
            return None
    except Exception as exc:
        print(f"Sample audio invalid for {language}/{voice_style}: {exc}")
        local_copy.unlink(missing_ok=True)
        return None
    return str(local_copy)
def normalize_wav(wav: np.ndarray, sample_rate: int) -> np.ndarray:
    """Convert *wav* to peak-normalised float32 audio at the target sample rate.

    Singleton dimensions are squeezed away, and an array that is still
    multi-dimensional is averaged over its last axis. The signal is
    resampled when *sample_rate* differs from the target rate, then scaled
    so its peak is 0.94. Empty or all-zero audio is left unscaled.
    """
    samples = np.asarray(wav, dtype=np.float32).squeeze()
    if samples.ndim > 1:
        samples = np.mean(samples, axis=-1)
    if sample_rate != TARGET_SAMPLE_RATE:
        samples = librosa.resample(samples, orig_sr=sample_rate, target_sr=TARGET_SAMPLE_RATE)
    if not samples.size:
        return samples
    loudest = float(np.max(np.abs(samples)))
    if loudest > 0:
        samples = samples / loudest * 0.94
    return samples
def synthesize_reference_file(prompt_text: str, output_path: str) -> None:
    """Synthesize *prompt_text* and write it to *output_path* as 16-bit PCM.

    Requires a GPU. Generation is first attempted with bad-case retry and
    denoising switched off; if the model rejects those keywords with
    ``TypeError`` it is called again with only the basic arguments. The
    sample rate is read from ``model.tts_model.sample_rate`` and defaults to
    48000 when that attribute is absent.
    """
    require_gpu("Making the voice")
    model = get_tts_model()

    basic_kwargs = {
        "text": prompt_text,
        "cfg_value": FAST_TTS_CFG_VALUE,
        "inference_timesteps": FAST_TTS_STEPS,
    }
    try:
        wav = model.generate(**basic_kwargs, retry_badcase=False, denoise=False)
    except TypeError:
        wav = model.generate(**basic_kwargs)

    tts_core = getattr(model, "tts_model", None)
    sample_rate = int(getattr(tts_core, "sample_rate", 48000))
    normalized = normalize_wav(wav, sample_rate)
    sf.write(output_path, normalized, TARGET_SAMPLE_RATE, subtype="PCM_16")
def _ready_practice_outputs(
    audio_path: str,
    target_text: str,
    language: str,
    level: str,
    voice_style: str,
    source: str,
    status_message: str,
    next_counter: int,
) -> tuple[str, str, str, dict[str, Any], str, str, str, str, str, int, Any]:
    """Build the UI outputs shown once a reference voice file is available."""
    state = build_state(target_text, language, level, voice_style, source, audio_path)
    return (
        render_reference_player(audio_path),
        render_phrase_card(target_text, language, level, voice_style),
        render_status(status_message),
        state,
        render_steps("say"),
        "",
        render_empty_score(),
        render_empty_feedback(),
        render_next_card("Try this next", "Record yourself saying the line above."),
        next_counter,
        gr.update(interactive=True, value="Make voice"),
    )


def create_practice_audio(
    language: str,
    level: str,
    voice_style: str,
    custom_text: str,
    starter_counter: int,
) -> tuple[str, str, str, dict[str, Any], str, str, str, str, str, int, Any]:
    """Produce the reference voice for the chosen line and the matching UI outputs.

    Resolution order: the prebuilt sample (first starter line only), then an
    already generated file for the same choices, then fresh synthesis.

    Returns:
        An 11-item tuple: player HTML, phrase card, status, session state,
        steps HTML, an empty string, empty score card, feedback card, next
        card, the next starter counter and a button update.
    """
    target_text, source, line_index = selected_text(language, level, custom_text, starter_counter)
    cache_key = audio_cache_key(language, level, voice_style, target_text)
    output_path = GENERATED_DIR / f"reference_{cache_key}.wav"
    # Only starter lines advance the counter; custom text keeps it unchanged.
    next_counter = int(starter_counter or 0) + (1 if source == "starter" else 0)

    if source == "starter" and line_index == 0 and has_prebuilt_sample(language, target_text):
        sample_path = ensure_sample_audio(language, voice_style, cache_key)
        if sample_path:
            AUDIO_CACHE[cache_key] = sample_path
            return _ready_practice_outputs(
                sample_path,
                target_text,
                language,
                level,
                voice_style,
                "sample",
                "Voice ready from the example library.",
                next_counter,
            )

    # Use whichever known file actually exists. A remembered cache path can be
    # stale (file removed) while the generated file is still on disk; picking
    # the cache entry unconditionally would hand the player a missing file.
    cached_path = AUDIO_CACHE.get(cache_key)
    existing_path = next(
        (candidate for candidate in (cached_path, str(output_path)) if candidate and Path(candidate).exists()),
        None,
    )
    if existing_path:
        AUDIO_CACHE[cache_key] = existing_path
        return _ready_practice_outputs(
            existing_path,
            target_text,
            language,
            level,
            voice_style,
            source,
            "Voice ready. Same choices play instantly next time.",
            next_counter,
        )

    prompt_text = build_voice_text(target_text, voice_style)
    try:
        synthesize_reference_file(prompt_text, str(output_path))
    except Exception as exc:
        print(f"Voice generation failed: {exc}")
        # Remove any partially written file so the next request does not
        # mistake it for a finished, cached voice.
        output_path.unlink(missing_ok=True)
        failure_feedback = (
            "\n"
            '<div class="lpl-feedback-card">\n'
            "<h3>Your feedback</h3>\n"
            "<p>Make the voice first. Then listen, record, and score.</p>\n"
            "</div>\n"
        )
        return (
            render_reference_player(None, "Could not make the voice. Try again."),
            render_phrase_card(target_text, language, level, voice_style),
            render_status("Could not make the voice. Try a shorter line or press Make voice again."),
            {},
            render_steps("pick"),
            "",
            render_empty_score(),
            failure_feedback,
            render_next_card("Try this next", "Try a shorter line, then press Make voice."),
            next_counter,
            gr.update(interactive=True, value="Make voice"),
        )

    generated_path = str(output_path)
    AUDIO_CACHE[cache_key] = generated_path
    return _ready_practice_outputs(
        generated_path,
        target_text,
        language,
        level,
        voice_style,
        source,
        "Voice ready. Listen once, then say it.",
        next_counter,
    )
def build_state(
    target_text: str,
    language: str,
    level: str,
    voice_style: str,
    source: str,
    reference_audio: str,
) -> dict[str, Any]:
    """Bundle the current practice choices into the session-state dictionary.

    The arguments are stored under their own names, and the target sample
    rate is added under ``"sample_rate"``.
    """
    return dict(
        target_text=target_text,
        language=language,
        level=level,
        voice_style=voice_style,
        source=source,
        reference_audio=reference_audio,
        sample_rate=TARGET_SAMPLE_RATE,
    )
def audio_path_from_gradio(value: Any) -> str:
    """Extract an existing file path from an audio value.

    Accepts a plain path string or a dict carrying the path under ``"path"``
    or ``"name"``. Any other value counts as "no recording".

    Raises:
        ValueError: when no path is present or the file does not exist.
    """
    path = ""
    if isinstance(value, str):
        path = value
    elif isinstance(value, dict):
        path = str(value.get("path") or value.get("name") or "")

    if not path:
        raise ValueError("Record your voice first, then press Score.")
    if not Path(path).exists():
        raise ValueError("I could not read that recording. Record once more, then press Score.")
    return path
def suffix_for_mime(mime_type: str) -> str:
    """Map an audio MIME type to a file suffix, defaulting to ``".webm"``.

    Parameters after ``;`` (such as codecs) are ignored, and matching is
    case-insensitive.
    """
    known_suffixes = {
        "audio/webm": ".webm",
        "audio/ogg": ".ogg",
        "audio/oga": ".ogg",
        "audio/mp4": ".m4a",
        "audio/mpeg": ".mp3",
        "audio/wav": ".wav",
        "audio/x-wav": ".wav",
    }
    essence, _, _ = (mime_type or "").partition(";")
    return known_suffixes.get(essence.strip().lower(), ".webm")
def decode_recording_payload(payload: str | None) -> str:
    """Decode a browser recording payload into a mono WAV file and return its path.

    *payload* is a JSON object whose ``dataUrl`` field holds a base64 data
    URL and whose optional ``mimeType`` field overrides the type in the URL.
    The decoded bytes are written to the recordings directory and converted
    with ffmpeg to a single-channel WAV at the target sample rate.

    Raises:
        ValueError: with a user-facing message when the payload is missing,
            malformed, too small, too large, or cannot be converted.
    """
    payload = clean_text(payload, MAX_RECORDING_BYTES * 2)
    if not payload:
        raise ValueError("Record your voice first, then press Score.")
    try:
        data = json.loads(payload)
    except json.JSONDecodeError as exc:
        raise ValueError("The recording data was not readable. Record once more.") from exc
    # Valid JSON that is not an object (list, string, number) has no fields to
    # read; report it like any other unreadable payload instead of crashing.
    if not isinstance(data, dict):
        raise ValueError("The recording data was not readable. Record once more.")

    data_url = str(data.get("dataUrl") or "")
    match = re.match(r"^data:([^;,]+)(?:;[^,]*)?;base64,(.+)$", data_url, flags=re.DOTALL)
    if not match:
        raise ValueError("The recording was incomplete. Record once more.")
    mime_type = str(data.get("mimeType") or match.group(1))
    try:
        raw = base64.b64decode(match.group(2), validate=True)
    except Exception as exc:
        raise ValueError("The recording could not be decoded. Record once more.") from exc
    if len(raw) < 1200:
        raise ValueError("That recording is too small. Record the full line.")
    if len(raw) > MAX_RECORDING_BYTES:
        raise ValueError("That recording is too large. Keep it under 20 seconds.")

    token = uuid.uuid4().hex
    raw_path = RECORDING_DIR / f"attempt_{token}{suffix_for_mime(mime_type)}"
    wav_path = RECORDING_DIR / f"attempt_{token}.wav"
    raw_path.write_bytes(raw)
    command = [
        "ffmpeg",
        "-y",
        "-hide_banner",
        "-loglevel",
        "error",
        "-i",
        str(raw_path),
        "-ac",
        "1",
        "-ar",
        str(TARGET_SAMPLE_RATE),
        str(wav_path),
    ]
    try:
        # The timeout keeps a stuck ffmpeg from blocking the request forever.
        subprocess.run(command, check=True, capture_output=True, text=True, timeout=60)
    except Exception as exc:
        # Do not leave a partial conversion behind.
        wav_path.unlink(missing_ok=True)
        raise ValueError("The recording could not be prepared for scoring. Record once more.") from exc
    finally:
        # The raw upload is only needed as ffmpeg input; remove it either way
        # so recordings do not pile up on disk.
        raw_path.unlink(missing_ok=True)
    return str(wav_path)
def load_audio(path: str, sr: int = TARGET_SAMPLE_RATE) -> tuple[np.ndarray, int]:
    """Load *path* as mono float32 audio at rate *sr*, scaled to a peak of 1.

    Returns:
        ``(samples, sr)``.

    Raises:
        ValueError: when the file has no samples or its peak is below 1e-5.
    """
    decoded, _ = librosa.load(path, sr=sr, mono=True)
    samples = np.asarray(decoded, dtype=np.float32)
    if not samples.size:
        raise ValueError("The audio is empty. Record once more.")
    loudest = float(np.max(np.abs(samples)))
    if loudest < 1e-5:
        raise ValueError("The recording sounds silent. Check the microphone and record again.")
    return samples / loudest, sr
def active_audio_seconds(audio: np.ndarray, sr: int) -> float:
    """Estimate how many seconds of *audio* are above the activity threshold.

    Frame-wise RMS energy is compared with a threshold set to 35% of the
    90th-percentile energy, but never below 0.015. The count of frames above
    it is converted to seconds using the hop length.
    """
    hop = 256
    if audio.size == 0:
        return 0.0
    energy = librosa.feature.rms(y=audio, frame_length=1024, hop_length=hop)[0]
    if energy.size == 0:
        return 0.0
    floor = max(0.015, float(np.percentile(energy, 90)) * 0.35)
    loud_frames = int(np.sum(energy > floor))
    return loud_frames * hop / sr
def resample_vector(values: np.ndarray, length: int) -> np.ndarray:
    """Linearly resample *values* to exactly *length* float32 points.

    Empty input yields zeros. Input that already has the requested length is
    returned after float32 conversion, without interpolation.
    """
    series = np.asarray(values, dtype=np.float32)
    count = series.size
    if count == 0:
        return np.zeros(length, dtype=np.float32)
    if count == length:
        return series
    # Both grids span [0, 1] so the first and last points stay aligned.
    source_grid = np.linspace(0.0, 1.0, count)
    target_grid = np.linspace(0.0, 1.0, length)
    stretched = np.interp(target_grid, source_grid, series)
    return stretched.astype(np.float32)
def safe_correlation(a: np.ndarray, b: np.ndarray) -> float:
    """Return the Pearson correlation of *a* and *b*, or 0.0 when undefined.

    Correlation is treated as undefined when either input has fewer than
    three points or a standard deviation below 1e-6.
    """
    if min(a.size, b.size) < 3:
        return 0.0
    for series in (a, b):
        if float(np.std(series)) < 1e-6:
            return 0.0
    matrix = np.corrcoef(a, b)
    return float(matrix[0, 1])
def feature_score(reference: np.ndarray, attempt: np.ndarray, sr: int) -> tuple[int, float]:
    """Score the similarity of two signals from their MFCC sequences.

    Both signals are reduced to 13 MFCCs normalised along the time axis and
    aligned with cosine-distance DTW. The final accumulated cost divided by
    the larger matrix dimension is the mean cost; a cost of 0 scores 100 and
    0.8 or more scores 0.

    Returns:
        ``(score, mean_cost)``.
    """

    def normalized_mfcc(signal: np.ndarray) -> np.ndarray:
        coefficients = librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=13)
        return librosa.util.normalize(coefficients, axis=1)

    accumulated, _ = librosa.sequence.dtw(
        X=normalized_mfcc(reference),
        Y=normalized_mfcc(attempt),
        metric="cosine",
    )
    mean_cost = float(accumulated[-1, -1] / max(accumulated.shape))
    capped_cost = min(mean_cost, 0.8)
    return clamp_score(100.0 * (1.0 - capped_cost / 0.8)), mean_cost
def rhythm_score(reference: np.ndarray, attempt: np.ndarray, sr: int) -> tuple[int, float]:
    """Score how closely the energy contour of *attempt* follows *reference*.

    Each signal's RMS envelope is scaled by its own maximum and resampled to
    a common length of 16 to 240 points. The score is ``55 + 45 * corr``
    clamped to 0-100, where ``corr`` is the correlation of the two curves.

    Returns:
        ``(score, corr)``.
    """

    def envelope(signal: np.ndarray) -> np.ndarray:
        return librosa.feature.rms(y=signal, frame_length=1024, hop_length=256)[0]

    ref_env = envelope(reference)
    user_env = envelope(attempt)
    points = max(16, min(240, max(ref_env.size, user_env.size)))
    # The small epsilon avoids dividing by zero on an all-silent envelope.
    ref_curve = resample_vector(ref_env / (np.max(ref_env) + 1e-6), points)
    user_curve = resample_vector(user_env / (np.max(user_env) + 1e-6), points)
    corr = safe_correlation(ref_curve, user_curve)
    return clamp_score(55.0 + 45.0 * corr), corr
def melody_score(reference: np.ndarray, attempt: np.ndarray, sr: int) -> tuple[int, float]:
    """Score how closely the pitch contour of *attempt* follows *reference*.

    Pitch is tracked with YIN between 55 and 500 Hz, converted to log scale,
    centred on its median and resampled to a common length of 16 to 240
    points. The score is ``55 + 45 * corr`` clamped to 0-100.

    Returns:
        ``(score, corr)``, or the neutral ``(50, 0.0)`` when pitch analysis
        raises for any reason.
    """

    def centred_log_pitch(signal: np.ndarray) -> np.ndarray:
        pitch = librosa.yin(signal, fmin=55, fmax=500, sr=sr, frame_length=1024, hop_length=256)
        log_pitch = np.log(np.maximum(pitch, 1.0))
        return log_pitch - np.median(log_pitch)

    try:
        ref_contour = centred_log_pitch(reference)
        user_contour = centred_log_pitch(attempt)
        points = max(16, min(240, max(ref_contour.size, user_contour.size)))
        ref_curve = resample_vector(ref_contour, points)
        user_curve = resample_vector(user_contour, points)
        corr = safe_correlation(ref_curve, user_curve)
        return clamp_score(55.0 + 45.0 * corr), corr
    except Exception as exc:
        print(f"Pitch scoring unavailable: {exc}")
        return 50, 0.0
def compare_audio(reference_path: str, attempt_path: str) -> ScoreResult:
    """Compare a learner attempt with the reference audio and score it.

    The attempt is first validated: it must not exceed the maximum length,
    must reach a minimum duration derived from the reference, and must
    contain a minimum amount of active audio. Timing, voice-shape, rhythm
    and melody scores are then computed and blended with weights
    0.40 / 0.25 / 0.22 / 0.13 (voice shape / timing / rhythm / melody).

    Raises:
        ValueError: with a user-facing message when validation fails.
    """
    reference, sr = load_audio(reference_path)
    attempt, _ = load_audio(attempt_path, sr=sr)
    ref_seconds = len(reference) / sr
    attempt_seconds = len(attempt) / sr

    if attempt_seconds > MAX_ATTEMPT_SECONDS:
        raise ValueError("Keep your recording under 20 seconds, then score again.")
    shortest_allowed = max(0.75, min(1.25, ref_seconds * 0.35))
    if attempt_seconds < shortest_allowed:
        raise ValueError(f"The recording is too short ({attempt_seconds:.1f}s). Say the whole line, then score again.")
    spoken_seconds = active_audio_seconds(attempt, sr)
    least_spoken = max(0.45, min(1.0, ref_seconds * 0.22))
    if spoken_seconds < least_spoken:
        raise ValueError("The recording is mostly quiet. Check the microphone, speak the line, then score again.")

    ratio = attempt_seconds / max(ref_seconds, 0.1)
    # A ratio of 1.0 scores 100; a deviation of 0.75 or more scores 0.
    deviation = min(abs(ratio - 1.0), 0.75)
    timing = clamp_score(100.0 * (1.0 - deviation / 0.75))
    voice_shape, mfcc_cost = feature_score(reference, attempt, sr)
    rhythm, rhythm_corr = rhythm_score(reference, attempt, sr)
    melody, melody_corr = melody_score(reference, attempt, sr)
    overall = clamp_score(0.40 * voice_shape + 0.25 * timing + 0.22 * rhythm + 0.13 * melody)

    sub_scores = {
        "voice_shape": voice_shape,
        "timing": timing,
        "rhythm": rhythm,
        "melody": melody,
    }
    evidence = {
        "baseline_score": overall,
        **sub_scores,
        "reference_duration_seconds": round(ref_seconds, 2),
        "attempt_duration_seconds": round(attempt_seconds, 2),
        "active_speech_seconds": round(spoken_seconds, 2),
        "duration_ratio": round(ratio, 3),
        "mfcc_dtw_cost": round(mfcc_cost, 4),
        "rhythm_correlation": round(rhythm_corr, 3),
        "melody_correlation": round(melody_corr, 3),
    }
    return ScoreResult(
        overall=overall,
        **sub_scores,
        reference_duration=ref_seconds,
        attempt_duration=attempt_seconds,
        duration_ratio=ratio,
        evidence=evidence,
    )
def normalize_word(word: str) -> str:
    """Lowercase *word* and keep only ``a-z``, ``0-9`` and the apostrophe."""
    keep = "abcdefghijklmnopqrstuvwxyz0123456789'"
    return "".join(char for char in word.lower() if char in keep)
def word_similarity(a: str, b: str) -> float:
    """Return the ``difflib`` similarity ratio (0.0 to 1.0) of *a* and *b*."""
    matcher = difflib.SequenceMatcher(None, a, b)
    return matcher.ratio()
def align_words(ref_text: str, user_text: str) -> tuple[list[dict[str, Any]], int]:
    """Align the spoken words with the target words and score the match.

    Tokens that normalise to nothing are ignored. Each feedback item has the
    keys ``word``, ``spoken`` and ``status``, where status is ``"matched"``,
    ``"close"`` (similarity of at least 0.68), ``"missed"`` or ``"extra"``.

    Returns:
        ``(feedback, score)``. A matched word counts 1 and a close word 0.45;
        each extra word costs 0.12. The total is divided by the number of
        target words and clamped to 0-100.
    """
    ref_tokens = [token for token in ref_text.split() if normalize_word(token)]
    user_tokens = [token for token in user_text.split() if normalize_word(token)]
    matcher = difflib.SequenceMatcher(
        None,
        [normalize_word(token) for token in ref_tokens],
        [normalize_word(token) for token in user_tokens],
    )

    def entry(word: str, spoken: str, status: str) -> dict[str, Any]:
        return {"word": word, "spoken": spoken, "status": status}

    feedback: list[dict[str, Any]] = []
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        ref_block = ref_tokens[i1:i2]
        user_block = user_tokens[j1:j2]
        if tag == "equal":
            # Equal opcodes always span the same number of tokens on both sides.
            feedback.extend(entry(ref_word, spoken, "matched") for ref_word, spoken in zip(ref_block, user_block))
        elif tag == "replace":
            # Pair the words position by position inside the replaced span.
            for position, ref_word in enumerate(ref_block):
                spoken = user_block[position] if position < len(user_block) else ""
                if spoken:
                    similarity = word_similarity(normalize_word(ref_word), normalize_word(spoken))
                else:
                    similarity = 0.0
                status = "close" if similarity >= 0.68 else "missed"
                feedback.append(entry(ref_word, spoken, status))
            # Spoken words left over after pairing are extras.
            feedback.extend(entry("", leftover, "extra") for leftover in user_block[len(ref_block):])
        elif tag == "delete":
            feedback.extend(entry(ref_word, "", "missed") for ref_word in ref_block)
        elif tag == "insert":
            feedback.extend(entry("", spoken, "extra") for spoken in user_block)

    tally = {"matched": 0, "close": 0, "missed": 0, "extra": 0}
    for item in feedback:
        tally[item["status"]] += 1
    target_count = max(1, len(ref_tokens))
    raw = (tally["matched"] + tally["close"] * 0.45) / target_count - tally["extra"] * 0.12 / target_count
    return feedback, clamp_score(raw * 100)
def transcribe_english(path: str) -> str:
    """Transcribe the recording at *path* with the English CTC model.

    Returns the lowercase transcript, whitespace-cleaned and capped at 400
    characters.
    """
    import torch

    processor, model, device = get_english_asr_model()
    samples, _ = load_audio(path)
    features = processor(samples, sampling_rate=TARGET_SAMPLE_RATE, return_tensors="pt", padding=True)
    features = {name: tensor.to(device) for name, tensor in features.items()}
    with torch.inference_mode():
        logits = model(**features).logits
    # Greedy decoding: take the most likely token at each frame.
    token_ids = torch.argmax(logits, dim=-1)
    decoded = processor.batch_decode(token_ids)
    return clean_text(decoded[0].lower(), 400)
def english_word_evidence(language: str, target_text: str, attempt_path: str) -> dict[str, Any]:
    """Collect word-level evidence for an English attempt.

    For any language other than English (compared case-insensitively after
    stripping) a ``"skipped"`` marker is returned and nothing is transcribed.
    Otherwise the attempt is transcribed and aligned with *target_text*; at
    most 24 word feedback items are included.
    """
    is_english = language.strip().lower() == "english"
    if not is_english:
        return {"enabled": False, "status": "skipped"}

    transcript = transcribe_english(attempt_path)
    word_feedback, word_score = align_words(target_text, transcript)
    evidence: dict[str, Any] = {"enabled": True, "status": "ready"}
    evidence["target_text"] = target_text
    evidence["user_transcript"] = transcript
    evidence["word_match_score"] = word_score
    evidence["word_feedback"] = word_feedback[:24]
    return evidence
def judge_prompt(state: dict[str, Any], score: ScoreResult, word_evidence: dict[str, Any]) -> list[dict[str, str]]:
    """Build the chat messages that ask the judge model for a first verdict.

    Returns a system message with the judge's ground rules and a user message
    holding the expected reply schema, the rules, and the evidence serialised
    as indented JSON.
    """
    evidence_payload = {
        "language": state.get("language"),
        "level": state.get("level"),
        "target_text": state.get("target_text"),
        "voice_style": state.get("voice_style"),
        "acoustic_evidence": score.evidence,
        "word_evidence": word_evidence,
    }
    system_text = (
        "You are the visible judge for a short language shadowing practice app. "
        "Use only the supplied evidence. Do not claim this is a validated pronunciation test, "
        "accent detector, fluency exam, or clinical tool. Do not mention model names, providers, "
        "internal feature names, JSON, or hidden implementation details. Return only one JSON object."
    )
    instructions = (
        "Judge this attempt and return only strict JSON. Use these keys and value types:\n"
        "{\n"
        ' "score": <integer from 0 to 100>,\n'
        ' "sub_scores": {"words": <integer>, "timing": <integer>, "rhythm": <integer>, "voice_shape": <integer>},\n'
        ' "short_feedback": <specific friendly sentence about this exact attempt>,\n'
        ' "try_next": [<specific short action>, <specific short action>, <specific short action>],\n'
        ' "next_line": <short next practice line in the same language>\n'
        "}\n"
        "Rules: scores must be integers from 0 to 100. Do not copy the schema text. "
        "Do not use placeholders like short action or one friendly sentence. "
        "Keep every sentence short and useful for a child. "
        "Do not mention skipped, missing, unavailable, or language-specific word tips to the learner. "
        "If English word evidence is ready, use it heavily for the words score. "
        "Audio duration and speech activity already passed validation, so do not set every score to 0. "
        "If word evidence is skipped, still judge timing, rhythm, and voice shape from acoustic evidence.\n\n"
    )
    evidence_json = json.dumps(evidence_payload, ensure_ascii=False, indent=2)
    system_message = {"role": "system", "content": system_text}
    user_message = {"role": "user", "content": f"{instructions}Evidence:\n{evidence_json}"}
    return [system_message, user_message]
def repair_judge_prompt(
    state: dict[str, Any],
    score: ScoreResult,
    word_evidence: dict[str, Any],
    bad_response: str,
    error: str,
) -> list[dict[str, str]]:
    """Build the chat messages that ask the judge model to redo a rejected reply.

    The evidence is the same as in the first prompt, extended with the
    reason the previous reply was rejected and that reply itself (cleaned
    and capped at 900 characters).
    """
    evidence_payload = {
        "language": state.get("language"),
        "level": state.get("level"),
        "target_text": state.get("target_text"),
        "voice_style": state.get("voice_style"),
        "acoustic_evidence": score.evidence,
        "word_evidence": word_evidence,
        "previous_response_problem": error,
        "previous_response": clean_text(bad_response, 900),
    }
    system_text = (
        "You fix one bad judging response for a short language shadowing app. "
        "Use only the supplied evidence. Return only one valid JSON object. "
        "Do not mention model names, providers, JSON, or hidden implementation details in user-facing text."
    )
    instructions = (
        "The previous response was rejected. Return a real judgement now.\n"
        "Required JSON keys: score, sub_scores, short_feedback, try_next, next_line.\n"
        "sub_scores must include words, timing, rhythm, voice_shape.\n"
        "All scores must be integers 0 to 100.\n"
        "short_feedback and try_next must be specific to the evidence, not placeholders.\n"
        "Do not mention skipped, missing, unavailable, or language-specific word tips to the learner.\n"
        "If English word evidence is ready, use the word_match_score heavily for words.\n"
        "Because the attempt passed duration and speech checks, do not return all-zero scores.\n\n"
    )
    evidence_json = json.dumps(evidence_payload, ensure_ascii=False, indent=2)
    system_message = {"role": "system", "content": system_text}
    user_message = {"role": "user", "content": f"{instructions}Evidence:\n{evidence_json}"}
    return [system_message, user_message]
def extract_json_object(text: str) -> dict[str, Any]:
    """Parse the first balanced ``{...}`` object found in *text*.

    A Markdown code fence at the very start or end is stripped first. Braces
    inside JSON string literals are ignored while looking for the closing
    brace, including after escaped quotes.

    Raises:
        ValueError: when there is no ``{``, when the object never closes, or
            (as ``json.JSONDecodeError``) when the span is not valid JSON.
    """
    text = text.strip()
    text = re.sub(r"^```(?:json)?\s*|\s*```$", "", text, flags=re.IGNORECASE | re.DOTALL).strip()
    start = text.find("{")
    if start < 0:
        raise ValueError("No JSON object found")

    depth = 0
    inside_string = False
    skip_next = False
    for position, char in enumerate(text[start:], start=start):
        if inside_string:
            if skip_next:
                # Character following a backslash: part of an escape sequence.
                skip_next = False
            elif char == "\\":
                skip_next = True
            elif char == '"':
                inside_string = False
        elif char == '"':
            inside_string = True
        elif char == "{":
            depth += 1
        elif char == "}":
            depth -= 1
            if not depth:
                return json.loads(text[start : position + 1])
    raise ValueError("JSON object was incomplete")
# Lowercase fragments of the prompt's schema and rule text; a reply containing
# any of them is treated as copied from the prompt.
PLACEHOLDER_SNIPPETS = (
    "one friendly sentence",
    "short action",
    "specific short action",
    "a short next practice line",
    "next practice line in the same language",
    "<integer",
    "<specific",
    "<short",
)


def looks_like_placeholder(value: Any) -> bool:
    """Tell whether *value* is empty or contains a known placeholder snippet.

    The value is lowercased and whitespace-collapsed before matching.
    """
    normalized = re.sub(r"\s+", " ", str(value or "").strip().lower())
    if normalized:
        for snippet in PLACEHOLDER_SNIPPETS:
            if snippet in normalized:
                return True
        return False
    return True
def evidence_quality_error(
    judgement: dict[str, Any],
    score: ScoreResult | None,
    word_evidence: dict[str, Any] | None,
) -> str | None:
    """Return why *judgement* contradicts the evidence, or ``None`` if it does not.

    Nothing is checked when *score* is ``None``. Otherwise three checks run
    in order: all-zero scores despite usable evidence, a words score under 35
    despite an enabled word match of 65 or more, and a timing score under 20
    despite timing evidence of 55 or more.
    """
    if score is None:
        return None

    words = word_evidence or {}
    sub_scores = judgement["sub_scores"]

    def word_match_score() -> int:
        return int(words.get("word_match_score") or 0)

    everything_zero = judgement["score"] == 0 and all(value == 0 for value in sub_scores.values())
    if everything_zero and (score.overall > 0 or word_match_score() > 0):
        return "The judging response returned all-zero scores despite usable evidence."

    word_match = word_match_score()
    if words.get("enabled") and word_match >= 65 and sub_scores["words"] < 35:
        return "The judging response ignored strong English word evidence."

    timing_evidence = int(score.evidence.get("timing") or 0)
    if timing_evidence >= 55 and sub_scores["timing"] < 20:
        return "The judging response ignored usable timing evidence."
    return None
def normalize_judgement(
    raw_text: str,
    fallback_next_line: str,
    score: ScoreResult | None = None,
    word_evidence: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Parse and validate the judge's raw reply into a clean judgement dict.

    The result has the keys ``score``, ``sub_scores``, ``short_feedback``,
    ``try_next`` (up to three steps) and ``next_line``. Scores are clamped to
    0-100 and text fields are cleaned and length-capped. A missing or
    placeholder ``next_line`` is replaced by *fallback_next_line*.

    Raises:
        ValueError: when the reply is unreadable, misses required fields,
            has invalid score values, contains placeholder text, or
            contradicts the supplied evidence.
    """
    try:
        data = extract_json_object(raw_text)
    except Exception as exc:
        raise ValueError("The judging response was not readable. Try Score again.") from exc
    if not isinstance(data, dict):
        raise ValueError("The judging response had the wrong shape. Try Score again.")

    sub_scores = data.get("sub_scores")
    if not isinstance(sub_scores, dict):
        raise ValueError("The judging response missed sub-scores. Try Score again.")
    required = ["words", "timing", "rhythm", "voice_shape"]
    if not all(key in sub_scores for key in required):
        raise ValueError("The judging response missed a score field. Try Score again.")

    steps = data.get("try_next")
    if not (isinstance(steps, list) and steps):
        raise ValueError("The judging response missed next steps. Try Score again.")

    try:
        tidy_steps = []
        for step in steps[:3]:
            tidy = clean_text(step, 120)
            if tidy:
                tidy_steps.append(tidy)
        normalized = {
            "score": clamp_score(float(data.get("score"))),
            "sub_scores": {key: clamp_score(float(sub_scores[key])) for key in required},
            "short_feedback": clean_text(data.get("short_feedback"), 220),
            "try_next": tidy_steps,
            "next_line": clean_text(data.get("next_line"), MAX_TARGET_CHARS) or fallback_next_line,
        }
    except Exception as exc:
        raise ValueError("The judging response used invalid score values. Try Score again.") from exc

    if looks_like_placeholder(normalized["short_feedback"]):
        raise ValueError("The judging response copied placeholder feedback.")
    practice_steps = normalized["try_next"]
    if not practice_steps or any(looks_like_placeholder(step) for step in practice_steps):
        raise ValueError("The judging response copied placeholder practice steps.")
    if looks_like_placeholder(normalized["next_line"]):
        # A placeholder next line is replaceable, so swap it instead of failing.
        normalized["next_line"] = fallback_next_line

    quality_error = evidence_quality_error(normalized, score, word_evidence)
    if quality_error:
        raise ValueError(quality_error)
    return normalized
def judge_with_retry(
    state: dict[str, Any],
    score: ScoreResult,
    word_evidence: dict[str, Any],
    fallback_next_line: str,
) -> tuple[dict[str, Any], str]:
    """Ask the judge for a verdict, allowing one repair attempt.

    If the first reply fails validation, the judge is asked again with the
    rejected reply and the rejection reason.

    Returns:
        ``(judgement, raw_reply)`` for the reply that passed validation.

    Raises:
        ValueError: when the repaired reply is rejected as well.
    """
    first_reply = run_judge(judge_prompt(state, score, word_evidence))
    try:
        judgement = normalize_judgement(first_reply, fallback_next_line, score, word_evidence)
        return judgement, first_reply
    except ValueError as first_error:
        repair_messages = repair_judge_prompt(state, score, word_evidence, first_reply, str(first_error))
        second_reply = run_judge(repair_messages)
        try:
            judgement = normalize_judgement(second_reply, fallback_next_line, score, word_evidence)
            return judgement, second_reply
        except ValueError as second_error:
            raise ValueError("The judge returned unusable feedback. Press Score again.") from second_error
def score_headline(value: int) -> str:
    """Return the headline shown for a score after clamping it to 0-100.

    78 and above is "Great work", 52 and above "Good work", 25 and above
    "Keep going", anything lower "Try again".
    """
    bounded = clamp_score(value)
    tiers = ((78, "Great work"), (52, "Good work"), (25, "Keep going"))
    for floor, headline in tiers:
        if bounded >= floor:
            return headline
    return "Try again"
def score_attempt(
    recording_payload: Any,
    state: dict[str, Any] | None,
) -> tuple[str, str, str, dict[str, Any], str, Any, str]:
    """Score one recorded attempt against the current reference voice.

    Returns seven values, in order: score panel HTML, feedback panel HTML,
    next-step panel HTML, an evidence dict, stepper HTML, an update for the
    Score button, and score status HTML. Problems are reported through the
    same seven slots rather than raised.
    """

    def score_error(message: str) -> tuple[str, str, str, dict[str, Any], str, Any, str]:
        """Fill every output slot with an error view and re-enable the Score button."""
        error_panel = render_error_score(message)
        feedback_html = f"""
        <div class="lpl-feedback-card">
          <h3>Your feedback</h3>
          <p>{html.escape(message)}</p>
        </div>
        """
        next_html = render_next_card("Try this next", "Fix that, then press Score again.")
        stepper_html = render_steps("say")
        button_update = gr.update(interactive=True, value="Score")
        status_html = render_score_status(message)
        return (error_panel, feedback_html, next_html, {}, stepper_html, button_update, status_html)

    if not gpu_available():
        return score_error("Scoring needs GPU hardware. Try again when the Space is on GPU.")
    if not (state and state.get("reference_audio")):
        return score_error("Press Make voice first.")

    try:
        attempt_path = decode_recording_payload(str(recording_payload or ""))
        score = compare_audio(state["reference_audio"], attempt_path)
        word_evidence = english_word_evidence(
            state.get("language", ""),
            state.get("target_text", ""),
            attempt_path,
        )
        fallback_line = language_fallback(state.get("language", "English"))
        judgement, _raw_judgement = judge_with_retry(state, score, word_evidence, fallback_line)
    except Exception as exc:
        # ValueError messages are shown as-is; anything else gets a generic prefix.
        if isinstance(exc, ValueError):
            return score_error(str(exc))
        return score_error(f"Scoring could not finish: {exc}")

    evidence = {
        "state": state,
        "acoustic": score.evidence,
        "words": word_evidence,
        "judge": judgement,
    }
    score_html = render_score_card(judgement, score, word_evidence)
    feedback_html = render_feedback_card(judgement, word_evidence)
    next_html = render_next_card("Try this next", judgement["next_line"])
    stepper_html = render_steps("tips")
    button_update = gr.update(interactive=True, value="Score")
    status_html = render_score_status("Score ready.")
    return (score_html, feedback_html, next_html, evidence, stepper_html, button_update, status_html)
def render_steps(active: str = "pick") -> str:
    """Return the four-step progress strip as HTML.

    The step whose key equals ``active`` gets the extra ``is-active`` class;
    an unknown key simply highlights nothing.
    """
    stages = (
        ("pick", "1", "Pick"),
        ("listen", "2", "Listen"),
        ("say", "3", "Say it"),
        ("tips", "4", "Score it"),
    )

    def stage_markup(key: str, number: str, label: str) -> str:
        highlight = " is-active" if key == active else ""
        return (
            f'<div class="lpl-step{highlight}">'
            f'<div class="lpl-step-number">{number}</div>'
            f'<div class="lpl-step-label">{html.escape(label)}</div>'
            "</div>"
        )

    inner = "".join(stage_markup(*stage) for stage in stages)
    return '<div class="lpl-steps">' + inner + "</div>"
def render_phrase_card(target_text: str, language: str, level: str, voice_style: str) -> str:
    """Return the phrase card: a meta line (language, level, voice style) above the target text.

    All four values are HTML-escaped before being placed in the markup.
    """
    meta_line = " · ".join(html.escape(part) for part in (language, level, voice_style))
    phrase = html.escape(target_text)
    return f"""
    <section class="lpl-phrase">
      <div class="lpl-phrase-meta">{meta_line}</div>
      <div class="lpl-phrase-text">{phrase}</div>
    </section>
    """
def render_initial_phrase() -> str:
    """Return the phrase card shown before any practice line has been chosen."""
    return render_phrase_card(
        target_text="Pick your practice, then press Make voice.",
        language="Ready",
        level="Step 1",
        voice_style="Simple",
    )
def render_status(message: str, loading: bool = False) -> str:
    """Return the status card HTML; ``loading`` adds a spinner and the ``is-loading`` class."""
    if loading:
        spinner = '<span class="lpl-spinner" aria-hidden="true"></span>'
        css_class = "lpl-status-card is-loading"
    else:
        spinner = ""
        css_class = "lpl-status-card"
    text = html.escape(message)
    return f'<div class="{css_class}">{spinner}<span>{text}</span></div>'
def render_loading_phrase(target_text: str, language: str, level: str, voice_style: str) -> str:
    """Return the busy variant of the phrase card, shown while the voice is being made.

    The meta line and the target text are HTML-escaped; the headline and the
    five-bar loader are fixed markup.
    """
    meta_line = " · ".join(html.escape(part) for part in (language, level, voice_style))
    pending_line = html.escape(target_text)
    loader_bars = "<span></span>" * 5
    return f"""
    <section class="lpl-phrase is-loading" aria-busy="true">
      <div class="lpl-phrase-meta">{meta_line}</div>
      <div class="lpl-phrase-text">Making your voice</div>
      <div class="lpl-voice-loader" aria-hidden="true">{loader_bars}</div>
      <div class="lpl-loading-line">{pending_line}</div>
    </section>
    """
def begin_make_voice(
    language: str,
    level: str,
    voice_style: str,
    custom_text: str,
    starter_counter: int,
) -> tuple[str, str, str, str, str, str, Any]:
    """Return the immediate "working" view shown as soon as Make voice is pressed.

    Returns seven values, in order: reference player HTML, loading phrase
    card, status card, stepper, feedback card, next-step card, and an update
    that disables the Make voice button.
    """
    target_text, _unused_second, _unused_third = selected_text(language, level, custom_text, starter_counter)
    player_html = render_reference_player(None, "Making voice", loading=True)
    phrase_html = render_loading_phrase(target_text, language, level, voice_style)
    status_html = render_status("Making voice. This can take a little while the first time.", loading=True)
    stepper_html = render_steps("listen")
    feedback_html = """
    <div class="lpl-feedback-card">
      <h3>Your feedback</h3>
      <p>First we make the reference voice. Then you can listen and record.</p>
    </div>
    """
    next_html = render_next_card("Try this next", "Wait for the voice, then press play.")
    button_update = gr.update(interactive=False, value="Making...")
    return (player_html, phrase_html, status_html, stepper_html, feedback_html, next_html, button_update)
def preview_selection(
    language: str,
    level: str,
    voice_style: str,
    custom_text: str,
    starter_counter: int,
) -> tuple[str, str, str, dict[str, Any], str, str, str, str, str, str]:
    """Return the view shown after a picker changes, before a voice is made.

    Returns ten values, in order: phrase card, reference player, status
    card, an empty state dict, an empty recorder value, empty score panel,
    empty feedback panel, next-step card, stepper, and empty score status.
    """
    target_text, _unused_second, _unused_third = selected_text(language, level, custom_text, starter_counter)
    phrase_html = render_phrase_card(target_text, language, level, voice_style)
    player_html = render_reference_player(None, "Press Make voice to use these choices.")
    status_html = render_status("Choices ready. Press Make voice.")
    cleared_state: dict[str, Any] = {}
    cleared_recording = ""
    score_html = render_empty_score()
    feedback_html = render_empty_feedback()
    next_html = render_next_card("Try this next", "Press Make voice to hear the new choice.")
    stepper_html = render_steps("pick")
    score_status_html = render_score_status()
    return (
        phrase_html,
        player_html,
        status_html,
        cleared_state,
        cleared_recording,
        score_html,
        feedback_html,
        next_html,
        stepper_html,
        score_status_html,
    )
def begin_scoring() -> tuple[str, str, str, Any]:
    """Return the immediate "working" view shown as soon as Score is pressed.

    Returns four values, in order: feedback card, stepper, loading score
    status, and an update that disables the Score button.
    """
    feedback_html = """
    <div class="lpl-feedback-card">
      <h3>Your feedback</h3>
      <p>Checking your voice now...</p>
    </div>
    """
    stepper_html = render_steps("tips")
    status_html = render_score_status("Scoring your voice...", loading=True)
    button_update = gr.update(interactive=False, value="Scoring...")
    return (feedback_html, stepper_html, status_html, button_update)
def render_empty_score() -> str:
    """Return the placeholder shown in the score panel before any attempt is scored."""
    hint = "Your score appears here after you record and press Score."
    return f'<div class="lpl-score-empty">{hint}</div>'
def render_score_status(message: str = "", loading: bool = False) -> str:
    """Return the score status line; an empty message yields an empty container.

    ``loading`` adds a spinner and the ``is-loading`` class. The message is
    HTML-escaped.
    """
    if message:
        if loading:
            spinner = '<span class="lpl-spinner" aria-hidden="true"></span>'
            css_class = "lpl-score-status is-loading"
        else:
            spinner = ""
            css_class = "lpl-score-status"
        text = html.escape(message)
        return f'<div class="{css_class}">{spinner}<span>{text}</span></div>'
    return '<div class="lpl-score-status"></div>'
def render_error_score(message: str) -> str:
    """Return the score panel in its error state, with the escaped message under "Try again"."""
    detail = html.escape(message)
    return f"""
    <div class="lpl-score-empty is-error">
      <strong>Try again</strong>
      <span>{detail}</span>
    </div>
    """
def render_empty_feedback() -> str:
    """Return the feedback card shown before any attempt is scored."""
    guidance = "Listen first. Then record your voice. Then press Score."
    return f"""
    <div class="lpl-feedback-card">
      <h3>Your feedback</h3>
      <p>{guidance}</p>
    </div>
    """
def render_meter(label: str, value: int) -> str:
    """Return one labelled meter row whose bar width is the clamped value in percent."""
    percent = clamp_score(value)
    caption = html.escape(label)
    return f"""
    <div class="lpl-meter">
      <div class="lpl-meter-row"><span>{caption}</span><span>{percent}</span></div>
      <div class="lpl-bar"><div class="lpl-fill" style="width:{percent}%"></div></div>
    </div>
    """
def render_score_card(judgement: dict[str, Any], score: ScoreResult, word_evidence: dict[str, Any]) -> str:
    """Return the score card: overall ring, headline, duration note, and four sub-score meters.

    ``word_evidence`` is accepted but not read by this function.
    """
    sub_scores = judgement["sub_scores"]
    overall = judgement["score"]
    duration_note = html.escape(
        f"Reference {score.reference_duration:.1f}s. Your voice {score.attempt_duration:.1f}s."
    )
    headline = html.escape(score_headline(overall))
    words_meter = render_meter("Words", sub_scores["words"])
    timing_meter = render_meter("Timing", sub_scores["timing"])
    rhythm_meter = render_meter("Rhythm", sub_scores["rhythm"])
    shape_meter = render_meter("Voice shape", sub_scores["voice_shape"])
    return f"""
    <div class="lpl-score-card">
      <div class="lpl-score-top">
        <div class="lpl-ring" style="--score:{overall}">{overall}</div>
        <div class="lpl-score-copy">
          <strong>{headline}</strong>
          <span>{duration_note}</span>
        </div>
      </div>
      {words_meter}
      {timing_meter}
      {rhythm_meter}
      {shape_meter}
    </div>
    """
def render_word_note(word_evidence: dict[str, Any]) -> str:
    """Return the "I heard" note, or an empty string when word evidence is not enabled.

    The transcript is passed through ``clean_text`` with a limit of 160; an
    empty result is replaced by a fixed fallback sentence.
    """
    if not word_evidence.get("enabled"):
        return ""
    heard = clean_text(word_evidence.get("user_transcript"), 160) or "I could not hear clear words."
    return '<div class="lpl-words"><strong>I heard:</strong> ' + html.escape(heard) + "</div>"
def render_feedback_card(judgement: dict[str, Any], word_evidence: dict[str, Any]) -> str:
    """Return the feedback card: short feedback, an ordered list of next steps, and the word note."""
    step_items = "".join(["<li>" + html.escape(step) + "</li>" for step in judgement["try_next"]])
    summary = html.escape(judgement["short_feedback"])
    word_note = render_word_note(word_evidence)
    return f"""
    <div class="lpl-feedback-card">
      <h3>Your feedback</h3>
      <p>{summary}</p>
      <ol>{step_items}</ol>
      {word_note}
    </div>
    """
def render_next_card(title: str, line: str) -> str:
    """Return the next-step card with an escaped title and an escaped line of text."""
    heading = html.escape(title)
    body = html.escape(line)
    return f"""
    <div class="lpl-bottom-next">
      <strong>{heading}</strong>
      <span>{body}</span>
    </div>
    """
# Markup for the in-browser recorder. The data-* attributes (data-start,
# data-stop, data-status, data-level, data-preview) are the hooks that
# RECORDER_JS looks up with querySelector, so renaming one here requires the
# matching change there.
RECORDER_HTML = """
<div class="lpl-native-recorder">
  <div class="lpl-recorder-actions">
    <button type="button" data-start>Record</button>
    <button type="button" data-stop disabled>Stop</button>
    <span class="lpl-recorder-status" data-status>Press Record, say the line, then press Stop.</span>
  </div>
  <div class="lpl-recorder-meter" aria-hidden="true"><div class="lpl-recorder-fill" data-level></div></div>
  <audio data-preview controls preload="metadata" style="display:none"></audio>
</div>
"""
# Browser-side recorder logic, passed to the component as ``js_on_load``.
# It refers to ``element``, ``props`` and ``trigger`` without defining them,
# so those names must be supplied by the host component when the script runs.
#
# Flow: Record asks for the microphone, starts a MediaRecorder and a level
# meter; Stop ends the recording; the finished blob is previewed in the
# <audio> element and published as a JSON string (dataUrl, mimeType,
# durationSeconds, sizeBytes, createdAt) through ``props.value``.
#
# Two fixes over the earlier script:
#   * the blob object URL created for each preview is now revoked before the
#     next recording (previously every re-record leaked one object URL);
#   * a failed blob-to-data-URL conversion is caught, so the status line no
#     longer stays on "Preparing recording..." with nothing published.
RECORDER_JS = """
const startButton = element.querySelector("[data-start]");
const stopButton = element.querySelector("[data-stop]");
const status = element.querySelector("[data-status]");
const level = element.querySelector("[data-level]");
const preview = element.querySelector("[data-preview]");
let stream = null;
let recorder = null;
let chunks = [];
let startedAt = 0;
let audioContext = null;
let analyser = null;
let meterFrame = null;
let previewUrl = null;

function setValue(value) {
  props.value = value;
  trigger("change");
}

function preferredMimeType() {
  const candidates = [
    "audio/webm;codecs=opus",
    "audio/webm",
    "audio/ogg;codecs=opus",
    "audio/ogg",
    "audio/mp4"
  ];
  if (!window.MediaRecorder) return "";
  for (const candidate of candidates) {
    if (MediaRecorder.isTypeSupported(candidate)) return candidate;
  }
  return "";
}

function updateMeter() {
  if (!analyser) return;
  const data = new Uint8Array(analyser.fftSize);
  analyser.getByteTimeDomainData(data);
  let sum = 0;
  for (const sample of data) {
    const centered = (sample - 128) / 128;
    sum += centered * centered;
  }
  const rms = Math.sqrt(sum / data.length);
  level.style.width = Math.min(100, Math.round(rms * 380)) + "%";
  meterFrame = requestAnimationFrame(updateMeter);
}

function cleanup() {
  if (meterFrame) cancelAnimationFrame(meterFrame);
  meterFrame = null;
  if (stream) stream.getTracks().forEach((track) => track.stop());
  stream = null;
  if (audioContext) audioContext.close().catch(() => {});
  audioContext = null;
  analyser = null;
  level.style.width = "0%";
}

function clearPreview() {
  preview.removeAttribute("src");
  preview.style.display = "none";
  if (previewUrl) {
    URL.revokeObjectURL(previewUrl);
    previewUrl = null;
  }
}

function blobToDataUrl(blob) {
  return new Promise((resolve, reject) => {
    const reader = new FileReader();
    reader.onload = () => resolve(reader.result);
    reader.onerror = reject;
    reader.readAsDataURL(blob);
  });
}

startButton.addEventListener("click", async () => {
  try {
    setValue("");
    chunks = [];
    clearPreview();
    stream = await navigator.mediaDevices.getUserMedia({
      audio: {
        echoCancellation: true,
        noiseSuppression: true,
        autoGainControl: true
      }
    });
    audioContext = new (window.AudioContext || window.webkitAudioContext)();
    const source = audioContext.createMediaStreamSource(stream);
    analyser = audioContext.createAnalyser();
    analyser.fftSize = 256;
    source.connect(analyser);
    updateMeter();
    const mimeType = preferredMimeType();
    recorder = new MediaRecorder(stream, mimeType ? { mimeType } : undefined);
    recorder.ondataavailable = (event) => {
      if (event.data && event.data.size > 0) chunks.push(event.data);
    };
    recorder.onstop = async () => {
      const durationSeconds = (performance.now() - startedAt) / 1000;
      const blob = new Blob(chunks, { type: recorder.mimeType || mimeType || "audio/webm" });
      cleanup();
      startButton.disabled = false;
      stopButton.disabled = true;
      if (durationSeconds < 0.7 || blob.size < 1200) {
        status.textContent = "Too short. Press Record and say the whole line.";
        return;
      }
      previewUrl = URL.createObjectURL(blob);
      preview.src = previewUrl;
      preview.style.display = "block";
      let dataUrl = "";
      try {
        dataUrl = await blobToDataUrl(blob);
      } catch (error) {
        clearPreview();
        status.textContent = "Could not prepare the recording. Press Record and try again.";
        return;
      }
      setValue(JSON.stringify({
        dataUrl,
        mimeType: blob.type || "audio/webm",
        durationSeconds,
        sizeBytes: blob.size,
        createdAt: Date.now()
      }));
      status.textContent = `Recorded ${durationSeconds.toFixed(1)}s. Play it back, then Score.`;
    };
    recorder.start();
    startedAt = performance.now();
    startButton.disabled = true;
    stopButton.disabled = false;
    status.textContent = "Recording... speak now.";
  } catch (error) {
    cleanup();
    startButton.disabled = false;
    stopButton.disabled = true;
    status.textContent = `Microphone error: ${error.message || error}`;
  }
});

stopButton.addEventListener("click", () => {
  if (recorder && recorder.state !== "inactive") {
    status.textContent = "Preparing recording...";
    recorder.stop();
  }
});
"""
class NativeRecorder(gr.HTML):
    """``gr.HTML`` subclass preconfigured with the recorder markup and script.

    The component is always built with ``RECORDER_HTML`` as its template,
    ``RECORDER_JS`` as its on-load script and ``container=False``; any other
    keyword arguments are forwarded to ``gr.HTML`` unchanged.
    """

    def __init__(self, value: str = "", **kwargs: Any) -> None:
        """Create the recorder with ``value`` as its initial value."""
        fixed_options: dict[str, Any] = {
            "html_template": RECORDER_HTML,
            "js_on_load": RECORDER_JS,
            "container": False,
        }
        super().__init__(value=value, **fixed_options, **kwargs)

    def api_info(self) -> dict[str, str]:
        """Describe the component's value as a plain string."""
        return dict(type="string")
# Build the UI: a header, the step strip, a three-column row (choices,
# practice, feedback), a footer, and the event wiring for the pickers and the
# two action buttons.
# NOTE(review): the source this was recovered from had lost its indentation;
# the nesting below (in particular generation_status and next_panel sitting
# inside the practice column) is reconstructed — confirm against the layout.
with gr.Blocks(
    title="EchoYard",
    theme=gr.themes.Base(primary_hue="teal", neutral_hue="slate"),
    css=CSS,
) as demo:
    # app_state is written by preview_selection and create_practice_audio and
    # read by score_attempt; score_state receives score_attempt's evidence dict;
    # starter_counter is passed to the text-selection handlers and written back
    # by create_practice_audio.
    app_state = gr.State({})
    score_state = gr.State({})
    starter_counter = gr.State(0)
    gr.HTML(
        """
        <header class="lpl-topbar">
          <div class="lpl-brand">
            <span class="lpl-mark" aria-hidden="true"></span>
            <span>EchoYard</span>
            <span class="lpl-divider" aria-hidden="true"></span>
            <span class="lpl-product">Speak. Echo. Grow.</span>
          </div>
          <div class="lpl-tagline">Tiny listen-and-repeat speaking practice</div>
        </header>
        """
    )
    stepper = gr.HTML(render_steps("pick"))
    with gr.Row(elem_classes=["lpl-layout"]):
        # Left column: what to practise.
        with gr.Column(scale=3, min_width=280, elem_classes=["lpl-choose"]):
            gr.HTML('<h2 class="lpl-card-title">Choose your practice</h2>')
            language = gr.Dropdown(SUPPORTED_LANGUAGES, value="English", label="Language", filterable=True)
            level = gr.Radio(LEVELS, value="A2", label="Level", elem_classes=["lpl-level-radio"])
            voice_style = gr.Radio(
                list(VOICE_STYLES),
                value="Careful",
                label="Voice style",
                elem_classes=["lpl-voice-radio"],
            )
            custom_text = gr.Textbox(
                label="Words to say",
                value="",
                placeholder="Leave blank for a short practice line.",
                lines=3,
                max_lines=3,
                max_length=MAX_TARGET_CHARS,
            )
            generate_btn = gr.Button("Make voice", variant="primary", elem_classes=["lpl-main-btn"])
        # Middle column: the phrase, the reference player, and the recorder.
        with gr.Column(scale=6, min_width=420, elem_classes=["lpl-practice"]):
            phrase_card = gr.HTML(render_initial_phrase())
            with gr.Row(elem_classes=["lpl-media-grid"]):
                with gr.Column(elem_classes=["lpl-audio-panel"]):
                    gr.HTML('<div class="lpl-audio-head">Reference <span>listen first</span></div>')
                    reference_player = gr.HTML(render_reference_player(), container=False)
                with gr.Column(elem_classes=["lpl-audio-panel", "is-record"]):
                    gr.HTML('<div class="lpl-audio-head">Your turn <span>speak now</span></div>')
                    attempt_recorder = NativeRecorder(value="", elem_id="native-recorder")
            generation_status = gr.HTML(render_status("Press Make voice. Then listen, record, and score."))
            next_panel = gr.HTML(render_next_card("Try this next", "Make a voice to begin."))
        # Right column: scoring controls and results.
        with gr.Column(scale=3, min_width=300, elem_classes=["lpl-feedback"]):
            gr.HTML('<h2 class="lpl-card-title">Your feedback</h2>')
            score_btn = gr.Button("Score", variant="primary", elem_classes=["lpl-score-btn"])
            score_status = gr.HTML(render_score_status(), container=False)
            score_panel = gr.HTML(render_empty_score())
            feedback_panel = gr.HTML(render_empty_feedback())
    gr.HTML(
        """
        <div class="lpl-footer" role="contentinfo">
          <strong>Built by @loay for Build Small</strong> with VoxCPM2 + MiniCPM5-1B · Gradio app · No account needed
        </div>
        """
    )
    # Changing any picker re-renders the preview; the output order must match
    # the ten-value tuple returned by preview_selection.
    for picker in (language, level, voice_style, custom_text):
        picker.change(
            fn=preview_selection,
            inputs=[language, level, voice_style, custom_text, starter_counter],
            outputs=[
                phrase_card,
                reference_player,
                generation_status,
                app_state,
                attempt_recorder,
                score_panel,
                feedback_panel,
                next_panel,
                stepper,
                score_status,
            ],
            show_progress="hidden",
            api_visibility="private",
        )
    # Make voice runs in two steps: begin_make_voice paints the loading view,
    # then create_practice_audio runs with a concurrency limit of 1.
    make_voice_start = generate_btn.click(
        fn=begin_make_voice,
        inputs=[language, level, voice_style, custom_text, starter_counter],
        outputs=[reference_player, phrase_card, generation_status, stepper, feedback_panel, next_panel, generate_btn],
        show_progress="hidden",
        api_visibility="private",
    )
    make_voice_start.then(
        fn=create_practice_audio,
        inputs=[language, level, voice_style, custom_text, starter_counter],
        outputs=[
            reference_player,
            phrase_card,
            generation_status,
            app_state,
            stepper,
            attempt_recorder,
            score_panel,
            feedback_panel,
            next_panel,
            starter_counter,
            generate_btn,
        ],
        show_progress="minimal",
        show_progress_on=generate_btn,
        api_visibility="private",
        concurrency_id="voice",
        concurrency_limit=1,
    )
    # Score follows the same two-step pattern: begin_scoring paints the
    # loading view, then score_attempt fills the seven outputs listed below.
    score_start = score_btn.click(
        fn=begin_scoring,
        inputs=None,
        outputs=[feedback_panel, stepper, score_status, score_btn],
        show_progress="hidden",
        api_visibility="private",
    )
    score_start.then(
        fn=score_attempt,
        inputs=[attempt_recorder, app_state],
        outputs=[score_panel, feedback_panel, next_panel, score_state, stepper, score_btn, score_status],
        show_progress="minimal",
        show_progress_on=score_btn,
        api_visibility="private",
        concurrency_id="score",
        concurrency_limit=1,
    )
# Script entry point: enable the queue with a default concurrency limit of 1
# per event, then start the app.
if __name__ == "__main__":
    demo.queue(default_concurrency_limit=1).launch()