"""Detection layer.

NoteGuard does not reinvent detection — Presidio is the engine. Our job is to
(1) compose Presidio's NER with our transparent rule layer, (2) keep everything
behind one `Detector` interface so the pipeline and eval are engine-agnostic, and
(3) make detection degrade gracefully to pure-Python rules when spaCy/Presidio
are unavailable.
"""
from __future__ import annotations
from typing import Protocol
from .recognisers import Span, find_rule_spans
class Detector(Protocol):
    """Structural interface implemented by every detection engine.

    Any object exposing a compatible ``detect`` method satisfies this
    protocol, so callers do not need to know which engine they hold.
    """

    def detect(self, text: str) -> list[Span]:
        """Return the spans detected in *text*."""
        ...
class RuleDetector:
    """Baseline detector that relies only on the in-package rule layer."""

    name = "rules"

    def detect(self, text: str) -> list[Span]:
        """Return whatever ``find_rule_spans`` reports for *text*."""
        rule_hits = find_rule_spans(text)
        return rule_hits
class PresidioDetector:
    """Presidio ``AnalyzerEngine`` results unioned with the in-package rule spans.

    The rule layer stays in the union because its NHS-number recogniser is
    checksum-validated and its outputs remain auditable.
    """

    name = "presidio+rules"

    # Presidio entity types that are allowed through; everything else is dropped.
    #
    # ORGANIZATION is left out on purpose: spaCy lg over-tags abbreviations and
    # labels ("NHS", "DOB …", "GMC") as ORG, which creates false positives and
    # swallows precise rule spans (dates, NHS numbers). NHS site names are
    # instead covered by the _SITE_RE LOCATION rule (incl. "… Trust").
    #
    # DATE_TIME is left out too: only date-of-birth dates count as PII (covered
    # by the _DOB_RE rule); visit / encounter / admission dates are clinically
    # useful and stay intact.
    KEEP = {
        "PERSON",
        "EMAIL_ADDRESS",
        "PHONE_NUMBER",
        "LOCATION",
        "UK_NHS",
        "UK_NINO",
        "UK_PASSPORT",
        "UK_VEHICLE_REGISTRATION",
        "IP_ADDRESS",
        "URL",
    }

    def __init__(
        self,
        spacy_model: str = "en_core_web_lg",
        score_threshold: float = 0.6,
        review_threshold: float = 0.35,
    ):
        """Build the Presidio engine and register the extra UK recognisers.

        spacy_model      — spaCy model name handed to Presidio's NLP engine
                           (default en_core_web_lg; en_core_web_sm is the
                           faster alternative).
        score_threshold  — results scoring at or above this are emitted as
                           ordinary (confident) spans.
        review_threshold — results scoring in [review_threshold, score_threshold)
                           are still emitted, but with needs_review=True so they
                           can be surfaced for human confirmation. Anything
                           scoring lower is discarded.
        """
        # Imported lazily so that a missing Presidio/spaCy install only fails
        # when this class is actually constructed.
        from presidio_analyzer import AnalyzerEngine
        from presidio_analyzer.nlp_engine import NlpEngineProvider

        nlp_config = {
            "nlp_engine_name": "spacy",
            "models": [{"lang_code": "en", "model_name": spacy_model}],
        }
        nlp_engine = NlpEngineProvider(nlp_configuration=nlp_config).create_engine()

        self.engine = AnalyzerEngine(nlp_engine=nlp_engine)
        self.score_threshold = score_threshold
        self.review_threshold = review_threshold
        self._register_uk_recognisers()

    def _register_uk_recognisers(self) -> None:
        """Add UK_NINO, UK_PASSPORT and UK_VEHICLE_REGISTRATION pattern recognisers."""
        from presidio_analyzer import Pattern, PatternRecognizer

        # Columns: entity, pattern label, regex, base score, context words,
        # recogniser name.
        specs = [
            (
                "UK_NINO",
                "UK NINO",
                r"\b[A-Za-z]{2}\s?\d{2}\s?\d{2}\s?\d{2}\s?[A-Da-d]\b",
                0.5,
                ["national insurance", "ni number", "nino"],
                "UkNinoRecognizer",
            ),
            (
                "UK_PASSPORT",
                "UK passport",
                # A bare 9-digit run gets a deliberately tiny base score; it is
                # meant to matter only when a passport context word is nearby.
                r"\b\d{9}\b",
                0.05,
                ["passport"],
                "UkPassportRecognizer",
            ),
            (
                "UK_VEHICLE_REGISTRATION",
                "UK vehicle registration",
                r"\b[A-Za-z]{2}\d{2}\s?[A-Za-z]{3}\b",
                0.4,
                ["registration", "vehicle", "number plate", "car"],
                "UkVehicleRecognizer",
            ),
        ]

        # Build every recogniser first, then register them in table order.
        recognisers = [
            PatternRecognizer(
                supported_entity=entity,
                patterns=[Pattern(label, regex, base_score)],
                context=context_words,
                name=recogniser_name,
            )
            for entity, label, regex, base_score, context_words, recogniser_name in specs
        ]
        for recogniser in recognisers:
            self.engine.registry.add_recognizer(recogniser)

    def detect(self, text: str) -> list[Span]:
        """Return merged spans from Presidio (filtered by KEEP and score) plus rules."""
        found: list[Span] = []
        for hit in self.engine.analyze(text=text, language="en"):
            if hit.entity_type not in self.KEEP:
                continue

            confident = hit.score >= self.score_threshold
            if not confident and not hit.score >= self.review_threshold:
                continue  # below both thresholds: discard

            # Only mid-confidence hits carry the review flag; confident hits
            # rely on Span's own default for needs_review.
            flags = {} if confident else {"needs_review": True}
            found.append(
                Span(
                    hit.start,
                    hit.end,
                    hit.entity_type,
                    text[hit.start:hit.end],
                    hit.score,
                    **flags,
                )
            )

        # Rule-based detections are added unconditionally (no score filtering).
        found.extend(find_rule_spans(text))
        return _merge(found)
