# NOTE: the original capture of this file included Hugging Face Spaces page
# chrome ("Spaces: Sleeping") above this line; it is not part of the module.
"""
Email Feature Extraction System for Phishing Detection

Extracts 21 specific features from email content using professional NLP libraries.

Enhanced with:
- NLTK for tokenization and stopwords
- spaCy for advanced linguistic analysis
- TextBlob for sentiment analysis
"""
| import re | |
| import logging | |
| from typing import Dict, Any, List, Set | |
| import numpy as np | |
| import unicodedata | |
| # NLP Libraries | |
| import nltk | |
| from nltk.tokenize import word_tokenize | |
| from nltk.corpus import stopwords | |
| import spacy | |
| from textblob import TextBlob | |
| from langdetect import detect_langs, LangDetectException | |
| from langdetect import DetectorFactory | |
| # Ensure consistent language detection results | |
| DetectorFactory.seed = 0 | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # ============================================================================ | |
| # NLP Resources Initialization | |
| # ============================================================================ | |
def verify_nltk_resources():
    """
    Confirm that every NLTK resource this module depends on is installed.

    Probes the punkt tokenizers, the stopwords corpus, and the POS tagger.
    ``nltk.data.find`` raises ``LookupError`` for anything missing, so the
    module fails fast at import time instead of failing mid-extraction.
    """
    required_resources = (
        'tokenizers/punkt',          # sentence/word tokenizer models
        'tokenizers/punkt_tab',      # tabular punkt data (newer NLTK)
        'corpora/stopwords',         # function-word lists per language
        'taggers/averaged_perceptron_tagger',  # POS tagger
    )
    for resource in required_resources:
        nltk.data.find(resource)
    logger.info("✓ NLTK resources verified")
def load_spacy_model():
    """
    Load the small English spaCy pipeline used for advanced analysis.

    Returns:
        spacy.Language: The loaded 'en_core_web_sm' model.

    Raises:
        OSError: If the model package is not installed.
    """
    model = spacy.load("en_core_web_sm")
    logger.info("✓ spaCy model 'en_core_web_sm' loaded successfully")
    return model
