"""Streamlit RAG chat assistant.

Indexes PDFs, web pages, and YouTube transcripts into an in-memory ChromaDB
collection (all-MiniLM-L6-v2 embeddings), then answers multi-turn questions
via Groq's Llama 3.3 70B, grounded in the retrieved chunks.
"""

import streamlit as st
import chromadb
from sentence_transformers import SentenceTransformer
import fitz  # PyMuPDF
import os
import requests
import hashlib
import re
from urllib.parse import urlparse, parse_qs
from youtube_transcript_api import YouTubeTranscriptApi
from bs4 import BeautifulSoup

# ─── Page Config ──────────────────────────────────────────────────────────────
st.set_page_config(
    page_title="RAG Assistant · Chat",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded",
)

# ─── CSS ──────────────────────────────────────────────────────────────────────
st.markdown("""
""", unsafe_allow_html=True)

# ─── Session State ────────────────────────────────────────────────────────────
defaults = {
    "indexed_sources": {},      # name → {type, chunks, meta}
    "chroma_collection": None,
    "chroma_client": None,
    "total_chunks": 0,
    "chat_history": [],         # [{role, content, sources}]
}
for k, v in defaults.items():
    if k not in st.session_state:
        st.session_state[k] = v


# ─── Helpers ──────────────────────────────────────────────────────────────────
@st.cache_resource(show_spinner=False)
def load_embed_model():
    """Load the sentence-transformer embedding model once per server process."""
    return SentenceTransformer('all-MiniLM-L6-v2')


def get_or_create_collection():
    """Return the shared Chroma collection, creating client/collection lazily.

    Uses cosine distance so relevance can be derived as ``1 - distance``.
    """
    if st.session_state.chroma_client is None:
        st.session_state.chroma_client = chromadb.Client()
        st.session_state.chroma_collection = st.session_state.chroma_client.get_or_create_collection(
            name="rag_store",
            metadata={"hnsw:space": "cosine"}
        )
    return st.session_state.chroma_collection


def chunk_text(text: str, source_name: str, source_type: str, meta: dict,
               chunk_size: int = 400, overlap: int = 60) -> list[dict]:
    """Split *text* into overlapping word-window chunks.

    Args:
        text: Raw text to split.
        source_name: Display name recorded on every chunk.
        source_type: One of ``"pdf"``, ``"url"``, ``"youtube"``.
        meta: Extra per-chunk metadata (e.g. ``{"page": 3}``) merged in.
        chunk_size: Window size in words.
        overlap: Words shared between consecutive windows (must be < chunk_size).

    Returns:
        List of chunk dicts; windows shorter than ~60 characters are dropped
        as too small to be useful retrieval targets.
    """
    words = text.split()
    chunks = []
    start = 0
    while start < len(words):
        end = start + chunk_size
        chunk_str = " ".join(words[start:end]).strip()
        if len(chunk_str) > 60:
            chunks.append({"text": chunk_str, "source": source_name,
                           "type": source_type, **meta})
        start += chunk_size - overlap
    return chunks


def index_chunks(chunks: list[dict], source_name: str, source_type: str, embed_model):
    """Embed *chunks* and add them to the vector store; update session stats.

    Raises:
        ValueError: If *chunks* is empty (e.g. a scanned PDF with no text
            layer) — previously this crashed with an opaque IndexError.
    """
    if not chunks:
        raise ValueError(f"No indexable text could be extracted from '{source_name}'.")
    collection = get_or_create_collection()
    texts = [c["text"] for c in chunks]
    embeddings = embed_model.encode(texts, batch_size=32,
                                    show_progress_bar=False).tolist()
    # Stable per-source ID prefix so chunk IDs from different sources never collide.
    prefix = hashlib.md5(source_name.encode()).hexdigest()[:8]
    ids, docs, metas, embeds = [], [], [], []
    for i, (chunk, emb) in enumerate(zip(chunks, embeddings)):
        ids.append(f"{prefix}_chunk_{i}")
        docs.append(chunk["text"])
        metas.append({"source": chunk["source"], "type": chunk["type"],
                      "page": chunk.get("page", 1),
                      "timestamp": chunk.get("timestamp", "")})
        embeds.append(emb)
    collection.add(ids=ids, embeddings=embeds, documents=docs, metadatas=metas)
    st.session_state.total_chunks += len(chunks)
    st.session_state.indexed_sources[source_name] = {
        "type": source_type,
        "chunks": len(chunks),
        "meta": {k: v for k, v in chunks[0].items()
                 if k not in ["text", "source", "type"]}
    }


