from __future__ import annotations

import logging
import os
import re
import sys
import tempfile
from dataclasses import dataclass, field, fields
from enum import Enum
from pathlib import Path
from typing import Callable, Iterator, Optional, Protocol

import gradio as gr
import pandas as pd
import torch
from PIL import Image
from transformers import AutoModelForImageTextToText, AutoProcessor

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    stream=sys.stderr,
)
logger = logging.getLogger(__name__)

# ╔══════════════════════════════════════════════════════════════╗
# ║  DOMAIN MODELS                                               ║
# ╚══════════════════════════════════════════════════════════════╝


class ExtractionStatus(Enum):
    """How completely a single KTP was read."""

    SUCCESS = "success"
    PARTIAL = "partial"
    FAILED = "failed"


@dataclass(frozen=True, slots=True)
class KTPData:
    """Immutable value object — extracted KTP fields.

    Every field is optional; ``None`` means the parser could not find it.
    """

    nik: Optional[str] = None
    nama: Optional[str] = None
    tempat_lahir: Optional[str] = None
    tanggal_lahir: Optional[str] = None

    @property
    def status(self) -> ExtractionStatus:
        """SUCCESS if every field is set, FAILED if none is, else PARTIAL."""
        values = [getattr(self, f.name) for f in fields(self)]
        filled = [v for v in values if v is not None]
        if len(filled) == len(values):
            return ExtractionStatus.SUCCESS
        if filled:
            return ExtractionStatus.PARTIAL
        return ExtractionStatus.FAILED

    def to_dict(self) -> dict[str, Optional[str]]:
        """Map fields to their human-readable column labels (declaration order)."""
        return {
            "NIK": self.nik,
            "Nama": self.nama,
            "Tempat Lahir": self.tempat_lahir,
            "Tanggal Lahir": self.tanggal_lahir,
        }


@dataclass(frozen=True, slots=True)
class ExtractionResult:
    """Result of processing a single image (data plus provenance)."""

    filename: str
    data: KTPData
    raw_text: str = ""
    error: Optional[str] = None

    def to_row(self) -> dict:
        """Flatten into one results-table row."""
        row: dict = {"Filename": self.filename}
        row.update(self.data.to_dict())
        row["Status"] = self.data.status.value
        return row


# ╔══════════════════════════════════════════════════════════════╗
# ║  PARSER — pure functions, no I/O, no model dependency        ║
# ╚══════════════════════════════════════════════════════════════╝
╚══════════════════════════════════════════════════════════════╝ _NIK = re.compile(r"\b(\d{16})\b") _DATE = re.compile(r"(\d{2}[-/]\d{2}[-/]\d{4})") _NAMA_PATTERNS: list[re.Pattern] = [ re.compile( r"(?:Nama|NAMA)\s*[:/]?\s*([A-Z][A-Z\s'.]{2,}?)" r"(?=\s+(?:WNI|WNA|ISLAM|KRISTEN|KATOLIK|HINDU|BUDHA|KONGHUCU|\d{2}[-/])|$)", re.IGNORECASE, ), re.compile( r"\b\d{16}\b\s+([A-Z][A-Z\s'.]{2,}?)" r"(?=\s+(?:WNI|ISLAM|KRISTEN|KATOLIK|HINDU|BUDHA|KONGHUCU|\d{2}[-/]))", re.IGNORECASE, ), ] _TEMPAT_PATTERNS: list[re.Pattern] = [ re.compile( r"(?:Tempat\s*/?\s*Tgl\s*Lahir|TTL)\s*[:/]?\s*([A-Z][A-Za-z\s]+?)(?=\s*[,]?\s*\d{2}[-/])", re.IGNORECASE, ), re.compile(r"([A-Z][A-Z\s]{2,}?)\s*[,]?\s*\d{2}[-/]\d{2}[-/]\d{4}"), ] def _first_match(patterns: list[re.Pattern], text: str, group: int = 1) -> Optional[str]: for p in patterns: m = p.search(text) if m: return m.group(group).strip().rstrip(",.") return None def parse_ktp(raw_text: str) -> KTPData: """Parse raw OCR text into structured KTP data. Pure, deterministic, testable.""" text = " ".join(raw_text.split()) nik = _NIK.search(text) date = _DATE.search(text) return KTPData( nik=nik.group(1) if nik else None, nama=_first_match(_NAMA_PATTERNS, text), tempat_lahir=_first_match(_TEMPAT_PATTERNS, text), tanggal_lahir=date.group(1).replace("/", "-") if date else None, ) # ╔══════════════════════════════════════════════════════════════╗ # ║ OCR ENGINE — owns model lifecycle and inference ║ # ╚══════════════════════════════════════════════════════════════╝ class OCREngine(Protocol): def recognize(self, image: Image.Image) -> str: ... 
@dataclass
class ModelConfig:
    """Tunables for the OCR model."""

    model_path: str = "emisilab/model-ocr-ktp-v1"  # HF hub id or local dir
    max_length: int = 1024  # generation cap, in tokens
    use_fp16: bool = True   # half precision when CUDA is available


