Multi_ML_OCR / app.py
Chhagan005's picture
Update app.py
6fb62c2 verified
"""
╔══════════════════════════════════════════════════════════════════╗
║ CSM DUAL-CARD ID OCR SYSTEM — ARCHITECTURE NOTE ║
╠══════════════════════════════════════════════════════════════════╣
║ MODEL TASKS (8B VLM): ║
║ Step 1 → Raw OCR: All text, original script, no translate ║
║ Step 2 → Doc classify + non-English gap fill only ║
║ PYTHON TASKS (Authoritative): ║
║ MRZ parse+verify | Numeral convert | Calendar convert ║
║ English label extract | Script separate | Cross verify ║
╚══════════════════════════════════════════════════════════════════╝
"""
import os
import uuid
import time
import re
import datetime
from threading import Thread
from typing import Iterable, Dict, Any
import gradio as gr
import spaces
import torch
from PIL import Image
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"
os.environ["HF_HOME"] = "/tmp/hf_home"
from transformers import (
AutoProcessor,
AutoModelForImageTextToText, # Universal VLM loader — handles both Qwen2VL and Qwen3VL
TextIteratorStreamer,
BitsAndBytesConfig,
)
# Specific class imports — graceful fallback
try:
from transformers import Qwen3VLForConditionalGeneration
QWEN3_AVAILABLE = True
print("✅ Qwen3VLForConditionalGeneration available")
except ImportError:
QWEN3_AVAILABLE = False
print("⚠️ Qwen3VL direct import not available — using AutoModel fallback")
try:
from transformers import Qwen2VLForConditionalGeneration
QWEN2_AVAILABLE = True
except ImportError:
QWEN2_AVAILABLE = False
try:
from transformers import Qwen2_5_VLForConditionalGeneration
QWEN25_AVAILABLE = True
except ImportError:
QWEN25_AVAILABLE = False
try:
from peft import PeftModel, PeftConfig
PEFT_AVAILABLE = True
print("✅ PEFT available")
except ImportError:
PEFT_AVAILABLE = False
print("⚠️ PEFT not available")
from gradio.themes import Soft
from gradio.themes.utils import colors, fonts, sizes
# ===== THEME =====
# Attach a custom "steel_blue" palette to gradio's colors module so the theme
# class below can use it as its default secondary hue.
colors.steel_blue = colors.Color(
    name="steel_blue",
    c50="#EBF3F8", c100="#D3E5F0", c200="#A8CCE1", c300="#7DB3D2",
    c400="#529AC3", c500="#4682B4", c600="#3E72A0", c700="#36638C",
    c800="#2E5378", c900="#264364", c950="#1E3450",
)
class SteelBlueTheme(Soft):
    """Gradio ``Soft`` theme variant using gray as primary and steel blue as accent."""

    def __init__(self, *, primary_hue=colors.gray, secondary_hue=colors.steel_blue,
                 neutral_hue=colors.slate, text_size=sizes.text_lg,
                 font=(fonts.GoogleFont("Outfit"), "Arial", "sans-serif"),
                 font_mono=(fonts.GoogleFont("IBM Plex Mono"), "ui-monospace", "monospace")):
        # Base palette / typography handed straight to the parent theme.
        base_options = {
            "primary_hue": primary_hue,
            "secondary_hue": secondary_hue,
            "neutral_hue": neutral_hue,
            "text_size": text_size,
            "font": font,
            "font_mono": font_mono,
        }
        super().__init__(**base_options)

        # Fine-grained overrides applied on top of the Soft defaults.
        overrides = {
            "background_fill_primary": "*primary_50",
            "background_fill_primary_dark": "*primary_900",
            "body_background_fill": "linear-gradient(135deg, *primary_200, *primary_100)",
            "body_background_fill_dark": "linear-gradient(135deg, *primary_900, *primary_800)",
            "button_primary_text_color": "white",
            "button_primary_background_fill": "linear-gradient(90deg, *secondary_500, *secondary_600)",
            "button_primary_background_fill_hover": "linear-gradient(90deg, *secondary_600, *secondary_700)",
            "button_secondary_text_color": "black",
            "button_secondary_background_fill": "linear-gradient(90deg, *primary_300, *primary_300)",
            "button_secondary_background_fill_hover": "linear-gradient(90deg, *primary_400, *primary_400)",
            "slider_color": "*secondary_500",
            "block_title_text_weight": "600",
            "block_border_width": "3px",
            "block_shadow": "*shadow_drop_lg",
            "button_primary_shadow": "*shadow_drop_lg",
            "button_large_padding": "11px",
            "color_accent_soft": "*primary_100",
            "block_label_background_fill": "*primary_200",
        }
        super().set(**overrides)
# Single shared theme instance.
steel_blue_theme = SteelBlueTheme()
# Custom stylesheet: title sizes, the pill-style ".ra-*" radio group (light and
# dark variants) and the "#gpu-duration-container" panel.
css = """
#main-title h1 { font-size: 2.3em !important; }
#output-title h2 { font-size: 2.2em !important; }
.ra-wrap{ width: fit-content; }
.ra-inner{ position: relative; display: inline-flex; align-items: center; gap: 0; padding: 6px;
background: var(--neutral-200); border-radius: 9999px; overflow: hidden; }
.ra-input{ display: none; }
.ra-label{ position: relative; z-index: 2; padding: 8px 16px; font-family: inherit; font-size: 14px;
font-weight: 600; color: var(--neutral-500); cursor: pointer; transition: color 0.2s; white-space: nowrap; }
.ra-highlight{ position: absolute; z-index: 1; top: 6px; left: 6px; height: calc(100% - 12px);
border-radius: 9999px; background: white; box-shadow: 0 2px 4px rgba(0,0,0,0.1);
transition: transform 0.2s, width 0.2s; }
.ra-input:checked + .ra-label{ color: black; }
.dark .ra-inner { background: var(--neutral-800); }
.dark .ra-label { color: var(--neutral-400); }
.dark .ra-highlight { background: var(--neutral-600); }
.dark .ra-input:checked + .ra-label { color: white; }
#gpu-duration-container { padding: 10px; border-radius: 8px;
background: var(--background-fill-secondary); border: 1px solid var(--border-color-primary); margin-top: 10px; }
"""
# Generation-length limits; the input-length cap can be overridden through the
# MAX_INPUT_TOKEN_LENGTH environment variable.
MAX_MAX_NEW_TOKENS, DEFAULT_MAX_NEW_TOKENS = 4096, 1024
MAX_INPUT_TOKEN_LENGTH = int(os.environ.get("MAX_INPUT_TOKEN_LENGTH", "4096"))

