# DPT2 / app.py
# Seth0330's picture
# Update app.py
# e0f5793 verified
# app.py
# Invoice -> JSON (Paste Text Only) with better accuracy:
# - Pipe-table aware parsing
# - Regex extractors for common headers (Invoice No, Dates, PO, totals, taxes, GSTIN, etc.)
# - Line-item table parser (SNO, Description, Qty, UOM, Rate, Total Value)
# - Synonym dictionary -> canonical schema keys
# - Semantic mapping (MiniLM) for leftovers
# - MD2JSON prompt with strong hints; final schema = RULES ∪ MODEL (model cannot remove found values)
import re
import json
from typing import List, Dict, Any, Tuple
import copy
import numpy as np
import streamlit as st
import torch
from transformers import pipeline
from sentence_transformers import SentenceTransformer, util
# Page chrome: wide layout plus the browser-tab title, then the visible heading.
st.set_page_config(
    layout="wide",
    page_title="Invoice → JSON (Paste Text) · Accurate v2",
)
st.title("Invoice → JSON (Paste Text) — Accurate v2")
# ----------------------------- Schema -----------------------------
# Ordered header field names; every one starts out as None in the template.
_HEADER_FIELDS = (
    "car_number",
    "shipment_number",
    "shipping_point",
    "currency",
    "invoice_number",
    "invoice_date",
    "order_number",
    "customer_order_number",
    "our_order_number",
    "sales_order_number",
    "purchase_order_number",
    "order_date",
    "supplier_name",
    "supplier_address",
    "supplier_phone",
    "supplier_email",
    "supplier_tax_id",
    "customer_name",
    "customer_address",
    "customer_phone",
    "customer_email",
    "customer_tax_id",
    "ship_to_name",
    "ship_to_address",
    "bill_to_name",
    "bill_to_address",
    "remit_to_name",
    "remit_to_address",
    "tax_id",
    "tax_registration_number",
    "vat_number",
    "payment_terms",
    "payment_method",
    "payment_reference",
    "bank_account_number",
    "iban",
    "swift_code",
    "total_before_tax",
    "tax_amount",
    "tax_rate",
    "shipping_charges",
    "discount",
    "total_due",
    "amount_paid",
    "balance_due",
    "due_date",
    "invoice_status",
    "reference_number",
    "project_code",
    "department",
    "contact_person",
    "notes",
    "additional_info",
)

# Ordered field names of a single line-item row.
_LINE_ITEM_FIELDS = (
    "quantity",
    "units",
    "description",
    "footage",
    "price",
    "amount",
    "notes",
)

# Empty output template: one header object and a one-row line-item list,
# with every value set to None.
SCHEMA_JSON: Dict[str, Any] = {
    "invoice_header": dict.fromkeys(_HEADER_FIELDS),
    "line_items": [dict.fromkeys(_LINE_ITEM_FIELDS)],
}

# Header keys in schema order.
STATIC_HEADERS: List[str] = list(SCHEMA_JSON["invoice_header"])
# ----------------------------- Sidebar -----------------------------
# User-tunable settings, read once per script run.
st.sidebar.header("Settings")
# Minimum cosine similarity for a leftover label to be mapped to a schema header.
threshold = st.sidebar.slider(
    "Semantic match threshold (cosine)",
    min_value=0.0,
    max_value=1.0,
    value=0.60,
    step=0.01,
)
# Generation budget passed to the MD2JSON pipeline call.
max_new_tokens = st.sidebar.slider(
    "Max new tokens (MD2JSON)",
    min_value=128,
    max_value=2048,
    value=512,
    step=32,
)
# When enabled, the intermediate extraction results are rendered before the final JSON.
show_intermediates = st.sidebar.checkbox("Show intermediates", value=True)
# ----------------------------- Models (cached) -----------------------------
@st.cache_resource(show_spinner=True)
def load_models():
    """Instantiate the two models used by the app.

    Returns:
        A ``(sentence_model, json_converter)`` tuple: the MiniLM
        SentenceTransformer and the MD2JSON text2text-generation pipeline.
        The function is wrapped in ``st.cache_resource``.
    """
    embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
    md2json = pipeline(
        "text2text-generation",
        model="yahyakhoder/MD2JSON-T5-small-V1",
    )
    return embedder, md2json


