|
|
import asyncio
import os
import re
import shutil
import textwrap
import time
from typing import List

import edge_tts
import pydub
from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import FileResponse, JSONResponse
|
|
|
|
|
# ASGI application object; the route decorators in this module register
# their handlers on it.
app = FastAPI()

# Registry of in-flight TTS requests, keyed by request id. The value is a
# status string; the /health and /status endpoints only look at the keys.
active_requests = dict()
|
|
|
|
|
def split_text(text, max_chunk_size=500):
    """Split text into chunks of at most ``max_chunk_size`` characters.

    Chunks are built from whole sentences where possible. A sentence ends at
    ``.``, ``!`` or ``?`` followed by whitespace, so decimals such as ``3.14``
    stay intact and the original terminator is preserved. The Devanagari
    danda (``।``) and the Arabic question mark (``؟``) are normalized to
    ``.`` and ``?`` first.

    A single sentence longer than ``max_chunk_size`` is wrapped at whitespace
    (or mid-word as a last resort) so that no chunk exceeds the limit.

    Args:
        text: The text to split.
        max_chunk_size: Maximum length of each returned chunk, in characters.

    Returns:
        A list of non-empty chunk strings, in reading order. Empty or
        whitespace-only input yields an empty list.

    Raises:
        ValueError: If ``max_chunk_size`` is smaller than 1.
    """
    if max_chunk_size < 1:
        raise ValueError("max_chunk_size must be at least 1")

    normalized = text.replace('।', '.').replace('؟', '?')

    # Break into sentence-sized pieces, each guaranteed to fit in one chunk.
    pieces = []
    for sentence in re.split(r'(?<=[.!?])\s+', normalized):
        sentence = sentence.strip()
        if not sentence:
            # Skip empty fragments instead of emitting a stray "." for them.
            continue
        if len(sentence) <= max_chunk_size:
            pieces.append(sentence)
        else:
            pieces.extend(
                textwrap.wrap(
                    sentence,
                    width=max_chunk_size,
                    break_long_words=True,
                    break_on_hyphens=False,
                )
            )

    # Greedily pack pieces into chunks, counting the joining space so the
    # final joined string never exceeds the limit.
    chunks = []
    current_chunk = []
    current_length = 0
    for piece in pieces:
        added_length = len(piece) + (1 if current_chunk else 0)
        if current_chunk and current_length + added_length > max_chunk_size:
            chunks.append(' '.join(current_chunk))
            current_chunk = []
            current_length = 0
            added_length = len(piece)
        current_chunk.append(piece)
        current_length += added_length

    if current_chunk:
        chunks.append(' '.join(current_chunk))

    return chunks
|
|
|
|
|
async def process_chunk(text, voice, temp_dir, chunk_index):
    """Render one text chunk to an MP3 file and return that file's path.

    Args:
        text: The chunk of text handed to ``edge_tts.Communicate``.
        voice: Voice name handed to ``edge_tts.Communicate``.
        temp_dir: Directory in which the chunk file is written.
        chunk_index: Position of the chunk; embedded in the file name.

    Returns:
        Path of the MP3 file written for this chunk.
    """
    # File name combines the chunk index with the current Unix timestamp.
    file_name = f"chunk_{chunk_index}_{int(time.time())}.mp3"
    destination = os.path.join(temp_dir, file_name)

    await edge_tts.Communicate(text, voice).save(destination)
    return destination
|
|
|
|
|
async def combine_audio_files(chunk_files, output_path):
    """Concatenate MP3 chunk files, in order, into one MP3 at ``output_path``.

    The chunk files are deleted afterwards, whether or not merging succeeded,
    so a failed request does not leave them behind.

    Args:
        chunk_files: Paths of the MP3 files to concatenate, in playback order.
        output_path: Path the combined MP3 is exported to.

    Raises:
        Whatever pydub raises while reading a chunk or exporting the result.
    """

    def _merge_and_export():
        # Synchronous pydub work: load every chunk, append, then export.
        combined = pydub.AudioSegment.empty()
        for chunk_file in chunk_files:
            combined += pydub.AudioSegment.from_mp3(chunk_file)
        combined.export(output_path, format="mp3")

    try:
        # The pydub calls are plain blocking calls; run them in the default
        # executor so the event loop keeps serving other requests meanwhile.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, _merge_and_export)
    finally:
        for chunk_file in chunk_files:
            try:
                os.remove(chunk_file)
            except OSError:
                # Best-effort cleanup: a chunk that is already gone or cannot
                # be removed must not mask the result of the merge itself.
                pass
