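"""Streamlit front-end for a LangGraph-based RAG chatbot ("AnyRAG").

Lets the user pick a model (OpenAI or LLaMA), upload documents or reuse the
bundled sample documents, optionally enable DuckDuckGo web search and a custom
system prompt, and chat across multiple persisted conversation threads.
"""
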
import os
import uuid

import streamlit as st
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage

from chatbot_backend import chatbot, retrieve_all_threads
from ingestion import process_document
from vector_store import build_vector_store

# =========================== Utilities ===========================
def generate_thread_id():
    return uuid.uuid4()

def reset_chat():
    """Start a fresh conversation thread and clear per-chat state."""
    thread_id = generate_thread_id()
    st.session_state["thread_id"] = thread_id
    add_thread(thread_id)
    st.session_state["message_history"] = []
    st.session_state["doc_content"] = None
    st.session_state["vector_store_path"] = None
    st.session_state["thread_titles"][thread_id] = "New Chat"

def add_thread(thread_id):
    if thread_id not in st.session_state["chat_threads"]:
        st.session_state["chat_threads"].append(thread_id)

def load_conversation(thread_id):
    state = chatbot.get_state(config={"configurable": {"thread_id": thread_id}})
    return state.values.get("messages", [])

def save_uploaded_file(uploaded_file):
    upload_dir = "uploaded_docs"
    os.makedirs(upload_dir, exist_ok=True)
    file_path = os.path.join(upload_dir, uploaded_file.name)
    with open(file_path, "wb") as f:
        f.write(uploaded_file.read())
    return file_path

def handle_uploaded_docs(uploaded_files, model_choice):
    """Processes uploaded documents, builds a vector store, and returns doc content + store path."""
    if not uploaded_files:
        return None, None

    save_dir = "vector_stores"
    os.makedirs(save_dir, exist_ok=True)
    db_save_path = os.path.join(save_dir, "user_docs_index")

    all_chunks = []
    all_text = ""
    for uploaded_file in uploaded_files:
        file_path = save_uploaded_file(uploaded_file)
        st.sidebar.write(f"Processing {uploaded_file.name} ...")
        chunks = process_document(file_path)
        all_chunks.extend(chunks)
        all_text += "\n".join([chunk.page_content for chunk in chunks])

    st.sidebar.write("Building FAISS vector store...")
    build_vector_store(all_chunks, db_save_path, model_choice)
    st.sidebar.success("Documents processed and stored successfully!")
    return all_text, db_save_path

# ======================= Session Initialization ===================
if "message_history" not in st.session_state:
    st.session_state["message_history"] = []
if "thread_id" not in st.session_state:
    st.session_state["thread_id"] = generate_thread_id()
if "chat_threads" not in st.session_state:
    st.session_state["chat_threads"] = retrieve_all_threads()
if "selected_model" not in st.session_state:
    st.session_state["selected_model"] = "OpenAI (Paid)"
if "doc_source" not in st.session_state:
    st.session_state["doc_source"] = "Upload Your Documents"
if "vector_store_path" not in st.session_state:
    st.session_state["vector_store_path"] = None
if "doc_content" not in st.session_state:
    st.session_state["doc_content"] = None
if "thread_titles" not in st.session_state:
    st.session_state["thread_titles"] = {}
if "system_prompt" not in st.session_state:
    st.session_state["system_prompt"] = ""
if "search_enabled" not in st.session_state:
    st.session_state["search_enabled"] = False

add_thread(st.session_state["thread_id"])

# ============================ Sidebar ============================
st.sidebar.title("LangGraph Chatbot")

# -------- Model Selection --------
st.sidebar.subheader("Select Model")
model_choice = st.sidebar.radio(
    "Choose a model:",
    ["OpenAI (Paid)", "LLaMA (Open Source)"],
    index=0 if st.session_state["selected_model"] == "OpenAI (Paid)" else 1,
)
st.session_state["selected_model"] = model_choice

# -------- Document Source Selection --------
st.sidebar.subheader("Choose your document source:")
doc_source_choice = st.sidebar.radio(
    "Document source",
    ["Upload Your Documents", "Use Sample Documents"],
    index=0 if st.session_state["doc_source"] == "Upload Your Documents" else 1,
    label_visibility="collapsed",  # the subheader above already labels this control
)
st.session_state["doc_source"] = doc_source_choice

# -------- Handle Document Upload --------
uploaded_files = None
if st.session_state["doc_source"] == "Upload Your Documents":
    uploaded_files = st.sidebar.file_uploader(
        "Upload documents (PDF, TXT, DOCX, CSV, JSON, images, or audio)",
        accept_multiple_files=True,
        type=["pdf", "txt", "docx", "mp3", "wav", "m4a", "jpg", "jpeg", "png", "csv", "json"],
    )
    if uploaded_files:
        if st.sidebar.button("Process Documents"):
            doc_text, vector_path = handle_uploaded_docs(
                uploaded_files, st.session_state["selected_model"]
            )
            st.session_state["vector_store_path"] = vector_path
            st.session_state["doc_content"] = doc_text
else:
    # -----------------------
    # Load sample documents
    # -----------------------
    SAMPLE_DIR = "sample_docs"
    st.sidebar.info("Using sample documents from the `sample_docs/` folder.")

    # If the user wants to rebuild the index from sample files, expose a button
    if st.sidebar.button("Load & Process Sample Documents"):
        sample_paths = []
        if not os.path.isdir(SAMPLE_DIR):
            st.sidebar.error(f"No `{SAMPLE_DIR}` folder found. Create it and add sample .pdf/.txt/.docx files.")
            st.session_state["vector_store_path"] = None
            st.session_state["doc_content"] = None
        else:
            # Collect sample files
            for fname in os.listdir(SAMPLE_DIR):
                if fname.lower().endswith((".pdf", ".txt", ".docx")):
                    sample_paths.append(os.path.join(SAMPLE_DIR, fname))
            if not sample_paths:
                st.sidebar.error(f"No supported files in `{SAMPLE_DIR}`. Add .pdf/.txt/.docx files.")
                st.session_state["vector_store_path"] = None
                st.session_state["doc_content"] = None
            else:
                st.sidebar.write(f"Processing {len(sample_paths)} sample files...")
                all_chunks = []
                all_text = ""
                for path in sample_paths:
                    chunks = process_document(path)
                    all_chunks.extend(chunks)
                    # Combine text from chunks for the doc_content fallback
                    all_text += "\n".join([c.page_content for c in chunks]) + "\n\n"

                # Build (or overwrite) a sample index if desired
                save_dir = "vector_stores"
                os.makedirs(save_dir, exist_ok=True)
                db_save_path = os.path.join(save_dir, "sample_index")
                st.sidebar.write("Building/Updating FAISS index for sample docs...")
                try:
                    build_vector_store(all_chunks, db_save_path, st.session_state["selected_model"])
                    st.sidebar.success("Sample FAISS index built.")
                    st.session_state["vector_store_path"] = db_save_path
                except Exception as e:
                    st.sidebar.warning(f"Could not build FAISS index: {e}")
                    st.session_state["vector_store_path"] = None
                st.session_state["doc_content"] = all_text
                st.sidebar.success("Sample documents loaded and available as `doc_content`.")

    # If a pre-built sample index is already present, load it automatically (optional)
    elif os.path.isdir("vector_stores") and os.path.isdir(os.path.join("vector_stores", "sample_index")):
        st.sidebar.write("Using existing FAISS sample index at `vector_stores/sample_index`.")
        st.session_state["vector_store_path"] = os.path.join("vector_stores", "sample_index")
        # Prefer to rebuild doc_content from the raw sample files when available; otherwise leave a placeholder
        if os.path.isdir(SAMPLE_DIR):
            # Quick concatenation of processed text (small, non-blocking)
            all_text = ""
            for fname in os.listdir(SAMPLE_DIR):
                if fname.lower().endswith((".pdf", ".txt", ".docx")):
                    try:
                        chunks = process_document(os.path.join(SAMPLE_DIR, fname))
                        all_text += "\n".join([c.page_content for c in chunks]) + "\n\n"
                    except Exception:
                        # Ignore processing errors for a single file
                        continue
            st.session_state["doc_content"] = all_text or "Sample index loaded, but no raw sample text available."
        else:
            st.session_state["doc_content"] = "Sample FAISS index loaded."
    else:
        # Nothing present yet; tell the user what to do
        st.sidebar.info("No sample index found. Click 'Load & Process Sample Documents' after adding files to `sample_docs/`.")
        st.session_state["vector_store_path"] = None
        st.session_state["doc_content"] = None

# -------- Conversation Controls --------
if st.sidebar.button("New Chat", key="new_chat_button"):
    reset_chat()

st.sidebar.header("My Conversations")
for thread_id in st.session_state["chat_threads"][::-1]:
    title = st.session_state["thread_titles"].get(thread_id, "Untitled Chat")
    if st.sidebar.button(title, key=f"chat_{thread_id}"):
        st.session_state["thread_id"] = thread_id
        messages = load_conversation(thread_id)
        temp_messages = []
        for msg in messages:
            role = "user" if isinstance(msg, HumanMessage) else "assistant"
            temp_messages.append({"role": role, "content": msg.content})
        st.session_state["message_history"] = temp_messages

# -------- Web Search Option --------
st.sidebar.header("Web Search Option")
st.session_state["search_enabled"] = st.sidebar.toggle(
    "Enable Web Search (DuckDuckGo)",
    value=st.session_state["search_enabled"],
    key="web_search_toggle",
)
if st.session_state["search_enabled"]:
    st.sidebar.success("Web Search: Enabled")
else:
    st.sidebar.info("Web Search: Disabled")

# -------- System Prompt --------
st.sidebar.header("System Behavior")
st.session_state["system_prompt"] = st.sidebar.text_area(
    "Set System Prompt (optional)",
    placeholder="e.g. You are an expert tax advisor specializing in Indian GST laws.",
    value=st.session_state["system_prompt"],
    key="system_prompt_input",  # unique key so the widget does not clash with the "system_prompt" entry
)

# ============================ Main UI ============================
st.title("AnyRAG - Intelligent Document & Web-Aware Chatbot")
st.caption(
    f"Model: **{st.session_state['selected_model']}** | "
    f"Source: **{st.session_state['doc_source']}**"
)

# Render history
for message in st.session_state["message_history"]:
    with st.chat_message(message["role"]):
        st.text(message["content"])

user_input = st.chat_input("Type here")

if user_input:
    st.session_state["message_history"].append({"role": "user", "content": user_input})
    with st.chat_message("user"):
        st.text(user_input)

    # Generate a short title if this is the first message of the thread
    if st.session_state["thread_id"] not in st.session_state["thread_titles"] or \
            st.session_state["thread_titles"][st.session_state["thread_id"]] == "New Chat":
        title = " ".join(user_input.strip().split()[:2]) or "Chat"
        st.session_state["thread_titles"][st.session_state["thread_id"]] = title

    CONFIG = {
        "configurable": {
            "thread_id": st.session_state["thread_id"],
            "model_choice": st.session_state["selected_model"],
            "doc_source": st.session_state["doc_source"],
            "vector_store_path": st.session_state["vector_store_path"],
            "doc_content": st.session_state["doc_content"],  # document text passed to the backend via config
        },
        "metadata": {"thread_id": st.session_state["thread_id"]},
        "run_name": "chat_turn",
    }

    # Assistant streaming response
    with st.chat_message("assistant"):
        status_holder = {"box": None}

        def ai_only_stream():
            initial_state = {
                "messages": [HumanMessage(content=user_input)],
                "doc_content": st.session_state.get("doc_content"),
                "search_enabled": st.session_state.get("search_enabled", False),
                "model_choice": st.session_state["selected_model"],
                "doc_source": st.session_state["doc_source"],
                "system_prompt": st.session_state.get("system_prompt", ""),
            }
            for message_chunk, metadata in chatbot.stream(
                initial_state,
                config=CONFIG,
                stream_mode="messages",
            ):
                # Show a status box while a tool is being called
                if isinstance(message_chunk, ToolMessage):
                    tool_name = getattr(message_chunk, "name", "tool")
                    if status_holder["box"] is None:
                        status_holder["box"] = st.status(
                            f"Using `{tool_name}` ...", expanded=True
                        )
                    else:
                        status_holder["box"].update(
                            label=f"Using `{tool_name}` ...",
                            state="running",
                            expanded=True,
                        )
                # Only stream assistant tokens to the chat window
                if isinstance(message_chunk, AIMessage):
                    yield message_chunk.content

        ai_message = st.write_stream(ai_only_stream())

        if status_holder["box"] is not None:
            status_holder["box"].update(
                label="Tool finished", state="complete", expanded=False
            )

    st.session_state["message_history"].append(
        {"role": "assistant", "content": ai_message}
    )
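
# ---------------------------------------------------------------------------
# Note: the interfaces below are what this UI assumes from the local modules.
# This is a sketch inferred from the calls above, not the authoritative
# definitions (see chatbot_backend.py, ingestion.py, and vector_store.py):
#
#   chatbot_backend.chatbot                - a compiled LangGraph graph (with a
#                                            checkpointer) exposing .get_state(config=...)
#                                            and .stream(state, config=..., stream_mode="messages")
#   chatbot_backend.retrieve_all_threads() - returns the list of saved thread ids
#   ingestion.process_document(path)       - returns document chunks exposing .page_content
#   vector_store.build_vector_store(chunks, save_path, model_choice)
#                                          - builds and saves a FAISS index at save_path
# ---------------------------------------------------------------------------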