import pandas as pd
import numpy as np
import torch
from transformers import BertTokenizer
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import CountVectorizer
# Load dataset
file_path = 'spam_ham_dataset.csv'
df = pd.read_csv(file_path)
df.head()
# Preprocessing
# df['text'].str.lower() converts everything in the text column to lower case;
# .str.replace(r'[^\w\s]', '', regex=True) removes everything except word
# characters (letters, numbers, underscore) and whitespace
df['text'] = df['text'].str.lower().str.replace(r'[^\w\s]', '', regex=True)
df['text'].head()
sns.countplot(x=df['label'])
plt.title("Spam vs Ham Distribution")
plt.show()
# Calculate text length metrics
df['char_count'] = df['text'].apply(len)
df['word_count'] = df['text'].apply(lambda x: len(x.split()))
# Plot word count distribution for spam and ham
plt.figure(figsize=(12, 5))
sns.histplot(data=df, x='word_count', hue='label', bins=30, kde=True)
plt.xlim(0, 1000)
plt.title("Word Count Distribution by Label")
plt.xlabel("Number of Words")
plt.ylabel("Frequency")
plt.show()
def get_top_words(corpus, n=None):
    vec = CountVectorizer(stop_words='english').fit(corpus)
    bag_of_words = vec.transform(corpus)
    sum_words = bag_of_words.sum(axis=0)
    words_freq = [(word, sum_words[0, idx]) for word, idx in vec.vocabulary_.items()]
    words_freq = sorted(words_freq, key=lambda x: x[1], reverse=True)
    return words_freq[:n]
# Top 10 words for spam
top_spam_words = get_top_words(df[df['label'] == "spam"]['text'], n=10)
print("Top spam words:", top_spam_words)
# Top 10 words for ham
top_ham_words = get_top_words(df[df['label'] == "ham"]['text'], n=10)
print("Top ham words:", top_ham_words)
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import classification_report
# TF-IDF Vectorization
vectorizer = TfidfVectorizer()
X = vectorizer.fit_transform(df['text'])
y = df['label_num']
# Train-Test Split
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Train Naïve Bayes Model
nb_model = MultinomialNB()
nb_model.fit(X_train, y_train)
# Predictions
y_pred = nb_model.predict(X_test)
print(classification_report(y_test, y_pred))
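# Quick sanity check of the TF-IDF + Naive Bayes baseline on an unseen message
# (the sample text is made up for illustration; 1 = spam, 0 = ham):
sample = ["congratulations you won a free prize claim your reward now"]
print("Baseline prediction:", nb_model.predict(vectorizer.transform(sample))[0])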
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from transformers import BertTokenizer, BertForSequenceClassification
from torch.utils.data import Dataset, DataLoader
# Load dataset
file_path = 'spam_ham_dataset.csv'
df = pd.read_csv(file_path)
# Convert label column to numeric (0 for ham, 1 for spam)
df['label_num'] = df['label'].astype('category').cat.codes
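# (pandas orders categories alphabetically, so 'ham' -> 0 and 'spam' -> 1,
# which matches the label_num column already used for the baseline above)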
# Load tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
# Tokenize dataset
encodings = tokenizer(df['text'].tolist(), padding=True, truncation=True, max_length=128, return_tensors="pt")
labels = torch.tensor(df['label_num'].values)
# Custom Dataset
class SpamDataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}  # keep as PyTorch tensors
        # self.labels is already a tensor; indexing it avoids the
        # torch.tensor(tensor) copy warning while keeping labels `long`
        item['labels'] = self.labels[idx].long()
        return item
# Create dataset
dataset = SpamDataset(encodings, labels)
# Split dataset (80% train, 20% validation)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])
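# Note: this split is unseeded, so train/validation membership changes between
# runs; passing generator=torch.Generator().manual_seed(42) to random_split
# would make it reproducible.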
# Collate function: stack the per-sample tensors from SpamDataset into batches
def collate_fn(batch):
    keys = batch[0].keys()
    collated = {key: torch.stack([b[key] for b in batch]) for key in keys}
    return collated
# Create DataLoader
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False, collate_fn=collate_fn)
# Load BERT model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)
model.to(device)
# Define optimizer and loss function
optimizer = optim.AdamW(model.parameters(), lr=5e-5)
loss_fn = nn.CrossEntropyLoss()
# Training Loop
EPOCHS = 10
for epoch in range(EPOCHS):
    model.train()
    total_loss = 0
    for batch in train_loader:
        optimizer.zero_grad()
        # Move batch to device and separate out the labels
        inputs = {key: val.to(device) for key, val in batch.items()}
        labels = inputs.pop("labels")
        # Forward pass
        outputs = model(**inputs)
        loss = loss_fn(outputs.logits, labels)
        # Backward pass
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")
print("Training complete!")
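# Optionally persist the fine-tuned weights so the app can reload them without
# retraining; the directory name "spam_bert_model" is an assumption:
model.save_pretrained("spam_bert_model")
tokenizer.save_pretrained("spam_bert_model")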
from sklearn.metrics import classification_report
from transformers import BertTokenizer
import torch
import torch.nn.functional as F
# Classification function
def classify_email(email_text):
    model.eval()  # set model to evaluation mode
    with torch.no_grad():
        # Tokenize and convert the input text to tensors
        inputs = tokenizer(email_text, padding=True, truncation=True, max_length=256, return_tensors="pt")
        # Move inputs to the appropriate device
        inputs = {key: val.to(device) for key, val in inputs.items()}
        # Get model predictions
        outputs = model(**inputs)
        logits = outputs.logits
        # Convert logits to the predicted class
        predictions = torch.argmax(logits, dim=1)
        # Convert logits to probabilities using softmax
        probs = F.softmax(logits, dim=1)
        confidence = torch.max(probs).item() * 100  # as a percentage
    # Convert the numeric prediction to a label
    result = "Spam" if predictions.item() == 1 else "Ham"
    return {
        "result": result,
        "confidence": f"{confidence:.2f}%",
    }
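# Example usage on a made-up message:
print(classify_email("urgent: verify your account to claim your reward"))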
# Evaluation function with detailed classification report
def evaluate_model_with_report(val_loader):
    model.eval()  # set model to evaluation mode
    y_true = []
    y_pred = []
    correct = 0
    total = 0
    with torch.no_grad():
        for batch in val_loader:
            inputs = {key: val.to(device) for key, val in batch.items()}
            labels = inputs.pop("labels")
            outputs = model(**inputs)
            predictions = torch.argmax(outputs.logits, dim=1)
            # Collect labels and predictions
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predictions.cpu().numpy())
            # Track running accuracy
            correct += (predictions == labels).sum().item()
            total += labels.size(0)
    accuracy = correct / total if total > 0 else 0
    print(f"Validation Accuracy: {accuracy:.4f}")
    # Print classification report
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, target_names=["Ham", "Spam"]))
    return accuracy
# Run evaluation with classification report
accuracy = evaluate_model_with_report(val_loader)
print(f"Model Validation Accuracy: {accuracy:.4f}")
## App Deployment Functions
def generate_performance_metrics():
    model.eval()
    y_true, y_pred = [], []
    with torch.no_grad():
        for batch in val_loader:
            inputs = {key: val.to(device) for key, val in batch.items()}
            labels = inputs.pop("labels")
            outputs = model(**inputs)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(torch.argmax(outputs.logits, dim=1).cpu().numpy())
    # output_dict=True makes classification_report return a nested dict, so the
    # per-class metrics can be indexed by label name below
    report = classification_report(y_true, y_pred, target_names=["Ham", "Spam"], output_dict=True)
    return {
        "accuracy": f"{report['accuracy']:.2%}",
        "precision": f"{report['Spam']['precision']:.2%}",
        "recall": f"{report['Spam']['recall']:.2%}",
        "f1_score": f"{report['Spam']['f1-score']:.2%}",
        "confusion_matrix_plot": plot_confusion_matrix_base64(y_true, y_pred),
    }
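# The interface below embeds performance_metrics['confusion_matrix_plot'] as a
# base64-encoded PNG, but no helper producing it existed in this script.
# A minimal sketch; the function name and figure styling are assumptions.
import base64
import io
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

def plot_confusion_matrix_base64(y_true, y_pred):
    # Render the confusion matrix to an in-memory PNG and return it
    # base64-encoded for embedding in an <img> tag
    cm = confusion_matrix(y_true, y_pred)
    fig, ax = plt.subplots(figsize=(4, 4))
    ConfusionMatrixDisplay(cm, display_labels=["Ham", "Spam"]).plot(ax=ax, colorbar=False)
    buf = io.BytesIO()
    fig.savefig(buf, format="png", bbox_inches="tight")
    plt.close(fig)
    return base64.b64encode(buf.getvalue()).decode("utf-8")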
def email_analysis_pipeline(email_text):
    results = classify_email(email_text)
    accuracy = evaluate_model_with_report(val_loader)
    # Return a tuple (not a set, which is unordered) so the values map
    # onto the three Gradio output components in order
    return results["result"], results["confidence"], f"{accuracy:.4f}"
## Gradio Interface
import gradio as gr
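# `custom_css` is passed to gr.Blocks below but was never defined in this
# script; a minimal placeholder targeting the "metric" class used by the
# metric textboxes (the styling itself is an assumption):
custom_css = """
.metric textarea { font-weight: bold; text-align: center; }
"""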
# Create Gradio Interface
def create_interface():
    performance_metrics = generate_performance_metrics()
    # Introduction - title + brief description
    with gr.Blocks(css=custom_css) as interface:
        gr.Markdown("# Spam Email Classification")
        gr.Markdown(
            """
            Brief description of the project here
            """
        )
        # Email text input
        with gr.Row():
            email_input = gr.Textbox(
                lines=8, placeholder="Type or paste your email content here...", label="Email Content"
            )
        # Email text results and analysis
        with gr.Row():
            result_output = gr.HTML(label="Classification Result")  # populated by email_analysis_pipeline
            confidence_output = gr.Textbox(label="Confidence Score", interactive=False)
            accuracy_output = gr.Textbox(label="Accuracy", interactive=False)
        analyze_button = gr.Button("Analyze Email 🕵️‍♂️")
        analyze_button.click(
            fn=email_analysis_pipeline,
            inputs=email_input,
            outputs=[result_output, confidence_output, accuracy_output]
        )
        # Analysis
        gr.Markdown("## 📊 Model Performance Analytics")
        with gr.Row():
            with gr.Column():
                gr.Textbox(value=performance_metrics["accuracy"], label="Accuracy", interactive=False, elem_classes=["metric"])
                gr.Textbox(value=performance_metrics["precision"], label="Precision", interactive=False, elem_classes=["metric"])
                gr.Textbox(value=performance_metrics["recall"], label="Recall", interactive=False, elem_classes=["metric"])
                gr.Textbox(value=performance_metrics["f1_score"], label="F1 Score", interactive=False, elem_classes=["metric"])
            with gr.Column():
                gr.Markdown("### Confusion Matrix")
                gr.HTML(f"<img src='data:image/png;base64,{performance_metrics['confusion_matrix_plot']}' style='max-width: 100%; height: auto;' />")
        gr.Markdown("## 📘 Glossary and Explanation of Labels")
        gr.Markdown(
            """
            ### Labels:
            - **Spam:** Unwanted or harmful emails flagged by the system.
            - **Ham:** Legitimate, safe emails.
            ### Metrics:
            - **Accuracy:** The percentage of correct classifications.
            - **Precision:** Out of predicted Spam, how many are actually Spam.
            - **Recall:** Out of all actual Spam emails, how many are predicted as Spam.
            - **F1 Score:** Harmonic mean of Precision and Recall.
            """
        )
    return interface
# Launch the interface
interface = create_interface()
interface.launch(share=True)