"""HTTP API exposing Microsoft Edge text-to-speech via the ``edge_tts`` package.

Routes:
    GET  /                      service description
    GET  /health                liveness probe
    GET  /voices                list voices, optionally filtered
    GET  /voices/bulgarian      list voices whose locale is ``bg-BG``
    GET  /voices/multilingual   list voices with "Multilingual" in the short name
    POST /tts                   synthesize speech and return it as an MP3 file
"""

import time
import uuid
from pathlib import Path

import edge_tts
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel
from starlette.background import BackgroundTask

app = FastAPI(title="Edge TTS API")

# Any origin may call the API; credentials are disabled.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Scratch directory for generated MP3 files; created at import time.
TMP_DIR = Path("/tmp/edge_tts_api")
TMP_DIR.mkdir(parents=True, exist_ok=True)


class TTSRequest(BaseModel):
    """Request body for ``POST /tts``.

    ``voice``, ``rate``, ``volume`` and ``pitch`` are forwarded unchanged to
    ``edge_tts.Communicate``.
    """

    text: str
    voice: str = "bg-BG-BorislavNeural"
    rate: str = "+0%"
    volume: str = "+0%"
    pitch: str = "+0Hz"


def simplify_voice(v: dict) -> dict:
    """Project a raw voice record onto the fields this API exposes.

    Missing keys default to ``""``. Two derived flags are added:
    ``IsMultilingual`` (the short name contains "Multilingual") and
    ``IsBulgarian`` (the locale equals ``"bg-BG"``).
    """
    short_name = v.get("ShortName", "")
    locale = v.get("Locale", "")
    return {
        "Name": v.get("Name", ""),
        "ShortName": short_name,
        "FriendlyName": v.get("FriendlyName", ""),
        "Locale": locale,
        "Gender": v.get("Gender", ""),
        "SuggestedCodec": v.get("SuggestedCodec", ""),
        "Status": v.get("Status", ""),
        "IsMultilingual": "Multilingual" in short_name,
        "IsBulgarian": locale == "bg-BG",
    }


def cleanup_old_files(folder: Path, max_age_seconds: int = 3600):
    """Delete ``*.mp3`` files in *folder* older than *max_age_seconds*.

    Best effort: a file that cannot be inspected or removed is skipped.
    """
    now = time.time()
    for file in folder.glob("*.mp3"):
        try:
            if now - file.stat().st_mtime > max_age_seconds:
                file.unlink(missing_ok=True)
        except OSError:
            # The file may have vanished or be unremovable; keep sweeping.
            continue


def delete_file(path: Path):
    """Remove *path* if it exists, ignoring filesystem errors."""
    try:
        path.unlink(missing_ok=True)
    except OSError:
        # Best effort: cleanup_old_files sweeps leftovers later.
        pass


async def _fetch_simplified_voices() -> list[dict]:
    """Fetch the voice list from ``edge_tts`` and simplify every record."""
    voices_data = await edge_tts.list_voices()
    return [simplify_voice(v) for v in voices_data]


def _build_communicate(req: TTSRequest, text: str):
    """Create the ``edge_tts.Communicate`` object for *req*.

    Raises:
        HTTPException: 400 when ``edge_tts`` rejects the parameters with a
            ``ValueError`` (presumably a malformed voice/rate/volume/pitch --
            confirm against the installed edge_tts version).
    """
    try:
        return edge_tts.Communicate(
            text=text,
            voice=req.voice,
            rate=req.rate,
            volume=req.volume,
            pitch=req.pitch,
        )
    except ValueError as e:
        raise HTTPException(
            status_code=400, detail=f"Invalid TTS parameters: {e}"
        ) from e


@app.get("/")
async def root():
    """Describe the service and its main routes."""
    return {
        "ok": True,
        "service": "Edge TTS API",
        "routes": {
            "health": "/health",
            "voices": "/voices",
            "tts": "/tts",
        },
    }


@app.get("/health")
async def health():
    """Liveness probe."""
    return {"status": "ok"}


@app.get("/voices")
async def voices(
    locale: str | None = Query(default=None),
    multilingual_only: bool = Query(default=False),
    bulgarian_only: bool = Query(default=False),
):
    """List voices sorted by (Locale, ShortName), with optional filters.

    Args:
        locale: Keep only voices whose locale matches, case-insensitively.
        multilingual_only: Keep only voices flagged ``IsMultilingual``.
        bulgarian_only: Keep only voices flagged ``IsBulgarian``.

    Raises:
        HTTPException: 500 if the voice list cannot be fetched or processed.
    """
    try:
        simplified = await _fetch_simplified_voices()
        simplified.sort(key=lambda v: (v["Locale"], v["ShortName"]))

        if locale:
            wanted = locale.lower()
            simplified = [v for v in simplified if v["Locale"].lower() == wanted]
        if multilingual_only:
            simplified = [v for v in simplified if v["IsMultilingual"]]
        if bulgarian_only:
            simplified = [v for v in simplified if v["IsBulgarian"]]

        return JSONResponse(
            content={
                "count": len(simplified),
                "voices": simplified,
            }
        )
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to fetch voices: {e}"
        ) from e


@app.get("/voices/bulgarian")
async def voices_bulgarian():
    """List Bulgarian (``bg-BG``) voices sorted by ShortName.

    Raises:
        HTTPException: 500 if the voice list cannot be fetched or processed.
    """
    try:
        simplified = [v for v in await _fetch_simplified_voices() if v["IsBulgarian"]]
        simplified.sort(key=lambda v: v["ShortName"])
        return {"count": len(simplified), "voices": simplified}
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to fetch Bulgarian voices: {e}"
        ) from e


@app.get("/voices/multilingual")
async def voices_multilingual():
    """List multilingual voices sorted by (Locale, ShortName).

    Raises:
        HTTPException: 500 if the voice list cannot be fetched or processed.
    """
    try:
        simplified = [
            v for v in await _fetch_simplified_voices() if v["IsMultilingual"]
        ]
        simplified.sort(key=lambda v: (v["Locale"], v["ShortName"]))
        return {"count": len(simplified), "voices": simplified}
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to fetch multilingual voices: {e}"
        ) from e


@app.post("/tts")
async def tts(req: TTSRequest):
    """Synthesize ``req.text`` and return it as an ``audio/mpeg`` attachment.

    The audio is written to a uniquely named file under ``TMP_DIR`` and
    deleted by a background task attached to the response. Files older than
    one hour are swept at the start of every request.

    Raises:
        HTTPException: 400 if the text is empty after stripping or the TTS
            parameters are rejected; 500 if synthesis fails or no file is
            produced.
    """
    text = req.text.strip()
    if not text:
        raise HTTPException(status_code=400, detail="Text is empty")

    cleanup_old_files(TMP_DIR, max_age_seconds=3600)
    output_file = TMP_DIR / f"{uuid.uuid4().hex}.mp3"

    try:
        communicate = _build_communicate(req, text)
        await communicate.save(str(output_file))

        if not output_file.exists():
            raise HTTPException(status_code=500, detail="Audio file was not created")

        return FileResponse(
            path=str(output_file),
            media_type="audio/mpeg",
            filename="speech.mp3",
            background=BackgroundTask(delete_file, output_file),
        )
    except HTTPException:
        # Already a well-formed API error: clean up and pass it through
        # instead of re-wrapping it under the generic message below.
        delete_file(output_file)
        raise
    except Exception as e:
        delete_file(output_file)
        raise HTTPException(
            status_code=500, detail=f"TTS generation failed: {e}"
        ) from e