Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Utility functions for Translation AI Agent | |
| """ | |
| import os | |
| import time | |
| import tempfile | |
| import logging | |
| import hashlib | |
| from typing import Optional, Tuple, List, Dict, Any | |
| import numpy as np | |
| import librosa | |
| import soundfile as sf | |
| from pathlib import Path | |
| logger = logging.getLogger(__name__) | |
| class AudioProcessor: | |
| """Audio processing utilities""" | |
| def load_audio(file_path: str, target_sr: int = 16000) -> Tuple[np.ndarray, int]: | |
| """Load and resample audio file""" | |
| try: | |
| audio, sr = librosa.load(file_path, sr=target_sr, mono=True) | |
| return audio, sr | |
| except Exception as e: | |
| logger.error(f"Error loading audio: {e}") | |
| raise | |
| def save_audio(audio: np.ndarray, file_path: str, sample_rate: int = 16000): | |
| """Save audio array to file""" | |
| try: | |
| sf.write(file_path, audio, sample_rate) | |
| except Exception as e: | |
| logger.error(f"Error saving audio: {e}") | |
| raise | |
| def get_audio_duration(file_path: str) -> float: | |
| """Get duration of audio file in seconds""" | |
| try: | |
| audio, sr = librosa.load(file_path, sr=None) | |
| return len(audio) / sr | |
| except Exception as e: | |
| logger.error(f"Error getting audio duration: {e}") | |
| return 0.0 | |
| def validate_audio_file(file_path: str, max_duration: int = 300) -> bool: | |
| """Validate audio file format and duration""" | |
| if not os.path.exists(file_path): | |
| return False | |
| try: | |
| duration = AudioProcessor.get_audio_duration(file_path) | |
| return 0 < duration <= max_duration | |
| except: | |
| return False | |
| def normalize_audio(audio: np.ndarray) -> np.ndarray: | |
| """Normalize audio to [-1, 1] range""" | |
| if audio.max() > 1.0 or audio.min() < -1.0: | |
| audio = audio / np.max(np.abs(audio)) | |
| return audio | |
| def add_silence(audio: np.ndarray, duration: float, sample_rate: int) -> np.ndarray: | |
| """Add silence to beginning and end of audio""" | |
| silence_samples = int(duration * sample_rate) | |
| silence = np.zeros(silence_samples) | |
| return np.concatenate([silence, audio, silence]) | |
| class LanguageDetector: | |
| """Language detection utilities""" | |
| def __init__(self, keywords_dict: Dict[str, List[str]]): | |
| self.keywords = keywords_dict | |
| def detect(self, text: str, threshold: int = 2) -> str: | |
| """Detect language from text using keyword matching""" | |
| text_lower = text.lower().split() | |
| scores = {} | |
| for lang, keywords in self.keywords.items(): | |
| score = sum(1 for word in keywords if word in text_lower) | |
| scores[lang] = score | |
| # Get language with highest score | |
| if scores: | |
| detected_lang, score = max(scores.items(), key=lambda x: x[1]) | |
| if score >= threshold: | |
| return detected_lang | |
| return 'en' # Default to English | |
| def get_confidence(self, text: str, detected_lang: str) -> float: | |
| """Get confidence score for detected language""" | |
| text_lower = text.lower().split() | |
| keywords = self.keywords.get(detected_lang, []) | |
| if not keywords or not text_lower: | |
| return 0.0 | |
| matches = sum(1 for word in keywords if word in text_lower) | |
| return min(matches / len(keywords), 1.0) | |
| class FileManager: | |
| """File management utilities""" | |
| def create_temp_file(suffix: str = '.wav', prefix: str = 'temp_') -> str: | |
| """Create temporary file and return path""" | |
| temp_file = tempfile.NamedTemporaryFile( | |
| suffix=suffix, | |
| prefix=prefix, | |
| delete=False | |
| ) | |
| temp_file.close() | |
| return temp_file.name | |
| def cleanup_temp_files(file_paths: List[str]): | |
| """Remove temporary files""" | |
| for file_path in file_paths: | |
| try: | |
| if os.path.exists(file_path): | |
| os.remove(file_path) | |
| except Exception as e: | |
| logger.warning(f"Could not remove temp file {file_path}: {e}") | |
| def ensure_directory(directory: str): | |
| """Ensure directory exists, create if not""" | |
| Path(directory).mkdir(parents=True, exist_ok=True) | |
| def get_file_hash(file_path: str) -> str: | |
| """Get SHA256 hash of file""" | |
| try: | |
| with open(file_path, 'rb') as f: | |
| return hashlib.sha256(f.read()).hexdigest() | |
| except Exception as e: | |
| logger.error(f"Error computing file hash: {e}") | |
| return "" | |
| class ModelManager: | |
| """Model loading and management utilities""" | |
| def check_cuda_availability() -> bool: | |
| """Check if CUDA is available""" | |
| try: | |
| import torch | |
| return torch.cuda.is_available() | |
| except ImportError: | |
| return False | |
| def get_device_info() -> Dict[str, Any]: | |
| """Get device information""" | |
| info = {"has_cuda": False, "device_count": 0, "device_names": []} | |
| try: | |
| import torch | |
| if torch.cuda.is_available(): | |
| info["has_cuda"] = True | |
| info["device_count"] = torch.cuda.device_count() | |
| info["device_names"] = [ | |
| torch.cuda.get_device_name(i) | |
| for i in range(torch.cuda.device_count()) | |
| ] | |
| except ImportError: | |
| pass | |
| return info | |
| def estimate_model_memory(model_name: str) -> int: | |
| """Estimate memory requirements for model in MB""" | |
| # Rough estimates based on common model sizes | |
| memory_estimates = { | |
| "whisper-tiny": 128, | |
| "whisper-base": 256, | |
| "whisper-small": 512, | |
| "whisper-medium": 1024, | |
| "nllb-200-distilled-600M": 1200, | |
| "nllb-200-1.3B": 2600, | |
| "speecht5": 800 | |
| } | |
| for key, memory in memory_estimates.items(): | |
| if key in model_name.lower(): | |
| return memory | |
| return 1000 # Default estimate | |
| class CacheManager: | |
| """Caching utilities""" | |
| def __init__(self, cache_dir: str, max_size: int = 1000, ttl: int = 3600): | |
| self.cache_dir = Path(cache_dir) | |
| self.max_size = max_size | |
| self.ttl = ttl # Time to live in seconds | |
| self.cache_info = {} | |
| self.ensure_cache_dir() | |
| def ensure_cache_dir(self): | |
| """Ensure cache directory exists""" | |
| self.cache_dir.mkdir(parents=True, exist_ok=True) | |
| def get_cache_key(self, data: str) -> str: | |
| """Generate cache key from data""" | |
| return hashlib.md5(data.encode()).hexdigest() | |
| def is_cached(self, key: str) -> bool: | |
| """Check if key is in cache and not expired""" | |
| cache_file = self.cache_dir / f"{key}.cache" | |
| if not cache_file.exists(): | |
| return False | |
| # Check TTL | |
| if key in self.cache_info: | |
| cache_time = self.cache_info[key] | |
| if time.time() - cache_time > self.ttl: | |
| self.remove_from_cache(key) | |
| return False | |
| return True | |
| def get_from_cache(self, key: str) -> Optional[Any]: | |
| """Get item from cache""" | |
| if not self.is_cached(key): | |
| return None | |
| try: | |
| cache_file = self.cache_dir / f"{key}.cache" | |
| with open(cache_file, 'r', encoding='utf-8') as f: | |
| return f.read() | |
| except Exception as e: | |
| logger.error(f"Error reading from cache: {e}") | |
| return None | |
| def add_to_cache(self, key: str, data: str): | |
| """Add item to cache""" | |
| try: | |
| cache_file = self.cache_dir / f"{key}.cache" | |
| with open(cache_file, 'w', encoding='utf-8') as f: | |
| f.write(data) | |
| self.cache_info[key] = time.time() | |
| self.cleanup_old_cache() | |
| except Exception as e: | |
| logger.error(f"Error writing to cache: {e}") | |
| def remove_from_cache(self, key: str): | |
| """Remove item from cache""" | |
| try: | |
| cache_file = self.cache_dir / f"{key}.cache" | |
| if cache_file.exists(): | |
| cache_file.unlink() | |
| if key in self.cache_info: | |
| del self.cache_info[key] | |
| except Exception as e: | |
| logger.error(f"Error removing from cache: {e}") | |
| def cleanup_old_cache(self): | |
| """Remove old cache entries if over max size""" | |
| if len(self.cache_info) <= self.max_size: | |
| return | |
| # Sort by timestamp and remove oldest | |
| sorted_items = sorted(self.cache_info.items(), key=lambda x: x[1]) | |
| items_to_remove = len(sorted_items) - self.max_size | |
| for key, _ in sorted_items[:items_to_remove]: | |
| self.remove_from_cache(key) | |
| class MetricsTracker: | |
| """Track performance metrics""" | |
| def __init__(self): | |
| self.metrics = { | |
| "translations": 0, | |
| "speech_recognitions": 0, | |
| "text_to_speech": 0, | |
| "total_processing_time": 0, | |
| "average_processing_time": 0, | |
| "errors": 0 | |
| } | |
| self.start_time = time.time() | |
| def record_translation(self, processing_time: float): | |
| """Record a translation event""" | |
| self.metrics["translations"] += 1 | |
| self._update_timing(processing_time) | |
| def record_speech_recognition(self, processing_time: float): | |
| """Record a speech recognition event""" | |
| self.metrics["speech_recognitions"] += 1 | |
| self._update_timing(processing_time) | |
| def record_tts(self, processing_time: float): | |
| """Record a text-to-speech event""" | |
| self.metrics["text_to_speech"] += 1 | |
| self._update_timing(processing_time) | |
| def record_error(self): | |
| """Record an error event""" | |
| self.metrics["errors"] += 1 | |
| def _update_timing(self, processing_time: float): | |
| """Update timing metrics""" | |
| self.metrics["total_processing_time"] += processing_time | |
| total_operations = ( | |
| self.metrics["translations"] + | |
| self.metrics["speech_recognitions"] + | |
| self.metrics["text_to_speech"] | |
| ) | |
| if total_operations > 0: | |
| self.metrics["average_processing_time"] = ( | |
| self.metrics["total_processing_time"] / total_operations | |
| ) | |
| def get_stats(self) -> Dict[str, Any]: | |
| """Get current statistics""" | |
| uptime = time.time() - self.start_time | |
| return { | |
| **self.metrics, | |
| "uptime_seconds": uptime, | |
| "operations_per_minute": ( | |
| (self.metrics["translations"] + | |
| self.metrics["speech_recognitions"] + | |
| self.metrics["text_to_speech"]) / (uptime / 60) | |
| if uptime > 0 else 0 | |
| ) | |
| } | |
| # Utility functions | |
| def format_duration(seconds: float) -> str: | |
| """Format duration in human-readable format""" | |
| if seconds < 60: | |
| return f"{seconds:.1f}s" | |
| elif seconds < 3600: | |
| minutes = int(seconds // 60) | |
| secs = int(seconds % 60) | |
| return f"{minutes}m {secs}s" | |
| else: | |
| hours = int(seconds // 3600) | |
| minutes = int((seconds % 3600) // 60) | |
| return f"{hours}h {minutes}m" | |
| def validate_language_code(code: str, supported_languages: Dict[str, str]) -> bool: | |
| """Validate language code""" | |
| return code in supported_languages | |
| def extract_language_code(display_string: str) -> str: | |
| """Extract language code from display string like 'en - English'""" | |
| return display_string.split(' - ')[0] if ' - ' in display_string else display_string | |
| def create_progress_callback(progress_bar=None): | |
| """Create progress callback for long-running operations""" | |
| def callback(current: int, total: int): | |
| if progress_bar: | |
| progress_bar.progress(current / total) | |
| return callback |