# Module-level handles used by the mapping and generation steps below.
sentence_model, json_converter = load_models()
# ----------------------------- Synonym map -> schema keys -----------------------------
# Lower-case label fragments (as they appear on invoices) -> canonical schema key.
SYN2KEY: Dict[str, str] = {
    # direct header synonyms
    "invoice no": "invoice_number",
    "invoice number": "invoice_number",
    "invoice#": "invoice_number",
    "inv no": "invoice_number",
    "inv#": "invoice_number",
    "invoice date": "invoice_date",
    "date of invoice": "invoice_date",
    "po no": "purchase_order_number",
    "po number": "purchase_order_number",
    "purchase order": "purchase_order_number",
    "order no": "order_number",
    "order number": "order_number",
    "sales order": "sales_order_number",
    "customer order": "customer_order_number",
    "our order": "our_order_number",
    "due date": "due_date",
    "date of supply": "order_date",
    "gstin": "supplier_tax_id",
    "gstin no": "supplier_tax_id",
    "tax id": "tax_id",
    "vat number": "vat_number",
    "tax registration number": "tax_registration_number",
    "place of supply": "shipping_point",
    "state code": "additional_info",  # keep if you prefer a specific field
    "taxable value": "total_before_tax",
    "total value": "total_due",
    "total amount": "total_due",
    "amount due": "total_due",
    "bank": "bank_account_number",  # value is expected to be corrected by the bank block parsing
    "account no": "bank_account_number",
    "account number": "bank_account_number",
    # India: "IFS Code" is the IFSC, not a SWIFT code. It goes to
    # 'payment_reference', the same field the "ifsc" synonym and
    # extract_bank_block use, so the three sources agree.
    "ifs code": "payment_reference",
    "ifsc": "payment_reference",
    "swift code": "swift_code",
    "iban": "iban",
    "e-way bill no": "reference_number",
    "eway bill": "reference_number",
    "dispatched via": "additional_info",
    "documents dispatched through": "additional_info",
    "kind attn": "contact_person",
    # parties
    "billed to": "bill_to_name",
    "receiver": "bill_to_name",
    "shipped to": "ship_to_name",
    "consignee": "ship_to_name",
}
# ----------------------------- Utilities -----------------------------
def norm(s: str) -> str:
    """Collapse every run of whitespace in *s* to one space and trim the ends."""
    pieces = re.split(r"\s+", s)
    return " ".join(pieces).strip()
def to_lower(s: str) -> str:
    """Return *s* lower-cased with leading and trailing whitespace removed."""
    lowered = s.lower()
    return lowered.strip()
def deep_copy_schema() -> Dict[str, Any]:
    """Return an independent copy of SCHEMA_JSON made by a JSON round trip."""
    serialized = json.dumps(SCHEMA_JSON)
    return json.loads(serialized)
