import torch
import torch.nn as nn
import torch.optim as optim
import pickle
from torch.utils.data import Dataset, DataLoader
from safetensors.torch import save_file
import logging
import json

# Configure logging so progress messages carry timestamps and severity levels.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Hyperparameters
sequence_length = 64   # context window: words fed to the model per sample
batch_size = 16
num_epochs = 1
learning_rate = 1e-5
embedding_dim = 256
hidden_dim = 800
num_layers = 4

# Load the corpus: fulltext.json holds a JSON array of word tokens.
logging.info('Reading the text file...')
with open('fulltext.json', 'r') as file:
    text = file.read()
logging.info('Text file read successfully.')

# Build the vocabulary. Sorting before enumeration makes the word-index
# mapping deterministic across runs (set iteration order is not), and
# idx2word is derived from word2idx so the two mappings cannot drift apart.
logging.info('Preprocessing the text...')
words = json.loads(text)
vocab = set(words)
vocab.add('<pad>')
vocab.add('<UNK>')
word2idx = {word: idx for idx, word in enumerate(sorted(vocab))}
idx2word = {idx: word for word, idx in word2idx.items()}
vocab_size = len(vocab)

logging.info(f'Vocabulary size: {vocab_size}')

# Build training samples with a sliding window: each sample pairs
# sequence_length context words with the word that follows them.
logging.info('Creating sequences...')
sequences = []
for i in range(len(words) - sequence_length):
    seq = words[i:i + sequence_length]
    label = words[i + sequence_length]
    sequences.append((seq, label))

logging.info(f'Number of sequences: {len(sequences)}')

class TextDataset(Dataset):
    """Wraps (context words, next word) pairs as index tensors."""

    def __init__(self, sequences, word2idx):
        self.sequences = sequences
        self.word2idx = word2idx

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        seq, label = self.sequences[idx]
        # Words missing from the vocabulary fall back to <UNK>.
        seq_idx = [self.word2idx.get(word, self.word2idx['<UNK>']) for word in seq]
        label_idx = self.word2idx.get(label, self.word2idx['<UNK>'])
        return torch.tensor(seq_idx, dtype=torch.long), torch.tensor(label_idx, dtype=torch.long)


logging.info('Creating dataset and dataloader...')
dataset = TextDataset(sequences, word2idx)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
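
# Optional sanity check (an addition, assuming the corpus yields at least one
# batch): peek at a single batch to confirm the expected shapes,
# (batch_size, sequence_length) for inputs and (batch_size,) for targets.
sample_inputs, sample_targets = next(iter(dataloader))
logging.info(f'Sample batch: inputs {tuple(sample_inputs.shape)}, targets {tuple(sample_targets.shape)}')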

class LSTMModel(nn.Module):
    """Next-word predictor: embedding -> multi-layer LSTM -> linear head."""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x):
        embeds = self.embedding(x)
        lstm_out, _ = self.lstm(embeds)
        # Only the hidden state at the final time step feeds the classifier.
        logits = self.fc(lstm_out[:, -1, :])
        return logits


logging.info('Initializing the LSTM model...')
model = LSTMModel(vocab_size, embedding_dim, hidden_dim, num_layers)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
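
# Quick size check (an addition, not in the original script): count the
# trainable parameters to get a feel for the model's footprint.
num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
logging.info(f'Trainable parameters: {num_params:,}')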

logging.info('Starting training...')
for epoch in range(num_epochs):
    for batch_idx, (inputs, targets) in enumerate(dataloader):
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # Standard step: clear stale gradients, backpropagate, update weights.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch_idx % 10 == 0:
            logging.info(f'Epoch [{epoch+1}/{num_epochs}], '
                         f'Batch [{batch_idx}/{len(dataloader)}], '
                         f'Loss: {loss.item():.4f}')

# Persist the trained weights in safetensors format and the vocabulary
# mappings as pickles so the model can be reloaded for inference.
logging.info('Saving the model...')
save_file(model.state_dict(), 'lstm_model.safetensors')
with open('word2idx.pkl', 'wb') as f:
    pickle.dump(word2idx, f)
with open('idx2word.pkl', 'wb') as f:
    pickle.dump(idx2word, f)

logging.info('Model and vocabulary saved successfully.')
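
# Minimal inference sketch (an addition, not part of the original pipeline):
# reload the saved weights with safetensors' load_file, the counterpart of
# save_file, and predict the word that follows the first training sequence.
from safetensors.torch import load_file

inference_model = LSTMModel(vocab_size, embedding_dim, hidden_dim, num_layers)
inference_model.load_state_dict(load_file('lstm_model.safetensors'))
inference_model.eval()

context, _ = sequences[0]
context_idx = torch.tensor([[word2idx.get(w, word2idx['<UNK>']) for w in context]], dtype=torch.long)
with torch.no_grad():
    next_idx = inference_model(context_idx).argmax(dim=-1).item()
logging.info(f'Sample prediction: "...{" ".join(context[-5:])}" -> "{idx2word[next_idx]}"')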