# Page header captured with the file (not part of the program): sameerbanchhor's picture
# Commit message: update model paths
# Commit: 8d060d1 (verified)
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import FileResponse, HTMLResponse
from pydantic import BaseModel
import os
import subprocess
from urllib.request import urlretrieve
import traceback
import uuid
from typing import Optional
import asyncio
from pathlib import Path
# --- Create FastAPI app ---
# Single application instance; every route and startup hook in this file
# registers itself on it through the @app.* decorators.
app = FastAPI(
    title="Chhattisgarhi TTS API",
    description="Text-to-Speech API for Chhattisgarhi language using VITS model",
    version="1.0.0",
)
# --- Create directories ---
# "tts_model" holds the downloaded checkpoint and config; "audio_outputs"
# receives the generated wav files. Both are created at import time.
os.makedirs("tts_model", exist_ok=True)
os.makedirs("audio_outputs", exist_ok=True)

# --- Model and Config URLs ---
# Remote locations of the checkpoint and its configuration file.
MODEL_URL = (
    "https://huggingface.co/sameerbanchhor/chattisgarhi-tts-models"
    "/resolve/main/Male/best_model.pth?download=true"
)
CONFIG_URL = (
    "https://huggingface.co/sameerbanchhor/chattisgarhi-tts-models"
    "/resolve/main/Male/config.json?download=true"
)

# --- Define local paths for the model and config ---
# Where the two files above are stored locally.
model_path = os.path.join("tts_model", "best_model.pth")
config_path = os.path.join("tts_model", "config.json")
# --- Pydantic models ---
# Request body accepted by POST /generate-audio.
class TTSRequest(BaseModel):
    # Text to synthesize. The schema itself accepts any string; empty or
    # whitespace-only text is rejected later by chhattisgarhi_tts_cli.
    text: str
# Response body returned by POST /generate-audio.
class TTSResponse(BaseModel):
    # Outcome label taken from the TTS helper's result tuple (e.g. "success").
    status: str
    # Human-readable description of the outcome.
    message: str
    # Identifier of the generated audio; defaults to None when not supplied.
    audio_id: Optional[str] = None
    # Relative URL from which the audio can be fetched; defaults to None.
    audio_url: Optional[str] = None
# --- Download function ---
def download_file(url, destination):
    """Download ``url`` to ``destination`` unless that file already exists.

    The payload is written to a sibling ``<destination>.part`` file first and
    moved onto ``destination`` only after the transfer has finished. An
    interrupted download therefore never leaves a truncated file at
    ``destination`` that a later call would mistake for a complete one.

    Args:
        url: Location to fetch the file from.
        destination: Local path the file should end up at.

    Raises:
        Exception: Whatever the download or the final move raised; the error
            is logged and re-raised unchanged.
    """
    if os.path.exists(destination):
        return

    print(f"Downloading {os.path.basename(destination)}...")
    partial_path = f"{destination}.part"
    try:
        urlretrieve(url, partial_path)
        # Same-directory rename: the finished file appears in one step.
        os.replace(partial_path, destination)
        print("Download complete.")
    except Exception as e:
        print(f"FATAL: Error downloading {url}: {e}")
        # Best-effort removal of the incomplete temp file; a failure here
        # must not mask the original download error.
        try:
            os.remove(partial_path)
        except OSError:
            pass
        raise
# --- Initialize models on startup ---
@app.on_event("startup")
async def startup_event():
    """Make sure the model checkpoint and its config exist locally.

    Each file is fetched through ``download_file``, model first, then config.
    Any failure is reported and re-raised.
    """
    required_files = (
        (MODEL_URL, model_path),
        (CONFIG_URL, config_path),
    )
    try:
        for source_url, local_path in required_files:
            download_file(source_url, local_path)
        print("Model files ready!")
    except Exception:
        print("Application cannot start because model files could not be downloaded.")
        raise
# --- TTS processing function ---
async def chhattisgarhi_tts_cli(text: str, output_filename: str):
    """Convert Chhattisgarhi text to speech by running the ``tts`` command.

    Args:
        text: Text to synthesize. Empty or whitespace-only text is rejected
            without running the tool.
        output_filename: File name (not a path) of the wav file to create
            inside the ``audio_outputs`` directory.

    Returns:
        A tuple ``(status, message, audio_path)``. ``status`` is ``"success"``
        or ``"error"``; ``audio_path`` is the path of the generated file on
        success and ``None`` otherwise. Failures are reported through the
        tuple rather than raised.
    """
    if not text.strip():
        return ("error", "Please enter some text to convert.", None)

    try:
        output_wav_path = os.path.join("audio_outputs", output_filename)

        # Argument list (no shell involved), so the text needs no quoting.
        command = [
            "tts",
            "--text", text,
            "--model_path", model_path,
            "--config_path", config_path,
            "--out_path", output_wav_path,
        ]
        print(f"Running command: {' '.join(command)}")

        # Run the tool without blocking the event loop.
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _stdout, stderr = await process.communicate()

        if process.returncode != 0:
            # errors="replace": undecodable bytes in the tool's output must
            # not raise here and hide the actual failure reason.
            stderr_output = stderr.decode("utf-8", errors="replace").strip()
            print(f"ERROR: TTS command failed with return code {process.returncode}")
            print(f"STDERR from TTS tool: {stderr_output}")
            return ("error", f"Audio generation failed: {stderr_output}", None)

        # A zero exit code alone does not prove that the wav file was written.
        if not os.path.isfile(output_wav_path):
            missing_message = "Audio generation failed: the TTS tool produced no output file."
            print(f"ERROR: {missing_message}")
            return ("error", missing_message, None)

        print(f"TTS command successful. Output saved to {output_wav_path}")
        return ("success", "Audio generated successfully!", output_wav_path)
    except Exception as e:
        # Boundary handler: callers expect an error tuple, never an exception.
        error_message = f"An unexpected error occurred: {str(e)}"
        print(error_message)
        traceback.print_exc()
        return ("error", error_message, None)
