#!/usr/bin/env python3
"""
HuggingFace Model Orchestrator
===============================
Loads and manages HuggingFace models for the dual-LLM pipeline.
Supports LFM2-8B-A1B-Dimensional-Entanglement and 9xdSq-LIMPS-FemTO-R1C.
"""
import torch
import logging
import gc
from typing import Dict, Any, Optional, Tuple
from dataclasses import dataclass
# HuggingFace imports
try:
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoConfig
from transformers import BitsAndBytesConfig, GenerationConfig
TRANSFORMERS_AVAILABLE = True
except ImportError:
TRANSFORMERS_AVAILABLE = False
print("⚠️ Transformers not available - install with: pip install transformers")
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ModelConfig:
"""Configuration for HuggingFace model loading."""
model_name: str
device: str = "auto"
torch_dtype: torch.dtype = torch.bfloat16
trust_remote_code: bool = True
use_cache: bool = True
low_cpu_mem_usage: bool = True
quantization_config: Optional[Any] = None
max_memory: Optional[Dict[int, str]] = None
offload_folder: Optional[str] = None
@dataclass
class GenerationSettings:
"""Settings for text generation."""
max_new_tokens: int = 512
temperature: float = 0.7
top_p: float = 0.9
top_k: int = 50
repetition_penalty: float = 1.1
do_sample: bool = True
pad_token_id: Optional[int] = None
eos_token_id: Optional[int] = None
class HuggingFaceModelOrchestrator:
"""Orchestrator for managing HuggingFace models in the dual-LLM pipeline."""
def __init__(self, primary_config: ModelConfig, secondary_config: ModelConfig):
self.primary_config = primary_config
self.secondary_config = secondary_config
self.primary_model = None
self.primary_tokenizer = None
self.secondary_model = None
self.secondary_tokenizer = None
self.device = self._determine_device()
self.generation_settings = GenerationSettings()
# Model cache for loaded models
self.model_cache = {}
self.tokenizer_cache = {}
logger.info(f"🤖 Initializing HuggingFace Model Orchestrator")
logger.info(f" Primary Model: {primary_config.model_name}")
logger.info(f" Secondary Model: {secondary_config.model_name}")
logger.info(f" Device: {self.device}")
def _determine_device(self) -> str:
"""Determine the best device for model loading."""
if torch.cuda.is_available():
gpu_count = torch.cuda.device_count()
gpu_memory = torch.cuda.get_device_properties(0).total_memory / (1024**3)
logger.info(f"🖥️ GPU detected: {gpu_count} devices, {gpu_memory:.1f}GB total memory")
if gpu_memory > 20: # Sufficient memory for both models
return "cuda:0"
else:
logger.warning("⚠️ Limited GPU memory, using CPU for secondary model")
return "cpu"
else:
logger.info("🖥️ No GPU detected, using CPU")
return "cpu"
    def _setup_quantization_config(self) -> Optional["BitsAndBytesConfig"]:
"""Setup quantization configuration for memory optimization."""
if torch.cuda.is_available():
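            # 8-bit weight quantization via bitsandbytes roughly halves GPU memory versus
            # bf16/fp16; lm_head is left unquantized and fp32 CPU offload is allowed for
            # modules that do not fit on the GPU.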
try:
return BitsAndBytesConfig(
load_in_8bit=True,
llm_int8_enable_fp32_cpu_offload=True,
llm_int8_skip_modules=["lm_head"]
)
except Exception as e:
logger.warning(f"⚠️ Quantization setup failed: {e}")
return None
return None
def _load_model_and_tokenizer(self, config: ModelConfig, model_key: str) -> Tuple[Any, Any]:
"""Load a single model and tokenizer with error handling."""
logger.info(f"📥 Loading {model_key} model: {config.model_name}")
try:
# Load tokenizer first
tokenizer = AutoTokenizer.from_pretrained(
config.model_name,
trust_remote_code=config.trust_remote_code,
use_fast=True
)
# Set pad token if not exists
if tokenizer.pad_token is None:
if tokenizer.eos_token is not None:
tokenizer.pad_token = tokenizer.eos_token
else:
tokenizer.add_special_tokens({'pad_token': '[PAD]'})
# Load model configuration
model_config = AutoConfig.from_pretrained(
config.model_name,
trust_remote_code=config.trust_remote_code
)
# Setup quantization if needed
quantization_config = config.quantization_config or self._setup_quantization_config()
# Load model
model = AutoModelForCausalLM.from_pretrained(
config.model_name,
config=model_config,
torch_dtype=config.torch_dtype,
device_map=config.device,
trust_remote_code=config.trust_remote_code,
use_cache=config.use_cache,
low_cpu_mem_usage=config.low_cpu_mem_usage,
quantization_config=quantization_config,
max_memory=config.max_memory,
offload_folder=config.offload_folder
)
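            # If a new [PAD] token was added to the tokenizer above, grow the embedding
            # matrix so the new token id maps to a valid row (no-op otherwise)
            if len(tokenizer) > model.get_input_embeddings().weight.shape[0]:
                model.resize_token_embeddings(len(tokenizer))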
# Set generation config
model.generation_config = GenerationConfig(
max_new_tokens=self.generation_settings.max_new_tokens,
temperature=self.generation_settings.temperature,
top_p=self.generation_settings.top_p,
top_k=self.generation_settings.top_k,
repetition_penalty=self.generation_settings.repetition_penalty,
do_sample=self.generation_settings.do_sample,
pad_token_id=tokenizer.pad_token_id,
eos_token_id=tokenizer.eos_token_id
)
logger.info(f"✅ {model_key} model loaded successfully")
logger.info(f" Model size: {model.num_parameters():,} parameters")
logger.info(f" Device: {next(model.parameters()).device}")
return model, tokenizer
except Exception as e:
logger.error(f"❌ Failed to load {model_key} model: {e}")
raise
def load_primary_model(self) -> bool:
"""Load the primary LFM2-8B model."""
try:
self.primary_model, self.primary_tokenizer = self._load_model_and_tokenizer(
self.primary_config, "Primary"
)
return True
except Exception as e:
logger.error(f"❌ Primary model loading failed: {e}")
return False
    def load_secondary_model(self) -> bool:
        """Load the secondary FemTO-R1C model."""
        try:
            # Decide whether the GPU can host both models; otherwise fall back to CPU
            gpu_memory = 0.0
            if torch.cuda.is_available():
                gpu_memory = torch.cuda.get_device_properties(0).total_memory / (1024**3)
            if gpu_memory >= 24:  # At least 24GB GPU memory available
                secondary_config = self.secondary_config
            else:
                secondary_config = ModelConfig(
                    model_name=self.secondary_config.model_name,
                    device="cpu",
                    torch_dtype=torch.float32,  # Use float32 on CPU
                    trust_remote_code=self.secondary_config.trust_remote_code,
                    use_cache=self.secondary_config.use_cache,
                    low_cpu_mem_usage=self.secondary_config.low_cpu_mem_usage
                )
            self.secondary_model, self.secondary_tokenizer = self._load_model_and_tokenizer(
                secondary_config, "Secondary"
            )
            return True
        except Exception as e:
            logger.error(f"❌ Secondary model loading failed: {e}")
            return False
def load_all_models(self) -> bool:
"""Load both primary and secondary models."""
logger.info("🚀 Loading all HuggingFace models...")
# Load primary model first
if not self.load_primary_model():
return False
        # Load secondary model (optional - the pipeline can run on the primary alone)
        if not self.load_secondary_model():
            logger.warning("⚠️ Secondary model failed to load - continuing with primary only")
        else:
            logger.info("✅ All models loaded successfully")
        return True
def generate_with_primary(self, prompt: str, **kwargs) -> str:
"""Generate text using the primary model."""
if self.primary_model is None or self.primary_tokenizer is None:
raise RuntimeError("Primary model not loaded")
try:
# Tokenize input
inputs = self.primary_tokenizer(prompt, return_tensors="pt", padding=True, truncation=True)
inputs = {k: v.to(self.primary_model.device) for k, v in inputs.items()}
# Generate
with torch.no_grad():
outputs = self.primary_model.generate(
**inputs,
max_new_tokens=kwargs.get('max_new_tokens', self.generation_settings.max_new_tokens),
temperature=kwargs.get('temperature', self.generation_settings.temperature),
top_p=kwargs.get('top_p', self.generation_settings.top_p),
top_k=kwargs.get('top_k', self.generation_settings.top_k),
repetition_penalty=kwargs.get('repetition_penalty', self.generation_settings.repetition_penalty),
do_sample=kwargs.get('do_sample', self.generation_settings.do_sample),
pad_token_id=self.primary_tokenizer.pad_token_id,
eos_token_id=self.primary_tokenizer.eos_token_id
)
# Decode output
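            # generate() returns prompt + continuation; slice off the prompt tokens so
            # only the newly generated text is decoded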
generated_text = self.primary_tokenizer.decode(
outputs[0][inputs['input_ids'].shape[1]:],
skip_special_tokens=True
)
return generated_text
except Exception as e:
logger.error(f"❌ Primary model generation failed: {e}")
raise
def generate_with_secondary(self, prompt: str, **kwargs) -> str:
"""Generate text using the secondary model."""
if self.secondary_model is None or self.secondary_tokenizer is None:
raise RuntimeError("Secondary model not loaded")
try:
# Tokenize input
inputs = self.secondary_tokenizer(prompt, return_tensors="pt", padding=True, truncation=True)
inputs = {k: v.to(self.secondary_model.device) for k, v in inputs.items()}
# Generate
with torch.no_grad():
outputs = self.secondary_model.generate(
**inputs,
max_new_tokens=kwargs.get('max_new_tokens', self.generation_settings.max_new_tokens),
temperature=kwargs.get('temperature', self.generation_settings.temperature),
top_p=kwargs.get('top_p', self.generation_settings.top_p),
top_k=kwargs.get('top_k', self.generation_settings.top_k),
repetition_penalty=kwargs.get('repetition_penalty', self.generation_settings.repetition_penalty),
do_sample=kwargs.get('do_sample', self.generation_settings.do_sample),
pad_token_id=self.secondary_tokenizer.pad_token_id,
eos_token_id=self.secondary_tokenizer.eos_token_id
)
# Decode output
generated_text = self.secondary_tokenizer.decode(
outputs[0][inputs['input_ids'].shape[1]:],
skip_special_tokens=True
)
return generated_text
except Exception as e:
logger.error(f"❌ Secondary model generation failed: {e}")
raise
def get_model_info(self) -> Dict[str, Any]:
"""Get information about loaded models."""
info = {
"primary_model": {
"loaded": self.primary_model is not None,
"name": self.primary_config.model_name,
"parameters": self.primary_model.num_parameters() if self.primary_model else 0,
"device": str(next(self.primary_model.parameters()).device) if self.primary_model else "Not loaded"
},
"secondary_model": {
"loaded": self.secondary_model is not None,
"name": self.secondary_config.model_name,
"parameters": self.secondary_model.num_parameters() if self.secondary_model else 0,
"device": str(next(self.secondary_model.parameters()).device) if self.secondary_model else "Not loaded"
},
"system": {
"device": self.device,
"cuda_available": torch.cuda.is_available(),
"cuda_devices": torch.cuda.device_count() if torch.cuda.is_available() else 0
}
}
return info
def cleanup(self):
"""Clean up models and free memory."""
logger.info("🧹 Cleaning up HuggingFace models...")
        # Drop references so the underlying tensors can be garbage-collected; reset the
        # attributes to None so later calls (e.g. get_model_info) remain safe
        self.primary_model = None
        self.primary_tokenizer = None
        self.secondary_model = None
        self.secondary_tokenizer = None
# Clear cache
self.model_cache.clear()
self.tokenizer_cache.clear()
# Force garbage collection
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
logger.info("✅ Cleanup completed")
def create_model_orchestrator() -> HuggingFaceModelOrchestrator:
"""Create a configured model orchestrator with the specified models."""
# Primary model: LFM2-8B-A1B-Dimensional-Entanglement
primary_config = ModelConfig(
model_name="9x25dillon/LFM2-8B-A1B-Dimensional-Entanglement",
device="auto",
torch_dtype=torch.bfloat16,
trust_remote_code=True,
use_cache=True,
low_cpu_mem_usage=True
)
# Secondary model: 9xdSq-LIMPS-FemTO-R1C
secondary_config = ModelConfig(
model_name="9x25dillon/9xdSq-LIMPS-FemTO-R1C",
device="auto",
torch_dtype=torch.bfloat16,
trust_remote_code=True,
use_cache=True,
low_cpu_mem_usage=True
)
return HuggingFaceModelOrchestrator(primary_config, secondary_config)
def main():
"""Demo function to test the model orchestrator."""
print("🚀 Testing HuggingFace Model Orchestrator")
print("=" * 50)
# Create orchestrator
orchestrator = create_model_orchestrator()
try:
# Load models
if orchestrator.load_all_models():
print("✅ All models loaded successfully")
# Get model info
info = orchestrator.get_model_info()
print(f"\n📊 Model Information:")
print(f" Primary: {info['primary_model']['name']}")
print(f" Parameters: {info['primary_model']['parameters']:,}")
print(f" Device: {info['primary_model']['device']}")
if info['secondary_model']['loaded']:
print(f" Secondary: {info['secondary_model']['name']}")
print(f" Parameters: {info['secondary_model']['parameters']:,}")
print(f" Device: {info['secondary_model']['device']}")
# Test generation
test_prompt = "Explain the concept of dimensional entanglement in AI systems."
print(f"\n🧪 Testing generation with prompt: '{test_prompt}'")
# Test primary model
try:
primary_output = orchestrator.generate_with_primary(test_prompt, max_new_tokens=100)
print(f"✅ Primary model output: {primary_output[:200]}...")
except Exception as e:
print(f"❌ Primary model generation failed: {e}")
# Test secondary model
try:
secondary_output = orchestrator.generate_with_secondary(test_prompt, max_new_tokens=100)
print(f"✅ Secondary model output: {secondary_output[:200]}...")
except Exception as e:
print(f"❌ Secondary model generation failed: {e}")
else:
print("❌ Failed to load models")
except Exception as e:
print(f"❌ Error: {e}")
finally:
# Cleanup
orchestrator.cleanup()
print("\n🧹 Cleanup completed")
if __name__ == "__main__":
main()