Spaces:
Sleeping
Sleeping
import hashlib
import html
import os
import re
from urllib.parse import parse_qs, urlparse

import chromadb
import fitz  # PyMuPDF
import requests
import streamlit as st
from bs4 import BeautifulSoup
from sentence_transformers import SentenceTransformer
from youtube_transcript_api import YouTubeTranscriptApi
# ─── Page Config ───
# Browser-tab title and icon, wide layout, sidebar expanded on load.
_PAGE_SETTINGS = {
    "page_title": "RAG Assistant Β· Chat",
    "page_icon": "π€",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_SETTINGS)
# ─── CSS ───
# Stylesheet for the custom HTML fragments rendered elsewhere in this file
# (hero banner, source cards, chat bubbles, chunk cards, stat boxes).
_APP_CSS = """
<style>
@import url('https://fonts.googleapis.com/css2?family=IBM+Plex+Sans:wght@300;400;500;600&family=IBM+Plex+Mono:wght@400;500&display=swap');
html, body, [class*="css"] { font-family: 'IBM Plex Sans', sans-serif; }
.main { background-color: #0b0f1a; }
.hero {
background: linear-gradient(160deg, #0d1424 0%, #0b0f1a 100%);
border: 1px solid #1e2a3e;
border-top: 3px solid #22d3ee;
border-radius: 12px;
padding: 24px 28px;
margin-bottom: 20px;
}
.hero h1 { font-size: 1.7rem; font-weight: 600; color: #e2e8f0; margin: 0 0 4px 0; }
.hero p { color: #64748b; font-size: 0.88rem; margin: 0; }
/* Source type tabs */
.source-tabs { display: flex; gap: 8px; margin-bottom: 16px; }
.source-tab {
flex: 1; padding: 10px; text-align: center;
background: #0d1424; border: 1px solid #1e2a3e;
border-radius: 8px; font-size: 0.82rem; color: #64748b; cursor: pointer;
}
.source-tab.active { border-color: #22d3ee; color: #22d3ee; background: rgba(34,211,238,0.07); }
/* Indexed source cards */
.source-card {
background: #0d1424; border: 1px solid #1e2a3e;
border-radius: 8px; padding: 10px 14px; margin: 6px 0;
display: flex; align-items: center; justify-content: space-between;
}
.source-name { font-size: 0.82rem; color: #e2e8f0; font-weight: 500; white-space: nowrap; overflow: hidden; text-overflow: ellipsis; max-width: 160px; }
.source-meta { font-family: 'IBM Plex Mono', monospace; font-size: 0.68rem; color: #475569; }
.source-type-badge {
font-size: 0.68rem; padding: 2px 8px; border-radius: 20px;
font-family: 'IBM Plex Mono', monospace; white-space: nowrap;
}
.badge-pdf { background: rgba(99,102,241,0.12); color: #a5b4fc; border: 1px solid rgba(99,102,241,0.25); }
.badge-url { background: rgba(34,197,94,0.1); color: #4ade80; border: 1px solid rgba(34,197,94,0.25); }
.badge-yt { background: rgba(239,68,68,0.1); color: #f87171; border: 1px solid rgba(239,68,68,0.25); }
/* Chat messages */
.chat-user {
display: flex; justify-content: flex-end; margin: 10px 0;
}
.chat-user-bubble {
background: rgba(34,211,238,0.1); border: 1px solid rgba(34,211,238,0.2);
border-radius: 16px 16px 4px 16px;
padding: 12px 18px; max-width: 70%;
color: #e2e8f0; font-size: 0.92rem; line-height: 1.6;
}
.chat-assistant {
display: flex; justify-content: flex-start; margin: 10px 0; gap: 10px;
}
.chat-avatar {
width: 32px; height: 32px; border-radius: 50%;
background: linear-gradient(135deg, #22d3ee, #6366f1);
display: flex; align-items: center; justify-content: center;
font-size: 0.9rem; flex-shrink: 0; margin-top: 2px;
}
.chat-assistant-bubble {
background: #0d1424; border: 1px solid #1e2a3e;
border-radius: 4px 16px 16px 16px;
padding: 14px 18px; max-width: 75%;
color: #e2e8f0; font-size: 0.92rem; line-height: 1.7;
}
.chat-sources {
margin-top: 10px; padding-top: 10px;
border-top: 1px solid #1e2a3e;
}
.chat-source-chip {
display: inline-block; font-size: 0.72rem;
font-family: 'IBM Plex Mono', monospace;
background: #0b0f1a; border: 1px solid #1e2a3e;
border-radius: 20px; padding: 2px 10px; margin: 3px 3px 0 0;
color: #475569;
}
/* Chunk expander styling */
.chunk-card {
background: #0b0f1a; border: 1px solid #1e2a3e;
border-radius: 8px; padding: 12px 16px; margin: 6px 0;
}
.chunk-header { display: flex; justify-content: space-between; align-items: center; margin-bottom: 8px; }
.chunk-src { font-size: 0.75rem; font-weight: 600; color: #22d3ee; text-transform: uppercase; letter-spacing: 0.04em; }
.chunk-score { font-family: 'IBM Plex Mono', monospace; font-size: 0.72rem; color: #475569; }
.chunk-text { font-size: 0.84rem; color: #94a3b8; line-height: 1.6; }
.stat-row { display: flex; gap: 8px; margin: 12px 0; }
.stat-box { flex: 1; background: #0d1424; border: 1px solid #1e2a3e; border-radius: 8px; padding: 10px; text-align: center; }
.stat-val { font-size: 1.2rem; font-weight: 600; color: #22d3ee; }
.stat-lbl { font-size: 0.68rem; color: #475569; margin-top: 2px; }
.section-label {
font-size: 0.68rem; text-transform: uppercase; letter-spacing: 0.1em;
color: #374151; font-weight: 600; margin: 16px 0 8px 0;
}
section[data-testid="stSidebar"] { background-color: #080c14; border-right: 1px solid #131c2e; }
.empty-chat {
text-align: center; padding: 48px 24px;
color: #374151; border: 2px dashed #1e2a3e; border-radius: 12px;
}
</style>
"""
st.markdown(_APP_CSS, unsafe_allow_html=True)
# ─── Session State ───
# Seed each session key once; keys that already exist keep their current
# value across reruns.
defaults = {
    "indexed_sources": {},      # name -> {type, chunks, meta}
    "chroma_collection": None,
    "chroma_client": None,
    "total_chunks": 0,
    "chat_history": [],         # [{role, content, sources}]
}
for state_key in defaults:
    if state_key in st.session_state:
        continue
    st.session_state[state_key] = defaults[state_key]
