Spaces: Running on Zero
| """ | |
| ╔══════════════════════════════════════════════════════════════════╗ | |
| ║ CSM DUAL-CARD ID OCR SYSTEM — ARCHITECTURE NOTE ║ | |
| ╠══════════════════════════════════════════════════════════════════╣ | |
| ║ MODEL TASKS (8B VLM): ║ | |
| ║ Step 1 → Raw OCR: All text, original script, no translate ║ | |
| ║ Step 2 → Doc classify + non-English gap fill only ║ | |
| ║ PYTHON TASKS (Authoritative): ║ | |
| ║ MRZ parse+verify | Numeral convert | Calendar convert ║ | |
| ║ English label extract | Script separate | Cross verify ║ | |
| ╚══════════════════════════════════════════════════════════════════╝ | |
| """ | |
# --- Core stdlib / third-party imports ---
import os
import uuid
import time
import re
import datetime
from threading import Thread
from typing import Iterable, Dict, Any
import gradio as gr
import spaces
import torch
from PIL import Image
# Redirect the HF cache into /tmp (writable on Spaces) and silence symlink warnings.
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"
os.environ["HF_HOME"] = "/tmp/hf_home"
from transformers import (
    AutoProcessor,
    AutoModelForImageTextToText,  # Universal VLM loader — works for both Qwen2VL and Qwen3VL
    TextIteratorStreamer,
    BitsAndBytesConfig,
)
# Specific class imports — graceful fallback: each flag records whether the
# installed transformers version ships that model class.
try:
    from transformers import Qwen3VLForConditionalGeneration
    QWEN3_AVAILABLE = True
    print("✅ Qwen3VLForConditionalGeneration available")
except ImportError:
    QWEN3_AVAILABLE = False
    print("⚠️ Qwen3VL direct import not available — using AutoModel fallback")
try:
    from transformers import Qwen2VLForConditionalGeneration
    QWEN2_AVAILABLE = True
except ImportError:
    QWEN2_AVAILABLE = False
try:
    from transformers import Qwen2_5_VLForConditionalGeneration
    QWEN25_AVAILABLE = True
except ImportError:
    QWEN25_AVAILABLE = False
# PEFT is required for the two LoRA-adapter models below.
try:
    from peft import PeftModel, PeftConfig
    PEFT_AVAILABLE = True
    print("✅ PEFT available")
except ImportError:
    PEFT_AVAILABLE = False
    print("⚠️ PEFT not available")
from gradio.themes import Soft
from gradio.themes.utils import colors, fonts, sizes
# ===== THEME =====
# Register a custom steel-blue palette on gradio's colors module so the
# theme below can reference it as colors.steel_blue.
colors.steel_blue = colors.Color(
    name="steel_blue",
    c50="#EBF3F8", c100="#D3E5F0", c200="#A8CCE1", c300="#7DB3D2",
    c400="#529AC3", c500="#4682B4", c600="#3E72A0", c700="#36638C",
    c800="#2E5378", c900="#264364", c950="#1E3450",
)
class SteelBlueTheme(Soft):
    """Soft-derived Gradio theme using the custom steel_blue secondary hue.

    Overrides backgrounds with gradients and styles primary/secondary
    buttons; all other tokens inherit from the Soft base theme.
    """
    def __init__(self, *, primary_hue=colors.gray, secondary_hue=colors.steel_blue,
                 neutral_hue=colors.slate, text_size=sizes.text_lg,
                 font=(fonts.GoogleFont("Outfit"), "Arial", "sans-serif"),
                 font_mono=(fonts.GoogleFont("IBM Plex Mono"), "ui-monospace", "monospace")):
        super().__init__(primary_hue=primary_hue, secondary_hue=secondary_hue,
                         neutral_hue=neutral_hue, text_size=text_size, font=font, font_mono=font_mono)
        # Token overrides applied on top of the Soft defaults.
        super().set(
            background_fill_primary="*primary_50",
            background_fill_primary_dark="*primary_900",
            body_background_fill="linear-gradient(135deg, *primary_200, *primary_100)",
            body_background_fill_dark="linear-gradient(135deg, *primary_900, *primary_800)",
            button_primary_text_color="white",
            button_primary_background_fill="linear-gradient(90deg, *secondary_500, *secondary_600)",
            button_primary_background_fill_hover="linear-gradient(90deg, *secondary_600, *secondary_700)",
            button_secondary_text_color="black",
            button_secondary_background_fill="linear-gradient(90deg, *primary_300, *primary_300)",
            button_secondary_background_fill_hover="linear-gradient(90deg, *primary_400, *primary_400)",
            slider_color="*secondary_500",
            block_title_text_weight="600",
            block_border_width="3px",
            block_shadow="*shadow_drop_lg",
            button_primary_shadow="*shadow_drop_lg",
            button_large_padding="11px",
            color_accent_soft="*primary_100",
            block_label_background_fill="*primary_200",
        )
# Single theme instance used by the Blocks UI.
steel_blue_theme = SteelBlueTheme()
# Custom CSS: title sizing, a pill-style radio group (.ra-*), and the GPU
# duration picker container; .dark variants included.
css = """
#main-title h1 { font-size: 2.3em !important; }
#output-title h2 { font-size: 2.2em !important; }
.ra-wrap{ width: fit-content; }
.ra-inner{ position: relative; display: inline-flex; align-items: center; gap: 0; padding: 6px;
background: var(--neutral-200); border-radius: 9999px; overflow: hidden; }
.ra-input{ display: none; }
.ra-label{ position: relative; z-index: 2; padding: 8px 16px; font-family: inherit; font-size: 14px;
font-weight: 600; color: var(--neutral-500); cursor: pointer; transition: color 0.2s; white-space: nowrap; }
.ra-highlight{ position: absolute; z-index: 1; top: 6px; left: 6px; height: calc(100% - 12px);
border-radius: 9999px; background: white; box-shadow: 0 2px 4px rgba(0,0,0,0.1);
transition: transform 0.2s, width 0.2s; }
.ra-input:checked + .ra-label{ color: black; }
.dark .ra-inner { background: var(--neutral-800); }
.dark .ra-label { color: var(--neutral-400); }
.dark .ra-highlight { background: var(--neutral-600); }
.dark .ra-input:checked + .ra-label { color: white; }
#gpu-duration-container { padding: 10px; border-radius: 8px;
background: var(--background-fill-secondary); border: 1px solid var(--border-color-primary); margin-top: 10px; }
"""
# Generation length limits: hard ceiling for the UI slider and its default.
MAX_MAX_NEW_TOKENS = 4096
DEFAULT_MAX_NEW_TOKENS = 1024
# Prompt-side truncation budget, overridable via environment variable.
MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096"))
# Prefer the first CUDA device; fall back to CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("Device:", torch.cuda.get_device_name(0))
print("Using:", device)
# ╔══════════════════════════════════════════╗
# ║ UNIVERSAL PROMPTS ║
# ╚══════════════════════════════════════════╝
# Step-1 prompt: pure transcription. The model must NOT translate, NOT emit
# bounding boxes, and must wrap the raw text in TEXT_START/TEXT_END markers
# (parse_step1_output depends on those markers downstream).
STEP1_EXTRACT_PROMPT = """You are a universal OCR engine. Transcribe ALL visible text from this document image.
OUTPUT FORMAT — fill exactly as shown:
PHOTO_PRESENT: yes/no
PHOTO_LOCATION: [describe position: top-left / top-right / center-left / not found]
SIGNATURE_PRESENT: yes/no
SIGNATURE_LOCATION: [describe position: bottom-left / bottom-right / not found]
MRZ_PRESENT: yes/no
DETECTED_LANGUAGE: [list all languages visible e.g. Arabic+English, Farsi+English, Hindi+English, Chinese, English]
---TEXT_START---
[Every word, number, symbol, label and value visible — line by line]
[Original script preserved: Arabic, Farsi, Hindi, Chinese, Cyrillic etc. — DO NOT translate here]
[Copy label AND its value together: e.g. "DATE OF BIRTH 12/05/2003"]
[MRZ lines: copy character-perfect including ALL < symbols]
[Include corner text, watermarks, small print]
---TEXT_END---
ABSOLUTE RULES:
- NEVER output pixel coordinates like (50,68) or bounding boxes — plain text ONLY
- DO NOT translate in this step — original script as-is
- DO NOT skip or summarize any field
- Copy every character exactly including < symbols in MRZ"""
# Step-2 template: classification + gap-fill only. The {python_fields_table},
# {mrz_summary}, {english_block} and {original_block} placeholders are filled
# by the Python pipeline before the prompt is sent.
STEP2_TEMPLATE = """You are a universal KYC document analyst.
The Python pipeline has already extracted English fields and parsed MRZ.
Your job is ONLY: classify document + fill gaps from non-English text.
━━━ ALREADY EXTRACTED BY PYTHON (DO NOT RE-EXTRACT) ━━━
English Fields Found Directly on Card:
{python_fields_table}
MRZ Python Parse Result:
{mrz_summary}
━━━ YOUR INPUT DATA ━━━
English text block from card:
{english_block}
Non-English original script block:
{original_block}
━━━ YOUR TASKS — ONLY THESE 3 ━━━
TASK 1: Identify document type and issuing info
- Read English block and original block
- Keywords: PASSPORT/RESIDENT CARD/NATIONAL ID/DRIVING LICENCE/بطاقة/جواز/رخصة/आधार/PAN
- Top of card = issuing country/institution (NOT person name)
TASK 2: Classify non-English labels → check if already in English fields above
- If نام (Farsi: Name) value already in Python English fields → SKIP
- If شماره ملی (National Number) already in Python fields → SKIP
- Only add fields GENUINELY missing from Python extraction
TASK 3: Transliterate non-English values NOT found in English block
- Example: محمد → Mohammad | چراغی → Cheraghi
- Dates in Shamsi/Hijri: write BOTH original AND note calendar type
(DO NOT convert — Python handles conversion)
RULES:
- NEVER copy template placeholders like [fill here] or [value]
- NEVER re-state what Python already found
- NEVER guess values not visible in card
- If all fields already covered → write "✅ All fields covered by Python extraction"
━━━ OUTPUT FORMAT ━━━
---
## 📋 Document Classification
| | |
|---|---|
| **Document Type** | |
| **Issuing Country** | |
| **Issuing Authority** | |
---
## ➕ Additional Fields (non-English only — genuinely new)
| Label (Original) | Label (English) | Value (Original) | Value (Transliterated) |
|---|---|---|---|
| [only if not in Python fields above] | | | |
---
## 🗓️ Calendar Note (if non-Gregorian dates found)
| Original Date | Calendar System | Note |
|---|---|---|
| [date as on card] | [Solar Hijri / Lunar Hijri / Buddhist] | Python will convert |
---"""
def load_vl_model(model_id: str, quantization_config=None, pre_quantized: bool = False):
    """
    Universal VLM loader — Qwen2VL / Qwen3VL / any VLM.

    Attempts loaders in preference order (Qwen3VL → AutoModelForImageTextToText
    → Qwen2VL) and returns the first model that loads, in eval mode.

    pre_quantized=True  → weights are already quantized, no extra config needed
    pre_quantized=False → apply quantization_config during load
    """
    shared_kwargs = {
        "torch_dtype": "auto",
        "device_map": "auto",
        "trust_remote_code": True,
    }
    # Only inject a quantization config when the checkpoint is not already quantized.
    if quantization_config is not None and not pre_quantized:
        shared_kwargs["quantization_config"] = quantization_config
    # Attempt 1: Qwen3VL (newest class, when the installed transformers has it).
    if QWEN3_AVAILABLE:
        try:
            loaded = Qwen3VLForConditionalGeneration.from_pretrained(model_id, **shared_kwargs)
            return loaded.eval()
        except Exception as e:
            print(f" Qwen3VL failed: {e}, trying AutoModel...")
    # Attempt 2: the universal AutoModel entry point.
    try:
        loaded = AutoModelForImageTextToText.from_pretrained(model_id, **shared_kwargs)
        return loaded.eval()
    except Exception as e:
        print(f" AutoModel failed: {e}, trying Qwen2VL...")
    # Attempt 3: Qwen2VL as a last resort — errors here propagate to the caller.
    if QWEN2_AVAILABLE:
        return Qwen2VLForConditionalGeneration.from_pretrained(model_id, **shared_kwargs).eval()
    raise RuntimeError(f"No compatible loader found for {model_id}")
