aneeb15's picture
Initial release of Auto-FineTune-Ops
d4398e6
"""
PII (Personally Identifiable Information) Filter Module
=========================================================
Regex-based detection and masking for emails, phone numbers,
CNIC/SSN-like patterns, API keys, and addresses.
"""
import re
from dataclasses import dataclass
from typing import List, Dict, Tuple
import pandas as pd
@dataclass
class PIIFilterConfig:
    """Switches that select which PII categories get masked.

    Every category is off by default, so a default-constructed config
    leaves text unchanged; enable only the categories that are needed.
    """

    # Mask e-mail addresses.
    filter_emails: bool = False
    # Mask phone numbers.
    filter_phones: bool = False
    # Mask ID numbers (CNIC / SSN patterns).
    filter_id_numbers: bool = False
    # Mask API keys and similar secrets.
    filter_api_keys: bool = False
    # Mask address-like fragments.
    filter_addresses: bool = False
    # Replacement text for each match; despite the name, the default is a
    # whole token rather than a single character.
    mask_char: str = "[REDACTED]"
# ---------------------------------------------------------------------------
# Detection + Masking patterns
# ---------------------------------------------------------------------------
# E-mail address: local part, "@", dotted domain, then an alphabetic TLD of at
# least two letters.  The TLD class is [A-Za-z]: inside a character class "|"
# is a literal pipe, not alternation, so it must not appear there (otherwise
# "a@b.com|foo" would be matched as a single address).
_EMAIL_PATTERN = re.compile(
    r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
)
# Phone numbers: optional country code, an area code that may be wrapped in
# parentheses, then two digit groups.  "-", "." or whitespace may separate
# the parts.  The pattern has no boundary anchors, so it also matches inside
# longer digit runs.
_PHONE_PATTERN = re.compile(
    r'(?:\+?\d{1,3}[-.\s]?)?'  # optional country code, e.g. "+92 "
    r'\(?\d{2,4}\)?[-.\s]?'    # area code, optionally parenthesised
    r'\d{3,4}[-.\s]?'          # first digit group (3-4 digits)
    r'\d{3,4}'                 # second digit group (3-4 digits)
)
# National-ID style numbers, each alternative bounded by word boundaries.
# Examples: SSN 123-45-6789, CNIC 12345-1234567-1.
_ID_NUMBER_PATTERN = re.compile(
    "|".join((
        r'\b\d{3}-\d{2}-\d{4}\b',   # US SSN
        r'\b\d{5}-\d{7}-\d{1}\b',   # PK CNIC
        r'\b\d{13}\b',              # 13-digit ID
    ))
)
# Strings that look like API keys / secrets (matched case-insensitively):
# a known prefix followed by a long alphanumeric tail, a long hex run, or a
# long base64-style run with optional "=" padding.
_API_KEY_PATTERN = re.compile(
    "|".join((
        r'\b(?:sk|pk|api|key|secret|token)[_-]?[A-Za-z0-9]{20,}\b',  # prefixed key
        r'[A-Fa-f0-9]{32,}',                                         # hex, 32+ chars
        r'[A-Za-z0-9+/]{40,}={0,2}',                                 # base64, 40+ chars
    )),
    flags=re.IGNORECASE,
)
# Basic address fragments (matched case-insensitively): a street line such as
# "221 Baker Street", a PO Box, or a US-style zip code.  The zip alternative
# matches any standalone 5-digit number, with an optional "-1234" suffix.
_ADDRESS_PATTERN = re.compile(
    "|".join((
        # House number, one word, then a street-type keyword.
        r'\b\d{1,5}\s+\w+\s+(?:St|Street|Ave|Avenue|Blvd|Boulevard|Dr|Drive|Rd|Road|Ln|Lane|Way|Ct|Court)\b',
        # PO Box with optional dots.
        r'\bP\.?O\.?\s*Box\s+\d+\b',
        # Zip code.
        r'\b\d{5}(?:-\d{4})?\b',
    )),
    flags=re.IGNORECASE,
)
def detect_emails(text: str) -> List[str]:
    """Return every e-mail address found in ``text``.

    Non-string input yields an empty list.
    """
    if not isinstance(text, str):
        return []
    return _EMAIL_PATTERN.findall(text)
def mask_emails(text: str, mask: str = "[REDACTED_EMAIL]") -> str:
    """Replace every e-mail address in ``text`` with ``mask``.

    Args:
        text: Text to clean.  Non-string values are returned unchanged.
        mask: Literal replacement text.

    Returns:
        The masked text, or ``text`` itself when it is not a string.
    """
    if not isinstance(text, str):
        return text
    # A callable replacement inserts the mask verbatim.  A plain string would
    # be parsed as a regex replacement template, so a backslash in the mask
    # would be expanded or rejected with re.error.
    return _EMAIL_PATTERN.sub(lambda _match: mask, text)
def detect_phones(text: str) -> List[str]:
    """Return every phone-number match found in ``text``.

    Non-string input yields an empty list.
    """
    if not isinstance(text, str):
        return []
    return _PHONE_PATTERN.findall(text)
def mask_phones(text: str, mask: str = "[REDACTED_PHONE]") -> str:
    """Replace every phone-number match in ``text`` with ``mask``.

    Args:
        text: Text to clean.  Non-string values are returned unchanged.
        mask: Literal replacement text.

    Returns:
        The masked text, or ``text`` itself when it is not a string.
    """
    if not isinstance(text, str):
        return text
    # A callable replacement inserts the mask verbatim.  A plain string would
    # be parsed as a regex replacement template, so a backslash in the mask
    # would be expanded or rejected with re.error.
    return _PHONE_PATTERN.sub(lambda _match: mask, text)
