File size: 15,337 Bytes
a6b8ecc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
"""
PHI Annotation System for PDF documents.
This module provides tools to detect, annotate, and track PHI in medical PDFs.
"""

import json
import re
from pathlib import Path
from typing import Dict, List, Tuple, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
import numpy as np
from PIL import Image, ImageDraw
import pdf2image
import cv2


@dataclass
class PHIAnnotation:
    """Represents a single PHI annotation in a document.

    ``bbox`` is normalised to a tuple on construction. JSON has no tuple
    type, so a bounding box that was saved and loaded again arrives as a
    list; coercing it here keeps a reloaded annotation equal to the one that
    was originally saved and keeps the field consistent with its annotation.
    """
    category: str  # PHI category (name, date, ssn, etc.)
    value: str  # The actual PHI value
    page: int  # Page number (1-indexed)
    bbox: Optional[Tuple[int, int, int, int]] = None  # Bounding box (x1, y1, x2, y2)
    confidence: float = 1.0  # Confidence score
    masked_value: Optional[str] = None  # Value after masking

    def __post_init__(self) -> None:
        """Coerce a list-like ``bbox`` into a tuple; ``None`` is left alone."""
        if self.bbox is not None and not isinstance(self.bbox, tuple):
            self.bbox = tuple(self.bbox)


@dataclass
class DocumentAnnotations:
    """Contains all PHI annotations for a document.

    Groups the PHIAnnotation records of one document together with
    bookkeeping about the run that produced them.
    """
    document_path: str  # Path of the annotated document, stored as a string
    annotations: List[PHIAnnotation]  # Every PHI annotation found in the document
    total_pages: int  # Number of pages in the document
    timestamp: str  # Creation time; annotate_pdf stores datetime.now().isoformat()
    metadata: Dict[str, Any]  # Run details; annotate_pdf stores 'dpi' and 'confidence_threshold'


