# --- IMPORTS ---
import ast
import functools
import os
import re
import threading
import traceback
import uuid
from datetime import datetime

from bson import json_util
from dotenv import load_dotenv
from flask import Flask, request, jsonify, render_template
from flask_socketio import SocketIO, emit
from fuzzywuzzy import fuzz
from langchain.agents import initialize_agent, AgentType, create_react_agent, AgentExecutor
from langchain.memory import ConversationBufferMemory
from langchain.prompts import ChatPromptTemplate
from langchain.tools import Tool
from langchain_community.agent_toolkits import create_sql_agent, SQLDatabaseToolkit
from langchain_community.utilities import SQLDatabase
from langchain_google_genai import ChatGoogleGenerativeAI
# from langchain_groq import ChatGroq
from pymongo import MongoClient
from tabulate import tabulate
from werkzeug.exceptions import HTTPException, TooManyRequests
from werkzeug.utils import secure_filename


def error_safe(f):
    """Decorate a Flask view so every exception becomes a JSON error response.

    ``HTTPException`` instances keep their own status code and description;
    any other exception is logged to stdout and reported as a 500.
    """
    # functools.wraps keeps __name__ (Flask derives the endpoint name from it)
    # and also preserves the docstring and __wrapped__.
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except HTTPException as he:
            return jsonify({"status": "error", "message": he.description}), he.code
        except Exception as e:
            print("[ERROR] Uncaught Exception in", f.__name__)
            traceback.print_exc()
            return jsonify({"status": "error", "message": str(e)}), 500

    return wrapper


# --- ENV + FLASK SETUP ---
load_dotenv()

# Assigning None into os.environ raises TypeError, so only re-export the key
# when it is actually configured.
_gemini_api_key = os.getenv("GEMINI_API_KEY")
if _gemini_api_key:
    os.environ["GEMINI_API_KEY"] = _gemini_api_key

app = Flask(__name__)
app.config['SECRET_KEY'] = os.urandom(32)
app.config['UPLOAD_FOLDER'] = 'uploads'
socketio = SocketIO(app, cors_allowed_origins="*")
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

llm = ChatGoogleGenerativeAI(
    temperature=0.2,
    model="gemini-2.0-flash",
    max_retries=50,
    api_key=os.getenv("GEMINI_API_KEY")
)
# llm = ChatGroq(temperature=0.2, model_name="mistral-saba-24b",api_key=os.getenv("GROQ_API_KEY"))

# --- GLOBALS ---
agent_executor = None
memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True,
    input_key="input")
mongo_db = None
client = None
db_mode = None  # "mongo" or "sql"

# --- SHARED ---
# Compiled once at import time instead of on every call.
_SCHEMA_REQUEST_RE = re.compile(
    r'\b(schema|table names|tables|columns|structure|column names|collections?|field names|metadata|describe|show)\b',
    re.IGNORECASE)

_SENSITIVE_KEYWORDS = (
    "password", "token", "credential", "secret", "api key",
    "schema", "structure", "collection name", "field name",
    "user_id", "order_id", "payment_id", "internal",
    "database structure", "table structure",
    "email", "phone", "contact", "ssn",
)


def is_schema_request(prompt: str) -> bool:
    """Return True when *prompt* contains a schema/structure keyword as a whole word."""
    return bool(_SCHEMA_REQUEST_RE.search(prompt))


def is_sensitive_request(prompt: str) -> bool:
    """Return True when *prompt* contains any sensitive keyword (case-insensitive substring)."""
    lowered = prompt.lower()
    return any(keyword in lowered for keyword in _SENSITIVE_KEYWORDS)


intent_prompt = ChatPromptTemplate.from_messages([
    ("system", "Classify if the user is asking schema/structure/sensitive info (tables, columns, schema): YES or NO."),
    ("human", "{prompt}")
])
intent_checker = intent_prompt | llm


def is_schema_leak_request(prompt):
    """Ask the LLM whether *prompt* requests schema/sensitive info.

    Returns True when the model's reply contains "yes". Any failure of the
    classifier call is logged and treated as "not a leak request" (False).
    """
    try:
        classification = intent_checker.invoke({"prompt": prompt})
        return "yes" in classification.content.strip().lower()
    except Exception:
        # Best effort: a classifier outage must not block the request, but it
        # should not be completely silent either.
        traceback.print_exc()
        return False


