# Source: app.py, commit 265f352 ("Update app.py", verified), author Jarv1s10.
import base64
import html
import json
import tempfile
from pathlib import Path
from typing import Optional

import gradio as gr
import httpx
from extract_msg import Message as MsgMessage
from gradio_pdf import PDF
# Base URL of the API Gateway, already including the /api/v1 prefix.
# Hardcoded so external stakeholders do not have to configure an endpoint.
API_GATEWAY_URL = "https://email-order-processor-gateway-9jipw94n.ew.gateway.dev/api/v1"
def get_auth_headers(api_key: Optional[str] = None) -> dict:
    """
    Get authorization headers for the API Gateway.

    Surrounding whitespace (e.g. a trailing newline from copy/paste) is removed
    from the key, and a key that is empty after stripping is treated as missing
    so that blank input is never sent as a credential.

    Args:
        api_key: API key for Google API Gateway authentication

    Returns:
        Dictionary with the JSON content type and, when a non-blank key was
        supplied, the ``x-api-key`` header.
    """
    headers = {"Content-Type": "application/json"}

    cleaned_key = api_key.strip() if api_key else ""
    if cleaned_key:
        headers["x-api-key"] = cleaned_key
    else:
        print("Warning: No API key provided")

    return headers
def extract_pdf_from_msg(msg_file_path: str) -> Optional[str]:
    """
    Extract the first usable PDF attachment from a .msg file.

    The PDF is written to a new temporary file that is NOT deleted
    automatically; the caller owns the returned path.

    Args:
        msg_file_path: Path to the .msg file

    Returns:
        Path to the extracted PDF file, or None if no PDF was found or the
        message could not be read.
    """
    try:
        email_msg = MsgMessage(msg_file_path)
        try:
            # Look for PDF attachments
            for attachment in email_msg.attachments:
                filename = attachment.longFilename
                if not (isinstance(filename, str) and filename.lower().endswith(".pdf")):
                    continue

                # Validate the payload before creating a file so that a bad
                # attachment does not leave an empty temp file behind.
                pdf_data = attachment.data
                if not isinstance(pdf_data, bytes):
                    # Handle other data types - this shouldn't happen normally with PDF data
                    print(f"Warning: PDF data is not bytes, type: {type(pdf_data)}")
                    continue

                # NamedTemporaryFile creates the file atomically, unlike the
                # deprecated and race-prone tempfile.mktemp.
                with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_pdf:
                    temp_pdf.write(pdf_data)
                return temp_pdf.name

            return None
        finally:
            # Release the underlying file handle held by the parsed message.
            email_msg.close()
    except Exception as e:
        # Best-effort helper: the PDF preview is optional, so report and move on.
        print(f"Error extracting PDF from .msg file: {e}")
        return None
