| """ | |
| PII Detection for CASCADE | |
| Industry standard PII (Personally Identifiable Information) detection | |
| based on Microsoft Presidio patterns and common PII taxonomies. | |
| References: | |
| - Microsoft Presidio: https://github.com/microsoft/presidio | |
| - NIST PII Guide: https://nvlpubs.nist.gov/nistpubs/Legacy/SP/nistspecialpublication800-122.pdf | |
| - GDPR Article 4 (personal data definition) | |
| PII Categories: | |
| 1. Direct Identifiers: Name, SSN, passport, driver's license | |
| 2. Quasi-Identifiers: Age, ZIP code, gender, dates | |
| 3. Sensitive Data: Health, financial, biometric | |
| Detection Methods: | |
| - Regex patterns (fast, high precision for structured PII) | |
| - Context-aware detection (surrounding words improve accuracy) | |
| - Checksum validation (SSN, credit cards, etc.) | |
| """ | |
| import re | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| from typing import Any, Callable, Dict, List, Optional, Pattern, Set, Tuple | |
class PIIType(Enum):
    """Types of PII that can be detected."""

    # Direct Identifiers — values that identify a person on their own.
    PERSON_NAME = "PERSON_NAME"
    EMAIL = "EMAIL"
    PHONE_NUMBER = "PHONE_NUMBER"
    SSN = "SSN"  # Social Security Number (US)
    CREDIT_CARD = "CREDIT_CARD"
    IBAN = "IBAN"  # International Bank Account Number
    IP_ADDRESS = "IP_ADDRESS"
    MAC_ADDRESS = "MAC_ADDRESS"
    PASSPORT = "PASSPORT"
    DRIVERS_LICENSE = "DRIVERS_LICENSE"

    # Quasi-Identifiers — identifying only when combined with other data.
    DATE_OF_BIRTH = "DATE_OF_BIRTH"
    AGE = "AGE"
    ZIPCODE = "ZIPCODE"
    ADDRESS = "ADDRESS"

    # Sensitive Data — credentials / records with direct misuse potential.
    MEDICAL_RECORD = "MEDICAL_RECORD"
    API_KEY = "API_KEY"
    AWS_KEY = "AWS_KEY"
    PASSWORD = "PASSWORD"
    CRYPTO_WALLET = "CRYPTO_WALLET"

    # Location
    GPS_COORDINATES = "GPS_COORDINATES"

    # URLs and IDs
    URL = "URL"
    USERNAME = "USERNAME"
class PIISeverity(Enum):
    """Severity levels for PII findings, from most to least severe."""

    CRITICAL = "critical"  # Direct identifier, immediate re-identification risk
    HIGH = "high"          # Sensitive data, significant privacy risk
    MEDIUM = "medium"      # Quasi-identifier, re-identification when combined
    LOW = "low"            # Minimal risk, contextual sensitivity
