import json
import os

import torch
import torch.nn as nn
from torch.nn import functional as F

# --- Hyperparameters ---
# These are the settings for our model. You can experiment with these values.
batch_size = 64  # Increased from 32 to process more sequences in parallel.
# Increased from 8. This is the maximum context length for predictions. A
# larger value lets the model see more of the text.
block_size = 32
# Increased from 3000 to give the model more training time.
max_iters = 15000
eval_interval = 500  # How often (in iterations) to evaluate the model.
learning_rate = 3e-4  # A slightly lower learning rate for a larger model.
device = 'cuda' if torch.cuda.is_available() else 'cpu'  # Use GPU if available.
eval_iters = 200  # Number of batches averaged for each loss estimate.
# Increased from 32. The dimension of the token embeddings; the model below
# also uses it as the LSTM hidden size.
n_embd = 64
n_layer = 4  # Increased from 2. The number of stacked LSTM layers.
dropout = 0.0  # Dropout rate for regularization.

# --- Data Preparation ---
# Expects a JSON Lines file named 'dataset.jsonl'; the relative path is
# resolved against the current working directory. Every record must provide
# the string fields 'header' and 'formal_statement'.
file_path = 'dataset.jsonl'

# Pieces are collected in a list and joined once: repeated `+=` on a string
# can degrade to quadratic time on a large dataset.
corpus_parts = []
try:
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's default locale encoding.
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            # Tolerate blank or whitespace-only lines (e.g. a trailing empty
            # line) instead of failing the whole run with a JSON error.
            if not line.strip():
                continue
            data_point = json.loads(line)
            corpus_parts.append(
                data_point['header'] + '\n'
                + data_point['formal_statement'] + '\n'
            )
except FileNotFoundError:
    print(f"Error: The file '{file_path}' was not found. Please create it and run again.")
    # Exit with a non-zero status so callers can detect the failure; the
    # `exit()` helper is injected by `site` and is not guaranteed to exist.
    raise SystemExit(1)
except (json.JSONDecodeError, KeyError) as e:
    print(f"Error: There was a problem parsing a line in '{file_path}'. Details: {e}")
    raise SystemExit(1)

corpus = ''.join(corpus_parts)

if not corpus:
    print("Error: The corpus is empty. The dataset file might be empty or incorrectly formatted.")
    raise SystemExit(1)

# Create a simple character-level tokenizer.
# The vocabulary is every distinct character in the corpus, in sorted order.
chars = sorted(set(corpus))
vocab_size = len(chars)
stoi = {ch: i for i, ch in enumerate(chars)}
itos = dict(enumerate(chars))


def encode(s):
    """Return the list of integer token ids for the characters of *s*.

    Raises:
        KeyError: if *s* contains a character that is not in the vocabulary.
    """
    return [stoi[c] for c in s]


def decode(l):
    """Return the string spelled by the iterable of token ids *l*."""
    return ''.join(itos[i] for i in l)


# Convert the entire text into a PyTorch tensor.
data = torch.tensor(encode(corpus), dtype=torch.long)

# Create a simple train/validation split: first 90% train, last 10% validation.
n = int(0.9 * len(data))
train_data = data[:n]
val_data = data[n:]


# --- Helper Functions ---
def get_batch(split):
    """Sample a random batch of (input, target) sequences.

    Args:
        split: 'train' selects the training data; any other value selects
            the validation data.

    Returns:
        A pair ``(x, y)`` of long tensors of shape (batch_size, block_size)
        on ``device``, where ``y`` is ``x`` shifted one position to the right.

    Raises:
        ValueError: if the selected split is not longer than ``block_size``,
            so that no full (input, target) window fits in it.
    """
    source = train_data if split == 'train' else val_data
    # Number of valid start offsets; the target window ends one past the input.
    n_starts = len(source) - block_size
    if n_starts <= 0:
        raise ValueError(
            f"The '{split}' split has {len(source)} tokens, which is too few "
            f"for block_size={block_size}; provide more data or reduce block_size."
        )
    starts = torch.randint(n_starts, (batch_size,)).tolist()
    x = torch.stack([source[i:i + block_size] for i in starts])
    y = torch.stack([source[i + 1:i + block_size + 1] for i in starts])
    return x.to(device), y.to(device)


