formflow / app.py
Faraz618's picture
Create app.py
3f6a646 verified
Raw
History Blame Contribute Delete
13.2 kB
"""
FormFlow β€” Agentic Document-to-Action Pipeline with Human-in-the-Loop Approval
================================================================================
Three small, inspectable agents, chained together, with a mandatory human
approval gate before anything is "actioned":
1. Extraction Agent -> pulls structured fields out of a document
2. Risk Assessment Agent -> runs explicit, visible rules over those fields
3. Decision Agent -> proposes Approve / Flag for Review / Reject
Nothing is ever auto-executed. The human always clicks to confirm or override.
No GPU required. No embedding model. No external API calls. Pure Python + regex
for extraction and scoring, by design β€” every decision is traceable.
"""
import re
from datetime import datetime
import gradio as gr
from pypdf import PdfReader
# -----------------------------------------------------------------------
# In-memory audit log (resets each session — see README "What I'd build next"
# for the note on persisting this to a real database in a production version)
# -----------------------------------------------------------------------
AUDIT_LOG = []
# -----------------------------------------------------------------------
# Step 0: Document loading
# -----------------------------------------------------------------------
def extract_text(file_path: str) -> str:
    """
    Load a document from disk and return its contents as plain text.

    Paths ending in ".pdf" (case-insensitive) are read with pypdf page by
    page and the page texts are joined with newlines; a page for which pypdf
    yields nothing contributes an empty string. Any other path is read as
    UTF-8 text, with undecodable bytes dropped rather than raising.
    """
    is_pdf = file_path.lower().endswith(".pdf")
    if not is_pdf:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as handle:
            return handle.read()

    page_texts = []
    for page in PdfReader(file_path).pages:
        # extract_text() may return None/empty for a page; normalise to "".
        page_texts.append(page.extract_text() or "")
    return "\n".join(page_texts)
# -----------------------------------------------------------------------
# Step 1: Extraction Agent
# -----------------------------------------------------------------------
def extraction_agent(text: str) -> dict:
    """
    Rule-based structured field extraction.

    Deliberately regex/pattern-based rather than an LLM call, so every
    extracted value is directly traceable to a specific pattern in the source
    text. See README for the note on swapping this for an LLM-based extractor
    on messier documents.

    Args:
        text: Full plain-text content of the invoice.

    Returns:
        A dict with the string fields ``vendor``, ``invoice_number``,
        ``invoice_date``, ``due_date``, ``payment_terms``, ``total_due_raw``,
        ``payment_method``, ``account_name``, ``bank`` and ``notes`` (each
        "Not found" when its label is absent), plus two derived values:
        ``total_due`` (float or None) and ``days_to_pay`` (int or None).
    """
    # Year-first layouts only: they are unambiguous. Day/month-first dates are
    # intentionally NOT guessed at, because 01/02/2024 could mean either order.
    date_formats = ("%Y-%m-%d", "%Y/%m/%d")

    def find(pattern, default="Not found", group=1, flags=re.IGNORECASE):
        m = re.search(pattern, text, flags)
        return m.group(group).strip() if m else default

    def parse_date(raw):
        # Return a datetime for the first matching layout, else None.
        for fmt in date_formats:
            try:
                return datetime.strptime(raw, fmt)
            except (ValueError, TypeError):
                continue
        return None

    fields = {}
    fields["vendor"] = find(r"From:\s*(.+)")
    fields["invoice_number"] = find(r"Invoice Number:\s*([A-Za-z0-9\-]+)")
    fields["invoice_date"] = find(r"Invoice Date:\s*([\d\-/]+)")
    fields["due_date"] = find(r"Due Date:\s*([\d\-/]+)")
    fields["payment_terms"] = find(r"Payment Terms:\s*(.+)")
    # Cents are optional so whole-dollar totals ("$25,000") are still extracted
    # instead of being reported as a missing field.
    fields["total_due_raw"] = find(r"TOTAL DUE:\s*\$?([\d,]+(?:\.\d{1,2})?)")
    fields["payment_method"] = find(r"Payment Method:\s*(.+)")
    fields["account_name"] = find(r"Account Name:\s*(.+)")
    fields["bank"] = find(r"Bank:\s*(.+)")
    # Notes may span several lines; they end at the first blank line or EOF.
    fields["notes"] = find(
        r"Notes:\s*(.+?)(?:\n\n|\Z)", flags=re.IGNORECASE | re.DOTALL
    )

    # Parse the amount to a float for downstream risk scoring.
    try:
        fields["total_due"] = float(fields["total_due_raw"].replace(",", ""))
    except (ValueError, AttributeError):
        fields["total_due"] = None

    # Estimate the payment window in days from invoice date to due date.
    # The date regexes above accept both "-" and "/" separators, so both
    # year-first spellings must be parseable here or the short-payment-window
    # rule would silently never fire for slash-dated invoices.
    issued = parse_date(fields["invoice_date"])
    due = parse_date(fields["due_date"])
    if issued is not None and due is not None:
        fields["days_to_pay"] = (due - issued).days
    else:
        fields["days_to_pay"] = None

    return fields
