# Provenance (web-capture residue, commented out so the file parses):
# tostido's picture
# Initial commit - cascade-lattice 0.5.4
# 77bcbf1
"""
PII Detection for CASCADE
Industry standard PII (Personally Identifiable Information) detection
based on Microsoft Presidio patterns and common PII taxonomies.
References:
- Microsoft Presidio: https://github.com/microsoft/presidio
- NIST PII Guide: https://nvlpubs.nist.gov/nistpubs/Legacy/SP/nistspecialpublication800-122.pdf
- GDPR Article 4 (personal data definition)
PII Categories:
1. Direct Identifiers: Name, SSN, passport, driver's license
2. Quasi-Identifiers: Age, ZIP code, gender, dates
3. Sensitive Data: Health, financial, biometric
Detection Methods:
- Regex patterns (fast, high precision for structured PII)
- Context-aware detection (surrounding words improve accuracy)
- Checksum validation (SSN, credit cards, etc.)
"""
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Pattern, Set, Tuple
class PIIType(Enum):
    """Types of PII that can be detected.

    Grouped by the taxonomy in the module docstring: direct identifiers,
    quasi-identifiers, sensitive data, location, and online identifiers.
    The string values appear as the "type" keys in scan reports
    (PIIMatch.to_dict / PIIScanResult.matches_by_type).

    NOTE(review): not every member has a regex in PII_PATTERNS below
    (e.g. PERSON_NAME, PASSPORT, DRIVERS_LICENSE, AGE, ADDRESS,
    MEDICAL_RECORD, PASSWORD, USERNAME have no pattern yet) — presumably
    reserved for NER-based or future detectors; confirm before relying
    on them being reported.
    """
    # Direct Identifiers — identify a person on their own
    PERSON_NAME = "PERSON_NAME"
    EMAIL = "EMAIL"
    PHONE_NUMBER = "PHONE_NUMBER"
    SSN = "SSN" # Social Security Number
    CREDIT_CARD = "CREDIT_CARD"
    IBAN = "IBAN" # International Bank Account Number
    IP_ADDRESS = "IP_ADDRESS"
    MAC_ADDRESS = "MAC_ADDRESS"
    PASSPORT = "PASSPORT"
    DRIVERS_LICENSE = "DRIVERS_LICENSE"
    # Quasi-Identifiers — identifying when combined with other attributes
    DATE_OF_BIRTH = "DATE_OF_BIRTH"
    AGE = "AGE"
    ZIPCODE = "ZIPCODE"
    ADDRESS = "ADDRESS"
    # Sensitive Data — credentials/records with high disclosure impact
    MEDICAL_RECORD = "MEDICAL_RECORD"
    API_KEY = "API_KEY"
    AWS_KEY = "AWS_KEY"
    PASSWORD = "PASSWORD"
    CRYPTO_WALLET = "CRYPTO_WALLET"
    # Location
    GPS_COORDINATES = "GPS_COORDINATES"
    # URLs and IDs
    URL = "URL"
    USERNAME = "USERNAME"
class PIISeverity(Enum):
    """Severity levels for PII findings.

    Ordered from most to least severe: CRITICAL > HIGH > MEDIUM > LOW.
    The lowercase string values are used as keys in
    PIIScanResult.matches_by_severity.
    """
    CRITICAL = "critical" # Direct identifier, immediate re-identification risk
    HIGH = "high" # Sensitive data, significant privacy risk
    MEDIUM = "medium" # Quasi-identifier, re-identification when combined
    LOW = "low" # Minimal risk, contextual sensitivity
