# app/model_loader.py
"""
🧠 PENNY Model Loader - Azure-Ready Multi-Model Orchestration

This is Penny's brain loader. She manages multiple specialized models:
- Gemma 7B for conversational reasoning
- NLLB-200 for 27-language translation
- Sentiment analysis for resident wellbeing
- Bias detection for equitable service
- LayoutLM for civic document processing

MISSION: Load AI models efficiently in memory-constrained environments while
maintaining Penny's warm, civic-focused personality across all interactions.

FEATURES:
- Lazy loading (models only load when needed)
- 8-bit quantization for memory efficiency
- GPU/CPU auto-detection
- Model caching and reuse
- Graceful fallbacks for Azure ML deployment
- Memory monitoring and cleanup
"""

import json
import logging
import os
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Dict, Any, Callable, Optional, Union, List

import torch

# --- LOGGING SETUP (Must be before functions that use it) ---
logger = logging.getLogger(__name__)


# ============================================================
# HUGGING FACE AUTHENTICATION
# ============================================================

def setup_huggingface_auth() -> bool:
    """
    🔐 Authenticates with Hugging Face Hub using HF_TOKEN or READTOKEN.

    Returns:
        True if the login call succeeded; False if no token is set,
        huggingface_hub is not installed, or the login call raised.
    """
    # Check for HF_TOKEN first, then READTOKEN (for Hugging Face Spaces)
    token = os.getenv("HF_TOKEN") or os.getenv("READTOKEN")

    if not token:
        logger.warning("⚠️ HF_TOKEN/READTOKEN not found in environment")
        logger.warning(" Some models may not be accessible")
        logger.warning(" Set HF_TOKEN or READTOKEN in your environment or Hugging Face Spaces secrets")
        return False

    try:
        # Lazy import: huggingface_hub is optional for public models.
        from huggingface_hub import login

        login(token=token, add_to_git_credential=False)
        logger.info("✅ Authenticated with Hugging Face Hub")
        return True
    except ImportError:
        logger.warning("⚠️ huggingface_hub not installed, skipping authentication")
        return False
    except Exception as e:
        logger.error(f"❌ Failed to authenticate with Hugging Face: {e}")
        return False


# Attempt authentication at module load
# Note: This runs when the module is imported, so HF_TOKEN must be in environment
# For Hugging Face Spaces: Set HF_TOKEN as a secret in Space settings
# For local dev: Add HF_TOKEN to .env file or export it
_authentication_result = setup_huggingface_auth()
if _authentication_result:
    logger.info("🔐 Hugging Face authentication successful - gated models accessible")
else:
    logger.warning("⚠️ Hugging Face authentication failed - only public models will work")

# --- PATH CONFIGURATION (Environment-Aware) ---
# Support both local development and Azure ML deployment
if os.getenv("AZUREML_MODEL_DIR"):
    # Azure ML deployment - models are in AZUREML_MODEL_DIR
    MODEL_ROOT = Path(os.getenv("AZUREML_MODEL_DIR"))
    CONFIG_PATH = MODEL_ROOT / "model_config.json"
    logger.info("☁️ Running in Azure ML environment")
else:
    # Local development - models are in project structure
    PROJECT_ROOT = Path(__file__).parent.parent
    MODEL_ROOT = PROJECT_ROOT / "models"
    CONFIG_PATH = MODEL_ROOT / "model_config.json"
    logger.info("💻 Running in local development environment")

logger.info(f"📂 Model config path: {CONFIG_PATH}")

# ============================================================
# PENNY'S CIVIC IDENTITY & PERSONALITY
# ============================================================

