File size: 8,534 Bytes
1d95600
 
 
 
 
 
 
 
 
 
 
 
 
a2438f7
 
1d95600
 
 
 
 
a2438f7
 
 
 
1d95600
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import hashlib
import io
import logging
import os
import re
import shutil
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional

import openai

logger = logging.getLogger(__name__)

class VoiceSynthesizer:
    """Handles text-to-speech conversion for lecture content via the OpenAI TTS API."""

    def __init__(self, openai_api_key: str):
        # Client can be swapped later via set_api_key() without recreating the object.
        self.client = openai.OpenAI(api_key=openai_api_key)
        # Voice names accepted by the OpenAI "tts-1" model.
        self.supported_voices = [
            "alloy", "echo", "fable", "onyx", "nova", "shimmer"
        ]
        self.default_voice = "nova"

    def set_api_key(self, api_key: str):
        """Set the OpenAI API key dynamically by rebuilding the client."""
        self.client = openai.OpenAI(api_key=api_key)

    def synthesize_lecture(self, lecture_content: str, voice: Optional[str] = None,
                           output_path: Optional[str] = None) -> Dict[str, Any]:
        """
        Convert lecture text to speech using OpenAI TTS.

        Args:
            lecture_content: The lecture text to convert.
            voice: Voice to use (alloy, echo, fable, onyx, nova, shimmer);
                   unknown or None falls back to the default voice.
            output_path: Where to save the audio file; auto-generated under
                         "output/" when omitted.

        Returns:
            Dict with success status, file path, and metadata
            (voice, estimated duration, file size, chunk count).
        """
        try:
            if not lecture_content.strip():
                return {
                    'success': False,
                    'error': 'No content provided for synthesis',
                    'file_path': None,
                    'duration': 0
                }

            # Validate and set voice; silently fall back rather than fail.
            selected_voice = voice if voice in self.supported_voices else self.default_voice

            # Prepare content for TTS (remove markdown formatting)
            clean_content = self._clean_content_for_tts(lecture_content)

            # Split content into chunks if too long (OpenAI TTS caps input length)
            chunks = self._split_content(clean_content, max_length=4000)

            if not output_path:
                # hashlib gives a stable, non-negative name across runs;
                # built-in hash() is salted per process and may be negative.
                digest = hashlib.md5(lecture_content.encode('utf-8')).hexdigest()[:12]
                output_path = os.path.join("output", f"lecture_audio_{digest}.mp3")

            # Ensure output directory exists; dirname is "" for bare filenames,
            # and makedirs("") would raise FileNotFoundError.
            out_dir = os.path.dirname(output_path)
            if out_dir:
                os.makedirs(out_dir, exist_ok=True)

            if len(chunks) == 1:
                # Single chunk - direct synthesis
                response = self.client.audio.speech.create(
                    model="tts-1",
                    voice=selected_voice,
                    input=chunks[0],
                    response_format="mp3"
                )

                # Save the audio file
                with open(output_path, "wb") as f:
                    f.write(response.content)

            else:
                # Multiple chunks - synthesize and combine
                self._synthesize_multiple_chunks(chunks, selected_voice, output_path)

            # Get file size and estimate duration
            file_size = os.path.getsize(output_path)
            estimated_duration = self._estimate_audio_duration(clean_content)

            return {
                'success': True,
                'file_path': output_path,
                'voice': selected_voice,
                'duration': estimated_duration,
                'file_size': file_size,
                'chunks_count': len(chunks)
            }

        except Exception as e:
            # Broad catch is deliberate: callers consume the error dict,
            # so API/IO failures must not propagate.
            logger.error("Voice synthesis failed: %s", e)
            return {
                'success': False,
                'error': str(e),
                'file_path': None,
                'duration': 0
            }

    def _clean_content_for_tts(self, content: str) -> str:
        """Strip markdown formatting so the TTS engine doesn't read symbols aloud."""
        # Remove markdown headers
        content = re.sub(r'^#{1,6}\s+', '', content, flags=re.MULTILINE)

        # Remove markdown emphasis
        content = re.sub(r'\*\*(.*?)\*\*', r'\1', content)  # Bold
        content = re.sub(r'\*(.*?)\*', r'\1', content)      # Italic

        # Remove markdown links, keeping only the link text
        content = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', content)

        # Remove horizontal rules
        content = re.sub(r'^---+$', '', content, flags=re.MULTILINE)

        # Clean up extra whitespace
        content = re.sub(r'\n{3,}', '\n\n', content)
        content = re.sub(r' {2,}', ' ', content)

        # Add pauses for better speech flow: ellipsis reads as a longer pause
        content = re.sub(r'\n\n', '\n\n... \n\n', content)

        return content.strip()

    def _split_content(self, content: str, max_length: int = 4000) -> list:
        """Split content into chunks no longer than max_length for the TTS API.

        Splits on sentence boundaries first, falls back to word boundaries,
        and hard-slices any single word longer than max_length so that no
        text is ever dropped.
        """
        if len(content) <= max_length:
            return [content]

        chunks = []
        current_chunk = ""

        for sentence in content.split('. '):
            piece = sentence + ". "
            if len(current_chunk) + len(piece) <= max_length:
                current_chunk += piece
                continue

            # Flush what we have before handling the oversized addition.
            if current_chunk:
                chunks.append(current_chunk.strip())
                current_chunk = ""

            if len(piece) <= max_length:
                current_chunk = piece
            else:
                # Sentence alone exceeds the limit: split by words.
                word_chunk = ""
                for word in sentence.split():
                    if len(word_chunk) + len(word) + 1 > max_length:
                        if word_chunk:
                            chunks.append(word_chunk.strip())
                            word_chunk = ""
                        # A single word longer than the limit is sliced into
                        # max_length pieces instead of being truncated.
                        while len(word) > max_length:
                            chunks.append(word[:max_length])
                            word = word[max_length:]
                    word_chunk += word + " "
                if word_chunk:
                    current_chunk = word_chunk.strip() + ". "

        if current_chunk:
            chunks.append(current_chunk.strip())

        return [chunk for chunk in chunks if chunk.strip()]

    def _synthesize_multiple_chunks(self, chunks: list, voice: str, output_path: str):
        """Synthesize each chunk to a temp file, then concatenate into output_path."""
        temp_files = []

        try:
            # Synthesize each chunk to its own temporary file.
            for i, chunk in enumerate(chunks):
                temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=f"_chunk_{i}.mp3")
                temp_files.append(temp_file.name)
                temp_file.close()

                response = self.client.audio.speech.create(
                    model="tts-1",
                    voice=voice,
                    input=chunk,
                    response_format="mp3"
                )

                with open(temp_file.name, "wb") as f:
                    f.write(response.content)

            # NOTE(review): raw byte concatenation of MP3 frames plays in most
            # players but is not a spec-valid merge; use pydub/ffmpeg for
            # gapless, well-formed output if quality issues appear.
            with open(output_path, "wb") as outfile:
                for temp_path in temp_files:
                    with open(temp_path, "rb") as infile:
                        shutil.copyfileobj(infile, outfile)

        finally:
            # Clean up temporary files; ignore only filesystem errors.
            for temp_path in temp_files:
                try:
                    os.unlink(temp_path)
                except OSError:
                    pass

    def _estimate_audio_duration(self, content: str) -> int:
        """Estimate audio duration in seconds based on content length."""
        # Average speaking rate: ~150 words per minute.
        word_count = len(content.split())
        duration_minutes = word_count / 150
        return int(duration_minutes * 60)

    def get_available_voices(self) -> Dict[str, str]:
        """Get the available voices mapped to short human-readable descriptions."""
        return {
            "alloy": "Neutral, balanced voice",
            "echo": "Crisp, clear voice",
            "fable": "Warm, engaging voice",
            "onyx": "Deep, authoritative voice",
            "nova": "Pleasant, professional voice (default)",
            "shimmer": "Bright, energetic voice"
        }

    def validate_voice(self, voice: str) -> bool:
        """Return True if the provided voice is supported by this synthesizer."""
        return voice in self.supported_voices