|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import io |
|
|
import librosa |
|
|
import numpy as np |
|
|
import scipy.signal |
|
|
|
|
|
|
|
|
def extract_audio_features(audio_input, sr=88200, from_bytes=False):
    """Load audio and compute the combined per-frame feature matrix.

    Args:
        audio_input: Encoded audio bytes when ``from_bytes`` is true,
            otherwise a path handed to ``load_and_preprocess_audio``.
        sr: Sample rate requested from the primary loader.
        from_bytes: Selects the in-memory loader instead of the path loader.

    Returns:
        ``(combined_features, y)`` where ``y`` is the loaded waveform, or
        ``(None, None)`` when the audio yields fewer than 9 analysis frames.
    """
    try:
        if from_bytes:
            y, sr = load_audio_from_bytes(audio_input, sr)
        else:
            y, sr = load_and_preprocess_audio(audio_input, sr)
    except Exception as e:
        print(f"Loading as WAV failed: {e}\nFalling back to PCM loading.")
        y = load_pcm_audio_from_bytes(audio_input)
        # The PCM loader always hands back samples at 88200 Hz, so the rate
        # used for framing below must follow it rather than the caller's
        # requested ``sr``; otherwise frame sizes and MFCCs are computed for
        # the wrong sample rate.
        sr = 88200

    # ~16.67 ms analysis window with 50% overlap between consecutive frames.
    frame_length = int(0.01667 * sr)
    hop_length = frame_length // 2
    min_frames = 9

    num_frames = (len(y) - frame_length) // hop_length + 1

    if num_frames < min_frames:
        print(f"Audio file is too short: {num_frames} frames, required: {min_frames} frames")
        return None, None

    combined_features = extract_and_combine_features(y, sr, frame_length, hop_length)

    return combined_features, y
|
|
|
|
|
def extract_and_combine_features(y, sr, frame_length, hop_length, include_autocorr=True):
    """Build the feature matrix by stacking feature groups column-wise.

    MFCC-based features always come first; autocorrelation features are
    appended to their right when ``include_autocorr`` is true.

    Args:
        y: Audio samples.
        sr: Sample rate of ``y``.
        frame_length: Analysis window size in samples.
        hop_length: Step between consecutive windows in samples.
        include_autocorr: Whether to append the autocorrelation features.

    Returns:
        The horizontally stacked feature array.
    """
    feature_groups = [extract_mfcc_features(y, sr, frame_length, hop_length)]

    if include_autocorr:
        feature_groups.append(
            extract_autocorrelation_features(y, sr, frame_length, hop_length)
        )

    return np.hstack(feature_groups)
|
|
|
|
|
|
|
|
def extract_mfcc_features(y, sr, frame_length, hop_length, num_mfcc=23):
    """Compute MFCC features, halve the frame count, and transpose.

    Args:
        y: Audio samples.
        sr: Sample rate of ``y``.
        frame_length: Analysis window size in samples.
        hop_length: Step between consecutive windows in samples.
        num_mfcc: Number of MFCC coefficients requested per frame.

    Returns:
        The transposed output of ``reduce_features`` applied to the result
        of ``extract_overlapping_mfcc``.
    """
    per_frame = extract_overlapping_mfcc(y, sr, num_mfcc, frame_length, hop_length)
    return reduce_features(per_frame).T
|
|
|
|
|
def cepstral_mean_variance_normalization(mfcc):
    """Standardize each row of ``mfcc`` to zero mean and unit variance.

    Statistics are taken along axis 1. A small constant (1e-10) is added to
    the standard deviation so constant rows do not divide by zero.

    Args:
        mfcc: 2-D array to normalize row by row.

    Returns:
        Array of the same shape as ``mfcc`` holding the normalized values.
    """
    centered = mfcc - np.mean(mfcc, axis=1, keepdims=True)
    spread = np.std(mfcc, axis=1, keepdims=True) + 1e-10
    return centered / spread
