| import faiss |
| import numpy as np |
| import os |
| import pickle |
| from pypdf import PdfReader |
| from sentence_transformers import SentenceTransformer |
|
|
| |
| |
| |
# In-memory vector-store state. These three are kept parallel: row i of the
# FAISS index corresponds to documents[i] and metadata[i]. They are populated
# by load_db()/ingest_documents() and reset by clear_database().
index = None        # FAISS index over chunk embeddings; None until first load/ingest
documents = []      # raw text of each stored chunk
metadata = []       # per-chunk dict, e.g. {"source": filename, "page": n}


# Sentence-embedding model used for both ingestion and querying.
embedder = SentenceTransformer("all-MiniLM-L6-v2")


CHUNK_SIZE = 800      # max characters per chunk
CHUNK_OVERLAP = 200   # characters shared between consecutive chunks


DB_FILE_INDEX = "vector.index"   # on-disk FAISS index file
DB_FILE_META = "metadata.pkl"    # pickled {"documents", "metadata"} companion file
|
|
| |
| |
| |
def chunk_text(text, chunk_size=None, overlap=None):
    """Split *text* into overlapping character chunks.

    Args:
        text: Input string; an empty string yields an empty list.
        chunk_size: Max characters per chunk (defaults to CHUNK_SIZE).
        overlap: Characters shared between consecutive chunks
            (defaults to CHUNK_OVERLAP); must be smaller than chunk_size.

    Returns:
        List of chunks covering all of *text*; consecutive chunks share
        `overlap` characters.

    Raises:
        ValueError: If chunk_size is not positive or overlap >= chunk_size
            (the original while-loop would spin forever in that case).
    """
    # None-sentinel instead of module constants in the signature so defaults
    # track any runtime change to CHUNK_SIZE/CHUNK_OVERLAP.
    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    if overlap is None:
        overlap = CHUNK_OVERLAP
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")

    step = chunk_size - overlap
    # Same start offsets as the original while-loop: 0, step, 2*step, ...
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]
|
|
def save_db():
    """Persist the current knowledge base to disk.

    Writes the FAISS index to DB_FILE_INDEX and the parallel
    documents/metadata lists (pickled together) to DB_FILE_META.
    Each part is skipped when there is nothing to write.
    """
    # No `global` needed: this function only reads the module state.
    # Explicit None-check instead of truthiness so a freshly created but
    # still-empty index is persisted too.
    if index is not None:
        faiss.write_index(index, DB_FILE_INDEX)
    if documents:
        with open(DB_FILE_META, "wb") as f:
            pickle.dump({"documents": documents, "metadata": metadata}, f)
    print("DEBUG: Knowledge base saved to disk.")
|
|
def load_db():
    """Load a previously saved index and chunk store from disk, if present.

    Validates that documents and metadata are the same length before
    committing to the module globals; on any failure the in-memory state
    is reset to empty rather than left half-loaded.
    """
    global index, documents, metadata

    # Guard clause: both files must exist to attempt a load.
    if not (os.path.exists(DB_FILE_INDEX) and os.path.exists(DB_FILE_META)):
        print("DEBUG: No existing DB found. Starting fresh.")
        return

    try:
        loaded_index = faiss.read_index(DB_FILE_INDEX)
        # NOTE: pickle is only acceptable here because these files are
        # produced locally by save_db(); never unpickle untrusted data.
        with open(DB_FILE_META, "rb") as f:
            data = pickle.load(f)
        docs = data["documents"]
        meta = data["metadata"]
        # The lists must stay parallel to the index rows; a mismatch means
        # a corrupt/partial save, so treat it as a load failure.
        if len(docs) != len(meta):
            raise ValueError("documents/metadata length mismatch")
        index, documents, metadata = loaded_index, docs, meta
        print(f"DEBUG: Loaded {len(documents)} documents from disk.")
    except Exception as e:
        print(f"DEBUG: Failed to load DB: {e}")
        index = None
        documents = []
        metadata = []
