"""
FormFlow — Agentic Document-to-Action Pipeline with Human-in-the-Loop Approval
================================================================================
Three small, inspectable agents, chained together, with a mandatory human
approval gate before anything is "actioned":

1. Extraction Agent -> pulls structured fields out of a document
2. Risk Assessment Agent -> runs explicit, visible rules over those fields
3. Decision Agent -> proposes Approve / Flag for Review / Reject

Nothing is ever auto-executed. The human always clicks to confirm or override.

No GPU required. No embedding model. No external API calls. Pure Python + regex
for extraction and scoring, by design — every decision is traceable.
"""
|
|
import logging
import os
import re
from datetime import datetime

import gradio as gr
from pypdf import PdfReader
|
|
| |
| |
| |
| |
# In-memory audit trail. confirm_decision() inserts one formatted entry per
# human confirm/override click at index 0, so the newest entry comes first.
# Nothing here is persisted: the list starts empty on every process start.
# NOTE(review): because this is module-level, it is presumably shared by every
# browser session this process serves rather than kept per user — confirm.
AUDIT_LOG = []
|
|
|
|
| |
| |
| |
def extract_text(file_path: str) -> str:
    """Return the plain text of the document stored at ``file_path``.

    A path ending in ``.pdf`` (case-insensitive) is read with ``PdfReader``:
    the text of each page is joined with newlines, and a page that yields no
    text contributes an empty string. Any other path is read as UTF-8 text,
    with undecodable bytes dropped rather than raising.
    """
    looks_like_pdf = file_path.lower().endswith(".pdf")
    if not looks_like_pdf:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as handle:
            return handle.read()

    page_texts = []
    for page in PdfReader(file_path).pages:
        # extract_text() may return a falsy value; normalise it to "".
        page_texts.append(page.extract_text() or "")
    return "\n".join(page_texts)
|
|
|
|
| |
| |
| |
def extraction_agent(text: str) -> dict:
    """Extract structured invoice fields from raw document text.

    Rule-based on purpose: every value comes from a regex applied to
    ``text`` rather than from an LLM call, so each extracted value is
    directly traceable to a specific pattern in the source text. See README
    for the note on swapping this for an LLM-based extractor on messier
    documents.

    Args:
        text: Full text of the document.

    Returns:
        A dict with these keys, in this order:

        * ``vendor``, ``invoice_number``, ``invoice_date``, ``due_date``,
          ``payment_terms``, ``total_due_raw``, ``payment_method``,
          ``account_name``, ``bank``, ``notes`` -- the stripped captured
          text, or the string ``"Not found"`` when the pattern is absent.
        * ``total_due`` -- ``total_due_raw`` as a float with thousands
          commas removed, or ``None`` when it is missing or not numeric.
        * ``days_to_pay`` -- whole days from invoice date to due date, or
          ``None`` unless both dates parse as ``YYYY-MM-DD`` or
          ``YYYY/MM/DD``.
    """

    def find(pattern, default="Not found", group=1, flags=re.IGNORECASE):
        """Return the stripped capture of the first match, else ``default``."""
        m = re.search(pattern, text, flags)
        return m.group(group).strip() if m else default

    def parse_date(raw):
        """Parse a year-first date written with '-' or '/'; None otherwise."""
        # The date patterns below capture both '-' and '/' separators, so
        # both year-first spellings are tried. Day-first vs month-first
        # slash dates are ambiguous and are deliberately not guessed at.
        for fmt in ("%Y-%m-%d", "%Y/%m/%d"):
            try:
                return datetime.strptime(raw, fmt)
            except (ValueError, TypeError):
                continue
        return None

    # Fields captured from a single "Label: value" occurrence.
    single_line_patterns = {
        "vendor": r"From:\s*(.+)",
        "invoice_number": r"Invoice Number:\s*([A-Za-z0-9\-]+)",
        "invoice_date": r"Invoice Date:\s*([\d\-/]+)",
        "due_date": r"Due Date:\s*([\d\-/]+)",
        "payment_terms": r"Payment Terms:\s*(.+)",
        "total_due_raw": r"TOTAL DUE:\s*\$?([\d,]+\.\d{2})",
        "payment_method": r"Payment Method:\s*(.+)",
        "account_name": r"Account Name:\s*(.+)",
        "bank": r"Bank:\s*(.+)",
    }
    fields = {name: find(pattern) for name, pattern in single_line_patterns.items()}

    # Notes may span several lines: capture up to the first blank line or
    # the end of the text (DOTALL lets '.' cross newlines).
    fields["notes"] = find(
        r"Notes:\s*(.+?)(?:\n\n|\Z)", flags=re.IGNORECASE | re.DOTALL
    )

    # Numeric total; "Not found" (or any non-numeric capture) becomes None.
    try:
        fields["total_due"] = float(fields["total_due_raw"].replace(",", ""))
    except (ValueError, AttributeError):
        fields["total_due"] = None

    # Payment window in days; None when either date is missing/unparseable.
    invoiced = parse_date(fields["invoice_date"])
    due = parse_date(fields["due_date"])
    if invoiced is not None and due is not None:
        fields["days_to_pay"] = (due - invoiced).days
    else:
        fields["days_to_pay"] = None

    return fields
