Spaces:
Build error
Build error
| import gradio as gr | |
| import data_manager | |
| import gemini_client | |
| import re | |
| import os | |
| import pandas as pd # For displaying data in a table | |
| import datetime | |
# --- Functions copied/adapted from chatbot.py ---
def extract_order_id(query):
    """Extract the order ID (a whole number) that a free-text query refers to.

    A number written directly after the word "order" (optionally followed by
    "id", "number", "no.", "num", "#" or ":") is preferred, so that
    "I bought 2 pens in order 15" yields 15 rather than 2.  When no such
    phrase is present, the first standalone number in the query is used.

    Args:
        query: The user's message.  ``None`` and the empty string are accepted.

    Returns:
        The order ID as an ``int``, or ``None`` when the query has no number.
    """
    if not query:
        return None

    # First choice: a number explicitly labelled as an order.
    labelled = re.search(
        r'\border\s*(?:id|number|no\.?|num)?\s*[#:]?\s*(\d+)\b',
        query,
        re.IGNORECASE,
    )
    if labelled:
        return int(labelled.group(1))

    # Fallback: the first standalone run of digits anywhere in the query.
    first_number = re.search(r'\b\d+\b', query)
    return int(first_number.group(0)) if first_number else None
def format_prompt(query, order_details=None, order_items=None, recent_orders=None):
    """Build the text prompt sent to the Gemini API, including any context.

    Args:
        query: The user's question; embedded verbatim inside double quotes.
        order_details: Optional mapping read with ``.get`` for the keys
            ``order_id``, ``user_name``, ``email``, ``order_date``, ``status``
            and ``total_amount``.
        order_items: Optional iterable of mappings read with ``.get`` for
            ``product_name``, ``quantity`` and ``price_per_unit``.
        recent_orders: Optional iterable of mappings read with ``.get`` for
            ``order_id``, ``order_date``, ``status`` and ``total_amount``.

    Returns:
        The full prompt string, always ending with ``"Assistant Response:"``.
    """

    def _money(value):
        # Render an amount as "$12.34"; a missing or non-numeric amount
        # becomes "N/A" instead of raising inside the format spec.
        try:
            return f"${float(value):.2f}"
        except (TypeError, ValueError):
            return "N/A"

    parts = [
        f"You are an e-commerce customer support assistant. Answer the user's query based *only* on the provided information. If the information is insufficient, say so.\n\nUser Query: \"{query}\"\n\n"
    ]

    if order_details:
        parts.append("--- Order Information ---\n")
        parts.append(f"Order ID: {order_details.get('order_id')}\n")
        parts.append(
            f"Customer: {order_details.get('user_name')} ({order_details.get('email')})\n"
        )
        parts.append(f"Date: {order_details.get('order_date')}\n")
        parts.append(f"Status: {order_details.get('status')}\n")
        parts.append(f"Total: {_money(order_details.get('total_amount'))}\n")

    # NOTE(review): the flattened source does not show whether this section
    # was nested under the order-details section; it is rendered independently
    # here, so items are never dropped when they are supplied.
    if order_items:
        parts.append("\n--- Items in Order ---\n")
        parts.extend(
            f"- {item.get('product_name')} (Qty: {item.get('quantity')}, "
            f"Price/Unit: {_money(item.get('price_per_unit'))})\n"
            for item in order_items
        )

    if recent_orders:
        parts.append("\n--- Recent Orders ---\n")
        parts.extend(
            f"- Order ID: {order.get('order_id')}, Date: {order.get('order_date')}, "
            f"Status: {order.get('status')}, Total: {_money(order.get('total_amount'))}\n"
            for order in recent_orders
        )

    parts.append("\n---\nAssistant Response:")
    return "".join(parts)
# --- Data Management Functions ---
def get_product_dataframe():
    """Return all products from ``data_manager`` as a pandas DataFrame.

    When ``data_manager.get_all_products()`` returns nothing truthy, a
    one-row frame with a single ``message`` column is returned instead.
    """
    rows = data_manager.get_all_products()
    if not rows:
        # Placeholder frame carrying a notice rather than product columns.
        return pd.DataFrame({"message": ["No products found"]})
    return pd.DataFrame(rows)
