sreepathi-ravikumar committed on
Commit
a0f5f50
·
verified ·
1 Parent(s): db8baa3

Update video2.py

Browse files
Files changed (1) hide show
  1. video2.py +59 -31
video2.py CHANGED
@@ -40,7 +40,10 @@ for path in [BASE_DIR, AUDIO_DIR, CLIPS_DIR]:
40
  Path(path).mkdir(parents=True, exist_ok=True)
41
  warnings.filterwarnings('ignore')
42
  nest_asyncio.apply()
 
 
43
  VOICE_EN = "en-IN-NeerjaNeural"
 
44
  def clean_text_for_tts(text):
45
  """Cleans text before TTS so only the spoken words are read."""
46
  if not text:
@@ -65,6 +68,7 @@ def clean_text_for_tts(text):
65
  text = unicodedata.normalize('NFKD', text)
66
  text = re.sub(r'\s+', ' ', text)
67
  return text.strip()
 
68
  async def generate_safe_audio(text, voice):
69
  """Generate clean, plain text audio using edge-tts."""
70
  cleaned_text = clean_text_for_tts(text)
@@ -80,6 +84,7 @@ async def generate_safe_audio(text, voice):
80
  except Exception as e:
81
  print(f"Error generating audio: {e}")
82
  return None
 
83
  def smart_text_chunking(text, max_chars=80):
84
  """Split text into sensible, natural-length chunks for TTS."""
85
  text = clean_text_for_tts(text)
@@ -113,8 +118,9 @@ def smart_text_chunking(text, max_chars=80):
113
  if current_chunk:
114
  chunks.append(current_chunk.strip())
115
  return [chunk for chunk in chunks if chunk.strip()]
 
116
  async def bilingual_tts_fixed(text, output_file="audio0.mp3", VOICE_TA=None):
117
- """Main fixed function for bilingual TTS output."""
118
  print("Starting fixed bilingual TTS processing...")
119
  try:
120
  chunks = smart_text_chunking(text)
@@ -122,9 +128,9 @@ async def bilingual_tts_fixed(text, output_file="audio0.mp3", VOICE_TA=None):
122
  print("Error: No valid text chunks after cleaning")
123
  return None
124
  print(f"Processing {len(chunks)} text chunks...")
125
- audio_files = []
126
- merged_audio = None
127
  is_bilingual_tamil = VOICE_TA is not None and "ta-IN" in VOICE_TA
 
128
  for i, chunk in enumerate(chunks):
129
  is_tamil = any('\u0B80' <= char <= '\u0BFF' for char in chunk)
130
  if is_bilingual_tamil:
@@ -133,45 +139,67 @@ async def bilingual_tts_fixed(text, output_file="audio0.mp3", VOICE_TA=None):
133
  voice = VOICE_TA
134
  lang_label = "Tamil" if is_tamil else "English"
135
  print(f"Chunk {i+1}/{len(chunks)} ({lang_label}): {chunk[:40]}...")
136
- audio_file = await generate_safe_audio(chunk, voice)
137
- if audio_file:
138
- audio_files.append(audio_file)
139
- try:
140
- segment = AudioSegment.from_file(audio_file)
141
- segment = normalize(segment)
142
- # Only strip silence if segment is reasonably long
143
- if len(segment) > 200:
144
- try:
145
- segment = segment.strip_silence(silence_len=50, silence_thresh=-40)
146
- except Exception as e:
147
- print(f" (Info) Skipped strip_silence: {e}")
148
- if merged_audio is None:
149
- merged_audio = segment
150
- else:
151
- pause = AudioSegment.silent(duration=200)
152
- merged_audio += pause + segment
153
- except Exception as audio_error:
154
- print(f"Warning: Error processing audio for chunk {i+1}: {audio_error}")
155
- continue
156
- if merged_audio is None:
157
  print("Error: No audio was successfully generated")
158
  return None
159
- merged_audio.export(output_file, format="mp3", bitrate="128k")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
160
  print(f"✅ Audio successfully generated: {output_file}")
161
- for temp_file in audio_files:
 
 
162
  try:
163
  if os.path.exists(temp_file):
164
  os.unlink(temp_file)
165
  except:
166
  pass
 
167
  return output_file
168
  except Exception as main_error:
169
  print(f"Main error in bilingual TTS: {main_error}")
170
  return None
 
171
  # USAGE EXAMPLE
172
  async def run_fixed_tts(text_input, output_file, lang):
173
  await bilingual_tts_fixed(text_input, output_file, lang)
174
-
175
  async def generate_tts(id, lines, lang):
176
  voice = {
177
  "English": "en-US-JennyNeural",
@@ -208,22 +236,22 @@ async def generate_tts(id, lines, lang):
208
  "Hungarian": "hu-HU-NoemiNeural"
209
  }
210
  audio_name = f"audio{id}.mp3"
211
- audio_path = os.path.join(AUDIO_DIR, audio_name)
212
  if "&&&" in lang:
213
  listf = lang.split("&&&")
214
  text = listf[0].strip()
215
  lang_name = listf[1].strip()
216
  voice_to_use = voice[lang_name]
217
  else:
218
- text = lines[id]
219
  voice_to_use = voice[lang]
220
- loop = asyncio.get_event_loop()
221
- output = loop.run_until_complete(run_fixed_tts(text, audio_path, voice_to_use))
222
  if os.path.exists(audio_path):
223
  audio = MP3(audio_path)
224
  duration = audio.info.length
225
  return duration, audio_path
226
  return None, None
 
227
  def audio_func(id, lines, lang):
228
  return asyncio.run(generate_tts(id, lines, lang))
229
  #-----------------------------
 
40
  Path(path).mkdir(parents=True, exist_ok=True)
