Spaces:
Sleeping
Sleeping
| """ | |
| PHI De-identification Pipeline - Phase 2 | |
| HIPAA-compliant protected health information removal and anonymization. | |
| This module provides comprehensive PHI detection and removal for medical documents | |
| before AI processing, ensuring HIPAA compliance and data privacy. | |
| Author: MiniMax Agent | |
| Date: 2025-10-29 | |
| Version: 1.0.0 | |
| """ | |
| import re | |
| import hashlib | |
| import logging | |
| from typing import Dict, List, Optional, Tuple, Any, Set | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from enum import Enum | |
| import json | |
| logger = logging.getLogger(__name__) | |
class PHICategory(Enum):
    """Kinds of protected health information (PHI) the pipeline can label.

    Each member's value is a short string tag.  Elsewhere in this module the
    tag is written to audit logs and, upper-cased, embedded in replacement
    placeholders.
    """

    # --- Identity and contact details ---
    PATIENT_NAME = "patient_name"
    MEDICAL_RECORD_NUMBER = "mrn"
    DATE_OF_BIRTH = "dob"
    SOCIAL_SECURITY_NUMBER = "ssn"
    PHONE_NUMBER = "phone"
    EMAIL_ADDRESS = "email"
    ADDRESS = "address"

    # --- Dates and ages ---
    DATE = "date"
    AGE_OVER_89 = "age_89_plus"

    # --- Biometric and image identifiers ---
    BIO_METRIC_IDENTIFIER = "biometric"
    PHOTO = "photo"

    # --- Numbers tied to devices, accounts, licences and vehicles ---
    DEVICE_IDENTIFIER = "device_id"
    ACCOUNT_NUMBER = "account"
    CERTIFICATE_NUMBER = "certificate"
    VEHICLE_IDENTIFIER = "vehicle"

    # --- Network identifiers ---
    WEB_URL = "web_url"
    IP_ADDRESS = "ip_address"

    # --- Further biometric / image categories ---
    FINGERPRINT = "fingerprint"
    FULL_FACE_PHOTO = "full_face_photo"
@dataclass
class PHIMatch:
    """One detected PHI span together with the text that replaces it.

    The class must be a dataclass: the detector builds instances with keyword
    arguments, which a plain class holding only annotations would reject with
    ``TypeError``.
    """

    # Kind of PHI that was detected.  Quoted so the annotation does not depend
    # on where ``PHICategory`` is defined relative to this class.
    category: "PHICategory"
    # Source text that triggered the match.
    original_text: str
    # Placeholder substituted into the de-identified output.
    replacement: str
    # Start offset (inclusive) of the replaced span in the source text.
    start_position: int
    # End offset (exclusive) of the replaced span in the source text.
    end_position: int
    # Detector confidence for this match.
    confidence: float
    # Snippet of source text surrounding the span, kept for review.
    context: str
@dataclass
class DeidentificationResult:
    """Outcome of de-identifying one document.

    The class must be a dataclass: the de-identifier builds instances with
    keyword arguments, which a plain class holding only annotations would
    reject with ``TypeError``.
    """

    # The text exactly as it was submitted (still contains PHI).
    original_text: str
    # The text after replacements and clean-up.
    deidentified_text: str
    # Every replacement that was applied.  The element type is quoted so the
    # annotation does not depend on definition order.
    phi_matches: List["PHIMatch"]
    # Name of the redaction method taken from the configuration.
    anonymization_method: str
    # Hex digest of the original text, kept for the audit trail.
    hash_original: str
    # When the result object was created.
    timestamp: datetime
    compliance_level: str  # HIPAA, GDPR, NONE
    # Free-form processing statistics for this document.
    audit_log: Dict[str, Any]
