Spaces:
Runtime error
Runtime error
| import openai | |
| import os | |
| import logging | |
| from typing import Dict, Any, Optional | |
| from pathlib import Path | |
| import tempfile | |
| import io | |
| logger = logging.getLogger(__name__) | |
| class VoiceSynthesizer: | |
| """Handles text-to-speech conversion for lecture content""" | |
| def __init__(self, openai_api_key: str): | |
| self.client = openai.OpenAI(api_key=openai_api_key) | |
| self.supported_voices = [ | |
| "alloy", "echo", "fable", "onyx", "nova", "shimmer" | |
| ] | |
| self.default_voice = "nova" | |
| def set_api_key(self, api_key: str): | |
| """Set the OpenAI API key dynamically.""" | |
| self.client = openai.OpenAI(api_key=api_key) | |
| def synthesize_lecture(self, lecture_content: str, voice: str = None, output_path: str = None) -> Dict[str, Any]: | |
| """ | |
| Convert lecture text to speech using OpenAI TTS | |
| Args: | |
| lecture_content: The lecture text to convert | |
| voice: Voice to use (alloy, echo, fable, onyx, nova, shimmer) | |
| output_path: Where to save the audio file | |
| Returns: | |
| Dict with success status, file path, and metadata | |
| """ | |
| try: | |
| if not lecture_content.strip(): | |
| return { | |
| 'success': False, | |
| 'error': 'No content provided for synthesis', | |
| 'file_path': None, | |
| 'duration': 0 | |
| } | |
| # Validate and set voice | |
| selected_voice = voice if voice in self.supported_voices else self.default_voice | |
| # Prepare content for TTS (remove markdown formatting) | |
| clean_content = self._clean_content_for_tts(lecture_content) | |
| # Split content into chunks if too long (OpenAI TTS has limits) | |
| chunks = self._split_content(clean_content, max_length=4000) | |
| if not output_path: | |
| output_path = os.path.join("output", f"lecture_audio_{hash(lecture_content)}.mp3") | |
| # Ensure output directory exists | |
| os.makedirs(os.path.dirname(output_path), exist_ok=True) | |
| if len(chunks) == 1: | |
| # Single chunk - direct synthesis | |
| response = self.client.audio.speech.create( | |
| model="tts-1", | |
| voice=selected_voice, | |
| input=chunks[0], | |
| response_format="mp3" | |
| ) | |
| # Save the audio file | |
| with open(output_path, "wb") as f: | |
| f.write(response.content) | |
| else: | |
| # Multiple chunks - synthesize and combine | |
| self._synthesize_multiple_chunks(chunks, selected_voice, output_path) | |
| # Get file size and estimate duration | |
| file_size = os.path.getsize(output_path) | |
| estimated_duration = self._estimate_audio_duration(clean_content) | |
| return { | |
| 'success': True, | |
| 'file_path': output_path, | |
| 'voice': selected_voice, | |
| 'duration': estimated_duration, | |
| 'file_size': file_size, | |
| 'chunks_count': len(chunks) | |
| } | |
| except Exception as e: | |
| logger.error(f"Voice synthesis failed: {str(e)}") | |
| return { | |
| 'success': False, | |
| 'error': str(e), | |
| 'file_path': None, | |
| 'duration': 0 | |
| } | |
| def _clean_content_for_tts(self, content: str) -> str: | |
| """Clean markdown and formatting for better TTS output""" | |
| import re | |
| # Remove markdown headers | |
| content = re.sub(r'^#{1,6}\s+', '', content, flags=re.MULTILINE) | |
| # Remove markdown emphasis | |
| content = re.sub(r'\*\*(.*?)\*\*', r'\1', content) # Bold | |
| content = re.sub(r'\*(.*?)\*', r'\1', content) # Italic | |
| # Remove markdown links | |
| content = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', content) | |
| # Remove horizontal rules | |
| content = re.sub(r'^---+$', '', content, flags=re.MULTILINE) | |
| # Clean up extra whitespace | |
| content = re.sub(r'\n{3,}', '\n\n', content) | |
| content = re.sub(r' {2,}', ' ', content) | |
| # Add pauses for better speech flow | |
| content = re.sub(r'\n\n', '\n\n... \n\n', content) # Longer pause between sections | |
| return content.strip() | |
| def _split_content(self, content: str, max_length: int = 4000) -> list: | |
| """Split content into chunks suitable for TTS API""" | |
| if len(content) <= max_length: | |
| return [content] | |
| chunks = [] | |
| sentences = content.split('. ') | |
| current_chunk = "" | |
| for sentence in sentences: | |
| # Check if adding this sentence would exceed the limit | |
| if len(current_chunk) + len(sentence) + 2 > max_length: | |
| if current_chunk: | |
| chunks.append(current_chunk.strip()) | |
| current_chunk = sentence + ". " | |
| else: | |
| # Single sentence is too long, split by words | |
| words = sentence.split() | |
| word_chunk = "" | |
| for word in words: | |
| if len(word_chunk) + len(word) + 1 > max_length: | |
| if word_chunk: | |
| chunks.append(word_chunk.strip()) | |
| word_chunk = word + " " | |
| else: | |
| # Single word is too long, truncate | |
| chunks.append(word[:max_length]) | |
| else: | |
| word_chunk += word + " " | |
| if word_chunk: | |
| current_chunk = word_chunk + ". " | |
| else: | |
| current_chunk += sentence + ". " | |
| if current_chunk: | |
| chunks.append(current_chunk.strip()) | |
| return [chunk for chunk in chunks if chunk.strip()] | |
| def _synthesize_multiple_chunks(self, chunks: list, voice: str, output_path: str): | |
| """Synthesize multiple chunks and combine them""" | |
| import tempfile | |
| import shutil | |
| temp_files = [] | |
| try: | |
| # Synthesize each chunk | |
| for i, chunk in enumerate(chunks): | |
| temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=f"_chunk_{i}.mp3") | |
| temp_files.append(temp_file.name) | |
| temp_file.close() | |
| response = self.client.audio.speech.create( | |
| model="tts-1", | |
| voice=voice, | |
| input=chunk, | |
| response_format="mp3" | |
| ) | |
| with open(temp_file.name, "wb") as f: | |
| f.write(response.content) | |
| # Combine audio files (simple concatenation for MP3) | |
| with open(output_path, "wb") as outfile: | |
| for temp_file in temp_files: | |
| with open(temp_file, "rb") as infile: | |
| shutil.copyfileobj(infile, outfile) | |
| finally: | |
| # Clean up temporary files | |
| for temp_file in temp_files: | |
| try: | |
| os.unlink(temp_file) | |
| except: | |
| pass | |
| def _estimate_audio_duration(self, content: str) -> int: | |
| """Estimate audio duration in seconds based on content length""" | |
| # Average speaking rate: ~150 words per minute | |
| word_count = len(content.split()) | |
| duration_minutes = word_count / 150 | |
| return int(duration_minutes * 60) | |
| def get_available_voices(self) -> Dict[str, str]: | |
| """Get list of available voices with descriptions""" | |
| return { | |
| "alloy": "Neutral, balanced voice", | |
| "echo": "Crisp, clear voice", | |
| "fable": "Warm, engaging voice", | |
| "onyx": "Deep, authoritative voice", | |
| "nova": "Pleasant, professional voice (default)", | |
| "shimmer": "Bright, energetic voice" | |
| } | |
| def validate_voice(self, voice: str) -> bool: | |
| """Validate if the provided voice is supported""" | |
| return voice in self.supported_voices | |