import os import io import uuid import json import time import tempfile import unicodedata import re from dataclasses import dataclass from typing import List, Dict, Tuple import cv2 import numpy as np import torch from paddleocr import TextDetection from easyocr import Reader from rapidfuzz import fuzz import gradio as gr # ============ CORE VALIDATORS (UNCHANGED) ============ class VerhoeffValidator: d_table = [[0,1,2,3,4,5,6,7,8,9],[1,2,3,4,0,6,7,8,9,5],[2,3,4,0,1,7,8,9,5,6],[3,4,0,1,2,8,9,5,6,7],[4,0,1,2,3,9,5,6,7,8],[5,9,8,7,6,0,4,3,2,1],[6,5,9,8,7,1,0,4,3,2],[7,6,5,9,8,2,1,0,4,3],[8,7,6,5,9,3,2,1,0,4],[9,8,7,6,5,4,3,2,1,0]] p_table = [[0,1,2,3,4,5,6,7,8,9],[1,5,7,6,2,8,3,0,9,4],[5,8,0,3,7,9,6,1,4,2],[8,9,1,6,0,4,3,5,2,7],[9,4,5,3,1,2,6,8,7,0],[4,2,8,6,5,7,3,9,0,1],[2,7,9,3,8,0,6,4,1,5],[7,0,4,6,9,1,3,2,5,8]] @classmethod def validate(cls, n: str) -> bool: if not n or len(n)!=12 or not n.isdigit() or n[0] in '01': return False c=0 for i,ch in enumerate(reversed(n)): c=cls.d_table[c][cls.p_table[i%8][int(ch)]] return c==0 class PatternValidator: @staticmethod def find_aadhaar(t: str) -> List[str]: return [re.sub(r'\s','',m) for p in [r'\b[2-9]\d{3}\s?\d{4}\s?\d{4}\b', r'\b[2-9]\d{11}\b'] for m in re.findall(p,t) if VerhoeffValidator.validate(re.sub(r'\s','',m))] @staticmethod def find_pan(t: str) -> List[str]: return list(set(re.findall(r'\b[A-Z]{3}[PCHFATBLJG][A-Z]\d{4}[A-Z]\b', t.upper()))) class TextNormalizer: OCR_CORRECTIONS = {'O':'0','o':'0','l':'1','I':'1','Z':'2','z':'2','S':'5','G':'6','b':'6','T':'7','B':'8','g':'9','q':'9'} @staticmethod def normalize(text: str, aggressive: bool=False) -> str: if not text: return "" text = ''.join(ch for ch in unicodedata.normalize('NFKC',text) if unicodedata.category(ch)[0]!='C') if aggressive: def fix(m): s=m.group(0) for o,n in TextNormalizer.OCR_CORRECTIONS.items(): s=s.replace(o,n) return s text = re.sub(r'\b[0-9OolIZzSGbTBgq]{4,}\b', fix, text) return re.sub(r'\s+',' ',re.sub(r'[^\w\s\u0900-\u097F.,/-]','',text)).strip() # ============ CONFIGURATION ============ @dataclass class Config: fuzzy_threshold: int = 80 min_keywords: int = 1 max_image_dim: int = 2000 languages: List[str] = None doc_keywords: Dict[str, List[str]] = None def __post_init__(self): if self.languages is None: self.languages = ['en','hi'] if self.doc_keywords is None: self.doc_keywords = { "Aadhaar": ["uidai","aadhaar","aadhar","government","india","mera","naam","pehchaan","यूआईडीएआई","आधार","भारत","सरकार","जन्म","तिथि"], "PAN": ["permanent","account","number","income","tax","incometaxindia","pan","स्थायी","खाता","आयकर","पिता","नाम"], "Driving_License": ["driving","licence","motor","vehicles","rto","mcwg","lmv","ड्राइविंग","वाहन","परिवहन","चालविण्याचा","परवाना"], "Passport": ["passport","republic","india","ministry","external","affairs","पासपोर्ट","गणराज्य","विदेश","मंत्रालय"], "Ration_Card": ["ration","card","food","civil","supplies","apl","bpl","राशन","कार्ड","खाद्य","नागरी","पुरवठा"] } # ============ MAIN PIPELINE ============ class DocumentOCRVerifier: def __init__(self, config: Config=None): self.cfg = config or Config() # initialize PaddleOCR detector and EasyOCR reader try: self.detector = TextDetection(model_name="PP-OCRv5_mobile_det") except Exception: self.detector = None self.reader = Reader(self.cfg.languages, gpu=torch.cuda.is_available()) def _preprocess(self, img: np.ndarray) -> np.ndarray: img = self._resize(img) img = self._deskew(img) return self._enhance(img) def _resize(self, img: np.ndarray) -> np.ndarray: h,w = img.shape[:2] if max(h,w) > self.cfg.max_image_dim: scale = self.cfg.max_image_dim / max(h,w) img = cv2.resize(img, (int(w*scale), int(h*scale)), interpolation=cv2.INTER_AREA) return img def _deskew(self, img: np.ndarray) -> np.ndarray: gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU) contours,_ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if contours: rect = cv2.minAreaRect(max(contours, key=cv2.contourArea)) angle = rect[-1] if angle < -45: angle = 90 + angle elif angle > 45: angle -= 90 if abs(angle) > 0.5: h,w = img.shape[:2] M = cv2.getRotationMatrix2D((w//2, h//2), angle, 1.0) img = cv2.warpAffine(img, M, (w,h), borderValue=(255,255,255)) return img def _enhance(self, img: np.ndarray) -> np.ndarray: denoised = cv2.fastNlMeansDenoisingColored(img, None, 10, 10, 7, 21) lab = cv2.cvtColor(denoised, cv2.COLOR_BGR2LAB) l,a,b = cv2.split(lab) l = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8)).apply(l) enhanced = cv2.cvtColor(cv2.merge([l,a,b]), cv2.COLOR_LAB2BGR) kernel = np.array([[0,-1,0],[-1,5,-1],[0,-1,0]]) return cv2.addWeighted(cv2.filter2D(enhanced, -1, kernel), 0.6, enhanced, 0.4, 0) def _extract_keywords(self, text: str) -> List[str]: if not text: return [] return [t for t in re.split(r'\s+', text.strip()) if t] def _classify(self, text: str) -> Tuple[str, float, List[str]]: norm_text = TextNormalizer.normalize(text, aggressive=True) scores = {} for doc_type, keywords in self.cfg.doc_keywords.items(): matched = [] for kw in keywords: if kw.lower() in norm_text.lower(): matched.append(kw); continue words = norm_text.lower().split() for i,w in enumerate(words): if fuzz.ratio(kw.lower(), w) >= self.cfg.fuzzy_threshold: matched.append(kw); break phrase = " ".join(words[i:min(i+5, len(words))]) if fuzz.ratio(kw.lower(), phrase) >= self.cfg.fuzzy_threshold: matched.append(kw); break score = len(matched) if doc_type == "Aadhaar" and PatternValidator.find_aadhaar(text): score = 100 elif doc_type == "PAN" and PatternValidator.find_pan(text): score = 100 scores[doc_type] = {"score": score, "matched": matched} winner = max(scores.items(), key=lambda x: x[1]["score"]) if winner[1]["score"] >= self.cfg.min_keywords: conf = 0.95 if winner[1]["score"] == 100 else min(0.90, len(winner[1]["matched"])/len(self.cfg.doc_keywords[winner[0]]) + 0.3) return winner[0], conf, winner[1]["matched"] return "UNCLASSIFIED", 0.0, [] def verify(self, image_path: str, user_keywords: List[str]) -> Dict: img = cv2.imread(image_path) if img is None: return {"error": "Image not found", "imagePath": image_path} img = self._preprocess(img) # Region-based OCR with word-level granularity ocr_keywords = [] all_text = "" if self.detector: try: regions = self.detector.predict(input=image_path, batch_size=1) except Exception: regions = [] else: regions = [] # If detector provided regions, use them; otherwise fallback to whole-image read if regions: for res in regions: for poly, score in zip(res.get("dt_polys", []), res.get("dt_scores", [])): pts = np.array(poly, dtype=np.int32) x,y,w,h = cv2.boundingRect(pts) cropped = img[y:y+h, x:x+w] texts = self.reader.readtext(cropped, detail=0) if texts: text = texts[0] words = self._extract_keywords(text) ocr_keywords.extend(words) all_text += " " + text else: # fallback: run reader on whole image texts = self.reader.readtext(img, detail=0) if texts: for t in texts: ocr_keywords.extend(self._extract_keywords(t)) all_text += " " + t # Classification doc_type, accuracy, matched_keywords = self._classify(all_text) # Verification - match against combined text for phrase support # Preserve raw input keywords (split externally) but perform exact matching on the combined OCR text without further altering user's internal spacing raw_input_keywords = user_keywords # Do minimal trimming for matching (only strip outer whitespace) minimal_norm_user_keywords = [kw.strip() for kw in raw_input_keywords if kw is not None] exact_matches = list(set([kw for kw in minimal_norm_user_keywords if kw.lower() in all_text.lower()])) status = "verified" if exact_matches else "not_verified" return { "documentType": doc_type, "documentTypeAccuracy": round(accuracy, 4), "ocrKeywords": ocr_keywords, "inputUserKeywords": minimal_norm_user_keywords, "rawInputUserKeywords": raw_input_keywords, "exactMatchingKeywords": exact_matches, "verificationStatus": status, "imagePath": image_path } # ============ APP ============ verifier = DocumentOCRVerifier() def save_upload_to_tmp(uploaded_file) -> str: """ Save an uploaded file-like object (from Gradio) to /tmp with a unique name. Returns absolute path. """ if isinstance(uploaded_file, str) and os.path.exists(uploaded_file): return uploaded_file tmp_dir = "/tmp/ocr_app" os.makedirs(tmp_dir, exist_ok=True) ext = ".jpg" # preserve original extension if available if hasattr(uploaded_file, "name") and uploaded_file.name: _, e = os.path.splitext(uploaded_file.name) if e: ext = e fname = f"{int(time.time())}_{uuid.uuid4().hex}{ext}" out_path = os.path.join(tmp_dir, fname) # uploaded_file could be bytes or file path if isinstance(uploaded_file, bytes): with open(out_path, "wb") as f: f.write(uploaded_file) else: # Gradio sometimes gives a path try: with open(uploaded_file, "rb") as src, open(out_path, "wb") as dst: dst.write(src.read()) except Exception: # last resort: try to read as numpy array (if provided) try: import PIL.Image as Image im = Image.open(uploaded_file).convert("RGB") im.save(out_path) except Exception: raise return out_path def display_uploaded_image(image): """ Immediately display the uploaded image without processing. """ if image is None: return None return image def run_ocr(image, keywords_raw: str): """ image: uploaded file path or bytes (Gradio Image component with type='file' or 'numpy') keywords_raw: raw string entered by user. Split by comma EXACTLY to form keywords. Preserve internal spacing. """ if image is None: return "