Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import requests | |
| import json | |
| import os | |
| import time | |
| import mimetypes | |
| from datetime import datetime | |
| from fuzzywuzzy import fuzz | |
# ====== CONFIG ======
# Service endpoints and model id; both API keys are read from the environment
# (they are None when the corresponding variable is not set).
UNSTRACT_BASE = "https://llmwhisperer-api.us-central.unstract.com/api/v2"
UNSTRACT_API_KEY = os.environ.get("UNSTRACT_API_KEY")
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY")
MISTRAL_MODEL = "mistralai/ministral-8b"

# Browser-tab title, also reused for the on-page heading.
_PAGE_TITLE = "EZOFIS Document Validation Agent"

# Styles for the numbered step badges and for every Streamlit button.
_PAGE_CSS = """
<style>
.step-num {
background: #A020F0; color: #fff; border-radius: 999px;
padding: 6px 16px; font-weight: 700; margin-right: 14px; font-size: 18px;
display: inline-block; vertical-align: middle;}
.stButton>button {
background: #A020F0 !important; color: white !important; border-radius: 12px !important;
padding: 10px 32px !important; font-weight: 700; border: none !important; font-size: 18px !important;
margin-top: 12px !important;
}
</style>
"""

st.set_page_config(page_title=_PAGE_TITLE, layout="wide")
st.markdown(_PAGE_CSS, unsafe_allow_html=True)
st.markdown(
    f"<h1 style='font-weight:800; margin-bottom:8px;'>{_PAGE_TITLE}</h1>",
    unsafe_allow_html=True,
)
# ====== UI LAYOUT ======
def _step_heading(number, title):
    """Render a numbered step badge followed by a bold step title."""
    st.markdown(
        f"<span class='step-num'>{number}</span> <b>{title}</b>",
        unsafe_allow_html=True,
    )


col_left, col_right = st.columns([1.35, 1.05])

with col_left:
    # Step 1: Checklist
    _step_heading(1, "Your Document Checklist (JSON)")
    sample_checklist = '''{
"required_documents": [
{"type": "Driver's License", "description": "Government-issued photo ID"},
{"type": "Passport", "description": "Valid passport"},
{"type": "SIN Card", "description": "Social Insurance Number document"},
{"type": "Bank Statement", "description": "Last 3 months bank statement"},
{"type": "Employment Letter", "description": "Signed letter from employer"},
{"type": "Pay Stub", "description": "Most recent pay stub"},
{"type": "Proof of Address", "description": "Utility bill or lease"},
{"type": "Ontario Health Card", "description": "Provincial health insurance card"}
]
}'''
    checklist_text = st.text_area(
        "Paste or edit your checklist JSON below:",
        value=sample_checklist,
        height=220,
        key="doc_checklist_json",
    )
    # The checklist must parse and expose a "type" on every required document;
    # any failure halts this script run after showing an error.
    try:
        checklist = json.loads(checklist_text)
        required_types = []
        for doc in checklist["required_documents"]:
            required_types.append(doc["type"])
    except Exception:
        st.error("Invalid checklist JSON.")
        st.stop()

    # Step 2: Document upload
    _step_heading(2, "Upload Document(s) to Validate")
    uploaded_files = st.file_uploader(
        "Upload PDF, DOCX, XLSX, PNG, JPG, TIFF, etc.",
        type=["pdf", "docx", "xlsx", "xls", "png", "jpg", "jpeg", "tiff"],
        key="mortgage_files",
        accept_multiple_files=True,
    )

    # Step 3: Thresholds
    _step_heading(3, "Configure Acceptance Thresholds")
    min_match_score = st.slider(
        "Minimum Type Match Score (0-100)",
        min_value=50,
        max_value=100,
        value=70,
        step=1,
    )
    min_confidence = st.slider(
        "Minimum LLM Confidence (0-100)",
        min_value=50,
        max_value=100,
        value=70,
        step=1,
    )

