import os
import tempfile

import gradio as gr
import librosa
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scipy import signal
from scipy.signal import get_window as scipy_get_window
from scipy.spatial.distance import jensenshannon
from sklearn.cluster import KMeans, AgglomerativeClustering, DBSCAN
from sklearn.metrics.pairwise import cosine_similarity


# ----------------------------
# 1. Signal Alignment & Preprocessing (NEW)
# ----------------------------
def align_signals(ref, target):
    """
    Align the target signal (Far Field) to the reference signal (Near Field).

    The delay is taken as the lag at which the cross-correlation of the two
    peak-normalised signals is largest. The leading samples of whichever
    signal starts "early" are dropped and both results are truncated to a
    common length.

    Args:
        ref: 1-D array of reference (near-field) samples.
        target: 1-D array of target (far-field) samples.

    Returns:
        Tuple ``(aligned_ref, aligned_target)`` of equal length. Both are
        slices of the *original* (un-normalised) inputs.
    """
    # Nothing to correlate: normalisation and argmax both fail on zero-size
    # arrays, so return two empty slices instead of raising.
    if len(ref) == 0 or len(target) == 0:
        return ref[:0], target[:0]

    # Normalize both to prevent amplitude from skewing correlation
    ref_norm = librosa.util.normalize(ref)
    target_norm = librosa.util.normalize(target)

    # Convolving with the time-reversed reference is the cross-correlation of
    # target with ref; the FFT-based form is used for speed on longer audio.
    correlation = signal.fftconvolve(target_norm, ref_norm[::-1], mode='full')
    lags = signal.correlation_lags(len(target_norm), len(ref_norm), mode='full')
    lag = lags[np.argmax(correlation)]

    print(f"Calculated Lag: {lag} samples")

    if lag > 0:
        # Positive lag: target[n + lag] lines up with ref[n], i.e. the target
        # is delayed relative to the reference. Drop the target's first
        # `lag` samples.
        aligned_target = target[lag:]
        aligned_ref = ref
    else:
        # Zero or negative lag: the reference is the delayed one (or they are
        # already aligned). Drop the reference's first |lag| samples.
        aligned_target = target
        aligned_ref = ref[abs(lag):]

    # Truncate to same length
    min_len = min(len(aligned_ref), len(aligned_target))
    return aligned_ref[:min_len], aligned_target[:min_len]


# ----------------------------
# 2.
# Segment Audio into Frames
# ----------------------------
def segment_audio(y, sr, frame_length_ms, hop_length_ms, window_type="hann"):
    """
    Cut a signal into overlapping, windowed frames.

    Only complete frames are produced: a trailing remainder shorter than one
    frame is dropped (the signal is not padded).

    Args:
        y: 1-D NumPy array of samples.
        sr: Sample rate in Hz.
        frame_length_ms: Frame length in milliseconds.
        hop_length_ms: Hop between frame starts in milliseconds.
        window_type: Window name passed to ``scipy.signal.get_window``;
            ``"rectangular"`` is mapped to SciPy's ``"boxcar"``.

    Returns:
        Tuple ``(frames, frame_length)`` where ``frames`` has shape
        ``(frame_length, n_frames)``. If the signal is shorter than one
        frame, a single all-zero frame is returned.
    """
    # Clamp to at least one sample: a hop of 0 would make range() raise, and a
    # frame of 0 samples would produce unusable empty frames.
    frame_length = max(1, int(frame_length_ms * sr / 1000))
    hop_length = max(1, int(hop_length_ms * sr / 1000))

    window_name = "boxcar" if window_type == "rectangular" else window_type
    window = scipy_get_window(window_name, frame_length)

    starts = range(0, len(y) - frame_length + 1, hop_length)
    frames = [y[start:start + frame_length] * window for start in starts]

    if frames:
        return np.array(frames).T, frame_length
    return np.zeros((frame_length, 1)), frame_length


