{ "cells": [ { "cell_type": "markdown", "id": "cf8f37b5", "metadata": {}, "source": [ "## 1๏ธโฃ Install Required Packages" ] }, { "cell_type": "code", "execution_count": null, "id": "35266b5d", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "โ All packages installed!\n" ] } ], "source": [ "import sys\n", "import subprocess\n", "\n", "# Install packages (works in VS Code Jupyter)\n", "packages = [\n", " 'langchain-community',\n", " 'sentence-transformers',\n", " 'transformers',\n", " 'faiss-cpu',\n", " 'pypdf',\n", " 'google-generativeai',\n", " 'langchain-huggingface',\n", " 'langchain-text-splitters',\n", " 'fastapi',\n", " 'uvicorn',\n", " 'nest-asyncio',\n", " 'gradio',\n", " 'deep-translator'\n", "]\n", "\n", "print(\"๐ฆ Installing required packages...\")\n", "subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-q'] + packages)\n", "print(\"โ All packages installed!\")" ] }, { "cell_type": "markdown", "id": "b09a84be", "metadata": {}, "source": [ "## 2๏ธโฃ Setup Local Directories (Windows)" ] }, { "cell_type": "code", "execution_count": 6, "id": "760088c8", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "โ Local directories created!\n", "๐ RAG Data Location: /content/rag_data\n", "๐ PDFs will be stored at: /content/rag_data/pdfs\n", "๐๏ธ FAISS index at: /content/rag_data/faiss_index\n" ] } ], "source": [ "import os\n", "\n", "# Use local directories\n", "RAG_DIR = os.path.join(os.getcwd(), 'rag_data')\n", "FAISS_PATH = os.path.join(RAG_DIR, 'faiss_index')\n", "PDFS_PATH = os.path.join(RAG_DIR, 'pdfs')\n", "\n", "os.makedirs(FAISS_PATH, exist_ok=True)\n", "os.makedirs(PDFS_PATH, exist_ok=True)\n", "\n", "print(f\"โ Local directories created!\")\n", "print(f\"๐ RAG Data Location: {RAG_DIR}\")\n", "print(f\"๐ PDFs will be stored at: {PDFS_PATH}\")\n", "print(f\"๐๏ธ FAISS index at: {FAISS_PATH}\")" ] }, { "cell_type": "markdown", "id": "888d519c", "metadata": {}, "source": [ "## 3๏ธโฃ Configure Gemini API Key" ] }, { "cell_type": "code", "execution_count": 7, "id": "8902f9ef", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "โ ๏ธ WARNING: Please set your Gemini API key above!\n" ] } ], "source": [ "import google.generativeai as genai\n", "\n", "# ๐ REPLACE WITH YOUR GEMINI API KEY\n", "# Get it from: https://makersuite.google.com/app/apikey\n", "GOOGLE_API_KEY = \"YOUR_GEMINI_API_KEY_HERE\"\n", "\n", "if GOOGLE_API_KEY == \"YOUR_GEMINI_API_KEY_HERE\":\n", " print(\"โ ๏ธ WARNING: Please set your Gemini API key above!\")\n", "else:\n", " genai.configure(api_key=GOOGLE_API_KEY)\n", " print(\"โ Gemini API configured!\")" ] }, { "cell_type": "markdown", "id": "5b250359", "metadata": {}, "source": [ "## 4๏ธโฃ RAG System Functions" ] }, { "cell_type": "code", "execution_count": 8, "id": "d292e154", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "WARNING:torchao.kernel.intmm:Warning: Detected no triton, on systems without Triton certain kernels will not work\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "๐ Checking for existing RAG data...\n", "โน๏ธ No existing vector store found\n", "\n", "โ RAG System Ready!\n" ] } ], "source": [ "import unicodedata\n", "import re\n", "import shutil\n", "from typing import List, Dict, Optional\n", "from pathlib import Path\n", "from langchain_community.document_loaders.pdf import PyPDFLoader\n", "from langchain_text_splitters import RecursiveCharacterTextSplitter\n", "from langchain_huggingface import HuggingFaceEmbeddings\n", "from langchain_community.vectorstores import FAISS\n", "from deep_translator import GoogleTranslator\n", "\n", "# Global variables\n", "vectordb = None\n", "retriever = None\n", "embeddings = None\n", "rag_initialized = False\n", "uploaded_documents = []\n", "\n", "\n", "def initialize_embeddings():\n", " \"\"\"Initialize multilingual embedding model (supports English & Sinhala)\"\"\"\n", " global embeddings\n", " \n", " if embeddings is not None:\n", " return embeddings\n", " \n", " print(\"๐ฅ Loading multilingual embedding model...\")\n", " embeddings = HuggingFaceEmbeddings(\n", " model_name=\"sentence-transformers/paraphrase-multilingual-mpnet-base-v2\"\n", " )\n", " print(\"โ Embedding model loaded!\")\n", " return embeddings\n", "\n", "\n", "def clean_text(text: str) -> str:\n", " \"\"\"Clean and normalize text for embedding\"\"\"\n", " if not isinstance(text, str) or not text.strip():\n", " return \"\"\n", " \n", " normalized_text = unicodedata.normalize('NFKC', text)\n", " cleaned_chars = [\n", " char for char in normalized_text\n", " if unicodedata.category(char) not in ['So', 'Cn', 'Cc', 'Cf', 'Cs']\n", " ]\n", " cleaned_text = \"\".join(cleaned_chars)\n", " cleaned_text = re.sub(r'\\s+', ' ', cleaned_text).strip()\n", " return cleaned_text\n", "\n", "\n", "def load_and_process_pdf(pdf_path: str) -> List:\n", " \"\"\"Load PDF and split into chunks\"\"\"\n", " print(f\"๐ Loading PDF: {Path(pdf_path).name}\")\n", " \n", " loader = PyPDFLoader(pdf_path)\n", " docs = loader.load()\n", " \n", " splitter = RecursiveCharacterTextSplitter(\n", " chunk_size=300,\n", " chunk_overlap=80\n", " )\n", " chunks = splitter.split_documents(docs)\n", " \n", " print(f\" โ {len(docs)} pages โ {len(chunks)} chunks\")\n", " return chunks\n", "\n", "\n", "def create_vector_store(chunks: List) -> bool:\n", " \"\"\"Create or update FAISS vector store\"\"\"\n", " global vectordb, retriever, rag_initialized\n", " \n", " initialize_embeddings()\n", " \n", " texts = [doc.page_content for doc in chunks]\n", " metadatas = [doc.metadata for doc in chunks]\n", " \n", " processed_texts = []\n", " processed_metadatas = []\n", " \n", " for i, text in enumerate(texts):\n", " cleaned_text = clean_text(text)\n", " if cleaned_text:\n", " processed_texts.append(cleaned_text)\n", " processed_metadatas.append(metadatas[i])\n", " \n", " if not processed_texts:\n", " print(\"โ ๏ธ No valid texts after cleaning\")\n", " return False\n", " \n", " print(f\"๐ Creating embeddings for {len(processed_texts)} chunks...\")\n", " \n", " if vectordb is None:\n", " vectordb = FAISS.from_texts(processed_texts, embeddings, metadatas=processed_metadatas)\n", " else:\n", " new_vectordb = FAISS.from_texts(processed_texts, embeddings, metadatas=processed_metadatas)\n", " vectordb.merge_from(new_vectordb)\n", " \n", " retriever = vectordb.as_retriever(search_kwargs={\"k\": 4})\n", " rag_initialized = True\n", " \n", " save_vector_store()\n", " return True\n", "\n", "\n", "def save_vector_store():\n", " \"\"\"Save FAISS index to local storage\"\"\"\n", " if vectordb is None:\n", " return\n", " \n", " vectordb.save_local(FAISS_PATH)\n", " print(f\"๐พ Vector store saved locally\")\n", "\n", "\n", "def load_vector_store() -> bool:\n", " \"\"\"Load FAISS index from local storage\"\"\"\n", " global vectordb, retriever, rag_initialized, uploaded_documents\n", " \n", " index_file = os.path.join(FAISS_PATH, 'index.faiss')\n", " if not os.path.exists(index_file):\n", " print(\"โน๏ธ No existing vector store found\")\n", " return False\n", " \n", " try:\n", " initialize_embeddings()\n", " vectordb = FAISS.load_local(\n", " FAISS_PATH, \n", " embeddings,\n", " allow_dangerous_deserialization=True\n", " )\n", " retriever = vectordb.as_retriever(search_kwargs={\"k\": 4})\n", " rag_initialized = True\n", " \n", " # Load document list\n", " uploaded_documents = [f for f in os.listdir(PDFS_PATH) if f.endswith('.pdf')]\n", " \n", " print(f\"โ Loaded existing vector store\")\n", " print(f\"๐ {len(uploaded_documents)} documents found\")\n", " return True\n", " except Exception as e:\n", " print(f\"โ ๏ธ Failed to load vector store: {e}\")\n", " return False\n", "\n", "\n", "def translate_to_english(text: str) -> str:\n", " \"\"\"Translate any language to English\"\"\"\n", " try:\n", " translator = GoogleTranslator(source='auto', target='en')\n", " return translator.translate(text)\n", " except:\n", " return text # Return original if translation fails\n", "\n", "\n", "def rag_answer(question: str, relevance_threshold: float = 2.0, translate: bool = True) -> Dict:\n", " \"\"\"Answer question using RAG - check database first, fallback to Gemini\"\"\"\n", " global retriever, vectordb\n", " \n", " # Translate to English if needed\n", " original_question = question\n", " if translate:\n", " question = translate_to_english(question)\n", " \n", " result = {\n", " \"question\": original_question,\n", " \"question_english\": question,\n", " \"answer\": \"\",\n", " \"source\": \"none\",\n", " \"context_found\": False,\n", " \"relevance_score\": 0.0\n", " }\n", " \n", " if not rag_initialized or retriever is None:\n", " print(\"โ ๏ธ RAG not initialized, using Gemini\")\n", " result[\"source\"] = \"gemini\"\n", " result[\"answer\"] = ask_gemini_directly(question)\n", " return result\n", " \n", " # Search vector database\n", " docs_with_scores = vectordb.similarity_search_with_score(question, k=4)\n", " \n", " if not docs_with_scores:\n", " print(\"โ ๏ธ No documents found, using Gemini\")\n", " result[\"source\"] = \"gemini\"\n", " result[\"answer\"] = ask_gemini_directly(question)\n", " return result\n", " \n", " best_score = docs_with_scores[0][1]\n", " result[\"relevance_score\"] = float(best_score)\n", " \n", " # Check relevance threshold\n", " if best_score > relevance_threshold:\n", " print(f\"โ ๏ธ Low relevance (score: {best_score:.3f}), using Gemini\")\n", " result[\"source\"] = \"gemini\"\n", " result[\"answer\"] = ask_gemini_directly(question)\n", " return result\n", " \n", " # Good relevance - use RAG\n", " print(f\"โ Good relevance (score: {best_score:.3f}), answering from documents\")\n", " docs = [doc for doc, score in docs_with_scores]\n", " context = \"\\n\\n\".join([d.page_content for d in docs])\n", " result[\"context_found\"] = True\n", " \n", " prompt = f\"\"\"Answer the question based on the following context from PDF documents. If the context doesn't contain enough information, say \"I don't have enough information in the documents.\"\n", "\n", "Context:\n", "{context}\n", "\n", "Question: {question}\n", "\n", "Answer:\"\"\"\n", " \n", " try:\n", " model = genai.GenerativeModel(\"models/gemini-1.5-flash\")\n", " response = model.generate_content(prompt)\n", " result[\"answer\"] = response.text\n", " result[\"source\"] = \"rag\"\n", " except Exception as e:\n", " print(f\"โ RAG generation error: {e}\")\n", " result[\"answer\"] = f\"Error: {str(e)}\"\n", " result[\"source\"] = \"error\"\n", " \n", " return result\n", "\n", "\n", "def ask_gemini_directly(question: str) -> str:\n", " \"\"\"Fallback: Ask Gemini directly without RAG\"\"\"\n", " try:\n", " model = genai.GenerativeModel(\"models/gemini-1.5-flash\")\n", " response = model.generate_content(f\"Answer this question: {question}\")\n", " return response.text\n", " except Exception as e:\n", " return f\"Error: {str(e)}\"\n", "\n", "\n", "def process_uploaded_pdf(file_path: str, original_filename: str) -> str:\n", " \"\"\"Process uploaded PDF from admin panel\"\"\"\n", " try:\n", " # Copy to local storage\n", " dest_path = os.path.join(PDFS_PATH, original_filename)\n", " shutil.copy(file_path, dest_path)\n", " \n", " # Process PDF\n", " chunks = load_and_process_pdf(dest_path)\n", " \n", " if not chunks:\n", " return f\"โ Failed to extract text from {original_filename}\"\n", " \n", " # Create/update vector store\n", " success = create_vector_store(chunks)\n", " \n", " if success:\n", " if original_filename not in uploaded_documents:\n", " uploaded_documents.append(original_filename)\n", " return f\"โ Successfully processed '{original_filename}'\\n ๐ {len(chunks)} chunks created\\n ๐ Total documents: {len(uploaded_documents)}\"\n", " else:\n", " return f\"โ Failed to process {original_filename}\"\n", " \n", " except Exception as e:\n", " return f\"โ Error: {str(e)}\"\n", "\n", "\n", "def get_status() -> Dict:\n", " \"\"\"Get RAG system status\"\"\"\n", " return {\n", " \"initialized\": rag_initialized,\n", " \"documents_count\": len(uploaded_documents),\n", " \"documents\": uploaded_documents,\n", " \"has_vector_store\": vectordb is not None,\n", " \"storage_path\": PDFS_PATH\n", " }\n", "\n", "\n", "# Try to load existing data\n", "print(\"๐ Checking for existing RAG data...\")\n", "load_vector_store()\n", "\n", "print(\"\\nโ RAG System Ready!\")" ] }, { "cell_type": "markdown", "id": "bee976ec", "metadata": {}, "source": [ "## 5๏ธโฃ Admin Panel - Upload PDFs Here! ๐ค" ] }, { "cell_type": "code", "execution_count": 9, "id": "7fad545f", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/tmp/ipython-input-3459415953.py:45: DeprecationWarning: The 'theme' parameter in the Blocks constructor will be removed in Gradio 6.0. You will need to pass 'theme' to Blocks.launch() instead.\n", " with gr.Blocks(title=\"RAG Admin Panel\", theme=gr.themes.Soft()) as admin_panel:\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "\n", "๐๏ธ Launching Admin Panel...\n", "\n", "Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().\n", "Note: opening Chrome Inspector may crash demo inside Colab notebooks.\n", "* To create a public link, set `share=True` in `launch()`.\n" ] }, { "data": { "application/javascript": "(async (port, path, width, height, cache, element) => {\n if (!google.colab.kernel.accessAllowed && !cache) {\n return;\n }\n element.appendChild(document.createTextNode(''));\n const url = await google.colab.kernel.proxyPort(port, {cache});\n\n const external_link = document.createElement('div');\n external_link.innerHTML = `\n