"""
PHI De-identification Pipeline - Phase 2

HIPAA-compliant protected health information removal and anonymization.

This module provides pattern-based PHI detection and removal for medical
documents before AI processing. Detection is heuristic (regular
expressions); the residual-PHI check and the audit report exist so that a
human can review documents the patterns did not fully clean.

Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""

import hashlib
import hmac
import json
import logging
import re
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Tuple

logger = logging.getLogger(__name__)


class PHICategory(Enum):
    """Categories of protected health information"""

    PATIENT_NAME = "patient_name"
    MEDICAL_RECORD_NUMBER = "mrn"
    DATE_OF_BIRTH = "dob"
    SOCIAL_SECURITY_NUMBER = "ssn"
    PHONE_NUMBER = "phone"
    EMAIL_ADDRESS = "email"
    ADDRESS = "address"
    DATE = "date"
    AGE_OVER_89 = "age_89_plus"
    BIO_METRIC_IDENTIFIER = "biometric"
    PHOTO = "photo"
    DEVICE_IDENTIFIER = "device_id"
    ACCOUNT_NUMBER = "account"
    CERTIFICATE_NUMBER = "certificate"
    VEHICLE_IDENTIFIER = "vehicle"
    WEB_URL = "web_url"
    IP_ADDRESS = "ip_address"
    FINGERPRINT = "fingerprint"
    FULL_FACE_PHOTO = "full_face_photo"


@dataclass
class PHIMatch:
    """PHI entity match with replacement information"""

    category: PHICategory
    original_text: str  # exactly the text in [start_position, end_position)
    replacement: str
    start_position: int  # offsets into the ORIGINAL (pre-replacement) text
    end_position: int
    confidence: float
    context: str  # up to 50 characters of original text on either side


@dataclass
class DeidentificationResult:
    """Result of PHI de-identification process"""

    original_text: str
    deidentified_text: str
    phi_matches: List[PHIMatch]
    anonymization_method: str
    hash_original: str
    timestamp: datetime
    compliance_level: str  # HIPAA, GDPR, NONE
    audit_log: Dict[str, Any]


class PHIPatterns:
    """Comprehensive PHI detection patterns.

    Every entry is a regular-expression source string. When a pattern has
    capture groups, the span from the first to the last participating group
    is what gets replaced; a pattern without groups is replaced as a whole.
    """

    # Patient name patterns (various formats). These are matched
    # case-sensitively (capitalisation is the only signal they have), so
    # labels use a scoped (?i:...) group to stay case-insensitive.
    NAME_PATTERNS = [
        r'\b([A-Z][a-z]+)\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)\b',  # First Last [Middle]
        r'\b([A-Z])\.?\s+([A-Z][a-z]+)\b',  # F. Last
        r'\b([A-Z][a-z]+),\s+([A-Z][a-z]+)\b',  # Last, First
        r'(?i:Patient Name):\s*([A-Z][a-z]+\s+[A-Z][a-z]+)',
        r'(?i:Name):\s*([A-Z][a-z]+\s+[A-Z][a-z]+)',
    ]

    # Medical Record Number patterns
    MRN_PATTERNS = [
        r'\b(?:MRN|Medical Record Number|Patient ID|ID Number|Record #?)[:\s]*([A-Z0-9]{6,12})\b',
        r'\b(?:MRN|ID)[:\s]*([0-9]{6,10})\b',
        r'\bPatient\s*(?:ID|Number)[:\s]*([A-Z0-9]{6,12})\b',
    ]

    # Date of Birth patterns
    DOB_PATTERNS = [
        r'\b(?:DOB|Date of Birth|Birth Date|Born)[:\s]*([0-9]{1,2}[/-][0-9]{1,2}[/-][0-9]{4})\b',
        r'\b([0-9]{1,2}[/-][0-9]{1,2}[/-][0-9]{4})\s*(?:DOB|birth|Born)\b',
        r'\b(?:DOB|Date of Birth)[:\s]*(January|February|March|April|May|June|July|August|September|October|November|December)\s+([0-9]{1,2}),?\s+([0-9]{4})\b',
    ]

    # Social Security Number patterns
    SSN_PATTERNS = [
        r'\b(?:SSN|Social Security Number)[:\s]*([0-9]{3}-[0-9]{2}-[0-9]{4})\b',
        r'\b([0-9]{3}-[0-9]{2}-[0-9]{4})\b',
    ]

    # Phone number patterns
    PHONE_PATTERNS = [
        r'\b(?:Phone|Tel|Telephone|Mobile|Cell)[:\s]*([0-9]{3}[-.\s]?[0-9]{3}[-.\s]?[0-9]{4})\b',
        r'\b([0-9]{3}[-.\s]?[0-9]{3}[-.\s]?[0-9]{4})\b',
        # "(555) 123-4567": a leading \b can never match between a space and
        # "(", so a negative lookbehind guards the left edge instead.
        r'(?<!\w)(\([0-9]{3}\)\s*[0-9]{3}[-.\s]?[0-9]{4})\b',
    ]

    # Email address patterns
    EMAIL_PATTERNS = [
        r'\b([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b',
        r'\b(?:Email|E-mail)[:\s]*([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b',
    ]

    # Address patterns
    ADDRESS_PATTERNS = [
        r'\b([0-9]{1,5}\s+[A-Za-z\s]+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr|Court|Ct|Place|Pl))\b',
        r'\b([0-9]{1,5}\s+[A-Za-z\s]+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr|Court|Ct|Place|Pl)),\s*([A-Za-z\s]+),\s*([A-Z]{2})\s*([0-9]{5})\b',
        r'\b(?:Address|Addr)[:\s]*([0-9]+\s+[A-Za-z\s]+(?:Street|St|Avenue|Ave|Road|Rd))\b',
    ]

    # IP address patterns
    IP_PATTERNS = [
        r'\b(?:IP Address|IP)[:\s]*([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})\b',
        r'\b([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})\b',
    ]

    # URL patterns
    URL_PATTERNS = [
        r'\b(?:URL|Website|Web)[:\s]*(https?://[^\s]+)\b',
        r'\b(https?://[^\s]+)\b',
    ]

    # Device identifier patterns
    DEVICE_PATTERNS = [
        r'\b(?:Device ID|Device|Serial Number|Serial)[:\s]*([A-Z0-9]{6,20})\b',
        r'\b(?:IMEI|IMSI|MAC Address)[:\s]*([A-F0-9]{15,17})\b',
    ]


class MedicalPHIDeidentifier:
    """HIPAA-oriented PHI de-identification system.

    Detection is regex-based. Matches from all patterns are collected against
    the original text, overlapping spans are merged, and the text is rewritten
    in a single pass so that one replacement can never corrupt another.
    """

    # Categories whose patterns depend on capitalisation and therefore must
    # not be matched with re.IGNORECASE.
    _CASE_SENSITIVE_CATEGORIES = frozenset({PHICategory.PATIENT_NAME})

    # Document-type specific patterns. Labels are anchored on word boundaries
    # so they cannot fire inside longer words ("Clinical", "Laboratory",
    # "Hospitalization"). Free-text values require an explicit "Label:" and
    # stay on the label's line, so narrative text following a bare keyword is
    # preserved. Code-like values must be a complete upper-case/digit token.
    _ECG_DEVICE_RE = re.compile(
        r'\b(?:Device|Equipment)\b[:\s]*[A-Z0-9]+(?![A-Za-z0-9])')
    _ECG_SERIAL_RE = re.compile(
        r'\b(?:Serial|Model)\b(?:[ \t]+(?:Number|No\.?))?[:\s#]*[A-Z0-9]+(?![A-Za-z0-9])')
    _FACILITY_RE = re.compile(
        r'\b(?:Facility|Hospital|Clinic)[ \t]*:[ \t]*[A-Za-z]+(?:[ \t]+[A-Za-z]+)*')
    _IMAGING_DEVICE_RE = re.compile(
        r'\b(?:Machine|Scanner|Equipment)[ \t]*:[ \t]*[A-Za-z0-9]+(?:[ \t]+[A-Za-z0-9]+)*')
    _LAB_FACILITY_RE = re.compile(
        r'\b(?:Laboratory|Lab)[ \t]*:[ \t]*[A-Za-z]+(?:[ \t]+[A-Za-z]+)*')
    _LAB_TEST_ID_RE = re.compile(
        r'\b(?:Accession|Test)\b(?:[ \t]+(?:Number|No\.?|ID))?[:\s#]*[A-Z0-9]+(?![A-Za-z0-9])')

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Create a de-identifier.

        Args:
            config: Optional overrides for the keys returned by
                ``_default_config``. Keys that are not supplied fall back to
                their defaults, so a partial config is valid.
        """
        self.config = {**self._default_config(), **(config or {})}
        self.patterns = PHIPatterns()
        # (category, original text) -> replacement token
        self.anonymization_cache: Dict[Tuple[PHICategory, str], str] = {}

    def _default_config(self) -> Dict[str, Any]:
        """Default de-identification configuration"""
        return {
            "compliance_level": "HIPAA",
            "preserve_medical_context": True,
            "use_hashing": True,
            # Optional secret. When non-empty, hashed tokens are derived with
            # HMAC-SHA256 keyed by this value instead of a plain MD5 digest.
            "hash_salt": "",
            "redaction_method": "placeholder",
            "date_shift_days": 0,  # For research use
            "preserve_age_category": True,  # Keep age ranges but not exact ages
            "whitelist_terms": ["Dr.", "Mr.", "Ms.", "Mrs.", "MD", "DO"],  # Terms to preserve
        }

    def deidentify_text(self, text: str, document_type: str = "general") -> DeidentificationResult:
        """
        De-identify text by removing or replacing PHI

        Args:
            text: Text to de-identify
            document_type: Type of medical document for targeted processing
                ("ecg", "radiology", "laboratory"; anything else is treated
                as a general document)

        Returns:
            DeidentificationResult with de-identified text and audit log.
            ``phi_matches`` holds the replacements that were actually applied
            (after overlap merging), ordered by descending start position;
            their offsets refer to ``original_text``.
        """
        original_text = text
        audit_log = {
            "processing_timestamp": datetime.now().isoformat(),
            "document_type": document_type,
            "original_length": len(text),
            "phi_categories_found": [],
            "replacements_made": 0
        }

        # Calculate hash of original for audit trail
        hash_original = hashlib.sha256(text.encode()).hexdigest()

        # Collect candidates from every relevant category against the
        # untouched text, so all offsets share one coordinate system.
        categories_to_process = self._get_categories_for_doc_type(document_type)
        candidates: List[PHIMatch] = []
        for category in categories_to_process:
            candidates.extend(self._detect_phi_category(text, category))

        # Different patterns frequently hit the same or overlapping spans;
        # merge them so each character is replaced at most once.
        phi_matches = self._resolve_overlaps(text, candidates)

        applied_categories = {match.category for match in phi_matches}
        audit_log["phi_categories_found"] = [
            category.value for category in categories_to_process
            if category in applied_categories
        ]
        audit_log["replacements_made"] = len(phi_matches)

        # Single left-to-right rewrite of the original text.
        pieces: List[str] = []
        cursor = 0
        for match in phi_matches:
            pieces.append(text[cursor:match.start_position])
            pieces.append(match.replacement)
            cursor = match.end_position
        pieces.append(text[cursor:])
        deidentified_text = "".join(pieces)

        # Callers historically received matches sorted by descending position.
        phi_matches.sort(key=lambda m: m.start_position, reverse=True)

        # Apply document-specific processing
        if document_type == "ecg":
            deidentified_text = self._process_ecg_specific(deidentified_text)
        elif document_type == "radiology":
            deidentified_text = self._process_radiology_specific(deidentified_text)
        elif document_type == "laboratory":
            deidentified_text = self._process_laboratory_specific(deidentified_text)

        # Final cleanup and validation
        deidentified_text = self._final_cleanup(deidentified_text)

        audit_log.update({
            "final_length": len(deidentified_text),
            "phi_matches_count": len(phi_matches),
            "compression_ratio": len(deidentified_text) / len(text) if text else 1.0,
            # Recorded so the audit report can flag documents needing review.
            "residual_phi": self._check_residual_phi(deidentified_text),
        })

        return DeidentificationResult(
            original_text=original_text,
            deidentified_text=deidentified_text,
            phi_matches=phi_matches,
            anonymization_method=self.config["redaction_method"],
            hash_original=hash_original,
            timestamp=datetime.now(),
            compliance_level=self.config["compliance_level"],
            audit_log=audit_log
        )

    def _get_categories_for_doc_type(self, document_type: str) -> List[PHICategory]:
        """Get relevant PHI categories for document type.

        The returned order is also the tie-break priority when two categories
        match exactly the same span (earlier wins).
        """
        base_categories = [
            PHICategory.PATIENT_NAME,
            PHICategory.MEDICAL_RECORD_NUMBER,
            PHICategory.DATE_OF_BIRTH,
            # SSNs are direct identifiers and must be redacted in every
            # document type, not merely reported by the residual check.
            PHICategory.SOCIAL_SECURITY_NUMBER,
            PHICategory.PHONE_NUMBER,
            PHICategory.EMAIL_ADDRESS,
            PHICategory.ADDRESS,
            PHICategory.IP_ADDRESS,
            PHICategory.WEB_URL
        ]

        if document_type == "ecg":
            base_categories.extend([PHICategory.DEVICE_IDENTIFIER])
        elif document_type == "radiology":
            base_categories.extend([PHICategory.DEVICE_IDENTIFIER, PHICategory.ACCOUNT_NUMBER])
        elif document_type == "laboratory":
            base_categories.extend([PHICategory.ACCOUNT_NUMBER])

        return base_categories

    def _detect_phi_category(self, text: str, category: PHICategory) -> List[PHIMatch]:
        """Detect PHI for a specific category.

        Args:
            text: Text to scan (offsets in the result refer to this string).
            category: Category whose patterns should be applied.

        Returns:
            One PHIMatch per regex hit; categories without patterns yield an
            empty list. Hits may overlap each other.
        """
        # Get relevant patterns for category
        pattern_map = {
            PHICategory.PATIENT_NAME: self.patterns.NAME_PATTERNS,
            PHICategory.MEDICAL_RECORD_NUMBER: self.patterns.MRN_PATTERNS,
            PHICategory.DATE_OF_BIRTH: self.patterns.DOB_PATTERNS,
            PHICategory.SOCIAL_SECURITY_NUMBER: self.patterns.SSN_PATTERNS,
            PHICategory.PHONE_NUMBER: self.patterns.PHONE_PATTERNS,
            PHICategory.EMAIL_ADDRESS: self.patterns.EMAIL_PATTERNS,
            PHICategory.ADDRESS: self.patterns.ADDRESS_PATTERNS,
            PHICategory.IP_ADDRESS: self.patterns.IP_PATTERNS,
            PHICategory.WEB_URL: self.patterns.URL_PATTERNS,
            PHICategory.DEVICE_IDENTIFIER: self.patterns.DEVICE_PATTERNS,
        }

        # With IGNORECASE, "[A-Z][a-z]+" matches any word at all, so name
        # patterns would flag every pair of words in the document.
        flags = 0 if category in self._CASE_SENSITIVE_CATEGORIES else re.IGNORECASE

        matches: List[PHIMatch] = []
        for pattern in pattern_map.get(category, []):
            for match in re.finditer(pattern, text, flags):
                start_pos, end_pos = self._replacement_span(match)
                if start_pos >= end_pos:
                    continue  # nothing to replace
                matches.append(self._build_match(text, category, start_pos, end_pos))

        return matches

    @staticmethod
    def _replacement_span(match: "re.Match[str]") -> Tuple[int, int]:
        """Return the (start, end) of the text a regex hit should replace.

        Covers every capture group that took part in the match, so patterns
        such as "First Last" or "Month DD, YYYY" are redacted completely
        rather than only their first group. Falls back to the whole match
        when the pattern has no (participating) groups.
        """
        spans = [
            match.span(index)
            for index in range(1, match.re.groups + 1)
            if match.start(index) != -1
        ]
        if not spans:
            return match.span()
        return min(start for start, _ in spans), max(end for _, end in spans)

    def _build_match(self, text: str, category: PHICategory,
                     start_pos: int, end_pos: int) -> PHIMatch:
        """Create a PHIMatch for ``text[start_pos:end_pos]``."""
        span_text = text[start_pos:end_pos]
        context_start = max(0, start_pos - 50)
        context_end = min(len(text), end_pos + 50)
        return PHIMatch(
            category=category,
            original_text=span_text,
            replacement=self._generate_replacement(category, span_text),
            start_position=start_pos,
            end_position=end_pos,
            confidence=0.8,  # Pattern-based confidence
            context=text[context_start:context_end]
        )

    def _resolve_overlaps(self, text: str, candidates: List[PHIMatch]) -> List[PHIMatch]:
        """Merge overlapping candidates into disjoint spans.

        Returns matches in ascending position order. When spans overlap, the
        result covers their union (so no detected PHI is left behind) and
        keeps the category of the earliest-starting, longest candidate.
        """
        # Stable sort: for identical spans the category processed first wins.
        ordered = sorted(
            candidates,
            key=lambda m: (m.start_position, m.start_position - m.end_position),
        )
        resolved: List[PHIMatch] = []
        for candidate in ordered:
            previous = resolved[-1] if resolved else None
            if previous is None or candidate.start_position >= previous.end_position:
                resolved.append(candidate)
            elif candidate.end_position > previous.end_position:
                # Partial overlap: widen the accepted span to the union.
                resolved[-1] = self._build_match(
                    text, previous.category,
                    previous.start_position, candidate.end_position,
                )
            # else: candidate lies fully inside the accepted span - drop it.
        return resolved

    def _generate_replacement(self, category: PHICategory, original: str) -> str:
        """Generate appropriate replacement for PHI category.

        With ``use_hashing`` enabled the token is ``[<CATEGORY>_<8 hex>]`` and
        is stable for the same (category, text) pair, which keeps references
        to the same entity linkable. Otherwise a generic placeholder is used.
        """
        if self.config["use_hashing"]:
            # Keyed by category as well as text: the same digits can be a
            # phone number in one place and an MRN in another.
            cache_key = (category, original)
            token = self.anonymization_cache.get(cache_key)
            if token is None:
                data = original.encode("utf-8")
                salt = self.config.get("hash_salt") or ""
                if salt:
                    digest = hmac.new(salt.encode("utf-8"), data, hashlib.sha256).hexdigest()
                else:
                    # NOTE(security): an unsalted digest of low-entropy PHI
                    # (SSN, phone, DOB) can be reversed by enumeration.
                    # Kept as the default so existing tokens do not change;
                    # set "hash_salt" for anything leaving a trusted boundary.
                    digest = hashlib.md5(data).hexdigest()
                token = f"[{category.value.upper()}_{digest[:8]}]"
                self.anonymization_cache[cache_key] = token
            return token

        # Use generic placeholders
        placeholder_map = {
            PHICategory.PATIENT_NAME: "[PATIENT_NAME]",
            PHICategory.MEDICAL_RECORD_NUMBER: "[MRN]",
            PHICategory.DATE_OF_BIRTH: "[DOB]",
            PHICategory.SOCIAL_SECURITY_NUMBER: "[SSN]",
            PHICategory.PHONE_NUMBER: "[PHONE]",
            PHICategory.EMAIL_ADDRESS: "[EMAIL]",
            PHICategory.ADDRESS: "[ADDRESS]",
            PHICategory.IP_ADDRESS: "[IP_ADDRESS]",
            PHICategory.WEB_URL: "[URL]",
            PHICategory.DEVICE_IDENTIFIER: "[DEVICE_ID]"
        }
        return placeholder_map.get(category, f"[{category.value.upper()}]")

    def _process_ecg_specific(self, text: str) -> str:
        """ECG-specific PHI processing.

        Replaces labelled device / serial / model identifiers (label
        included) while leaving ECG measurements untouched.
        """
        text = self._ECG_DEVICE_RE.sub('[DEVICE_ID]', text)
        text = self._ECG_SERIAL_RE.sub('[DEVICE_SERIAL]', text)
        return text

    def _process_radiology_specific(self, text: str) -> str:
        """Radiology-specific PHI processing.

        Replaces "Facility:/Hospital:/Clinic:" and
        "Machine:/Scanner:/Equipment:" fields (label included). Only the
        value on the label's own line is consumed, so findings are preserved.
        """
        text = self._FACILITY_RE.sub('[FACILITY]', text)
        text = self._IMAGING_DEVICE_RE.sub('[IMAGING_DEVICE]', text)
        return text

    def _process_laboratory_specific(self, text: str) -> str:
        """Laboratory-specific PHI processing.

        Replaces "Lab:/Laboratory:" facility fields and accession / test
        identifiers (label included) while preserving lab values and units.
        """
        text = self._LAB_FACILITY_RE.sub('[LAB_FACILITY]', text)
        text = self._LAB_TEST_ID_RE.sub('[TEST_ID]', text)
        return text

    def _final_cleanup(self, text: str) -> str:
        """Final cleanup and validation of de-identified text.

        Collapses every whitespace run to a single space, trims the ends and
        logs a warning when obvious PHI patterns are still present.
        """
        text = re.sub(r'\s+', ' ', text)  # Normalize whitespace
        text = text.strip()

        # Check for any remaining obvious PHI patterns
        remaining_phi = self._check_residual_phi(text)
        if remaining_phi:
            logger.warning(f"Potential PHI detected after de-identification: {remaining_phi}")

        return text

    def _check_residual_phi(self, text: str) -> List[str]:
        """Check for any remaining PHI patterns.

        Returns:
            Labels ("phone_number", "email_address", "ssn_pattern") for each
            kind of pattern still found in ``text``; empty when clean.
        """
        potential_phi = []

        # Check for phone numbers (plain and "(555) 123-4567" forms)
        if re.search(r'(?:\(\d{3}\)\s*|\b\d{3}[-.\s]?)\d{3}[-.\s]?\d{4}\b', text):
            potential_phi.append("phone_number")

        # Check for email addresses
        if re.search(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', text):
            potential_phi.append("email_address")

        # Check for SSN-like patterns
        if re.search(r'\b\d{3}-\d{2}-\d{4}\b', text):
            potential_phi.append("ssn_pattern")

        return potential_phi

    def batch_deidentify(self, texts: List[Tuple[str, str]]) -> List[DeidentificationResult]:
        """Batch de-identify multiple texts with document types.

        Args:
            texts: ``(text, document_type)`` pairs.

        Returns:
            One DeidentificationResult per input pair, in input order.
        """
        return [self.deidentify_text(text, doc_type) for text, doc_type in texts]

    def generate_audit_report(self, results: List[DeidentificationResult]) -> Dict[str, Any]:
        """Generate comprehensive audit report for compliance.

        The base score is a heuristic in [0.9, 1.0] derived from the average
        number of PHI matches per document; it is scaled down by the fraction
        of documents whose audit log records residual PHI. A batch is only
        labelled ``HIPAA_COMPLIANT`` when no document has residual PHI.

        Args:
            results: Results to summarise (may be empty).

        Returns:
            Dict with totals, per-category counts, the score, the resulting
            compliance level and recommendations.
        """
        total_phi_matches = sum(len(r.phi_matches) for r in results)
        categories_found: Dict[str, int] = {}
        compliance_score = 0.0

        for result in results:
            for match in result.phi_matches:
                cat = match.category.value
                categories_found[cat] = categories_found.get(cat, 0) + 1

        # Results built outside deidentify_text may lack the key.
        residual_documents = sum(
            1 for r in results if (r.audit_log or {}).get("residual_phi")
        )

        # Calculate compliance score based on coverage
        if results:
            avg_phi_per_doc = total_phi_matches / len(results)
            compliance_score = min(1.0, 0.9 + (0.1 * (1.0 - min(avg_phi_per_doc / 10, 1.0))))
            if residual_documents:
                compliance_score *= 1.0 - residual_documents / len(results)

        is_compliant = compliance_score > 0.8 and residual_documents == 0

        return {
            "audit_timestamp": datetime.now().isoformat(),
            "total_documents": len(results),
            "total_phi_matches": total_phi_matches,
            "phi_categories_found": categories_found,
            "documents_with_residual_phi": residual_documents,
            "compliance_score": compliance_score,
            "compliance_level": "HIPAA_COMPLIANT" if is_compliant else "NEEDS_REVIEW",
            "recommendations": self._generate_recommendations(
                categories_found, compliance_score, residual_documents
            )
        }

    def _generate_recommendations(self, categories_found: Dict[str, int],
                                  compliance_score: float,
                                  residual_documents: int = 0) -> List[str]:
        """Generate compliance recommendations.

        Args:
            categories_found: Match count per ``PHICategory`` value.
            compliance_score: Score computed by ``generate_audit_report``.
            residual_documents: Number of documents with residual PHI.
        """
        recommendations = []

        if compliance_score < 0.8:
            recommendations.append("Increase PHI detection patterns for better coverage")

        if residual_documents > 0:
            recommendations.append(
                "Residual PHI patterns detected after de-identification - manual review required"
            )

        if categories_found.get(PHICategory.PATIENT_NAME.value, 0) > 5:
            recommendations.append("Consider enhanced name detection patterns")

        if categories_found.get(PHICategory.ADDRESS.value, 0) > 0:
            recommendations.append("Address detection appears effective")

        # Counts are keyed by enum value, which is "device_id".
        if categories_found.get(PHICategory.DEVICE_IDENTIFIER.value, 0) > 0:
            recommendations.append("Device identifiers detected - ensure proper anonymization")

        return recommendations


# Export main classes
__all__ = [
    "MedicalPHIDeidentifier",
    "PHICategory",
    "PHIMatch",
    "DeidentificationResult"
]