# Database_Agent / app_upload_file_db.py
# Author: prthm11 — commit f47702d: "Rename app.py to app_upload_file_db.py"
# --- IMPORTS ---
import ast
import functools
import os
import re
import threading
import traceback
import uuid
from datetime import datetime

from bson import json_util
from dotenv import load_dotenv
from flask import Flask, request, jsonify, render_template
from flask_socketio import SocketIO, emit
from fuzzywuzzy import fuzz
from langchain.agents import initialize_agent, AgentType, create_react_agent, AgentExecutor
from langchain.memory import ConversationBufferMemory
from langchain.prompts import ChatPromptTemplate
from langchain.tools import Tool
from langchain_community.agent_toolkits import create_sql_agent, SQLDatabaseToolkit
from langchain_community.utilities import SQLDatabase
from langchain_google_genai import ChatGoogleGenerativeAI
# from langchain_groq import ChatGroq
from pymongo import MongoClient
from tabulate import tabulate
from werkzeug.exceptions import HTTPException, TooManyRequests
from werkzeug.utils import secure_filename
def error_safe(f):
    """Wrap a Flask view so that exceptions become JSON error responses.

    - ``HTTPException`` -> ``{"status": "error", "message": <description>}`` with
      the exception's own status code.
    - Any other ``Exception`` -> the same body with ``str(e)`` as the message and
      HTTP 500; the traceback is printed to stderr.

    ``functools.wraps`` keeps the view's ``__name__`` (Flask derives the endpoint
    name from it), and also its ``__doc__``, ``__qualname__`` and ``__wrapped__``.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except HTTPException as he:
            return jsonify({"status": "error", "message": he.description}), he.code
        except Exception as e:
            print("[ERROR] Uncaught Exception in", f.__name__)
            traceback.print_exc()
            return jsonify({"status": "error", "message": str(e)}), 500
    return wrapper
# --- ENV + FLASK SETUP ---
load_dotenv()
# load_dotenv() already exports .env values into os.environ, so the key is only read
# here. Writing os.getenv(...) back into os.environ raised TypeError at import time
# whenever the variable was unset (os.environ accepts only str values).
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
app = Flask(__name__)
# Regenerated on every process start.
app.config['SECRET_KEY'] = os.urandom(32)
app.config['UPLOAD_FOLDER'] = 'uploads'
# NOTE(review): "*" accepts Socket.IO connections from any origin; restrict for production.
socketio = SocketIO(app, cors_allowed_origins="*")
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# Chat model shared by the intent checker and by both the SQL and MongoDB agents.
llm = ChatGoogleGenerativeAI(
    temperature=0.2,
    model="gemini-2.0-flash",
    max_retries=50,
    api_key=GEMINI_API_KEY,
)
# llm = ChatGroq(temperature=0.2, model_name="mistral-saba-24b",api_key=os.getenv("GROQ_API_KEY"))
# --- GLOBALS ---
# Currently active agent; replaced by init_sql_agent() / init_mongo_agent() on each upload.
agent_executor = None
# A single conversation buffer shared by both agents (and by every client of this process).
memory = ConversationBufferMemory(
    memory_key="chat_history", return_messages=True, input_key="input")
mongo_db = None  # pymongo Database handle, set by init_mongo_agent()
client = None  # MongoClient, set by init_mongo_agent()
db_mode = None  # "mongo" or "sql"
# --- SHARED ---
def is_schema_request(prompt: str) -> bool:
    """Tell whether *prompt* mentions schema-style vocabulary.

    The check is a case-insensitive, whole-word regex search for terms such as
    "schema", "tables", "columns", "collection(s)", "metadata", "describe" or "show".
    """
    schema_terms = r'\b(schema|table names|tables|columns|structure|column names|collections?|field names|metadata|describe|show)\b'
    hit = re.search(schema_terms, prompt, flags=re.IGNORECASE)
    return hit is not None
def is_sensitive_request(prompt: str) -> bool:
    """Return True if *prompt* contains any keyword from a fixed sensitive-term list.

    Matching is a case-insensitive substring test (so "emails" matches "email").
    """
    text = prompt.lower()
    for keyword in (
        "password", "token", "credential", "secret", "api key", "schema", "structure",
        "collection name", "field name", "user_id", "order_id", "payment_id",
        "internal", "database structure", "table structure", "email", "phone", "contact", "ssn",
    ):
        if keyword in text:
            return True
    return False
# LLM classifier: asks the model whether a prompt probes schema/structure/sensitive info.
intent_prompt = ChatPromptTemplate.from_messages([
    ("system", "Classify if the user is asking schema/structure/sensitive info (tables, columns, schema): YES or NO."),
    ("human", "{prompt}")
])
intent_checker = intent_prompt | llm


def is_schema_leak_request(prompt):
    """Ask the LLM whether *prompt* requests schema/structure/sensitive information.

    Returns True when the model's reply contains the word "yes" (whole word,
    case-insensitive), False otherwise. On any error while calling the model the
    function fails open: it logs the traceback and returns False.
    """
    try:
        classification = intent_checker.invoke({"prompt": prompt})
        answer = str(classification.content).strip().lower()
        # Whole-word match so replies like "NO, ... eyes ..." are not read as YES.
        return re.search(r"\byes\b", answer) is not None
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit still propagate.
        traceback.print_exc()
        return False
# --- INIT SQL AGENT ---
def init_sql_agent(db_path):
    """Build a LangChain SQL agent over the SQLite file at *db_path* and make it active.

    Side effects: rebinds the module globals ``agent_executor`` (the new agent) and
    ``db_mode`` (set to ``"sql"``). The agent uses the shared ``llm`` and ``memory``.
    """
    global agent_executor, db_mode

    # NOTE(review): the file is opened through a plain (read-write) SQLite URI and the
    # active prefix does not limit the agent to SELECT statements; confirm that write
    # access to the uploaded database is acceptable.
    sqlite_database = SQLDatabase.from_uri(f"sqlite:///{db_path}")
    sql_toolkit = SQLDatabaseToolkit(db=sqlite_database, llm=llm)

    agent_prefix = '''You are a helpful SQL expert agent that ALWAYS returns natural language answers using the tools.'''
    # Disabled draft of a stricter prompt, kept for reference (not sent to the model):
    # Always format your responses in Markdown. For example:
    # - Use bullet points
    # - Use bold for headers
    # - Wrap code in triple backticks
    # - Tables should use Markdown table syntax
    # You must NEVER:
    # - Show or mention SQL syntax.
    # - Reveal table names, column names, or database schema.
    # - Respond with any technical details or structure of the database.
    # - Return code or tool names.
    # - Give wrong Answers.
    # You must ALWAYS:
    # - Respond in plain, friendly language.
    # - Don't Summarize the result for the user (e.g., "There are 9 tables in the system.")
    # - If asked to list table names or schema, politely refuse and respond with:
    # "I'm sorry, I can't share database structure information."
    # - ALWAYS HAVE TO SOLVE COMPLEX USER QUERIES. FOR THAT, UNDERSTAND THE PROMPT, ANALYSE PROPER AND THEN GIVE ANSWER.
    # - Your Answers should be correct, you have to do understand process well and give accurate answers.
    # - IF USER ASK ABOUT DATA, Which is not there in a database, then GIVE FOLLOWING ANSWER:
    # "There is no such data in the Database."
    # Strict Rules You MUST Follow:
    # - NEVER display or mention SQL queries.
    # - NEVER explain SQL syntax or logic.
    # - NEVER return technical or code-like responses.
    # - ONLY respond in natural, human-friendly language.
    # - You are not allow to give the name of any COLUMNS, TABLES, DATABASE, ENTITY, SYNTAX, STRUCTURE, DESIGN, ETC...
    # If the user asks for anything other than retrieving data (SELECT), respond using this exact message:
    # "I'm not allowed to perform operations other than SELECT queries. Please ask something that involves reading data."
    # Do not return SQL queries or raw technical responses to the user.
    # For example:
    # Wrong: SELECT * FROM ...
    # Correct: The user assigned to the cart is Alice Smith.
    # Use the tools provided to get the correct data from the database and summarize the response clearly.
    # If the input is unclear or lacks sufficient data, ask for clarification using the SubmitFinalAnswer tool.
    # Never return SQL queries as your response.
    # If you cannot find an answer,
    # Double-check your query and running it again.
    # - If a query fails, revise and try again.
    # - Else 'No data found' using SubmitFinalAnswer.No SQL, no code. '''

    agent_executor = create_sql_agent(
        llm=llm,
        toolkit=sql_toolkit,
        prefix=agent_prefix,
        agent_type=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
        verbose=True,
        memory=memory,
        agent_executor_kwargs={"handle_parsing_errors": True},
    )
    db_mode = "sql"
# --- INIT MONGO AGENT ---
# System prompt intended for the MongoDB agent (a leading and a trailing newline are part of it).
system_message = (
    "\n"
    "You are **MongoDBQueryBot**, a highly intelligent and accurate assistant for answering questions about data stored in a MongoDB database using tools.\n"
)
# ### 🚨 Critical Instructions (Strictly Follow These):
# - You **must always** use tools provided to answer user questions.
# - Always join IDs with associated human-readable values like names or titles when answering.
# - Prefer displaying `user name`, `employee name`, or `product name` instead of internal IDs like `user_id`, `emp_id`, or `product_id`.
# - Avoid responding only with technical identifiers. Make responses meaningful to users.
# - **Never** guess or fabricate any information.
# - **Do not** show raw JSON, field names, or database structure.
# - Your role is **read-only**: do not suggest or perform insert/update/delete.
# - After using all the available tools, if you are unable to find any documents, then give the following ANSWER:
# "Please, rephrase your query because I can't exactly understand, what you want !"
# - If a query can't be answered or is unrelated to reading data, reply:
# ❌ "I'm only allowed to retrieve data. Please ask a query involving reading information."
# - IF USER ASK ABOUT DATA, Which is not there in a database, then GIVE FOLLOWING ANSWER:
# "There is no such data in the Database."
# - When returning answers:
# - Do **not return internal IDs** like `user_id`, `order_id`, `payment_id`, etc.
# - Instead, use human-readable fields like `name`, `full_name`, `user_name`, etc., from related collections.
# - If only an ID is available, try joining the relevant collections to fetch the proper display name.
# ### 🧠 How to Think:
# - Understand **exactly** what the user is trying to ask. Do not answer if unclear — ask for clarification.
# - Translate the user prompt into tool inputs by identifying:
# - Which collection to search
# - What value or field they're referring to
# - The correct format expected by the tool
# ### 🛠️ Tool Usage Guide:
# - Use `FindDocuments` for queries like:
# - "Show me all employees named John"
# - "What is the salary of Manager X?"
# - Use `ListCollections` to discover available data types (but don’t share them directly).
# - **IMPORTANT: Don't keep iterating with only one tool. If you can't answer with the tool you are using, switch to another tool!**
# - Use `JoinCollections` to resolve IDs into names when the question asks about people, customers, or products.
# - When resolving names from payments, use this format:
# `from=Payments, key=order_id, to=Orders, match=order_id, next_key=user_id, next_to=Users, next_match=user_id, return=name`
# - Your goal is to **return the person's name** (e.g., `name`, `user_name`, `full_name`) not their ID.
# - Always prioritize returning names instead of internal identifiers.
# - Examples:
# - For payment-related questions → Join Payments → Orders → Users and return name
# - For order questions → Join Orders → Users and return user names
# ### 🧾 Response Format:
# - Use **clear markdown with tables** when displaying data.
# - If no data is found: return `**No documents found.**`
# - Stay professional, brief, and relevant.
# ### 🚫 Never Do This:
# - Do not leak MongoDB structure, schema, or field names.
# - Do not suggest code, MongoDB syntax, or field mappings.
# - Do not hallucinate or make assumptions.
# Start by analyzing the prompt carefully, select the right tool, invoke it, and return a user-friendly answer based on the result.
# """
def find_docs_tool_func(query: str) -> str:
    """
    Flexible MongoDB search with fallback:
    - First tries in specified collection.
    - If no results found, falls back to search across all collections.
    Input format:
    - collection=<collection>, key=<field>, value=<value>
    - OR: collection=<collection>, value=<value>

    Parsing: comma-separated ``name=value`` pairs. Names and values are trimmed of
    whitespace and of wrapping quotes/backticks (LLMs often copy the backticks from the
    tool description). Fragments without ``=`` are ignored.

    Matching:
    - with ``key`` and ``value``: exact match on that field; a numeric-looking value
      also matches the int/float form, because the tool input is always text;
    - with only ``value``: case-insensitive equality against ``str()`` of each
      top-level field;
    - with neither: every document of the collection.

    Returns a GitHub-style markdown table, or a message string when nothing is found
    or the input is invalid.
    """
    try:
        parts = {}
        for part in query.split(","):
            if "=" not in part:
                continue
            raw_key, raw_value = part.split("=", 1)
            parts[raw_key.strip().strip("\"'`")] = raw_value.strip().strip("\"'`")
        collection = parts.get("collection")
        key = parts.get("key")
        value = parts.get("value")
        if not collection:
            return "❌ 'collection' is required."
        if key and key.startswith("$"):
            # Refuse operator names (e.g. $where) coming from model/user text.
            return "❌ 'key' must be a field name."

        # Candidate values for an exact match: the text itself plus numeric forms.
        candidates = [value]
        if value:
            for cast in (int, float):
                try:
                    candidates.append(cast(value))
                except ValueError:
                    pass

        def query_collection(coll_name):
            if key and value:
                return list(mongo_db[coll_name].find({key: {"$in": candidates}}, {'_id': 0}))
            if value:
                needle = value.lower()
                return [doc for doc in mongo_db[coll_name].find({}, {'_id': 0})
                        if any(str(v).lower() == needle for v in doc.values())]
            return list(mongo_db[coll_name].find({}, {'_id': 0}))

        docs = query_collection(collection)
        if docs:
            return "\n markdown\n" + tabulate(docs, headers="keys", tablefmt="github") + "\n"
        # Fallback: try the same query against every other collection.
        for coll in mongo_db.list_collection_names():
            if coll == collection:
                continue
            docs = query_collection(coll)
            if docs:
                return "\n markdown\n" + tabulate(docs, headers="keys", tablefmt="github") + "\n"
        return "**No documents found.**"
    except Exception as e:
        return f"Invalid input format or error: {str(e)}"
