# erp_server.py
"""ERP database tools exposed through a Gradio UI and an MCP server.

The tools work against either SQLite (default) or PostgreSQL, selected with
the ``ERP_DB_TYPE`` environment variable.
"""
import os
import pathlib
import sqlite3
import sys
from datetime import date, datetime
from typing import Any, Dict, List, Optional, TypedDict, Union

# Add the project root directory to the Python path so ``data.utils`` resolves
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))

import gradio as gr
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP

# PostgreSQL support is optional; SQLite is always available
try:
    import psycopg2
    from psycopg2.extras import RealDictCursor
    POSTGRES_AVAILABLE = True
except ImportError:
    POSTGRES_AVAILABLE = False

# Import SQLite initialization and reset functions
from data.utils.erp_db_init import init_sqlite_db
from data.utils.reset_erp_db import reset_erp_db

load_dotenv()

mcp = FastMCP("ERP Database")


# Helper function to convert SQLite row to dict
def dict_factory(cursor, row):
    """Row factory for sqlite3 that maps column names to row values."""
    return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}


# Helper function to handle date serialization for JSON
def serialize_dates(obj):
    """Return ISO-8601 text for date/datetime objects, anything else unchanged."""
    if isinstance(obj, (date, datetime)):
        return obj.isoformat()
    return obj


def _format_date(value):
    """Format a date-like value as ``YYYY-MM-DD`` text; falsy values become None.

    Strings are passed through untouched (SQLite returns dates as text).
    """
    if not value:
        return None
    if isinstance(value, str):
        return value
    if hasattr(value, 'strftime'):
        return value.strftime("%Y-%m-%d")
    return str(value)


def _cursor_and_placeholder(conn, db_type):
    """Return a dict-row cursor and the parameter placeholder for ``db_type``."""
    if db_type == "postgres":
        return conn.cursor(cursor_factory=RealDictCursor), "%s"
    # SQLite connections already carry ``dict_factory`` as their row factory
    return conn.cursor(), "?"


def get_db_connection():
    """Get database connection based on configuration.

    Returns:
        tuple: ``(connection, db_type)`` where ``db_type`` is ``"postgres"``
        or ``"sqlite"``.

    Raises:
        Exception: If the connection (or SQLite initialization) fails; the
        original error is chained as the cause.
    """
    # Default to SQLite unless explicitly set to use PostgreSQL.
    # NOTE: if "postgres" is requested but psycopg2 is not installed, this
    # falls back to SQLite rather than failing.
    db_type = os.getenv("ERP_DB_TYPE", "sqlite").lower()

    if db_type == "postgres" and POSTGRES_AVAILABLE:
        try:
            conn = psycopg2.connect(
                host=os.getenv("POSTGRES_HOST", "localhost"),
                database=os.getenv("POSTGRES_DB", "erp_db"),
                user=os.getenv("POSTGRES_USER", "postgres"),
                password=os.getenv("POSTGRES_PASSWORD", ""),
                port=os.getenv("POSTGRES_PORT", "5432")
            )
            return conn, "postgres"
        except Exception as e:
            raise Exception(f"PostgreSQL connection failed: {str(e)}") from e

    try:
        # Get SQLite database path from environment or use default
        db_path = os.getenv("SQLITE_DB_PATH", "./data/erp_db.sqlite")

        # Ensure database is initialized
        db_dir = pathlib.Path(os.path.dirname(db_path))
        if not db_dir.exists() or not pathlib.Path(db_path).exists():
            init_sqlite_db(db_path)

        conn = sqlite3.connect(db_path)
        conn.row_factory = dict_factory
        # Enable foreign keys (off by default in SQLite)
        conn.execute("PRAGMA foreign_keys = ON")
        return conn, "sqlite"
    except Exception as e:
        raise Exception(f"SQLite connection failed: {str(e)}") from e


