import json
import os
import re
import hashlib
from pathlib import Path
from datetime import datetime

RAW_PDF_DIR = Path("data/raw_pdfs")
MCP_DIR = Path("mcp")
OUT_DIR = Path(os.environ.get("RAG_OUT_DIR", "data/normalized"))
SOURCES = Path("sources.json")


# -------- PDF extraction --------

def extract_text_pypdf(pdf_path: Path) -> list[str]:
    from pypdf import PdfReader
    reader = PdfReader(str(pdf_path))
    pages = []
    for page in reader.pages:
        try:
            pages.append(page.extract_text() or "")
        except Exception:
            pages.append("")
    return pages


def extract_text_pdfminer(pdf_path: Path) -> list[str]:
    from pdfminer.high_level import extract_text
    text = extract_text(str(pdf_path)) or ""
    return [text]


def extract_pages(pdf_path: Path) -> list[str]:
    try:
        pages = extract_text_pypdf(pdf_path)
        nonempty = sum(1 for p in pages if p.strip())
        if nonempty < max(1, len(pages) // 10):
            return extract_text_pdfminer(pdf_path)
        return pages
    except Exception:
        return extract_text_pdfminer(pdf_path)


def sha256_file(p: Path) -> str:
    h = hashlib.sha256()
    with p.open("rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            h.update(chunk)
    return h.hexdigest()


# -------- normalization + chunking --------

HYPHEN_BREAK = re.compile(r"(\w)-\n(\w)")
MULTI_NL = re.compile(r"\n{3,}")
WS = re.compile(r"[ \t]+")


def normalize_text(s: str) -> str:
    s = s.replace("\r", "\n")
    s = HYPHEN_BREAK.sub(r"\1\2", s)
    s = WS.sub(" ", s)
    s = re.sub(r" *\n *", "\n", s)
    s = MULTI_NL.sub("\n\n", s)
    return s.strip()


def chunk_text(text: str, target_chars: int = 2400, overlap_chars: int = 300) -> list[str]:
    paras = [p.strip() for p in text.split("\n\n") if p.strip()]
    chunks = []
    buf = ""
    for p in paras:
        if not buf:
            buf = p
        elif len(buf) + 2 + len(p) <= target_chars:
            buf += "\n\n" + p
        else:
            chunks.append(buf)
            tail = buf[-overlap_chars:] if overlap_chars and len(buf) > overlap_chars else ""
            buf = (tail + "\n\n" + p).strip() if tail else p
    if buf:
        chunks.append(buf)
    # window oversized chunks
    out = []
    for c in chunks:
        if len(c) <= target_chars * 2:
            out.append(c)
        else:
            step = max(1, target_chars - overlap_chars)
            for i in range(0, len(c), step):
                part = c[i:i + target_chars].strip()
                if part:
                    out.append(part)
    return out


# Best-effort heading split for PDFs
SECTION_HEADING = re.compile(r"^(?:[A-Z][A-Z0-9 /,-]{6,}|(?:\d+(?:\.\d+){0,3})\s+[A-Z]).*$")
CHAPTER_HEADING = re.compile(r"^(?:CHAPTER\s+\d+|Chapter\s+\d+|\d+\s+CHAPTER)\b")
STOPWORDS = {
    "a","an","and","are","as","at","be","but","by","can","do","does","for","from","how","i","if","in","is","it","of","on","or",
    "that","the","their","then","there","these","this","to","was","were","what","when","where","which","who","why","with","you","your"
}


def sentence_split(text: str) -> list[str]:
    return [s.strip() for s in re.split(r"(?<=[.!?])\s+", text) if s.strip()]


def summarize_text(text: str, max_sentences: int = 3, max_chars: int = 800) -> str:
    sentences = sentence_split(text)
    summary = " ".join(sentences[:max_sentences]).strip()
    if len(summary) > max_chars:
        summary = summary[:max_chars].rsplit(" ", 1)[0].strip()
    return summary


def extract_tags(text: str, title: str | None, section_title: str | None, max_tags: int = 8) -> list[str]:
    content = " ".join([t for t in [title, section_title, text] if t])
    tokens = re.findall(r"[A-Za-z][A-Za-z0-9_]{2,}", content)
    lowered = [t.lower() for t in tokens if t.lower() not in STOPWORDS]
    freq = {}
    for t in lowered:
        freq[t] = freq.get(t, 0) + 1
    keywords = sorted(freq.keys(), key=lambda k: (-freq[k], k))[:max_tags]
    entities = []
    for m in re.findall(r"\b[A-Z][a-zA-Z]+\b(?:\s+[A-Z][a-zA-Z]+\b){0,2}", content):
        ent = m.strip()
        if ent.lower() in STOPWORDS:
            continue
        if ent not in entities:
            entities.append(ent)
        if len(entities) >= max_tags:
            break
    tags = []
    for k in keywords + entities:
        if k and k not in tags:
            tags.append(k)
    return tags[:max_tags]


def build_breadcrumbs(doc_title: str, section_title: str | None) -> str:
    if section_title:
        return f"Book: {doc_title} > Section: {section_title}"
    return f"Book: {doc_title}"


def split_by_headings(pages: list[str]) -> list[dict]:
    blocks = []
    current_title = None
    current = []
    start_page = 1
    for idx, page in enumerate(pages, start=1):
        lines = [ln.rstrip() for ln in page.split("\n")]
        for ln in lines:
            if SECTION_HEADING.match(ln.strip()) and len(ln.strip()) < 140:
                if current:
                    blocks.append({
                        "title": current_title,
                        "text": normalize_text("\n".join(current)),
                        "page_start": start_page,
                        "page_end": idx
                    })
                    current = []
                current_title = ln.strip()
                start_page = idx
            else:
                current.append(ln)
    if current:
        blocks.append({
            "title": current_title,
            "text": normalize_text("\n".join(current)),
            "page_start": start_page,
            "page_end": len(pages)
        })
    pruned = [b for b in blocks if len(b["text"]) >= 400]
    return pruned


# MCP markdown split: chunk by headings to keep semantics
MD_H1 = re.compile(r"(?m)^#\s+")


def split_markdown(md: str) -> list[dict]:
    md = md.strip()
    if not md:
        return []
    # Split on H1 headings but keep first if no heading
    if "\n# " not in "\n" + md:
        return [{"title": None, "text": normalize_text(md)}]
    blocks = []
    current_title = None
    current = []
    for line in md.splitlines():
        if line.startswith("# "):
            if current:
                blocks.append({"title": current_title, "text": normalize_text("\n".join(current))})
                current = []
            current_title = line[2:].strip() or None
        else:
            current.append(line)
    if current:
        blocks.append({"title": current_title, "text": normalize_text("\n".join(current))})
    return [b for b in blocks if len(b["text"]) >= 200]


def main():
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    sources = json.loads(SOURCES.read_text(encoding="utf-8"))["sources"]
    out_jsonl = OUT_DIR / "chunks_books.jsonl"
    out_jsonl.write_text("", encoding="utf-8")
    manifest = {
        "generated_at": datetime.utcnow().isoformat() + "Z",
        "documents": []
    }
    chunk_counter = 0

    # Ingest PDFs defined in sources.json
    for s in sources:
        if s.get("format") != "pdf":
            continue
        pdf_path = RAW_PDF_DIR / s["filename"]
        if not pdf_path.exists():
            print(f"[WARN] Missing PDF: {pdf_path}")
            continue
        pages = extract_pages(pdf_path)
        blocks = split_by_headings(pages)
        if not blocks:
            blocks = []
            for i, p in enumerate(pages, start=1):
                t = normalize_text(p)
                if len(t) >= 400:
                    blocks.append({"title": None, "text": t, "page_start": i, "page_end": i})
        manifest["documents"].append({
            "id": s["id"],
            "title": s["title"],
            "format": "pdf",
            "filename": s["filename"],
            "sha256": sha256_file(pdf_path),
            "blocks": len(blocks),
            "source_type": "book",
            "author": s.get("author"),
            "date": s.get("date")
        })
        for b in blocks:
            chunks = chunk_text(b["text"], target_chars=2400, overlap_chars=300)
            section_title = b.get("title")
            breadcrumbs = build_breadcrumbs(s["title"], section_title)
            summary = summarize_text(b["text"])
            summary_level = "chapter" if section_title and CHAPTER_HEADING.search(section_title) else "section"
            summary_tags = extract_tags(summary, s["title"], section_title)
            summary_rec = {
                "chunk_id": f"{s['id']}::summary::{chunk_counter + 1:06d}",
                "doc_id": s["id"],
                "doc_title": s["title"],
                "title": s["title"],
                "author": s.get("author"),
                "date": s.get("date"),
                "source_type": "book",
                "format": "pdf",
                "section_title": section_title,
                "page_start": b.get("page_start"),
                "page_end": b.get("page_end"),
                "breadcrumbs": breadcrumbs,
                "chunk_type": "summary",
                "summary_level": summary_level,
                "priority": 3,
                "tags": summary_tags,
                "url": None,
                "text": f"Breadcrumbs: {breadcrumbs}\nSummary ({summary_level}): {summary}"
            }
            if summary:
                chunk_counter += 1
                with out_jsonl.open("a", encoding="utf-8") as f:
                    f.write(json.dumps(summary_rec, ensure_ascii=False) + "\n")
            for c in chunks:
                chunk_counter += 1
                tags = extract_tags(c, s["title"], section_title)
                rec = {
                    "chunk_id": f"{s['id']}::{chunk_counter:06d}",
                    "doc_id": s["id"],
                    "doc_title": s["title"],
                    "title": s["title"],
                    "author": s.get("author"),
                    "date": s.get("date"),
                    "source_type": "book",
                    "format": "pdf",
                    "section_title": section_title,
                    "page_start": b.get("page_start"),
                    "page_end": b.get("page_end"),
                    "breadcrumbs": breadcrumbs,
                    "chunk_type": "section",
                    "priority": 2,
                    "tags": tags,
                    "url": None,
                    "text": f"Breadcrumbs: {breadcrumbs}\n{c}"
                }
                with out_jsonl.open("a", encoding="utf-8") as f:
                    f.write(json.dumps(rec, ensure_ascii=False) + "\n")
        print(f"[OK] {s['id']}: {len(blocks)} blocks")

    # Ingest MCP markdown files
    if MCP_DIR.exists():
        for md_path in sorted(MCP_DIR.glob("*.md")):
            md_text = md_path.read_text(encoding="utf-8", errors="ignore")
            blocks = split_markdown(md_text)
            doc_id = f"mcp::{md_path.stem}"
            manifest["documents"].append({
                "id": doc_id,
                "title": f"MCP - {md_path.name}",
                "format": "markdown",
                "filename": str(md_path),
                "blocks": len(blocks),
                "source_type": "mcp",
                "author": None,
                "date": None
            })
            for b in blocks:
                chunks = chunk_text(b["text"], target_chars=1600, overlap_chars=120)
                section_title = b.get("title")
                breadcrumbs = f"MCP: {md_path.name}" + (f" > Section: {section_title}" if section_title else "")
                for c in chunks:
                    chunk_counter += 1
                    tags = extract_tags(c, f"MCP - {md_path.name}", section_title)
                    rec = {
                        "chunk_id": f"{doc_id}::{chunk_counter:06d}",
                        "doc_id": doc_id,
                        "doc_title": f"MCP - {md_path.name}",
                        "title": f"MCP - {md_path.name}",
                        "author": None,
                        "date": None,
                        "source_type": "mcp",
                        "format": "markdown",
                        "section_title": section_title,
                        "page_start": None,
                        "page_end": None,
                        "breadcrumbs": breadcrumbs,
                        "chunk_type": "section",
                        "priority": 2,
                        "tags": tags,
                        "url": None,
                        "text": f"Breadcrumbs: {breadcrumbs}\n{c}"
                    }
                    with out_jsonl.open("a", encoding="utf-8") as f:
                        f.write(json.dumps(rec, ensure_ascii=False) + "\n")
        print(f"[OK] MCP: ingested markdown from {MCP_DIR}")

    (OUT_DIR / "manifest_books.json").write_text(json.dumps(manifest, indent=2, ensure_ascii=False), encoding="utf-8")
    print(f"\nDone: {out_jsonl} and {OUT_DIR/'manifest_books.json'}")


if __name__ == "__main__":
    main()