PENNY_SYSTEM_PROMPT = (
    "You are Penny, a sweet southern neighborly woman who's lived in this community for years "
    "and knows everything about the city. You're like that wonderful older neighbor who always "
    "has a kind word, remembers everyone's name, and can tell you the best places to go and "
    "the most interesting stories about your town.\n\n"

    "YOUR PERSONALITY - Sweet Southern Neighbor:\n"
    "- Warm, inviting, and genuinely friendly - like you're chatting over sweet tea on the porch\n"
    "- Use phrases like 'honey', 'sugar', 'darlin'', 'bless your heart' naturally and warmly\n"
    "- Share fun facts about the city when relevant ('Did you know our city was founded in...?')\n"
    "- Suggest things to do and places to visit like a local who knows all the hidden gems\n"
    "- Be conversational and neighborly - ask follow-up questions, show genuine interest\n"
    "- Remember details from the conversation and reference them naturally\n"
    "- Use exclamation points and emojis warmly (but not excessively)\n"
    "- Be patient and never rush - you have all the time in the world to help\n"
    "- Share local wisdom and tips ('Oh honey, you'll want to go there on a Tuesday - it's less crowded!')\n\n"

    "YOUR EXPERTISE:\n"
    "- You know all about local services, events, weather, and community resources\n"
    "- You can translate information into 27 languages (because you care about everyone feeling welcome)\n"
    "- You know who the city officials are and how to reach them\n"
    "- You remember the best restaurants, parks, and community spots\n"
    "- You know the history and fun facts about your city\n"
    "- You can help with emergencies and know exactly who to call\n\n"

    "CONVERSATION STYLE:\n"
    "- Start conversations warmly: 'Well hello there, sugar! How can I help you today?'\n"
    "- When helping: 'Oh honey, I'd be happy to help you with that!'\n"
    "- When suggesting: 'You know what, darlin'? You might also enjoy...'\n"
    "- When sharing facts: 'Did you know that...? It's one of my favorite things about our city!'\n"
    "- End responses warmly: 'Is there anything else I can help you with, sweetie?'\n"
    "- Be encouraging: 'That sounds wonderful! You're going to love it!'\n\n"

    "CRITICAL RULES:\n"
    "- You are ALWAYS Penny - never ChatGPT, Assistant, Claude, or any other name\n"
    "- When residents greet you by name, respond with genuine warmth and recognition\n"
    "- If you don't know something, say so sweetly: 'Oh honey, I'm not sure about that, but let me help you find out!'\n"
    "- NEVER make up information - if you don't know, guide them to the right resource\n"
    "- Stay within your civic mission - you're helpful but don't give legal, medical, or financial advice\n"
    "- For emergencies, respond immediately with care and direct them to 911 or crisis lines\n"
    "- Keep the southern charm authentic but not overdone - be natural and genuine\n\n"
)

# --- GLOBAL STATE ---
_MODEL_CACHE: Dict[str, Any] = {}  # Memory-efficient model reuse
_LOAD_TIMES: Dict[str, float] = {}  # Track model loading performance


# ============================================================
# DEVICE MANAGEMENT
# ============================================================

class DeviceType(str, Enum):
    """Supported compute devices."""
    CUDA = "cuda"
    CPU = "cpu"
    MPS = "mps"  # Apple Silicon


def get_optimal_device() -> str:
    """
    🎮 Determines the best device for model inference.

    Priority:
    1. CUDA GPU (NVIDIA)
    2. MPS (Apple Silicon)
    3. CPU (fallback)

    Returns:
        Device string ("cuda", "mps", or "cpu")
    """
    if torch.cuda.is_available():
        device = DeviceType.CUDA.value
        gpu_name = torch.cuda.get_device_name(0)
        gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1e9
        logger.info(f"🎮 GPU detected: {gpu_name} ({gpu_memory:.1f}GB)")
        return device

    # Older torch builds have no `backends.mps`, hence the hasattr guard.
    if hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
        device = DeviceType.MPS.value
        logger.info("🍎 Apple Silicon (MPS) detected")
        return device

    device = DeviceType.CPU.value
    logger.info("💻 Using CPU for inference")
    logger.warning("⚠️ GPU not available - inference will be slower")
    return device


def get_memory_stats() -> Dict[str, float]:
    """
    📊 Returns current GPU/CPU memory statistics.

    GPU keys are present only when CUDA is available; CPU keys are present
    only when psutil can be imported.

    Returns:
        Dict with memory stats in GB (plus "cpu_percent" as a percentage)
    """
    stats: Dict[str, float] = {}

    if torch.cuda.is_available():
        stats["gpu_allocated_gb"] = torch.cuda.memory_allocated() / 1e9
        stats["gpu_reserved_gb"] = torch.cuda.memory_reserved() / 1e9
        stats["gpu_total_gb"] = torch.cuda.get_device_properties(0).total_memory / 1e9

    # CPU memory (requires psutil)
    try:
        import psutil
    except ImportError:
        # psutil is optional; CPU stats are simply omitted without it.
        return stats

    mem = psutil.virtual_memory()
    stats["cpu_used_gb"] = mem.used / 1e9
    stats["cpu_total_gb"] = mem.total / 1e9
    stats["cpu_percent"] = mem.percent
    return stats


