ayushKishor's picture
Add Pluto memory layer and pipeline fixes
23cdeed
# -*- coding: utf-8 -*-
"""
pluto/ingest.py — File ingestion: convert uploaded files to corpus Markdown.
Supports: .pdf, .docx, .doc, .txt, .md, .markdown
At upload time, also splits into chunks, classifies each chunk,
tags it with the target model mode, and registers everything in the DocIndex.
"""
from __future__ import annotations
import re
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from pluto.doc_index import DocIndex
def ingest_file(
    file_path: str | Path,
    corpus_dir: str | Path,
    doc_index: "DocIndex | None" = None,
) -> dict:
    """
    Convert a file to Markdown, save it in the corpus, and register it in DocIndex.

    Args:
        file_path: File to ingest. The extension (case-insensitive) selects the
            extractor: .pdf, .docx/.doc, or .txt/.md/.markdown.
        corpus_dir: Directory that receives "<doc_id>.md"; created if missing.
        doc_index: Optional index. When provided, any previous entry for the
            same doc_id is removed (if the corpus file already exists) and the
            new chunks plus their metadata are registered.

    Returns:
        dict with: {"doc_id": str, "filename": str, "output_path": str,
        "chunks": int, "size": int}, where "size" is the character length of
        the generated Markdown.

    Raises:
        ValueError: If the extension is unsupported or no text was extracted.
    """
    file_path = Path(file_path)
    corpus_dir = Path(corpus_dir)
    corpus_dir.mkdir(parents=True, exist_ok=True)
    ext = file_path.suffix.lower()
    filename = file_path.stem

    # Extract text based on file type
    if ext == ".pdf":
        text = _extract_pdf(file_path)
    elif ext in (".docx", ".doc"):
        # NOTE(review): both extensions go through python-docx; legacy binary
        # .doc files may not be readable by it — confirm.
        text = _extract_docx(file_path)
    elif ext in (".txt", ".md", ".markdown"):
        text = file_path.read_text(encoding="utf-8", errors="replace")
    else:
        raise ValueError(f"Unsupported file type: {ext}. Supported: .pdf, .docx, .txt, .md")

    if not text.strip():
        raise ValueError(f"No text could be extracted from {file_path.name}")

    # Convert to Markdown
    md_content = _to_markdown(text, filename)

    # Save to corpus
    doc_id = _safe_doc_id(filename)
    out_path = corpus_dir / f"{doc_id}.md"

    # If the same doc already exists, overwrite it (re-upload = re-process).
    # Use an explicit None check, matching the registration branch below, so a
    # provided-but-falsy index (e.g. an empty one) is not silently skipped.
    if out_path.exists() and doc_index is not None:
        doc_index.remove_doc(doc_id)  # Clear old index data so it gets re-understood
    out_path.write_text(md_content, encoding="utf-8")

    # ── Pre-chunk + classify + tag + register in DocIndex ─────────────
    chunks = _split_into_chunks(md_content)
    # Chunks are stored raw here. Per the original note, context headers are
    # injected later at query time (CorpusTools.get_chunk), not in this function.
    chunk_meta_list = _classify_and_tag_chunks(chunks)

    if doc_index is not None:
        # Imported lazily; at module level DocIndex is only imported for typing.
        from pluto.doc_index import ChunkMeta

        meta_objects = [
            ChunkMeta(
                chunk_id=m["chunk_id"],
                chunk_type=m["chunk_type"],
                mode=m["mode"],
                header=m["header"],
            )
            for m in chunk_meta_list
        ]
        doc_index.register_doc(
            doc_id=doc_id,
            filename=file_path.name,
            chunks=chunks,
            chunk_meta=meta_objects,
        )

    return {
        "doc_id": doc_id,
        "filename": file_path.name,
        "output_path": str(out_path),
        "chunks": len(chunks),
        "size": len(md_content),
    }
def _extract_pdf(path: Path) -> str:
    """Extract per-page text and tables from a PDF using pdfplumber.

    Every page that yields content becomes a "## Page N" section (N is
    1-based) holding the stripped page text followed by each non-empty table,
    rendered one row per line with cells joined by " | ". Pages that yield
    nothing are logged as a warning and left out. Sections are joined with
    blank lines.
    """
    import logging
    import pdfplumber

    log = logging.getLogger("pluto")
    sections = []
    with pdfplumber.open(str(path)) as pdf:
        for page_no, page in enumerate(pdf.pages, start=1):
            raw_text = page.extract_text(x_tolerance=2, y_tolerance=2)
            body = raw_text.strip() if raw_text else ""
            blocks = [body] if body else []

            for table in page.extract_tables():
                if not table:
                    continue
                # Missing cells (None) are rendered as empty strings.
                blocks.append(
                    "\n".join(" | ".join(cell or "" for cell in row) for row in table)
                )

            if not blocks:
                log.warning("pdfplumber returned empty text for page %s in %s", page_no, path.name)
                continue
            sections.append(f"## Page {page_no}\n\n" + "\n\n".join(blocks))
    return "\n\n".join(sections)