def get_user_dataframe():
    """Return all users from ``data_manager`` as a pandas DataFrame.

    When ``data_manager.get_all_users()`` returns nothing truthy, a one-row
    frame with a single ``message`` column is returned instead.
    """
    rows = data_manager.get_all_users()
    if not rows:
        # Placeholder frame carrying a notice rather than user columns.
        return pd.DataFrame({"message": ["No users found"]})
    return pd.DataFrame(rows)
def get_order_dataframe():
    """Return all orders from ``data_manager`` as a pandas DataFrame.

    When ``data_manager.get_all_orders()`` returns nothing truthy, a one-row
    frame with a single ``message`` column is returned instead.
    """
    rows = data_manager.get_all_orders()
    if not rows:
        # Placeholder frame carrying a notice rather than order columns.
        return pd.DataFrame({"message": ["No orders found"]})
    return pd.DataFrame(rows)
def get_order_item_dataframe():
    """Return all order items from ``data_manager`` as a pandas DataFrame.

    The rows come from ``data_manager.get_all_order_details()``.  When that
    call returns nothing truthy, a one-row frame with a single ``message``
    column is returned instead.
    """
    rows = data_manager.get_all_order_details()
    if not rows:
        # Placeholder frame carrying a notice rather than item columns.
        return pd.DataFrame({"message": ["No order items found"]})
    return pd.DataFrame(rows)
def add_product_interface(name, description, price, stock):
    """Validate the form values, add a product, and report the outcome.

    Args:
        name: Product name; must be non-empty.
        description: Product description; must be non-empty.
        price: Value convertible with ``float``; must be >= 0.
        stock: Value convertible with ``int``; must be >= 0.

    Returns:
        A ``(message, dataframe)`` pair: a status/error string and the
        current product table from ``get_product_dataframe()``.
    """
    try:
        price = float(price)
        stock = int(stock)
    except (TypeError, ValueError, OverflowError):
        # TypeError: float(None)/int(None), e.g. a numeric field left blank.
        # OverflowError: int() of an infinite float.
        return "Error: Invalid price or stock value.", get_product_dataframe()

    if not all([name, description, price >= 0, stock >= 0]):
        return (
            "Error: Please fill all fields correctly. Price and stock must be non-negative.",
            get_product_dataframe(),
        )

    product_id = data_manager.add_product(name, description, price, stock)
    if product_id:
        return "Product added successfully. Refreshing data...", get_product_dataframe()
    return "Error: Could not add product.", get_product_dataframe()
def remove_product_interface(product_id):
    """Remove a product by ID and report the outcome.

    Args:
        product_id: Value convertible with ``int``; must be positive.

    Returns:
        A ``(message, dataframe)`` pair: a status/error string and the
        current product table from ``get_product_dataframe()``.
    """
    try:
        product_id = int(product_id)
    except (TypeError, ValueError, OverflowError):
        # TypeError covers int(None), e.g. a numeric field left blank.
        return "Error: Invalid product ID.", get_product_dataframe()

    if product_id <= 0:
        return "Error: Product ID must be a positive integer.", get_product_dataframe()

    if data_manager.remove_product(product_id):
        return f"Product with ID {product_id} removed successfully.", get_product_dataframe()
    return (
        f"Error: Could not remove product with ID {product_id} (product not found or in use).",
        get_product_dataframe(),
    )
def add_user_interface(name, email, address):
    """Create a user through ``data_manager.add_user`` and report the outcome.

    Returns:
        A ``(message, dataframe)`` pair: a success message containing the new
        user ID (or an error message when ``add_user`` returns a falsy
        value) and the current user table from ``get_user_dataframe()``.
    """
    new_id = data_manager.add_user(name, email, address)
    message = (
        f"User added successfully with ID: {new_id}"
        if new_id
        else "Error: Could not add user."
    )
    return message, get_user_dataframe()
