Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import json | |
| import torch | |
| import fitz | |
| from tqdm import tqdm | |
| from doctr.io import DocumentFile | |
| from doctr.models import ocr_predictor | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_core.documents import Document | |
| from config import CHUNK_SIZE, CHUNK_OVERLAP, PSEUDO_CHAPTER_PAGE_SPAN | |
# Module-level OCR setup: both statements execute when this module is imported.
# Use the CUDA device when torch reports one is available, otherwise the CPU.
_device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Single shared docTR predictor (built with pretrained=True), moved to the chosen device.
_ocr_model = ocr_predictor(pretrained=True).to(_device)
# Regexes matched (with re.match and re.IGNORECASE) against candidate heading lines.
# Group 1 is the raw chapter/book identifier: digits, a run of roman-numeral
# letters, or a spelled-out number. Group 2, where the pattern has one, is an
# optional 3-120 character title following an optional ':', '.' or '-'.
#
# The `\b` directly after the identifier group is required: without it the
# roman-letter alternative can bite off the first letter(s) of an ordinary word,
# e.g. "Chapter introduction" -> id "i" + title "ntroduction", or
# "Book list" -> id "l" + title "ist".
CHAPTER_PATTERNS = [
    r"^\s*chapter\s+(\d+|[ivxlcdm]+)\b",
    r"^\s*chapter\s+(\d+|[ivxlcdm]+|one|two|three|four|five|six|seven|eight|nine|ten|eleven|twelve)\b\s*[:.\-]?\s*(.{3,120})?$",
    r"^\s*book\s+(\d+|[ivxlcdm]+|one|two|three|four|five|six|seven|eight|nine|ten)\b\s*[:.\-]?\s*(.{3,120})?$",
]
# Spelled-out chapter numbers ("one" .. "twelve") mapped to their decimal-digit
# string form ("1" .. "12"). Keys are lowercase.
WORD_NUMBERS = {
    word: str(position)
    for position, word in enumerate(
        (
            "one", "two", "three", "four", "five", "six",
            "seven", "eight", "nine", "ten", "eleven", "twelve",
        ),
        start=1,
    )
}
# Value of each lowercase roman-numeral letter.
ROMAN_MAP = dict(zip("ivxlcdm", (1, 5, 10, 50, 100, 500, 1000)))
def _roman_to_int(s: str):
    """Convert a string of roman-numeral letters to an integer.

    The input is lowercased first. Returns None when the string is empty or
    contains any character that is not a key of ROMAN_MAP. No check is made
    that the letters form a well-formed numeral; each letter is simply
    subtracted when it is smaller than the letter to its right and added
    otherwise.
    """
    numeral = s.lower()
    if not numeral:
        return None

    values = []
    for letter in numeral:
        if letter not in ROMAN_MAP:
            return None
        values.append(ROMAN_MAP[letter])

    # Pair every value with its right-hand neighbour; the last letter is paired
    # with 0 so it is always added.
    result = 0
    for current, following in zip(values, values[1:] + [0]):
        if current < following:
            result -= current
        else:
            result += current
    return result
def _normalize_chapter_id(raw: str) -> str:
    """Normalise a raw chapter identifier to a canonical string.

    Resolution order, applied to the stripped, lowercased input:
      1. a key of WORD_NUMBERS -> its digit string;
      2. all digits -> the integer re-rendered as a string (drops leading zeros);
      3. roman-numeral letters whose value is in 1..200 -> that value as a string;
      4. anything else -> the stripped, lowercased input unchanged.

    A falsy input yields "".
    """
    if not raw:
        return ""

    token = raw.strip().lower()

    spelled_out = WORD_NUMBERS.get(token)
    if spelled_out is not None:
        return spelled_out

    if token.isdigit():
        return str(int(token))

    as_roman = _roman_to_int(token)
    # Values outside 1..200 are treated as "not really a roman numeral".
    if as_roman is None or not (0 < as_roman <= 200):
        return token
    return str(as_roman)
def _is_scanned_pdf(filepath: str, sample_pages: int = 3) -> bool:
    """Guess whether a PDF is a scan (image-only) rather than a digital PDF.

    Inspects at most the first ``sample_pages`` pages. As soon as one of them
    has more than 50 characters of extractable text (after stripping
    whitespace) the file is considered digital and False is returned. If none
    of the sampled pages does -- including when the document has no pages --
    True is returned.

    The document is opened in a ``with`` block so its handle is released on
    every exit path, including the early return.
    """
    with fitz.open(filepath) as doc:
        for index, page in enumerate(doc):
            if index >= sample_pages:
                break
            if len(page.get_text().strip()) > 50:
                return False
    return True
