# Register pysqlite3 under the name `sqlite3`; kept ahead of every other
# import so later `import sqlite3` statements resolve to it.
__import__('pysqlite3')
import sys
sys.modules['sqlite3'] = sys.modules.pop('pysqlite3')

import html
import os
import tempfile
import time
import uuid

import chromadb
import numpy as np
import streamlit as st
import torch
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_text_splitters import RecursiveCharacterTextSplitter
from sklearn.metrics.pairwise import cosine_similarity
# Page-wide settings: tab title and icon, wide layout, sidebar open on load.
st.set_page_config(
    page_title="RAG Research Assistant",
    page_icon="⬡",
    layout="wide",
    initial_sidebar_state="expanded",
)

# Raw-HTML markdown block. The payload visible here is a single newline.
# NOTE(review): the markup that belonged here looks like it was stripped
# from this copy of the file — confirm against the original.
st.markdown("\n", unsafe_allow_html=True)
# ── Sidebar ───────────────────────────────────────────────────
with st.sidebar:
    # Sidebar controls. This block defines the module-level names read later
    # by the main UI: api_key, use_custom_model, model_choice, top_k and
    # chunk_size.
    # NOTE(review): several raw-HTML payloads below are empty or a bare
    # newline in the visible source; their markup may have been stripped.
    st.markdown("\n", unsafe_allow_html=True)
    st.markdown("---")
    st.markdown("", unsafe_allow_html=True)

    # ── API key: typed by the user, or resolved from Colab secrets / env ──
    use_own_key = st.toggle("Use my own API key", value=False)
    if use_own_key:
        # Strip so a key pasted with stray whitespace is still accepted.
        api_key = st.text_input(
            "Google AI API Key",
            type="password",
            placeholder="AIza..",
            help="Get free key at aistudio.google.com",
        ).strip()
    else:
        try:
            from google.colab import userdata
            DEFAULT_API_KEY = userdata.get('GOOGLE_API_KEY')
        except Exception:
            # Not running in Colab, or the secret lookup itself failed:
            # fall back to the process environment. `Exception` (rather than
            # a bare `except`) keeps KeyboardInterrupt/SystemExit propagating.
            try:
                from dotenv import load_dotenv
            except ImportError:
                # python-dotenv is optional; plain environment variables
                # still work without it.
                pass
            else:
                load_dotenv()
            DEFAULT_API_KEY = os.getenv("GOOGLE_API_KEY", "")
        # The lookup may yield None; normalise to a stripped string.
        api_key = (DEFAULT_API_KEY or "").strip()
        if api_key:
            st.markdown(
                "✓ Using default API key\n",
                unsafe_allow_html=True,
            )
        else:
            st.markdown(
                "⚠ No API key found. Add it in sidebar or .env\n",
                unsafe_allow_html=True,
            )

    st.markdown("---")
    st.markdown("", unsafe_allow_html=True)

    # ── Model selection: free-text name or one of the preset choices ──
    use_custom_model = st.toggle("Use custom model", value=False)
    if use_custom_model:
        model_choice = st.text_input(
            "Model name",
            placeholder="gemini-1.5-pro, gemini-1.5-flash, gemini-3.1-flash-lite-preview...",
            help="Enter exact model string from Google AI Studio",
        ).strip()
        st.markdown(
            "Find model names at aistudio.google.com\n",
            unsafe_allow_html=True,
        )
    else:
        model_choice = st.selectbox(
            "Gemini Model",
            ["gemini-1.5-flash", "gemini-1.5-pro", "gemini-3.1-flash-lite-preview"],
            index=0,
        )

    # ── Retrieval settings ──
    top_k = st.slider("Chunks to retrieve (k)", 3, 8, 5)
    # This value is passed to RecursiveCharacterTextSplitter(chunk_size=...),
    # which measures length in characters by default, so the label says
    # characters rather than tokens.
    chunk_size = st.slider("Chunk size (characters)", 256, 1024, 512, step=128)

    st.markdown("---")
    st.markdown("", unsafe_allow_html=True)
    st.markdown(
        "\n"
        "Multi-document RAG with\n"
        "semantic retrieval, source\n"
        "citations & quality evaluation.\n"
        "Built by Aneeb Naqvi\n",
        unsafe_allow_html=True,
    )