# ----------------------------
# 3. Feature Extraction
# ----------------------------
def extract_features_with_spectrum(frames, sr):
    """
    Compute per-frame scalar features plus a dB spectrum.

    Args:
        frames: 2-D array of shape ``(frame_length, n_frames)``.
        sr: Sample rate in Hz.

    Returns:
        List with one dict per frame. Every dict has the same keys in the
        same order: ``rms``, ``zcr``, ``spectral_centroid``,
        ``spectral_flatness``, ``mfcc_1`` .. ``mfcc_13``,
        ``low_freq_energy``, ``mid_freq_energy``, ``high_freq_energy`` and
        ``spectrum`` (2-D array). Silent frames get 0.0 for every scalar and
        an all-zero one-column spectrum.
    """
    n_mfcc = 13
    n_fft = min(2048, frames.shape[0])
    n_bins = n_fft // 2 + 1
    mfcc_keys = [f"mfcc_{j + 1}" for j in range(n_mfcc)]
    band_keys = ["low_freq_energy", "mid_freq_energy", "high_freq_energy"]

    features = []
    for i in range(frames.shape[1]):
        frame = frames[:, i]

        # Skip empty/silent frames to prevent NaN
        if len(frame) < n_fft or np.max(np.abs(frame)) < 1e-10:
            # Same key order as the regular path so that DataFrames built
            # from these dicts always get identical column order.
            feat = {"rms": 0.0, "zcr": 0.0,
                    "spectral_centroid": 0.0, "spectral_flatness": 0.0}
            feat.update(dict.fromkeys(mfcc_keys, 0.0))
            feat.update(dict.fromkeys(band_keys, 0.0))
            feat["spectrum"] = np.zeros((n_bins, 1))
            features.append(feat)
            continue

        feat = {}

        # Basic
        feat["rms"] = float(np.mean(librosa.feature.rms(y=frame)[0]))
        feat["zcr"] = float(np.mean(librosa.feature.zero_crossing_rate(frame)[0]))

        # Spectral. Failures fall back to a neutral value (best effort), but
        # only Exception is caught so KeyboardInterrupt/SystemExit propagate.
        try:
            feat["spectral_centroid"] = float(
                np.mean(librosa.feature.spectral_centroid(y=frame, sr=sr)[0]))
        except Exception:
            feat["spectral_centroid"] = 0.0

        # Reverb Metric (NEW)
        try:
            feat["spectral_flatness"] = float(
                np.mean(librosa.feature.spectral_flatness(y=frame)[0]))
        except Exception:
            feat["spectral_flatness"] = 0.0

        # MFCC
        try:
            mfccs = librosa.feature.mfcc(y=frame, sr=sr, n_mfcc=n_mfcc, n_fft=n_fft)
            for j, key in enumerate(mfcc_keys):
                feat[key] = float(np.mean(mfccs[j]))
        except Exception:
            for key in mfcc_keys:
                feat[key] = 0.0

        # Frequency Bands (mean dB relative to the frame's spectral peak)
        try:
            S = np.abs(librosa.stft(frame, n_fft=n_fft))
            S_db = librosa.amplitude_to_db(S, ref=np.max)
            freqs = librosa.fft_frequencies(sr=sr, n_fft=n_fft)

            low_mask = freqs <= 2000
            mid_mask = (freqs > 2000) & (freqs <= 4000)
            high_mask = freqs > 4000

            feat["low_freq_energy"] = float(np.mean(S_db[low_mask])) if np.any(low_mask) else -80.0
            feat["mid_freq_energy"] = float(np.mean(S_db[mid_mask])) if np.any(mid_mask) else -80.0
            feat["high_freq_energy"] = float(np.mean(S_db[high_mask])) if np.any(high_mask) else -80.0
            feat["spectrum"] = S_db
        except Exception:
            for key in band_keys:
                feat[key] = -80.0
            feat["spectrum"] = np.zeros((n_bins, 1))

        features.append(feat)

    return features


# ----------------------------
# 4. Frame Comparison Logic
# ----------------------------
def _scalar_feature_frame(feats):
    """Build a DataFrame of the scalar features only (the 'spectrum' entry is left out)."""
    return pd.DataFrame(
        [{key: value for key, value in feat.items() if key != "spectrum"} for feat in feats]
    )