# Initialize NLP resources on module load - will fail fast if not available.
# Importing this module therefore requires NLTK data and the spaCy model to
# be present; there is deliberately no lazy/deferred fallback.
verify_nltk_resources()
# Shared spaCy pipeline; reused by extract_advanced_nlp_features().
_spacy_nlp = load_spacy_model()
| # ============================================================================ | |
| # Text Preprocessing and Normalization | |
| # ============================================================================ | |
def preprocess_email_text(text: str) -> str:
    """
    Preprocess and normalize raw email text to handle multi-line input,
    special characters, and formatting issues.

    This function:
    1. Handles None/empty input gracefully
    2. Normalizes Unicode characters (e.g., smart quotes, special dashes)
    3. Normalizes line breaks, tabs, and excessive whitespace
    4. Strips zero-width/invisible characters used in obfuscation
    5. Decodes common HTML entities
    6. Collapses runs of repeated terminal punctuation

    Args:
        text: Raw email text (may contain line breaks, tabs, special formatting)

    Returns:
        str: Cleaned and normalized text ready for feature extraction

    Examples:
        >>> preprocess_email_text("Hello\\n\\nWorld \\t Test")
        'Hello World Test'
        >>> preprocess_email_text("Your account\\r\\nhas been\\tsuspended")
        'Your account has been suspended'
    """
    # Module logger resolved locally so the function is self-contained.
    log = logging.getLogger(__name__)

    # Handle None or empty input
    if not text:
        log.debug("Empty text provided to preprocessor")
        return ""

    # Ensure text is a string
    if not isinstance(text, str):
        log.warning(f"Non-string input to preprocessor: {type(text)}")
        text = str(text)

    # BUG FIX: capture the pre-normalization length before `text` is
    # rebound, so the final debug line reports a meaningful "original" size
    # (previously both numbers were the post-processing length).
    original_length = len(text)

    # Step 1: Normalize Unicode characters (smart quotes, dashes, etc.).
    # NFKC: compatibility decomposition followed by canonical composition.
    text = unicodedata.normalize('NFKC', text)

    # Step 2: Normalize line breaks (\r\n Windows, \n Unix, \r old Mac)
    # to a single space.
    text = re.sub(r'\r\n|\r|\n', ' ', text)

    # Step 3: Normalize tabs to spaces
    text = text.replace('\t', ' ')

    # Step 4: Remove zero-width characters and other invisible Unicode,
    # which can be used in obfuscation attempts.
    text = re.sub(r'[\u200b-\u200f\u202a-\u202e\ufeff]', '', text)

    # Step 5: Collapse runs of whitespace to a single space
    text = re.sub(r'\s+', ' ', text)

    # Step 6: Remove leading/trailing whitespace
    text = text.strip()

    # Step 7: Decode common HTML entities.
    # BUG FIX: the keys must be the *encoded* entity names; the previous
    # table mapped already-decoded characters to themselves (a no-op) and
    # contained a duplicate key. '&amp;' is decoded LAST so that an escaped
    # entity like '&amp;lt;' correctly yields the literal text '&lt;'
    # instead of being double-decoded to '<'.
    html_entities = {
        '&nbsp;': ' ',
        '&lt;': '<',
        '&gt;': '>',
        '&quot;': '"',
        '&#39;': "'",
        '&apos;': "'",
        '&amp;': '&',
    }
    for entity, replacement in html_entities.items():
        text = text.replace(entity, replacement)

    # Step 8: Collapse excessive punctuation repetition to at most two
    # characters (e.g., "!!!!!!" -> "!!"), preserving the fact that the
    # sender over-punctuated (a possible phishing indicator) without
    # letting arbitrarily long runs inflate character counts.
    text = re.sub(r'([!?.]){3,}', r'\1\1', text)

    log.debug(f"Preprocessed text: {len(text)} chars (original: {original_length} chars)")
    return text
| # ============================================================================ | |
| # Function Words and Keywords | |
| # ============================================================================ | |
# Mapping from langdetect ISO 639-1 codes to NLTK stopwords language names.
# langdetect supports 55 languages, NLTK stopwords supports 32 languages;
# only the overlap is listed, so a code absent from this map is treated as
# unsupported by detect_languages().
LANGDETECT_TO_NLTK_MAP = {
    'ar': 'arabic',
    'az': 'azerbaijani',
    'eu': 'basque',  # Basque
    'be': 'belarusian',  # Belarusian (added in newer NLTK)
    'bn': 'bengali',
    'ca': 'catalan',
    'zh-cn': 'chinese',
    'zh-tw': 'chinese',  # Map Traditional Chinese to same stopwords
    'da': 'danish',
    'nl': 'dutch',
    'en': 'english',
    'fi': 'finnish',
    'fr': 'french',
    'de': 'german',
    'el': 'greek',
    'he': 'hebrew',
    'hi': 'hinglish',  # Hindi (mapped to hinglish which is Hindi-English mix)
    'hu': 'hungarian',
    'id': 'indonesian',
    'it': 'italian',
    'kk': 'kazakh',
    'ne': 'nepali',
    'no': 'norwegian',
    'pt': 'portuguese',
    'ro': 'romanian',
    'ru': 'russian',
    'sl': 'slovene',
    'es': 'spanish',
    'sv': 'swedish',
    'tg': 'tajik',
    'ta': 'tamil',
    'tl': 'tagalog',  # Filipino
    'tr': 'turkish',
    'sq': 'albanian',  # Albanian
}
# Set of all stopword languages actually shipped with the installed NLTK
# data; used to validate mapped names at detection time (entries above may
# be absent in older NLTK corpora).
NLTK_STOPWORDS_LANGUAGES = set(stopwords.fileids())
# Minimum confidence threshold for language detection (0.0 to 1.0).
# Deliberately low so secondary languages in mixed-language emails are kept.
LANGUAGE_DETECTION_THRESHOLD = 0.1
class LanguageDetectionError(Exception):
    """Signals that no language could be determined for the given text."""