@mcp.tool()
async def execute_query(
    query: str, params: Optional[List] = None
) -> Union[Dict[str, Any], List[Dict[str, Any]], str]:
    """
    Execute a custom SQL query on the ERP database.
    Only SELECT statements are allowed for security reasons.

    Args:
        query (str): SQL query to execute (must be a SELECT statement)
        params (List, optional): Parameters for parameterized queries. A
            comma-separated string (as sent by the web form) is also accepted;
            its values are bound as text.

    Returns:
        list | str | dict: List of row dicts, the string "No results found"
        when the query matches nothing, or an error dict
    """
    conn = None
    try:
        # Check if it's a SELECT query
        if not query.strip().upper().startswith('SELECT'):
            return {
                "success": False,
                "error": "Only SELECT statements are allowed for security reasons."
            }

        # The Gradio form delivers parameters as one comma-separated string;
        # passing that straight to the driver would bind it character by
        # character, so split it into a proper list first.
        if isinstance(params, str):
            params = [part.strip() for part in params.split(",") if part.strip()]

        conn, db_type = get_db_connection()

        # Defense in depth: the prefix check above does not stop a statement
        # that merely starts with SELECT from writing, so make the session
        # itself read-only.
        if db_type == "postgres":
            conn.set_session(readonly=True)
        else:
            conn.execute("PRAGMA query_only = ON")

        cursor, _ = _cursor_and_placeholder(conn, db_type)

        if params:
            cursor.execute(query, params)
        else:
            cursor.execute(query)

        results = cursor.fetchall()

        # Convert results to regular dict for JSON serialization
        if db_type == "postgres":
            results = [dict(row) for row in results]

        if not results:
            return "No results found"

        return results
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }
    finally:
        if conn is not None:
            conn.close()


@mcp.tool()
async def list_erp_tables() -> Dict[str, Any]:
    """
    List all ERP tables in the database.

    Returns:
        dict: List of ERP table names
    """
    conn = None
    try:
        conn, db_type = get_db_connection()

        # '_' is a single-character wildcard in LIKE, so it is escaped to
        # match only names that literally start with "erp_".
        if db_type == "postgres":
            query = """
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'public'
                AND table_name LIKE 'erp!_%' ESCAPE '!'
                ORDER BY table_name;
            """
        else:  # sqlite
            query = """
                SELECT name as table_name
                FROM sqlite_master
                WHERE type='table'
                AND name LIKE 'erp!_%' ESCAPE '!'
                ORDER BY name;
            """

        cursor, _ = _cursor_and_placeholder(conn, db_type)
        cursor.execute(query)
        table_names = [row['table_name'] for row in cursor.fetchall()]

        if not table_names:
            return {
                "success": False,
                "message": "No ERP tables found",
                "tables": []
            }

        return {
            "success": True,
            "message": f"Found {len(table_names)} tables",
            "count": len(table_names),
            "tables": table_names
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }
    finally:
        if conn is not None:
            conn.close()


@mcp.tool()
async def get_order_status(order_id: int) -> Dict[str, Any]:
    """
    Get the status of an order by order ID.

    Args:
        order_id (int): The ID of the order to check

    Returns:
        dict: Order status information including shipping and destination countries
    """
    conn = None
    try:
        conn, db_type = get_db_connection()
        cursor, param_placeholder = _cursor_and_placeholder(conn, db_type)

        query = f"""
            SELECT o.*, c.name as customer_name, c.email as customer_email,
                   o.shipping_country, o.destination_country
            FROM erp_orders o
            JOIN erp_customers c ON o.customer_id = c.customer_id
            WHERE o.order_id = {param_placeholder};
        """
        cursor.execute(query, [order_id])
        order = cursor.fetchone()

        if not order:
            return {
                "success": False,
                "message": f"Order with ID {order_id} not found"
            }

        # Get order items
        items_query = f"""
            SELECT oi.order_item_id, oi.order_id, oi.product_id, oi.quantity,
                   oi.unit_price, oi.subtotal, p.product_name, p.sku
            FROM erp_order_items oi
            JOIN erp_products p ON oi.product_id = p.product_id
            WHERE oi.order_id = {param_placeholder};
        """
        cursor.execute(items_query, [order_id])
        items = cursor.fetchall()

        # Get order history
        history_query = f"""
            SELECT * FROM erp_order_history
            WHERE order_id = {param_placeholder}
            ORDER BY timestamp DESC;
        """
        cursor.execute(history_query, [order_id])
        history = cursor.fetchall()

        # psycopg2 returns RealDictRow objects; convert them to plain dicts
        if db_type == "sqlite":
            order_dict, items_list, history_list = order, items, history
        else:
            order_dict = dict(order)
            items_list = [dict(item) for item in items]
            history_list = [dict(entry) for entry in history]

        # Return JSON formatted response
        return {
            "success": True,
            "order": {
                "order_id": order_id,
                "customer": {
                    "customer_id": order_dict['customer_id'],
                    "name": order_dict['customer_name'],
                    "email": order_dict['customer_email']
                },
                "status": order_dict['status'],
                "shipping_address": order_dict.get('shipping_address', 'N/A'),
                "shipping_country": order_dict.get('shipping_country', 'N/A'),
                "destination_country": order_dict.get('destination_country', 'N/A'),
                "items": items_list,
                "history": history_list
            }
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }
    finally:
        if conn is not None:
            conn.close()


