File size: 18,485 Bytes
13d5ab4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
"""
PHI De-identification Pipeline - Phase 2
HIPAA-compliant protected health information removal and anonymization.

This module provides comprehensive PHI detection and removal for medical documents
before AI processing, ensuring HIPAA compliance and data privacy.

Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""

import re
import hashlib
import logging
from typing import Dict, List, Optional, Tuple, Any, Set
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import json

# Module-level logger named after this module; used to warn when text still
# looks like it contains PHI after de-identification.
logger = logging.getLogger(__name__)


class PHICategory(Enum):
    """Categories of protected health information.

    Each member's value is a short string tag. The tag is what gets written to
    audit logs and, upper-cased, what appears inside generated placeholders.

    Not every category listed here has detection patterns in ``PHIPatterns``;
    a category without patterns is accepted by the detector but never matches.
    """
    # Direct personal identifiers
    PATIENT_NAME = "patient_name"
    MEDICAL_RECORD_NUMBER = "mrn"
    DATE_OF_BIRTH = "dob"
    SOCIAL_SECURITY_NUMBER = "ssn"
    # Contact details
    PHONE_NUMBER = "phone"
    EMAIL_ADDRESS = "email"
    ADDRESS = "address"
    # Dates and ages
    DATE = "date"
    AGE_OVER_89 = "age_89_plus"
    # Biometric, device, account and other identifiers
    BIO_METRIC_IDENTIFIER = "biometric"
    PHOTO = "photo"
    DEVICE_IDENTIFIER = "device_id"
    ACCOUNT_NUMBER = "account"
    CERTIFICATE_NUMBER = "certificate"
    VEHICLE_IDENTIFIER = "vehicle"
    WEB_URL = "web_url"
    IP_ADDRESS = "ip_address"
    FINGERPRINT = "fingerprint"
    FULL_FACE_PHOTO = "full_face_photo"


@dataclass
class PHIMatch:
    """One detected PHI occurrence together with the text that replaces it."""
    # Category the matching pattern belongs to.
    category: PHICategory
    # Matched text as found in the source document.
    original_text: str
    # Placeholder string substituted for the PHI.
    replacement: str
    # Offsets into the source text: the slice [start_position:end_position]
    # is what gets replaced.
    start_position: int
    end_position: int
    # Detection confidence; the regex detector assigns a fixed 0.8.
    confidence: float
    # Snippet of source text around the match (up to 50 characters each side).
    context: str


@dataclass
class DeidentificationResult:
    """Outcome of de-identifying a single document."""
    # Input text exactly as supplied by the caller (still contains PHI).
    original_text: str
    # Text after replacements, document-specific processing and cleanup.
    deidentified_text: str
    # Matches reported by the detector for this document.
    phi_matches: List[PHIMatch]
    # Copied from the "redaction_method" configuration setting.
    anonymization_method: str
    # SHA-256 hex digest of the original text, kept for the audit trail.
    hash_original: str
    # Time the result was created (naive datetime from datetime.now()).
    timestamp: datetime
    compliance_level: str  # HIPAA, GDPR, NONE
    # Per-document processing statistics (timestamps, lengths, categories
    # found, replacement counts).
    audit_log: Dict[str, Any]


class PHIPatterns:
    """Regular-expression patterns for PHI detection, grouped by category.

    Every attribute is a list of raw regex strings. Each pattern exposes the
    sensitive value through one or more capture groups; any leading label
    (``MRN:``, ``Phone:``, ...) sits outside the groups so that a caller can
    redact the value while keeping the label.
    """

    # Patient name patterns (various formats).
    # NOTE: these rely on capitalisation ([A-Z] followed by [a-z]) to tell
    # names from ordinary words; matching them with re.IGNORECASE defeats that.
    NAME_PATTERNS = [
        r'\b([A-Z][a-z]+)\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)\b',  # First Last [Middle]
        r'\b([A-Z])\.?\s+([A-Z][a-z]+)\b',  # F. Last
        r'\b([A-Z][a-z]+),\s+([A-Z][a-z]+)\b',  # Last, First
        r'Patient Name:\s*([A-Z][a-z]+\s+[A-Z][a-z]+)',
        r'Name:\s*([A-Z][a-z]+\s+[A-Z][a-z]+)',
    ]

    # Medical Record Number patterns
    MRN_PATTERNS = [
        r'\b(?:MRN|Medical Record Number|Patient ID|ID Number|Record #?)[:\s]*([A-Z0-9]{6,12})\b',
        r'\b(?:MRN|ID)[:\s]*([0-9]{6,10})\b',
        r'\bPatient\s*(?:ID|Number)[:\s]*([A-Z0-9]{6,12})\b',
    ]

    # Date of Birth patterns
    DOB_PATTERNS = [
        r'\b(?:DOB|Date of Birth|Birth Date|Born)[:\s]*([0-9]{1,2}[/-][0-9]{1,2}[/-][0-9]{4})\b',
        r'\b([0-9]{1,2}[/-][0-9]{1,2}[/-][0-9]{4})\s*(?:DOB|birth|Born)\b',
        # Month name, day and year are three separate groups.
        r'\b(?:DOB|Date of Birth)[:\s]*(January|February|March|April|May|June|July|August|September|October|November|December)\s+([0-9]{1,2}),?\s+([0-9]{4})\b',
    ]

    # Social Security Number patterns
    SSN_PATTERNS = [
        r'\b(?:SSN|Social Security Number)[:\s]*([0-9]{3}-[0-9]{2}-[0-9]{4})\b',
        r'\b([0-9]{3}-[0-9]{2}-[0-9]{4})\b',
    ]

    # Phone number patterns
    PHONE_PATTERNS = [
        r'\b(?:Phone|Tel|Telephone|Mobile|Cell)[:\s]*([0-9]{3}[-.\s]?[0-9]{3}[-.\s]?[0-9]{4})\b',
        r'\b([0-9]{3}[-.\s]?[0-9]{3}[-.\s]?[0-9]{4})\b',
        # "(555) 123-4567": no leading \b here. "(" is not a word character,
        # so \b would only succeed when a letter or digit directly precedes
        # the parenthesis, which is never the case for a normally written
        # number.
        r'(\([0-9]{3}\)\s*[0-9]{3}[-.\s]?[0-9]{4})\b',
    ]

    # Email address patterns
    EMAIL_PATTERNS = [
        r'\b([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b',
        r'\b(?:Email|E-mail)[:\s]*([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b',
    ]

    # Address patterns
    ADDRESS_PATTERNS = [
        r'\b([0-9]{1,5}\s+[A-Za-z\s]+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr|Court|Ct|Place|Pl))\b',
        # Street, city, state and ZIP are four separate groups.
        r'\b([0-9]{1,5}\s+[A-Za-z\s]+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr|Court|Ct|Place|Pl)),\s*([A-Za-z\s]+),\s*([A-Z]{2})\s*([0-9]{5})\b',
        r'\b(?:Address|Addr)[:\s]*([0-9]+\s+[A-Za-z\s]+(?:Street|St|Avenue|Ave|Road|Rd))\b',
    ]

    # IP address patterns (dotted quads; octet ranges are not validated)
    IP_PATTERNS = [
        r'\b(?:IP Address|IP)[:\s]*([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})\b',
        r'\b([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})\b',
    ]

    # URL patterns
    URL_PATTERNS = [
        r'\b(?:URL|Website|Web)[:\s]*(https?://[^\s]+)\b',
        r'\b(https?://[^\s]+)\b',
    ]

    # Device identifier patterns
    DEVICE_PATTERNS = [
        r'\b(?:Device ID|Device|Serial Number|Serial)[:\s]*([A-Z0-9]{6,20})\b',
        r'\b(?:IMEI|IMSI|MAC Address)[:\s]*([A-F0-9]{15,17})\b',
    ]


class MedicalPHIDeidentifier:
    """Regex-based PHI de-identification for medical text.

    Text is scanned with the patterns in ``PHIPatterns``. Every detected value
    is replaced by a bracketed placeholder (optionally suffixed with a short
    hash so that equal values map to equal placeholders), then
    document-type-specific substitutions and whitespace cleanup are applied.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Create a de-identifier.

        Args:
            config: Optional settings overriding the defaults returned by
                ``_default_config``. Keys that are omitted keep their default.
        """
        # Overlay caller settings on the defaults so that a partial config
        # cannot cause a KeyError when a setting is read later.
        self.config = {**self._default_config(), **(config or {})}
        self.patterns = PHIPatterns()
        # (category value, original text) -> placeholder, so repeated values
        # receive the same placeholder across calls on this instance.
        self.anonymization_cache = {}

    def _default_config(self) -> Dict[str, Any]:
        """Return the default de-identification configuration."""
        return {
            "compliance_level": "HIPAA",
            "preserve_medical_context": True,
            "use_hashing": True,
            "redaction_method": "placeholder",
            "date_shift_days": 0,  # For research use
            "preserve_age_category": True,  # Keep age ranges but not exact ages
            "whitelist_terms": ["Dr.", "Mr.", "Ms.", "Mrs.", "MD", "DO"],  # Terms to preserve
        }

    def deidentify_text(self, text: str, document_type: str = "general") -> "DeidentificationResult":
        """De-identify text by replacing detected PHI with placeholders.

        Args:
            text: Text to de-identify.
            document_type: Type of medical document. ``"ecg"``,
                ``"radiology"`` and ``"laboratory"`` enable extra categories
                and extra substitutions; any other value gets the base set.

        Returns:
            DeidentificationResult holding the de-identified text, the
            replacements that were applied (ordered from the end of the text
            to the start) and an audit log.
        """
        original_text = text
        audit_log = {
            "processing_timestamp": datetime.now().isoformat(),
            "document_type": document_type,
            "original_length": len(text),
            "phi_categories_found": [],
            "replacements_made": 0
        }

        # Calculate hash of original for audit trail
        hash_original = hashlib.sha256(text.encode()).hexdigest()

        categories_to_process = self._get_categories_for_doc_type(document_type)

        detected = []
        for category in categories_to_process:
            detected.extend(self._detect_phi_category(text, category))

        # Several patterns routinely hit the same span (labelled and bare
        # variants, or a name inside an address). Applying each hit separately
        # would splice placeholders into one another, so collapse them first.
        phi_matches = self._resolve_overlaps(text, detected)

        categories_hit = {match.category for match in phi_matches}
        audit_log["phi_categories_found"] = [
            category.value
            for category in categories_to_process
            if category in categories_hit
        ]
        audit_log["replacements_made"] = len(phi_matches)

        # Replace from the end of the text backwards so earlier offsets stay
        # valid while the string changes length.
        phi_matches.sort(key=lambda m: m.start_position, reverse=True)
        deidentified_text = text
        for match in phi_matches:
            deidentified_text = (
                deidentified_text[:match.start_position]
                + match.replacement
                + deidentified_text[match.end_position:]
            )

        # Apply document-specific processing
        if document_type == "ecg":
            deidentified_text = self._process_ecg_specific(deidentified_text)
        elif document_type == "radiology":
            deidentified_text = self._process_radiology_specific(deidentified_text)
        elif document_type == "laboratory":
            deidentified_text = self._process_laboratory_specific(deidentified_text)

        # Final cleanup and validation
        deidentified_text = self._final_cleanup(deidentified_text)

        audit_log.update({
            "final_length": len(deidentified_text),
            "phi_matches_count": len(phi_matches),
            "compression_ratio": len(deidentified_text) / len(text) if text else 1.0
        })

        return DeidentificationResult(
            original_text=original_text,
            deidentified_text=deidentified_text,
            phi_matches=phi_matches,
            anonymization_method=self.config["redaction_method"],
            hash_original=hash_original,
            timestamp=datetime.now(),
            compliance_level=self.config["compliance_level"],
            audit_log=audit_log
        )

    def _get_categories_for_doc_type(self, document_type: str) -> "List[PHICategory]":
        """Return the PHI categories to scan for the given document type.

        The list order doubles as a priority: when two categories match the
        very same span, the one listed first is kept.
        """
        base_categories = [
            PHICategory.PATIENT_NAME,
            PHICategory.MEDICAL_RECORD_NUMBER,
            PHICategory.DATE_OF_BIRTH,
            # SSN patterns exist and no other category's pattern covers the
            # 3-2-4 digit layout, so it has to be scanned explicitly.
            PHICategory.SOCIAL_SECURITY_NUMBER,
            PHICategory.PHONE_NUMBER,
            PHICategory.EMAIL_ADDRESS,
            PHICategory.ADDRESS,
            PHICategory.IP_ADDRESS,
            PHICategory.WEB_URL
        ]

        if document_type == "ecg":
            base_categories.extend([PHICategory.DEVICE_IDENTIFIER])
        elif document_type == "radiology":
            base_categories.extend([PHICategory.DEVICE_IDENTIFIER, PHICategory.ACCOUNT_NUMBER])
        elif document_type == "laboratory":
            base_categories.extend([PHICategory.ACCOUNT_NUMBER])

        return base_categories

    def _detect_phi_category(self, text: str, category: "PHICategory") -> "List[PHIMatch]":
        """Find every occurrence of one PHI category in ``text``.

        Args:
            text: Text to scan.
            category: Category whose patterns are applied. Categories without
                registered patterns yield an empty list.

        Returns:
            One PHIMatch per regex hit, possibly overlapping one another.
        """
        pattern_map = {
            PHICategory.PATIENT_NAME: self.patterns.NAME_PATTERNS,
            PHICategory.MEDICAL_RECORD_NUMBER: self.patterns.MRN_PATTERNS,
            PHICategory.DATE_OF_BIRTH: self.patterns.DOB_PATTERNS,
            PHICategory.SOCIAL_SECURITY_NUMBER: self.patterns.SSN_PATTERNS,
            PHICategory.PHONE_NUMBER: self.patterns.PHONE_PATTERNS,
            PHICategory.EMAIL_ADDRESS: self.patterns.EMAIL_PATTERNS,
            PHICategory.ADDRESS: self.patterns.ADDRESS_PATTERNS,
            PHICategory.IP_ADDRESS: self.patterns.IP_PATTERNS,
            PHICategory.WEB_URL: self.patterns.URL_PATTERNS,
            PHICategory.DEVICE_IDENTIFIER: self.patterns.DEVICE_PATTERNS,
        }

        # Name patterns tell names from ordinary words purely by
        # capitalisation; with IGNORECASE any two adjacent words would match.
        flags = 0 if category is PHICategory.PATIENT_NAME else re.IGNORECASE

        matches = []
        for pattern in pattern_map.get(category, []):
            for match in re.finditer(pattern, text, flags):
                # Redact from the first to the last participating capture
                # group. Multi-group patterns (first/last name, month/day/year,
                # street/city/state/zip) are covered in full, while a leading
                # label outside the groups is left in place.
                group_spans = [
                    match.span(index)
                    for index in range(1, match.re.groups + 1)
                    if match.start(index) != -1
                ]
                if group_spans:
                    start_pos = min(start for start, _ in group_spans)
                    end_pos = max(end for _, end in group_spans)
                else:
                    start_pos, end_pos = match.span()

                if start_pos == end_pos:
                    continue  # nothing to redact

                matches.append(self._build_match(text, category, start_pos, end_pos))

        return matches

    def _build_match(self, text: str, category: "PHICategory", start_pos: int, end_pos: int) -> "PHIMatch":
        """Create the PHIMatch for ``text[start_pos:end_pos]``."""
        sensitive_text = text[start_pos:end_pos]
        context_start = max(0, start_pos - 50)
        context_end = min(len(text), end_pos + 50)
        return PHIMatch(
            category=category,
            original_text=sensitive_text,
            replacement=self._generate_replacement(category, sensitive_text),
            start_position=start_pos,
            end_position=end_pos,
            confidence=0.8,  # Pattern-based confidence
            context=text[context_start:context_end]
        )

    def _resolve_overlaps(self, text: str, matches: "List[PHIMatch]") -> "List[PHIMatch]":
        """Collapse overlapping matches into disjoint ones.

        Matches are visited by start offset (longest first on ties, detection
        order after that). A match that lies inside the previously kept one is
        dropped. One that extends past it widens the kept match instead, so
        every character flagged by any pattern is still redacted.

        Returns:
            Non-overlapping matches ordered by start offset.
        """
        ordered = sorted(matches, key=lambda m: (m.start_position, -m.end_position))
        resolved = []
        for candidate in ordered:
            if resolved and candidate.start_position < resolved[-1].end_position:
                kept = resolved[-1]
                if candidate.end_position > kept.end_position:
                    resolved[-1] = self._build_match(
                        text, kept.category, kept.start_position, candidate.end_position
                    )
                continue
            resolved.append(candidate)
        return resolved

    def _generate_replacement(self, category: "PHICategory", original: str) -> str:
        """Return the placeholder that replaces ``original``.

        With ``use_hashing`` enabled the placeholder carries the first eight
        hex digits of the value's MD5 digest, so equal values get equal
        placeholders. Otherwise a fixed per-category placeholder is returned.
        """
        if self.config["use_hashing"]:
            # Key on the category as well as the text: the same string seen
            # under two categories must not reuse the first category's label.
            cache_key = (category.value, original)
            placeholder = self.anonymization_cache.get(cache_key)
            if placeholder is None:
                # NOTE(review): an unsalted, truncated digest of a low-entropy
                # value (phone number, SSN) can be recovered by enumeration.
                # Consider a keyed hash if placeholders leave the trust
                # boundary.
                digest = hashlib.md5(original.encode()).hexdigest()[:8]
                placeholder = f"[{category.value.upper()}_{digest}]"
                self.anonymization_cache[cache_key] = placeholder
            return placeholder

        # Use generic placeholders
        placeholder_map = {
            PHICategory.PATIENT_NAME: "[PATIENT_NAME]",
            PHICategory.MEDICAL_RECORD_NUMBER: "[MRN]",
            PHICategory.DATE_OF_BIRTH: "[DOB]",
            PHICategory.SOCIAL_SECURITY_NUMBER: "[SSN]",
            PHICategory.PHONE_NUMBER: "[PHONE]",
            PHICategory.EMAIL_ADDRESS: "[EMAIL]",
            PHICategory.ADDRESS: "[ADDRESS]",
            PHICategory.IP_ADDRESS: "[IP_ADDRESS]",
            PHICategory.WEB_URL: "[URL]",
            PHICategory.DEVICE_IDENTIFIER: "[DEVICE_ID]"
        }
        return placeholder_map.get(category, f"[{category.value.upper()}]")

    # NOTE(review): in the three document-specific helpers below the separator
    # after the label is optional and the label itself is part of what gets
    # replaced, so ordinary words that begin with a label can be rewritten
    # too. Confirm this is intended before tightening the patterns.

    def _process_ecg_specific(self, text: str) -> str:
        """Replace device/equipment and serial/model mentions in ECG text."""
        text = re.sub(r'(?:Device|Equipment)[:\s]*([A-Z0-9]+)', '[DEVICE_ID]', text)
        text = re.sub(r'(?:Serial|Model)[:\s]*([A-Z0-9]+)', '[DEVICE_SERIAL]', text)

        return text

    def _process_radiology_specific(self, text: str) -> str:
        """Replace facility and imaging-equipment mentions in radiology text."""
        text = re.sub(r'(?:Facility|Hospital|Clinic)[:\s]*([A-Za-z\s]+)', '[FACILITY]', text)
        text = re.sub(r'(?:Machine|Scanner|Equipment)[:\s]*([A-Za-z0-9\s]+)', '[IMAGING_DEVICE]', text)

        return text

    def _process_laboratory_specific(self, text: str) -> str:
        """Replace lab-facility and accession/test mentions in lab text."""
        text = re.sub(r'(?:Lab|Laboratory)[:\s]*([A-Za-z\s]+)', '[LAB_FACILITY]', text)
        text = re.sub(r'(?:Accession|Test)[:\s]*([A-Z0-9]+)', '[TEST_ID]', text)

        return text

    def _final_cleanup(self, text: str) -> str:
        """Normalize whitespace and warn if the text still looks like PHI.

        Every run of whitespace (including newlines) becomes a single space
        and the result is stripped. Residual PHI is only logged, not removed.
        """
        text = re.sub(r'\s+', ' ', text)  # Normalize whitespace
        text = text.strip()

        # Check for any remaining obvious PHI patterns
        remaining_phi = self._check_residual_phi(text)
        if remaining_phi:
            logger.warning("Potential PHI detected after de-identification: %s", remaining_phi)

        return text

    def _check_residual_phi(self, text: str) -> List[str]:
        """Return labels of PHI-like patterns still present in ``text``.

        Possible labels, in this order: ``"phone_number"``,
        ``"email_address"``, ``"ssn_pattern"``.
        """
        potential_phi = []

        # Check for phone numbers
        if re.search(r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b', text):
            potential_phi.append("phone_number")

        # Check for email addresses. The TLD class is letters only; a "|"
        # inside a character class is a literal bar, not an alternation.
        if re.search(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', text):
            potential_phi.append("email_address")

        # Check for SSN-like patterns
        if re.search(r'\b\d{3}-\d{2}-\d{4}\b', text):
            potential_phi.append("ssn_pattern")

        return potential_phi

    def batch_deidentify(self, texts: List[Tuple[str, str]]) -> "List[DeidentificationResult]":
        """De-identify several documents.

        Args:
            texts: ``(text, document_type)`` pairs.

        Returns:
            One result per input pair, in input order.
        """
        return [self.deidentify_text(text, doc_type) for text, doc_type in texts]

    def generate_audit_report(self, results: "List[DeidentificationResult]") -> Dict[str, Any]:
        """Summarize a batch of results for compliance auditing.

        Returns:
            Dict with the document count, total and per-category match counts,
            a heuristic compliance score with the derived level, and a list of
            recommendations.
        """
        total_phi_matches = sum(len(r.phi_matches) for r in results)
        categories_found = {}
        compliance_score = 0.0

        for result in results:
            for match in result.phi_matches:
                cat = match.category.value
                categories_found[cat] = categories_found.get(cat, 0) + 1

        # NOTE(review): for any non-empty batch this formula yields a value
        # in [0.9, 1.0], so the level below is always "HIPAA_COMPLIANT". An
        # empty batch scores 0.0. The score reflects how much PHI was found,
        # not whether removal succeeded.
        if results:
            avg_phi_per_doc = total_phi_matches / len(results)
            compliance_score = min(1.0, 0.9 + (0.1 * (1.0 - min(avg_phi_per_doc / 10, 1.0))))

        return {
            "audit_timestamp": datetime.now().isoformat(),
            "total_documents": len(results),
            "total_phi_matches": total_phi_matches,
            "phi_categories_found": categories_found,
            "compliance_score": compliance_score,
            "compliance_level": "HIPAA_COMPLIANT" if compliance_score > 0.8 else "NEEDS_REVIEW",
            "recommendations": self._generate_recommendations(categories_found, compliance_score)
        }

    def _generate_recommendations(self, categories_found: Dict[str, int], compliance_score: float) -> List[str]:
        """Build recommendation strings from match counts and the score.

        Args:
            categories_found: Match count per category value (for example
                ``"patient_name"`` or ``"device_id"``).
            compliance_score: Score computed by ``generate_audit_report``.
        """
        recommendations = []

        if compliance_score < 0.8:
            recommendations.append("Increase PHI detection patterns for better coverage")

        if categories_found.get("patient_name", 0) > 5:
            recommendations.append("Consider enhanced name detection patterns")

        if categories_found.get("address", 0) > 0:
            recommendations.append("Address detection appears effective")

        # The key must be the enum value of the device category ("device_id").
        if categories_found.get("device_id", 0) > 0:
            recommendations.append("Device identifiers detected - ensure proper anonymization")

        return recommendations


# Public API of this module: the names bound by `from <module> import *`.
# PHIPatterns is not listed, so a star-import does not expose it.
__all__ = [
    "MedicalPHIDeidentifier",
    "PHICategory",
    "PHIMatch",
    "DeidentificationResult"
]