sreepathi-ravikumar committed on
Commit
a52313b
·
verified ·
1 Parent(s): 67e7115

Update video2.py

Browse files
Files changed (1) hide show
  1. video2.py +261 -93
video2.py CHANGED
@@ -43,56 +43,71 @@ nest_asyncio.apply()
43
 
44
  import re
45
  import html
46
- import unicodedata
47
  import tempfile
48
  import os
49
  import asyncio
50
- from concurrent.futures import ThreadPoolExecutor
51
  from functools import lru_cache
52
  import edge_tts
53
  from pydub import AudioSegment
54
- from pydub.effects import normalize, compress_dynamic_range
 
55
  from mutagen.mp3 import MP3
 
56
 
57
  # --- Configuration ---
58
  AUDIO_DIR = "output_audio"
59
  os.makedirs(AUDIO_DIR, exist_ok=True)
60
 
61
  # Voice Mapping
62
- # using 'NeerjaNeural' for English as it blends better with Indian contexts
63
  VOICE_MAPPING = {
64
  "English": "en-IN-NeerjaNeural",
65
  "Tamil": "ta-IN-PallaviNeural",
66
  "Hindi": "hi-IN-SwaraNeural",
67
  }
68
 
69
- # Regex to find Indian Language characters (Tamil, Hindi, Malayalam, etc.)
70
- # Tamil Unicode range is inside this block (\u0B80-\u0BFF)
71
- INDIC_SCRIPT_PATTERN = re.compile(r'[\u0900-\u0D7F]+')
 
 
 
 
 
 
 
72
 
73
  @lru_cache(maxsize=1024)
74
  def clean_text(text):
75
  if not text: return ""
76
  text = html.unescape(str(text))
77
- # Remove URLs and Markdown, but keep basic punctuation
78
- text = re.sub(r'https?://\S+', '', text)
79
- text = re.sub(r'[\*\#\<\>\[\]\{\}]', '', text)
80
- text = re.sub(r'\s+', ' ', text).strip()
81
  return text
82
 
83
  def detect_language_group(word):
84
- """
85
- Returns 'indic' if the word has Tamil/Hindi chars.
86
- Returns 'english' otherwise (for words like 'Voltage', '1.5V', 'circuit').
87
- """
88
  if INDIC_SCRIPT_PATTERN.search(word):
89
  return 'indic'
90
  return 'english'
91
 
92
- def split_by_language_and_sentence(text):
93
  """
94
- Splits text into chunks of English vs Native language.
95
- Example: "Voltage เฎฉเฏ" -> [("Voltage", "english"), ("เฎฉเฏ", "indic")]
 
 
 
 
 
 
 
 
 
 
 
 
96
  """
97
  text = clean_text(text)
98
  words = text.split(' ')
@@ -102,73 +117,189 @@ def split_by_language_and_sentence(text):
102
  current_type = None
103
 
104
  for word in words:
105
- # Clean punctuation for detection (e.g. "force," -> "force")
106
- # But keep the original word for the audio generation
107
- clean_word_for_check = word.strip(".,!?")
108
 
109
- if not clean_word_for_check:
110
- # If word was just "...", keep it with previous chunk
111
  if current_chunk:
112
  current_chunk.append(word)
113
  continue
114
 
115
- word_type = detect_language_group(clean_word_for_check)
116
 
117
- # Start first chunk
118
  if current_type is None:
119
  current_type = word_type
120
  current_chunk.append(word)
121
-
122
- # If type matches current chunk, add to it
123
  elif word_type == current_type:
124
  current_chunk.append(word)
125
-
126
- # Type switched (e.g., from English 'Voltage' to Tamil 'เฎฉเฏ')
127
  else:
128
- segments.append((" ".join(current_chunk), current_type))
 
 
 
 
129
  current_chunk = [word]
130
  current_type = word_type
131
 
132
- # Add valid final chunk
133
  if current_chunk:
134
- segments.append((" ".join(current_chunk), current_type))
 
 
135
 
136
  return segments
137
 
138
- async def generate_segment_audio(text, voice, rate_limit_sem):
139
- """Generates audio for a specific text segment using EdgeTTS."""
140
  if not text.strip():
141
  return None
142
 
143
  async with rate_limit_sem:
