frascuchon HF Staff committed on
Commit
f62bfdb
·
1 Parent(s): 2c6a090

fixing tools

Browse files
tools/audio_cleaning.py CHANGED
@@ -11,6 +11,9 @@ from scipy.signal import butter, lfilter, filtfilt
11
  def _load_audio(audio_path: str, mono: bool = False) -> tuple[np.ndarray, int]:
12
  """Load audio file with standard settings."""
13
  y, sr = librosa.load(audio_path, sr=None, mono=mono, res_type="soxr_vhq")
 
 
 
14
  return y, int(sr)
15
 
16
 
@@ -25,6 +28,20 @@ def detect_noise_profile(audio: np.ndarray, sample_rate: int) -> dict:
25
  Returns:
26
  Dictionary with noise profile information
27
  """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28
  # Compute spectral features for noise detection
29
  stft = librosa.stft(audio, n_fft=2048, hop_length=512)
30
  magnitude = np.abs(stft)
@@ -35,11 +52,11 @@ def detect_noise_profile(audio: np.ndarray, sample_rate: int) -> dict:
35
  # Detect steady noise (consistent low-frequency content)
36
  freqs = librosa.fft_frequencies(sr=sample_rate, n_fft=2048)
37
  low_freq_mask = freqs < 200 # Below 200 Hz
38
- steady_noise = np.mean(magnitude[:, low_freq_mask], axis=1)
39
 
40
  # Detect hiss (high frequency noise)
41
  high_freq_mask = freqs > 4000 # Above 4 kHz
42
- hiss_level = np.mean(magnitude[:, high_freq_mask], axis=1)
43
 
44
  # Compute overall noise characteristics
45
  signal_power = np.mean(magnitude**2, axis=1)
@@ -48,11 +65,12 @@ def detect_noise_profile(audio: np.ndarray, sample_rate: int) -> dict:
48
 
49
  return {
50
  "noise_floor": float(noise_floor),
51
- "steady_noise": float(steady_noise),
52
- "hiss_level": float(hiss_level),
53
- "snr_estimate": float(snr_estimate),
54
  "has_significant_noise": bool(
55
- steady_noise > noise_floor * 2 or hiss_level > noise_floor * 1.5
 
56
  ),
57
  }
58
 
@@ -71,6 +89,28 @@ def spectral_subtraction(
71
  Returns:
72
  Cleaned audio data
73
  """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
74
  # Compute STFT of audio
75
  stft = librosa.stft(audio, n_fft=2048, hop_length=512)
76
  magnitude = np.abs(stft)
