"""De-identification transforms.

Presidio anonymises per-document; the value NoteGuard adds is *cross-note,
patient-consistent* de-identification — the same patient maps to the same
surrogate across their whole admission journey. Of the dates in a note, only
date-of-birth is treated as PII; it is shifted by a single consistent per-patient
offset (visit / admission dates are clinically useful and left intact). That
utility-preserving property is what makes the cleaned data useful for
downstream / federated training, not just safe.

Surrogates are realistic en_GB fakes (folded in from the Presidio branch's Faker
vault) so the output reads like a real note — better for training than `Patient_001`
tokens. Faker is optional: when it is absent we fall back to deterministic tokens,
so the pure-Python guarantee holds.
"""
from __future__ import annotations
import hashlib
import random
import re
import string
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from .recognisers import Span
# Names of the two transform methods accepted by apply_transform.
REDACTION = "redaction"
PSEUDONYM = "pseudonym"
# Human-readable placeholders for redaction — clearer than raw entity codes
# (e.g. "NMC number: [NMC number]" instead of "[ORGANIZATION] number: [NMC]").
# Keys are entity-type codes; values are the text shown inside the square
# brackets. Entity types missing from this table fall back to the raw code
# (see redaction_label).
_REDACT_LABEL: dict[str, str] = {
    "PERSON": "name",
    "DATE_TIME": "date of birth",
    "UK_NHS": "NHS number",
    "UK_NINO": "NI number",
    "UK_POSTCODE": "postcode",
    "UK_PASSPORT": "passport number",
    "UK_VEHICLE_REGISTRATION": "vehicle registration",
    "GMC": "GMC number",
    "NMC": "NMC number",
    "NHS_ODS": "ODS code",
    "RECORD_ID": "record ID",
    "LOCATION": "location",
    "EMAIL_ADDRESS": "email",
    "PHONE_NUMBER": "phone",
    "IP_ADDRESS": "IP address",
    "URL": "URL",
}
def redaction_label(entity_type: str) -> str:
    """Return the human-readable placeholder label for ``entity_type``.

    Entity types that have no entry in ``_REDACT_LABEL`` are returned unchanged,
    so the caller always gets a usable label.
    """
    if entity_type in _REDACT_LABEL:
        return _REDACT_LABEL[entity_type]
    return entity_type
# Date layouts understood by _shift_date, tried in this order; a shifted date is
# rendered back with the same layout it was parsed with.
_DATE_FORMATS = ["%d/%m/%Y", "%d-%m-%Y", "%Y-%m-%d", "%d/%m/%y", "%d-%m-%y"]
# Matches a trailing "digit + two letters" group (plus any surrounding
# whitespace) at the end of a string; _postcode_outward removes this match.
_POSTCODE_INWARD = re.compile(r"\s*\d[A-Za-z]{2}\s*$")
# Optional dependency: build one shared en_GB Faker instance at import time.
# Any failure while importing or constructing it leaves _FAKER as None, in which
# case _fake() returns None and PseudonymVault falls back to counter tokens.
try:  # realistic surrogates if Faker is available
    from faker import Faker
    _FAKER: Faker | None = Faker("en_GB")
except Exception:  # pragma: no cover - keeps the pure-Python path working
    _FAKER = None
def _seed(value: str) -> int:
    """Derive a stable integer seed from the SHA-256 digest of ``value``.

    The digest bytes are read as one big-endian unsigned integer, so the same
    text always produces the same (256-bit) seed.
    """
    digest = hashlib.sha256(value.encode()).digest()
    return int.from_bytes(digest, "big")