# ─── Helpers ──────────────────────────────────────────────────────────────────
@st.cache_resource
def load_embed_model():
    """Return the 'all-MiniLM-L6-v2' sentence-embedding model.

    The script body runs again on every widget interaction, so the loader is
    wrapped in ``st.cache_resource`` to build the model once and reuse it
    instead of re-instantiating it on each rerun.
    """
    return SentenceTransformer('all-MiniLM-L6-v2')
def get_or_create_collection():
    """Return the Chroma collection kept in session state.

    On first use (no client stored yet) a ``chromadb.Client()`` is created and
    the cosine-distance collection "rag_store" is fetched or created; both are
    stored in ``st.session_state`` for later calls.

    NOTE(review): the collection name is fixed, so whether separate sessions
    end up sharing data depends on how ``chromadb.Client()`` isolates
    in-memory instances — confirm against the installed chromadb version.
    """
    state = st.session_state
    if state.chroma_client is not None:
        return state.chroma_collection

    client = chromadb.Client()
    state.chroma_client = client
    state.chroma_collection = client.get_or_create_collection(
        name="rag_store", metadata={"hnsw:space": "cosine"}
    )
    return state.chroma_collection
def chunk_text(text: str, source_name: str, source_type: str, meta: dict,
               chunk_size: int = 400, overlap: int = 60) -> list[dict]:
    """Split *text* into overlapping word windows.

    Args:
        text: Raw text; it is split on whitespace.
        source_name: Stored under the "source" key of every chunk.
        source_type: Stored under the "type" key of every chunk.
        meta: Extra key/value pairs copied into every chunk.
        chunk_size: Maximum number of words per chunk.
        overlap: Number of words shared by consecutive chunks.

    Returns:
        A list of dicts with "text", "source", "type" plus the *meta* entries.
        Windows whose joined text is 60 characters or shorter are dropped.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``overlap`` is not in
            ``[0, chunk_size)`` (the window would never advance).
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive number of words.")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be >= 0 and smaller than chunk_size.")

    words = text.split()
    total = len(words)
    step = chunk_size - overlap
    chunks = []
    for start in range(0, total, step):
        end = start + chunk_size
        chunk_str = " ".join(words[start:end])
        if len(chunk_str) > 60:
            chunks.append({"text": chunk_str, "source": source_name,
                           "type": source_type, **meta})
        # This window already reached the last word; another step would only
        # re-emit overlap words that the current chunk contains.
        if end >= total:
            break
    return chunks
def index_chunks(chunks: list[dict], source_name: str, source_type: str, embed_model):
    """Embed *chunks*, store them in the Chroma collection and record the source.

    Args:
        chunks: Dicts with at least "text", "source" and "type"; "page" and
            "timestamp" are optional and default to 1 and "".
        source_name: Key under which the source is listed in session state.
        source_type: Source kind recorded with the entry.
        embed_model: Object whose ``encode`` returns one vector per text.

    Raises:
        ValueError: If *chunks* is empty (nothing to embed or to describe the
            source with).
    """
    if not chunks:
        raise ValueError(f"No indexable text found in '{source_name}'.")

    collection = get_or_create_collection()
    texts = [c["text"] for c in chunks]
    embeddings = embed_model.encode(texts, batch_size=32, show_progress_bar=False).tolist()

    # Ids are derived from the source name, so indexing the same source twice
    # would reuse ids. Drop the earlier entries first and remember their count
    # so the running total is not inflated.
    previous = st.session_state.indexed_sources.get(source_name)
    previous_count = 0
    if previous is not None:
        collection.delete(where={"source": source_name})
        previous_count = previous["chunks"]

    prefix = hashlib.md5(source_name.encode()).hexdigest()[:8]
    ids = [f"{prefix}_chunk_{i}" for i in range(len(chunks))]
    metas = [
        {"source": c["source"], "type": c["type"],
         "page": c.get("page", 1), "timestamp": c.get("timestamp", "")}
        for c in chunks
    ]
    collection.add(ids=ids, embeddings=embeddings, documents=texts, metadatas=metas)

    st.session_state.total_chunks += len(chunks) - previous_count
    st.session_state.indexed_sources[source_name] = {
        "type": source_type, "chunks": len(chunks),
        "meta": {k: v for k, v in chunks[0].items() if k not in ["text", "source", "type"]}
    }
# ─── Source-specific extractors ───────────────────────────────────────────────
def process_pdf(filename: str, pdf_bytes: bytes, embed_model):
    """Extract text page by page from a PDF, chunk it and index the chunks.

    Args:
        filename: Used as the source name of every chunk.
        pdf_bytes: Raw PDF content.
        embed_model: Embedding model passed through to ``index_chunks``.

    Returns:
        The number of chunks indexed.
    """
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    chunks = []
    try:
        for page_num, page in enumerate(doc, start=1):
            text = page.get_text("text").strip()
            if text:
                chunks.extend(chunk_text(text, filename, "pdf", {"page": page_num}))
    finally:
        # Close the document even if text extraction fails part-way through.
        doc.close()
    index_chunks(chunks, filename, "pdf", embed_model)
    return len(chunks)
| ## Web URL | |
def process_url(url: str, embed_model):
    """Download a web page, reduce it to plain text, chunk it and index it.

    Returns:
        ``(chunk_count, source_name)`` where the source name is the URL's host
        followed by the first 40 characters of its path.

    Raises:
        ValueError: If fewer than 100 characters of text remain after cleanup.
        requests.HTTPError: Propagated from ``raise_for_status``.
    """
    response = requests.get(
        url,
        headers={"User-Agent": "Mozilla/5.0 (compatible; RAGBot/1.0)"},
        timeout=15,
    )
    response.raise_for_status()

    soup = BeautifulSoup(response.text, "html.parser")
    # Drop page chrome and non-content tags before extracting text.
    for element in soup(["script", "style", "nav", "footer", "header", "aside"]):
        element.decompose()

    raw_text = soup.get_text(separator=" ", strip=True)
    text = re.sub(r'\s+', ' ', raw_text).strip()
    if len(text) < 100:
        raise ValueError("Could not extract meaningful text from this URL.")

    location = urlparse(url)
    source_name = location.netloc + location.path[:40]
    chunks = chunk_text(text, source_name, "url", {"page": 1})
    index_chunks(chunks, source_name, "url", embed_model)
    return len(chunks), source_name
| ## YouTube | |
def get_youtube_id(url: str) -> str:
    """Extract the 11-character video id from a YouTube URL.

    Recognised forms: ``...v=<id>``, ``youtu.be/<id>``, ``embed/<id>``,
    ``/shorts/<id>`` and ``/live/<id>``. Patterns are tried in that order and
    the first one that matches anywhere in *url* wins.

    Raises:
        ValueError: If none of the patterns match.
    """
    patterns = [
        r'(?:v=|youtu\.be/)([a-zA-Z0-9_-]{11})',
        r'(?:embed/)([a-zA-Z0-9_-]{11})',
        r'(?:/shorts/|/live/)([a-zA-Z0-9_-]{11})',
    ]
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    raise ValueError("Could not extract YouTube video ID from URL.")
def _youtube_chunk(video_id: str, text: str, start_seconds: int) -> dict:
    """Build one transcript chunk labelled with an M:SS start timestamp."""
    return {
        "text": text,
        "source": f"youtube:{video_id}",
        "type": "youtube",
        "page": 1,
        "timestamp": f"{start_seconds // 60}:{start_seconds % 60:02d}",
    }


def process_youtube(url: str, embed_model):
    """Fetch a video's transcript, group it into ~350-word chunks and index them.

    Returns:
        ``(chunk_count, video_id)``.

    Raises:
        ValueError: If the URL has no recognisable video id or the transcript
            yields no text. Errors raised by the transcript library propagate
            unchanged.
    """
    video_id = get_youtube_id(url)
    languages = ['en', 'en-US', 'en-GB']

    # Pick the call style by what the installed library exposes instead of
    # catching every exception: a real failure (for example captions being
    # unavailable) is then reported as-is rather than being replaced by an
    # error from a fallback call.
    if hasattr(YouTubeTranscriptApi, "fetch"):
        fetched = YouTubeTranscriptApi().fetch(video_id, languages=languages)
        transcript_list = [{"start": s.start, "text": s.text} for s in fetched]
    else:
        transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=languages)

    chunks = []
    parts = []
    buffer_start = None
    word_count = 0
    for entry in transcript_list:
        if buffer_start is None:
            # The chunk's timestamp is the start of its first caption entry.
            buffer_start = int(entry["start"])
        parts.append(entry["text"])
        word_count += len(entry["text"].split())
        if word_count >= 350:
            chunks.append(_youtube_chunk(video_id, " ".join(parts).strip(), buffer_start))
            parts = []
            buffer_start = None
            word_count = 0

    # Flush whatever is left after the last full chunk.
    remainder = " ".join(parts).strip()
    if remainder:
        chunks.append(_youtube_chunk(video_id, remainder, buffer_start or 0))

    if not chunks:
        raise ValueError("No transcript content found. The video may not have captions enabled.")
    index_chunks(chunks, f"youtube:{video_id}", "youtube", embed_model)
    return len(chunks), video_id
# ─── RAG Query with Chat Memory ───────────────────────────────────────────────
def _chunk_locator(chunk: dict):
    """Return the timestamp for YouTube chunks that have one, else the page."""
    if chunk["type"] == "youtube" and chunk["timestamp"]:
        return chunk["timestamp"]
    return chunk["page"]


def rag_query(question: str, embed_model, top_k: int, api_key: str) -> tuple[str, list]:
    """Retrieve the closest chunks for *question* and ask the Groq chat API.

    Args:
        question: The user's current question.
        embed_model: Embedding model used to encode the question.
        top_k: Maximum number of chunks to retrieve.
        api_key: Bearer token for the Groq endpoint.

    Returns:
        ``(answer, chunks)``; each chunk dict carries "text", "source", "type",
        "page", "timestamp" and "relevance" (``(1 - distance) * 100``, rounded).

    Raises:
        requests.HTTPError: If the chat-completions request returns an error
            status.
    """
    collection = get_or_create_collection()
    available = collection.count()
    if available == 0:
        return "I couldn't find that in the indexed sources.", []

    q_emb = embed_model.encode(question).tolist()
    # Never request more neighbours than the collection holds.
    n_results = max(1, min(top_k, available))
    results = collection.query(query_embeddings=[q_emb], n_results=n_results)

    chunks = [
        {
            "text": doc,
            "source": meta["source"],
            "type": meta["type"],
            "page": meta.get("page", 1),
            "timestamp": meta.get("timestamp", ""),
            "relevance": round((1 - dist) * 100, 1),
        }
        for doc, meta, dist in zip(
            results["documents"][0], results["metadatas"][0], results["distances"][0]
        )
    ]

    # YouTube chunks are stored with page=1, so "page or timestamp" would
    # always show the page; pick the locator by source type instead so the
    # model can cite timestamps.
    context = "\n\n".join(
        f"[Source: {c['source']} | Type: {c['type']} | Page/Time: {_chunk_locator(c)}]\n{c['text']}"
        for c in chunks
    )

    # Build conversation history for multi-turn memory. The caller may already
    # have stored the current question; leave it out here because it is added
    # once at the end of the prompt.
    history = list(st.session_state.chat_history)
    if history and history[-1]["role"] == "user" and history[-1]["content"] == question:
        history = history[:-1]
    history_text = "".join(
        f"{'User' if msg['role'] == 'user' else 'Assistant'}: {msg['content']}\n"
        for msg in history[-6:]  # last 3 turns
    )
    history_block = history_text if history_text else "(This is the start of the conversation)"

    prompt = f"""You are a helpful assistant that answers questions based on indexed documents. Use ONLY the context below to answer. Be concise and conversational. Always cite your source (filename, URL, or YouTube timestamp) inline. If the answer isn't in the context, say "I couldn't find that in the indexed sources."