|
|
|
|
| |
| |
| |
# Explicit, human-readable risk rules evaluated over the dict produced by
# extraction_agent(). Each rule is a dict with:
#   name    - short label shown in the risk report
#   check   - predicate taking the fields dict; truthy means the rule fired
#   weight  - points added to the risk score when the rule fires
#   explain - sentence shown to the reviewer explaining why the rule matters
# The text-based checks expect a missing field to hold the string
# "Not found" (never None); none of the patterns below match that string.
RISK_RULES = [
    {
        "name": "Large amount",
        "check": lambda f: f["total_due"] is not None and f["total_due"] > 20000,
        "weight": 2,
        "explain": "Invoice total exceeds $20,000 — above standard auto-approval threshold.",
    },
    {
        "name": "Very short payment window",
        "check": lambda f: f["days_to_pay"] is not None and f["days_to_pay"] <= 2,
        "weight": 3,
        "explain": "Payment is due within 2 days or less of the invoice date — unusually urgent for a standard vendor invoice.",
    },
    {
        "name": "Wire-transfer-only payment",
        # Both words must appear somewhere in the payment-method text.
        "check": lambda f: "wire" in f["payment_method"].lower() and "only" in f["payment_method"].lower(),
        "weight": 2,
        "explain": "Payment method explicitly restricts to wire transfer only — a common fraud indicator, since wires are hard to reverse.",
    },
    {
        "name": "Personal-sounding account name",
        "check": lambda f: bool(re.search(r"\bpersonal\b", f["account_name"], re.IGNORECASE)),
        "weight": 3,
        "explain": "Receiving account name is explicitly marked as a personal account rather than a registered business account.",
    },
    {
        "name": "Urgency / avoid-contact language",
        "check": lambda f: bool(re.search(r"urgent|time-sensitive|do not contact|email only", f["notes"], re.IGNORECASE)),
        "weight": 3,
        "explain": "Notes field contains urgency or avoid-contact language — a classic social-engineering / invoice fraud pattern.",
    },
    {
        "name": "Missing standard fields",
        "check": lambda f: f["total_due"] is None or f["vendor"] == "Not found",
        "weight": 1,
        "explain": "One or more standard invoice fields (vendor, total due) could not be confidently extracted.",
    },
    {
        "name": "Offshore / unregistered bank",
        "check": lambda f: bool(re.search(r"offshore", f["bank"], re.IGNORECASE)),
        "weight": 2,
        "explain": "Receiving bank is described as offshore — higher scrutiny warranted for cross-border, less-traceable transfers.",
    },
]
|
|
|
|
def risk_assessment_agent(fields: dict, rules=None):
    """Evaluate every risk rule against ``fields`` and total the weights.

    Args:
        fields: Extracted document fields passed to each rule's ``check``.
        rules: Optional iterable of rule dicts (``name``, ``check``,
            ``weight``, ...). Defaults to the module-level ``RISK_RULES``.

    Returns:
        ``(score, triggered)``: the sum of the weights of the rules whose
        check returned a truthy value, and those rule dicts in evaluation
        order.

    A rule that raises while being evaluated is skipped so that one broken
    rule cannot take the whole pipeline down. The skip is logged as a
    warning (with traceback) because it can lower the score silently.
    """
    if rules is None:
        rules = RISK_RULES

    triggered = []
    score = 0
    for rule in rules:
        try:
            if not rule["check"](fields):
                continue
            weight = rule["weight"]
        except Exception:
            # Best-effort by design: keep going, but leave a trace.
            logging.getLogger(__name__).warning(
                "Risk rule %r could not be evaluated and was skipped",
                rule.get("name", "<unnamed>"),
                exc_info=True,
            )
            continue
        triggered.append(rule)
        score += weight
    return score, triggered
