| import re |
| from typing import Dict, List, Optional |
|
|
# Context keywords per PII type. A detector looks for these (as lower-case
# substrings) in the text surrounding a candidate value and/or in the name of
# the field it came from. Some keywords ("contact", "address") are shared by
# several types on purpose.
KEYWORDS: Dict[str, List[str]] = {
    "EMAIL": ["email", "e-mail", "mail", "contact"],
    "PHONE": ["phone", "mobile", "cell", "tel", "telephone", "contact"],
    "SSN": ["ssn", "social security", "social-security", "tax id"],
    "CREDIT_CARD": ["card", "credit", "visa", "mastercard", "amex", "payment"],
    "IP_ADDRESS": ["ip", "ipv4", "address", "host"],
    "DOB": ["dob", "birth", "date of birth", "born"],
    "ZIP_CODE": ["zip", "postal", "postcode", "address"],
    # The types below have no regex detector in REGEX_PATTERNS; they are
    # produced by the NER pass.
    "PERSON": ["name", "employee", "customer", "patient", "person", "contact"],
    "LOCATION": ["address", "city", "state", "country", "location", "office"],
    "ORGANIZATION": ["company", "organization", "org", "employer", "business"],
}
|
|
# Pre-compiled detectors keyed by PII type. Each pattern is meant to be used
# with ``finditer``/``search`` on free text; none of them contains a capturing
# group, so ``group(0)`` and ``findall`` both yield the full matched value.
REGEX_PATTERNS: Dict[str, "re.Pattern[str]"] = {
    "EMAIL": re.compile(r"\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[A-Za-z]{2,}\b"),
    # Optional "+1" country prefix, optional parentheses around the area code,
    # and "-", "." or whitespace as separators; must not touch other digits.
    "PHONE": re.compile(r"(?:(?<!\d)(?:\+?1[-.\s]?)?(?:\(?\d{3}\)?[-.\s]?)\d{3}[-.\s]?\d{4}(?!\d))"),
    # Either fully hyphenated (AAA-GG-SSSS) or nine bare digits. Making each
    # hyphen independently optional would also accept ZIP+4 codes such as
    # "12345-6789", so the two layouts are spelled out as alternatives.
    "SSN": re.compile(r"\b(?:\d{3}-\d{2}-\d{4}|\d{9})\b"),
    # 13-19 digits, optionally separated by spaces or hyphens. This is only a
    # candidate filter; callers are expected to validate with a Luhn check.
    "CREDIT_CARD": re.compile(r"\b(?:\d[ -]*?){13,19}\b"),
    # Dotted-quad IPv4 with every octet restricted to 0-255.
    "IP_ADDRESS": re.compile(r"\b(?:25[0-5]|2[0-4]\d|1?\d?\d)(?:\.(?:25[0-5]|2[0-4]\d|1?\d?\d)){3}\b"),
    # Numeric dates (d/m/y or m-d-y style) or "Month D, YYYY"; month names are
    # matched case-insensitively.
    "DOB": re.compile(r"\b(?:\d{1,2}[/-]\d{1,2}[/-]\d{2,4}|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Sept|Oct|Nov|Dec)[a-z]*\s+\d{1,2},\s*\d{4})\b", re.IGNORECASE),
    # US ZIP or ZIP+4.
    "ZIP_CODE": re.compile(r"\b\d{5}(?:-\d{4})?\b"),
}
|
|
|
|
def normalize_value(value: str) -> str:
    """Return *value* in a canonical form for comparison.

    The result is lower-cased, has no leading or trailing whitespace, and has
    every internal run of whitespace collapsed to a single space.
    """
    # split() with no separator discards surrounding whitespace and treats any
    # whitespace run as one delimiter, so re-joining with " " normalizes it.
    return " ".join(value.lower().split())
