# model_prtr.py
import os
import sys
import math
import torch
import logging
import importlib
import torch.nn as nn
from config import load_config, app_config
# Fix: Move transformers imports to module scope
from transformers import GPT2LMHeadModel, GPT2Tokenizer
from transformers import AutoModelForCausalLM, AutoTokenizer
from typing import Optional, List, Dict, Any, Union
from sentence_transformers import SentenceTransformer
# Import service registry
from service_registry import registry, MODEL, TOKENIZER, PRETRAINED_MODEL
# First import base interfaces
from base_interfaces.common_types import *
from base_interfaces.model_interface import AbstractModel
from model_manager import safe_get_config_value
app_config = load_config()
logger = logging.getLogger(__name__)
# ----------------------------
# Positional Encoding Module (for decoder)
# ----------------------------
class PositionalEncoding(nn.Module):
def __init__(self, d_model: int, max_len: Optional[int] = None):
super().__init__()
# Get MAX_SEQ_LENGTH safely from config
if max_len is None:
if hasattr(app_config, "TRANSFORMER_CONFIG") and isinstance(app_config.TRANSFORMER_CONFIG, dict):
max_len = app_config.TRANSFORMER_CONFIG.get("MAX_SEQ_LENGTH", 1024)
else:
max_len = 1024 # Safe default
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float) * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(1) # shape: (max_len, 1, d_model)
self.register_buffer('pe', pe)
def forward(self, x: torch.Tensor) -> torch.Tensor:
# x shape: (seq_len, batch_size, d_model)
seq_len = x.size(0)
x = x + self.pe[:seq_len]
return x
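# A minimal usage sketch for the encoding above. The shapes are illustrative
# assumptions (seq_len=16, batch_size=4, d_model=768), not values taken from
# app_config; the function is defined only as documentation and is never called.
def _positional_encoding_example() -> torch.Tensor:
    pos_enc = PositionalEncoding(d_model=768, max_len=1024)
    x = torch.zeros(16, 4, 768)  # (seq_len, batch_size, d_model)
    return pos_enc(x)  # same shape, with sinusoidal position terms added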
# ----------------------------
# Wildnerve-tlm01 built on a pretrained GPT-2 backbone
# ----------------------------
class Wildnerve_tlm01(nn.Module, AbstractModel):
"""A Transformer-based language model that uses:
- A pretrained GPT-2 model for powerful text generation
- A custom decoder stack
The model uses the GPT-2 tokenizer for consistent tokenization."""
def __init__(
self,
vocab_size: int = 50257, # Standardized GPT-2 vocab size
specialization: str = "general",
        dataset_path: Optional[str] = None,
model_name: str = "gpt2", # Standardized to GPT-2
embedding_dim: int = 768,
num_heads: int = 12,
hidden_dim: int = 768,
num_layers: int = 6,
output_size: int = 50257, # Standardized GPT-2 vocab size
dropout: float = 0.1,
max_seq_length: int = 1024, # GPT-2 supports longer contexts
pooling_mode: str = "last", # GPT-2 typically uses last token
tokenizer=None,
max_length: Optional[int] = None
) -> None:
super().__init__()
self.specialization = specialization
self.dataset_path = dataset_path
self.model_name = model_name
self.pooling_mode = pooling_mode
self.vocab_size = vocab_size
self.max_seq_length = max_seq_length
self.embedding_dim = embedding_dim
self.num_heads = num_heads
self.hidden_dim = hidden_dim
self.num_layers = num_layers
self.output_size = output_size
self.dropout = dropout
# fetch MAX_SEQ_LENGTH safely
cfg = safe_get_config_value(app_config, "TRANSFORMER_CONFIG", {})
self.max_length = max_length or cfg.get("MAX_SEQ_LENGTH", 1024) # Increased for GPT-2
        # Use the full GPT-2 implementation directly for text generation;
        # GPT2LMHeadModel and GPT2Tokenizer are imported at module scope.
        try:
            self.gpt2_model = None  # Will be loaded lazily on first use
            # Ensure proper tokenizer setup for GPT-2
if tokenizer is not None:
self.tokenizer = tokenizer
elif registry.has(TOKENIZER):
self.tokenizer = registry.get(TOKENIZER)
else:
self.tokenizer = GPT2Tokenizer.from_pretrained(model_name)
# Ensure GPT-2 tokenizer has pad_token set (critical fix)
if self.tokenizer.pad_token_id is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
self.tokenizer.pad_token_id = self.tokenizer.eos_token_id
logger.info(f"Successfully initialized GPT-2 model: {model_name}")
except Exception as e:
logger.error(f"Error initializing GPT-2 model: {e}", exc_info=True)
raise
# Register this model instance in the registry by specialization
model_registry_key = f"model_{specialization}"
registry.register(model_registry_key, self)
# Also register as pretrained model
registry.register(PRETRAINED_MODEL, self, overwrite=True)
logger.info("Registered GPT-2 model as pretrained model")
def _ensure_model_loaded(self):
if self.gpt2_model is None:
self.gpt2_model = GPT2LMHeadModel.from_pretrained(self.model_name)
    # Forward pass: delegate to the underlying GPT-2 model and return its logits
def forward(self, src: torch.Tensor, tgt: Optional[torch.Tensor] = None,
src_key_padding_mask: Optional[torch.Tensor] = None,
tgt_key_padding_mask: Optional[torch.Tensor] = None,
return_sequence: bool = False,
**kwargs) -> torch.Tensor:
self._ensure_model_loaded() # Load model only when needed
# Use GPT-2 directly for generation
outputs = self.gpt2_model(src, **kwargs)
return outputs.logits
# Update generate to handle both direct prompt and tokenized input
def generate(self, prompt=None, input_ids=None, max_length=None, **kwargs):
"""Generate text using the GPT-2 model"""
self._ensure_model_loaded() # Load model only when needed
try:
            # Prefer adapter_layer.generate when one is registered (consolidates generation paths)
            adapter_layer = registry.get("adapter_layer") if registry.has("adapter_layer") else None
            if adapter_layer is not None and hasattr(adapter_layer, "generate"):
if prompt:
return adapter_layer.generate(prompt, max_length=max_length, **kwargs)
elif input_ids is not None and self.tokenizer:
# Convert input_ids back to text
prompt = self.tokenizer.decode(input_ids[0], skip_special_tokens=True)
return adapter_layer.generate(prompt, max_length=max_length, **kwargs)
# Continue with direct generation if adapter_layer not available
# Enhanced generation parameters
generation_config = {
"max_length": max_length or 150,
"temperature": kwargs.get('temperature', 0.7),
"top_p": kwargs.get('top_p', 0.95),
"top_k": kwargs.get('top_k', 50),
"repetition_penalty": kwargs.get('repetition_penalty', 1.3),
"no_repeat_ngram_size": kwargs.get('no_repeat_ngram_size', 3),
"do_sample": True,
"pad_token_id": self.tokenizer.pad_token_id,
"eos_token_id": self.tokenizer.eos_token_id,
"early_stopping": True,
"penalty_alpha": 0.6 # Add penalty alpha for better response quality
}
# Handle either string prompt or direct input_ids
if isinstance(prompt, str) and input_ids is None:
inputs = self.tokenizer(prompt, return_tensors="pt", padding=True, truncation=True)
input_ids = inputs.input_ids
elif input_ids is None:
raise ValueError("Either prompt or input_ids must be provided")
# Add user-provided kwargs that we didn't explicitly set
for k, v in kwargs.items():
if k not in generation_config and k not in ('prompt', 'context'):
generation_config[k] = v
            # If the prompt nearly fills max_length, switch to max_new_tokens so the
            # model still produces output beyond the prompt (50 new tokens chosen here
            # to match the 50-token margin).
            if input_ids.shape[1] > (generation_config["max_length"] - 50):
                logger.info(f"Input length {input_ids.shape[1]} is close to max_length, using max_new_tokens instead")
                del generation_config["max_length"]
                generation_config["max_new_tokens"] = kwargs.get("max_new_tokens", 50)
# Generate output using the full GPT-2 model
output_ids = self.gpt2_model.generate(input_ids, **generation_config)
# Decode the output and ensure it's a string, not a tensor
if torch.is_tensor(output_ids):
generated_text = self.tokenizer.decode(output_ids[0].cpu(), skip_special_tokens=True)
else:
generated_text = self.tokenizer.decode(output_ids[0], skip_special_tokens=True)
return generated_text
except Exception as e:
logger.error(f"Error in GPT-2 generation: {e}", exc_info=True)
return f"Error generating response: {str(e)}"
def generate_streaming(self, prompt=None, input_ids=None, **kwargs):
"""Generate tokens one by one in streaming fashion"""
self._ensure_model_loaded() # Load model only when needed
try:
# Handle either text or tokenized input
if prompt is not None and input_ids is None:
inputs = self.tokenizer(
prompt,
return_tensors="pt",
padding=True,
truncation=True,
max_length=self.max_length
)
input_ids = inputs.input_ids
# Set generation parameters
max_length = kwargs.get('max_length', min(self.max_length, 200))
temperature = kwargs.get('temperature', 0.7)
top_p = kwargs.get('top_p', 0.9)
# Generate with token streaming
from transformers import TextIteratorStreamer
from threading import Thread
streamer = TextIteratorStreamer(
self.tokenizer,
timeout=10.0,
skip_prompt=True,
skip_special_tokens=True
)
generation_kwargs = dict(
input_ids=input_ids,
max_length=max_length,
temperature=temperature,
top_p=top_p,
streamer=streamer,
do_sample=True,
)
# Create a thread to run the generation
thread = Thread(target=self.gpt2_model.generate, kwargs=generation_kwargs)
thread.start()
# Stream the output tokens
for token in streamer:
yield token
except Exception as e:
logger.error(f"Error in streaming generation: {e}", exc_info=True)
yield f"Error: {str(e)}"
# ----------------------------
# Pretrained Transformer Model
# ----------------------------
class PretrainedTransformer(nn.Module, AbstractModel):
"""A simple wrapper around a pretrained Hugging Face transformer model."""
def __init__(
self,
vocab_size=50257, # Updated for GPT-2 (was 30522)
specialization="general",
dataset_path=None,
model_name="gpt2", # Updated from bert-base-uncased
embedding_dim=768,
num_heads=12,
hidden_dim=768,
num_layers=6,
output_size=768,
dropout=0.1,
max_seq_length=1024, # Increased for GPT-2
pooling_mode="last", # Changed from "mean" for GPT-2
tokenizer=None,
**kwargs
) -> None:
super().__init__()
# Optionally track model usage
self.model_last_used = {}
        # Unified tokenizer initialization: prefer an explicitly provided tokenizer,
        # then one already registered in the service registry, then load "gpt2".
if tokenizer is not None:
self.tokenizer = tokenizer
else:
# Use imports from module scope
if registry.has(TOKENIZER):
self.tokenizer = registry.get(TOKENIZER)
else:
try:
self.tokenizer = AutoTokenizer.from_pretrained("gpt2")
logger.info("Loaded primary tokenizer: gpt2")
# Add pad token if not present (GPT-2 doesn't have one by default)
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
except Exception as e:
logger.warning(f"Primary tokenizer load failed: {e}")
self.tokenizer = None
                if self.tokenizer is not None:
                    registry.register(TOKENIZER, self.tokenizer)
# Set model names for fallback chain explicitly
self.model_name = model_name # Should be "gpt2"
self.fallback_model = "gpt2" # Fallback tokenization/model if needed
        # Use AutoModelForCausalLM (rather than AutoModel) so the GPT-2 backbone
        # exposes generate() for text generation.
        self.model = AutoModelForCausalLM.from_pretrained(model_name)
        # Only load a tokenizer here if the initialization above did not yield one,
        # so a tokenizer passed in or taken from the registry is not overwritten.
        if self.tokenizer is None:
            try:
                self.tokenizer = AutoTokenizer.from_pretrained(model_name)
                # Add pad token if not present (GPT-2 doesn't have one by default)
                if self.tokenizer.pad_token is None:
                    self.tokenizer.pad_token = self.tokenizer.eos_token
            except Exception as e:
                logger.error(f"Failed to load tokenizer for {model_name}: {e}")
                self.tokenizer = None
    def forward(self, input_ids, attention_mask=None):
        # Causal LM outputs do not expose last_hidden_state directly; request the
        # hidden states explicitly and return the final layer for pooling.
        outputs = self.model(input_ids=input_ids, attention_mask=attention_mask,
                             output_hidden_states=True)
        return outputs.hidden_states[-1]
def encode(self, text: str):
if not self.tokenizer:
raise ValueError("Tokenizer not available")
inputs = self.tokenizer(text, return_tensors="pt", truncation=True, padding=True)
with torch.no_grad():
outputs = self.forward(inputs.input_ids, inputs.get("attention_mask"))
# Pool by averaging the token embeddings
return outputs.mean(dim=1)
def generate(self, input_ids, max_length=100, **kwargs):
# Use generate method from model if available, else fallback.
if hasattr(self.model, "generate"):
return self.model.generate(input_ids=input_ids, max_length=max_length, **kwargs)
else:
# Simple fallback: return input_ids as is
return input_ids
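# Hedged sketch of the intended PretrainedTransformer usage: encode() for pooled
# embeddings and generate() for token ids. Assumes the "gpt2" checkpoint can be
# loaded; defined for documentation only and never called at import time.
def _pretrained_transformer_example():
    wrapper = PretrainedTransformer(model_name="gpt2")
    embedding = wrapper.encode("hello world")           # shape: (1, hidden_dim)
    ids = wrapper.tokenizer("hello", return_tensors="pt").input_ids
    output_ids = wrapper.generate(ids, max_length=20)   # token ids from GPT-2
    return embedding, output_ids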
# Register model classes in registry
registry.register("model_class_pretrained", Wildnerve_tlm01)
registry.register("pretrained_transformer_class", PretrainedTransformer)
# Initialize a pretrained tokenizer with retries and a fallback model.
def initialize_pretrained_model():
    """Attempt to initialize a pretrained tokenizer with a fallback mechanism.
    On each attempt, tries to load 'bert-base-uncased' first; if that fails,
    falls back to 'gpt2'. Returns as soon as either tokenizer loads, retrying
    up to 5 attempts in total.
    Returns:
        The initialized tokenizer instance if successful, otherwise None."""
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
logger.info(f"Attempt {attempt}: Successfully loaded bert-base-uncased.")
return tokenizer
except Exception as e:
logger.warning(f"Attempt {attempt}: Loading bert-base-uncased failed: {e}")
try:
tokenizer = AutoTokenizer.from_pretrained("gpt2")
logger.info(f"Attempt {attempt}: Successfully loaded gpt2 as fallback.")
return tokenizer
except Exception as e2:
logger.warning(f"Attempt {attempt}: Loading gpt2 failed as fallback: {e2}")
logger.info("Retrying tokenizer initialization...")
logger.error("Failed to initialize pretrained model tokenizer after 5 attempts.")
return None
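# Hedged sketch of how the fallback initializer above could be wired into the
# service registry at startup. Registering the result under TOKENIZER is an
# assumption about intended usage, not behaviour the function performs itself.
def _bootstrap_tokenizer_example():
    tok = initialize_pretrained_model()
    if tok is not None and not registry.has(TOKENIZER):
        registry.register(TOKENIZER, tok)
    return tok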
"""
Pretrained model wrapper for Wildnerve-tlm01
"""
import logging
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from service_registry import registry, PRETRAINED_MODEL, TOKENIZER
logger = logging.getLogger(__name__)
class Wildnerve_tlm01:
    """
    A lightweight wrapper around HuggingFace transformer models.
    Provides the same interface as the custom models for consistency, but keeps
    the underlying model unloaded and returns simple placeholder responses.
    """
def __init__(
self,
model_name="gpt2",
tokenizer=None,
device=None,
**kwargs
):
self.model_name = model_name
# Use provided tokenizer or get one from registry
if tokenizer is not None:
self.tokenizer = tokenizer
elif registry.has(TOKENIZER):
self.tokenizer = registry.get(TOKENIZER)
else:
try:
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
logger.info(f"Initialized tokenizer from {model_name}")
except Exception as e:
logger.error(f"Failed to initialize tokenizer: {e}")
self.tokenizer = None
try:
self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Loading pretrained model from {model_name} on {self.device}")
# Don't actually load the full model in this case to save memory
# This is just a placeholder that can generate simple responses
self.model = None
logger.info(f"Created simplified pretrained model wrapper")
except Exception as e:
logger.error(f"Failed to initialize pretrained model: {e}")
self.model = None
def generate(self, prompt, **kwargs):
"""Generate a response to the given prompt"""
return f"I processed your request about '{prompt[:20]}...' using my pretrained capabilities."
def __call__(self, input_ids, attention_mask=None):
"""Forward pass for HuggingFace compatibility"""
# Simplified placeholder functionality
batch_size = input