@dataclass
class PIIMatch:
    """A single detected PII instance within a piece of scanned text.

    Fix: the ``@dataclass`` decorator was missing, so the annotated fields
    generated no ``__init__`` and keyword construction (as done by
    ``PIIScanner.scan_text``) would raise ``TypeError``.
    """

    pii_type: PIIType
    severity: PIISeverity
    value: str            # The matched text (raw; redacted only for display)
    start: int            # Start offset in the scanned text
    end: int              # End offset (exclusive) in the scanned text
    confidence: float     # Detection confidence, 0.0 to 1.0
    context: str = ""     # Surrounding text for context
    field_name: str = ""  # Column/field where found
    row_index: int = -1   # Row index if applicable

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for reporting; the matched value is partially redacted."""
        return {
            "type": self.pii_type.value,
            "severity": self.severity.value,
            "value_preview": self._redact(self.value),
            "start": self.start,
            "end": self.end,
            "confidence": self.confidence,
            "field_name": self.field_name,
            "row_index": self.row_index,
        }

    def _redact(self, value: str, show_chars: int = 4) -> str:
        """Partially redact *value* for display.

        Keeps the first *show_chars* characters and masks the rest; values
        no longer than *show_chars* are fully masked.
        """
        if len(value) <= show_chars:
            return "*" * len(value)
        return value[:show_chars] + "*" * (len(value) - show_chars)
@dataclass
class PIIPattern:
    """A regex-based detection rule for one PII type.

    Fix: the ``@dataclass`` decorator was missing; without it the
    ``field(default_factory=list)`` default is a bare ``Field`` object and
    the class cannot be constructed the way ``PII_PATTERNS`` does.
    """

    pii_type: PIIType
    severity: PIISeverity
    pattern: Pattern
    confidence: float = 0.85
    # Optional extra validation (e.g. Luhn / MOD-97 checksum) applied to each
    # regex hit; hits failing validation are discarded by the scanner.
    validator: Optional[Callable[[str], bool]] = None
    # Keywords that, when present in the scanned text, boost confidence.
    context_patterns: List[str] = field(default_factory=list)
@dataclass
class PIIScanResult:
    """Aggregated result of scanning content for PII.

    Fix: the ``@dataclass`` decorator was missing, so ``PIIScanResult()``
    had no generated ``__init__`` defaults and the ``field(...)`` defaults
    were bare ``Field`` objects.
    """

    total_matches: int = 0
    matches_by_type: Dict[str, int] = field(default_factory=dict)
    matches_by_severity: Dict[str, int] = field(default_factory=dict)
    matches_by_field: Dict[str, int] = field(default_factory=dict)
    sample_matches: List[PIIMatch] = field(default_factory=list)  # First N matches retained
    fields_with_pii: Set[str] = field(default_factory=set)
    high_risk_fields: Set[str] = field(default_factory=set)  # Fields with CRITICAL/HIGH PII

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for reporting; at most 10 sample matches are included."""
        return {
            "total_matches": self.total_matches,
            "matches_by_type": self.matches_by_type,
            "matches_by_severity": self.matches_by_severity,
            "matches_by_field": self.matches_by_field,
            "fields_with_pii": list(self.fields_with_pii),
            "high_risk_fields": list(self.high_risk_fields),
            "sample_matches": [m.to_dict() for m in self.sample_matches[:10]],
        }

    def has_critical_pii(self) -> bool:
        """Check if any critical PII was found."""
        return self.matches_by_severity.get("critical", 0) > 0

    def has_high_risk_pii(self) -> bool:
        """Check if any critical- or high-severity PII was found."""
        return (
            self.matches_by_severity.get("critical", 0) > 0 or
            self.matches_by_severity.get("high", 0) > 0
        )

    def summary(self) -> str:
        """Human-readable multi-line summary of the findings."""
        if self.total_matches == 0:
            return "No PII detected"
        lines = [f"Found {self.total_matches} PII instance(s):"]
        for sev in ["critical", "high", "medium", "low"]:
            count = self.matches_by_severity.get(sev, 0)
            if count > 0:
                lines.append(f"  • {sev.upper()}: {count}")
        if self.high_risk_fields:
            lines.append(f"  ⚠ High-risk fields: {', '.join(self.high_risk_fields)}")
        return "\n".join(lines)
| # ═══════════════════════════════════════════════════════════════════════════════ | |
| # VALIDATION FUNCTIONS | |
| # ═══════════════════════════════════════════════════════════════════════════════ | |
def validate_luhn(card_number: str) -> bool:
    """Return True if *card_number* passes the Luhn checksum.

    The Luhn algorithm is used by Visa, MasterCard, American Express, etc.
    Non-digit separators (spaces, dashes) are ignored; the digit count must
    be between 13 and 19 inclusive.
    """
    cleaned = re.sub(r'\D', '', card_number)
    if not 13 <= len(cleaned) <= 19:
        return False

    # Work right-to-left: even positions are added as-is, odd positions are
    # doubled with digits summed (equivalent to subtracting 9 when > 9).
    rev = [int(ch) for ch in cleaned[::-1]]
    total = sum(rev[0::2])
    for d in rev[1::2]:
        doubled = d * 2
        total += doubled - 9 if doubled > 9 else doubled
    return total % 10 == 0
