Spaces:
Runtime error
Runtime error
| from fastapi import FastAPI, HTTPException, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from pydantic import BaseModel | |
| from typing import Optional, Dict, Any, List, Tuple | |
| import time | |
| import uvicorn | |
| from datetime import datetime | |
| import psutil | |
| import asyncio | |
| import edge_tts | |
| from pydub import AudioSegment | |
| import os | |
| import uuid | |
| import tempfile | |
| from concurrent.futures import ThreadPoolExecutor | |
# Initialize FastAPI app
# The keyword arguments are service metadata handed to the FastAPI constructor.
app = FastAPI(
    version="1.0.0",
    title="TTS API Service",
    description="Text-to-Speech API with real-time status monitoring",
)
# Configure CORS: every origin, method, and header is accepted.
# NOTE(review): FastAPI's CORS documentation says wildcard values should not
# be combined with allow_credentials=True -- confirm whether credentialed
# cross-origin requests are actually required, and if so list explicit origins.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_headers=["*"],
    allow_methods=["*"],
)
# Core functionality (moved from app.py)
class TTSError(Exception):
    """Raised when text-to-speech generation fails."""
class FileManager:
    """Hand out unique temp-file paths and prune old generated files.

    A private temporary directory is created on construction. Callers
    append the paths they produce to ``output_files``; ``cleanup_old_files``
    then deletes everything except the newest ``max_files`` entries.
    """

    def __init__(self, max_files: int = 5):
        """Create the temporary directory and an empty tracking list.

        Args:
            max_files: Number of most recently tracked files to keep when
                ``cleanup_old_files`` runs. Defaults to 5.

        Raises:
            ValueError: If ``max_files`` is negative.
        """
        # Validate first so an invalid argument does not leave a stray directory.
        if max_files < 0:
            raise ValueError("max_files must be non-negative")
        self.max_files = max_files
        self.temp_dir = tempfile.mkdtemp(prefix="tts_api_")
        self.output_files: List[str] = []

    def get_temp_path(self, prefix: str) -> str:
        """Return a unique, extension-less path inside the temp directory.

        Args:
            prefix: Leading part of the file name; a UUID4 is appended to it.

        Returns:
            The generated path. No file is created.
        """
        return os.path.join(self.temp_dir, f"{prefix}_{uuid.uuid4()}")

    def cleanup_old_files(self) -> None:
        """Delete all tracked files except the newest ``max_files``.

        Removal is best effort: a file that is already gone or cannot be
        deleted is skipped, and its path is dropped from tracking either way.
        """
        # An explicit cutoff avoids the ``[:-0]`` slicing trap when
        # max_files is 0 (``xs[:-0]`` is empty, not the whole list).
        cutoff = max(len(self.output_files) - self.max_files, 0)
        stale = self.output_files[:cutoff]
        kept = self.output_files[cutoff:]
        for path in stale:
            try:
                # Remove directly instead of checking existence first, so a
                # file vanishing between the check and the delete is harmless.
                os.remove(path)
            except OSError:
                # Deliberate best effort: missing or undeletable files
                # must not break audio generation.
                pass
        self.output_files = kept
# Global state management
class ProcessingState:
    """Container for the job table and the shared file manager."""

    def __init__(self):
        # Creates the temporary directory that generated audio is written to.
        self.file_manager = FileManager()
        # Maps job_id -> job record (a plain dict).
        self.active_jobs: Dict[str, Dict[str, Any]] = dict()


# Single module-level instance referenced by the functions in this file.
state = ProcessingState()
# Pydantic models
class TTSRequest(BaseModel):
    # Request body for creating a TTS job.
    # The text to synthesize (required; no default).
    text: str
    # Voice identifier handed to edge-tts.
    voice: str = "en-US-JennyNeural"
    # Integer pitch offset; create_tts formats it as a signed "Hz" string.
    pitch: int = 0
    # Integer rate offset; create_tts formats it as a signed "%" string.
    rate: int = 0
class HealthResponse(BaseModel):
    # Field names mirror the keys of the dict returned by health_check.
    # NOTE(review): nothing visible in this file references this model --
    # confirm whether it is meant to be a response_model for the health route.
    status: str
    timestamp: str
    cpu_usage: float
    memory_usage: float
    active_jobs: int
# Voice options dictionary (simplified): short name -> full voice identifier.
# Every identifier follows the same "en-US-<Name>Neural" pattern.
voice_options = {
    short_name: f"en-US-{short_name}Neural"
    for short_name in ("Jenny", "Guy", "Ana", "Aria")
}
async def generate_tts(text: str, voice: str, rate: str, pitch: str) -> str:
    """Synthesize *text* to an MP3 file and return the file's path.

    Args:
        text: Text to synthesize.
        voice: Voice identifier passed to ``edge_tts.Communicate``.
        rate: Rate string passed to ``edge_tts.Communicate`` (the caller in
            this file builds it as a signed percentage such as ``"+0%"``).
        pitch: Pitch string passed to ``edge_tts.Communicate`` (the caller in
            this file builds it as a signed value such as ``"+0Hz"``).

    Returns:
        Path of the generated ``.mp3`` file inside the file manager's
        temporary directory.

    Raises:
        TTSError: If synthesis raises or no output file exists afterwards.
            The underlying exception is chained as ``__cause__``.
    """
    audio_path = None
    registered = False
    try:
        audio_path = state.file_manager.get_temp_path("audio") + ".mp3"
        tts = edge_tts.Communicate(text, voice, rate=rate, pitch=pitch)
        await tts.save(audio_path)
        if not os.path.exists(audio_path):
            raise TTSError("Failed to generate audio file")
        state.file_manager.output_files.append(audio_path)
        registered = True
        # Pruning happens on every successful generation, so older outputs
        # may disappear from disk after this call.
        state.file_manager.cleanup_old_files()
        return audio_path
    except Exception as e:
        # A failed run can leave a partial file that the file manager never
        # learned about; remove it so it does not linger in the temp dir.
        if audio_path is not None and not registered:
            try:
                os.remove(audio_path)
            except OSError:
                pass
        # Chain the cause so the original traceback is preserved.
        raise TTSError(f"TTS generation failed: {str(e)}") from e