def _detect_document_type(first_pages_text: str) -> str:
    """Classify a document as "paper", "resume" or "book" from its opening text.

    The text is lowercased and searched for fixed marker substrings. Two or
    more distinct paper markers give "paper"; otherwise three or more distinct
    resume markers give "resume"; otherwise the result is "book".
    """
    haystack = first_pages_text.lower()

    def distinct_hits(markers) -> int:
        # Each marker counts once, however often it occurs in the text.
        return len([marker for marker in markers if marker in haystack])

    # The paper check runs first, so it wins when both thresholds are met.
    if distinct_hits((
        "abstract", "introduction", "references", "related work",
        "experiments", "conclusion", "cvpr", "ieee", "arxiv",
    )) >= 2:
        return "paper"

    if distinct_hits((
        "resume", "curriculum vitae", "work experience", "employment history",
        "education", "skills", "certifications", "objective", "summary",
        "projects", "achievements", "linkedin", "github",
    )) >= 3:
        return "resume"

    return "book"
def _extract_digital_pages(filepath: str) -> list[str]:
    """Return the extractable text of every page of a PDF, in page order.

    Progress is reported through tqdm. The document is opened in a ``with``
    block so it is closed once all page text has been collected, rather than
    being left open for the garbage collector.
    """
    with fitz.open(filepath) as doc:
        return [
            page.get_text()
            for page in tqdm(doc, desc="Reading digital PDF", unit="page")
        ]
def _extract_scanned_pages(filepath: str) -> list[str]:
    """Run OCR over a PDF and return one rendered text string per page.

    The PDF is loaded with DocumentFile.from_pdf; each item it yields is passed
    to the module-level OCR model on its own (as a one-element list) and the
    result's ``render()`` output is collected. Progress is reported through tqdm.
    """
    page_images = DocumentFile.from_pdf(filepath)
    progress = tqdm(page_images, desc="OCR scanned PDF", unit="page")
    return [_ocr_model([image]).render() for image in progress]
def _infer_section_hint(page_text: str, page_num: int, document_type: str) -> str:
    """Return a coarse section label for a page, or "" when none applies.

    Only the first 2500 characters of the lowercased page text are examined.
    For documents typed "paper", the label of the first pattern (in the order
    listed below) found anywhere in that snippet is returned. Failing that, any
    document whose first page contains the substring "abstract" is labelled
    "abstract".
    """
    snippet = page_text.lower()[:2500]

    if document_type == "paper":
        # Order matters: the first pattern that matches decides the label.
        # NOTE: in the alternations the `\b` anchors bind tighter than `|`,
        # so e.g. r"\bmethod|methods\b" means (\bmethod) OR (methods\b) and
        # therefore also matches words such as "methodology".
        ordered_headings = (
            ("abstract", r"\babstract\b"),
            ("introduction", r"\bintroduction\b"),
            ("related_work", r"\brelated work\b"),
            ("method", r"\bmethod|methods\b"),
            ("experiments", r"\bexperiments?|results\b"),
            ("conclusion", r"\bconclusion|discussion\b"),
            ("references", r"\breferences\b"),
        )
        found = next(
            (
                label
                for label, pattern in ordered_headings
                if re.search(pattern, snippet, re.IGNORECASE)
            ),
            None,
        )
        if found is not None:
            return found

    if page_num == 1 and "abstract" in snippet:
        return "abstract"
    return ""
def _heading_confidence(line: str, line_index: int) -> int:
    """Score how strongly a line looks like a chapter heading.

    Points are additive:
      * position: +3 if ``line_index`` <= 5, else +1 if <= 12;
      * +2 if the stripped line is 3..120 characters long;
      * +2 if the stripped line is all upper case;
      * +1 if it has at most 12 whitespace-separated words;
      * +4 if it contains "chapter" or "book" (case-insensitive).
    """
    stripped = line.strip()
    lowered = stripped.lower()

    if line_index <= 5:
        position_points = 3
    elif line_index <= 12:
        position_points = 1
    else:
        position_points = 0

    return (
        position_points
        + (2 if 3 <= len(stripped) <= 120 else 0)
        + (2 if stripped.isupper() else 0)
        + (1 if len(stripped.split()) <= 12 else 0)
        + (4 if ("chapter" in lowered or "book" in lowered) else 0)
    )