# Use the first CUDA device when one is visible, otherwise fall back to CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print("CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("Device:", torch.cuda.get_device_name(0))
print("Using:", device)
# ╔══════════════════════════════════════════╗
# ║ UNIVERSAL PROMPTS ║
# ╚══════════════════════════════════════════╝
# Step 1 prompt: asks the VLM for a fixed metadata header followed by a raw,
# untranslated transcription wrapped in ---TEXT_START--- / ---TEXT_END---.
# parse_step1_output() relies on these exact field names and markers.
STEP1_EXTRACT_PROMPT = """You are a universal OCR engine. Transcribe ALL visible text from this document image.
OUTPUT FORMAT — fill exactly as shown:
PHOTO_PRESENT: yes/no
PHOTO_LOCATION: [describe position: top-left / top-right / center-left / not found]
SIGNATURE_PRESENT: yes/no
SIGNATURE_LOCATION: [describe position: bottom-left / bottom-right / not found]
MRZ_PRESENT: yes/no
DETECTED_LANGUAGE: [list all languages visible e.g. Arabic+English, Farsi+English, Hindi+English, Chinese, English]
---TEXT_START---
[Every word, number, symbol, label and value visible — line by line]
[Original script preserved: Arabic, Farsi, Hindi, Chinese, Cyrillic etc. — DO NOT translate here]
[Copy label AND its value together: e.g. "DATE OF BIRTH 12/05/2003"]
[MRZ lines: copy character-perfect including ALL < symbols]
[Include corner text, watermarks, small print]
---TEXT_END---
ABSOLUTE RULES:
- NEVER output pixel coordinates like (50,68) or bounding boxes — plain text ONLY
- DO NOT translate in this step — original script as-is
- DO NOT skip or summarize any field
- Copy every character exactly including < symbols in MRZ"""
# Step 2 prompt template, filled with str.format(). Placeholders:
#   {python_fields_table} — markdown table of English fields found by regex
#   {mrz_summary}         — one-line summary of the Python MRZ parse
#   {english_block}       — Latin-script lines of the OCR text
#   {original_block}      — non-Latin-script lines of the OCR text
# Any literal brace added to this text must be doubled, or format() will fail.
STEP2_TEMPLATE = """You are a universal KYC document analyst.
The Python pipeline has already extracted English fields and parsed MRZ.
Your job is ONLY: classify document + fill gaps from non-English text.
━━━ ALREADY EXTRACTED BY PYTHON (DO NOT RE-EXTRACT) ━━━
English Fields Found Directly on Card:
{python_fields_table}
MRZ Python Parse Result:
{mrz_summary}
━━━ YOUR INPUT DATA ━━━
English text block from card:
{english_block}
Non-English original script block:
{original_block}
━━━ YOUR TASKS — ONLY THESE 3 ━━━
TASK 1: Identify document type and issuing info
- Read English block and original block
- Keywords: PASSPORT/RESIDENT CARD/NATIONAL ID/DRIVING LICENCE/بطاقة/جواز/رخصة/आधार/PAN
- Top of card = issuing country/institution (NOT person name)
TASK 2: Classify non-English labels → check if already in English fields above
- If نام (Farsi: Name) value already in Python English fields → SKIP
- If شماره ملی (National Number) already in Python fields → SKIP
- Only add fields GENUINELY missing from Python extraction
TASK 3: Transliterate non-English values NOT found in English block
- Example: محمد → Mohammad | چراغی → Cheraghi
- Dates in Shamsi/Hijri: write BOTH original AND note calendar type
(DO NOT convert — Python handles conversion)
RULES:
- NEVER copy template placeholders like [fill here] or [value]
- NEVER re-state what Python already found
- NEVER guess values not visible in card
- If all fields already covered → write "✅ All fields covered by Python extraction"
━━━ OUTPUT FORMAT ━━━
---
## 📋 Document Classification
| | |
|---|---|
| **Document Type** | |
| **Issuing Country** | |
| **Issuing Authority** | |
---
## ➕ Additional Fields (non-English only — genuinely new)
| Label (Original) | Label (English) | Value (Original) | Value (Transliterated) |
|---|---|---|---|
| [only if not in Python fields above] | | | |
---
## 🗓️ Calendar Note (if non-Gregorian dates found)
| Original Date | Calendar System | Note |
|---|---|---|
| [date as on card] | [Solar Hijri / Lunar Hijri / Buddhist] | Python will convert |
---"""
def load_vl_model(model_id: str, quantization_config=None, pre_quantized: bool = False):
    """Load a vision-language model, trying progressively more generic loaders.

    Order of attempts: Qwen3VL class (if importable) → AutoModelForImageTextToText
    → Qwen2VL class (if importable). The first loader that succeeds wins and the
    model is returned in eval mode.

    Args:
        model_id: Hub repo id or local path passed to ``from_pretrained``.
        quantization_config: Optional quantization config; only forwarded when
            ``pre_quantized`` is False.
        pre_quantized: True when the checkpoint is already quantized, in which
            case no ``quantization_config`` is passed.

    Raises:
        RuntimeError: when the first loaders fail and Qwen2VL is not importable.
        Exception: whatever the Qwen2VL loader raises, since it is the last resort.
    """
    load_kwargs = {"torch_dtype": "auto", "device_map": "auto", "trust_remote_code": True}
    if not pre_quantized and quantization_config is not None:
        load_kwargs["quantization_config"] = quantization_config

    # (loader class, message printed when that loader fails) in priority order.
    candidates = []
    if QWEN3_AVAILABLE:
        candidates.append((Qwen3VLForConditionalGeneration, " Qwen3VL failed: {}, trying AutoModel..."))
    candidates.append((AutoModelForImageTextToText, " AutoModel failed: {}, trying Qwen2VL..."))

    for loader_cls, failure_msg in candidates:
        try:
            loaded = loader_cls.from_pretrained(model_id, **load_kwargs)
            return loaded.eval()
        except Exception as err:
            print(failure_msg.format(err))

    # Last resort: errors from this loader are allowed to propagate.
    if not QWEN2_AVAILABLE:
        raise RuntimeError(f"No compatible loader found for {model_id}")
    return Qwen2VLForConditionalGeneration.from_pretrained(model_id, **load_kwargs).eval()
# ╔══════════════════════════════════════════╗
# ║ MODEL LOADING ║
# ╚══════════════════════════════════════════╝
# Startup banner for the model-loading phase.
print("\n" + "="*70)
print("🚀 LOADING 4 MODELS")
print("="*70)
# 4-bit BitsAndBytes config (shared for quantized models):
# NF4 quantization, float16 compute dtype, double quantization enabled.
bnb_4bit_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_use_double_quant=True,
)
# ── Model 1: Chhagan_ML-VL-OCR-v1 (LoRA on Qwen2VL base) ──
# Loading is best-effort: on any failure the error is printed and
# CHHAGAN_V1_AVAILABLE stays False so the app can still start.
print("\n1️⃣ Chhagan_ML-VL-OCR-v1 (LoRA Refined)...")
MODEL_ID_C1 = "Chhagan005/Chhagan_ML-VL-OCR-v1"
CHHAGAN_V1_AVAILABLE = False
processor_c1 = model_c1 = None
if PEFT_AVAILABLE:
    try:
        # The adapter config names the base checkpoint the LoRA was trained on.
        config = PeftConfig.from_pretrained(MODEL_ID_C1)
        base_id = config.base_model_name_or_path
        # The processor comes from the base model, not the adapter repo.
        processor_c1 = AutoProcessor.from_pretrained(base_id, trust_remote_code=True)
        base_c1 = load_vl_model(base_id)
        # Wrap the base model with the LoRA adapter weights.
        model_c1 = PeftModel.from_pretrained(base_c1, MODEL_ID_C1).to(device).eval()
        print(" ✅ Loaded!")
        CHHAGAN_V1_AVAILABLE = True
    except Exception as e:
        print(f" ❌ Failed: {e}")
else:
    print(" ⚠️ PEFT not available")
# ── Model 2: Chhagan-DocVL-Qwen3 (LoRA on Qwen3VL base) ──
# Same best-effort pattern as model 1: failures are printed and
# CHHAGAN_QWEN3_AVAILABLE stays False.
print("\n2️⃣ Chhagan-DocVL-Qwen3 (Qwen3-VL Refined)...")
MODEL_ID_C2 = "Chhagan005/Chhagan-DocVL-Qwen3"
CHHAGAN_QWEN3_AVAILABLE = False
processor_c2 = model_c2 = None
if PEFT_AVAILABLE:
    try:
        # The adapter config names the base checkpoint the LoRA was trained on.
        config = PeftConfig.from_pretrained(MODEL_ID_C2)
        base_id = config.base_model_name_or_path
        # The processor comes from the base model, not the adapter repo.
        processor_c2 = AutoProcessor.from_pretrained(base_id, trust_remote_code=True)
        base_c2 = load_vl_model(base_id)
        # Wrap the base model with the LoRA adapter weights.
        model_c2 = PeftModel.from_pretrained(base_c2, MODEL_ID_C2).to(device).eval()
        print(" ✅ Loaded!")
        CHHAGAN_QWEN3_AVAILABLE = True
    except Exception as e:
        print(f" ❌ Failed: {e}")
else:
    print(" ⚠️ PEFT not available")
# ── Model 3: CSM-DocExtract-VL-Q4KM (Full Qwen3VL, pre-quantized) ──
# The model is only marked available when BOTH the processor and the model
# loaded; a model without its processor cannot be used for inference.
print("\n3️⃣ CSM-DocExtract-VL-Q4KM (Full Qwen3VL, pre-quantized BNB)...")
MODEL_ID_Q4KM = "Chhagan005/CSM-DocExtract-VL-Q4KM"
CSM_Q4KM_AVAILABLE = False
processor_q4km = model_q4km = None
try:
    processor_q4km = AutoProcessor.from_pretrained(
        MODEL_ID_Q4KM, trust_remote_code=True
    )
    try:
        # Pre-quantized safetensors → torch_dtype=auto, NO extra quantization_config
        model_q4km = Qwen3VLForConditionalGeneration.from_pretrained(
            MODEL_ID_Q4KM,
            torch_dtype="auto",
            device_map="auto",
            trust_remote_code=True,
        ).eval()
        print(" ✅ Loaded! (Qwen3VL pre-quantized BNB ~6.4GB)")
    except Exception as e:
        # Also reached with a NameError when the Qwen3VL class could not be
        # imported; report the reason instead of swallowing it.
        print(f" Qwen3VL loader failed: {e} — trying AutoModel fallback...")
        model_q4km = AutoModelForImageTextToText.from_pretrained(
            MODEL_ID_Q4KM,
            torch_dtype="auto",
            device_map="auto",
            trust_remote_code=True,
        ).eval()
        print(" ✅ Loaded! (AutoModel fallback)")
    CSM_Q4KM_AVAILABLE = True
except Exception as e2:
    # Leave no half-initialised state behind.
    processor_q4km = model_q4km = None
    print(f" ❌ Failed: {e2}")
# ── Model 4: CSM-DocExtract-VL (Full Qwen3VL, BNB INT4 trained) ──
# The model is only marked available when BOTH the processor and the model
# loaded; a model without its processor cannot be used for inference.
print("\n4️⃣ CSM-DocExtract-VL 4BNB (Full Qwen3VL, BNB INT4 trained)...")
MODEL_ID_4BNB = "Chhagan005/CSM-DocExtract-VL"
CSM_4BNB_AVAILABLE = False
processor_4bnb = model_4bnb = None
system_prompt_4bnb = "You are a helpful assistant."  # default

# Read custom system_prompt.txt — this model was trained with it.
# Optional: any failure keeps the default prompt.
try:
    from huggingface_hub import hf_hub_download
    sp_path = hf_hub_download(repo_id=MODEL_ID_4BNB, filename="system_prompt.txt")
    with open(sp_path, "r", encoding="utf-8") as f:
        system_prompt_4bnb = f.read().strip()
    print(f" 📋 system_prompt.txt loaded: {system_prompt_4bnb[:80]}...")
except Exception as sp_err:
    print(f" ⚠️ system_prompt.txt not loaded: {sp_err} — using default")