class UnsupportedLanguageError(Exception):
    """Signals that a detected language has no usable NLTK stopword list."""
def detect_languages(text: str) -> List[str]:
    """
    Detect the language(s) present in the text.

    Runs langdetect over the text, keeps every candidate meeting the
    confidence threshold, and translates the ISO codes into NLTK stopword
    language names.

    Args:
        text: The text to analyze for language detection

    Returns:
        List[str]: Detected NLTK language names (e.g., ['english', 'spanish'])

    Raises:
        LanguageDetectionError: If detection fails or nothing is confident enough
        UnsupportedLanguageError: If only unsupported languages are detected
    """
    if not text or not text.strip():
        raise LanguageDetectionError("Cannot detect language from empty text")

    candidates = detect_langs(text)
    if not candidates:
        raise LanguageDetectionError("Language detection returned no results")

    supported: List[str] = []
    rejected = []  # (iso_code, mapped_name_or_None, confidence)

    for candidate in candidates:
        iso_code = str(candidate.lang)
        confidence = candidate.prob

        # Ignore low-confidence guesses entirely.
        if confidence < LANGUAGE_DETECTION_THRESHOLD:
            continue

        mapped = LANGDETECT_TO_NLTK_MAP.get(iso_code)
        if mapped is None:
            # No mapping to an NLTK stopword language at all.
            rejected.append((iso_code, None, confidence))
        elif mapped in NLTK_STOPWORDS_LANGUAGES:
            if mapped not in supported:
                supported.append(mapped)
                logger.debug(f"Detected language: {iso_code} -> {mapped} (confidence: {confidence:.2f})")
        else:
            # Mapped, but the installed NLTK corpus lacks this language.
            rejected.append((iso_code, mapped, confidence))

    # Confident detections exist but none are usable -> unsupported error.
    if rejected and not supported:
        details = []
        for iso_code, mapped, confidence in rejected:
            if mapped:
                details.append(f"{iso_code} (mapped to '{mapped}' but not available in NLTK, confidence: {confidence:.2f})")
            else:
                details.append(f"{iso_code} (no NLTK mapping available, confidence: {confidence:.2f})")
        raise UnsupportedLanguageError(
            f"Detected language(s) not supported by NLTK stopwords: {', '.join(details)}"
        )

    if not supported:
        raise LanguageDetectionError(
            f"No languages detected with sufficient confidence (threshold: {LANGUAGE_DETECTION_THRESHOLD})"
        )

    return supported
def get_function_words(text: str) -> Set[str]:
    """
    Build the stopword set appropriate for the language(s) of the text.

    Detects the language(s) of the email, then unions the NLTK stopword
    lists for every detected language, so mixed-language emails are covered.
    English detections additionally get a hand-picked set of modal/auxiliary
    verbs that NLTK's list omits.

    Args:
        text: The email text to analyze for language detection

    Returns:
        Set[str]: Function words (stopwords) for the detected language(s)

    Raises:
        LanguageDetectionError: If language detection fails
        UnsupportedLanguageError: If a detected language lacks NLTK stopwords
        LangDetectException: If langdetect encounters an internal error
    """
    languages = detect_languages(text)

    collected: Set[str] = set()
    for language in languages:
        per_language = set(stopwords.words(language))
        collected |= per_language
        logger.debug(f"Loaded {len(per_language)} stopwords for '{language}'")

    if 'english' in languages:
        # Modal and auxiliary verbs missing from NLTK's English list.
        collected |= {
            'shall', 'might', 'must', 'ought', 'need', 'dare',
            'used', 'having', 'being', 'does', 'did', 'done',
            'may', 'should', 'would', 'could', 'can', 'will',
        }

    logger.info(f"Loaded {len(collected)} function words for languages: {languages}")
    return collected
