"""De-identification transforms. Presidio anonymises per-document; the value NoteGuard adds is *cross-note, patient-consistent* de-identification — the same patient maps to the same surrogate across their whole admission journey. Only date-of-birth is treated as PII; it is shifted by a single consistent per-patient offset (visit / admission dates are clinically useful and left intact). That utility-preserving property is what makes the cleaned data useful for downstream / federated training, not just safe. Surrogates are realistic en_GB fakes (folded in from the Presidio branch's Faker vault) so the output reads like a real note — better for training than `Patient_001` tokens. Faker is optional: with it absent we fall back to deterministic tokens, so the pure-Python guarantee holds. """ from __future__ import annotations import hashlib import random import re import string from dataclasses import dataclass, field from datetime import datetime, timedelta from .recognisers import Span REDACTION = "redaction" PSEUDONYM = "pseudonym" # Human-readable placeholders for redaction — clearer than raw entity codes # (e.g. "NMC number: [NMC number]" instead of "[ORGANIZATION] number: [NMC]"). _REDACT_LABEL = { "PERSON": "name", "DATE_TIME": "date of birth", "UK_NHS": "NHS number", "UK_NINO": "NI number", "UK_POSTCODE": "postcode", "UK_PASSPORT": "passport number", "UK_VEHICLE_REGISTRATION": "vehicle registration", "GMC": "GMC number", "NMC": "NMC number", "NHS_ODS": "ODS code", "RECORD_ID": "record ID", "LOCATION": "location", "EMAIL_ADDRESS": "email", "PHONE_NUMBER": "phone", "IP_ADDRESS": "IP address", "URL": "URL", } def redaction_label(entity_type: str) -> str: return _REDACT_LABEL.get(entity_type, entity_type) _DATE_FORMATS = ["%d/%m/%Y", "%d-%m-%Y", "%Y-%m-%d", "%d/%m/%y", "%d-%m-%y"] _POSTCODE_INWARD = re.compile(r"\s*\d[A-Za-z]{2}\s*$") try: # realistic surrogates if Faker is available from faker import Faker _FAKER: Faker | None = Faker("en_GB") except Exception: # pragma: no cover - keeps the pure-Python path working _FAKER = None def _seed(value: str) -> int: return int(hashlib.sha256(value.encode()).hexdigest(), 16) def _fake(value: str, kind: str) -> str | None: """Deterministic realistic surrogate for `value` via Faker, or None if unavailable.""" if _FAKER is None: return None _FAKER.seed_instance(_seed(value) % (10 ** 9)) if kind == "PERSON": return _FAKER.name() if kind == "LOCATION": return _FAKER.city() if kind == "EMAIL_ADDRESS": return _FAKER.safe_email() if kind == "PHONE_NUMBER": return _FAKER.phone_number() return None def _postcode_outward(value: str) -> str: """Reduce a postcode to its outward code (DAPB1523-style geo generalisation).""" outward = _POSTCODE_INWARD.sub("", value).strip() return outward or "[UK_POSTCODE]" @dataclass class Replacement: original: str replacement: str entity_type: str @dataclass class PseudonymVault: """Stable original-value -> surrogate mapping (the 'mapping vault').""" _map: dict[tuple[str, str], str] = field(default_factory=dict) _counts: dict[str, int] = field(default_factory=dict) def token_for(self, entity_type: str, value: str) -> str: key = (entity_type, value.strip().lower()) if key not in self._map: self._map[key] = self._make(entity_type, value) return self._map[key] def _make(self, entity_type: str, value: str) -> str: if entity_type == "UK_NHS": return _fake_nhs_number(value) if entity_type == "UK_POSTCODE": return _postcode_outward(value) if entity_type == "UK_NINO": return _fake_nino(value) if entity_type == "UK_VEHICLE_REGISTRATION": return _fake_vehicle(value) realistic = _fake(value, entity_type) if realistic is not None: return realistic # deterministic token fallback (no Faker, or an entity with no faker kind) self._counts[entity_type] = self._counts.get(entity_type, 0) + 1 if entity_type == "PERSON": return f"Patient_{self._counts[entity_type]:03d}" return f"{entity_type}_{self._counts[entity_type]:03d}" def export(self) -> dict[str, str]: """Audit/export of the vault (keep this secret in production).""" return {f"{etype}:{val}": tok for (etype, val), tok in self._map.items()} def _patient_date_offset(person_id: str, max_days: int = 365) -> int: """Deterministic per-patient shift in [-max_days, max_days], from person_id.""" h = int(hashlib.sha256(f"noteguard:{person_id}".encode()).hexdigest(), 16) return (h % (2 * max_days + 1)) - max_days def _fake_nino(value: str) -> str: """Format-correct fake NINO (XX999999X) — deterministic per original.""" rng = random.Random(_seed(value)) prefix = "".join(rng.choices(string.ascii_uppercase, k=2)) digits = "".join(str(rng.randint(0, 9)) for _ in range(6)) suffix = rng.choice("ABCD") return f"{prefix}{digits}{suffix}" def _fake_vehicle(value: str) -> str: """Format-correct fake UK registration plate (AB12 CDE) — deterministic per original.""" rng = random.Random(_seed(value)) area = "".join(rng.choices(string.ascii_uppercase, k=2)) age = "".join(str(rng.randint(0, 9)) for _ in range(2)) seq = "".join(rng.choices(string.ascii_uppercase, k=3)) return f"{area}{age} {seq}" def _fake_nhs_number(value: str) -> str: """Deterministic, checksum-VALID fake NHS number (stable per original).""" from .recognisers import nhs_number_is_valid seed = _seed(value) for _ in range(1000): nine = f"{seed % 1_000_000_000:09d}" total = sum(int(nine[i]) * (10 - i) for i in range(9)) check = 11 - (total % 11) check = 0 if check == 11 else check if check != 10: candidate = nine + str(check) if nhs_number_is_valid(candidate): return candidate seed = (seed * 1103515245 + 12345) & ((1 << 64) - 1) return "0000000000" def _shift_date(value: str, offset_days: int) -> str | None: for fmt in _DATE_FORMATS: try: dt = datetime.strptime(value.strip(), fmt) return (dt + timedelta(days=offset_days)).strftime(fmt) except ValueError: continue return None def apply_transform( text: str, spans: list[Span], method: str = REDACTION, vault: PseudonymVault | None = None, person_id: str = "", ) -> tuple[str, list[Replacement]]: """Return (sanitised_text, replacements). Spans applied right-to-left.""" vault = vault or PseudonymVault() offset = _patient_date_offset(person_id) if person_id else 0 out = text used: list[Replacement] = [] for s in sorted(spans, key=lambda x: x.start, reverse=True): original = text[s.start:s.end] if method == REDACTION: repl = f"[{redaction_label(s.entity_type)}]" else: # PSEUDONYM if s.entity_type == "DATE_TIME": shifted = _shift_date(original, offset) repl = shifted if shifted else f"[{redaction_label('DATE_TIME')}]" else: repl = vault.token_for(s.entity_type, original) out = out[:s.start] + repl + out[s.end:] used.append(Replacement(original, repl, s.entity_type)) used.reverse() return out, used if __name__ == "__main__": from .recognisers import find_rule_spans txt = "Pt John seen 12/03/1981, NHS 943 476 5919. Reviewed again 20/03/1981." spans = find_rule_spans(txt) for method in (REDACTION, PSEUDONYM): v = PseudonymVault() new, repls = apply_transform(txt, spans, method, v, person_id="p7") print(f"\n[{method}] {new}") for r in repls: print(" ", r.original, "->", r.replacement, f"({r.entity_type})")