"""Load PDFs from a directory, extract per-page text, and tag structure metadata.

Digital PDFs are read with PyMuPDF (``fitz``); PDFs with almost no embedded
text are treated as scans and sent through the docTR OCR predictor. Each
non-empty page becomes a LangChain ``Document`` carrying the source file name,
page number, inferred document type, a section hint, and a chapter label.
"""

import json
import os
import re
from typing import Optional

import fitz
import torch
from doctr.io import DocumentFile
from doctr.models import ocr_predictor
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from tqdm import tqdm

from config import CHUNK_SIZE, CHUNK_OVERLAP, PSEUDO_CHAPTER_PAGE_SPAN

# The OCR model is created once at import time and moved to the GPU if present.
_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
_ocr_model = ocr_predictor(pretrained=True).to(_device)

# Patterns are tried with re.IGNORECASE against single, stripped lines.
# Group 1 is the raw chapter identifier; group 2 (when present) is the title.
# The ``\b`` after the identifier group in the last two patterns stops the
# roman-numeral alternative from matching the first letters of an ordinary
# word (e.g. "Book is ..." must not be read as book "i").
CHAPTER_PATTERNS = [
    r"^\s*chapter\s+(\d+|[ivxlcdm]+)\b",
    r"^\s*chapter\s+(\d+|[ivxlcdm]+|one|two|three|four|five|six|seven|eight|nine|ten|eleven|twelve)\b\s*[:.\-]?\s*(.{3,120})?$",
    r"^\s*book\s+(\d+|[ivxlcdm]+|one|two|three|four|five|six|seven|eight|nine|ten)\b\s*[:.\-]?\s*(.{3,120})?$",
]

WORD_NUMBERS = {
    "one": "1",
    "two": "2",
    "three": "3",
    "four": "4",
    "five": "5",
    "six": "6",
    "seven": "7",
    "eight": "8",
    "nine": "9",
    "ten": "10",
    "eleven": "11",
    "twelve": "12",
}

ROMAN_MAP = {
    'i': 1,
    'v': 5,
    'x': 10,
    'l': 50,
    'c': 100,
    'd': 500,
    'm': 1000,
}

# Substrings whose presence in the first pages votes for a document type.
_PAPER_MARKERS = (
    "abstract", "introduction", "references", "related work",
    "experiments", "conclusion", "cvpr", "ieee", "arxiv",
)
_RESUME_MARKERS = (
    "resume", "curriculum vitae", "work experience", "employment history",
    "education", "skills", "certifications", "objective", "summary",
    "projects", "achievements", "linkedin", "github",
)

# Section label -> pattern, checked in this order against lower-cased text.
# Alternations are wrapped in a group so that ``\b`` applies to both ends of
# every alternative; an ungrouped ``\bmethod|methods\b`` parses as
# ``(\bmethod)|(methods\b)`` and matches unrelated words such as "methodical".
_PAPER_HEADING_PATTERNS = {
    "abstract": re.compile(r"\babstract\b"),
    "introduction": re.compile(r"\bintroduction\b"),
    "related_work": re.compile(r"\brelated work\b"),
    "method": re.compile(r"\b(?:methods?|methodology)\b"),
    "experiments": re.compile(r"\b(?:experiments?|experimental|results)\b"),
    "conclusion": re.compile(r"\b(?:conclusions?|discussion)\b"),
    "references": re.compile(r"\breferences\b"),
}


def _roman_to_int(s: str) -> Optional[int]:
    """Convert a roman-numeral string to an integer.

    Returns ``None`` when *s* is empty or contains a character that is not a
    roman digit. The numeral is not checked for canonical form: any sequence
    of roman digits is evaluated with the subtractive rule.
    """
    s = s.lower()
    if not s or any(ch not in ROMAN_MAP for ch in s):
        return None
    total = 0
    prev = 0
    # Walk right-to-left: a digit smaller than its right neighbour subtracts.
    for ch in reversed(s):
        val = ROMAN_MAP[ch]
        if val < prev:
            total -= val
        else:
            total += val
        prev = val
    return total


def _normalize_chapter_id(raw: str) -> str:
    """Normalize a raw chapter identifier to a decimal string where possible.

    Number words ("one".."twelve"), digit strings, and roman numerals with a
    value in 1..200 are mapped to their decimal form. Anything else is
    returned stripped and lower-cased; empty input yields ``""``.
    """
    if not raw:
        return ""
    val = raw.strip().lower()
    if val in WORD_NUMBERS:
        return WORD_NUMBERS[val]
    if val.isdigit():
        return str(int(val))  # drops leading zeros
    roman = _roman_to_int(val)
    if roman is not None and 0 < roman <= 200:
        return str(roman)
    return val


def _is_scanned_pdf(filepath: str, sample_pages: int = 3) -> bool:
    """Return True when none of the first *sample_pages* pages has real text.

    A page counts as having text when its stripped extracted text is longer
    than 50 characters.
    """
    # The context manager closes the document even on the early return.
    with fitz.open(filepath) as doc:
        for i, page in enumerate(doc):
            if i >= sample_pages:
                break
            if len(page.get_text().strip()) > 50:
                return False
    return True