|
|
|
|
|
@app.get("/")
def home():
    """Return a fixed message confirming the service is up."""
    greeting = {"message": "EdgeTTS FastAPI is running!"}
    return greeting
|
|
|
|
|
@app.get("/health")
def health_check():
    """Report that the API is running and how many requests are registered as active."""
    in_flight = len(active_requests)
    return {"status": "running", "active_requests": in_flight}
|
|
|
|
|
@app.get("/status")
def status():
    """Return the ids of the requests currently registered as active."""
    # Iterating a dict yields its keys, i.e. the request ids.
    request_ids = [request_id for request_id in active_requests]
    return {"active_requests": request_ids}
|
|
|
|
|
@app.get("/tts")
async def tts(
    text: str,
    voice: str = "en-US-JennyNeural",
    background_tasks: BackgroundTasks = None,
):
    """Generate speech from text using EdgeTTS, synthesizing chunks concurrently.

    The text is split into chunks, each chunk is rendered to its own MP3 via
    ``process_chunk`` (all chunks awaited together), and the results are
    merged into a single MP3 that is returned as the response body.

    Args:
        text: The text to synthesize.
        voice: Voice name passed through to EdgeTTS.
        background_tasks: Task queue used to schedule ``cleanup_request``
            once the response has been produced.

    Returns:
        A ``FileResponse`` with the combined MP3 on success, a 400
        ``JSONResponse`` for blank text, or a 500 ``JSONResponse`` carrying
        the error message if anything fails during processing.
    """
    # Reject blank input up front rather than failing later in synthesis.
    if not text.strip():
        return JSONResponse(content={"error": "text must not be empty"}, status_code=400)

    request_id = f"{int(time.time())}_{os.urandom(4).hex()}"
    output_file = f"output_{request_id}.mp3"
    temp_dir = f"temp_{request_id}"
    active_requests[request_id] = "processing"

    try:
        os.makedirs(temp_dir, exist_ok=True)

        chunks = split_text(text)
        tasks = [process_chunk(chunk, voice, temp_dir, i) for i, chunk in enumerate(chunks)]
        chunk_files = await asyncio.gather(*tasks)

        await combine_audio_files(chunk_files, output_file)

        # Build the response first so cleanup is only scheduled once nothing
        # else in this handler can fail.
        response = FileResponse(output_file, media_type="audio/mpeg", filename="speech.mp3")
        background_tasks.add_task(cleanup_request, request_id)
        return response

    except Exception as e:
        # No cleanup task was scheduled on this path, so release everything
        # here: the registry entry, the temp directory and any partial output.
        active_requests.pop(request_id, None)
        shutil.rmtree(temp_dir, ignore_errors=True)
        try:
            os.remove(output_file)
        except OSError:
            # The output file may never have been created.
            pass
        return JSONResponse(content={"error": str(e)}, status_code=500)
|
|
|
|
|
def cleanup_request(request_id):
    """Release everything held for a finished request.

    Removes the request from ``active_requests``, deletes its
    ``temp_<request_id>`` directory and deletes the ``output_<request_id>.mp3``
    file. Safe to call when any of these is already gone.

    Args:
        request_id: Identifier the request was registered under.

    Raises:
        OSError: If the temp directory or output file exists but cannot be
            removed.
    """
    # pop() instead of del: a missing entry must not abort the file cleanup.
    active_requests.pop(request_id, None)

    temp_dir = f"temp_{request_id}"
    if os.path.isdir(temp_dir):
        # rmtree also copes with nested entries, unlike remove() + rmdir().
        shutil.rmtree(temp_dir)

    # The combined MP3 lives outside the temp directory; without this it
    # would stay on disk after every request.
    try:
        os.remove(f"output_{request_id}.mp3")
    except FileNotFoundError:
        pass
|
|
|
|
|
if __name__ == "__main__":
    # uvicorn is imported here because it is only needed when this module is
    # launched directly as a script.
    import uvicorn

    # Listen on all interfaces, port 7860.
    uvicorn.run(app, port=7860, host="0.0.0.0")
|
|
|