noteguard-agent / src /deid.py
github-actions[bot]
Deploy 8244dcbebc2e76b6843857728efe8f835b7426b4 from main
4477b4e
Raw
History Blame Contribute Delete
11.4 kB
"""NoteGuard de-identification core.
Dependency-free and runnable on its own (`python noteguard/deid.py`).
If `presidio-analyzer` + a spaCy model are installed the module upgrades
free-text PERSON/LOCATION detection automatically; the rule + vault layer
below always runs without them.
Design:
- NHS-aware recognisers (NHS number, GMC/NMC, postcode, DOB, email, phone)
- vault of known identifiers from patients.csv + admissions.csv so redaction
is exact AND measurable
- patient-consistent surrogates (same original → same token across notes)
- DOB date-shift (kept clinically plausible); other identifiers tokenised
- reidentify() restores originals for the CLINICIAN's eyes only
- assert_clean() is the hard guarantee that no identifier reaches the model
"""
from __future__ import annotations
import csv
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
NHS_NUMBER = re.compile(r"\b\d{3}[ -]?\d{3}[ -]?\d{4}\b")
NHS_CONTEXT = re.compile(r"(?i)\bNHS(?:\s*(?:no\.?|number|#))?[:\s]*([0-9][0-9 \-]{6,12}\d)")
UK_POSTCODE = re.compile(r"\b([A-Z]{1,2}\d[A-Z\d]? ?\d[A-Z]{2})\b")
DOB = re.compile(r"\b(\d{1,2}[/\-.]\d{1,2}[/\-.]\d{2,4})\b")
EMAIL = re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b")
PHONE = re.compile(r"\b0\d{2,3}[ -]?\d{3,4}[ -]?\d{3,4}\b")
# GMC/NMC: optional connector word then optional colon/spaces before the ID
# Handles: "GMC No. 7654321", "NMC number: 18D6896L", "PIN 18D6896L", etc.
_CONN = r"(?:\s*(?:no\.?|number|reg(?:istration)?|pin|#))?[:\s]*"
GMC = re.compile(r"(?i)\bGMC" + _CONN + r"(\d{7})\b")
NMC = re.compile(r"(?i)\b(?:NMC|PIN)" + _CONN + r"(\d{2}[A-Z]\d{4}[A-Z])\b")
# Surrogate token pattern — catches any [LABEL_n] the model might invent
_SURROGATE_PAT = re.compile(r"\[[A-Z]+_\d+\]")
# Column names we look for in any CSV to extract person names
_NAME_COLS = frozenset(
{
"full_name",
"patient_name",
"first_name",
"surname",
"last_name",
"clinician_name",
"author_name",
"author",
"attending",
"attending_physician",
"nurse",
"consultant",
"doctor",
"provider",
}
)
# ── Optional NLP detector (Presidio + spaCy) ─────────────────────────────────
class _Detector:
"""Stub detector — no-op when Presidio/spaCy is not installed."""
def detect_persons(self, text: str) -> list[str]:
return []
def _build_detector() -> _Detector:
    """Return a Presidio-backed detector, or the no-op stub if that fails.

    Any exception while importing Presidio or constructing its engine is
    treated as "not available" and the plain ``_Detector`` is returned.
    """
    try:
        from presidio_analyzer import AnalyzerEngine  # type: ignore

        engine = AnalyzerEngine()

        class _PresidioDetector(_Detector):
            def detect_persons(self, text: str) -> list[str]:
                # Despite the method name, LOCATION entities are requested too.
                spans = []
                for hit in engine.analyze(text, language="en", entities=["PERSON", "LOCATION"]):
                    if hit.end > hit.start:  # ignore zero-width results
                        spans.append(text[hit.start : hit.end])
                return spans

        return _PresidioDetector()
    except Exception:
        return _Detector()