|
|
|
|
|
|
|
|
def extract_overlapping_mfcc(chunk, sr, num_mfcc, frame_length, hop_length, include_deltas=True, include_cepstral=True, threshold=1e-5):
    """Compute MFCCs with optional normalization and delta features.

    Args:
        chunk: Audio samples.
        sr: Sample rate of ``chunk``.
        num_mfcc: Number of MFCC coefficients to compute.
        frame_length: Passed to librosa as ``n_fft``.
        hop_length: Step between consecutive frames in samples.
        include_deltas: When true, first- and second-order deltas are
            stacked below the base coefficients.
        include_cepstral: When true, the coefficients are passed through
            ``cepstral_mean_variance_normalization`` before deltas are taken.
        threshold: Accepted for interface compatibility; not used here.

    Returns:
        The coefficient array, vertically stacked with its deltas and
        delta-deltas when ``include_deltas`` is true.
    """
    coeffs = librosa.feature.mfcc(
        y=chunk, sr=sr, n_mfcc=num_mfcc, n_fft=frame_length, hop_length=hop_length
    )
    if include_cepstral:
        coeffs = cepstral_mean_variance_normalization(coeffs)

    if not include_deltas:
        return coeffs

    first_order = librosa.feature.delta(coeffs)
    second_order = librosa.feature.delta(coeffs, order=2)
    return np.vstack([coeffs, first_order, second_order])
|
|
|
|
|
|
|
|
def reduce_features(features):
    """Halve the column count by averaging adjacent column pairs.

    Columns (0, 1), (2, 3), ... are replaced by their mean. When the column
    count is odd, the final unpaired column is appended unchanged.

    Args:
        features: 2-D array whose second axis is reduced.

    Returns:
        2-D array with the same row count and ``ceil(columns / 2)`` columns.
    """
    total_cols = features.shape[1]
    has_leftover = total_cols % 2 == 1
    paired_cols = total_cols - 1 if has_leftover else total_cols

    # View the even-length prefix as (rows, pairs, 2) and average each pair.
    pairs = features[:, :paired_cols].reshape(features.shape[0], -1, 2)
    averaged = pairs.mean(axis=2)

    if not has_leftover:
        return averaged

    # Slicing with -1: keeps the trailing column two-dimensional.
    return np.concatenate((averaged, features[:, -1:]), axis=1)
|
|
|
|
|
|
|
|
|
|
|
def extract_overlapping_autocorr(y, sr, frame_length, hop_length, num_autocorr_coeff=187, pad_signal=True, padding_mode="reflect", trim_padded=False):
    """Compute normalized autocorrelation coefficients for overlapping frames.

    The signal is optionally padded by half a frame on each side, cut into
    frames, mean-removed per frame, Hann-windowed, and autocorrelated. Each
    frame's autocorrelation is divided by its lag-0 value when that value is
    non-zero, and the lag-0 row is then dropped from the result.

    Args:
        y: Audio samples.
        sr: Sample rate; accepted for interface symmetry, not used here.
        frame_length: Frame size in samples.
        hop_length: Step between consecutive frames in samples.
        num_autocorr_coeff: Number of lags (beyond lag 0) kept per frame.
        pad_signal: Pad ``y`` by ``frame_length // 2`` on both ends first.
        padding_mode: ``mode`` argument forwarded to ``np.pad``.
        trim_padded: With padding enabled, keep only frames whose start is
            at or after the leading pad and whose end is within
            ``len(y) + pad``.

    Returns:
        Array with one row per kept lag and one column per frame, after
        ``fix_edge_frames_autocorr`` has been applied.
    """
    half_window = frame_length // 2
    signal = np.pad(y, pad_width=half_window, mode=padding_mode) if pad_signal else y

    frames = librosa.util.frame(signal, frame_length=frame_length, hop_length=hop_length)

    if pad_signal and trim_padded:
        starts = np.arange(frames.shape[1]) * hop_length
        inside = (starts >= half_window) & (starts + frame_length <= len(y) + half_window)
        frames = frames[:, np.where(inside)[0]]

    # Remove each frame's DC offset, then taper with a Hann window.
    centered = frames - np.mean(frames, axis=0, keepdims=True)
    tapered = centered * np.hanning(frame_length)[:, np.newaxis]

    # In the 'full' correlation output, lag 0 sits at index frame_length - 1.
    zero_lag = frame_length - 1
    last_lag = zero_lag + num_autocorr_coeff + 1

    per_frame = []
    for column in tapered.T:
        lags = np.correlate(column, column, mode='full')[zero_lag:last_lag]
        # Normalize by the lag-0 energy; silent frames are left as-is.
        if lags[0] != 0:
            lags = lags / lags[0]
        per_frame.append(lags)

    # Transpose to (lags, frames) and discard the lag-0 row.
    coefficients = np.array(per_frame).T[1:, :]

    return fix_edge_frames_autocorr(coefficients)
