File size: 6,735 Bytes
23cdeed 66ad25b 23cdeed 66ad25b 23cdeed 66ad25b 23cdeed 66ad25b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 | # -*- coding: utf-8 -*-
"""
pluto/ingest.py — File ingestion: convert uploaded files to corpus Markdown.
Supports: .pdf, .docx, .doc, .txt, .md, .markdown
At upload time, also splits into chunks, classifies each chunk,
tags it with the target model mode, and registers everything in the DocIndex.
"""
from __future__ import annotations
import re
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from pluto.doc_index import DocIndex
def ingest_file(
    file_path: str | Path,
    corpus_dir: str | Path,
    doc_index: "DocIndex | None" = None,
) -> dict:
    """
    Convert a file to Markdown, save it in the corpus, and register it in DocIndex.

    Args:
        file_path: The uploaded file. Its (case-insensitive) suffix selects the
            extractor: .pdf, .docx/.doc, or .txt/.md/.markdown.
        corpus_dir: Directory that receives ``<doc_id>.md``; created if missing.
        doc_index: Optional index. When given, a previously ingested document
            with the same doc_id (i.e. whose Markdown file already exists) is
            removed from the index first, then the new chunks and their
            classification metadata are registered.

    Returns:
        dict with keys "doc_id", "filename" (original file name), "output_path"
        (path of the written Markdown file), "chunks" (number of chunks) and
        "size" (length of the Markdown content in characters).

    Raises:
        ValueError: if the extension is unsupported or no text could be extracted.
    """
    file_path = Path(file_path)
    corpus_dir = Path(corpus_dir)
    corpus_dir.mkdir(parents=True, exist_ok=True)

    ext = file_path.suffix.lower()
    filename = file_path.stem

    # Extract plain text according to the file type.
    if ext == ".pdf":
        text = _extract_pdf(file_path)
    elif ext in (".docx", ".doc"):
        # NOTE(review): python-docx reads only the OOXML .docx format; a legacy
        # binary .doc will most likely fail inside _extract_docx — confirm intent.
        text = _extract_docx(file_path)
    elif ext in (".txt", ".md", ".markdown"):
        text = file_path.read_text(encoding="utf-8", errors="replace")
    else:
        raise ValueError(
            f"Unsupported file type: {ext}. "
            "Supported: .pdf, .docx, .doc, .txt, .md, .markdown"
        )
    if not text.strip():
        raise ValueError(f"No text could be extracted from {file_path.name}")

    md_content = _to_markdown(text, filename)
    doc_id = _safe_doc_id(filename)
    out_path = corpus_dir / f"{doc_id}.md"

    # Re-upload of an existing document overwrites it: clear the old index data
    # first so the new content gets re-processed. Test against None (same as the
    # registration step below) instead of relying on the index's truthiness.
    if doc_index is not None and out_path.exists():
        doc_index.remove_doc(doc_id)
    out_path.write_text(md_content, encoding="utf-8")

    # -- Pre-chunk, classify, tag and register in DocIndex ------------------
    # Chunks are stored raw, without context headers; those are expected to be
    # added at query time by CorpusTools.get_chunk (outside this module).
    chunks = _split_into_chunks(md_content)

    if doc_index is not None:
        from pluto.doc_index import ChunkMeta

        # Classification metadata is only consumed by the index, so compute it
        # only when there is an index to register it in.
        chunk_meta_list = _classify_and_tag_chunks(chunks)
        meta_objects = [
            ChunkMeta(
                chunk_id=m["chunk_id"],
                chunk_type=m["chunk_type"],
                mode=m["mode"],
                header=m["header"],
            )
            for m in chunk_meta_list
        ]
        doc_index.register_doc(
            doc_id=doc_id,
            filename=file_path.name,
            chunks=chunks,
            chunk_meta=meta_objects,
        )

    return {
        "doc_id": doc_id,
        "filename": file_path.name,
        "output_path": str(out_path),
        "chunks": len(chunks),
        "size": len(md_content),
    }
