|
|
import logging
import re
from typing import Any, Dict, List

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
|
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
class PIIDetector:
    """Service to detect and remove Personal Identifiable Information from medical notes.

    Detection is delegated to a Presidio ``AnalyzerEngine`` and replacement to
    a Presidio ``AnonymizerEngine``; both are created once in ``__init__``.
    """

    # One compiled pattern per family of field labels. Each matches
    # "<label>:" followed by optional whitespace and an angle-bracketed tag
    # (e.g. "Patient: <PERSON>"), case-insensitively. Compiled once here
    # instead of on every call.
    _LABELLED_PLACEHOLDER_PATTERNS = tuple(
        re.compile(rf'({labels}):\s*<[A-Z_]+>', re.IGNORECASE)
        for labels in (
            r'Patient|Pt|Patient Name',
            r'DOB|Date of Birth|Birth Date',
            r'Address|Addr|Home Address',
            r'Phone|Tel|Telephone|Cell|Mobile',
            r'MRN|Medical Record Number|Record #',
        )
    )

    def __init__(self):
        """Initialize PII detection engines.

        Raises:
            Exception: Any error raised while constructing the engines is
                logged and re-raised unchanged.
        """
        try:
            self.analyzer = AnalyzerEngine()
            self.anonymizer = AnonymizerEngine()

            # Entity types requested from the analyzer on every call.
            self.entities_to_detect = [
                "PERSON",
                "EMAIL_ADDRESS",
                "PHONE_NUMBER",
                "US_SSN",
                "CREDIT_CARD",
                "US_DRIVER_LICENSE",
                "LOCATION",
                "DATE_TIME",
                "US_PASSPORT",
                "MEDICAL_LICENSE",
                "IP_ADDRESS",
                "URL",
            ]

            logger.info("✅ PII Detector initialized successfully")
        except Exception as e:
            logger.error("❌ Failed to initialize PII Detector: %s", e)
            raise

    def _analyze(self, text: str) -> list:
        """Run the analyzer over ``text`` for the configured entity types (English)."""
        return self.analyzer.analyze(
            text=text,
            entities=self.entities_to_detect,
            language='en',
        )

    @staticmethod
    def _to_finding(result) -> Dict[str, Any]:
        """Convert one analyzer result into a plain dict (type, span, score)."""
        return {
            "entity_type": result.entity_type,
            "start": result.start,
            "end": result.end,
            "score": result.score,
        }

    def detect_pii(self, text: str) -> List[Dict]:
        """
        Detect PII entities in text.

        Args:
            text: Input text to analyze

        Returns:
            One dict per detected entity with the keys ``entity_type``,
            ``start``, ``end``, ``score`` and ``text`` (the matched slice of
            the input). Returns an empty list if analysis raises; the error
            is logged rather than propagated.
        """
        try:
            pii_findings = [
                {**self._to_finding(result), "text": text[result.start:result.end]}
                for result in self._analyze(text)
            ]
            logger.info("🔍 Detected %d PII entities", len(pii_findings))
            return pii_findings
        except Exception as e:
            # Best-effort API: callers get an empty list, the traceback goes to the log.
            logger.exception("❌ Error detecting PII: %s", e)
            return []

    def remove_pii(self, text: str) -> Dict[str, Any]:
        """
        Remove PII from text while preserving medical information.

        Args:
            text: Input text containing potential PII

        Returns:
            Dictionary with the keys ``sanitized_text``, ``pii_detected``
            (list of dicts with ``entity_type``, ``start``, ``end``,
            ``score``), ``pii_count`` and ``was_pii_removed``.

            If anything raises, the error is logged and the dictionary
            additionally carries an ``error`` message; in that case
            ``sanitized_text`` is the ORIGINAL, unsanitized input and
            ``was_pii_removed`` is False.
        """
        try:
            analyzer_results = self._analyze(text)

            if not analyzer_results:
                logger.info("✅ No PII detected in text")
                return {
                    "sanitized_text": text,
                    "pii_detected": [],
                    "pii_count": 0,
                    "was_pii_removed": False,
                }

            anonymized_result = self.anonymizer.anonymize(
                text=text,
                analyzer_results=analyzer_results,
            )

            # Post-process labelled fields ("Patient: <...>") in the anonymized text.
            sanitized_text = self._clean_medical_patterns(anonymized_result.text)

            pii_detected = [self._to_finding(result) for result in analyzer_results]
            logger.info("✅ Removed %d PII entities from text", len(pii_detected))

            return {
                "sanitized_text": sanitized_text,
                "pii_detected": pii_detected,
                "pii_count": len(pii_detected),
                "was_pii_removed": True,
            }
        except Exception as e:
            logger.exception("❌ Error removing PII: %s", e)

            # NOTE(review): this fails open - the raw input is returned under
            # "sanitized_text". Kept for backward compatibility; callers must
            # check "error" / "was_pii_removed" before treating the text as safe.
            return {
                "sanitized_text": text,
                "pii_detected": [],
                "pii_count": 0,
                "was_pii_removed": False,
                "error": str(e),
            }

    def _clean_medical_patterns(self, text: str) -> str:
        """
        Normalize placeholders that follow common medical-note field labels.

        Wherever a known label (patient name, date of birth, address, phone,
        medical record number and their variants) is followed by a colon and
        an angle-bracketed tag, the tag is replaced so the field reads
        ``<label>: [REDACTED]``. Matching is case-insensitive and the label is
        kept as written in the input.

        Args:
            text: Text to clean

        Returns:
            Cleaned text
        """
        # Applied in a fixed order, one label family at a time.
        for pattern in self._LABELLED_PLACEHOLDER_PATTERNS:
            text = pattern.sub(r'\1: [REDACTED]', text)
        return text
|
|
|
|
|
|
|
|
|
|
|
# Shared module-level instance. Constructing it here builds the analyzer and
# anonymizer engines at import time; an initialization failure is re-raised
# by PIIDetector.__init__ and therefore propagates to the importing module.
pii_detector = PIIDetector()