| import os |
| import requests |
| from requests.exceptions import Timeout |
| import tempfile |
| import shutil |
| from pathlib import Path |
|
|
| |
| try: |
| from config import BACKEND_URL, TTS_MODELS, LANGUAGES, MODEL_MAPPING |
| except ImportError: |
| from ..config import BACKEND_URL, TTS_MODELS, LANGUAGES, MODEL_MAPPING |
|
|
| class TTSService: |
| def __init__(self, api_url: str = None): |
| self.api_url = f"{api_url or BACKEND_URL}/api/tts" |
|
|
| def generate_speech(self, text: str, voice: str = "Deepgram Aura2") -> tuple: |
| """Generate speech from text using the TTS API.""" |
| if not text: |
| return None, "Please enter some text." |
| |
| try: |
| resp = requests.post(self.api_url, json={"text": text, "voice": voice}, timeout=30) |
| print("DEBUG RAW RESPONSE:", resp.text) |
| |
| if resp.status_code == 200: |
| data = resp.json() |
| audio_url = data.get("audio_url") |
| metadata = data.get("metadata", {}) |
| |
| if audio_url: |
| audio_resp = requests.get(f"{BACKEND_URL}{audio_url}", stream=True, timeout=30) |
| if audio_resp.status_code == 200: |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp: |
| shutil.copyfileobj(audio_resp.raw, tmp) |
| audio_path = tmp.name |
| |
| meta_str = "" |
| return audio_path, meta_str |
| return None, "Audio file not found." |
| return None, f"Error from backend: {resp.text}" |
| |
| except Timeout: |
| return None, "TTS request timed out." |
| except Exception as e: |
| return None, f"Request failed: {str(e)}" |
|
|
|
|
| class STTService: |
| def __init__(self, api_url: str = None): |
| self.api_url = f"{api_url or BACKEND_URL}/api/stt" |
|
|
| def transcribe(self, audio_url: str = None, audio_path: str = None, language: str = "en", model: str = "whisper") -> str: |
| """Transcribe audio using the STT API.""" |
| try: |
| if audio_url and audio_url.strip(): |
| return self._transcribe_url(audio_url, language, model) |
| elif audio_path: |
| return self._transcribe_file(audio_path, language, model) |
| else: |
| return "Please provide an audio URL or upload an audio file." |
| except Exception as e: |
| return f"Transcription failed: {str(e)}" |
|
|
| def _transcribe_url(self, audio_url: str, language: str, model: str) -> str: |
| """Transcribe audio from URL.""" |
| data = { |
| "audio_url": audio_url, |
| "language": language, |
| "model": model |
| } |
| |
| |
| resp = requests.post(self.api_url, data=data, files={}, timeout=30) |
| |
| if resp.status_code != 200: |
| |
| resp = requests.post(self.api_url, json=data, timeout=30) |
| |
| if resp.status_code == 200: |
| return resp.json().get("transcript", "[No transcript returned]") |
| else: |
| return f"[Backend error: {resp.text}]" |
|
|
| def _transcribe_file(self, audio_path: str, language: str, model: str) -> str: |
| """Transcribe audio from uploaded file.""" |
| with open(audio_path, "rb") as f: |
| files = {"audio": (os.path.basename(audio_path), f, "audio/wav")} |
| data = {"language": language, "model": model} |
| resp = requests.post(self.api_url, files=files, data=data, timeout=30) |
| |
| if resp.status_code == 200: |
| return resp.json().get("transcript", "[No transcript returned]") |
| else: |
| return f"[Backend error: {resp.text}]" |
|
|