|
|
| |
# Import-time side effect: restore any previously persisted knowledge base.
load_db()
|
|
def clear_database():
    """Wipe the knowledge base: in-memory state and the on-disk files."""
    global index, documents, metadata

    # Drop the in-memory store first.
    index = None
    documents = []
    metadata = []

    # Then remove the persisted copies, if they exist.
    for path in (DB_FILE_INDEX, DB_FILE_META):
        if os.path.exists(path):
            os.remove(path)

    print("DEBUG: Database cleared.")
|
|
| |
| |
| |
def ingest_documents(files):
    """Extract text from uploaded files, embed the chunks, and index them.

    Args:
        files: Iterable of upload objects exposing `.filename` and a
            file-like `.file` (e.g. FastAPI UploadFile). Only .pdf and
            .txt files are processed; other types are skipped with a
            debug message.

    Returns:
        Total number of chunks now stored (cumulative, not just new ones).

    Raises:
        ValueError: If no readable text was extracted, or the extracted
            text is too short to be useful (e.g. a scanned-image PDF).
    """
    global index, documents, metadata

    texts = []
    meta = []

    for file in files:
        filename = file.filename
        # Case-insensitive match so ".PDF"/".TXT" uploads aren't dropped.
        lowered = filename.lower()

        if lowered.endswith(".pdf"):
            reader = PdfReader(file.file)
            for i, page in enumerate(reader.pages):
                page_text = page.extract_text()
                if page_text:
                    for chunk in chunk_text(page_text):
                        texts.append(chunk)
                        meta.append({"source": filename, "page": i + 1})

        elif lowered.endswith(".txt"):
            content = file.file.read().decode("utf-8")
            for chunk in chunk_text(content):
                texts.append(chunk)
                meta.append({"source": filename, "page": "N/A"})

        else:
            # Best-effort ingestion: unsupported types are skipped, not fatal.
            print(f"DEBUG: Skipping unsupported file type: {filename}")

    # Check emptiness before the length heuristic, otherwise the "too short"
    # message masks the more accurate "no readable text" error.
    if not texts:
        raise ValueError("No readable text found in documents.")

    total_length = sum(len(t) for t in texts)
    if total_length < 50:
        raise ValueError(
            "Extracted text is too short or empty. "
            "If this is a PDF, it might be a scanned image without a text layer. "
            "Please use a text-selectable PDF or a .txt file."
        )

    # Normalized embeddings + inner-product index => scores are cosine similarity.
    new_embeddings = embedder.encode(texts, convert_to_numpy=True, normalize_embeddings=True)

    if index is None:
        index = faiss.IndexFlatIP(new_embeddings.shape[1])

    index.add(new_embeddings)

    documents.extend(texts)
    metadata.extend(meta)

    save_db()

    return len(documents)
|
|
| |
| |
| |
def search_knowledge(query, top_k=5, min_similarity=0.3):
    """Return up to `top_k` stored chunks similar to `query`.

    Each hit is a dict with "text", "metadata", and "score" keys; hits
    scoring at or below `min_similarity` are filtered out. Returns an
    empty list when nothing has been ingested yet.
    """
    if index is None:
        return []

    # Embed the query the same way the chunks were embedded.
    query_vec = embedder.encode([query], convert_to_numpy=True, normalize_embeddings=True)

    scores, indices = index.search(query_vec, top_k)

    print(f"DEBUG: Query: '{query}'")
    hits = []
    for position, similarity in zip(indices[0], scores[0]):
        # FAISS pads with -1 when fewer than top_k rows exist.
        if position == -1:
            continue

        print(f"DEBUG: Found chunk {position} with score {similarity:.4f}")

        if similarity > min_similarity:
            hits.append({
                "text": documents[position],
                "metadata": metadata[position],
                "score": float(similarity),
            })

    return hits
|
|
| |
| |
| |
def get_all_chunks(limit=50):
    """Return up to `limit` stored chunks as {"text", "metadata"} dicts."""
    if not documents:
        return []

    # documents and metadata are parallel lists, so slice both in lockstep.
    paired = zip(documents[:limit], metadata[:limit])
    return [{"text": text, "metadata": meta} for text, meta in paired]