|
|
|
|
| |
| |
| |
def decision_agent(score: int):
    """Map a risk score to a proposed action and a one-sentence rationale.

    Args:
        score: Total risk score (sum of triggered rule weights).

    Returns:
        ``(label, explanation)``:

        * ``score == 0``  -> approve
        * ``score <= 3``  -> flag for manual review (this branch also
          catches any negative score)
        * otherwise       -> reject / escalate

    The result is only a proposal; nothing is executed here.
    """
    if score == 0:
        return (
            "✅ APPROVE",
            "No risk indicators triggered. Recommended for standard approval.",
        )
    if score <= 3:
        return (
            "🟡 FLAG FOR MANUAL REVIEW",
            "Some risk indicators present. Recommend a second pair of eyes before approval.",
        )
    return (
        "🔴 REJECT / ESCALATE",
        "Multiple strong risk indicators triggered. Recommend escalation to fraud/compliance review, not standard approval.",
    )
|
|
|
|
| |
| |
| |
def process_document(file_obj, sample_choice):
    """Run extraction -> risk assessment -> decision on one document.

    Args:
        file_obj: The uploaded file, or ``None`` to use a bundled sample.
            Either an object whose ``.name`` is the file path or a plain
            path string is accepted.
        sample_choice: Sample selector, used only when ``file_obj`` is
            ``None``. ``"Sample 2: Suspicious Invoice"`` loads
            ``sample_invoice_suspicious.txt``; anything else loads
            ``sample_invoice.txt``. Both paths are relative to the current
            working directory.

    Returns:
        A 7-tuple: ``(fields_md, risk_md, decision_md, decision,
        approval_update, audit_update, doc_label)``. The first three are
        Markdown reports, ``decision`` is the proposed action label, the
        two updates are ``gr.update(visible=...)`` values and ``doc_label``
        names the document. If the document cannot be read, the first item
        is an error message, the next three are empty strings and both
        updates hide their targets.
    """
    # Decide what to read and how to label it BEFORE touching the file, so
    # the label is always defined when the error path below needs it.
    if file_obj is not None:
        path = str(getattr(file_obj, "name", file_obj))
        doc_label = f"Uploaded file: {os.path.basename(path)}"
    elif sample_choice == "Sample 2: Suspicious Invoice":
        path = "sample_invoice_suspicious.txt"
        doc_label = "Sample 2: Suspicious Invoice"
    else:
        path = "sample_invoice.txt"
        doc_label = "Sample 1: Normal Invoice"

    try:
        text = extract_text(path)
    except Exception as e:
        # UI boundary: report the failure to the user instead of crashing.
        return (
            f"❌ Could not read document: {e}",
            "",
            "",
            "",
            gr.update(visible=False),
            gr.update(visible=False),
            doc_label,
        )

    fields = extraction_agent(text)
    score, triggered = risk_assessment_agent(fields)
    decision, decision_explain = decision_agent(score)

    # --- Extraction report -------------------------------------------------
    if fields["days_to_pay"] is not None:
        days_to_pay = fields["days_to_pay"]
    else:
        days_to_pay = "Could not calculate"
    if fields["total_due"] is not None:
        total_due = f"${fields['total_due']:,.2f}"
    else:
        total_due = "Not found"
    field_lines = [
        "### 📋 Extraction Agent — Structured Fields",
        f"- **Vendor:** {fields['vendor']}",
        f"- **Invoice #:** {fields['invoice_number']}",
        f"- **Invoice date:** {fields['invoice_date']}",
        f"- **Due date:** {fields['due_date']}",
        f"- **Days to pay:** {days_to_pay}",
        f"- **Total due:** {total_due}",
        f"- **Payment method:** {fields['payment_method']}",
        f"- **Account name:** {fields['account_name']}",
        f"- **Bank:** {fields['bank']}",
    ]
    fields_md = "\n".join(field_lines) + "\n"

    # --- Risk report -------------------------------------------------------
    risk_md = f"### ⚠️ Risk Assessment Agent — Score: {score}\n"
    if triggered:
        risk_md += f"**{len(triggered)} risk indicator(s) triggered:**\n\n"
        risk_md += "".join(
            f"- **{rule['name']}** (weight {rule['weight']}): {rule['explain']}\n"
            for rule in triggered
        )
    else:
        risk_md += "No risk indicators triggered.\n"

    # --- Proposed decision (never executed here) ---------------------------
    decision_md = (
        f"### 🤖 Decision Agent — Proposed Action\n## {decision}\n\n{decision_explain}\n\n"
        "**⏸️ This is a proposal only. Nothing has been logged or executed yet — confirm or override below.**"
    )

    return (
        fields_md,
        risk_md,
        decision_md,
        decision,
        gr.update(visible=True),
        gr.update(visible=True),
        doc_label,
    )