@dataclass
class PIIMatch:
    """A single detected PII occurrence within scanned text."""
    pii_type: PIIType
    severity: PIISeverity
    value: str  # The matched text (may be redacted for display)
    start: int  # Start position in text
    end: int  # End position in text
    confidence: float  # 0.0 to 1.0
    context: str = ""  # Surrounding text for context
    field_name: str = ""  # Column/field where found
    row_index: int = -1  # Row index if applicable

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for reporting; the raw value is partially redacted."""
        pairs = (
            ("type", self.pii_type.value),
            ("severity", self.severity.value),
            ("value_preview", self._redact(self.value)),
            ("start", self.start),
            ("end", self.end),
            ("confidence", self.confidence),
            ("field_name", self.field_name),
            ("row_index", self.row_index),
        )
        return dict(pairs)

    def _redact(self, value: str, show_chars: int = 4) -> str:
        """Mask all but the first `show_chars` characters of `value`.

        Values no longer than `show_chars` are fully masked.
        """
        visible = value[:show_chars] if len(value) > show_chars else ""
        return visible + "*" * (len(value) - len(visible))
@dataclass
class PIIPattern:
    """A regex-based detection rule for one PII type.

    Consumed by PIIScanner.scan_text: each regex hit starts at
    `confidence`, is discarded if `validator` returns False, and is
    boosted when any `context_patterns` keyword occurs in the text.
    """
    pii_type: PIIType
    severity: PIISeverity
    # Compiled regex whose matches are candidate PII values.
    pattern: Pattern
    # Base confidence (0.0-1.0) assigned to a raw regex hit.
    confidence: float = 0.85
    validator: Optional[Callable[[str], bool]] = None # Additional validation
    context_patterns: List[str] = field(default_factory=list) # Boost confidence if context matches
@dataclass
class PIIScanResult:
    """Aggregated findings from scanning content for PII."""
    total_matches: int = 0
    matches_by_type: Dict[str, int] = field(default_factory=dict)
    matches_by_severity: Dict[str, int] = field(default_factory=dict)
    matches_by_field: Dict[str, int] = field(default_factory=dict)
    sample_matches: List[PIIMatch] = field(default_factory=list)  # First N matches
    fields_with_pii: Set[str] = field(default_factory=set)
    high_risk_fields: Set[str] = field(default_factory=set)  # Fields with CRITICAL/HIGH PII

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for reporting; at most 10 sample matches are included."""
        samples = [m.to_dict() for m in self.sample_matches[:10]]
        return {
            "total_matches": self.total_matches,
            "matches_by_type": self.matches_by_type,
            "matches_by_severity": self.matches_by_severity,
            "matches_by_field": self.matches_by_field,
            "fields_with_pii": list(self.fields_with_pii),
            "high_risk_fields": list(self.high_risk_fields),
            "sample_matches": samples,
        }

    def has_critical_pii(self) -> bool:
        """True when at least one CRITICAL-severity match was recorded."""
        return bool(self.matches_by_severity.get("critical", 0))

    def has_high_risk_pii(self) -> bool:
        """True when any CRITICAL or HIGH severity match was recorded."""
        return any(
            self.matches_by_severity.get(level, 0) > 0
            for level in ("critical", "high")
        )

    @property
    def summary(self) -> str:
        """Human-readable multi-line summary of the scan."""
        if not self.total_matches:
            return "No PII detected"
        parts = [f"Found {self.total_matches} PII instance(s):"]
        for sev in ("critical", "high", "medium", "low"):
            count = self.matches_by_severity.get(sev, 0)
            if count:
                parts.append(f" • {sev.upper()}: {count}")
        if self.high_risk_fields:
            parts.append(f" ⚠ High-risk fields: {', '.join(self.high_risk_fields)}")
        return "\n".join(parts)
