# TomRiddlesDiary / preprocessing.py
# IshantSingh94's picture
# feat: typewriter streaming, font fix, resume detection
# ffd01f3
import os
import re
import json
import torch
import fitz
from tqdm import tqdm
from doctr.io import DocumentFile
from doctr.models import ocr_predictor
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from config import CHUNK_SIZE, CHUNK_OVERLAP, PSEUDO_CHAPTER_PAGE_SPAN
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Pretrained docTR OCR predictor. It is built once, at import time, and moved
# to `_device`; `_extract_scanned_pages` calls it for every scanned page.
_ocr_model = ocr_predictor(pretrained=True).to(_device)
# Regexes that recognise a chapter/book heading at the start of a line.
# `_detect_chapter_start` applies each one with `re.match(..., re.IGNORECASE)`.
#   * Group 1 is always the raw chapter identifier (digits, Roman-numeral
#     letters, or - for the 2nd/3rd pattern - a spelled-out number).
#   * The 2nd and 3rd patterns have an optional group 2: a 3-120 character
#     title following an optional ":", "." or "-" separator.
#   * The 1st pattern has no title group, and a "chapter <digits/roman>" line
#     that matches the 2nd pattern can also match the 1st.
CHAPTER_PATTERNS = [
r"^\s*chapter\s+(\d+|[ivxlcdm]+)\b",
r"^\s*chapter\s+(\d+|[ivxlcdm]+|one|two|three|four|five|six|seven|eight|nine|ten|eleven|twelve)\s*[:.\-]?\s*(.{3,120})?$",
r"^\s*book\s+(\d+|[ivxlcdm]+|one|two|three|four|five|six|seven|eight|nine|ten)\s*[:.\-]?\s*(.{3,120})?$",
]
# Spelled-out numbers ("one" .. "twelve") mapped to their decimal string form.
# `_normalize_chapter_id` looks up the lower-cased identifier here first.
WORD_NUMBERS = {
"one": "1", "two": "2", "three": "3", "four": "4", "five": "5", "six": "6",
"seven": "7", "eight": "8", "nine": "9", "ten": "10", "eleven": "11", "twelve": "12"
}
# Value of each (lower-case) Roman numeral symbol; used by `_roman_to_int`
# both to validate characters and to look up their values.
ROMAN_MAP = {
'i': 1, 'v': 5, 'x': 10, 'l': 50, 'c': 100, 'd': 500, 'm': 1000
}
def _roman_to_int(s: str):
    """Convert a Roman numeral string to an integer.

    The input is lower-cased first. ``None`` is returned when the string is
    empty or contains any character that is not a key of ``ROMAN_MAP``.
    Otherwise the symbols are scanned right-to-left: a symbol smaller than
    the one to its right is subtracted, any other symbol is added.

    The numeral is not checked for being well-formed, so sequences such as
    ``"iiii"`` still yield a number.
    """
    numeral = s.lower()
    if not numeral:
        return None
    if not all(symbol in ROMAN_MAP for symbol in numeral):
        return None

    result = 0
    right_value = 0  # value of the symbol immediately to the right
    for symbol in numeral[::-1]:
        value = ROMAN_MAP[symbol]
        result += -value if value < right_value else value
        right_value = value
    return result
def _normalize_chapter_id(raw: str) -> str:
    """Turn a raw chapter identifier into a canonical string.

    Resolution order, applied to the stripped, lower-cased input:

    1. a spelled-out number found in ``WORD_NUMBERS`` -> its digit string;
    2. a digit string -> the same number without leading zeros;
    3. a Roman numeral whose value is in 1..200 -> that value as a string;
    4. anything else -> the stripped, lower-cased input unchanged.

    A falsy ``raw`` yields ``""``.
    """
    if not raw:
        return ""

    token = raw.strip().lower()

    spelled = WORD_NUMBERS.get(token)
    if spelled is not None:
        return spelled

    if token.isdigit():
        return str(int(token))

    as_roman = _roman_to_int(token)
    if as_roman is None or not (0 < as_roman <= 200):
        # Not a usable Roman numeral: hand the cleaned token back as-is.
        return token
    return str(as_roman)
def _is_scanned_pdf(filepath: str, sample_pages: int = 3) -> bool:
    """Guess whether a PDF is a scan with no usable text layer.

    Args:
        filepath: Path of the PDF to inspect.
        sample_pages: Number of leading pages to examine.

    Returns:
        ``False`` as soon as one of the first ``sample_pages`` pages yields
        more than 50 characters of extracted text (after stripping
        whitespace); ``True`` otherwise, including for a PDF with no pages.
    """
    # The context manager closes the document on every exit path, including
    # the early return below, so the file handle is not leaked.
    with fitz.open(filepath) as doc:
        for i, page in enumerate(doc):
            if i >= sample_pages:
                break
            if len(page.get_text().strip()) > 50:
                return False
    return True
