Spaces:
Build error
Build error
| from typing import List | |
| import nest_asyncio | |
| import streamlit as st | |
| from phi.assistant import Assistant | |
| from phi.document import Document | |
| from phi.document.reader.pdf import PDFReader | |
| from phi.document.reader.website import WebsiteReader | |
| from phi.utils.log import logger | |
| from assistant import get_llm_os # type: ignore | |
| nest_asyncio.apply() | |
| st.set_page_config( | |
| page_title="Project Assistants", | |
| page_icon="🤖", | |
| ) | |
| st.title("Project Assistants") | |
| st.markdown("🤖") | |
| def main() -> None: | |
| # Get LLM Model | |
| llm_id = st.sidebar.selectbox("Select LLM", options=["gpt-4o", "gpt-4-turbo"]) or "gpt-4o" | |
| # Set llm_id in session state | |
| if "llm_id" not in st.session_state: | |
| st.session_state["llm_id"] = llm_id | |
| # Restart the assistant if llm_id changes | |
| elif st.session_state["llm_id"] != llm_id: | |
| st.session_state["llm_id"] = llm_id | |
| restart_assistant() | |
| # Sidebar checkboxes for selecting tools | |
| st.sidebar.markdown("### Select Tools") | |
| # Enable Calculator | |
| if "calculator_enabled" not in st.session_state: | |
| st.session_state["calculator_enabled"] = True | |
| # Get calculator_enabled from session state if set | |
| calculator_enabled = st.session_state["calculator_enabled"] | |
| # Checkbox for enabling calculator | |
| calculator = st.sidebar.checkbox("Calculator", value=calculator_enabled, help="Enable calculator.") | |
| if calculator_enabled != calculator: | |
| st.session_state["calculator_enabled"] = calculator | |
| calculator_enabled = calculator | |
| restart_assistant() | |
| # Enable file tools | |
| if "file_tools_enabled" not in st.session_state: | |
| st.session_state["file_tools_enabled"] = True | |
| # Get file_tools_enabled from session state if set | |
| file_tools_enabled = st.session_state["file_tools_enabled"] | |
| # Checkbox for enabling shell tools | |
| file_tools = st.sidebar.checkbox("File Tools", value=file_tools_enabled, help="Enable file tools.") | |
| if file_tools_enabled != file_tools: | |
| st.session_state["file_tools_enabled"] = file_tools | |
| file_tools_enabled = file_tools | |
| restart_assistant() | |
| # Enable Web Search via DuckDuckGo | |
| if "ddg_search_enabled" not in st.session_state: | |
| st.session_state["ddg_search_enabled"] = True | |
| # Get ddg_search_enabled from session state if set | |
| ddg_search_enabled = st.session_state["ddg_search_enabled"] | |
| # Checkbox for enabling web search | |
| ddg_search = st.sidebar.checkbox("Web Search", value=ddg_search_enabled, help="Enable web search using DuckDuckGo.") | |
| if ddg_search_enabled != ddg_search: | |
| st.session_state["ddg_search_enabled"] = ddg_search | |
| ddg_search_enabled = ddg_search | |
| restart_assistant() | |
| # Enable shell tools | |
| if "shell_tools_enabled" not in st.session_state: | |
| st.session_state["shell_tools_enabled"] = False | |
| # Get shell_tools_enabled from session state if set | |
| shell_tools_enabled = st.session_state["shell_tools_enabled"] | |
| # Checkbox for enabling shell tools | |
| shell_tools = st.sidebar.checkbox("Shell Tools", value=shell_tools_enabled, help="Enable shell tools.") | |
| if shell_tools_enabled != shell_tools: | |
| st.session_state["shell_tools_enabled"] = shell_tools | |
| shell_tools_enabled = shell_tools | |
| restart_assistant() | |
| # Sidebar checkboxes for selecting team members | |
| st.sidebar.markdown("### Select Team Members") | |
| # Enable Data Analyst | |
| if "data_analyst_enabled" not in st.session_state: | |
| st.session_state["data_analyst_enabled"] = False | |
| # Get data_analyst_enabled from session state if set | |
| data_analyst_enabled = st.session_state["data_analyst_enabled"] | |
| # Checkbox for enabling web search | |
| data_analyst = st.sidebar.checkbox( | |
| "Data Analyst", | |
| value=data_analyst_enabled, | |
| help="Enable the Data Analyst assistant for data related queries.", | |
| ) | |
| if data_analyst_enabled != data_analyst: | |
| st.session_state["data_analyst_enabled"] = data_analyst | |
| data_analyst_enabled = data_analyst | |
| restart_assistant() | |
| # Enable Python Assistant | |
| if "python_assistant_enabled" not in st.session_state: | |
| st.session_state["python_assistant_enabled"] = False | |
| # Get python_assistant_enabled from session state if set | |
| python_assistant_enabled = st.session_state["python_assistant_enabled"] | |
| # Checkbox for enabling web search | |
| python_assistant = st.sidebar.checkbox( | |
| "Python Assistant", | |
| value=python_assistant_enabled, | |
| help="Enable the Python Assistant for writing and running python code.", | |
| ) | |
| if python_assistant_enabled != python_assistant: | |
| st.session_state["python_assistant_enabled"] = python_assistant | |
| python_assistant_enabled = python_assistant | |
| restart_assistant() | |
| # Enable Research Assistant | |
| if "research_assistant_enabled" not in st.session_state: | |
| st.session_state["research_assistant_enabled"] = False | |
| # Get research_assistant_enabled from session state if set | |
| research_assistant_enabled = st.session_state["research_assistant_enabled"] | |
| # Checkbox for enabling web search | |
| research_assistant = st.sidebar.checkbox( | |
| "Research Assistant", | |
| value=research_assistant_enabled, | |
| help="Enable the research assistant (uses Exa).", | |
| ) | |
| if research_assistant_enabled != research_assistant: | |
| st.session_state["research_assistant_enabled"] = research_assistant | |
| research_assistant_enabled = research_assistant | |
| restart_assistant() | |
| # Enable Investment Assistant | |
| if "investment_assistant_enabled" not in st.session_state: | |
| st.session_state["investment_assistant_enabled"] = False | |
| # Get investment_assistant_enabled from session state if set | |
| investment_assistant_enabled = st.session_state["investment_assistant_enabled"] | |
| # Checkbox for enabling web search | |
| investment_assistant = st.sidebar.checkbox( | |
| "Investment Assistant", | |
| value=investment_assistant_enabled, | |
| help="Enable the investment assistant. NOTE: This is not financial advice.", | |
| ) | |
| if investment_assistant_enabled != investment_assistant: | |
| st.session_state["investment_assistant_enabled"] = investment_assistant | |
| investment_assistant_enabled = investment_assistant | |
| restart_assistant() | |
| # Get the assistant | |
| llm_os: Assistant | |
| if "llm_os" not in st.session_state or st.session_state["llm_os"] is None: | |
| logger.info(f"---*--- Creating {llm_id} LLM OS ---*---") | |
| llm_os = get_llm_os( | |
| llm_id=llm_id, | |
| calculator=calculator_enabled, | |
| ddg_search=ddg_search_enabled, | |
| file_tools=file_tools_enabled, | |
| shell_tools=shell_tools_enabled, | |
| data_analyst=data_analyst_enabled, | |
| python_assistant=python_assistant_enabled, | |
| research_assistant=research_assistant_enabled, | |
| investment_assistant=investment_assistant_enabled, | |
| ) | |
| st.session_state["llm_os"] = llm_os | |
| else: | |
| llm_os = st.session_state["llm_os"] | |
| # Create assistant run (i.e. log to database) and save run_id in session state | |
| try: | |
| st.session_state["llm_os_run_id"] = llm_os.create_run() | |
| except Exception: | |
| st.warning("Could not create LLM OS run, is the database running?") | |
| return | |
| # Load existing messages | |
| assistant_chat_history = llm_os.memory.get_chat_history() | |
| if len(assistant_chat_history) > 0: | |
| logger.debug("Loading chat history") | |
| st.session_state["messages"] = assistant_chat_history | |
| else: | |
| logger.debug("No chat history found") | |
| st.session_state["messages"] = [{"role": "assistant", "content": "Ask me questions..."}] | |
| # Prompt for user input | |
| if prompt := st.chat_input(): | |
| st.session_state["messages"].append({"role": "user", "content": prompt}) | |
| # Display existing chat messages | |
| for message in st.session_state["messages"]: | |
| if message["role"] == "system": | |
| continue | |
| with st.chat_message(message["role"]): | |
| st.write(message["content"]) | |
| # If last message is from a user, generate a new response | |
| last_message = st.session_state["messages"][-1] | |
| if last_message.get("role") == "user": | |
| question = last_message["content"] | |
| with st.chat_message("assistant"): | |
| response = "" | |
| resp_container = st.empty() | |
| for delta in llm_os.run(question): | |
| response += delta # type: ignore | |
| resp_container.markdown(response) | |
| st.session_state["messages"].append({"role": "assistant", "content": response}) | |
| # Load LLM OS knowledge base | |
| if llm_os.knowledge_base: | |
| # -*- Add websites to knowledge base | |
| if "url_scrape_key" not in st.session_state: | |
| st.session_state["url_scrape_key"] = 0 | |
| input_url = st.sidebar.text_input( | |
| "Add URL to Knowledge Base", type="default", key=st.session_state["url_scrape_key"] | |
| ) | |
| add_url_button = st.sidebar.button("Add URL") | |
| if add_url_button: | |
| if input_url is not None: | |
| alert = st.sidebar.info("Processing URLs...", icon="ℹ️") | |
| if f"{input_url}_scraped" not in st.session_state: | |
| scraper = WebsiteReader(max_links=2, max_depth=1) | |
| web_documents: List[Document] = scraper.read(input_url) | |
| if web_documents: | |
| llm_os.knowledge_base.load_documents(web_documents, upsert=True) | |
| else: | |
| st.sidebar.error("Could not read website") | |
| st.session_state[f"{input_url}_uploaded"] = True | |
| alert.empty() | |
| # Add PDFs to knowledge base | |
| if "file_uploader_key" not in st.session_state: | |
| st.session_state["file_uploader_key"] = 100 | |
| uploaded_file = st.sidebar.file_uploader( | |
| "Add a PDF :page_facing_up:", type="pdf", key=st.session_state["file_uploader_key"] | |
| ) | |
| if uploaded_file is not None: | |
| alert = st.sidebar.info("Processing PDF...", icon="🧠") | |
| auto_rag_name = uploaded_file.name.split(".")[0] | |
| if f"{auto_rag_name}_uploaded" not in st.session_state: | |
| reader = PDFReader() | |
| auto_rag_documents: List[Document] = reader.read(uploaded_file) | |
| if auto_rag_documents: | |
| llm_os.knowledge_base.load_documents(auto_rag_documents, upsert=True) | |
| else: | |
| st.sidebar.error("Could not read PDF") | |
| st.session_state[f"{auto_rag_name}_uploaded"] = True | |
| alert.empty() | |
| if llm_os.knowledge_base and llm_os.knowledge_base.vector_db: | |
| if st.sidebar.button("Clear Knowledge Base"): | |
| llm_os.knowledge_base.vector_db.clear() | |
| st.sidebar.success("Knowledge base cleared") | |
| # Show team member memory | |
| if llm_os.team and len(llm_os.team) > 0: | |
| for team_member in llm_os.team: | |
| if len(team_member.memory.chat_history) > 0: | |
| with st.status(f"{team_member.name} Memory", expanded=False, state="complete"): | |
| with st.container(): | |
| _team_member_memory_container = st.empty() | |
| _team_member_memory_container.json(team_member.memory.get_llm_messages()) | |
| if llm_os.storage: | |
| llm_os_run_ids: List[str] = llm_os.storage.get_all_run_ids() | |
| new_llm_os_run_id = st.sidebar.selectbox("Run ID", options=llm_os_run_ids) | |
| if st.session_state["llm_os_run_id"] != new_llm_os_run_id: | |
| logger.info(f"---*--- Loading {llm_id} run: {new_llm_os_run_id} ---*---") | |
| st.session_state["llm_os"] = get_llm_os( | |
| llm_id=llm_id, | |
| calculator=calculator_enabled, | |
| ddg_search=ddg_search_enabled, | |
| file_tools=file_tools_enabled, | |
| shell_tools=shell_tools_enabled, | |
| data_analyst=data_analyst_enabled, | |
| python_assistant=python_assistant_enabled, | |
| research_assistant=research_assistant_enabled, | |
| investment_assistant=investment_assistant_enabled, | |
| run_id=new_llm_os_run_id, | |
| ) | |
| st.rerun() | |
| if st.sidebar.button("New Run"): | |
| restart_assistant() | |
| def restart_assistant(): | |
| logger.debug("---*--- Restarting Assistant ---*---") | |
| st.session_state["llm_os"] = None | |
| st.session_state["llm_os_run_id"] = None | |
| if "url_scrape_key" in st.session_state: | |
| st.session_state["url_scrape_key"] += 1 | |
| if "file_uploader_key" in st.session_state: | |
| st.session_state["file_uploader_key"] += 1 | |
| st.rerun() | |
| main() |