|
|
|
|
|
|
|
|
def fix_edge_frames_autocorr(autocorr_features, zero_threshold=1e-7):
    """Replace a near-zero first or last frame with its neighbouring frame.

    A frame (column) counts as near-zero when every absolute value in it is
    below ``zero_threshold``. The array is modified in place and returned.

    Args:
        autocorr_features: 2-D array with one column per frame.
        zero_threshold: Magnitude below which a value is treated as zero.

    Returns:
        The same array object, with edge columns patched where needed.
    """
    # With fewer than two frames there is no neighbour to copy from; the
    # unguarded indexing would raise IndexError, so return the input as-is.
    if autocorr_features.ndim < 2 or autocorr_features.shape[1] < 2:
        return autocorr_features

    if np.all(np.abs(autocorr_features[:, 0]) < zero_threshold):
        autocorr_features[:, 0] = autocorr_features[:, 1]

    if np.all(np.abs(autocorr_features[:, -1]) < zero_threshold):
        autocorr_features[:, -1] = autocorr_features[:, -2]

    return autocorr_features
|
|
|
|
|
def extract_autocorrelation_features(
    y, sr, frame_length, hop_length, include_deltas=False
):
    """Compute reduced autocorrelation features, one row per reduced frame.

    Runs ``extract_overlapping_autocorr``, optionally stacks delta and
    delta-delta rows via ``compute_autocorr_with_deltas``, halves the frame
    count with ``reduce_features``, and transposes the result.

    Args:
        y: Audio samples.
        sr: Sample rate of ``y``.
        frame_length: Frame size in samples.
        hop_length: Step between consecutive frames in samples.
        include_deltas: Whether to add delta and delta-delta rows.

    Returns:
        The transposed, frame-reduced autocorrelation feature array.
    """
    coefficients = extract_overlapping_autocorr(y, sr, frame_length, hop_length)

    if include_deltas:
        coefficients = compute_autocorr_with_deltas(coefficients)

    return reduce_features(coefficients).T
|
|
|
|
|
|
|
|
def compute_autocorr_with_deltas(autocorr_base):
    """Stack the base features on top of their first and second deltas.

    Args:
        autocorr_base: Feature array passed to ``librosa.feature.delta``.

    Returns:
        Vertical stack of ``autocorr_base``, its delta, and its order-2 delta.
    """
    first_order = librosa.feature.delta(autocorr_base)
    second_order = librosa.feature.delta(autocorr_base, order=2)
    return np.vstack([autocorr_base, first_order, second_order])
