MSD / app1.py
AdityaK007's picture
Create app1.py
5b17c9f verified
import gradio as gr
import librosa
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans, AgglomerativeClustering, DBSCAN
from sklearn.metrics.pairwise import cosine_similarity, euclidean_distances
from scipy.spatial.distance import jensenshannon
from scipy.stats import pearsonr
from scipy.signal import get_window as scipy_get_window
import plotly.express as px
import plotly.graph_objects as go
import os
import tempfile
# ----------------------------
# Fixed: Added missing segment_audio function
# ----------------------------
def segment_audio(y, sr, frame_length_ms, hop_length_ms, window_type="hann"):
"""Segment audio into frames with specified windowing"""
frame_length = int(frame_length_ms * sr / 1000)
hop_length = int(hop_length_ms * sr / 1000)
# Get window function
if window_type == "rectangular":
window = scipy_get_window('boxcar', frame_length)
else:
window = scipy_get_window(window_type, frame_length)
frames = []
for i in range(0, len(y) - frame_length + 1, hop_length):
frame = y[i:i + frame_length] * window
frames.append(frame)
# Convert to 2D array (frames x samples)
if frames:
frames = np.array(frames).T
else:
# If audio is too short, create at least one frame with zero-padding
frames = np.zeros((frame_length, 1))
return frames, frame_length
# ----------------------------
# Enhanced Feature Extraction (with spectral bins)
# ----------------------------
def extract_features_with_spectrum(frames, sr):
features = []
n_mfcc = 13
n_fft = min(2048, frames.shape[0]) # Fixed: Ensure n_fft <= frame length
for i in range(frames.shape[1]):
frame = frames[:, i]
# Skip if frame is too short or silent
if len(frame) < n_fft or np.max(np.abs(frame)) < 1e-10:
continue
feat = {}
# Basic features with error handling
try:
rms = np.mean(librosa.feature.rms(y=frame)[0])
feat["rms"] = float(rms)
except:
feat["rms"] = 0.0
try:
sc = np.mean(librosa.feature.spectral_centroid(y=frame, sr=sr)[0])
feat["spectral_centroid"] = float(sc)
except:
feat["spectral_centroid"] = 0.0
try:
zcr = np.mean(librosa.feature.zero_crossing_rate(frame)[0])
feat["zcr"] = float(zcr)
except:
feat["zcr"] = 0.0
try:
mfccs = librosa.feature.mfcc(y=frame, sr=sr, n_mfcc=n_mfcc, n_fft=n_fft)
for j in range(n_mfcc):
feat[f"mfcc_{j+1}"] = float(np.mean(mfccs[j]))
except:
for j in range(n_mfcc):
feat[f"mfcc_{j+1}"] = 0.0
# Spectral bins for lost frequencies
try:
S = np.abs(librosa.stft(frame, n_fft=n_fft))
S_db = librosa.amplitude_to_db(S, ref=np.max)
freqs = librosa.fft_frequencies(sr=sr, n_fft=n_fft)
# Split spectrum: low (<2kHz), mid (2-4kHz), high (>4kHz)
low_mask = freqs <= 2000
mid_mask = (freqs > 2000) & (freqs <= 4000)
high_mask = freqs > 4000
feat["low_freq_energy"] = float(np.mean(S_db[low_mask])) if np.any(low_mask) else 0.0
feat["mid_freq_energy"] = float(np.mean(S_db[mid_mask])) if np.any(mid_mask) else 0.0
feat["high_freq_energy"] = float(np.mean(S_db[high_mask])) if np.any(high_mask) else 0.0
# Store full spectrum for later (optional)
feat["spectrum"] = S_db # will be used for heatmap
except:
feat["low_freq_energy"] = 0.0
feat["mid_freq_energy"] = 0.0
feat["high_freq_energy"] = 0.0
feat["spectrum"] = np.zeros((n_fft // 2 + 1, 1))
features.append(feat)
# Handle case where no features were extracted
if not features:
# Create one dummy feature set to avoid errors
feat = {
"rms": 0.0, "spectral_centroid": 0.0, "zcr": 0.0,
"low_freq_energy": 0.0, "mid_freq_energy": 0.0, "high_freq_energy": 0.0,
"spectrum": np.zeros((n_fft // 2 + 1, 1))
}
for j in range(n_mfcc):
feat[f"mfcc_{j+1}"] = 0.0
features.append(feat)
return features
def compare_frames_enhanced(near_feats, far_feats, metrics):
min_len = min(len(near_feats), len(far_feats))
if min_len == 0:
return pd.DataFrame({"frame_index": []})
results = {"frame_index": list(range(min_len))}
# Prepare vectors
near_df = pd.DataFrame([f for f in near_feats[:min_len]])
far_df = pd.DataFrame([f for f in far_feats[:min_len]])
# Remove non-numeric columns
near_vec = near_df.drop(columns=["spectrum"], errors="ignore").values
far_vec = far_df.drop(columns=["spectrum"], errors="ignore").values
# 1. Euclidean Distance
if "Euclidean Distance" in metrics:
results["euclidean_dist"] = np.linalg.norm(near_vec - far_vec, axis=1).tolist()
# 2. Cosine Similarity
if "Cosine Similarity" in metrics:
cos_vals = []
for i in range(min_len):
a, b = near_vec[i].reshape(1, -1), far_vec[i].reshape(1, -1)
# Handle zero vectors
if np.all(a == 0) and np.all(b == 0):
cos_vals.append(1.0)
elif np.all(a == 0) or np.all(b == 0):
cos_vals.append(0.0)
else:
cos_vals.append(float(cosine_similarity(a, b)[0][0]))
results["cosine_similarity"] = cos_vals
# 3. Pearson Correlation
if "Pearson Correlation" in metrics:
corr_vals = []
for i in range(min_len):
try:
corr, _ = pearsonr(near_vec[i], far_vec[i])
corr_vals.append(float(corr) if not np.isnan(corr) else 0.0)
except:
corr_vals.append(0.0)
results["pearson_corr"] = corr_vals
# 4. KL Divergence (on normalized features)
if "KL Divergence" in metrics:
kl_vals = []
for i in range(min_len):
try:
p = near_vec[i] - near_vec[i].min() + 1e-8
q = far_vec[i] - far_vec[i].min() + 1e-8
p /= p.sum()
q /= q.sum()
kl = np.sum(p * np.log(p / q))
kl_vals.append(float(kl))
except:
kl_vals.append(0.0)
results["kl_divergence"] = kl_vals
# 5. Jensen-Shannon Divergence (symmetric, safer)
if "Jensen-Shannon Divergence" in metrics:
js_vals = []
for i in range(min_len):
try:
p = near_vec[i] - near_vec[i].min() + 1e-8
q = far_vec[i] - far_vec[i].min() + 1e-8
p /= p.sum()
q /= q.sum()
js = jensenshannon(p, q)
js_vals.append(float(js))
except:
js_vals.append(0.0)
results["js_divergence"] = js_vals
# 6. Lost High Frequencies Ratio
if "High-Freq Loss Ratio" in metrics:
loss_ratios = []
for i in range(min_len):
try:
near_high = near_feats[i]["high_freq_energy"]
far_high = far_feats[i]["high_freq_energy"]
# Ratio: how much high-freq energy is lost (positive = loss)
ratio = near_high - far_high # in dB
loss_ratios.append(float(ratio))
except:
loss_ratios.append(0.0)
results["high_freq_loss_db"] = loss_ratios
# 7. Spectral Centroid Shift
if "Spectral Centroid Shift" in metrics:
shifts = []
for i in range(min_len):
try:
shift = near_feats[i]["spectral_centroid"] - far_feats[i]["spectral_centroid"]
shifts.append(float(shift))
except:
shifts.append(0.0)
results["centroid_shift"] = shifts
return pd.DataFrame(results)
def cluster_frames_custom(features_df, cluster_features, algo, n_clusters=5, eps=0.5):
if not cluster_features:
raise gr.Error("Please select at least one feature for clustering.")
if len(features_df) == 0:
features_df["cluster"] = []
return features_df
X = features_df[cluster_features].values
if algo == "KMeans":
n_clusters = min(n_clusters, len(X)) # Fixed: Cannot have more clusters than samples
model = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
labels = model.fit_predict(X)
elif algo == "Agglomerative":
n_clusters = min(n_clusters, len(X))
model = AgglomerativeClustering(n_clusters=n_clusters)
labels = model.fit_predict(X)
elif algo == "DBSCAN":
# Fixed: DBSCAN doesn't use n_clusters parameter
model = DBSCAN(eps=eps, min_samples=min(3, len(X)))
labels = model.fit_predict(X)
else:
raise ValueError("Unknown clustering algorithm")
features_df = features_df.copy()
features_df["cluster"] = labels
return features_df
def plot_spectral_difference(near_feats, far_feats, frame_idx=0):
if not near_feats or not far_feats or frame_idx >= len(near_feats) or frame_idx >= len(far_feats):
# Return empty plot
fig = go.Figure()
fig.update_layout(title="No data available for spectral analysis", height=300)
return fig
near_spec = near_feats[frame_idx]["spectrum"]
far_spec = far_feats[frame_idx]["spectrum"]
# Ensure both spectrograms have the same shape
min_freq_bins = min(near_spec.shape[0], far_spec.shape[0])
near_spec = near_spec[:min_freq_bins]
far_spec = far_spec[:min_freq_bins]
diff = near_spec - far_spec # positive = energy lost in far-field
fig = go.Figure(data=go.Heatmap(
z=diff, # Fixed: Removed extra list brackets
colorscale='RdBu',
zmid=0,
colorbar=dict(title="dB Difference")
))
fig.update_layout(
title=f"Spectral Difference (Frame {frame_idx}): Near - Far",
xaxis_title="Time Frames",
yaxis_title="Frequency Bins",
height=300
)
return fig
# ----------------------------
# Main Analysis Function
# ----------------------------
def analyze_audio_pair(
near_file,
far_file,
frame_length_ms,
hop_length_ms,
window_type,
comparison_metrics,
cluster_features,
clustering_algo,
n_clusters,
dbscan_eps
):
if not near_file or not far_file:
raise gr.Error("Upload both audio files.")
try:
# Fixed: Use librosa.load instead of non-existent librosa.load_audio
y_near, sr_near = librosa.load(near_file.name, sr=None)
y_far, sr_far = librosa.load(far_file.name, sr=None)
except Exception as e:
raise gr.Error(f"Error loading audio files: {str(e)}")
if sr_near != sr_far:
y_far = librosa.resample(y_far, orig_sr=sr_far, target_sr=sr_near)
sr = sr_near
else:
sr = sr_near
frames_near, frame_length = segment_audio(y_near, sr, frame_length_ms, hop_length_ms, window_type)
frames_far, _ = segment_audio(y_far, sr, frame_length_ms, hop_length_ms, window_type)
near_feats = extract_features_with_spectrum(frames_near, sr)
far_feats = extract_features_with_spectrum(frames_far, sr)
# Comparison
comparison_df = compare_frames_enhanced(near_feats, far_feats, comparison_metrics)
# Clustering (on near-field)
near_df = pd.DataFrame(near_feats)
near_df = near_df.drop(columns=["spectrum"], errors="ignore")
clustered_df = cluster_frames_custom(near_df, cluster_features, clustering_algo, n_clusters, dbscan_eps)
# Plots
plot_comparison = None
if comparison_df.shape[1] > 1 and len(comparison_df) > 0:
metric_cols = [col for col in comparison_df.columns if col != "frame_index"]
if metric_cols:
metric_to_plot = metric_cols[0]
plot_comparison = px.line(
comparison_df,
x="frame_index",
y=metric_to_plot,
title=f"{metric_to_plot.replace('_', ' ').title()} Over Time"
)
else:
plot_comparison = px.line(title="No comparison metrics available")
else:
plot_comparison = px.line(title="No comparison data available")
# Scatter: user-selected features
plot_scatter = None
if len(cluster_features) >= 2 and len(clustered_df) > 0:
x_feat, y_feat = cluster_features[0], cluster_features[1]
if x_feat in clustered_df.columns and y_feat in clustered_df.columns:
plot_scatter = px.scatter(
clustered_df,
x=x_feat,
y=y_feat,
color="cluster",
title=f"Clustering: {x_feat} vs {y_feat}",
hover_data=["cluster"]
)
else:
plot_scatter = px.scatter(title="Selected features not available in data")
else:
plot_scatter = px.scatter(title="Select β‰₯2 features for scatter plot")
# Spectral difference heatmap (first frame)
spec_heatmap = plot_spectral_difference(near_feats, far_feats, frame_idx=0)
return (
plot_comparison,
comparison_df,
plot_scatter,
clustered_df,
spec_heatmap
)
def export_results(comparison_df, clustered_df):
temp_dir = tempfile.mkdtemp()
comp_path = os.path.join(temp_dir, "frame_comparisons.csv")
cluster_path = os.path.join(temp_dir, "clustered_frames.csv")
comparison_df.to_csv(comp_path, index=False)
clustered_df.to_csv(cluster_path, index=False)
return [comp_path, cluster_path]
# ----------------------------
# Gradio UI
# ----------------------------
# Get feature names dynamically
dummy_features = ["rms", "spectral_centroid", "zcr"] + [f"mfcc_{i}" for i in range(1,14)] + \
["low_freq_energy", "mid_freq_energy", "high_freq_energy"]
with gr.Blocks(title="Advanced Near vs Far Field Analyzer") as demo:
gr.Markdown("# πŸŽ™οΈ Advanced Near vs Far Field Speech Analyzer")
gr.Markdown("Upload simultaneous recordings. Analyze **lost frequencies**, **frame degradation**, and **cluster by custom attributes**.")
with gr.Row():
near_file = gr.File(label="Near-Field Audio (.wav)", file_types=[".wav"])
far_file = gr.File(label="Far-Field Audio (.wav)", file_types=[".wav"])
with gr.Accordion("βš™οΈ Frame Settings", open=True):
frame_length_ms = gr.Slider(10, 500, value=50, step=1, label="Frame Length (ms)")
hop_length_ms = gr.Slider(1, 250, value=25, step=1, label="Hop Length (ms)")
window_type = gr.Dropdown(["hann", "hamming", "rectangular"], value="hann", label="Window Type")
with gr.Accordion("πŸ“Š Comparison Metrics", open=True):
comparison_metrics = gr.CheckboxGroup(
choices=[
"Euclidean Distance",
"Cosine Similarity",
"Pearson Correlation",
"KL Divergence",
"Jensen-Shannon Divergence",
"High-Freq Loss Ratio",
"Spectral Centroid Shift"
],
value=["High-Freq Loss Ratio", "Cosine Similarity"],
label="Select Comparison Metrics"
)
with gr.Accordion("🧩 Clustering Configuration", open=True):
cluster_features = gr.CheckboxGroup(
choices=dummy_features,
value=["rms", "spectral_centroid", "high_freq_energy"],
label="Features to Use for Clustering"
)
clustering_algo = gr.Radio(
["KMeans", "Agglomerative", "DBSCAN"],
value="KMeans",
label="Clustering Algorithm"
)
n_clusters = gr.Slider(2, 20, value=5, step=1, label="Number of Clusters (for KMeans/Agglomerative)")
dbscan_eps = gr.Slider(0.1, 2.0, value=0.5, step=0.1, label="DBSCAN eps (neighborhood radius)")
btn = gr.Button("πŸš€ Analyze")
with gr.Tabs():
with gr.Tab("πŸ“ˆ Frame Comparison"):
comp_plot = gr.Plot()
comp_table = gr.Dataframe()
with gr.Tab("🧩 Clustering"):
cluster_plot = gr.Plot()
cluster_table = gr.Dataframe()
with gr.Tab("πŸ” Spectral Analysis"):
spec_heatmap = gr.Plot(label="Spectral Difference (Near - Far)")
with gr.Tab("πŸ“€ Export"):
export_btn = gr.Button("πŸ’Ύ Download CSVs")
export_files = gr.Files()
btn.click(
fn=analyze_audio_pair,
inputs=[
near_file, far_file,
frame_length_ms, hop_length_ms, window_type,
comparison_metrics,
cluster_features,
clustering_algo,
n_clusters,
dbscan_eps
],
outputs=[comp_plot, comp_table, cluster_plot, cluster_table, spec_heatmap]
)
export_btn.click(
fn=export_results,
inputs=[comp_table, cluster_table],
outputs=export_files
)
if __name__ == "__main__":
demo.launch()