async def process_email_file(
    msg_file: Path, api_base_url: str, api_key: Optional[str] = None
) -> tuple[Optional[str], Optional[str]]:
    """
    Process a .msg email file by sending it to the API.

    The file is read, base64-encoded and POSTed as JSON to
    ``<api_base_url>/process-email``.

    Args:
        msg_file: Path to the .msg file
        api_base_url: Base URL for the API; ``/api/v1`` is appended when missing
        api_key: API key for Google API Gateway authentication

    Returns:
        Tuple of (success_response, error_message). Exactly one of the two is
        None: on success the first element is the pretty-printed JSON response,
        otherwise the second element is a human-readable error.
    """
    if not msg_file:
        return None, "Please upload a .msg file"

    if not api_base_url.strip():
        return None, "Please provide a valid API base URL"

    # Normalise before checking the suffix: without this, ".../api/v1/" would
    # not match and would be turned into ".../api/v1/api/v1".
    api_base_url = api_base_url.strip().rstrip("/")
    if not api_base_url.endswith("/api/v1"):
        api_base_url += "/api/v1"

    try:
        # Read and encode the file as base64
        file_content = msg_file.read_bytes()
        base64_content = base64.b64encode(file_content).decode("utf-8")

        # Prepare the request
        request_data = {"email_msg_base64": base64_content}

        # Send request to API
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.post(
                f"{api_base_url}/process-email", json=request_data, headers=get_auth_headers(api_key)
            )

        if response.status_code == 200:
            result = response.json()
            formatted_result = json.dumps(result, indent=2, ensure_ascii=False)
            return formatted_result, None

        # Fall back to the raw body unless the server sent a JSON object; this
        # keeps the HTTP status in the message even for malformed error bodies.
        error_detail = response.text
        if response.headers.get("content-type", "").startswith("application/json"):
            try:
                payload = response.json()
            except ValueError:
                payload = None
            if isinstance(payload, dict):
                error_detail = payload.get("detail", "Unknown error")
        return None, f"API Error ({response.status_code}): {error_detail}"

    except httpx.TimeoutException:
        return None, "Request timed out. The file might be too large or the server is busy."
    except httpx.ConnectError:
        return (
            None,
            f"Could not connect to API at {api_base_url}. Please check the URL and ensure the server is running.",
        )
    except Exception as e:
        # UI boundary: surface anything unexpected as a message instead of crashing.
        return None, f"Error processing file: {str(e)}"
async def handle_file_upload(msg_file, api_base_url, api_key=None):
    """
    Handle file upload and processing.

    Args:
        msg_file: Path of the uploaded .msg file (falsy when nothing was uploaded)
        api_base_url: Base URL for the API
        api_key: Optional API key forwarded to the API call

    Returns:
        Tuple of (api_result, pdf_path, status_message). On failure the first
        two elements are None and the third carries the error text.
    """
    if not msg_file:
        return None, None, "Please upload a .msg file"

    # Process the email via API first: extracting the PDF writes a temporary
    # file, which would be orphaned if the API call then failed.
    result, error = await process_email_file(Path(msg_file), api_base_url, api_key)
    if error:
        return None, None, error

    # Extract PDF from .msg file (None when the email carries no PDF)
    pdf_path = extract_pdf_from_msg(msg_file)
    return result, pdf_path, "✅ Email processed successfully!"
# Custom stylesheet passed to gr.Blocks by create_ui().
_CUSTOM_CSS = """
.upload-area {
border: 2px dashed #ccc;
border-radius: 10px;
padding: 20px;
text-align: center;
margin: 10px 0;
}
.status-success {
color: #28a745;
font-weight: bold;
}
.status-error {
color: #dc3545;
font-weight: bold;
}
.pdf-container {
border: 1px solid #ddd;
border-radius: 8px;
padding: 10px;
background-color: #f9f9f9;
}
.comparison-container {
border: 1px solid var(--border-color-primary);
border-radius: 12px;
padding: 20px;
margin: 15px 5px;
background-color: var(--background-fill-secondary);
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}
.api-config {
background-color: var(--background-fill-secondary);
border: 1px solid var(--border-color-primary);
border-radius: 8px;
padding: 15px;
margin-bottom: 20px;
}
.api-key-input {
margin-top: 10px;
border: 2px solid var(--color-accent-soft);
border-radius: 6px;
background-color: var(--background-fill-primary);
}
/* Custom styling for markdown content */
.address-card {
background: var(--background-fill-primary);
border: 1px solid var(--border-color-accent);
border-radius: 8px;
padding: 16px;
margin: 8px 0;
}
.address-card h2 {
color: var(--color-accent);
margin-top: 0;
margin-bottom: 12px;
font-size: 1.2em;
border-bottom: 2px solid var(--border-color-accent);
padding-bottom: 4px;
}
.address-card p {
margin: 6px 0;
line-height: 1.4;
}
.items-section {
background: var(--background-fill-primary);
border-radius: 8px;
padding: 16px;
}
.items-section h2 {
color: var(--color-accent);
border-bottom: 2px solid var(--border-color-accent);
padding-bottom: 8px;
margin-bottom: 16px;
}
.items-section h3 {
background: var(--background-fill-secondary);
padding: 8px 12px;
border-radius: 6px;
margin: 16px 0 8px 0;
border-left: 4px solid var(--color-accent);
}
.items-section hr {
border: none;
height: 1px;
background: var(--border-color-primary);
margin: 16px 0;
}
.stats-section {
background: var(--background-fill-primary);
border-radius: 8px;
padding: 16px;
}
.stats-section h2 {
color: var(--color-accent);
border-bottom: 2px solid var(--border-color-accent);
padding-bottom: 8px;
margin-bottom: 16px;
}
"""