# ── Header ────────────────────────────────────────────────────
# Header block rendered above the two main columns. The payload visible here
# is a single newline.
# NOTE(review): the header markup appears to be missing from this copy.
st.markdown("\n", unsafe_allow_html=True)
# ── Model loader ──────────────────────────────────────────────
@st.cache_resource
def load_embedding_model():
    """Create the sentence-embedding model used for indexing and scoring.

    Decorated with ``st.cache_resource`` so Streamlit can reuse the instance
    instead of rebuilding it.

    Returns:
        A ``HuggingFaceEmbeddings`` wrapper around ``all-MiniLM-L6-v2``,
        placed on CUDA when ``torch`` reports it available, else on CPU.
    """
    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"
    return HuggingFaceEmbeddings(
        model_name="all-MiniLM-L6-v2",
        model_kwargs={"device": device},
    )
def get_llm(api_key, model):
    """Build the Google Generative AI chat client used to answer questions.

    Args:
        api_key: Forwarded as ``google_api_key``.
        model: Model identifier forwarded as ``model``.

    Returns:
        A ``ChatGoogleGenerativeAI`` instance with temperature fixed at 0.3.
    """
    llm_settings = {
        "model": model,
        "google_api_key": api_key,
        "temperature": 0.3,
    }
    return ChatGoogleGenerativeAI(**llm_settings)
# ── Processing ────────────────────────────────────────────────
def process_pdfs(uploaded_files, embedding_model, chunk_size, log_placeholder):
    """Load uploaded PDFs, split them into chunks and index them in Chroma.

    Args:
        uploaded_files: Iterable of uploaded file objects exposing ``name``
            and ``read()``.
        embedding_model: Embedding object handed to ``Chroma.from_documents``.
        chunk_size: ``chunk_size`` for ``RecursiveCharacterTextSplitter``
            (overlap is fixed at 50).
        log_placeholder: Object whose ``markdown`` method is re-rendered with
            the accumulated log text after every step.

    Returns:
        The Chroma vector store built from every chunk of every file.
    """
    all_chunks = []
    logs = []

    def update_log(msg, level="info"):
        """Append one line to the progress log and re-render the whole log."""
        # NOTE(review): `tag` is looked up but never interpolated in the
        # visible source; the markup that consumed it appears to be missing.
        tag = {"ok": "log-ok", "info": "log-info", "warn": "log-warn", "dim": "log-dim"}.get(level, "log-info")
        # Messages embed user-controlled file names and are rendered with
        # unsafe_allow_html=True, so neutralise any markup in them.
        logs.append(f"{html.escape(str(msg), quote=False)}\n")
        log_placeholder.markdown(
            f'{"".join(logs)}\n',
            unsafe_allow_html=True,
        )

    update_log("// initializing document pipeline", "dim")
    # Short pauses kept from the original; presumably they pace the log output.
    time.sleep(0.3)

    # The splitter does not depend on the file, so build it once.
    splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=50)

    for uploaded_file in uploaded_files:
        update_log(f"→ loading [{uploaded_file.name}]", "info")
        time.sleep(0.2)

        # PyPDFLoader takes a filesystem path, so spill the upload to disk.
        # delete=False lets the file be reopened by the loader after this
        # handle is closed; it is removed explicitly right after loading.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as f:
            f.write(uploaded_file.read())
            temp_path = f.name
        try:
            documents = PyPDFLoader(temp_path).load()
        finally:
            os.unlink(temp_path)

        # Record the uploaded file name instead of the throw-away temp path,
        # so citations point at something the user recognises.
        for document in documents:
            document.metadata["source"] = uploaded_file.name
        update_log(f" pages extracted: {len(documents)}", "dim")

        chunks = splitter.split_documents(documents)
        all_chunks.extend(chunks)
        update_log(f" chunks created: {len(chunks)}", "dim")

    update_log(f"→ embedding {len(all_chunks)} chunks into vector space", "info")
    time.sleep(0.3)

    client = chromadb.EphemeralClient()
    # NOTE(review): in-memory Chroma clients created in the same process
    # appear to share their store, so a fixed collection name would keep
    # accumulating chunks from earlier runs. A unique name per call keeps
    # each index isolated — confirm against the chromadb version in use.
    vectorstore = Chroma.from_documents(
        documents=all_chunks,
        embedding=embedding_model,
        client=client,
        collection_name=f"rag_docs_{uuid.uuid4().hex}",
    )
    update_log(f"✓ vectorstore ready — {vectorstore._collection.count()} vectors indexed", "ok")
    update_log("// system ready for queries", "dim")
    return vectorstore
