|
|
import os |
|
|
import random |
|
|
from typing import Dict, Optional, List, Union |
|
|
from google.cloud import texttospeech |
|
|
from moviepy.editor import AudioFileClip |
|
|
import tempfile |
|
|
from src.logger_config import logger |
|
|
from .gcs_utils import get_gcs_credentials |
|
|
import json |
|
|
from . import ai_studio_sdk |
|
|
from src.config import get_config_value |
|
|
from src.file_downloader import get_file_downloader |
|
|
from pathlib import Path |
|
|
|
|
|
class GoogleTTS:
    """Google Cloud Text-to-Speech wrapper with persona-aware voice selection.

    Maintains a catalog of Chirp/Chirp3 HD voices grouped by persona category
    (gender x age) and provides:
      - round-robin (sequential) voice selection per category, so consecutive
        videos do not reuse the same voice,
      - LLM-assisted voice selection from a free-text persona description,
      - TTS synthesis with optional target-duration matching via a two-pass
        speaking-rate adjustment.
    """

    def __init__(self, credentials=None):
        """Create the underlying TTS client and the voice catalog.

        Args:
            credentials: Optional Google credentials object. When falsy,
                credentials are loaded via get_gcs_credentials("final_data").
        """
        if not credentials:
            credentials = get_gcs_credentials("final_data")
        self.client = texttospeech.TextToSpeechClient(credentials=credentials)

        # Voice catalog keyed by persona category ("<gender>_<age>").
        self.voice_profiles = {
            "female_young": [
                "en-AU-Chirp3-HD-Achernar",
                "en-AU-Chirp3-HD-Gacrux",
                "en-AU-Chirp3-HD-Laomedeia",
                "en-AU-Chirp3-HD-Sulafat",
                "en-AU-Chirp3-HD-Leda",
                "en-AU-Chirp3-HD-Aoede",
                "en-AU-Chirp-HD-O",
                "en-GB-Chirp-HD-O",
                "en-GB-Chirp-HD-F",
                "en-GB-Chirp3-HD-Aoede",
                "en-GB-Chirp3-HD-Callirrhoe",
                "en-GB-Chirp3-HD-Gacrux",
                "en-GB-Chirp3-HD-Laomedeia",
                "en-US-Chirp3-HD-Aoede",
                "en-US-Chirp3-HD-Leda",
            ],
            "female_mature": ["en-GB-Chirp-HD-F", "en-US-Chirp3-HD-Leda"],
            "male_young": [
                "en-AU-Chirp-HD-D",
                "en-AU-Chirp3-HD-Algenib",
                "en-AU-Chirp3-HD-Enceladus",
                "en-AU-Chirp3-HD-Puck",
                "en-AU-Chirp3-HD-Schedar",
                "en-GB-Chirp3-HD-Achird",
                "en-GB-Chirp3-HD-Algenib",
                "en-GB-Chirp3-HD-Enceladus",
                "en-US-Chirp3-HD-Iapetus",
            ],
            "male_mature": ["en-AU-Chirp3-HD-Schedar", "en-GB-Chirp3-HD-Achird"],
        }

        # Round-robin cursor per category, advanced by
        # select_sequential_voice() so repeated calls rotate through voices.
        self.current_voice_indices = {category: 0 for category in self.voice_profiles}

    def select_sequential_voice(self, persona: str = "female_young") -> str:
        """Select voice sequentially (not random) to ensure variety across videos.

        Args:
            persona: Category key ("female_young", "female_mature",
                "male_young", "male_mature"). Unknown keys fall back to
                "female_young".

        Returns:
            A voice ID from the category; the per-category cursor is advanced
            (with wrap-around) as a side effect.
        """
        if persona not in self.voice_profiles:
            persona = "female_young"

        voices = self.voice_profiles[persona]
        if not voices:
            # Defensive fallback; catalog lists are non-empty in practice.
            return "en-AU-Chirp3-HD-Achernar"

        current_index = self.current_voice_indices[persona]
        # `voices` is guaranteed a non-empty list here (checked above), so
        # direct indexing is safe — no extra isinstance/len guard needed.
        selected_voice = voices[current_index]

        # Advance the cursor with wrap-around for the next call.
        self.current_voice_indices[persona] = (current_index + 1) % len(voices)

        logger.debug(f"π Selected sequential voice #{current_index + 1} for {persona}: {selected_voice}")
        return selected_voice

    async def select_voice_for_persona(self, image_prompt: str) -> str:
        """Select appropriate voice based on image prompt/description.

        Uses Gemini (via ai_studio_sdk) to analyze the persona and select a
        matching voice ID from the catalog. Falls back to sequential
        selection for the inferred gender/age if the model returns an
        unknown voice, and to the default persona if anything raises.

        Args:
            image_prompt: Free-text description of the on-screen persona.

        Returns:
            A voice ID present in voice_profiles (or a sequential fallback).
        """
        try:
            logger.debug(f"π Analyzing persona for voice selection: {image_prompt[:100]}...")

            voice_options = json.dumps(self.voice_profiles, indent=2)

            analysis_prompt = f"""Analyze this image description and select the most suitable voice from the available options.

Image Description: {image_prompt}

Available Voices (grouped by category):
{voice_options}

Determine the persona (gender, age, style) and select the specific voice ID that best fits.

Return ONLY valid JSON:
{{
"gender": "female",
"age": "young",
"style": "casual",
"selected_voice_id": "en-US-Chirp3-HD-Leda",
"reason": "Voice description matches..."
}}"""

            # NOTE(review): generate() is not awaited even though this method
            # is async — confirm ai_studio_sdk.generate is synchronous.
            response = ai_studio_sdk.generate(analysis_prompt)

            # Strip an optional ```json ... ``` markdown fence from the reply.
            response_text = response.strip()
            if response_text.startswith("```"):
                response_text = response_text.split("```")[1]
                if response_text.startswith("json"):
                    response_text = response_text[4:]
            response_text = response_text.strip()

            persona = json.loads(response_text)
            selected_voice = persona.get("selected_voice_id")

            # Accept the model's pick only if it exists in our catalog.
            voice_found = any(
                selected_voice in voices for voices in self.voice_profiles.values()
            )

            if not voice_found:
                logger.warning(f"β οΈ Selected voice '{selected_voice}' not found in profiles. Falling back to sequential selection.")

                gender = persona.get("gender", "female")
                age = persona.get("age", "young")
                voice_key = f"{gender}_{age}"
                selected_voice = self.select_sequential_voice(voice_key)

            logger.debug(f"β Selected voice: {selected_voice}")
            return selected_voice

        except Exception as e:
            logger.error(f"β Voice selection failed: {e}, using default")
            return self.select_sequential_voice("female_young")

    def _measure_mp3_duration(self, audio_bytes: bytes) -> float:
        """Measure the duration in seconds of in-memory MP3 data.

        Writes the bytes to a temporary file (delete=False, because an open
        NamedTemporaryFile cannot be reopened by name on Windows), probes it
        with moviepy, then removes the file.

        Args:
            audio_bytes: Raw MP3 audio content.

        Returns:
            Duration in seconds, or 0.0 if the audio could not be decoded.
        """
        tmp_path = None
        try:
            with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
                tmp.write(audio_bytes)
                tmp_path = tmp.name
            with AudioFileClip(tmp_path) as clip:
                return float(clip.duration)
        except Exception as e:
            # Logged (not silently swallowed) so decode failures are visible;
            # callers treat 0.0 as "unknown duration".
            logger.error(f"Error measuring audio duration: {e}")
            return 0.0
        finally:
            if tmp_path:
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass

    def generate_tts_audio(
        self,
        text: str,
        voice_name: Optional[str] = None,
        duration: Optional[float] = None,
        volume_gain_db: float = 6.0
    ) -> Dict:
        """
        Generate TTS audio using Google Cloud TTS.

        Args:
            text: Text to synthesize.
            voice_name: Specific voice name to use. If None, selects sequentially based on default persona.
            duration: Target duration in seconds. If provided, speaking rate is adjusted.
            volume_gain_db: Volume gain in dB.

        Returns:
            Dict containing:
                - audio_content: The binary audio data.
                - duration: Actual duration in seconds (0.0 if unmeasurable).
                - speaking_rate: Used speaking rate.
                - voice_name: The voice name used.

        Raises:
            Exception: Re-raises any synthesis/client failure after logging.
        """
        try:
            if not voice_name:
                voice_name = self.select_sequential_voice()

            # Test-automation mode: skip the paid TTS call and return a
            # canned sample downloaded from Drive.
            if get_config_value("TEST_AUTOMATION"):
                logger.debug(f"π§ͺ TEST_MODE: Skipping Google TTS for '{text[:30]}...'")

                downloader = get_file_downloader()
                sample_path = Path("testData/sample_tts.mp3")
                drive_url = "https://drive.google.com/file/d/1B4LsRhIXpV57ADR5sZ7IMZpMqE_JUj8Q/view?usp=sharing"

                file_id = downloader._extract_drive_file_id(drive_url)

                downloaded_path = downloader.download_from_drive(
                    file_id=file_id,
                    output_path=sample_path
                )

                with open(downloaded_path, "rb") as f:
                    dummy_content = f.read()

                return {
                    "audio_content": dummy_content,
                    "duration": 16.05,
                    "speaking_rate": 1.0,
                    "voice_name": "test_voice"
                }

            synthesis_input = texttospeech.SynthesisInput(text=text)

            # Infer SSML gender from the voice name's final segment.
            # BUGFIX: the previous substring test (`"D" in voice_name`)
            # matched the "HD" marker present in every voice name, so gender
            # was always detected as MALE. Compare the trailing segment only.
            male_suffixes = {"D", "Algenib", "Enceladus", "Puck", "Schedar", "Achird", "Iapetus"}
            is_male = voice_name.rsplit("-", 1)[-1] in male_suffixes
            ssml_gender = texttospeech.SsmlVoiceGender.MALE if is_male else texttospeech.SsmlVoiceGender.FEMALE

            # Language code is the first two segments, e.g. "en-AU".
            language_code = "-".join(voice_name.split("-")[:2])

            voice = texttospeech.VoiceSelectionParams(
                language_code=language_code, name=voice_name, ssml_gender=ssml_gender
            )

            speaking_rate = 1.0

            if duration:
                # Two-pass duration targeting: synthesize once at rate 1.0 to
                # measure the baseline length, then derive the rate needed to
                # hit the requested duration.
                temp_audio_config = texttospeech.AudioConfig(
                    audio_encoding=texttospeech.AudioEncoding.MP3,
                    speaking_rate=1.0,
                    pitch=0.0,
                    volume_gain_db=0.0,
                )
                temp_response = self.client.synthesize_speech(
                    input=synthesis_input, voice=voice, audio_config=temp_audio_config
                )

                baseline_duration = self._measure_mp3_duration(temp_response.audio_content)

                if baseline_duration > 0:
                    speaking_rate = baseline_duration / duration
                    # NOTE(review): Google TTS documents a speaking_rate range
                    # of 0.25-4.0; the 0.15 lower clamp is kept from the
                    # original but may be rejected by the API — confirm.
                    speaking_rate = max(0.15, min(4.0, speaking_rate))

                    logger.debug(
                        f"π Baseline: {baseline_duration:.2f}s, Target: {duration:.2f}s, Rate: {speaking_rate:.2f}x"
                    )

            audio_config = texttospeech.AudioConfig(
                audio_encoding=texttospeech.AudioEncoding.MP3,
                speaking_rate=speaking_rate,
                pitch=0.0,
                volume_gain_db=volume_gain_db,
            )

            response = self.client.synthesize_speech(
                input=synthesis_input, voice=voice, audio_config=audio_config
            )

            # Measure the real output length (0.0 if decoding fails).
            actual_duration = self._measure_mp3_duration(response.audio_content)

            return {
                "audio_content": response.audio_content,
                "duration": actual_duration,
                "speaking_rate": speaking_rate,
                "voice_name": voice_name
            }

        except Exception as e:
            logger.error(f"β Error generating TTS in GoogleTTS: {e}")
            raise
|
|
|