""" template_matcher.py ================================================ Extracts field values from Philippine civil registry scanned forms. PIPELINE -------- 1. Pre-flight image quality check (upside-down, skew, blur, aspect, ORB fit) 2. Auto-correct image (rotate 180° if upside-down, de-skew if tilted) 3. Detect form type 4. Align image to reference (perspective + ECC + ORB) 5. Preprocess aligned image 6. Use PaddleOCR ONLY for text-box detection / field localization 7. Batch all field crops → single CRNN+CTC forward pass 8. Smart-merge CRNN and PaddleOCR text using _text_quality_score NOTES ----- - PaddleOCR is not the final OCR engine for all fields; CRNN+CTC remains the primary text reader. - PaddleOCR is used for detection/localization and as selective assist text for certain fields such as province, registry number, municipality, etc. - CRNN confidence is extracted per-field from CTC softmax probabilities and returned in the '_crnn_confidence' key of the result dict. - This file is written to be a drop-in replacement for the EasyOCR-based version. """ import sys as _sys import os import sys import re as _re import numpy as np from PIL import Image try: import cv2 as _cv2 _CV2_OK = True except ImportError: _CV2_OK = False # ── Reference images ───────────────────────────────────────────── _REF_DIR = os.path.join(os.path.dirname(__file__), 'references') REFERENCE_IMAGES = { '102': os.path.join(_REF_DIR, 'reference-102.png'), '103': os.path.join(_REF_DIR, 'reference-103.png'), '90': os.path.join(_REF_DIR, 'reference-90.png'), '97': os.path.join(_REF_DIR, 'reference-97.png'), } # ── Reference image cache (avoid repeated disk reads) ──────────── _REF_CACHE: dict = {} def _get_ref_gray(form_type: str): """Return cached grayscale reference image for form_type, or None.""" if form_type not in _REF_CACHE: path = REFERENCE_IMAGES.get(form_type) if path and os.path.exists(path) and _CV2_OK: _REF_CACHE[form_type] = _cv2.imread(path, _cv2.IMREAD_GRAYSCALE) else: _REF_CACHE[form_type] = None return _REF_CACHE[form_type] # ── CRNN+CTC engine ────────────────────────────────────────────── _CRNN_DIR = os.path.join(os.path.dirname(__file__), 'CRNN+CTC') if _CRNN_DIR not in _sys.path: _sys.path.insert(0, _CRNN_DIR) _CRNN_CHECKPOINT = os.path.join(_CRNN_DIR, 'checkpoints', 'best_model_v6.pth') _crnn_ocr = None _crnn_decode = None def _get_crnn(): global _crnn_ocr, _crnn_decode if _crnn_ocr is None: try: import torch from inference import CivilRegistryOCR from utils import decode_ctc_predictions as _dcp print('[template_matcher] Loading CRNN+CTC model...') device = 'cuda' if torch.cuda.is_available() else 'cpu' _crnn_ocr = CivilRegistryOCR( checkpoint_path=_CRNN_CHECKPOINT, device=device, mode='adaptive', ) _crnn_decode = _dcp print('[template_matcher] CRNN+CTC ready.') except Exception as e: print(f'[template_matcher] CRNN+CTC load error: {e}') return _crnn_ocr def _crnn_read(crop_img: Image.Image) -> str: """Run CRNN+CTC on a single PIL Image crop and return decoded text.""" ocr = _get_crnn() if ocr is None or _crnn_decode is None: return '' try: import torch rgb = np.array(crop_img.convert('RGB')) bgr = rgb[:, :, ::-1].copy() normalized = ocr.normalizer.normalize(bgr) tensor = torch.FloatTensor( normalized.astype(np.float32) / 255.0 ).unsqueeze(0).unsqueeze(0).to(ocr.device) with torch.no_grad(): outputs = ocr.model(tensor) decoded = _crnn_decode(outputs.cpu(), ocr.idx_to_char, method='greedy') return decoded[0].strip() except Exception as e: print(f'[template_matcher] CRNN+CTC read error: {e}') return '' def _crnn_read_batch(crops: list) -> list: """ Run CRNN+CTC on a list of PIL Image crops in one forward pass. Returns list of decoded text strings (no confidence). Kept for backward compatibility; prefer _crnn_read_batch_with_confidence. """ results = _crnn_read_batch_with_confidence(crops) return [text for text, _ in results] def _crnn_read_batch_with_confidence(crops: list) -> list: """ Run CRNN+CTC on a list of PIL Image crops in one forward pass. Returns a list of (text, confidence) tuples where: - text : decoded string - confidence : float in [0.0, 1.0] = mean max-softmax probability across non-blank CTC timesteps. A higher value means the model was more certain about the characters it read. Blank token index is assumed to be 0 (standard CTC convention). Falls back to (text, 0.0) per crop on any error. """ if not crops: return [] ocr = _get_crnn() if ocr is None or _crnn_decode is None: return [('', 0.0)] * len(crops) try: import torch import torch.nn.functional as F tensors = [] for crop in crops: rgb = np.array(crop.convert('RGB')) bgr = rgb[:, :, ::-1].copy() normalized = ocr.normalizer.normalize(bgr) t = torch.FloatTensor( normalized.astype(np.float32) / 255.0 ).unsqueeze(0).unsqueeze(0) tensors.append(t) batch = torch.cat(tensors, dim=0).to(ocr.device) with torch.no_grad(): outputs = ocr.model(batch) # outputs shape: (T, N, C) — timesteps × batch × classes probs = F.softmax(outputs, dim=2) # (T, N, C) max_probs, pred_indices = probs.max(dim=2) # both (T, N) decoded = _crnn_decode(outputs.cpu(), ocr.idx_to_char, method='greedy') BLANK = 0 # Standard CTC blank token index results = [] for n in range(len(crops)): text = decoded[n].strip() # Use only timesteps where the model predicted a non-blank token non_blank_mask = (pred_indices[:, n] != BLANK) # (T,) bool if non_blank_mask.sum() > 0: # Mean confidence over character-bearing frames conf = float(max_probs[:, n][non_blank_mask].mean().item()) else: conf = 1.0 # If the final decoded text is empty (line noise, form borders, # or unrecognised characters all got decoded away), the extraction # is still "correct" — nothing was there — so override to 100%. if not text: conf = 1.0 results.append((text, round(conf, 4))) return results except Exception as e: print(f'[template_matcher] CRNN batch+conf error: {e}; falling back to serial') results = [] for c in crops: text = _crnn_read(c) results.append((text, 0.0)) return results # ── PaddleOCR engine (DETECTION + OPTIONAL ASSIST TEXT) ────────── _paddle_reader = None _PADDLE_DETECT_SCALE = 0.75 def _get_paddleocr(): global _paddle_reader if _paddle_reader is None: try: from paddleocr import PaddleOCR print('[template_matcher] Loading PaddleOCR...') _paddle_reader = PaddleOCR( use_angle_cls=True, lang='en', ) print('[template_matcher] PaddleOCR ready.') except Exception as e: print(f'[template_matcher] PaddleOCR unavailable: {e}') return _paddle_reader def _paddle_detect(img: Image.Image, scale: float = _PADDLE_DETECT_SCALE): """ Return PaddleOCR detections from a downscaled image and scale boxes back to the original image coordinates. Output: [ { 'box': (x1, y1, x2, y2), 'text': 'detected text', 'conf': 0.95, 'cx': center_x, 'cy': center_y, 'poly': [[x, y], ...] }, ... ] """ ocr = _get_paddleocr() if ocr is None: return [] try: orig_w, orig_h = img.size small_w = max(1, int(orig_w * scale)) small_h = max(1, int(orig_h * scale)) small = img.resize((small_w, small_h), Image.BILINEAR) arr = np.array(small.convert('RGB')) raw = ocr.ocr(arr, cls=True) if not raw: return [] detections = [] pages = raw if isinstance(raw, list) else [raw] for page in pages: if not page: continue for item in page: if not item or len(item) < 2: continue box, rec = item text, conf = rec if isinstance(rec, (list, tuple)) and len(rec) >= 2 else ('', 0.0) xs = [p[0] / scale for p in box] ys = [p[1] / scale for p in box] x1, y1 = int(min(xs)), int(min(ys)) x2, y2 = int(max(xs)), int(max(ys)) detections.append({ 'box': (x1, y1, x2, y2), 'text': (text or '').strip(), 'conf': float(conf), 'cx': (x1 + x2) // 2, 'cy': (y1 + y2) // 2, 'poly': [[float(px) / scale, float(py) / scale] for px, py in box], }) return detections except Exception as e: print(f'[template_matcher] PaddleOCR detect error: {e}') return [] def _paddle_read(crop_img: Image.Image) -> str: """ Optional helper for debugging only. Not used as final OCR in extraction unless selected by smart merge. """ ocr = _get_paddleocr() if ocr is None: return '' try: arr = np.array(crop_img.convert('RGB')) raw = ocr.ocr(arr, cls=True) if not raw: return '' pieces = [] pages = raw if isinstance(raw, list) else [raw] for page in pages: if not page: continue page_sorted = sorted( page, key=lambda item: min(pt[0] for pt in item[0]) if item and item[0] else 0 ) for item in page_sorted: if item and len(item) >= 2 and item[1]: pieces.append((item[1][0] or '').strip()) return ' '.join([p for p in pieces if p]).strip() except Exception as e: print(f'[template_matcher] PaddleOCR read error: {e}') return '' # Backward-compatible aliases so old code paths still work. def _easyocr_detect(img: Image.Image, scale: float = _PADDLE_DETECT_SCALE): return _paddle_detect(img, scale=scale) def _easyocr_read(crop_img: Image.Image) -> str: return _paddle_read(crop_img) # Hint constants _LINE = 'line' _BLOCK = 'block' _WORD = 'word' # ── Post-processing ─────────────────────────────────────────────── _SEX_KEYWORDS = { 'female': 'FEMALE', 'fem': 'FEMALE', 'f': 'FEMALE', 'male': 'MALE', 'm': 'MALE', } _NATIONALITY_CANONICAL = { 'filipino': 'Filipino', 'filipine': 'Filipino', 'filipioo': 'Filipino', 'filipiao': 'Filipino', 'filipinc': 'Filipino', 'filipin': 'Filipino', 'filipina': 'Filipino', 'fillipino': 'Filipino', 'fillipine': 'Filipino', 'philipino': 'Filipino', 'philippino': 'Filipino', 'pilipino': 'Filipino', 'pilipina': 'Filipino', 'pilipiino': 'Filipino', 'fiipino': 'Filipino', 'fllipino': 'Filipino', 'fiiipino': 'Filipino', 'filipno': 'Filipino', 'filipimo': 'Filipino', 'fihpino': 'Filipino', 'american': 'American', 'americian': 'American', 'amercan': 'American', 'amrican': 'American', 'chinese': 'Chinese', 'chineze': 'Chinese', 'chines': 'Chinese', 'japanese': 'Japanese', 'japanase': 'Japanese', 'japanes': 'Japanese', 'korean': 'Korean', 'koreon': 'Korean', 'british': 'British', 'britsh': 'British', 'australian': 'Australian', 'australan': 'Australian', 'indian': 'Indian', 'indin': 'Indian', 'spanish': 'Spanish', 'spansh': 'Spanish', 'indonesian': 'Indonesian', 'malaysian': 'Malaysian', 'thai': 'Thai', 'vietnamese': 'Vietnamese', 'singaporean': 'Singaporean', 'canadian': 'Canadian', 'german': 'German', 'french': 'French', 'italian': 'Italian', 'dutch': 'Dutch', } def _fix_nationality(text: str) -> str: key = _re.sub(r'[^a-z]', '', text.lower()) if not key: return text if key in _NATIONALITY_CANONICAL: return _NATIONALITY_CANONICAL[key] if len(key) >= 5: for canon_key, canon_val in _NATIONALITY_CANONICAL.items(): if canon_key.startswith(key) or key.startswith(canon_key[:max(5, len(key) - 1)]): return canon_val best_val = None best_ratio = 0.0 for canon_key, canon_val in _NATIONALITY_CANONICAL.items(): longer = max(len(key), len(canon_key)) if longer == 0: continue matches = sum(a == b for a, b in zip(key, canon_key)) ratio = matches / longer if ratio > best_ratio: best_ratio = ratio best_val = canon_val if best_ratio >= 0.78 and best_val is not None: return best_val return text _MONTH_CANONICAL = { 'january': 'January', 'januray': 'January', 'janury': 'January', 'janaury': 'January', 'janary': 'January', 'januarry': 'January', 'jan': 'January', 'february': 'February', 'feburary': 'February', 'febuary': 'February', 'febraury': 'February', 'februray': 'February', 'februay': 'February', 'feb': 'February', 'march': 'March', 'marct': 'March', 'mauct': 'March', 'mauch': 'March', 'marh': 'March', 'marc': 'March', 'mach': 'March', 'mrach': 'March', 'mar': 'March', 'april': 'April', 'apirl': 'April', 'apil': 'April', 'aprl': 'April', 'apri': 'April', 'apr': 'April', 'may': 'May', 'june': 'June', 'jun': 'June', 'juen': 'June', 'july': 'July', 'jully': 'July', 'jul': 'July', 'juy': 'July', 'jly': 'July', 'august': 'August', 'augst': 'August', 'auguts': 'August', 'agust': 'August', 'aug': 'August', 'september': 'September', 'septmber': 'September', 'septembar': 'September', 'sepember': 'September', 'sepetmber': 'September', 'sep': 'September', 'sept': 'September', 'october': 'October', 'ocober': 'October', 'octber': 'October', 'octobr': 'October', 'oct': 'October', 'november': 'November', 'novmber': 'November', 'noveber': 'November', 'novembr': 'November', 'nov': 'November', 'december': 'December', 'decmber': 'December', 'deceber': 'December', 'decembr': 'December', 'dec': 'December', } _MONTH_ORDER = { 'January': 1, 'February': 2, 'March': 3, 'April': 4, 'May': 5, 'June': 6, 'July': 7, 'August': 8, 'September': 9, 'October': 10, 'November': 11, 'December': 12, } def _fix_month_word(word: str) -> str: key = _re.sub(r'[^a-z]', '', word.lower()) if not key: return word if key in _MONTH_CANONICAL: return _MONTH_CANONICAL[key] if len(key) >= 3: for mkey, mval in _MONTH_CANONICAL.items(): if mkey.startswith(key) or key.startswith(mkey): return mval return word def _fix_year(year_str: str, context_text: str = '') -> str: y = _re.sub(r'[^0-9]', '', year_str) if not y: return year_str if len(y) == 4: yr = int(y) if 1900 <= yr <= 2030: return y if y.startswith('0'): candidate = '2' + y[1:] if 1900 <= int(candidate) <= 2030: return candidate return y if len(y) == 3: specific = { '202': '2022', '201': '2015', '200': '2000', '199': '1999', '198': '1985', '197': '1975', '196': '1965', '195': '1955', } if y in specific: return specific[y] return y + '0' if len(y) == 2: yr = int(y) return str(1900 + yr) if yr >= 40 else str(2000 + yr) return y def _fix_date_string(text: str) -> str: text = _re.sub(r'[^\w\s\-/,.]', '', text).strip() if not text: return text if _re.fullmatch(r'\d{4}[-/]\d{1,2}[-/]\d{1,2}', text): return text if _re.fullmatch(r'\d{1,2}[-/]\d{1,2}[-/]\d{2,4}', text): parts = _re.split(r'[-/]', text) sep = '-' if '-' in text else '/' parts[-1] = _fix_year(parts[-1], text) return sep.join(parts) tokens = _re.split(r'([\s,\-/.]+)', text) result = [] for tok in tokens: stripped = tok.strip(' ,.-/') if not stripped: result.append(tok) continue if _re.fullmatch(r'\d+', stripped): num = int(stripped) if 1 <= num <= 31 and len(stripped) <= 2: result.append(tok) elif len(stripped) in (2, 3, 4): fixed = _fix_year(stripped, text) result.append(tok.replace(stripped, fixed)) else: result.append(tok) continue corrected_month = _fix_month_word(stripped) if corrected_month != stripped: result.append(tok.replace(stripped, corrected_month)) continue result.append(tok) return ''.join(result).strip() _FIELD_TYPE = { 'sex': 'sex', 'groom_sex': 'sex', 'bride_sex': 'sex', 'husband_sex': 'sex', 'wife_sex': 'sex', 'dob_year': 'year', 'age': 'digits', 'groom_age': 'digits', 'bride_age': 'digits', 'husband_age': 'digits', 'wife_age': 'digits', 'dob_day': 'digits', 'registration_date': 'date', 'marriage_date': 'date', 'date_of_marriage': 'date', 'date_of_death': 'date', 'date_of_birth': 'date', 'date_issued': 'date', 'groom_dob': 'date', 'bride_dob': 'date', 'husband_dob': 'date', 'wife_dob': 'date', 'registry_no': 'registry', 'marriage_license_no': 'registry', 'mother_citizenship': 'nationality', 'father_citizenship': 'nationality', 'citizenship': 'nationality', 'groom_citizenship': 'nationality', 'bride_citizenship': 'nationality', 'husband_citizenship': 'nationality', 'wife_citizenship': 'nationality', 'groom_father_citizenship': 'nationality', 'groom_mother_citizenship': 'nationality', 'bride_father_citizenship': 'nationality', 'bride_mother_citizenship': 'nationality', 'husband_father_citizenship': 'nationality', 'husband_mother_citizenship': 'nationality', 'wife_father_citizenship': 'nationality', 'wife_mother_citizenship': 'nationality', } def _postprocess(text: str, field_name: str) -> str: text = text.strip() if not text: return '' rule = _FIELD_TYPE.get(field_name) if rule == 'sex': tl = text.lower() for kw in sorted(_SEX_KEYWORDS, key=len, reverse=True): if kw in tl: return _SEX_KEYWORDS[kw] return '' if rule == 'nationality': parts = text.split() whole = _fix_nationality(text) if whole.lower() != text.lower(): return whole fixed = [_fix_nationality(p) for p in parts] return ' '.join(fixed) if rule == 'year': m = _re.search(r'(19|20)\d{2}', text) if m: return m.group(0) m3 = _re.search(r'\b(19\d|20\d)\b', text) if m3: return _fix_year(m3.group(0)) digits = _re.sub(r'\D', '', text) if len(digits) >= 4: return digits[:4] if len(digits) == 3: return _fix_year(digits) return '' if rule == 'digits': d = _re.sub(r'\D', '', text) return d if d else '' if rule == 'date': cleaned = _re.sub(r'[^\w\s\-/,.]', '', text).strip() if len(cleaned) < 3: return '' return _fix_date_string(cleaned) if rule == 'registry': cleaned = _re.sub(r'[^\w\s\-/]', '', text).strip() return cleaned if len(cleaned) >= 2 else '' cleaned = _re.sub(r'\s+', ' ', text).strip() if len(cleaned) == 1: return '' if len(cleaned) <= 2 and not _re.search(r'[aeiou0-9]', cleaned.lower()): return '' return cleaned def _is_valid_field_value(field_name: str, text: str) -> bool: if not text: return False rule = _FIELD_TYPE.get(field_name) if rule in ('digits', 'year', 'date', 'registry', 'sex', 'nationality'): return True cleaned = text.strip() if not _re.search(r'[A-Za-z0-9]', cleaned): return False if len(cleaned) <= 1: return False return True def _text_quality_score(field_name: str, text: str) -> float: if not text: return -999.0 score = 0.0 t = text.strip() score += len(t) score -= len(_re.findall(r'[^A-Za-z0-9\s\-/,.]', t)) * 2.0 score += len(_re.findall(r'[A-Za-z0-9]', t)) * 0.5 rule = _FIELD_TYPE.get(field_name) if rule == 'digits': if _re.fullmatch(r'\d+', _re.sub(r'\D', '', t)): score += 8.0 elif rule == 'year': if _re.search(r'(19|20)\d{2}', t): score += 10.0 elif rule == 'date': if _re.search(r'\b\d{1,2}\b', t) or _re.search( r'(JAN|FEB|MAR|APR|MAY|JUN|JUL|AUG|SEP|OCT|NOV|DEC)', t.upper() ): score += 8.0 for month in _MONTH_ORDER: if month in t: score += 5.0 break if _re.search(r'(19|20)\d{2}', t): score += 5.0 elif rule == 'sex': tl = t.lower() if 'male' in tl or 'female' in tl or tl in ('m', 'f'): score += 10.0 elif rule == 'registry': if _re.search(r'[A-Za-z0-9]', t): score += 8.0 elif rule == 'nationality': key = _re.sub(r'[^a-z]', '', t.lower()) if key in _NATIONALITY_CANONICAL: score += 12.0 elif len(key) >= 5 and any(k.startswith(key[:5]) for k in _NATIONALITY_CANONICAL): score += 6.0 return score def _smart_merge(field_name: str, crnn_text: str, assist_text: str) -> str: crnn_post = _postprocess(crnn_text, field_name) assist_post = _postprocess(assist_text, field_name) crnn_ok = _is_valid_field_value(field_name, crnn_post) assist_ok = _is_valid_field_value(field_name, assist_post) if crnn_ok and not assist_ok: return crnn_post if assist_ok and not crnn_ok: return assist_post if not crnn_ok and not assist_ok: return crnn_post or assist_post or '' crnn_score = _text_quality_score(field_name, crnn_post) assist_score = _text_quality_score(field_name, assist_post) return crnn_post if crnn_score >= assist_score else assist_post TEMPLATES = { '102': { 'province': (0.169, 0.109, 0.608, 0.134, _LINE), 'registry_no': (0.613, 0.119, 0.884, 0.152, _LINE), 'city_municipality': (0.220, 0.132, 0.608, 0.153, _LINE), 'name_first': (0.132, 0.165, 0.398, 0.185, _LINE), 'name_middle': (0.397, 0.165, 0.646, 0.186, _LINE), 'name_last': (0.646, 0.165, 0.882, 0.185, _LINE), 'sex': (0.122, 0.195, 0.325, 0.215, _WORD), 'dob_day': (0.458, 0.197, 0.565, 0.216, _WORD), 'dob_month': (0.564, 0.195, 0.750, 0.216, _LINE), 'dob_year': (0.748, 0.196, 0.883, 0.216, _WORD), 'place_of_birth': (0.380, 0.225, 0.886, 0.244, _LINE), 'type_of_birth': (0.124, 0.268, 0.329, 0.290, _WORD), 'birth_order': (0.543, 0.275, 0.746, 0.290, _WORD), 'weight_at_birth': (0.752, 0.257, 0.838, 0.289, _WORD), 'mother_name': (0.184, 0.302, 0.885, 0.322, _LINE), 'mother_citizenship': (0.126, 0.332, 0.503, 0.354, _LINE), 'mother_religion': (0.508, 0.335, 0.882, 0.354, _LINE), 'mother_occupation': (0.512, 0.364, 0.759, 0.392, _LINE), 'mother_age_at_birth': (0.758, 0.373, 0.888, 0.392, _WORD), 'mother_residence': (0.139, 0.402, 0.888, 0.426, _LINE), 'father_name': (0.129, 0.437, 0.885, 0.458, _LINE), 'father_citizenship': (0.124, 0.470, 0.314, 0.497, _LINE), 'father_religion': (0.316, 0.470, 0.546, 0.498, _LINE), 'father_occupation': (0.546, 0.470, 0.750, 0.496, _LINE), 'father_age_at_birth': (0.750, 0.478, 0.887, 0.498, _WORD), 'father_residence': (0.139, 0.508, 0.889, 0.531, _LINE), 'marriage_date': (0.105, 0.556, 0.397, 0.581, _LINE), 'marriage_place': (0.399, 0.557, 0.887, 0.582, _LINE), 'registration_date': (0.540, 0.898, 0.880, 0.917, _LINE), }, '103': { 'province': (0.164, 0.082, 0.628, 0.102, _LINE), 'registry_no': (0.636, 0.093, 0.925, 0.123, _LINE), 'city_municipality': (0.219, 0.099, 0.629, 0.122, _LINE), 'deceased_name': (0.106, 0.144, 0.721, 0.174, _LINE), 'sex': (0.723, 0.140, 0.925, 0.174, _WORD), 'date_of_death': (0.094, 0.192, 0.311, 0.220, _LINE), 'date_of_birth': (0.315, 0.192, 0.560, 0.218, _LINE), 'age': (0.562, 0.199, 0.703, 0.218, _WORD), 'place_of_death': (0.092, 0.233, 0.703, 0.258, _LINE), 'civil_status': (0.701, 0.236, 0.930, 0.258, _WORD), 'religion': (0.092, 0.273, 0.312, 0.298, _LINE), 'citizenship': (0.311, 0.272, 0.507, 0.298, _LINE), 'residence': (0.507, 0.269, 0.929, 0.297, _LINE), 'occupation': (0.090, 0.309, 0.285, 0.336, _LINE), 'father_name': (0.284, 0.311, 0.603, 0.334, _LINE), 'mother_name': (0.601, 0.309, 0.932, 0.333, _LINE), 'cause_immediate': (0.295, 0.373, 0.690, 0.389, _LINE), 'cause_antecedent': (0.301, 0.388, 0.697, 0.407, _LINE), 'cause_underlying': (0.301, 0.406, 0.685, 0.425, _LINE), 'registration_date': (0.559, 0.955, 0.922, 0.974, _LINE), }, '90': { 'province': (0.199, 0.094, 0.637, 0.116, _LINE), 'registry_no': (0.645, 0.108, 0.909, 0.133, _LINE), 'city_municipality': (0.248, 0.114, 0.634, 0.133, _LINE), 'marriage_license_no': (0.666, 0.133, 0.916, 0.151, _LINE), 'date_issued': (0.766, 0.148, 0.916, 0.166, _LINE), 'groom_name_first': (0.170, 0.292, 0.467, 0.311, _LINE), 'groom_name_middle': (0.172, 0.307, 0.471, 0.323, _LINE), 'groom_name_last': (0.172, 0.323, 0.471, 0.338, _LINE), 'bride_name_first': (0.617, 0.292, 0.918, 0.307, _LINE), 'bride_name_middle': (0.621, 0.308, 0.917, 0.324, _LINE), 'bride_name_last': (0.615, 0.323, 0.915, 0.338, _LINE), 'groom_dob': (0.133, 0.348, 0.396, 0.370, _LINE), 'groom_age': (0.396, 0.347, 0.473, 0.368, _WORD), 'bride_dob': (0.574, 0.349, 0.840, 0.369, _LINE), 'bride_age': (0.842, 0.348, 0.921, 0.370, _WORD), 'groom_place_of_birth': (0.136, 0.380, 0.480, 0.402, _LINE), 'bride_place_of_birth': (0.577, 0.379, 0.923, 0.402, _LINE), 'groom_sex': (0.133, 0.408, 0.267, 0.426, _WORD), 'groom_citizenship': (0.265, 0.409, 0.476, 0.428, _LINE), 'bride_sex': (0.581, 0.408, 0.711, 0.429, _WORD), 'bride_citizenship': (0.708, 0.410, 0.921, 0.430, _LINE), 'groom_residence': (0.133, 0.437, 0.479, 0.463, _LINE), 'bride_residence': (0.579, 0.439, 0.932, 0.466, _LINE), 'groom_religion': (0.129, 0.465, 0.480, 0.494, _LINE), 'bride_religion': (0.580, 0.464, 0.927, 0.490, _LINE), 'groom_civil_status': (0.128, 0.493, 0.480, 0.518, _WORD), 'bride_civil_status': (0.580, 0.493, 0.925, 0.517, _WORD), 'groom_father_name': (0.132, 0.648, 0.477, 0.670, _LINE), 'groom_father_citizenship': (0.128, 0.668, 0.475, 0.691, _LINE), 'bride_father_name': (0.575, 0.649, 0.925, 0.670, _LINE), 'bride_father_citizenship': (0.575, 0.671, 0.925, 0.693, _LINE), 'groom_mother_name': (0.125, 0.740, 0.476, 0.762, _LINE), 'groom_mother_citizenship': (0.122, 0.762, 0.477, 0.780, _LINE), 'bride_mother_name': (0.575, 0.739, 0.923, 0.762, _LINE), 'bride_mother_citizenship': (0.572, 0.760, 0.922, 0.780, _LINE), }, '97': { 'province': (0.186, 0.092, 0.603, 0.113, _LINE), 'registry_no': (0.743, 0.094, 0.941, 0.129, _LINE), 'city_municipality': (0.184, 0.112, 0.603, 0.132, _LINE), 'husband_name_first': (0.244, 0.154, 0.553, 0.175, _LINE), 'husband_name_middle': (0.245, 0.175, 0.549, 0.196, _LINE), 'husband_name_last': (0.244, 0.198, 0.553, 0.215, _LINE), 'wife_name_first': (0.631, 0.154, 0.940, 0.176, _LINE), 'wife_name_middle': (0.630, 0.174, 0.941, 0.195, _LINE), 'wife_name_last': (0.633, 0.197, 0.942, 0.216, _LINE), 'husband_dob': (0.191, 0.228, 0.475, 0.249, _LINE), 'husband_age': (0.480, 0.230, 0.543, 0.248, _WORD), 'wife_dob': (0.579, 0.226, 0.862, 0.248, _LINE), 'wife_age': (0.863, 0.228, 0.937, 0.248, _WORD), 'husband_place_of_birth': (0.169, 0.259, 0.554, 0.279, _LINE), 'wife_place_of_birth': (0.557, 0.258, 0.953, 0.280, _LINE), 'husband_sex': (0.211, 0.282, 0.309, 0.309, _WORD), 'wife_sex': (0.597, 0.281, 0.701, 0.310, _WORD), 'husband_citizenship': (0.309, 0.290, 0.553, 0.310, _LINE), 'wife_citizenship': (0.698, 0.289, 0.939, 0.310, _LINE), 'husband_residence': (0.177, 0.324, 0.550, 0.361, _LINE), 'wife_residence': (0.566, 0.323, 0.942, 0.362, _LINE), 'husband_religion': (0.177, 0.363, 0.550, 0.391, _LINE), 'wife_religion': (0.563, 0.363, 0.943, 0.387, _LINE), 'husband_civil_status': (0.171, 0.392, 0.554, 0.416, _WORD), 'wife_civil_status': (0.570, 0.395, 0.955, 0.415, _WORD), 'husband_father_name': (0.181, 0.427, 0.551, 0.448, _LINE), 'wife_father_name': (0.561, 0.425, 0.955, 0.446, _LINE), 'husband_father_citizenship': (0.175, 0.449, 0.551, 0.466, _LINE), 'wife_father_citizenship': (0.561, 0.447, 0.943, 0.467, _LINE), 'husband_mother_name': (0.181, 0.476, 0.557, 0.496, _LINE), 'wife_mother_name': (0.564, 0.477, 0.955, 0.499, _LINE), 'husband_mother_citizenship': (0.184, 0.500, 0.550, 0.518, _LINE), 'wife_mother_citizenship': (0.561, 0.499, 0.939, 0.518, _LINE), 'place_of_marriage': (0.179, 0.640, 0.941, 0.665, _LINE), 'date_of_marriage': (0.182, 0.674, 0.556, 0.696, _LINE), 'time_of_marriage': (0.734, 0.674, 0.889, 0.696, _LINE), 'registration_date': (0.655, 0.749, 0.935, 0.769, _LINE), }, } USE_SELECTIVE_PADDLE_ASSIST = True PADDLE_ASSIST_FIELDS = { 'province', 'registry_no', 'city_municipality', 'date_issued', 'registration_date', 'marriage_license_no', } def warmup(): print('[template_matcher] Warming up models and caches...') _get_crnn() _get_paddleocr() for ft in REFERENCE_IMAGES: img = _get_ref_gray(ft) status = 'OK' if img is not None else 'NOT FOUND' print(f'[template_matcher] Reference {ft}: {status}') print('[template_matcher] Warmup complete.') def _order_corners(pts: np.ndarray) -> np.ndarray: s = pts.sum(axis=1) d = np.diff(pts, axis=1).flatten() return np.array([ pts[np.argmin(s)], pts[np.argmin(d)], pts[np.argmax(s)], pts[np.argmax(d)], ], dtype=np.float32) def _correct_perspective(scan_rgb: np.ndarray, ref_w: int, ref_h: int) -> np.ndarray: if not _CV2_OK: return scan_rgb gray = _cv2.cvtColor(scan_rgb, _cv2.COLOR_RGB2GRAY) kernel = _cv2.getStructuringElement(_cv2.MORPH_RECT, (5, 5)) blur = _cv2.GaussianBlur(gray, (7, 7), 0) _, thresh = _cv2.threshold(blur, 0, 255, _cv2.THRESH_BINARY + _cv2.THRESH_OTSU) dilated = _cv2.dilate(thresh, kernel, iterations=2) contours, _ = _cv2.findContours(dilated, _cv2.RETR_EXTERNAL, _cv2.CHAIN_APPROX_SIMPLE) if not contours: return scan_rgb c = max(contours, key=_cv2.contourArea) area = _cv2.contourArea(c) if area < 0.30 * gray.shape[0] * gray.shape[1]: print('[align] perspective: contour too small, skipping') return scan_rgb peri = _cv2.arcLength(c, True) approx = _cv2.approxPolyDP(c, 0.02 * peri, True) if len(approx) != 4: print(f'[align] perspective: {len(approx)} corners (need 4), skipping') return scan_rgb src = _order_corners(approx.reshape(4, 2).astype(np.float32)) dst = np.array([ [0, 0], [ref_w - 1, 0], [ref_w - 1, ref_h - 1], [0, ref_h - 1], ], dtype=np.float32) M = _cv2.getPerspectiveTransform(src, dst) warped = _cv2.warpPerspective( scan_rgb, M, (ref_w, ref_h), flags=_cv2.INTER_LINEAR, borderMode=_cv2.BORDER_REPLICATE, ) print('[align] perspective correction applied') return warped def _ecc_align(scan_gray: np.ndarray, ref_gray: np.ndarray, scan_rgb: np.ndarray): try: h, w = ref_gray.shape scale = min(1.0, 500.0 / max(h, w)) sh, sw = max(1, int(h * scale)), max(1, int(w * scale)) ref_s = _cv2.resize(ref_gray, (sw, sh)) scn_s = _cv2.resize(_cv2.resize(scan_gray, (w, h)), (sw, sh)) warp = np.eye(2, 3, dtype=np.float32) criteria = (_cv2.TERM_CRITERIA_EPS | _cv2.TERM_CRITERIA_COUNT, 50, 1e-3) cc, warp = _cv2.findTransformECC(ref_s, scn_s, warp, _cv2.MOTION_AFFINE, criteria) if cc < 0.3: print(f'[align] ECC low confidence (cc={cc:.4f}), skipping') return None angle = np.degrees(np.arctan2(warp[1, 0], warp[0, 0])) if abs(angle) > 1.0: clamped = np.radians(np.clip(angle, -1.0, 1.0)) warp[0, 0] = np.cos(clamped) warp[0, 1] = -np.sin(clamped) warp[1, 0] = np.sin(clamped) warp[1, 1] = np.cos(clamped) warp[0, 2] /= scale warp[1, 2] /= scale scan_full = _cv2.resize(scan_rgb, (w, h)) aligned = _cv2.warpAffine( scan_full, warp, (w, h), flags=_cv2.INTER_LINEAR, borderMode=_cv2.BORDER_REPLICATE, ) print(f'[align] ECC applied (cc={cc:.4f} angle={angle:.2f}°)') return aligned except Exception as e: print(f'[align] ECC failed: {e}') return None def _orb_align(scan_gray: np.ndarray, ref_gray: np.ndarray, scan_rgb: np.ndarray): h, w = scan_gray.shape ref_resized = _cv2.resize(ref_gray, (w, h)) orb = _cv2.ORB_create(nfeatures=5000) kp1, des1 = orb.detectAndCompute(scan_gray, None) kp2, des2 = orb.detectAndCompute(ref_resized, None) if des1 is None or des2 is None or len(kp1) < 10 or len(kp2) < 10: return None, 0 matcher = _cv2.BFMatcher(_cv2.NORM_HAMMING, crossCheck=True) matches = sorted(matcher.match(des1, des2), key=lambda m: m.distance) good = matches[:max(10, len(matches) // 3)] if len(good) < 6: return None, 0 src_pts = np.float32([kp1[m.queryIdx].pt for m in good]).reshape(-1, 1, 2) dst_pts = np.float32([kp2[m.trainIdx].pt for m in good]).reshape(-1, 1, 2) M, mask = _cv2.estimateAffinePartial2D( src_pts, dst_pts, method=_cv2.RANSAC, ransacReprojThreshold=5.0, ) if M is None: return None, 0 inliers = int(mask.sum()) if mask is not None else 0 aligned = _cv2.warpAffine( scan_rgb, M, (w, h), flags=_cv2.INTER_LINEAR, borderMode=_cv2.BORDER_REPLICATE, ) print(f'[align] ORB applied ({inliers} inliers)') return aligned, inliers def _orb_inliers(scan_gray: np.ndarray, ref_gray: np.ndarray) -> int: orb = _cv2.ORB_create(nfeatures=3000) kp1, des1 = orb.detectAndCompute(scan_gray, None) kp2, des2 = orb.detectAndCompute(ref_gray, None) if des1 is None or des2 is None or len(kp1) < 10 or len(kp2) < 10: return 0 matcher = _cv2.BFMatcher(_cv2.NORM_HAMMING, crossCheck=True) matches = sorted(matcher.match(des1, des2), key=lambda m: m.distance) good = matches[:max(10, len(matches) // 3)] if len(good) < 6: return 0 src_pts = np.float32([kp1[m.queryIdx].pt for m in good]).reshape(-1, 1, 2) dst_pts = np.float32([kp2[m.trainIdx].pt for m in good]).reshape(-1, 1, 2) _, mask = _cv2.findHomography(src_pts, dst_pts, _cv2.RANSAC, 5.0) return int(mask.sum()) if mask is not None else 0 def check_image_quality(image_path: str, form_type: str) -> dict: if not _CV2_OK: return { 'ok': True, 'upside_down': False, 'skew_angle': 0.0, 'aspect_mismatch': 1.0, 'orb_fit': 0, 'orb_fit_normal': 0, 'orb_fit_180': 0, 'blur_score': 9999.0, 'warnings': ['OpenCV not available; skipping quality check'], } result = {} warnings = [] try: img = Image.open(image_path).convert('RGB') except Exception as e: return { 'ok': False, 'upside_down': False, 'skew_angle': 0.0, 'aspect_mismatch': 0.0, 'orb_fit': 0, 'orb_fit_normal': 0, 'orb_fit_180': 0, 'blur_score': 0.0, 'warnings': [f'Cannot open image: {e}'], } scan_rgb = np.array(img) scan_gray = _cv2.cvtColor(scan_rgb, _cv2.COLOR_RGB2GRAY) h, w = scan_gray.shape blur_score = float(_cv2.Laplacian(scan_gray, _cv2.CV_64F).var()) result['blur_score'] = round(blur_score, 1) if blur_score < 80: warnings.append( f'Image appears blurry (Laplacian variance={blur_score:.1f}; threshold 80).' ) edges = _cv2.Canny(scan_gray, 50, 150, apertureSize=3) lines = _cv2.HoughLinesP( edges, 1, np.pi / 180, threshold=80, minLineLength=60, maxLineGap=15, ) skew_angle = 0.0 if lines is not None: angles = [ np.degrees(np.arctan2(y2 - y1, x2 - x1)) for x1, y1, x2, y2 in lines[:, 0] if abs(np.degrees(np.arctan2(y2 - y1, x2 - x1))) < 45 ] if angles: skew_angle = float(np.median(angles)) result['skew_angle'] = round(skew_angle, 2) if abs(skew_angle) > 3.0: warnings.append(f'Page is significantly skewed ({skew_angle:.1f}°).') upside_down = False orb_fit = 0 inliers_normal = 0 inliers_180 = 0 ref_gray = _get_ref_gray(form_type) if ref_gray is not None: ref_h, ref_w = ref_gray.shape scan_rs = _cv2.resize(scan_gray, (ref_w, ref_h)) scan_180 = _cv2.rotate(scan_rs, _cv2.ROTATE_180) inliers_normal = _orb_inliers(scan_rs, ref_gray) inliers_180 = _orb_inliers(scan_180, ref_gray) orb_fit = inliers_normal if inliers_180 > inliers_normal * 1.5 and inliers_180 > 10: upside_down = True orb_fit = inliers_180 warnings.append( f'Image appears upside down (ORB normal={inliers_normal}, rotated_180={inliers_180}).' ) if orb_fit < 10: warnings.append(f'Poor alignment fit for form {form_type} (ORB inliers={orb_fit}).') elif orb_fit < 25: warnings.append(f'Weak alignment fit for form {form_type} (ORB inliers={orb_fit}).') scan_aspect = w / max(h, 1) ref_aspect = ref_w / max(ref_h, 1) aspect_ratio = scan_aspect / max(ref_aspect, 1e-6) result['aspect_mismatch'] = round(aspect_ratio, 3) else: result['aspect_mismatch'] = 1.0 result['upside_down'] = upside_down result['orb_fit'] = orb_fit result['orb_fit_normal'] = inliers_normal result['orb_fit_180'] = inliers_180 result['warnings'] = warnings result['ok'] = len(warnings) == 0 return result def correct_image(img: Image.Image, quality: dict): applied = [] if not _CV2_OK: print('[correct_image] OpenCV not available; skipping corrections.') return img, applied rgb = np.array(img.convert('RGB')) if quality.get('upside_down'): rgb = _cv2.rotate(rgb, _cv2.ROTATE_180) applied.append('rotated 180° (upside-down correction)') print('[correct_image] Applied: 180° rotation') skew_angle = quality.get('skew_angle', 0.0) if 1.0 < abs(skew_angle) < 15.0: correction_angle = -skew_angle h, w = rgb.shape[:2] center = (w / 2.0, h / 2.0) M = _cv2.getRotationMatrix2D(center, correction_angle, 1.0) cos_a = abs(M[0, 0]) sin_a = abs(M[0, 1]) new_w = int(h * sin_a + w * cos_a) new_h = int(h * cos_a + w * sin_a) M[0, 2] += (new_w - w) / 2.0 M[1, 2] += (new_h - h) / 2.0 rgb = _cv2.warpAffine( rgb, M, (new_w, new_h), flags=_cv2.INTER_CUBIC, borderMode=_cv2.BORDER_REPLICATE, ) applied.append(f'de-skewed {correction_angle:+.2f}°') print(f'[correct_image] Applied: de-skew {correction_angle:+.2f}°') result_img = Image.fromarray(rgb) if img.mode != 'RGB': result_img = result_img.convert(img.mode) return result_img, applied def align_to_reference(img: Image.Image, form_type: str): if not _CV2_OK: return img, 0 ref_gray = _get_ref_gray(form_type) if ref_gray is None: return img, 0 ref_h, ref_w = ref_gray.shape scan_rgb = np.array(img.convert('RGB')) stage0 = _correct_perspective(scan_rgb, ref_w, ref_h) stage0_gray = _cv2.cvtColor(stage0, _cv2.COLOR_RGB2GRAY) precheck = _orb_inliers(stage0_gray, ref_gray) print(f'[align] ORB pre-check: {precheck} inliers') if precheck >= 40: orb_aligned, orb_inliers_count = _orb_align(stage0_gray, ref_gray, stage0) if orb_aligned is not None: return Image.fromarray(orb_aligned), orb_inliers_count ecc_aligned = _ecc_align(stage0_gray, ref_gray, stage0) if ecc_aligned is not None: ecc_gray = _cv2.cvtColor(ecc_aligned, _cv2.COLOR_RGB2GRAY) orb_aligned, orb_inliers_count = _orb_align(ecc_gray, ref_gray, ecc_aligned) if orb_aligned is not None: return Image.fromarray(orb_aligned), orb_inliers_count return Image.fromarray(ecc_aligned), _orb_inliers(ecc_gray, ref_gray) orb_aligned, orb_inliers_count = _orb_align(stage0_gray, ref_gray, stage0) if orb_aligned is not None: return Image.fromarray(orb_aligned), orb_inliers_count resized = _cv2.resize(stage0, (ref_w, ref_h)) return Image.fromarray(resized), precheck def _deskew(gray: np.ndarray) -> np.ndarray: if not _CV2_OK: return gray edges = _cv2.Canny(gray, 50, 150, apertureSize=3) lines = _cv2.HoughLinesP( edges, 1, np.pi / 180, threshold=100, minLineLength=100, maxLineGap=10, ) if lines is None: return gray angles = [ np.degrees(np.arctan2(y2 - y1, x2 - x1)) for x1, y1, x2, y2 in lines[:, 0] if -3 < np.degrees(np.arctan2(y2 - y1, x2 - x1)) < 3 ] if not angles: return gray angle = float(np.median(angles)) if abs(angle) < 0.5: return gray h, w = gray.shape M = _cv2.getRotationMatrix2D((w / 2, h / 2), angle, 1.0) return _cv2.warpAffine( gray, M, (w, h), flags=_cv2.INTER_CUBIC, borderMode=_cv2.BORDER_REPLICATE, ) def _preprocess(img: Image.Image) -> Image.Image: if not _CV2_OK: return img.convert('L') gray = np.array(img.convert('L')) gray = _deskew(gray) return Image.fromarray(gray) def _crop_field(img: Image.Image, x1r, y1r, x2r, y2r) -> Image.Image: w, h = img.size pad = 4 x1 = max(0, int(x1r * w) - pad) y1 = max(0, int(y1r * h) - pad) x2 = min(w, int(x2r * w) + pad) y2 = min(h, int(y2r * h) + pad) return img.crop((x1, y1, x2, y2)) def _expand_box(box, img_w, img_h, pad_x=10, pad_y=8): x1, y1, x2, y2 = box return ( max(0, x1 - pad_x), max(0, y1 - pad_y), min(img_w, x2 + pad_x), min(img_h, y2 + pad_y), ) def _crop_from_box(img: Image.Image, box): return img.crop(box) def _norm_text(s: str) -> str: return _re.sub(r'[^a-z0-9]+', '', (s or '').lower()) def _find_nearby_detection(field_rect, detections, expected_hint=None): fx1, fy1, fx2, fy2 = field_rect fcx = (fx1 + fx2) / 2 fcy = (fy1 + fy2) / 2 fw = max(1, fx2 - fx1) fh = max(1, fy2 - fy1) best = None best_score = -1e9 for det in detections: x1, y1, x2, y2 = det['box'] dcx = det['cx'] dcy = det['cy'] dw = max(1, x2 - x1) dh = max(1, y2 - y1) dist = ((dcx - fcx) ** 2 + (dcy - fcy) ** 2) ** 0.5 overlap_x = max(0, min(fx2, x2) - max(fx1, x1)) overlap_y = max(0, min(fy2, y2) - max(fy1, y1)) overlap = overlap_x * overlap_y size_penalty = abs(dw - fw) * 0.2 + abs(dh - fh) * 0.2 score = overlap * 0.02 - dist - size_penalty + det.get('conf', 0.0) * 40.0 text = (det.get('text') or '').strip() if expected_hint == _WORD and len(text.split()) <= 3: score += 10 elif expected_hint == _LINE and 1 <= len(text.split()) <= 12: score += 8 elif expected_hint == _BLOCK and len(text.split()) >= 2: score += 6 if score > best_score: best_score = score best = det return best if best_score > -150 else None def _get_field_crop_with_paddle(processed_img: Image.Image, field_coords, detections): w, h = processed_img.size x1r, y1r, x2r, y2r, hint = field_coords fx1 = int(x1r * w) fy1 = int(y1r * h) fx2 = int(x2r * w) fy2 = int(y2r * h) field_rect = (fx1, fy1, fx2, fy2) det = _find_nearby_detection(field_rect, detections, expected_hint=hint) if det is not None: box = _expand_box(det['box'], w, h, pad_x=10, pad_y=8) return _crop_from_box(processed_img, box), 'paddle-detect', det return _crop_field(processed_img, x1r, y1r, x2r, y2r), 'absolute', None def _get_field_crop_with_easyocr(processed_img: Image.Image, field_coords, detections): return _get_field_crop_with_paddle(processed_img, field_coords, detections) def detect_form_type(image_path: str) -> str: if _CV2_OK: try: img = Image.open(image_path).convert('RGB') scan_rgb = np.array(img) scan_gray = _cv2.cvtColor(scan_rgb, _cv2.COLOR_RGB2GRAY) best_type, best_inliers = None, 0 det_w = 800 for ft in REFERENCE_IMAGES: ref_gray = _get_ref_gray(ft) if ref_gray is None: continue ref_h, ref_w = ref_gray.shape sc = min(1.0, det_w / ref_w) dw = max(1, int(ref_w * sc)) dh = max(1, int(ref_h * sc)) ref_ds = _cv2.resize(ref_gray, (dw, dh)) scan_ds = _cv2.resize(_cv2.resize(scan_gray, (ref_w, ref_h)), (dw, dh)) count = _orb_inliers(scan_ds, ref_ds) print(f'[detect] Form {ft}: {count} ORB inliers') if count > best_inliers: best_inliers, best_type = count, ft if best_type and best_inliers >= 15: print(f'[detect] Best: Form {best_type} ({best_inliers} inliers)') return best_type print(f'[detect] ORB inconclusive ({best_inliers}), trying OCR title') except Exception as e: print(f'[template_matcher] detect_form_type ORB error: {e}') try: img_l = Image.open(image_path).convert('L') w, h = img_l.size title_crop = img_l.crop((0, int(h * 0.04), w, int(h * 0.15))) title = _crnn_read(title_crop).upper() if title: if 'LIVE BIRTH' in title or ('BIRTH' in title and 'DEATH' not in title and 'MARRIAGE' not in title): return '102' if 'DEATH' in title: return '103' if 'MARRIAGE' in title and 'LICENSE' in title: return '90' if 'MARRIAGE' in title: return '97' except Exception as e: print(f'[template_matcher] detect_form_type OCR error: {e}') print('[detect] Could not detect form type; defaulting to 102.') return '102' def is_blank_image(img: Image.Image, threshold: float = 0.995) -> bool: if not _CV2_OK: return False gray = np.array(img.convert('L')) h, w = gray.shape y1 = int(h * 0.20) y2 = int(h * 0.80) x1 = int(w * 0.20) x2 = int(w * 0.80) center = gray[y1:y2, x1:x2] light_pixels = np.sum(center > 240) total_pixels = center.size ratio = light_pixels / max(total_pixels, 1) variance = float(np.var(center)) print(f'[template_matcher] Blank check: {ratio:.2%} light pixels, variance={variance:.1f}') return ratio >= threshold and variance < 50.0 def extract_fields(image_path: str, form_type: str = None): try: if not form_type: form_type = detect_form_type(image_path) template = TEMPLATES.get(form_type) if not template: return {'status': 'error', 'message': f'No template for form {form_type}.'} quality = check_image_quality(image_path, form_type) img = Image.open(image_path).convert('RGB') if is_blank_image(img): return {'status': 'error', 'message': 'Blank or near-blank image detected.'} img, corrections = correct_image(img, quality) img, orb_fit = align_to_reference(img, form_type) processed = _preprocess(img) detections = _paddle_detect(processed) fields = {} crnn_confidences = {} debug_methods = {} field_names = [] crops = [] assist_texts = [] for field_name, coords in template.items(): crop, method, det = _get_field_crop_with_paddle(processed, coords, detections) field_names.append(field_name) crops.append(crop) debug_methods[field_name] = method assist_text = '' if USE_SELECTIVE_PADDLE_ASSIST and field_name in PADDLE_ASSIST_FIELDS: if det is not None: assist_text = (det.get('text') or '').strip() if not assist_text: assist_text = _paddle_read(crop) assist_texts.append(assist_text) crnn_results = _crnn_read_batch_with_confidence(crops) for field_name, (crnn_text, crnn_conf), assist_text in zip( field_names, crnn_results, assist_texts ): final_text = _smart_merge(field_name, crnn_text, assist_text) fields[field_name] = final_text # If the final merged result is empty the field is blank on the # image — correctly extracting nothing is 100% accurate. crnn_confidences[field_name] = crnn_conf if final_text else 1.0 print(f'[template_matcher] Extracted: {len(fields)}/{len(template)} fields') if crnn_confidences: avg_conf = sum(crnn_confidences.values()) / len(crnn_confidences) low_conf = {k: round(v, 3) for k, v in crnn_confidences.items() if v < 0.6} print(f'[template_matcher] CRNN avg confidence: {avg_conf:.3f}') if low_conf: print(f'[template_matcher] Low-confidence fields (<0.60): {low_conf}') if len(fields) == 0: return {'status': 'error', 'message': 'No readable text found.'} fields['_quality'] = quality fields['_corrections'] = corrections fields['_crnn_confidence'] = crnn_confidences return fields except Exception as e: print(f'[template_matcher] extract_fields error: {e}') return {'status': 'error', 'message': str(e)} def debug_draw_boxes(image_path: str, form_type: str, out_path: str = None) -> str: from PIL import ImageDraw, ImageFont template = TEMPLATES.get(form_type) if not template: print(f'No template for {form_type}') return None quality = check_image_quality(image_path, form_type) img = Image.open(image_path).convert('RGB') img, _ = correct_image(img, quality) img, _ = align_to_reference(img, form_type) draw = ImageDraw.Draw(img) w, h = img.size try: font = ImageFont.truetype('/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf', 11) except Exception: try: font = ImageFont.truetype('C:/Windows/Fonts/arial.ttf', 11) except Exception: font = ImageFont.load_default() for field_name, coords in template.items(): x1r, y1r, x2r, y2r, _ = coords bx1, by1 = int(x1r * w), int(y1r * h) bx2, by2 = int(x2r * w), int(y2r * h) draw.rectangle([bx1, by1, bx2, by2], outline='#1a6fd4', width=1) draw.text((bx1 + 2, by1 + 2), field_name, fill='#1a6fd4', font=font) base, ext = os.path.splitext(image_path) out = out_path or f'{base}_debug_{form_type}{ext}' img.save(out) print(f'[template_matcher] Debug image saved: {out}') return out def debug_draw_paddle_matches(image_path: str, form_type: str, out_path: str = None) -> str: from PIL import ImageDraw, ImageFont template = TEMPLATES.get(form_type) if not template: print(f'No template for {form_type}') return None quality = check_image_quality(image_path, form_type) img = Image.open(image_path).convert('RGB') img, _ = correct_image(img, quality) img, _ = align_to_reference(img, form_type) processed = _preprocess(img) detections = _paddle_detect(processed) canvas = img.copy() draw = ImageDraw.Draw(canvas) w, h = canvas.size try: font = ImageFont.truetype('/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf', 11) except Exception: try: font = ImageFont.truetype('C:/Windows/Fonts/arial.ttf', 11) except Exception: font = ImageFont.load_default() for det in detections: x1, y1, x2, y2 = det['box'] draw.rectangle([x1, y1, x2, y2], outline='red', width=1) for field_name, coords in template.items(): x1r, y1r, x2r, y2r, hint = coords fx1 = int(x1r * w) fy1 = int(y1r * h) fx2 = int(x2r * w) fy2 = int(y2r * h) draw.rectangle([fx1, fy1, fx2, fy2], outline='blue', width=2) draw.text((fx1 + 2, fy1 + 2), field_name, fill='blue', font=font) det = _find_nearby_detection((fx1, fy1, fx2, fy2), detections, expected_hint=hint) if det is not None: dx1, dy1, dx2, dy2 = det['box'] draw.rectangle([dx1, dy1, dx2, dy2], outline='green', width=2) base, ext = os.path.splitext(image_path) out = out_path or f'{base}_paddle_debug_{form_type}{ext}' canvas.save(out) print(f'[template_matcher] Paddle debug image saved: {out}') return out def debug_draw_easyocr_matches(image_path: str, form_type: str, out_path: str = None) -> str: # Backward-compatible function name. return debug_draw_paddle_matches(image_path, form_type, out_path) def pdf_to_image(pdf_path: str, page: int = 0) -> str: try: from pdf2image import convert_from_path pages = convert_from_path(pdf_path, dpi=150) out_path = pdf_path.replace('.pdf', f'_page{page}.png') pages[page].save(out_path, 'PNG') return out_path except ImportError: print('[template_matcher] pdf2image not installed.') return None except Exception as e: print(f'[template_matcher] PDF conversion failed: {e}') return None if __name__ == '__main__': warmup() if len(sys.argv) < 2: print('Usage:') print(' python template_matcher.py [out_path]') print(' python template_matcher.py check [form_type]') print(' form_type: 102 | 103 | 90 | 97') sys.exit(1) img_path = sys.argv[1] if len(sys.argv) >= 3 and sys.argv[2] == 'check': ft = sys.argv[3] if len(sys.argv) > 3 else detect_form_type(img_path) q = check_image_quality(img_path, ft) print(f'\nQuality report for form {ft}:') for k, v in q.items(): if k != 'warnings': print(f' {k:<22} = {v}') if q['warnings']: print('\nWarnings:') for msg in q['warnings']: print(f' • {msg}') img_pil = Image.open(img_path).convert('RGB') _, corrections = correct_image(img_pil, q) print('\nCorrections that would be applied:') if corrections: for c in corrections: print(f' ✓ {c}') else: print(' (none needed)') sys.exit(0 if q['ok'] else 1) form_type = sys.argv[2] out_path = sys.argv[3] if len(sys.argv) > 3 else None debug_draw_boxes(img_path, form_type, out_path) debug_draw_paddle_matches(img_path, form_type) result = extract_fields(img_path, form_type) meta_keys = {'_quality', '_corrections', '_crnn_confidence'} data_fields = {k: v for k, v in result.items() if k not in meta_keys} crnn_conf = result.get('_crnn_confidence', {}) print(f'\nExtracted fields ({len(data_fields)}):') for k, v in data_fields.items(): conf_str = f' [conf={crnn_conf[k]:.3f}]' if k in crnn_conf else '' print(f' {k:<40} = {v}{conf_str}') template = TEMPLATES.get(form_type, {}) missing = [k for k in template if k not in data_fields] if missing: print(f'\nEmpty fields ({len(missing)}):') for k in missing: print(f' {k}') corrections = result.get('_corrections', []) if corrections: print('\nAuto-corrections applied:') for c in corrections: print(f' ✓ {c}') quality = result.get('_quality', {}) if quality.get('warnings'): print('\nQuality warnings:') for w_msg in quality['warnings']: print(f' • {w_msg}')