# ═══════════════════════════════════════════════════════════════════════════════
# VALIDATION FUNCTIONS
# ═══════════════════════════════════════════════════════════════════════════════
def validate_luhn(card_number: str) -> bool:
    """
    Validate a candidate credit card number with the Luhn checksum.

    Used by Visa, MasterCard, American Express, etc. Non-digit
    characters (spaces, dashes) are ignored; the digit count must fall
    in the 13-19 range used by major card networks.
    """
    digit_string = re.sub(r'\D', '', card_number)
    if not 13 <= len(digit_string) <= 19:
        return False
    # Walk right-to-left; double every second digit, subtracting 9 when
    # the doubled value exceeds one digit (equivalent to digit-summing).
    total = 0
    for position, ch in enumerate(reversed(digit_string)):
        d = int(ch)
        if position % 2:
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return total % 10 == 0
def validate_ssn(ssn: str) -> bool:
    """
    Validate US Social Security Number format.

    SSN format: AAA-BB-CCCC
    - AAA: Area number (001-899, excluding 666)
    - BB: Group number (01-99)
    - CCCC: Serial number (0001-9999)

    Separators are ignored; exactly nine digits are required.
    """
    digits = re.sub(r'\D', '', ssn)
    if len(digits) != 9:
        return False
    # Known invalid SSNs (advertising, testing)
    if digits in {"078051120", "219099999"}:
        return False
    area = int(digits[:3])
    group = int(digits[3:5])
    serial = int(digits[5:])
    # Area must be 001-899 and not 666; group/serial must be non-zero.
    if not 1 <= area <= 899 or area == 666:
        return False
    return group != 0 and serial != 0
def validate_iban(iban: str) -> bool:
    """
    Validate an IBAN using the ISO 13616 MOD-97 checksum.

    Steps:
    1. Strip all whitespace and uppercase.
    2. Structural check: 15-34 chars, two-letter country code,
       two check digits, alphanumeric remainder.
    3. Move the first four chars to the end, map letters to numbers
       (A=10 ... Z=35), and verify the resulting integer mod 97 == 1.

    Fix: the original converted straight to int(), so any embedded
    punctuation (e.g. "GB82-WEST-...") raised ValueError; malformed
    input now returns False instead.
    """
    clean = re.sub(r'\s+', '', iban).upper()
    if not 15 <= len(clean) <= 34:
        return False
    # Structural pre-validation: the MOD-97 step only makes sense for
    # [A-Z0-9] strings with a letter country code and digit check digits.
    if not re.fullmatch(r'[A-Z]{2}\d{2}[A-Z0-9]+', clean):
        return False
    # Move country code and check digits to end
    rearranged = clean[4:] + clean[:4]
    # Convert letters to numbers (A=10, B=11, etc.)
    numeric = "".join(
        ch if ch.isdigit() else str(ord(ch) - ord('A') + 10)
        for ch in rearranged
    )
    # MOD 97 check
    return int(numeric) % 97 == 1