144
  try:
 
 
 
145
  fd, path = tempfile.mkstemp(suffix=".mp3")
146
  os.close(fd)
147
 
148
- # Slight speed adjustment for flow
149
- rate = "+0%"
150
- comm = edge_tts.Communicate(text, voice, rate=rate)
 
 
 
 
 
 
 
 
151
  await comm.save(path)
152
  return path
153
  except Exception as e:
154
- print(f"Error generating segment '{text}': {e}")
155
  return None
156
 
157
- def process_audio_segment(file_path):
158
- """Process individual segment: normalize and add micro-padding."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
159
  if not file_path or not os.path.exists(file_path):
160
  return None
161
 
162
  try:
163
  audio = AudioSegment.from_mp3(file_path)
164
 
165
- # Normalize volume
166
- audio = normalize(audio)
 
 
 
167
 
168
- # Add tiny silence (50ms) to start/end to prevent 'clipped' words
169
- # This makes the transition between "Voltage" and "nu" sound natural
170
- silence_pad = AudioSegment.silent(duration=50)
171
- audio = silence_pad + audio + silence_pad
 
172
 
173
  return audio
174
  except Exception as e:
@@ -180,71 +311,107 @@ def process_audio_segment(file_path):
180
  except:
181
  pass
182
 
183
- async def bilingual_tts_optimized(full_text, output_file, native_lang_code):
184
- print("\n--- Starting Processing ---")
 
 
 
 
 
185
 
186
- # 1. Split Text
187
- segments_data = split_by_language_and_sentence(full_text)
188
 
189
- # DEBUG: Print the split logic so user can see it
190
- print(f"Detected {len(segments_data)} segments:")
191
- for i, (text, lang_type) in enumerate(segments_data):
192
- print(f" {i+1}. [{lang_type.upper()}] : {text}")
193
 
194
- # 2. Assign Voices
195
- native_voice = VOICE_MAPPING.get(native_lang_code, VOICE_MAPPING["English"])
196
  english_voice = VOICE_MAPPING["English"]
197
 
198
- tasks = []
199
- semaphore = asyncio.Semaphore(5) # Prevent overloading API
 
 
200
 
201
- # 3. Create Tasks
202
- for text_chunk, type_group in segments_data:
203
- voice = native_voice if type_group == 'indic' else english_voice
204
- tasks.append(generate_segment_audio(text_chunk, voice, semaphore))
205
 
206
- # 4. Run Generation
207
- print("\nGenerating Audio Segments...")
208
  raw_files = await asyncio.gather(*tasks)
209
 
210
- # 5. Process Audio (Stitching)
211
- print("Stitching and Mastering...")
212
- final_audio = AudioSegment.empty()
 
 
 
213
 
214
- with ThreadPoolExecutor(max_workers=4) as executor:
215
- processed_segments = list(executor.map(process_audio_segment, raw_files))
216
 
217
- valid_segments = [seg for seg in processed_segments if seg is not None]
 
 
 
 
 
 
 
 
218
 
219
- if not valid_segments:
220
- print("Error: No audio generated.")
221
  return None
222
-
223
- # Crossfade Stitching
224
- for i, seg in enumerate(valid_segments):
225
- if i == 0:
226
- final_audio += seg
227
- else:
228
- # 30ms crossfade blends the English word ending into the Tamil start
229
- final_audio = final_audio.append(seg, crossfade=30)
230
-
231
- # 6. Final Mastering
232
- # Compress dynamic range to make it sound punchy like a podcast
 
 
 
 
 
 
 
 
 
 
 
 
233
  final_audio = compress_dynamic_range(
234
- final_audio,
235
- threshold=-15.0,
236
- ratio=2.5,
237
- attack=5.0,
238
- release=50.0
239
  )
 
 
240
  final_audio = normalize(final_audio)
241
-
242
- final_audio.export(output_file, format="mp3", bitrate="192k")
243
- print(f"โœ… Success! Audio saved to: {output_file}")
244
 
 
 
 
 
 
 
 
 
 
 
245
  return output_file
246
 
247
- # --- Wrapper for your usage ---
248
  async def generate_tts(id, lines, lang_input):
249
  if "&&&" in lang_input:
250
  parts = lang_input.split("&&&")
@@ -255,7 +422,7 @@ async def generate_tts(id, lines, lang_input):
255
  lang_name = lang_input.strip()
256
 
257
  output_path = os.path.join(AUDIO_DIR, f"audio_{id}.mp3")
258
- result = await bilingual_tts_optimized(text, output_path, lang_name)
259
 
260
  if result:
261
  audio_info = MP3(result)
@@ -265,6 +432,7 @@ async def generate_tts(id, lines, lang_input):
265
 
266
 
267
 
 
268
  def audio_func(id, lines, lang):
269
  loop = asyncio.new_event_loop()
270
  asyncio.set_event_loop(loop)
 
43
 
44
  import re
45
  import html
 
46
  import tempfile
47
  import os
48
  import asyncio
49
+ import random
50
  from functools import lru_cache
51
  import edge_tts
52
  from pydub import AudioSegment
53
+ from pydub.effects import normalize, compress_dynamic_range, low_pass_filter, high_pass_filter
54
+ from pydub.scipy_effects import eq
55
  from mutagen.mp3 import MP3
56
+ import numpy as np
57
 
58
  # --- Configuration ---
59
  AUDIO_DIR = "output_audio"
60
  os.makedirs(AUDIO_DIR, exist_ok=True)
61
 
62
  # Voice Mapping
 
63
  VOICE_MAPPING = {
64
  "English": "en-IN-NeerjaNeural",
65
  "Tamil": "ta-IN-PallaviNeural",
66
  "Hindi": "hi-IN-SwaraNeural",
67
  }
68
 
69
+ # Indic script detection
70
+ INDIC_SCRIPT_PATTERN = re.compile(r'[เค€-เตฟ]+')
71
+
72
+ # === ELEVENLABS-STYLE SETTINGS ===
73
+ CROSSFADE_LANG_SWITCH = 80 # Longer crossfade for language switches
74
+ CROSSFADE_SAME_LANG = 25 # Short crossfade for same language
75
+ BREATH_PAUSE_MS = 120 # Natural breath at sentence end
76
+ MICRO_PAUSE_MS = 40 # Tiny pause at commas
77
+ TARGET_DBFS = -16.0 # Podcast-quality loudness
78
+ COMPRESSION_RATIO = 1.8 # Gentle compression (not squashed)
79
 
80
@lru_cache(maxsize=1024)
def clean_text(text):
    """Normalize raw text for TTS: unescape HTML entities, strip URLs and
    markdown/HTML control characters, and collapse whitespace.

    Returns "" for falsy input.  Results are memoized (text is hashable).
    """
    if not text:
        return ""
    text = html.unescape(str(text))
    # NOTE: the committed patterns had lost their backslashes
    # (r'https?://S+', r'[*#<>[]{}]', r's+'), which made the char class
    # invalid and deleted every literal 's'.  Restored per the prior version.
    text = re.sub(r'https?://\S+', '', text)
    text = re.sub(r'[\*\#\<\>\[\]\{\}]', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text
88
 
89
def detect_language_group(word):
    """Classify *word* as 'indic' (matches INDIC_SCRIPT_PATTERN) or 'english'.

    Anything without Indic-script characters — including numbers and units
    like '1.5V' — falls through to 'english'.
    """
    return 'indic' if INDIC_SCRIPT_PATTERN.search(word) else 'english'
94
 
95
def analyze_punctuation(text):
    """
    Determine the pause type implied by a chunk's trailing punctuation.

    Returns:
        'breath' for sentence-final marks (. ! ? and the Devanagari danda),
        'micro'  for clause-level marks (, ; :),
        'none'   otherwise.
    """
    text = text.rstrip()
    # '\u0964' is the Devanagari danda used as a full stop in Hindi text;
    # the committed literal was mojibake-corrupted, restored as an escape.
    if text.endswith(('.', '!', '?', '\u0964')):
        return 'breath'  # full stop = breath pause
    elif text.endswith((',', ';', ':')):
        return 'micro'   # comma = tiny pause
    return 'none'
106
+
107
def split_with_context(text):
    """
    Split *text* into runs of same-language words, preserving punctuation
    context.

    Returns:
        list of (chunk_text, lang_type, pause_type) tuples, where lang_type
        is 'indic' or 'english' and pause_type is 'breath'/'micro'/'none'
        as classified by analyze_punctuation().
    """
    words = clean_text(text).split(' ')

    segments = []
    current_chunk = []
    current_type = None

    for word in words:
        # Strip punctuation (incl. the Devanagari danda) for detection only;
        # the original word, punctuation included, is kept for synthesis.
        bare = word.strip(".,!?;:\u0964")

        if not bare:
            # Pure punctuation (e.g. "...") stays glued to the prior chunk.
            if current_chunk:
                current_chunk.append(word)
            continue

        word_type = detect_language_group(bare)

        if current_type is None:
            # First real word opens the first chunk.
            current_type = word_type
            current_chunk.append(word)
        elif word_type == current_type:
            # Same language: extend the running chunk.
            current_chunk.append(word)
        else:
            # Language switched: close out the chunk with its pause class.
            chunk_text = " ".join(current_chunk)
            segments.append((chunk_text, current_type, analyze_punctuation(chunk_text)))
            current_chunk = [word]
            current_type = word_type

    # Flush whatever remains.
    if current_chunk:
        chunk_text = " ".join(current_chunk)
        segments.append((chunk_text, current_type, analyze_punctuation(chunk_text)))

    return segments
150
 
151
async def generate_segment_audio(text, voice, rate_limit_sem, lang_type):
    """
    Synthesize one text chunk to a temporary MP3 file via Edge TTS.

    Concurrency is bounded by *rate_limit_sem*.  Returns the temp-file path,
    or None for blank input or on any synthesis failure.
    """
    if not text.strip():
        return None

    async with rate_limit_sem:
        try:
            # Small random delay staggers requests to dodge rate limiting.
            await asyncio.sleep(random.uniform(0.05, 0.15))

            fd, path = tempfile.mkstemp(suffix=".mp3")
            os.close(fd)

            # Speed tuning: English is spoken faster to match the syllable
            # density of the Indic segments; Indic is only slightly tightened.
            rate = "+12%" if lang_type == 'english' else "+3%"
            pitch = "+0Hz"  # placeholder for future pitch variation

            comm = edge_tts.Communicate(text, voice, rate=rate, pitch=pitch)
            await comm.save(path)
            return path
        except Exception as e:
            print(f"Error generating segment '{text[:30]}...': {e}")
            return None
180
 
181
def apply_pro_audio_processing(audio_segment):
    """
    Broadcast-style mastering chain for a voice segment: high-pass rumble
    removal, presence boost, de-essing, low-mid warmth, and a low-pass to
    tame digital harshness.

    Falls back to the unprocessed audio if any stage fails (e.g. the
    scipy-backed `eq` helper is unavailable).
    """
    try:
        # 1. Remove rumble below 80 Hz.
        audio_segment = high_pass_filter(audio_segment, 80)

        # 2. Presence boost around 3 kHz for voice clarity.
        audio_segment = eq(audio_segment, focus_freq=3000, bandwidth=1000, gain_dB=2.5)

        # 3. De-essing: pull down harsh sibilance around 7 kHz.
        audio_segment = eq(audio_segment, focus_freq=7000, bandwidth=2000, gain_dB=-3)

        # 4. Warmth: gentle low-mid boost around 300 Hz.
        audio_segment = eq(audio_segment, focus_freq=300, bandwidth=200, gain_dB=1.5)

        # 5. Remove digital harshness above 12 kHz.
        audio_segment = low_pass_filter(audio_segment, 12000)

        return audio_segment
    except Exception:
        # Was a bare `except:`, which would also swallow SystemExit and
        # KeyboardInterrupt; narrowed while keeping the graceful fallback.
        return audio_segment
209
+
210
def create_natural_breath(duration_ms=120):
    """
    Return a short silent segment used as a 'breath' between sentences.

    Currently pure silence; quiet pink noise could be mixed in later for
    extra realism.
    """
    breath = AudioSegment.silent(duration=duration_ms)
    return breath
217
+
218
def intelligent_crossfade(audio1, audio2, lang1, lang2, pause_type):
    """
    Join two segments with a context-aware crossfade.

    - pause_type 'breath' (sentence end): append a breath pause, then a
      very short (15 ms) crossfade.
    - pause_type 'micro' (comma): append a micro pause, then 10 ms.
    - otherwise: long crossfade (CROSSFADE_LANG_SWITCH) on a language
      switch for tonal blending, short (CROSSFADE_SAME_LANG) within the
      same language for tight flow.

    Falls back to a hard join when a segment is too short to crossfade.
    """
    if pause_type == 'breath':
        audio1 = audio1 + create_natural_breath(BREATH_PAUSE_MS)
        crossfade_duration = 15  # short blend after the breath
    elif pause_type == 'micro':
        audio1 = audio1 + create_natural_breath(MICRO_PAUSE_MS)
        crossfade_duration = 10
    else:
        # No punctuation — choose the crossfade by language transition.
        crossfade_duration = CROSSFADE_LANG_SWITCH if lang1 != lang2 else CROSSFADE_SAME_LANG

    try:
        return audio1.append(audio2, crossfade=crossfade_duration)
    except Exception:
        # Was a bare `except:`; narrowed.  pydub raises when a segment is
        # shorter than the crossfade window — degrade to a direct append.
        return audio1 + audio2
247
+
248
def trim_silence_smart(audio_segment, silence_thresh=-48):
    """
    Trim leading/trailing silence while keeping ~15 ms at each edge for a
    natural attack/release.

    Bug fixed: `detect_nonsilent` is a module-level function in
    `pydub.silence`, not an AudioSegment method — the original call raised
    AttributeError on every invocation and the bare except silently
    returned the audio untrimmed, so trimming never actually happened.
    """
    try:
        from pydub.silence import detect_nonsilent

        non_silent = detect_nonsilent(
            audio_segment,
            min_silence_len=40,
            silence_thresh=silence_thresh,
        )

        if not non_silent:
            # Entirely silent (or below threshold): nothing to trim against.
            return audio_segment

        start = max(0, non_silent[0][0] - 15)                 # keep 15 ms breath
        end = min(len(audio_segment), non_silent[-1][1] + 15)

        return audio_segment[start:end]
    except Exception:
        # Analysis failure: return the audio untouched rather than crash.
        return audio_segment
268
+
269
def apply_micro_dynamics(audio_segment):
    """Apply 3 ms edge fades so later crossfades never produce digital clicks."""
    faded_in = audio_segment.fade_in(3)
    return faded_in.fade_out(3)
275
+
276
def match_loudness(audio_segment, target_dbfs=TARGET_DBFS):
    """
    RMS-based loudness matching toward *target_dbfs* — more perceptually
    consistent than peak normalization.

    Robustness fix: a fully silent segment reports dBFS of -inf, so the
    computed gain would be +inf and corrupt the audio; silent segments are
    now returned unchanged.
    """
    current_dbfs = audio_segment.dBFS
    if current_dbfs == float("-inf"):
        return audio_segment
    return audio_segment.apply_gain(target_dbfs - current_dbfs)
283
+
284
async def process_segment(file_path, lang_type):
    """
    Load one synthesized segment and apply the per-segment treatment chain:
    silence trim -> loudness match -> mastering EQ -> micro fades.

    Returns a processed AudioSegment, or None if the file is missing or
    unreadable.  The temp file is cleaned up afterwards on a best-effort
    basis.  (*lang_type* is currently unused but kept for interface
    stability with the caller.)
    """
    if not file_path or not os.path.exists(file_path):
        return None

    try:
        audio = AudioSegment.from_mp3(file_path)

        # 1. Trim Edge TTS's excessive leading/trailing silence.
        audio = trim_silence_smart(audio, silence_thresh=-50)

        # 2. Match loudness before any tonal processing.
        audio = match_loudness(audio, TARGET_DBFS)

        # 3. Professional EQ and mastering chain.
        audio = apply_pro_audio_processing(audio)

        # 4. Micro-fades to prevent clicks at crossfade boundaries.
        audio = apply_micro_dynamics(audio)

        return audio
    except Exception as e:
        print(f"Error processing segment {file_path}: {e}")
        return None
    finally:
        # Best-effort temp-file cleanup; never let cleanup failures
        # mask the processing result.
        try:
            os.remove(file_path)
        except Exception:
            pass
313
 
314
async def elevenlabs_quality_tts(full_text, output_file, native_lang_code):
    """
    Bilingual TTS pipeline: split text by language, synthesize each run with
    the matching voice, master every segment, stitch with context-aware
    crossfades, then compress/normalize and export as MP3.

    Returns *output_file* on success, or None if no audio was generated.

    Syntax fix: the two multi-line `print("` ... `")` calls were
    unterminated string literals (the "\\n" escape had been mangled into a
    real newline); restored as proper escapes.
    """
    print("\n\U0001F3AC Starting ElevenLabs-Quality TTS...")

    # 1. Split text into (text, lang_type, pause_type) chunks.
    segments_data = split_with_context(full_text)

    print(f"\U0001F4CA Detected {len(segments_data)} segments:")
    for i, (text, lang_type, pause_type) in enumerate(segments_data):
        pause_icon = "\U0001FAC1" if pause_type == 'breath' else "," if pause_type == 'micro' else "\u2192"
        print(f" {i+1}. [{lang_type.upper()}] {pause_icon} : {text[:50]}...")

    # 2. Voice assignment (unknown language codes fall back to Tamil).
    native_voice = VOICE_MAPPING.get(native_lang_code, VOICE_MAPPING["Tamil"])
    english_voice = VOICE_MAPPING["English"]

    # 3. Generate audio segments concurrently, capped at 5 in flight.
    print("\n\U0001F3A4 Generating audio...")
    semaphore = asyncio.Semaphore(5)

    tasks = []
    for text_chunk, lang_type, pause_type in segments_data:
        voice = native_voice if lang_type == 'indic' else english_voice
        tasks.append(generate_segment_audio(text_chunk, voice, semaphore, lang_type))

    raw_files = await asyncio.gather(*tasks)

    # 4. Per-segment mastering, also concurrent.
    print("\U0001F39A\uFE0F Applying professional audio processing...")
    process_tasks = []
    for i, file_path in enumerate(raw_files):
        lang_type = segments_data[i][1]
        process_tasks.append(process_segment(file_path, lang_type))

    processed_segments = await asyncio.gather(*process_tasks)

    # Keep only segments that survived generation + processing, together
    # with the metadata the stitcher needs.
    valid_data = []
    for i, seg in enumerate(processed_segments):
        if seg is not None:
            valid_data.append({
                'audio': seg,
                'lang': segments_data[i][1],
                'pause': segments_data[i][2],
            })

    if not valid_data:
        print("\u274C No audio generated.")
        return None

    # 5. Stitch with context-aware crossfades.
    print("\U0001F9F5 Stitching with intelligent crossfades...")
    final_audio = valid_data[0]['audio']
    for i in range(1, len(valid_data)):
        final_audio = intelligent_crossfade(
            final_audio,
            valid_data[i]['audio'],
            valid_data[i - 1]['lang'],
            valid_data[i]['lang'],
            valid_data[i - 1]['pause'],
        )

    # 6. Final mastering: gentle broadcast-quality compression, then
    # peak normalization.
    print("\U0001F39B\uFE0F Final mastering...")
    final_audio = compress_dynamic_range(
        final_audio,
        threshold=-20.0,            # gentle threshold
        ratio=COMPRESSION_RATIO,    # light compression (1.8:1)
        attack=2.0,                 # fast attack for clarity
        release=30.0,               # quick release for naturalness
    )
    final_audio = normalize(final_audio)

    # 7. High-quality export.
    print("\U0001F4BE Exporting...")
    final_audio.export(
        output_file,
        format="mp3",
        bitrate="256k",
        parameters=["-q:a", "0"],   # best VBR quality
    )

    print(f"\u2705 ElevenLabs-quality audio saved: {output_file}")
    return output_file
413
 
414
+ # --- Wrapper ---
415
  async def generate_tts(id, lines, lang_input):
416
  if "&&&" in lang_input:
417
  parts = lang_input.split("&&&")
 
422
  lang_name = lang_input.strip()
423
 
424
  output_path = os.path.join(AUDIO_DIR, f"audio_{id}.mp3")
425
+ result = await elevenlabs_quality_tts(text, output_path, lang_name)
426
 
427
  if result:
428
  audio_info = MP3(result)
 
432
 
433
 
434
 
435
+
436
  def audio_func(id, lines, lang):
437
  loop = asyncio.new_event_loop()
438
  asyncio.set_event_loop(loop)