# Badge (emoji, colour) shown for each database match confidence level.
_CONFIDENCE_STYLES = {
    "exact": ("✅", "#28a745"),
    "high": ("🟢", "#17a2b8"),
    "medium": ("🟡", "#ffc107"),
    "low": ("🟠", "#fd7e14"),
}
_UNKNOWN_CONFIDENCE_STYLE = ("❓", "#6c757d")


def _esc(value) -> str:
    """Return *value* as text that is safe to embed in HTML content or attributes."""
    return html.escape(str(value), quote=True)


def _as_dict(value) -> dict:
    """Return *value* if it is a dict, otherwise an empty dict (e.g. for JSON null)."""
    return value if isinstance(value, dict) else {}


def _to_number(value):
    """Coerce a JSON value to a number for totals; anything non-numeric counts as 0."""
    if isinstance(value, bool):
        return 0
    if isinstance(value, (int, float)):
        return value
    if isinstance(value, str):
        for convert in (int, float):
            try:
                return convert(value)
            except ValueError:
                continue
    return 0


def _format_price(value) -> str:
    """Format a price with two decimals, falling back to escaped text if not numeric."""
    try:
        return f"{float(value):.2f}"
    except (TypeError, ValueError):
        return _esc(value)


def _format_address_card(addr_data: dict, title: str) -> str:
    """Render one address dict as an HTML card headed by *title*."""
    if not addr_data:
        return f"<div class='address-card'><h2>{title}</h2><p><em>No data available</em></p></div>"

    lines = ["<div class='address-card'>", f"<h2>{title}</h2>"]

    if addr_data.get("company_name"):
        lines.append(f"<p><strong>🏢 Company:</strong> {_esc(addr_data['company_name'])}</p>")
    if addr_data.get("department"):
        lines.append(f"<p><strong>🏬 Department:</strong> {_esc(addr_data['department'])}</p>")
    if addr_data.get("contact_person"):
        lines.append(f"<p><strong>👤 Contact:</strong> {_esc(addr_data['contact_person'])}</p>")

    # Build address line: street, then "postal_code city", then country.
    address_parts = []
    if addr_data.get("street_address"):
        address_parts.append(_esc(addr_data["street_address"]))
    city_parts = [_esc(addr_data[key]) for key in ("postal_code", "city") if addr_data.get(key)]
    if city_parts:
        address_parts.append(" ".join(city_parts))
    if addr_data.get("country"):
        address_parts.append(_esc(addr_data["country"]))

    if address_parts:
        lines.append("<p><strong>📍 Address:</strong></p>")
        lines.append("<p style='margin-left: 20px; font-style: italic;'>")
        lines.append("<br>".join(address_parts))
        lines.append("</p>")

    if addr_data.get("phone"):
        lines.append(f"<p><strong>📞 Phone:</strong> {_esc(addr_data['phone'])}</p>")
    if addr_data.get("email"):
        email = _esc(addr_data["email"])
        lines.append(f"<p><strong>📧 Email:</strong> <a href='mailto:{email}'>{email}</a></p>")

    # Add delivery instructions for shipping address
    if title == "Shipping Address" and addr_data.get("delivery_instructions"):
        lines.append(
            f"<p><strong>📦 Delivery Instructions:</strong> <em>{_esc(addr_data['delivery_instructions'])}</em></p>"
        )

    lines.append("</div>")
    return "\n".join(lines)


