"""Detection layer.

NoteGuard does not reinvent detection — Presidio is the engine. Our job is to
(1) compose Presidio's NER with our transparent rule layer, (2) keep everything
behind one `Detector` interface so the pipeline and eval are engine-agnostic, and
(3) make detection degrade gracefully to pure-Python rules when spaCy/Presidio
are unavailable.
"""
| from __future__ import annotations |
|
|
| from typing import Protocol |
|
|
| from .recognisers import Span, find_rule_spans |
|
|
|
|
class Detector(Protocol):
    """Structural interface implemented by every detection engine.

    Any object exposing a compatible ``detect`` method satisfies this protocol,
    which lets callers stay independent of the concrete engine.
    """

    def detect(self, text: str) -> list[Span]:
        """Return the spans detected in *text*."""
        ...
|
|
|
|
class RuleDetector:
    """Baseline detector that runs only the pure-Python rule layer.

    It needs no external dependencies, so it doubles as the fallback engine.
    """

    name = "rules"

    def detect(self, text: str) -> list[Span]:
        """Return the spans that ``find_rule_spans`` reports for *text*."""
        rule_spans = find_rule_spans(text)
        return rule_spans
|
|
|
|
class PresidioDetector:
    """Detector backed by Presidio's AnalyzerEngine, unioned with the rule layer.

    Presidio contributes spaCy NER plus its recognisers. The spans from
    ``find_rule_spans`` are always added to the result because the NHS-number
    rule is checksum-validated and the rule output stays auditable.
    """

    name = "presidio+rules"

    # Presidio entity types that ``detect`` passes through; any other entity
    # type reported by the analyzer is discarded.
    KEEP = {
        "PERSON",
        "EMAIL_ADDRESS",
        "PHONE_NUMBER",
        "LOCATION",
        "UK_NHS",
        "UK_NINO",
        "UK_PASSPORT",
        "UK_VEHICLE_REGISTRATION",
        "IP_ADDRESS",
        "URL",
    }

    def __init__(
        self,
        spacy_model: str = "en_core_web_lg",
        score_threshold: float = 0.6,
        review_threshold: float = 0.35,
    ):
        """Build the Presidio engine and register the extra UK recognisers.

        Args:
            spacy_model: spaCy model loaded for English. ``en_core_web_lg`` is
                the default (reported as 100% name recall in the project's
                benchmarks); ``en_core_web_sm`` is the faster alternative.
            score_threshold: results scoring at or above this value are
                auto-confirmed and always redacted.
            review_threshold: results scoring in
                ``[review_threshold, score_threshold)`` are emitted with
                ``needs_review=True``: still redacted for safety, but surfaced
                for human confirmation in the audit/UI (human-in-the-loop
                queue). Results scoring below both thresholds are dropped.
        """
        # Imported lazily so that importing this module never requires
        # Presidio; a missing install only fails when this class is built.
        from presidio_analyzer import AnalyzerEngine
        from presidio_analyzer.nlp_engine import NlpEngineProvider

        nlp_configuration = {
            "nlp_engine_name": "spacy",
            "models": [{"lang_code": "en", "model_name": spacy_model}],
        }
        nlp_provider = NlpEngineProvider(nlp_configuration=nlp_configuration)
        self.engine = AnalyzerEngine(nlp_engine=nlp_provider.create_engine())
        self.score_threshold = score_threshold
        self.review_threshold = review_threshold
        self._register_uk_recognisers()

    def _register_uk_recognisers(self) -> None:
        """Register UK entity recognisers that Presidio documents but does not ship.

        Adds pattern recognisers for ``UK_NINO``, ``UK_PASSPORT`` and
        ``UK_VEHICLE_REGISTRATION`` to the engine's registry.
        """
        from presidio_analyzer import Pattern, PatternRecognizer

        # One row per recogniser:
        # (entity, pattern label, regex, base score, context words, name).
        # With the default thresholds the NINO (0.5) and vehicle (0.4) base
        # scores land in the needs-review band, and the passport base score
        # (0.05) is below it, so a bare nine-digit number is dropped unless
        # the analyzer raises its score.
        # NOTE(review): presumably that increase comes from Presidio's context
        # boost, and its default context enhancer appears to compare context
        # words against single tokens, so multi-word entries such as
        # "national insurance" may never match — confirm against Presidio docs.
        definitions = (
            (
                "UK_NINO",
                "UK NINO",
                r"\b[A-Za-z]{2}\s?\d{2}\s?\d{2}\s?\d{2}\s?[A-Da-d]\b",
                0.5,
                ["national insurance", "ni number", "nino"],
                "UkNinoRecognizer",
            ),
            (
                "UK_PASSPORT",
                "UK passport",
                r"\b\d{9}\b",
                0.05,
                ["passport"],
                "UkPassportRecognizer",
            ),
            (
                "UK_VEHICLE_REGISTRATION",
                "UK vehicle registration",
                r"\b[A-Za-z]{2}\d{2}\s?[A-Za-z]{3}\b",
                0.4,
                ["registration", "vehicle", "number plate", "car"],
                "UkVehicleRecognizer",
            ),
        )
        # Build every recogniser first, then register them in order.
        recognisers = [
            PatternRecognizer(
                supported_entity=entity,
                patterns=[Pattern(label, regex, base_score)],
                context=context_words,
                name=recogniser_name,
            )
            for entity, label, regex, base_score, context_words, recogniser_name
            in definitions
        ]
        for recogniser in recognisers:
            self.engine.registry.add_recognizer(recogniser)

    def detect(self, text: str) -> list[Span]:
        """Return merged, disjoint spans from Presidio and the rule layer.

        Presidio results whose entity type is not in ``KEEP`` are ignored. The
        rest are kept as confirmed spans, kept with ``needs_review=True``, or
        dropped, depending on how their score compares with the two
        thresholds. Rule spans are then appended and the union goes through
        ``_merge``.
        """
        collected: list[Span] = []
        for hit in self.engine.analyze(text=text, language="en"):
            if hit.entity_type not in self.KEEP:
                continue
            if hit.score >= self.score_threshold:
                extra = {}
            elif hit.score >= self.review_threshold:
                extra = {"needs_review": True}
            else:
                # Scores below both thresholds are discarded.
                continue
            collected.append(
                Span(
                    hit.start,
                    hit.end,
                    hit.entity_type,
                    text[hit.start:hit.end],
                    hit.score,
                    **extra,
                )
            )
        collected.extend(find_rule_spans(text))
        return _merge(collected)