# ─── Source-specific extractors ───────────────────────────────────────────────

## PDF
def process_pdf(filename: str, pdf_bytes: bytes, embed_model):
    """Extract per-page text from a PDF, chunk it, and index it.

    Returns the number of chunks indexed.
    """
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    chunks = []
    try:
        for page_num, page in enumerate(doc, start=1):
            text = page.get_text("text").strip()
            if text:
                page_chunks = chunk_text(text, filename, "pdf", {"page": page_num})
                chunks.extend(page_chunks)
    finally:
        # Close even if a page raises, so the document handle is never leaked.
        doc.close()
    index_chunks(chunks, filename, "pdf", embed_model)
    return len(chunks)


## Web URL
def process_url(url: str, embed_model):
    """Fetch a web page, strip boilerplate, chunk, and index the main text.

    Returns ``(chunk_count, source_name)``.

    Raises:
        ValueError: If fewer than 100 characters of text survive extraction.
        requests.HTTPError: On a non-2xx HTTP response.
    """
    headers = {"User-Agent": "Mozilla/5.0 (compatible; RAGBot/1.0)"}
    r = requests.get(url, headers=headers, timeout=15)
    r.raise_for_status()
    soup = BeautifulSoup(r.text, "html.parser")
    # Remove nav, footer, script, style tags
    for tag in soup(["script", "style", "nav", "footer", "header", "aside"]):
        tag.decompose()
    text = soup.get_text(separator=" ", strip=True)
    text = re.sub(r'\s+', ' ', text).strip()
    if len(text) < 100:
        raise ValueError("Could not extract meaningful text from this URL.")
    parsed = urlparse(url)
    source_name = parsed.netloc + parsed.path[:40]
    chunks = chunk_text(text, source_name, "url", {"page": 1})
    index_chunks(chunks, source_name, "url", embed_model)
    return len(chunks), source_name


## YouTube
def get_youtube_id(url: str) -> str:
    """Extract the 11-character video ID from common YouTube URL shapes."""
    patterns = [
        r'(?:v=|youtu\.be/)([a-zA-Z0-9_-]{11})',
        r'(?:embed/)([a-zA-Z0-9_-]{11})',
    ]
    for p in patterns:
        m = re.search(p, url)
        if m:
            return m.group(1)
    raise ValueError("Could not extract YouTube video ID from URL.")


def process_youtube(url: str, embed_model):
    """Fetch a video transcript, bucket it into ~350-word timed chunks, index.

    Returns ``(chunk_count, video_id)``.

    Raises:
        ValueError: If the URL has no recognizable video ID or the transcript
            yields no content (e.g. captions disabled).
    """
    video_id = get_youtube_id(url)
    try:
        # New API style (youtube-transcript-api >= 0.6.0)
        ytt = YouTubeTranscriptApi()
        fetched = ytt.fetch(video_id)
        transcript_list = [{"start": s.start, "text": s.text} for s in fetched]
    except Exception:
        # Fallback to old API style
        transcript_list = YouTubeTranscriptApi.get_transcript(
            video_id, languages=['en', 'en-US', 'en-GB'])
    chunks = []
    buffer_text = ""
    buffer_start = None
    word_count = 0
    for entry in transcript_list:
        if buffer_start is None:
            buffer_start = int(entry["start"])
        buffer_text += " " + entry["text"]
        word_count += len(entry["text"].split())
        if word_count >= 350:
            ts = f"{buffer_start//60}:{buffer_start%60:02d}"
            chunks.append({
                "text": buffer_text.strip(),
                "source": f"youtube:{video_id}",
                "type": "youtube",
                "page": 1,
                "timestamp": ts
            })
            buffer_text = ""
            buffer_start = None
            word_count = 0
    # Flush any words left in the final, under-sized bucket.
    if buffer_text.strip():
        ts = f"{buffer_start//60}:{buffer_start%60:02d}" if buffer_start else "0:00"
        chunks.append({
            "text": buffer_text.strip(),
            "source": f"youtube:{video_id}",
            "type": "youtube",
            "page": 1,
            "timestamp": ts
        })
    if not chunks:
        raise ValueError("No transcript content found. "
                         "The video may not have captions enabled.")
    index_chunks(chunks, f"youtube:{video_id}", "youtube", embed_model)
    return len(chunks), video_id


