"""AdaptiveRAG β under-the-hood pipeline visualizer.
Run: streamlit run app.py
"""
from __future__ import annotations
import json
import logging
import os
import tempfile
import time
from pathlib import Path
os.environ.setdefault("ANONYMIZED_TELEMETRY", "False")
# suppress harmless noise from Streamlit's torch inspector + ChromaDB posthog client
logging.getLogger("streamlit.watcher.local_sources_watcher").setLevel(logging.ERROR)
logging.getLogger("chromadb.telemetry.product.posthog").setLevel(logging.ERROR)
import numpy as np
import pandas as pd
import streamlit as st
from agent.critic import critique, refine_query
from agent.planner import plan
from agent.router import route
from agent.tools import image_retrieve_and_reason
from config import AGENT_CONFIG, EMBEDDING_CONFIG, HOSTED, LLM_CONFIG, PATHS, RETRIEVAL_CONFIG
from ingestion.embedder import embed_query
from ingestion.indexer import fetch_embeddings
from llm.client_factory import get_llm
from retrieval.dense import Hit, dense_search
from retrieval.hybrid import reciprocal_rank_fusion
from retrieval.reranker import rerank
from retrieval.sparse import sparse_search
st.set_page_config(page_title="AdaptiveRAG β Underhood", page_icon="π¬", layout="wide")
# ββ LLM backend check β shown before anything else ββββββββββββββββββ
if not os.environ.get("GROQ_API_KEY"):
# Running without Groq β check if Ollama is reachable locally
try:
import requests as _req
_req.get("http://localhost:11434/api/tags", timeout=2).raise_for_status()
_ollama_ok = True
except Exception:
_ollama_ok = False
if not _ollama_ok:
st.error(
"**No LLM backend found.**\n\n"
"- **Running on Hugging Face?** Add your `GROQ_API_KEY` secret in "
"Space Settings β Variables and secrets. Get a free key at "
"[console.groq.com](https://console.groq.com).\n"
"- **Running locally?** Start Ollama: `ollama serve`"
)
st.stop()
# βββββββββββββββββββββββββββββ styling ββββββββββββββββββββββββββββββ
st.markdown(
"""
""",
unsafe_allow_html=True,
)
# βββββββββββββββββββββββββββββ helpers ββββββββββββββββββββββββββββββ
@st.cache_resource
def _llm():
return get_llm()
def _load_manifest() -> dict:
p = PATHS["manifest_path"]
return json.loads(p.read_text()) if p.exists() else {}
def phase_header(num: int, title: str, subtitle: str = "") -> None:
st.markdown(
f"
STEP {num}"
f"{title}
{subtitle}"
f"
",
unsafe_allow_html=True,
)
def hits_to_df(hits: list[Hit], score_label: str = "score") -> pd.DataFrame:
rows = []
for h in hits:
title = h.metadata.get("title") or h.metadata.get("source_path", "?")
short = title.split(" (")[0]
if len(short) > 38:
short = short[:35] + "β¦"
label = f"{short} Β· p{h.metadata.get('page_start')} Β· {h.chunk_id.split('::')[-1]}"
rows.append({"chunk": label, score_label: float(h.score), "chunk_id": h.chunk_id})
return pd.DataFrame(rows)
def render_hits(hits: list[Hit], badge_class: str, label: str, max_chars: int = 220) -> None:
if not hits:
st.caption(f"_(no {label.lower()} hits)_")
return
for i, h in enumerate(hits, start=1):
meta = h.metadata
snippet = h.text[:max_chars].replace("\n", " ")
if len(h.text) > max_chars:
snippet += "β¦"
st.markdown(
f""
f"
"
f"{label} #{i}"
f"score {h.score:.3f} Β· "
f"{meta.get('title','?')} Β· p.{meta.get('page_start')}β{meta.get('page_end')} Β· "
f"{h.chunk_id}"
f"
{snippet}
",
unsafe_allow_html=True,
)
def pca_2d(matrix: np.ndarray) -> np.ndarray:
centered = matrix - matrix.mean(axis=0, keepdims=True)
_, _, vt = np.linalg.svd(centered, full_matrices=False)
return centered @ vt[:2].T
def vector_space_plot(query_vec: list[float], fused_hits: list[Hit],
dense_ids: set[str], sparse_ids: set[str],
kept_ids: set[str]) -> None:
if not fused_hits:
st.caption("_(nothing to plot)_")
return
embs = fetch_embeddings([h.chunk_id for h in fused_hits])
rows = []
vecs = [np.array(query_vec, dtype=np.float32)]
for h in fused_hits:
v = embs.get(h.chunk_id)
if v is None:
continue
vecs.append(np.array(v, dtype=np.float32))
in_d, in_s = h.chunk_id in dense_ids, h.chunk_id in sparse_ids
in_keep = h.chunk_id in kept_ids
if in_d and in_s:
color = "fused (both)"
elif in_d:
color = "dense only"
elif in_s:
color = "sparse only"
else:
color = "other"
title = (h.metadata.get("title") or "?").split(" (")[0][:40]
label = f"{title} Β· p{h.metadata.get('page_start')}"
rows.append({"label": label, "color": color, "size": 90 if in_keep else 50})
if len(vecs) < 3:
st.caption("_(need at least 2 hits for a 2D projection)_")
return
proj = pca_2d(np.vstack(vecs))
df = pd.DataFrame(
[{"x": proj[0, 0], "y": proj[0, 1], "label": "π your question",
"color": "QUERY", "size": 220}]
+ [{"x": proj[i + 1, 0], "y": proj[i + 1, 1], **rows[i]}
for i in range(len(rows))]
)
st.scatter_chart(
df, x="x", y="y", color="color", size="size",
height=380, use_container_width=True,
)
st.caption(
"PCA projection of the query embedding + fused hit embeddings. "
"Larger points survived cross-encoder reranking."
)
def render_embedding_card(query: str, qv: list[float], dt: float) -> None:
arr = np.array(qv, dtype=np.float32)
cols = st.columns([1, 1, 1, 3])
cols[0].metric("Model", EMBEDDING_CONFIG["model"].split("/")[-1])
cols[1].metric("Dimensions", len(qv))
cols[2].metric("L2 norm", f"{float(np.linalg.norm(arr)):.3f}")
cols[3].metric("Embed time", f"{dt*1000:.0f} ms")
st.caption(f"Question ({len(query)} chars, ~{len(query.split())} words):")
st.code(query, language="text")
st.caption("First 32 dimensions of the embedding vector:")
st.bar_chart(pd.DataFrame({"value": arr[:32]}), height=140, use_container_width=True)
preview = ", ".join(f"{x:+.3f}" for x in arr[:8]) + ", β¦"
st.markdown(f"vector[0:8] = [{preview}]",
unsafe_allow_html=True)
# βββββββββββββββββββββββββββββ pipeline view ββββββββββββββββββββββββββββββ
def _render_healing_trace(healing_trace: list[dict], health_score: float) -> None:
"""Render the healing trace panel below the answer."""
# Health score metric with colour
if health_score >= 80:
color, label = "#2ecc71", "Healthy"
elif health_score >= 60:
color, label = "#f39c12", "Fair"
else:
color, label = "#e74c3c", "Needs healing"
st.markdown(
f""
f""
f"βοΈ Health score: {health_score:.0f} / 100"
f""
f"{label}
",
unsafe_allow_html=True,
)
if not healing_trace:
st.success("β
Answer passed all checks on first attempt β no healing needed.")
return
for attempt in healing_trace:
num = attempt.get("attempt", "?")
healthy = attempt.get("healthy", False)
icon = "β
" if healthy else "π§"
issues = attempt.get("issues", [])
label_str = "Healthy" if healthy else f"Issues: {', '.join(issues)}"
with st.expander(f"{icon} Attempt {num} β {label_str}", expanded=not healthy):
if healthy:
st.success("All checks passed β answer accepted.")
continue
c1, c2, c3 = st.columns(3)
c1.metric("Hallucination", "β οΈ yes" if "hallucination" in issues else "β
no")
c2.metric("Low chunk quality", "β οΈ yes" if "low_chunk_quality" in issues else "β
no")
c3.metric("Knowledge gap", "β οΈ yes" if "knowledge_gap" in issues else "β
no")
for action in attempt.get("actions", []):
atype = action.get("type", "")
detail = action.get("detail", "")
icons = {
"hallucination_fix": "π",
"chunk_expansion": "π",
"web_search": "π",
"query_rewrite": "βοΈ",
"regenerate": "π",
}
prefix = icons.get(atype, "β’")
st.markdown(
f"{atype}"
f"{prefix} {detail}
",
unsafe_allow_html=True,
)
if atype == "hallucination_fix":
for s in action.get("flagged_sentences", []):
st.caption(f" β³ Flagged: \"{s[:120]}β¦\"")
def visual_pipeline(query: str, enable_healing: bool = True) -> None:
llm = _llm()
# ββ Step 1: embed the question ββββββββββββββββββββββββββββββββ
phase_header(1, "Question encoding",
"Convert text β 384-dim dense vector via sentence-transformers (MiniLM-L6).")
t0 = time.time()
qv = embed_query(query)
render_embedding_card(query, qv, time.time() - t0)
# ββ Step 2: Self-RAG router ββββββββββββββββββββββββββββββββ
phase_header(2, "Self-RAG router",
"Decide whether to RETRIEVE, ANSWER_DIRECTLY, or CLARIFY before touching the index.")
t0 = time.time()
decision = route(query, llm=llm)
dt = time.time() - t0
pill_map = {"RETRIEVE": "pill-blue", "ANSWER_DIRECTLY": "pill-green", "CLARIFY": "pill-amber"}
pill = pill_map.get(decision["action"], "pill-grey")
st.markdown(
f"{decision['action']}"
f"{decision.get('reason','')}"
f""
f"router latency: {dt*1000:.0f} ms",
unsafe_allow_html=True,
)
if decision["action"] == "ANSWER_DIRECTLY":
st.markdown("### Direct answer (no retrieval)")
ans = llm.generate(prompt=query,
system="You are a helpful research assistant. Be concise.",
temperature=0.2)
st.markdown(ans)
return
if decision["action"] == "CLARIFY":
st.markdown("### Clarifying question")
ans = llm.generate(
prompt=("The user asked: " + query +
"\n\nIt is too ambiguous to answer well. Ask one short clarifying question."),
system="You are a helpful research assistant.",
temperature=0.2,
)
st.markdown(ans)
return
# ββ Iterations of plan β retrieve β answer β critique ββββββββ
accumulated: list[Hit] = []
current_query = query
for it in range(AGENT_CONFIG["max_iterations"]):
st.markdown(f"---\n## π Iteration {it + 1}")
if current_query != query:
st.info(f"Refined query β **{current_query}**")
# ββ Step 3: plan βββββββββββββββββββββββββββββββββββββ
phase_header(3, "Planner", "LLM decomposes the question into focused sub-queries.")
prior = ""
if accumulated:
titles = sorted({h.metadata.get("title", "?") for h in accumulated})
prior = "Already gathered passages from: " + ", ".join(titles)
t0 = time.time()
steps = plan(current_query, prior_summary=prior, llm=llm)
dt = time.time() - t0
st.caption(f"Generated {len(steps)} sub-quer{'y' if len(steps)==1 else 'ies'} in {dt*1000:.0f} ms")
for i, s in enumerate(steps, start=1):
st.markdown(
f""
f"
sub-query {i}"
f"
{s['query']}"
f"
"
f"rationale: {s.get('rationale','β')}
",
unsafe_allow_html=True,
)
# ββ Step 4: retrieval per sub-query ββββββββββββββββββ
phase_header(
4,
"Hybrid retrieval per sub-query",
f"Dense (Chroma cosine, k={RETRIEVAL_CONFIG['dense_k']}) β₯ "
f"Sparse (BM25, k={RETRIEVAL_CONFIG['sparse_k']}) β "
f"Reciprocal Rank Fusion β Cross-encoder rerank "
f"(BGE, top {RETRIEVAL_CONFIG['rerank_top_n']}).",
)
for si, step in enumerate(steps, start=1):
with st.expander(f"Sub-query {si}: {step['query']}", expanded=(si == 1)):
t0 = time.time()
dense_hits = dense_search(step["query"])
t_dense = time.time() - t0
t0 = time.time()
sparse_hits = sparse_search(step["query"])
t_sparse = time.time() - t0
t0 = time.time()
fused = reciprocal_rank_fusion([dense_hits, sparse_hits],
top_k=max(RETRIEVAL_CONFIG["dense_k"],
RETRIEVAL_CONFIG["sparse_k"]))
t_fuse = time.time() - t0
t0 = time.time()
reranked = rerank(step["query"], fused)
t_rerank = time.time() - t0
m1, m2, m3, m4 = st.columns(4)
m1.metric("Dense hits", len(dense_hits), f"{t_dense*1000:.0f} ms")
m2.metric("Sparse hits", len(sparse_hits), f"{t_sparse*1000:.0f} ms")
m3.metric("After RRF", len(fused), f"{t_fuse*1000:.0f} ms")
m4.metric("After rerank", len(reranked), f"{t_rerank*1000:.0f} ms")
tabs = st.tabs([
"π΅ Dense (vectors)",
"π’ Sparse (BM25)",
"π£ RRF fusion",
"π‘ Cross-encoder rerank",
"πΊοΈ Vector space",
])
with tabs[0]:
st.caption("Top-K nearest neighbors by cosine similarity.")
if dense_hits:
st.bar_chart(hits_to_df(dense_hits, "cosine_sim"),
x="chunk", y="cosine_sim",
height=260, use_container_width=True)
render_hits(dense_hits[:5], "pill-blue", "DENSE")
with tabs[1]:
st.caption("Top-K BM25 keyword matches (normalized).")
if sparse_hits:
st.bar_chart(hits_to_df(sparse_hits, "bm25_norm"),
x="chunk", y="bm25_norm",
height=260, use_container_width=True)
render_hits(sparse_hits[:5], "pill-green", "BM25")
with tabs[2]:
st.caption(
"Reciprocal Rank Fusion: score(d) = Ξ£ 1/(k + rank). "
"Combines dense + sparse rankings into one merged list."
)
if fused:
st.bar_chart(hits_to_df(fused[:12], "rrf_score"),
x="chunk", y="rrf_score",
height=280, use_container_width=True)
render_hits(fused[:5], "pill-purple", "FUSED")
with tabs[3]:
st.caption(
"Cross-encoder scores (query, chunk) jointly β much more "
"accurate than bi-encoder cosine, but slower β only run on "
"the fused candidate set."
)
if reranked:
st.bar_chart(hits_to_df(reranked, "ce_score"),
x="chunk", y="ce_score",
height=240, use_container_width=True)
render_hits(reranked, "pill-amber", "RERANKED")
with tabs[4]:
dense_ids = {h.chunk_id for h in dense_hits}
sparse_ids = {h.chunk_id for h in sparse_hits}
kept_ids = {h.chunk_id for h in reranked}
vector_space_plot(qv, fused[:20], dense_ids, sparse_ids, kept_ids)
accumulated.extend(reranked)
# ββ Step 5: answer βββββββββββββββββββββββββββββββββββββ
# Dedupe + cap to 8 passages for the final prompt
seen: set[str] = set()
unique: list[Hit] = []
for h in accumulated:
if h.chunk_id in seen:
continue
seen.add(h.chunk_id)
unique.append(h)
if len(unique) >= 8:
break
context_lines, citations = [], []
for i, h in enumerate(unique, start=1):
meta = h.metadata
head = (f"[{i}] {meta.get('title','?')} "
f"(p.{meta.get('page_start')}-{meta.get('page_end')})")
context_lines.append(f"{head}\n{h.text}")
citations.append({
"n": i, "chunk_id": h.chunk_id,
"title": meta.get("title"),
"source_path": meta.get("source_path"),
"page_start": meta.get("page_start"),
"page_end": meta.get("page_end"),
"score": float(h.score),
})
context_block = "\n\n".join(context_lines)
phase_header(5, "Context assembly + answer generation",
f"Top {len(unique)} unique passages β {LLM_CONFIG['model']} via {LLM_CONFIG['provider']}.")
with st.expander("π¦ Context handed to the LLM", expanded=False):
for c in citations:
st.markdown(
f"**[{c['n']}]** {c['title']} Β· pages {c['page_start']}β{c['page_end']} Β· "
f"score `{c['score']:.3f}`"
)
st.code(context_block[:3000] + ("β¦" if len(context_block) > 3000 else ""),
language="text")
t0 = time.time()
ANSWER_SYSTEM = (
"You are a careful research assistant. Use ONLY the provided passages to "
"answer the question. Cite sources inline with [N] where N is the passage "
"number. If the passages are insufficient, say so explicitly."
)
ANSWER_PROMPT = (
f"Question: {query}\n\nPassages:\n{context_block}\n\n"
"Write a concise, well-grounded answer. Use inline citations like [1], [2] "
"that match the passage numbers above."
)
answer = llm.generate(prompt=ANSWER_PROMPT, system=ANSWER_SYSTEM, temperature=0.1)
st.caption(f"LLM generation: {time.time()-t0:.1f} s")
st.markdown("### Answer")
st.markdown(answer)
st.markdown("### Citations")
for c in citations:
st.markdown(
f"**[{c['n']}]** {c['title']} β pages {c['page_start']}β{c['page_end']} "
f"Β· score `{c['score']:.3f}` Β· `{Path(c['source_path']).name}`"
)
# ββ Step 6: critic βββββββββββββββββββββββββββββββββββββ
phase_header(6, "Self-critique",
"LLM scores its own answer for grounding + completeness.")
t0 = time.time()
crit = critique(query, answer, context_block, llm=llm)
c1, c2, c3 = st.columns(3)
c1.metric("Grounded", "β
yes" if crit["grounded"] else "β οΈ no")
c2.metric("Complete", "β
yes" if crit["complete"] else "β οΈ no")
c3.metric("Confidence", f"{crit['confidence']:.2f}",
delta=f"threshold {AGENT_CONFIG['confidence_threshold']:.2f}")
if crit.get("missing"):
st.warning(f"Missing: {crit['missing']}")
st.caption(f"Critique latency: {time.time()-t0:.1f} s")
if crit["confidence"] >= AGENT_CONFIG["confidence_threshold"] and crit["grounded"]:
st.success(f"β Confidence {crit['confidence']:.2f} β₯ threshold β answer accepted.")
if enable_healing:
phase_header(
7, "Self-Healing layer",
"Hallucination detection β chunk quality scoring β knowledge gap β regenerate if needed.",
)
with st.spinner("Running self-healing checksβ¦"):
from healing.healing_loop import self_heal
healed = self_heal(query, answer, unique, citations, llm=llm)
if healed.attempts_used > 0:
st.markdown("### Healed Answer")
st.markdown(healed.answer)
st.markdown("### Updated Citations")
for c in healed.citations:
st.markdown(
f"**[{c['n']}]** {c['title']} β "
f"pages {c.get('page_start')}β{c.get('page_end')} "
f"Β· score `{c['score']:.3f}`"
)
_render_healing_trace(healed.healing_trace, healed.health_score)
return
if it < AGENT_CONFIG["max_iterations"] - 1:
st.warning("Confidence below threshold β refining query and retrying.")
current_query = refine_query(query, crit.get("missing", ""), llm=llm)
else:
st.error("Max iterations reached. Returning best-effort answer.")
if enable_healing:
phase_header(
7, "Self-Healing layer",
"Hallucination detection β chunk quality scoring β knowledge gap β regenerate if needed.",
)
with st.spinner("Running self-healing checksβ¦"):
from healing.healing_loop import self_heal
healed = self_heal(query, answer, unique, citations, llm=llm)
if healed.attempts_used > 0:
st.markdown("### Healed Answer")
st.markdown(healed.answer)
_render_healing_trace(healed.healing_trace, healed.health_score)
# βββββββββββββββββββββββββββββ sidebar + tabs ββββββββββββββββββββββββββββββ
def _sidebar() -> None:
st.sidebar.title("AdaptiveRAG")
st.sidebar.caption("Agentic + Self-RAG + Modular RAG")
llm = _llm()
ok = llm.health()
backend = "Groq API" if HOSTED else "Ollama (local)"
st.sidebar.markdown(f"**LLM backend**: {'π’' if ok else 'π΄'} {backend}")
st.sidebar.markdown(f"**Model**: `{LLM_CONFIG['model']}`")
st.sidebar.markdown(f"**Embedder**: `{EMBEDDING_CONFIG['model'].split('/')[-1]}`")
st.sidebar.markdown(f"**Reranker**: `bge-reranker-base`")
manifest = _load_manifest()
if manifest:
st.sidebar.markdown(f"**Index**: {manifest.get('n_chunks','?')} chunks across "
f"{len(manifest.get('chunks_per_doc',{}))} docs")
with st.sidebar.expander("Documents"):
for doc, n in sorted(manifest.get("chunks_per_doc", {}).items()):
st.markdown(f"- `{doc}` β {n}")
else:
st.sidebar.warning("No index found. Run `python ingest.py --reset`.")
st.sidebar.divider()
st.sidebar.markdown("### Pipeline")
st.sidebar.code(
"question\n β embed (MiniLM)\n β Self-RAG router\n β planner β sub-queries\n"
" β dense β₯ sparse\n β RRF fusion\n β cross-encoder rerank\n β LLM answer\n"
" β self-critique β retry?\n β self-healing βοΈ\n β answer + citations",
language="text",
)
def pipeline_tab() -> None:
st.subheader("π¬ Underhood: watch every stage of the agentic RAG pipeline")
st.caption(
"Each step renders its inputs and outputs as it runs β embedding vector, "
"router decision, planner sub-queries, dense vs sparse hits side-by-side, "
"RRF fusion, cross-encoder rerank, vector-space projection, answer, self-critique."
)
samples = [
"How does Self-RAG decide when to retrieve, and what reflection tokens does it use?",
"Compare DDPM and DDIM sampling β what does DDIM gain by being non-Markovian?",
"What is multi-head self-attention and why does parallelism matter?",
"How does HyDE improve dense retrieval without relevance labels?",
"How does ReAct combine reasoning and acting, vs chain-of-thought?",
"hello, what can you do?",
]
if "vq" not in st.session_state:
st.session_state.vq = samples[0]
cols = st.columns(3)
for i, s in enumerate(samples):
if cols[i % 3].button(s, key=f"vs{i}", use_container_width=True):
st.session_state.vq = s
q = st.text_area("Question", value=st.session_state.vq, height=80, key="vq_input")
enable_healing = st.toggle(
"βοΈ Self-Healing",
value=True,
help="After the answer is generated, run hallucination detection, chunk quality "
"scoring, and knowledge-gap checks β regenerating if issues are found.",
)
if st.button("βΆ Run pipeline", type="primary"):
if q.strip():
visual_pipeline(q.strip(), enable_healing=enable_healing)
def image_tab() -> None:
st.subheader("πΌοΈ Multimodal RAG (Qwen3-VL)")
st.caption(
"Upload an image (e.g. a figure from a paper). Qwen3-VL captions it, the "
"caption + question drives hybrid retrieval, then the model reasons over "
"image + retrieved passages together."
)
uploaded = st.file_uploader("Image", type=["png", "jpg", "jpeg", "webp"])
q = st.text_input("Question about the image", "Explain what this figure shows.")
go = st.button("Reason", type="primary", key="img_go")
if uploaded:
st.image(uploaded, width=400)
if not (go and uploaded):
return
with tempfile.NamedTemporaryFile(suffix=Path(uploaded.name).suffix, delete=False) as f:
f.write(uploaded.getbuffer())
tmp_path = f.name
try:
with st.spinner("Captioning β retrieving β multimodal reasoning..."):
out = image_retrieve_and_reason(tmp_path, q, llm=_llm())
st.markdown("### Caption")
st.write(out["caption"])
st.markdown("### Answer")
st.markdown(out["answer"])
st.markdown("### Retrieved passages")
for i, h in enumerate(out["hits"], start=1):
st.markdown(
f"**[{i}]** {h.metadata.get('title')} "
f"(p.{h.metadata.get('page_start')}β{h.metadata.get('page_end')}) "
f"Β· score `{h.score:.3f}`"
)
st.caption(h.text[:300] + ("β¦" if len(h.text) > 300 else ""))
finally:
os.unlink(tmp_path)
def kb_tab() -> None:
"""Knowledge Base Versioning tab."""
st.subheader("π Knowledge Base Versioning")
st.caption(
"Every ingest run creates a versioned snapshot of the index (kb_v1, kb_v2 β¦). "
"Old snapshots are never deleted β rollback is a single metadata write. "
"You can query any version independently and replay historical answers."
)
# ββ lazy import so the tab loads even if versioning DB is empty ββ
try:
from versioning.version_router import VersionRouter
router = VersionRouter()
history = router.list_versions()
except Exception as exc:
st.error(f"Could not initialise versioning layer: {exc}")
return
# ββ current version banner βββββββββββββββββββββββββββββββββββββββ
current = router.current_version()
if current is None:
st.warning(
"No versioned snapshot found. Run `python ingest.py` to create v1."
)
return
info = router.version_info(current)
c1, c2, c3, c4 = st.columns(4)
c1.metric("Current version", f"v{current}")
c2.metric("Total snapshots", len(history))
if info:
c3.metric("Docs added (latest)", info.get("docs_added", 0))
c4.metric("Docs changed (latest)", info.get("docs_changed", 0))
st.divider()
# ββ version history table ββββββββββββββββββββββββββββββββββββββββ
st.markdown("### Version history")
if history:
df = pd.DataFrame(history)
df["version"] = df["version"].apply(lambda v: f"v{v}")
df["timestamp"] = df["timestamp"].str[:19].str.replace("T", " ")
df = df.rename(columns={
"version": "Version",
"timestamp": "Created",
"batch_name": "Batch",
"docs_added": "Added",
"docs_changed": "Changed",
"docs_unchanged": "Unchanged",
"reason": "Reason",
"collection_name": "Collection",
})
st.dataframe(df, use_container_width=True, hide_index=True)
else:
st.info("No versions yet.")
st.divider()
# ββ rollback βββββββββββββββββββββββββββββββββββββββββββββββββββββ
st.markdown("### Rollback")
st.caption(
"Rolling back points the 'latest' pointer at a previous snapshot. "
"The current snapshot is **not** deleted β you can roll forward again any time."
)
version_nums = sorted([v["version"] for v in history])
if len(version_nums) > 1:
target = st.selectbox(
"Roll back to",
options=[v for v in version_nums if v != current],
format_func=lambda v: f"v{v}",
key="kb_rollback_target",
)
if st.button("βͺ Rollback", type="secondary"):
try:
router.rollback(target)
st.success(f"Rolled back to v{target}. Reload the page to see updated metrics.")
st.rerun()
except Exception as exc:
st.error(str(exc))
else:
st.info("Need at least 2 versions to roll back.")
st.divider()
# ββ per-version diff view βββββββββββββββββββββββββββββββββββββββββ
st.markdown("### What changed per version")
for v in history:
vnum = v["version"]
added = v.get("docs_added", 0)
changed = v.get("docs_changed", 0)
unchanged = v.get("docs_unchanged", 0)
ts = (v.get("timestamp") or "")[:19].replace("T", " ")
badge = "π’" if vnum == current else "βͺ"
label = f"{badge} v{vnum} β {ts} Β· +{added} added, ~{changed} changed, {unchanged} unchanged"
with st.expander(label, expanded=(vnum == current)):
cols = st.columns(4)
cols[0].metric("Added", added)
cols[1].metric("Changed", changed)
cols[2].metric("Unchanged", unchanged)
cols[3].metric("Reason", v.get("reason") or "β")
st.caption(f"Collection: `{v.get('collection_name')}` Β· Batch: `{v.get('batch_name')}`")
# list docs active at this version
try:
from versioning.document_store import DocumentStore
store = DocumentStore()
docs = store.docs_at_version(vnum)
if docs:
st.markdown("**Documents active at this version:**")
for d in sorted(docs, key=lambda x: x["doc_id"]):
status_icon = "β
" if d["status"] == "active" else "ποΈ"
chk = (d.get("checksum") or "")[:12]
st.markdown(
f"{status_icon} `{d['doc_id']}` β "
f"{d.get('title','?')[:60]} "
f""
f"sha256:{chk}β¦",
unsafe_allow_html=True,
)
except Exception:
pass
st.divider()
# ββ cross-version query βββββββββββββββββββββββββββββββββββββββββββ
st.markdown("### Query a specific version")
st.caption(
"Run the same question against different snapshots to see how the "
"knowledge base evolution affects retrieval."
)
qv_text = st.text_input(
"Question",
value="How does Self-RAG decide when to retrieve?",
key="kb_query_text",
)
col_v, col_k = st.columns([2, 1])
qv_version = col_v.selectbox(
"Version to query",
options=["latest"] + [f"v{v}" for v in sorted(version_nums, reverse=True)],
key="kb_query_version",
)
qv_k = col_k.slider("Top-K", 3, 10, 5, key="kb_query_k")
if st.button("π Query version", key="kb_query_btn"):
version_arg: str | int = (
"latest"
if qv_version == "latest"
else int(qv_version.lstrip("v"))
)
if not router.collection_exists(version_arg):
st.error(
f"ChromaDB collection for {qv_version} not found. "
"The snapshot may exist in metadata but its collection was removed."
)
else:
with st.spinner(f"Querying {qv_version}β¦"):
try:
hits, resolved = router.query(
qv_text, version=version_arg, k=qv_k, log=True
)
except Exception as exc:
st.error(str(exc))
hits, resolved = [], None
if hits:
st.success(f"Retrieved {len(hits)} passages from v{resolved}.")
for i, h in enumerate(hits, start=1):
meta = h.metadata
title = meta.get("title", "?")
st.markdown(
f""
f"
"
f"#{i}"
f"score {h.score:.3f} Β· {title} Β· "
f"p.{meta.get('page_start')}β{meta.get('page_end')}"
f"
{h.text[:300]}{'β¦' if len(h.text)>300 else ''}"
f"
",
unsafe_allow_html=True,
)
else:
st.warning("No hits returned.")
st.divider()
# ββ query audit log βββββββββββββββββββββββββββββββββββββββββββββββ
st.markdown("### Query audit log")
st.caption("Every versioned query is recorded here for replay and debugging.")
log = router.get_query_log(limit=20)
if log:
df_log = pd.DataFrame(log)
df_log["timestamp"] = df_log["timestamp"].str[:19].str.replace("T", " ")
df_log["version_used"] = df_log["version_used"].apply(
lambda v: f"v{v}" if v is not None else "β"
)
df_log = df_log.rename(columns={
"timestamp": "Time",
"query": "Query",
"version_used": "Version",
"answer_hash": "Hash",
})
st.dataframe(df_log, use_container_width=True, hide_index=True)
else:
st.info("No queries logged yet.")
def main() -> None:
_sidebar()
st.title("AdaptiveRAG ππ¬")
st.caption(
"Agentic + Self-RAG + Modular RAG over your local paper library β "
f"powered by `{LLM_CONFIG['model']}` via **{LLM_CONFIG['provider']}**. "
"Every pipeline stage is exposed below."
)
pipe, img, kb = st.tabs([
"π¬ Underhood pipeline",
"πΌοΈ Image Q&A (multimodal)",
"π Knowledge Base",
])
with pipe:
pipeline_tab()
with img:
image_tab()
with kb:
kb_tab()
if __name__ == "__main__":
main()