@mcp.tool()
async def place_new_order(
    customer_id: int,
    items: List[Dict[str, Any]],
    shipping_address: str,
    shipping_country: str,
    destination_country: str,
    previous_order_id: Optional[int] = None
) -> Dict[str, Any]:
    """
    Place a new order in the ERP system, after ensuring there are no global disruptions affecting the shipment.

    Args:
        customer_id (int): ID of the customer placing the order
        items (List[Dict]): List of items to order, each with product_id and quantity
        shipping_address (str): Shipping address for the order
        shipping_country (str): Country where the order will be shipped from
        destination_country (str): Country where the order will be delivered to
        previous_order_id (int, optional): ID of a previous order this is replacing

    Returns:
        dict: New order information, or an error dict if validation or any
        database step fails (all writes are rolled back on failure)
    """
    # Validate the items up front so that nothing is written for a bad request
    if not items:
        return {
            "success": False,
            "error": "Order must contain at least one item"
        }
    for item in items:
        if not isinstance(item, dict) or 'product_id' not in item or 'quantity' not in item:
            return {
                "success": False,
                "error": "Each item must include product_id and quantity"
            }
        quantity = item['quantity']
        if isinstance(quantity, bool) or not isinstance(quantity, (int, float)) or quantity <= 0:
            return {
                "success": False,
                "error": f"Invalid quantity for product {item['product_id']}: {quantity!r}"
            }

    conn = None
    try:
        conn, db_type = get_db_connection()
        cursor, param_placeholder = _cursor_and_placeholder(conn, db_type)

        if db_type == "postgres":
            # PostgreSQL has no lastrowid; new ids come back through RETURNING
            order_returning = "RETURNING order_id"
            invoice_returning = "RETURNING invoice_id"
            date_interval = "CURRENT_DATE + INTERVAL '7 days'"
            due_date_interval = "CURRENT_DATE + INTERVAL '30 days'"
        else:  # sqlite
            order_returning = ""
            invoice_returning = ""
            date_interval = "date('now', '+7 days')"
            due_date_interval = "date('now', '+30 days')"

        # Look up each product's price once and calculate the total amount
        product_query = f"SELECT price FROM erp_products WHERE product_id = {param_placeholder};"
        prices = {}
        total_amount = 0
        for item in items:
            product_id = item['product_id']
            if product_id not in prices:
                cursor.execute(product_query, [product_id])
                product = cursor.fetchone()
                if not product:
                    return {
                        "success": False,
                        "error": f"Product with ID {product_id} not found"
                    }
                prices[product_id] = product['price']
            total_amount += prices[product_id] * item['quantity']

        # Insert new order into the erp_orders table for the customer
        order_query = f"""
            INSERT INTO erp_orders (
                customer_id, order_date, total_amount, status,
                shipping_address, shipping_country, destination_country,
                previous_order_id, estimated_delivery, payment_status
            ) VALUES (
                {param_placeholder}, CURRENT_DATE, {param_placeholder}, 'Processing',
                {param_placeholder}, {param_placeholder}, {param_placeholder},
                {param_placeholder}, {date_interval}, 'Pending'
            ) {order_returning};
        """
        cursor.execute(order_query, [
            customer_id, total_amount, shipping_address,
            shipping_country, destination_country, previous_order_id
        ])

        if db_type == "postgres":
            new_order_id = cursor.fetchone()['order_id']
        else:  # sqlite
            new_order_id = cursor.lastrowid

        # Insert order items into erp_order_items and decrement product stock
        item_query = f"""
            INSERT INTO erp_order_items (
                order_id, product_id, quantity, unit_price, subtotal
            ) VALUES (
                {param_placeholder}, {param_placeholder}, {param_placeholder},
                {param_placeholder}, {param_placeholder}
            );
        """
        update_stock_query = f"""
            UPDATE erp_products
            SET stock_quantity = stock_quantity - {param_placeholder}
            WHERE product_id = {param_placeholder};
        """
        for item in items:
            unit_price = prices[item['product_id']]
            subtotal = unit_price * item['quantity']
            cursor.execute(item_query, [
                new_order_id, item['product_id'], item['quantity'], unit_price, subtotal
            ])
            cursor.execute(update_stock_query, [item['quantity'], item['product_id']])

        # Create order history entry into the erp_order_history for the new order
        history_query = f"""
            INSERT INTO erp_order_history (
                order_id, timestamp, status_change, notes, updated_by
            ) VALUES (
                {param_placeholder}, CURRENT_TIMESTAMP, 'Order Created', {param_placeholder}, 'System'
            );
        """
        cursor.execute(history_query, [new_order_id, 'New order placed'])

        # If this is a replacement order, add a note to the previous order
        if previous_order_id:
            prev_order_note_query = f"""
                INSERT INTO erp_order_history (
                    order_id, timestamp, status_change, notes, updated_by
                ) VALUES (
                    {param_placeholder}, CURRENT_TIMESTAMP, 'Replaced', {param_placeholder}, 'System'
                );
            """
            cursor.execute(
                prev_order_note_query,
                [previous_order_id, f"Order replaced by order #{new_order_id}"]
            )

        # Generate invoice for the new order
        invoice_query = f"""
            INSERT INTO erp_invoices (
                order_id, invoice_date, amount, payment_terms, due_date, is_paid, invoice_number
            ) VALUES (
                {param_placeholder}, CURRENT_DATE, {param_placeholder}, 'Net 30',
                {due_date_interval}, 0, {param_placeholder}
            ) {invoice_returning};
        """
        cursor.execute(invoice_query, [new_order_id, total_amount, f"INV-{new_order_id}"])

        if db_type == "postgres":
            invoice_id = cursor.fetchone()['invoice_id']
        else:  # sqlite
            invoice_id = cursor.lastrowid

        conn.commit()

        # Get the complete new order details
        cursor.execute(
            f"SELECT * FROM erp_orders WHERE order_id = {param_placeholder};",
            [new_order_id]
        )
        order_dict = cursor.fetchone()

        # Return JSON formatted response
        return {
            "success": True,
            "order": {
                "order_id": new_order_id,
                "invoice_id": invoice_id,
                "total_amount": float(total_amount),
                "status": order_dict['status'],
                "estimated_delivery": _format_date(order_dict.get('estimated_delivery')),
                "customer_id": customer_id,
                "shipping_address": shipping_address,
                "shipping_country": shipping_country,
                "destination_country": destination_country,
                "previous_order_id": previous_order_id
            }
        }
    except Exception as e:
        if conn is not None:
            conn.rollback()
        return {
            "success": False,
            "error": str(e)
        }
    finally:
        if conn is not None:
            conn.close()


