sreepathi-ravikumar committed on
Commit
df79249
·
verified ·
1 Parent(s): 9467330

Update video2.py

Browse files
Files changed (1) hide show
  1. video2.py +64 -100
video2.py CHANGED
@@ -55,209 +55,181 @@ from pydub.effects import normalize, compress_dynamic_range
55
  from mutagen.mp3 import MP3
56
 
57
  # --- Configuration ---
58
- AUDIO_DIR = "output_audio" # Directory to save files
59
  os.makedirs(AUDIO_DIR, exist_ok=True)
60
 
61
- # Default Voices
 
62
  VOICE_MAPPING = {
63
- "English": "en-IN-NeerjaNeural", # Indian English for better blending with Indian languages
64
  "Tamil": "ta-IN-PallaviNeural",
65
  "Hindi": "hi-IN-SwaraNeural",
66
- "Malayalam": "ml-IN-SobhanaNeural",
67
- "Kannada": "kn-IN-SapnaNeural",
68
- "Telugu": "te-IN-ShrutiNeural",
69
- "Bengali": "bn-IN-TanishaaNeural",
70
- "Marathi": "mr-IN-AarohiNeural",
71
- # Add others as needed
72
  }
73
 
74
- # --- Regex Patterns ---
75
- # Detects Tamil, Devanagari (Hindi), etc. based on Unicode ranges
76
- # Tamil: \u0B80-\u0BFF, Devanagari: \u0900-\u097F, Malayalam: \u0D00-\u0D7F
77
  INDIC_SCRIPT_PATTERN = re.compile(r'[\u0900-\u0D7F]+')
78
- SENTENCE_ENDINGS = re.compile(r'[.!?।]\s+')
79
 
80
  @lru_cache(maxsize=1024)
81
  def clean_text(text):
82
- """Basic cleanup to remove artifacts but keep punctuation for pauses."""
83
  if not text: return ""
84
  text = html.unescape(str(text))
85
- text = re.sub(r'https?://\S+', '', text) # Remove URLs
86
- text = re.sub(r'[\*\#\<\>\[\]\{\}]', '', text) # Remove markdown/brackets
 
87
  text = re.sub(r'\s+', ' ', text).strip()
88
  return text
89
 
90
- def detect_language_group(text_segment):
91
  """
92
- Determines if a segment is primarily English or an Indian Language.
93
- Returns: 'indic' or 'english'
94
  """
95
- # If the segment contains Indian script characters, treat as Indic
96
- if INDIC_SCRIPT_PATTERN.search(text_segment):
97
  return 'indic'
98
  return 'english'
99
 
100
  def split_by_language_and_sentence(text):
101
  """
102
- Intelligent splitter that groups words by language to ensure
103
- the correct voice is used for English words inside Tamil sentences.
104
  """
105
  text = clean_text(text)
106
  words = text.split(' ')
107
 
108
  segments = []
109
  current_chunk = []
110
- current_type = None # 'english' or 'indic'
111
 
112
  for word in words:
113
- # Check if word ends with sentence punctuation
114
- has_punctuation = any(char in ".!?," for char in word)
115
- clean_word = word.strip(".,!?")
116
 
117
- # Determine type of this specific word
118
- word_type = detect_language_group(clean_word)
 
 
 
 
 
119
 
120
- # Initialize first chunk
121
  if current_type is None:
122
  current_type = word_type
123
  current_chunk.append(word)
124
 
125
- # If type matches, keep adding to chunk
126
  elif word_type == current_type:
127
  current_chunk.append(word)
128
 
129
- # If type changes (Language switch), save chunk and start new one
130
  else:
131
  segments.append((" ".join(current_chunk), current_type))
132
  current_chunk = [word]
133
  current_type = word_type
134
-
135
- # If this word had punctuation, it implies a natural pause,
136
- # so we might want to force a segment break to allow breathing room,
137
- # but for smoothness, we keep it in the stream unless logic dictates otherwise.
138
 
139
- # Append the final chunk
140
  if current_chunk:
141
  segments.append((" ".join(current_chunk), current_type))
142
 
143
  return segments
144
 
145
  async def generate_segment_audio(text, voice, rate_limit_sem):
146
- """Generates audio for a single segment."""
147
  if not text.strip():
148
  return None
149
 
150
  async with rate_limit_sem:
151
  try:
152
- # Create a unique temp file
153
  fd, path = tempfile.mkstemp(suffix=".mp3")
154
  os.close(fd)
155
 
156
- # Rate adjustment: Make English slightly faster to match Indian speech rates usually
157
- rate = "+0%"
158
-
159
  comm = edge_tts.Communicate(text, voice, rate=rate)
