"""Gradio app that compares a near-field and a far-field recording frame by frame."""

import os
import tempfile

import gradio as gr
import librosa
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scipy.signal import get_window as scipy_get_window
from scipy.spatial.distance import jensenshannon
from scipy.stats import pearsonr
from sklearn.cluster import KMeans, AgglomerativeClustering, DBSCAN
from sklearn.metrics.pairwise import cosine_similarity

# Number of MFCC coefficients stored per frame (keys "mfcc_1" .. "mfcc_13").
_N_MFCC = 13

# Relative weights of the per-frame metrics blended into "combined_quality".
_QUALITY_WEIGHTS = {
    "cosine_similarity": 0.3,
    "high_freq_quality": 0.25,
    "energy_ratio": 0.2,
    "clarity_ratio": 0.15,
    "spectral_overlap": 0.1,
}


# ----------------------------
# Segment Audio into Frames
# ----------------------------
def segment_audio(y, sr, frame_length_ms, hop_length_ms, window_type="hann"):
    """Split a signal into windowed frames.

    Args:
        y: 1-D array of audio samples.
        sr: Sampling rate in Hz.
        frame_length_ms: Frame length in milliseconds.
        hop_length_ms: Hop between consecutive frame starts in milliseconds.
        window_type: Window name understood by ``scipy.signal.get_window``;
            ``"rectangular"`` is mapped to scipy's ``"boxcar"``.

    Returns:
        ``(frames, frame_length)`` where ``frames`` has shape
        ``(frame_length, n_frames)``. If the signal is shorter than one frame,
        a single all-zero frame is returned.
    """
    # Clamp to one sample so a tiny duration never yields a zero-length frame
    # or a zero step for ``range`` (which would raise ValueError).
    frame_length = max(1, int(frame_length_ms * sr / 1000))
    hop_length = max(1, int(hop_length_ms * sr / 1000))

    window_name = "boxcar" if window_type == "rectangular" else window_type
    window = scipy_get_window(window_name, frame_length)

    starts = range(0, len(y) - frame_length + 1, hop_length)
    frames = [y[start:start + frame_length] * window for start in starts]

    if frames:
        frames = np.array(frames).T
    else:
        frames = np.zeros((frame_length, 1))
    return frames, frame_length


