# Streamlit app: multi-document RAG over uploaded PDFs with source citations
# and simple retrieval-quality scores.
#
# NOTE(review): several markdown literals below are empty or contain only
# text/newlines even though they are rendered with unsafe_allow_html=True.
# Presumably HTML/CSS markup was lost from this copy of the file — confirm
# and restore it; the literals are kept exactly as found.

# Replace the stdlib ``sqlite3`` module with ``pysqlite3``. This has to run
# before any import below that may pull in ``sqlite3`` (presumably chromadb —
# TODO confirm).
__import__('pysqlite3')
import sys
sys.modules['sqlite3'] = sys.modules.pop('pysqlite3')

import html
import os
import tempfile
import time
import uuid

import chromadb
import numpy as np
import streamlit as st
import torch
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_text_splitters import RecursiveCharacterTextSplitter
from sklearn.metrics.pairwise import cosine_similarity

st.set_page_config(
    page_title="RAG Research Assistant",
    page_icon="⬡",
    layout="wide",
    initial_sidebar_state="expanded",
)

st.markdown(""" """, unsafe_allow_html=True)

# ── Sidebar ───────────────────────────────────────────────────
with st.sidebar:
    st.markdown('', unsafe_allow_html=True)
    st.markdown("---")
    st.markdown('', unsafe_allow_html=True)

    use_own_key = st.toggle("Use my own API key", value=False)
    if use_own_key:
        api_key = st.text_input(
            "Google AI API Key",
            type="password",
            placeholder="AIza..",
            help="Get free key at aistudio.google.com",
        )
    else:
        try:
            # Prefer the Colab secret store when it is available.
            from google.colab import userdata
            DEFAULT_API_KEY = userdata.get('GOOGLE_API_KEY')
        except Exception:
            # Any failure above (module missing, secret unavailable) falls
            # back to the environment / .env file. ``Exception`` rather than
            # a bare ``except`` so Ctrl-C and SystemExit still propagate.
            from dotenv import load_dotenv
            load_dotenv()
            DEFAULT_API_KEY = os.getenv("GOOGLE_API_KEY", "")
        api_key = DEFAULT_API_KEY

        if api_key:
            st.markdown(
                '\n\n✓ Using default API key\n\n',
                unsafe_allow_html=True,
            )
        else:
            st.markdown(
                '\n\n⚠ No API key found. Add it in sidebar or .env\n\n',
                unsafe_allow_html=True,
            )

    st.markdown("---")
    st.markdown('', unsafe_allow_html=True)

    use_custom_model = st.toggle("Use custom model", value=False)
    if use_custom_model:
        model_choice = st.text_input(
            "Model name",
            placeholder="gemini-1.5-pro, gemini-1.5-flash, gemini-3.1-flash-lite-preview...",
            help="Enter exact model string from Google AI Studio",
        )
        st.markdown(
            '\n\nFind model names at aistudio.google.com\n\n',
            unsafe_allow_html=True,
        )
    else:
        model_choice = st.selectbox(
            "Gemini Model",
            ["gemini-1.5-flash", "gemini-1.5-pro", "gemini-3.1-flash-lite-preview"],
            index=0,
        )

    top_k = st.slider("Chunks to retrieve (k)", 3, 8, 5)
    # The value is passed to RecursiveCharacterTextSplitter(chunk_size=...),
    # which measures length in characters by default, so the label says so.
    chunk_size = st.slider("Chunk size (characters)", 256, 1024, 512, step=128)

    st.markdown("---")
    st.markdown('', unsafe_allow_html=True)
    st.markdown(
        "\n\nMulti-document RAG with\n"
        "semantic retrieval, source\n"
        "citations & quality evaluation.\n"
        "\nBuilt by Aneeb Naqvi\n\n",
        unsafe_allow_html=True,
    )

# ── Header ────────────────────────────────────────────────────
st.markdown(
    "\n\nResearch Assistant\n"
    "\n// semantic search · source citations · retrieval evaluation\n\n",
    unsafe_allow_html=True,
)


# ── Model loader ──────────────────────────────────────────────
@st.cache_resource
def load_embedding_model():
    """Return the MiniLM sentence-embedding model, on GPU when available.

    Wrapped in ``st.cache_resource`` so the model object is reused instead
    of being rebuilt on every script run.
    """
    return HuggingFaceEmbeddings(
        model_name="all-MiniLM-L6-v2",
        model_kwargs={"device": "cuda" if torch.cuda.is_available() else "cpu"},
    )


def get_llm(api_key, model):
    """Build a Gemini chat client for *model* using *api_key*."""
    return ChatGoogleGenerativeAI(
        model=model,
        google_api_key=api_key,
        temperature=0.3,
    )


