Spaces:
Sleeping
Sleeping
| import base64 | |
| import json | |
| import os | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import gradio as gr | |
| import requests | |
# --- Remote API configuration (environment-driven) --------------------------

# Synthesis endpoint; READLOVER_SYNTH_URL overrides the built-in address.
DEFAULT_SYNTH_URL = os.getenv(
    "READLOVER_SYNTH_URL",
    "https://api.readlover.app/v1/synthesize/json",
)

# Credential sent with synthesis requests. "API" takes precedence over
# "READLOVER_USER_ID"; the result is an empty string when neither is set.
API_SECRET = (os.getenv("API") or os.getenv("READLOVER_USER_ID") or "").strip()

# Timeout passed to the synthesis request, read from READLOVER_TIMEOUT.
DEFAULT_TIMEOUT = int(os.getenv("READLOVER_TIMEOUT", "300"))

# --- Fixed settings ----------------------------------------------------------

# Value sent as "n_steps" in every synthesis payload.
DEFAULT_N_STEPS = 4

# Speaker ids that must never be listed or requested.
BLOCKED_SPEAKER_IDS = {27}

# Language definitions shipped next to this module (used as a fallback).
LANGUAGES_PATH = Path(__file__).resolve().parent / "languages.json"
| def _base_url_from_synth_url(synth_url: str) -> str: | |
| synth_url = synth_url.strip().rstrip("/") | |
| suffix = "/v1/synthesize/json" | |
| if synth_url.endswith(suffix): | |
| return synth_url[: -len(suffix)] | |
| if synth_url.endswith("/v1/synthesize"): | |
| return synth_url[: -len("/v1/synthesize")] | |
| return synth_url | |
| def _int_or_none(value: Any) -> Optional[int]: | |
| if value is None: | |
| return None | |
| try: | |
| return int(value) | |
| except (TypeError, ValueError): | |
| return None | |
def _is_blocked_speaker_id(value: Any) -> bool:
    """Tell whether ``value`` denotes a speaker id in ``BLOCKED_SPEAKER_IDS``.

    The value is first normalised with ``_int_or_none``; anything that does
    not convert to an integer is reported as not blocked.
    """
    return _int_or_none(value) in BLOCKED_SPEAKER_IDS
| def _auth_headers(api_key: str) -> Dict[str, str]: | |
| return { | |
| "X-User-ID": api_key, | |
| "X-API-Key": api_key, | |
| "Authorization": f"Bearer {api_key}", | |
| } | |
def _load_local_languages() -> Dict[str, Any]:
    """Read and parse the JSON document stored at ``LANGUAGES_PATH`` (UTF-8)."""
    with LANGUAGES_PATH.open("r", encoding="utf-8") as handle:
        payload = json.load(handle)
    return payload
def _metadata_from_languages(languages_payload: Dict[str, Any]) -> Dict[str, Any]:
    """Turn a languages mapping into the metadata structure the catalog uses.

    Each key of ``languages_payload`` is a language key; its value must carry
    an ``id`` and may carry a ``display`` name and a ``speakers`` mapping
    keyed by speaker id. Blocked speaker ids are left out.

    Returns:
        A dict with fixed ``defaults`` plus ``languages`` and ``voices``
        lists, each sorted by its ``id``.
    """
    language_entries: List[Dict[str, Any]] = []
    voice_entries: List[Dict[str, Any]] = []

    for key, spec in languages_payload.items():
        allowed_ids: List[int] = []
        for raw_id in spec.get("speakers", {}).keys():
            if _is_blocked_speaker_id(raw_id):
                continue
            allowed_ids.append(int(raw_id))
        allowed_ids.sort()

        # Fall back to the title-cased key when no display name is given.
        display_name = str(spec.get("display") or key.title())
        language_id = int(spec["id"])

        language_entries.append(
            {
                "id": language_id,
                "code": key,
                "name": display_name,
                "speaker_ids": allowed_ids,
                "speaker_names": [f"Speaker {sid}" for sid in allowed_ids],
            }
        )
        # allowed_ids is already free of blocked ids, so every id yields a voice.
        voice_entries.extend(
            {
                "id": sid,
                "name": f"{display_name} Speaker {sid}",
                "language_id": language_id,
                "language_name": display_name,
                "language_key": key,
            }
            for sid in allowed_ids
        )

    language_entries.sort(key=lambda entry: entry["id"])
    voice_entries.sort(key=lambda entry: entry["id"])
    return {
        "defaults": {
            "speaker_id": 0,
            "language_id": 0,
            "preset": "neutral",
        },
        "languages": language_entries,
        "voices": voice_entries,
    }
