Spaces:
Sleeping
Sleeping
import base64
import html
import json
import tempfile
from pathlib import Path
from typing import Optional

import gradio as gr
import httpx
from extract_msg import Message as MsgMessage
from gradio_pdf import PDF
# Base URL of the deployed API Gateway, already including the /api/v1 prefix.
# Hardcoded so that external stakeholders do not have to configure an endpoint;
# the UI passes it along through a hidden textbox.
API_GATEWAY_URL = "https://email-order-processor-gateway-9jipw94n.ew.gateway.dev/api/v1"
def get_auth_headers(api_key: Optional[str] = None) -> dict:
    """
    Build the HTTP headers sent with every API Gateway request.

    Args:
        api_key: API key for Google API Gateway authentication

    Returns:
        Dictionary with the JSON content type and, when a key was given,
        the ``x-api-key`` header
    """
    # Guard clause: without a key we only warn and send the bare JSON header.
    if not api_key:
        print("Warning: No API key provided")
        return {"Content-Type": "application/json"}

    return {"Content-Type": "application/json", "x-api-key": api_key}
def extract_pdf_from_msg(msg_file_path: str) -> Optional[str]:
    """
    Extract the first PDF attachment of a .msg file into a temporary file.

    Args:
        msg_file_path: Path to the .msg file

    Returns:
        Path to the extracted PDF file, or None if no usable PDF attachment
        was found or the message could not be read. The temporary file is
        not deleted automatically; the caller owns it.
    """
    try:
        email_msg = MsgMessage(msg_file_path)
        try:
            # Look for PDF attachments
            for attachment in email_msg.attachments:
                filename = attachment.longFilename
                if not (isinstance(filename, str) and filename.lower().endswith(".pdf")):
                    continue

                # Validate the payload BEFORE creating a file, so a skipped
                # attachment does not leave an empty temp file behind.
                pdf_data = attachment.data
                if not isinstance(pdf_data, bytes):
                    # Handle other data types - this shouldn't happen normally with PDF data
                    print(f"Warning: PDF data is not bytes, type: {type(pdf_data)}")
                    continue

                # NamedTemporaryFile creates and opens the file in one step,
                # unlike the deprecated, race-prone tempfile.mktemp().
                with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_pdf:
                    temp_pdf.write(pdf_data)
                    temp_pdf_path = temp_pdf.name
                return temp_pdf_path

            return None
        finally:
            # Release the underlying .msg file handle in every case.
            email_msg.close()
    except Exception as e:
        # Best-effort: the PDF preview is optional, so any failure is reported
        # and turned into "no PDF" instead of aborting the upload.
        print(f"Error extracting PDF from .msg file: {e}")
        return None
async def process_email_file(
    msg_file: Path, api_base_url: str, api_key: Optional[str] = None
) -> tuple[Optional[str], Optional[str]]:
    """
    Process a .msg email file by sending it to the API.

    The file is base64-encoded and POSTed as JSON to
    ``<api_base_url>/process-email``.

    Args:
        msg_file: Path to the .msg file
        api_base_url: Base URL for the API; surrounding whitespace and
            trailing slashes are ignored and ``/api/v1`` is appended if missing
        api_key: API key for Google API Gateway authentication

    Returns:
        Tuple of (success_response, error_message). Exactly one of the two is
        set: on success the pretty-printed JSON response and None, otherwise
        None and a human-readable error message.
    """
    if not msg_file:
        return None, "Please upload a .msg file"

    if not api_base_url or not api_base_url.strip():
        return None, "Please provide a valid API base URL"

    # Normalise first, then ensure the /api/v1 suffix. Stripping trailing
    # slashes up front keeps ".../api/v1/" from becoming ".../api/v1/api/v1".
    api_base_url = api_base_url.strip().rstrip("/")
    if not api_base_url.endswith("/api/v1"):
        api_base_url += "/api/v1"

    try:
        # Read and encode the file as base64
        file_content = msg_file.read_bytes()
        base64_content = base64.b64encode(file_content).decode("utf-8")

        # Prepare the request
        request_data = {"email_msg_base64": base64_content}

        # Send request to API
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.post(
                f"{api_base_url}/process-email", json=request_data, headers=get_auth_headers(api_key)
            )

        if response.status_code == 200:
            result = response.json()
            formatted_result = json.dumps(result, indent=2, ensure_ascii=False)
            return formatted_result, None

        # Error path: extract the most useful detail without letting a
        # malformed body hide the HTTP status code.
        error_detail = response.text
        if response.headers.get("content-type", "").startswith("application/json"):
            try:
                payload = response.json()
            except ValueError:
                payload = None  # declared JSON but not parseable; keep raw text
            if isinstance(payload, dict):
                # "detail" is what the original code read; "message" is a
                # fallback for gateway-level errors (assumed envelope - TODO confirm).
                error_detail = payload.get("detail") or payload.get("message") or "Unknown error"
            elif payload is not None:
                error_detail = payload
        return None, f"API Error ({response.status_code}): {error_detail}"

    except httpx.TimeoutException:
        return None, "Request timed out. The file might be too large or the server is busy."
    except httpx.ConnectError:
        return (
            None,
            f"Could not connect to API at {api_base_url}. Please check the URL and ensure the server is running.",
        )
    except Exception as e:
        # UI boundary: report anything unexpected as a message instead of raising.
        return None, f"Error processing file: {str(e)}"
