| """ | |
| LLM utilities for handling different LLM providers. | |
| Supports OpenAI and Hugging Face models. | |
| """ | |
| import os | |
| from typing import Optional, List | |
| from dotenv import load_dotenv | |
| # Load environment variables | |
| load_dotenv(override=True) | |


class LLMHandler:
    """Handler for different LLM providers."""

    def __init__(
        self,
        provider: str = "openai",
        model_name: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 500
    ):
        """
        Initialize LLM handler.

        Args:
            provider: LLM provider ("openai" or "huggingface")
            model_name: Model name (optional, uses default if not provided)
            temperature: Temperature for generation
            max_tokens: Maximum tokens to generate
        """
        self.provider = provider.lower()
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.model = None
        self.tokenizer = None
        self.embedding_model = None

        if self.provider == "openai":
            self._initialize_openai(model_name)
        elif self.provider == "huggingface":
            self._initialize_huggingface(model_name)
        else:
            raise ValueError(f"Unsupported provider: {provider}")

    def _initialize_openai(self, model_name: Optional[str] = None):
        """Initialize OpenAI client."""
        try:
            from openai import OpenAI

            api_key = os.getenv("OPENAI_API_KEY")
            if not api_key:
                raise ValueError("OPENAI_API_KEY not found in environment variables")

            self.client = OpenAI(api_key=api_key)
            self.model_name = model_name or os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")
            print(f"✓ OpenAI client initialized with model: {self.model_name}")
        except ImportError:
            raise ImportError("OpenAI package not installed. Run: pip install openai")
        except Exception as e:
            raise RuntimeError(f"Failed to initialize OpenAI: {e}") from e

    def _initialize_huggingface(self, model_name: Optional[str] = None):
        """Initialize Hugging Face model."""
        try:
            from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, AutoModelForCausalLM
            import torch

            # Get model name from parameter or environment
            if model_name is None:
                model_name = os.getenv(
                    "HUGGINGFACE_MODEL",
                    "google/flan-t5-large"
                )
            self.model_name = model_name
            print(f"Initializing LLM: huggingface - {self.model_name}")

            # Get HF token
            hf_token = os.getenv("HUGGINGFACE_API_TOKEN")
            if not hf_token:
                print("⚠️ Warning: HUGGINGFACE_API_TOKEN not found. Some models may not be accessible.")

            # Determine device
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
            print(f"Using device: {self.device}")

            # Load tokenizer
            print(f"Loading tokenizer for {self.model_name}...")
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_name,
                token=hf_token,
                trust_remote_code=True
            )

            # Load model based on type: T5/Flan-T5 are seq2seq; everything
            # else is treated as a causal LM (Mistral, Llama, etc.)
            print(f"Loading model {self.model_name}...")
            if "t5" in self.model_name.lower() or "flan" in self.model_name.lower():
                self.model = AutoModelForSeq2SeqLM.from_pretrained(
                    self.model_name,
                    token=hf_token,
                    trust_remote_code=True,
                    torch_dtype=torch.float16 if self.device == "cuda" else torch.float32
                )
            else:
                self.model = AutoModelForCausalLM.from_pretrained(
                    self.model_name,
                    token=hf_token,
                    trust_remote_code=True,
                    torch_dtype=torch.float16 if self.device == "cuda" else torch.float32
                )

            self.model.to(self.device)
            self.model.eval()
            print("✓ LLM initialized successfully")
        except ImportError as e:
            raise ImportError(f"Required packages not installed: {e}")
        except Exception as e:
            raise RuntimeError(f"Failed to initialize Hugging Face model: {e}") from e

    def generate(
        self,
        prompt: str,
        system_message: Optional[str] = None,
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None
    ) -> str:
        """
        Generate text using the LLM.

        Args:
            prompt: Input prompt
            system_message: Optional system message
            temperature: Optional temperature override
            max_tokens: Optional max tokens override

        Returns:
            Generated text
        """
        temp = temperature if temperature is not None else self.temperature
        max_tok = max_tokens if max_tokens is not None else self.max_tokens

        if self.provider == "openai":
            return self._generate_openai(prompt, system_message, temp, max_tok)
        elif self.provider == "huggingface":
            return self._generate_huggingface(prompt, system_message, temp, max_tok)
        else:
            # Unreachable in practice (provider is validated in __init__),
            # but fail loudly rather than silently returning None.
            raise ValueError(f"Unsupported provider: {self.provider}")

    def _generate_openai(
        self,
        prompt: str,
        system_message: Optional[str],
        temperature: float,
        max_tokens: int
    ) -> str:
        """Generate using OpenAI."""
        messages = []
        if system_message:
            messages.append({"role": "system", "content": system_message})
        messages.append({"role": "user", "content": prompt})

        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens
        )
        return response.choices[0].message.content

    def _generate_huggingface(
        self,
        prompt: str,
        system_message: Optional[str],
        temperature: float,
        max_tokens: int
    ) -> str:
        """Generate using Hugging Face."""
        import torch

        # Construct full prompt
        if system_message:
            full_prompt = f"{system_message}\n\n{prompt}"
        else:
            full_prompt = prompt

        # Tokenize
        inputs = self.tokenizer(
            full_prompt,
            return_tensors="pt",
            truncation=True,
            max_length=512
        ).to(self.device)

        # Generate
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=max_tokens,
                temperature=temperature,
                do_sample=temperature > 0,
                top_p=0.9,
                num_return_sequences=1,
                pad_token_id=self.tokenizer.eos_token_id
            )

        # Seq2seq models (T5/Flan-T5) emit only the answer; causal models
        # echo the prompt, so strip the prompt by token count rather than by
        # character count (decoding does not always reproduce the input
        # string exactly).
        if "t5" in self.model_name.lower() or "flan" in self.model_name.lower():
            generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        else:
            new_tokens = outputs[0][inputs["input_ids"].shape[1]:]
            generated_text = self.tokenizer.decode(new_tokens, skip_special_tokens=True)
        return generated_text.strip()

    def generate_with_context(
        self,
        query: str,
        context: str,
        system_message: Optional[str] = None
    ) -> str:
        """
        Generate answer using query and context.

        Args:
            query: User query
            context: Retrieved context
            system_message: Optional system message

        Returns:
            Generated answer
        """
        prompt = f"""Context:
{context}

Question: {query}

Answer the question based on the context provided above. Be concise and accurate."""
        return self.generate(prompt, system_message)
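

# Example usage for LLMHandler (a sketch, not part of the original module;
# assumes OPENAI_API_KEY is set in .env and the strings below are placeholders):
#
#     handler = LLMHandler(provider="openai", temperature=0.2)
#     answer = handler.generate_with_context(
#         query="When does the warranty expire?",
#         context="The warranty covers manufacturing defects for 12 months.",
#     )
#     print(answer)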


class EmbeddingHandler:
    """Handler for embedding models."""

    def __init__(self, model_name: Optional[str] = None):
        """
        Initialize embedding handler.

        Args:
            model_name: Embedding model name
        """
        from sentence_transformers import SentenceTransformer

        self.model_name = model_name or os.getenv(
            "EMBEDDING_MODEL",
            "sentence-transformers/all-MiniLM-L6-v2"
        )
        print(f"Loading embedding model: {self.model_name}")
        self.model = SentenceTransformer(self.model_name)
        print("✓ Embedding model loaded successfully")

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """
        Embed a list of documents.

        Args:
            texts: List of text documents

        Returns:
            List of embeddings
        """
        embeddings = self.model.encode(texts, convert_to_numpy=True)
        return embeddings.tolist()

    def embed_query(self, text: str) -> List[float]:
        """
        Embed a single query.

        Args:
            text: Query text

        Returns:
            Embedding vector
        """
        embedding = self.model.encode(text, convert_to_numpy=True)
        return embedding.tolist()
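

# Example usage for EmbeddingHandler (a sketch, not part of the original
# module; the texts are placeholders, and the 384-dimension check applies to
# the default all-MiniLM-L6-v2 model):
#
#     embedder = EmbeddingHandler()
#     doc_vectors = embedder.embed_documents(["First doc.", "Second doc."])
#     query_vector = embedder.embed_query("Which doc is first?")
#     assert len(query_vector) == 384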


def create_llm_handler(
    provider: str = "openai",
    model_name: Optional[str] = None,
    temperature: float = 0.7,
    max_tokens: int = 500
) -> LLMHandler:
    """
    Create and return an LLM handler.

    Args:
        provider: LLM provider
        model_name: Model name
        temperature: Temperature
        max_tokens: Max tokens

    Returns:
        LLMHandler instance
    """
    return LLMHandler(provider, model_name, temperature, max_tokens)


def create_embedding_handler(model_name: Optional[str] = None) -> EmbeddingHandler:
    """
    Create and return an embedding handler.

    Args:
        model_name: Embedding model name

    Returns:
        EmbeddingHandler instance
    """
    return EmbeddingHandler(model_name)
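

# Minimal smoke test: a hedged sketch, not part of the original module.
# Assumes OPENAI_API_KEY is set in .env (switch provider to "huggingface"
# to exercise the local path) and that network access is available to
# download models on first run.
if __name__ == "__main__":
    llm = create_llm_handler(provider="openai", temperature=0.2, max_tokens=100)
    embedder = create_embedding_handler()

    context = "The library closes at 8 pm on weekdays and 5 pm on weekends."
    answer = llm.generate_with_context(
        query="When does the library close on Saturday?",
        context=context,
    )
    print(f"Answer: {answer}")
    print(f"Embedding dimension: {len(embedder.embed_query(context))}")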