| """ | |
| ML MFCC Digit Processor | |
| Uses the trained MFCC + Dense NN model for digit classification | |
| """ | |
| import os | |
| import sys | |
| import time | |
| import logging | |
| from pathlib import Path | |
| from typing import Dict, Any, Optional, Union | |
| import numpy as np | |
| from .base_processor import AudioProcessor | |
| # Add project root to path for ML imports | |
| PROJECT_ROOT = Path(__file__).parent.parent | |
| sys.path.append(str(PROJECT_ROOT)) | |
| # Import ML inference | |
| from ml_training.inference import load_classifier | |
| logger = logging.getLogger(__name__) | |


class MLMFCCProcessor(AudioProcessor):
    """
    ML-based MFCC digit processor using a trained Dense NN model.

    Performance characteristics (based on training results):
    - Test accuracy: 98.52%
    - Inference time: ~1-2ms
    - Model size: ~0.3MB
    """

    name = "ML MFCC + Dense NN (Best)"

    def __init__(self, model_dir: str = "models", device: str = "auto"):
        """
        Initialize ML MFCC processor.

        Args:
            model_dir: Directory containing trained models
            device: Device to run inference on ('cpu', 'cuda', or 'auto')
        """
        super().__init__(self.name)
        self.model_dir = Path(model_dir)
        self.device = device if device != "auto" else None
        self.classifier = None
        self._configured = False

        # Performance tracking
        self.prediction_count = 0
        self.total_inference_time = 0.0
        self.last_prediction_time = None

        # Try to load the model
        self._initialize_classifier()

        logger.info(f"ML MFCC Processor initialized (configured: {self._configured})")

    def _initialize_classifier(self):
        """Initialize the ML classifier."""
        try:
            # Check if model directory exists
            if not self.model_dir.exists():
                logger.warning(f"Model directory not found: {self.model_dir}")
                return

            # Load the MFCC classifier
            self.classifier = load_classifier(
                model_dir=str(self.model_dir),
                pipeline_type='mfcc',
                device=self.device
            )
            self._configured = True
            logger.info("ML MFCC classifier loaded successfully")
            logger.info(f"  Model device: {self.classifier.device}")
            logger.info(f"  Parameters: {sum(p.numel() for p in self.classifier.model.parameters()):,}")
        except Exception as e:
            logger.error(f"Failed to load ML MFCC classifier: {str(e)}")
            self.classifier = None
            self._configured = False

    def is_configured(self) -> bool:
        """Check if the processor is properly configured."""
        return self._configured and self.classifier is not None

    def process_audio(self, audio_data: bytes) -> str:
        """
        Process audio and return predicted digit (required by base class).

        Args:
            audio_data: Raw audio data in bytes

        Returns:
            predicted_digit: Predicted digit as string
        """
        return self.predict(audio_data)

    def predict(self, audio_data: bytes) -> str:
        """
        Predict digit from audio data.

        Args:
            audio_data: Raw audio data in bytes

        Returns:
            predicted_digit: Predicted digit as string
        """
        if not self.is_configured():
            raise RuntimeError("ML MFCC processor not properly configured")

        try:
            # Convert audio with optimized format for ML models
            from utils.audio_utils import convert_for_ml_models
            optimized_audio = convert_for_ml_models(audio_data, 'mfcc')

            # Convert audio bytes to numpy array
            audio_array = self._bytes_to_audio_array(optimized_audio)

            # No audio preprocessing needed - normalization happens at feature level in the ML pipeline

            # Make prediction using ML classifier
            start_time = time.time()
            result = self.classifier.predict(
                audio_array,
                return_probabilities=True,
                return_features=False
            )
            inference_time = time.time() - start_time

            # Update performance tracking
            self.prediction_count += 1
            self.total_inference_time += inference_time
            self.last_prediction_time = inference_time

            predicted_digit = str(result['predicted_digit'])
            confidence = result['confidence']

            # Debug logging for predictions (temporary)
            # `result` is a dict, so a membership test (not hasattr) is the right check
            if 'probabilities' in result:
                probs = result.get('probabilities', [])
                if len(probs) >= 10:
                    top_predictions = sorted(enumerate(probs), key=lambda x: x[1], reverse=True)
                    logger.debug(f"MFCC Top 3 predictions: {[(str(d), f'{p:.3f}') for d, p in top_predictions[:3]]}")

            logger.debug(f"ML MFCC prediction: '{predicted_digit}' "
                         f"(confidence: {confidence:.3f}, time: {inference_time*1000:.1f}ms)")

            return predicted_digit
        except Exception as e:
            logger.error(f"ML MFCC prediction failed: {str(e)}")
            raise

    def predict_with_timing(self, audio_data: bytes) -> Dict[str, Any]:
        """
        Predict digit with detailed timing and confidence information.

        Args:
            audio_data: Raw audio data in bytes

        Returns:
            result: Detailed prediction results
        """
        if not self.is_configured():
            return {
                'success': False,
                'error': 'ML MFCC processor not properly configured',
                'predicted_digit': None,
                'inference_time': 0.0
            }

        try:
            # Convert audio with optimized format for ML models
            from utils.audio_utils import convert_for_ml_models
            optimized_audio = convert_for_ml_models(audio_data, 'mfcc')

            # Convert audio bytes to numpy array
            audio_array = self._bytes_to_audio_array(optimized_audio)

            # No audio preprocessing needed - normalization happens at feature level in the ML pipeline

            # Make prediction using ML classifier
            start_time = time.time()
            ml_result = self.classifier.predict(
                audio_array,
                return_probabilities=True,
                return_features=False
            )
            inference_time = time.time() - start_time

            # Update performance tracking
            self.prediction_count += 1
            self.total_inference_time += inference_time
            self.last_prediction_time = inference_time

            # Format result
            result = {
                'success': True,
                'predicted_digit': str(ml_result['predicted_digit']),
                'confidence': ml_result['confidence'],
                'inference_time': inference_time,
                'class_probabilities': {
                    str(k): float(v) for k, v in ml_result['class_probabilities'].items()
                },
                'top_3_predictions': [
                    {
                        'digit': str(pred['digit']),
                        'probability': pred['probability']
                    }
                    for pred in ml_result['top_3_predictions']
                ],
                'method': self.name,
                'model_type': 'ml_mfcc',
                'timestamp': time.time()
            }

            logger.debug(f"ML MFCC detailed prediction: '{result['predicted_digit']}' "
                         f"(confidence: {result['confidence']:.3f}, "
                         f"time: {inference_time*1000:.1f}ms)")

            return result
        except Exception as e:
            logger.error(f"ML MFCC prediction with timing failed: {str(e)}")
            return {
                'success': False,
                'error': str(e),
                'predicted_digit': None,
                'inference_time': 0.0,
                'method': self.name,
                'model_type': 'ml_mfcc',
                'timestamp': time.time()
            }

    def _bytes_to_audio_array(self, audio_data: bytes) -> np.ndarray:
        """Convert audio bytes to numpy array."""
        try:
            # Try to interpret as int16 PCM first (most common)
            audio_array = np.frombuffer(audio_data, dtype=np.int16)

            # Convert to float32 and normalize to [-1.0, 1.0]
            audio_array = audio_array.astype(np.float32) / 32768.0

            # If the array is too short, pad it
            if len(audio_array) < 1000:  # below ~125ms at the 8kHz rate the model expects
                # Pad with zeros to minimum length
                audio_array = np.pad(audio_array, (0, 1000 - len(audio_array)))

            return audio_array
        except Exception as e:
            logger.error(f"Failed to convert audio bytes to array: {str(e)}")
            # Return a small zero array as fallback
            return np.zeros(1000, dtype=np.float32)

    def _preprocess_audio_for_mfcc(self, audio_array: np.ndarray) -> np.ndarray:
        """
        Apply MFCC-specific audio preprocessing to improve model performance.
        This compensates for missing scaler normalization.

        Args:
            audio_array: Raw audio array

        Returns:
            preprocessed_audio: Audio array optimized for MFCC feature extraction
        """
        try:
            # Remove DC component
            audio_array = audio_array - np.mean(audio_array)

            # Apply gentle normalization to handle volume variations
            # This helps compensate for the missing feature scaler
            max_val = np.max(np.abs(audio_array))
            if max_val > 0:
                audio_array = audio_array / max_val * 0.7  # Scale to 70% of max to avoid clipping

            # Apply a gentle high-pass filter to remove low-frequency noise
            # This improves MFCC feature quality
            from scipy import signal
            if len(audio_array) > 100:  # Only apply if we have enough samples
                # Simple high-pass filter at ~300Hz for 8kHz sample rate
                sos = signal.butter(2, 300, btype='high', fs=8000, output='sos')
                audio_array = signal.sosfilt(sos, audio_array)

            # Ensure we don't have any NaN or inf values
            audio_array = np.nan_to_num(audio_array, nan=0.0, posinf=0.0, neginf=0.0)

            logger.debug(f"MFCC preprocessing applied: range=[{np.min(audio_array):.3f}, {np.max(audio_array):.3f}], "
                         f"mean={np.mean(audio_array):.3f}, std={np.std(audio_array):.3f}")

            return audio_array
        except ImportError:
            # Fallback if scipy is not available - just normalize
            logger.warning("Scipy not available, using basic normalization")
            audio_array = audio_array - np.mean(audio_array)
            max_val = np.max(np.abs(audio_array))
            if max_val > 0:
                audio_array = audio_array / max_val * 0.7
            return audio_array
        except Exception as e:
            logger.error(f"MFCC preprocessing failed: {str(e)}")
            # Return original array if preprocessing fails
            return audio_array

    def get_stats(self) -> Dict[str, Any]:
        """Get processor performance statistics."""
        stats = super().get_stats()

        if self.prediction_count > 0:
            stats.update({
                'ml_predictions': self.prediction_count,
                'average_inference_time': self.total_inference_time / self.prediction_count,
                'last_inference_time': self.last_prediction_time,
                'throughput_per_second': self.prediction_count / self.total_inference_time if self.total_inference_time > 0 else 0,
                'model_configured': self.is_configured()
            })

        if self.classifier:
            # Get ML classifier performance stats
            ml_stats = self.classifier.get_performance_stats()
            stats['ml_classifier_stats'] = ml_stats

        return stats

    def get_model_info(self) -> Dict[str, Any]:
        """Get information about the loaded model."""
        if not self.is_configured():
            return {'error': 'Model not loaded'}

        try:
            info = {
                'pipeline_type': 'mfcc',
                'model_class': self.classifier.model.__class__.__name__,
                'device': str(self.classifier.device),
                'parameters': sum(p.numel() for p in self.classifier.model.parameters()),
                'feature_extractor': self.classifier.feature_extractor.__class__.__name__,
                'has_scaler': self.classifier.scaler is not None,
                'expected_sample_rate': 8000,
                'expected_audio_length': 8000  # 1 second at 8kHz
            }

            if hasattr(self.classifier, 'model_path'):
                info['model_path'] = str(self.classifier.model_path)

            return info
        except Exception as e:
            logger.error(f"Failed to get model info: {str(e)}")
            return {'error': str(e)}

    def benchmark_speed(self, num_samples: int = 100) -> Dict[str, Any]:
        """Benchmark inference speed."""
        if not self.is_configured():
            return {'error': 'Model not configured'}

        try:
            return self.classifier.benchmark_speed(num_samples)
        except Exception as e:
            logger.error(f"Benchmark failed: {str(e)}")
            return {'error': str(e)}
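

# --- Usage sketch (illustrative only, not part of the module's API) ---
# A minimal smoke test, assuming a trained model exists under "models/" and
# that this module is run as part of its package (the relative
# `.base_processor` import means it must be launched with `python -m`,
# not as a bare script). The synthetic silence buffer is just a placeholder input.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)

    processor = MLMFCCProcessor(model_dir="models", device="auto")
    if processor.is_configured():
        # One second of silence at the expected 8 kHz rate, as int16 PCM bytes
        silence = np.zeros(8000, dtype=np.int16).tobytes()
        detailed = processor.predict_with_timing(silence)
        print(f"Predicted: {detailed['predicted_digit']} "
              f"(confidence: {detailed.get('confidence', 0.0):.3f})")
        print(processor.get_model_info())
    else:
        print("Classifier not loaded; check the model directory.")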