# ----------------------------- Pipe-table aware candidate extractor -----------------------------
def extract_candidates(text: str) -> Dict[str, str]:
    """Collect raw ``label -> value`` candidates from pasted invoice text.

    Two passes are made over the lines of *text*:

    1. "Key: Value" pairs, either on a plain line or inside the cells of a
       pipe-table row (each cell is split on its first colon).
    2. Totals written with or without a colon ("Taxable Value 201801.60");
       these overwrite a same-named candidate from pass 1.

    Pairs whose label or value is empty after whitespace normalisation are
    skipped, so a bare "Billed To:" cannot produce an empty candidate or
    wipe out a value captured earlier under the same label.

    Returns:
        Dict mapping the whitespace-normalised label (original casing) to
        the whitespace-normalised value. Later occurrences of a label win.
    """
    cands: Dict[str, str] = {}
    lines = text.splitlines()

    def add_pair(cell: str) -> None:
        # Split on the first colon only so values may themselves contain ':'.
        key, value = cell.split(":", 1)
        key, value = norm(key), norm(value)
        if key and value:
            cands[key] = value

    # 1) colon pairs (plain lines and pipe-table cells)
    for raw in lines:
        line = raw.strip().strip("|").strip()
        if ":" not in line:
            continue
        if "|" in raw:
            for cell in raw.split("|"):
                if ":" in cell:
                    add_pair(cell)
        else:
            add_pair(line)

    # NOTE: pipe rows without any colon are not turned into candidates;
    # pairing them needs a header row plus the following value row.

    # 2) totals with or without a colon; any number of decimals is kept
    #    so "201801.6" is not truncated to "201801".
    totals_re = re.compile(
        r"\b(Taxable\s+Value|Total\s+Value|Total\s+Amount|Amount\s+Due)\b[:\s]*"
        r"([0-9][0-9,]*(?:\.[0-9]+)?)",
        re.I,
    )
    for raw in lines:
        m = totals_re.search(raw)
        if m:
            cands[norm(m.group(1))] = norm(m.group(2))
    return cands
