app / chatxbt-assistant.py
lemdaddy's picture
Changes
25d025b
raw
history blame
5.2 kB
import os
from typing import Optional, Dict
from dotenv import load_dotenv
import chainlit as cl
from phi.assistant import Assistant
from phi.llm.openai import OpenAIChat
from phi.tools.duckduckgo import DuckDuckGo
from phi.tools.yfinance import YFinanceTools
from src.databases.postgres import sqlalchemy_engine
from src.knowledge_bases.combined import knowledge_base
from src.tools.crypto_swap_toolkit import CryptoSwapTools
from src.tools.crypto_data_toolkit import CryptoDataTools
from portkey_ai import PORTKEY_GATEWAY_URL, createHeaders
from src.config.portkey_config import generate_portkey_config
from src.tools.user_profile_toolkit import UserProfileToolkit
from phi.storage.assistant.postgres import PgAssistantStorage
from src.tools.crypto_evm_wallet_toolkit import CryptoEVMWalletTools
from src.tools.user_confirmation_pin_toolkit import UserConfirmationPinToolkit
# Read variables from a .env file into the process environment.
load_dotenv()

# Module-level storage handle for assistant runs; it is handed to the
# Assistant built in the chat-start handler.
storage = PgAssistantStorage(
    db_engine=sqlalchemy_engine,
    table_name="assistant_runs",
)
# OAuth callback
@cl.oauth_callback
def oauth_callback(provider_id: str, token: str, raw_user_data: Dict[str, str], default_user: cl.User) -> Optional[cl.User]:
    """Build the application user from an OAuth provider's profile data.

    Args:
        provider_id: Identifier of the OAuth provider (not used here).
        token: Token issued by the provider (not used here).
        raw_user_data: Profile fields returned by the provider; ``email`` and
            ``name`` are read from it and the whole mapping is copied into the
            user's metadata.
        default_user: User object supplied by Chainlit; only its ``image``
            metadata entry is carried over.

    Returns:
        A ``cl.User`` whose identifier and email are both the provider's
        ``email`` value (``None`` when the provider did not supply one).
    """
    email = raw_user_data.get('email')

    # Start from the provider's fields, then let the default user's image
    # take precedence over any 'image' key the provider sent.
    profile_metadata = dict(raw_user_data)
    profile_metadata['image'] = default_user.metadata.get('image')

    return cl.User(
        identifier=email,
        email=email,
        name=raw_user_data.get('name'),
        metadata=profile_metadata,
    )
# Password authentication callback
@cl.password_auth_callback
def auth_callback(username: str, password: str) -> Optional[cl.User]:
    """Authenticate a username/password login against the admin credentials.

    The expected credentials are read from the ``CHATXBT_ADMIN_USERNAME`` and
    ``CHATXBT_ADMIN_PASSWORD`` environment variables. When a variable is unset
    or empty, the previous built-in value ``"admin"`` is used so existing
    deployments keep working; both variables should be set anywhere the app
    is reachable by untrusted clients.

    Args:
        username: Login name submitted by the client.
        password: Password submitted by the client.

    Returns:
        A ``cl.User`` with the admin role when both values match, otherwise
        ``None``.
    """
    import hmac

    expected_username = os.getenv("CHATXBT_ADMIN_USERNAME") or "admin"
    expected_password = os.getenv("CHATXBT_ADMIN_PASSWORD") or "admin"

    # compare_digest does not short-circuit on the first differing byte.
    # Inputs are encoded because the str form only accepts ASCII text.
    username_ok = hmac.compare_digest(
        username.encode("utf-8"), expected_username.encode("utf-8")
    )
    password_ok = hmac.compare_digest(
        password.encode("utf-8"), expected_password.encode("utf-8")
    )
    if not (username_ok and password_ok):
        return None

    metadata = {"role": "admin", "provider": "credentials"}

    # NOTE(review): whether a user session is already available while the
    # login request is being handled is not visible from this file - confirm.
    cl.user_session.set(
        "user_profile",
        {"identifier": "admin", "email": username, "metadata": dict(metadata)},
    )
    return cl.User(identifier="admin", email=username, metadata=metadata)
# Set starters
@cl.set_starters
async def set_starters():
    """Return the ``cl.Starter`` suggestions registered with Chainlit."""
    # One (label, message, icon) triple per starter, in list order.
    starter_specs = (
        (
            "Create a crypto wallet for me",
            "create a crypto wallet for me",
            "/public/wallet-svgrepo-com.svg",
        ),
        (
            "Latest News on defi, crypto and solana",
            "What news are currently trending on defi.",
            "/public/news-svgrepo-com.svg",
        ),
        (
            "Get price of BTC, ETH and PEPE",
            "Get me the price of BTC, ETH and PEPE",
            "/public/coins-electronics-svgrepo-com.svg",
        ),
        (
            "Get trending stocks",
            "Get latest stock",
            "/public/stockchart-svgrepo-com.svg",
        ),
    )
    return [
        cl.Starter(label=label, message=message, icon=icon)
        for label, message, icon in starter_specs
    ]
