Spaces:
Running
Running
| """ | |
| Extracts and cleans text from downloaded PDF files. | |
| WHY PYMUPDF (fitz) over alternatives: | |
| Library | Speed | Quality | Handles columns? | |
| ---------------|--------|------------|----------------- | |
| PyMuPDF | Fast | ★★★★★ | Yes (sort=True) | |
| pdfplumber | Medium | ★★★★☆ | Partial | |
| PyPDF2 | Medium | ★★★☆☆ | No | |
| pdfminer | Slow | ★★★★☆ | Partial | |
| PyMuPDF's sort=True parameter reads text in natural reading order | |
| (top-to-bottom, left-to-right) which is critical for multi-column | |
| academic papers. | |
| """ | |
| import json | |
| from pathlib import Path | |
| import fitz # PyMuPDF - imported as 'fitz' (legacy name from founder) | |
| from tqdm import tqdm | |
| from src.processing.text_cleaner import clean_text | |
| from src.utils.logger import get_logger | |
| from config.settings import ( | |
| RAW_DIR, | |
| PROCESSED_DIR, | |
| MIN_TEXT_LENGTH, | |
| MAX_TEXT_LENGTH | |
| ) | |
| logger = get_logger(__name__) | |
class PDFExtractor:
    """
    Extracts clean text from PDF files and saves it to the processed directory.

    For each paper one JSON file is written to PROCESSED_DIR/<paper_id>.json
    (e.g. 2301.07041.json) holding the cleaned text plus the original metadata.
    """

    def __init__(self):
        # PDFs are looked up as RAW_DIR/pdfs/<paper_id>.pdf (see process_paper).
        self.pdf_dir = RAW_DIR / 'pdfs'

    def extract_text_from_pdf(self, pdf_path: Path) -> str:
        """
        Extract raw text from a PDF using PyMuPDF.

        Args:
            pdf_path: Path to the PDF file

        Returns:
            Raw extracted text (not yet cleaned), with the non-blank pages
            joined by a blank line. Returns "" if the file cannot be opened
            or read; the failure is logged, not raised.

        HOW PYMUPDF READS PDFS:
            PDF is a page-based format. We iterate each page and call
            get_text("text", sort=True):
              "text"    -> plain text extraction mode (vs HTML or dict formats)
              sort=True -> order the output by position on the page
                           (top-to-bottom, then left-to-right)
        """
        try:
            # The context manager closes the document even when a page fails
            # to extract; a trailing doc.close() would be skipped on error
            # and leak the open file.
            with fitz.open(str(pdf_path)) as doc:
                pages_text = [
                    page_text
                    for page_text in (
                        page.get_text("text", sort=True) for page in doc
                    )
                    if page_text.strip()  # drop pages with no visible text
                ]
        except Exception as e:
            # Deliberate best-effort boundary: one broken PDF must not stop
            # the batch, so report it and hand back an empty string.
            logger.error(f"Failed to extract text from {pdf_path.name}: {e}")
            return ""

        # Join all pages with a double newline (paragraph separator)
        return '\n\n'.join(pages_text)

    def validate_extracted_text(self, text: str, paper_id: str) -> tuple[bool, str]:
        """
        Validate that extracted text is usable.

        Args:
            text: Cleaned text to check
            paper_id: Identifier of the paper. Currently unused by the checks;
                kept so callers can keep passing it.

        Returns:
            (is_valid: bool, reason: str) - reason is "Valid" on success,
            otherwise a short description of the first rule that failed.

        VALIDATION RULES (checked in this order):
            1. Not empty
            2. At least MIN_TEXT_LENGTH characters
            3. At most MAX_TEXT_LENGTH characters
            4. At least 40% of the characters are alphabetic
               (guards against symbol/number soup from bad encodings)

        NOTE: no language (e.g. English-only) check is performed here.
        """
        if not text:
            return False, "Empty text"

        text_length = len(text)
        if text_length < MIN_TEXT_LENGTH:
            return False, f"Too short: {text_length} chars < {MIN_TEXT_LENGTH}"
        if text_length > MAX_TEXT_LENGTH:
            return False, f"Too long: {text_length} chars > {MAX_TEXT_LENGTH}"

        # Share of alphabetic characters. text_length > 0 is guaranteed by
        # the empty check above, so the division is safe.
        alpha_chars = sum(1 for c in text if c.isalpha())
        alpha_ratio = alpha_chars / text_length
        if alpha_ratio < 0.4:
            return False, f"Low alphabetic ratio: {alpha_ratio:.2f} (likely encoding issue)"

        return True, "Valid"

    def _save_processed(self, output_path: Path, document: dict) -> None:
        """Write `document` as JSON to `output_path` without leaving partial files."""
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Write to a sibling temp file and rename it into place. A crash or
        # serialization error mid-write would otherwise leave a truncated
        # <paper_id>.json that the "already processed" check accepts forever.
        tmp_path = output_path.with_name(output_path.name + ".tmp")
        try:
            with open(tmp_path, "w", encoding='utf-8') as f:
                json.dump(document, f, indent=2, ensure_ascii=False)
            tmp_path.replace(output_path)
        finally:
            # No-op after a successful rename; removes the leftover otherwise.
            tmp_path.unlink(missing_ok=True)

    def process_paper(self, paper_metadata: dict) -> bool:
        """
        Full pipeline for one paper: extract -> clean -> validate -> save.

        Args:
            paper_metadata: metadata dict for one paper; must contain
                'paper_id'. 'abstract', 'primary_category', 'categories' and
                'pdf_downloaded' are read when present.

        Returns:
            True if the paper was processed (or already had an output file),
            False if no usable text could be obtained.

        Raises:
            KeyError: if 'paper_id' is missing from paper_metadata.
        """
        paper_id = paper_metadata['paper_id']

        # Skip if already processed (idempotent)
        output_path = PROCESSED_DIR / f'{paper_id}.json'
        if output_path.exists():
            logger.debug(f"Already processed: {paper_id}")
            return True

        pdf_path = self.pdf_dir / f"{paper_id}.pdf"
        if pdf_path.exists():
            # Extract from the PDF, then clean
            text = clean_text(self.extract_text_from_pdf(pdf_path))
        else:
            # FALLBACK: the PDF is missing (e.g. download failed), so use the
            # abstract as the text source. It is used as-is, without
            # clean_text.
            # NOTE(review): the abstract is still validated against
            # MIN_TEXT_LENGTH below - confirm that threshold admits abstracts.
            logger.warning(f"PDF not found for {paper_id}, using abstract only")
            text = paper_metadata.get("abstract", "")
            if not text:
                return False

        is_valid, reason = self.validate_extracted_text(text, paper_id)
        if not is_valid:
            logger.warning(f"Validation failed for {paper_id}: {reason}")
            return False

        # Rescue a missing primary category: first listed category,
        # else the "cs.LG" default.
        primary_cat = paper_metadata.get("primary_category")
        if not primary_cat:
            cats = paper_metadata.get("categories", [])
            primary_cat = cats[0] if cats else "cs.LG"

        processed_doc = {
            # Copy all original metadata, then add/override below
            **paper_metadata,
            "primary_category": primary_cat,  # Override with rescued value
            "full_text": text,
            "text_length": len(text),
            "word_count": len(text.split()),
            "text_extracted": True,
            "pdf_downloaded": paper_metadata.get("pdf_downloaded", False),
        }

        self._save_processed(output_path, processed_doc)

        logger.debug(
            f"Processed {paper_id}: "
            f"{processed_doc['word_count']} words, "
            f"{len(text)} chars"
        )
        return True

    def process_all(self) -> dict:
        """
        Process every paper whose metadata JSON is present in RAW_DIR.

        Loads each RAW_DIR/*.json file (except paper_index.json), runs
        process_paper on it, and writes results to PROCESSED_DIR.

        Returns:
            Stats dict with keys "total", "successful", "failed", "skipped".
            "skipped" counts papers whose output file already existed;
            "failed" includes metadata files that could not be read.
        """
        raw_files = [
            f for f in RAW_DIR.glob("*.json")
            if f.name != "paper_index.json"
        ]
        logger.info(f"Found {len(raw_files)} papers to process")

        successful = 0
        failed = 0
        skipped = 0

        for raw_file in tqdm(raw_files, desc="Extracting text"):
            # A single corrupt or incomplete metadata file must not abort the
            # whole batch: record it as failed and move on.
            try:
                with open(raw_file, 'r', encoding='utf-8') as f:
                    metadata = json.load(f)
                paper_id = metadata['paper_id']
            except (OSError, ValueError, KeyError, TypeError) as e:
                logger.error(f"Could not load metadata from {raw_file.name}: {e}")
                failed += 1
                continue

            # Checked here (as well as inside process_paper) so that
            # already-processed papers are counted as skipped, not successful.
            if (PROCESSED_DIR / f"{paper_id}.json").exists():
                skipped += 1
                continue

            if self.process_paper(metadata):
                successful += 1
            else:
                failed += 1

        stats = {
            "total": len(raw_files),
            "successful": successful,
            "failed": failed,
            "skipped": skipped,
        }
        logger.info(f"Processing complete: {stats}")
        return stats