# medical-report-analyzer / phi_deidentifier.py
# snikhilesh's picture
# Deploy backend with monitoring infrastructure - Complete Medical AI Platform
# 13d5ab4 verified
"""
PHI De-identification Pipeline - Phase 2
HIPAA-compliant protected health information removal and anonymization.
This module provides comprehensive PHI detection and removal for medical documents
before AI processing, ensuring HIPAA compliance and data privacy.
Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""
import re
import hashlib
import logging
from typing import Dict, List, Optional, Tuple, Any, Set
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import json
logger = logging.getLogger(__name__)
class PHICategory(Enum):
    """Kinds of protected health information this module can label.

    The value of each member is the short string tag written to audit logs
    and embedded (upper-cased) in hashed replacement tokens.
    """

    # Identity and contact details
    PATIENT_NAME = "patient_name"
    MEDICAL_RECORD_NUMBER = "mrn"
    DATE_OF_BIRTH = "dob"
    SOCIAL_SECURITY_NUMBER = "ssn"
    PHONE_NUMBER = "phone"
    EMAIL_ADDRESS = "email"
    ADDRESS = "address"

    # Dates and ages
    DATE = "date"
    AGE_OVER_89 = "age_89_plus"

    # Biometric and image identifiers
    BIO_METRIC_IDENTIFIER = "biometric"
    PHOTO = "photo"

    # Numbers issued to a person or to their property
    DEVICE_IDENTIFIER = "device_id"
    ACCOUNT_NUMBER = "account"
    CERTIFICATE_NUMBER = "certificate"
    VEHICLE_IDENTIFIER = "vehicle"

    # Network identifiers
    WEB_URL = "web_url"
    IP_ADDRESS = "ip_address"

    # More specific biometric / image kinds
    FINGERPRINT = "fingerprint"
    FULL_FACE_PHOTO = "full_face_photo"
@dataclass
class PHIMatch:
    """One detected PHI occurrence together with the token that replaces it."""

    # Which kind of PHI the pattern that produced this match belongs to.
    category: PHICategory
    # Text recorded for the match as found in the source document.
    original_text: str
    # Token substituted into the de-identified output.
    replacement: str
    # Offsets into the source text delimiting the replaced slice
    # (used as text[start_position:end_position]).
    start_position: int
    end_position: int
    # Detector confidence score.
    confidence: float
    # Excerpt of the source text surrounding the match.
    context: str
@dataclass
class DeidentificationResult:
    """Everything produced by a single de-identification run."""

    # Input exactly as it was received.
    original_text: str
    # Output after PHI replacement and clean-up.
    deidentified_text: str
    # The matches reported for this run.
    phi_matches: List[PHIMatch]
    # Name of the redaction method taken from the configuration.
    anonymization_method: str
    # Hex digest of the original text, kept for the audit trail.
    hash_original: str
    # When the result object was created.
    timestamp: datetime
    # One of: HIPAA, GDPR, NONE
    compliance_level: str
    # Free-form processing statistics gathered during the run.
    audit_log: Dict[str, Any]
class PHIPatterns:
    """Regular-expression tables used to detect PHI, grouped by category.

    Every attribute is a list of pattern strings. Where a pattern has
    capturing groups, the groups mark the sensitive value and any leading
    label (``MRN:``, ``Phone:`` ...) stays outside them.
    """

    # Patient name patterns (various formats)
    NAME_PATTERNS = [
        r'\b([A-Z][a-z]+)\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)\b',  # First Last [Middle]
        r'\b([A-Z])\.?\s+([A-Z][a-z]+)\b',  # F. Last
        r'\b([A-Z][a-z]+),\s+([A-Z][a-z]+)\b',  # Last, First
        r'Patient Name:\s*([A-Z][a-z]+\s+[A-Z][a-z]+)',
        r'Name:\s*([A-Z][a-z]+\s+[A-Z][a-z]+)',
    ]

    # Medical Record Number patterns
    MRN_PATTERNS = [
        r'\b(?:MRN|Medical Record Number|Patient ID|ID Number|Record #?)[:\s]*([A-Z0-9]{6,12})\b',
        r'\b(?:MRN|ID)[:\s]*([0-9]{6,10})\b',
        r'\bPatient\s*(?:ID|Number)[:\s]*([A-Z0-9]{6,12})\b',
    ]

    # Date of Birth patterns
    DOB_PATTERNS = [
        r'\b(?:DOB|Date of Birth|Birth Date|Born)[:\s]*([0-9]{1,2}[/-][0-9]{1,2}[/-][0-9]{4})\b',
        r'\b([0-9]{1,2}[/-][0-9]{1,2}[/-][0-9]{4})\s*(?:DOB|birth|Born)\b',
        r'\b(?:DOB|Date of Birth)[:\s]*(January|February|March|April|May|June|July|August|September|October|November|December)\s+([0-9]{1,2}),?\s+([0-9]{4})\b',
    ]

    # Social Security Number patterns
    SSN_PATTERNS = [
        r'\b(?:SSN|Social Security Number)[:\s]*([0-9]{3}-[0-9]{2}-[0-9]{4})\b',
        r'\b([0-9]{3}-[0-9]{2}-[0-9]{4})\b',
    ]

    # Phone number patterns
    PHONE_PATTERNS = [
        r'\b(?:Phone|Tel|Telephone|Mobile|Cell)[:\s]*([0-9]{3}[-.\s]?[0-9]{3}[-.\s]?[0-9]{4})\b',
        r'\b([0-9]{3}[-.\s]?[0-9]{3}[-.\s]?[0-9]{4})\b',
        # "(555) 123-4567": a leading \b can never sit between whitespace and
        # "(", so anchor with a negative look-behind instead.
        r'(?<!\w)(\([0-9]{3}\)\s*[0-9]{3}[-.\s]?[0-9]{4})\b',
    ]

    # Email address patterns
    EMAIL_PATTERNS = [
        r'\b([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b',
        r'\b(?:Email|E-mail)[:\s]*([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b',
    ]

    # Address patterns
    ADDRESS_PATTERNS = [
        r'\b([0-9]{1,5}\s+[A-Za-z\s]+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr|Court|Ct|Place|Pl))\b',
        r'\b([0-9]{1,5}\s+[A-Za-z\s]+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr|Court|Ct|Place|Pl)),\s*([A-Za-z\s]+),\s*([A-Z]{2})\s*([0-9]{5})\b',
        r'\b(?:Address|Addr)[:\s]*([0-9]+\s+[A-Za-z\s]+(?:Street|St|Avenue|Ave|Road|Rd))\b',
    ]

    # IP address patterns
    IP_PATTERNS = [
        r'\b(?:IP Address|IP)[:\s]*([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})\b',
        r'\b([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})\b',
    ]

    # URL patterns
    URL_PATTERNS = [
        r'\b(?:URL|Website|Web)[:\s]*(https?://[^\s]+)\b',
        r'\b(https?://[^\s]+)\b',
    ]

    # Device identifier patterns
    DEVICE_PATTERNS = [
        r'\b(?:Device ID|Device|Serial Number|Serial)[:\s]*([A-Z0-9]{6,20})\b',
        r'\b(?:IMEI|IMSI|MAC Address)[:\s]*([A-F0-9]{15,17})\b',
        # MAC addresses written with ":" or "-" separators, which the
        # separator-free pattern above cannot match.
        r'\bMAC(?:\s+Address)?[:\s]*((?:[0-9A-F]{2}[:-]){5}[0-9A-F]{2})\b',
    ]
class MedicalPHIDeidentifier:
    """Regex-driven PHI scrubber for free-text medical documents.

    Detection is purely pattern based (the tables live in ``PHIPatterns``).
    Each detected span is swapped for a bracketed token, optional
    document-type rules run afterwards, and the cleaned text is returned
    together with an audit record of the run.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Create a de-identifier.

        Args:
            config: Optional settings. Keys that are missing fall back to the
                values from ``_default_config`` so a partial dict is safe.
        """
        # Overlay the caller's settings on the defaults; using the caller's
        # dict as-is would raise KeyError later for any key it left out.
        self.config = {**self._default_config(), **(config or {})}
        self.patterns = PHIPatterns()
        # Maps (category tag, original text) -> replacement token.
        self.anonymization_cache = {}

    def _default_config(self) -> Dict[str, Any]:
        """Return the default de-identification configuration."""
        return {
            "compliance_level": "HIPAA",
            "preserve_medical_context": True,
            "use_hashing": True,
            # Optional secret mixed into hashed tokens; empty keeps the
            # historical, unsalted token values.
            "hash_salt": "",
            "redaction_method": "placeholder",
            "date_shift_days": 0,  # For research use
            "preserve_age_category": True,  # Keep age ranges but not exact ages
            "whitelist_terms": ["Dr.", "Mr.", "Ms.", "Mrs.", "MD", "DO"],  # Terms to preserve
        }

    def deidentify_text(self, text: str, document_type: str = "general") -> DeidentificationResult:
        """De-identify ``text`` by replacing every detected PHI span.

        Args:
            text: Text to de-identify.
            document_type: Kind of medical document; ``"ecg"``,
                ``"radiology"`` and ``"laboratory"`` enable extra rules.

        Returns:
            DeidentificationResult holding the cleaned text, the replacements
            that were applied (ordered from the end of the text backwards)
            and an audit log.
        """
        audit_log = {
            "processing_timestamp": datetime.now().isoformat(),
            "document_type": document_type,
            "original_length": len(text),
            "phi_categories_found": [],
            "replacements_made": 0,
        }

        # Digest of the untouched input for the audit trail.
        hash_original = hashlib.sha256(text.encode()).hexdigest()

        detected = []
        for category in self._get_categories_for_doc_type(document_type):
            found = self._detect_phi_category(text, category)
            if found:
                detected.extend(found)
                audit_log["phi_categories_found"].append(category.value)

        # Several patterns routinely hit the same characters. Replacing
        # overlapping spans by their original offsets corrupts the output,
        # so collapse them into disjoint spans first.
        phi_matches = self._resolve_overlaps(text, detected)
        audit_log["replacements_made"] = len(phi_matches)
        phi_matches.sort(key=lambda m: m.start_position, reverse=True)

        deidentified_text = self._apply_replacements(text, phi_matches)

        # Document-specific post-processing.
        processors = {
            "ecg": self._process_ecg_specific,
            "radiology": self._process_radiology_specific,
            "laboratory": self._process_laboratory_specific,
        }
        processor = processors.get(document_type)
        if processor is not None:
            deidentified_text = processor(deidentified_text)

        deidentified_text = self._final_cleanup(deidentified_text)

        audit_log.update({
            "final_length": len(deidentified_text),
            "phi_matches_count": len(phi_matches),
            "compression_ratio": len(deidentified_text) / len(text) if text else 1.0,
        })

        return DeidentificationResult(
            original_text=text,
            deidentified_text=deidentified_text,
            phi_matches=phi_matches,
            anonymization_method=self.config["redaction_method"],
            hash_original=hash_original,
            timestamp=datetime.now(),
            compliance_level=self.config["compliance_level"],
            audit_log=audit_log,
        )

    def _resolve_overlaps(self, text: str, matches: List[PHIMatch]) -> List[PHIMatch]:
        """Collapse duplicate/overlapping matches into disjoint spans.

        Matches nested inside an earlier, wider match are dropped. A match
        that only partially overlaps widens the kept span, so the union of
        all detected characters is still redacted.

        Returns:
            Non-overlapping matches in ascending start order.
        """
        resolved: List[PHIMatch] = []
        # Earliest start first; for equal starts the widest span leads.
        ordered = sorted(matches, key=lambda m: (m.start_position, -m.end_position))
        for match in ordered:
            if resolved and match.start_position < resolved[-1].end_position:
                leader = resolved[-1]
                if match.end_position > leader.end_position:
                    resolved[-1] = self._build_match(
                        text, leader.category, leader.start_position, match.end_position
                    )
                continue
            resolved.append(match)
        return resolved

    @staticmethod
    def _apply_replacements(text: str, matches: List[PHIMatch]) -> str:
        """Return ``text`` with each (disjoint) match span swapped for its token."""
        pieces = []
        cursor = 0
        for match in sorted(matches, key=lambda m: m.start_position):
            pieces.append(text[cursor:match.start_position])
            pieces.append(match.replacement)
            cursor = match.end_position
        pieces.append(text[cursor:])
        return "".join(pieces)

    def _get_categories_for_doc_type(self, document_type: str) -> List[PHICategory]:
        """Return the PHI categories to scan for the given document type."""
        categories = [
            PHICategory.PATIENT_NAME,
            PHICategory.MEDICAL_RECORD_NUMBER,
            PHICategory.DATE_OF_BIRTH,
            # SSN patterns and placeholders exist, but the category was never
            # scheduled, so SSNs passed through untouched.
            PHICategory.SOCIAL_SECURITY_NUMBER,
            PHICategory.PHONE_NUMBER,
            PHICategory.EMAIL_ADDRESS,
            PHICategory.ADDRESS,
            PHICategory.IP_ADDRESS,
            PHICategory.WEB_URL,
        ]

        # NOTE: ACCOUNT_NUMBER has no entry in the pattern map used by
        # _detect_phi_category, so scheduling it currently finds nothing.
        if document_type == "ecg":
            categories.append(PHICategory.DEVICE_IDENTIFIER)
        elif document_type == "radiology":
            categories.extend([PHICategory.DEVICE_IDENTIFIER, PHICategory.ACCOUNT_NUMBER])
        elif document_type == "laboratory":
            categories.append(PHICategory.ACCOUNT_NUMBER)

        return categories

    @staticmethod
    def _phi_span(found) -> Tuple[int, int]:
        """Return the (start, end) offsets of the sensitive part of a regex match.

        With capturing groups the span runs from the first participating
        group to the last one, so multi-group patterns such as
        ``Month Day, Year`` are covered in full. Without groups the whole
        match is used.
        """
        spans = [found.span(index) for index in range(1, found.re.groups + 1)]
        spans = [span for span in spans if span[0] != -1]
        if not spans:
            return found.span()
        return min(start for start, _ in spans), max(end for _, end in spans)

    def _build_match(self, text: str, category: PHICategory, start: int, end: int) -> PHIMatch:
        """Create the PHIMatch describing ``text[start:end]``."""
        original = text[start:end]
        return PHIMatch(
            category=category,
            original_text=original,
            replacement=self._generate_replacement(category, original),
            start_position=start,
            end_position=end,
            confidence=0.8,  # Pattern-based confidence
            # Up to 50 characters of surrounding text on each side.
            context=text[max(0, start - 50):min(len(text), end + 50)],
        )

    def _detect_phi_category(self, text: str, category: PHICategory) -> List[PHIMatch]:
        """Find every occurrence of one PHI category in ``text``.

        Returns:
            One PHIMatch per pattern hit; hits from different patterns may
            overlap. Categories without patterns yield an empty list.
        """
        pattern_map = {
            PHICategory.PATIENT_NAME: self.patterns.NAME_PATTERNS,
            PHICategory.MEDICAL_RECORD_NUMBER: self.patterns.MRN_PATTERNS,
            PHICategory.DATE_OF_BIRTH: self.patterns.DOB_PATTERNS,
            PHICategory.SOCIAL_SECURITY_NUMBER: self.patterns.SSN_PATTERNS,
            PHICategory.PHONE_NUMBER: self.patterns.PHONE_PATTERNS,
            PHICategory.EMAIL_ADDRESS: self.patterns.EMAIL_PATTERNS,
            PHICategory.ADDRESS: self.patterns.ADDRESS_PATTERNS,
            PHICategory.IP_ADDRESS: self.patterns.IP_PATTERNS,
            PHICategory.WEB_URL: self.patterns.URL_PATTERNS,
            PHICategory.DEVICE_IDENTIFIER: self.patterns.DEVICE_PATTERNS,
        }

        # Name patterns rely on capitalisation ([A-Z][a-z]+). Matching them
        # case-insensitively flags almost every pair of words as a name.
        flags = 0 if category is PHICategory.PATIENT_NAME else re.IGNORECASE

        matches = []
        for pattern in pattern_map.get(category, []):
            for found in re.finditer(pattern, text, flags):
                start_pos, end_pos = self._phi_span(found)
                if end_pos <= start_pos:
                    continue  # nothing to replace
                matches.append(self._build_match(text, category, start_pos, end_pos))
        return matches

    def _generate_replacement(self, category: PHICategory, original: str) -> str:
        """Return the token that stands in for ``original``.

        With ``use_hashing`` enabled the token embeds a short digest so the
        same value maps to the same token within a category; otherwise a
        fixed per-category placeholder is returned.
        """
        if self.config["use_hashing"]:
            # Key on the category as well: keyed by text alone, a string seen
            # under a second category got the first category's label back.
            cache_key = (category.value, original)
            token = self.anonymization_cache.get(cache_key)
            if token is None:
                # NOTE(security): a short unsalted digest of a low-entropy
                # value (phone, SSN) can be brute-forced. Set "hash_salt" in
                # the config to a secret to prevent that.
                salt = str(self.config.get("hash_salt", ""))
                digest = hashlib.md5((salt + original).encode()).hexdigest()[:8]
                token = f"[{category.value.upper()}_{digest}]"
                self.anonymization_cache[cache_key] = token
            return token

        placeholder_map = {
            PHICategory.PATIENT_NAME: "[PATIENT_NAME]",
            PHICategory.MEDICAL_RECORD_NUMBER: "[MRN]",
            PHICategory.DATE_OF_BIRTH: "[DOB]",
            PHICategory.SOCIAL_SECURITY_NUMBER: "[SSN]",
            PHICategory.PHONE_NUMBER: "[PHONE]",
            PHICategory.EMAIL_ADDRESS: "[EMAIL]",
            PHICategory.ADDRESS: "[ADDRESS]",
            PHICategory.IP_ADDRESS: "[IP_ADDRESS]",
            PHICategory.WEB_URL: "[URL]",
            PHICategory.DEVICE_IDENTIFIER: "[DEVICE_ID]",
        }
        return placeholder_map.get(category, f"[{category.value.upper()}]")

    @staticmethod
    def _redact_labelled_ids(text: str, labels: str, placeholder: str) -> str:
        """Replace the identifier that follows one of ``labels``; keep the label.

        ``labels`` is a regex alternation such as ``"Device|Equipment"``.
        The label must start at a word boundary and may be followed by
        ``ID``, ``No``/``No.`` or ``Number``. The value is upper-case
        alphanumerics (inner hyphens allowed) and must either follow ``:`` /
        ``#`` or, when separated by blanks only, contain a digit. This keeps
        ordinary words such as "Serial Number" or "Test Results" intact.
        """
        pattern = (
            r"\b((?:" + labels + r")"
            r"(?:[ \t]+(?:ID|No\.?|Number))?"
            r"(?:[ \t]*[:#][ \t]*|[ \t]+(?=[A-Z0-9-]*[0-9])))"
            r"([A-Z0-9][A-Z0-9-]*)\b"
        )
        return re.sub(pattern, lambda found: found.group(1) + placeholder, text)

    @staticmethod
    def _redact_labelled_text(text: str, labels: str, chars: str, placeholder: str) -> str:
        """Replace the free-text value after ``<label>:``; keep the label.

        ``chars`` is the body of a character class (e.g. ``"A-Za-z"``) for
        the characters a value may start and end with. The value never
        crosses a line break, and the colon is mandatory so words that merely
        begin with a label ("Clinical", "Hospitalized") are left alone.
        """
        value = "[" + chars + "](?:[" + chars + r" \t.'&-]*[" + chars + ".])?"
        pattern = r"\b((?:" + labels + r")[ \t]*:[ \t]*)(" + value + r")"
        return re.sub(pattern, lambda found: found.group(1) + placeholder, text)

    def _process_ecg_specific(self, text: str) -> str:
        """Redact device identifiers and serial/model numbers in ECG text."""
        text = self._redact_labelled_ids(text, "Device|Equipment", "[DEVICE_ID]")
        text = self._redact_labelled_ids(text, "Serial|Model", "[DEVICE_SERIAL]")
        return text

    def _process_radiology_specific(self, text: str) -> str:
        """Redact facility and imaging-equipment names in radiology text."""
        text = self._redact_labelled_text(
            text, "Facility|Hospital|Clinic", "A-Za-z", "[FACILITY]"
        )
        text = self._redact_labelled_text(
            text, "Machine|Scanner|Equipment", "A-Za-z0-9", "[IMAGING_DEVICE]"
        )
        return text

    def _process_laboratory_specific(self, text: str) -> str:
        """Redact lab facility names and accession/test identifiers."""
        text = self._redact_labelled_text(
            text, "Laboratory|Lab", "A-Za-z", "[LAB_FACILITY]"
        )
        text = self._redact_labelled_ids(text, "Accession|Test", "[TEST_ID]")
        return text

    def _final_cleanup(self, text: str) -> str:
        """Normalise whitespace and warn about PHI-looking leftovers.

        Every run of whitespace (line breaks included) becomes one space and
        the result is stripped. Only the kinds of residue are logged, never
        the text itself.
        """
        text = re.sub(r'\s+', ' ', text).strip()

        remaining_phi = self._check_residual_phi(text)
        if remaining_phi:
            logger.warning("Potential PHI detected after de-identification: %s", remaining_phi)

        return text

    def _check_residual_phi(self, text: str) -> List[str]:
        """Return labels for PHI-like patterns still present in ``text``.

        Possible labels, in this order: ``phone_number``, ``email_address``,
        ``ssn_pattern``.
        """
        checks = [
            ("phone_number", r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b'),
            # "[A-Za-z]" (not "[A-Z|a-z]"): a "|" inside a character class is
            # a literal pipe, not alternation.
            ("email_address", r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),
            ("ssn_pattern", r'\b\d{3}-\d{2}-\d{4}\b'),
        ]
        return [label for label, pattern in checks if re.search(pattern, text)]

    def batch_deidentify(self, texts: List[Tuple[str, str]]) -> List[DeidentificationResult]:
        """De-identify several ``(text, document_type)`` pairs in order."""
        return [self.deidentify_text(text, doc_type) for text, doc_type in texts]

    def generate_audit_report(self, results: List[DeidentificationResult]) -> Dict[str, Any]:
        """Summarise a batch of results for compliance reporting.

        Returns:
            Dict with document and match totals, per-category counts, a
            heuristic ``compliance_score``, the derived ``compliance_level``
            and a list of recommendations.
        """
        total_phi_matches = sum(len(result.phi_matches) for result in results)

        categories_found: Dict[str, int] = {}
        for result in results:
            for match in result.phi_matches:
                tag = match.category.value
                categories_found[tag] = categories_found.get(tag, 0) + 1

        # NOTE(review): for a non-empty batch this heuristic always lies in
        # [0.9, 1.0], so the level below is always "HIPAA_COMPLIANT"; it says
        # nothing about detection recall. Confirm the intended scoring.
        compliance_score = 0.0
        if results:
            avg_phi_per_doc = total_phi_matches / len(results)
            compliance_score = min(1.0, 0.9 + (0.1 * (1.0 - min(avg_phi_per_doc / 10, 1.0))))

        return {
            "audit_timestamp": datetime.now().isoformat(),
            "total_documents": len(results),
            "total_phi_matches": total_phi_matches,
            "phi_categories_found": categories_found,
            "compliance_score": compliance_score,
            "compliance_level": "HIPAA_COMPLIANT" if compliance_score > 0.8 else "NEEDS_REVIEW",
            "recommendations": self._generate_recommendations(categories_found, compliance_score),
        }

    def _generate_recommendations(self, categories_found: Dict[str, int], compliance_score: float) -> List[str]:
        """Build human-readable recommendations from the audit figures."""
        recommendations = []

        if compliance_score < 0.8:
            recommendations.append("Increase PHI detection patterns for better coverage")

        if categories_found.get("patient_name", 0) > 5:
            recommendations.append("Consider enhanced name detection patterns")

        if categories_found.get("address", 0) > 0:
            recommendations.append("Address detection appears effective")

        # The enum value for device identifiers is "device_id"; the former
        # "device_identifier" key could never be present.
        if categories_found.get("device_id", 0) > 0:
            recommendations.append("Device identifiers detected - ensure proper anonymization")

        return recommendations
# Public API: the names pulled in by `from <module> import *`.
__all__ = [
    "MedicalPHIDeidentifier",
    "PHICategory",
    "PHIMatch",
    "DeidentificationResult",
]