# Phishing-related keywords (case-insensitive).
# Each value is a whole-word regex applied to lowercased text by
# count_keyword_occurrences(); keys double as dataset column names after
# .capitalize(). Note 'minutes' matches both singular and plural.
PHISHING_KEYWORDS = {
    'account': r'\baccount\b',
    'access': r'\baccess\b',
    'bank': r'\bbank\b',
    'credit': r'\bcredit\b',
    'click': r'\bclick\b',
    'identity': r'\bidentity\b',
    'inconvenience': r'\binconvenience\b',
    'information': r'\binformation\b',
    'limited': r'\blimited\b',
    'minutes': r'\bminutes?\b',
    'password': r'\bpassword\b',
    'recently': r'\brecently\b',
    'risk': r'\brisk\b',
    'social': r'\bsocial\b',
    'security': r'\bsecurity\b',
    'service': r'\bservice\b',
    'suspended': r'\bsuspended\b',
}
def extract_words(text: str) -> List[str]:
    """
    Tokenize email text into lowercase, purely alphabetic words.

    Uses NLTK's word tokenizer (more accurate than a regex split) and then
    drops anything containing digits or punctuation.

    Args:
        text: Email content

    Returns:
        list: Lowercase alphabetic word tokens
    """
    return [token for token in word_tokenize(text.lower()) if token.isalpha()]
def count_keyword_occurrences(text: str, keyword: str, pattern: str) -> int:
    """
    Count how many times a keyword pattern matches in the text.

    Matching is case-insensitive because the text is lowercased before the
    (lowercase) pattern is applied.

    Args:
        text: Email content
        keyword: Keyword name (kept for signature compatibility / logging context;
            only `pattern` drives the match)
        pattern: Regex pattern to match

    Returns:
        int: Number of non-overlapping matches
    """
    return len(re.findall(pattern, text.lower()))
def calculate_vocabulary_richness(words: list, total_chars: int) -> float:
    """
    Compute vocabulary richness W/C (word count divided by character count).

    Args:
        words: List of words
        total_chars: Total number of characters

    Returns:
        float: W/C ratio, or 0.0 when there are no characters (avoids
        division by zero for empty emails)
    """
    return len(words) / total_chars if total_chars else 0.0
def calculate_function_word_ratio(words: list, text: str) -> float:
    """
    Compute the fraction of tokens that are function words (stopwords).

    The stopword set is chosen by language detection on the original text,
    so multi-language emails are handled.

    Args:
        words: List of words (lowercase, alphabetic only)
        text: Original email text (used for language detection)

    Returns:
        float: Function-word count divided by total word count, or 0.0 for
        an empty word list

    Raises:
        LanguageDetectionError: If language detection fails
        UnsupportedLanguageError: If a detected language is not supported
        LangDetectException: If langdetect encounters an internal error
    """
    if not words:
        return 0.0
    stopword_set = get_function_words(text)
    hits = sum(1 for token in words if token in stopword_set)
    return hits / len(words)
def count_unique_words(words: List[str]) -> int:
    """
    Return how many distinct words appear in the token list.

    Args:
        words: List of words

    Returns:
        int: Number of unique words
    """
    distinct = set(words)
    return len(distinct)