def aggregate_group_by(_input: str):
    """Count documents per distinct value of one field (MongoDB ``$group``).

    Accepted inputs:
    - a dict literal, e.g. ``{"collection": "residents", "field": "gender"}``
      (``collection_name`` / ``group_by`` are accepted as aliases);
    - ``collection=<name>, field=<field>`` as shown in the tool description. Names and
      values are trimmed of whitespace and wrapping quotes/backticks. Before this fix
      the documented example ``collection=residents, field=gender`` failed with
      KeyError ``' field'``.

    Returns a GitHub-style markdown table with one row per value plus its ``count``,
    or a message string when nothing matches or the input/aggregation fails.
    """
    try:
        text = _input.strip().strip("`").strip()
        if text.startswith("{"):
            # literal_eval only evaluates Python literals, so model text cannot run code.
            args = ast.literal_eval(text)
            if not isinstance(args, dict):
                args = {}
            collection = args.get("collection_name") or args.get("collection")
            field = args.get("group_by") or args.get("field")
        else:
            args = {}
            for part in text.split(","):
                if "=" not in part:
                    continue
                raw_key, raw_value = part.split("=", 1)
                args[raw_key.strip().strip("\"'`")] = raw_value.strip().strip("\"'`")
            collection = args.get("collection")
            field = args.get("field")
        if not collection or not field:
            return "Aggregation failed: both 'collection' and 'field' are required."
        pipeline = [
            {"$group": {"_id": f"${field}", "count": {"$sum": 1}}},
            {"$project": {"_id": 0, field: "$_id", "count": 1}}
        ]
        result = list(mongo_db[collection].aggregate(pipeline))
        if not result:
            return "**No data found.**"
        return "\n markdown\n" + tabulate(result, headers="keys", tablefmt="github") + "\n"
    except Exception as e:
        return f"Aggregation failed: {e}"
