# -*- coding: utf-8 -*-
"""
pluto/ingest.py — File ingestion: convert uploaded files to corpus Markdown.

Supports: .pdf, .docx, .doc, .txt, .md

At upload time, also splits into chunks, classifies each chunk,
tags it with the target model mode, and registers everything in the DocIndex.
"""

from __future__ import annotations

import re
from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from pluto.doc_index import DocIndex


def ingest_file(
    file_path: str | Path,
    corpus_dir: str | Path,
    doc_index: "DocIndex | None" = None,
) -> dict:
    """
    Convert a file to Markdown, save it in the corpus, and register it in DocIndex.

    Args:
        file_path: Path of the uploaded file. The extension selects the extractor.
        corpus_dir: Directory that receives ``<doc_id>.md``; created if missing.
        doc_index: Optional index. When given, any previous entry for the same
            doc_id is removed (if the Markdown file already exists) and the
            freshly split chunks are registered.

    Returns:
        dict with:
        {"doc_id": str, "filename": str, "output_path": str,
         "chunks": int, "size": int}
        where "chunks" is the number of chunks and "size" is the character
        length of the generated Markdown.

    Raises:
        ValueError: If the extension is not handled, or no text was extracted.
    """
    file_path = Path(file_path)
    corpus_dir = Path(corpus_dir)
    corpus_dir.mkdir(parents=True, exist_ok=True)

    ext = file_path.suffix.lower()
    filename = file_path.stem

    # Extract text based on file type
    if ext == ".pdf":
        text = _extract_pdf(file_path)
    elif ext in (".docx", ".doc"):
        # NOTE(review): python-docx documents support for .docx only; a legacy
        # binary .doc will likely fail inside Document() — confirm and either
        # convert such files upstream or reject them here.
        text = _extract_docx(file_path)
    elif ext in (".txt", ".md", ".markdown"):
        text = file_path.read_text(encoding="utf-8", errors="replace")
    else:
        raise ValueError(
            f"Unsupported file type: {ext}. Supported: .pdf, .docx, .txt, .md"
        )

    if not text.strip():
        raise ValueError(f"No text could be extracted from {file_path.name}")

    # Convert to Markdown
    md_content = _to_markdown(text, filename)

    # Save to corpus
    doc_id = _safe_doc_id(filename)
    out_path = corpus_dir / f"{doc_id}.md"

    # If the same doc already exists, overwrite it (re-upload = re-process).
    # Compare against None explicitly so this check agrees with the
    # registration step below even if the index object evaluates as falsy.
    if out_path.exists() and doc_index is not None:
        doc_index.remove_doc(doc_id)  # Clear old index data so it gets re-understood

    out_path.write_text(md_content, encoding="utf-8")

    # ── Pre-chunk + classify + tag + register in DocIndex ─────────────
    chunks = _split_into_chunks(md_content)

    # Raw chunks are stored as-is. Per the original note, context headers are
    # injected at query time in CorpusTools.get_chunk, not here.
    chunk_meta_list = _classify_and_tag_chunks(chunks)

    if doc_index is not None:
        from pluto.doc_index import ChunkMeta

        meta_objects = [
            ChunkMeta(
                chunk_id=m["chunk_id"],
                chunk_type=m["chunk_type"],
                mode=m["mode"],
                header=m["header"],
            )
            for m in chunk_meta_list
        ]
        doc_index.register_doc(
            doc_id=doc_id,
            filename=file_path.name,
            chunks=chunks,
            chunk_meta=meta_objects,
        )

    return {
        "doc_id": doc_id,
        "filename": file_path.name,
        "output_path": str(out_path),
        "chunks": len(chunks),
        "size": len(md_content),
    }


def _extract_pdf(path: Path) -> str:
    """
    Extract text and tables from a PDF using pdfplumber.

    Each page that yields text or tables becomes a ``## Page N`` section.
    Pages that yield nothing are skipped and logged as a warning on the
    "pluto" logger.
    """
    import logging

    import pdfplumber

    logger = logging.getLogger("pluto")
    pages = []
    with pdfplumber.open(str(path)) as pdf:
        for i, page in enumerate(pdf.pages):
            page_parts = []

            text = page.extract_text(x_tolerance=2, y_tolerance=2)
            if text and text.strip():
                page_parts.append(text.strip())

            for table in page.extract_tables():
                if table:
                    # Empty cells come back as None; render them as "".
                    rows = [" | ".join(cell or "" for cell in row) for row in table]
                    page_parts.append("\n".join(rows))

            if page_parts:
                pages.append(f"## Page {i + 1}\n\n" + "\n\n".join(page_parts))
            else:
                logger.warning(
                    "pdfplumber returned empty text for page %s in %s",
                    i + 1,
                    path.name,
                )
    return "\n\n".join(pages)