def _format_items_table(items: list) -> str:
    """Render the list of order item dicts as an HTML section."""
    if not items:
        return "<div class='items-section'><h2>📦 Order Items</h2><p><em>No items found</em></p></div>"

    lines = ["<div class='items-section'>", "<h2>📦 Order Items</h2>"]
    for i, item in enumerate(items, 1):
        lines.append(f"<h3>Item {i}: {_esc(item.get('name') or 'Unknown Item')}</h3>")
        lines.append("<div style='margin-left: 15px;'>")

        # Basic item info
        if item.get("description"):
            lines.append(f"<p><strong>Description:</strong> {_esc(item['description'])}</p>")
        quantity = item.get("quantity")
        lines.append(
            f"<p><strong>Quantity:</strong> <span style='font-size: 1.1em; color: var(--color-accent);'>{_esc('N/A' if quantity is None else quantity)}</span></p>"
        )
        if item.get("price_per_unit"):
            lines.append(
                f"<p><strong>Price per unit:</strong> <span style='color: var(--color-accent);'>{_format_price(item['price_per_unit'])}</span></p>"
            )
        if item.get("total_price"):
            lines.append(
                f"<p><strong>Total price:</strong> <span style='font-weight: bold; color: var(--color-accent);'>{_format_price(item['total_price'])}</span></p>"
            )

        # Database matching info
        db_details = item.get("db_details")
        if db_details:
            db_details = _as_dict(db_details)
            # The confidence may be missing or null in the API response.
            confidence = str(item.get("db_match_confidence") or "unknown")
            confidence_emoji, confidence_color = _CONFIDENCE_STYLES.get(
                confidence.lower(), _UNKNOWN_CONFIDENCE_STYLE
            )
            lines.append(
                f"<div style='background: var(--background-fill-secondary); padding: 12px; border-radius: 6px; margin: 8px 0; border-left: 4px solid {confidence_color};'>"
            )
            lines.append(
                f"<p><strong>{confidence_emoji} Database Match:</strong> <span style='color: {confidence_color}; font-weight: bold;'>{_esc(confidence.upper())}</span></p>"
            )
            lines.append(
                f"<p><strong>Product Number:</strong> <code>{_esc(db_details.get('product_number', 'N/A'))}</code></p>"
            )
            lines.append(
                f"<p><strong>Manufacturer:</strong> {_esc(db_details.get('manufacturer_name', 'N/A'))}</p>"
            )
            lines.append(
                f"<p><strong>Manufacturer Number:</strong> <code>{_esc(db_details.get('manufacturer_number', 'N/A'))}</code></p>"
            )
            if item.get("db_match_reason"):
                lines.append(f"<p><strong>Match Reason:</strong> <em>{_esc(item['db_match_reason'])}</em></p>")
            lines.append("</div>")
        else:
            lines.append(
                "<div style='background: #f8d7da; padding: 12px; border-radius: 6px; margin: 8px 0; border-left: 4px solid #dc3545;'>"
            )
            lines.append(
                "<p><strong>❌ Database Match:</strong> <span style='color: #dc3545; font-weight: bold;'>Not found</span></p>"
            )
            lines.append("</div>")

        lines.append("</div>")
        if i < len(items):  # Don't add separator after last item
            lines.append("<hr>")

    lines.append("</div>")
    return "\n".join(lines)


