snikhilesh committed on
Commit
614f2ea
·
verified ·
1 Parent(s): 54797df

Deploy file_detector.py to backend/ directory

Browse files
Files changed (1) hide show
  1. backend/file_detector.py +333 -0
backend/file_detector.py ADDED
@@ -0,0 +1,333 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ File Detection and Routing System - Phase 2
3
+ Multi-format medical file detection with confidence scoring and routing logic.
4
+
5
+ This module provides robust file type detection for medical documents including
6
+ PDFs, DICOM files, ECG signals, and archives with confidence-based routing.
7
+
8
+ Author: MiniMax Agent
9
+ Date: 2025-10-29
10
+ Version: 1.0.0
11
+ """
12
+
13
+ import os
14
+ import mimetypes
15
+ import hashlib
16
+ from typing import Dict, List, Optional, Tuple, Any
17
+ from pathlib import Path
18
+ import magic
19
+ from dataclasses import dataclass
20
+ from enum import Enum
21
+ import logging
22
+
23
+ # Configure logging
24
+ logger = logging.getLogger(__name__)
25
+
26
+
27
class MedicalFileType(Enum):
    """Closed set of file categories the detector can route.

    Each member's value is the lowercase string used as the key of the
    matching detection pattern, so ``MedicalFileType(pattern_name)``
    converts a pattern key back into a member.
    """

    # PDF reports
    PDF_CLINICAL = "pdf_clinical"
    PDF_RADIOLOGY = "pdf_radiology"
    PDF_LABORATORY = "pdf_laboratory"
    PDF_ECG_REPORT = "pdf_ecg_report"

    # DICOM imaging
    DICOM_CT = "dicom_ct"
    DICOM_MRI = "dicom_mri"
    DICOM_XRAY = "dicom_xray"
    DICOM_ULTRASOUND = "dicom_ultrasound"

    # ECG signal files
    ECG_XML = "ecg_xml"
    ECG_SCPE = "ecg_scpe"
    ECG_CSV = "ecg_csv"
    ECG_WFDB = "ecg_wfdb"

    # Archives
    ARCHIVE_ZIP = "archive_zip"
    ARCHIVE_TAR = "archive_tar"

    # Plain images
    IMAGE_TIFF = "image_tiff"
    IMAGE_JPEG = "image_jpeg"

    # Fallback when nothing matched or detection failed
    UNKNOWN = "unknown"
46
+
47
+
48
@dataclass
class FileDetectionResult:
    """Outcome of a single file-type detection.

    Bundles the detected category with its confidence score, the evidence
    that produced it, and the name of the extractor recommended for the file.
    """

    # Detected category (UNKNOWN when nothing matched or detection failed).
    file_type: MedicalFileType
    # Detection confidence score.
    confidence: float
    # Labels describing the evidence that contributed to the result.
    detected_features: List[str]
    # MIME type reported for the file.
    mime_type: str
    # Size of the file in bytes.
    file_size: int
    # Free-form extra details about the detection.
    metadata: Dict[str, Any]
    # Name of the extractor that should process the file.
    recommended_extractor: str
57
+ recommended_extractor: str
58
+
59
+
60
class MedicalFileDetector:
    """Detect the medical file type of a file and recommend an extractor.

    Detection scores every known pattern against three kinds of evidence
    (file extension, magic-byte signatures found in the leading bytes, and
    keywords found in the decoded leading bytes) and picks the pattern with
    the highest score.
    """

    # Number of leading bytes read from the file when no sample is supplied.
    _SAMPLE_SIZE = 8192

    def __init__(self):
        self.known_patterns = self._init_detection_patterns()
        # NOTE(review): this libmagic handle is not used by any method of this
        # class (the MIME type comes from ``mimetypes``); it is kept because
        # external code may read the attribute.
        self.magic = magic.Magic(mime=True)

    def _init_detection_patterns(self) -> Dict[str, Dict]:
        """Build the table of detection patterns.

        Returns:
            Mapping from pattern name (equal to a ``MedicalFileType`` value)
            to a config dict with these keys:

            * ``extensions``: lowercase file suffixes, including the dot.
            * ``magic_bytes``: list of alternatives; each alternative is a
              list of byte signatures that must all occur in the sample.
            * ``keywords``: lowercase substrings searched in the decoded sample.
            * ``extractor``: name of the extractor recommended for the type.
        """
        return {
            # PDF Patterns
            "pdf_clinical": {
                "extensions": [".pdf"],
                "magic_bytes": [[b"%PDF"]],
                "keywords": ["clinical", "progress note", "consultation", "assessment", "plan"],
                "extractor": "pdf_text_extractor"
            },
            "pdf_radiology": {
                "extensions": [".pdf"],
                "magic_bytes": [[b"%PDF"]],
                "keywords": ["radiology", "ct scan", "mri", "x-ray", "imaging", "findings", "impression"],
                "extractor": "pdf_radiology_extractor"
            },
            "pdf_laboratory": {
                "extensions": [".pdf"],
                "magic_bytes": [[b"%PDF"]],
                "keywords": ["laboratory", "lab results", "blood work", "test results", "reference range"],
                "extractor": "pdf_laboratory_extractor"
            },
            "pdf_ecg_report": {
                "extensions": [".pdf"],
                "magic_bytes": [[b"%PDF"]],
                "keywords": ["ecg", "ekg", "electrocardiogram", "rhythm", "heart rate", "st segment"],
                "extractor": "pdf_ecg_extractor"
            },

            # DICOM Patterns
            "dicom_ct": {
                "extensions": [".dcm", ".dicom"],
                "magic_bytes": [[b"DICM"]],
                "keywords": ["computed tomography", "ct", "slice"],
                "extractor": "dicom_processor"
            },
            "dicom_mri": {
                "extensions": [".dcm", ".dicom"],
                "magic_bytes": [[b"DICM"]],
                "keywords": ["magnetic resonance", "mri", "t1", "t2", "flair"],
                "extractor": "dicom_processor"
            },
            "dicom_xray": {
                "extensions": [".dcm", ".dicom"],
                "magic_bytes": [[b"DICM"]],
                "keywords": ["x-ray", "radiograph", "chest", "abdomen", "bone"],
                "extractor": "dicom_processor"
            },
            "dicom_ultrasound": {
                "extensions": [".dcm", ".dicom"],
                "magic_bytes": [[b"DICM"]],
                "keywords": ["ultrasound", "sonogram", "echocardiogram"],
                "extractor": "dicom_processor"
            },

            # ECG File Patterns
            "ecg_xml": {
                "extensions": [".xml", ".ecg"],
                "magic_bytes": [[b"<?xml"], [b"<ECG"], [b"<electrocardiogram"]],
                "keywords": ["ecg", "lead", "signal", "waveform"],
                "extractor": "ecg_xml_processor"
            },
            "ecg_scpe": {
                "extensions": [".scp", ".scpe"],
                "magic_bytes": [[b"SCP-ECG"]],
                "keywords": ["scp-ecg", "electrocardiogram"],
                "extractor": "ecg_scp_processor"
            },
            "ecg_csv": {
                "extensions": [".csv"],
                "magic_bytes": [],
                "keywords": ["time", "lead", "voltage", "millivolts", "ecg"],
                "extractor": "ecg_csv_processor"
            },

            # Archive Patterns
            "archive_zip": {
                "extensions": [".zip"],
                "magic_bytes": [[b"PK"]],
                "keywords": [],
                "extractor": "archive_processor"
            },
            "archive_tar": {
                "extensions": [".tar", ".gz", ".tgz"],
                "magic_bytes": [[b"ustar"], [b"\x1f\x8b"]],
                "keywords": [],
                "extractor": "archive_processor"
            },

            # Image Patterns
            "image_tiff": {
                "extensions": [".tiff", ".tif"],
                "magic_bytes": [[b"II*\x00"], [b"MM\x00*"]],
                "keywords": [],
                "extractor": "image_processor"
            },
            "image_jpeg": {
                "extensions": [".jpg", ".jpeg"],
                "magic_bytes": [[b"\xff\xd8\xff"]],
                "keywords": [],
                "extractor": "image_processor"
            }
        }

    @staticmethod
    def _signature_matches(signature: Any, content_sample: bytes) -> bool:
        """Return True when one ``magic_bytes`` alternative occurs in the sample.

        ``signature`` may be a single bytes value or a list of bytes values
        (the shape used by the pattern table); a list matches only when every
        element occurs in ``content_sample``.
        """
        if not signature:
            # An empty signature carries no evidence; never treat it as a hit.
            return False
        if isinstance(signature, (bytes, bytearray)):
            return signature in content_sample
        # The pattern table stores each alternative as a list of bytes.
        # Testing ``list in bytes`` raises TypeError, so test each part.
        return all(part in content_sample for part in signature)

    def _score_pattern(
        self,
        pattern_name: str,
        pattern_config: Dict[str, Any],
        file_ext: str,
        file_size: int,
        content_sample: bytes,
        content_text: str,
    ) -> Tuple[float, List[str]]:
        """Score one pattern against the file; return ``(score, features)``."""
        score = 0.0
        features: List[str] = []

        # File extension evidence.
        if file_ext in pattern_config.get("extensions", []):
            score += 0.3
            features.append(f"extension_{file_ext}")

        # Magic-byte evidence: counted once, however many alternatives match.
        if any(
            self._signature_matches(signature, content_sample)
            for signature in pattern_config.get("magic_bytes", [])
        ):
            score += 0.4
            features.append("magic_bytes")

        # Keyword evidence: every keyword present adds to the score.
        for keyword in pattern_config.get("keywords", []):
            if keyword.lower() in content_text:
                score += 0.1
                features.append(f"keyword_{keyword}")

        # Size is only supporting evidence. Applying it on its own would tag
        # every mid-sized file as a PDF and make the UNKNOWN fallback
        # unreachable, so require at least one other signal first.
        if score > 0:
            if pattern_name.startswith("dicom") and file_size > 1024 * 1024:
                score += 0.1
                features.append("size_dicom")
            if pattern_name.startswith("pdf") and 1024 < file_size < 50 * 1024 * 1024:
                score += 0.1
                features.append("size_pdf")

        return score, features

    def detect_file_type(self, file_path: str, content_sample: Optional[bytes] = None) -> FileDetectionResult:
        """
        Detect medical file type with confidence scoring

        Args:
            file_path: Path to the file; it must exist because its size is read.
            content_sample: Optional leading bytes of the file. When omitted,
                the first 8 KB are read from ``file_path``.

        Returns:
            FileDetectionResult with detected type and confidence. This method
            does not raise: any failure is logged and reported as an UNKNOWN
            result with confidence 0.0 and extractor "error_handler".
        """
        try:
            # Get basic file info
            file_size = os.path.getsize(file_path)
            file_ext = Path(file_path).suffix.lower()

            # Extension-based MIME guess
            mime_type = mimetypes.guess_type(file_path)[0] or "application/octet-stream"

            # Get file content sample if not provided
            if content_sample is None:
                with open(file_path, 'rb') as f:
                    content_sample = f.read(self._SAMPLE_SIZE)

            # Decode once for keyword search. errors='ignore' drops undecodable
            # bytes, so binary content cannot make this fail.
            content_text = content_sample.decode('utf-8', errors='ignore').lower()

            # Analyze against known patterns
            pattern_scores = []
            for pattern_name, pattern_config in self.known_patterns.items():
                score, features = self._score_pattern(
                    pattern_name, pattern_config, file_ext, file_size,
                    content_sample, content_text,
                )
                if score > 0:
                    pattern_scores.append((pattern_name, score, features))

            # Select best match (ties go to the pattern defined first)
            if pattern_scores:
                best_pattern, best_score, best_features = max(pattern_scores, key=lambda x: x[1])
                file_type = MedicalFileType(best_pattern)
                confidence = min(best_score, 1.0)  # Cap at 1.0
                detected_features = best_features
                recommended_extractor = self.known_patterns[best_pattern]["extractor"]
            else:
                # Fallback to unknown
                file_type = MedicalFileType.UNKNOWN
                confidence = 0.1
                detected_features = ["no_pattern_match"]
                recommended_extractor = "generic_extractor"

            # Adjust confidence based on file size
            if file_size < 100:  # Very small files
                confidence *= 0.5
                detected_features.append("very_small_file")
            elif file_size > 100 * 1024 * 1024:  # Very large files
                confidence *= 0.8
                detected_features.append("large_file")

            metadata = {
                "file_extension": file_ext,
                "detection_method": "multi_modal",
                "content_length": len(content_sample)
            }

            logger.info(
                "File detection: %s -> %s (confidence: %.2f)",
                file_path, file_type.value, confidence,
            )

            return FileDetectionResult(
                file_type=file_type,
                confidence=confidence,
                detected_features=detected_features,
                mime_type=mime_type,
                file_size=file_size,
                metadata=metadata,
                recommended_extractor=recommended_extractor
            )

        except Exception as e:
            # Deliberate catch-all boundary: callers get an error result
            # rather than an exception.
            logger.error("File detection error for %s: %s", file_path, e)
            return FileDetectionResult(
                file_type=MedicalFileType.UNKNOWN,
                confidence=0.0,
                detected_features=["detection_error"],
                mime_type="application/octet-stream",
                file_size=0,
                metadata={"error": str(e)},
                recommended_extractor="error_handler"
            )

    def batch_detect(self, file_paths: List[str]) -> List[FileDetectionResult]:
        """Detect file types for multiple files.

        Paths that do not exist are logged and skipped, so the returned list
        can be shorter than ``file_paths`` and is not index-aligned with it.
        """
        results = []
        for file_path in file_paths:
            if os.path.exists(file_path):
                results.append(self.detect_file_type(file_path))
            else:
                logger.warning("File not found: %s", file_path)
        return results

    def get_routing_info(self, detection_result: FileDetectionResult) -> Dict[str, Any]:
        """Get routing information for detected file type.

        Returns:
            Dict with keys:

            * ``extractor``: the result's recommended extractor name.
            * ``priority``: "high" above 0.8 confidence, "medium" above 0.5,
              otherwise "low".
            * ``requires_ocr``: True for the four PDF types.
            * ``supports_batch``: True for DICOM CT/MRI, ECG CSV and ZIP archives.
            * ``phi_risk``: "high" for clinical, radiology and laboratory PDFs,
              otherwise "medium".
        """
        file_type = detection_result.file_type
        confidence = detection_result.confidence

        ocr_types = (
            MedicalFileType.PDF_CLINICAL,
            MedicalFileType.PDF_RADIOLOGY,
            MedicalFileType.PDF_LABORATORY,
            MedicalFileType.PDF_ECG_REPORT,
        )
        batch_types = (
            MedicalFileType.DICOM_CT,
            MedicalFileType.DICOM_MRI,
            MedicalFileType.ECG_CSV,
            MedicalFileType.ARCHIVE_ZIP,
        )
        high_phi_types = (
            MedicalFileType.PDF_CLINICAL,
            MedicalFileType.PDF_RADIOLOGY,
            MedicalFileType.PDF_LABORATORY,
        )

        if confidence > 0.8:
            priority = "high"
        elif confidence > 0.5:
            priority = "medium"
        else:
            priority = "low"

        return {
            "extractor": detection_result.recommended_extractor,
            "priority": priority,
            "requires_ocr": file_type in ocr_types,
            "supports_batch": file_type in batch_types,
            "phi_risk": "high" if file_type in high_phi_types else "medium",
        }
312
+
313
+
314
def calculate_file_hash(file_path: str) -> str:
    """Return the SHA-256 hex digest of a file's contents, for deduplication.

    The file is read in 4 KB blocks so it is never loaded into memory whole.
    Any failure is logged and an empty string is returned instead of raising.
    """
    digest = hashlib.sha256()
    try:
        with open(file_path, "rb") as stream:
            while True:
                block = stream.read(4096)
                if not block:
                    # An empty read signals end of file.
                    break
                digest.update(block)
        return digest.hexdigest()
    except Exception as e:
        logger.error(f"Hash calculation error for {file_path}: {str(e)}")
        return ""
325
+
326
+
327
# Public API of this module (the names exported by ``from ... import *``).
__all__ = [
    "MedicalFileDetector",
    "MedicalFileType",
    "FileDetectionResult",
    "calculate_file_hash",
]