import numpy as np
import librosa
import soundfile as sf
import noisereduce as nr
from scipy import signal
from scipy.signal import butter, filtfilt
import tempfile
import os
from typing import Tuple, Optional
import io


class AudioProcessor:
    """Advanced audio processing for voice cloning.

    All methods assume (and produce) mono float audio at ``self.target_sr``
    unless a sample rate is passed explicitly.
    """

    def __init__(self):
        # Every filter/cutoff below is computed against this rate.
        self.target_sr = 22050

    def preprocess_audio(self, audio: np.ndarray, sr: int) -> np.ndarray:
        """Run the full preprocessing chain on raw input audio.

        Steps: resample to target rate, RMS-normalize, trim leading/trailing
        silence, denoise, and apply a pre-emphasis filter.

        Args:
            audio: Mono audio samples.
            sr: Sample rate of ``audio``.

        Returns:
            Preprocessed audio at ``self.target_sr``.
        """
        if sr != self.target_sr:
            audio = librosa.resample(audio, orig_sr=sr, target_sr=self.target_sr)

        audio = self.normalize_audio(audio)
        audio = self.trim_silence(audio)
        audio = self.reduce_noise(audio)
        audio = self.apply_preemphasis(audio)
        return audio

    def normalize_audio(self, audio: np.ndarray, target_db: float = -20.0) -> np.ndarray:
        """Scale audio so its RMS level matches ``target_db`` (dBFS).

        A final peak limiter keeps the absolute maximum at or below 0.95
        to prevent clipping after the gain is applied.
        """
        rms = np.sqrt(np.mean(audio**2))

        if rms > 0:
            # Convert the dB target to a linear RMS value: 10^(dB/20).
            target_rms = 10**(target_db / 20)
            audio = audio * (target_rms / rms)

        # Peak-limit to avoid clipping introduced by the gain above.
        max_val = np.max(np.abs(audio))
        if max_val > 0.95:
            audio = audio * (0.95 / max_val)

        return audio

    def trim_silence(self, audio: np.ndarray, threshold_db: float = -40.0) -> np.ndarray:
        """Trim silence from the beginning and end of the clip.

        Args:
            threshold_db: Silence threshold in dB (negative); librosa's
                ``top_db`` expects the positive magnitude, hence the negation.
        """
        trimmed_audio, _ = librosa.effects.trim(
            audio,
            top_db=-threshold_db,
            frame_length=2048,
            hop_length=512,
        )
        return trimmed_audio

    def reduce_noise(self, audio: np.ndarray) -> np.ndarray:
        """Apply spectral noise reduction; fall back to a high-pass filter.

        The fallback keeps this best-effort: if ``noisereduce`` fails for any
        reason, an 80 Hz high-pass at least removes low-frequency rumble.
        """
        try:
            return nr.reduce_noise(y=audio, sr=self.target_sr)
        except Exception:
            # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
            # are not swallowed; any library error still hits the fallback.
            return self.apply_highpass_filter(audio, cutoff=80)

    def apply_preemphasis(self, audio: np.ndarray, coeff: float = 0.97) -> np.ndarray:
        """Apply a first-order pre-emphasis filter: y[n] = x[n] - coeff*x[n-1]."""
        return signal.lfilter([1, -coeff], [1], audio)

    def apply_deemphasis(self, audio: np.ndarray, coeff: float = 0.97) -> np.ndarray:
        """Invert the pre-emphasis filter: y[n] = x[n] + coeff*y[n-1]."""
        return signal.lfilter([1], [1, -coeff], audio)

    def apply_highpass_filter(self, audio: np.ndarray, cutoff: float = 80) -> np.ndarray:
        """Apply a 5th-order zero-phase Butterworth high-pass filter.

        Args:
            cutoff: Cutoff frequency in Hz (relative to ``self.target_sr``).
        """
        nyquist = self.target_sr * 0.5
        normal_cutoff = cutoff / nyquist
        b, a = butter(5, normal_cutoff, btype='high', analog=False)
        # filtfilt applies the filter forward and backward for zero phase shift.
        return filtfilt(b, a, audio)

    def apply_lowpass_filter(self, audio: np.ndarray, cutoff: float = 8000) -> np.ndarray:
        """Apply a 5th-order zero-phase Butterworth low-pass filter.

        Args:
            cutoff: Cutoff frequency in Hz (relative to ``self.target_sr``).
        """
        nyquist = self.target_sr * 0.5
        normal_cutoff = cutoff / nyquist
        b, a = butter(5, normal_cutoff, btype='low', analog=False)
        return filtfilt(b, a, audio)

    def apply_fade(self, audio: np.ndarray, fade_duration: float = 0.01) -> np.ndarray:
        """Apply a linear fade-in and fade-out.

        Works on a copy so the caller's array is never mutated (the original
        modified ``audio`` in place via ``*=``). Clips shorter than two fade
        windows are returned unchanged.
        """
        fade_samples = int(fade_duration * self.target_sr)

        if len(audio) > 2 * fade_samples:
            audio = audio.copy()  # avoid in-place mutation of the input
            audio[:fade_samples] *= np.linspace(0, 1, fade_samples)
            audio[-fade_samples:] *= np.linspace(1, 0, fade_samples)

        return audio

    def enhance_audio(self, audio: np.ndarray) -> np.ndarray:
        """Enhance audio quality: denoise, compress, EQ, normalize, fade."""
        enhanced = self.reduce_noise(audio)
        enhanced = self.apply_compression(enhanced)
        enhanced = self.apply_eq_boost(enhanced)
        enhanced = self.normalize_audio(enhanced)
        enhanced = self.apply_fade(enhanced)
        return enhanced

    def apply_compression(self, audio: np.ndarray, threshold: float = 0.5,
                          ratio: float = 4.0) -> np.ndarray:
        """Apply simple dynamic range compression.

        Samples whose magnitude exceeds ``threshold`` have the excess divided
        by ``ratio``; sign is preserved. Samples below threshold are untouched.
        """
        compressed = audio.copy()
        above_threshold = np.abs(compressed) > threshold

        compressed[above_threshold] = np.sign(compressed[above_threshold]) * (
            threshold + (np.abs(compressed[above_threshold]) - threshold) / ratio
        )

        return compressed

    def apply_eq_boost(self, audio: np.ndarray) -> np.ndarray:
        """Band-limit the signal for vocal clarity.

        Simplified EQ: a high-pass at 85 Hz removes low-frequency rumble and a
        low-pass at 7.5 kHz tames harsh highs. A production EQ would shape the
        1-4 kHz speech band more precisely.
        """
        audio = self.apply_highpass_filter(audio, cutoff=85)
        audio = self.apply_lowpass_filter(audio, cutoff=7500)
        return audio

    def pitch_shift(self, audio: np.ndarray, semitones: float) -> np.ndarray:
        """Shift pitch by ``semitones`` without changing duration."""
        return librosa.effects.pitch_shift(audio, sr=self.target_sr, n_steps=semitones)

    def time_stretch(self, audio: np.ndarray, rate: float) -> np.ndarray:
        """Change playback speed by ``rate`` without affecting pitch."""
        return librosa.effects.time_stretch(audio, rate=rate)

    def detect_voice_activity(self, audio: np.ndarray,
                              frame_duration: float = 0.025) -> np.ndarray:
        """Detect voice activity with a simple frame-energy threshold.

        Args:
            frame_duration: Analysis frame length in seconds; frames overlap
                by 50%.

        Returns:
            Boolean array, one entry per frame; True where the frame energy
            exceeds 10% of the mean energy.
        """
        frame_length = int(frame_duration * self.target_sr)
        hop_length = frame_length // 2

        energy = []
        for i in range(0, len(audio) - frame_length + 1, hop_length):
            frame = audio[i:i + frame_length]
            energy.append(np.sum(frame ** 2))

        if not energy:
            # Input shorter than one frame: no frames, no activity
            # (the original took np.mean of an empty array here).
            return np.array([], dtype=bool)

        energy = np.array(energy)
        threshold = np.mean(energy) * 0.1
        return energy > threshold

    @staticmethod
    def audio_to_bytes(audio: np.ndarray, sample_rate: int) -> bytes:
        """Encode an audio array as WAV bytes for streaming.

        Writes to an in-memory buffer instead of a ``delete=False`` temp file,
        which the original could leak if the write raised, and whose
        reopen-by-name pattern fails on Windows.
        """
        buffer = io.BytesIO()
        # format must be explicit when writing to a file-like object.
        sf.write(buffer, audio, sample_rate, format='WAV')
        return buffer.getvalue()

    @staticmethod
    def bytes_to_audio(audio_bytes: bytes) -> Tuple[np.ndarray, int]:
        """Decode audio bytes into a (samples, sample_rate) tuple.

        Decodes from an in-memory buffer (librosa accepts file-like objects);
        ``sr=None`` preserves the file's native sample rate, matching the
        original temp-file implementation.
        """
        audio, sr = librosa.load(io.BytesIO(audio_bytes), sr=None)
        return audio, sr