async def handle_file_upload(msg_file, api_base_url, api_key=None):
    """
    Handle file upload and processing.

    Args:
        msg_file: Path of the uploaded .msg file (falsy when nothing was uploaded)
        api_base_url: Base URL for the API
        api_key: Optional API key forwarded to the API call

    Returns:
        Tuple of (api_result, pdf_path, status_message). On any failure the
        first two entries are None and the message describes the problem.
    """
    if not msg_file:
        return None, None, "Please upload a .msg file"

    # Process the email via API
    result, error = await process_email_file(Path(msg_file), api_base_url, api_key)
    if error:
        return None, None, error

    # Extract the PDF only after the API call succeeded: the extracted file is
    # a temp file that is only ever returned on success, so extracting it up
    # front would orphan it on every failed request.
    pdf_path = extract_pdf_from_msg(msg_file)
    return result, pdf_path, "β Email processed successfully!"
# Badge emoji / accent colour per database match confidence level.
_CONFIDENCE_EMOJI = {"exact": "β ", "high": "π’", "medium": "π‘", "low": "π "}
_CONFIDENCE_COLOR = {
    "exact": "#28a745",
    "high": "#17a2b8",
    "medium": "#ffc107",
    "low": "#fd7e14",
}


def _esc(value) -> str:
    """Return *value* as text that is safe to embed in HTML (content or attribute)."""
    return html.escape(str(value), quote=True)


def _as_dict(value) -> dict:
    """Return *value* if it is a dict, otherwise an empty dict (covers JSON null)."""
    return value if isinstance(value, dict) else {}


def _as_number(value):
    """Coerce *value* to a number for summing; anything non-numeric counts as 0."""
    if isinstance(value, (int, float)):
        return value
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0


def _format_amount(value) -> str:
    """Format a price with two decimals; non-numeric values are shown escaped as-is."""
    try:
        return f"{float(value):.2f}"
    except (TypeError, ValueError):
        return _esc(value)