def _detect_document_type(first_pages_text: str) -> str:
    """Classify a document as ``"paper"``, ``"resume"`` or ``"book"``.

    Counts marker substrings in the lower-cased text: two or more paper
    markers give "paper"; otherwise three or more resume markers give
    "resume"; otherwise the document is treated as a "book".
    """
    text = first_pages_text.lower()
    paper_score = sum(1 for marker in _PAPER_MARKERS if marker in text)
    if paper_score >= 2:
        return "paper"
    resume_score = sum(1 for marker in _RESUME_MARKERS if marker in text)
    if resume_score >= 3:
        return "resume"
    return "book"


def _extract_digital_pages(filepath: str) -> list[str]:
    """Return the embedded text of every page of a digital PDF, in page order."""
    with fitz.open(filepath) as doc:
        return [
            page.get_text()
            for page in tqdm(doc, desc="Reading digital PDF", unit="page")
        ]


def _extract_scanned_pages(filepath: str) -> list[str]:
    """Run OCR on every page of a scanned PDF and return the rendered text."""
    pdf_pages = DocumentFile.from_pdf(filepath)
    page_texts = []
    # Pages are fed to the model one at a time so the progress bar advances per page.
    for page in tqdm(pdf_pages, desc="OCR scanned PDF", unit="page"):
        result = _ocr_model([page])
        page_texts.append(result.render())
    return page_texts


def _infer_section_hint(page_text: str, page_num: int, document_type: str) -> str:
    """Guess which section a page belongs to, or return ``""``.

    Only the first 2500 characters of the lower-cased page text are examined.
    For papers, the first matching entry of the heading table decides the
    label. Otherwise a first page that mentions "abstract" is labelled
    "abstract".
    """
    text = page_text.lower()[:2500]
    if document_type == "paper":
        for label, pattern in _PAPER_HEADING_PATTERNS.items():
            if pattern.search(text):
                return label
    # NOTE(review): the original indentation was lost; this fallback is placed
    # at function level (applies to every document type) - confirm that was
    # the intent rather than being nested inside the "paper" branch.
    if page_num == 1 and "abstract" in text:
        return "abstract"
    return ""


def _heading_confidence(line: str, line_index: int) -> int:
    """Score how heading-like *line* is, given its index among non-empty lines.

    Points: position (index <= 5: +3, index <= 12: +1), length of 3..120
    characters (+2), all upper-case (+2), at most 12 words (+1), and the
    presence of "chapter" or "book" (+4).
    """
    score = 0
    clean = line.strip()
    if line_index <= 5:
        score += 3
    elif line_index <= 12:
        score += 1
    if 3 <= len(clean) <= 120:
        score += 2
    if clean.isupper():
        score += 2
    if len(clean.split()) <= 12:
        score += 1
    lowered = clean.lower()
    if any(token in lowered for token in ("chapter", "book")):
        score += 4
    return score


def _detect_chapter_start(page_text: str):
    """Find the most convincing chapter heading among a page's first 20 lines.

    Returns ``(chapter_id, title)`` for the best candidate, or
    ``(None, None)`` when no line qualifies. ``title`` is ``""`` when the
    heading carries no title text.
    """
    lines = [line.strip() for line in page_text.splitlines() if line.strip()]
    candidates = []
    for idx, clean in enumerate(lines[:20]):
        for pattern in CHAPTER_PATTERNS:
            match = re.match(pattern, clean, re.IGNORECASE)
            if not match:
                continue
            groups = match.groups()
            raw_id = groups[0] if groups else ""
            title = groups[1].strip() if len(groups) > 1 and groups[1] else ""
            chapter_id = _normalize_chapter_id(raw_id)
            if not chapter_id:
                continue
            if chapter_id.isdigit() and int(chapter_id) > 50:
                continue
            if title and (len(title) < 3 or len(title) > 120):
                continue
            score = _heading_confidence(clean, idx)
            if score < 7:
                continue
            candidates.append((score, idx, chapter_id, title))
    if not candidates:
        return None, None
    # Highest score wins, then the earliest line. On the same line, prefer the
    # match that captured a title: the title-less first pattern and the titled
    # second pattern both match "Chapter 1: Foo" with equal scores, and picking
    # the first-appended entry would always discard the title.
    best = max(candidates, key=lambda c: (c[0], -c[1], bool(c[3])))
    _, _, chapter_id, title = best
    return chapter_id, title


