# Hugging Face Space status at capture time: Sleeping
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| import pandas as pd | |
| from collections import Counter | |
| from sklearn.preprocessing import LabelEncoder | |
| from torch.utils.data import Dataset, DataLoader | |
| import pickle | |
| import re | |
| from nltk.corpus import stopwords | |
| from nltk.stem import WordNetLemmatizer | |
| import gradio as gr | |
| import os | |
| import nltk | |
| from datasets import load_dataset # Import the datasets library | |
# Device configuration: prefer the GPU when one is available
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Download the NLTK corpora used by the text-cleaning step
nltk.download("stopwords", quiet=True)
nltk.download("wordnet", quiet=True)

# Shared, module-wide lemmatizer and English stopword set
lemmatizer = WordNetLemmatizer()
stop_words = set(stopwords.words("english"))
# Dataset Class
class AmazonReviewDataset(Dataset):
    """Torch dataset yielding ``(token_ids, encoded_polarity)`` tensor pairs.

    The wrapped table must provide ``title``, ``text`` and ``polarity``
    columns. The ``text`` column is cleaned once at construction time; the
    ``title`` column is used as-is. Each item is the title and the cleaned
    text joined by a space, whitespace-tokenized, mapped to vocabulary ids and
    truncated / padded to ``max_length``.

    Args:
        dataset: Object exposing ``to_pandas()`` (e.g. a Hugging Face split).
        max_length: Number of token ids in every returned sequence.
        sample_fraction: Fraction of rows to keep (sampled with a fixed seed).
        max_vocab_size: Maximum number of tokens added on top of the two
            special tokens when a vocabulary is built.
        vocab: Optional existing token -> id mapping to reuse (for example
            the training vocabulary when wrapping a test split). When omitted
            a new vocabulary is built from this dataset.
        label_encoder: Optional already-fitted encoder to reuse. When omitted
            a new ``LabelEncoder`` is fitted on this dataset's ``polarity``.
    """

    def __init__(self, dataset, max_length=50, sample_fraction=0.01, max_vocab_size=5000,
                 vocab=None, label_encoder=None):
        # Load dataset
        print("Loading dataset from Hugging Face Dataset repository...")
        self.data = dataset.to_pandas()  # Convert Hugging Face dataset to pandas DataFrame
        self.data = self.data.sample(frac=sample_fraction, random_state=42).reset_index(drop=True)
        print(f"Using {len(self.data)} samples ({sample_fraction * 100:.2f}% of the dataset).")

        # Clean text data
        self.data["text"] = self.data["text"].apply(self.clean_text)

        # Parameters
        self.max_length = max_length

        if vocab is None:
            self.vocab = {"<PAD>": 0, "<UNK>": 1}
            print("Building vocabulary...")
            self._build_vocab(max_vocab_size)
            print("Vocabulary built successfully.")
        else:
            # Reuse the caller's mapping so ids agree with the model's embedding table.
            self.vocab = vocab

        if label_encoder is None:
            self.label_encoder = LabelEncoder()
            # Fit the label encoder using the 'polarity' column
            self.label_encoder.fit(self.data["polarity"])
        else:
            self.label_encoder = label_encoder

    def clean_text(self, text):
        """Return ``text`` lowercased, letters only, without stopwords, lemmatized.

        Non-string values (``None``, NaN) are treated as an empty review
        instead of raising inside ``re.sub``.
        """
        if not isinstance(text, str):
            return ""
        # Remove special characters and numbers, then lowercase
        text = re.sub(r"[^a-zA-Z\s]", "", text).lower()
        # Remove stopwords
        kept = [word for word in text.split() if word not in stop_words]
        # Apply lemmatization
        return " ".join(lemmatizer.lemmatize(word) for word in kept)

    @staticmethod
    def _as_text(value):
        """Return ``str(value)``, mapping ``None`` and float NaN to an empty string."""
        if value is None or (isinstance(value, float) and value != value):
            return ""
        return str(value)

    def _build_vocab(self, max_vocab_size):
        """Add the ``max_vocab_size`` most frequent tokens to ``self.vocab``.

        Counts whitespace tokens of ``title + " " + text`` over the first
        50,000 rows only, processed in chunks of 5,000 rows.
        """
        # Fill missing values BEFORE the string cast; afterwards NaN is already
        # the literal string "nan" and would be counted as a token.
        titles = self.data["title"].fillna("").astype(str)
        texts = self.data["text"].fillna("").astype(str)
        all_text = (titles + " " + texts).iloc[:50000]  # Use only the first 50,000 rows

        token_counts = Counter()
        chunk_size = 5000  # Process smaller chunks
        for start in range(0, len(all_text), chunk_size):
            chunk = all_text.iloc[start:start + chunk_size]
            token_counts.update(" ".join(chunk).split())
            print(f"Processed {min(start + chunk_size, len(all_text))} rows...")

        # Keep only the most common tokens; setdefault keeps the reserved
        # <PAD>/<UNK> ids intact if those strings occur literally in the data.
        for token, _ in token_counts.most_common(max_vocab_size):
            self.vocab.setdefault(token, len(self.vocab))

    def __len__(self):
        """Return the number of sampled rows."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(token_ids, label)`` long tensors for row ``idx``, placed on ``device``."""
        row = self.data.iloc[idx]
        # Concatenate title and text; missing values contribute no tokens
        combined_text = self._as_text(row["title"]) + " " + self._as_text(row["text"])
        tokens = combined_text.split()[:self.max_length]  # Tokenize and truncate

        unk_id = self.vocab["<UNK>"]
        token_ids = [self.vocab.get(token, unk_id) for token in tokens]
        token_ids += [self.vocab["<PAD>"]] * (self.max_length - len(token_ids))  # Pad to max_length

        label_encoded = self.label_encoder.transform([row["polarity"]])[0]  # Encode label
        return (
            torch.tensor(token_ids, dtype=torch.long).to(device),
            torch.tensor(label_encoded, dtype=torch.long).to(device),
        )