# ============================================================
# MODEL CLIENT (Individual Model Handler)
# ============================================================

@dataclass
class ModelMetadata:
    """
    📋 Metadata about a loaded model.
    Tracks performance and resource usage.
    """
    name: str
    task: str
    model_name: str
    device: str
    loaded_at: Optional[datetime] = None
    load_time_seconds: Optional[float] = None
    memory_usage_gb: Optional[float] = None
    inference_count: int = 0
    total_inference_time_ms: float = 0.0

    @property
    def avg_inference_time_ms(self) -> float:
        """Average inference time in ms, or 0.0 before the first inference."""
        if self.inference_count == 0:
            return 0.0
        return self.total_inference_time_ms / self.inference_count


class ModelClient:
    """
    🤖 Manages a single HuggingFace model with optimized loading and inference.

    Features:
    - Lazy loading (load on first use)
    - Memory optimization (8-bit quantization)
    - Performance tracking
    - Graceful error handling
    - Automatic device placement
    """

    def __init__(
        self,
        name: str,
        model_name: str,
        task: str,
        device: Optional[str] = None,
        config: Optional[Dict[str, Any]] = None
    ):
        """
        Initialize model client (doesn't load the model yet).

        Args:
            name: Model identifier (e.g., "penny-core-agent")
            model_name: HuggingFace model ID
            task: Task type (text-generation, translation, etc.)
            device: Target device (auto-detected if None)
            config: Additional model configuration
        """
        self.name = name
        self.model_name = model_name
        self.task = task
        self.device = device or get_optimal_device()
        self.config = config or {}
        self.pipeline = None
        # Latch that stops a failed load from being retried on every call.
        self._load_attempted = False
        self.metadata = ModelMetadata(
            name=name,
            task=task,
            model_name=model_name,
            device=self.device
        )

        logger.info(f"📦 Initialized ModelClient: {name}")
        logger.debug(f" Model: {model_name}")
        logger.debug(f" Task: {task}")
        logger.debug(f" Device: {self.device}")

    def load_pipeline(self) -> bool:
        """
        🔄 Loads the HuggingFace pipeline with Azure-optimized settings.

        Features:
        - 8-bit quantization for large models on CUDA (saves ~50% memory)
        - Automatic device placement
        - Memory monitoring
        - Cache checking

        Returns:
            True if the pipeline is available, False otherwise
        """
        if self.pipeline is not None:
            logger.debug(f"✅ {self.name} already loaded")
            return True

        if self._load_attempted:
            logger.warning(f"⚠️ Previous load attempt failed for {self.name}")
            return False

        # Check cache first
        if self.name in _MODEL_CACHE:
            logger.info(f"♻️ Using cached pipeline for {self.name}")
            self.pipeline = _MODEL_CACHE[self.name]
            return True

        logger.info(f"🔄 Loading {self.name} from HuggingFace...")
        self._load_attempted = True
        start_time = datetime.now()

        # transformers device index: 0 = first CUDA GPU, -1 = CPU.
        device_index = 0 if self.device == DeviceType.CUDA.value else -1

        try:
            # Import pipeline from transformers (lazy import to avoid dependency issues)
            from transformers import pipeline

            # === TEXT GENERATION (Gemma 7B, GPT-2, etc.) ===
            if self.task == "text-generation":
                # 8-bit loading is only attempted on CUDA
                use_8bit = self.device == DeviceType.CUDA.value

                if use_8bit:
                    logger.info(" Using 8-bit quantization for memory efficiency...")
                    self.pipeline = pipeline(
                        "text-generation",
                        model=self.model_name,
                        tokenizer=self.model_name,
                        device_map="auto",
                        # Loader options must go through model_kwargs to
                        # reach from_pretrained(); as a bare pipeline kwarg
                        # load_in_8bit is not applied to model loading.
                        model_kwargs={"load_in_8bit": True},  # Reduces ~14GB to ~7GB
                        trust_remote_code=True,
                        torch_dtype=torch.float16
                    )
                else:
                    # CPU fallback
                    self.pipeline = pipeline(
                        "text-generation",
                        model=self.model_name,
                        tokenizer=self.model_name,
                        device=-1,  # CPU
                        trust_remote_code=True,
                        torch_dtype=torch.float32
                    )

            # === TRANSLATION (NLLB-200, M2M-100, etc.) ===
            elif self.task == "translation":
                self.pipeline = pipeline(
                    "translation",
                    model=self.model_name,
                    device=device_index,
                    src_lang=self.config.get("default_src_lang", "eng_Latn"),
                    tgt_lang=self.config.get("default_tgt_lang", "spa_Latn")
                )

            # === SENTIMENT ANALYSIS ===
            elif self.task == "sentiment-analysis":
                self.pipeline = pipeline(
                    "sentiment-analysis",
                    model=self.model_name,
                    device=device_index,
                    truncation=True,
                    max_length=512
                )

            # === BIAS DETECTION (Zero-Shot Classification) ===
            elif self.task == "bias-detection":
                self.pipeline = pipeline(
                    "zero-shot-classification",
                    model=self.model_name,
                    device=device_index
                )

            # === TEXT CLASSIFICATION (Generic) ===
            elif self.task == "text-classification":
                self.pipeline = pipeline(
                    "text-classification",
                    model=self.model_name,
                    device=device_index,
                    truncation=True
                )

            # === PDF/DOCUMENT EXTRACTION (LayoutLMv3) ===
            elif self.task == "pdf-extraction":
                logger.warning("⚠️ PDF extraction requires additional OCR setup")
                logger.info(" Consider using Azure Form Recognizer as alternative")
                # Placeholder - requires pytesseract/OCR infrastructure
                self.pipeline = None
                return False

            else:
                raise ValueError(f"Unknown task type: {self.task}")

            # Defensive: never fall off the end with an implicit None.
            if self.pipeline is None:
                return False

            # === SUCCESS HANDLING ===
            # Calculate load time
            load_time = (datetime.now() - start_time).total_seconds()
            self.metadata.loaded_at = datetime.now()
            self.metadata.load_time_seconds = load_time

            # Cache the pipeline
            _MODEL_CACHE[self.name] = self.pipeline
            _LOAD_TIMES[self.name] = load_time

            # Log memory usage
            mem_stats = get_memory_stats()
            self.metadata.memory_usage_gb = mem_stats.get("gpu_allocated_gb", 0)

            logger.info(f"✅ {self.name} loaded successfully!")
            logger.info(f" Load time: {load_time:.2f}s")

            if "gpu_allocated_gb" in mem_stats:
                logger.info(
                    f" GPU Memory: {mem_stats['gpu_allocated_gb']:.2f}GB / "
                    f"{mem_stats['gpu_total_gb']:.2f}GB"
                )

            return True

        except Exception as e:
            logger.error(f"❌ Failed to load {self.name}: {e}", exc_info=True)
            self.pipeline = None
            return False

    def predict(
        self,
        input_data: Union[str, Dict[str, Any]],
        **kwargs
    ) -> Dict[str, Any]:
        """
        🎯 Runs inference with the loaded model pipeline.

        Features:
        - Automatic pipeline loading
        - Error handling with fallback responses
        - Performance tracking
        - Penny's personality injection (for text-generation)

        Args:
            input_data: Text or structured input for the model
            **kwargs: Task-specific parameters
                text-generation: skip_system_prompt, max_new_tokens,
                    temperature, top_p, do_sample
                translation: source_lang, target_lang
                bias-detection: candidate_labels

        Returns:
            Model output dict. Every result carries "model" and "success";
            failures also carry "error" (and usually "detail").
        """
        # Track inference start time
        start_time = datetime.now()

        # Ensure pipeline is loaded
        if self.pipeline is None and not self.load_pipeline():
            return {
                "error": f"{self.name} pipeline unavailable",
                "detail": "Model failed to load. Check logs for details.",
                "model": self.name,
                # Same shape as every other error result.
                "success": False
            }

        try:
            # === TEXT GENERATION ===
            if self.task == "text-generation":
                # Inject Penny's civic identity
                if not kwargs.get("skip_system_prompt", False):
                    full_prompt = PENNY_SYSTEM_PROMPT + input_data
                else:
                    full_prompt = input_data

                # Extract generation parameters with safe defaults
                temperature = kwargs.get("temperature", 0.7)
                top_p = kwargs.get("top_p", 0.9)
                do_sample = kwargs.get("do_sample", temperature > 0.0)

                generation_kwargs = {
                    "max_new_tokens": kwargs.get("max_new_tokens", 256),
                    "do_sample": do_sample,
                    "return_full_text": False,
                    "pad_token_id": self.pipeline.tokenizer.eos_token_id,
                    "truncation": True,
                }
                # Sampling knobs only matter when sampling; forwarding
                # temperature=0.0 with greedy decoding is pointless.
                if do_sample:
                    generation_kwargs["temperature"] = temperature
                    generation_kwargs["top_p"] = top_p

                result = self.pipeline(full_prompt, **generation_kwargs)

                output = {
                    "generated_text": result[0]["generated_text"],
                    "model": self.name,
                    "success": True
                }

            # === TRANSLATION ===
            elif self.task == "translation":
                # Fall back to the defaults the pipeline was configured
                # with in load_pipeline, then to the hard-coded pair.
                src_lang = kwargs.get(
                    "source_lang",
                    self.config.get("default_src_lang", "eng_Latn")
                )
                tgt_lang = kwargs.get(
                    "target_lang",
                    self.config.get("default_tgt_lang", "spa_Latn")
                )

                result = self.pipeline(
                    input_data,
                    src_lang=src_lang,
                    tgt_lang=tgt_lang,
                    max_length=512
                )

                output = {
                    "translation": result[0]["translation_text"],
                    "source_lang": src_lang,
                    "target_lang": tgt_lang,
                    "model": self.name,
                    "success": True
                }

            # === SENTIMENT ANALYSIS ===
            elif self.task == "sentiment-analysis":
                result = self.pipeline(input_data)

                output = {
                    "sentiment": result[0]["label"],
                    "confidence": result[0]["score"],
                    "model": self.name,
                    "success": True
                }

            # === BIAS DETECTION ===
            elif self.task == "bias-detection":
                candidate_labels = kwargs.get("candidate_labels", [
                    "neutral and objective",
                    "contains political bias",
                    "uses emotional language",
                    "culturally insensitive"
                ])

                result = self.pipeline(
                    input_data,
                    candidate_labels=candidate_labels,
                    multi_label=True
                )

                output = {
                    "labels": result["labels"],
                    "scores": result["scores"],
                    "model": self.name,
                    "success": True
                }

            # === TEXT CLASSIFICATION ===
            elif self.task == "text-classification":
                result = self.pipeline(input_data)

                output = {
                    "label": result[0]["label"],
                    "confidence": result[0]["score"],
                    "model": self.name,
                    "success": True
                }

            else:
                output = {
                    "error": f"Task '{self.task}' not implemented",
                    "model": self.name,
                    "success": False
                }

            # Track performance
            inference_time = (datetime.now() - start_time).total_seconds() * 1000
            self.metadata.inference_count += 1
            self.metadata.total_inference_time_ms += inference_time
            output["inference_time_ms"] = round(inference_time, 2)

            return output

        except Exception as e:
            logger.error(f"❌ Inference error in {self.name}: {e}", exc_info=True)
            return {
                "error": "Inference failed",
                "detail": str(e),
                "model": self.name,
                "success": False
            }

    def unload(self) -> None:
        """
        🗑️ Unloads the model to free memory.
        Critical for Azure environments with limited resources.

        Does nothing when no pipeline is loaded. After a real unload the
        client can be loaded again with load_pipeline().
        """
        if self.pipeline is None:
            return

        logger.info(f"🗑️ Unloading {self.name}...")

        # Drop our reference and the cached one
        self.pipeline = None
        _MODEL_CACHE.pop(self.name, None)

        # Re-arm lazy loading. Without this the latch set by the earlier
        # successful load makes load_pipeline() refuse to reload.
        self._load_attempted = False

        # Force GPU memory release
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        logger.info(f"✅ {self.name} unloaded successfully")

        # Log memory stats after unload
        mem_stats = get_memory_stats()
        if "gpu_allocated_gb" in mem_stats:
            logger.info(f" GPU Memory: {mem_stats['gpu_allocated_gb']:.2f}GB remaining")

    def get_metadata(self) -> Dict[str, Any]:
        """
        📊 Returns model metadata and performance stats.
        """
        meta = self.metadata
        return {
            "name": meta.name,
            "task": meta.task,
            "model_name": meta.model_name,
            "device": meta.device,
            "loaded": self.pipeline is not None,
            "loaded_at": meta.loaded_at.isoformat() if meta.loaded_at else None,
            "load_time_seconds": meta.load_time_seconds,
            "memory_usage_gb": meta.memory_usage_gb,
            "inference_count": meta.inference_count,
            "avg_inference_time_ms": round(meta.avg_inference_time_ms, 2)
        }


