Spaces:
Paused
Paused
| import os | |
| import sys | |
| import uuid | |
| from typing import Optional | |
| import numpy as np | |
| from fastapi import FastAPI, HTTPException, UploadFile, File, Form | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import soundfile as sf | |
| import io | |
# Make the `neutts-air` checkout importable. The path is relative, so it is
# resolved against the process's current working directory at import time.
sys.path.append("neutts-air")

try:
    from neuttsair.neutts import NeuTTSAir
except ImportError as e:
    # Fail fast with an actionable message. `from e` records the original
    # ImportError as the explicit cause so the real reason stays in the
    # traceback.
    raise RuntimeError(
        f"Failed to import NeuTTS Air: {e}. Make sure neutts-air submodule is initialized."
    ) from e
# FastAPI application object shared by the whole module. The keyword
# arguments are descriptive metadata handed to the FastAPI constructor.
app = FastAPI(
    title="NeuTTS Air Production API",
    description="Production-ready Text-to-Speech with Voice Cloning",
    version="1.0.0",
)
# CORS: the wildcard lists below allow any origin, any HTTP method and any
# request header.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Module-level model instance, constructed once at import time and shared by
# the handlers in this file. Both the backbone and the codec are placed on the
# CPU (chosen for Hugging Face Spaces).
tts = NeuTTSAir(
    backbone_repo="neuphonic/neutts-air",
    backbone_device="cpu",
    codec_repo="neuphonic/neucodec",
    codec_device="cpu",
)
# Ensure the working directories exist: "uploads" receives reference audio,
# "outputs" receives generated WAV files. `exist_ok=True` makes this a no-op
# when they are already present.
for _dir in ("uploads", "outputs"):
    os.makedirs(_dir, exist_ok=True)
# NOTE(review): no route decorator is visible on this handler in this view of
# the file; confirm it is registered with `app`.
async def root():
    """Return a fixed payload stating that the service is online."""
    payload = {"status": "online", "service": "NeuTTS Air API"}
    return payload
# NOTE(review): no route decorator is visible on this handler in this view of
# the file; confirm it is registered with `app`.
async def health_check():
    """Report a healthy status plus whether the global ``tts`` object is set."""
    model_loaded = tts is not None
    return {"status": "healthy", "model_loaded": model_loaded}
# NOTE(review): no route decorator is visible on this handler in this view of
# the file; confirm it is registered with `app`.
async def synthesize_speech(
    ref_text: str = Form(..., description="Reference audio transcript"),
    gen_text: str = Form(..., description="Text to synthesize"),
    ref_audio: UploadFile = File(..., description="Reference audio file (WAV)"),
):
    """
    Synthesize speech using voice cloning.

    The uploaded reference audio is written to ``uploads/``, encoded with
    ``tts.encode_reference`` and used, together with ``ref_text``, to
    synthesize ``gen_text``. The result is written to ``outputs/`` and
    returned as a WAV file download.

    Args:
        ref_text: Transcript of the reference audio.
        gen_text: Text to synthesize.
        ref_audio: Uploaded reference audio; the filename must end in ".wav".

    Returns:
        A ``FileResponse`` serving the generated WAV file.

    Raises:
        HTTPException: 400 if the upload's filename does not end in ".wav"
            (or is missing); 500 if saving, encoding, inference or writing
            the output fails.
    """
    # The filename is client-controlled: it may be absent and it may contain
    # directory components. Keep only the final path component so the upload
    # cannot be directed outside "uploads/".
    safe_name = os.path.basename(ref_audio.filename or "")

    # Validate before entering the try block so this 400 is not caught by the
    # generic handler below and reported as a 500.
    if not safe_name.lower().endswith(".wav"):
        raise HTTPException(400, "Only WAV files are supported as reference audio")

    upload_path = f"uploads/{uuid.uuid4()}_{safe_name}"
    try:
        # Save uploaded file
        content = await ref_audio.read()
        with open(upload_path, "wb") as f:
            f.write(content)

        # NOTE: these calls are synchronous, so the event loop is blocked
        # while they run.
        ref_codes = tts.encode_reference(upload_path)
        wav = tts.infer(gen_text, ref_codes, ref_text)

        # Save output (24000 is the sample rate passed to soundfile)
        output_path = f"outputs/{uuid.uuid4()}.wav"
        sf.write(output_path, wav, 24000)
    except HTTPException:
        # Preserve any deliberate HTTP error instead of rewrapping it as 500.
        raise
    except Exception as e:
        raise HTTPException(500, f"Synthesis failed: {str(e)}") from e
    finally:
        # The reference upload is only needed for encoding; remove it so
        # "uploads/" does not grow with every request.
        try:
            os.remove(upload_path)
        except OSError:
            pass

    return FileResponse(
        output_path,
        media_type="audio/wav",
        filename="synthesized_speech.wav",
    )
