""" ╔══════════════════════════════════════════════════════════════════╗ ║ CSM DUAL-CARD ID OCR SYSTEM — ARCHITECTURE NOTE ║ ╠══════════════════════════════════════════════════════════════════╣ ║ MODEL TASKS (8B VLM): ║ ║ Step 1 → Raw OCR: All text, original script, no translate ║ ║ Step 2 → Doc classify + non-English gap fill only ║ ║ PYTHON TASKS (Authoritative): ║ ║ MRZ parse+verify | Numeral convert | Calendar convert ║ ║ English label extract | Script separate | Cross verify ║ ╚══════════════════════════════════════════════════════════════════╝ """ import os import uuid import time import re import datetime from threading import Thread from typing import Iterable, Dict, Any import gradio as gr import spaces import torch from PIL import Image os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1" os.environ["HF_HOME"] = "/tmp/hf_home" from transformers import ( AutoProcessor, AutoModelForImageTextToText, # Universal VLM loader — Qwen2VL + Qwen3VL dono TextIteratorStreamer, BitsAndBytesConfig, ) # Specific class imports — graceful fallback try: from transformers import Qwen3VLForConditionalGeneration QWEN3_AVAILABLE = True print("✅ Qwen3VLForConditionalGeneration available") except ImportError: QWEN3_AVAILABLE = False print("⚠️ Qwen3VL direct import not available — using AutoModel fallback") try: from transformers import Qwen2VLForConditionalGeneration QWEN2_AVAILABLE = True except ImportError: QWEN2_AVAILABLE = False try: from transformers import Qwen2_5_VLForConditionalGeneration QWEN25_AVAILABLE = True except ImportError: QWEN25_AVAILABLE = False try: from peft import PeftModel, PeftConfig PEFT_AVAILABLE = True print("✅ PEFT available") except ImportError: PEFT_AVAILABLE = False print("⚠️ PEFT not available") from gradio.themes import Soft from gradio.themes.utils import colors, fonts, sizes # ===== THEME ===== colors.steel_blue = colors.Color( name="steel_blue", c50="#EBF3F8", c100="#D3E5F0", c200="#A8CCE1", c300="#7DB3D2", c400="#529AC3", c500="#4682B4", c600="#3E72A0", c700="#36638C", c800="#2E5378", c900="#264364", c950="#1E3450", ) class SteelBlueTheme(Soft): def __init__(self, *, primary_hue=colors.gray, secondary_hue=colors.steel_blue, neutral_hue=colors.slate, text_size=sizes.text_lg, font=(fonts.GoogleFont("Outfit"), "Arial", "sans-serif"), font_mono=(fonts.GoogleFont("IBM Plex Mono"), "ui-monospace", "monospace")): super().__init__(primary_hue=primary_hue, secondary_hue=secondary_hue, neutral_hue=neutral_hue, text_size=text_size, font=font, font_mono=font_mono) super().set( background_fill_primary="*primary_50", background_fill_primary_dark="*primary_900", body_background_fill="linear-gradient(135deg, *primary_200, *primary_100)", body_background_fill_dark="linear-gradient(135deg, *primary_900, *primary_800)", button_primary_text_color="white", button_primary_background_fill="linear-gradient(90deg, *secondary_500, *secondary_600)", button_primary_background_fill_hover="linear-gradient(90deg, *secondary_600, *secondary_700)", button_secondary_text_color="black", button_secondary_background_fill="linear-gradient(90deg, *primary_300, *primary_300)", button_secondary_background_fill_hover="linear-gradient(90deg, *primary_400, *primary_400)", slider_color="*secondary_500", block_title_text_weight="600", block_border_width="3px", block_shadow="*shadow_drop_lg", button_primary_shadow="*shadow_drop_lg", button_large_padding="11px", color_accent_soft="*primary_100", block_label_background_fill="*primary_200", ) steel_blue_theme = SteelBlueTheme() css = """ #main-title h1 { font-size: 2.3em !important; } #output-title h2 { font-size: 2.2em !important; } .ra-wrap{ width: fit-content; } .ra-inner{ position: relative; display: inline-flex; align-items: center; gap: 0; padding: 6px; background: var(--neutral-200); border-radius: 9999px; overflow: hidden; } .ra-input{ display: none; } .ra-label{ position: relative; z-index: 2; padding: 8px 16px; font-family: inherit; font-size: 14px; font-weight: 600; color: var(--neutral-500); cursor: pointer; transition: color 0.2s; white-space: nowrap; } .ra-highlight{ position: absolute; z-index: 1; top: 6px; left: 6px; height: calc(100% - 12px); border-radius: 9999px; background: white; box-shadow: 0 2px 4px rgba(0,0,0,0.1); transition: transform 0.2s, width 0.2s; } .ra-input:checked + .ra-label{ color: black; } .dark .ra-inner { background: var(--neutral-800); } .dark .ra-label { color: var(--neutral-400); } .dark .ra-highlight { background: var(--neutral-600); } .dark .ra-input:checked + .ra-label { color: white; } #gpu-duration-container { padding: 10px; border-radius: 8px; background: var(--background-fill-secondary); border: 1px solid var(--border-color-primary); margin-top: 10px; } """ MAX_MAX_NEW_TOKENS = 4096 DEFAULT_MAX_NEW_TOKENS = 1024 MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096")) device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") print("CUDA available:", torch.cuda.is_available()) if torch.cuda.is_available(): print("Device:", torch.cuda.get_device_name(0)) print("Using:", device) # ╔══════════════════════════════════════════╗ # ║ UNIVERSAL PROMPTS ║ # ╚══════════════════════════════════════════╝ STEP1_EXTRACT_PROMPT = """You are a universal OCR engine. Transcribe ALL visible text from this document image. OUTPUT FORMAT — fill exactly as shown: PHOTO_PRESENT: yes/no PHOTO_LOCATION: [describe position: top-left / top-right / center-left / not found] SIGNATURE_PRESENT: yes/no SIGNATURE_LOCATION: [describe position: bottom-left / bottom-right / not found] MRZ_PRESENT: yes/no DETECTED_LANGUAGE: [list all languages visible e.g. Arabic+English, Farsi+English, Hindi+English, Chinese, English] ---TEXT_START--- [Every word, number, symbol, label and value visible — line by line] [Original script preserved: Arabic, Farsi, Hindi, Chinese, Cyrillic etc. — DO NOT translate here] [Copy label AND its value together: e.g. "DATE OF BIRTH 12/05/2003"] [MRZ lines: copy character-perfect including ALL < symbols] [Include corner text, watermarks, small print] ---TEXT_END--- ABSOLUTE RULES: - NEVER output pixel coordinates like (50,68) or bounding boxes — plain text ONLY - DO NOT translate in this step — original script as-is - DO NOT skip or summarize any field - Copy every character exactly including < symbols in MRZ""" STEP2_TEMPLATE = """You are a universal KYC document analyst. The Python pipeline has already extracted English fields and parsed MRZ. Your job is ONLY: classify document + fill gaps from non-English text. ━━━ ALREADY EXTRACTED BY PYTHON (DO NOT RE-EXTRACT) ━━━ English Fields Found Directly on Card: {python_fields_table} MRZ Python Parse Result: {mrz_summary} ━━━ YOUR INPUT DATA ━━━ English text block from card: {english_block} Non-English original script block: {original_block} ━━━ YOUR TASKS — ONLY THESE 3 ━━━ TASK 1: Identify document type and issuing info - Read English block and original block - Keywords: PASSPORT/RESIDENT CARD/NATIONAL ID/DRIVING LICENCE/بطاقة/جواز/رخصة/आधार/PAN - Top of card = issuing country/institution (NOT person name) TASK 2: Classify non-English labels → check if already in English fields above - If نام (Farsi: Name) value already in Python English fields → SKIP - If شماره ملی (National Number) already in Python fields → SKIP - Only add fields GENUINELY missing from Python extraction TASK 3: Transliterate non-English values NOT found in English block - Example: محمد → Mohammad | چراغی → Cheraghi - Dates in Shamsi/Hijri: write BOTH original AND note calendar type (DO NOT convert — Python handles conversion) RULES: - NEVER copy template placeholders like [fill here] or [value] - NEVER re-state what Python already found - NEVER guess values not visible in card - If all fields already covered → write "✅ All fields covered by Python extraction" ━━━ OUTPUT FORMAT ━━━ --- ## 📋 Document Classification | | | |---|---| | **Document Type** | | | **Issuing Country** | | | **Issuing Authority** | | --- ## ➕ Additional Fields (non-English only — genuinely new) | Label (Original) | Label (English) | Value (Original) | Value (Transliterated) | |---|---|---|---| | [only if not in Python fields above] | | | | --- ## 🗓️ Calendar Note (if non-Gregorian dates found) | Original Date | Calendar System | Note | |---|---|---| | [date as on card] | [Solar Hijri / Lunar Hijri / Buddhist] | Python will convert | ---""" def load_vl_model(model_id: str, quantization_config=None, pre_quantized: bool = False): """ Universal VLM loader — Qwen2VL / Qwen3VL / any VLM pre_quantized=True → model already has weights quantized, no extra config needed pre_quantized=False → apply quantization_config during load """ load_kwargs = { "torch_dtype": "auto", "device_map": "auto", "trust_remote_code": True, } if quantization_config is not None and not pre_quantized: load_kwargs["quantization_config"] = quantization_config # Try 1: Qwen3VL (newest) if QWEN3_AVAILABLE: try: return Qwen3VLForConditionalGeneration.from_pretrained( model_id, **load_kwargs).eval() except Exception as e: print(f" Qwen3VL failed: {e}, trying AutoModel...") # Try 2: AutoModelForImageTextToText (universal fallback) try: return AutoModelForImageTextToText.from_pretrained( model_id, **load_kwargs).eval() except Exception as e: print(f" AutoModel failed: {e}, trying Qwen2VL...") # Try 3: Qwen2VL last resort if QWEN2_AVAILABLE: return Qwen2VLForConditionalGeneration.from_pretrained( model_id, **load_kwargs).eval() raise RuntimeError(f"No compatible loader found for {model_id}") # ╔══════════════════════════════════════════╗ # ║ MODEL LOADING ║ # ╚══════════════════════════════════════════╝ print("\n" + "="*70) print("🚀 LOADING 4 MODELS") print("="*70) # 4-bit BitsAndBytes config (shared for quantized models) bnb_4bit_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.float16, bnb_4bit_use_double_quant=True, ) # ── Model 1: Chhagan_ML-VL-OCR-v1 (LoRA on Qwen2VL base) ── print("\n1️⃣ Chhagan_ML-VL-OCR-v1 (LoRA Refined)...") MODEL_ID_C1 = "Chhagan005/Chhagan_ML-VL-OCR-v1" CHHAGAN_V1_AVAILABLE = False processor_c1 = model_c1 = None if PEFT_AVAILABLE: try: config = PeftConfig.from_pretrained(MODEL_ID_C1) base_id = config.base_model_name_or_path processor_c1 = AutoProcessor.from_pretrained(base_id, trust_remote_code=True) base_c1 = load_vl_model(base_id) model_c1 = PeftModel.from_pretrained(base_c1, MODEL_ID_C1).to(device).eval() print(" ✅ Loaded!") CHHAGAN_V1_AVAILABLE = True except Exception as e: print(f" ❌ Failed: {e}") else: print(" ⚠️ PEFT not available") # ── Model 2: Chhagan-DocVL-Qwen3 (LoRA on Qwen3VL base) ── print("\n2️⃣ Chhagan-DocVL-Qwen3 (Qwen3-VL Refined)...") MODEL_ID_C2 = "Chhagan005/Chhagan-DocVL-Qwen3" CHHAGAN_QWEN3_AVAILABLE = False processor_c2 = model_c2 = None if PEFT_AVAILABLE: try: config = PeftConfig.from_pretrained(MODEL_ID_C2) base_id = config.base_model_name_or_path processor_c2 = AutoProcessor.from_pretrained(base_id, trust_remote_code=True) base_c2 = load_vl_model(base_id) model_c2 = PeftModel.from_pretrained(base_c2, MODEL_ID_C2).to(device).eval() print(" ✅ Loaded!") CHHAGAN_QWEN3_AVAILABLE = True except Exception as e: print(f" ❌ Failed: {e}") else: print(" ⚠️ PEFT not available") # ── Model 3: CSM-DocExtract-VL-Q4KM (Full Qwen3VL, pre-quantized) ── print("\n3️⃣ CSM-DocExtract-VL-Q4KM (Full Qwen3VL, pre-quantized BNB)...") MODEL_ID_Q4KM = "Chhagan005/CSM-DocExtract-VL-Q4KM" CSM_Q4KM_AVAILABLE = False processor_q4km = model_q4km = None try: processor_q4km = AutoProcessor.from_pretrained( MODEL_ID_Q4KM, trust_remote_code=True ) # Pre-quantized safetensors → torch_dtype=auto, NO extra quantization_config model_q4km = Qwen3VLForConditionalGeneration.from_pretrained( MODEL_ID_Q4KM, torch_dtype="auto", device_map="auto", trust_remote_code=True, ).eval() print(" ✅ Loaded! (Qwen3VL pre-quantized BNB ~6.4GB)") CSM_Q4KM_AVAILABLE = True except Exception as e: try: model_q4km = AutoModelForImageTextToText.from_pretrained( MODEL_ID_Q4KM, torch_dtype="auto", device_map="auto", trust_remote_code=True, ).eval() print(" ✅ Loaded! (AutoModel fallback)") CSM_Q4KM_AVAILABLE = True except Exception as e2: print(f" ❌ Failed: {e2}") # ── Model 4: CSM-DocExtract-VL (Full Qwen3VL, BNB INT4 trained) ── print("\n4️⃣ CSM-DocExtract-VL 4BNB (Full Qwen3VL, BNB INT4 trained)...") MODEL_ID_4BNB = "Chhagan005/CSM-DocExtract-VL" CSM_4BNB_AVAILABLE = False processor_4bnb = model_4bnb = None system_prompt_4bnb = "You are a helpful assistant." # default try: # Read custom system_prompt.txt — this model was trained with it try: from huggingface_hub import hf_hub_download sp_path = hf_hub_download(repo_id=MODEL_ID_4BNB, filename="system_prompt.txt") with open(sp_path, "r", encoding="utf-8") as f: system_prompt_4bnb = f.read().strip() print(f" 📋 system_prompt.txt loaded: {system_prompt_4bnb[:80]}...") except Exception as sp_err: print(f" ⚠️ system_prompt.txt not loaded: {sp_err} — using default") processor_4bnb = AutoProcessor.from_pretrained( MODEL_ID_4BNB, trust_remote_code=True ) # BNB INT4 trained safetensors → torch_dtype=auto, NO extra quantization_config # (ignore .gguf files — those are for llama.cpp, not transformers) model_4bnb = Qwen3VLForConditionalGeneration.from_pretrained( MODEL_ID_4BNB, torch_dtype="auto", device_map="auto", trust_remote_code=True, ignore_mismatched_sizes=True, # GGUF files present — ignore safely ).eval() print(" ✅ Loaded! (Qwen3VL BNB INT4 trained ~6.4GB)") CSM_4BNB_AVAILABLE = True except Exception as e: try: model_4bnb = AutoModelForImageTextToText.from_pretrained( MODEL_ID_4BNB, torch_dtype="auto", device_map="auto", trust_remote_code=True, ).eval() print(" ✅ Loaded! (AutoModel fallback)") CSM_4BNB_AVAILABLE = True except Exception as e2: print(f" ❌ Failed: {e2}") print("\n" + "="*70) print("📊 MODEL STATUS") print("="*70) status = [ ("Chhagan_ML-VL-OCR-v1", CHHAGAN_V1_AVAILABLE, "LoRA Fine-tuned"), ("Chhagan-DocVL-Qwen3", CHHAGAN_QWEN3_AVAILABLE, "Qwen3-VL Fine-tuned"), ("CSM-DocExtract-Q4KM", CSM_Q4KM_AVAILABLE, "Qwen3VL Q4KM pre-quantized"), ("CSM-DocExtract-4BNB", CSM_4BNB_AVAILABLE, "Qwen3VL BitsAndBytes 4-bit"), ] for name, ok, note in status: print(f" {'✅' if ok else '❌'} {name:<35} {note}") print("="*70) loaded = sum(x[1] for x in status) print(f" Total loaded: {loaded}/4\n") # ╔══════════════════════════════════════════╗ # ║ PYTHON PIPELINE FUNCTIONS ║ # ╚══════════════════════════════════════════╝ def convert_eastern_numerals(text: str) -> str: """P2: Convert Persian/Arabic/Devanagari numerals to Western 0-9""" tables = [ str.maketrans('۰۱۲۳۴۵۶۷۸۹', '0123456789'), # Persian str.maketrans('٠١٢٣٤٥٦٧٨٩', '0123456789'), # Arabic str.maketrans('०१२३४५६७८९', '0123456789'), # Devanagari str.maketrans('০১২৩৪৫৬৭৮৯', '0123456789'), # Bengali str.maketrans('੦੧੨੩੪੫੬੭੮੯', '0123456789'), # Gurmukhi ] for table in tables: text = text.translate(table) return text def detect_calendar_system(raw_text: str) -> str: """Detect calendar system from country/language context""" text_upper = raw_text.upper() if any(kw in raw_text for kw in ['جمهوری اسلامی ایران', 'IRAN', 'AFGHANISTAN', 'افغانستان']): return 'solar_hijri' if any(kw in text_upper for kw in ['SAUDI', 'ARABIA', 'السعودية', 'KUWAIT', 'QATAR', 'BAHRAIN', 'JORDAN']): return 'lunar_hijri' return 'gregorian' def convert_shamsi_to_gregorian(shamsi_date: str) -> str: """P3: Solar Hijri (Shamsi) → Gregorian using khayyam library""" try: import khayyam parts = re.split(r'[/\-\.]', shamsi_date.strip()) if len(parts) == 3: y, m, d = int(parts[0]), int(parts[1]), int(parts[2]) jd = khayyam.JalaliDate(y, m, d) greg = jd.todate() return f"{greg.day:02d}/{greg.month:02d}/{greg.year}" except ImportError: # Approximate manual conversion if khayyam not installed try: parts = re.split(r'[/\-\.]', shamsi_date.strip()) y, m, d = int(parts[0]), int(parts[1]), int(parts[2]) greg_year = y + 621 return f"{d:02d}/{m:02d}/{greg_year} (approx)" except: pass except Exception: pass return f"{shamsi_date} (Shamsi)" def convert_hijri_to_gregorian(hijri_date: str) -> str: """P3: Lunar Hijri → Gregorian using hijri library""" try: from hijri_converter import convert parts = re.split(r'[/\-\.]', hijri_date.strip()) if len(parts) == 3: y, m, d = int(parts[0]), int(parts[1]), int(parts[2]) greg = convert.Hijri(y, m, d).to_gregorian() return f"{greg.day:02d}/{greg.month:02d}/{greg.year}" except ImportError: try: parts = re.split(r'[/\-\.]', hijri_date.strip()) y, m, d = int(parts[0]), int(parts[1]), int(parts[2]) greg_year = y - 43 + 622 return f"{d:02d}/{m:02d}/{greg_year} (approx)" except: pass except: pass return f"{hijri_date} (Hijri)" def separate_scripts(raw_text: str) -> tuple: """P5: Separate English/Latin lines from non-Latin script lines""" english_lines = [] original_lines = [] for line in raw_text.split('\n'): line = line.strip() if not line: continue non_latin = sum(1 for c in line if ord(c) > 591) total_alpha = sum(1 for c in line if c.isalpha()) if total_alpha == 0: english_lines.append(line) elif non_latin / max(total_alpha, 1) > 0.4: original_lines.append(line) else: english_lines.append(line) return '\n'.join(english_lines), '\n'.join(original_lines) def extract_english_fields(raw_text: str) -> list: """P4: Extract English label:value pairs directly from card text — no AI""" results = [] patterns = [ (r'(?:FULL\s+)?NAME\s*[:\-.]?\s*([A-Za-z][A-Za-z\s\-\.\']{1,60})', 'NAME'), (r'DATE\s+OF\s+BIRTH\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'DATE OF BIRTH'), (r'\bDOB\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'DATE OF BIRTH'), (r'BIRTH\s+DATE\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'DATE OF BIRTH'), (r'EXPIRY\s+DATE\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'EXPIRY DATE'), (r'DATE\s+OF\s+EXPIRY\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'EXPIRY DATE'), (r'VALID(?:\s+THRU|\s+UNTIL|ITY)?\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'EXPIRY DATE'), (r'EXPIRATION\s+DATE\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'EXPIRY DATE'), (r'(?:DATE\s+OF\s+)?ISSUE\s+DATE\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'ISSUE DATE'), (r'DATE\s+OF\s+ISSUE\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'ISSUE DATE'), (r'CIVIL\s+(?:NO\.?|NUMBER)\s*[:\-.]?\s*([A-Z0-9\-]{4,20})', 'CIVIL NUMBER'), (r'PASSPORT\s+(?:NO\.?|NUMBER)\s*[:\-.]?\s*([A-Z0-9\-]{6,12})', 'PASSPORT NUMBER'), (r'LICENCE\s+(?:NO\.?|NUMBER)\s*[:\-.]?\s*([A-Z0-9\-]{4,20})', 'LICENCE NUMBER'), (r'LICENSE\s+(?:NO\.?|NUMBER)\s*[:\-.]?\s*([A-Z0-9\-]{4,20})', 'LICENCE NUMBER'), (r'AADHAAR\s*(?:NO\.?|NUMBER)?\s*[:\-.]?\s*(\d{4}\s?\d{4}\s?\d{4})', 'AADHAAR NUMBER'), (r'\bPAN\s*[:\-.]?\s*([A-Z]{5}\d{4}[A-Z])', 'PAN NUMBER'), (r'EMIRATES\s+ID\s*[:\-.]?\s*(\d{3}-\d{4}-\d{7}-\d)', 'EMIRATES ID'), (r'(?:NATIONAL\s+)?ID\s+(?:NO\.?|NUMBER)\s*[:\-.]?\s*([A-Z0-9\-]{4,20})', 'ID NUMBER'), (r'DOCUMENT\s+(?:NO\.?|NUMBER)\s*[:\-.]?\s*([A-Z0-9\-]{4,20})', 'DOCUMENT NUMBER'), (r'NATIONALITY\s*[:\-.]?\s*([A-Za-z]{3,30})', 'NATIONALITY'), (r'(?:GENDER|SEX)\s*[:\-.]?\s*(MALE|FEMALE)', 'GENDER'), (r'PLACE\s+OF\s+BIRTH\s*[:\-.]?\s*([A-Za-z\s,]{2,40})', 'PLACE OF BIRTH'), (r'(?:PERMANENT\s+)?ADDRESS\s*[:\-.]?\s*(.{5,80})', 'ADDRESS'), (r'BLOOD\s+(?:GROUP|TYPE)\s*[:\-.]?\s*([ABO]{1,2}[+-]?)', 'BLOOD GROUP'), (r'(?:PROFESSION|OCCUPATION|JOB\s+TITLE)\s*[:\-.]?\s*(.{3,50})', 'PROFESSION'), (r'FATHER(?:\'?S)?\s+NAME\s*[:\-.]?\s*([A-Za-z\s]{3,50})', "FATHER'S NAME"), (r'MOTHER(?:\'?S)?\s+NAME\s*[:\-.]?\s*([A-Za-z\s]{3,50})', "MOTHER'S NAME"), (r'EMPLOYER\s*[:\-.]?\s*(.{3,60})', 'EMPLOYER'), ] seen = set() for pattern, label in patterns: m = re.search(pattern, raw_text, re.IGNORECASE) if m and label not in seen: val = m.group(1).strip() if val and len(val) > 1 and '[' not in val: results.append((label, val)) seen.add(label) return results def parse_mrz_lines(raw_text: str) -> dict: """P1: Authoritative Python MRZ parser — TD1, TD3, MRVA, MRVB""" # Normalize: western numerals only raw_text = convert_eastern_numerals(raw_text) lines = [] for line in raw_text.split('\n'): clean = re.sub(r'\s+', '', line.strip()) if re.match(r'^[A-Z0-9<]{25,50}$', clean): lines.append(clean) if not lines: return {} def decode_date(yymmdd: str, is_dob: bool = False) -> str: try: yy, mm, dd = int(yymmdd[0:2]), int(yymmdd[2:4]), int(yymmdd[4:6]) if not (1 <= mm <= 12 and 1 <= dd <= 31): return f"Invalid ({yymmdd})" cur_yy = datetime.datetime.now().year % 100 year = (1900 + yy) if (is_dob and yy > cur_yy) else (2000 + yy) return f"{dd:02d}/{mm:02d}/{year}" except: return yymmdd def clean_fill(s: str) -> str: return re.sub(r'<+$', '', s).replace('<', ' ').strip() def parse_name(line3: str) -> str: name_clean = re.sub(r'<+$', '', line3) if '<<' in name_clean: parts = name_clean.split('<<') surname = parts[0].replace('<', ' ').strip().title() given = parts[1].replace('<', ' ').strip().title() if len(parts) > 1 else '' return f"{given} {surname}".strip() if given else surname return name_clean.replace('<', ' ').strip().title() result = {} # TD1: 3 lines, 28-36 chars td1 = [l for l in lines if 28 <= len(l) <= 36] if len(td1) >= 2: l1, l2 = td1[0], td1[1] l3 = td1[2] if len(td1) > 2 else "" result['doc_type'] = clean_fill(l1[0:2]) result['country_code'] = clean_fill(l1[2:5]) result['doc_number'] = clean_fill(l1[5:14]) if len(l2) >= 19: result['dob'] = decode_date(l2[0:6], is_dob=True) sex = l2[7] if len(l2) > 7 else '' result['sex'] = 'Male' if sex == 'M' else ('Female' if sex == 'F' else 'Unknown') result['expiry'] = decode_date(l2[8:14], is_dob=False) result['nationality'] = clean_fill(l2[15:18]) if l3: result['name'] = parse_name(l3) result['mrz_format'] = 'TD1' return result # TD3: 2 lines, 40-48 chars (Passports) td3 = [l for l in lines if 40 <= len(l) <= 48] if len(td3) >= 2: l1, l2 = td3[0], td3[1] result['doc_type'] = clean_fill(l1[0:2]) result['country_code'] = clean_fill(l1[2:5]) result['name'] = parse_name(l1[5:44]) if len(l2) >= 27: result['doc_number'] = clean_fill(l2[0:9]) result['nationality'] = clean_fill(l2[10:13]) result['dob'] = decode_date(l2[13:19], is_dob=True) sex = l2[20] if len(l2) > 20 else '' result['sex'] = 'Male' if sex == 'M' else ('Female' if sex == 'F' else 'Unknown') result['expiry'] = decode_date(l2[21:27], is_dob=False) result['mrz_format'] = 'TD3' return result # MRVA/MRVB: 2 lines, 36 chars (Visas) mrv = [l for l in lines if 36 <= len(l) <= 38] if len(mrv) >= 2: l1, l2 = mrv[0], mrv[1] result['doc_type'] = clean_fill(l1[0:2]) result['country_code'] = clean_fill(l1[2:5]) result['name'] = parse_name(l1[5:36]) if len(l2) >= 27: result['doc_number'] = clean_fill(l2[0:9]) result['nationality'] = clean_fill(l2[10:13]) result['dob'] = decode_date(l2[13:19], is_dob=True) sex = l2[20] if len(l2) > 20 else '' result['sex'] = 'Male' if sex == 'M' else ('Female' if sex == 'F' else 'Unknown') result['expiry'] = decode_date(l2[21:27], is_dob=False) result['mrz_format'] = 'MRVA/MRVB' return result return {} def build_mrz_table(mrz_data: dict) -> str: if not mrz_data: return "No MRZ detected." table = f"**Python Parsed MRZ — Authoritative ({mrz_data.get('mrz_format','?')} format):**\n\n" table += "| Field | Verified Value |\n|---|---|\n" fields = [ ('mrz_format', 'MRZ Format'), ('doc_type', 'Document Type'), ('country_code', 'Issuing Country Code'), ('doc_number', 'Document / Civil Number'), ('name', 'Full Name'), ('dob', 'Date of Birth'), ('expiry', 'Expiry Date'), ('nationality', 'User Nationality'), ('sex', 'Gender'), ] for key, label in fields: if key in mrz_data: table += f"| {label} | **{mrz_data[key]}** ✅ |\n" return table def build_unified_summary(front_result: str, back_result: str, mrz_data: dict) -> str: """P6: Merge front+back fields, MRZ as ground truth override""" summary = "## 🔄 Unified Deduplicated Record\n\n" if mrz_data: summary += f"> ✅ *MRZ Python-parsed ({mrz_data.get('mrz_format','?')}) — MRZ values are **ground truth**.*\n\n" summary += "### 🔐 MRZ Ground Truth\n\n" summary += build_mrz_table(mrz_data) + "\n\n---\n\n" else: summary += "> *No MRZ — fields merged from front+back. Conflicts flagged ⚠️.*\n\n" def get_rows(text): rows = {} m = re.search(r"## (?:✅|🗂️)[^\n]*\n\|[^\n]*\n\|[-| ]+\n(.*?)(?=\n---|\Z)", text, re.DOTALL) if m: for line in m.group(1).strip().split('\n'): parts = [p.strip() for p in line.split('|') if p.strip()] if len(parts) >= 2: field = re.sub(r'[^\w\s/\']', '', parts[0]).strip() val = parts[1].strip() if val and val.lower() not in ('—', 'not on card', 'n/a', ''): rows[field] = val return rows front_f = get_rows(front_result) back_f = get_rows(back_result) all_f = list(dict.fromkeys(list(front_f.keys()) + list(back_f.keys()))) # MRZ lookup mrz_map = {} if mrz_data: kw_map = { 'name': ['name'], 'doc_number': ['civil', 'document', 'id', 'passport', 'licence'], 'dob': ['birth', 'dob'], 'expiry': ['expiry', 'expiration'], 'sex': ['gender', 'sex'], 'nationality':['nationality'], } for mk, keywords in kw_map.items(): if mk in mrz_data: for kw in keywords: mrz_map[kw] = mrz_data[mk] def get_mrz(field): fl = field.lower() for kw, v in mrz_map.items(): if kw in fl: return v return None summary += "### 📋 Field Comparison\n\n| Field | Value | Source |\n|---|---|---|\n" for field in all_f: fv = front_f.get(field, '') bv = back_f.get(field, '') mv = get_mrz(field) if fv and bv: if fv.lower() == bv.lower(): note = f"✅ MRZ Confirmed" if mv and any(x in fv.lower() for x in mv.lower().split()) else ("⚠️ MRZ differs: **" + mv + "**" if mv else "") summary += f"| {field} | {fv} | Front+Back ✅ {note} |\n" else: if mv: summary += f"| {field} | ~~{fv}~~ / ~~{bv}~~ → **{mv}** | ✅ MRZ Override |\n" else: summary += f"| {field} | F: **{fv}** / B: **{bv}** | ⚠️ Mismatch |\n" elif fv: note = f"✅ MRZ Confirmed" if mv and any(x in fv.lower() for x in mv.lower().split()) else (f"⚠️ MRZ: **{mv}**" if mv else "") summary += f"| {field} | {fv} | Front only {note} |\n" elif bv: note = f"✅ MRZ Confirmed" if mv and any(x in bv.lower() for x in mv.lower().split()) else (f"⚠️ MRZ: **{mv}**" if mv else "") summary += f"| {field} | {bv} | Back only {note} |\n" return summary + "\n" # ╔══════════════════════════════════════════╗ # ║ STEP PIPELINE FUNCTIONS ║ # ╚══════════════════════════════════════════╝ def run_step1_extraction(model, processor, image, device, temperature, top_p, top_k, repetition_penalty, system_prompt=None): """Step 1: LLM → Raw OCR, original script, NO translation, NO coordinates""" def _generate(prompt_text): try: from qwen_vl_utils import process_vision_info HAS_QWEN_VL_UTILS = True except ImportError: HAS_QWEN_VL_UTILS = False sys_msg = system_prompt or "You are a helpful assistant." messages = [ {"role": "system", "content": sys_msg}, {"role": "user", "content": [ {"type": "image", "image": image}, {"type": "text", "text": prompt_text}, ]} ] # Step A: Build prompt string try: prompt = processor.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) if not isinstance(prompt, str): raise TypeError("non-string returned") except Exception: # Manual Qwen3VL token format — universal fallback prompt = ( "<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n" "<|im_start|>user\n" "<|vision_start|><|image_pad|><|vision_end|>" f"{prompt_text}<|im_end|>\n" "<|im_start|>assistant\n" ) # Step B: Build inputs — 3 fallback tiers inputs = None # Tier 1: qwen_vl_utils + images/videos kwargs (Qwen3VL standard) if HAS_QWEN_VL_UTILS and inputs is None: try: image_inputs, video_inputs = process_vision_info(messages) proc_kwargs = { "text": [prompt], "padding": True, "return_tensors": "pt" } if image_inputs is not None and len(image_inputs) > 0: proc_kwargs["images"] = image_inputs if video_inputs is not None and len(video_inputs) > 0: proc_kwargs["videos"] = video_inputs inputs = processor(**proc_kwargs).to(device) print(" ✅ Tier1: qwen_vl_utils") except Exception as e: print(f" Tier1 failed: {e}") inputs = None # Tier 2: Direct PIL image (Qwen2VL style) if inputs is None: try: inputs = processor( text=[prompt], images=[image], padding=True, return_tensors="pt", ).to(device) print(" ✅ Tier2: direct PIL") except Exception as e: print(f" Tier2 failed: {e}") inputs = None # Tier 3: Text-only (last resort) if inputs is None: print(" ⚠️ Tier3: text-only fallback (no image — degraded)") inputs = processor( text=[prompt], padding=True, return_tensors="pt", ).to(device) with torch.no_grad(): out = model.generate( **inputs, max_new_tokens=600, do_sample=True, temperature=temperature, top_p=top_p, top_k=top_k, repetition_penalty=repetition_penalty, ) gen = out[:, inputs['input_ids'].shape[1]:] decoded = processor.batch_decode(gen, skip_special_tokens=True) if isinstance(decoded, list): return decoded[0] if decoded else "" return str(decoded) if decoded else "" result = _generate(STEP1_EXTRACT_PROMPT) # Coordinate output detect → retry with simpler prompt if re.search(r'\(\d{1,4},\s*\d{1,4}\)', result) or '---TEXT_START---' not in result: print(" ⚠️ Retrying with fallback prompt...") fallback = ( "Read all text from this document image and write it line by line in plain text.\n" "Do NOT output coordinates or bounding boxes.\n" "Start output with:\n" "PHOTO_PRESENT: yes or no\n" "SIGNATURE_PRESENT: yes or no\n" "MRZ_PRESENT: yes or no\n" "DETECTED_LANGUAGE: name the language(s)\n" "---TEXT_START---\n" "[all text here exactly as printed]\n" "---TEXT_END---" ) result = _generate(fallback) return result def parse_step1_output(raw_output: str) -> dict: """Parse Step 1 structured output → metadata + original text""" result = { "photo_present": "❌ No", "photo_location": "N/A", "sig_present": "❌ No", "sig_location": "N/A", "mrz_present": "❌ No", "detected_lang": "Unknown", "original_text": raw_output, } def get(pattern, text, default="N/A"): m = re.search(pattern, text, re.IGNORECASE) return m.group(1).strip() if m else default photo = get(r'PHOTO_PRESENT:\s*(yes|no)', raw_output) result["photo_present"] = "✅ Yes" if photo.lower() == "yes" else "❌ No" result["photo_location"] = get(r'PHOTO_LOCATION:\s*([^\n]+)', raw_output) sig = get(r'SIGNATURE_PRESENT:\s*(yes|no)', raw_output) result["sig_present"] = "✅ Yes" if sig.lower() == "yes" else "❌ No" result["sig_location"] = get(r'SIGNATURE_LOCATION:\s*([^\n]+)', raw_output) mrz = get(r'MRZ_PRESENT:\s*(yes|no)', raw_output) result["mrz_present"] = "✅ Yes" if mrz.lower() == "yes" else "❌ No" result["detected_lang"] = get(r'DETECTED_LANGUAGE:\s*([^\n]+)', raw_output, "Unknown") m = re.search(r'---TEXT_START---\n?(.*?)---TEXT_END---', raw_output, re.DOTALL) if m: result["original_text"] = m.group(1).strip() return result def run_step2_structure(model, processor, metadata: dict, device, max_new_tokens, temperature, top_p, top_k, repetition_penalty): """Step 2: Python extracts English fields + MRZ. LLM only classifies + fills gaps.""" raw_text = metadata.get('original_text', '') # P2: Convert eastern numerals first raw_text_normalized = convert_eastern_numerals(raw_text) # P5: Separate scripts english_block, original_block = separate_scripts(raw_text_normalized) # P4: Direct English field extraction english_fields = extract_english_fields(raw_text_normalized) # P1: MRZ parse (authoritative) mrz_data = parse_mrz_lines(raw_text_normalized) # P3: Calendar detection + conversion (for display) calendar_sys = detect_calendar_system(raw_text) # Build python fields table if english_fields: tbl = "| Field (as printed on card) | Value (as printed) |\n|---|---|\n" for label, val in english_fields: tbl += f"| **{label}** | {val} |\n" else: tbl = "| — | No English label:value pairs detected |\n" # MRZ summary if mrz_data: mrz_summary = " | ".join([f"{k}: {v}" for k, v in mrz_data.items() if k != 'mrz_format']) mrz_summary = f"✅ {mrz_data.get('mrz_format','?')} parsed: {mrz_summary}" else: mrz_summary = "❌ No MRZ detected" # Non-Gregorian note cal_note = "" if calendar_sys == 'solar_hijri': cal_note = "\n> ⚠️ **Solar Hijri (Shamsi) calendar detected** — Python will convert dates to Gregorian." elif calendar_sys == 'lunar_hijri': cal_note = "\n> ⚠️ **Lunar Hijri calendar detected** — Python will convert dates to Gregorian." # Build prompt for LLM (classification + gaps only) prompt_text = STEP2_TEMPLATE.format( python_fields_table=tbl, mrz_summary=mrz_summary, english_block=english_block or "None", original_block=original_block or "None", ) messages = [{"role": "user", "content": [{"type": "text", "text": prompt_text}]}] try: prompt = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) except: prompt = prompt_text inputs = processor( text=[prompt], padding=True, return_tensors="pt", ).to(device) streamer = TextIteratorStreamer(processor, skip_prompt=True, skip_special_tokens=True) gen_kwargs = { **inputs, "streamer": streamer, "max_new_tokens": max_new_tokens, "do_sample": True, "temperature": temperature, "top_p": top_p, "top_k": top_k, "repetition_penalty": repetition_penalty, } thread = Thread(target=model.generate, kwargs=gen_kwargs) thread.start() # Pre-build Python-verified sections # ── Pre-compute outside f-string (backslash fix for Python < 3.12) ── newline = "\n" mrz_pattern = r'^[A-Z0-9<]{25,50}$' ws_pattern = r'\s+' mrz_raw_lines = [] for _l in raw_text.split("\n"): _c = re.sub(ws_pattern, '', _l.strip()) if re.match(mrz_pattern, _c): mrz_raw_lines.append(_c) mrz_raw_display = newline.join(mrz_raw_lines) if mrz_raw_lines else "NOT PRESENT" mrz_table_str = build_mrz_table(mrz_data) if mrz_data else "_No MRZ detected._" # Pre-build Python-verified sections python_sections = ( "## 🖼️ Visual Elements\n\n" "| Element | Status | Location |\n" "|---------|--------|----------|\n" f"| 📷 Profile Photo | {metadata['photo_present']} | {metadata['photo_location']} |\n" f"| ✍️ Signature | {metadata['sig_present']} | {metadata['sig_location']} |\n" f"| 🔐 MRZ Zone | {metadata['mrz_present']} | Bottom strip |\n\n" "---\n\n" "## ✅ English Fields (Direct from Card — Not Modified)\n" f"{cal_note}\n\n" f"{tbl}\n\n" "---\n\n" "## 📜 Original Script\n\n" "```\n" f"{raw_text}\n" "```\n\n" "---\n\n" "## 🔐 MRZ Data\n\n" "```\n" f"{mrz_raw_display}\n" "```\n\n" f"{mrz_table_str}\n\n" "---\n\n" ) return streamer, thread, mrz_data, python_sections # ╔══════════════════════════════════════════╗ # ║ GRADIO HELPER CLASSES ║ # ╚══════════════════════════════════════════╝ class RadioAnimated(gr.HTML): def __init__(self, choices, value=None, **kwargs): if not choices or len(choices) < 2: raise ValueError("RadioAnimated requires at least 2 choices.") if value is None: value = choices[0] uid = uuid.uuid4().hex[:8] group_name = f"ra-{uid}" inputs_html = "\n".join( f'' f'' for i, c in enumerate(choices) ) html_template = f"""