# Precise pattern/checksum entities win over broad NER spans (PERSON/LOCATION) when
# they overlap — e.g. a rule DATE inside a spurious NER span should survive as the date.
_PRECISE = {
"UK_NHS", "DATE_TIME", "EMAIL_ADDRESS", "PHONE_NUMBER", "UK_POSTCODE",
"UK_NINO", "UK_VEHICLE_REGISTRATION", "UK_PASSPORT", "GMC", "NMC",
"NHS_ODS", "RECORD_ID",
}
def _merge(spans: list[Span]) -> list[Span]:
"""Return disjoint spans. On overlap, prefer precise rule entities, then the longer,
higher-scoring span. Disjoint output guarantees the transform can't corrupt text."""
def rank(s: Span):
return (1 if s.entity_type in _PRECISE else 0, s.end - s.start, s.score)
kept: list[Span] = []
for s in sorted(spans, key=rank, reverse=True):
if any(s.start < k.end and k.start < s.end for k in kept): # overlaps a kept span
continue
kept.append(s)
kept.sort(key=lambda s: s.start)
return kept
def build_detector(use_presidio: bool = True, spacy_model: str = "en_core_web_lg") -> Detector:
    """Return a PresidioDetector when possible, otherwise a RuleDetector.

    use_presidio — when False, skip Presidio entirely and return the rule detector.
    spacy_model  — spaCy model name forwarded to PresidioDetector (default
                   en_core_web_lg; pass en_core_web_sm for faster startup when
                   the recall trade-off is acceptable).

    Any Exception raised while constructing PresidioDetector (not only an
    import failure) triggers the fallback. The reason is reported on stderr so
    the diagnostic never mixes into data the program writes to stdout.
    """
    if use_presidio:
        try:
            return PresidioDetector(spacy_model=spacy_model)
        except Exception as e:  # pragma: no cover - environment dependent
            # Deliberately broad: this is the best-effort boundary that lets
            # detection degrade to the pure-Python rules.
            import sys

            print(
                f"[noteguard] Presidio unavailable ({e}); falling back to rules.",
                file=sys.stderr,
            )
    return RuleDetector()
|