Distopia22's picture
Add automatic PII removal during file extraction
fd20bd2
raw
history blame
6.48 kB
import logging
import re
from typing import Any, Dict, List

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class PIIDetector:
    """Service to detect and remove Personal Identifiable Information from medical notes.

    Detection is delegated to a Presidio ``AnalyzerEngine`` and replacement to
    a Presidio ``AnonymizerEngine``. After anonymization, a regex pass rewrites
    ``<PLACEHOLDER>`` tokens that directly follow well-known medical-note
    labels (``Patient:``, ``DOB:``, ``MRN:`` ...) to ``[REDACTED]``.
    """

    # Labels after which a "<PLACEHOLDER>" token is rewritten to "[REDACTED]".
    _REDACTION_LABELS = (
        # Patient name
        "Patient", "Pt", "Patient Name",
        # Date of birth
        "DOB", "Date of Birth", "Birth Date",
        # Address
        "Address", "Addr", "Home Address",
        # Phone
        "Phone", "Tel", "Telephone", "Cell", "Mobile",
        # Medical record number
        "MRN", "Medical Record Number", "Record #",
    )

    # "<label>:<optional whitespace><PLACEHOLDER>". Compiled once for all calls.
    # NOTE: IGNORECASE applies to the whole pattern, so the placeholder
    # character class also accepts lowercase letters (e.g. "<person>").
    _LABELLED_PLACEHOLDER = re.compile(
        r"(" + "|".join(map(re.escape, _REDACTION_LABELS)) + r"):\s*<[A-Z_]+>",
        re.IGNORECASE,
    )

    def __init__(self):
        """Initialize PII detection engines.

        Raises:
            Exception: Any error raised while constructing the Presidio
                engines is logged and re-raised unchanged.
        """
        # Entities to detect (common in medical notes)
        self.entities_to_detect = [
            "PERSON",             # Names
            "EMAIL_ADDRESS",      # Email
            "PHONE_NUMBER",       # Phone numbers
            "US_SSN",             # Social Security Number
            "CREDIT_CARD",        # Credit card numbers
            "US_DRIVER_LICENSE",  # Driver's license
            "LOCATION",           # Addresses, cities
            "DATE_TIME",          # Birth dates, appointment dates
            "US_PASSPORT",        # Passport numbers
            "MEDICAL_LICENSE",    # Medical license numbers
            "IP_ADDRESS",         # IP addresses
            "URL",                # URLs
        ]
        try:
            self.analyzer = AnalyzerEngine()
            self.anonymizer = AnonymizerEngine()
        except Exception as e:
            logger.error("❌ Failed to initialize PII Detector: %s", e)
            raise
        logger.info("✅ PII Detector initialized successfully")

    def _analyze(self, text: str) -> list:
        """Run the analyzer over *text* for the configured entity types (English)."""
        return self.analyzer.analyze(
            text=text,
            entities=self.entities_to_detect,
            language='en',
        )

    @staticmethod
    def _unchanged_report(text: str) -> Dict[str, Any]:
        """Build the report returned when *text* is passed through untouched."""
        return {
            "sanitized_text": text,
            "pii_detected": [],
            "pii_count": 0,
            "was_pii_removed": False,
        }

    def detect_pii(self, text: str) -> List[Dict]:
        """
        Detect PII entities in text.

        Args:
            text: Input text to analyze

        Returns:
            List of detected PII entities, one dict per finding with the keys
            ``entity_type``, ``start``, ``end``, ``score`` and ``text`` (the
            matched substring). An empty list is returned for blank input and
            also when analysis fails (the failure is logged, not raised).
        """
        # Blank input cannot contain entities; skip the analyzer entirely.
        if isinstance(text, str) and not text.strip():
            return []

        try:
            pii_findings = [
                {
                    "entity_type": result.entity_type,
                    "start": result.start,
                    "end": result.end,
                    "score": result.score,
                    "text": text[result.start:result.end],
                }
                for result in self._analyze(text)
            ]
            logger.info("🔍 Detected %d PII entities", len(pii_findings))
            return pii_findings
        except Exception as e:
            # Best effort by design: keep the traceback in the log, return nothing.
            logger.exception("❌ Error detecting PII: %s", e)
            return []

    def remove_pii(self, text: str) -> Dict[str, Any]:
        """
        Remove PII from text while preserving medical information.

        Args:
            text: Input text containing potential PII

        Returns:
            Dictionary with the keys ``sanitized_text``, ``pii_detected``
            (list of dicts with ``entity_type``, ``start``, ``end``,
            ``score``), ``pii_count`` and ``was_pii_removed``. When processing
            fails, an additional ``error`` key holds the exception message and
            ``sanitized_text`` is the ORIGINAL, unsanitized input.
        """
        # Blank input has nothing to redact; skip the engines entirely.
        if isinstance(text, str) and not text.strip():
            return self._unchanged_report(text)

        try:
            # Step 1: Detect PII
            analyzer_results = self._analyze(text)
            if not analyzer_results:
                logger.info("✅ No PII detected in text")
                return self._unchanged_report(text)

            # Step 2: Anonymize detected PII
            anonymized_result = self.anonymizer.anonymize(
                text=text,
                analyzer_results=analyzer_results,
            )

            # Step 3: Normalise placeholders that follow medical-note labels
            sanitized_text = self._clean_medical_patterns(anonymized_result.text)

            # Step 4: Collect PII detection details (offsets refer to the
            # original text, not to the sanitized text)
            pii_detected = [
                {
                    "entity_type": result.entity_type,
                    "start": result.start,
                    "end": result.end,
                    "score": result.score,
                }
                for result in analyzer_results
            ]
            logger.info("✅ Removed %d PII entities from text", len(pii_detected))
            return {
                "sanitized_text": sanitized_text,
                "pii_detected": pii_detected,
                "pii_count": len(pii_detected),
                "was_pii_removed": True,
            }
        except Exception as e:
            logger.exception("❌ Error removing PII: %s", e)
            # NOTE(review): this fails open - the original text, possibly still
            # containing PII, is returned. Callers must check for the "error"
            # key before treating "sanitized_text" as safe.
            report = self._unchanged_report(text)
            report["error"] = str(e)
            return report

    def _clean_medical_patterns(self, text: str) -> str:
        """
        Rewrite anonymizer placeholders that follow known medical-note labels.

        A token of the form ``<LETTERS_OR_UNDERSCORES>`` that directly follows
        one of the labels in ``_REDACTION_LABELS`` and a colon (for example
        ``Patient: <PERSON>``, ``DOB:<DATE_TIME>``, ``MRN: <US_SSN>``) becomes
        ``<label>: [REDACTED]``. Label matching is case-insensitive and the
        label's original spelling is kept. Raw values that were never turned
        into a placeholder (e.g. ``MRN: 12345``) are NOT touched.

        Args:
            text: Text to clean

        Returns:
            Cleaned text
        """
        return self._LABELLED_PLACEHOLDER.sub(r'\1: [REDACTED]', text)
# Singleton instance
# Built eagerly at import time, so importing this module constructs the
# Presidio engines. PIIDetector.__init__ logs and re-raises any construction
# failure, which makes the import itself fail.
pii_detector = PIIDetector()