| """Document ingestion and retrieval for the DRIFT companion.""" |
|
|
| import re |
| import uuid |
| from pathlib import Path |
| from typing import List, Optional |
|
|
| import chromadb |
|
|
| from infj_bot.core.config import PROJECT_ROOT, DATA_DIR |
| from infj_bot.core.embeddings import ( |
| get_default_embedding_function, |
| LocalEmbeddingFunction, |
| ) |
|
|
| SUPPORTED_TEXT = { |
| ".txt", |
| ".md", |
| ".py", |
| ".js", |
| ".ts", |
| ".jsx", |
| ".tsx", |
| ".json", |
| ".yaml", |
| ".yml", |
| ".csv", |
| ".sh", |
| ".html", |
| ".css", |
| ".rs", |
| ".go", |
| ".java", |
| ".c", |
| ".cpp", |
| ".h", |
| } |
| MAX_INGEST_FILE_BYTES = 2_000_000 |
| MAX_DIRECTORY_FILES = 300 |
|
|
|
|
| def _is_relative_to(child: Path, parent: Path) -> bool: |
| try: |
| child.relative_to(parent) |
| return True |
| except ValueError: |
| return False |
|
|
|
|
| def _resolve_ingest_path(path: str) -> Path: |
| target = Path(path).expanduser() |
| if not target.is_absolute(): |
| target = PROJECT_ROOT / target |
| target = target.resolve() |
| allowed_roots = [PROJECT_ROOT.resolve(), Path.home().resolve()] |
| if not any(_is_relative_to(target, root) for root in allowed_roots): |
| raise PermissionError(f"Path {path} is outside the allowed ingestion roots.") |
| return target |
|
|
|
|
| def _chunk_text(text: str, chunk_size: int = 800, overlap: int = 100) -> List[str]: |
| """Split text into overlapping chunks by paragraphs.""" |
| paragraphs = [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()] |
| chunks = [] |
| current: List[str] = [] |
| current_len = 0 |
| for para in paragraphs: |
| para_len = len(para) |
| if current_len + para_len > chunk_size and current: |
| chunks.append("\n\n".join(current)) |
| |
| overlap_text: List[str] = [] |
| overlap_len = 0 |
| for p in reversed(current): |
| if overlap_len + len(p) > overlap: |
| break |
| overlap_text.insert(0, p) |
| overlap_len += len(p) |
| current = overlap_text |
| current_len = overlap_len |
| current.append(para) |
| current_len += para_len |
| if current: |
| chunks.append("\n\n".join(current)) |
| return chunks |
|
|
|
|
| def _read_pdf(path: Path) -> str: |
| try: |
| from pypdf import PdfReader |
|
|
| reader = PdfReader(str(path)) |
| parts = [] |
| for page in reader.pages: |
| text = page.extract_text() |
| if text: |
| parts.append(text) |
| return "\n\n".join(parts) |
| except Exception as exc: |
| raise RuntimeError(f"PDF read failed: {exc}") |
|
|
|
|
| def _read_file(path: Path) -> str: |
| if path.stat().st_size > MAX_INGEST_FILE_BYTES: |
| raise ValueError( |
| f"File too large for ingestion: {path} ({path.stat().st_size} bytes)" |
| ) |
| suffix = path.suffix.lower() |
| if suffix == ".pdf": |
| return _read_pdf(path) |
| return path.read_text(encoding="utf-8", errors="replace") |
|
|
|
|
| class DocumentStore: |
| def __init__( |
| self, persist_directory=None, embedding_function=None, use_semantic=True |
| ): |
| if persist_directory is None: |
| persist_directory = str(DATA_DIR / "chroma_db") |
| if embedding_function is None: |
| if use_semantic: |
| embedding_function = get_default_embedding_function() |
| else: |
| embedding_function = LocalEmbeddingFunction() |
| self.embedding_function = embedding_function |
| self.client = chromadb.PersistentClient(path=persist_directory) |
| self.collection = self.client.get_or_create_collection( |
| name="infj_documents", |
| embedding_function=embedding_function, |
| ) |
|
|
| def ingest(self, file_path: str, tags: Optional[List[str]] = None) -> int: |
| path = _resolve_ingest_path(file_path) |
| if not path.exists(): |
| raise FileNotFoundError(f"Not found: {path}") |
| if path.is_dir(): |
| return self.ingest_directory(path, tags=tags) |
|
|
| text = _read_file(path) |
| if not text.strip(): |
| return 0 |
|
|
| chunks = _chunk_text(text) |
| if not chunks: |
| return 0 |
|
|
| ids = [f"doc-{uuid.uuid4().hex[:12]}" for _ in chunks] |
| metadatas = [] |
| for i, _chunk in enumerate(chunks): |
| meta = { |
| "source": str(path), |
| "filename": path.name, |
| "chunk_index": i, |
| "total_chunks": len(chunks), |
| "tags": ",".join(tags or []), |
| } |
| metadatas.append(meta) |
|
|
| self.collection.add( |
| documents=chunks, |
| ids=ids, |
| metadatas=metadatas, |
| ) |
| return len(chunks) |
|
|
| def ingest_directory( |
| self, dir_path: Path, tags: Optional[List[str]] = None, recursive: bool = True |
| ) -> int: |
| total = 0 |
| scanned = 0 |
| pattern = "**/*" if recursive else "*" |
| for child in Path(dir_path).glob(pattern): |
| if child.is_file() and child.suffix.lower() in SUPPORTED_TEXT | {".pdf"}: |
| scanned += 1 |
| if scanned > MAX_DIRECTORY_FILES: |
| raise ValueError( |
| f"Directory ingestion stopped after {MAX_DIRECTORY_FILES} supported files." |
| ) |
| try: |
| n = self.ingest(str(child), tags=tags) |
| total += n |
| except Exception as exc: |
| print(f"[ingest skip] {child}: {exc}") |
| return total |
|
|
| def search(self, query: str, n_results: int = 5) -> List[dict]: |
| results = self.collection.query( |
| query_texts=[query], |
| n_results=n_results, |
| ) |
| out = [] |
| for i, doc in enumerate(results["documents"][0]): |
| meta = results["metadatas"][0][i] |
| out.append( |
| { |
| "document": doc, |
| "source": meta.get("source", "?"), |
| "filename": meta.get("filename", "?"), |
| "chunk_index": meta.get("chunk_index", 0), |
| } |
| ) |
| return out |
|
|
| def list_sources(self) -> List[str]: |
| results = self.collection.get(include=["metadatas"]) |
| sources = set() |
| for meta in results.get("metadatas", []): |
| if meta: |
| sources.add(meta.get("source", "?")) |
| return sorted(sources) |
|
|
| def delete_source(self, source_path: str) -> int: |
| results = self.collection.get( |
| where={"source": source_path}, |
| include=[], |
| ) |
| ids = results.get("ids", []) |
| if ids: |
| self.collection.delete(ids=ids) |
| return len(ids) |
|
|
| def count(self) -> int: |
| return self.collection.count() |
|
|
|
|
| def format_doc_results(results: List[dict]) -> str: |
| if not results: |
| return "No matching documents found." |
| lines = [] |
| for r in results: |
| lines.append( |
| f"[{r['filename']} chunk {r['chunk_index']}]\n{r['document'][:600]}" |
| ) |
| return "\n---\n".join(lines) |
|
|
|
|
| if __name__ == "__main__": |
| store = DocumentStore() |
| print(f"Document store initialized. Documents: {store.count()}") |
|
|