"""Gradio front end for the Email Order Processor API.

Lets a user upload an Outlook ``.msg`` file, sends it to the API gateway and
shows the raw JSON response next to the PDF attachment found in the email,
together with a formatted summary of addresses, items and statistics.
"""

import base64
import html
import json
import tempfile
from pathlib import Path
from typing import Optional

import gradio as gr
import httpx
from extract_msg import Message as MsgMessage
from gradio_pdf import PDF

# Hardcoded API Gateway URL - configured for external stakeholders
API_GATEWAY_URL = "https://email-order-processor-gateway-9jipw94n.ew.gateway.dev/api/v1"

# Stylesheet handed to gr.Blocks. The .address-card / .items-section /
# .stats-section rules style the HTML produced by the _format_* helpers below.
_CUSTOM_CSS = """
.upload-area {
    border: 2px dashed #ccc;
    border-radius: 10px;
    padding: 20px;
    text-align: center;
    margin: 10px 0;
}
.status-success {
    color: #28a745;
    font-weight: bold;
}
.status-error {
    color: #dc3545;
    font-weight: bold;
}
.pdf-container {
    border: 1px solid #ddd;
    border-radius: 8px;
    padding: 10px;
    background-color: #f9f9f9;
}
.comparison-container {
    border: 1px solid var(--border-color-primary);
    border-radius: 12px;
    padding: 20px;
    margin: 15px 5px;
    background-color: var(--background-fill-secondary);
    box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}
.api-config {
    background-color: var(--background-fill-secondary);
    border: 1px solid var(--border-color-primary);
    border-radius: 8px;
    padding: 15px;
    margin-bottom: 20px;
}
.api-key-input {
    margin-top: 10px;
    border: 2px solid var(--color-accent-soft);
    border-radius: 6px;
    background-color: var(--background-fill-primary);
}
/* Custom styling for markdown content */
.address-card {
    background: var(--background-fill-primary);
    border: 1px solid var(--border-color-accent);
    border-radius: 8px;
    padding: 16px;
    margin: 8px 0;
}
.address-card h2 {
    color: var(--color-accent);
    margin-top: 0;
    margin-bottom: 12px;
    font-size: 1.2em;
    border-bottom: 2px solid var(--border-color-accent);
    padding-bottom: 4px;
}
.address-card p {
    margin: 6px 0;
    line-height: 1.4;
}
.items-section {
    background: var(--background-fill-primary);
    border-radius: 8px;
    padding: 16px;
}
.items-section h2 {
    color: var(--color-accent);
    border-bottom: 2px solid var(--border-color-accent);
    padding-bottom: 8px;
    margin-bottom: 16px;
}
.items-section h3 {
    background: var(--background-fill-secondary);
    padding: 8px 12px;
    border-radius: 6px;
    margin: 16px 0 8px 0;
    border-left: 4px solid var(--color-accent);
}
.items-section hr {
    border: none;
    height: 1px;
    background: var(--border-color-primary);
    margin: 16px 0;
}
.stats-section {
    background: var(--background-fill-primary);
    border-radius: 8px;
    padding: 16px;
}
.stats-section h2 {
    color: var(--color-accent);
    border-bottom: 2px solid var(--border-color-accent);
    padding-bottom: 8px;
    margin-bottom: 16px;
}
"""

# Introductory text shown at the top of the page.
_INSTRUCTIONS_MD = """
Upload a **.msg** email file containing purchase order information to extract structured data.

**Instructions:**
1. Enter your API key below
2. Select a `.msg` file (Outlook email format)
3. Click 'Process Email' to analyze the file
4. View the extracted purchase order data and original PDF side by side
"""

# Match confidence -> (emoji, accent colour) used when rendering a database match.
_CONFIDENCE_STYLES = {
    "exact": ("✅", "#28a745"),
    "high": ("🟢", "#17a2b8"),
    "medium": ("🟡", "#ffc107"),
    "low": ("🟠", "#fd7e14"),
}
_UNKNOWN_CONFIDENCE_STYLE = ("❓", "#6c757d")


