# AnyRAG-WebSearch / src/chatbot_frontend.py
# Author: Rashid Ali — commit 666b79c ("fix app path")
import streamlit as st
from chatbot_backend import chatbot, retrieve_all_threads
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from ingestion import process_document
from vector_store import build_vector_store
import uuid
import os
# =========================== Utilities ===========================
def generate_thread_id():
    """Return a fresh random UUID used to identify a chat thread."""
    new_id = uuid.uuid4()
    return new_id
def reset_chat():
    """Start a fresh conversation: new thread id, cleared history and doc state.

    NOTE(review): `reset_chat` is defined twice in this file; Python keeps the
    last definition. Both copies are unified here (clearing `vector_store_path`
    AND seeding `thread_titles`) so either survives removal of the other.
    """
    thread_id = generate_thread_id()
    st.session_state["thread_id"] = thread_id
    add_thread(thread_id)
    st.session_state["message_history"] = []
    st.session_state["doc_content"] = None
    # Drop the stale FAISS index path so the new chat does not silently
    # retrieve against the previous chat's documents.
    st.session_state["vector_store_path"] = None
    st.session_state["thread_titles"][thread_id] = "New Chat"
def reset_chat():
    """Start a fresh conversation: new thread id, cleared history and doc state.

    Merged from the two duplicate definitions in this file: this (surviving)
    copy had dropped the `vector_store_path` reset the first copy performed,
    so a new chat kept retrieving against the previous chat's index.
    """
    thread_id = generate_thread_id()
    st.session_state["thread_id"] = thread_id
    add_thread(thread_id)
    st.session_state["message_history"] = []
    st.session_state["doc_content"] = None
    # Restored from the shadowed duplicate: clear the stale FAISS index path.
    st.session_state["vector_store_path"] = None
    st.session_state["thread_titles"][thread_id] = "New Chat"
def add_thread(thread_id):
    """Register ``thread_id`` in the session's thread list, skipping duplicates."""
    known_threads = st.session_state["chat_threads"]
    if thread_id not in known_threads:
        known_threads.append(thread_id)
def load_conversation(thread_id):
    """Fetch the persisted message list for a thread from the LangGraph checkpoint."""
    thread_config = {"configurable": {"thread_id": thread_id}}
    snapshot = chatbot.get_state(config=thread_config)
    return snapshot.values.get("messages", [])
def save_uploaded_file(uploaded_file):
    """Persist a Streamlit upload to ./uploaded_docs and return the file path."""
    target_dir = "uploaded_docs"
    os.makedirs(target_dir, exist_ok=True)
    destination = os.path.join(target_dir, uploaded_file.name)
    payload = uploaded_file.read()
    with open(destination, "wb") as out:
        out.write(payload)
    return destination
def handle_uploaded_docs(uploaded_files, model_choice):
    """Process uploaded documents and build a FAISS vector store.

    Args:
        uploaded_files: iterable of Streamlit ``UploadedFile`` objects (falsy -> no-op).
        model_choice: model label forwarded to ``build_vector_store`` (selects embeddings).

    Returns:
        (all_text, db_save_path) — concatenated chunk text and the index path,
        or (None, None) when no files were supplied.
    """
    if not uploaded_files:
        return None, None
    save_dir = "vector_stores"
    os.makedirs(save_dir, exist_ok=True)
    db_save_path = os.path.join(save_dir, "user_docs_index")
    all_chunks = []
    all_text = ""
    for uploaded_file in uploaded_files:
        file_path = save_uploaded_file(uploaded_file)
        st.sidebar.write(f"πŸ“„ Processing {uploaded_file.name} ...")
        chunks = process_document(file_path)
        all_chunks.extend(chunks)
        # Separate files with a blank line, matching the sample-docs path;
        # previously consecutive files were concatenated with no separator,
        # fusing the last/first chunks of adjacent files into one line.
        all_text += "\n".join(chunk.page_content for chunk in chunks) + "\n\n"
    st.sidebar.write("πŸ” Building FAISS vector store...")
    build_vector_store(all_chunks, db_save_path, model_choice)
    st.sidebar.success("βœ… Documents processed and stored successfully!")
    return all_text, db_save_path
