#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ MaterialMind RAG (verbose + env override) - Index PDFs/MD/TXT - Chroma persistent DB - FastEmbed first, ST fallback - Rebuild / Update / Ask Env: MATERIALMIND_DATA_DIR=/absolute/path/to/sources # optional """ import os, re, uuid, argparse, textwrap, logging, warnings import hashlib, json, shutil, datetime from pathlib import Path from typing import Iterable, List, Tuple, Dict, Any # -------- PATHS -------- ENV_DIR = os.getenv("MATERIALMIND_DATA_DIR") if ENV_DIR: DATA_DIR = Path(ENV_DIR).expanduser().resolve() BASE_DIR = DATA_DIR.parent else: BASE_DIR = Path(__file__).resolve().parent DATA_DIR = (BASE_DIR / "sources").resolve() DB_DIR = BASE_DIR / "index" / "chroma_v3" MANIFEST_PATH = BASE_DIR / "index" / "manifest.json" # -------- CONFIG -------- EMB_MODEL = "BAAI/bge-small-en-v1.5" CHUNK_CHARS = 1200 CHUNK_OVERLAP = 150 DEFAULT_TOPK = 5 DEFAULT_MODEL = "qwen2.5:7b-instruct" # Exported for app_user.py __all__ = ["search", "DATA_DIR", "DEFAULT_TOPK", "DEFAULT_MODEL"] logging.getLogger("pypdf").setLevel(logging.ERROR) warnings.filterwarnings("ignore", category=UserWarning, module="pypdf") def _lazy_imports(): global chromadb import chromadb # ---- Embeddings ---- _EMBED_FAST = None _EMBED_ST = None def init_embedder(): global _EMBED_FAST, _EMBED_ST if _EMBED_FAST or _EMBED_ST: return try: from fastembed import TextEmbedding _EMBED_FAST = TextEmbedding(model_name=EMB_MODEL) print(f"[EMB] FastEmbed: {EMB_MODEL}") except Exception as e: print(f"[WARN] FastEmbed not available ({e}); trying SentenceTransformers...") from sentence_transformers import SentenceTransformer _EMBED_ST = SentenceTransformer(EMB_MODEL) print(f"[EMB] SentenceTransformers: {EMB_MODEL}") def embed_texts(texts: List[str]) -> List[List[float]]: init_embedder() if _EMBED_FAST is not None: return [v for v in _EMBED_FAST.embed(texts)] return _EMBED_ST.encode(texts, normalize_embeddings=True).tolist() # ---- FS helpers ---- def ensure_dirs(): DATA_DIR.mkdir(parents=True, exist_ok=True) DB_DIR.mkdir(parents=True, exist_ok=True) MANIFEST_PATH.parent.mkdir(parents=True, exist_ok=True) def file_sig(path: Path): h = hashlib.sha1() with open(path, "rb") as f: for chunk in iter(lambda: f.read(1<<20), b""): h.update(chunk) st = path.stat() return {"sha1": h.hexdigest(), "size": st.st_size, "mtime": int(st.st_mtime)} def load_manifest(): if MANIFEST_PATH.exists(): try: return json.loads(MANIFEST_PATH.read_text()) except Exception: return {} return {} def save_manifest(m): MANIFEST_PATH.write_text(json.dumps(m, indent=2)) # ---- Loaders ---- def normalize_spaces(t: str) -> str: t = t.replace("\r", "\n") t = re.sub(r"[ \t]+", " ", t) t = re.sub(r"\n{3,}", "\n\n", t) return t.strip() def load_text_from_pdf(path: Path): # 1) PyMuPDF try: import fitz doc = fitz.open(str(path)) empty = 0 for i, p in enumerate(doc): txt = p.get_text("text").strip() if txt: yield normalize_spaces(txt), i+1 else: empty += 1 doc.close() if empty == i+1: print(f"[HINT] '{path.name}' looks scanned (no text). Try OCR.") return except Exception: pass # 2) pypdf fallback try: from pypdf import PdfReader reader = PdfReader(str(path)) empty = 0 for i, p in enumerate(reader.pages): try: raw = p.extract_text() or "" except Exception: raw = "" txt = normalize_spaces(raw) if txt: yield txt, i+1 else: empty += 1 if empty == i+1: print(f"[HINT] '{path.name}' has no extractable text. OCR it.") except Exception as e: print(f"[WARN] {path.name}: {e}") def load_text_from_md_txt(path: Path) -> str: try: raw = path.read_text(errors="ignore") except Exception: raw = "" return normalize_spaces(raw) def chunk(text: str, max_chars=CHUNK_CHARS, overlap=CHUNK_OVERLAP): n = len(text) if n <= max_chars: if n > 0: yield text return i = 0 while i < n: j = min(i + max_chars, n) yield text[i:j] i = j - overlap if j < n else j def iter_documents(): for f in DATA_DIR.rglob("*"): if not f.is_file(): continue ext = f.suffix.lower() rel = f.relative_to(BASE_DIR).as_posix() if ext == ".pdf": any_txt = False for page_txt, page in load_text_from_pdf(f): any_txt = True for c in chunk(page_txt): yield {"id": str(uuid.uuid4()), "text": c, "meta": {"source": rel, "page": page}} if not any_txt: yield {"id": str(uuid.uuid4()), "text": f"[NO-TEXT] {f.name}", "meta": {"source": rel, "page": None}} elif ext in (".md", ".txt"): txt = load_text_from_md_txt(f) for c in chunk(txt): yield {"id": str(uuid.uuid4()), "text": c, "meta": {"source": rel, "page": None}} # ---- DB ---- def get_collection(reset=False): _lazy_imports() client = chromadb.PersistentClient(path=str(DB_DIR)) if reset: try: client.delete_collection("materialmind") except Exception: pass return client.get_or_create_collection(name="materialmind") def add_batch(col, ids, docs, metas): embs = embed_texts(docs) col.add(ids=ids, documents=docs, metadatas=metas, embeddings=embs) def build_index(batch_size=256) -> int: ensure_dirs() print(f"[PATH] DATA_DIR = {DATA_DIR}") print(f"[PATH] DB_DIR = {DB_DIR}") col = get_collection(reset=True) ids, docs, metas, total = [], [], [], 0 print(f"[BUILD] Scanning {DATA_DIR} ...") for doc in iter_documents(): if doc["text"].startswith("[NO-TEXT]"): print(f"[INFO] Skipping unextractable: {doc['meta']['source']}") continue ids.append(doc["id"]); docs.append(doc["text"]); metas.append(doc["meta"]) if len(ids) >= batch_size: add_batch(col, ids, docs, metas) total += len(ids); ids, docs, metas = [], [], [] print(f"[BUILD] Added {total} chunks...") if ids: add_batch(col, ids, docs, metas); total += len(ids) print(f"[BUILD] Done. Indexed {total} chunks.") return total def update_index(): ensure_dirs() print(f"[PATH] DATA_DIR = {DATA_DIR}") print(f"[PATH] DB_DIR = {DB_DIR}") col = get_collection(reset=False) manifest = load_manifest() current = {f.relative_to(BASE_DIR).as_posix(): f for f in DATA_DIR.rglob("*") if f.is_file()} # remove deleted for src in list(manifest.keys()): if src not in current: col.delete(where={"source": src}); manifest.pop(src, None) print(f"[DEL] {src}") # add/refresh changed for src, path in current.items(): try: sig = file_sig(path) except Exception: continue if manifest.get(src) == sig: continue col.delete(where={"source": src}) added = 0 ext = path.suffix.lower() if ext == ".pdf": any_txt = False for page_txt, page in load_text_from_pdf(path): any_txt = True for c in chunk(page_txt): add_batch(col, [str(uuid.uuid4())], [c], [{"source": src, "page": page}]) added += 1 if not any_txt: print(f"[INFO] Skipping unextractable: {src}") elif ext in (".md", ".txt"): txt = load_text_from_md_txt(path) for c in chunk(txt): add_batch(col, [str(uuid.uuid4())], [c], [{"source": src, "page": None}]) added += 1 manifest[src] = sig print(f"[UPD] {src} (+{added} chunks)") save_manifest(manifest) print("[UPDATE] Done.") def search(query: str, k: int = DEFAULT_TOPK) -> List[Tuple[str, str]]: col = get_collection(reset=False) qvec = embed_texts([query])[0] res = col.query(query_embeddings=[qvec], n_results=k, include=["documents", "metadatas"]) hits = [] for doc, meta in zip(res.get("documents", [[]])[0], res.get("metadatas", [[]])[0]): src = meta.get("source", "unknown"); page = meta.get("page") cite = f"{src}" + (f":p.{page}" if page else "") hits.append((doc, cite)) return hits # ---- CLI ---- def main(): ap = argparse.ArgumentParser(description="MaterialMind RAG") ap.add_argument("--rebuild", action="store_true") ap.add_argument("--update", action="store_true") ap.add_argument("--ask", type=str) ap.add_argument("--k", type=int, default=DEFAULT_TOPK) args = ap.parse_args() ensure_dirs() if args.rebuild: total = build_index() print(f"[BUILD] Indexed {total} chunks from {DATA_DIR}") if args.update: update_index() if args.ask: hits = search(args.ask, k=args.k) if not hits: print("No results. Add PDFs to DATA_DIR and --rebuild.") else: for i, (text, cite) in enumerate(hits, 1): print(f"[{i}] {cite}\n{textwrap.shorten(text.replace(chr(10),' '), 600, placeholder=' ...')}\n") if not any([args.rebuild, args.update, args.ask]): print(f"DATA_DIR: {DATA_DIR}\nDB_DIR: {DB_DIR}\nUsage: --rebuild | --update | --ask \"question\"") if __name__ == "__main__": main()