"""
PII (Personally Identifiable Information) Filter Module
=========================================================
Regex-based detection and masking for emails, phone numbers,
CNIC/SSN-like patterns, API keys, and addresses.
"""

import re
from dataclasses import dataclass
from typing import List, Dict, Tuple

import pandas as pd


@dataclass
class PIIFilterConfig:
    """Configuration for PII filtering.

    Each ``filter_*`` flag enables one masking pass in ``apply_pii_filter``;
    all of them default to ``False``. ``mask_char`` is the replacement text
    substituted for every match (despite the name it is a whole string, not
    a single character).
    """

    filter_emails: bool = False
    filter_phones: bool = False
    filter_id_numbers: bool = False  # CNIC / SSN patterns
    filter_api_keys: bool = False
    filter_addresses: bool = False
    mask_char: str = "[REDACTED]"


# ---------------------------------------------------------------------------
# Detection + Masking patterns
# ---------------------------------------------------------------------------

# NOTE: the TLD class is [A-Za-z]; a '|' inside a character class is a literal
# pipe, not alternation, and would let "user@example.com|other" match whole.
_EMAIL_PATTERN = re.compile(
    r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
)

# Deliberately loose: optional country code, optional parentheses, and
# '-', '.', or whitespace separators. It has no word boundaries, so it can
# also match digit runs embedded in longer tokens.
_PHONE_PATTERN = re.compile(
    r'(?:\+?\d{1,3}[-.\s]?)?\(?\d{2,4}\)?[-.\s]?\d{3,4}[-.\s]?\d{3,4}'
)

# SSN: 123-45-6789, CNIC: 12345-1234567-1
_ID_NUMBER_PATTERN = re.compile(
    r'\b\d{3}-\d{2}-\d{4}\b'        # US SSN
    r'|\b\d{5}-\d{7}-\d{1}\b'       # PK CNIC
    r'|\b\d{13}\b'                  # 13-digit ID
)

# Long hex or base64 strings that look like API keys / secrets
_API_KEY_PATTERN = re.compile(
    r'\b(?:sk|pk|api|key|secret|token)[_-]?[A-Za-z0-9]{20,}\b'
    r'|[A-Fa-f0-9]{32,}'
    r'|[A-Za-z0-9+/]{40,}={0,2}',
    re.IGNORECASE,
)

# Basic address patterns (US-style zip, PO Box, street numbers)
_ADDRESS_PATTERN = re.compile(
    r'\b\d{1,5}\s+\w+\s+(?:St|Street|Ave|Avenue|Blvd|Boulevard|Dr|Drive|Rd|Road|Ln|Lane|Way|Ct|Court)\b'
    r'|\bP\.?O\.?\s*Box\s+\d+\b'
    r'|\b\d{5}(?:-\d{4})?\b',  # Zip code
    re.IGNORECASE,
)


def detect_emails(text: str) -> List[str]:
    """Return every email address found in ``text`` ([] for non-strings)."""
    if not isinstance(text, str):
        return []
    return _EMAIL_PATTERN.findall(text)


def mask_emails(text: str, mask: str = "[REDACTED_EMAIL]") -> str:
    """Replace email addresses with ``mask``; non-strings are returned as-is."""
    if not isinstance(text, str):
        return text
    return _EMAIL_PATTERN.sub(mask, text)


def detect_phones(text: str) -> List[str]:
    """Return every phone-number match in ``text`` ([] for non-strings)."""
    if not isinstance(text, str):
        return []
    return _PHONE_PATTERN.findall(text)


def mask_phones(text: str, mask: str = "[REDACTED_PHONE]") -> str:
    """Replace phone numbers with ``mask``; non-strings are returned as-is."""
    if not isinstance(text, str):
        return text
    return _PHONE_PATTERN.sub(mask, text)


def detect_id_numbers(text: str) -> List[str]:
    """Return SSN/CNIC/13-digit matches in ``text`` ([] for non-strings)."""
    if not isinstance(text, str):
        return []
    return _ID_NUMBER_PATTERN.findall(text)