# --- INIT SQL AGENT ---
def init_sql_agent(db_path):
    """Build the SQL agent for the SQLite file at *db_path*.

    Sets the module globals ``agent_executor`` and ``db_mode`` ("sql").
    """
    global agent_executor, db_mode
    db = SQLDatabase.from_uri(f"sqlite:///{db_path}")
    toolkit = SQLDatabaseToolkit(db=db, llm=llm)

    prefix = '''You are a helpful SQL expert agent that ALWAYS returns natural language answers using the tools.'''
    # NOTE: the remainder of the original prompt is disabled; kept for reference.
    # Always format your responses in Markdown. For example:
    # - Use bullet points
    # - Use bold for headers
    # - Wrap code in triple backticks
    # - Tables should use Markdown table syntax
    # You must NEVER:
    # - Show or mention SQL syntax.
    # - Reveal table names, column names, or database schema.
    # - Respond with any technical details or structure of the database.
    # - Return code or tool names.
    # - Give wrong Answers.
    # You must ALWAYS:
    # - Respond in plain, friendly language.
    # - Don't Summarize the result for the user (e.g., "There are 9 tables in the system.")
    # - If asked to list table names or schema, politely refuse and respond with:
    # "I'm sorry, I can't share database structure information."
    # - ALWAYS HAVE TO SOLVE COMPLEX USER QUERIES. FOR THAT, UNDERSTAND THE PROMPT, ANALYSE PROPER AND THEN GIVE ANSWER.
    # - Your Answers should be correct, you have to do understand process well and give accurate answers.
    # - IF USER ASK ABOUT DATA, Which is not there in a database, then GIVE FOLLOWING ANSWER:
    # "There is no such data in the Database."
    # Strict Rules You MUST Follow:
    # - NEVER display or mention SQL queries.
    # - NEVER explain SQL syntax or logic.
    # - NEVER return technical or code-like responses.
    # - ONLY respond in natural, human-friendly language.
    # - You are not allow to give the name of any COLUMNS, TABLES, DATABASE, ENTITY, SYNTAX, STRUCTURE, DESIGN, ETC...
    # If the user asks for anything other than retrieving data (SELECT), respond using this exact message:
    # "I'm not allowed to perform operations other than SELECT queries. Please ask something that involves reading data."
    # Do not return SQL queries or raw technical responses to the user.
    # For example:
    # Wrong: SELECT * FROM ...
    # Correct: The user assigned to the cart is Alice Smith.
    # Use the tools provided to get the correct data from the database and summarize the response clearly.
    # If the input is unclear or lacks sufficient data, ask for clarification using the SubmitFinalAnswer tool.
    # Never return SQL queries as your response.
    # If you cannot find an answer,
    # Double-check your query and running it again.
    # - If a query fails, revise and try again.
    # - Else 'No data found' using SubmitFinalAnswer.No SQL, no code.
    # '''

    agent_executor = create_sql_agent(
        llm=llm,
        toolkit=toolkit,
        verbose=True,
        prefix=prefix,
        agent_type=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
        memory=memory,
        agent_executor_kwargs={"handle_parsing_errors": True},
    )
    db_mode = "sql"


