Spaces:
Runtime error
Runtime error
| # app.py - Main Hugging Face Spaces Application | |
| import gradio as gr | |
| import PyPDF2 | |
| import pdfplumber | |
| import fitz # PyMuPDF | |
| import pandas as pd | |
| import re | |
| import logging | |
| import os | |
| import tempfile | |
| from typing import Dict, List, Tuple, Optional | |
| from pathlib import Path | |
| import json | |
| # Set up logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
class PDFProcessorError(Exception):
    """Error raised by the extraction helpers when a PDF backend fails."""
def enhanced_pdf_processor(file_path: str) -> Dict:
    """
    Extract text, tables and metadata from a PDF, trying several backends.

    Backends are tried in a fixed order (PyMuPDF, pdfplumber, PyPDF2). The
    first one whose text is longer than 10 characters after stripping wins
    and the remaining backends are not run.

    Args:
        file_path: Path of the PDF on disk.

    Returns:
        A dict with the keys 'text', 'tables', 'metadata',
        'extraction_method', 'success', 'error', 'file_info' and 'summary'.
        Problems are reported through 'success' / 'error' instead of being
        raised. 'metadata' is only filled by PyMuPDF and 'tables' only by
        pdfplumber.
    """
    results = {
        'text': '',
        'tables': [],
        'metadata': {},
        'extraction_method': 'unknown',
        'success': False,
        'error': None,
        'file_info': {},
        'summary': ''
    }
    try:
        # Validate file
        if not os.path.exists(file_path):
            results['error'] = f"File does not exist: {file_path}"
            return results
        # A directory exists but can never be parsed; reject it up front
        # instead of letting every backend fail on it.
        if not os.path.isfile(file_path):
            results['error'] = f"Path is not a file: {file_path}"
            return results

        # Get file info
        results['file_info'] = get_file_info(file_path)

        # (name, extractor, results key for the extractor's second return
        # value). PyPDF2 returns only text, hence None.
        extraction_methods = [
            ('PyMuPDF', extract_with_pymupdf, 'metadata'),
            ('pdfplumber', extract_with_pdfplumber, 'tables'),
            ('PyPDF2', extract_with_pypdf2, None),
        ]

        # Collected so the final error can say why each backend was rejected.
        failures = []
        for method_name, method_func, extra_key in extraction_methods:
            try:
                logger.info("Trying extraction method: %s", method_name)
                outcome = method_func(file_path)
                if extra_key is None:
                    text_result, extra = outcome, None
                else:
                    text_result, extra = outcome
            except Exception as e:
                logger.warning("%s failed: %s", method_name, e)
                failures.append(f"{method_name}: {e}")
                continue

            if not text_result or len(text_result.strip()) <= 10:
                failures.append(f"{method_name}: no usable text")
                continue

            results['text'] = text_result
            if extra_key is not None:
                results[extra_key] = extra
            results['extraction_method'] = method_name
            results['success'] = True
            break

        # Generate summary if successful
        if results['success']:
            results['summary'] = generate_document_summary(results['text'])
        else:
            details = "; ".join(failures)
            results['error'] = "All extraction methods failed" + (
                f" ({details})" if details else ""
            )
    except Exception as e:
        results['error'] = f"Processing error: {str(e)}"
        logger.error("PDF processing error: %s", e)
    return results
