hivecorp committed on
Commit
17f72f9
·
verified ·
1 Parent(s): b2e635f

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +219 -66
app.py CHANGED
@@ -5,6 +5,21 @@ import os
5
  import asyncio
6
  import uuid
7
  import re
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8
 
9
  def get_audio_length(audio_file):
10
  audio = AudioSegment.from_file(audio_file)
@@ -16,96 +31,234 @@ def format_time_ms(milliseconds):
16
  hrs, mins = divmod(mins, 60)
17
  return f"{hrs:02}:{mins:02}:{secs:02},{ms:03}"
18
 
19
- def smart_text_split(text, words_per_line, lines_per_segment):
20
- # First split by major punctuation (periods, exclamation marks, question marks)
21
- sentences = re.split(r'([.!?]+)', text)
22
-
23
- # Recombine sentences with their punctuation
24
- sentences = [''.join(i) for i in zip(sentences[::2], sentences[1::2] + [''])]
25
-
26
- segments = []
27
- current_segment = []
28
- current_line = []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29
 
30
- for sentence in sentences:
31
- # Split sentence into words
32
- words = sentence.strip().split()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
 
34
  for word in words:
35
  current_line.append(word)
 
 
 
 
 
 
 
 
 
36
 
37
- # Check if current line has reached words_per_line
38
- if len(current_line) >= words_per_line:
39
- current_segment.append(' '.join(current_line))
40
  current_line = []
41
-
42
- # Check if current segment has reached lines_per_segment
43
- if len(current_segment) >= lines_per_segment:
44
- segments.append('\n'.join(current_segment))
45
- current_segment = []
46
 
47
- # If there are words in current_line, add them as a line
48
  if current_line:
49
- current_segment.append(' '.join(current_line))
50
- current_line = []
51
-
52
- # Check if we should start a new segment at sentence boundary
53
- if len(current_segment) >= lines_per_segment:
54
- segments.append('\n'.join(current_segment))
55
- current_segment = []
56
-
57
- # Add any remaining lines
58
- if current_segment:
59
- segments.append('\n'.join(current_segment))
60
-
61
- return segments
 
 
 
 
 
 
 
 
 
 
62
 
63
- async def generate_accurate_srt(text, voice, rate, pitch, words_per_line, lines_per_segment):
64
- segments = smart_text_split(text, words_per_line, lines_per_segment)
 
65
 
66
- srt_content = ""
67
- combined_audio = AudioSegment.empty()
68
  current_time = 0
 
 
69
 
70
- for idx, segment in enumerate(segments, 1):
71
- # Generate audio for this segment
72
- audio_file = f"temp_segment_{idx}.wav"
73
- tts = edge_tts.Communicate(segment, voice, rate=rate, pitch=pitch)
74
- await tts.save(audio_file)
75
 
76
- # Get segment duration
77
- segment_audio = AudioSegment.from_file(audio_file)
78
- segment_duration = len(segment_audio)
79
 
80
- # Add to SRT content with precise timing
81
- srt_content += f"{idx}\n"
82
- srt_content += f"{format_time_ms(current_time)} --> {format_time_ms(current_time + segment_duration)}\n"
83
- srt_content += segment + "\n\n"
 
 
84
 
85
- # Update timing and combine audio
86
- current_time += segment_duration
87
- combined_audio += segment_audio
88
 
89
- # Cleanup
90
- os.remove(audio_file)
 
91
 
92
- # Export final files
93
  unique_id = uuid.uuid4()
94
  audio_path = f"final_audio_{unique_id}.mp3"
95
  srt_path = f"final_subtitles_{unique_id}.srt"
96
 
97
- combined_audio.export(audio_path, format="mp3", bitrate="320k")
 
 
 
 
 
 
 
98
  with open(srt_path, "w", encoding='utf-8') as f:
99
  f.write(srt_content)
100
 
101
  return srt_path, audio_path
102
 
103
  async def process_text(text, pitch, rate, voice, words_per_line, lines_per_segment):
104
- pitch_str = f"{pitch}Hz" if pitch != 0 else "0Hz"
105
- rate_str = f"{'+' if rate > 0 else ''}{rate}%"
 
106
 