def get_auth_headers(api_key: Optional[str] = None) -> dict:
    """
    Get authorization headers for the API Gateway.

    Args:
        api_key: API key for Google API Gateway authentication

    Returns:
        Dictionary with authorization headers
    """
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["x-api-key"] = api_key
    else:
        print("Warning: No API key provided")
    return headers


def extract_pdf_from_msg(msg_file_path: str) -> Optional[str]:
    """
    Extract PDF attachment from .msg file.

    The first attachment whose file name ends in ``.pdf`` and whose payload is
    ``bytes`` is written to a new temporary file. The caller owns that file.

    Args:
        msg_file_path: Path to the .msg file

    Returns:
        Path to the extracted PDF file, or None if no PDF found
    """
    email_msg = None
    try:
        email_msg = MsgMessage(msg_file_path)

        # Look for PDF attachments
        for attachment in email_msg.attachments:
            filename = attachment.longFilename
            if not (isinstance(filename, str) and filename.lower().endswith(".pdf")):
                continue

            # Validate the payload BEFORE creating the file so that a skipped
            # attachment does not leave an empty temporary file behind.
            pdf_data = attachment.data
            if not isinstance(pdf_data, bytes):
                # Handle other data types - this shouldn't happen normally with PDF data
                print(f"Warning: PDF data is not bytes, type: {type(pdf_data)}")
                continue

            # NamedTemporaryFile creates the file atomically; tempfile.mktemp is
            # deprecated because another process can claim the name first.
            with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_pdf:
                temp_pdf.write(pdf_data)
            return temp_pdf.name

        return None
    except Exception as e:
        # Best effort: a missing PDF preview must not block processing.
        print(f"Error extracting PDF from .msg file: {e}")
        return None
    finally:
        # Release the handle on the uploaded .msg file.
        if email_msg is not None:
            email_msg.close()


def _extract_error_detail(response: httpx.Response) -> str:
    """Return the most useful error text available in a non-200 response."""
    if not response.headers.get("content-type", "").startswith("application/json"):
        return response.text
    try:
        payload = response.json()
    except ValueError:
        # The header promised JSON but the body is not parseable.
        return response.text
    if isinstance(payload, dict):
        return payload.get("detail", "Unknown error")
    return str(payload)


async def process_email_file(
    msg_file: Path, api_base_url: str, api_key: Optional[str] = None
) -> tuple[Optional[str], Optional[str]]:
    """
    Process a .msg email file by sending it to the API.

    Args:
        msg_file: Path to the .msg file
        api_base_url: Base URL for the API
        api_key: API key for Google API Gateway authentication

    Returns:
        Tuple of (success_response, error_message)
    """
    if not msg_file:
        return None, "Please upload a .msg file"

    if not api_base_url or not api_base_url.strip():
        return None, "Please provide a valid API base URL"

    # Ensure the URL ends with /api/v1 if it doesn't already. Trailing slashes
    # are dropped first so ".../api/v1/" is not extended to ".../api/v1/api/v1".
    api_base_url = api_base_url.strip().rstrip("/")
    if not api_base_url.endswith("/api/v1"):
        api_base_url += "/api/v1"

    try:
        # Read and encode the file as base64
        file_content = msg_file.read_bytes()
        base64_content = base64.b64encode(file_content).decode("utf-8")

        # Prepare the request
        request_data = {"email_msg_base64": base64_content}

        # Send request to API
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.post(
                f"{api_base_url}/process-email",
                json=request_data,
                headers=get_auth_headers(api_key),
            )

        if response.status_code == 200:
            result = response.json()
            return json.dumps(result, indent=2, ensure_ascii=False), None

        return None, f"API Error ({response.status_code}): {_extract_error_detail(response)}"

    except httpx.TimeoutException:
        return None, "Request timed out. The file might be too large or the server is busy."
    except httpx.ConnectError:
        return (
            None,
            f"Could not connect to API at {api_base_url}. "
            "Please check the URL and ensure the server is running.",
        )
    except Exception as e:
        return None, f"Error processing file: {str(e)}"


