File size: 5,367 Bytes
1bfb390
b4c7867
 
 
 
 
 
 
 
 
 
 
 
8593064
1bfb390
 
 
 
 
 
 
8593064
b4c7867
 
 
 
 
 
 
 
 
 
1bfb390
b4c7867
 
 
 
 
 
 
 
 
1bfb390
b4c7867
 
 
 
 
 
 
 
 
 
 
 
1bfb390
b4c7867
 
 
 
 
1bfb390
b4c7867
 
 
 
 
1bfb390
b4c7867
 
 
1bfb390
8593064
1bfb390
 
b4c7867
1bfb390
8593064
1bfb390
 
b4c7867
ca39256
1bfb390
 
 
 
ca39256
1bfb390
ca39256
1bfb390
b4c7867
 
 
 
 
 
8593064
b4c7867
 
 
 
 
8593064
1bfb390
8593064
 
 
1bfb390
 
8593064
1bfb390
 
b4c7867
 
 
 
 
 
 
 
 
 
 
1bfb390
8593064
b4c7867
8593064
1bfb390
c60446c
1bfb390
c60446c
1bfb390
b4c7867
c60446c
1bfb390
 
 
 
 
c60446c
b4c7867
 
ca39256
b4c7867
 
 
 
1bfb390
b4c7867
1bfb390
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import os, pathlib
import requests
from bs4 import BeautifulSoup
from pypdf import PdfReader
from pptx import Presentation
from sentence_transformers import SentenceTransformer

from src.storage.paths import nb_root, ensure_tree
from src.storage.chroma_store import get_collection
from src.utils.text import safe_name

# Shared sentence-embedding model, instantiated once when this module is
# imported; upsert_extracted() uses it to embed every chunk before storage.
EMBED_MODEL = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")


def _file_path_from_gradio_obj(file_obj):
    """Return the filesystem path carried by an uploaded-file object.

    Accepted inputs:
      * a ``str`` -- returned unchanged;
      * an ``os.PathLike`` (e.g. ``pathlib.Path``) -- converted to its full
        string path;
      * any object whose ``name`` attribute is a ``str`` (e.g. a temp-file
        wrapper) -- that attribute is returned.

    Returns:
        The path as ``str``, or ``None`` when no path can be determined.
    """
    if isinstance(file_obj, str):
        return file_obj
    # Path-like objects must be handled before the ``name`` lookup below:
    # ``pathlib.Path.name`` is only the final path component, so falling
    # through would silently drop the directory part.
    if isinstance(file_obj, os.PathLike):
        return os.fsdecode(file_obj)
    candidate = getattr(file_obj, "name", None)
    return candidate if isinstance(candidate, str) else None

def simple_chunk(text: str, max_chars=2200, overlap=250):
    """Split *text* into overlapping character windows.

    The text is first normalised: every line is stripped and blank lines are
    dropped. If the normalised text fits in ``max_chars`` it is returned as a
    single chunk; otherwise consecutive windows of ``max_chars`` characters
    are produced, each one starting ``overlap`` characters before the end of
    the previous window.

    Args:
        text: Input text; ``None`` or empty yields ``[]``.
        max_chars: Maximum length of each chunk. Must be positive.
        overlap: Number of characters shared between neighbouring chunks.
            Values outside ``[0, max_chars - 1]`` are clamped into that range
            so the window always advances and no text is skipped.

    Returns:
        A list of chunk strings (possibly empty).

    Raises:
        ValueError: If ``max_chars`` is not positive and there is text to
            split.
    """
    stripped_lines = (ln.strip() for ln in (text or "").splitlines())
    text = "\n".join(ln for ln in stripped_lines if ln)
    if not text:
        return []
    if max_chars <= 0:
        raise ValueError("max_chars must be a positive integer")
    if len(text) <= max_chars:
        return [text]

    # An overlap >= max_chars would keep ``start`` from ever advancing
    # (infinite loop); a negative overlap would jump past unread text.
    overlap = min(max(overlap, 0), max_chars - 1)

    out, start = [], 0
    while True:
        end = min(len(text), start + max_chars)
        out.append(text[start:end])
        if end == len(text):
            break
        start = end - overlap
    return out