Conversation so far:
{history_block}
Relevant context from documents:
{context}
User: {question}
Assistant:"""

    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    payload = {
        "model": "llama-3.3-70b-versatile",
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 700,
        "temperature": 0.3,
    }
    r = requests.post("https://api.groq.com/openai/v1/chat/completions",
                      headers=headers, json=payload, timeout=30)
    r.raise_for_status()
    answer = r.json()["choices"][0]["message"]["content"]
    return answer, chunks
# ─── Sidebar ───
# API key entry, the list of indexed sources with clear buttons, and a static
# description of the stack.

# CSS class per source type; the stylesheet names the YouTube badge "badge-yt".
_BADGE_CLASSES = {"pdf": "badge-pdf", "url": "badge-url", "youtube": "badge-yt"}

with st.sidebar:
    st.markdown("## π€ RAG Chat Assistant")
    st.markdown("<div style='color:#374151;font-size:0.78rem'>PDF Β· Web Β· YouTube β Chat</div>", unsafe_allow_html=True)
    st.markdown("---")

    # Prefer the key from the environment; otherwise ask for it.
    env_key = os.environ.get("GROQ_API_KEY", "")
    api_key = env_key if env_key else st.text_input(
        "π Groq API Key", type="password", placeholder="gsk_...",
        help="Free at console.groq.com"
    )
    if not env_key and not api_key:
        st.caption("Get free key β [console.groq.com](https://console.groq.com)")
    st.markdown("---")

    st.markdown("<div class='section-label'>Indexed Sources</div>", unsafe_allow_html=True)
    if st.session_state.indexed_sources:
        for name, info in st.session_state.indexed_sources.items():
            badge_class = _BADGE_CLASSES.get(info['type'], f"badge-{info['type']}")
            icon = "π" if info['type'] == 'pdf' else "π" if info['type'] == 'url' else "βΆοΈ"
            label = info['type'].upper()
            # Source names come from uploaded filenames and fetched URLs, so
            # escape them before placing them inside raw HTML.
            safe_name = html.escape(name)
            st.markdown(f"""