def _fake(value: str, kind: str) -> str | None:
    """Return a deterministic Faker surrogate for ``value``, or None.

    None is returned when Faker is unavailable or when ``kind`` is not one of
    PERSON, LOCATION, EMAIL_ADDRESS or PHONE_NUMBER.
    """
    faker = _FAKER
    if faker is None:
        return None
    # Re-seed the shared instance from the value on every call (even for kinds
    # that end up unsupported) so the same original yields the same surrogate.
    faker.seed_instance(_seed(value) % (10 ** 9))
    provider_by_kind = {
        "PERSON": "name",
        "LOCATION": "city",
        "EMAIL_ADDRESS": "safe_email",
        "PHONE_NUMBER": "phone_number",
    }
    provider = provider_by_kind.get(kind)
    if provider is None:
        return None
    # Look the provider up lazily so only the requested one is touched.
    return getattr(faker, provider)()
def _postcode_outward(value: str) -> str:
    """Generalise a postcode to its outward code (DAPB1523-style geo generalisation).

    The trailing inward part matched by ``_POSTCODE_INWARD`` is removed and the
    remainder stripped; if nothing is left, the ``[UK_POSTCODE]`` placeholder is
    returned instead of an empty string.
    """
    remainder = _POSTCODE_INWARD.sub("", value).strip()
    if not remainder:
        return "[UK_POSTCODE]"
    return remainder
@dataclass
class Replacement:
    """Record of a single substitution made by apply_transform."""

    original: str  # the text that was replaced, as sliced from the input
    replacement: str  # the text substituted in its place
    entity_type: str  # entity type of the span that triggered the substitution
@dataclass
class PseudonymVault:
    """Stable original-value -> surrogate mapping (the 'mapping vault').

    Values are looked up by a canonical form (see ``_canonical``), and seeded
    surrogates are derived from that same canonical form. As a result the
    surrogate does not depend on which spelling of an identifier was seen
    first: "943 476 5919", "943-476-5919" and "9434765919" all map to one fake
    NHS number, within one vault and across separately created vaults.
    """

    # (entity_type, canonical value) -> surrogate already handed out
    _map: dict[tuple[str, str], str] = field(default_factory=dict)
    # entity_type -> how many counter-style fallback tokens have been issued
    _counts: dict[str, int] = field(default_factory=dict)

    @staticmethod
    def _canonical(entity_type: str, value: str) -> str:
        """Return the form of ``value`` used as vault key and as seed text.

        All values are stripped and lower-cased. Structured identifiers are
        additionally reduced to their letters and digits, because the same
        identifier is routinely written with different spacing or hyphenation.
        """
        text = value.strip().lower()
        structured = ("UK_NHS", "UK_NINO", "UK_VEHICLE_REGISTRATION", "UK_POSTCODE")
        if entity_type in structured:
            text = "".join(ch for ch in text if ch.isalnum())
        return text

    def token_for(self, entity_type: str, value: str) -> str:
        """Return the surrogate for ``value``, creating it on first sight."""
        key = (entity_type, self._canonical(entity_type, value))
        if key not in self._map:
            self._map[key] = self._make(entity_type, value)
        return self._map[key]

    def _make(self, entity_type: str, value: str) -> str:
        """Build a new surrogate for ``value`` of the given entity type."""
        # Seed from the canonical form so formatting/case differences in the
        # original cannot change the generated surrogate.
        seed_text = self._canonical(entity_type, value)
        if entity_type == "UK_NHS":
            return _fake_nhs_number(seed_text)
        if entity_type == "UK_POSTCODE":
            # Generalisation of the real value, not a seeded fake: needs the
            # text as written.
            return _postcode_outward(value)
        if entity_type == "UK_NINO":
            return _fake_nino(seed_text)
        if entity_type == "UK_VEHICLE_REGISTRATION":
            return _fake_vehicle(seed_text)
        realistic = _fake(seed_text, entity_type)
        if realistic is not None:
            return realistic
        # deterministic token fallback (no Faker, or an entity with no faker kind)
        self._counts[entity_type] = self._counts.get(entity_type, 0) + 1
        if entity_type == "PERSON":
            return f"Patient_{self._counts[entity_type]:03d}"
        return f"{entity_type}_{self._counts[entity_type]:03d}"

    def export(self) -> dict[str, str]:
        """Audit/export of the vault (keep this secret in production).

        Keys have the form ``"<entity_type>:<canonical value>"``.
        """
        return {f"{etype}:{val}": tok for (etype, val), tok in self._map.items()}
