Jarv1s10's picture
Update app.py
265f352 verified
raw
history blame
23.6 kB
import base64
import json
import tempfile
from pathlib import Path
from typing import Optional
import gradio as gr
import httpx
from extract_msg import Message as MsgMessage
from gradio_pdf import PDF
# Hardcoded API Gateway URL - configured for external stakeholders
API_GATEWAY_URL = "https://email-order-processor-gateway-9jipw94n.ew.gateway.dev/api/v1"
def get_auth_headers(api_key: Optional[str] = None) -> dict:
"""
Get authorization headers for the API Gateway.
Args:
api_key: API key for Google API Gateway authentication
Returns:
Dictionary with authorization headers
"""
headers = {"Content-Type": "application/json"}
if api_key:
headers["x-api-key"] = api_key
else:
print("Warning: No API key provided")
return headers
def extract_pdf_from_msg(msg_file_path: str) -> Optional[str]:
"""
Extract PDF attachment from .msg file.
Args:
msg_file_path: Path to the .msg file
Returns:
Path to the extracted PDF file, or None if no PDF found
"""
try:
email_msg = MsgMessage(msg_file_path)
# Look for PDF attachments
for attachment in email_msg.attachments:
if isinstance(attachment.longFilename, str) and attachment.longFilename.lower().endswith(".pdf"):
# Save PDF to temporary file
temp_pdf_path = tempfile.mktemp(suffix=".pdf")
with open(temp_pdf_path, "wb") as temp_pdf:
pdf_data = attachment.data
if isinstance(pdf_data, bytes):
temp_pdf.write(pdf_data)
else:
# Handle other data types - this shouldn't happen normally with PDF data
print(f"Warning: PDF data is not bytes, type: {type(pdf_data)}")
continue
return temp_pdf_path
return None
except Exception as e:
print(f"Error extracting PDF from .msg file: {e}")
return None
async def process_email_file(
msg_file: Path, api_base_url: str, api_key: Optional[str] = None
) -> tuple[Optional[str], Optional[str]]:
"""
Process a .msg email file by sending it to the API.
Args:
msg_file: Path to the .msg file
api_base_url: Base URL for the API
api_key: API key for Google API Gateway authentication
Returns:
Tuple of (success_response, error_message)
"""
if not msg_file:
return None, "Please upload a .msg file"
if not api_base_url.strip():
return None, "Please provide a valid API base URL"
# Ensure the URL ends with /api/v1 if it doesn't already
if not api_base_url.endswith("/api/v1"):
api_base_url = api_base_url + "api/v1" if api_base_url.endswith("/") else api_base_url + "/api/v1"
try:
# Read and encode the file as base64
file_content = msg_file.read_bytes()
base64_content = base64.b64encode(file_content).decode("utf-8")
# Prepare the request
request_data = {"email_msg_base64": base64_content}
# Send request to API
async with httpx.AsyncClient(timeout=120.0) as client:
response = await client.post(
f"{api_base_url}/process-email", json=request_data, headers=get_auth_headers(api_key)
)
if response.status_code == 200:
result = response.json()
formatted_result = json.dumps(result, indent=2, ensure_ascii=False)
return formatted_result, None
else:
error_detail = (
response.json().get("detail", "Unknown error")
if response.headers.get("content-type", "").startswith("application/json")
else response.text
)
return None, f"API Error ({response.status_code}): {error_detail}"
except httpx.TimeoutException:
return None, "Request timed out. The file might be too large or the server is busy."
except httpx.ConnectError:
return (
None,
f"Could not connect to API at {api_base_url}. Please check the URL and ensure the server is running.",
)
except Exception as e:
return None, f"Error processing file: {str(e)}"
async def handle_file_upload(msg_file, api_base_url, api_key=None):
"""Handle file upload and processing."""
if not msg_file:
return None, None, "Please upload a .msg file"
# Extract PDF from .msg file
pdf_path = extract_pdf_from_msg(msg_file)
# Process the email via API
result, error = await process_email_file(Path(msg_file), api_base_url, api_key)
if error:
return None, None, error
else:
return result, pdf_path, "βœ… Email processed successfully!"
def create_ui():
"""Create the Gradio UI."""
with gr.Blocks(
title="Email Order Processor",
css="""
.upload-area {
border: 2px dashed #ccc;
border-radius: 10px;
padding: 20px;
text-align: center;
margin: 10px 0;
}
.status-success {
color: #28a745;
font-weight: bold;
}
.status-error {
color: #dc3545;
font-weight: bold;
}
.pdf-container {
border: 1px solid #ddd;
border-radius: 8px;
padding: 10px;
background-color: #f9f9f9;
}
.comparison-container {
border: 1px solid var(--border-color-primary);
border-radius: 12px;
padding: 20px;
margin: 15px 5px;
background-color: var(--background-fill-secondary);
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}
.api-config {
background-color: var(--background-fill-secondary);
border: 1px solid var(--border-color-primary);
border-radius: 8px;
padding: 15px;
margin-bottom: 20px;
}
.api-key-input {
margin-top: 10px;
border: 2px solid var(--color-accent-soft);
border-radius: 6px;
background-color: var(--background-fill-primary);
}
/* Custom styling for markdown content */
.address-card {
background: var(--background-fill-primary);
border: 1px solid var(--border-color-accent);
border-radius: 8px;
padding: 16px;
margin: 8px 0;
}
.address-card h2 {
color: var(--color-accent);
margin-top: 0;
margin-bottom: 12px;
font-size: 1.2em;
border-bottom: 2px solid var(--border-color-accent);
padding-bottom: 4px;
}
.address-card p {
margin: 6px 0;
line-height: 1.4;
}
.items-section {
background: var(--background-fill-primary);
border-radius: 8px;
padding: 16px;
}
.items-section h2 {
color: var(--color-accent);
border-bottom: 2px solid var(--border-color-accent);
padding-bottom: 8px;
margin-bottom: 16px;
}
.items-section h3 {
background: var(--background-fill-secondary);
padding: 8px 12px;
border-radius: 6px;
margin: 16px 0 8px 0;
border-left: 4px solid var(--color-accent);
}
.items-section hr {
border: none;
height: 1px;
background: var(--border-color-primary);
margin: 16px 0;
}
.stats-section {
background: var(--background-fill-primary);
border-radius: 8px;
padding: 16px;
}
.stats-section h2 {
color: var(--color-accent);
border-bottom: 2px solid var(--border-color-accent);
padding-bottom: 8px;
margin-bottom: 16px;
}
""",
) as demo:
gr.Markdown("# πŸ“§ Email Order Processor")
gr.Markdown(
"""
Upload a **.msg** email file containing purchase order information to extract structured data.
**Instructions:**
1. Enter your API key below
2. Select a `.msg` file (Outlook email format)
3. Click 'Process Email' to analyze the file
4. View the extracted purchase order data and original PDF side by side
"""
)
# API Configuration Section (Simplified for external users)
with gr.Row(), gr.Column(elem_classes=["api-config"]):
gr.Markdown("### πŸ” Authentication")
gr.Markdown("*Connected to: email-order-processor-04i60gfy4hofo.apigateway.ax-logistics-ai.cloud.goog*")
# Hidden URL field that always contains the hardcoded API Gateway URL
api_base_url = gr.Textbox(value=API_GATEWAY_URL, visible=False)
# API Key input - always visible for external users
api_key_input = gr.Textbox(
label="API Key",
type="password",
placeholder="Enter your API key to access the service",
info="Please enter the API key provided by your administrator",
elem_classes=["api-key-input"],
)
# Show auth status
def update_auth_status(api_key=None):
if api_key and api_key.strip():
return "πŸ” Authentication: **Active** βœ…"
else:
return "⚠️ **API key required** - Please enter your API key above"
auth_status = gr.Markdown("", elem_classes=["status-text"])
# Update auth status when API key changes
api_key_input.change(fn=update_auth_status, inputs=[api_key_input], outputs=[auth_status])
# Set initial auth status
demo.load(fn=update_auth_status, inputs=[api_key_input], outputs=[auth_status])
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### Upload Email File")
msg_file = gr.File(
label="Email Message File (.msg)",
file_types=[".msg"],
file_count="single",
elem_classes=["upload-area"],
)
process_btn = gr.Button("πŸš€ Process Email", variant="primary", size="lg")
status_text = gr.Markdown("", elem_classes=["status-text"])
with gr.Column(scale=2):
gr.Markdown("### Raw API Response")
result_json = gr.JSON(label="Purchase Order Information", show_label=False)
# Side-by-side comparison section
with gr.Row():
with gr.Column(scale=1, elem_classes=["comparison-container"]):
gr.Markdown("### πŸ“„ Original PDF Document")
pdf_viewer = PDF(label="Purchase Order PDF", height=600, elem_classes=["pdf-container"])
with gr.Column(scale=1, elem_classes=["comparison-container"]):
gr.Markdown("### πŸ“Š Extracted Data Summary")
# Create formatted display components
with gr.Accordion("🏒 Address Information", open=True):
address_display = gr.Markdown(label="Address Details")
with gr.Accordion("πŸ“¦ Items Information", open=True):
items_display = gr.Markdown(label="Items Details")
with gr.Accordion("πŸ” Processing Statistics", open=False):
stats_display = gr.Markdown("No data processed yet")
# Event handlers - simplified for external users
def format_extracted_data(json_result):
"""Format the extracted data for better display."""
if not json_result:
return None, None, "No data processed yet"
try:
# json_result is already a dict from Gradio JSON component
data = json.loads(json_result) if isinstance(json_result, str) else json_result
# Extract address information
address_info = data.get("address", {})
billing_addr = address_info.get("billing_address", {})
shipping_addr = address_info.get("shipping_address", {})
# Format addresses as readable cards
def format_address_card(addr_data, title):
if not addr_data:
return f"<div class='address-card'><h2>{title}</h2><p><em>No data available</em></p></div>"
lines = ["<div class='address-card'>", f"<h2>{title}</h2>"]
if addr_data.get("company_name"):
lines.append(f"<p><strong>🏒 Company:</strong> {addr_data['company_name']}</p>")
if addr_data.get("department"):
lines.append(f"<p><strong>🏬 Department:</strong> {addr_data['department']}</p>")
if addr_data.get("contact_person"):
lines.append(f"<p><strong>πŸ‘€ Contact:</strong> {addr_data['contact_person']}</p>")
# Build address line
address_parts = []
if addr_data.get("street_address"):
address_parts.append(addr_data["street_address"])
city_parts = []
if addr_data.get("postal_code"):
city_parts.append(addr_data["postal_code"])
if addr_data.get("city"):
city_parts.append(addr_data["city"])
if city_parts:
address_parts.append(" ".join(city_parts))
if addr_data.get("country"):
address_parts.append(addr_data["country"])
if address_parts:
lines.append("<p><strong>πŸ“ Address:</strong></p>")
lines.append("<p style='margin-left: 20px; font-style: italic;'>")
lines.append("<br>".join(address_parts))
lines.append("</p>")
if addr_data.get("phone"):
lines.append(f"<p><strong>πŸ“ž Phone:</strong> {addr_data['phone']}</p>")
if addr_data.get("email"):
lines.append(
f"<p><strong>πŸ“§ Email:</strong> <a href='mailto:{addr_data['email']}'>{addr_data['email']}</a></p>"
)
# Add delivery instructions for shipping address
if title == "Shipping Address" and addr_data.get("delivery_instructions"):
lines.append(
f"<p><strong>πŸ“¦ Delivery Instructions:</strong> <em>{addr_data['delivery_instructions']}</em></p>"
)
lines.append("</div>")
return "\n".join(lines)
billing_display = format_address_card(billing_addr, "Billing Address")
shipping_display = format_address_card(shipping_addr, "Shipping Address")
address_display = f"{billing_display}\n\n{shipping_display}"
# Extract and format items information
items_info = data.get("items", [])
def format_items_table(items):
if not items:
return "<div class='items-section'><h2>πŸ“¦ Order Items</h2><p><em>No items found</em></p></div>"
lines = ["<div class='items-section'>", "<h2>πŸ“¦ Order Items</h2>"]
for i, item in enumerate(items, 1):
lines.append(f"<h3>Item {i}: {item.get('name', 'Unknown Item')}</h3>")
lines.append("<div style='margin-left: 15px;'>")
# Basic item info in a more structured format
info_items = []
if item.get("description"):
info_items.append(f"<p><strong>Description:</strong> {item['description']}</p>")
info_items.append(
f"<p><strong>Quantity:</strong> <span style='font-size: 1.1em; color: var(--color-accent);'>{item.get('quantity', 'N/A')}</span></p>"
)
if item.get("price_per_unit"):
info_items.append(
f"<p><strong>Price per unit:</strong> <span style='color: var(--color-accent);'>{item['price_per_unit']:.2f}</span></p>"
)
if item.get("total_price"):
info_items.append(
f"<p><strong>Total price:</strong> <span style='font-weight: bold; color: var(--color-accent);'>{item['total_price']:.2f}</span></p>"
)
lines.extend(info_items)
# Database matching info with better styling
db_details = item.get("db_details")
if db_details:
confidence = item.get("db_match_confidence", "unknown")
confidence_emoji = {"exact": "βœ…", "high": "🟒", "medium": "🟑", "low": "🟠"}.get(
confidence.lower(), "❓"
)
confidence_color = {
"exact": "#28a745",
"high": "#17a2b8",
"medium": "#ffc107",
"low": "#fd7e14",
}.get(confidence.lower(), "#6c757d")
lines.append(
f"<div style='background: var(--background-fill-secondary); padding: 12px; border-radius: 6px; margin: 8px 0; border-left: 4px solid {confidence_color};'>"
)
lines.append(
f"<p><strong>{confidence_emoji} Database Match:</strong> <span style='color: {confidence_color}; font-weight: bold;'>{confidence.upper()}</span></p>"
)
lines.append(
f"<p><strong>Product Number:</strong> <code>{db_details.get('product_number', 'N/A')}</code></p>"
)
lines.append(
f"<p><strong>Manufacturer:</strong> {db_details.get('manufacturer_name', 'N/A')}</p>"
)
lines.append(
f"<p><strong>Manufacturer Number:</strong> <code>{db_details.get('manufacturer_number', 'N/A')}</code></p>"
)
if item.get("db_match_reason"):
lines.append(
f"<p><strong>Match Reason:</strong> <em>{item['db_match_reason']}</em></p>"
)
lines.append("</div>")
else:
lines.append(
"<div style='background: #f8d7da; padding: 12px; border-radius: 6px; margin: 8px 0; border-left: 4px solid #dc3545;'>"
)
lines.append(
"<p><strong>❌ Database Match:</strong> <span style='color: #dc3545; font-weight: bold;'>Not found</span></p>"
)
lines.append("</div>")
lines.append("</div>")
if i < len(items): # Don't add separator after last item
lines.append("<hr>")
lines.append("</div>")
return "\n".join(lines)
items_display = format_items_table(items_info)
# Generate statistics with better formatting
total_items = len(items_info)
items_with_db_match = len([item for item in items_info if item.get("db_details")])
match_rate = f"{(items_with_db_match / total_items * 100):.1f}%" if total_items > 0 else "0%"
# Calculate totals
total_quantity = sum(item.get("quantity", 0) for item in items_info)
total_value = sum(item.get("total_price", 0) for item in items_info if item.get("total_price"))
stats_text = f"""
<div class='stats-section'>
<h2>πŸ“Š Processing Summary</h2>
<div style='display: grid; grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); gap: 16px;'>
<div style='background: var(--background-fill-secondary); padding: 12px; border-radius: 8px;'>
<h4 style='margin-top: 0; color: var(--color-accent);'>πŸ“¦ Items Processing</h4>
<p><strong>Total items extracted:</strong> <span style='font-size: 1.2em; color: var(--color-accent);'>{total_items}</span></p>
<p><strong>Items matched in database:</strong> <span style='color: #28a745;'>{items_with_db_match}</span></p>
<p><strong>Database match rate:</strong> <span style='font-weight: bold; color: {"#28a745" if float(match_rate.rstrip("%")) > 70 else "#ffc107" if float(match_rate.rstrip("%")) > 30 else "#dc3545"};'>{match_rate}</span></p>
</div>
<div style='background: var(--background-fill-secondary); padding: 12px; border-radius: 8px;'>
<h4 style='margin-top: 0; color: var(--color-accent);'>πŸ“ˆ Order Summary</h4>
<p><strong>Total quantity:</strong> <span style='font-size: 1.2em; color: var(--color-accent);'>{total_quantity}</span> units</p>
<p><strong>Total value:</strong> <span style='font-weight: bold; color: var(--color-accent);'>{total_value:.2f}</span></p>
<p><em style='font-size: 0.9em; color: var(--text-color-subdued);'>(if prices available)</em></p>
</div>
<div style='background: var(--background-fill-secondary); padding: 12px; border-radius: 8px;'>
<h4 style='margin-top: 0; color: var(--color-accent);'>🏒 Companies</h4>
<p><strong>Billing:</strong> {billing_addr.get("company_name", "N/A")}</p>
<p><strong>Shipping:</strong> {shipping_addr.get("company_name", "N/A")}</p>
<p><strong>Country:</strong> {billing_addr.get("country") or shipping_addr.get("country", "N/A")}</p>
</div>
</div>
<div style='text-align: center; margin-top: 20px; padding: 12px; background: #d4edda; border-radius: 8px; border: 1px solid #c3e6cb;'>
<p style='margin: 0; color: #155724; font-weight: bold;'>βœ… Processing Status: Complete</p>
</div>
</div>
"""
return address_display, items_display, stats_text
except Exception as e:
return None, None, f"Error parsing extracted data: {str(e)}"
# Wire up the events
process_btn.click(
fn=handle_file_upload,
inputs=[msg_file, api_base_url, api_key_input],
outputs=[result_json, pdf_viewer, status_text],
show_progress="full",
)
# Update the formatted displays when result changes
result_json.change(
fn=format_extracted_data, inputs=[result_json], outputs=[address_display, items_display, stats_display]
)
return demo
if __name__ == "__main__":
demo = create_ui()
demo.launch()