107
  srt_path, audio_path = await generate_accurate_srt(
108
- text,
109
  voice_options[voice],
110
  rate_str,
111
  pitch_str,
@@ -163,11 +316,11 @@ app = gr.Interface(
163
  fn=process_text,
164
  inputs=[
165
  gr.Textbox(label="Enter Text", lines=10),
166
- gr.Slider(label="Pitch Adjustment (Hz)", minimum=-20, maximum=20, value=0, step=1),
167
- gr.Slider(label="Rate Adjustment (%)", minimum=-50, maximum=50, value=0, step=1),
168
  gr.Dropdown(label="Select Voice", choices=list(voice_options.keys()), value="Jenny Female"),
169
- gr.Slider(label="Words per Line", minimum=1, maximum=15, value=8, step=1),
170
- gr.Slider(label="Lines per Segment", minimum=1, maximum=5, value=2, step=1)
171
  ],
172
  outputs=[
173
  gr.File(label="Download SRT"),
@@ -175,7 +328,7 @@ app = gr.Interface(
175
  gr.Audio(label="Preview Audio")
176
  ],
177
  title="Advanced TTS with Configurable SRT Generation",
178
- description="Generate perfectly synchronized audio and subtitles with custom segmentation control."
179
  )
180
 
181
  app.launch()
 
5
  import asyncio
6
  import uuid
7
  import re
8
+ from concurrent.futures import ThreadPoolExecutor
9
+ from typing import List, Tuple, Optional
10
+ import math
11
+ from dataclasses import dataclass
12
+
13
class TimingManager:
    """Hand out consecutive (start, end) time slots separated by a fixed gap.

    All values are plain numbers interpreted as milliseconds.
    """

    def __init__(self, segment_gap: int = 100):
        """Start the timeline at 0.

        Args:
            segment_gap: Pause, in milliseconds, inserted after every slot.
                Defaults to 100, the value that used to be hard-coded.
        """
        self.current_time = 0
        self.segment_gap = segment_gap

    def get_timing(self, duration):
        """Reserve the next slot of ``duration`` ms and advance the timeline.

        Returns:
            ``(start_time, end_time)`` of the reserved slot. The next slot
            starts ``segment_gap`` ms after ``end_time``.
        """
        start_time = self.current_time
        end_time = start_time + duration
        # Move the cursor past this slot plus the configured pause.
        self.current_time = end_time + self.segment_gap
        return start_time, end_time
23
 
24
  def get_audio_length(audio_file):
25
  audio = AudioSegment.from_file(audio_file)
 
31
  hrs, mins = divmod(mins, 60)
32
  return f"{hrs:02}:{mins:02}:{secs:02},{ms:03}"
33
 
34
@dataclass
class Segment:
    """One subtitle cue plus the audio and timing attached to it later."""

    # Cue number; TextProcessor.split_into_segments assigns 1, 2, 3, ...
    id: int
    # Cue text; display lines are separated by '\n'.
    text: str
    # Timing values filled in after synthesis and passed to format_time_ms.
    start_time: int = 0
    end_time: int = 0
    # Set from len(audio) once the audio has been loaded.
    duration: int = 0
    # Synthesized audio for this cue, or None before processing.
    audio: Optional[AudioSegment] = None
    # Display-only list of lines; None when not populated.
    lines: Optional[List[str]] = None
43
+
44
class TextProcessor:
    """Split free text into subtitle segments and display lines.

    Segment boundaries prefer punctuation and clause boundaries; when no
    suitable boundary exists a segment is cut at
    ``words_per_line * lines_per_segment`` words.
    """

    # Words that open a new clause; a break is favoured on the word before.
    _PHRASE_STARTERS = frozenset({
        'however', 'therefore', 'moreover', 'furthermore',
        'meanwhile', 'although', 'because',
    })
    # Conjunctions treated as weak break points.
    _CONJUNCTIONS = frozenset({'and', 'but', 'or', 'nor', 'for', 'yet', 'so'})

    def __init__(self, words_per_line: int, lines_per_segment: int):
        """Store the layout limits and the punctuation weight table.

        Args:
            words_per_line: Target maximum number of words on one line.
            lines_per_segment: Target number of lines in one segment.
        """
        self.words_per_line = words_per_line
        self.lines_per_segment = lines_per_segment
        self.min_segment_words = 3
        # Allow 50% more than the nominal size so a natural break can be used.
        self.max_segment_words = words_per_line * lines_per_segment * 1.5
        self.punctuation_weights = {
            '.': 1.0,  # Strong break
            '!': 1.0,
            '?': 1.0,
            ';': 0.8,  # Medium-strong break
            ':': 0.7,
            ',': 0.5,  # Medium break
            '-': 0.3,  # Weak break
            '(': 0.2,
            ')': 0.2,
        }

    def analyze_sentence_complexity(self, text: str) -> float:
        """Return a multiplier (>= 1.0) that grows with length and punctuation.

        Empty or whitespace-only text yields the neutral value 1.0 instead of
        dividing by zero.
        """
        words = text.split()
        if not words:
            return 1.0

        complexity = 1.0

        # Long stretches of text get a fixed 20% bump.
        if len(words) > self.words_per_line * 2:
            complexity *= 1.2

        # Punctuation density: up to +50% per punctuation mark per word.
        punct_count = sum(text.count(p) for p in self.punctuation_weights)
        complexity *= 1 + (punct_count / len(words)) * 0.5

        return complexity

    def find_natural_breaks(self, text: str) -> List[Tuple[int, float]]:
        """Return ``(word_index, weight)`` for every candidate break point.

        A break at index ``i`` means "end the segment after word ``i``".
        Only words with a weight greater than zero are reported.
        """
        breaks = []
        words = text.split()
        last_index = len(words) - 1

        for i, word in enumerate(words):
            # Strongest punctuation mark the word ends with, if any.
            weight = max(
                (w for p, w in self.punctuation_weights.items() if word.endswith(p)),
                default=0,
            )

            # The next word opens a new clause.
            if i < last_index and words[i + 1].lower() in self._PHRASE_STARTERS:
                weight = max(weight, 0.6)

            # Conjunctions only count once the segment has a few words.
            if i > self.min_segment_words and word.lower() in self._CONJUNCTIONS:
                weight = max(weight, 0.4)

            if weight > 0:
                breaks.append((i, weight))

        return breaks

    def _choose_break(self, words: List[str], start: int) -> int:
        """Return the offset from ``start`` of the last word of the next segment."""
        complexity = self.analyze_sentence_complexity(
            ' '.join(words[start:start + self.words_per_line * 2])
        )
        window = words[start:start + int(self.max_segment_words * complexity)]

        best_break = None
        best_weight = 0
        for break_idx, weight in self.find_natural_breaks(' '.join(window)):
            in_range = self.min_segment_words <= break_idx <= self.max_segment_words
            # Strict '>' keeps the earliest candidate among equal weights.
            if in_range and weight > best_weight:
                best_break = break_idx
                best_weight = weight

        if best_break is None:
            # No usable break: take the nominal number of words. The result is
            # an inclusive offset, hence the "- 1"; max(1, ...) guarantees the
            # caller always advances by at least one word.
            word_budget = min(self.words_per_line * self.lines_per_segment,
                              len(words) - start)
            best_break = max(1, word_budget) - 1

        return best_break

    def split_into_segments(self, text: str) -> List["Segment"]:
        """Split ``text`` into ``Segment`` objects numbered from 1.

        Each segment's ``text`` holds its display lines joined with '\\n'.
        Empty or whitespace-only input returns an empty list.
        """
        # Collapse whitespace, then force "mark + single space" spacing.
        # NOTE: this also inserts a space inside tokens such as "3.14" or "10:30".
        text = re.sub(r'\s+', ' ', text.strip())
        text = re.sub(r'([.!?,;:])\s*', r'\1 ', text)
        text = re.sub(r'\s+([.!?,;:])', r'\1', text)

        words = text.split()
        segments = []

        i = 0
        while i < len(words):
            best_break = self._choose_break(words, i)

            segment_text = ' '.join(words[i:i + best_break + 1])
            lines = self.split_into_lines(segment_text)

            segments.append(Segment(
                id=len(segments) + 1,
                text='\n'.join(lines)
            ))

            i += best_break + 1

        return segments

    def split_into_lines(self, text: str) -> List[str]:
        """Split one segment's text into display lines.

        A line ends when it reaches ``words_per_line`` words, when a word ends
        with '.', '!' or '?', or when it is at least 70% full and the word
        ends with ',', ';' or ':'.
        """
        lines = []
        current_line = []

        for word in text.split():
            current_line.append(word)
            word_count = len(current_line)

            is_break = (
                word_count >= self.words_per_line
                or word.endswith(('.', '!', '?'))
                or (word_count >= self.words_per_line * 0.7
                    and word.endswith((',', ';', ':')))
            )

            if is_break:
                lines.append(' '.join(current_line))
                current_line = []

        # Whatever is left forms the final, shorter line.
        if current_line:
            lines.append(' '.join(current_line))

        return lines
184
+
185
async def process_segment_with_timing(segment: Segment, voice: str, rate: str, pitch: str) -> Segment:
    """Synthesize one segment and attach the padded audio and its length.

    The segment's lines are joined with spaces and spoken as a single
    utterance. The result is loaded, wrapped in ``AudioSegment.silent(duration=50)``
    on both sides, and stored on ``segment.audio``; ``segment.duration`` is
    set to ``len(segment.audio)``. The temporary file is removed even when
    synthesis or loading fails.

    Returns:
        The same ``segment`` object, updated in place.
    """
    temp_path = f"temp_segment_{segment.id}_{uuid.uuid4()}.wav"
    try:
        # Line breaks are for display only; speak the cue as one sentence flow.
        spoken_text = segment.text.replace('\n', ' ')
        communicator = edge_tts.Communicate(spoken_text, voice, rate=rate, pitch=pitch)
        await communicator.save(temp_path)

        raw_audio = AudioSegment.from_file(temp_path)
        # Short silence on both ends so consecutive cues do not run together.
        padding = AudioSegment.silent(duration=50)
        segment.audio = padding + raw_audio + padding
        segment.duration = len(segment.audio)

        return segment
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)
204
 
205
async def generate_accurate_srt(text: str, voice: str, rate: str, pitch: str, words_per_line: int, lines_per_segment: int) -> Tuple[str, str]:
    """Synthesize ``text`` segment by segment and write matching SRT and MP3 files.

    Segments are processed one after another. Each cue starts where the
    previous one ended and lasts exactly as long as its own audio, so the
    subtitle times follow the concatenated audio track.

    Returns:
        ``(srt_path, audio_path)``: relative file names that share one
        freshly generated UUID.
    """
    splitter = TextProcessor(words_per_line, lines_per_segment)
    pending_segments = splitter.split_into_segments(text)

    cursor = 0
    full_audio = AudioSegment.empty()
    srt_entries = []

    for pending in pending_segments:
        cue = await process_segment_with_timing(pending, voice, rate, pitch)

        # The cue occupies exactly the span of its own audio.
        cue.start_time = cursor
        cue.end_time = cursor + cue.duration

        srt_entries.append(
            f"{cue.id}\n"
            f"{format_time_ms(cue.start_time)} --> {format_time_ms(cue.end_time)}\n"
            f"{cue.text}\n\n"
        )

        # crossfade=0 keeps a hard cut between cues.
        full_audio = full_audio.append(cue.audio, crossfade=0)

        cursor = cue.end_time

    srt_content = "".join(srt_entries)

    unique_id = uuid.uuid4()
    audio_path = f"final_audio_{unique_id}.mp3"
    srt_path = f"final_subtitles_{unique_id}.srt"

    # Re-encode as 320 kbit/s, 48 kHz stereo MP3.
    full_audio.export(
        audio_path,
        format="mp3",
        bitrate="320k",
        parameters=["-ar", "48000", "-ac", "2"]
    )

    with open(srt_path, "w", encoding='utf-8') as f:
        f.write(srt_content)

    return srt_path, audio_path
254
 
255
  async def process_text(text, pitch, rate, voice, words_per_line, lines_per_segment):
256
+ # Format pitch and rate strings
257
+ pitch_str = f"{pitch:+d}Hz" if pitch != 0 else "+0Hz"
258
+ rate_str = f"{rate:+d}%" if rate != 0 else "+0%"
259
 
260
  srt_path, audio_path = await generate_accurate_srt(
261
+ text,
262
  voice_options[voice],
263
  rate_str,
264
  pitch_str,
 
316
  fn=process_text,
317
  inputs=[
318
  gr.Textbox(label="Enter Text", lines=10),
319
+ gr.Slider(label="Pitch Adjustment (Hz)", minimum=-10, maximum=10, value=0, step=1),
320
+ gr.Slider(label="Rate Adjustment (%)", minimum=-25, maximum=25, value=0, step=1),
321
  gr.Dropdown(label="Select Voice", choices=list(voice_options.keys()), value="Jenny Female"),
322
+ gr.Slider(label="Words per Line", minimum=3, maximum=12, value=6, step=1),
323
+ gr.Slider(label="Lines per Segment", minimum=1, maximum=4, value=2, step=1)
324
  ],
325
  outputs=[
326
  gr.File(label="Download SRT"),
 
328
  gr.Audio(label="Preview Audio")
329
  ],
330
  title="Advanced TTS with Configurable SRT Generation",
331
+ description="Generate perfectly synchronized audio and subtitles with natural speech patterns."
332
  )
333
 
334
  app.launch()