import gradio as gr
import torch

try:
    import spaces
    ZERO_GPU = True
except ImportError:
    ZERO_GPU = False

import numpy as np
from transformers import ASTForAudioClassification, AutoFeatureExtractor
from pydub import AudioSegment
import tempfile
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Model configurations
MODELS = {
    "fine_tuned": {
        "name": "Vyvo-Research/AST-Music-Classifier-1K",
        "display_name": "AST-Music-Classifier-1K (Fine-tuned)",
        "description": "Model specialized for music classification",
        "badge": "Fine-tuned"
    },
    "base": {
        "name": "MIT/ast-finetuned-audioset-10-10-0.4593",
        "display_name": "MIT AST (Base Model)",
        "description": "Original AST model trained on AudioSet",
        "badge": "Base"
    }
}

DETECTION_THRESHOLD = 0.50
WINDOW_SIZE = 5.0  # analysis window length in seconds
HOP_SIZE = 5.0     # stride between windows in seconds (no overlap)

# Load both models
logger.info("Loading models...")
models = {}
feature_extractors = {}
for key, config in MODELS.items():
    logger.info(f"Loading {config['display_name']}...")
    models[key] = ASTForAudioClassification.from_pretrained(config["name"])
    feature_extractors[key] = AutoFeatureExtractor.from_pretrained(config["name"])
    models[key].eval()
logger.info("All models loaded")


def load_audio(file_path: str, target_sr: int):
    """Load any audio file as a mono float32 array in [-1, 1] plus the pydub segment."""
    audio = AudioSegment.from_file(file_path)
    audio = audio.set_channels(1).set_frame_rate(target_sr)
    samples = np.array(audio.get_array_of_samples()).astype(np.float32)
    samples = samples / np.iinfo(audio.array_type).max
    return samples, audio


@torch.no_grad()
def detect_music_with_model(audio_array, sample_rate, model_key):
    """Slide a fixed-size window over the audio and classify each window with the given model."""
    model = models[model_key]
    feature_extractor = feature_extractors[model_key]

    window_samples = int(WINDOW_SIZE * sample_rate)
    hop_samples = int(HOP_SIZE * sample_rate)
    total_samples = len(audio_array)

    music_segments = []
    all_predictions = []
    last_was_music = False

    device = next(model.parameters()).device
    use_half = device.type == "cuda"

    for start in range(0, total_samples, hop_samples):
        end = min(start + window_samples, total_samples)
        segment = audio_array[start:end]

        # Skip fragments shorter than one second
        if len(segment) < sample_rate:
            continue

        needs_padding = len(segment) < window_samples
        if needs_padding:
            segment = np.pad(segment, (0, window_samples - len(segment)), mode='constant')

        inputs = feature_extractor(
            segment,
            sampling_rate=sample_rate,
            return_tensors="pt",
            padding="max_length",
            truncation=True,
            max_length=1024
        )
        if use_half:
            inputs = {k: v.to(device).half() for k, v in inputs.items()}
        else:
            inputs = {k: v.to(device) for k, v in inputs.items()}

        outputs = model(**inputs)
        probs = torch.softmax(outputs.logits, dim=-1)
        pred_idx = torch.argmax(probs[0]).item()
        pred_label = model.config.id2label.get(pred_idx, "")
        pred_score = probs[0][pred_idx].item()

        is_music = "music" in pred_label.lower()
        is_uncertain = 0.40 <= pred_score <= 0.60

        start_sec = start / sample_rate
        end_sec = end / sample_rate

        all_predictions.append({
            "start": start_sec,
            "end": end_sec,
            "label": pred_label,
            "score": pred_score,
            "is_music": is_music
        })

        if is_uncertain and needs_padding:
            # Uncertain, padded tail window: follow the previous window's decision
            if last_was_music:
                music_segments.append((int(start_sec * 1000), int(end_sec * 1000), pred_score))
        elif is_music and pred_score >= DETECTION_THRESHOLD:
            music_segments.append((int(start_sec * 1000), int(end_sec * 1000), pred_score))
            last_was_music = True
        else:
            last_was_music = False

    return music_segments, all_predictions


def merge_segments(segments):
    if not segments:
        return []
    segments = sorted(segments, key=lambda x: x[0])
    merged = [segments[0]]
    for current in segments[1:]:
        last = merged[-1]
        if current[0] <= last[1]:
            merged[-1] = (last[0], max(last[1], current[1]), max(last[2], current[2]))
        else:
            merged.append(current)
    return merged


def remove_music_segments(audio, segments):
    if not segments:
        return audio
    clean_parts = []
    last_end = 0
    for start_ms, end_ms, _ in segments:
        if start_ms > last_end:
            clean_parts.append(audio[last_end:start_ms])
        last_end = end_ms
    if last_end < len(audio):
        clean_parts.append(audio[last_end:])
    if not clean_parts:
        return AudioSegment.silent(duration=0)
    return sum(clean_parts)


def calculate_metrics(segments, total_duration_ms):
    if not segments:
        return {
            "total_music_ms": 0,
            "segment_count": 0,
            "avg_confidence": 0,
            "coverage_percent": 0
        }
    total_music_ms = sum(end - start for start, end, _ in segments)
    avg_confidence = sum(score for _, _, score in segments) / len(segments)
    coverage_percent = (total_music_ms / total_duration_ms) * 100 if total_duration_ms > 0 else 0
    return {
        "total_music_ms": total_music_ms,
        "segment_count": len(segments),
        "avg_confidence": avg_confidence,
        "coverage_percent": coverage_percent
    }