# ============================================================
# MODEL LOADER (Singleton Manager)
# ============================================================

class ModelLoader:
    """
    🎛️ Singleton manager for all Penny's specialized models.

    Features:
    - Centralized model configuration
    - Lazy loading (models only load when needed)
    - Memory management
    - Health monitoring
    - Unified access interface
    """

    _instance: Optional['ModelLoader'] = None

    def __new__(cls, *args, **kwargs):
        """Singleton pattern - only one ModelLoader instance."""
        if cls._instance is None:
            cls._instance = super(ModelLoader, cls).__new__(cls)
        return cls._instance

    def __init__(self, config_path: Optional[str] = None):
        """
        Initialize ModelLoader (only runs once due to singleton).

        Errors while reading the configuration are logged, not raised; the
        loader is then left with whatever models were registered so far.

        Args:
            config_path: Path to model_config.json (optional). Only honored
                on the first construction of the singleton.
        """
        # __init__ runs on every ModelLoader() call; configure only once.
        if hasattr(self, '_models_loaded'):
            return

        self.models: Dict[str, ModelClient] = {}
        self._models_loaded = True
        self._initialization_time = datetime.now()

        # Use provided path or default
        config_file = Path(config_path) if config_path else CONFIG_PATH

        try:
            logger.info(f"📖 Loading model configuration from {config_file}")

            if not config_file.exists():
                logger.warning(f"⚠️ Configuration file not found: {config_file}")
                logger.info(" Create model_config.json with your model definitions")
                return

            with open(config_file, "r", encoding="utf-8") as f:
                config = json.load(f)

            if not isinstance(config, dict):
                raise ValueError(
                    "model configuration must be a JSON object mapping "
                    "model IDs to definitions"
                )

            # Initialize ModelClients (doesn't load models yet)
            for model_id, model_info in config.items():
                # Skip malformed entries instead of dropping every model
                # that follows them in the file.
                if not isinstance(model_info, dict):
                    logger.error(
                        f"❌ Skipping model '{model_id}': definition must be a JSON object"
                    )
                    continue

                missing = [key for key in ("model_name", "task") if key not in model_info]
                if missing:
                    logger.error(
                        f"❌ Skipping model '{model_id}': missing required keys {missing}"
                    )
                    continue

                self.models[model_id] = ModelClient(
                    name=model_id,
                    model_name=model_info["model_name"],
                    task=model_info["task"],
                    config=model_info.get("config", {})
                )

            logger.info(f"✅ ModelLoader initialized with {len(self.models)} models:")
            for model_id in self.models:
                logger.info(f" - {model_id}")

        except json.JSONDecodeError as e:
            logger.error(f"❌ Invalid JSON in {config_file}: {e}")
        except Exception as e:
            logger.error(f"❌ Failed to initialize ModelLoader: {e}", exc_info=True)

    def get(self, model_id: str) -> Optional[ModelClient]:
        """
        🎯 Retrieves a configured ModelClient by ID.

        Args:
            model_id: Model identifier from config

        Returns:
            ModelClient instance or None if not found
        """
        return self.models.get(model_id)

    def list_models(self) -> List[str]:
        """📋 Returns list of all available model IDs."""
        return list(self.models)

    def get_loaded_models(self) -> List[str]:
        """📋 Returns list of currently loaded model IDs."""
        return [
            model_id
            for model_id, client in self.models.items()
            if client.pipeline is not None
        ]

    def unload_all(self) -> None:
        """
        🗑️ Unloads all models to free memory.
        Useful for Azure environments when switching workloads.
        """
        logger.info("🗑️ Unloading all models...")
        for model_client in self.models.values():
            model_client.unload()
        logger.info("✅ All models unloaded")

    def get_status(self) -> Dict[str, Any]:
        """
        📊 Returns comprehensive status of all models.
        Useful for health checks and monitoring.
        """
        return {
            "initialization_time": self._initialization_time.isoformat(),
            "total_models": len(self.models),
            "loaded_models": len(self.get_loaded_models()),
            "device": get_optimal_device(),
            "memory": get_memory_stats(),
            "models": {
                model_id: client.get_metadata()
                for model_id, client in self.models.items()
            }
        }