class PHIPatterns:
    """Regular expressions used to locate PHI in free text.

    Conventions every pattern follows:

    * Exactly ONE capture group, and it spans the complete text that must be
      redacted.  The detector in this module replaces capture group 1, so
      anything outside the group (labels such as ``MRN:``) is kept and
      anything split across further groups would be left in the output.
    * The detector applies the patterns with ``re.IGNORECASE`` so that labels
      match in any casing.  The unlabelled name patterns depend on
      capitalisation to tell names from ordinary words, so they switch case
      sensitivity back on locally with a scoped ``(?-i:...)`` group.
    """

    # Patient name patterns (various formats)
    NAME_PATTERNS = [
        r'(?-i:\b([A-Z][a-z]+\s+[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)\b)',  # First Last [Middle]
        r'(?-i:\b([A-Z]\.?\s+[A-Z][a-z]+)\b)',  # F. Last
        r'(?-i:\b([A-Z][a-z]+,\s+[A-Z][a-z]+)\b)',  # Last, First
        r'Patient Name:\s*([A-Z][a-z]+\s+[A-Z][a-z]+)',
        r'Name:\s*([A-Z][a-z]+\s+[A-Z][a-z]+)',
    ]

    # Medical Record Number patterns
    MRN_PATTERNS = [
        r'\b(?:MRN|Medical Record Number|Patient ID|ID Number|Record #?)[:\s]*([A-Z0-9]{6,12})\b',
        r'\b(?:MRN|ID)[:\s]*([0-9]{6,10})\b',
        r'\bPatient\s*(?:ID|Number)[:\s]*([A-Z0-9]{6,12})\b',
    ]

    # Date of Birth patterns
    DOB_PATTERNS = [
        r'\b(?:DOB|Date of Birth|Birth Date|Born)[:\s]*([0-9]{1,2}[/-][0-9]{1,2}[/-][0-9]{4})\b',
        r'\b([0-9]{1,2}[/-][0-9]{1,2}[/-][0-9]{4})\s*(?:DOB|birth|Born)\b',
        # Month, day and year sit in one group so the whole date is redacted.
        r'\b(?:DOB|Date of Birth)[:\s]*((?:January|February|March|April|May|June|July|August|September|October|November|December)\s+[0-9]{1,2},?\s+[0-9]{4})\b',
    ]

    # Social Security Number patterns
    SSN_PATTERNS = [
        r'\b(?:SSN|Social Security Number)[:\s]*([0-9]{3}-[0-9]{2}-[0-9]{4})\b',
        r'\b([0-9]{3}-[0-9]{2}-[0-9]{4})\b',
    ]

    # Phone number patterns
    PHONE_PATTERNS = [
        r'\b(?:Phone|Tel|Telephone|Mobile|Cell)[:\s]*([0-9]{3}[-.\s]?[0-9]{3}[-.\s]?[0-9]{4})\b',
        r'\b([0-9]{3}[-.\s]?[0-9]{3}[-.\s]?[0-9]{4})\b',
        # No leading \b: "(" is not a word character, so a \b in front of it
        # only matches when a letter or digit is glued to the parenthesis.
        r'(\([0-9]{3}\)\s*[0-9]{3}[-.\s]?[0-9]{4})\b',
    ]

    # Email address patterns
    EMAIL_PATTERNS = [
        r'\b([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b',
        r'\b(?:Email|E-mail)[:\s]*([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b',
    ]

    # Address patterns
    ADDRESS_PATTERNS = [
        r'\b([0-9]{1,5}\s+[A-Za-z\s]+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr|Court|Ct|Place|Pl))\b',
        # Street, city, state and ZIP sit in one group so none of them is left behind.
        r'\b([0-9]{1,5}\s+[A-Za-z\s]+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr|Court|Ct|Place|Pl),\s*[A-Za-z\s]+,\s*[A-Z]{2}\s*[0-9]{5})\b',
        r'\b(?:Address|Addr)[:\s]*([0-9]+\s+[A-Za-z\s]+(?:Street|St|Avenue|Ave|Road|Rd))\b',
    ]

    # IP address patterns
    IP_PATTERNS = [
        r'\b(?:IP Address|IP)[:\s]*([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})\b',
        r'\b([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})\b',
    ]

    # URL patterns
    URL_PATTERNS = [
        r'\b(?:URL|Website|Web)[:\s]*(https?://[^\s]+)\b',
        r'\b(https?://[^\s]+)\b',
    ]

    # Device identifier patterns
    DEVICE_PATTERNS = [
        r'\b(?:Device ID|Device|Serial Number|Serial)[:\s]*([A-Z0-9]{6,20})\b',
        r'\b(?:IMEI|IMSI|MAC Address)[:\s]*([A-F0-9]{15,17})\b',
    ]