def _detect_document_type(first_pages_text: str) -> str:
    """Classify a document as ``"paper"``, ``"resume"`` or ``"book"``.

    The text is lower-cased and searched for marker substrings. Each marker
    counts at most once, however often it occurs.

    * two or more paper markers -> ``"paper"`` (checked first);
    * otherwise three or more resume markers -> ``"resume"``;
    * otherwise ``"book"``.
    """
    lowered = first_pages_text.lower()

    def _count_present(markers):
        # Number of distinct markers that occur somewhere in the text.
        return len([marker for marker in markers if marker in lowered])

    paper_hits = _count_present((
        "abstract", "introduction", "references", "related work",
        "experiments", "conclusion", "cvpr", "ieee", "arxiv",
    ))
    if paper_hits >= 2:
        return "paper"

    resume_hits = _count_present((
        "resume", "curriculum vitae", "work experience", "employment history",
        "education", "skills", "certifications", "objective", "summary",
        "projects", "achievements", "linkedin", "github",
    ))
    return "resume" if resume_hits >= 3 else "book"
def _extract_digital_pages(filepath: str) -> list[str]:
    """Return the extracted text of every page of a digital PDF.

    Args:
        filepath: Path of the PDF to read.

    Returns:
        One string per page, in page order, as produced by ``page.get_text()``.
        A progress bar is shown while reading.
    """
    # Close the document once all pages are read instead of leaking the handle.
    with fitz.open(filepath) as doc:
        return [page.get_text() for page in tqdm(doc, desc="Reading digital PDF", unit="page")]
def _extract_scanned_pages(filepath: str) -> list[str]:
    """OCR every page of a scanned PDF and return the text per page.

    The PDF is loaded with ``DocumentFile.from_pdf`` and each page is passed to
    the module-level ``_ocr_model`` on its own, as a one-element list. The
    rendered result of each call becomes one entry of the returned list, in
    page order. A progress bar is shown while processing.
    """
    page_images = DocumentFile.from_pdf(filepath)
    progress = tqdm(page_images, desc="OCR scanned PDF", unit="page")
    return [_ocr_model([image]).render() for image in progress]
def _infer_section_hint(page_text: str, page_num: int, document_type: str) -> str:
    """Guess which section of a document a page belongs to.

    Only the first 2500 characters of the lower-cased page text are examined.

    Args:
        page_text: Text of the page.
        page_num: 1-based page number.
        document_type: Document classification; section keywords are only
            searched when this is ``"paper"``.

    Returns:
        For papers, the label of the first pattern (in declaration order) that
        matches as a whole word: ``"abstract"``, ``"introduction"``,
        ``"related_work"``, ``"method"``, ``"experiments"``, ``"conclusion"``
        or ``"references"``. Failing that, ``"abstract"`` when this is page 1
        and the text contains the substring "abstract". Otherwise ``""``.
    """
    text = page_text.lower()[:2500]
    if document_type == "paper":
        # Alternations are grouped so that the word boundaries apply to every
        # alternative. Ungrouped (r"\bmethod|methods\b") the pattern also
        # matched prefixes such as "methodical" or "experimental".
        heading_patterns = {
            "abstract": r"\babstract\b",
            "introduction": r"\bintroduction\b",
            "related_work": r"\brelated work\b",
            "method": r"\bmethods?\b",
            "experiments": r"\b(?:experiments?|results)\b",
            "conclusion": r"\b(?:conclusions?|discussion)\b",
            "references": r"\breferences\b",
        }
        for label, pattern in heading_patterns.items():
            if re.search(pattern, text, re.IGNORECASE):
                return label
    # Looser fallback: any occurrence of "abstract" on the first page.
    if page_num == 1 and "abstract" in text:
        return "abstract"
    return ""
def _heading_confidence(line: str, line_index: int) -> int:
    """Score how likely a line is to be a chapter heading.

    Points are awarded independently and summed:

    * position: +3 if ``line_index`` <= 5, else +1 if <= 12;
    * +2 if the stripped line is 3..120 characters long;
    * +2 if the stripped line is all upper-case;
    * +1 if it has at most 12 whitespace-separated words;
    * +4 if it contains "chapter" or "book" (case-insensitive).
    """
    stripped = line.strip()
    lowered = stripped.lower()

    if line_index <= 5:
        position_bonus = 3
    elif line_index <= 12:
        position_bonus = 1
    else:
        position_bonus = 0

    # (condition, points) pairs for the position-independent criteria.
    checks = (
        (3 <= len(stripped) <= 120, 2),
        (stripped.isupper(), 2),
        (len(stripped.split()) <= 12, 1),
        ("chapter" in lowered or "book" in lowered, 4),
    )
    return position_bonus + sum(points for passed, points in checks if passed)
