# policy-analysis / utils / ingest_pdf.py
# (Hugging Face Hub listing residue: uploader "kaburia", commit message "rewrite", commit b022bee)
"""Ingestion pipeline to build a page‑level FAISS index with rich metadata.
Features:
- Per page extraction (page_index 0-based, page_label as shown in PDF)
- Optional OCR fallback for blank / low-text pages (scanned PDFs)
- Records include: doc_id, doc_title, page_index, page_label, text,
section_heading (heuristic), span_start/stop (page chars),
has_anchor flags for configured phrases.
- Outputs JSONL + builds FAISS vector store (sentence-transformers/all-MiniLM-L6-v2)
Note: This is a lightweight scaffold; tune heading detection + anchors as needed.
"""
from __future__ import annotations
import os, re, json, uuid
from dataclasses import dataclass, asdict
from typing import List, Dict, Iterable
from pypdf import PdfReader
import pytesseract
from PIL import Image
from io import BytesIO
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.schema import Document
# Phrases whose presence on a page is recorded as a per-page boolean flag
# (see PageRecord.has_anchors); matching is case-insensitive in ingest_pdf.
ANCHOR_PHRASES = [
    "Specifically these objectives are",
]
# Heading heuristic: an ALL-CAPS line (capitals/spaces/hyphens, >= 4 chars)
# OR a numbered heading such as "2.1 Some title".
HEADING_PATTERN = re.compile(r"^\s*(?:[A-Z][A-Z \-]{3,}|\d+\.[0-9.]*\s+.+)$")
@dataclass
class PageRecord:
    """One extracted PDF page plus the metadata stored alongside its text."""
    doc_id: str            # stable short id (uuid5 of the path unless supplied)
    doc_title: str         # human-readable title (filename stem by default)
    page_index: int        # 0-based physical page position
    page_label: str        # logical label as shown in the PDF (e.g. "iv", "12")
    text: str              # extracted (or OCR-fallback) page text
    section_heading: str   # heuristic heading from the page's first lines, "" if none
    span_start: int        # char offset of text start within the page (always 0 here)
    span_stop: int         # char offset of text end (== len(text))
    has_anchors: Dict[str, bool]  # anchor phrase -> present on this page?
    source: str  # original path
def _extract_page_label(reader, idx: int) -> str:
    """Return the PDF's logical label for page *idx*, falling back to str(idx + 1).

    pypdf may not expose page labels at all (or *idx* may be out of range),
    so any failure yields the 1-based physical page number as a string.
    """
    fallback = str(idx + 1)
    try:
        label = reader.page_labels[idx]
    except Exception:
        return fallback
    return label
def _ocr_page(page) -> str:
    """OCR every embedded image on *page* and join the recognized text.

    Best-effort fallback for scanned pages: an unreadable image (or one with
    no recognizable text) simply contributes nothing, and a page whose
    images cannot even be listed yields "".
    """
    try:
        page_images = page.images
    except Exception:
        page_images = []
    chunks = []
    for image in page_images:
        try:
            recognized = pytesseract.image_to_string(Image.open(BytesIO(image.data)))
            if recognized.strip():
                chunks.append(recognized)
        except Exception:
            # Skip images that PIL/tesseract cannot handle.
            continue
    return "\n".join(chunks).strip()
def _heading_from_text(text: str) -> str:
    """Heuristically pick a section heading from a page's first lines.

    Scans up to the first 8 non-blank lines; the first one that matches
    HEADING_PATTERN and has at most 16 words wins (truncated to 120 chars).
    Returns "" when nothing qualifies.
    """
    stripped = (line.strip() for line in text.splitlines())
    candidates = [line for line in stripped if line][:8]
    for candidate in candidates:
        looks_like_heading = HEADING_PATTERN.match(candidate) is not None
        if looks_like_heading and len(candidate.split()) <= 16:
            return candidate[:120]  # cap stored heading length
    return ""