def _extract_pdf(path: Path) -> str:
    """Extract text and tables from a PDF using pdfplumber.

    Each page that yields content becomes a "## Page N" section (N is 1-based)
    containing the stripped page text followed by any tables pdfplumber
    detects. Tables are rendered as Markdown tables with the first row as the
    header, the same layout the DOCX extractor produces. Pages with neither
    text nor tables are skipped and logged as a warning on the "pluto" logger.

    Returns the page sections joined by blank lines ("" if nothing was found).
    """
    import logging

    import pdfplumber

    logger = logging.getLogger("pluto")

    def _cell_md(value):
        # pdfplumber reports empty cells as None and keeps in-cell line breaks;
        # newlines and literal pipes would break the one-line-per-row layout.
        text = (value or "").strip()
        return text.replace("\r", " ").replace("\n", " ").replace("|", "\\|")

    def _table_md(table):
        # Markdown for one extracted table, or None when it has no usable rows.
        rows = [row for row in table if row]
        if not rows:
            return None
        # Pad ragged rows so every line has the same number of columns.
        width = max(len(row) for row in rows)
        lines = [
            "| "
            + " | ".join([_cell_md(cell) for cell in row] + [""] * (width - len(row)))
            + " |"
            for row in rows
        ]
        sep = "| " + " | ".join(["---"] * width) + " |"
        return "\n".join([lines[0], sep] + lines[1:])

    pages = []
    with pdfplumber.open(str(path)) as pdf:
        for page_no, page in enumerate(pdf.pages, start=1):
            parts = []
            text = page.extract_text(x_tolerance=2, y_tolerance=2)
            if text and text.strip():
                parts.append(text.strip())
            for table in page.extract_tables():
                md = _table_md(table) if table else None
                if md:
                    parts.append(md)
            if parts:
                pages.append(f"## Page {page_no}\n\n" + "\n\n".join(parts))
            else:
                logger.warning("pdfplumber returned empty text for page %s in %s", page_no, path.name)
    return "\n\n".join(pages)
def _extract_docx(path: Path) -> str:
    """Extract text and tables from a DOCX file using python-docx.

    Non-empty paragraphs are emitted as stripped text. Paragraphs whose style
    name starts with "Heading" become Markdown headings: "Heading N" maps to N
    '#' characters, clamped to the 1-6 range Markdown supports, and a
    non-numeric suffix falls back to "##". Tables become Markdown tables whose
    header row is the table's first row.

    When the installed python-docx provides ``Document.iter_inner_content``,
    the body is walked in document order so each table stays next to the text
    around it. Otherwise all paragraphs are emitted first and all tables are
    appended after them.

    Returns the blocks joined by blank lines.
    """
    from docx import Document

    doc = Document(str(path))

    def _paragraph_md(para):
        # Markdown for one paragraph, or None when it has no visible text.
        text = para.text.strip()
        if not text:
            return None
        style = para.style
        style_name = (style.name or "") if style is not None else ""
        if not style_name.startswith("Heading"):
            return text
        level = style_name.replace("Heading ", "").strip()
        try:
            hashes = "#" * max(1, min(int(level), 6))
        except ValueError:
            hashes = "##"
        return f"{hashes} {text}"

    def _cell_md(text):
        # Multi-paragraph cells contain newlines and literal pipes would split
        # columns; either one breaks a Markdown table row.
        return text.strip().replace("\r", " ").replace("\n", " ").replace("|", "\\|")

    def _table_md(table):
        # Markdown for one table, or None when it has no rows.
        rows = [[_cell_md(cell.text) for cell in row.cells] for row in table.rows]
        if not rows:
            return None
        lines = ["| " + " | ".join(cells) + " |" for cells in rows]
        sep = "| " + " | ".join(["---"] * len(rows[0])) + " |"
        return "\n".join([lines[0], sep] + lines[1:])

    blocks = []
    iter_inner_content = getattr(doc, "iter_inner_content", None)
    if callable(iter_inner_content):
        from docx.table import Table

        for item in iter_inner_content():
            blocks.append(_table_md(item) if isinstance(item, Table) else _paragraph_md(item))
    else:
        # Older python-docx: no ordered body iterator available.
        blocks.extend(_paragraph_md(para) for para in doc.paragraphs)
        blocks.extend(_table_md(table) for table in doc.tables)

    return "\n\n".join(block for block in blocks if block)