# ======================= Session Initialization ===================
# Table-driven first-run defaults. Values are zero-arg factories so that
# side-effectful defaults (uuid generation, checkpoint retrieval) only run
# when the key is actually missing — same laziness as the original if-chain.
_SESSION_DEFAULTS = {
    "message_history": list,
    "thread_id": generate_thread_id,
    "chat_threads": retrieve_all_threads,
    "selected_model": lambda: "OpenAI (Paid)",
    "doc_source": lambda: "Upload Your Documents",
    "vector_store_path": lambda: None,
    "doc_content": lambda: None,
    "thread_titles": dict,
    "system_prompt": lambda: "",
    "search_enabled": lambda: False,
}
for _key, _factory in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _factory()

# Make sure the current thread shows up in the sidebar list.
add_thread(st.session_state["thread_id"])
# ============================ Sidebar ============================
st.sidebar.title("LangGraph Chatbot")

# -------- Model Selection --------
st.sidebar.subheader("Select Model")
model_choice = st.sidebar.radio(
    "Choose a model:",
    ["OpenAI (Paid)", "LLaMA (Open Source)"],
    # Preselect whatever the session already holds (0 = OpenAI, 1 = LLaMA).
    index=int(st.session_state["selected_model"] != "OpenAI (Paid)"),
)
st.session_state["selected_model"] = model_choice

# -------- Document Source Selection --------
st.sidebar.subheader("Choose your document source:")
doc_source_choice = st.sidebar.radio(
    "",
    ["Upload Your Documents", "Use Sample Documents"],
    index=int(st.session_state["doc_source"] != "Upload Your Documents"),
)
st.session_state["doc_source"] = doc_source_choice
# -------- Handle Document Upload --------
# Branches on the user's chosen document source: either process fresh uploads
# into a FAISS index, or fall back to the bundled sample_docs/ corpus.
uploaded_files = None
if st.session_state["doc_source"] == "Upload Your Documents":
    # NOTE(review): the label says PDF/TXT/DOCX but the uploader also accepts
    # audio, images, csv and json — confirm process_document handles them all.
    uploaded_files = st.sidebar.file_uploader(
        "Upload PDF, TXT, or DOCX files",
        accept_multiple_files=True,
        type=["pdf", "txt", "docx", "mp3", "wav", "m4a", "jpg", "jpeg", "png", "csv", "json"],
    )
    if uploaded_files:
        if st.sidebar.button("Process Documents"):
            # Chunk the uploads, build the FAISS index, and keep both the raw
            # text and the index path in session state for the backend.
            doc_text, vector_path = handle_uploaded_docs(
                uploaded_files, st.session_state["selected_model"]
            )
            st.session_state["vector_store_path"] = vector_path
            st.session_state["doc_content"] = doc_text
