| import streamlit as st |
| import os |
| import sys |
| import tempfile |
| from datetime import datetime |
| import pandas as pd |
| from typing import List, Dict, Any |
| import time |
| import logging |
|
|
| |
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) |
|
|
| |
| try: |
| from app.core.agent import AssistantAgent |
| from app.core.ingestion import DocumentProcessor |
| from app.core.telegram_bot import TelegramBot |
| from app.core.chat_history import ChatHistoryManager |
| from app.utils.helpers import get_document_path, format_sources, save_conversation, copy_uploaded_file |
| from app.config import ( |
| LLM_MODEL, EMBEDDING_MODEL, TELEGRAM_ENABLED, |
| TELEGRAM_BOT_TOKEN, TELEGRAM_ALLOWED_USERS, |
| HF_DATASET_NAME |
| ) |
| except ImportError: |
| |
| sys.path.append(os.path.abspath('.')) |
| from app.core.agent import AssistantAgent |
| from app.core.ingestion import DocumentProcessor |
| from app.core.telegram_bot import TelegramBot |
| from app.core.chat_history import ChatHistoryManager |
| from app.utils.helpers import get_document_path, format_sources, save_conversation, copy_uploaded_file |
| from app.config import ( |
| LLM_MODEL, EMBEDDING_MODEL, TELEGRAM_ENABLED, |
| TELEGRAM_BOT_TOKEN, TELEGRAM_ALLOWED_USERS, |
| HF_DATASET_NAME |
| ) |
|
|
| |
# Configure the page; this must be the first Streamlit call in the script.
st.set_page_config(
    page_title="Personal AI Second Brain",
    page_icon="🧠",
    layout="wide"
)
|
|
| |
@st.cache_resource
def get_agent():
    """Return the process-wide AssistantAgent, cached by Streamlit.

    If construction fails, surface the error in the UI and hand back a
    stub with the same two methods so the rest of the page keeps working.
    """
    logger.info("Initializing AssistantAgent (should only happen once)")
    try:
        return AssistantAgent()
    except Exception as exc:
        logger.error(f"Error initializing agent: {exc}")
        st.error(f"Could not initialize AI assistant: {str(exc)}")

        class _StubAgent:
            # Minimal stand-in so the chat UI can still render a reply.
            def query(self, question):
                return {
                    "answer": "I'm having trouble starting up. Please try refreshing the page.",
                    "sources": []
                }

            def add_conversation_to_memory(self, *args, **kwargs):
                pass

        return _StubAgent()
|
|
| |
@st.cache_resource
def get_document_processor(_agent):
    """Return the cached DocumentProcessor built on the agent's memory manager.

    The leading underscore on ``_agent`` tells Streamlit's cache not to try
    hashing the (unhashable) agent instance. On failure, report the error
    and return a no-op stub exposing the same ingest methods.
    """
    logger.info("Initializing DocumentProcessor (should only happen once)")
    try:
        return DocumentProcessor(_agent.memory_manager)
    except Exception as exc:
        logger.error(f"Error initializing document processor: {exc}")
        st.error(f"Could not initialize document processor: {str(exc)}")

        class _StubProcessor:
            # Fail-soft replacement: ingestion "succeeds" with a dummy id.
            def ingest_file(self, *args, **kwargs):
                return ["dummy-id"]

            def ingest_text(self, *args, **kwargs):
                return ["dummy-id"]

        return _StubProcessor()
|
|
| |
@st.cache_resource
def get_chat_history_manager():
    """Return the cached ChatHistoryManager bound to the configured dataset.

    On failure, report the error and return a stub whose methods mimic the
    manager's interface (empty history, save "succeeds", sync "fails").
    """
    logger.info("Initializing ChatHistoryManager")
    try:
        return ChatHistoryManager(dataset_name=HF_DATASET_NAME)
    except Exception as exc:
        logger.error(f"Error initializing chat history manager: {exc}")
        st.error(f"Could not initialize chat history: {str(exc)}")

        class _StubHistoryManager:
            def load_history(self, *args, **kwargs):
                return []

            def save_conversation(self, *args, **kwargs):
                return True

            def sync_to_hub(self, *args, **kwargs):
                return False

        return _StubHistoryManager()