def compare_frames_enhanced(near_feats, far_feats, metrics):
    """
    Compare near-field and far-field features frame by frame.

    Args:
        near_feats: List of per-frame feature dicts for the near-field signal.
        far_feats: List of per-frame feature dicts for the far-field signal.
        metrics: Iterable of selected metric names. Recognised values are
            ``"Euclidean Distance"``, ``"Cosine Similarity"`` and
            ``"High-Freq Loss Ratio"``. ``None`` is treated as empty.

    Returns:
        DataFrame with one row per compared frame (the shorter list decides
        the count). Always contains ``frame_index``, ``flatness_increase``,
        ``spectral_overlap`` and ``combined_match_score``; the columns
        ``euclidean_dist``, ``cosine_similarity`` and ``high_freq_loss_db``
        are present only when the matching metric is selected.
    """
    metrics = metrics or []
    min_len = min(len(near_feats), len(far_feats))
    if min_len == 0:
        # Keep the always-present columns so callers can index them safely.
        return pd.DataFrame({
            "frame_index": [],
            "flatness_increase": [],
            "spectral_overlap": [],
            "combined_match_score": [],
        })

    near_feats = near_feats[:min_len]
    far_feats = far_feats[:min_len]
    results = {"frame_index": list(range(min_len))}

    # Feature vectors (scalar columns only). The far-field frame is reindexed
    # to the near-field column order: DataFrame columns follow dict key order,
    # so without this the two matrices could pair up unrelated features.
    near_df = _scalar_feature_frame(near_feats)
    far_df = _scalar_feature_frame(far_feats).reindex(columns=near_df.columns, fill_value=0.0)
    near_vec = near_df.values
    far_vec = far_df.values

    # Euclidean Distance
    if "Euclidean Distance" in metrics:
        results["euclidean_dist"] = np.linalg.norm(near_vec - far_vec, axis=1).tolist()

    # Cosine Similarity (defined as 0.0 when either vector is all zeros)
    if "Cosine Similarity" in metrics:
        cos_vals = []
        for a, b in zip(near_vec, far_vec):
            if np.all(a == 0) or np.all(b == 0):
                cos_vals.append(0.0)
            else:
                cos_vals.append(
                    float(cosine_similarity(a.reshape(1, -1), b.reshape(1, -1))[0][0]))
        results["cosine_similarity"] = cos_vals

    # High-Freq Loss Ratio
    if "High-Freq Loss Ratio" in metrics:
        # Energy is in dB (negative), so we look at the difference
        # Simple diff: Near (-20dB) - Far (-30dB) = 10dB loss
        results["high_freq_loss_db"] = [
            float(n["high_freq_energy"] - f["high_freq_energy"])
            for n, f in zip(near_feats, far_feats)
        ]

    # Spectral Flatness Difference (Reverberation Check)
    # Positive usually means more noise/reverb
    results["flatness_increase"] = [
        f["spectral_flatness"] - n["spectral_flatness"]
        for n, f in zip(near_feats, far_feats)
    ]

    # Spectral Overlap: cosine similarity of the flattened dB spectra
    overlap_scores = []
    for n, f in zip(near_feats, far_feats):
        near_spec = n["spectrum"].flatten()
        far_spec = f["spectrum"].flatten()
        if (near_spec.size != far_spec.size
                or np.all(near_spec == 0) or np.all(far_spec == 0)):
            # Placeholder (all-zero) or incomparable spectra score 0.0.
            overlap_scores.append(0.0)
        else:
            overlap_scores.append(
                float(cosine_similarity(near_spec.reshape(1, -1),
                                        far_spec.reshape(1, -1))[0][0]))
    results["spectral_overlap"] = overlap_scores

    # Combined match score: half the spectral overlap, plus half the cosine
    # similarity when that metric was selected.
    cos_part = results.get("cosine_similarity")
    results["combined_match_score"] = [
        overlap * 0.5 + (cos_part[i] * 0.5 if cos_part is not None else 0.0)
        for i, overlap in enumerate(overlap_scores)
    ]

    return pd.DataFrame(results)