<div class='source-card'>
<div>
<div class='source-name'>{icon} {safe_name}</div>
<div class='source-meta'>{info['chunks']} chunks</div>
</div>
<div class='source-type-badge {badge_class}'>{label}</div>
</div>""", unsafe_allow_html=True)
        st.markdown("")
        col1, col2 = st.columns(2)
        if col1.button("ποΈ Clear index", use_container_width=True):
            # Drop the stored vectors explicitly: discarding the client object
            # alone may leave the collection alive if in-memory Chroma clients
            # share state within the process.
            client = st.session_state.chroma_client
            if client is not None:
                client.delete_collection("rag_store")
            for k in ["indexed_sources", "chroma_collection", "chroma_client", "total_chunks"]:
                del st.session_state[k]
            st.rerun()
        if col2.button("π¬ Clear chat", use_container_width=True):
            st.session_state.chat_history = []
            st.rerun()
    else:
        st.markdown("<div style='color:#374151;font-size:0.82rem'>Nothing indexed yet.</div>", unsafe_allow_html=True)

    st.markdown("---")
    st.markdown("""
<div style='font-size:0.75rem;color:#374151;line-height:2'>
<b style='color:#4b5563'>Stack</b><br>
π PDF: PyMuPDF<br>
π Web: BeautifulSoup4<br>
βΆοΈ YouTube: youtube-transcript-api<br>
π’ Embeddings: all-MiniLM-L6-v2<br>
ποΈ Vector DB: ChromaDB<br>
π§ LLM: Groq Β· Llama 3.3 70B
</div>""", unsafe_allow_html=True)
# ─── Main UI ───
# Page banner, then obtain the embedding model while a spinner is shown.
_HERO_HTML = """
<div class='hero'>
<h1>π€ RAG Chat Assistant</h1>
<p>Index PDFs Β· Web pages Β· YouTube videos β then have a multi-turn conversation across all of them</p>
</div>
"""
st.markdown(_HERO_HTML, unsafe_allow_html=True)

with st.spinner("βοΈ Loading embedding model..."):
    embed_model = load_embed_model()
# ────────────────────────────────────────────────────────
# INGEST PANEL
# ────────────────────────────────────────────────────────
# One tab per source type. Each tab reports failures inline and triggers a
# rerun only after a successful index, so error messages stay on screen.
_no_sources_yet = len(st.session_state.indexed_sources) == 0
with st.expander("β Add a new source (PDF / Web URL / YouTube)", expanded=_no_sources_yet):
    tab_pdf, tab_url, tab_yt = st.tabs(["π PDF Upload", "π Web URL", "βΆοΈ YouTube"])

    # ── PDF Tab ──
    with tab_pdf:
        uploaded = st.file_uploader("Upload PDF files", type=["pdf"], accept_multiple_files=True, label_visibility="collapsed")
        if uploaded:
            # Files whose name is already listed as a source are skipped.
            new = [f for f in uploaded if f.name not in st.session_state.indexed_sources]
            if not new:
                st.info("Already indexed.")
            elif st.button(f"β‘ Index {len(new)} PDF(s)", type="primary", key="idx_pdf"):
                any_failed = False
                for f in new:
                    with st.spinner(f"Indexing {f.name}..."):
                        try:
                            # getvalue() returns the whole upload regardless of
                            # the current read position, so a retry after a
                            # failure still sees the full file.
                            n = process_pdf(f.name, f.getvalue(), embed_model)
                        except Exception as e:
                            any_failed = True
                            st.error(f"β {f.name}: {str(e)}")
                        else:
                            st.success(f"β {f.name} β {n} chunks")
                if not any_failed:
                    st.rerun()

    # ── URL Tab ──
    with tab_url:
        url_input = st.text_input("Paste a public webpage URL", placeholder="https://en.wikipedia.org/wiki/...", label_visibility="collapsed")
        if st.button("β‘ Fetch & Index URL", type="primary", key="idx_url"):
            if not url_input:
                st.warning("Please enter a URL.")
            else:
                with st.spinner(f"Fetching and indexing {url_input}..."):
                    try:
                        n, source_name = process_url(url_input, embed_model)
                    except Exception as e:
                        st.error(f"β {str(e)}")
                    else:
                        # Kept outside the try so the rerun request is never
                        # handled as an indexing error.
                        st.success(f"β {source_name} β {n} chunks indexed")
                        st.rerun()

    # ── YouTube Tab ──
    with tab_yt:
        yt_input = st.text_input("Paste a YouTube video URL", placeholder="https://www.youtube.com/watch?v=...", label_visibility="collapsed")
        st.caption("Works with any video that has English captions/subtitles enabled.")
        if st.button("β‘ Fetch Transcript & Index", type="primary", key="idx_yt"):
            if not yt_input:
                st.warning("Please enter a YouTube URL.")
            else:
                with st.spinner("Fetching YouTube transcript..."):
                    try:
                        n, vid_id = process_youtube(yt_input, embed_model)
                    except Exception as e:
                        st.error(f"β {str(e)}")
                    else:
                        st.success(f"β youtube:{vid_id} β {n} chunks indexed")
                        st.rerun()
# ────────────────────────────────────────────────────────
# STATS
# ────────────────────────────────────────────────────────
# Per-type source counts plus chunk and message totals, shown only once
# something has been indexed.
if st.session_state.indexed_sources:
    _type_tally = {"pdf": 0, "url": 0, "youtube": 0}
    for _source_info in st.session_state.indexed_sources.values():
        if _source_info["type"] in _type_tally:
            _type_tally[_source_info["type"]] += 1
    pdf_count = _type_tally["pdf"]
    url_count = _type_tally["url"]
    yt_count = _type_tally["youtube"]
    st.markdown(f"""