# ── Processing ────────────────────────────────────────────────
def _load_pdf_pages(uploaded_file):
    """Parse one uploaded PDF into page documents.

    The upload is spooled to a temporary file because ``PyPDFLoader`` is
    given a filesystem path; the file is removed again once parsing is done.
    Each document's ``source`` metadata is set to the uploaded file's name so
    citations show that name rather than the temporary path.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as f:
        # getvalue() returns the whole buffer regardless of the current read
        # position, so pressing "Process" a second time still sees the bytes.
        f.write(uploaded_file.getvalue())
        temp_path = f.name
    try:
        documents = PyPDFLoader(temp_path).load()
    finally:
        os.remove(temp_path)

    for doc in documents:
        doc.metadata["source"] = uploaded_file.name
    return documents


def process_pdfs(uploaded_files, embedding_model, chunk_size, log_placeholder):
    """Chunk and embed the uploaded PDFs into a fresh Chroma collection.

    Args:
        uploaded_files: Streamlit uploaded-file objects (``.name``,
            ``.getvalue()``).
        embedding_model: Embedding object handed to ``Chroma.from_documents``.
        chunk_size: ``chunk_size`` for ``RecursiveCharacterTextSplitter``.
        log_placeholder: Streamlit placeholder that receives the progress log.

    Returns:
        The populated ``Chroma`` vector store, or ``None`` when no text could
        be extracted from any of the files.
    """
    all_chunks = []
    logs = []

    def update_log(msg, level="info"):
        # NOTE(review): ``tag`` is not interpolated into the markup visible in
        # this copy of the file; kept in case the markup was stripped — confirm.
        tag = {
            "ok": "log-ok",
            "info": "log-info",
            "warn": "log-warn",
            "dim": "log-dim",
        }.get(level, "log-info")
        # Messages embed user-supplied file names and are rendered with
        # unsafe_allow_html=True, so escape them first.
        logs.append(f'\n{html.escape(msg, quote=False)}\n')
        log_placeholder.markdown(
            f'\n{"".join(logs)}\n',
            unsafe_allow_html=True,
        )

    update_log("// initializing document pipeline", "dim")
    time.sleep(0.3)

    # The splitter does not depend on the file, so build it once.
    splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=50)

    for uploaded_file in uploaded_files:
        update_log(f"→ loading [{uploaded_file.name}]", "info")
        time.sleep(0.2)

        documents = _load_pdf_pages(uploaded_file)
        update_log(f" pages extracted: {len(documents)}", "dim")

        chunks = splitter.split_documents(documents)
        all_chunks.extend(chunks)
        update_log(f" chunks created: {len(chunks)}", "dim")

    if not all_chunks:
        # E.g. image-only PDFs: there is nothing to embed, so do not hand an
        # empty document list to the vector store.
        update_log("⚠ no extractable text found in the uploaded documents", "warn")
        return None

    update_log(f"→ embedding {len(all_chunks)} chunks into vector space", "info")
    time.sleep(0.3)

    client = chromadb.EphemeralClient()
    # NOTE(review): ephemeral clients created in the same process may be
    # backed by one shared in-memory store (confirm for the pinned chromadb
    # version). A unique name per run keeps chunks from earlier runs or other
    # sessions out of this collection either way.
    vectorstore = Chroma.from_documents(
        documents=all_chunks,
        embedding=embedding_model,
        client=client,
        collection_name=f"rag_docs_{uuid.uuid4().hex}",
    )

    update_log(f"✓ vectorstore ready — {vectorstore._collection.count()} vectors indexed", "ok")
    update_log("// system ready for queries", "dim")
    return vectorstore


# ── RAG pipeline ──────────────────────────────────────────────
def _response_text(content):
    """Flatten a chat-model ``content`` payload into plain text.

    A non-list payload is returned unchanged. For a list, plain strings and
    dict blocks whose ``type`` is ``"text"`` are joined with single spaces;
    other blocks are ignored.
    """
    if not isinstance(content, list):
        return content

    parts = []
    for block in content:
        if isinstance(block, str):
            parts.append(block)
        elif isinstance(block, dict) and block.get('type') == 'text':
            parts.append(block.get('text', ''))
    return " ".join(parts)


def answer_question(query, vectorstore, llm, k):
    """Retrieve the top-*k* chunks for *query* and ask the LLM to answer.

    Returns:
        Dict with ``answer`` (text), ``sources`` (list of
        ``{"source", "page"}`` with 1-based page numbers) and ``contexts``
        (the retrieved chunk texts, in retrieval order).
    """
    retrieved_docs = vectorstore.similarity_search(query, k=k)

    contexts = [doc.page_content for doc in retrieved_docs]
    sources = [
        {
            "source": doc.metadata.get('source', 'unknown'),
            # The stored page index is treated as 0-based; show it 1-based.
            "page": doc.metadata.get('page', 0) + 1,
        }
        for doc in retrieved_docs
    ]
    context = "".join(text + "\n\n" for text in contexts)

    prompt = (
        "Answer the question based only on the context below. "
        "Be specific and detailed. "
        'If not in context, say "I don\'t know". \n'
        f"Context: {context} Question: {query} Answer:"
    )

    response = llm.invoke(prompt)
    answer = _response_text(response.content)

    return {"answer": answer, "sources": sources, "contexts": contexts}


# ── Evaluator ─────────────────────────────────────────────────
def evaluate_rag(query, result, embedding_model):
    """Score an answer with three cheap heuristics, each rounded to 2 places.

    * ``grounding``: share of distinct answer words that occur in the contexts.
    * ``relevance``: mean cosine similarity between the query embedding and
      each context embedding (0.0 when there are no contexts).
    * ``completeness``: share of distinct context words that occur in the
      answer.
    """
    contexts = result['contexts']
    answer_words = set(result['answer'].lower().split())
    context_words = set(" ".join(contexts).lower().split())
    shared = len(answer_words & context_words)

    grounding = shared / len(answer_words) if answer_words else 0
    completeness = shared / len(context_words) if context_words else 0

    if contexts:
        query_vec = embedding_model.embed_query(query)
        chunk_vecs = embedding_model.embed_documents(contexts)
        sims = cosine_similarity([query_vec], chunk_vecs)[0]
        retrieval_relevance = float(np.mean(sims))
    else:
        # cosine_similarity rejects an empty matrix; nothing retrieved means
        # nothing relevant.
        retrieval_relevance = 0.0

    return {
        "grounding": round(grounding, 2),
        "relevance": round(retrieval_relevance, 2),
        "completeness": round(completeness, 2),
    }


def score_color(val):
    """Map a score to a class name: >=0.7 high, >=0.4 mid, otherwise low.

    NOTE(review): not referenced by the markup visible in this copy of the
    file; presumably used by score markup that was stripped — confirm.
    """
    if val >= 0.7:
        return "score-high"
    if val >= 0.4:
        return "score-mid"
    return "score-low"


# ── Main UI ───────────────────────────────────────────────────
embedding_model = load_embedding_model()

col_upload, col_query = st.columns([1, 1], gap="large")

with col_upload:
    st.markdown('', unsafe_allow_html=True)
    uploaded_files = st.file_uploader(
        "Drop PDF files here",
        type="pdf",
        accept_multiple_files=True,
        label_visibility="collapsed",
    )

    log_placeholder = st.empty()
    log_placeholder.markdown(
        '\n// awaiting documents...\n',
        unsafe_allow_html=True,
    )

    if uploaded_files:
        if st.button("⬡ Process Documents", use_container_width=True):
            if not api_key:
                st.error("Add your API key in the sidebar first.")
            elif use_custom_model and not model_choice:
                st.error("Enter a model name in the sidebar.")
            else:
                vectorstore = process_pdfs(
                    uploaded_files, embedding_model, chunk_size, log_placeholder
                )
                # None means nothing could be indexed; keep whatever store the
                # session already had in that case.
                if vectorstore is not None:
                    previous_store = st.session_state.get("vectorstore")
                    st.session_state.vectorstore = vectorstore
                    st.session_state.llm = get_llm(api_key, model_choice)
                    if previous_store is not None:
                        # Drop the superseded collection so repeated
                        # processing does not pile up old vectors in memory.
                        previous_store.delete_collection()

with col_query:
    st.markdown('', unsafe_allow_html=True)
    query = st.text_input(
        "Query",
        placeholder="What does this document say about...",
        label_visibility="collapsed",
    )

    if query and "vectorstore" in st.session_state:
        with st.spinner(""):
            result = answer_question(
                query,
                st.session_state.vectorstore,
                st.session_state.llm,
                top_k,
            )
            scores = evaluate_rag(query, result, embedding_model)

        st.markdown('', unsafe_allow_html=True)
        # Model output is derived from uploaded documents and is rendered with
        # unsafe_allow_html=True, so escape it before interpolating.
        safe_answer = html.escape(result["answer"], quote=False)
        st.markdown(f'\n{safe_answer}\n', unsafe_allow_html=True)

        st.markdown('', unsafe_allow_html=True)
        seen = set()
        source_tags = []
        for s in result['sources']:
            key = f"{s['source']}:p{s['page']}"
            if key in seen:
                continue  # list each (file, page) pair only once
            seen.add(key)
            name = html.escape(os.path.basename(s['source']), quote=False)
            source_tags.append(f'📄 {name} · p{s["page"]}')
        st.markdown("".join(source_tags), unsafe_allow_html=True)

        st.markdown('', unsafe_allow_html=True)
        st.markdown(
            "\nGrounding\n"
            f"{scores['grounding']}\n"
            "Relevance\n"
            f"{scores['relevance']}\n"
            "Completeness\n"
            f"{scores['completeness']}\n",
            unsafe_allow_html=True,
        )
    elif query:
        st.warning("Upload and process documents first.")