160
  await comm.save(path)
161
  return path
162
  except Exception as e:
163
- print(f"Error generating segment '{text[:20]}...': {e}")
164
  return None
165
 
166
  def process_audio_segment(file_path):
167
- """
168
- Reads MP3, removes static silence, and normalizes volume.
169
- Run in ThreadPool to avoid blocking event loop.
170
- """
171
  if not file_path or not os.path.exists(file_path):
172
  return None
173
 
174
  try:
175
  audio = AudioSegment.from_mp3(file_path)
176
 
177
- # 1. Gentle Silence Trimming (Don't cut off word endings)
178
- # We only trim if silence is longer than 300ms at ends
179
- def trim_silence(sound, silence_threshold=-40.0, chunk_size=10):
180
- trim_ms = 0
181
- while sound[trim_ms:trim_ms+chunk_size].dBFS < silence_threshold and trim_ms < len(sound):
182
- trim_ms += chunk_size
183
- return sound[trim_ms:]
184
-
185
- audio = trim_silence(audio) # Trim start
186
- audio = trim_silence(audio.reverse()).reverse() # Trim end
187
 
188
- # 2. Add a tiny bit of padding (50ms) to prevent abrupt cuts
 
189
  silence_pad = AudioSegment.silent(duration=50)
190
  audio = silence_pad + audio + silence_pad
191
 
192
  return audio
193
  except Exception as e:
194
- print(f"Error processing audio file {file_path}: {e}")
195
  return None
196
  finally:
197
- # Cleanup temp file
198
  try:
199
  os.remove(file_path)
200
  except:
201
  pass
202
 
203
  async def bilingual_tts_optimized(full_text, output_file, native_lang_code):
204
- """
205
- Main Orchestrator.
206
- """
207
- print(f"Processing: {full_text[:50]}...")
208
 
209
- # 1. Split text into Language chunks (English vs Native)
210
- # The native_lang_code should be something like "Tamil", "Hindi" keys in VOICE_MAPPING
211
  segments_data = split_by_language_and_sentence(full_text)
212
 
213
- # 2. Define voices
 
 
 
 
 
214
  native_voice = VOICE_MAPPING.get(native_lang_code, VOICE_MAPPING["English"])
215
  english_voice = VOICE_MAPPING["English"]
216
 
217
  tasks = []
218
- # Limit concurrent connections to Edge TTS to avoid 429 Too Many Requests
219
- semaphore = asyncio.Semaphore(8)
220
 
221
- # 3. Queue up generation tasks
222
  for text_chunk, type_group in segments_data:
223
  voice = native_voice if type_group == 'indic' else english_voice
224
  tasks.append(generate_segment_audio(text_chunk, voice, semaphore))
225
 
226
- # 4. Generate Raw Audio Files (Async)
 
227
  raw_files = await asyncio.gather(*tasks)
228
 
229
- # 5. Process Audio (Normalization & Stitching)
230
- # Using ThreadPool for CPU intensive pydub operations
231
  final_audio = AudioSegment.empty()
232
 
233
  with ThreadPoolExecutor(max_workers=4) as executor:
234
  processed_segments = list(executor.map(process_audio_segment, raw_files))
235
 
236
- # 6. Stitch with Crossfade for smoothness
237
- # We ignore None types
238
  valid_segments = [seg for seg in processed_segments if seg is not None]
239
 
240
  if not valid_segments:
 
241
  return None
242
 
243
- # Logic: If the segments are short, crossfade. If it looks like a sentence end, add pause.
244
  for i, seg in enumerate(valid_segments):
245
  if i == 0:
246
  final_audio += seg
247
  else:
248
- # Crossfade logic: overlap the previous segment end with next segment start
249
- # by 30ms to create a smooth flow instead of a hard cut.
250
- try:
251
- final_audio = final_audio.append(seg, crossfade=30)
252
- except:
253
- # Fallback if segment is too short to crossfade
254
- final_audio += seg
255
 
256
- # 7. Final Mastering
257
- # Normalize to standard -3dB
258
- final_audio = normalize(final_audio, headroom=3.0)
259
-
260
- # Optional: Dynamic Range Compression to make voice sound "richer" and consistent
261
  final_audio = compress_dynamic_range(
262
  final_audio,
263
  threshold=-15.0,
@@ -265,22 +237,15 @@ async def bilingual_tts_optimized(full_text, output_file, native_lang_code):
265
  attack=5.0,
266
  release=50.0
267
  )
 
268
 
269
- # 8. Export
270
  final_audio.export(output_file, format="mp3", bitrate="192k")
271
- print(f"Saved: {output_file}")
272
 
273
  return output_file
274
 
