d365stuff-chat / app.py
mafzaal's picture
Update .dockerignore and .gitignore to include new directories and log files; enhance app.py with Dropbox download functionality and logging; modify build-vector-store.sh for improved error handling and options; remove obsolete posts and pipeline.py.
4542bf4
import os
import sys
from pathlib import Path
import zipfile
import logging
import requests
from langchain.schema.runnable.config import RunnableConfig
from langchain_core.messages import HumanMessage
from dotenv import load_dotenv
# Root logging setup: INFO and above, with timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Read the .env file into the process environment. This runs before the
# project configuration module is imported on the next statement.
load_dotenv()

from lets_talk.config import VECTOR_STORAGE_PATH

# URL of a zipped vector store to fetch at startup; None when the
# environment variable is not set.
VECTOR_STORE_DOWNLOAD_URL = os.environ.get("VECTOR_STORE_DOWNLOAD_URL")
def download_from_dropbox(direct_download_url, dest_path="/tmp/vector_store.zip", timeout=60):
    """Download a file from a direct-download URL and save it to disk.

    The response body is streamed to ``dest_path`` in 8 KiB chunks so the
    whole file is never held in memory. The URL is used exactly as given;
    no rewriting of the share link is performed here.

    Args:
        direct_download_url: URL that serves the file content directly.
        dest_path: Where the downloaded bytes are written. Defaults to the
            temporary location the startup code reads the archive from.
        timeout: Timeout in seconds handed to ``requests.get``. Without it
            a stalled server would block application startup indefinitely.

    Returns:
        True when the file was downloaded and written, False on any
        failure. Failures are logged rather than raised.
    """
    try:
        logger.info("Downloading vector store from %s...", direct_download_url)
        # The context manager releases the streamed connection even when
        # the status check or the file write fails part-way through.
        with requests.get(direct_download_url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(dest_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    # Skip empty chunks.
                    if chunk:
                        f.write(chunk)
        logger.info("Download completed successfully")
        return True
    except Exception as e:
        # Startup boundary: callers only branch on the boolean result, so
        # every failure is reported through the log and mapped to False.
        logger.error("Failed to download from Dropbox: %s", str(e))
        return False
# Make sure a vector store is present before the rest of the module runs.
# An existing store on disk wins; otherwise it is downloaded from
# VECTOR_STORE_DOWNLOAD_URL when that is set; with neither, startup aborts.
if Path(VECTOR_STORAGE_PATH).exists():
    logger.info("Vector store already exists at %s", VECTOR_STORAGE_PATH)
elif VECTOR_STORE_DOWNLOAD_URL:
    import shutil  # only needed to clean up after a failed extraction

    logger.info("Starting vector store download process")
    if not download_from_dropbox(VECTOR_STORE_DOWNLOAD_URL):
        logger.error("Failed to download vector store")
        sys.exit(1)
    try:
        # Opening the archive doubles as validation that it is a zip file.
        with zipfile.ZipFile("/tmp/vector_store.zip", "r") as zip_ref:
            logger.info("Extracting vector store files...")
            zip_ref.extractall(VECTOR_STORAGE_PATH)
        logger.info("Vector store extraction completed")
    except zipfile.BadZipFile:
        logger.error("Downloaded file is not a valid zip file")
        # The path did not exist when this branch was entered, so anything
        # there now is a partial extraction. Remove it; otherwise the next
        # start would take the "already exists" branch with a broken store.
        shutil.rmtree(VECTOR_STORAGE_PATH, ignore_errors=True)
        sys.exit(1)
    except Exception as e:
        logger.error("Failed to extract vector store: %s", str(e))
        # Same reasoning as above: never leave a half-extracted store.
        shutil.rmtree(VECTOR_STORAGE_PATH, ignore_errors=True)
        sys.exit(1)
else:
    # No store on disk and no download URL configured.
    logger.info("Checking vector store...")
    # The first branch already found the path missing; the re-check is kept
    # as a guard in case the store appeared in the meantime.
    if not Path(VECTOR_STORAGE_PATH).exists():
        logger.error("Vector store not found at %s. Please create it first.", VECTOR_STORAGE_PATH)
        sys.exit(1)
# These imports and the agent construction run only after the vector store
# check above has passed; the failing branches there call sys.exit(1).
# NOTE(review): presumably create_agent needs the vector store on disk,
# which would explain why these imports are not at the top of the file — confirm.
import chainlit as cl
from prompt import prompt
from lets_talk.agent_v2 import create_agent
# One agent instance is built at import time; setup_chain places this same
# object into each user session.
d365stuff_agent = create_agent(prompt=prompt)
@cl.on_chat_start
async def setup_chain():
    """Prepare a new chat session and greet the user.

    Stores the module-level agent in the Chainlit user session under the
    key "agent", then sends a welcome message authored as "System".
    """
    cl.user_session.set("agent", d365stuff_agent)

    greeting = cl.Message(
        content="Welcome to [D365 Stuff](https://www.d365stuff.co) Chat! How can I help you today?",
        author="System",
    )
    await greeting.send()
@cl.on_message
async def on_message(message: cl.Message):
    """Answer a user message by streaming the agent's output.

    The agent stored in the user session is run in "messages" stream mode
    with the Chainlit session id as its thread id. Non-empty chunks that
    are not human messages and that come from the "agent" graph node are
    streamed into a single reply, which is sent once the stream ends.

    Args:
        message: The incoming Chainlit message from the user.
    """
    agent = cl.user_session.get("agent")
    if not agent:
        # Nothing was stored for this session; tell the user and stop.
        await cl.Message(content="Agent not initialized properly. Please refresh the page.").send()
        return

    # Empty message that is filled token by token as the agent streams.
    reply = cl.Message(content="")

    thread_settings = {"thread_id": cl.context.session.id}
    callback_handler = cl.LangchainCallbackHandler()
    run_config = RunnableConfig(callbacks=[callback_handler], configurable=thread_settings)

    agent_input = {"messages": [HumanMessage(content=message.content)]}
    stream = agent.astream(agent_input, stream_mode="messages", config=run_config)

    async for chunk, meta in stream:
        # Guards are checked in the same order as a short-circuit `and`.
        if not chunk.content:
            continue
        if isinstance(chunk, HumanMessage):
            continue
        if meta["langgraph_node"] != "agent":
            continue
        await reply.stream_token(chunk.content)

    await reply.send()