|
|
|
|
|
|
def redact_value(value: str) -> str:
    """Mask *value* with asterisks, preserving its length.

    Values of up to four characters are masked entirely. Longer values keep
    their first two and last two characters and have everything in between
    replaced by ``*``.
    """
    length = len(value)
    if length > 4:
        hidden = "*" * (length - 4)
        return value[:2] + hidden + value[-2:]
    return "*" * length
|
|
|
|
|
|
def luhn_check(number: str) -> bool:
    """Return True if *number* passes the Luhn checksum for a 13-19 digit value.

    Every non-digit character (spaces, hyphens, ...) is ignored. Inputs with
    fewer than 13 or more than 19 digits are rejected outright.
    """
    digits = re.sub(r"\D", "", number)
    if not 13 <= len(digits) <= 19:
        return False

    # Work right-to-left: index 0 is the check digit.
    from_right = [int(ch) for ch in reversed(digits)]
    untouched = sum(from_right[0::2])
    # Every second digit is doubled; a two-digit product contributes the sum
    # of its digits, which equals the product minus 9.
    doubled = sum(2 * d - 9 if 2 * d > 9 else 2 * d for d in from_right[1::2])
    return (untouched + doubled) % 10 == 0
|
|
|
|
|
|
def looks_like_zip_code(value: str, context: str) -> bool:
    """Decide whether *value* should be treated as a ZIP code.

    *value* must be exactly five digits, optionally followed by ``-`` and four
    more digits, and *context* must contain (case-insensitively, as a
    substring) at least one of the ``KEYWORDS["ZIP_CODE"]`` hints.
    """
    if not re.fullmatch(r"\d{5}(?:-\d{4})?", value):
        return False
    haystack = context.lower()
    for hint in KEYWORDS["ZIP_CODE"]:
        if hint in haystack:
            return True
    return False
|
|
|
|
|
|
def has_keyword_context(pii_type: str, context: str, field: Optional[str] = None) -> bool:
    """Return True if a keyword for *pii_type* occurs in *field* or *context*.

    The field name (if any) and the context are joined with a space,
    lower-cased, and searched for each entry of ``KEYWORDS[pii_type]``.
    Unknown PII types have no keywords and therefore always yield False.

    NOTE: matching is plain substring containment, so a short keyword such as
    "ip" is also found inside longer words (e.g. "zip").
    """
    prefix = field if field else ""
    haystack = f"{prefix} {context}".lower()
    for keyword in KEYWORDS.get(pii_type, []):
        if keyword in haystack:
            return True
    return False
|
|
|
|
|
|
def regex_findings(text: str, context: str, field: Optional[str], line: Optional[int], column: Optional[int]) -> List[Dict]:
    """Scan *text* with every pattern in ``REGEX_PATTERNS`` and report matches.

    Args:
        text: The string to scan.
        context: Surrounding text used for keyword lookups.
        field: Name of the field *text* came from, if any; also used for
            keyword lookups.
        line: Copied verbatim into each finding.
        column: Copied verbatim into each finding.

    Returns:
        One dict per accepted match with the keys ``pii_type``,
        ``matched_value``, ``confidence``, ``line``, ``column``, ``field``,
        ``source`` (always ``"regex"``), ``start`` and ``end`` (match offsets
        within *text*).

    Credit-card candidates are dropped unless they pass ``luhn_check``; ZIP
    candidates are dropped unless ``looks_like_zip_code`` finds a supporting
    keyword in the context or field name. Confidence is 0.95 when
    ``has_keyword_context`` succeeds for the type and 0.80 otherwise.
    """
    results: List[Dict] = []
    for kind, detector in REGEX_PATTERNS.items():
        for hit in detector.finditer(text):
            candidate = hit.group(0).strip()

            # Type-specific validation to cut down on false positives.
            if kind == "CREDIT_CARD":
                if not luhn_check(candidate):
                    continue
            elif kind == "ZIP_CODE":
                zip_context = context + " " + (field or "")
                if not looks_like_zip_code(candidate, zip_context):
                    continue

            if has_keyword_context(kind, context, field):
                confidence = 0.95
            else:
                confidence = 0.80

            finding = {
                "pii_type": kind,
                "matched_value": candidate,
                "confidence": confidence,
                "line": line,
                "column": column,
                "field": field,
                "source": "regex",
                "start": hit.start(),
                "end": hit.end(),
            }
            results.append(finding)
    return results