# ----------------------------
# 5.
# Clustering & Visualization
# ----------------------------
def cluster_frames_custom(features_df, cluster_features, algo, n_clusters=5, eps=0.5):
    """
    Cluster frames on a chosen subset of feature columns.

    Args:
        features_df: DataFrame with one row per frame.
        cluster_features: Names of the columns to cluster on; names that are
            not columns of ``features_df`` are ignored.
        algo: ``"KMeans"``, ``"Agglomerative"`` or ``"DBSCAN"``. Any other
            value assigns every frame to cluster 0.
        n_clusters: Requested cluster count for KMeans / Agglomerative
            (capped at the number of rows).
        eps: Neighbourhood radius for DBSCAN.

    Returns:
        ``features_df`` itself, unchanged, when there is nothing to cluster
        on. Otherwise a copy with an added integer ``cluster`` column; with
        fewer than 5 rows every label is -1. The input frame is never
        modified.
    """
    if not cluster_features:
        return features_df

    # Ensure selected features exist in DF
    valid_features = [name for name in cluster_features if name in features_df.columns]
    if not valid_features:
        return features_df

    # Handle NaN/Inf just in case
    X = np.nan_to_num(features_df[valid_features].values)

    # Always work on a copy so the caller's frame is left untouched on every
    # path (the short-input path used to write into it in place).
    clustered = features_df.copy()

    if len(X) < 5:
        clustered["cluster"] = -1
        return clustered

    if algo == "KMeans":
        # int(): UI sliders may deliver the count as a float.
        k = min(int(n_clusters), len(X))
        labels = KMeans(n_clusters=k, random_state=42, n_init=10).fit_predict(X)
    elif algo == "Agglomerative":
        k = min(int(n_clusters), len(X))
        labels = AgglomerativeClustering(n_clusters=k).fit_predict(X)
    elif algo == "DBSCAN":
        labels = DBSCAN(eps=eps, min_samples=min(3, len(X))).fit_predict(X)
    else:
        # Integer labels, matching what the estimators above return.
        labels = np.zeros(len(X), dtype=int)

    clustered["cluster"] = labels
    return clustered


def plot_spectral_difference(near_feats, far_feats, frame_idx=0):
    """
    Build a heatmap of the near-minus-far dB spectrum for one frame.

    Args:
        near_feats: Per-frame feature dicts (each with a 2-D ``spectrum``).
        far_feats: Per-frame feature dicts for the far-field signal.
        frame_idx: Frame to display; capped at the last frame available in
            both lists.

    Returns:
        A plotly Figure. When either list is empty the figure has no data
        and the title "No data".
    """
    if not near_feats or not far_feats:
        empty_fig = go.Figure()
        empty_fig.update_layout(title="No data")
        return empty_fig

    safe_idx = min(frame_idx, len(near_feats) - 1, len(far_feats) - 1)
    near_spec = near_feats[safe_idx]["spectrum"]
    far_spec = far_feats[safe_idx]["spectrum"]

    # Compare only the frequency bins both spectra have.
    n_bins = min(near_spec.shape[0], far_spec.shape[0])
    diff = near_spec[:n_bins] - far_spec[:n_bins]

    fig = go.Figure(data=go.Heatmap(z=diff, colorscale='RdBu', zmid=0))
    fig.update_layout(
        title=f"Spectral Difference (Frame {safe_idx}) [Near - Far]",
        yaxis_title="Frequency Bin",
        xaxis_title="Time (within frame)",
        height=350
    )
    return fig


# ----------------------------
# 6.
# Main Analysis Logic
# ----------------------------
def _uploaded_path(upload):
    """Return the filesystem path of an upload given either a path or an object with ``.name``."""
    if isinstance(upload, (str, os.PathLike)):
        return upload
    return upload.name


