Spaces:
Running
Running
| """ | |
| privacy.py — PII detection and masking via Microsoft Presidio. | |
| Entities masked before any text is sent to the LLM: | |
| PERSON, PHONE_NUMBER, EMAIL_ADDRESS, UK_NHS, UK_NIN, | |
| CREDIT_CARD, IBAN_CODE, LOCATION, IP_ADDRESS, URL, | |
| and DATE_TIME (opt-in via mask_dates). | |
| Usage | |
| ----- | |
| masker = PIIMasker() | |
| clean_text, mapping = masker.mask(raw_markdown) | |
| # ... call LLM with clean_text ... | |
| # If you ever need to restore originals: | |
| restored = masker.restore(llm_output, mapping) | |
| """ | |
| from __future__ import annotations | |
| import re | |
| from typing import Optional | |
| from presidio_analyzer import AnalyzerEngine, RecognizerResult | |
| from presidio_analyzer.nlp_engine import NlpEngineProvider | |
| from presidio_anonymizer import AnonymizerEngine | |
| from presidio_anonymizer.entities import OperatorConfig | |
| from settings import settings | |
# ---------------------------------------------------------------------------
# Module constants
# ---------------------------------------------------------------------------

# Presidio entity types redacted by default; the selection is tuned for
# UK motor insurance documents.
UK_MOTOR_ENTITIES: list[str] = [
    # People and contact details
    "PERSON",
    "PHONE_NUMBER",
    "EMAIL_ADDRESS",
    # UK identifiers: NHS number and National Insurance Number
    "UK_NHS",
    "UK_NIN",
    # Financial identifiers
    "CREDIT_CARD",
    "IBAN_CODE",
    # Places (postcodes / addresses) and network identifiers
    "LOCATION",
    "IP_ADDRESS",
    "URL",
]

# Every placeholder token starts with this sentinel so that masked values
# can be recognised reliably in downstream text.
_TOKEN_PREFIX = "MASKED_"
class PIIMasker:
    """
    Stateless masker: call `mask()` to redact PII in a text string.

    Parameters
    ----------
    entities : list[str]
        Presidio entity types to redact. When omitted (or empty), the list
        configured in ``settings.pii.entities`` is used.
    language : str
        ISO 639-1 language code passed to the Presidio analyzer.
    mask_dates : bool
        When True, DATE_TIME entities are also redacted. Insurance
        documents are date-heavy, so stripping dates can break structured
        extraction; enable only when required.
    score_threshold : float
        Minimum confidence score (0-1) for a detected entity to be masked.
    """

    def __init__(
        self,
        entities: Optional[list[str]] = None,
        language: str = settings.pii.language,
        mask_dates: bool = settings.pii.mask_dates,
        score_threshold: float = settings.pii.score_threshold,
    ) -> None:
        # Copy the list so that appending DATE_TIME below never mutates the
        # caller's list or the shared settings object.
        self._entities = list(entities or settings.pii.entities)
        if mask_dates and "DATE_TIME" not in self._entities:
            self._entities.append("DATE_TIME")
        self._language = language
        self._score_threshold = score_threshold

        # Build NLP engine (spaCy en_core_web_lg preferred; falls back to sm).
        # NOTE(review): the model configuration is hard-coded to "en" while
        # `language` is configurable -- confirm non-English use is intended
        # before passing another language code.
        nlp_config = {
            "nlp_engine_name": "spacy",
            "models": [{"lang_code": "en", "model_name": "en_core_web_lg"}],
        }
        try:
            provider = NlpEngineProvider(nlp_configuration=nlp_config)
            nlp_engine = provider.create_engine()
        except OSError:
            # Fall back to the small model if lg is not installed
            nlp_config["models"][0]["model_name"] = "en_core_web_sm"
            provider = NlpEngineProvider(nlp_configuration=nlp_config)
            nlp_engine = provider.create_engine()

        self._analyzer = AnalyzerEngine(
            nlp_engine=nlp_engine, supported_languages=[language]
        )
        self._anonymizer = AnonymizerEngine()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def mask(self, text: str) -> tuple[str, dict[str, str]]:
        """
        Redact PII in *text* and return (masked_text, token_map).

        token_map maps placeholder tokens back to original values, allowing
        optional restoration after LLM processing. Identical values of the
        same entity type share one token. Detections that overlap each
        other are merged into a single span before masking, so every
        detected character is covered by exactly one token.

        Example
        -------
        >>> masked, mapping = masker.mask("John Smith drives AB12 CDE")
        >>> masked
        'MASKED_PERSON_1 drives AB12 CDE'
        >>> mapping
        {'MASKED_PERSON_1': 'John Smith'}
        """
        results: list[RecognizerResult] = self._analyzer.analyze(
            text=text,
            entities=self._entities,
            language=self._language,
            score_threshold=self._score_threshold,
        )
        if not results:
            return text, {}

        # Offset-based replacement is only valid on disjoint spans, and the
        # analyzer may report overlapping detections of different types.
        spans = self._merge_overlaps(results)

        # Per-entity-type counters give unique, left-to-right token names.
        counters: dict[str, int] = {}
        token_map: dict[str, str] = {}
        # Keyed on (entity_type, original) so repeated values reuse a token.
        original_to_token: dict[tuple[str, str], str] = {}
        for r in spans:
            original = text[r.start : r.end]
            key = (r.entity_type, original)
            if key not in original_to_token:
                counters[r.entity_type] = counters.get(r.entity_type, 0) + 1
                token = f"{_TOKEN_PREFIX}{r.entity_type}_{counters[r.entity_type]}"
                original_to_token[key] = token
                token_map[token] = original

        # Perform replacement manually (Presidio replace operator doesn't
        # support per-occurrence dynamic values in a single pass).
        masked_text = _replace_spans(text, spans, original_to_token)
        return masked_text, token_map

    def restore(self, text: str, token_map: dict[str, str]) -> str:
        """
        Substitute masked tokens back to original PII values.

        All tokens are replaced in a single pass, longest token first, so a
        token that is a prefix of another (MASKED_PERSON_1 vs
        MASKED_PERSON_10) cannot corrupt the longer one, and restored
        values are never re-scanned for tokens.

        This is provided for completeness / testing; in production the LLM
        output is kept masked and stored as-is for GDPR compliance.
        """
        # An empty token would match at every position; ignore it.
        tokens = sorted((t for t in token_map if t), key=len, reverse=True)
        if not tokens:
            return text
        pattern = re.compile("|".join(re.escape(t) for t in tokens))
        return pattern.sub(lambda m: token_map[m.group(0)], text)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _merge_overlaps(
        results: list[RecognizerResult],
    ) -> list[RecognizerResult]:
        """
        Collapse overlapping detections into disjoint spans, ordered by start.

        Overlapping spans are replaced by their union so no detected
        character is left unmasked; the union takes the entity type of the
        highest-scoring detection involved (ties keep the earlier span).
        """
        merged: list[RecognizerResult] = []
        # Longer span first on equal start, so containers precede contents.
        for r in sorted(results, key=lambda r: (r.start, -r.end)):
            if merged and r.start < merged[-1].end:
                last = merged[-1]
                best = r if r.score > last.score else last
                merged[-1] = RecognizerResult(
                    entity_type=best.entity_type,
                    start=last.start,
                    end=max(last.end, r.end),
                    score=best.score,
                )
            else:
                merged.append(r)
        return merged
