# AI_tutor/app/voice_synthesizer.py
import logging
import os
import re
import shutil
import tempfile
from typing import Any, Dict, List, Optional

import openai

logger = logging.getLogger(__name__)
class VoiceSynthesizer:
"""Handles text-to-speech conversion for lecture content"""
def __init__(self, openai_api_key: str):
self.client = openai.OpenAI(api_key=openai_api_key)
self.supported_voices = [
"alloy", "echo", "fable", "onyx", "nova", "shimmer"
]
self.default_voice = "nova"
def set_api_key(self, api_key: str):
"""Set the OpenAI API key dynamically."""
self.client = openai.OpenAI(api_key=api_key)
    def synthesize_lecture(self, lecture_content: str, voice: Optional[str] = None, output_path: Optional[str] = None) -> Dict[str, Any]:
"""
Convert lecture text to speech using OpenAI TTS
Args:
lecture_content: The lecture text to convert
voice: Voice to use (alloy, echo, fable, onyx, nova, shimmer)
output_path: Where to save the audio file
Returns:
Dict with success status, file path, and metadata
"""
try:
if not lecture_content.strip():
return {
'success': False,
'error': 'No content provided for synthesis',
'file_path': None,
'duration': 0
}
# Validate and set voice
selected_voice = voice if voice in self.supported_voices else self.default_voice
# Prepare content for TTS (remove markdown formatting)
clean_content = self._clean_content_for_tts(lecture_content)
# Split content into chunks if too long (OpenAI TTS has limits)
chunks = self._split_content(clean_content, max_length=4000)
            if not output_path:
                # hash() is salted per interpreter run, so this filename is not
                # stable across processes; abs() avoids a leading "-" in the name
                output_path = os.path.join("output", f"lecture_audio_{abs(hash(lecture_content))}.mp3")
            # Ensure the output directory exists (dirname is "" for bare filenames)
            output_dir = os.path.dirname(output_path)
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)
if len(chunks) == 1:
# Single chunk - direct synthesis
response = self.client.audio.speech.create(
model="tts-1",
voice=selected_voice,
input=chunks[0],
response_format="mp3"
)
# Save the audio file
with open(output_path, "wb") as f:
f.write(response.content)
else:
# Multiple chunks - synthesize and combine
self._synthesize_multiple_chunks(chunks, selected_voice, output_path)
# Get file size and estimate duration
file_size = os.path.getsize(output_path)
estimated_duration = self._estimate_audio_duration(clean_content)
return {
'success': True,
'file_path': output_path,
'voice': selected_voice,
'duration': estimated_duration,
'file_size': file_size,
'chunks_count': len(chunks)
}
except Exception as e:
logger.error(f"Voice synthesis failed: {str(e)}")
return {
'success': False,
'error': str(e),
'file_path': None,
'duration': 0
}
def _clean_content_for_tts(self, content: str) -> str:
"""Clean markdown and formatting for better TTS output"""
# Remove markdown headers
content = re.sub(r'^#{1,6}\s+', '', content, flags=re.MULTILINE)
# Remove markdown emphasis
content = re.sub(r'\*\*(.*?)\*\*', r'\1', content) # Bold
content = re.sub(r'\*(.*?)\*', r'\1', content) # Italic
# Remove markdown links
content = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', content)
# Remove horizontal rules
content = re.sub(r'^---+$', '', content, flags=re.MULTILINE)
# Clean up extra whitespace
content = re.sub(r'\n{3,}', '\n\n', content)
content = re.sub(r' {2,}', ' ', content)
        # Insert an ellipsis at every paragraph break so the TTS voice pauses longer
        content = re.sub(r'\n\n', '\n\n... \n\n', content)
return content.strip()
    def _split_content(self, content: str, max_length: int = 4000) -> List[str]:
"""Split content into chunks suitable for TTS API"""
if len(content) <= max_length:
return [content]
chunks = []
        # Split on sentence boundaries, keeping terminating punctuation attached
        sentences = re.split(r'(?<=[.!?])\s+', content)
        current_chunk = ""
        for sentence in sentences:
            # Flush the current chunk if adding this sentence would exceed the limit
            if current_chunk and len(current_chunk) + len(sentence) + 1 > max_length:
                chunks.append(current_chunk.strip())
                current_chunk = ""
            if len(sentence) + 1 <= max_length:
                current_chunk += sentence + " "
            else:
                # A single sentence is too long, so split it by words
                words = sentence.split()
                word_chunk = ""
                for word in words:
                    if len(word_chunk) + len(word) + 1 > max_length:
                        if word_chunk:
                            chunks.append(word_chunk.strip())
                            word_chunk = word + " "
                        else:
                            # A single word exceeds the limit: hard-truncate it
                            chunks.append(word[:max_length])
                    else:
                        word_chunk += word + " "
                current_chunk = word_chunk
if current_chunk:
chunks.append(current_chunk.strip())
return [chunk for chunk in chunks if chunk.strip()]
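
    # Illustrative example of the splitting behavior (the max_length value
    # below is artificially small for demonstration, not an API limit):
    #   _split_content("First sentence. Second sentence.", max_length=20)
    #   -> ["First sentence.", "Second sentence."]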
    def _synthesize_multiple_chunks(self, chunks: List[str], voice: str, output_path: str) -> None:
"""Synthesize multiple chunks and combine them"""
temp_files = []
try:
# Synthesize each chunk
for i, chunk in enumerate(chunks):
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=f"_chunk_{i}.mp3")
temp_files.append(temp_file.name)
temp_file.close()
response = self.client.audio.speech.create(
model="tts-1",
voice=voice,
input=chunk,
response_format="mp3"
)
with open(temp_file.name, "wb") as f:
f.write(response.content)
            # Combine chunks by naive byte concatenation; most MP3 decoders play
            # this back fine, but it is not a spec-valid merge (a library such
            # as pydub would be needed for a properly re-encoded single file)
with open(output_path, "wb") as outfile:
for temp_file in temp_files:
with open(temp_file, "rb") as infile:
shutil.copyfileobj(infile, outfile)
finally:
# Clean up temporary files
for temp_file in temp_files:
                try:
                    os.unlink(temp_file)
                except OSError:
                    # Best-effort cleanup; a leftover temp file is not fatal
                    pass
def _estimate_audio_duration(self, content: str) -> int:
"""Estimate audio duration in seconds based on content length"""
# Average speaking rate: ~150 words per minute
word_count = len(content.split())
duration_minutes = word_count / 150
return int(duration_minutes * 60)
def get_available_voices(self) -> Dict[str, str]:
"""Get list of available voices with descriptions"""
return {
"alloy": "Neutral, balanced voice",
"echo": "Crisp, clear voice",
"fable": "Warm, engaging voice",
"onyx": "Deep, authoritative voice",
"nova": "Pleasant, professional voice (default)",
"shimmer": "Bright, energetic voice"
}
def validate_voice(self, voice: str) -> bool:
"""Validate if the provided voice is supported"""
return voice in self.supported_voices
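
if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: it assumes an
    # OPENAI_API_KEY environment variable is set and writes a demo MP3 under
    # output/; the lecture text and output path below are illustrative only.
    synthesizer = VoiceSynthesizer(openai_api_key=os.environ["OPENAI_API_KEY"])
    print("Available voices:", ", ".join(synthesizer.get_available_voices()))
    result = synthesizer.synthesize_lecture(
        "## Photosynthesis\n\nPlants convert light into chemical energy.",
        voice="nova",
        output_path="output/demo_lecture.mp3",
    )
    if result["success"]:
        print(f"Saved {result['file_path']} (~{result['duration']}s, {result['chunks_count']} chunk(s))")
    else:
        print(f"Synthesis failed: {result['error']}")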