else:
    # -----------------------
    # Load sample documents
    # -----------------------
    SAMPLE_DIR = "sample_docs"
    st.sidebar.info("πŸ“š Using sample documents from `sample_docs/` folder.")
    # If the user wants to (re)build the index from sample files, expose a button.
    if st.sidebar.button("Load & Process Sample Documents"):
        sample_paths = []
        if not os.path.isdir(SAMPLE_DIR):
            st.sidebar.error(f"No `{SAMPLE_DIR}` folder found. Create it and add sample .pdf/.txt/.docx files.")
            st.session_state["vector_store_path"] = None
            st.session_state["doc_content"] = None
        else:
            # Collect supported sample files (extension check is case-insensitive).
            for fname in os.listdir(SAMPLE_DIR):
                if fname.lower().endswith((".pdf", ".txt", ".docx")):
                    sample_paths.append(os.path.join(SAMPLE_DIR, fname))
            if not sample_paths:
                st.sidebar.error(f"No supported files in `{SAMPLE_DIR}`. Add .pdf/.txt/.docx files.")
                st.session_state["vector_store_path"] = None
                st.session_state["doc_content"] = None
            else:
                st.sidebar.write(f"Processing {len(sample_paths)} sample files...")
                all_chunks = []
                all_text = ""
                for path in sample_paths:
                    chunks = process_document(path)
                    all_chunks.extend(chunks)
                    # Combine text from chunks for the doc_content fallback.
                    all_text += "\n".join([c.page_content for c in chunks]) + "\n\n"
                # Build (or overwrite) the shared sample index.
                save_dir = "vector_stores"
                os.makedirs(save_dir, exist_ok=True)
                db_save_path = os.path.join(save_dir, "sample_index")
                st.sidebar.write("πŸ” Building/Updating FAISS index for sample docs...")
                try:
                    build_vector_store(all_chunks, db_save_path, st.session_state["selected_model"])
                    st.sidebar.success("βœ… Sample FAISS index built.")
                    st.session_state["vector_store_path"] = db_save_path
                except Exception as e:
                    # Best-effort: keep the raw text usable even if indexing fails.
                    st.sidebar.warning(f"Could not build FAISS index: {e}")
                    st.session_state["vector_store_path"] = None
                st.session_state["doc_content"] = all_text
                st.sidebar.success("βœ… Sample documents loaded and available as `doc_content`.")
    # If a pre-built sample index is already present, load it automatically.
    elif os.path.isdir("vector_stores") and os.path.isdir(os.path.join("vector_stores", "sample_index")):
        st.sidebar.write("Using existing FAISS sample index at `vector_stores/sample_index`.")
        st.session_state["vector_store_path"] = os.path.join("vector_stores", "sample_index")
        # Prefer to rebuild doc_content from the raw sample files; otherwise
        # fall back to a placeholder string.
        if os.path.isdir(SAMPLE_DIR):
            # Quick concatenation of processed text (small, non-blocking).
            all_text = ""
            for fname in os.listdir(SAMPLE_DIR):
                if fname.lower().endswith((".pdf", ".txt", ".docx")):
                    try:
                        chunks = process_document(os.path.join(SAMPLE_DIR, fname))
                        all_text += "\n".join([c.page_content for c in chunks]) + "\n\n"
                    except Exception:
                        # Ignore processing errors for a single file.
                        continue
            st.session_state["doc_content"] = all_text or "Sample index loaded, but no raw sample text available."
        else:
            st.session_state["doc_content"] = "Sample FAISS index loaded."
    else:
        # Nothing present yet — tell the user what to do.
        st.sidebar.info("No sample index found. Click 'Load & Process Sample Documents' after adding files to `sample_docs/`.")
        st.session_state["vector_store_path"] = None
        st.session_state["doc_content"] = None
# -------- Conversation Controls --------
if st.sidebar.button("New Chat", key="new_chat_button"):
    reset_chat()

st.sidebar.header("My Conversations")

# List threads newest-first; clicking one swaps the active thread and
# rebuilds the rendered history from its checkpointed messages.
for tid in st.session_state["chat_threads"][::-1]:
    label = st.session_state["thread_titles"].get(tid, "Untitled Chat")
    if st.sidebar.button(label, key=f"chat_{tid}"):
        st.session_state["thread_id"] = tid
        st.session_state["message_history"] = [
            {
                "role": "user" if isinstance(msg, HumanMessage) else "assistant",
                "content": msg.content,
            }
            for msg in load_conversation(tid)
        ]
# -------- Web Search Option --------
st.sidebar.header("🌐 Web Search Option")
search_on = st.sidebar.toggle(
    "Enable Web Search (DuckDuckGo)",
    value=st.session_state["search_enabled"],
    key="web_search_toggle",
)
st.session_state["search_enabled"] = search_on
if search_on:
    st.sidebar.success("🌍 Web Search: Enabled")
else:
    st.sidebar.info("🚫 Web Search: Disabled")
