Spaces:
Running
Running
File size: 6,710 Bytes
be54038 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 | """
privacy.py — PII detection and masking via Microsoft Presidio.
Entities masked before any text is sent to the LLM:
PERSON, PHONE_NUMBER, EMAIL_ADDRESS, UK_NHS, UK_NIN,
CREDIT_CARD, IBAN_CODE, DATE_TIME (opt-in), LOCATION
Usage
-----
masker = PIIMasker()
clean_text, mapping = masker.mask(raw_markdown)
# ... call LLM with clean_text ...
# If you ever need to restore originals:
restored = masker.restore(llm_output, mapping)
"""
from __future__ import annotations
import re
from typing import Optional
from presidio_analyzer import AnalyzerEngine, RecognizerResult
from presidio_analyzer.nlp_engine import NlpEngineProvider
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
from settings import settings
# ---------------------------------------------------------------------------
# Default entity list (tuned for UK motor insurance documents)
# ---------------------------------------------------------------------------
# Every name here must match a Presidio entity type exactly: the analyzer only
# logs a warning for names it has no recognizer for, and nothing is masked for
# them.
UK_MOTOR_ENTITIES: list[str] = [
    "PERSON",
    "PHONE_NUMBER",
    "EMAIL_ADDRESS",
    "UK_NHS",
    "UK_NINO",  # National Insurance Number (Presidio's name is UK_NINO, not UK_NIN)
    "CREDIT_CARD",
    "IBAN_CODE",
    "LOCATION",  # postcodes / addresses
    "IP_ADDRESS",
    "URL",
]