# -----------------------------------------------------------------------
# Step 2: Risk Assessment Agent — explicit, inspectable rules
# -----------------------------------------------------------------------
# Each rule is a plain dict so the whole rule set can be read top to bottom:
#   name    - short label shown in the UI
#   check   - predicate over the extracted-fields dict; True means "fired"
#   weight  - points added to the risk score when the rule fires
#   explain - human-readable justification shown next to the rule name
# The string fields read below default to "Not found" when extraction fails,
# so the string-based checks are safe on partially extracted documents.
RISK_RULES = [
    {
        "name": "Large amount",
        "check": lambda f: f["total_due"] is not None and f["total_due"] > 20000,
        "weight": 2,
        "explain": "Invoice total exceeds $20,000 — above standard auto-approval threshold.",
    },
    {
        "name": "Very short payment window",
        "check": lambda f: f["days_to_pay"] is not None and f["days_to_pay"] <= 2,
        "weight": 3,
        "explain": "Payment is due within 2 days or less of the invoice date — unusually urgent for a standard vendor invoice.",
    },
    {
        "name": "Wire-transfer-only payment",
        "check": lambda f: "wire" in f["payment_method"].lower() and "only" in f["payment_method"].lower(),
        "weight": 2,
        "explain": "Payment method explicitly restricts to wire transfer only — a common fraud indicator, since wires are hard to reverse.",
    },
    {
        "name": "Personal-sounding account name",
        "check": lambda f: bool(re.search(r"\bpersonal\b", f["account_name"], re.IGNORECASE)),
        "weight": 3,
        "explain": "Receiving account name is explicitly marked as a personal account rather than a registered business account.",
    },
    {
        "name": "Urgency / avoid-contact language",
        "check": lambda f: bool(re.search(r"urgent|time-sensitive|do not contact|email only", f["notes"], re.IGNORECASE)),
        "weight": 3,
        "explain": "Notes field contains urgency or avoid-contact language — a classic social-engineering / invoice fraud pattern.",
    },
    {
        "name": "Missing standard fields",
        "check": lambda f: f["total_due"] is None or f["vendor"] == "Not found",
        "weight": 1,
        "explain": "One or more standard invoice fields (vendor, total due) could not be confidently extracted.",
    },
    {
        "name": "Offshore / unregistered bank",
        "check": lambda f: bool(re.search(r"offshore", f["bank"], re.IGNORECASE)),
        "weight": 2,
        "explain": "Receiving bank is described as offshore — higher scrutiny warranted for cross-border, less-traceable transfers.",
    },
]
def risk_assessment_agent(fields: dict, rules=None):
    """
    Score a set of extracted fields against the risk rules.

    Args:
        fields: Extracted-fields dict handed to each rule's ``check``.
        rules: Optional iterable of rule dicts (keys ``check`` and ``weight``
            are read here). Defaults to the module-level ``RISK_RULES``.

    Returns:
        ``(score, triggered)`` where ``score`` is the sum of the weights of
        all rules that fired and ``triggered`` is the list of those rule
        dicts, in rule order.
    """
    active_rules = RISK_RULES if rules is None else rules
    triggered = []
    score = 0
    for rule in active_rules:
        try:
            if not rule["check"](fields):
                continue
            # Read the weight before recording the rule so that a malformed
            # rule can never end up in `triggered` without affecting `score`.
            weight = rule["weight"]
        except Exception:
            # Deliberate best effort: a rule that cannot be evaluated
            # (e.g. a missing field) simply does not fire.
            continue
        triggered.append(rule)
        score += weight
    return score, triggered
