TGPro1 committed on
Commit
685b62a
·
verified ·
1 Parent(s): 6df1c67

Upload app.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. app.py +42 -126
app.py CHANGED
@@ -11,6 +11,17 @@ import json
11
  import time
12
  import torchaudio
13
 
 
 
 
 
 
 
 
 
 
 
 
14
  # 🛠️ Monkeypatch torchaudio.backend (DeepFilterNet compatibility)
15
  # DeepFilterNet uses older torchaudio API structure (torchaudio.backend.common.AudioMetaData)
16
  # We mock it here before importing df
@@ -49,9 +60,8 @@ if not hasattr(torchaudio, "info"):
49
  torchaudio.info = mock_info
50
 
51
  from df.enhance import enhance, init_df, load_audio, save_audio
52
- # from df.utils import download_model # Not needed/Not found in this version
53
 
54
- # FORCE BUILD TRIGGER: 14:55:00 Jan 20 2026
55
 
56
  # 🛠️ Monkeypatch torchaudio.load to bypass TorchCodec requirement
57
  try:
@@ -100,13 +110,11 @@ def load_models():
100
 
101
  if MODELS["translate"] is None:
102
  print("🌍 Loading Google Translate (deep-translator)...")
103
- # No heavy object to load, just a placeholder or class ref
104
  MODELS["translate"] = "deep-translator-active"
105
 
106
  if MODELS["denoiser"] is None:
107
  print("🧹 Loading DeepFilterNet (Voice Cleaner)...")
108
  try:
109
- # Initialize DeepFilterNet model
110
  df_ret = init_df()
111
  if isinstance(df_ret, (list, tuple)) and len(df_ret) > 1:
112
  MODELS["denoiser"] = df_ret[0]
@@ -116,7 +124,6 @@ def load_models():
116
  except Exception as e:
117
  print(f"⚠️ Failed to load denoiser: {e}")
118
  try:
119
- print("🔄 Final attempt for DeepFilterNet init...")
120
  MODELS["denoiser"] = init_df()
121
  except:
122
  pass
@@ -125,7 +132,6 @@ def load_models():
125
  print("🔊 Loading XTTS-v2 (STRICT GPU PREFERRED)...")
126
  from TTS.api import TTS
127
  try:
128
- # Try GPU first
129
  MODELS["tts"] = TTS(model_name="tts_models/multilingual/multi-dataset/xtts_v2", gpu=True)
130
  print("✨ XTTS-v2 Loaded on GPU")
131
  except Exception as tts_e:
@@ -137,10 +143,11 @@ def load_models():
137
  print(f"❌ FATAL: Could not load XTTS-v2 on any hardware: {cpu_e}")
138
  raise cpu_e
139
 
 
140
  def core_process(request_dict):
141
  """Internal logic used by both FastAPI and Gradio"""
142
  action = request_dict.get("action")
143
- print(f"--- 🛠️ Processing Action: {action} ---")
144
  start_time = time.time()
145
 
146
  if action == "health":
@@ -160,20 +167,18 @@ def core_process(request_dict):
160
  temp_path = f.name
161
  try:
162
  print("🚀 Faster-Whisper Transcription Starting (Instant Mode)...")
163
- # beam_size=1 for instantaneous results (Greedy Search)
164
  segments, info = MODELS["stt"].transcribe(temp_path, language=lang, beam_size=1)
165
  text = " ".join([segment.text for segment in segments]).strip()
166
  print(f"✨ Transcription Done: '{text[:50]}...'")
167
  return {"text": text}
168
  finally:
169
- os.unlink(temp_path)
170
 
171
  elif action == "translate":
172
  text = request_dict.get("text")
173
  target_lang = request_dict.get("target_lang")
174
  print(f"🌍 Translate: '{text[:50]}...' to {target_lang}")
175
 