| # ============================================================================ | |
| # Advanced NLP Features (Optional Enhancement) | |
| # ============================================================================ | |
def extract_advanced_nlp_features(text: str) -> Dict[str, Any]:
    """
    Extract supplementary NLP features via spaCy and TextBlob.

    These are diagnostic extras, not part of the core 21-feature set.

    Args:
        text: Email content

    Returns:
        dict: Sentiment scores, named-entity counts, and POS ratios
    """
    # Sentiment via TextBlob
    blob = TextBlob(text)

    # spaCy pipeline; cap input size to keep processing time bounded
    doc = _spacy_nlp(text[:1000000])

    # Tally entity categories of interest for phishing analysis
    financial = person = org = 0
    for entity in doc.ents:
        label = entity.label_
        if label in ('MONEY', 'PERCENT', 'CARDINAL'):
            financial += 1
        elif label == 'PERSON':
            person += 1
        elif label == 'ORG':
            org += 1

    # Part-of-speech ratios (0.0 when the document has no tokens)
    noun_ratio = verb_ratio = adj_ratio = 0.0
    token_total = len(doc)
    if token_total > 0:
        noun_ratio = sum(1 for t in doc if t.pos_ == 'NOUN') / token_total
        verb_ratio = sum(1 for t in doc if t.pos_ == 'VERB') / token_total
        adj_ratio = sum(1 for t in doc if t.pos_ == 'ADJ') / token_total

    advanced_features = {
        'sentiment_polarity': blob.sentiment.polarity,
        'sentiment_subjectivity': blob.sentiment.subjectivity,
        'named_entities_count': len(doc.ents),
        'financial_entities': financial,
        'person_entities': person,
        'org_entities': org,
        'pos_noun_ratio': noun_ratio,
        'pos_verb_ratio': verb_ratio,
        'pos_adj_ratio': adj_ratio,
    }
    logger.debug(f"Advanced NLP features extracted: {advanced_features}")
    return advanced_features
def extract_features(email_text: str, include_advanced: bool = False) -> Dict[str, Any]:
    """
    Extract all 21 features from email content using enhanced NLP libraries.

    Features extracted (in exact order):
    1. Total Number of Characters C
    2. Vocabulary richness W/C
    3-19. Keyword counts (Account, Access, Bank, Credit, Click, Identity,
          Inconvenience, Information, Limited, Minutes, Password, Recently,
          Risk, Social, Security, Service, Suspended)
    20. Total number of Function words/W
    21. Unique Words

    The raw text is normalized first (multi-line input, Unicode quirks,
    excess whitespace), then tokenized with NLTK; the function-word ratio
    uses language-aware NLTK stopwords.

    Args:
        email_text: Raw email content as string (can be multi-line with formatting)
        include_advanced: If True, attach advanced NLP features under '_advanced'
            (not consumed by the model)

    Returns:
        dict: All 21 features keyed by the exact dataset column names
            (plus '_advanced' when requested)

    Raises:
        ValueError: If email_text is empty or not a string
    """
    # Reject unusable input up front
    if not email_text or not isinstance(email_text, str):
        raise ValueError("Email text must be a non-empty string")

    # Normalize the raw text (line breaks, Unicode, whitespace, entities)
    raw_length = len(email_text)
    email_text = preprocess_email_text(email_text)
    if raw_length > 0:
        logger.debug(f"Text preprocessing: {raw_length} -> {len(email_text)} chars")

    # Feature 1: character count; token list feeds features 2, 20, 21
    total_chars = len(email_text)
    words = extract_words(email_text)

    # Build the result in dataset column order: chars, richness, the 17
    # keywords (PHISHING_KEYWORDS iterates in that same column order),
    # then the two trailing aggregate features.
    features: Dict[str, Any] = {
        'Total Number of Characters C': total_chars,
        'Vocabulary richness W/C': calculate_vocabulary_richness(words, total_chars),
    }
    for keyword, pattern in PHISHING_KEYWORDS.items():
        # Capitalized keyword matches the dataset column name exactly
        features[keyword.capitalize()] = count_keyword_occurrences(email_text, keyword, pattern)
    features['Total number of Function words/W'] = calculate_function_word_ratio(words, email_text)
    features['Unique Words'] = count_unique_words(words)

    logger.info(f"✓ Successfully extracted all 21 features from email (length: {total_chars} chars, words: {len(words)})")
    logger.debug(f"Core features: {features}")

    # Optional diagnostics, kept out of the 21 model columns
    if include_advanced:
        advanced = extract_advanced_nlp_features(email_text)
        features['_advanced'] = advanced
        logger.debug(f"Advanced features: {advanced}")

    return features