File size: 8,649 Bytes
87344df
 
 
45620de
 
 
2e63e98
87344df
 
 
 
 
2e63e98
87344df
 
 
 
 
45620de
 
 
87344df
 
 
 
 
 
 
 
 
45620de
 
 
 
 
 
 
 
 
 
 
87344df
 
 
 
 
 
 
 
 
 
 
 
 
 
45620de
87344df
 
 
 
 
 
 
 
 
 
 
 
 
45620de
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
87344df
45620de
87344df
 
45620de
87344df
 
 
 
 
2e63e98
87344df
 
 
 
 
 
 
 
 
 
 
45620de
87344df
 
2e63e98
87344df
45620de
 
87344df
 
45620de
87344df
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45620de
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
87344df
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
"""RAG layer: load corpus, chunk, embed, and retrieve."""

import os
import shutil
import tempfile
import zipfile

import chromadb
from sentence_transformers import SentenceTransformer

# Directory holding the source documents; overridable through the
# CORPUS_DIR environment variable (defaults to "corpus").
CORPUS_DIR = os.environ.get("CORPUS_DIR", "corpus")
# Directory handed to ChromaDB as its persist directory; overridable
# through the CHROMA_DIR environment variable (defaults to "chroma_data").
CHROMA_DIR = os.environ.get("CHROMA_DIR", "chroma_data")
# Chunk length in whitespace-separated words (used as a rough token proxy).
CHUNK_SIZE = 500   # approximate token count (words used as proxy)
# Number of words shared between two consecutive chunks.
CHUNK_OVERLAP = 50
# Default number of chunks returned by retrieve().
TOP_K = 3

# Module-level singletons. _model and _client are created on first use by
# _get_model() / _get_client(); _collection is assigned by load_corpus() and
# stays None until that function has run.
_model: SentenceTransformer | None = None
_collection: chromadb.Collection | None = None
_client: chromadb.ClientAPI | None = None

# File extensions (lower-case, with leading dot) accepted as corpus documents.
# NOTE(review): ".ppt" is routed to the same reader as ".pptx"; that reader
# presumably cannot parse legacy binary .ppt files and would then return an
# empty string for them -- confirm.
SUPPORTED_EXTENSIONS = {".txt", ".pdf", ".pptx", ".ppt"}


def _get_model() -> SentenceTransformer:
    """Return the shared embedding model, creating it on the first call.

    The ``all-MiniLM-L6-v2`` model is instantiated once and cached in the
    module-level ``_model`` so that later calls reuse the same instance.
    """
    global _model
    if _model is not None:
        return _model
    _model = SentenceTransformer("all-MiniLM-L6-v2")
    return _model


def _get_client() -> chromadb.ClientAPI:
    """Return the shared ChromaDB client, building it on first use.

    The client is configured as persistent, with ``CHROMA_DIR`` as its
    persist directory and anonymized telemetry switched off. It is cached
    in the module-level ``_client``.
    """
    global _client
    if _client is not None:
        return _client
    settings = chromadb.config.Settings(
        is_persistent=True,
        persist_directory=CHROMA_DIR,
        anonymized_telemetry=False,
    )
    _client = chromadb.Client(settings)
    return _client


def _approximate_token_split(text: str, size: int, overlap: int) -> list[str]:
    """Split text into chunks of approximately `size` words with `overlap`."""
    words = text.split()
    chunks = []
    start = 0
    while start < len(words):
        end = start + size
        chunk = " ".join(words[start:end])
        chunks.append(chunk)
        start = end - overlap
    return chunks


def _read_txt(path: str) -> str:
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        return f.read()


def _read_pdf(path: str) -> str:
    try:
        from PyPDF2 import PdfReader
        reader = PdfReader(path)
        pages = [page.extract_text() or "" for page in reader.pages]
        return "\n".join(pages)
    except Exception:
        return ""


def _read_pptx(path: str) -> str:
    try:
        from pptx import Presentation
        prs = Presentation(path)
        texts = []
        for slide in prs.slides:
            for shape in slide.shapes:
                if shape.has_text_frame:
                    for para in shape.text_frame.paragraphs:
                        text = para.text.strip()
                        if text:
                            texts.append(text)
        return "\n".join(texts)
    except Exception:
        return ""


