# email-phish-api/model/email_feature_extractor.py
"""
Email Feature Extraction System for Phishing Detection
Extracts 21 specific features from email content using professional NLP libraries.
Enhanced with:
- NLTK for tokenization and stopwords
- spaCy for advanced linguistic analysis
- TextBlob for sentiment analysis
"""
import re
import logging
from typing import Dict, Any, List, Set
import numpy as np
import unicodedata
# NLP Libraries
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
import spacy
from textblob import TextBlob
from langdetect import detect_langs, LangDetectException
from langdetect import DetectorFactory
# Ensure consistent language detection results
DetectorFactory.seed = 0
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ============================================================================
# NLP Resources Initialization
# ============================================================================
def verify_nltk_resources():
"""
Verify that required NLTK resources are available.
Raises an error if any required resource is missing.
"""
# Verify punkt tokenizer
nltk.data.find('tokenizers/punkt')
nltk.data.find('tokenizers/punkt_tab')
# Verify stopwords corpus
nltk.data.find('corpora/stopwords')
# Verify POS tagger
nltk.data.find('taggers/averaged_perceptron_tagger')
logger.info("✓ NLTK resources verified")
def load_spacy_model():
"""
Load spaCy language model.
Raises an error if the model is not installed.
Returns:
spacy.Language: Loaded spaCy model
"""
nlp = spacy.load("en_core_web_sm")
logger.info("✓ spaCy model 'en_core_web_sm' loaded successfully")
return nlp
# Initialize NLP resources on module load - will fail fast if not available
verify_nltk_resources()
_spacy_nlp = load_spacy_model()
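# If any resource is missing, it can typically be installed once with standard
# commands (a reminder, not executed here; exact data packages depend on the
# installed NLTK/spaCy versions):
#   python -c "import nltk; nltk.download('punkt'); nltk.download('punkt_tab'); nltk.download('stopwords'); nltk.download('averaged_perceptron_tagger')"
#   python -m spacy download en_core_web_sm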
# ============================================================================
# Text Preprocessing and Normalization
# ============================================================================
def preprocess_email_text(text: str) -> str:
"""
Preprocess and normalize raw email text to handle multi-line input,
special characters, and formatting issues.
This function:
1. Handles None/empty input gracefully
2. Normalizes Unicode characters (e.g., smart quotes, special dashes)
3. Preserves URLs and email addresses (important phishing indicators)
4. Normalizes line breaks and whitespace
5. Removes excessive whitespace while preserving single spaces
6. Preserves semantic content and phishing indicators
Args:
text: Raw email text (may contain line breaks, tabs, special formatting)
Returns:
str: Cleaned and normalized text ready for feature extraction
Examples:
>>> preprocess_email_text("Hello\\n\\nWorld \\t Test")
'Hello World Test'
>>> preprocess_email_text("Your account\\r\\nhas been\\tsuspended")
'Your account has been suspended'
"""
# Handle None or empty input
if not text:
logger.debug("Empty text provided to preprocessor")
return ""
# Ensure text is a string
if not isinstance(text, str):
logger.warning(f"Non-string input to preprocessor: {type(text)}")
text = str(text)
# Step 1: Normalize Unicode characters
# This handles smart quotes, special dashes, accented characters, etc.
# NFKC normalization: compatibility decomposition followed by canonical composition
text = unicodedata.normalize('NFKC', text)
# Step 2: Normalize line breaks
# Convert all line break variations to single space
# This handles: \r\n (Windows), \n (Unix), \r (old Mac)
text = re.sub(r'\r\n|\r|\n', ' ', text)
# Step 3: Normalize tabs to spaces
text = text.replace('\t', ' ')
# Step 4: Remove zero-width characters and other invisible Unicode
# These can be used in obfuscation attempts
text = re.sub(r'[\u200b-\u200f\u202a-\u202e\ufeff]', '', text)
# Step 5: Normalize multiple spaces to single space
# This handles excessive whitespace while preserving word boundaries
text = re.sub(r'\s+', ' ', text)
# Step 6: Remove leading/trailing whitespace
text = text.strip()
# Step 7: Normalize common HTML entities if present
# Some emails may contain HTML entities
    html_entities = {
        '&nbsp;': ' ',
        '&lt;': '<',
        '&gt;': '>',
        '&quot;': '"',
        '&#39;': "'",
        '&apos;': "'",
        # Decode '&amp;' last so double-escaped text such as '&amp;lt;'
        # is not decoded twice by the replacements above
        '&amp;': '&',
    }
for entity, replacement in html_entities.items():
text = text.replace(entity, replacement)
    # Step 8: Collapse runs of three or more repeated punctuation marks to two
    # (e.g., "!!!!!!" -> "!!"); some repetition is preserved because exaggerated
    # punctuation is itself a phishing indicator
text = re.sub(r'([!?.]){3,}', r'\1\1', text)
logger.debug(f"Preprocessed text: {len(text)} chars (original: {len(text)} chars)")
return text
# ============================================================================
# Function Words and Keywords
# ============================================================================
# Mapping from langdetect ISO 639-1 codes to NLTK stopwords language names
# langdetect supports 55 languages, NLTK stopwords supports 32 languages
LANGDETECT_TO_NLTK_MAP = {
'ar': 'arabic',
'az': 'azerbaijani',
'eu': 'basque', # Basque
'be': 'belarusian', # Belarusian (added in newer NLTK)
'bn': 'bengali',
'ca': 'catalan',
'zh-cn': 'chinese',
    'zh-tw': 'chinese',  # Map Traditional Chinese to the same stopwords
'da': 'danish',
'nl': 'dutch',
'en': 'english',
'fi': 'finnish',
'fr': 'french',
'de': 'german',
'el': 'greek',
'he': 'hebrew',
    'hi': 'hinglish',  # Hindi (mapped to 'hinglish', a Hindi-English mix)
'hu': 'hungarian',
'id': 'indonesian',
'it': 'italian',
'kk': 'kazakh',
'ne': 'nepali',
'no': 'norwegian',
'pt': 'portuguese',
'ro': 'romanian',
'ru': 'russian',
'sl': 'slovene',
'es': 'spanish',
'sv': 'swedish',
'tg': 'tajik',
'ta': 'tamil',
'tl': 'tagalog', # Filipino
'tr': 'turkish',
'sq': 'albanian', # Albanian
}
# Get set of all NLTK stopwords languages for validation
NLTK_STOPWORDS_LANGUAGES = set(stopwords.fileids())
# Minimum confidence threshold for language detection (0.0 to 1.0)
LANGUAGE_DETECTION_THRESHOLD = 0.1
class LanguageDetectionError(Exception):
"""Raised when language detection fails."""
pass
class UnsupportedLanguageError(Exception):
"""Raised when a detected language is not supported by NLTK stopwords."""
pass
def detect_languages(text: str) -> List[str]:
"""
Detect language(s) present in the text.
Uses langdetect library to identify one or more languages in the text.
Returns all languages that meet the confidence threshold.
Args:
text: The text to analyze for language detection
Returns:
List[str]: List of detected NLTK language names (e.g., ['english', 'spanish'])
Raises:
LanguageDetectionError: If language detection fails
UnsupportedLanguageError: If a detected language is not supported by NLTK stopwords
"""
if not text or not text.strip():
raise LanguageDetectionError("Cannot detect language from empty text")
# Detect languages with probabilities
detected = detect_langs(text)
if not detected:
raise LanguageDetectionError("Language detection returned no results")
# Filter by confidence threshold and map to NLTK language names
nltk_languages = []
unsupported_languages = []
for lang_prob in detected:
lang_code = str(lang_prob.lang)
probability = lang_prob.prob
# Skip low-confidence detections
if probability < LANGUAGE_DETECTION_THRESHOLD:
continue
# Map langdetect code to NLTK language name
if lang_code in LANGDETECT_TO_NLTK_MAP:
nltk_lang = LANGDETECT_TO_NLTK_MAP[lang_code]
# Verify the NLTK language is actually available
if nltk_lang in NLTK_STOPWORDS_LANGUAGES:
if nltk_lang not in nltk_languages:
nltk_languages.append(nltk_lang)
logger.debug(f"Detected language: {lang_code} -> {nltk_lang} (confidence: {probability:.2f})")
else:
# Language is in our map but not in NLTK
unsupported_languages.append((lang_code, nltk_lang, probability))
else:
# Language is not in our map at all
unsupported_languages.append((lang_code, None, probability))
    # If every language above the threshold is unsupported, fail with details
if unsupported_languages and not nltk_languages:
unsupported_msgs = []
for lang_code, nltk_lang, prob in unsupported_languages:
if nltk_lang:
unsupported_msgs.append(f"{lang_code} (mapped to '{nltk_lang}' but not available in NLTK, confidence: {prob:.2f})")
else:
unsupported_msgs.append(f"{lang_code} (no NLTK mapping available, confidence: {prob:.2f})")
raise UnsupportedLanguageError(
f"Detected language(s) not supported by NLTK stopwords: {', '.join(unsupported_msgs)}"
)
if not nltk_languages:
raise LanguageDetectionError(
f"No languages detected with sufficient confidence (threshold: {LANGUAGE_DETECTION_THRESHOLD})"
)
return nltk_languages
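# Illustrative behaviour (not a doctest; detection is seeded above, but results
# still depend on the installed langdetect version):
#   detect_languages("Your account has been suspended. Verify your password now.")
#   -> ['english']
# A mixed English/Spanish email can return ['english', 'spanish'] when both
# languages clear LANGUAGE_DETECTION_THRESHOLD.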
def get_function_words(text: str) -> Set[str]:
"""
Get comprehensive set of function words (stopwords) based on detected language(s).
This function:
1. Analyzes the email text to detect the language(s) present
2. Returns stopwords for the detected language(s)
3. For mixed-language emails, returns combined stopwords from all detected languages
Args:
text: The email text to analyze for language detection
Returns:
Set[str]: Set of function words (stopwords) for the detected language(s)
Raises:
LanguageDetectionError: If language detection fails
UnsupportedLanguageError: If a detected language is not supported by NLTK stopwords
LangDetectException: If langdetect encounters an internal error
"""
# Detect language(s) in the text
detected_languages = detect_languages(text)
# Collect stopwords from all detected languages
function_words = set()
for language in detected_languages:
lang_stopwords = set(stopwords.words(language))
function_words.update(lang_stopwords)
logger.debug(f"Loaded {len(lang_stopwords)} stopwords for '{language}'")
# Add additional common function words for English if English is detected
if 'english' in detected_languages:
additional_words = {
'shall', 'might', 'must', 'ought', 'need', 'dare',
'used', 'having', 'being', 'does', 'did', 'done',
'may', 'should', 'would', 'could', 'can', 'will',
}
function_words.update(additional_words)
logger.info(f"Loaded {len(function_words)} function words for languages: {detected_languages}")
return function_words
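# Illustrative usage (stopword inventories vary by NLTK version):
#   get_function_words("Please review the attached invoice and confirm receipt.")
#   -> roughly NLTK's ~180 English stopwords plus the modal verbs added above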
# Phishing-related keywords (case-insensitive)
PHISHING_KEYWORDS = {
'account': r'\baccount\b',
'access': r'\baccess\b',
'bank': r'\bbank\b',
'credit': r'\bcredit\b',
'click': r'\bclick\b',
'identity': r'\bidentity\b',
'inconvenience': r'\binconvenience\b',
'information': r'\binformation\b',
'limited': r'\blimited\b',
'minutes': r'\bminutes?\b',
'password': r'\bpassword\b',
'recently': r'\brecently\b',
'risk': r'\brisk\b',
'social': r'\bsocial\b',
'security': r'\bsecurity\b',
'service': r'\bservice\b',
'suspended': r'\bsuspended\b',
}
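# The \b word boundaries keep matches exact: r'\baccount\b' matches "account" in
# "verify your account" but not the substring inside "accountant". Only 'minutes'
# has an optional suffix (r'\bminutes?\b'), so it also counts "minute".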
def extract_words(text: str) -> List[str]:
"""
Extract words from text using NLTK tokenization.
Args:
text: Email content
Returns:
list: List of words (lowercase, alphabetic only)
"""
# Use NLTK's word tokenizer for better accuracy
tokens = word_tokenize(text.lower())
# Filter to keep only alphabetic words
words = [word for word in tokens if word.isalpha()]
return words
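# Example of the tokenizer's effect (standard NLTK behaviour, shown for clarity):
#   word_tokenize("don't click!") -> ['do', "n't", 'click', '!']
#   after the isalpha() filter    -> ['do', 'click']
# Contractions are split and punctuation tokens are discarded.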
def count_keyword_occurrences(text: str, keyword: str, pattern: str) -> int:
"""
Count occurrences of a specific keyword in text.
Args:
text: Email content
keyword: Keyword name (for logging)
pattern: Regex pattern to match
Returns:
int: Count of keyword occurrences
"""
matches = re.findall(pattern, text.lower())
return len(matches)
def calculate_vocabulary_richness(words: list, total_chars: int) -> float:
"""
Calculate vocabulary richness as W/C (number of words / total characters).
Args:
words: List of words
total_chars: Total number of characters
Returns:
float: Vocabulary richness ratio
"""
if total_chars == 0:
return 0.0
num_words = len(words)
return num_words / total_chars
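# Worked example: an email of 250 characters that tokenizes into 50 words has
# vocabulary richness 50 / 250 = 0.2.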
def calculate_function_word_ratio(words: list, text: str) -> float:
"""
Calculate the ratio of function words to total words (Function words/W).
Uses language detection to determine which stopwords to use for calculating
the function word ratio. Supports multi-language emails.
Args:
words: List of words (lowercase, alphabetic only)
text: Original email text (used for language detection)
Returns:
float: Function word ratio
Raises:
LanguageDetectionError: If language detection fails
UnsupportedLanguageError: If a detected language is not supported
LangDetectException: If langdetect encounters an internal error
"""
if len(words) == 0:
return 0.0
# Get function words based on detected language(s)
function_words = get_function_words(text)
function_word_count = sum(1 for word in words if word in function_words)
return function_word_count / len(words)
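# Illustrative: for "please verify your account now", NLTK's English stopwords
# include "your" and "now" (version-dependent), giving a ratio of 2/5 = 0.4.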
def count_unique_words(words: List[str]) -> int:
"""
Count the number of unique words in the text.
Args:
words: List of words
Returns:
int: Number of unique words
"""
return len(set(words))
# ============================================================================
# Advanced NLP Features (Optional Enhancement)
# ============================================================================
def extract_advanced_nlp_features(text: str) -> Dict[str, Any]:
"""
Extract advanced NLP features using spaCy and TextBlob.
These features provide additional insights but are not part of the core 21 features.
Args:
text: Email content
Returns:
dict: Dictionary of advanced features
"""
# Sentiment analysis using TextBlob
blob = TextBlob(text)
sentiment_polarity = blob.sentiment.polarity
sentiment_subjectivity = blob.sentiment.subjectivity
# spaCy analysis
doc = _spacy_nlp(text[:1000000]) # Limit text length for performance
# Named Entity Recognition
entities = list(doc.ents)
named_entities_count = len(entities)
# Count specific entity types
financial_entities = 0
person_entities = 0
org_entities = 0
for ent in entities:
if ent.label_ in ['MONEY', 'PERCENT', 'CARDINAL']:
financial_entities += 1
elif ent.label_ == 'PERSON':
person_entities += 1
elif ent.label_ == 'ORG':
org_entities += 1
# Part-of-speech analysis
pos_noun_ratio = 0.0
pos_verb_ratio = 0.0
pos_adj_ratio = 0.0
if len(doc) > 0:
pos_counts = {'NOUN': 0, 'VERB': 0, 'ADJ': 0}
for token in doc:
if token.pos_ in pos_counts:
pos_counts[token.pos_] += 1
total_tokens = len(doc)
pos_noun_ratio = pos_counts['NOUN'] / total_tokens
pos_verb_ratio = pos_counts['VERB'] / total_tokens
pos_adj_ratio = pos_counts['ADJ'] / total_tokens
advanced_features = {
'sentiment_polarity': sentiment_polarity,
'sentiment_subjectivity': sentiment_subjectivity,
'named_entities_count': named_entities_count,
'financial_entities': financial_entities,
'person_entities': person_entities,
'org_entities': org_entities,
'pos_noun_ratio': pos_noun_ratio,
'pos_verb_ratio': pos_verb_ratio,
'pos_adj_ratio': pos_adj_ratio,
}
logger.debug(f"Advanced NLP features extracted: {advanced_features}")
return advanced_features
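# Illustrative output shape (entity and sentiment values depend on the installed
# spaCy model and TextBlob version; the sample text is hypothetical):
#   extract_advanced_nlp_features("PayPal suspended your account. Pay $500 now!")
#   -> {'sentiment_polarity': ..., 'financial_entities': 1, 'org_entities': 1, ...}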
def extract_features(email_text: str, include_advanced: bool = False) -> Dict[str, Any]:
"""
Extract all 21 features from email content using enhanced NLP libraries.
Features extracted (in exact order):
1. Total Number of Characters C
2. Vocabulary richness W/C
3-19. Keyword counts (Account, Access, Bank, Credit, Click, Identity,
Inconvenience, Information, Limited, Minutes, Password, Recently,
Risk, Social, Security, Service, Suspended)
20. Total number of Function words/W
21. Unique Words
Enhanced with:
- Automatic text preprocessing and normalization (handles multi-line input)
- NLTK word tokenization (more accurate than regex)
- NLTK stopwords for function word detection (more comprehensive)
- Optional spaCy analysis for advanced features
Args:
email_text: Raw email content as string (can be multi-line with formatting)
include_advanced: If True, include advanced NLP features (not used by model)
Returns:
dict: Dictionary containing all 21 features with exact column names
(plus optional advanced features if include_advanced=True)
"""
    # Validate input: must be a non-empty string
if not email_text or not isinstance(email_text, str):
raise ValueError("Email text must be a non-empty string")
# PREPROCESSING: Normalize and clean the raw email text
# This handles multi-line input, special characters, excessive whitespace, etc.
original_length = len(email_text)
email_text = preprocess_email_text(email_text)
if original_length > 0:
logger.debug(f"Text preprocessing: {original_length} -> {len(email_text)} chars")
# 1. Total Number of Characters C
total_chars = len(email_text)
# Extract words for further analysis (using NLTK tokenization)
words = extract_words(email_text)
# 2. Vocabulary richness W/C
vocab_richness = calculate_vocabulary_richness(words, total_chars)
# 3-19. Count keyword occurrences
keyword_counts = {}
for keyword, pattern in PHISHING_KEYWORDS.items():
count = count_keyword_occurrences(email_text, keyword, pattern)
# Capitalize first letter to match dataset column names
column_name = keyword.capitalize()
keyword_counts[column_name] = count
# 20. Total number of Function words/W (using language-aware NLTK stopwords)
function_word_ratio = calculate_function_word_ratio(words, email_text)
# 21. Unique Words
unique_words = count_unique_words(words)
# Construct features dictionary with exact column names from dataset
features = {
'Total Number of Characters C': total_chars,
'Vocabulary richness W/C': vocab_richness,
'Account': keyword_counts['Account'],
'Access': keyword_counts['Access'],
'Bank': keyword_counts['Bank'],
'Credit': keyword_counts['Credit'],
'Click': keyword_counts['Click'],
'Identity': keyword_counts['Identity'],
'Inconvenience': keyword_counts['Inconvenience'],
'Information': keyword_counts['Information'],
'Limited': keyword_counts['Limited'],
'Minutes': keyword_counts['Minutes'],
'Password': keyword_counts['Password'],
'Recently': keyword_counts['Recently'],
'Risk': keyword_counts['Risk'],
'Social': keyword_counts['Social'],
'Security': keyword_counts['Security'],
'Service': keyword_counts['Service'],
'Suspended': keyword_counts['Suspended'],
'Total number of Function words/W': function_word_ratio,
'Unique Words': unique_words,
}
logger.info(f"✓ Successfully extracted all 21 features from email (length: {total_chars} chars, words: {len(words)})")
logger.debug(f"Core features: {features}")
# Optionally include advanced NLP features
if include_advanced:
advanced = extract_advanced_nlp_features(email_text)
features['_advanced'] = advanced
logger.debug(f"Advanced features: {advanced}")
return features
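# ============================================================================
# Smoke Test
# ============================================================================
# A minimal end-to-end sketch, run as a script rather than on import. The sample
# text is hypothetical and the printed values depend on installed NLTK/spaCy data.
if __name__ == "__main__":
    sample = (
        "Dear customer, your bank account has been suspended. "
        "Click here within 30 minutes to verify your password and identity."
    )
    for name, value in extract_features(sample).items():
        print(f"{name}: {value}")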