| | import gradio as gr |
| | import os |
| | import json |
| | import base64 |
| | import tempfile |
| | from pathlib import Path |
| |
|
# Extractor identifiers. Each one is used as a top-level key when reading a
# loaded JSON document, as the label of a "best extractor" button, and as a
# choice in the two comparison dropdowns.
EXTRACTORS = [
    "pdf_plumber",
    "py_pdf",
    "docling",
    "extractous",
    "pypdfium2",
    "pymupdf",
    "pymupdf_llm",
]
| |
|
def add_page_breaks(text, page_offsets):
    """Insert a page-break marker into ``text`` at every offset in ``page_offsets``.

    Args:
        text: The extracted document text.
        page_offsets: Character offsets into ``text``; one marker is emitted
            per offset. ``None`` or an empty sequence leaves ``text`` untouched.

    Returns:
        ``text`` with ``"\\n<---page-break--->\\n"`` inserted after the slice
        that ends at each offset. An offset equal to ``len(text)`` therefore
        produces a trailing marker.
    """
    if not page_offsets:
        return text

    marker = "\n<---page-break--->\n"
    text_length = len(text)

    pieces = []
    last_offset = 0
    for offset in page_offsets:
        # Clamp to [last_offset, len(text)]: an offset that goes backwards (or
        # is negative) would otherwise make the tail slice re-emit text that
        # was already written, or drop text via negative-index slicing.
        offset = min(max(offset, last_offset), text_length)
        pieces.append(text[last_offset:offset])
        pieces.append(marker)
        last_offset = offset

    # Whatever follows the final offset (empty when the last offset is the end).
    pieces.append(text[last_offset:])

    return "".join(pieces)