def validate_ssn(ssn: str) -> bool:
    """Return True if *ssn* is a plausibly valid US Social Security Number.

    SSN format AAA-BB-CCCC:
      - AAA: area number (001-899, excluding 666)
      - BB:  group number (01-99)
      - CCCC: serial number (0001-9999)

    Separators are ignored; exactly nine digits are required. A small
    blocklist of famously invalid numbers (promotional/advertising) is
    also rejected.
    """
    digits = re.sub(r'\D', '', ssn)
    if len(digits) != 9:
        return False

    area = int(digits[:3])
    group = int(digits[3:5])
    serial = int(digits[5:])

    # Area 000, 666, and 900-999 are never issued; group/serial of zero
    # are invalid.
    if not 1 <= area <= 899 or area == 666:
        return False
    if group == 0 or serial == 0:
        return False

    # Known invalid SSNs: Woolworth promotional card, advertising number.
    return digits not in {"078051120", "219099999"}
def validate_iban(iban: str) -> bool:
    """Validate an IBAN using the ISO 13616 MOD-97-10 checksum.

    Whitespace is stripped and case is ignored. Returns False (rather than
    raising) for malformed input.

    Fix: the original only length-checked the input, so a non-alphanumeric
    character surviving the regex pre-filter (e.g. a hyphen) produced a
    stray '-' in the numeric string and made ``int(numeric)`` raise
    ValueError. A structural check now rejects such input up front.
    """
    clean = re.sub(r'\s', '', iban).upper()
    if not 15 <= len(clean) <= 34:
        return False

    # Structure: 2-letter country code, 2 check digits, alphanumeric BBAN.
    if not re.fullmatch(r'[A-Z]{2}\d{2}[A-Z0-9]+', clean):
        return False

    # Move country code and check digits to the end, then map each
    # character to its base-36 value (A=10 ... Z=35, digits unchanged).
    rearranged = clean[4:] + clean[:4]
    numeric = "".join(str(int(char, 36)) for char in rearranged)

    # A valid IBAN leaves a remainder of 1 under MOD-97.
    return int(numeric) % 97 == 1