class HuggingFaceOCR:
    """Lazy-loading HF vision-language OCR engine.

    The model is loaded on first use (``recognize`` / ``is_available``),
    not at construction, so module import stays cheap.
    """

    def __init__(self, config: ModelConfig | None = None) -> None:
        self._cfg = config or ModelConfig()
        self._device = "cuda" if torch.cuda.is_available() else "cpu"
        # fp16 only makes sense on GPU; CPU inference stays in fp32.
        self._dtype = torch.float16 if (self._cfg.use_fp16 and self._device == "cuda") else torch.float32
        self._processor: AutoProcessor | None = None
        self._model: AutoModelForImageTextToText | None = None

    def _ensure_loaded(self) -> None:
        """Load processor + model exactly once (idempotent)."""
        if self._model is not None:
            return
        logger.info("Loading %s on %s (%s)", self._cfg.model_path, self._device, self._dtype)
        self._processor = AutoProcessor.from_pretrained(self._cfg.model_path, use_fast=True)
        self._model = (
            AutoModelForImageTextToText.from_pretrained(self._cfg.model_path, torch_dtype=self._dtype)
            .to(self._device)
            .eval()
        )
        logger.info("Model ready.")

    @property
    def is_available(self) -> bool:
        """True when the model can be (or already was) loaded. Never raises."""
        try:
            self._ensure_loaded()
            return True
        except Exception:
            logger.exception("Model unavailable")
            return False

    @torch.inference_mode()
    def recognize(self, image: Image.Image) -> str:
        """Run OCR on one image and return the decoded text."""
        self._ensure_loaded()
        assert self._processor and self._model
        px = self._processor(images=image, return_tensors="pt").pixel_values.to(
            device=self._device, dtype=self._dtype
        )
        ids = self._model.generate(px, max_length=self._cfg.max_length)
        return self._processor.batch_decode(ids, skip_special_tokens=True)[0]


# ╔══════════════════════════════════════════════════════════════╗
# ║  PIPELINE — composes engine + parser                         ║
# ╚══════════════════════════════════════════════════════════════╝

COLUMNS = ["Filename", "NIK", "Nama", "Tempat Lahir", "Tanggal Lahir", "Status"]
ProgressCallback = Optional[Callable[[float, str], None]]


class ExtractionPipeline:
    """Runs the OCR engine over images and parses the results."""

    def __init__(self, engine: OCREngine) -> None:
        self._engine = engine

    def process_one(self, path: Path) -> ExtractionResult:
        """OCR + parse one image; failures become an error result, never an exception."""
        try:
            # FIX: close the underlying file handle — Image.open().convert()
            # used to leak it. convert() returns a detached in-memory copy.
            with Image.open(path) as img:
                image = img.convert("RGB")
            raw = self._engine.recognize(image)
            return ExtractionResult(filename=path.name, data=parse_ktp(raw), raw_text=raw)
        except Exception as e:
            logger.exception("Failed: %s", path.name)
            return ExtractionResult(filename=path.name, data=KTPData(), error=str(e))

    def process_batch(self, paths: list[Path], on_progress: ProgressCallback = None) -> pd.DataFrame:
        """Process all *paths*; always returns a DataFrame with COLUMNS (possibly empty)."""
        total = len(paths)  # hoisted loop invariant
        rows = []
        for i, p in enumerate(paths, 1):
            if on_progress:
                on_progress(i / total, f"Processing {p.name} ({i}/{total})")
            rows.append(self.process_one(p).to_row())
        return pd.DataFrame(rows, columns=COLUMNS) if rows else pd.DataFrame(columns=COLUMNS)


# ╔══════════════════════════════════════════════════════════════╗
# ║  GRADIO UI — thin presentation layer                         ║
# ╚══════════════════════════════════════════════════════════════╝

engine = HuggingFaceOCR()
pipeline = ExtractionPipeline(engine)


def on_extract(files: list[str] | None, progress: gr.Progress = gr.Progress()):
    """Run the pipeline over the uploaded files.

    Returns (results DataFrame, CSV path) — or an empty frame and None
    when nothing was uploaded.
    """
    if not files:
        return pd.DataFrame(columns=COLUMNS), None
    if not engine.is_available:
        raise gr.Error("Model failed to load — check Space logs.")
    df = pipeline.process_batch(
        [Path(f) for f in files],
        on_progress=lambda frac, msg: progress(frac, desc=msg),
    )
    # FIX: a fixed name in the shared temp dir ("ktp_results.csv") is racy
    # with concurrent users and an insecure-tempfile pattern (CWE-377).
    # mkstemp gives each request its own unguessable file.
    fd, csv_path = tempfile.mkstemp(prefix="ktp_results_", suffix=".csv")
    os.close(fd)  # pandas reopens by path; close the raw descriptor
    df.to_csv(csv_path, index=False)
    return df, csv_path


def on_preview(files: list[str] | None):
    """Preview uploads. FIX: hand the Gallery file paths instead of opening
    PIL images that were never closed (file-handle leak per upload)."""
    return list(files) if files else []


with gr.Blocks(theme=gr.themes.Soft(), title="KTP OCR Extractor") as demo:
    gr.Markdown(
        "# KTP OCR Extractor 🇮🇩\n"
        "Upload KTP images → extract **NIK, Nama, Tempat Lahir, Tanggal Lahir** automatically."
    )
    with gr.Row():
        with gr.Column(scale=1):
            file_input = gr.File(
                label="Upload KTP Images",
                file_count="multiple",
                file_types=["image"],
                type="filepath",
            )
            gallery = gr.Gallery(label="Preview", columns=3, height=200)
            extract_btn = gr.Button("Extract", variant="primary", size="lg")
        with gr.Column(scale=2):
            result_table = gr.DataFrame(label="Results", headers=COLUMNS)
            csv_download = gr.File(label="Download CSV")

    file_input.change(on_preview, file_input, gallery)
    extract_btn.click(on_extract, file_input, [result_table, csv_download])


if __name__ == "__main__":
    demo.launch()