| """ | |
| HuggingFace Model Interface for Maya Gradio Demo | |
| Supports multiple models and providers | |
| """ | |
| import os | |
| import logging | |
| from typing import Dict, List, Optional, Any | |
| from transformers import ( | |
| AutoTokenizer, | |
| AutoModelForCausalLM, | |
| pipeline, | |
| BitsAndBytesConfig | |
| ) | |
| from peft import PeftModel | |
| import torch | |
| from huggingface_hub import HfApi | |
| import json | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |

class ModelInterface:
    """
    Interface for managing multiple HuggingFace models.

    Supports local models, HF Inference API, and custom fine-tuned models.
    """

    def __init__(self):
        """Initialize the model interface."""
        self.models = {}
        self.current_model = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.hf_api = HfApi()

        # Configure 4-bit quantization for memory efficiency (GPU only)
        self.quantization_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_use_double_quant=True,
        ) if torch.cuda.is_available() else None

        # Define available models (optimized for HuggingFace Spaces)
        self.available_models = {
            # Maya's fine-tuned LoRA model via inference API (requires Pro account)
            "blakeurmos/maya-7b-lora-v1": {
                "name": "Maya 7B (Fine-tuned)",
                "description": "Maya's personality fine-tuned on Mistral-7B (requires auth)",
                "size": "LoRA (~14MB + base model)",
                "type": "inference_api",
                "requires_auth": True,
                "base_model": "mistralai/Mistral-7B-Instruct-v0.3",  # Original trained base
            },
            # Backup Maya model using non-gated Mistral
            "mistralai/Mistral-7B-Instruct-v0.1": {
                "name": "Maya 7B (Mistral Base)",
                "description": "Mistral 7B with Maya personality via prompting",
                "size": "Large (~7B params)",
                "type": "inference_api",
                "requires_auth": False,
            },
            # Latest Mistral instruction model
            "mistralai/Mistral-7B-Instruct-v0.3": {
                "name": "Mistral 7B Instruct v0.3",
                "description": "Mistral's latest instruction-tuned model",
                "size": "Large (~7B params)",
                "type": "inference_api",
                "requires_auth": True,
            },
            # Moonshot AI's latest model
            "moonshotai/Kimi-K2-Instruct": {
                "name": "Kimi K2 Instruct",
                "description": "Moonshot AI's latest instruction model",
                "size": "Large",
                "type": "inference_api",
                "requires_auth": True,
            },
        }

        logger.info(f"Model interface initialized on device: {self.device}")
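
    # Note: load_model() below also understands entries with "type": "lora",
    # plus the optional keys "fallback_base" and "exists", although no entry
    # above sets them. A hypothetical registry entry (sketch only) would be:
    #
    #   "some-org/maya-lora-adapter": {            # hypothetical repo ID
    #       "name": "Maya LoRA (local)",
    #       "type": "lora",
    #       "base_model": "mistralai/Mistral-7B-Instruct-v0.3",
    #       "fallback_base": "mistralai/Mistral-7B-Instruct-v0.1",  # tried if base fails
    #       "exists": False,  # forces a Hub existence check before loading
    #   }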

    def get_available_models(self) -> Dict[str, Dict[str, Any]]:
        """Get the available models with their metadata."""
        return self.available_models

    def _load_as_inference_api(self, model_id: str, use_auth_token: bool = False) -> bool:
        """Load a model as a plain text-generation pipeline (fallback path).

        Note: despite the name, this builds a local transformers pipeline;
        the weights are downloaded from the Hub and run on this machine.
        """
        try:
            logger.info(f"Loading {model_id} via inference API fallback")

            auth_token = None
            if use_auth_token:
                # Check the common env var names for an HF token
                auth_token = (
                    os.getenv("HUGGINGFACE_API_TOKEN") or
                    os.getenv("HF_TOKEN") or
                    os.getenv("HUGGINGFACE_TOKEN") or
                    os.getenv("HF_API_TOKEN")
                )

            pipe = pipeline(
                "text-generation",
                model=model_id,
                token=auth_token,
                device=0 if torch.cuda.is_available() else -1,
            )

            self.models[model_id] = {
                "pipeline": pipe,
                "type": "inference_api",
                "tokenizer": None,
            }
            return True
        except Exception as e:
            logger.error(f"Inference API fallback also failed for {model_id}: {e}")
            return False

    def load_model(self, model_id: str, use_auth_token: bool = False) -> bool:
        """
        Load a model for inference.

        Args:
            model_id: HuggingFace model identifier
            use_auth_token: Whether to use an HF auth token from the environment

        Returns:
            True if successful, False otherwise
        """
        try:
            if model_id in self.models:
                logger.info(f"Model {model_id} already loaded")
                self.current_model = model_id
                return True

            model_config = self.available_models.get(model_id, {})
            model_type = model_config.get("type", "local")
| if model_type == "inference_api": | |
| # For inference API, just create a pipeline | |
| logger.info(f"Setting up inference API pipeline for {model_id}") | |
| # Use auth token if available - check multiple possible env vars | |
| auth_token = None | |
| if use_auth_token: | |
| auth_token = ( | |
| os.getenv("HUGGINGFACE_API_TOKEN") or | |
| os.getenv("HF_TOKEN") or | |
| os.getenv("HUGGINGFACE_TOKEN") or | |
| os.getenv("HF_API_TOKEN") | |
| ) | |
| if auth_token: | |
| logger.info("Using HuggingFace authentication token") | |
| else: | |
| logger.warning("Auth requested but no HF token found in environment") | |
| pipe = pipeline( | |
| "text-generation", | |
| model=model_id, | |
| token=auth_token, | |
| device=0 if torch.cuda.is_available() else -1 | |
| ) | |
| self.models[model_id] = { | |
| "pipeline": pipe, | |
| "type": "inference_api", | |
| "tokenizer": None | |
| } | |
| elif model_type in ["local", "custom"]: | |
| # Load model locally | |
| logger.info(f"Loading local model {model_id}...") | |
| # Check if model exists (especially for custom models) | |
| if model_config.get("exists", True) == False: | |
| try: | |
| # Try to check if the model exists on HF Hub | |
| model_info = self.hf_api.model_info(model_id) | |
| logger.info(f"Found model {model_id} on HuggingFace Hub") | |
| except Exception as e: | |
| logger.error(f"Model {model_id} not found: {e}") | |
| return False | |
| # Load tokenizer | |
| auth_token = os.getenv("HUGGINGFACE_API_TOKEN") if use_auth_token else None | |
| tokenizer = AutoTokenizer.from_pretrained( | |
| model_id, | |
| token=auth_token, | |
| padding_side="left" | |
| ) | |
| # Add pad token if missing | |
| if tokenizer.pad_token is None: | |
| tokenizer.pad_token = tokenizer.eos_token | |
| # Load model with quantization if available | |
| load_kwargs = { | |
| "token": auth_token, | |
| "torch_dtype": torch.float16, | |
| "device_map": "auto" if torch.cuda.is_available() else None | |
| } | |
| if self.quantization_config and torch.cuda.is_available(): | |
| load_kwargs["quantization_config"] = self.quantization_config | |
| model = AutoModelForCausalLM.from_pretrained( | |
| model_id, | |
| **load_kwargs | |
| ) | |
| # Create pipeline | |
| pipe = pipeline( | |
| "text-generation", | |
| model=model, | |
| tokenizer=tokenizer, | |
| device=0 if torch.cuda.is_available() else -1, | |
| torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32 | |
| ) | |
| self.models[model_id] = { | |
| "pipeline": pipe, | |
| "tokenizer": tokenizer, | |
| "model": model, | |
| "type": "local" | |
| } | |
| elif model_type == "lora": | |
| # Load LoRA adapter with base model | |
| logger.info(f"Loading LoRA model {model_id}...") | |
| base_model_id = model_config.get("base_model") | |
| if not base_model_id: | |
| logger.error(f"No base model specified for LoRA {model_id}") | |
| return False | |
| # Use auth token if available - check multiple possible env vars | |
| auth_token = None | |
| if use_auth_token: | |
| auth_token = ( | |
| os.getenv("HUGGINGFACE_API_TOKEN") or | |
| os.getenv("HF_TOKEN") or | |
| os.getenv("HUGGINGFACE_TOKEN") or | |
| os.getenv("HF_API_TOKEN") | |
| ) | |
| if auth_token: | |
| logger.info("Using HuggingFace authentication token") | |
| else: | |
| logger.warning("Auth requested but no HF token found in environment") | |
| # Try loading base model, with fallback for Maya LoRA | |
| logger.info(f"Loading base model {base_model_id}...") | |
| try: | |
| base_model = AutoModelForCausalLM.from_pretrained( | |
| base_model_id, | |
| token=auth_token, | |
| torch_dtype=torch.float16, | |
| device_map="auto" if torch.cuda.is_available() else None, | |
| low_cpu_mem_usage=True | |
| ) | |
| except Exception as base_error: | |
| logger.warning(f"Failed to load base model {base_model_id}: {base_error}") | |
| # Check if there's a fallback base model | |
| fallback_base = model_config.get("fallback_base") | |
| if fallback_base and fallback_base != base_model_id: | |
| logger.info(f"Trying fallback base model {fallback_base}...") | |
| try: | |
| base_model = AutoModelForCausalLM.from_pretrained( | |
| fallback_base, | |
| token=auth_token, | |
| torch_dtype=torch.float16, | |
| device_map="auto" if torch.cuda.is_available() else None, | |
| low_cpu_mem_usage=True | |
| ) | |
| logger.info(f"Successfully loaded fallback base model {fallback_base}") | |
| except Exception as fallback_error: | |
| logger.error(f"Fallback base model also failed: {fallback_error}") | |
| # Convert to inference API mode as last resort | |
| logger.info("Converting to inference API mode...") | |
| return self._load_as_inference_api(model_id, use_auth_token) | |
| else: | |
| logger.error(f"No fallback available for base model {base_model_id}") | |
| return False | |
| # Load LoRA adapter | |
| logger.info(f"Loading LoRA adapter {model_id}...") | |
| model = PeftModel.from_pretrained(base_model, model_id, token=auth_token) | |

                # Load the tokenizer (from the base model), with a slow-tokenizer fallback
                try:
                    tokenizer = AutoTokenizer.from_pretrained(
                        base_model_id,
                        token=auth_token,
                        padding_side="left",
                        use_fast=True,  # Try the fast tokenizer first
                    )
                except Exception as tokenizer_error:
                    logger.warning(f"Fast tokenizer failed: {tokenizer_error}, trying slow tokenizer...")
                    try:
                        tokenizer = AutoTokenizer.from_pretrained(
                            base_model_id,
                            token=auth_token,
                            padding_side="left",
                            use_fast=False,  # Fall back to the slow tokenizer
                        )
                    except Exception as slow_tokenizer_error:
                        logger.error(f"Both tokenizers failed: {slow_tokenizer_error}")
                        return False

                # Add a pad token if missing
                if tokenizer.pad_token is None:
                    tokenizer.pad_token = tokenizer.eos_token

                # Create the pipeline (again without `device`, since the base
                # model may already be dispatched via `device_map`)
                pipe = pipeline(
                    "text-generation",
                    model=model,
                    tokenizer=tokenizer,
                )

                self.models[model_id] = {
                    "pipeline": pipe,
                    "tokenizer": tokenizer,
                    "model": model,
                    "type": "lora",
                    "base_model": base_model_id,
                }

            else:
                logger.error(f"Unknown model type: {model_type}")
                return False

            self.current_model = model_id
            logger.info(f"Successfully loaded model: {model_id}")
            return True

        except Exception as e:
            logger.error(f"Failed to load model {model_id}: {e}")
            return False
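
    # Usage sketch (assumes network access and, for gated repos, an HF token
    # exported in the environment):
    #
    #   iface = ModelInterface()
    #   if not iface.load_model("mistralai/Mistral-7B-Instruct-v0.3", use_auth_token=True):
    #       iface.load_model("mistralai/Mistral-7B-Instruct-v0.1")  # non-gated fallback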

    def generate_response(
        self,
        prompt: str,
        max_length: int = 512,
        temperature: float = 0.7,
        top_p: float = 0.9,
        do_sample: bool = True,
        model_id: Optional[str] = None,
    ) -> str:
        """
        Generate a response using the current or a specified model.

        Args:
            prompt: Input prompt
            max_length: Maximum number of new tokens to generate
            temperature: Sampling temperature
            top_p: Top-p (nucleus) sampling threshold
            do_sample: Whether to use sampling
            model_id: Specific model to use (optional)

        Returns:
            Generated response text
        """
        try:
            # Use the specified model, or fall back to the current model
            target_model = model_id or self.current_model
            if not target_model or target_model not in self.models:
                return "Error: No model loaded. Please select and load a model first."

            model_data = self.models[target_model]
            pipeline_obj = model_data["pipeline"]

            logger.info(f"Generating response with {target_model}")

            # Prepare generation parameters. `max_new_tokens` bounds only the
            # generated text, matching the "maximum response length" intent
            # (plain `max_length` would also count the prompt tokens).
            generation_kwargs = {
                "max_new_tokens": max_length,
                "temperature": temperature,
                "top_p": top_p,
                "do_sample": do_sample,
                "pad_token_id": pipeline_obj.tokenizer.eos_token_id,
                "eos_token_id": pipeline_obj.tokenizer.eos_token_id,
                "return_full_text": False,  # Only return the newly generated text
            }

            # Local and LoRA models may need model-specific prompt formatting.
            if model_data["type"] in ["local", "lora"]:
                # Llama- and Mistral-style instruct models (including the Maya
                # LoRA, whose base is Mistral) share the [INST] chat format.
                if (
                    "llama" in target_model.lower()
                    or "mistral" in target_model.lower()
                    or model_data["type"] == "lora"
                ):
                    formatted_prompt = f"<s>[INST] {prompt} [/INST]"
                else:
                    formatted_prompt = prompt
            elif target_model in ["blakeurmos/maya-7b-lora-v1", "mistralai/Mistral-7B-Instruct-v0.1"]:
                # Maya models always need the Mistral format (even via inference API)
                formatted_prompt = f"<s>[INST] {prompt} [/INST]"
            else:
                formatted_prompt = prompt

            # Generate
            results = pipeline_obj(formatted_prompt, **generation_kwargs)

            if isinstance(results, list) and len(results) > 0:
                response = results[0].get("generated_text", "")
            else:
                response = str(results)

            # Clean up the response
            response = response.strip()

            # Remove the original prompt if it was echoed back
            if response.startswith(formatted_prompt):
                response = response[len(formatted_prompt):].strip()

            logger.info(f"Generated response length: {len(response)}")
            return response

        except Exception as e:
            logger.error(f"Failed to generate response: {e}")
            return f"Error generating response: {str(e)}"

    def unload_model(self, model_id: Optional[str] = None):
        """Unload a specific model, or the current model if none is given."""
        target_model = model_id or self.current_model

        if target_model and target_model in self.models:
            del self.models[target_model]
            if self.current_model == target_model:
                self.current_model = None
            logger.info(f"Unloaded model: {target_model}")

            # Clear the GPU cache
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    def get_model_info(self, model_id: Optional[str] = None) -> Dict[str, Any]:
        """Get information about the current or a specified model."""
        target_model = model_id or self.current_model
        if not target_model:
            return {"error": "No model specified or loaded"}

        model_config = self.available_models.get(target_model, {})
        is_loaded = target_model in self.models

        info = {
            "model_id": target_model,
            "name": model_config.get("name", target_model),
            "description": model_config.get("description", ""),
            "size": model_config.get("size", "Unknown"),
            "type": model_config.get("type", "unknown"),
            "is_loaded": is_loaded,
            "is_current": target_model == self.current_model,
        }

        if is_loaded:
            pipe = self.models[target_model]["pipeline"]
            info["device"] = (
                str(next(pipe.model.parameters()).device)
                if hasattr(pipe, "model")
                else "unknown"
            )

        return info

    def list_loaded_models(self) -> List[str]:
        """Get the list of currently loaded model IDs."""
        return list(self.models.keys())

    def get_memory_usage(self) -> Dict[str, Any]:
        """Get current memory usage information."""
        info = {
            "device": self.device,
            "loaded_models": len(self.models),
            "current_model": self.current_model,
        }

        if torch.cuda.is_available():
            info["cuda_memory_allocated"] = f"{torch.cuda.memory_allocated() / 1024**3:.2f} GB"
            info["cuda_memory_reserved"] = f"{torch.cuda.memory_reserved() / 1024**3:.2f} GB"

        return info
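

# Minimal smoke test (sketch): loads the non-gated Mistral entry from the
# registry and prints a short reply. Requires network access and enough
# RAM/VRAM for a 7B model.
if __name__ == "__main__":
    iface = ModelInterface()
    print(list(iface.get_available_models().keys()))

    if iface.load_model("mistralai/Mistral-7B-Instruct-v0.1"):
        print(iface.generate_response("Say hello in five words.", max_length=32))
        print(iface.get_memory_usage())
        iface.unload_model()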