| |
|
class ExtractorComparer:
    """Navigation and annotation state for a directory of extraction results.

    Tracks the list of JSON files found in a directory, which one is currently
    shown, its parsed contents, and the PDF bytes embedded in it. The chosen
    "best extractor" for a file is stored next to it in ``<name>_best.txt``.
    """

    def __init__(self):
        self.json_files = []           # full paths of the discovered result files
        self.current_index = 0         # index into json_files of the file shown
        self.current_data = None       # parsed JSON of the current file, if loaded
        self.temp_pdf_path = None      # temp file holding the current decoded PDF
        self.current_pdf_bytes = None  # decoded PDF bytes of the current file

    @staticmethod
    def _best_file_for(json_path):
        """Return the path of the annotation file that belongs to ``json_path``."""
        return os.path.splitext(json_path)[0] + "_best.txt"

    @staticmethod
    def _decode_embedded_pdf(data):
        """Return the PDF bytes embedded under ``pdf_plumber``, or ``None``.

        The bytes are read from ``data['pdf_plumber']['media'][0]['media_bytes']``
        (base64). Any missing/ill-typed level or undecodable payload yields
        ``None`` instead of raising.
        """
        if not isinstance(data, dict):
            return None
        plumber_data = data.get('pdf_plumber')
        if not isinstance(plumber_data, dict):
            return None
        media = plumber_data.get('media')
        if not isinstance(media, list) or not media:
            return None
        media_item = media[0]
        if not isinstance(media_item, dict) or not media_item.get('media_bytes'):
            return None
        try:
            return base64.b64decode(media_item['media_bytes'])
        except (ValueError, TypeError):
            # Undecodable payload: treat the file as having no PDF.
            return None

    def _discard_temp_pdf(self):
        """Delete the temp PDF of the previously shown file, if there is one."""
        if self.temp_pdf_path:
            try:
                os.remove(self.temp_pdf_path)
            except OSError:
                # Best effort: the file may already be gone.
                pass
            self.temp_pdf_path = None

    def load_files(self, directory_path):
        """Collect all ``.json`` / ``.jsonl`` files from ``directory_path``.

        Returns:
            ``(file_progress, annotation_status)`` for the first file, or a
            pair of status messages when nothing was found or listing failed.
        """
        self.json_files = []
        # Forget the document of any previously loaded directory so stale text
        # is never shown for (or annotated against) the new file list.
        self.current_data = None
        try:
            # os.listdir returns entries in arbitrary order; sort so that the
            # navigation order is stable between runs.
            self.json_files = sorted(
                os.path.join(directory_path, filename)
                for filename in os.listdir(directory_path)
                if filename.endswith(('.json', '.jsonl'))
            )

            if self.json_files:
                self.current_index = 0
                file_progress, annotation_status = self.get_progress_info()
                return file_progress, annotation_status
            else:
                return "No JSON files found", "No files loaded"
        except Exception as e:
            # UI boundary: report the failure as status text instead of raising.
            return f"Error loading files: {str(e)}", "Error"

    def load_current_file(self):
        """Parse the current JSON file and extract its embedded PDF.

        Returns:
            ``(base64_pdf, file_progress, annotation_status)``; ``base64_pdf``
            is ``None`` when the file carries no decodable PDF or failed to load.
        """
        if not self.json_files:
            return None, "N/A", "N/A"

        # Drop everything that belonged to the previously shown file first, so
        # a failed load cannot leave the old document associated with the new
        # index (which would let it be displayed or annotated by mistake).
        self.current_data = None
        self.current_pdf_bytes = None
        self._discard_temp_pdf()

        try:
            # NOTE(review): .jsonl files are accepted by load_files but parsed
            # with json.load, so a file holding more than one JSON line fails
            # here - confirm whether multi-record files are expected.
            with open(self.json_files[self.current_index], 'r', encoding='utf-8') as f:
                self.current_data = json.load(f)

            pdf_bytes = self._decode_embedded_pdf(self.current_data)
            if not pdf_bytes:
                file_progress, annotation_status = self.get_progress_info()
                return None, file_progress, annotation_status

            self.current_pdf_bytes = pdf_bytes

            # Keep a copy of the decoded PDF on disk; its path is exposed via
            # temp_pdf_path and removed again on the next load.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
                temp_file.write(pdf_bytes)
                self.temp_pdf_path = temp_file.name

            base64_pdf = base64.b64encode(pdf_bytes).decode('utf-8')
            file_progress, annotation_status = self.get_progress_info()
            return base64_pdf, file_progress, annotation_status
        except Exception:
            # UI boundary: report the failure as status text instead of raising.
            return None, "Error loading file", "No annotation"

    def get_progress_info(self):
        """Build the progress line and annotation status for the current file.

        Returns:
            ``(file_progress, annotation_status)`` where ``file_progress``
            names the current file and counts annotated files, and
            ``annotation_status`` reports the recorded best extractor, if any.
        """
        if not self.json_files:
            return "No files loaded", "No annotation"

        current_file = self.json_files[self.current_index]
        filename = Path(current_file).name
        total = len(self.json_files)

        annotation_status = "Not annotated"
        best_extractor_file = self._best_file_for(current_file)
        if os.path.exists(best_extractor_file):
            try:
                with open(best_extractor_file, 'r', encoding='utf-8') as f:
                    best_extractor = f.read().strip()
                annotation_status = f"Best extractor: {best_extractor}"
            except (OSError, UnicodeError):
                # Unreadable annotation file: keep reporting "Not annotated".
                pass

        annotated_count = sum(
            1 for json_file in self.json_files
            if os.path.exists(self._best_file_for(json_file))
        )

        file_progress = (
            f"File {self.current_index + 1} of {total}: {filename}"
            f" (Annotated: {annotated_count}/{total})"
        )
        return file_progress, annotation_status

    def get_extractor_text(self, extractor_name):
        """Return the text of ``extractor_name`` with page-break markers added.

        Returns an empty string when no document is loaded or the extractor is
        absent from it, and a "No text found" message when its entry has no
        ``text`` field.
        """
        if not self.current_data or extractor_name not in self.current_data:
            return ""

        extractor_data = self.current_data[extractor_name]
        if not isinstance(extractor_data, dict) or 'text' not in extractor_data:
            return f"No text found for {extractor_name}"

        text = extractor_data['text'] or ''

        # Page offsets live at media[0]['metadata']['pdf_metadata']['page_offsets'].
        # Walk down level by level so a missing or null node simply means
        # "no offsets" instead of raising.
        page_offsets = []
        media = extractor_data.get('media')
        if isinstance(media, list) and media:
            node = media[0]
            for key in ('metadata', 'pdf_metadata', 'page_offsets'):
                node = node.get(key) if isinstance(node, dict) else None
            page_offsets = node or []

        return add_page_breaks(text, page_offsets)

    def next_pdf(self):
        """Advance to the next file (wrapping around) and load it."""
        if not self.json_files:
            return None, "N/A", "N/A"

        self.current_index = (self.current_index + 1) % len(self.json_files)
        return self.load_current_file()

    def prev_pdf(self):
        """Step back to the previous file (wrapping around) and load it."""
        if not self.json_files:
            return None, "N/A", "N/A"

        self.current_index = (self.current_index - 1) % len(self.json_files)
        return self.load_current_file()

    def set_best_extractor(self, extractor_name):
        """Record ``extractor_name`` as the best extractor for the current file.

        The name is written to the ``_best.txt`` file next to the current JSON
        file. Nothing is written when no document is currently loaded.

        Returns:
            The refreshed ``(file_progress, annotation_status)`` pair, or a
            pair of status messages when nothing is loaded or saving failed.
        """
        if not self.json_files or not self.current_data:
            return "N/A", "N/A"

        try:
            result_file = self._best_file_for(self.json_files[self.current_index])
            with open(result_file, 'w', encoding='utf-8') as f:
                f.write(extractor_name)

            file_progress, annotation_status = self.get_progress_info()
            return file_progress, annotation_status
        except Exception:
            # UI boundary: report the failure as status text instead of raising.
            return "Error saving annotation", "No annotation"