with col_right:
    # Step 4: Agent instructions
    _step_heading(4, "Instruct Agent")
    sample_instruction = """You are a careful, expert document validation agent for mortgage and finance workflows.
Before you answer, do this: Carefully scan the document for ANY evidence of regional/provincial or country-specific card types (such as "Ontario Health Card", "Medicare Card", "Insurance Card", "SIN", "Driver's License", "Passport", etc.)—be as specific as possible using visible card titles, authority names, or issuer logos.
Checklist for precision:
- Prefer the **most specific** document type (e.g. "Ontario Health Card" over just "Identification Card" or "Provincial ID").
- If there is any ambiguity, include relevant keywords from the card (like "Health", "Medicare", "OHIP", "SIN", "Social Insurance", "Driver", etc.) in the output type.
- If still not sure, show your best guess but include all possible hints from the document text."""
    agent_instruction = st.text_area(
        "Instructions for the Document Validation Agent (edit as needed):",
        value=sample_instruction,
        height=240,
        key="agent_instruction",
    )

    # Step 5: Current date
    _step_heading(5, "Set Current Date for Expiry Validation")
    current_date = st.date_input(
        "Current date to be used by the agent for expiry checking",
        value=datetime.now().date(),
        key="current_date",
    )
    # The prompt builder receives the date as plain text.
    date_str = str(current_date)

    # Step 6: Run button
    run_btn = st.button("Run Document Validation", type="primary")
| # ====== HELPER FUNCTIONS ====== | |
def get_content_type(filename):
    """Return the Content-Type header value to send when uploading *filename*.

    Names whose last dot-separated component is "pdf" (case-insensitive) map
    to "text/plain"; anything else uses the ``mimetypes`` guess, falling back
    to "application/octet-stream" when no type can be guessed.
    """
    guessed = mimetypes.guess_type(filename)[0]
    extension = filename.lower().rsplit('.', 1)[-1]
    # NOTE(review): PDFs are deliberately sent as text/plain rather than
    # application/pdf; presumably an upload-API workaround, confirm before changing.
    if extension == "pdf":
        return "text/plain"
    return "application/octet-stream" if guessed is None else guessed
def extract_text_from_unstract(uploaded_file, status_box=None):
    """Upload a file to the Unstract whisper API and return the extracted text.

    The flow is: POST the raw bytes to ``/whisper``, poll ``/whisper-status``
    up to 30 times (2 seconds apart) until the job reports ``processed``, then
    fetch the text from ``/whisper-retrieve``.

    Args:
        uploaded_file: File-like object with ``read()`` and, optionally, a
            ``name`` attribute used to choose the Content-Type header.
        status_box: Optional object with ``info``/``error`` methods used to
            report progress; no reporting happens when it is None.

    Returns:
        The extracted text, or None on any failure (network error, unexpected
        HTTP status, missing job id, failed job, or polling timeout).
    """
    # Every HTTP call gets a timeout so a stalled connection cannot hang the
    # whole script run indefinitely.
    request_timeout = 60

    def _fail(message):
        """Report *message* through the status box (if any) and yield None."""
        if status_box:
            status_box.error(message)
        return None

    filename = getattr(uploaded_file, "name", "uploaded_file")
    file_bytes = uploaded_file.read()
    auth_headers = {"unstract-key": UNSTRACT_API_KEY}
    upload_headers = {
        "unstract-key": UNSTRACT_API_KEY,
        "Content-Type": get_content_type(filename),
    }

    if status_box:
        status_box.info("Step 1: Uploading and extracting text (OCR)...")
    try:
        r = requests.post(
            f"{UNSTRACT_BASE}/whisper",
            headers=upload_headers,
            data=file_bytes,
            timeout=request_timeout,
        )
    except requests.RequestException as exc:
        return _fail(f"Unstract request failed: {exc}")
    if r.status_code != 202:
        return _fail(f"Unstract error: {r.status_code} - {r.text}")
    try:
        whisper_hash = r.json().get("whisper_hash")
    except (ValueError, AttributeError):
        # Body was not JSON, or not a JSON object.
        whisper_hash = None
    if not whisper_hash:
        return _fail("Unstract: No whisper_hash received.")

    # Poll status
    status_url = f"{UNSTRACT_BASE}/whisper-status?whisper_hash={whisper_hash}"
    for attempt in range(30):
        try:
            status_r = requests.get(status_url, headers=auth_headers, timeout=request_timeout)
        except requests.RequestException as exc:
            return _fail(f"Unstract status request failed: {exc}")
        if status_r.status_code != 200:
            return _fail(f"Unstract status error: {status_r.status_code} - {status_r.text}")
        try:
            status = status_r.json().get("status")
        except (ValueError, AttributeError):
            status = None
        if status == "processed":
            break
        # NOTE(review): "error" is assumed to be the API's terminal failure
        # status; confirm against the status endpoint documentation.
        if status == "error":
            return _fail(f"Unstract: OCR failed - {status_r.text}")
        if status_box:
            status_box.info(f"EZOFIS AI OCR AGENT in progress... ({attempt+1}/30)")
        time.sleep(2)
    else:
        # The loop ran out without ever seeing "processed".
        return _fail("Unstract: Timeout waiting for OCR.")

    retrieve_url = f"{UNSTRACT_BASE}/whisper-retrieve?whisper_hash={whisper_hash}&text_only=true"
    try:
        r = requests.get(retrieve_url, headers=auth_headers, timeout=request_timeout)
    except requests.RequestException as exc:
        return _fail(f"Unstract retrieve request failed: {exc}")
    if r.status_code != 200:
        return _fail(f"Unstract: Error retrieving text: {r.status_code} - {r.text}")

    # Prefer the "result_text" field of a JSON body; otherwise the body itself
    # is taken to be the text.
    try:
        return r.json().get("result_text") or r.text
    except (ValueError, AttributeError):
        return r.text
