# model_prtr.py import os import sys import math import torch import logging import importlib import torch.nn as nn from config import load_config, app_config # Fix: Move transformers imports to module scope from transformers import GPT2LMHeadModel, GPT2Tokenizer from transformers import AutoModelForCausalLM, AutoTokenizer from typing import Optional, List, Dict, Any, Union from sentence_transformers import SentenceTransformer # Import service registry from service_registry import registry, MODEL, TOKENIZER, PRETRAINED_MODEL # First import base interfaces from base_interfaces.common_types import * from base_interfaces.model_interface import AbstractModel from model_manager import safe_get_config_value app_config = load_config() logger = logging.getLogger(__name__) # ---------------------------- # Positional Encoding Module (for decoder) # ---------------------------- class PositionalEncoding(nn.Module): def __init__(self, d_model: int, max_len: Optional[int] = None): super().__init__() # Get MAX_SEQ_LENGTH safely from config if max_len is None: if hasattr(app_config, "TRANSFORMER_CONFIG") and isinstance(app_config.TRANSFORMER_CONFIG, dict): max_len = app_config.TRANSFORMER_CONFIG.get("MAX_SEQ_LENGTH", 1024) else: max_len = 1024 # Safe default pe = torch.zeros(max_len, d_model) position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) div_term = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float) * (-math.log(10000.0) / d_model)) pe[:, 0::2] = torch.sin(position * div_term) pe[:, 1::2] = torch.cos(position * div_term) pe = pe.unsqueeze(1) # shape: (max_len, 1, d_model) self.register_buffer('pe', pe) def forward(self, x: torch.Tensor) -> torch.Tensor: # x shape: (seq_len, batch_size, d_model) seq_len = x.size(0) x = x + self.pe[:seq_len] return x # ---------------------------- # Wildnerve-tlm01 using Only Pretrained Encoder # ---------------------------- class Wildnerve_tlm01(nn.Module, AbstractModel): """A Transformer-based language model that uses: - A pretrained GPT-2 model for powerful text generation - A custom decoder stack The model uses the GPT-2 tokenizer for consistent tokenization.""" def __init__( self, vocab_size: int = 50257, # Standardized GPT-2 vocab size specialization: str = "general", dataset_path: str = None, model_name: str = "gpt2", # Standardized to GPT-2 embedding_dim: int = 768, num_heads: int = 12, hidden_dim: int = 768, num_layers: int = 6, output_size: int = 50257, # Standardized GPT-2 vocab size dropout: float = 0.1, max_seq_length: int = 1024, # GPT-2 supports longer contexts pooling_mode: str = "last", # GPT-2 typically uses last token tokenizer=None, max_length: Optional[int] = None ) -> None: super().__init__() self.specialization = specialization self.dataset_path = dataset_path self.model_name = model_name self.pooling_mode = pooling_mode self.vocab_size = vocab_size self.max_seq_length = max_seq_length self.embedding_dim = embedding_dim self.num_heads = num_heads self.hidden_dim = hidden_dim self.num_layers = num_layers self.output_size = output_size self.dropout = dropout # fetch MAX_SEQ_LENGTH safely cfg = safe_get_config_value(app_config, "TRANSFORMER_CONFIG", {}) self.max_length = max_length or cfg.get("MAX_SEQ_LENGTH", 1024) # Increased for GPT-2 # Use GPT-2 directly for text generation (not a simplified version) try: # Use the full GPT-2 model implementation for production use from transformers import GPT2LMHeadModel, GPT2Tokenizer # Initialize the model and tokenizer self.model_name = model_name self.gpt2_model = None # Will be loaded on first use # Ensure proper tokenizer setup for GPT-2 if tokenizer is not None: self.tokenizer = tokenizer elif registry.has(TOKENIZER): self.tokenizer = registry.get(TOKENIZER) else: self.tokenizer = GPT2Tokenizer.from_pretrained(model_name) # Ensure GPT-2 tokenizer has pad_token set (critical fix) if self.tokenizer.pad_token_id is None: self.tokenizer.pad_token = self.tokenizer.eos_token self.tokenizer.pad_token_id = self.tokenizer.eos_token_id logger.info(f"Successfully initialized GPT-2 model: {model_name}") except Exception as e: logger.error(f"Error initializing GPT-2 model: {e}", exc_info=True) raise # Register this model instance in the registry by specialization model_registry_key = f"model_{specialization}" registry.register(model_registry_key, self) # Also register as pretrained model registry.register(PRETRAINED_MODEL, self, overwrite=True) logger.info("Registered GPT-2 model as pretrained model") def _ensure_model_loaded(self): if self.gpt2_model is None: self.gpt2_model = GPT2LMHeadModel.from_pretrained(self.model_name) # Replace the old forward method with GPT-2 specific implementation def forward(self, src: torch.Tensor, tgt: Optional[torch.Tensor] = None, src_key_padding_mask: Optional[torch.Tensor] = None, tgt_key_padding_mask: Optional[torch.Tensor] = None, return_sequence: bool = False, **kwargs) -> torch.Tensor: self._ensure_model_loaded() # Load model only when needed # Use GPT-2 directly for generation outputs = self.gpt2_model(src, **kwargs) return outputs.logits # Update generate to handle both direct prompt and tokenized input def generate(self, prompt=None, input_ids=None, max_length=None, **kwargs): """Generate text using the GPT-2 model""" self._ensure_model_loaded() # Load model only when needed try: # Try to use adapter_layer.generate if available (consolidate generation paths) adapter_layer = registry.get("adapter_layer") if adapter_layer and hasattr(adapter_layer, "generate"): if prompt: return adapter_layer.generate(prompt, max_length=max_length, **kwargs) elif input_ids is not None and self.tokenizer: # Convert input_ids back to text prompt = self.tokenizer.decode(input_ids[0], skip_special_tokens=True) return adapter_layer.generate(prompt, max_length=max_length, **kwargs) # Continue with direct generation if adapter_layer not available # Enhanced generation parameters generation_config = { "max_length": max_length or 150, "temperature": kwargs.get('temperature', 0.7), "top_p": kwargs.get('top_p', 0.95), "top_k": kwargs.get('top_k', 50), "repetition_penalty": kwargs.get('repetition_penalty', 1.3), "no_repeat_ngram_size": kwargs.get('no_repeat_ngram_size', 3), "do_sample": True, "pad_token_id": self.tokenizer.pad_token_id, "eos_token_id": self.tokenizer.eos_token_id, "early_stopping": True, "penalty_alpha": 0.6 # Add penalty alpha for better response quality } # Handle either string prompt or direct input_ids if isinstance(prompt, str) and input_ids is None: inputs = self.tokenizer(prompt, return_tensors="pt", padding=True, truncation=True) input_ids = inputs.input_ids elif input_ids is None: raise ValueError("Either prompt or input_ids must be provided") # Add user-provided kwargs that we didn't explicitly set for k, v in kwargs.items(): if k not in generation_config and k not in ('prompt', 'context'): generation_config[k] = v # Use max_new_tokens instead of max_length if input is longer than max_length-50 if input_ids.shape[1] > (generation_config["max_length"] - 50): logger.info(f"Input length {input_ids.shape[1]} is close to max_length, using max_new_tokens instead") del generation_config["max_length"] # Generate output using the full GPT-2 model output_ids = self.gpt2_model.generate(input_ids, **generation_config) # Decode the output and ensure it's a string, not a tensor if torch.is_tensor(output_ids): generated_text = self.tokenizer.decode(output_ids[0].cpu(), skip_special_tokens=True) else: generated_text = self.tokenizer.decode(output_ids[0], skip_special_tokens=True) return generated_text except Exception as e: logger.error(f"Error in GPT-2 generation: {e}", exc_info=True) return f"Error generating response: {str(e)}" def generate_streaming(self, prompt=None, input_ids=None, **kwargs): """Generate tokens one by one in streaming fashion""" self._ensure_model_loaded() # Load model only when needed try: # Handle either text or tokenized input if prompt is not None and input_ids is None: inputs = self.tokenizer( prompt, return_tensors="pt", padding=True, truncation=True, max_length=self.max_length ) input_ids = inputs.input_ids # Set generation parameters max_length = kwargs.get('max_length', min(self.max_length, 200)) temperature = kwargs.get('temperature', 0.7) top_p = kwargs.get('top_p', 0.9) # Generate with token streaming from transformers import TextIteratorStreamer from threading import Thread streamer = TextIteratorStreamer( self.tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True ) generation_kwargs = dict( input_ids=input_ids, max_length=max_length, temperature=temperature, top_p=top_p, streamer=streamer, do_sample=True, ) # Create a thread to run the generation thread = Thread(target=self.gpt2_model.generate, kwargs=generation_kwargs) thread.start() # Stream the output tokens for token in streamer: yield token except Exception as e: logger.error(f"Error in streaming generation: {e}", exc_info=True) yield f"Error: {str(e)}" #-------Pretrained Transformer Model------------- class PretrainedTransformer(nn.Module, AbstractModel): """A simple wrapper around a pretrained Hugging Face transformer model.""" def __init__( self, vocab_size=50257, # Updated for GPT-2 (was 30522) specialization="general", dataset_path=None, model_name="gpt2", # Updated from bert-base-uncased embedding_dim=768, num_heads=12, hidden_dim=768, num_layers=6, output_size=768, dropout=0.1, max_seq_length=1024, # Increased for GPT-2 pooling_mode="last", # Changed from "mean" for GPT-2 tokenizer=None, **kwargs ) -> None: super().__init__() # Optionally track model usage self.model_last_used = {} # Unified tokenizer initialization: # Primary: Load tokenizer for "gpt2" # Fallback: if it fails, try GPT2 tokenizer if tokenizer is not None: self.tokenizer = tokenizer else: # Use imports from module scope if registry.has(TOKENIZER): self.tokenizer = registry.get(TOKENIZER) else: try: self.tokenizer = AutoTokenizer.from_pretrained("gpt2") logger.info("Loaded primary tokenizer: gpt2") # Add pad token if not present (GPT-2 doesn't have one by default) if self.tokenizer.pad_token is None: self.tokenizer.pad_token = self.tokenizer.eos_token except Exception as e: logger.warning(f"Primary tokenizer load failed: {e}") self.tokenizer = None registry.register(TOKENIZER, self.tokenizer) # Set model names for fallback chain explicitly self.model_name = model_name # Should be "gpt2" self.fallback_model = "gpt2" # Fallback tokenization/model if needed # Use AutoModelForCausalLM instead of AutoModel for GPT-2 self.model = AutoModelForCausalLM.from_pretrained(model_name) try: self.tokenizer = AutoTokenizer.from_pretrained(model_name) # Add pad token if not present (GPT-2 doesn't have one by default) if self.tokenizer.pad_token is None: self.tokenizer.pad_token = self.tokenizer.eos_token except Exception as e: logger.error(f"Failed to load tokenizer for {model_name}: {e}") self.tokenizer = None def forward(self, input_ids, attention_mask=None): outputs = self.model(input_ids=input_ids, attention_mask=attention_mask) return outputs.last_hidden_state def encode(self, text: str): if not self.tokenizer: raise ValueError("Tokenizer not available") inputs = self.tokenizer(text, return_tensors="pt", truncation=True, padding=True) with torch.no_grad(): outputs = self.forward(inputs.input_ids, inputs.get("attention_mask")) # Pool by averaging the token embeddings return outputs.mean(dim=1) def generate(self, input_ids, max_length=100, **kwargs): # Use generate method from model if available, else fallback. if hasattr(self.model, "generate"): return self.model.generate(input_ids=input_ids, max_length=max_length, **kwargs) else: # Simple fallback: return input_ids as is return input_ids # Register model classes in registry registry.register("model_class_pretrained", Wildnerve_tlm01) registry.register("pretrained_transformer_class", PretrainedTransformer) # Check if pretrained transformers are properly initialized. def initialize_pretrained_model(): """Attempt to initialize a pretrained tokenizer with a fallback mechanism. Tries to load 'bert-base-uncased' first; if that fails, attempts to load 'gpt2'. If the fallback is used, then reattempts loading 'bert-base-uncased' on subsequent tries. Repeats up to 5 attempts in total. Returns: The initialized tokenizer instance if successful, otherwise None.""" max_attempts = 5 for attempt in range(1, max_attempts + 1): try: tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased") logger.info(f"Attempt {attempt}: Successfully loaded bert-base-uncased.") return tokenizer except Exception as e: logger.warning(f"Attempt {attempt}: Loading bert-base-uncased failed: {e}") try: tokenizer = AutoTokenizer.from_pretrained("gpt2") logger.info(f"Attempt {attempt}: Successfully loaded gpt2 as fallback.") return tokenizer except Exception as e2: logger.warning(f"Attempt {attempt}: Loading gpt2 failed as fallback: {e2}") logger.info("Retrying tokenizer initialization...") logger.error("Failed to initialize pretrained model tokenizer after 5 attempts.") return None """ Pretrained model wrapper for Wildnerve-tlm01 """ import logging import torch from transformers import AutoModelForCausalLM, AutoTokenizer from service_registry import registry, PRETRAINED_MODEL, TOKENIZER logger = logging.getLogger(__name__) class Wildnerve_tlm01: """ A wrapper for transformer models from HuggingFace. Provides the same interface as our custom models for consistency. """ def __init__( self, model_name="gpt2", tokenizer=None, device=None, **kwargs ): self.model_name = model_name # Use provided tokenizer or get one from registry if tokenizer is not None: self.tokenizer = tokenizer elif registry.has(TOKENIZER): self.tokenizer = registry.get(TOKENIZER) else: try: self.tokenizer = AutoTokenizer.from_pretrained(model_name) logger.info(f"Initialized tokenizer from {model_name}") except Exception as e: logger.error(f"Failed to initialize tokenizer: {e}") self.tokenizer = None try: self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu") logger.info(f"Loading pretrained model from {model_name} on {self.device}") # Don't actually load the full model in this case to save memory # This is just a placeholder that can generate simple responses self.model = None logger.info(f"Created simplified pretrained model wrapper") except Exception as e: logger.error(f"Failed to initialize pretrained model: {e}") self.model = None def generate(self, prompt, **kwargs): """Generate a response to the given prompt""" return f"I processed your request about '{prompt[:20]}...' using my pretrained capabilities." def __call__(self, input_ids, attention_mask=None): """Forward pass for HuggingFace compatibility""" # Simplified placeholder functionality batch_size = input