def _fallback_metadata() -> Dict[str, Any]:
    """Produce metadata without contacting the API.

    The local languages file is preferred. If loading or converting it fails
    for any reason, a built-in English-only definition with speakers 0-8
    (minus blocked ids) is used instead.
    """
    try:
        return _metadata_from_languages(_load_local_languages())
    except Exception:
        # Deliberately broad: this is the last resort, so any problem with
        # the local file just switches to the built-in definition.
        builtin_speakers = {}
        for index in range(9):
            if index in BLOCKED_SPEAKER_IDS:
                continue
            builtin_speakers[str(index)] = {"quality_id": index}
        builtin_payload = {
            "english": {
                "id": 0,
                "display": "English",
                "espeak": "en-us",
                "speakers": builtin_speakers,
            }
        }
        return _metadata_from_languages(builtin_payload)
def fetch_metadata(synth_url: str) -> Tuple[Dict[str, Any], str]:
    """Fetch API metadata, falling back to local data on any failure.

    The metadata URL is ``<base>/v1/metadata`` where ``<base>`` is derived
    from ``synth_url`` by ``_base_url_from_synth_url``.

    Returns:
        ``(metadata, status)`` where ``status`` is a human-readable message
        saying whether the remote metadata or the local fallback was used.
    """
    metadata_url = f"{_base_url_from_synth_url(synth_url)}/v1/metadata"
    try:
        response = requests.get(metadata_url, timeout=20)
        response.raise_for_status()
        metadata = response.json()
        # Callers treat the result as a mapping; a JSON array or scalar must
        # trigger the fallback here instead of crashing them later.
        if not isinstance(metadata, dict):
            raise ValueError(
                f"expected a JSON object, got {type(metadata).__name__}"
            )
    except Exception as exc:
        # Best-effort by design: any network, HTTP or parsing problem
        # degrades to the local fallback and is reported in the status.
        return _fallback_metadata(), f"Using local language fallback. Metadata error: {exc}"
    return metadata, f"Loaded metadata from {metadata_url}"