async def handle_file_upload(msg_file, api_base_url, api_key=None):
    """
    Handle file upload and processing.

    Args:
        msg_file: Path of the uploaded .msg file as delivered by the file component
        api_base_url: Base URL for the API
        api_key: API key for Google API Gateway authentication

    Returns:
        Tuple of (api_result, pdf_path, status_message)
    """
    if not msg_file:
        return None, None, "Please upload a .msg file"

    # Process the email via API
    result, error = await process_email_file(Path(msg_file), api_base_url, api_key)
    if error:
        return None, None, error

    # Extract the PDF only after the API call succeeded, so a failed request
    # never leaves an orphaned temporary PDF on disk.
    pdf_path = extract_pdf_from_msg(msg_file)
    return result, pdf_path, "✅ Email processed successfully!"


def _escape(value) -> str:
    """Return *value* as text that is safe to embed in HTML markup."""
    return html.escape(str(value))


def _value_or_na(mapping: dict, key: str) -> str:
    """Return the escaped value stored under *key*, or 'N/A' when missing or null."""
    value = mapping.get(key)
    return "N/A" if value is None else _escape(value)


def _as_number(value):
    """Return *value* as a number, or 0 when it is missing or not numeric."""
    if isinstance(value, (int, float)):
        return value
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0


def _format_amount(value) -> str:
    """Format *value* with two decimals, falling back to its escaped text."""
    try:
        return f"{float(value):.2f}"
    except (TypeError, ValueError):
        return _escape(value)


def _format_address_card(addr_data: dict, title: str) -> str:
    """Render one address dictionary as an HTML card headed by *title*."""
    if not addr_data:
        return f"<div class='address-card'><h2>{title}</h2><p>No data available</p></div>"

    lines = ["<div class='address-card'>", f"<h2>{title}</h2>"]

    if addr_data.get("company_name"):
        lines.append(f"<p>🏢 Company: {_escape(addr_data['company_name'])}</p>")
    if addr_data.get("department"):
        lines.append(f"<p>🏬 Department: {_escape(addr_data['department'])}</p>")
    if addr_data.get("contact_person"):
        lines.append(f"<p>👤 Contact: {_escape(addr_data['contact_person'])}</p>")

    # Build address line
    address_parts = []
    if addr_data.get("street_address"):
        address_parts.append(_escape(addr_data["street_address"]))
    city_parts = [_escape(addr_data[key]) for key in ("postal_code", "city") if addr_data.get(key)]
    if city_parts:
        address_parts.append(" ".join(city_parts))
    if addr_data.get("country"):
        address_parts.append(_escape(addr_data["country"]))

    if address_parts:
        lines.append("<p>📍 Address:</p>")
        lines.append("<p>")
        lines.append("<br>".join(address_parts))
        lines.append("</p>")

    if addr_data.get("phone"):
        lines.append(f"<p>📞 Phone: {_escape(addr_data['phone'])}</p>")
    if addr_data.get("email"):
        lines.append(f"<p>📧 Email: {_escape(addr_data['email'])}</p>")

    # Add delivery instructions for shipping address
    if title == "Shipping Address" and addr_data.get("delivery_instructions"):
        lines.append(f"<p>📦 Delivery Instructions: {_escape(addr_data['delivery_instructions'])}</p>")

    lines.append("</div>")
    return "\n".join(lines)


def _format_db_match(item: dict) -> list:
    """Return the HTML lines describing the database match of one item."""
    db_details = item.get("db_details")
    if not db_details:
        return [
            "<div style='border-left: 4px solid #dc3545; padding-left: 12px; margin: 8px 0;'>",
            "<p>❌ Database Match: Not found</p>",
            "</div>",
        ]

    # A JSON null confidence is treated like a missing one.
    confidence = str(item.get("db_match_confidence") or "unknown")
    emoji, color = _CONFIDENCE_STYLES.get(confidence.lower(), _UNKNOWN_CONFIDENCE_STYLE)

    lines = [
        f"<div style='border-left: 4px solid {color}; padding-left: 12px; margin: 8px 0;'>",
        f"<p>{emoji} Database Match: {_escape(confidence.upper())}</p>",
        f"<p>Product Number: {_value_or_na(db_details, 'product_number')}</p>",
        f"<p>Manufacturer: {_value_or_na(db_details, 'manufacturer_name')}</p>",
        f"<p>Manufacturer Number: {_value_or_na(db_details, 'manufacturer_number')}</p>",
    ]
    if item.get("db_match_reason"):
        lines.append(f"<p>Match Reason: {_escape(item['db_match_reason'])}</p>")
    lines.append("</div>")
    return lines


