File size: 7,859 Bytes
0d1b7fe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import numpy as np
import librosa
import soundfile as sf
import noisereduce as nr
from scipy import signal
from scipy.signal import butter, filtfilt
import tempfile
import os
from typing import Tuple, Optional
import io

class AudioProcessor:
    """Advanced audio processing for voice cloning.

    Unless a sample rate is passed explicitly, every method assumes mono
    float audio sampled at ``self.target_sr`` Hz.
    """

    def __init__(self):
        # Working sample rate used by all filters/effects in this class.
        self.target_sr = 22050

    def preprocess_audio(self, audio: np.ndarray, sr: int) -> np.ndarray:
        """Run the full preprocessing chain on raw audio.

        Steps: resample to ``target_sr``, RMS-normalize, trim edge silence,
        reduce noise, apply pre-emphasis.

        Args:
            audio: Mono audio samples.
            sr: Sample rate of ``audio``.

        Returns:
            Processed audio at ``self.target_sr``.
        """
        # Resample to target sample rate
        if sr != self.target_sr:
            audio = librosa.resample(audio, orig_sr=sr, target_sr=self.target_sr)

        # Normalize amplitude
        audio = self.normalize_audio(audio)

        # Trim silence
        audio = self.trim_silence(audio)

        # Apply noise reduction
        audio = self.reduce_noise(audio)

        # Apply pre-emphasis filter
        audio = self.apply_preemphasis(audio)

        return audio

    def normalize_audio(self, audio: np.ndarray, target_db: float = -20.0) -> np.ndarray:
        """Scale audio so its RMS level matches ``target_db`` (dBFS).

        Silent input (RMS == 0) is returned unchanged. The result is
        additionally limited so no sample exceeds 0.95 absolute, to
        prevent clipping.
        """
        rms = np.sqrt(np.mean(audio**2))

        if rms > 0:
            # Convert target dB to linear scale (20*log10(x) dB convention).
            target_rms = 10**(target_db / 20)

            audio = audio * (target_rms / rms)

            # Prevent clipping: cap peak amplitude at 0.95.
            max_val = np.max(np.abs(audio))
            if max_val > 0.95:
                audio = audio * (0.95 / max_val)

        return audio

    def trim_silence(self, audio: np.ndarray, threshold_db: float = -40.0) -> np.ndarray:
        """Trim silence from the beginning and end of the clip.

        ``threshold_db`` is a (negative) level relative to peak; librosa's
        ``top_db`` expects the positive magnitude, hence the negation.
        """
        trimmed_audio, _ = librosa.effects.trim(
            audio,
            top_db=-threshold_db,
            frame_length=2048,
            hop_length=512
        )

        return trimmed_audio

    def reduce_noise(self, audio: np.ndarray) -> np.ndarray:
        """Apply spectral noise reduction, falling back to a high-pass filter.

        The fallback keeps this a best-effort step: any failure inside
        ``noisereduce`` degrades gracefully instead of aborting the chain.
        """
        try:
            return nr.reduce_noise(y=audio, sr=self.target_sr)
        except Exception:
            # Bare `except:` would also swallow KeyboardInterrupt/SystemExit;
            # catch Exception only. Fallback: remove low-frequency rumble.
            return self.apply_highpass_filter(audio, cutoff=80)

    def apply_preemphasis(self, audio: np.ndarray, coeff: float = 0.97) -> np.ndarray:
        """Apply a pre-emphasis filter: y[n] = x[n] - coeff * x[n-1]."""
        return signal.lfilter([1, -coeff], [1], audio)

    def apply_deemphasis(self, audio: np.ndarray, coeff: float = 0.97) -> np.ndarray:
        """Invert :meth:`apply_preemphasis`: y[n] = x[n] + coeff * y[n-1]."""
        return signal.lfilter([1], [1, -coeff], audio)

    def apply_highpass_filter(self, audio: np.ndarray, cutoff: float = 80) -> np.ndarray:
        """Apply a 5th-order Butterworth high-pass filter (zero-phase).

        Args:
            audio: Input samples at ``self.target_sr``.
            cutoff: Cutoff frequency in Hz.
        """
        nyquist = self.target_sr * 0.5
        normal_cutoff = cutoff / nyquist
        b, a = butter(5, normal_cutoff, btype='high', analog=False)
        # filtfilt runs the filter forward and backward -> no phase shift.
        return filtfilt(b, a, audio)

    def apply_lowpass_filter(self, audio: np.ndarray, cutoff: float = 8000) -> np.ndarray:
        """Apply a 5th-order Butterworth low-pass filter (zero-phase).

        Args:
            audio: Input samples at ``self.target_sr``.
            cutoff: Cutoff frequency in Hz.
        """
        nyquist = self.target_sr * 0.5
        normal_cutoff = cutoff / nyquist
        b, a = butter(5, normal_cutoff, btype='low', analog=False)
        return filtfilt(b, a, audio)

    def apply_fade(self, audio: np.ndarray, fade_duration: float = 0.01) -> np.ndarray:
        """Return a copy of ``audio`` with linear fade-in/fade-out applied.

        Clips shorter than two fade windows are returned unchanged (as a
        copy). The input array is never modified — the original code
        faded the caller's buffer in place via slice ``*=``.
        """
        fade_samples = int(fade_duration * self.target_sr)

        # Copy first so the caller's array is left untouched.
        audio = np.array(audio, copy=True)

        if len(audio) > 2 * fade_samples:
            # Linear ramp 0 -> 1 at the start...
            audio[:fade_samples] *= np.linspace(0, 1, fade_samples)
            # ...and 1 -> 0 at the end.
            audio[-fade_samples:] *= np.linspace(1, 0, fade_samples)

        return audio

    def enhance_audio(self, audio: np.ndarray) -> np.ndarray:
        """Enhance audio quality: denoise, compress, EQ, normalize, fade."""
        enhanced = self.reduce_noise(audio)
        enhanced = self.apply_compression(enhanced)
        enhanced = self.apply_eq_boost(enhanced)
        enhanced = self.normalize_audio(enhanced)
        enhanced = self.apply_fade(enhanced)
        return enhanced

    def apply_compression(self, audio: np.ndarray, threshold: float = 0.5, ratio: float = 4.0) -> np.ndarray:
        """Apply simple hard-knee dynamic range compression.

        Samples whose magnitude exceeds ``threshold`` have the excess
        divided by ``ratio``; sign and sub-threshold samples are preserved.
        Operates on a copy — the input is not modified.
        """
        compressed = audio.copy()

        above_threshold = np.abs(compressed) > threshold

        # Gain reduction only on the portion above the threshold.
        compressed[above_threshold] = np.sign(compressed[above_threshold]) * (
            threshold + (np.abs(compressed[above_threshold]) - threshold) / ratio
        )

        return compressed

    def apply_eq_boost(self, audio: np.ndarray) -> np.ndarray:
        """Apply a band-limiting EQ for vocal clarity.

        Simplified EQ: high-pass at 85 Hz removes rumble, low-pass at
        7.5 kHz tames harsh highs. A real implementation would boost the
        1-4 kHz speech band with a peaking filter.
        """
        audio = self.apply_highpass_filter(audio, cutoff=85)
        audio = self.apply_lowpass_filter(audio, cutoff=7500)
        return audio

    def pitch_shift(self, audio: np.ndarray, semitones: float) -> np.ndarray:
        """Shift pitch by ``semitones`` without changing duration."""
        return librosa.effects.pitch_shift(audio, sr=self.target_sr, n_steps=semitones)

    def time_stretch(self, audio: np.ndarray, rate: float) -> np.ndarray:
        """Change playback speed by ``rate`` without affecting pitch."""
        return librosa.effects.time_stretch(audio, rate=rate)

    def detect_voice_activity(self, audio: np.ndarray, frame_duration: float = 0.025) -> np.ndarray:
        """Energy-threshold voice activity detection.

        Args:
            audio: Input samples at ``self.target_sr``.
            frame_duration: Analysis frame length in seconds.

        Returns:
            Boolean array, one flag per (50%-overlapping) frame. Empty when
            ``audio`` is shorter than one frame — the original code would
            take ``np.mean`` of an empty array there (NaN + RuntimeWarning).
        """
        frame_length = int(frame_duration * self.target_sr)
        hop_length = frame_length // 2

        if len(audio) < frame_length:
            return np.zeros(0, dtype=bool)

        # Short-time energy per frame.
        energy = []
        for i in range(0, len(audio) - frame_length + 1, hop_length):
            frame = audio[i:i + frame_length]
            energy.append(np.sum(frame ** 2))

        energy = np.array(energy)

        # Simple adaptive threshold: 10% of mean frame energy.
        threshold = np.mean(energy) * 0.1
        return energy > threshold

    @staticmethod
    def audio_to_bytes(audio: np.ndarray, sample_rate: int) -> bytes:
        """Encode an audio array as WAV-file bytes (for streaming).

        The temp-file handle is closed before ``sf.write`` reopens the path
        (reopening a still-open NamedTemporaryFile fails on Windows), and
        the file is unlinked even if encoding raises.
        """
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.wav')
        tmp.close()
        try:
            sf.write(tmp.name, audio, sample_rate)
            with open(tmp.name, 'rb') as f:
                return f.read()
        finally:
            os.unlink(tmp.name)

    @staticmethod
    def bytes_to_audio(audio_bytes: bytes) -> Tuple[np.ndarray, int]:
        """Decode audio-file bytes into (samples, sample_rate).

        ``sr=None`` preserves the file's native sample rate. The temp file
        is always removed, even if decoding raises.
        """
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.wav')
        try:
            tmp.write(audio_bytes)
            tmp.close()
            audio, sr = librosa.load(tmp.name, sr=None)
            return audio, sr
        finally:
            if not tmp.closed:
                tmp.close()
            os.unlink(tmp.name)