# NOTE(review): no route decorator is visible on this handler in this view of
# the file; confirm it is registered with `app`.
async def synthesize_speech_base64(
    ref_text: str = Form(...),
    gen_text: str = Form(...),
    ref_audio: UploadFile = File(...),
):
    """
    Synthesize speech and return it as base64-encoded audio.

    The uploaded reference audio is written to ``uploads/``, encoded with
    ``tts.encode_reference`` and used, together with ``ref_text``, to
    synthesize ``gen_text``. The waveform is serialized to WAV in memory and
    base64-encoded into the JSON response.

    Args:
        ref_text: Transcript of the reference audio.
        gen_text: Text to synthesize.
        ref_audio: Uploaded reference audio file.

    Returns:
        A ``JSONResponse`` with keys ``audio_data`` (base64 string),
        ``sample_rate`` (24000) and ``format`` ("wav").

    Raises:
        HTTPException: 500 if saving, encoding, inference or serialization
            fails.
    """
    import base64

    # The filename is client-controlled: it may be absent and it may contain
    # directory components. Keep only the final path component so the upload
    # cannot be directed outside "uploads/".
    safe_name = os.path.basename(ref_audio.filename or "")
    upload_path = f"uploads/{uuid.uuid4()}_{safe_name}"

    try:
        # Save uploaded file
        content = await ref_audio.read()
        with open(upload_path, "wb") as f:
            f.write(content)

        # NOTE: these calls are synchronous, so the event loop is blocked
        # while they run.
        ref_codes = tts.encode_reference(upload_path)
        wav = tts.infer(gen_text, ref_codes, ref_text)

        # Serialize to WAV in memory, then base64-encode the bytes.
        buffer = io.BytesIO()
        sf.write(buffer, wav, 24000, format='WAV')
        audio_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, f"Synthesis failed: {str(e)}") from e
    finally:
        # The reference upload is only needed for encoding; remove it so
        # "uploads/" does not grow with every request.
        try:
            os.remove(upload_path)
        except OSError:
            pass

    return JSONResponse({
        "audio_data": audio_b64,
        "sample_rate": 24000,
        "format": "wav",
    })
# Batch processing endpoint
# NOTE(review): no route decorator is visible on this handler in this view of
# the file; confirm it is registered with `app`.
async def batch_synthesize(
    ref_text: str = Form(...),
    ref_audio: UploadFile = File(...),
    texts: str = Form(..., description="JSON array of texts to synthesize"),
):
    """
    Synthesize multiple texts with the same voice.

    The reference audio is encoded once with ``tts.encode_reference`` and the
    resulting codes are reused for every entry in ``texts``. Each result is
    written to its own WAV file under ``outputs/``.

    Args:
        ref_text: Transcript of the reference audio.
        ref_audio: Uploaded reference audio file.
        texts: JSON-encoded array of strings to synthesize.

    Returns:
        A dict ``{"generated_files": [...]}`` listing the output paths in the
        same order as ``texts``.

    Raises:
        HTTPException: 400 if ``texts`` is not valid JSON or is not an array
            of strings; 500 if saving, encoding, inference or writing fails.
    """
    import json

    # Reject malformed input up front with a client error. Without this check
    # a bare JSON string would be iterated character by character below.
    try:
        text_list = json.loads(texts)
    except ValueError as e:
        raise HTTPException(400, f"'texts' must be a valid JSON array: {e}") from e
    if not isinstance(text_list, list) or not all(
        isinstance(item, str) for item in text_list
    ):
        raise HTTPException(400, "'texts' must be a JSON array of strings")

    # The filename is client-controlled: it may be absent and it may contain
    # directory components. Keep only the final path component so the upload
    # cannot be directed outside "uploads/".
    safe_name = os.path.basename(ref_audio.filename or "")
    upload_path = f"uploads/{uuid.uuid4()}_{safe_name}"

    try:
        # Save reference audio
        content = await ref_audio.read()
        with open(upload_path, "wb") as f:
            f.write(content)

        # Encode reference once; the codes are shared by every synthesis.
        ref_codes = tts.encode_reference(upload_path)

        results = []
        for text in text_list:
            wav = tts.infer(text, ref_codes, ref_text)
            output_path = f"outputs/{uuid.uuid4()}.wav"
            sf.write(output_path, wav, 24000)
            results.append(output_path)
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, f"Batch synthesis failed: {str(e)}") from e
    finally:
        # The reference upload is only needed for encoding; remove it so
        # "uploads/" does not grow with every request.
        try:
            os.remove(upload_path)
        except OSError:
            pass

    return {"generated_files": results}
if __name__ == "__main__":
    # Script entry point: uvicorn is imported only when the file is executed
    # directly, then serves `app` on all interfaces at port 7860.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)