275
- # --- Wrapper for usage ---
276
-
277
  async def generate_tts(id, lines, lang_input):
278
- """
279
- Called by external script.
280
- lang_input format examples: "Tamil", "Text &&& Tamil"
281
- """
282
-
283
- # Parse input
284
  if "&&&" in lang_input:
285
  parts = lang_input.split("&&&")
286
  text = parts[0].strip()
@@ -290,8 +255,6 @@ async def generate_tts(id, lines, lang_input):
290
  lang_name = lang_input.strip()
291
 
292
  output_path = os.path.join(AUDIO_DIR, f"audio_{id}.mp3")
293
-
294
- # Run the generator
295
  result = await bilingual_tts_optimized(text, output_path, lang_name)
296
 
297
  if result:
@@ -300,6 +263,7 @@ async def generate_tts(id, lines, lang_input):
300
  else:
301
  return 0, None
302
 
 
303
  def audio_func(id, lines, lang):
304
  """Synchronous wrapper for audio generation."""
305
  return asyncio.run(generate_tts(id, lines, lang))
 
55
  from mutagen.mp3 import MP3
56
 
57
  # --- Configuration ---
58
+ AUDIO_DIR = "output_audio"
59
  os.makedirs(AUDIO_DIR, exist_ok=True)
60
 
61
+ # Voice Mapping
62
+ # using 'NeerjaNeural' for English as it blends better with Indian contexts
63
  VOICE_MAPPING = {
64
+ "English": "en-IN-NeerjaNeural",
65
  "Tamil": "ta-IN-PallaviNeural",
66
  "Hindi": "hi-IN-SwaraNeural",
 
 
 
 
 
 
67
  }
68
 
69
+ # Regex to find Indian Language characters (Tamil, Hindi, Malayalam, etc.)
70
+ # Tamil Unicode range is inside this block (\u0B80-\u0BFF)
 
71
  INDIC_SCRIPT_PATTERN = re.compile(r'[\u0900-\u0D7F]+')
 
72
 
73
  @lru_cache(maxsize=1024)
74
  def clean_text(text):
 
75
  if not text: return ""
76
  text = html.unescape(str(text))
77
+ # Remove URLs and Markdown, but keep basic punctuation
78
+ text = re.sub(r'https?://\S+', '', text)
79
+ text = re.sub(r'[\*\#\<\>\[\]\{\}]', '', text)
80
  text = re.sub(r'\s+', ' ', text).strip()
81
  return text
82
 
83
+ def detect_language_group(word):
84
  """
85
+ Returns 'indic' if the word has Tamil/Hindi chars.
86
+ Returns 'english' otherwise (for words like 'Voltage', '1.5V', 'circuit').
87
  """
88
+ if INDIC_SCRIPT_PATTERN.search(word):
 
89
  return 'indic'
90
  return 'english'
91
 
92
  def split_by_language_and_sentence(text):
93
  """
94
+ Splits text into chunks of English vs Native language.
95
+ Example: "Voltage னு" -> [("Voltage", "english"), ("னு", "indic")]
96
  """
97
  text = clean_text(text)
98
  words = text.split(' ')
99
 
100
  segments = []
101
  current_chunk = []
102
+ current_type = None
103
 
104
  for word in words:
105
+ # Clean punctuation for detection (e.g. "force," -> "force")
106
+ # But keep the original word for the audio generation
107
+ clean_word_for_check = word.strip(".,!?")
108
 
109
+ if not clean_word_for_check:
110
+ # If word was just "...", keep it with previous chunk
111
+ if current_chunk:
112
+ current_chunk.append(word)
113
+ continue
114
+
115
+ word_type = detect_language_group(clean_word_for_check)
116
 
117
+ # Start first chunk
118
  if current_type is None:
119
  current_type = word_type
120
  current_chunk.append(word)
121
 
122
+ # If type matches current chunk, add to it
123
  elif word_type == current_type:
124
  current_chunk.append(word)
125
 
126
+ # Type switched (e.g., from English 'Voltage' to Tamil 'னு')
127
  else:
128
  segments.append((" ".join(current_chunk), current_type))
129
  current_chunk = [word]
130
  current_type = word_type
 
 
 
 
131
 
132
+ # Add valid final chunk
133
  if current_chunk:
134
  segments.append((" ".join(current_chunk), current_type))
135
 
136
  return segments
137
 
138
  async def generate_segment_audio(text, voice, rate_limit_sem):
139
+ """Generates audio for a specific text segment using EdgeTTS."""
140
  if not text.strip():
141
  return None
142
 
143
  async with rate_limit_sem:
144
  try:
 
145
  fd, path = tempfile.mkstemp(suffix=".mp3")
