#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MaterialMind RAG (verbose + env override)
- Index PDFs/MD/TXT
- Chroma persistent DB
- FastEmbed first, ST fallback
- Rebuild / Update / Ask
Env:
  MATERIALMIND_DATA_DIR=/absolute/path/to/sources  # optional
"""
import os, re, uuid, argparse, textwrap, logging, warnings
import hashlib, json
from pathlib import Path
from typing import List, Tuple
# -------- PATHS --------
ENV_DIR = os.getenv("MATERIALMIND_DATA_DIR")
if ENV_DIR:
    DATA_DIR = Path(ENV_DIR).expanduser().resolve()
    BASE_DIR = DATA_DIR.parent
else:
    BASE_DIR = Path(__file__).resolve().parent
    DATA_DIR = (BASE_DIR / "sources").resolve()
DB_DIR = BASE_DIR / "index" / "chroma_v3"
MANIFEST_PATH = BASE_DIR / "index" / "manifest.json"

# -------- CONFIG --------
EMB_MODEL = "BAAI/bge-small-en-v1.5"
CHUNK_CHARS = 1200
CHUNK_OVERLAP = 150
DEFAULT_TOPK = 5
DEFAULT_MODEL = "qwen2.5:7b-instruct"

# Exported for app_user.py
__all__ = ["search", "DATA_DIR", "DEFAULT_TOPK", "DEFAULT_MODEL"]

logging.getLogger("pypdf").setLevel(logging.ERROR)
warnings.filterwarnings("ignore", category=UserWarning, module="pypdf")
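# Typical invocations (the script filename and queries are illustrative):
#   python materialmind.py --rebuild
#   MATERIALMIND_DATA_DIR=~/corpus/sources python materialmind.py --update
#   python materialmind.py --ask "tempering temperature for 4140 steel" --k 3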
def _lazy_imports():
    # Defer the heavy chromadb import until a DB operation actually needs it.
    global chromadb
    import chromadb
# ---- Embeddings ----
_EMBED_FAST = None
_EMBED_ST = None

def init_embedder():
    global _EMBED_FAST, _EMBED_ST
    if _EMBED_FAST or _EMBED_ST:
        return
    try:
        from fastembed import TextEmbedding
        _EMBED_FAST = TextEmbedding(model_name=EMB_MODEL)
        print(f"[EMB] FastEmbed: {EMB_MODEL}")
    except Exception as e:
        print(f"[WARN] FastEmbed not available ({e}); trying SentenceTransformers...")
        from sentence_transformers import SentenceTransformer
        _EMBED_ST = SentenceTransformer(EMB_MODEL)
        print(f"[EMB] SentenceTransformers: {EMB_MODEL}")
def embed_texts(texts: List[str]) -> List[List[float]]:
    init_embedder()
    if _EMBED_FAST is not None:
        # fastembed yields numpy arrays; convert so the annotation holds
        return [v.tolist() for v in _EMBED_FAST.embed(texts)]
    return _EMBED_ST.encode(texts, normalize_embeddings=True).tolist()
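# One vector per input text; bge-small-en-v1.5 produces 384-dimensional
# embeddings, e.g. embed_texts(["duplex stainless steel"]) -> [[0.01, -0.03, ...]].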
# ---- FS helpers ----
def ensure_dirs():
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    DB_DIR.mkdir(parents=True, exist_ok=True)
    MANIFEST_PATH.parent.mkdir(parents=True, exist_ok=True)

def file_sig(path: Path):
    # Content hash plus size/mtime, used by --update to detect changed files.
    h = hashlib.sha1()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    st = path.stat()
    return {"sha1": h.hexdigest(), "size": st.st_size, "mtime": int(st.st_mtime)}

def load_manifest():
    if MANIFEST_PATH.exists():
        try:
            return json.loads(MANIFEST_PATH.read_text())
        except Exception:
            return {}
    return {}

def save_manifest(m):
    MANIFEST_PATH.write_text(json.dumps(m, indent=2))
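# The manifest maps each source path (relative to BASE_DIR) to its signature,
# e.g. {"sources/alloy_datasheet.pdf": {"sha1": "...", "size": 183042, "mtime": 1712345678}}
# (filename and values illustrative).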
# ---- Loaders ----
def normalize_spaces(t: str) -> str:
    t = t.replace("\r\n", "\n").replace("\r", "\n")  # don't turn CRLF into blank lines
    t = re.sub(r"[ \t]+", " ", t)
    t = re.sub(r"\n{3,}", "\n\n", t)
    return t.strip()
def load_text_from_pdf(path: Path):
    # 1) PyMuPDF
    try:
        import fitz  # PyMuPDF
        doc = fitz.open(str(path))
        pages, empty = doc.page_count, 0
        for i, p in enumerate(doc):
            txt = p.get_text("text").strip()
            if txt:
                yield normalize_spaces(txt), i + 1
            else:
                empty += 1
        doc.close()
        if pages and empty == pages:
            print(f"[HINT] '{path.name}' looks scanned (no text). Try OCR.")
        return
    except Exception:
        pass
    # 2) pypdf fallback
    try:
        from pypdf import PdfReader
        reader = PdfReader(str(path))
        pages, empty = len(reader.pages), 0
        for i, p in enumerate(reader.pages):
            try:
                raw = p.extract_text() or ""
            except Exception:
                raw = ""
            txt = normalize_spaces(raw)
            if txt:
                yield txt, i + 1
            else:
                empty += 1
        if pages and empty == pages:
            print(f"[HINT] '{path.name}' has no extractable text. OCR it.")
    except Exception as e:
        print(f"[WARN] {path.name}: {e}")
def load_text_from_md_txt(path: Path) -> str:
    try:
        raw = path.read_text(errors="ignore")
    except Exception:
        raw = ""
    return normalize_spaces(raw)
def chunk(text: str, max_chars=CHUNK_CHARS, overlap=CHUNK_OVERLAP):
    n = len(text)
    if n <= max_chars:
        if n > 0:
            yield text
        return
    i = 0
    while i < n:
        j = min(i + max_chars, n)
        yield text[i:j]
        i = j - overlap if j < n else j
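# Fixed windows with overlap: with the defaults (1200 chars, 150 overlap) a
# 3000-char text yields slices [0:1200], [1050:2250], [2100:3000].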
def iter_documents():
    for f in DATA_DIR.rglob("*"):
        if not f.is_file():
            continue
        ext = f.suffix.lower()
        rel = f.relative_to(BASE_DIR).as_posix()
        if ext == ".pdf":
            any_txt = False
            for page_txt, page in load_text_from_pdf(f):
                any_txt = True
                for c in chunk(page_txt):
                    yield {"id": str(uuid.uuid4()), "text": c, "meta": {"source": rel, "page": page}}
            if not any_txt:
                yield {"id": str(uuid.uuid4()), "text": f"[NO-TEXT] {f.name}", "meta": {"source": rel, "page": None}}
        elif ext in (".md", ".txt"):
            txt = load_text_from_md_txt(f)
            for c in chunk(txt):
                yield {"id": str(uuid.uuid4()), "text": c, "meta": {"source": rel, "page": None}}
# ---- DB ----
def get_collection(reset=False):
    _lazy_imports()
    client = chromadb.PersistentClient(path=str(DB_DIR))
    if reset:
        try:
            client.delete_collection("materialmind")
        except Exception:
            pass
    return client.get_or_create_collection(name="materialmind")

def add_batch(col, ids, docs, metas):
    embs = embed_texts(docs)
    col.add(ids=ids, documents=docs, metadatas=metas, embeddings=embs)
def build_index(batch_size=256) -> int:
    ensure_dirs()
    print(f"[PATH] DATA_DIR = {DATA_DIR}")
    print(f"[PATH] DB_DIR   = {DB_DIR}")
    col = get_collection(reset=True)
    ids, docs, metas, total = [], [], [], 0
    print(f"[BUILD] Scanning {DATA_DIR} ...")
    for doc in iter_documents():
        if doc["text"].startswith("[NO-TEXT]"):
            print(f"[INFO] Skipping unextractable: {doc['meta']['source']}")
            continue
        ids.append(doc["id"]); docs.append(doc["text"]); metas.append(doc["meta"])
        if len(ids) >= batch_size:
            add_batch(col, ids, docs, metas)
            total += len(ids)
            ids, docs, metas = [], [], []
            print(f"[BUILD] Added {total} chunks...")
    if ids:
        add_batch(col, ids, docs, metas)
        total += len(ids)
    print(f"[BUILD] Done. Indexed {total} chunks.")
    return total
def update_index():
    ensure_dirs()
    print(f"[PATH] DATA_DIR = {DATA_DIR}")
    print(f"[PATH] DB_DIR   = {DB_DIR}")
    col = get_collection(reset=False)
    manifest = load_manifest()
    current = {f.relative_to(BASE_DIR).as_posix(): f for f in DATA_DIR.rglob("*") if f.is_file()}
    # remove deleted
    for src in list(manifest.keys()):
        if src not in current:
            col.delete(where={"source": src})
            manifest.pop(src, None)
            print(f"[DEL] {src}")
    # add/refresh changed
    for src, path in current.items():
        try:
            sig = file_sig(path)
        except Exception:
            continue
        if manifest.get(src) == sig:
            continue
        col.delete(where={"source": src})
        added = 0
        ext = path.suffix.lower()
        if ext == ".pdf":
            any_txt = False
            for page_txt, page in load_text_from_pdf(path):
                any_txt = True
                for c in chunk(page_txt):
                    add_batch(col, [str(uuid.uuid4())], [c], [{"source": src, "page": page}])
                    added += 1
            if not any_txt:
                print(f"[INFO] Skipping unextractable: {src}")
        elif ext in (".md", ".txt"):
            txt = load_text_from_md_txt(path)
            for c in chunk(txt):
                add_batch(col, [str(uuid.uuid4())], [c], [{"source": src, "page": None}])
                added += 1
        manifest[src] = sig
        print(f"[UPD] {src} (+{added} chunks)")
    save_manifest(manifest)
    print("[UPDATE] Done.")
def search(query: str, k: int = DEFAULT_TOPK) -> List[Tuple[str, str]]:
    col = get_collection(reset=False)
    qvec = embed_texts([query])[0]
    res = col.query(query_embeddings=[qvec], n_results=k, include=["documents", "metadatas"])
    hits = []
    for doc, meta in zip(res.get("documents", [[]])[0], res.get("metadatas", [[]])[0]):
        src = meta.get("source", "unknown")
        page = meta.get("page")
        cite = src + (f":p.{page}" if page else "")
        hits.append((doc, cite))
    return hits
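# Example (query and source path are illustrative):
#   search("fatigue limit of Ti-6Al-4V", k=3)
#   -> [("...chunk text...", "sources/titanium_handbook.pdf:p.42"), ...]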
# ---- CLI ----
def main():
    ap = argparse.ArgumentParser(description="MaterialMind RAG")
    ap.add_argument("--rebuild", action="store_true")
    ap.add_argument("--update", action="store_true")
    ap.add_argument("--ask", type=str)
    ap.add_argument("--k", type=int, default=DEFAULT_TOPK)
    args = ap.parse_args()
    ensure_dirs()
    if args.rebuild:
        total = build_index()
        print(f"[BUILD] Indexed {total} chunks from {DATA_DIR}")
    if args.update:
        update_index()
    if args.ask:
        hits = search(args.ask, k=args.k)
        if not hits:
            print("No results. Add PDFs to DATA_DIR and --rebuild.")
        else:
            for i, (text, cite) in enumerate(hits, 1):
                print(f"[{i}] {cite}\n{textwrap.shorten(text.replace(chr(10), ' '), 600, placeholder=' ...')}\n")
    if not any([args.rebuild, args.update, args.ask]):
        print(f"DATA_DIR: {DATA_DIR}\nDB_DIR: {DB_DIR}\nUsage: --rebuild | --update | --ask \"question\"")

if __name__ == "__main__":
    main()