|
|
|
|
|
|
def ner_findings(nlp, text: str, context: str, field: Optional[str], line: Optional[int], column: Optional[int]) -> List[Dict]:
    """Run the NER pipeline *nlp* over *text* and report PII-like entities.

    Args:
        nlp: Callable applied to *text*; its result must expose ``ents``, an
            iterable of entities with ``label_``, ``text``, ``start_char`` and
            ``end_char`` attributes (presumably a spaCy pipeline -- confirm
            against the caller).
        text: The string to analyse. Blank input returns ``[]`` without
            invoking *nlp*.
        context: Surrounding text used for keyword lookups.
        field: Name of the field *text* came from, if any.
        line: Copied verbatim into each finding.
        column: Copied verbatim into each finding.

    Returns:
        One dict per accepted entity, with the same keys as the regex
        findings and ``source`` set to ``"ner"``. Only PERSON, GPE, LOC and
        ORG entities are kept (GPE and LOC both map to ``LOCATION``, ORG to
        ``ORGANIZATION``); entities shorter than two characters after
        stripping are ignored. Confidence is 0.70 with keyword support from
        ``has_keyword_context`` and 0.60 without.
    """
    if not text.strip():
        return []

    # Entity label -> PII type. Labels missing from this table are skipped.
    label_map = {"PERSON": "PERSON", "GPE": "LOCATION", "LOC": "LOCATION", "ORG": "ORGANIZATION"}

    results: List[Dict] = []
    for entity in nlp(text).ents:
        pii_type = label_map.get(entity.label_)
        if pii_type is None:
            continue
        surface = entity.text.strip()
        if len(surface) < 2:
            continue
        confidence = 0.70 if has_keyword_context(pii_type, context, field) else 0.60
        results.append(
            {
                "pii_type": pii_type,
                "matched_value": surface,
                "confidence": confidence,
                "line": line,
                "column": column,
                "field": field,
                "source": "ner",
                "start": entity.start_char,
                "end": entity.end_char,
            }
        )
    return results
|
|
|
|
|
|
def merge_findings(findings: List[Dict]) -> List[Dict]:
    """Collapse duplicate findings into one entry each.

    Two findings are duplicates when they share the same ``pii_type``,
    normalized ``matched_value`` (see ``normalize_value``), ``line``,
    ``column`` and ``field``.

    Args:
        findings: Finding dicts as produced by the regex and NER detectors.
            The input dicts are not modified.

    Returns:
        A list of merged copies, in order of first appearance. The first
        occurrence supplies every field (including ``start``/``end``); later
        duplicates only raise ``confidence`` to the maximum seen. If a
        duplicate comes from a different ``source``, confidence becomes 1.0
        and ``source`` becomes ``"both"``.

    Raises:
        KeyError: If a duplicate (or the entry it merges into) has no
            ``confidence`` key.
    """
    merged: Dict[tuple, Dict] = {}
    for item in findings:
        # A tuple key keeps the components separate, so a value or field name
        # containing a delimiter character can never make two distinct
        # findings collide (which a joined-string key allows).
        key = (
            item.get("pii_type", ""),
            # Treat a missing or None value as empty instead of crashing.
            normalize_value(item.get("matched_value") or ""),
            str(item.get("line")),
            str(item.get("column")),
            str(item.get("field")),
        )
        existing = merged.get(key)
        if existing is None:
            # Copy so that later updates never mutate the caller's dict.
            merged[key] = dict(item)
            continue
        existing["confidence"] = max(existing["confidence"], item["confidence"])
        if existing.get("source") != item.get("source"):
            # Independent detectors agreeing is treated as certainty.
            existing["confidence"] = 1.0
            existing["source"] = "both"
    return list(merged.values())
|
|