# ─── RAG Query with Chat Memory ───────────────────────────────────────────────
def rag_query(question: str, embed_model, top_k: int, api_key: str) -> tuple[str, list]:
    """Retrieve the *top_k* most relevant chunks and ask the LLM to answer.

    Returns ``(answer, chunks)`` where each chunk dict carries source
    metadata and a 0–100 relevance score derived from cosine distance.

    Raises:
        requests.HTTPError: If the Groq API returns a non-2xx status.
    """
    collection = get_or_create_collection()
    q_emb = embed_model.encode(question).tolist()
    results = collection.query(query_embeddings=[q_emb], n_results=top_k)
    chunks = []
    for i in range(len(results["documents"][0])):
        dist = results["distances"][0][i]
        meta = results["metadatas"][0][i]
        chunks.append({
            "text": results["documents"][0][i],
            "source": meta["source"],
            "type": meta["type"],
            "page": meta.get("page", 1),
            "timestamp": meta.get("timestamp", ""),
            # Cosine distance can exceed 1.0, which would yield a negative
            # percentage — clamp at 0 so the UI never shows e.g. "-12%".
            "relevance": round(max(0.0, 1 - dist) * 100, 1),
        })
    # BUG FIX: was `c['page'] or c['timestamp']`; page is always 1 (truthy)
    # for YouTube chunks, so the timestamp was never shown. Prefer timestamp.
    context = "\n\n".join([
        f"[Source: {c['source']} | Type: {c['type']} | "
        f"Page/Time: {c['timestamp'] or c['page']}]\n{c['text']}"
        for c in chunks
    ])
    # Build conversation history for multi-turn memory
    history_text = ""
    if st.session_state.chat_history:
        recent = st.session_state.chat_history[-6:]  # last 3 turns
        for msg in recent:
            role = "User" if msg["role"] == "user" else "Assistant"
            history_text += f"{role}: {msg['content']}\n"
    prompt = f"""You are a helpful assistant that answers questions based on indexed documents.
Use ONLY the context below to answer. Be concise and conversational.
Always cite your source (filename, URL, or YouTube timestamp) inline.
If the answer isn't in the context, say "I couldn't find that in the indexed sources."

Conversation so far:
{history_text if history_text else "(This is the start of the conversation)"}

Relevant context from documents:
{context}

User: {question}
Assistant:"""
    headers = {"Authorization": f"Bearer {api_key}",
               "Content-Type": "application/json"}
    payload = {
        "model": "llama-3.3-70b-versatile",
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 700,
        "temperature": 0.3,
    }
    r = requests.post("https://api.groq.com/openai/v1/chat/completions",
                      headers=headers, json=payload, timeout=30)
    r.raise_for_status()
    answer = r.json()["choices"][0]["message"]["content"]
    return answer, chunks