def _format_address_card(addr_data, title):
    """Render one address dict as an HTML card headed by *title*."""
    if not addr_data:
        return f"<div class='address-card'><h2>{title}</h2><p><em>No data available</em></p></div>"

    lines = ["<div class='address-card'>", f"<h2>{title}</h2>"]

    if addr_data.get("company_name"):
        lines.append(f"<p><strong>π’ Company:</strong> {_esc(addr_data['company_name'])}</p>")
    if addr_data.get("department"):
        lines.append(f"<p><strong>π¬ Department:</strong> {_esc(addr_data['department'])}</p>")
    if addr_data.get("contact_person"):
        lines.append(f"<p><strong>π€ Contact:</strong> {_esc(addr_data['contact_person'])}</p>")

    # Build address line; _esc() also stringifies, so numeric postal codes are fine.
    address_parts = []
    if addr_data.get("street_address"):
        address_parts.append(_esc(addr_data["street_address"]))
    city_parts = [_esc(addr_data[key]) for key in ("postal_code", "city") if addr_data.get(key)]
    if city_parts:
        address_parts.append(" ".join(city_parts))
    if addr_data.get("country"):
        address_parts.append(_esc(addr_data["country"]))

    if address_parts:
        lines.append("<p><strong>π Address:</strong></p>")
        lines.append("<p style='margin-left: 20px; font-style: italic;'>")
        lines.append("<br>".join(address_parts))
        lines.append("</p>")

    if addr_data.get("phone"):
        lines.append(f"<p><strong>π Phone:</strong> {_esc(addr_data['phone'])}</p>")
    if addr_data.get("email"):
        email = _esc(addr_data["email"])
        lines.append(f"<p><strong>π§ Email:</strong> <a href='mailto:{email}'>{email}</a></p>")

    # Add delivery instructions for shipping address
    if title == "Shipping Address" and addr_data.get("delivery_instructions"):
        lines.append(
            "<p><strong>π¦ Delivery Instructions:</strong> "
            f"<em>{_esc(addr_data['delivery_instructions'])}</em></p>"
        )

    lines.append("</div>")
    return "\n".join(lines)


def _format_items_table(items):
    """Render the list of order item dicts as an HTML section."""
    if not items:
        return "<div class='items-section'><h2>π¦ Order Items</h2><p><em>No items found</em></p></div>"

    lines = ["<div class='items-section'>", "<h2>π¦ Order Items</h2>"]

    for i, item in enumerate(items, 1):
        lines.append(f"<h3>Item {i}: {_esc(item.get('name') or 'Unknown Item')}</h3>")
        lines.append("<div style='margin-left: 15px;'>")

        # Basic item info
        if item.get("description"):
            lines.append(f"<p><strong>Description:</strong> {_esc(item['description'])}</p>")

        quantity = item.get("quantity")
        lines.append(
            "<p><strong>Quantity:</strong> <span style='font-size: 1.1em; color: var(--color-accent);'>"
            f"{_esc('N/A' if quantity is None else quantity)}</span></p>"
        )

        if item.get("price_per_unit"):
            lines.append(
                "<p><strong>Price per unit:</strong> <span style='color: var(--color-accent);'>"
                f"{_format_amount(item['price_per_unit'])}</span></p>"
            )
        if item.get("total_price"):
            lines.append(
                "<p><strong>Total price:</strong> <span style='font-weight: bold; color: var(--color-accent);'>"
                f"{_format_amount(item['total_price'])}</span></p>"
            )

        # Database matching info
        if item.get("db_details"):
            db_details = _as_dict(item["db_details"])
            # The key may be present with a null value, so .get()'s default is not enough.
            confidence = str(item.get("db_match_confidence") or "unknown")
            level = confidence.lower()
            confidence_emoji = _CONFIDENCE_EMOJI.get(level, "β")
            confidence_color = _CONFIDENCE_COLOR.get(level, "#6c757d")

            lines.append(
                "<div style='background: var(--background-fill-secondary); padding: 12px; "
                f"border-radius: 6px; margin: 8px 0; border-left: 4px solid {confidence_color};'>"
            )
            lines.append(
                f"<p><strong>{confidence_emoji} Database Match:</strong> "
                f"<span style='color: {confidence_color}; font-weight: bold;'>{_esc(confidence.upper())}</span></p>"
            )
            lines.append(
                f"<p><strong>Product Number:</strong> <code>{_esc(db_details.get('product_number', 'N/A'))}</code></p>"
            )
            lines.append(f"<p><strong>Manufacturer:</strong> {_esc(db_details.get('manufacturer_name', 'N/A'))}</p>")
            lines.append(
                "<p><strong>Manufacturer Number:</strong> "
                f"<code>{_esc(db_details.get('manufacturer_number', 'N/A'))}</code></p>"
            )
            if item.get("db_match_reason"):
                lines.append(f"<p><strong>Match Reason:</strong> <em>{_esc(item['db_match_reason'])}</em></p>")
            lines.append("</div>")
        else:
            lines.append(
                "<div style='background: #f8d7da; padding: 12px; border-radius: 6px; "
                "margin: 8px 0; border-left: 4px solid #dc3545;'>"
            )
            lines.append(
                "<p><strong>β Database Match:</strong> "
                "<span style='color: #dc3545; font-weight: bold;'>Not found</span></p>"
            )
            lines.append("</div>")

        lines.append("</div>")

        if i < len(items):  # Don't add separator after last item
            lines.append("<hr>")

    lines.append("</div>")
    return "\n".join(lines)