|
|
|
|
def confirm_decision(decision_text, human_choice, doc_label):
    """Record the human's verdict in the audit log and return it as Markdown.

    Args:
        decision_text: The action label the decision agent proposed.
        human_choice: ``"confirm"`` logs a confirmation; any other value is
            logged as an override.
        doc_label: Label of the document the decision refers to.

    Returns:
        The audit log as Markdown: a heading followed by every entry,
        newest first, separated by blank lines.

    Side effects:
        Inserts the new entry at the front of the module-level
        ``AUDIT_LOG`` list.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    if human_choice == "confirm":
        verdict = "Human **CONFIRMED** the recommendation."
    else:
        verdict = "Human **OVERRODE** and approved anyway."
    entry = f"[{timestamp}] {doc_label} — AI recommended **{decision_text}** → {verdict}"

    AUDIT_LOG.insert(0, entry)  # newest first
    return "### 📝 Audit Log (this session)\n\n" + "\n\n".join(AUDIT_LOG)
|
|
|
|
| |
| |
| |
# UI layout and event wiring.
# NOTE(review): the nesting of the components below (which column the tip,
# the approval row and the audit group live in) is a best reading of the
# statement order — confirm against the intended layout.
with gr.Blocks(title="FormFlow") as demo:
    gr.Markdown(
        "\n"
        "# 🧾 FormFlow — Agentic Document-to-Action Pipeline\n"
        "Upload an invoice (or try the samples below). Watch three agents work in\n"
        "sequence — **Extraction → Risk Assessment → Decision** — and notice that\n"
        "the AI only ever *proposes* an action. A human has to click to confirm\n"
        "or override before anything is logged.\n"
    )

    with gr.Row():
        # Left column: choose or upload a document and run the pipeline.
        with gr.Column(scale=1):
            gr.Markdown("### 1. Load a document")
            sample_dropdown = gr.Dropdown(
                choices=["Sample 1: Normal Invoice", "Sample 2: Suspicious Invoice"],
                value="Sample 1: Normal Invoice",
                label="Try a sample",
            )
            file_input = gr.File(
                label="...or upload your own .txt / .pdf",
                file_types=[".txt", ".pdf"],
            )
            run_btn = gr.Button("Run Pipeline", variant="primary")
            doc_label_box = gr.Textbox(label="Currently loaded", interactive=False)

            gr.Markdown(
                "**Tip:** run Sample 1 first to see a clean approval, then run "
                "Sample 2 to see the same pipeline catch five separate fraud "
                "indicators on a very different-looking invoice."
            )

        # Right column: the three agent reports, then the human approval gate.
        with gr.Column(scale=2):
            fields_output = gr.Markdown()
            risk_output = gr.Markdown()
            decision_output = gr.Markdown()

            # Hidden until a pipeline run returns a visibility update for it.
            with gr.Row(visible=False) as approval_row:
                confirm_btn = gr.Button("✅ Confirm AI Recommendation", variant="primary")
                override_btn = gr.Button("⚠️ Override and Approve Anyway", variant="stop")

            with gr.Group(visible=False) as audit_group:
                audit_output = gr.Markdown(
                    "### 📝 Audit Log (this session)\n\n_No actions confirmed yet._"
                )

    # Holds the decision label of the latest run so that the confirm and
    # override handlers can log what the AI actually proposed.
    decision_state = gr.State("")

    def run_and_store(file_obj, sample_choice):
        """Click handler for "Run Pipeline": forward process_document()'s 7-tuple."""
        return process_document(file_obj, sample_choice)

    run_btn.click(
        fn=run_and_store,
        inputs=[file_input, sample_dropdown],
        outputs=[
            fields_output,
            risk_output,
            decision_output,
            decision_state,
            approval_row,
            audit_group,
            doc_label_box,
        ],
    )

    # Both buttons write to the audit log; only the recorded choice differs.
    confirm_btn.click(
        fn=lambda decision, doc_label: confirm_decision(decision, "confirm", doc_label),
        inputs=[decision_state, doc_label_box],
        outputs=[audit_output],
    )

    override_btn.click(
        fn=lambda decision, doc_label: confirm_decision(decision, "override", doc_label),
        inputs=[decision_state, doc_label_box],
        outputs=[audit_output],
    )


if __name__ == "__main__":
    demo.launch()