# --- API Routes ---
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve a simple HTML interface for testing.

    The page posts the entered text to ``/generate-audio`` and plays the
    returned audio. Failed requests are answered by FastAPI with a JSON body
    of the form ``{"detail": ...}``, so the script reads ``detail`` for error
    text. Error text is inserted with ``textContent`` so that it is never
    interpreted as HTML.

    Returns:
        The HTML document as a string.
    """
    html_content = """
<!DOCTYPE html>
<html>
<head>
<title>Chhattisgarhi TTS API</title>
<style>
body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }
.container { background: #f5f5f5; padding: 20px; border-radius: 8px; margin: 20px 0; }
textarea { width: 100%; height: 100px; padding: 10px; border: 1px solid #ddd; border-radius: 4px; }
button { background: #007bff; color: white; padding: 10px 20px; border: none; border-radius: 4px; cursor: pointer; }
button:hover { background: #0056b3; }
.examples { margin-top: 20px; }
.example { background: #e9ecef; padding: 10px; margin: 5px 0; border-radius: 4px; cursor: pointer; }
</style>
</head>
<body>
<h1>Chhattisgarhi Text-to-Speech API</h1>
<p>Enter text in Chhattisgarhi to generate audio using the VITS model.</p>
<div class="container">
<textarea id="textInput" placeholder="यहाँ छत्तीसगढ़ी पाठ लिखें..."></textarea>
<br><br>
<button onclick="generateAudio()">Generate Audio</button>
</div>
<div id="result"></div>
<div class="examples">
<h3>Example Texts (click to use):</h3>
<div class="example" onclick="useExample(this)">राजस्थान के नामी ब्यंजन चूरमालाड़ू गुड़ के पाग म गहूँ के दरदरहा पिसान के लाड़ू म तिली अउ नरियल के सुवाद म सजथे</div>
<div class="example" onclick="useExample(this)">दुग्ध क्रान्ति भारत के योजना हे जेखर ले भारत म दूध के कमी ला दुरिहा करे जा सकथे एला श्वेत क्रांति घलोक कहिथे</div>
<div class="example" onclick="useExample(this)">जम्मू कश्मीर म पर्यटन उद्योग ला बढ़ावा देना उहाँ के अर्थबेवस्था ला सुचारू रूप ले चलाय बर जरुरी हे</div>
<div class="example" onclick="useExample(this)">फोरेन्सिक विज्ञान ल कानूनी प्रश्न के उत्तर दिए बर अउ अपराध ल सिद्ध करे बर बउरे जाथे</div>
</div>
<script>
function useExample(element) {
document.getElementById('textInput').value = element.textContent;
}
function showError(message) {
// textContent keeps server-provided text from being parsed as HTML.
const resultDiv = document.getElementById('result');
const errorDiv = document.createElement('div');
errorDiv.style.color = 'red';
errorDiv.textContent = message;
resultDiv.innerHTML = '';
resultDiv.appendChild(errorDiv);
}
async function generateAudio() {
const text = document.getElementById('textInput').value;
const resultDiv = document.getElementById('result');
if (!text.trim()) {
resultDiv.innerHTML = '<div style="color: red;">Please enter some text!</div>';
return;
}
resultDiv.innerHTML = '<div>Generating audio... Please wait.</div>';
try {
const response = await fetch('/generate-audio', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ text: text })
});
const data = await response.json();
if (response.ok && data.status === 'success') {
resultDiv.innerHTML = `
<div style="color: green;">${data.message}</div>
<audio controls style="width: 100%; margin-top: 10px;">
<source src="${data.audio_url}" type="audio/wav">
Your browser does not support the audio element.
</audio>
`;
} else {
// Error responses carry their text in 'detail'; 'message' exists only on success payloads.
const detail = typeof data.detail === 'string' ? data.detail : JSON.stringify(data.detail);
showError('Error: ' + (detail || data.message || 'Unknown error'));
}
} catch (error) {
showError('Network error: ' + error.message);
}
}
</script>
</body>
</html>
"""
    return html_content
@app.post("/generate-audio", response_model=TTSResponse)
async def generate_audio(request: TTSRequest):
    """Generate audio from Chhattisgarhi text.

    Args:
        request: Request body carrying the text to synthesize.

    Returns:
        A ``TTSResponse`` with the new audio id and the URL it is served from.

    Raises:
        HTTPException: 400 when the TTS helper reports an error (for example
            empty text or a failed tool run); 500 when the helper itself
            raises unexpectedly.
    """
    # A fresh UUID names the output file, so concurrent requests never collide.
    audio_id = str(uuid.uuid4())
    output_filename = f"{audio_id}.wav"

    try:
        status, message, _audio_path = await chhattisgarhi_tts_cli(request.text, output_filename)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Checked outside the try block on purpose: raised inside it, this 400
    # would be caught by the blanket handler above and re-wrapped as a 500.
    if status != "success":
        raise HTTPException(status_code=400, detail=message)

    return TTSResponse(
        status=status,
        message=message,
        audio_id=audio_id,
        audio_url=f"/audio/{audio_id}",
    )
@app.get("/audio/{audio_id}")
async def get_audio(audio_id: str):
    """Serve generated audio files.

    Args:
        audio_id: Identifier returned by ``/generate-audio``. It comes straight
            from the URL, so it is validated as a UUID before being used to
            build a filesystem path.

    Returns:
        A ``FileResponse`` streaming the wav file.

    Raises:
        HTTPException: 404 when ``audio_id`` is not a UUID or no such audio
            file exists.
    """
    try:
        # Generated ids are always uuid4 strings; anything else cannot name a
        # file this service produced. The canonical form is used for the path
        # so no caller-supplied characters reach the filesystem.
        canonical_id = str(uuid.UUID(audio_id))
    except ValueError:
        raise HTTPException(status_code=404, detail="Audio file not found") from None

    audio_path = os.path.join("audio_outputs", f"{canonical_id}.wav")
    if not os.path.isfile(audio_path):
        raise HTTPException(status_code=404, detail="Audio file not found")

    return FileResponse(
        audio_path,
        media_type="audio/wav",
        filename=f"chhattisgarhi_audio_{canonical_id}.wav",
    )
@app.get("/health")
async def health_check():
    """Report whether the model and config files are present on disk.

    Returns:
        A dict with an overall ``status`` ("healthy" only when both files
        exist), one presence flag per file, and the two local paths.
    """
    has_model = os.path.exists(model_path)
    has_config = os.path.exists(config_path)

    if has_model and has_config:
        overall_status = "healthy"
    else:
        overall_status = "unhealthy"

    return {
        "status": overall_status,
        "model_downloaded": has_model,
        "config_downloaded": has_config,
        "model_path": model_path,
        "config_path": config_path,
    }
@app.get("/examples")
async def get_examples():
    """Return a fixed list of sample Chhattisgarhi sentences for testing.

    Returns:
        A dict whose ``examples`` key holds the sample sentences.
    """
    return {
        "examples": [
            "राजस्थान के नामी ब्यंजन चूरमालाड़ू गुड़ के पाग म गहूँ के दरदरहा पिसान के लाड़ू म तिली अउ नरियल के सुवाद म सजथे",
            "दुग्ध क्रान्ति भारत के योजना हे जेखर ले भारत म दूध के कमी ला दुरिहा करे जा सकथे एला श्वेत क्रांति घलोक कहिथे",
            "जम्मू कश्मीर म पर्यटन उद्योग ला बढ़ावा देना उहाँ के अर्थबेवस्था ला सुचारू रूप ले चलाय बर जरुरी हे",
            "फोरेन्सिक विज्ञान ल कानूनी प्रश्न के उत्तर दिए बर अउ अपराध ल सिद्ध करे बर बउरे जाथे",
        ]
    }
# --- Background task to clean up old audio files ---
async def cleanup_old_files(max_age_seconds: float = 3600, directory: str = "audio_outputs"):
    """Delete regular files in ``directory`` older than ``max_age_seconds``.

    Age is measured from the file's last modification time, which means the
    same thing on every platform (creation/change time does not). Problems
    with a single file are logged and skipped so that one bad entry cannot
    abort the whole pass.

    Args:
        max_age_seconds: Files strictly older than this are removed.
            Defaults to 3600 (1 hour).
        directory: Directory to scan. Defaults to ``"audio_outputs"``.
            A missing directory is treated as nothing to clean.
    """
    import time

    # Files last modified before this instant are considered expired.
    cutoff = time.time() - max_age_seconds

    try:
        filenames = os.listdir(directory)
    except FileNotFoundError:
        return

    for filename in filenames:
        file_path = os.path.join(directory, filename)
        try:
            if not os.path.isfile(file_path):
                continue
            # The stat call is inside the try: the file may disappear between
            # the directory listing and this point.
            if os.path.getmtime(file_path) >= cutoff:
                continue
            os.remove(file_path)
            print(f"Cleaned up old audio file: {filename}")
        except OSError as e:
            print(f"Error cleaning up file {filename}: {e}")
@app.on_event("startup")
async def setup_cleanup():
    """Startup hook reserved for scheduling audio cleanup.

    Currently a deliberate no-op: nothing is scheduled here.
    """
    # This would typically be handled by a proper task scheduler in production
    return None
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces, port 7860.
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
    )