"""AdaptiveRAG β€” under-the-hood pipeline visualizer. Run: streamlit run app.py """ from __future__ import annotations import json import logging import os import tempfile import time from pathlib import Path os.environ.setdefault("ANONYMIZED_TELEMETRY", "False") # suppress harmless noise from Streamlit's torch inspector + ChromaDB posthog client logging.getLogger("streamlit.watcher.local_sources_watcher").setLevel(logging.ERROR) logging.getLogger("chromadb.telemetry.product.posthog").setLevel(logging.ERROR) import numpy as np import pandas as pd import streamlit as st from agent.critic import critique, refine_query from agent.planner import plan from agent.router import route from agent.tools import image_retrieve_and_reason from config import AGENT_CONFIG, EMBEDDING_CONFIG, HOSTED, LLM_CONFIG, PATHS, RETRIEVAL_CONFIG from ingestion.embedder import embed_query from ingestion.indexer import fetch_embeddings from llm.client_factory import get_llm from retrieval.dense import Hit, dense_search from retrieval.hybrid import reciprocal_rank_fusion from retrieval.reranker import rerank from retrieval.sparse import sparse_search st.set_page_config(page_title="AdaptiveRAG β€” Underhood", page_icon="πŸ”¬", layout="wide") # ── LLM backend check β€” shown before anything else ────────────────── if not os.environ.get("GROQ_API_KEY"): # Running without Groq β€” check if Ollama is reachable locally try: import requests as _req _req.get("http://localhost:11434/api/tags", timeout=2).raise_for_status() _ollama_ok = True except Exception: _ollama_ok = False if not _ollama_ok: st.error( "**No LLM backend found.**\n\n" "- **Running on Hugging Face?** Add your `GROQ_API_KEY` secret in " "Space Settings β†’ Variables and secrets. Get a free key at " "[console.groq.com](https://console.groq.com).\n" "- **Running locally?** Start Ollama: `ollama serve`" ) st.stop() # ───────────────────────────── styling ────────────────────────────── st.markdown( """ """, unsafe_allow_html=True, ) # ───────────────────────────── helpers ────────────────────────────── @st.cache_resource def _llm(): return get_llm() def _load_manifest() -> dict: p = PATHS["manifest_path"] return json.loads(p.read_text()) if p.exists() else {} def phase_header(num: int, title: str, subtitle: str = "") -> None: st.markdown( f"
STEP {num}" f"{title}
{subtitle}" f"
", unsafe_allow_html=True, ) def hits_to_df(hits: list[Hit], score_label: str = "score") -> pd.DataFrame: rows = [] for h in hits: title = h.metadata.get("title") or h.metadata.get("source_path", "?") short = title.split(" (")[0] if len(short) > 38: short = short[:35] + "…" label = f"{short} Β· p{h.metadata.get('page_start')} Β· {h.chunk_id.split('::')[-1]}" rows.append({"chunk": label, score_label: float(h.score), "chunk_id": h.chunk_id}) return pd.DataFrame(rows) def render_hits(hits: list[Hit], badge_class: str, label: str, max_chars: int = 220) -> None: if not hits: st.caption(f"_(no {label.lower()} hits)_") return for i, h in enumerate(hits, start=1): meta = h.metadata snippet = h.text[:max_chars].replace("\n", " ") if len(h.text) > max_chars: snippet += "…" st.markdown( f"
" f"
" f"{label} #{i}" f"score {h.score:.3f} Β· " f"{meta.get('title','?')} Β· p.{meta.get('page_start')}–{meta.get('page_end')} Β· " f"{h.chunk_id}" f"
{snippet}
", unsafe_allow_html=True, ) def pca_2d(matrix: np.ndarray) -> np.ndarray: centered = matrix - matrix.mean(axis=0, keepdims=True) _, _, vt = np.linalg.svd(centered, full_matrices=False) return centered @ vt[:2].T def vector_space_plot(query_vec: list[float], fused_hits: list[Hit], dense_ids: set[str], sparse_ids: set[str], kept_ids: set[str]) -> None: if not fused_hits: st.caption("_(nothing to plot)_") return embs = fetch_embeddings([h.chunk_id for h in fused_hits]) rows = [] vecs = [np.array(query_vec, dtype=np.float32)] for h in fused_hits: v = embs.get(h.chunk_id) if v is None: continue vecs.append(np.array(v, dtype=np.float32)) in_d, in_s = h.chunk_id in dense_ids, h.chunk_id in sparse_ids in_keep = h.chunk_id in kept_ids if in_d and in_s: color = "fused (both)" elif in_d: color = "dense only" elif in_s: color = "sparse only" else: color = "other" title = (h.metadata.get("title") or "?").split(" (")[0][:40] label = f"{title} Β· p{h.metadata.get('page_start')}" rows.append({"label": label, "color": color, "size": 90 if in_keep else 50}) if len(vecs) < 3: st.caption("_(need at least 2 hits for a 2D projection)_") return proj = pca_2d(np.vstack(vecs)) df = pd.DataFrame( [{"x": proj[0, 0], "y": proj[0, 1], "label": "πŸ”Ž your question", "color": "QUERY", "size": 220}] + [{"x": proj[i + 1, 0], "y": proj[i + 1, 1], **rows[i]} for i in range(len(rows))] ) st.scatter_chart( df, x="x", y="y", color="color", size="size", height=380, use_container_width=True, ) st.caption( "PCA projection of the query embedding + fused hit embeddings. " "Larger points survived cross-encoder reranking." ) def render_embedding_card(query: str, qv: list[float], dt: float) -> None: arr = np.array(qv, dtype=np.float32) cols = st.columns([1, 1, 1, 3]) cols[0].metric("Model", EMBEDDING_CONFIG["model"].split("/")[-1]) cols[1].metric("Dimensions", len(qv)) cols[2].metric("L2 norm", f"{float(np.linalg.norm(arr)):.3f}") cols[3].metric("Embed time", f"{dt*1000:.0f} ms") st.caption(f"Question ({len(query)} chars, ~{len(query.split())} words):") st.code(query, language="text") st.caption("First 32 dimensions of the embedding vector:") st.bar_chart(pd.DataFrame({"value": arr[:32]}), height=140, use_container_width=True) preview = ", ".join(f"{x:+.3f}" for x in arr[:8]) + ", …" st.markdown(f"vector[0:8] = [{preview}]", unsafe_allow_html=True) # ───────────────────────────── pipeline view ────────────────────────────── def _render_healing_trace(healing_trace: list[dict], health_score: float) -> None: """Render the healing trace panel below the answer.""" # Health score metric with colour if health_score >= 80: color, label = "#2ecc71", "Healthy" elif health_score >= 60: color, label = "#f39c12", "Fair" else: color, label = "#e74c3c", "Needs healing" st.markdown( f"
" f"" f"βš•οΈ Health score: {health_score:.0f} / 100" f"" f"{label}
", unsafe_allow_html=True, ) if not healing_trace: st.success("βœ… Answer passed all checks on first attempt β€” no healing needed.") return for attempt in healing_trace: num = attempt.get("attempt", "?") healthy = attempt.get("healthy", False) icon = "βœ…" if healthy else "πŸ”§" issues = attempt.get("issues", []) label_str = "Healthy" if healthy else f"Issues: {', '.join(issues)}" with st.expander(f"{icon} Attempt {num} β€” {label_str}", expanded=not healthy): if healthy: st.success("All checks passed β€” answer accepted.") continue c1, c2, c3 = st.columns(3) c1.metric("Hallucination", "⚠️ yes" if "hallucination" in issues else "βœ… no") c2.metric("Low chunk quality", "⚠️ yes" if "low_chunk_quality" in issues else "βœ… no") c3.metric("Knowledge gap", "⚠️ yes" if "knowledge_gap" in issues else "βœ… no") for action in attempt.get("actions", []): atype = action.get("type", "") detail = action.get("detail", "") icons = { "hallucination_fix": "πŸ”", "chunk_expansion": "πŸ“Ž", "web_search": "🌐", "query_rewrite": "✏️", "regenerate": "πŸ”„", } prefix = icons.get(atype, "β€’") st.markdown( f"
{atype}" f"{prefix} {detail}
", unsafe_allow_html=True, ) if atype == "hallucination_fix": for s in action.get("flagged_sentences", []): st.caption(f" ↳ Flagged: \"{s[:120]}…\"") def visual_pipeline(query: str, enable_healing: bool = True) -> None: llm = _llm() # ── Step 1: embed the question ──────────────────────────────── phase_header(1, "Question encoding", "Convert text β†’ 384-dim dense vector via sentence-transformers (MiniLM-L6).") t0 = time.time() qv = embed_query(query) render_embedding_card(query, qv, time.time() - t0) # ── Step 2: Self-RAG router ──────────────────────────────── phase_header(2, "Self-RAG router", "Decide whether to RETRIEVE, ANSWER_DIRECTLY, or CLARIFY before touching the index.") t0 = time.time() decision = route(query, llm=llm) dt = time.time() - t0 pill_map = {"RETRIEVE": "pill-blue", "ANSWER_DIRECTLY": "pill-green", "CLARIFY": "pill-amber"} pill = pill_map.get(decision["action"], "pill-grey") st.markdown( f"{decision['action']}" f"{decision.get('reason','')}" f"" f"router latency: {dt*1000:.0f} ms", unsafe_allow_html=True, ) if decision["action"] == "ANSWER_DIRECTLY": st.markdown("### Direct answer (no retrieval)") ans = llm.generate(prompt=query, system="You are a helpful research assistant. Be concise.", temperature=0.2) st.markdown(ans) return if decision["action"] == "CLARIFY": st.markdown("### Clarifying question") ans = llm.generate( prompt=("The user asked: " + query + "\n\nIt is too ambiguous to answer well. Ask one short clarifying question."), system="You are a helpful research assistant.", temperature=0.2, ) st.markdown(ans) return # ── Iterations of plan β†’ retrieve β†’ answer β†’ critique ──────── accumulated: list[Hit] = [] current_query = query for it in range(AGENT_CONFIG["max_iterations"]): st.markdown(f"---\n## πŸ” Iteration {it + 1}") if current_query != query: st.info(f"Refined query β†’ **{current_query}**") # ── Step 3: plan ───────────────────────────────────── phase_header(3, "Planner", "LLM decomposes the question into focused sub-queries.") prior = "" if accumulated: titles = sorted({h.metadata.get("title", "?") for h in accumulated}) prior = "Already gathered passages from: " + ", ".join(titles) t0 = time.time() steps = plan(current_query, prior_summary=prior, llm=llm) dt = time.time() - t0 st.caption(f"Generated {len(steps)} sub-quer{'y' if len(steps)==1 else 'ies'} in {dt*1000:.0f} ms") for i, s in enumerate(steps, start=1): st.markdown( f"
" f"sub-query {i}" f"{s['query']}" f"
" f"rationale: {s.get('rationale','β€”')}
", unsafe_allow_html=True, ) # ── Step 4: retrieval per sub-query ────────────────── phase_header( 4, "Hybrid retrieval per sub-query", f"Dense (Chroma cosine, k={RETRIEVAL_CONFIG['dense_k']}) βˆ₯ " f"Sparse (BM25, k={RETRIEVAL_CONFIG['sparse_k']}) β†’ " f"Reciprocal Rank Fusion β†’ Cross-encoder rerank " f"(BGE, top {RETRIEVAL_CONFIG['rerank_top_n']}).", ) for si, step in enumerate(steps, start=1): with st.expander(f"Sub-query {si}: {step['query']}", expanded=(si == 1)): t0 = time.time() dense_hits = dense_search(step["query"]) t_dense = time.time() - t0 t0 = time.time() sparse_hits = sparse_search(step["query"]) t_sparse = time.time() - t0 t0 = time.time() fused = reciprocal_rank_fusion([dense_hits, sparse_hits], top_k=max(RETRIEVAL_CONFIG["dense_k"], RETRIEVAL_CONFIG["sparse_k"])) t_fuse = time.time() - t0 t0 = time.time() reranked = rerank(step["query"], fused) t_rerank = time.time() - t0 m1, m2, m3, m4 = st.columns(4) m1.metric("Dense hits", len(dense_hits), f"{t_dense*1000:.0f} ms") m2.metric("Sparse hits", len(sparse_hits), f"{t_sparse*1000:.0f} ms") m3.metric("After RRF", len(fused), f"{t_fuse*1000:.0f} ms") m4.metric("After rerank", len(reranked), f"{t_rerank*1000:.0f} ms") tabs = st.tabs([ "πŸ”΅ Dense (vectors)", "🟒 Sparse (BM25)", "🟣 RRF fusion", "🟑 Cross-encoder rerank", "πŸ—ΊοΈ Vector space", ]) with tabs[0]: st.caption("Top-K nearest neighbors by cosine similarity.") if dense_hits: st.bar_chart(hits_to_df(dense_hits, "cosine_sim"), x="chunk", y="cosine_sim", height=260, use_container_width=True) render_hits(dense_hits[:5], "pill-blue", "DENSE") with tabs[1]: st.caption("Top-K BM25 keyword matches (normalized).") if sparse_hits: st.bar_chart(hits_to_df(sparse_hits, "bm25_norm"), x="chunk", y="bm25_norm", height=260, use_container_width=True) render_hits(sparse_hits[:5], "pill-green", "BM25") with tabs[2]: st.caption( "Reciprocal Rank Fusion: score(d) = Ξ£ 1/(k + rank). " "Combines dense + sparse rankings into one merged list." ) if fused: st.bar_chart(hits_to_df(fused[:12], "rrf_score"), x="chunk", y="rrf_score", height=280, use_container_width=True) render_hits(fused[:5], "pill-purple", "FUSED") with tabs[3]: st.caption( "Cross-encoder scores (query, chunk) jointly β€” much more " "accurate than bi-encoder cosine, but slower β†’ only run on " "the fused candidate set." ) if reranked: st.bar_chart(hits_to_df(reranked, "ce_score"), x="chunk", y="ce_score", height=240, use_container_width=True) render_hits(reranked, "pill-amber", "RERANKED") with tabs[4]: dense_ids = {h.chunk_id for h in dense_hits} sparse_ids = {h.chunk_id for h in sparse_hits} kept_ids = {h.chunk_id for h in reranked} vector_space_plot(qv, fused[:20], dense_ids, sparse_ids, kept_ids) accumulated.extend(reranked) # ── Step 5: answer ───────────────────────────────────── # Dedupe + cap to 8 passages for the final prompt seen: set[str] = set() unique: list[Hit] = [] for h in accumulated: if h.chunk_id in seen: continue seen.add(h.chunk_id) unique.append(h) if len(unique) >= 8: break context_lines, citations = [], [] for i, h in enumerate(unique, start=1): meta = h.metadata head = (f"[{i}] {meta.get('title','?')} " f"(p.{meta.get('page_start')}-{meta.get('page_end')})") context_lines.append(f"{head}\n{h.text}") citations.append({ "n": i, "chunk_id": h.chunk_id, "title": meta.get("title"), "source_path": meta.get("source_path"), "page_start": meta.get("page_start"), "page_end": meta.get("page_end"), "score": float(h.score), }) context_block = "\n\n".join(context_lines) phase_header(5, "Context assembly + answer generation", f"Top {len(unique)} unique passages β†’ {LLM_CONFIG['model']} via {LLM_CONFIG['provider']}.") with st.expander("πŸ“¦ Context handed to the LLM", expanded=False): for c in citations: st.markdown( f"**[{c['n']}]** {c['title']} Β· pages {c['page_start']}–{c['page_end']} Β· " f"score `{c['score']:.3f}`" ) st.code(context_block[:3000] + ("…" if len(context_block) > 3000 else ""), language="text") t0 = time.time() ANSWER_SYSTEM = ( "You are a careful research assistant. Use ONLY the provided passages to " "answer the question. Cite sources inline with [N] where N is the passage " "number. If the passages are insufficient, say so explicitly." ) ANSWER_PROMPT = ( f"Question: {query}\n\nPassages:\n{context_block}\n\n" "Write a concise, well-grounded answer. Use inline citations like [1], [2] " "that match the passage numbers above." ) answer = llm.generate(prompt=ANSWER_PROMPT, system=ANSWER_SYSTEM, temperature=0.1) st.caption(f"LLM generation: {time.time()-t0:.1f} s") st.markdown("### Answer") st.markdown(answer) st.markdown("### Citations") for c in citations: st.markdown( f"**[{c['n']}]** {c['title']} β€” pages {c['page_start']}–{c['page_end']} " f"Β· score `{c['score']:.3f}` Β· `{Path(c['source_path']).name}`" ) # ── Step 6: critic ───────────────────────────────────── phase_header(6, "Self-critique", "LLM scores its own answer for grounding + completeness.") t0 = time.time() crit = critique(query, answer, context_block, llm=llm) c1, c2, c3 = st.columns(3) c1.metric("Grounded", "βœ… yes" if crit["grounded"] else "⚠️ no") c2.metric("Complete", "βœ… yes" if crit["complete"] else "⚠️ no") c3.metric("Confidence", f"{crit['confidence']:.2f}", delta=f"threshold {AGENT_CONFIG['confidence_threshold']:.2f}") if crit.get("missing"): st.warning(f"Missing: {crit['missing']}") st.caption(f"Critique latency: {time.time()-t0:.1f} s") if crit["confidence"] >= AGENT_CONFIG["confidence_threshold"] and crit["grounded"]: st.success(f"βœ“ Confidence {crit['confidence']:.2f} β‰₯ threshold β€” answer accepted.") if enable_healing: phase_header( 7, "Self-Healing layer", "Hallucination detection β†’ chunk quality scoring β†’ knowledge gap β†’ regenerate if needed.", ) with st.spinner("Running self-healing checks…"): from healing.healing_loop import self_heal healed = self_heal(query, answer, unique, citations, llm=llm) if healed.attempts_used > 0: st.markdown("### Healed Answer") st.markdown(healed.answer) st.markdown("### Updated Citations") for c in healed.citations: st.markdown( f"**[{c['n']}]** {c['title']} β€” " f"pages {c.get('page_start')}–{c.get('page_end')} " f"Β· score `{c['score']:.3f}`" ) _render_healing_trace(healed.healing_trace, healed.health_score) return if it < AGENT_CONFIG["max_iterations"] - 1: st.warning("Confidence below threshold β€” refining query and retrying.") current_query = refine_query(query, crit.get("missing", ""), llm=llm) else: st.error("Max iterations reached. Returning best-effort answer.") if enable_healing: phase_header( 7, "Self-Healing layer", "Hallucination detection β†’ chunk quality scoring β†’ knowledge gap β†’ regenerate if needed.", ) with st.spinner("Running self-healing checks…"): from healing.healing_loop import self_heal healed = self_heal(query, answer, unique, citations, llm=llm) if healed.attempts_used > 0: st.markdown("### Healed Answer") st.markdown(healed.answer) _render_healing_trace(healed.healing_trace, healed.health_score) # ───────────────────────────── sidebar + tabs ────────────────────────────── def _sidebar() -> None: st.sidebar.title("AdaptiveRAG") st.sidebar.caption("Agentic + Self-RAG + Modular RAG") llm = _llm() ok = llm.health() backend = "Groq API" if HOSTED else "Ollama (local)" st.sidebar.markdown(f"**LLM backend**: {'🟒' if ok else 'πŸ”΄'} {backend}") st.sidebar.markdown(f"**Model**: `{LLM_CONFIG['model']}`") st.sidebar.markdown(f"**Embedder**: `{EMBEDDING_CONFIG['model'].split('/')[-1]}`") st.sidebar.markdown(f"**Reranker**: `bge-reranker-base`") manifest = _load_manifest() if manifest: st.sidebar.markdown(f"**Index**: {manifest.get('n_chunks','?')} chunks across " f"{len(manifest.get('chunks_per_doc',{}))} docs") with st.sidebar.expander("Documents"): for doc, n in sorted(manifest.get("chunks_per_doc", {}).items()): st.markdown(f"- `{doc}` β€” {n}") else: st.sidebar.warning("No index found. Run `python ingest.py --reset`.") st.sidebar.divider() st.sidebar.markdown("### Pipeline") st.sidebar.code( "question\n ↓ embed (MiniLM)\n ↓ Self-RAG router\n ↓ planner β†’ sub-queries\n" " ↓ dense βˆ₯ sparse\n ↓ RRF fusion\n ↓ cross-encoder rerank\n ↓ LLM answer\n" " ↓ self-critique β†’ retry?\n ↓ self-healing βš•οΈ\n β†’ answer + citations", language="text", ) def pipeline_tab() -> None: st.subheader("πŸ”¬ Underhood: watch every stage of the agentic RAG pipeline") st.caption( "Each step renders its inputs and outputs as it runs β€” embedding vector, " "router decision, planner sub-queries, dense vs sparse hits side-by-side, " "RRF fusion, cross-encoder rerank, vector-space projection, answer, self-critique." ) samples = [ "How does Self-RAG decide when to retrieve, and what reflection tokens does it use?", "Compare DDPM and DDIM sampling β€” what does DDIM gain by being non-Markovian?", "What is multi-head self-attention and why does parallelism matter?", "How does HyDE improve dense retrieval without relevance labels?", "How does ReAct combine reasoning and acting, vs chain-of-thought?", "hello, what can you do?", ] if "vq" not in st.session_state: st.session_state.vq = samples[0] cols = st.columns(3) for i, s in enumerate(samples): if cols[i % 3].button(s, key=f"vs{i}", use_container_width=True): st.session_state.vq = s q = st.text_area("Question", value=st.session_state.vq, height=80, key="vq_input") enable_healing = st.toggle( "βš•οΈ Self-Healing", value=True, help="After the answer is generated, run hallucination detection, chunk quality " "scoring, and knowledge-gap checks β€” regenerating if issues are found.", ) if st.button("β–Ά Run pipeline", type="primary"): if q.strip(): visual_pipeline(q.strip(), enable_healing=enable_healing) def image_tab() -> None: st.subheader("πŸ–ΌοΈ Multimodal RAG (Qwen3-VL)") st.caption( "Upload an image (e.g. a figure from a paper). Qwen3-VL captions it, the " "caption + question drives hybrid retrieval, then the model reasons over " "image + retrieved passages together." ) uploaded = st.file_uploader("Image", type=["png", "jpg", "jpeg", "webp"]) q = st.text_input("Question about the image", "Explain what this figure shows.") go = st.button("Reason", type="primary", key="img_go") if uploaded: st.image(uploaded, width=400) if not (go and uploaded): return with tempfile.NamedTemporaryFile(suffix=Path(uploaded.name).suffix, delete=False) as f: f.write(uploaded.getbuffer()) tmp_path = f.name try: with st.spinner("Captioning β†’ retrieving β†’ multimodal reasoning..."): out = image_retrieve_and_reason(tmp_path, q, llm=_llm()) st.markdown("### Caption") st.write(out["caption"]) st.markdown("### Answer") st.markdown(out["answer"]) st.markdown("### Retrieved passages") for i, h in enumerate(out["hits"], start=1): st.markdown( f"**[{i}]** {h.metadata.get('title')} " f"(p.{h.metadata.get('page_start')}–{h.metadata.get('page_end')}) " f"Β· score `{h.score:.3f}`" ) st.caption(h.text[:300] + ("…" if len(h.text) > 300 else "")) finally: os.unlink(tmp_path) def kb_tab() -> None: """Knowledge Base Versioning tab.""" st.subheader("πŸ“š Knowledge Base Versioning") st.caption( "Every ingest run creates a versioned snapshot of the index (kb_v1, kb_v2 …). " "Old snapshots are never deleted β€” rollback is a single metadata write. " "You can query any version independently and replay historical answers." ) # ── lazy import so the tab loads even if versioning DB is empty ── try: from versioning.version_router import VersionRouter router = VersionRouter() history = router.list_versions() except Exception as exc: st.error(f"Could not initialise versioning layer: {exc}") return # ── current version banner ─────────────────────────────────────── current = router.current_version() if current is None: st.warning( "No versioned snapshot found. Run `python ingest.py` to create v1." ) return info = router.version_info(current) c1, c2, c3, c4 = st.columns(4) c1.metric("Current version", f"v{current}") c2.metric("Total snapshots", len(history)) if info: c3.metric("Docs added (latest)", info.get("docs_added", 0)) c4.metric("Docs changed (latest)", info.get("docs_changed", 0)) st.divider() # ── version history table ──────────────────────────────────────── st.markdown("### Version history") if history: df = pd.DataFrame(history) df["version"] = df["version"].apply(lambda v: f"v{v}") df["timestamp"] = df["timestamp"].str[:19].str.replace("T", " ") df = df.rename(columns={ "version": "Version", "timestamp": "Created", "batch_name": "Batch", "docs_added": "Added", "docs_changed": "Changed", "docs_unchanged": "Unchanged", "reason": "Reason", "collection_name": "Collection", }) st.dataframe(df, use_container_width=True, hide_index=True) else: st.info("No versions yet.") st.divider() # ── rollback ───────────────────────────────────────────────────── st.markdown("### Rollback") st.caption( "Rolling back points the 'latest' pointer at a previous snapshot. " "The current snapshot is **not** deleted β€” you can roll forward again any time." ) version_nums = sorted([v["version"] for v in history]) if len(version_nums) > 1: target = st.selectbox( "Roll back to", options=[v for v in version_nums if v != current], format_func=lambda v: f"v{v}", key="kb_rollback_target", ) if st.button("βͺ Rollback", type="secondary"): try: router.rollback(target) st.success(f"Rolled back to v{target}. Reload the page to see updated metrics.") st.rerun() except Exception as exc: st.error(str(exc)) else: st.info("Need at least 2 versions to roll back.") st.divider() # ── per-version diff view ───────────────────────────────────────── st.markdown("### What changed per version") for v in history: vnum = v["version"] added = v.get("docs_added", 0) changed = v.get("docs_changed", 0) unchanged = v.get("docs_unchanged", 0) ts = (v.get("timestamp") or "")[:19].replace("T", " ") badge = "🟒" if vnum == current else "βšͺ" label = f"{badge} v{vnum} β€” {ts} Β· +{added} added, ~{changed} changed, {unchanged} unchanged" with st.expander(label, expanded=(vnum == current)): cols = st.columns(4) cols[0].metric("Added", added) cols[1].metric("Changed", changed) cols[2].metric("Unchanged", unchanged) cols[3].metric("Reason", v.get("reason") or "β€”") st.caption(f"Collection: `{v.get('collection_name')}` Β· Batch: `{v.get('batch_name')}`") # list docs active at this version try: from versioning.document_store import DocumentStore store = DocumentStore() docs = store.docs_at_version(vnum) if docs: st.markdown("**Documents active at this version:**") for d in sorted(docs, key=lambda x: x["doc_id"]): status_icon = "βœ…" if d["status"] == "active" else "πŸ—ƒοΈ" chk = (d.get("checksum") or "")[:12] st.markdown( f"{status_icon} `{d['doc_id']}` β€” " f"{d.get('title','?')[:60]} " f"" f"sha256:{chk}…", unsafe_allow_html=True, ) except Exception: pass st.divider() # ── cross-version query ─────────────────────────────────────────── st.markdown("### Query a specific version") st.caption( "Run the same question against different snapshots to see how the " "knowledge base evolution affects retrieval." ) qv_text = st.text_input( "Question", value="How does Self-RAG decide when to retrieve?", key="kb_query_text", ) col_v, col_k = st.columns([2, 1]) qv_version = col_v.selectbox( "Version to query", options=["latest"] + [f"v{v}" for v in sorted(version_nums, reverse=True)], key="kb_query_version", ) qv_k = col_k.slider("Top-K", 3, 10, 5, key="kb_query_k") if st.button("πŸ” Query version", key="kb_query_btn"): version_arg: str | int = ( "latest" if qv_version == "latest" else int(qv_version.lstrip("v")) ) if not router.collection_exists(version_arg): st.error( f"ChromaDB collection for {qv_version} not found. " "The snapshot may exist in metadata but its collection was removed." ) else: with st.spinner(f"Querying {qv_version}…"): try: hits, resolved = router.query( qv_text, version=version_arg, k=qv_k, log=True ) except Exception as exc: st.error(str(exc)) hits, resolved = [], None if hits: st.success(f"Retrieved {len(hits)} passages from v{resolved}.") for i, h in enumerate(hits, start=1): meta = h.metadata title = meta.get("title", "?") st.markdown( f"
" f"
" f"#{i}" f"score {h.score:.3f} Β· {title} Β· " f"p.{meta.get('page_start')}–{meta.get('page_end')}" f"
{h.text[:300]}{'…' if len(h.text)>300 else ''}" f"
", unsafe_allow_html=True, ) else: st.warning("No hits returned.") st.divider() # ── query audit log ─────────────────────────────────────────────── st.markdown("### Query audit log") st.caption("Every versioned query is recorded here for replay and debugging.") log = router.get_query_log(limit=20) if log: df_log = pd.DataFrame(log) df_log["timestamp"] = df_log["timestamp"].str[:19].str.replace("T", " ") df_log["version_used"] = df_log["version_used"].apply( lambda v: f"v{v}" if v is not None else "β€”" ) df_log = df_log.rename(columns={ "timestamp": "Time", "query": "Query", "version_used": "Version", "answer_hash": "Hash", }) st.dataframe(df_log, use_container_width=True, hide_index=True) else: st.info("No queries logged yet.") def main() -> None: _sidebar() st.title("AdaptiveRAG πŸ“šπŸ”¬") st.caption( "Agentic + Self-RAG + Modular RAG over your local paper library β€” " f"powered by `{LLM_CONFIG['model']}` via **{LLM_CONFIG['provider']}**. " "Every pipeline stage is exposed below." ) pipe, img, kb = st.tabs([ "πŸ”¬ Underhood pipeline", "πŸ–ΌοΈ Image Q&A (multimodal)", "πŸ“š Knowledge Base", ]) with pipe: pipeline_tab() with img: image_tab() with kb: kb_tab() if __name__ == "__main__": main()