class PHIAnnotator:
    """Annotate PHI in medical documents."""

    # HIPAA PHI categories: category key -> human-readable description.
    PHI_CATEGORIES = {
        'name': 'Names (patients, physicians, family)',
        'date': 'Dates (except year alone)',
        'address': 'Geographic subdivisions smaller than state',
        'phone': 'Phone and fax numbers',
        'email': 'Email addresses',
        'ssn': 'Social Security Numbers',
        'mrn': 'Medical Record Numbers',
        'insurance_id': 'Health plan beneficiary numbers',
        'account': 'Account numbers',
        'license': 'Certificate/license numbers',
        'vehicle': 'Vehicle identifiers and license plates',
        'device_id': 'Device identifiers and serial numbers',
        'url': 'Web URLs',
        'ip': 'IP addresses',
        'biometric': 'Biometric identifiers',
        'unique_id': 'Any unique identifying number',
        'geo_small': 'Geographic subdivisions < state',
        'institution': 'Healthcare facility names',
    }

    # Regular expressions for PHI detection, keyed by category.
    # apply_pattern_matching applies them with re.IGNORECASE.
    #
    # NOTE(review): these patterns favour recall over precision. For example
    # the bare nine-digit SSN alternative and the label-prefixed patterns
    # (which also match words that merely start with the label) can produce
    # false positives; confirm this is acceptable for the intended use.
    PHI_PATTERNS = {
        'ssn': r'\b\d{3}-\d{2}-\d{4}\b|\b\d{9}\b',
        'phone': r'\b(?:\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b',
        # The TLD class is [A-Za-z]; a '|' inside a character class is a
        # literal pipe, not alternation, so it must not appear here.
        'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
        # [:\s]* (rather than a single optional separator) lets the common
        # "Label: value" form match, where a colon is followed by a space.
        'mrn': r'\b(?:MRN|Medical Record Number)[:\s]*[\w-]+\b',
        'date': r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b|\b\d{4}[/-]\d{1,2}[/-]\d{1,2}\b',
        'url': r'https?://[^\s]+',
        'ip': r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
        'insurance_id': r'\b(?:INS|Policy|Member ID)[:\s]*[\w-]+\b',
        'license': r'\b(?:License|DEA|NPI)[:\s]*[\w-]+\b',
    }

    def __init__(self, confidence_threshold: float = 0.85):
        """
        Initialize PHI Annotator.

        Args:
            confidence_threshold: Minimum confidence for PHI detection
        """
        self.confidence_threshold = confidence_threshold

    def annotate_pdf(self, pdf_path: Path, dpi: int = 150) -> DocumentAnnotations:
        """
        Annotate PHI in a PDF document.

        Each page is rendered to an image at ``dpi``, split into candidate
        text regions, and every region is analysed for PHI. Detections whose
        confidence is below ``self.confidence_threshold`` are discarded, so
        the threshold recorded in the result metadata is the one that was
        actually applied.

        Args:
            pdf_path: Path to PDF file
            dpi: DPI for PDF to image conversion

        Returns:
            DocumentAnnotations object; pages are numbered from 1 and the
            metadata records ``dpi`` and ``confidence_threshold``
        """
        # Convert PDF to images (one per page)
        images = pdf2image.convert_from_path(pdf_path, dpi=dpi)

        annotations = []
        for page_num, image in enumerate(images, 1):
            # Region detection works on a numpy array, not a PIL image
            text_regions = self._detect_text_regions(np.array(image))

            for region in text_regions:
                annotations.extend(
                    phi
                    for phi in self._analyze_region_for_phi(region, page_num)
                    if phi.confidence >= self.confidence_threshold
                )

        return DocumentAnnotations(
            document_path=str(pdf_path),
            annotations=annotations,
            total_pages=len(images),
            timestamp=datetime.now().isoformat(),
            metadata={
                'dpi': dpi,
                'confidence_threshold': self.confidence_threshold,
            }
        )

    def _detect_text_regions(self, image: np.ndarray) -> List[Dict]:
        """
        Locate candidate text regions in a page image.

        The image is converted from RGB to grayscale, binarised with an
        inverted Otsu threshold, and the bounding rectangle of every external
        contour is collected. Rectangles that are not wider than 20 and
        taller than 10 pixels are dropped.

        Args:
            image: Image array

        Returns:
            List of dicts with 'bbox' as (x1, y1, x2, y2) and 'area' as
            width * height
        """
        grayscale = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)

        # Inverted binary + Otsu: the threshold level is chosen automatically
        threshold_mode = cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
        _, inverted = cv2.threshold(grayscale, 0, 255, threshold_mode)

        outlines, _ = cv2.findContours(
            inverted, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )

        rectangles = (cv2.boundingRect(outline) for outline in outlines)
        return [
            {
                'bbox': (left, top, left + width, top + height),
                'area': width * height,
            }
            for left, top, width, height in rectangles
            if width > 20 and height > 10
        ]

    def _analyze_region_for_phi(self, region: Dict, page_num: int) -> List[PHIAnnotation]:
        """
        Turn the PHI detections for one text region into annotations.

        Args:
            region: Text region dictionary; its 'bbox' entry is used
            page_num: Page number stored on every annotation

        Returns:
            List of PHI annotations found, each carrying the region's bbox
        """
        # Placeholder: no OCR is run on the region yet. Detection is
        # simulated; a production version would analyse real OCR text.
        box = region['bbox']

        return [
            PHIAnnotation(
                category=hit['category'],
                value=hit['value'],
                page=page_num,
                bbox=box,
                confidence=hit['confidence'],
            )
            for hit in self._simulate_phi_detection(box)
        ]

    def _simulate_phi_detection(self, bbox: Tuple) -> List[Dict]:
        """
        Simulate PHI detection for testing.
        In production, this would be replaced with actual OCR and pattern matching.
        """
        import random

        # Randomly simulate finding PHI
        if random.random() < 0.3:  # 30% chance of finding PHI
            category = random.choice(list(self.PHI_CATEGORIES.keys()))
            return [{
                'category': category,
                'value': f"SIMULATED_{category.upper()}_VALUE",
                'confidence': random.uniform(0.85, 1.0),
            }]
        return []

    def apply_pattern_matching(self, text: str) -> List[Dict]:
        """
        Apply regex patterns to detect PHI in text.

        Args:
            text: Text to analyze

        Returns:
            List of detected PHI items
        """
        detections = []

        for category, pattern in self.PHI_PATTERNS.items():
            matches = re.finditer(pattern, text, re.IGNORECASE)
            for match in matches:
                detections.append({
                    'category': category,
                    'value': match.group(),
                    'start': match.start(),
                    'end': match.end(),
                    'confidence': 0.9,  # Pattern matching confidence
                })

        return detections

    def create_masked_image(
        self,
        image: Image.Image,
        annotations: List[PHIAnnotation],
        mask_type: str = 'black_box'
    ) -> Image.Image:
        """
        Create a masked version of the image with PHI redacted.

        The input image is not modified; masking is applied to a copy.
        Annotations without a bounding box are skipped. For 'blur' and
        'pixelate' the replacement pixels are derived from the original
        image, not from the partially masked copy.

        Args:
            image: Original image
            annotations: PHI annotations
            mask_type: Type of masking ('black_box', 'blur', 'pixelate')

        Returns:
            Masked image

        Raises:
            ValueError: If mask_type is not one of the supported values.
                Returning an unredacted copy for a mistyped mask type would
                silently leak PHI, so it is rejected instead.
        """
        # ImageFilter is needed for the blur mask; it is imported locally so
        # this method does not depend on the module-level PIL import list.
        from PIL import ImageFilter

        supported_masks = ('black_box', 'blur', 'pixelate')
        if mask_type not in supported_masks:
            raise ValueError(
                f"Unsupported mask_type {mask_type!r}; "
                f"expected one of {', '.join(supported_masks)}"
            )

        # Create a copy of the image
        masked_image = image.copy()
        draw = ImageDraw.Draw(masked_image)

        for annotation in annotations:
            if not annotation.bbox:
                continue
            x1, y1, x2, y2 = annotation.bbox

            if mask_type == 'black_box':
                # Draw black rectangle
                draw.rectangle([x1, y1, x2, y2], fill='black')
                continue

            region = image.crop((x1, y1, x2, y2))
            if mask_type == 'blur':
                replacement = region.filter(ImageFilter.GaussianBlur(radius=10))
            else:
                # Pixelate: shrink to 10x10, then scale back up without
                # interpolation so each sample becomes a flat block
                small = region.resize((10, 10), Image.NEAREST)
                replacement = small.resize(region.size, Image.NEAREST)
            masked_image.paste(replacement, (x1, y1))

        return masked_image

    def save_annotations(self, annotations: DocumentAnnotations, output_path: Path):
        """
        Save annotations to JSON file.

        Args:
            annotations: Document annotations
            output_path: Path to save JSON file
        """
        # Convert dataclass to dictionary
        data = {
            'document_path': annotations.document_path,
            'total_pages': annotations.total_pages,
            'timestamp': annotations.timestamp,
            'metadata': annotations.metadata,
            'annotations': [asdict(ann) for ann in annotations.annotations],
        }

        with open(output_path, 'w') as f:
            json.dump(data, f, indent=2)

    def load_annotations(self, json_path: Path) -> DocumentAnnotations:
        """
        Load annotations from JSON file.

        JSON has no tuple type, so a saved bounding box is read back as a
        list. It is converted to a tuple here so that loaded annotations
        match the declared ``bbox`` type and compare equal to the
        annotations that were originally saved.

        Args:
            json_path: Path to JSON file

        Returns:
            DocumentAnnotations object

        Raises:
            OSError: If the file cannot be opened
            json.JSONDecodeError: If the file is not valid JSON
            KeyError: If a required top-level key is missing
        """
        # JSON text is UTF-8 by convention; do not depend on the platform
        # default encoding.
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # Convert dictionaries back to PHIAnnotation objects
        annotations = []
        for ann_dict in data['annotations']:
            fields = dict(ann_dict)  # copy so the parsed data is not mutated
            if fields.get('bbox') is not None:
                fields['bbox'] = tuple(fields['bbox'])
            annotations.append(PHIAnnotation(**fields))

        return DocumentAnnotations(
            document_path=data['document_path'],
            annotations=annotations,
            total_pages=data['total_pages'],
            timestamp=data['timestamp'],
            metadata=data['metadata'],
        )

    def calculate_statistics(self, annotations: DocumentAnnotations) -> Dict:
        """
        Calculate statistics about PHI in the document.

        Args:
            annotations: Document annotations

        Returns:
            Dictionary of statistics
        """
        stats = {
            'total_phi_items': len(annotations.annotations),
            'pages_with_phi': len(set(ann.page for ann in annotations.annotations)),
            'phi_by_category': {},
            'average_confidence': 0.0,
        }

        # Count by category
        for ann in annotations.annotations:
            if ann.category not in stats['phi_by_category']:
                stats['phi_by_category'][ann.category] = 0
            stats['phi_by_category'][ann.category] += 1

        # Calculate average confidence
        if annotations.annotations:
            confidences = [ann.confidence for ann in annotations.annotations]
            stats['average_confidence'] = sum(confidences) / len(confidences)

        return stats

    def create_annotation_report(
        self,
        annotations: DocumentAnnotations,
        output_path: Path
    ):
        """
        Create a human-readable report of PHI annotations.

        Args:
            annotations: Document annotations
            output_path: Path to save report
        """
        stats = self.calculate_statistics(annotations)

        report = []
        report.append("=" * 60)
        report.append("PHI ANNOTATION REPORT")
        report.append("=" * 60)
        report.append(f"Document: {annotations.document_path}")
        report.append(f"Timestamp: {annotations.timestamp}")
        report.append(f"Total Pages: {annotations.total_pages}")
        report.append("")
        report.append("STATISTICS")
        report.append("-" * 40)
        report.append(f"Total PHI Items: {stats['total_phi_items']}")
        report.append(f"Pages with PHI: {stats['pages_with_phi']}/{annotations.total_pages}")
        report.append(f"Average Confidence: {stats['average_confidence']:.2%}")
        report.append("")
        report.append("PHI BY CATEGORY")
        report.append("-" * 40)

        for category, count in sorted(stats['phi_by_category'].items()):
            description = self.PHI_CATEGORIES.get(category, 'Unknown')
            report.append(f"{category:15} {count:3} items - {description}")

        report.append("")
        report.append("DETAILED ANNOTATIONS")
        report.append("-" * 40)

        for i, ann in enumerate(annotations.annotations, 1):
            report.append(f"\n{i}. Category: {ann.category}")
            report.append(f"   Value: {ann.value[:30]}..." if len(ann.value) > 30 else f"   Value: {ann.value}")
            report.append(f"   Page: {ann.page}")
            report.append(f"   Confidence: {ann.confidence:.2%}")
            if ann.bbox:
                report.append(f"   Location: {ann.bbox}")

        report.append("")
        report.append("=" * 60)
        report.append("END OF REPORT")
        report.append("=" * 60)

        # Save report
        with open(output_path, 'w') as f:
            f.write('\n'.join(report))


