Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| #!/usr/bin/env python3 | |
| """ | |
| Universal document text extractor for government documents. | |
| Handles: PDF, PowerPoint, Word, Excel, HTML, Images (OCR) | |
| Usage: | |
| from extraction.universal_extractor import UniversalDocumentExtractor | |
| extractor = UniversalDocumentExtractor() | |
| result = extractor.extract_from_url("https://example.com/agenda.pdf") | |
| print(result['text']) | |
| """ | |
| import io | |
| from pathlib import Path | |
| from typing import Optional, Dict | |
| import httpx | |
| from loguru import logger | |
| # PDF extraction | |
| try: | |
| from PyPDF2 import PdfReader | |
| except ImportError: | |
| PdfReader = None | |
| logger.warning("PDF support disabled. Install: pip install PyPDF2") | |
| try: | |
| import pdfplumber | |
| except ImportError: | |
| pdfplumber = None | |
| logger.debug("pdfplumber not available (optional)") | |
| # PowerPoint extraction | |
| try: | |
| from pptx import Presentation | |
| except ImportError: | |
| Presentation = None | |
| logger.warning("PowerPoint support disabled. Install: pip install python-pptx") | |
| # Word extraction | |
| try: | |
| from docx import Document | |
| except ImportError: | |
| Document = None | |
| logger.warning("Word support disabled. Install: pip install python-docx") | |
| # Excel extraction | |
| try: | |
| import pandas as pd | |
| except ImportError: | |
| pd = None | |
| logger.warning("Excel support disabled. Install: pip install openpyxl pandas") | |
| # HTML extraction | |
| try: | |
| from bs4 import BeautifulSoup | |
| except ImportError: | |
| BeautifulSoup = None | |
| logger.warning("HTML support disabled. Install: pip install beautifulsoup4") | |
| # OCR extraction (for images/scanned PDFs) | |
| try: | |
| import pytesseract | |
| from PIL import Image | |
| except ImportError: | |
| pytesseract = None | |
| Image = None | |
| logger.debug("OCR support disabled (optional). Install: pip install pytesseract pillow") | |
class UniversalDocumentExtractor:
    """Download government documents and extract their plain text.

    Handles PDF, PowerPoint, Word, Excel, HTML and image (OCR) inputs.
    Every ``extract_*`` method takes the raw file bytes and returns the
    extracted text. When the optional library it relies on is missing, or
    parsing raises, the problem is logged and ``""`` is returned instead of
    propagating the exception.

    The instance owns an ``httpx.Client``. Use the extractor as a context
    manager, or call :meth:`close`, to release it.
    """

    # File extensions recognised in a URL. Anything else falls back to the
    # Content-Type header in _detect_format().
    _SUPPORTED_EXTENSIONS = frozenset({
        '.pdf', '.pptx', '.ppt', '.docx', '.doc', '.xlsx', '.xls',
        '.html', '.htm', '.jpg', '.jpeg', '.png', '.tiff', '.tif',
    })

    def __init__(self):
        """Create the shared HTTP client (timeout=30, redirects followed)."""
        self.client = httpx.Client(timeout=30, follow_redirects=True)

    def extract_from_url(self, url: str) -> Dict[str, object]:
        """Download the document at ``url`` and extract its text.

        Args:
            url: Document URL.

        Returns:
            Dict with:
            - url: Source URL
            - format: Detected format ('.pdf', '.pptx', ...), '.unknown'
              when detection fails, or 'unknown' when the download raised
            - text: Extracted text ('' on failure)
            - file_size_kb: Downloaded size in whole KB (floor division)
            - text_length: Length of the extracted text
            - success: True only when non-whitespace text was extracted
            - error: Present only when ``success`` is False; describes why

        Never raises: any exception is logged and reported via ``error``.
        """
        logger.info(f"Downloading: {url}")
        try:
            response = self.client.get(url)
            response.raise_for_status()
            file_bytes = response.content

            # Detect format from URL or Content-Type
            file_ext = self._detect_format(
                url, response.headers.get('content-type', '')
            )
            logger.debug(f"Detected format: {file_ext}")

            # Map each recognised extension to the method that parses it.
            handlers = {
                '.pdf': self.extract_pdf,
                '.ppt': self.extract_powerpoint,
                '.pptx': self.extract_powerpoint,
                '.doc': self.extract_word,
                '.docx': self.extract_word,
                '.xls': self.extract_excel,
                '.xlsx': self.extract_excel,
                '.html': self.extract_html,
                '.htm': self.extract_html,
                '.jpg': self.extract_image_ocr,
                '.jpeg': self.extract_image_ocr,
                '.png': self.extract_image_ocr,
                '.tiff': self.extract_image_ocr,
                '.tif': self.extract_image_ocr,
            }
            handler = handlers.get(file_ext)
            if handler is None:
                logger.warning(f"Unknown format: {file_ext}")
                text = ""
            else:
                text = handler(file_bytes)

            success = bool(text.strip())
            result = {
                'url': url,
                'format': file_ext,
                'text': text,
                'file_size_kb': len(file_bytes) // 1024,
                'text_length': len(text),
                'success': success,
            }
            if not success:
                # Give callers a reason instead of a bare success=False.
                if handler is None:
                    result['error'] = f"Unsupported format: {file_ext}"
                else:
                    result['error'] = "No text could be extracted"
            return result
        except Exception as e:
            logger.error(f"Failed to extract from {url}: {e}")
            return {
                'url': url,
                'format': 'unknown',
                'text': '',
                'file_size_kb': 0,
                'text_length': 0,
                'success': False,
                'error': str(e),
            }

    def _detect_format(self, url: str, content_type: str) -> str:
        """Detect the document format from the URL or the Content-Type.

        Checks, in order: the extension of the URL *path*, keywords in the
        Content-Type header, then extensions of query-string values (for
        links such as ``download?file=agenda.pdf``). Only the path is
        inspected first, so a host like ``www.docs.gov`` is not mistaken
        for a ``.doc`` file.

        Args:
            url: Document URL.
            content_type: Value of the response's Content-Type header
                (may be empty).

        Returns:
            A lower-case extension such as '.pdf', or '.unknown'.
        """
        from posixpath import splitext
        from urllib.parse import parse_qsl, urlparse

        supported = self._SUPPORTED_EXTENSIONS

        try:
            parsed = urlparse(url)
            url_path, url_query = parsed.path, parsed.query
        except ValueError:
            # Malformed URL: fall through to the Content-Type.
            url_path, url_query = '', ''

        # Try the URL path's extension first
        path_ext = splitext(url_path.rstrip('/'))[1].lower()
        if path_ext in supported:
            return path_ext

        # Try Content-Type
        content_type_lower = content_type.lower()
        if 'pdf' in content_type_lower:
            return '.pdf'
        elif 'powerpoint' in content_type_lower or 'presentation' in content_type_lower:
            return '.pptx'
        elif 'word' in content_type_lower:  # also matches 'msword'
            return '.docx'
        elif 'excel' in content_type_lower or 'spreadsheet' in content_type_lower:
            return '.xlsx'
        elif 'html' in content_type_lower:
            return '.html'
        elif 'image' in content_type_lower:
            return '.jpg'

        # Last resort: a file name passed as a query-string value
        for _, value in parse_qsl(url_query):
            query_ext = splitext(value.rstrip('/'))[1].lower()
            if query_ext in supported:
                return query_ext

        return '.unknown'

    def extract_pdf(self, file_bytes: bytes) -> str:
        """Extract text from a PDF.

        PyPDF2 is tried first. If it is unavailable, raises, or yields no
        text, pdfplumber is tried when installed.

        Args:
            file_bytes: Raw PDF content.

        Returns:
            Stripped text of all pages joined by newlines, or '' when no
            library is available or none could extract text.
        """
        if PdfReader is None:
            logger.error("PyPDF2 not installed")
            if pdfplumber is None:
                return ""

        text = ""
        if PdfReader is not None:
            try:
                pdf_reader = PdfReader(io.BytesIO(file_bytes))
                page_texts = (page.extract_text() for page in pdf_reader.pages)
                text = "\n".join(t for t in page_texts if t)
            except Exception as e:
                # Do not give up yet: pdfplumber may still cope with it.
                logger.error(f"PDF extraction failed: {e}")

        if not text.strip() and pdfplumber is not None:
            logger.info("PyPDF2 produced no text, trying pdfplumber...")
            try:
                with pdfplumber.open(io.BytesIO(file_bytes)) as pdf:
                    text = "\n".join(
                        page.extract_text() or "" for page in pdf.pages
                    )
            except Exception as e:
                logger.error(f"PDF extraction failed: {e}")
                return ""

        return text.strip()

    def extract_powerpoint(self, file_bytes: bytes) -> str:
        """Extract slide text and speaker notes from a PowerPoint file.

        Each slide with text is emitted under a ``=== Slide N ===`` header;
        speaker notes follow as ``Notes: ...``.

        NOTE(review): legacy binary .ppt files are routed here too, but
        python-pptx presumably only parses .pptx; such files are expected
        to end in the ``except`` branch and return ''.

        Args:
            file_bytes: Raw presentation content.

        Returns:
            Extracted text, or '' if python-pptx is missing or parsing fails.
        """
        if Presentation is None:
            logger.error("python-pptx not installed")
            return ""
        try:
            prs = Presentation(io.BytesIO(file_bytes))
            text_parts = []
            for slide_num, slide in enumerate(prs.slides, 1):
                # Extract text from all shapes that expose it
                slide_text = [
                    shape.text
                    for shape in slide.shapes
                    if hasattr(shape, "text") and shape.text
                ]
                if slide_text:
                    text_parts.append(f"=== Slide {slide_num} ===")
                    text_parts.append("\n".join(slide_text))
                    text_parts.append("")
                # Extract speaker notes if available
                if slide.has_notes_slide:
                    # notes_text_frame is None when the notes slide has no
                    # notes placeholder; without this guard one such slide
                    # would discard the text of the whole presentation.
                    notes_frame = slide.notes_slide.notes_text_frame
                    notes = notes_frame.text if notes_frame is not None else ""
                    if notes:
                        text_parts.append(f"Notes: {notes}")
                        text_parts.append("")
            return "\n".join(text_parts).strip()
        except Exception as e:
            logger.error(f"PowerPoint extraction failed: {e}")
            return ""

    def extract_word(self, file_bytes: bytes) -> str:
        """Extract paragraph and table text from a Word document.

        Non-blank paragraphs come first, then every table row with its
        cells joined by `` | ``.

        NOTE(review): legacy binary .doc files are routed here too, but
        python-docx presumably only parses .docx; such files are expected
        to end in the ``except`` branch and return ''.

        Args:
            file_bytes: Raw document content.

        Returns:
            Extracted text, or '' if python-docx is missing or parsing fails.
        """
        if Document is None:
            logger.error("python-docx not installed")
            return ""
        try:
            doc = Document(io.BytesIO(file_bytes))
            # Extract paragraphs
            text_parts = [
                para.text for para in doc.paragraphs if para.text.strip()
            ]
            # Extract tables
            for table in doc.tables:
                for row in table.rows:
                    row_text = " | ".join(
                        cell.text.strip() for cell in row.cells
                    )
                    if row_text.strip():
                        text_parts.append(row_text)
            return "\n".join(text_parts).strip()
        except Exception as e:
            logger.error(f"Word extraction failed: {e}")
            return ""

    def extract_excel(self, file_bytes: bytes) -> str:
        """Extract every sheet of an Excel workbook as text.

        Each sheet is emitted under a ``=== Sheet: name ===`` header,
        followed by the sheet rendered with ``DataFrame.to_string``.

        NOTE(review): the reader is pinned to the openpyxl engine, which
        presumably cannot read legacy binary .xls; such files are expected
        to end in the ``except`` branch and return ''.

        Args:
            file_bytes: Raw workbook content.

        Returns:
            Extracted text, or '' if pandas is missing or reading fails.
        """
        if pd is None:
            logger.error("pandas/openpyxl not installed")
            return ""
        try:
            # sheet_name=None makes pandas return a dict of all sheets
            excel_file = io.BytesIO(file_bytes)
            all_sheets = pd.read_excel(
                excel_file, sheet_name=None, engine='openpyxl'
            )
            text_parts = []
            for sheet_name, df in all_sheets.items():
                text_parts.append(f"=== Sheet: {sheet_name} ===")
                # Convert DataFrame to text
                text_parts.append(df.to_string(index=False))
                text_parts.append("")
            return "\n".join(text_parts).strip()
        except Exception as e:
            logger.error(f"Excel extraction failed: {e}")
            return ""

    def extract_html(self, file_bytes: bytes) -> str:
        """Extract the visible text of an HTML page.

        ``script``, ``style``, ``nav``, ``header`` and ``footer`` elements
        are dropped; the remaining text is split into one stripped,
        non-empty line per phrase.

        Args:
            file_bytes: Raw HTML content.

        Returns:
            Cleaned text, or '' if BeautifulSoup is missing or parsing fails.
        """
        if BeautifulSoup is None:
            logger.error("BeautifulSoup not installed")
            return ""
        try:
            soup = BeautifulSoup(file_bytes, 'html.parser')
            # Remove non-content elements
            for tag in soup(["script", "style", "nav", "header", "footer"]):
                tag.decompose()
            # Get text
            text = soup.get_text()
            # Clean up whitespace. Phrases are separated on runs of two
            # spaces; splitting on a single space would put every word on
            # its own line.
            phrase_gap = " " * 2
            lines = (line.strip() for line in text.splitlines())
            chunks = (
                phrase.strip()
                for line in lines
                for phrase in line.split(phrase_gap)
            )
            text = '\n'.join(chunk for chunk in chunks if chunk)
            return text.strip()
        except Exception as e:
            logger.error(f"HTML extraction failed: {e}")
            return ""

    def extract_image_ocr(self, file_bytes: bytes) -> str:
        """Extract text from an image using OCR (for scanned documents).

        Args:
            file_bytes: Raw image content.

        Returns:
            Stripped OCR text, or '' if pytesseract/PIL is missing or OCR
            fails.
        """
        if pytesseract is None or Image is None:
            logger.error("pytesseract/PIL not installed")
            logger.info("Install: pip install pytesseract pillow")
            logger.info("Also install tesseract: sudo apt-get install tesseract-ocr")
            return ""
        try:
            # The context manager releases the image once OCR is done.
            with Image.open(io.BytesIO(file_bytes)) as image:
                text = pytesseract.image_to_string(image)
            return text.strip()
        except Exception as e:
            logger.error(f"OCR extraction failed: {e}")
            logger.info("Make sure tesseract is installed: sudo apt-get install tesseract-ocr")
            return ""

    def close(self):
        """Close HTTP client."""
        self.client.close()

    def __enter__(self):
        """Context manager entry; returns the extractor itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit; closes the HTTP client."""
        self.close()
# Command-line entry point: extract a single URL and print a short report
if __name__ == "__main__":
    import sys

    cli_args = sys.argv[1:]
    if not cli_args:
        # No URL supplied: show usage and exit with a failure status.
        print("Usage: python universal_extractor.py <url>")
        print("\nExample:")
        print(" python universal_extractor.py https://example.com/agenda.pdf")
        sys.exit(1)

    target_url = cli_args[0]
    with UniversalDocumentExtractor() as extractor:
        result = extractor.extract_from_url(target_url)

    # Summary block framed by 70-character rules
    print(f"\n{'='*70}")
    print(f"URL: {result['url']}")
    print(f"Format: {result['format']}")
    print(f"File Size: {result['file_size_kb']} KB")
    print(f"Text Length: {result['text_length']} characters")
    print(f"Success: {result['success']}")
    print(f"{'='*70}\n")

    if not result['success']:
        print(f"Error: {result.get('error', 'Unknown error')}")
    else:
        # Show at most the first 500 characters of the extracted text
        full_text = result['text']
        print("Preview:")
        print(full_text[:500])
        if len(full_text) > 500:
            print("\n... (truncated)")