import logging
import re
from typing import Any, Dict, List

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
|
|
# Module-level logger named after this module; PIIDetector writes its status and error messages to it.
| logger = logging.getLogger(__name__) |
|
|
|
|
class PIIDetector:
    """Service to detect and remove Personal Identifiable Information from medical notes.

    Detection is delegated to ``self.analyzer`` and replacement to
    ``self.anonymizer``; both are created in ``__init__``. Only the entity
    types listed in ``self.entities_to_detect`` are requested.
    """

    # Entity types requested from the analyzer. Each instance receives its own
    # mutable copy as ``entities_to_detect``.
    _DEFAULT_ENTITIES = (
        "PERSON",
        "EMAIL_ADDRESS",
        "PHONE_NUMBER",
        "US_SSN",
        "CREDIT_CARD",
        "US_DRIVER_LICENSE",
        "LOCATION",
        "DATE_TIME",
        "US_PASSPORT",
        "MEDICAL_LICENSE",
        "IP_ADDRESS",
        "URL",
    )

    # Language code passed to every analyzer call.
    _LANGUAGE = "en"

    # One compiled pattern per family of field labels, applied in this order.
    # Each matches "<label>:<optional whitespace><TAG>" where TAG is an
    # angle-bracketed run of letters/underscores. IGNORECASE covers the whole
    # pattern, so both the label and the tag match in any letter case.
    _LABELLED_PLACEHOLDER_PATTERNS = tuple(
        re.compile(rf"({labels}):\s*<[A-Z_]+>", re.IGNORECASE)
        for labels in (
            r"Patient|Pt|Patient Name",
            r"DOB|Date of Birth|Birth Date",
            r"Address|Addr|Home Address",
            r"Phone|Tel|Telephone|Cell|Mobile",
            r"MRN|Medical Record Number|Record #",
        )
    )

    def __init__(self):
        """Initialize PII detection engines.

        Raises:
            Exception: Whatever the engine constructors raise; the failure is
                logged with its traceback and then re-raised unchanged.
        """
        try:
            self.analyzer = AnalyzerEngine()
            self.anonymizer = AnonymizerEngine()
        except Exception as e:
            logger.exception("❌ Failed to initialize PII Detector: %s", e)
            raise

        self.entities_to_detect = list(self._DEFAULT_ENTITIES)
        logger.info("✅ PII Detector initialized successfully")

    def _analyze(self, text: str) -> List[Any]:
        """Run the analyzer over ``text`` for the configured entity types."""
        return self.analyzer.analyze(
            text=text,
            entities=self.entities_to_detect,
            language=self._LANGUAGE,
        )

    @staticmethod
    def _describe(result: Any) -> Dict[str, Any]:
        """Convert one analyzer result into a plain dict (type, span, score)."""
        return {
            "entity_type": result.entity_type,
            "start": result.start,
            "end": result.end,
            "score": result.score,
        }

    @staticmethod
    def _build_report(
        sanitized_text: str,
        pii_detected: List[Dict[str, Any]],
        was_pii_removed: bool,
    ) -> Dict[str, Any]:
        """Assemble the dictionary shape returned by ``remove_pii``."""
        return {
            "sanitized_text": sanitized_text,
            "pii_detected": pii_detected,
            "pii_count": len(pii_detected),
            "was_pii_removed": was_pii_removed,
        }

    def detect_pii(self, text: str) -> List[Dict[str, Any]]:
        """
        Detect PII entities in text

        Args:
            text: Input text to analyze

        Returns:
            List of detected PII entities, each a dict with ``entity_type``,
            ``start``, ``end``, ``score`` and ``text`` (the matched substring
            of the input). An empty list is returned both when nothing is
            found and when analysis fails; failures are logged.
        """
        try:
            pii_findings = [
                {**self._describe(result), "text": text[result.start:result.end]}
                for result in self._analyze(text)
            ]
        except Exception as e:
            # Best-effort API: callers get an empty list instead of an exception.
            logger.exception("❌ Error detecting PII: %s", e)
            return []

        logger.info("🔍 Detected %d PII entities", len(pii_findings))
        return pii_findings

    def remove_pii(self, text: str) -> Dict[str, Any]:
        """
        Remove PII from text while preserving medical information

        Args:
            text: Input text containing potential PII

        Returns:
            Dictionary with keys ``sanitized_text``, ``pii_detected`` (list of
            dicts with ``entity_type``, ``start``, ``end``, ``score``),
            ``pii_count`` and ``was_pii_removed``.

            WARNING: if analysis or anonymization raises, the ORIGINAL,
            unsanitized ``text`` is returned under ``sanitized_text`` together
            with an extra ``error`` key. Callers must check for ``error``
            before treating the text as safe.
        """
        try:
            analyzer_results = self._analyze(text)

            if not analyzer_results:
                logger.info("✅ No PII detected in text")
                return self._build_report(text, [], False)

            anonymized_result = self.anonymizer.anonymize(
                text=text,
                analyzer_results=analyzer_results,
            )
            # Second pass: normalise placeholders that follow known field labels.
            sanitized_text = self._clean_medical_patterns(anonymized_result.text)
            pii_detected = [self._describe(result) for result in analyzer_results]
        except Exception as e:
            logger.exception("❌ Error removing PII: %s", e)
            # Fail-open by design of the existing contract (see docstring).
            report = self._build_report(text, [], False)
            report["error"] = str(e)
            return report

        logger.info("✅ Removed %d PII entities from text", len(pii_detected))
        return self._build_report(sanitized_text, pii_detected, True)

    def _clean_medical_patterns(self, text: str) -> str:
        """
        Rewrite labelled placeholders such as ``Patient: <PERSON>`` to
        ``Patient: [REDACTED]``.

        Only an angle-bracketed tag that directly follows one of the known
        field labels and a colon is rewritten; the label keeps its original
        spelling and any whitespace after the colon collapses to one space.
        The tags are presumably the anonymizer's ``<ENTITY_TYPE>``
        placeholders - confirm against the anonymizer configuration.

        Args:
            text: Text to clean

        Returns:
            Cleaned text
        """
        for pattern in self._LABELLED_PLACEHOLDER_PATTERNS:
            text = pattern.sub(r"\1: [REDACTED]", text)
        return text
|
|
|
|
| |
# Shared module-level instance. It is constructed at import time, so importing
# this module runs PIIDetector.__init__, which creates both engines and
# re-raises any initialization failure.
| pii_detector = PIIDetector() |