# ═══════════════════════════════════════════════════════════════════════════════
# PII PATTERNS (Based on Microsoft Presidio)
# ═══════════════════════════════════════════════════════════════════════════════
# Built-in detection rules, roughly following Microsoft Presidio's
# recognizers. Order is not significant; each pattern is applied
# independently by PIIScanner.scan_text.
PII_PATTERNS: List[PIIPattern] = [
    # Email - RFC 5322 simplified.
    # FIX: the TLD class was [A-Z|a-z], which also matched a literal '|'
    # ('|' is not alternation inside a character class), so strings like
    # "a@b.c|d" were reported as emails; corrected to [A-Za-z].
    PIIPattern(
        pii_type=PIIType.EMAIL,
        severity=PIISeverity.HIGH,
        pattern=re.compile(
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            re.IGNORECASE
        ),
        confidence=0.95,
        context_patterns=["email", "e-mail", "contact", "mail"],
    ),
    # Phone Number - International formats
    PIIPattern(
        pii_type=PIIType.PHONE_NUMBER,
        severity=PIISeverity.MEDIUM,
        pattern=re.compile(
            r'''
            (?:
                \+?1?[-.\s]?                    # Country code
                \(?[2-9]\d{2}\)?[-.\s]?         # Area code
                [2-9]\d{2}[-.\s]?               # Exchange
                \d{4}                           # Subscriber
                |
                \+?\d{1,3}[-.\s]?\(?\d{1,4}\)?[-.\s]?  # International
                \d{1,4}[-.\s]?\d{1,9}
            )
            ''',
            re.VERBOSE
        ),
        confidence=0.75,
        context_patterns=["phone", "tel", "mobile", "cell", "call", "fax"],
    ),
    # SSN - US Social Security Number (regex pre-filters obviously
    # invalid area/group/serial; validate_ssn does the full check)
    PIIPattern(
        pii_type=PIIType.SSN,
        severity=PIISeverity.CRITICAL,
        pattern=re.compile(
            r'\b(?!000|666|9\d{2})\d{3}[-\s]?(?!00)\d{2}[-\s]?(?!0000)\d{4}\b'
        ),
        confidence=0.85,
        validator=validate_ssn,
        context_patterns=["ssn", "social security", "tax id", "taxpayer"],
    ),
    # Credit Card - Major card formats; Luhn checksum confirms hits
    PIIPattern(
        pii_type=PIIType.CREDIT_CARD,
        severity=PIISeverity.CRITICAL,
        pattern=re.compile(
            r'''
            \b(?:
                4[0-9]{12}(?:[0-9]{3})?         # Visa
                |
                5[1-5][0-9]{14}                 # MasterCard
                |
                3[47][0-9]{13}                  # American Express
                |
                6(?:011|5[0-9]{2})[0-9]{12}     # Discover
                |
                (?:2131|1800|35\d{3})\d{11}     # JCB
            )\b
            |
            \b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b  # Spaced format
            ''',
            re.VERBOSE
        ),
        confidence=0.90,
        validator=validate_luhn,
        context_patterns=["card", "credit", "visa", "mastercard", "amex", "payment"],
    ),
    # IP Address - IPv4 (each octet constrained to 0-255)
    PIIPattern(
        pii_type=PIIType.IP_ADDRESS,
        severity=PIISeverity.MEDIUM,
        pattern=re.compile(
            r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b'
        ),
        confidence=0.90,
        context_patterns=["ip", "address", "server", "host", "client"],
    ),
    # IP Address - IPv6 (full 8-group form only; no '::' compression)
    PIIPattern(
        pii_type=PIIType.IP_ADDRESS,
        severity=PIISeverity.MEDIUM,
        pattern=re.compile(
            r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b'
        ),
        confidence=0.90,
    ),
    # MAC Address
    PIIPattern(
        pii_type=PIIType.MAC_ADDRESS,
        severity=PIISeverity.LOW,
        pattern=re.compile(
            r'\b(?:[0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}\b'
        ),
        confidence=0.95,
    ),
    # IBAN - International Bank Account Number (MOD-97 validated)
    PIIPattern(
        pii_type=PIIType.IBAN,
        severity=PIISeverity.CRITICAL,
        pattern=re.compile(
            r'\b[A-Z]{2}\d{2}[A-Z0-9]{4}\d{7}(?:[A-Z0-9]?){0,16}\b',
            re.IGNORECASE
        ),
        confidence=0.85,
        validator=validate_iban,
        context_patterns=["iban", "bank", "account", "transfer"],
    ),
    # API Key patterns (well-known vendor token prefixes)
    PIIPattern(
        pii_type=PIIType.API_KEY,
        severity=PIISeverity.CRITICAL,
        pattern=re.compile(
            r'''
            (?:
                sk[-_]live[-_][a-zA-Z0-9]{24,}      # Stripe
                |
                sk[-_]test[-_][a-zA-Z0-9]{24,}      # Stripe test
                |
                pk[-_]live[-_][a-zA-Z0-9]{24,}      # Stripe public
                |
                ghp_[a-zA-Z0-9]{36}                 # GitHub PAT
                |
                gho_[a-zA-Z0-9]{36}                 # GitHub OAuth
                |
                github_pat_[a-zA-Z0-9]{22}_[a-zA-Z0-9]{59}  # GitHub fine-grained
                |
                xox[baprs]-[a-zA-Z0-9-]{10,}        # Slack
                |
                ya29\.[a-zA-Z0-9_-]+                # Google OAuth
            )
            ''',
            re.VERBOSE
        ),
        confidence=0.95,
        context_patterns=["api", "key", "token", "secret", "auth"],
    ),
    # AWS Access Key
    PIIPattern(
        pii_type=PIIType.AWS_KEY,
        severity=PIISeverity.CRITICAL,
        pattern=re.compile(
            r'\b(?:AKIA|ABIA|ACCA|ASIA)[A-Z0-9]{16}\b'
        ),
        confidence=0.95,
        context_patterns=["aws", "amazon", "key", "access"],
    ),
    # Crypto Wallet - Bitcoin (legacy 1/3 and bech32 bc1 prefixes)
    PIIPattern(
        pii_type=PIIType.CRYPTO_WALLET,
        severity=PIISeverity.HIGH,
        pattern=re.compile(
            r'\b(?:bc1|[13])[a-zA-HJ-NP-Z0-9]{25,39}\b'
        ),
        confidence=0.80,
        context_patterns=["bitcoin", "btc", "wallet", "crypto"],
    ),
    # Crypto Wallet - Ethereum (0x + 40 hex chars)
    PIIPattern(
        pii_type=PIIType.CRYPTO_WALLET,
        severity=PIISeverity.HIGH,
        pattern=re.compile(
            r'\b0x[a-fA-F0-9]{40}\b'
        ),
        confidence=0.80,
        context_patterns=["ethereum", "eth", "wallet", "crypto"],
    ),
    # GPS Coordinates (lat -90..90, lon -180..180, comma-separated)
    PIIPattern(
        pii_type=PIIType.GPS_COORDINATES,
        severity=PIISeverity.MEDIUM,
        pattern=re.compile(
            r'[-+]?(?:[1-8]?\d(?:\.\d+)?|90(?:\.0+)?)\s*,\s*[-+]?(?:180(?:\.0+)?|(?:(?:1[0-7]\d)|(?:[1-9]?\d))(?:\.\d+)?)'
        ),
        confidence=0.70,
        context_patterns=["location", "coordinates", "lat", "lng", "gps"],
    ),
    # Date of Birth patterns (MM/DD/YYYY with -, . or / separators)
    PIIPattern(
        pii_type=PIIType.DATE_OF_BIRTH,
        severity=PIISeverity.MEDIUM,
        pattern=re.compile(
            r'\b(?:0?[1-9]|1[0-2])[/\-.](?:0?[1-9]|[12]\d|3[01])[/\-.](?:19|20)\d{2}\b'
        ),
        confidence=0.60,  # Low base - needs context
        context_patterns=["birth", "dob", "born", "birthday", "date of birth"],
    ),
    # US ZIP Code (5-digit, optional +4)
    PIIPattern(
        pii_type=PIIType.ZIPCODE,
        severity=PIISeverity.LOW,
        pattern=re.compile(
            r'\b\d{5}(?:-\d{4})?\b'
        ),
        confidence=0.50,  # Low - needs context
        context_patterns=["zip", "postal", "address", "code"],
    ),
    # URL (can contain sensitive info in path/query)
    PIIPattern(
        pii_type=PIIType.URL,
        severity=PIISeverity.LOW,
        pattern=re.compile(
            r'https?://[^\s<>"{}|\\^`\[\]]+',
            re.IGNORECASE
        ),
        confidence=0.70,
    ),
]
class PIIScanner:
    """
    Scanner for detecting PII in text and datasets.

    Detection is regex-driven (see PII_PATTERNS): each hit may be
    checksum/format-validated and its confidence boosted when a context
    keyword appears anywhere in the scanned text.
    """

    def __init__(
        self,
        patterns: Optional[List[PIIPattern]] = None,
        min_confidence: float = 0.5,
        context_boost: float = 0.1,
    ):
        """
        Initialize scanner.

        Args:
            patterns: Custom patterns (defaults to PII_PATTERNS; note an
                empty list also falls back to the defaults)
            min_confidence: Minimum confidence to report (0.0-1.0)
            context_boost: Confidence boost when context matches
        """
        self.patterns = patterns or PII_PATTERNS
        self.min_confidence = min_confidence
        self.context_boost = context_boost

    def scan_text(
        self,
        text: str,
        field_name: str = "",
        row_index: int = -1,
    ) -> List[PIIMatch]:
        """
        Scan a single string for PII.

        Args:
            text: Text to scan (empty or non-string input yields [])
            field_name: Optional field name recorded on each match
            row_index: Optional row index recorded on each match

        Returns:
            List of PIIMatch objects with confidence >= min_confidence
        """
        if not text or not isinstance(text, str):
            return []
        matches: List[PIIMatch] = []
        text_lower = text.lower()
        for pattern in self.patterns:
            for match in pattern.pattern.finditer(text):
                value = match.group()
                confidence = pattern.confidence
                # Drop hits that fail the pattern's validator (checksum etc.).
                if pattern.validator and not pattern.validator(value):
                    continue
                # Single boost if any context keyword occurs anywhere in
                # the text (not just near the match).
                if pattern.context_patterns and any(
                    ctx in text_lower for ctx in pattern.context_patterns
                ):
                    confidence = min(1.0, confidence + self.context_boost)
                if confidence < self.min_confidence:
                    continue
                # Capture up to 50 chars of surrounding context per side.
                ctx_start = max(0, match.start() - 50)
                ctx_end = min(len(text), match.end() + 50)
                matches.append(PIIMatch(
                    pii_type=pattern.pii_type,
                    severity=pattern.severity,
                    value=value,
                    start=match.start(),
                    end=match.end(),
                    confidence=confidence,
                    context=text[ctx_start:ctx_end],
                    field_name=field_name,
                    row_index=row_index,
                ))
        return matches

    @staticmethod
    def _record_match(result: PIIScanResult, match: PIIMatch, field_name: str) -> None:
        """Fold one match into an aggregate result.

        Shared by scan_dict and scan_dataset, which previously duplicated
        this tallying logic verbatim.
        """
        result.total_matches += 1
        type_name = match.pii_type.value
        result.matches_by_type[type_name] = result.matches_by_type.get(type_name, 0) + 1
        sev = match.severity.value
        result.matches_by_severity[sev] = result.matches_by_severity.get(sev, 0) + 1
        result.matches_by_field[field_name] = result.matches_by_field.get(field_name, 0) + 1
        result.fields_with_pii.add(field_name)
        if match.severity in (PIISeverity.CRITICAL, PIISeverity.HIGH):
            result.high_risk_fields.add(field_name)
        # Cap stored samples to bound memory on large scans.
        if len(result.sample_matches) < 100:
            result.sample_matches.append(match)

    @staticmethod
    def _merge_results(combined: PIIScanResult, part: PIIScanResult) -> None:
        """Merge a per-split result into `combined` (DatasetDict scans)."""
        combined.total_matches += part.total_matches
        for attr in ("matches_by_type", "matches_by_severity", "matches_by_field"):
            dst = getattr(combined, attr)
            for key, count in getattr(part, attr).items():
                dst[key] = dst.get(key, 0) + count
        combined.fields_with_pii.update(part.fields_with_pii)
        combined.high_risk_fields.update(part.high_risk_fields)
        # Keep at most 20 sample matches per merged split.
        combined.sample_matches.extend(part.sample_matches[:20])

    def scan_dict(
        self,
        data: Dict[str, List[Any]],
        sample_size: int = 1000,
    ) -> PIIScanResult:
        """
        Scan a columnar dict for PII.

        Args:
            data: Dict of column_name -> values
            sample_size: Max rows to scan per column

        Returns:
            PIIScanResult with aggregated findings
        """
        result = PIIScanResult()
        for field_name, values in data.items():
            if not values:
                continue
            for row_idx, value in enumerate(values[:sample_size]):
                # Coerce non-strings so numeric/other columns are scanned too.
                if not isinstance(value, str):
                    value = str(value) if value is not None else ""
                for match in self.scan_text(value, field_name, row_idx):
                    self._record_match(result, match, field_name)
        return result

    def scan_dataset(
        self,
        dataset,
        sample_size: int = 1000,
    ) -> PIIScanResult:
        """
        Scan a HuggingFace Dataset or DatasetDict for PII.

        Args:
            dataset: HuggingFace Dataset or DatasetDict
            sample_size: Max rows to scan (per split for a DatasetDict)

        Returns:
            PIIScanResult with aggregated findings
        """
        # A DatasetDict exposes a callable keys(): scan each split and merge.
        if hasattr(dataset, 'keys') and callable(dataset.keys):
            combined = PIIScanResult()
            for split_name in dataset.keys():
                self._merge_results(
                    combined, self.scan_dataset(dataset[split_name], sample_size)
                )
            return combined
        # Single Dataset: resolve column names from features/column_names.
        result = PIIScanResult()
        if hasattr(dataset, 'features'):
            columns = list(dataset.features.keys())
        elif hasattr(dataset, 'column_names'):
            columns = dataset.column_names
        else:
            return result
        num_rows = len(dataset) if hasattr(dataset, '__len__') else sample_size
        for idx in range(min(sample_size, num_rows)):
            row = dataset[idx]
            for col in columns:
                value = row.get(col) if isinstance(row, dict) else getattr(row, col, None)
                if not isinstance(value, str):
                    value = str(value) if value is not None else ""
                for match in self.scan_text(value, col, idx):
                    self._record_match(result, match, col)
        return result
