# model_Custm.py
# Consolidated imports
import os
import sys
import math
import torch
import logging
import traceback
import numpy as np
import transformers
import torch.nn as nn
from typing import Optional, List, Dict, Union, Tuple
from codecarbon import EmissionsTracker
import transformer_patches
from service_registry import registry, MODEL, TOKENIZER
from utils.transformer_utils import get_tokenizer
from utils.smartHybridAttention import SmartHybridAttention, get_hybrid_attention_config
from base_interfaces.common_types import *
from base_interfaces.model_interface import AbstractModel
from config import app_config
import json
from types import SimpleNamespace
# Add import for ModelOutput from transformers
try:
from transformers.modeling_outputs import ModelOutput
except ImportError:
# Fallback definition if transformers isn't available
class ModelOutput:
"""Minimal placeholder for transformers ModelOutput class"""
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
from transformers import PretrainedConfig
# Hugging Face-style configuration object for Wildnerve_tlm01
class WildnerveConfig(PretrainedConfig):
model_type = "wildnerve_tlm01"
def __init__(
self,
vocab_size: int = 50257,
embedding_dim: int = 768,
num_heads: int = 12,
hidden_dim: int = 768,
num_layers: int = 12,
output_size: int = 50257,
dropout: float = 0.1,
max_seq_length: int = 767,
pooling_mode: str = "last",
model_name: str = "gpt2",
specialization: str = "general",
**kwargs
):
super().__init__(**kwargs)
self.vocab_size = vocab_size
self.embedding_dim = embedding_dim
self.num_heads = num_heads
self.hidden_dim = hidden_dim
self.num_layers = num_layers
self.output_size = output_size
self.dropout = dropout
self.max_seq_length = max_seq_length
self.pooling_mode = pooling_mode
self.model_name = model_name
self.specialization = specialization
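# Illustrative usage (a minimal sketch; nothing in this module calls it this way,
# and the directory below is a placeholder): the config round-trips through the
# standard PretrainedConfig API, producing/consuming a config.json next to the weights.
#
#   cfg = WildnerveConfig(specialization="python", max_seq_length=767)
#   cfg.save_pretrained("/tmp/wildnerve_demo")                   # writes config.json
#   cfg2 = WildnerveConfig.from_pretrained("/tmp/wildnerve_demo")
#   assert cfg2.specialization == "python"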
logger = logging.getLogger(__name__)
# Check whether transformers.integrations exposes CodeCarbonCallback
if hasattr(transformers, 'integrations') and hasattr(transformers.integrations, 'CodeCarbonCallback'):
logger.info("transformers.integrations.CodeCarbonCallback is available")
# Check if we're using our proxy or the real implementation
callback_module = transformers.integrations.CodeCarbonCallback.__module__
if callback_module == 'carbon_tracking':
logger.info("Using our clean architecture implementation for CodeCarbonCallback")
else:
logger.info(f"Using original implementation for CodeCarbonCallback from {callback_module}")
# Ensure data/model directories exist (silently ignore errors)
for d in (app_config.DATA_DIR, app_config.MODEL_DIR):
try: os.makedirs(d, exist_ok=True)
except Exception as _e: logger.warning(f"Could not create directory {d}: {_e}")
# Suppress TensorFlow logs if present
os.environ.setdefault("TF_ENABLE_ONEDNN_OPTS", "0")
os.environ.setdefault("TF_CPP_MIN_LOG_LEVEL", "2")
# ----------------------------
# Positional Encoding Module
# ----------------------------
class PositionalEncoding(nn.Module):
def __init__(self, d_model: int, max_len: Optional[int] = None):
super().__init__()
# determine max_len dynamically
if max_len is None:
cfg = app_config.TRANSFORMER_CONFIG
if isinstance(cfg, dict):
max_len = cfg.get("MAX_SEQ_LENGTH", 512)
else:
max_len = getattr(cfg, "MAX_SEQ_LENGTH", 512)
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float) * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
# Store as (1, max_len, d_model) so it broadcasts over batch-first inputs,
# matching the batch_first=True transformer layers used throughout this model
pe = pe.unsqueeze(0)
self.register_buffer("pe", pe)
def forward(self, x: torch.Tensor) -> torch.Tensor:
# x shape: (batch_size, seq_len, d_model)
seq_len = x.size(1)
x = x + self.pe[:, :seq_len]
return x
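# Worked sketch of the encoding above (illustrative only): for a batch-first
# input of shape (batch, seq_len, d_model), position `pos` gets
# sin(pos / 10000**(2i/d_model)) on even channel 2i and the matching cosine on
# channel 2i+1, so every position receives a unique, smoothly varying signature.
#
#   pe = PositionalEncoding(d_model=768, max_len=767)
#   x = torch.zeros(2, 16, 768)   # (batch, seq_len, d_model), dummy values
#   y = pe(x)                     # same shape; encodings are identical across the batch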
# ----------------------------
# Wildnerve-tlm01 using Only Custom Encoder/Decoder
# ----------------------------
class Wildnerve_tlm01(nn.Module, AbstractModel):
"""A Transformer-based Tiny Language Model that uses:
- A custom built encoder & decoder (embedding, positional encoding, and TransformerEncoder)
- An adapter and classifier for post-processing
- The AutoTokenizer for consistent tokenization and decoding
- SmartHybridAttention for better context handling"""
# Define the 12 valid specializations
VALID_SPECIALIZATIONS = [
"python",
"rust",
"solidity",
"computer",
"cpp",
"go",
"java",
"javascript",
"mathematics",
"nim",
"other_information",
"physics",
"general" # Include general as a valid fallback option
]
def __init__(
self,
vocab_size: int = 50257, # Default to GPT-2 vocab size (was 30522 for BERT)
specialization: str = "general",
dataset_path: Optional[str] = None,
model_name: str = "gpt2", # Default to GPT-2 (was bert-base-uncased)
embedding_dim: int = 768, # GPT-2 small uses 768 dimensional embeddings
num_heads: int = 12, # GPT-2 small uses 12 heads
hidden_dim: int = 768, # Match embedding dimension
num_layers: int = 12, # GPT-2 small has 12 layers
output_size: int = 50257, # Match GPT-2 vocab size
dropout: float = 0.1,
max_seq_length: int = 767, # IMPORTANT: Use 767 for consistency with config.json
pooling_mode: str = "last", # Default for autoregressive models
tokenizer=None,
max_length: Optional[int] = None
) -> None:
super().__init__()
# Validate specialization against known valid options
if specialization not in self.VALID_SPECIALIZATIONS:
logger.warning(f"Unknown specialization '{specialization}'. Valid options are: {', '.join(self.VALID_SPECIALIZATIONS)}")
logger.warning(f"Defaulting to 'general' specialization")
specialization = "general"
# Set device once at the start
object.__setattr__(self, "device", torch.device('cuda' if torch.cuda.is_available() else 'cpu'))
self.specialization = specialization
self.dataset_path = dataset_path
self.model_name = model_name
self.pooling_mode = pooling_mode
self.embedding_dim = embedding_dim
self.vocab_size = vocab_size
self.max_seq_length = max_seq_length
self.num_heads = num_heads
self.hidden_dim = hidden_dim
self.num_layers = num_layers
self.output_size = output_size
self.dropout = dropout
# Optionally track model usage
self.model_last_used = {}
# Unified tokenizer initialization:
if tokenizer is not None:
self.tokenizer = tokenizer
else:
if registry.has(TOKENIZER):
self.tokenizer = registry.get(TOKENIZER)
else:
try:
from transformers import GPT2Tokenizer
self.tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
logger.info("Initialized GPT2Tokenizer")
# Ensure pad_token is set for GPT-2
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
self.tokenizer.pad_token_id = self.tokenizer.eos_token_id
except Exception as e:
# More detailed error logging
logger.warning(f"Failed to load GPT-2 tokenizer: {e}")
logger.warning(f"Error details: {traceback.format_exc()}")
# Retry logic - try up to 5 times with increasing delay
retry_count = 0
max_retries = 5
success = False
while not success and retry_count < max_retries:
retry_count += 1
delay = 2 ** retry_count # Exponential backoff
logger.info(f"Retrying tokenizer initialization (attempt {retry_count}/{max_retries}) after {delay}s delay")
try:
import time
time.sleep(delay)
from transformers import GPT2Tokenizer
self.tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
self.tokenizer.pad_token_id = self.tokenizer.eos_token_id
success = True
logger.info(f"Successfully loaded GPT-2 tokenizer on retry {retry_count}")
except Exception as retry_e:
logger.warning(f"Retry {retry_count} failed: {retry_e}")
# Final fallback only after all retries exhausted
if not success:
logger.error("All tokenizer initialization attempts failed")
from utils.transformer_utils import get_tokenizer
self.tokenizer = get_tokenizer(model_name="gpt2")
logger.warning("Using simplified tokenizer wrapper as fallback")
registry.register(TOKENIZER, self.tokenizer, overwrite=True)
# Register this model instance in the registry by specialization
model_registry_key = f"model_{specialization}"
registry.register(model_registry_key, self)
# Also register as default model if it's the primary specialization
if specialization == "general":
registry.register(MODEL, self)
# ----------------------------
# Encoder Components (Custom)
# ----------------------------
self.embedding = nn.Embedding(vocab_size, embedding_dim)
self.pos_encoder = PositionalEncoding(embedding_dim, max_len=max_seq_length)
# ----------------------------
# Decoder Components (Custom)
# ----------------------------
self.tgt_embedding = nn.Embedding(vocab_size, embedding_dim)
self.pos_decoder = PositionalEncoding(embedding_dim, max_len=max_seq_length)
# ----------------------------
# Transformer Encoder and Decoder (Custom)
# Always create with batch_first=True for better performance
# ----------------------------
encoder_layer = nn.TransformerEncoderLayer(
d_model=embedding_dim,
nhead=num_heads,
dim_feedforward=hidden_dim,
dropout=dropout,
batch_first=True # Fixed to use batch_first=True
)
self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
decoder_layer = nn.TransformerDecoderLayer(
d_model=embedding_dim,
nhead=num_heads,
dim_feedforward=hidden_dim,
dropout=dropout,
batch_first=True # Fixed to use batch_first=True
)
self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)
# Initialize the smart hybrid attention
attention_config = get_hybrid_attention_config()
attention_config['NUM_HEADS'] = num_heads
attention_config['WINDOW_SIZE'] = max(256, max_seq_length // 4)
self.hybrid_attention = SmartHybridAttention(attention_config)
# ----------------------------
# Adapter & Output Layers
# ----------------------------
self.adapter = nn.Sequential(
nn.Linear(embedding_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, embedding_dim)
)
self.classifier = nn.Linear(embedding_dim, self.vocab_size)
self.dropout_layer = nn.Dropout(dropout)
# This is a standard linear layer that reshapes 3D input to 2D output:
self.final_layer = nn.Linear(hidden_dim, vocab_size)
self.init_weights()
# Instantiate a proper HF-style config instead of a SimpleNamespace
self.config = WildnerveConfig(
vocab_size=self.vocab_size,
embedding_dim=self.embedding_dim,
num_heads=self.num_heads,
hidden_dim=self.hidden_dim,
num_layers=self.num_layers,
output_size=self.output_size,
dropout=self.dropout,
max_seq_length=self.max_seq_length,
pooling_mode=self.pooling_mode,
model_name=self.model_name,
specialization=self.specialization
)
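# Construction sketch (illustrative; assumes the registry/tokenizer imports at the
# top of this file resolve in the running environment):
#
#   model = Wildnerve_tlm01(specialization="python")   # also registered as "model_python"
#   ids = model.tokenizer("def add(a, b):", return_tensors="pt").input_ids
#   out = model(input_ids=ids)                         # CausalLMOutputWithCrossAttentions
#   print(out.logits.shape)                            # (1, seq_len, 50257)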
def init_weights(self) -> None:
initrange = 0.1
with torch.no_grad():
self.embedding.weight.uniform_(-initrange, initrange)
self.tgt_embedding.weight.uniform_(-initrange, initrange)
self.classifier.weight.uniform_(-initrange, initrange)
self.classifier.bias.zero_()
for layer in self.adapter:
if isinstance(layer, nn.Linear):
layer.weight.uniform_(-initrange, initrange)
if layer.bias is not None:
layer.bias.zero_()
def forward(
self,
input_ids=None,
attention_mask=None,
labels=None,
src=None,
tgt=None,
src_mask: Optional[torch.Tensor] = None,  # optional encoder attention mask
src_key_padding_mask=None,
tgt_key_padding_mask=None,
memory_key_padding_mask=None,
return_sequence=False,
**kwargs
):
try:
# Log input shapes for debugging
logger.info(f"Input shapes - src: {src.shape if src is not None else None}, tgt: {tgt.shape if tgt is not None else None}")
# Handle either input_ids or src
if input_ids is not None:
src = input_ids
# Source (encoder input)
# In language modeling: [batch_size, seq_length] -> [batch_size, seq_length, embedding_dim]
src_embeddings = self.embedding(src)
# Add positional encoding
src_embeddings = self.pos_encoder(src_embeddings)
# Pass through encoder layers
memory = self.transformer_encoder(src_embeddings,
src_key_padding_mask=src_key_padding_mask)
if src.size(1) > 256 and hasattr(self, 'hybrid_attention'):
# Prepare inputs for hybrid attention
query = src_embeddings.transpose(0, 1)
key = query
value = query
# CRITICAL: Initialize src_mask if it's None
if src_mask is None and src is not None:
# Create a default mask that allows all tokens to attend to all other tokens
src_seq_len = src.size(1)
src_mask = torch.zeros((src_seq_len, src_seq_len), device=src.device, dtype=torch.bool)
# Run the hybrid attention for long inputs
hybrid_outputs = self.hybrid_attention(
query=query,
key=key,
value=value,
key_padding_mask=src_key_padding_mask,
attn_mask=src_mask, # src_mask is guaranteed to be defined at this point
prompt_length=src.size(1),
prompt_complexity=0.5
)
# NOTE: the hybrid attention output is computed but not currently merged back
# into `memory`; the decoder below still attends to the transformer_encoder output
encoded_src = hybrid_outputs
# Pass through decoder layers
if tgt is not None:
tgt_embeddings = self.tgt_embedding(tgt)
tgt_embeddings = self.pos_decoder(tgt_embeddings)
output = self.transformer_decoder(tgt_embeddings, memory,
tgt_key_padding_mask=tgt_key_padding_mask,
memory_key_padding_mask=memory_key_padding_mask)
else:
output = memory
# Ensure output maintains 3 dimensions [batch_size, seq_length, hidden_dim]
if output.dim() == 2:
output = output.unsqueeze(1)
# Apply final projection to vocabulary space
logits = self.final_layer(output)
# CRITICAL: Ensure output is always 3D [batch_size, seq_length, vocab_size]
if logits.dim() == 2:
# If 2D tensor [batch_size, vocab_size], reshape to 3D [batch_size, 1, vocab_size]
batch_size, vocab_size = logits.shape
logger.info(f"2D tensor: batch_size={batch_size}, vocab_size={vocab_size}")
logits = logits.unsqueeze(1) # Add sequence dimension
logger.info(f"Reshaped 2D output to 3D tensor: {logits.shape}")
# Record the output shape and dimensions for debugging
logger.info(f"Output shape: {logits.shape}, dimensions: {logits.dim()}")
# Calculate loss if labels are provided
loss = None
if labels is not None:
# Reshape labels to 1D if needed
if labels.dim() > 1:
labels = labels.reshape(-1)
logger.info(f"Reshaped labels to {labels.shape}")
# Calculate loss with properly shaped tensors
batch_size, seq_length, vocab_size = logits.shape
loss_fct = nn.CrossEntropyLoss()
loss = loss_fct(logits.reshape(-1, vocab_size), labels)
logger.info(f"Returning loss tensor: {loss.item()}")
# Return the proper format
from transformers.modeling_outputs import CausalLMOutputWithCrossAttentions
return CausalLMOutputWithCrossAttentions(
loss=loss,
logits=logits,
past_key_values=None,
hidden_states=None,
attentions=None,
cross_attentions=None
)
except Exception as e:
logger.error(f"Error in forward pass: {str(e)}")
logger.error(f"Traceback: {traceback.format_exc()}")
# Log input shapes for debugging
logger.error(f"Input shapes - src: {src.shape if src is not None else None}, input_ids: {input_ids.shape if input_ids is not None else None}")
# Create minimal dummy outputs in correct format
dummy_batch = 1
if src is not None:
dummy_batch = src.shape[0]
elif input_ids is not None:
dummy_batch = input_ids.shape[0]
# CRITICAL: Return a proper 3D tensor even in error case
dummy_output = torch.zeros((dummy_batch, 1, self.vocab_size), device=next(self.parameters()).device)
dummy_loss = torch.tensor(float('nan'), device=next(self.parameters()).device)
# Import here as well, since the import inside the try block may not have run
from transformers.modeling_outputs import CausalLMOutputWithCrossAttentions
return CausalLMOutputWithCrossAttentions(
loss=dummy_loss,
logits=dummy_output,
past_key_values=None,
hidden_states=None,
attentions=None,
cross_attentions=None
)
# Add sentence transformer methods
def encode_sentences(self, sentences, batch_size=32, normalize_embeddings=True):
"""Encode sentences into vectors (sentence transformer functionality)"""
self.eval()
from torch.utils.data import DataLoader, Dataset
# Handle single sentence
if isinstance(sentences, str):
sentences = [sentences]
class SentencesDataset(Dataset):
def __init__(self, sentences, tokenizer, max_length):
self.sentences = sentences
self.tokenizer = tokenizer
self.max_length = max_length
def __len__(self):
return len(self.sentences)
def __getitem__(self, idx):
return self.tokenizer(self.sentences[idx],
padding='max_length',
truncation=True,
max_length=self.max_length,
return_tensors='pt')
# Create dataset and dataloader
dataset = SentencesDataset(sentences, self.tokenizer, self.max_seq_length)
dataloader = DataLoader(dataset, batch_size=batch_size)
all_embeddings = []
device = next(self.parameters()).device
with torch.no_grad():
for batch in dataloader:
inputs = {k: v.squeeze(1).to(device) for k, v in batch.items()}
attn = inputs.get('attention_mask')
# forward() returns logits, so embed with the encoder and mean-pool instead
src_emb = self.pos_encoder(self.embedding(inputs['input_ids']))
memory = self.transformer_encoder(src_emb, src_key_padding_mask=(attn == 0) if attn is not None else None)
embeddings = memory.mean(dim=1)
if normalize_embeddings:
embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1)
all_embeddings.append(embeddings.cpu().numpy())
return np.vstack(all_embeddings)
def similarity(self, sentence1: str, sentence2: str) -> float:
"""Compute cosine similarity between two sentences"""
embeddings = self.encode_sentences([sentence1, sentence2])
return np.dot(embeddings[0], embeddings[1]) / (np.linalg.norm(embeddings[0]) * np.linalg.norm(embeddings[1]))
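# Usage sketch for the sentence-embedding helpers (illustrative only; embedding
# quality depends entirely on how the model was trained):
#
#   embs = model.encode_sentences(["fast fourier transform", "FFT in numpy"])
#   print(embs.shape)                                    # (2, embedding_dim)
#   print(model.similarity("hello world", "hi there"))   # cosine similarity in [-1, 1]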
# Generation: prefer adapter_layer.generate when one is registered, otherwise fall back to local generation
def generate(
self,
prompt=None,
input_ids=None,
max_length: int = None,
device: str = None,
temperature: float = 0.7,
**kwargs
) -> str:
"""Generate text using the model, supporting either prompt string or input_ids."""
# Try to use adapter_layer.generate if available
adapter_layer = registry.get("adapter_layer")
if (adapter_layer and hasattr(adapter_layer, "generate")):
if prompt:
return adapter_layer.generate(prompt, max_length=max_length, temperature=temperature, **kwargs)
elif input_ids is not None and self.tokenizer:
# Convert input_ids back to text to use centralized generation
decoded_prompt = self.tokenizer.decode(input_ids[0], skip_special_tokens=True)
return adapter_layer.generate(decoded_prompt, max_length=max_length, temperature=temperature, **kwargs)
# Fall back to direct generation if adapter_layer is not available
# Log what we're working with
logger.info(f"Generate called with: prompt={type(prompt).__name__ if prompt else None}, input_ids={type(input_ids).__name__ if input_ids else None}")
# Handle case where max_length is not provided
if max_length is None:
if hasattr(self, 'max_seq_length'):
max_length = self.max_seq_length
else: # Default fallback
max_length = 512
# Handle device if not provided
if device is None:
device = next(self.parameters()).device
# Case 1: String prompt, tokenize it
if isinstance(prompt, str) and prompt:
if not self.tokenizer:
raise ValueError("Tokenizer not available but prompt is a string")
# Create inputs with attention mask - ensure we have the tokenizer
if callable(self.tokenizer):
inputs = self.tokenizer(
prompt,
return_tensors="pt",
truncation=True,
padding=True
)
input_ids = inputs.input_ids.to(device)
logger.debug(f"Tokenized prompt '{prompt[:20]}...' to tensor of shape {input_ids.shape}")
# Case 2: No inputs provided or invalid inputs
if input_ids is None:
raise ValueError("Either prompt or input_ids must be provided")
# Ensure input_ids is a tensor with batch dimension
if not isinstance(input_ids, torch.Tensor):
input_ids = torch.tensor(input_ids, dtype=torch.long)
if input_ids.dim() == 1:
input_ids = input_ids.unsqueeze(0)
# Process parameters
gen_kwargs = {}
gen_kwargs.update(kwargs) # Include any passed parameters
# Set common defaults if not specified
if 'max_length' not in gen_kwargs and 'max_new_tokens' not in gen_kwargs:
# Handle cases where input might be close to max_length
if input_ids.shape[1] > max_length - 50:
gen_kwargs['max_new_tokens'] = 100
else:
gen_kwargs['max_length'] = max_length
if 'temperature' not in gen_kwargs:
gen_kwargs['temperature'] = temperature
# If we're close to max_length, use max_new_tokens instead
if 'max_length' in gen_kwargs and input_ids.shape[1] > (gen_kwargs['max_length'] - 50):
logger.info(f"Input length {input_ids.shape[1]} close to max_length, switching to max_new_tokens")
gen_kwargs['max_new_tokens'] = 100
del gen_kwargs['max_length']
try:
# Use the actual model's generate method
output_ids = self.generate_tokens(input_ids, **gen_kwargs)
# Decode the output
if self.tokenizer:
# Strip input from response when decoding
input_length = input_ids.shape[1]
if hasattr(output_ids, 'shape') and len(output_ids.shape) > 1 and output_ids.shape[1] > input_length:
response_ids = output_ids[0][input_length:]
generated_text = self.tokenizer.decode(response_ids, skip_special_tokens=True)
else:
# Default decoding
generated_text = self.tokenizer.decode(output_ids[0], skip_special_tokens=True)
return generated_text
else:
# No tokenizer available
return f"Generated token IDs: {output_ids}"
except Exception as e:
logger.error(f"Error in generate: {e}", exc_info=True)
return f"Error generating response: {str(e)}"
def generate_tokens(self, input_ids, max_length=None, temperature=0.7, top_k=50, top_p=0.95, repetition_penalty=1.0, **kwargs):
"""Generate tokens autoregressively."""
logger.info(f"generate_tokens called with tensor of shape: {input_ids.shape if hasattr(input_ids, 'shape') else 'unknown'}")
try:
import torch
# Make sure input_ids is a tensor
if not isinstance(input_ids, torch.Tensor):
input_ids = torch.tensor(input_ids, dtype=torch.long)
# Add batch dimension if needed
if input_ids.dim() == 1:
input_ids = input_ids.unsqueeze(0)
# Set reasonable defaults for missing parameters
if max_length is None:
max_length = min(getattr(self, 'max_seq_length', 1024), 1024)
max_length = min(max_length, 1024)
# Initialize generated sequences with input_ids
generated_sequences = input_ids.clone()
# Auto-regressive generation loop
for step in range(max_length - input_ids.shape[1]):
# Forward pass through the model
with torch.no_grad():
outputs = self(generated_sequences)
# forward() returns a ModelOutput-style object; pull out the logits tensor
logits = outputs.logits if hasattr(outputs, 'logits') else outputs
# Handle both 3D [batch_size, seq_length, vocab_size] and 2D [batch_size, vocab_size] logits
if logits.dim() == 3:
next_token_logits = logits[:, -1, :]
else:
next_token_logits = logits
# Apply temperature
if temperature > 0:
next_token_logits = next_token_logits / temperature
# Apply top-k filtering
if top_k > 0:
top_k_values, top_k_indices = torch.topk(next_token_logits, top_k)
next_token_logits = torch.full_like(next_token_logits, float("-inf"))
for batch_idx in range(generated_sequences.shape[0]):
next_token_logits[batch_idx, top_k_indices[batch_idx]] = top_k_values[batch_idx]
# Sample next token
probs = torch.softmax(next_token_logits, dim=-1)
next_tokens = torch.multinomial(probs, num_samples=1).squeeze(-1)
# Add to sequence
generated_sequences = torch.cat([generated_sequences, next_tokens.unsqueeze(-1)], dim=1)
# Optional stopping criteria could be added here
return generated_sequences
except Exception as e:
logger.error(f"Error in generate_tokens: {e}")
return input_ids # Return input as fallback
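# The `top_p` argument above is accepted but not applied inside the loop; a
# nucleus-sampling filter could be inserted just before the softmax, roughly as
# sketched below (hypothetical helper, not wired into the method):
#
#   def apply_top_p(logits, top_p):
#       sorted_logits, sorted_idx = torch.sort(logits, descending=True, dim=-1)
#       cum_probs = torch.softmax(sorted_logits, dim=-1).cumsum(dim=-1)
#       remove = cum_probs > top_p
#       remove[..., 1:] = remove[..., :-1].clone()   # always keep the best token
#       remove[..., 0] = False
#       sorted_logits = sorted_logits.masked_fill(remove, float("-inf"))
#       return torch.full_like(logits, float("-inf")).scatter(-1, sorted_idx, sorted_logits)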
def generate_with_decoding(self, input_ids=None, prompt=None, **kwargs):
"""
Generate text from either input_ids or a text prompt.
This is a helper method that handles both tokenization and decoding.
"""
try:
# Handle either prompt or input_ids
if prompt is not None and input_ids is None:
if not hasattr(self, 'tokenizer') or self.tokenizer is None:
logger.error("No tokenizer available for text prompt")
return "Error: No tokenizer available for processing text prompt"
inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True, padding=True)
input_ids = inputs.input_ids
if input_ids is None:
logger.error("Neither prompt nor input_ids provided")
return "Error: No input provided"
# Generate token ids
output_ids = self.generate_tokens(input_ids, **kwargs)
# Decode the generated ids
if not hasattr(self, 'tokenizer') or self.tokenizer is None:
return f"Generated sequence (no tokenizer): {output_ids.tolist()}"
return self.tokenizer.decode(output_ids[0], skip_special_tokens=True)
except Exception as e:
logger.error(f"Error in generate_with_decoding: {e}", exc_info=True)
return f"Error generating text: {str(e)}"
def forward_with_custom_embeddings(self, embeddings: torch.Tensor) -> torch.Tensor:
"""Forward pass that accepts pre-calculated embeddings to bypass shape errors."""
try:
# Get device
device = next(self.parameters()).device
embeddings = embeddings.to(device)
# Process through transformer encoder - bypassing the embedding layer.
# The encoder layers in this model are built with batch_first=True
# (nn.TransformerEncoder itself exposes no batch_first attribute), so
# embeddings are treated as [batch_size, seq_len, dim] throughout.
if embeddings.dim() == 2:
embeddings = embeddings.unsqueeze(0)
# Apply positional encoding if available (also batch-first)
if hasattr(self, 'pos_encoder'):
embeddings = self.pos_encoder(embeddings)
# Process through encoder
encoded = self.transformer_encoder(embeddings)
# Process through adapter
if hasattr(self, 'adapter'):
encoded = self.adapter(encoded)
# Apply pooling for output
if self.pooling_mode == "mean":
pooled = encoded.mean(dim=1)
elif self.pooling_mode == "max":
pooled = torch.max(encoded, dim=1)[0]
elif self.pooling_mode == "cls":
# Use first token (CLS token) for classification
pooled = encoded[:, 0]
else:
pooled = encoded.mean(dim=1)
# Final dropout and classification
pooled = self.dropout_layer(pooled)
output = self.classifier(pooled)
return output
except Exception as e:
logger.error(f"Error in custom embeddings forward pass: {e}")
# Return a tensor of the right shape to prevent further errors
# (use self.device in case the exception occurred before `device` was bound)
return torch.zeros(1, self.output_size, device=self.device)
def forward_with_error_handling(
self,
input_ids: Optional[torch.Tensor] = None,
attention_mask: Optional[torch.Tensor] = None,
token_type_ids: Optional[torch.Tensor] = None,
**kwargs
) -> Union[torch.Tensor, Dict[str, torch.Tensor]]:
"""Forward pass with enhanced error handling for shape mismatches"""
try:
# Try standard forward pass first
return self.forward(
src=input_ids,
attention_mask=attention_mask,
token_type_ids=token_type_ids,
**kwargs
)
except RuntimeError as e:
# Check if this is a shape error
if "shape" in str(e):
logger.warning(f"Shape mismatch detected: {e}")
if input_ids.dim() == 3 and input_ids.size(0) > input_ids.size(1):
input_ids = input_ids.transpose(0, 1) # Adjust shape as needed
# Retry the forward pass using adapted input
try:
embedded = self.embedding(input_ids)
if hasattr(self, 'pos_encoder'):
embedded = self.pos_encoder(embedded)
encoder_out = self.transformer_encoder(embedded)
pooled = encoder_out.mean(dim=1)
pooled = self.dropout_layer(pooled)
return self.classifier(pooled)
except Exception as inner_e:
logger.error(f"Adaptation failed: {inner_e}")
batch_size = input_ids.size(0) if input_ids is not None else 1
return torch.zeros((batch_size, self.output_size), device=self.device)
# Re-raise the exception if not handled
raise
except Exception as e:
logger.error(f"Unhandled error in forward_with_error_handling: {e}")
batch_size = input_ids.size(0) if input_ids is not None else 1
return torch.zeros((batch_size, self.output_size), device=self.device)
def train_with_emissions_tracking(self, dataloader, optimizer, criterion, num_epochs=1):
"""
Train the model while tracking carbon emissions using CodeCarbon.
"""
tracker = EmissionsTracker()
tracker.start() # Start tracking emissions
self.train() # Set model to training mode
for epoch in range(num_epochs):
for batch in dataloader:
inputs, labels = batch
inputs, labels = inputs.to(self.device), labels.to(self.device)
optimizer.zero_grad()
outputs = self(inputs)
# forward() returns a ModelOutput-style object; flatten logits/labels for the criterion
logits = outputs.logits if hasattr(outputs, 'logits') else outputs
loss = criterion(logits.reshape(-1, logits.size(-1)), labels.reshape(-1))
loss.backward()
optimizer.step()
logging.info(f"Epoch {epoch + 1}/{num_epochs} completed.")
emissions = tracker.stop() # Stop tracking emissions
logging.info(f"Training completed. Carbon emissions: {emissions:.4f} kg CO2")
def infer_with_emissions_tracking(self, input_ids):
"""
Perform inference while tracking carbon emissions using CodeCarbon.
"""
tracker = EmissionsTracker()
tracker.start() # Start tracking emissions
self.eval() # Set model to evaluation mode
with torch.no_grad():
outputs = self(input_ids)
emissions = tracker.stop() # Stop tracking emissions
logging.info(f"Inference completed. Carbon emissions: {emissions:.4f} kg CO2")
return outputs
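# Inference-with-tracking sketch (illustrative):
#
#   ids = model.tokenizer("print('hello')", return_tensors="pt").input_ids
#   out = model.infer_with_emissions_tracking(ids)   # logs kg CO2 for the call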
def __call__(self, input_ids, attention_mask=None, labels=None, **kwargs):
"""Forward pass with HF-style parameters"""
try:
return self.forward(
input_ids=input_ids,
attention_mask=attention_mask,
labels=labels,
**kwargs
)
except Exception as e:
logger.error(f"Error in __call__: {e}")
# Return dummy output in HF-style format
batch_size = input_ids.shape[0] if hasattr(input_ids, 'shape') else 1
vocab_size = self.vocab_size if hasattr(self, 'vocab_size') else 50257
device = input_ids.device if hasattr(input_ids, 'device') else 'cpu'
# Add shape debugging
if labels is not None:
logger.error(f"Input shapes - input_ids: {input_ids.shape if hasattr(input_ids, 'shape') else 'unknown'}, "
f"labels: {labels.shape if hasattr(labels, 'shape') else 'unknown'}")
# Return dummy output in expected format
dummy_output = torch.zeros((batch_size, vocab_size), device=device)
class SimpleOutput:
def __init__(self, logits):
self.logits = logits
return SimpleOutput(dummy_output)
def save_pretrained(self, save_directory: str):
"""Save model weights in HF format."""
os.makedirs(save_directory, exist_ok=True)
pt_file = os.path.join(save_directory, "pytorch_model.bin")
torch.save(self.state_dict(), pt_file)
logger.info(f"Saved model weights to {pt_file}")
@classmethod
def from_pretrained(cls, pretrained_model_name_or_path: str, *args, **kwargs):
"""
Instantiate model then load weights.
Accepts either a folder (containing pytorch_model.bin)
or a direct path to a .bin file.
"""
model = cls(*args, **kwargs)
if os.path.isdir(pretrained_model_name_or_path):
weight_path = os.path.join(pretrained_model_name_or_path, "pytorch_model.bin")
else:
weight_path = pretrained_model_name_or_path
state = torch.load(weight_path, map_location=model.device)
model.load_state_dict(state, strict=False)
logger.info(f"Loaded weights from {weight_path}")
return model
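# Save/load round-trip sketch (illustrative; the checkpoint directory is a placeholder):
#
#   model.save_pretrained("./checkpoints/wildnerve_tlm01")
#   restored = Wildnerve_tlm01.from_pretrained("./checkpoints/wildnerve_tlm01",
#                                              specialization="general")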
# Register the model class in registry for discovery
registry.register("model_class_custom", Wildnerve_tlm01)
# Fallback helper to ensure a tokenizer is available.
def initialize_tokenizer():
"""
Fallback function to initialize the tokenizer.
Tries up to 5 times and logs debug messages on each attempt.
"""
from transformers import GPT2Tokenizer, AutoTokenizer
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
# Attempt to get tokenizer from the registry
from service_registry import registry, TOKENIZER
if registry.has(TOKENIZER):
tokenizer = registry.get(TOKENIZER)
if tokenizer is not None:
logger.debug(f"Attempt {attempt}: Successfully retrieved tokenizer from registry.")
return tokenizer
# Fallback: load tokenizer directly
tokenizer = AutoTokenizer.from_pretrained("gpt2")
logger.debug(f"Attempt {attempt}: Successfully loaded GPT-2 tokenizer.")
# Ensure pad_token is set for GPT-2
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
tokenizer.pad_token_id = tokenizer.eos_token_id
logger.debug("Set pad_token to eos_token for GPT-2 tokenizer")
# Register it for future use
registry.register(TOKENIZER, tokenizer)
return tokenizer
except Exception as e:
logger.debug(f"Attempt {attempt}: Failed to initialize tokenizer due to: {e}")
# No return here - continue to next attempt
# This is reached only after all attempts fail
logger.error("Tokenizer initialization failed after 5 attempts. Using fallback GPT2Tokenizer.")
try:
# Last resort fallback - but don't return immediately
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
# Ensure pad_token is set for GPT-2
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
tokenizer.pad_token_id = tokenizer.eos_token_id
return tokenizer # Return the fallback tokenizer
except Exception as e:
logger.error(f"Default tokenizer initialization failed: {e}")
return None # Return None to indicate complete failure
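if __name__ == "__main__":
    # Minimal smoke test (illustrative sketch; assumes the project-local imports at
    # the top of this file resolve and a GPT-2 tokenizer is downloadable or cached).
    logging.basicConfig(level=logging.INFO)
    smoke_tokenizer = initialize_tokenizer()
    smoke_model = Wildnerve_tlm01(specialization="general", tokenizer=smoke_tokenizer)
    smoke_ids = torch.randint(0, smoke_model.vocab_size, (1, 8))
    smoke_out = smoke_model(input_ids=smoke_ids)
    print("logits shape:", tuple(smoke_out.logits.shape))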