def _read_file(path: str) -> str:
    """Read a file based on its extension."""
    lower = path.lower()
    if lower.endswith(".txt"):
        return _read_txt(path)
    elif lower.endswith(".pdf"):
        return _read_pdf(path)
    elif lower.endswith((".pptx", ".ppt")):
        return _read_pptx(path)
    return ""


def _extract_zip(zip_bytes: bytes) -> list[tuple[str, bytes]]:
    """Extract supported files from a ZIP archive. Returns list of (filename, content)."""
    results = []
    with tempfile.TemporaryDirectory() as tmpdir:
        zip_path = os.path.join(tmpdir, "archive.zip")
        with open(zip_path, "wb") as f:
            f.write(zip_bytes)

        with zipfile.ZipFile(zip_path, "r") as zf:
            zf.extractall(tmpdir)

        for root, dirs, files in os.walk(tmpdir):
            # Skip __MACOSX and hidden directories
            dirs[:] = [d for d in dirs if not d.startswith((".", "__"))]
            for fname in files:
                if fname.startswith("."):
                    continue
                ext = os.path.splitext(fname)[1].lower()
                if ext in SUPPORTED_EXTENSIONS:
                    fpath = os.path.join(root, fname)
                    with open(fpath, "rb") as f:
                        results.append((fname, f.read()))
    return results


def load_corpus() -> None:
    """Rebuild the "corpus" collection from the files in ``CORPUS_DIR``.

    The existing "corpus" collection is dropped and recreated with cosine
    distance, then every regular file in ``CORPUS_DIR`` whose extension is in
    ``SUPPORTED_EXTENSIONS`` is read, split into chunks, embedded and stored.
    Each chunk gets the id ``"<filename>_<index>"`` and the metadata
    ``{"source": filename, "chunk_index": index}``.

    If ``CORPUS_DIR`` does not exist it is created and the collection is left
    empty. Files that yield no text are skipped. The module-level
    ``_collection`` is updated as a side effect.
    """
    global _collection

    client = _get_client()

    # Start from a clean collection. The delete presumably fails when the
    # collection does not exist yet; that case is deliberately ignored.
    try:
        client.delete_collection("corpus")
    except Exception:
        pass

    _collection = client.create_collection(
        name="corpus",
        metadata={"hnsw:space": "cosine"},
    )

    if not os.path.isdir(CORPUS_DIR):
        os.makedirs(CORPUS_DIR, exist_ok=True)
        return

    all_chunks: list[str] = []
    all_ids: list[str] = []
    all_meta: list[dict] = []

    for filename in sorted(os.listdir(CORPUS_DIR)):
        ext = os.path.splitext(filename)[1].lower()
        if ext not in SUPPORTED_EXTENSIONS:
            continue

        filepath = os.path.join(CORPUS_DIR, filename)
        # A directory named like a document (e.g. "notes.txt/") must not be
        # opened as a file; only regular files are part of the corpus.
        if not os.path.isfile(filepath):
            continue

        text = _read_file(filepath)
        if not text.strip():
            continue

        chunks = _approximate_token_split(text, CHUNK_SIZE, CHUNK_OVERLAP)
        for i, chunk in enumerate(chunks):
            all_chunks.append(chunk)
            all_ids.append(f"{filename}_{i}")
            all_meta.append({"source": filename, "chunk_index": i})

    if not all_chunks:
        return

    # The embedding model is only loaded once there is something to embed.
    embeddings = _get_model().encode(all_chunks).tolist()
    _collection.add(
        ids=all_ids,
        embeddings=embeddings,
        documents=all_chunks,
        metadatas=all_meta,
    )


def _add_single_file(filename: str, file_bytes: bytes) -> dict:
    """Process a single file: save to corpus and embed."""
    global _collection

    os.makedirs(CORPUS_DIR, exist_ok=True)
    filepath = os.path.join(CORPUS_DIR, filename)

    with open(filepath, "wb") as f:
        f.write(file_bytes)

    text = _read_file(filepath)
    if not text.strip():
        os.remove(filepath)
        return {"filename": filename, "status": "error", "message": "Texte non extractible"}

    chunks = _approximate_token_split(text, CHUNK_SIZE, CHUNK_OVERLAP)
    model = _get_model()

    if _collection is None:
        load_corpus()
        return {"filename": filename, "status": "ok", "chunks": len(chunks)}

    # Remove old chunks from same file if re-uploading
    try:
        existing = _collection.get(where={"source": filename})
        if existing["ids"]:
            _collection.delete(ids=existing["ids"])
    except Exception:
        pass

    chunk_ids = [f"{filename}_{i}" for i in range(len(chunks))]
    metas = [{"source": filename, "chunk_index": i} for i in range(len(chunks))]
    embeddings = model.encode(chunks).tolist()

    _collection.add(
        ids=chunk_ids,
        embeddings=embeddings,
        documents=chunks,
        metadatas=metas,
    )

    return {"filename": filename, "status": "ok", "chunks": len(chunks)}


