Spaces:
Running
Running
| """NoteGuard de-identification core. | |
| Dependency-free and runnable on its own (`python noteguard/deid.py`). | |
| If `presidio-analyzer` + a spaCy model are installed the module upgrades | |
| free-text PERSON/LOCATION detection automatically; the rule + vault layer | |
| below always runs without them. | |
| Design: | |
| - NHS-aware recognisers (NHS number, GMC/NMC, postcode, DOB, email, phone) | |
| - vault of known identifiers from patients.csv + admissions.csv so redaction | |
| is exact AND measurable | |
| - patient-consistent surrogates (same original → same token across notes) | |
| - DOB date-shift (kept clinically plausible); other identifiers tokenised | |
| - reidentify() restores originals for the CLINICIAN's eyes only | |
| - assert_clean() is the hard guarantee that no identifier reaches the model | |
| """ | |
| from __future__ import annotations | |
| import csv | |
| import re | |
| from dataclasses import dataclass | |
| from datetime import datetime, timedelta | |
| NHS_NUMBER = re.compile(r"\b\d{3}[ -]?\d{3}[ -]?\d{4}\b") | |
| NHS_CONTEXT = re.compile(r"(?i)\bNHS(?:\s*(?:no\.?|number|#))?[:\s]*([0-9][0-9 \-]{6,12}\d)") | |
| UK_POSTCODE = re.compile(r"\b([A-Z]{1,2}\d[A-Z\d]? ?\d[A-Z]{2})\b") | |
| DOB = re.compile(r"\b(\d{1,2}[/\-.]\d{1,2}[/\-.]\d{2,4})\b") | |
| EMAIL = re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b") | |
| PHONE = re.compile(r"\b0\d{2,3}[ -]?\d{3,4}[ -]?\d{3,4}\b") | |
| # GMC/NMC: optional connector word then optional colon/spaces before the ID | |
| # Handles: "GMC No. 7654321", "NMC number: 18D6896L", "PIN 18D6896L", etc. | |
| _CONN = r"(?:\s*(?:no\.?|number|reg(?:istration)?|pin|#))?[:\s]*" | |
| GMC = re.compile(r"(?i)\bGMC" + _CONN + r"(\d{7})\b") | |
| NMC = re.compile(r"(?i)\b(?:NMC|PIN)" + _CONN + r"(\d{2}[A-Z]\d{4}[A-Z])\b") | |
| # Surrogate token pattern — catches any [LABEL_n] the model might invent | |
| _SURROGATE_PAT = re.compile(r"\[[A-Z]+_\d+\]") | |
| # Column names we look for in any CSV to extract person names | |
| _NAME_COLS = frozenset( | |
| { | |
| "full_name", | |
| "patient_name", | |
| "first_name", | |
| "surname", | |
| "last_name", | |
| "clinician_name", | |
| "author_name", | |
| "author", | |
| "attending", | |
| "attending_physician", | |
| "nurse", | |
| "consultant", | |
| "doctor", | |
| "provider", | |
| } | |
| ) | |
| # ── Optional NLP detector (Presidio + spaCy) ───────────────────────────────── | |
| class _Detector: | |
| """Stub detector — no-op when Presidio/spaCy is not installed.""" | |
| def detect_persons(self, text: str) -> list[str]: | |
| return [] | |
| def _build_detector() -> _Detector: | |
| try: | |
| from presidio_analyzer import AnalyzerEngine # type: ignore | |
| engine = AnalyzerEngine() | |
| class _PresidioDetector(_Detector): | |
| def detect_persons(self, text: str) -> list[str]: | |
| results = engine.analyze(text, language="en", entities=["PERSON", "LOCATION"]) | |
| return [text[r.start : r.end] for r in results if r.end > r.start] | |
| return _PresidioDetector() | |
| except Exception: | |
| return _Detector() | |
| _DETECTOR: _Detector = _build_detector() | |
| # ── Vault loader ────────────────────────────────────────────────────────────── | |
| def load_known_from_csv(patients_csv: str, admissions_csv: str | None = None) -> dict: | |
| """Build the identifier vault from the synthetic dataset's structured tables. | |
| Reads ``full_name`` and common name columns from *patients_csv*, plus NHS | |
| numbers. If *admissions_csv* is supplied, any clinician/author name columns | |
| found there are added to the PERSON set so clinician names are caught | |
| deterministically even without Presidio. | |
| """ | |
| known: dict[str, set] = {"PERSON": set(), "NHS": set()} | |
| def _pull_row(row: dict) -> None: | |
| for col in _NAME_COLS: | |
| v = (row.get(col) or "").strip() | |
| if len(v) > 2: | |
| known["PERSON"].add(v) | |
| with open(patients_csv, newline="", encoding="utf-8-sig") as f: | |
| for row in csv.DictReader(f): | |
| _pull_row(row) | |
| nhs = (row.get("nhs_number") or "").strip() | |
| if nhs: | |
| known["NHS"].add(nhs) | |
| if admissions_csv: | |
| try: | |
| with open(admissions_csv, newline="", encoding="utf-8-sig") as f: | |
| for row in csv.DictReader(f): | |
| _pull_row(row) | |
| except FileNotFoundError: | |
| pass | |
| return {k: sorted(v) for k, v in known.items()} | |
| # ── Core de-identification ──────────────────────────────────────────────────── | |
| class DeidResult: | |
| clean_text: str | |
| forward: dict | |
| reverse: dict | |
| residual: list | |
| class NoteGuard: | |
| def __init__(self, known=None, dob_shift_days=37, forward=None, reverse=None): | |
| self.known = {k: list(v) for k, v in (known or {}).items()} | |
| self.dob_shift = dob_shift_days | |
| self.forward = dict(forward or {}) | |
| self.reverse = dict(reverse or {}) | |
| self._counter: dict[str, int] = {} | |
| for tok in self.reverse: | |
| m = re.match(r"\[([A-Z]+)_(\d+)\]", tok) | |
| if m: | |
| self._counter[m.group(1)] = max(self._counter.get(m.group(1), 0), int(m.group(2))) | |
| def _fix_mojibake(s: str) -> str: | |
| # Each pair: (UTF-8 bytes of the real char decoded as Windows-1252, real char) | |
| # · = · → · (middle dot U+00B7) | |
| # ’ = ’ → ' (right single quote U+2019) | |
| # – = â€" → – (en-dash U+2013; 0x93 in Win-1252 = U+201C) | |
| # é = é → é (e-acute U+00E9) | |
| return s.replace("·", "·").replace("’", "’").replace("–", "–").replace("é", "é") | |
| def _surrogate(self, label: str, original: str) -> str: | |
| if original in self.forward: | |
| return self.forward[original] | |
| self._counter[label] = self._counter.get(label, 0) + 1 | |
| tok = f"[{label}_{self._counter[label]}]" | |
| self.forward[original] = tok | |
| self.reverse[tok] = original | |
| return tok | |
| def _shift_date(self, s: str) -> str: | |
| if s in self.forward: | |
| return self.forward[s] | |
| for fmt in ("%d/%m/%Y", "%d/%m/%y", "%d-%m-%Y", "%Y-%m-%d", "%d.%m.%Y"): | |
| try: | |
| shifted = (datetime.strptime(s, fmt) + timedelta(days=self.dob_shift)).strftime(fmt) | |
| self.forward[s] = shifted | |
| self.reverse[shifted] = s | |
| return shifted | |
| except ValueError: | |
| continue | |
| return self._surrogate("DATE", s) | |
| def _redact(self, pattern, label, text, group=0, transform=None): | |
| def repl(m): | |
| original = m.group(group) | |
| surr = transform(original) if transform else self._surrogate(label, original) | |
| return m.group(0).replace(original, surr) | |
| return pattern.sub(repl, text) | |
| def deidentify(self, text: str) -> DeidResult: | |
| t = self._fix_mojibake(text) | |
| # Vault pass — patient names + NHS numbers from structured tables | |
| for label in ("PERSON", "NHS"): | |
| terms = [x for x in self.known.get(label, []) if x] | |
| if terms: | |
| alternatives = "|".join(re.escape(x) for x in sorted(terms, key=len, reverse=True)) | |
| pat = re.compile(r"\b(" + alternatives + r")\b") | |
| t = self._redact(pat, label, t, group=1) | |
| # Rule-based recognisers | |
| t = self._redact(NHS_CONTEXT, "NHS", t, group=1) | |
| t = self._redact(NHS_NUMBER, "NHS", t, group=0) | |
| t = self._redact(GMC, "GMC", t, group=1) | |
| t = self._redact(NMC, "NMC", t, group=1) | |
| t = self._redact(EMAIL, "EMAIL", t, group=0) | |
| t = self._redact(PHONE, "PHONE", t, group=0) | |
| t = self._redact(UK_POSTCODE, "POSTCODE", t, group=1) | |
| t = self._redact(DOB, "DOB", t, group=1, transform=self._shift_date) | |
| # Optional NLP pass — catches clinician names and locations not in vault | |
| for name in _DETECTOR.detect_persons(t): | |
| if name and len(name) > 2 and name not in self.forward: | |
| t = t.replace(name, self._surrogate("PERSON", name)) | |
| return DeidResult(t, dict(self.forward), dict(self.reverse), self._residual_known(t)) | |
| def _residual_known(self, text: str) -> list: | |
| # Use word-boundary match, same as the deidentify vault pass, so that | |
| # short names like "Dia" don't false-positive on "Diastolic"/"Diabetes". | |
| return [ | |
| v | |
| for vals in self.known.values() | |
| for v in vals | |
| if v and re.search(r"\b" + re.escape(v) + r"\b", text) | |
| ] | |
| def residual_identifiers(self, text: str) -> list[str]: | |
| """Comprehensive leak check — used for the trust metric. | |
| Covers: | |
| - vault names that survived de-id | |
| - regex patterns (NHS, email, GMC, NMC) | |
| - orphaned surrogate tokens (invented by the model, no reverse mapping) | |
| - NLP-detected persons/locations (when Presidio is installed) | |
| """ | |
| hits: list[str] = list(self._residual_known(text)) | |
| for pat in (NHS_CONTEXT, NHS_NUMBER, EMAIL, GMC, NMC): | |
| if pat.search(text): | |
| hits.append(f"pattern:{pat.pattern[:40]}") | |
| # Orphaned surrogates: a [LABEL_n] in the text with no reverse mapping | |
| # means the model invented a token we cannot restore — it's a leak. | |
| for m in _SURROGATE_PAT.finditer(text): | |
| tok = m.group(0) | |
| if tok not in self.reverse: | |
| hits.append(f"unmapped_token:{tok}") | |
| # NLP pass (no-op when Presidio not installed) | |
| for name in _DETECTOR.detect_persons(text): | |
| if name: | |
| hits.append(f"PERSON:{name[:30]}") | |
| return list(dict.fromkeys(hits)) # deduplicate, preserve order | |
| def assert_clean(self, text: str) -> None: | |
| """Hard guarantee: raises if any known identifier or regex pattern survives.""" | |
| hits = list(self._residual_known(text)) | |
| for pat in (NHS_CONTEXT, NHS_NUMBER, EMAIL, GMC, NMC): | |
| if pat.search(text): | |
| hits.append(pat.pattern[:40]) | |
| if hits: | |
| raise ValueError(f"NoteGuard guarantee failed: identifiers reached the model boundary: {hits}") | |
| def reidentify(self, text: str) -> str: | |
| for tok, original in sorted(self.reverse.items(), key=lambda kv: len(kv[0]), reverse=True): | |
| text = text.replace(tok, original) | |
| return text | |
| if __name__ == "__main__": | |
| known = {"PERSON": ["Margaret Okafor"], "NHS": ["485 777 3456"]} | |
| note = ( | |
| "02 Jan, Ward RJ1. Pt Margaret Okafor (NHS 485 777 3456, DOB 14/03/1934) " | |
| "admitted post-fall. Nurse Chukwuebuka reviewed. " | |
| "Contact a.okafor@example.com, 020 7946 0991. " | |
| "GMC No. 7654321. NMC number: 18D6896L." | |
| ) | |
| print("INPUT:\n", note, "\n") | |
| ng = NoteGuard(known=known) | |
| res = ng.deidentify(note) | |
| print("DE-IDENTIFIED (what the model sees):\n", res.clean_text, "\n") | |
| print("Residual identifiers:", res.residual) | |
| ng.assert_clean(res.clean_text) | |
| print("assert_clean: OK\n") | |
| restored = NoteGuard(reverse=res.reverse).reidentify(res.clean_text) | |
| print("RE-IDENTIFIED (clinician view only):\n", restored) | |