class MedicalPHIDeidentifier:
    """Pattern-based PHI de-identification for medical free text.

    Detection is driven by the regular expressions in ``PHIPatterns``.  Each
    detected span is replaced by a bracketed placeholder and every run is
    summarised in an audit log.  Regex matching is a heuristic: the residual
    check in ``_check_residual_phi`` only looks for a few obvious formats, so
    the output should still be reviewed before it is treated as de-identified.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Create a de-identifier.

        Args:
            config: Optional settings.  Any key that is missing falls back to
                the value from ``_default_config``.
        """
        # Overlay the caller's settings on the defaults so that a partial
        # config cannot trigger a KeyError when a setting is read later.
        self.config = {**self._default_config(), **(config or {})}
        self.patterns = PHIPatterns()
        # Maps (category tag, original text) -> placeholder, so repeated
        # values receive the same placeholder within this instance.
        self.anonymization_cache = {}

    def _default_config(self) -> Dict[str, Any]:
        """Return the default de-identification configuration.

        Only ``use_hashing``, ``hash_salt``, ``redaction_method`` and
        ``compliance_level`` are read by the methods of this class; the
        remaining keys are carried in the config but not consulted here.
        """
        return {
            "compliance_level": "HIPAA",
            "preserve_medical_context": True,
            "use_hashing": True,
            # Optional secret mixed into placeholder hashes; empty keeps the
            # historical, unsalted placeholders.
            "hash_salt": "",
            "redaction_method": "placeholder",
            "date_shift_days": 0,  # For research use
            "preserve_age_category": True,  # Keep age ranges but not exact ages
            "whitelist_terms": ["Dr.", "Mr.", "Ms.", "Mrs.", "MD", "DO"],  # Terms to preserve
        }

    def deidentify_text(self, text: str, document_type: str = "general") -> "DeidentificationResult":
        """De-identify text by replacing detected PHI with placeholders.

        Args:
            text: Text to de-identify.
            document_type: Type of medical document.  ``"ecg"``,
                ``"radiology"`` and ``"laboratory"`` enable extra categories
                and a document-specific clean-up pass.

        Returns:
            DeidentificationResult holding the de-identified text, the applied
            matches (ordered by descending start offset) and an audit log.
        """
        started_at = datetime.now().isoformat()

        # Hash of the untouched input for the audit trail.
        hash_original = hashlib.sha256(text.encode()).hexdigest()

        categories = self._get_categories_for_doc_type(document_type)
        detected = []
        for category in categories:
            detected.extend(self._detect_phi_category(text, category))

        # Several patterns regularly hit the same span (e.g. the labelled and
        # the bare phone pattern).  Splicing each hit independently would
        # corrupt the output, so duplicates and overlaps are collapsed first.
        resolved = self._resolve_overlaps(text, detected)

        # Rebuild the text in one left-to-right pass over the disjoint spans.
        pieces = []
        cursor = 0
        for match in resolved:
            pieces.append(text[cursor:match.start_position])
            pieces.append(match.replacement)
            cursor = match.end_position
        pieces.append(text[cursor:])
        deidentified_text = "".join(pieces)

        # Apply document-specific processing
        post_processors = {
            "ecg": self._process_ecg_specific,
            "radiology": self._process_radiology_specific,
            "laboratory": self._process_laboratory_specific,
        }
        post_process = post_processors.get(document_type)
        if post_process is not None:
            deidentified_text = post_process(deidentified_text)

        # Final cleanup and validation
        deidentified_text = self._final_cleanup(deidentified_text)

        found_tags = {match.category.value for match in resolved}
        audit_log = {
            "processing_timestamp": started_at,
            "document_type": document_type,
            "original_length": len(text),
            # Listed in processing order, one entry per category that
            # contributed at least one applied replacement.
            "phi_categories_found": [
                category.value for category in categories if category.value in found_tags
            ],
            "replacements_made": len(resolved),
            "final_length": len(deidentified_text),
            "phi_matches_count": len(resolved),
            "compression_ratio": len(deidentified_text) / len(text) if text else 1.0,
        }

        return DeidentificationResult(
            original_text=text,
            deidentified_text=deidentified_text,
            # Descending start offset, the order callers have always received.
            phi_matches=resolved[::-1],
            anonymization_method=self.config["redaction_method"],
            hash_original=hash_original,
            timestamp=datetime.now(),
            compliance_level=self.config["compliance_level"],
            audit_log=audit_log,
        )

    def _get_categories_for_doc_type(self, document_type: str) -> List["PHICategory"]:
        """Return the PHI categories to scan for the given document type."""
        categories = [
            PHICategory.PATIENT_NAME,
            PHICategory.MEDICAL_RECORD_NUMBER,
            PHICategory.DATE_OF_BIRTH,
            # SSN patterns are defined and mapped in _detect_phi_category, so
            # the category has to be scanned or SSNs pass through untouched.
            PHICategory.SOCIAL_SECURITY_NUMBER,
            PHICategory.PHONE_NUMBER,
            PHICategory.EMAIL_ADDRESS,
            PHICategory.ADDRESS,
            PHICategory.IP_ADDRESS,
            PHICategory.WEB_URL,
        ]
        # NOTE: _detect_phi_category has no patterns registered for
        # ACCOUNT_NUMBER, so that category currently produces no matches.
        if document_type == "ecg":
            categories.append(PHICategory.DEVICE_IDENTIFIER)
        elif document_type == "radiology":
            categories.extend([PHICategory.DEVICE_IDENTIFIER, PHICategory.ACCOUNT_NUMBER])
        elif document_type == "laboratory":
            categories.append(PHICategory.ACCOUNT_NUMBER)
        return categories

    def _detect_phi_category(self, text: str, category: "PHICategory") -> List["PHIMatch"]:
        """Find every occurrence of one PHI category in ``text``.

        The redacted span runs from the start of the first capture group that
        took part in the match to the end of the last one, so patterns that
        split a value over several groups (first/last name, month/day/year)
        are redacted completely.  Patterns without groups are redacted as a
        whole.  Matches from different patterns may overlap; the caller is
        expected to resolve that.
        """
        pattern_map = {
            PHICategory.PATIENT_NAME: self.patterns.NAME_PATTERNS,
            PHICategory.MEDICAL_RECORD_NUMBER: self.patterns.MRN_PATTERNS,
            PHICategory.DATE_OF_BIRTH: self.patterns.DOB_PATTERNS,
            PHICategory.SOCIAL_SECURITY_NUMBER: self.patterns.SSN_PATTERNS,
            PHICategory.PHONE_NUMBER: self.patterns.PHONE_PATTERNS,
            PHICategory.EMAIL_ADDRESS: self.patterns.EMAIL_PATTERNS,
            PHICategory.ADDRESS: self.patterns.ADDRESS_PATTERNS,
            PHICategory.IP_ADDRESS: self.patterns.IP_PATTERNS,
            PHICategory.WEB_URL: self.patterns.URL_PATTERNS,
            PHICategory.DEVICE_IDENTIFIER: self.patterns.DEVICE_PATTERNS,
        }

        # The name patterns tell names from ordinary words by capitalisation
        # ([A-Z][a-z]+).  Under IGNORECASE they would match any run of words,
        # so names are matched case-sensitively; every other category keeps
        # case-insensitive matching for its labels.
        flags = 0 if category is PHICategory.PATIENT_NAME else re.IGNORECASE

        matches = []
        for pattern in pattern_map.get(category, []):
            for found in re.finditer(pattern, text, flags):
                # Groups that did not participate report a start of -1.
                group_spans = [
                    found.span(index)
                    for index in range(1, found.re.groups + 1)
                    if found.start(index) != -1
                ]
                if group_spans:
                    start_pos = min(start for start, _ in group_spans)
                    end_pos = max(end for _, end in group_spans)
                else:
                    start_pos, end_pos = found.span()
                if start_pos == end_pos:
                    continue  # nothing to redact
                matches.append(self._build_match(text, category, start_pos, end_pos))
        return matches

    def _build_match(self, text: str, category: "PHICategory", start: int, end: int,
                     confidence: float = 0.8) -> "PHIMatch":
        """Create the PHIMatch for ``text[start:end]``.

        ``original_text`` is exactly the span that gets replaced, and
        ``context`` keeps up to 50 characters on either side of it.  The
        default confidence of 0.8 is the fixed value used for pattern hits.
        """
        span_text = text[start:end]
        return PHIMatch(
            category=category,
            original_text=span_text,
            replacement=self._generate_replacement(category, span_text),
            start_position=start,
            end_position=end,
            confidence=confidence,
            context=text[max(0, start - 50):min(len(text), end + 50)],
        )

    def _resolve_overlaps(self, text: str, matches: List["PHIMatch"]) -> List["PHIMatch"]:
        """Reduce ``matches`` to disjoint spans, ordered by start offset.

        A match lying inside an already accepted span is dropped.  A match
        that only partly overlaps extends the accepted span, so no detected
        character is left unredacted.  For identical spans the match found
        first wins, because the sort is stable.
        """
        # Earliest start first; for equal starts the longest span first.
        ordered = sorted(matches, key=lambda m: (m.start_position, -m.end_position))
        resolved = []
        for candidate in ordered:
            if resolved and candidate.start_position < resolved[-1].end_position:
                accepted = resolved[-1]
                if candidate.end_position > accepted.end_position:
                    resolved[-1] = self._build_match(
                        text,
                        accepted.category,
                        accepted.start_position,
                        candidate.end_position,
                        max(accepted.confidence, candidate.confidence),
                    )
                continue
            resolved.append(candidate)
        return resolved

    def _generate_replacement(self, category: "PHICategory", original: str) -> str:
        """Return the placeholder that replaces ``original``.

        With ``use_hashing`` enabled the placeholder embeds the first eight
        hex digits of an MD5 digest, so equal values map to equal
        placeholders.  Otherwise a fixed per-category placeholder is used.
        """
        if self.config["use_hashing"]:
            # Keyed by category as well as text: the same string found under
            # two categories must not reuse the first category's label.
            cache_key = (category.value, original)
            token = self.anonymization_cache.get(cache_key)
            if token is None:
                # NOTE(security): an unsalted, truncated digest of low-entropy
                # PHI (phone numbers, SSNs) can be guessed by enumeration.
                # Set config["hash_salt"] to a secret value to prevent that;
                # the empty default reproduces the historical placeholders.
                salt = self.config.get("hash_salt", "")
                digest = hashlib.md5(f"{salt}{original}".encode()).hexdigest()
                token = f"[{category.value.upper()}_{digest[:8]}]"
                self.anonymization_cache[cache_key] = token
            return token

        # Use generic placeholders
        placeholder_map = {
            PHICategory.PATIENT_NAME: "[PATIENT_NAME]",
            PHICategory.MEDICAL_RECORD_NUMBER: "[MRN]",
            PHICategory.DATE_OF_BIRTH: "[DOB]",
            PHICategory.SOCIAL_SECURITY_NUMBER: "[SSN]",
            PHICategory.PHONE_NUMBER: "[PHONE]",
            PHICategory.EMAIL_ADDRESS: "[EMAIL]",
            PHICategory.ADDRESS: "[ADDRESS]",
            PHICategory.IP_ADDRESS: "[IP_ADDRESS]",
            PHICategory.WEB_URL: "[URL]",
            PHICategory.DEVICE_IDENTIFIER: "[DEVICE_ID]",
        }
        return placeholder_map.get(category, f"[{category.value.upper()}]")

    def _process_ecg_specific(self, text: str) -> str:
        """Replace device and serial/model labels plus their values in ECG text.

        The label itself is part of the replaced text.  Matching is
        case-sensitive and the value must consist of upper-case letters or
        digits.
        """
        text = re.sub(r'(?:Device|Equipment)[:\s]*([A-Z0-9]+)', '[DEVICE_ID]', text)
        text = re.sub(r'(?:Serial|Model)[:\s]*([A-Z0-9]+)', '[DEVICE_SERIAL]', text)
        return text

    def _process_radiology_specific(self, text: str) -> str:
        """Replace facility and imaging-equipment mentions in radiology text.

        NOTE(review): the value part of both expressions is greedy over
        letters and whitespace, so it runs on until the next punctuation mark
        (or digit, for facilities) and can swallow report text.
        """
        text = re.sub(r'(?:Facility|Hospital|Clinic)[:\s]*([A-Za-z\s]+)', '[FACILITY]', text)
        text = re.sub(r'(?:Machine|Scanner|Equipment)[:\s]*([A-Za-z0-9\s]+)', '[IMAGING_DEVICE]', text)
        return text

    def _process_laboratory_specific(self, text: str) -> str:
        """Replace lab-facility mentions and accession/test ids in lab text.

        NOTE(review): the facility expression is greedy over letters and
        whitespace and can swallow the words that follow the label.
        """
        text = re.sub(r'(?:Lab|Laboratory)[:\s]*([A-Za-z\s]+)', '[LAB_FACILITY]', text)
        text = re.sub(r'(?:Accession|Test)[:\s]*([A-Z0-9]+)', '[TEST_ID]', text)
        return text

    def _final_cleanup(self, text: str) -> str:
        """Normalise whitespace and warn if obvious PHI formats remain.

        Every whitespace run, including newlines, collapses to one space.
        Residual findings are only logged; the text is returned either way.
        """
        text = re.sub(r'\s+', ' ', text).strip()

        remaining_phi = self._check_residual_phi(text)
        if remaining_phi:
            logger.warning("Potential PHI detected after de-identification: %s", remaining_phi)
        return text

    def _check_residual_phi(self, text: str) -> List[str]:
        """Return labels for obvious PHI formats still present in ``text``.

        Possible labels are ``"phone_number"``, ``"email_address"`` and
        ``"ssn_pattern"``; an empty list means none of them was found.
        """
        checks = [
            ("phone_number", r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b'),
            # The top-level-domain class is [A-Za-z]; a "|" inside a character
            # class would be matched literally.
            ("email_address", r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),
            ("ssn_pattern", r'\b\d{3}-\d{2}-\d{4}\b'),
        ]
        return [label for label, pattern in checks if re.search(pattern, text)]

    def batch_deidentify(self, texts: List[Tuple[str, str]]) -> List["DeidentificationResult"]:
        """De-identify several ``(text, document_type)`` pairs, in order."""
        return [self.deidentify_text(text, doc_type) for text, doc_type in texts]

    def generate_audit_report(self, results: List["DeidentificationResult"]) -> Dict[str, Any]:
        """Summarise a batch of results for compliance review.

        ``compliance_score`` keeps its original formula, which depends only on
        the average number of matches per document and is never below 0.9 for
        a non-empty batch.  Because that score alone can never flag a problem,
        ``compliance_level`` is additionally downgraded to ``"NEEDS_REVIEW"``
        whenever the residual check still finds a PHI format in any
        de-identified text.

        Args:
            results: Results to summarise.

        Returns:
            Report dictionary.  ``documents_with_residual_phi`` counts the
            results whose de-identified text failed the residual check.
        """
        total_phi_matches = sum(len(result.phi_matches) for result in results)

        categories_found = {}
        for result in results:
            for match in result.phi_matches:
                tag = match.category.value
                categories_found[tag] = categories_found.get(tag, 0) + 1

        compliance_score = 0.0
        if results:
            avg_phi_per_doc = total_phi_matches / len(results)
            compliance_score = min(1.0, 0.9 + (0.1 * (1.0 - min(avg_phi_per_doc / 10, 1.0))))

        residual_documents = sum(
            1 for result in results if self._check_residual_phi(result.deidentified_text)
        )
        is_compliant = compliance_score > 0.8 and residual_documents == 0

        recommendations = self._generate_recommendations(categories_found, compliance_score)
        if residual_documents:
            recommendations.append(
                "Residual PHI patterns detected after de-identification - manual review required"
            )

        return {
            "audit_timestamp": datetime.now().isoformat(),
            "total_documents": len(results),
            "total_phi_matches": total_phi_matches,
            "phi_categories_found": categories_found,
            "compliance_score": compliance_score,
            "compliance_level": "HIPAA_COMPLIANT" if is_compliant else "NEEDS_REVIEW",
            "recommendations": recommendations,
            "documents_with_residual_phi": residual_documents,
        }

    def _generate_recommendations(self, categories_found: Dict[str, int], compliance_score: float) -> List[str]:
        """Build recommendation strings from category counts and the score.

        ``categories_found`` is keyed by ``PHICategory`` values, e.g.
        ``"device_id"`` for device identifiers.
        """
        recommendations = []
        if compliance_score < 0.8:
            recommendations.append("Increase PHI detection patterns for better coverage")
        if categories_found.get("patient_name", 0) > 5:
            recommendations.append("Consider enhanced name detection patterns")
        if categories_found.get("address", 0) > 0:
            recommendations.append("Address detection appears effective")
        # The enum value for device identifiers is "device_id"; looking up
        # "device_identifier" could never succeed.
        if categories_found.get("device_id", 0) > 0:
            recommendations.append("Device identifiers detected - ensure proper anonymization")
        return recommendations
# Names exported by ``from <module> import *``.  PHIPatterns is defined in this
# module as well but is not part of the exported names.
__all__ = [
    "MedicalPHIDeidentifier",
    "PHICategory",
    "PHIMatch",
    "DeidentificationResult",
]