def build_catalog(metadata: Dict[str, Any]) -> Dict[str, Any]:
    """Index metadata into the lookup tables used by the UI callbacks.

    Args:
        metadata: Mapping with optional ``languages``, ``voices`` and
            ``defaults`` entries.

    Returns:
        A dict with ``language_labels`` (ordered by language id),
        ``language_by_label``, ``speakers_by_language`` (label -> sorted
        speaker id strings), ``speaker_by_label`` (speaker id string ->
        voice), ``default_language`` and ``default_speaker``.
    """

    def _by_id(item: Dict[str, Any]) -> int:
        return int(item.get("id", 0))

    languages = sorted(metadata.get("languages") or [], key=_by_id)
    # Blocked voices are removed once here, so every voice seen below is allowed.
    voices = sorted(
        (
            candidate
            for candidate in metadata.get("voices") or []
            if not _is_blocked_speaker_id(candidate.get("id"))
        ),
        key=_by_id,
    )

    language_labels: List[str] = []
    language_by_label: Dict[str, Dict[str, Any]] = {}
    speakers_by_language: Dict[str, List[str]] = {}
    speaker_by_label: Dict[str, Dict[str, Any]] = {}

    for entry in languages:
        language_id = int(entry["id"])
        display = str(entry.get("name") or entry.get("code") or language_id)
        label = f"{display} ({language_id})"
        language_labels.append(label)
        language_by_label[label] = entry

        listed_ids = set()
        for raw_id in entry.get("speaker_ids", []):
            if not _is_blocked_speaker_id(raw_id):
                listed_ids.add(int(raw_id))

        # A voice belongs to the language when the language lists its id or
        # when the voice itself names this language.
        matched_ids = set()
        for voice in voices:
            belongs = (
                _int_or_none(voice.get("id")) in listed_ids
                or _int_or_none(voice.get("language_id")) == language_id
            )
            if not belongs:
                continue
            voice_id = int(voice["id"])
            matched_ids.add(voice_id)
            speaker_by_label[str(voice_id)] = voice

        speakers_by_language[label] = [str(sid) for sid in sorted(matched_ids)]

    defaults = metadata.get("defaults") or {}
    wanted_language_id = int(defaults.get("language_id", 0))
    wanted_speaker_id = int(defaults.get("speaker_id", 0))

    # Prefer the language named in the defaults, else the first one listed.
    default_language = language_labels[0] if language_labels else None
    for label, entry in language_by_label.items():
        if int(entry["id"]) == wanted_language_id:
            default_language = label
            break

    if default_language:
        default_speakers = speakers_by_language.get(default_language, [])
    else:
        default_speakers = []

    # Prefer the speaker named in the defaults when it is allowed and
    # available for the default language, else the first available speaker.
    default_speaker = default_speakers[0] if default_speakers else None
    wanted_label = str(wanted_speaker_id)
    if not _is_blocked_speaker_id(wanted_speaker_id) and wanted_label in default_speakers:
        default_speaker = wanted_label

    return {
        "language_labels": language_labels,
        "language_by_label": language_by_label,
        "speakers_by_language": speakers_by_language,
        "speaker_by_label": speaker_by_label,
        "default_language": default_language,
        "default_speaker": default_speaker,
    }
def load_catalog(synth_url: str):
    """Fetch metadata for ``synth_url`` and prepare the dropdown updates.

    Returns:
        ``(catalog, status, language_update, speaker_update)`` where the two
        updates set the choices and selected value of the language and
        speaker dropdowns to the catalog defaults.
    """
    metadata, status = fetch_metadata(synth_url)
    catalog = build_catalog(metadata)

    selected_language = catalog["default_language"]
    language_update = gr.update(
        choices=catalog["language_labels"],
        value=selected_language,
    )
    speaker_update = gr.update(
        choices=catalog["speakers_by_language"].get(selected_language, []),
        value=catalog["default_speaker"],
    )
    return catalog, status, language_update, speaker_update
def load_default_catalog():
    """Load the catalog for ``DEFAULT_SYNTH_URL``, discarding the status text.

    Returns:
        ``(catalog, language_update, speaker_update)``.
    """
    catalog, _ignored_status, language_update, speaker_update = load_catalog(
        DEFAULT_SYNTH_URL
    )
    return (catalog, language_update, speaker_update)
def update_speakers(language_label: str, catalog: Dict[str, Any]):
    """Return a dropdown update listing the speakers of ``language_label``.

    Blocked speaker ids are removed from the choices. The first remaining
    choice is selected, or nothing when the list is empty. A missing or
    empty ``catalog`` yields an empty choice list.
    """
    by_language = (catalog or {}).get("speakers_by_language", {})
    allowed = []
    for candidate in by_language.get(language_label, []):
        if not _is_blocked_speaker_id(candidate):
            allowed.append(candidate)
    first_choice = allowed[0] if allowed else None
    return gr.update(choices=allowed, value=first_choice)