def format_extracted_data(json_result):
    """
    Format the extracted data for better display.

    Every value taken from the API response is HTML-escaped before being
    embedded, because it ultimately originates from email content. Missing or
    null fields are tolerated instead of aborting the whole summary.

    Args:
        json_result: The API response as a dict or as a JSON string

    Returns:
        Tuple of (address_html, items_html, stats_html). When there is no data
        or it cannot be parsed, the first two elements are None and the third
        is a plain status/error message.
    """
    if not json_result:
        return None, None, "No data processed yet"

    try:
        data = json.loads(json_result) if isinstance(json_result, str) else json_result
        if not isinstance(data, dict):
            return None, None, "Error parsing extracted data: expected a JSON object"

        # Extract address information
        address_info = _as_dict(data.get("address"))
        billing_addr = _as_dict(address_info.get("billing_address"))
        shipping_addr = _as_dict(address_info.get("shipping_address"))

        billing_display = _format_address_card(billing_addr, "Billing Address")
        shipping_display = _format_address_card(shipping_addr, "Shipping Address")
        address_html = f"{billing_display}\n\n{shipping_display}"

        # Extract and format items information
        raw_items = data.get("items")
        items_info = [item for item in raw_items if isinstance(item, dict)] if isinstance(raw_items, list) else []
        items_html = _format_items_table(items_info)

        # Generate statistics
        total_items = len(items_info)
        items_with_db_match = sum(1 for item in items_info if item.get("db_details"))
        match_rate = f"{(items_with_db_match / total_items * 100):.1f}%" if total_items > 0 else "0%"
        # Colour thresholds are applied to the displayed (rounded) percentage.
        displayed_pct = float(match_rate.rstrip("%"))
        if displayed_pct > 70:
            match_color = "#28a745"
        elif displayed_pct > 30:
            match_color = "#ffc107"
        else:
            match_color = "#dc3545"

        # Calculate totals
        total_quantity = sum(_to_number(item.get("quantity")) for item in items_info)
        total_value = sum(_to_number(item.get("total_price")) for item in items_info)

        billing_company = _esc(billing_addr.get("company_name") or "N/A")
        shipping_company = _esc(shipping_addr.get("company_name") or "N/A")
        country = _esc(billing_addr.get("country") or shipping_addr.get("country") or "N/A")

        stats_text = f"""
<div class='stats-section'>
<h2>📊 Processing Summary</h2>
<div style='display: grid; grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); gap: 16px;'>
<div style='background: var(--background-fill-secondary); padding: 12px; border-radius: 8px;'>
<h4 style='margin-top: 0; color: var(--color-accent);'>📦 Items Processing</h4>
<p><strong>Total items extracted:</strong> <span style='font-size: 1.2em; color: var(--color-accent);'>{total_items}</span></p>
<p><strong>Items matched in database:</strong> <span style='color: #28a745;'>{items_with_db_match}</span></p>
<p><strong>Database match rate:</strong> <span style='font-weight: bold; color: {match_color};'>{match_rate}</span></p>
</div>
<div style='background: var(--background-fill-secondary); padding: 12px; border-radius: 8px;'>
<h4 style='margin-top: 0; color: var(--color-accent);'>📈 Order Summary</h4>
<p><strong>Total quantity:</strong> <span style='font-size: 1.2em; color: var(--color-accent);'>{total_quantity}</span> units</p>
<p><strong>Total value:</strong> <span style='font-weight: bold; color: var(--color-accent);'>{total_value:.2f}</span></p>
<p><em style='font-size: 0.9em; color: var(--text-color-subdued);'>(if prices available)</em></p>
</div>
<div style='background: var(--background-fill-secondary); padding: 12px; border-radius: 8px;'>
<h4 style='margin-top: 0; color: var(--color-accent);'>🏢 Companies</h4>
<p><strong>Billing:</strong> {billing_company}</p>
<p><strong>Shipping:</strong> {shipping_company}</p>
<p><strong>Country:</strong> {country}</p>
</div>
</div>
<div style='text-align: center; margin-top: 20px; padding: 12px; background: #d4edda; border-radius: 8px; border: 1px solid #c3e6cb;'>
<p style='margin: 0; color: #155724; font-weight: bold;'>✅ Processing Status: Complete</p>
</div>
</div>
"""
        return address_html, items_html, stats_text
    except Exception as e:
        # UI boundary: show the problem in the statistics panel instead of raising.
        return None, None, f"Error parsing extracted data: {str(e)}"


