cryogenic22 commited on
Commit
46eb9e8
·
verified ·
1 Parent(s): 6ed286a

Create pdf_processor.py

Browse files
Files changed (1) hide show
  1. pdf_processor.py +350 -0
pdf_processor.py ADDED
@@ -0,0 +1,350 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ PDF processing utilities for extracting text, sections, and structured data from clinical documents.
3
+ """
4
+
5
+ import os
6
+ import re
7
+ import fitz # PyMuPDF
8
+ from typing import Dict, List, Tuple, Optional, Any
9
+ import json
10
+ from collections import defaultdict
11
+ from langchain.text_splitter import RecursiveCharacterTextSplitter
12
+
13
class PDFProcessor:
    """Main class for PDF processing, extraction, and chunking.

    Handles clinical-trial documents (Protocols, Statistical Analysis Plans,
    Clinical Study Reports, Investigator Brochures): text extraction via
    PyMuPDF, heuristic section splitting, basic metadata extraction, and
    chunking for storage in a vector store.
    """

    def __init__(self, upload_dir="./data/uploads"):
        """Initialize with the directory for uploaded PDFs (created if missing)."""
        self.upload_dir = upload_dir
        os.makedirs(upload_dir, exist_ok=True)

    def save_uploaded_file(self, uploaded_file) -> str:
        """Save an uploaded file to disk and return the path.

        Args:
            uploaded_file: File-like object exposing ``.name`` and
                ``.getbuffer()`` (e.g. a Streamlit ``UploadedFile``).

        Returns:
            Filesystem path the file was written to.
        """
        file_path = os.path.join(self.upload_dir, uploaded_file.name)
        with open(file_path, "wb") as f:
            f.write(uploaded_file.getbuffer())
        return file_path

    def extract_text_from_pdf(self, pdf_path: str) -> Tuple[str, List[Dict]]:
        """
        Extract text from a PDF, page by page.

        Args:
            pdf_path: Path to the PDF file.

        Returns:
            Tuple containing:
              - Full document text (pages separated by blank lines).
              - List of dicts {"page_num": 1-based page number, "text": page text}.
            On any extraction failure the error is logged and ("", []) is
            returned so callers can continue best-effort.
        """
        try:
            parts: List[str] = []
            pages: List[Dict] = []
            # Context manager guarantees the document handle is closed even if
            # a page fails to render (previously it leaked on mid-loop errors).
            with fitz.open(pdf_path) as doc:
                for page_index, page in enumerate(doc):
                    text = page.get_text()
                    parts.append(text + "\n\n")
                    pages.append({
                        "page_num": page_index + 1,
                        "text": text
                    })
            # join() instead of repeated += avoids quadratic string building.
            return "".join(parts), pages
        except Exception as e:
            print(f"Error extracting text from PDF {pdf_path}: {e}")
            return "", []

    def identify_section_titles(self, text: str) -> List[Dict]:
        """
        Identify potential section titles based on common patterns in clinical documents.

        Args:
            text: Full document text.

        Returns:
            List of dicts with keys "section_num" (str or None for unnumbered
            headers), "section_title", "line_num" (0-based line index), and
            "text" (the stripped matching line).
        """
        # Compiled once per call; ordered by specificity — first match wins.
        patterns = [
            # Numbered sections like "1 INTRODUCTION" or "2.3 Statistical Analysis"
            re.compile(r'^(\d+(?:\.\d+)*)\s+([A-Z][A-Za-z\s]+)$'),
            # ALL CAPS headers like "OBJECTIVES AND ENDPOINTS"
            re.compile(r'^([A-Z][A-Z\s]{3,})$'),
            # Title Case headers with optional trailing colon
            re.compile(r'^([A-Z][a-z]+(?:\s+[A-Z][a-z]+){1,5}):?$'),
        ]

        sections: List[Dict] = []
        for line_num, raw_line in enumerate(text.split('\n')):
            line = raw_line.strip()
            if not line:
                continue

            for pattern in patterns:
                match = pattern.match(line)
                if not match:
                    continue
                groups = match.groups()
                if len(groups) > 1:
                    # Numbered pattern: (number, title)
                    section_num, section_title = groups
                else:
                    # Unnumbered pattern: title only
                    section_num, section_title = None, groups[0]
                sections.append({
                    "section_num": section_num,
                    "section_title": section_title.strip(),
                    "line_num": line_num,
                    "text": line
                })
                break  # first matching pattern wins for this line

        return sections

    def split_into_sections(self, full_text: str, filename: str) -> Dict[str, str]:
        """
        Split the full text into logical sections based on identified section titles.

        Args:
            full_text: Full document text.
            filename: Document filename (kept for interface compatibility;
                not currently used in the splitting logic).

        Returns:
            Dictionary mapping section names to their text content. Text
            before the first header is kept under the key "preamble";
            repeated titles get " (2)", " (3)", ... suffixes instead of
            overwriting each other.
        """
        lines = full_text.split('\n')
        section_markers = self.identify_section_titles(full_text)

        if not section_markers:
            # No headers found — treat the whole document as one section.
            return {"document": full_text}

        # Sort markers by position in the document.
        section_markers.sort(key=lambda m: m["line_num"])

        sections: Dict[str, str] = {}

        # Fix: text before the first header was previously discarded silently.
        first_line = section_markers[0]["line_num"]
        if first_line > 0:
            preamble = '\n'.join(lines[:first_line])
            if preamble.strip():
                sections["preamble"] = preamble

        for i, marker in enumerate(section_markers):
            start_line = marker["line_num"]
            # Section runs until the next header, or the end of the document.
            if i < len(section_markers) - 1:
                end_line = section_markers[i + 1]["line_num"]
            else:
                end_line = len(lines)

            # Fix: duplicate titles previously overwrote earlier sections.
            base_name = marker["section_title"]
            key = base_name
            suffix = 2
            while key in sections:
                key = f"{base_name} ({suffix})"
                suffix += 1

            sections[key] = '\n'.join(lines[start_line:end_line])

        return sections

    def chunk_text(self, text: str, metadata: Dict[str, Any],
                   chunk_size: int = 1000, overlap: int = 200) -> List[Dict]:
        """
        Split text into chunks suitable for embedding.

        Args:
            text: Text to chunk.
            metadata: Metadata dict attached to every resulting chunk.
            chunk_size: Maximum size of each chunk (characters).
            overlap: Overlap between consecutive chunks (characters).

        Returns:
            List of dicts with "page_content" and "metadata" keys.
        """
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=overlap,
            length_function=len,
        )

        chunks = text_splitter.create_documents(
            [text],
            metadatas=[metadata]
        )

        return [{"page_content": chunk.page_content, "metadata": chunk.metadata}
                for chunk in chunks]

    def process_document_for_vector_store(self, pdf_path: str,
                                          document_metadata: Dict[str, Any]) -> List[Dict]:
        """
        Process a document for storage in the vector store.

        Extracts text, splits it into sections, chunks each section, and
        attaches section/source metadata to every chunk.

        Args:
            pdf_path: Path to the PDF file.
            document_metadata: Base metadata about the document; copied per
                section so the caller's dict is never mutated.

        Returns:
            List of dicts with "page_content" and "metadata", ready for the
            vector store.
        """
        full_text, pages = self.extract_text_from_pdf(pdf_path)
        sections = self.split_into_sections(full_text, os.path.basename(pdf_path))

        all_chunks: List[Dict] = []

        # Chunk each section independently so section names travel with chunks.
        for section_name, section_text in sections.items():
            section_metadata = document_metadata.copy()
            section_metadata.update({
                "section": section_name,
                "source": os.path.basename(pdf_path)
            })

            chunks = self.chunk_text(section_text, section_metadata)
            all_chunks.extend(chunks)

        return all_chunks

    def extract_tables_from_pdf(self, pdf_path: str) -> List[Dict]:
        """
        Attempt to extract tables from the PDF.

        This is a placeholder: reliable PDF table extraction is complex and
        typically needs specialized tooling (e.g. Camelot, Tabula, or
        commercial APIs), or OCR for scanned documents.

        Returns:
            Empty list until a real implementation is provided.
        """
        return []  # Placeholder for actual table extraction

    def identify_document_type(self, text: str, filename: str) -> str:
        """
        Attempt to identify the type of document (Protocol, SAP, etc.)
        based on filename patterns first, then content phrases.

        Args:
            text: Document text sample.
            filename: Document filename.

        Returns:
            One of "Protocol", "Statistical Analysis Plan",
            "Clinical Study Report", "Investigator Brochure", or "Unknown".
        """
        lower_text = text.lower()
        lower_filename = filename.lower()

        def has_token(token: str) -> bool:
            # Letters-only boundaries so short tokens ("ib", "sap", "csr")
            # do not fire inside unrelated words ("library", "sapphire") but
            # still match between underscores/digits, e.g. "study_sap_v2.pdf".
            return re.search(rf'(?<![a-z]){token}(?![a-z])', lower_filename) is not None

        # Filename heuristics first: cheap and usually reliable.
        if "protocol" in lower_filename or "prot" in lower_filename:
            return "Protocol"
        elif has_token("sap") or "analysis plan" in lower_filename:
            return "Statistical Analysis Plan"
        elif has_token("csr") or "study report" in lower_filename:
            return "Clinical Study Report"
        elif has_token("ib") or ("investigator" in lower_filename and "brochure" in lower_filename):
            return "Investigator Brochure"

        # Fall back to content phrases.
        if "statistical analysis plan" in lower_text:
            return "Statistical Analysis Plan"
        elif "clinical study protocol" in lower_text or "study protocol" in lower_text:
            return "Protocol"
        elif "clinical study report" in lower_text:
            return "Clinical Study Report"
        elif "investigator's brochure" in lower_text or "investigator brochure" in lower_text:
            return "Investigator Brochure"

        # Default when nothing matched.
        return "Unknown"

    def extract_protocol_id(self, text: str, filename: str) -> Optional[str]:
        """
        Attempt to extract the protocol ID from the document text or filename.

        Args:
            text: Document text (only the first 1000 characters are searched,
                since the ID usually appears on the title page).
            filename: Document filename, searched as a fallback.

        Returns:
            Protocol ID string if found, None otherwise.
        """
        # Common patterns for protocol IDs, most specific first.
        patterns = [
            # Common format like: Protocol B9531002
            r'[Pp]rotocol\s+([A-Z][0-9]{5,}[A-Z0-9]*)',
            # Format with hyphen like: C5161-001
            r'([A-Z][0-9]{4,}-[0-9]{3})',
            # Standard pattern like: ABC-123-456
            r'([A-Z]{2,5}-[0-9]{2,3}-[0-9]{2,3})',
            # Simple alphanumeric like: XYZ12345
            r'([A-Z]{2,5}[0-9]{4,6})'
        ]

        # The ID is usually near the start (title page), so sample it.
        sample_text = text[:1000]

        for pattern in patterns:
            match = re.search(pattern, sample_text)
            if match:
                return match.group(1)

        # Fall back to the filename.
        for pattern in patterns:
            match = re.search(pattern, filename)
            if match:
                return match.group(1)

        return None

    def extract_basic_metadata(self, pdf_path: str) -> Dict[str, Any]:
        """
        Extract basic metadata from a PDF without detailed structure extraction.

        Args:
            pdf_path: Path to the PDF file.

        Returns:
            Dict with keys: document_id, filename, protocol_id, type, title, path.
        """
        filename = os.path.basename(pdf_path)
        full_text, _ = self.extract_text_from_pdf(pdf_path)

        # Sample the first part of the document; title-page info lives here.
        sample_text = full_text[:5000]

        protocol_id = self.extract_protocol_id(sample_text, filename)
        doc_type = self.identify_document_type(sample_text, filename)

        # Heuristic title: first line of plausible title length (20-200 chars).
        lines = sample_text.split('\n')
        title = next(
            (line.strip() for line in lines
             if 20 < len(line.strip()) < 200),
            "Unknown Title"
        )

        return {
            "document_id": os.path.splitext(filename)[0],
            "filename": filename,
            "protocol_id": protocol_id,
            "type": doc_type,
            "title": title,
            "path": pdf_path
        }

    def process_complete_document(self, pdf_path: str) -> Dict[str, Any]:
        """
        Process a complete document for both structured data and vector storage.
        This is the main entry point for document processing.

        Args:
            pdf_path: Path to the PDF file.

        Returns:
            Dict with "status" ("success"/"error"), "pdf_path", "filename",
            and on success: "metadata", "sections", "page_count",
            "chunk_count", "chunks"; on failure: "error" message.
        """
        results: Dict[str, Any] = {
            "status": "success",
            "pdf_path": pdf_path,
            "filename": os.path.basename(pdf_path)
        }

        try:
            # Step 1: Extract basic metadata.
            metadata = self.extract_basic_metadata(pdf_path)
            results["metadata"] = metadata

            # Step 2: Extract full text and split into sections.
            # NOTE(review): the PDF text is parsed again here (and a third
            # time inside process_document_for_vector_store); acceptable for
            # small documents, but worth caching if throughput matters.
            full_text, pages = self.extract_text_from_pdf(pdf_path)
            sections = self.split_into_sections(full_text, os.path.basename(pdf_path))
            results["sections"] = list(sections.keys())
            results["page_count"] = len(pages)

            # Step 3: Prepare chunks for the vector store.
            chunks = self.process_document_for_vector_store(pdf_path, metadata)
            results["chunk_count"] = len(chunks)
            results["chunks"] = chunks

            return results
        except Exception as e:
            # Best-effort: report the failure instead of raising to the caller.
            results["status"] = "error"
            results["error"] = str(e)
            return results