| # ═══════════════════════════════════════════════════════════════════════════════ | |
| # PII PATTERNS (Based on Microsoft Presidio) | |
| # ═══════════════════════════════════════════════════════════════════════════════ | |
# Default detection rules, ordered roughly by severity. Each entry pairs a
# regex with a base confidence, optional checksum validator, and context
# keywords that boost confidence when present in the surrounding text.
PII_PATTERNS: List[PIIPattern] = [
    # Email - RFC 5322 simplified.
    # Fix: TLD char class was [A-Z|a-z], where '|' is a literal pipe inside
    # a character class and wrongly allowed '|' in the TLD.
    PIIPattern(
        pii_type=PIIType.EMAIL,
        severity=PIISeverity.HIGH,
        pattern=re.compile(
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            re.IGNORECASE
        ),
        confidence=0.95,
        context_patterns=["email", "e-mail", "contact", "mail"],
    ),
    # Phone Number - International formats
    PIIPattern(
        pii_type=PIIType.PHONE_NUMBER,
        severity=PIISeverity.MEDIUM,
        pattern=re.compile(
            r'''
            (?:
                \+?1?[-.\s]?                      # Country code
                \(?[2-9]\d{2}\)?[-.\s]?           # Area code
                [2-9]\d{2}[-.\s]?                 # Exchange
                \d{4}                             # Subscriber
                |
                \+?\d{1,3}[-.\s]?\(?\d{1,4}\)?[-.\s]?  # International
                \d{1,4}[-.\s]?\d{1,9}
            )
            ''',
            re.VERBOSE
        ),
        confidence=0.75,
        context_patterns=["phone", "tel", "mobile", "cell", "call", "fax"],
    ),
    # SSN - US Social Security Number (lookaheads reject impossible areas/groups)
    PIIPattern(
        pii_type=PIIType.SSN,
        severity=PIISeverity.CRITICAL,
        pattern=re.compile(
            r'\b(?!000|666|9\d{2})\d{3}[-\s]?(?!00)\d{2}[-\s]?(?!0000)\d{4}\b'
        ),
        confidence=0.85,
        validator=validate_ssn,
        context_patterns=["ssn", "social security", "tax id", "taxpayer"],
    ),
    # Credit Card - Major card formats, confirmed by Luhn checksum
    PIIPattern(
        pii_type=PIIType.CREDIT_CARD,
        severity=PIISeverity.CRITICAL,
        pattern=re.compile(
            r'''
            \b(?:
                4[0-9]{12}(?:[0-9]{3})?           # Visa
                |
                5[1-5][0-9]{14}                   # MasterCard
                |
                3[47][0-9]{13}                    # American Express
                |
                6(?:011|5[0-9]{2})[0-9]{12}       # Discover
                |
                (?:2131|1800|35\d{3})\d{11}       # JCB
            )\b
            |
            \b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b  # Spaced format
            ''',
            re.VERBOSE
        ),
        confidence=0.90,
        validator=validate_luhn,
        context_patterns=["card", "credit", "visa", "mastercard", "amex", "payment"],
    ),
    # IP Address - IPv4 (each octet constrained to 0-255)
    PIIPattern(
        pii_type=PIIType.IP_ADDRESS,
        severity=PIISeverity.MEDIUM,
        pattern=re.compile(
            r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b'
        ),
        confidence=0.90,
        context_patterns=["ip", "address", "server", "host", "client"],
    ),
    # IP Address - IPv6 (full 8-group form only; compressed '::' not matched)
    PIIPattern(
        pii_type=PIIType.IP_ADDRESS,
        severity=PIISeverity.MEDIUM,
        pattern=re.compile(
            r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b'
        ),
        confidence=0.90,
    ),
    # MAC Address (colon- or dash-separated)
    PIIPattern(
        pii_type=PIIType.MAC_ADDRESS,
        severity=PIISeverity.LOW,
        pattern=re.compile(
            r'\b(?:[0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}\b'
        ),
        confidence=0.95,
    ),
    # IBAN - International Bank Account Number, confirmed by MOD-97 checksum
    PIIPattern(
        pii_type=PIIType.IBAN,
        severity=PIISeverity.CRITICAL,
        pattern=re.compile(
            r'\b[A-Z]{2}\d{2}[A-Z0-9]{4}\d{7}(?:[A-Z0-9]?){0,16}\b',
            re.IGNORECASE
        ),
        confidence=0.85,
        validator=validate_iban,
        context_patterns=["iban", "bank", "account", "transfer"],
    ),
    # API Key patterns (vendor-specific prefixes)
    PIIPattern(
        pii_type=PIIType.API_KEY,
        severity=PIISeverity.CRITICAL,
        pattern=re.compile(
            r'''
            (?:
                sk[-_]live[-_][a-zA-Z0-9]{24,}    # Stripe
                |
                sk[-_]test[-_][a-zA-Z0-9]{24,}    # Stripe test
                |
                pk[-_]live[-_][a-zA-Z0-9]{24,}    # Stripe public
                |
                ghp_[a-zA-Z0-9]{36}               # GitHub PAT
                |
                gho_[a-zA-Z0-9]{36}               # GitHub OAuth
                |
                github_pat_[a-zA-Z0-9]{22}_[a-zA-Z0-9]{59}  # GitHub fine-grained
                |
                xox[baprs]-[a-zA-Z0-9-]{10,}      # Slack
                |
                ya29\.[a-zA-Z0-9_-]+              # Google OAuth
            )
            ''',
            re.VERBOSE
        ),
        confidence=0.95,
        context_patterns=["api", "key", "token", "secret", "auth"],
    ),
    # AWS Access Key
    PIIPattern(
        pii_type=PIIType.AWS_KEY,
        severity=PIISeverity.CRITICAL,
        pattern=re.compile(
            r'\b(?:AKIA|ABIA|ACCA|ASIA)[A-Z0-9]{16}\b'
        ),
        confidence=0.95,
        context_patterns=["aws", "amazon", "key", "access"],
    ),
    # Crypto Wallet - Bitcoin (legacy Base58 and bech32 prefixes)
    PIIPattern(
        pii_type=PIIType.CRYPTO_WALLET,
        severity=PIISeverity.HIGH,
        pattern=re.compile(
            r'\b(?:bc1|[13])[a-zA-HJ-NP-Z0-9]{25,39}\b'
        ),
        confidence=0.80,
        context_patterns=["bitcoin", "btc", "wallet", "crypto"],
    ),
    # Crypto Wallet - Ethereum (0x + 40 hex chars)
    PIIPattern(
        pii_type=PIIType.CRYPTO_WALLET,
        severity=PIISeverity.HIGH,
        pattern=re.compile(
            r'\b0x[a-fA-F0-9]{40}\b'
        ),
        confidence=0.80,
        context_patterns=["ethereum", "eth", "wallet", "crypto"],
    ),
    # GPS Coordinates (lat in [-90, 90], lon in [-180, 180])
    PIIPattern(
        pii_type=PIIType.GPS_COORDINATES,
        severity=PIISeverity.MEDIUM,
        pattern=re.compile(
            r'[-+]?(?:[1-8]?\d(?:\.\d+)?|90(?:\.0+)?)\s*,\s*[-+]?(?:180(?:\.0+)?|(?:(?:1[0-7]\d)|(?:[1-9]?\d))(?:\.\d+)?)'
        ),
        confidence=0.70,
        context_patterns=["location", "coordinates", "lat", "lng", "gps"],
    ),
    # Date of Birth patterns (MM/DD/YYYY with -, . or / separators)
    PIIPattern(
        pii_type=PIIType.DATE_OF_BIRTH,
        severity=PIISeverity.MEDIUM,
        pattern=re.compile(
            r'\b(?:0?[1-9]|1[0-2])[/\-.](?:0?[1-9]|[12]\d|3[01])[/\-.](?:19|20)\d{2}\b'
        ),
        confidence=0.60,  # Low base - any date matches; needs context to be a DOB
        context_patterns=["birth", "dob", "born", "birthday", "date of birth"],
    ),
    # US ZIP Code (5-digit with optional +4)
    PIIPattern(
        pii_type=PIIType.ZIPCODE,
        severity=PIISeverity.LOW,
        pattern=re.compile(
            r'\b\d{5}(?:-\d{4})?\b'
        ),
        confidence=0.50,  # Low - any 5-digit number matches; needs context
        context_patterns=["zip", "postal", "address", "code"],
    ),
    # URL (can contain sensitive info in path/query)
    PIIPattern(
        pii_type=PIIType.URL,
        severity=PIISeverity.LOW,
        pattern=re.compile(
            r'https?://[^\s<>"{}|\\^`\[\]]+',
            re.IGNORECASE
        ),
        confidence=0.70,
    ),
]
class PIIScanner:
    """
    Scanner for detecting PII in text and datasets.

    Uses regex patterns with optional checksum validation and
    context-keyword confidence boosting.
    """

    def __init__(
        self,
        patterns: Optional[List[PIIPattern]] = None,
        min_confidence: float = 0.5,
        context_boost: float = 0.1,
    ):
        """
        Initialize scanner.

        Args:
            patterns: Custom patterns (defaults to PII_PATTERNS)
            min_confidence: Minimum confidence to report (0.0-1.0)
            context_boost: Confidence boost when a context keyword matches
        """
        self.patterns = patterns or PII_PATTERNS
        self.min_confidence = min_confidence
        self.context_boost = context_boost

    def scan_text(
        self,
        text: str,
        field_name: str = "",
        row_index: int = -1,
    ) -> List[PIIMatch]:
        """
        Scan text for PII.

        Args:
            text: Text to scan
            field_name: Optional field name for tracking
            row_index: Optional row index for tracking

        Returns:
            List of PIIMatch objects meeting the confidence threshold
        """
        if not text or not isinstance(text, str):
            return []

        matches: List[PIIMatch] = []
        text_lower = text.lower()

        for pattern in self.patterns:
            for match in pattern.pattern.finditer(text):
                value = match.group()
                confidence = pattern.confidence

                # Drop hits that fail the pattern's checksum/format validator.
                if pattern.validator and not pattern.validator(value):
                    continue

                # Boost confidence (once) if any context keyword appears
                # anywhere in the scanned text.
                if pattern.context_patterns:
                    for ctx in pattern.context_patterns:
                        if ctx in text_lower:
                            confidence = min(1.0, confidence + self.context_boost)
                            break

                if confidence < self.min_confidence:
                    continue

                # Capture up to 50 characters of context on each side.
                ctx_start = max(0, match.start() - 50)
                ctx_end = min(len(text), match.end() + 50)
                matches.append(PIIMatch(
                    pii_type=pattern.pii_type,
                    severity=pattern.severity,
                    value=value,
                    start=match.start(),
                    end=match.end(),
                    confidence=confidence,
                    context=text[ctx_start:ctx_end],
                    field_name=field_name,
                    row_index=row_index,
                ))
        return matches

    @staticmethod
    def _record_match(result: PIIScanResult, match: PIIMatch, field_name: str) -> None:
        """Fold a single match into the aggregate counters of *result*.

        Extracted helper: this bookkeeping was previously duplicated
        verbatim in scan_dict and scan_dataset.
        """
        result.total_matches += 1
        type_name = match.pii_type.value
        result.matches_by_type[type_name] = result.matches_by_type.get(type_name, 0) + 1
        sev = match.severity.value
        result.matches_by_severity[sev] = result.matches_by_severity.get(sev, 0) + 1
        result.matches_by_field[field_name] = result.matches_by_field.get(field_name, 0) + 1
        result.fields_with_pii.add(field_name)
        if match.severity in (PIISeverity.CRITICAL, PIISeverity.HIGH):
            result.high_risk_fields.add(field_name)
        # Cap retained samples to bound memory on large scans.
        if len(result.sample_matches) < 100:
            result.sample_matches.append(match)

    def scan_dict(
        self,
        data: Dict[str, List[Any]],
        sample_size: int = 1000,
    ) -> PIIScanResult:
        """
        Scan a columnar dict for PII.

        Args:
            data: Dict of column_name -> values
            sample_size: Max rows to scan per column

        Returns:
            PIIScanResult with aggregated findings
        """
        result = PIIScanResult()
        for field_name, values in data.items():
            if not values:
                continue
            for row_idx, value in enumerate(values[:sample_size]):
                if not isinstance(value, str):
                    value = str(value) if value is not None else ""
                for match in self.scan_text(value, field_name, row_idx):
                    self._record_match(result, match, field_name)
        return result

    def scan_dataset(
        self,
        dataset,
        sample_size: int = 1000,
    ) -> PIIScanResult:
        """
        Scan a HuggingFace Dataset or DatasetDict for PII.

        Args:
            dataset: HuggingFace Dataset or DatasetDict
            sample_size: Max rows to scan (per split for a DatasetDict)

        Returns:
            PIIScanResult with aggregated findings
        """
        # A DatasetDict exposes keys() for its splits; recurse per split
        # and merge the per-split results.
        if hasattr(dataset, 'keys') and callable(dataset.keys):
            combined = PIIScanResult()
            for split_name in dataset.keys():
                split_result = self.scan_dataset(dataset[split_name], sample_size)
                combined.total_matches += split_result.total_matches
                for k, v in split_result.matches_by_type.items():
                    combined.matches_by_type[k] = combined.matches_by_type.get(k, 0) + v
                for k, v in split_result.matches_by_severity.items():
                    combined.matches_by_severity[k] = combined.matches_by_severity.get(k, 0) + v
                for k, v in split_result.matches_by_field.items():
                    combined.matches_by_field[k] = combined.matches_by_field.get(k, 0) + v
                combined.fields_with_pii.update(split_result.fields_with_pii)
                combined.high_risk_fields.update(split_result.high_risk_fields)
                # At most 20 samples per split, never exceeding the global
                # 100-sample cap (previously unbounded across many splits).
                room = 100 - len(combined.sample_matches)
                if room > 0:
                    combined.sample_matches.extend(split_result.sample_matches[:min(20, room)])
            return combined

        # Single Dataset
        result = PIIScanResult()

        # Resolve column names from either Dataset API surface.
        if hasattr(dataset, 'features'):
            columns = list(dataset.features.keys())
        elif hasattr(dataset, 'column_names'):
            columns = dataset.column_names
        else:
            return result

        num_rows = len(dataset) if hasattr(dataset, '__len__') else sample_size
        for idx in range(min(sample_size, num_rows)):
            row = dataset[idx]
            for col in columns:
                value = row.get(col) if isinstance(row, dict) else getattr(row, col, None)
                if not isinstance(value, str):
                    value = str(value) if value is not None else ""
                for match in self.scan_text(value, col, idx):
                    self._record_match(result, match, col)
        return result
