| | |
| | |
| | |
| |
|
| | |
| | import io |
| | import librosa |
| | import numpy as np |
| | import scipy.signal |
| |
|
| |
|
def extract_audio_features(audio_input, sr=88200, from_bytes=False):
    """Extract combined MFCC + autocorrelation features from an audio clip.

    Parameters
    ----------
    audio_input : str | bytes
        Path to an audio file, or encoded/raw audio bytes when
        ``from_bytes`` is True.
    sr : int
        Target sample rate for loading (default 88200).
    from_bytes : bool
        Interpret ``audio_input`` as in-memory audio bytes.

    Returns
    -------
    (combined_features, y), or (None, None) when the clip is too short
    to produce the minimum number of analysis frames.
    """
    try:
        if from_bytes:
            y, sr = load_audio_from_bytes(audio_input, sr)
        else:
            y, sr = load_and_preprocess_audio(audio_input, sr)
    except Exception as e:
        print(f"Loading as WAV failed: {e}\nFalling back to PCM loading.")
        y = load_pcm_audio_from_bytes(audio_input)
        # Bug fix: the PCM loader always resamples to 88200 Hz, so the
        # frame geometry below must be computed at that rate, not the
        # caller-supplied one.
        sr = 88200

    frame_length = int(0.01667 * sr)  # ~16.67 ms analysis window
    hop_length = frame_length // 2    # 50% overlap
    min_frames = 9

    # Clamp at 0 so clips shorter than one frame report 0 instead of a
    # negative frame count.
    num_frames = max(0, (len(y) - frame_length) // hop_length + 1)

    if num_frames < min_frames:
        print(f"Audio file is too short: {num_frames} frames, required: {min_frames} frames")
        return None, None

    combined_features = extract_and_combine_features(y, sr, frame_length, hop_length)

    return combined_features, y
| |
|
def extract_and_combine_features(y, sr, frame_length, hop_length, include_autocorr=True):
    """Horizontally stack MFCC features with optional autocorrelation features.

    Both feature extractors return frames-as-rows arrays, so hstack
    concatenates along the feature (column) axis.
    """
    feature_blocks = [extract_mfcc_features(y, sr, frame_length, hop_length)]

    if include_autocorr:
        feature_blocks.append(
            extract_autocorrelation_features(y, sr, frame_length, hop_length)
        )

    return np.hstack(feature_blocks)
| |
|
| |
|
def extract_mfcc_features(y, sr, frame_length, hop_length, num_mfcc=23):
    """MFCC (+delta) features, pair-reduced in time, transposed to frames-as-rows."""
    raw = extract_overlapping_mfcc(y, sr, num_mfcc, frame_length, hop_length)
    return reduce_features(raw).T
| |
|
def cepstral_mean_variance_normalization(mfcc):
    """Apply CMVN: zero-mean, unit-variance per coefficient row.

    The small epsilon keeps constant rows (std == 0) from dividing by zero.
    """
    centered = mfcc - np.mean(mfcc, axis=1, keepdims=True)
    scale = np.std(mfcc, axis=1, keepdims=True) + 1e-10
    return centered / scale
| |
|
| |
|
def extract_overlapping_mfcc(chunk, sr, num_mfcc, frame_length, hop_length, include_deltas=True, include_cepstral=True, threshold=1e-5):
    """Compute MFCCs for a chunk, optionally CMVN-normalized and with deltas.

    Returns (num_mfcc, n_frames) without deltas, or (3 * num_mfcc, n_frames)
    when first/second-order deltas are stacked on.
    NOTE(review): ``threshold`` is never used in this body — confirm intent
    before removing it from the signature.
    """
    mfcc = librosa.feature.mfcc(
        y=chunk, sr=sr, n_mfcc=num_mfcc, n_fft=frame_length, hop_length=hop_length
    )
    if include_cepstral:
        mfcc = cepstral_mean_variance_normalization(mfcc)

    if not include_deltas:
        return mfcc

    first_order = librosa.feature.delta(mfcc)
    second_order = librosa.feature.delta(mfcc, order=2)
    return np.vstack([mfcc, first_order, second_order])
| |
|
| |
|
def reduce_features(features):
    """Halve the frame count by averaging each adjacent pair of frames.

    ``features`` is (n_coeffs, n_frames). When n_frames is odd, the final
    unpaired frame is appended unchanged.
    """
    n_frames = features.shape[1]
    even_count = n_frames - (n_frames % 2)

    pairs = features[:, :even_count].reshape(features.shape[0], -1, 2)
    averaged = pairs.mean(axis=2)

    if n_frames % 2:
        return np.hstack((averaged, features[:, -1:]))
    return averaged
| |
|
| |
|
| |
|
def extract_overlapping_autocorr(y, sr, frame_length, hop_length, num_autocorr_coeff=187, pad_signal=True, padding_mode="reflect", trim_padded=False):
    """Per-frame normalized autocorrelation features.

    Frames the (optionally padded) signal, removes each frame's DC offset,
    applies a Hann window, and keeps lags 1..num_autocorr_coeff of each
    frame's autocorrelation (lag 0 is used for normalization, then dropped).
    Returns an array of shape (num_autocorr_coeff, n_frames).
    """
    if pad_signal:
        # Half-frame padding so samples near the edges appear in full frames.
        pad = frame_length // 2
        y_padded = np.pad(y, pad_width=pad, mode=padding_mode)
    else:
        y_padded = y

    # frames: (frame_length, n_frames) framed view of the signal.
    frames = librosa.util.frame(y_padded, frame_length=frame_length, hop_length=hop_length)
    if pad_signal and trim_padded:
        # Keep only frames that lie entirely within the original signal span
        # (i.e. drop frames that overlap the reflected padding).
        num_frames = frames.shape[1]
        start_indices = np.arange(num_frames) * hop_length
        valid_idx = np.where((start_indices >= pad) & (start_indices + frame_length <= len(y) + pad))[0]
        frames = frames[:, valid_idx]

    # Remove per-frame DC offset, then taper with a Hann window.
    frames = frames - np.mean(frames, axis=0, keepdims=True)
    hann_window = np.hanning(frame_length)
    windowed_frames = frames * hann_window[:, np.newaxis]

    autocorr_list = []
    for frame in windowed_frames.T:
        full_corr = np.correlate(frame, frame, mode='full')
        mid = frame_length - 1  # index of lag 0 in the 'full' correlation output

        # Slice lags 0 .. num_autocorr_coeff inclusive.
        wanted = full_corr[mid: mid + num_autocorr_coeff + 1]

        # Normalize by lag-0 energy; skipped for silent frames to avoid 0/0.
        if wanted[0] != 0:
            wanted = wanted / wanted[0]
        autocorr_list.append(wanted)

    autocorr_features = np.array(autocorr_list).T

    # Drop the lag-0 row (always 1 after normalization), leaving
    # num_autocorr_coeff coefficient rows.
    autocorr_features = autocorr_features[1:, :]

    # Replace near-silent first/last frames with their neighbors.
    autocorr_features = fix_edge_frames_autocorr(autocorr_features)

    return autocorr_features
| |
|
| |
|
def fix_edge_frames_autocorr(autocorr_features, zero_threshold=1e-7):
    """If the first or last frame is near all-zero, replicate from adjacent frames.

    Operates in place on ``autocorr_features`` (shape: coeffs x frames) and
    returns it.

    Bug fix: with fewer than two frames there is no neighbor to copy from and
    the original indexing (``[:, 1]`` / ``[:, -2]``) raised IndexError; such
    inputs are now returned unchanged.
    """
    if autocorr_features.shape[1] < 2:
        return autocorr_features

    if np.all(np.abs(autocorr_features[:, 0]) < zero_threshold):
        autocorr_features[:, 0] = autocorr_features[:, 1]

    if np.all(np.abs(autocorr_features[:, -1]) < zero_threshold):
        autocorr_features[:, -1] = autocorr_features[:, -2]
    return autocorr_features
| |
|
def extract_autocorrelation_features(
    y, sr, frame_length, hop_length, include_deltas=False
):
    """Autocorrelation features, optionally with deltas/delta-deltas,
    pair-reduced in time and transposed so frames are rows.
    """
    base = extract_overlapping_autocorr(y, sr, frame_length, hop_length)

    if include_deltas:
        base = compute_autocorr_with_deltas(base)

    return reduce_features(base).T
| |
|
| |
|
def compute_autocorr_with_deltas(autocorr_base):
    """Stack the base autocorrelation with its first- and second-order deltas."""
    layers = [
        autocorr_base,
        librosa.feature.delta(autocorr_base),
        librosa.feature.delta(autocorr_base, order=2),
    ]
    return np.vstack(layers)
| |
|
def load_and_preprocess_audio(audio_path, sr=88200):
    """Load an audio file, force an 88200 Hz rate, and peak-normalize."""
    y, sr = load_audio(audio_path, sr)

    target_sr = 88200
    if sr != target_sr:
        y = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
        sr = target_sr

    # Peak normalization; silent clips are left untouched.
    peak = np.max(np.abs(y))
    if peak > 0:
        y = y / peak

    return y, sr
| |
|
def load_audio(audio_path, sr=88200):
    """Load an audio file at the requested sample rate.

    Thin wrapper over librosa.load that also reports the effective rate.
    Returns (waveform, sample_rate).
    """
    y, sr = librosa.load(audio_path, sr=sr)
    print(f"Loaded audio file '{audio_path}' with sample rate {sr}")
    return y, sr
| |
|
def load_audio_from_bytes(audio_bytes, sr=88200):
    """Decode in-memory audio bytes at the requested rate and peak-normalize."""
    buffer = io.BytesIO(audio_bytes)
    y, sr = librosa.load(buffer, sr=sr)

    # Peak normalization; silent clips are left untouched.
    peak = np.max(np.abs(y))
    if peak > 0:
        y = y / peak

    return y, sr
| |
|
def load_audio_file_from_memory(audio_bytes, sr=88200):
    """Load audio from memory bytes."""
    y, sr = librosa.load(io.BytesIO(audio_bytes), sr=sr)
    print(f"Loaded audio data with sample rate {sr}")

    # Peak normalization; silent clips are left untouched.
    peak = np.max(np.abs(y))
    if peak > 0:
        y = y / peak

    return y, sr
| |
|
| |
|
| |
|
| |
|
def load_pcm_audio_from_bytes(audio_bytes, sr=22050, channels=1, sample_width=2):
    """
    Load raw PCM bytes into a normalized numpy array and upsample to 88200 Hz.
    Assumes little-endian, 16-bit PCM data.

    Parameters
    ----------
    audio_bytes : bytes
        Raw interleaved PCM sample data.
    sr : int
        Sample rate of the incoming PCM (default 22050).
    channels : int
        Number of interleaved channels.
    sample_width : int
        Bytes per sample; only 2 (int16) is supported.

    Returns
    -------
    np.ndarray
        Float waveform at 88200 Hz, shape (n,) mono or (n, channels).

    Raises
    ------
    ValueError
        For an unsupported sample width, or a buffer that is not a whole
        number of sample frames.
    """
    if sample_width == 2:
        dtype = np.int16
        max_val = 32768.0  # int16 full-scale divisor -> values in [-1.0, 1.0)
    else:
        raise ValueError("Unsupported sample width")

    # Robustness fix: reject truncated buffers up front; frombuffer/reshape
    # would otherwise fail with a confusing low-level error.
    frame_bytes = sample_width * channels
    if len(audio_bytes) % frame_bytes != 0:
        raise ValueError("PCM byte buffer is not a whole number of sample frames")

    data = np.frombuffer(audio_bytes, dtype=dtype)

    # De-interleave multi-channel data into (n_frames, channels).
    if channels > 1:
        data = data.reshape(-1, channels)

    y = data.astype(np.float32) / max_val

    target_sr = 88200
    if sr != target_sr:
        num_samples = int(len(y) * target_sr / sr)
        if channels > 1:
            # Fourier-method resampling, one channel at a time.
            y_resampled = np.zeros((num_samples, channels), dtype=np.float32)
            for ch in range(channels):
                y_resampled[:, ch] = scipy.signal.resample(y[:, ch], num_samples)
        else:
            y_resampled = scipy.signal.resample(y, num_samples)
        y = y_resampled

    return y
| |
|