176
- # Map codes to Google standard (ISO 639-1)
177
  g_lang_map = {
178
  "en": "en", "fr": "fr", "es": "es", "de": "de",
179
  "ar": "ar", "it": "it", "pt": "pt", "ru": "ru",
@@ -182,7 +187,6 @@ def core_process(request_dict):
182
  g_target = g_lang_map.get(target_lang, "en")
183
 
184
  from deep_translator import GoogleTranslator
185
- # deep-translator handles 'auto' source by default
186
  result = GoogleTranslator(source='auto', target=g_target).translate(text)
187
 
188
  print(f"✨ Translation Done: '{result[:50]}...'")
@@ -200,7 +204,6 @@ def core_process(request_dict):
200
  f.write(speaker_bytes)
201
  speaker_wav_path = f.name
202
  else:
203
- # 🛡️ FALLBACK: Generate dummy speaker if missing
204
  print("⚠️ No speaker ref provided. Using generated default.")
205
  import wave, struct, math
206
  default_path = "default_speaker.wav"
@@ -210,12 +213,9 @@ def core_process(request_dict):
210
  wav_file.setnchannels(1)
211
  wav_file.setsampwidth(2)
212
  wav_file.setframerate(24000)
213
- # Generate 1 sec of silence/noise to satisfy XTTS input requirement
214
  data = [struct.pack('<h', int(math.sin(x/100.0)*3000)) for x in range(24000)]
215
  wav_file.writeframes(b''.join(data))
216
- except Exception as e:
217
- print(f"❌ Failed to create default speaker: {e}")
218
-
219
  if os.path.exists(default_path):
220
  speaker_wav_path = default_path
221
 
@@ -225,41 +225,29 @@ def core_process(request_dict):
225
 
226
  if speaker_wav_path:
227
  MODELS["tts"].tts_to_file(text=text, language=lang, file_path=output_path, speaker_wav=speaker_wav_path)
228
- else:
229
- # If fallback failed, try cloning from self (hack) or fail gracefully
230
- raise RuntimeError("No speaker_wav available for XTTS voice cloning.")
231
 
232
- # --- 🧹 DEEPFILTERNET DENOISING ---
233
  if MODELS["denoiser"]:
234
- print("🧹 Cleaning Audio with DeepFilterNet...")
235
  try:
236
  noisy_audio, _ = load_audio(output_path, sr=48000)
237
  enhanced_audio = enhance(MODELS["denoiser"], noisy_audio, pad=True)
238
  save_audio(output_path, enhanced_audio, 48000)
239
- print("✨ Audio Cleaned Successfully")
240
- except Exception as e:
241
- print(f"⚠️ Denoising failed, using original: {e}")
242
- # ----------------------------------
243
 
244
  with open(output_path, "rb") as f:
245
  audio_b64 = base64.b64encode(f.read()).decode()
246
  print("✨ TTS Done")
247
  return {"audio": audio_b64}
248
  finally:
249
- if speaker_wav_path and os.path.exists(speaker_wav_path):
250
  os.unlink(speaker_wav_path)
251
- if os.path.exists(output_path):
252
- os.unlink(output_path)
253
 
254
  elif action == "s2st":
255
  audio_b64 = request_dict.get("file")
256
  source_lang = request_dict.get("source_lang")
257
  target_lang = request_dict.get("target_lang")
258
  speaker_wav_b64 = request_dict.get("speaker_wav")
259
-
260
- print(f"🚀 [S2ST] Action Started (Source: {source_lang}, Target: {target_lang})")
261
 
262
- # 1. Decode Audio
263
  audio_bytes = base64.b64decode(audio_b64)
264
  with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
265
  f.write(audio_bytes)
@@ -272,115 +260,47 @@ def core_process(request_dict):
272
  sf.write(speaker_bytes)
273
  speaker_wav_path = sf.name
274
  else:
275
- # 🛡️ FALLBACK: Use default speaker if input audio is too short/bad for cloning or not provided
276
  default_path = "default_speaker.wav"
277
  if os.path.exists(default_path):
278
- print(f"⚠️ [S2ST] No valid speaker ref provided. Using default: {default_path}")
279
  speaker_wav_path = default_path
280
 
281
  try:
282
- # 2. STT (Whisper Pro)
283
- print("🎙️ [S2ST] Phase 1: Whisper Transcription...")
284
-
285
- # 🛡️ AUDIO PADDING (1.5s)
286
  try:
287
  waveform, sr = torchaudio.load(temp_path)
288
-
289
- # --- 🧹 PRE-PROCESS: DeepFilterNet on INPUT ---
290
  if MODELS["denoiser"]:
291
- try:
292
- noisy_in, _ = load_audio(temp_path, sr=48000)
293
- clean_in = enhance(MODELS["denoiser"], noisy_in, pad=True)
294
- save_audio(temp_path, clean_in, 48000) # Overwrite temp with clean
295
- print("🧹 [S2ST] Input Audio Cleaned (Pre-ASR/Clone)")
296
-
297
- waveform, sr = torchaudio.load(temp_path)
298
- except Exception as df_e:
299
- print(f"⚠️ Input cleaning failed: {df_e}")
300
-
301
- silence_frames = int(1.5 * sr)
302
- silence = torch.zeros((waveform.shape[0], silence_frames))
303
- padded_waveform = torch.cat([waveform, silence], dim=1)
304
 
305
- torchaudio.save(temp_path, padded_waveform, sr)
306
- print(f"🛡️ Added 1.5s silence padding to audio (New duration: {padded_waveform.shape[1]/sr:.2f}s)")
307
- except Exception as pe:
308
- print(f"⚠️ Padding/Cleaning failed: {pe}")
309
 
310
- print("🎙️ [S2ST] Phase 1: Faster-Whisper Transcription...")
311
- segments, info = MODELS["stt"].transcribe(
312
- temp_path,
313
- language=source_lang,
314
- beam_size=1,
315
- best_of=1
316
- )
317
  text = " ".join([segment.text for segment in segments]).strip()
318
-
319
- valid_endings = ('.', '!', '?', '…', '。', '!', '?')
320
- if text and not text.endswith(valid_endings):
321
- print(f"⚠️ Incomplete sentence detected: '{text}' -> Appending ellipsis")
322
- text += "..."
323
-
324
- print(f"✨ [S2ST] Transcribed: '{text[:50]}...'")
325
-
326
- if not text:
327
- return {"error": "No speech detected"}
328
-
329
- # 3. Translate
330
- print("🌍 [S2ST] Phase 2: Google Translation...")
331
- try:
332
- g_lang_map = {
333
- "en": "en", "fr": "fr", "es": "es", "de": "de",
334
- "ar": "ar", "it": "it", "pt": "pt", "ru": "ru",
335
- "zh": "zh-cn", "ja": "ja", "ko": "ko", "hi": "hi"
336
- }
337
- g_target = g_lang_map.get(target_lang, "en")
338
- from deep_translator import GoogleTranslator
339
- translated_text = GoogleTranslator(source='auto', target=g_target).translate(text)
340
- except Exception as tr_e:
341
- translated_text = text
342
 
343
- print(f"✨ [S2ST] Translated: '{translated_text[:50]}...'")
 
 
 
 
344
 
345
- # 4. TTS
346
- print("🔊 [S2ST] Phase 3: XTTS Synthesis...")
347
  with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as output_file:
348
  output_path = output_file.name
349
 
350
- if len(translated_text) > 240:
351
- print(f"✂️ Text too long ({len(translated_text)}), splitting...")
352
- import re
353
- sub_segments = re.split(r'(?<=[.!?])\s+', translated_text)
354
- combined_audio = []
355
- for idx, sub in enumerate(sub_segments):
356
- if not sub.strip(): continue
357
- with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as sub_file:
358
- sub_path = sub_file.name
359
- try:
360
- MODELS["tts"].tts_to_file(text=sub, language=target_lang, file_path=sub_path, speaker_wav=speaker_wav_path)
361
- wav, sr = torchaudio.load(sub_path)
362
- combined_audio.append(wav)
363
- finally:
364
- if os.path.exists(sub_path): os.unlink(sub_path)
365
-
366
- if combined_audio:
367
- final_wav = torch.cat(combined_audio, dim=1)
368
- torchaudio.save(output_path, final_wav, sr)
369
- else:
370
- MODELS["tts"].tts_to_file(text="Error", language=target_lang, file_path=output_path, speaker_wav=speaker_wav_path)
371
- else:
372
- MODELS["tts"].tts_to_file(text=translated_text, language=target_lang, file_path=output_path, speaker_wav=speaker_wav_path)
373
 
374
  with open(output_path, "rb") as o:
375
  audio_out_b64 = base64.b64encode(o.read()).decode()
376
 
377
- print("🏁 [S2ST] All phases complete!")
378
- return {
379
- "text": text,
380
- "translated": translated_text,
381
- "audio": audio_out_b64
382
- }
383
-
384
  finally:
385
  if os.path.exists(temp_path): os.unlink(temp_path)
386
  if speaker_wav_path and os.path.exists(speaker_wav_path) and "default_speaker" not in speaker_wav_path:
@@ -419,6 +339,7 @@ def create_wav_header(sample_rate=24000, channels=1, bit_depth=16):
419
  return header
420
 
421
  @app.post("/api/v1/tts_stream")
 
422
  async def api_tts_stream(request: Request):
423
  try:
424
  load_models()
@@ -427,8 +348,6 @@ async def api_tts_stream(request: Request):
427
  lang = data.get("lang")
428
  speaker_wav_b64 = data.get("speaker_wav")
429
 
430
- print(f"🌊 [TTS Stream] Starting for: '{text[:50]}...' in {lang}")
431
-
432
  speaker_wav_path = None
433
  if speaker_wav_b64:
434
  speaker_bytes = base64.b64decode(speaker_wav_b64)
@@ -448,16 +367,14 @@ async def api_tts_stream(request: Request):
448
  stream_chunk_size=20
449
  ):
