# fastapi/utils/audio/extraction/extract_features.py
# (Removed non-Python residue from a file-viewer scrape: uploader name,
#  commit message "Add Dockerized Flask application files for deployment",
#  commit hash 2b85b0c. Kept here as a comment so the module parses.)
# This software is licensed under a **dual-license model**
# For individuals and businesses earning **under $1M per year**, this software is licensed under the **MIT License**
# Businesses or organizations with **annual revenue of $1,000,000 or more** must obtain permission to use this software commercially.
# extract_features.py
import io
import librosa
import numpy as np
import scipy.signal
def extract_audio_features(audio_input, sr=88200, from_bytes=False):
    """Load audio and extract the combined MFCC/autocorrelation feature matrix.

    Parameters
    ----------
    audio_input : str | bytes
        File path when ``from_bytes`` is False, encoded audio bytes otherwise.
    sr : int
        Target sample rate for decoding (default 88200 Hz).
    from_bytes : bool
        Select the in-memory decoding path.

    Returns
    -------
    tuple
        ``(combined_features, y)`` on success, or ``(None, None)`` when the
        clip yields fewer frames than needed for delta computation.
    """
    try:
        if from_bytes:
            y, sr = load_audio_from_bytes(audio_input, sr)
        else:
            y, sr = load_and_preprocess_audio(audio_input, sr)
    except Exception as e:
        # Bug fix: the raw-PCM fallback only makes sense for in-memory bytes.
        # Previously a failed *file path* fell through to np.frombuffer(str),
        # crashing with a confusing TypeError that masked the real error.
        if not from_bytes:
            raise
        print(f"Loading as WAV failed: {e}\nFalling back to PCM loading.")
        # NOTE(review): load_pcm_audio_from_bytes always resamples to 88200 Hz.
        # That matches the default `sr`, but a caller-supplied non-default rate
        # would desynchronize the frame sizing below — confirm callers use the
        # default on this path.
        y = load_pcm_audio_from_bytes(audio_input)
    frame_length = int(0.01667 * sr)  # ~16.67 ms frames (~60 fps)
    hop_length = frame_length // 2    # 50% overlap for smoother transitions
    min_frames = 9                    # minimum frames needed for delta calculation
    # Bug fix: clamp to zero so clips shorter than one frame don't report a
    # negative frame count in the diagnostic below.
    num_frames = max(0, (len(y) - frame_length) // hop_length + 1)
    if num_frames < min_frames:
        print(f"Audio file is too short: {num_frames} frames, required: {min_frames} frames")
        return None, None
    combined_features = extract_and_combine_features(y, sr, frame_length, hop_length)
    return combined_features, y
def extract_and_combine_features(y, sr, frame_length, hop_length, include_autocorr=True):
    """Build the per-frame feature matrix for a signal.

    MFCC-derived features are always included; autocorrelation features are
    appended column-wise when ``include_autocorr`` is True.
    """
    feature_blocks = [extract_mfcc_features(y, sr, frame_length, hop_length)]
    if include_autocorr:
        feature_blocks.append(
            extract_autocorrelation_features(y, sr, frame_length, hop_length)
        )
    return np.hstack(feature_blocks)
def extract_mfcc_features(y, sr, frame_length, hop_length, num_mfcc=23):
    """Compute MFCC features (with deltas), frame-reduce them, and return
    them transposed to shape (num_frames, num_coefficients)."""
    raw_mfcc = extract_overlapping_mfcc(y, sr, num_mfcc, frame_length, hop_length)
    return reduce_features(raw_mfcc).T
def cepstral_mean_variance_normalization(mfcc):
    """Apply CMVN: normalize each coefficient row to zero mean, unit variance.

    The small epsilon keeps constant rows from dividing by zero.
    """
    centered = mfcc - np.mean(mfcc, axis=1, keepdims=True)
    scale = np.std(mfcc, axis=1, keepdims=True) + 1e-10
    return centered / scale
def extract_overlapping_mfcc(chunk, sr, num_mfcc, frame_length, hop_length, include_deltas=True, include_cepstral=True, threshold=1e-5):
    """Compute MFCCs for a signal chunk, optionally CMVN-normalized and
    stacked with first- and second-order deltas.

    NOTE(review): ``threshold`` is currently unused; kept for interface
    compatibility with existing callers.
    """
    mfcc = librosa.feature.mfcc(y=chunk, sr=sr, n_mfcc=num_mfcc, n_fft=frame_length, hop_length=hop_length)
    if include_cepstral:
        mfcc = cepstral_mean_variance_normalization(mfcc)
    if not include_deltas:
        return mfcc
    # Stack original MFCCs with their first/second derivatives row-wise.
    return np.vstack([
        mfcc,
        librosa.feature.delta(mfcc),
        librosa.feature.delta(mfcc, order=2),
    ])
def reduce_features(features):
    """Halve the frame rate by averaging adjacent frame pairs.

    ``features`` has shape (num_coeffs, num_frames); an odd trailing frame is
    kept as-is rather than averaged.
    """
    n_coeffs, n_frames = features.shape
    even_count = (n_frames // 2) * 2
    averaged = features[:, :even_count].reshape(n_coeffs, -1, 2).mean(axis=2)
    if n_frames % 2 == 0:
        return averaged
    # Odd frame count: append the unpaired final frame unchanged.
    return np.hstack((averaged, features[:, -1].reshape(-1, 1)))
def extract_overlapping_autocorr(y, sr, frame_length, hop_length, num_autocorr_coeff=187, pad_signal=True, padding_mode="reflect", trim_padded=False):
    """Per-frame normalized autocorrelation features.

    The signal is optionally reflect-padded, framed, DC-removed, and
    Hann-windowed; each frame's autocorrelation is normalized by its zero-lag
    energy, the zero-lag row is dropped, and degenerate edge frames are
    patched. Result shape: (num_autocorr_coeff, num_frames).
    """
    pad = frame_length // 2
    signal = np.pad(y, pad_width=pad, mode=padding_mode) if pad_signal else y
    frames = librosa.util.frame(signal, frame_length=frame_length, hop_length=hop_length)
    if pad_signal and trim_padded:
        # Keep only frames that lie fully inside the unpadded signal.
        starts = np.arange(frames.shape[1]) * hop_length
        keep = np.where((starts >= pad) & (starts + frame_length <= len(y) + pad))[0]
        frames = frames[:, keep]
    # Remove each frame's DC offset, then taper with a Hann window.
    frames = frames - np.mean(frames, axis=0, keepdims=True)
    windowed = frames * np.hanning(frame_length)[:, np.newaxis]
    zero_lag = frame_length - 1  # index of lag 0 in the 'full' correlation
    rows = []
    for frame in windowed.T:
        full_corr = np.correlate(frame, frame, mode='full')
        # Take lags 0..num_autocorr_coeff inclusive; lag 0 is dropped later.
        coeffs = full_corr[zero_lag: zero_lag + num_autocorr_coeff + 1]
        energy = coeffs[0]
        if energy != 0:
            coeffs = coeffs / energy  # normalize by zero-lag energy
        rows.append(coeffs)
    # (num_autocorr_coeff + 1, num_frames), then drop the redundant lag-0 row.
    feats = np.array(rows).T[1:, :]
    return fix_edge_frames_autocorr(feats)
def fix_edge_frames_autocorr(autocorr_features, zero_threshold=1e-7):
    """Patch degenerate boundary frames in place.

    A first or last column whose entries are all below ``zero_threshold`` in
    magnitude is overwritten with its neighbouring column. The (mutated)
    input array is returned.
    """
    def _is_dead(column):
        return np.all(np.abs(column) < zero_threshold)

    if _is_dead(autocorr_features[:, 0]):
        autocorr_features[:, 0] = autocorr_features[:, 1]
    if _is_dead(autocorr_features[:, -1]):
        autocorr_features[:, -1] = autocorr_features[:, -2]
    return autocorr_features
def extract_autocorrelation_features(
    y, sr, frame_length, hop_length, include_deltas=False
):
    """Autocorrelation features, frame-reduced and transposed to
    (num_frames, num_coefficients); deltas are stacked in when requested."""
    feats = extract_overlapping_autocorr(y, sr, frame_length, hop_length)
    if include_deltas:
        feats = compute_autocorr_with_deltas(feats)
    return reduce_features(feats).T
def compute_autocorr_with_deltas(autocorr_base):
    """Stack autocorrelation features with their first- and second-order
    deltas along the coefficient axis."""
    return np.vstack([
        autocorr_base,
        librosa.feature.delta(autocorr_base),
        librosa.feature.delta(autocorr_base, order=2),
    ])
def load_and_preprocess_audio(audio_path, sr=88200):
    """Load an audio file, force an 88.2 kHz rate, and peak-normalize.

    The resample branch only fires when a caller requests a non-default
    ``sr`` at load time (librosa already decodes at the requested rate).
    """
    y, sr = load_audio(audio_path, sr)
    if sr != 88200:
        y = librosa.resample(y, orig_sr=sr, target_sr=88200)
        sr = 88200
    peak = np.max(np.abs(y))
    if peak > 0:
        y = y / peak  # scale into [-1, 1]
    return y, sr
def load_audio(audio_path, sr=88200):
    """Decode an audio file via librosa at the requested sample rate and
    report what was loaded."""
    y, sr = librosa.load(audio_path, sr=sr)
    print(f"Loaded audio file '{audio_path}' with sample rate {sr}")
    return y, sr
def load_audio_from_bytes(audio_bytes, sr=88200):
    """Decode encoded audio bytes from memory and peak-normalize to [-1, 1]."""
    y, sr = librosa.load(io.BytesIO(audio_bytes), sr=sr)
    peak = np.max(np.abs(y))
    if peak > 0:
        y = y / peak
    return y, sr
def load_audio_file_from_memory(audio_bytes, sr=88200):
    """Load audio from memory bytes, peak-normalized to [-1, 1]."""
    buffer = io.BytesIO(audio_bytes)
    y, sr = librosa.load(buffer, sr=sr)
    print(f"Loaded audio data with sample rate {sr}")
    peak = np.max(np.abs(y))
    if peak > 0:
        y = y / peak
    return y, sr
def load_pcm_audio_from_bytes(audio_bytes, sr=22050, channels=1, sample_width=2):
    """
    Decode raw little-endian 16-bit PCM bytes into a normalized float array
    and upsample to 88200 Hz.

    Returns only the sample array (no rate): output is always at 88200 Hz.
    Multi-channel input yields shape (num_samples, channels).
    """
    # Only 16-bit samples are supported.
    if sample_width != 2:
        raise ValueError("Unsupported sample width")
    samples = np.frombuffer(audio_bytes, dtype=np.int16)
    if channels > 1:
        samples = samples.reshape(-1, channels)
    # Scale into [-1, 1].
    y = samples.astype(np.float32) / 32768.0
    target_sr = 88200
    if sr == target_sr:
        return y
    n_out = int(len(y) * target_sr / sr)
    if channels > 1:
        # Resample per channel; the float32 buffer preserves the original
        # downcast of scipy's float64 output.
        resampled = np.zeros((n_out, channels), dtype=np.float32)
        for ch in range(channels):
            resampled[:, ch] = scipy.signal.resample(y[:, ch], n_out)
        return resampled
    return scipy.signal.resample(y, n_out)