def _detect_chapter_start(page_text: str):
    """Look for a chapter heading near the top of a page.

    Only the first 20 non-blank lines are considered. A line qualifies when it
    matches one of CHAPTER_PATTERNS, its identifier normalises to a non-empty
    value that is not a number above 50, any captured title is 3..120
    characters long, and its heading-confidence score is at least 7.

    Returns:
        ``(chapter_id, title)`` for the highest-scoring qualifying line (the
        earliest such line on a tie), where ``title`` is "" when no title was
        captured; ``(None, None)`` when no line qualifies.

    Several patterns can match the same line (the bare ``chapter N`` pattern
    and the ``chapter N: title`` pattern both match "Chapter 1: Title"). The
    match that captured a title is preferred, so the title is not lost to the
    title-less pattern that happens to be listed first.
    """
    lines = [line.strip() for line in page_text.splitlines() if line.strip()]

    best = None  # (score, chapter_id, title) of the best line seen so far
    for idx, clean in enumerate(lines[:20]):
        if clean.isdigit():
            # A bare number (e.g. a page number) is never a heading.
            continue

        line_hit = None  # (chapter_id, title) chosen for this line
        for pattern in CHAPTER_PATTERNS:
            match = re.match(pattern, clean, re.IGNORECASE)
            if not match:
                continue
            groups = match.groups()
            raw_id = groups[0] if groups else ""
            title = groups[1].strip() if len(groups) > 1 and groups[1] else ""
            chapter_id = _normalize_chapter_id(raw_id)
            if not chapter_id:
                continue
            if chapter_id.isdigit() and int(chapter_id) > 50:
                continue
            if title and not 3 <= len(title) <= 120:
                continue
            # Keep the first valid match, but upgrade to one that has a title.
            if line_hit is None or (title and not line_hit[1]):
                line_hit = (chapter_id, title)

        if line_hit is None:
            continue

        score = _heading_confidence(clean, idx)
        if score < 7:
            continue
        # Strict '>' keeps the earliest line when scores tie.
        if best is None or score > best[0]:
            best = (score, line_hit[0], line_hit[1])

    if best is None:
        return None, None
    return best[1], best[2]
def _build_pseudo_chapter_map(num_pages: int) -> dict:
    """Split a document into fixed-size "pseudo chapters".

    Pages 1..``num_pages`` are cut into consecutive runs of
    PSEUDO_CHAPTER_PAGE_SPAN pages (the last run may be shorter). The result
    maps "1", "2", ... to a dict with "title", "start_page", "end_page"
    (both 1-based, inclusive) and "pseudo": True.
    """
    span = PSEUDO_CHAPTER_PAGE_SPAN
    first_pages = range(1, num_pages + 1, span)
    return {
        str(number): {
            "title": f"Pseudo Chapter {number}",
            "start_page": first_page,
            "end_page": min(first_page + span - 1, num_pages),
            "pseudo": True,
        }
        for number, first_page in enumerate(first_pages, start=1)
    }
def _build_chapter_map(pages: list[str]) -> dict:
    """Build a map of chapter id -> page range from per-page text.

    Each page is scanned for a chapter heading. A numeric chapter id is
    accepted when it is in 1..50 and does not jump more than 3 past the last
    accepted chapter. Accepting a chapter closes the previous one at the
    preceding page and opens a new entry that provisionally runs to the last
    page. Entries have "title", "start_page", "end_page" (1-based, inclusive)
    and "pseudo": False.

    If fewer than 3 chapters are found, the result of
    ``_build_pseudo_chapter_map`` for the page count is returned instead.

    A heading for a chapter that is already recorded is ignored. Such repeats
    come from running headers or back-references; treating them as a new
    chapter start would rewrite the open chapter's ``end_page`` (cutting the
    final chapter short by a page, or stretching an earlier chapter over
    later ones).
    """
    chapter_map = {}
    current = None
    last_chapter_num = 0

    for page_num, text in enumerate(pages, start=1):
        chapter_id, chapter_title = _detect_chapter_start(text)
        if not chapter_id or not chapter_id.isdigit():
            continue

        chapter_num = int(chapter_id)
        if not 1 <= chapter_num <= 50:
            continue
        # Reject implausible jumps (e.g. 2 -> 9), which are likely false hits.
        if last_chapter_num and chapter_num > last_chapter_num + 3:
            continue
        # Already recorded: running header or back-reference, not a new start.
        if chapter_id in chapter_map:
            continue

        if current is not None:
            chapter_map[current]["end_page"] = page_num - 1
        chapter_map[chapter_id] = {
            "title": chapter_title,
            "start_page": page_num,
            "end_page": len(pages),
            "pseudo": False,
        }
        current = chapter_id
        last_chapter_num = chapter_num

    if len(chapter_map) < 3:
        return _build_pseudo_chapter_map(len(pages))
    return chapter_map