| |
|
def create_interface():
    """Build the Gradio UI for browsing PDFs and comparing extractor output.

    The page consists of a directory loader, an embedded PDF viewer, progress
    and annotation status, previous/next navigation, one "best extractor"
    button per entry in ``EXTRACTORS``, and two side-by-side text panes whose
    extractor is chosen via dropdowns.

    Returns:
        The assembled ``gr.Blocks`` application (not launched).
    """
    # A single comparer is created here and captured by every handler below,
    # so all handlers operate on the same navigation state.
    comparer = ExtractorComparer()

    custom_css = """
    .extraction-text textarea {
        font-family: Arial, Helvetica, sans-serif !important;
        font-size: 14px !important;
        line-height: 1.5 !important;
    }
    """

    with gr.Blocks(title="PDF Extractor Comparer", theme="soft", css=custom_css) as demo:
        gr.Markdown("## PDF Extractor Comparer")

        with gr.Row():
            directory_input = gr.Textbox(
                label="Path to JSON Directory",
                placeholder="e.g., /path/to/your/json/files"
            )
            load_button = gr.Button("Load PDFs", variant="primary")

        with gr.Row():
            # Left: PDF viewer.
            with gr.Column(scale=3):
                # Static shell; the <object> is filled in client-side by the
                # script registered in demo.load() below.
                pdf_viewer_html = gr.HTML(
                    label="PDF Document",
                    value='''
                    <div style="width:100%; height:700px; position:relative; border:1px solid #ddd;">
                        <style>
                            @font-face {
                                font-family: 'Local Arial';
                                src: local('Arial');
                            }
                            body {
                                font-family: 'Local Arial', sans-serif;
                            }
                        </style>
                        <object id="pdf-object" type="application/pdf" width="100%" height="100%" style="display:none;">
                            <p>PDF cannot be displayed</p>
                        </object>
                        <div id="pdf-fallback" style="position:absolute; top:0; left:0; width:100%; height:100%;
                            display:flex; align-items:center; justify-content:center; padding:20px; text-align:center; font-family: Arial, sans-serif;">
                            Click "Load PDFs" to start viewing documents.
                        </div>
                    </div>
                    '''
                )

                # Hidden channel carrying the base64 PDF from Python handlers
                # to the browser script, which looks it up by elem_id.
                pdf_data_hidden = gr.Textbox(visible=False, elem_id="pdf_base64_data")

            # Right: status, navigation and annotation controls.
            with gr.Column(scale=1):
                file_progress_output = gr.Textbox(label="File Progress", interactive=False)
                annotation_status_output = gr.Textbox(label="Annotation Status", interactive=False)

                with gr.Row():
                    # elem_ids are used by the keyboard shortcuts in the script.
                    prev_button = gr.Button("⬅️ Previous", elem_id="prev_button")
                    next_button = gr.Button("Next ➡️", elem_id="next_button")

                gr.Markdown("### Select Best Extractor")
                for extractor in EXTRACTORS:
                    button = gr.Button(extractor, variant="secondary")
                    # The extractor name is passed to the handler through a
                    # hidden textbox created per button, which fixes the value
                    # for that button.
                    button.click(
                        comparer.set_best_extractor,
                        inputs=[gr.Textbox(value=extractor, visible=False)],
                        outputs=[file_progress_output, annotation_status_output]
                    )

        gr.Markdown("### Extractor Comparison")

        with gr.Row():
            extractor1_dropdown = gr.Dropdown(
                choices=EXTRACTORS,
                label="Extractor 1",
                value=EXTRACTORS[0] if EXTRACTORS else None
            )
            extractor2_dropdown = gr.Dropdown(
                choices=EXTRACTORS,
                label="Extractor 2",
                value=EXTRACTORS[1] if len(EXTRACTORS) > 1 else EXTRACTORS[0] if EXTRACTORS else None
            )

        with gr.Row():
            extractor1_text = gr.Textbox(
                label="Extractor 1 Output",
                lines=15,
                elem_classes=["extraction-text"]
            )
            extractor2_text = gr.Textbox(
                label="Extractor 2 Output",
                lines=15,
                elem_classes=["extraction-text"]
            )

        def refresh_extractor_texts(event):
            """Chain a refresh of both comparison panes after ``event``."""
            return event.then(
                comparer.get_extractor_text,
                inputs=[extractor1_dropdown],
                outputs=[extractor1_text]
            ).then(
                comparer.get_extractor_text,
                inputs=[extractor2_dropdown],
                outputs=[extractor2_text]
            )

        # Loading a directory: list the files, load the first one, then fill
        # both text panes.
        refresh_extractor_texts(
            load_button.click(
                comparer.load_files,
                inputs=[directory_input],
                outputs=[file_progress_output, annotation_status_output]
            ).then(
                comparer.load_current_file,
                outputs=[pdf_data_hidden, file_progress_output, annotation_status_output]
            )
        )

        refresh_extractor_texts(
            prev_button.click(
                comparer.prev_pdf,
                outputs=[pdf_data_hidden, file_progress_output, annotation_status_output]
            )
        )

        refresh_extractor_texts(
            next_button.click(
                comparer.next_pdf,
                outputs=[pdf_data_hidden, file_progress_output, annotation_status_output]
            )
        )

        extractor1_dropdown.change(
            comparer.get_extractor_text,
            inputs=[extractor1_dropdown],
            outputs=[extractor1_text]
        )

        extractor2_dropdown.change(
            comparer.get_extractor_text,
            inputs=[extractor2_dropdown],
            outputs=[extractor2_text]
        )

        # Client-side script: renders the base64 PDF from the hidden textbox
        # into the <object> element and wires the arrow-key shortcuts.
        demo.load(
            fn=None,
            js="""
            function() {
                console.log("Setting up PDF viewer");

                // Store the current blob URL
                var pdfBlobUrl = null;

                // Data that was rendered last; lets the poller skip unchanged data
                // so the viewer is not reloaded on every tick.
                var lastRenderedData = null;

                // Function to display PDF from base64 data
                function displayPdfFromBase64(base64Data) {
                    try {
                        if (!base64Data || base64Data.length < 100) {
                            console.log("No valid PDF data received");
                            document.getElementById('pdf-fallback').style.display = 'flex';
                            document.getElementById('pdf-object').style.display = 'none';
                            return;
                        }

                        console.log("Displaying PDF from base64 data");

                        // Clean up previous blob URL
                        if (pdfBlobUrl) {
                            URL.revokeObjectURL(pdfBlobUrl);
                        }

                        // Convert base64 to binary
                        const binary = atob(base64Data);
                        const bytes = new Uint8Array(binary.length);
                        for (let i = 0; i < binary.length; i++) {
                            bytes[i] = binary.charCodeAt(i);
                        }

                        // Create blob and URL
                        const blob = new Blob([bytes], {type: 'application/pdf'});
                        pdfBlobUrl = URL.createObjectURL(blob);

                        // Display PDF in the object element
                        const pdfObject = document.getElementById('pdf-object');
                        const fallback = document.getElementById('pdf-fallback');

                        if (pdfObject && fallback) {
                            pdfObject.data = pdfBlobUrl;
                            pdfObject.style.display = 'block';
                            fallback.style.display = 'none';
                            console.log("PDF displayed successfully");
                        } else {
                            console.error("PDF viewer elements not found");
                        }
                    } catch (error) {
                        console.error("Error displaying PDF:", error);
                        const fallback = document.getElementById('pdf-fallback');
                        if (fallback) {
                            fallback.innerHTML = '<div style="color:red; font-family: Arial, sans-serif;">Error displaying PDF</div>';
                            fallback.style.display = 'flex';
                        }
                    }
                }

                // Render only when the hidden textbox holds different data than
                // what is currently shown (including a change to "no data").
                function renderIfChanged() {
                    const dataElement = document.getElementById('pdf_base64_data');
                    const textarea = dataElement ? dataElement.querySelector('textarea') : null;
                    if (!textarea) {
                        return;
                    }

                    const value = textarea.value || "";
                    if (value === lastRenderedData) {
                        return;
                    }
                    lastRenderedData = value;
                    displayPdfFromBase64(value);
                }

                // Check for PDF data repeatedly
                function checkForPdfData() {
                    const dataElement = document.getElementById('pdf_base64_data');
                    if (!dataElement) {
                        console.log("PDF data element not found, will retry");
                        setTimeout(checkForPdfData, 1000);
                        return;
                    }

                    const textarea = dataElement.querySelector('textarea');
                    if (!textarea) {
                        console.log("Textarea not found, will retry");
                        setTimeout(checkForPdfData, 1000);
                        return;
                    }

                    // Display initial data if available
                    renderIfChanged();

                    // Set up polling to check for changes
                    setInterval(renderIfChanged, 2000);
                }

                // Start checking for PDF data
                setTimeout(checkForPdfData, 1000);

                // Add keyboard shortcuts
                document.addEventListener('keydown', function(event) {
                    if (event.target.tagName === 'INPUT' || event.target.tagName === 'TEXTAREA') {
                        return;
                    }

                    var buttonId = null;
                    if (event.key === 'ArrowLeft') buttonId = 'prev_button';
                    else if (event.key === 'ArrowRight') buttonId = 'next_button';

                    if (buttonId) {
                        var button = document.getElementById(buttonId);
                        if (button) {
                            event.preventDefault();
                            button.click();
                        }
                    }
                });
            }
            """
        )

    return demo
| |
|
# Script entry point: build the interface and start serving it.
if __name__ == "__main__":
    demo = create_interface()
    demo.launch()