Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| """ | |
| Dynamic RAG Database Updater | |
| Processes PDFs in memory and updates the vector database. | |
| No PDFs, OCR text, or intermediate files are stored. | |
| """ | |
| import numpy as np | |
| from pathlib import Path | |
| from typing import List, Dict | |
| import pickle | |
| from datetime import datetime | |
| # PDF processing | |
| from pdf2image import convert_from_path | |
| # OCR | |
| import pytesseract | |
| # Embeddings | |
| from sentence_transformers import SentenceTransformer | |
| # FAISS | |
| import faiss | |
class DynamicRAGUpdater:
    """
    Handles dynamic updates to a RAG database:
    1. PDF upload (temporary path)
    2. OCR extraction (in memory)
    3. Chunking
    4. Embedding generation
    5. FAISS + metadata update ONLY

    The database lives in ``vector_db_path`` as two files: ``faiss.index``
    (the FAISS index) and ``metadata.pkl`` (a pickled dict holding the chunk
    records and the chunk-id -> index-position mapping).
    """

    DEFAULT_EMBEDDING_MODEL = (
        "microsoft/BiomedNLP-BiomedBERT-base-uncased-abstract-fulltext"
    )

    def __init__(
        self,
        vector_db_path: str,
        embedding_model: str = DEFAULT_EMBEDDING_MODEL,
    ):
        """
        Load the embedding model on CPU and open the existing database.

        Args:
            vector_db_path: Directory containing ``faiss.index`` and
                ``metadata.pkl``.
            embedding_model: Name or path handed to ``SentenceTransformer``.
        """
        self.vector_db_path = Path(vector_db_path)
        # Remember which model was requested so save_database() records the
        # model actually in use rather than a hard-coded name.
        self.embedding_model_name = embedding_model
        print("Using Tesseract OCR (in-memory only)")
        self.embedding_model = SentenceTransformer(
            embedding_model,
            device="cpu",
        )
        self.embedding_dim = self.embedding_model.get_sentence_embedding_dimension()
        self.load_database()

    # --------------------------------------------------
    # Load / Save database
    # --------------------------------------------------
    def load_database(self):
        """Read the FAISS index and the pickled metadata from disk."""
        index_file = self.vector_db_path / "faiss.index"
        metadata_file = self.vector_db_path / "metadata.pkl"
        self.faiss_index = faiss.read_index(str(index_file))
        # SECURITY: pickle.load can execute arbitrary code. metadata.pkl must
        # only ever come from a trusted, application-controlled location.
        with open(metadata_file, "rb") as f:
            data = pickle.load(f)
        self.chunks = data["chunks"]
        self.chunk_id_to_idx = data["chunk_id_to_idx"]

    def save_database(self):
        """Write the FAISS index and the metadata back to ``vector_db_path``."""
        faiss.write_index(self.faiss_index, str(self.vector_db_path / "faiss.index"))
        with open(self.vector_db_path / "metadata.pkl", "wb") as f:
            pickle.dump(
                {
                    "chunks": self.chunks,
                    "chunk_id_to_idx": self.chunk_id_to_idx,
                    "embedding_dim": self.embedding_dim,
                    # Fall back to the default only for instances that were
                    # built without going through __init__.
                    "model": getattr(
                        self,
                        "embedding_model_name",
                        self.DEFAULT_EMBEDDING_MODEL,
                    ),
                },
                f,
            )

    # --------------------------------------------------
    # OCR (in memory)
    # --------------------------------------------------
    def extract_text_from_pdf(self, pdf_path: str) -> str:
        """
        Rasterise a PDF at 300 dpi and OCR every page with Tesseract.

        Pages whose OCR output is empty or whitespace-only are skipped so
        they do not produce header-only text. Page numbers in the headers
        still refer to the position in the original PDF.

        Args:
            pdf_path: Path to the PDF file.

        Returns:
            The OCR text of all non-blank pages, each preceded by a
            ``PAGE n`` banner; an empty string if no page yielded text.

        Raises:
            RuntimeError: If the PDF could not be converted to images.
        """
        try:
            images = convert_from_path(pdf_path, dpi=300)
        except Exception as e:
            raise RuntimeError(
                "PDF conversion failed. Ensure Poppler is installed."
            ) from e
        pages = []
        for page_num, image in enumerate(images, 1):
            text = pytesseract.image_to_string(
                image,
                lang="eng",
                config="--oem 3 --psm 6",
            )
            if not text.strip():
                # Blank page: nothing worth indexing.
                continue
            pages.append(
                f"\n{'=' * 40}\nPAGE {page_num}\n{'=' * 40}\n{text}"
            )
        return "\n".join(pages)

    # --------------------------------------------------
    # Chunking
    # --------------------------------------------------
    @staticmethod
    def _sentence_units(sentence: str, chunk_size: int) -> List[str]:
        """Return ``sentence`` as pieces that are each at most ``chunk_size`` long."""
        if len(sentence) <= chunk_size:
            return [sentence]
        words = sentence.split()
        # Keep the sentence's trailing separator (if any) on the last word so
        # the next sentence does not get glued onto it.
        tail = " " if sentence.endswith(" ") else ""
        spaced = [word + " " for word in words[:-1]] + [words[-1] + tail]
        units: List[str] = []
        for item in spaced:
            # A single token longer than chunk_size has to be cut hard.
            units.extend(
                item[start:start + chunk_size]
                for start in range(0, len(item), chunk_size)
            )
        return units

    def chunk_text(self, text: str, chunk_size: int = 512) -> List[str]:
        """
        Split text into chunks of at most ``chunk_size`` characters.

        The text is split into sentences on ``". "`` and sentences are packed
        greedily into chunks. A sentence longer than ``chunk_size`` is broken
        on whitespace (internal whitespace of such a sentence is normalised
        to single spaces), and a single token longer than ``chunk_size`` is
        cut at ``chunk_size`` characters.

        Args:
            text: Text to split.
            chunk_size: Maximum chunk length in characters; must be positive.

        Returns:
            The list of chunks, empty if ``text`` contains no content.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        pieces = text.split(". ")
        last_pos = len(pieces) - 1
        chunks: List[str] = []
        current: List[str] = []
        length = 0
        for pos, piece in enumerate(pieces):
            sentence = piece.strip()
            if not sentence:
                continue
            if pos != last_pos:
                # split() consumed this delimiter; the final fragment never
                # had one, so nothing is appended there.
                sentence += ". "
            for unit in self._sentence_units(sentence, chunk_size):
                if current and length + len(unit) > chunk_size:
                    chunks.append("".join(current))
                    current, length = [], 0
                current.append(unit)
                length += len(unit)
        if current:
            chunks.append("".join(current))
        return chunks

    # --------------------------------------------------
    # Embeddings
    # --------------------------------------------------
    def generate_embeddings(self, chunks: List[str]) -> np.ndarray:
        """
        Encode chunks with the embedding model.

        Args:
            chunks: Texts to embed.

        Returns:
            The array returned by ``SentenceTransformer.encode``; for an
            empty input, an empty ``(0, embedding_dim)`` float32 array
            without invoking the model.
        """
        if not chunks:
            return np.empty((0, self.embedding_dim), dtype=np.float32)
        return self.embedding_model.encode(
            chunks,
            batch_size=32,
            convert_to_numpy=True,
            show_progress_bar=True,
        )

    # --------------------------------------------------
    # FAISS update
    # --------------------------------------------------
    def add_to_database(
        self,
        embeddings: np.ndarray,
        chunks: List[str],
        filename: str,
    ) -> int:
        """
        Append vectors to the FAISS index and record their metadata.

        All inputs are validated before anything is modified, so a rejected
        call leaves the index and the metadata unchanged.

        Args:
            embeddings: One embedding row per chunk.
            chunks: Chunk texts, in the same order as ``embeddings``.
            filename: Source name stored with every chunk and used as the
                prefix of the ``chunk_id_to_idx`` keys.

        Returns:
            Number of vectors added (0 for empty input).

        Raises:
            ValueError: If the number of embeddings and chunks differ, or the
                embeddings are not 2-D with the index's dimensionality.
        """
        if len(embeddings) != len(chunks):
            raise ValueError(
                f"Got {len(embeddings)} embeddings for {len(chunks)} chunks"
            )
        if not chunks:
            return 0
        vectors = np.ascontiguousarray(embeddings, dtype="float32")
        index_dim = self.faiss_index.d
        if vectors.ndim != 2 or vectors.shape[1] != index_dim:
            raise ValueError(
                f"Embeddings of shape {vectors.shape} do not match "
                f"index dimension {index_dim}"
            )
        start_idx = self.faiss_index.ntotal
        self.faiss_index.add(vectors)
        # One timestamp per upload, shared by all of its chunks.
        upload_date = datetime.now().isoformat()
        for offset, text in enumerate(chunks):
            position = start_idx + offset
            self.chunks.append(
                {
                    "chunk_id": position,
                    "text": text,
                    "filename": filename,
                    "upload_date": upload_date,
                    "source": "user_upload",
                }
            )
            # NOTE(review): keys are "<filename>_<i>", so uploading a file
            # with the same stem again overwrites the earlier mapping entries
            # while the earlier vectors stay in the index - confirm intended.
            self.chunk_id_to_idx[f"{filename}_{offset}"] = position
        return len(vectors)

    # --------------------------------------------------
    # Full pipeline (NO FILE STORAGE)
    # --------------------------------------------------
    def process_and_add_pdf(self, pdf_path: str) -> Dict:
        """
        Run OCR, chunking, embedding and indexing for one PDF, then save.

        When the PDF yields no text, nothing is embedded, added or written
        and the returned summary reports zero chunks.

        Args:
            pdf_path: Path to the PDF; its stem is used as the filename.

        Returns:
            Summary dict with ``filename``, ``num_chunks``, ``vectors_added``,
            ``total_vectors``, ``processing_time_seconds`` and ``timestamp``.

        Raises:
            RuntimeError: If the PDF could not be converted to images.
        """
        started = datetime.now()
        filename = Path(pdf_path).stem
        # All steps are in memory
        text = self.extract_text_from_pdf(pdf_path)
        chunks = self.chunk_text(text)
        vectors_added = 0
        if chunks:
            embeddings = self.generate_embeddings(chunks)
            vectors_added = self.add_to_database(embeddings, chunks, filename)
            self.save_database()
        finished = datetime.now()
        return {
            "filename": filename,
            "num_chunks": len(chunks),
            "vectors_added": vectors_added,
            "total_vectors": self.faiss_index.ntotal,
            "processing_time_seconds": (finished - started).total_seconds(),
            "timestamp": finished.isoformat(),
        }