# ─── Sidebar ──────────────────────────────────────────────────────────────────
with st.sidebar:
    st.markdown("## 🤖 RAG Chat Assistant")
    st.markdown("""
PDF · Web · YouTube → Chat
""", unsafe_allow_html=True)
    st.markdown("---")
    env_key = os.environ.get("GROQ_API_KEY", "")
    api_key = env_key if env_key else st.text_input(
        "🔑 Groq API Key", type="password", placeholder="gsk_...",
        help="Free at console.groq.com"
    )
    if not env_key and not api_key:
        st.caption("Get free key → [console.groq.com](https://console.groq.com)")
    st.markdown("---")
    st.markdown("""
Indexed Sources
""", unsafe_allow_html=True)
    if st.session_state.indexed_sources:
        for name, info in st.session_state.indexed_sources.items():
            badge_class = f"badge-{info['type']}"
            icon = "📄" if info['type'] == 'pdf' else "🌐" if info['type'] == 'url' else "▶️"
            label = info['type'].upper()
            st.markdown(f"""
{icon} {name}
{info['chunks']} chunks
{label}
""", unsafe_allow_html=True)
        st.markdown("")
        col1, col2 = st.columns(2)
        if col1.button("🗑️ Clear index", use_container_width=True):
            # BUG FIX: only deleting the session keys may not free the data —
            # ephemeral Chroma clients can share a backing store, so the old
            # vectors could survive into the "fresh" collection. Drop the
            # collection explicitly first (best-effort).
            if st.session_state.chroma_client is not None:
                try:
                    st.session_state.chroma_client.delete_collection("rag_store")
                except Exception:
                    pass  # collection may already be gone; reset regardless
            for k in ["indexed_sources", "chroma_collection",
                      "chroma_client", "total_chunks"]:
                del st.session_state[k]
            st.rerun()
        if col2.button("💬 Clear chat", use_container_width=True):
            st.session_state.chat_history = []
            st.rerun()
    else:
        st.markdown("""
Nothing indexed yet.
""", unsafe_allow_html=True)
    st.markdown("---")
    st.markdown("""
Stack
📄 PDF: PyMuPDF
🌐 Web: BeautifulSoup4
▶️ YouTube: youtube-transcript-api
🔢 Embeddings: all-MiniLM-L6-v2
🗄️ Vector DB: ChromaDB
🧠 LLM: Groq · Llama 3.3 70B
""", unsafe_allow_html=True)

# ─── Main UI ──────────────────────────────────────────────────────────────────
st.markdown("""
🤖 RAG Chat Assistant

Index PDFs · Web pages · YouTube videos — then have a multi-turn conversation across all of them

""", unsafe_allow_html=True)

with st.spinner("⚙️ Loading embedding model..."):
    embed_model = load_embed_model()

# ════════════════════════════════════════════════════════
# INGEST PANEL
# ════════════════════════════════════════════════════════
with st.expander("➕ Add a new source (PDF / Web URL / YouTube)",
                 expanded=len(st.session_state.indexed_sources) == 0):
    tab_pdf, tab_url, tab_yt = st.tabs(["📄 PDF Upload", "🌐 Web URL", "▶️ YouTube"])

    # ── PDF Tab ──
    with tab_pdf:
        uploaded = st.file_uploader("Upload PDF files", type=["pdf"],
                                    accept_multiple_files=True,
                                    label_visibility="collapsed")
        if uploaded:
            new = [f for f in uploaded
                   if f.name not in st.session_state.indexed_sources]
            if new:
                if st.button(f"⚡ Index {len(new)} PDF(s)", type="primary", key="idx_pdf"):
                    for f in new:
                        with st.spinner(f"Indexing {f.name}..."):
                            n = process_pdf(f.name, f.read(), embed_model)
                        st.success(f"✅ {f.name} → {n} chunks")
                    st.rerun()
            else:
                st.info("Already indexed.")

    # ── URL Tab ──
    with tab_url:
        url_input = st.text_input("Paste a public webpage URL",
                                  placeholder="https://en.wikipedia.org/wiki/...",
                                  label_visibility="collapsed")
        if st.button("⚡ Fetch & Index URL", type="primary", key="idx_url"):
            if url_input:
                with st.spinner(f"Fetching and indexing {url_input}..."):
                    try:
                        n, source_name = process_url(url_input, embed_model)
                        st.success(f"✅ {source_name} → {n} chunks indexed")
                        st.rerun()
                    except Exception as e:
                        st.error(f"❌ {str(e)}")
            else:
                st.warning("Please enter a URL.")

    # ── YouTube Tab ──
    with tab_yt:
        yt_input = st.text_input("Paste a YouTube video URL",
                                 placeholder="https://www.youtube.com/watch?v=...",
                                 label_visibility="collapsed")
        st.caption("Works with any video that has English captions/subtitles enabled.")
        if st.button("⚡ Fetch Transcript & Index", type="primary", key="idx_yt"):
            if yt_input:
                with st.spinner("Fetching YouTube transcript..."):
                    try:
                        n, vid_id = process_youtube(yt_input, embed_model)
                        st.success(f"✅ youtube:{vid_id} → {n} chunks indexed")
                        st.rerun()
                    except Exception as e:
                        st.error(f"❌ {str(e)}")
            else:
                st.warning("Please enter a YouTube URL.")