def build_mistral_prompt(doc_text, checklist, agent_instruction, current_date):
    """Assemble the validation prompt sent to the LLM.

    The prompt is, in order: the agent instruction, a notice fixing the date
    to use for expiry checks, the checklist serialized as JSON, the JSON
    response template, and the first 4000 characters of the document text.
    Leading and trailing whitespace of the assembled prompt is stripped.
    """
    prompt_lines = [
        f"{agent_instruction}",
        f"IMPORTANT: Today's date for validation is: {current_date}. You MUST use this exact date, NOT today's system date, when checking if a document is expired.",
        "Analyze the following extracted document text and the checklist JSON:",
        f"{json.dumps(checklist)}",
        "Respond with this JSON (your response will be evaluated automatically):",
        "{",
        '"document_type": "...", // e.g. Ontario Health Card, BC Services Card',
        '"expiry_date": "...", // ISO format if possible',
        f'"is_expired": true/false, // must be true if expiry_date is before {current_date}',
        '"looks_genuine": true/false,',
        '"confidence": <score 0-100>,',
        '"checklist_matched": true/false,',
        '"verdict": "..." // One-sentence reason',
        "}",
        "Document Text:",
        # Only the first 4000 characters of the document are included.
        f"{doc_text[:4000]}",
    ]
    return "\n".join(prompt_lines).strip()
def query_mistral_llm(doc_text, checklist, agent_instruction, current_date, status_box=None):
    """Ask the LLM to classify and validate a document.

    Args:
        doc_text: Extracted document text.
        checklist: Parsed checklist object, embedded in the prompt as JSON.
        agent_instruction: Instruction text placed at the top of the prompt.
        current_date: Date (as text) the model must use for expiry checks.
        status_box: Optional object with ``info``/``error``/``write`` methods
            used for progress and error reporting.

    Returns:
        A ``(parsed_json, raw_response, prompt)`` tuple. ``parsed_json`` is
        None whenever the request fails, the response has an unexpected shape,
        or no JSON object can be parsed from the reply; ``raw_response`` is
        None when no response body was obtained.
    """
    prompt = build_mistral_prompt(doc_text, checklist, agent_instruction, current_date)
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "HTTP-Referer": "https://chat.openai.com",
        "X-Title": "EZOFIS-Doc-Validator",
        "Content-Type": "application/json",
    }
    payload = {
        "model": MISTRAL_MODEL,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.1,
        "max_tokens": 1024,
    }
    if status_box:
        status_box.info("Step 2: Validating document with EZOFIS DOC VALIDATION AGENT...")

    # Timeouts and connection failures must not crash the app; report them
    # and let the caller skip this document.
    try:
        resp = requests.post(OPENROUTER_URL, headers=headers, json=payload, timeout=90)
    except requests.RequestException as exc:
        if status_box:
            status_box.error(f"OpenRouter request failed: {exc}")
        return None, None, prompt
    if resp.status_code != 200:
        if status_box:
            status_box.error(f"OpenRouter error: {resp.status_code}: {resp.text}")
        return None, None, prompt

    # A 200 response can still lack the expected fields (or carry null
    # content), so the lookup is guarded rather than assumed.
    try:
        result = resp.json()["choices"][0]["message"]["content"]
    except (ValueError, KeyError, IndexError, TypeError):
        result = None
    if not isinstance(result, str):
        if status_box:
            status_box.error("OpenRouter returned an unexpected response.")
            status_box.write(resp.text)
        return None, resp.text, prompt

    # The reply may wrap the JSON in prose; take the outermost {...} span.
    start = result.find("{")
    end = result.rfind("}") + 1
    if start == -1 or end == 0:
        if status_box:
            status_box.error("Agent did not return JSON.")
            status_box.write(result)
        return None, result, prompt
    try:
        return json.loads(result[start:end]), result, prompt
    except ValueError:
        if status_box:
            status_box.error("Error parsing LLM response.")
            status_box.write(result)
        return None, result, prompt