|
|
|
|
|
def load_and_preprocess_audio(audio_path, sr=88200):
    """Load an audio file, force it to 88200 Hz, and peak-normalize it.

    Args:
        audio_path: Location of the audio file, forwarded to ``load_audio``.
        sr: Sample rate requested from ``load_audio``.

    Returns:
        ``(samples, sample_rate)``. The samples are scaled so the largest
        absolute value is 1, unless the signal is entirely zero.
    """
    target_rate = 88200
    samples, rate = load_audio(audio_path, sr)

    # Whatever rate was requested for loading, downstream code gets 88200 Hz.
    if rate != target_rate:
        samples = librosa.resample(samples, orig_sr=rate, target_sr=target_rate)
        rate = target_rate

    peak = np.max(np.abs(samples))
    if peak > 0:
        samples = samples / peak

    return samples, rate
|
|
|
|
|
def load_audio(audio_path, sr=88200):
    """Load an audio file with librosa and report the resulting sample rate.

    Args:
        audio_path: Location of the audio file.
        sr: Sample rate passed to ``librosa.load``.

    Returns:
        ``(samples, sample_rate)`` as returned by ``librosa.load``.
    """
    samples, rate = librosa.load(audio_path, sr=sr)
    print(f"Loaded audio file '{audio_path}' with sample rate {rate}")
    return samples, rate
|
|
|
|
|
def load_audio_from_bytes(audio_bytes, sr=88200):
    """Decode in-memory audio with librosa and peak-normalize it.

    Args:
        audio_bytes: Encoded audio content held in memory.
        sr: Sample rate passed to ``librosa.load``.

    Returns:
        ``(samples, sample_rate)``. The samples are scaled so the largest
        absolute value is 1, unless the signal is entirely zero.
    """
    samples, rate = librosa.load(io.BytesIO(audio_bytes), sr=sr)

    peak = np.max(np.abs(samples))
    if peak > 0:
        samples = samples / peak

    return samples, rate
|
|
|
|
|
def load_audio_file_from_memory(audio_bytes, sr=88200):
    """Decode in-memory audio, log its sample rate, and peak-normalize it.

    Args:
        audio_bytes: Encoded audio content held in memory.
        sr: Sample rate passed to ``librosa.load``.

    Returns:
        ``(samples, sample_rate)``. The samples are scaled so the largest
        absolute value is 1, unless the signal is entirely zero.
    """
    buffer = io.BytesIO(audio_bytes)
    samples, rate = librosa.load(buffer, sr=sr)
    print(f"Loaded audio data with sample rate {rate}")

    peak = np.max(np.abs(samples))
    if peak > 0:
        samples = samples / peak

    return samples, rate
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_pcm_audio_from_bytes(audio_bytes, sr=22050, channels=1, sample_width=2):
    """Convert raw 16-bit PCM bytes to float samples at 88200 Hz.

    The bytes are read as ``int16`` and scaled by 1/32768. When ``sr``
    differs from 88200, each channel is resampled with
    ``scipy.signal.resample``.

    Args:
        audio_bytes: Raw PCM sample data.
        sr: Sample rate of the incoming PCM data.
        channels: Number of interleaved channels in the data.
        sample_width: Bytes per sample; only 2 is supported.

    Returns:
        1-D sample array for mono input, or an array of shape
        ``(samples, channels)`` when ``channels`` is greater than 1.

    Raises:
        ValueError: If ``sample_width`` is not 2.
    """
    if sample_width != 2:
        raise ValueError("Unsupported sample width")

    full_scale = 32768.0
    target_sr = 88200

    pcm = np.frombuffer(audio_bytes, dtype=np.int16)
    multichannel = channels > 1
    if multichannel:
        # De-interleave: one row per sample frame, one column per channel.
        pcm = pcm.reshape(-1, channels)

    samples = pcm.astype(np.float32) / full_scale

    if sr == target_sr:
        return samples

    out_len = int(len(samples) * target_sr / sr)
    if not multichannel:
        return scipy.signal.resample(samples, out_len)

    # Resample each channel on its own and store the result as float32.
    resampled = np.zeros((out_len, channels), dtype=np.float32)
    for index in range(channels):
        resampled[:, index] = scipy.signal.resample(samples[:, index], out_len)
    return resampled
|
|
|