@torch.no_grad()
def estimate_loss():
    """Estimate the mean loss of ``model`` on the train and validation splits.

    Averages the loss over ``eval_iters`` random batches per split. The model
    is put in evaluation mode for the measurement and switched back to
    training mode afterwards.

    Returns:
        A dict with keys 'train' and 'val' mapping to 0-dim tensors holding
        the mean loss.
    """
    out = {}
    model.eval()  # Set the model to evaluation mode.
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            _, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()  # Set the model back to training mode.
    return out


# --- The Main LSTM Language Model ---
class LanguageModel(nn.Module):
    """Character-level language model: embedding -> stacked LSTM -> linear head."""

    def __init__(self):
        super().__init__()
        # An embedding table to convert tokens to dense vectors.
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        # A stacked LSTM to process the sequence. The `dropout` hyperparameter
        # is wired in here; it only acts between stacked layers, so it is
        # disabled for a single-layer model to avoid a PyTorch warning.
        self.lstm = nn.LSTM(
            n_embd,
            n_embd,
            num_layers=n_layer,
            batch_first=True,
            dropout=dropout if n_layer > 1 else 0.0,
        )
        # A final linear layer to project the LSTM's output to the vocabulary size.
        self.lm_head = nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        """Compute next-token logits and, optionally, the cross-entropy loss.

        Args:
            idx: (B, T) long tensor of token ids.
            targets: optional (B, T) long tensor of next-token ids.

        Returns:
            ``(logits, loss)``. Without targets, logits has shape
            (B, T, vocab_size) and loss is None. With targets, logits is
            flattened to (B * T, vocab_size) and loss is the mean
            cross-entropy over all positions.
        """
        tok_emb = self.token_embedding_table(idx)  # (B, T, n_embd)
        lstm_out, _ = self.lstm(tok_emb)  # (B, T, n_embd)
        logits = self.lm_head(lstm_out)  # (B, T, vocab_size)

        loss = None
        if targets is not None:
            # Reshape for cross-entropy loss calculation.
            B, T, C = logits.shape
            logits = logits.view(B * T, C)
            targets = targets.view(B * T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens):
        """Sample ``max_new_tokens`` tokens to continue the context ``idx``.

        Args:
            idx: (B, T) long tensor holding the prompt token ids, with T >= 1.
            max_new_tokens: number of tokens to append.

        Returns:
            A (B, T + max_new_tokens) long tensor: the prompt followed by the
            sampled continuation.
        """
        # Sample in evaluation mode so dropout (if enabled) is inactive, and
        # restore whatever mode the caller had afterwards.
        was_training = self.training
        self.eval()
        try:
            state = None  # Start with no hidden/cell state.
            # The first step consumes the whole prompt so the recurrent state
            # reflects every context token, not just the last one. Later
            # steps only need the newly sampled token plus the carried state.
            step_input = idx
            for _ in range(max_new_tokens):
                tok_emb = self.token_embedding_table(step_input)
                lstm_out, state = self.lstm(tok_emb, state)
                # Focus on the output of the last time step.
                logits = self.lm_head(lstm_out[:, -1, :])  # (B, vocab_size)
                probs = F.softmax(logits, dim=-1)
                # Sample from the distribution.
                idx_next = torch.multinomial(probs, num_samples=1)  # (B, 1)
                # Append the new token to the sequence.
                idx = torch.cat((idx, idx_next), dim=1)
                step_input = idx_next
        finally:
            self.train(was_training)
        return idx


# --- Training and Generation ---
model = LanguageModel()
m = model.to(device)  # Same module object as `model`, moved to `device`.

# Create a PyTorch optimizer.
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

# Main training loop.
for step in range(max_iters):
    # Every few iterations, evaluate the loss on both splits.
    if step % eval_interval == 0:
        losses = estimate_loss()
        print(f"step {step}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # Sample a batch of data.
    xb, yb = get_batch('train')

    # Forward pass: compute loss.
    logits, loss = model(xb, yb)

    # Backward pass: compute gradients.
    optimizer.zero_grad(set_to_none=True)
    loss.backward()

    # Update the model parameters.
    optimizer.step()

# --- Generate new text from the trained model ---
# The context is a single token with id 0, i.e. the first character of the
# sorted vocabulary.
context = torch.zeros((1, 1), dtype=torch.long, device=device)
generated_text_indices = m.generate(context, max_new_tokens=20)
print("\nGenerated text:")
print(decode(generated_text_indices[0].tolist()))

# Save the model's state dictionary after training.
torch.save(m.state_dict(), 'model.pt')
print("Model saved to model.pt")