# materialmind2 / rag_mini.py
# (Hugging Face upload header preserved: Azizahalq — "Upload 20 files" — commit 201d38b verified)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MaterialMind RAG (verbose + env override)
- Index PDFs/MD/TXT
- Chroma persistent DB
- FastEmbed first, ST fallback
- Rebuild / Update / Ask
Env:
MATERIALMIND_DATA_DIR=/absolute/path/to/sources # optional
"""
import os, re, uuid, argparse, textwrap, logging, warnings
import hashlib, json, shutil, datetime
from pathlib import Path
from typing import Iterable, List, Tuple, Dict, Any
# -------- PATHS --------
# MATERIALMIND_DATA_DIR overrides where source documents live; the index
# directory is always created next to the sources' parent.
ENV_DIR = os.getenv("MATERIALMIND_DATA_DIR")
if ENV_DIR:
    DATA_DIR = Path(ENV_DIR).expanduser().resolve()
    BASE_DIR = DATA_DIR.parent
else:
    # Default layout: a "sources" folder beside this script.
    BASE_DIR = Path(__file__).resolve().parent
    DATA_DIR = (BASE_DIR / "sources").resolve()
DB_DIR = BASE_DIR / "index" / "chroma_v3"          # Chroma persistent store
MANIFEST_PATH = BASE_DIR / "index" / "manifest.json"  # file-signature manifest

# -------- CONFIG --------
EMB_MODEL = "BAAI/bge-small-en-v1.5"   # embedding model (FastEmbed or ST)
CHUNK_CHARS = 1200                     # max characters per chunk
CHUNK_OVERLAP = 150                    # characters shared between chunks
DEFAULT_TOPK = 5                       # default number of search hits
DEFAULT_MODEL = "qwen2.5:7b-instruct"  # default LLM name exported to the app
# Public surface consumed by app_user.py.
__all__ = ["search", "DATA_DIR", "DEFAULT_TOPK", "DEFAULT_MODEL"]

# pypdf is noisy about malformed PDFs we already handle; silence it.
logging.getLogger("pypdf").setLevel(logging.ERROR)
warnings.filterwarnings("ignore", category=UserWarning, module="pypdf")
def _lazy_imports():
    """Defer the chromadb import until a DB operation actually needs it."""
    global chromadb
    import chromadb
# ---- Embeddings ----
# At most one of these is populated by init_embedder(); both start unset.
_EMBED_FAST = None  # fastembed.TextEmbedding instance (preferred backend)
_EMBED_ST = None    # sentence_transformers.SentenceTransformer (fallback)
def init_embedder():
    """Load the embedding backend once: FastEmbed preferred, ST as fallback.

    Idempotent — returns immediately when a backend is already loaded.
    If FastEmbed fails to import/construct, SentenceTransformers is tried;
    a failure there propagates to the caller.
    """
    global _EMBED_FAST, _EMBED_ST
    if _EMBED_FAST or _EMBED_ST:
        return  # a backend is already up
    try:
        from fastembed import TextEmbedding
        _EMBED_FAST = TextEmbedding(model_name=EMB_MODEL)
        print(f"[EMB] FastEmbed: {EMB_MODEL}")
    except Exception as e:
        print(f"[WARN] FastEmbed not available ({e}); trying SentenceTransformers...")
        from sentence_transformers import SentenceTransformer
        _EMBED_ST = SentenceTransformer(EMB_MODEL)
        print(f"[EMB] SentenceTransformers: {EMB_MODEL}")
def embed_texts(texts: List[str]) -> List[List[float]]:
    """Embed a batch of strings, returning one vector per input string."""
    init_embedder()
    if _EMBED_FAST is not None:
        # fastembed returns a generator of vectors; materialize it.
        return list(_EMBED_FAST.embed(texts))
    return _EMBED_ST.encode(texts, normalize_embeddings=True).tolist()
# ---- FS helpers ----
def ensure_dirs():
    """Create the sources, DB, and manifest directories if missing."""
    for directory in (DATA_DIR, DB_DIR, MANIFEST_PATH.parent):
        directory.mkdir(parents=True, exist_ok=True)
def file_sig(path: Path):
    """Return a change-detection signature for *path*.

    The signature is a dict with the file's sha1 hex digest, byte size,
    and integer mtime; update_index() compares it against the manifest.
    """
    digest = hashlib.sha1()
    with path.open("rb") as fh:
        while True:
            block = fh.read(1 << 20)  # hash in 1 MiB pieces
            if not block:
                break
            digest.update(block)
    info = path.stat()
    return {"sha1": digest.hexdigest(), "size": info.st_size, "mtime": int(info.st_mtime)}
def load_manifest():
    """Read the signature manifest; an absent or corrupt file yields {}."""
    if not MANIFEST_PATH.exists():
        return {}
    try:
        return json.loads(MANIFEST_PATH.read_text())
    except Exception:
        # A damaged manifest just forces a full re-scan; swallow and reset.
        return {}
def save_manifest(m):
    """Persist the manifest dict as pretty-printed JSON."""
    MANIFEST_PATH.write_text(json.dumps(m, indent=2))
# ---- Loaders ----
def normalize_spaces(t: str) -> str:
    """Normalize whitespace: CR->LF, collapse space/tab runs, cap blank lines.

    Runs of spaces/tabs become a single space, three or more consecutive
    newlines become exactly two, and the result is stripped.
    """
    cleaned = t.replace("\r", "\n")
    cleaned = re.sub(r"[ \t]+", " ", cleaned)
    cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
    return cleaned.strip()
def load_text_from_pdf(path: Path):
    """Yield (normalized_text, 1-based_page_number) for each non-empty page.

    Tries PyMuPDF (fitz) first; on any failure there, falls back to pypdf.
    Prints a hint when no page has extractable text (likely a scanned
    document that needs OCR) and a warning if pypdf fails outright.
    """
    # 1) PyMuPDF
    try:
        import fitz
        doc = fitz.open(str(path))
        try:
            total = empty = 0
            for i, p in enumerate(doc):
                total += 1
                txt = p.get_text("text").strip()
                if txt:
                    yield normalize_spaces(txt), i + 1
                else:
                    empty += 1
        finally:
            doc.close()
        # BUG FIX: the original compared against the loop index `i`, which is
        # unbound for a zero-page document and raised NameError; count pages
        # explicitly and only hint when every page was empty.
        if total and empty == total:
            print(f"[HINT] '{path.name}' looks scanned (no text). Try OCR.")
        return
    except Exception:
        pass  # fall through to pypdf
    # 2) pypdf fallback
    try:
        from pypdf import PdfReader
        reader = PdfReader(str(path))
        total = empty = 0
        for i, p in enumerate(reader.pages):
            total += 1
            try:
                raw = p.extract_text() or ""
            except Exception:
                raw = ""  # per-page extraction failure: treat as empty
            txt = normalize_spaces(raw)
            if txt:
                yield txt, i + 1
            else:
                empty += 1
        if total and empty == total:
            print(f"[HINT] '{path.name}' has no extractable text. OCR it.")
    except Exception as e:
        print(f"[WARN] {path.name}: {e}")
def load_text_from_md_txt(path: Path) -> str:
    """Read a markdown/text file (decode errors ignored); '' on failure."""
    try:
        raw = path.read_text(errors="ignore")
    except Exception:
        raw = ""
    return normalize_spaces(raw)
def chunk(text: str, max_chars=CHUNK_CHARS, overlap=CHUNK_OVERLAP):
    """Yield overlapping windows of *text*, each at most max_chars long.

    Consecutive windows share `overlap` characters. Empty text yields
    nothing; text that already fits comes back as a single chunk.
    """
    length = len(text)
    if length <= max_chars:
        if length:
            yield text
        return
    start = 0
    while start < length:
        end = min(start + max_chars, length)
        yield text[start:end]
        # Step back by the overlap unless the final window was just emitted.
        start = end if end == length else end - overlap
def iter_documents():
    """Walk DATA_DIR and yield chunk records: {id, text, meta{source, page}}.

    PDFs are chunked per page; .md/.txt files are chunked whole with
    page=None. A PDF with no extractable text yields one "[NO-TEXT]"
    sentinel record so callers can report it.
    """
    for path in DATA_DIR.rglob("*"):
        if not path.is_file():
            continue
        suffix = path.suffix.lower()
        rel = path.relative_to(BASE_DIR).as_posix()
        if suffix == ".pdf":
            extracted = False
            for page_txt, page in load_text_from_pdf(path):
                extracted = True
                for piece in chunk(page_txt):
                    yield {"id": str(uuid.uuid4()), "text": piece,
                           "meta": {"source": rel, "page": page}}
            if not extracted:
                yield {"id": str(uuid.uuid4()), "text": f"[NO-TEXT] {path.name}",
                       "meta": {"source": rel, "page": None}}
        elif suffix in (".md", ".txt"):
            for piece in chunk(load_text_from_md_txt(path)):
                yield {"id": str(uuid.uuid4()), "text": piece,
                       "meta": {"source": rel, "page": None}}
# ---- DB ----
def get_collection(reset=False):
    """Open the persistent 'materialmind' collection, wiping it first if reset."""
    _lazy_imports()
    client = chromadb.PersistentClient(path=str(DB_DIR))
    if reset:
        try:
            client.delete_collection("materialmind")
        except Exception:
            pass  # nothing to delete on a fresh database
    return client.get_or_create_collection(name="materialmind")
def add_batch(col, ids, docs, metas):
    """Embed `docs` and insert them into `col` with their ids and metadata."""
    col.add(ids=ids, documents=docs, metadatas=metas, embeddings=embed_texts(docs))
def build_index(batch_size=256) -> int:
    """Rebuild the index from scratch; return the number of chunks indexed.

    Drops the existing collection, scans DATA_DIR, and inserts chunks in
    batches of `batch_size`. Records flagged "[NO-TEXT]" are skipped.
    """
    ensure_dirs()
    print(f"[PATH] DATA_DIR = {DATA_DIR}")
    print(f"[PATH] DB_DIR = {DB_DIR}")
    col = get_collection(reset=True)
    ids, docs, metas, total = [], [], [], 0
    print(f"[BUILD] Scanning {DATA_DIR} ...")
    for record in iter_documents():
        if record["text"].startswith("[NO-TEXT]"):
            print(f"[INFO] Skipping unextractable: {record['meta']['source']}")
            continue
        ids.append(record["id"])
        docs.append(record["text"])
        metas.append(record["meta"])
        if len(ids) >= batch_size:
            add_batch(col, ids, docs, metas)
            total += len(ids)
            ids, docs, metas = [], [], []
            print(f"[BUILD] Added {total} chunks...")
    if ids:
        # Flush the final partial batch.
        add_batch(col, ids, docs, metas)
        total += len(ids)
    print(f"[BUILD] Done. Indexed {total} chunks.")
    return total
def update_index():
    """Incrementally sync the index with DATA_DIR using the manifest.

    Deletes chunks for files that disappeared from disk, re-embeds files
    whose signature (sha1/size/mtime) changed, and rewrites the manifest.
    """
    ensure_dirs()
    print(f"[PATH] DATA_DIR = {DATA_DIR}")
    print(f"[PATH] DB_DIR = {DB_DIR}")
    col = get_collection(reset=False)
    manifest = load_manifest()
    current = {f.relative_to(BASE_DIR).as_posix(): f
               for f in DATA_DIR.rglob("*") if f.is_file()}
    # Remove index entries for files that no longer exist on disk.
    for src in list(manifest.keys()):
        if src not in current:
            col.delete(where={"source": src})
            manifest.pop(src, None)
            print(f"[DEL] {src}")
    # Add new files and refresh changed ones.
    for src, path in current.items():
        try:
            sig = file_sig(path)
        except Exception:
            continue  # unreadable file: leave any existing chunks in place
        if manifest.get(src) == sig:
            continue  # unchanged since last run
        col.delete(where={"source": src})
        texts, metas = [], []
        ext = path.suffix.lower()
        if ext == ".pdf":
            any_txt = False
            for page_txt, page in load_text_from_pdf(path):
                any_txt = True
                for c in chunk(page_txt):
                    texts.append(c)
                    metas.append({"source": src, "page": page})
            if not any_txt:
                print(f"[INFO] Skipping unextractable: {src}")
        elif ext in (".md", ".txt"):
            for c in chunk(load_text_from_md_txt(path)):
                texts.append(c)
                metas.append({"source": src, "page": None})
        # PERF FIX: embed all of a file's chunks in one call; the original
        # invoked add_batch (one embedding-model call) per individual chunk.
        if texts:
            add_batch(col, [str(uuid.uuid4()) for _ in texts], texts, metas)
        manifest[src] = sig
        print(f"[UPD] {src} (+{len(texts)} chunks)")
    save_manifest(manifest)
    print("[UPDATE] Done.")
def search(query: str, k: int = DEFAULT_TOPK) -> List[Tuple[str, str]]:
    """Return up to k (chunk_text, citation) pairs most similar to *query*.

    Citations look like "path/to/file.pdf:p.3"; the page suffix is omitted
    for sources without page info (md/txt chunks store page=None).
    """
    col = get_collection(reset=False)
    qvec = embed_texts([query])[0]
    res = col.query(query_embeddings=[qvec], n_results=k,
                    include=["documents", "metadatas"])
    docs = res.get("documents", [[]])[0]
    metas = res.get("metadatas", [[]])[0]
    hits = []
    for doc, meta in zip(docs, metas):
        src = meta.get("source", "unknown")
        page = meta.get("page")
        # Pages are 1-based, so a falsy page means "no page info".
        cite = src + (f":p.{page}" if page else "")
        hits.append((doc, cite))
    return hits
# ---- CLI ----
def main():
    """CLI entry point: --rebuild, --update, and/or --ask "question"."""
    ap = argparse.ArgumentParser(description="MaterialMind RAG")
    ap.add_argument("--rebuild", action="store_true")
    ap.add_argument("--update", action="store_true")
    ap.add_argument("--ask", type=str)
    ap.add_argument("--k", type=int, default=DEFAULT_TOPK)
    args = ap.parse_args()
    ensure_dirs()
    if args.rebuild:
        total = build_index()
        print(f"[BUILD] Indexed {total} chunks from {DATA_DIR}")
    if args.update:
        update_index()
    if args.ask:
        hits = search(args.ask, k=args.k)
        if not hits:
            print("No results. Add PDFs to DATA_DIR and --rebuild.")
        else:
            for i, (text, cite) in enumerate(hits, 1):
                # One-line 600-char preview per hit, with its citation.
                snippet = textwrap.shorten(text.replace(chr(10), " "), 600,
                                           placeholder=" ...")
                print(f"[{i}] {cite}\n{snippet}\n")
    if not any([args.rebuild, args.update, args.ask]):
        # No action requested: show paths and usage.
        print(f"DATA_DIR: {DATA_DIR}\nDB_DIR: {DB_DIR}\nUsage: --rebuild | --update | --ask \"question\"")


if __name__ == "__main__":
    main()