| | import pandas as pd
|
| | import numpy as np
|
| | import os
|
| | import re
|
| | import joblib
|
| | import nltk
|
| |
|
| | from nltk.stem import WordNetLemmatizer
|
| | from nltk.corpus import stopwords
|
| |
|
| | from sklearn.feature_extraction.text import TfidfVectorizer
|
| |
|
| | from sklearn.pipeline import Pipeline
|
| | from sklearn.model_selection import train_test_split, GridSearchCV
|
| | from sklearn.metrics import classification_report
|
| |
|
| | from sklearn.ensemble import RandomForestClassifier
|
| |
|
| | from imblearn.over_sampling import RandomOverSampler
|
| | from imblearn.pipeline import Pipeline as ImbPipeline
|
| |
|
# Fetch the NLTK corpora required by EmailClassifier.preprocess():
# 'stopwords' backs the English stop-word set, 'wordnet' backs the
# WordNetLemmatizer. nltk.download is a no-op if the data is already
# present locally (it only hits the network on first run).
nltk.download('stopwords')
nltk.download('wordnet')
|
| |
|
| |
|
class EmailClassifier:
    """Email text classifier: custom preprocessing + TF-IDF + RandomForest.

    The model is an imbalanced-learn pipeline (TF-IDF vectorizer ->
    RandomOverSampler -> RandomForestClassifier). Oversampling lives
    *inside* the pipeline so that, under cross-validation (grid search),
    resampling is applied per-fold to training folds only — oversampling
    the full dataset before CV leaks duplicated samples into validation
    folds and inflates scores.
    """

    def __init__(self):
        # Fitted pipeline; None until train() or load_model() is called.
        self.model = None
        # Kept for backward compatibility with existing callers; the
        # vectorizer actually lives inside the fitted pipeline.
        self.vectorizer = None
        # Class labels, populated after training or loading.
        self.classes = None
        self.lemmatizer = WordNetLemmatizer()
        self.stop_words = set(stopwords.words('english'))

    def preprocess(self, text: str) -> str:
        """Normalize raw email text for vectorization.

        Lowercases, strips email addresses and most punctuation, removes
        stopwords (deliberately keeping negation words, which carry
        classification signal), and lemmatizes the remaining tokens.

        Args:
            text: Raw email body.

        Returns:
            Space-joined string of cleaned, lemmatized tokens.
        """
        text = text.lower()
        # Drop email addresses entirely.
        text = re.sub(r'\S+@\S+', ' ', text)
        # Keep alphanumerics plus . _ - (common in filenames/URLs);
        # everything else becomes whitespace.
        text = re.sub(r'[^a-zA-Z0-9\s._-]', ' ', text)
        tokens = text.split()
        # Negations are discriminative for email classification, so they
        # are exempted from stopword removal.
        custom_stop_words = self.stop_words - {'no', 'not', 'nor', 'against', 'aren', "aren't", 'isn', "isn't"}
        tokens = [
            self.lemmatizer.lemmatize(word)
            for word in tokens
            if word not in custom_stop_words and len(word) > 1
        ]
        return ' '.join(tokens)

    def train(self, X, y, use_grid_search=False):
        """Fit the classification pipeline on raw texts X and labels y.

        Args:
            X: Iterable of raw email texts.
            y: Corresponding labels.
            use_grid_search: If True, tune RandomForest hyperparameters
                with 5-fold cross-validated grid search (f1_weighted).
        """
        print("Preprocessing data...")
        X_processed = [self.preprocess(text) for text in X]

        print("Initializing pipeline...")
        # NOTE: stop_words='english' is intentionally NOT passed to the
        # vectorizer — preprocess() already removes stopwords while
        # keeping negations, and the vectorizer's English stop list would
        # strip those negations back out.
        # The oversampler sits between vectorizer and classifier so that
        # CV folds are resampled independently (no train/validation leak).
        pipeline = ImbPipeline([
            ('tfidf', TfidfVectorizer(
                max_features=15000,
                ngram_range=(1, 3),
                sublinear_tf=True
            )),
            ('ros', RandomOverSampler(random_state=42)),
            ('clf', RandomForestClassifier(n_estimators=100, class_weight='balanced_subsample', random_state=42))
        ])

        if use_grid_search:
            print("Running Grid Search...")
            # BUG FIX: the previous grid used SGDClassifier parameters
            # (clf__alpha, clf__penalty), which RandomForestClassifier
            # does not accept — GridSearchCV.fit would raise. These are
            # valid RandomForest hyperparameters.
            params = {
                'clf__n_estimators': [100, 200],
                'clf__max_depth': [None, 20, 50],
            }
            grid = GridSearchCV(pipeline, param_grid=params, scoring='f1_weighted', cv=5, verbose=2)
            grid.fit(X_processed, y)
            self.model = grid.best_estimator_
            print("Best Params:", grid.best_params_)
        else:
            print("Fitting model...")
            pipeline.fit(X_processed, y)
            self.model = pipeline

        print("Model trained.")
        self.classes = self.model.named_steps['clf'].classes_

    def predict(self, text: str) -> str:
        """Predict the label for a single raw email text.

        Raises:
            ValueError: If no model has been trained or loaded.
        """
        if self.model is None:
            raise ValueError("Model not trained or loaded")
        processed_text = self.preprocess(text)
        return self.model.predict([processed_text])[0]

    def save_model(self, model_path: str):
        """Persist the fitted pipeline to disk with joblib.

        Raises:
            ValueError: If no model has been trained.
        """
        if self.model is None:
            raise ValueError("Model not trained")
        joblib.dump(self.model, model_path)

    def load_model(self, model_path: str):
        """Load a previously saved pipeline from disk.

        Raises:
            FileNotFoundError: If model_path does not exist.
        """
        if not os.path.exists(model_path):
            raise FileNotFoundError(f"Model file not found at {model_path}")
        self.model = joblib.load(model_path)
        self.classes = self.model.named_steps['clf'].classes_

    @staticmethod
    def load_data_from_csv(csv_path: str, text_col: str = "email", label_col: str = "type"):
        """Read a CSV and return the (text, label) columns with NaN rows dropped."""
        df = pd.read_csv(csv_path)
        return df[[text_col, label_col]].dropna()

    def train_from_csv(self, csv_path: str, text_col: str = "email", label_col: str = "type", use_grid_search=False):
        """Train from a CSV file, print a held-out classification report, and save the model.

        Uses a stratified 80/20 train/test split; the fitted pipeline is
        written to 'email_classifier.joblib'.
        """
        df = self.load_data_from_csv(csv_path, text_col, label_col)

        X_train, X_test, y_train, y_test = train_test_split(
            df[text_col], df[label_col], test_size=0.2, random_state=42, stratify=df[label_col]
        )
        self.train(X_train, y_train, use_grid_search=use_grid_search)
        X_test_processed = [self.preprocess(text) for text in X_test]
        y_pred = self.model.predict(X_test_processed)
        print(classification_report(y_test, y_pred))
        self.save_model("email_classifier.joblib")
        print("Model trained and saved to email_classifier.joblib")
|
| |
|