async def place_new_order_from_form(
    customer_id: int,
    product_ids: str,
    quantities: str,
    shipping_address: str,
    shipping_country: str,
    destination_country: str,
    previous_order_id: Optional[int] = None
) -> Dict[str, Any]:
    """
    Place a new order from the web form's comma-separated product and quantity lists.

    Args:
        customer_id (int): ID of the customer placing the order
        product_ids (str): Comma-separated product IDs, e.g. "1, 2, 3"
        quantities (str): Comma-separated quantities, one per product ID
        shipping_address (str): Shipping address for the order
        shipping_country (str): Country where the order will be shipped from
        destination_country (str): Country where the order will be delivered to
        previous_order_id (int, optional): ID of a previous order this is replacing

    Returns:
        dict: New order information, or an error dict
    """
    try:
        parsed_ids = [int(part) for part in (product_ids or "").split(",") if part.strip()]
        parsed_quantities = [int(part) for part in (quantities or "").split(",") if part.strip()]
    except ValueError:
        return {
            "success": False,
            "error": "Product IDs and quantities must be comma-separated integers"
        }

    if len(parsed_ids) != len(parsed_quantities):
        return {
            "success": False,
            "error": "The number of product IDs must match the number of quantities"
        }

    items = [
        {"product_id": product_id, "quantity": quantity}
        for product_id, quantity in zip(parsed_ids, parsed_quantities)
    ]
    return await place_new_order(
        int(customer_id),
        items,
        shipping_address,
        shipping_country,
        destination_country,
        # An empty or zero number field means "not a replacement order"
        int(previous_order_id) if previous_order_id else None
    )