def _format_items(items: list) -> str:
    """Render the order items, including their database match, as HTML."""
    if not items:
        return "<div class='items-section'><h2>📦 Order Items</h2><p>No items found</p></div>"

    lines = ["<div class='items-section'>", "<h2>📦 Order Items</h2>"]
    item_count = len(items)

    for index, item in enumerate(items, 1):
        name = item.get("name") or "Unknown Item"
        lines.append(f"<h3>Item {index}: {_escape(name)}</h3>")
        lines.append("<div>")

        # Basic item info in a more structured format
        if item.get("description"):
            lines.append(f"<p>Description: {_escape(item['description'])}</p>")
        lines.append(f"<p>Quantity: {_value_or_na(item, 'quantity')}</p>")
        if item.get("price_per_unit"):
            lines.append(f"<p>Price per unit: {_format_amount(item['price_per_unit'])}</p>")
        if item.get("total_price"):
            lines.append(f"<p>Total price: {_format_amount(item['total_price'])}</p>")

        # Database matching info with better styling
        lines.extend(_format_db_match(item))
        lines.append("</div>")

        if index < item_count:  # Don't add separator after last item
            lines.append("<hr>")

    lines.append("</div>")
    return "\n".join(lines)


def _format_stats(items: list, billing_addr: dict, shipping_addr: dict) -> str:
    """Render match-rate and order totals for the processed email as HTML."""
    total_items = len(items)
    items_with_db_match = sum(1 for item in items if item.get("db_details"))
    match_rate = f"{(items_with_db_match / total_items * 100):.1f}%" if total_items > 0 else "0%"

    # Calculate totals; missing, null or non-numeric entries count as 0.
    total_quantity = sum(_as_number(item.get("quantity")) for item in items)
    total_value = sum(_as_number(item.get("total_price")) for item in items)

    country = billing_addr.get("country") or shipping_addr.get("country") or "N/A"

    # No blank lines or indentation: the result is rendered by a Markdown
    # component and must stay a single HTML block.
    return "\n".join(
        [
            "<div class='stats-section'>",
            "<h2>📊 Processing Summary</h2>",
            "<h3>📦 Items Processing</h3>",
            f"<p>Total items extracted: {total_items}</p>",
            f"<p>Items matched in database: {items_with_db_match}</p>",
            f"<p>Database match rate: {match_rate}</p>",
            "<h3>📈 Order Summary</h3>",
            f"<p>Total quantity: {total_quantity} units</p>",
            f"<p>Total value: {total_value:.2f}</p>",
            "<p>(if prices available)</p>",
            "<h3>🏢 Companies</h3>",
            f"<p>Billing: {_escape(billing_addr.get('company_name') or 'N/A')}</p>",
            f"<p>Shipping: {_escape(shipping_addr.get('company_name') or 'N/A')}</p>",
            f"<p>Country: {_escape(country)}</p>",
            "<p>✅ Processing Status: Complete</p>",
            "</div>",
        ]
    )


def _format_extracted_data(json_result):
    """
    Format the extracted data for better display.

    Args:
        json_result: API response, either as a JSON string or an already parsed dict

    Returns:
        Tuple of (address_html, items_html, stats_html); on failure the first
        two entries are None and the third carries the error message.
    """
    if not json_result:
        return None, None, "No data processed yet"

    try:
        # The JSON component may hand over a parsed object or the raw string.
        data = json.loads(json_result) if isinstance(json_result, str) else json_result

        # "or" (not a .get default) so that explicit JSON nulls are tolerated too.
        address_info = data.get("address") or {}
        billing_addr = address_info.get("billing_address") or {}
        shipping_addr = address_info.get("shipping_address") or {}
        items_info = data.get("items") or []

        billing_html = _format_address_card(billing_addr, "Billing Address")
        shipping_html = _format_address_card(shipping_addr, "Shipping Address")
        address_html = f"{billing_html}\n\n{shipping_html}"

        items_html = _format_items(items_info)
        stats_html = _format_stats(items_info, billing_addr, shipping_addr)

        return address_html, items_html, stats_html

    except Exception as e:
        # UI boundary: report malformed payloads instead of crashing the handler.
        return None, None, f"Error parsing extracted data: {str(e)}"