# Shared by the primary loader and the fallback so both behave the same.
# BNB INT4 trained safetensors → torch_dtype=auto, NO extra quantization_config.
# NOTE(review): ignore_mismatched_sizes relaxes checkpoint/model weight-shape
# checks; it is not what makes transformers skip the repo's .gguf files
# (those are for llama.cpp) — confirm the flag is actually needed.
_load_kwargs_4bnb = {
    "torch_dtype": "auto",
    "device_map": "auto",
    "trust_remote_code": True,
    "ignore_mismatched_sizes": True,
}
try:
    processor_4bnb = AutoProcessor.from_pretrained(
        MODEL_ID_4BNB, trust_remote_code=True
    )
    try:
        model_4bnb = Qwen3VLForConditionalGeneration.from_pretrained(
            MODEL_ID_4BNB, **_load_kwargs_4bnb
        ).eval()
        print(" ✅ Loaded! (Qwen3VL BNB INT4 trained ~6.4GB)")
    except Exception as e:
        # Also reached with a NameError when the Qwen3VL class could not be
        # imported; report the reason instead of swallowing it.
        print(f" Qwen3VL loader failed: {e} — trying AutoModel fallback...")
        model_4bnb = AutoModelForImageTextToText.from_pretrained(
            MODEL_ID_4BNB, **_load_kwargs_4bnb
        ).eval()
        print(" ✅ Loaded! (AutoModel fallback)")
    CSM_4BNB_AVAILABLE = True
except Exception as e2:
    # Leave no half-initialised state behind.
    processor_4bnb = model_4bnb = None
    print(f" ❌ Failed: {e2}")
# Print a one-line-per-model load report followed by the success count.
print("\n" + "="*70)
print("📊 MODEL STATUS")
print("="*70)
status = [
    ("Chhagan_ML-VL-OCR-v1", CHHAGAN_V1_AVAILABLE, "LoRA Fine-tuned"),
    ("Chhagan-DocVL-Qwen3", CHHAGAN_QWEN3_AVAILABLE, "Qwen3-VL Fine-tuned"),
    ("CSM-DocExtract-Q4KM", CSM_Q4KM_AVAILABLE, "Qwen3VL Q4KM pre-quantized"),
    ("CSM-DocExtract-4BNB", CSM_4BNB_AVAILABLE, "Qwen3VL BitsAndBytes 4-bit"),
]
for name, ok, note in status:
    if ok:
        print(f" ✅ {name:<35} {note}")
    else:
        print(f" ❌ {name:<35} {note}")
print("="*70)
# Booleans sum as 0/1, giving the number of models that loaded.
loaded = sum(entry[1] for entry in status)
print(f" Total loaded: {loaded}/4\n")
# ╔══════════════════════════════════════════╗
# ║ PYTHON PIPELINE FUNCTIONS ║
# ╚══════════════════════════════════════════╝
def convert_eastern_numerals(text: str) -> str:
    """P2: Replace Persian, Arabic-Indic, Devanagari, Bengali and Gurmukhi digits with ASCII 0-9.

    All other characters are returned unchanged.
    """
    digit_sets = (
        '۰۱۲۳۴۵۶۷۸۹',  # Persian
        '٠١٢٣٤٥٦٧٨٩',  # Arabic
        '०१२३४५६७८९',  # Devanagari
        '০১২৩৪৫৬৭৮৯',  # Bengali
        '੦੧੨੩੪੫੬੭੮੯',  # Gurmukhi
    )
    # The digit sets are disjoint, so one merged table gives the same result
    # as applying each table in turn.
    merged = {}
    for digits in digit_sets:
        merged.update(str.maketrans(digits, '0123456789'))
    return text.translate(merged)
def detect_calendar_system(raw_text: str) -> str:
    """Guess the calendar used on a document from country keywords in its text.

    Matching is case-insensitive for Latin keywords; Arabic/Persian keywords
    have no case and are unaffected by upper-casing.

    Returns:
        'solar_hijri' for Iran/Afghanistan keywords, 'lunar_hijri' for the
        listed Arab-country keywords, otherwise 'gregorian'.
    """
    text_upper = raw_text.upper()
    solar_keywords = ('جمهوری اسلامی ایران', 'IRAN', 'AFGHANISTAN', 'افغانستان')
    lunar_keywords = ('SAUDI', 'ARABIA', 'السعودية', 'KUWAIT', 'QATAR', 'BAHRAIN', 'JORDAN')
    # Both checks use the upper-cased text so "Iran" and "IRAN" behave alike.
    if any(kw in text_upper for kw in solar_keywords):
        return 'solar_hijri'
    if any(kw in text_upper for kw in lunar_keywords):
        return 'lunar_hijri'
    return 'gregorian'
