import streamlit as st import requests import json import os import time import mimetypes from datetime import datetime from fuzzywuzzy import fuzz # ====== CONFIG ====== UNSTRACT_BASE = "https://llmwhisperer-api.us-central.unstract.com/api/v2" UNSTRACT_API_KEY = os.getenv("UNSTRACT_API_KEY") OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY") OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions" MISTRAL_MODEL = "mistralai/ministral-8b" st.set_page_config(page_title="EZOFIS Document Validation Agent", layout="wide") st.markdown(""" """, unsafe_allow_html=True) st.markdown( "

EZOFIS Document Validation Agent

", unsafe_allow_html=True ) # ====== UI LAYOUT ====== col_left, col_right = st.columns([1.35, 1.05]) with col_left: # Step 1: Checklist st.markdown("1 Your Document Checklist (JSON)", unsafe_allow_html=True) sample_checklist = '''{ "required_documents": [ {"type": "Driver's License", "description": "Government-issued photo ID"}, {"type": "Passport", "description": "Valid passport"}, {"type": "SIN Card", "description": "Social Insurance Number document"}, {"type": "Bank Statement", "description": "Last 3 months bank statement"}, {"type": "Employment Letter", "description": "Signed letter from employer"}, {"type": "Pay Stub", "description": "Most recent pay stub"}, {"type": "Proof of Address", "description": "Utility bill or lease"}, {"type": "Ontario Health Card", "description": "Provincial health insurance card"} ] }''' checklist_text = st.text_area( "Paste or edit your checklist JSON below:", value=sample_checklist, height=220, key="doc_checklist_json" ) try: checklist = json.loads(checklist_text) required_types = [doc["type"] for doc in checklist["required_documents"]] except Exception as e: st.error("Invalid checklist JSON.") st.stop() # Step 2: Document upload st.markdown("2 Upload Document(s) to Validate", unsafe_allow_html=True) uploaded_files = st.file_uploader( "Upload PDF, DOCX, XLSX, PNG, JPG, TIFF, etc.", type=["pdf", "docx", "xlsx", "xls", "png", "jpg", "jpeg", "tiff"], key="mortgage_files", accept_multiple_files=True ) # Step 3: Thresholds st.markdown("3 Configure Acceptance Thresholds", unsafe_allow_html=True) min_match_score = st.slider("Minimum Type Match Score (0-100)", 50, 100, 70, 1) min_confidence = st.slider("Minimum LLM Confidence (0-100)", 50, 100, 70, 1) with col_right: # Step 4: Agent instructions st.markdown("4 Instruct Agent", unsafe_allow_html=True) sample_instruction = """You are a careful, expert document validation agent for mortgage and finance workflows. Before you answer, do this: Carefully scan the document for ANY evidence of regional/provincial or country-specific card types (such as "Ontario Health Card", "Medicare Card", "Insurance Card", "SIN", "Driver's License", "Passport", etc.)—be as specific as possible using visible card titles, authority names, or issuer logos. Checklist for precision: - Prefer the **most specific** document type (e.g. "Ontario Health Card" over just "Identification Card" or "Provincial ID"). - If there is any ambiguity, include relevant keywords from the card (like "Health", "Medicare", "OHIP", "SIN", "Social Insurance", "Driver", etc.) in the output type. - If still not sure, show your best guess but include all possible hints from the document text.""" agent_instruction = st.text_area( "Instructions for the Document Validation Agent (edit as needed):", value=sample_instruction, height=240, key="agent_instruction" ) # Step 5: Current date st.markdown("5 Set Current Date for Expiry Validation", unsafe_allow_html=True) current_date = st.date_input( "Current date to be used by the agent for expiry checking", value=datetime.now().date(), key="current_date" ) date_str = str(current_date) # Step 6: Run button run_btn = st.button("Run Document Validation", type="primary") # ====== HELPER FUNCTIONS ====== def get_content_type(filename): mime, _ = mimetypes.guess_type(filename) ext = filename.lower().split('.')[-1] if ext == "pdf": return "text/plain" if mime is None: return "application/octet-stream" return mime def extract_text_from_unstract(uploaded_file, status_box=None): filename = getattr(uploaded_file, "name", "uploaded_file") file_bytes = uploaded_file.read() content_type = get_content_type(filename) headers = { "unstract-key": UNSTRACT_API_KEY, "Content-Type": content_type, } url = f"{UNSTRACT_BASE}/whisper" if status_box: status_box.info("Step 1: Uploading and extracting text (OCR)...") r = requests.post(url, headers=headers, data=file_bytes) if r.status_code != 202: if status_box: status_box.error(f"Unstract error: {r.status_code} - {r.text}") return None whisper_hash = r.json().get("whisper_hash") if not whisper_hash: if status_box: status_box.error("Unstract: No whisper_hash received.") return None # Poll status status_url = f"{UNSTRACT_BASE}/whisper-status?whisper_hash={whisper_hash}" for i in range(30): status_r = requests.get(status_url, headers={"unstract-key": UNSTRACT_API_KEY}) if status_r.status_code != 200: if status_box: status_box.error(f"Unstract status error: {status_r.status_code} - {status_r.text}") return None status = status_r.json().get("status") if status == "processed": break if status_box: status_box.info(f"EZOFIS AI OCR AGENT in progress... ({i+1}/30)") time.sleep(2) else: if status_box: status_box.error("Unstract: Timeout waiting for OCR.") return None retrieve_url = f"{UNSTRACT_BASE}/whisper-retrieve?whisper_hash={whisper_hash}&text_only=true" r = requests.get(retrieve_url, headers={"unstract-key": UNSTRACT_API_KEY}) if r.status_code != 200: if status_box: status_box.error(f"Unstract: Error retrieving text: {r.status_code} - {r.text}") return None try: data = r.json() return data.get("result_text") or r.text except Exception: return r.text def build_mistral_prompt(doc_text, checklist, agent_instruction, current_date): return f""" {agent_instruction} IMPORTANT: Today's date for validation is: {current_date}. You MUST use this exact date, NOT today's system date, when checking if a document is expired. Analyze the following extracted document text and the checklist JSON: {json.dumps(checklist)} Respond with this JSON (your response will be evaluated automatically): {{ "document_type": "...", // e.g. Ontario Health Card, BC Services Card "expiry_date": "...", // ISO format if possible "is_expired": true/false, // must be true if expiry_date is before {current_date} "looks_genuine": true/false, "confidence": , "checklist_matched": true/false, "verdict": "..." // One-sentence reason }} Document Text: {doc_text[:4000]} """.strip() def query_mistral_llm(doc_text, checklist, agent_instruction, current_date, status_box=None): prompt = build_mistral_prompt(doc_text, checklist, agent_instruction, current_date) headers = { "Authorization": f"Bearer {OPENROUTER_API_KEY}", "HTTP-Referer": "https://chat.openai.com", "X-Title": "EZOFIS-Doc-Validator", "Content-Type": "application/json", } data = { "model": MISTRAL_MODEL, "messages": [{"role": "user", "content": prompt}], "temperature": 0.1, "max_tokens": 1024 } if status_box: status_box.info("Step 2: Validating document with EZOFIS DOC VALIDATION AGENT...") resp = requests.post(OPENROUTER_URL, headers=headers, json=data, timeout=90) if resp.status_code != 200: if status_box: status_box.error(f"OpenRouter error: {resp.status_code}: {resp.text}") return None, None, prompt result = resp.json()["choices"][0]["message"]["content"] start = result.find("{") end = result.rfind("}") + 1 if start == -1 or end == 0: if status_box: status_box.error("Agent did not return JSON.") status_box.write(result) return None, result, prompt try: return json.loads(result[start:end]), result, prompt except Exception as e: if status_box: status_box.error("Error parsing LLM response.") status_box.write(result) return None, result, prompt def advanced_llm_verdict(llm_json, min_confidence, status_box=None): conf = llm_json.get("confidence", 0) if conf < min_confidence or conf >= min_confidence + 15: return None, None, None verdict_prompt = f""" Here is the extracted document information and prior validation result: {json.dumps(llm_json)} The minimum required confidence is {min_confidence}. Should this document be accepted or rejected for an application, based on all available information? Respond ONLY as: {{ "accepted": true/false, "reason": "..." }} """ headers = { "Authorization": f"Bearer {OPENROUTER_API_KEY}", "HTTP-Referer": "https://chat.openai.com", "X-Title": "EZOFIS-Doc-Validator", "Content-Type": "application/json", } data = { "model": MISTRAL_MODEL, "messages": [{"role": "user", "content": verdict_prompt}], "temperature": 0.1, "max_tokens": 256 } if status_box: status_box.info("Step 3: LLM self-verdict (gray zone confidence)...") resp = requests.post(OPENROUTER_URL, headers=headers, json=data, timeout=60) if resp.status_code == 200: try: content = resp.json()["choices"][0]["message"]["content"] vstart = content.find("{") vend = content.rfind("}") + 1 verdict_json = json.loads(content[vstart:vend]) return verdict_json, content, verdict_prompt except Exception: return None, content, verdict_prompt return None, None, verdict_prompt def fuzzy_match_type(detected_type, checklist_types): best_type = None best_score = 0 for t in checklist_types: score = fuzz.token_set_ratio(str(detected_type), str(t)) if score > best_score: best_type = t best_score = score return best_type, best_score # ====== CARD RENDERING FUNCTION ====== def show_validation_card(result): accepted = result["Accepted"] == "Yes" expired = result["Expired"] == "Yes" genuine = result["Genuine"] == "Yes" decision_color = "#d32f2f" if not accepted else "#388e3c" yes_color = "#388e3c" no_color = "#d32f2f" bg_reason = "#ffeaea" if not accepted else "#eafbe8" st.markdown(f"""
{result['File']}
Decision: {'Accepted' if accepted else 'Rejected'}
Confidence: {result['Confidence']}%
Reason:
{result['Reason']}
Detected Document: Matched with Checklist:
{result['Detected Type']} {result['Checklist Match']}
Genuine: Expired:
{"Yes" if genuine else "No"} {"Yes" if expired else "No"}
Expiry Date: {result["Expiry Date"]}
""", unsafe_allow_html=True) # ====== MAIN PROCESSING LOOP ====== if 'run_btn' not in locals(): run_btn = False if run_btn and uploaded_files: results = [] debug_data = [] with col_right: for uploaded_file in uploaded_files: st.markdown( f"
" f"Validating: {uploaded_file.name}" f"
", unsafe_allow_html=True ) status_box = st.empty() debug = {} # Step 1: OCR doc_text = extract_text_from_unstract(uploaded_file, status_box) debug['OCR_extracted_text'] = doc_text if not doc_text: status_box.error("Skipping due to OCR extraction error.") debug['error'] = "OCR extraction error" debug_data.append({uploaded_file.name: debug}) continue # Step 2: LLM Validation llm_json, llm_raw, llm_prompt = query_mistral_llm(doc_text, checklist, agent_instruction, date_str, status_box) debug['LLM_prompt'] = llm_prompt debug['LLM_raw_response'] = llm_raw debug['LLM_parsed_json'] = llm_json if not llm_json: status_box.error("Skipping due to LLM error.") debug['error'] = "LLM processing error" debug_data.append({uploaded_file.name: debug}) continue detected_type = llm_json.get("document_type", "") matched_type, match_score = fuzzy_match_type(detected_type, required_types) checklist_matched = llm_json.get("checklist_matched", False) if checklist_matched and match_score < min_match_score: checklist_matched = False llm_conf = llm_json.get("confidence", 0) # Robustly handle is_expired is_expired = llm_json.get("is_expired", False) if isinstance(is_expired, str): is_expired = is_expired.lower() == "true" accepted = ( checklist_matched and llm_json.get("looks_genuine", False) and not is_expired and (llm_conf >= min_confidence) ) reason = [] if not checklist_matched: reason.append("No matching checklist item found. Document rejected.") else: reason.append( f"Document type '{detected_type}' matched checklist '{matched_type}' with score {match_score}/100." ) if not llm_json.get("looks_genuine", False): reason.append("Document does not look genuine.") if is_expired: reason.append("Document is expired.") reason.append(f"Genuineness confidence: {llm_conf}.") reason.append(llm_json.get("verdict", "")) verdict_json, verdict_raw, verdict_prompt = advanced_llm_verdict(llm_json, min_confidence, status_box) debug['LLM_self_verdict_prompt'] = verdict_prompt debug['LLM_self_verdict_raw'] = verdict_raw debug['LLM_self_verdict_json'] = verdict_json if verdict_json: accepted = verdict_json.get("accepted", False) reason.append(f"LLM Self-verdict: {verdict_json.get('reason','')}") status_box.info("Final decision (gray zone) taken by LLM self-verdict.") results.append({ "File": uploaded_file.name, "Detected Type": detected_type, "Checklist Match": matched_type if checklist_matched else "-", "Type Score": match_score, "Expiry Date": llm_json.get("expiry_date", "-"), "Expired": "Yes" if is_expired else "No", "Genuine": "Yes" if llm_json.get("looks_genuine", False) else "No", "Confidence": llm_conf, "Accepted": "Yes" if accepted else "No", "Reason": " ".join(reason) }) debug['Checklist_match_details'] = { "detected_type": detected_type, "matched_type": matched_type, "match_score": match_score, "checklist_matched": checklist_matched, "accepted": accepted } debug_data.append({uploaded_file.name: debug}) status_box.success("Validation complete. See result below.") # ==== Card-style results ==== if results: st.success("All validations complete.") for result in results: show_validation_card(result) else: st.warning("No valid results.") with st.expander("Debug Panel (per document)"): for doc_debug in debug_data: for fname, dbg in doc_debug.items(): st.markdown(f"**{fname}**") st.json(dbg)