sentiment_anals / src /data /preprocessing.py
abdou21367's picture
Upload 64 files
839c56d verified
"""
Text preprocessing and dataset utilities for sentiment analysis
Handles cleaning, vocabulary building, VADER features, and PyTorch dataset creation
"""
import re
import string
import pickle
import numpy as np
import pandas as pd
from collections import Counter
from sklearn.model_selection import train_test_split
import emoji
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from autocorrect import Speller
import torch
from torch.utils.data import Dataset, DataLoader
class EnhancedTextPreprocessor:
def __init__(self, vocab_size=10000, max_length=100, min_freq=2,
use_spell_check=False, use_lemmatization=False):
self.vocab_size = vocab_size
self.max_length = max_length
self.min_freq = min_freq
self.use_spell_check = use_spell_check
self.use_lemmatization = use_lemmatization
self.word2idx = {}
self.idx2word = {}
self.word_counts = Counter()
self.pad_token = '<PAD>'
self.unk_token = '<UNK>'
self.sos_token = '<SOS>'
self.eos_token = '<EOS>'
self.special_tokens = [self.pad_token, self.unk_token, self.sos_token, self.eos_token]
self.vader = SentimentIntensityAnalyzer()
if self.use_spell_check:
self.spell = Speller(lang='en')
self.negations = [
"not", "no", "never", "neither", "nobody", "nothing", "nowhere",
"none", "hardly", "scarcely", "barely", "doesn't", "isn't", "wasn't",
"shouldn't", "wouldn't", "couldn't", "won't", "can't", "don't"
]
self.slang_dict = {
"u": "you", "ur": "your", "r": "are", "y": "why",
"btw": "by the way", "idk": "i do not know", "imo": "in my opinion",
"tbh": "to be honest", "lol": "laughing out loud", "omg": "oh my god",
"wtf": "what the fuck", "smh": "shaking my head", "fyi": "for your information",
"asap": "as soon as possible", "brb": "be right back", "irl": "in real life",
"jk": "just kidding", "nvm": "never mind", "pls": "please", "thx": "thanks",
"gonna": "going to", "wanna": "want to", "gotta": "got to",
"kinda": "kind of", "sorta": "sort of", "dunno": "do not know",
"yeah": "yes", "yep": "yes", "nope": "no", "yup": "yes"
}
self.contractions = {
"can't": "can not", "won't": "will not", "n't": " not",
"'re": " are", "'ve": " have", "'ll": " will", "'d": " would",
"'m": " am", "i'm": "i am", "you're": "you are", "he's": "he is",
"she's": "she is", "it's": "it is", "we're": "we are", "they're": "they are",
"i've": "i have", "you've": "you have", "we've": "we have", "they've": "they have",
"i'll": "i will", "you'll": "you will", "he'll": "he will", "she'll": "she will",
"we'll": "we will", "they'll": "they will", "i'd": "i would", "you'd": "you would",
"he'd": "he would", "she'd": "she would", "we'd": "we would", "they'd": "they would",
"isn't": "is not", "aren't": "are not", "wasn't": "was not", "weren't": "were not",
"hasn't": "has not", "haven't": "have not", "hadn't": "had not",
"doesn't": "does not", "don't": "do not", "didn't": "did not",
"couldn't": "could not", "shouldn't": "should not", "wouldn't": "would not"
}
def clean_text(self, text):
if not isinstance(text, str) or not text.strip():
return ""
text = text.lower()
text = emoji.demojize(text, delimiters=(" ", " "))
text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)
text = re.sub(r'\S+@\S+', '', text)
for contraction, expansion in self.contractions.items():
text = text.replace(contraction, expansion)
words = text.split()
words = [self.slang_dict.get(word, word) for word in words]
text = ' '.join(words)
text = self.handle_negations_v2(text)
text = re.sub(r'(.)\1{2,}', r'\1\1', text)
text = re.sub(r'[^a-zA-Z0-9\s_]', ' ', text)
text = re.sub(r'\s+', ' ', text).strip()
if self.use_spell_check:
words = text.split()
words = [self.spell(word) if len(word) > 2 else word for word in words]
text = ' '.join(words)
return text
def handle_negations_v2(self, text, window=3):
words = text.split()
result = []
i = 0
while i < len(words):
current_word = words[i].lower()
is_negation = any(neg in current_word for neg in self.negations)
if is_negation:
result.append(current_word)
marked = 0
for j in range(1, min(window + 1, len(words) - i)):
next_word = words[i + j].lower()
if any(neg in next_word for neg in self.negations):
break
result.append(f"NEG_{words[i + j]}")
marked = j
i += marked + 1
else:
result.append(words[i])
i += 1
return ' '.join(result)
def compute_vader_features(self, text):
try:
scores = self.vader.polarity_scores(text)
return np.array([
scores['compound'],
scores['pos'],
scores['neu'],
scores['neg']
], dtype=np.float32)
except Exception as e:
print(f"VADER error for text '{text[:50]}...': {e}")
return np.array([0.0, 0.0, 0.0, 0.0], dtype=np.float32)
def extract_text_features(self, text):
features = {}
features['length'] = len(text.split())
features['char_length'] = len(text)
features['caps_ratio'] = sum(1 for c in text if c.isupper()) / max(len(text), 1)
features['punct_ratio'] = sum(1 for c in text if c in string.punctuation) / max(len(text), 1)
features['avg_word_length'] = np.mean([len(word) for word in text.split()]) if text.split() else 0
return features
def build_vocabulary(self, texts):
print("Building vocabulary...")
for text in texts:
cleaned = self.clean_text(text)
words = cleaned.split()
self.word_counts.update(words)
for token in self.special_tokens:
self.word2idx[token] = len(self.word2idx)
self.idx2word[len(self.idx2word)] = token
filtered_words = [word for word, count in self.word_counts.most_common()
if count >= self.min_freq]
for word in filtered_words[:self.vocab_size - len(self.special_tokens)]:
if word not in self.word2idx:
idx = len(self.word2idx)
self.word2idx[word] = idx
self.idx2word[idx] = word
print(f"Vocabulary built: {len(self.word2idx)} words")
print(f"Total word occurrences: {sum(self.word_counts.values())}")
print(f"Unique words before filtering: {len(self.word_counts)}")
def text_to_sequence(self, text):
cleaned = self.clean_text(text)
words = cleaned.split()
unk_idx = self.word2idx[self.unk_token]
sequence = [self.word2idx.get(word, unk_idx) for word in words]
return sequence
def pad_sequence(self, sequence, add_special_tokens=False):
if add_special_tokens:
sos_idx = self.word2idx[self.sos_token]
eos_idx = self.word2idx[self.eos_token]
sequence = [sos_idx] + sequence + [eos_idx]
if len(sequence) > self.max_length:
sequence = sequence[:self.max_length]
pad_idx = self.word2idx[self.pad_token]
padded = sequence + [pad_idx] * (self.max_length - len(sequence))
return padded
def get_vocab_size(self):
return len(self.word2idx)
def get_padding_idx(self):
return self.word2idx[self.pad_token]
def save_vocabulary(self, path):
vocab_data = {
'word2idx': self.word2idx,
'idx2word': self.idx2word,
'word_counts': self.word_counts,
'vocab_size': self.vocab_size,
'max_length': self.max_length,
'min_freq': self.min_freq
}
with open(path, 'wb') as f:
pickle.dump(vocab_data, f)
print(f"Vocabulary saved to {path}")
def load_vocabulary(self, path):
with open(path, 'rb') as f:
vocab_data = pickle.load(f)
self.word2idx = vocab_data['word2idx']
self.idx2word = vocab_data['idx2word']
self.word_counts = vocab_data['word_counts']
self.vocab_size = vocab_data['vocab_size']
self.max_length = vocab_data['max_length']
self.min_freq = vocab_data['min_freq']
print(f"Vocabulary loaded from {path}")
print(f"Vocabulary size: {len(self.word2idx)}")
def prepare_data(data_path, test_size=0.1, val_size=0.1, random_state=42):
"""
Load and split dataset with proper stratification
Args:
data_path: Path to CSV file
test_size: Proportion for test set
val_size: Proportion for validation set (of remaining after test)
random_state: Seed for reproducibility
Returns:
Dictionary with train/val/test splits
"""
print("="*80)
print("LOADING AND PREPARING DATA")
print("="*80)
df = pd.read_csv(data_path)
print(f"\nDataset shape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")
print(f"\nClass distribution:")
print(df['sentiment'].value_counts())
print(f"\nClass distribution (%):")
print(df['sentiment'].value_counts(normalize=True) * 100)
# ✅ CRITICAL FIX: Convert to numpy arrays explicitly for sklearn compatibility
# This resolves the type checker error and ensures stratify works correctly
texts = np.asarray(df['text'].values) # Explicit conversion to ndarray
labels = np.asarray(df['label'].values) # Explicit conversion to ndarray
# Verify stratification is possible
label_counts = np.bincount(labels)
min_class_count = np.min(label_counts)
if min_class_count < 2:
raise ValueError(
f"Stratification impossible: class with label {np.argmin(label_counts)} "
f"has only {min_class_count} samples (need at least 2 per class for train/test split)"
)
# First split: train+val vs test
X_temp, X_test, y_temp, y_test = train_test_split(
texts, labels,
test_size=test_size,
random_state=random_state,
stratify=labels # ✅ Now safe with explicit ndarray conversion
)
# Second split: train vs val (adjust val_size proportionally)
val_size_adjusted = val_size / (1 - test_size)
X_train, X_val, y_train, y_val = train_test_split(
X_temp, y_temp,
test_size=val_size_adjusted,
random_state=random_state,
stratify=y_temp # ✅ Safe with ndarray
)
print(f"\nData split:")
print(f" Train: {len(X_train)} ({len(X_train)/len(texts)*100:.1f}%)")
print(f" Val: {len(X_val)} ({len(X_val)/len(texts)*100:.1f}%)")
print(f" Test: {len(X_test)} ({len(X_test)/len(texts)*100:.1f}%)")
# Verify stratification worked
print("\nStratification check (label distribution %):")
print(f" Train: {np.bincount(y_train) / len(y_train) * 100}")
print(f" Val: {np.bincount(y_val) / len(y_val) * 100}")
print(f" Test: {np.bincount(y_test) / len(y_test) * 100}")
data_dict = {
'train': {'texts': X_train, 'labels': y_train},
'val': {'texts': X_val, 'labels': y_val},
'test': {'texts': X_test, 'labels': y_test}
}
return data_dict
class SentimentDataset(Dataset):
"""
PyTorch dataset for sentiment analysis with VADER features
"""
def __init__(self, texts, labels, preprocessor, add_special_tokens=False):
# ✅ Ensure texts and labels are numpy arrays for consistency
self.texts = np.asarray(texts)
self.labels = np.asarray(labels)
self.preprocessor = preprocessor
self.add_special_tokens = add_special_tokens
def __len__(self):
return len(self.texts)
def __getitem__(self, idx):
text = self.texts[idx]
label = self.labels[idx]
# Handle non-string inputs gracefully
if not isinstance(text, str):
text = str(text)
sequence = self.preprocessor.text_to_sequence(text)
padded = self.preprocessor.pad_sequence(sequence, self.add_special_tokens)
length = min(len(sequence), self.preprocessor.max_length)
vader_features = self.preprocessor.compute_vader_features(text)
return {
'text': torch.LongTensor(padded),
'label': torch.LongTensor([label]),
'length': torch.LongTensor([length]),
'sentiment_score': torch.FloatTensor(vader_features)
}
def create_data_loaders(data_dict, preprocessor, batch_size=64, add_special_tokens=False):
"""
Create PyTorch DataLoaders for train/val/test splits
Args:
data_dict: Dictionary from prepare_data()
preprocessor: EnhancedTextPreprocessor instance
batch_size: Batch size for DataLoaders
add_special_tokens: Whether to add SOS/EOS tokens
Returns:
Dictionary of DataLoaders
"""
print("\nCreating data loaders...")
train_dataset = SentimentDataset(
data_dict['train']['texts'],
data_dict['train']['labels'],
preprocessor,
add_special_tokens
)
val_dataset = SentimentDataset(
data_dict['val']['texts'],
data_dict['val']['labels'],
preprocessor,
add_special_tokens
)
test_dataset = SentimentDataset(
data_dict['test']['texts'],
data_dict['test']['labels'],
preprocessor,
add_special_tokens
)
train_loader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
num_workers=0,
pin_memory=True
)
val_loader = DataLoader(
val_dataset,
batch_size=batch_size,
shuffle=False,
num_workers=0,
pin_memory=True
)
test_loader = DataLoader(
test_dataset,
batch_size=batch_size,
shuffle=False,
num_workers=0,
pin_memory=True
)
print(f"Train batches: {len(train_loader)}")
print(f"Val batches: {len(val_loader)}")
print(f"Test batches: {len(test_loader)}")
return {
'train': train_loader,
'val': val_loader,
'test': test_loader
}