import torch import torch.nn as nn import torch.optim as optim import numpy as np import json import pandas as pd from torch.utils.data import Dataset, DataLoader import math # Set device device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') print(f"Using device: {device}") # Custom Dataset Class class TextDataset(Dataset): def __init__(self, texts, tokenizer, max_length=128): self.texts = texts self.tokenizer = tokenizer self.max_length = max_length def __len__(self): return len(self.texts) def __getitem__(self, idx): text = str(self.texts[idx]) tokens = self.tokenizer.encode(text, max_length=self.max_length, padding='max_length', truncation=True, return_tensors='pt') input_ids = tokens.squeeze(0) # For language modeling, target is input shifted by 1 target_ids = torch.cat([input_ids[1:], torch.tensor([self.tokenizer.pad_token_id])], dim=0) return input_ids, target_ids # Simple Character-level Tokenizer class CharacterTokenizer: def __init__(self): self.char_to_idx = {} self.idx_to_char = {} self.vocab_size = 0 self.pad_token_id = 0 self.unk_token_id = 1 def fit(self, texts): # Build vocabulary from characters chars = set() for text in texts: chars.update(list(str(text))) # Add special tokens self.char_to_idx[''] = 0 self.char_to_idx[''] = 1 # Add regular characters for i, char in enumerate(sorted(chars)): self.char_to_idx[char] = i + 2 # Create reverse mapping self.idx_to_char = {v: k for k, v in self.char_to_idx.items()} self.vocab_size = len(self.char_to_idx) def encode(self, text, max_length=None, padding=False, truncation=False, return_tensors=None): if isinstance(text, str): text = [text] encoded = [] for t in text: tokens = [self.char_to_idx.get(c, self.unk_token_id) for c in str(t)] if truncation and max_length: tokens = tokens[:max_length] if padding and max_length: tokens = tokens + [self.pad_token_id] * (max_length - len(tokens)) encoded.append(tokens) if return_tensors == 'pt': return torch.tensor(encoded, dtype=torch.long) return encoded def decode(self, token_ids): if isinstance(token_ids, torch.Tensor): token_ids = token_ids.tolist() chars = [self.idx_to_char.get(idx, '') for idx in token_ids] return ''.join(chars) # Transformer Language Model class TransformerLM(nn.Module): def __init__(self, vocab_size, d_model=256, nhead=8, num_layers=4, dim_feedforward=1024, max_seq_length=128): super(TransformerLM, self).__init__() self.d_model = d_model self.embedding = nn.Embedding(vocab_size, d_model) self.pos_embedding = nn.Embedding(max_seq_length, d_model) encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward, batch_first=True) self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers) self.output_layer = nn.Linear(d_model, vocab_size) self.max_seq_length = max_seq_length def forward(self, src): seq_len = src.size(1) pos = torch.arange(0, seq_len, device=src.device).unsqueeze(0) # Embedding + positional encoding src_emb = self.embedding(src) * math.sqrt(self.d_model) pos_emb = self.pos_embedding(pos) src_emb = src_emb + pos_emb # Create mask for padding (optional) # src_key_padding_mask = (src == 0) # Assuming 0 is pad token # Transformer encoder output = self.transformer_encoder(src_emb) # , src_key_padding_mask=src_key_padding_mask) # Output projection logits = self.output_layer(output) return logits # Load dataset print("Loading dataset...") df = pd.read_csv('data/dataset.csv') texts = df['text'].tolist() print(f"Loaded {len(texts)} text samples") # Initialize tokenizer tokenizer = CharacterTokenizer() tokenizer.fit(texts) print(f"Vocabulary size: {tokenizer.vocab_size}") # Create dataset and dataloader dataset = TextDataset(texts, tokenizer, max_length=64) dataloader = DataLoader(dataset, batch_size=4, shuffle=True) # Initialize model model = TransformerLM( vocab_size=tokenizer.vocab_size, d_model=256, nhead=8, num_layers=4, dim_feedforward=1024, max_seq_length=64 ).to(device) print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}") # Loss and optimizer criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id) optimizer = optim.AdamW(model.parameters(), lr=0.001) # Training loop num_epochs = 10 model.train() print("Starting training...") for epoch in range(num_epochs): total_loss = 0 num_batches = 0 for batch_idx, (input_ids, target_ids) in enumerate(dataloader): input_ids = input_ids.to(device) target_ids = target_ids.to(device) # Forward pass optimizer.zero_grad() logits = model(input_ids) # Reshape for loss calculation: (batch_size * seq_len, vocab_size) loss = criterion(logits.view(-1, logits.size(-1)), target_ids.view(-1)) # Backward pass loss.backward() optimizer.step() total_loss += loss.item() num_batches += 1 if batch_idx % 10 == 0: print(f'Epoch [{epoch+1}/{num_epochs}], Batch [{batch_idx}/{len(dataloader)}], Loss: {loss.item():.4f}') avg_loss = total_loss / num_batches print(f'Epoch [{epoch+1}/{num_epochs}] Completed - Average Loss: {avg_loss:.4f}') # Save model and tokenizer print("Saving model and tokenizer...") torch.save({ 'model_state_dict': model.state_dict(), 'tokenizer': tokenizer, 'model_config': { 'vocab_size': tokenizer.vocab_size, 'd_model': 256, 'nhead': 8, 'num_layers': 4, 'dim_feedforward': 1024, 'max_seq_length': 64 } }, 'custom_llm_model.pth') print("Training completed! Model saved as 'custom_llm_model.pth'") # Test generation def generate_text(model, tokenizer, prompt, max_length=50, temperature=0.8): model.eval() with torch.no_grad(): # Tokenize prompt input_ids = tokenizer.encode(prompt, max_length=32, padding=False, return_tensors='pt') input_ids = input_ids.to(device) generated = input_ids.clone() for _ in range(max_length): # Get model predictions logits = model(generated) next_token_logits = logits[0, -1, :] / temperature # Apply softmax to get probabilities probs = torch.softmax(next_token_logits, dim=-1) # Sample next token next_token = torch.multinomial(probs, num_samples=1) # Append to generated sequence generated = torch.cat([generated, next_token.unsqueeze(0)], dim=1) # Stop if we generate a period or reach reasonable length if next_token.item() == tokenizer.char_to_idx.get('.', tokenizer.unk_token_id): break # Decode generated text generated_text = tokenizer.decode(generated[0]) return generated_text # Test the model print("\nTesting generation:") test_prompts = ["Hello", "The weather", "Deep learning"] for prompt in test_prompts: generated = generate_text(model, tokenizer, prompt, max_length=30) print(f"Prompt: '{prompt}' -> Generated: '{generated}'")