def add_order_interface(user_id, order_date, status, total_amount):
    """Validate the form values, add an order, and report the outcome.

    Args:
        user_id: Value convertible with ``int``.
        order_date: Date string in ``YYYY-MM-DD`` format.
        status: One of 'Pending', 'Processing', 'Shipped', 'Delivered',
            'Cancelled'.
        total_amount: Value convertible with ``float``.

    Returns:
        A ``(message, dataframe)`` pair: a status/error string and the
        current order table from ``get_order_dataframe()``.
    """
    try:
        user_id = int(user_id)
        total_amount = float(total_amount)
    except (TypeError, ValueError, OverflowError):
        # TypeError covers int(None)/float(None), e.g. a blank numeric field.
        return "Error: Invalid user ID or total amount.", get_order_dataframe()

    # The date is validated in its own try block so a bad date gets the date
    # message; a second ``except ValueError`` on one try is never reached.
    try:
        datetime.datetime.strptime(order_date, '%Y-%m-%d')
    except (TypeError, ValueError):
        return "Error: Invalid date format. Use YYYY-MM-DD.", get_order_dataframe()

    if status not in ['Pending', 'Processing', 'Shipped', 'Delivered', 'Cancelled']:
        return "Error: Invalid order status.", get_order_dataframe()

    order_id = data_manager.add_order(user_id, order_date, status, total_amount)
    if order_id:
        return f"Order added successfully with ID: {order_id}", get_order_dataframe()
    return "Error: Could not add order.", get_order_dataframe()
def add_order_item_interface(order_id, product_id, quantity, price_per_unit):
    """Validate the form values, add an order item, and report the outcome.

    Args:
        order_id: Value convertible with ``int``; must be positive.
        product_id: Value convertible with ``int``; must be positive.
        quantity: Value convertible with ``int``; must be positive.
        price_per_unit: Value convertible with ``float``; must be >= 0.

    Returns:
        A ``(message, dataframe)`` pair: a status/error string and the
        current order-item table from ``get_order_item_dataframe()``.
    """
    try:
        order_id = int(order_id)
        product_id = int(product_id)
        quantity = int(quantity)
        price_per_unit = float(price_per_unit)
    except (TypeError, ValueError, OverflowError):
        # TypeError covers int(None)/float(None), e.g. a blank numeric field.
        return (
            "Error: Invalid order ID, product ID, quantity, or price per unit.",
            get_order_item_dataframe(),
        )

    # Range checks mirror the other handlers in this file: positive IDs (as
    # in the remove_* handlers) and non-negative price (as in add_product).
    # `not (x >= 0)` also rejects NaN.
    if order_id <= 0 or product_id <= 0 or quantity <= 0 or not (price_per_unit >= 0):
        return (
            "Error: IDs and quantity must be positive; price per unit must be non-negative.",
            get_order_item_dataframe(),
        )

    order_item_id = data_manager.add_order_item(order_id, product_id, quantity, price_per_unit)
    if order_item_id:
        return (
            f"Order item added successfully with ID: {order_item_id}",
            get_order_item_dataframe(),
        )
    return "Error: Could not add order item.", get_order_item_dataframe()
def remove_user_interface(user_id):
    """Remove a user by ID and report the outcome.

    Args:
        user_id: Value convertible with ``int``; must be positive.

    Returns:
        A ``(message, dataframe)`` pair: a status/error string and the
        current user table from ``get_user_dataframe()``.
    """
    try:
        user_id = int(user_id)
    except (TypeError, ValueError, OverflowError):
        # TypeError covers int(None), e.g. a numeric field left blank.
        return "Error: Invalid user ID.", get_user_dataframe()

    if user_id <= 0:
        return "Error: User ID must be a positive integer.", get_user_dataframe()

    if data_manager.remove_user(user_id):
        return f"User with ID {user_id} removed successfully.", get_user_dataframe()
    return (
        f"Error: Could not remove user with ID {user_id} (user not found or in use).",
        get_user_dataframe(),
    )
