Spaces:
Runtime error
Runtime error
| import PyPDF2 | |
| import pdfplumber | |
| from typing import Dict, List, Optional, Union, Any | |
| import re | |
| import logging | |
| import io | |
logger = logging.getLogger(__name__)


class PDFProcessor:
    """Handles PDF file processing and text extraction.

    Every public method accepts the PDF as raw ``bytes`` (the form handed
    over by Gradio), as a seekable binary file object, or as a filesystem
    path. Failures are reported through the returned dictionaries rather
    than raised.
    """

    def __init__(self):
        self.supported_formats = ['.pdf']

    def extract_text_from_pdf(self, pdf_file) -> Dict[str, Any]:
        """Extract text content from a PDF file.

        pdfplumber is tried first (better for complex layouts); if it raises
        for any reason the extraction is retried with PyPDF2.

        Args:
            pdf_file: ``bytes``, a seekable binary file object, or a path.

        Returns:
            A dict with the keys ``success``, ``text`` (all pages joined by a
            blank line), ``pages`` (list of ``{'page_number', 'text'}`` for
            pages that yielded text), ``metadata``, ``word_count`` and
            ``character_count``. On failure ``success`` is ``False`` and an
            ``error`` message is included.
        """
        if pdf_file is None:
            return self._failure_result(
                'Failed to extract text from PDF: no file provided'
            )

        # Handle bytes input from Gradio
        if isinstance(pdf_file, (bytes, bytearray)):
            pdf_file = io.BytesIO(pdf_file)

        try:
            with pdfplumber.open(pdf_file) as pdf:
                metadata = self._build_metadata(len(pdf.pages), pdf.metadata)
                # Pages must be read while the document is still open.
                pages = self._collect_pages(pdf.pages)
            return self._success_result(pages, metadata)
        except Exception as e:
            logger.error("pdfplumber extraction failed: %s", e)
            # Fallback to PyPDF2
            return self._extract_with_pypdf2(pdf_file)

    def _extract_with_pypdf2(self, pdf_file) -> Dict[str, Any]:
        """Fallback extraction using PyPDF2.

        Returns the same dictionary shape as ``extract_text_from_pdf``; any
        exception is logged and converted into a ``success: False`` result.
        """
        try:
            if isinstance(pdf_file, (bytes, bytearray)):
                pdf_file = io.BytesIO(pdf_file)
            elif hasattr(pdf_file, 'seek'):
                # A previous reader may have consumed the stream. Paths have
                # no file pointer, so they are passed through untouched.
                pdf_file.seek(0)

            reader = PyPDF2.PdfReader(pdf_file)
            # PyPDF2 exposes document-info keys with a leading slash.
            metadata = self._build_metadata(
                len(reader.pages), reader.metadata, key_prefix='/'
            )
            pages = self._collect_pages(reader.pages)
            return self._success_result(pages, metadata)
        except Exception as e:
            logger.error("PyPDF2 extraction failed: %s", e)
            return self._failure_result(
                f"Failed to extract text from PDF: {e}"
            )

    @staticmethod
    def _build_metadata(total_pages: int, raw_metadata,
                        key_prefix: str = '') -> Dict[str, Any]:
        """Build the metadata dict from a reader's document-info mapping."""
        metadata = {
            'total_pages': total_pages,
            'title': '',
            'author': '',
            'subject': ''
        }
        if raw_metadata:
            for field in ('title', 'author', 'subject'):
                value = raw_metadata.get(key_prefix + field.capitalize())
                # Missing or null entries become '', everything else a str.
                metadata[field] = '' if value is None else str(value)
        return metadata

    def _collect_pages(self, pages) -> List[Dict[str, Any]]:
        """Return cleaned text for every page that yields any text."""
        collected = []
        for page_num, page in enumerate(pages, 1):
            page_text = page.extract_text()
            if page_text:
                collected.append({
                    'page_number': page_num,
                    'text': self._clean_text(page_text)
                })
        return collected

    @staticmethod
    def _success_result(pages: List[Dict[str, Any]],
                        metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Assemble the successful extraction result."""
        combined_text = '\n\n'.join(page['text'] for page in pages)
        return {
            'success': True,
            'text': combined_text,
            'pages': pages,
            'metadata': metadata,
            'word_count': len(combined_text.split()),
            'character_count': len(combined_text)
        }

    @staticmethod
    def _failure_result(message: str) -> Dict[str, Any]:
        """Assemble the failed extraction result carrying ``message``."""
        return {
            'success': False,
            'error': message,
            'text': '',
            'pages': [],
            'metadata': {},
            'word_count': 0,
            'character_count': 0
        }

    def _clean_text(self, text: str) -> str:
        """Clean and normalize extracted text.

        Line breaks are kept (at most one blank line in a row) so that the
        newline-based fixes below can actually match; only runs of spaces,
        tabs and similar horizontal whitespace are collapsed.
        """
        if not text:
            return ''

        text = text.replace('\r\n', '\n').replace('\r', '\n')

        # These steps rely on newlines, so they must run before any
        # whitespace collapsing.
        # Fix hyphenated words across lines
        text = re.sub(r'(\w)-\n(\w)', r'\1\2', text)
        # Remove lines that contain only a number (typical page numbers)
        text = re.sub(r'\n[^\S\n]*\d+[^\S\n]*(?=\n)', '', text)

        # Remove excessive whitespace within lines
        text = re.sub(r'[^\S\n]+', ' ', text)
        text = re.sub(r' ?\n ?', '\n', text)

        # Split concatenated words (heuristic: lower-case then upper-case)
        text = re.sub(r'([a-z])([A-Z])', r'\1 \2', text)

        # Remove excessive line breaks
        text = re.sub(r'\n{3,}', '\n\n', text)
        return text.strip()

    def validate_pdf(self, pdf_file, max_size_mb: float = 50) -> Dict[str, Any]:
        """Validate a PDF file before processing.

        Args:
            pdf_file: ``bytes``, a seekable binary file object, or a path.
            max_size_mb: Largest accepted size in MiB (default 50).

        Returns:
            ``{'valid': True, 'pages': int, 'size_mb': float}`` on success,
            otherwise ``{'valid': False, 'error': str}``. A file object is
            rewound to the start once it has been inspected.
        """
        import os  # only needed here, for path inputs

        if pdf_file is None:
            return {
                'valid': False,
                'error': 'No PDF file provided'
            }

        try:
            if isinstance(pdf_file, (bytes, bytearray)):
                # Handle bytes input from Gradio
                file_size = len(pdf_file)
                pdf_file = io.BytesIO(pdf_file)
            elif isinstance(pdf_file, (str, os.PathLike)):
                file_size = os.path.getsize(pdf_file)
            else:
                pdf_file.seek(0, 2)  # Seek to end
                file_size = pdf_file.tell()
                pdf_file.seek(0)  # Reset to beginning

            if file_size > max_size_mb * 1024 * 1024:
                return {
                    'valid': False,
                    'error': f'File size exceeds {max_size_mb:g}MB limit'
                }

            # Try to open the PDF to validate format
            try:
                reader = PyPDF2.PdfReader(pdf_file)
                page_count = len(reader.pages)
            except Exception as e:
                return {
                    'valid': False,
                    'error': f'Invalid PDF format: {e}'
                }
            finally:
                # Leave the stream usable for the caller on every exit path.
                if hasattr(pdf_file, 'seek'):
                    pdf_file.seek(0)

            if page_count == 0:
                return {
                    'valid': False,
                    'error': 'PDF contains no pages'
                }
            return {
                'valid': True,
                'pages': page_count,
                'size_mb': round(file_size / (1024 * 1024), 2)
            }
        except Exception as e:
            return {
                'valid': False,
                'error': f'Error validating PDF: {e}'
            }