def _format_stats(items, billing_addr, shipping_addr):
    """Render summary statistics (match rate, totals, companies) as an HTML section."""
    total_items = len(items)
    items_with_db_match = sum(1 for item in items if item.get("db_details"))

    # Round to the displayed precision so the colour always agrees with the text.
    match_pct = round(items_with_db_match / total_items * 100, 1) if total_items else 0.0
    match_rate = f"{match_pct:.1f}%" if total_items else "0%"
    if match_pct > 70:
        match_color = "#28a745"
    elif match_pct > 30:
        match_color = "#ffc107"
    else:
        match_color = "#dc3545"

    # Totals tolerate missing, null or non-numeric values.
    total_quantity = sum(_as_number(item.get("quantity")) for item in items)
    total_value = sum(_as_number(item.get("total_price")) for item in items)

    billing_company = _esc(billing_addr.get("company_name") or "N/A")
    shipping_company = _esc(shipping_addr.get("company_name") or "N/A")
    country = _esc(billing_addr.get("country") or shipping_addr.get("country") or "N/A")

    card = "<div style='background: var(--background-fill-secondary); padding: 12px; border-radius: 8px;'>"
    heading = "<h4 style='margin-top: 0; color: var(--color-accent);'>"

    # Lines are emitted without leading indentation so Markdown does not treat
    # any part of the HTML as an indented code block.
    lines = [
        "<div class='stats-section'>",
        "<h2>π Processing Summary</h2>",
        "<div style='display: grid; grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); gap: 16px;'>",
        card,
        f"{heading}π¦ Items Processing</h4>",
        "<p><strong>Total items extracted:</strong> "
        f"<span style='font-size: 1.2em; color: var(--color-accent);'>{total_items}</span></p>",
        f"<p><strong>Items matched in database:</strong> <span style='color: #28a745;'>{items_with_db_match}</span></p>",
        "<p><strong>Database match rate:</strong> "
        f"<span style='font-weight: bold; color: {match_color};'>{match_rate}</span></p>",
        "</div>",
        card,
        f"{heading}π Order Summary</h4>",
        "<p><strong>Total quantity:</strong> "
        f"<span style='font-size: 1.2em; color: var(--color-accent);'>{total_quantity}</span> units</p>",
        "<p><strong>Total value:</strong> "
        f"<span style='font-weight: bold; color: var(--color-accent);'>{total_value:.2f}</span></p>",
        "<p><em style='font-size: 0.9em; color: var(--text-color-subdued);'>(if prices available)</em></p>",
        "</div>",
        card,
        f"{heading}π’ Companies</h4>",
        f"<p><strong>Billing:</strong> {billing_company}</p>",
        f"<p><strong>Shipping:</strong> {shipping_company}</p>",
        f"<p><strong>Country:</strong> {country}</p>",
        "</div>",
        "</div>",
        "<div style='text-align: center; margin-top: 20px; padding: 12px; background: #d4edda; "
        "border-radius: 8px; border: 1px solid #c3e6cb;'>",
        "<p style='margin: 0; color: #155724; font-weight: bold;'>β Processing Status: Complete</p>",
        "</div>",
        "</div>",
    ]
    return "\n".join(lines)