# -----------------------------------------------------------------------
# Step 3: Decision Agent — proposes, never executes
# -----------------------------------------------------------------------
def decision_agent(score: int):
    """
    Map a risk score to a proposed action.

    Args:
        score: Total risk score produced by the risk assessment step.

    Returns:
        ``(label, explanation)``: approve when the score is exactly 0, flag
        for manual review for any other score up to and including 3, and
        reject/escalate above 3. The result is only a proposal; this function
        performs no action.
    """
    if score == 0:
        return (
            "✅ APPROVE",
            "No risk indicators triggered. Recommended for standard approval.",
        )
    if score <= 3:
        return (
            "🟡 FLAG FOR MANUAL REVIEW",
            "Some risk indicators present. Recommend a second pair of eyes before approval.",
        )
    return (
        "🔴 REJECT / ESCALATE",
        "Multiple strong risk indicators triggered. Recommend escalation to fraud/compliance review, not standard approval.",
    )
# -----------------------------------------------------------------------
# Orchestration
# -----------------------------------------------------------------------
def process_document(file_obj, sample_choice):
    """
    Run Extraction -> Risk Assessment -> Decision on one document.

    Args:
        file_obj: The uploaded file, or None to fall back to a bundled sample.
            Either a filesystem path string or an object exposing the path as
            ``.name`` is accepted (which one the File component passes depends
            on the Gradio version/configuration).
        sample_choice: Dropdown value. "Sample 2: Suspicious Invoice" selects
            the suspicious sample; anything else selects the normal sample.
            Ignored when ``file_obj`` is provided.

    Returns:
        A 7-tuple in the order the Run button's outputs are wired:
        fields markdown, risk markdown, decision markdown, the raw decision
        label (kept for the confirm/override buttons), a visibility update for
        the approval buttons, a visibility update for the audit log section,
        and the document label. If the document cannot be read, the first
        element carries the error message, the next three are empty strings
        and both visibility updates hide their sections.
    """
    import os

    # Work out the path and label BEFORE touching the file, so the label is
    # always defined when the error branch below needs it.
    if file_obj is not None:
        path = str(getattr(file_obj, "name", file_obj))
        doc_label = f"Uploaded file: {os.path.basename(path)}"
    elif sample_choice == "Sample 2: Suspicious Invoice":
        path = "sample_invoice_suspicious.txt"
        doc_label = "Sample 2: Suspicious Invoice"
    else:
        path = "sample_invoice.txt"
        doc_label = "Sample 1: Normal Invoice"

    try:
        text = extract_text(path)
    except Exception as e:
        # UI boundary: surface any read/parse failure to the user instead of
        # crashing the callback.
        return (
            f"❌ Could not read document: {e}",
            "",
            "",
            "",
            gr.update(visible=False),
            gr.update(visible=False),
            doc_label,
        )

    fields = extraction_agent(text)
    score, triggered = risk_assessment_agent(fields)
    decision, decision_explain = decision_agent(score)

    # --- Extracted fields display ---
    if fields["days_to_pay"] is not None:
        days_to_pay = fields["days_to_pay"]
    else:
        days_to_pay = "Could not calculate"
    if fields["total_due"] is not None:
        total_due_line = f"- **Total due:** ${fields['total_due']:,.2f}"
    else:
        total_due_line = "- **Total due:** Not found"
    field_lines = [
        "### 📋 Extraction Agent — Structured Fields",
        f"- **Vendor:** {fields['vendor']}",
        f"- **Invoice #:** {fields['invoice_number']}",
        f"- **Invoice date:** {fields['invoice_date']}",
        f"- **Due date:** {fields['due_date']}",
        f"- **Days to pay:** {days_to_pay}",
        total_due_line,
        f"- **Payment method:** {fields['payment_method']}",
        f"- **Account name:** {fields['account_name']}",
        f"- **Bank:** {fields['bank']}",
    ]
    fields_md = "\n".join(field_lines) + "\n"

    # --- Risk assessment display ---
    risk_parts = [f"### ⚠️ Risk Assessment Agent — Score: {score}\n"]
    if triggered:
        risk_parts.append(f"**{len(triggered)} risk indicator(s) triggered:**\n\n")
        risk_parts.extend(
            f"- **{rule['name']}** (weight {rule['weight']}): {rule['explain']}\n"
            for rule in triggered
        )
    else:
        risk_parts.append("No risk indicators triggered.\n")
    risk_md = "".join(risk_parts)

    # --- Decision display ---
    decision_md = (
        f"### 🤖 Decision Agent — Proposed Action\n## {decision}\n\n{decision_explain}\n\n"
        "**⏸️ This is a proposal only. Nothing has been logged or executed yet — confirm or override below.**"
    )

    return (
        fields_md,
        risk_md,
        decision_md,
        decision,  # stored for the confirm/override buttons
        gr.update(visible=True),  # show approval buttons
        gr.update(visible=True),  # show audit log section
        doc_label,
    )
