""" AI Phishing Email Detector - Premium Black & Gold UI TF-IDF + Logistic Regression trained on Kaggle Phishing Emails dataset from HuggingFace Files Author & Deployer: Umaima Qureshi Modified for HuggingFace Files Support """ import streamlit as st import pandas as pd import numpy as np import re from sklearn.model_selection import train_test_split from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.linear_model import LogisticRegression from sklearn.metrics import accuracy_score, confusion_matrix, classification_report import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt import seaborn as sns import io import os # Page Configuration st.set_page_config( page_title="AI Phishing Shield โ€“ by Umaima Qureshi", layout="wide", initial_sidebar_state="collapsed" ) # Initialize Session State if 'model_trained' not in st.session_state: st.session_state.model_trained = False if 'analysis_history' not in st.session_state: st.session_state.analysis_history = [] if 'cm_plot_cached' not in st.session_state: st.session_state.cm_plot_cached = None # Premium Black & Gold CSS Styling st.markdown(""" """, unsafe_allow_html=True) # Utility Functions def load_dataset_from_files(): """Load CSV dataset from HuggingFace Files""" df = None source = "" # List of possible CSV file locations in HuggingFace - ordered by priority possible_paths = [ "Phishing_Email.csv", "email_phishing_data.csv", "phishing_email.csv", "emails.csv", "phishing.csv", "./Phishing_Email.csv", "./email_phishing_data.csv", "./phishing_email.csv", ] # Try to find and load the CSV for path in possible_paths: if os.path.exists(path): try: st.info(f"๐Ÿ“‚ Found: {path} | Loading dataset...") df = pd.read_csv(path, encoding='utf-8', on_bad_lines='skip') source = path st.success(f"โœ… Successfully loaded dataset from: `{path}` ({len(df)} rows)") return df, source except UnicodeDecodeError: try: df = pd.read_csv(path, encoding='latin-1', on_bad_lines='skip') source = path st.success(f"โœ… Successfully loaded dataset from: `{path}` ({len(df)} rows)") return df, source except Exception as e: st.warning(f"โš ๏ธ Failed to load {path}: {str(e)}") continue except Exception as e: st.warning(f"โš ๏ธ Failed to load {path}: {str(e)}") continue return df, source def safe_read_csv(path): """Safely read CSV file""" try: return pd.read_csv(path) except Exception as e: st.error(f"Error reading CSV: {str(e)}") return pd.DataFrame() def sanitize_input(text): """Sanitize user input to prevent injection""" text = re.sub(r'', '', text, flags=re.DOTALL | re.IGNORECASE) text = re.sub(r'<.*?>', '', text) return text def validate_email_input(text): """Validate email input""" if len(text.strip()) < 10: return False, "Email content too short for analysis (minimum 10 characters)" if len(text) > 10000: return False, "Email content too long (maximum 10,000 characters)" return True, "" def preprocess_text(text): """Enhanced preprocessing with better phishing indicator preservation""" if not isinstance(text, str): text = str(text) text = text.lower() text = re.sub(r'http\S+|www\S+|https\S+', ' suspiciousurl ', text) text = re.sub(r'\S+@\S+', ' emailaddress ', text) text = re.sub(r'\$\d+', ' moneymention ', text) text = re.sub(r'\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}', ' cardnumber ', text) text = re.sub(r'[^a-z\s]', ' ', text) text = re.sub(r'\s+', ' ', text).strip() return text def calculate_phishing_score(text): """Enhanced phishing detection with multi-factor scoring""" score = 0 text_lower = text.lower() high_risk = ['verify', 'suspended', 'urgent', 'immediately', 'click here', 'act now', 'confirm identity', 'account locked', 'unusual activity', 'security alert', 'expire', 'limited time', 'action required', 'update payment', 'validate'] score += sum(15 for word in high_risk if word in text_lower) financial = ['bank', 'credit card', 'password', 'ssn', 'social security', 'paypal', 'billing', 'payment', 'account number', 'pin', 'cvv', 'credential'] score += sum(12 for word in financial if word in text_lower) prize_scam = ['won', 'winner', 'prize', 'claim now', 'congratulations', 'free money', 'inheritance', 'lottery', 'jackpot', 'cash prize', '$1000', '$10000'] score += sum(18 for word in prize_scam if word in text_lower) if any(urg in text_lower for urg in ['urgent', 'immediately', 'now', 'expire']) and \ any(fin in text_lower for fin in ['account', 'bank', 'payment', 'card']): score += 25 if re.search(r'http\S+|www\S+', text, re.IGNORECASE): url_count = len(re.findall(r'http\S+|www\S+', text, re.IGNORECASE)) score += min(url_count * 20, 40) if re.search(r'\b(enter|provide|submit|update|confirm).{0,20}(password|credential|info|detail)', text_lower): score += 20 threats = ['locked', 'suspended', 'terminated', 'closed', 'blocked', 'restricted'] score += sum(15 for word in threats if word in text_lower) if re.search(r'\b(dear customer|dear user|dear member|dear valued)\b', text_lower): score += 8 max_score = 200 probability = min(score / max_score, 0.99) return probability def generate_confusion_matrix_plot(_cm): """Generate confusion matrix plot - optimized for performance""" plt.style.use('dark_background') fig, ax = plt.subplots(figsize=(5, 4), facecolor='#1a1a1a', dpi=80) ax.set_facecolor('#1a1a1a') sns.heatmap( _cm, annot=True, fmt="d", ax=ax, cmap="YlOrBr", cbar=True, square=True, annot_kws={"size": 14, "weight": "bold", "color": "#0f0f0f"}, linewidths=1, linecolor='#0f0f0f', cbar_kws={'label': 'Count', 'shrink': 0.8}, vmin=0, vmax=_cm.max() ) ax.set_xlabel("Predicted", fontsize=10, fontweight='bold', color='#FFD700') ax.set_ylabel("Actual", fontsize=10, fontweight='bold', color='#FFD700') ax.set_xticklabels(["Safe", "Phishing"], fontsize=9, color='#e5e7eb') ax.set_yticklabels(["Safe", "Phishing"], fontsize=9, rotation=0, color='#e5e7eb') ax.set_title("Confusion Matrix", fontsize=12, fontweight='bold', pad=10, color='#FFD700') try: cbar = ax.collections[0].colorbar if cbar: cbar.ax.yaxis.set_tick_params(color='#e5e7eb') plt.setp(plt.getp(cbar.ax.axes, 'yticklabels'), color='#e5e7eb') except: pass plt.tight_layout() buf = io.BytesIO() plt.savefig(buf, format='png', facecolor='#1a1a1a', dpi=80, bbox_inches='tight') buf.seek(0) plt.close(fig) plt.close('all') return buf # Hero Header st.markdown("""
๐Ÿ›ก๏ธ AI Phishing Shield
Advanced Machine Learning Protection Against Email Threats
Powered by TF-IDF vectorization and Logistic Regression, trained on Kaggle phishing dataset. 80% Training | 20% Testing for maximum accuracy and robustness.
โšก Developed by Umaima Qureshi
""", unsafe_allow_html=True) # Load Dataset from HuggingFace Files st.markdown('
๐Ÿ“‚ Dataset Configuration
', unsafe_allow_html=True) with st.spinner("๐Ÿ”„ Loading dataset from HuggingFace Files..."): df, source = load_dataset_from_files() if df is None or len(df) == 0: st.error("โŒ No dataset found! Please ensure Phishing_Email.csv is uploaded to HuggingFace Files.") st.info("๐Ÿ“ Expected file: 'Phishing_Email.csv' with columns for email text and labels") st.stop() st.info(f"โœ… **Dataset Successfully Loaded** from: `{source}`") st.write(f"๐Ÿ“Š Dataset shape: {df.shape[0]} rows ร— {df.shape[1]} columns") # Validate and Prepare Dataset required_columns = 2 if len(df.columns) < required_columns or len(df) == 0: st.error("โš ๏ธ Invalid dataset format. Please ensure your CSV has email text and labels.") st.stop() # Handle unnamed index column if "Unnamed: 0" in df.columns: df = df.drop(columns=["Unnamed: 0"]) # Identify text and label columns text_col = "Email Text" if "Email Text" in df.columns else df.columns[0] label_col = "Email Type" if "Email Type" in df.columns else df.columns[-1] st.info(f"๐Ÿ“Œ Using columns: Text='{text_col}' | Label='{label_col}'") # Clean dataset df[text_col] = df[text_col].fillna("").astype(str) df = df[df[text_col].str.strip() != ""].reset_index(drop=True) # Handle labels label_map = {"Phishing Email": 1, "Safe Email": 0, "Phishing": 1, "Safe": 0, 1: 1, 0: 0} if df[label_col].dtype == object: df['label'] = df[label_col].map(label_map) df['label'] = df['label'].fillna(0).astype(int) else: df['label'] = df[label_col].astype(int) # Preprocess text df['processed_text'] = df[text_col].apply(preprocess_text) # Dataset Stats phishing_count = (df['label'] == 1).sum() safe_count = (df['label'] == 0).sum() total_count = len(df) st.markdown('
๐Ÿ“Š Dataset Statistics
', unsafe_allow_html=True) st.markdown(f"""
{total_count}
Total Emails
{phishing_count}
Phishing Detected
{safe_count}
Safe Emails
{(phishing_count/total_count*100):.1f}%
Threat Rate
""", unsafe_allow_html=True) with st.expander("๐Ÿ” View Dataset Preview", expanded=False): st.dataframe(df[[text_col, label_col]].head(10), use_container_width=True) # Model Training - 80/20 Split @st.cache_resource def train_model(processed_texts, labels): """Train model with 80% training and 20% testing split""" # 80% train, 20% test split X_train, X_test, y_train, y_test = train_test_split( processed_texts, labels, test_size=0.2, # 20% for testing random_state=42, stratify=labels if len(np.unique(labels)) > 1 else None ) st.write(f"๐Ÿ“ˆ Training set: {len(X_train)} samples (80%)") st.write(f"๐Ÿงช Testing set: {len(X_test)} samples (20%)") # Enhanced TF-IDF vectorizer = TfidfVectorizer( max_features=5000, ngram_range=(1, 3), min_df=1, max_df=0.95, sublinear_tf=True ) X_train_vec = vectorizer.fit_transform(X_train) X_test_vec = vectorizer.transform(X_test) # Logistic Regression with balanced weights model = LogisticRegression( max_iter=2000, solver='liblinear', class_weight='balanced', C=1.0, random_state=42 ) model.fit(X_train_vec, y_train) # Predictions and metrics y_pred = model.predict(X_test_vec) acc = accuracy_score(y_test, y_pred) cm = confusion_matrix(y_test, y_pred) report = classification_report(y_test, y_pred, output_dict=True, zero_division=0) return { "vectorizer": vectorizer, "model": model, "accuracy": acc, "confusion_matrix": cm, "report": report, "X_test": X_test, "y_test": y_test, "y_pred": y_pred } # Train or retrieve cached model if not st.session_state.model_trained: with st.spinner("๐Ÿค– Training model with 80/20 split..."): model_info = train_model(df['processed_text'].tolist(), df['label'].values) st.session_state.model_info = model_info st.session_state.model_trained = True st.success("โœ… Model trained successfully!") else: model_info = st.session_state.model_info vectorizer = model_info["vectorizer"] model = model_info["model"] accuracy = model_info["accuracy"] # Model Performance st.markdown('
๐ŸŽฏ Model Performance (20% Test Set)
', unsafe_allow_html=True) col1, col2, col3 = st.columns(3) with col1: st.markdown(f"""
Accuracy
{accuracy:.1%}
""", unsafe_allow_html=True) with col2: precision = model_info["report"].get("1", {}).get("precision", 0) st.markdown(f"""
Precision
{precision:.1%}
""", unsafe_allow_html=True) with col3: recall = model_info["report"].get("1", {}).get("recall", 0) st.markdown(f"""
Recall
{recall:.1%}
""", unsafe_allow_html=True) # Confusion Matrix Section with st.expander("๐Ÿ“ˆ Detailed Metrics & Confusion Matrix"): col_matrix, col_report = st.columns([1, 1.5]) with col_matrix: if st.session_state.cm_plot_cached is None: st.session_state.cm_plot_cached = generate_confusion_matrix_plot(model_info["confusion_matrix"]) st.image(st.session_state.cm_plot_cached, use_container_width=True) with col_report: st.markdown("**๐Ÿ“Š Classification Report:**") report_df = pd.DataFrame(model_info["report"]).transpose().round(3) st.dataframe(report_df, use_container_width=True, height=250) # Inference UI st.markdown('
โœ‰๏ธ Email Threat Scanner
', unsafe_allow_html=True) col_input, col_hints = st.columns([2, 1]) with col_input: email_input = st.text_area( "Paste email content for analysis", height=280, placeholder="Example: Urgent! Your account has been compromised. Click here to verify your identity immediately...", help="Paste the full email content including subject and body" ) if st.button("๐Ÿ” Analyze Email Threat"): if not email_input.strip(): st.warning("โš ๏ธ Please paste email content to analyze") else: email_input = sanitize_input(email_input) is_valid, error_msg = validate_email_input(email_input) if not is_valid: st.warning(f"โš ๏ธ {error_msg}") else: with st.spinner("๐Ÿ” Analyzing email threat..."): try: processed_input = preprocess_text(email_input) input_vec = vectorizer.transform([processed_input]) try: ml_proba = model.predict_proba(input_vec)[0][1] except AttributeError: decision = model.decision_function(input_vec)[0] ml_proba = 1 / (1 + np.exp(-decision)) ml_pred = model.predict(input_vec)[0] rule_score = calculate_phishing_score(email_input) hybrid_proba = (0.6 * ml_proba) + (0.4 * rule_score) final_pred = 1 if hybrid_proba > 0.5 else 0 # Dynamic color coding if hybrid_proba >= 0.8: alert_color = "#dc2626" alert_gradient = "linear-gradient(135deg, #dc2626 0%, #991b1b 100%)" shadow_color = "220, 38, 38" emoji = "๐Ÿšจ" risk_level = "CRITICAL THREAT" elif hybrid_proba >= 0.6: alert_color = "#ef4444" alert_gradient = "linear-gradient(135deg, #ef4444 0%, #dc2626 100%)" shadow_color = "239, 68, 68" emoji = "โš ๏ธ" risk_level = "HIGH RISK" elif hybrid_proba >= 0.4: alert_color = "#f97316" alert_gradient = "linear-gradient(135deg, #f97316 0%, #ea580c 100%)" shadow_color = "249, 115, 22" emoji = "โšก" risk_level = "MEDIUM RISK" elif hybrid_proba >= 0.2: alert_color = "#eab308" alert_gradient = "linear-gradient(135deg, #eab308 0%, #ca8a04 100%)" shadow_color = "234, 179, 8" emoji = "โš ๏ธ" risk_level = "LOW RISK" else: alert_color = "#10b981" alert_gradient = "linear-gradient(135deg, #10b981 0%, #059669 100%)" shadow_color = "16, 185, 129" emoji = "โœ…" risk_level = "SAFE" if final_pred == 1: conf_pct = f"{hybrid_proba:.1%}" st.markdown(f"""
{emoji}
{risk_level} DETECTED
Threat Confidence: {conf_pct}
ML Score: {ml_proba:.1%} | Rule Score: {rule_score:.1%}
""", unsafe_allow_html=True) st.markdown("**๐Ÿ” Threat Indicators Detected:**") indicators = [] if "suspiciousurl" in processed_input or re.search(r'http\S+|www\S+', email_input, re.IGNORECASE): indicators.append("๐Ÿ”— Suspicious URL tokens detected") if re.search(r'\b(urgent|immediately|verify|password|suspended|click|act now|action required)\b', email_input, re.IGNORECASE): indicators.append("โšก Urgency manipulation tactics") if re.search(r'\b(bank|account|verify|login|password|security|credential|paypal)\b', email_input, re.IGNORECASE): indicators.append("๐Ÿฆ Financial/security keywords present") if re.search(r'\b(winner|prize|congratulations|claim|free|won)\b', email_input, re.IGNORECASE): indicators.append("๐ŸŽ Reward/prize baiting language") if re.search(r'\b(confirm|update|validate|unlock|restore)\b', email_input, re.IGNORECASE): indicators.append("๐Ÿ” Account action requests") if "cardnumber" in processed_input: indicators.append("๐Ÿ’ณ Credit card pattern detected") if "moneymention" in processed_input: indicators.append("๐Ÿ’ฐ Money amount mentioned") for indicator in indicators: st.markdown(f"- {indicator}") st.error("๐Ÿšจ **Recommendation:** Do NOT click any links. Delete this email immediately and report to your IT security team.") else: conf_pct = f"{(1-hybrid_proba):.1%}" st.markdown(f"""
{emoji}
EMAIL APPEARS SAFE
Safety Confidence: {conf_pct}
ML Score: {(1-ml_proba):.1%} | Rule Score: {(1-rule_score):.1%}
""", unsafe_allow_html=True) st.info("๐Ÿ’ก **Best Practice:** Always verify sender identity and be cautious with unexpected emails.") st.session_state.analysis_history.append({ 'timestamp': pd.Timestamp.now(), 'result': 'Phishing' if final_pred == 1 else 'Safe', 'confidence': f"{hybrid_proba:.2%}", 'preview': email_input[:50] + "..." }) except Exception as e: st.error(f"โš ๏ธ Analysis failed: {str(e)}") with col_hints: st.markdown("""
๐Ÿง  AI Detection Insights
1
Urgency words like "urgent", "verify" raise red flags
2
Suspicious links are automatically flagged
3
Financial + urgency combo indicates high risk
4
Confidence >70% warrants caution
โšก
80/20 Split: Trained on 80%, tested on 20% for accuracy
""", unsafe_allow_html=True) # Footer st.markdown(""" """, unsafe_allow_html=True)