# Singleton scanner with default settings.
# NOTE(review): nothing in this module uses it — scan_for_pii() builds a
# fresh PIIScanner per call; presumably kept for external importers.
_scanner = PIIScanner()
def scan_for_pii(
    data,
    sample_size: int = 1000,
    min_confidence: float = 0.5,
) -> PIIScanResult:
    """
    Convenience function to scan data for PII.

    Args:
        data: Text, columnar dict, or HuggingFace Dataset/DatasetDict
        sample_size: Max rows to scan (ignored for plain text)
        min_confidence: Minimum confidence threshold

    Returns:
        PIIScanResult with findings
    """
    scanner = PIIScanner(min_confidence=min_confidence)
    if isinstance(data, str):
        # Plain text: aggregate type/severity counts by hand; field-level
        # bookkeeping does not apply here.
        found = scanner.scan_text(data)
        outcome = PIIScanResult(total_matches=len(found), sample_matches=found)
        for hit in found:
            for bucket, key in (
                (outcome.matches_by_type, hit.pii_type.value),
                (outcome.matches_by_severity, hit.severity.value),
            ):
                bucket[key] = bucket.get(key, 0) + 1
        return outcome
    if isinstance(data, dict):
        return scanner.scan_dict(data, sample_size)
    # Anything else is assumed to be a HuggingFace Dataset/DatasetDict.
    return scanner.scan_dataset(data, sample_size)
def quick_pii_check(data, sample_size: int = 100) -> bool:
    """
    Quick boolean check for the presence of PII in `data`.

    Scans with a higher confidence floor (0.7) than the default to keep
    false positives down. Returns True if any PII is found.
    """
    findings = scan_for_pii(data, sample_size=sample_size, min_confidence=0.7)
    return bool(findings.total_matches)