def _build_pseudo_chapter_map(num_pages: int) -> dict:
    """Split *num_pages* into fixed-size page spans labelled as pseudo chapters.

    Keys are "1", "2", ...; each value holds ``title``, 1-based inclusive
    ``start_page`` / ``end_page``, and ``pseudo=True``.
    """
    chapter_map = {}
    starts = range(1, num_pages + 1, PSEUDO_CHAPTER_PAGE_SPAN)
    for chapter_num, start in enumerate(starts, start=1):
        end = min(start + PSEUDO_CHAPTER_PAGE_SPAN - 1, num_pages)
        chapter_map[str(chapter_num)] = {
            "title": f"Pseudo Chapter {chapter_num}",
            "start_page": start,
            "end_page": end,
            "pseudo": True,
        }
    return chapter_map


def _build_chapter_map(pages: list[str]) -> dict:
    """Map chapter ids to page ranges using headings detected on each page.

    Only numeric chapter ids in 1..50 are used, and a chapter number more than
    3 above the last accepted one is ignored. When fewer than three chapters
    are found, the fixed-span pseudo chapter map is returned instead.
    """
    chapter_map = {}
    current = None
    last_chapter_num = 0
    for page_num, text in enumerate(pages, start=1):
        chapter_id, chapter_title = _detect_chapter_start(text)
        if not chapter_id or not chapter_id.isdigit():
            continue
        chapter_num = int(chapter_id)
        if chapter_num < 1 or chapter_num > 50:
            continue
        if last_chapter_num and chapter_num > last_chapter_num + 3:
            continue
        # A chapter seen again (running header, back-reference) must not move
        # any boundary: otherwise the final chapter loses its last pages and an
        # earlier chapter can be stretched over later ones.
        if chapter_id in chapter_map:
            continue
        if current is not None:
            chapter_map[current]["end_page"] = page_num - 1
        chapter_map[chapter_id] = {
            "title": chapter_title,
            "start_page": page_num,
            "end_page": len(pages),  # provisional until the next chapter starts
            "pseudo": False,
        }
        current = chapter_id
        last_chapter_num = chapter_num
    if len(chapter_map) < 3:
        return _build_pseudo_chapter_map(len(pages))
    return chapter_map


def read_doc(path: str):
    """Read every PDF in directory *path* into page-level ``Document`` objects.

    Returns ``(docs, corpus_meta)``. ``docs`` holds one ``Document`` per
    non-blank page; ``corpus_meta`` maps each file name to its document type,
    page count, and chapter map (empty for non-book documents). Both are empty
    when the directory contains no PDF files.
    """
    docs = []
    corpus_meta = {}
    # Extension match is case-insensitive; sorting makes the output order
    # independent of the file system's listing order.
    pdf_files = sorted(f for f in os.listdir(path) if f.lower().endswith(".pdf"))
    if not pdf_files:
        print("No PDF files found.")
        return docs, corpus_meta

    for filename in pdf_files:
        filepath = os.path.join(path, filename)
        print(f"\nProcessing: {filename}")
        if _is_scanned_pdf(filepath):
            print(" Detected: Scanned PDF → using OCR")
            pages = _extract_scanned_pages(filepath)
        else:
            print(" Detected: Digital PDF → using text extraction")
            pages = _extract_digital_pages(filepath)

        first_pages_text = "\n".join(pages[:5])
        document_type = _detect_document_type(first_pages_text)
        chapter_map = _build_chapter_map(pages) if document_type == "book" else {}
        corpus_meta[filename] = {
            "document_type": document_type,
            "page_count": len(pages),
            "chapter_map": chapter_map,
        }
        print(f" Inferred document type: {document_type}")
        if chapter_map:
            print(f" Detected chapters: {list(chapter_map.keys())[:12]}{'...' if len(chapter_map) > 12 else ''}")

        for page_num, text in enumerate(pages, start=1):
            if not text or not text.strip():
                continue
            section_hint = _infer_section_hint(text, page_num, document_type)
            # First chapter (in insertion order) whose range contains the page.
            chapter_label = ""
            for chap, info in chapter_map.items():
                if info["start_page"] <= page_num <= info["end_page"]:
                    chapter_label = chap
                    break
            docs.append(Document(
                page_content=text,
                metadata={
                    "source": filename,
                    "page": page_num,
                    "document_type": document_type,
                    "section_hint": section_hint,
                    "chapter": chapter_label,
                },
            ))
    return docs, corpus_meta


def divide_doc(docs: list[Document], chunk_size: int = CHUNK_SIZE, chunk_overlap: int = CHUNK_OVERLAP) -> list[Document]:
    """Split *docs* into overlapping chunks with a recursive character splitter."""
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", ". ", " ", ""],
    )
    return splitter.split_documents(docs)


def save_corpus_meta(corpus_meta: dict, file_path: str = "corpus_meta.json"):
    """Write *corpus_meta* to *file_path* as indented UTF-8 JSON."""
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(corpus_meta, f, indent=2)


def load_corpus_meta(file_path: str = "corpus_meta.json") -> dict:
    """Load corpus metadata from *file_path*; return ``{}`` if the file is missing."""
    if not os.path.exists(file_path):
        return {}
    with open(file_path, "r", encoding="utf-8") as f:
        return json.load(f)