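# NOTE(review): the three lines below are web-page residue captured with this
# file; they are kept as comments so the module can be parsed.
# Spaces:
# Configuration error
# Configuration error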
"""
PII (Personally Identifiable Information) Filter Module
=========================================================
Regex-based detection and masking for emails, phone numbers,
CNIC/SSN-like patterns, API keys, and addresses.
"""
import re
from dataclasses import dataclass
from typing import List, Dict, Tuple

import pandas as pd
@dataclass
class PIIFilterConfig:
    """Configuration for PII filtering.

    Every filter is opt-in: all ``filter_*`` flags default to ``False``.
    Being a dataclass, the config can be built with keyword arguments,
    e.g. ``PIIFilterConfig(filter_emails=True, mask_char="***")``.
    """

    # Mask email addresses.
    filter_emails: bool = False
    # Mask phone numbers.
    filter_phones: bool = False
    # Mask CNIC / SSN patterns.
    filter_id_numbers: bool = False
    # Mask API keys / secrets.
    filter_api_keys: bool = False
    # Mask street addresses, PO boxes and zip codes.
    filter_addresses: bool = False
    # Replacement text inserted in place of each match (a full string,
    # not a single character, despite the name).
    mask_char: str = "[REDACTED]"
| # --------------------------------------------------------------------------- | |
| # Detection + Masking patterns | |
| # --------------------------------------------------------------------------- | |
| _EMAIL_PATTERN = re.compile( | |
| r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b' | |
| ) | |
| _PHONE_PATTERN = re.compile( | |
| r'(?:\+?\d{1,3}[-.\s]?)?\(?\d{2,4}\)?[-.\s]?\d{3,4}[-.\s]?\d{3,4}' | |
| ) | |
| # SSN: 123-45-6789, CNIC: 12345-1234567-1 | |
| _ID_NUMBER_PATTERN = re.compile( | |
| r'\b\d{3}-\d{2}-\d{4}\b' # US SSN | |
| r'|\b\d{5}-\d{7}-\d{1}\b' # PK CNIC | |
| r'|\b\d{13}\b' # 13-digit ID | |
| ) | |
| # Long hex or base64 strings that look like API keys / secrets | |
| _API_KEY_PATTERN = re.compile( | |
| r'\b(?:sk|pk|api|key|secret|token)[_-]?[A-Za-z0-9]{20,}\b' | |
| r'|[A-Fa-f0-9]{32,}' | |
| r'|[A-Za-z0-9+/]{40,}={0,2}', | |
| re.IGNORECASE, | |
| ) | |
| # Basic address patterns (US-style zip, PO Box, street numbers) | |
| _ADDRESS_PATTERN = re.compile( | |
| r'\b\d{1,5}\s+\w+\s+(?:St|Street|Ave|Avenue|Blvd|Boulevard|Dr|Drive|Rd|Road|Ln|Lane|Way|Ct|Court)\b' | |
| r'|\bP\.?O\.?\s*Box\s+\d+\b' | |
| r'|\b\d{5}(?:-\d{4})?\b', # Zip code | |
| re.IGNORECASE, | |
| ) | |
def detect_emails(text: str) -> List[str]:
    """Find all email addresses in text.

    Args:
        text: Value to scan. Anything that is not a ``str`` is not scanned.

    Returns:
        Every non-overlapping match of ``_EMAIL_PATTERN`` in order of
        appearance, or an empty list when ``text`` is not a string.
    """
    if not isinstance(text, str):
        return []
    return _EMAIL_PATTERN.findall(text)
def mask_emails(text: str, mask: str = "[REDACTED_EMAIL]") -> str:
    """Replace email addresses with mask.

    Args:
        text: Value to sanitize.
        mask: Replacement passed to ``re.sub`` for every match.

    Returns:
        ``text`` with each ``_EMAIL_PATTERN`` match replaced by ``mask``;
        a non-string ``text`` is returned unchanged.
    """
    if not isinstance(text, str):
        return text
    return _EMAIL_PATTERN.sub(mask, text)
def detect_phones(text: str) -> List[str]:
    """Find all phone numbers in text.

    Args:
        text: Value to scan. Anything that is not a ``str`` is not scanned.

    Returns:
        Every non-overlapping match of ``_PHONE_PATTERN`` in order of
        appearance, or an empty list when ``text`` is not a string.
    """
    if not isinstance(text, str):
        return []
    return _PHONE_PATTERN.findall(text)
def mask_phones(text: str, mask: str = "[REDACTED_PHONE]") -> str:
    """Replace phone numbers with mask.

    Args:
        text: Value to sanitize.
        mask: Replacement passed to ``re.sub`` for every match.

    Returns:
        ``text`` with each ``_PHONE_PATTERN`` match replaced by ``mask``;
        a non-string ``text`` is returned unchanged.
    """
    if not isinstance(text, str):
        return text
    return _PHONE_PATTERN.sub(mask, text)
def detect_id_numbers(text: str) -> List[str]:
    """Find SSN/CNIC-like patterns in text.

    Args:
        text: Value to scan. Anything that is not a ``str`` is not scanned.

    Returns:
        Every non-overlapping match of ``_ID_NUMBER_PATTERN`` in order of
        appearance, or an empty list when ``text`` is not a string.
    """
    if not isinstance(text, str):
        return []
    return _ID_NUMBER_PATTERN.findall(text)
def mask_id_numbers(text: str, mask: str = "[REDACTED_ID]") -> str:
    """Replace ID number patterns with mask.

    Args:
        text: Value to sanitize.
        mask: Replacement passed to ``re.sub`` for every match.

    Returns:
        ``text`` with each ``_ID_NUMBER_PATTERN`` match replaced by ``mask``;
        a non-string ``text`` is returned unchanged.
    """
    if not isinstance(text, str):
        return text
    return _ID_NUMBER_PATTERN.sub(mask, text)
def detect_api_keys(text: str) -> List[str]:
    """Find API key / secret patterns in text.

    Args:
        text: Value to scan. Anything that is not a ``str`` is not scanned.

    Returns:
        Every non-overlapping match of ``_API_KEY_PATTERN`` in order of
        appearance, or an empty list when ``text`` is not a string.
    """
    if not isinstance(text, str):
        return []
    return _API_KEY_PATTERN.findall(text)
def mask_api_keys(text: str, mask: str = "[REDACTED_KEY]") -> str:
    """Replace API key patterns with mask.

    Args:
        text: Value to sanitize.
        mask: Replacement passed to ``re.sub`` for every match.

    Returns:
        ``text`` with each ``_API_KEY_PATTERN`` match replaced by ``mask``;
        a non-string ``text`` is returned unchanged.
    """
    if not isinstance(text, str):
        return text
    return _API_KEY_PATTERN.sub(mask, text)
def detect_addresses(text: str) -> List[str]:
    """Find address-like patterns in text.

    Args:
        text: Value to scan. Anything that is not a ``str`` is not scanned.

    Returns:
        Every non-overlapping match of ``_ADDRESS_PATTERN`` in order of
        appearance, or an empty list when ``text`` is not a string.
    """
    if not isinstance(text, str):
        return []
    return _ADDRESS_PATTERN.findall(text)
def mask_addresses(text: str, mask: str = "[REDACTED_ADDR]") -> str:
    """Replace address patterns with mask.

    Args:
        text: Value to sanitize.
        mask: Replacement passed to ``re.sub`` for every match.

    Returns:
        ``text`` with each ``_ADDRESS_PATTERN`` match replaced by ``mask``;
        a non-string ``text`` is returned unchanged.
    """
    if not isinstance(text, str):
        return text
    return _ADDRESS_PATTERN.sub(mask, text)
def apply_pii_filter(
    text: str,
    config: PIIFilterConfig,
) -> str:
    """Apply all enabled PII filters to a single text string.

    Args:
        text: Text to sanitize.
        config: Selects which filters run. ``config.mask_char`` is used as
            the replacement for every filter, overriding the per-type
            default masks of the ``mask_*`` helpers.

    Returns:
        ``text`` after every enabled filter has been applied.
    """
    mask = config.mask_char
    # Order matters: the phone pattern is unanchored and matches long digit
    # runs, so running it first can consume only part of an ID number (e.g.
    # CNIC "12345-1234567-1" would become "<mask>-1") or break up a key so
    # the specific pattern no longer matches. Run the specific filters first
    # and the permissive phone/address filters last.
    if config.filter_emails:
        text = mask_emails(text, mask)
    if config.filter_api_keys:
        text = mask_api_keys(text, mask)
    if config.filter_id_numbers:
        text = mask_id_numbers(text, mask)
    if config.filter_phones:
        text = mask_phones(text, mask)
    if config.filter_addresses:
        text = mask_addresses(text, mask)
    return text
def apply_pii_filter_df(
    df: pd.DataFrame,
    columns: List[str],
    config: PIIFilterConfig,
) -> pd.DataFrame:
    """Apply PII filtering to specified columns of a DataFrame.

    Args:
        df: Input frame; it is copied, never modified in place.
        columns: Column names to sanitize. Names that are not present in
            ``df`` are skipped silently.
        config: Filter configuration forwarded to ``apply_pii_filter``.

    Returns:
        A copy of ``df`` whose selected columns have been filtered. Non-missing
        values are converted with ``str()`` before filtering, so numeric cells
        come back as strings. Missing scalars (``None``, NaN, ``pd.NA``,
        ``NaT``) are left untouched.
    """

    def _mask_cell(value):
        # Keep missing values as-is; str() would turn them into the literal
        # text "None"/"nan" and destroy the missing-value information.
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return value
        return apply_pii_filter(str(value), config)

    result = df.copy()
    for col in columns:
        if col in result.columns:
            result[col] = result[col].apply(_mask_cell)
    return result
def detect_pii_summary(
    df: pd.DataFrame,
    columns: List[str],
) -> Dict[str, int]:
    """
    Scan columns and count PII instances found.

    Args:
        df: Frame to scan; it is not modified.
        columns: Column names to scan. Names that are not present in ``df``
            are skipped.

    Returns:
        Dict like {"emails": 5, "phones": 2, ...} with the keys "emails",
        "phones", "id_numbers", "api_keys" and "addresses". Each value is the
        total number of matches over all scanned cells; every category is
        counted independently, so one span of text may be counted under
        several keys.
    """
    # Summary key -> detector; insertion order fixes the key order of the
    # returned dict.
    detectors = {
        "emails": detect_emails,
        "phones": detect_phones,
        "id_numbers": detect_id_numbers,
        "api_keys": detect_api_keys,
        "addresses": detect_addresses,
    }
    summary = {label: 0 for label in detectors}
    for col in columns:
        if col not in df.columns:
            continue
        # Cells are stringified first so non-string values are scanned too.
        for text in df[col].astype(str):
            for label, detect in detectors.items():
                summary[label] += len(detect(text))
    return summary