import os from typing import Optional, Dict from dotenv import load_dotenv load_dotenv() import chainlit as cl from phi.assistant import Assistant from phi.llm.openai import OpenAIChat from phi.tools.duckduckgo import DuckDuckGo from phi.tools.yfinance import YFinanceTools from src.databases.postgres import sqlalchemy_engine from src.tools.crypto_swap_toolkit import CryptoSwapTools from src.tools.crypto_data_toolkit import CryptoDataTools from phi.storage.assistant.postgres import PgAssistantStorage from src.knowledge_bases.combined import knowledge_base storage = PgAssistantStorage( # store runs in the ai.assistant_runs table table_name="assistant_runs", db_engine=sqlalchemy_engine ) @cl.oauth_callback def oauth_callback( provider_id: str, token: str, raw_user_data: Dict[str, str], default_user: cl.User, ) -> Optional[cl.User]: # step-todo: will handle auth from v2 api return default_user @cl.set_starters async def set_starters(): return [ cl.Starter( label="Create a crypto wallet for me", message="create a crypto wallet for me", icon="/public/wallet-svgrepo-com.svg", ), cl.Starter( label="Latest News on defi, crypto and solana", message="What news are currentlly trending on defi.", icon="/public/news-svgrepo-com.svg", ), cl.Starter( label="Get price of BTC, ETH and PEPE", message="Get me the Price of BTC, ETH and PEPE", icon="/public/coins-electronics-svgrepo-com.svg", ), cl.Starter( label="Get trending stocks", message="Get latest stock", icon="/public/stockchart-svgrepo-com.svg", ) ] @cl.on_chat_start def start(): is_dev_mode = True if os.getenv("DEV_MODE") else False # Initialize the assistant cxbt_assistant = Assistant( llm=OpenAIChat(model="gpt-4o"), tools=[CryptoDataTools(), CryptoSwapTools(), DuckDuckGo(), YFinanceTools(stock_price=True)], show_tool_calls= is_dev_mode, markdown=True, knowledge_base=knowledge_base, storage=storage, search_knowledge=True, read_chat_history=True, add_references_to_prompt=True, add_chat_history_to_prompt=True ) cxbt_assistant.knowledge_base.load(recreate=False) # Set the assistant in the user session cl.user_session.set("agent", cxbt_assistant) @cl.on_message async def main(message: cl.Message): # Retrieve the assistant from the user session agent = cl.user_session.get("agent") # Process the user message using the assistant # response = agent.run(message.content, stream=True) response = "" for delta in agent.run(message.content, stream=True): response += delta # Send the response back to the user await cl.Message(content=response).send() # Run the Chainlit application if __name__ == "__main__": cl.run()