# NotebookLMClone/src/backend/ingest.py
# Hitakshi26 - Testing2 (1bfb390)
import os, pathlib
import requests
from bs4 import BeautifulSoup
from pypdf import PdfReader
from pptx import Presentation
from sentence_transformers import SentenceTransformer
from src.storage.paths import nb_root, ensure_tree
from src.storage.chroma_store import get_collection
from src.utils.text import safe_name
# Shared sentence-embedding model, instantiated once at import time so every
# call to upsert_extracted reuses the same loaded instance.
EMBED_MODEL = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
def _file_path_from_gradio_obj(file_obj):
if isinstance(file_obj, str):
return file_obj
path = getattr(file_obj, "name", None)
if isinstance(path, str):
return path
return None
def simple_chunk(text: str, max_chars=2200, overlap=250):
    """Split *text* into overlapping fixed-size character windows.

    Every line is stripped and blank lines are dropped before chunking, so
    the chunks are cut from a normalized, newline-joined version of the input.

    Args:
        text: Raw text to split. ``None`` or whitespace-only input yields ``[]``.
        max_chars: Maximum number of characters per chunk; must be >= 1.
        overlap: Number of trailing characters of one chunk repeated at the
            start of the next. Clamped to ``[0, max_chars - 1]`` so that each
            window always advances by at least one character and no text is
            skipped between windows.

    Returns:
        The chunks in document order. Text that fits in ``max_chars`` is
        returned as a single chunk.

    Raises:
        ValueError: If ``max_chars`` is smaller than 1.
    """
    if max_chars < 1:
        raise ValueError("max_chars must be a positive integer")

    cleaned = "\n".join(
        stripped for ln in (text or "").splitlines() if (stripped := ln.strip())
    )
    if not cleaned:
        return []
    if len(cleaned) <= max_chars:
        return [cleaned]

    # An overlap >= max_chars would leave the window start stuck in place
    # (an endless loop); a negative overlap would silently drop text.
    effective_overlap = min(max(overlap, 0), max_chars - 1)
    step = max_chars - effective_overlap

    total = len(cleaned)
    chunks = []
    for start in range(0, total, step):
        end = start + max_chars
        chunks.append(cleaned[start:end])
        if end >= total:
            # This window already reaches the end of the text; any further
            # window would only repeat a suffix of it.
            break
    return chunks
def extract_pdf(path: str):
    """Extract per-page text from the PDF at *path*.

    Returns a list of ``{"text": ..., "page": n}`` dicts with 1-based page
    numbers. Pages whose extracted text is empty or whitespace-only are
    omitted.
    """
    pages_out = []
    for page_no, page in enumerate(PdfReader(path).pages, start=1):
        content = page.extract_text() or ""
        content = content.strip()
        if not content:
            continue
        pages_out.append({"text": content, "page": page_no})
    return pages_out
def extract_pptx(path: str):
    """Extract per-slide text from the PowerPoint file at *path*.

    For each slide, the text of every shape that exposes a non-empty ``text``
    attribute is stripped and joined with newlines. Returns a list of
    ``{"text": ..., "slide": n}`` dicts with 1-based slide numbers; slides
    that yield no text are omitted.
    """
    slides_out = []
    for slide_no, slide in enumerate(Presentation(path).slides, start=1):
        fragments = []
        for shape in slide.shapes:
            if not hasattr(shape, "text"):
                continue
            raw = shape.text
            if raw:
                fragments.append(raw)

        stripped = (frag.strip() for frag in fragments)
        joined = "\n".join(frag for frag in stripped if frag).strip()
        if not joined:
            continue
        slides_out.append({"text": joined, "slide": slide_no})
    return slides_out
def extract_txt(path: str):
    """Read a plain-text file as a single extracted item.

    The file is decoded as UTF-8 with undecodable bytes ignored, then
    stripped. Returns ``[{"text": ..., "page": None}]``, or ``[]`` when the
    file holds no non-whitespace content.
    """
    with open(path, encoding="utf-8", errors="ignore") as handle:
        content = handle.read()
    content = content.strip()
    if not content:
        return []
    return [{"text": content, "page": None}]
def extract_url(url: str):
    """Fetch a web page and return its visible text as one extracted item.

    The page is requested with a 15-second timeout and a browser-like
    User-Agent; HTTP error statuses raise via ``raise_for_status``.
    ``<script>``, ``<style>`` and ``<noscript>`` elements are removed before
    the text is collected, blank lines are dropped, and the result is
    truncated to 200000 characters.

    Always returns a single-element list ``[{"text": ..., "page": None}]``,
    even when the page yields no text.
    """
    response = requests.get(url, timeout=15, headers={"User-Agent": "Mozilla/5.0"})
    response.raise_for_status()

    soup = BeautifulSoup(response.text, "html.parser")
    for node in soup.find_all(["script","style","noscript"]):
        node.decompose()

    stripped_lines = (raw.strip() for raw in soup.get_text("\n").splitlines())
    page_text = "\n".join(line for line in stripped_lines if line)
    return [{"text": page_text[:200000], "page": None}]