<div class='stat-row'>
<div class='stat-box'><div class='stat-val'>{pdf_count}</div><div class='stat-lbl'>PDFs</div></div>
<div class='stat-box'><div class='stat-val'>{url_count}</div><div class='stat-lbl'>Web Pages</div></div>
<div class='stat-box'><div class='stat-val'>{yt_count}</div><div class='stat-lbl'>YouTube Videos</div></div>
<div class='stat-box'><div class='stat-val'>{st.session_state.total_chunks}</div><div class='stat-lbl'>Total Chunks</div></div>
<div class='stat-box'><div class='stat-val'>{len(st.session_state.chat_history)}</div><div class='stat-lbl'>Messages</div></div>
</div>
""", unsafe_allow_html=True)
# ────────────────────────────────────────────────────────
# CHAT UI
# ────────────────────────────────────────────────────────
# The chat needs at least one indexed source and an API key; when either is
# missing, show a hint and stop this script run.
_NO_SOURCES_HTML = """
<div class='empty-chat'>
<div style='font-size:2.5rem;margin-bottom:12px'>π</div>
<p style='color:#4b5563'>Add at least one source above to start chatting.<br>
Try a PDF, a Wikipedia URL, or a YouTube video.</p>
</div>"""
_NO_MESSAGES_HTML = """
<div class='empty-chat' style='padding:28px'>
<p style='color:#4b5563;margin:0'>Ask anything about your indexed sources below π</p>
</div>"""

if not st.session_state.indexed_sources:
    st.markdown(_NO_SOURCES_HTML, unsafe_allow_html=True)
    st.stop()

if not api_key:
    st.warning("π Add your Groq API key in the sidebar to start chatting.")
    st.stop()

st.markdown("---")
st.markdown("<div class='section-label'>Conversation</div>", unsafe_allow_html=True)

# Render chat history
if not st.session_state.chat_history:
    st.markdown(_NO_MESSAGES_HTML, unsafe_allow_html=True)
def _as_html(text) -> str:
    """Escape *text* for use inside the chat markup, keeping its line breaks."""
    return html.escape(str(text)).replace("\n", "<br>")


# Replay the stored conversation. Questions, model answers and retrieved chunk
# text all originate outside this program, so each is escaped before being
# placed into markup rendered with unsafe_allow_html.
for msg in st.session_state.chat_history:
    if msg["role"] == "user":
        st.markdown(f"""