# ── RAG pipeline ──────────────────────────────────────────────
def answer_question(query, vectorstore, llm, k):
    """Retrieve the top-k chunks for a query and ask the LLM to answer from them.

    Args:
        query: The user's question.
        vectorstore: Object providing ``similarity_search(query, k=...)`` that
            returns documents with ``page_content`` and ``metadata``.
        llm: Object providing ``invoke(prompt)`` whose result has ``content``.
        k: Number of chunks to retrieve.

    Returns:
        A dict with:
            ``answer``: the model's text answer;
            ``sources``: one ``{"source", "page"}`` dict per retrieved chunk,
                with ``page`` shifted by +1 from the stored metadata value;
            ``contexts``: the retrieved chunk texts, in retrieval order.
    """
    retrieved_docs = vectorstore.similarity_search(query, k=k)

    contexts = [doc.page_content for doc in retrieved_docs]
    sources = [
        {
            "source": doc.metadata.get("source", "unknown"),
            "page": doc.metadata.get("page", 0) + 1,
        }
        for doc in retrieved_docs
    ]
    # Each chunk is followed by a blank line, including the last one.
    context = "".join(f"{text}\n\n" for text in contexts)

    prompt = (
        "Answer the question based only on the context below.\n"
        'Be specific and detailed. If not in context, say "I don\'t know".\n'
        "Context:\n"
        f"{context}\n"
        f"Question: {query}\n"
        "Answer:"
    )
    response = llm.invoke(prompt)

    content = response.content
    if isinstance(content, list):
        # List-form content can mix plain strings with typed dict blocks.
        # Keep the strings and the text of "text" blocks; skip everything else.
        parts = []
        for block in content:
            if isinstance(block, str):
                parts.append(block)
            elif isinstance(block, dict) and block.get("type") == "text":
                parts.append(block.get("text", ""))
        answer = " ".join(parts)
    else:
        answer = content

    return {"answer": answer, "sources": sources, "contexts": contexts}
# ── Evaluator ─────────────────────────────────────────────────
def evaluate_rag(query, result, embedding_model):
    """Compute three heuristic quality scores for one RAG answer.

    Args:
        query: The user's question.
        result: Dict with ``answer`` (str) and ``contexts`` (list of str).
        embedding_model: Object providing ``embed_query`` and
            ``embed_documents``; only used when ``contexts`` is non-empty.

    Returns:
        A dict of scores rounded to two decimals:
            ``grounding``: share of distinct answer words that also occur in
                the contexts;
            ``relevance``: mean cosine similarity between the query embedding
                and each context embedding (0.0 when there are no contexts);
            ``completeness``: share of distinct context words that also occur
                in the answer.
    """
    contexts = result["contexts"]
    # Words are lower-cased whitespace-separated tokens; punctuation is kept.
    answer_words = set(result["answer"].lower().split())
    context_words = set(" ".join(contexts).lower().split())

    shared = len(answer_words & context_words)
    grounding = shared / len(answer_words) if answer_words else 0.0
    completeness = shared / len(context_words) if context_words else 0.0

    if contexts:
        query_vec = embedding_model.embed_query(query)
        chunk_vecs = embedding_model.embed_documents(contexts)
        sims = cosine_similarity([query_vec], chunk_vecs)[0]
        retrieval_relevance = float(np.mean(sims))
    else:
        # Nothing was retrieved: there is nothing to compare against, and
        # cosine_similarity would reject an empty matrix.
        retrieval_relevance = 0.0

    return {
        "grounding": round(grounding, 2),
        "relevance": round(retrieval_relevance, 2),
        "completeness": round(completeness, 2),
    }