def _extract_docx(path: Path) -> str:
    """
    Extract text from a DOCX file using python-docx.

    Paragraphs styled "Heading N" become Markdown headings; all other
    non-empty paragraphs are kept as plain text. Tables are appended after
    all paragraphs (not at their original position) as pipe tables whose
    first row is treated as the header.
    """
    from docx import Document

    doc = Document(str(path))
    paragraphs = []

    for para in doc.paragraphs:
        text = para.text.strip()
        if not text:
            continue

        # Preserve heading styles. Guard against a missing style or name.
        style_name = (para.style.name or "") if para.style is not None else ""
        if style_name.startswith("Heading"):
            level = style_name.replace("Heading ", "").strip()
            try:
                depth = int(level)
            except ValueError:
                depth = 2
            # Markdown only defines heading levels 1-6; Word allows up to 9.
            hashes = "#" * min(max(depth, 1), 6)
            paragraphs.append(f"{hashes} {text}")
        else:
            paragraphs.append(text)

    # Also extract tables
    for table in doc.tables:
        rows = []
        for row in table.rows:
            cells = [cell.text.strip() for cell in row.cells]
            rows.append("| " + " | ".join(cells) + " |")
        if rows:
            header = rows[0]
            sep = "| " + " | ".join(["---"] * len(table.rows[0].cells)) + " |"
            paragraphs.append("\n".join([header, sep] + rows[1:]))

    return "\n\n".join(paragraphs)


def _to_markdown(text: str, title: str) -> str:
    """
    Wrap extracted text in a clean Markdown document titled ``# {title}``.

    Trailing spaces/tabs are removed from each line first, then runs of three
    or more newlines are collapsed to one blank line. The order matters:
    whitespace-only lines must be emptied before blank-line runs are
    collapsed, otherwise they survive as extra blank lines.
    """
    text = re.sub(r"[ \t]+\n", "\n", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return f"# {title}\n\n{text.strip()}\n"


def _safe_doc_id(name: str) -> str:
    """
    Convert a filename stem to a safe document ID.

    Every character outside ``[a-zA-Z0-9_-]`` becomes ``_``, runs of ``_`` are
    collapsed, leading/trailing ``_`` are removed, and the result is
    lower-cased. Returns "document" if nothing is left.
    """
    safe = re.sub(r"[^a-zA-Z0-9_-]", "_", name)
    safe = re.sub(r"_+", "_", safe).strip("_")
    return safe.lower() if safe else "document"


def _split_into_chunks(content: str, max_chunk: int = 1800) -> list[str]:
    """
    Semantic chunking via NVIDIA NIM embeddings.

    Falls back to paragraph splitting if NVIDIA key is absent.
    See pluto/embedder.py for implementation details.

    NOTE(review): ``max_chunk`` is accepted for interface compatibility but is
    not forwarded to ``semantic_split``; confirm whether it should be.
    """
    from pluto.embedder import semantic_split

    return semantic_split(content)


def _classify_and_tag_chunks(chunks: list[str]) -> list[dict]:
    """
    Classify each chunk and tag it with target model mode.

    Returns one dict per chunk, in order, with keys "chunk_id" (``C<index>``),
    "chunk_type", "mode", and "header". "header" is the text of the first
    line in the chunk that starts with ``#``, or "" if there is none.
    """
    from pluto.chunker import classify_chunk
    from pluto.models import CHUNK_TYPE_TO_MODE

    result = []
    for i, chunk_text in enumerate(chunks):
        chunk_type = classify_chunk(chunk_text)
        mode = CHUNK_TYPE_TO_MODE[chunk_type]

        # Use the first heading line inside the chunk as its header.
        header = next(
            (
                line.lstrip("#").strip()
                for line in chunk_text.split("\n")
                if line.startswith("#")
            ),
            "",
        )

        result.append(
            {
                "chunk_id": f"C{i}",
                "chunk_type": chunk_type.value,
                "mode": mode.value,
                "header": header,
            }
        )
    return result