# app/chatxbt-assistant.py — LONGYKING, "token stream fixed" (05b7f8b), 4.97 kB
import os, json
from typing import Optional, Dict
from dotenv import load_dotenv
load_dotenv()
import chainlit as cl
from phi.assistant import Assistant
from phi.llm.openai import OpenAIChat
from phi.tools.duckduckgo import DuckDuckGo
from phi.tools.yfinance import YFinanceTools
from src.databases.postgres import sqlalchemy_engine
from src.knowledge_bases.combined import knowledge_base
from src.tools.crypto_swap_toolkit import CryptoSwapTools
from src.tools.crypto_data_toolkit import CryptoDataTools
from portkey_ai import PORTKEY_GATEWAY_URL, createHeaders
from src.config.portkey_config import generate_portkey_config
from src.tools.user_profile_toolkit import UserProfileToolkit
from phi.storage.assistant.postgres import PgAssistantStorage
from src.tools.crypto_evm_wallet_toolkit import CryptoEVMWalletTools
from src.tools.user_confirmation_pin_toolkit import UserConfirmationPinToolkit
# Assistant run storage shared by every chat session; runs are stored in
# the ai.assistant_runs table.
storage = PgAssistantStorage(
    db_engine=sqlalchemy_engine,
    table_name="assistant_runs",
)
@cl.oauth_callback
def oauth_callback(
    provider_id: str,
    token: str,
    raw_user_data: Dict[str, str],
    default_user: cl.User,
) -> Optional[cl.User]:
    """Build the Chainlit user from the OAuth provider's profile payload.

    Args:
        provider_id: Identifier of the OAuth provider (not used here).
        token: Token handed to the callback (not used here).
        raw_user_data: Profile fields returned by the provider; the
            ``email`` and ``name`` keys are read.
        default_user: User object supplied to the callback; only the
            ``image`` entry of its metadata is reused.

    Returns:
        A ``cl.User`` whose identifier and email are the provider e-mail and
        whose metadata is the raw profile plus an ``image`` entry.
    """
    # step-todo: will handle auth from v2 api
    # NOTE(review): `email` may be missing from the payload, in which case the
    # identifier is None; also confirm the installed chainlit `User` accepts
    # the `email` and `name` keyword arguments.
    email = raw_user_data.get('email')

    # Copy the provider payload and overlay the image taken from the default user.
    metadata = dict(raw_user_data)
    metadata['image'] = default_user.metadata.get('image')

    return cl.User(
        identifier=email,
        email=email,
        name=raw_user_data.get('name'),
        metadata=metadata,
    )
@cl.password_auth_callback
def auth_callback(username: str, password: str):
    """Authenticate a username/password login.

    Args:
        username: Login name submitted by the user.
        password: Password submitted by the user.

    Returns:
        A ``cl.User`` with the admin role when the credentials match,
        otherwise ``None``.
    """
    # Fetch the user matching username from your database
    # and compare the hashed password with the value stored in the database
    # NOTE(review): the only accepted credentials are the hard-coded pair
    # below; replace with a real credential store before exposing this app.
    if (username, password) != ("admin", "admin"):
        return None

    # Keep a plain-dict copy of the profile in the user session.
    user = {
        "identifier": "admin",
        "email": username,
        "metadata": {"role": "admin", "provider": "credentials"},
    }
    cl.user_session.set("user_profile", user)

    return cl.User(
        identifier="admin",
        email=username,
        metadata={"role": "admin", "provider": "credentials"},
    )