def remove_order_interface(order_id):
    """Remove an order by ID and report the outcome.

    Args:
        order_id: Value convertible with ``int``; must be positive.

    Returns:
        A ``(message, dataframe)`` pair: a status/error string and the
        current order table from ``get_order_dataframe()``.
    """
    try:
        order_id = int(order_id)
    except (TypeError, ValueError, OverflowError):
        # TypeError covers int(None), e.g. a numeric field left blank.
        return "Error: Invalid order ID.", get_order_dataframe()

    if order_id <= 0:
        return "Error: Order ID must be a positive integer.", get_order_dataframe()

    if data_manager.remove_order(order_id):
        return f"Order with ID {order_id} removed successfully.", get_order_dataframe()
    return (
        f"Error: Could not remove order with ID {order_id} (order not found or in use).",
        get_order_dataframe(),
    )
def remove_order_item_interface(order_id, product_id):
    """Delete the order item matching an (order, product) pair.

    Runs a parameterized ``DELETE`` against the ``order_items`` table on a
    connection obtained from ``data_manager.get_db_connection()``.

    Args:
        order_id: Value convertible with ``int``; must be positive.
        product_id: Value convertible with ``int``; must be positive.

    Returns:
        A ``(message, dataframe)`` pair: a status/error string and the
        current order-item table from ``get_order_item_dataframe()``.
    """
    # Imported here because the except clause below needs sqlite3.Error and
    # the module is not among this file's top-level imports.
    import sqlite3

    try:
        order_id = int(order_id)
        product_id = int(product_id)
    except (TypeError, ValueError, OverflowError):
        # TypeError covers int(None), e.g. a numeric field left blank.
        return "Error: Invalid order ID or product ID.", get_order_item_dataframe()

    if order_id <= 0 or product_id <= 0:
        return "Error: IDs must be positive integers.", get_order_item_dataframe()

    conn = data_manager.get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "DELETE FROM order_items WHERE order_id = ? AND product_id = ?",
            (order_id, product_id),
        )
        conn.commit()
        removed = cursor.rowcount > 0
    except sqlite3.Error as e:
        conn.rollback()
        message = f"Database error: {e}"
    else:
        if removed:
            message = f"Order item (Order: {order_id}, Product: {product_id}) removed successfully."
        else:
            message = f"Error: Could not remove order item (Order: {order_id}, Product: {product_id}) - not found."
    finally:
        # Always release the connection, including on unexpected exceptions.
        conn.close()

    # The table is re-read only after this connection has been closed.
    return message, get_order_item_dataframe()
# --- Chatbot Functions ---
def get_chatbot_response(user_email, query):
    """Answer a customer query for the Gradio chatbot tab.

    The user is looked up by email.  If the query names an order, that order
    is fetched and used as context only when its ``email`` field matches the
    given address.  Otherwise, if the query mentions "recent orders" or
    "my orders", the user's recent orders are used.  When context exists, a
    prompt is built with ``format_prompt`` and sent to
    ``gemini_client.get_gemini_response``.

    Args:
        user_email: Email address typed by the user; surrounding whitespace
            is ignored.
        query: Free-text question; ``None`` is treated as an empty query.

    Returns:
        The assistant's reply, or an explanatory/error message string.
    """
    user_email = (user_email or "").strip()
    if not user_email:
        return "Error: Please enter your email address."
    query = query or ""

    try:
        # The lookup sits inside the try so a missing database yields the
        # friendly message below instead of an unhandled exception.
        user_info = data_manager.get_user_by_email(user_email)
        if not user_info:
            return f"Error: User with email {user_email} not found."
        user_id = user_info['user_id']

        order_id = extract_order_id(query)
        order_details = None
        order_items = None
        recent_orders = None
        response_message = ""
        lowered = query.lower()

        # `is not None` so that an explicit order ID of 0 is still looked up.
        if order_id is not None:
            order_details = data_manager.get_order_details(order_id)
            if not order_details:
                response_message = f"Order ID {order_id} not found."
            elif order_details['email'] != user_email:
                # Security check: the order must belong to the requesting user.
                response_message = f"Order {order_id} does not belong to user {user_email}."
                order_details = None  # never pass another user's order on
            else:
                # Items are fetched only for a valid, authorized order.
                order_items = data_manager.get_order_items(order_id)
        elif "recent orders" in lowered or "my orders" in lowered:
            # Simple keyword check for a recent-orders request.
            recent_orders = data_manager.get_user_recent_orders(user_id)
            if not recent_orders:
                response_message = "You have no recent orders."

        # Ask the model only when there is context to ground its answer.
        if order_details or recent_orders:
            prompt = format_prompt(query, order_details, order_items, recent_orders)
            return gemini_client.get_gemini_response(prompt)

        if response_message:
            return response_message
        return "Please specify an order ID (e.g., 'status for order 123') or ask about 'recent orders'."
    except FileNotFoundError:
        return f"Error: Database not found at {data_manager.DB_PATH}. Please ensure it exists."
    except Exception as e:
        # Log details to the console where Gradio runs; keep the user-facing
        # message generic.
        print(f"An unexpected error occurred: {e}")
        return "An unexpected error occurred. Please try again later."