def _auth_status_message(api_key=None):
    """Return the Markdown status line for the current API key input."""
    if api_key and api_key.strip():
        return "🔐 Authentication: **Active** ✅"
    return "⚠️ **API key required** - Please enter your API key above"


def create_ui():
    """
    Create the Gradio UI.

    Returns:
        The assembled gr.Blocks application (not yet launched)
    """
    with gr.Blocks(title="Email Order Processor", css=_CUSTOM_CSS) as demo:
        gr.Markdown("# 📧 Email Order Processor")
        gr.Markdown(_INSTRUCTIONS_MD)

        # API Configuration Section (Simplified for external users)
        with gr.Row(), gr.Column(elem_classes=["api-config"]):
            gr.Markdown("### 🔐 Authentication")
            gr.Markdown("*Connected to: email-order-processor-04i60gfy4hofo.apigateway.ax-logistics-ai.cloud.goog*")

            # Hidden URL field that always contains the hardcoded API Gateway URL
            api_base_url = gr.Textbox(value=API_GATEWAY_URL, visible=False)

            # API Key input - always visible for external users
            api_key_input = gr.Textbox(
                label="API Key",
                type="password",
                placeholder="Enter your API key to access the service",
                info="Please enter the API key provided by your administrator",
                elem_classes=["api-key-input"],
            )

            # Show auth status
            auth_status = gr.Markdown("", elem_classes=["status-text"])

        # Update auth status when API key changes
        api_key_input.change(fn=_auth_status_message, inputs=[api_key_input], outputs=[auth_status])

        # Set initial auth status
        demo.load(fn=_auth_status_message, inputs=[api_key_input], outputs=[auth_status])

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### Upload Email File")
                msg_file = gr.File(
                    label="Email Message File (.msg)",
                    file_types=[".msg"],
                    file_count="single",
                    elem_classes=["upload-area"],
                )
                process_btn = gr.Button("🚀 Process Email", variant="primary", size="lg")
                status_text = gr.Markdown("", elem_classes=["status-text"])

            with gr.Column(scale=2):
                gr.Markdown("### Raw API Response")
                result_json = gr.JSON(label="Purchase Order Information", show_label=False)

        # Side-by-side comparison section
        with gr.Row():
            with gr.Column(scale=1, elem_classes=["comparison-container"]):
                gr.Markdown("### 📄 Original PDF Document")
                pdf_viewer = PDF(label="Purchase Order PDF", height=600, elem_classes=["pdf-container"])

            with gr.Column(scale=1, elem_classes=["comparison-container"]):
                gr.Markdown("### 📊 Extracted Data Summary")

                # Create formatted display components
                with gr.Accordion("🏢 Address Information", open=True):
                    address_display = gr.Markdown(label="Address Details")
                with gr.Accordion("📦 Items Information", open=True):
                    items_display = gr.Markdown(label="Items Details")
                with gr.Accordion("🔍 Processing Statistics", open=False):
                    stats_display = gr.Markdown("No data processed yet")

        # Wire up the events
        process_btn.click(
            fn=handle_file_upload,
            inputs=[msg_file, api_base_url, api_key_input],
            outputs=[result_json, pdf_viewer, status_text],
            show_progress="full",
        )

        # Update the formatted displays when result changes
        result_json.change(
            fn=_format_extracted_data,
            inputs=[result_json],
            outputs=[address_display, items_display, stats_display],
        )

    return demo


if __name__ == "__main__":
    demo = create_ui()
    demo.launch()