Spaces:
Running
Running
"""
NER-Based PHI Detection using Microsoft Presidio

Extends the existing regex-based PHI detection in compliance.py with
ML-powered Named Entity Recognition for detecting:
- Patient names (not detectable by regex alone)
- Addresses and geographic locations
- Ages over 89 (HIPAA Safe Harbor)
- Contextual identifiers that regex patterns miss

Architecture:
    Presidio (primary, ML-based) -> regex fallback (existing patterns)

The detector is loaded lazily to avoid startup overhead when not needed.
"""
| from __future__ import annotations | |
| import logging | |
| from dataclasses import dataclass, field | |
| from typing import List, Optional | |
| from app.config import settings | |
| logger = logging.getLogger(__name__) | |
# Entity labels requested from the Presidio analyzer; the trailing comment on
# each entry names the HIPAA Safe Harbor identifier category it stands for.
# NOTE(review): confirm the configured Presidio registry really has a
# recognizer for every label listed here (e.g. "AGE") - TODO confirm.
_HIPAA_ENTITY_TYPES = [
    "PERSON",             # Names
    "PHONE_NUMBER",       # Phone numbers
    "EMAIL_ADDRESS",      # Email addresses
    "US_SSN",             # Social Security Numbers
    "LOCATION",           # Addresses, geographic locations
    "DATE_TIME",          # Dates (DOB, admission, discharge)
    "IP_ADDRESS",         # IP addresses
    "US_DRIVER_LICENSE",  # Driver's license
    "MEDICAL_LICENSE",    # Medical license numbers
    "URL",                # URLs that may contain PHI
    "NRP",                # Nationality/religious/political group
    "AGE",                # Ages (>89 are identifiers under Safe Harbor)
]

# Score threshold handed to the analyzer: results scoring lower are dropped.
_MIN_CONFIDENCE = 0.4
@dataclass
class PHIDetection:
    """A single PHI detection result.

    Declared as a dataclass so it can be built with keyword arguments, which
    is how the scanner in this module constructs it.

    Attributes:
        entity_type: Label of the detected entity (a Presidio entity type or
            a regex pattern type).
        start: Offset in the scanned text where the match begins.
        end: Offset where the match ends; ``end - start`` is its length.
        score: Confidence of the detection (regex matches are recorded as 1.0).
        text_snippet: Masked excerpt intended for audit logging.
    """

    entity_type: str
    start: int
    end: int
    score: float
    text_snippet: str  # First 4 chars + *** for audit logging
@dataclass
class PHIScanResult:
    """Result of a comprehensive PHI scan.

    Declared as a dataclass so the ``field(default_factory=list)`` defaults
    produce a fresh list per instance and the result can be built with
    keyword arguments.

    Attributes:
        detections: Every individual detection found in the text.
        is_clean: True when no PHI was detected.
        detection_count: Number of detections.
        entity_types_found: Distinct entity types among the detections.
        method: Which detector produced the result.
    """

    detections: list[PHIDetection] = field(default_factory=list)
    is_clean: bool = True
    detection_count: int = 0
    entity_types_found: list[str] = field(default_factory=list)
    method: str = "regex"  # "presidio", "regex", or "hybrid"

    # NOTE(review): this is a plain method in the visible source; if callers
    # read ``result.phi_count`` as an attribute it needs @property - confirm.
    def phi_count(self) -> int:
        """Return the number of detections (alias of ``detection_count``)."""
        return self.detection_count