# ════════════════════════════════════════════════════════
# STATS
# ════════════════════════════════════════════════════════
if st.session_state.indexed_sources:
    pdf_count = sum(1 for s in st.session_state.indexed_sources.values()
                    if s["type"] == "pdf")
    url_count = sum(1 for s in st.session_state.indexed_sources.values()
                    if s["type"] == "url")
    yt_count = sum(1 for s in st.session_state.indexed_sources.values()
                   if s["type"] == "youtube")
    st.markdown(f"""
{pdf_count}
PDFs
{url_count}
Web Pages
{yt_count}
YouTube Videos
{st.session_state.total_chunks}
Total Chunks
{len(st.session_state.chat_history)}
Messages
""", unsafe_allow_html=True)

# ════════════════════════════════════════════════════════
# CHAT UI
# ════════════════════════════════════════════════════════
if not st.session_state.indexed_sources:
    st.markdown("""
📂

Add at least one source above to start chatting.
Try a PDF, a Wikipedia URL, or a YouTube video.

""", unsafe_allow_html=True)
    st.stop()

if not api_key:
    st.warning("👈 Add your Groq API key in the sidebar to start chatting.")
    st.stop()

st.markdown("---")
st.markdown("""
Conversation
""", unsafe_allow_html=True)

# Render chat history
if not st.session_state.chat_history:
    st.markdown("""
Ask anything about your indexed sources below 👇
""", unsafe_allow_html=True)

for msg in st.session_state.chat_history:
    if msg["role"] == "user":
        st.markdown(f"""
{msg['content']}
""", unsafe_allow_html=True)
    else:
        source_chips = ""
        if msg.get("sources"):
            for s in msg["sources"][:4]:
                label = f"{s['source']} · {s['relevance']}%"
                if s.get("timestamp"):
                    label += f" @ {s['timestamp']}"
                source_chips += f"{label}"
        # Hoisted out of the f-string: a nested f-string with embedded
        # newlines and the same quote style is a syntax error on Python < 3.12.
        chips_block = f"\n{source_chips}\n" if source_chips else ""
        st.markdown(f"""
🤖
{msg['content']} {chips_block}
""", unsafe_allow_html=True)
        if msg.get("sources"):
            with st.expander("🔍 View retrieved chunks", expanded=False):
                for chunk in msg["sources"]:
                    icon = "📄" if chunk["type"] == "pdf" else "🌐" if chunk["type"] == "url" else "▶️"
                    detail = (f"Page {chunk['page']}" if chunk["type"] != "youtube"
                              else f"@ {chunk['timestamp']}")
                    st.markdown(f"""
{icon} {chunk['source']}
{detail} · {chunk['relevance']}% match
{chunk['text'][:400]}{'...' if len(chunk['text']) > 400 else ''}
""", unsafe_allow_html=True)

# Chat input
st.markdown("")
col_input, col_k, col_btn = st.columns([6, 1, 1])
with col_input:
    user_input = st.text_input("Question",
                               placeholder="Ask something about your indexed sources...",
                               label_visibility="collapsed", key="chat_input")
with col_k:
    top_k = st.selectbox("K", [2, 3, 4, 5], index=1, label_visibility="collapsed")
with col_btn:
    send = st.button("Send ➤", type="primary", use_container_width=True)

if send and user_input:
    # Add user message
    st.session_state.chat_history.append({"role": "user", "content": user_input})
    with st.spinner("Thinking..."):
        try:
            answer, chunks = rag_query(user_input, embed_model, top_k, api_key)
            st.session_state.chat_history.append({
                "role": "assistant",
                "content": answer,
                "sources": chunks
            })
        except requests.HTTPError as e:
            st.session_state.chat_history.append({
                "role": "assistant",
                "content": f"❌ API error: {str(e)}",
                "sources": []
            })
    st.rerun()