def advanced_llm_verdict(llm_json, min_confidence, status_box=None):
    """Ask the LLM for an accept/reject verdict on gray-zone results.

    The second opinion is requested only when the reported confidence lies in
    ``[min_confidence, min_confidence + 15)``. Outside that band no request is
    made.

    Args:
        llm_json: Parsed result of the first validation pass; its
            "confidence" entry may be a number or a numeric string.
        min_confidence: Acceptance threshold chosen by the user.
        status_box: Optional object with an ``info`` method for progress.

    Returns:
        ``(verdict_json, raw_content, prompt)``. All three are None when the
        confidence is outside the gray zone. ``verdict_json`` is None when the
        request fails or the reply cannot be parsed into a JSON object;
        ``raw_content`` is None when no reply text was obtained.
    """
    # LLM output is not type-safe: tolerate "85", None, or garbage instead of
    # raising on the numeric comparison below.
    try:
        conf = float(llm_json.get("confidence", 0))
    except (TypeError, ValueError):
        conf = 0.0
    if not (min_confidence <= conf < min_confidence + 15):
        return None, None, None

    verdict_prompt = (
        "\n"
        "Here is the extracted document information and prior validation result:\n"
        f"{json.dumps(llm_json)}\n"
        f"The minimum required confidence is {min_confidence}. Should this document be accepted or rejected for an application, based on all available information?\n"
        'Respond ONLY as: { "accepted": true/false, "reason": "..." }\n'
    )
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "HTTP-Referer": "https://chat.openai.com",
        "X-Title": "EZOFIS-Doc-Validator",
        "Content-Type": "application/json",
    }
    payload = {
        "model": MISTRAL_MODEL,
        "messages": [{"role": "user", "content": verdict_prompt}],
        "temperature": 0.1,
        "max_tokens": 256,
    }
    if status_box:
        status_box.info("Step 3: LLM self-verdict (gray zone confidence)...")

    # This step is best-effort: a failed request simply yields no verdict.
    try:
        resp = requests.post(OPENROUTER_URL, headers=headers, json=payload, timeout=60)
    except requests.RequestException:
        return None, None, verdict_prompt
    if resp.status_code != 200:
        return None, None, verdict_prompt

    # Bound before the try so the failure path can always return it.
    content = None
    try:
        content = resp.json()["choices"][0]["message"]["content"]
        vstart = content.find("{")
        vend = content.rfind("}") + 1
        verdict_json = json.loads(content[vstart:vend])
    except (ValueError, KeyError, IndexError, TypeError, AttributeError):
        return None, content, verdict_prompt
    if not isinstance(verdict_json, dict):
        return None, content, verdict_prompt
    return verdict_json, content, verdict_prompt
def fuzzy_match_type(detected_type, checklist_types):
    """Find the checklist type most similar to *detected_type*.

    Each candidate is scored with ``fuzz.token_set_ratio`` on the string forms
    of both values. Returns ``(best_type, best_score)``; the earliest
    candidate wins ties, and ``(None, 0)`` is returned when there are no
    candidates or none scores above zero.
    """
    scored = [
        (fuzz.token_set_ratio(str(detected_type), str(candidate)), candidate)
        for candidate in checklist_types
    ]
    # max() keeps the first of several equal maxima, matching a strict ">" scan.
    top_score, top_type = max(scored, key=lambda pair: pair[0], default=(0, None))
    if top_score > 0:
        return top_type, top_score
    return None, 0