<div class='chat-user'>
<div class='chat-user-bubble'>{_as_html(msg['content'])}</div>
</div>""", unsafe_allow_html=True)
        continue

    # Assistant message: answer bubble with up to four source chips.
    source_chips = ""
    if msg.get("sources"):
        for s in msg["sources"][:4]:
            label = f"{s['source']} Β· {s['relevance']}%"
            if s.get("timestamp"):
                label += f" @ {s['timestamp']}"
            source_chips += f"<span class='chat-source-chip'>{html.escape(label)}</span>"
    sources_html = f"<div class='chat-sources'>{source_chips}</div>" if source_chips else ""
    st.markdown(f"""
<div class='chat-assistant'>
<div class='chat-avatar'>π€</div>
<div class='chat-assistant-bubble'>
{_as_html(msg['content'])}
{sources_html}
</div>
</div>""", unsafe_allow_html=True)

    if msg.get("sources"):
        with st.expander("π View retrieved chunks", expanded=False):
            for chunk in msg["sources"]:
                icon = "π" if chunk["type"] == "pdf" else "π" if chunk["type"] == "url" else "βΆοΈ"
                detail = f"Page {chunk['page']}" if chunk["type"] != "youtube" else f"@ {chunk['timestamp']}"
                # Truncate first, then escape, so an entity is never cut in half.
                preview = html.escape(chunk['text'][:400]) + ('...' if len(chunk['text']) > 400 else '')
                st.markdown(f"""
