import streamlit as st import chromadb from sentence_transformers import SentenceTransformer import fitz # PyMuPDF import os import requests import hashlib import re from urllib.parse import urlparse, parse_qs from youtube_transcript_api import YouTubeTranscriptApi from bs4 import BeautifulSoup # ─── Page Config ────────────────────────────────────────────────────────────── st.set_page_config( page_title="RAG Assistant · Chat", page_icon="🤖", layout="wide", initial_sidebar_state="expanded" ) # ─── CSS ────────────────────────────────────────────────────────────────────── st.markdown(""" """, unsafe_allow_html=True) # ─── Session State ──────────────────────────────────────────────────────────── defaults = { "indexed_sources": {}, # name → {type, chunks, meta} "chroma_collection": None, "chroma_client": None, "total_chunks": 0, "chat_history": [], # [{role, content, sources}] } for k, v in defaults.items(): if k not in st.session_state: st.session_state[k] = v # ─── Helpers ────────────────────────────────────────────────────────────────── @st.cache_resource(show_spinner=False) def load_embed_model(): return SentenceTransformer('all-MiniLM-L6-v2') def get_or_create_collection(): if st.session_state.chroma_client is None: st.session_state.chroma_client = chromadb.Client() st.session_state.chroma_collection = st.session_state.chroma_client.get_or_create_collection( name="rag_store", metadata={"hnsw:space": "cosine"} ) return st.session_state.chroma_collection def chunk_text(text: str, source_name: str, source_type: str, meta: dict, chunk_size: int = 400, overlap: int = 60) -> list[dict]: words = text.split() chunks = [] start = 0 while start < len(words): end = start + chunk_size chunk_str = " ".join(words[start:end]).strip() if len(chunk_str) > 60: chunks.append({"text": chunk_str, "source": source_name, "type": source_type, **meta}) start += chunk_size - overlap return chunks def index_chunks(chunks: list[dict], source_name: str, source_type: str, embed_model): collection = get_or_create_collection() texts = [c["text"] for c in chunks] embeddings = embed_model.encode(texts, batch_size=32, show_progress_bar=False).tolist() prefix = hashlib.md5(source_name.encode()).hexdigest()[:8] ids, docs, metas, embeds = [], [], [], [] for i, (chunk, emb) in enumerate(zip(chunks, embeddings)): ids.append(f"{prefix}_chunk_{i}") docs.append(chunk["text"]) metas.append({"source": chunk["source"], "type": chunk["type"], "page": chunk.get("page", 1), "timestamp": chunk.get("timestamp", "")}) embeds.append(emb) collection.add(ids=ids, embeddings=embeds, documents=docs, metadatas=metas) st.session_state.total_chunks += len(chunks) st.session_state.indexed_sources[source_name] = { "type": source_type, "chunks": len(chunks), "meta": {k: v for k, v in chunks[0].items() if k not in ["text", "source", "type"]} } # ─── Source-specific extractors ─────────────────────────────────────────────── ## PDF def process_pdf(filename: str, pdf_bytes: bytes, embed_model): doc = fitz.open(stream=pdf_bytes, filetype="pdf") chunks = [] for page_num, page in enumerate(doc, start=1): text = page.get_text("text").strip() if text: page_chunks = chunk_text(text, filename, "pdf", {"page": page_num}) chunks.extend(page_chunks) doc.close() index_chunks(chunks, filename, "pdf", embed_model) return len(chunks) ## Web URL def process_url(url: str, embed_model): headers = {"User-Agent": "Mozilla/5.0 (compatible; RAGBot/1.0)"} r = requests.get(url, headers=headers, timeout=15) r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser") # Remove nav, footer, script, style tags for tag in soup(["script", "style", "nav", "footer", "header", "aside"]): tag.decompose() text = soup.get_text(separator=" ", strip=True) text = re.sub(r'\s+', ' ', text).strip() if len(text) < 100: raise ValueError("Could not extract meaningful text from this URL.") parsed = urlparse(url) source_name = parsed.netloc + parsed.path[:40] chunks = chunk_text(text, source_name, "url", {"page": 1}) index_chunks(chunks, source_name, "url", embed_model) return len(chunks), source_name ## YouTube def get_youtube_id(url: str) -> str: patterns = [ r'(?:v=|youtu\.be/)([a-zA-Z0-9_-]{11})', r'(?:embed/)([a-zA-Z0-9_-]{11})', ] for p in patterns: m = re.search(p, url) if m: return m.group(1) raise ValueError("Could not extract YouTube video ID from URL.") def process_youtube(url: str, embed_model): video_id = get_youtube_id(url) try: # New API style (youtube-transcript-api >= 0.6.0) from youtube_transcript_api import YouTubeTranscriptApi ytt = YouTubeTranscriptApi() fetched = ytt.fetch(video_id) transcript_list = [{"start": s.start, "text": s.text} for s in fetched] except Exception: # Fallback to old API style transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=['en', 'en-US', 'en-GB']) chunks = [] buffer_text = "" buffer_start = None word_count = 0 for entry in transcript_list: if buffer_start is None: buffer_start = int(entry["start"]) buffer_text += " " + entry["text"] word_count += len(entry["text"].split()) if word_count >= 350: ts = f"{buffer_start//60}:{buffer_start%60:02d}" chunks.append({ "text": buffer_text.strip(), "source": f"youtube:{video_id}", "type": "youtube", "page": 1, "timestamp": ts }) buffer_text = "" buffer_start = None word_count = 0 if buffer_text.strip(): ts = f"{buffer_start//60}:{buffer_start%60:02d}" if buffer_start else "0:00" chunks.append({ "text": buffer_text.strip(), "source": f"youtube:{video_id}", "type": "youtube", "page": 1, "timestamp": ts }) if not chunks: raise ValueError("No transcript content found. The video may not have captions enabled.") index_chunks(chunks, f"youtube:{video_id}", "youtube", embed_model) return len(chunks), video_id # ─── RAG Query with Chat Memory ─────────────────────────────────────────────── def rag_query(question: str, embed_model, top_k: int, api_key: str) -> tuple[str, list]: collection = get_or_create_collection() q_emb = embed_model.encode(question).tolist() results = collection.query(query_embeddings=[q_emb], n_results=top_k) chunks = [] for i in range(len(results["documents"][0])): dist = results["distances"][0][i] meta = results["metadatas"][0][i] chunks.append({ "text": results["documents"][0][i], "source": meta["source"], "type": meta["type"], "page": meta.get("page", 1), "timestamp": meta.get("timestamp", ""), "relevance": round((1 - dist) * 100, 1), }) context = "\n\n".join([ f"[Source: {c['source']} | Type: {c['type']} | Page/Time: {c['page'] or c['timestamp']}]\n{c['text']}" for c in chunks ]) # Build conversation history for multi-turn memory history_text = "" if st.session_state.chat_history: recent = st.session_state.chat_history[-6:] # last 3 turns for msg in recent: role = "User" if msg["role"] == "user" else "Assistant" history_text += f"{role}: {msg['content']}\n" prompt = f"""You are a helpful assistant that answers questions based on indexed documents. Use ONLY the context below to answer. Be concise and conversational. Always cite your source (filename, URL, or YouTube timestamp) inline. If the answer isn't in the context, say "I couldn't find that in the indexed sources." Conversation so far: {history_text if history_text else "(This is the start of the conversation)"} Relevant context from documents: {context} User: {question} Assistant:""" headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} payload = { "model": "llama-3.3-70b-versatile", "messages": [{"role": "user", "content": prompt}], "max_tokens": 700, "temperature": 0.3, } r = requests.post("https://api.groq.com/openai/v1/chat/completions", headers=headers, json=payload, timeout=30) r.raise_for_status() answer = r.json()["choices"][0]["message"]["content"] return answer, chunks # ─── Sidebar ────────────────────────────────────────────────────────────────── with st.sidebar: st.markdown("## 🤖 RAG Chat Assistant") st.markdown("
Index PDFs · Web pages · YouTube videos — then have a multi-turn conversation across all of them
Add at least one source above to start chatting.
Try a PDF, a Wikipedia URL, or a YouTube video.
Ask anything about your indexed sources below 👇