|
|
""" |
|
|
Real Model Loader for Hugging Face Models |
|
|
Manages model loading, caching, and inference |
|
|
""" |
|
|
|
|
|
import os |
|
|
import logging |
|
|
from typing import Dict, Any, Optional, List |
|
|
|
|
|
|
|
|
# Optional heavy dependencies: torch/transformers may be absent, in which
# case the loader runs in a degraded fallback mode and loads no models.
try:
    import torch
    from transformers import (
        AutoTokenizer,
        AutoModel,
        AutoModelForSequenceClassification,
        AutoModelForTokenClassification,
        pipeline
    )
    TRANSFORMERS_AVAILABLE = True  # full functionality available
except ImportError:
    TRANSFORMERS_AVAILABLE = False  # fallback mode: model loading disabled
    # logger must be bound here because the warning below is emitted before
    # the module-level binding further down runs.
    logger = logging.getLogger(__name__)
    logger.warning("Transformers not available - AI models will not load")

from functools import lru_cache

# Module-level logger (re-binds the name set in the except branch above).
logger = logging.getLogger(__name__)

# Hugging Face Hub access token read from the environment; an empty string
# means anonymous (unauthenticated) access.
HF_TOKEN = os.getenv("HF_TOKEN", "")
|
|
|
|
|
|
|
|
class ModelLoader:
    """Manages loading and caching of Hugging Face models.

    Models are loaded lazily on first use and cached in ``loaded_models``.
    When CUDA is available, models are placed on the GPU. If the
    ``transformers`` package is missing, the loader degrades to a fallback
    mode in which every load returns ``None``.
    """

    # Model key substituted when an unknown key is requested.
    _FALLBACK_KEY = "general_medical"

    def __init__(self):
        # Cache of loaded models keyed by model key; values are either a
        # transformers pipeline or a {"tokenizer", "model", "type"} dict.
        self.loaded_models: Dict[str, Any] = {}

        if not TRANSFORMERS_AVAILABLE:
            # Degraded mode: nothing can be loaded, stay on CPU.
            logger.warning("Transformers library not available - using fallback mode")
            self.device = "cpu"
            self.model_configs: Dict[str, Dict[str, Any]] = {}
            return

        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model_configs = self._get_model_configs()
        logger.info(f"Model Loader initialized on device: {self.device}")

    def _get_model_configs(self) -> Dict[str, Dict[str, Any]]:
        """Configuration for real Hugging Face models.

        Maps task keys to actual model ids on the Hugging Face Hub along
        with the pipeline task name and a human-readable description.
        """
        return {
            "document_classifier": {
                "model_id": "emilyalsentzer/Bio_ClinicalBERT",
                "task": "text-classification",
                "description": "Clinical document type classification"
            },
            "clinical_ner": {
                "model_id": "d4data/biomedical-ner-all",
                "task": "ner",
                "description": "Biomedical named entity recognition"
            },
            "clinical_generation": {
                "model_id": "microsoft/BioGPT-Large",
                "task": "text-generation",
                "description": "Clinical text generation and summarization"
            },
            "medical_qa": {
                "model_id": "deepset/roberta-base-squad2",
                "task": "question-answering",
                "description": "Medical question answering"
            },
            "general_medical": {
                "model_id": "microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract-fulltext",
                "task": "feature-extraction",
                "description": "General medical text understanding"
            },
            "drug_interaction": {
                "model_id": "allenai/scibert_scivocab_uncased",
                "task": "feature-extraction",
                "description": "Drug interaction detection"
            },
            "radiology_generation": {
                "model_id": "microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract",
                "task": "feature-extraction",
                "description": "Radiology report analysis"
            },
            "clinical_summarization": {
                "model_id": "google/bigbird-pegasus-large-pubmed",
                "task": "summarization",
                "description": "Clinical document summarization"
            }
        }

    def load_model(self, model_key: str) -> Optional[Any]:
        """Load the model registered under *model_key*, with caching.

        Unknown keys fall back to the general-purpose medical model.
        Returns a transformers pipeline, a custom tokenizer/model dict, or
        ``None`` when loading fails or transformers is unavailable.
        """
        if not TRANSFORMERS_AVAILABLE:
            logger.warning(f"Cannot load model {model_key} - transformers not available")
            return None

        try:
            # Resolve unknown keys to the fallback *before* the cache check
            # so an already-loaded fallback model is reused, not reloaded.
            if model_key not in self.model_configs:
                logger.warning(f"Unknown model key: {model_key}, using fallback")
                model_key = self._FALLBACK_KEY

            if model_key in self.loaded_models:
                logger.info(f"Using cached model: {model_key}")
                return self.loaded_models[model_key]

            config = self.model_configs[model_key]
            model_id = config["model_id"]
            task = config["task"]
            logger.info(f"Loading model: {model_id} for task: {task}")

            # Preferred path: task pipeline; fallback: raw tokenizer+model.
            loaded = self._load_pipeline(model_id, task)
            if loaded is None:
                loaded = self._load_raw_model(model_id)
            if loaded is not None:
                self.loaded_models[model_key] = loaded
            return loaded

        except Exception as e:
            logger.error(f"Model loading failed: {str(e)}")
            return None

    def _load_pipeline(self, model_id: str, task: str) -> Optional[Any]:
        """Try to load *model_id* as a transformers pipeline; None on failure."""
        try:
            # SECURITY: trust_remote_code=True executes code shipped in the
            # model repository; only use with model ids from trusted sources.
            model_pipeline = pipeline(
                task=task,
                model=model_id,
                device=0 if self.device == "cuda" else -1,
                token=HF_TOKEN if HF_TOKEN else None,
                trust_remote_code=True
            )
            logger.info(f"Successfully loaded model: {model_id}")
            return model_pipeline
        except Exception as e:
            logger.error(f"Failed to load model {model_id}: {str(e)}")
            return None

    def _load_raw_model(self, model_id: str) -> Optional[Dict[str, Any]]:
        """Fallback loader: tokenizer plus base AutoModel on self.device."""
        try:
            tokenizer = AutoTokenizer.from_pretrained(
                model_id,
                token=HF_TOKEN if HF_TOKEN else None
            )
            model = AutoModel.from_pretrained(
                model_id,
                token=HF_TOKEN if HF_TOKEN else None
            ).to(self.device)
            logger.info(f"Loaded model {model_id} with custom loader")
            return {
                "tokenizer": tokenizer,
                "model": model,
                "type": "custom"
            }
        except Exception as inner_e:
            logger.error(f"Custom loader also failed: {str(inner_e)}")
            return None

    def run_inference(
        self,
        model_key: str,
        input_text: str,
        task_params: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Run inference on the model registered under *model_key*.

        Returns a dict with ``"success"``/``"result"`` on success or
        ``"error"`` on failure; ``"model_key"`` is always included.
        """
        try:
            model = self.load_model(model_key)

            if model is None:
                return {
                    "error": "Model not available",
                    "model_key": model_key
                }

            # Copy so the caller's dict is never mutated by the pops below.
            params = dict(task_params or {})

            if callable(model) and not isinstance(model, dict):
                # Pipeline path. Pop the keys we pass explicitly so a caller
                # supplying them does not trigger a duplicate-keyword
                # TypeError from the **params splat.
                max_length = params.pop("max_length", 512)
                truncation = params.pop("truncation", True)

                result = model(
                    input_text[:4000],  # cap prompt size
                    max_length=max_length,
                    truncation=truncation,
                    **params
                )
                return {
                    "success": True,
                    "result": result,
                    "model_key": model_key
                }

            if isinstance(model, dict) and model.get("type") == "custom":
                # Raw tokenizer+model path: return mean-pooled embeddings.
                tokenizer = model["tokenizer"]
                model_obj = model["model"]

                inputs = tokenizer(
                    input_text[:512],
                    return_tensors="pt",
                    truncation=True,
                    max_length=512
                ).to(self.device)

                with torch.no_grad():
                    outputs = model_obj(**inputs)

                return {
                    "success": True,
                    "result": {
                        "embeddings": outputs.last_hidden_state.mean(dim=1).cpu().tolist(),
                        "pooled": outputs.pooler_output.cpu().tolist() if hasattr(outputs, 'pooler_output') else None
                    },
                    "model_key": model_key
                }

            return {
                "error": "Unknown model type",
                "model_key": model_key
            }

        except Exception as e:
            logger.error(f"Inference failed for {model_key}: {str(e)}")
            return {
                "error": str(e),
                "model_key": model_key
            }

    def clear_cache(self, model_key: Optional[str] = None):
        """Clear model cache to free memory.

        Clears one entry when *model_key* is given, otherwise all entries;
        also empties the CUDA allocator cache when a GPU is in use.
        """
        if model_key:
            if model_key in self.loaded_models:
                del self.loaded_models[model_key]
                logger.info(f"Cleared cache for model: {model_key}")
        else:
            self.loaded_models.clear()
            logger.info("Cleared all model caches")

        if TRANSFORMERS_AVAILABLE and torch.cuda.is_available():
            torch.cuda.empty_cache()
|
|
|
|
|
|
|
|
|
|
|
# Process-wide singleton instance, created lazily by get_model_loader().
_model_loader = None


def get_model_loader() -> ModelLoader:
    """Return the singleton ModelLoader, creating it on first call."""
    global _model_loader
    loader = _model_loader
    if loader is None:
        loader = _model_loader = ModelLoader()
    return loader
|
|
|