<div class='chunk-card'>
<div class='chunk-header'>
<div class='chunk-src'>{icon} {html.escape(str(chunk['source']))}</div>
<div class='chunk-score'>{html.escape(detail)} Β· {chunk['relevance']}% match</div>
</div>
<div class='chunk-text'>{preview}</div>
</div>""", unsafe_allow_html=True)
# Chat input: question box, top-k selector and send button on one row.
st.markdown("")
col_input, col_k, col_btn = st.columns([6, 1, 1])
with col_input:
    user_input = st.text_input("Question", placeholder="Ask something about your indexed sources...", label_visibility="collapsed", key="chat_input")
with col_k:
    top_k = st.selectbox("K", [2, 3, 4, 5], index=1, label_visibility="collapsed")
with col_btn:
    send = st.button("Send β€", type="primary", use_container_width=True)

# Ignore clicks with an empty or whitespace-only question.
if send and user_input.strip():
    # Add user message
    st.session_state.chat_history.append({"role": "user", "content": user_input})
    with st.spinner("Thinking..."):
        try:
            answer, chunks = rag_query(user_input, embed_model, top_k, api_key)
        except (requests.RequestException, KeyError, IndexError, ValueError) as e:
            # Covers HTTP error statuses as well as timeouts, connection
            # failures and responses missing the expected fields, so the
            # stored question always gets a visible reply instead of the run
            # aborting.
            reply = {
                "role": "assistant",
                "content": f"β API error: {str(e)}",
                "sources": []
            }
        else:
            reply = {
                "role": "assistant",
                "content": answer,
                "sources": chunks
            }
    st.session_state.chat_history.append(reply)
    st.rerun()