def _extract_docx(path: Path) -> str:
    """Extract text from DOCX using python-docx.

    Non-empty paragraphs are emitted in document order; paragraphs whose style
    name starts with "Heading" become Markdown ATX headings. All tables are
    appended after the paragraphs as pipe tables, with the first row used as
    the header row. Blocks are joined with blank lines.
    """
    from docx import Document

    doc = Document(str(path))
    paragraphs = []
    for para in doc.paragraphs:
        text = para.text.strip()
        if not text:
            continue
        # Guard against a missing style or a style without a name.
        style_name = (para.style.name if para.style else None) or ""
        # Preserve heading styles
        if style_name.startswith("Heading"):
            level_text = style_name.replace("Heading ", "").strip()
            try:
                level = int(level_text)
            except ValueError:
                level = 2
            # Markdown headings only exist for levels 1-6; clamp so that
            # e.g. "Heading 7" still renders as a heading and level 0 does
            # not produce an empty prefix.
            level = max(1, min(level, 6))
            paragraphs.append(f"{'#' * level} {text}")
        else:
            paragraphs.append(text)

    # Also extract tables
    for table in doc.tables:
        rows = []
        for row in table.rows:
            # Collapse internal whitespace: a newline inside a cell would
            # otherwise split the Markdown table row across lines.
            cells = [" ".join(cell.text.split()) for cell in row.cells]
            rows.append("| " + " | ".join(cells) + " |")
        if rows:
            header = rows[0]
            sep = "| " + " | ".join(["---"] * len(table.rows[0].cells)) + " |"
            paragraphs.append("\n".join([header, sep] + rows[1:]))
    return "\n\n".join(paragraphs)
def _to_markdown(text: str, title: str) -> str:
"""Wrap extracted text in a clean Markdown document."""
# Clean up excessive whitespace
text = re.sub(r"\n{3,}", "\n\n", text)
text = re.sub(r"[ \t]+\n", "\n", text)
return f"# {title}\n\n{text.strip()}\n"
def _safe_doc_id(name: str) -> str:
"""Convert filename to a safe document ID."""
safe = re.sub(r"[^a-zA-Z0-9_-]", "_", name)
safe = re.sub(r"_+", "_", safe).strip("_")
return safe.lower() if safe else "document"
def _split_into_chunks(content: str, max_chunk: int = 1800) -> list[str]:
    """
    Split document text into chunks by delegating to pluto.embedder.semantic_split.

    Per the original author's note, that helper chunks semantically via NVIDIA
    NIM embeddings and falls back to paragraph splitting when no NVIDIA key is
    available; see pluto/embedder.py for the details (not visible from here).

    NOTE(review): max_chunk is accepted but not forwarded to semantic_split,
    so it currently has no effect in this function.
    """
    # Imported inside the function, so the embedder module is only loaded
    # when chunking is actually requested.
    from pluto.embedder import semantic_split

    pieces = semantic_split(content)
    return pieces
def _classify_and_tag_chunks(chunks: list[str]) -> list[dict]:
    """Build one metadata dict per chunk: ID, chunk type, model mode, header.

    Each chunk is classified with classify_chunk and its mode looked up in
    CHUNK_TYPE_TO_MODE. The header is the text of the first line in the chunk
    that starts with "#" (leading "#" and surrounding whitespace removed), or
    "" when the chunk has no such line. IDs are "C0", "C1", ... in input order.
    """
    from pluto.chunker import classify_chunk
    from pluto.models import CHUNK_TYPE_TO_MODE

    tagged = []
    for index, body in enumerate(chunks):
        kind = classify_chunk(body)
        target_mode = CHUNK_TYPE_TO_MODE[kind]

        # First heading-like line in the chunk, if any.
        first_heading = next(
            (ln for ln in body.split("\n") if ln.startswith("#")),
            None,
        )
        if first_heading is None:
            title = ""
        else:
            title = first_heading.lstrip("#").strip()

        entry = {
            "chunk_id": f"C{index}",
            "chunk_type": kind.value,
            "mode": target_mode.value,
            "header": title,
        }
        tagged.append(entry)
    return tagged