# On chat start
@cl.on_chat_start
async def start():
    """Create the assistant for a chat and store it in the user session.

    Reads ``DEV_MODE``, ``PORTKEY_LOCAL_GATEWAY_URL`` and ``PORTKEY_API_KEY``
    from the environment, builds an ``Assistant`` whose LLM requests go to
    ``PORTKEY_GATEWAY_URL``, loads the knowledge base without recreating it,
    and saves the assistant under the ``"agent"`` session key.
    """
    # A plain bool() on the raw value would enable dev mode for DEV_MODE=false
    # or DEV_MODE=0, because any non-empty string is truthy. Only the explicit
    # "off" spellings below (or an unset/empty variable) disable it; every
    # other value still enables it, as before.
    dev_mode_raw = os.getenv("DEV_MODE", "")
    is_dev_mode = dev_mode_raw.strip().lower() not in {"", "0", "false", "no", "off"}

    # Presence of a local gateway URL selects the "local" Portkey config.
    portkey_local_gateway = bool(os.getenv("PORTKEY_LOCAL_GATEWAY_URL"))
    portkey_config = generate_portkey_config(local=portkey_local_gateway)

    # Initialize the assistant
    cxbt_assistant = Assistant(
        introduction="Hi, I'm ChatXBT, your AI assistant for Web3 and DeFi. I can help you with your queries related to Web3 and DeFi.",
        llm=OpenAIChat(
            api_key=os.getenv("PORTKEY_API_KEY"),
            # base_url=os.getenv("PORTKEY_LOCAL_GATEWAY_URL") or PORTKEY_GATEWAY_URL,
            base_url=PORTKEY_GATEWAY_URL,
            default_headers=createHeaders(
                api_key=os.getenv("PORTKEY_API_KEY") or None,
                config=portkey_config,
            ),
        ),
        tools=[
            DuckDuckGo(),
            CryptoDataTools(),
            CryptoSwapTools(),
            UserProfileToolkit(),
            CryptoEVMWalletTools(),
            UserConfirmationPinToolkit(),
            YFinanceTools(stock_price=True),
        ],
        # Tool calls are only surfaced in the chat when dev mode is on.
        show_tool_calls=is_dev_mode,
        markdown=True,
        knowledge_base=knowledge_base,
        storage=storage,
        search_knowledge=True,
        read_chat_history=True,
        add_references_to_prompt=True,
        add_chat_history_to_prompt=True,
        prevent_hallucinations=True,
        prevent_prompt_injection=True,
        add_datetime_to_instructions=True,
    )

    # NOTE(review): this is a synchronous call inside an async handler and it
    # runs on every chat start - confirm it is cheap when nothing needs loading.
    cxbt_assistant.knowledge_base.load(recreate=False)

    # Set the assistant in the user session
    cl.user_session.set("agent", cxbt_assistant)
# On message
@cl.on_message
async def main(message: cl.Message):
    """Stream the assistant's reply to an incoming chat message.

    The assistant stored under the ``"agent"`` session key is run with the
    message text in streaming mode, and each chunk it yields is appended to a
    single outgoing ``cl.Message``. If no assistant is stored for the session,
    or the run raises ``TypeError``, a short apology is sent instead of (or
    after) the partial reply.

    Args:
        message: The message received from the user.
    """
    import logging

    error_text = "\n\n I encountered an error, please try again later."
    msg = cl.Message(content="")

    # Retrieve the assistant from the user session
    agent = cl.user_session.get("agent")
    if agent is None:
        # Without this guard the call below fails with AttributeError, which
        # the TypeError handler does not cover.
        logging.getLogger(__name__).error("No assistant found in the user session")
        await msg.stream_token(error_text)
        await msg.send()
        return

    # Process the user message using the assistant
    try:
        for delta in agent.run(message=message.content, stream=True):
            await msg.stream_token(str(delta))
    except TypeError:
        # Only TypeError is handled here, as before; other exceptions still
        # propagate to the caller.
        logging.getLogger(__name__).exception("Error occurred while running the assistant")
        await msg.stream_token(error_text)

    await msg.send()
# Run the Chainlit application
if __name__ == "__main__":
    # cl.run_sync() expects a coroutine to execute, so calling it with no
    # argument raises TypeError instead of starting the app. Launch the
    # Chainlit server on this file, which is what `chainlit run <file>` does.
    from chainlit.cli import run_chainlit

    run_chainlit(__file__)