Hanan-Alnakhal committed on
Commit
a49b836
·
verified ·
1 Parent(s): 67e4e82

Update pdf_extractor.py

Browse files
Files changed (1) hide show
  1. pdf_extractor.py +188 -167
pdf_extractor.py CHANGED
@@ -1,168 +1,189 @@
1
- """
2
- PDF Extraction Module for Lab Reports
3
- Extracts lab test names, values, and ranges from uploaded PDF files
4
- """
5
-
6
- import pdfplumber
7
- import re
8
- from typing import Dict, List, Optional
9
- from dataclasses import dataclass
10
-
11
- @dataclass
12
- class LabResult:
13
- """Represents a single lab test result"""
14
- test_name: str
15
- value: str
16
- unit: str
17
- reference_range: str
18
- status: str # 'normal', 'high', 'low', 'unknown'
19
-
20
- class LabReportExtractor:
21
- """Extract structured data from lab report PDFs"""
22
-
23
- def __init__(self):
24
- # Common lab test patterns
25
- self.test_patterns = [
26
- r'(Hemoglobin|Hgb|Hb)\s*:?\s*([\d.]+)\s*([a-zA-Z/]+)?\s*(?:Ref\.?\s*Range:?\s*)?([\d.\-\s]+)',
27
- r'(WBC|White Blood Cell|Leukocyte)\s*:?\s*([\d.]+)\s*([a-zA-Z/]+)?\s*(?:Ref\.?\s*Range:?\s*)?([\d.\-\s]+)',
28
- r'(Glucose|Blood Sugar)\s*:?\s*([\d.]+)\s*([a-zA-Z/]+)?\s*(?:Ref\.?\s*Range:?\s*)?([\d.\-\s]+)',
29
- r'(Iron|Ferritin)\s*:?\s*([\d.]+)\s*([a-zA-Z/]+)?\s*(?:Ref\.?\s*Range:?\s*)?([\d.\-\s]+)',
30
- r'(Cholesterol|LDL|HDL)\s*:?\s*([\d.]+)\s*([a-zA-Z/]+)?\s*(?:Ref\.?\s*Range:?\s*)?([\d.\-\s]+)',
31
- ]
32
-
33
- def extract_from_pdf(self, pdf_path: str) -> List[LabResult]:
34
- """Extract lab results from PDF file"""
35
- results = []
36
-
37
- with pdfplumber.open(pdf_path) as pdf:
38
- for page in pdf.pages:
39
- text = page.extract_text()
40
-
41
- # Try to extract tables first (more structured)
42
- tables = page.extract_tables()
43
- if tables:
44
- results.extend(self._parse_tables(tables))
45
-
46
- # Fall back to pattern matching
47
- results.extend(self._parse_text(text))
48
-
49
- # Remove duplicates
50
- unique_results = self._deduplicate_results(results)
51
-
52
- return unique_results
53
-
54
- def _parse_tables(self, tables: List) -> List[LabResult]:
55
- """Parse lab results from extracted tables"""
56
- results = []
57
-
58
- for table in tables:
59
- if not table or len(table) < 2:
60
- continue
61
-
62
- # Assume first row is header
63
- headers = [h.lower() if h else '' for h in table[0]]
64
-
65
- # Find relevant columns
66
- test_col = self._find_column(headers, ['test', 'name', 'component'])
67
- value_col = self._find_column(headers, ['value', 'result'])
68
- unit_col = self._find_column(headers, ['unit', 'units'])
69
- range_col = self._find_column(headers, ['range', 'reference', 'normal'])
70
-
71
- # Parse data rows
72
- for row in table[1:]:
73
- if not row or len(row) <= max(test_col or 0, value_col or 0):
74
- continue
75
-
76
- test_name = row[test_col] if test_col is not None else ''
77
- value = row[value_col] if value_col is not None else ''
78
- unit = row[unit_col] if unit_col is not None else ''
79
- ref_range = row[range_col] if range_col is not None else ''
80
-
81
- if test_name and value:
82
- status = self._determine_status(value, ref_range)
83
- results.append(LabResult(
84
- test_name=test_name.strip(),
85
- value=str(value).strip(),
86
- unit=str(unit).strip() if unit else '',
87
- reference_range=str(ref_range).strip() if ref_range else '',
88
- status=status
89
- ))
90
-
91
- return results
92
-
93
- def _parse_text(self, text: str) -> List[LabResult]:
94
- """Parse lab results using regex patterns"""
95
- results = []
96
-
97
- for pattern in self.test_patterns:
98
- matches = re.finditer(pattern, text, re.IGNORECASE)
99
- for match in matches:
100
- groups = match.groups()
101
- if len(groups) >= 2:
102
- test_name = groups[0]
103
- value = groups[1]
104
- unit = groups[2] if len(groups) > 2 else ''
105
- ref_range = groups[3] if len(groups) > 3 else ''
106
-
107
- status = self._determine_status(value, ref_range)
108
- results.append(LabResult(
109
- test_name=test_name,
110
- value=value,
111
- unit=unit or '',
112
- reference_range=ref_range or '',
113
- status=status
114
- ))
115
-
116
- return results
117
-
118
- def _find_column(self, headers: List[str], keywords: List[str]) -> Optional[int]:
119
- """Find column index by keywords"""
120
- for i, header in enumerate(headers):
121
- for keyword in keywords:
122
- if keyword in header:
123
- return i
124
- return None
125
-
126
- def _determine_status(self, value: str, ref_range: str) -> str:
127
- """Determine if value is normal, high, or low"""
128
- try:
129
- val = float(value.replace(',', ''))
130
-
131
- # Parse reference range
132
- range_match = re.search(r'([\d.]+)\s*-\s*([\d.]+)', ref_range)
133
- if range_match:
134
- low = float(range_match.group(1))
135
- high = float(range_match.group(2))
136
-
137
- if val < low:
138
- return 'low'
139
- elif val > high:
140
- return 'high'
141
- else:
142
- return 'normal'
143
- except (ValueError, AttributeError):
144
- pass
145
-
146
- return 'unknown'
147
-
148
- def _deduplicate_results(self, results: List[LabResult]) -> List[LabResult]:
149
- """Remove duplicate test results"""
150
- seen = set()
151
- unique = []
152
-
153
- for result in results:
154
- key = (result.test_name.lower(), result.value)
155
- if key not in seen:
156
- seen.add(key)
157
- unique.append(result)
158
-
159
- return unique
160
-
161
- # Example usage
162
- if __name__ == "__main__":
163
- extractor = LabReportExtractor()
164
- results = extractor.extract_from_pdf("sample_lab_report.pdf")
165
-
166
- for result in results:
167
- print(f"{result.test_name}: {result.value} {result.unit} [{result.status}]")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
168
  print(f" Reference: {result.reference_range}")
 
