from datetime import datetime, date, timedelta from typing import Optional as _Optional import json import httpx from urllib.parse import urljoin from llama_index.llms.groq import Groq import asyncio import random import os from dotenv import load_dotenv load_dotenv(dotenv_path="./.env.local") from agents import function_tool , RunContextWrapper from .VectorDBManagers import VectorDBManager BASE_DIR = os.path.dirname(os.path.abspath(__file__)) CHROMA_DB_PATH = os.path.join(BASE_DIR, "chromafast_db") WEBSITE_PATH = os.path.join(BASE_DIR, "website") manager = VectorDBManager(db_path=CHROMA_DB_PATH, collection_name="DB_collection") if not os.path.exists(CHROMA_DB_PATH) or manager.is_collection_empty(): print("🆕 No existing embeddings found. Building new Chroma DB...") manager.build_index_from_documents(WEBSITE_PATH) else: print("📂 Existing Chroma DB found. Loading it...") manager.load_existing_index() @function_tool( name_override="suggestion_ragtool", description_override=""" Name: suggestion_ragtool Query the company's knowledge base for information. Description: all Question except any meeting , call , invitation like schedule Searches the company's internal knowledge base to provide informative, paragraph-style answers related to services, policies, technologies, or any general information embedded in the vector store. Parameters: context (RunContextWrapper): The openai session context used for communicating with the user. query (str): The question or query asked by the user about the company. Returns: str: """, ) async def suggestion_ragtool(ctx: RunContextWrapper, query: str) : """ 🔍 Tool Name: suggestion_ragtool Description: all Question except any meeting , call , invitation like schedule Searches the company's internal knowledge base to provide informative, paragraph-style answers related to services, policies, technologies, or any general information embedded in the vector store. Parameters: context (RunContext): The Openai session context used for communicating with the user. query (str): The question or query asked by the user about the company. Returns: str: """ try: print(f"Answering from knowledgebase: {query}") res = await manager.aquery(query) print("Query result:", res) result=str(res) return result except Exception as e: print(f"Error: {e}") return f"❌ Failed: Unable to answer the question. {str(e)}"