import librosa
import numpy as np
from collections import Counter
from transformers import pipeline

from config import config
from models_config import get_model_config


class AudioEmotionProcessor:
    """Process audio files and extract emotions using ML models."""

    def __init__(self):
        self.model = None
        self.model_name = config.MODEL_NAME
        self.chunk_duration = config.CHUNK_DURATION
        self.sample_rate = config.SAMPLE_RATE
        # Get model-specific configuration
        self.model_config = get_model_config(self.model_name)
        self.label_mapping = self.model_config.get("label_mapping", {})

    def load_model(self):
        """Load the emotion detection model, caching it after the first call."""
        if self.model is None:
            print(f"Loading model: {self.model_name}")
            print(f"Model config: {self.model_config['description']}")
            # Get task type from model config
            task = self.model_config.get("task", "audio-classification")
            try:
                # Load model with the configured task
                self.model = pipeline(
                    task=task,
                    model=self.model_name
                )
                print("Model loaded successfully!")
            except Exception as e:
                print(f"Failed to load with task '{task}' ({e}), falling back to audio-classification...")
                try:
                    # Fallback: try the generic audio-classification task
                    self.model = pipeline(
                        "audio-classification",
                        model=self.model_name
                    )
                    print("Model loaded successfully with audio-classification!")
                except Exception as e2:
                    print(f"Error loading model: {e2}")
                    raise
        return self.model

    def load_audio(self, filepath):
        """Load audio file and resample to the target sample rate."""
        audio, sr = librosa.load(filepath, sr=self.sample_rate)
        # Normalize audio volume (boost quiet recordings)
        audio = self.normalize_audio(audio)
        return audio, sr

    def normalize_audio(self, audio):
        """Peak-normalize audio to increase volume."""
        # Get max absolute value
        max_val = np.max(np.abs(audio))
        # Avoid division by zero on silent audio
        if max_val > 0:
            # Normalize to 0.95 to prevent clipping
            audio = audio / max_val * 0.95
        return audio

    def get_audio_duration(self, audio, sr):
        """Get duration of audio in seconds."""
        return librosa.get_duration(y=audio, sr=sr)

    def split_into_chunks(self, audio, sr):
        """Split audio into fixed-duration chunks."""
        chunk_samples = int(self.chunk_duration * sr)
        chunks = []
        for i in range(0, len(audio), chunk_samples):
            chunk = audio[i:i + chunk_samples]
            # Pad last chunk if it's shorter
            if len(chunk) < chunk_samples:
                chunk = np.pad(chunk, (0, chunk_samples - len(chunk)), mode='constant')
            chunks.append(chunk)
        return chunks
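
    # Worked example of the chunking arithmetic, assuming hypothetical config
    # values CHUNK_DURATION = 3 and SAMPLE_RATE = 16000: each chunk holds
    # 3 * 16000 = 48000 samples, so a 10 s clip yields 4 chunks, the last
    # padded with 2 s of zeros.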

    def predict_emotion(self, audio_chunk):
        """Predict emotion for a single audio chunk."""
        if self.model is None:
            self.load_model()
        # Get predictions, passing the sampling rate explicitly so the
        # pipeline doesn't have to assume one for the raw array
        predictions = self.model({"raw": audio_chunk, "sampling_rate": self.sample_rate})
        # Get top prediction (results are sorted by score, highest first)
        top_prediction = predictions[0]
        # Debug: print raw model output
        print(f"DEBUG - Raw prediction: {top_prediction}")
        # Map model output to our emotion labels
        emotion_label = self.map_emotion_label(top_prediction['label'])
        confidence = top_prediction['score']
        return emotion_label, confidence

    def map_emotion_label(self, model_label):
        """Map model output labels to standardized emotion names."""
        # Different models may have different label formats
        label_lower = model_label.lower()
        # Use model-specific label mapping first
        if label_lower in self.label_mapping:
            return self.label_mapping[label_lower]
        # Fallback to common variations
        emotion_map = {
            'hap': 'Happy',
            'happy': 'Happy',
            'happiness': 'Happy',
            'sad': 'Sad',
            'sadness': 'Sad',
            'ang': 'Angry',
            'angry': 'Angry',
            'anger': 'Angry',
            'neu': 'Neutral',
            'neutral': 'Neutral',
            'calm': 'Neutral',
            'fear': 'Fear',
            'fearful': 'Fear',
            'surprise': 'Surprise',
            'surprised': 'Surprise',
            'disgust': 'Disgust'
        }
        # Try to find a substring match
        for key, value in emotion_map.items():
            if key in label_lower:
                return value
        # Default: capitalize first letter
        return model_label.capitalize()

    def format_time(self, seconds):
        """Format seconds to MM:SS format."""
        mins = int(seconds // 60)
        secs = int(seconds % 60)
        return f"{mins:02d}:{secs:02d}"

    def process_audio_file(self, filepath, progress_callback=None):
        """
        Process an entire audio file and return an emotion timeline.

        Args:
            filepath: Path to audio file
            progress_callback: Optional callback function(progress, message)

        Returns:
            dict: Results containing timeline and metadata
        """
        try:
            # Load model
            if progress_callback:
                progress_callback(10, "Loading model...")
            self.load_model()

            # Load audio
            if progress_callback:
                progress_callback(20, "Loading audio file...")
            audio, sr = self.load_audio(filepath)

            # Get duration
            duration = self.get_audio_duration(audio, sr)
            duration_formatted = self.format_time(duration)

            # Split into chunks
            if progress_callback:
                progress_callback(30, "Splitting audio into segments...")
            chunks = self.split_into_chunks(audio, sr)

            # Process each chunk
            timeline = []
            total_chunks = len(chunks)
            for i, chunk in enumerate(chunks):
                # Scale progress across the 30% to 90% range
                progress = 30 + int((i / total_chunks) * 60)
                if progress_callback:
                    progress_callback(
                        progress,
                        f"Analyzing chunk {i+1}/{total_chunks}..."
                    )
                # Predict emotion
                emotion, confidence = self.predict_emotion(chunk)
                # Timestamp of the chunk's start
                time_seconds = i * self.chunk_duration
                time_formatted = self.format_time(time_seconds)
                timeline.append({
                    "time": time_formatted,
                    "emotion": emotion,
                    "confidence": float(confidence)
                })

            # Calculate statistics
            if progress_callback:
                progress_callback(95, "Calculating statistics...")
            emotions_list = [item['emotion'] for item in timeline]
            unique_emotions = len(set(emotions_list))

            # Find dominant emotion (most frequent label across chunks)
            emotion_counts = Counter(emotions_list)
            dominant_emotion = emotion_counts.most_common(1)[0][0]

            # Build results
            results = {
                "duration": duration_formatted,
                "total_chunks": total_chunks,
                "emotions_detected": unique_emotions,
                "dominant_emotion": dominant_emotion,
                "timeline": timeline
            }

            if progress_callback:
                progress_callback(100, "Analysis complete!")
            return results
        except Exception as e:
            # Chain the original exception so its traceback is preserved
            raise RuntimeError(f"Audio processing failed: {e}") from e
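
# For reference, the dict returned by process_audio_file has this shape
# (the values below are purely illustrative, not real model output):
# {
#     "duration": "01:30",
#     "total_chunks": 30,
#     "emotions_detected": 3,
#     "dominant_emotion": "Neutral",
#     "timeline": [
#         {"time": "00:00", "emotion": "Neutral", "confidence": 0.91},
#         ...
#     ]
# }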


# Global processor instance
_processor = None


def get_processor():
    """Get or create the global processor instance."""
    global _processor
    if _processor is None:
        _processor = AudioEmotionProcessor()
    return _processor
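

if __name__ == "__main__":
    # Minimal usage sketch. "sample.wav" is a hypothetical path; substitute
    # any audio file librosa can decode (wav, mp3, flac, ...).
    def print_progress(progress, message):
        # Callback matching the (progress, message) signature used above
        print(f"[{progress:3d}%] {message}")

    processor = get_processor()
    results = processor.process_audio_file("sample.wav", progress_callback=print_progress)
    print(f"Dominant emotion: {results['dominant_emotion']} over {results['duration']}")
    for entry in results["timeline"]:
        print(f"{entry['time']}  {entry['emotion']}  ({entry['confidence']:.2f})")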