def extract_pdf(path: str):
    """Extract the text of each page of the PDF at *path*.

    Returns:
        A list of ``{"text": ..., "page": n}`` dicts with 1-based page
        numbers; pages whose extracted text is empty are omitted.
    """
    extracted = []
    for page_number, page in enumerate(PdfReader(path).pages, start=1):
        # extract_text() may yield None/empty; treat that as "no text".
        content = (page.extract_text() or "").strip()
        if not content:
            continue
        extracted.append({"text": content, "page": page_number})
    return extracted

def extract_pptx(path: str):
    """Extract the text of each slide of the PowerPoint file at *path*.

    For every slide, the text of all shapes exposing a non-empty ``text``
    attribute is stripped and joined with newlines.

    Returns:
        A list of ``{"text": ..., "slide": n}`` dicts with 1-based slide
        numbers; slides without any text are omitted.
    """
    deck = Presentation(path)
    extracted = []
    for slide_number, slide in enumerate(deck.slides, start=1):
        fragments = [
            shape.text
            for shape in slide.shapes
            if hasattr(shape, "text") and shape.text
        ]
        pieces = [frag.strip() for frag in fragments]
        content = "\n".join(piece for piece in pieces if piece).strip()
        if not content:
            continue
        extracted.append({"text": content, "slide": slide_number})
    return extracted

def extract_txt(path: str):
    """Read the file at *path* as UTF-8 text, ignoring undecodable bytes.

    Returns:
        ``[{"text": <stripped contents>, "page": None}]``, or ``[]`` when the
        file contains only whitespace.
    """
    with open(path, "r", encoding="utf-8", errors="ignore") as handle:
        content = handle.read()
    content = content.strip()
    if not content:
        return []
    return [{"text": content, "page": None}]

def extract_url(url: str):
    """Download *url* and return its visible text.

    The page is fetched with a 15-second timeout and a browser-like
    User-Agent; ``<script>``, ``<style>`` and ``<noscript>`` elements are
    removed before the text is collected. Lines are stripped, blank lines
    dropped, and the result is truncated to 200000 characters.

    Returns:
        A single-item list ``[{"text": ..., "page": None}]`` (the text may be
        empty).

    Raises:
        Whatever ``requests`` raises on connection failure, plus the error
        from ``raise_for_status()`` on an HTTP error status.
    """
    response = requests.get(url, timeout=15, headers={"User-Agent": "Mozilla/5.0"})
    response.raise_for_status()

    document = BeautifulSoup(response.text, "html.parser")
    # Drop elements whose contents are not human-readable page text.
    for node in document(["script", "style", "noscript"]):
        node.decompose()

    raw_lines = document.get_text("\n").splitlines()
    cleaned = "\n".join(line.strip() for line in raw_lines if line.strip())
    return [{"text": cleaned[:200000], "page": None}]