# --- INIT MONGO AGENT ---
system_message = """
You are **MongoDBQueryBot**, a highly intelligent and accurate assistant for answering questions about data stored in a MongoDB database using tools.
"""
# NOTE: the remainder of the original prompt is disabled; kept for reference.
# ### 🚨 Critical Instructions (Strictly Follow These):
# - You **must always** use tools provided to answer user questions.
# - Always join IDs with associated human-readable values like names or titles when answering.
# - Prefer displaying `user name`, `employee name`, or `product name` instead of internal IDs like `user_id`, `emp_id`, or `product_id`.
# - Avoid responding only with technical identifiers. Make responses meaningful to users.
# - **Never** guess or fabricate any information.
# - **Do not** show raw JSON, field names, or database structure.
# - Your role is **read-only**: do not suggest or perform insert/update/delete.
# - After Using All the available tools, if you are Unable to find any documents, then give following ANSWER:
# "Please, rephrase your query because I can't exactly understand, what you want !"
# - If a query can't be answered or is unrelated to reading data, reply:
# ❌ "I'm only allowed to retrieve data. Please ask a query involving reading information."
# - IF USER ASK ABOUT DATA, Which is not there in a database, then GIVE FOLLOWING ANSWER:
# "There is no such data in the Database."
# - When returning answers:
# - Do **not return internal IDs** like `user_id`, `order_id`, `payment_id`, etc.
# - Instead, use human-readable fields like `name`, `full_name`, `user_name`, etc., from related collections.
# - If only an ID is available, try joining the relevant collections to fetch the proper display name.
# ### 🧠 How to Think:
# - Understand **exactly** what the user is trying to ask. Do not answer if unclear — ask for clarification.
# - Translate the user prompt into tool inputs by identifying:
# - Which collection to search
# - What value or field they're referring to
# - The correct format expected by the tool
# ### 🛠️ Tool Usage Guide:
# - Use `FindDocuments` for queries like:
# - "Show me all employees named John"
# - "What is the salary of Manager X?"
# - Use `ListCollections` to discover available data types (but don't share them directly).
# - **IMPORTANT : Don't Iterate only in one tool, if you can't able to answer using current tool you using, then switch the tool !**
# - Use `JoinCollections` to resolve IDs into names when the question asks about people, customers, or products.
# - When resolving names from payments, use this format:
# `from=Payments, key=order_id, to=Orders, match=order_id, next_key=user_id, next_to=Users, next_match=user_id, return=name`
# - Your goal is to **return the person's name** (e.g., `name`, `user_name`, `full_name`) not their ID.
# - Always prioritize returning names instead of internal identifiers.
# - Examples:
# - For payment-related questions → Join Payments → Orders → Users and return name
# - For order questions → Join Orders → Users and return user names
# ### 🧾 Response Format:
# - Use **clear markdown with tables** when displaying data.
# - If no data is found: return `**No documents found.**`
# - Stay professional, brief, and relevant.
# ### 🚫 Never Do This:
# - Do not leak MongoDB structure, schema, or field names.
# - Do not suggest code, MongoDB syntax, or field mappings.
# - Do not hallucinate or make assumptions.
# Start by analyzing the prompt carefully, select the right tool, invoke it, and return a user-friendly answer based on the result.
# """


def _parse_kv_args(raw: str) -> dict:
    """Parse ``key=value, key=value`` tool input into a dict.

    Keys and values are stripped of surrounding whitespace; only the first
    ``=`` of each part separates key from value; parts without ``=`` are
    ignored. Values containing commas are not supported.
    """
    pairs = {}
    for part in raw.split(","):
        if "=" not in part:
            continue
        name, value = part.split("=", 1)
        pairs[name.strip()] = value.strip()
    return pairs


def _as_markdown_table(rows) -> str:
    """Render a list of dicts as the GitHub-style table blob returned by the tools."""
    return "\n markdown\n" + tabulate(rows, headers="keys", tablefmt="github") + "\n"


def find_docs_tool_func(query: str) -> str:
    """
    Flexible MongoDB search with fallback:
    - First tries in specified collection.
    - If no results found, falls back to search across all collections.

    Input format:
    - collection=, key=, value=
    - OR: collection=, value=

    Returns a markdown table, "**No documents found.**", or an error string
    (exceptions are reported as text, never raised).
    """
    try:
        parts = _parse_kv_args(query)
        collection = parts.get("collection")
        key = parts.get("key")
        value = parts.get("value")

        if not collection:
            return "❌ 'collection' is required."

        # Lowercase the search value once, not once per document field.
        needle = value.lower() if value else None

        def query_collection(coll_name):
            if key and value:
                # Exact match on a named field.
                return list(mongo_db[coll_name].find({key: value}, {'_id': 0}))
            if value:
                # No field given: case-insensitive match against every field value.
                return [
                    doc for doc in mongo_db[coll_name].find({}, {'_id': 0})
                    if any(str(v).lower() == needle for v in doc.values())
                ]
            return list(mongo_db[coll_name].find({}, {'_id': 0}))

        docs = query_collection(collection)
        if docs:
            return _as_markdown_table(docs)

        # Fallback: try every other collection and return the first hit.
        for coll in mongo_db.list_collection_names():
            if coll == collection:
                continue
            docs = query_collection(coll)
            if docs:
                return _as_markdown_table(docs)

        return "**No documents found.**"
    except Exception as e:
        return f"Invalid input format or error: {str(e)}"