| # ====== CARD RENDERING FUNCTION ====== | |
def show_validation_card(result):
    """Render one validation result as a styled HTML card.

    Args:
        result: Mapping with the keys "File", "Accepted", "Expired",
            "Genuine", "Confidence", "Reason", "Detected Type",
            "Checklist Match" and "Expiry Date". The three flag entries are
            compared against the string "Yes".

    The file name and the LLM-derived text fields are HTML-escaped before
    being embedded, because the card is rendered with
    ``unsafe_allow_html=True`` and those values are not trusted.
    """
    import html  # stdlib; imported here so the function is self-contained

    def esc(value):
        """Return *value* as text that is safe to embed in HTML."""
        return html.escape(str(value))

    accepted = result["Accepted"] == "Yes"
    expired = result["Expired"] == "Yes"
    genuine = result["Genuine"] == "Yes"

    yes_color = "#388e3c"
    no_color = "#d32f2f"
    decision_color = "#d32f2f" if not accepted else "#388e3c"
    bg_reason = "#ffeaea" if not accepted else "#eafbe8"
    type_color = yes_color if accepted else '#222'
    genuine_color = yes_color if genuine else no_color
    expired_color = yes_color if not expired else no_color

    decision_label = 'Accepted' if accepted else 'Rejected'
    genuine_label = "Yes" if genuine else "No"
    expired_label = "Yes" if expired else "No"

    # Untrusted values: escape everything that came from the upload or the LLM.
    file_label = esc(result['File'])
    confidence_label = esc(result['Confidence'])
    reason_label = esc(result['Reason'])
    detected_label = esc(result['Detected Type'])
    match_label = esc(result['Checklist Match'])
    expiry_label = esc(result["Expiry Date"])

    st.markdown(f"""
<div style="border-radius:16px;border:2px solid #A020F0; margin-bottom:32px; background:#f9f7ff;padding:18px 22px 22px 22px;box-shadow:0 3px 16px #0001;">
<div style="font-size:14px;font-weight:600;letter-spacing:0.3px;margin-bottom:10px;color:#333;">
{file_label}
</div>
<table style="width:100%;border:none;margin-bottom:12px;">
<tr>
<td style="width:40%;font-size:17px;font-weight:700;">Decision:</td>
<td style="width:60%;font-size:17px;font-weight:700;color:{decision_color};">{decision_label}</td>
</tr>
<tr>
<td style="font-size:17px;font-weight:700;">Confidence:</td>
<td style="font-size:17px;">{confidence_label}%</td>
</tr>
</table>
<div style="border-radius:8px;background:{bg_reason};padding:11px 14px 11px 14px;color:#720000;font-size:15.5px;margin-bottom:17px;">
<span style="font-weight:bold;">Reason:</span><br>{reason_label}
</div>
<table style="width:100%;margin-top:10px;margin-bottom:5px;">
<tr>
<td style="font-weight:600;font-size:15px;border-bottom:1px solid #ddd;padding-bottom:3px;">Detected Document:</td>
<td style="font-weight:600;font-size:15px;border-bottom:1px solid #ddd;padding-bottom:3px;">Matched with Checklist:</td>
</tr>
<tr>
<td style="color:{type_color};font-weight:600;font-size:15px;">{detected_label}</td>
<td style="color:{type_color};font-weight:600;font-size:15px;">{match_label}</td>
</tr>
<tr>
<td style="font-weight:600;font-size:15px;border-bottom:1px solid #ddd;padding-bottom:3px;">Genuine:</td>
<td style="font-weight:600;font-size:15px;border-bottom:1px solid #ddd;padding-bottom:3px;">Expired:</td>
</tr>
<tr>
<td style="color:{genuine_color};font-weight:600;font-size:15px;">{genuine_label}</td>
<td style="color:{expired_color};font-weight:600;font-size:15px;">{expired_label}</td>
</tr>
</table>
<div style="color:#555;font-size:14px;margin-top:7px;">
<b>Expiry Date:</b> {expiry_label}
</div>
</div>
""", unsafe_allow_html=True)
# ====== MAIN PROCESSING LOOP ======
import html  # stdlib; used below to escape the uploaded file name inside HTML


def _as_bool(value):
    """Interpret a JSON flag that the LLM may have returned as a string."""
    if isinstance(value, str):
        return value.strip().lower() == "true"
    return bool(value)


def _as_number(value, default=0):
    """Return *value* as a number, or *default* when it is not numeric."""
    if isinstance(value, bool):
        return default
    if isinstance(value, (int, float)):
        return value
    try:
        number = float(str(value).strip())
    except ValueError:
        return default
    # Keep whole numbers as ints so they display as "85", not "85.0".
    return int(number) if number.is_integer() else number