def format_extracted_data(json_result):
    """
    Format the extracted data for better display.

    All values taken from the API response are HTML-escaped before being
    embedded in markup, because they originate from email/PDF content.

    Args:
        json_result: API response as a dict or as a JSON string; falsy when
            nothing has been processed yet

    Returns:
        Tuple of (address_html, items_html, stats_html). When there is no
        data or it cannot be parsed, the first two entries are None and the
        third carries a status or error message.
    """
    if not json_result:
        return None, None, "No data processed yet"

    try:
        data = json.loads(json_result) if isinstance(json_result, str) else json_result
        if not isinstance(data, dict):
            raise TypeError(f"expected a JSON object, got {type(data).__name__}")

        # Extract address information; keys may be missing or null.
        address_info = _as_dict(data.get("address"))
        billing_addr = _as_dict(address_info.get("billing_address"))
        shipping_addr = _as_dict(address_info.get("shipping_address"))

        billing_html = _format_address_card(billing_addr, "Billing Address")
        shipping_html = _format_address_card(shipping_addr, "Shipping Address")
        address_html = f"{billing_html}\n\n{shipping_html}"

        # Extract and format items information; non-dict entries are ignored.
        items_info = [item for item in (data.get("items") or []) if isinstance(item, dict)]
        items_html = _format_items_table(items_info)

        stats_html = _format_stats(items_info, billing_addr, shipping_addr)
        return address_html, items_html, stats_html

    except Exception as e:
        # UI boundary: show the problem in the statistics panel instead of raising.
        return None, None, f"Error parsing extracted data: {str(e)}"


