import os from datetime import datetime from config import Config from chatbot.rag_store import ( get_dataframe, get_all_database_files ) # ============================= # LANGCHAIN IMPORTS # ============================= from langchain.agents import initialize_agent, AgentType from langchain.memory import ConversationBufferMemory from langchain_google_genai import ChatGoogleGenerativeAI from langchain_community.tools.tavily_search import TavilySearchResults # ============================= # ENVIRONMENT # ============================= os.environ["TAVILY_API_KEY"] = Config.TAVILY_API_KEY # ============================= # GEMINI LLM # ============================= llm = ChatGoogleGenerativeAI( model=Config.GEMINI_MODEL, google_api_key=Config.GEMINI_API_KEY, temperature=0.2, convert_system_message_to_human=True ) # ============================= # LIVE SEARCH TOOL # ============================= search = TavilySearchResults( tavily_api_key=Config.TAVILY_API_KEY, max_results=7, search_depth="advanced" ) tools = [search] # ============================= # MEMORY # ============================= memory = ConversationBufferMemory( memory_key="chat_history", return_messages=True ) # ============================= # SYSTEM PROMPT # ============================= SYSTEM_PROMPT = f""" You are Smart Dashboard Premium AI Assistant. CURRENT DATE: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} MISSION: - Always provide highly accurate answers - Use live web search for: * Latest news * Sports * IPL * Finance * Weather * Stocks * AI * Jobs * Government * Technology * Sustainability - Prioritize uploaded dashboard CSV database if relevant - If user asks about uploaded files: * Analyze MySQL stored datasets * Compare uploaded records * Answer from CSV - If CSV lacks answer: * Use live web - Be expert in: * Flask * SQL * Dashboard systems * ESG * Sustainability * CRM * Analytics * Coding - Always sound professional """ # ============================= # AGENT # ============================= agent = initialize_agent( tools=tools, llm=llm, agent=AgentType.CHAT_CONVERSATIONAL_REACT_DESCRIPTION, memory=memory, verbose=False, handle_parsing_errors=True, max_iterations=7, early_stopping_method="generate", agent_kwargs={ "system_message": SYSTEM_PROMPT } ) # ============================= # LIVE QUERY KEYWORDS # ============================= LIVE_KEYWORDS = [ "today", "latest", "current", "now", "live", "recent", "news", "score", "match", "ipl", "weather", "stock", "market", "bitcoin", "price", "technology", "ai", "policy", "government", "startup", "jobs", "salary", "sports", "release", "trend", "update" ] # ============================= # DATASET INTENT DETECTION # ============================= def is_dataset_related(question, df): q = question.lower() if df is None: return False # COLUMN MATCH for col in df.columns: if col.lower() in q: return True # VALUE MATCH dataset_text = df.astype(str).to_string().lower() for word in q.split(): if word in dataset_text: return True # DASHBOARD KEYWORDS dashboard_keywords = [ "dataset", "csv", "upload", "dashboard", "summary", "report", "chart", "graph", "file", "score", "data" ] if any(word in q for word in dashboard_keywords): return True return False # ============================= # DATASET ANALYSIS # ============================= def analyze_dataset(question, df): q = question.lower() # -------------------------------- # SUMMARY # -------------------------------- if "summary" in q or "describe" in q: return f""" Dataset Summary: Rows: {df.shape[0]} Columns: {df.shape[1]} Column Names: {', '.join(df.columns)} """ # -------------------------------- # SCORE COLUMN # -------------------------------- if "score" in [c.lower() for c in df.columns]: score_col = next( c for c in df.columns if c.lower() == "score" ) name_col = next( (c for c in df.columns if c.lower() in ["name", "company", "user"]), df.columns[0] ) if "highest" in q or "top" in q: top = df.loc[df[score_col].astype(float).idxmax()] return f""" Top Performer: {name_col}: {top[name_col]} Score: {top[score_col]} """ if "lowest" in q: low = df.loc[df[score_col].astype(float).idxmin()] return f""" Lowest Performer: {name_col}: {low[name_col]} Score: {low[score_col]} """ if "average" in q: return f""" Average Score: {df[score_col].astype(float).mean()} """ # -------------------------------- # ADVANCED LLM DATA ANALYSIS # -------------------------------- csv_context = f""" You are a professional data analyst. Dataset Columns: {list(df.columns)} Dataset Sample: {df.head(30).to_string()} User Question: {question} RULES: - Answer strictly from dataset - Be precise - If unavailable say: 'Dataset does not contain this information' """ response = llm.invoke(csv_context) return response.content # ============================= # MAIN CHATBOT # ============================= def general_chatbot(question): try: q = question.lower() # ============================= # LOAD CURRENT DATAFRAME # ============================= df = get_dataframe() # ============================= # LIVE SEARCH PRIORITY # ============================= if any(keyword in q for keyword in LIVE_KEYWORDS): live_prompt = f""" Use live web search. Provide latest verified information. Question: {question} """ return agent.run(live_prompt) # ============================= # DATASET QUESTIONS # ============================= if is_dataset_related(question, df): if df is not None: dataset_response = analyze_dataset(question, df) # FALLBACK IF DATA NOT FOUND if any( phrase in dataset_response.lower() for phrase in [ "does not contain", "not available", "no information" ] ): web_prompt = f""" Use live web search. Question: {question} """ return agent.run(web_prompt) return dataset_response else: return "No CSV dataset is currently loaded." # ============================= # DATABASE FILE HISTORY # ============================= if "uploaded files" in q or "file history" in q: files = get_all_database_files() if not files: return "No uploaded files found in MySQL database." result = "Uploaded File History:\n\n" for file in files: result += ( f"ID: {file['id']} | " f"File: {file['filename']} | " f"Uploaded: {file['upload_date']}\n" ) return result # ============================= # GENERAL PREMIUM RESPONSE # ============================= return agent.run(question) except Exception as e: return f"Error: {str(e)}"