# Policy Network
class PolicyNetwork(nn.Module):
    """Embedding -> bidirectional LSTM -> linear classifier over token-id sequences.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Embedding size.
        hidden_dim: LSTM hidden size per direction.
        num_classes: Number of output logits.
    """

    def __init__(self, vocab_size, embed_dim=32, hidden_dim=128, num_classes=2):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_dim * 2, num_classes)  # Bidirectional LSTM doubles hidden size

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for ids ``x`` of shape ``(batch, seq)``."""
        embedded = self.embedding(x)
        _, (h_n, _) = self.lstm(embedded)
        # Use each direction's FINAL hidden state. Slicing the output sequence
        # at the last time step would give the backward direction after it has
        # read only the last token, discarding almost all of the sequence.
        final_state = torch.cat((h_n[-2], h_n[-1]), dim=1)
        return self.fc(final_state)
# Training Function
def train_rl_model(dataset, policy_net, optimizer, num_episodes=3, entropy_weight=0.01, lr=0.001, batch_size=16):
    """Train ``policy_net`` with a REINFORCE-style objective and save it to ``model1.pth``.

    For every batch an action (class) is sampled from the policy's softmax
    output; a sampled action equal to the true label earns reward +1,
    otherwise -1. Rewards are normalized within the batch and used to weight
    the log-probability of the sampled actions.

    Args:
        dataset: Map-style dataset yielding ``(token_ids, label)`` tensors.
        policy_net: Model mapping a batch of token ids to class logits.
        optimizer: Optimizer already bound to ``policy_net``'s parameters.
        num_episodes: Number of full passes over ``dataset``.
        entropy_weight: Coefficient of the entropy bonus.
        lr: Unused; the learning rate is whatever ``optimizer`` was built
            with. Kept so existing call sites keep working.
        batch_size: Number of samples per batch.
    """
    model_device = next(policy_net.parameters()).device
    # Worker processes cannot produce CUDA tensors, so load in-process on GPU.
    num_workers = 0 if model_device.type == "cuda" else 4
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)

    policy_net.train()
    for episode in range(num_episodes):
        print(f"Episode {episode + 1} started.")
        total_reward = 0
        last_loss = float("nan")  # stays NaN only if the loader yields no batch
        for tokenized_reviews, true_labels in dataloader:
            tokenized_reviews = tokenized_reviews.to(model_device)
            true_labels = true_labels.to(model_device)

            log_probs = torch.log_softmax(policy_net(tokenized_reviews), dim=-1)
            probs = log_probs.exp()
            # squeeze(1), not squeeze(): keep the batch axis for a batch of one.
            actions = torch.multinomial(probs, 1).squeeze(1)

            # Define rewards based on correctness: +1 if correct, -1 otherwise
            rewards = (actions == true_labels).float() * 2.0 - 1.0
            advantages = rewards
            if rewards.numel() > 1:
                # std() of a single value is NaN, so only normalize real batches.
                advantages = (rewards - rewards.mean()) / (rewards.std() + 1e-8)

            # Compute loss
            chosen_log_probs = log_probs.gather(1, actions.unsqueeze(1)).squeeze(1)
            policy_loss = -(chosen_log_probs * advantages).sum()
            entropy = -(probs * log_probs).sum()
            # Entropy is a bonus: subtracting it from the loss rewards exploration.
            loss = policy_loss - entropy_weight * entropy

            # Backpropagation
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(policy_net.parameters(), max_norm=1.0)
            optimizer.step()

            total_reward += int(rewards.sum().item())
            last_loss = loss.item()
        print(f"Episode {episode + 1}, Total Reward: {total_reward}, Loss: {last_loss}")

    # Save the trained model as model1.pth
    torch.save(policy_net.state_dict(), "model1.pth")
    print("Model saved successfully as model1.pth")
# Evaluation Function
def evaluate_model(dataset, policy_net):
    """Print and return the greedy (argmax) classification accuracy on ``dataset``.

    The model is switched to eval mode and left in that mode.

    Args:
        dataset: Map-style dataset yielding ``(token_ids, label)`` tensors.
        policy_net: Model mapping a batch of token ids to class logits.

    Returns:
        Fraction of correctly classified samples, or ``0.0`` when the dataset
        yields no samples.
    """
    model_device = next(policy_net.parameters()).device
    # Worker processes cannot produce CUDA tensors, so load in-process on GPU.
    num_workers = 0 if model_device.type == "cuda" else 4
    dataloader = DataLoader(dataset, batch_size=16, shuffle=False, num_workers=num_workers)

    correct = 0
    total = 0
    policy_net.eval()
    with torch.no_grad():
        for tokenized_reviews, true_labels in dataloader:
            tokenized_reviews = tokenized_reviews.to(model_device)
            true_labels = true_labels.to(model_device)
            # Softmax is monotonic, so argmax over the logits picks the same class.
            predicted_classes = torch.argmax(policy_net(tokenized_reviews), dim=-1)
            correct += (predicted_classes == true_labels).sum().item()
            total += true_labels.size(0)

    # Guard against an empty dataset instead of dividing by zero.
    accuracy = correct / total if total else 0.0
    print(f"Accuracy: {accuracy * 100:.2f}%")
    return accuracy
# Prediction Function for Gradio
def predict_review(review_text):
    """Predict the polarity label of one review using the saved artifacts.

    Loads ``vocab.pkl``, ``label_encoder.pkl`` and ``model1.pth`` from the
    working directory on every call, cleans and tokenizes ``review_text`` the
    same way the training ``text`` column is prepared, and returns the label
    decoded by the saved encoder.

    Args:
        review_text: Raw review text; ``None`` or empty input is treated as
            an empty review.

    Returns:
        The original label value (as stored in the label encoder) of the
        highest-scoring class.
    """
    max_length = 50  # must match the sequence length the model was trained with

    # NOTE: pickle can execute arbitrary code while loading. These files are
    # written by this script's own training run; never point this at
    # untrusted files.
    with open("vocab.pkl", "rb") as f:
        vocab = pickle.load(f)
    with open("label_encoder.pkl", "rb") as f:
        label_encoder = pickle.load(f)

    # Apply the training-time cleaning (letters only, lowercase, stopword
    # removal, lemmatization); raw text would map mostly to <UNK>.
    # clean_text does not read instance state, so no dataset instance is needed.
    cleaned_text = AmazonReviewDataset.clean_text(None, str(review_text or ""))

    tokens = cleaned_text.split()[:max_length]  # Limit to max length
    unk_id = vocab["<UNK>"]
    token_ids = [vocab.get(word, unk_id) for word in tokens]
    token_ids += [vocab["<PAD>"]] * (max_length - len(token_ids))  # Pad if shorter than max length
    batch = torch.tensor(token_ids, dtype=torch.long).unsqueeze(0).to(device)

    policy_net = PolicyNetwork(len(vocab), embed_dim=32, hidden_dim=128, num_classes=2).to(device)
    # map_location lets a checkpoint saved on GPU load on a CPU-only host.
    policy_net.load_state_dict(torch.load("model1.pth", map_location=device))
    policy_net.eval()

    with torch.no_grad():
        logits = policy_net(batch)
        # Softmax is monotonic, so argmax over the logits picks the same class.
        predicted_class = torch.argmax(logits, dim=-1).item()

    return label_encoder.inverse_transform([predicted_class])[0]
# Main Program
if __name__ == "__main__":
    # Load datasets from Hugging Face Dataset repository
    dataset = load_dataset("harshjoshi2211/amazon_review_dataset1")
    train_dataset = AmazonReviewDataset(dataset["train"])
    test_dataset = AmazonReviewDataset(dataset["test"])

    # The test split must use the TRAINING vocabulary and label encoder.
    # A vocabulary built from the test split assigns different ids to the
    # same tokens, so the model's embeddings would no longer line up.
    test_dataset.vocab = train_dataset.vocab
    test_dataset.label_encoder = train_dataset.label_encoder
    print("Dataset loaded successfully.")

    # Initialize model and optimizer
    policy_net = PolicyNetwork(len(train_dataset.vocab), embed_dim=32, hidden_dim=128, num_classes=2).to(device)
    optimizer = optim.Adam(policy_net.parameters(), lr=0.001)

    # Train the model
    train_rl_model(train_dataset, policy_net, optimizer)

    # Evaluate the model
    evaluate_model(test_dataset, policy_net)

    # Save vocabulary and label encoder (read back by predict_review)
    with open("vocab.pkl", "wb") as f:
        pickle.dump(train_dataset.vocab, f)
    with open("label_encoder.pkl", "wb") as f:
        pickle.dump(train_dataset.label_encoder, f)
    print("Vocabulary and label encoder saved successfully.")

    # Launch Gradio interface
    iface = gr.Interface(
        fn=predict_review,
        inputs="text",
        outputs="text",
        title="Amazon Review Sentiment Analysis",
        description="Enter a review to predict its sentiment (Positive/Negative).",
    )
    iface.launch(share=True)