@mcp.tool()
async def cancel_order(order_id: int, reason: str) -> Dict[str, Any]:
    """
    Cancel an existing order in the ERP system.

    Args:
        order_id (int): ID of the order to cancel
        reason (str): Reason for cancellation

    Returns:
        dict: Result of the cancellation operation
    """
    conn = None
    try:
        conn, db_type = get_db_connection()
        cursor, param_placeholder = _cursor_and_placeholder(conn, db_type)

        # Check if order exists and can be cancelled
        check_query = f"""
            SELECT status, customer_id FROM erp_orders
            WHERE order_id = {param_placeholder};
        """
        cursor.execute(check_query, [order_id])
        order = cursor.fetchone()

        if not order:
            return {
                "success": False,
                "error": f"Order with ID {order_id} not found"
            }

        if order['status'] in ['Delivered', 'Cancelled']:
            return {
                "success": False,
                "error": f"Cannot cancel order with status '{order['status']}'"
            }

        # Get order items to restore stock
        items_query = f"""
            SELECT product_id, quantity FROM erp_order_items
            WHERE order_id = {param_placeholder};
        """
        cursor.execute(items_query, [order_id])
        items = cursor.fetchall()

        # Update order status to Cancelled
        update_query = f"""
            UPDATE erp_orders
            SET status = 'Cancelled', payment_status = 'Cancelled'
            WHERE order_id = {param_placeholder};
        """
        cursor.execute(update_query, [order_id])

        # Add entry to order history
        history_query = f"""
            INSERT INTO erp_order_history (
                order_id, timestamp, status_change, notes, updated_by
            ) VALUES (
                {param_placeholder}, CURRENT_TIMESTAMP, 'Cancelled', {param_placeholder}, 'System'
            );
        """
        cursor.execute(history_query, [order_id, f"Order cancelled: {reason}"])

        # Restore product stock quantities
        restore_stock_query = f"""
            UPDATE erp_products
            SET stock_quantity = stock_quantity + {param_placeholder}
            WHERE product_id = {param_placeholder};
        """
        for item in items:
            cursor.execute(restore_stock_query, [item['quantity'], item['product_id']])

        # Update invoice if exists
        invoice_query = f"""
            UPDATE erp_invoices
            SET is_paid = 0
            WHERE order_id = {param_placeholder};
        """
        cursor.execute(invoice_query, [order_id])

        conn.commit()

        return {
            "success": True,
            "message": f"Order #{order_id} has been successfully cancelled",
            "reason": reason,
            "items_restored": len(items)
        }
    except Exception as e:
        if conn is not None:
            conn.rollback()
        return {
            "success": False,
            "error": str(e)
        }
    finally:
        if conn is not None:
            conn.close()


@mcp.tool()
async def get_invoice_details(invoice_id: Optional[int] = None, order_id: Optional[int] = None) -> Dict[str, Any]:
    """
    Get invoice details by invoice ID or order ID.

    Args:
        invoice_id (int, optional): ID of the invoice
        order_id (int, optional): ID of the order

    Returns:
        dict: Invoice details including customer and order information
    """
    if not invoice_id and not order_id:
        return {
            "success": False,
            "error": "Either invoice_id or order_id must be provided"
        }

    conn = None
    try:
        conn, db_type = get_db_connection()
        cursor, param_placeholder = _cursor_and_placeholder(conn, db_type)

        # invoice_id takes precedence when both identifiers are supplied.
        # The column name comes from this fixed pair, never from user input.
        if invoice_id:
            lookup_column, lookup_value = "i.invoice_id", invoice_id
        else:
            lookup_column, lookup_value = "i.order_id", order_id

        query = f"""
            SELECT i.*, o.order_date, o.status as order_status,
                   c.name as customer_name, c.email as customer_email, c.address as customer_address
            FROM erp_invoices i
            JOIN erp_orders o ON i.order_id = o.order_id
            JOIN erp_customers c ON o.customer_id = c.customer_id
            WHERE {lookup_column} = {param_placeholder};
        """
        cursor.execute(query, [lookup_value])
        invoice = cursor.fetchone()

        if not invoice:
            return {
                "success": False,
                "error": f"Invoice not found for the provided {'invoice_id' if invoice_id else 'order_id'}"
            }

        # Get order items
        items_query = f"""
            SELECT oi.*, p.product_name, p.sku
            FROM erp_order_items oi
            JOIN erp_products p ON oi.product_id = p.product_id
            WHERE oi.order_id = {param_placeholder};
        """
        cursor.execute(items_query, [invoice['order_id']])
        items = cursor.fetchall()

        # Return JSON formatted response
        return {
            "success": True,
            "invoice": {
                "invoice_id": invoice['invoice_id'],
                "invoice_number": invoice.get('invoice_number', ''),
                "order_id": invoice['order_id'],
                "order_date": _format_date(invoice.get('order_date')),
                "order_status": invoice['order_status'],
                "amount": float(invoice['amount']),
                "due_date": _format_date(invoice.get('due_date')),
                "payment_status": "Paid" if invoice['is_paid'] else "Unpaid",
                "customer": {
                    "name": invoice['customer_name'],
                    "email": invoice['customer_email'],
                    "address": invoice['customer_address']
                },
                "items": items
            }
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }
    finally:
        if conn is not None:
            conn.close()