# ----------------------------- Regex “hard extractors” -----------------------------
def regex_extract_all(text: str) -> Dict[str, str]:
out: Dict[str, str] = {}
# Invoice number
m = re.search(r"\bInvoice\s*(?:No\.?|Number|#)\s*[:\-]?\s*([A-Z0-9\-\/]+)", text, re.I)
if m: out["invoice_number"] = m.group(1)
# Invoice date (DD-MM-YYYY or similar)
m = re.search(r"\bInvoice\s*Date\s*[:\-]?\s*([0-9]{1,2}[-/][0-9]{1,2}[-/][0-9]{2,4})", text, re.I)
if m: out["invoice_date"] = m.group(1)
# PO number + date
m = re.search(r"\bPO\s*(?:No\.?|Number)?\s*[:\-]?\s*([A-Z0-9\-\/]+)", text, re.I)
if m: out["purchase_order_number"] = m.group(1)
m = re.search(r"\bPO\s*Date\s*[:\-]?\s*([0-9]{1,2}[-/][0-9]{1,2}[-/][0-9]{2,4})", text, re.I)
if m: out["order_date"] = m.group(1)
# Date of Supply -> order_date (if not already)
if "order_date" not in out:
m = re.search(r"\bDate\s*of\s*Supply\s*[:\-]?\s*([0-9]{1,2}[-/][0-9]{1,2}[-/][0-9]{2,4})", text, re.I)
if m: out["order_date"] = m.group(1)
# Place of Supply -> shipping_point
m = re.search(r"\bPlace\s*of\s*Supply\s*[:\-]?\s*([A-Za-z0-9 ,\-\(\)]+)", text, re.I)
if m: out["shipping_point"] = m.group(1).strip(" |")
# GSTIN (take the first)
m = re.search(r"\bGSTIN\s*(?:No\.?)?\s*[:\-]?\s*([A-Z0-9]{15})", text, re.I)
if m: out["supplier_tax_id"] = m.group(1)
# Taxable Value -> total_before_tax
m = re.search(r"\bTaxable\s*Value\s*[:\-]?\s*([0-9][0-9,]*(?:\.[0-9]{2})?)", text, re.I)
if m: out["total_before_tax"] = m.group(1).replace(",", "")
# CGST/SGST values -> tax_amount (sum)
cgst = re.search(r"\bCGST\s*Value\s*[:\-]?\s*([0-9][0-9,]*(?:\.[0-9]{2})?)", text, re.I)
sgst = re.search(r"\bSGST\s*Value\s*[:\-]?\s*([0-9][0-9,]*(?:\.[0-9]{2})?)", text, re.I)
if cgst and sgst:
try:
tax_total = float(cgst.group(1).replace(",", "")) + float(sgst.group(1).replace(",", ""))
out["tax_amount"] = f"{tax_total:.2f}"
# Tax rate (if both % available and equal, set combined)
cgstp = re.search(r"\bCGST\s*%?\s*[:\-]?\s*([0-9]+(?:\.[0-9]+)?)", text, re.I)
sgstp = re.search(r"\bSGST\s*%?\s*[:\-]?\s*([0-9]+(?:\.[0-9]+)?)", text, re.I)
if cgstp and sgstp:
try:
rate = float(cgstp.group(1)) + float(sgstp.group(1))
out["tax_rate"] = f"{rate:g}"
except:
pass
except:
pass
# E-Way bill -> reference_number
m = re.search(r"\bE[-\s]?Way\s*bill\s*no\.?\s*[:\-]?\s*([0-9 ]+)", text, re.I)
if m: out["reference_number"] = m.group(1).strip()
return out
# ----------------------------- Bank block parsing -----------------------------
def extract_bank_block(text: str) -> Dict[str, str]:
    """Extract bank/payment details from *text*.

    All patterns are case-insensitive; the first match of each wins and
    empty captures are ignored. Free-text values (account name, branch)
    stop at a pipe or line end so trailing pipe-table cells are not
    swallowed.

    Produced keys (only when found):
        supplier_name        <- "Account Name: ..."
        bank_account_number  <- "Account No: / No.: / Number: ..."
        payment_reference    <- "IFSC: / IFSC Code: / IFS Code: ..."
        swift_code           <- "SWIFT Code: ..."
        additional_info      <- "Bank: X | Branch: Y | MICR: Z" (present parts only)
    """
    bank: Dict[str, str] = {}

    def grab(pattern: str):
        """Return the stripped first capture of *pattern*, or None if empty/absent."""
        m = re.search(pattern, text, re.I)
        if not m:
            return None
        return m.group(1).strip() or None

    account_name = grab(r"\bAccount\s*Name\s*:\s*([^|\r\n]+)")
    if account_name:
        bank["supplier_name"] = account_name

    account_no = grab(r"\bAccount\s*(?:No\.?|Number)\s*:\s*([A-Za-z0-9\- ]+)")
    if account_no:
        bank["bank_account_number"] = account_no

    ifsc = grab(r"\bIFSC?(?:\s*Code)?\s*:\s*([A-Za-z0-9]+)")
    if ifsc:
        bank["payment_reference"] = ifsc

    swift = grab(r"\bSWIFT\s*Code\s*:\s*([A-Za-z0-9]+)")
    if swift:
        bank["swift_code"] = swift

    # Bank name, branch and MICR have no dedicated schema field; they are
    # combined into additional_info so bank_account_number is not overwritten.
    bank_name = grab(r"\bBank\s*:\s*([A-Za-z0-9 ,\-\(\)&]+)")
    branch = grab(r"\bBranch\s*:\s*([^|\r\n]+)")
    micr = grab(r"\bMICR\s*Code\s*:\s*([0-9]+)")
    info_bits = []
    if bank_name:
        info_bits.append("Bank: " + bank_name)
    if branch:
        info_bits.append("Branch: " + branch)
    if micr:
        info_bits.append("MICR: " + micr)
    if info_bits:
        bank["additional_info"] = " | ".join(info_bits)
    return bank