# ╔══════════════════════════════════════════╗
# ║ MODEL LOADING ║
# ╚══════════════════════════════════════════╝
# Import-time side effects: all four checkpoints are downloaded and loaded
# here; each model's *_AVAILABLE flag records whether its load succeeded.
print("\n" + "="*70)
print("🚀 LOADING 4 MODELS")
print("="*70)
# 4-bit BitsAndBytes config (shared for quantized models)
bnb_4bit_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_use_double_quant=True,
)
# ── Model 1: Chhagan_ML-VL-OCR-v1 (LoRA on Qwen2VL base) ──
# The adapter repo's PeftConfig names its base model; the base is loaded
# first, then the LoRA weights are applied on top.
print("\n1️⃣ Chhagan_ML-VL-OCR-v1 (LoRA Refined)...")
MODEL_ID_C1 = "Chhagan005/Chhagan_ML-VL-OCR-v1"
CHHAGAN_V1_AVAILABLE = False
processor_c1 = model_c1 = None
if PEFT_AVAILABLE:
    try:
        config = PeftConfig.from_pretrained(MODEL_ID_C1)
        base_id = config.base_model_name_or_path
        processor_c1 = AutoProcessor.from_pretrained(base_id, trust_remote_code=True)
        base_c1 = load_vl_model(base_id)
        model_c1 = PeftModel.from_pretrained(base_c1, MODEL_ID_C1).to(device).eval()
        print(" ✅ Loaded!")
        CHHAGAN_V1_AVAILABLE = True
    except Exception as e:
        print(f" ❌ Failed: {e}")
else:
    print(" ⚠️ PEFT not available")
# ── Model 2: Chhagan-DocVL-Qwen3 (LoRA on Qwen3VL base) ──
print("\n2️⃣ Chhagan-DocVL-Qwen3 (Qwen3-VL Refined)...")
MODEL_ID_C2 = "Chhagan005/Chhagan-DocVL-Qwen3"
CHHAGAN_QWEN3_AVAILABLE = False
processor_c2 = model_c2 = None
if PEFT_AVAILABLE:
    try:
        config = PeftConfig.from_pretrained(MODEL_ID_C2)
        base_id = config.base_model_name_or_path
        processor_c2 = AutoProcessor.from_pretrained(base_id, trust_remote_code=True)
        base_c2 = load_vl_model(base_id)
        model_c2 = PeftModel.from_pretrained(base_c2, MODEL_ID_C2).to(device).eval()
        print(" ✅ Loaded!")
        CHHAGAN_QWEN3_AVAILABLE = True
    except Exception as e:
        print(f" ❌ Failed: {e}")
else:
    print(" ⚠️ PEFT not available")
# ── Model 3: CSM-DocExtract-VL-Q4KM (Full Qwen3VL, pre-quantized) ──
print("\n3️⃣ CSM-DocExtract-VL-Q4KM (Full Qwen3VL, pre-quantized BNB)...")
MODEL_ID_Q4KM = "Chhagan005/CSM-DocExtract-VL-Q4KM"
CSM_Q4KM_AVAILABLE = False
processor_q4km = model_q4km = None
try:
    processor_q4km = AutoProcessor.from_pretrained(
        MODEL_ID_Q4KM, trust_remote_code=True
    )
    # Pre-quantized safetensors → torch_dtype=auto, NO extra quantization_config
    model_q4km = Qwen3VLForConditionalGeneration.from_pretrained(
        MODEL_ID_Q4KM,
        torch_dtype="auto",
        device_map="auto",
        trust_remote_code=True,
    ).eval()
    print(" ✅ Loaded! (Qwen3VL pre-quantized BNB ~6.4GB)")
    CSM_Q4KM_AVAILABLE = True
except Exception as e:
    # Second chance via the universal AutoModel entry point.
    try:
        model_q4km = AutoModelForImageTextToText.from_pretrained(
            MODEL_ID_Q4KM,
            torch_dtype="auto",
            device_map="auto",
            trust_remote_code=True,
        ).eval()
        print(" ✅ Loaded! (AutoModel fallback)")
        CSM_Q4KM_AVAILABLE = True
    except Exception as e2:
        print(f" ❌ Failed: {e2}")
# ── Model 4: CSM-DocExtract-VL (Full Qwen3VL, BNB INT4 trained) ──
print("\n4️⃣ CSM-DocExtract-VL 4BNB (Full Qwen3VL, BNB INT4 trained)...")
MODEL_ID_4BNB = "Chhagan005/CSM-DocExtract-VL"
CSM_4BNB_AVAILABLE = False
processor_4bnb = model_4bnb = None
system_prompt_4bnb = "You are a helpful assistant."  # default, replaced below if the repo ships one
try:
    # Read custom system_prompt.txt — this model was trained with it
    try:
        from huggingface_hub import hf_hub_download
        sp_path = hf_hub_download(repo_id=MODEL_ID_4BNB, filename="system_prompt.txt")
        with open(sp_path, "r", encoding="utf-8") as f:
            system_prompt_4bnb = f.read().strip()
        print(f" 📋 system_prompt.txt loaded: {system_prompt_4bnb[:80]}...")
    except Exception as sp_err:
        print(f" ⚠️ system_prompt.txt not loaded: {sp_err} — using default")
    processor_4bnb = AutoProcessor.from_pretrained(
        MODEL_ID_4BNB, trust_remote_code=True
    )
    # BNB INT4 trained safetensors → torch_dtype=auto, NO extra quantization_config
    # (ignore .gguf files — those are for llama.cpp, not transformers)
    model_4bnb = Qwen3VLForConditionalGeneration.from_pretrained(
        MODEL_ID_4BNB,
        torch_dtype="auto",
        device_map="auto",
        trust_remote_code=True,
        ignore_mismatched_sizes=True,  # GGUF files present — ignore safely
    ).eval()
    print(" ✅ Loaded! (Qwen3VL BNB INT4 trained ~6.4GB)")
    CSM_4BNB_AVAILABLE = True
