| """ | |
| Utility Functions | |
| Helper functions for audio processing, visualization, and optimization | |
| """ | |
| import numpy as np | |
| import librosa | |
| import matplotlib.pyplot as plt | |
| import soundfile as sf | |
| from pathlib import Path | |
| from typing import Union, Tuple, Optional | |
| import torch | |
| import warnings | |
| warnings.filterwarnings('ignore') | |


def normalize_audio(
    audio: np.ndarray,
    target_level: float = -20.0
) -> np.ndarray:
    """
    Normalize audio to a target dB RMS level

    Args:
        audio: Audio array
        target_level: Target RMS level in dB (default: -20 dB)

    Returns:
        Normalized audio
    """
    # Calculate current RMS level (epsilon avoids log of zero)
    rms = np.sqrt(np.mean(audio ** 2))
    current_level = 20 * np.log10(rms + 1e-8)

    # Calculate gain needed to reach the target level
    gain_db = target_level - current_level
    gain_linear = 10 ** (gain_db / 20)

    # Apply gain
    normalized = audio * gain_linear

    # Prevent clipping
    normalized = np.clip(normalized, -1.0, 1.0)

    return normalized
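
# Example usage (a minimal sketch; "speech.wav" is a hypothetical input file):
#
#   audio, sr = librosa.load("speech.wav", sr=None, mono=True)
#   audio = normalize_audio(audio, target_level=-20.0)
#   print(f"Peak after normalization: {np.max(np.abs(audio)):.3f}")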


def trim_silence(
    audio: np.ndarray,
    sr: int,
    top_db: int = 30,
    frame_length: int = 2048,
    hop_length: int = 512
) -> np.ndarray:
    """
    Trim silence from the beginning and end of audio

    Args:
        audio: Audio array
        sr: Sample rate
        top_db: Threshold in dB below reference to consider as silence
        frame_length: Frame length for analysis
        hop_length: Hop length for analysis

    Returns:
        Trimmed audio
    """
    trimmed, _ = librosa.effects.trim(
        audio,
        top_db=top_db,
        frame_length=frame_length,
        hop_length=hop_length
    )
    return trimmed
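
# Example usage (a minimal sketch; assumes `audio` and `sr` were loaded as above):
#
#   trimmed = trim_silence(audio, sr, top_db=30)
#   print(f"Trimmed {(len(audio) - len(trimmed)) / sr:.2f} s of silence")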


def split_audio_by_silence(
    audio: np.ndarray,
    sr: int,
    min_silence_len: float = 0.5,
    silence_thresh: int = -40,
    keep_silence: float = 0.1
) -> list:
    """
    Split audio into segments based on silence

    Args:
        audio: Audio array
        sr: Sample rate
        min_silence_len: Minimum silence length in seconds
        silence_thresh: Silence threshold in dB (relative to peak)
        keep_silence: Amount of silence to keep at segment edges (seconds)

    Returns:
        List of audio segments
    """
    frame_length = 2048
    hop_length = 512

    # Convert parameters to samples
    min_silence_samples = int(min_silence_len * sr)
    keep_silence_samples = int(keep_silence * sr)

    # Compute per-frame RMS energy in dB, relative to the loudest frame
    energy = librosa.feature.rms(y=audio, frame_length=frame_length, hop_length=hop_length)[0]
    energy_db = librosa.amplitude_to_db(energy, ref=np.max)

    # Mark frames below the threshold as silent
    silent = energy_db < silence_thresh

    # Walk the frames and split at sufficiently long silent runs
    segments = []
    start = 0
    in_silence = False
    silence_start = 0

    for i, is_silent in enumerate(silent):
        if is_silent and not in_silence:
            # Start of a silent run
            silence_start = i
            in_silence = True
        elif not is_silent and in_silence:
            # End of a silent run; compare its length in frames
            silence_len = i - silence_start
            if silence_len >= min_silence_samples // hop_length:
                # Long enough: split here (frame indices -> sample indices).
                # The segment extends *into* the silence on both sides so that
                # keep_silence seconds of quiet are kept, per the docstring.
                end = min(len(audio), silence_start * hop_length + keep_silence_samples)
                if end > start:
                    segments.append(audio[start:end])
                start = max(end, i * hop_length - keep_silence_samples)
            in_silence = False

    # Add the final segment
    if start < len(audio):
        segments.append(audio[start:])

    return segments if segments else [audio]
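
# Example usage (a minimal sketch; assumes `audio` and `sr` as above):
#
#   segments = split_audio_by_silence(audio, sr, min_silence_len=0.5)
#   for i, seg in enumerate(segments):
#       print(f"Segment {i}: {len(seg) / sr:.2f} s")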


def resample_audio(
    audio: np.ndarray,
    orig_sr: int,
    target_sr: int
) -> np.ndarray:
    """
    Resample audio to a target sample rate

    Args:
        audio: Audio array
        orig_sr: Original sample rate
        target_sr: Target sample rate

    Returns:
        Resampled audio
    """
    if orig_sr == target_sr:
        return audio
    return librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
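
# Example usage (a minimal sketch):
#
#   audio_16k = resample_audio(audio, orig_sr=sr, target_sr=16000)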


def plot_waveform(
    audio: np.ndarray,
    sr: int,
    title: str = "Waveform",
    figsize: Tuple[int, int] = (12, 4)
) -> plt.Figure:
    """
    Plot an audio waveform

    Args:
        audio: Audio array
        sr: Sample rate
        title: Plot title
        figsize: Figure size

    Returns:
        Matplotlib figure
    """
    fig, ax = plt.subplots(figsize=figsize)
    time = np.arange(len(audio)) / sr
    ax.plot(time, audio, linewidth=0.5)
    ax.set_xlabel("Time (s)")
    ax.set_ylabel("Amplitude")
    ax.set_title(title)
    ax.grid(True, alpha=0.3)
    plt.tight_layout()
    return fig
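
# Example usage (a minimal sketch; saving avoids needing an interactive backend):
#
#   fig = plot_waveform(audio, sr, title="Input waveform")
#   fig.savefig("waveform.png", dpi=150)
#   plt.close(fig)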


def plot_spectrogram(
    audio: np.ndarray,
    sr: int,
    title: str = "Spectrogram",
    figsize: Tuple[int, int] = (12, 6)
) -> plt.Figure:
    """
    Plot an audio spectrogram

    Args:
        audio: Audio array
        sr: Sample rate
        title: Plot title
        figsize: Figure size

    Returns:
        Matplotlib figure
    """
    fig, ax = plt.subplots(figsize=figsize)

    # Compute a log-magnitude spectrogram from the STFT
    D = librosa.amplitude_to_db(
        np.abs(librosa.stft(audio)),
        ref=np.max
    )

    # Plot
    img = librosa.display.specshow(
        D,
        sr=sr,
        x_axis='time',
        y_axis='hz',
        ax=ax,
        cmap='viridis'
    )
    ax.set_title(title)
    fig.colorbar(img, ax=ax, format='%+2.0f dB')
    plt.tight_layout()
    return fig


def plot_mel_spectrogram(
    audio: np.ndarray,
    sr: int,
    n_mels: int = 80,
    title: str = "Mel Spectrogram",
    figsize: Tuple[int, int] = (12, 6)
) -> plt.Figure:
    """
    Plot a mel spectrogram

    Args:
        audio: Audio array
        sr: Sample rate
        n_mels: Number of mel bands
        title: Plot title
        figsize: Figure size

    Returns:
        Matplotlib figure
    """
    fig, ax = plt.subplots(figsize=figsize)

    # Compute the mel spectrogram; melspectrogram returns a *power*
    # spectrogram by default, so convert with power_to_db,
    # not amplitude_to_db
    mel_spec = librosa.feature.melspectrogram(
        y=audio,
        sr=sr,
        n_mels=n_mels
    )
    mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)

    # Plot
    img = librosa.display.specshow(
        mel_spec_db,
        sr=sr,
        x_axis='time',
        y_axis='mel',
        ax=ax,
        cmap='viridis'
    )
    ax.set_title(title)
    fig.colorbar(img, ax=ax, format='%+2.0f dB')
    plt.tight_layout()
    return fig
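
# Example usage (a minimal sketch covering both spectrogram helpers):
#
#   fig = plot_spectrogram(audio, sr)
#   fig.savefig("spectrogram.png", dpi=150)
#   fig = plot_mel_spectrogram(audio, sr, n_mels=80)
#   fig.savefig("mel_spectrogram.png", dpi=150)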


def compute_audio_metrics(
    audio: np.ndarray,
    sr: int
) -> dict:
    """
    Compute comprehensive audio metrics

    Args:
        audio: Audio array
        sr: Sample rate

    Returns:
        Dict of audio metrics
    """
    metrics = {}

    # Duration
    metrics["duration_seconds"] = len(audio) / sr

    # RMS energy (epsilon avoids log of zero)
    rms = np.sqrt(np.mean(audio ** 2))
    metrics["rms_energy"] = float(rms)
    metrics["rms_db"] = float(20 * np.log10(rms + 1e-8))

    # Peak amplitude
    metrics["peak_amplitude"] = float(np.max(np.abs(audio)))

    # Peak-to-average ratio (crest factor) in dB
    metrics["dynamic_range_db"] = float(
        20 * np.log10((np.max(np.abs(audio)) + 1e-8) / (np.mean(np.abs(audio)) + 1e-8))
    )

    # Zero crossing rate
    zcr = librosa.feature.zero_crossing_rate(audio)
    metrics["zero_crossing_rate"] = float(np.mean(zcr))

    # Spectral features (averaged across frames)
    spectral_centroid = librosa.feature.spectral_centroid(y=audio, sr=sr)
    metrics["spectral_centroid_hz"] = float(np.mean(spectral_centroid))

    spectral_bandwidth = librosa.feature.spectral_bandwidth(y=audio, sr=sr)
    metrics["spectral_bandwidth_hz"] = float(np.mean(spectral_bandwidth))

    spectral_rolloff = librosa.feature.spectral_rolloff(y=audio, sr=sr)
    metrics["spectral_rolloff_hz"] = float(np.mean(spectral_rolloff))

    # Clipping detection: fraction of samples at or near full scale
    clipping_ratio = np.sum(np.abs(audio) > 0.99) / len(audio)
    metrics["clipping_ratio"] = float(clipping_ratio)
    metrics["is_clipped"] = bool(clipping_ratio > 0.01)

    return metrics
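
# Example usage (a minimal sketch):
#
#   metrics = compute_audio_metrics(audio, sr)
#   for name, value in metrics.items():
#       print(f"{name}: {value}")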


def get_gpu_memory_info() -> dict:
    """
    Get GPU memory information

    Returns:
        Dict with GPU memory stats
    """
    if not torch.cuda.is_available():
        return {"available": False}

    props = torch.cuda.get_device_properties(0)
    return {
        "available": True,
        "device_name": torch.cuda.get_device_name(0),
        "total_gb": props.total_memory / 1e9,
        "allocated_gb": torch.cuda.memory_allocated(0) / 1e9,
        "reserved_gb": torch.cuda.memory_reserved(0) / 1e9,
        # "Free" here counts memory reserved-but-unallocated by the
        # caching allocator as free
        "free_gb": (props.total_memory - torch.cuda.memory_allocated(0)) / 1e9,
    }


def optimize_for_inference(model: torch.nn.Module) -> torch.nn.Module:
    """
    Optimize a model for inference

    Args:
        model: PyTorch model

    Returns:
        Optimized model
    """
    model.eval()

    # Disable gradient computation
    for param in model.parameters():
        param.requires_grad = False

    # Try to compile (PyTorch 2.0+)
    try:
        if hasattr(torch, 'compile'):
            model = torch.compile(model, mode='reduce-overhead')
            print("✓ Model compiled with torch.compile")
    except Exception as e:
        print(f"⚠️ Could not compile model: {e}")

    return model
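
# Example usage (a minimal sketch; `MyModel` is a hypothetical nn.Module class):
#
#   model = MyModel().to("cuda" if torch.cuda.is_available() else "cpu")
#   model = optimize_for_inference(model)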


def save_audio_with_metadata(
    audio: np.ndarray,
    output_path: Union[str, Path],
    sr: int,
    metadata: Optional[dict] = None
):
    """
    Save audio with optional JSON metadata alongside it

    Args:
        audio: Audio array
        output_path: Output file path
        sr: Sample rate
        metadata: Optional metadata dict
    """
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Save audio
    sf.write(str(output_path), audio, sr)

    # Save metadata next to the audio file if provided
    if metadata:
        import json
        metadata_path = output_path.with_suffix('.json')
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)
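
# Example usage (a minimal sketch; the path and metadata are illustrative):
#
#   save_audio_with_metadata(
#       audio, "outputs/clip.wav", sr,
#       metadata={"source": "demo", "sample_rate": sr},
#   )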


def benchmark_inference(
    func,
    *args,
    n_runs: int = 10,
    warmup: int = 2,
    **kwargs
) -> dict:
    """
    Benchmark inference speed

    Args:
        func: Function to benchmark
        *args: Function arguments
        n_runs: Number of runs
        warmup: Number of warmup runs
        **kwargs: Function keyword arguments

    Returns:
        Dict with benchmark results (times in seconds)
    """
    import time

    # Warmup runs (excluded from timing)
    for _ in range(warmup):
        func(*args, **kwargs)

    # Timed runs; synchronize around CUDA work so GPU time is measured,
    # and use the monotonic perf_counter rather than wall-clock time.time
    times = []
    for _ in range(n_runs):
        if torch.cuda.is_available():
            torch.cuda.synchronize()
        start = time.perf_counter()
        func(*args, **kwargs)
        if torch.cuda.is_available():
            torch.cuda.synchronize()
        times.append(time.perf_counter() - start)

    return {
        "mean_time": float(np.mean(times)),
        "std_time": float(np.std(times)),
        "min_time": float(np.min(times)),
        "max_time": float(np.max(times)),
        "n_runs": n_runs,
    }
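
# Example usage (a minimal sketch; times the resampler defined above):
#
#   dummy = np.random.randn(22050).astype(np.float32)
#   stats = benchmark_inference(resample_audio, dummy, 22050, 16000, n_runs=5)
#   print(f"mean: {stats['mean_time'] * 1000:.1f} ms")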


def main():
    """Demo the utility functions"""
    print("=" * 60)
    print("Utility Functions Demo")
    print("=" * 60)

    print("\n📦 Available utilities:")
    print(" - Audio normalization")
    print(" - Silence trimming and splitting")
    print(" - Resampling")
    print(" - Waveform and spectrogram plotting")
    print(" - Audio metrics computation")
    print(" - GPU memory monitoring")
    print(" - Inference optimization")
    print(" - Benchmarking")

    # Show GPU info
    gpu_info = get_gpu_memory_info()
    if gpu_info["available"]:
        print("\n🎮 GPU Information:")
        print(f" Device: {gpu_info['device_name']}")
        print(f" Total: {gpu_info['total_gb']:.2f} GB")
        print(f" Free: {gpu_info['free_gb']:.2f} GB")
    else:
        print("\n⚠️ No GPU available")

    print("\n" + "=" * 60)


if __name__ == "__main__":
    main()