def get_all_documents(collection: str):
    """Return every document of one collection (``_id`` excluded) as a markdown table.

    *collection* is the bare collection name, e.g. ``residents``. Surrounding
    whitespace, quotes/backticks and a leading ``collection=`` are tolerated,
    because the value comes straight from model-generated tool input.
    Returns a message string when the name is empty, the collection has no
    documents, or the lookup fails.
    """
    try:
        name = (collection or "").strip().strip("\"'`").strip()
        if name.lower().startswith("collection="):
            name = name.split("=", 1)[1].strip().strip("\"'`").strip()
        if not name:
            return "Error fetching documents: a collection name is required."
        docs = list(mongo_db[name].find({}, {'_id': 0}))
        if not docs:
            return "**No documents found.**"
        return "\n markdown\n" + tabulate(docs, headers="keys", tablefmt="github") + "\n"
    except Exception as e:
        return f"Error fetching documents: {e}"
def fuzzy_find_documents(query: str):
    """Return documents whose field values fuzzily match a search term.

    Input format: ``collection=<name>, value=<search_term>[, threshold=<int>]``.
    Names and values are trimmed of whitespace and wrapping quotes/backticks, and
    fragments without ``=`` are ignored. A document matches when
    ``fuzz.partial_ratio`` between any lower-cased top-level value and the
    lower-cased search term is >= threshold (default 80).

    Returns a GitHub-style markdown table, or a message string on no match / error.
    """
    try:
        parts = {}
        for part in query.split(","):
            if "=" not in part:
                continue  # tolerate stray fragments instead of failing the whole call
            raw_key, raw_value = part.split("=", 1)
            parts[raw_key.strip().strip("\"'`")] = raw_value.strip().strip("\"'`")
        collection = parts.get("collection")
        value = parts.get("value")
        if not collection or not value:
            return "Fuzzy match error: 'collection' and 'value' are required."
        threshold = int(parts.get("threshold", 80))
        needle = value.lower()  # loop-invariant: computed once, not per field per document
        matches = [
            doc for doc in mongo_db[collection].find({}, {'_id': 0})
            if any(fuzz.partial_ratio(str(v).lower(), needle) >= threshold for v in doc.values())
        ]
        if not matches:
            return "**No fuzzy matches found.**"
        return "\n markdown\n" + tabulate(matches, headers="keys", tablefmt="github") + "\n"
    except Exception as e:
        return f"Fuzzy match error: {e}"
