"""Detect PII-like values in text via regular expressions and NER entities.

Findings are plain dicts with the keys ``pii_type``, ``matched_value``,
``confidence``, ``line``, ``column``, ``field``, ``source``, ``start`` and
``end``.  ``merge_findings`` de-duplicates findings produced by the two
detectors.
"""
import re
from typing import Dict, List, Optional, Tuple

# Lower-case hint words per PII type.  They are matched as plain substrings
# of the lower-cased field name and context (see ``has_keyword_context``).
KEYWORDS = {
    "EMAIL": ["email", "e-mail", "mail", "contact"],
    "PHONE": ["phone", "mobile", "cell", "tel", "telephone", "contact"],
    "SSN": ["ssn", "social security", "social-security", "tax id"],
    "CREDIT_CARD": ["card", "credit", "visa", "mastercard", "amex", "payment"],
    "IP_ADDRESS": ["ip", "ipv4", "address", "host"],
    "DOB": ["dob", "birth", "date of birth", "born"],
    "ZIP_CODE": ["zip", "postal", "postcode", "address"],
    "PERSON": ["name", "employee", "customer", "patient", "person", "contact"],
    "LOCATION": ["address", "city", "state", "country", "location", "office"],
    "ORGANIZATION": ["company", "organization", "org", "employer", "business"],
}

# NOTE(review): the original text of this table was truncated right after
# the opening ``(?:(?`` of the PHONE pattern.  Only the EMAIL entry is
# verbatim; every other entry below is a reconstruction inferred from the
# KEYWORDS keys and from the CREDIT_CARD / ZIP_CODE handling in
# ``regex_findings``.  Confirm each pattern against the original source.
REGEX_PATTERNS = {
    "EMAIL": re.compile(r"\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[A-Za-z]{2,}\b"),
    "PHONE": re.compile(
        r"(?:(?<!\d)(?:\+?1[\s.-]?)?(?:\(\d{3}\)|\d{3})[\s.-]?\d{3}[\s.-]?\d{4}(?!\d))"
    ),
    "SSN": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    # 13-19 digits with optional single space/hyphen separators; candidates
    # are further filtered by ``luhn_check``.
    "CREDIT_CARD": re.compile(r"\b(?:\d[ -]?){12,18}\d\b"),
    "IP_ADDRESS": re.compile(
        r"\b(?:(?:25[0-5]|2[0-4]\d|1?\d?\d)\.){3}(?:25[0-5]|2[0-4]\d|1?\d?\d)\b"
    ),
    "DOB": re.compile(r"\b(?:\d{1,2}[/-]\d{1,2}[/-]\d{2,4}|\d{4}-\d{2}-\d{2})\b"),
    # Same shape that ``looks_like_zip_code`` validates with ``fullmatch``.
    "ZIP_CODE": re.compile(r"\b\d{5}(?:-\d{4})?\b"),
}


def normalize_value(value: str) -> str:
    """Return *value* stripped, lower-cased and with whitespace runs collapsed.

    NOTE(review): the ``def`` line was lost in the original text; the
    signature is reconstructed from the surviving ``-> str`` annotation, the
    body, and the call in ``merge_findings``.
    """
    return re.sub(r"\s+", " ", value.strip().lower())


def redact_value(value: str) -> str:
    """Mask *value*, keeping only its first two and last two characters.

    Values of four characters or fewer are masked completely.
    """
    if len(value) <= 4:
        return "*" * len(value)
    return f"{value[:2]}{'*' * (len(value) - 4)}{value[-2:]}"


def luhn_check(number: str) -> bool:
    """Return True if *number* has 13-19 digits and a valid Luhn checksum.

    Non-digit characters (spaces, hyphens, ...) are ignored.
    """
    digits = re.sub(r"\D", "", number)
    if not 13 <= len(digits) <= 19:
        return False
    total = 0
    # Walk right-to-left, doubling every second digit.
    for index, char in enumerate(reversed(digits)):
        digit = int(char)
        if index % 2 == 1:
            digit *= 2
            if digit > 9:
                digit -= 9
        total += digit
    return total % 10 == 0


def looks_like_zip_code(value: str, context: str) -> bool:
    """Return True if *value* is ``NNNNN`` or ``NNNNN-NNNN`` and *context* hints at a ZIP.

    A bare five-digit number is ambiguous, so a ZIP_CODE keyword must occur
    (as a substring) in the lower-cased *context*.
    """
    if re.fullmatch(r"\d{5}(?:-\d{4})?", value) is None:
        return False
    lowered = context.lower()
    return any(keyword in lowered for keyword in KEYWORDS["ZIP_CODE"])