@mcp.tool()
async def reset_database() -> Dict[str, Any]:
    """
    Reset the ERP database by deleting it and recreating it with fresh data.

    Returns:
        dict: Result of the database reset operation
    """
    # NOTE(review): this always targets the SQLite file, even when
    # ERP_DB_TYPE is "postgres" -- confirm that is intended.
    try:
        # Get SQLite database path from environment or use default
        db_path = os.getenv("SQLITE_DB_PATH", "./data/erp_db.sqlite")

        # Reset the database
        if reset_erp_db(db_path):
            return {
                "success": True,
                "message": "Database reset successfully. All tables have been recreated with fresh sample data."
            }
        return {
            "success": False,
            "error": "Failed to reset database. Check server logs for details."
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Error resetting database: {str(e)}"
        }


# Create Gradio interfaces for each function
execute_query_interface = gr.Interface(
    fn=execute_query,
    inputs=[
        gr.Textbox(lines=5, label="SQL Query"),
        gr.Textbox(label="Parameters (comma-separated)", placeholder="Optional")
    ],
    outputs=gr.Textbox(lines=10),
    title="Execute SQL Query",
    description="Execute a custom SQL query on the ERP database"
)

list_tables_interface = gr.Interface(
    fn=list_erp_tables,
    inputs=[],
    outputs=gr.Textbox(lines=10),
    title="List ERP Tables",
    description="List all ERP tables in the database"
)

order_status_interface = gr.Interface(
    fn=get_order_status,
    inputs=gr.Number(label="Order ID", precision=0),
    outputs=gr.Textbox(lines=15),
    title="Get Order Status",
    description="Get the status of an order by order ID"
)

# The form collects product IDs and quantities as two text fields, so it goes
# through the adapter that assembles them into the ``items`` list.
place_order_interface = gr.Interface(
    fn=place_new_order_from_form,
    inputs=[
        gr.Number(label="Customer ID", precision=0),
        gr.Textbox(label="Product IDs (comma-separated)", placeholder="1, 2, 3"),
        gr.Textbox(label="Quantities (comma-separated)", placeholder="2, 1, 3"),
        gr.Textbox(label="Shipping Address"),
        gr.Textbox(label="Shipping Country"),
        gr.Textbox(label="Destination Country"),
        gr.Number(label="Previous Order ID (optional)", precision=0)
    ],
    outputs=gr.Textbox(lines=10),
    title="Place New Order",
    description="Place a new order in the ERP system"
)

cancel_order_interface = gr.Interface(
    fn=cancel_order,
    inputs=[
        gr.Number(label="Order ID", precision=0),
        gr.Textbox(label="Reason for Cancellation")
    ],
    outputs=gr.Textbox(lines=5),
    title="Cancel Order",
    description="Cancel an existing order in the ERP system"
)

invoice_details_interface = gr.Interface(
    fn=get_invoice_details,
    inputs=[
        gr.Number(label="Invoice ID (optional)", precision=0),
        gr.Number(label="Order ID (optional)", precision=0)
    ],
    outputs=gr.Textbox(lines=15),
    title="Get Invoice Details",
    description="Get invoice details by invoice ID or order ID"
)

reset_database_interface = gr.Interface(
    fn=reset_database,
    inputs=[],
    outputs=gr.Textbox(lines=5),
    title="Reset Database",
    description="Reset the ERP database by deleting it and recreating it with fresh data"
)

# Create a Gradio TabItem for each interface
demo = gr.TabbedInterface(
    [
        execute_query_interface,
        list_tables_interface,
        order_status_interface,
        place_order_interface,
        cancel_order_interface,
        invoice_details_interface,
        reset_database_interface
    ],
    [
        "Execute Query",
        "List Tables",
        "Order Status",
        "Place Order",
        "Cancel Order",
        "Invoice Details",
        "Reset Database"
    ],
    title="ERP System Tools"
)

# Launch the demo with MCP server enabled
# NOTE(review): share=True publishes these tools (including order placement,
# cancellation and database reset) on a public link -- confirm this is intended.
demo.launch(mcp_server=True, ssr_mode=False, share=True)