146
  os.close(fd)
147
 
148
+ # Slight speed adjustment for flow
149
+ rate = "+0%"
 
150
  comm = edge_tts.Communicate(text, voice, rate=rate)
151
  await comm.save(path)
152
  return path
153
  except Exception as e:
154
+ print(f"Error generating segment '{text}': {e}")
155
  return None
156
 
157
  def process_audio_segment(file_path):
158
+ """Process individual segment: normalize and add micro-padding."""
 
 
 
159
  if not file_path or not os.path.exists(file_path):
160
  return None
161
 
162
  try:
163
  audio = AudioSegment.from_mp3(file_path)
164
 
165
+ # Normalize volume
166
+ audio = normalize(audio)
 
 
 
 
 
 
 
 
167
 
168
+ # Add tiny silence (50ms) to start/end to prevent 'clipped' words
169
+ # This makes the transition between "Voltage" and "nu" sound natural
170
  silence_pad = AudioSegment.silent(duration=50)
171
  audio = silence_pad + audio + silence_pad
172
 
173
  return audio
174
  except Exception as e:
175
+ print(f"Error processing segment: {e}")
176
  return None
177
  finally:
 
178
  try:
179
  os.remove(file_path)
180
  except:
181
  pass
182
 
183
  async def bilingual_tts_optimized(full_text, output_file, native_lang_code):
184
+ print("\n--- Starting Processing ---")
 
 
 
185
 
186
+ # 1. Split Text
 
187
  segments_data = split_by_language_and_sentence(full_text)
188
 
189
+ # DEBUG: Print the split logic so user can see it
190
+ print(f"Detected {len(segments_data)} segments:")
191
+ for i, (text, lang_type) in enumerate(segments_data):
192
+ print(f" {i+1}. [{lang_type.upper()}] : {text}")
193
+
194
+ # 2. Assign Voices
195
  native_voice = VOICE_MAPPING.get(native_lang_code, VOICE_MAPPING["English"])
196
  english_voice = VOICE_MAPPING["English"]
197
 
198
  tasks = []
199
+ semaphore = asyncio.Semaphore(5) # Prevent overloading API
 
200
 
201
+ # 3. Create Tasks
202
  for text_chunk, type_group in segments_data:
203
  voice = native_voice if type_group == 'indic' else english_voice
204
  tasks.append(generate_segment_audio(text_chunk, voice, semaphore))
205
 
206
+ # 4. Run Generation
207
+ print("\nGenerating Audio Segments...")
208
  raw_files = await asyncio.gather(*tasks)
209
 
210
+ # 5. Process Audio (Stitching)
211
+ print("Stitching and Mastering...")
212
  final_audio = AudioSegment.empty()
213
 
214
  with ThreadPoolExecutor(max_workers=4) as executor:
215
  processed_segments = list(executor.map(process_audio_segment, raw_files))
216
 
 
 
217
  valid_segments = [seg for seg in processed_segments if seg is not None]
218
 
219
  if not valid_segments:
220
+ print("Error: No audio generated.")
221
  return None
222
 
223
+ # Crossfade Stitching
224
  for i, seg in enumerate(valid_segments):
225
  if i == 0:
226
  final_audio += seg
227
  else:
228
+ # 30ms crossfade blends the English word ending into the Tamil start
229
+ final_audio = final_audio.append(seg, crossfade=30)
 
 
 
 
 
230
 
231
+ # 6. Final Mastering
232
+ # Compress dynamic range to make it sound punchy like a podcast
 
 
 
233
  final_audio = compress_dynamic_range(
234
  final_audio,
235
  threshold=-15.0,
 
237
  attack=5.0,
238
  release=50.0
239
  )
240
+ final_audio = normalize(final_audio)
241
 
 
242
  final_audio.export(output_file, format="mp3", bitrate="192k")
243
+ print(f"✅ Success! Audio saved to: {output_file}")
244
 
245
  return output_file
246
 
247
+ # --- Wrapper for your usage ---
 
248
  async def generate_tts(id, lines, lang_input):
 
 
 
 
 
 
249
  if "&&&" in lang_input:
250
  parts = lang_input.split("&&&")
251
  text = parts[0].strip()
 
255
  lang_name = lang_input.strip()
256
 
257
  output_path = os.path.join(AUDIO_DIR, f"audio_{id}.mp3")
 
 
258
  result = await bilingual_tts_optimized(text, output_path, lang_name)
259
 
260
  if result:
 
263
  else:
264
  return 0, None
265
 
266
+
267
  def audio_func(id, lines, lang):
268
  """Synchronous wrapper for audio generation."""
269
  return asyncio.run(generate_tts(id, lines, lang))