|
|
import fitz |
|
|
import re |
|
|
import os |
|
|
from typing import Dict, List, Optional |
|
|
|
|
|
|
|
|
def extract_text_from_pdf(pdf_path: str) -> str:
    """
    Extract clean text from a PDF file.

    Every page is read with plain-text extraction and cleaned with
    ``clean_extracted_text``.  Pages that yielded text are joined with a
    ``--- PAGE BREAK ---`` marker and the combined text is passed through
    ``post_process_text``.  A page that fails is reported on stdout and
    skipped instead of aborting the whole document.

    Args:
        pdf_path (str): Path to the PDF file

    Returns:
        str: Extracted and cleaned text

    Raises:
        RuntimeError: If the path is empty or does not exist, the PDF cannot
            be opened or processed, or no text could be extracted.
    """
    if not pdf_path or not os.path.exists(pdf_path):
        raise RuntimeError("PDF file not found or path is invalid")

    try:
        doc = fitz.open(pdf_path)
    except Exception as e:
        raise RuntimeError(f"Failed to open PDF: {e}") from e

    page_separator = "\n\n--- PAGE BREAK ---\n\n"
    page_chunks = []

    try:
        total_pages = doc.page_count
        # The status glyphs are written as escapes so they survive any
        # re-encoding of this file (the literals had been garbled).
        # NOTE(review): the "page" emoji is a best-guess reconstruction.
        print(f"\U0001F4C4 Processing {total_pages} pages...")

        for page_num in range(total_pages):
            try:
                text = doc[page_num].get_text("text")

                # Whitespace-only pages contribute nothing to the output.
                if text.strip():
                    page_chunks.append(clean_extracted_text(text))

                print(f"\u2705 Page {page_num + 1} processed")

            except Exception as e:
                # Best effort: one bad page must not lose the others.
                print(f"\u26a0\ufe0f Error processing page {page_num + 1}: {e}")
                continue

    except Exception as e:
        raise RuntimeError(f"Error during text extraction: {e}") from e

    finally:
        doc.close()

    # Joining (rather than appending a marker after each page) means the
    # marker only ever sits *between* two pages that have text, so no
    # dangling marker is left when the trailing pages are empty.
    full_text = page_separator.join(page_chunks)

    if not full_text.strip():
        raise RuntimeError(
            "No text found in PDF. The file may contain only images or be corrupted.")

    return post_process_text(full_text)
|
|
|
|
|
|
|
|
def extract_text_with_metadata(pdf_path: str) -> Dict:
    """
    Extract text with additional metadata and document info.

    Args:
        pdf_path (str): Path to the PDF file

    Returns:
        dict: Complete extraction results with the keys

            * ``full_text``  - post-processed text of all pages that yielded
              text, joined with a ``--- PAGE BREAK ---`` marker
            * ``page_texts`` - one cleaned string per page, in page order
              (``""`` for pages that were empty or failed)
            * ``page_count`` - number of pages reported by the document
            * ``metadata``   - document metadata passed through
              ``clean_metadata``
            * ``file_info``  - ``file_path`` and ``file_size`` (bytes, or 0
              if the file is no longer present)

    Raises:
        RuntimeError: If the path is empty or does not exist, or the PDF
            cannot be opened or processed.
    """
    if not pdf_path or not os.path.exists(pdf_path):
        raise RuntimeError("PDF file not found or path is invalid")

    try:
        doc = fitz.open(pdf_path)
    except Exception as e:
        raise RuntimeError(f"Failed to open PDF: {e}") from e

    page_separator = "\n\n--- PAGE BREAK ---\n\n"
    page_texts = []

    try:
        total_pages = doc.page_count
        # Status glyphs are written as escapes so they survive re-encoding.
        # NOTE(review): the "page" emoji is a best-guess reconstruction.
        print(f"\U0001F4C4 Processing {total_pages} pages with metadata...")

        metadata = doc.metadata

        for page_num in range(total_pages):
            page_text = ""
            try:
                text = doc[page_num].get_text("text")
                if text.strip():
                    page_text = clean_extracted_text(text)

                print(f"\u2705 Page {page_num + 1} processed")

            except Exception as e:
                # Best effort: one bad page must not lose the others.
                print(f"\u26a0\ufe0f Error processing page {page_num + 1}: {e}")

            # Exactly one entry per page, so index i is always page i + 1.
            page_texts.append(page_text)

        # The marker is placed only between pages that have text, so no
        # dangling marker remains when trailing pages are empty.
        full_text = page_separator.join(chunk for chunk in page_texts if chunk)

        return {
            'full_text': post_process_text(full_text),
            'page_texts': page_texts,
            'page_count': total_pages,
            'metadata': clean_metadata(metadata),
            'file_info': {
                'file_path': pdf_path,
                'file_size': os.path.getsize(pdf_path) if os.path.exists(pdf_path) else 0,
            },
        }

    except Exception as e:
        raise RuntimeError(f"Error during extraction with metadata: {e}") from e

    finally:
        doc.close()