def aggregate_group_by(_input: str):
    """Count documents grouped by a field.

    Accepts either a dict literal (keys ``collection_name``/``collection`` and
    ``group_by``/``field``) or ``collection=<name>, field=<field>``.
    Returns a markdown table, "**No data found.**", or an error string.
    """
    try:
        text = _input.strip()
        if text.startswith("{"):
            # Dict-literal input; literal_eval only evaluates literals.
            args = ast.literal_eval(text)
            collection = args.get("collection_name") or args.get("collection")
            field = args.get("group_by") or args.get("field")
        else:
            # key=value input. The shared parser strips whitespace, so the
            # documented form "collection=residents, field=gender" works.
            args = _parse_kv_args(text)
            collection = args.get("collection")
            field = args.get("field")

        if not collection or not field:
            return "Aggregation failed: both 'collection' and 'field' are required."

        pipeline = [
            {"$group": {"_id": f"${field}", "count": {"$sum": 1}}},
            {"$project": {"_id": 0, field: "$_id", "count": 1}}
        ]
        result = list(mongo_db[collection].aggregate(pipeline))
        if not result:
            return "**No data found.**"
        return _as_markdown_table(result)
    except Exception as e:
        return f"Aggregation failed: {e}"


def get_all_documents(collection: str):
    """Return every document of *collection* (without ``_id``) as a markdown table."""
    try:
        # Tool input may carry stray whitespace/newlines around the name.
        docs = list(mongo_db[collection.strip()].find({}, {'_id': 0}))
        if not docs:
            return "**No documents found.**"
        return _as_markdown_table(docs)
    except Exception as e:
        return f"Error fetching documents: {e}"


def fuzzy_find_documents(query: str):
    """Fuzzy-match a value against every field of every document in a collection.

    Input: ``collection=<name>, value=<text>, threshold=<0-100>`` (threshold
    optional, default 80). A document matches when any field's
    ``fuzz.partial_ratio`` against the value reaches the threshold.
    """
    try:
        parts = _parse_kv_args(query)
        collection = parts["collection"]
        needle = parts["value"].lower()
        threshold = int(parts.get("threshold", 80))

        matches = [
            doc for doc in mongo_db[collection].find({}, {'_id': 0})
            if any(fuzz.partial_ratio(str(v).lower(), needle) >= threshold for v in doc.values())
        ]
        if not matches:
            return "**No fuzzy matches found.**"
        return _as_markdown_table(matches)
    except Exception as e:
        return f"Fuzzy match error: {e}"


def join_collections_tool_func(_input: str):
    """
    Supports 2-level join (Payments → Orders → Users) or any pair-wise join

    Input formats:
    - from=Payments, key=order_id, to=Orders, match=order_id, next_key=user_id, next_to=Users, next_match=user_id, return=name
    - from=Products, key=category_id, to=Categories, match=category_id, return=category_name

    Returns a one-column markdown table of the ``return`` field,
    "**No documents found.**", or an error string.
    """
    try:
        args = _parse_kv_args(_input)
        from_coll = args["from"]
        key = args["key"]
        to_coll = args["to"]
        match = args["match"]
        return_field = args["return"]
        next_key = args.get("next_key")
        next_to = args.get("next_to")
        next_match = args.get("next_match")

        # First join (e.g., Payments → Orders): index the target by its match key.
        to_docs = {doc[match]: doc for doc in mongo_db[to_coll].find() if match in doc}
        joined = []
        for doc in mongo_db[from_coll].find({}, {'_id': 0}):
            foreign_doc = to_docs.get(doc.get(key))
            if not foreign_doc:
                continue
            # Fields of the joined document win on name clashes.
            joined.append({**doc, **foreign_doc})

        # Second join (e.g., Orders → Users)
        if next_key and next_to and next_match:
            next_docs = {
                doc[next_match]: doc
                for doc in mongo_db[next_to].find()
                if next_match in doc
            }
            for doc in joined:
                user_doc = next_docs.get(doc.get(next_key))
                if user_doc:
                    doc[return_field] = user_doc.get(return_field, "Unknown")
                else:
                    doc[return_field] = "Unknown"

        # Prepare final result
        final = [{return_field: doc.get(return_field)} for doc in joined if return_field in doc]
        if not final:
            # Covers both "nothing joined" and "joined rows lack the return field".
            return "**No documents found.**"
        return _as_markdown_table(final)
    except Exception as e:
        return f"Join failed: {e}"


def smart_join_router(prompt: str) -> str:
    """
    An intelligent router that suggests the correct JoinCollections input string
    for common user intent like payments → orders → users → name.
    """
    prompt_lower = prompt.lower()

    if "payment" in prompt_lower and any(term in prompt_lower for term in ["who", "name", "user", "person"]):
        return "from=Payments, key=order_id, to=Orders, match=order_id, next_key=user_id, next_to=Users, next_match=user_id, return=name"
    elif "order" in prompt_lower and "name" in prompt_lower:
        return "from=Orders, key=user_id, to=Users, match=user_id, return=name"

    # Extend as needed
    return "Unable to auto-generate join path. Please provide more context."