def mask_id_numbers(text: str, mask: str = "[REDACTED_ID]") -> str:
    """Replace ID-number patterns with ``mask``; non-strings are returned as-is."""
    if not isinstance(text, str):
        return text
    return _ID_NUMBER_PATTERN.sub(mask, text)


def detect_api_keys(text: str) -> List[str]:
    """Return API key / secret matches in ``text`` ([] for non-strings)."""
    if not isinstance(text, str):
        return []
    return _API_KEY_PATTERN.findall(text)


def mask_api_keys(text: str, mask: str = "[REDACTED_KEY]") -> str:
    """Replace API key patterns with ``mask``; non-strings are returned as-is."""
    if not isinstance(text, str):
        return text
    return _API_KEY_PATTERN.sub(mask, text)


def detect_addresses(text: str) -> List[str]:
    """Return address-like matches in ``text`` ([] for non-strings)."""
    if not isinstance(text, str):
        return []
    return _ADDRESS_PATTERN.findall(text)


def mask_addresses(text: str, mask: str = "[REDACTED_ADDR]") -> str:
    """Replace address patterns with ``mask``; non-strings are returned as-is."""
    if not isinstance(text, str):
        return text
    return _ADDRESS_PATTERN.sub(mask, text)


def apply_pii_filter(
    text: str,
    config: PIIFilterConfig,
) -> str:
    """Apply all enabled PII filters to a single text string.

    Every enabled filter replaces its matches with ``config.mask_char``.
    Non-string input is returned unchanged, because each mask function
    passes non-strings through.

    Filters run from most specific to least specific. The phone pattern is
    loose and unanchored, so running it early would consume part of a CNIC
    (leaving the trailing ``-N`` visible) or break up digit runs inside a hex
    key so the key pattern no longer matches the remainder.
    """
    mask = config.mask_char
    passes = (
        (config.filter_emails, mask_emails),
        (config.filter_api_keys, mask_api_keys),
        (config.filter_id_numbers, mask_id_numbers),
        (config.filter_phones, mask_phones),
        (config.filter_addresses, mask_addresses),
    )
    for enabled, masker in passes:
        if enabled:
            text = masker(text, mask)
    return text


def apply_pii_filter_df(
    df: pd.DataFrame,
    columns: List[str],
    config: PIIFilterConfig,
) -> pd.DataFrame:
    """Apply PII filtering to specified columns of a DataFrame.

    Works on a copy; the input frame is not modified. Names in ``columns``
    that are not present in ``df`` are ignored.

    Missing scalar cells (``None``, ``NaN``, ``NaT``, ``pd.NA``) are kept
    as-is instead of being turned into the literal strings ``"None"`` /
    ``"nan"``. Any other non-string cell is converted with ``str()`` before
    filtering, so filtered columns hold strings for all non-missing cells.
    """

    def _filter_cell(value):
        # Fast path: the common case needs no missing-value probing.
        if isinstance(value, str):
            return apply_pii_filter(value, config)
        # is_scalar guards pd.isna, which returns an array for list-likes.
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return value
        return apply_pii_filter(str(value), config)

    result = df.copy()
    for col in columns:
        if col in result.columns:
            result[col] = result[col].apply(_filter_cell)
    return result


def detect_pii_summary(
    df: pd.DataFrame,
    columns: List[str],
) -> Dict[str, int]:
    """
    Scan columns and count PII instances found.

    Each cell is converted to ``str`` and run through every detector, so one
    span of text can be counted under more than one category when patterns
    overlap. Names in ``columns`` that are not present in ``df`` are skipped.

    Returns dict like {"emails": 5, "phones": 2, ...}.
    """
    detectors = {
        "emails": detect_emails,
        "phones": detect_phones,
        "id_numbers": detect_id_numbers,
        "api_keys": detect_api_keys,
        "addresses": detect_addresses,
    }
    summary = {category: 0 for category in detectors}
    for col in columns:
        if col not in df.columns:
            continue
        for text in df[col].astype(str):
            for category, detect in detectors.items():
                summary[category] += len(detect(text))
    return summary