|
|
| |
@st.cache_resource
def get_telegram_bot(_agent):
    """Return the cached TelegramBot, or None when disabled or unconfigured.

    ``_agent`` is underscore-prefixed so Streamlit skips hashing it.
    Any construction error is logged and swallowed (the UI treats a
    missing bot the same as a disabled one).
    """
    if not (TELEGRAM_ENABLED and TELEGRAM_BOT_TOKEN):
        logger.info("Telegram bot disabled or token missing")
        return None

    logger.info("Initializing Telegram bot")
    try:
        return TelegramBot(
            agent=_agent,
            token=TELEGRAM_BOT_TOKEN,
            allowed_user_ids=TELEGRAM_ALLOWED_USERS
        )
    except Exception as exc:
        logger.error(f"Error initializing Telegram bot: {exc}")
        return None
|
|
| |
# Seed per-session defaults exactly once; values already present in
# st.session_state are never overwritten across reruns.
_SESSION_DEFAULTS = {
    "messages": [],
    "telegram_status": "Not started",
    "history_filter": "",
    "current_tab": "Chat",
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default
|
|
| |
# Construct (or fetch cached) core services. Order matters: the document
# processor and Telegram bot both wrap the agent instance.
agent = get_agent()
document_processor = get_document_processor(agent)
chat_history_manager = get_chat_history_manager()
telegram_bot = get_telegram_bot(agent)


# On a fresh session, pre-populate the chat pane from persisted history.
if not st.session_state.messages:
    try:
        recent_history = chat_history_manager.load_history()
        # Replay at most the 10 most recent conversations (two chat
        # messages per conversation: the question and the answer).
        for conv in recent_history[-10:]:
            if "user_query" in conv and "assistant_response" in conv:
                st.session_state.messages.append({"role": "user", "content": conv["user_query"]})
                st.session_state.messages.append({"role": "assistant", "content": conv["assistant_response"]})
    except Exception as e:
        # Best-effort preload: an empty chat pane is acceptable on failure.
        logger.error(f"Error loading initial history: {e}")


st.title("🧠 Personal AI Second Brain")


# Top-level navigation; the `with tabs[n]` blocks below rely on this order.
tabs = st.tabs(["Chat", "Documents", "History", "Settings"])
|
|
| |
with tabs[0]:
    # Record which tab is active (same pattern as the other tabs).
    if st.session_state.current_tab != "Chat":
        st.session_state.current_tab = "Chat"

    # Replay the transcript accumulated in this session.
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # Handle a new prompt from the chat input box.
    if prompt := st.chat_input("Ask me anything..."):
        st.session_state.messages.append({"role": "user", "content": prompt})

        with st.chat_message("user"):
            st.markdown(prompt)

        with st.chat_message("assistant"):
            message_placeholder = st.empty()
            message_placeholder.markdown("Thinking...")

            try:
                response = agent.query(prompt)
                # BUGFIX: use .get() with defaults so a malformed agent
                # response degrades gracefully instead of raising KeyError
                # mid-render (the except branch below is meant for agent
                # failures, not for our own subscript errors).
                answer = response.get("answer", "")
                sources = response.get("sources", [])

                message_placeholder.markdown(answer)

                st.session_state.messages.append({"role": "assistant", "content": answer})

                # Persist the exchange; store source names only, tolerating
                # source entries that lack a "source" key.
                chat_history_manager.save_conversation({
                    "user_query": prompt,
                    "assistant_response": answer,
                    "sources": [s.get("source", "") for s in sources] if sources else [],
                    "timestamp": datetime.now().isoformat()
                })

                if sources:
                    with st.expander("Sources"):
                        st.markdown(format_sources(sources))

                # Let the agent remember this exchange for future context.
                agent.add_conversation_to_memory(prompt, answer)

            except Exception as e:
                logger.error(f"Error generating response: {e}")
                error_message = f"I'm sorry, I encountered an error: {str(e)}"
                message_placeholder.markdown(error_message)
                st.session_state.messages.append({"role": "assistant", "content": error_message})
|
|
| |
with tabs[1]:
    # Record which tab is active (same pattern as the other tabs).
    if st.session_state.current_tab != "Documents":
        st.session_state.current_tab = "Documents"

    st.header("Upload & Manage Documents")

    col1, col2 = st.columns(2)

    with col1:
        st.subheader("Upload a File")

        with st.expander("Supported File Types"):
            st.markdown("""
            - **PDF** (.pdf) - Best for formatted documents
            - **Text** (.txt) - Simple text files
            - **CSV** (.csv) - Structured data
            - **Word** (.doc, .docx) - Microsoft Word documents
            - **Markdown** (.md) - Formatted text
            - **HTML** (.html, .htm) - Web pages

            Other file types may work but are not fully supported.
            """)

        uploaded_file = st.file_uploader("Choose a file", type=["pdf", "txt", "csv", "doc", "docx", "md", "html", "htm", "xml", "json"])

        if uploaded_file is not None:
            # Show basic metadata so the user can confirm the right file.
            file_details = {
                "Filename": uploaded_file.name,
                "File size": f"{uploaded_file.size / 1024:.1f} KB",
                "File type": uploaded_file.type
            }
            st.json(file_details)

            if st.button("Process Document"):
                with st.spinner("Processing document..."):
                    status_placeholder = st.empty()
                    status_placeholder.info("Starting document processing...")

                    # Sentinels so the error/cleanup paths can tell what was
                    # actually created before the failure.
                    progress_bar = None
                    temp_path = None
                    doc_path = None

                    try:
                        # Stage 1: write the upload to a temp file on disk.
                        status_placeholder.info("Creating temporary file...")
                        temp_dir = tempfile.gettempdir()
                        temp_path = os.path.join(temp_dir, uploaded_file.name)

                        logger.info(f"Saving uploaded file to temporary path: {temp_path}")

                        with open(temp_path, "wb") as temp_file:
                            temp_file.write(uploaded_file.getvalue())

                        # Stage 2: copy into the permanent documents directory;
                        # fall back to the temp file if the copy fails.
                        status_placeholder.info("Preparing document storage location...")
                        doc_path = get_document_path(uploaded_file.name)

                        logger.info(f"Copying file to documents directory: {doc_path}")
                        copy_success = copy_uploaded_file(temp_path, doc_path)

                        if not copy_success:
                            logger.warning("Using temporary file path instead of documents directory")
                            doc_path = temp_path
                            status_placeholder.warning("Using temporary storage (document won't be permanently saved)")

                        # Stage 3: ingest with retries; permission-style errors
                        # (403/forbidden) get the same retry budget as others.
                        status_placeholder.info("Analyzing and indexing document content...")
                        progress_bar = st.progress(0)
                        max_retries = 3

                        for attempt in range(max_retries):
                            try:
                                progress_bar.progress((attempt * 30) / 100)
                                ids = document_processor.ingest_file(temp_path, {"original_name": uploaded_file.name})
                                progress_bar.progress(100)
                                break
                            except Exception as e:
                                error_str = str(e).lower()
                                if ("403" in error_str or "forbidden" in error_str or "permission" in error_str) and attempt < max_retries - 1:
                                    status_placeholder.warning(f"Permission error ({attempt+1}/{max_retries}), retrying...")
                                    logger.warning(f"Permission error ({attempt+1}/{max_retries}), retrying...")
                                    time.sleep(1.5)
                                elif attempt < max_retries - 1:
                                    status_placeholder.warning(f"Error ({attempt+1}/{max_retries}), retrying...")
                                    logger.warning(f"Error during ingestion ({attempt+1}/{max_retries}): {e}")
                                    time.sleep(1.5)
                                else:
                                    # Out of retries: let the outer handler report it.
                                    raise

                        # ids beginning with "error-" indicate partial failure
                        # reported by the processor rather than an exception.
                        if ids and not all(str(id).startswith("error-") for id in ids):
                            status_placeholder.success("✅ Document processed successfully!")
                            st.balloons()
                        else:
                            status_placeholder.warning("⚠️ Document processed with warnings. Some content may not be fully indexed.")

                    except Exception as e:
                        # BUGFIX: update the existing progress bar instead of the
                        # old `'progress_bar' in locals()` hack, which created a
                        # second progress widget on failure.
                        if progress_bar is None:
                            progress_bar = st.progress(0)
                        else:
                            progress_bar.progress(100)
                        logger.error(f"Error processing document: {str(e)}")
                        status_placeholder.error(f"❌ Error processing document: {str(e)}")

                        if "403" in str(e) or "forbidden" in str(e).lower():
                            st.warning("This appears to be a permissions issue. Try using a different file format or using the text input option instead.")
                        elif "unsupported" in str(e).lower() or "not supported" in str(e).lower() or "no specific loader" in str(e).lower():
                            st.warning("This file format may not be supported. Try converting to PDF or TXT first.")

                    finally:
                        # BUGFIX: cleanup previously lived inside the try block,
                        # so a final ingestion failure jumped straight to the
                        # except handler and leaked the temp file. A finally
                        # block removes it on both success and failure (but not
                        # when the temp file IS the stored document).
                        if temp_path and temp_path != doc_path and os.path.exists(temp_path):
                            try:
                                os.unlink(temp_path)
                                logger.info(f"Temporary file removed: {temp_path}")
                            except Exception as e:
                                logger.warning(f"Could not remove temporary file: {e}")

    with col2:
        st.subheader("Add Text Directly")

        text_content = st.text_area("Enter text to add to your knowledge base:", height=200)
        text_title = st.text_input("Give this text a title:")

        # Require both a body and a title before ingesting.
        if st.button("Process Text") and text_content and text_title:
            with st.spinner("Processing text..."):
                status_placeholder = st.empty()
                status_placeholder.info("Processing your text...")

                try:
                    metadata = {"title": text_title, "source": "direct_input"}
                    ids = document_processor.ingest_text(text_content, metadata)

                    if ids:
                        status_placeholder.success("✅ Text processed successfully!")
                    else:
                        status_placeholder.warning("⚠️ Text processed with warnings.")
                except Exception as e:
                    logger.error(f"Error processing text: {str(e)}")
                    status_placeholder.error(f"❌ Error processing text: {str(e)}")
|
|
| |
with tabs[2]:
    # Record which tab is active (same pattern as the other tabs).
    if st.session_state.current_tab != "History":
        st.session_state.current_tab = "History"

    st.header("Chat History")

    # Filter controls: free-text search plus an optional date range.
    col1, col2, col3 = st.columns([2, 1, 1])

    with col1:
        search_query = st.text_input("Search conversations:", st.session_state.history_filter)
        if search_query != st.session_state.history_filter:
            st.session_state.history_filter = search_query

    with col2:
        st.text("Date Range (optional)")
        start_date = st.date_input("Start date", None)

    with col3:
        # Non-breaking space label keeps this column's widget aligned with col2.
        st.text("\u00A0")
        end_date = st.date_input("End date", None)

    try:
        history = chat_history_manager.load_history()

        # NOTE(review): the two filters below REPLACE the result set rather
        # than intersect it — a date range discards any search results, so
        # search + dates never narrows by both. Confirm whether that is
        # intended before relying on combined filtering.
        if search_query:
            history = chat_history_manager.search_conversations(search_query)

        if start_date or end_date:
            # Widen the bounds to whole days (midnight .. 23:59:59.999999).
            start_datetime = datetime.combine(start_date, datetime.min.time()) if start_date else None
            end_datetime = datetime.combine(end_date, datetime.max.time()) if end_date else None
            history = chat_history_manager.get_conversations_by_date(start_datetime, end_datetime)

        if not history:
            st.info("No conversation history found matching your criteria.")
        else:
            # Newest first; ISO-format timestamp strings sort lexicographically.
            history.sort(key=lambda x: x.get("timestamp", ""), reverse=True)

            df = pd.DataFrame(history)
            if not df.empty:
                # Only render the table when the expected columns are present.
                if all(col in df.columns for col in ["timestamp", "user_query", "assistant_response"]):
                    display_df = df[["timestamp", "user_query", "assistant_response"]]
                    display_df = display_df.rename(columns={
                        "timestamp": "Date",
                        "user_query": "Your Question",
                        "assistant_response": "AI Response"
                    })

                    if "Date" in display_df.columns:
                        display_df["Date"] = pd.to_datetime(display_df["Date"]).dt.strftime('%Y-%m-%d %H:%M')

                    # Truncate long cells so the table stays readable.
                    for col in ["Your Question", "AI Response"]:
                        if col in display_df.columns:
                            display_df[col] = display_df[col].apply(lambda x: x[:100] + "..." if isinstance(x, str) and len(x) > 100 else x)

                    st.dataframe(display_df, use_container_width=True)

                    # Drill-down view for one selected conversation.
                    if not df.empty:
                        selected_idx = st.selectbox("Select conversation to view details:",
                                                    range(len(df)),
                                                    format_func=lambda i: f"{df.iloc[i].get('timestamp', 'Unknown')} - {df.iloc[i].get('user_query', '')[:30]}...")

                        if selected_idx is not None:
                            selected_conv = df.iloc[selected_idx]
                            st.subheader("Conversation Details")

                            st.markdown("**Your Question:**")
                            st.markdown(selected_conv.get("user_query", ""))

                            st.markdown("**AI Response:**")
                            st.markdown(selected_conv.get("assistant_response", ""))

                            if "sources" in selected_conv and selected_conv["sources"]:
                                st.markdown("**Sources:**")
                                for src in selected_conv["sources"]:
                                    st.markdown(f"- {src}")

                            if st.button("Continue this conversation"):
                                # Seed the chat pane with this exchange, switch
                                # tabs, and force a rerun to re-render.
                                st.session_state.messages.append({"role": "user", "content": selected_conv.get("user_query", "")})
                                st.session_state.messages.append({"role": "assistant", "content": selected_conv.get("assistant_response", "")})

                                st.session_state.current_tab = "Chat"
                                # NOTE(review): st.experimental_rerun() is
                                # deprecated (removed in recent Streamlit);
                                # st.rerun() is the replacement — confirm the
                                # pinned Streamlit version before changing.
                                st.experimental_rerun()
                else:
                    st.error("Unexpected history format. Some columns are missing.")
            else:
                st.info("No conversation history found.")
    except Exception as e:
        logger.error(f"Error displaying history: {e}")
        st.error(f"Error loading conversation history: {str(e)}")

    # Manual push of local history to the configured Hugging Face dataset repo.
    if HF_DATASET_NAME:
        if st.button("Sync History to Hugging Face Hub"):
            with st.spinner("Syncing history..."):
                success = chat_history_manager.sync_to_hub()
                if success:
                    st.success("History successfully synced to Hugging Face Hub!")
                else:
                    st.error("Failed to sync history. Check logs for details.")
|
|
| |
with tabs[3]:
    # Record which tab is active (same pattern as the other tabs).
    if st.session_state.current_tab != "Settings":
        st.session_state.current_tab = "Settings"

    st.header("Settings")

    # Read-only snapshot of the key configuration values.
    st.subheader("System Information")
    system_info = {
        "LLM Model": LLM_MODEL,
        "Embedding Model": EMBEDDING_MODEL,
        "HF Dataset": HF_DATASET_NAME or "Not configured",
        "Telegram Enabled": "Yes" if TELEGRAM_ENABLED else "No"
    }

    for key, value in system_info.items():
        st.markdown(f"**{key}:** {value}")

    # Telegram bot status plus manual start/stop controls.
    st.subheader("Telegram Integration")

    telegram_status = "Not configured"
    if telegram_bot:
        telegram_status = st.session_state.telegram_status

    st.markdown(f"**Status:** {telegram_status}")

    col1, col2 = st.columns(2)

    with col1:
        # Start button is only offered while the bot is not running.
        if telegram_bot and st.session_state.telegram_status != "Running":
            if st.button("Start Telegram Bot"):
                try:
                    success = telegram_bot.start()
                    if success:
                        st.session_state.telegram_status = "Running"
                        st.success("Telegram bot started!")
                    else:
                        st.error("Failed to start Telegram bot. Check logs for details.")
                except Exception as e:
                    logger.error(f"Error starting Telegram bot: {e}")
                    st.error(f"Error: {str(e)}")

    with col2:
        # Stop button is only offered while the bot is running.
        if telegram_bot and st.session_state.telegram_status == "Running":
            if st.button("Stop Telegram Bot"):
                try:
                    telegram_bot.stop()
                    st.session_state.telegram_status = "Stopped"
                    st.info("Telegram bot stopped.")
                except Exception as e:
                    logger.error(f"Error stopping Telegram bot: {e}")
                    st.error(f"Error: {str(e)}")

    if telegram_bot:
        with st.expander("Telegram Bot Settings"):
            st.markdown("""
            To configure the Telegram bot, set these environment variables:
            - `TELEGRAM_ENABLED`: Set to `true` to enable the bot
            - `TELEGRAM_BOT_TOKEN`: Your Telegram bot token
            - `TELEGRAM_ALLOWED_USERS`: Comma-separated list of Telegram user IDs (optional)
            """)

            if telegram_bot.allowed_user_ids:
                st.markdown("**Allowed User IDs:**")
                for user_id in telegram_bot.allowed_user_ids:
                    st.markdown(f"- {user_id}")
            else:
                st.markdown("The bot will respond to all users (no user restrictions configured).")

        # Static usage/setup help shown whenever the bot is configured.
        st.markdown("### Telegram Bot Commands")
        st.markdown("""
        - **/start**: Start a conversation with the bot
        - **/help**: Shows available commands
        - **/search**: Use `/search your query` to search your knowledge base
        - **Direct messages**: Send any message to chat with your second brain

        #### How to Set Up Your Telegram Bot
        1. Talk to [@BotFather](https://t.me/botfather) on Telegram
        2. Use the `/newbot` command to create a new bot
        3. Get your bot token and add it to your `.env` file
        4. Set `TELEGRAM_ENABLED=true` in your `.env` file
        5. To find your Telegram user ID, talk to [@userinfobot](https://t.me/userinfobot)
        """)
    else:
        st.info("Telegram integration is not enabled. Configure your .env file to enable it.")

    # Hugging Face dataset persistence status and setup help.
    st.subheader("Hugging Face Dataset Settings")

    if HF_DATASET_NAME:
        st.markdown(f"**Dataset Name:** {HF_DATASET_NAME}")
        st.markdown(f"**Local History File:** {chat_history_manager.local_file}")

        with st.expander("Setup Instructions"):
            st.markdown("""
            ### Setting up Hugging Face Dataset Persistence

            1. Create a private dataset repository on Hugging Face Hub
            2. Set your API token in the `.env` file as `HF_API_KEY`
            3. Set your dataset name as `HF_DATASET_NAME` (format: username/repo-name)

            Your chat history will be automatically synced to the Hub.
            """)
    else:
        st.info("Hugging Face Dataset persistence is not configured. Set HF_DATASET_NAME in your .env file.")
|
|
| |
# Auto-start the Telegram bot once per session when enabled and configured.
# If start() returns falsy without raising, the status stays "Not started",
# so this block retries on every rerun until it succeeds or errors out.
if telegram_bot and TELEGRAM_ENABLED and st.session_state.telegram_status == "Not started":
    try:
        success = telegram_bot.start()
        if success:
            st.session_state.telegram_status = "Running"
            logger.info("Telegram bot started automatically")
    except Exception as e:
        logger.error(f"Error auto-starting Telegram bot: {e}")
        st.session_state.telegram_status = "Error"
|
|
if __name__ == "__main__":
    # Streamlit executes this script top-to-bottom on every rerun; there is
    # no separate entry point to call here.
    pass