# def join_collections_tool_func(_input: str):
# try:
# # Parse input like: from=Products, key=category_id, to=Categories, match=category_id, return=category_name
# args = dict(x.strip().split("=", 1) for x in _input.split(","))
# from_collection = args["from"]
# foreign_key = args["key"]
# to_collection = args["to"]
# match_key = args["match"]
# return_field = args["return"]
# results = []
# foreign_lookup = {
# doc[match_key]: doc.get(return_field)
# for doc in mongo_db[to_collection].find()
# if match_key in doc
# }
# for doc in mongo_db[from_collection].find({}, {'_id': 0}):
# doc[return_field] = foreign_lookup.get(doc.get(foreign_key), "Unknown")
# results.append(doc)
# if not results:
# return "**No documents found.**"
# return "\n markdown\n" + tabulate(results, headers="keys", tablefmt="github") + "\n"
# except Exception as e:
# return f"Join failed: {e}"
def join_collections_tool_func(_input: str):
    """
    Supports 2-level join (Payments → Orders → Users) or any pair-wise join
    Input formats:
    - from=Payments, key=order_id, to=Orders, match=order_id, next_key=user_id, next_to=Users, next_match=user_id, return=name
    - from=Products, key=category_id, to=Categories, match=category_id, return=category_name

    Names and values are trimmed of whitespace and wrapping quotes/backticks, and
    fragments without ``=`` are ignored. ``from``, ``key``, ``to``, ``match`` and
    ``return`` are required. The second hop runs only when ``next_key``, ``next_to``
    and ``next_match`` are all given; documents without a match there get "Unknown".

    Returns a one-column markdown table of the ``return`` field, or a message string.
    """
    try:
        args = {}
        for part in _input.split(","):
            if "=" not in part:
                continue
            raw_key, raw_value = part.split("=", 1)
            args[raw_key.strip().strip("\"'`")] = raw_value.strip().strip("\"'`")
        missing = [name for name in ("from", "key", "to", "match", "return") if not args.get(name)]
        if missing:
            return "Join failed: missing " + ", ".join(missing)
        from_coll = args["from"]
        key = args["key"]
        to_coll = args["to"]
        match = args["match"]
        return_field = args["return"]
        next_key = args.get("next_key")
        next_to = args.get("next_to")
        next_match = args.get("next_match")

        # First join (e.g., Payments → Orders): index the target collection by its match field.
        to_docs = {doc[match]: doc for doc in mongo_db[to_coll].find() if match in doc}
        joined = []
        for doc in mongo_db[from_coll].find({}, {'_id': 0}):
            foreign_doc = to_docs.get(doc.get(key))
            if foreign_doc is None:
                continue
            joined.append({**doc, **foreign_doc})

        # Second join (e.g., Orders → Users)
        if next_key and next_to and next_match:
            next_docs = {doc[next_match]: doc for doc in mongo_db[next_to].find() if next_match in doc}
            for doc in joined:
                user_doc = next_docs.get(doc.get(next_key))
                doc[return_field] = user_doc.get(return_field, "Unknown") if user_doc is not None else "Unknown"

        final = [{return_field: doc.get(return_field)} for doc in joined if return_field in doc]
        # Also covers "joined, but none of the results carry the requested field",
        # which previously produced an empty table.
        if not final:
            return "**No documents found.**"
        return "\n markdown\n" + tabulate(final, headers="keys", tablefmt="github") + "\n"
    except Exception as e:
        return f"Join failed: {e}"