# Built once when the module is imported; NoteGuard methods read this
# module-level detector rather than constructing their own.
_DETECTOR: _Detector = _build_detector()
# ── Vault loader ──────────────────────────────────────────────────────────────
def load_known_from_csv(patients_csv: str, admissions_csv: str | None = None) -> dict:
    """Collect known identifiers from the structured CSV tables.

    Every column listed in ``_NAME_COLS`` is read from *patients_csv* (and
    from *admissions_csv* when given); trimmed values longer than two
    characters are recorded as PERSON entries. The ``nhs_number`` column of
    *patients_csv* supplies the NHS entries. A missing admissions file is
    ignored; a missing patients file raises as usual.

    Returns:
        ``{"PERSON": [...], "NHS": [...]}`` with each list sorted and free
        of duplicates.
    """
    people: set = set()
    nhs_numbers: set = set()

    def _collect_names(record: dict) -> None:
        # Absent or empty cells collapse to "" and fail the length test.
        cells = ((record.get(column) or "").strip() for column in _NAME_COLS)
        people.update(cell for cell in cells if len(cell) > 2)

    # utf-8-sig strips a BOM if present, so the first header name stays clean.
    with open(patients_csv, newline="", encoding="utf-8-sig") as handle:
        for record in csv.DictReader(handle):
            _collect_names(record)
            number = (record.get("nhs_number") or "").strip()
            if number:
                nhs_numbers.add(number)

    if admissions_csv:
        try:
            with open(admissions_csv, newline="", encoding="utf-8-sig") as handle:
                for record in csv.DictReader(handle):
                    _collect_names(record)
        except FileNotFoundError:
            # The admissions table is optional; carry on with patient data only.
            pass

    return {"PERSON": sorted(people), "NHS": sorted(nhs_numbers)}
# ── Core de-identification ────────────────────────────────────────────────────
@dataclass
class DeidResult:
    """Value returned by ``NoteGuard.deidentify`` for one note."""

    # Note text after all redaction passes have run.
    clean_text: str
    # Copy of the guard's original -> surrogate map taken after this call.
    forward: dict
    # Copy of the guard's surrogate -> original map taken after this call.
    reverse: dict
    # Vault identifiers still found in ``clean_text``.
    residual: list