@cl.set_starters
async def set_starters():
    """Return the starter prompts registered with Chainlit.

    Each starter pairs a label with the message text used for that starter
    and an icon path under ``/public``.

    Returns:
        A list of ``cl.Starter`` objects, in display order.
    """
    # (label, message, icon) for every starter; kept as data so that adding
    # or editing a starter is a one-line change.
    starters = (
        (
            "Create a crypto wallet for me",
            "create a crypto wallet for me",
            "/public/wallet-svgrepo-com.svg",
        ),
        (
            "Latest News on defi, crypto and solana",
            # Typo fixed ("currentlly" -> "currently"): this text is the
            # message of the starter, so the misspelling was user-visible.
            "What news are currently trending on defi.",
            "/public/news-svgrepo-com.svg",
        ),
        (
            "Get price of BTC, ETH and PEPE",
            "Get me the Price of BTC, ETH and PEPE",
            "/public/coins-electronics-svgrepo-com.svg",
        ),
        (
            "Get trending stocks",
            "Get latest stock",
            "/public/stockchart-svgrepo-com.svg",
        ),
    )
    return [
        cl.Starter(label=label, message=message, icon=icon)
        for label, message, icon in starters
    ]
@cl.on_chat_start
async def start():
    """Create the assistant for a new chat and store it in the user session.

    Reads ``DEV_MODE``, ``PORTKEY_LOCAL_GATEWAY_URL`` and ``PORTKEY_API_KEY``
    from the environment, builds the LLM client and tool list, loads the
    knowledge base without recreating it, and saves the assistant under the
    ``"agent"`` session key.
    """
    # Any non-empty value enables the corresponding flag.
    dev_mode = bool(os.getenv("DEV_MODE"))
    use_local_gateway = bool(os.getenv("PORTKEY_LOCAL_GATEWAY_URL"))
    portkey_config = generate_portkey_config(local=use_local_gateway)

    # OpenAI-compatible client pointed at the local gateway URL when one is
    # configured, otherwise at the default Portkey gateway URL.
    llm = OpenAIChat(
        api_key=os.getenv("PORTKEY_API_KEY"),
        base_url=os.getenv("PORTKEY_LOCAL_GATEWAY_URL") or PORTKEY_GATEWAY_URL,
        default_headers=createHeaders(
            # An empty key is normalised to None for the headers only.
            api_key=os.getenv("PORTKEY_API_KEY") or None,
            config=portkey_config,
        ),
    )

    toolkits = [
        DuckDuckGo(),
        CryptoDataTools(),
        CryptoSwapTools(),
        UserProfileToolkit(),
        CryptoEVMWalletTools(),
        UserConfirmationPinToolkit(),
        YFinanceTools(stock_price=True),
    ]

    # Initialize the assistant
    assistant = Assistant(
        llm=llm,
        tools=toolkits,
        # Tool calls are only surfaced in dev mode.
        show_tool_calls=dev_mode,
        markdown=True,
        knowledge_base=knowledge_base,
        storage=storage,
        search_knowledge=True,
        read_chat_history=True,
        add_references_to_prompt=True,
        add_chat_history_to_prompt=True,
        prevent_hallucinations=True,
        prevent_prompt_injection=True,
    )
    assistant.knowledge_base.load(recreate=False)

    # Set the assistant in the user session
    cl.user_session.set("agent", assistant)
@cl.on_message
async def main(message: cl.Message):
    """Stream the assistant's reply to an incoming user message.

    An empty message is sent first, the assistant stored in the user session
    under ``"agent"`` is run in streaming mode on ``message.content``, and
    every non-empty fragment is streamed into that message before it is
    finalised with ``update()``.

    Args:
        message: The incoming Chainlit message; only ``content`` is read.
    """
    # Send an empty message up front so fragments can be streamed into it.
    reply = cl.Message(content="")
    await reply.send()

    # Retrieve the assistant from the user session
    agent = cl.user_session.get("agent")

    # Process the user message using the assistant
    for delta in agent.run(message.content, stream=True):
        # Each delta is itself iterated; for a plain-string delta this yields
        # one character at a time.
        # NOTE(review): presumably deliberate — confirm before streaming the
        # delta as a whole.
        for part in delta:
            # Skip empty/None fragments. The former debug print() of every
            # fragment was removed: it wrote the whole reply to stdout.
            if part:
                await reply.stream_token(part)

    await reply.update()
# Start the Chainlit application when this file is executed as a script.
# NOTE(review): confirm the installed chainlit version exposes `cl.run`.
if __name__ == "__main__":
    cl.run()