Spaces:
Sleeping
Sleeping
| """pipeline/pdf_processor.py — PDF text extraction + OCR fallback.""" | |
| import re | |
| from pathlib import Path | |
| from typing import Generator | |
| from dataclasses import dataclass | |
| import fitz, pdfplumber, pytesseract | |
| from pdf2image import convert_from_path | |
| from loguru import logger | |
| import utils.config as cfg | |
@dataclass
class Chunk:
    """A contiguous span of extracted PDF text handed to downstream stages.

    Instances are built positionally as
    ``Chunk(source_file, page_start, page_end, text, was_ocr)``, which is why
    the class must be a dataclass: the generated ``__init__`` accepts the
    fields in declaration order.
    """

    # File name of the PDF the text came from.
    source_file: str
    # First page (1-based) contributing text to this chunk.
    page_start: int
    # Last page (1-based, inclusive) contributing text to this chunk.
    page_end: int
    # Page texts joined together.
    text: str
    # True when at least one contributing page was read through OCR.
    was_ocr: bool = False

    def word_count(self) -> int:
        """Return the number of whitespace-separated words in ``text``."""
        return len(self.text.split())
class PDFProcessor:
    """Extract per-page text from a PDF and group it into word-bounded chunks.

    Each page is read with both PyMuPDF (``fitz``) and pdfplumber and the
    longer of the two results is kept. Pages whose best text is shorter than
    ``MIN_CHARS`` are additionally run through OCR (pdf2image + pytesseract).
    """

    # Pages with fewer extracted characters than this are sent to OCR.
    MIN_CHARS = 80
    # Word budget per chunk: half of the configured per-chunk token budget.
    CHUNK_WORDS = cfg.MAX_TOKENS_PER_CHUNK // 2

    def process(self, pdf_path: Path) -> Generator[Chunk, None, None]:
        """Yield the chunks of ``pdf_path`` in page order.

        Args:
            pdf_path: Location of the PDF. A plain string is accepted too.

        Yields:
            ``Chunk`` objects whose ``source_file`` is the file's base name.
        """
        pdf_path = Path(pdf_path)
        pages = self._extract_pages(pdf_path)
        yield from self._chunk(pages, pdf_path.name)

    def _extract_pages(self, path) -> list:
        """Return ``(page_number, text, was_ocr)`` for every page, 1-based.

        Extraction is best-effort: a failing extractor is logged and simply
        contributes no text, so the other extractor and OCR can still help.
        """
        mupdf_text = self._read_with_pymupdf(path)
        plumber_text = self._read_with_pdfplumber(path)

        # Assume at least one page so OCR still gets a chance when neither
        # extractor managed to open the file.
        total = max(len(mupdf_text), len(plumber_text), 1)

        results = []
        ocr_needed = []
        for pnum in range(1, total + 1):
            # Longest text wins; on a tie max() keeps its first argument,
            # i.e. the pdfplumber result.
            best = max(
                plumber_text.get(pnum, ""),
                mupdf_text.get(pnum, ""),
                key=len,
            )
            results.append((pnum, best, False))
            if len(best) < self.MIN_CHARS:
                ocr_needed.append(pnum)

        if ocr_needed:
            ocr_text = self._ocr(path, ocr_needed)
            for idx, (pnum, text, _) in enumerate(results):
                candidate = ocr_text.get(pnum)
                # Only take the OCR output when it actually recovered more
                # than the extractors did; otherwise a short but valid page
                # would be replaced by a shorter (or empty) OCR result.
                if candidate is not None and len(candidate) > len(text):
                    results[idx] = (pnum, candidate, True)
        return results

    def _read_with_pymupdf(self, path) -> dict:
        """Return ``{page_number: cleaned_text}`` as read by PyMuPDF."""
        texts = {}
        try:
            # The context manager closes the document even when a page fails.
            with fitz.open(str(path)) as doc:
                for i, pg in enumerate(doc):
                    texts[i + 1] = self._clean(pg.get_text("text"))
        except Exception as e:
            # Pages read before the failure are kept.
            logger.warning(f"PyMuPDF extraction failed for {path}: {e}")
        return texts

    def _read_with_pdfplumber(self, path) -> dict:
        """Return ``{page_number: cleaned_text}`` as read by pdfplumber."""
        texts = {}
        try:
            with pdfplumber.open(str(path)) as pdf:
                for i, pg in enumerate(pdf.pages):
                    try:
                        texts[i + 1] = self._clean(pg.extract_text() or "")
                    except Exception as e:
                        # One bad page must not discard the rest of the file.
                        logger.warning(
                            f"pdfplumber failed on page {i + 1} of {path}: {e}"
                        )
                        texts[i + 1] = ""
        except Exception as e:
            logger.warning(f"pdfplumber extraction failed for {path}: {e}")
        return texts

    def _ocr(self, path, pages) -> dict:
        """OCR the given 1-based page numbers and return ``{page: text}``.

        Pages are rendered in contiguous runs so that pages which do not need
        OCR are never rasterised, and a failure in one run does not discard
        the pages of the other runs. Pages that could not be processed are
        absent from the result.
        """
        out = {}
        wanted = sorted(set(pages))
        if not wanted:
            return out
        for first, last in self._page_runs(wanted):
            try:
                imgs = convert_from_path(
                    str(path),
                    dpi=cfg.OCR_DPI,
                    first_page=first,
                    last_page=last,
                )
                # zip() stops early if fewer images came back than requested.
                for pnum, img in zip(range(first, last + 1), imgs):
                    out[pnum] = self._clean(
                        pytesseract.image_to_string(
                            img, lang="eng", config="--psm 6"
                        )
                    )
            except Exception as e:
                logger.warning(f"OCR: {e}")
        return out

    @staticmethod
    def _page_runs(pages) -> list:
        """Collapse sorted, unique page numbers into inclusive ``(first, last)`` runs."""
        runs = []
        for pnum in pages:
            if runs and pnum == runs[-1][1] + 1:
                runs[-1] = (runs[-1][0], pnum)
            else:
                runs.append((pnum, pnum))
        return runs

    def _chunk(self, pages, source) -> Generator[Chunk, None, None]:
        """Group page texts into chunks of at least ``CHUNK_WORDS`` words.

        Args:
            pages: Iterable of ``(page_number, text, was_ocr)`` tuples.
            source: Value stored in each chunk's ``source_file``.

        Yields:
            A ``Chunk`` each time the running word count reaches
            ``CHUNK_WORDS``. The page that closed a chunk is carried over as
            the first page of the next one, so consecutive chunks overlap by
            one page. A final chunk is yielded only when it holds text that
            has not been emitted yet.
        """
        buf = []
        words = 0
        p_start = p_end = None
        any_ocr = False
        fresh = False  # True while buf holds text not yet part of a chunk
        for pnum, text, ocr in pages:
            if not text:
                continue
            if not buf:
                # Start at the first page that really contributes text.
                p_start = pnum
            n_words = len(text.split())
            buf.append(text)
            words += n_words
            p_end = pnum
            any_ocr = any_ocr or ocr
            fresh = True
            if words >= self.CHUNK_WORDS:
                yield Chunk(source, p_start, pnum, "\n\n".join(buf), any_ocr)
                # Seed the next chunk with the closing page as overlap.
                buf, words, p_start, any_ocr = [text], n_words, pnum, ocr
                fresh = False
        if fresh:
            # p_end is the last page that contributed text, not merely the
            # last page of the document.
            yield Chunk(source, p_start, p_end, "\n\n".join(buf), any_ocr)

    @staticmethod
    def _clean(text):
        """Normalise raw extracted text; return ``""`` for empty input."""
        if not text:
            return ""
        # Re-join words that were hyphenated across a line break.
        text = re.sub(r"(\w)-\n(\w)", r"\1\2", text)
        # Collapse runs of three or more newlines into one blank line.
        text = re.sub(r"\n{3,}", "\n\n", text)
        # Squeeze runs of spaces/tabs into a single space.
        text = re.sub(r"[ \t]+", " ", text)
        return text.strip()