1
+ """
2
+ PDF Extraction Module for Lab Reports
3
+ Extracts lab test names, values, and ranges from uploaded PDF files
4
+ """
5
+
6
+ import pdfplumber
7
+ import re
8
+ from typing import Dict, List, Optional
9
+ from dataclasses import dataclass
10
+
11
@dataclass
class LabResult:
    """Represents a single lab test result.

    All fields are stored as strings exactly as they were extracted; no
    numeric conversion is performed when the record is created.
    """

    # Name of the test as it appeared in the report.
    test_name: str
    # Measured value, kept as text.
    value: str
    # Unit of measurement; empty string when none was found.
    unit: str
    # Reference range text; empty string when none was found.
    reference_range: str
    # Classification of the value against the reference range.
    status: str  # 'normal', 'high', 'low', 'unknown'
19
+
20
class LabReportExtractor:
    """Extract structured lab results from lab-report PDFs.

    Every page is processed twice: tables found on the page are parsed by
    matching header keywords, and the page text is scanned with the regexes
    in ``test_patterns``. The results of both passes are merged and
    de-duplicated on (lower-cased test name, value).
    """

    # Alternations of test names recognised by the free-text patterns.
    _TEST_NAME_GROUPS = (
        'Hemoglobin|Hgb|Hb',
        'WBC|White Blood Cell|Leukocyte',
        'Glucose|Blood Sugar',
        'Iron|Ferritin',
        'Cholesterol|LDL|HDL',
    )

    # Shared tail of every free-text pattern: value, optional unit, optional
    # "Ref. Range:" label, then the reference-range characters.
    _PATTERN_TAIL = (
        r'\s*:?\s*([\d.]+)\s*([a-zA-Z/]+)?\s*'
        r'(?:Ref\.?\s*Range:?\s*)?([\d.\-\s]+)'
    )

    # "4.0 - 11.0" / "4.0-11.0"; en and em dashes are accepted as well.
    _RANGE_RE = re.compile(r'([\d.]+)\s*[-–—]\s*([\d.]+)')
    # "< 100", "<= 100", "≤ 100" (upper limit only).
    _UPPER_RE = re.compile(r'(?:<=?|≤)\s*([\d.]+)')
    # "> 50", ">= 50", "≥ 50" (lower limit only).
    _LOWER_RE = re.compile(r'(?:>=?|≥)\s*([\d.]+)')

    # Absolute slack applied to both ends of a "low - high" range before a
    # value is classified as out of range. It is not scaled to the magnitude
    # of the range.
    RANGE_TOLERANCE = 0.01

    def __init__(self):
        # One pattern per test family; each has four capture groups:
        # (test name, value, unit, reference range).
        self.test_patterns = [
            '(' + names + ')' + self._PATTERN_TAIL
            for names in self._TEST_NAME_GROUPS
        ]

    def extract_from_pdf(self, pdf_path: str) -> "List[LabResult]":
        """Extract lab results from a PDF file.

        Args:
            pdf_path: Path passed to ``pdfplumber.open``.

        Returns:
            De-duplicated results from every page, table rows first and
            regex matches second within each page.
        """
        results = []

        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                # Tables are parsed first because they are more structured.
                tables = page.extract_tables()
                if tables:
                    results.extend(self._parse_tables(tables))

                # The text scan runs for every page, not only when no table
                # was found; overlaps are removed by the de-duplication
                # below. ``or ''`` keeps the scan safe if the extractor
                # yields None for a page.
                text = page.extract_text() or ''
                results.extend(self._parse_text(text))

        return self._deduplicate_results(results)

    @staticmethod
    def _cell(row: List, index: Optional[int]) -> str:
        """Return the stripped text of ``row[index]``.

        Returns '' when the column was not found (``index`` is None), the
        row is too short, or the cell is None.
        """
        if index is None or index >= len(row):
            return ''
        cell = row[index]
        return '' if cell is None else str(cell).strip()

    def _parse_tables(self, tables: List) -> "List[LabResult]":
        """Parse lab results from extracted tables.

        The first row of each table is treated as the header. Columns are
        located by keyword; a row contributes a result only when both its
        test-name and value cells are non-empty.
        """
        results = []

        for table in tables:
            # A header row plus at least one data row is required.
            if not table or len(table) < 2:
                continue

            headers = [str(h).lower() if h else '' for h in table[0]]

            test_col = self._find_column(headers, ['test', 'name', 'component'])
            value_col = self._find_column(headers, ['value', 'result'])
            unit_col = self._find_column(headers, ['unit', 'units'])
            range_col = self._find_column(headers, ['range', 'reference', 'normal'])

            # Without both a name and a value column no row can qualify.
            if test_col is None or value_col is None:
                continue

            for row in table[1:]:
                if not row:
                    continue

                # _cell tolerates short rows and None cells, so optional
                # columns beyond the end of a row cannot raise IndexError.
                test_name = self._cell(row, test_col)
                value = self._cell(row, value_col)
                unit = self._cell(row, unit_col)
                ref_range = self._cell(row, range_col)

                if test_name and value:
                    results.append(LabResult(
                        test_name=test_name,
                        value=value,
                        unit=unit,
                        reference_range=ref_range,
                        status=self._determine_status(value, ref_range),
                    ))

        return results

    def _parse_text(self, text: str) -> "List[LabResult]":
        """Parse lab results from free text using ``test_patterns``.

        Patterns are matched case-insensitively. Matches with fewer than
        two capture groups are ignored; missing unit or range groups
        become empty strings.
        """
        results = []
        if not text:
            return results

        for pattern in self.test_patterns:
            for match in re.finditer(pattern, text, re.IGNORECASE):
                groups = match.groups(default='')
                if len(groups) < 2:
                    continue

                # Pad so patterns with only (name, value) groups still work.
                padded = (list(groups) + ['', ''])[:4]
                test_name, value, unit, ref_range = padded
                # The range group may swallow trailing whitespace/newlines.
                ref_range = ref_range.strip()

                results.append(LabResult(
                    test_name=test_name,
                    value=value,
                    unit=unit,
                    reference_range=ref_range,
                    status=self._determine_status(value, ref_range),
                ))

        return results

    def _find_column(self, headers: List[str], keywords: List[str]) -> Optional[int]:
        """Return the index of the first header containing any keyword.

        Matching is by substring. Returns None when no header matches.
        """
        for i, header in enumerate(headers):
            if any(keyword in header for keyword in keywords):
                return i
        return None

    def _determine_status(self, value: str, ref_range: str) -> str:
        """Classify ``value`` against ``ref_range``.

        Args:
            value: Measured value as text; every character other than
                digits, '.' and '-' is discarded before conversion.
            ref_range: Reference range text such as "4.0 - 11.0", "< 100"
                or "> 50".

        Returns:
            'low', 'high' or 'normal' when both the value and the range can
            be interpreted, otherwise 'unknown'. None for either argument
            yields 'unknown'.
        """
        value_text = '' if value is None else str(value)
        range_text = '' if ref_range is None else str(ref_range)

        try:
            # Clean value - remove any non-numeric characters except . and -
            clean_value = re.sub(r'[^\d.-]', '', value_text)
            if not clean_value or clean_value == '-':
                return 'unknown'

            val = float(clean_value)

            # Pattern 1: two-sided range.
            range_match = self._RANGE_RE.search(range_text)
            if range_match:
                low = float(range_match.group(1))
                high = float(range_match.group(2))

                if val < (low - self.RANGE_TOLERANCE):
                    return 'low'
                if val > (high + self.RANGE_TOLERANCE):
                    return 'high'
                return 'normal'

            # Pattern 2: upper limit only.
            upper_match = self._UPPER_RE.search(range_text)
            if upper_match:
                upper = float(upper_match.group(1))
                return 'high' if val > upper else 'normal'

            # Pattern 3: lower limit only.
            lower_match = self._LOWER_RE.search(range_text)
            if lower_match:
                lower = float(lower_match.group(1))
                return 'low' if val < lower else 'normal'

        except ValueError as e:
            # Malformed numbers (e.g. "1.2.3") are reported, not raised.
            print(f"Status detection error for value '{value}', range '{ref_range}': {e}")

        return 'unknown'

    def _deduplicate_results(self, results: "List[LabResult]") -> "List[LabResult]":
        """Remove duplicate results, keeping the first occurrence.

        Two results are duplicates when their lower-cased test name and
        their value are equal.
        """
        seen = set()
        unique = []

        for result in results:
            key = (result.test_name.lower(), result.value)
            if key not in seen:
                seen.add(key)
                unique.append(result)

        return unique
181
+
182
# Example usage: extract a sample report and print one summary line plus the
# reference range for every result.
if __name__ == "__main__":
    extractor = LabReportExtractor()
    results = extractor.extract_from_pdf("sample_lab_report.pdf")

    for result in results:
        print(f"{result.test_name}: {result.value} {result.unit} [{result.status}]")
        print(f" Reference: {result.reference_range}")