450
  yield (chunk * 32767).to(torch.int16).cpu().numpy().tobytes()
451
- print("✨ [TTS Stream] Generation Complete")
452
  except Exception as ge:
453
- print(f"❌ [TTS Stream] Generator error: {ge}")
454
  finally:
455
  if speaker_wav_path and os.path.exists(speaker_wav_path) and "default_speaker" not in speaker_wav_path:
456
  os.unlink(speaker_wav_path)
457
 
458
  return StreamingResponse(stream_generator(), media_type="audio/wav")
459
  except Exception as e:
460
- print(f"❌ [TTS Stream] Global Error: {traceback.format_exc()}")
461
  return {"error": str(e)}
462
 
463
  @app.get("/health")
@@ -483,5 +400,4 @@ demo = gr.Interface(
483
  app = gr.mount_gradio_app(app, demo, path="/")
484
 
485
  if __name__ == "__main__":
486
- print("🚀 Starting FastAPI Server on port 7860...")
487
  uvicorn.run(app, host="0.0.0.0", port=7860)
 
11
  import time
12
  import torchaudio
13
 
14
+ # 🛡️ ZeroGPU Support (v68)
15
+ try:
16
+ import spaces
17
+ print("✅ ZeroGPU/Spaces detected")
18
+ except ImportError:
19
+ print("⚠️ Spaces library not found. Using mock decorator for local run.")
20
+ class spaces:
21
+ @staticmethod
22
+ def GPU(f): return f
23
+
24
+
25
  # 🛠️ Monkeypatch torchaudio.backend (DeepFilterNet compatibility)
26
  # DeepFilterNet uses older torchaudio API structure (torchaudio.backend.common.AudioMetaData)
27
  # We mock it here before importing df
 
60
  torchaudio.info = mock_info
61
 
62
  from df.enhance import enhance, init_df, load_audio, save_audio
 
63
 
64
+ # FORCE BUILD TRIGGER: 15:10:00 Jan 20 2026
65
 
66
  # 🛠️ Monkeypatch torchaudio.load to bypass TorchCodec requirement
67
  try:
 
110
 
111
  if MODELS["translate"] is None:
112
  print("🌍 Loading Google Translate (deep-translator)...")
 
113
  MODELS["translate"] = "deep-translator-active"
114
 
115
  if MODELS["denoiser"] is None:
116
  print("🧹 Loading DeepFilterNet (Voice Cleaner)...")
117
  try:
 
118
  df_ret = init_df()
119
  if isinstance(df_ret, (list, tuple)) and len(df_ret) > 1:
120
  MODELS["denoiser"] = df_ret[0]
 
124
  except Exception as e:
125
  print(f"⚠️ Failed to load denoiser: {e}")
126
  try:
 
127
  MODELS["denoiser"] = init_df()
128
  except:
129
  pass
 
132
  print("🔊 Loading XTTS-v2 (STRICT GPU PREFERRED)...")
133
  from TTS.api import TTS
134
  try:
 
135
  MODELS["tts"] = TTS(model_name="tts_models/multilingual/multi-dataset/xtts_v2", gpu=True)
136
  print("✨ XTTS-v2 Loaded on GPU")
137
  except Exception as tts_e:
 
143
  print(f"❌ FATAL: Could not load XTTS-v2 on any hardware: {cpu_e}")
144
  raise cpu_e
145
 
146
+ @spaces.GPU
147
  def core_process(request_dict):
148
  """Internal logic used by both FastAPI and Gradio"""
149
  action = request_dict.get("action")
150
+ print(f"--- 🛠️ Processing Action: {action} (ZeroGPU Context) ---")
151
  start_time = time.time()
152
 
153
  if action == "health":
 
167
  temp_path = f.name
168
  try:
169
  print("🚀 Faster-Whisper Transcription Starting (Instant Mode)...")
 
170
  segments, info = MODELS["stt"].transcribe(temp_path, language=lang, beam_size=1)
171
  text = " ".join([segment.text for segment in segments]).strip()
172
  print(f"✨ Transcription Done: '{text[:50]}...'")
173
  return {"text": text}
174
  finally:
175
+ if os.path.exists(temp_path): os.unlink(temp_path)
176
 
177
  elif action == "translate":
178
  text = request_dict.get("text")
179
  target_lang = request_dict.get("target_lang")
180
  print(f"🌍 Translate: '{text[:50]}...' to {target_lang}")
181
 
 
182
  g_lang_map = {
183
  "en": "en", "fr": "fr", "es": "es", "de": "de",
184
  "ar": "ar", "it": "it", "pt": "pt", "ru": "ru",
 
187
  g_target = g_lang_map.get(target_lang, "en")
188
 
189
  from deep_translator import GoogleTranslator
 
190
  result = GoogleTranslator(source='auto', target=g_target).translate(text)
191
 
192
  print(f"✨ Translation Done: '{result[:50]}...'")
 
204
  f.write(speaker_bytes)
205
  speaker_wav_path = f.name
206
  else:
 
207
  print("⚠️ No speaker ref provided. Using generated default.")
208
  import wave, struct, math
209
  default_path = "default_speaker.wav"
 
213
  wav_file.setnchannels(1)
214
  wav_file.setsampwidth(2)
215
  wav_file.setframerate(24000)
 
216
  data = [struct.pack('<h', int(math.sin(x/100.0)*3000)) for x in range(24000)]
217
  wav_file.writeframes(b''.join(data))
218
+ except: pass
 
 
219
  if os.path.exists(default_path):
220
  speaker_wav_path = default_path
221
 
 
225
 
226
  if speaker_wav_path:
227
  MODELS["tts"].tts_to_file(text=text, language=lang, file_path=output_path, speaker_wav=speaker_wav_path)
 
 
 
228
 
 
229
  if MODELS["denoiser"]:
 
230
  try:
231
  noisy_audio, _ = load_audio(output_path, sr=48000)
232
  enhanced_audio = enhance(MODELS["denoiser"], noisy_audio, pad=True)
233
  save_audio(output_path, enhanced_audio, 48000)
234
+ except: pass
 
 
 
235
 
236
  with open(output_path, "rb") as f:
237
  audio_b64 = base64.b64encode(f.read()).decode()
238
  print("✨ TTS Done")
239
  return {"audio": audio_b64}
240
  finally:
241
+ if speaker_wav_path and os.path.exists(speaker_wav_path) and "default_speaker" not in speaker_wav_path:
242
  os.unlink(speaker_wav_path)
243
+ if 'output_path' in locals() and os.path.exists(output_path): os.unlink(output_path)
 
244
 
245
  elif action == "s2st":
246
  audio_b64 = request_dict.get("file")
247
  source_lang = request_dict.get("source_lang")
248
  target_lang = request_dict.get("target_lang")
249
  speaker_wav_b64 = request_dict.get("speaker_wav")
 
 
250
 
 
251
  audio_bytes = base64.b64decode(audio_b64)
252
  with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
253
  f.write(audio_bytes)
 
260
  sf.write(speaker_bytes)
261
  speaker_wav_path = sf.name
262
  else:
 
263
  default_path = "default_speaker.wav"
264
  if os.path.exists(default_path):
 
265
  speaker_wav_path = default_path
266
 
267
  try:
268
+ # Padding & Denoising
 
 
 
269
  try:
270
  waveform, sr = torchaudio.load(temp_path)
 
 
271
  if MODELS["denoiser"]:
272
+ noisy_in, _ = load_audio(temp_path, sr=48000)
273
+ clean_in = enhance(MODELS["denoiser"], noisy_in, pad=True)
274
+ save_audio(temp_path, clean_in, 48000)
275
+ waveform, sr = torchaudio.load(temp_path)
 
 
 
 
 
 
 
 
 
276
 
277
+ silence = torch.zeros((waveform.shape[0], int(1.5 * sr)))
278
+ padded = torch.cat([waveform, silence], dim=1)
279
+ torchaudio.save(temp_path, padded, sr)
280
+ except: pass
281
 
282
+ # STT
283
+ segments, info = MODELS["stt"].transcribe(temp_path, language=source_lang, beam_size=1)
 
 
 
 
 
284
  text = " ".join([segment.text for segment in segments]).strip()
285
+ if text and not text.endswith(('.', '!', '?', '…')): text += "..."
286
+ if not text: return {"error": "No speech detected"}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
287
 
288
+ # Translate
289
+ g_lang_map = {"en": "en", "fr": "fr", "es": "es", "de": "de", "ar": "ar", "it": "it", "pt": "pt", "ru": "ru", "zh": "zh-cn", "ja": "ja", "ko": "ko", "hi": "hi"}
290
+ g_target = g_lang_map.get(target_lang, "en")
291
+ from deep_translator import GoogleTranslator
292
+ translated_text = GoogleTranslator(source='auto', target=g_target).translate(text)
293
 
294
+ # TTS
 
295
  with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as output_file:
296
  output_path = output_file.name
297
 
298
+ MODELS["tts"].tts_to_file(text=translated_text, language=target_lang, file_path=output_path, speaker_wav=speaker_wav_path)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
299
 
300
  with open(output_path, "rb") as o:
301
  audio_out_b64 = base64.b64encode(o.read()).decode()
302
 
303
+ return {"text": text, "translated": translated_text, "audio": audio_out_b64}
 
 
 
 
 
 
304
  finally:
305
  if os.path.exists(temp_path): os.unlink(temp_path)
306
  if speaker_wav_path and os.path.exists(speaker_wav_path) and "default_speaker" not in speaker_wav_path:
 
339
  return header
340
 
341
  @app.post("/api/v1/tts_stream")
342
+ @spaces.GPU
343
  async def api_tts_stream(request: Request):
344
  try:
345
  load_models()
 
348
  lang = data.get("lang")
349
  speaker_wav_b64 = data.get("speaker_wav")
350
 
 
 
351
  speaker_wav_path = None
352
  if speaker_wav_b64:
353
  speaker_bytes = base64.b64decode(speaker_wav_b64)
 
367
  stream_chunk_size=20
368
  ):
369
  yield (chunk * 32767).to(torch.int16).cpu().numpy().tobytes()
 
370
  except Exception as ge:
371
+ print(f"❌ [Stream Error]: {ge}")
372
  finally:
373
  if speaker_wav_path and os.path.exists(speaker_wav_path) and "default_speaker" not in speaker_wav_path:
374
  os.unlink(speaker_wav_path)
375
 
376
  return StreamingResponse(stream_generator(), media_type="audio/wav")
377
  except Exception as e:
 
378
  return {"error": str(e)}
379
 
380
  @app.get("/health")
 
400
  app = gr.mount_gradio_app(app, demo, path="/")
401
 
402
  if __name__ == "__main__":
 
403
  uvicorn.run(app, host="0.0.0.0", port=7860)