def smart_join_router(prompt: str) -> str:
    """Map a natural-language question to a ready-made JoinCollections input string.

    Two keyword rules are checked on the lower-cased prompt, in order:
    1. mentions "payment" and one of who/name/user/person -> Payments → Orders → Users;
    2. mentions both "order" and "name" -> Orders → Users.
    Anything else yields a fallback message asking for more context.
    """
    text = prompt.lower()
    wants_person = any(word in text for word in ("who", "name", "user", "person"))
    if "payment" in text and wants_person:
        return "from=Payments, key=order_id, to=Orders, match=order_id, next_key=user_id, next_to=Users, next_match=user_id, return=name"
    if "order" in text and "name" in text:
        return "from=Orders, key=user_id, to=Users, match=user_id, return=name"
    # Extend as needed
    return "Unable to auto-generate join path. Please provide more context."
def _load_json_into_mongo(db, json_path):
    """Load the JSON file at *json_path* (parsed with bson.json_util) into *db*.

    A top-level list goes into ``default_collection``. A top-level dict maps
    collection names to a list of documents or to a single document. Each target
    collection is dropped before loading. Raises ValueError for other top-level types.
    """
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json_util.loads(f.read())

    if isinstance(data, list):
        batches = {'default_collection': data}
    elif isinstance(data, dict):
        batches = data
    else:
        raise ValueError("Unsupported JSON format. Must be a list or dict.")

    for col_name, docs in batches.items():
        collection = db[col_name]
        collection.drop()
        if isinstance(docs, list):
            # insert_many() rejects an empty list; an empty batch just leaves the collection dropped.
            if docs:
                collection.insert_many(docs)
        else:
            collection.insert_one(docs)