# ----------------------------- Line-item parser (from table) -----------------------------
def parse_line_items(text: str) -> List[Dict[str, Any]]:
    """Parse line items from a pipe table.

    The table header is the first line that contains a pipe and (in any
    letter case) "description", "qty"/"quantity", "rate"/"price" and
    "total", e.g.::

        | SNO | Description | HSN/SAC | Qty | UOM | Rate | ... | Total Value |

    Rows are the following lines that start with "|" and contain at least
    two pipes. Cells are matched to header columns by position; empty inner
    cells are kept so a blank cell does not shift the columns after it.
    Rows with fewer than three non-empty, non-separator cells (such as
    ``|---|---|`` or ``|:---|---:|``) are skipped. Parsing stops at the
    first non-table line, except that one directly after the header is
    tolerated.

    Returns:
        A list of dicts with keys quantity, units, description, footage,
        price, amount, notes (missing values are None). ``footage`` is
        "<quantity> <units>" when both are present. Empty list when no
        header row is found.
    """
    items: List[Dict[str, Any]] = []
    lines = [ln for ln in text.splitlines() if ln.strip()]

    def split_row(line: str) -> List[str]:
        # Drop only the outer pipes; inner empty cells keep their position.
        s = line.strip()
        if s.startswith("|"):
            s = s[1:]
        if s.endswith("|"):
            s = s[:-1]
        return [c.strip() for c in s.split("|")]

    # find header row index
    header_idx = -1
    for i, ln in enumerate(lines):
        low = ln.lower()
        if (
            "|" in ln
            and "description" in low
            and ("qty" in low or "quantity" in low)
            and ("rate" in low or "price" in low)
            and "total" in low
        ):
            header_idx = i
            break
    if header_idx == -1:
        return items

    headers = [h.lower() for h in split_row(lines[header_idx])]

    def column(*names: str) -> int:
        # Names are tried in priority order, so "description" beats a
        # preceding "item code" column and "total value" beats "amount".
        for name in names:
            for k, h in enumerate(headers):
                if name in h:
                    return k
        return -1

    # Column positions depend on the header only; compute them once.
    i_desc = column("description", "item")
    i_qty = column("qty", "quantity")
    i_uom = column("uom", "unit")
    i_rate = column("rate", "price")
    i_amt = column("total value", "amount", "total")

    for j in range(header_idx + 1, len(lines)):
        row = lines[j]
        if not (row.strip().startswith("|") and row.count("|") >= 2):
            # stop at the first non-table line, unless it directly follows the header
            if j > header_idx + 1:
                break
            continue

        cells = split_row(row)
        filled = [c for c in cells if c and set(c) - set("-:")]
        if len(filled) < 3:
            continue

        def cell(i: int):
            return (cells[i] if 0 <= i < len(cells) else "") or None

        rowd: Dict[str, Any] = {
            "quantity": cell(i_qty),
            "units": cell(i_uom),
            "description": cell(i_desc),
            "footage": None,
            "price": cell(i_rate),
            "amount": cell(i_amt),
            "notes": None,
        }
        # footage is derived from quantity + units, e.g. "60.000 mtrs"
        if rowd["units"] and rowd["quantity"]:
            rowd["footage"] = f'{rowd["quantity"]} {rowd["units"]}'
        items.append(rowd)
    return items