def upsert_extracted(username: str, notebook_id: str, source_title: str, source_id: str, extracted_items: list[dict]) -> int:
    """Chunk, embed and upsert extracted items into the notebook's collection.

    Each item's ``"text"`` is split with ``simple_chunk``. Every chunk gets an
    id of the form ``{source_id}::item{i}::chunk{j}::{running_index}`` and
    metadata holding the source title/id plus the item's ``page`` / ``slide``
    when present (``None`` values are left out of the metadata).

    Returns the number of chunks upserted; returns 0 without touching the
    collection when there is nothing to store.
    """
    collection = get_collection(username, notebook_id)

    chunk_ids, chunk_texts, chunk_metas = [], [], []
    for item_idx, item in enumerate(extracted_items):
        pieces = simple_chunk(item["text"])
        for piece_idx, piece in enumerate(pieces):
            # len(chunk_ids) is the running chunk index across all items.
            chunk_ids.append(f"{source_id}::item{item_idx}::chunk{piece_idx}::{len(chunk_ids)}")
            chunk_texts.append(piece)
            candidate = {
                "source_title": source_title,
                "source_id": source_id,
                "page": item.get("page"),
                "slide": item.get("slide"),
            }
            chunk_metas.append(
                {key: value for key, value in candidate.items() if value is not None}
            )

    if not chunk_texts:
        return 0

    vectors = EMBED_MODEL.encode(chunk_texts, normalize_embeddings=True).tolist()
    collection.upsert(
        ids=chunk_ids,
        documents=chunk_texts,
        metadatas=chunk_metas,
        embeddings=vectors,
    )
    return len(chunk_texts)
def ingest_files(username: str, notebook_id: str, files) -> int:
    """Copy uploaded files into the notebook, extract their text and index it.

    For every entry in *files* that resolves to an existing path, the file is
    copied into ``files_raw``. Files with a supported extension (``.pdf``,
    ``.pptx``, ``.txt``, ``.md``) are then extracted, dumped to
    ``files_extracted/<name>.txt`` with a ``--- page=N ---`` /
    ``--- slide=N ---`` separator per item, and upserted into the collection.
    Files with other extensions are copied but not indexed.

    Returns the total number of chunks upserted across all files.
    """
    ensure_tree(username, notebook_id)
    notebook_dir = nb_root(username, notebook_id)
    raw_dir = os.path.join(notebook_dir, "files_raw")
    ex_dir = os.path.join(notebook_dir, "files_extracted")

    extractors = {
        ".pdf": extract_pdf,
        ".pptx": extract_pptx,
        ".txt": extract_txt,
        ".md": extract_txt,
    }

    total = 0
    for upload in files or []:
        src = _file_path_from_gradio_obj(upload)
        if not src or not os.path.exists(src):
            continue

        filename = os.path.basename(src)
        dest = os.path.join(raw_dir, filename)
        # The raw copy is made before the extension check, so unsupported
        # files are still kept in files_raw.
        pathlib.Path(dest).write_bytes(pathlib.Path(src).read_bytes())

        extractor = extractors.get(os.path.splitext(dest)[1].lower())
        if extractor is None:
            continue
        extracted = extractor(dest)

        dump_path = os.path.join(ex_dir, filename + ".txt")
        with open(dump_path, "w", encoding="utf-8") as dump:
            for item in extracted:
                if item.get("page"):
                    loc = f"page={item.get('page')}"
                elif item.get("slide"):
                    loc = f"slide={item.get('slide')}"
                else:
                    loc = ""
                dump.write(f"\n--- {loc} ---\n{item['text']}\n")

        total += upsert_extracted(
            username,
            notebook_id,
            filename,
            f"file:{filename}",
            extracted,
        )
    return total
def ingest_url(username: str, notebook_id: str, url: str) -> int:
    """Fetch *url*, save its extracted text and index it in the notebook.

    The text is written to ``files_extracted`` under a filename derived from
    the URL: the scheme prefix is removed, slashes become underscores, and
    the result is passed through ``safe_name`` (presumably a filename
    sanitizer; defined outside this module) before ``.txt`` is appended.

    Returns the number of chunks upserted.
    """
    ensure_tree(username, notebook_id)
    extracted = extract_url(url)

    ex_dir = os.path.join(nb_root(username, notebook_id), "files_extracted")
    stem = url
    for token, replacement in (("https://", ""), ("http://", ""), ("/", "_")):
        stem = stem.replace(token, replacement)
    out_path = os.path.join(ex_dir, safe_name(stem) + ".txt")

    with open(out_path, "w", encoding="utf-8") as out:
        out.write(extracted[0]["text"])

    return upsert_extracted(username, notebook_id, url, f"url:{url}", extracted)