def detect_id_numbers(text: str) -> List[str]:
    """Return every SSN/CNIC-like match found in ``text``.

    Non-string input yields an empty list.
    """
    if not isinstance(text, str):
        return []
    return _ID_NUMBER_PATTERN.findall(text)
def mask_id_numbers(text: str, mask: str = "[REDACTED_ID]") -> str:
    """Replace every SSN/CNIC-like match in ``text`` with ``mask``.

    Args:
        text: Text to clean.  Non-string values are returned unchanged.
        mask: Literal replacement text.

    Returns:
        The masked text, or ``text`` itself when it is not a string.
    """
    if not isinstance(text, str):
        return text
    # A callable replacement inserts the mask verbatim.  A plain string would
    # be parsed as a regex replacement template, so a backslash in the mask
    # would be expanded or rejected with re.error.
    return _ID_NUMBER_PATTERN.sub(lambda _match: mask, text)
def detect_api_keys(text: str) -> List[str]:
    """Return every API-key / secret-like match found in ``text``.

    Non-string input yields an empty list.
    """
    if not isinstance(text, str):
        return []
    return _API_KEY_PATTERN.findall(text)
def mask_api_keys(text: str, mask: str = "[REDACTED_KEY]") -> str:
    """Replace every API-key / secret-like match in ``text`` with ``mask``.

    Args:
        text: Text to clean.  Non-string values are returned unchanged.
        mask: Literal replacement text.

    Returns:
        The masked text, or ``text`` itself when it is not a string.
    """
    if not isinstance(text, str):
        return text
    # A callable replacement inserts the mask verbatim.  A plain string would
    # be parsed as a regex replacement template, so a backslash in the mask
    # would be expanded or rejected with re.error.
    return _API_KEY_PATTERN.sub(lambda _match: mask, text)
def detect_addresses(text: str) -> List[str]:
    """Return every address-like match found in ``text``.

    Non-string input yields an empty list.
    """
    if not isinstance(text, str):
        return []
    return _ADDRESS_PATTERN.findall(text)
def mask_addresses(text: str, mask: str = "[REDACTED_ADDR]") -> str:
    """Replace every address-like match in ``text`` with ``mask``.

    Args:
        text: Text to clean.  Non-string values are returned unchanged.
        mask: Literal replacement text.

    Returns:
        The masked text, or ``text`` itself when it is not a string.
    """
    if not isinstance(text, str):
        return text
    # A callable replacement inserts the mask verbatim.  A plain string would
    # be parsed as a regex replacement template, so a backslash in the mask
    # would be expanded or rejected with re.error.
    return _ADDRESS_PATTERN.sub(lambda _match: mask, text)
def apply_pii_filter(
    text: str,
    config: PIIFilterConfig,
) -> str:
    """Mask every PII category enabled in ``config`` within ``text``.

    Every enabled category is replaced with ``config.mask_char``.  Filters
    run from the most specific pattern to the loosest one.  The phone pattern
    accepts almost any run of eight or more digits, so running it before the
    ID and API-key filters would mask only part of a CNIC
    ("12345-1234567-1" -> "<mask>-1") or split a hex key, leaving the
    remaining fragments unmasked.

    Args:
        text: Text to clean.  Non-string values pass through the individual
            mask functions unchanged.
        config: Enabled categories and the replacement token.

    Returns:
        The text with each enabled category masked.
    """
    mask = config.mask_char
    # (enabled?, masker) pairs in application order: specific before loose.
    steps = (
        (config.filter_emails, mask_emails),
        (config.filter_api_keys, mask_api_keys),
        (config.filter_id_numbers, mask_id_numbers),
        (config.filter_phones, mask_phones),
        (config.filter_addresses, mask_addresses),
    )
    for enabled, masker in steps:
        if enabled:
            text = masker(text, mask)
    return text
def apply_pii_filter_df(
    df: pd.DataFrame,
    columns: List[str],
    config: PIIFilterConfig,
) -> pd.DataFrame:
    """Return a copy of ``df`` with PII masked in the given columns.

    Args:
        df: Source frame; it is not modified.
        columns: Column names to clean.  Names missing from ``df`` are skipped.
        config: Enabled categories and the replacement token.

    Returns:
        A new DataFrame.  In the processed columns, non-missing cells are
        converted to ``str`` and filtered; missing scalar cells (such as
        ``None`` or ``NaN``) are left as they are.
    """

    def _clean(cell):
        # Stringifying a missing value would plant the literal text "None" or
        # "nan" in the data, so missing scalars are passed through untouched.
        # is_scalar guards pd.isna, which returns an array for list-like cells.
        if pd.api.types.is_scalar(cell) and pd.isna(cell):
            return cell
        return apply_pii_filter(str(cell), config)

    result = df.copy()
    for col in columns:
        if col in result.columns:
            result[col] = result[col].apply(_clean)
    return result
def detect_pii_summary(
    df: pd.DataFrame,
    columns: List[str],
) -> Dict[str, int]:
    """Count PII matches per category across the given columns.

    Column names that are not present in ``df`` are ignored.  Each cell is
    converted to ``str`` before scanning.

    Returns:
        A dict with the keys "emails", "phones", "id_numbers", "api_keys"
        and "addresses", each mapped to the total number of matches found.
    """
    counts = dict.fromkeys(
        ("emails", "phones", "id_numbers", "api_keys", "addresses"), 0
    )
    present = [name for name in columns if name in df.columns]
    for name in present:
        for cell in df[name].astype(str):
            counts["emails"] += len(detect_emails(cell))
            counts["phones"] += len(detect_phones(cell))
            counts["id_numbers"] += len(detect_id_numbers(cell))
            counts["api_keys"] += len(detect_api_keys(cell))
            counts["addresses"] += len(detect_addresses(cell))
    return counts