import os
import sys
import math
import torch
import logging
import importlib
import torch.nn as nn

from config import load_config

from transformers import GPT2LMHeadModel, GPT2Tokenizer
from transformers import AutoModelForCausalLM, AutoTokenizer

from typing import Optional, List, Dict, Any, Union
from sentence_transformers import SentenceTransformer

from service_registry import registry, MODEL, TOKENIZER, PRETRAINED_MODEL

from base_interfaces.common_types import *
from base_interfaces.model_interface import AbstractModel
from model_manager import safe_get_config_value

# Build the config here rather than importing it, so load_config() stays
# the single source of truth for this module.
app_config = load_config()
logger = logging.getLogger(__name__)


class PositionalEncoding(nn.Module):
    """Sinusoidal positional encoding (Vaswani et al., 2017) for
    sequence-first inputs of shape (seq_len, batch, d_model)."""

    def __init__(self, d_model: int, max_len: Optional[int] = None):
        super().__init__()

        # Default to the configured maximum sequence length, then to 1024.
        if max_len is None:
            if hasattr(app_config, "TRANSFORMER_CONFIG") and isinstance(app_config.TRANSFORMER_CONFIG, dict):
                max_len = app_config.TRANSFORMER_CONFIG.get("MAX_SEQ_LENGTH", 1024)
            else:
                max_len = 1024

        # pe[pos, 2i] = sin(pos / 10000^(2i / d_model)),
        # pe[pos, 2i + 1] = cos(pos / 10000^(2i / d_model)).
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float) * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        # Shape (max_len, 1, d_model) so it broadcasts across the batch.
        pe = pe.unsqueeze(1)
        self.register_buffer('pe', pe)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # x: (seq_len, batch, d_model); add the first seq_len encodings.
        seq_len = x.size(0)
        x = x + self.pe[:seq_len]
        return x
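
# A minimal usage sketch for PositionalEncoding (assumes an even d_model;
# shapes only, nothing is trained here):
#
#   pe = PositionalEncoding(d_model=768, max_len=1024)
#   x = torch.zeros(16, 4, 768)   # (seq_len, batch, d_model)
#   assert pe(x).shape == (16, 4, 768)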


class Wildnerve_tlm01(nn.Module, AbstractModel):
    """A Transformer-based language model that combines:

    - a pretrained GPT-2 model for text generation, and
    - a custom decoder stack.

    The GPT-2 tokenizer is used throughout for consistent tokenization.
    """

    def __init__(
        self,
        vocab_size: int = 50257,
        specialization: str = "general",
        dataset_path: Optional[str] = None,
        model_name: str = "gpt2",
        embedding_dim: int = 768,
        num_heads: int = 12,
        hidden_dim: int = 768,
        num_layers: int = 6,
        output_size: int = 50257,
        dropout: float = 0.1,
        max_seq_length: int = 1024,
        pooling_mode: str = "last",
        tokenizer=None,
        max_length: Optional[int] = None,
    ) -> None:
        super().__init__()
        self.specialization = specialization
        self.dataset_path = dataset_path
        self.model_name = model_name
        self.pooling_mode = pooling_mode
        self.vocab_size = vocab_size
        self.max_seq_length = max_seq_length
        self.embedding_dim = embedding_dim
        self.num_heads = num_heads
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.output_size = output_size
        self.dropout = dropout

        cfg = safe_get_config_value(app_config, "TRANSFORMER_CONFIG", {})
        self.max_length = max_length or cfg.get("MAX_SEQ_LENGTH", 1024)

        try:
            # The GPT-2 weights are loaded lazily in _ensure_model_loaded().
            self.gpt2_model = None

            # Prefer an explicitly passed tokenizer, then a registered one,
            # then load the GPT-2 tokenizer from scratch.
            if tokenizer is not None:
                self.tokenizer = tokenizer
            elif registry.has(TOKENIZER):
                self.tokenizer = registry.get(TOKENIZER)
            else:
                self.tokenizer = GPT2Tokenizer.from_pretrained(model_name)

            # GPT-2 has no pad token by default; reuse EOS for padding.
            if self.tokenizer.pad_token_id is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token
                self.tokenizer.pad_token_id = self.tokenizer.eos_token_id

            logger.info(f"Successfully initialized GPT-2 tokenizer for {model_name}")

        except Exception as e:
            logger.error(f"Error initializing GPT-2 model: {e}", exc_info=True)
            raise

        model_registry_key = f"model_{specialization}"
        registry.register(model_registry_key, self)

        registry.register(PRETRAINED_MODEL, self, overwrite=True)
        logger.info("Registered GPT-2 model as pretrained model")

    def _ensure_model_loaded(self):
        # Lazy-load the GPT-2 weights on first use.
        if self.gpt2_model is None:
            self.gpt2_model = GPT2LMHeadModel.from_pretrained(self.model_name)

    def forward(self, src: torch.Tensor, tgt: Optional[torch.Tensor] = None,
                src_key_padding_mask: Optional[torch.Tensor] = None,
                tgt_key_padding_mask: Optional[torch.Tensor] = None,
                return_sequence: bool = False,
                **kwargs) -> torch.Tensor:
        # The decoder-stack arguments (tgt, the masks, return_sequence) are
        # accepted for interface compatibility; the pass is delegated to GPT-2.
        self._ensure_model_loaded()

        outputs = self.gpt2_model(src, **kwargs)
        return outputs.logits
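
    # Shape note: for input ids of shape (batch, seq_len), the returned logits
    # have shape (batch, seq_len, vocab_size), e.g. (1, n, 50257) for gpt2.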

    def generate(self, prompt=None, input_ids=None, max_length=None, **kwargs):
        """Generate text with the GPT-2 model, delegating to a registered
        adapter layer when one is available."""
        self._ensure_model_loaded()
        try:
            # Prefer a registered adapter layer if it exposes generate().
            adapter_layer = registry.get("adapter_layer") if registry.has("adapter_layer") else None
            if adapter_layer and hasattr(adapter_layer, "generate"):
                if prompt:
                    return adapter_layer.generate(prompt, max_length=max_length, **kwargs)
                elif input_ids is not None and self.tokenizer:
                    # Recover a text prompt for the adapter from the token ids.
                    prompt = self.tokenizer.decode(input_ids[0], skip_special_tokens=True)
                    return adapter_layer.generate(prompt, max_length=max_length, **kwargs)

            generation_config = {
                "max_length": max_length or 150,
                "temperature": kwargs.get('temperature', 0.7),
                "top_p": kwargs.get('top_p', 0.95),
                "top_k": kwargs.get('top_k', 50),
                "repetition_penalty": kwargs.get('repetition_penalty', 1.3),
                "no_repeat_ngram_size": kwargs.get('no_repeat_ngram_size', 3),
                "do_sample": True,
                "pad_token_id": self.tokenizer.pad_token_id,
                "eos_token_id": self.tokenizer.eos_token_id,
                "early_stopping": True,
                # Note: penalty_alpha only triggers contrastive search when
                # do_sample is False; with sampling enabled it has no effect.
                "penalty_alpha": 0.6,
            }

            if isinstance(prompt, str) and input_ids is None:
                inputs = self.tokenizer(prompt, return_tensors="pt", padding=True, truncation=True)
                input_ids = inputs.input_ids
            elif input_ids is None:
                raise ValueError("Either prompt or input_ids must be provided")

            # Pass through any remaining caller kwargs we have not already set.
            for k, v in kwargs.items():
                if k not in generation_config and k not in ('prompt', 'context'):
                    generation_config[k] = v

            # If the prompt nearly fills max_length, switch to max_new_tokens
            # so generation is not truncated to almost nothing.
            if input_ids.shape[1] > (generation_config["max_length"] - 50):
                logger.info(f"Input length {input_ids.shape[1]} is close to max_length, using max_new_tokens instead")
                del generation_config["max_length"]
                generation_config["max_new_tokens"] = kwargs.get("max_new_tokens", 50)

            output_ids = self.gpt2_model.generate(input_ids, **generation_config)

            if torch.is_tensor(output_ids):
                generated_text = self.tokenizer.decode(output_ids[0].cpu(), skip_special_tokens=True)
            else:
                generated_text = self.tokenizer.decode(output_ids[0], skip_special_tokens=True)

            return generated_text

        except Exception as e:
            logger.error(f"Error in GPT-2 generation: {e}", exc_info=True)
            return f"Error generating response: {str(e)}"

    def generate_streaming(self, prompt=None, input_ids=None, **kwargs):
        """Yield generated text incrementally, token by token."""
        self._ensure_model_loaded()
        try:
            if prompt is not None and input_ids is None:
                inputs = self.tokenizer(
                    prompt,
                    return_tensors="pt",
                    padding=True,
                    truncation=True,
                    max_length=self.max_length
                )
                input_ids = inputs.input_ids

            max_length = kwargs.get('max_length', min(self.max_length, 200))
            temperature = kwargs.get('temperature', 0.7)
            top_p = kwargs.get('top_p', 0.9)

            # Imported lazily so the streaming machinery is only needed here.
            from transformers import TextIteratorStreamer
            from threading import Thread

            streamer = TextIteratorStreamer(
                self.tokenizer,
                timeout=10.0,
                skip_prompt=True,
                skip_special_tokens=True
            )

            generation_kwargs = dict(
                input_ids=input_ids,
                max_length=max_length,
                temperature=temperature,
                top_p=top_p,
                streamer=streamer,
                do_sample=True,
            )

            # Run generation on a worker thread; the streamer yields decoded
            # text on this thread as tokens are produced.
            thread = Thread(target=self.gpt2_model.generate, kwargs=generation_kwargs)
            thread.start()

            for token in streamer:
                yield token

        except Exception as e:
            logger.error(f"Error in streaming generation: {e}", exc_info=True)
            yield f"Error: {str(e)}"


class PretrainedTransformer(nn.Module, AbstractModel):
    """A simple wrapper around a pretrained Hugging Face causal language model."""

    def __init__(
        self,
        vocab_size=50257,
        specialization="general",
        dataset_path=None,
        model_name="gpt2",
        embedding_dim=768,
        num_heads=12,
        hidden_dim=768,
        num_layers=6,
        output_size=768,
        dropout=0.1,
        max_seq_length=1024,
        pooling_mode="last",
        tokenizer=None,
        **kwargs
    ) -> None:
        super().__init__()

        # Tracks when each model was last used (e.g. for cache eviction).
        self.model_last_used = {}

        self.model_name = model_name
        self.fallback_model = "gpt2"

        # Resolve the tokenizer once: explicit argument, then the registry,
        # then a fresh load matching model_name.
        if tokenizer is not None:
            self.tokenizer = tokenizer
        elif registry.has(TOKENIZER):
            self.tokenizer = registry.get(TOKENIZER)
        else:
            try:
                self.tokenizer = AutoTokenizer.from_pretrained(model_name)
                logger.info(f"Loaded tokenizer: {model_name}")
                # Causal LMs such as GPT-2 ship without a pad token.
                if self.tokenizer.pad_token is None:
                    self.tokenizer.pad_token = self.tokenizer.eos_token
            except Exception as e:
                logger.error(f"Failed to load tokenizer for {model_name}: {e}")
                self.tokenizer = None
            if self.tokenizer is not None:
                registry.register(TOKENIZER, self.tokenizer)

        self.model = AutoModelForCausalLM.from_pretrained(model_name)

    def forward(self, input_ids, attention_mask=None):
        # Causal-LM outputs do not expose last_hidden_state directly, so
        # request the hidden states and return the final layer.
        outputs = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            output_hidden_states=True,
        )
        return outputs.hidden_states[-1]

    def encode(self, text: str):
        if not self.tokenizer:
            raise ValueError("Tokenizer not available")
        inputs = self.tokenizer(text, return_tensors="pt", truncation=True, padding=True)
        with torch.no_grad():
            outputs = self.forward(inputs.input_ids, inputs.get("attention_mask"))
        # Mean-pool over the sequence dimension: (batch, seq, dim) -> (batch, dim).
        return outputs.mean(dim=1)

    def generate(self, input_ids, max_length=100, **kwargs):
        if hasattr(self.model, "generate"):
            return self.model.generate(input_ids=input_ids, max_length=max_length, **kwargs)
        else:
            # Degenerate fallback: echo the input when generation is unsupported.
            return input_ids
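
    # Embedding sketch (assumes the default gpt2 checkpoint, hidden size 768):
    #
    #   encoder = PretrainedTransformer(model_name="gpt2")
    #   vec = encoder.encode("hello world")   # shape: (1, 768)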


# Expose the model classes through the service registry so other modules can
# construct them without importing this file directly.
registry.register("model_class_pretrained", Wildnerve_tlm01)
registry.register("pretrained_transformer_class", PretrainedTransformer)


def initialize_pretrained_model():
    """Initialize a pretrained tokenizer with a fallback mechanism.

    Tries to load 'bert-base-uncased'; if that fails, falls back to 'gpt2'.
    If both fail, the whole sequence is retried, up to 5 attempts in total.

    Returns:
        The initialized tokenizer instance if successful, otherwise None.
    """
    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        try:
            tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
            logger.info(f"Attempt {attempt}: Successfully loaded bert-base-uncased.")
            return tokenizer
        except Exception as e:
            logger.warning(f"Attempt {attempt}: Loading bert-base-uncased failed: {e}")
            try:
                tokenizer = AutoTokenizer.from_pretrained("gpt2")
                logger.info(f"Attempt {attempt}: Successfully loaded gpt2 as fallback.")
                return tokenizer
            except Exception as e2:
                logger.warning(f"Attempt {attempt}: Loading gpt2 failed as fallback: {e2}")
                logger.info("Retrying tokenizer initialization...")
    logger.error(f"Failed to initialize pretrained model tokenizer after {max_attempts} attempts.")
    return None
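
# Usage sketch: the result may be None if both downloads fail, so guard it.
#
#   tokenizer = initialize_pretrained_model()
#   if tokenizer is not None:
#       ids = tokenizer("hello", return_tensors="pt").input_ids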


"""
Pretrained model wrapper for Wildnerve-tlm01
"""
import logging
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from service_registry import registry, PRETRAINED_MODEL, TOKENIZER

logger = logging.getLogger(__name__)


class Wildnerve_tlm01:
    """
    A wrapper for transformer models from HuggingFace.
    Provides the same interface as our custom models for consistency.
    """

    def __init__(
        self,
        model_name="gpt2",
        tokenizer=None,
        device=None,
        **kwargs
    ):
        self.model_name = model_name

        # Resolve the tokenizer: explicit argument, then the registry,
        # then load one matching model_name.
        if tokenizer is not None:
            self.tokenizer = tokenizer
        elif registry.has(TOKENIZER):
            self.tokenizer = registry.get(TOKENIZER)
        else:
            try:
                self.tokenizer = AutoTokenizer.from_pretrained(model_name)
                logger.info(f"Initialized tokenizer from {model_name}")
            except Exception as e:
                logger.error(f"Failed to initialize tokenizer: {e}")
                self.tokenizer = None

        try:
            self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")
            logger.info(f"Loading pretrained model from {model_name} on {self.device}")

            # Simplified wrapper: no weights are loaded here, and generate()
            # returns a canned response instead of running a model.
            self.model = None
            logger.info("Created simplified pretrained model wrapper")
        except Exception as e:
            logger.error(f"Failed to initialize pretrained model: {e}")
            self.model = None

    def generate(self, prompt, **kwargs):
        """Generate a response to the given prompt."""
        return f"I processed your request about '{prompt[:20]}...' using my pretrained capabilities."

    def __call__(self, input_ids, attention_mask=None):
        """Forward pass for HuggingFace compatibility."""
        batch_size = input