# --- Exploratory data analysis: spam vs. ham emails ---
import pandas as pd
import numpy as np
import torch
from transformers import BertTokenizer
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import CountVectorizer

# Load the labelled email dataset. The CSV is expected to provide `label`
# ('ham'/'spam'), the raw `text`, and a numeric `label_num` column.
file_path = 'spam_ham_dataset.csv'
df = pd.read_csv(file_path)
df.head()

# Normalise the text: lowercase everything and strip punctuation.
df['text'] = df['text'].str.lower().str.replace(r'[^\w\s]', '', regex=True)
df['text'].head()

# Class balance: spam vs. ham counts.
sns.countplot(x=df['label'])
plt.title("Spam vs Ham Distribution")
plt.show()

# Simple length features per email.
df['char_count'] = df['text'].apply(len)
df['word_count'] = df['text'].apply(lambda x: len(x.split()))

plt.figure(figsize=(12, 5))
sns.histplot(data=df, x='word_count', hue='label', bins=30, kde=True)
plt.xlim(0, 1000)
plt.title("Word Count Distribution by Label")
plt.xlabel("Number of Words")
plt.ylabel("Frequency")
plt.show()

def get_top_words(corpus, n=None):
    """Return the n most frequent words in `corpus`, excluding English stop words."""
    vec = CountVectorizer(stop_words='english').fit(corpus)
    bag_of_words = vec.transform(corpus)
    sum_words = bag_of_words.sum(axis=0)
    words_freq = [(word, sum_words[0, idx]) for word, idx in vec.vocabulary_.items()]
    words_freq = sorted(words_freq, key=lambda x: x[1], reverse=True)
    return words_freq[:n]

# Most frequent content words per class.
top_spam_words = get_top_words(df[df['label'] == "spam"]['text'], n=10)
print("Top spam words:", top_spam_words)

top_ham_words = get_top_words(df[df['label'] == "ham"]['text'], n=10)
print("Top ham words:", top_ham_words)

# --- Baseline: TF-IDF features + Multinomial Naive Bayes ---
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import classification_report

vectorizer = TfidfVectorizer()
X = vectorizer.fit_transform(df['text'])
y = df['label_num']  # 0 = ham, 1 = spam

from sklearn.model_selection import train_test_split

# Hold out 20% of the data for testing.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

nb_model = MultinomialNB()
nb_model.fit(X_train, y_train)

y_pred = nb_model.predict(X_test)
print(classification_report(y_test, y_pred))

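# Quick sanity check of the baseline on unseen text; the example email below
# is invented purely for illustration.
sample_vec = vectorizer.transform(["congratulations you have won a free prize click here now"])
print("NB prediction (1 = spam):", nb_model.predict(sample_vec)[0])
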
# --- Fine-tuning BERT for spam classification ---
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from transformers import BertTokenizer, BertForSequenceClassification
from torch.utils.data import Dataset, DataLoader

file_path = 'spam_ham_dataset.csv'
df = pd.read_csv(file_path)

# Encode labels as integers; category codes are assigned alphabetically,
# so 'ham' -> 0 and 'spam' -> 1.
df['label_num'] = df['label'].astype('category').cat.codes

tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Tokenize every email up front, padded/truncated to 128 tokens.
encodings = tokenizer(df['text'].tolist(), padding=True, truncation=True, max_length=128, return_tensors="pt")
labels = torch.tensor(df['label_num'].values)

class SpamDataset(Dataset):
    """Wraps the pre-computed BERT encodings and labels as a PyTorch Dataset."""

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}
        # `labels` is already a tensor, so index it directly; re-wrapping it
        # with torch.tensor() triggers a copy-construct warning.
        item['labels'] = self.labels[idx].long()
        return item

dataset = SpamDataset(encodings, labels)

# 80/20 train/validation split.
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])

def collate_fn(batch):
    # Every encoding was padded to the same length above, so the per-example
    # tensors can simply be stacked into batch tensors.
    keys = batch[0].keys()
    collated = {key: torch.stack([b[key] for b in batch]) for key in keys}
    return collated

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False, collate_fn=collate_fn)

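# Peek at one collated batch to confirm the shapes fed to BERT; the keys are
# input_ids, token_type_ids and attention_mask from the tokenizer, plus labels.
sample_batch = next(iter(train_loader))
print({key: val.shape for key, val in sample_batch.items()})
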
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)
model.to(device)

# Standard fine-tuning setup: AdamW with a small learning rate.
optimizer = optim.AdamW(model.parameters(), lr=5e-5)
loss_fn = nn.CrossEntropyLoss()

EPOCHS = 10

for epoch in range(EPOCHS):
    model.train()
    total_loss = 0

    for batch in train_loader:
        optimizer.zero_grad()

        inputs = {key: val.to(device) for key, val in batch.items()}
        labels = inputs.pop("labels")  # already moved to device above

        outputs = model(**inputs)
        loss = loss_fn(outputs.logits, labels)

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")

print("Training complete!")

# --- Evaluation and single-email inference ---
# (base64/io/plotting imports are needed by generate_performance_metrics below.)
import base64
import io

import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn.functional as F
from sklearn.metrics import classification_report, confusion_matrix

def classify_email(email_text):
    """Classify a single email and report the model's confidence."""
    model.eval()

    with torch.no_grad():
        # Note: inference truncates at 256 tokens even though training used 128.
        inputs = tokenizer(email_text, padding=True, truncation=True, max_length=256, return_tensors="pt")
        inputs = {key: val.to(device) for key, val in inputs.items()}

        outputs = model(**inputs)
        logits = outputs.logits

        predictions = torch.argmax(logits, dim=1)

        probs = F.softmax(logits, dim=1)
        confidence = torch.max(probs).item() * 100

    result = "Spam" if predictions.item() == 1 else "Ham"

    return {
        "result": result,
        "confidence": f"{confidence:.2f}%",
    }

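# Smoke test with a made-up message (illustrative only).
print(classify_email("Congratulations! You have been selected for a free cruise."))
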
def evaluate_model_with_report(val_loader):
    """Run the model over the validation set; print accuracy and a per-class report."""
    model.eval()
    y_true = []
    y_pred = []
    correct = 0
    total = 0

    with torch.no_grad():
        for batch in val_loader:
            inputs = {key: val.to(device) for key, val in batch.items()}
            labels = inputs.pop("labels")  # already on device

            outputs = model(**inputs)
            predictions = torch.argmax(outputs.logits, dim=1)

            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predictions.cpu().numpy())

            correct += (predictions == labels).sum().item()
            total += labels.size(0)

    accuracy = correct / total if total > 0 else 0
    print(f"Validation Accuracy: {accuracy:.4f}")

    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, target_names=["Ham", "Spam"]))

    return accuracy

accuracy = evaluate_model_with_report(val_loader)
print(f"Model Validation Accuracy: {accuracy:.4f}")

def generate_performance_metrics():
    """Compute validation metrics for the UI.

    Reimplemented: the original called model.predict(X_test), which mixes the
    TF-IDF baseline with the BERT model (which has no .predict) and referenced
    undefined y_true/y_pred, so it could not run.
    """
    model.eval()
    y_true, y_pred = [], []
    with torch.no_grad():
        for batch in val_loader:
            inputs = {key: val.to(device) for key, val in batch.items()}
            labels = inputs.pop("labels")
            predictions = torch.argmax(model(**inputs).logits, dim=1)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predictions.cpu().numpy())

    # output_dict=True returns a nested dict keyed by the target names.
    report = classification_report(y_true, y_pred, target_names=["Ham", "Spam"], output_dict=True)

    # Render the confusion matrix to a base64-encoded PNG for the <img> tag in the UI.
    fig, ax = plt.subplots(figsize=(4, 4))
    sns.heatmap(confusion_matrix(y_true, y_pred), annot=True, fmt='d', cmap='Blues',
                xticklabels=["Ham", "Spam"], yticklabels=["Ham", "Spam"], ax=ax)
    ax.set_xlabel("Predicted")
    ax.set_ylabel("Actual")
    buf = io.BytesIO()
    fig.savefig(buf, format='png', bbox_inches='tight')
    plt.close(fig)

    return {
        "accuracy": f"{report['accuracy']:.2%}",
        "precision": f"{report['Spam']['precision']:.2%}",
        "recall": f"{report['Spam']['recall']:.2%}",
        "f1_score": f"{report['Spam']['f1-score']:.2%}",
        "confusion_matrix_plot": base64.b64encode(buf.getvalue()).decode("utf-8"),
    }

def email_analysis_pipeline(email_text):
    """Gradio handler: classify one email and refresh the validation accuracy."""
    results = classify_email(email_text)
    # Note: this re-runs the full validation pass on every click; kept to
    # preserve the original behaviour, but it is slow.
    accuracy = evaluate_model_with_report(val_loader)
    # Return a tuple (the original returned a set, which is unordered) so each
    # value maps to the matching Gradio output component.
    return results["result"], results["confidence"], f"{accuracy:.4f}"

# --- Gradio demo UI ---
import gradio as gr

# Minimal stylesheet for the metric boxes below. The original referenced
# `custom_css` without defining it; this placeholder keeps the app runnable.
custom_css = """
.metric textarea { font-weight: bold; text-align: center; }
"""

def create_interface():
    performance_metrics = generate_performance_metrics()

    with gr.Blocks(css=custom_css) as interface:
        gr.Markdown("# Spam Email Classification")
        gr.Markdown(
            """
            Brief description of the project here
            """
        )

        with gr.Row():
            email_input = gr.Textbox(
                lines=8, placeholder="Type or paste your email content here...", label="Email Content"
            )

        with gr.Row():
            result_output = gr.HTML(label="Classification Result")
            confidence_output = gr.Textbox(label="Confidence Score", interactive=False)
            accuracy_output = gr.Textbox(label="Accuracy", interactive=False)

        analyze_button = gr.Button("Analyze Email 🕵️♂️")

        analyze_button.click(
            fn=email_analysis_pipeline,
            inputs=email_input,
            outputs=[result_output, confidence_output, accuracy_output]
        )

        gr.Markdown("## 📊 Model Performance Analytics")
        with gr.Row():
            with gr.Column():
                gr.Textbox(value=performance_metrics["accuracy"], label="Accuracy", interactive=False, elem_classes=["metric"])
                gr.Textbox(value=performance_metrics["precision"], label="Precision", interactive=False, elem_classes=["metric"])
                gr.Textbox(value=performance_metrics["recall"], label="Recall", interactive=False, elem_classes=["metric"])
                gr.Textbox(value=performance_metrics["f1_score"], label="F1 Score", interactive=False, elem_classes=["metric"])
            with gr.Column():
                gr.Markdown("### Confusion Matrix")
                gr.HTML(f"<img src='data:image/png;base64,{performance_metrics['confusion_matrix_plot']}' style='max-width: 100%; height: auto;' />")

        gr.Markdown("## 📘 Glossary and Explanation of Labels")
        gr.Markdown(
            """
            ### Labels:
            - **Spam:** Unwanted or harmful emails flagged by the system.
            - **Ham:** Legitimate, safe emails.

            ### Metrics:
            - **Accuracy:** The percentage of correct classifications.
            - **Precision:** Out of predicted Spam, how many are actually Spam.
            - **Recall:** Out of all actual Spam emails, how many are predicted as Spam.
            - **F1 Score:** Harmonic mean of Precision and Recall.
            """
        )

    return interface

interface = create_interface()
interface.launch(share=True)