|
|
import os
from contextlib import suppress
from uuid import uuid4

import requests
import uvicorn
from fastapi import FastAPI, Query
from pydub import AudioSegment
|
|
|
|
|
# Endpoint of the external text-to-speech service that call_tts_api posts to.
TTS_API_URL = "https://hivecorp-s8test.hf.space/tts"

# Directory that receives both the per-chunk downloads and the merged output.
# It is created at import time so the handlers can write into it immediately.
AUDIO_DIR = "audio_files"
os.makedirs(AUDIO_DIR, exist_ok=True)

# Application object that the route decorators further down register on.
app = FastAPI()
|
|
|
|
|
|
|
|
|
|
|
def split_text(text, max_length=500):
    """Split ``text`` into whitespace-delimited chunks of at most ``max_length`` characters.

    Words are never broken apart and are re-joined with single spaces, so runs
    of whitespace in the input collapse to one space in the output.

    Args:
        text: The text to split.
        max_length: Upper bound on the length of each joined chunk.

    Returns:
        A list of non-empty chunk strings; empty or whitespace-only input
        yields ``[]``. A single word longer than ``max_length`` is kept whole
        in a chunk of its own, which is the only case where a chunk exceeds
        the limit.
    """
    chunks = []
    current_chunk = []
    current_length = 0  # always equals len(" ".join(current_chunk))

    for word in text.split():
        # A separating space is only needed when the chunk already has a word.
        projected = current_length + len(word) + (1 if current_chunk else 0)

        # Flush only a non-empty chunk: an oversized first word must not
        # produce an empty chunk in front of it.
        if current_chunk and projected > max_length:
            chunks.append(" ".join(current_chunk))
            current_chunk = []
            projected = len(word)

        current_chunk.append(word)
        current_length = projected

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks
|
|
|
|
|
|
|
|
|
|
|
def call_tts_api(text, voice, timeout=(10, 120)):
    """Synthesize ``text`` through the external TTS service and save the MP3.

    The text and voice are POSTed as JSON to ``TTS_API_URL``. The JSON reply is
    expected to be an object with an ``audio_url`` key; the audio behind that
    URL is streamed into a uniquely named file inside ``AUDIO_DIR``.

    Args:
        text: Text to synthesize.
        voice: Voice identifier, forwarded unchanged to the service.
        timeout: ``requests`` timeout in seconds, or a ``(connect, read)``
            tuple, applied to both HTTP calls so a stalled service cannot
            block the caller forever.

    Returns:
        Path of the saved MP3, or ``None`` if the request, the reply or the
        download fails. Failures are printed and never raised, so callers can
        treat a chunk as best-effort.
    """
    audio_path = None
    try:
        response = requests.post(
            TTS_API_URL,
            json={"text": text, "voice": voice},
            headers={"Content-Type": "application/json"},
            timeout=timeout,
        )
        data = response.json()
        print("TTS API Response:", data)

        # Check the shape as well as the key: a JSON list or string reply
        # supports ``in`` but cannot be indexed by "audio_url".
        if not isinstance(data, dict) or "audio_url" not in data:
            print("Error: Invalid TTS API response", data)
            return None

        audio_path = os.path.join(AUDIO_DIR, f"audio_{uuid4().hex}.mp3")

        # Stream to disk in fixed-size pieces instead of buffering the body.
        with requests.get(data["audio_url"], stream=True, timeout=timeout) as r:
            r.raise_for_status()
            with open(audio_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)

        return audio_path
    except Exception as e:
        # Deliberately broad: this is the best-effort boundary for one chunk.
        print("TTS API Error:", e)
        # Do not leave a truncated MP3 behind for a later merge to trip over.
        if audio_path is not None:
            with suppress(OSError):
                os.remove(audio_path)
        return None
|
|
|
|
|
|
|
|
|
|
|
def merge_audio(audio_files):
    """Concatenate MP3 files, in the given order, into one new MP3.

    Args:
        audio_files: Paths of the MP3 files to join.

    Returns:
        Path of the merged file inside ``AUDIO_DIR``, or ``None`` when
        ``audio_files`` is empty or when decoding/encoding fails. Failures are
        printed rather than raised so the caller can report a merge error.
    """
    if not audio_files:
        return None

    final_filename = os.path.join(AUDIO_DIR, f"final_{uuid4().hex}.mp3")

    try:
        final_audio = AudioSegment.empty()
        for file in audio_files:
            final_audio += AudioSegment.from_mp3(file)

        # export() hands back the open output file; close it explicitly
        # instead of leaving the handle to the garbage collector.
        final_audio.export(final_filename, format="mp3").close()
    except Exception as e:
        print("Audio merge error:", e)
        # Remove any partially written output so it cannot be served later.
        with suppress(OSError):
            os.remove(final_filename)
        return None

    return final_filename
|
|
|
|
|
|
|
|
|
|
|
@app.post("/generate-audio")
def generate_audio(text: str = Query(...), voice: str = Query(...)):
    """Split the text, synthesize each chunk, merge the audio and report the result.

    Args:
        text: Text to convert to speech (query parameter).
        voice: Voice identifier passed to the TTS service (query parameter).

    Returns:
        ``{"success": True, "audio_url": "/download/<name>"}`` when a merged
        file was produced, otherwise ``{"error": ...}``. Chunks whose
        synthesis fails are skipped, so the merged audio may be incomplete;
        the number of skipped chunks is printed.
    """
    chunks = split_text(text)
    print("Total Chunks:", len(chunks))

    audio_files = []
    try:
        for chunk in chunks:
            filename = call_tts_api(chunk, voice)
            if filename:
                audio_files.append(filename)

        if not audio_files:
            return {"error": "TTS API failed for all parts"}

        missing = len(chunks) - len(audio_files)
        if missing:
            print("Warning: TTS failed for", missing, "of", len(chunks), "chunks")

        final_audio = merge_audio(audio_files)
    finally:
        # The per-chunk files are only inputs to the merge; delete them on
        # every exit path so they do not accumulate on disk.
        for path in audio_files:
            with suppress(OSError):
                os.remove(path)

    if final_audio:
        return {"success": True, "audio_url": f"/download/{os.path.basename(final_audio)}"}
    return {"error": "Failed to merge audio files"}
|
|
|
|
|
|
|
|
|
|
|
@app.get("/download/{filename}")
def download_file(filename: str):
    """Return the public URL of a generated file if it exists in ``AUDIO_DIR``.

    The file itself is not streamed by this handler; the response only carries
    a ``download_url`` pointing at the externally hosted ``audio_files`` path.

    Args:
        filename: Bare name of a file inside ``AUDIO_DIR`` (path parameter).

    Returns:
        ``{"download_url": ...}`` when the file exists, otherwise
        ``{"error": "File not found"}``.
    """
    # The name comes from the request: accept only a plain file name so that
    # values such as ".." or names containing a path separator cannot make
    # the check look outside AUDIO_DIR.
    if filename in {".", ".."} or os.path.basename(filename) != filename:
        return {"error": "File not found"}

    file_path = os.path.join(AUDIO_DIR, filename)
    # isfile, not exists: a directory must not be reported as downloadable.
    if os.path.isfile(file_path):
        return {"download_url": f"https://hivecorp-manager1.hf.space/audio_files/{filename}"}
    return {"error": "File not found"}
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Serve the app directly when this module is executed as a script,
    # listening on all interfaces at port 7860.
    uvicorn.run(app, port=7860, host="0.0.0.0")
|
|
|