def _replace_collection(collection, docs):
    """Drop *collection* and reload it from *docs* (a list of documents or a single one)."""
    collection.drop()
    if isinstance(docs, list):
        # insert_many raises on an empty list, so an empty upload just leaves
        # the collection empty.
        if docs:
            collection.insert_many(docs)
    else:
        collection.insert_one(docs)


def init_mongo_agent(json_path):
    """Load the JSON file at *json_path* into MongoDB and build the Mongo agent.

    The file may hold a list of documents (stored in ``default_collection``)
    or a dict mapping collection names to a list of documents / one document.
    Existing collections with the same names are dropped first.

    Sets the module globals ``client``, ``mongo_db``, ``agent_executor`` and
    ``db_mode`` ("mongo").

    Raises:
        ValueError: if the top-level JSON value is neither a list nor a dict.
    """
    global agent_executor, client, mongo_db, db_mode

    with open(json_path, 'r', encoding='utf-8') as f:
        data = json_util.loads(f.read())

    if not isinstance(data, (list, dict)):
        raise ValueError("Unsupported JSON format. Must be a list or dict.")

    # SECURITY: credentials must never be committed to source. The connection
    # string comes from the MONGO_URI environment variable (e.g. via .env);
    # without it a local mongod is used.
    previous_client = client
    client = MongoClient(os.getenv("MONGO_URI", "mongodb://localhost:27017/"))
    mongo_db = client['uploaded_mongo']
    if previous_client is not None:
        # Release the connection pool of the client from an earlier upload.
        previous_client.close()

    # Handle both single-collection and multi-collection formats
    if isinstance(data, list):
        # Default collection name if only a list is provided
        _replace_collection(mongo_db['default_collection'], data)
    else:
        for col_name, docs in data.items():
            _replace_collection(mongo_db[col_name], docs)

    def list_collections(_input=None):
        return mongo_db.list_collection_names()

    # Each tool wraps its plain function directly and carries the detailed
    # input-format description the agent needs to build valid inputs.
    tools = [
        Tool(
            name="FindDocuments",
            func=find_docs_tool_func,
            description=(
                "Use this tool to find documents in a MongoDB collection.\n"
                "Input format:\n"
                "- `collection=, key=, value=` for precise queries\n"
                "- OR `collection=, value=` to search across all fields\n"
                "If `key` is omitted, the tool will automatically scan all fields to find matching values.\n"
                "Examples:\n"
                "- `collection=default_collection, key=name, value=Lauren Alexander`\n"
                "- `collection=default_collection, value=Lauren Alexander`"
            ),
        ),
        Tool(
            name="ListCollections",
            func=list_collections,
            description="List all collections...",
        ),
        Tool(
            name="AggregateGroupBy",
            func=aggregate_group_by,
            description=(
                "Group documents and count by any field. Format: collection=, field=. E.g., collection=residents, field=gender"
            ),
        ),
        Tool(
            name="GetAllDocuments",
            func=get_all_documents,
            description=(
                "Fetch all documents from a collection. Input: collection name only. Example: residents"
            ),
        ),
        Tool(
            name="FuzzyFindDocuments",
            func=fuzzy_find_documents,
            description=(
                "Fuzzy match documents across all fields in a collection. Format: collection=, value=, threshold=80 (optional)"
            ),
        ),
        Tool(
            name="JoinCollections",
            func=join_collections_tool_func,
            description=(
                "Join collections to map foreign keys to human-readable values. Supports 1 or 2-level joins.\n"
                "Formats:\n"
                "- from=Payments, key=order_id, to=Orders, match=order_id, return=status\n"
                "- from=Payments, key=order_id, to=Orders, match=order_id, next_key=user_id, next_to=Users, next_match=user_id, return=name"
            ),
        ),
        Tool(
            name="SmartJoinRouter",
            func=smart_join_router,
            description=(
                "Suggest the correct JoinCollections input format based on user intent.\n"
                "Use this when you are unsure how to form the join input."
            ),
        ),
    ]

    agent_executor = initialize_agent(
        tools=tools,
        llm=llm,
        agent_type=AgentType.CONVERSATIONAL_REACT_DESCRIPTION,
        memory=memory,
        verbose=True,
        prefix=system_message,
        handle_parsing_errors=True
    )
    db_mode = "mongo"