def create_ui():
    """
    Create the Gradio UI.

    Builds the page (authentication box, upload column, raw JSON output and the
    side-by-side PDF / formatted summary) and wires the event handlers.

    Returns:
        The configured ``gr.Blocks`` application (not yet launched).
    """
    with gr.Blocks(title="Email Order Processor", css=_CUSTOM_CSS) as demo:
        gr.Markdown("# 📧 Email Order Processor")
        gr.Markdown(
            """
Upload a **.msg** email file containing purchase order information to extract structured data.
**Instructions:**
1. Enter your API key below
2. Select a `.msg` file (Outlook email format)
3. Click 'Process Email' to analyze the file
4. View the extracted purchase order data and original PDF side by side
"""
        )

        # API Configuration Section (Simplified for external users)
        with gr.Row(), gr.Column(elem_classes=["api-config"]):
            gr.Markdown("### 🔐 Authentication")
            gr.Markdown("*Connected to: email-order-processor-04i60gfy4hofo.apigateway.ax-logistics-ai.cloud.goog*")

            # Hidden URL field that always contains the hardcoded API Gateway URL
            api_base_url = gr.Textbox(value=API_GATEWAY_URL, visible=False)

            # API Key input - always visible for external users
            api_key_input = gr.Textbox(
                label="API Key",
                type="password",
                placeholder="Enter your API key to access the service",
                info="Please enter the API key provided by your administrator",
                elem_classes=["api-key-input"],
            )

            auth_status = gr.Markdown("", elem_classes=["status-text"])

        def update_auth_status(api_key=None):
            """Return the status line for the current content of the API key box."""
            if api_key and api_key.strip():
                return "🔐 Authentication: **Active** ✅"
            return "⚠️ **API key required** - Please enter your API key above"

        # Update auth status when API key changes
        api_key_input.change(fn=update_auth_status, inputs=[api_key_input], outputs=[auth_status])
        # Set initial auth status
        demo.load(fn=update_auth_status, inputs=[api_key_input], outputs=[auth_status])

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### Upload Email File")
                msg_file = gr.File(
                    label="Email Message File (.msg)",
                    file_types=[".msg"],
                    file_count="single",
                    elem_classes=["upload-area"],
                )
                process_btn = gr.Button("🚀 Process Email", variant="primary", size="lg")
                status_text = gr.Markdown("", elem_classes=["status-text"])
            with gr.Column(scale=2):
                gr.Markdown("### Raw API Response")
                result_json = gr.JSON(label="Purchase Order Information", show_label=False)

        # Side-by-side comparison section
        with gr.Row():
            with gr.Column(scale=1, elem_classes=["comparison-container"]):
                gr.Markdown("### 📄 Original PDF Document")
                pdf_viewer = PDF(label="Purchase Order PDF", height=600, elem_classes=["pdf-container"])
            with gr.Column(scale=1, elem_classes=["comparison-container"]):
                gr.Markdown("### 📊 Extracted Data Summary")
                # Create formatted display components
                with gr.Accordion("🏢 Address Information", open=True):
                    address_display = gr.Markdown(label="Address Details")
                with gr.Accordion("📦 Items Information", open=True):
                    items_display = gr.Markdown(label="Items Details")
                with gr.Accordion("🔍 Processing Statistics", open=False):
                    stats_display = gr.Markdown("No data processed yet")

        # Wire up the events
        process_btn.click(
            fn=handle_file_upload,
            inputs=[msg_file, api_base_url, api_key_input],
            outputs=[result_json, pdf_viewer, status_text],
            show_progress="full",
        )
        # Update the formatted displays when result changes
        result_json.change(
            fn=format_extracted_data, inputs=[result_json], outputs=[address_display, items_display, stats_display]
        )

    return demo
# Build the interface and start the Gradio server only when run as a script.
if __name__ == "__main__":
    demo = create_ui()
    demo.launch()