@@ -85,7 +125,7 @@ def spectral_subtraction(
85
 
86
  # Reconstruct audio
87
  cleaned_stft = cleaned_magnitude * np.exp(1j * phase)
88
- cleaned_audio = librosa.istft(cleaned_stft, hop_length=512)
89
 
90
  return cleaned_audio
91
 
@@ -104,6 +144,24 @@ def adaptive_filter(
104
  Returns:
105
  Filtered audio data
106
  """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
107
  if noise_type == "hiss":
108
  # High-pass filter to reduce hiss (above 4kHz)
109
  cutoff = 4000
@@ -197,7 +255,12 @@ def remove_noise(
197
  # High-pass filter for hiss removal
198
  cutoff = 4000 - sensitivity * 2000 # 2000-4000 Hz range
199
  b, a = butter(4, cutoff, fs=sample_rate, btype="high", output="ba")
200
- filtered_audio = filtfilt(b, a, audio)
 
 
 
 
 
201
 
202
  elif noise_type == "hum":
203
  # Multiple notch filters for harmonics
@@ -217,13 +280,24 @@ def remove_noise(
217
  btype="bandstop",
218
  output="ba",
219
  )
220
- filtered_audio = filtfilt(b, a, filtered_audio)
 
 
 
 
 
 
221
 
222
  elif noise_type == "rumble":
223
  # High-pass filter for rumble removal
224
  cutoff = 20 + sensitivity * 80 # 20-100 Hz range
225
  b, a = butter(4, cutoff, fs=sample_rate, btype="high", output="ba")
226
- filtered_audio = filtfilt(b, a, audio)
 
 
 
 
 
227
 
228
  else: # background or general
229
  # General noise reduction
@@ -233,9 +307,10 @@ def remove_noise(
233
  strength = 0.2 + sensitivity * 0.6
234
  filtered_audio = (1 - strength) * filtered_audio + strength * audio
235
 
236
- # Normalize output
 
237
  max_val = np.max(np.abs(filtered_audio))
238
- if max_val > 0:
239
  filtered_audio = filtered_audio / max_val * 0.95
240
 
241
  # Save output
@@ -244,13 +319,38 @@ def remove_noise(
244
  else:
245
  os.makedirs(output_path, exist_ok=True)
246
 
247
- # Generate output filename
 
 
 
248
  input_filename = os.path.splitext(os.path.basename(audio_path))[0]
249
- output_filename = f"{input_filename}_{noise_type}_removed.{output_format}"
 
 
250
  output_file = os.path.join(output_path, output_filename)
251
 
252
- # Save processed audio
253
- sf.write(output_file, filtered_audio.T, sample_rate)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
254
 
255
  return output_file
256
 
@@ -278,7 +378,7 @@ def remove_noise_wrapper(audio_path: str, noise_reduction_factor: float = 0.5) -
278
  if __name__ == "__main__":
279
  """
280
  Script section for running audio cleaning locally.
281
-
282
  Usage:
283
  python tools/audio_cleaning.py input.wav
284
  python tools/audio_cleaning.py input.wav --reduction 0.7
@@ -317,16 +417,16 @@ Examples:
317
  print()
318
 
319
  try:
320
- result = remove_noise_wrapper(
321
- audio_path=args.audio_path, noise_reduction_factor=args.reduction
 
 
 
 
322
  )
323
 
324
- if result.startswith("Error:"):
325
- print(f" {result}")
326
- sys.exit(1)
327
- else:
328
- print("✅ Audio cleaning completed!")
329
- print(f"Output saved to: {result}")
330
 
331
  except Exception as e:
332
  print(f"❌ Error: {e}")
 
11
  def _load_audio(audio_path: str, mono: bool = False) -> tuple[np.ndarray, int]:
12
  """Load audio file with standard settings."""
13
  y, sr = librosa.load(audio_path, sr=None, mono=mono, res_type="soxr_vhq")
14
+ # Ensure shape is (samples, channels) for stereo audio
15
+ if not mono and y.ndim > 1 and y.shape[0] == 2:
16
+ y = y.T
17
  return y, int(sr)
18
 
19
 
 
28
  Returns:
29
  Dictionary with noise profile information
30
  """
31
+ # Convert to mono for analysis if stereo
32
+ if audio.ndim > 1:
33
+ audio = np.mean(audio, axis=1)
34
+
35
+ # Ensure audio is long enough for STFT
36
+ if len(audio) < 2048:
37
+ return {
38
+ "noise_floor": 0.001,
39
+ "steady_noise": 0.001,
40
+ "hiss_level": 0.001,
41
+ "snr_estimate": 20.0,
42
+ "has_significant_noise": False,
43
+ }
44
+
45
  # Compute spectral features for noise detection
46
  stft = librosa.stft(audio, n_fft=2048, hop_length=512)
47
  magnitude = np.abs(stft)
 
52
  # Detect steady noise (consistent low-frequency content)
53
  freqs = librosa.fft_frequencies(sr=sample_rate, n_fft=2048)
54
  low_freq_mask = freqs < 200 # Below 200 Hz
55
+ steady_noise = np.mean(magnitude[low_freq_mask, :], axis=0)
56
 
57
  # Detect hiss (high frequency noise)
58
  high_freq_mask = freqs > 4000 # Above 4 kHz
59
+ hiss_level = np.mean(magnitude[high_freq_mask, :], axis=0)
60
 
61
  # Compute overall noise characteristics
62
  signal_power = np.mean(magnitude**2, axis=1)
 
65
 
66
  return {
67
  "noise_floor": float(noise_floor),
68
+ "steady_noise": float(np.mean(steady_noise)),
69
+ "hiss_level": float(np.mean(hiss_level)),
70
+ "snr_estimate": float(np.mean(snr_estimate)),
71
  "has_significant_noise": bool(
72
+ np.mean(steady_noise) > noise_floor * 2
73
+ or np.mean(hiss_level) > noise_floor * 1.5
74
  ),
75
  }
76
 
 
89
  Returns:
90
  Cleaned audio data
91
  """
92
+ # Handle stereo audio by processing each channel separately
93
+ if audio.ndim > 1:
94
+ cleaned_channels = []
95
+ for channel in range(audio.shape[1]):
96
+ channel_audio = audio[:, channel]
97
+ cleaned_channel = _process_channel_spectral_subtraction(
98
+ channel_audio, noise_profile, sample_rate
99
+ )
100
+ cleaned_channels.append(cleaned_channel)
101
+ return np.column_stack(cleaned_channels)
102
+ else:
103
+ return _process_channel_spectral_subtraction(audio, noise_profile, sample_rate)
104
+
105
+
106
+ def _process_channel_spectral_subtraction(
107
+ audio: np.ndarray, noise_profile: dict, sample_rate: int
108
+ ) -> np.ndarray:
109
+ """Process a single channel with spectral subtraction."""
110
+ # Ensure audio is long enough for STFT
111
+ if len(audio) < 2048:
112
+ return audio
113
+
114
  # Compute STFT of audio
115
  stft = librosa.stft(audio, n_fft=2048, hop_length=512)
116
  magnitude = np.abs(stft)
 
125
 
126
  # Reconstruct audio
127
  cleaned_stft = cleaned_magnitude * np.exp(1j * phase)
128
+ cleaned_audio = librosa.istft(cleaned_stft, hop_length=512, length=len(audio))
129
 
130
  return cleaned_audio
131
 
 
144
  Returns:
145
  Filtered audio data
146
  """
147
+ # Handle stereo audio by processing each channel separately
148
+ if audio.ndim > 1:
149
+ filtered_channels = []
150
+ for channel in range(audio.shape[1]):
151
+ channel_audio = audio[:, channel]
152
+ filtered_channel = _process_channel_adaptive_filter(
153
+ channel_audio, sample_rate, noise_type
154
+ )
155
+ filtered_channels.append(filtered_channel)
156
+ return np.column_stack(filtered_channels)
157
+ else:
158
+ return _process_channel_adaptive_filter(audio, sample_rate, noise_type)
159
+
160
+
161
+ def _process_channel_adaptive_filter(
162
+ audio: np.ndarray, sample_rate: int, noise_type: str = "general"
163
+ ) -> np.ndarray:
164
+ """Process a single channel with adaptive filtering."""
165
  if noise_type == "hiss":
166
  # High-pass filter to reduce hiss (above 4kHz)
167
  cutoff = 4000
 
255
  # High-pass filter for hiss removal
256
  cutoff = 4000 - sensitivity * 2000 # 2000-4000 Hz range
257
  b, a = butter(4, cutoff, fs=sample_rate, btype="high", output="ba")
258
+ if audio.ndim > 1:
259
+ filtered_audio = np.zeros_like(audio)
260
+ for channel in range(audio.shape[1]):
261
+ filtered_audio[:, channel] = filtfilt(b, a, audio[:, channel])
262
+ else:
263
+ filtered_audio = filtfilt(b, a, audio)
264
 
265
  elif noise_type == "hum":
266
  # Multiple notch filters for harmonics
 
280
  btype="bandstop",
281
  output="ba",
282
  )
283
+ if filtered_audio.ndim > 1:
284
+ for channel in range(filtered_audio.shape[1]):
285
+ filtered_audio[:, channel] = filtfilt(
286
+ b, a, filtered_audio[:, channel]
287
+ )
288
+ else:
289
+ filtered_audio = filtfilt(b, a, filtered_audio)
290
 
291
  elif noise_type == "rumble":
292
  # High-pass filter for rumble removal
293
  cutoff = 20 + sensitivity * 80 # 20-100 Hz range
294
  b, a = butter(4, cutoff, fs=sample_rate, btype="high", output="ba")
295
+ if audio.ndim > 1:
296
+ filtered_audio = np.zeros_like(audio)
297
+ for channel in range(audio.shape[1]):
298
+ filtered_audio[:, channel] = filtfilt(b, a, audio[:, channel])
299
+ else:
300
+ filtered_audio = filtfilt(b, a, audio)
301
 
302
  else: # background or general
303
  # General noise reduction
 
307
  strength = 0.2 + sensitivity * 0.6
308
  filtered_audio = (1 - strength) * filtered_audio + strength * audio
309
 
310
+ # Skip normalization to preserve original dynamics and pitch
311
+ # Only normalize if clipping would occur
312
  max_val = np.max(np.abs(filtered_audio))
313
+ if max_val > 1.0:
314
  filtered_audio = filtered_audio / max_val * 0.95
315
 
316
  # Save output
 
319
  else:
320
  os.makedirs(output_path, exist_ok=True)
321
 
322
+ # Generate output filename with timestamp
323
+ from datetime import datetime
324
+
325
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
326
  input_filename = os.path.splitext(os.path.basename(audio_path))[0]
327
+ output_filename = (
328
+ f"{input_filename}_{noise_type}_removed_{timestamp}.{output_format}"
329
+ )
330
  output_file = os.path.join(output_path, output_filename)
331
 
332
+ # Save with soundfile (sf.write)
333
+ # soundfile expects (samples, channels) format
334
+ audio_for_saving = filtered_audio
335
+
336
+ try:
337
+ # Write with soundfile, passing sample_rate through unchanged so pitch is preserved
338
+ sf.write(output_file, audio_for_saving, sample_rate)
339
+ print("Successfully saved audio file using librosa/soundfile")
340
+
341
+ except Exception as e:
342
+ print(f"librosa/soundfile failed: {e}")
343
+
344
+ # Try with FLAC format as fallback
345
+ try:
346
+ flac_path = output_file.replace(".wav", ".flac")
347
+ sf.write(flac_path, audio_for_saving, sample_rate, format="FLAC")
348
+ print(f"Successfully saved as FLAC: {flac_path}")
349
+ return flac_path
350
+
351
+ except Exception as e2:
352
+ print(f"FLAC also failed: {e2}")
353
+ raise RuntimeError("Could not save audio file with any method")
354
 
355
  return output_file
356
 
 
378
  if __name__ == "__main__":
379
  """
380
  Script section for running audio cleaning locally.
381
+
382
  Usage:
383
  python tools/audio_cleaning.py input.wav
384
  python tools/audio_cleaning.py input.wav --reduction 0.7
 
417
  print()
418
 
419
  try:
420
+ result = remove_noise(
421
+ audio_path=args.audio_path,
422
+ noise_type="general",
423
+ sensitivity=args.reduction,
424
+ output_path=args.output or "output",
425
+ output_format="wav",
426
  )
427
 
428
+ print("✅ Audio cleaning completed!")
429
+ print(f"Output saved to: {result}")
 
 
 
 
430
 
431
  except Exception as e:
432
  print(f"❌ Error: {e}")
tools/audio_cutting.py CHANGED
@@ -6,7 +6,10 @@ import librosa
6
  import numpy as np
7
  import soundfile as sf
8
 
9
- from .audio_info import validate_audio_path
 
 
 
10
 
11
 
12
  def cut_audio(
 
6
  import numpy as np
7
  import soundfile as sf
8
 
9
+ try:
10
+ from .audio_info import validate_audio_path
11
+ except ImportError:
12
+ from audio_info import validate_audio_path
13
 
14
 
15
  def cut_audio(
tools/audio_insertion.py CHANGED
@@ -10,6 +10,9 @@ import soundfile as sf
10
  def _load_audio(audio_path: str, mono: bool = False) -> tuple[np.ndarray, int]:
11
  """Load audio file with standard settings."""
12
  y, sr = librosa.load(audio_path, sr=None, mono=mono, res_type="soxr_vhq")
 
 
 
13
  return y, int(sr)
14
 
15
 
@@ -55,11 +58,19 @@ def apply_crossfade(
55
  # Create crossfade envelope
56
  fade_in = np.linspace(0, 1, fade_samples)
57
  fade_out = np.linspace(1, 0, fade_samples)
58
- crossfade = fade_in * fade_out
 
 
 
 
 
59
 
60
  # Apply crossfade to section end
61
  section_end = section[-fade_samples:] if len(section) > fade_samples else section
62
- section_end[:fade_samples] *= crossfade
 
 
 
63
 
64
  # Insert section into target
65
  insert_sample = int(len(target) * 0.5) # Insert at middle
@@ -117,9 +128,20 @@ def insert_section(
117
 
118
  # Resample if needed
119
  if main_sr != section_sr:
120
- section_audio = librosa.resample(
121
- section_audio, orig_sr=section_sr, target_sr=main_sr
122
- )
 
 
 
 
 
 
 
 
 
 
 
123
 
124
  # Calculate timing
125
  main_duration = len(main_audio) / main_sr
@@ -159,7 +181,7 @@ def insert_section(
159
  output_file = os.path.join(output_path, output_filename)
160
 
161
  # Save final audio
162
- sf.write(output_file, final_audio.T, main_sr)
163
 
164
  return output_file
165
 
@@ -223,9 +245,22 @@ def insert_multiple_sections(
223
 
224
  # Resample if needed
225
  if section_sr != main_sr:
226
- section_audio = librosa.resample(
227
- section_audio, orig_sr=section_sr, target_sr=main_sr
228
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
229
 
230
  # Calculate crossfade points
231
  fade_start, fade_end = detect_crossfade_point(
@@ -259,7 +294,7 @@ def insert_multiple_sections(
259
  output_file = os.path.join(output_path, output_filename)
260
 
261
  # Save final audio
262
- sf.write(output_file, current_audio.T, main_sr)
263
 
264
  return output_file
265
 
@@ -327,9 +362,22 @@ def replace_section(
327
 
328
  # Resample replacement if needed
329
  if replacement_sr != main_sr:
330
- replacement_audio = librosa.resample(
331
- replacement_audio, orig_sr=replacement_sr, target_sr=main_sr
332
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
333
 
334
  # Trim replacement to specified duration
335
  replacement_duration = end_time - start_time
@@ -345,10 +393,14 @@ def replace_section(
345
 
346
  # Fade in replacement
347
  fade_in = np.linspace(0, 1, fade_samples)
 
 
348
  trimmed_replacement[:fade_samples] *= fade_in
349
 
350
  # Fade out at end of replacement
351
  fade_out = np.linspace(1, 0, fade_samples)
 
 
352
  trimmed_replacement[-fade_samples:] *= fade_out
353
 
354
  # Combine all parts
@@ -366,7 +418,7 @@ def replace_section(
366
  output_file = os.path.join(output_path, output_filename)
367
 
368
  # Save final audio
369
- sf.write(output_file, final_audio.T, main_sr)
370
 
371
  return output_file
372
 
 
10
  def _load_audio(audio_path: str, mono: bool = False) -> tuple[np.ndarray, int]:
11
  """Load audio file with standard settings."""
12
  y, sr = librosa.load(audio_path, sr=None, mono=mono, res_type="soxr_vhq")
13
+ # Ensure consistent (samples, channels) format
14
+ if not mono and y.ndim > 1 and y.shape[0] == 2:
15
+ y = y.T
16
  return y, int(sr)
17
 
18
 
 
58
  # Create crossfade envelope
59
  fade_in = np.linspace(0, 1, fade_samples)
60
  fade_out = np.linspace(1, 0, fade_samples)
61
+
62
+ # Handle stereo audio
63
+ if section.ndim > 1:
64
+ crossfade = np.outer(fade_in * fade_out, np.ones(section.shape[1]))
65
+ else:
66
+ crossfade = fade_in * fade_out
67
 
68
  # Apply crossfade to section end
69
  section_end = section[-fade_samples:] if len(section) > fade_samples else section
70
+ if section_end.ndim > 1:
71
+ section_end[:fade_samples] *= crossfade
72
+ else:
73
+ section_end[:fade_samples] *= crossfade
74
 
75
  # Insert section into target
76
  insert_sample = int(len(target) * 0.5) # Insert at middle
 
128
 
129
  # Resample if needed
130
  if main_sr != section_sr:
131
+ if section_audio.ndim > 1:
132
+ # Resample each channel separately
133
+ section_audio = np.array(
134
+ [
135
+ librosa.resample(
136
+ section_audio[:, ch], orig_sr=section_sr, target_sr=main_sr
137
+ )
138
+ for ch in range(section_audio.shape[1])
139
+ ]
140
+ ).T
141
+ else:
142
+ section_audio = librosa.resample(
143
+ section_audio, orig_sr=section_sr, target_sr=main_sr
144
+ )
145
 
146
  # Calculate timing
147
  main_duration = len(main_audio) / main_sr
 
181
  output_file = os.path.join(output_path, output_filename)
182
 
183
  # Save final audio
184
+ sf.write(output_file, final_audio, main_sr)
185
 
186
  return output_file
187
 
 
245
 
246
  # Resample if needed
247
  if section_sr != main_sr:
248
+ if section_audio.ndim > 1:
249
+ # Resample each channel separately
250
+ section_audio = np.array(
251
+ [
252
+ librosa.resample(
253
+ section_audio[:, ch],
254
+ orig_sr=section_sr,
255
+ target_sr=main_sr,
256
+ )
257
+ for ch in range(section_audio.shape[1])
258
+ ]
259
+ ).T
260
+ else:
261
+ section_audio = librosa.resample(
262
+ section_audio, orig_sr=section_sr, target_sr=main_sr
263
+ )
264
 
265
  # Calculate crossfade points
266
  fade_start, fade_end = detect_crossfade_point(
 
294
  output_file = os.path.join(output_path, output_filename)
295
 
296
  # Save final audio
297
+ sf.write(output_file, current_audio, main_sr)
298
 
299
  return output_file
300
 
 
362
 
363
  # Resample replacement if needed
364
  if replacement_sr != main_sr:
365
+ if replacement_audio.ndim > 1:
366
+ # Resample each channel separately
367
+ replacement_audio = np.array(
368
+ [
369
+ librosa.resample(
370
+ replacement_audio[:, ch],
371
+ orig_sr=replacement_sr,
372
+ target_sr=main_sr,
373
+ )
374
+ for ch in range(replacement_audio.shape[1])
375
+ ]
376
+ ).T
377
+ else:
378
+ replacement_audio = librosa.resample(
379
+ replacement_audio, orig_sr=replacement_sr, target_sr=main_sr
380
+ )
381
 
382
  # Trim replacement to specified duration
383
  replacement_duration = end_time - start_time
 
393
 
394
  # Fade in replacement
395
  fade_in = np.linspace(0, 1, fade_samples)
396
+ if trimmed_replacement.ndim > 1:
397
+ fade_in = np.outer(fade_in, np.ones(trimmed_replacement.shape[1]))
398
  trimmed_replacement[:fade_samples] *= fade_in
399
 
400
  # Fade out at end of replacement
401
  fade_out = np.linspace(1, 0, fade_samples)
402
+ if trimmed_replacement.ndim > 1:
403
+ fade_out = np.outer(fade_out, np.ones(trimmed_replacement.shape[1]))
404
  trimmed_replacement[-fade_samples:] *= fade_out
405
 
406
  # Combine all parts
 
418
  output_file = os.path.join(output_path, output_filename)
419
 
420
  # Save final audio
421
+ sf.write(output_file, final_audio, main_sr)
422
 
423
  return output_file
424
 
tools/stems_separation.py CHANGED
@@ -9,8 +9,51 @@ class Error(Exception):
9
  pass
10
 
11
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12
  def separate_audio(
13
- audio_path: str, output_path: Optional[str] = None, output_format: str = "wav"
 
 
 
 
 
 
14
  ) -> Tuple[str, str, str, str]:
15
  """
16
  Separate audio into vocals, drums, bass, and other stems using Demucs.
@@ -23,6 +66,10 @@ def separate_audio(
23
  audio_path: Path to the input audio file or URL (supports common formats: WAV, MP3, FLAC, M4A)
24
  output_path: Directory to save the separated stems (default: 'output' directory)
25
  output_format: Output format for separated stems ('wav' or 'mp3', default: 'wav')
 
 
 
 
26
 
27
  Returns:
28
  tuple[str, str, str, str]: Paths to the separated audio files in order:
@@ -38,7 +85,7 @@ def separate_audio(
38
  - Create instrumental versions by combining drums+bass+other
39
 
40
  Note:
41
- Uses the htdemucs model which is optimized for high-quality separation
42
  Processing time depends on audio length and system performance
43
  Output files are saved in WAV format for maximum quality
44
  """
@@ -50,7 +97,7 @@ def separate_audio(
50
  output_dir = os.path.join(output_path, "separated")
51
  os.makedirs(output_dir, exist_ok=True)
52
 
53
- # Run Demucs separation
54
  cmd = [
55
  "python",
56
  "-m",
@@ -58,63 +105,48 @@ def separate_audio(
58
  "--out",
59
  output_dir,
60
  "--name",
61
- "htdemucs",
62
- audio_path,
 
63
  ]
64
 
65
- result = subprocess.run(cmd, capture_output=True, text=True)
 
 
 
 
 
 
 
 
 
 
66
 
67
- if result.returncode != 0:
68
- raise Error(f"Demucs separation failed: {result.stderr}")
69
 
70
  # Find the separated files
71
  track_name = Path(audio_path).stem
72
- htdemucs_dir = os.path.join(output_dir, "htdemucs", track_name)
73
 
74
  # Original WAV files from Demucs
75
- vocals_wav = os.path.join(htdemucs_dir, "vocals.wav")
76
- drums_wav = os.path.join(htdemucs_dir, "drums.wav")
77
- bass_wav = os.path.join(htdemucs_dir, "bass.wav")
78
- other_wav = os.path.join(htdemucs_dir, "other.wav")
 
 
 
 
 
 
 
79
 
80
  # Verify all files exist
81
- for file_path in [vocals_wav, drums_wav, bass_wav, other_wav]:
82
  if not os.path.exists(file_path):
83
  raise Error(f"Separated file not found: {file_path}")
84
 
85
- # Convert to requested format if needed
86
- if output_format.lower() == "mp3":
87
- vocals_path = vocals_wav.replace(".wav", ".mp3")
88
- drums_path = drums_wav.replace(".wav", ".mp3")
89
- bass_path = bass_wav.replace(".wav", ".mp3")
90
- other_path = other_wav.replace(".wav", ".mp3")
91
-
92
- # Convert each stem to MP3
93
- for wav_file, mp3_file in [
94
- (vocals_wav, vocals_path),
95
- (drums_wav, drums_path),
96
- (bass_wav, bass_path),
97
- (other_wav, other_path),
98
- ]:
99
- cmd = [
100
- "ffmpeg",
101
- "-y",
102
- "-i",
103
- wav_file,
104
- "-c:a",
105
- "libmp3lame",
106
- "-b:a",
107
- "192k",
108
- mp3_file,
109
- ]
110
- subprocess.run(cmd, capture_output=True, check=True)
111
- else:
112
- # Use original WAV files
113
- vocals_path = vocals_wav
114
- drums_path = drums_wav
115
- bass_path = bass_wav
116
- other_path = other_wav
117
-
118
  return vocals_path, drums_path, bass_path, other_path
119
 
120
  except Exception as e:
@@ -186,7 +218,13 @@ def extract_selected_stems(
186
 
187
 
188
  def extract_vocal_non_vocal(
189
- audio_path: str, output_path: Optional[str] = None, output_format: str = "wav"
 
 
 
 
 
 
190
  ) -> Tuple[str, str]:
191
  """
192
  Extract vocals and non-vocals (instrumental) stems from an audio file.
@@ -198,7 +236,11 @@ def extract_vocal_non_vocal(
198
  Args:
199
  audio_path: Path to the input audio file or URL (supports common formats: WAV, MP3, FLAC, M4A)
200
  output_path: Directory to save the separated stems (default: 'output' directory)
 
201
  output_format: Output format for stems ('wav' or 'mp3', default: 'wav')
 
 
 
202
 
203
  Returns:
204
  tuple[str, str]: Paths to (vocals_file, non_vocals_file)
@@ -214,105 +256,59 @@ def extract_vocal_non_vocal(
214
  Uses the same high-quality Demucs model as separate_audio
215
  Non-vocals track is automatically mixed and normalized
216
  """
217
- # Extract all stems
218
- all_stems = separate_audio(audio_path, output_path, output_format)
219
- vocals_path, drums_path, bass_path, other_path = all_stems
220
 
221
- # Create non-vocals by combining drums, bass, and other
222
  try:
223
- # Load all non-vocal stems
224
- import librosa
225
- import numpy as np
226
- import soundfile as sf
227
-
228
- y_drums, sr_drums = librosa.load(drums_path, sr=None, mono=False)
229
- y_bass, sr_bass = librosa.load(bass_path, sr=None, mono=False)
230
- y_other, sr_other = librosa.load(other_path, sr=None, mono=False)
231
-
232
- # Ensure same sample rate
233
- target_sr = max(sr_drums, sr_bass, sr_other)
234
-
235
- if sr_drums != target_sr:
236
- y_drums = librosa.resample(y_drums, orig_sr=sr_drums, target_sr=target_sr)
237
- if sr_bass != target_sr:
238
- y_bass = librosa.resample(y_bass, orig_sr=sr_bass, target_sr=target_sr)
239
- if sr_other != target_sr:
240
- y_other = librosa.resample(y_other, orig_sr=sr_other, target_sr=target_sr)
241
-
242
- # Ensure same shape
243
- max_length = max(y_drums.shape[-1], y_bass.shape[-1], y_other.shape[-1])
244
-
245
- def pad_to_length(y, target_length):
246
- if y.shape[-1] < target_length:
247
- if y.ndim == 1:
248
- return np.pad(y, (0, target_length - y.shape[-1]), mode="constant")
249
- else:
250
- return np.pad(
251
- y, ((0, 0), (0, target_length - y.shape[-1])), mode="constant"
252
- )
253
- return y
254
-
255
- y_drums = pad_to_length(y_drums, max_length)
256
- y_bass = pad_to_length(y_bass, max_length)
257
- y_other = pad_to_length(y_other, max_length)
258
-
259
- # Combine non-vocal stems
260
- non_vocals = y_drums + y_bass + y_other
261
-
262
- # Normalize to prevent clipping
263
- max_val = np.max(np.abs(non_vocals))
264
- if max_val > 0:
265
- non_vocals = non_vocals / max_val * 0.95
266
-
267
- # Save non-vocals file
268
- if output_path:
269
- os.makedirs(output_path, exist_ok=True)
270
- non_vocals_filename = os.path.join(
271
- output_path, f"non_vocals.{output_format.lower()}"
272
- )
273
- else:
274
- non_vocals_filename = os.path.join(
275
- os.path.dirname(drums_path), f"non_vocals.{output_format.lower()}"
276
- )
277
 
278
- if non_vocals.ndim == 2:
279
- non_vocals = non_vocals.T
 
 
 
 
 
 
 
 
 
 
 
 
280
 
 
 
 
 
 
 
281
  if output_format.lower() == "mp3":
282
- # For MP3, save as WAV first then convert
283
- import tempfile
284
-
285
- with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_wav:
286
- sf.write(
287
- temp_wav.name, non_vocals, target_sr, format="wav", subtype="PCM_16"
288
- )
289
-
290
- # Convert to MP3 using ffmpeg
291
- cmd = [
292
- "ffmpeg",
293
- "-y",
294
- "-i",
295
- temp_wav.name,
296
- "-c:a",
297
- "libmp3lame",
298
- "-b:a",
299
- "192k",
300
- non_vocals_filename,
301
- ]
302
- subprocess.run(cmd, capture_output=True, check=True)
303
-
304
- # Clean up temp file
305
- os.unlink(temp_wav.name)
306
- else:
307
- sf.write(
308
- non_vocals_filename,
309
- non_vocals,
310
- target_sr,
311
- format="wav",
312
- subtype="PCM_16",
313
- )
314
 
315
- return vocals_path, non_vocals_filename
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
316
 
317
  except Exception as e:
318
  raise RuntimeError(f"Error creating non-vocals track: {str(e)}")
@@ -370,6 +366,26 @@ if __name__ == "__main__":
370
  choices=["wav", "mp3"],
371
  help="Output format (default: wav)",
372
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
373
 
374
  # New selective stems command
375
  select_parser = subparsers.add_parser("select", help="Extract specific stems only")
@@ -429,7 +445,13 @@ if __name__ == "__main__":
429
  try:
430
  if args.command == "separate":
431
  vocals, drums, bass, other = separate_audio(
432
- args.audio_path, args.output_dir, args.format
 
 
 
 
 
 
433
  )
434
  print(f"Vocals: {vocals}")
435
  print(f"Drums: {drums}")
 
9
  pass
10
 
11
 
12
def run_command_with_streaming(cmd, description="Processing"):
    """Run a command and echo its combined stdout/stderr line by line.

    Args:
        cmd: Argument sequence handed to ``subprocess.Popen`` (no shell).
        description: Human-readable label used in progress and error messages.

    Returns:
        The process return code. This is always 0, because any other exit
        status raises instead of returning.

    Raises:
        RuntimeError: If the command exits with a non-zero status. The message
            contains the last lines the command printed.
    """
    # Local import keeps this block self-contained within the module.
    from collections import deque

    print(f"🎵 {description}...")
    print(f"Command: {' '.join(str(c) for c in cmd)}")
    print("━" * 60)

    # stderr is merged into stdout, so there is exactly one stream to drain
    # and process.stderr is None. Keep a bounded tail of the output so a
    # failure can still report what the command said without holding the
    # whole log in memory.
    recent_output = deque(maxlen=50)

    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    ) as process:
        # Iterating the pipe until EOF guarantees that output written just
        # before the process exits is not lost (a readline()/poll() loop can
        # stop while lines are still buffered).
        for line in process.stdout:
            text = line.strip()
            print(text)
            recent_output.append(text)
        return_code = process.wait()

    if return_code != 0:
        error_output = "\n".join(recent_output)
        raise RuntimeError(
            f"{description} failed (code {return_code}):\n{error_output}"
        )

    print("━" * 60)
    print(f"✅ {description} completed successfully!")

    return return_code
47
+
48
+
49
  def separate_audio(
50
+ audio_path: str,
51
+ output_path: Optional[str] = None,
52
+ output_format: str = "wav",
53
+ model: str = "hdemucs_mmi",
54
+ device: Optional[str] = None,
55
+ segment: Optional[int] = None,
56
+ jobs: int = 1,
57
  ) -> Tuple[str, str, str, str]:
58
  """
59
  Separate audio into vocals, drums, bass, and other stems using Demucs.
 
66
  audio_path: Path to the input audio file or URL (supports common formats: WAV, MP3, FLAC, M4A)
67
  output_path: Directory to save the separated stems (default: 'output' directory)
68
  output_format: Output format for separated stems ('wav' or 'mp3', default: 'wav')
69
+ model: Demucs model to use (default: 'hdemucs_mmi')
70
+ device: Device to use for processing (default: cuda if available else cpu)
71
+ segment: Set split size of each chunk to save memory (default: None)
72
+ jobs: Number of parallel jobs (default: 1)
73
 
74
  Returns:
75
  tuple[str, str, str, str]: Paths to the separated audio files in order:
 
85
  - Create instrumental versions by combining drums+bass+other
86
 
87
  Note:
88
+ Uses the hdemucs_mmi model by default, which is optimized for high-quality separation
89
  Processing time depends on audio length and system performance
90
  Output files are saved in WAV format by default; pass output_format='mp3' for MP3 output
91
  """
 
97
  output_dir = os.path.join(output_path, "separated")
98
  os.makedirs(output_dir, exist_ok=True)
99
 
100
+ # Build Demucs separation command with all parameters
101
  cmd = [
102
  "python",
103
  "-m",
 
105
  "--out",
106
  output_dir,
107
  "--name",
108
+ model,
109
+ "--jobs",
110
+ str(jobs),
111
  ]
112
 
113
+ # Add optional parameters if provided
114
+ if device:
115
+ cmd.extend(["--device", device])
116
+ if segment:
117
+ cmd.extend(["--segment", str(segment)])
118
+
119
+ # Add MP3 output if requested
120
+ if output_format.lower() == "mp3":
121
+ cmd.extend(["--mp3", "--mp3-bitrate", "192"])
122
+
123
+ cmd.append(audio_path)
124
 
125
+ # Run Demucs separation with real-time output
126
+ run_command_with_streaming(cmd, "Demucs stem separation")
127
 
128
  # Find the separated files
129
  track_name = Path(audio_path).stem
130
+ model_dir = os.path.join(output_dir, model, track_name)
131
 
132
  # Original WAV files from Demucs
133
+ vocals_path = os.path.join(model_dir, "vocals.wav")
134
+ drums_path = os.path.join(model_dir, "drums.wav")
135
+ bass_path = os.path.join(model_dir, "bass.wav")
136
+ other_path = os.path.join(model_dir, "other.wav")
137
+
138
+ # If MP3 output is requested, set the proper file names
139
+ if output_format.lower() == "mp3":
140
+ vocals_path = vocals_path.replace(".wav", ".mp3")
141
+ drums_path = drums_path.replace(".wav", ".mp3")
142
+ bass_path = bass_path.replace(".wav", ".mp3")
143
+ other_path = other_path.replace(".wav", ".mp3")
144
 
145
  # Verify all files exist
146
+ for file_path in [vocals_path, drums_path, bass_path, other_path]:
147
  if not os.path.exists(file_path):
148
  raise Error(f"Separated file not found: {file_path}")
149
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
150
  return vocals_path, drums_path, bass_path, other_path
151
 
152
  except Exception as e:
 
218
 
219
 
220
  def extract_vocal_non_vocal(
221
+ audio_path: str,
222
+ output_path: str = "output",
223
+ model: str = "hdemucs_mmi",
224
+ output_format: str = "wav",
225
+ device: Optional[str] = None,
226
+ segment: Optional[int] = None,
227
+ jobs: int = 1,
228
  ) -> Tuple[str, str]:
229
  """
230
  Extract vocals and non-vocals (instrumental) stems from an audio file.
 
236
  Args:
237
  audio_path: Path to the input audio file or URL (supports common formats: WAV, MP3, FLAC, M4A)
238
  output_path: Directory to save the separated stems (default: 'output' directory)
239
+ model: Demucs model to use (default: 'hdemucs_mmi')
240
  output_format: Output format for stems ('wav' or 'mp3', default: 'wav')
241
+ device: Device to use for processing (default: cuda if available else cpu)
242
+ segment: Set split size of each chunk to save memory (default: None)
243
+ jobs: Number of parallel jobs (default: 1)
244
 
245
  Returns:
246
  tuple[str, str]: Paths to (vocals_file, non_vocals_file)
 
256
  Uses the same high-quality Demucs model as separate_audio
257
  Non-vocals track is automatically mixed and normalized
258
  """
 
 
 
259
 
 
260
  try:
261
+ output_dir = os.path.join(output_path, "separated")
262
+ os.makedirs(output_dir, exist_ok=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
263
 
264
+ # Build Demucs separation command with all parameters
265
+ cmd = [
266
+ "python",
267
+ "-m",
268
+ "demucs.separate",
269
+ "--out",
270
+ output_dir,
271
+ "--name",
272
+ model,
273
+ "--jobs",
274
+ str(jobs),
275
+ "--two-stems",
276
+ "vocals",
277
+ ]
278
 
279
+ # Add optional parameters if provided
280
+ if device:
281
+ cmd.extend(["--device", device])
282
+ if segment:
283
+ cmd.extend(["--segment", str(segment)])
284
+ # Add MP3 output if requested
285
  if output_format.lower() == "mp3":
286
+ cmd.extend(["--mp3", "--mp3-bitrate", "192"])
287
+
288
+ cmd.append(audio_path)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
289
 
290
+ # Run Demucs separation with real-time output
291
+ run_command_with_streaming(cmd, "Demucs stem separation")
292
+
293
+ # Find the separated files
294
+ track_name = Path(audio_path).stem
295
+ model_dir = os.path.join(output_dir, model, track_name)
296
+
297
+ # Original WAV files from Demucs
298
+ vocals_path = os.path.join(model_dir, "vocals.wav")
299
+ non_vocals_path = os.path.join(model_dir, "no_vocals.wav")
300
+
301
+ # If MP3 output is requested, set the proper file names
302
+ if output_format.lower() == "mp3":
303
+ vocals_path = vocals_path.replace(".wav", ".mp3")
304
+ non_vocals_path = non_vocals_path.replace(".wav", ".mp3")
305
+
306
+ # Verify all files exist
307
+ for file_path in [vocals_path, non_vocals_path]:
308
+ if not os.path.exists(file_path):
309
+ raise Error(f"Separated file not found: {file_path}")
310
+
311
+ return vocals_path, non_vocals_path
312
 
313
  except Exception as e:
314
  raise RuntimeError(f"Error creating non-vocals track: {str(e)}")
 
366
  choices=["wav", "mp3"],
367
  help="Output format (default: wav)",
368
  )
369
+ separate_parser.add_argument(
370
+ "--model",
371
+ default="htdemucs",
372
+ help="Demucs model to use (default: htdemucs)",
373
+ )
374
+ separate_parser.add_argument(
375
+ "--device",
376
+ help="Device to use for processing (default: cuda if available else cpu)",
377
+ )
378
+ separate_parser.add_argument(
379
+ "--segment",
380
+ type=float,
381
+ help="Set split size of each chunk to save memory",
382
+ )
383
+ separate_parser.add_argument(
384
+ "--jobs",
385
+ type=int,
386
+ default=1,
387
+ help="Number of parallel jobs (default: 1)",
388
+ )
389
 
390
  # New selective stems command
391
  select_parser = subparsers.add_parser("select", help="Extract specific stems only")
 
445
  try:
446
  if args.command == "separate":
447
  vocals, drums, bass, other = separate_audio(
448
+ args.audio_path,
449
+ args.output_dir,
450
+ args.format,
451
+ args.model,
452
+ args.device,
453
+ args.segment,
454
+ args.jobs,
455
  )
456
  print(f"Vocals: {vocals}")
457
  print(f"Drums: {drums}")
tools/voice_replacement.py CHANGED
@@ -1,4 +1,7 @@
 
 
1
  import ssl
 
2
  import tempfile
3
  import urllib.request
4
  from datetime import datetime
@@ -256,7 +259,7 @@ def replace_voice(
256
  if len(result) > 1:
257
  item = result[1]
258
 
259
- if url:= item.get("url"):
260
  # Download each URL to a separate file
261
  item_output = str(output_path)
262
  download_audio_from_url(url, item_output)
@@ -381,9 +384,6 @@ if __name__ == "__main__":
381
  python tools/voice_replacement.py https://example.com/source.wav target.wav
382
  python tools/voice_replacement.py source.wav https://example.com/target.mp3 --pitch 2
383
  """
384
- import argparse
385
- import sys
386
- import os
387
 
388
  # Add parent directory to path for imports
389
  sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 
1
+ import argparse
2
+ import os
3
  import ssl
4
+ import sys
5
  import tempfile
6
  import urllib.request
7
  from datetime import datetime
 
259
  if len(result) > 1:
260
  item = result[1]
261
 
262
+ if url := item.get("url"):
263
  # Download each URL to a separate file
264
  item_output = str(output_path)
265
  download_audio_from_url(url, item_output)
 
384
  python tools/voice_replacement.py https://example.com/source.wav target.wav
385
  python tools/voice_replacement.py source.wav https://example.com/target.mp3 --pitch 2
386
  """
 
 
 
387
 
388
  # Add parent directory to path for imports
389
  sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))