# Sentinel prefix used for replacement tokens so we can detect them reliably
_TOKEN_PREFIX = "MASKED_"
class PIIMasker:
    """
    Redact PII in text using Presidio and numbered placeholder tokens.

    The constructor builds the spaCy-backed NLP engine and the Presidio
    engines, so create one instance and reuse it. `mask()` and `restore()`
    store nothing on the instance between calls.

    Parameters
    ----------
    entities : list[str], optional
        Presidio entity types to redact. When omitted (or empty),
        ``settings.pii.entities`` is used.
    language : str
        ISO 639-1 language code passed to the Presidio analyzer.
    mask_dates : bool
        When True, DATE_TIME entities are also redacted. Default False
        because insurance documents are date-heavy and stripping them
        would break structured extraction.
    score_threshold : float
        Minimum confidence score (0-1) for a detected entity to be masked.
    """

    def __init__(
        self,
        entities: Optional[list[str]] = None,
        language: str = settings.pii.language,
        mask_dates: bool = settings.pii.mask_dates,
        score_threshold: float = settings.pii.score_threshold,
    ) -> None:
        # Copy so that appending DATE_TIME never mutates the caller's list
        # or the shared settings value.
        self._entities = list(entities or settings.pii.entities)
        if mask_dates and "DATE_TIME" not in self._entities:
            self._entities.append("DATE_TIME")
        self._language = language
        self._score_threshold = score_threshold

        # Build NLP engine (spaCy en_core_web_lg preferred; falls back to sm).
        # NOTE(review): the model is always registered under "en", while the
        # analyzer is created for `language` -- confirm non-English values
        # are actually supported before relying on them.
        nlp_config = {
            "nlp_engine_name": "spacy",
            "models": [{"lang_code": "en", "model_name": "en_core_web_lg"}],
        }
        try:
            provider = NlpEngineProvider(nlp_configuration=nlp_config)
            nlp_engine = provider.create_engine()
        except OSError:
            # Fall back to the small model if lg is not installed
            nlp_config["models"][0]["model_name"] = "en_core_web_sm"
            provider = NlpEngineProvider(nlp_configuration=nlp_config)
            nlp_engine = provider.create_engine()

        self._analyzer = AnalyzerEngine(
            nlp_engine=nlp_engine, supported_languages=[language]
        )
        self._anonymizer = AnonymizerEngine()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def mask(self, text: str) -> tuple[str, dict[str, str]]:
        """
        Redact PII in *text* and return (masked_text, token_map).

        token_map maps placeholder tokens back to original values, allowing
        optional restoration after LLM processing. The same value detected
        more than once (with the same entity type) reuses one token.

        Detections that overlap are resolved before tokens are assigned, so
        every token in the map appears in the masked text and
        `restore(masked_text, token_map)` reproduces *text*.

        Example
        -------
        >>> masked, mapping = masker.mask("John Smith drives AB12 CDE")
        >>> masked
        'MASKED_PERSON_1 drives AB12 CDE'
        >>> mapping
        {'MASKED_PERSON_1': 'John Smith'}
        """
        results: list[RecognizerResult] = self._analyzer.analyze(
            text=text,
            entities=self._entities,
            language=self._language,
            score_threshold=self._score_threshold,
        )
        if not results:
            return text, {}

        # The analyzer can report overlapping spans of different entity types
        # (e.g. a URL inside an e-mail address). Offset-based replacement
        # needs disjoint spans, so resolve them first. The result is ordered
        # left-to-right, which also makes token numbering deterministic.
        spans = self._resolve_overlaps(results)

        # Per-entity-type counters give unique token names; the map keyed on
        # (entity_type, original) lets repeated values share a token.
        counters: dict[str, int] = {}
        token_map: dict[str, str] = {}
        original_to_token: dict[tuple[str, str], str] = {}
        for r in spans:
            original = text[r.start : r.end]
            key = (r.entity_type, original)
            if key not in original_to_token:
                counters[r.entity_type] = counters.get(r.entity_type, 0) + 1
                token = f"{_TOKEN_PREFIX}{r.entity_type}_{counters[r.entity_type]}"
                original_to_token[key] = token
                token_map[token] = original

        # Perform replacement manually (Presidio replace operator doesn't
        # support per-occurrence dynamic values in a single pass).
        masked_text = _replace_spans(text, spans, original_to_token)
        return masked_text, token_map

    def restore(self, text: str, token_map: dict[str, str]) -> str:
        """
        Substitute masked tokens back to original PII values.

        All tokens are substituted in a single pass, longest token first, so
        a token that is a prefix of another (``MASKED_PERSON_1`` vs
        ``MASKED_PERSON_10``) cannot clobber it, and restored values are
        never re-scanned for tokens.

        This is provided for completeness / testing; in production the LLM
        output is kept masked and stored as-is for GDPR compliance.
        """
        if not token_map:
            return text
        longest_first = sorted(token_map, key=len, reverse=True)
        pattern = re.compile("|".join(re.escape(token) for token in longest_first))
        return pattern.sub(lambda match: token_map[match.group(0)], text)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _resolve_overlaps(results: list[RecognizerResult]) -> list[RecognizerResult]:
        """Return *results* as disjoint spans ordered by start offset.

        At the same start the longest span wins, then the highest score.
        A span fully covered by an earlier kept span is dropped. A span that
        only partially overlaps is clipped to begin where the previous span
        ends, so no detected character is left unmasked.
        """
        ordered = sorted(results, key=lambda r: (r.start, -r.end, -r.score))
        disjoint: list[RecognizerResult] = []
        cursor = 0  # end offset of the last span kept
        for r in ordered:
            if r.start < cursor:
                if r.end <= cursor:
                    continue  # already covered by a kept span
                r = RecognizerResult(
                    entity_type=r.entity_type,
                    start=cursor,
                    end=r.end,
                    score=r.score,
                )
            disjoint.append(r)
            cursor = r.end
        return disjoint
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _replace_spans(
    text: str,
    results: list[RecognizerResult],
    original_to_token: dict[tuple[str, str], str],
) -> str:
    """
    Replace PII spans in *text* with their corresponding tokens.

    The output is assembled left-to-right from untouched text and tokens,
    so all offsets refer to the original *text* and overlapping spans
    cannot corrupt each other.

    Parameters
    ----------
    text : str
        The original, unmasked text.
    results : list[RecognizerResult]
        Detected spans (``start``, ``end``, ``entity_type``, ``score``);
        any order, overlaps allowed.
    original_to_token : dict[tuple[str, str], str]
        Maps ``(entity_type, text[start:end])`` to the replacement token.
        A span with no entry is left as-is.

    Overlap handling
    ----------------
    * At the same start the longest span wins, then the highest score.
    * A span fully covered by an earlier replacement is skipped.
    * A span that partially overlaps an earlier replacement has its token
      emitted directly after it, so no original character leaks through.
    """
    pieces: list[str] = []
    cursor = 0  # offset in *text* up to which output has been produced
    ordered = sorted(results, key=lambda r: (r.start, -r.end, -r.score))
    for r in ordered:
        if r.start < cursor and r.end <= cursor:
            continue  # fully inside text that has already been replaced
        token = original_to_token.get((r.entity_type, text[r.start : r.end]))
        if token is None:
            continue  # no token known for this span: keep the text unchanged
        # The slice is empty when the span starts inside the previous
        # replacement, which is what we want for partial overlaps.
        pieces.append(text[cursor : r.start])
        pieces.append(token)
        cursor = r.end
    pieces.append(text[cursor:])
    return "".join(pieces)
|