# ----------------------------- Semantic mapping for leftovers -----------------------------
def semantic_map_candidates(candidates: Dict[str, str], static_headers: List[str], thresh: float) -> Dict[str, str]:
    """Map raw candidate labels to schema header keys.

    Pass 1 (synonyms): each label and each SYN2KEY synonym is lower-cased,
    "#" is read as "no", and every other non-alphanumeric run becomes a
    single space. A synonym hits when it occurs in the label as whole
    words. Synonyms are tried longest first, so "purchase order" wins over
    "order no" regardless of dict order. If several labels hit the same
    key, the later one wins.

    Pass 2 (semantic): labels without a synonym hit are embedded with
    ``sentence_model`` and compared to *static_headers* by cosine
    similarity. A label is mapped to its best header when the score is at
    least *thresh*. A semantic match never replaces a synonym hit, and if
    several labels pick the same header the highest score is kept.

    Args:
        candidates: raw ``label -> value`` pairs.
        static_headers: schema header keys to map onto.
        thresh: minimum cosine similarity for a semantic match.

    Returns:
        Dict of ``schema key -> value``; empty when *candidates* is empty.
    """
    if not candidates:
        return {}

    def label_key(s: str) -> str:
        # Same normalisation for labels and synonyms; otherwise synonyms
        # containing punctuation ("invoice#", "e-way bill no") never match.
        return re.sub(r"[^a-z0-9]+", " ", s.lower().replace("#", " no ")).strip()

    synonyms = sorted(
        ((label_key(syn), key) for syn, key in SYN2KEY.items()),
        key=lambda pair: len(pair[0]),
        reverse=True,
    )

    # synonym pass first
    mapped: Dict[str, str] = {}
    leftovers: Dict[str, str] = {}
    for label, value in candidates.items():
        # Padding with spaces turns the substring test into a whole-word test.
        padded = f" {label_key(label)} "
        hit = next((key for syn, key in synonyms if syn and f" {syn} " in padded), None)
        if hit:
            mapped[hit] = value
        else:
            leftovers[label] = value

    if leftovers and static_headers:
        synonym_keys = set(mapped)
        labels = list(leftovers)
        cand_emb = sentence_model.encode(labels, normalize_embeddings=True)
        head_emb = sentence_model.encode(static_headers, normalize_embeddings=True)
        sims = util.cos_sim(torch.tensor(cand_emb), torch.tensor(head_emb)).cpu().numpy()
        best_score: Dict[str, float] = {}
        for i, label in enumerate(labels):
            j = int(np.argmax(sims[i]))
            score = float(sims[i][j])
            header = static_headers[j]
            if score < thresh or header in synonym_keys:
                continue
            if score > best_score.get(header, float("-inf")):
                best_score[header] = score
                mapped[header] = leftovers[label]
    return mapped
