Spaces: Paused
| import asyncio | |
| import os | |
| import tempfile | |
| import time | |
| import logging | |
| from fastapi import FastAPI, File, Form, UploadFile, HTTPException | |
| from fastapi.responses import StreamingResponse, JSONResponse | |
# Heavy numeric libraries imported below read OMP_NUM_THREADS at import time;
# an unset, non-numeric, or non-positive value can misconfigure them, so fall
# back to a safe single-thread default in that case.
_omp_val = os.environ.get("OMP_NUM_THREADS")
_omp_is_valid = bool(_omp_val) and _omp_val.isdigit() and int(_omp_val) > 0
if not _omp_is_valid:
    os.environ["OMP_NUM_THREADS"] = "1"
| from auralis import TTS, TTSRequest, AudioPreprocessingConfig | |
# Directory containing this file; used to resolve the bundled voice files.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Default reference voices (these files must be shipped alongside app.py).
DEFAULT_MALE_VOICE = os.path.join(BASE_DIR, "malear.wav")
DEFAULT_FEMALE_VOICE = os.path.join(BASE_DIR, "femalten.wav")

app = FastAPI(title="TTS API", version="1.1.0")

logger = logging.getLogger("uvicorn.error")

# Global model handle: populated once at startup, None while still loading.
tts: TTS | None = None
@app.get("/")
async def root():
    """
    Basic root endpoint so that GET / returns 200 instead of 404.

    Useful for Hugging Face's automatic health/log checks and quick status.
    (The route decorator was missing, leaving this endpoint unreachable.)

    Returns:
        dict: human-readable ``status`` plus a ``model_loaded`` boolean that
        flips to True once the startup hook has finished loading the model.
    """
    is_loaded = tts is not None
    return {
        "status": "Model is ready" if is_loaded else "Model is loading",
        "model_loaded": is_loaded,
    }
@app.on_event("startup")
async def load_model() -> None:
    """
    Load the model once when the application starts.

    Registered as a FastAPI startup hook (previously it was never called, so
    ``tts`` stayed None and every request returned 503).  Model construction
    runs in a worker thread via ``asyncio.to_thread`` — replacing the
    deprecated ``get_event_loop()`` pattern — so the library can freely
    manage its own event loop without conflicting with FastAPI/uvicorn.

    Idempotent: returns immediately if the model is already loaded.
    """
    global tts
    if tts is not None:
        return

    def _init_model() -> TTS:
        # Blocking download/initialization; must not run on the event loop.
        return TTS().from_pretrained(
            "AstraMindAI/xttsv2",
            gpt_model="AstraMindAI/xtts2-gpt",
        )

    tts = await asyncio.to_thread(_init_model)
@app.get("/health")
async def health():
    """
    Simple health check endpoint.

    (The route decorator was missing, leaving this endpoint unreachable.)

    Returns:
        JSONResponse with a ``status`` string and ``model_loaded`` boolean.
    """
    is_loaded = tts is not None
    return JSONResponse(
        {
            "status": "Model is ready" if is_loaded else "Model is loading",
            "model_loaded": is_loaded,
        }
    )
def _normalize_language(language: str) -> str:
    """Map a user-supplied language name to an Auralis language code."""
    lang_name = language.strip().lower()
    if lang_name in {"english", "en", "eng"}:
        return "en"
    if lang_name in {"arabic", "ar", "arb"}:
        return "ar"
    # "auto", empty, and any unknown value fall back to auto-detection so
    # behavior stays predictable for unexpected inputs.
    return "auto"


# Content types accepted for uploaded reference audio; Auralis can read
# various formats, this is only a basic guard.
_ALLOWED_SPEAKER_TYPES = {
    "audio/wav",
    "audio/x-wav",
    "audio/flac",
    "audio/x-flac",
    "audio/mpeg",
    "audio/mp3",
    "audio/ogg",
}


