import gradio as gr import librosa import numpy as np import pandas as pd from sklearn.cluster import KMeans, AgglomerativeClustering, DBSCAN from sklearn.metrics.pairwise import cosine_similarity from scipy.spatial.distance import jensenshannon from scipy.stats import pearsonr from scipy.signal import get_window as scipy_get_window import plotly.express as px import plotly.graph_objects as go import os import tempfile # ---------------------------- # Audio Segmentation # ---------------------------- def segment_audio(y, sr, frame_length_ms, hop_length_ms, window_type="hann"): """Segment audio into frames with specified windowing""" frame_length = int(frame_length_ms * sr / 1000) hop_length = int(hop_length_ms * sr / 1000) if frame_length > len(y): frame_length = len(y) hop_length = max(1, frame_length // 2) # Get window function if window_type == "rectangular": window = scipy_get_window('boxcar', frame_length) else: window = scipy_get_window(window_type, frame_length) frames = [] for i in range(0, len(y) - frame_length + 1, hop_length): frame = y[i:i + frame_length] * window frames.append(frame) # Convert to 2D array (frames x samples) if frames: frames = np.array(frames).T else: # If audio is too short, create at least one frame with zero-padding frames = np.zeros((frame_length, 1)) return frames, frame_length # ---------------------------- # Enhanced Feature Extraction # ---------------------------- def extract_features_with_spectrum(frames, sr): features = [] n_mfcc = 13 n_fft = min(2048, frames.shape[0]) for i in range(frames.shape[1]): frame = frames[:, i] # Skip if frame is too short or silent if len(frame) < n_fft or np.max(np.abs(frame)) < 1e-10: continue feat = {} # Basic features try: rms = np.mean(librosa.feature.rms(y=frame)[0]) feat["rms"] = float(rms) except: feat["rms"] = 0.0 try: sc = np.mean(librosa.feature.spectral_centroid(y=frame, sr=sr)[0]) feat["spectral_centroid"] = float(sc) except: feat["spectral_centroid"] = 0.0 try: zcr = np.mean(librosa.feature.zero_crossing_rate(frame)[0]) feat["zcr"] = float(zcr) except: feat["zcr"] = 0.0 try: mfccs = librosa.feature.mfcc(y=frame, sr=sr, n_mfcc=n_mfcc, n_fft=n_fft) for j in range(n_mfcc): feat[f"mfcc_{j+1}"] = float(np.mean(mfccs[j])) except: for j in range(n_mfcc): feat[f"mfcc_{j+1}"] = 0.0 # Spectral features for quality assessment try: S = np.abs(librosa.stft(frame, n_fft=n_fft)) S_db = librosa.amplitude_to_db(S, ref=np.max) freqs = librosa.fft_frequencies(sr=sr, n_fft=n_fft) # Frequency bands for quality assessment low_mask = freqs <= 500 mid_mask = (freqs > 500) & (freqs <= 4000) # Speech range high_mask = freqs > 4000 feat["low_freq_energy"] = float(np.mean(S_db[low_mask])) if np.any(low_mask) else -80.0 feat["mid_freq_energy"] = float(np.mean(S_db[mid_mask])) if np.any(mid_mask) else -80.0 feat["high_freq_energy"] = float(np.mean(S_db[high_mask])) if np.any(high_mask) else -80.0 # Spectral rolloff (85%) rolloff = np.mean(librosa.feature.spectral_rolloff(y=frame, sr=sr, roll_percent=0.85)[0]) feat["spectral_rolloff"] = float(rolloff) # Spectral bandwidth bandwidth = np.mean(librosa.feature.spectral_bandwidth(y=frame, sr=sr)[0]) feat["spectral_bandwidth"] = float(bandwidth) # Spectral flatness (noisiness) flatness = np.mean(librosa.feature.spectral_flatness(y=frame)[0]) feat["spectral_flatness"] = float(flatness) feat["spectrum"] = S_db except: feat["low_freq_energy"] = -80.0 feat["mid_freq_energy"] = -80.0 feat["high_freq_energy"] = -80.0 feat["spectral_rolloff"] = 0.0 feat["spectral_bandwidth"] = 0.0 feat["spectral_flatness"] = 0.0 feat["spectrum"] = np.zeros((n_fft // 2 + 1, 1)) features.append(feat) if not features: feat = { "rms": 0.0, "spectral_centroid": 0.0, "zcr": 0.0, "low_freq_energy": -80.0, "mid_freq_energy": -80.0, "high_freq_energy": -80.0, "spectral_rolloff": 0.0, "spectral_bandwidth": 0.0, "spectral_flatness": 0.0, "spectrum": np.zeros((n_fft // 2 + 1, 1)) } for j in range(n_mfcc): feat[f"mfcc_{j+1}"] = 0.0 features.append(feat) return features # ---------------------------- # Frame-wise Quality Metrics (0-1 scale) # ---------------------------- def calculate_frame_quality_metrics(near_feats, far_feats): """Calculate multiple quality metrics between 0 and 1 for each frame""" min_len = min(len(near_feats), len(far_feats)) if min_len == 0: return pd.DataFrame({"frame_index": []}) results = {"frame_index": list(range(min_len))} # Prepare feature vectors (excluding spectrum) near_df = pd.DataFrame([f for f in near_feats[:min_len]]) far_df = pd.DataFrame([f for f in far_feats[:min_len]]) feature_cols = [col for col in near_df.columns if col != "spectrum"] near_vec = near_df[feature_cols].values far_vec = far_df[feature_cols].values # 1. Spectral Similarity Score (0-1) spectral_scores = [] for i in range(min_len): try: # Compare spectral distributions using cosine similarity near_spectral = np.array([near_feats[i]["low_freq_energy"], near_feats[i]["mid_freq_energy"], near_feats[i]["high_freq_energy"]]) far_spectral = np.array([far_feats[i]["low_freq_energy"], far_feats[i]["mid_freq_energy"], far_feats[i]["high_freq_energy"]]) # Convert to positive values and normalize near_spectral = near_spectral - near_spectral.min() + 1e-8 far_spectral = far_spectral - far_spectral.min() + 1e-8 near_spectral = near_spectral / near_spectral.sum() far_spectral = far_spectral / far_spectral.sum() # Use cosine similarity on spectral distribution spec_sim = cosine_similarity([near_spectral], [far_spectral])[0][0] spectral_scores.append(max(0, min(1, spec_sim))) except: spectral_scores.append(0.5) results["spectral_similarity"] = spectral_scores # 2. High-Frequency Preservation Score (0-1) hf_scores = [] for i in range(min_len): try: near_hf = near_feats[i]["high_freq_energy"] far_hf = far_feats[i]["high_freq_energy"] # Normalize HF energy difference (assuming -80dB to 0dB range) hf_diff = near_hf - far_hf # Convert to 0-1 scale: 0dB difference = 1.0, 40dB loss = 0.0 hf_score = max(0, min(1, 1.0 - (max(0, hf_diff) / 40.0))) hf_scores.append(hf_score) except: hf_scores.append(0.5) results["high_freq_preservation"] = hf_scores # 3. MFCC Structural Similarity (0-1) mfcc_scores = [] for i in range(min_len): try: # Extract MFCC features near_mfcc = np.array([near_feats[i][f"mfcc_{j+1}"] for j in range(13)]) far_mfcc = np.array([far_feats[i][f"mfcc_{j+1}"] for j in range(13)]) # Normalize and compute cosine similarity near_mfcc_norm = (near_mfcc - near_mfcc.mean()) / (near_mfcc.std() + 1e-8) far_mfcc_norm = (far_mfcc - far_mfcc.mean()) / (far_mfcc.std() + 1e-8) mfcc_sim = cosine_similarity([near_mfcc_norm], [far_mfcc_norm])[0][0] mfcc_scores.append(max(0, min(1, (mfcc_sim + 1) / 2))) # Convert -1:1 to 0:1 except: mfcc_scores.append(0.5) results["mfcc_similarity"] = mfcc_scores # 4. Temporal Consistency Score (RMS stability) temporal_scores = [] for i in range(min_len): try: near_rms = near_feats[i]["rms"] far_rms = far_feats[i]["rms"] # Ratio of RMS energies (closer to 1 is better) rms_ratio = min(near_rms, far_rms) / (max(near_rms, far_rms) + 1e-8) temporal_scores.append(float(rms_ratio)) except: temporal_scores.append(0.5) results["temporal_consistency"] = temporal_scores # 5. Spectral Centroid Stability (0-1) centroid_scores = [] for i in range(min_len): try: near_sc = near_feats[i]["spectral_centroid"] far_sc = far_feats[i]["spectral_centroid"] # Ratio of spectral centroids sc_ratio = min(near_sc, far_sc) / (max(near_sc, far_sc) + 1e-8) centroid_scores.append(float(sc_ratio)) except: centroid_scores.append(0.5) results["spectral_centroid_stability"] = centroid_scores # 6. Overall Audio Quality Score (Compound Metric) quality_scores = [] for i in range(min_len): # Weighted combination of all metrics weights = { 'spectral_similarity': 0.25, # Spectral distribution match 'high_freq_preservation': 0.30, # HF content preservation (most important) 'mfcc_similarity': 0.20, # Structural similarity 'temporal_consistency': 0.15, # Amplitude consistency 'spectral_centroid_stability': 0.10 # Spectral shape stability } total_score = 0 for metric, weight in weights.items(): total_score += results[metric][i] * weight quality_scores.append(max(0, min(1, total_score))) results["overall_quality"] = quality_scores # 7. Quality Degradation Level degradation_levels = [] for score in quality_scores: if score >= 0.8: degradation_levels.append("Excellent") elif score >= 0.6: degradation_levels.append("Good") elif score >= 0.4: degradation_levels.append("Moderate") elif score >= 0.2: degradation_levels.append("Poor") else: degradation_levels.append("Very Poor") results["degradation_level"] = degradation_levels return pd.DataFrame(results) # ---------------------------- # Clustering and Visualization # ---------------------------- def cluster_frames_custom(features_df, cluster_features, algo, n_clusters=5, eps=0.5): if not cluster_features: raise gr.Error("Please select at least one feature for clustering.") if len(features_df) == 0: features_df["cluster"] = [] return features_df X = features_df[cluster_features].values if algo == "KMeans": n_clusters = min(n_clusters, len(X)) model = KMeans(n_clusters=n_clusters, random_state=42, n_init=10) labels = model.fit_predict(X) elif algo == "Agglomerative": n_clusters = min(n_clusters, len(X)) model = AgglomerativeClustering(n_clusters=n_clusters) labels = model.fit_predict(X) elif algo == "DBSCAN": model = DBSCAN(eps=eps, min_samples=min(3, len(X))) labels = model.fit_predict(X) else: raise ValueError("Unknown clustering algorithm") features_df = features_df.copy() features_df["cluster"] = labels return features_df def plot_spectral_difference(near_feats, far_feats, frame_idx=0): if not near_feats or not far_feats or frame_idx >= len(near_feats) or frame_idx >= len(far_feats): fig = go.Figure() fig.update_layout(title="No data available for spectral analysis", height=300) return fig near_spec = near_feats[frame_idx]["spectrum"] far_spec = far_feats[frame_idx]["spectrum"] min_freq_bins = min(near_spec.shape[0], far_spec.shape[0]) min_time_frames = min(near_spec.shape[1], far_spec.shape[1]) near_spec = near_spec[:min_freq_bins, :min_time_frames] far_spec = far_spec[:min_freq_bins, :min_time_frames] diff = near_spec - far_spec fig = go.Figure(data=go.Heatmap( z=diff, colorscale='RdBu', zmid=0, colorbar=dict(title="dB Difference") )) fig.update_layout( title=f"Spectral Difference (Frame {frame_idx}): Near - Far", xaxis_title="Time Frames", yaxis_title="Frequency Bins", height=300 ) return fig # ---------------------------- # Main Analysis Function # ---------------------------- def analyze_audio_pair( near_file, far_file, frame_length_ms, hop_length_ms, window_type, cluster_features, clustering_algo, n_clusters, dbscan_eps ): if not near_file or not far_file: raise gr.Error("Upload both audio files.") try: y_near, sr_near = librosa.load(near_file.name, sr=None) y_far, sr_far = librosa.load(far_file.name, sr=None) except Exception as e: raise gr.Error(f"Error loading audio files: {str(e)}") if sr_near != sr_far: y_far = librosa.resample(y_far, orig_sr=sr_far, target_sr=sr_near) sr = sr_near else: sr = sr_near frames_near, frame_length = segment_audio(y_near, sr, frame_length_ms, hop_length_ms, window_type) frames_far, _ = segment_audio(y_far, sr, frame_length_ms, hop_length_ms, window_type) near_feats = extract_features_with_spectrum(frames_near, sr) far_feats = extract_features_with_spectrum(frames_far, sr) # Calculate frame-wise quality metrics comparison_df = calculate_frame_quality_metrics(near_feats, far_feats) # Clustering (on near-field) near_df = pd.DataFrame(near_feats) near_df = near_df.drop(columns=["spectrum"], errors="ignore") clustered_df = cluster_frames_custom(near_df, cluster_features, clustering_algo, n_clusters, dbscan_eps) # Plots plot_comparison = None if len(comparison_df) > 0: plot_comparison = px.line( comparison_df, x="frame_index", y="overall_quality", title="Overall Audio Quality Score Over Time (0-1 scale)", labels={"overall_quality": "Quality Score", "frame_index": "Frame Index"} ) plot_comparison.update_yaxes(range=[0, 1]) else: plot_comparison = px.line(title="No comparison data available") # Quality distribution plot quality_dist_plot = None if len(comparison_df) > 0: quality_dist_plot = px.histogram( comparison_df, x="overall_quality", title="Distribution of Audio Quality Scores", nbins=20, labels={"overall_quality": "Quality Score"} ) quality_dist_plot.update_xaxes(range=[0, 1]) else: quality_dist_plot = px.histogram(title="No quality data available") # Scatter plot plot_scatter = None if len(cluster_features) >= 2 and len(clustered_df) > 0: x_feat, y_feat = cluster_features[0], cluster_features[1] if x_feat in clustered_df.columns and y_feat in clustered_df.columns: plot_scatter = px.scatter( clustered_df, x=x_feat, y=y_feat, color="cluster", title=f"Clustering: {x_feat} vs {y_feat}", hover_data=["cluster"] ) else: plot_scatter = px.scatter(title="Selected features not available in data") else: plot_scatter = px.scatter(title="Select ≥2 features for scatter plot") # Spectral difference heatmap spec_heatmap = plot_spectral_difference(near_feats, far_feats, frame_idx=0) return ( plot_comparison, quality_dist_plot, comparison_df, plot_scatter, clustered_df, spec_heatmap ) def export_results(comparison_df, clustered_df): temp_dir = tempfile.mkdtemp() comp_path = os.path.join(temp_dir, "frame_quality_scores.csv") cluster_path = os.path.join(temp_dir, "clustered_frames.csv") comparison_df.to_csv(comp_path, index=False) clustered_df.to_csv(cluster_path, index=False) return [comp_path, cluster_path] # ---------------------------- # Gradio UI # ---------------------------- dummy_features = ["rms", "spectral_centroid", "zcr", "spectral_rolloff", "spectral_bandwidth", "spectral_flatness"] + \ [f"mfcc_{i}" for i in range(1,14)] + \ ["low_freq_energy", "mid_freq_energy", "high_freq_energy"] with gr.Blocks(title="Audio Quality Analyzer") as demo: gr.Markdown("# 🎙️ Near vs Far Field Audio Quality Analyzer") gr.Markdown("**Quantify audio degradation per frame (0-1 scale)** - Compare near-field vs far-field recording quality") with gr.Row(): near_file = gr.File(label="Near-Field Audio (.wav)", file_types=[".wav"]) far_file = gr.File(label="Far-Field Audio (.wav)", file_types=[".wav"]) with gr.Accordion("⚙️ Frame Settings", open=True): frame_length_ms = gr.Slider(10, 500, value=50, step=1, label="Frame Length (ms)") hop_length_ms = gr.Slider(1, 250, value=25, step=1, label="Hop Length (ms)") window_type = gr.Dropdown(["hann", "hamming", "rectangular"], value="hann", label="Window Type") with gr.Accordion("🧩 Clustering Configuration", open=False): cluster_features = gr.CheckboxGroup( choices=dummy_features, value=["rms", "spectral_centroid", "high_freq_energy"], label="Features to Use for Clustering" ) clustering_algo = gr.Radio( ["KMeans", "Agglomerative", "DBSCAN"], value="KMeans", label="Clustering Algorithm" ) n_clusters = gr.Slider(2, 20, value=5, step=1, label="Number of Clusters (for KMeans/Agglomerative)") dbscan_eps = gr.Slider(0.1, 2.0, value=0.5, step=0.1, label="DBSCAN eps (neighborhood radius)") btn = gr.Button("🚀 Analyze Audio Quality") with gr.Tabs(): with gr.Tab("📊 Quality Analysis"): with gr.Row(): comp_plot = gr.Plot(label="Quality Over Time") quality_dist_plot = gr.Plot(label="Quality Distribution") comp_table = gr.Dataframe(label="Frame-wise Quality Scores") with gr.Tab("🧩 Clustering"): cluster_plot = gr.Plot() cluster_table = gr.Dataframe() with gr.Tab("🔍 Spectral Analysis"): spec_heatmap = gr.Plot(label="Spectral Difference (Near - Far)") with gr.Tab("📤 Export"): gr.Markdown("### Download Analysis Results") export_btn = gr.Button("💾 Download CSV Files") export_files = gr.Files() btn.click( fn=analyze_audio_pair, inputs=[ near_file, far_file, frame_length_ms, hop_length_ms, window_type, cluster_features, clustering_algo, n_clusters, dbscan_eps ], outputs=[comp_plot, quality_dist_plot, comp_table, cluster_plot, cluster_table, spec_heatmap] ) export_btn.click( fn=export_results, inputs=[comp_table, cluster_table], outputs=export_files ) if __name__ == "__main__": demo.launch()