#!/usr/bin/env python3
"""
Dynamic RAG Database Updater
Processes new PDFs and updates the vector database in real-time
"""

import json
import os
import pickle
from datetime import datetime
from pathlib import Path
from typing import Dict, List

# FAISS (CPU)
import faiss

# PDF processing
import fitz
import numpy as np

# OCR (CPU optimized)
from paddleocr import PaddleOCR

# Embeddings
from sentence_transformers import SentenceTransformer

# Model used when the caller does not name one; also the fallback recorded in
# the saved metadata.
DEFAULT_EMBEDDING_MODEL = (
    "microsoft/BiomedNLP-BiomedBERT-base-uncased-abstract-fulltext"
)


class DynamicRAGUpdater:
    """
    Handles dynamic updates to RAG database:
    1. Upload PDF
    2. OCR extraction (PaddleOCR CPU)
    3. Generate embeddings (BiomedBERT)
    4. Update FAISS index
    5. Update metadata
    """

    def __init__(
        self,
        vector_db_path: str,
        embedding_model: str = DEFAULT_EMBEDDING_MODEL,
        upload_dir: str = "uploaded_reports",
    ):
        """Create the working directories, load the models and the database.

        Args:
            vector_db_path: Directory holding ``faiss.index`` and
                ``metadata.pkl``.
            embedding_model: Name passed to ``SentenceTransformer``.
            upload_dir: Directory under which OCR text (``ocr_text/``) and
                embeddings (``embeddings/``) are written.
        """
        self.vector_db_path = Path(vector_db_path)
        self.upload_dir = Path(upload_dir)
        self.ocr_dir = self.upload_dir / "ocr_text"
        self.embeddings_dir = self.upload_dir / "embeddings"

        # parents=True so a nested upload_dir (e.g. "data/uploads") works.
        for directory in (self.upload_dir, self.ocr_dir, self.embeddings_dir):
            directory.mkdir(parents=True, exist_ok=True)

        # PaddleOCR with CPU tuning options (thread count, MKL-DNN).
        self.ocr = PaddleOCR(
            use_angle_cls=True,
            lang="en",
            cpu_threads=4,
            enable_mkldnn=True,
        )

        # Keep the model name so save_database() records the model actually
        # in use rather than a hard-coded one.
        self.embedding_model_name = embedding_model
        self.embedding_model = SentenceTransformer(
            embedding_model,
            device="cpu",
        )
        self.embedding_dim = (
            self.embedding_model.get_sentence_embedding_dimension()
        )

        self.load_database()

    def load_database(self):
        """Load the FAISS index and chunk metadata from ``vector_db_path``.

        Sets ``self.faiss_index``, ``self.chunks`` and
        ``self.chunk_id_to_idx`` (empty dict when the key is absent).
        """
        index_file = self.vector_db_path / "faiss.index"
        metadata_file = self.vector_db_path / "metadata.pkl"

        self.faiss_index = faiss.read_index(str(index_file))

        # SECURITY: pickle.load executes arbitrary code from the file. Only
        # point vector_db_path at metadata produced by a trusted process.
        with open(metadata_file, "rb") as f:
            data = pickle.load(f)

        self.chunks = data["chunks"]
        self.chunk_id_to_idx = data.get("chunk_id_to_idx", {})

    def save_database(self):
        """Write the FAISS index and metadata back to ``vector_db_path``.

        Both files are first written to ``*.tmp`` siblings and then moved
        into place with ``os.replace``, so an interruption while writing does
        not leave a truncated ``faiss.index`` or ``metadata.pkl`` behind.
        """
        index_file = self.vector_db_path / "faiss.index"
        metadata_file = self.vector_db_path / "metadata.pkl"
        index_tmp = index_file.with_name(index_file.name + ".tmp")
        metadata_tmp = metadata_file.with_name(metadata_file.name + ".tmp")

        faiss.write_index(self.faiss_index, str(index_tmp))

        metadata = {
            "chunks": self.chunks,
            "chunk_id_to_idx": self.chunk_id_to_idx,
            "embedding_dim": self.embedding_dim,
            # Fall back to the default for instances that never ran __init__.
            "model": getattr(
                self, "embedding_model_name", DEFAULT_EMBEDDING_MODEL
            ),
        }
        with open(metadata_tmp, "wb") as f:
            pickle.dump(metadata, f)

        # Swap in only after both temp files were written successfully.
        os.replace(index_tmp, index_file)
        os.replace(metadata_tmp, metadata_file)

    def extract_text_from_pdf(self, pdf_path: str) -> str:
        """Render every page of a PDF at 300 dpi and OCR it.

        Args:
            pdf_path: Path of the PDF to read.

        Returns:
            The OCR text of all pages, each page preceded by a
            ``PAGE <n>`` banner, joined with newlines. Empty string for a
            document without pages.
        """
        banner = "=" * 50
        full_text = []

        # Context manager closes the document even if OCR raises.
        with fitz.open(pdf_path) as doc:
            for page_num, page in enumerate(doc, start=1):
                pix = page.get_pixmap(dpi=300, alpha=False)
                image_np = np.frombuffer(pix.samples, dtype=np.uint8).reshape(
                    pix.h, pix.w, pix.n
                )

                ocr_result = self.ocr.ocr(image_np)

                page_text = []
                if ocr_result and ocr_result[0]:
                    # NOTE(review): line[1][0] is presumably the recognized
                    # text of a detection -- confirm against the installed
                    # PaddleOCR version's result layout.
                    page_text = [line[1][0] for line in ocr_result[0]]

                full_text.append(
                    f"\n{banner}\nPAGE {page_num}\n{banner}\n"
                    + "\n".join(page_text)
                )

        return "\n".join(full_text)

    def chunk_text(self, text: str, chunk_size: int = 512) -> List[str]:
        """Group sentences into chunks of roughly ``chunk_size`` characters.

        The text is split on ``". "``. Sentences are accumulated until adding
        the next one would exceed ``chunk_size``; a single sentence longer
        than ``chunk_size`` becomes a chunk of its own.

        Args:
            text: Text to split.
            chunk_size: Soft upper bound on chunk length in characters.

        Returns:
            List of chunk strings; empty when ``text`` has no content.
        """
        chunks = []
        current = []
        length = 0

        for sentence in text.split(". "):
            sentence = sentence.strip()
            if not sentence:
                continue

            # Restore the separator removed by split(). A piece that still
            # carries its own terminator (typically the last one) must not
            # get a second period appended.
            if sentence.endswith((".", "!", "?")):
                sentence += " "
            else:
                sentence += ". "

            if length + len(sentence) > chunk_size and current:
                chunks.append("".join(current))
                current = [sentence]
                length = len(sentence)
            else:
                current.append(sentence)
                length += len(sentence)

        if current:
            chunks.append("".join(current))

        return chunks

    def generate_embeddings(self, chunks: List[str]) -> np.ndarray:
        """Encode text chunks with the embedding model.

        Args:
            chunks: Texts to embed.

        Returns:
            Array with one row per chunk. For an empty ``chunks`` list an
            empty ``(0, embedding_dim)`` float32 array is returned without
            calling the model.
        """
        if not chunks:
            return np.empty((0, self.embedding_dim), dtype=np.float32)

        return self.embedding_model.encode(
            chunks,
            batch_size=32,
            convert_to_numpy=True,
            show_progress_bar=True,
        )

    def add_to_database(
        self,
        embeddings: np.ndarray,
        chunks: List[str],
        filename: str,
    ) -> int:
        """Append vectors to the FAISS index and record their metadata.

        Args:
            embeddings: One embedding row per entry in ``chunks``.
            chunks: Chunk texts, in the same order as ``embeddings``.
            filename: Source name stored with every chunk.

        Returns:
            Number of vectors added.

        Raises:
            ValueError: If the number of embeddings and chunks differ, or the
                embeddings do not match the index dimension.
        """
        if len(embeddings) != len(chunks):
            raise ValueError(
                f"Got {len(embeddings)} embeddings for {len(chunks)} chunks"
            )
        if not chunks:
            return 0

        vectors = np.ascontiguousarray(embeddings, dtype="float32")
        if vectors.ndim != 2 or vectors.shape[1] != self.faiss_index.d:
            raise ValueError(
                f"Embeddings of shape {vectors.shape} do not match index "
                f"dimension {self.faiss_index.d}"
            )

        start_idx = self.faiss_index.ntotal
        self.faiss_index.add(vectors)

        # One timestamp for the whole upload instead of one per chunk.
        upload_date = datetime.now().isoformat()

        for i, text in enumerate(chunks):
            self.chunks.append(
                {
                    "chunk_id": start_idx + i,
                    "text": text,
                    "filename": filename,
                    "upload_date": upload_date,
                    "source": "user_upload",
                }
            )
            # NOTE(review): keys are "<filename>_<i>", so uploading the same
            # filename again overwrites the earlier mapping entries.
            self.chunk_id_to_idx[f"{filename}_{i}"] = start_idx + i

        return len(vectors)

    def process_and_add_pdf(self, pdf_path: str) -> Dict:
        """Run the full pipeline for one PDF and persist the database.

        OCRs the PDF, writes the text to ``ocr_text/<stem>.txt``, chunks and
        embeds it, saves the embeddings to
        ``embeddings/<stem>_embeddings.npy``, adds them to the index and
        saves the database.

        Args:
            pdf_path: Path of the PDF to ingest.

        Returns:
            Statistics about the run (filename, text length, chunk and vector
            counts, processing time, timestamp).
        """
        start = datetime.now()
        filename = Path(pdf_path).stem

        text = self.extract_text_from_pdf(pdf_path)
        (self.ocr_dir / f"{filename}.txt").write_text(text, encoding="utf-8")

        chunks = self.chunk_text(text)
        embeddings = self.generate_embeddings(chunks)
        np.save(self.embeddings_dir / f"{filename}_embeddings.npy", embeddings)

        vectors_added = self.add_to_database(embeddings, chunks, filename)
        self.save_database()

        finished = datetime.now()
        return {
            "filename": filename,
            "text_length": len(text),
            "num_chunks": len(chunks),
            "vectors_added": vectors_added,
            "total_vectors": self.faiss_index.ntotal,
            "processing_time_seconds": (finished - start).total_seconds(),
            "timestamp": finished.isoformat(),
        }


def main():
    """Ingest a single test PDF and print the resulting statistics."""
    vector_db_path = (
        "/usr/users/3d_dimension_est/selva_sur/RAG/output/biomedbert_vector_db"
    )

    updater = DynamicRAGUpdater(
        vector_db_path=vector_db_path,
        embedding_model=DEFAULT_EMBEDDING_MODEL,
        upload_dir="uploaded_reports",
    )

    test_pdf = "path/to/new_report.pdf"

    if Path(test_pdf).exists():
        stats = updater.process_and_add_pdf(test_pdf)
        print(json.dumps(stats, indent=2))
    else:
        print("Test PDF not found. Update the path in main().")


if __name__ == "__main__":
    main()