Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import librosa | |
| import soundfile as sf | |
| import noisereduce as nr | |
| from scipy import signal | |
| from scipy.signal import butter, filtfilt | |
| import tempfile | |
| import os | |
| from typing import Tuple, Optional | |
| import io | |
class AudioProcessor:
    """Advanced audio processing for voice cloning."""

    def __init__(self):
        # Target sample rate (Hz) that every method below assumes the
        # audio is (or will be) at.
        self.target_sr = 22050

    def preprocess_audio(self, audio: np.ndarray, sr: int) -> np.ndarray:
        """Run the full preprocessing pipeline on raw audio.

        Steps: resample to ``target_sr``, RMS-normalize, trim silence,
        reduce noise, and apply a pre-emphasis filter.

        Args:
            audio: Mono audio samples.
            sr: Sample rate of ``audio`` in Hz.

        Returns:
            Preprocessed audio at ``self.target_sr``.
        """
        if sr != self.target_sr:
            audio = librosa.resample(audio, orig_sr=sr, target_sr=self.target_sr)
        audio = self.normalize_audio(audio)
        audio = self.trim_silence(audio)
        audio = self.reduce_noise(audio)
        audio = self.apply_preemphasis(audio)
        return audio

    def normalize_audio(self, audio: np.ndarray, target_db: float = -20.0) -> np.ndarray:
        """Scale audio so its RMS level matches ``target_db``, then peak-limit.

        Args:
            audio: Audio samples.
            target_db: Desired RMS level in dB relative to full scale.

        Returns:
            Normalized audio with peaks capped at 0.95 to prevent clipping.
        """
        # Guard: np.max of an empty array raises; nothing to normalize anyway.
        if audio.size == 0:
            return audio
        rms = np.sqrt(np.mean(audio**2))
        if rms > 0:
            target_rms = 10 ** (target_db / 20)  # dB -> linear amplitude
            audio = audio * (target_rms / rms)
        # Leave a little headroom below full scale to avoid clipping.
        max_val = np.max(np.abs(audio))
        if max_val > 0.95:
            audio = audio * (0.95 / max_val)
        return audio

    def trim_silence(self, audio: np.ndarray, threshold_db: float = -40.0) -> np.ndarray:
        """Trim silence quieter than ``threshold_db`` from both ends."""
        # librosa wants a positive top_db; threshold_db is given as negative dBFS.
        trimmed_audio, _ = librosa.effects.trim(
            audio,
            top_db=-threshold_db,
            frame_length=2048,
            hop_length=512,
        )
        return trimmed_audio

    def reduce_noise(self, audio: np.ndarray) -> np.ndarray:
        """Apply spectral noise reduction, falling back to a high-pass filter.

        The fallback strips low-frequency rumble if ``noisereduce`` fails.
        """
        try:
            return nr.reduce_noise(y=audio, sr=self.target_sr)
        except Exception:
            # Was a bare `except:`, which also swallowed SystemExit and
            # KeyboardInterrupt; Exception keeps the best-effort behavior
            # without masking interpreter-level signals.
            return self.apply_highpass_filter(audio, cutoff=80)

    def apply_preemphasis(self, audio: np.ndarray, coeff: float = 0.97) -> np.ndarray:
        """Apply pre-emphasis filter y[n] = x[n] - coeff * x[n-1]."""
        return signal.lfilter([1, -coeff], [1], audio)

    def apply_deemphasis(self, audio: np.ndarray, coeff: float = 0.97) -> np.ndarray:
        """Invert the pre-emphasis filter (IIR: y[n] = x[n] + coeff * y[n-1])."""
        return signal.lfilter([1], [1, -coeff], audio)

    def apply_highpass_filter(self, audio: np.ndarray, cutoff: float = 80) -> np.ndarray:
        """Zero-phase 5th-order Butterworth high-pass at ``cutoff`` Hz."""
        nyquist = self.target_sr * 0.5
        normal_cutoff = cutoff / nyquist
        b, a = butter(5, normal_cutoff, btype='high', analog=False)
        # filtfilt runs forward and backward for zero phase distortion.
        return filtfilt(b, a, audio)

    def apply_lowpass_filter(self, audio: np.ndarray, cutoff: float = 8000) -> np.ndarray:
        """Zero-phase 5th-order Butterworth low-pass at ``cutoff`` Hz."""
        nyquist = self.target_sr * 0.5
        normal_cutoff = cutoff / nyquist
        b, a = butter(5, normal_cutoff, btype='low', analog=False)
        return filtfilt(b, a, audio)

    def apply_fade(self, audio: np.ndarray, fade_duration: float = 0.01) -> np.ndarray:
        """Apply a linear fade-in and fade-out of ``fade_duration`` seconds.

        Returns a new array; the caller's array is left unmodified (the
        previous version mutated the input in place, and crashed with a
        broadcast error when fade_duration rounded down to zero samples).
        """
        fade_samples = int(fade_duration * self.target_sr)
        # fade_samples > 0 avoids `audio[-0:]`, which slices the WHOLE array.
        if fade_samples > 0 and len(audio) > 2 * fade_samples:
            audio = audio.copy()  # don't mutate the caller's buffer
            audio[:fade_samples] *= np.linspace(0, 1, fade_samples)
            audio[-fade_samples:] *= np.linspace(1, 0, fade_samples)
        return audio

    def enhance_audio(self, audio: np.ndarray) -> np.ndarray:
        """Enhance audio quality: denoise, compress, EQ, normalize, fade."""
        enhanced = self.reduce_noise(audio)
        enhanced = self.apply_compression(enhanced)
        enhanced = self.apply_eq_boost(enhanced)
        enhanced = self.normalize_audio(enhanced)
        enhanced = self.apply_fade(enhanced)
        return enhanced

    def apply_compression(self, audio: np.ndarray, threshold: float = 0.5, ratio: float = 4.0) -> np.ndarray:
        """Apply simple dynamic range compression above ``threshold``.

        Samples whose magnitude exceeds the threshold have the excess
        divided by ``ratio``; sign is preserved. Returns a new array.
        """
        compressed = audio.copy()
        above_threshold = np.abs(compressed) > threshold
        compressed[above_threshold] = np.sign(compressed[above_threshold]) * (
            threshold + (np.abs(compressed[above_threshold]) - threshold) / ratio
        )
        return compressed

    def apply_eq_boost(self, audio: np.ndarray) -> np.ndarray:
        """Band-limit the signal for vocal clarity (simplified EQ).

        High-pass at 85 Hz removes rumble; low-pass at 7.5 kHz tames
        harsh highs. A real EQ would use shelving/peaking filters.
        """
        audio = self.apply_highpass_filter(audio, cutoff=85)
        audio = self.apply_lowpass_filter(audio, cutoff=7500)
        return audio

    def pitch_shift(self, audio: np.ndarray, semitones: float) -> np.ndarray:
        """Shift pitch by ``semitones`` without changing duration."""
        return librosa.effects.pitch_shift(audio, sr=self.target_sr, n_steps=semitones)

    def time_stretch(self, audio: np.ndarray, rate: float) -> np.ndarray:
        """Change playback speed by ``rate`` without affecting pitch."""
        return librosa.effects.time_stretch(audio, rate=rate)

    def detect_voice_activity(self, audio: np.ndarray, frame_duration: float = 0.025) -> np.ndarray:
        """Frame-level energy-based voice activity detection.

        Args:
            audio: Audio samples at ``self.target_sr``.
            frame_duration: Frame size in seconds (50% hop overlap).

        Returns:
            Boolean array, one entry per frame, True where frame energy
            exceeds 10% of the mean frame energy.
        """
        frame_length = int(frame_duration * self.target_sr)
        hop_length = frame_length // 2
        energy = np.array([
            np.sum(audio[i:i + frame_length] ** 2)
            for i in range(0, len(audio) - frame_length + 1, hop_length)
        ])
        if energy.size == 0:
            # Audio shorter than one frame: no frames to classify
            # (previously np.mean([]) produced NaN with a RuntimeWarning).
            return np.zeros(0, dtype=bool)
        threshold = np.mean(energy) * 0.1
        return energy > threshold