def confirm_decision(decision_text, human_choice, doc_label, audit_log=None):
    """
    Record the human's verdict on an AI recommendation and render the log.

    Args:
        decision_text: The decision label the AI proposed.
        human_choice: "confirm" records a confirmation; any other value is
            recorded as an override.
        doc_label: Label of the document the decision refers to.
        audit_log: Optional list to record into. Defaults to the module-level
            ``AUDIT_LOG``.

    Returns:
        Markdown for the audit log section, newest entry first.
    """
    log = AUDIT_LOG if audit_log is None else audit_log
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    if human_choice == "confirm":
        outcome = "Human **CONFIRMED** the recommendation."
    else:
        outcome = "Human **OVERRODE** and approved anyway."
    entry = f"[{timestamp}] {doc_label} — AI recommended **{decision_text}** → {outcome}"
    # Newest entry goes first so the latest action is at the top of the log.
    log.insert(0, entry)
    return "### 📝 Audit Log (this session)\n\n" + "\n\n".join(log)
# -----------------------------------------------------------------------
# Gradio UI
# -----------------------------------------------------------------------
# NOTE(review): layout nesting below (which components sit inside which
# Row/Column/Group) was reconstructed from flattened source — confirm against
# the rendered UI.
with gr.Blocks(title="FormFlow") as demo:
    gr.Markdown(
        "\n"
        "# 🧾 FormFlow — Agentic Document-to-Action Pipeline\n"
        "Upload an invoice (or try the samples below). Watch three agents work in\n"
        "sequence — **Extraction → Risk Assessment → Decision** — and notice that\n"
        "the AI only ever *proposes* an action. A human has to click to confirm\n"
        "or override before anything is logged.\n"
    )
    with gr.Row():
        # Left column: document selection and the button that runs the pipeline.
        with gr.Column(scale=1):
            gr.Markdown("### 1. Load a document")
            sample_dropdown = gr.Dropdown(
                choices=["Sample 1: Normal Invoice", "Sample 2: Suspicious Invoice"],
                value="Sample 1: Normal Invoice",
                label="Try a sample",
            )
            file_input = gr.File(label="...or upload your own .txt / .pdf", file_types=[".txt", ".pdf"])
            run_btn = gr.Button("Run Pipeline", variant="primary")
            doc_label_box = gr.Textbox(label="Currently loaded", interactive=False)
            gr.Markdown(
                "**Tip:** run Sample 1 first to see a clean approval, then run "
                "Sample 2 to see the same pipeline catch five separate fraud "
                "indicators on a very different-looking invoice."
            )
        # Right column: the three agents' outputs, then the human approval gate.
        with gr.Column(scale=2):
            fields_output = gr.Markdown()
            risk_output = gr.Markdown()
            decision_output = gr.Markdown()
            # Hidden until a pipeline run succeeds.
            with gr.Row(visible=False) as approval_row:
                confirm_btn = gr.Button("✅ Confirm AI Recommendation", variant="primary")
                override_btn = gr.Button("⚠️ Override and Approve Anyway", variant="stop")
            with gr.Group(visible=False) as audit_group:
                audit_output = gr.Markdown("### 📝 Audit Log (this session)\n\n_No actions confirmed yet._")

    # Holds the most recent proposed decision label between the Run click and
    # the confirm/override click.
    decision_state = gr.State("")

    def run_and_store(file_obj, sample_choice):
        # Thin adapter: the pipeline's 7-tuple maps one-to-one onto `outputs`.
        return process_document(file_obj, sample_choice)

    run_btn.click(
        fn=run_and_store,
        inputs=[file_input, sample_dropdown],
        outputs=[fields_output, risk_output, decision_output, decision_state, approval_row, audit_group, doc_label_box],
    )
    confirm_btn.click(
        fn=lambda decision, doc_label: confirm_decision(decision, "confirm", doc_label),
        inputs=[decision_state, doc_label_box],
        outputs=[audit_output],
    )
    override_btn.click(
        fn=lambda decision, doc_label: confirm_decision(decision, "override", doc_label),
        inputs=[decision_state, doc_label_box],
        outputs=[audit_output],
    )

if __name__ == "__main__":
    demo.launch()