Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import librosa | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.cluster import KMeans, AgglomerativeClustering, DBSCAN | |
| from sklearn.metrics.pairwise import cosine_similarity, euclidean_distances | |
| from scipy.spatial.distance import jensenshannon | |
| from scipy.stats import pearsonr | |
| from scipy.signal import get_window as scipy_get_window | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| import os | |
| import tempfile | |
| # ---------------------------- | |
| # Fixed: Added missing segment_audio function | |
| # ---------------------------- | |
| def segment_audio(y, sr, frame_length_ms, hop_length_ms, window_type="hann"): | |
| """Segment audio into frames with specified windowing""" | |
| frame_length = int(frame_length_ms * sr / 1000) | |
| hop_length = int(hop_length_ms * sr / 1000) | |
| # Get window function | |
| if window_type == "rectangular": | |
| window = scipy_get_window('boxcar', frame_length) | |
| else: | |
| window = scipy_get_window(window_type, frame_length) | |
| frames = [] | |
| for i in range(0, len(y) - frame_length + 1, hop_length): | |
| frame = y[i:i + frame_length] * window | |
| frames.append(frame) | |
| # Convert to 2D array (frames x samples) | |
| if frames: | |
| frames = np.array(frames).T | |
| else: | |
| # If audio is too short, create at least one frame with zero-padding | |
| frames = np.zeros((frame_length, 1)) | |
| return frames, frame_length | |
| # ---------------------------- | |
| # Enhanced Feature Extraction (with spectral bins) | |
| # ---------------------------- | |
| def extract_features_with_spectrum(frames, sr): | |
| features = [] | |
| n_mfcc = 13 | |
| n_fft = min(2048, frames.shape[0]) # Fixed: Ensure n_fft <= frame length | |
| for i in range(frames.shape[1]): | |
| frame = frames[:, i] | |
| # Skip if frame is too short or silent | |
| if len(frame) < n_fft or np.max(np.abs(frame)) < 1e-10: | |
| continue | |
| feat = {} | |
| # Basic features with error handling | |
| try: | |
| rms = np.mean(librosa.feature.rms(y=frame)[0]) | |
| feat["rms"] = float(rms) | |
| except: | |
| feat["rms"] = 0.0 | |
| try: | |
| sc = np.mean(librosa.feature.spectral_centroid(y=frame, sr=sr)[0]) | |
| feat["spectral_centroid"] = float(sc) | |
| except: | |
| feat["spectral_centroid"] = 0.0 | |
| try: | |
| zcr = np.mean(librosa.feature.zero_crossing_rate(frame)[0]) | |
| feat["zcr"] = float(zcr) | |
| except: | |
| feat["zcr"] = 0.0 | |
| try: | |
| mfccs = librosa.feature.mfcc(y=frame, sr=sr, n_mfcc=n_mfcc, n_fft=n_fft) | |
| for j in range(n_mfcc): | |
| feat[f"mfcc_{j+1}"] = float(np.mean(mfccs[j])) | |
| except: | |
| for j in range(n_mfcc): | |
| feat[f"mfcc_{j+1}"] = 0.0 | |
| # Spectral bins for lost frequencies | |
| try: | |
| S = np.abs(librosa.stft(frame, n_fft=n_fft)) | |
| S_db = librosa.amplitude_to_db(S, ref=np.max) | |
| freqs = librosa.fft_frequencies(sr=sr, n_fft=n_fft) | |
| # Split spectrum: low (<2kHz), mid (2-4kHz), high (>4kHz) | |
| low_mask = freqs <= 2000 | |
| mid_mask = (freqs > 2000) & (freqs <= 4000) | |
| high_mask = freqs > 4000 | |
| feat["low_freq_energy"] = float(np.mean(S_db[low_mask])) if np.any(low_mask) else 0.0 | |
| feat["mid_freq_energy"] = float(np.mean(S_db[mid_mask])) if np.any(mid_mask) else 0.0 | |
| feat["high_freq_energy"] = float(np.mean(S_db[high_mask])) if np.any(high_mask) else 0.0 | |
| # Store full spectrum for later (optional) | |
| feat["spectrum"] = S_db # will be used for heatmap | |
| except: | |
| feat["low_freq_energy"] = 0.0 | |
| feat["mid_freq_energy"] = 0.0 | |
| feat["high_freq_energy"] = 0.0 | |
| feat["spectrum"] = np.zeros((n_fft // 2 + 1, 1)) | |
| features.append(feat) | |
| # Handle case where no features were extracted | |
| if not features: | |
| # Create one dummy feature set to avoid errors | |
| feat = { | |
| "rms": 0.0, "spectral_centroid": 0.0, "zcr": 0.0, | |
| "low_freq_energy": 0.0, "mid_freq_energy": 0.0, "high_freq_energy": 0.0, | |
| "spectrum": np.zeros((n_fft // 2 + 1, 1)) | |
| } | |
| for j in range(n_mfcc): | |
| feat[f"mfcc_{j+1}"] = 0.0 | |
| features.append(feat) | |
| return features | |
| def compare_frames_enhanced(near_feats, far_feats, metrics): | |
| min_len = min(len(near_feats), len(far_feats)) | |
| if min_len == 0: | |
| return pd.DataFrame({"frame_index": []}) | |
| results = {"frame_index": list(range(min_len))} | |
| # Prepare vectors | |
| near_df = pd.DataFrame([f for f in near_feats[:min_len]]) | |
| far_df = pd.DataFrame([f for f in far_feats[:min_len]]) | |
| # Remove non-numeric columns | |
| near_vec = near_df.drop(columns=["spectrum"], errors="ignore").values | |
| far_vec = far_df.drop(columns=["spectrum"], errors="ignore").values | |
| # 1. Euclidean Distance | |
| if "Euclidean Distance" in metrics: | |
| results["euclidean_dist"] = np.linalg.norm(near_vec - far_vec, axis=1).tolist() | |
| # 2. Cosine Similarity | |
| if "Cosine Similarity" in metrics: | |
| cos_vals = [] | |
| for i in range(min_len): | |
| a, b = near_vec[i].reshape(1, -1), far_vec[i].reshape(1, -1) | |
| # Handle zero vectors | |
| if np.all(a == 0) and np.all(b == 0): | |
| cos_vals.append(1.0) | |
| elif np.all(a == 0) or np.all(b == 0): | |
| cos_vals.append(0.0) | |
| else: | |
| cos_vals.append(float(cosine_similarity(a, b)[0][0])) | |
| results["cosine_similarity"] = cos_vals | |
| # 3. Pearson Correlation | |
| if "Pearson Correlation" in metrics: | |
| corr_vals = [] | |
| for i in range(min_len): | |
| try: | |
| corr, _ = pearsonr(near_vec[i], far_vec[i]) | |
| corr_vals.append(float(corr) if not np.isnan(corr) else 0.0) | |
| except: | |
| corr_vals.append(0.0) | |
| results["pearson_corr"] = corr_vals | |
| # 4. KL Divergence (on normalized features) | |
| if "KL Divergence" in metrics: | |
| kl_vals = [] | |
| for i in range(min_len): | |
| try: | |
| p = near_vec[i] - near_vec[i].min() + 1e-8 | |
| q = far_vec[i] - far_vec[i].min() + 1e-8 | |
| p /= p.sum() | |
| q /= q.sum() | |
| kl = np.sum(p * np.log(p / q)) | |
| kl_vals.append(float(kl)) | |
| except: | |
| kl_vals.append(0.0) | |
| results["kl_divergence"] = kl_vals | |
| # 5. Jensen-Shannon Divergence (symmetric, safer) | |
| if "Jensen-Shannon Divergence" in metrics: | |
| js_vals = [] | |
| for i in range(min_len): | |
| try: | |
| p = near_vec[i] - near_vec[i].min() + 1e-8 | |
| q = far_vec[i] - far_vec[i].min() + 1e-8 | |
| p /= p.sum() | |
| q /= q.sum() | |
| js = jensenshannon(p, q) | |
| js_vals.append(float(js)) | |
| except: | |
| js_vals.append(0.0) | |
| results["js_divergence"] = js_vals | |
| # 6. Lost High Frequencies Ratio | |
| if "High-Freq Loss Ratio" in metrics: | |
| loss_ratios = [] | |
| for i in range(min_len): | |
| try: | |
| near_high = near_feats[i]["high_freq_energy"] | |
| far_high = far_feats[i]["high_freq_energy"] | |
| # Ratio: how much high-freq energy is lost (positive = loss) | |
| ratio = near_high - far_high # in dB | |
| loss_ratios.append(float(ratio)) | |
| except: | |
| loss_ratios.append(0.0) | |
| results["high_freq_loss_db"] = loss_ratios | |
| # 7. Spectral Centroid Shift | |
| if "Spectral Centroid Shift" in metrics: | |
| shifts = [] | |
| for i in range(min_len): | |
| try: | |
| shift = near_feats[i]["spectral_centroid"] - far_feats[i]["spectral_centroid"] | |
| shifts.append(float(shift)) | |
| except: | |
| shifts.append(0.0) | |
| results["centroid_shift"] = shifts | |
| return pd.DataFrame(results) | |
| def cluster_frames_custom(features_df, cluster_features, algo, n_clusters=5, eps=0.5): | |
| if not cluster_features: | |
| raise gr.Error("Please select at least one feature for clustering.") | |
| if len(features_df) == 0: | |
| features_df["cluster"] = [] | |
| return features_df | |
| X = features_df[cluster_features].values | |
| if algo == "KMeans": | |
| n_clusters = min(n_clusters, len(X)) # Fixed: Cannot have more clusters than samples | |
| model = KMeans(n_clusters=n_clusters, random_state=42, n_init=10) | |
| labels = model.fit_predict(X) | |
| elif algo == "Agglomerative": | |
| n_clusters = min(n_clusters, len(X)) | |
| model = AgglomerativeClustering(n_clusters=n_clusters) | |
| labels = model.fit_predict(X) | |
| elif algo == "DBSCAN": | |
| # Fixed: DBSCAN doesn't use n_clusters parameter | |
| model = DBSCAN(eps=eps, min_samples=min(3, len(X))) | |
| labels = model.fit_predict(X) | |
| else: | |
| raise ValueError("Unknown clustering algorithm") | |
| features_df = features_df.copy() | |
| features_df["cluster"] = labels | |
| return features_df | |
| def plot_spectral_difference(near_feats, far_feats, frame_idx=0): | |
| if not near_feats or not far_feats or frame_idx >= len(near_feats) or frame_idx >= len(far_feats): | |
| # Return empty plot | |
| fig = go.Figure() | |
| fig.update_layout(title="No data available for spectral analysis", height=300) | |
| return fig | |
| near_spec = near_feats[frame_idx]["spectrum"] | |
| far_spec = far_feats[frame_idx]["spectrum"] | |
| # Ensure both spectrograms have the same shape | |
| min_freq_bins = min(near_spec.shape[0], far_spec.shape[0]) | |
| near_spec = near_spec[:min_freq_bins] | |
| far_spec = far_spec[:min_freq_bins] | |
| diff = near_spec - far_spec # positive = energy lost in far-field | |
| fig = go.Figure(data=go.Heatmap( | |
| z=diff, # Fixed: Removed extra list brackets | |
| colorscale='RdBu', | |
| zmid=0, | |
| colorbar=dict(title="dB Difference") | |
| )) | |
| fig.update_layout( | |
| title=f"Spectral Difference (Frame {frame_idx}): Near - Far", | |
| xaxis_title="Time Frames", | |
| yaxis_title="Frequency Bins", | |
| height=300 | |
| ) | |
| return fig | |
| # ---------------------------- | |
| # Main Analysis Function | |
| # ---------------------------- | |
| def analyze_audio_pair( | |
| near_file, | |
| far_file, | |
| frame_length_ms, | |
| hop_length_ms, | |
| window_type, | |
| comparison_metrics, | |
| cluster_features, | |
| clustering_algo, | |
| n_clusters, | |
| dbscan_eps | |
| ): | |
| if not near_file or not far_file: | |
| raise gr.Error("Upload both audio files.") | |
| try: | |
| # Fixed: Use librosa.load instead of non-existent librosa.load_audio | |
| y_near, sr_near = librosa.load(near_file.name, sr=None) | |
| y_far, sr_far = librosa.load(far_file.name, sr=None) | |
| except Exception as e: | |
| raise gr.Error(f"Error loading audio files: {str(e)}") | |
| if sr_near != sr_far: | |
| y_far = librosa.resample(y_far, orig_sr=sr_far, target_sr=sr_near) | |
| sr = sr_near | |
| else: | |
| sr = sr_near | |
| frames_near, frame_length = segment_audio(y_near, sr, frame_length_ms, hop_length_ms, window_type) | |
| frames_far, _ = segment_audio(y_far, sr, frame_length_ms, hop_length_ms, window_type) | |
| near_feats = extract_features_with_spectrum(frames_near, sr) | |
| far_feats = extract_features_with_spectrum(frames_far, sr) | |
| # Comparison | |
| comparison_df = compare_frames_enhanced(near_feats, far_feats, comparison_metrics) | |
| # Clustering (on near-field) | |
| near_df = pd.DataFrame(near_feats) | |
| near_df = near_df.drop(columns=["spectrum"], errors="ignore") | |
| clustered_df = cluster_frames_custom(near_df, cluster_features, clustering_algo, n_clusters, dbscan_eps) | |
| # Plots | |
| plot_comparison = None | |
| if comparison_df.shape[1] > 1 and len(comparison_df) > 0: | |
| metric_cols = [col for col in comparison_df.columns if col != "frame_index"] | |
| if metric_cols: | |
| metric_to_plot = metric_cols[0] | |
| plot_comparison = px.line( | |
| comparison_df, | |
| x="frame_index", | |
| y=metric_to_plot, | |
| title=f"{metric_to_plot.replace('_', ' ').title()} Over Time" | |
| ) | |
| else: | |
| plot_comparison = px.line(title="No comparison metrics available") | |
| else: | |
| plot_comparison = px.line(title="No comparison data available") | |
| # Scatter: user-selected features | |
| plot_scatter = None | |
| if len(cluster_features) >= 2 and len(clustered_df) > 0: | |
| x_feat, y_feat = cluster_features[0], cluster_features[1] | |
| if x_feat in clustered_df.columns and y_feat in clustered_df.columns: | |
| plot_scatter = px.scatter( | |
| clustered_df, | |
| x=x_feat, | |
| y=y_feat, | |
| color="cluster", | |
| title=f"Clustering: {x_feat} vs {y_feat}", | |
| hover_data=["cluster"] | |
| ) | |
| else: | |
| plot_scatter = px.scatter(title="Selected features not available in data") | |
| else: | |
| plot_scatter = px.scatter(title="Select β₯2 features for scatter plot") | |
| # Spectral difference heatmap (first frame) | |
| spec_heatmap = plot_spectral_difference(near_feats, far_feats, frame_idx=0) | |
| return ( | |
| plot_comparison, | |
| comparison_df, | |
| plot_scatter, | |
| clustered_df, | |
| spec_heatmap | |
| ) | |
| def export_results(comparison_df, clustered_df): | |
| temp_dir = tempfile.mkdtemp() | |
| comp_path = os.path.join(temp_dir, "frame_comparisons.csv") | |
| cluster_path = os.path.join(temp_dir, "clustered_frames.csv") | |
| comparison_df.to_csv(comp_path, index=False) | |
| clustered_df.to_csv(cluster_path, index=False) | |
| return [comp_path, cluster_path] | |
| # ---------------------------- | |
| # Gradio UI | |
| # ---------------------------- | |
| # Get feature names dynamically | |
| dummy_features = ["rms", "spectral_centroid", "zcr"] + [f"mfcc_{i}" for i in range(1,14)] + \ | |
| ["low_freq_energy", "mid_freq_energy", "high_freq_energy"] | |
| with gr.Blocks(title="Advanced Near vs Far Field Analyzer") as demo: | |
| gr.Markdown("# ποΈ Advanced Near vs Far Field Speech Analyzer") | |
| gr.Markdown("Upload simultaneous recordings. Analyze **lost frequencies**, **frame degradation**, and **cluster by custom attributes**.") | |
| with gr.Row(): | |
| near_file = gr.File(label="Near-Field Audio (.wav)", file_types=[".wav"]) | |
| far_file = gr.File(label="Far-Field Audio (.wav)", file_types=[".wav"]) | |
| with gr.Accordion("βοΈ Frame Settings", open=True): | |
| frame_length_ms = gr.Slider(10, 500, value=50, step=1, label="Frame Length (ms)") | |
| hop_length_ms = gr.Slider(1, 250, value=25, step=1, label="Hop Length (ms)") | |
| window_type = gr.Dropdown(["hann", "hamming", "rectangular"], value="hann", label="Window Type") | |
| with gr.Accordion("π Comparison Metrics", open=True): | |
| comparison_metrics = gr.CheckboxGroup( | |
| choices=[ | |
| "Euclidean Distance", | |
| "Cosine Similarity", | |
| "Pearson Correlation", | |
| "KL Divergence", | |
| "Jensen-Shannon Divergence", | |
| "High-Freq Loss Ratio", | |
| "Spectral Centroid Shift" | |
| ], | |
| value=["High-Freq Loss Ratio", "Cosine Similarity"], | |
| label="Select Comparison Metrics" | |
| ) | |
| with gr.Accordion("π§© Clustering Configuration", open=True): | |
| cluster_features = gr.CheckboxGroup( | |
| choices=dummy_features, | |
| value=["rms", "spectral_centroid", "high_freq_energy"], | |
| label="Features to Use for Clustering" | |
| ) | |
| clustering_algo = gr.Radio( | |
| ["KMeans", "Agglomerative", "DBSCAN"], | |
| value="KMeans", | |
| label="Clustering Algorithm" | |
| ) | |
| n_clusters = gr.Slider(2, 20, value=5, step=1, label="Number of Clusters (for KMeans/Agglomerative)") | |
| dbscan_eps = gr.Slider(0.1, 2.0, value=0.5, step=0.1, label="DBSCAN eps (neighborhood radius)") | |
| btn = gr.Button("π Analyze") | |
| with gr.Tabs(): | |
| with gr.Tab("π Frame Comparison"): | |
| comp_plot = gr.Plot() | |
| comp_table = gr.Dataframe() | |
| with gr.Tab("π§© Clustering"): | |
| cluster_plot = gr.Plot() | |
| cluster_table = gr.Dataframe() | |
| with gr.Tab("π Spectral Analysis"): | |
| spec_heatmap = gr.Plot(label="Spectral Difference (Near - Far)") | |
| with gr.Tab("π€ Export"): | |
| export_btn = gr.Button("πΎ Download CSVs") | |
| export_files = gr.Files() | |
| btn.click( | |
| fn=analyze_audio_pair, | |
| inputs=[ | |
| near_file, far_file, | |
| frame_length_ms, hop_length_ms, window_type, | |
| comparison_metrics, | |
| cluster_features, | |
| clustering_algo, | |
| n_clusters, | |
| dbscan_eps | |
| ], | |
| outputs=[comp_plot, comp_table, cluster_plot, cluster_table, spec_heatmap] | |
| ) | |
| export_btn.click( | |
| fn=export_results, | |
| inputs=[comp_table, cluster_table], | |
| outputs=export_files | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() |