Spaces:
Running
Running
| import base64 | |
| import os | |
| import tempfile | |
| import uuid | |
| from pathlib import Path | |
| from threading import Lock | |
| from typing import Dict, Optional | |
| import requests | |
| import torch | |
| import torchaudio | |
| from torchaudio.transforms import Resample | |
| from fastapi import BackgroundTasks, Body, FastAPI, Header, HTTPException | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from pydantic import BaseModel, Field, HttpUrl | |
# Environment configuration
# Optional shared secret; when it is unset or empty, _require_api_key skips the check.
SPACE_API_KEY = os.getenv("SPACE_API_KEY")
# First non-empty value among the three token variables, tried in this order.
HF_TOKEN = (
    os.getenv("HUGGING_FACE_HUB_TOKEN")
    or os.getenv("HUGGINGFACEHUB_API_TOKEN")
    or os.getenv("HF_TOKEN")
)
# Model configuration
MODEL_REPO = "IndexTeam/IndexTTS-2"
# Directory that holds the checkpoints; overridable through the MODEL_DIR variable.
MODEL_DIR = os.getenv("MODEL_DIR", "/data/indextts2")
# Upper bound applied to GenerateRequest.text.
MAX_TEXT_LENGTH = 1000
DEFAULT_LANGUAGE = "en"
# Reported by the health endpoint; the visible model constructor call does not read it.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# Job management
# In-memory job table keyed by job id; every access goes through JOB_LOCK.
JOBS: Dict[str, Dict[str, str]] = {}
JOB_LOCK = Lock()
# Set token in environment before importing
if HF_TOKEN:
    # Publish the token under both variable names in one step.
    os.environ.update(
        {
            "HUGGING_FACE_HUB_TOKEN": HF_TOKEN,
            "HF_TOKEN": HF_TOKEN,
        }
    )
    try:
        from huggingface_hub import login

        login(token=HF_TOKEN, add_to_git_credential=False)
    except ImportError:
        # A missing huggingface_hub package is tolerated at this step; the
        # environment variables set above stay in place.
        pass
# Download model checkpoints from Hugging Face
os.makedirs(MODEL_DIR, exist_ok=True)
try:
    from huggingface_hub import snapshot_download

    # The presence of config.yaml is the marker for "already downloaded".
    if not (Path(MODEL_DIR) / "config.yaml").exists():
        print(f"Downloading IndexTTS2 model from {MODEL_REPO}...")
        snapshot_download(repo_id=MODEL_REPO, local_dir=MODEL_DIR, token=HF_TOKEN)
        print("Model download complete.")
except Exception as download_error:
    # Best effort only: report the problem and let the load step decide.
    print(f"Warning: Could not download model: {download_error}")
    # Continue anyway - model might already be present
# Initialize IndexTTS2
# Any failure in this section is re-raised as RuntimeError, so the module does
# not finish importing without a loaded model.
try:
    from indextts.infer_v2 import IndexTTS2
    cfg_path = os.path.join(MODEL_DIR, "config.yaml")
    # Fail with an explicit message when no config file is present in MODEL_DIR.
    if not Path(cfg_path).exists():
        raise FileNotFoundError(
            f"Config file not found at {cfg_path}. Model may not be downloaded."
        )
    # NOTE(review): the flags below are hard-coded for CPU and do not consult
    # DEVICE — confirm this is intended when CUDA is available.
    tts_model = IndexTTS2(
        cfg_path=cfg_path,
        model_dir=MODEL_DIR,
        use_fp16=False,  # CPU doesn't support FP16
        use_cuda_kernel=False,  # CPU mode
        use_deepspeed=False,  # CPU mode
    )
    print("IndexTTS2 model loaded successfully.")
except Exception as exc:
    # Keep the original exception chained for debugging.
    raise RuntimeError(f"Failed to load IndexTTS2 model: {exc}") from exc