def _build_mongo_tools():
    """Return the LangChain tools exposed to the MongoDB agent (they read the global mongo_db)."""
    def list_collections(_input=None):
        return mongo_db.list_collection_names()

    # The full descriptions are what the agent sees; they document each tool's input format.
    return [
        Tool(
            name="FindDocuments",
            func=find_docs_tool_func,
            description=(
                "Use this tool to find documents in a MongoDB collection.\n"
                "Input format:\n"
                "- `collection=<collection>, key=<field>, value=<value>` for precise queries\n"
                "- OR `collection=<collection>, value=<value>` to search across all fields\n"
                "If `key` is omitted, the tool will automatically scan all fields to find matching values.\n"
                "Examples:\n"
                "- `collection=default_collection, key=name, value=Lauren Alexander`\n"
                "- `collection=default_collection, value=Lauren Alexander`"
            ),
        ),
        Tool(
            name="ListCollections",
            func=list_collections,
            description="List the names of all collections in the uploaded MongoDB database. The input is ignored.",
        ),
        Tool(
            name="AggregateGroupBy",
            func=aggregate_group_by,
            description=(
                "Group documents and count by any field. Format: collection=<name>, field=<group_by_field>. E.g., collection=residents, field=gender"
            ),
        ),
        Tool(
            name="GetAllDocuments",
            func=get_all_documents,
            description=(
                "Fetch all documents from a collection. Input: collection name only. Example: residents"
            ),
        ),
        Tool(
            name="FuzzyFindDocuments",
            func=fuzzy_find_documents,
            description=(
                "Fuzzy match documents across all fields in a collection. Format: collection=<name>, value=<search_term>, threshold=80 (optional)"
            ),
        ),
        Tool(
            name="JoinCollections",
            func=join_collections_tool_func,
            description=(
                "Join collections to map foreign keys to human-readable values. Supports 1 or 2-level joins.\n"
                "Formats:\n"
                "- from=Payments, key=order_id, to=Orders, match=order_id, return=status\n"
                "- from=Payments, key=order_id, to=Orders, match=order_id, next_key=user_id, next_to=Users, next_match=user_id, return=name"
            ),
        ),
        Tool(
            name="SmartJoinRouter",
            func=smart_join_router,
            description=(
                "Suggest the correct JoinCollections input format based on user intent.\n"
                "Use this when you are unsure how to form the join input."
            ),
        ),
    ]