def has_keyword_context(pii_type: str, context: str, field: Optional[str] = None) -> bool:
    """Return True if any keyword for *pii_type* occurs in *field* or *context*.

    Matching is a case-insensitive substring test; unknown PII types have no
    keywords and therefore always yield False.
    """
    combined = f"{field or ''} {context}".lower()
    return any(keyword in combined for keyword in KEYWORDS.get(pii_type, []))


def regex_findings(
    text: str,
    context: str,
    field: Optional[str],
    line: Optional[int],
    column: Optional[int],
) -> List[Dict]:
    """Run every pattern in ``REGEX_PATTERNS`` over *text* and collect findings.

    Args:
        text: The text to scan.
        context: Surrounding text used for keyword hints.
        field: Optional field/column name, also used for keyword hints.
        line: Passed through unchanged into each finding.
        column: Passed through unchanged into each finding.

    Returns:
        One dict per accepted match with ``source`` set to ``"regex"``.
        ``confidence`` is 0.95 when a keyword for the type is present in the
        field/context and 0.80 otherwise.  ``start``/``end`` are the match
        offsets within *text*.
    """
    findings: List[Dict] = []
    zip_context = f"{context} {field or ''}"
    for pii_type, pattern in REGEX_PATTERNS.items():
        # The keyword hint depends only on the type, not on the match.
        confidence = 0.95 if has_keyword_context(pii_type, context, field) else 0.80
        for match in pattern.finditer(text):
            value = match.group(0).strip()
            # Digit runs are only card numbers if the checksum holds.
            if pii_type == "CREDIT_CARD" and not luhn_check(value):
                continue
            if pii_type == "ZIP_CODE" and not looks_like_zip_code(value, zip_context):
                continue
            findings.append(
                {
                    "pii_type": pii_type,
                    "matched_value": value,
                    "confidence": confidence,
                    "line": line,
                    "column": column,
                    "field": field,
                    "source": "regex",
                    "start": match.start(),
                    "end": match.end(),
                }
            )
    return findings


def ner_findings(
    nlp,
    text: str,
    context: str,
    field: Optional[str],
    line: Optional[int],
    column: Optional[int],
) -> List[Dict]:
    """Collect PERSON / LOCATION / ORGANIZATION findings from NER entities.

    Args:
        nlp: Callable applied to *text*; its result must expose ``ents``,
            each entity having ``label_``, ``text``, ``start_char`` and
            ``end_char`` (presumably a spaCy pipeline - confirm with caller).
        text: The text to analyse; blank text returns ``[]`` without
            calling *nlp*.
        context: Surrounding text used for keyword hints.
        field: Optional field/column name, also used for keyword hints.
        line: Passed through unchanged into each finding.
        column: Passed through unchanged into each finding.

    Returns:
        One dict per accepted entity with ``source`` set to ``"ner"``.
        ``confidence`` is 0.70 with a keyword hint and 0.60 without.
    """
    if not text.strip():
        return []
    # Entity labels of interest and the PII type each one maps to; any other
    # label is ignored.
    label_map = {
        "PERSON": "PERSON",
        "GPE": "LOCATION",
        "LOC": "LOCATION",
        "ORG": "ORGANIZATION",
    }
    findings: List[Dict] = []
    for ent in nlp(text).ents:
        mapped = label_map.get(ent.label_)
        if mapped is None:
            continue
        value = ent.text.strip()
        if len(value) < 2:
            continue
        score = 0.70 if has_keyword_context(mapped, context, field) else 0.60
        findings.append(
            {
                "pii_type": mapped,
                "matched_value": value,
                "confidence": score,
                "line": line,
                "column": column,
                "field": field,
                "source": "ner",
                "start": ent.start_char,
                "end": ent.end_char,
            }
        )
    return findings


def merge_findings(findings: List[Dict]) -> List[Dict]:
    """De-duplicate *findings* that describe the same value at the same place.

    Two findings are duplicates when they share PII type, normalized matched
    value, line, column and field.  The first occurrence is copied and kept
    (input dicts are not mutated); its confidence becomes the maximum seen.
    When duplicates come from different sources the confidence is raised to
    1.0 and ``source`` becomes ``"both"``.

    Returns:
        The merged findings in first-seen order.
    """
    merged: Dict[Tuple[str, ...], Dict] = {}
    for item in findings:
        # A tuple key (instead of a "|"-joined string) cannot collide when a
        # value or field name itself contains "|".
        key = (
            item.get("pii_type", ""),
            normalize_value(item.get("matched_value", "")),
            str(item.get("line")),
            str(item.get("column")),
            str(item.get("field")),
        )
        existing = merged.get(key)
        if existing is None:
            merged[key] = dict(item)
            continue
        existing["confidence"] = max(existing["confidence"], item["confidence"])
        if existing.get("source") != item.get("source"):
            existing["confidence"] = 1.0
            existing["source"] = "both"
    return list(merged.values())