def read_doc(path: str):
    """Load every PDF in a directory into per-page Documents plus corpus metadata.

    For each PDF directly inside ``path``: choose OCR or direct text extraction
    depending on whether the file looks scanned, infer the document type from
    the first five pages, build a chapter map (for type "book" only), and emit
    one Document per non-blank page.

    Args:
        path: Directory to scan. Files are matched on a case-insensitive
            ".pdf" suffix and processed in sorted filename order, so the
            output order does not depend on the arbitrary order in which the
            operating system lists the directory.

    Returns:
        ``(docs, corpus_meta)``. ``docs`` is a list of Document objects whose
        metadata has "source", "page" (1-based), "document_type",
        "section_hint" and "chapter" ("" when the page is in no chapter).
        ``corpus_meta`` maps each filename to "document_type", "page_count"
        and "chapter_map". Both are empty when no PDF is found.
    """
    docs = []
    corpus_meta = {}

    pdf_files = sorted(
        name for name in os.listdir(path) if name.lower().endswith(".pdf")
    )
    if not pdf_files:
        print("No PDF files found.")
        return docs, corpus_meta

    for filename in pdf_files:
        filepath = os.path.join(path, filename)
        print(f"\nProcessing: {filename}")

        if _is_scanned_pdf(filepath):
            print(" Detected: Scanned PDF → using OCR")
            pages = _extract_scanned_pages(filepath)
        else:
            print(" Detected: Digital PDF → using text extraction")
            pages = _extract_digital_pages(filepath)

        first_pages_text = "\n".join(pages[:5])
        document_type = _detect_document_type(first_pages_text)
        # Chapter detection is only attempted for books.
        chapter_map = _build_chapter_map(pages) if document_type == "book" else {}

        corpus_meta[filename] = {
            "document_type": document_type,
            "page_count": len(pages),
            "chapter_map": chapter_map,
        }

        print(f" Inferred document type: {document_type}")
        if chapter_map:
            print(f" Detected chapters: {list(chapter_map.keys())[:12]}{'...' if len(chapter_map) > 12 else ''}")

        for page_num, text in enumerate(pages, start=1):
            if not text or not text.strip():
                continue

            section_hint = _infer_section_hint(text, page_num, document_type)
            # First chapter (in map order) whose inclusive page range holds this page.
            chapter_label = next(
                (
                    chap
                    for chap, info in chapter_map.items()
                    if info["start_page"] <= page_num <= info["end_page"]
                ),
                "",
            )

            docs.append(Document(
                page_content=text,
                metadata={
                    "source": filename,
                    "page": page_num,
                    "document_type": document_type,
                    "section_hint": section_hint,
                    "chapter": chapter_label,
                },
            ))

    return docs, corpus_meta
def divide_doc(docs: list[Document], chunk_size: int = CHUNK_SIZE, chunk_overlap: int = CHUNK_OVERLAP) -> list[Document]:
    """Split Documents into chunks with RecursiveCharacterTextSplitter.

    Args:
        docs: Documents to split.
        chunk_size: Passed to the splitter as ``chunk_size``.
        chunk_overlap: Passed to the splitter as ``chunk_overlap``.

    Returns:
        Whatever ``split_documents`` returns for ``docs``.
    """
    # Separator list handed to the splitter: blank line, newline, ". ",
    # single space, and finally the empty string.
    boundaries = ["\n\n", "\n", ". ", " ", ""]
    chunker = RecursiveCharacterTextSplitter(
        separators=boundaries,
        chunk_overlap=chunk_overlap,
        chunk_size=chunk_size,
    )
    return chunker.split_documents(docs)
def save_corpus_meta(corpus_meta: dict, file_path: str = "corpus_meta.json"):
    """Write ``corpus_meta`` to ``file_path`` as UTF-8 JSON with 2-space indent.

    An existing file at ``file_path`` is overwritten.
    """
    with open(file_path, mode="w", encoding="utf-8") as handle:
        json.dump(corpus_meta, fp=handle, indent=2)
def load_corpus_meta(file_path: str = "corpus_meta.json") -> dict:
    """Read corpus metadata from a UTF-8 JSON file.

    Returns an empty dict when ``file_path`` does not exist; otherwise returns
    the parsed JSON content.
    """
    if os.path.exists(file_path):
        with open(file_path, mode="r", encoding="utf-8") as handle:
            return json.load(handle)
    return {}