async def _resolve_speaker_path(
    clone_voice: UploadFile | None, gender: str
) -> tuple[str, bool]:
    """
    Pick the reference-voice file for a request.

    Returns:
        (path, is_temp): ``is_temp`` is True when ``path`` is a temporary
        file written from an uploaded clone voice; the caller must delete
        it after generation.

    Raises:
        HTTPException 400: unsupported/empty upload or invalid gender.
        HTTPException 500: unreadable upload or missing bundled voice file.
    """
    if clone_voice is not None:
        if clone_voice.content_type not in _ALLOWED_SPEAKER_TYPES:
            raise HTTPException(
                status_code=400,
                detail=(
                    "Unsupported speaker_file content-type: "
                    f"{clone_voice.content_type}"
                ),
            )
        # Save uploaded speaker file to a temporary path Auralis can use.
        try:
            data = await clone_voice.read()
            if not data:
                raise HTTPException(status_code=400, detail="Empty speaker_file.")
            # NOTE(review): suffix is always .wav even for mp3/flac uploads;
            # Auralis appears to sniff content — confirm if non-WAV cloning fails.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp:
                tmp.write(data)
            return tmp.name, True
        except HTTPException:
            raise
        except Exception as e:
            raise HTTPException(
                status_code=500,
                detail=f"Failed to read speaker_file: {e}",
            ) from e

    # Use default bundled voice based on gender.
    g = gender.lower()
    if g not in {"male", "female"}:
        raise HTTPException(
            status_code=400,
            detail="Invalid gender. Use 'male' or 'female'.",
        )
    path = DEFAULT_MALE_VOICE if g == "male" else DEFAULT_FEMALE_VOICE
    if not os.path.exists(path):
        # This is a deployment/config error; make it clear.
        raise HTTPException(
            status_code=500,
            detail=(
                f"Default reference voice file not found at {path}. "
                "Make sure malear.wav and femalten.wav are present next to app.py."
            ),
        )
    return path, False


@app.post("/tts")
async def tts_endpoint(
    text: str = Form(..., description="Text to synthesize"),
    language: str = Form(
        "English",
        description="Language name, e.g. 'English' or 'Arabic' (case-insensitive).",
    ),
    gender: str = Form(
        "Male",
        description="Used when no clone_voice file is provided: 'Male' or 'Female'.",
    ),
    clone_voice: UploadFile | None = File(
        None,
        description=(
            "Optional reference audio for voice cloning (WAV/FLAC/MP3). "
            "If omitted, a default male/female voice is used."
        ),
    ),
):
    """
    Generate speech from text.

    - If ``clone_voice`` is provided: use it as the reference voice.
    - Otherwise, fall back to bundled default voices (malear.wav / femalten.wav)
      chosen by ``gender``.

    Returns raw WAV audio as the response body, with generation time and
    audio duration reported in X-Generation-Time-ms / X-Audio-Duration-sec
    headers.

    Raises:
        HTTPException 503: model still loading, or CUDA out of memory.
        HTTPException 400: empty text, bad gender, bad/empty speaker upload.
        HTTPException 500: unreadable upload or missing bundled voice file.
    """
    if tts is None:
        raise HTTPException(
            status_code=503,
            detail="Model is still loading, please try again in a few seconds.",
        )
    if not text.strip():
        raise HTTPException(status_code=400, detail="Text must not be empty.")

    lang = _normalize_language(language)
    speaker_path, is_temp_speaker = await _resolve_speaker_path(clone_voice, gender)

    # Build TTSRequest with audio enhancement config.
    request = TTSRequest(
        text=text,
        speaker_files=[speaker_path],
        language=lang,
        audio_config=AudioPreprocessingConfig(
            # Use fixed, sensible defaults; no need to expose as API params.
            normalize=True,
            trim_silence=True,
            enhance_speech=True,
        ),
        # Generation parameters; tweak if needed.
        temperature=0.75,
        top_p=0.85,
        top_k=50,
        stream=False,
    )

    def _generate():
        return tts.generate_speech(request)

    try:
        start = time.perf_counter()
        # Run blocking generation in a worker thread so FastAPI's event loop
        # is not blocked (asyncio.to_thread replaces the deprecated
        # get_event_loop()/run_in_executor pattern).
        output = await asyncio.to_thread(_generate)
        elapsed_ms = int((time.perf_counter() - start) * 1000)
        # Get audio duration information for the client.
        _num_samples, _sr, duration = output.get_info()
        audio_bytes = output.to_bytes()  # WAV bytes
    except RuntimeError as exc:
        # Gracefully surface CUDA OOM errors instead of crashing the app.
        message = str(exc)
        if "CUDA out of memory" in message:
            raise HTTPException(
                status_code=503,
                detail="CUDA out of memory on the Space GPU. Try shorter text, shorter speaker audio, or fewer concurrent requests.",
            )
        raise
    finally:
        # Cleanup temp file used for cloning (if any).
        if is_temp_speaker and os.path.isfile(speaker_path):
            try:
                os.remove(speaker_path)
            except OSError:
                pass

    logger.info(
        "Generated audio in %.3f seconds (duration=%.3f sec)",
        elapsed_ms / 1000.0,
        duration,
    )
    return StreamingResponse(
        iter([audio_bytes]),
        media_type="audio/wav",
        headers={
            "Content-Disposition": 'attachment; filename="output.wav"',
            "X-Generation-Time-ms": str(elapsed_ms),
            "X-Audio-Duration-sec": f"{duration:.3f}",
        },
    )
if __name__ == "__main__":
    # Local/dev entry point; Hugging Face Spaces provides PORT (7860 default).
    import uvicorn

    port = int(os.getenv("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=port)