AP_AGENT / app.py
Seth0330's picture
Update app.py
498d8b2 verified
import streamlit as st
import requests
import json
import re
import os
import time
import mimetypes
import pandas as pd
from langchain_community.chat_models import ChatOpenAI
from langchain.agents import initialize_agent, Tool, AgentType
from fuzzywuzzy import fuzz
# --- Streamlit Page Settings ---
st.set_page_config(page_title="EZOFIS Accounts Payable Agent", layout="wide")
# --- Styles for SaaS Feel ---
st.markdown("""
<style>
.block-card {
background: #fff;
border-radius: 20px;
box-shadow: 0 2px 16px rgba(25,39,64,0.05);
padding: 32px 26px 24px 26px;
margin-bottom: 24px;
}
.step-num {
background: #A020F0;
color: #fff;
border-radius: 999px;
padding: 6px 13px;
font-weight: 700;
margin-right: 14px;
font-size: 20px;
display: inline-block;
vertical-align: middle;
}
.stButton>button {
background: #A020F0 !important;
color: white !important;
border-radius: 12px !important;
padding: 10px 32px !important;
font-weight: 700;
border: none !important;
font-size: 18px !important;
margin-top: 12px !important;
}
.stSlider>div>div>div>div {
background: #F3F6FB !important;
border-radius: 999px;
}
.css-12w0qpk {padding-top: 0rem;}
.css-1kyxreq {padding-top: 0rem;}
</style>
""", unsafe_allow_html=True)
MODELS = {
"OpenAI GPT-4.1": {
"api_url": "https://api.openai.com/v1/chat/completions",
"model": "gpt-4-1106-preview",
"key_env": "OPENAI_API_KEY",
"response_format": None,
"extra_headers": {},
},
}
def get_api_key(model_choice):
key = os.getenv(MODELS[model_choice]["key_env"])
if not key:
st.error(f"❌ {MODELS[model_choice]['key_env']} not set")
st.stop()
return key
def query_llm(model_choice, prompt):
cfg = MODELS[model_choice]
headers = {
"Authorization": f"Bearer {get_api_key(model_choice)}",
"Content-Type": "application/json",
}
if cfg.get("extra_headers"):
headers.update(cfg["extra_headers"])
payload = {
"model": cfg["model"],
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.1,
"max_tokens": 2000,
}
if cfg.get("response_format"):
payload["response_format"] = cfg["response_format"]
try:
with st.spinner(f"🔍 Fine Tuning The Extracted Data..."):
r = requests.post(cfg["api_url"], headers=headers, json=payload, timeout=90)
if r.status_code != 200:
st.error(f"🚨 API Error {r.status_code}: {r.text}")
return None
content = r.json()["choices"][0]["message"]["content"]
st.session_state.last_api = content
st.session_state.last_raw = r.text
return content
except Exception as e:
st.error(f"Connection error: {e}")
return None
def clean_json_response(text):
if not text:
return None
orig = text
text = re.sub(r'```(?:json)?', '', text).strip()
start, end = text.find('{'), text.rfind('}') + 1
if start < 0 or end < 1:
st.error("Couldn't locate JSON in response.")
st.code(orig)
return None
frag = text[start:end]
frag = re.sub(r',\s*([}\]])', r'\1', frag)
try:
return json.loads(frag)
except json.JSONDecodeError as e:
repaired = re.sub(r'"\s*"\s*(?="[^"]+"\s*:)', '","', frag)
try:
return json.loads(repaired)
except json.JSONDecodeError:
st.error(f"JSON parse error: {e}")
st.code(frag)
return None
def fallback_supplier(text):
for line in text.splitlines():
line = line.strip()
if line:
return line
return None
def get_extraction_prompt(model_choice, txt):
return (
"You are an expert invoice parser. "
"Extract data according to the visible table structure and column headers in the invoice. "
"For every line item, only extract fields that correspond to the table columns for that row (do not include header/shipment fields in line items). "
"Merge all multi-line content within a single cell into that field (especially for the 'description' and 'notes'). "
"Shipment/invoice-level fields such as CAR NUMBER, SHIPPING POINT, SHIPMENT NUMBER, CURRENCY, etc., must go ONLY into the 'invoice_header', not as line item fields.\n"
"Use this schema:\n"
'{\n'
' "invoice_header": {\n'
' "car_number": "string or null",\n'
' "shipment_number": "string or null",\n'
' "shipping_point": "string or null",\n'
' "currency": "string or null",\n'
' "invoice_number": "string or null",\n'
' "invoice_date": "string or null",\n'
' "order_number": "string or null",\n'
' "customer_order_number": "string or null",\n'
' "our_order_number": "string or null",\n'
' "sales_order_number": "string or null",\n'
' "purchase_order_number": "string or null",\n'
' "order_date": "string or null",\n'
' "supplier_name": "string or null",\n'
' "supplier_address": "string or null",\n'
' "supplier_phone": "string or null",\n'
' "supplier_email": "string or null",\n'
' "supplier_tax_id": "string or null",\n'
' "customer_name": "string or null",\n'
' "customer_address": "string or null",\n'
' "customer_phone": "string or null",\n'
' "customer_email": "string or null",\n'
' "customer_tax_id": "string or null",\n'
' "ship_to_name": "string or null",\n'
' "ship_to_address": "string or null",\n'
' "bill_to_name": "string or null",\n'
' "bill_to_address": "string or null",\n'
' "remit_to_name": "string or null",\n'
' "remit_to_address": "string or null",\n'
' "tax_id": "string or null",\n'
' "tax_registration_number": "string or null",\n'
' "vat_number": "string or null",\n'
' "payment_terms": "string or null",\n'
' "payment_method": "string or null",\n'
' "payment_reference": "string or null",\n'
' "bank_account_number": "string or null",\n'
' "iban": "string or null",\n'
' "swift_code": "string or null",\n'
' "total_before_tax": "string or null",\n'
' "tax_amount": "string or null",\n'
' "tax_rate": "string or null",\n'
' "shipping_charges": "string or null",\n'
' "discount": "string or null",\n'
' "total_due": "string or null",\n'
' "amount_paid": "string or null",\n'
' "balance_due": "string or null",\n'
' "due_date": "string or null",\n'
' "invoice_status": "string or null",\n'
' "reference_number": "string or null",\n'
' "project_code": "string or null",\n'
' "department": "string or null",\n'
' "contact_person": "string or null",\n'
' "notes": "string or null",\n'
' "additional_info": "string or null"\n'
' },\n'
' "line_items": [\n'
' {\n'
' "quantity": "string or null",\n'
' "units": "string or null",\n'
' "description": "string or null",\n'
' "footage": "string or null",\n'
' "price": "string or null",\n'
' "amount": "string or null",\n'
' "notes": "string or null"\n'
' }\n'
' ]\n'
'}'
"\nIf a field is missing for a line item or header, use null. "
"Do not invent fields. Do not add any header or shipment data to any line item. Return ONLY the JSON object, no explanation.\n"
"\nInvoice Text:\n"
f"{txt}"
)
def ensure_total_due(invoice_header):
if invoice_header.get("total_due") in [None, ""]:
for field in ["invoice_total", "invoice_value", "total_before_tax", "balance_due", "amount_paid"]:
if field in invoice_header and invoice_header[field]:
invoice_header["total_due"] = invoice_header[field]
break
return invoice_header
def clean_num(val):
if val is None:
return None
if isinstance(val, (int, float)):
return float(val)
matches = re.findall(r"[-+]?\d[\d,]*\.?\d*", str(val))
if matches:
cleaned = [m.replace(',', '') for m in matches if m]
as_floats = [float(c) for c in cleaned if c.replace('.', '', 1).isdigit()]
if as_floats:
return max(as_floats)
return None
def weighted_fuzzy_score(s1, s2):
if not s1 and not s2:
return 100
return fuzz.token_set_ratio(str(s1).lower(), str(s2).lower())
def find_po_number_in_json(po_number, invoice_json):
def _flatten(obj):
fields = []
if isinstance(obj, dict):
for v in obj.values():
fields.extend(_flatten(v))
elif isinstance(obj, list):
for item in obj:
fields.extend(_flatten(item))
elif obj is not None:
fields.append(str(obj))
return fields
po_str = str(po_number).strip().replace(" ", "").replace(".0", "")
try:
po_int = str(int(float(po_number)))
except:
po_int = po_str
all_strs = [str(s).strip().replace(" ", "").replace(".0", "") for s in _flatten(invoice_json)]
for s in all_strs:
if not s:
continue
if po_str and (po_str in s or s in po_str):
return True
if po_int and (po_int in s or s in po_int):
return True
return False
def find_best_po_match(inv, po_df, weight_supplier, weight_po_number, weight_currency, weight_total_due, weight_line_item):
inv_hdr = inv["invoice_header"]
inv_supplier = inv_hdr.get("supplier_name") or ""
inv_po_number = inv_hdr.get("purchase_order_number") or inv_hdr.get("po_number") or inv_hdr.get("order_number") or ""
inv_currency = inv_hdr.get("currency") or ""
inv_total_due = clean_num(inv_hdr.get("total_due"))
inv_line_items = inv.get("line_items", [])
scores = []
for idx, row in po_df.iterrows():
po_supplier = row.get("Supplier Name", "")
po_po_number = str(row.get("PO Number", ""))
po_currency = row.get("Currency", "")
po_total = clean_num(row.get("PO Total Value", ""))
po_desc = row.get("Item Description", "")
po_qty = str(row.get("Item Quantity", ""))
po_unit = str(row.get("Item Unit Price", ""))
po_line_total = clean_num(row.get("Line Item Total", ""))
field_details = []
s_supplier = weighted_fuzzy_score(inv_supplier, po_supplier)
field_details.append({
"field": "Supplier Name",
"invoice": inv_supplier,
"po": po_supplier,
"score": s_supplier
})
s_po_number = 100 if find_po_number_in_json(po_po_number, inv) else 0
field_details.append({
"field": "PO Number (anywhere in JSON)",
"invoice": "found" if s_po_number else "not found",
"po": po_po_number,
"score": s_po_number
})
s_currency = weighted_fuzzy_score(inv_currency, po_currency)
field_details.append({
"field": "Currency",
"invoice": inv_currency,
"po": po_currency,
"score": s_currency
})
s_total = 100 if inv_total_due is not None and po_total is not None and abs(inv_total_due - po_total) < 2 else 0
field_details.append({
"field": "Total Due",
"invoice": inv_total_due,
"po": po_total,
"score": s_total
})
# Line item logic as before
line_item_score = 0
line_reason = ""
best_line_detail = None
for line in inv_line_items:
desc_score = weighted_fuzzy_score(line.get("description", ""), po_desc)
qty_score = 100 if clean_num(line.get("quantity")) == clean_num(po_qty) else 0
unit_score = 100 if clean_num(line.get("price")) == clean_num(po_unit) else 0
amount_score = 100 if clean_num(line.get("amount")) == po_line_total else 0
total = desc_score * 0.5 + qty_score * 0.2 + unit_score * 0.15 + amount_score * 0.15
detail = {
"field": "Line Item",
"invoice": {
"description": line.get("description", ""),
"quantity": line.get("quantity", ""),
"price": line.get("price", ""),
"amount": line.get("amount", ""),
},
"po": {
"description": po_desc,
"quantity": po_qty,
"price": po_unit,
"amount": po_line_total,
},
"desc_score": desc_score,
"qty_score": qty_score,
"unit_score": unit_score,
"amount_score": amount_score,
"line_item_score": total
}
if total > line_item_score:
line_item_score = total
best_line_detail = detail
line_reason = (
f"Best line item: desc_score={desc_score}, qty_score={qty_score}, "
f"unit_score={unit_score}, amount_score={amount_score}"
)
wsum = weight_supplier + weight_po_number + weight_currency + weight_total_due + weight_line_item
total_score = (
s_supplier * weight_supplier/100 +
s_po_number * weight_po_number/100 +
s_currency * weight_currency/100 +
s_total * weight_total_due/100 +
line_item_score * weight_line_item/100
) if wsum == 100 else 0
reason = (
f"Supplier match: {s_supplier}/100 (invoice: '{inv_supplier}' vs PO: '{po_supplier}'), "
f"PO Number: {s_po_number}/100 ({'found anywhere in JSON' if s_po_number else 'not found'}), "
f"Currency: {s_currency}/100 (invoice: '{inv_currency}' vs PO: '{po_currency}'), "
f"Total Due: {'match' if s_total else 'no match'} (invoice: {inv_total_due} vs PO: {po_total}), "
f"Line item best match: {int(line_item_score)}/100. {line_reason}"
)
debug = {
"po_idx": idx,
"po_supplier": po_supplier,
"po_po_number": po_po_number,
"po_total": po_total,
"scores": field_details,
"line_item_score": line_item_score,
"best_line_detail": best_line_detail,
"total_score": total_score,
"line_reason": line_reason,
"inv_total_due": inv_total_due
}
scores.append((row, total_score, reason, debug))
scores.sort(key=lambda tup: tup[1], reverse=True)
if not scores:
return None, 0, "No POs found.", {}
best_row, best_score, reason, debug = scores[0]
return best_row, best_score, reason, debug
def extract_invoice_info(model_choice, text):
prompt = get_extraction_prompt(model_choice, text)
raw = query_llm(model_choice, prompt)
if not raw:
return None
data = clean_json_response(raw)
if not data:
return None
hdr = data.get("invoice_header", {})
if not hdr and any(k in data for k in ("invoice_number","supplier_name","customer_name")):
hdr = data
for k in ("invoice_number","invoice_date","po_number","invoice_value","supplier_name","customer_name"):
hdr.setdefault(k, None)
if not hdr.get("supplier_name"):
hdr["supplier_name"] = fallback_supplier(text)
hdr = ensure_total_due(hdr)
items = data.get("line_items", [])
if not isinstance(items, list):
items = []
for itm in items:
if not isinstance(itm, dict):
continue
for k in ("item_number","description","quantity","unit_price","total_price"):
itm.setdefault(k, None)
return {"invoice_header": hdr, "line_items": items}
def get_content_type(filename):
mime, _ = mimetypes.guess_type(filename)
ext = filename.lower().split('.')[-1]
if ext == "pdf":
return "text/plain"
if mime is None:
return "application/octet-stream"
return mime
UNSTRACT_BASE = "https://llmwhisperer-api.us-central.unstract.com/api/v2"
UNSTRACT_API_KEY = os.getenv("UNSTRACT_API_KEY")
def extract_text_from_unstract(uploaded_file):
filename = getattr(uploaded_file, "name", "uploaded_file")
file_bytes = uploaded_file.read()
content_type = get_content_type(filename)
headers = {
"unstract-key": UNSTRACT_API_KEY,
"Content-Type": content_type,
}
url = f"{UNSTRACT_BASE}/whisper"
with st.spinner("Uploading and processing document with EZOFIS AI OCR AGENT..."):
r = requests.post(url, headers=headers, data=file_bytes)
if r.status_code != 202:
st.error(f"Unstract: Error uploading file: {r.status_code} - {r.text}")
return None
whisper_hash = r.json().get("whisper_hash")
if not whisper_hash:
st.error("Unstract: No whisper_hash received.")
return None
status_url = f"{UNSTRACT_BASE}/whisper-status?whisper_hash={whisper_hash}"
status_placeholder = st.empty()
for i in range(30):
status_r = requests.get(status_url, headers={"unstract-key": UNSTRACT_API_KEY})
if status_r.status_code != 200:
st.error(f"Unstract: Error checking status: {status_r.status_code} - {status_r.text}")
return None
status = status_r.json().get("status")
if status == "processed":
status_placeholder.info("EZOFIS AI OCR AGENT STATUS: processed! 🎉")
break
status_placeholder.info(f"EZOFIS AI OCR AGENT STATUS: {status or 'waiting'}... ({i+1})")
time.sleep(2)
else:
status_placeholder.error("Unstract: Timeout waiting for OCR to finish.")
return None
retrieve_url = f"{UNSTRACT_BASE}/whisper-retrieve?whisper_hash={whisper_hash}&text_only=true"
r = requests.get(retrieve_url, headers={"unstract-key": UNSTRACT_API_KEY})
if r.status_code != 200:
st.error(f"Unstract: Error retrieving extracted text: {r.status_code} - {r.text}")
return None
try:
data = r.json()
return data.get("result_text") or r.text
except Exception:
return r.text
# ---------------- UI LAYOUT ----------------------
st.markdown(
"<h1 style='font-weight:800; margin-bottom:8px;'>EZOFIS Accounts Payable Agent</h1>",
unsafe_allow_html=True
)
st.markdown(
"<div style='font-size:20px; margin-bottom:28px; color:#24345C;'>Modern workflow automation for finance teams</div>",
unsafe_allow_html=True
)
# ---- Three columns layout for horizontal flow
col1, col2, col3 = st.columns([2,2,3])
# ---- Step 1: Upload POs (col1) ----
with col1:
st.markdown("<span class='step-num'>1</span> <b>Upload Active Purchase Orders (POs)</b>", unsafe_allow_html=True)
po_file = st.file_uploader(
"CSV with PO number, Supplier, Items, etc.",
type=["csv"],
key="po_csv",
label_visibility="collapsed"
)
po_df = None
if po_file:
po_df = pd.read_csv(po_file)
st.success(f"Loaded {len(po_df)} records from uploaded CSV.")
st.session_state['last_po_df'] = po_df
# ---- Step 2: Scoring Weights (col1) ----
with col1:
st.markdown("<span class='step-num'>2</span> <b>Configure Scoring Weights</b>", unsafe_allow_html=True)
st.markdown("Set weights for matching. Total must equal 100%.", unsafe_allow_html=True)
def int_slider(label, value, key):
return st.slider(label, 0, 100, value, 1, key=key, format="%d")
weight_supplier = int_slider("Supplier Name (%)", 25, "w_supplier")
weight_po_number = int_slider("PO Number (%)", 25, "w_po")
weight_currency = int_slider("Currency (%)", 10, "w_curr")
weight_total_due = int_slider("Total Due (%)", 20, "w_due")
weight_line_item = int_slider("Line Item (%)", 20, "w_line")
weight_sum = weight_supplier + weight_po_number + weight_currency + weight_total_due + weight_line_item
if weight_sum != 100:
st.warning(f"Sum of weights is {weight_sum}%. Adjust so it equals 100%.")
st.markdown("<span class='step-num'>3</span> <b>Set Decision Thresholds</b>", unsafe_allow_html=True)
approved_threshold = st.slider("Threshold for 'APPROVED'", min_value=0, max_value=100, value=85, format="%d")
partial_threshold = st.slider("Threshold for 'PARTIALLY APPROVED'", min_value=0, max_value=approved_threshold-1, value=70, format="%d")
# ---- Step 4: Upload Invoice (col2) ----
with col2:
st.markdown("<span class='step-num'>4</span> <b>Upload Invoice/Document</b>", unsafe_allow_html=True)
inv_file = st.file_uploader(
"Upload PDF, DOCX, XLSX, PNG, JPG, TIFF",
type=["pdf", "docx", "xlsx", "xls", "png", "jpg", "jpeg", "tiff"],
key="invoice_file",
label_visibility="collapsed"
)
# ---- Step 5: Extract Data (col2) ----
with col2:
st.markdown("<span class='step-num'>5</span> <b>Extract Data</b>", unsafe_allow_html=True)
if st.button("Extract"):
if inv_file:
with st.spinner("Extracting text from document..."):
text = extract_text_from_unstract(inv_file)
if text:
mdl = "OpenAI GPT-4.1"
extracted_info = extract_invoice_info(mdl, text)
if extracted_info:
if "invoice_header" in extracted_info:
extracted_info["invoice_header"] = ensure_total_due(extracted_info["invoice_header"])
st.success("Extraction Complete")
st.session_state['last_extracted_info'] = extracted_info
else:
st.warning("Please upload an invoice/document first.")
# ---- Step 6: AP Agent Decision (col3) ----
with col3:
st.markdown("<span class='step-num'>6</span> <b>AP Agent Decision</b>", unsafe_allow_html=True)
if st.button("Make a decision (EZOFIS AP AGENT)"):
extracted_info = st.session_state.get('last_extracted_info', None)
po_df = st.session_state.get('last_po_df', None)
if extracted_info is not None and po_df is not None:
def po_match_tool_func(input_text):
invoice = st.session_state.get("last_extracted_info")
po_df = st.session_state.get("last_po_df")
if invoice is None or po_df is None:
return json.dumps({
"decision": "REJECTED",
"reason": "Invoice or PO data not found.",
"debug": {},
})
best_row, best_score, reason, debug = find_best_po_match(
invoice, po_df, weight_supplier, weight_po_number, weight_currency, weight_total_due, weight_line_item
)
if best_score > approved_threshold:
status = "APPROVED"
elif best_score > partial_threshold:
status = "PARTIALLY APPROVED"
else:
status = "REJECTED"
return json.dumps({
"decision": status,
"reason": f"Best match score: {int(best_score)}/100. {reason}",
"debug": debug,
"po_row": best_row.to_dict() if best_row is not None else None
})
tools = [
Tool(
name="po_match_tool",
func=po_match_tool_func,
description="Smartly match invoice to PO using all possible fields.",
)
]
decision_llm = ChatOpenAI(
openai_api_key=get_api_key("OpenAI GPT-4.1"),
model=MODELS["OpenAI GPT-4.1"]["model"],
temperature=0,
streaming=False,
)
agent = initialize_agent(
tools,
decision_llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
verbose=True,
)
prompt = (
"You are an expert accounts payable agent. "
"Use po_match_tool to check for the best possible match using supplier, PO number (which may appear anywhere in the invoice JSON, even within other fields), currency, line items, and total value. "
"Weigh the importance of each field as an expert would, according to the user-configured weights. "
"Return a JSON with decision (APPROVED, PARTIALLY APPROVED, REJECTED), reason (include field scores and reasoning), debug, and the best matched PO row.\n"
f"Invoice JSON:\n{json.dumps(extracted_info, indent=2)}"
)
with st.spinner("AI is reasoning and making a decision..."):
result = agent.run(prompt)
# Always display debug/info
st.markdown("<h3 style='margin-top:18px;'>AI Decision & Reason</h3>", unsafe_allow_html=True)
try:
result_json = json.loads(result)
st.write(f"**Decision:** {result_json.get('decision', 'N/A')}")
st.write(f"**Reason:** {result_json.get('reason', 'N/A')}")
st.markdown("##### Debug & Matching Details")
st.json(result_json.get('debug'))
st.markdown("##### Extracted Invoice JSON")
st.json(extracted_info)
st.markdown("##### Matched PO Row")
st.json(result_json.get('po_row'))
except Exception:
st.subheader("AI Decision & Reason")
st.write(result)
# Always show extraction/decision debug in full for troubleshooting
if "last_api" in st.session_state:
with st.expander("Debug"):
st.code(st.session_state.last_api)
st.code(st.session_state.last_raw)