def add_documents(files: list[tuple[str, bytes]]) -> list[dict]:
    """Add uploaded files to the corpus, expanding ZIP archives on the way.

    Each ``(filename, content)`` pair is passed to ``_add_single_file``,
    except names ending in ".zip" (case-insensitive): those are expanded
    with ``_extract_zip`` and every extracted file is added individually. An
    archive that yields no supported file produces a single error entry.

    Returns:
        One result dict per processed file, in processing order.
    """
    outcomes: list[dict] = []
    for name, payload in files:
        if not name.lower().endswith(".zip"):
            outcomes.append(_add_single_file(name, payload))
            continue

        members = _extract_zip(payload)
        if members:
            outcomes.extend(
                _add_single_file(inner, data) for inner, data in members
            )
        else:
            outcomes.append({"filename": name, "status": "error",
                             "message": "Aucun fichier supporte trouve dans le ZIP"})
    return outcomes


def list_documents() -> list[dict]:
    """List the documents stored in the corpus directory.

    Only regular files whose extension (case-insensitive) is in
    ``SUPPORTED_EXTENSIONS`` are reported; directories are ignored even if
    their name carries a supported extension, matching the regular-file
    check done by ``delete_document``.

    Returns:
        A list of ``{"filename": name, "size": size_in_bytes}`` dicts sorted
        by file name; empty when ``CORPUS_DIR`` does not exist.
    """
    if not os.path.isdir(CORPUS_DIR):
        return []

    docs = []
    for filename in sorted(os.listdir(CORPUS_DIR)):
        ext = os.path.splitext(filename)[1].lower()
        if ext not in SUPPORTED_EXTENSIONS:
            continue
        filepath = os.path.join(CORPUS_DIR, filename)
        if not os.path.isfile(filepath):
            continue
        docs.append({"filename": filename, "size": os.path.getsize(filepath)})
    return docs


def delete_document(filename: str) -> bool:
    """Delete a document from the corpus directory and drop its embeddings.

    Only bare file names are accepted: anything empty or containing a path
    separator (``/`` or ``\\``) is refused, so a caller-supplied name such as
    "../app.py" can never remove a file outside ``CORPUS_DIR``.

    Args:
        filename: Name of the document inside ``CORPUS_DIR``.

    Returns:
        True when the file existed and was removed, False otherwise
        (invalid name or no such regular file).
    """
    if not filename or os.path.basename(filename.replace("\\", "/")) != filename:
        return False

    filepath = os.path.join(CORPUS_DIR, filename)
    if not os.path.isfile(filepath):
        return False

    os.remove(filepath)

    if _collection is not None:
        # Best effort: the file is already gone, so a failure while cleaning
        # the index must not turn the deletion into an error.
        try:
            existing = _collection.get(where={"source": filename})
            if existing["ids"]:
                _collection.delete(ids=existing["ids"])
        except Exception:
            pass

    return True


def retrieve(query: str, top_k: int = TOP_K) -> list[str]:
    """Return the stored chunks most similar to `query`.

    Args:
        query: Free-text query; it is embedded with the shared model.
        top_k: Maximum number of chunks to return. Values below 1 yield an
            empty list.

    Returns:
        Up to `top_k` chunk texts as returned by the collection query, or an
        empty list when no collection has been loaded, the collection is
        empty, or `top_k` is not positive.
    """
    # A non-positive result count is never a meaningful query; answer
    # directly instead of forwarding it to the vector store.
    if top_k <= 0 or _collection is None:
        return []

    # Count once so the emptiness check and the n_results clamp agree.
    available = _collection.count()
    if available == 0:
        return []

    query_embedding = _get_model().encode([query]).tolist()
    results = _collection.query(
        query_embeddings=query_embedding,
        n_results=min(top_k, available),
    )
    documents = results["documents"]
    # One query was sent, so the first (only) result list is the answer.
    return documents[0] if documents else []