def init_mongo_agent(json_path):
    """Load an uploaded JSON file into MongoDB and make a MongoDB agent active.

    The connection string is read from the ``MONGO_URI`` environment variable and
    defaults to a local server. Data goes into the ``uploaded_mongo`` database (see
    ``_load_json_into_mongo``). On success, rebinds the globals ``client``,
    ``mongo_db``, ``agent_executor`` and ``db_mode`` (``"mongo"``) and closes the
    previous client. If loading fails, the new client is closed, the exception is
    re-raised, and the previous globals are left untouched.
    """
    global agent_executor, client, mongo_db, db_mode

    # NOTE(review): a credentialed Atlas URI used to be hard-coded here. Treat that
    # password as leaked, rotate it, and supply the URI through MONGO_URI instead.
    mongo_uri = os.getenv("MONGO_URI", "mongodb://localhost:27017/")
    new_client = MongoClient(mongo_uri)
    try:
        new_db = new_client['uploaded_mongo']
        _load_json_into_mongo(new_db, json_path)
    except Exception:
        new_client.close()
        raise

    # Swap in the new connection and release the one from the previous upload,
    # which was otherwise leaked on every upload.
    previous_client = client
    client, mongo_db = new_client, new_db
    if previous_client is not None and previous_client is not new_client:
        previous_client.close()

    # initialize_agent() takes the agent kind as `agent` and prompt overrides via
    # `agent_kwargs`. The former `agent_type=`/`prefix=` keywords were forwarded to
    # AgentExecutor and ignored, so the agent silently fell back to zero-shot and
    # never saw system_message.
    agent_executor = initialize_agent(
        tools=_build_mongo_tools(),
        llm=llm,
        agent=AgentType.CONVERSATIONAL_REACT_DESCRIPTION,
        agent_kwargs={"prefix": system_message},
        memory=memory,
        verbose=True,
        handle_parsing_errors=True,
    )
    db_mode = "mongo"
@app.errorhandler(Exception)
def handle_all_errors(e):
    """App-wide fallback: log the exception and reply with a JSON error body.

    Views decorated with @error_safe handle their own exceptions, so this handler
    mostly sees errors raised outside them (e.g. routing errors such as 404).
    """
    print(f"[ERROR] Global handler caught an exception: {e}")
    traceback.print_exc()
    if not isinstance(e, HTTPException):
        # Internal details of unexpected errors are not exposed to the client.
        return jsonify({"status": "error", "message": "An unexpected error occurred"}), 500
    return jsonify({"status": "error", "message": e.description}), e.code


@app.errorhandler(TooManyRequests)
def handle_429_error(e):
    """Answer HTTP 429 with a friendly JSON "busy" message."""
    payload = {
        "status": "error",
        "message": "🚦 Agent is busy, try again after sometime.",
    }
    return jsonify(payload), 429
# --- ROUTES ---
@app.route("/")
def index():
    """Render the app's HTML front end (templates/app_index.html)."""
    page_template = "app_index.html"
    return render_template(page_template)