@app.errorhandler(Exception)
def handle_all_errors(e):
    """Last-resort handler: log the exception and answer with a JSON error.

    HTTP exceptions keep their status code and description; everything else
    is reported as a generic 500 without leaking details.
    """
    print(f"[ERROR] Global handler caught an exception: {str(e)}")
    traceback.print_exc()
    if isinstance(e, HTTPException):
        return jsonify({"status": "error", "message": e.description}), e.code
    return jsonify({"status": "error", "message": "An unexpected error occurred"}), 500


@app.errorhandler(TooManyRequests)
def handle_429_error(e):
    """Answer HTTP 429 with a friendly JSON message."""
    return jsonify({
        "status": "error",
        "message": "🚦 Agent is busy, try again after sometime."
    }), 429


# --- ROUTES ---
@app.route("/")
def index():
    """Serve the single-page UI."""
    return render_template("app_index.html")


@app.route("/upload_db", methods=["POST"])
@error_safe
def upload_db():
    """Accept an uploaded database file and initialise the matching agent.

    ``.json`` files are loaded into MongoDB; ``.db`` / ``.sqlite`` files are
    opened as SQLite. Extensions are matched case-insensitively. Responds with
    the database name on success, 400 for a missing/unsupported file and 500
    when initialisation fails.
    """
    file = request.files.get("file")
    if not file or file.filename == "":
        return jsonify(success=False, message="No file provided"), 400

    filename = secure_filename(file.filename)
    if not filename:
        # secure_filename can strip a hostile name down to nothing.
        return jsonify(success=False, message="No file provided"), 400

    lowered = filename.lower()
    is_json = lowered.endswith(".json")
    is_sql = lowered.endswith((".db", ".sqlite"))
    if not (is_json or is_sql):
        # Reject before touching the disk so unsupported uploads are not stored.
        return jsonify(success=False, message="Unsupported file format"), 400

    path = os.path.join(app.config["UPLOAD_FOLDER"], filename)
    file.save(path)

    try:
        if is_json:
            init_mongo_agent(path)
            # init_mongo_agent has just rebound the module-level mongo_db.
            db_name = getattr(mongo_db, "name", None) or os.path.splitext(filename)[0]
            return jsonify({"database_name": db_name, "message": "MongoDB initialized"}), 200

        # SQL DB (.db or .sqlite)
        init_sql_agent(path)
        db_name = os.path.splitext(filename)[0]
        return jsonify({"database_name": db_name, "message": "SQL DB initialized"}), 200
    except Exception as e:
        traceback.print_exc()
        return jsonify(success=False, message=f"Init failed: {e}"), 500


@app.route("/generate", methods=["POST"])
@error_safe
def generate():
    """Run the user's prompt through the current agent and return its answer.

    Expects a JSON body with a non-empty ``prompt``. Responds with
    ``final_answer`` and ``prompt`` on success, 400 for bad input or when no
    database has been uploaded yet, and 500 when the agent itself fails.
    The answer is also emitted on the ``final`` Socket.IO event.
    """
    try:
        data = request.get_json(force=True) or {}
        prompt = data.get("prompt", "").strip()
    except Exception:
        traceback.print_exc()
        return jsonify({"status": "error", "message": "Invalid input"}), 400

    if not prompt:
        return jsonify({"status": "error", "message": "Prompt is required"}), 400

    if agent_executor is None:
        # No upload has initialised an agent yet; say so instead of failing
        # with an opaque attribute error.
        return jsonify({
            "status": "error",
            "prompt": prompt,
            "final_answer": "",
            "message": "No database loaded. Please upload a database first.",
        }), 400

    try:
        # invoke the agent synchronously
        result = agent_executor.invoke({"input": prompt})

        # Normalize final_answer from agent output safely
        if isinstance(result, dict):
            final_answer = (
                result.get("final_answer")
                or result.get("output")
                or result.get("answer")
                or result.get("text")
                or ""
            )
        else:
            final_answer = str(result or "")

        # Keep emitting to socket so clients listening to socketio still get it
        try:
            socketio.emit("final", {"message": final_answer})
        except Exception:
            app.logger.debug("socket emit failed, continuing")

        return jsonify({"final_answer": final_answer, "prompt": prompt}), 200
    except Exception as e:
        app.logger.exception("Agent invocation failed")
        return jsonify({"prompt": prompt, "final_answer": "", "message": f"Agent error: {str(e)[:200]}"}), 500


if __name__ == "__main__":
    # socketio.run(app, debug=True)
    socketio.run(app, host="0.0.0.0", port=7860, allow_unsafe_werkzeug=True)