class NoteGuard:
    """Reversible de-identifier for free-text clinical notes.

    One instance owns the surrogate maps, so passing several notes through
    the same instance gives a given original the same surrogate every time.

    Args:
        known: Vault mapping a label (``"PERSON"``, ``"NHS"``) to an iterable
            of exact identifier strings.
        dob_shift_days: Number of days added to every parseable date.
        forward: Existing original -> surrogate map to resume from.
        reverse: Existing surrogate -> original map to resume from; token
            numbering continues after the highest ``[LABEL_n]`` key in it.
    """

    # (mojibake, intended) pairs: the UTF-8 bytes of the intended character
    # mis-decoded as Windows-1252. Written as escapes so the table survives
    # any re-encoding of this source file.
    _MOJIBAKE_FIXES = (
        ("\u00c2\u00b7", "\u00b7"),        # middle dot U+00B7
        ("\u00e2\u20ac\u2122", "\u2019"),  # right single quote U+2019
        ("\u00e2\u20ac\u201c", "\u2013"),  # en dash U+2013 (0x93 in cp1252 is U+201C)
        ("\u00c3\u00a9", "\u00e9"),        # e-acute U+00E9
    )

    def __init__(self, known=None, dob_shift_days=37, forward=None, reverse=None):
        self.known = {k: list(v) for k, v in (known or {}).items()}
        self.dob_shift = dob_shift_days
        self.forward = dict(forward or {})
        self.reverse = dict(reverse or {})
        self._counter: dict[str, int] = {}
        # Resume numbering so new tokens never collide with restored ones.
        for tok in self.reverse:
            m = re.match(r"\[([A-Z]+)_(\d+)\]", tok)
            if m:
                self._counter[m.group(1)] = max(self._counter.get(m.group(1), 0), int(m.group(2)))

    @staticmethod
    def _fix_mojibake(s: str) -> str:
        """Repair the known UTF-8-read-as-Windows-1252 sequences in *s*."""
        for broken, intended in NoteGuard._MOJIBAKE_FIXES:
            s = s.replace(broken, intended)
        return s

    @staticmethod
    def _term_pattern(terms):
        """Compile a regex matching any of *terms* as a whole token.

        A "not inside a word" guard is applied only on the sides where the
        term itself starts/ends with a word character, so terms ending in
        punctuation (``"Smith Jr."``) still match while short names such as
        ``"Dia"`` do not fire inside ``"Diastolic"``. Longer terms are tried
        first. Returns ``None`` when *terms* has no non-empty entry; the
        matched term is group 1.
        """
        usable = sorted({t for t in terms if t}, key=lambda t: (-len(t), t))
        if not usable:
            return None

        def guarded(term):
            left = r"(?<!\w)" if re.match(r"\w", term[0]) else ""
            right = r"(?!\w)" if re.match(r"\w", term[-1]) else ""
            return left + re.escape(term) + right

        return re.compile("(" + "|".join(guarded(t) for t in usable) + ")")

    @staticmethod
    def _matching_patterns(text: str) -> list[str]:
        """Return the (truncated) source of each identifier regex found in *text*."""
        return [
            pat.pattern[:40]
            for pat in (NHS_CONTEXT, NHS_NUMBER, EMAIL, GMC, NMC)
            if pat.search(text)
        ]

    def _surrogate(self, label: str, original: str) -> str:
        """Return the ``[LABEL_n]`` token for *original*, minting one if new."""
        if original in self.forward:
            return self.forward[original]
        self._counter[label] = self._counter.get(label, 0) + 1
        tok = f"[{label}_{self._counter[label]}]"
        self.forward[original] = tok
        self.reverse[tok] = original
        return tok

    def _shift_date(self, s: str) -> str:
        """Return *s* moved by ``dob_shift`` days, in the same date format.

        Strings matching none of the supported formats, or whose shifted
        value falls outside the ``datetime`` range, get a ``[DATE_n]`` token.
        """
        if s in self.forward:
            return self.forward[s]
        for fmt in ("%d/%m/%Y", "%d/%m/%y", "%d-%m-%Y", "%Y-%m-%d", "%d.%m.%Y"):
            try:
                shifted = (datetime.strptime(s, fmt) + timedelta(days=self.dob_shift)).strftime(fmt)
            except (ValueError, OverflowError):
                # ValueError: wrong format; OverflowError: shift leaves the
                # representable date range.
                continue
            self.forward[s] = shifted
            self.reverse[shifted] = s
            return shifted
        return self._surrogate("DATE", s)

    def _redact(self, pattern, label, text, group=0, transform=None):
        """Replace capture *group* of every *pattern* match in *text*.

        The replacement is ``transform(original)`` when *transform* is given,
        otherwise the surrogate token for *label*. Only the captured span is
        rewritten; surrounding context in the match (e.g. ``"GMC No. "``) is
        kept verbatim.
        """

        def repl(m):
            original = m.group(group)
            if not original:
                # Group empty or not part of this match: nothing to hide.
                return m.group(0)
            surr = transform(original) if transform else self._surrogate(label, original)
            whole, base = m.group(0), m.start(0)
            start, end = m.span(group)
            return whole[: start - base] + surr + whole[end - base :]

        return pattern.sub(repl, text)

    def deidentify(self, text: str) -> "DeidResult":
        """Redact *text* and return the clean text plus mapping snapshots."""
        t = self._fix_mojibake(text)
        # Vault pass — patient names + NHS numbers from structured tables
        for label in ("PERSON", "NHS"):
            pat = self._term_pattern(self.known.get(label, []))
            if pat is not None:
                t = self._redact(pat, label, t, group=1)
        # Rule-based recognisers
        t = self._redact(NHS_CONTEXT, "NHS", t, group=1)
        t = self._redact(NHS_NUMBER, "NHS", t, group=0)
        t = self._redact(GMC, "GMC", t, group=1)
        t = self._redact(NMC, "NMC", t, group=1)
        t = self._redact(EMAIL, "EMAIL", t, group=0)
        t = self._redact(PHONE, "PHONE", t, group=0)
        t = self._redact(UK_POSTCODE, "POSTCODE", t, group=1)
        t = self._redact(DOB, "DOB", t, group=1, transform=self._shift_date)
        # Optional NLP pass — catches clinician names and locations not in vault
        for name in _DETECTOR.detect_persons(t):
            if not name or len(name) <= 2:
                continue
            # Skip spans that are (or contain) output of the passes above.
            if name in self.reverse or _SURROGATE_PAT.search(name):
                continue
            # A name already mapped by an earlier note must still be replaced
            # here; _surrogate hands back its existing token.
            t = t.replace(name, self._surrogate("PERSON", name))
        return DeidResult(t, dict(self.forward), dict(self.reverse), self._residual_known(t))

    def _residual_known(self, text: str) -> list:
        """Return every vault identifier still present in *text*.

        Uses the same whole-token matching as the vault pass, so short names
        like "Dia" don't false-positive on "Diastolic"/"Diabetes".
        """
        return [
            v
            for vals in self.known.values()
            for v in vals
            if v and self._term_pattern([v]).search(text)
        ]

    def residual_identifiers(self, text: str) -> list[str]:
        """Comprehensive leak check — used for the trust metric.

        Covers:
        - vault names that survived de-id
        - regex patterns (NHS, email, GMC, NMC)
        - orphaned surrogate tokens (invented by the model, no reverse mapping)
        - NLP-detected persons/locations (when Presidio is installed)
        """
        hits: list[str] = list(self._residual_known(text))
        hits.extend(f"pattern:{src}" for src in self._matching_patterns(text))
        # Orphaned surrogates: a [LABEL_n] in the text with no reverse mapping
        # means the model invented a token we cannot restore — it's a leak.
        for m in _SURROGATE_PAT.finditer(text):
            tok = m.group(0)
            if tok not in self.reverse:
                hits.append(f"unmapped_token:{tok}")
        # NLP pass (no-op when Presidio not installed)
        for name in _DETECTOR.detect_persons(text):
            if name:
                hits.append(f"PERSON:{name[:30]}")
        return list(dict.fromkeys(hits))  # deduplicate, preserve order

    def assert_clean(self, text: str) -> None:
        """Raise if a vault identifier or identifier regex is found in *text*.

        Checks the vault plus the NHS, email, GMC and NMC patterns only;
        phone numbers, postcodes and dates are not re-checked here.

        Raises:
            ValueError: listing each surviving identifier / pattern.
        """
        hits = list(self._residual_known(text))
        hits.extend(self._matching_patterns(text))
        if hits:
            raise ValueError(f"NoteGuard guarantee failed: identifiers reached the model boundary: {hits}")

    def reidentify(self, text: str) -> str:
        """Restore originals for every known surrogate found in *text*.

        All surrogates are swapped in a single pass, so a restored value that
        happens to equal another surrogate (two dates exactly ``dob_shift``
        days apart) is not replaced a second time.
        """
        tokens = sorted((tok for tok in self.reverse if tok), key=len, reverse=True)
        if not tokens:
            return text
        pattern = re.compile("|".join(re.escape(tok) for tok in tokens))
        return pattern.sub(lambda m: self.reverse[m.group(0)], text)
if __name__ == "__main__":
    # Self-contained demo: redact one synthetic note, verify it, restore it.
    demo_vault = {"PERSON": ["Margaret Okafor"], "NHS": ["485 777 3456"]}
    sample_note = (
        "02 Jan, Ward RJ1. Pt Margaret Okafor (NHS 485 777 3456, DOB 14/03/1934) "
        "admitted post-fall. Nurse Chukwuebuka reviewed. "
        "Contact a.okafor@example.com, 020 7946 0991. "
        "GMC No. 7654321. NMC number: 18D6896L."
    )
    print("INPUT:\n", sample_note, "\n")

    guard = NoteGuard(known=demo_vault)
    outcome = guard.deidentify(sample_note)
    print("DE-IDENTIFIED (what the model sees):\n", outcome.clean_text, "\n")
    print("Residual identifiers:", outcome.residual)

    # Raises ValueError if anything checked by assert_clean is still present.
    guard.assert_clean(outcome.clean_text)
    print("assert_clean: OK\n")

    # A fresh guard seeded only with the reverse map is enough to restore.
    restorer = NoteGuard(reverse=outcome.reverse)
    round_trip = restorer.reidentify(outcome.clean_text)
    print("RE-IDENTIFIED (clinician view only):\n", round_trip)