File size: 13,363 Bytes
13d5ab4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
"""
Real Model Loader for Hugging Face Models
Manages model loading, caching, and inference
Works with public HuggingFace models without requiring authentication
"""

import os
import logging
from typing import Dict, Any, Optional, List
from functools import lru_cache

# Required ML libraries - these MUST be installed
import torch
from transformers import (
    AutoTokenizer,
    AutoModel,
    AutoModelForSequenceClassification,
    AutoModelForTokenClassification,
    pipeline
)

# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# Optional Hugging Face access token read from the environment.
# It is None when the variable is unset; public models load without it.
HF_TOKEN = os.getenv("HF_TOKEN")

# Report once, at import time, whether a token was picked up
logger.info(
    "HF_TOKEN found - will use for gated models if needed"
    if HF_TOKEN
    else "HF_TOKEN not found - using public models only (this is normal)"
)


class ModelLoader:
    """
    Manages loading and caching of Hugging Face models.

    Models are loaded lazily on first request and kept in ``loaded_models``
    so later requests for the same key reuse them. The device is "cuda" when
    ``torch.cuda.is_available()`` is true, otherwise "cpu".
    """

    # Configuration key substituted when an unknown model key is requested
    FALLBACK_MODEL_KEY = "general_medical"

    # Defaults applied to pipeline calls unless the caller overrides them
    DEFAULT_MAX_LENGTH = 512
    MAX_PIPELINE_INPUT_CHARS = 4000
    MAX_CUSTOM_INPUT_CHARS = 512

    def __init__(self):
        """Initialize the model loader with GPU support if available"""
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        # model_key -> loaded pipeline, or a dict holding "tokenizer"/"model"/"type"
        self.loaded_models: Dict[str, Any] = {}
        self.model_configs = self._get_model_configs()

        # Log system information
        logger.info(f"Model Loader initialized on device: {self.device}")
        logger.info(f"PyTorch version: {torch.__version__}")
        logger.info(f"CUDA available: {torch.cuda.is_available()}")

        # Log every configured model so misconfiguration is visible at startup
        logger.info(f"Model configurations loaded: {len(self.model_configs)} models")
        for key, config in self.model_configs.items():
            logger.info(f"  - {key}: {config['model_id']}")

    def _get_model_configs(self) -> Dict[str, Dict[str, Any]]:
        """
        Return the static model configuration table.

        Each entry maps a model key to a dict with:
          - "model_id": repository name on the Hugging Face Hub
          - "task": task string passed to ``transformers.pipeline``
          - "description": human-readable summary
        """
        return {
            # Document Classification
            "document_classifier": {
                "model_id": "emilyalsentzer/Bio_ClinicalBERT",
                "task": "text-classification",
                "description": "Clinical document type classification"
            },

            # Clinical NER
            "clinical_ner": {
                "model_id": "d4data/biomedical-ner-all",
                "task": "ner",
                "description": "Biomedical named entity recognition"
            },

            # Clinical Text Generation
            "clinical_generation": {
                "model_id": "microsoft/BioGPT-Large",
                "task": "text-generation",
                "description": "Clinical text generation and summarization"
            },

            # Medical Question Answering
            "medical_qa": {
                "model_id": "deepset/roberta-base-squad2",
                "task": "question-answering",
                "description": "Medical question answering"
            },

            # General Medical Analysis
            "general_medical": {
                "model_id": "microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract-fulltext",
                "task": "feature-extraction",
                "description": "General medical text understanding"
            },

            # Drug-Drug Interaction
            "drug_interaction": {
                "model_id": "allenai/scibert_scivocab_uncased",
                "task": "feature-extraction",
                "description": "Drug interaction detection"
            },

            # Radiology Report Generation (fallback to general medical)
            "radiology_generation": {
                "model_id": "microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract",
                "task": "feature-extraction",
                "description": "Radiology report analysis"
            },

            # Clinical Summarization
            "clinical_summarization": {
                "model_id": "google/bigbird-pegasus-large-pubmed",
                "task": "summarization",
                "description": "Clinical document summarization"
            }
        }

    def load_model(self, model_key: str) -> Optional[Any]:
        """
        Load a model by key, with caching.

        An unknown key is replaced by ``FALLBACK_MODEL_KEY``. Loading is first
        attempted through ``transformers.pipeline``; if that fails, the
        tokenizer and bare ``AutoModel`` are loaded instead.

        Most HuggingFace models are public and don't require authentication.
        HF_TOKEN is only needed for private/gated models.

        Args:
            model_key: Key into ``model_configs``.

        Returns:
            The cached or newly loaded pipeline, or a dict
            ``{"tokenizer", "model", "type": "custom"}`` when the fallback
            loader was used, or ``None`` when every loading attempt failed.
        """
        try:
            # Resolve the fallback key BEFORE the cache lookup, so repeated
            # requests for an unknown key reuse the cached fallback model
            # instead of loading it again each time.
            if model_key not in self.loaded_models and model_key not in self.model_configs:
                logger.warning(f"Unknown model key: {model_key}, using fallback")
                model_key = self.FALLBACK_MODEL_KEY

            if model_key in self.loaded_models:
                logger.info(f"Using cached model: {model_key}")
                return self.loaded_models[model_key]

            config = self.model_configs[model_key]
            model_id = config["model_id"]
            task = config["task"]

            logger.info(f"Loading model: {model_id} for task: {task}")

            loaded = self._load_pipeline(model_id, task)
            if loaded is None:
                loaded = self._load_auto_model(model_id)
            if loaded is None:
                logger.info(f"Model {model_key} unavailable - will use fallback analysis")
                return None

            self.loaded_models[model_key] = loaded
            return loaded

        except Exception as e:
            logger.error(f"Model loading failed for {model_key}: {str(e)}")
            return None

    def _load_pipeline(self, model_id: str, task: str) -> Optional[Any]:
        """Build a transformers pipeline; log and return None on failure."""
        pipeline_kwargs = {
            "task": task,
            "model": model_id,
            "device": 0 if self.device == "cuda" else -1,
            # NOTE(review): this allows code shipped in the model repository
            # to run locally - confirm it is really needed for these models.
            "trust_remote_code": True
        }

        # Only add token if it exists (avoid passing None/empty string)
        if HF_TOKEN:
            pipeline_kwargs["token"] = HF_TOKEN

        try:
            model_pipeline = pipeline(**pipeline_kwargs)
        except Exception as e:
            self._log_pipeline_failure(model_id, e)
            return None

        logger.info(f"Successfully loaded model: {model_id}")
        return model_pipeline

    @staticmethod
    def _log_pipeline_failure(model_id: str, error: Exception) -> None:
        """Log a pipeline loading error, separating auth failures from the rest."""
        error_msg = str(error).lower()
        is_auth_error = any(
            marker in error_msg
            for marker in ("401", "unauthorized", "authentication")
        )

        if not is_auth_error:
            logger.error(f"Failed to load model {model_id}: {str(error)}")
        elif HF_TOKEN:
            logger.error(f"Model {model_id} authentication failed even with HF_TOKEN")
        else:
            logger.error(f"Model {model_id} requires authentication but HF_TOKEN not available")
            logger.error("This model is gated/private. Using public alternative or fallback.")

    def _load_auto_model(self, model_id: str) -> Optional[Dict[str, Any]]:
        """Load tokenizer + AutoModel directly; log and return None on failure."""
        try:
            logger.info(f"Trying alternative loading method for {model_id}...")

            common_kwargs = {"trust_remote_code": True}
            if HF_TOKEN:
                common_kwargs["token"] = HF_TOKEN

            # The repository name is passed positionally: from_pretrained's
            # first parameter is pretrained_model_name_or_path, and a
            # "model_id" keyword would leave it unfilled.
            tokenizer = AutoTokenizer.from_pretrained(model_id, **common_kwargs)
            model = AutoModel.from_pretrained(model_id, **common_kwargs).to(self.device)
        except Exception as inner_e:
            logger.error(f"Alternative loading also failed for {model_id}: {str(inner_e)}")
            return None

        logger.info(f"Successfully loaded {model_id} with alternative method")
        return {
            "tokenizer": tokenizer,
            "model": model,
            "type": "custom"
        }

    def run_inference(
        self,
        model_key: str,
        input_text: str,
        task_params: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Run inference on a loaded model (loading it first if needed).

        Args:
            model_key: Key into ``model_configs``.
            input_text: Text to analyse; it is cut to a fixed number of
                characters before being handed to the model.
            task_params: Extra keyword arguments forwarded to a pipeline
                call. ``max_length`` and ``truncation`` given here override
                the defaults (512 / True).

        Returns:
            ``{"success": True, "result": ..., "model_key": ...}`` on success,
            otherwise ``{"error": ..., "model_key": ...}``. Exceptions are
            logged and reported through the "error" entry, not raised.
        """
        try:
            model = self.load_model(model_key)

            if model is None:
                return {
                    "error": "Model not available",
                    "model_key": model_key
                }

            task_params = task_params or {}

            # Handle pipeline models
            if hasattr(model, '__call__') and not isinstance(model, dict):
                # Merge defaults with caller parameters in ONE dict so a
                # caller-supplied max_length/truncation overrides the default
                # instead of being passed twice (which raises TypeError).
                call_kwargs = {
                    "max_length": self.DEFAULT_MAX_LENGTH,
                    "truncation": True,
                    **task_params
                }

                result = model(
                    input_text[:self.MAX_PIPELINE_INPUT_CHARS],  # Limit input length
                    **call_kwargs
                )

                return {
                    "success": True,
                    "result": result,
                    "model_key": model_key
                }

            # Handle custom loaded models
            elif isinstance(model, dict) and model.get("type") == "custom":
                tokenizer = model["tokenizer"]
                model_obj = model["model"]

                inputs = tokenizer(
                    input_text[:self.MAX_CUSTOM_INPUT_CHARS],
                    return_tensors="pt",
                    truncation=True,
                    max_length=512
                ).to(self.device)

                with torch.no_grad():
                    outputs = model_obj(**inputs)

                # pooler_output can be absent or present-but-None, so test
                # the value itself rather than the attribute's existence.
                pooled = getattr(outputs, "pooler_output", None)

                return {
                    "success": True,
                    "result": {
                        "embeddings": outputs.last_hidden_state.mean(dim=1).cpu().tolist(),
                        "pooled": pooled.cpu().tolist() if pooled is not None else None
                    },
                    "model_key": model_key
                }

            else:
                return {
                    "error": "Unknown model type",
                    "model_key": model_key
                }

        except Exception as e:
            logger.error(f"Inference failed for {model_key}: {str(e)}")
            return {
                "error": str(e),
                "model_key": model_key
            }

    def clear_cache(self, model_key: Optional[str] = None):
        """
        Clear model cache to free memory.

        Args:
            model_key: Key of the single entry to drop. When omitted (or
                empty), every cached model is dropped.
        """
        import gc

        if model_key:
            if model_key in self.loaded_models:
                del self.loaded_models[model_key]
                logger.info(f"Cleared cache for model: {model_key}")
        else:
            self.loaded_models.clear()
            logger.info("Cleared all model caches")

        # Collect the dropped objects first, so that emptying the CUDA cache
        # afterwards can release the memory they were holding.
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def test_model_loading(self) -> Dict[str, Any]:
        """
        Test loading all configured models to verify AI functionality.

        Runs one short inference per configured model. Loaded models stay in
        the cache afterwards.

        Returns:
            A summary dict with "total_models", "models_loaded",
            "models_failed", "errors" (list of "key: message" strings),
            "device" and "pytorch_version".
        """
        results = {
            "total_models": len(self.model_configs),
            "models_loaded": 0,
            "models_failed": 0,
            "errors": [],
            "device": self.device,
            "pytorch_version": torch.__version__
        }

        for model_key, config in self.model_configs.items():
            try:
                logger.info(f"Testing model: {model_key} ({config['model_id']})")

                # Loading happens inside run_inference
                test_input = "Test ECG analysis request"
                result = self.run_inference(model_key, test_input, {"max_new_tokens": 50})

                if result.get("success"):
                    results["models_loaded"] += 1
                    logger.info(f"✅ {model_key}: Loaded successfully")
                else:
                    results["models_failed"] += 1
                    error_msg = result.get("error", "Unknown error")
                    results["errors"].append(f"{model_key}: {error_msg}")
                    logger.warning(f"⚠️ {model_key}: {error_msg}")

            except Exception as e:
                results["models_failed"] += 1
                error_msg = f"Exception during loading: {str(e)}"
                results["errors"].append(f"{model_key}: {error_msg}")
                logger.error(f"❌ {model_key}: {error_msg}")

        logger.info(f"Model loading test complete: {results['models_loaded']}/{results['total_models']} successful")
        return results


# Module-wide shared instance; stays None until get_model_loader() is first called
_model_loader = None


def get_model_loader() -> ModelLoader:
    """Return the shared ModelLoader, constructing it on the first call."""
    global _model_loader
    if _model_loader is not None:
        return _model_loader
    _model_loader = ModelLoader()
    return _model_loader