def _jalali_to_gregorian(jy: int, jm: int, jd: int) -> tuple:
    """Convert a Solar Hijri (Jalali) date to a Gregorian (year, month, day) by integer arithmetic."""
    jy += 1595
    days = -355668 + 365 * jy + (jy // 33) * 8 + ((jy % 33) + 3) // 4 + jd
    # Months 1-6 have 31 days, months 7-12 have 30 (the last one 29/30).
    days += (jm - 1) * 31 if jm < 7 else (jm - 7) * 30 + 186
    gy = 400 * (days // 146097)
    days %= 146097
    if days > 36524:
        days -= 1
        gy += 100 * (days // 36524)
        days %= 36524
        if days >= 365:
            days += 1
    gy += 4 * (days // 1461)
    days %= 1461
    if days > 365:
        gy += (days - 1) // 365
        days = (days - 1) % 365
    gd = days + 1
    is_leap = (gy % 4 == 0 and gy % 100 != 0) or gy % 400 == 0
    month_lengths = [0, 31, 29 if is_leap else 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
    gm = 0
    while gm < 13 and gd > month_lengths[gm]:
        gd -= month_lengths[gm]
        gm += 1
    return gy, gm, gd


def convert_shamsi_to_gregorian(shamsi_date: str) -> str:
    """P3: Convert a Solar Hijri (Shamsi) date string to Gregorian ``DD/MM/YYYY``.

    Args:
        shamsi_date: ``YYYY/MM/DD`` with ``/``, ``-`` or ``.`` as separator and
            Western digits.

    Returns:
        The Gregorian date as ``DD/MM/YYYY``. Input that cannot be parsed or is
        not a valid date is returned unchanged with a ``" (Shamsi)"`` suffix.

    The optional ``khayyam`` package is used when installed; otherwise an
    arithmetic conversion is used, so the result no longer depends on the
    package being present.
    """
    tagged = f"{shamsi_date} (Shamsi)"
    parts = re.split(r'[/\-\.]', shamsi_date.strip())
    if len(parts) != 3:
        return tagged
    try:
        y, m, d = (int(p) for p in parts)
    except ValueError:
        return tagged

    try:
        import khayyam
    except ImportError:
        khayyam = None

    if khayyam is not None:
        try:
            greg = khayyam.JalaliDate(y, m, d).todate()
        except Exception:
            # Best effort: any library rejection (e.g. out-of-range date)
            # falls back to returning the tagged original.
            return tagged
        return f"{greg.day:02d}/{greg.month:02d}/{greg.year}"

    # Fallback without khayyam: validate the date, then convert arithmetically.
    max_day = 31 if m <= 6 else 30
    if y < 1 or not (1 <= m <= 12) or not (1 <= d <= max_day):
        return tagged
    gy, gm, gd = _jalali_to_gregorian(y, m, d)
    return f"{gd:02d}/{gm:02d}/{gy}"
def convert_hijri_to_gregorian(hijri_date: str) -> str:
    """P3: Convert a Lunar Hijri date string to Gregorian ``DD/MM/YYYY``.

    Args:
        hijri_date: ``YYYY/MM/DD`` with ``/``, ``-`` or ``.`` as separator and
            Western digits.

    Returns:
        ``DD/MM/YYYY`` when the optional ``hijri_converter`` package is
        installed. Without it, the tabular (arithmetic) Islamic calendar is
        used and the result carries an ``" (approx)"`` suffix, because
        observation-based calendars can differ from it by a day or two.
        Input that cannot be parsed or converted is returned unchanged with a
        ``" (Hijri)"`` suffix.
    """
    tagged = f"{hijri_date} (Hijri)"
    parts = re.split(r'[/\-\.]', hijri_date.strip())
    if len(parts) != 3:
        return tagged
    try:
        y, m, d = (int(p) for p in parts)
    except ValueError:
        return tagged

    try:
        from hijri_converter import convert
    except ImportError:
        convert = None

    if convert is not None:
        try:
            greg = convert.Hijri(y, m, d).to_gregorian()
        except Exception:
            # Best effort: any library rejection (e.g. out-of-range date)
            # falls back to returning the tagged original.
            return tagged
        return f"{greg.day:02d}/{greg.month:02d}/{greg.year}"

    # Fallback: tabular Islamic calendar → Julian day number → Gregorian.
    if y < 1 or not (1 <= m <= 12) or not (1 <= d <= 30):
        return tagged
    jdn = (11 * y + 3) // 30 + 354 * y + 30 * m - (m - 1) // 2 + d + 1948440 - 385
    try:
        # Proleptic Gregorian ordinal 1 (0001-01-01) is Julian day 1721426.
        greg = datetime.date.fromordinal(jdn - 1721425)
    except (ValueError, OverflowError):
        return tagged
    return f"{greg.day:02d}/{greg.month:02d}/{greg.year} (approx)"
def separate_scripts(raw_text: str) -> tuple:
    """P5: Split text into (Latin-script lines, non-Latin-script lines).

    Blank lines are dropped and each kept line is stripped. A line goes to the
    non-Latin bucket when characters above code point 591 exceed 40% of its
    alphabetic characters; lines with no letters count as Latin.
    """
    latin_bucket, other_bucket = [], []
    for raw_line in raw_text.split('\n'):
        stripped = raw_line.strip()
        if not stripped:
            continue
        letters = 0
        beyond_latin = 0
        for ch in stripped:
            if ord(ch) > 591:
                beyond_latin += 1
            if ch.isalpha():
                letters += 1
        is_foreign = letters > 0 and beyond_latin / letters > 0.4
        target = other_bucket if is_foreign else latin_bucket
        target.append(stripped)
    return '\n'.join(latin_bucket), '\n'.join(other_bucket)
def extract_english_fields(raw_text: str) -> list:
    """P4: Extract English label:value pairs directly from card text — no AI.

    Each canonical label is reported at most once, using the first pattern (in
    list order) that yields a usable value. Matching is case-insensitive.

    Free-text value classes use ``[ \\t]`` rather than ``\\s`` so a value never
    runs across a line break into the next label, and the plain NAME pattern
    refuses to match the "NAME" that belongs to a FATHER'S/MOTHER'S NAME label.

    Returns:
        List of ``(label, value)`` tuples in pattern-list order.
    """
    # Fixed-width lookbehinds: skip "NAME" when it is the tail of a parent-name
    # label (or of "FULL NAME", which the optional prefix already handles).
    not_other_name = (
        r"(?<!FATHER\s)(?<!MOTHER\s)(?<!FATHERS\s)(?<!MOTHERS\s)"
        r"(?<!FATHER'S\s)(?<!MOTHER'S\s)(?<!FULL\s)"
    )
    patterns = [
        (not_other_name + r'(?:FULL\s+)?NAME\s*[:\-.]?\s*([A-Za-z][A-Za-z \t\-\.\']{1,60})', 'NAME'),
        (r'DATE\s+OF\s+BIRTH\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'DATE OF BIRTH'),
        (r'\bDOB\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'DATE OF BIRTH'),
        (r'BIRTH\s+DATE\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'DATE OF BIRTH'),
        (r'EXPIRY\s+DATE\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'EXPIRY DATE'),
        (r'DATE\s+OF\s+EXPIRY\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'EXPIRY DATE'),
        (r'VALID(?:\s+THRU|\s+UNTIL|ITY)?\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'EXPIRY DATE'),
        (r'EXPIRATION\s+DATE\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'EXPIRY DATE'),
        (r'(?:DATE\s+OF\s+)?ISSUE\s+DATE\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'ISSUE DATE'),
        (r'DATE\s+OF\s+ISSUE\s*[:\-.]?\s*(\d{1,2}[\s/\-\.]\d{1,2}[\s/\-\.]\d{2,4})', 'ISSUE DATE'),
        (r'CIVIL\s+(?:NO\.?|NUMBER)\s*[:\-.]?\s*([A-Z0-9\-]{4,20})', 'CIVIL NUMBER'),
        (r'PASSPORT\s+(?:NO\.?|NUMBER)\s*[:\-.]?\s*([A-Z0-9\-]{6,12})', 'PASSPORT NUMBER'),
        (r'LICENCE\s+(?:NO\.?|NUMBER)\s*[:\-.]?\s*([A-Z0-9\-]{4,20})', 'LICENCE NUMBER'),
        (r'LICENSE\s+(?:NO\.?|NUMBER)\s*[:\-.]?\s*([A-Z0-9\-]{4,20})', 'LICENCE NUMBER'),
        (r'AADHAAR\s*(?:NO\.?|NUMBER)?\s*[:\-.]?\s*(\d{4}\s?\d{4}\s?\d{4})', 'AADHAAR NUMBER'),
        (r'\bPAN\s*[:\-.]?\s*([A-Z]{5}\d{4}[A-Z])', 'PAN NUMBER'),
        (r'EMIRATES\s+ID\s*[:\-.]?\s*(\d{3}-\d{4}-\d{7}-\d)', 'EMIRATES ID'),
        (r'(?:NATIONAL\s+)?ID\s+(?:NO\.?|NUMBER)\s*[:\-.]?\s*([A-Z0-9\-]{4,20})', 'ID NUMBER'),
        (r'DOCUMENT\s+(?:NO\.?|NUMBER)\s*[:\-.]?\s*([A-Z0-9\-]{4,20})', 'DOCUMENT NUMBER'),
        (r'NATIONALITY\s*[:\-.]?\s*([A-Za-z]{3,30})', 'NATIONALITY'),
        (r'(?:GENDER|SEX)\s*[:\-.]?\s*(MALE|FEMALE)', 'GENDER'),
        (r'PLACE\s+OF\s+BIRTH\s*[:\-.]?\s*([A-Za-z \t,]{2,40})', 'PLACE OF BIRTH'),
        (r'(?:PERMANENT\s+)?ADDRESS\s*[:\-.]?\s*(.{5,80})', 'ADDRESS'),
        (r'BLOOD\s+(?:GROUP|TYPE)\s*[:\-.]?\s*([ABO]{1,2}[+-]?)', 'BLOOD GROUP'),
        (r'(?:PROFESSION|OCCUPATION|JOB\s+TITLE)\s*[:\-.]?\s*(.{3,50})', 'PROFESSION'),
        (r'FATHER(?:\'?S)?\s+NAME\s*[:\-.]?\s*([A-Za-z \t]{3,50})', "FATHER'S NAME"),
        (r'MOTHER(?:\'?S)?\s+NAME\s*[:\-.]?\s*([A-Za-z \t]{3,50})', "MOTHER'S NAME"),
        (r'EMPLOYER\s*[:\-.]?\s*(.{3,60})', 'EMPLOYER'),
    ]
    results = []
    seen = set()
    for pattern, label in patterns:
        if label in seen:
            # An earlier pattern already produced this label; skip the search.
            continue
        m = re.search(pattern, raw_text, re.IGNORECASE)
        if not m:
            continue
        val = m.group(1).strip()
        # Reject one-character values and leaked template placeholders ("[...]").
        if val and len(val) > 1 and '[' not in val:
            results.append((label, val))
            seen.add(label)
    return results
def parse_mrz_lines(raw_text: str) -> dict:
    """P1: Authoritative Python MRZ parser — TD1, TD2, TD3, MRV-A, MRV-B.

    Candidate MRZ lines are lines that, after removing whitespace, consist only
    of ``A-Z``, ``0-9`` and ``<`` and are 25-50 characters long. The format is
    chosen from the line length (a small tolerance is allowed for OCR noise):

    * about 30 chars → TD1 (ID cards, 3 lines)
    * about 44 chars → TD3 passports, or MRV-A visas when the code starts with ``V``
    * about 36 chars → TD2 documents, or MRV-B visas when the code starts with ``V``

    The length windows do not overlap, so 36-character TD2/MRV-B lines are no
    longer parsed with TD1 field offsets.

    Returns:
        Dict with some of ``doc_type``, ``country_code``, ``doc_number``,
        ``name``, ``dob``, ``sex``, ``expiry``, ``nationality`` plus
        ``mrz_format``; empty dict when no MRZ is recognised. Dates are
        ``DD/MM/YYYY``. Check digits are NOT verified.
    """
    # Normalize: western numerals only
    raw_text = convert_eastern_numerals(raw_text)
    lines = []
    for line in raw_text.split('\n'):
        clean = re.sub(r'\s+', '', line.strip())
        if re.match(r'^[A-Z0-9<]{25,50}$', clean):
            lines.append(clean)
    if not lines:
        return {}

    def decode_date(yymmdd: str, is_dob: bool = False) -> str:
        """Turn YYMMDD into DD/MM/YYYY; birth years later than the current year map to 19xx."""
        try:
            yy, mm, dd = int(yymmdd[0:2]), int(yymmdd[2:4]), int(yymmdd[4:6])
        except ValueError:
            # Non-numeric or too-short field (e.g. filler characters): return as-is.
            return yymmdd
        if not (1 <= mm <= 12 and 1 <= dd <= 31):
            return f"Invalid ({yymmdd})"
        cur_yy = datetime.datetime.now().year % 100
        year = (1900 + yy) if (is_dob and yy > cur_yy) else (2000 + yy)
        return f"{dd:02d}/{mm:02d}/{year}"

    def clean_fill(s: str) -> str:
        """Drop trailing '<' filler and turn inner '<' into spaces."""
        return re.sub(r'<+$', '', s).replace('<', ' ').strip()

    def parse_name(name_field: str) -> str:
        """Convert 'SURNAME<<GIVEN<NAMES' into 'Given Names Surname'."""
        name_clean = re.sub(r'<+$', '', name_field)
        if '<<' in name_clean:
            parts = name_clean.split('<<')
            surname = parts[0].replace('<', ' ').strip().title()
            given = parts[1].replace('<', ' ').strip().title() if len(parts) > 1 else ''
            return f"{given} {surname}".strip() if given else surname
        return name_clean.replace('<', ' ').strip().title()

    def sex_label(code: str) -> str:
        return 'Male' if code == 'M' else ('Female' if code == 'F' else 'Unknown')

    def parse_two_line(l1: str, l2: str, name_end: int, fmt: str) -> dict:
        """Shared layout of TD2/TD3/MRV: name on line 1, data fields on line 2."""
        parsed = {
            'doc_type': clean_fill(l1[0:2]),
            'country_code': clean_fill(l1[2:5]),
            'name': parse_name(l1[5:name_end]),
        }
        if len(l2) >= 27:
            parsed['doc_number'] = clean_fill(l2[0:9])
            parsed['nationality'] = clean_fill(l2[10:13])
            parsed['dob'] = decode_date(l2[13:19], is_dob=True)
            parsed['sex'] = sex_label(l2[20] if len(l2) > 20 else '')
            parsed['expiry'] = decode_date(l2[21:27], is_dob=False)
        parsed['mrz_format'] = fmt
        return parsed

    # TD1: 3 lines of 30 chars (ID cards)
    td1 = [l for l in lines if 28 <= len(l) <= 33]
    if len(td1) >= 2:
        l1, l2 = td1[0], td1[1]
        l3 = td1[2] if len(td1) > 2 else ""
        result = {
            'doc_type': clean_fill(l1[0:2]),
            'country_code': clean_fill(l1[2:5]),
            'doc_number': clean_fill(l1[5:14]),
        }
        if len(l2) >= 19:
            result['dob'] = decode_date(l2[0:6], is_dob=True)
            result['sex'] = sex_label(l2[7] if len(l2) > 7 else '')
            result['expiry'] = decode_date(l2[8:14], is_dob=False)
            result['nationality'] = clean_fill(l2[15:18])
        if l3:
            result['name'] = parse_name(l3)
        result['mrz_format'] = 'TD1'
        return result

    # TD3 / MRV-A: 2 lines of 44 chars (passports, full-size visas)
    td3 = [l for l in lines if 40 <= len(l) <= 48]
    if len(td3) >= 2:
        fmt = 'MRVA/MRVB' if td3[0].startswith('V') else 'TD3'
        return parse_two_line(td3[0], td3[1], 44, fmt)

    # TD2 / MRV-B: 2 lines of 36 chars (older ID formats, small visas)
    td2 = [l for l in lines if 34 <= len(l) <= 38]
    if len(td2) >= 2:
        fmt = 'MRVA/MRVB' if td2[0].startswith('V') else 'TD2'
        return parse_two_line(td2[0], td2[1], 36, fmt)

    return {}
def build_mrz_table(mrz_data: dict) -> str:
    """Render parsed MRZ data as a markdown table.

    Returns the string "No MRZ detected." for empty or missing data; otherwise
    a bold heading naming the MRZ format, followed by one table row for each
    known key present in ``mrz_data``.
    """
    if not mrz_data:
        return "No MRZ detected."
    labelled_keys = (
        ('mrz_format', 'MRZ Format'),
        ('doc_type', 'Document Type'),
        ('country_code', 'Issuing Country Code'),
        ('doc_number', 'Document / Civil Number'),
        ('name', 'Full Name'),
        ('dob', 'Date of Birth'),
        ('expiry', 'Expiry Date'),
        ('nationality', 'User Nationality'),
        ('sex', 'Gender'),
    )
    pieces = [
        f"**Python Parsed MRZ — Authoritative ({mrz_data.get('mrz_format','?')} format):**\n\n",
        "| Field | Verified Value |\n|---|---|\n",
    ]
    pieces.extend(
        f"| {label} | **{mrz_data[key]}** ✅ |\n"
        for key, label in labelled_keys
        if key in mrz_data
    )
    return "".join(pieces)
def build_unified_summary(front_result: str, back_result: str, mrz_data: dict) -> str:
    """P6: Merge front+back fields, MRZ as ground truth override.

    Args:
        front_result: Markdown produced for the front side of the card.
        back_result: Markdown produced for the back side of the card.
        mrz_data: Parsed MRZ dict (may be empty).

    Returns:
        Markdown with an optional MRZ section and a field-comparison table.
        Fields are read from the first table under a ``## ✅`` or ``## 🗂️``
        heading on each side.

    MRZ values are matched to field labels on whole-word prefixes (so ``id``
    does not match "Valid"), and labels about another person, place, document
    type or issuance are never compared with the holder's MRZ data.
    """
    summary = "## 🔄 Unified Deduplicated Record\n\n"
    if mrz_data:
        summary += f"> ✅ *MRZ Python-parsed ({mrz_data.get('mrz_format','?')}) — MRZ values are **ground truth**.*\n\n"
        summary += "### 🔐 MRZ Ground Truth\n\n"
        summary += build_mrz_table(mrz_data) + "\n\n---\n\n"
    else:
        summary += "> *No MRZ — fields merged from front+back. Conflicts flagged ⚠️.*\n\n"

    def get_rows(text):
        """Return {field: value} from the first matching markdown table, skipping empty values."""
        rows = {}
        m = re.search(r"## (?:✅|🗂️)[^\n]*\n\|[^\n]*\n\|[-| ]+\n(.*?)(?=\n---|\Z)", text, re.DOTALL)
        if m:
            for line in m.group(1).strip().split('\n'):
                parts = [p.strip() for p in line.split('|') if p.strip()]
                if len(parts) >= 2:
                    field = re.sub(r'[^\w\s/\']', '', parts[0]).strip()
                    val = parts[1].strip()
                    if val and val.lower() not in ('—', 'not on card', 'n/a', ''):
                        rows[field] = val
        return rows

    front_f = get_rows(front_result)
    back_f = get_rows(back_result)
    # Union of field names, front order first, duplicates removed.
    all_f = list(dict.fromkeys(list(front_f) + list(back_f)))

    # MRZ lookup: label keyword → MRZ value
    mrz_map = {}
    if mrz_data:
        kw_map = {
            'name': ['name', 'surname'],
            'doc_number': ['civil', 'document', 'id', 'passport', 'licence', 'license'],
            'dob': ['birth', 'dob'],
            'expiry': ['expiry', 'expiration'],
            'sex': ['gender', 'sex'],
            'nationality': ['nationality'],
        }
        for mk, keywords in kw_map.items():
            if mk in mrz_data:
                for kw in keywords:
                    mrz_map[kw] = mrz_data[mk]

    # Label words marking a field the MRZ says nothing about.
    unrelated_words = {
        'father', 'fathers', 'mother', 'mothers', 'husband', 'wife', 'spouse',
        'guardian', 'employer', 'sponsor', 'place', 'type',
        'issue', 'issued', 'issuing',
    }

    def get_mrz(field):
        """Return the MRZ value relevant to a field label, or None."""
        words = re.findall(r'[a-z]+', field.lower())
        if any(w in unrelated_words for w in words):
            return None
        for kw, v in mrz_map.items():
            if any(w.startswith(kw) for w in words):
                return v
        return None

    def mrz_note(value, mv, warn_prefix):
        """Describe how a card value relates to the MRZ value (empty when no MRZ value)."""
        if not mv:
            return ""
        if any(word in value.lower() for word in mv.lower().split()):
            return "✅ MRZ Confirmed"
        return f"{warn_prefix}**{mv}**"

    summary += "### 📋 Field Comparison\n\n| Field | Value | Source |\n|---|---|---|\n"
    for field in all_f:
        fv = front_f.get(field, '')
        bv = back_f.get(field, '')
        mv = get_mrz(field)
        if fv and bv:
            if fv.lower() == bv.lower():
                note = mrz_note(fv, mv, "⚠️ MRZ differs: ")
                summary += f"| {field} | {fv} | Front+Back ✅ {note} |\n"
            elif mv:
                # The two sides disagree: the MRZ value wins.
                summary += f"| {field} | ~~{fv}~~ / ~~{bv}~~ → **{mv}** | ✅ MRZ Override |\n"
            else:
                summary += f"| {field} | F: **{fv}** / B: **{bv}** | ⚠️ Mismatch |\n"
        elif fv:
            note = mrz_note(fv, mv, "⚠️ MRZ: ")
            summary += f"| {field} | {fv} | Front only {note} |\n"
        elif bv:
            note = mrz_note(bv, mv, "⚠️ MRZ: ")
            summary += f"| {field} | {bv} | Back only {note} |\n"
    return summary + "\n"
# ╔══════════════════════════════════════════╗
# ║ STEP PIPELINE FUNCTIONS ║
# ╚══════════════════════════════════════════╝
def run_step1_extraction(model, processor, image, device, temperature, top_p, top_k, repetition_penalty, system_prompt=None):
    """Step 1: LLM → Raw OCR, original script, NO translation, NO coordinates.

    Args:
        model: Vision-language model exposing ``generate``.
        processor: Matching processor (chat template, tokenisation, decoding).
        image: The document image, passed to the processor as-is.
        device: Device the model inputs are moved to.
        temperature, top_p, top_k, repetition_penalty: Generation settings.
            A temperature of 0 (or less) switches to greedy decoding, because
            sampling requires a strictly positive temperature.
        system_prompt: Optional system prompt; a generic one is used when empty.
            It is applied on every prompt-building path, including the manual
            fallback template.

    Returns:
        Decoded model output. If the first answer contains coordinate pairs or
        lacks the ``---TEXT_START---`` marker, a simpler prompt is tried once
        and that second answer is returned.
    """
    # Optional helper: resolved once per call, not once per generation.
    try:
        from qwen_vl_utils import process_vision_info
    except ImportError:
        process_vision_info = None

    sys_msg = system_prompt or "You are a helpful assistant."

    def _build_prompt(messages, prompt_text):
        """Render the chat prompt, falling back to a hand-written Qwen-style template."""
        try:
            prompt = processor.apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True
            )
            if not isinstance(prompt, str):
                raise TypeError("non-string returned")
            return prompt
        except Exception:
            # Manual Qwen3VL token format — universal fallback
            return (
                f"<|im_start|>system\n{sys_msg}<|im_end|>\n"
                "<|im_start|>user\n"
                "<|vision_start|><|image_pad|><|vision_end|>"
                f"{prompt_text}<|im_end|>\n"
                "<|im_start|>assistant\n"
            )

    def _build_inputs(messages, prompt):
        """Build processor inputs, trying three tiers from most to least capable."""
        # Tier 1: qwen_vl_utils + images/videos kwargs (Qwen3VL standard)
        if process_vision_info is not None:
            try:
                image_inputs, video_inputs = process_vision_info(messages)
                proc_kwargs = {"text": [prompt], "padding": True, "return_tensors": "pt"}
                if image_inputs is not None and len(image_inputs) > 0:
                    proc_kwargs["images"] = image_inputs
                if video_inputs is not None and len(video_inputs) > 0:
                    proc_kwargs["videos"] = video_inputs
                inputs = processor(**proc_kwargs).to(device)
                print(" ✅ Tier1: qwen_vl_utils")
                return inputs
            except Exception as e:
                print(f" Tier1 failed: {e}")
        # Tier 2: Direct PIL image (Qwen2VL style)
        try:
            inputs = processor(
                text=[prompt],
                images=[image],
                padding=True,
                return_tensors="pt",
            ).to(device)
            print(" ✅ Tier2: direct PIL")
            return inputs
        except Exception as e:
            print(f" Tier2 failed: {e}")
        # Tier 3: Text-only (last resort)
        print(" ⚠️ Tier3: text-only fallback (no image — degraded)")
        return processor(
            text=[prompt],
            padding=True,
            return_tensors="pt",
        ).to(device)

    def _generate(prompt_text):
        messages = [
            {"role": "system", "content": sys_msg},
            {"role": "user", "content": [
                {"type": "image", "image": image},
                {"type": "text", "text": prompt_text},
            ]}
        ]
        prompt = _build_prompt(messages, prompt_text)
        inputs = _build_inputs(messages, prompt)

        gen_kwargs = {"max_new_tokens": 600, "repetition_penalty": repetition_penalty}
        if temperature is not None and temperature > 0:
            gen_kwargs.update(do_sample=True, temperature=temperature, top_p=top_p, top_k=top_k)
        else:
            # Sampling with a non-positive temperature is rejected by generate().
            gen_kwargs["do_sample"] = False

        with torch.no_grad():
            out = model.generate(**inputs, **gen_kwargs)
        # Keep only the newly generated tokens (drop the echoed prompt).
        gen = out[:, inputs['input_ids'].shape[1]:]
        decoded = processor.batch_decode(gen, skip_special_tokens=True)
        if isinstance(decoded, list):
            return decoded[0] if decoded else ""
        return str(decoded) if decoded else ""

    result = _generate(STEP1_EXTRACT_PROMPT)
    # Coordinate output detect → retry with simpler prompt
    if re.search(r'\(\d{1,4},\s*\d{1,4}\)', result) or '---TEXT_START---' not in result:
        print(" ⚠️ Retrying with fallback prompt...")
        fallback = (
            "Read all text from this document image and write it line by line in plain text.\n"
            "Do NOT output coordinates or bounding boxes.\n"
            "Start output with:\n"
            "PHOTO_PRESENT: yes or no\n"
            "SIGNATURE_PRESENT: yes or no\n"
            "MRZ_PRESENT: yes or no\n"
            "DETECTED_LANGUAGE: name the language(s)\n"
            "---TEXT_START---\n"
            "[all text here exactly as printed]\n"
            "---TEXT_END---"
        )
        result = _generate(fallback)
    return result
def parse_step1_output(raw_output: str) -> dict:
    """Parse Step 1 structured output → metadata + original text.

    Returns:
        Dict with ``photo_present``, ``sig_present``, ``mrz_present``
        ("✅ Yes" / "❌ No"), ``photo_location``, ``sig_location`` ("N/A" when
        absent), ``detected_lang`` ("Unknown" when absent) and
        ``original_text``.

    ``original_text`` is the text between ``---TEXT_START---`` and
    ``---TEXT_END---``. When the end marker is missing (e.g. generation was cut
    off), everything after the start marker is used, so the metadata header
    does not leak into the OCR text. When the start marker is missing too, the
    whole raw output is kept.
    """
    def get(pattern, default="N/A"):
        m = re.search(pattern, raw_output, re.IGNORECASE)
        return m.group(1).strip() if m else default

    def yes_no(pattern):
        return "✅ Yes" if get(pattern).lower() == "yes" else "❌ No"

    result = {
        "photo_present": yes_no(r'PHOTO_PRESENT:\s*(yes|no)'),
        "photo_location": get(r'PHOTO_LOCATION:\s*([^\n]+)'),
        "sig_present": yes_no(r'SIGNATURE_PRESENT:\s*(yes|no)'),
        "sig_location": get(r'SIGNATURE_LOCATION:\s*([^\n]+)'),
        "mrz_present": yes_no(r'MRZ_PRESENT:\s*(yes|no)'),
        "detected_lang": get(r'DETECTED_LANGUAGE:\s*([^\n]+)', "Unknown"),
        "original_text": raw_output,
    }
    # Lazy match up to the end marker, or to end of output when it is missing.
    m = re.search(r'---TEXT_START---\n?(.*?)(?:---TEXT_END---|\Z)', raw_output, re.DOTALL)
    if m:
        result["original_text"] = m.group(1).strip()
    return result
def run_step2_structure(model, processor, metadata: dict, device,
                        max_new_tokens, temperature, top_p, top_k, repetition_penalty):
    """Step 2: Python extracts English fields + MRZ; the LLM only classifies + fills gaps.

    Builds the Python-derived markdown sections from the Step 1 metadata,
    formats ``STEP2_TEMPLATE`` into a text-only prompt, and starts
    ``model.generate`` on a background thread that feeds a
    ``TextIteratorStreamer``.

    Args:
        model: Model object whose ``generate`` method is run on the thread.
        processor: Processor used for the chat template, tokenisation and as
            the streamer's tokenizer.
        metadata: Dict from ``parse_step1_output``; reads ``original_text``,
            ``photo_present``, ``photo_location``, ``sig_present``,
            ``sig_location`` and ``mrz_present``.
        device: Device the tokenised inputs are moved to.
        max_new_tokens, temperature, top_p, top_k, repetition_penalty:
            Sampling settings forwarded to ``model.generate``.

    Returns:
        Tuple ``(streamer, thread, mrz_data, python_sections)``: the streamer
        to iterate for generated text, the already-started generation thread
        (caller should ``join`` it), the parsed MRZ data, and the pre-built
        markdown string.
    """
    raw_text = metadata.get('original_text', '')

    # P2: Convert eastern numerals first
    raw_text_normalized = convert_eastern_numerals(raw_text)
    # P5: Separate scripts
    english_block, original_block = separate_scripts(raw_text_normalized)
    # P4: Direct English field extraction
    english_fields = extract_english_fields(raw_text_normalized)
    # P1: MRZ parse (authoritative)
    mrz_data = parse_mrz_lines(raw_text_normalized)
    # P3: Calendar detection runs on the un-normalised text
    calendar_sys = detect_calendar_system(raw_text)

    # Build python fields table
    if english_fields:
        tbl = "| Field (as printed on card) | Value (as printed) |\n|---|---|\n" + "".join(
            f"| **{label}** | {val} |\n" for label, val in english_fields
        )
    else:
        tbl = "| — | No English label:value pairs detected |\n"

    # MRZ summary
    if mrz_data:
        mrz_summary = " | ".join([f"{k}: {v}" for k, v in mrz_data.items() if k != 'mrz_format'])
        mrz_summary = f"✅ {mrz_data.get('mrz_format','?')} parsed: {mrz_summary}"
    else:
        mrz_summary = "❌ No MRZ detected"

    # Non-Gregorian note
    cal_note = ""
    if calendar_sys == 'solar_hijri':
        cal_note = "\n> ⚠️ **Solar Hijri (Shamsi) calendar detected** — Python will convert dates to Gregorian."
    elif calendar_sys == 'lunar_hijri':
        cal_note = "\n> ⚠️ **Lunar Hijri calendar detected** — Python will convert dates to Gregorian."

    # Build prompt for LLM (classification + gaps only)
    prompt_text = STEP2_TEMPLATE.format(
        python_fields_table=tbl,
        mrz_summary=mrz_summary,
        english_block=english_block or "None",
        original_block=original_block or "None",
    )
    messages = [{"role": "user", "content": [{"type": "text", "text": prompt_text}]}]
    try:
        prompt = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    except Exception as exc:
        # Best-effort fallback to the raw prompt; unlike a bare `except:` this
        # lets KeyboardInterrupt / SystemExit propagate.
        print(f"  ⚠️ Chat template failed, using raw prompt: {exc}")
        prompt = prompt_text

    inputs = processor(
        text=[prompt],
        padding=True,
        return_tensors="pt",
    ).to(device)
    streamer = TextIteratorStreamer(processor, skip_prompt=True, skip_special_tokens=True)
    gen_kwargs = {
        **inputs, "streamer": streamer, "max_new_tokens": max_new_tokens,
        "do_sample": True, "temperature": temperature, "top_p": top_p,
        "top_k": top_k, "repetition_penalty": repetition_penalty,
    }

    # Raw MRZ-looking lines for display: whitespace removed, then 25-50 chars
    # drawn from A-Z, 0-9 and '<'.
    compact_lines = (re.sub(r'\s+', '', line.strip()) for line in raw_text.split("\n"))
    mrz_raw_lines = [line for line in compact_lines if re.match(r'^[A-Z0-9<]{25,50}$', line)]
    # Values are pre-computed so no backslash appears inside an f-string
    # expression (a syntax error before Python 3.12).
    mrz_raw_display = "\n".join(mrz_raw_lines) if mrz_raw_lines else "NOT PRESENT"
    mrz_table_str = build_mrz_table(mrz_data) if mrz_data else "_No MRZ detected._"

    # Pre-build Python-verified sections
    python_sections = (
        "## 🖼️ Visual Elements\n\n"
        "| Element | Status | Location |\n"
        "|---------|--------|----------|\n"
        f"| 📷 Profile Photo | {metadata['photo_present']} | {metadata['photo_location']} |\n"
        f"| ✍️ Signature | {metadata['sig_present']} | {metadata['sig_location']} |\n"
        f"| 🔐 MRZ Zone | {metadata['mrz_present']} | Bottom strip |\n\n"
        "---\n\n"
        "## ✅ English Fields (Direct from Card — Not Modified)\n"
        f"{cal_note}\n\n"
        f"{tbl}\n\n"
        "---\n\n"
        "## 📜 Original Script\n\n"
        "```\n"
        f"{raw_text}\n"
        "```\n\n"
        "---\n\n"
        "## 🔐 MRZ Data\n\n"
        "```\n"
        f"{mrz_raw_display}\n"
        "```\n\n"
        f"{mrz_table_str}\n\n"
        "---\n\n"
    )

    # Start generation only once everything above has succeeded, so a failure
    # while building the sections cannot leave an orphaned generate() thread.
    thread = Thread(target=model.generate, kwargs=gen_kwargs)
    thread.start()
    return streamer, thread, mrz_data, python_sections
# ╔══════════════════════════════════════════╗
# ║ GRADIO HELPER CLASSES ║
# ╚══════════════════════════════════════════╝
class RadioAnimated(gr.HTML):
    """Radio-button group rendered as custom HTML with a sliding highlight.

    One ``<input type="radio">`` / ``<label>`` pair is generated per choice.
    The ``js_on_load`` script keeps the highlight, the checked input and
    ``props.value`` in sync, and fires a ``change`` event when the user picks
    an option.
    """

    def __init__(self, choices, value=None, **kwargs):
        """Build the HTML template and the on-load script.

        Args:
            choices: Sequence of at least two option labels; each label is
                also used as the input's value.
            value: Initially selected choice; defaults to ``choices[0]``.
            **kwargs: Forwarded to ``gr.HTML.__init__``.

        Raises:
            ValueError: If fewer than two choices are supplied.
        """
        if not choices or len(choices) < 2:
            raise ValueError("RadioAnimated requires at least 2 choices.")
        if value is None:
            value = choices[0]
        # A random suffix gives each instance its own radio group name.
        uid = uuid.uuid4().hex[:8]
        group_name = f"ra-{uid}"
        inputs_html = "\n".join(
            f'<input class="ra-input" type="radio" name="{group_name}" id="{group_name}-{i}" value="{c}">'
            f'<label class="ra-label" for="{group_name}-{i}">{c}</label>'
            for i, c in enumerate(choices)
        )
        html_template = f"""
<div class="ra-wrap" data-ra="{uid}">
<div class="ra-inner"><div class="ra-highlight"></div>{inputs_html}</div>
</div>"""
        # The script uses the `element`, `props` and `trigger` names available
        # to js_on_load. The setVal flag must NOT be called `trigger`: a
        # parameter of that name shadows the trigger function, so a user click
        # would call `true(...)` and throw before the change event is sent.
        js_on_load = r"""
(() => {
const highlight = element.querySelector('.ra-highlight');
const inputs = Array.from(element.querySelectorAll('.ra-input'));
if (!inputs.length) return;
const choices = inputs.map(i => i.value);
function setHighlight(idx) {
highlight.style.width = `calc(${100/choices.length}% - 6px)`;
highlight.style.transform = `translateX(${idx * 100}%)`;
}
function setVal(val, shouldTrigger=false) {
const idx = Math.max(0, choices.indexOf(val));
inputs.forEach((inp, i) => { inp.checked = (i === idx); });
setHighlight(idx);
props.value = choices[idx];
if (shouldTrigger) trigger('change', props.value);
}
setVal(props.value ?? choices[0], false);
inputs.forEach(inp => inp.addEventListener('change', () => setVal(inp.value, true)));
})();"""
        super().__init__(value=value, html_template=html_template, js_on_load=js_on_load, **kwargs)
def apply_gpu_duration(val: str):
    """Convert the selected GPU-duration choice into an integer."""
    seconds = int(val)
    return seconds
def calc_timeout_duration(model_name, text, image_front, image_back,
                          max_new_tokens, temperature, top_p, top_k,
                          repetition_penalty, gpu_timeout):
    """Compute the GPU time budget for one OCR request.

    Takes the same parameters as ``generate_dual_card_ocr``; only
    ``image_front``, ``image_back`` and ``gpu_timeout`` are read.

    Returns:
        ``int(gpu_timeout)``, doubled when both card images are supplied.
        Falls back to 180 when ``gpu_timeout`` cannot be converted to an int.
    """
    try:
        base = int(gpu_timeout)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures get the default; a bare `except:` would
        # also swallow SystemExit / KeyboardInterrupt.
        return 180
    both_sides = image_front is not None and image_back is not None
    return base * 2 if both_sides else base
# ╔══════════════════════════════════════════╗
# ║ MAIN PIPELINE FUNCTION ║
# ╚══════════════════════════════════════════╝
def _stream_card_side(model, processor, image, progress, system_prompt,
                      max_new_tokens, temperature, top_p, top_k, repetition_penalty):
    """Run Step 1 + Step 2 for one card image, yielding UI snapshots as it goes.

    Args:
        progress: Output accumulated so far, already ending with this side's heading.
        system_prompt: Forwarded to ``run_step1_extraction`` (may be ``None``).

    Yields:
        ``(text, text)`` tuples holding the full output so far.

    Returns:
        ``(progress, side_result, mrz_data)``: progress text including the
        step status lines, the markdown produced for this side, and the MRZ
        data from ``run_step2_structure``.
    """
    progress += "⏳ **Step 1/2 — Raw OCR (original script, no translation)...**\n\n"
    yield progress, progress
    step1_raw = run_step1_extraction(model, processor, image, device,
                                     temperature, top_p, top_k, repetition_penalty,
                                     system_prompt=system_prompt)
    meta = parse_step1_output(step1_raw)
    progress += f"✅ **Step 1 Done** — 🌐 Language: **{meta['detected_lang']}**\n\n"
    progress += "⏳ **Step 2/2 — Python extract + LLM classify...**\n\n"
    yield progress, progress

    streamer, thread, mrz_data, python_sections = run_step2_structure(
        model, processor, meta, device,
        max_new_tokens, temperature, top_p, top_k, repetition_penalty)
    side_result = python_sections
    yield progress + side_result, progress + side_result
    for new_text in streamer:
        # Drop end-of-turn markers that may appear in the streamed text.
        side_result += new_text.replace("<|im_end|>", "").replace("<|endoftext|>", "")
        time.sleep(0.01)
        yield progress + side_result, progress + side_result
    thread.join()
    return progress, side_result, mrz_data


@spaces.GPU(duration=calc_timeout_duration)
def generate_dual_card_ocr(model_name: str, text: str,
                           image_front: Image.Image, image_back: Image.Image,
                           max_new_tokens: int, temperature: float, top_p: float,
                           top_k: int, repetition_penalty: float, gpu_timeout: int):
    """Main pipeline: OCR the front and/or back card image and stream the report.

    Args:
        model_name: Key into the model map built below.
        text: Custom query from the UI; not read by this function.
        image_front, image_back: Card images; at least one must be given.
        max_new_tokens, temperature, top_p, top_k, repetition_penalty:
            Sampling settings forwarded to the extraction steps.
        gpu_timeout: Not read here; it is read by ``calc_timeout_duration``,
            which is passed to the ``spaces.GPU`` decorator as ``duration``.

    Yields:
        ``(text, text)`` tuples with the output accumulated so far, the same
        string for both outputs. An error message is yielded once, and the
        generator stops, for an unknown or unavailable model or when no image
        is supplied.
    """
    # Model selection
    model_map = {
        "Chhagan-ID-OCR-v1 ⭐": (CHHAGAN_V1_AVAILABLE, processor_c1, model_c1),
        "Chhagan-DocVL-Qwen3 🔥": (CHHAGAN_QWEN3_AVAILABLE, processor_c2, model_c2),
        "CSM-DocExtract-Q4KM 🏆": (CSM_Q4KM_AVAILABLE, processor_q4km, model_q4km),
        "CSM-DocExtract-4BNB 💎": (CSM_4BNB_AVAILABLE, processor_4bnb, model_4bnb),
    }
    if model_name not in model_map:
        yield "Invalid model.", "Invalid model."
        return
    available, processor, model = model_map[model_name]
    if not available:
        yield f"{model_name} not available.", f"{model_name} not available."
        return
    if image_front is None and image_back is None:
        yield "Please upload at least one card image.", "Please upload at least one card image."
        return

    # Model 4 (4BNB) needs its system prompt. Computed once so BOTH sides get
    # it; previously only the front-card call passed it.
    sys_p = system_prompt_4bnb if model_name == "CSM-DocExtract-4BNB 💎" else None
    gen_args = (max_new_tokens, temperature, top_p, top_k, repetition_penalty)

    full_output = ""
    front_result = ""
    back_result = ""
    all_mrz_data = {}

    # ───── FRONT CARD ─────
    if image_front is not None:
        full_output, front_result, mrz_f = yield from _stream_card_side(
            model, processor, image_front,
            full_output + "# 🎴 FRONT CARD\n\n", sys_p, *gen_args)
        full_output += front_result + "\n\n"
        if mrz_f:
            all_mrz_data = mrz_f

    # ───── BACK CARD ─────
    if image_back is not None:
        full_output, back_result, mrz_b = yield from _stream_card_side(
            model, processor, image_back,
            full_output + "\n\n---\n\n# 🎴 BACK CARD\n\n", sys_p, *gen_args)
        full_output += back_result
        # MRZ found on the front takes precedence over the back.
        if mrz_b and not all_mrz_data:
            all_mrz_data = mrz_b

    # ───── UNIFIED SUMMARY ─────
    if image_front is not None and image_back is not None:
        full_output += "\n\n---\n\n"
        full_output += build_unified_summary(front_result, back_result, all_mrz_data)

    mrz_note = f"MRZ: ✅ {all_mrz_data.get('mrz_format','?')} verified" if all_mrz_data else "MRZ: ❌ Not detected"
    full_output += f"\n\n---\n\n**✨ Complete** | Model: `{model_name}` | {mrz_note} | Pipeline: OCR → Python Extract → LLM Classify\n"
    yield full_output, full_output
# ╔══════════════════════════════════════════╗
# ║ MODEL CHOICES ║
# ╚══════════════════════════════════════════╝
# Selectable models: only those whose availability flag is set, in a fixed
# display order; a single placeholder entry is used when none loaded.
model_choices = [
    label
    for is_loaded, label in (
        (CHHAGAN_V1_AVAILABLE, "Chhagan-ID-OCR-v1 ⭐"),
        (CHHAGAN_QWEN3_AVAILABLE, "Chhagan-DocVL-Qwen3 🔥"),
        (CSM_Q4KM_AVAILABLE, "CSM-DocExtract-Q4KM 🏆"),
        (CSM_4BNB_AVAILABLE, "CSM-DocExtract-4BNB 💎"),
    )
    if is_loaded
] or ["No models available"]

# Example rows: query text, front image path, back image (none supplied).
dual_card_examples = [
    [
        "Extract complete information",
        "examples/5.jpg",
        None,
    ],
    [
        "Multilingual OCR with MRZ",
        "examples/4.jpg",
        None,
    ],
    [
        "Extract profile photo and signature",
        "examples/2.jpg",
        None,
    ],
]
# ╔══════════════════════════════════════════╗
# ║ GRADIO UI ║
# ╚══════════════════════════════════════════╝
# Top-level Gradio app. Everything created inside `with demo:` is attached to
# this Blocks instance; layout follows the nesting of the Row/Column/Accordion
# context managers below.
demo = gr.Blocks(css=css, theme=steel_blue_theme)
with demo:
    gr.Markdown("# 🌍 **CSM Dual-Card ID OCR System**", elem_id="main-title")
    gr.Markdown("### *Universal Document Extraction — MRZ + Multilingual + Auto Calendar*")
    # Status line: short labels of the models whose availability flag is set.
    loaded_models = []
    if CHHAGAN_V1_AVAILABLE: loaded_models.append("ID-OCR-v1 ⭐")
    if CHHAGAN_QWEN3_AVAILABLE: loaded_models.append("DocVL-Qwen3 🔥")
    if CSM_Q4KM_AVAILABLE: loaded_models.append("Q4KM 🏆")
    if CSM_4BNB_AVAILABLE: loaded_models.append("4BNB 💎")
    model_info = f"**Loaded ({len(loaded_models)}/4):** {', '.join(loaded_models)}" if loaded_models else "⚠️ No models"
    gr.Markdown(f"**Status:** {model_info}")
    gr.Markdown("**Pipeline:** ✅ Step1: Raw OCR → ✅ Python: MRZ+English Extract → ✅ LLM: Classify+Gaps → ✅ Deduplicate")
    with gr.Row():
        # Left column: inputs (query, card images, examples, sampling settings).
        with gr.Column(scale=2):
            # Passed to generate_dual_card_ocr as its `text` argument.
            image_query = gr.Textbox(
                label="💬 Custom Query (Optional)",
                placeholder="Leave empty for automatic full extraction...",
                value=""
            )
            gr.Markdown("### 📤 Upload ID Cards")
            with gr.Row():
                image_front = gr.Image(type="pil", label="🎴 Front Card", height=250)
                image_back = gr.Image(type="pil", label="🎴 Back Card (Optional)", height=250)
            image_submit = gr.Button("🚀 Extract + Translate + Structure", variant="primary", size="lg")
            gr.Examples(
                examples=dual_card_examples,
                inputs=[image_query, image_front, image_back],
                label="📸 Sample ID Cards"
            )
            with gr.Accordion("⚙️ Advanced Settings", open=False):
                max_new_tokens = gr.Slider(label="Max new tokens", minimum=1, maximum=MAX_MAX_NEW_TOKENS, step=1, value=DEFAULT_MAX_NEW_TOKENS)
                temperature = gr.Slider(label="Temperature", minimum=0.1, maximum=4.0, step=0.1, value=0.6)
                top_p = gr.Slider(label="Top-p", minimum=0.05, maximum=1.0, step=0.05, value=0.9)
                top_k = gr.Slider(label="Top-k", minimum=1, maximum=1000, step=1, value=50)
                repetition_penalty= gr.Slider(label="Repetition penalty", minimum=1.0, maximum=2.0, step=0.05, value=1.1)
        # Right column: results, model selection and GPU-duration selection.
        with gr.Column(scale=3):
            gr.Markdown("## 📄 Extraction Results", elem_id="output-title")
            # The same text is sent to both outputs: plain textbox and rendered markdown.
            output = gr.Textbox(label="Raw Output (Streaming)", interactive=True, lines=15)
            with gr.Accordion("📝 Structured Preview", open=True):
                markdown_output = gr.Markdown(label="Formatted Result")
            model_choice = gr.Radio(
                choices=model_choices,
                label="🤖 Select Model",
                value=model_choices[0] if model_choices else None,
                info="🏆💎 = 8B Quantized (best) | 🔥 = Qwen3 Fine-tuned | ⭐ = LoRA"
            )
            with gr.Row(elem_id="gpu-duration-container"):
                with gr.Column():
                    gr.Markdown("**⏱️ GPU Duration (seconds)**")
                    radioanimated_gpu_duration = RadioAnimated(
                        choices=["60", "90", "120", "180", "240"],
                        value="180",
                        elem_id="radioanimated_gpu_duration"
                    )
                    # Hidden numeric copy of the selected duration; it is the
                    # last input of the submit handler (the gpu_timeout argument).
                    gpu_duration_state = gr.Number(value=180, visible=False)
            gr.Markdown("""
**✨ What This Extracts:**
- 🔐 MRZ: TD1/TD3/MRVA/MRVB — Python parsed, 100% accurate
- ✅ English fields: Direct from card, not modified
- 📜 Original script: Arabic/Farsi/Hindi/Chinese as-is
- 🗓️ Calendar: Shamsi/Hijri → Gregorian conversion
- 🔢 Eastern numerals: ۱۲۳ → 123 automatic
- 🔄 Front+Back: Deduplicated, MRZ-verified
""")
    # Copy the selected duration into the hidden Number whenever the radio changes.
    radioanimated_gpu_duration.change(
        fn=apply_gpu_duration,
        inputs=radioanimated_gpu_duration,
        outputs=[gpu_duration_state],
        api_visibility="private"
    )
    # Input order must match the positional parameters of generate_dual_card_ocr.
    image_submit.click(
        fn=generate_dual_card_ocr,
        inputs=[model_choice, image_query, image_front, image_back,
                max_new_tokens, temperature, top_p, top_k,
                repetition_penalty, gpu_duration_state],
        outputs=[output, markdown_output]
    )
    # Static footer text: feature matrix, supported documents, privacy note.
    gr.Markdown("""
---
### 🎯 Feature Matrix
| Feature | Method | Accuracy |
|---------|--------|---------|
| MRZ Parse (TD1/TD3/MRVA) | Python | 100% |
| English Labels Extract | Python Regex | 100% |
| Eastern Numeral Convert | Python char map | 100% |
| Shamsi/Hijri Calendar | Python library | 100% |
| Raw OCR (32+ scripts) | 8B VLM | 90%+ |
| Doc Type Classification | 8B VLM | 95%+ |
| Non-English Translation | 8B VLM | 90%+ |
| Front+Back Deduplication | Python | 100% |
### 📋 Supported Documents
🇮🇳 Aadhaar, PAN, Passport | 🇦🇪 Emirates ID | 🇸🇦 Iqama | 🇴🇲 Oman Resident Card
🌍 International Passports (MRZ) | 🚗 Driving Licences | 🇮🇷 Iranian National ID (Shamsi)
### 🔒 Privacy
All processing on-device | No data stored | GDPR compliant
""")
if __name__ == "__main__":
    # Script entry point: enable the request queue, then start the server.
    # Any startup failure is printed together with its traceback.
    print("\n🚀 STARTING...")
    try:
        queued_app = demo.queue(max_size=50)
        queued_app.launch(
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True,
            share=False,
        )
    except Exception as exc:
        import traceback
        print(f"❌ {exc}")
        traceback.print_exc()