@app.route("/upload_db", methods=["POST"])
@error_safe
def upload_db():
    """Accept an uploaded database file and activate the matching agent.

    Form field ``file``:
    - ``.json`` -> loaded into MongoDB via init_mongo_agent();
    - ``.db`` / ``.sqlite`` -> opened via init_sql_agent().
    Extensions are matched case-insensitively. Unsupported or unnamed files are
    rejected (400) before anything is written to disk.

    Success: 200 with ``{"database_name", "message"}``. Initialisation failure:
    500 with ``{"success": False, "message"}``.
    """
    file = request.files.get("file")
    if not file or file.filename == "":
        return jsonify(success=False, message="No file provided"), 400
    filename = secure_filename(file.filename)
    # secure_filename() may return an empty string; saving to the bare folder path would fail.
    if not filename:
        return jsonify(success=False, message="Invalid file name"), 400
    extension = os.path.splitext(filename)[1].lower()
    if extension not in (".json", ".db", ".sqlite"):
        return jsonify(success=False, message="Unsupported file format"), 400
    path = os.path.join(app.config["UPLOAD_FOLDER"], filename)
    file.save(path)
    try:
        if extension == ".json":
            init_mongo_agent(path)
            loaded_db = globals().get("mongo_db")
            db_name = getattr(loaded_db, "name", None) or os.path.splitext(filename)[0]
            return jsonify({"database_name": db_name, "message": "MongoDB initialized"}), 200
        # SQL DB (.db or .sqlite)
        init_sql_agent(path)
        db_name = os.path.splitext(filename)[0]
        return jsonify({"database_name": db_name, "message": "SQL DB initialized"}), 200
    except Exception as e:
        traceback.print_exc()
        return jsonify(success=False, message=f"Init failed: {e}"), 500
@app.route("/generate", methods=["POST"])
@error_safe
def generate():
    """Answer a natural-language question with the active agent.

    Request JSON: ``{"prompt": "<text>"}``.
    - 200: ``{"final_answer": <str>, "prompt": <str>}``; the answer is also emitted
      on the Socket.IO ``final`` event.
    - 400: missing/invalid prompt, or no database has been uploaded yet.
    - 500: the agent raised; ``message`` holds the first 200 characters of the error.
    """
    try:
        data = request.get_json(force=True) or {}
        prompt = data.get("prompt", "").strip()
        if not prompt:
            return jsonify({"status": "error", "message": "Prompt is required"}), 400
    except Exception:
        traceback.print_exc()
        return jsonify({"status": "error", "message": "Invalid input"}), 400

    # Read the global once: an upload may swap it while this request runs.
    executor = agent_executor
    if executor is None:
        # Previously surfaced as a 500 "'NoneType' object has no attribute 'invoke'".
        return jsonify({"prompt": prompt, "final_answer": "",
                        "message": "No database loaded. Please upload a database file first."}), 400

    try:
        # invoke your agent synchronously
        result = executor.invoke({"input": prompt})
        # Normalize final_answer from agent output safely
        if isinstance(result, dict):
            final_answer = (
                result.get("final_answer")
                or result.get("output")
                or result.get("answer")
                or result.get("text")
                or ""
            )
        else:
            final_answer = str(result or "")
        # Optionally keep emitting to socket so clients listening to socketio still get it.
        # NOTE(review): called outside a Socket.IO handler without `to=`, this likely
        # broadcasts the answer to every connected client — confirm that is intended.
        try:
            socketio.emit("final", {"message": final_answer})
        except Exception:
            app.logger.debug("socket emit failed, continuing")
        return jsonify({"final_answer": final_answer, "prompt": prompt}), 200
    except Exception as e:
        app.logger.exception("Agent invocation failed")
        return jsonify({"prompt": prompt, "final_answer": "", "message": f"Agent error: {str(e)[:200]}"}), 500
if __name__ == "__main__":
    # Script entry point: serve the Flask app through Flask-SocketIO.
    # NOTE(review): allow_unsafe_werkzeug=True runs the Werkzeug development server;
    # use a production-grade server for real deployments.
    run_options = {"host": "0.0.0.0", "port": 7860, "allow_unsafe_werkzeug": True}
    # socketio.run(app, debug=True)
    socketio.run(app, **run_options)