# Initialize FastAPI app
# NOTE(review): no route decorators are visible on the handler functions in this
# view of the file — confirm how the handlers are registered on `app`.
app = FastAPI(title="indextts2-api", version="1.0.0")
class GenerateRequest(BaseModel):
    # Request body for a speech-generation job. Documented with comments rather
    # than a docstring so the generated schema is left exactly as it was.

    # Text to synthesize: required, between 1 and MAX_TEXT_LENGTH characters.
    text: str = Field(
        ...,
        min_length=1,
        max_length=MAX_TEXT_LENGTH,
    )
    # Reference voice: required, either a URL or base64-encoded audio.
    speaker_wav: str = Field(
        ...,
        description="HTTPS URL or base64-encoded audio",
    )
    # Optional language code; falls back to DEFAULT_LANGUAGE.
    language: Optional[str] = Field(
        default=DEFAULT_LANGUAGE,
        description="ISO code, default en",
    )
def _require_api_key(x_api_key: Optional[str]):
    """Reject the request unless it carries the configured API key.

    When ``SPACE_API_KEY`` is unset or empty the check is disabled and every
    request is accepted.

    Args:
        x_api_key: Key supplied by the caller, or ``None`` when none was sent.

    Raises:
        HTTPException: 401 when a key is configured and the supplied value is
            missing or different.
    """
    # Local import keeps this block self-contained.
    import hmac

    if not SPACE_API_KEY:
        return

    # Compare in constant time so response timing does not reveal how many
    # leading characters of a guess were correct. Comparing bytes avoids the
    # ASCII-only restriction compare_digest places on str arguments.
    supplied = (x_api_key or "").encode("utf-8")
    expected = SPACE_API_KEY.encode("utf-8")
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Unauthorized")
def _write_temp_audio_from_url(url: HttpUrl) -> str:
    """Download the audio at ``url`` into a new temporary file.

    Args:
        url: Address of the audio file. It is converted with ``str()`` before
            use, so a plain string is accepted as well as an ``HttpUrl``.

    Returns:
        Path of the temporary file holding the downloaded bytes. The file is
        created with ``delete=False``; removing it is the caller's job.

    Raises:
        HTTPException: 400 when the server answers with a status code >= 400.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import urlparse

    address = str(url)
    # Take the extension from the URL path only (query string and fragment are
    # ignored) and fall back to ".wav" when there is no path or no extension.
    suffix = Path(urlparse(address).path).suffix or ".wav"

    # Using the response as a context manager releases the streamed connection
    # on every exit path, including the error branches below.
    with requests.get(address, stream=True, timeout=30) as response:
        if response.status_code >= 400:
            raise HTTPException(
                status_code=400,
                detail=f"Could not fetch speaker audio: {response.status_code}"
            )
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            try:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        tmp.write(chunk)
            except Exception:
                # Do not leave a partial download behind; close first so the
                # removal also works on platforms that lock open files.
                tmp.close()
                Path(tmp.name).unlink(missing_ok=True)
                raise
    return tmp.name
| def _write_temp_audio_from_base64(payload: str) -> str: | |
| """Decode base64 audio to temporary file.""" | |
| try: | |
| raw = base64.b64decode(payload) | |
| except Exception as exc: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Invalid base64 speaker_wav" | |
| ) from exc | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp: | |
| tmp.write(raw) | |
| return tmp.name | |
def _temp_speaker_file(speaker_wav: str) -> str:
    """Materialize the speaker reference as a temporary file and return its path.

    Values starting with ``http://`` or ``https://`` are downloaded; anything
    else is treated as base64-encoded audio.
    """
    is_remote = speaker_wav.startswith(("http://", "https://"))
    if not is_remote:
        return _write_temp_audio_from_base64(speaker_wav)
    return _write_temp_audio_from_url(HttpUrl(speaker_wav))
def _preprocess_audio_wav(
    path: str,
    target_sr: int = 24000,
    target_peak: float = 0.98
) -> str:
    """Normalize an audio file to mono 16-bit WAV at ``target_sr``.

    Steps:
    - down-mix multi-channel audio to mono by averaging the channels
    - resample to ``target_sr`` when the source rate differs
    - scale the signal down so its peak does not exceed ``target_peak``;
      audio that is already quieter is left unchanged (never amplified)

    Args:
        path: Audio file to process.
        target_sr: Sample rate of the written file.
        target_peak: Largest absolute sample value allowed in the output.

    Returns:
        Path of the processed file. This is ``path`` itself when it already
        ends in ``.wav``; otherwise it is the same path with a ``.wav`` suffix
        and the original file is removed. Callers must use the returned path.
    """
    wav, sr = torchaudio.load(path)

    # Convert to mono
    if wav.shape[0] > 1:
        wav = wav.mean(dim=0, keepdim=True)

    # Resample if needed
    if sr != target_sr:
        wav = Resample(orig_freq=sr, new_freq=target_sr)(wav)
        sr = target_sr

    # Peak limit; the numel() guard avoids calling max() on an empty tensor.
    peak = wav.abs().max().item() if wav.numel() else 0.0
    if peak > 0:
        wav = wav * min(target_peak / peak, 1.0)

    # Always write a real WAV file under a ".wav" name. The input may carry
    # another suffix (e.g. ".mp3" taken from a download URL), and saving
    # 16-bit PCM under that name would make the container format depend on
    # the extension.
    source = Path(path)
    out_path = path if source.suffix.lower() == ".wav" else str(source.with_suffix(".wav"))
    torchaudio.save(out_path, wav, sr, format="wav", bits_per_sample=16)
    if out_path != path:
        # Keep the "no extra temp files" property: drop the superseded input.
        source.unlink(missing_ok=True)
    return out_path
def _set_job(job_id: str, **kwargs):
    """Merge ``kwargs`` into the stored record for ``job_id`` under JOB_LOCK.

    A record is created when the job is not known yet; existing fields that
    are not named in ``kwargs`` are kept.
    """
    with JOB_LOCK:
        merged = dict(JOBS.get(job_id, {}))
        merged.update(kwargs)
        JOBS[job_id] = merged
def _get_job(job_id: str) -> Optional[Dict[str, str]]:
    """Return a copy of the record for ``job_id``, read under JOB_LOCK.

    Returns ``None`` when the job is unknown or its record is empty.
    """
    with JOB_LOCK:
        record = JOBS.get(job_id)
        if not record:
            return None
        # Hand out a copy so callers cannot mutate the shared table.
        return dict(record)
def _pop_job(job_id: str) -> Optional[Dict[str, str]]:
    """Remove the record for ``job_id`` under JOB_LOCK and return it.

    Returns ``None`` when no such job is stored.
    """
    with JOB_LOCK:
        removed = JOBS.pop(job_id, None)
    return removed
| def _cleanup_files(*files: str): | |
| """Background task to clean up temporary files after response is sent.""" | |
| for file_path in files: | |
| if file_path and Path(file_path).exists(): | |
| try: | |
| Path(file_path).unlink(missing_ok=True) | |
| except Exception: | |
| pass # Ignore cleanup errors | |
def _run_generate_job(job_id: str, payload: Dict[str, str]):
    """Run one TTS generation job and record its outcome in the job table.

    The job moves to ``processing``, then to ``completed`` (with
    ``output_file`` set) or to ``error`` (with ``error`` set to the exception
    text). Nothing is raised to the caller.

    Args:
        job_id: Identifier of the job record to update.
        payload: Request data; the ``speaker_wav`` and ``text`` keys are read.
            Other keys (for example ``language``) are not used here.
    """
    speaker_file = None
    output_file = None
    _set_job(job_id, status="processing")
    try:
        # Two separate assignments: if preprocessing fails, speaker_file still
        # names the raw temp file so the error path can delete it.
        speaker_file = _temp_speaker_file(payload["speaker_wav"])
        speaker_file = _preprocess_audio_wav(speaker_file)

        output_file = os.path.join(
            tempfile.gettempdir(),
            f"indextts2-{uuid.uuid4()}.wav"
        )
        tts_model.infer(
            spk_audio_prompt=speaker_file,
            text=payload["text"],
            output_path=output_file,
            use_random=False,
            verbose=False,
        )

        # Check for the output BEFORE post-processing it; otherwise a missing
        # file surfaces as an audio-loader error and this message is never used.
        if not Path(output_file).exists():
            raise RuntimeError(
                f"TTS generation failed: output file was not created at {output_file}"
            )
        output_file = _preprocess_audio_wav(output_file)

        # The speaker reference is no longer needed once synthesis succeeded.
        _cleanup_files(speaker_file)
        _set_job(job_id, status="completed", output_file=output_file)
    except Exception as exc:
        # Background boundary: remove whatever was created and store the failure.
        _cleanup_files(speaker_file, output_file)
        _set_job(job_id, status="error", error=str(exc))
def health(x_api_key: Optional[str] = Header(default=None)):
    """Report service liveness together with the model name and DEVICE.

    Raises whatever ``_require_api_key`` raises for a bad key.
    """
    _require_api_key(x_api_key)
    report = {"status": "ok", "model": "indextts2"}
    report["device"] = DEVICE
    return report
def generate(
    payload: GenerateRequest = Body(...),
    background_tasks: BackgroundTasks = BackgroundTasks(),
    x_api_key: Optional[str] = Header(default=None),
):
    """
    Queue a voice-cloning speech generation job.

    Registers a new job as "queued", schedules the synthesis on
    ``background_tasks`` and answers with status 202 carrying the job id and
    the status/result paths.
    """
    _require_api_key(x_api_key)

    job_id = f"{uuid.uuid4()}"
    _set_job(job_id, status="queued")

    # Synthesis is slow, so it runs as a background task instead of inside
    # the request.
    background_tasks.add_task(_run_generate_job, job_id, payload.dict())

    body = {
        "job_id": job_id,
        "status": "queued",
        "status_url": f"/status/{job_id}",
        "result_url": f"/result/{job_id}",
    }
    return JSONResponse(status_code=202, content=body)
def job_status(job_id: str, x_api_key: Optional[str] = Header(default=None)):
    """Return the current status of a job, plus its error text when it has one.

    Raises:
        HTTPException: 404 when the job id is unknown.
    """
    _require_api_key(x_api_key)

    record = _get_job(job_id)
    if not record:
        raise HTTPException(status_code=404, detail="Job not found")

    summary: Dict[str, str] = {
        "job_id": job_id,
        "status": record.get("status", "unknown"),
    }
    # Only failed jobs carry an "error" entry.
    if "error" in record:
        summary["error"] = record["error"]
    return summary
def job_result(
    job_id: str,
    background_tasks: BackgroundTasks = BackgroundTasks(),
    x_api_key: Optional[str] = Header(default=None),
):
    """Send the audio produced by a completed job and forget the job.

    Raises:
        HTTPException: 404 for an unknown job, 409 while the job is not
            completed, 410 when the output file is no longer available.
    """
    _require_api_key(x_api_key)

    job = _get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    status = job.get("status")
    if status != "completed":
        raise HTTPException(
            status_code=409,
            detail=f"Job not ready (status={status})"
        )

    output_file = job.get("output_file")
    result_available = bool(output_file) and Path(output_file).exists()

    # The job record is dropped whether or not the file is still there.
    _pop_job(job_id)
    if not result_available:
        raise HTTPException(status_code=410, detail="Result expired or missing")

    # Schedule removal of the output file via the background task list.
    background_tasks.add_task(_cleanup_files, output_file)
    return FileResponse(
        output_file,
        media_type="audio/wav",
        filename="output.wav"
    )
def root():
    """Describe the API: its name and the list of endpoint paths."""
    endpoints = [
        "/health",
        "/generate",
        "/status/{job_id}",
        "/result/{job_id}",
    ]
    return {"name": "indextts2-api", "endpoints": endpoints}