Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from fastapi.responses import StreamingResponse | |
| import edge_tts | |
| import tempfile | |
| import logging | |
| # Initialize logging for better error tracking | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| app = FastAPI() | |
| # Allow CORS (for frontend or cross-origin calls) | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| class TTSRequest(BaseModel): | |
| text: str | |
| voice: str | |
| rate: int = 0 | |
| pitch: int = 0 | |
| async def get_voices(): | |
| try: | |
| voices = await edge_tts.list_voices() | |
| return {f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v["ShortName"] for v in voices} | |
| except Exception as e: | |
| logger.error(f"Error getting voices: {str(e)}") | |
| raise HTTPException(status_code=500, detail=f"Error getting voices: {str(e)}") | |
| async def text_to_speech(req: TTSRequest): | |
| if not req.text.strip(): | |
| logger.error("Text is empty.") | |
| raise HTTPException(status_code=400, detail="Text is empty.") | |
| # Extract voice short name | |
| try: | |
| voice_short_name = req.voice.split(" - ")[0] | |
| except Exception as e: | |
| logger.error(f"Error parsing voice name: {str(e)}") | |
| raise HTTPException(status_code=400, detail=f"Invalid voice format: {str(e)}") | |
| # Prepare rate and pitch strings | |
| rate_str = f"{req.rate:+d}%" | |
| pitch_str = f"{req.pitch:+d}Hz" | |
| try: | |
| logger.info(f"Generating speech for text: {req.text} with voice {voice_short_name}, rate {rate_str}, pitch {pitch_str}") | |
| # Generate speech using edge_tts | |
| communicate = edge_tts.Communicate(req.text, voice_short_name, rate=rate_str, pitch=pitch_str) | |
| # Save the audio to a temporary file | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file: | |
| tmp_path = tmp_file.name | |
| await communicate.save(tmp_path) | |
| # Return the audio as a StreamingResponse | |
| logger.info(f"Audio generated and saved to {tmp_path}") | |
| return StreamingResponse(open(tmp_path, "rb"), media_type="audio/mpeg") | |
| except Exception as e: | |
| logger.error(f"Error generating speech: {str(e)}") | |
| raise HTTPException(status_code=500, detail=f"Error generating speech: {str(e)}") | |