open-navigator / extraction /universal_extractor.py
jcbowyer's picture
Clean HuggingFace deployment without binary files
61d29fc
#!/usr/bin/env python3
"""
Universal document text extractor for government documents.
Handles: PDF, PowerPoint, Word, Excel, HTML, Images (OCR)
Usage:
from extraction.universal_extractor import UniversalDocumentExtractor
extractor = UniversalDocumentExtractor()
result = extractor.extract_from_url("https://example.com/agenda.pdf")
print(result['text'])
"""
import io
from pathlib import Path
from typing import Any, Dict, Optional
from urllib.parse import parse_qsl, unquote, urlsplit

import httpx
from loguru import logger
# PDF extraction
try:
from PyPDF2 import PdfReader
except ImportError:
PdfReader = None
logger.warning("PDF support disabled. Install: pip install PyPDF2")
try:
import pdfplumber
except ImportError:
pdfplumber = None
logger.debug("pdfplumber not available (optional)")
# PowerPoint extraction
try:
from pptx import Presentation
except ImportError:
Presentation = None
logger.warning("PowerPoint support disabled. Install: pip install python-pptx")
# Word extraction
try:
from docx import Document
except ImportError:
Document = None
logger.warning("Word support disabled. Install: pip install python-docx")
# Excel extraction
try:
import pandas as pd
except ImportError:
pd = None
logger.warning("Excel support disabled. Install: pip install openpyxl pandas")
# HTML extraction
try:
from bs4 import BeautifulSoup
except ImportError:
BeautifulSoup = None
logger.warning("HTML support disabled. Install: pip install beautifulsoup4")
# OCR extraction (for images/scanned PDFs)
try:
import pytesseract
from PIL import Image
except ImportError:
pytesseract = None
Image = None
logger.debug("OCR support disabled (optional). Install: pip install pytesseract pillow")
class UniversalDocumentExtractor:
    """Extract text from any government document format.

    ``extract_from_url`` downloads a document over HTTP, works out its
    format and dispatches to the matching ``extract_*`` method. Every
    ``extract_*`` method takes the raw file bytes and returns the extracted
    text, or ``""`` when the required library is missing or parsing fails
    (the failure is logged rather than raised).

    The instance owns an ``httpx.Client``; call ``close()`` or use the
    instance as a context manager to release it.
    """

    # Maps a detected extension to the name of the method that handles it.
    # Methods are looked up by name at call time so subclass overrides work.
    _HANDLERS = {
        '.pdf': 'extract_pdf',
        '.ppt': 'extract_powerpoint',
        '.pptx': 'extract_powerpoint',
        '.doc': 'extract_word',
        '.docx': 'extract_word',
        '.xls': 'extract_excel',
        '.xlsx': 'extract_excel',
        '.html': 'extract_html',
        '.htm': 'extract_html',
        '.jpg': 'extract_image_ocr',
        '.jpeg': 'extract_image_ocr',
        '.png': 'extract_image_ocr',
        '.tiff': 'extract_image_ocr',
        '.tif': 'extract_image_ocr',
    }

    # Extensions that may be returned from URL-based detection.
    _KNOWN_EXTENSIONS = frozenset(_HANDLERS)

    # (keywords, extension) pairs checked in order against the lower-cased
    # Content-Type header; the first pair with a matching keyword wins.
    _CONTENT_TYPE_HINTS = (
        (('pdf',), '.pdf'),
        (('powerpoint', 'presentation'), '.pptx'),
        (('word', 'msword'), '.docx'),
        (('excel', 'spreadsheet'), '.xlsx'),
        (('html',), '.html'),
        (('image',), '.jpg'),
    )

    def __init__(self, timeout: float = 30):
        """Initialize extractor with HTTP client.

        Args:
            timeout: Timeout in seconds passed to the HTTP client
                (defaults to 30, the value previously hard-coded).
        """
        self.client = httpx.Client(timeout=timeout, follow_redirects=True)

    def extract_from_url(self, url: str) -> Dict[str, Any]:
        """
        Download document from URL and extract text.

        Never raises: download or extraction errors are logged and reported
        through the returned dict.

        Args:
            url: Document URL

        Returns:
            Dict with:
            - url: Source URL
            - format: File format (.pdf, .pptx, etc.); '.unknown' when the
              format could not be detected, 'unknown' when the download
              itself failed
            - text: Extracted text
            - file_size_kb: Size in KB
            - text_length: Length of extracted text
            - success: Whether extraction succeeded (non-blank text)
            - error: Failure description (only present when success is False)
        """
        logger.info(f"Downloading: {url}")
        try:
            # Download file
            response = self.client.get(url)
            response.raise_for_status()
            file_bytes = response.content

            # Detect format from URL or Content-Type
            file_ext = self._detect_format(url, response.headers.get('content-type', ''))
            logger.debug(f"Detected format: {file_ext}")

            # Extract based on format
            handler_name = self._HANDLERS.get(file_ext)
            if handler_name is None:
                logger.warning(f"Unknown format: {file_ext}")
                text = ""
            else:
                text = getattr(self, handler_name)(file_bytes)

            success = bool(text.strip())
            result = {
                'url': url,
                'format': file_ext,
                'text': text,
                'file_size_kb': len(file_bytes) // 1024,
                'text_length': len(text),
                'success': success
            }
            if not success:
                # The extract_* methods swallow their own exceptions, so tell
                # the caller why there is no text instead of leaving it blank.
                if handler_name is None:
                    result['error'] = f"Unsupported format: {file_ext}"
                else:
                    result['error'] = "No text could be extracted from document"
            return result
        except Exception as e:
            logger.error(f"Failed to extract from {url}: {e}")
            return {
                'url': url,
                'format': 'unknown',
                'text': '',
                'file_size_kb': 0,
                'text_length': 0,
                'success': False,
                'error': str(e)
            }

    def _detect_format(self, url: str, content_type: str) -> str:
        """Detect document format from URL or Content-Type.

        Detection order:
        1. Extension of the last segment of the URL *path*. The host name
           and query string are ignored here, so ``www.docs.gov`` is not
           mistaken for a ``.doc`` file.
        2. Keywords in the Content-Type header.
        3. A file name embedded in an earlier path segment or in a query
           parameter value (e.g. ``ViewFile.aspx?file=agenda.pdf``).

        Args:
            url: Document URL.
            content_type: Value of the response's Content-Type header
                (may be empty).

        Returns:
            A lower-case extension such as '.pdf', or '.unknown'.
        """
        try:
            parts = urlsplit(url)
            path, query = unquote(parts.path), parts.query
        except ValueError:
            # Malformed URL (e.g. unbalanced IPv6 brackets): treat the whole
            # string as a path so detection still degrades gracefully.
            path, query = url, ''

        # 1. Extension of the final path segment
        ext = Path(path).suffix.lower()
        if ext in self._KNOWN_EXTENSIONS:
            return ext

        # 2. Content-Type header
        content_type_lower = content_type.lower()
        for keywords, hinted_ext in self._CONTENT_TYPE_HINTS:
            if any(keyword in content_type_lower for keyword in keywords):
                return hinted_ext

        # 3. File name hidden in another path segment or a query value
        candidates = path.split('/') + [value for _, value in parse_qsl(query)]
        for candidate in candidates:
            ext = Path(candidate).suffix.lower()
            if ext in self._KNOWN_EXTENSIONS:
                return ext

        return '.unknown'

    def extract_pdf(self, file_bytes: bytes) -> str:
        """Extract text from PDF.

        Tries PyPDF2 first; if it is unavailable, raises, or yields no text,
        falls back to pdfplumber when that is installed. Neither library
        performs OCR, so image-only (scanned) PDFs yield an empty string.

        Args:
            file_bytes: Raw PDF content.

        Returns:
            Extracted text, or "" if nothing could be extracted.
        """
        if PdfReader is None and pdfplumber is None:
            logger.error("PyPDF2 not installed")
            return ""

        text = ""
        if PdfReader is not None:
            try:
                # Try PyPDF2 first (faster)
                pdf_reader = PdfReader(io.BytesIO(file_bytes))
                page_texts = (page.extract_text() for page in pdf_reader.pages)
                text = "".join(f"{page_text}\n" for page_text in page_texts if page_text)
            except Exception as e:
                # Do not return yet: pdfplumber may still cope with this file.
                logger.error(f"PDF extraction failed: {e}")
                text = ""

        if not text.strip() and pdfplumber is not None:
            logger.info("PyPDF2 returned no text, trying pdfplumber...")
            try:
                with pdfplumber.open(io.BytesIO(file_bytes)) as pdf:
                    text = "\n".join(page.extract_text() or "" for page in pdf.pages)
            except Exception as e:
                logger.error(f"PDF extraction failed: {e}")
                return ""

        return text.strip()

    def extract_powerpoint(self, file_bytes: bytes) -> str:
        """Extract text from PowerPoint (.ppt, .pptx).

        Collects the text of every shape exposing a ``text`` attribute, plus
        speaker notes, slide by slide. Legacy binary ``.ppt`` files are
        routed here as well; if the library cannot open them the failure is
        logged and "" is returned.

        Args:
            file_bytes: Raw presentation content.

        Returns:
            Extracted text, or "" on failure.
        """
        if Presentation is None:
            logger.error("python-pptx not installed")
            return ""
        try:
            prs = Presentation(io.BytesIO(file_bytes))
            text_parts = []
            for slide_num, slide in enumerate(prs.slides, 1):
                # Extract text from all shapes
                slide_text = [
                    shape.text
                    for shape in slide.shapes
                    if hasattr(shape, "text") and shape.text
                ]
                if slide_text:
                    text_parts.append(f"=== Slide {slide_num} ===")
                    text_parts.append("\n".join(slide_text))
                    text_parts.append("")

                # Extract speaker notes if available
                if slide.has_notes_slide:
                    # notes_text_frame is None when the notes slide has no
                    # notes placeholder; guard so one such slide does not
                    # abort extraction of the whole deck.
                    notes_frame = slide.notes_slide.notes_text_frame
                    notes = notes_frame.text if notes_frame is not None else ""
                    if notes:
                        text_parts.append(f"Notes: {notes}")
                        text_parts.append("")
            return "\n".join(text_parts).strip()
        except Exception as e:
            logger.error(f"PowerPoint extraction failed: {e}")
            return ""

    def extract_word(self, file_bytes: bytes) -> str:
        """Extract text from Word (.doc, .docx).

        Emits all non-blank paragraphs first, then every table row with its
        cells joined by " | ". Legacy binary ``.doc`` files are routed here
        as well; if the library cannot open them the failure is logged and
        "" is returned.

        Args:
            file_bytes: Raw document content.

        Returns:
            Extracted text, or "" on failure.
        """
        if Document is None:
            logger.error("python-docx not installed")
            return ""
        try:
            doc = Document(io.BytesIO(file_bytes))

            # Extract paragraphs
            text_parts = [para.text for para in doc.paragraphs if para.text.strip()]

            # Extract tables
            for table in doc.tables:
                for row in table.rows:
                    row_text = " | ".join(cell.text.strip() for cell in row.cells)
                    if row_text.strip():
                        text_parts.append(row_text)
            return "\n".join(text_parts).strip()
        except Exception as e:
            logger.error(f"Word extraction failed: {e}")
            return ""

    def extract_excel(self, file_bytes: bytes) -> str:
        """Extract text from Excel (.xls, .xlsx).

        Every sheet is rendered as a "=== Sheet: name ===" header followed
        by the sheet's contents as a plain-text table.

        Args:
            file_bytes: Raw workbook content.

        Returns:
            Extracted text, or "" on failure.
        """
        if pd is None:
            logger.error("pandas/openpyxl not installed")
            return ""
        try:
            # openpyxl only reads the zip-based OOXML formats (zip archives
            # start with b'PK'). For anything else, such as legacy .xls, let
            # pandas pick the engine; if none is available the error is
            # caught below exactly as before.
            engine = 'openpyxl' if file_bytes.startswith(b'PK') else None

            # Use pandas to read all sheets
            all_sheets = pd.read_excel(
                io.BytesIO(file_bytes), sheet_name=None, engine=engine
            )
            text_parts = []
            for sheet_name, df in all_sheets.items():
                text_parts.append(f"=== Sheet: {sheet_name} ===")
                # Convert DataFrame to text
                text_parts.append(df.to_string(index=False))
                text_parts.append("")
            return "\n".join(text_parts).strip()
        except Exception as e:
            logger.error(f"Excel extraction failed: {e}")
            return ""

    def extract_html(self, file_bytes: bytes) -> str:
        """Extract text from HTML.

        Drops script, style, nav, header and footer elements, then returns
        the remaining visible text with blank lines removed.

        Args:
            file_bytes: Raw HTML content.

        Returns:
            Extracted text, or "" on failure.
        """
        if BeautifulSoup is None:
            logger.error("BeautifulSoup not installed")
            return ""
        try:
            soup = BeautifulSoup(file_bytes, 'html.parser')

            # Remove non-content elements
            for element in soup(["script", "style", "nav", "header", "footer"]):
                element.decompose()

            # Get text
            text = soup.get_text()

            # Clean up whitespace. Phrases are split on a DOUBLE space (two
            # consecutive blanks): splitting on a single space would put
            # every word on its own line.
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            text = '\n'.join(chunk for chunk in chunks if chunk)
            return text.strip()
        except Exception as e:
            logger.error(f"HTML extraction failed: {e}")
            return ""

    def extract_image_ocr(self, file_bytes: bytes) -> str:
        """Extract text from image using OCR (for scanned documents).

        Args:
            file_bytes: Raw image content.

        Returns:
            Recognized text, or "" when OCR support is missing or fails.
        """
        if pytesseract is None or Image is None:
            logger.error("pytesseract/PIL not installed")
            logger.info("Install: pip install pytesseract pillow")
            logger.info("Also install tesseract: sudo apt-get install tesseract-ocr")
            return ""
        try:
            # The context manager releases the image's resources once OCR
            # has finished.
            with Image.open(io.BytesIO(file_bytes)) as image:
                # Run OCR
                text = pytesseract.image_to_string(image)
            return text.strip()
        except Exception as e:
            logger.error(f"OCR extraction failed: {e}")
            logger.info("Make sure tesseract is installed: sudo apt-get install tesseract-ocr")
            return ""

    def close(self):
        """Close HTTP client."""
        self.client.close()

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
        self.close()
# Example usage and testing
# Command-line entry point: extract one URL, print a summary and a preview.
# Exits with status 1 on bad usage or when extraction fails, so shell
# callers can detect the failure.
if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("Usage: python universal_extractor.py <url>")
        print("\nExample:")
        print(" python universal_extractor.py https://example.com/agenda.pdf")
        sys.exit(1)

    url = sys.argv[1]
    # The context manager closes the HTTP client as soon as the download and
    # extraction are done; the result is a plain dict and outlives it.
    with UniversalDocumentExtractor() as extractor:
        result = extractor.extract_from_url(url)

    separator = '=' * 70
    print(f"\n{separator}")
    print(f"URL: {result['url']}")
    print(f"Format: {result['format']}")
    print(f"File Size: {result['file_size_kb']} KB")
    print(f"Text Length: {result['text_length']} characters")
    print(f"Success: {result['success']}")
    print(f"{separator}\n")

    if result['success']:
        # Show first 500 characters
        preview = result['text'][:500]
        print("Preview:")
        print(preview)
        if len(result['text']) > 500:
            print("\n... (truncated)")
    else:
        print(f"Error: {result.get('error', 'Unknown error')}")
        # Report the failure through the exit status as well.
        sys.exit(1)