# ----------------------------- Build MD2JSON prompt -----------------------------
def build_prompt(invoice_text: str, mapped_hints: Dict[str, str], items_hints: List[Dict[str, Any]]) -> str:
    """Assemble the MD2JSON prompt.

    The prompt is: the fixed schema instruction, then "Invoice Text:" with
    the stripped *invoice_text*, then optional hint sections.

    Args:
        invoice_text: raw pasted invoice text.
        mapped_hints: header hints, rendered as space-separated
            ``#key: value`` pairs under "Hints (header):".
        items_hints: line-item hints, rendered as JSON under
            "Hints (line_items):". If they cannot be serialised to JSON the
            section is left out rather than failing the whole prompt.

    Returns:
        The complete prompt string.
    """
    instruction = (
        'Use this schema:\n'
        '{\n'
        ' "invoice_header": {\n'
        ' "car_number": "string or null",\n'
        ' "shipment_number": "string or null",\n'
        ' "shipping_point": "string or null",\n'
        ' "currency": "string or null",\n'
        ' "invoice_number": "string or null",\n'
        ' "invoice_date": "string or null",\n'
        ' "order_number": "string or null",\n'
        ' "customer_order_number": "string or null",\n'
        ' "our_order_number": "string or null",\n'
        ' "sales_order_number": "string or null",\n'
        ' "purchase_order_number": "string or null",\n'
        ' "order_date": "string or null",\n'
        ' "supplier_name": "string or null",\n'
        ' "supplier_address": "string or null",\n'
        ' "supplier_phone": "string or null",\n'
        ' "supplier_email": "string or null",\n'
        ' "supplier_tax_id": "string or null",\n'
        ' "customer_name": "string or null",\n'
        ' "customer_address": "string or null",\n'
        ' "customer_phone": "string or null",\n'
        ' "customer_email": "string or null",\n'
        ' "customer_tax_id": "string or null",\n'
        ' "ship_to_name": "string or null",\n'
        ' "ship_to_address": "string or null",\n'
        ' "bill_to_name": "string or null",\n'
        ' "bill_to_address": "string or null",\n'
        ' "remit_to_name": "string or null",\n'
        ' "remit_to_address": "string or null",\n'
        ' "tax_id": "string or null",\n'
        ' "tax_registration_number": "string or null",\n'
        ' "vat_number": "string or null",\n'
        ' "payment_terms": "string or null",\n'
        ' "payment_method": "string or null",\n'
        ' "payment_reference": "string or null",\n'
        ' "bank_account_number": "string or null",\n'
        ' "iban": "string or null",\n'
        ' "swift_code": "string or null",\n'
        ' "total_before_tax": "string or null",\n'
        ' "tax_amount": "string or null",\n'
        ' "tax_rate": "string or null",\n'
        ' "shipping_charges": "string or null",\n'
        ' "discount": "string or null",\n'
        ' "total_due": "string or null",\n'
        ' "amount_paid": "string or null",\n'
        ' "balance_due": "string or null",\n'
        ' "due_date": "string or null",\n'
        ' "invoice_status": "string or null",\n'
        ' "reference_number": "string or null",\n'
        ' "project_code": "string or null",\n'
        ' "department": "string or null",\n'
        ' "contact_person": "string or null",\n'
        ' "notes": "string or null",\n'
        ' "additional_info": "string or null"\n'
        ' },\n'
        ' "line_items": [\n'
        ' {\n'
        ' "quantity": "string or null",\n'
        ' "units": "string or null",\n'
        ' "description": "string or null",\n'
        ' "footage": "string or null",\n'
        ' "price": "string or null",\n'
        ' "amount": "string or null",\n'
        ' "notes": "string or null"\n'
        ' }\n'
        ' ]\n'
        '}\n'
        'If a field is missing for a line item or header, use null. '
        'Do not invent fields. Do not add any header or shipment data to any line item. '
        'Return ONLY the JSON object, no explanation.\n'
    )
    hints = ""
    if mapped_hints:
        hints += "\nHints (header):\n" + " ".join(f"#{k}: {v}" for k, v in mapped_hints.items())
    if items_hints:
        try:
            items_json = json.dumps(items_hints, ensure_ascii=False)
        except (TypeError, ValueError):
            # Hints are best-effort: unserialisable items just drop the section.
            items_json = None
        if items_json is not None:
            hints += "\nHints (line_items):\n" + items_json
    return instruction + "\nInvoice Text:\n" + invoice_text.strip() + hints
def strict_json(text: str) -> Dict[str, Any]:
    """Parse model output into a JSON object (dict).

    The whole *text* is tried first, then the span from the first "{" to
    the last "}". Only a JSON object is accepted: valid JSON of another
    type (list, string, number, null) is rejected because callers read the
    result with ``.get``.

    Raises:
        ValueError: if no JSON object can be parsed from *text*.
    """
    attempts = [text]
    start = text.find("{")
    end = text.rfind("}")
    if start != -1 and end > start:
        attempts.append(text[start:end + 1])

    for chunk in attempts:
        try:
            parsed = json.loads(chunk)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            continue
        if isinstance(parsed, dict):
            return parsed
    raise ValueError("Model did not return valid JSON.")