def score_color(val):
    """Return the label for a score: high from 0.7, mid from 0.4, else low."""
    bands = ((0.7, "score-high"), (0.4, "score-mid"))
    for lower_bound, label in bands:
        if val >= lower_bound:
            return label
    return "score-low"
# ── Main UI ───────────────────────────────────────────────────
embedding_model = load_embedding_model()
col_upload, col_query = st.columns([1, 1], gap="large")

# Left column: upload PDFs and build the vector store on request.
# NOTE(review): the empty raw-HTML markdown calls in this section look like
# headings whose markup is missing from this copy of the file.
with col_upload:
    st.markdown("", unsafe_allow_html=True)
    uploaded_files = st.file_uploader(
        "Drop PDF files here",
        type="pdf",
        accept_multiple_files=True,
        label_visibility="collapsed",
    )
    log_placeholder = st.empty()
    log_placeholder.markdown("", unsafe_allow_html=True)

    # The button is only rendered once at least one file is uploaded.
    if uploaded_files and st.button("⬡ Process Documents", use_container_width=True):
        if not api_key:
            st.error("Add your API key in the sidebar first.")
        elif use_custom_model and not model_choice:
            st.error("Enter a model name in the sidebar.")
        else:
            vectorstore = process_pdfs(
                uploaded_files, embedding_model, chunk_size, log_placeholder
            )
            # Kept in session state so they survive Streamlit reruns.
            st.session_state.vectorstore = vectorstore
            st.session_state.llm = get_llm(api_key, model_choice)

# Right column: ask a question against the processed documents.
with col_query:
    st.markdown("", unsafe_allow_html=True)
    query = st.text_input(
        "Query",
        placeholder="What does this document say about...",
        label_visibility="collapsed",
    )
    if query and "vectorstore" in st.session_state:
        with st.spinner(""):
            result = answer_question(
                query, st.session_state.vectorstore, st.session_state.llm, top_k
            )
            scores = evaluate_rag(query, result, embedding_model)

        # The answer comes from an LLM fed with uploaded PDF text and is
        # rendered with unsafe_allow_html=True, so escape any markup in it.
        safe_answer = html.escape(str(result["answer"]), quote=False)
        st.markdown("", unsafe_allow_html=True)
        st.markdown(f"{safe_answer}\n", unsafe_allow_html=True)

        # One entry per distinct (source, page) pair, in retrieval order.
        st.markdown("", unsafe_allow_html=True)
        seen = set()
        source_tags = []
        for s in result["sources"]:
            key = f"{s['source']}:p{s['page']}"
            if key in seen:
                continue
            seen.add(key)
            # basename handles the platform's path separators; the name is
            # user-supplied, so escape it before it is rendered as HTML.
            name = html.escape(os.path.basename(str(s["source"])), quote=False)
            source_tags.append(f'📄 {name} · p{s["page"]}')
        st.markdown("".join(source_tags), unsafe_allow_html=True)

        st.markdown("", unsafe_allow_html=True)
        st.markdown(
            "\n"
            "Grounding\n"
            f"{scores['grounding']}\n"
            "Relevance\n"
            f"{scores['relevance']}\n"
            "Completeness\n"
            f"{scores['completeness']}\n",
            unsafe_allow_html=True,
        )
    elif query:
        # A question was typed but no documents have been processed yet.
        st.warning("Upload and process documents first.")