from collections import Counter

import librosa
import numpy as np
from transformers import pipeline

from config import config
from models_config import get_model_config


class AudioEmotionProcessor:
    """Process audio files and extract emotions using ML models"""

    def __init__(self):
        self.model = None
        self.model_name = config.MODEL_NAME
        self.chunk_duration = config.CHUNK_DURATION
        self.sample_rate = config.SAMPLE_RATE

        # Get model-specific configuration
        self.model_config = get_model_config(self.model_name)
        self.label_mapping = self.model_config.get("label_mapping", {})

    def load_model(self):
        """Load the emotion detection model"""
        if self.model is None:
            print(f"Loading model: {self.model_name}")
            print(f"Model config: {self.model_config['description']}")

            # Get task type from model config
            task = self.model_config.get("task", "audio-classification")

            try:
                # Load model with configured task
                self.model = pipeline(task=task, model=self.model_name)
                print("Model loaded successfully!")
            except Exception as e:
                print(f"Failed to load with task '{task}' ({e}), falling back to audio-classification...")
                try:
                    # Fallback: try the generic audio-classification task
                    self.model = pipeline("audio-classification", model=self.model_name)
                    print("Model loaded successfully with audio-classification!")
                except Exception as e2:
                    print(f"Error loading model: {e2}")
                    raise

        return self.model

    def load_audio(self, filepath):
        """Load audio file and resample to target sample rate"""
        audio, sr = librosa.load(filepath, sr=self.sample_rate)

        # Normalize audio volume (boost quiet recordings)
        audio = self.normalize_audio(audio)

        return audio, sr

    def normalize_audio(self, audio):
        """Normalize audio to increase volume"""
        # Get max absolute value
        max_val = np.max(np.abs(audio))

        # Avoid division by zero on silent audio
        if max_val > 0:
            # Scale peaks to 0.95 to prevent clipping
            audio = audio / max_val * 0.95

        return audio

    def get_audio_duration(self, audio, sr):
        """Get duration of audio in seconds"""
        return librosa.get_duration(y=audio, sr=sr)

    def split_into_chunks(self, audio, sr):
        """Split audio into fixed-duration chunks"""
        chunk_samples = int(self.chunk_duration * sr)
        chunks = []

        for i in range(0, len(audio), chunk_samples):
            chunk = audio[i:i + chunk_samples]

            # Zero-pad the last chunk if it's shorter than the rest
            if len(chunk) < chunk_samples:
                chunk = np.pad(chunk, (0, chunk_samples - len(chunk)), mode='constant')

            chunks.append(chunk)

        return chunks

    def predict_emotion(self, audio_chunk):
        """Predict emotion for a single audio chunk"""
        if self.model is None:
            self.load_model()

        # Get predictions; the pipeline treats a raw numpy array as audio
        # sampled at the model's expected rate, so config.SAMPLE_RATE must
        # match it (typically 16 kHz for wav2vec2-style models)
        predictions = self.model(audio_chunk)

        # Predictions are sorted by score; take the top one
        top_prediction = predictions[0]

        # Debug: print raw model output
        print(f"DEBUG - Raw prediction: {top_prediction}")

        # Map model output to our emotion labels
        emotion_label = self.map_emotion_label(top_prediction['label'])
        confidence = top_prediction['score']

        return emotion_label, confidence

    def map_emotion_label(self, model_label):
        """Map model output labels to standardized emotion names"""
        # Different models may have different label formats
        label_lower = model_label.lower()

        # Use model-specific label mapping first
        if label_lower in self.label_mapping:
            return self.label_mapping[label_lower]

        # Fallback to common variations
        emotion_map = {
            'hap': 'Happy',
            'happy': 'Happy',
            'happiness': 'Happy',
            'sad': 'Sad',
            'sadness': 'Sad',
            'ang': 'Angry',
            'angry': 'Angry',
            'anger': 'Angry',
            'neu': 'Neutral',
            'neutral': 'Neutral',
            'calm': 'Neutral',
            'fear': 'Fear',
            'fearful': 'Fear',
            'surprise': 'Surprise',
            'surprised': 'Surprise',
            'disgust': 'Disgust',
        }

        # Try a substring match against the known variations
        for key, value in emotion_map.items():
            if key in label_lower:
                return value

        # Default: capitalize first letter of the raw label
        return model_label.capitalize()

    def format_time(self, seconds):
        """Format seconds to MM:SS format"""
        mins = int(seconds // 60)
        secs = int(seconds % 60)
        return f"{mins:02d}:{secs:02d}"

    def process_audio_file(self, filepath, progress_callback=None):
        """
        Process entire audio file and return emotion timeline

        Args:
            filepath: Path to audio file
            progress_callback: Optional callback function(progress, message)

        Returns:
            dict: Results containing timeline and metadata
        """
        try:
            # Load model
            if progress_callback:
                progress_callback(10, "Loading model...")
            self.load_model()

            # Load audio
            if progress_callback:
                progress_callback(20, "Loading audio file...")
            audio, sr = self.load_audio(filepath)

            # Get duration
            duration = self.get_audio_duration(audio, sr)
            duration_formatted = self.format_time(duration)

            # Split into chunks
            if progress_callback:
                progress_callback(30, "Splitting audio into segments...")
            chunks = self.split_into_chunks(audio, sr)

            # Process each chunk
            timeline = []
            total_chunks = len(chunks)

            for i, chunk in enumerate(chunks):
                # Map chunk index onto the 30%-90% progress band
                progress = 30 + int((i / total_chunks) * 60)
                if progress_callback:
                    progress_callback(
                        progress,
                        f"Analyzing chunk {i+1}/{total_chunks}..."
                    )

                # Predict emotion
                emotion, confidence = self.predict_emotion(chunk)

                # Calculate timestamp of the chunk's start
                time_seconds = i * self.chunk_duration
                time_formatted = self.format_time(time_seconds)

                timeline.append({
                    "time": time_formatted,
                    "emotion": emotion,
                    "confidence": float(confidence),
                })

            # Calculate statistics
            if progress_callback:
                progress_callback(95, "Calculating statistics...")

            emotions_list = [item['emotion'] for item in timeline]
            unique_emotions = len(set(emotions_list))

            # Find dominant emotion (most frequent label across chunks)
            emotion_counts = Counter(emotions_list)
            dominant_emotion = emotion_counts.most_common(1)[0][0]

            # Build results
            results = {
                "duration": duration_formatted,
                "total_chunks": total_chunks,
                "emotions_detected": unique_emotions,
                "dominant_emotion": dominant_emotion,
                "timeline": timeline,
            }

            if progress_callback:
                progress_callback(100, "Analysis complete!")

            return results

        except Exception as e:
            # Re-raise with context; "from e" preserves the original traceback
            raise RuntimeError(f"Audio processing failed: {e}") from e


# Global processor instance
_processor = None


def get_processor():
    """Get or create global processor instance"""
    global _processor
    if _processor is None:
        _processor = AudioEmotionProcessor()
    return _processor
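

# --- Usage sketch ---
# A minimal way to drive the processor from the command line; "sample.wav"
# is a placeholder path (not part of the project), and the model, chunk
# duration, and sample rate all come from config.py as above.
if __name__ == "__main__":
    def print_progress(progress, message):
        # Console reporter matching the progress_callback(progress, message) signature
        print(f"[{progress:3d}%] {message}")

    processor = get_processor()
    results = processor.process_audio_file("sample.wav", progress_callback=print_progress)

    print(f"Duration: {results['duration']}  |  Dominant emotion: {results['dominant_emotion']}")
    for entry in results["timeline"]:
        print(f"{entry['time']}  {entry['emotion']:<10} confidence={entry['confidence']:.2f}")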