def build_comparison_report(original_dur, ft_segments, base_segments, ft_metrics, base_metrics):
    ft_detected = ft_metrics["total_music_ms"] / 1000
    base_detected = base_metrics["total_music_ms"] / 1000

    # Calculate improvement percentages
    if base_metrics["avg_confidence"] > 0:
        conf_improvement = ((ft_metrics["avg_confidence"] - base_metrics["avg_confidence"]) / base_metrics["avg_confidence"]) * 100
    else:
        conf_improvement = 100 if ft_metrics["avg_confidence"] > 0 else 0

    if base_metrics["segment_count"] > 0:
        segment_improvement = ((ft_metrics["segment_count"] - base_metrics["segment_count"]) / base_metrics["segment_count"]) * 100
    else:
        segment_improvement = 100 if ft_metrics["segment_count"] > 0 else 0

    # Winner determination
    ft_score = 0
    base_score = 0
    if ft_metrics["avg_confidence"] > base_metrics["avg_confidence"]:
        ft_score += 1
    else:
        base_score += 1
    if ft_metrics["segment_count"] >= base_metrics["segment_count"]:
        ft_score += 1
    else:
        base_score += 1

    if ft_score > base_score:
        winner = "Fine-tuned"
        winner_pct = abs(conf_improvement)
    else:
        winner = "Base"
        winner_pct = abs(conf_improvement)

    report = f"""
## Result: **{winner}** model wins! (+{winner_pct:.1f}% confidence)

| Metric | Fine-tuned | Base |
|--------|-----------|------|
| Segments | **{ft_metrics['segment_count']}** | {base_metrics['segment_count']} |
| Duration | **{ft_detected:.1f}s** | {base_detected:.1f}s |
| Confidence | **{ft_metrics['avg_confidence']:.0%}** | {base_metrics['avg_confidence']:.0%} |

---

**Fine-tuned segments:**
"""
    if ft_segments:
        for start_ms, end_ms, score in ft_segments:
            report += f"- {start_ms/1000:.1f}s - {end_ms/1000:.1f}s ({score:.0%})\n"
    else:
        report += "No music detected\n"

    report += "\n**Base segments:**\n"
    if base_segments:
        for start_ms, end_ms, score in base_segments:
            report += f"- {start_ms/1000:.1f}s - {end_ms/1000:.1f}s ({score:.0%})\n"
    else:
        report += "No music detected\n"

    return report


@spaces.GPU if ZERO_GPU else lambda f: f
def process_audio_comparison(audio_file, progress=gr.Progress()):
    if audio_file is None:
        return None, None, "Please upload an audio file."
    try:
        progress(0.05, desc="Preparing models...")

        # Move models to GPU if available
        if torch.cuda.is_available():
            for key in models:
                models[key].to("cuda").half()
            torch.backends.cudnn.benchmark = True

        progress(0.1, desc="Loading audio...")
        sample_rate = feature_extractors["fine_tuned"].sampling_rate
        audio_array, audio = load_audio(audio_file, sample_rate)
        original_duration = len(audio) / 1000
        total_duration_ms = len(audio)

        # Process with Fine-tuned model
        progress(0.2, desc="Analyzing with Fine-tuned Model...")
        ft_segments, ft_predictions = detect_music_with_model(audio_array, sample_rate, "fine_tuned")
        ft_segments = merge_segments(ft_segments)
        ft_metrics = calculate_metrics(ft_segments, total_duration_ms)

        # Process with Base model
        progress(0.5, desc="Analyzing with Base Model...")
        base_segments, base_predictions = detect_music_with_model(audio_array, sample_rate, "base")
        base_segments = merge_segments(base_segments)
        base_metrics = calculate_metrics(base_segments, total_duration_ms)

        # Create outputs for both models
        progress(0.8, desc="Generating outputs...")

        # Fine-tuned model output
        ft_clean_audio = remove_music_segments(audio, ft_segments)
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as f:
            ft_clean_audio.export(f.name, format="wav")
            ft_output_path = f.name

        # Base model output
        base_clean_audio = remove_music_segments(audio, base_segments)
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as f:
            base_clean_audio.export(f.name, format="wav")
            base_output_path = f.name

        progress(0.95, desc="Building report...")
        report = build_comparison_report(
            original_duration, ft_segments, base_segments, ft_metrics, base_metrics
        )

        progress(1.0, desc="Done")
        return ft_output_path, base_output_path, report

    except Exception as e:
        logger.exception("Processing failed")
        return None, None, f"Error: {str(e)}"


with gr.Blocks(title="CleanSpeech - Model Comparison", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# CleanSpeech - Model Comparison")

    # Input section
    with gr.Row():
        with gr.Column(scale=2):
            audio_input = gr.Audio(label="Upload Audio File", type="filepath")
            process_btn = gr.Button("Compare Models", variant="primary", size="lg")

    # Output section - Side by side
    with gr.Row():
        with gr.Column(scale=1):
            ft_audio_output = gr.Audio(label="Fine-tuned Output")
        with gr.Column(scale=1):
            base_audio_output = gr.Audio(label="Base Model Output")

    # Comparison report
    comparison_report = gr.Markdown(label="Comparison Report")

    process_btn.click(
        fn=process_audio_comparison,
        inputs=[audio_input],
        outputs=[ft_audio_output, base_audio_output, comparison_report]
    )

    # Footer
    gr.Markdown("""
---
**Models:** [Fine-tuned](https://huggingface.co/Vyvo-Research/AST-Music-Classifier-1K) | [Base](https://huggingface.co/MIT/ast-finetuned-audioset-10-10-0.4593)
""")

demo.queue()
demo.launch()