# -------- System Prompt --------
st.sidebar.header("🧠 System Behavior")
# Optional free-text system prompt forwarded to the backend each turn.
prompt_text = st.sidebar.text_area(
    "Set System Prompt (optional)",
    value=st.session_state["system_prompt"],
    placeholder="e.g. You are an expert tax advisor specializing in Indian GST laws.",
    key="system_prompt_input",
)
st.session_state["system_prompt"] = prompt_text
# ============================ Main UI ============================
st.title("🧠 AnyRAG β€” Intelligent Document & Web-Aware Chatbot")
active_model = st.session_state["selected_model"]
active_source = st.session_state["doc_source"]
st.caption(f"🧠 Model: **{active_model}** | πŸ“‚ Source: **{active_source}**")

# Replay the stored conversation so it survives Streamlit reruns.
for entry in st.session_state["message_history"]:
    with st.chat_message(entry["role"]):
        st.text(entry["content"])
user_input = st.chat_input("Type here")
if user_input:
    # Echo the user's turn immediately and persist it for rerun replay.
    st.session_state["message_history"].append({"role": "user", "content": user_input})
    with st.chat_message("user"):
        st.text(user_input)
    # Derive a short sidebar title (first two words) on the thread's first message.
    if st.session_state["thread_id"] not in st.session_state["thread_titles"] or \
       st.session_state["thread_titles"][st.session_state["thread_id"]] == "New Chat":
        title = " ".join(user_input.strip().split()[:2]) or "Chat"
        st.session_state["thread_titles"][st.session_state["thread_id"]] = title
    # Per-turn LangGraph config: thread_id selects the checkpoint to resume;
    # the remaining keys are forwarded to the backend via "configurable".
    CONFIG = {
        "configurable": {
            "thread_id": st.session_state["thread_id"],
            "model_choice": st.session_state["selected_model"],
            "doc_source": st.session_state["doc_source"],
            "vector_store_path": st.session_state["vector_store_path"],
            "doc_content": st.session_state["doc_content"],
        },
        "metadata": {"thread_id": st.session_state["thread_id"]},
        "run_name": "chat_turn",
    }
    # Assistant streaming response
    with st.chat_message("assistant"):
        # Mutable holder so the generator below can lazily create and then
        # update a single st.status box across tool invocations.
        status_holder = {"box": None}

        def ai_only_stream():
            """Yield only AI text chunks for st.write_stream, surfacing
            ToolMessage activity through the shared status box."""
            initial_state = {
                "messages": [HumanMessage(content=user_input)],
                "doc_content": st.session_state.get("doc_content"),
                "search_enabled": st.session_state.get("search_enabled", False),
                "model_choice": st.session_state["selected_model"],
                "doc_source": st.session_state["doc_source"],
                "system_prompt": st.session_state.get("system_prompt", ""),
            }
            for message_chunk, metadata in chatbot.stream(
                initial_state,
                config=CONFIG,
                stream_mode="messages",
            ):
                if isinstance(message_chunk, ToolMessage):
                    tool_name = getattr(message_chunk, "name", "tool")
                    if status_holder["box"] is None:
                        # First tool call this turn: create the status box.
                        status_holder["box"] = st.status(
                            f"πŸ”§ Using `{tool_name}` …", expanded=True
                        )
                    else:
                        # Subsequent tool calls: relabel the existing box.
                        status_holder["box"].update(
                            label=f"πŸ”§ Using `{tool_name}` …",
                            state="running",
                            expanded=True,
                        )
                if isinstance(message_chunk, AIMessage):
                    yield message_chunk.content

        # write_stream renders the chunks live and returns the full text.
        ai_message = st.write_stream(ai_only_stream())
        if status_holder["box"] is not None:
            status_holder["box"].update(
                label="βœ… Tool finished", state="complete", expanded=False
            )
    # Persist the assistant's turn so the next rerun re-renders it.
    st.session_state["message_history"].append(
        {"role": "assistant", "content": ai_message}
    )