def synthesize(
    text: str,
    language_label: str,
    speaker_id_label: str,
    preset: str,
    speech_temperature: float,
    duration_length: float,
    pace: float,
    cfg_strength: float,
    catalog: Dict[str, Any],
) -> Tuple[Optional[str], str]:
    """Send ``text`` to the synthesis API and store the returned audio.

    Args:
        text: Text to synthesize; must contain non-whitespace characters.
        language_label: Key into the catalog's ``language_by_label``.
        speaker_id_label: Key into the catalog's ``speaker_by_label``.
        preset: Sent unchanged as ``preset``.
        speech_temperature: Sent as ``temperature``.
        duration_length: Sent as ``length_scale``.
        pace: Sent as ``space_duration_scale``.
        cfg_strength: Sent as ``cfg_strength``.
        catalog: Catalog built by ``build_catalog``; rebuilt from
            ``DEFAULT_SYNTH_URL`` when empty.

    Returns:
        ``(audio_path, info)``: the path of a temporary file holding the
        decoded audio and a multi-line summary of the response and request.

    Raises:
        gr.Error: On invalid input, a missing API secret, a blocked or
            unknown speaker/language, a failed request, an API error
            status, or a malformed API response.
    """
    if not text or not text.strip():
        raise gr.Error("Wpisz tekst do syntezy.")
    if not API_SECRET:
        raise gr.Error("Brakuje sekretu API. Na Hugging Face dodaj secret o nazwie API.")
    if _is_blocked_speaker_id(speaker_id_label):
        raise gr.Error("Speaker ID 27 jest niedostępny.")

    if not catalog:
        metadata, _ = fetch_metadata(DEFAULT_SYNTH_URL)
        catalog = build_catalog(metadata)

    language = catalog.get("language_by_label", {}).get(language_label)
    speaker = catalog.get("speaker_by_label", {}).get(str(speaker_id_label))
    if language is None:
        raise gr.Error("Nieznany language.")
    if speaker is None:
        raise gr.Error("Nieznany speaker_id.")
    # Re-check against the resolved voice in case the label and id disagree.
    if _is_blocked_speaker_id(speaker.get("id")):
        raise gr.Error("Speaker ID 27 jest niedostępny.")

    payload = {
        "text": text.strip(),
        "speaker_id": int(speaker["id"]),
        "language_id": int(language["id"]),
        "espeak_language": language.get("espeak_language"),
        "preset": preset,
        "temperature": float(speech_temperature),
        "length_scale": float(duration_length),
        "space_duration_scale": float(pace),
        "cfg_strength": float(cfg_strength),
        "n_steps": DEFAULT_N_STEPS,
    }

    try:
        response = requests.post(
            DEFAULT_SYNTH_URL,
            json=payload,
            headers=_auth_headers(API_SECRET),
            timeout=DEFAULT_TIMEOUT,
        )
    except requests.RequestException as exc:
        # Timeouts and connection failures become a UI error, not a traceback.
        raise gr.Error(f"API request failed: {exc}") from exc

    if not response.ok:
        try:
            detail = response.json().get("detail", response.text)
        except Exception:
            # Error bodies are not guaranteed to be JSON objects.
            detail = response.text
        raise gr.Error(f"API error {response.status_code}: {detail}")

    try:
        data = response.json()
    except ValueError as exc:
        raise gr.Error("API response is not valid JSON.") from exc
    if not isinstance(data, dict):
        raise gr.Error("API response is not a JSON object.")

    audio_base64 = data.get("audio_base64")
    if not audio_base64:
        raise gr.Error("API response does not contain audio_base64.")
    try:
        audio_bytes = base64.b64decode(audio_base64)
    except (TypeError, ValueError) as exc:
        # binascii.Error (invalid base64) is a ValueError subclass.
        raise gr.Error("API response contains invalid audio_base64.") from exc

    audio_extension = data.get("audio_extension") or "wav"
    # delete=False keeps the file after closing so its path can be returned;
    # the context manager closes the handle even if the write fails.
    with tempfile.NamedTemporaryFile(
        delete=False,
        suffix=f".{audio_extension}",
    ) as temp_file:
        temp_file.write(audio_bytes)
        audio_path = temp_file.name

    info = (
        f"speaker_id: {data.get('speaker_id')}\n"
        f"language_id: {data.get('language_id')}\n"
        f"espeak_language: {data.get('espeak_language')}\n"
        f"preset: {data.get('preset')}\n"
        f"sample_rate: {data.get('sample_rate')}\n"
        f"audio_seconds: {data.get('audio_seconds')}\n"
        f"inference_seconds: {data.get('inference_seconds')}\n"
        f"rtf: {data.get('rtf')}\n"
        f"sentence_count: {data.get('sentence_count')}\n"
        f"temperature: {payload['temperature']}\n"
        f"duration_length: {payload['length_scale']}\n"
        f"pace: {payload['space_duration_scale']}\n"
        f"cfg_strength: {payload['cfg_strength']}\n"
        f"n_steps: {payload['n_steps']}"
    )
    return audio_path, info
