AI-PolicyTrace / src /privacy.py
teja141290's picture
Deploy PolicyTrace Hugging Face Space
be54038
"""
privacy.py — PII detection and masking via Microsoft Presidio.
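Entities masked before any text is sent to the LLM (built-in UK motor list,
UK_MOTOR_ENTITIES; the active list is read from settings.pii.entities):
PERSON, PHONE_NUMBER, EMAIL_ADDRESS, UK_NHS, UK_NIN,
CREDIT_CARD, IBAN_CODE, LOCATION, IP_ADDRESS, URL, DATE_TIME (opt-in)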
Usage
-----
masker = PIIMasker()
clean_text, mapping = masker.mask(raw_markdown)
# ... call LLM with clean_text ...
# If you ever need to restore originals:
restored = masker.restore(llm_output, mapping)
"""
from __future__ import annotations
import re
from typing import Optional
from presidio_analyzer import AnalyzerEngine, RecognizerResult
from presidio_analyzer.nlp_engine import NlpEngineProvider
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
from settings import settings
# ---------------------------------------------------------------------------
# Default entity list (tuned for UK motor insurance documents)
# ---------------------------------------------------------------------------
# Presidio entity-type names.
# NOTE(review): in the code visible here PIIMasker takes its default from
# settings.pii.entities rather than from this list; presumably the settings
# module is seeded from (or mirrors) it — confirm against settings.py.
UK_MOTOR_ENTITIES: list[str] = [
    "PERSON",
    "PHONE_NUMBER",
    "EMAIL_ADDRESS",
    "UK_NHS",
    "UK_NIN", # National Insurance Number
    "CREDIT_CARD",
    "IBAN_CODE",
    "LOCATION", # postcodes / addresses
    "IP_ADDRESS",
    "URL",
]
# Sentinel prefix used for replacement tokens so we can detect them reliably.
# PIIMasker.mask() builds each token as f"{_TOKEN_PREFIX}{entity_type}_{n}",
# e.g. MASKED_PERSON_1.
_TOKEN_PREFIX = "MASKED_"
class PIIMasker:
    """
    Detect PII with Presidio and replace it with numbered placeholder tokens.

    The masker keeps no per-document state: `mask()` hands the token map back
    to the caller instead of storing it, so one instance can be reused for
    any number of texts.

    Parameters
    ----------
    entities : list[str], optional
        Presidio entity types to redact. When omitted (or empty) the list
        from ``settings.pii.entities`` is used.
    language : str
        ISO 639-1 language code passed to the Presidio analyzer.
    mask_dates : bool
        When True, DATE_TIME entities are also redacted. Default False
        because insurance documents are date-heavy and stripping them
        would break structured extraction.
    score_threshold : float
        Minimum confidence score (0-1) for a detected entity to be masked.
    """

    def __init__(
        self,
        entities: Optional[list[str]] = None,
        language: str = settings.pii.language,
        mask_dates: bool = settings.pii.mask_dates,
        score_threshold: float = settings.pii.score_threshold,
    ) -> None:
        # Copy, so appending DATE_TIME below never mutates the caller's list
        # or the list held by settings.
        self._entities = list(entities or settings.pii.entities)
        if mask_dates and "DATE_TIME" not in self._entities:
            self._entities.append("DATE_TIME")
        self._language = language
        self._score_threshold = score_threshold

        # Build NLP engine (spaCy en_core_web_lg preferred; falls back to sm).
        # NOTE(review): the model is always registered under lang_code "en"
        # while the analyzer is told to support `language`; a non-English
        # `language` presumably needs a matching model here — confirm.
        nlp_config = {
            "nlp_engine_name": "spacy",
            "models": [{"lang_code": "en", "model_name": "en_core_web_lg"}],
        }
        try:
            provider = NlpEngineProvider(nlp_configuration=nlp_config)
            nlp_engine = provider.create_engine()
        except OSError:
            # Fall back to the small model if lg is not installed
            nlp_config["models"][0]["model_name"] = "en_core_web_sm"
            provider = NlpEngineProvider(nlp_configuration=nlp_config)
            nlp_engine = provider.create_engine()

        self._analyzer = AnalyzerEngine(
            nlp_engine=nlp_engine, supported_languages=[language]
        )
        # Not used by the methods visible in this class (replacement is done
        # manually in mask()); kept so the attribute remains available.
        self._anonymizer = AnonymizerEngine()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def mask(self, text: str) -> tuple[str, dict[str, str]]:
        """
        Redact PII in *text* and return (masked_text, token_map).

        token_map maps placeholder tokens back to original values, allowing
        optional restoration after LLM processing. The same (entity type,
        value) pair always receives the same token within one call, and
        tokens are numbered per entity type in left-to-right order.

        Overlapping detections (for example an EMAIL_ADDRESS and the URL
        found inside it) are merged into a single span before replacement,
        so the whole overlapped region is covered by exactly one token.

        Example
        -------
        >>> masked, mapping = masker.mask("John Smith drives AB12 CDE")
        >>> masked
        'MASKED_PERSON_1 drives AB12 CDE'
        >>> mapping
        {'MASKED_PERSON_1': 'John Smith'}
        """
        results: list[RecognizerResult] = self._analyzer.analyze(
            text=text,
            entities=self._entities,
            language=self._language,
            score_threshold=self._score_threshold,
        )
        if not results:
            return text, {}

        # Disjoint, left-to-right spans: offset-based replacement is only
        # well defined when no two spans share characters.
        spans = self._merge_overlapping(results)

        # Presidio's "replace" operator uses a fixed `new_value`, so named
        # per-value tokens are generated here, keyed on (entity_type, original).
        counters: dict[str, int] = {}
        token_map: dict[str, str] = {}
        original_to_token: dict[tuple[str, str], str] = {}
        for span in spans:
            original = text[span.start : span.end]
            key = (span.entity_type, original)
            if key in original_to_token:
                continue  # repeated value: reuse the token issued earlier
            counters[span.entity_type] = counters.get(span.entity_type, 0) + 1
            token = f"{_TOKEN_PREFIX}{span.entity_type}_{counters[span.entity_type]}"
            original_to_token[key] = token
            token_map[token] = original

        masked_text = _replace_spans(text, spans, original_to_token)
        return masked_text, token_map

    def restore(self, text: str, token_map: dict[str, str]) -> str:
        """
        Substitute masked tokens back to original PII values.

        All tokens are replaced in a single pass, longest token first, so
        that ``MASKED_PERSON_1`` can never consume the prefix of
        ``MASKED_PERSON_10`` and restored values are never re-scanned for
        further tokens.

        This is provided for completeness / testing; in production the LLM
        output is kept masked and stored as-is for GDPR compliance.
        """
        # Empty keys would make the pattern match everywhere; ignore them.
        tokens = sorted((tok for tok in token_map if tok), key=len, reverse=True)
        if not tokens:
            return text
        pattern = re.compile("|".join(map(re.escape, tokens)))
        return pattern.sub(lambda match: token_map[match.group(0)], text)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _merge_overlapping(results: list[RecognizerResult]) -> list[RecognizerResult]:
        """
        Collapse overlapping detections into disjoint spans, left to right.

        Each group of mutually overlapping results becomes one span covering
        their union. Its entity type and score come from the group's
        highest-scoring member (ties broken by the longer span). Touching
        spans (one ends where the next starts) are not merged.
        """
        merged: list[RecognizerResult] = []
        best: Optional[RecognizerResult] = None
        lo = hi = 0

        def close_group() -> None:
            if best is None:
                return
            if best.start == lo and best.end == hi:
                # The winner already spans the whole group; reuse it as-is.
                merged.append(best)
            else:
                merged.append(
                    RecognizerResult(
                        entity_type=best.entity_type,
                        start=lo,
                        end=hi,
                        score=best.score,
                    )
                )

        for r in sorted(results, key=lambda r: (r.start, -r.end)):
            if best is not None and r.start < hi:
                hi = max(hi, r.end)
                if (r.score, r.end - r.start) > (best.score, best.end - best.start):
                    best = r
                continue
            close_group()
            best, lo, hi = r, r.start, r.end
        close_group()
        return merged
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _replace_spans(
text: str,
results: list[RecognizerResult],
original_to_token: dict[tuple[str, str], str],
) -> str:
"""
Replace PII spans in *text* with their corresponding tokens.
Processes spans right-to-left to keep offset arithmetic valid.
"""
chars = list(text)
for r in sorted(results, key=lambda r: r.start, reverse=True):
original = text[r.start : r.end]
token = original_to_token.get((r.entity_type, original), original)
chars[r.start : r.end] = list(token)
return "".join(chars)