def _to_markdown(text: str, title: str) -> str:
    """Wrap extracted text in a clean Markdown document.

    Normalizes CR / CRLF line endings to LF, removes trailing spaces and tabs
    from every line, collapses runs of three or more newlines into a single
    blank line, and strips leading/trailing whitespace from the whole text.

    Returns a level-1 heading "# <title>", a blank line, the cleaned text and a
    final newline.
    """
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    # Strip trailing whitespace BEFORE collapsing blank lines: otherwise
    # whitespace-only lines keep three or more newlines apart.
    text = re.sub(r"[ \t]+\n", "\n", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return f"# {title}\n\n{text.strip()}\n"
def _safe_doc_id(name: str) -> str:
    """Convert a filename stem into a filesystem-safe, lowercase document ID.

    Every character outside [A-Za-z0-9_-] becomes "_", runs of "_" collapse to
    one, and leading/trailing "_" are stripped. Pure-ASCII names therefore map
    exactly as before (e.g. "My File (1)" -> "my_file_1"), and an empty result
    becomes "document".

    Non-ASCII characters carry no information after that mapping. Without a
    disambiguator, every CJK-only name collapsed to "document", so uploads
    silently overwrote each other. For names containing non-ASCII characters,
    the first 8 hex digits of the SHA-256 of the original name are appended,
    keeping IDs distinct and deterministic.
    """
    import hashlib

    safe = re.sub(r"[^a-zA-Z0-9_-]", "_", name)
    safe = re.sub(r"_+", "_", safe).strip("_").lower()
    if not name.isascii():
        # surrogatepass: stems decoded from undecodable OS filenames may hold
        # lone surrogates, which plain UTF-8 encoding would reject.
        digest = hashlib.sha256(name.encode("utf-8", "surrogatepass")).hexdigest()[:8]
        safe = f"{safe or 'document'}_{digest}"
    return safe or "document"
def _split_into_chunks(content: str, max_chunk: int = 1800) -> list[str]:
    """
    Split Markdown content into chunks by delegating to pluto.embedder.semantic_split.

    The chunking strategy lives in pluto/embedder.py. Per its description it is
    semantic chunking via NVIDIA NIM embeddings, with a paragraph-splitting
    fallback when no NVIDIA key is configured.

    NOTE(review): max_chunk is accepted but never forwarded to semantic_split,
    so it currently has no effect — confirm whether the embedder should get it.
    """
    from pluto.embedder import semantic_split as split_semantically

    pieces = split_semantically(content)
    return pieces
def _classify_and_tag_chunks(chunks: list[str]) -> list[dict]:
    """
    Classify each chunk and tag it with its target model mode.

    Args:
        chunks: Chunk texts in document order. IDs are assigned by position
            ("C0", "C1", ...).

    Returns:
        One dict per chunk with keys:
          - "chunk_id": the positional ID.
          - "chunk_type": ``.value`` of the type returned by
            ``pluto.chunker.classify_chunk``.
          - "mode": ``.value`` of ``CHUNK_TYPE_TO_MODE[chunk_type]``.
          - "header": text of the first Markdown ATX heading line inside the
            chunk (leading '#'s and surrounding whitespace removed), or "" if
            the chunk contains no heading.
    """
    from pluto.chunker import classify_chunk
    from pluto.models import CHUNK_TYPE_TO_MODE

    # An ATX heading is 1-6 '#' followed by whitespace or end of line. A bare
    # startswith("#") also matched non-headings such as "#hashtag" or "#1".
    heading_re = re.compile(r"#{1,6}(?=\s|$)")

    result = []
    for i, chunk_text in enumerate(chunks):
        chunk_type = classify_chunk(chunk_text)
        mode = CHUNK_TYPE_TO_MODE[chunk_type]
        header = next(
            (
                line.lstrip("#").strip()
                for line in chunk_text.split("\n")
                if heading_re.match(line)
            ),
            "",
        )
        result.append({
            "chunk_id": f"C{i}",
            "chunk_type": chunk_type.value,
            "mode": mode.value,
            "header": header,
        })
    return result
|