# ----------------------------
# Feature Extraction
# ----------------------------
def _empty_features(n_fft):
    """Return an all-zero feature dict with the same key order as a real one."""
    feat = {"rms": 0.0, "spectral_centroid": 0.0, "zcr": 0.0}
    for j in range(_N_MFCC):
        feat[f"mfcc_{j + 1}"] = 0.0
    feat["low_freq_energy"] = 0.0
    feat["mid_freq_energy"] = 0.0
    feat["high_freq_energy"] = 0.0
    feat["spectrum"] = np.zeros((n_fft // 2 + 1, 1))
    return feat


def extract_features_with_spectrum(frames, sr):
    """Compute per-frame features plus a dB magnitude spectrum.

    Args:
        frames: 2-D array of shape ``(frame_length, n_frames)``.
        sr: Sampling rate in Hz.

    Returns:
        A list with one dict per input frame, holding ``rms``,
        ``spectral_centroid``, ``zcr``, ``mfcc_1``..``mfcc_13``,
        ``low_freq_energy`` (<= 2 kHz), ``mid_freq_energy`` (2-4 kHz),
        ``high_freq_energy`` (> 4 kHz) and ``spectrum``. Silent frames and
        failed computations yield zeros. The list is never empty.
    """
    n_fft = min(2048, frames.shape[0])
    features = []

    for i in range(frames.shape[1]):
        frame = frames[:, i]

        if len(frame) < n_fft or np.max(np.abs(frame)) < 1e-10:
            # Keep a placeholder rather than dropping the frame: downstream
            # comparison pairs near/far frames by list position, so skipping
            # a silent frame would shift every later frame out of alignment.
            features.append(_empty_features(n_fft))
            continue

        feat = {}

        # Each feature is best-effort: a failure falls back to 0.0 so that one
        # bad frame does not abort the whole analysis.
        try:
            feat["rms"] = float(np.mean(librosa.feature.rms(y=frame)[0]))
        except Exception:
            feat["rms"] = 0.0

        try:
            feat["spectral_centroid"] = float(
                np.mean(librosa.feature.spectral_centroid(y=frame, sr=sr)[0])
            )
        except Exception:
            feat["spectral_centroid"] = 0.0

        try:
            feat["zcr"] = float(np.mean(librosa.feature.zero_crossing_rate(frame)[0]))
        except Exception:
            feat["zcr"] = 0.0

        try:
            mfccs = librosa.feature.mfcc(y=frame, sr=sr, n_mfcc=_N_MFCC, n_fft=n_fft)
            for j in range(_N_MFCC):
                feat[f"mfcc_{j + 1}"] = float(np.mean(mfccs[j]))
        except Exception:
            for j in range(_N_MFCC):
                feat[f"mfcc_{j + 1}"] = 0.0

        try:
            magnitude = np.abs(librosa.stft(frame, n_fft=n_fft))
            spectrum_db = librosa.amplitude_to_db(magnitude, ref=np.max)
            freqs = librosa.fft_frequencies(sr=sr, n_fft=n_fft)

            bands = {
                "low_freq_energy": freqs <= 2000,
                "mid_freq_energy": (freqs > 2000) & (freqs <= 4000),
                "high_freq_energy": freqs > 4000,
            }
            for key, mask in bands.items():
                feat[key] = float(np.mean(spectrum_db[mask])) if np.any(mask) else 0.0
            feat["spectrum"] = spectrum_db
        except Exception:
            feat["low_freq_energy"] = 0.0
            feat["mid_freq_energy"] = 0.0
            feat["high_freq_energy"] = 0.0
            feat["spectrum"] = np.zeros((n_fft // 2 + 1, 1))

        features.append(feat)

    if not features:
        features.append(_empty_features(n_fft))
    return features


# ----------------------------
# Frame Comparison (core metrics)
# ----------------------------
def _safe_cosine(u, v):
    """Cosine similarity of two flattened arrays; 0.0 if either is all zero
    or their sizes differ."""
    u = np.asarray(u, dtype=float).ravel()
    v = np.asarray(v, dtype=float).ravel()
    if u.shape != v.shape or not u.any() or not v.any():
        return 0.0
    return float(cosine_similarity(u.reshape(1, -1), v.reshape(1, -1))[0][0])


def compare_frames_enhanced(near_feats, far_feats, metrics):
    """Compare near-field and far-field frames pairwise by index.

    Args:
        near_feats: Feature dicts for the near-field recording.
        far_feats: Feature dicts for the far-field recording.
        metrics: Names of optional metrics to include; recognised values are
            ``"Euclidean Distance"``, ``"Cosine Similarity"`` and
            ``"High-Freq Loss Ratio"``.

    Returns:
        A DataFrame with one row per compared frame (the shorter of the two
        inputs). ``energy_ratio``, ``clarity_ratio``, ``spectral_overlap`` and
        ``combined_quality`` are always present; the optional metrics appear
        only when selected. With no frames, only an empty ``frame_index``
        column is returned.
    """
    min_len = min(len(near_feats), len(far_feats))
    if min_len == 0:
        return pd.DataFrame({"frame_index": []})

    metrics = metrics or []
    near_feats = near_feats[:min_len]
    far_feats = far_feats[:min_len]
    results = {"frame_index": list(range(min_len))}

    near_df = pd.DataFrame(near_feats).drop(columns=["spectrum"], errors="ignore")
    far_df = pd.DataFrame(far_feats).drop(columns=["spectrum"], errors="ignore")
    # Align columns by name so the row vectors are comparable element-wise
    # regardless of the key order inside the individual dicts.
    far_df = far_df.reindex(columns=near_df.columns)
    near_vec = near_df.values
    far_vec = far_df.values

    # Euclidean Distance
    if "Euclidean Distance" in metrics:
        results["euclidean_dist"] = np.linalg.norm(near_vec - far_vec, axis=1).tolist()

    # Cosine Similarity
    if "Cosine Similarity" in metrics:
        results["cosine_similarity"] = [
            _safe_cosine(a, b) for a, b in zip(near_vec, far_vec)
        ]

    # High-Freq Loss Ratio (Quality)
    if "High-Freq Loss Ratio" in metrics:
        loss_ratios = []
        for near, far in zip(near_feats, far_feats):
            near_high = near["high_freq_energy"]
            far_high = far["high_freq_energy"]
            ratio = max(0.0, 1.0 - abs(near_high - far_high) / (abs(near_high) + 1e-6))
            loss_ratios.append(float(ratio))
        results["high_freq_quality"] = loss_ratios

    # Energy Ratio: far RMS relative to near RMS, clipped to [0, 1].
    results["energy_ratio"] = [
        float(np.clip((far["rms"] + 1e-6) / (near["rms"] + 1e-6), 0, 1))
        for near, far in zip(near_feats, far_feats)
    ]

    # Clarity Ratio: how well the low-minus-high band gap is preserved.
    clarity_ratio = []
    for near, far in zip(near_feats, far_feats):
        near_gap = near["low_freq_energy"] - near["high_freq_energy"]
        far_gap = far["low_freq_energy"] - far["high_freq_energy"]
        score = 1 - abs(far_gap - near_gap) / (abs(near_gap) + 1e-6)
        clarity_ratio.append(float(np.clip(score, 0, 1)))
    results["clarity_ratio"] = clarity_ratio

    # Spectral Overlap: cosine similarity of the flattened dB spectra.
    results["spectral_overlap"] = [
        _safe_cosine(near["spectrum"], far["spectrum"])
        for near, far in zip(near_feats, far_feats)
    ]

    # Combined Weighted Quality. Normalise by the weights of the metrics that
    # were actually computed, so deselecting an optional metric does not
    # silently cap the achievable score below 1.
    active = {k: w for k, w in _QUALITY_WEIGHTS.items() if k in results}
    total_weight = sum(active.values())
    results["combined_quality"] = [
        float(sum(results[k][i] * w for k, w in active.items()) / total_weight)
        for i in range(min_len)
    ]

    return pd.DataFrame(results)


# ----------------------------
# Clustering + Overlay
# ----------------------------
def cluster_frames_custom(features_df, cluster_features, algo, n_clusters=5, eps=0.5):
    """Cluster frames on the selected feature columns.

    Args:
        features_df: DataFrame with one row per frame.
        cluster_features: Column names to cluster on; must be non-empty.
        algo: ``"KMeans"``, ``"Agglomerative"`` or ``"DBSCAN"``.
        n_clusters: Cluster count for KMeans/Agglomerative, capped at the
            number of rows.
        eps: Neighbourhood radius for DBSCAN.

    Returns:
        A copy of ``features_df`` with an added ``cluster`` label column.

    Raises:
        gr.Error: If ``cluster_features`` is empty.
        ValueError: If ``algo`` is not one of the supported names.
    """
    if not cluster_features:
        raise gr.Error("Please select at least one feature for clustering.")

    # Always work on a copy so the caller's DataFrame is never mutated.
    features_df = features_df.copy()
    if len(features_df) == 0:
        features_df["cluster"] = []
        return features_df

    X = features_df[cluster_features].values

    if algo == "KMeans":
        k = int(min(n_clusters, len(X)))
        model = KMeans(n_clusters=k, random_state=42, n_init=10)
    elif algo == "Agglomerative":
        k = int(min(n_clusters, len(X)))
        model = AgglomerativeClustering(n_clusters=k)
    elif algo == "DBSCAN":
        model = DBSCAN(eps=eps, min_samples=min(3, len(X)))
    else:
        raise ValueError("Unknown clustering algorithm")

    features_df["cluster"] = model.fit_predict(X)
    return features_df


def plot_spectral_difference(near_feats, far_feats, frame_idx=0):
    """Return a heatmap of near-minus-far dB spectrum for one frame.

    An empty figure titled "No data available" is returned when either
    feature list is empty or ``frame_idx`` is past the end of either list.
    """
    has_frame = (
        near_feats
        and far_feats
        and frame_idx < len(near_feats)
        and frame_idx < len(far_feats)
    )
    if not has_frame:
        fig = go.Figure()
        fig.update_layout(title="No data available")
        return fig

    near_spec = near_feats[frame_idx]["spectrum"]
    far_spec = far_feats[frame_idx]["spectrum"]
    # Compare only the frequency bins present in both spectra.
    min_freq_bins = min(near_spec.shape[0], far_spec.shape[0])
    diff = near_spec[:min_freq_bins] - far_spec[:min_freq_bins]

    fig = go.Figure(data=go.Heatmap(z=diff, colorscale='RdBu', zmid=0))
    fig.update_layout(title=f"Spectral Difference (Frame {frame_idx})", height=300)
    return fig


def plot_cluster_overlay(df, cluster_metric, overlay_metric):
    """Scatter ``cluster_metric`` against ``overlay_metric``, coloured by the latter.

    Returns an empty figure titled "Metrics not found" if either column is
    missing from ``df``.
    """
    if cluster_metric not in df.columns or overlay_metric not in df.columns:
        fig = go.Figure()
        fig.update_layout(title="Metrics not found")
        return fig

    fig = px.scatter(
        df,
        x=cluster_metric,
        y=overlay_metric,
        color=overlay_metric,
        color_continuous_scale='Viridis',
        title=f"Cluster Overlay: {cluster_metric} vs {overlay_metric}",
    )
    fig.update_layout(height=400)
    return fig


# ----------------------------
# Main Analysis Function
# ----------------------------
def _upload_path(upload):
    """Return the filesystem path of an uploaded file.

    Accepts either an object exposing ``.name`` or a plain path string.
    """
    return getattr(upload, "name", upload)


def analyze_audio_pair(
    near_file, far_file,
    frame_length_ms, hop_length_ms, window_type,
    comparison_metrics, cluster_features,
    clustering_algo, n_clusters, dbscan_eps
):
    """Run the full near/far analysis for the Gradio "Analyze" button.

    Loads both recordings (resampling the far-field one to the near-field
    rate if they differ), frames them, extracts features, compares frames,
    clusters the near-field frames and builds the plots.

    Returns:
        ``(comparison_plot, comparison_df, cluster_scatter, clustered_df,
        spectral_heatmap, overlay_fig)``.

    Raises:
        gr.Error: If a file is missing, cannot be loaded, or no clustering
            feature is selected.
    """
    if not near_file or not far_file:
        raise gr.Error("Upload both audio files.")

    try:
        y_near, sr_near = librosa.load(_upload_path(near_file), sr=None)
        y_far, sr_far = librosa.load(_upload_path(far_file), sr=None)
    except Exception as e:
        raise gr.Error(f"Error loading audio: {str(e)}") from e

    # The near-field sampling rate is the reference for both signals.
    sr = sr_near
    if sr_far != sr_near:
        y_far = librosa.resample(y_far, orig_sr=sr_far, target_sr=sr_near)

    frames_near, _ = segment_audio(y_near, sr, frame_length_ms, hop_length_ms, window_type)
    frames_far, _ = segment_audio(y_far, sr, frame_length_ms, hop_length_ms, window_type)

    near_feats = extract_features_with_spectrum(frames_near, sr)
    far_feats = extract_features_with_spectrum(frames_far, sr)

    comparison_df = compare_frames_enhanced(near_feats, far_feats, comparison_metrics)

    near_df = pd.DataFrame(near_feats).drop(columns=["spectrum"], errors="ignore")
    clustered_df = cluster_frames_custom(
        near_df, cluster_features, clustering_algo, n_clusters, dbscan_eps
    )

    # Plots
    metric_cols = [col for col in comparison_df.columns if col != "frame_index"]
    if metric_cols:
        first_metric = metric_cols[0]
        plot_comparison = px.line(
            comparison_df,
            x="frame_index",
            y=first_metric,
            title=f"{first_metric.replace('_',' ').title()} Over Time",
        )
    else:
        plot_comparison = px.line()

    if len(cluster_features) >= 2 and len(clustered_df) > 0:
        x_feat, y_feat = cluster_features[0], cluster_features[1]
        plot_scatter = px.scatter(
            clustered_df, x=x_feat, y=y_feat, color="cluster",
            title=f"Clustering: {x_feat} vs {y_feat}",
        )
    else:
        plot_scatter = px.scatter(title="Select ≥2 features for clustering")

    spec_heatmap = plot_spectral_difference(near_feats, far_feats, frame_idx=0)

    # The clustered table holds near-field features only, so the quality score
    # has to be joined in from the comparison table (row i of both tables is
    # frame i) before the overlay can be drawn.
    overlay_df = clustered_df.iloc[:len(comparison_df)].copy()
    if "combined_quality" in comparison_df.columns:
        quality = comparison_df["combined_quality"].to_numpy()
        overlay_df["combined_quality"] = quality[:len(overlay_df)]
    overlay_fig = plot_cluster_overlay(overlay_df, cluster_features[0], "combined_quality")

    return plot_comparison, comparison_df, plot_scatter, clustered_df, spec_heatmap, overlay_fig


def export_results(comparison_df, clustered_df):
    """Write both result tables to CSV files in a fresh temporary directory.

    Returns:
        ``[comparison_csv_path, clustered_csv_path]``.
    """
    temp_dir = tempfile.mkdtemp()
    comp_path = os.path.join(temp_dir, "frame_comparisons.csv")
    cluster_path = os.path.join(temp_dir, "clustered_frames.csv")
    comparison_df.to_csv(comp_path, index=False)
    clustered_df.to_csv(cluster_path, index=False)
    return [comp_path, cluster_path]


# ----------------------------
# Gradio UI
# ----------------------------
# Feature names offered for clustering; these match the scalar keys produced
# by extract_features_with_spectrum.
dummy_features = ["rms", "spectral_centroid", "zcr"] + [f"mfcc_{i}" for i in range(1, _N_MFCC + 1)] + \
                 ["low_freq_energy", "mid_freq_energy", "high_freq_energy"]

with gr.Blocks(title="Advanced Near vs Far Field Analyzer") as demo:
    gr.Markdown("# 🎙️ Advanced Near vs Far Field Speech Analyzer")

    with gr.Row():
        near_file = gr.File(label="Near-Field Audio (.wav)", file_types=[".wav"])
        far_file = gr.File(label="Far-Field Audio (.wav)", file_types=[".wav"])

    with gr.Accordion("⚙️ Frame Settings", open=True):
        frame_length_ms = gr.Slider(10, 500, value=50, step=1, label="Frame Length (ms)")
        hop_length_ms = gr.Slider(1, 250, value=25, step=1, label="Hop Length (ms)")
        window_type = gr.Dropdown(["hann", "hamming", "rectangular"], value="hann", label="Window Type")

    with gr.Accordion("📊 Comparison Metrics", open=True):
        comparison_metrics = gr.CheckboxGroup(
            choices=[
                "Euclidean Distance",
                "Cosine Similarity",
                "High-Freq Loss Ratio"
            ],
            value=["Cosine Similarity", "High-Freq Loss Ratio"],
            label="Select Metrics"
        )

    with gr.Accordion("🧩 Clustering Configuration", open=True):
        cluster_features = gr.CheckboxGroup(
            choices=dummy_features,
            value=["rms", "spectral_centroid", "high_freq_energy"],
            label="Features for Clustering")
        clustering_algo = gr.Radio(["KMeans", "Agglomerative", "DBSCAN"], value="KMeans", label="Clustering Algorithm")
        n_clusters = gr.Slider(2, 20, value=5, step=1, label="Clusters (for KMeans/Agglomerative)")
        dbscan_eps = gr.Slider(0.1, 2.0, value=0.5, step=0.1, label="DBSCAN eps")

    btn = gr.Button("🚀 Analyze")

    with gr.Tabs():
        with gr.Tab("📈 Frame Comparison"):
            comp_plot = gr.Plot()
            comp_table = gr.Dataframe()
        with gr.Tab("🧩 Clustering"):
            cluster_plot = gr.Plot()
            cluster_table = gr.Dataframe()
        with gr.Tab("🔍 Spectral Analysis"):
            spec_heatmap = gr.Plot(label="Spectral Difference (Near - Far)")
        with gr.Tab("🧭 Metric Overlay"):
            overlay_plot = gr.Plot(label="Metric Overlay")
        with gr.Tab("📤 Export"):
            export_btn = gr.Button("💾 Download CSVs")
            export_files = gr.Files()

    btn.click(
        fn=analyze_audio_pair,
        inputs=[near_file, far_file, frame_length_ms, hop_length_ms, window_type,
                comparison_metrics, cluster_features, clustering_algo, n_clusters, dbscan_eps],
        outputs=[comp_plot, comp_table, cluster_plot, cluster_table, spec_heatmap, overlay_plot],
    )

    export_btn.click(fn=export_results, inputs=[comp_table, cluster_table], outputs=export_files)

if __name__ == "__main__":
    demo.launch()