class PHIDetector:
    """PHI detection service combining Presidio NER with regex fallback.

    Presidio is imported and constructed lazily on first use. When it is not
    installed or fails to initialize, scanning and redaction rely on the
    regex helpers in ``app.compliance`` alone.

    Usage:
        detector = get_phi_detector()
        result = detector.scan("Patient John Smith, DOB 01/15/1985")
        redacted = detector.redact("Patient John Smith, DOB 01/15/1985")
    """

    def __init__(self):
        self._analyzer = None
        self._anonymizer = None
        self._presidio_available = False
        self._initialized = False

    def _init_presidio(self) -> bool:
        """Lazily initialize Presidio analyzer and anonymizer.

        Initialization is attempted only once; later calls return the cached
        outcome.

        Returns:
            True when both Presidio engines were created, False otherwise.
        """
        if self._initialized:
            return self._presidio_available

        # Mark as attempted up front so a failure is not retried on every call.
        self._initialized = True
        try:
            from presidio_analyzer import AnalyzerEngine
            from presidio_anonymizer import AnonymizerEngine

            self._analyzer = AnalyzerEngine()
            self._anonymizer = AnonymizerEngine()
            self._presidio_available = True
            logger.info("Presidio PHI detector initialized successfully")
        except ImportError:
            logger.warning(
                "presidio-analyzer not installed. Falling back to regex-only PHI detection. "
                "Install with: pip install presidio-analyzer presidio-anonymizer"
            )
            self._presidio_available = False
        except Exception as e:
            # Best-effort: any other start-up failure degrades to regex-only.
            logger.warning("Presidio initialization failed: %s. Using regex fallback.", e)
            self._presidio_available = False
        return self._presidio_available

    @staticmethod
    def _mask_snippet(text: str, start: int, end: int) -> str:
        """Build an audit-safe snippet for the span ``text[start:end]``.

        Returns the first 4 characters followed by ``***`` when the span is
        longer than 4 characters, otherwise just ``***``.
        """
        if end - start > 4:
            return text[start:start + 4] + "***"
        return "***"

    def _analyze(self, text: str):
        """Run the Presidio analyzer with this module's entity list and threshold."""
        return self._analyzer.analyze(
            text=text,
            entities=_HIPAA_ENTITY_TYPES,
            language="en",
            score_threshold=_MIN_CONFIDENCE,
        )

    def scan(self, text: str) -> PHIScanResult:
        """Scan text for PHI using Presidio (primary) + regex (fallback).

        Args:
            text: Text to scan for PHI.

        Returns:
            PHIScanResult with all detections. ``method`` is "presidio" when
            only Presidio contributed, "hybrid" when Presidio ran and the
            regex pass also matched, and "regex" otherwise.
        """
        if not text:
            return PHIScanResult()

        detections: list[PHIDetection] = []
        method = "regex"

        # Try Presidio first
        if self._init_presidio() and self._analyzer is not None:
            method = "presidio"
            try:
                for r in self._analyze(text):
                    detections.append(PHIDetection(
                        entity_type=r.entity_type,
                        start=r.start,
                        end=r.end,
                        score=r.score,
                        text_snippet=self._mask_snippet(text, r.start, r.end),
                    ))
            except Exception as e:
                logger.warning("Presidio scan failed, falling back to regex: %s", e)
                method = "regex"

        # Always run regex as fallback/supplement. Imported here rather than
        # at module level, as in the original code.
        from app.compliance import detect_phi as regex_detect_phi

        regex_detections = regex_detect_phi(text)
        if regex_detections:
            if method == "presidio":
                method = "hybrid"

            # Only spans already reported by Presidio suppress a regex match.
            presidio_ranges = {(d.start, d.end) for d in detections}
            for rd in regex_detections:
                rd_start, rd_end = rd["start"], rd["end"]
                overlaps = any(
                    rd_start < p_end and rd_end > p_start
                    for p_start, p_end in presidio_ranges
                )
                if overlaps:
                    continue
                detections.append(PHIDetection(
                    entity_type=rd["pattern_type"],
                    start=rd_start,
                    end=rd_end,
                    score=1.0,  # Regex matches are binary
                    # Mask from the source span instead of storing the raw
                    # regex match, so audit snippets never carry full PHI.
                    text_snippet=self._mask_snippet(text, rd_start, rd_end),
                ))

        # Sorted so the reported types are stable across runs.
        entity_types = sorted({d.entity_type for d in detections})
        return PHIScanResult(
            detections=detections,
            is_clean=not detections,
            detection_count=len(detections),
            entity_types_found=entity_types,
            method=method,
        )

    def redact(self, text: str) -> str:
        """Redact all detected PHI from text.

        Uses Presidio anonymizer if available, followed by a regex pass;
        otherwise falls back to the existing regex-based redaction in
        compliance.py.

        Args:
            text: Text containing potential PHI.

        Returns:
            Text with PHI replaced by redaction markers. Empty or falsy input
            is returned unchanged.
        """
        if not text:
            return text

        # Try Presidio anonymizer first
        if self._init_presidio() and self._analyzer is not None and self._anonymizer is not None:
            try:
                results = self._analyze(text)
                if results:
                    anonymized = self._anonymizer.anonymize(text=text, analyzer_results=results)
                    redacted = anonymized.text
                else:
                    redacted = text

                # Second pass with regex for anything Presidio missed
                from app.compliance import redact_phi_text

                return redact_phi_text(redacted)
            except Exception as e:
                logger.warning("Presidio redaction failed, using regex: %s", e)

        # Regex-only fallback
        from app.compliance import redact_phi_text

        return redact_phi_text(text)

    def redact_for_storage(self, text: str) -> tuple[str, PHIScanResult]:
        """Redact PHI and return both the clean text and a scan report.

        Suitable for use before storing text in databases or vector stores.
        Performs double-pass redaction for safety: the redacted text is
        re-scanned and, if anything is still detected, redacted once more.

        Args:
            text: Text to redact.

        Returns:
            Tuple of (redacted_text, scan_result), where ``scan_result`` is
            the scan of the returned redacted text, not of the input.
        """
        redacted = self.redact(text)

        # Verify the redacted text is clean
        verification = self.scan(redacted)
        if not verification.is_clean:
            # Second pass
            redacted = self.redact(redacted)
            verification = self.scan(redacted)
            if not verification.is_clean:
                # Surface the residue instead of returning it silently; only
                # counts and entity labels are logged, never matched text.
                logger.warning(
                    "PHI scan still reports %d detection(s) after double-pass redaction: %s",
                    verification.detection_count,
                    verification.entity_types_found,
                )
        return redacted, verification
| # --------------------------------------------------------------------------- | |
| # Singleton accessor | |
| # --------------------------------------------------------------------------- | |
_detector: Optional[PHIDetector] = None


def get_phi_detector() -> PHIDetector:
    """Return the shared PHIDetector, building it on the first call.

    The instance is kept in the module-level ``_detector`` variable, so every
    later call returns the same object.
    """
    global _detector
    if _detector is not None:
        return _detector
    _detector = PHIDetector()
    return _detector