Abhishek
Added dummy sqlite DB
3b382fd
# erp_server.py
import os
import sys
from typing import TypedDict, List, Union
# Add the project root directory to the Python path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
import gradio as gr
from typing import Dict, Any, List, Optional
from dotenv import load_dotenv
from datetime import datetime, date
import pathlib
# Import both database libraries
import sqlite3
try:
import psycopg2
from psycopg2.extras import RealDictCursor
POSTGRES_AVAILABLE = True
except ImportError:
POSTGRES_AVAILABLE = False
# Import SQLite initialization and reset functions
from data.utils.erp_db_init import init_sqlite_db
from data.utils.reset_erp_db import reset_erp_db
from mcp.server.fastmcp import FastMCP
load_dotenv()
mcp = FastMCP("ERP Database")
# Helper function to convert SQLite row to dict
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
# Helper function to handle date serialization for JSON
def serialize_dates(obj):
if isinstance(obj, (date, datetime)):
return obj.isoformat()
return obj
def get_db_connection():
"""Get database connection based on configuration."""
# Default to SQLite unless explicitly set to use PostgreSQL
db_type = os.getenv("ERP_DB_TYPE", "sqlite").lower()
if db_type == "postgres" and POSTGRES_AVAILABLE:
try:
conn = psycopg2.connect(
host=os.getenv("POSTGRES_HOST", "localhost"),
database=os.getenv("POSTGRES_DB", "erp_db"),
user=os.getenv("POSTGRES_USER", "postgres"),
password=os.getenv("POSTGRES_PASSWORD", ""),
port=os.getenv("POSTGRES_PORT", "5432")
)
return conn, "postgres"
except Exception as e:
raise Exception(f"PostgreSQL connection failed: {str(e)}")
else:
try:
# Get SQLite database path from environment or use default
db_path = os.getenv("SQLITE_DB_PATH", "./data/erp_db.sqlite")
# Ensure database is initialized
db_dir = pathlib.Path(os.path.dirname(db_path))
if not db_dir.exists() or not pathlib.Path(db_path).exists():
init_sqlite_db(db_path)
conn = sqlite3.connect(db_path)
conn.row_factory = dict_factory
# Enable foreign keys
conn.execute("PRAGMA foreign_keys = ON")
return conn, "sqlite"
except Exception as e:
raise Exception(f"SQLite connection failed: {str(e)}")
@mcp.tool()
async def execute_query(query: str, params: Optional[List] = None) -> Dict[str, Any]:
"""
Execute a custom SQL query on the ERP database.
Only SELECT statements are allowed for security reasons.
Args:
query (str): SQL query to execute (must be a SELECT statement)
params (List, optional): Parameters for parameterized queries
Returns:
dict: Query results or error message
"""
try:
# Check if it's a SELECT query
if not query.strip().upper().startswith('SELECT'):
return {
"success": False,
"error": "Only SELECT statements are allowed for security reasons."
}
conn, db_type = get_db_connection()
if db_type == "postgres":
cursor = conn.cursor(cursor_factory=RealDictCursor)
else: # sqlite
cursor = conn.cursor()
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
results = cursor.fetchall()
# Convert results to regular dict for JSON serialization
if db_type == "postgres":
results = [dict(row) for row in results]
if not results:
return "No results found"
return results
except Exception as e:
return {
"success": False,
"error": str(e)
}
finally:
if 'conn' in locals():
conn.close()
@mcp.tool()
async def list_erp_tables() -> Dict[str, Any]:
"""
List all ERP tables in the database.
Returns:
dict: List of ERP table names
"""
try:
conn, db_type = get_db_connection()
if db_type == "postgres":
query = """
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name LIKE 'erp_%'
ORDER BY table_name;
"""
cursor = conn.cursor(cursor_factory=RealDictCursor)
cursor.execute(query)
results = cursor.fetchall()
table_names = [row['table_name'] for row in results]
else: # sqlite
query = """
SELECT name as table_name
FROM sqlite_master
WHERE type='table' AND name LIKE 'erp_%'
ORDER BY name;
"""
cursor = conn.cursor()
cursor.execute(query)
results = cursor.fetchall()
table_names = [row['table_name'] for row in results]
if not table_names:
return {
"success": False,
"message": "No ERP tables found",
"tables": []
}
return {
"success": True,
"message": f"Found {len(table_names)} tables",
"count": len(table_names),
"tables": table_names
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
finally:
if 'conn' in locals():
conn.close()
@mcp.tool()
async def get_order_status(order_id: int) -> Dict[str, Any]:
"""
Get the status of an order by order ID.
Args:
order_id (int): The ID of the order to check
Returns:
dict: Order status information including shipping and destination countries
"""
try:
conn, db_type = get_db_connection()
if db_type == "postgres":
cursor = conn.cursor(cursor_factory=RealDictCursor)
param_placeholder = "%s"
else: # sqlite
cursor = conn.cursor()
param_placeholder = "?"
query = f"""
SELECT o.*, c.name as customer_name, c.email as customer_email,
o.shipping_country, o.destination_country
FROM erp_orders o
JOIN erp_customers c ON o.customer_id = c.customer_id
WHERE o.order_id = {param_placeholder};
"""
cursor.execute(query, [order_id])
order = cursor.fetchone()
if not order:
return {
"success": False,
"message": f"Order with ID {order_id} not found"
}
# Get order items
items_query = f"""
SELECT oi.order_item_id, oi.order_id, oi.product_id, oi.quantity, oi.unit_price, oi.subtotal,
p.product_name, p.sku
FROM erp_order_items oi
JOIN erp_products p ON oi.product_id = p.product_id
WHERE oi.order_id = {param_placeholder};
"""
cursor.execute(items_query, [order_id])
items = cursor.fetchall()
# Get order history
history_query = f"""
SELECT * FROM erp_order_history
WHERE order_id = {param_placeholder}
ORDER BY timestamp DESC;
"""
cursor.execute(history_query, [order_id])
history = cursor.fetchall()
order_dict = order if db_type == "sqlite" else dict(order)
items_list = items if db_type == "sqlite" else [dict(item) for item in items]
history_list = history if db_type == "sqlite" else [dict(entry) for entry in history]
# Return JSON formatted response
return {
"success": True,
"order": {
"order_id": order_id,
"customer": {
"customer_id": order_dict['customer_id'],
"name": order_dict['customer_name'],
"email": order_dict['customer_email']
},
"status": order_dict['status'],
"shipping_address": order_dict.get('shipping_address', 'N/A'),
"shipping_country": order_dict.get('shipping_country', 'N/A'),
"destination_country": order_dict.get('destination_country', 'N/A'),
"items": items_list,
"history": history_list
}
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
finally:
if 'conn' in locals():
conn.close()
@mcp.tool()
async def place_new_order(
customer_id: int,
items: List[Dict[str, Any]],
shipping_address: str,
shipping_country: str,
destination_country: str,
previous_order_id: Optional[int] = None
) -> Dict[str, Any]:
"""
Place a new order in the ERP system, after ensuring there are no global disruptions affecting the shipment.
Args:
customer_id (int): ID of the customer placing the order
items (List[Dict]): List of items to order, each with product_id and quantity
shipping_address (str): Shipping address for the order
shipping_country (str): Country where the order will be shipped from
destination_country (str): Country where the order will be delivered to
previous_order_id (int, optional): ID of a previous order this is replacing
Returns:
dict: New order information
"""
try:
conn, db_type = get_db_connection()
if db_type == "postgres":
cursor = conn.cursor(cursor_factory=RealDictCursor)
param_placeholder = "%s"
returning_clause = "RETURNING order_id"
date_interval = "CURRENT_DATE + INTERVAL '7 days'"
due_date_interval = "CURRENT_DATE + INTERVAL '30 days'"
concat_op = "||"
else: # sqlite
cursor = conn.cursor()
param_placeholder = "?"
returning_clause = ""
date_interval = "date('now', '+7 days')"
due_date_interval = "date('now', '+30 days')"
concat_op = "||"
# Calculate total amount based on the total items and their prices
total_amount = 0
for item in items:
product_query = f"SELECT price FROM erp_products WHERE product_id = {param_placeholder};"
cursor.execute(product_query, [item['product_id']])
product = cursor.fetchone()
if not product:
return {
"success": False,
"error": f"Product with ID {item['product_id']} not found"
}
total_amount += product['price'] * item['quantity']
# Insert new order into the erp_orders table for the customer
order_query = f"""
INSERT INTO erp_orders (
customer_id, order_date, total_amount, status,
shipping_address, shipping_country, destination_country, previous_order_id,
estimated_delivery, payment_status
) VALUES (
{param_placeholder}, CURRENT_DATE, {param_placeholder}, 'Processing',
{param_placeholder}, {param_placeholder}, {param_placeholder}, {param_placeholder},
{date_interval}, 'Pending'
) {returning_clause};
"""
cursor.execute(order_query, [
customer_id, total_amount, shipping_address, shipping_country, destination_country, previous_order_id
])
if db_type == "postgres":
new_order_id = cursor.fetchone()['order_id']
else: # sqlite
new_order_id = cursor.lastrowid
# Insert order items into erp_order_items table for the new order
for item in items:
# Get product price
cursor.execute(product_query, [item['product_id']])
product = cursor.fetchone()
unit_price = product['price']
subtotal = unit_price * item['quantity']
item_query = f"""
INSERT INTO erp_order_items (
order_id, product_id, quantity, unit_price, subtotal
) VALUES (
{param_placeholder}, {param_placeholder}, {param_placeholder}, {param_placeholder}, {param_placeholder}
);
"""
cursor.execute(item_query, [
new_order_id, item['product_id'], item['quantity'],
unit_price, subtotal
])
# Update product stock
update_stock_query = f"""
UPDATE erp_products
SET stock_quantity = stock_quantity - {param_placeholder}
WHERE product_id = {param_placeholder};
"""
cursor.execute(update_stock_query, [item['quantity'], item['product_id']])
# Create order history entry into the erp_order_history for the new order
history_query = f"""
INSERT INTO erp_order_history (
order_id, timestamp, status_change, notes, updated_by
) VALUES (
{param_placeholder}, CURRENT_TIMESTAMP, 'Order Created', 'New order placed', 'System'
);
"""
cursor.execute(history_query, [new_order_id])
# If this is a replacement order, add a note to the previous order
if previous_order_id:
prev_order_note_query = f"""
INSERT INTO erp_order_history (
order_id, timestamp, status_change, notes, updated_by
) VALUES (
{param_placeholder}, CURRENT_TIMESTAMP, 'Replaced', 'Order replaced by order #' {concat_op} {param_placeholder}, 'System'
);
"""
cursor.execute(prev_order_note_query, [previous_order_id, new_order_id])
# Generate invoice for the new order
invoice_query = f"""
INSERT INTO erp_invoices (
order_id, invoice_date, amount, payment_terms, due_date, is_paid, invoice_number
) VALUES (
{param_placeholder}, CURRENT_DATE, {param_placeholder}, 'Net 30', {due_date_interval}, 0, 'INV-' {concat_op} {param_placeholder}
) {returning_clause};
"""
cursor.execute(invoice_query, [new_order_id, total_amount, str(new_order_id)])
if db_type == "postgres":
invoice_id = cursor.fetchone()['invoice_id']
else: # sqlite
invoice_id = cursor.lastrowid
conn.commit()
# Get the complete new order details
cursor.execute(f"SELECT * FROM erp_orders WHERE order_id = {param_placeholder};", [new_order_id])
order = cursor.fetchone()
order_dict = order
# Format date for response
estimated_delivery = order_dict.get('estimated_delivery')
if estimated_delivery:
if isinstance(estimated_delivery, str):
estimated_delivery_str = estimated_delivery
else:
estimated_delivery_str = estimated_delivery.strftime("%Y-%m-%d") if hasattr(estimated_delivery, 'strftime') else str(estimated_delivery)
else:
estimated_delivery_str = None
# Return JSON formatted response
return {
"success": True,
"order": {
"order_id": new_order_id,
"invoice_id": invoice_id,
"total_amount": float(total_amount),
"status": order_dict['status'],
"estimated_delivery": estimated_delivery_str,
"customer_id": customer_id,
"shipping_address": shipping_address,
"shipping_country": shipping_country,
"destination_country": destination_country,
"previous_order_id": previous_order_id
}
}
except Exception as e:
if 'conn' in locals():
conn.rollback()
return {
"success": False,
"error": str(e)
}
finally:
if 'conn' in locals():
conn.close()
@mcp.tool()
async def cancel_order(order_id: int, reason: str) -> Dict[str, Any]:
"""
Cancel an existing order in the ERP system.
Args:
order_id (int): ID of the order to cancel
reason (str): Reason for cancellation
Returns:
dict: Result of the cancellation operation
"""
try:
conn, db_type = get_db_connection()
if db_type == "postgres":
cursor = conn.cursor(cursor_factory=RealDictCursor)
param_placeholder = "%s"
else: # sqlite
cursor = conn.cursor()
param_placeholder = "?"
# Check if order exists and can be cancelled
check_query = f"""
SELECT status, customer_id FROM erp_orders WHERE order_id = {param_placeholder};
"""
cursor.execute(check_query, [order_id])
order = cursor.fetchone()
if not order:
return {
"success": False,
"error": f"Order with ID {order_id} not found"
}
if order['status'] in ['Delivered', 'Cancelled']:
return {
"success": False,
"error": f"Cannot cancel order with status '{order['status']}'"
}
# Get order items to restore stock
items_query = f"""
SELECT product_id, quantity FROM erp_order_items WHERE order_id = {param_placeholder};
"""
cursor.execute(items_query, [order_id])
items = cursor.fetchall()
# Update order status to Cancelled
update_query = f"""
UPDATE erp_orders SET status = 'Cancelled', payment_status = 'Cancelled'
WHERE order_id = {param_placeholder};
"""
cursor.execute(update_query, [order_id])
# Add entry to order history
history_query = f"""
INSERT INTO erp_order_history (
order_id, timestamp, status_change, notes, updated_by
) VALUES (
{param_placeholder}, CURRENT_TIMESTAMP, 'Cancelled', {param_placeholder}, 'System'
);
"""
cursor.execute(history_query, [order_id, f"Order cancelled: {reason}"])
# Restore product stock quantities
for item in items:
restore_stock_query = f"""
UPDATE erp_products
SET stock_quantity = stock_quantity + {param_placeholder}
WHERE product_id = {param_placeholder};
"""
cursor.execute(restore_stock_query, [item['quantity'], item['product_id']])
# Update invoice if exists
invoice_query = f"""
UPDATE erp_invoices
SET is_paid = 0
WHERE order_id = {param_placeholder};
"""
cursor.execute(invoice_query, [order_id])
conn.commit()
return {
"success": True,
"message": f"Order #{order_id} has been successfully cancelled",
"reason": reason,
"items_restored": len(items)
}
except Exception as e:
if 'conn' in locals():
conn.rollback()
return {
"success": False,
"error": str(e)
}
finally:
if 'conn' in locals():
conn.close()
@mcp.tool()
async def get_invoice_details(invoice_id: Optional[int] = None, order_id: Optional[int] = None) -> Dict[str, Any]:
"""
Get invoice details by invoice ID or order ID.
Args:
invoice_id (int, optional): ID of the invoice
order_id (int, optional): ID of the order
Returns:
dict: Invoice details including customer and order information
"""
if not invoice_id and not order_id:
return {
"success": False,
"error": "Either invoice_id or order_id must be provided"
}
try:
conn, db_type = get_db_connection()
if db_type == "postgres":
cursor = conn.cursor(cursor_factory=RealDictCursor)
param_placeholder = "%s"
else: # sqlite
cursor = conn.cursor()
param_placeholder = "?"
if invoice_id:
query = f"""
SELECT i.*, o.order_date, o.status as order_status,
c.name as customer_name, c.email as customer_email, c.address as customer_address
FROM erp_invoices i
JOIN erp_orders o ON i.order_id = o.order_id
JOIN erp_customers c ON o.customer_id = c.customer_id
WHERE i.invoice_id = {param_placeholder};
"""
cursor.execute(query, [invoice_id])
else:
query = f"""
SELECT i.*, o.order_date, o.status as order_status,
c.name as customer_name, c.email as customer_email, c.address as customer_address
FROM erp_invoices i
JOIN erp_orders o ON i.order_id = o.order_id
JOIN erp_customers c ON o.customer_id = c.customer_id
WHERE i.order_id = {param_placeholder};
"""
cursor.execute(query, [order_id])
invoice = cursor.fetchone()
if not invoice:
return {
"success": False,
"error": f"Invoice not found for the provided {'invoice_id' if invoice_id else 'order_id'}"
}
# Get order items
items_query = f"""
SELECT oi.*, p.product_name, p.sku
FROM erp_order_items oi
JOIN erp_products p ON oi.product_id = p.product_id
WHERE oi.order_id = {param_placeholder};
"""
cursor.execute(items_query, [invoice['order_id']])
items = cursor.fetchall()
invoice_dict = invoice
items_list = items
# Format dates for response
order_date = invoice_dict.get('order_date')
if order_date:
if isinstance(order_date, str):
order_date_str = order_date
else:
order_date_str = order_date.strftime("%Y-%m-%d") if hasattr(order_date, 'strftime') else str(order_date)
else:
order_date_str = None
due_date = invoice_dict.get('due_date')
if due_date:
if isinstance(due_date, str):
due_date_str = due_date
else:
due_date_str = due_date.strftime("%Y-%m-%d") if hasattr(due_date, 'strftime') else str(due_date)
else:
due_date_str = None
# Return JSON formatted response
return {
"success": True,
"invoice": {
"invoice_id": invoice_dict['invoice_id'],
"invoice_number": invoice_dict.get('invoice_number', ''),
"order_id": invoice_dict['order_id'],
"order_date": order_date_str,
"order_status": invoice_dict['order_status'],
"amount": float(invoice_dict['amount']),
"due_date": due_date_str,
"payment_status": "Paid" if invoice_dict['is_paid'] else "Unpaid",
"customer": {
"name": invoice_dict['customer_name'],
"email": invoice_dict['customer_email'],
"address": invoice_dict['customer_address']
},
"items": items_list
}
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
finally:
if 'conn' in locals():
conn.close()
@mcp.tool()
async def reset_database() -> Dict[str, Any]:
"""
Reset the ERP database by deleting it and recreating it with fresh data.
Returns:
dict: Result of the database reset operation
"""
try:
# Get SQLite database path from environment or use default
db_path = os.getenv("SQLITE_DB_PATH", "./data/erp_db.sqlite")
# Reset the database
result = reset_erp_db(db_path)
if result:
return {
"success": True,
"message": "Database reset successfully. All tables have been recreated with fresh sample data."
}
else:
return {
"success": False,
"error": "Failed to reset database. Check server logs for details."
}
except Exception as e:
return {
"success": False,
"error": f"Error resetting database: {str(e)}"
}
# Create Gradio interfaces for each function
execute_query_interface = gr.Interface(
fn=execute_query,
inputs=[
gr.Textbox(lines=5, label="SQL Query"),
gr.Textbox(label="Parameters (comma-separated)", placeholder="Optional")
],
outputs=gr.Textbox(lines=10),
title="Execute SQL Query",
description="Execute a custom SQL query on the ERP database"
)
list_tables_interface = gr.Interface(
fn=list_erp_tables,
inputs=[],
outputs=gr.Textbox(lines=10),
title="List ERP Tables",
description="List all ERP tables in the database"
)
order_status_interface = gr.Interface(
fn=get_order_status,
inputs=gr.Number(label="Order ID", precision=0),
outputs=gr.Textbox(lines=15),
title="Get Order Status",
description="Get the status of an order by order ID"
)
place_order_interface = gr.Interface(
fn=place_new_order,
inputs=[
gr.Number(label="Customer ID", precision=0),
gr.Textbox(label="Product IDs (comma-separated)", placeholder="1, 2, 3"),
gr.Textbox(label="Quantities (comma-separated)", placeholder="2, 1, 3"),
gr.Textbox(label="Shipping Address"),
gr.Textbox(label="Shipping Country"),
gr.Textbox(label="Destination Country"),
gr.Number(label="Previous Order ID (optional)", precision=0)
],
outputs=gr.Textbox(lines=10),
title="Place New Order",
description="Place a new order in the ERP system"
)
cancel_order_interface = gr.Interface(
fn=cancel_order,
inputs=[
gr.Number(label="Order ID", precision=0),
gr.Textbox(label="Reason for Cancellation")
],
outputs=gr.Textbox(lines=5),
title="Cancel Order",
description="Cancel an existing order in the ERP system"
)
invoice_details_interface = gr.Interface(
fn=get_invoice_details,
inputs=[
gr.Number(label="Invoice ID (optional)", precision=0),
gr.Number(label="Order ID (optional)", precision=0)
],
outputs=gr.Textbox(lines=15),
title="Get Invoice Details",
description="Get invoice details by invoice ID or order ID"
)
reset_database_interface = gr.Interface(
fn=reset_database,
inputs=[],
outputs=gr.Textbox(lines=5),
title="Reset Database",
description="Reset the ERP database by deleting it and recreating it with fresh data"
)
# Create a Gradio TabItem for each interface
demo = gr.TabbedInterface(
[
execute_query_interface,
list_tables_interface,
order_status_interface,
place_order_interface,
cancel_order_interface,
invoice_details_interface,
reset_database_interface
],
[
"Execute Query",
"List Tables",
"Order Status",
"Place Order",
"Cancel Order",
"Invoice Details",
"Reset Database"
],
title="ERP System Tools"
)
# Launch the demo with MCP server enabled
demo.launch(mcp_server=True, ssr_mode=False, share=True)