|
|
|
|
| |
| |
# Entity types that win overlap resolution in ``_merge`` regardless of span
# length or score.
_PRECISE = {
    "UK_NHS",
    "DATE_TIME",
    "EMAIL_ADDRESS",
    "PHONE_NUMBER",
    "UK_POSTCODE",
    "UK_NINO",
    "UK_VEHICLE_REGISTRATION",
    "UK_PASSPORT",
    "GMC",
    "NMC",
    "NHS_ODS",
    "RECORD_ID",
}


def _merge(spans: list[Span]) -> list[Span]:
    """Resolve overlaps and return disjoint spans ordered by start offset.

    Spans are considered from highest to lowest priority, where priority is
    (entity type in ``_PRECISE``, span length, score). A span is kept only if
    it does not overlap one that was already kept. Disjoint output guarantees
    the transform can't corrupt text.
    """
    def priority(span: Span):
        precise_flag = 1 if span.entity_type in _PRECISE else 0
        return (precise_flag, span.end - span.start, span.score)

    chosen: list[Span] = []
    for candidate in sorted(spans, key=priority, reverse=True):
        # Half-open intervals: spans that merely touch do not overlap.
        is_disjoint = all(
            candidate.start >= taken.end or taken.start >= candidate.end
            for taken in chosen
        )
        if is_disjoint:
            chosen.append(candidate)
    return sorted(chosen, key=lambda span: span.start)
|
|
|
|
def build_detector(use_presidio: bool = True, spacy_model: str = "en_core_web_lg") -> Detector:
    """Return the best available detector, degrading to the rule layer.

    When ``use_presidio`` is true, a ``PresidioDetector`` is attempted with the
    given ``spacy_model``. Any exception raised while building it is reported
    with ``print`` and a ``RuleDetector`` is returned instead. When
    ``use_presidio`` is false, the ``RuleDetector`` is returned directly.

    spacy_model defaults to en_core_web_lg (100% name recall in benchmarks).
    Pass en_core_web_sm for faster startup when recall trade-off is acceptable.
    """
    if not use_presidio:
        return RuleDetector()
    try:
        return PresidioDetector(spacy_model=spacy_model)
    except Exception as exc:
        # Deliberately broad: any construction failure falls back to rules.
        print(f"[noteguard] Presidio unavailable ({exc}); falling back to rules.")
    return RuleDetector()
|
|