# Module-level singleton scanner with default settings.
# NOTE(review): not referenced by scan_for_pii(), which builds a fresh
# scanner per call so min_confidence can vary — presumably kept for
# external importers; confirm before removing.
_scanner = PIIScanner()
def scan_for_pii(
    data,
    sample_size: int = 1000,
    min_confidence: float = 0.5,
) -> PIIScanResult:
    """
    Convenience function to scan data for PII.

    Args:
        data: Text, dict, or HuggingFace Dataset
        sample_size: Max rows to scan
        min_confidence: Minimum confidence threshold

    Returns:
        PIIScanResult with findings
    """
    scanner = PIIScanner(min_confidence=min_confidence)

    # Plain string: scan once and aggregate by type/severity only.
    if isinstance(data, str):
        found = scanner.scan_text(data)
        result = PIIScanResult(
            total_matches=len(found),
            sample_matches=found,
        )
        for hit in found:
            type_key = hit.pii_type.value
            sev_key = hit.severity.value
            result.matches_by_type[type_key] = result.matches_by_type.get(type_key, 0) + 1
            result.matches_by_severity[sev_key] = result.matches_by_severity.get(sev_key, 0) + 1
        return result

    # Columnar dict of column_name -> values.
    if isinstance(data, dict):
        return scanner.scan_dict(data, sample_size)

    # Anything else is assumed to be a HuggingFace Dataset/DatasetDict.
    return scanner.scan_dataset(data, sample_size)
def quick_pii_check(data, sample_size: int = 100) -> bool:
    """
    Fast boolean probe: does *data* appear to contain any PII?

    Uses a raised confidence threshold (0.7) and a small sample to trade
    recall for speed; returns True as soon as anything is found.
    """
    findings = scan_for_pii(data, sample_size=sample_size, min_confidence=0.7)
    return findings.total_matches > 0