def extract_with_pypdf2(file_path: str) -> str:
    """Extract text using PyPDF2.

    Args:
        file_path: Path of the PDF to read.

    Returns:
        The cleaned text of all pages that produced text, each preceded by a
        ``--- Page N ---`` marker.

    Raises:
        PDFProcessorError: If the file cannot be opened or parsed, or if it
            is encrypted and cannot be unlocked with an empty password.
            Failures on a single page are logged and that page is skipped.
    """
    page_chunks = []
    try:
        with open(file_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            if reader.is_encrypted:
                try:
                    unlocked = reader.decrypt("")
                except Exception as e:
                    raise PDFProcessorError("PDF is encrypted") from e
                # decrypt() reports a rejected password through a falsy
                # return value, so the result must be checked explicitly.
                if not unlocked:
                    raise PDFProcessorError("PDF is encrypted")
            for page_num, page in enumerate(reader.pages, start=1):
                try:
                    page_text = page.extract_text()
                except Exception as e:
                    logger.warning("Failed to extract page %s: %s", page_num, e)
                    continue
                if page_text:
                    page_chunks.append(f"\n--- Page {page_num} ---\n{page_text}\n")
        return clean_text("".join(page_chunks))
    except Exception as e:
        raise PDFProcessorError(f"PyPDF2 extraction failed: {e}") from e
def extract_with_pdfplumber(file_path: str) -> Tuple[str, List[Dict]]:
    """Extract text and tables using pdfplumber.

    Args:
        file_path: Path of the PDF to open.

    Returns:
        ``(text, tables)``. ``text`` is the cleaned text of all pages that
        produced text, each preceded by a ``--- Page N ---`` marker.
        ``tables`` has one dict per table with more than one row, with the
        keys 'page', 'table_number', 'data' and 'text_representation'.

    Raises:
        PDFProcessorError: If the document cannot be opened or iterated.
            A failure on a single page is logged and the rest of that page
            is skipped.
    """
    page_chunks = []
    tables = []
    try:
        with pdfplumber.open(file_path) as pdf:
            for page_num, page in enumerate(pdf.pages, start=1):
                try:
                    # Extract text
                    page_text = page.extract_text()
                    if page_text:
                        page_chunks.append(f"\n--- Page {page_num} ---\n{page_text}\n")
                    # Extract tables; single-row tables are ignored.
                    for table_num, table in enumerate(page.extract_tables(), start=1):
                        if table and len(table) > 1:
                            tables.append({
                                'page': page_num,
                                'table_number': table_num,
                                'data': table,
                                'text_representation': table_to_text(table)
                            })
                except Exception as e:
                    logger.warning("Failed to process page %s: %s", page_num, e)
        return clean_text("".join(page_chunks)), tables
    except Exception as e:
        # Chain the cause so the backend's traceback is not lost.
        raise PDFProcessorError(f"pdfplumber extraction failed: {e}") from e
def extract_with_pymupdf(file_path: str) -> Tuple[str, Dict]:
    """Extract text and document metadata using PyMuPDF.

    Args:
        file_path: Path of the PDF to open.

    Returns:
        ``(text, metadata)``. ``text`` is the cleaned text of all pages that
        produced text, each preceded by a ``--- Page N ---`` marker.
        ``metadata`` has 'page_count' plus 'title', 'author', 'subject',
        'creator' and 'creation_date' when the document metadata is readable,
        otherwise only 'page_count'.

    Raises:
        PDFProcessorError: If the document cannot be opened or read.
            A failure on a single page is logged and that page is skipped.
    """
    page_chunks = []
    metadata = {}
    try:
        doc = fitz.open(file_path)
        try:
            # Extract metadata
            try:
                doc_metadata = doc.metadata or {}
                metadata = {
                    'page_count': doc.page_count,
                    'title': doc_metadata.get('title', ''),
                    'author': doc_metadata.get('author', ''),
                    'subject': doc_metadata.get('subject', ''),
                    'creator': doc_metadata.get('creator', ''),
                    'creation_date': doc_metadata.get('creationDate', '')
                }
            except Exception as e:
                logger.warning("Failed to read PDF metadata: %s", e)
                metadata = {'page_count': doc.page_count}
            # Extract text
            for page_num in range(doc.page_count):
                try:
                    page_text = doc[page_num].get_text()
                except Exception as e:
                    logger.warning("Failed to extract page %s: %s", page_num + 1, e)
                    continue
                if page_text:
                    page_chunks.append(f"\n--- Page {page_num + 1} ---\n{page_text}\n")
        finally:
            # Close the document even when extraction raises, so the
            # handle is never leaked.
            doc.close()
        return clean_text("".join(page_chunks)), metadata
    except Exception as e:
        raise PDFProcessorError(f"PyMuPDF extraction failed: {e}") from e
def clean_text(text: str) -> str:
    """Clean extracted text.

    Removes replacement characters (U+FFFD), NUL bytes and zero-width spaces
    (U+200B), collapses runs of blank lines into a single blank line and
    runs of spaces into one space, then strips the result.

    Args:
        text: Raw text from a PDF backend; may be empty or None.

    Returns:
        The cleaned text, or "" when ``text`` is empty or None.
    """
    if not text:
        return ""
    # Remove problematic characters first: deleting them after the
    # whitespace pass would leave the spaces on either side of a removed
    # character as a double space.
    text = text.translate({0xFFFD: None, 0x0000: None, 0x200B: None})
    # Remove excessive whitespace
    text = re.sub(r'\n\s*\n', '\n\n', text)
    text = re.sub(r' +', ' ', text)
    return text.strip()
def table_to_text(table: List[List]) -> str:
    """Render a table as text: one line per row, cells joined by " | ".

    Falsy cells become empty strings, other cells are stringified and
    stripped. Empty rows and rows whose cells are all empty are left out.
    An empty or missing table gives "".
    """
    rendered_rows = []
    for row in table or []:
        if not row:
            continue
        cells = ["" if not cell else str(cell).strip() for cell in row]
        if not any(cells):
            continue
        rendered_rows.append(" | ".join(cells))
    return "\n".join(rendered_rows)
def get_file_info(file_path: str) -> Dict:
    """Describe a file on disk.

    Returns a dict with 'name' (final path component), 'size' (bytes) and
    'size_mb' (size in MiB rounded to two decimals). Returns an empty dict
    if the path cannot be inspected.
    """
    try:
        target = Path(file_path)
        byte_count = target.stat().st_size
    except Exception:
        return {}
    return {
        'name': target.name,
        'size': byte_count,
        'size_mb': round(byte_count / (1024 * 1024), 2)
    }
def generate_document_summary(text: str) -> str:
    """Build a short plain-text report about ``text``.

    The report lists character, word and line counts, followed by a preview
    made from the first three chunks obtained by splitting on runs of
    '.', '!' or '?'. The preview is cut to 300 characters plus "..." when
    longer. Empty input gives "No text extracted".
    """
    if not text:
        return "No text extracted"
    # Basic statistics
    chars = len(text)
    words = len(text.split())
    lines = text.count('\n') + 1
    # Preview: first three sentence-like chunks, re-joined with '. '
    leading_chunks = re.split(r'[.!?]+', text)[:3]
    preview = '. '.join(leading_chunks).strip()
    preview = preview[:300] + "..." if len(preview) > 300 else preview
    return f"""
Document Statistics:
- Characters: {chars:,}
- Words: {words:,}
- Lines: {lines:,}
Preview:
{preview}
"""
def process_pdf_file(file) -> Tuple[str, str, str, str]:
    """
    Process uploaded PDF file for Gradio interface

    Args:
        file: The value of the upload component. Accepted forms are None
            (nothing uploaded), a filesystem path, an object whose ``name``
            attribute is the path of an existing file, or a readable binary
            file-like object.

    Returns:
        ``(status, info, summary, display_text)``. On failure ``status``
        starts with "❌" and the other three items are empty strings.
        ``display_text`` is limited to the first 5000 characters of the
        extracted text, followed by a preview of up to three tables.
    """
    if file is None:
        return "No file uploaded", "", "", ""
    try:
        pdf_path, is_temporary = _resolve_upload_path(file)
        try:
            # Process the PDF
            result = enhanced_pdf_processor(pdf_path)
        finally:
            # Clean up only copies made here, and do so even if processing
            # raised; the caller's own file is never deleted.
            if is_temporary:
                try:
                    os.unlink(pdf_path)
                except OSError:
                    pass

        if not result['success']:
            error_msg = result.get('error', 'Unknown error')
            return f"❌ Processing failed: {error_msg}", "", "", ""

        # Format results for display
        status = f"✅ Successfully processed using {result['extraction_method']}"
        # File info
        file_info = result.get('file_info', {})
        info = f"""
File: {file_info.get('name', 'Unknown')}
Size: {file_info.get('size_mb', 0)} MB
Pages: {result.get('metadata', {}).get('page_count', 'Unknown')}
"""
        # Summary
        summary = result.get('summary', 'No summary available')
        # Full text (truncated for display)
        full_text = result['text']
        if len(full_text) > 5000:
            display_text = full_text[:5000] + f"\n\n... (Text truncated. Total length: {len(full_text)} characters)"
        else:
            display_text = full_text
        # Tables info
        if result['tables']:
            display_text += _format_tables_preview(result['tables'])
        return status, info, summary, display_text
    except Exception as e:
        return f"❌ Error: {str(e)}", "", "", ""


def _resolve_upload_path(file) -> Tuple[str, bool]:
    """Return ``(path, is_temporary)`` for an uploaded file value."""
    # The upload may already be a path on disk.
    if isinstance(file, (str, os.PathLike)):
        return os.fspath(file), False
    # It may be a wrapper object exposing the on-disk path as ``.name``.
    name = getattr(file, 'name', None)
    if isinstance(name, str) and os.path.isfile(name):
        return name, False
    # Otherwise treat it as a binary stream and spool it to a temp file.
    data = file.read()
    with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
        tmp_file.write(data)
    return tmp_file.name, True


def _format_tables_preview(tables: List[Dict]) -> str:
    """Render the table count plus the first three tables (500 chars each)."""
    parts = [f"\n\nTables found: {len(tables)}"]
    for i, table in enumerate(tables[:3]):  # Show first 3 tables
        representation = table['text_representation']
        parts.append(f"\n\nTable {i+1} (Page {table['page']}):\n")
        parts.append(representation[:500])
        if len(representation) > 500:
            parts.append("...")
    return "".join(parts)
def answer_question(text: str, question: str) -> str:
    """
    Simple keyword-based question answering

    Keywords are the distinct words of the question longer than three
    characters, compared case-insensitively. The text is split into
    sentence-like chunks on runs of '.', '!' or '?'. Each chunk is scored by
    how many keywords it contains as substrings, and the three best chunks
    are returned, with ties keeping document order.

    Args:
        text: Document text to search.
        question: The user's question.

    Returns:
        A message containing up to three matching chunks, a "couldn't find"
        message when nothing matches, or a prompt to supply both inputs when
        either one is empty.
    """
    if not text or not question:
        return "Please provide both text and a question."
    # Tokenise on word characters so punctuation stuck to a word (typically
    # the trailing "?" of the question) does not stop it from matching.
    keywords = {word for word in re.findall(r'\w+', question.lower()) if len(word) > 3}
    # Find relevant sentences
    relevant_sentences = []
    for sentence in re.split(r'[.!?]+', text):
        sentence_lower = sentence.lower()
        score = sum(1 for keyword in keywords if keyword in sentence_lower)
        if score > 0:
            relevant_sentences.append((sentence.strip(), score))
    # Sort by relevance and take top 3 (the sort is stable)
    relevant_sentences.sort(key=lambda item: item[1], reverse=True)
    top_sentences = [sentence for sentence, _ in relevant_sentences[:3]]
    if top_sentences:
        return "Based on the document, here are the most relevant sections:\n\n" + "\n\n".join(top_sentences)
    return "I couldn't find information related to your question in the document."
# Text of the most recently processed document. This is module-level state,
# so one value is shared by everything running in this process.
extracted_text = ""


def update_extracted_text(status, info, summary, full_text):
    """Store ``full_text`` in the module-level ``extracted_text``.

    All four arguments are handed back unchanged, in the same order, so the
    function can sit between an event's outputs and the same components.
    """
    global extracted_text
    passthrough = (status, info, summary, full_text)
    extracted_text = passthrough[3]
    return passthrough
def qa_interface(question):
    """Answer ``question`` against the module-level ``extracted_text``."""
    # The global is only read here, so no ``global`` declaration is needed.
    document_text = extracted_text
    return answer_question(document_text, question)
# Create Gradio interface
with gr.Blocks(title="PDF Processor & Q&A System") as app:
    gr.Markdown("# 📄 PDF Processor & Question Answering System")
    gr.Markdown("Upload a PDF file to extract text and ask questions about its content.")

    with gr.Tab("PDF Processing"):
        with gr.Row():
            with gr.Column():
                file_input = gr.File(label="Upload PDF", file_types=[".pdf"])
                process_btn = gr.Button("Process PDF", variant="primary")
            with gr.Column():
                status_output = gr.Textbox(label="Status", lines=2)
                info_output = gr.Textbox(label="File Information", lines=4)
        summary_output = gr.Textbox(label="Document Summary", lines=8)
        text_output = gr.Textbox(label="Extracted Text", lines=15, max_lines=20)

    with gr.Tab("Question & Answer"):
        gr.Markdown("Ask questions about the processed PDF content.")
        with gr.Row():
            question_input = gr.Textbox(label="Your Question", placeholder="What is this document about?")
            ask_btn = gr.Button("Ask Question", variant="primary")
        answer_output = gr.Textbox(label="Answer", lines=8)

    # Event handlers
    process_btn.click(
        fn=process_pdf_file,
        inputs=[file_input],
        outputs=[status_output, info_output, summary_output, text_output]
    ).then(
        # Keeps the module-level copy of the text up to date for code that
        # still calls qa_interface() directly.
        fn=update_extracted_text,
        inputs=[status_output, info_output, summary_output, text_output],
        outputs=[status_output, info_output, summary_output, text_output]
    )
    # Answer from this session's own "Extracted Text" box rather than from
    # the module-level global: the global is a single value for the whole
    # process, so one visitor's question could otherwise be answered from
    # a document that a different visitor uploaded.
    ask_btn.click(
        fn=answer_question,
        inputs=[text_output, question_input],
        outputs=[answer_output]
    )

    # Example
    gr.Examples(
        examples=[
            ["What is the main topic of this document?"],
            ["What are the key findings?"],
            ["Who are the authors?"],
            ["What is the conclusion?"]
        ],
        inputs=[question_input]
    )

if __name__ == "__main__":
    app.launch()