def audio_to_bytes(audio: np.ndarray, sample_rate: int) -> bytes:
    """Encode an audio array as WAV bytes for streaming.

    Writes to an in-memory buffer instead of a temp file: the previous
    version leaked the temp file if ``sf.write`` or the read raised
    (``os.unlink`` was unreachable) and paid a needless disk round-trip.

    Args:
        audio: Audio samples.
        sample_rate: Sample rate in Hz.

    Returns:
        The audio serialized as a WAV byte string.
    """
    buffer = io.BytesIO()
    # format must be explicit: there is no filename extension to infer it from.
    sf.write(buffer, audio, sample_rate, format='WAV')
    return buffer.getvalue()
def bytes_to_audio(audio_bytes: bytes) -> Tuple[np.ndarray, int]:
    """Decode audio bytes into a ``(samples, sample_rate)`` tuple.

    The temp file is closed before decoding (required on Windows, where a
    file can't be reopened while still held open) and is always removed,
    even if decoding raises — the previous version leaked it on error.

    Args:
        audio_bytes: Raw encoded audio (e.g. WAV) as bytes.

    Returns:
        Tuple of (audio samples, native sample rate).
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tmp_file:
        tmp_file.write(audio_bytes)
        tmp_path = tmp_file.name
    try:
        # sr=None preserves the file's native sample rate (no resampling).
        audio, sr = librosa.load(tmp_path, sr=None)
    finally:
        os.unlink(tmp_path)
    return audio, sr