| # --------------------------------------------------------------------------- | |
| # Internal helpers | |
| # --------------------------------------------------------------------------- | |
def _replace_spans(
    text: str,
    results: list[RecognizerResult],
    original_to_token: dict[tuple[str, str], str],
) -> str:
    """
    Replace PII spans in *text* with their corresponding tokens.

    Spans are walked left-to-right with a cursor and the output is built
    from slices of the original text, so every offset refers to *text*
    itself and no offset arithmetic can drift. Overlapping spans are
    handled safely:

    * a span lying entirely inside text that was already masked is skipped;
    * a span that starts inside masked text but extends past it emits its
      own token for the remainder.

    No character covered by any span reaches the output unless the span
    has no token.

    Parameters
    ----------
    text : str
        The original, unmasked text the span offsets refer to.
    results : list[RecognizerResult]
        Detected spans; only ``start``, ``end`` and ``entity_type`` are read.
    original_to_token : dict[tuple[str, str], str]
        Maps (entity_type, original substring) to its placeholder token.
        Spans without an entry are left as their original text.

    Returns
    -------
    str
        The text with each span replaced by its token.
    """
    pieces: list[str] = []
    cursor = 0  # first offset of *text* not yet written to the output
    # Longer span first on equal start, so containers precede contents.
    for r in sorted(results, key=lambda r: (r.start, -r.end)):
        if r.end <= cursor:
            # Entirely within an already-masked region.
            continue
        visible_start = max(r.start, cursor)
        pieces.append(text[cursor:visible_start])
        key = (r.entity_type, text[r.start : r.end])
        # Fall back to the not-yet-emitted part of the span when no token
        # is registered, so nothing is duplicated on partial overlap.
        pieces.append(original_to_token.get(key, text[visible_start : r.end]))
        cursor = r.end
    pieces.append(text[cursor:])
    return "".join(pieces)