def create_ui():
    """
    Create the Gradio UI.

    Builds the page (API key input, .msg upload, raw JSON result, PDF viewer
    and formatted summary panels) and wires the event handlers.

    Returns:
        The configured ``gr.Blocks`` application, not yet launched.
    """
    # NOTE(review): the emoji in the UI strings of this file look mis-encoded
    # (UTF-8 bytes rendered through a Greek code page). They are kept as found;
    # restore them from the original source if available.
    with gr.Blocks(
        title="Email Order Processor",
        css="""
        .upload-area {
            border: 2px dashed #ccc;
            border-radius: 10px;
            padding: 20px;
            text-align: center;
            margin: 10px 0;
        }
        .status-success {
            color: #28a745;
            font-weight: bold;
        }
        .status-error {
            color: #dc3545;
            font-weight: bold;
        }
        .pdf-container {
            border: 1px solid #ddd;
            border-radius: 8px;
            padding: 10px;
            background-color: #f9f9f9;
        }
        .comparison-container {
            border: 1px solid var(--border-color-primary);
            border-radius: 12px;
            padding: 20px;
            margin: 15px 5px;
            background-color: var(--background-fill-secondary);
            box-shadow: 0 2px 8px rgba(0,0,0,0.1);
        }
        .api-config {
            background-color: var(--background-fill-secondary);
            border: 1px solid var(--border-color-primary);
            border-radius: 8px;
            padding: 15px;
            margin-bottom: 20px;
        }
        .api-key-input {
            margin-top: 10px;
            border: 2px solid var(--color-accent-soft);
            border-radius: 6px;
            background-color: var(--background-fill-primary);
        }
        /* Custom styling for markdown content */
        .address-card {
            background: var(--background-fill-primary);
            border: 1px solid var(--border-color-accent);
            border-radius: 8px;
            padding: 16px;
            margin: 8px 0;
        }
        .address-card h2 {
            color: var(--color-accent);
            margin-top: 0;
            margin-bottom: 12px;
            font-size: 1.2em;
            border-bottom: 2px solid var(--border-color-accent);
            padding-bottom: 4px;
        }
        .address-card p {
            margin: 6px 0;
            line-height: 1.4;
        }
        .items-section {
            background: var(--background-fill-primary);
            border-radius: 8px;
            padding: 16px;
        }
        .items-section h2 {
            color: var(--color-accent);
            border-bottom: 2px solid var(--border-color-accent);
            padding-bottom: 8px;
            margin-bottom: 16px;
        }
        .items-section h3 {
            background: var(--background-fill-secondary);
            padding: 8px 12px;
            border-radius: 6px;
            margin: 16px 0 8px 0;
            border-left: 4px solid var(--color-accent);
        }
        .items-section hr {
            border: none;
            height: 1px;
            background: var(--border-color-primary);
            margin: 16px 0;
        }
        .stats-section {
            background: var(--background-fill-primary);
            border-radius: 8px;
            padding: 16px;
        }
        .stats-section h2 {
            color: var(--color-accent);
            border-bottom: 2px solid var(--border-color-accent);
            padding-bottom: 8px;
            margin-bottom: 16px;
        }
        """,
    ) as demo:
        gr.Markdown("# π§ Email Order Processor")
        gr.Markdown(
            """
            Upload a **.msg** email file containing purchase order information to extract structured data.

            **Instructions:**
            1. Enter your API key below
            2. Select a `.msg` file (Outlook email format)
            3. Click 'Process Email' to analyze the file
            4. View the extracted purchase order data and original PDF side by side
            """
        )

        # API Configuration Section (Simplified for external users)
        with gr.Row(), gr.Column(elem_classes=["api-config"]):
            gr.Markdown("### π Authentication")
            # NOTE(review): this host name differs from the host in API_GATEWAY_URL;
            # confirm which one should be shown to users.
            gr.Markdown("*Connected to: email-order-processor-04i60gfy4hofo.apigateway.ax-logistics-ai.cloud.goog*")

            # Hidden URL field that always contains the hardcoded API Gateway URL
            api_base_url = gr.Textbox(value=API_GATEWAY_URL, visible=False)

            # API Key input - always visible for external users
            api_key_input = gr.Textbox(
                label="API Key",
                type="password",
                placeholder="Enter your API key to access the service",
                info="Please enter the API key provided by your administrator",
                elem_classes=["api-key-input"],
            )

            # Show auth status
            def update_auth_status(api_key=None):
                """Return the status line for the current API key input."""
                if api_key and api_key.strip():
                    return "π Authentication: **Active** β "
                return "β οΈ **API key required** - Please enter your API key above"

            auth_status = gr.Markdown("", elem_classes=["status-text"])

            # Update auth status when API key changes
            api_key_input.change(fn=update_auth_status, inputs=[api_key_input], outputs=[auth_status])

            # Set initial auth status
            demo.load(fn=update_auth_status, inputs=[api_key_input], outputs=[auth_status])

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### Upload Email File")
                msg_file = gr.File(
                    label="Email Message File (.msg)",
                    file_types=[".msg"],
                    file_count="single",
                    elem_classes=["upload-area"],
                )
                process_btn = gr.Button("π Process Email", variant="primary", size="lg")
                status_text = gr.Markdown("", elem_classes=["status-text"])

            with gr.Column(scale=2):
                gr.Markdown("### Raw API Response")
                result_json = gr.JSON(label="Purchase Order Information", show_label=False)

        # Side-by-side comparison section
        with gr.Row():
            with gr.Column(scale=1, elem_classes=["comparison-container"]):
                gr.Markdown("### π Original PDF Document")
                pdf_viewer = PDF(label="Purchase Order PDF", height=600, elem_classes=["pdf-container"])

            with gr.Column(scale=1, elem_classes=["comparison-container"]):
                gr.Markdown("### π Extracted Data Summary")

                # Create formatted display components
                with gr.Accordion("π’ Address Information", open=True):
                    address_display = gr.Markdown(label="Address Details")
                with gr.Accordion("π¦ Items Information", open=True):
                    items_display = gr.Markdown(label="Items Details")
                with gr.Accordion("π Processing Statistics", open=False):
                    stats_display = gr.Markdown("No data processed yet")

        # Wire up the events
        process_btn.click(
            fn=handle_file_upload,
            inputs=[msg_file, api_base_url, api_key_input],
            outputs=[result_json, pdf_viewer, status_text],
            show_progress="full",
        )

        # Update the formatted displays when result changes
        result_json.change(
            fn=format_extracted_data, inputs=[result_json], outputs=[address_display, items_display, stats_display]
        )

    return demo
# Build and launch the app only when this file is executed as a script,
# so importing the module does not start a server.
if __name__ == "__main__":
    demo = create_ui()
    demo.launch()