def _patient_date_offset(person_id: str, max_days: int = 365) -> int:
    """Deterministic, non-zero per-patient shift in [-max_days, max_days].

    The shift is derived from a SHA-256 hash of ``person_id``. A shift of 0
    would leave the date unchanged, so when the primary hash bucket lands on 0
    a replacement is derived from the remaining hash bits. Every other patient
    keeps exactly the offset the primary bucket gives.

    Args:
        person_id: Stable patient identifier.
        max_days: Largest absolute shift in days. 0 means "no shift" and
            returns 0.

    Returns:
        The shift in days; never 0 unless ``max_days`` is 0.

    Raises:
        ValueError: If ``max_days`` is negative.
    """
    if max_days < 0:
        raise ValueError("max_days must be non-negative")
    if max_days == 0:
        return 0
    digest = int(hashlib.sha256(f"noteguard:{person_id}".encode()).hexdigest(), 16)
    span = 2 * max_days + 1
    offset = digest % span - max_days
    if offset == 0:
        # Use the hash bits not consumed by the primary bucket to pick a
        # magnitude in [1, max_days] and a sign.
        spare = digest // span
        magnitude = spare % max_days + 1
        offset = magnitude if (spare // max_days) % 2 else -magnitude
    return offset
def _fake_nino(value: str) -> str:
    """Build a fake NINO shaped like XX999999X, deterministic per original.

    The generator is seeded from ``value``; it draws two uppercase letters,
    then six digits, then one suffix letter from A-D, in that order.
    """
    rng = random.Random(_seed(value))
    letters = rng.choices(string.ascii_uppercase, k=2)
    numbers = [str(rng.randint(0, 9)) for _ in range(6)]
    final_letter = rng.choice("ABCD")
    return "".join([*letters, *numbers, final_letter])
def _fake_vehicle(value: str) -> str:
    """Build a fake UK plate shaped like AB12 CDE, deterministic per original.

    The generator is seeded from ``value``; it draws two uppercase letters,
    two digits and three uppercase letters, in that order.
    """
    rng = random.Random(_seed(value))
    area_letters = rng.choices(string.ascii_uppercase, k=2)
    age_digits = [str(rng.randint(0, 9)) for _ in range(2)]
    tail_letters = rng.choices(string.ascii_uppercase, k=3)
    return "".join(area_letters) + "".join(age_digits) + " " + "".join(tail_letters)
def _fake_nhs_number(value: str) -> str:
    """Deterministic, checksum-VALID fake NHS number (stable per original).

    Starting from a seed derived from ``value``, up to 1000 nine-digit bodies
    are tried. For each, the modulus-11 check digit is computed (weights 10
    down to 2); bodies whose check digit would be 10 are skipped, and the first
    candidate accepted by ``nhs_number_is_valid`` is returned. If none is
    accepted, "0000000000" is returned.
    """
    from .recognisers import nhs_number_is_valid

    state = _seed(value)
    mask64 = (1 << 64) - 1
    for _attempt in range(1000):
        body = f"{state % 1_000_000_000:09d}"
        weighted = sum(int(digit) * weight for digit, weight in zip(body, range(10, 1, -1)))
        check = 11 - (weighted % 11)
        if check == 11:
            check = 0
        if check != 10:
            candidate = f"{body}{check}"
            if nhs_number_is_valid(candidate):
                return candidate
        # Advance with a linear congruential step, truncated to 64 bits.
        state = (state * 1103515245 + 12345) & mask64
    return "0000000000"
def _shift_date(value: str, offset_days: int) -> str | None:
    """Shift a date string by ``offset_days``, keeping its original layout.

    Each layout in ``_DATE_FORMATS`` is tried in order; the first one that
    parses is used both to read the date and to render the shifted result.

    Args:
        value: Date text; surrounding whitespace is ignored.
        offset_days: Number of days to add (may be negative).

    Returns:
        The shifted date as text, or None when no layout matches or the shifted
        date would fall outside the range ``datetime`` can represent.
    """
    candidate = value.strip()
    for fmt in _DATE_FORMATS:
        try:
            parsed = datetime.strptime(candidate, fmt)
            return (parsed + timedelta(days=offset_days)).strftime(fmt)
        except (ValueError, OverflowError):
            # ValueError: layout does not match. OverflowError: the shift left
            # the supported date range (e.g. a year-0001 date shifted
            # backwards); treat it as unshiftable rather than crash the caller.
            continue
    return None
def _resolve_overlaps(spans: list[Span]) -> list[tuple[int, int, str]]:
    """Return ``(start, end, entity_type)`` regions, ascending and non-overlapping.

    Where spans overlap, the leftmost (then longest) span wins; a later span is
    trimmed to the part not yet covered, or dropped if nothing of it remains.
    Every character covered by some input span stays covered.
    """
    regions: list[tuple[int, int, str]] = []
    covered_to: int | None = None
    for s in sorted(spans, key=lambda x: (x.start, -x.end)):
        start = s.start
        if covered_to is not None and start < covered_to:
            if s.end <= covered_to:
                continue  # fully inside a region that is already replaced
            start = covered_to
        regions.append((start, s.end, s.entity_type))
        covered_to = s.end
    return regions


def apply_transform(
    text: str,
    spans: list[Span],
    method: str = REDACTION,
    vault: PseudonymVault | None = None,
    person_id: str = "",
) -> tuple[str, list[Replacement]]:
    """Return (sanitised_text, replacements). Spans applied right-to-left.

    Args:
        text: The note to sanitise.
        spans: Detected entity spans (``start``, ``end``, ``entity_type``).
            Overlapping spans are reconciled first so that replacements never
            act on stale indices.
        method: ``REDACTION`` replaces each span with a bracketed label; any
            other value is treated as ``PSEUDONYM``.
        vault: Mapping vault for pseudonyms; a fresh one is created if omitted.
        person_id: Patient identifier used to derive the date shift.

    Returns:
        The sanitised text and the replacements in left-to-right order.

    In pseudonym mode a DATE_TIME span is shifted by the per-patient offset.
    When no usable offset exists (no ``person_id``, or an offset of 0) or the
    date cannot be parsed, the span is redacted instead; a zero shift would
    otherwise emit the original date unchanged.
    """
    if vault is None:
        vault = PseudonymVault()
    offset = _patient_date_offset(person_id) if person_id else 0
    date_placeholder = f"[{redaction_label('DATE_TIME')}]"
    out = text
    used: list[Replacement] = []
    # Right-to-left so earlier indices stay valid while ``out`` changes length.
    for start, end, entity_type in reversed(_resolve_overlaps(spans)):
        original = text[start:end]
        if method == REDACTION:
            repl = f"[{redaction_label(entity_type)}]"
        elif entity_type == "DATE_TIME":
            shifted = _shift_date(original, offset) if offset else None
            repl = shifted or date_placeholder
        else:
            repl = vault.token_for(entity_type, original)
        out = out[:start] + repl + out[end:]
        used.append(Replacement(original, repl, entity_type))
    used.reverse()
    return out, used
if __name__ == "__main__":
    # Small demo: run both transform methods over one sample note and print the
    # sanitised text followed by each replacement that was made.
    from .recognisers import find_rule_spans

    sample = "Pt John seen 12/03/1981, NHS 943 476 5919. Reviewed again 20/03/1981."
    detected = find_rule_spans(sample)
    for mode in (REDACTION, PSEUDONYM):
        demo_vault = PseudonymVault()  # fresh vault per method
        cleaned, changes = apply_transform(sample, detected, mode, demo_vault, person_id="p7")
        print(f"\n[{mode}] {cleaned}")
        for change in changes:
            print(" ", change.original, "->", change.replacement, f"({change.entity_type})")
|