import base64 import json import tempfile from pathlib import Path from typing import Optional import gradio as gr import httpx from extract_msg import Message as MsgMessage from gradio_pdf import PDF # Hardcoded API Gateway URL - configured for external stakeholders API_GATEWAY_URL = "https://email-order-processor-gateway-9jipw94n.ew.gateway.dev/api/v1" def get_auth_headers(api_key: Optional[str] = None) -> dict: """ Get authorization headers for the API Gateway. Args: api_key: API key for Google API Gateway authentication Returns: Dictionary with authorization headers """ headers = {"Content-Type": "application/json"} if api_key: headers["x-api-key"] = api_key else: print("Warning: No API key provided") return headers def extract_pdf_from_msg(msg_file_path: str) -> Optional[str]: """ Extract PDF attachment from .msg file. Args: msg_file_path: Path to the .msg file Returns: Path to the extracted PDF file, or None if no PDF found """ try: email_msg = MsgMessage(msg_file_path) # Look for PDF attachments for attachment in email_msg.attachments: if isinstance(attachment.longFilename, str) and attachment.longFilename.lower().endswith(".pdf"): # Save PDF to temporary file temp_pdf_path = tempfile.mktemp(suffix=".pdf") with open(temp_pdf_path, "wb") as temp_pdf: pdf_data = attachment.data if isinstance(pdf_data, bytes): temp_pdf.write(pdf_data) else: # Handle other data types - this shouldn't happen normally with PDF data print(f"Warning: PDF data is not bytes, type: {type(pdf_data)}") continue return temp_pdf_path return None except Exception as e: print(f"Error extracting PDF from .msg file: {e}") return None async def process_email_file( msg_file: Path, api_base_url: str, api_key: Optional[str] = None ) -> tuple[Optional[str], Optional[str]]: """ Process a .msg email file by sending it to the API. Args: msg_file: Path to the .msg file api_base_url: Base URL for the API api_key: API key for Google API Gateway authentication Returns: Tuple of (success_response, error_message) """ if not msg_file: return None, "Please upload a .msg file" if not api_base_url.strip(): return None, "Please provide a valid API base URL" # Ensure the URL ends with /api/v1 if it doesn't already if not api_base_url.endswith("/api/v1"): api_base_url = api_base_url + "api/v1" if api_base_url.endswith("/") else api_base_url + "/api/v1" try: # Read and encode the file as base64 file_content = msg_file.read_bytes() base64_content = base64.b64encode(file_content).decode("utf-8") # Prepare the request request_data = {"email_msg_base64": base64_content} # Send request to API async with httpx.AsyncClient(timeout=120.0) as client: response = await client.post( f"{api_base_url}/process-email", json=request_data, headers=get_auth_headers(api_key) ) if response.status_code == 200: result = response.json() formatted_result = json.dumps(result, indent=2, ensure_ascii=False) return formatted_result, None else: error_detail = ( response.json().get("detail", "Unknown error") if response.headers.get("content-type", "").startswith("application/json") else response.text ) return None, f"API Error ({response.status_code}): {error_detail}" except httpx.TimeoutException: return None, "Request timed out. The file might be too large or the server is busy." except httpx.ConnectError: return ( None, f"Could not connect to API at {api_base_url}. Please check the URL and ensure the server is running.", ) except Exception as e: return None, f"Error processing file: {str(e)}" async def handle_file_upload(msg_file, api_base_url, api_key=None): """Handle file upload and processing.""" if not msg_file: return None, None, "Please upload a .msg file" # Extract PDF from .msg file pdf_path = extract_pdf_from_msg(msg_file) # Process the email via API result, error = await process_email_file(Path(msg_file), api_base_url, api_key) if error: return None, None, error else: return result, pdf_path, "✅ Email processed successfully!" def create_ui(): """Create the Gradio UI.""" with gr.Blocks( title="Email Order Processor", css=""" .upload-area { border: 2px dashed #ccc; border-radius: 10px; padding: 20px; text-align: center; margin: 10px 0; } .status-success { color: #28a745; font-weight: bold; } .status-error { color: #dc3545; font-weight: bold; } .pdf-container { border: 1px solid #ddd; border-radius: 8px; padding: 10px; background-color: #f9f9f9; } .comparison-container { border: 1px solid var(--border-color-primary); border-radius: 12px; padding: 20px; margin: 15px 5px; background-color: var(--background-fill-secondary); box-shadow: 0 2px 8px rgba(0,0,0,0.1); } .api-config { background-color: var(--background-fill-secondary); border: 1px solid var(--border-color-primary); border-radius: 8px; padding: 15px; margin-bottom: 20px; } .api-key-input { margin-top: 10px; border: 2px solid var(--color-accent-soft); border-radius: 6px; background-color: var(--background-fill-primary); } /* Custom styling for markdown content */ .address-card { background: var(--background-fill-primary); border: 1px solid var(--border-color-accent); border-radius: 8px; padding: 16px; margin: 8px 0; } .address-card h2 { color: var(--color-accent); margin-top: 0; margin-bottom: 12px; font-size: 1.2em; border-bottom: 2px solid var(--border-color-accent); padding-bottom: 4px; } .address-card p { margin: 6px 0; line-height: 1.4; } .items-section { background: var(--background-fill-primary); border-radius: 8px; padding: 16px; } .items-section h2 { color: var(--color-accent); border-bottom: 2px solid var(--border-color-accent); padding-bottom: 8px; margin-bottom: 16px; } .items-section h3 { background: var(--background-fill-secondary); padding: 8px 12px; border-radius: 6px; margin: 16px 0 8px 0; border-left: 4px solid var(--color-accent); } .items-section hr { border: none; height: 1px; background: var(--border-color-primary); margin: 16px 0; } .stats-section { background: var(--background-fill-primary); border-radius: 8px; padding: 16px; } .stats-section h2 { color: var(--color-accent); border-bottom: 2px solid var(--border-color-accent); padding-bottom: 8px; margin-bottom: 16px; } """, ) as demo: gr.Markdown("# 📧 Email Order Processor") gr.Markdown( """ Upload a **.msg** email file containing purchase order information to extract structured data. **Instructions:** 1. Enter your API key below 2. Select a `.msg` file (Outlook email format) 3. Click 'Process Email' to analyze the file 4. View the extracted purchase order data and original PDF side by side """ ) # API Configuration Section (Simplified for external users) with gr.Row(), gr.Column(elem_classes=["api-config"]): gr.Markdown("### 🔐 Authentication") gr.Markdown("*Connected to: email-order-processor-04i60gfy4hofo.apigateway.ax-logistics-ai.cloud.goog*") # Hidden URL field that always contains the hardcoded API Gateway URL api_base_url = gr.Textbox(value=API_GATEWAY_URL, visible=False) # API Key input - always visible for external users api_key_input = gr.Textbox( label="API Key", type="password", placeholder="Enter your API key to access the service", info="Please enter the API key provided by your administrator", elem_classes=["api-key-input"], ) # Show auth status def update_auth_status(api_key=None): if api_key and api_key.strip(): return "🔐 Authentication: **Active** ✅" else: return "⚠️ **API key required** - Please enter your API key above" auth_status = gr.Markdown("", elem_classes=["status-text"]) # Update auth status when API key changes api_key_input.change(fn=update_auth_status, inputs=[api_key_input], outputs=[auth_status]) # Set initial auth status demo.load(fn=update_auth_status, inputs=[api_key_input], outputs=[auth_status]) with gr.Row(): with gr.Column(scale=1): gr.Markdown("### Upload Email File") msg_file = gr.File( label="Email Message File (.msg)", file_types=[".msg"], file_count="single", elem_classes=["upload-area"], ) process_btn = gr.Button("🚀 Process Email", variant="primary", size="lg") status_text = gr.Markdown("", elem_classes=["status-text"]) with gr.Column(scale=2): gr.Markdown("### Raw API Response") result_json = gr.JSON(label="Purchase Order Information", show_label=False) # Side-by-side comparison section with gr.Row(): with gr.Column(scale=1, elem_classes=["comparison-container"]): gr.Markdown("### 📄 Original PDF Document") pdf_viewer = PDF(label="Purchase Order PDF", height=600, elem_classes=["pdf-container"]) with gr.Column(scale=1, elem_classes=["comparison-container"]): gr.Markdown("### 📊 Extracted Data Summary") # Create formatted display components with gr.Accordion("🏢 Address Information", open=True): address_display = gr.Markdown(label="Address Details") with gr.Accordion("📦 Items Information", open=True): items_display = gr.Markdown(label="Items Details") with gr.Accordion("🔍 Processing Statistics", open=False): stats_display = gr.Markdown("No data processed yet") # Event handlers - simplified for external users def format_extracted_data(json_result): """Format the extracted data for better display.""" if not json_result: return None, None, "No data processed yet" try: # json_result is already a dict from Gradio JSON component data = json.loads(json_result) if isinstance(json_result, str) else json_result # Extract address information address_info = data.get("address", {}) billing_addr = address_info.get("billing_address", {}) shipping_addr = address_info.get("shipping_address", {}) # Format addresses as readable cards def format_address_card(addr_data, title): if not addr_data: return f"
No data available
🏢 Company: {addr_data['company_name']}
") if addr_data.get("department"): lines.append(f"🏬 Department: {addr_data['department']}
") if addr_data.get("contact_person"): lines.append(f"👤 Contact: {addr_data['contact_person']}
") # Build address line address_parts = [] if addr_data.get("street_address"): address_parts.append(addr_data["street_address"]) city_parts = [] if addr_data.get("postal_code"): city_parts.append(addr_data["postal_code"]) if addr_data.get("city"): city_parts.append(addr_data["city"]) if city_parts: address_parts.append(" ".join(city_parts)) if addr_data.get("country"): address_parts.append(addr_data["country"]) if address_parts: lines.append("📍 Address:
") lines.append("")
lines.append("
".join(address_parts))
lines.append("
📞 Phone: {addr_data['phone']}
") if addr_data.get("email"): lines.append( f"📧 Email: {addr_data['email']}
" ) # Add delivery instructions for shipping address if title == "Shipping Address" and addr_data.get("delivery_instructions"): lines.append( f"📦 Delivery Instructions: {addr_data['delivery_instructions']}
" ) lines.append("No items found
Description: {item['description']}
") info_items.append( f"Quantity: {item.get('quantity', 'N/A')}
" ) if item.get("price_per_unit"): info_items.append( f"Price per unit: {item['price_per_unit']:.2f}
" ) if item.get("total_price"): info_items.append( f"Total price: {item['total_price']:.2f}
" ) lines.extend(info_items) # Database matching info with better styling db_details = item.get("db_details") if db_details: confidence = item.get("db_match_confidence", "unknown") confidence_emoji = {"exact": "✅", "high": "🟢", "medium": "🟡", "low": "🟠"}.get( confidence.lower(), "❓" ) confidence_color = { "exact": "#28a745", "high": "#17a2b8", "medium": "#ffc107", "low": "#fd7e14", }.get(confidence.lower(), "#6c757d") lines.append( f"{confidence_emoji} Database Match: {confidence.upper()}
" ) lines.append( f"Product Number: {db_details.get('product_number', 'N/A')}
Manufacturer: {db_details.get('manufacturer_name', 'N/A')}
" ) lines.append( f"Manufacturer Number: {db_details.get('manufacturer_number', 'N/A')}
Match Reason: {item['db_match_reason']}
" ) lines.append("❌ Database Match: Not found
" ) lines.append("Total items extracted: {total_items}
Items matched in database: {items_with_db_match}
Database match rate: {match_rate}
Total quantity: {total_quantity} units
Total value: {total_value:.2f}
(if prices available)
Billing: {billing_addr.get("company_name", "N/A")}
Shipping: {shipping_addr.get("company_name", "N/A")}
Country: {billing_addr.get("country") or shipping_addr.get("country", "N/A")}
✅ Processing Status: Complete