|
|
|
|
|
|
|
|
def clean_extracted_text(text: str) -> str:
    """
    Clean raw extracted text from PDF artifacts.

    Steps, in order: normalise CR/CRLF line endings to LF, drop form feeds,
    rejoin words hyphenated across a line break, collapse runs of spaces and
    tabs, trim whitespace next to line breaks, and squeeze three or more
    consecutive newlines down to a single blank line.

    Args:
        text (str): Raw text from PDF

    Returns:
        str: Cleaned text (``""`` for empty input).  If cleaning fails, a
        warning is printed and the stripped input is returned instead.
    """
    if not text:
        return ""

    try:
        # Normalise line endings FIRST: every rule below is written in terms
        # of "\n", so doing this last would leave trailing blanks in front of
        # "\r\n" and miss hyphenation across a bare "\r".
        cleaned = re.sub(r'\r\n?', '\n', text)

        # Form feeds carry no textual content.
        cleaned = cleaned.replace('\f', '')

        # "exam-\nple" -> "example"
        cleaned = re.sub(r'(\w+)-\s*\n\s*(\w+)', r'\1\2', cleaned)

        # Collapse horizontal whitespace and trim it around line breaks.
        cleaned = re.sub(r'[ \t]+', ' ', cleaned)
        cleaned = re.sub(r'\n[ \t]+', '\n', cleaned)
        cleaned = re.sub(r'[ \t]+\n', '\n', cleaned)

        # At most one blank line between paragraphs.
        cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)

        return cleaned.strip()

    except Exception as e:
        # Deliberate best effort: fall back to the raw text.
        print(f"Warning: Error cleaning text: {e}")
        return text.strip() if text else ""
|
|
|
|
|
|
|
|
def post_process_text(text: str) -> str:
    """
    Final post-processing of extracted text.

    Replaces typographic characters with plain ASCII equivalents, removes
    lines that consist of a single letter or of a 1-3 digit number, and
    squeezes three or more consecutive newlines down to a single blank line.

    Args:
        text (str): Text to post-process

    Returns:
        str: Final processed text (``""`` for empty input).  If processing
        fails, a warning is printed and the stripped input is returned.
    """
    if not text:
        return ""

    try:
        # Keys are spelled as escapes on purpose: the literal characters had
        # been mangled by a re-encoding of this file, which left the table
        # unparseable and made the en dash and em dash share one key.
        replacements = {
            '\u2018': "'",      # left single quotation mark
            '\u2019': "'",      # right single quotation mark
            '\u201c': '"',      # left double quotation mark
            '\u201d': '"',      # right double quotation mark
            '\u2013': '-',      # en dash
            '\u2014': '--',     # em dash
            '\u2026': '...',    # horizontal ellipsis
            '\u00a0': ' ',      # no-break space
            '\u2028': '\n',     # line separator
            '\u2029': '\n\n',   # paragraph separator
        }

        # Every key is one character and no replacement contains a key, so a
        # single translate() pass equals the chain of str.replace() calls.
        text = text.translate(str.maketrans(replacements))

        # Drop lines holding just one letter.
        text = re.sub(r'\n[a-zA-Z]\n', '\n', text)

        # Drop lines holding just a 1-3 digit number.
        text = re.sub(r'\n\s*\d{1,3}\s*\n', '\n', text)

        # At most one blank line between paragraphs.
        text = re.sub(r'\n{3,}', '\n\n', text)

        return text.strip()

    except Exception as e:
        # Deliberate best effort: fall back to the unprocessed text.
        print(f"Warning: Error in post-processing: {e}")
        return text.strip() if text else ""
|
|
|
|
|
|
|
|
def clean_metadata(metadata: dict) -> dict:
    """
    Clean and structure PDF metadata.

    Only a fixed set of known fields is kept.  Each kept field is renamed to
    a human-readable label and its value is stripped of surrounding
    whitespace.  Values that are missing, not strings, blank, or equal to
    ``'Unknown'`` are left out.

    Args:
        metadata (dict): Raw metadata from PDF

    Returns:
        dict: Cleaned metadata keyed by display label, in the fixed field
        order.  ``{}`` for empty input or if cleaning fails (a warning is
        printed in that case).
    """
    if not metadata:
        return {}

    # (key in the raw metadata, label used in the result), in output order.
    labelled_fields = (
        ('title', 'Title'),
        ('author', 'Author'),
        ('subject', 'Subject'),
        ('creator', 'Creator'),
        ('producer', 'Producer'),
        ('creationDate', 'Creation Date'),
        ('modDate', 'Modification Date'),
    )

    try:
        result = {}
        for raw_key, label in labelled_fields:
            raw_value = metadata.get(raw_key, '')
            if not raw_value or not isinstance(raw_value, str):
                continue

            trimmed = raw_value.strip()
            if not trimmed or trimmed == 'Unknown':
                continue

            result[label] = trimmed
        return result

    except Exception as e:
        print(f"Warning: Error cleaning metadata: {e}")
        return {}