def upsert_extracted(username: str, notebook_id: str, source_title: str, source_id: str, extracted_items: list[dict]) -> int:
    """Chunk, embed and upsert extracted items into the notebook collection.

    Each item's ``"text"`` is split with ``simple_chunk``; every chunk gets
    an id of the form ``{source_id}::item{i}::chunk{j}::{running_index}`` and
    metadata holding ``source_title``, ``source_id`` and, when present on the
    item, ``page`` / ``slide``.

    Returns:
        The number of chunks upserted (``0`` when there is nothing to store,
        in which case neither the model nor the collection is called).
    """
    collection = get_collection(username, notebook_id)

    chunk_ids = []
    chunk_texts = []
    chunk_metas = []
    for item_idx, item in enumerate(extracted_items):
        for chunk_idx, chunk in enumerate(simple_chunk(item["text"])):
            # The running index equals the number of chunks collected so far.
            running_index = len(chunk_texts)
            chunk_ids.append(
                f"{source_id}::item{item_idx}::chunk{chunk_idx}::{running_index}"
            )
            chunk_texts.append(chunk)

            metadata = {"source_title": source_title, "source_id": source_id}
            # Only include location keys that carry a value.
            for key in ("page", "slide"):
                value = item.get(key)
                if value is not None:
                    metadata[key] = value
            chunk_metas.append(metadata)

    if not chunk_texts:
        return 0

    vectors = EMBED_MODEL.encode(chunk_texts, normalize_embeddings=True).tolist()
    collection.upsert(
        ids=chunk_ids,
        documents=chunk_texts,
        metadatas=chunk_metas,
        embeddings=vectors,
    )
    return len(chunk_texts)

def ingest_files(username: str, notebook_id: str, files) -> int:
    """Copy uploaded files into the notebook, extract text and index it.

    For each entry in *files* whose path can be resolved and exists, the file
    is copied into the notebook's ``files_raw`` directory. Files with a
    ``.pdf``, ``.pptx``, ``.txt`` or ``.md`` extension are then extracted, a
    plain-text dump is written to ``files_extracted/<name>.txt``, and the
    extracted items are upserted. Other extensions are copied but not
    indexed.

    Returns:
        Total number of chunks upserted across all files.
    """
    ensure_tree(username, notebook_id)
    raw_dir = os.path.join(nb_root(username, notebook_id), "files_raw")
    extracted_dir = os.path.join(nb_root(username, notebook_id), "files_extracted")

    # Extension -> extractor; anything not listed is skipped after copying.
    extractors = {
        ".pdf": extract_pdf,
        ".pptx": extract_pptx,
        ".txt": extract_txt,
        ".md": extract_txt,
    }
    total_chunks = 0

    for upload in files or []:
        src_path = _file_path_from_gradio_obj(upload)
        if not src_path or not os.path.exists(src_path):
            continue

        file_name = os.path.basename(src_path)
        stored_path = os.path.join(raw_dir, file_name)
        pathlib.Path(stored_path).write_bytes(pathlib.Path(src_path).read_bytes())

        suffix = os.path.splitext(stored_path)[1].lower()
        extractor = extractors.get(suffix)
        if extractor is None:
            continue
        extracted = extractor(stored_path)

        dump_path = os.path.join(extracted_dir, file_name + ".txt")
        with open(dump_path, "w", encoding="utf-8") as dump:
            for item in extracted:
                page = item.get("page")
                slide = item.get("slide")
                # A page marker takes precedence over a slide marker.
                if page:
                    marker = f"page={page}"
                elif slide:
                    marker = f"slide={slide}"
                else:
                    marker = ""
                dump.write(f"\n--- {marker} ---\n{item['text']}\n")

        total_chunks += upsert_extracted(
            username,
            notebook_id,
            file_name,
            f"file:{file_name}",
            extracted,
        )

    return total_chunks

def ingest_url(username: str, notebook_id: str, url: str) -> int:
    """Fetch *url*, save its text in the notebook and index it.

    The page text is written to ``files_extracted/<name>.txt`` where
    ``<name>`` is derived from the URL (scheme removed, ``/`` replaced by
    ``_``, then passed through ``safe_name``).

    Returns:
        The number of chunks upserted for this URL.
    """
    ensure_tree(username, notebook_id)
    extracted = extract_url(url)

    extracted_dir = os.path.join(nb_root(username, notebook_id), "files_extracted")
    stem = url.replace("https://", "").replace("http://", "").replace("/", "_")
    dump_path = os.path.join(extracted_dir, safe_name(stem) + ".txt")
    with open(dump_path, "w", encoding="utf-8") as dump:
        dump.write(extracted[0]["text"])

    return upsert_extracted(username, notebook_id, url, f"url:{url}", extracted)