def main():
    """Command-line entry point: annotate PHI in a PDF.

    Always writes ``<stem>_annotations.json`` to the output directory
    (the PDF's own directory when ``--output`` is not given). With
    ``--report`` a text report is written as well, and with ``--mask`` a
    redacted, rasterised copy of the document is saved as
    ``<stem>_masked.pdf``. Summary statistics are printed at the end.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Annotate PHI in PDF documents')
    parser.add_argument('--pdf', type=str, required=True, help='Path to PDF file')
    parser.add_argument('--output', type=str, help='Output directory for annotations')
    parser.add_argument('--report', action='store_true', help='Generate annotation report')
    parser.add_argument('--mask', action='store_true', help='Create masked version of PDF')

    args = parser.parse_args()

    # Fail with a usage error instead of a traceback from the PDF converter
    pdf_path = Path(args.pdf)
    if not pdf_path.is_file():
        parser.error(f"PDF file not found: {pdf_path}")

    # Create annotator
    annotator = PHIAnnotator()

    # Annotate PDF
    print(f"Annotating PHI in {pdf_path}...")
    annotations = annotator.annotate_pdf(pdf_path)

    # Save annotations; parents=True allows a nested output directory
    output_dir = Path(args.output) if args.output else pdf_path.parent
    output_dir.mkdir(parents=True, exist_ok=True)

    json_path = output_dir / f"{pdf_path.stem}_annotations.json"
    annotator.save_annotations(annotations, json_path)
    print(f"Annotations saved to {json_path}")

    # Generate report if requested
    if args.report:
        report_path = output_dir / f"{pdf_path.stem}_report.txt"
        annotator.create_annotation_report(annotations, report_path)
        print(f"Report saved to {report_path}")

    # Create masked copy if requested
    if args.mask:
        # Render at the DPI used for annotation so the bounding boxes,
        # which are pixel coordinates, line up with the page images.
        dpi = annotations.metadata['dpi']
        pages = pdf2image.convert_from_path(pdf_path, dpi=dpi)
        masked_pages = [
            annotator.create_masked_image(
                page,
                [ann for ann in annotations.annotations if ann.page == page_num],
            )
            for page_num, page in enumerate(pages, 1)
        ]
        if masked_pages:
            masked_path = output_dir / f"{pdf_path.stem}_masked.pdf"
            first_page, *other_pages = masked_pages
            first_page.save(
                masked_path,
                'PDF',
                save_all=True,
                append_images=other_pages,
                resolution=dpi,
            )
            print(f"Masked PDF saved to {masked_path}")

    # Print statistics
    stats = annotator.calculate_statistics(annotations)
    print("\nPHI Statistics:")
    print(f"  Total PHI items: {stats['total_phi_items']}")
    print(f"  Pages with PHI: {stats['pages_with_phi']}")
    print(f"  Categories found: {', '.join(stats['phi_by_category'].keys())}")

if __name__ == "__main__":
    main()