except Exception as e:
    try:
        model_4bnb = AutoModelForImageTextToText.from_pretrained(
            MODEL_ID_4BNB,
            torch_dtype="auto",
            device_map="auto",
            trust_remote_code=True,
        ).eval()
        print(" ✅ Loaded! (AutoModel fallback)")
        CSM_4BNB_AVAILABLE = True
    except Exception as e2:
        print(f" ❌ Failed: {e2}")
# Summary banner: which of the four models actually loaded.
print("\n" + "="*70)
print("📊 MODEL STATUS")
print("="*70)
status = [
    ("Chhagan_ML-VL-OCR-v1", CHHAGAN_V1_AVAILABLE, "LoRA Fine-tuned"),
    ("Chhagan-DocVL-Qwen3", CHHAGAN_QWEN3_AVAILABLE, "Qwen3-VL Fine-tuned"),
    ("CSM-DocExtract-Q4KM", CSM_Q4KM_AVAILABLE, "Qwen3VL Q4KM pre-quantized"),
    ("CSM-DocExtract-4BNB", CSM_4BNB_AVAILABLE, "Qwen3VL BitsAndBytes 4-bit"),
]
for name, ok, note in status:
    print(f" {'✅' if ok else '❌'} {name:<35} {note}")
print("="*70)
loaded = sum(x[1] for x in status)
print(f" Total loaded: {loaded}/4\n")
| # ╔══════════════════════════════════════════╗ | |
| # ║ PYTHON PIPELINE FUNCTIONS ║ | |
| # ╚══════════════════════════════════════════╝ | |
def convert_eastern_numerals(text: str) -> str:
    """P2: Convert Persian/Arabic/Devanagari numerals to Western 0-9.

    All supported digit sets map onto '0123456789'; non-digit characters
    pass through unchanged.
    """
    digit_sets = (
        '۰۱۲۳۴۵۶۷۸۹',  # Persian
        '٠١٢٣٤٥٦٧٨٩',  # Arabic
        '०१२३४५६७८९',  # Devanagari
        '০১২৩৪৫৬৭৮৯',  # Bengali
        '੦੧੨੩੪੫੬੭੮੯',  # Gurmukhi
    )
    # The source alphabets are disjoint, so one merged table is equivalent
    # to translating with each table in turn.
    combined = {}
    for digits in digit_sets:
        combined.update(str.maketrans(digits, '0123456789'))
    return text.translate(combined)
def detect_calendar_system(raw_text: str) -> str:
    """Detect calendar system from country/language context.

    Solar-Hijri markers are matched against the raw text (native script or
    exact uppercase), lunar-Hijri markers against the uppercased text.
    """
    solar_markers = ('جمهوری اسلامی ایران', 'IRAN', 'AFGHANISTAN', 'افغانستان')
    lunar_markers = ('SAUDI', 'ARABIA', 'السعودية', 'KUWAIT', 'QATAR', 'BAHRAIN', 'JORDAN')
    upper = raw_text.upper()
    for marker in solar_markers:
        if marker in raw_text:
            return 'solar_hijri'
    for marker in lunar_markers:
        if marker in upper:
            return 'lunar_hijri'
    return 'gregorian'
def convert_shamsi_to_gregorian(shamsi_date: str) -> str:
    """P3: Solar Hijri (Shamsi) → Gregorian.

    Uses the `khayyam` library when installed; otherwise falls back to a
    rough year-offset approximation (Jalali epoch ≈ 621/622 CE). Input is
    expected as Y/M/D with '/', '-' or '.' separators; anything unparseable
    is returned annotated with "(Shamsi)".

    Fix vs original: the bare `except:` clauses (which also swallowed
    SystemExit/KeyboardInterrupt) are narrowed to the exceptions that the
    parsing/conversion can actually raise.
    """
    try:
        import khayyam
        parts = re.split(r'[/\-\.]', shamsi_date.strip())
        if len(parts) == 3:
            y, m, d = int(parts[0]), int(parts[1]), int(parts[2])
            greg = khayyam.JalaliDate(y, m, d).todate()
            return f"{greg.day:02d}/{greg.month:02d}/{greg.year}"
    except ImportError:
        # Approximate manual conversion if khayyam not installed
        try:
            parts = re.split(r'[/\-\.]', shamsi_date.strip())
            y, m, d = int(parts[0]), int(parts[1]), int(parts[2])
            greg_year = y + 621
            return f"{d:02d}/{m:02d}/{greg_year} (approx)"
        except (ValueError, IndexError):
            pass
    except Exception:
        # Invalid date values (e.g. month 13) — fall through to passthrough.
        pass
    return f"{shamsi_date} (Shamsi)"
def convert_hijri_to_gregorian(hijri_date: str) -> str:
    """P3: Lunar Hijri → Gregorian.

    Uses `hijri_converter` when installed; otherwise a coarse approximation.
    Input is expected as Y/M/D with '/', '-' or '.' separators; anything
    unparseable is returned annotated with "(Hijri)".

    Fixes vs original:
    - bare `except:` clauses narrowed to the exceptions actually raised;
    - the approximate year formula generalized: lunar years are ~3% shorter
      than solar years, so G ≈ H + 622 - H/33. The original hard-coded
      offset of 43 was only accurate around 1445 AH (identical result there:
      1445 // 33 == 43).
    """
    try:
        from hijri_converter import convert
        parts = re.split(r'[/\-\.]', hijri_date.strip())
        if len(parts) == 3:
            y, m, d = int(parts[0]), int(parts[1]), int(parts[2])
            greg = convert.Hijri(y, m, d).to_gregorian()
            return f"{greg.day:02d}/{greg.month:02d}/{greg.year}"
    except ImportError:
        try:
            parts = re.split(r'[/\-\.]', hijri_date.strip())
            y, m, d = int(parts[0]), int(parts[1]), int(parts[2])
            greg_year = y + 622 - y // 33
            return f"{d:02d}/{m:02d}/{greg_year} (approx)"
        except (ValueError, IndexError):
            pass
    except Exception:
        # Invalid date values — fall through to annotated passthrough.
        pass
    return f"{hijri_date} (Hijri)"
def separate_scripts(raw_text: str) -> tuple:
    """P5: Separate English/Latin lines from non-Latin script lines.

    A line goes to the "original" (non-Latin) side when more than 40% of its
    alphabetic characters sit above codepoint 591 (i.e. beyond Latin ranges).
    Lines with no letters at all (numbers, punctuation) stay on the Latin side.
    Returns (latin_text, original_text), each newline-joined.
    """
    latin_side, original_side = [], []
    for raw_line in raw_text.split('\n'):
        stripped = raw_line.strip()
        if not stripped:
            continue
        alpha_count = sum(c.isalpha() for c in stripped)
        high_cp_count = sum(ord(c) > 591 for c in stripped)
        if alpha_count and high_cp_count / alpha_count > 0.4:
            original_side.append(stripped)
        else:
            latin_side.append(stripped)
    return '\n'.join(latin_side), '\n'.join(original_side)