# For every uploaded file: OCR -> LLM validation -> checklist matching ->
# optional gray-zone self-verdict, then render one card per processed file.
if run_btn and uploaded_files:
    results = []
    debug_data = []
    # NOTE(review): indentation was reconstructed; the progress lines, result
    # cards and debug panel are assumed to render inside the right column.
    with col_right:
        for uploaded_file in uploaded_files:
            st.markdown(
                f"<div style='font-size:15.5px;font-weight:500;color:#424242;margin:14px 0 2px 0;'>"
                f"Validating: <span style='color:#A020F0'>{html.escape(str(uploaded_file.name))}</span>"
                f"</div>",
                unsafe_allow_html=True
            )
            status_box = st.empty()
            debug = {}

            # Step 1: OCR
            doc_text = extract_text_from_unstract(uploaded_file, status_box)
            debug['OCR_extracted_text'] = doc_text
            if not doc_text:
                status_box.error("Skipping due to OCR extraction error.")
                debug['error'] = "OCR extraction error"
                debug_data.append({uploaded_file.name: debug})
                continue

            # Step 2: LLM Validation
            llm_json, llm_raw, llm_prompt = query_mistral_llm(
                doc_text, checklist, agent_instruction, date_str, status_box
            )
            debug['LLM_prompt'] = llm_prompt
            debug['LLM_raw_response'] = llm_raw
            debug['LLM_parsed_json'] = llm_json
            if not llm_json:
                status_box.error("Skipping due to LLM error.")
                debug['error'] = "LLM processing error"
                debug_data.append({uploaded_file.name: debug})
                continue

            detected_type = llm_json.get("document_type", "")
            matched_type, match_score = fuzzy_match_type(detected_type, required_types)

            # Every flag and the confidence go through the same coercion, so a
            # string such as "false" or "85" cannot be misread as truthy or
            # crash the numeric comparison.
            checklist_matched = _as_bool(llm_json.get("checklist_matched", False))
            if checklist_matched and match_score < min_match_score:
                # The model claimed a match but the fuzzy score disagrees.
                checklist_matched = False
            looks_genuine = _as_bool(llm_json.get("looks_genuine", False))
            is_expired = _as_bool(llm_json.get("is_expired", False))
            llm_conf = _as_number(llm_json.get("confidence", 0))

            accepted = (
                checklist_matched and
                looks_genuine and
                not is_expired and
                (llm_conf >= min_confidence)
            )

            reason = []
            if not checklist_matched:
                reason.append("No matching checklist item found. Document rejected.")
            else:
                reason.append(
                    f"Document type '{detected_type}' matched checklist '{matched_type}' with score {match_score}/100."
                )
            if not looks_genuine:
                reason.append("Document does not look genuine.")
            if is_expired:
                reason.append("Document is expired.")
            reason.append(f"Genuineness confidence: {llm_conf}.")
            reason.append(str(llm_json.get("verdict", "")))

            verdict_json, verdict_raw, verdict_prompt = advanced_llm_verdict(llm_json, min_confidence, status_box)
            debug['LLM_self_verdict_prompt'] = verdict_prompt
            debug['LLM_self_verdict_raw'] = verdict_raw
            debug['LLM_self_verdict_json'] = verdict_json
            if verdict_json:
                # NOTE(review): the self-verdict replaces the rule-based
                # decision outright, so it can accept a document the rules
                # rejected (e.g. expired). Confirm this is intended.
                accepted = _as_bool(verdict_json.get("accepted", False))
                reason.append(f"LLM Self-verdict: {verdict_json.get('reason','')}")
                status_box.info("Final decision (gray zone) taken by LLM self-verdict.")

            results.append({
                "File": uploaded_file.name,
                "Detected Type": detected_type,
                "Checklist Match": matched_type if checklist_matched else "-",
                "Type Score": match_score,
                "Expiry Date": llm_json.get("expiry_date", "-"),
                "Expired": "Yes" if is_expired else "No",
                "Genuine": "Yes" if looks_genuine else "No",
                "Confidence": llm_conf,
                "Accepted": "Yes" if accepted else "No",
                "Reason": " ".join(reason)
            })
            debug['Checklist_match_details'] = {
                "detected_type": detected_type,
                "matched_type": matched_type,
                "match_score": match_score,
                "checklist_matched": checklist_matched,
                "accepted": accepted
            }
            debug_data.append({uploaded_file.name: debug})
            status_box.success("Validation complete. See result below.")

        # ==== Card-style results ====
        if results:
            st.success("All validations complete.")
            for result in results:
                show_validation_card(result)
        else:
            st.warning("No valid results.")

        with st.expander("Debug Panel (per document)"):
            for doc_debug in debug_data:
                for fname, dbg in doc_debug.items():
                    st.markdown(f"**{fname}**")
                    st.json(dbg)