def ingest_pdf(
    path: str,
    doc_id: str | None = None,
    doc_title: str | None = None,
    *,
    ocr_min_chars: int = 20,
) -> List[PageRecord]:
    """Extract every page of the PDF at *path* into PageRecord objects.

    Args:
        path: Filesystem path of the PDF.
        doc_id: Stable identifier; defaults to the first 12 hex chars of a
            uuid5 derived from *path* (deterministic across runs).
        doc_title: Human-readable title; defaults to the filename stem.
        ocr_min_chars: Keyword-only. Pages whose extracted text is shorter
            than this are assumed scanned and retried with OCR (previously a
            hard-coded 20); the OCR result is kept only if it is longer.

    Returns:
        Records in page order; pages yielding no text are still included
        with empty ``text``.
    """
    reader = PdfReader(path)
    doc_id = doc_id or uuid.uuid5(uuid.NAMESPACE_URL, path).hex[:12]
    doc_title = doc_title or os.path.splitext(os.path.basename(path))[0]
    records: List[PageRecord] = []
    for i, page in enumerate(reader.pages):
        try:
            raw = page.extract_text() or ""
        except Exception:  # pypdf can fail on malformed content streams
            raw = ""
        if len(raw.strip()) < ocr_min_chars:  # likely a scanned page
            raw_ocr = _ocr_page(page)
            if len(raw_ocr) > len(raw):
                raw = raw_ocr
        lowered = raw.lower()  # hoisted: lower() once, not once per anchor
        records.append(PageRecord(
            doc_id=doc_id,
            doc_title=doc_title,
            page_index=i,
            page_label=str(_extract_page_label(reader, i)),
            text=raw,
            section_heading=_heading_from_text(raw),
            span_start=0,  # whole-page record: span covers the full text
            span_stop=len(raw),
            has_anchors={a: a.lower() in lowered for a in ANCHOR_PHRASES},
            source=path,
        ))
    return records
def build_vectorstore(
    records: List[PageRecord],
    index_dir: str = "faiss_index_new",
    *,
    model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
) -> str:
    """Embed *records* into a FAISS index and persist it under *index_dir*.

    Also writes a ``pages.jsonl`` sidecar in *index_dir* so the raw page
    records can be inspected or rebuilt without loading FAISS.

    Args:
        records: Page records produced by :func:`ingest_pdf`.
        index_dir: Output directory (created if missing).
        model_name: Keyword-only. Sentence-transformers model used for
            embeddings (previously hard-coded to the default shown).

    Returns:
        The *index_dir* path, for chaining.
    """
    os.makedirs(index_dir, exist_ok=True)
    embeddings = HuggingFaceEmbeddings(model_name=model_name)
    docs = [
        Document(
            page_content=r.text,
            metadata={
                "doc_id": r.doc_id,
                "doc_title": r.doc_title,
                "page_index": r.page_index,
                "page_label": r.page_label,
                "section_heading": r.section_heading,
                "span_start": r.span_start,
                "span_stop": r.span_stop,
                "source": r.source,
                # Flatten anchor flags into scalar metadata keys.
                **{f"anchor_{k}": v for k, v in r.has_anchors.items()},
            },
        )
        for r in records
    ]
    vs = FAISS.from_documents(docs, embeddings)
    vs.save_local(index_dir)
    # Sidecar JSONL: one record per line, UTF-8, non-ASCII preserved.
    with open(os.path.join(index_dir, "pages.jsonl"), "w", encoding="utf-8") as f:
        for r in records:
            f.write(json.dumps(asdict(r), ensure_ascii=False) + "\n")
    return index_dir
if __name__ == "__main__":
    # CLI entry point: ingest one PDF and build the FAISS index + JSONL.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("pdf", help="Path to PDF")
    parser.add_argument("--doc-id")
    parser.add_argument("--doc-title")
    parser.add_argument("--out", default="faiss_index_new")
    cli = parser.parse_args()

    pages = ingest_pdf(cli.pdf, doc_id=cli.doc_id, doc_title=cli.doc_title)
    build_vectorstore(pages, cli.out)
    print(f"Ingested {len(pages)} pages -> {cli.out}")