|
|
|
|
|
|
|
|
def validate_pdf(pdf_path: str) -> bool:
    """
    Validate if the file is a readable PDF.

    The file must exist, have a ``.pdf`` extension (case-insensitive), be
    openable, and contain at least one page.

    Args:
        pdf_path (str): Path to PDF file

    Returns:
        bool: True if valid PDF, False otherwise (including on any error
        raised while checking).
    """
    try:
        if not pdf_path or not os.path.exists(pdf_path):
            return False

        if not pdf_path.lower().endswith('.pdf'):
            return False

        doc = fitz.open(pdf_path)
        try:
            return doc.page_count > 0
        finally:
            # Close even if reading the page count raises, so the handle
            # is never leaked.
            doc.close()

    except Exception:
        # Any failure simply means "not a valid PDF" for callers.
        return False
|
|
|
|
|
|
|
|
def get_pdf_info(pdf_path: str) -> dict:
    """
    Get basic information about PDF without extracting text.

    Args:
        pdf_path (str): Path to PDF file

    Returns:
        dict: Basic PDF information with the keys ``page_count``,
        ``file_size`` (bytes), ``is_encrypted`` and ``metadata`` (passed
        through ``clean_metadata``).  On failure a dict with a single
        ``error`` key describing the problem is returned instead; this
        function does not raise.
    """
    try:
        if not validate_pdf(pdf_path):
            return {'error': 'Invalid PDF file'}

        doc = fitz.open(pdf_path)
        try:
            return {
                'page_count': doc.page_count,
                'file_size': os.path.getsize(pdf_path),
                'is_encrypted': doc.needs_pass,
                'metadata': clean_metadata(doc.metadata),
            }
        finally:
            # Close even if building the info dict raises, so the handle
            # is never leaked.
            doc.close()

    except Exception as e:
        return {'error': f'Error getting PDF info: {e}'}
|
|
|
|
|
|
|
|
def extract_images_info(pdf_path: str) -> List[dict]:
    """
    Extract information about images in the PDF.

    Args:
        pdf_path (str): Path to PDF file

    Returns:
        list: List of image information dictionaries, one per image
        reference, each with ``page`` (1-based page number), ``index``
        (0-based position within that page's image list), ``width`` and
        ``height``.  An empty list is returned for an invalid PDF or if
        anything goes wrong (a warning is printed in the latter case).
    """
    try:
        if not validate_pdf(pdf_path):
            return []

        doc = fitz.open(pdf_path)
        try:
            images_info = []

            for page_num in range(doc.page_count):
                for img_index, img in enumerate(doc[page_num].get_images()):
                    # NOTE(review): assumes positions 2 and 3 of each image
                    # entry are width and height - confirm against the
                    # PyMuPDF version in use.  Shorter entries yield None.
                    images_info.append({
                        'page': page_num + 1,
                        'index': img_index,
                        'width': img[2] if len(img) > 2 else None,
                        'height': img[3] if len(img) > 3 else None,
                    })

            return images_info
        finally:
            # Close even if page or image enumeration raises, so the handle
            # is never leaked.
            doc.close()

    except Exception as e:
        print(f"Warning: Error extracting image info: {e}")
        return []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_pdf_reader():
    """
    Manual smoke test for the PDF reader.

    Looks for ``sample.pdf`` in the current working directory.  If present,
    it is validated and, when valid, its page count and the start of its
    extracted text are printed.  All outcomes, including failures, are
    reported on stdout; nothing is returned or raised.
    """
    print("=== PDF Reader Test ===")

    sample_path = "sample.pdf"

    try:
        if not os.path.exists(sample_path):
            print("No test PDF found. Create a 'sample.pdf' to test.")
            return

        print(f"Testing with: {sample_path}")

        readable = validate_pdf(sample_path)
        print(f"Valid PDF: {readable}")
        if not readable:
            return

        summary = get_pdf_info(sample_path)
        print(f"Pages: {summary.get('page_count', 'Unknown')}")

        extracted = extract_text_from_pdf(sample_path)
        print(f"Extracted {len(extracted)} characters")
        print(f"First 100 chars: {extracted[:100]}...")

    except Exception as e:
        print(f"Test failed: {e}")
|
|
|
|
|
|
|
|
# Run the manual smoke test only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    test_pdf_reader()
|
|
|