def build_app() -> gr.Blocks:
    """Assemble the Gradio interface and wire its events.

    The page holds an architecture note, a text box, a row of dropdowns
    (language, speaker, preset), a row of sliders (temperature, duration,
    pace, CFG strength), a synthesize button and the audio/info outputs.
    The catalog is loaded on page load, the speaker list follows the
    selected language, and the button calls ``synthesize``.
    """
    architecture_notes = """
## Model architecture
SlopTTS is an experimental neural TTS system with an eSpeak-based phonemization frontend, contextual text encoder, and an **adversarial flow-matching acoustic predictor** operating in a **VAE-style latent space**.
The predictor estimates phoneme durations and acoustic latents, which are decoded into waveform audio by a neural vocoder. Text is processed sentence by sentence with neighboring-context conditioning for smoother prosody across sentence boundaries.
This model lacks generalization due to a small amount of data and computation. The model was trained using random datasets found online.
**Note:** This model is not optimized for fast inference yet.
"""
    sample_text = '''In an old land called Eldoria, where the mountains glowed silver beneath the moon and forests whispered forgotten names, there stood a ruined tower on the edge of the Blackwood. No one had entered it for a hundred years.
The people of the nearby village believed the tower was cursed. At night, a pale blue light flickered from its broken windows, and strange music drifted across the fields like a memory no one could place.'''

    with gr.Blocks(title="ReadLover TTS API") as demo:
        # Holds the catalog dict between callbacks; filled on page load.
        catalog_state = gr.State({})
        gr.Markdown(architecture_notes)
        text = gr.Textbox(label="Text", lines=8, value=sample_text)

        # NOTE(review): which components sit inside each Row is inferred
        # from declaration order — confirm against the deployed layout.
        with gr.Row():
            # Choices start empty and are populated by load_default_catalog.
            language = gr.Dropdown(label="Language", choices=[], value=None)
            speaker_id = gr.Dropdown(label="Speaker ID", choices=[], value=None)
            preset = gr.Dropdown(
                label="Preset",
                choices=["neutral", "expressive"],
                value="neutral",
            )

        with gr.Row():
            speech_temperature = gr.Slider(
                label="Speech temperature",
                minimum=0.1,
                maximum=2.0,
                step=0.01,
                value=0.65,
            )
            duration_length = gr.Slider(
                label="Duration length",
                minimum=0.5,
                maximum=2.5,
                step=0.01,
                value=1.0,
            )
            pace = gr.Slider(
                label="Pace",
                minimum=0.1,
                maximum=5.0,
                step=0.01,
                value=1.0,
            )
            cfg_strength = gr.Slider(
                label="CFG strength",
                minimum=1.0,
                maximum=5.0,
                step=0.1,
                value=1.0,
            )

        synth_button = gr.Button("Synthesize", variant="primary")
        audio = gr.Audio(label="Audio", type="filepath")
        info = gr.Textbox(label="Response info", lines=12)

        # Order must match the positional parameters of synthesize().
        synth_inputs = [
            text,
            language,
            speaker_id,
            preset,
            speech_temperature,
            duration_length,
            pace,
            cfg_strength,
            catalog_state,
        ]

        demo.load(
            load_default_catalog,
            inputs=None,
            outputs=[catalog_state, language, speaker_id],
        )
        language.change(
            update_speakers,
            inputs=[language, catalog_state],
            outputs=[speaker_id],
        )
        synth_button.click(
            synthesize,
            inputs=synth_inputs,
            outputs=[audio, info],
        )

    return demo
if __name__ == "__main__":
    # Script entry point: build the UI, then serve it on all interfaces at
    # the port given by the PORT environment variable (7860 when unset).
    app = build_app()
    listen_port = int(os.getenv("PORT", "7860"))
    app.launch(server_name="0.0.0.0", server_port=listen_port)