def _detect_chapter_start(page_text: str):
    """Find the most convincing chapter heading near the top of a page.

    The first 20 non-blank lines are matched against every pattern in
    ``CHAPTER_PATTERNS``. A match becomes a candidate when its identifier
    normalises to something non-empty, a numeric identifier is at most 50,
    any captured title is 3..120 characters long, and the line scores at
    least 7 in ``_heading_confidence``.

    Args:
        page_text: Full text of one page.

    Returns:
        ``(chapter_id, title)`` for the highest-scoring candidate (the
        earliest one on ties), or ``(None, None)`` when there is none.
        ``title`` is ``""`` if no pattern captured one for the chosen line.
    """
    lines = [line.strip() for line in page_text.splitlines() if line.strip()]
    candidates = []
    for idx, clean in enumerate(lines[:20]):
        for pattern in CHAPTER_PATTERNS:
            match = re.match(pattern, clean, re.IGNORECASE)
            if not match:
                continue
            groups = match.groups()
            raw_id = groups[0] if groups else ""
            title = groups[1].strip() if len(groups) > 1 and groups[1] else ""
            chapter_id = _normalize_chapter_id(raw_id)
            if not chapter_id:
                continue
            if chapter_id.isdigit() and int(chapter_id) > 50:
                continue
            if title and (len(title) < 3 or len(title) > 120):
                continue
            if clean.isdigit():
                continue
            score = _heading_confidence(clean, idx)
            if score < 7:
                continue
            candidates.append((score, chapter_id, title, idx))
    if not candidates:
        return None, None

    # max() returns the first maximal element, i.e. the earliest candidate
    # among those sharing the top score.
    _, chapter_id, title, line_idx = max(candidates, key=lambda cand: cand[0])

    if not title:
        # Several patterns can match the same line, and a title-less match may
        # come first. Recover the title another pattern captured for this
        # exact line and chapter instead of silently dropping it.
        for _, other_id, other_title, other_idx in candidates:
            if other_idx == line_idx and other_id == chapter_id and other_title:
                title = other_title
                break
    return chapter_id, title
def _build_pseudo_chapter_map(num_pages: int) -> dict:
    """Split a document into fixed-size, artificial chapters.

    Pages 1..``num_pages`` are cut into consecutive runs of
    ``PSEUDO_CHAPTER_PAGE_SPAN`` pages; the last run may be shorter. The
    result maps the run number (as a string, starting at ``"1"``) to a dict
    with ``title``, ``start_page``, ``end_page`` (both 1-based, inclusive)
    and ``pseudo`` set to ``True``.
    """
    span = PSEUDO_CHAPTER_PAGE_SPAN
    first_pages = range(1, num_pages + 1, span)
    return {
        str(number): {
            "title": f"Pseudo Chapter {number}",
            "start_page": first,
            "end_page": min(first + span - 1, num_pages),
            "pseudo": True,
        }
        for number, first in enumerate(first_pages, start=1)
    }
def _build_chapter_map(pages: list[str]) -> dict:
    """Map chapter numbers to the page ranges they cover.

    Each page is checked with ``_detect_chapter_start``. A detected numeric
    chapter is accepted when it lies in 1..50, is not more than 3 ahead of the
    previously accepted chapter, and has not been recorded before. Accepting a
    chapter closes the previous one on the preceding page.

    Args:
        pages: Page texts in order; page numbers are 1-based.

    Returns:
        ``{chapter_id: {"title", "start_page", "end_page", "pseudo"}}`` with
        inclusive page bounds. When fewer than three chapters are found, the
        result of ``_build_pseudo_chapter_map`` is returned instead.
    """
    chapter_map = {}
    current = None
    last_chapter_num = 0
    total_pages = len(pages)
    for page_num, text in enumerate(pages, start=1):
        chapter_id, chapter_title = _detect_chapter_start(text)
        if not chapter_id or not chapter_id.isdigit():
            continue
        chapter_num = int(chapter_id)
        if chapter_num < 1 or chapter_num > 50:
            continue
        # Reject implausible jumps (e.g. a stray "Chapter 40" reference).
        if last_chapter_num and chapter_num > last_chapter_num + 3:
            continue
        # A chapter that is already mapped is a running header or a
        # back-reference, not a new start. Treating it as one would cut the
        # open chapter short (leaving the tail of the last chapter unlabelled)
        # and rewind last_chapter_num so genuine later chapters get rejected.
        if chapter_id in chapter_map:
            continue
        if current is not None:
            chapter_map[current]["end_page"] = page_num - 1
        chapter_map[chapter_id] = {
            "title": chapter_title,
            "start_page": page_num,
            # Provisional: runs to the end until the next chapter closes it.
            "end_page": total_pages,
            "pseudo": False,
        }
        current = chapter_id
        last_chapter_num = chapter_num
    if len(chapter_map) < 3:
        return _build_pseudo_chapter_map(total_pages)
    return chapter_map