41
  warnings.filterwarnings('ignore')
42
  nest_asyncio.apply()
43
+
44
+
45
  VOICE_EN = "en-IN-NeerjaNeural"
46
+
47
  def clean_text_for_tts(text):
48
  """Cleans text before TTS so only the spoken words are read."""
49
  if not text:
 
68
  text = unicodedata.normalize('NFKD', text)
69
  text = re.sub(r'\s+', ' ', text)
70
  return text.strip()
71
+
72
  async def generate_safe_audio(text, voice):
73
  """Generate clean, plain text audio using edge-tts."""
74
  cleaned_text = clean_text_for_tts(text)
 
84
  except Exception as e:
85
  print(f"Error generating audio: {e}")
86
  return None
87
+
88
  def smart_text_chunking(text, max_chars=80):
89
  """Split text into sensible, natural-length chunks for TTS."""
90
  text = clean_text_for_tts(text)
 
118
  if current_chunk:
119
  chunks.append(current_chunk.strip())
120
  return [chunk for chunk in chunks if chunk.strip()]
121
+
122
  async def bilingual_tts_fixed(text, output_file="audio0.mp3", VOICE_TA=None):
123
+ """Main fixed function for bilingual TTS output with concurrent audio generation for speed."""
124
  print("Starting fixed bilingual TTS processing...")
125
  try:
126
  chunks = smart_text_chunking(text)
 
128
  print("Error: No valid text chunks after cleaning")
129
  return None
130
  print(f"Processing {len(chunks)} text chunks...")
131
+
 
132
  is_bilingual_tamil = VOICE_TA is not None and "ta-IN" in VOICE_TA
133
+ tasks = []
134
  for i, chunk in enumerate(chunks):
135
  is_tamil = any('\u0B80' <= char <= '\u0BFF' for char in chunk)
136
  if is_bilingual_tamil:
 
139
  voice = VOICE_TA
140
  lang_label = "Tamil" if is_tamil else "English"
141
  print(f"Chunk {i+1}/{len(chunks)} ({lang_label}): {chunk[:40]}...")
142
+ tasks.append(generate_safe_audio(chunk, voice))
143
+
144
+ audio_files = await asyncio.gather(*tasks, return_exceptions=True)
145
+ processed_audio_files = [f for f in audio_files if isinstance(f, str)] # Filter successful files
146
+ errors = [e for e in audio_files if isinstance(e, Exception)]
147
+ if errors:
148
+ for e in errors:
149
+ print(f"Warning: Audio generation error: {e}")
150
+
151
+ if not processed_audio_files:
 
 
 
 
 
 
 
 
 
 
 
152
  print("Error: No audio was successfully generated")
153
  return None
154
+
155
+ merged_audio = None
156
+ for audio_file in processed_audio_files:
157
+ try:
158
+ segment = AudioSegment.from_file(audio_file)
159
+ segment = normalize(segment)
160
+ # Only strip silence if segment is reasonably long
161
+ if len(segment) > 200:
162
+ try:
163
+ segment = segment.strip_silence(silence_len=50, silence_thresh=-40)
164
+ except Exception as e:
165
+ print(f" (Info) Skipped strip_silence: {e}")
166
+ if merged_audio is None:
167
+ merged_audio = segment
168
+ else:
169
+ pause = AudioSegment.silent(duration=200)
170
+ merged_audio += pause + segment
171
+ except Exception as audio_error:
172
+ print(f"Warning: Error processing audio: {audio_error}")
173
+ continue
174
+
175
+ if merged_audio is None:
176
+ print("Error: No audio segments were successfully processed")
177
+ return None
178
+
179
+ # Improved quality: Apply overall compression and normalization
180
+ merged_audio = merged_audio.compress_dynamic_range(threshold=-20.0, ratio=4.0, attack=5.0, release=50.0)
181
+ merged_audio = normalize(merged_audio)
182
+
183
+ merged_audio.export(output_file, format="mp3", bitrate="192k") # Increased bitrate for better quality
184
  print(f"✅ Audio successfully generated: {output_file}")
185
+
186
+ # Cleanup temp files
187
+ for temp_file in processed_audio_files:
188
  try:
189
  if os.path.exists(temp_file):
190
  os.unlink(temp_file)
191
  except:
192
  pass
193
+
194
  return output_file
195
  except Exception as main_error:
196
  print(f"Main error in bilingual TTS: {main_error}")
197
  return None
198
+
199
  # USAGE EXAMPLE
200
  async def run_fixed_tts(text_input, output_file, lang):
201
  await bilingual_tts_fixed(text_input, output_file, lang)
202
+
203
  async def generate_tts(id, lines, lang):
204
  voice = {
205
  "English": "en-US-JennyNeural",
 
236
  "Hungarian": "hu-HU-NoemiNeural"
237
  }
238
  audio_name = f"audio{id}.mp3"
239
+ audio_path = os.path.join(AUDIO_DIR, audio_name) # Assuming AUDIO_DIR is defined elsewhere
240
  if "&&&" in lang:
241
  listf = lang.split("&&&")
242
  text = listf[0].strip()
243
  lang_name = listf[1].strip()
244
  voice_to_use = voice[lang_name]
245
  else:
246
+ text = lines[id] # Assuming lines is a dict or list indexed by id
247
  voice_to_use = voice[lang]
248
+ output = await run_fixed_tts(text, audio_path, voice_to_use)
 
249
  if os.path.exists(audio_path):
250
  audio = MP3(audio_path)
251
  duration = audio.info.length
252
  return duration, audio_path
253
  return None, None
254
+
255
  def audio_func(id, lines, lang):
256
  return asyncio.run(generate_tts(id, lines, lang))
257
  #-----------------------------