| | |
| | import os |
| | import sys |
| | from typing import TypedDict, List, Union |
| |
|
| | |
| | sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))) |
| |
|
| | import gradio as gr |
| | from typing import Dict, Any, List, Optional |
| | from dotenv import load_dotenv |
| | from datetime import datetime, date |
| | import pathlib |
| |
|
| | |
| | import sqlite3 |
| | try: |
| | import psycopg2 |
| | from psycopg2.extras import RealDictCursor |
| | POSTGRES_AVAILABLE = True |
| | except ImportError: |
| | POSTGRES_AVAILABLE = False |
| |
|
| | |
| | from data.utils.erp_db_init import init_sqlite_db |
| | from data.utils.reset_erp_db import reset_erp_db |
| |
|
| | from mcp.server.fastmcp import FastMCP |
| |
|
| | load_dotenv() |
| |
|
| | mcp = FastMCP("ERP Database") |
| |
|
| | |
| | def dict_factory(cursor, row): |
| | d = {} |
| | for idx, col in enumerate(cursor.description): |
| | d[col[0]] = row[idx] |
| | return d |
| |
|
| | |
| | def serialize_dates(obj): |
| | if isinstance(obj, (date, datetime)): |
| | return obj.isoformat() |
| | return obj |
| |
|
| | def get_db_connection(): |
| | """Get database connection based on configuration.""" |
| | |
| | db_type = os.getenv("ERP_DB_TYPE", "sqlite").lower() |
| | |
| | if db_type == "postgres" and POSTGRES_AVAILABLE: |
| | try: |
| | conn = psycopg2.connect( |
| | host=os.getenv("POSTGRES_HOST", "localhost"), |
| | database=os.getenv("POSTGRES_DB", "erp_db"), |
| | user=os.getenv("POSTGRES_USER", "postgres"), |
| | password=os.getenv("POSTGRES_PASSWORD", ""), |
| | port=os.getenv("POSTGRES_PORT", "5432") |
| | ) |
| | return conn, "postgres" |
| | except Exception as e: |
| | raise Exception(f"PostgreSQL connection failed: {str(e)}") |
| | else: |
| | try: |
| | |
| | db_path = os.getenv("SQLITE_DB_PATH", "./data/erp_db.sqlite") |
| | |
| | |
| | db_dir = pathlib.Path(os.path.dirname(db_path)) |
| | if not db_dir.exists() or not pathlib.Path(db_path).exists(): |
| | init_sqlite_db(db_path) |
| | |
| | conn = sqlite3.connect(db_path) |
| | conn.row_factory = dict_factory |
| | |
| | |
| | conn.execute("PRAGMA foreign_keys = ON") |
| | |
| | return conn, "sqlite" |
| | except Exception as e: |
| | raise Exception(f"SQLite connection failed: {str(e)}") |
| |
|
| | @mcp.tool() |
| | async def execute_query(query: str, params: Optional[List] = None) -> Dict[str, Any]: |
| | """ |
| | Execute a custom SQL query on the ERP database. |
| | Only SELECT statements are allowed for security reasons. |
| | |
| | Args: |
| | query (str): SQL query to execute (must be a SELECT statement) |
| | params (List, optional): Parameters for parameterized queries |
| | |
| | Returns: |
| | dict: Query results or error message |
| | """ |
| | try: |
| | |
| | if not query.strip().upper().startswith('SELECT'): |
| | return { |
| | "success": False, |
| | "error": "Only SELECT statements are allowed for security reasons." |
| | } |
| | |
| | conn, db_type = get_db_connection() |
| | |
| | if db_type == "postgres": |
| | cursor = conn.cursor(cursor_factory=RealDictCursor) |
| | else: |
| | cursor = conn.cursor() |
| | |
| | if params: |
| | cursor.execute(query, params) |
| | else: |
| | cursor.execute(query) |
| | |
| | results = cursor.fetchall() |
| | |
| | if db_type == "postgres": |
| | results = [dict(row) for row in results] |
| | if not results: |
| | return "No results found" |
| | return results |
| | |
| | except Exception as e: |
| | return { |
| | "success": False, |
| | "error": str(e) |
| | } |
| | finally: |
| | if 'conn' in locals(): |
| | conn.close() |
| |
|
| | @mcp.tool() |
| | async def list_erp_tables() -> Dict[str, Any]: |
| | """ |
| | List all ERP tables in the database. |
| | |
| | Returns: |
| | dict: List of ERP table names |
| | """ |
| | try: |
| | conn, db_type = get_db_connection() |
| | |
| | if db_type == "postgres": |
| | query = """ |
| | SELECT table_name |
| | FROM information_schema.tables |
| | WHERE table_schema = 'public' |
| | AND table_name LIKE 'erp_%' |
| | ORDER BY table_name; |
| | """ |
| | cursor = conn.cursor(cursor_factory=RealDictCursor) |
| | cursor.execute(query) |
| | results = cursor.fetchall() |
| | table_names = [row['table_name'] for row in results] |
| | else: |
| | query = """ |
| | SELECT name as table_name |
| | FROM sqlite_master |
| | WHERE type='table' AND name LIKE 'erp_%' |
| | ORDER BY name; |
| | """ |
| | cursor = conn.cursor() |
| | cursor.execute(query) |
| | results = cursor.fetchall() |
| | table_names = [row['table_name'] for row in results] |
| | |
| | if not table_names: |
| | return { |
| | "success": False, |
| | "message": "No ERP tables found", |
| | "tables": [] |
| | } |
| | return { |
| | "success": True, |
| | "message": f"Found {len(table_names)} tables", |
| | "count": len(table_names), |
| | "tables": table_names |
| | } |
| | |
| | except Exception as e: |
| | return { |
| | "success": False, |
| | "error": str(e) |
| | } |
| | finally: |
| | if 'conn' in locals(): |
| | conn.close() |
| |
|
| | @mcp.tool() |
| | async def get_order_status(order_id: int) -> Dict[str, Any]: |
| | """ |
| | Get the status of an order by order ID. |
| | |
| | Args: |
| | order_id (int): The ID of the order to check |
| | |
| | Returns: |
| | dict: Order status information including shipping and destination countries |
| | """ |
| | try: |
| | conn, db_type = get_db_connection() |
| | |
| | if db_type == "postgres": |
| | cursor = conn.cursor(cursor_factory=RealDictCursor) |
| | param_placeholder = "%s" |
| | else: |
| | cursor = conn.cursor() |
| | param_placeholder = "?" |
| | |
| | query = f""" |
| | SELECT o.*, c.name as customer_name, c.email as customer_email, |
| | o.shipping_country, o.destination_country |
| | FROM erp_orders o |
| | JOIN erp_customers c ON o.customer_id = c.customer_id |
| | WHERE o.order_id = {param_placeholder}; |
| | """ |
| | |
| | cursor.execute(query, [order_id]) |
| | order = cursor.fetchone() |
| | |
| | if not order: |
| | return { |
| | "success": False, |
| | "message": f"Order with ID {order_id} not found" |
| | } |
| | |
| | |
| | items_query = f""" |
| | SELECT oi.order_item_id, oi.order_id, oi.product_id, oi.quantity, oi.unit_price, oi.subtotal, |
| | p.product_name, p.sku |
| | FROM erp_order_items oi |
| | JOIN erp_products p ON oi.product_id = p.product_id |
| | WHERE oi.order_id = {param_placeholder}; |
| | """ |
| | cursor.execute(items_query, [order_id]) |
| | items = cursor.fetchall() |
| | |
| | |
| | history_query = f""" |
| | SELECT * FROM erp_order_history |
| | WHERE order_id = {param_placeholder} |
| | ORDER BY timestamp DESC; |
| | """ |
| | cursor.execute(history_query, [order_id]) |
| | history = cursor.fetchall() |
| | |
| | order_dict = order if db_type == "sqlite" else dict(order) |
| | items_list = items if db_type == "sqlite" else [dict(item) for item in items] |
| | history_list = history if db_type == "sqlite" else [dict(entry) for entry in history] |
| | |
| | |
| | return { |
| | "success": True, |
| | "order": { |
| | "order_id": order_id, |
| | "customer": { |
| | "customer_id": order_dict['customer_id'], |
| | "name": order_dict['customer_name'], |
| | "email": order_dict['customer_email'] |
| | }, |
| | "status": order_dict['status'], |
| | "shipping_address": order_dict.get('shipping_address', 'N/A'), |
| | "shipping_country": order_dict.get('shipping_country', 'N/A'), |
| | "destination_country": order_dict.get('destination_country', 'N/A'), |
| | "items": items_list, |
| | "history": history_list |
| | } |
| | } |
| | |
| | except Exception as e: |
| | return { |
| | "success": False, |
| | "error": str(e) |
| | } |
| | finally: |
| | if 'conn' in locals(): |
| | conn.close() |
| |
|
| | @mcp.tool() |
| | async def place_new_order( |
| | customer_id: int, |
| | items: List[Dict[str, Any]], |
| | shipping_address: str, |
| | shipping_country: str, |
| | destination_country: str, |
| | previous_order_id: Optional[int] = None |
| | ) -> Dict[str, Any]: |
| | """ |
| | Place a new order in the ERP system, after ensuring there are no global disruptions affecting the shipment. |
| | |
| | Args: |
| | customer_id (int): ID of the customer placing the order |
| | items (List[Dict]): List of items to order, each with product_id and quantity |
| | shipping_address (str): Shipping address for the order |
| | shipping_country (str): Country where the order will be shipped from |
| | destination_country (str): Country where the order will be delivered to |
| | previous_order_id (int, optional): ID of a previous order this is replacing |
| | |
| | Returns: |
| | dict: New order information |
| | """ |
| | try: |
| | conn, db_type = get_db_connection() |
| | |
| | if db_type == "postgres": |
| | cursor = conn.cursor(cursor_factory=RealDictCursor) |
| | param_placeholder = "%s" |
| | returning_clause = "RETURNING order_id" |
| | date_interval = "CURRENT_DATE + INTERVAL '7 days'" |
| | due_date_interval = "CURRENT_DATE + INTERVAL '30 days'" |
| | concat_op = "||" |
| | else: |
| | cursor = conn.cursor() |
| | param_placeholder = "?" |
| | returning_clause = "" |
| | date_interval = "date('now', '+7 days')" |
| | due_date_interval = "date('now', '+30 days')" |
| | concat_op = "||" |
| | |
| | |
| | total_amount = 0 |
| | for item in items: |
| | product_query = f"SELECT price FROM erp_products WHERE product_id = {param_placeholder};" |
| | cursor.execute(product_query, [item['product_id']]) |
| | product = cursor.fetchone() |
| | if not product: |
| | return { |
| | "success": False, |
| | "error": f"Product with ID {item['product_id']} not found" |
| | } |
| | total_amount += product['price'] * item['quantity'] |
| | |
| | |
| | order_query = f""" |
| | INSERT INTO erp_orders ( |
| | customer_id, order_date, total_amount, status, |
| | shipping_address, shipping_country, destination_country, previous_order_id, |
| | estimated_delivery, payment_status |
| | ) VALUES ( |
| | {param_placeholder}, CURRENT_DATE, {param_placeholder}, 'Processing', |
| | {param_placeholder}, {param_placeholder}, {param_placeholder}, {param_placeholder}, |
| | {date_interval}, 'Pending' |
| | ) {returning_clause}; |
| | """ |
| | |
| | cursor.execute(order_query, [ |
| | customer_id, total_amount, shipping_address, shipping_country, destination_country, previous_order_id |
| | ]) |
| | |
| | if db_type == "postgres": |
| | new_order_id = cursor.fetchone()['order_id'] |
| | else: |
| | new_order_id = cursor.lastrowid |
| | |
| | |
| | for item in items: |
| | |
| | cursor.execute(product_query, [item['product_id']]) |
| | product = cursor.fetchone() |
| | unit_price = product['price'] |
| | subtotal = unit_price * item['quantity'] |
| | |
| | item_query = f""" |
| | INSERT INTO erp_order_items ( |
| | order_id, product_id, quantity, unit_price, subtotal |
| | ) VALUES ( |
| | {param_placeholder}, {param_placeholder}, {param_placeholder}, {param_placeholder}, {param_placeholder} |
| | ); |
| | """ |
| | cursor.execute(item_query, [ |
| | new_order_id, item['product_id'], item['quantity'], |
| | unit_price, subtotal |
| | ]) |
| | |
| | |
| | update_stock_query = f""" |
| | UPDATE erp_products |
| | SET stock_quantity = stock_quantity - {param_placeholder} |
| | WHERE product_id = {param_placeholder}; |
| | """ |
| | cursor.execute(update_stock_query, [item['quantity'], item['product_id']]) |
| | |
| | |
| | history_query = f""" |
| | INSERT INTO erp_order_history ( |
| | order_id, timestamp, status_change, notes, updated_by |
| | ) VALUES ( |
| | {param_placeholder}, CURRENT_TIMESTAMP, 'Order Created', 'New order placed', 'System' |
| | ); |
| | """ |
| | cursor.execute(history_query, [new_order_id]) |
| | |
| | |
| | if previous_order_id: |
| | prev_order_note_query = f""" |
| | INSERT INTO erp_order_history ( |
| | order_id, timestamp, status_change, notes, updated_by |
| | ) VALUES ( |
| | {param_placeholder}, CURRENT_TIMESTAMP, 'Replaced', 'Order replaced by order #' {concat_op} {param_placeholder}, 'System' |
| | ); |
| | """ |
| | cursor.execute(prev_order_note_query, [previous_order_id, new_order_id]) |
| | |
| | |
| | invoice_query = f""" |
| | INSERT INTO erp_invoices ( |
| | order_id, invoice_date, amount, payment_terms, due_date, is_paid, invoice_number |
| | ) VALUES ( |
| | {param_placeholder}, CURRENT_DATE, {param_placeholder}, 'Net 30', {due_date_interval}, 0, 'INV-' {concat_op} {param_placeholder} |
| | ) {returning_clause}; |
| | """ |
| | cursor.execute(invoice_query, [new_order_id, total_amount, str(new_order_id)]) |
| | |
| | if db_type == "postgres": |
| | invoice_id = cursor.fetchone()['invoice_id'] |
| | else: |
| | invoice_id = cursor.lastrowid |
| | |
| | conn.commit() |
| | |
| | |
| | cursor.execute(f"SELECT * FROM erp_orders WHERE order_id = {param_placeholder};", [new_order_id]) |
| | order = cursor.fetchone() |
| | |
| | order_dict = order |
| | |
| | |
| | estimated_delivery = order_dict.get('estimated_delivery') |
| | if estimated_delivery: |
| | if isinstance(estimated_delivery, str): |
| | estimated_delivery_str = estimated_delivery |
| | else: |
| | estimated_delivery_str = estimated_delivery.strftime("%Y-%m-%d") if hasattr(estimated_delivery, 'strftime') else str(estimated_delivery) |
| | else: |
| | estimated_delivery_str = None |
| | |
| | |
| | return { |
| | "success": True, |
| | "order": { |
| | "order_id": new_order_id, |
| | "invoice_id": invoice_id, |
| | "total_amount": float(total_amount), |
| | "status": order_dict['status'], |
| | "estimated_delivery": estimated_delivery_str, |
| | "customer_id": customer_id, |
| | "shipping_address": shipping_address, |
| | "shipping_country": shipping_country, |
| | "destination_country": destination_country, |
| | "previous_order_id": previous_order_id |
| | } |
| | } |
| | |
| | except Exception as e: |
| | if 'conn' in locals(): |
| | conn.rollback() |
| | return { |
| | "success": False, |
| | "error": str(e) |
| | } |
| | finally: |
| | if 'conn' in locals(): |
| | conn.close() |
| |
|
| | @mcp.tool() |
| | async def cancel_order(order_id: int, reason: str) -> Dict[str, Any]: |
| | """ |
| | Cancel an existing order in the ERP system. |
| | |
| | Args: |
| | order_id (int): ID of the order to cancel |
| | reason (str): Reason for cancellation |
| | |
| | Returns: |
| | dict: Result of the cancellation operation |
| | """ |
| | try: |
| | conn, db_type = get_db_connection() |
| | |
| | if db_type == "postgres": |
| | cursor = conn.cursor(cursor_factory=RealDictCursor) |
| | param_placeholder = "%s" |
| | else: |
| | cursor = conn.cursor() |
| | param_placeholder = "?" |
| | |
| | |
| | check_query = f""" |
| | SELECT status, customer_id FROM erp_orders WHERE order_id = {param_placeholder}; |
| | """ |
| | cursor.execute(check_query, [order_id]) |
| | order = cursor.fetchone() |
| | |
| | if not order: |
| | return { |
| | "success": False, |
| | "error": f"Order with ID {order_id} not found" |
| | } |
| | |
| | if order['status'] in ['Delivered', 'Cancelled']: |
| | return { |
| | "success": False, |
| | "error": f"Cannot cancel order with status '{order['status']}'" |
| | } |
| | |
| | |
| | items_query = f""" |
| | SELECT product_id, quantity FROM erp_order_items WHERE order_id = {param_placeholder}; |
| | """ |
| | cursor.execute(items_query, [order_id]) |
| | items = cursor.fetchall() |
| | |
| | |
| | update_query = f""" |
| | UPDATE erp_orders SET status = 'Cancelled', payment_status = 'Cancelled' |
| | WHERE order_id = {param_placeholder}; |
| | """ |
| | cursor.execute(update_query, [order_id]) |
| | |
| | |
| | history_query = f""" |
| | INSERT INTO erp_order_history ( |
| | order_id, timestamp, status_change, notes, updated_by |
| | ) VALUES ( |
| | {param_placeholder}, CURRENT_TIMESTAMP, 'Cancelled', {param_placeholder}, 'System' |
| | ); |
| | """ |
| | cursor.execute(history_query, [order_id, f"Order cancelled: {reason}"]) |
| | |
| | |
| | for item in items: |
| | restore_stock_query = f""" |
| | UPDATE erp_products |
| | SET stock_quantity = stock_quantity + {param_placeholder} |
| | WHERE product_id = {param_placeholder}; |
| | """ |
| | cursor.execute(restore_stock_query, [item['quantity'], item['product_id']]) |
| | |
| | |
| | invoice_query = f""" |
| | UPDATE erp_invoices |
| | SET is_paid = 0 |
| | WHERE order_id = {param_placeholder}; |
| | """ |
| | cursor.execute(invoice_query, [order_id]) |
| | |
| | conn.commit() |
| | |
| | return { |
| | "success": True, |
| | "message": f"Order #{order_id} has been successfully cancelled", |
| | "reason": reason, |
| | "items_restored": len(items) |
| | } |
| | |
| | except Exception as e: |
| | if 'conn' in locals(): |
| | conn.rollback() |
| | return { |
| | "success": False, |
| | "error": str(e) |
| | } |
| | finally: |
| | if 'conn' in locals(): |
| | conn.close() |
| |
|
| | @mcp.tool() |
| | async def get_invoice_details(invoice_id: Optional[int] = None, order_id: Optional[int] = None) -> Dict[str, Any]: |
| | """ |
| | Get invoice details by invoice ID or order ID. |
| | |
| | Args: |
| | invoice_id (int, optional): ID of the invoice |
| | order_id (int, optional): ID of the order |
| | |
| | Returns: |
| | dict: Invoice details including customer and order information |
| | """ |
| | if not invoice_id and not order_id: |
| | return { |
| | "success": False, |
| | "error": "Either invoice_id or order_id must be provided" |
| | } |
| | |
| | try: |
| | conn, db_type = get_db_connection() |
| | |
| | if db_type == "postgres": |
| | cursor = conn.cursor(cursor_factory=RealDictCursor) |
| | param_placeholder = "%s" |
| | else: |
| | cursor = conn.cursor() |
| | param_placeholder = "?" |
| | |
| | if invoice_id: |
| | query = f""" |
| | SELECT i.*, o.order_date, o.status as order_status, |
| | c.name as customer_name, c.email as customer_email, c.address as customer_address |
| | FROM erp_invoices i |
| | JOIN erp_orders o ON i.order_id = o.order_id |
| | JOIN erp_customers c ON o.customer_id = c.customer_id |
| | WHERE i.invoice_id = {param_placeholder}; |
| | """ |
| | cursor.execute(query, [invoice_id]) |
| | else: |
| | query = f""" |
| | SELECT i.*, o.order_date, o.status as order_status, |
| | c.name as customer_name, c.email as customer_email, c.address as customer_address |
| | FROM erp_invoices i |
| | JOIN erp_orders o ON i.order_id = o.order_id |
| | JOIN erp_customers c ON o.customer_id = c.customer_id |
| | WHERE i.order_id = {param_placeholder}; |
| | """ |
| | cursor.execute(query, [order_id]) |
| | |
| | invoice = cursor.fetchone() |
| | |
| | if not invoice: |
| | return { |
| | "success": False, |
| | "error": f"Invoice not found for the provided {'invoice_id' if invoice_id else 'order_id'}" |
| | } |
| | |
| | |
| | items_query = f""" |
| | SELECT oi.*, p.product_name, p.sku |
| | FROM erp_order_items oi |
| | JOIN erp_products p ON oi.product_id = p.product_id |
| | WHERE oi.order_id = {param_placeholder}; |
| | """ |
| | cursor.execute(items_query, [invoice['order_id']]) |
| | items = cursor.fetchall() |
| | |
| | invoice_dict = invoice |
| | items_list = items |
| | |
| | |
| | order_date = invoice_dict.get('order_date') |
| | if order_date: |
| | if isinstance(order_date, str): |
| | order_date_str = order_date |
| | else: |
| | order_date_str = order_date.strftime("%Y-%m-%d") if hasattr(order_date, 'strftime') else str(order_date) |
| | else: |
| | order_date_str = None |
| | |
| | due_date = invoice_dict.get('due_date') |
| | if due_date: |
| | if isinstance(due_date, str): |
| | due_date_str = due_date |
| | else: |
| | due_date_str = due_date.strftime("%Y-%m-%d") if hasattr(due_date, 'strftime') else str(due_date) |
| | else: |
| | due_date_str = None |
| | |
| | |
| | return { |
| | "success": True, |
| | "invoice": { |
| | "invoice_id": invoice_dict['invoice_id'], |
| | "invoice_number": invoice_dict.get('invoice_number', ''), |
| | "order_id": invoice_dict['order_id'], |
| | "order_date": order_date_str, |
| | "order_status": invoice_dict['order_status'], |
| | "amount": float(invoice_dict['amount']), |
| | "due_date": due_date_str, |
| | "payment_status": "Paid" if invoice_dict['is_paid'] else "Unpaid", |
| | "customer": { |
| | "name": invoice_dict['customer_name'], |
| | "email": invoice_dict['customer_email'], |
| | "address": invoice_dict['customer_address'] |
| | }, |
| | "items": items_list |
| | } |
| | } |
| | |
| | except Exception as e: |
| | return { |
| | "success": False, |
| | "error": str(e) |
| | } |
| | finally: |
| | if 'conn' in locals(): |
| | conn.close() |
| |
|
| | @mcp.tool() |
| | async def reset_database() -> Dict[str, Any]: |
| | """ |
| | Reset the ERP database by deleting it and recreating it with fresh data. |
| | |
| | Returns: |
| | dict: Result of the database reset operation |
| | """ |
| | try: |
| | |
| | db_path = os.getenv("SQLITE_DB_PATH", "./data/erp_db.sqlite") |
| | |
| | |
| | result = reset_erp_db(db_path) |
| | |
| | if result: |
| | return { |
| | "success": True, |
| | "message": "Database reset successfully. All tables have been recreated with fresh sample data." |
| | } |
| | else: |
| | return { |
| | "success": False, |
| | "error": "Failed to reset database. Check server logs for details." |
| | } |
| | except Exception as e: |
| | return { |
| | "success": False, |
| | "error": f"Error resetting database: {str(e)}" |
| | } |
| |
|
| | |
| | execute_query_interface = gr.Interface( |
| | fn=execute_query, |
| | inputs=[ |
| | gr.Textbox(lines=5, label="SQL Query"), |
| | gr.Textbox(label="Parameters (comma-separated)", placeholder="Optional") |
| | ], |
| | outputs=gr.Textbox(lines=10), |
| | title="Execute SQL Query", |
| | description="Execute a custom SQL query on the ERP database" |
| | ) |
| |
|
| | list_tables_interface = gr.Interface( |
| | fn=list_erp_tables, |
| | inputs=[], |
| | outputs=gr.Textbox(lines=10), |
| | title="List ERP Tables", |
| | description="List all ERP tables in the database" |
| | ) |
| |
|
| | order_status_interface = gr.Interface( |
| | fn=get_order_status, |
| | inputs=gr.Number(label="Order ID", precision=0), |
| | outputs=gr.Textbox(lines=15), |
| | title="Get Order Status", |
| | description="Get the status of an order by order ID" |
| | ) |
| |
|
| | place_order_interface = gr.Interface( |
| | fn=place_new_order, |
| | inputs=[ |
| | gr.Number(label="Customer ID", precision=0), |
| | gr.Textbox(label="Product IDs (comma-separated)", placeholder="1, 2, 3"), |
| | gr.Textbox(label="Quantities (comma-separated)", placeholder="2, 1, 3"), |
| | gr.Textbox(label="Shipping Address"), |
| | gr.Textbox(label="Shipping Country"), |
| | gr.Textbox(label="Destination Country"), |
| | gr.Number(label="Previous Order ID (optional)", precision=0) |
| | ], |
| | outputs=gr.Textbox(lines=10), |
| | title="Place New Order", |
| | description="Place a new order in the ERP system" |
| | ) |
| |
|
| | cancel_order_interface = gr.Interface( |
| | fn=cancel_order, |
| | inputs=[ |
| | gr.Number(label="Order ID", precision=0), |
| | gr.Textbox(label="Reason for Cancellation") |
| | ], |
| | outputs=gr.Textbox(lines=5), |
| | title="Cancel Order", |
| | description="Cancel an existing order in the ERP system" |
| | ) |
| |
|
| | invoice_details_interface = gr.Interface( |
| | fn=get_invoice_details, |
| | inputs=[ |
| | gr.Number(label="Invoice ID (optional)", precision=0), |
| | gr.Number(label="Order ID (optional)", precision=0) |
| | ], |
| | outputs=gr.Textbox(lines=15), |
| | title="Get Invoice Details", |
| | description="Get invoice details by invoice ID or order ID" |
| | ) |
| |
|
| | reset_database_interface = gr.Interface( |
| | fn=reset_database, |
| | inputs=[], |
| | outputs=gr.Textbox(lines=5), |
| | title="Reset Database", |
| | description="Reset the ERP database by deleting it and recreating it with fresh data" |
| | ) |
| |
|
| | |
| | demo = gr.TabbedInterface( |
| | [ |
| | execute_query_interface, |
| | list_tables_interface, |
| | order_status_interface, |
| | place_order_interface, |
| | cancel_order_interface, |
| | invoice_details_interface, |
| | reset_database_interface |
| | ], |
| | [ |
| | "Execute Query", |
| | "List Tables", |
| | "Order Status", |
| | "Place Order", |
| | "Cancel Order", |
| | "Invoice Details", |
| | "Reset Database" |
| | ], |
| | title="ERP System Tools" |
| | ) |
| |
|
| | |
| | demo.launch(mcp_server=True, ssr_mode=False, share=True) |
| |
|