# API endpoints
# NOTE(review): no route decorator is visible on this handler in this file --
# confirm how (and under which path) it is registered on ``app``.
async def create_tts(request: TTSRequest, background_tasks: BackgroundTasks):
    """Register a TTS job and schedule its generation as a background task.

    Args:
        request: Text, voice, pitch and rate for the synthesis.
        background_tasks: Task queue that receives the generation coroutine.

    Returns:
        A dict with the new ``job_id`` and the initial status ``"queued"``.
        The job record itself is stored in ``state.active_jobs``.
    """
    # A random UUID keeps ids unique. The previous ``hash(request.text)``
    # suffix collided for identical text submitted within the same second
    # (overwriting the earlier job) and could be negative.
    job_id = f"job_{int(time.time())}_{uuid.uuid4().hex}"
    created = datetime.now().isoformat()
    state.active_jobs[job_id] = {
        "id": job_id,
        "status": "queued",
        "progress": 0,
        "created_at": created,
        "last_update": created,
        "request": request.dict(),
    }

    async def process_tts():
        """Run the synthesis and record the outcome on the job record."""
        try:
            # edge-tts receives signed strings, e.g. "+0Hz" and "+0%".
            pitch_str = f"{request.pitch:+d}Hz"
            rate_str = f"{request.rate:+d}%"
            audio_path = await generate_tts(
                request.text,
                request.voice,
                rate_str,
                pitch_str,
            )
            state.active_jobs[job_id].update({
                "status": "completed",
                "progress": 1.0,
                "last_update": datetime.now().isoformat(),
                "result": {
                    "audio_path": audio_path,
                },
            })
        except Exception as e:
            # Failures are reported through the job record, not raised,
            # because this runs after the response has been sent.
            state.active_jobs[job_id].update({
                "status": "failed",
                "last_update": datetime.now().isoformat(),
                "error": str(e),
            })

    background_tasks.add_task(process_tts)
    return {"job_id": job_id, "status": "queued"}
# NOTE(review): no route decorator is visible on this handler in this file --
# confirm how it is registered on ``app``.
async def get_job_status(job_id: str):
    """Return the stored record for *job_id*.

    Raises:
        HTTPException: 404 when no job with that id exists.
    """
    try:
        return state.active_jobs[job_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Job not found") from None
# NOTE(review): no route decorator is visible on this handler in this file --
# confirm how it is registered on ``app``.
async def download_file(job_id: str):
    """Send the generated MP3 of a completed job.

    Args:
        job_id: Identifier returned when the job was created.

    Returns:
        A ``FileResponse`` for the audio file, offered as ``tts_output.mp3``.

    Raises:
        HTTPException: 404 if the job is unknown, 400 if it has not
            completed, 410 if its audio file is no longer on disk.
    """
    job = state.active_jobs.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    if job["status"] != "completed":
        raise HTTPException(status_code=400, detail="Job not completed")

    file_path = job["result"]["audio_path"]
    # The file manager keeps only the most recent outputs, so a completed
    # job's file may already have been pruned. Report that explicitly
    # instead of letting the response fail while the file is being sent.
    if not os.path.isfile(file_path):
        raise HTTPException(
            status_code=410,
            detail="Audio file is no longer available",
        )
    return FileResponse(file_path, filename="tts_output.mp3")
# NOTE(review): no route decorator is visible on this handler in this file --
# confirm how it is registered on ``app``.
async def health_check():
    """Report service status with CPU, memory and job-load figures.

    Returns:
        A dict with ``status``, ``timestamp`` (local time, ISO format),
        ``cpu_usage`` and ``memory_usage`` (percentages from psutil), and
        ``active_jobs`` (number of jobs that have not finished).
    """
    # Job records are never removed from state.active_jobs, so counting the
    # whole table would report every job ever created. Only jobs that are
    # neither completed nor failed are still active.
    finished = ("completed", "failed")
    unfinished = sum(
        1
        for job in state.active_jobs.values()
        if job.get("status") not in finished
    )
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "cpu_usage": psutil.cpu_percent(),
        "memory_usage": psutil.virtual_memory().percent,
        "active_jobs": unfinished,
    }
# NOTE(review): no route decorator is visible on this handler in this file --
# confirm how it is registered on ``app``.
async def list_voices():
    """Return the module's voice table under the ``voices`` key."""
    return dict(voices=voice_options)
if __name__ == "__main__":
    # uvicorn needs an import string (not the app object) when reload=True.
    # Derive the module name from this file so the string stays correct if
    # the file is not named app.py; for app.py it is still "app:app".
    module_name = os.path.splitext(os.path.basename(__file__))[0]
    uvicorn.run(f"{module_name}:app", host="0.0.0.0", port=8000, reload=True)