def analyze_audio_pair(
    near_file, far_file,
    frame_length_ms, hop_length_ms, window_type,
    comparison_metrics, cluster_features,
    clustering_algo, n_clusters, dbscan_eps
):
    """
    Run the full near-field vs far-field comparison for one pair of files.

    Steps: load both files (far-field resampled to the near-field rate),
    peak-normalise, time-align, frame, extract features, compare frames,
    cluster the near-field frames and build the figures.

    Args:
        near_file: Uploaded near-field audio (path, or object with ``.name``).
        far_file: Uploaded far-field audio (path, or object with ``.name``).
        frame_length_ms: Frame length in milliseconds.
        hop_length_ms: Hop length in milliseconds.
        window_type: Window name forwarded to ``segment_audio``.
        comparison_metrics: Selected metric names (``None`` means none).
        cluster_features: Feature names to cluster on (``None`` means none).
        clustering_algo: Algorithm name forwarded to ``cluster_frames_custom``.
        n_clusters: Cluster count forwarded to ``cluster_frames_custom``.
        dbscan_eps: DBSCAN epsilon forwarded to ``cluster_frames_custom``.

    Returns:
        Tuple ``(plot_comparison, comparison_df, plot_scatter, clustered_df,
        spec_heatmap, overlay_fig)``.

    Raises:
        gr.Error: If a file is missing or cannot be loaded.
    """
    if not near_file or not far_file:
        raise gr.Error("Please upload both audio files.")

    comparison_metrics = list(comparison_metrics or [])
    cluster_features = list(cluster_features or [])

    # 1. Load Audio
    # Load Near. The original exception is chained so the real cause stays
    # visible in the server log.
    try:
        y_near, sr_near = librosa.load(_uploaded_path(near_file), sr=None)
    except Exception as exc:
        raise gr.Error("Failed to load Near Field audio.") from exc

    # Load Far (Force resample to match Near)
    try:
        y_far, sr_far = librosa.load(_uploaded_path(far_file), sr=sr_near)
    except Exception as exc:
        raise gr.Error("Failed to load Far Field audio.") from exc

    # 2. Normalize and Align (CRITICAL STEP)
    y_near = librosa.util.normalize(y_near)
    y_far = librosa.util.normalize(y_far)

    gr.Info("Aligning signals (calculating time delay)...")
    y_near, y_far = align_signals(y_near, y_far)

    # 3. Segment
    frames_near, _ = segment_audio(y_near, sr_near, frame_length_ms, hop_length_ms, window_type)
    frames_far, _ = segment_audio(y_far, sr_near, frame_length_ms, hop_length_ms, window_type)

    # 4. Extract
    gr.Info("Extracting features...")
    near_feats = extract_features_with_spectrum(frames_near, sr_near)
    far_feats = extract_features_with_spectrum(frames_far, sr_near)

    # 5. Compare
    comparison_df = compare_frames_enhanced(near_feats, far_feats, comparison_metrics)

    # 6. Cluster (on Near field features usually, to classify phonemes)
    near_df = pd.DataFrame(near_feats).drop(columns=["spectrum"], errors="ignore")
    clustered_df = cluster_frames_custom(near_df, cluster_features, clustering_algo, n_clusters, dbscan_eps)

    # 7. Visuals
    metric_cols = [c for c in comparison_df.columns if c != "frame_index"]
    if metric_cols:
        plot_comparison = px.line(comparison_df, x="frame_index", y=metric_cols,
                                  title="Frame-by-Frame Comparison Metrics")
    else:
        plot_comparison = px.line(title="No metrics selected")

    # The clustering step leaves the frame without a 'cluster' column when it
    # had nothing to cluster on; colour the points only when it exists.
    color_col = "cluster" if "cluster" in clustered_df.columns else None

    if len(cluster_features) >= 2:
        x_f, y_f = cluster_features[0], cluster_features[1]
        plot_scatter = px.scatter(clustered_df, x=x_f, y=y_f, color=color_col,
                                  title=f"Clustering Analysis (Near Field): {x_f} vs {y_f}")
    else:
        plot_scatter = px.scatter(title="Select at least 2 features to visualize clusters")

    spec_heatmap = plot_spectral_difference(near_feats, far_feats, frame_idx=len(near_feats) // 2)

    # Metric Overlay: Combine Clustering with Quality
    # Add combined score to clustered df for visualization
    clustered_df["match_quality"] = comparison_df["combined_match_score"]
    if len(cluster_features) > 0:
        overlay_fig = px.scatter(clustered_df, x=cluster_features[0], y="match_quality", color=color_col,
                                 title=f"Cluster vs. Match Quality ({cluster_features[0]})")
    else:
        overlay_fig = px.scatter(title="Not enough data for overlay")

    return plot_comparison, comparison_df, plot_scatter, clustered_df, spec_heatmap, overlay_fig


def export_results(comparison_df, clustered_df):
    """
    Write both result tables to CSV files in a new temporary directory.

    Args:
        comparison_df: Frame comparison table.
        clustered_df: Clustered frame table.

    Returns:
        List ``[comparison_csv_path, cluster_csv_path]``. The directory is
        not removed by this function.
    """
    temp_dir = tempfile.mkdtemp()
    comp_path = os.path.join(temp_dir, "frame_comparisons.csv")
    cluster_path = os.path.join(temp_dir, "clustered_frames.csv")

    comparison_df.to_csv(comp_path, index=False)
    clustered_df.to_csv(cluster_path, index=False)
    return [comp_path, cluster_path]


# ----------------------------
# 7.
# Gradio UI
# ----------------------------

# Expanded feature list for UI: scalar features followed by mfcc_1 .. mfcc_13
feature_list = [
    "rms", "spectral_centroid", "zcr", "spectral_flatness",
    "low_freq_energy", "mid_freq_energy", "high_freq_energy",
    *(f"mfcc_{i}" for i in range(1, 14)),
]

with gr.Blocks(title="Corrected Near vs Far Field Analyzer", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🎙️ Corrected Near vs Far Field Analyzer
    **Now includes:** Automatic Time Alignment (Cross-Correlation), Normalization, and Reverb Detection.
    """)

    # File inputs: reference on the left, target on the right
    with gr.Row():
        with gr.Column():
            near_file = gr.File(label="Near-Field Audio (Reference)", file_types=[".wav", ".mp3"])
        with gr.Column():
            far_file = gr.File(label="Far-Field Audio (Target)", file_types=[".wav", ".mp3"])

    # Framing parameters
    with gr.Accordion("⚙️ Analysis Settings", open=False):
        with gr.Row():
            frame_length_ms = gr.Slider(10, 200, value=30, step=5, label="Frame Length (ms)")
            hop_length_ms = gr.Slider(5, 100, value=15, step=5, label="Hop Length (ms)")
            window_type = gr.Dropdown(["hann", "hamming", "rectangular"], value="hann", label="Window Type")

    # Metric and clustering choices
    with gr.Accordion("📊 Metrics & Clustering", open=False):
        comparison_metrics = gr.CheckboxGroup(
            choices=["Euclidean Distance", "Cosine Similarity", "High-Freq Loss Ratio"],
            value=["Cosine Similarity", "High-Freq Loss Ratio"],
            label="Comparison Metrics"
        )
        cluster_features = gr.CheckboxGroup(
            choices=feature_list,
            value=["spectral_centroid", "spectral_flatness", "high_freq_energy"],
            label="Features for Clustering (Select >= 2)"
        )
        with gr.Row():
            clustering_algo = gr.Dropdown(["KMeans", "Agglomerative", "DBSCAN"], value="KMeans", label="Algorithm")
            n_clusters = gr.Slider(2, 10, value=4, step=1, label="Num Clusters")
            dbscan_eps = gr.Slider(0.1, 5.0, value=0.5, label="DBSCAN Epsilon")

    btn = gr.Button("🚀 Align & Analyze", variant="primary")

    # Result tabs
    with gr.Tabs():
        with gr.Tab("📈 Time Series Comparison"):
            comp_plot = gr.Plot()
            # Table sized with row_count (not height)
            comp_table = gr.Dataframe(row_count=10)
        with gr.Tab("🧩 Phoneme Clustering"):
            cluster_plot = gr.Plot()
            # Table sized with row_count (not height)
            cluster_table = gr.Dataframe(row_count=10)
        with gr.Tab("🔍 Spectral Check"):
            gr.Markdown("Difference Heatmap (Near - Far). Blue = Near has more energy. Red = Far has more energy.")
            spec_heatmap = gr.Plot()
        with gr.Tab("🧭 Quality Overlay"):
            overlay_plot = gr.Plot()
        with gr.Tab("📤 Export"):
            export_btn = gr.Button("💾 Download Results")
            export_files = gr.Files()

    # Event wiring: the order of these lists must match the parameter order
    # and return order of analyze_audio_pair.
    analysis_inputs = [
        near_file, far_file,
        frame_length_ms, hop_length_ms, window_type,
        comparison_metrics, cluster_features,
        clustering_algo, n_clusters, dbscan_eps,
    ]
    analysis_outputs = [
        comp_plot, comp_table,
        cluster_plot, cluster_table,
        spec_heatmap, overlay_plot,
    ]
    btn.click(fn=analyze_audio_pair, inputs=analysis_inputs, outputs=analysis_outputs)

    export_btn.click(fn=export_results, inputs=[comp_table, cluster_table], outputs=export_files)

if __name__ == "__main__":
    demo.launch()