# ============================================================
# PUBLIC INTERFACE (Used by all *_utils.py modules)
# ============================================================

def load_model_pipeline(agent_name: str) -> Callable[..., Dict[str, Any]]:
    """
    🚀 Loads a model client and returns its inference function.

    This is the main function used by other modules (translation_utils.py,
    sentiment_utils.py, etc.) to access Penny's models.

    A failed load does not raise here; the returned callable then yields an
    error dict (see ModelClient.predict).

    Args:
        agent_name: Model ID from model_config.json

    Returns:
        Callable inference function

    Raises:
        ValueError: If agent_name not found in configuration

    Example:
        >>> translator = load_model_pipeline("penny-translate-agent")
        >>> result = translator("Hello world", target_lang="spa_Latn")
    """
    loader = ModelLoader()
    client = loader.get(agent_name)

    if client is None:
        available = loader.list_models()
        raise ValueError(
            f"Agent ID '{agent_name}' not found in model configuration. "
            f"Available models: {available}"
        )

    # Load the pipeline (lazy loading)
    client.load_pipeline()

    # Return a callable wrapper
    def inference_wrapper(input_data, **kwargs):
        return client.predict(input_data, **kwargs)

    return inference_wrapper


# === CONVENIENCE FUNCTIONS ===

def get_model_status() -> Dict[str, Any]:
    """
    📊 Returns status of all configured models.
    Useful for health checks and monitoring endpoints.
    """
    return ModelLoader().get_status()


