Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Real LLM integration for RAG system. | |
| Uses HuggingFace transformers with CPU optimizations. | |
| """ | |
| import sys | |
| from pathlib import Path | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM | |
| import torch | |
| from typing import List, Dict, Any | |
| import time | |
| from config import MAX_TOKENS, TEMPERATURE | |
class CPUOptimizedLLM:
    """CPU-optimized LLM for RAG responses.

    Lazily loads a HuggingFace causal LM on CPU and generates answers from
    retrieved context. If loading or generation fails, transparently falls
    back to a cheap simulated response so the RAG pipeline keeps working.
    """

    def __init__(self, model_name: str = "microsoft/phi-2"):
        """
        Initialize a CPU-friendly model.

        Options: microsoft/phi-2, TinyLlama/TinyLlama-1.1B, Qwen/Qwen2.5-0.5B

        Args:
            model_name: HuggingFace model id, loaded on first initialize().
        """
        self.model_name = model_name
        self.tokenizer = None
        self.model = None
        self.pipeline = None
        self._initialized = False
        # CPU optimization settings
        self.torch_dtype = torch.float32  # Use float32 for CPU
        self.device = "cpu"
        self.load_in_8bit = False  # Can't use 8-bit on CPU without special setup

    def initialize(self):
        """Lazy initialization of the model.

        Safe to call repeatedly: a no-op once loading has succeeded. On any
        loading error, `_initialized` stays False and generate_response()
        uses the simulated fallback instead of raising.
        """
        if self._initialized:
            return
        print(f"Loading LLM model: {self.model_name} (CPU optimized)...")
        start_time = time.time()
        try:
            # Load tokenizer
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_name,
                trust_remote_code=True
            )
            # Add padding token if missing (common for decoder-only models)
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token
            # Load model with CPU optimizations
            self.model = AutoModelForCausalLM.from_pretrained(
                self.model_name,
                torch_dtype=self.torch_dtype,
                device_map="cpu",
                low_cpu_mem_usage=True,
                trust_remote_code=True
            )
            # Create text generation pipeline
            self.pipeline = pipeline(
                "text-generation",
                model=self.model,
                tokenizer=self.tokenizer,
                device=-1,  # CPU
                torch_dtype=self.torch_dtype
            )
            load_time = time.time() - start_time
            print(f"LLM loaded in {load_time:.1f}s")
            self._initialized = True
        except Exception as e:
            # Broad catch is deliberate here: any failure (network, disk,
            # incompatible weights) degrades to the simulated path.
            print(f"Error loading model {self.model_name}: {e}")
            print("Falling back to simulated LLM...")
            self._initialized = False

    def generate_response(self, question: str, context: str) -> str:
        """
        Generate a response using the LLM.

        Args:
            question: User's question
            context: Retrieved context chunks

        Returns:
            Generated answer (simulated if the real model is unavailable
            or generation raises).
        """
        if not self._initialized:
            # Fallback to simulated response
            return self._generate_simulated_response(question, context)
        # Create prompt
        prompt = f"""Context information:
{context}
Based on the context above, answer the following question:
Question: {question}
Answer: """
        try:
            # Generate response
            start_time = time.perf_counter()
            outputs = self.pipeline(
                prompt,
                max_new_tokens=MAX_TOKENS,
                temperature=TEMPERATURE,
                do_sample=True,
                top_p=0.95,
                pad_token_id=self.tokenizer.pad_token_id,
                eos_token_id=self.tokenizer.eos_token_id,
                num_return_sequences=1
            )
            generation_time = (time.perf_counter() - start_time) * 1000
            # Extract response; the pipeline echoes the prompt, so strip it
            response = outputs[0]['generated_text']
            if response.startswith(prompt):
                response = response[len(prompt):].strip()
            print(f"  [Real LLM] Generation: {generation_time:.1f}ms")
            return response
        except Exception as e:
            print(f"  [Real LLM Error] {e}, falling back to simulated...")
            return self._generate_simulated_response(question, context)

    def _generate_simulated_response(self, question: str, context: str) -> str:
        """Fallback simulated response.

        Echoes a truncated slice of the context; sleeps to mimic realistic
        generation latency (80ms for short contexts, 200ms for long ones).
        """
        time.sleep(0.08 if len(context) < 1000 else 0.2)
        if context:
            return f"Based on the context: {context[:300]}..."
        else:
            return "I don't have enough information to answer that question."

    def close(self):
        """Clean up model resources.

        Idempotent: rebinds the heavy objects to None instead of `del`-ing
        the attribute (the old `del self.model` removed the attribute, so a
        second close() raised AttributeError). No CUDA cache to clear —
        this class pins everything to CPU.
        """
        self.model = None
        self.tokenizer = None
        self.pipeline = None
        self._initialized = False
# Smoke-test the LLM integration when run as a script.
if __name__ == "__main__":
    demo_question = "What is machine learning?"
    demo_context = (
        "Machine learning is a subset of artificial intelligence that enables "
        "systems to learn and improve from experience without being explicitly "
        "programmed. There are three main types: supervised learning, "
        "unsupervised learning, and reinforcement learning."
    )

    llm = CPUOptimizedLLM("microsoft/phi-2")
    llm.initialize()
    answer = llm.generate_response(demo_question, demo_context)
    print(f"\nQuestion: {demo_question}")
    print(f"Response: {answer[:200]}...")
    llm.close()