# ----------------------------- Final merge policy -----------------------------
def merge_schema(rule_json: Dict[str, Any], model_json: Dict[str, Any]) -> Dict[str, Any]:
    """Merge rule-based and model-based results; the rules win.

    Header: every key of ``rule_json["invoice_header"]`` whose value is
    blank (None, "" or "null") is filled from the model's header when the
    model has a non-blank value for it. Keys the rules do not have are
    never added.

    Line items: kept from the rules if any rule row holds a non-blank
    value; otherwise replaced by the dict rows of the model's
    ``line_items`` when there are any, else left as is.

    Malformed model output (not a dict, header not a dict, line_items not a
    list, non-dict rows) is ignored instead of raising. Neither argument is
    modified; the result is built from deep copies.
    """
    def is_blank(value: Any) -> bool:
        return value in (None, "", "null")

    final = copy.deepcopy(rule_json)
    if not isinstance(model_json, dict):
        return final

    # header
    hdr = final["invoice_header"]
    mdl_hdr = model_json.get("invoice_header")
    if isinstance(mdl_hdr, dict):
        for key in list(hdr):
            if is_blank(hdr[key]):
                candidate = mdl_hdr.get(key)
                if not is_blank(candidate):
                    hdr[key] = candidate

    # line_items: if we got some via rules, keep them; else take model's
    rules_have_items = any(
        isinstance(row, dict) and any(not is_blank(v) for v in row.values())
        for row in (final["line_items"] or [])
    )
    if not rules_have_items:
        mdl_items = model_json.get("line_items")
        if isinstance(mdl_items, list):
            rows = [copy.deepcopy(row) for row in mdl_items if isinstance(row, dict)]
            if rows:
                final["line_items"] = rows
    return final
# ----------------------------- UI -----------------------------
# Main page flow: paste text -> rule extraction -> model generation -> merge -> show/download.
invoice_text = st.text_area(
    "Paste the invoice text here.",
    height=320,
    placeholder="Paste the invoice content (OCR/plain text) ..."
)

if st.button("Generate JSON", type="primary", use_container_width=True):
    if not invoice_text.strip():
        st.error("Please paste the invoice text first.")
        st.stop()
    txt = invoice_text

    # 1) Deterministic extraction
    # 1a) candidates (pipe-table aware)
    candidates = extract_candidates(txt)
    # 1b) regex "hard" fields
    hard = regex_extract_all(txt)
    # 1c) bank block
    bank = extract_bank_block(txt)
    # 1d) line items from table
    items = parse_line_items(txt)
    # 1e) map candidates (synonyms + semantic) to schema headers
    sem_mapped = semantic_map_candidates(candidates, STATIC_HEADERS, threshold)
    # 1f) combine deterministic header fields; later sources override
    #     earlier ones: semantic/synonym < regex < bank block
    header_found: Dict[str, Any] = {}
    header_found.update(sem_mapped)
    header_found.update(hard)
    header_found.update(bank)

    # 2) Build RULE JSON (schema-shaped, rules filled)
    rule_json = deep_copy_schema()
    for k, v in header_found.items():
        if k in rule_json["invoice_header"]:
            rule_json["invoice_header"][k] = v
    # line items
    if items:
        rule_json["line_items"] = items

    if show_intermediates:
        st.subheader("Candidates (first 20)")
        st.json(dict(list(candidates.items())[:20]))
        st.subheader("Regex/Hard fields")
        st.json(hard)
        st.subheader("Bank block")
        st.json(bank)
        st.subheader("Semantic-mapped headers")
        st.json(sem_mapped)
        st.subheader("Line items (parsed)")
        st.json(items)

    # 3) MD2JSON generation with strong hints
    with st.spinner("Generating structured JSON with MD2JSON-T5-small-V1..."):
        prompt = build_prompt(txt, header_found, items)
        gen = json_converter(prompt, max_new_tokens=max_new_tokens)[0]["generated_text"]

    try:
        model_json = strict_json(gen)
        # The merge reads the result with .get, so anything but a JSON
        # object is treated the same as unparseable output.
        if not isinstance(model_json, dict):
            raise ValueError("Model did not return a JSON object.")
    except ValueError:
        # Model failed: fall back to the empty schema so the rules alone
        # determine the result, and say so instead of failing silently.
        model_json = deep_copy_schema()
        st.warning("Model output was not valid JSON; showing rule-based extraction only.")

    # 4) Final merge (rules win)
    final_json = merge_schema(rule_json, model_json)
    st.subheader("Final JSON")
    st.json(final_json)
    st.download_button("Download JSON", data=json.dumps(final_json, indent=2),
                       file_name="invoice.json", mime="application/json", use_container_width=True)