# --- Database Check ---
if not os.path.exists(data_manager.DB_PATH):
    # Warn only: module execution continues, so the interface below is still
    # built even when the database file is absent.
    print(
        f"Database not found at {data_manager.DB_PATH}.\n"
        "Please run 'python generate_data.py' first to create and populate the database."
    )
# --- Gradio Interface ---
# Two top-level tabs: the customer chatbot, and a data-management area with
# one sub-tab per table (products, users, orders, order items).
#
# Each table is given its loader *function* as `value` rather than the result
# of calling it.  Gradio then invokes the loader on every page load, so the
# tables are not frozen at import time and building the interface does not
# touch the database.
with gr.Blocks(title="E-commerce Management") as iface:
    gr.Markdown("# E-commerce Chatbot and Data Management")

    with gr.Tab("Chatbot"):
        email_input = gr.Textbox(label="Your Email Address")
        query_input = gr.Textbox(
            label="Your Query",
            placeholder="e.g., 'What is the status of order 1?' or 'Show my recent orders'",
        )
        chatbot_output = gr.Textbox(label="Assistant Response")
        chatbot_button = gr.Button("Submit")
        chatbot_button.click(
            fn=get_chatbot_response,
            inputs=[email_input, query_input],
            outputs=chatbot_output,
        )

    with gr.Tab("Data Management"):
        with gr.Tabs():
            with gr.Tab("Products"):
                gr.Markdown("## Product Data")
                product_table = gr.DataFrame(
                    value=get_product_dataframe,
                    interactive=False,
                    label="Current Products",
                )
                with gr.Accordion("Add New Product", open=False):
                    new_name_input = gr.Textbox(label="Product Name")
                    new_description_input = gr.Textbox(label="Description")
                    new_price_input = gr.Number(label="Price")
                    new_stock_input = gr.Number(label="Stock")
                    add_product_button = gr.Button("Add Product")
                    add_product_output = gr.Textbox()
                    add_product_button.click(
                        fn=add_product_interface,
                        inputs=[
                            new_name_input,
                            new_description_input,
                            new_price_input,
                            new_stock_input,
                        ],
                        outputs=[add_product_output, product_table],
                    )
                with gr.Accordion("Remove Product", open=False):
                    remove_id_input = gr.Number(label="Product ID to Remove")
                    remove_product_button = gr.Button("Remove Product")
                    remove_product_output = gr.Textbox()
                    remove_product_button.click(
                        fn=remove_product_interface,
                        inputs=[remove_id_input],
                        outputs=[remove_product_output, product_table],
                    )

            with gr.Tab("Users"):
                gr.Markdown("## User Data")
                user_table = gr.DataFrame(
                    value=get_user_dataframe,
                    interactive=False,
                    label="Current Users",
                )
                with gr.Accordion("Add User", open=False):
                    new_user_name_input = gr.Textbox(label="User Name")
                    new_user_email_input = gr.Textbox(label="Email")
                    new_user_address_input = gr.Textbox(label="Address")
                    add_user_button = gr.Button("Add User")
                    add_user_output = gr.Textbox()
                    add_user_button.click(
                        fn=add_user_interface,
                        inputs=[
                            new_user_name_input,
                            new_user_email_input,
                            new_user_address_input,
                        ],
                        outputs=[add_user_output, user_table],
                    )
                with gr.Accordion("Remove User", open=False):
                    remove_user_id_input = gr.Number(label="User ID to Remove")
                    remove_user_button = gr.Button("Remove User")
                    remove_user_output = gr.Textbox()
                    remove_user_button.click(
                        fn=remove_user_interface,
                        inputs=[remove_user_id_input],
                        outputs=[remove_user_output, user_table],
                    )

            with gr.Tab("Orders"):
                gr.Markdown("## Order Data")
                order_table = gr.DataFrame(
                    value=get_order_dataframe,
                    interactive=False,
                    label="Current Orders",
                )
                with gr.Accordion("Add Order", open=False):
                    new_order_user_id_input = gr.Number(label="User ID")
                    new_order_date_input = gr.Textbox(label="Order Date (YYYY-MM-DD)")
                    new_order_status_input = gr.Dropdown(
                        label="Status",
                        choices=['Pending', 'Processing', 'Shipped', 'Delivered', 'Cancelled'],
                    )
                    new_order_total_amount_input = gr.Number(label="Total Amount")
                    add_order_button = gr.Button("Add Order")
                    add_order_output = gr.Textbox()
                    add_order_button.click(
                        fn=add_order_interface,
                        inputs=[
                            new_order_user_id_input,
                            new_order_date_input,
                            new_order_status_input,
                            new_order_total_amount_input,
                        ],
                        outputs=[add_order_output, order_table],
                    )
                with gr.Accordion("Remove Order", open=False):
                    remove_order_id_input = gr.Number(label="Order ID to Remove")
                    remove_order_button = gr.Button("Remove Order")
                    remove_order_output = gr.Textbox()
                    remove_order_button.click(
                        fn=remove_order_interface,
                        inputs=[remove_order_id_input],
                        outputs=[remove_order_output, order_table],
                    )

            with gr.Tab("Order Items"):
                gr.Markdown("## Order Item Data")
                order_item_table = gr.DataFrame(
                    value=get_order_item_dataframe,
                    interactive=False,
                    label="Current Order Items",
                )
                with gr.Accordion("Add Order Item", open=False):
                    new_order_item_order_id_input = gr.Number(label="Order ID")
                    new_order_item_product_id_input = gr.Number(label="Product ID")
                    new_order_item_quantity_input = gr.Number(label="Quantity")
                    new_order_item_price_per_unit_input = gr.Number(label="Price per Unit")
                    add_order_item_button = gr.Button("Add Order Item")
                    add_order_item_output = gr.Textbox()
                    add_order_item_button.click(
                        fn=add_order_item_interface,
                        inputs=[
                            new_order_item_order_id_input,
                            new_order_item_product_id_input,
                            new_order_item_quantity_input,
                            new_order_item_price_per_unit_input,
                        ],
                        outputs=[add_order_item_output, order_item_table],
                    )
                with gr.Accordion("Remove Order Item", open=False):
                    # Distinct names: these inputs must not rebind the
                    # `remove_order_id_input` created in the Orders tab.
                    remove_order_item_order_id_input = gr.Number(label="Order ID")
                    remove_order_item_product_id_input = gr.Number(label="Product ID")
                    remove_order_item_button = gr.Button("Remove Order Item")
                    remove_order_item_output = gr.Textbox()
                    remove_order_item_button.click(
                        fn=remove_order_item_interface,
                        inputs=[
                            remove_order_item_order_id_input,
                            remove_order_item_product_id_input,
                        ],
                        outputs=[remove_order_item_output, order_item_table],
                    )
if __name__ == "__main__":
    # Launch only when the database file is present.
    if not os.path.exists(data_manager.DB_PATH):
        print("Gradio interface not launched because the database is missing.")
    else:
        print("Launching Gradio interface...")
        # NOTE(review): share=True requests a public Gradio link, and the
        # Data Management tab has no authentication; confirm this is intended.
        iface.launch(share=True)