MSD / app2.py
AdityaK007's picture
Update app2.py
3a782b8 verified
import gradio as gr
import librosa
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans, AgglomerativeClustering, DBSCAN
from sklearn.metrics.pairwise import cosine_similarity
from scipy.spatial.distance import jensenshannon
from scipy.stats import pearsonr
from scipy.signal import get_window as scipy_get_window
import plotly.express as px
import plotly.graph_objects as go
import os
import tempfile
# ----------------------------
# Audio Segmentation
# ----------------------------
def segment_audio(y, sr, frame_length_ms, hop_length_ms, window_type="hann"):
    """Split a 1-D signal into overlapping, windowed frames.

    Args:
        y: 1-D sequence of audio samples.
        sr: Sampling rate in Hz; used to convert milliseconds to samples.
        frame_length_ms: Requested frame length in milliseconds.
        hop_length_ms: Requested hop (stride) between frame starts in milliseconds.
        window_type: Window name understood by ``scipy.signal.get_window``;
            ``"rectangular"`` is accepted as an alias for ``"boxcar"``.

    Returns:
        A ``(frames, frame_length)`` tuple. ``frames`` is a 2-D array shaped
        ``(frame_length, n_frames)`` whose columns are the windowed frames, and
        ``frame_length`` is the number of samples per frame actually used.
        Empty input yields a single all-zero frame of the requested length.
    """
    y = np.asarray(y)

    # Clamp to at least one sample: a value that rounds down to 0 would make
    # the framing loop's range() step (or the window length) zero.
    frame_length = max(1, int(frame_length_ms * sr / 1000))
    hop_length = max(1, int(hop_length_ms * sr / 1000))

    if len(y) == 0:
        # Nothing to frame; hand back one silent frame so callers always
        # receive a well-formed 2-D array.
        return np.zeros((frame_length, 1)), frame_length

    if frame_length > len(y):
        # Audio shorter than one frame: use the whole signal as a single frame.
        frame_length = len(y)
        hop_length = max(1, frame_length // 2)

    window_name = "boxcar" if window_type == "rectangular" else window_type
    window = scipy_get_window(window_name, frame_length)

    # frame_length <= len(y) here, so there is always at least one start index.
    starts = range(0, len(y) - frame_length + 1, hop_length)
    frames = np.array([y[s:s + frame_length] * window for s in starts]).T
    return frames, frame_length
# ----------------------------
# Enhanced Feature Extraction
# ----------------------------
def extract_features_with_spectrum(frames, sr):
    """Compute per-frame energy, MFCC and spectral features.

    Args:
        frames: 2-D array shaped ``(samples_per_frame, n_frames)``; each
            column is treated as one frame.
        sr: Sampling rate in Hz, forwarded to the librosa feature functions.

    Returns:
        A list of feature dicts. Each dict holds the scalar features ``rms``,
        ``spectral_centroid``, ``zcr``, ``mfcc_1`` .. ``mfcc_13``,
        ``low_freq_energy``, ``mid_freq_energy``, ``high_freq_energy``,
        ``spectral_rolloff``, ``spectral_bandwidth`` and ``spectral_flatness``,
        plus a 2-D dB magnitude array under ``spectrum``.

        Frames that are empty, shorter than the FFT size, or silent are
        skipped, so a list position does not necessarily equal the column
        index in ``frames``. If every frame is skipped, a single dict of
        default values is returned so the result is never empty.
    """
    features = []
    n_mfcc = 13
    n_fft = min(2048, frames.shape[0])

    for i in range(frames.shape[1]):
        frame = frames[:, i]

        # Skip frames that carry no usable signal. The size check comes first
        # because np.max() raises on a zero-length array.
        if frame.size == 0 or len(frame) < n_fft or np.max(np.abs(frame)) < 1e-10:
            continue

        feat = {}

        # Each feature group is best-effort: a failure falls back to a neutral
        # default instead of aborting the whole analysis. `except Exception`
        # (rather than a bare except) lets KeyboardInterrupt/SystemExit through.
        try:
            rms = np.mean(librosa.feature.rms(y=frame)[0])
            feat["rms"] = float(rms)
        except Exception:
            feat["rms"] = 0.0

        try:
            sc = np.mean(librosa.feature.spectral_centroid(y=frame, sr=sr)[0])
            feat["spectral_centroid"] = float(sc)
        except Exception:
            feat["spectral_centroid"] = 0.0

        try:
            zcr = np.mean(librosa.feature.zero_crossing_rate(frame)[0])
            feat["zcr"] = float(zcr)
        except Exception:
            feat["zcr"] = 0.0

        try:
            mfccs = librosa.feature.mfcc(y=frame, sr=sr, n_mfcc=n_mfcc, n_fft=n_fft)
            for j in range(n_mfcc):
                feat[f"mfcc_{j+1}"] = float(np.mean(mfccs[j]))
        except Exception:
            for j in range(n_mfcc):
                feat[f"mfcc_{j+1}"] = 0.0

        # Spectral features for quality assessment
        try:
            S = np.abs(librosa.stft(frame, n_fft=n_fft))
            S_db = librosa.amplitude_to_db(S, ref=np.max)
            freqs = librosa.fft_frequencies(sr=sr, n_fft=n_fft)

            # Band split: <=500 Hz, 500-4000 Hz, >4000 Hz
            low_mask = freqs <= 500
            mid_mask = (freqs > 500) & (freqs <= 4000)
            high_mask = freqs > 4000

            # -80.0 is used as the floor when a band has no frequency bins.
            feat["low_freq_energy"] = float(np.mean(S_db[low_mask])) if np.any(low_mask) else -80.0
            feat["mid_freq_energy"] = float(np.mean(S_db[mid_mask])) if np.any(mid_mask) else -80.0
            feat["high_freq_energy"] = float(np.mean(S_db[high_mask])) if np.any(high_mask) else -80.0

            rolloff = np.mean(librosa.feature.spectral_rolloff(y=frame, sr=sr, roll_percent=0.85)[0])
            feat["spectral_rolloff"] = float(rolloff)

            bandwidth = np.mean(librosa.feature.spectral_bandwidth(y=frame, sr=sr)[0])
            feat["spectral_bandwidth"] = float(bandwidth)

            flatness = np.mean(librosa.feature.spectral_flatness(y=frame)[0])
            feat["spectral_flatness"] = float(flatness)

            feat["spectrum"] = S_db
        except Exception:
            feat["low_freq_energy"] = -80.0
            feat["mid_freq_energy"] = -80.0
            feat["high_freq_energy"] = -80.0
            feat["spectral_rolloff"] = 0.0
            feat["spectral_bandwidth"] = 0.0
            feat["spectral_flatness"] = 0.0
            feat["spectrum"] = np.zeros((n_fft // 2 + 1, 1))

        features.append(feat)

    if not features:
        # Every frame was skipped: return one neutral entry so downstream
        # code always has at least one row to work with.
        feat = {
            "rms": 0.0, "spectral_centroid": 0.0, "zcr": 0.0,
            "low_freq_energy": -80.0, "mid_freq_energy": -80.0, "high_freq_energy": -80.0,
            "spectral_rolloff": 0.0, "spectral_bandwidth": 0.0, "spectral_flatness": 0.0,
            "spectrum": np.zeros((n_fft // 2 + 1, 1)),
        }
        for j in range(n_mfcc):
            feat[f"mfcc_{j+1}"] = 0.0
        features.append(feat)

    return features
# ----------------------------
# Frame-wise Quality Metrics (0-1 scale)
# ----------------------------
def calculate_frame_quality_metrics(near_feats, far_feats):
    """Score far-field frames against near-field frames on a 0-1 scale.

    Frames are paired by list position; only the first
    ``min(len(near_feats), len(far_feats))`` entries of each list are used.

    Args:
        near_feats: List of per-frame feature dicts for the near-field signal.
        far_feats: List of per-frame feature dicts for the far-field signal.
            Both must provide the keys ``rms``, ``spectral_centroid``,
            ``low_freq_energy``, ``mid_freq_energy``, ``high_freq_energy`` and
            ``mfcc_1`` .. ``mfcc_13``.

    Returns:
        A DataFrame with one row per frame pair and the columns
        ``frame_index``, ``spectral_similarity``, ``high_freq_preservation``,
        ``mfcc_similarity``, ``temporal_consistency``,
        ``spectral_centroid_stability``, ``overall_quality`` and
        ``degradation_level``. When either input is empty the frame has only
        an empty ``frame_index`` column.

    Any metric that fails to compute for a frame is recorded as 0.5.
    """
    min_len = min(len(near_feats), len(far_feats))
    if min_len == 0:
        return pd.DataFrame({"frame_index": []})

    pairs = list(zip(near_feats[:min_len], far_feats[:min_len]))
    band_keys = ("low_freq_energy", "mid_freq_energy", "high_freq_energy")
    mfcc_keys = [f"mfcc_{j+1}" for j in range(13)]
    results = {"frame_index": list(range(min_len))}

    # 1. Spectral Similarity Score (0-1): cosine similarity of the three band
    #    energies after shifting each to be positive and normalising to sum 1.
    spectral_scores = []
    for near, far in pairs:
        try:
            near_spectral = np.array([near[key] for key in band_keys])
            far_spectral = np.array([far[key] for key in band_keys])
            near_spectral = near_spectral - near_spectral.min() + 1e-8
            far_spectral = far_spectral - far_spectral.min() + 1e-8
            near_spectral = near_spectral / near_spectral.sum()
            far_spectral = far_spectral / far_spectral.sum()
            spec_sim = cosine_similarity([near_spectral], [far_spectral])[0][0]
            spectral_scores.append(max(0, min(1, spec_sim)))
        except Exception:
            spectral_scores.append(0.5)
    results["spectral_similarity"] = spectral_scores

    # 2. High-Frequency Preservation Score (0-1): no loss scores 1.0, a 40 dB
    #    loss (or more) scores 0.0; a far-field gain is not penalised.
    hf_scores = []
    for near, far in pairs:
        try:
            hf_diff = near["high_freq_energy"] - far["high_freq_energy"]
            hf_scores.append(max(0, min(1, 1.0 - (max(0, hf_diff) / 40.0))))
        except Exception:
            hf_scores.append(0.5)
    results["high_freq_preservation"] = hf_scores

    # 3. MFCC Structural Similarity (0-1): cosine similarity of z-scored MFCC
    #    vectors, mapped from [-1, 1] to [0, 1].
    mfcc_scores = []
    for near, far in pairs:
        try:
            near_mfcc = np.array([near[key] for key in mfcc_keys])
            far_mfcc = np.array([far[key] for key in mfcc_keys])
            near_mfcc_norm = (near_mfcc - near_mfcc.mean()) / (near_mfcc.std() + 1e-8)
            far_mfcc_norm = (far_mfcc - far_mfcc.mean()) / (far_mfcc.std() + 1e-8)
            mfcc_sim = cosine_similarity([near_mfcc_norm], [far_mfcc_norm])[0][0]
            mfcc_scores.append(max(0, min(1, (mfcc_sim + 1) / 2)))
        except Exception:
            mfcc_scores.append(0.5)
    results["mfcc_similarity"] = mfcc_scores

    # 4. Temporal Consistency Score: smaller RMS divided by larger RMS.
    temporal_scores = []
    for near, far in pairs:
        try:
            near_rms = near["rms"]
            far_rms = far["rms"]
            rms_ratio = min(near_rms, far_rms) / (max(near_rms, far_rms) + 1e-8)
            temporal_scores.append(float(rms_ratio))
        except Exception:
            temporal_scores.append(0.5)
    results["temporal_consistency"] = temporal_scores

    # 5. Spectral Centroid Stability: smaller centroid divided by larger one.
    centroid_scores = []
    for near, far in pairs:
        try:
            near_sc = near["spectral_centroid"]
            far_sc = far["spectral_centroid"]
            sc_ratio = min(near_sc, far_sc) / (max(near_sc, far_sc) + 1e-8)
            centroid_scores.append(float(sc_ratio))
        except Exception:
            centroid_scores.append(0.5)
    results["spectral_centroid_stability"] = centroid_scores

    # 6. Overall Audio Quality Score: weighted sum of the five metrics above.
    #    The weights are loop-invariant, so they are defined once.
    weights = {
        'spectral_similarity': 0.25,
        'high_freq_preservation': 0.30,
        'mfcc_similarity': 0.20,
        'temporal_consistency': 0.15,
        'spectral_centroid_stability': 0.10,
    }
    quality_scores = []
    for i in range(min_len):
        total_score = 0
        for metric, weight in weights.items():
            total_score += results[metric][i] * weight
        quality_scores.append(max(0, min(1, total_score)))
    results["overall_quality"] = quality_scores

    # 7. Quality Degradation Level: bucket the overall score into labels.
    degradation_levels = []
    for score in quality_scores:
        if score >= 0.8:
            degradation_levels.append("Excellent")
        elif score >= 0.6:
            degradation_levels.append("Good")
        elif score >= 0.4:
            degradation_levels.append("Moderate")
        elif score >= 0.2:
            degradation_levels.append("Poor")
        else:
            degradation_levels.append("Very Poor")
    results["degradation_level"] = degradation_levels

    return pd.DataFrame(results)
# ----------------------------
# Clustering and Visualization
# ----------------------------
def cluster_frames_custom(features_df, cluster_features, algo, n_clusters=5, eps=0.5):
    """Assign a cluster label to every row of a feature table.

    Args:
        features_df: DataFrame with one row per frame.
        cluster_features: Column names to cluster on; must not be empty.
        algo: One of ``"KMeans"``, ``"Agglomerative"`` or ``"DBSCAN"``.
        n_clusters: Cluster count for KMeans/Agglomerative; capped at the
            number of rows.
        eps: Neighbourhood radius passed to DBSCAN.

    Returns:
        A copy of ``features_df`` with an added ``cluster`` column. The input
        frame is never modified, including when it is empty.

    Raises:
        gr.Error: If ``cluster_features`` is empty.
        ValueError: If ``algo`` is not a recognised algorithm name.
    """
    if not cluster_features:
        raise gr.Error("Please select at least one feature for clustering.")

    # Always work on a copy so the empty and non-empty paths treat the
    # caller's frame the same way.
    clustered = features_df.copy()
    if len(clustered) == 0:
        clustered["cluster"] = []
        return clustered

    X = clustered[cluster_features].values
    n_samples = len(X)

    if algo == "KMeans":
        # int() guards against a float arriving from a UI slider.
        model = KMeans(n_clusters=min(int(n_clusters), n_samples), random_state=42, n_init=10)
    elif algo == "Agglomerative":
        model = AgglomerativeClustering(n_clusters=min(int(n_clusters), n_samples))
    elif algo == "DBSCAN":
        model = DBSCAN(eps=eps, min_samples=min(3, n_samples))
    else:
        raise ValueError("Unknown clustering algorithm")

    clustered["cluster"] = model.fit_predict(X)
    return clustered
def plot_spectral_difference(near_feats, far_feats, frame_idx=0):
    """Build a heatmap of the near-minus-far spectrum for one frame.

    Args:
        near_feats: List of feature dicts, each with a 2-D ``spectrum`` array.
        far_feats: List of feature dicts, each with a 2-D ``spectrum`` array.
        frame_idx: Position of the frame to plot in both lists.

    Returns:
        A Plotly figure. If either list is empty or ``frame_idx`` is past the
        end of either list, an empty figure with an explanatory title is
        returned instead.
    """
    frame_available = (
        near_feats
        and far_feats
        and frame_idx < len(near_feats)
        and frame_idx < len(far_feats)
    )
    if not frame_available:
        placeholder = go.Figure()
        placeholder.update_layout(title="No data available for spectral analysis", height=300)
        return placeholder

    spec_near = near_feats[frame_idx]["spectrum"]
    spec_far = far_feats[frame_idx]["spectrum"]

    # Crop both spectra to their common shape before subtracting.
    n_bins = min(spec_near.shape[0], spec_far.shape[0])
    n_cols = min(spec_near.shape[1], spec_far.shape[1])
    delta = spec_near[:n_bins, :n_cols] - spec_far[:n_bins, :n_cols]

    heatmap = go.Heatmap(
        z=delta,
        colorscale='RdBu',
        zmid=0,
        colorbar=dict(title="dB Difference"),
    )
    figure = go.Figure(data=heatmap)
    figure.update_layout(
        title=f"Spectral Difference (Frame {frame_idx}): Near - Far",
        xaxis_title="Time Frames",
        yaxis_title="Frequency Bins",
        height=300,
    )
    return figure
# ----------------------------
# Main Analysis Function
# ----------------------------
def analyze_audio_pair(
    near_file,
    far_file,
    frame_length_ms,
    hop_length_ms,
    window_type,
    cluster_features,
    clustering_algo,
    n_clusters,
    dbscan_eps
):
    """Run the full near-field vs far-field comparison for one pair of files.

    Loads both recordings, resamples the far-field one to the near-field rate
    if they differ, frames both signals, extracts features, scores each frame
    pair, clusters the near-field frames and builds the plots.

    Args:
        near_file: Uploaded near-field audio; either a path string or an
            object exposing the path as ``.name``.
        far_file: Uploaded far-field audio, same form as ``near_file``.
        frame_length_ms: Frame length in milliseconds.
        hop_length_ms: Hop length in milliseconds.
        window_type: Window name passed to ``segment_audio``.
        cluster_features: Feature column names used for clustering.
        clustering_algo: Clustering algorithm name.
        n_clusters: Cluster count for KMeans/Agglomerative.
        dbscan_eps: DBSCAN neighbourhood radius.

    Returns:
        A 6-tuple ``(quality_line_plot, quality_histogram, comparison_df,
        cluster_scatter_plot, clustered_df, spectral_heatmap)``.

    Raises:
        gr.Error: If a file is missing or cannot be loaded.
    """
    if not near_file or not far_file:
        raise gr.Error("Upload both audio files.")

    # The upload value may be a plain path string or a file-like wrapper with
    # a `.name` attribute; accept both.
    near_path = getattr(near_file, "name", near_file)
    far_path = getattr(far_file, "name", far_file)

    try:
        y_near, sr = librosa.load(near_path, sr=None)
        y_far, sr_far = librosa.load(far_path, sr=None)
    except Exception as e:
        raise gr.Error(f"Error loading audio files: {str(e)}") from e

    # The near-field sampling rate is the reference for all later processing.
    if sr != sr_far:
        y_far = librosa.resample(y_far, orig_sr=sr_far, target_sr=sr)

    frames_near, _ = segment_audio(y_near, sr, frame_length_ms, hop_length_ms, window_type)
    frames_far, _ = segment_audio(y_far, sr, frame_length_ms, hop_length_ms, window_type)

    near_feats = extract_features_with_spectrum(frames_near, sr)
    far_feats = extract_features_with_spectrum(frames_far, sr)

    # Frame-wise quality metrics
    comparison_df = calculate_frame_quality_metrics(near_feats, far_feats)

    # Clustering (on near-field); the spectrum arrays are not tabular features.
    near_df = pd.DataFrame(near_feats)
    near_df = near_df.drop(columns=["spectrum"], errors="ignore")
    clustered_df = cluster_frames_custom(near_df, cluster_features, clustering_algo, n_clusters, dbscan_eps)

    has_comparison = len(comparison_df) > 0

    # Quality over time
    if has_comparison:
        plot_comparison = px.line(
            comparison_df,
            x="frame_index",
            y="overall_quality",
            title="Overall Audio Quality Score Over Time (0-1 scale)",
            labels={"overall_quality": "Quality Score", "frame_index": "Frame Index"}
        )
        plot_comparison.update_yaxes(range=[0, 1])
    else:
        plot_comparison = px.line(title="No comparison data available")

    # Quality distribution
    if has_comparison:
        quality_dist_plot = px.histogram(
            comparison_df,
            x="overall_quality",
            title="Distribution of Audio Quality Scores",
            nbins=20,
            labels={"overall_quality": "Quality Score"}
        )
        quality_dist_plot.update_xaxes(range=[0, 1])
    else:
        quality_dist_plot = px.histogram(title="No quality data available")

    # Cluster scatter plot over the first two selected features
    if len(cluster_features) >= 2 and len(clustered_df) > 0:
        x_feat, y_feat = cluster_features[0], cluster_features[1]
        if x_feat in clustered_df.columns and y_feat in clustered_df.columns:
            plot_scatter = px.scatter(
                clustered_df,
                x=x_feat,
                y=y_feat,
                color="cluster",
                title=f"Clustering: {x_feat} vs {y_feat}",
                hover_data=["cluster"]
            )
        else:
            plot_scatter = px.scatter(title="Selected features not available in data")
    else:
        plot_scatter = px.scatter(title="Select ≥2 features for scatter plot")

    # Spectral difference heatmap for the first frame
    spec_heatmap = plot_spectral_difference(near_feats, far_feats, frame_idx=0)

    return (
        plot_comparison,
        quality_dist_plot,
        comparison_df,
        plot_scatter,
        clustered_df,
        spec_heatmap
    )
def export_results(comparison_df, clustered_df):
    """Write both result tables as CSV files into a new temporary directory.

    Args:
        comparison_df: Frame-wise quality table; saved as
            ``frame_quality_scores.csv``.
        clustered_df: Clustered feature table; saved as
            ``clustered_frames.csv``.

    Returns:
        A two-element list with the paths of the written files, quality
        scores first. The temporary directory is not removed by this function.
    """
    out_dir = tempfile.mkdtemp()
    targets = (
        ("frame_quality_scores.csv", comparison_df),
        ("clustered_frames.csv", clustered_df),
    )

    written = []
    for filename, table in targets:
        destination = os.path.join(out_dir, filename)
        table.to_csv(destination, index=False)
        written.append(destination)
    return written
# ----------------------------
# Gradio UI
# ----------------------------
# Feature names offered as clustering choices in the UI: the basic spectral
# descriptors, then mfcc_1..mfcc_13, then the three band energies.
dummy_features = [
    "rms",
    "spectral_centroid",
    "zcr",
    "spectral_rolloff",
    "spectral_bandwidth",
    "spectral_flatness",
    *(f"mfcc_{i}" for i in range(1, 14)),
    "low_freq_energy",
    "mid_freq_energy",
    "high_freq_energy",
]
# Gradio layout: inputs and settings on top, results in tabs below.
# The emoji in the labels were mis-encoded in the source text and are restored
# here so the UI no longer displays garbled characters.
with gr.Blocks(title="Audio Quality Analyzer") as demo:
    gr.Markdown("# 🎙️ Near vs Far Field Audio Quality Analyzer")
    gr.Markdown("**Quantify audio degradation per frame (0-1 scale)** - Compare near-field vs far-field recording quality")

    with gr.Row():
        near_file = gr.File(label="Near-Field Audio (.wav)", file_types=[".wav"])
        far_file = gr.File(label="Far-Field Audio (.wav)", file_types=[".wav"])

    with gr.Accordion("⚙️ Frame Settings", open=True):
        frame_length_ms = gr.Slider(10, 500, value=50, step=1, label="Frame Length (ms)")
        hop_length_ms = gr.Slider(1, 250, value=25, step=1, label="Hop Length (ms)")
        window_type = gr.Dropdown(["hann", "hamming", "rectangular"], value="hann", label="Window Type")

    with gr.Accordion("🧩 Clustering Configuration", open=False):
        cluster_features = gr.CheckboxGroup(
            choices=dummy_features,
            value=["rms", "spectral_centroid", "high_freq_energy"],
            label="Features to Use for Clustering"
        )
        clustering_algo = gr.Radio(
            ["KMeans", "Agglomerative", "DBSCAN"],
            value="KMeans",
            label="Clustering Algorithm"
        )
        n_clusters = gr.Slider(2, 20, value=5, step=1, label="Number of Clusters (for KMeans/Agglomerative)")
        dbscan_eps = gr.Slider(0.1, 2.0, value=0.5, step=0.1, label="DBSCAN eps (neighborhood radius)")

    btn = gr.Button("🚀 Analyze Audio Quality")

    with gr.Tabs():
        with gr.Tab("📊 Quality Analysis"):
            with gr.Row():
                comp_plot = gr.Plot(label="Quality Over Time")
                quality_dist_plot = gr.Plot(label="Quality Distribution")
            comp_table = gr.Dataframe(label="Frame-wise Quality Scores")
        with gr.Tab("🧩 Clustering"):
            cluster_plot = gr.Plot()
            cluster_table = gr.Dataframe()
        with gr.Tab("🔍 Spectral Analysis"):
            spec_heatmap = gr.Plot(label="Spectral Difference (Near - Far)")
        with gr.Tab("📤 Export"):
            gr.Markdown("### Download Analysis Results")
            export_btn = gr.Button("💾 Download CSV Files")
            export_files = gr.Files()

    # The output order must match the tuple returned by analyze_audio_pair.
    btn.click(
        fn=analyze_audio_pair,
        inputs=[
            near_file, far_file,
            frame_length_ms, hop_length_ms, window_type,
            cluster_features,
            clustering_algo,
            n_clusters,
            dbscan_eps
        ],
        outputs=[comp_plot, quality_dist_plot, comp_table, cluster_plot, cluster_table, spec_heatmap]
    )

    # Export reads back whatever is currently shown in the two result tables.
    export_btn.click(
        fn=export_results,
        inputs=[comp_table, cluster_table],
        outputs=export_files
    )

# Launch the app only when the file is run as a script.
if __name__ == "__main__":
    demo.launch()