"""Chainlit entry point for the ChatXBT Web3/DeFi assistant.

Registers the Chainlit callbacks (OAuth and password authentication,
starter prompts, chat start and incoming message) and creates one phidata
``Assistant`` per chat session, stored in the Chainlit user session under
the ``"agent"`` key.
"""
import logging
import os
from typing import Dict, Optional

import chainlit as cl
from dotenv import load_dotenv
from phi.assistant import Assistant
from phi.llm.openai import OpenAIChat
from phi.storage.assistant.postgres import PgAssistantStorage
from phi.tools.duckduckgo import DuckDuckGo
from phi.tools.yfinance import YFinanceTools
from portkey_ai import PORTKEY_GATEWAY_URL, createHeaders

from src.databases.postgres import sqlalchemy_engine
from src.knowledge_bases.combined import knowledge_base
from src.tools.crypto_swap_toolkit import CryptoSwapTools
from src.tools.crypto_bridge_toolkit import CrossChainSwapTools
from src.tools.crypto_data_toolkit import CryptoDataTools
from src.config.portkey_config import generate_portkey_config
from src.tools.user_profile_toolkit import UserProfileToolkit
from src.tools.crypto_evm_wallet_toolkit import CryptoEVMWalletTools
from src.tools.user_confirmation_pin_toolkit import UserConfirmationPinToolkit

logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# Storage backend handed to every Assistant created in `start`.
storage = PgAssistantStorage(
    table_name="assistant_runs",
    db_engine=sqlalchemy_engine,
)


@cl.oauth_callback
def oauth_callback(
    provider_id: str,
    token: str,
    raw_user_data: Dict[str, str],
    default_user: cl.User,
) -> Optional[cl.User]:
    """Build the Chainlit user for a completed OAuth login.

    Args:
        provider_id: Identifier of the OAuth provider (unused here).
        token: OAuth token issued by the provider (unused here).
        raw_user_data: Profile payload from the provider; the ``email`` and
            ``name`` keys are read from it.
        default_user: User object prepared by Chainlit; only its
            ``metadata["image"]`` is carried over.

    Returns:
        A ``cl.User`` whose identifier and email are both the provider's
        ``email`` value. The metadata is the raw provider payload with the
        ``image`` entry taken from ``default_user``.
    """
    # NOTE(review): `email` may be absent for some providers, which would
    # yield identifier=None -- confirm against the configured providers.
    email = raw_user_data.get('email')
    return cl.User(
        identifier=email,
        email=email,
        name=raw_user_data.get('name'),
        metadata={
            **raw_user_data,
            'image': default_user.metadata.get('image'),
        },
    )


@cl.password_auth_callback
def auth_callback(username: str, password: str) -> Optional[cl.User]:
    """Authenticate a username/password login.

    Args:
        username: Username submitted on the login form.
        password: Password submitted on the login form.

    Returns:
        The admin ``cl.User`` when the credentials match, otherwise ``None``.
    """
    # NOTE(review): credentials are hard-coded; replace with a real
    # credential store before exposing this deployment.
    if (username, password) != ("admin", "admin"):
        return None

    metadata = {"role": "admin", "provider": "credentials"}
    # NOTE(review): this writes to the user session during login --
    # confirm a Chainlit session context is available at this point.
    cl.user_session.set(
        "user_profile",
        {"identifier": "admin", "email": username, "metadata": dict(metadata)},
    )
    return cl.User(identifier="admin", email=username, metadata=metadata)


@cl.set_starters
async def set_starters():
    """Return the starter prompts offered on an empty chat."""
    return [
        cl.Starter(
            label="Create a crypto wallet for me",
            message="create an evm crypto wallet for me",
            icon="/public/wallet-svgrepo-com.svg",
        ),
        cl.Starter(
            label="Latest News on defi, crypto and solana",
            message="What news are currently trending on defi.",
            icon="/public/news-svgrepo-com.svg",
        ),
        cl.Starter(
            label="Get price of BTC, ETH and PEPE",
            message="Get me the price of BTC, ETH and PEPE",
            icon="/public/coins-electronics-svgrepo-com.svg",
        ),
        cl.Starter(
            label="Get trending stocks",
            message="Get latest stock",
            icon="/public/stockchart-svgrepo-com.svg",
        ),
    ]


@cl.on_chat_start
async def start():
    """Create the assistant for a new chat and store it in the session."""
    # The Portkey gateway wiring on OpenAIChat (api_key / base_url /
    # default_headers=createHeaders(..., config=portkey_config)) is currently
    # disabled, so the config below is computed but not passed to the LLM.
    portkey_local_gateway = bool(os.getenv("PORTKEY_LOCAL_GATEWAY_URL"))
    portkey_config = generate_portkey_config(local=portkey_local_gateway)

    cxbt_assistant = Assistant(
        introduction="Hi, I'm ChatXBT, your AI assistant for Web3 and DeFi. I can help you with your queries related to Web3 and DeFi.",
        llm=OpenAIChat(),
        tools=[
            DuckDuckGo(),
            CryptoDataTools(),
            CryptoSwapTools(),
            CrossChainSwapTools(),
            UserProfileToolkit(),
            CryptoEVMWalletTools(),
            # UserConfirmationPinToolkit() is intentionally left disabled.
            YFinanceTools(stock_price=True),
        ],
        show_tool_calls=False,
        markdown=True,
        knowledge_base=knowledge_base,
        storage=storage,
        search_knowledge=True,
        read_chat_history=True,
        add_references_to_prompt=True,
        add_chat_history_to_prompt=True,
        prevent_hallucinations=True,
        prevent_prompt_injection=True,
        add_datetime_to_instructions=True,
    )
    # Load the knowledge base without recreating it.
    cxbt_assistant.knowledge_base.load(recreate=False)

    # Set the assistant in the user session
    cl.user_session.set("agent", cxbt_assistant)


@cl.on_message
async def main(message: cl.Message):
    """Stream the assistant's reply to an incoming user message.

    Args:
        message: The Chainlit message sent by the user; its ``content`` is
            forwarded to the assistant.
    """
    # Retrieve the assistant from the user session
    agent = cl.user_session.get("agent")
    if agent is None:
        # Without this guard a missing assistant surfaces as an
        # AttributeError on `None.run`.
        await cl.Message(
            content="The assistant is not available for this session, "
                    "please start a new chat."
        ).send()
        return

    msg = cl.Message(content="")
    try:
        # With stream=True the run result is consumed chunk by chunk and
        # each chunk is forwarded to the UI as it arrives.
        for delta in agent.run(message=message.content, stream=True):
            await msg.stream_token(str(delta))
    except TypeError:
        # Only TypeError is handled here; keep the traceback for debugging
        # and tell the user something went wrong.
        logger.exception("Error occurred while streaming the assistant response")
        await msg.stream_token("\n\n I encountered an error, please try again later.")

    await msg.send()


# Run the Chainlit application
if __name__ == "__main__":
    # `cl.run_sync` expects a coroutine argument and cannot start the app;
    # launch the server for this file the same way `chainlit run` does.
    from chainlit.cli import run_chainlit

    run_chainlit(__file__)