import os
from typing import List, Dict, Tuple, Optional, Any
import streamlit as st
from datetime import datetime, timezone
# Disable telemetry for LangChain and Chroma by default
os.environ.setdefault("LANGCHAIN_TELEMETRY_ENABLED", "false")
os.environ.setdefault("LANGCHAIN_DISABLE_TELEMETRY", "true")
os.environ.setdefault("CHROMA_TELEMETRY_ENABLED", "false")
from src.utils.rag_runtime import (
run_ingest_cli,
build_or_load_retriever_cached,
get_chain_cached,
answer_with_kg,
)
from src.utils.metrics import compute_quality_scores
from src.utils.formatting import format_source_label
from src.utils.env import ensure_openai_key
class AbaloneRAGApp:
"""Main application class for the Abalone RAG Chatbot."""
def __init__(self) -> None:
"""Initialize the Streamlit page and application state."""
st.set_page_config(page_title="Abalone RAG Chatbot", page_icon="🐚")
# Header row: title/subtitle on the left, rebuild action on the right
header_col, action_col = st.columns([5, 1])
with header_col:
st.title("Abalone RAG Chatbot")
st.write(
"Ask natural-language questions about abalone biology, ecology, "
"and research datasets. The app uses a local Chroma vectorstore "
"and OpenAI to retrieve and answer questions accurately."
)
with action_col:
# A compact, prominent rebuild control placed in the header
self._top_rebuild_clicked = st.button(
"Rebuild vectorstore",
key="top_rebuild",
use_container_width=True,
)
# Data and vectorstore locations
self.data_dir = "./data"
self.persist_dir = "./vectorstore"
# Initialize session state
st.session_state.setdefault("chat_history", [])
st.session_state.setdefault("rebuild_pending", False)
self.chat_history: List[Dict] = st.session_state["chat_history"]
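        # st.session_state persists across Streamlit reruns, so the history
        # list survives every widget interaction within a browser session.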
# Sidebar configuration
(
self.model_name,
self.top_k,
self.retrieval_mode,
self.temperature,
self.answer_length,
self.style_instruction,
self.use_kg,
self.kg_hops,
) = self._build_sidebar()
# Ensure rebuild_clicked reflects the top-right control
self.rebuild_clicked = bool(getattr(self, "_top_rebuild_clicked", False))
# QA chain instance (loaded lazily)
# typing as Any avoids static warnings when calling the chain object
self.chain: Optional[Any] = None
# ------------------------------------------------------------------
# Sidebar configuration
# ------------------------------------------------------------------
    def _build_sidebar(self) -> Tuple[str, int, str, float, str, str, bool, int]:
        """Render all sidebar controls and return model configuration.

        Returns:
            Tuple containing:
                - model_name: Which LLM to use.
                - top_k: Number of chunks to retrieve.
                - retrieval_mode: Strategy (mmr, similarity, hybrid).
                - temperature: LLM temperature.
                - answer_length: Short/Medium/Long preference.
                - style_instruction: Natural-language style directive.
                - use_kg: Whether to route retrieval through the knowledge graph.
                - kg_hops: Maximum number of knowledge-graph hops.
        """
st.sidebar.header("Model Settings")
model_name = st.sidebar.selectbox(
"Model",
options=["gpt-3.5-turbo", "gpt-4"],
index=0,
)
st.sidebar.markdown("---")
# Retrieval configuration
st.sidebar.header("Retrieval Configuration")
top_k = st.sidebar.slider(
"Number of retrieved chunks (k)",
min_value=2,
max_value=10,
value=4,
)
retrieval_mode_label = st.sidebar.selectbox(
"Retrieval mode",
["MMR (diverse)", "Similarity", "Hybrid (dense + MMR)"],
index=2,
)
retrieval_mode_map = {
"MMR (diverse)": "mmr",
"Similarity": "similarity",
"Hybrid (dense + MMR)": "hybrid",
}
retrieval_mode = retrieval_mode_map[retrieval_mode_label]
# Knowledge graph toggle (placed under Retrieval Configuration)
st.sidebar.markdown("---")
st.sidebar.header("Knowledge Graph")
use_kg = st.sidebar.checkbox("Use knowledge graph for retrieval", value=False)
kg_hops = st.sidebar.slider("KG hops", min_value=1, max_value=3, value=1)
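        # kg_hops is passed straight to answer_with_kg; it is assumed to bound
        # how many relation hops the graph traversal may follow.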
st.sidebar.markdown("---")
# Answer style
st.sidebar.header("Answer Style")
temperature = st.sidebar.slider(
"Temperature",
min_value=0.0,
max_value=1.0,
value=0.2,
step=0.05,
)
answer_length = st.sidebar.selectbox(
"Answer length",
["Short", "Medium", "Long"],
index=1,
)
# (Vectorstore rebuild moved to top-right action button)
st.sidebar.markdown("---")
st.sidebar.markdown("To rebuild the vectorstore use the top-right \"Rebuild vectorstore\" button.", unsafe_allow_html=True)
# Build style instruction for the LLM
length_instruction_map = {
"Short": "Answer in 1–3 sentences.",
"Medium": "Answer in 1–2 paragraphs.",
"Long": "Provide a detailed, multi-paragraph explanation.",
}
length_instruction = length_instruction_map[answer_length]
style_instruction = (
length_instruction
+ f" Use a response style appropriate for a temperature of {temperature:.2f}, "
"where lower values are more factual and higher values are more exploratory."
)
return (
model_name,
top_k,
retrieval_mode,
temperature,
answer_length,
style_instruction,
use_kg,
kg_hops,
)
# ------------------------------------------------------------------
# Vectorstore rebuild workflow
# ------------------------------------------------------------------
def handle_rebuild(self) -> None:
"""Render rebuild confirmation dialog and rebuild if confirmed.
This manages the 2-step rebuild process:
1. User clicks "Rebuild vectorstore".
2. A confirmation dialog appears with "Yes, rebuild" and "Cancel".
If confirmed, the vectorstore is regenerated and caches are cleared.
"""
if self.rebuild_clicked:
st.session_state["rebuild_pending"] = True
if not st.session_state["rebuild_pending"]:
return
st.warning(
"Rebuild the vectorstore from the current contents of ./data? "
"This will overwrite existing embeddings."
)
col_left, col_center, col_right = st.columns([1, 2, 1])
with col_center:
confirm = st.button(
"Yes, rebuild",
key="confirm_rebuild",
use_container_width=True,
)
cancel = st.button(
"Cancel",
key="cancel_rebuild",
use_container_width=True,
)
        # Small UI-side log for rebuild actions, written under the persist dir.
        def _ui_log(msg: str) -> None:
try:
os.makedirs(self.persist_dir, exist_ok=True)
with open(os.path.join(self.persist_dir, "ui_rebuild.log"), "a", encoding="utf-8") as fh:
fh.write(f"{msg}\n")
except Exception:
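                # Best-effort logging: a failed log write must never block the UI.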
pass
        if confirm:
            _ui_log(f"{datetime.now(timezone.utc).isoformat()} - Confirm rebuild clicked by user")
            with st.spinner("Rebuilding vectorstore..."):
                try:
                    run_ingest_cli(data_dir=self.data_dir, persist_dir=self.persist_dir)
                    _ui_log(f"{datetime.now(timezone.utc).isoformat()} - Rebuild succeeded")
                except Exception as e:
                    import subprocess as _sp
                    _ui_log(f"{datetime.now(timezone.utc).isoformat()} - Rebuild failed: {e}")
                    if isinstance(e, _sp.CalledProcessError):
                        stderr = getattr(e, "stderr", None)
                        stdout = getattr(e, "output", None) or getattr(e, "stdout", None)
st.error("Rebuild failed. See logs below.")
if stdout:
st.markdown("**ingest stdout:**")
st.code(stdout)
if stderr:
st.markdown("**ingest stderr:**")
st.code(stderr)
else:
st.error(f"Rebuild failed: {e}")
st.session_state["rebuild_pending"] = False
return
# On success, clear cached retriever/chain and reload
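                # .clear() is available because these helpers are assumed to be
                # wrapped with st.cache_resource/st.cache_data, whose wrappers
                # expose a .clear() method.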
try:
build_or_load_retriever_cached.clear()
get_chain_cached.clear()
                except Exception:
                    # If clearing the cache fails, just record it in the UI log.
                    _ui_log(f"{datetime.now(timezone.utc).isoformat()} - Warning: failed to clear cached functions")
self.chain = get_chain_cached(
model_name=self.model_name,
top_k=self.top_k,
retrieval_mode=self.retrieval_mode,
data_dir=self.data_dir,
persist_dir=self.persist_dir,
)
st.session_state["rebuild_pending"] = False
st.success("Vectorstore rebuilt successfully.")
elif cancel:
st.session_state["rebuild_pending"] = False
st.info("Rebuild canceled.")
# ------------------------------------------------------------------
# Chain loading
# ------------------------------------------------------------------
def ensure_chain_ready(self) -> None:
"""Load or create the QA chain unless a rebuild is still pending."""
if st.session_state["rebuild_pending"]:
return
        if self.chain is None:
            with st.spinner("Initializing knowledge base and chat model..."):
                self.chain = get_chain_cached(
                    model_name=self.model_name,
                    top_k=self.top_k,
                    retrieval_mode=self.retrieval_mode,
                    data_dir=self.data_dir,
                    persist_dir=self.persist_dir,
                )
        st.success("Knowledge base and model are ready.")
# ------------------------------------------------------------------
# Chat UI
# ------------------------------------------------------------------
def render_chat_history(self) -> None:
"""Render previous user and assistant messages."""
for turn in self.chat_history:
with st.chat_message("user"):
st.markdown(turn["question"])
with st.chat_message("assistant"):
st.markdown(turn["answer"])
def handle_user_input(self) -> None:
"""Process new user queries, run RAG, compute metrics, and display results."""
if st.session_state["rebuild_pending"] or self.chain is None:
return
user_input = st.chat_input(
"Ask a question about abalone (biology, data, methodology, etc.)"
)
if not user_input:
return
# Render user message
with st.chat_message("user"):
st.markdown(user_input)
# Run inference
with st.spinner("Thinking..."):
            prior_history: List[Tuple[str, str]] = [
                (h.get("question", ""), h.get("answer", ""))
                for h in self.chat_history
            ]
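            # chat_history is passed as (question, answer) pairs, matching the
            # convention of LangChain-style conversational retrieval chains.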
styled_question = self.style_instruction + "\n\nQuestion: " + user_input
# Call the chain with a safe retry: if the underlying vectorstore is corrupted or missing
# (for example, Chroma raises an internal HNSW/disk error), attempt one automatic rebuild
# and retry. This avoids crashing the Streamlit app in deployed environments.
            attempted_rebuild = False
            while True:
try:
                    if self.use_kg:
result = answer_with_kg(
self.chain,
styled_question,
prior_history,
persist_dir=self.persist_dir,
kg_hops=self.kg_hops,
)
else:
result = self.chain({"question": styled_question, "chat_history": prior_history})
break
                except Exception as e:
                    # If we've already attempted a rebuild, give up and show an error.
if attempted_rebuild:
st.error("Retrieval error: failed to query the knowledge base. Try rebuilding the vectorstore manually.")
# Optionally show the exception text for debugging
st.exception(e)
# Stop processing this user input
return
# Attempt an automatic rebuild and retry once
attempted_rebuild = True
st.warning("Detected retrieval backend issue — attempting to rebuild the vectorstore and retry...")
try:
run_ingest_cli(data_dir=self.data_dir, persist_dir=self.persist_dir)
except Exception as rebuild_err:
st.error("Automatic rebuild failed; please rebuild manually from the sidebar or CLI.")
st.exception(rebuild_err)
return
# Clear cached retriever and chain and reload
try:
build_or_load_retriever_cached.clear()
get_chain_cached.clear()
self.chain = get_chain_cached(
model_name=self.model_name,
top_k=self.top_k,
retrieval_mode=self.retrieval_mode,
data_dir=self.data_dir,
persist_dir=self.persist_dir,
)
except Exception as reload_err:
st.error("Failed to reload the QA chain after rebuilding the vectorstore.")
st.exception(reload_err)
return
# loop will retry once
answer = (
result.get("answer")
or result.get("result")
or result.get("output_text")
or ""
)
source_docs = result.get("source_documents") or []
# Normalize retrieved docs for UI and metrics
formatted_sources: List[Dict] = []
for idx, sd in enumerate(source_docs, start=1):
            if isinstance(sd, dict):
                meta = sd.get("metadata", {}) or {}
                text = (
                    sd.get("page_content")
                    or sd.get("content")
                    or sd.get("text")
                    or ""
                )
            else:
                meta = getattr(sd, "metadata", {}) or {}
                text = (
                    getattr(sd, "page_content", None)
                    or getattr(sd, "content", None)
                    or ""
                )
formatted_sources.append(
{"index": idx, "metadata": meta, "content": str(text)}
)
# Compute simple retrieval quality metrics
coverage, grounding = compute_quality_scores(
user_input, answer, formatted_sources
)
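        # Both scores are heuristics in [0, 1]; they are shown as percentages
        # in the metrics expander below.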
coverage_pct = int(round(coverage * 100))
grounding_pct = int(round(grounding * 100))
# Render assistant message + debug block
with st.chat_message("assistant"):
st.markdown(answer)
with st.expander("Retrieval Metrics and Sources"):
st.markdown(f"- Retrieval mode: `{self.retrieval_mode}`")
st.markdown(f"- k: `{self.top_k}`")
st.markdown(
f"- Coverage score (question vs sources): **{coverage_pct}%**"
)
st.markdown(
f"- Grounding score (answer vs sources): **{grounding_pct}%**"
)
if formatted_sources:
st.markdown("**Retrieved chunks:**")
for src in formatted_sources:
label = format_source_label(src["metadata"], src["index"])
                        snippet = src["content"][:200].replace("\n", " ")
                        if len(src["content"]) > 200:
                            snippet += "..."
                        st.markdown(f"**[{src['index']}] {label}**")
                        st.code(snippet)
# Persist turn in chat history
self.chat_history.append(
{
"question": user_input,
"answer": answer,
"sources": formatted_sources,
}
)
st.session_state["chat_history"] = self.chat_history
def main() -> None:
"""Main entry point for running the Abalone RAG Chatbot app."""
app = AbaloneRAGApp()
    # Allow rebuild actions before enforcing the OpenAI API key so users can
    # inspect logs and trigger rebuilds even when the key isn't set. Chain
    # initialization requires the key, so enforce it after handling rebuild requests.
app.handle_rebuild()
if not ensure_openai_key():
st.stop()
app.ensure_chain_ready()
app.render_chat_history()
app.handle_user_input()
if __name__ == "__main__":
main()
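
# Typical local run (assuming this module is saved as the app entry point):
#   streamlit run app.py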