def read_doc(path: str):
    """Load every PDF in a directory as page-level ``Document`` objects.

    For each PDF the text is extracted (OCR for scanned files, direct text
    extraction otherwise), the document type is inferred from the first five
    pages, and - for books only - a chapter map is built. Blank pages produce
    no ``Document``. Progress is reported with ``print``.

    Args:
        path: Directory containing the PDF files.

    Returns:
        ``(docs, corpus_meta)`` where ``docs`` is a list of ``Document`` (one
        per non-blank page, with ``source``, ``page``, ``document_type``,
        ``section_hint`` and ``chapter`` metadata) and ``corpus_meta`` maps each
        filename to its ``document_type``, ``page_count`` and ``chapter_map``.
        Both are empty when the directory holds no PDFs.
    """
    docs = []
    corpus_meta = {}
    # Match the extension case-insensitively so "SCAN.PDF" is not skipped, and
    # sort so the processing order does not depend on os.listdir().
    pdf_files = sorted(f for f in os.listdir(path) if f.lower().endswith(".pdf"))
    if not pdf_files:
        print("No PDF files found.")
        return docs, corpus_meta

    for filename in pdf_files:
        filepath = os.path.join(path, filename)
        print(f"\nProcessing: {filename}")
        if _is_scanned_pdf(filepath):
            print(" Detected: Scanned PDF → using OCR")
            pages = _extract_scanned_pages(filepath)
        else:
            print(" Detected: Digital PDF → using text extraction")
            pages = _extract_digital_pages(filepath)

        first_pages_text = "\n".join(pages[:5])
        document_type = _detect_document_type(first_pages_text)
        # Chapters are only meaningful for books; other types get an empty map.
        chapter_map = _build_chapter_map(pages) if document_type == "book" else {}
        corpus_meta[filename] = {
            "document_type": document_type,
            "page_count": len(pages),
            "chapter_map": chapter_map,
        }
        print(f" Inferred document type: {document_type}")
        if chapter_map:
            print(f" Detected chapters: {list(chapter_map.keys())[:12]}{'...' if len(chapter_map) > 12 else ''}")

        for page_num, text in enumerate(pages, start=1):
            if not text or not text.strip():
                continue
            section_hint = _infer_section_hint(text, page_num, document_type)
            # First chapter whose inclusive page range contains this page.
            chapter_label = next(
                (
                    chap
                    for chap, info in chapter_map.items()
                    if info["start_page"] <= page_num <= info["end_page"]
                ),
                "",
            )
            docs.append(Document(
                page_content=text,
                metadata={
                    "source": filename,
                    "page": page_num,
                    "document_type": document_type,
                    "section_hint": section_hint,
                    "chapter": chapter_label,
                },
            ))
    return docs, corpus_meta
def divide_doc(docs: list[Document], chunk_size: int = CHUNK_SIZE, chunk_overlap: int = CHUNK_OVERLAP) -> list[Document]:
    """Split documents into smaller chunks.

    A ``RecursiveCharacterTextSplitter`` is configured with the given chunk
    size and overlap and the separators paragraph break, line break,
    sentence end (". "), space and empty string. Its ``split_documents``
    result is returned unchanged.
    """
    splitter_options = {
        "chunk_size": chunk_size,
        "chunk_overlap": chunk_overlap,
        "separators": ["\n\n", "\n", ". ", " ", ""],
    }
    chunker = RecursiveCharacterTextSplitter(**splitter_options)
    chunks = chunker.split_documents(docs)
    return chunks
def save_corpus_meta(corpus_meta: dict, file_path: str = "corpus_meta.json"):
    """Write the corpus metadata to ``file_path`` as JSON.

    The file is opened for writing as UTF-8 (replacing any existing content)
    and the data is serialised with a two-space indent.
    """
    with open(file_path, mode="w", encoding="utf-8") as handle:
        handle.write(json.dumps(corpus_meta, indent=2))
def load_corpus_meta(file_path: str = "corpus_meta.json") -> dict:
    """Read corpus metadata from a JSON file.

    Returns the parsed content of ``file_path`` (read as UTF-8), or an empty
    dict when no such path exists.
    """
    if os.path.exists(file_path):
        with open(file_path, encoding="utf-8") as handle:
            return json.loads(handle.read())
    return {}