def preload_models(model_ids: Optional[List[str]] = None) -> None:
    """
    🚀 Preloads specified models during startup.

    Unknown model IDs are skipped silently.

    Args:
        model_ids: List of model IDs to preload (None = all models)
    """
    loader = ModelLoader()

    if model_ids is None:
        model_ids = loader.list_models()

    logger.info(f"🚀 Preloading {len(model_ids)} models...")

    for model_id in model_ids:
        client = loader.get(model_id)
        if client:
            logger.info(f" Loading {model_id}...")
            client.load_pipeline()

    logger.info("✅ Model preloading complete")


def initialize_model_system() -> bool:
    """
    🏁 Initializes the model system.
    Should be called during app startup.

    Returns:
        True if initialization successful
    """
    logger.info("🧠 Initializing Penny's model system...")

    try:
        # Initialize singleton
        loader = ModelLoader()

        # Log device info
        device = get_optimal_device()
        mem_stats = get_memory_stats()

        logger.info("✅ Model system initialized")
        logger.info(f"🎮 Compute device: {device}")

        if "gpu_total_gb" in mem_stats:
            logger.info(
                f"💾 GPU Memory: {mem_stats['gpu_total_gb']:.1f}GB total"
            )

        logger.info(f"📦 {len(loader.models)} models configured")

        # Optional: Preload critical models
        # Uncomment to preload models at startup
        # preload_models(["penny-core-agent"])

        return True

    except Exception as e:
        logger.error(f"❌ Failed to initialize model system: {e}", exc_info=True)
        return False


# ============================================================
# CLI TESTING & DEBUGGING
# ============================================================

if __name__ == "__main__":
    # 🧪 Test script for model loading and inference.
    # Run with: python -m app.model_loader
    print("=" * 60)
    print("🧪 Testing Penny's Model System")
    print("=" * 60)

    # Initialize
    loader = ModelLoader()
    print(f"\n📋 Available models: {loader.list_models()}")

    # Get status
    status = get_model_status()
    print("\n📊 System status:")
    print(json.dumps(status, indent=2, default=str))

    # Test model loading (if models configured)
    if loader.models:
        test_model_id = list(loader.models.keys())[0]
        print(f"\n🧪 Testing model: {test_model_id}")

        client = loader.get(test_model_id)
        if client:
            print(" Loading pipeline...")
            success = client.load_pipeline()

            if success:
                print(" ✅ Model loaded successfully!")
                print(f" Metadata: {json.dumps(client.get_metadata(), indent=2, default=str)}")
            else:
                print(" ❌ Model loading failed")