def extract_english_fields(raw_text: str) -> list:
    """P4: Extract English label:value pairs directly from card text — no AI.

    Runs a fixed, ordered battery of regexes over the raw OCR text and keeps
    the first acceptable match per canonical label. Returns a list of
    (LABEL, value) tuples in pattern order.
    """
    patterns = [
        (r'(?:FULL\s+)?NAME\s*[:\-.]?\s*([A-Za-z][A-Za-z\s\-\.\']{1,60})', 'NAME'),
        (r'DATE\s+OF\s+BIRTH\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'DATE OF BIRTH'),
        (r'\bDOB\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'DATE OF BIRTH'),
        (r'BIRTH\s+DATE\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'DATE OF BIRTH'),
        (r'EXPIRY\s+DATE\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'EXPIRY DATE'),
        (r'DATE\s+OF\s+EXPIRY\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'EXPIRY DATE'),
        (r'VALID(?:\s+THRU|\s+UNTIL|ITY)?\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'EXPIRY DATE'),
        (r'EXPIRATION\s+DATE\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'EXPIRY DATE'),
        (r'(?:DATE\s+OF\s+)?ISSUE\s+DATE\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'ISSUE DATE'),
        (r'DATE\s+OF\s+ISSUE\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'ISSUE DATE'),
        (r'CIVIL\s+(?:NO\.?|NUMBER)\s*[:\-.]?\s*([A-Z0-9\-]{4,20})', 'CIVIL NUMBER'),
        (r'PASSPORT\s+(?:NO\.?|NUMBER)\s*[:\-.]?\s*([A-Z0-9\-]{6,12})', 'PASSPORT NUMBER'),
        (r'LICENCE\s+(?:NO\.?|NUMBER)\s*[:\-.]?\s*([A-Z0-9\-]{4,20})', 'LICENCE NUMBER'),
        (r'LICENSE\s+(?:NO\.?|NUMBER)\s*[:\-.]?\s*([A-Z0-9\-]{4,20})', 'LICENCE NUMBER'),
        (r'AADHAAR\s*(?:NO\.?|NUMBER)?\s*[:\-.]?\s*(\d{4}\s?\d{4}\s?\d{4})', 'AADHAAR NUMBER'),
        (r'\bPAN\s*[:\-.]?\s*([A-Z]{5}\d{4}[A-Z])', 'PAN NUMBER'),
        (r'EMIRATES\s+ID\s*[:\-.]?\s*(\d{3}-\d{4}-\d{7}-\d)', 'EMIRATES ID'),
        (r'(?:NATIONAL\s+)?ID\s+(?:NO\.?|NUMBER)\s*[:\-.]?\s*([A-Z0-9\-]{4,20})', 'ID NUMBER'),
        (r'DOCUMENT\s+(?:NO\.?|NUMBER)\s*[:\-.]?\s*([A-Z0-9\-]{4,20})', 'DOCUMENT NUMBER'),
        (r'NATIONALITY\s*[:\-.]?\s*([A-Za-z]{3,30})', 'NATIONALITY'),
        (r'(?:GENDER|SEX)\s*[:\-.]?\s*(MALE|FEMALE)', 'GENDER'),
        (r'PLACE\s+OF\s+BIRTH\s*[:\-.]?\s*([A-Za-z\s,]{2,40})', 'PLACE OF BIRTH'),
        (r'(?:PERMANENT\s+)?ADDRESS\s*[:\-.]?\s*(.{5,80})', 'ADDRESS'),
        (r'BLOOD\s+(?:GROUP|TYPE)\s*[:\-.]?\s*([ABO]{1,2}[+-]?)', 'BLOOD GROUP'),
        (r'(?:PROFESSION|OCCUPATION|JOB\s+TITLE)\s*[:\-.]?\s*(.{3,50})', 'PROFESSION'),
        (r'FATHER(?:\'?S)?\s+NAME\s*[:\-.]?\s*([A-Za-z\s]{3,50})', "FATHER'S NAME"),
        (r'MOTHER(?:\'?S)?\s+NAME\s*[:\-.]?\s*([A-Za-z\s]{3,50})', "MOTHER'S NAME"),
        (r'EMPLOYER\s*[:\-.]?\s*(.{3,60})', 'EMPLOYER'),
    ]
    # Dict keyed by canonical label keeps first accepted value per label while
    # preserving insertion order for the returned list.
    found = {}
    for pattern, label in patterns:
        if label in found:
            continue
        match = re.search(pattern, raw_text, re.IGNORECASE)
        if not match:
            continue
        value = match.group(1).strip()
        # Reject one-char noise and un-filled template placeholders like "[value]".
        if len(value) > 1 and '[' not in value:
            found[label] = value
    return list(found.items())
def parse_mrz_lines(raw_text: str) -> dict:
    """P1: Authoritative Python MRZ parser — TD1, TD3, MRVA, MRVB.

    Scans the raw OCR text for MRZ-looking lines (only A-Z, 0-9 and '<',
    25-50 chars after whitespace removal) and decodes them positionally per
    the ICAO 9303 field layout. Returns a dict of decoded fields plus
    'mrz_format', or an empty dict when no candidate lines are found.
    NOTE(review): check digits are not verified here.
    """
    # Normalize: western numerals only
    raw_text = convert_eastern_numerals(raw_text)
    lines = []
    for line in raw_text.split('\n'):
        clean = re.sub(r'\s+', '', line.strip())
        # Candidate MRZ line: exclusively the MRZ alphabet, plausible length.
        if re.match(r'^[A-Z0-9<]{25,50}$', clean):
            lines.append(clean)
    if not lines:
        return {}
    def decode_date(yymmdd: str, is_dob: bool = False) -> str:
        # YYMMDD → "DD/MM/YYYY". For birth dates, a 2-digit year greater than
        # the current one must be 19xx (nobody is born in the future); expiry
        # dates are always mapped into 20xx.
        try:
            yy, mm, dd = int(yymmdd[0:2]), int(yymmdd[2:4]), int(yymmdd[4:6])
            if not (1 <= mm <= 12 and 1 <= dd <= 31):
                return f"Invalid ({yymmdd})"
            cur_yy = datetime.datetime.now().year % 100
            year = (1900 + yy) if (is_dob and yy > cur_yy) else (2000 + yy)
            return f"{dd:02d}/{mm:02d}/{year}"
        except:
            return yymmdd
    def clean_fill(s: str) -> str:
        # Strip trailing filler '<' then turn interior '<' into spaces.
        return re.sub(r'<+$', '', s).replace('<', ' ').strip()
    def parse_name(line3: str) -> str:
        # MRZ names are SURNAME<<GIVEN<NAMES; return "Given Surname" in Title Case.
        name_clean = re.sub(r'<+$', '', line3)
        if '<<' in name_clean:
            parts = name_clean.split('<<')
            surname = parts[0].replace('<', ' ').strip().title()
            given = parts[1].replace('<', ' ').strip().title() if len(parts) > 1 else ''
            return f"{given} {surname}".strip() if given else surname
        return name_clean.replace('<', ' ').strip().title()
    result = {}
    # TD1: 3 lines, 28-36 chars (ID cards) — name lives on line 3.
    td1 = [l for l in lines if 28 <= len(l) <= 36]
    if len(td1) >= 2:
        l1, l2 = td1[0], td1[1]
        l3 = td1[2] if len(td1) > 2 else ""
        result['doc_type'] = clean_fill(l1[0:2])
        result['country_code'] = clean_fill(l1[2:5])
        result['doc_number'] = clean_fill(l1[5:14])
        if len(l2) >= 19:
            result['dob'] = decode_date(l2[0:6], is_dob=True)
            sex = l2[7] if len(l2) > 7 else ''
            result['sex'] = 'Male' if sex == 'M' else ('Female' if sex == 'F' else 'Unknown')
            result['expiry'] = decode_date(l2[8:14], is_dob=False)
            result['nationality'] = clean_fill(l2[15:18])
        if l3:
            result['name'] = parse_name(l3)
        result['mrz_format'] = 'TD1'
        return result
    # TD3: 2 lines, 40-48 chars (Passports) — name lives on line 1.
    td3 = [l for l in lines if 40 <= len(l) <= 48]
    if len(td3) >= 2:
        l1, l2 = td3[0], td3[1]
        result['doc_type'] = clean_fill(l1[0:2])
        result['country_code'] = clean_fill(l1[2:5])
        result['name'] = parse_name(l1[5:44])
        if len(l2) >= 27:
            result['doc_number'] = clean_fill(l2[0:9])
            result['nationality'] = clean_fill(l2[10:13])
            result['dob'] = decode_date(l2[13:19], is_dob=True)
            sex = l2[20] if len(l2) > 20 else ''
            result['sex'] = 'Male' if sex == 'M' else ('Female' if sex == 'F' else 'Unknown')
            result['expiry'] = decode_date(l2[21:27], is_dob=False)
        result['mrz_format'] = 'TD3'
        return result
    # MRVA/MRVB: 2 lines, 36 chars (Visas) — same line-2 layout as TD3.
    mrv = [l for l in lines if 36 <= len(l) <= 38]
    if len(mrv) >= 2:
        l1, l2 = mrv[0], mrv[1]
        result['doc_type'] = clean_fill(l1[0:2])
        result['country_code'] = clean_fill(l1[2:5])
        result['name'] = parse_name(l1[5:36])
        if len(l2) >= 27:
            result['doc_number'] = clean_fill(l2[0:9])
            result['nationality'] = clean_fill(l2[10:13])
            result['dob'] = decode_date(l2[13:19], is_dob=True)
            sex = l2[20] if len(l2) > 20 else ''
            result['sex'] = 'Male' if sex == 'M' else ('Female' if sex == 'F' else 'Unknown')
            result['expiry'] = decode_date(l2[21:27], is_dob=False)
        result['mrz_format'] = 'MRVA/MRVB'
        return result
    return {}
def build_mrz_table(mrz_data: dict) -> str:
    """Render Python-parsed MRZ fields as a markdown table.

    Fields appear in a fixed display order; keys absent from `mrz_data`
    are simply skipped. Returns "No MRZ detected." for an empty dict.
    """
    if not mrz_data:
        return "No MRZ detected."
    display_order = [
        ('mrz_format', 'MRZ Format'),
        ('doc_type', 'Document Type'),
        ('country_code', 'Issuing Country Code'),
        ('doc_number', 'Document / Civil Number'),
        ('name', 'Full Name'),
        ('dob', 'Date of Birth'),
        ('expiry', 'Expiry Date'),
        ('nationality', 'User Nationality'),
        ('sex', 'Gender'),
    ]
    lines = [
        f"**Python Parsed MRZ — Authoritative ({mrz_data.get('mrz_format','?')} format):**\n",
        "| Field | Verified Value |",
        "|---|---|",
    ]
    lines.extend(
        f"| {label} | **{mrz_data[key]}** ✅ |"
        for key, label in display_order
        if key in mrz_data
    )
    return '\n'.join(lines) + '\n'
def build_unified_summary(front_result: str, back_result: str, mrz_data: dict) -> str:
    """P6: Merge front+back fields, MRZ as ground truth override.

    Parses the "Field | Value" markdown tables out of the two per-side model
    reports, merges them field by field, and lets Python-parsed MRZ values
    confirm or override conflicting OCR values. Returns a markdown summary.
    """
    summary = "## 🔄 Unified Deduplicated Record\n\n"
    if mrz_data:
        summary += f"> ✅ *MRZ Python-parsed ({mrz_data.get('mrz_format','?')}) — MRZ values are **ground truth**.*\n\n"
        summary += "### 🔐 MRZ Ground Truth\n\n"
        summary += build_mrz_table(mrz_data) + "\n\n---\n\n"
    else:
        summary += "> *No MRZ — fields merged from front+back. Conflicts flagged ⚠️.*\n\n"
    def get_rows(text):
        # Pull Field→Value rows out of the first ✅/🗂️-headed markdown table
        # in `text`; placeholder values ("—", "not on card", "n/a") are dropped.
        rows = {}
        m = re.search(r"## (?:✅|🗂️)[^\n]*\n\|[^\n]*\n\|[-| ]+\n(.*?)(?=\n---|\Z)", text, re.DOTALL)
        if m:
            for line in m.group(1).strip().split('\n'):
                parts = [p.strip() for p in line.split('|') if p.strip()]
                if len(parts) >= 2:
                    field = re.sub(r'[^\w\s/\']', '', parts[0]).strip()
                    val = parts[1].strip()
                    if val and val.lower() not in ('—', 'not on card', 'n/a', ''):
                        rows[field] = val
        return rows
    front_f = get_rows(front_result)
    back_f = get_rows(back_result)
    # Union of field names, front-side order first (dict.fromkeys dedupes
    # while preserving insertion order).
    all_f = list(dict.fromkeys(list(front_f.keys()) + list(back_f.keys())))
    # MRZ lookup: map field-name keywords → authoritative MRZ values.
    mrz_map = {}
    if mrz_data:
        kw_map = {
            'name': ['name'],
            'doc_number': ['civil', 'document', 'id', 'passport', 'licence'],
            'dob': ['birth', 'dob'],
            'expiry': ['expiry', 'expiration'],
            'sex': ['gender', 'sex'],
            'nationality':['nationality'],
        }
        for mk, keywords in kw_map.items():
            if mk in mrz_data:
                for kw in keywords:
                    mrz_map[kw] = mrz_data[mk]
    def get_mrz(field):
        # Substring keyword match, e.g. "Date of Birth" → 'birth' → MRZ dob.
        fl = field.lower()
        for kw, v in mrz_map.items():
            if kw in fl:
                return v
        return None
    summary += "### 📋 Field Comparison\n\n| Field | Value | Source |\n|---|---|---|\n"
    for field in all_f:
        fv = front_f.get(field, '')
        bv = back_f.get(field, '')
        mv = get_mrz(field)
        if fv and bv:
            if fv.lower() == bv.lower():
                # Both sides agree — note whether MRZ confirms or contradicts.
                note = f"✅ MRZ Confirmed" if mv and any(x in fv.lower() for x in mv.lower().split()) else ("⚠️ MRZ differs: **" + mv + "**" if mv else "")
                summary += f"| {field} | {fv} | Front+Back ✅ {note} |\n"
            else:
                # Sides disagree — MRZ (if present) wins outright.
                if mv:
                    summary += f"| {field} | ~~{fv}~~ / ~~{bv}~~ → **{mv}** | ✅ MRZ Override |\n"
                else:
                    summary += f"| {field} | F: **{fv}** / B: **{bv}** | ⚠️ Mismatch |\n"
        elif fv:
            note = f"✅ MRZ Confirmed" if mv and any(x in fv.lower() for x in mv.lower().split()) else (f"⚠️ MRZ: **{mv}**" if mv else "")
            summary += f"| {field} | {fv} | Front only {note} |\n"
        elif bv:
            note = f"✅ MRZ Confirmed" if mv and any(x in bv.lower() for x in mv.lower().split()) else (f"⚠️ MRZ: **{mv}**" if mv else "")
            summary += f"| {field} | {bv} | Back only {note} |\n"
    return summary + "\n"
| # ╔══════════════════════════════════════════╗ | |
| # ║ STEP PIPELINE FUNCTIONS ║ | |
| # ╚══════════════════════════════════════════╝ | |
def run_step1_extraction(model, processor, image, device, temperature, top_p, top_k, repetition_penalty, system_prompt=None):
    """Step 1: VLM → raw OCR (original script, no translation, no coordinates).

    Builds a chat prompt around the image, runs generation with a 3-tier
    processor-input fallback, and retries once with a simpler prompt if the
    model emitted pixel coordinates or skipped the ---TEXT_START--- marker.

    Args:
        model, processor: loaded VLM and its processor (project objects).
        image: PIL image of the document.
        device: torch device the input tensors are moved to.
        temperature, top_p, top_k, repetition_penalty: sampling parameters.
        system_prompt: optional custom system message; defaults to a generic one.
    Returns:
        The decoded model output string.
    """
    def _generate(prompt_text):
        try:
            from qwen_vl_utils import process_vision_info
            HAS_QWEN_VL_UTILS = True
        except ImportError:
            HAS_QWEN_VL_UTILS = False
        sys_msg = system_prompt or "You are a helpful assistant."
        messages = [
            {"role": "system", "content": sys_msg},
            {"role": "user", "content": [
                {"type": "image", "image": image},
                {"type": "text", "text": prompt_text},
            ]}
        ]
        # Step A: Build prompt string via the processor's chat template.
        try:
            prompt = processor.apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True
            )
            if not isinstance(prompt, str):
                raise TypeError("non-string returned")
        except Exception:
            # Manual Qwen3VL token format — universal fallback.
            # FIX: honor the caller-supplied system prompt here too; the
            # original hard-coded "You are a helpful assistant." and silently
            # dropped any custom system_prompt on this path.
            prompt = (
                f"<|im_start|>system\n{sys_msg}<|im_end|>\n"
                "<|im_start|>user\n"
                "<|vision_start|><|image_pad|><|vision_end|>"
                f"{prompt_text}<|im_end|>\n"
                "<|im_start|>assistant\n"
            )
        # Step B: Build inputs — 3 fallback tiers
        inputs = None
        # Tier 1: qwen_vl_utils + images/videos kwargs (Qwen3VL standard)
        if HAS_QWEN_VL_UTILS and inputs is None:
            try:
                image_inputs, video_inputs = process_vision_info(messages)
                proc_kwargs = {
                    "text": [prompt],
                    "padding": True,
                    "return_tensors": "pt"
                }
                if image_inputs is not None and len(image_inputs) > 0:
                    proc_kwargs["images"] = image_inputs
                if video_inputs is not None and len(video_inputs) > 0:
                    proc_kwargs["videos"] = video_inputs
                inputs = processor(**proc_kwargs).to(device)
                print(" ✅ Tier1: qwen_vl_utils")
            except Exception as e:
                print(f" Tier1 failed: {e}")
                inputs = None
        # Tier 2: Direct PIL image (Qwen2VL style)
        if inputs is None:
            try:
                inputs = processor(
                    text=[prompt],
                    images=[image],
                    padding=True,
                    return_tensors="pt",
                ).to(device)
                print(" ✅ Tier2: direct PIL")
            except Exception as e:
                print(f" Tier2 failed: {e}")
                inputs = None
        # Tier 3: Text-only (last resort — the model cannot see the image)
        if inputs is None:
            print(" ⚠️ Tier3: text-only fallback (no image — degraded)")
            inputs = processor(
                text=[prompt],
                padding=True,
                return_tensors="pt",
            ).to(device)
        with torch.no_grad():
            out = model.generate(
                **inputs,
                max_new_tokens=600,
                do_sample=True,
                temperature=temperature,
                top_p=top_p,
                top_k=top_k,
                repetition_penalty=repetition_penalty,
            )
        # Slice off the prompt tokens; decode only the newly generated part.
        gen = out[:, inputs['input_ids'].shape[1]:]
        decoded = processor.batch_decode(gen, skip_special_tokens=True)
        if isinstance(decoded, list):
            return decoded[0] if decoded else ""
        return str(decoded) if decoded else ""
    result = _generate(STEP1_EXTRACT_PROMPT)
    # Coordinate output (e.g. "(50,68)") or missing structure marker →
    # retry once with a simpler, more constrained prompt.
    if re.search(r'\(\d{1,4},\s*\d{1,4}\)', result) or '---TEXT_START---' not in result:
        print(" ⚠️ Retrying with fallback prompt...")
        fallback = (
            "Read all text from this document image and write it line by line in plain text.\n"
            "Do NOT output coordinates or bounding boxes.\n"
            "Start output with:\n"
            "PHOTO_PRESENT: yes or no\n"
            "SIGNATURE_PRESENT: yes or no\n"
            "MRZ_PRESENT: yes or no\n"
            "DETECTED_LANGUAGE: name the language(s)\n"
            "---TEXT_START---\n"
            "[all text here exactly as printed]\n"
            "---TEXT_END---"
        )
        result = _generate(fallback)
    return result
def parse_step1_output(raw_output: str) -> dict:
    """Parse the structured Step-1 header + text body into a metadata dict.

    Recognizes the PHOTO/SIGNATURE/MRZ presence flags, detected language,
    and the text between ---TEXT_START--- / ---TEXT_END--- sentinels. Any
    field that is absent falls back to its default ("N/A", "Unknown", or
    the whole raw output for the text body).
    """
    def _first(pattern: str, fallback: str = "N/A") -> str:
        # Case-insensitive single-group search with a default.
        match = re.search(pattern, raw_output, re.IGNORECASE)
        return match.group(1).strip() if match else fallback

    def _flag(label: str) -> str:
        # "yes" (any case) → checkmark; anything else (incl. missing) → cross.
        return "✅ Yes" if _first(rf'{label}:\s*(yes|no)').lower() == "yes" else "❌ No"

    body = re.search(r'---TEXT_START---\n?(.*?)---TEXT_END---', raw_output, re.DOTALL)
    return {
        "photo_present": _flag("PHOTO_PRESENT"),
        "photo_location": _first(r'PHOTO_LOCATION:\s*([^\n]+)'),
        "sig_present": _flag("SIGNATURE_PRESENT"),
        "sig_location": _first(r'SIGNATURE_LOCATION:\s*([^\n]+)'),
        "mrz_present": _flag("MRZ_PRESENT"),
        "detected_lang": _first(r'DETECTED_LANGUAGE:\s*([^\n]+)', "Unknown"),
        "original_text": body.group(1).strip() if body else raw_output,
    }
def run_step2_structure(model, processor, metadata: dict, device,
                        max_new_tokens, temperature, top_p, top_k, repetition_penalty):
    """Step 2: Python extracts English fields + MRZ; the LLM only classifies
    the document and fills non-English gaps.

    Args:
        model / processor: Loaded VLM and its processor.
        metadata: Output of parse_step1_output() for one card side.
        device: Torch device the prompt tensors are moved to.
        max_new_tokens, temperature, top_p, top_k, repetition_penalty:
            Sampling parameters forwarded to model.generate.

    Returns:
        (streamer, thread, mrz_data, python_sections) where:
            streamer: TextIteratorStreamer yielding the LLM classification.
            thread: The running generation Thread (caller must join()).
            mrz_data: Result of parse_mrz_lines(); falsy when no MRZ found.
            python_sections: Pre-rendered Markdown of Python-verified data.
    """
    raw_text = metadata.get('original_text', '')
    # P2: convert eastern (e.g. Arabic-Indic) numerals first.
    raw_text_normalized = convert_eastern_numerals(raw_text)
    # P5: separate Latin-script lines from original-script lines.
    english_block, original_block = separate_scripts(raw_text_normalized)
    # P4: direct English "label: value" field extraction.
    english_fields = extract_english_fields(raw_text_normalized)
    # P1: MRZ parse — authoritative over anything the LLM reports.
    mrz_data = parse_mrz_lines(raw_text_normalized)
    # P3: calendar detection (conversion note shown in the output only).
    calendar_sys = detect_calendar_system(raw_text)
    # Markdown table of Python-extracted English fields.
    if english_fields:
        tbl = "| Field (as printed on card) | Value (as printed) |\n|---|---|\n"
        for label, val in english_fields:
            tbl += f"| **{label}** | {val} |\n"
    else:
        tbl = "| — | No English label:value pairs detected |\n"
    # One-line MRZ summary for the LLM prompt.
    if mrz_data:
        mrz_summary = " | ".join([f"{k}: {v}" for k, v in mrz_data.items() if k != 'mrz_format'])
        mrz_summary = f"✅ {mrz_data.get('mrz_format','?')} parsed: {mrz_summary}"
    else:
        mrz_summary = "❌ No MRZ detected"
    # Non-Gregorian calendar note (display only; conversion happens elsewhere).
    cal_note = ""
    if calendar_sys == 'solar_hijri':
        cal_note = "\n> ⚠️ **Solar Hijri (Shamsi) calendar detected** — Python will convert dates to Gregorian."
    elif calendar_sys == 'lunar_hijri':
        cal_note = "\n> ⚠️ **Lunar Hijri calendar detected** — Python will convert dates to Gregorian."
    # Build the LLM prompt (classification + gap-filling only).
    prompt_text = STEP2_TEMPLATE.format(
        python_fields_table=tbl,
        mrz_summary=mrz_summary,
        english_block=english_block or "None",
        original_block=original_block or "None",
    )
    messages = [{"role": "user", "content": [{"type": "text", "text": prompt_text}]}]
    try:
        prompt = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    except Exception:
        # BUGFIX: was a bare `except:` — it would also swallow
        # KeyboardInterrupt/SystemExit. Fall back to the raw prompt text.
        prompt = prompt_text
    inputs = processor(
        text=[prompt],
        padding=True,
        return_tensors="pt",
    ).to(device)
    streamer = TextIteratorStreamer(processor, skip_prompt=True, skip_special_tokens=True)
    gen_kwargs = {
        **inputs, "streamer": streamer, "max_new_tokens": max_new_tokens,
        "do_sample": True, "temperature": temperature, "top_p": top_p,
        "top_k": top_k, "repetition_penalty": repetition_penalty,
    }
    # Generation runs on a background thread; the caller consumes the streamer.
    thread = Thread(target=model.generate, kwargs=gen_kwargs)
    thread.start()
    # ── Pre-compute pieces outside the f-string (backslashes are not allowed
    # inside f-string expressions before Python 3.12) ──
    newline = "\n"
    mrz_pattern = r'^[A-Z0-9<]{25,50}$'
    ws_pattern = r'\s+'
    mrz_raw_lines = []
    for _l in raw_text.split("\n"):
        _c = re.sub(ws_pattern, '', _l.strip())
        if re.match(mrz_pattern, _c):
            mrz_raw_lines.append(_c)
    mrz_raw_display = newline.join(mrz_raw_lines) if mrz_raw_lines else "NOT PRESENT"
    mrz_table_str = build_mrz_table(mrz_data) if mrz_data else "_No MRZ detected._"
    # Pre-built Python-verified Markdown shown before the LLM stream starts.
    python_sections = (
        "## 🖼️ Visual Elements\n\n"
        "| Element | Status | Location |\n"
        "|---------|--------|----------|\n"
        f"| 📷 Profile Photo | {metadata['photo_present']} | {metadata['photo_location']} |\n"
        f"| ✍️ Signature | {metadata['sig_present']} | {metadata['sig_location']} |\n"
        f"| 🔐 MRZ Zone | {metadata['mrz_present']} | Bottom strip |\n\n"
        "---\n\n"
        "## ✅ English Fields (Direct from Card — Not Modified)\n"
        f"{cal_note}\n\n"
        f"{tbl}\n\n"
        "---\n\n"
        "## 📜 Original Script\n\n"
        "```\n"
        f"{raw_text}\n"
        "```\n\n"
        "---\n\n"
        "## 🔐 MRZ Data\n\n"
        "```\n"
        f"{mrz_raw_display}\n"
        "```\n\n"
        f"{mrz_table_str}\n\n"
        "---\n\n"
    )
    return streamer, thread, mrz_data, python_sections
# ╔══════════════════════════════════════════╗
# ║          GRADIO HELPER CLASSES           ║
# ╚══════════════════════════════════════════╝
class RadioAnimated(gr.HTML):
    """Segmented radio control rendered as custom HTML with a sliding highlight.

    Behaves like gr.Radio: the selected choice is mirrored into props.value
    and a 'change' event fires when the user clicks a segment.
    """
    def __init__(self, choices, value=None, **kwargs):
        if not choices or len(choices) < 2:
            raise ValueError("RadioAnimated requires at least 2 choices.")
        if value is None:
            value = choices[0]
        # Unique radio-group name so multiple instances on a page don't collide.
        uid = uuid.uuid4().hex[:8]
        group_name = f"ra-{uid}"
        inputs_html = "\n".join(
            f'<input class="ra-input" type="radio" name="{group_name}" id="{group_name}-{i}" value="{c}">'
            f'<label class="ra-label" for="{group_name}-{i}">{c}</label>'
            for i, c in enumerate(choices)
        )
        html_template = f"""
<div class="ra-wrap" data-ra="{uid}">
<div class="ra-inner"><div class="ra-highlight"></div>{inputs_html}</div>
</div>"""
        # NOTE(review): `element`, `props` and `trigger` are assumed to be
        # injected by the gr.HTML js_on_load execution context — confirm.
        js_on_load = r"""
(() => {
  const highlight = element.querySelector('.ra-highlight');
  const inputs = Array.from(element.querySelectorAll('.ra-input'));
  if (!inputs.length) return;
  const choices = inputs.map(i => i.value);
  function setHighlight(idx) {
    highlight.style.width = `calc(${100/choices.length}% - 6px)`;
    highlight.style.transform = `translateX(${idx * 100}%)`;
  }
  // BUGFIX: the boolean parameter was previously named `trigger`, shadowing
  // the outer trigger() callback — `trigger('change', ...)` then attempted
  // to call a boolean and threw, so change events never fired. Renamed.
  function setVal(val, fire=false) {
    const idx = Math.max(0, choices.indexOf(val));
    inputs.forEach((inp, i) => { inp.checked = (i === idx); });
    setHighlight(idx);
    props.value = choices[idx];
    if (fire) trigger('change', props.value);
  }
  setVal(props.value ?? choices[0], false);
  inputs.forEach(inp => inp.addEventListener('change', () => setVal(inp.value, true)));
})();"""
        super().__init__(value=value, html_template=html_template, js_on_load=js_on_load, **kwargs)
def apply_gpu_duration(val: str) -> int:
    """Convert a GPU-duration radio selection (e.g. "180") into integer seconds."""
    seconds = int(val)
    return seconds
def calc_timeout_duration(model_name, text, image_front, image_back,
                          max_new_tokens, temperature, top_p, top_k,
                          repetition_penalty, gpu_timeout):
    """Compute the GPU timeout (seconds) for one pipeline run.

    Doubles the user-selected timeout when BOTH card sides are supplied
    (two full pipeline passes); falls back to 180 s when gpu_timeout is
    not a valid integer. Only image_front, image_back and gpu_timeout are
    consulted — the remaining parameters exist so this function can share
    the submit event's input list.

    Returns:
        int: Timeout in seconds.
    """
    try:
        base = int(gpu_timeout)
    except (TypeError, ValueError):
        # BUGFIX: was a bare `except:`; narrowed so unrelated errors surface.
        return 180
    return base * 2 if (image_front is not None and image_back is not None) else base
# ╔══════════════════════════════════════════╗
# ║          MAIN PIPELINE FUNCTION          ║
# ╚══════════════════════════════════════════╝
def generate_dual_card_ocr(model_name: str, text: str,
                           image_front: Image.Image, image_back: Image.Image,
                           max_new_tokens: int, temperature: float, top_p: float,
                           top_k: int, repetition_penalty: float, gpu_timeout: int):
    """Streaming generator driving the full dual-card OCR pipeline.

    For each uploaded card side it runs Step 1 (raw OCR via
    run_step1_extraction) then Step 2 (Python extraction + LLM classification
    via run_step2_structure), yielding progressively growing Markdown.
    Every yield emits the same string twice — once for the raw textbox,
    once for the formatted preview.

    Args:
        model_name: UI label selecting one of the four loaded models.
        text: Custom user query (not consumed by the pipeline body here).
        image_front / image_back: PIL images; at least one must be provided.
        max_new_tokens, temperature, top_p, top_k, repetition_penalty:
            Sampling parameters forwarded to the generation steps.
        gpu_timeout: Requested GPU duration in seconds (not used in this
            body; presumably consumed by the scheduling layer — confirm).

    Yields:
        (str, str): The accumulated Markdown output so far, duplicated.
    """
    # Model selection — maps the UI label to (availability flag, processor, model).
    model_map = {
        "Chhagan-ID-OCR-v1 ⭐": (CHHAGAN_V1_AVAILABLE, processor_c1, model_c1),
        "Chhagan-DocVL-Qwen3 🔥": (CHHAGAN_QWEN3_AVAILABLE, processor_c2, model_c2),
        "CSM-DocExtract-Q4KM 🏆": (CSM_Q4KM_AVAILABLE, processor_q4km, model_q4km),
        "CSM-DocExtract-4BNB 💎": (CSM_4BNB_AVAILABLE, processor_4bnb, model_4bnb),
    }
    if model_name not in model_map:
        yield "Invalid model.", "Invalid model."; return
    available, processor, model = model_map[model_name]
    if not available:
        yield f"{model_name} not available.", f"{model_name} not available."; return
    if image_front is None and image_back is None:
        yield "Please upload at least one card image.", "Please upload at least one card image."; return
    full_output = ""
    front_result = ""
    back_result = ""
    all_mrz_data = {}       # first successful MRZ parse wins (front has priority)
    front_meta_saved = {}
    back_meta_saved = {}
    # ───── FRONT CARD ─────
    if image_front is not None:
        full_output += "# 🎴 FRONT CARD\n\n"
        full_output += "⏳ **Step 1/2 — Raw OCR (original script, no translation)...**\n\n"
        yield full_output, full_output
        # Pass the dedicated system prompt only for model 4 (4BNB).
        sys_p = system_prompt_4bnb if model_name == "CSM-DocExtract-4BNB 💎" else None
        step1_raw = run_step1_extraction(model, processor, image_front, device,
                                         temperature, top_p, top_k, repetition_penalty,
                                         system_prompt=sys_p)
        front_meta = parse_step1_output(step1_raw)
        front_meta_saved = front_meta
        full_output += f"✅ **Step 1 Done** — 🌐 Language: **{front_meta['detected_lang']}**\n\n"
        full_output += "⏳ **Step 2/2 — Python extract + LLM classify...**\n\n"
        yield full_output, full_output
        streamer_f, thread_f, mrz_f, python_sections_f = run_step2_structure(
            model, processor, front_meta, device,
            max_new_tokens, temperature, top_p, top_k, repetition_penalty)
        if mrz_f:
            all_mrz_data = mrz_f
        # Show the Python-verified sections immediately, then stream the LLM.
        buffer_f = python_sections_f
        yield full_output + buffer_f, full_output + buffer_f
        for new_text in streamer_f:
            # Strip chat-template end markers as tokens arrive.
            buffer_f += new_text.replace("<|im_end|>", "").replace("<|endoftext|>", "")
            time.sleep(0.01)   # small delay keeps the UI stream smooth
            yield full_output + buffer_f, full_output + buffer_f
        full_output += buffer_f + "\n\n"
        front_result = buffer_f
        thread_f.join()
    # ───── BACK CARD ─────
    if image_back is not None:
        full_output += "\n\n---\n\n# 🎴 BACK CARD\n\n"
        full_output += "⏳ **Step 1/2 — Raw OCR (original script, no translation)...**\n\n"
        yield full_output, full_output
        # NOTE(review): unlike the front side, no system_prompt is passed here
        # even for the 4BNB model — confirm this asymmetry is intentional.
        step1_raw_back = run_step1_extraction(model, processor, image_back, device,
                                              temperature, top_p, top_k, repetition_penalty)
        back_meta = parse_step1_output(step1_raw_back)
        back_meta_saved = back_meta
        full_output += f"✅ **Step 1 Done** — 🌐 Language: **{back_meta['detected_lang']}**\n\n"
        full_output += "⏳ **Step 2/2 — Python extract + LLM classify...**\n\n"
        yield full_output, full_output
        streamer_b, thread_b, mrz_b, python_sections_b = run_step2_structure(
            model, processor, back_meta, device,
            max_new_tokens, temperature, top_p, top_k, repetition_penalty)
        # Keep the front card's MRZ if one was already found.
        if mrz_b and not all_mrz_data:
            all_mrz_data = mrz_b
        buffer_b = python_sections_b
        yield full_output + buffer_b, full_output + buffer_b
        for new_text in streamer_b:
            buffer_b += new_text.replace("<|im_end|>", "").replace("<|endoftext|>", "")
            time.sleep(0.01)
            yield full_output + buffer_b, full_output + buffer_b
        full_output += buffer_b
        back_result = buffer_b
        thread_b.join()
    # ───── UNIFIED SUMMARY ───── (only when both sides were processed)
    if image_front is not None and image_back is not None:
        full_output += "\n\n---\n\n"
        full_output += build_unified_summary(front_result, back_result, all_mrz_data)
    mrz_note = f"MRZ: ✅ {all_mrz_data.get('mrz_format','?')} verified" if all_mrz_data else "MRZ: ❌ Not detected"
    full_output += f"\n\n---\n\n**✨ Complete** | Model: `{model_name}` | {mrz_note} | Pipeline: OCR → Python Extract → LLM Classify\n"
    yield full_output, full_output
# ╔══════════════════════════════════════════╗
# ║              MODEL CHOICES               ║
# ╚══════════════════════════════════════════╝
# Build the selectable model list from whichever checkpoints loaded successfully.
_candidate_models = (
    (CHHAGAN_V1_AVAILABLE, "Chhagan-ID-OCR-v1 ⭐"),
    (CHHAGAN_QWEN3_AVAILABLE, "Chhagan-DocVL-Qwen3 🔥"),
    (CSM_Q4KM_AVAILABLE, "CSM-DocExtract-Q4KM 🏆"),
    (CSM_4BNB_AVAILABLE, "CSM-DocExtract-4BNB 💎"),
)
model_choices = [label for loaded, label in _candidate_models if loaded]
if not model_choices:
    model_choices = ["No models available"]
# (query, front image path, back image path) triples for gr.Examples.
dual_card_examples = [
    ["Extract complete information", "examples/5.jpg", None],
    ["Multilingual OCR with MRZ", "examples/4.jpg", None],
    ["Extract profile photo and signature", "examples/2.jpg", None],
]
# ╔══════════════════════════════════════════╗
# ║                GRADIO UI                 ║
# ╚══════════════════════════════════════════╝
demo = gr.Blocks(css=css, theme=steel_blue_theme)
with demo:
    gr.Markdown("# 🌍 **CSM Dual-Card ID OCR System**", elem_id="main-title")
    gr.Markdown("### *Universal Document Extraction — MRZ + Multilingual + Auto Calendar*")
    # Status banner: which of the four checkpoints actually loaded at import time.
    loaded_models = []
    if CHHAGAN_V1_AVAILABLE: loaded_models.append("ID-OCR-v1 ⭐")
    if CHHAGAN_QWEN3_AVAILABLE: loaded_models.append("DocVL-Qwen3 🔥")
    if CSM_Q4KM_AVAILABLE: loaded_models.append("Q4KM 🏆")
    if CSM_4BNB_AVAILABLE: loaded_models.append("4BNB 💎")
    model_info = f"**Loaded ({len(loaded_models)}/4):** {', '.join(loaded_models)}" if loaded_models else "⚠️ No models"
    gr.Markdown(f"**Status:** {model_info}")
    gr.Markdown("**Pipeline:** ✅ Step1: Raw OCR → ✅ Python: MRZ+English Extract → ✅ LLM: Classify+Gaps → ✅ Deduplicate")
    with gr.Row():
        # Left column: query, image uploads, examples, sampling settings.
        with gr.Column(scale=2):
            image_query = gr.Textbox(
                label="💬 Custom Query (Optional)",
                placeholder="Leave empty for automatic full extraction...",
                value=""
            )
            gr.Markdown("### 📤 Upload ID Cards")
            with gr.Row():
                image_front = gr.Image(type="pil", label="🎴 Front Card", height=250)
                image_back = gr.Image(type="pil", label="🎴 Back Card (Optional)", height=250)
            image_submit = gr.Button("🚀 Extract + Translate + Structure", variant="primary", size="lg")
            gr.Examples(
                examples=dual_card_examples,
                inputs=[image_query, image_front, image_back],
                label="📸 Sample ID Cards"
            )
            with gr.Accordion("⚙️ Advanced Settings", open=False):
                max_new_tokens = gr.Slider(label="Max new tokens", minimum=1, maximum=MAX_MAX_NEW_TOKENS, step=1, value=DEFAULT_MAX_NEW_TOKENS)
                temperature = gr.Slider(label="Temperature", minimum=0.1, maximum=4.0, step=0.1, value=0.6)
                top_p = gr.Slider(label="Top-p", minimum=0.05, maximum=1.0, step=0.05, value=0.9)
                top_k = gr.Slider(label="Top-k", minimum=1, maximum=1000, step=1, value=50)
                repetition_penalty= gr.Slider(label="Repetition penalty", minimum=1.0, maximum=2.0, step=0.05, value=1.1)
        # Right column: streaming output, model picker, GPU-duration control.
        with gr.Column(scale=3):
            gr.Markdown("## 📄 Extraction Results", elem_id="output-title")
            output = gr.Textbox(label="Raw Output (Streaming)", interactive=True, lines=15)
            with gr.Accordion("📝 Structured Preview", open=True):
                markdown_output = gr.Markdown(label="Formatted Result")
            model_choice = gr.Radio(
                choices=model_choices,
                label="🤖 Select Model",
                value=model_choices[0] if model_choices else None,
                info="🏆💎 = 8B Quantized (best) | 🔥 = Qwen3 Fine-tuned | ⭐ = LoRA"
            )
            with gr.Row(elem_id="gpu-duration-container"):
                with gr.Column():
                    gr.Markdown("**⏱️ GPU Duration (seconds)**")
                    radioanimated_gpu_duration = RadioAnimated(
                        choices=["60", "90", "120", "180", "240"],
                        value="180",
                        elem_id="radioanimated_gpu_duration"
                    )
                    # Hidden numeric mirror of the animated radio's string value.
                    gpu_duration_state = gr.Number(value=180, visible=False)
            gr.Markdown("""
**✨ What This Extracts:**
- 🔐 MRZ: TD1/TD3/MRVA/MRVB — Python parsed, 100% accurate
- ✅ English fields: Direct from card, not modified
- 📜 Original script: Arabic/Farsi/Hindi/Chinese as-is
- 🗓️ Calendar: Shamsi/Hijri → Gregorian conversion
- 🔢 Eastern numerals: ۱۲۳ → 123 automatic
- 🔄 Front+Back: Deduplicated, MRZ-verified
""")
    # Keep the hidden numeric state in sync with the animated radio.
    radioanimated_gpu_duration.change(
        fn=apply_gpu_duration,
        inputs=radioanimated_gpu_duration,
        outputs=[gpu_duration_state],
        api_visibility="private"
    )
    # Main pipeline trigger: the generator streams into both outputs.
    image_submit.click(
        fn=generate_dual_card_ocr,
        inputs=[model_choice, image_query, image_front, image_back,
                max_new_tokens, temperature, top_p, top_k,
                repetition_penalty, gpu_duration_state],
        outputs=[output, markdown_output]
    )
    gr.Markdown("""
---
### 🎯 Feature Matrix
| Feature | Method | Accuracy |
|---------|--------|---------|
| MRZ Parse (TD1/TD3/MRVA) | Python | 100% |
| English Labels Extract | Python Regex | 100% |
| Eastern Numeral Convert | Python char map | 100% |
| Shamsi/Hijri Calendar | Python library | 100% |
| Raw OCR (32+ scripts) | 8B VLM | 90%+ |
| Doc Type Classification | 8B VLM | 95%+ |
| Non-English Translation | 8B VLM | 90%+ |
| Front+Back Deduplication | Python | 100% |
### 📋 Supported Documents
🇮🇳 Aadhaar, PAN, Passport | 🇦🇪 Emirates ID | 🇸🇦 Iqama | 🇴🇲 Oman Resident Card
🌍 International Passports (MRZ) | 🚗 Driving Licences | 🇮🇷 Iranian National ID (Shamsi)
### 🔒 Privacy
All processing on-device | No data stored | GDPR compliant
""")
if __name__ == "__main__":
    print("\n🚀 STARTING...")
    try:
        # Queue enables streaming generators; bind all interfaces for Spaces.
        app = demo.queue(max_size=50)
        app.launch(
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True,
            share=False,
        )
    except Exception as e:
        import traceback
        print(f"❌ {e}")
        traceback.print_exc()